commit
112aecf6eb
|
@ -92,40 +92,40 @@ impl AccountantSkel {
|
|||
fn process(
|
||||
&mut self,
|
||||
r_reader: &streamer::Receiver,
|
||||
s_sender: &streamer::Sender,
|
||||
recycler: &streamer::Recycler,
|
||||
s_responder: &streamer::Responder,
|
||||
packet_recycler: &streamer::PacketRecycler,
|
||||
response_recycler: &streamer::ResponseRecycler,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let msgs = r_reader.recv_timeout(timer)?;
|
||||
let msgs_ = msgs.clone();
|
||||
let msgs__ = msgs.clone();
|
||||
let rsps = streamer::allocate(recycler);
|
||||
let rsps = streamer::allocate(response_recycler);
|
||||
let rsps_ = rsps.clone();
|
||||
let l = msgs__.read().unwrap().packets.len();
|
||||
rsps.write()
|
||||
.unwrap()
|
||||
.packets
|
||||
.resize(l, streamer::Packet::default());
|
||||
{
|
||||
let mut num = 0;
|
||||
let mut ursps = rsps.write().unwrap();
|
||||
for packet in &msgs.read().unwrap().packets {
|
||||
let sz = packet.size;
|
||||
let sz = packet.meta.size;
|
||||
let req = deserialize(&packet.data[0..sz])?;
|
||||
if let Some(resp) = self.process_request(req) {
|
||||
let rsp = &mut ursps.packets[num];
|
||||
if ursps.responses.len() <= num {
|
||||
ursps
|
||||
.responses
|
||||
.resize((num + 1) * 2, streamer::Response::default());
|
||||
}
|
||||
let rsp = &mut ursps.responses[num];
|
||||
let v = serialize(&resp)?;
|
||||
let len = v.len();
|
||||
rsp.data[0..len].copy_from_slice(&v);
|
||||
rsp.size = len;
|
||||
rsp.set_addr(&packet.get_addr());
|
||||
rsp.data[..len].copy_from_slice(&v);
|
||||
rsp.meta.size = len;
|
||||
rsp.meta.set_addr(&packet.meta.get_addr());
|
||||
num += 1;
|
||||
}
|
||||
}
|
||||
ursps.packets.resize(num, streamer::Packet::default());
|
||||
ursps.responses.resize(num, streamer::Response::default());
|
||||
}
|
||||
s_sender.send(rsps_)?;
|
||||
streamer::recycle(recycler, msgs_);
|
||||
s_responder.send(rsps_)?;
|
||||
streamer::recycle(packet_recycler, msgs_);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -141,23 +141,30 @@ impl AccountantSkel {
|
|||
local.set_port(0);
|
||||
let write = UdpSocket::bind(local)?;
|
||||
|
||||
let recycler = Arc::new(Mutex::new(Vec::new()));
|
||||
let packet_recycler = Arc::new(Mutex::new(Vec::new()));
|
||||
let response_recycler = Arc::new(Mutex::new(Vec::new()));
|
||||
let (s_reader, r_reader) = channel();
|
||||
let t_receiver = streamer::receiver(read, exit.clone(), recycler.clone(), s_reader)?;
|
||||
let t_receiver = streamer::receiver(read, exit.clone(), packet_recycler.clone(), s_reader)?;
|
||||
|
||||
let (s_sender, r_sender) = channel();
|
||||
let t_sender = streamer::sender(write, exit.clone(), recycler.clone(), r_sender);
|
||||
let (s_responder, r_responder) = channel();
|
||||
let t_responder =
|
||||
streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder);
|
||||
|
||||
let t_server = spawn(move || {
|
||||
if let Ok(me) = Arc::try_unwrap(obj) {
|
||||
loop {
|
||||
let e = me.lock().unwrap().process(&r_reader, &s_sender, &recycler);
|
||||
let e = me.lock().unwrap().process(
|
||||
&r_reader,
|
||||
&s_responder,
|
||||
&packet_recycler,
|
||||
&response_recycler,
|
||||
);
|
||||
if e.is_err() && exit.load(Ordering::Relaxed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(vec![t_receiver, t_sender, t_server])
|
||||
Ok(vec![t_receiver, t_responder, t_server])
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ pub enum Error {
|
|||
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
|
||||
Serialize(std::boxed::Box<bincode::ErrorKind>),
|
||||
SendError,
|
||||
Services,
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
|
211
src/streamer.rs
211
src/streamer.rs
|
@ -8,38 +8,45 @@ use std::thread::{spawn, JoinHandle};
|
|||
use result::{Error, Result};
|
||||
|
||||
const BLOCK_SIZE: usize = 1024 * 8;
|
||||
pub const PACKET_SIZE: usize = 1024;
|
||||
pub const PACKET_SIZE: usize = 256;
|
||||
pub const RESP_SIZE: usize = 64 * 1024;
|
||||
pub const NUM_RESP: usize = (BLOCK_SIZE * PACKET_SIZE) / RESP_SIZE;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Packet {
|
||||
pub data: [u8; PACKET_SIZE],
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Meta {
|
||||
pub size: usize,
|
||||
pub addr: [u16; 8],
|
||||
pub port: u16,
|
||||
pub v6: bool,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Packet {
|
||||
pub data: [u8; PACKET_SIZE],
|
||||
pub meta: Meta,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Packet {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Packet {{ size: {:?}, addr: {:?} }}",
|
||||
self.size,
|
||||
self.get_addr()
|
||||
self.meta.size,
|
||||
self.meta.get_addr()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Packet {
|
||||
fn default() -> Packet {
|
||||
Packet {
|
||||
data: [0u8; PACKET_SIZE],
|
||||
size: 0,
|
||||
addr: [0u16; 8],
|
||||
port: 0,
|
||||
v6: false,
|
||||
meta: Meta::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl Packet {
|
||||
|
||||
impl Meta {
|
||||
pub fn get_addr(&self) -> SocketAddr {
|
||||
if !self.v6 {
|
||||
let ipv4 = Ipv4Addr::new(
|
||||
|
@ -83,27 +90,74 @@ impl Packet {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct PacketData {
|
||||
#[derive(Debug)]
|
||||
pub struct Packets {
|
||||
pub packets: Vec<Packet>,
|
||||
}
|
||||
|
||||
pub type SharedPacketData = Arc<RwLock<PacketData>>;
|
||||
pub type Recycler = Arc<Mutex<Vec<SharedPacketData>>>;
|
||||
pub type Receiver = mpsc::Receiver<SharedPacketData>;
|
||||
pub type Sender = mpsc::Sender<SharedPacketData>;
|
||||
|
||||
impl PacketData {
|
||||
pub fn new() -> PacketData {
|
||||
PacketData {
|
||||
impl Default for Packets {
|
||||
fn default() -> Packets {
|
||||
Packets {
|
||||
packets: vec![Packet::default(); BLOCK_SIZE],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Response {
|
||||
pub data: [u8; RESP_SIZE],
|
||||
pub meta: Meta,
|
||||
}
|
||||
|
||||
impl fmt::Debug for Response {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"Response {{ size: {:?}, addr: {:?} }}",
|
||||
self.meta.size,
|
||||
self.meta.get_addr()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Response {
|
||||
fn default() -> Response {
|
||||
Response {
|
||||
data: [0u8; RESP_SIZE],
|
||||
meta: Meta::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Responses {
|
||||
pub responses: Vec<Response>,
|
||||
}
|
||||
|
||||
impl Default for Responses {
|
||||
fn default() -> Responses {
|
||||
Responses {
|
||||
responses: vec![Response::default(); NUM_RESP],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type SharedPackets = Arc<RwLock<Packets>>;
|
||||
pub type PacketRecycler = Arc<Mutex<Vec<SharedPackets>>>;
|
||||
pub type Receiver = mpsc::Receiver<SharedPackets>;
|
||||
pub type Sender = mpsc::Sender<SharedPackets>;
|
||||
pub type SharedResponses = Arc<RwLock<Responses>>;
|
||||
pub type ResponseRecycler = Arc<Mutex<Vec<SharedResponses>>>;
|
||||
pub type Responder = mpsc::Sender<SharedResponses>;
|
||||
pub type ResponseReceiver = mpsc::Receiver<SharedResponses>;
|
||||
|
||||
impl Packets {
|
||||
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
|
||||
self.packets.resize(BLOCK_SIZE, Packet::default());
|
||||
let mut i = 0;
|
||||
socket.set_nonblocking(false)?;
|
||||
for p in &mut self.packets {
|
||||
p.size = 0;
|
||||
p.meta.size = 0;
|
||||
match socket.recv_from(&mut p.data) {
|
||||
Err(_) if i > 0 => {
|
||||
trace!("got {:?} messages", i);
|
||||
|
@ -114,8 +168,8 @@ impl PacketData {
|
|||
return Err(Error::IO(e));
|
||||
}
|
||||
Ok((nrecv, from)) => {
|
||||
p.size = nrecv;
|
||||
p.set_addr(&from);
|
||||
p.meta.size = nrecv;
|
||||
p.meta.set_addr(&from);
|
||||
if i == 0 {
|
||||
socket.set_nonblocking(true)?;
|
||||
}
|
||||
|
@ -130,10 +184,13 @@ impl PacketData {
|
|||
self.packets.resize(sz, Packet::default());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Responses {
|
||||
fn send_to(&self, socket: &UdpSocket, num: &mut usize) -> Result<()> {
|
||||
for p in &self.packets {
|
||||
let a = p.get_addr();
|
||||
socket.send_to(&p.data[0..p.size], &a)?;
|
||||
for p in &self.responses {
|
||||
let a = p.meta.get_addr();
|
||||
socket.send_to(&p.data[..p.meta.size], &a)?;
|
||||
//TODO(anatoly): wtf do we do about errors?
|
||||
*num += 1;
|
||||
}
|
||||
|
@ -141,13 +198,19 @@ impl PacketData {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn allocate(recycler: &Recycler) -> SharedPacketData {
|
||||
pub fn allocate<T>(recycler: &Arc<Mutex<Vec<Arc<RwLock<T>>>>>) -> Arc<RwLock<T>>
|
||||
where
|
||||
T: Default,
|
||||
{
|
||||
let mut gc = recycler.lock().expect("lock");
|
||||
gc.pop()
|
||||
.unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new())))
|
||||
.unwrap_or_else(|| Arc::new(RwLock::new(Default::default())))
|
||||
}
|
||||
|
||||
pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) {
|
||||
pub fn recycle<T>(recycler: &Arc<Mutex<Vec<Arc<RwLock<T>>>>>, msgs: Arc<RwLock<T>>)
|
||||
where
|
||||
T: Default,
|
||||
{
|
||||
let mut gc = recycler.lock().expect("lock");
|
||||
gc.push(msgs);
|
||||
}
|
||||
|
@ -155,7 +218,7 @@ pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) {
|
|||
fn recv_loop(
|
||||
sock: &UdpSocket,
|
||||
exit: &Arc<AtomicBool>,
|
||||
recycler: &Recycler,
|
||||
recycler: &PacketRecycler,
|
||||
channel: &Sender,
|
||||
) -> Result<()> {
|
||||
loop {
|
||||
|
@ -181,7 +244,7 @@ fn recv_loop(
|
|||
pub fn receiver(
|
||||
sock: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
recycler: Recycler,
|
||||
recycler: PacketRecycler,
|
||||
channel: Sender,
|
||||
) -> Result<JoinHandle<()>> {
|
||||
let timer = Duration::new(1, 0);
|
||||
|
@ -192,7 +255,7 @@ pub fn receiver(
|
|||
}))
|
||||
}
|
||||
|
||||
fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> {
|
||||
fn recv_send(sock: &UdpSocket, recycler: &ResponseRecycler, r: &ResponseReceiver) -> Result<()> {
|
||||
let timer = Duration::new(1, 0);
|
||||
let msgs = r.recv_timeout(timer)?;
|
||||
let msgs_ = msgs.clone();
|
||||
|
@ -202,11 +265,11 @@ fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()>
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn sender(
|
||||
pub fn responder(
|
||||
sock: UdpSocket,
|
||||
exit: Arc<AtomicBool>,
|
||||
recycler: Recycler,
|
||||
r: Receiver,
|
||||
recycler: ResponseRecycler,
|
||||
r: ResponseReceiver,
|
||||
) -> JoinHandle<()> {
|
||||
spawn(move || loop {
|
||||
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
|
||||
|
@ -226,29 +289,39 @@ mod bench {
|
|||
use std::time::SystemTime;
|
||||
use std::thread::{spawn, JoinHandle};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use result::Result;
|
||||
use streamer::{allocate, receiver, recycle, Packet, Receiver, Recycler, PACKET_SIZE};
|
||||
use streamer::{allocate, receiver, recycle, Packet, PacketRecycler, Receiver, PACKET_SIZE};
|
||||
|
||||
fn producer(addr: &SocketAddr, recycler: &Recycler, exit: Arc<AtomicBool>) -> JoinHandle<()> {
|
||||
fn producer(
|
||||
addr: &SocketAddr,
|
||||
recycler: PacketRecycler,
|
||||
exit: Arc<AtomicBool>,
|
||||
) -> JoinHandle<()> {
|
||||
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
let msgs = allocate(recycler);
|
||||
let msgs = allocate(&recycler);
|
||||
let msgs_ = msgs.clone();
|
||||
msgs.write().unwrap().packets.resize(10, Packet::default());
|
||||
for w in msgs.write().unwrap().packets.iter_mut() {
|
||||
w.size = PACKET_SIZE;
|
||||
w.set_addr(&addr);
|
||||
w.meta.size = PACKET_SIZE;
|
||||
w.meta.set_addr(&addr);
|
||||
}
|
||||
spawn(move || loop {
|
||||
if exit.load(Ordering::Relaxed) {
|
||||
return;
|
||||
}
|
||||
let mut num = 0;
|
||||
msgs.read().unwrap().send_to(&send, &mut num).unwrap();
|
||||
for p in msgs_.read().unwrap().packets.iter() {
|
||||
let a = p.meta.get_addr();
|
||||
send.send_to(&p.data[..p.meta.size], &a).unwrap();
|
||||
num += 1;
|
||||
}
|
||||
assert_eq!(num, 10);
|
||||
})
|
||||
}
|
||||
|
||||
fn sinc(
|
||||
recycler: &Recycler,
|
||||
recycler: PacketRecycler,
|
||||
exit: Arc<AtomicBool>,
|
||||
rvs: Arc<Mutex<usize>>,
|
||||
r: Receiver,
|
||||
|
@ -262,7 +335,7 @@ mod bench {
|
|||
Ok(msgs) => {
|
||||
let msgs_ = msgs.clone();
|
||||
*rvs.lock().unwrap() += msgs.read().unwrap().packets.len();
|
||||
recycle(recycler, msgs_);
|
||||
recycle(&recycler, msgs_);
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
@ -275,10 +348,10 @@ mod bench {
|
|||
let recycler = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
let (s_reader, r_reader) = channel();
|
||||
let t_reader = receiver(read, exit.clone(), &recycler, s_reader)?;
|
||||
let t_producer1 = producer(&addr, &recycler, exit.clone());
|
||||
let t_producer2 = producer(&addr, &recycler, exit.clone());
|
||||
let t_producer3 = producer(&addr, &recycler, exit.clone());
|
||||
let t_reader = receiver(read, exit.clone(), recycler.clone(), s_reader)?;
|
||||
let t_producer1 = producer(&addr, recycler.clone(), exit.clone());
|
||||
let t_producer2 = producer(&addr, recycler.clone(), exit.clone());
|
||||
let t_producer3 = producer(&addr, recycler.clone(), exit.clone());
|
||||
|
||||
let rvs = Arc::new(Mutex::new(0));
|
||||
let t_sinc = sinc(recycler.clone(), exit.clone(), rvs.clone(), r_reader);
|
||||
|
@ -315,7 +388,8 @@ mod test {
|
|||
use std::sync::mpsc::channel;
|
||||
use std::io::Write;
|
||||
use std::io;
|
||||
use streamer::{allocate, receiver, sender, Packet, Receiver, PACKET_SIZE};
|
||||
use streamer::{allocate, receiver, responder, Packet, Packets, Receiver, Response, Responses,
|
||||
PACKET_SIZE};
|
||||
|
||||
fn get_msgs(r: Receiver, num: &mut usize) {
|
||||
for _t in 0..5 {
|
||||
|
@ -339,8 +413,8 @@ mod test {
|
|||
let recycler = Arc::new(Mutex::new(Vec::new()));
|
||||
let (s_reader, r_reader) = channel();
|
||||
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
|
||||
let (s_sender, r_sender) = channel();
|
||||
let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender);
|
||||
let (s_responder, r_responder) = channel();
|
||||
let t_responder = responder(send, exit.clone(), recycler.clone(), r_responder);
|
||||
let msgs = allocate(&recycler);
|
||||
msgs.write().unwrap().packets.resize(10, Packet::default());
|
||||
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
|
||||
|
@ -349,17 +423,20 @@ mod test {
|
|||
w.set_addr(&addr);
|
||||
assert_eq!(w.get_addr(), addr);
|
||||
}
|
||||
s_sender.send(msgs).expect("send");
|
||||
s_responder.send(msgs).expect("send");
|
||||
let mut num = 0;
|
||||
get_msgs(r_reader, &mut num);
|
||||
assert_eq!(num, 10);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t_receiver.join().expect("join");
|
||||
t_sender.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
}
|
||||
#[test]
|
||||
pub fn streamer_debug() {
|
||||
write!(io::sink(), "{:?}", Packet::default()).unwrap();
|
||||
write!(io::sink(), "{:?}", Packets::default()).unwrap();
|
||||
write!(io::sink(), "{:?}", Response::default()).unwrap();
|
||||
write!(io::sink(), "{:?}", Responses::default()).unwrap();
|
||||
}
|
||||
#[test]
|
||||
pub fn streamer_send_test() {
|
||||
|
@ -367,25 +444,29 @@ mod test {
|
|||
let addr = read.local_addr().unwrap();
|
||||
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let recycler = Arc::new(Mutex::new(Vec::new()));
|
||||
let packet_recycler = Arc::new(Mutex::new(Vec::new()));
|
||||
let resp_recycler = Arc::new(Mutex::new(Vec::new()));
|
||||
let (s_reader, r_reader) = channel();
|
||||
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
|
||||
let (s_sender, r_sender) = channel();
|
||||
let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender);
|
||||
let msgs = allocate(&recycler);
|
||||
msgs.write().unwrap().packets.resize(10, Packet::default());
|
||||
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
|
||||
let t_receiver = receiver(read, exit.clone(), packet_recycler.clone(), s_reader).unwrap();
|
||||
let (s_responder, r_responder) = channel();
|
||||
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
|
||||
let msgs = allocate(&resp_recycler);
|
||||
msgs.write()
|
||||
.unwrap()
|
||||
.responses
|
||||
.resize(10, Response::default());
|
||||
for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() {
|
||||
w.data[0] = i as u8;
|
||||
w.size = PACKET_SIZE;
|
||||
w.set_addr(&addr);
|
||||
assert_eq!(w.get_addr(), addr);
|
||||
w.meta.size = PACKET_SIZE;
|
||||
w.meta.set_addr(&addr);
|
||||
assert_eq!(w.meta.get_addr(), addr);
|
||||
}
|
||||
s_sender.send(msgs).expect("send");
|
||||
s_responder.send(msgs).expect("send");
|
||||
let mut num = 0;
|
||||
get_msgs(r_reader, &mut num);
|
||||
assert_eq!(num, 10);
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
t_receiver.join().expect("join");
|
||||
t_sender.join().expect("join");
|
||||
t_responder.join().expect("join");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue