More precise function names
This commit is contained in:
parent
64a892321a
commit
4fdd891b54
33
src/tpu.rs
33
src/tpu.rs
|
@ -205,7 +205,20 @@ impl Tpu {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn deserialize_packets(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
|
pub fn deserialize_requests(p: &packet::Packets) -> Vec<Option<(Request, SocketAddr)>> {
|
||||||
|
p.packets
|
||||||
|
.par_iter()
|
||||||
|
.map(|x| {
|
||||||
|
deserialize(&x.data[0..x.meta.size])
|
||||||
|
.map(|req| (req, x.meta.addr()))
|
||||||
|
.ok()
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy-paste of deserialize_requests() because I can't figure out how to
|
||||||
|
// route the lifetimes in a generic version.
|
||||||
|
pub fn deserialize_events(p: &packet::Packets) -> Vec<Option<(Event, SocketAddr)>> {
|
||||||
p.packets
|
p.packets
|
||||||
.par_iter()
|
.par_iter()
|
||||||
.map(|x| {
|
.map(|x| {
|
||||||
|
@ -263,7 +276,7 @@ impl Tpu {
|
||||||
Ok(blobs)
|
Ok(blobs)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process(
|
fn process_request_packets(
|
||||||
obj: &Tpu,
|
obj: &Tpu,
|
||||||
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
verified_receiver: &Receiver<Vec<(SharedPackets, Vec<u8>)>>,
|
||||||
responder_sender: &streamer::BlobSender,
|
responder_sender: &streamer::BlobSender,
|
||||||
|
@ -283,7 +296,7 @@ impl Tpu {
|
||||||
);
|
);
|
||||||
let proc_start = Instant::now();
|
let proc_start = Instant::now();
|
||||||
for (msgs, vers) in mms {
|
for (msgs, vers) in mms {
|
||||||
let reqs = Self::deserialize_packets(&msgs.read().unwrap());
|
let reqs = Self::deserialize_requests(&msgs.read().unwrap());
|
||||||
reqs_len += reqs.len();
|
reqs_len += reqs.len();
|
||||||
let req_vers = reqs.into_iter()
|
let req_vers = reqs.into_iter()
|
||||||
.zip(vers)
|
.zip(vers)
|
||||||
|
@ -425,7 +438,7 @@ impl Tpu {
|
||||||
|
|
||||||
let tpu = obj.clone();
|
let tpu = obj.clone();
|
||||||
let t_server = spawn(move || loop {
|
let t_server = spawn(move || loop {
|
||||||
let e = Self::process(
|
let e = Self::process_request_packets(
|
||||||
&mut tpu.clone(),
|
&mut tpu.clone(),
|
||||||
&verified_receiver,
|
&verified_receiver,
|
||||||
&responder_sender,
|
&responder_sender,
|
||||||
|
@ -572,7 +585,7 @@ impl Tpu {
|
||||||
let tpu = obj.clone();
|
let tpu = obj.clone();
|
||||||
let s_exit = exit.clone();
|
let s_exit = exit.clone();
|
||||||
let t_server = spawn(move || loop {
|
let t_server = spawn(move || loop {
|
||||||
let e = Self::process(
|
let e = Self::process_request_packets(
|
||||||
&mut tpu.clone(),
|
&mut tpu.clone(),
|
||||||
&verified_receiver,
|
&verified_receiver,
|
||||||
&responder_sender,
|
&responder_sender,
|
||||||
|
@ -606,7 +619,7 @@ impl Tpu {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub fn to_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
|
pub fn to_request_packets(r: &packet::PacketRecycler, reqs: Vec<Request>) -> Vec<SharedPackets> {
|
||||||
let mut out = vec![];
|
let mut out = vec![];
|
||||||
for rrs in reqs.chunks(packet::NUM_PACKETS) {
|
for rrs in reqs.chunks(packet::NUM_PACKETS) {
|
||||||
let p = r.allocate();
|
let p = r.allocate();
|
||||||
|
@ -664,7 +677,7 @@ mod tests {
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use streamer;
|
use streamer;
|
||||||
use tpu::{test_node, to_packets, Request, Tpu};
|
use tpu::{test_node, to_request_packets, Request, Tpu};
|
||||||
use transaction::{memfind, test_tx, Transaction};
|
use transaction::{memfind, test_tx, Transaction};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -679,15 +692,15 @@ mod tests {
|
||||||
fn test_to_packets() {
|
fn test_to_packets() {
|
||||||
let tr = Request::Transaction(test_tx());
|
let tr = Request::Transaction(test_tx());
|
||||||
let re = PacketRecycler::default();
|
let re = PacketRecycler::default();
|
||||||
let rv = to_packets(&re, vec![tr.clone(); 1]);
|
let rv = to_request_packets(&re, vec![tr.clone(); 1]);
|
||||||
assert_eq!(rv.len(), 1);
|
assert_eq!(rv.len(), 1);
|
||||||
assert_eq!(rv[0].read().unwrap().packets.len(), 1);
|
assert_eq!(rv[0].read().unwrap().packets.len(), 1);
|
||||||
|
|
||||||
let rv = to_packets(&re, vec![tr.clone(); NUM_PACKETS]);
|
let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS]);
|
||||||
assert_eq!(rv.len(), 1);
|
assert_eq!(rv.len(), 1);
|
||||||
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
|
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
|
||||||
|
|
||||||
let rv = to_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]);
|
let rv = to_request_packets(&re, vec![tr.clone(); NUM_PACKETS + 1]);
|
||||||
assert_eq!(rv.len(), 2);
|
assert_eq!(rv.len(), 2);
|
||||||
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
|
assert_eq!(rv[0].read().unwrap().packets.len(), NUM_PACKETS);
|
||||||
assert_eq!(rv[1].read().unwrap().packets.len(), 1);
|
assert_eq!(rv[1].read().unwrap().packets.len(), 1);
|
||||||
|
|
Loading…
Reference in New Issue