responder

This commit is contained in:
Anatoly Yakovenko 2018-03-24 23:31:54 -07:00
parent 7732f3f5fb
commit 533b3170a7
2 changed files with 66 additions and 252 deletions

View File

@ -1,221 +0,0 @@
//! Small services library with named ports
//! see test for usage
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use result::Result;
use result::Error;
pub enum Port {
Main,
PacketReader,
Accountant,
PacketSender,
}
impl Port {
fn to_usize(self) -> usize {
match self {
Port::Main => 0,
Port::PacketReader => 1,
Port::Accountant => 2,
Port::PacketSender => 3,
}
}
}
#[derive(Clone)]
pub enum Data {
Signal,
SharedPacketData(streamer::SharedPacketData),
}
struct Locked {
ports: Vec<Sender<Data>>,
readers: Vec<Arc<Mutex<Receiver<Data>>>>,
threads: Vec<Arc<Option<JoinHandle<Result<()>>>>>,
}
pub struct Services {
lock: Arc<RwLock<Locked>>,
exit: Arc<AtomicBool>,
}
pub type Ports = Vec<Sender<Data>>;
impl Services {
pub fn new() -> Services {
let (s1, r1) = channel();
let (s2, r2) = channel();
let (s3, r3) = channel();
let (s4, r4) = channel();
let (s5, r5) = channel();
let locked = Locked {
ports: [s1, s2, s3, s4, s5].to_vec(),
readers: [
Arc::new(Mutex::new(r1)),
Arc::new(Mutex::new(r2)),
Arc::new(Mutex::new(r3)),
Arc::new(Mutex::new(r4)),
Arc::new(Mutex::new(r5)),
].to_vec(),
threads: [
Arc::new(None),
Arc::new(None),
Arc::new(None),
Arc::new(None),
Arc::new(None),
].to_vec(),
};
let exit = Arc::new(AtomicBool::new(false));
Services {
lock: Arc::new(RwLock::new(locked)),
exit: exit,
}
}
pub fn source<F>(&self, port: Port, func: F) -> Result<()>
where
F: Send + 'static + Fn(&Ports) -> Result<()>,
{
let mut w = self.lock.write().unwrap();
let pz = port.to_usize();
if w.threads[pz].is_some() {
return Err(Error::Services);
}
let c_ports = w.ports.clone();
let c_exit = self.exit.clone();
let j = spawn(move || loop {
match func(&c_ports) {
Ok(()) => (),
e => return e,
}
if c_exit.load(Ordering::Relaxed) == true {
return Ok(());
}
});
w.threads[pz] = Arc::new(Some(j));
return Ok(());
}
pub fn listen<F>(&mut self, port: Port, func: F) -> Result<()>
where
F: Send + 'static + Fn(&Ports, Data) -> Result<()>,
{
let mut w = self.lock.write().unwrap();
let pz = port.to_usize();
if w.threads[pz].is_some() {
return Err(Error::Services);
}
let recv_lock = w.readers[pz].clone();
let c_ports = w.ports.clone();
let c_exit = self.exit.clone();
let j: JoinHandle<Result<()>> = spawn(move || loop {
let recv = recv_lock.lock().unwrap();
let timer = Duration::new(0, 500000);
match recv.recv_timeout(timer) {
Ok(val) => func(&c_ports, val).expect("services listen"),
_ => (),
}
if c_exit.load(Ordering::Relaxed) {
return Ok(());
}
});
w.threads[pz] = Arc::new(Some(j));
return Ok(());
}
pub fn send(ports: &Ports, to: Port, m: Data) -> Result<()> {
ports[to.to_usize()]
.send(m)
.or_else(|_| Err(Error::SendError))
}
pub fn join(&mut self) -> Result<()> {
let pz = Port::Main.to_usize();
let recv = self.lock.write().unwrap().readers[pz].clone();
recv.lock().unwrap().recv()?;
self.shutdown()?;
return Ok(());
}
pub fn shutdown(&mut self) -> Result<()> {
self.exit.store(true, Ordering::Relaxed);
let r = self.lock.read().unwrap();
for t in r.threads.iter() {
match Arc::try_unwrap((*t).clone()) {
Ok(Some(j)) => j.join()??,
_ => (),
};
}
return Ok(());
}
}
#[cfg(test)]
mod test {
use services::Services;
use services::Port::{Accountant, Main, PacketReader};
use services::Data::Signal;
use std::sync::{Arc, Mutex};
#[test]
fn test_init() {
let mut o = Services::new();
assert_matches!(o.shutdown(), Ok(()));
}
#[test]
fn test_join() {
let mut o = Services::new();
assert_matches!(
o.source(PacketReader, move |ports| Services::send(
ports,
Main,
Signal
)),
Ok(())
);
assert_matches!(o.join(), Ok(()));
}
#[test]
fn test_source() {
let mut o = Services::new();
assert_matches!(
o.source(PacketReader, move |ports| Services::send(
ports,
Main,
Signal
)),
Ok(())
);
assert!(o.source(PacketReader, move |_ports| Ok(())).is_err());
assert!(o.listen(PacketReader, move |_ports, _data| Ok(())).is_err());
assert_matches!(o.join(), Ok(()));
}
#[test]
fn test_listen() {
let mut o = Services::new();
let val = Arc::new(Mutex::new(false));
assert_matches!(
o.source(PacketReader, move |ports| Services::send(
ports,
Accountant,
Signal
)),
Ok(())
);
let c_val = val.clone();
assert_matches!(
o.listen(Accountant, move |ports, data| match data {
Signal => {
*c_val.lock().unwrap() = true;
Services::send(ports, Main, Signal)
}
_ => Ok(()),
}),
Ok(())
);
assert_matches!(o.join(), Ok(()));
assert_eq!(*val.lock().unwrap(), true);
}
}

View File

@ -9,6 +9,9 @@ use result::{Error, Result};
const BLOCK_SIZE: usize = 1024 * 8;
pub const PACKET_SIZE: usize = 256;
pub const RESP_SIZE: usize = 64 * 1024;
pub const NUM_RESP: usize = (BLOCK_SIZE * PACKET_SIZE) / RESP_SIZE;
#[derive(Clone, Default)]
pub struct Meta {
pub size: usize,
@ -85,21 +88,48 @@ impl Meta {
}
#[derive(Clone, Debug, Default)]
pub struct PacketData {
pub struct Packets {
pub packets: Vec<Packet>,
}
pub type SharedPacketData = Arc<RwLock<PacketData>>;
pub type Recycler = Arc<Mutex<Vec<SharedPacketData>>>;
pub type Receiver = mpsc::Receiver<SharedPacketData>;
pub type Sender = mpsc::Sender<SharedPacketData>;
#[derive(Clone, Debug, Default)]
pub struct Response {
pub resp: [u8; RESP_SIZE],
pub meta: Meta,
}
impl PacketData {
pub fn new() -> PacketData {
PacketData {
pub struct Responses {
pub responses: Vec<Response>,
}
pub type SharedPackets = Arc<RwLock<Packets>>;
pub type PacketRecycler = Arc<Mutex<Vec<SharedPackets>>>;
pub type Receiver = mpsc::Receiver<SharedPackets>;
pub type Sender = mpsc::Sender<SharedPackets>;
pub type SharedResponses = Arc<RwLock<Response>>;
pub type ResponseRecycler = Arc<Mutex<Vec<SharedResponses>>>;
pub type Responder = mpsc::Sender<SharedResponses>;
pub type ResponseReceiver = mpsc::Receiver<SharedResponses>;
impl Default for Responses {
pub fn default() -> Responses {
Responses {
packets: vec![Response::default(); NUM_RESP],
}
}
}
impl Default for Packets {
pub fn default() -> Packets {
Packets {
packets: vec![Packet::default(); BLOCK_SIZE],
}
}
}
impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
self.packets.resize(BLOCK_SIZE, Packet::default());
let mut i = 0;
@ -142,13 +172,17 @@ impl PacketData {
}
}
pub fn allocate(recycler: &Recycler) -> SharedPacketData {
pub fn allocate<T>(recycler: &Arc<Mutex<Vec<T>>>) -> Arc<RwLock<T>>
where T: Default
{
let mut gc = recycler.lock().expect("lock");
gc.pop()
.unwrap_or_else(|| Arc::new(RwLock::new(PacketData::new())))
.unwrap_or_else(|| Arc::new(RwLock::new(T::default())))
}
pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) {
pub fn recycle<T>(recycler: &Arc<Mutex<Vec<T>>>, msgs: Arc<RwLock<T>>)
where T: Default
{
let mut gc = recycler.lock().expect("lock");
gc.push(msgs);
}
@ -156,7 +190,7 @@ pub fn recycle(recycler: &Recycler, msgs: SharedPacketData) {
fn recv_loop(
sock: &UdpSocket,
exit: &Arc<AtomicBool>,
recycler: &Recycler,
recycler: &PacketRecycler,
channel: &Sender,
) -> Result<()> {
loop {
@ -182,7 +216,7 @@ fn recv_loop(
pub fn receiver(
sock: UdpSocket,
exit: Arc<AtomicBool>,
recycler: Recycler,
recycler: PacketRecycler,
channel: Sender,
) -> Result<JoinHandle<()>> {
let timer = Duration::new(1, 0);
@ -193,7 +227,7 @@ pub fn receiver(
}))
}
fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()> {
fn recv_send(sock: &UdpSocket, recycler: &ResponseRecycler, r: &ResponseReceiver) -> Result<()> {
let timer = Duration::new(1, 0);
let msgs = r.recv_timeout(timer)?;
let msgs_ = msgs.clone();
@ -203,11 +237,11 @@ fn recv_send(sock: &UdpSocket, recycler: &Recycler, r: &Receiver) -> Result<()>
Ok(())
}
pub fn sender(
pub fn responder(
sock: UdpSocket,
exit: Arc<AtomicBool>,
recycler: Recycler,
r: Receiver,
recycler: ResponseRecycler,
r: ResponseReceiver,
) -> JoinHandle<()> {
spawn(move || loop {
if recv_send(&sock, &recycler, &r).is_err() && exit.load(Ordering::Relaxed) {
@ -316,7 +350,7 @@ mod test {
use std::sync::mpsc::channel;
use std::io::Write;
use std::io;
use streamer::{allocate, receiver, sender, Packet, Receiver, PACKET_SIZE};
use streamer::{allocate, receiver, responder, Packet, Receiver, PACKET_SIZE};
fn get_msgs(r: Receiver, num: &mut usize) {
for _t in 0..5 {
@ -340,8 +374,8 @@ mod test {
let recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
let (s_sender, r_sender) = channel();
let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender);
let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), recycler.clone(), r_responder);
let msgs = allocate(&recycler);
msgs.write().unwrap().packets.resize(10, Packet::default());
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
@ -350,13 +384,13 @@ mod test {
w.set_addr(&addr);
assert_eq!(w.get_addr(), addr);
}
s_sender.send(msgs).expect("send");
s_responder.send(msgs).expect("send");
let mut num = 0;
get_msgs(r_reader, &mut num);
assert_eq!(num, 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_sender.join().expect("join");
t_responder.join().expect("join");
}
#[test]
pub fn streamer_debug() {
@ -368,25 +402,26 @@ mod test {
let addr = read.local_addr().unwrap();
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let recycler = Arc::new(Mutex::new(Vec::new()));
let packet_recycler = Arc::new(Mutex::new(Vec::new()));
let resp_recycler = Arc::new(Mutex::new(Vec::new()));
let (s_reader, r_reader) = channel();
let t_receiver = receiver(read, exit.clone(), recycler.clone(), s_reader).unwrap();
let (s_sender, r_sender) = channel();
let t_sender = sender(send, exit.clone(), recycler.clone(), r_sender);
let msgs = allocate(&recycler);
msgs.write().unwrap().packets.resize(10, Packet::default());
for (i, w) in msgs.write().unwrap().packets.iter_mut().enumerate() {
let t_receiver = receiver(read, exit.clone(), packet_recycler.clone(), s_reader).unwrap();
let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let msgs = allocate(&resp_recycler);
msgs.write().unwrap().responses.resize(10, Responses::default());
for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() {
w.data[0] = i as u8;
w.meta.size = PACKET_SIZE;
w.meta.set_addr(&addr);
assert_eq!(w.meta.get_addr(), addr);
}
s_sender.send(msgs).expect("send");
s_responder.send(msgs).expect("send");
let mut num = 0;
get_msgs(r_reader, &mut num);
assert_eq!(num, 10);
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("join");
t_sender.join().expect("join");
t_responder.join().expect("join");
}
}