packet.rs optimizations (#3818)

* packet.rs optimizations

* remove redundant and aggressive metric submission

* remove metrics submit(), get compiling again, honor log level in inc()
This commit is contained in:
Rob Walker 2019-04-17 14:14:57 -07:00 committed by GitHub
parent 51a2988bb2
commit 01657ddfe7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 53 deletions

View File

@ -83,7 +83,7 @@ fn main() -> Result<()> {
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
read_channels.push(r_reader); read_channels.push(r_reader);
read_threads.push(receiver(Arc::new(read), &exit, s_reader, "bench-streamer")); read_threads.push(receiver(Arc::new(read), &exit, s_reader));
} }
let t_producer1 = producer(&addr, exit.clone()); let t_producer1 = producer(&addr, exit.clone());

View File

@ -44,7 +44,7 @@ impl FetchStage {
) -> Self { ) -> Self {
let tpu_threads = sockets let tpu_threads = sockets
.into_iter() .into_iter()
.map(|socket| streamer::receiver(socket, &exit, sender.clone(), "fetch-stage")); .map(|socket| streamer::receiver(socket, &exit, sender.clone()));
let tpu_via_blobs_threads = tpu_via_blobs_sockets let tpu_via_blobs_threads = tpu_via_blobs_sockets
.into_iter() .into_iter()

View File

@ -65,7 +65,7 @@ impl fmt::Debug for Packet {
impl Default for Packet { impl Default for Packet {
fn default() -> Packet { fn default() -> Packet {
Packet { Packet {
data: [0u8; PACKET_DATA_SIZE], data: unsafe { std::mem::uninitialized() },
meta: Meta::default(), meta: Meta::default(),
} }
} }
@ -126,7 +126,7 @@ pub struct Packets {
impl Default for Packets { impl Default for Packets {
fn default() -> Packets { fn default() -> Packets {
Packets { Packets {
packets: vec![Packet::default(); NUM_PACKETS], packets: Vec::with_capacity(NUM_RCVMMSGS),
} }
} }
} }
@ -208,8 +208,7 @@ pub enum BlobError {
} }
impl Packets { impl Packets {
fn run_read_from(&mut self, socket: &UdpSocket) -> Result<usize> { pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<usize> {
self.packets.resize(NUM_PACKETS, Packet::default());
let mut i = 0; let mut i = 0;
//DOCUMENTED SIDE-EFFECT //DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll //Performance out of the IO without poll
@ -220,11 +219,10 @@ impl Packets {
socket.set_nonblocking(false)?; socket.set_nonblocking(false)?;
trace!("receiving on {}", socket.local_addr().unwrap()); trace!("receiving on {}", socket.local_addr().unwrap());
loop { loop {
self.packets.resize(i + NUM_RCVMMSGS, Packet::default());
match recv_mmsg(socket, &mut self.packets[i..]) { match recv_mmsg(socket, &mut self.packets[i..]) {
Err(_) if i > 0 => { Err(_) if i > 0 => {
inc_new_counter_info!("packets-recv_count", i); break;
debug!("got {:?} messages on {}", i, socket.local_addr().unwrap());
return Ok(i);
} }
Err(e) => { Err(e) => {
trace!("recv_from err {:?}", e); trace!("recv_from err {:?}", e);
@ -237,19 +235,16 @@ impl Packets {
trace!("got {} packets", npkts); trace!("got {} packets", npkts);
i += npkts; i += npkts;
if npkts != NUM_RCVMMSGS || i >= 1024 { if npkts != NUM_RCVMMSGS || i >= 1024 {
break;
}
}
}
}
self.packets.truncate(i);
inc_new_counter_info!("packets-recv_count", i); inc_new_counter_info!("packets-recv_count", i);
return Ok(i); Ok(i)
}
}
}
}
}
pub fn recv_from(&mut self, socket: &UdpSocket) -> Result<()> {
let sz = self.run_read_from(socket)?;
self.packets.resize(sz, Packet::default());
debug!("recv_from: {}", sz);
Ok(())
} }
pub fn send_to(&self, socket: &UdpSocket) -> Result<()> { pub fn send_to(&self, socket: &UdpSocket) -> Result<()> {
for p in &self.packets { for p in &self.packets {
let a = p.meta.addr(); let a = p.meta.addr();
@ -615,19 +610,26 @@ mod tests {
#[test] #[test]
pub fn packet_send_recv() { pub fn packet_send_recv() {
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); solana_logger::setup();
let addr = reader.local_addr().unwrap(); let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); let addr = recv_socket.local_addr().unwrap();
let saddr = sender.local_addr().unwrap(); let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let p = SharedPackets::default(); let saddr = send_socket.local_addr().unwrap();
p.write().unwrap().packets.resize(10, Packet::default()); let mut p = Packets::default();
for m in p.write().unwrap().packets.iter_mut() {
p.packets.resize(10, Packet::default());
for m in p.packets.iter_mut() {
m.meta.set_addr(&addr); m.meta.set_addr(&addr);
m.meta.size = PACKET_DATA_SIZE; m.meta.size = PACKET_DATA_SIZE;
} }
p.read().unwrap().send_to(&sender).unwrap(); p.send_to(&send_socket).unwrap();
p.write().unwrap().recv_from(&reader).unwrap();
for m in p.write().unwrap().packets.iter_mut() { let recvd = p.recv_from(&recv_socket).unwrap();
assert_eq!(recvd, p.packets.len());
for m in p.packets {
assert_eq!(m.meta.size, PACKET_DATA_SIZE); assert_eq!(m.meta.size, PACKET_DATA_SIZE);
assert_eq!(m.meta.addr(), saddr); assert_eq!(m.meta.addr(), saddr);
} }

View File

@ -129,12 +129,7 @@ fn create_request_processor(
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let storage_socket = Arc::new(socket); let storage_socket = Arc::new(socket);
let t_receiver = receiver( let t_receiver = receiver(storage_socket.clone(), exit, s_reader);
storage_socket.clone(),
exit,
s_reader,
"replicator-receiver",
);
thread_handles.push(t_receiver); thread_handles.push(t_receiver);
let t_responder = responder("replicator-responder", storage_socket.clone(), r_responder); let t_responder = responder("replicator-responder", storage_socket.clone(), r_responder);

View File

@ -6,7 +6,6 @@ use crate::packet::{
}; };
use crate::result::{Error, Result}; use crate::result::{Error, Result};
use bincode; use bincode;
use solana_metrics::{influxdb, submit};
use solana_sdk::timing::duration_as_ms; use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -20,28 +19,17 @@ pub type PacketSender = Sender<SharedPackets>;
pub type BlobSender = Sender<SharedBlobs>; pub type BlobSender = Sender<SharedBlobs>;
pub type BlobReceiver = Receiver<SharedBlobs>; pub type BlobReceiver = Receiver<SharedBlobs>;
fn recv_loop( fn recv_loop(sock: &UdpSocket, exit: Arc<AtomicBool>, channel: &PacketSender) -> Result<()> {
sock: &UdpSocket,
exit: Arc<AtomicBool>,
channel: &PacketSender,
channel_tag: &'static str,
) -> Result<()> {
loop { loop {
let msgs = SharedPackets::default(); let mut msgs = Packets::default();
loop { loop {
// Check for exit signal, even if socket is busy // Check for exit signal, even if socket is busy
// (for instance the leader trasaction socket) // (for instance the leader trasaction socket)
if exit.load(Ordering::Relaxed) { if exit.load(Ordering::Relaxed) {
return Ok(()); return Ok(());
} }
if msgs.write().unwrap().recv_from(sock).is_ok() { if let Ok(_len) = msgs.recv_from(sock) {
let len = msgs.read().unwrap().packets.len(); channel.send(Arc::new(RwLock::new(msgs)))?;
submit(
influxdb::Point::new(channel_tag)
.add_field("count", influxdb::Value::Integer(len as i64))
.to_owned(),
);
channel.send(msgs)?;
break; break;
} }
} }
@ -52,7 +40,6 @@ pub fn receiver(
sock: Arc<UdpSocket>, sock: Arc<UdpSocket>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
packet_sender: PacketSender, packet_sender: PacketSender,
sender_tag: &'static str,
) -> JoinHandle<()> { ) -> JoinHandle<()> {
let res = sock.set_read_timeout(Some(Duration::new(1, 0))); let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
if res.is_err() { if res.is_err() {
@ -62,7 +49,7 @@ pub fn receiver(
Builder::new() Builder::new()
.name("solana-receiver".to_string()) .name("solana-receiver".to_string())
.spawn(move || { .spawn(move || {
let _ = recv_loop(&sock, exit, &packet_sender, sender_tag); let _ = recv_loop(&sock, exit, &packet_sender);
}) })
.unwrap() .unwrap()
} }
@ -236,7 +223,7 @@ mod test {
let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = channel(); let (s_reader, r_reader) = channel();
let t_receiver = receiver(Arc::new(read), &exit, s_reader, "streamer-test"); let t_receiver = receiver(Arc::new(read), &exit, s_reader);
let t_responder = { let t_responder = {
let (s_responder, r_responder) = channel(); let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);

View File

@ -38,6 +38,13 @@ macro_rules! inc_counter {
}; };
} }
#[macro_export]
macro_rules! inc_counter_info {
($name:expr, $count:expr) => {
unsafe { $name.inc(log::Level::Info, $count) };
};
}
#[macro_export] #[macro_export]
macro_rules! inc_new_counter { macro_rules! inc_new_counter {
($name:expr, $count:expr, $level:expr, $lograte:expr) => {{ ($name:expr, $count:expr, $level:expr, $lograte:expr) => {{
@ -89,7 +96,7 @@ impl Counter {
self.lograte.store(lograte, Ordering::Relaxed); self.lograte.store(lograte, Ordering::Relaxed);
} }
if times % lograte == 0 && times > 0 && log_enabled!(level) { if times % lograte == 0 && times > 0 && log_enabled!(level) {
info!( log!(level,
"COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}, \"events\": {}}}", "COUNTER:{{\"name\": \"{}\", \"counts\": {}, \"samples\": {}, \"now\": {}, \"events\": {}}}",
self.name, self.name,
counts + events, counts + events,