diff --git a/src/tpu.rs b/src/tpu.rs index cc01f957e..f8ce29017 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -205,7 +205,20 @@ impl Tpu { Ok(()) } - pub fn deserialize_packets(p: &packet::Packets) -> Vec> { + pub fn deserialize_requests(p: &packet::Packets) -> Vec> { + p.packets + .par_iter() + .map(|x| { + deserialize(&x.data[0..x.meta.size]) + .map(|req| (req, x.meta.addr())) + .ok() + }) + .collect() + } + + // Copy-paste of deserialize_requests() because I can't figure out how to + // route the lifetimes in a generic version. + pub fn deserialize_events(p: &packet::Packets) -> Vec> { p.packets .par_iter() .map(|x| { @@ -263,7 +276,7 @@ impl Tpu { Ok(blobs) } - fn process( + fn process_request_packets( obj: &Tpu, verified_receiver: &Receiver)>>, responder_sender: &streamer::BlobSender, @@ -283,7 +296,7 @@ impl Tpu { ); let proc_start = Instant::now(); for (msgs, vers) in mms { - let reqs = Self::deserialize_packets(&msgs.read().unwrap()); + let reqs = Self::deserialize_requests(&msgs.read().unwrap()); reqs_len += reqs.len(); let req_vers = reqs.into_iter() .zip(vers) @@ -425,7 +438,7 @@ impl Tpu { let tpu = obj.clone(); let t_server = spawn(move || loop { - let e = Self::process( + let e = Self::process_request_packets( &mut tpu.clone(), &verified_receiver, &responder_sender, @@ -572,7 +585,7 @@ impl Tpu { let tpu = obj.clone(); let s_exit = exit.clone(); let t_server = spawn(move || loop { - let e = Self::process( + let e = Self::process_request_packets( &mut tpu.clone(), &verified_receiver, &responder_sender, @@ -606,7 +619,7 @@ impl Tpu { } #[cfg(test)] -pub fn to_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { +pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec) -> Vec { let mut out = vec![]; for rrs in reqs.chunks(packet::NUM_PACKETS) { let p = r.allocate(); @@ -664,7 +677,7 @@ mod tests { use std::sync::{Arc, RwLock}; use std::time::Duration; use streamer; - use tpu::{test_node, to_packets, Request, Tpu}; + use tpu::{test_node, to_request_packets, Request, Tpu}; use transaction::{memfind, test_tx, Transaction}; #[test] @@ -679,15 +692,15 @@ mod tests { fn test_to_packets() { let tr = Request::Transaction(test_tx()); let re = PacketRecycler::default(); - let rv = to_packets(&re, vec![tr.clone(); 1]); + let rv = to_request_packets(&re, vec![tr.clone(); 1]); assert_eq!(rv.len(), 1); assert_eq!(rv[0].read().unwrap().packets.len(), 1); - let rv = to_packets(&re, vec![tr.clone(); NUM_PACKETS]); + let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]); assert_eq!(rv.len(), 1); assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); - let rv = to_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]); + let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]); assert_eq!(rv.len(), 2); assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS); assert_eq!(rv[1].read().unwrap().packets.len(), 1);