This commit is contained in:
Anatoly Yakovenko 2018-03-25 00:06:48 -07:00
parent 8ad90807ee
commit e39c0b34e5
2 changed files with 64 additions and 31 deletions

View File

@ -110,7 +110,9 @@ impl AccountantSkel {
let req = deserialize(&packet.data[0..sz])?;
if let Some(resp) = self.process_request(req) {
if ursps.len() <= num {
ursps.responses.resize(num * 2, streamer::Response::default());
ursps
.responses
.resize(num * 2, streamer::Response::default());
}
let rsp = &mut ursps.responses[num];
let v = serialize(&resp)?;
@ -146,12 +148,18 @@ impl AccountantSkel {
let t_receiver = streamer::receiver(read, exit.clone(), packet_recycler.clone(), s_reader)?;
let (s_responder, r_responder) = channel();
let t_responder = streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder);
let t_responder =
streamer::responder(write, exit.clone(), response_recycler.clone(), r_responder);
let t_server = spawn(move || {
if let Ok(me) = Arc::try_unwrap(obj) {
loop {
let e = me.lock().unwrap().process(&r_reader, &s_responder, &packet_recycler, &response_recycler);
let e = me.lock().unwrap().process(
&r_reader,
&s_responder,
&packet_recycler,
&response_recycler,
);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}

View File

@ -25,6 +25,7 @@ pub struct Packet {
pub data: [u8; PACKET_SIZE],
pub meta: Meta,
}
impl fmt::Debug for Packet {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
@ -35,6 +36,7 @@ impl fmt::Debug for Packet {
)
}
}
impl Default for Packet {
fn default() -> Packet {
Packet {
@ -43,6 +45,7 @@ impl Default for Packet {
}
}
}
impl Meta {
pub fn get_addr(&self) -> SocketAddr {
if !self.v6 {
@ -87,48 +90,65 @@ impl Meta {
}
}
#[derive(Clone, Debug, Default)]
pub struct Packets {
pub packets: Vec<Packet>,
}
#[derive(Clone, Debug, Default)]
impl Default for Packets {
fn default() -> Packets {
Packets {
packets: vec![Packet::default(); BLOCK_SIZE],
}
}
}
#[derive(Clone)]
pub struct Response {
pub resp: [u8; RESP_SIZE],
pub data: [u8; RESP_SIZE],
pub meta: Meta,
}
impl fmt::Debug for Response {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Response {{ size: {:?}, addr: {:?} }}",
self.meta.size,
self.meta.get_addr()
)
}
}
impl Default for Response {
fn default() -> Response {
Response {
data: [0u8; RESP_SIZE],
meta: Meta::default(),
}
}
}
pub struct Responses {
pub responses: Vec<Response>,
}
impl Default for Responses {
fn default() -> Responses {
Responses {
responses: vec![Response::default(); NUM_RESP],
}
}
}
pub type SharedPackets = Arc<RwLock<Packets>>;
pub type PacketRecycler = Arc<Mutex<Vec<SharedPackets>>>;
pub type Receiver = mpsc::Receiver<SharedPackets>;
pub type Sender = mpsc::Sender<SharedPackets>;
pub type SharedResponses = Arc<RwLock<Response>>;
pub type SharedResponses = Arc<RwLock<Responses>>;
pub type ResponseRecycler = Arc<Mutex<Vec<SharedResponses>>>;
pub type Responder = mpsc::Sender<SharedResponses>;
pub type ResponseReceiver = mpsc::Receiver<SharedResponses>;
impl Default for Responses {
pub fn default() -> Responses {
Responses {
packets: vec![Response::default(); NUM_RESP],
}
}
}
impl Default for Packets {
pub fn default() -> Packets {
Packets {
packets: vec![Packet::default(); BLOCK_SIZE],
}
}
}
impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> {
self.packets.resize(BLOCK_SIZE, Packet::default());
@ -172,16 +192,18 @@ impl Packets {
}
}
pub fn allocate<T>(recycler: &Arc<Mutex<Vec<T>>>) -> Arc<RwLock<T>>
where T: Default
pub fn allocate<T>(recycler: &Arc<Mutex<Vec<Arc<RwLock<T>>>>>) -> Arc<RwLock<T>>
where
T: Default,
{
let mut gc = recycler.lock().expect("lock");
gc.pop()
.unwrap_or_else(|| Arc::new(RwLock::new(T::default())))
.unwrap_or_else(|| Arc::new(RwLock::new(Default::default())))
}
pub fn recycle<T>(recycler: &Arc<Mutex<Vec<T>>>, msgs: Arc<RwLock<T>>)
where T: Default
pub fn recycle<T>(recycler: &Arc<Mutex<Vec<Arc<RwLock<T>>>>>, msgs: Arc<RwLock<T>>)
where
T: Default,
{
let mut gc = recycler.lock().expect("lock");
gc.push(msgs);
@ -350,7 +372,7 @@ mod test {
use std::sync::mpsc::channel;
use std::io::Write;
use std::io;
use streamer::{allocate, receiver, responder, Packet, Receiver, PACKET_SIZE};
use streamer::{allocate, receiver, responder, Packet, Receiver, Responses, PACKET_SIZE};
fn get_msgs(r: Receiver, num: &mut usize) {
for _t in 0..5 {
@ -409,7 +431,10 @@ mod test {
let (s_responder, r_responder) = channel();
let t_responder = responder(send, exit.clone(), resp_recycler.clone(), r_responder);
let msgs = allocate(&resp_recycler);
msgs.write().unwrap().responses.resize(10, Responses::default());
msgs.write()
.unwrap()
.responses
.resize(10, Responses::default());
for (i, w) in msgs.write().unwrap().responses.iter_mut().enumerate() {
w.data[0] = i as u8;
w.meta.size = PACKET_SIZE;