commit
123d7c6a37
|
@ -55,3 +55,4 @@ bincode = "1.0.0"
|
||||||
chrono = { version = "0.4.0", features = ["serde"] }
|
chrono = { version = "0.4.0", features = ["serde"] }
|
||||||
log = "^0.4.1"
|
log = "^0.4.1"
|
||||||
matches = "^0.1.6"
|
matches = "^0.1.6"
|
||||||
|
byteorder = "^1.2.1"
|
||||||
|
|
|
@ -13,16 +13,17 @@ use recorder::Signal;
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use serde_json;
|
use serde_json;
|
||||||
use signature::PublicKey;
|
use signature::PublicKey;
|
||||||
use std::default::Default;
|
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::{channel, SendError};
|
use std::sync::mpsc::{channel, SendError};
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::thread::{spawn, JoinHandle};
|
use std::thread::{spawn, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer;
|
use streamer;
|
||||||
|
use packet;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
use transaction::Transaction;
|
use transaction::Transaction;
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
pub struct AccountantSkel<W: Write + Send + 'static> {
|
pub struct AccountantSkel<W: Write + Send + 'static> {
|
||||||
acc: Accountant,
|
acc: Accountant,
|
||||||
|
@ -105,55 +106,51 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
|
|
||||||
fn process(
|
fn process(
|
||||||
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||||
r_reader: &streamer::Receiver,
|
packet_receiver: &streamer::PacketReceiver,
|
||||||
s_responder: &streamer::Responder,
|
blob_sender: &streamer::BlobSender,
|
||||||
packet_recycler: &streamer::PacketRecycler,
|
packet_recycler: &packet::PacketRecycler,
|
||||||
response_recycler: &streamer::ResponseRecycler,
|
blob_recycler: &packet::BlobRecycler,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let msgs = r_reader.recv_timeout(timer)?;
|
let msgs = packet_receiver.recv_timeout(timer)?;
|
||||||
let msgs_ = msgs.clone();
|
let msgs_ = msgs.clone();
|
||||||
let rsps = streamer::allocate(response_recycler);
|
let mut rsps = VecDeque::new();
|
||||||
let rsps_ = rsps.clone();
|
|
||||||
{
|
{
|
||||||
let mut reqs = vec![];
|
let mut reqs = vec![];
|
||||||
for packet in &msgs.read().unwrap().packets {
|
for packet in &msgs.read().unwrap().packets {
|
||||||
let rsp_addr = packet.meta.get_addr();
|
let rsp_addr = packet.meta.addr();
|
||||||
let sz = packet.meta.size;
|
let sz = packet.meta.size;
|
||||||
let req = deserialize(&packet.data[0..sz])?;
|
let req = deserialize(&packet.data[0..sz])?;
|
||||||
reqs.push((req, rsp_addr));
|
reqs.push((req, rsp_addr));
|
||||||
}
|
}
|
||||||
let reqs = filter_valid_requests(reqs);
|
let reqs = filter_valid_requests(reqs);
|
||||||
|
|
||||||
let mut num = 0;
|
|
||||||
let mut ursps = rsps.write().unwrap();
|
|
||||||
for (req, rsp_addr) in reqs {
|
for (req, rsp_addr) in reqs {
|
||||||
if let Some(resp) = obj.lock().unwrap().log_verified_request(req) {
|
if let Some(resp) = obj.lock().unwrap().log_verified_request(req) {
|
||||||
if ursps.responses.len() <= num {
|
let blob = blob_recycler.allocate();
|
||||||
ursps
|
{
|
||||||
.responses
|
let mut b = blob.write().unwrap();
|
||||||
.resize((num + 1) * 2, streamer::Response::default());
|
|
||||||
}
|
|
||||||
let rsp = &mut ursps.responses[num];
|
|
||||||
let v = serialize(&resp)?;
|
let v = serialize(&resp)?;
|
||||||
let len = v.len();
|
let len = v.len();
|
||||||
rsp.data[..len].copy_from_slice(&v);
|
b.data[..len].copy_from_slice(&v);
|
||||||
rsp.meta.size = len;
|
b.meta.size = len;
|
||||||
rsp.meta.set_addr(&rsp_addr);
|
b.meta.set_addr(&rsp_addr);
|
||||||
num += 1;
|
}
|
||||||
|
rsps.push_back(blob);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ursps.responses.resize(num, streamer::Response::default());
|
|
||||||
}
|
}
|
||||||
s_responder.send(rsps_)?;
|
if !rsps.is_empty() {
|
||||||
streamer::recycle(packet_recycler, msgs_);
|
//don't wake up the other side if there is nothing
|
||||||
|
blob_sender.send(rsps)?;
|
||||||
|
}
|
||||||
|
packet_recycler.recycle(msgs_);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a UDP microservice that forwards messages the given AccountantSkel.
|
/// Create a UDP microservice that forwards messages the given AccountantSkel.
|
||||||
/// Set `exit` to shutdown its threads.
|
/// Set `exit` to shutdown its threads.
|
||||||
pub fn serve(
|
pub fn serve(
|
||||||
obj: Arc<Mutex<AccountantSkel<W>>>,
|
obj: &Arc<Mutex<AccountantSkel<W>>>,
|
||||||
addr: &str,
|
addr: &str,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> Result<Vec<JoinHandle<()>>> {
|
) -> Result<Vec<JoinHandle<()>>> {
|
||||||
|
@ -163,28 +160,27 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
|
||||||
local.set_port(0);
|
local.set_port(0);
|
||||||
let write = UdpSocket::bind(local)?;
|
let write = UdpSocket::bind(local)?;
|
||||||
|
|
||||||
let packet_recycler = Arc::new(Mutex::new(Vec::new()));
|
let packet_recycler = packet::PacketRecycler::default();
|
||||||
let response_recycler = Arc::new(Mutex::new(Vec::new()));
|
let blob_recycler = packet::BlobRecycler::default();
|
||||||
let (s_reader, r_reader) = channel();
|
let (packet_sender, packet_receiver) = channel();
|
||||||
let t_receiver = streamer::receiver(read, exit.clone(), packet_recycler.clone(), s_reader)?;
|
let t_receiver =
|
||||||
|
streamer::receiver(read, exit.clone(), packet_recycler.clone(), packet_sender)?;
|
||||||
let (s_responder, r_responder) = channel();
|
let (blob_sender, blob_receiver) = channel();
|
||||||
let t_responder =
|
let t_responder =
|
||||||
streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder);
|
streamer::responder(write, exit.clone(), blob_recycler.clone(), blob_receiver);
|
||||||
|
let skel = obj.clone();
|
||||||
let t_server = spawn(move || loop {
|
let t_server = spawn(move || loop {
|
||||||
let e = AccountantSkel::process(
|
let e = AccountantSkel::process(
|
||||||
&obj,
|
&skel,
|
||||||
&r_reader,
|
&packet_receiver,
|
||||||
&s_responder,
|
&blob_sender,
|
||||||
&packet_recycler,
|
&packet_recycler,
|
||||||
&response_recycler,
|
&blob_recycler,
|
||||||
);
|
);
|
||||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(vec![t_receiver, t_responder, t_server])
|
Ok(vec![t_receiver, t_responder, t_server])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -113,10 +113,11 @@ mod tests {
|
||||||
sink(),
|
sink(),
|
||||||
historian,
|
historian,
|
||||||
)));
|
)));
|
||||||
let _threads = AccountantSkel::serve(acc, addr, exit.clone()).unwrap();
|
let _threads = AccountantSkel::serve(&acc, addr, exit.clone()).unwrap();
|
||||||
sleep(Duration::from_millis(300));
|
sleep(Duration::from_millis(300));
|
||||||
|
|
||||||
let socket = UdpSocket::bind(send_addr).unwrap();
|
let socket = UdpSocket::bind(send_addr).unwrap();
|
||||||
|
socket.set_read_timeout(Some(Duration::new(5, 0))).unwrap();
|
||||||
|
|
||||||
let acc = AccountantStub::new(addr, socket);
|
let acc = AccountantStub::new(addr, socket);
|
||||||
let last_id = acc.get_last_id().unwrap();
|
let last_id = acc.get_last_id().unwrap();
|
||||||
|
|
|
@ -51,7 +51,7 @@ fn main() {
|
||||||
historian,
|
historian,
|
||||||
)));
|
)));
|
||||||
eprintln!("Listening on {}", addr);
|
eprintln!("Listening on {}", addr);
|
||||||
let threads = AccountantSkel::serve(skel, addr, exit.clone()).unwrap();
|
let threads = AccountantSkel::serve(&skel, addr, exit.clone()).unwrap();
|
||||||
for t in threads {
|
for t in threads {
|
||||||
t.join().expect("join");
|
t.join().expect("join");
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,16 +5,18 @@ pub mod accountant_stub;
|
||||||
pub mod entry;
|
pub mod entry;
|
||||||
pub mod event;
|
pub mod event;
|
||||||
pub mod hash;
|
pub mod hash;
|
||||||
pub mod historian;
|
|
||||||
pub mod ledger;
|
pub mod ledger;
|
||||||
pub mod mint;
|
pub mod mint;
|
||||||
pub mod plan;
|
pub mod plan;
|
||||||
pub mod recorder;
|
pub mod recorder;
|
||||||
|
pub mod historian;
|
||||||
|
pub mod packet;
|
||||||
pub mod result;
|
pub mod result;
|
||||||
pub mod signature;
|
pub mod signature;
|
||||||
pub mod streamer;
|
pub mod streamer;
|
||||||
pub mod transaction;
|
pub mod transaction;
|
||||||
extern crate bincode;
|
extern crate bincode;
|
||||||
|
extern crate byteorder;
|
||||||
extern crate chrono;
|
extern crate chrono;
|
||||||
extern crate generic_array;
|
extern crate generic_array;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
|
|
|
@ -0,0 +1,381 @@
|
||||||
|
use std::sync::{Arc, Mutex, RwLock};
|
||||||
|
use std::fmt;
|
||||||
|
use std::io;
|
||||||
|
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||||
|
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use result::{Error, Result};
|
||||||
|
|
||||||
|
pub type SharedPackets = Arc<RwLock<Packets>>;
|
||||||
|
pub type SharedBlob = Arc<RwLock<Blob>>;
|
||||||
|
pub type PacketRecycler = Recycler<Packets>;
|
||||||
|
pub type BlobRecycler = Recycler<Blob>;
|
||||||
|
|
||||||
|
const NUM_PACKETS: usize = 1024 * 8;
|
||||||
|
const BLOB_SIZE: usize = 64 * 1024;
|
||||||
|
pub const PACKET_SIZE: usize = 256;
|
||||||
|
pub const NUM_BLOBS: usize = (NUM_PACKETS * PACKET_SIZE) / BLOB_SIZE;
|
||||||
|
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
pub struct Meta {
|
||||||
|
pub size: usize,
|
||||||
|
pub addr: [u16; 8],
|
||||||
|
pub port: u16,
|
||||||
|
pub v6: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Packet {
|
||||||
|
pub data: [u8; PACKET_SIZE],
|
||||||
|
pub meta: Meta,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Packet {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"Packet {{ size: {:?}, addr: {:?} }}",
|
||||||
|
self.meta.size,
|
||||||
|
self.meta.addr()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Packet {
|
||||||
|
fn default() -> Packet {
|
||||||
|
Packet {
|
||||||
|
data: [0u8; PACKET_SIZE],
|
||||||
|
meta: Meta::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Meta {
|
||||||
|
pub fn addr(&self) -> SocketAddr {
|
||||||
|
if !self.v6 {
|
||||||
|
let addr = [
|
||||||
|
self.addr[0] as u8,
|
||||||
|
self.addr[1] as u8,
|
||||||
|
self.addr[2] as u8,
|
||||||
|
self.addr[3] as u8,
|
||||||
|
];
|
||||||
|
let ipv4: Ipv4Addr = From::<[u8; 4]>::from(addr);
|
||||||
|
SocketAddr::new(IpAddr::V4(ipv4), self.port)
|
||||||
|
} else {
|
||||||
|
let ipv6: Ipv6Addr = From::<[u16; 8]>::from(self.addr);
|
||||||
|
SocketAddr::new(IpAddr::V6(ipv6), self.port)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_addr(&mut self, a: &SocketAddr) {
|
||||||
|
match *a {
|
||||||
|
SocketAddr::V4(v4) => {
|
||||||
|
let ip = v4.ip().octets();
|
||||||
|
self.addr[0] = u16::from(ip[0]);
|
||||||
|
self.addr[1] = u16::from(ip[1]);
|
||||||
|
self.addr[2] = u16::from(ip[2]);
|
||||||
|
self.addr[3] = u16::from(ip[3]);
|
||||||
|
self.port = a.port();
|
||||||
|
}
|
||||||
|
SocketAddr::V6(v6) => {
|
||||||
|
self.addr = v6.ip().segments();
|
||||||
|
self.port = a.port();
|
||||||
|
self.v6 = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Packets {
|
||||||
|
pub packets: Vec<Packet>,
|
||||||
|
}
|
||||||
|
|
||||||
|
//auto derive doesn't support large arrays
|
||||||
|
impl Default for Packets {
|
||||||
|
fn default() -> Packets {
|
||||||
|
Packets {
|
||||||
|
packets: vec![Packet::default(); NUM_PACKETS],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct Blob {
|
||||||
|
pub data: [u8; BLOB_SIZE],
|
||||||
|
pub meta: Meta,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Blob {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"Blob {{ size: {:?}, addr: {:?} }}",
|
||||||
|
self.meta.size,
|
||||||
|
self.meta.addr()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//auto derive doesn't support large arrays
|
||||||
|
impl Default for Blob {
|
||||||
|
fn default() -> Blob {
|
||||||
|
Blob {
|
||||||
|
data: [0u8; BLOB_SIZE],
|
||||||
|
meta: Meta::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Recycler<T> {
|
||||||
|
gc: Arc<Mutex<Vec<Arc<RwLock<T>>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Default> Default for Recycler<T> {
|
||||||
|
fn default() -> Recycler<T> {
|
||||||
|
Recycler {
|
||||||
|
gc: Arc::new(Mutex::new(vec![])),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Default> Clone for Recycler<T> {
|
||||||
|
fn clone(&self) -> Recycler<T> {
|
||||||
|
Recycler {
|
||||||
|
gc: self.gc.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Default> Recycler<T> {
|
||||||
|
pub fn allocate(&self) -> Arc<RwLock<T>> {
|
||||||
|
let mut gc = self.gc.lock().expect("recycler lock");
|
||||||
|
gc.pop()
|
||||||
|
.unwrap_or_else(|| Arc::new(RwLock::new(Default::default())))
|
||||||
|
}
|
||||||
|
pub fn recycle(&self, msgs: Arc<RwLock<T>>) {
|
||||||
|
let mut gc = self.gc.lock().expect("recycler lock");
|
||||||
|
gc.push(msgs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Packets {
|
||||||
|
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
|
||||||
|
self.packets.resize(NUM_PACKETS, Packet::default());
|
||||||
|
let mut i = 0;
|
||||||
|
//DOCUMENTED SIDE-EFFECT
|
||||||
|
//Performance out of the IO without poll
|
||||||
|
// * block on the socket until its readable
|
||||||
|
// * set the socket to non blocking
|
||||||
|
// * read until it fails
|
||||||
|
// * set it back to blocking before returning
|
||||||
|
socket.set_nonblocking(false)?;
|
||||||
|
for p in &mut self.packets {
|
||||||
|
p.meta.size = 0;
|
||||||
|
match socket.recv_from(&mut p.data) {
|
||||||
|
Err(_) if i > 0 => {
|
||||||
|
trace!("got {:?} messages", i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
info!("recv_from err {:?}", e);
|
||||||
|
return Err(Error::IO(e));
|
||||||
|
}
|
||||||
|
Ok((nrecv, from)) => {
|
||||||
|
p.meta.size = nrecv;
|
||||||
|
p.meta.set_addr(&from);
|
||||||
|
if i == 0 {
|
||||||
|
socket.set_nonblocking(true)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
|
Ok(i)
|
||||||
|
}
|
||||||
|
pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<()> {
|
||||||
|
let sz = self.run_read_from(socket)?;
|
||||||
|
self.packets.resize(sz, Packet::default());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub fn send_to(&self, socket: &UdpSocket) -> Result<()> {
|
||||||
|
for p in &self.packets {
|
||||||
|
let a = p.meta.addr();
|
||||||
|
socket.send_to(&p.data[..p.meta.size], &a)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Blob {
|
||||||
|
pub fn get_index(&self) -> Result<u64> {
|
||||||
|
let mut rdr = io::Cursor::new(&self.data[0..8]);
|
||||||
|
let r = rdr.read_u64::<LittleEndian>()?;
|
||||||
|
Ok(r)
|
||||||
|
}
|
||||||
|
pub fn set_index(&mut self, ix: u64) -> Result<()> {
|
||||||
|
let mut wtr = vec![];
|
||||||
|
wtr.write_u64::<LittleEndian>(ix)?;
|
||||||
|
self.data[..8].clone_from_slice(&wtr);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
pub fn data(&self) -> &[u8] {
|
||||||
|
&self.data[8..]
|
||||||
|
}
|
||||||
|
pub fn data_mut(&mut self) -> &mut [u8] {
|
||||||
|
&mut self.data[8..]
|
||||||
|
}
|
||||||
|
pub fn recv_from(re: &BlobRecycler, socket: &UdpSocket) -> Result<VecDeque<SharedBlob>> {
|
||||||
|
let mut v = VecDeque::new();
|
||||||
|
//DOCUMENTED SIDE-EFFECT
|
||||||
|
//Performance out of the IO without poll
|
||||||
|
// * block on the socket until its readable
|
||||||
|
// * set the socket to non blocking
|
||||||
|
// * read until it fails
|
||||||
|
// * set it back to blocking before returning
|
||||||
|
socket.set_nonblocking(false)?;
|
||||||
|
for i in 0..NUM_BLOBS {
|
||||||
|
let r = re.allocate();
|
||||||
|
{
|
||||||
|
let mut p = r.write().unwrap();
|
||||||
|
match socket.recv_from(&mut p.data) {
|
||||||
|
Err(_) if i > 0 => {
|
||||||
|
trace!("got {:?} messages", i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
info!("recv_from err {:?}", e);
|
||||||
|
return Err(Error::IO(e));
|
||||||
|
}
|
||||||
|
Ok((nrecv, from)) => {
|
||||||
|
p.meta.size = nrecv;
|
||||||
|
p.meta.set_addr(&from);
|
||||||
|
if i == 0 {
|
||||||
|
socket.set_nonblocking(true)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
v.push_back(r);
|
||||||
|
}
|
||||||
|
Ok(v)
|
||||||
|
}
|
||||||
|
pub fn send_to(
|
||||||
|
re: &BlobRecycler,
|
||||||
|
socket: &UdpSocket,
|
||||||
|
v: &mut VecDeque<SharedBlob>,
|
||||||
|
) -> Result<()> {
|
||||||
|
while let Some(r) = v.pop_front() {
|
||||||
|
{
|
||||||
|
let p = r.read().unwrap();
|
||||||
|
let a = p.meta.addr();
|
||||||
|
socket.send_to(&p.data[..p.meta.size], &a)?;
|
||||||
|
}
|
||||||
|
re.recycle(r);
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use std::net::UdpSocket;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::io;
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets};
|
||||||
|
#[test]
|
||||||
|
pub fn packet_recycler_test() {
|
||||||
|
let r = PacketRecycler::default();
|
||||||
|
let p = r.allocate();
|
||||||
|
r.recycle(p);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
pub fn blob_recycler_test() {
|
||||||
|
let r = BlobRecycler::default();
|
||||||
|
let p = r.allocate();
|
||||||
|
r.recycle(p);
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
pub fn packet_send_recv() {
|
||||||
|
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let addr = reader.local_addr().unwrap();
|
||||||
|
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let saddr = sender.local_addr().unwrap();
|
||||||
|
let r = PacketRecycler::default();
|
||||||
|
let p = r.allocate();
|
||||||
|
p.write().unwrap().packets.resize(10, Packet::default());
|
||||||
|
for m in p.write().unwrap().packets.iter_mut() {
|
||||||
|
m.meta.set_addr(&addr);
|
||||||
|
m.meta.size = 256;
|
||||||
|
}
|
||||||
|
p.read().unwrap().send_to(&sender).unwrap();
|
||||||
|
p.write().unwrap().recv_from(&reader).unwrap();
|
||||||
|
for m in p.write().unwrap().packets.iter_mut() {
|
||||||
|
assert_eq!(m.meta.size, 256);
|
||||||
|
assert_eq!(m.meta.addr(), saddr);
|
||||||
|
}
|
||||||
|
|
||||||
|
r.recycle(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn blob_send_recv() {
|
||||||
|
trace!("start");
|
||||||
|
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let addr = reader.local_addr().unwrap();
|
||||||
|
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let r = BlobRecycler::default();
|
||||||
|
let p = r.allocate();
|
||||||
|
p.write().unwrap().meta.set_addr(&addr);
|
||||||
|
p.write().unwrap().meta.size = 1024;
|
||||||
|
let mut v = VecDeque::new();
|
||||||
|
v.push_back(p);
|
||||||
|
assert_eq!(v.len(), 1);
|
||||||
|
Blob::send_to(&r, &sender, &mut v).unwrap();
|
||||||
|
trace!("send_to");
|
||||||
|
assert_eq!(v.len(), 0);
|
||||||
|
let mut rv = Blob::recv_from(&r, &reader).unwrap();
|
||||||
|
trace!("recv_from");
|
||||||
|
assert_eq!(rv.len(), 1);
|
||||||
|
let rp = rv.pop_front().unwrap();
|
||||||
|
assert_eq!(rp.write().unwrap().meta.size, 1024);
|
||||||
|
r.recycle(rp);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(feature = "ipv6", test))]
|
||||||
|
#[test]
|
||||||
|
pub fn blob_ipv6_send_recv() {
|
||||||
|
let reader = UdpSocket::bind("[::1]:0").expect("bind");
|
||||||
|
let addr = reader.local_addr().unwrap();
|
||||||
|
let sender = UdpSocket::bind("[::1]:0").expect("bind");
|
||||||
|
let r = BlobRecycler::default();
|
||||||
|
let p = r.allocate();
|
||||||
|
p.write().unwrap().meta.set_addr(&addr);
|
||||||
|
p.write().unwrap().meta.size = 1024;
|
||||||
|
let mut v = VecDeque::default();
|
||||||
|
v.push_back(p);
|
||||||
|
Blob::send_to(&r, &sender, &mut v).unwrap();
|
||||||
|
let mut rv = Blob::recv_from(&r, &reader).unwrap();
|
||||||
|
let rp = rv.pop_front().unwrap();
|
||||||
|
assert_eq!(rp.write().unwrap().meta.size, 1024);
|
||||||
|
r.recycle(rp);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn debug_trait() {
|
||||||
|
write!(io::sink(), "{:?}", Packet::default()).unwrap();
|
||||||
|
write!(io::sink(), "{:?}", Packets::default()).unwrap();
|
||||||
|
write!(io::sink(), "{:?}", Blob::default()).unwrap();
|
||||||
|
}
|
||||||
|
#[test]
|
||||||
|
pub fn blob_test() {
|
||||||
|
let mut b = Blob::default();
|
||||||
|
b.set_index(<u64>::max_value()).unwrap();
|
||||||
|
assert_eq!(b.get_index().unwrap(), <u64>::max_value());
|
||||||
|
b.data_mut()[0] = 1;
|
||||||
|
assert_eq!(b.data()[0], 1);
|
||||||
|
assert_eq!(b.get_index().unwrap(), <u64>::max_value());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
455
src/streamer.rs
455
src/streamer.rs
|
@ -1,240 +1,36 @@
|
||||||
//! The 'streamer` module allows for efficient batch processing of UDP packets.
|
use std::sync::Arc;
|
||||||
|
|
||||||
use result::{Error, Result};
|
|
||||||
use std::fmt;
|
|
||||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket};
|
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
use std::sync::{Arc, Mutex, RwLock};
|
|
||||||
use std::thread::{spawn, JoinHandle};
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use std::net::UdpSocket;
|
||||||
|
use std::thread::{spawn, JoinHandle};
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use result::Result;
|
||||||
|
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlob, SharedPackets, NUM_BLOBS};
|
||||||
|
|
||||||
const BLOCK_SIZE: usize = 1024 * 8;
|
pub type PacketReceiver = mpsc::Receiver<SharedPackets>;
|
||||||
pub const PACKET_SIZE: usize = 256;
|
pub type PacketSender = mpsc::Sender<SharedPackets>;
|
||||||
pub const RESP_SIZE: usize = 64 * 1024;
|
pub type BlobSender = mpsc::Sender<VecDeque<SharedBlob>>;
|
||||||
pub const NUM_RESP: usize = (BLOCK_SIZE * PACKET_SIZE) / RESP_SIZE;
|
pub type BlobReceiver = mpsc::Receiver<VecDeque<SharedBlob>>;
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
|
||||||
pub struct Meta {
|
|
||||||
pub size: usize,
|
|
||||||
pub addr: [u16; 8],
|
|
||||||
pub port: u16,
|
|
||||||
pub v6: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Packet {
|
|
||||||
pub data: [u8; PACKET_SIZE],
|
|
||||||
pub meta: Meta,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for Packet {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"Packet {{ size: {:?}, addr: {:?} }}",
|
|
||||||
self.meta.size,
|
|
||||||
self.meta.get_addr()
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Packet {
|
|
||||||
fn default() -> Packet {
|
|
||||||
Packet {
|
|
||||||
data: [0u8; PACKET_SIZE],
|
|
||||||
meta: Meta::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Meta {
|
|
||||||
pub fn get_addr(&self) -> SocketAddr {
|
|
||||||
if !self.v6 {
|
|
||||||
let ipv4 = Ipv4Addr::new(
|
|
||||||
self.addr[0] as u8,
|
|
||||||
self.addr[1] as u8,
|
|
||||||
self.addr[2] as u8,
|
|
||||||
self.addr[3] as u8,
|
|
||||||
);
|
|
||||||
SocketAddr::new(IpAddr::V4(ipv4), self.port)
|
|
||||||
} else {
|
|
||||||
let ipv6 = Ipv6Addr::new(
|
|
||||||
self.addr[0],
|
|
||||||
self.addr[1],
|
|
||||||
self.addr[2],
|
|
||||||
self.addr[3],
|
|
||||||
self.addr[4],
|
|
||||||
self.addr[5],
|
|
||||||
self.addr[6],
|
|
||||||
self.addr[7],
|
|
||||||
);
|
|
||||||
SocketAddr::new(IpAddr::V6(ipv6), self.port)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn set_addr(&mut self, a: &SocketAddr) {
|
|
||||||
match *a {
|
|
||||||
SocketAddr::V4(v4) => {
|
|
||||||
let ip = v4.ip().octets();
|
|
||||||
self.addr[0] = u16::from(ip[0]);
|
|
||||||
self.addr[1] = u16::from(ip[1]);
|
|
||||||
self.addr[2] = u16::from(ip[2]);
|
|
||||||
self.addr[3] = u16::from(ip[3]);
|
|
||||||
self.port = a.port();
|
|
||||||
}
|
|
||||||
SocketAddr::V6(v6) => {
|
|
||||||
self.addr = v6.ip().segments();
|
|
||||||
self.port = a.port();
|
|
||||||
self.v6 = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Packets {
|
|
||||||
pub packets: Vec<Packet>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Packets {
|
|
||||||
fn default() -> Packets {
|
|
||||||
Packets {
|
|
||||||
packets: vec![Packet::default(); BLOCK_SIZE],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Response {
|
|
||||||
pub data: [u8; RESP_SIZE],
|
|
||||||
pub meta: Meta,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Debug for Response {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"Response {{ size: {:?}, addr: {:?} }}",
|
|
||||||
self.meta.size,
|
|
||||||
self.meta.get_addr()
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Response {
|
|
||||||
fn default() -> Response {
|
|
||||||
Response {
|
|
||||||
data: [0u8; RESP_SIZE],
|
|
||||||
meta: Meta::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Responses {
|
|
||||||
pub responses: Vec<Response>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for Responses {
|
|
||||||
fn default() -> Responses {
|
|
||||||
Responses {
|
|
||||||
responses: vec![Response::default(); NUM_RESP],
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type SharedPackets = Arc<RwLock<Packets>>;
|
|
||||||
pub type PacketRecycler = Arc<Mutex<Vec<SharedPackets>>>;
|
|
||||||
pub type Receiver = mpsc::Receiver<SharedPackets>;
|
|
||||||
pub type Sender = mpsc::Sender<SharedPackets>;
|
|
||||||
pub type SharedResponses = Arc<RwLock<Responses>>;
|
|
||||||
pub type ResponseRecycler = Arc<Mutex<Vec<SharedResponses>>>;
|
|
||||||
pub type Responder = mpsc::Sender<SharedResponses>;
|
|
||||||
pub type ResponseReceiver = mpsc::Receiver<SharedResponses>;
|
|
||||||
|
|
||||||
impl Packets {
|
|
||||||
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
|
|
||||||
self.packets.resize(BLOCK_SIZE, Packet::default());
|
|
||||||
let mut i = 0;
|
|
||||||
socket.set_nonblocking(false)?;
|
|
||||||
for p in &mut self.packets {
|
|
||||||
p.meta.size = 0;
|
|
||||||
match socket.recv_from(&mut p.data) {
|
|
||||||
Err(_) if i > 0 => {
|
|
||||||
trace!("got {:?} messages", i);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
info!("recv_from err {:?}", e);
|
|
||||||
return Err(Error::IO(e));
|
|
||||||
}
|
|
||||||
Ok((nrecv, from)) => {
|
|
||||||
p.meta.size = nrecv;
|
|
||||||
p.meta.set_addr(&from);
|
|
||||||
if i == 0 {
|
|
||||||
socket.set_nonblocking(true)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
i += 1;
|
|
||||||
}
|
|
||||||
Ok(i)
|
|
||||||
}
|
|
||||||
fn read_from(&mut self, socket: &UdpSocket) -> Result<()> {
|
|
||||||
let sz = self.run_read_from(socket)?;
|
|
||||||
self.packets.resize(sz, Packet::default());
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Responses {
|
|
||||||
fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> {
|
|
||||||
for p in &self.responses {
|
|
||||||
let a = p.meta.get_addr();
|
|
||||||
socket.send_to(&p.data[..p.meta.size], &a)?;
|
|
||||||
//TODO(anatoly): wtf do we do about errors?
|
|
||||||
*num += 1;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn allocate<T>(recycler: &Arc<Mutex<Vec<Arc<RwLock<T>>>>>) -> Arc<RwLock<T>>
|
|
||||||
where
|
|
||||||
T: Default,
|
|
||||||
{
|
|
||||||
let mut gc = recycler.lock().expect("lock");
|
|
||||||
gc.pop()
|
|
||||||
.unwrap_or_else(|| Arc::new(RwLock::new(Default::default())))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn recycle<T>(recycler: &Arc<Mutex<Vec<Arc<RwLock<T>>>>>, msgs: Arc<RwLock<T>>)
|
|
||||||
where
|
|
||||||
T: Default,
|
|
||||||
{
|
|
||||||
let mut gc = recycler.lock().expect("lock");
|
|
||||||
gc.push(msgs);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn recv_loop(
|
fn recv_loop(
|
||||||
sock: &UdpSocket,
|
sock: &UdpSocket,
|
||||||
exit: &Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
recycler: &PacketRecycler,
|
re: &PacketRecycler,
|
||||||
channel: &Sender,
|
channel: &PacketSender,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
loop {
|
loop {
|
||||||
let msgs = allocate(recycler);
|
let msgs = re.allocate();
|
||||||
let msgs_ = msgs.clone();
|
let msgs_ = msgs.clone();
|
||||||
loop {
|
loop {
|
||||||
match msgs.write().unwrap().read_from(sock) {
|
match msgs.write().unwrap().recv_from(sock) {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
channel.send(msgs_)?;
|
channel.send(msgs_)?;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
recycle(recycler, msgs_);
|
re.recycle(msgs_);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -247,7 +43,7 @@ pub fn receiver(
|
||||||
sock: UdpSocket,
|
sock: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
recycler: PacketRecycler,
|
recycler: PacketRecycler,
|
||||||
channel: Sender,
|
channel: PacketSender,
|
||||||
) -> Result<JoinHandle<()>> {
|
) -> Result<JoinHandle<()>> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
sock.set_read_timeout(Some(timer))?;
|
sock.set_read_timeout(Some(timer))?;
|
||||||
|
@ -257,43 +53,105 @@ pub fn receiver(
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recv_send(sock: &UdpSocket, recycler: &ResponseRecycler, r: &ResponseReceiver) -> Result<()> {
|
fn recv_send(sock: &UdpSocket, recycler: &BlobRecycler, r: &BlobReceiver) -> Result<()> {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
let msgs = r.recv_timeout(timer)?;
|
let mut msgs = r.recv_timeout(timer)?;
|
||||||
let msgs_ = msgs.clone();
|
Blob::send_to(recycler, sock, &mut msgs)?;
|
||||||
let mut num = 0;
|
|
||||||
msgs.read().unwrap().send_to(sock, &mut num)?;
|
|
||||||
recycle(recycler, msgs_);
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn responder(
|
pub fn responder(
|
||||||
sock: UdpSocket,
|
sock: UdpSocket,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
recycler: ResponseRecycler,
|
recycler: BlobRecycler,
|
||||||
r: ResponseReceiver,
|
r: BlobReceiver,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || loop {
|
spawn(move || loop {
|
||||||
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
|
if recv_send(&sock, &recycler, &r).is_err() || exit.load(Ordering::Relaxed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//TODO, we would need to stick block authentication before we create the
|
||||||
|
//window.
|
||||||
|
fn recv_window(
|
||||||
|
window: &mut Vec<Option<SharedBlob>>,
|
||||||
|
recycler: &BlobRecycler,
|
||||||
|
consumed: &mut usize,
|
||||||
|
socket: &UdpSocket,
|
||||||
|
s: &BlobSender,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut dq = Blob::recv_from(recycler, socket)?;
|
||||||
|
while let Some(b) = dq.pop_front() {
|
||||||
|
let b_ = b.clone();
|
||||||
|
let mut p = b.write().unwrap();
|
||||||
|
let pix = p.get_index()? as usize;
|
||||||
|
let w = pix % NUM_BLOBS;
|
||||||
|
//TODO, after the block are authenticated
|
||||||
|
//if we get different blocks at the same index
|
||||||
|
//that is a network failure/attack
|
||||||
|
{
|
||||||
|
if window[w].is_none() {
|
||||||
|
window[w] = Some(b_);
|
||||||
|
} else {
|
||||||
|
debug!("duplicate blob at index {:}", w);
|
||||||
|
}
|
||||||
|
//send a contiguous set of blocks
|
||||||
|
let mut dq = VecDeque::new();
|
||||||
|
loop {
|
||||||
|
let k = *consumed % NUM_BLOBS;
|
||||||
|
if window[k].is_none() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
dq.push_back(window[k].clone().unwrap());
|
||||||
|
window[k] = None;
|
||||||
|
*consumed += 1;
|
||||||
|
}
|
||||||
|
if !dq.is_empty() {
|
||||||
|
s.send(dq)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn window(
|
||||||
|
sock: UdpSocket,
|
||||||
|
exit: Arc<AtomicBool>,
|
||||||
|
r: BlobRecycler,
|
||||||
|
s: BlobSender,
|
||||||
|
) -> JoinHandle<()> {
|
||||||
|
spawn(move || {
|
||||||
|
let mut window = vec![None; NUM_BLOBS];
|
||||||
|
let mut consumed = 0;
|
||||||
|
let timer = Duration::new(1, 0);
|
||||||
|
sock.set_read_timeout(Some(timer)).unwrap();
|
||||||
|
loop {
|
||||||
|
if recv_window(&mut window, &r, &mut consumed, &sock, &s).is_err()
|
||||||
|
|| exit.load(Ordering::Relaxed)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(all(feature = "unstable", test))]
|
#[cfg(all(feature = "unstable", test))]
|
||||||
mod bench {
|
mod bench {
|
||||||
extern crate test;
|
extern crate test;
|
||||||
use self::test::Bencher;
|
use self::test::Bencher;
|
||||||
use result::Result;
|
use result::Result;
|
||||||
use std::net::{SocketAddr, UdpSocket};
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
use std::sync::mpsc::channel;
|
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::thread::sleep;
|
use std::thread::sleep;
|
||||||
use std::thread::{spawn, JoinHandle};
|
use std::thread::{spawn, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use streamer::{allocate, receiver, recycle, Packet, PacketRecycler, Receiver, PACKET_SIZE};
|
use std::sync::mpsc::channel;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use packet::{Packet, PacketRecycler, PACKET_SIZE};
|
||||||
|
use streamer::{receiver, PacketReceiver};
|
||||||
|
|
||||||
fn producer(
|
fn producer(
|
||||||
addr: &SocketAddr,
|
addr: &SocketAddr,
|
||||||
|
@ -301,7 +159,7 @@ mod bench {
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let msgs = allocate(&recycler);
|
let msgs = recycler.allocate();
|
||||||
let msgs_ = msgs.clone();
|
let msgs_ = msgs.clone();
|
||||||
msgs.write().unwrap().packets.resize(10, Packet::default());
|
msgs.write().unwrap().packets.resize(10, Packet::default());
|
||||||
for w in msgs.write().unwrap().packets.iter_mut() {
|
for w in msgs.write().unwrap().packets.iter_mut() {
|
||||||
|
@ -314,7 +172,7 @@ mod bench {
|
||||||
}
|
}
|
||||||
let mut num = 0;
|
let mut num = 0;
|
||||||
for p in msgs_.read().unwrap().packets.iter() {
|
for p in msgs_.read().unwrap().packets.iter() {
|
||||||
let a = p.meta.get_addr();
|
let a = p.meta.addr();
|
||||||
send.send_to(&p.data[..p.meta.size], &a).unwrap();
|
send.send_to(&p.data[..p.meta.size], &a).unwrap();
|
||||||
num += 1;
|
num += 1;
|
||||||
}
|
}
|
||||||
|
@ -326,7 +184,7 @@ mod bench {
|
||||||
recycler: PacketRecycler,
|
recycler: PacketRecycler,
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
rvs: Arc<Mutex<usize>>,
|
rvs: Arc<Mutex<usize>>,
|
||||||
r: Receiver,
|
r: PacketReceiver,
|
||||||
) -> JoinHandle<()> {
|
) -> JoinHandle<()> {
|
||||||
spawn(move || loop {
|
spawn(move || loop {
|
||||||
if exit.load(Ordering::Relaxed) {
|
if exit.load(Ordering::Relaxed) {
|
||||||
|
@ -337,7 +195,7 @@ mod bench {
|
||||||
Ok(msgs) => {
|
Ok(msgs) => {
|
||||||
let msgs_ = msgs.clone();
|
let msgs_ = msgs.clone();
|
||||||
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
|
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
|
||||||
recycle(&recycler, msgs_);
|
recycler.recycle(msgs_);
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
@ -347,16 +205,16 @@ mod bench {
|
||||||
let read = UdpSocket::bind("127.0.0.1:0")?;
|
let read = UdpSocket::bind("127.0.0.1:0")?;
|
||||||
let addr = read.local_addr()?;
|
let addr = read.local_addr()?;
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let recycler = Arc::new(Mutex::new(Vec::new()));
|
let pack_recycler = PacketRecycler::default();
|
||||||
|
|
||||||
let (s_reader, r_reader) = channel();
|
let (s_reader, r_reader) = channel();
|
||||||
let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?;
|
let t_reader = receiver(read, exit.clone(), pack_recycler.clone(), s_reader)?;
|
||||||
let t_producer1 = producer(&addr, recycler.clone(), exit.clone());
|
let t_producer1 = producer(&addr, pack_recycler.clone(), exit.clone());
|
||||||
let t_producer2 = producer(&addr, recycler.clone(), exit.clone());
|
let t_producer2 = producer(&addr, pack_recycler.clone(), exit.clone());
|
||||||
let t_producer3 = producer(&addr, recycler.clone(), exit.clone());
|
let t_producer3 = producer(&addr, pack_recycler.clone(), exit.clone());
|
||||||
|
|
||||||
let rvs = Arc::new(Mutex::new(0));
|
let rvs = Arc::new(Mutex::new(0));
|
||||||
let t_sink = sink(recycler.clone(), exit.clone(), rvs.clone(), r_reader);
|
let t_sink = sink(pack_recycler.clone(), exit.clone(), rvs.clone(), r_reader);
|
||||||
|
|
||||||
let start = SystemTime::now();
|
let start = SystemTime::now();
|
||||||
let start_val = *rvs.lock().unwrap();
|
let start_val = *rvs.lock().unwrap();
|
||||||
|
@ -383,17 +241,18 @@ mod bench {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use std::io;
|
|
||||||
use std::io::Write;
|
|
||||||
use std::net::UdpSocket;
|
use std::net::UdpSocket;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::io::Write;
|
||||||
|
use std::io;
|
||||||
|
use std::collections::VecDeque;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer::{allocate, receiver, responder, Packet, Packets, Receiver, Response, Responses,
|
use std::sync::Arc;
|
||||||
PACKET_SIZE};
|
use packet::{Blob, BlobRecycler, Packet, PacketRecycler, Packets, PACKET_SIZE};
|
||||||
|
use streamer::{receiver, responder, window, BlobReceiver, PacketReceiver};
|
||||||
|
|
||||||
fn get_msgs(r: Receiver, num: &mut usize) {
|
fn get_msgs(r: PacketReceiver, num: &mut usize) {
|
||||||
for _t in 0..5 {
|
for _t in 0..5 {
|
||||||
let timer = Duration::new(1, 0);
|
let timer = Duration::new(1, 0);
|
||||||
match r.recv_timeout(timer) {
|
match r.recv_timeout(timer) {
|
||||||
|
@ -405,40 +264,11 @@ mod test {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#[cfg(ipv6)]
|
|
||||||
#[test]
|
|
||||||
pub fn streamer_send_test_ipv6() {
|
|
||||||
let read = UdpSocket::bind("[::1]:0").expect("bind");
|
|
||||||
let addr = read.local_addr().unwrap();
|
|
||||||
let send = UdpSocket::bind("[::1]:0").expect("bind");
|
|
||||||
let exit = Arc::new(Mutex::new(false));
|
|
||||||
let recycler = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
let (s_reader, r_reader) = channel();
|
|
||||||
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
|
|
||||||
let (s_responder, r_responder) = channel();
|
|
||||||
let t_responder = responder(send, exit.clone(), recycler.clone(), r_responder);
|
|
||||||
let msgs = allocate(&recycler);
|
|
||||||
msgs.write().unwrap().packets.resize(10, Packet::default());
|
|
||||||
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
|
|
||||||
w.data[0] = i as u8;
|
|
||||||
w.size = PACKET_SIZE;
|
|
||||||
w.set_addr(&addr);
|
|
||||||
assert_eq!(w.get_addr(), addr);
|
|
||||||
}
|
|
||||||
s_responder.send(msgs).expect("send");
|
|
||||||
let mut num = 0;
|
|
||||||
get_msgs(r_reader, &mut num);
|
|
||||||
assert_eq!(num, 10);
|
|
||||||
exit.store(true, Ordering::Relaxed);
|
|
||||||
t_receiver.join().expect("join");
|
|
||||||
t_responder.join().expect("join");
|
|
||||||
}
|
|
||||||
#[test]
|
#[test]
|
||||||
pub fn streamer_debug() {
|
pub fn streamer_debug() {
|
||||||
write!(io::sink(), "{:?}", Packet::default()).unwrap();
|
write!(io::sink(), "{:?}", Packet::default()).unwrap();
|
||||||
write!(io::sink(), "{:?}", Packets::default()).unwrap();
|
write!(io::sink(), "{:?}", Packets::default()).unwrap();
|
||||||
write!(io::sink(), "{:?}", Response::default()).unwrap();
|
write!(io::sink(), "{:?}", Blob::default()).unwrap();
|
||||||
write!(io::sink(), "{:?}", Responses::default()).unwrap();
|
|
||||||
}
|
}
|
||||||
#[test]
|
#[test]
|
||||||
pub fn streamer_send_test() {
|
pub fn streamer_send_test() {
|
||||||
|
@ -446,22 +276,21 @@ mod test {
|
||||||
let addr = read.local_addr().unwrap();
|
let addr = read.local_addr().unwrap();
|
||||||
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let packet_recycler = Arc::new(Mutex::new(Vec::new()));
|
let pack_recycler = PacketRecycler::default();
|
||||||
let resp_recycler = Arc::new(Mutex::new(Vec::new()));
|
let resp_recycler = BlobRecycler::default();
|
||||||
let (s_reader, r_reader) = channel();
|
let (s_reader, r_reader) = channel();
|
||||||
let t_receiver = receiver(read, exit.clone(), packet_recycler.clone(), s_reader).unwrap();
|
let t_receiver = receiver(read, exit.clone(), pack_recycler.clone(), s_reader).unwrap();
|
||||||
let (s_responder, r_responder) = channel();
|
let (s_responder, r_responder) = channel();
|
||||||
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
||||||
let msgs = allocate(&resp_recycler);
|
let mut msgs = VecDeque::new();
|
||||||
msgs.write()
|
for i in 0..10 {
|
||||||
.unwrap()
|
let b = resp_recycler.allocate();
|
||||||
.responses
|
let b_ = b.clone();
|
||||||
.resize(10, Response::default());
|
let mut w = b.write().unwrap();
|
||||||
for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() {
|
|
||||||
w.data[0] = i as u8;
|
w.data[0] = i as u8;
|
||||||
w.meta.size = PACKET_SIZE;
|
w.meta.size = PACKET_SIZE;
|
||||||
w.meta.set_addr(&addr);
|
w.meta.set_addr(&addr);
|
||||||
assert_eq!(w.meta.get_addr(), addr);
|
msgs.push_back(b_);
|
||||||
}
|
}
|
||||||
s_responder.send(msgs).expect("send");
|
s_responder.send(msgs).expect("send");
|
||||||
let mut num = 0;
|
let mut num = 0;
|
||||||
|
@ -471,4 +300,54 @@ mod test {
|
||||||
t_receiver.join().expect("join");
|
t_receiver.join().expect("join");
|
||||||
t_responder.join().expect("join");
|
t_responder.join().expect("join");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_blobs(r: BlobReceiver, num: &mut usize) {
|
||||||
|
for _t in 0..5 {
|
||||||
|
let timer = Duration::new(1, 0);
|
||||||
|
match r.recv_timeout(timer) {
|
||||||
|
Ok(m) => {
|
||||||
|
for (i, v) in m.iter().enumerate() {
|
||||||
|
assert_eq!(v.read().unwrap().get_index().unwrap() as usize, *num + i);
|
||||||
|
}
|
||||||
|
*num += m.len();
|
||||||
|
}
|
||||||
|
e => println!("error {:?}", e),
|
||||||
|
}
|
||||||
|
if *num == 10 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
pub fn window_send_test() {
|
||||||
|
let read = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let addr = read.local_addr().unwrap();
|
||||||
|
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||||
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
|
let resp_recycler = BlobRecycler::default();
|
||||||
|
let (s_reader, r_reader) = channel();
|
||||||
|
let t_receiver = window(read, exit.clone(), resp_recycler.clone(), s_reader);
|
||||||
|
let (s_responder, r_responder) = channel();
|
||||||
|
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
||||||
|
let mut msgs = VecDeque::new();
|
||||||
|
for v in 0..10 {
|
||||||
|
let i = 9 - v;
|
||||||
|
let b = resp_recycler.allocate();
|
||||||
|
let b_ = b.clone();
|
||||||
|
let mut w = b.write().unwrap();
|
||||||
|
w.set_index(i).unwrap();
|
||||||
|
assert_eq!(i, w.get_index().unwrap());
|
||||||
|
w.meta.size = PACKET_SIZE;
|
||||||
|
w.meta.set_addr(&addr);
|
||||||
|
msgs.push_back(b_);
|
||||||
|
}
|
||||||
|
s_responder.send(msgs).expect("send");
|
||||||
|
let mut num = 0;
|
||||||
|
get_blobs(r_reader, &mut num);
|
||||||
|
assert_eq!(num, 10);
|
||||||
|
exit.store(true, Ordering::Relaxed);
|
||||||
|
t_receiver.join().expect("join");
|
||||||
|
t_responder.join().expect("join");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue