From 5ac7df17f9f6e393bea314d01ffd8b3288a16167 Mon Sep 17 00:00:00 2001 From: Anatoly Yakovenko Date: Mon, 2 Apr 2018 19:32:58 -0700 Subject: [PATCH] Implement window service Batch out of order blobs until we have a contigious window. --- Cargo.toml | 1 + src/accountant_skel.rs | 78 ++++--- src/accountant_stub.rs | 3 +- src/bin/testnode.rs | 2 +- src/lib.rs | 4 +- src/packet.rs | 381 ++++++++++++++++++++++++++++++++++ src/streamer.rs | 455 +++++++++++++++-------------------------- 7 files changed, 592 insertions(+), 332 deletions(-) create mode 100644 src/packet.rs diff --git a/Cargo.toml b/Cargo.toml index fb0e76a7b..1e914dc35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,3 +55,4 @@ bincode = "1.0.0" chrono = { version = "0.4.0", features = ["serde"] } log = "^0.4.1" matches = "^0.1.6" +byteorder = "^1.2.1" diff --git a/src/accountant_skel.rs b/src/accountant_skel.rs index c5e283ce7..dae08cd71 100644 --- a/src/accountant_skel.rs +++ b/src/accountant_skel.rs @@ -13,16 +13,17 @@ use recorder::Signal; use result::Result; use serde_json; use signature::PublicKey; -use std::default::Default; use std::io::Write; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, SendError}; -use std::sync::{Arc, Mutex}; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use streamer; +use packet; +use std::sync::{Arc, Mutex}; use transaction::Transaction; +use std::collections::VecDeque; pub struct AccountantSkel { acc: Accountant, @@ -105,55 +106,51 @@ impl AccountantSkel { fn process( obj: &Arc>>, - r_reader: &streamer::Receiver, - s_responder: &streamer::Responder, - packet_recycler: &streamer::PacketRecycler, - response_recycler: &streamer::ResponseRecycler, + packet_receiver: &streamer::PacketReceiver, + blob_sender: &streamer::BlobSender, + packet_recycler: &packet::PacketRecycler, + blob_recycler: &packet::BlobRecycler, ) -> Result<()> { let timer = Duration::new(1, 0); - let msgs = r_reader.recv_timeout(timer)?; + let msgs = packet_receiver.recv_timeout(timer)?; let msgs_ = msgs.clone(); - let rsps = streamer::allocate(response_recycler); - let rsps_ = rsps.clone(); + let mut rsps = VecDeque::new(); { let mut reqs = vec![]; for packet in &msgs.read().unwrap().packets { - let rsp_addr = packet.meta.get_addr(); + let rsp_addr = packet.meta.addr(); let sz = packet.meta.size; let req = deserialize(&packet.data[0..sz])?; reqs.push((req, rsp_addr)); } let reqs = filter_valid_requests(reqs); - - let mut num = 0; - let mut ursps = rsps.write().unwrap(); for (req, rsp_addr) in reqs { if let Some(resp) = obj.lock().unwrap().log_verified_request(req) { - if ursps.responses.len() <= num { - ursps - .responses - .resize((num + 1) * 2, streamer::Response::default()); + let blob = blob_recycler.allocate(); + { + let mut b = blob.write().unwrap(); + let v = serialize(&resp)?; + let len = v.len(); + b.data[..len].copy_from_slice(&v); + b.meta.size = len; + b.meta.set_addr(&rsp_addr); } - let rsp = &mut ursps.responses[num]; - let v = serialize(&resp)?; - let len = v.len(); - rsp.data[..len].copy_from_slice(&v); - rsp.meta.size = len; - rsp.meta.set_addr(&rsp_addr); - num += 1; + rsps.push_back(blob); } } - ursps.responses.resize(num, streamer::Response::default()); } - s_responder.send(rsps_)?; - streamer::recycle(packet_recycler, msgs_); + if !rsps.is_empty() { + //don't wake up the other side if there is nothing + blob_sender.send(rsps)?; + } + packet_recycler.recycle(msgs_); Ok(()) } /// Create a UDP microservice that forwards messages the given AccountantSkel. /// Set `exit` to shutdown its threads. pub fn serve( - obj: Arc>>, + obj: &Arc>>, addr: &str, exit: Arc, ) -> Result>> { @@ -163,28 +160,27 @@ impl AccountantSkel { local.set_port(0); let write = UdpSocket::bind(local)?; - let packet_recycler = Arc::new(Mutex::new(Vec::new())); - let response_recycler = Arc::new(Mutex::new(Vec::new())); - let (s_reader, r_reader) = channel(); - let t_receiver = streamer::receiver(read, exit.clone(), packet_recycler.clone(), s_reader)?; - - let (s_responder, r_responder) = channel(); + let packet_recycler = packet::PacketRecycler::default(); + let blob_recycler = packet::BlobRecycler::default(); + let (packet_sender, packet_receiver) = channel(); + let t_receiver = + streamer::receiver(read, exit.clone(), packet_recycler.clone(), packet_sender)?; + let (blob_sender, blob_receiver) = channel(); let t_responder = - streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder); - + streamer::responder(write, exit.clone(), blob_recycler.clone(), blob_receiver); + let skel = obj.clone(); let t_server = spawn(move || loop { let e = AccountantSkel::process( - &obj, - &r_reader, - &s_responder, + &skel, + &packet_receiver, + &blob_sender, &packet_recycler, - &response_recycler, + &blob_recycler, ); if e.is_err() && exit.load(Ordering::Relaxed) { break; } }); - Ok(vec![t_receiver, t_responder, t_server]) } } diff --git a/src/accountant_stub.rs b/src/accountant_stub.rs index a83862ef2..69d2f9598 100644 --- a/src/accountant_stub.rs +++ b/src/accountant_stub.rs @@ -113,10 +113,11 @@ mod tests { sink(), historian, ))); - let _threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap(); + let _threads = AccountantSkel::serve(&acc, addr, exit.clone()).unwrap(); sleep(Duration::from_millis(300)); let socket = UdpSocket::bind(send_addr).unwrap(); + socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); let acc = AccountantStub::new(addr, socket); let last_id = acc.get_last_id().unwrap(); diff --git a/src/bin/testnode.rs b/src/bin/testnode.rs index 6ef325c42..40047a9ff 100644 --- a/src/bin/testnode.rs +++ b/src/bin/testnode.rs @@ -51,7 +51,7 @@ fn main() { historian, ))); eprintln!("Listening on {}", addr); - let threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap(); + let threads = AccountantSkel::serve(&skel, addr, exit.clone()).unwrap(); for t in threads { t.join().expect("join"); } diff --git a/src/lib.rs b/src/lib.rs index 3826a1bba..a44a14be2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,16 +5,18 @@ pub mod accountant_stub; pub mod entry; pub mod event; pub mod hash; -pub mod historian; pub mod ledger; pub mod mint; pub mod plan; pub mod recorder; +pub mod historian; +pub mod packet; pub mod result; pub mod signature; pub mod streamer; pub mod transaction; extern crate bincode; +extern crate byteorder; extern crate chrono; extern crate generic_array; #[macro_use] diff --git a/src/packet.rs b/src/packet.rs new file mode 100644 index 000000000..e6c91592d --- /dev/null +++ b/src/packet.rs @@ -0,0 +1,381 @@ +use std::sync::{Arc, Mutex, RwLock}; +use std::fmt; +use std::io; +use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; +use std::collections::VecDeque; +use result::{Error, Result}; + +pub type SharedPackets = Arc>; +pub type SharedBlob = Arc>; +pub type PacketRecycler = Recycler; +pub type BlobRecycler = Recycler; + +const NUM_PACKETS: usize = 1024 * 8; +const BLOB_SIZE: usize = 64 * 1024; +pub const PACKET_SIZE: usize = 256; +pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_SIZE) / BLOB_SIZE; + +#[derive(Clone, Default)] +pub struct Meta { + pub size: usize, + pub addr: [u16; 8], + pub port: u16, + pub v6: bool, +} + +#[derive(Clone)] +pub struct Packet { + pub data: [u8; PACKET_SIZE], + pub meta: Meta, +} + +impl fmt::Debug for Packet { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Packet {{ size: {:?}, addr: {:?} }}", + self.meta.size, + self.meta.addr() + ) + } +} + +impl Default for Packet { + fn default() -> Packet { + Packet { + data: [0u8; PACKET_SIZE], + meta: Meta::default(), + } + } +} + +impl Meta { + pub fn addr(&self) -> SocketAddr { + if !self.v6 { + let addr = [ + self.addr[0] as u8, + self.addr[1] as u8, + self.addr[2] as u8, + self.addr[3] as u8, + ]; + let ipv4: Ipv4Addr = From::<[u8; 4]>::from(addr); + SocketAddr::new(IpAddr::V4(ipv4), self.port) + } else { + let ipv6: Ipv6Addr = From::<[u16; 8]>::from(self.addr); + SocketAddr::new(IpAddr::V6(ipv6), self.port) + } + } + + pub fn set_addr(&mut self, a: &SocketAddr) { + match *a { + SocketAddr::V4(v4) => { + let ip = v4.ip().octets(); + self.addr[0] = u16::from(ip[0]); + self.addr[1] = u16::from(ip[1]); + self.addr[2] = u16::from(ip[2]); + self.addr[3] = u16::from(ip[3]); + self.port = a.port(); + } + SocketAddr::V6(v6) => { + self.addr = v6.ip().segments(); + self.port = a.port(); + self.v6 = true; + } + } + } +} + +#[derive(Debug)] +pub struct Packets { + pub packets: Vec, +} + +//auto derive doesn't support large arrays +impl Default for Packets { + fn default() -> Packets { + Packets { + packets: vec![Packet::default(); NUM_PACKETS], + } + } +} + +#[derive(Clone)] +pub struct Blob { + pub data: [u8; BLOB_SIZE], + pub meta: Meta, +} + +impl fmt::Debug for Blob { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Blob {{ size: {:?}, addr: {:?} }}", + self.meta.size, + self.meta.addr() + ) + } +} + +//auto derive doesn't support large arrays +impl Default for Blob { + fn default() -> Blob { + Blob { + data: [0u8; BLOB_SIZE], + meta: Meta::default(), + } + } +} + +pub struct Recycler { + gc: Arc>>>>, +} + +impl Default for Recycler { + fn default() -> Recycler { + Recycler { + gc: Arc::new(Mutex::new(vec![])), + } + } +} + +impl Clone for Recycler { + fn clone(&self) -> Recycler { + Recycler { + gc: self.gc.clone(), + } + } +} + +impl Recycler { + pub fn allocate(&self) -> Arc> { + let mut gc = self.gc.lock().expect("recycler lock"); + gc.pop() + .unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))) + } + pub fn recycle(&self, msgs: Arc>) { + let mut gc = self.gc.lock().expect("recycler lock"); + gc.push(msgs); + } +} + +impl Packets { + fn run_read_from(&mut self, socket: &UdpSocket) -> Result { + self.packets.resize(NUM_PACKETS, Packet::default()); + let mut i = 0; + //DOCUMENTED SIDE-EFFECT + //Performance out of the IO without poll + // * block on the socket until its readable + // * set the socket to non blocking + // * read until it fails + // * set it back to blocking before returning + socket.set_nonblocking(false)?; + for p in &mut self.packets { + p.meta.size = 0; + match socket.recv_from(&mut p.data) { + Err(_) if i > 0 => { + trace!("got {:?} messages", i); + break; + } + Err(e) => { + info!("recv_from err {:?}", e); + return Err(Error::IO(e)); + } + Ok((nrecv, from)) => { + p.meta.size = nrecv; + p.meta.set_addr(&from); + if i == 0 { + socket.set_nonblocking(true)?; + } + } + } + i += 1; + } + Ok(i) + } + pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<()> { + let sz = self.run_read_from(socket)?; + self.packets.resize(sz, Packet::default()); + Ok(()) + } + pub fn send_to(&self, socket: &UdpSocket) -> Result<()> { + for p in &self.packets { + let a = p.meta.addr(); + socket.send_to(&p.data[..p.meta.size], &a)?; + } + Ok(()) + } +} + +impl Blob { + pub fn get_index(&self) -> Result { + let mut rdr = io::Cursor::new(&self.data[0..8]); + let r = rdr.read_u64::()?; + Ok(r) + } + pub fn set_index(&mut self, ix: u64) -> Result<()> { + let mut wtr = vec![]; + wtr.write_u64::(ix)?; + self.data[..8].clone_from_slice(&wtr); + Ok(()) + } + pub fn data(&self) -> &[u8] { + &self.data[8..] + } + pub fn data_mut(&mut self) -> &mut [u8] { + &mut self.data[8..] + } + pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result> { + let mut v = VecDeque::new(); + //DOCUMENTED SIDE-EFFECT + //Performance out of the IO without poll + // * block on the socket until its readable + // * set the socket to non blocking + // * read until it fails + // * set it back to blocking before returning + socket.set_nonblocking(false)?; + for i in 0..NUM_BLOBS { + let r = re.allocate(); + { + let mut p = r.write().unwrap(); + match socket.recv_from(&mut p.data) { + Err(_) if i > 0 => { + trace!("got {:?} messages", i); + break; + } + Err(e) => { + info!("recv_from err {:?}", e); + return Err(Error::IO(e)); + } + Ok((nrecv, from)) => { + p.meta.size = nrecv; + p.meta.set_addr(&from); + if i == 0 { + socket.set_nonblocking(true)?; + } + } + } + } + v.push_back(r); + } + Ok(v) + } + pub fn send_to( + re: &BlobRecycler, + socket: &UdpSocket, + v: &mut VecDeque, + ) -> Result<()> { + while let Some(r) = v.pop_front() { + { + let p = r.read().unwrap(); + let a = p.meta.addr(); + socket.send_to(&p.data[..p.meta.size], &a)?; + } + re.recycle(r); + } + Ok(()) + } +} + +#[cfg(test)] +mod test { + use std::net::UdpSocket; + use std::io::Write; + use std::io; + use std::collections::VecDeque; + use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets}; + #[test] + pub fn packet_recycler_test() { + let r = PacketRecycler::default(); + let p = r.allocate(); + r.recycle(p); + } + #[test] + pub fn blob_recycler_test() { + let r = BlobRecycler::default(); + let p = r.allocate(); + r.recycle(p); + } + #[test] + pub fn packet_send_recv() { + let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = reader.local_addr().unwrap(); + let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let saddr = sender.local_addr().unwrap(); + let r = PacketRecycler::default(); + let p = r.allocate(); + p.write().unwrap().packets.resize(10, Packet::default()); + for m in p.write().unwrap().packets.iter_mut() { + m.meta.set_addr(&addr); + m.meta.size = 256; + } + p.read().unwrap().send_to(&sender).unwrap(); + p.write().unwrap().recv_from(&reader).unwrap(); + for m in p.write().unwrap().packets.iter_mut() { + assert_eq!(m.meta.size, 256); + assert_eq!(m.meta.addr(), saddr); + } + + r.recycle(p); + } + + #[test] + pub fn blob_send_recv() { + trace!("start"); + let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = reader.local_addr().unwrap(); + let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let r = BlobRecycler::default(); + let p = r.allocate(); + p.write().unwrap().meta.set_addr(&addr); + p.write().unwrap().meta.size = 1024; + let mut v = VecDeque::new(); + v.push_back(p); + assert_eq!(v.len(), 1); + Blob::send_to(&r, &sender, &mut v).unwrap(); + trace!("send_to"); + assert_eq!(v.len(), 0); + let mut rv = Blob::recv_from(&r, &reader).unwrap(); + trace!("recv_from"); + assert_eq!(rv.len(), 1); + let rp = rv.pop_front().unwrap(); + assert_eq!(rp.write().unwrap().meta.size, 1024); + r.recycle(rp); + } + + #[cfg(all(feature = "ipv6", test))] + #[test] + pub fn blob_ipv6_send_recv() { + let reader = UdpSocket::bind("[::1]:0").expect("bind"); + let addr = reader.local_addr().unwrap(); + let sender = UdpSocket::bind("[::1]:0").expect("bind"); + let r = BlobRecycler::default(); + let p = r.allocate(); + p.write().unwrap().meta.set_addr(&addr); + p.write().unwrap().meta.size = 1024; + let mut v = VecDeque::default(); + v.push_back(p); + Blob::send_to(&r, &sender, &mut v).unwrap(); + let mut rv = Blob::recv_from(&r, &reader).unwrap(); + let rp = rv.pop_front().unwrap(); + assert_eq!(rp.write().unwrap().meta.size, 1024); + r.recycle(rp); + } + + #[test] + pub fn debug_trait() { + write!(io::sink(), "{:?}", Packet::default()).unwrap(); + write!(io::sink(), "{:?}", Packets::default()).unwrap(); + write!(io::sink(), "{:?}", Blob::default()).unwrap(); + } + #[test] + pub fn blob_test() { + let mut b = Blob::default(); + b.set_index(::max_value()).unwrap(); + assert_eq!(b.get_index().unwrap(), ::max_value()); + b.data_mut()[0] = 1; + assert_eq!(b.data()[0], 1); + assert_eq!(b.get_index().unwrap(), ::max_value()); + } + +} diff --git a/src/streamer.rs b/src/streamer.rs index 9096f3fe6..87581f32d 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -1,240 +1,36 @@ -//! The 'streamer` module allows for efficient batch processing of UDP packets. - -use result::{Error, Result}; -use std::fmt; -use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}; +use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread::{spawn, JoinHandle}; use std::time::Duration; +use std::net::UdpSocket; +use std::thread::{spawn, JoinHandle}; +use std::collections::VecDeque; +use result::Result; +use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, NUM_BLOBS}; -const BLOCK_SIZE: usize = 1024 * 8; -pub const PACKET_SIZE: usize = 256; -pub const RESP_SIZE: usize = 64 * 1024; -pub const NUM_RESP: usize = (BLOCK_SIZE * PACKET_SIZE) / RESP_SIZE; - -#[derive(Clone, Default)] -pub struct Meta { - pub size: usize, - pub addr: [u16; 8], - pub port: u16, - pub v6: bool, -} - -#[derive(Clone)] -pub struct Packet { - pub data: [u8; PACKET_SIZE], - pub meta: Meta, -} - -impl fmt::Debug for Packet { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "Packet {{ size: {:?}, addr: {:?} }}", - self.meta.size, - self.meta.get_addr() - ) - } -} - -impl Default for Packet { - fn default() -> Packet { - Packet { - data: [0u8; PACKET_SIZE], - meta: Meta::default(), - } - } -} - -impl Meta { - pub fn get_addr(&self) -> SocketAddr { - if !self.v6 { - let ipv4 = Ipv4Addr::new( - self.addr[0] as u8, - self.addr[1] as u8, - self.addr[2] as u8, - self.addr[3] as u8, - ); - SocketAddr::new(IpAddr::V4(ipv4), self.port) - } else { - let ipv6 = Ipv6Addr::new( - self.addr[0], - self.addr[1], - self.addr[2], - self.addr[3], - self.addr[4], - self.addr[5], - self.addr[6], - self.addr[7], - ); - SocketAddr::new(IpAddr::V6(ipv6), self.port) - } - } - - pub fn set_addr(&mut self, a: &SocketAddr) { - match *a { - SocketAddr::V4(v4) => { - let ip = v4.ip().octets(); - self.addr[0] = u16::from(ip[0]); - self.addr[1] = u16::from(ip[1]); - self.addr[2] = u16::from(ip[2]); - self.addr[3] = u16::from(ip[3]); - self.port = a.port(); - } - SocketAddr::V6(v6) => { - self.addr = v6.ip().segments(); - self.port = a.port(); - self.v6 = true; - } - } - } -} - -#[derive(Debug)] -pub struct Packets { - pub packets: Vec, -} - -impl Default for Packets { - fn default() -> Packets { - Packets { - packets: vec![Packet::default(); BLOCK_SIZE], - } - } -} - -#[derive(Clone)] -pub struct Response { - pub data: [u8; RESP_SIZE], - pub meta: Meta, -} - -impl fmt::Debug for Response { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "Response {{ size: {:?}, addr: {:?} }}", - self.meta.size, - self.meta.get_addr() - ) - } -} - -impl Default for Response { - fn default() -> Response { - Response { - data: [0u8; RESP_SIZE], - meta: Meta::default(), - } - } -} - -#[derive(Debug)] -pub struct Responses { - pub responses: Vec, -} - -impl Default for Responses { - fn default() -> Responses { - Responses { - responses: vec![Response::default(); NUM_RESP], - } - } -} - -pub type SharedPackets = Arc>; -pub type PacketRecycler = Arc>>; -pub type Receiver = mpsc::Receiver; -pub type Sender = mpsc::Sender; -pub type SharedResponses = Arc>; -pub type ResponseRecycler = Arc>>; -pub type Responder = mpsc::Sender; -pub type ResponseReceiver = mpsc::Receiver; - -impl Packets { - fn run_read_from(&mut self, socket: &UdpSocket) -> Result { - self.packets.resize(BLOCK_SIZE, Packet::default()); - let mut i = 0; - socket.set_nonblocking(false)?; - for p in &mut self.packets { - p.meta.size = 0; - match socket.recv_from(&mut p.data) { - Err(_) if i > 0 => { - trace!("got {:?} messages", i); - break; - } - Err(e) => { - info!("recv_from err {:?}", e); - return Err(Error::IO(e)); - } - Ok((nrecv, from)) => { - p.meta.size = nrecv; - p.meta.set_addr(&from); - if i == 0 { - socket.set_nonblocking(true)?; - } - } - } - i += 1; - } - Ok(i) - } - fn read_from(&mut self, socket: &UdpSocket) -> Result<()> { - let sz = self.run_read_from(socket)?; - self.packets.resize(sz, Packet::default()); - Ok(()) - } -} - -impl Responses { - fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> { - for p in &self.responses { - let a = p.meta.get_addr(); - socket.send_to(&p.data[..p.meta.size], &a)?; - //TODO(anatoly): wtf do we do about errors? - *num += 1; - } - Ok(()) - } -} - -pub fn allocate(recycler: &Arc>>>>) -> Arc> -where - T: Default, -{ - let mut gc = recycler.lock().expect("lock"); - gc.pop() - .unwrap_or_else(|| Arc::new(RwLock::new(Default::default()))) -} - -pub fn recycle(recycler: &Arc>>>>, msgs: Arc>) -where - T: Default, -{ - let mut gc = recycler.lock().expect("lock"); - gc.push(msgs); -} +pub type PacketReceiver = mpsc::Receiver; +pub type PacketSender = mpsc::Sender; +pub type BlobSender = mpsc::Sender>; +pub type BlobReceiver = mpsc::Receiver>; fn recv_loop( sock: &UdpSocket, exit: &Arc, - recycler: &PacketRecycler, - channel: &Sender, + re: &PacketRecycler, + channel: &PacketSender, ) -> Result<()> { loop { - let msgs = allocate(recycler); + let msgs = re.allocate(); let msgs_ = msgs.clone(); loop { - match msgs.write().unwrap().read_from(sock) { + match msgs.write().unwrap().recv_from(sock) { Ok(()) => { channel.send(msgs_)?; break; } Err(_) => { if exit.load(Ordering::Relaxed) { - recycle(recycler, msgs_); + re.recycle(msgs_); return Ok(()); } } @@ -247,7 +43,7 @@ pub fn receiver( sock: UdpSocket, exit: Arc, recycler: PacketRecycler, - channel: Sender, + channel: PacketSender, ) -> Result> { let timer = Duration::new(1, 0); sock.set_read_timeout(Some(timer))?; @@ -257,43 +53,105 @@ pub fn receiver( })) } -fn recv_send(sock: &UdpSocket, recycler: &ResponseRecycler, r: &ResponseReceiver) -> Result<()> { +fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> { let timer = Duration::new(1, 0); - let msgs = r.recv_timeout(timer)?; - let msgs_ = msgs.clone(); - let mut num = 0; - msgs.read().unwrap().send_to(sock, &mut num)?; - recycle(recycler, msgs_); + let mut msgs = r.recv_timeout(timer)?; + Blob::send_to(recycler, sock, &mut msgs)?; Ok(()) } pub fn responder( sock: UdpSocket, exit: Arc, - recycler: ResponseRecycler, - r: ResponseReceiver, + recycler: BlobRecycler, + r: BlobReceiver, ) -> JoinHandle<()> { spawn(move || loop { - if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) { + if recv_send(&sock, &recycler, &r).is_err() || exit.load(Ordering::Relaxed) { break; } }) } +//TODO, we would need to stick block authentication before we create the +//window. +fn recv_window( + window: &mut Vec>, + recycler: &BlobRecycler, + consumed: &mut usize, + socket: &UdpSocket, + s: &BlobSender, +) -> Result<()> { + let mut dq = Blob::recv_from(recycler, socket)?; + while let Some(b) = dq.pop_front() { + let b_ = b.clone(); + let mut p = b.write().unwrap(); + let pix = p.get_index()? as usize; + let w = pix % NUM_BLOBS; + //TODO, after the block are authenticated + //if we get different blocks at the same index + //that is a network failure/attack + { + if window[w].is_none() { + window[w] = Some(b_); + } else { + debug!("duplicate blob at index {:}", w); + } + //send a contiguous set of blocks + let mut dq = VecDeque::new(); + loop { + let k = *consumed % NUM_BLOBS; + if window[k].is_none() { + break; + } + dq.push_back(window[k].clone().unwrap()); + window[k] = None; + *consumed += 1; + } + if !dq.is_empty() { + s.send(dq)?; + } + } + } + Ok(()) +} + +pub fn window( + sock: UdpSocket, + exit: Arc, + r: BlobRecycler, + s: BlobSender, +) -> JoinHandle<()> { + spawn(move || { + let mut window = vec![None; NUM_BLOBS]; + let mut consumed = 0; + let timer = Duration::new(1, 0); + sock.set_read_timeout(Some(timer)).unwrap(); + loop { + if recv_window(&mut window, &r, &mut consumed, &sock, &s).is_err() + || exit.load(Ordering::Relaxed) + { + break; + } + } + }) +} + #[cfg(all(feature = "unstable", test))] mod bench { extern crate test; use self::test::Bencher; use result::Result; use std::net::{SocketAddr, UdpSocket}; - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; use std::thread::sleep; use std::thread::{spawn, JoinHandle}; use std::time::Duration; use std::time::SystemTime; - use streamer::{allocate, receiver, recycle, Packet, PacketRecycler, Receiver, PACKET_SIZE}; + use std::sync::mpsc::channel; + use std::sync::atomic::{AtomicBool, Ordering}; + use packet::{Packet, PacketRecycler, PACKET_SIZE}; + use streamer::{receiver, PacketReceiver}; fn producer( addr: &SocketAddr, @@ -301,7 +159,7 @@ mod bench { exit: Arc, ) -> JoinHandle<()> { let send = UdpSocket::bind("0.0.0.0:0").unwrap(); - let msgs = allocate(&recycler); + let msgs = recycler.allocate(); let msgs_ = msgs.clone(); msgs.write().unwrap().packets.resize(10, Packet::default()); for w in msgs.write().unwrap().packets.iter_mut() { @@ -314,7 +172,7 @@ mod bench { } let mut num = 0; for p in msgs_.read().unwrap().packets.iter() { - let a = p.meta.get_addr(); + let a = p.meta.addr(); send.send_to(&p.data[..p.meta.size], &a).unwrap(); num += 1; } @@ -326,7 +184,7 @@ mod bench { recycler: PacketRecycler, exit: Arc, rvs: Arc>, - r: Receiver, + r: PacketReceiver, ) -> JoinHandle<()> { spawn(move || loop { if exit.load(Ordering::Relaxed) { @@ -337,7 +195,7 @@ mod bench { Ok(msgs) => { let msgs_ = msgs.clone(); *rvs.lock().unwrap() += msgs.read().unwrap().packets.len(); - recycle(&recycler, msgs_); + recycler.recycle(msgs_); } _ => (), } @@ -347,16 +205,16 @@ mod bench { let read = UdpSocket::bind("127.0.0.1:0")?; let addr = read.local_addr()?; let exit = Arc::new(AtomicBool::new(false)); - let recycler = Arc::new(Mutex::new(Vec::new())); + let pack_recycler = PacketRecycler::default(); let (s_reader, r_reader) = channel(); - let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?; - let t_producer1 = producer(&addr, recycler.clone(), exit.clone()); - let t_producer2 = producer(&addr, recycler.clone(), exit.clone()); - let t_producer3 = producer(&addr, recycler.clone(), exit.clone()); + let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader)?; + let t_producer1 = producer(&addr, pack_recycler.clone(), exit.clone()); + let t_producer2 = producer(&addr, pack_recycler.clone(), exit.clone()); + let t_producer3 = producer(&addr, pack_recycler.clone(), exit.clone()); let rvs = Arc::new(Mutex::new(0)); - let t_sink = sink(recycler.clone(), exit.clone(), rvs.clone(), r_reader); + let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader); let start = SystemTime::now(); let start_val = *rvs.lock().unwrap(); @@ -383,17 +241,18 @@ mod bench { #[cfg(test)] mod test { - use std::io; - use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; - use std::sync::{Arc, Mutex}; + use std::io::Write; + use std::io; + use std::collections::VecDeque; use std::time::Duration; - use streamer::{allocate, receiver, responder, Packet, Packets, Receiver, Response, Responses, - PACKET_SIZE}; + use std::sync::Arc; + use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_SIZE}; + use streamer::{receiver, responder, window, BlobReceiver, PacketReceiver}; - fn get_msgs(r: Receiver, num: &mut usize) { + fn get_msgs(r: PacketReceiver, num: &mut usize) { for _t in 0..5 { let timer = Duration::new(1, 0); match r.recv_timeout(timer) { @@ -405,40 +264,11 @@ mod test { } } } - #[cfg(ipv6)] - #[test] - pub fn streamer_send_test_ipv6() { - let read = UdpSocket::bind("[::1]:0").expect("bind"); - let addr = read.local_addr().unwrap(); - let send = UdpSocket::bind("[::1]:0").expect("bind"); - let exit = Arc::new(Mutex::new(false)); - let recycler = Arc::new(Mutex::new(Vec::new())); - let (s_reader, r_reader) = channel(); - let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap(); - let (s_responder, r_responder) = channel(); - let t_responder = responder(send, exit.clone(), recycler.clone(), r_responder); - let msgs = allocate(&recycler); - msgs.write().unwrap().packets.resize(10, Packet::default()); - for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() { - w.data[0] = i as u8; - w.size = PACKET_SIZE; - w.set_addr(&addr); - assert_eq!(w.get_addr(), addr); - } - s_responder.send(msgs).expect("send"); - let mut num = 0; - get_msgs(r_reader, &mut num); - assert_eq!(num, 10); - exit.store(true, Ordering::Relaxed); - t_receiver.join().expect("join"); - t_responder.join().expect("join"); - } #[test] pub fn streamer_debug() { write!(io::sink(), "{:?}", Packet::default()).unwrap(); write!(io::sink(), "{:?}", Packets::default()).unwrap(); - write!(io::sink(), "{:?}", Response::default()).unwrap(); - write!(io::sink(), "{:?}", Responses::default()).unwrap(); + write!(io::sink(), "{:?}", Blob::default()).unwrap(); } #[test] pub fn streamer_send_test() { @@ -446,22 +276,21 @@ mod test { let addr = read.local_addr().unwrap(); let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let exit = Arc::new(AtomicBool::new(false)); - let packet_recycler = Arc::new(Mutex::new(Vec::new())); - let resp_recycler = Arc::new(Mutex::new(Vec::new())); + let pack_recycler = PacketRecycler::default(); + let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(read, exit.clone(), packet_recycler.clone(), s_reader).unwrap(); + let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader).unwrap(); let (s_responder, r_responder) = channel(); let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); - let msgs = allocate(&resp_recycler); - msgs.write() - .unwrap() - .responses - .resize(10, Response::default()); - for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() { + let mut msgs = VecDeque::new(); + for i in 0..10 { + let b = resp_recycler.allocate(); + let b_ = b.clone(); + let mut w = b.write().unwrap(); w.data[0] = i as u8; w.meta.size = PACKET_SIZE; w.meta.set_addr(&addr); - assert_eq!(w.meta.get_addr(), addr); + msgs.push_back(b_); } s_responder.send(msgs).expect("send"); let mut num = 0; @@ -471,4 +300,54 @@ mod test { t_receiver.join().expect("join"); t_responder.join().expect("join"); } + + fn get_blobs(r: BlobReceiver, num: &mut usize) { + for _t in 0..5 { + let timer = Duration::new(1, 0); + match r.recv_timeout(timer) { + Ok(m) => { + for (i, v) in m.iter().enumerate() { + assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i); + } + *num += m.len(); + } + e => println!("error {:?}", e), + } + if *num == 10 { + break; + } + } + } + + #[test] + pub fn window_send_test() { + let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let addr = read.local_addr().unwrap(); + let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let exit = Arc::new(AtomicBool::new(false)); + let resp_recycler = BlobRecycler::default(); + let (s_reader, r_reader) = channel(); + let t_receiver = window(read, exit.clone(), resp_recycler.clone(), s_reader); + let (s_responder, r_responder) = channel(); + let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder); + let mut msgs = VecDeque::new(); + for v in 0..10 { + let i = 9 - v; + let b = resp_recycler.allocate(); + let b_ = b.clone(); + let mut w = b.write().unwrap(); + w.set_index(i).unwrap(); + assert_eq!(i, w.get_index().unwrap()); + w.meta.size = PACKET_SIZE; + w.meta.set_addr(&addr); + msgs.push_back(b_); + } + s_responder.send(msgs).expect("send"); + let mut num = 0; + get_blobs(r_reader, &mut num); + assert_eq!(num, 10); + exit.store(true, Ordering::Relaxed); + t_receiver.join().expect("join"); + t_responder.join().expect("join"); + } }