From a09d2e252af6607eb692709ec4cbd34b990eec06 Mon Sep 17 00:00:00 2001 From: Greg Fitzgerald Date: Thu, 9 Aug 2018 12:54:23 -0600 Subject: [PATCH] Move window dependencies out of streamer No tests!? --- src/broadcaster.rs | 230 ++++++++++++++++++++++++++++++++++++++++++++ src/fullnode.rs | 3 +- src/lib.rs | 1 + src/streamer.rs | 215 +---------------------------------------- src/window_stage.rs | 3 +- 5 files changed, 237 insertions(+), 215 deletions(-) create mode 100644 src/broadcaster.rs diff --git a/src/broadcaster.rs b/src/broadcaster.rs new file mode 100644 index 000000000..8d618c4b2 --- /dev/null +++ b/src/broadcaster.rs @@ -0,0 +1,230 @@ +//! The `broadcaster` broadcasts data from a leader node to validators +//! +use counter::Counter; +use crdt::{Crdt, CrdtError, NodeInfo}; +#[cfg(feature = "erasure")] +use erasure; +use log::Level; +use packet::BlobRecycler; +use result::{Error, Result}; +use std::mem; +use std::net::UdpSocket; +use std::sync::atomic::AtomicUsize; +use std::sync::mpsc::RecvTimeoutError; +use std::sync::{Arc, RwLock}; +use std::thread::{Builder, JoinHandle}; +use std::time::Duration; +use streamer::BlobReceiver; +use window::{self, SharedWindow, WindowIndex, WINDOW_SIZE}; + +fn broadcast( + node_info: &NodeInfo, + broadcast_table: &[NodeInfo], + window: &SharedWindow, + recycler: &BlobRecycler, + r: &BlobReceiver, + sock: &UdpSocket, + transmit_index: &mut WindowIndex, + receive_index: &mut u64, +) -> Result<()> { + let debug_id = node_info.debug_id(); + let timer = Duration::new(1, 0); + let mut dq = r.recv_timeout(timer)?; + while let Ok(mut nq) = r.try_recv() { + dq.append(&mut nq); + } + + // flatten deque to vec + let blobs_vec: Vec<_> = dq.into_iter().collect(); + + // We could receive more blobs than window slots so + // break them up into window-sized chunks to process + let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec()); + + if log_enabled!(Level::Trace) { + trace!("{}", window::print_window(debug_id, 
window, *receive_index)); + } + + for mut blobs in blobs_chunked { + let blobs_len = blobs.len(); + trace!("{:x}: broadcast blobs.len: {}", debug_id, blobs_len); + + // Index the blobs + window::index_blobs(node_info, &blobs, receive_index) + .expect("index blobs for initial window"); + + // keep the cache of blobs that are broadcast + inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); + { + let mut win = window.write().unwrap(); + assert!(blobs.len() <= win.len()); + for b in &blobs { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix % WINDOW_SIZE) as usize; + if let Some(x) = mem::replace(&mut win[pos].data, None) { + trace!( + "{:x} popped {} at {}", + debug_id, + x.read().unwrap().get_index().unwrap(), + pos + ); + recycler.recycle(x); + } + if let Some(x) = mem::replace(&mut win[pos].coding, None) { + trace!( + "{:x} popped {} at {}", + debug_id, + x.read().unwrap().get_index().unwrap(), + pos + ); + recycler.recycle(x); + } + + trace!("{:x} null {}", debug_id, pos); + } + while let Some(b) = blobs.pop() { + let ix = b.read().unwrap().get_index().expect("blob index"); + let pos = (ix % WINDOW_SIZE) as usize; + trace!("{:x} caching {} at {}", debug_id, ix, pos); + assert!(win[pos].data.is_none()); + win[pos].data = Some(b); + } + } + + // Fill in the coding blob data from the window data blobs + #[cfg(feature = "erasure")] + { + erasure::generate_coding( + debug_id, + &mut window.write().unwrap(), + recycler, + *receive_index, + blobs_len, + &mut transmit_index.coding, + )?; + } + + *receive_index += blobs_len as u64; + + // Send blobs out from the window + Crdt::broadcast( + &node_info, + &broadcast_table, + &window, + &sock, + transmit_index, + *receive_index, + )?; + } + Ok(()) +} + +/// Service to broadcast messages from the leader to layer 1 nodes. +/// See `crdt` for network layer definitions. +/// # Arguments +/// * `sock` - Socket to send from. +/// * `exit` - Boolean to signal system exit. 
+/// * `crdt` - CRDT structure +/// * `window` - Cache of blobs that we have broadcast +/// * `recycler` - Blob recycler. +/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. +pub fn broadcaster( + sock: UdpSocket, + crdt: Arc<RwLock<Crdt>>, + window: SharedWindow, + entry_height: u64, + recycler: BlobRecycler, + r: BlobReceiver, +) -> JoinHandle<()> { + Builder::new() + .name("solana-broadcaster".to_string()) + .spawn(move || { + let mut transmit_index = WindowIndex { + data: entry_height, + coding: entry_height, + }; + let mut receive_index = entry_height; + let me = crdt.read().unwrap().my_data().clone(); + loop { + let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); + if let Err(e) = broadcast( + &me, + &broadcast_table, + &window, + &recycler, + &r, + &sock, + &mut transmit_index, + &mut receive_index, + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? + _ => { + inc_new_counter_info!("streamer-broadcaster-error", 1, 1); + error!("broadcaster error: {:?}", e); + } + } + } + } + }) + .unwrap() +} + +fn retransmit( + crdt: &Arc<RwLock<Crdt>>, + recycler: &BlobRecycler, + r: &BlobReceiver, + sock: &UdpSocket, +) -> Result<()> { + let timer = Duration::new(1, 0); + let mut dq = r.recv_timeout(timer)?; + while let Ok(mut nq) = r.try_recv() { + dq.append(&mut nq); + } + { + for b in &dq { + Crdt::retransmit(&crdt, b, sock)?; + } + } + while let Some(b) = dq.pop_front() { + recycler.recycle(b); + } + Ok(()) +} + +/// Service to retransmit messages from the leader to layer 1 nodes. +/// See `crdt` for network layer definitions. +/// # Arguments +/// * `sock` - Socket to read from. Read timeout is set to 1. +/// * `exit` - Boolean to signal system exit. +/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. 
+/// * `recycler` - Blob recycler. +/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. +pub fn retransmitter( + sock: UdpSocket, + crdt: Arc<RwLock<Crdt>>, + recycler: BlobRecycler, + r: BlobReceiver, +) -> JoinHandle<()> { + Builder::new() + .name("solana-retransmitter".to_string()) + .spawn(move || { + trace!("retransmitter started"); + loop { + if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => { + inc_new_counter_info!("streamer-retransmit-error", 1, 1); + error!("retransmitter error: {:?}", e); + } + } + } + } + trace!("exiting retransmitter"); + }) + .unwrap() +} diff --git a/src/fullnode.rs b/src/fullnode.rs index 2491ee047..0cd9444fb 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -1,6 +1,7 @@ //! The `fullnode` module hosts all the fullnode microservices. use bank::Bank; +use broadcaster; use crdt::{Crdt, NodeInfo, TestNode}; use entry::Entry; use ledger::{read_ledger, Block}; @@ -244,7 +245,7 @@ impl FullNode { ).expect("Ncp::new"); thread_hdls.extend(ncp.thread_hdls()); - let t_broadcast = streamer::broadcaster( + let t_broadcast = broadcaster::broadcaster( node.sockets.broadcast, crdt, window, diff --git a/src/lib.rs b/src/lib.rs index 554159f7f..6ef6b754f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ pub mod counter; pub mod bank; pub mod banking_stage; pub mod blob_fetch_stage; +pub mod broadcaster; pub mod budget; pub mod choose_gossip_peer_strategy; pub mod client; diff --git a/src/streamer.rs b/src/streamer.rs index 0fcc65578..0fe0acc53 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -1,7 +1,7 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! 
use counter::Counter; -use crdt::{Crdt, CrdtError, NodeInfo}; +use crdt::{Crdt, NodeInfo}; #[cfg(feature = "erasure")] use erasure; use log::Level; @@ -575,7 +575,7 @@ fn recv_window( Ok(()) } -fn print_window(debug_id: u64, window: &SharedWindow, consumed: u64) -> String { +pub fn print_window(debug_id: u64, window: &SharedWindow, consumed: u64) -> String { let pointer: Vec<_> = window .read() .unwrap() @@ -733,217 +733,6 @@ pub fn window( .unwrap() } -fn broadcast( - node_info: &NodeInfo, - broadcast_table: &[NodeInfo], - window: &SharedWindow, - recycler: &BlobRecycler, - r: &BlobReceiver, - sock: &UdpSocket, - transmit_index: &mut WindowIndex, - receive_index: &mut u64, -) -> Result<()> { - let debug_id = node_info.debug_id(); - let timer = Duration::new(1, 0); - let mut dq = r.recv_timeout(timer)?; - while let Ok(mut nq) = r.try_recv() { - dq.append(&mut nq); - } - - // flatten deque to vec - let blobs_vec: Vec<_> = dq.into_iter().collect(); - - // We could receive more blobs than window slots so - // break them up into window-sized chunks to process - let blobs_chunked = blobs_vec.chunks(WINDOW_SIZE as usize).map(|x| x.to_vec()); - - if log_enabled!(Level::Trace) { - trace!("{}", print_window(debug_id, window, *receive_index)); - } - - for mut blobs in blobs_chunked { - let blobs_len = blobs.len(); - trace!("{:x}: broadcast blobs.len: {}", debug_id, blobs_len); - - // Index the blobs - index_blobs(node_info, &blobs, receive_index).expect("index blobs for initial window"); - - // keep the cache of blobs that are broadcast - inc_new_counter_info!("streamer-broadcast-sent", blobs.len()); - { - let mut win = window.write().unwrap(); - assert!(blobs.len() <= win.len()); - for b in &blobs { - let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix % WINDOW_SIZE) as usize; - if let Some(x) = mem::replace(&mut win[pos].data, None) { - trace!( - "{:x} popped {} at {}", - debug_id, - x.read().unwrap().get_index().unwrap(), - pos - ); - 
recycler.recycle(x); - } - if let Some(x) = mem::replace(&mut win[pos].coding, None) { - trace!( - "{:x} popped {} at {}", - debug_id, - x.read().unwrap().get_index().unwrap(), - pos - ); - recycler.recycle(x); - } - - trace!("{:x} null {}", debug_id, pos); - } - while let Some(b) = blobs.pop() { - let ix = b.read().unwrap().get_index().expect("blob index"); - let pos = (ix % WINDOW_SIZE) as usize; - trace!("{:x} caching {} at {}", debug_id, ix, pos); - assert!(win[pos].data.is_none()); - win[pos].data = Some(b); - } - } - - // Fill in the coding blob data from the window data blobs - #[cfg(feature = "erasure")] - { - erasure::generate_coding( - debug_id, - &mut window.write().unwrap(), - recycler, - *receive_index, - blobs_len, - &mut transmit_index.coding, - )?; - } - - *receive_index += blobs_len as u64; - - // Send blobs out from the window - Crdt::broadcast( - &node_info, - &broadcast_table, - &window, - &sock, - transmit_index, - *receive_index, - )?; - } - Ok(()) -} - -/// Service to broadcast messages from the leader to layer 1 nodes. -/// See `crdt` for network layer definitions. -/// # Arguments -/// * `sock` - Socket to send from. -/// * `exit` - Boolean to signal system exit. -/// * `crdt` - CRDT structure -/// * `window` - Cache of blobs that we have broadcast -/// * `recycler` - Blob recycler. -/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. 
-pub fn broadcaster( - sock: UdpSocket, - crdt: Arc<RwLock<Crdt>>, - window: SharedWindow, - entry_height: u64, - recycler: BlobRecycler, - r: BlobReceiver, -) -> JoinHandle<()> { - Builder::new() - .name("solana-broadcaster".to_string()) - .spawn(move || { - let mut transmit_index = WindowIndex { - data: entry_height, - coding: entry_height, - }; - let mut receive_index = entry_height; - let me = crdt.read().unwrap().my_data().clone(); - loop { - let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); - if let Err(e) = broadcast( - &me, - &broadcast_table, - &window, - &recycler, - &r, - &sock, - &mut transmit_index, - &mut receive_index, - ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? - _ => { - inc_new_counter_info!("streamer-broadcaster-error", 1, 1); - error!("broadcaster error: {:?}", e); - } - } - } - } - }) - .unwrap() -} - -fn retransmit( - crdt: &Arc<RwLock<Crdt>>, - recycler: &BlobRecycler, - r: &BlobReceiver, - sock: &UdpSocket, -) -> Result<()> { - let timer = Duration::new(1, 0); - let mut dq = r.recv_timeout(timer)?; - while let Ok(mut nq) = r.try_recv() { - dq.append(&mut nq); - } - { - for b in &dq { - Crdt::retransmit(&crdt, b, sock)?; - } - } - while let Some(b) = dq.pop_front() { - recycler.recycle(b); - } - Ok(()) -} - -/// Service to retransmit messages from the leader to layer 1 nodes. -/// See `crdt` for network layer definitions. -/// # Arguments -/// * `sock` - Socket to read from. Read timeout is set to 1. -/// * `exit` - Boolean to signal system exit. -/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. -/// * `recycler` - Blob recycler. -/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. 
-pub fn retransmitter( - sock: UdpSocket, - crdt: Arc<RwLock<Crdt>>, - recycler: BlobRecycler, - r: BlobReceiver, -) -> JoinHandle<()> { - Builder::new() - .name("solana-retransmitter".to_string()) - .spawn(move || { - trace!("retransmitter started"); - loop { - if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => { - inc_new_counter_info!("streamer-retransmit-error", 1, 1); - error!("retransmitter error: {:?}", e); - } - } - } - } - trace!("exiting retransmitter"); - }) - .unwrap() -} - #[cfg(test)] mod test { use crdt::{Crdt, TestNode}; diff --git a/src/window_stage.rs b/src/window_stage.rs index dd7d68f8d..e6daaebc7 100644 --- a/src/window_stage.rs +++ b/src/window_stage.rs @@ -1,5 +1,6 @@ //! The `window_stage` maintains the blob window +use broadcaster; use crdt::Crdt; use packet::BlobRecycler; use service::Service; @@ -24,7 +25,7 @@ impl WindowStage { ) -> (Self, BlobReceiver) { let (retransmit_sender, retransmit_receiver) = channel(); - let t_retransmit = streamer::retransmitter( + let t_retransmit = broadcaster::retransmitter( retransmit_socket, crdt.clone(), blob_recycler.clone(),