From f52f02a4343f7380243a190a17856d3e15495a43 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Sat, 24 Mar 2018 18:01:40 -0700 Subject: [PATCH] services --- src/accountant_skel.rs | 6 +++--- src/lib.rs | 1 + src/result.rs | 1 + src/streamer.rs | 41 +++++++++++++++++++++-------------------- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index e52b887df..f948c434b 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -110,15 +110,15 @@ impl AccountantSkel { let mut num = 0; let mut ursps = rsps.write().unwrap(); for packet in &msgs.read().unwrap().packets { - let sz = packet.size; + let sz = packet.meta.size; let req = deserialize(&packet.data[0..sz])?; if let Some(resp) = self.process_request(req) { let rsp = &mut ursps.packets[num]; let v = serialize(&resp)?; let len = v.len(); rsp.data[0..len].copy_from_slice(&v); - rsp.size = len; - rsp.set_addr(&packet.get_addr()); + rsp.meta.size = len; + rsp.meta.set_addr(&packet.meta.get_addr()); num += 1; } } diff --git a/src/lib.rs b/src/lib.rs index b3af5bcd7..db8374c15 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod accountant; pub mod accountant_skel; pub mod accountant_stub; pub mod result; +pub mod services; extern crate bincode; extern crate chrono; extern crate generic_array; diff --git a/src/result.rs b/src/result.rs index 86f2118c8..96f2f7ff4 100644 --- a/src/result.rs +++ b/src/result.rs @@ -13,6 +13,7 @@ pub enum Error { RecvTimeoutError(std::sync::mpsc::RecvTimeoutError), Serialize(std::boxed::Box), SendError, + Services, } pub type Result = std::result::Result; diff --git a/src/streamer.rs b/src/streamer.rs index 54f631b7b..a7cea82c4 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -8,23 +8,27 @@ use std::thread::{spawn, JoinHandle}; use result::{Error, Result}; const BLOCK_SIZE: usize = 1024 * 8; -pub const PACKET_SIZE: usize = 1024; - -#[derive(Clone)] -pub struct Packet { - pub data: [u8; PACKET_SIZE], +pub const PACKET_SIZE: usize = 256; +#[derive(Clone, Default)] +pub struct Meta { pub size: usize, pub addr: [u16; 8], pub port: u16, pub v6: bool, } + +#[derive(Clone)] +pub struct Packet { + pub data: [u8; PACKET_SIZE], + pub meta: Meta, +} impl fmt::Debug for Packet { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "Packet {{ size: {:?}, addr: {:?} }}", - self.size, - self.get_addr() + self.meta.size, + self.meta.get_addr() ) } } @@ -32,14 +36,11 @@ impl Default for Packet { fn default() -> Packet { Packet { data: [0u8; PACKET_SIZE], - size: 0, - addr: [0u16; 8], - port: 0, - v6: false, + meta: Meta::default(), } } } -impl Packet { +impl Meta { pub fn get_addr(&self) -> SocketAddr { if !self.v6 { let ipv4 = Ipv4Addr::new( @@ -103,7 +104,7 @@ impl PacketData { self.packets.resize(BLOCK_SIZE, Packet::default()); let mut i = 0; for p in &mut self.packets { - p.size = 0; + p.meta.size = 0; match socket.recv_from(&mut p.data) { Err(_) if i > 0 => { trace!("got {:?} messages", i); @@ -114,8 +115,8 @@ impl PacketData { return Err(Error::IO(e)); } Ok((nrecv, from)) => { - p.size = nrecv; - p.set_addr(&from); + p.meta.size = nrecv; + p.meta.set_addr(&from); if i == 0 { socket.set_nonblocking(true)?; } @@ -132,8 +133,8 @@ impl PacketData { } fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { for p in &self.packets { - let a = p.get_addr(); - socket.send_to(&p.data[0..p.size], &a)?; + let a = p.meta.get_addr(); + socket.send_to(&p.data[..p.meta.size], &a)?; //TODO(anatoly): wtf do we do about errors? *num += 1; } @@ -376,9 +377,9 @@ mod test { msgs.write().unwrap().packets.resize(10, Packet::default()); for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { w.data[0] = i as u8; - w.size = PACKET_SIZE; - w.set_addr(&addr); - assert_eq!(w.get_addr(), addr); + w.meta.size = PACKET_SIZE; + w.meta.set_addr(&addr); + assert_eq!(w.meta.get_addr(), addr); } s_sender.send(msgs).expect("send"); let mut num = 0;