This commit is contained in:
Anatoly Yakovenko 2018-03-24 18:01:40 -07:00
parent 4d7d4d673e
commit f52f02a434
4 changed files with 26 additions and 23 deletions

View File

@ -110,15 +110,15 @@ impl AccountantSkel {
let mut num = 0; let mut num = 0;
let mut ursps = rsps.write().unwrap(); let mut ursps = rsps.write().unwrap();
for packet in &msgs.read().unwrap().packets { for packet in &msgs.read().unwrap().packets {
let sz = packet.size; let sz = packet.meta.size;
let req = deserialize(&packet.data[0..sz])?; let req = deserialize(&packet.data[0..sz])?;
if let Some(resp) = self.process_request(req) { if let Some(resp) = self.process_request(req) {
let rsp = &mut ursps.packets[num]; let rsp = &mut ursps.packets[num];
let v = serialize(&resp)?; let v = serialize(&resp)?;
let len = v.len(); let len = v.len();
rsp.data[0..len].copy_from_slice(&v); rsp.data[0..len].copy_from_slice(&v);
rsp.size = len; rsp.meta.size = len;
rsp.set_addr(&packet.get_addr()); rsp.meta.set_addr(&packet.meta.get_addr());
num += 1; num += 1;
} }
} }

View File

@ -14,6 +14,7 @@ pub mod accountant;
pub mod accountant_skel; pub mod accountant_skel;
pub mod accountant_stub; pub mod accountant_stub;
pub mod result; pub mod result;
pub mod services;
extern crate bincode; extern crate bincode;
extern crate chrono; extern crate chrono;
extern crate generic_array; extern crate generic_array;

View File

@ -13,6 +13,7 @@ pub enum Error {
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
Serialize(std::boxed::Box<bincode::ErrorKind>), Serialize(std::boxed::Box<bincode::ErrorKind>),
SendError, SendError,
Services,
} }
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;

View File

@ -8,23 +8,27 @@ use std::thread::{spawn, JoinHandle};
use result::{Error, Result}; use result::{Error, Result};
const BLOCK_SIZE: usize = 1024 * 8; const BLOCK_SIZE: usize = 1024 * 8;
pub const PACKET_SIZE: usize = 1024; pub const PACKET_SIZE: usize = 256;
#[derive(Clone, Default)]
#[derive(Clone)] pub struct Meta {
pub struct Packet {
pub data: [u8; PACKET_SIZE],
pub size: usize, pub size: usize,
pub addr: [u16; 8], pub addr: [u16; 8],
pub port: u16, pub port: u16,
pub v6: bool, pub v6: bool,
} }
#[derive(Clone)]
pub struct Packet {
pub data: [u8; PACKET_SIZE],
pub meta: Meta,
}
impl fmt::Debug for Packet { impl fmt::Debug for Packet {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!( write!(
f, f,
"Packet {{ size: {:?}, addr: {:?} }}", "Packet {{ size: {:?}, addr: {:?} }}",
self.size, self.meta.size,
self.get_addr() self.meta.get_addr()
) )
} }
} }
@ -32,14 +36,11 @@ impl Default for Packet {
fn default() -> Packet { fn default() -> Packet {
Packet { Packet {
data: [0u8; PACKET_SIZE], data: [0u8; PACKET_SIZE],
size: 0, meta: Meta::default(),
addr: [0u16; 8],
port: 0,
v6: false,
} }
} }
} }
impl Packet { impl Meta {
pub fn get_addr(&self) -> SocketAddr { pub fn get_addr(&self) -> SocketAddr {
if !self.v6 { if !self.v6 {
let ipv4 = Ipv4Addr::new( let ipv4 = Ipv4Addr::new(
@ -103,7 +104,7 @@ impl PacketData {
self.packets.resize(BLOCK_SIZE, Packet::default()); self.packets.resize(BLOCK_SIZE, Packet::default());
let mut i = 0; let mut i = 0;
for p in &mut self.packets { for p in &mut self.packets {
p.size = 0; p.meta.size = 0;
match socket.recv_from(&mut p.data) { match socket.recv_from(&mut p.data) {
Err(_) if i > 0 => { Err(_) if i > 0 => {
trace!("got {:?} messages", i); trace!("got {:?} messages", i);
@ -114,8 +115,8 @@ impl PacketData {
return Err(Error::IO(e)); return Err(Error::IO(e));
} }
Ok((nrecv, from)) => { Ok((nrecv, from)) => {
p.size = nrecv; p.meta.size = nrecv;
p.set_addr(&from); p.meta.set_addr(&from);
if i == 0 { if i == 0 {
socket.set_nonblocking(true)?; socket.set_nonblocking(true)?;
} }
@ -132,8 +133,8 @@ impl PacketData {
} }
fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> {
for p in &self.packets { for p in &self.packets {
let a = p.get_addr(); let a = p.meta.get_addr();
socket.send_to(&p.data[0..p.size], &a)?; socket.send_to(&p.data[..p.meta.size], &a)?;
//TODO(anatoly): wtf do we do about errors? //TODO(anatoly): wtf do we do about errors?
*num += 1; *num += 1;
} }
@ -376,9 +377,9 @@ mod test {
msgs.write().unwrap().packets.resize(10, Packet::default()); msgs.write().unwrap().packets.resize(10, Packet::default());
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
w.data[0] = i as u8; w.data[0] = i as u8;
w.size = PACKET_SIZE; w.meta.size = PACKET_SIZE;
w.set_addr(&addr); w.meta.set_addr(&addr);
assert_eq!(w.get_addr(), addr); assert_eq!(w.meta.get_addr(), addr);
} }
s_sender.send(msgs).expect("send"); s_sender.send(msgs).expect("send");
let mut num = 0; let mut num = 0;