diff --git a/src/broadcaster.rs b/src/broadcast_stage.rs similarity index 75% rename from src/broadcaster.rs rename to src/broadcast_stage.rs index 8d618c4b2f..c35216ea26 100644 --- a/src/broadcaster.rs +++ b/src/broadcast_stage.rs @@ -1,4 +1,4 @@ -//! The `broadcaster` broadcasts data from a leader node to validators +//! The `broadcast_stage` broadcasts data from a leader node to validators //! use counter::Counter; use crdt::{Crdt, CrdtError, NodeInfo}; @@ -171,60 +171,3 @@ pub fn broadcaster( }) .unwrap() } - -fn retransmit( - crdt: &Arc>, - recycler: &BlobRecycler, - r: &BlobReceiver, - sock: &UdpSocket, -) -> Result<()> { - let timer = Duration::new(1, 0); - let mut dq = r.recv_timeout(timer)?; - while let Ok(mut nq) = r.try_recv() { - dq.append(&mut nq); - } - { - for b in &dq { - Crdt::retransmit(&crdt, b, sock)?; - } - } - while let Some(b) = dq.pop_front() { - recycler.recycle(b); - } - Ok(()) -} - -/// Service to retransmit messages from the leader to layer 1 nodes. -/// See `crdt` for network layer definitions. -/// # Arguments -/// * `sock` - Socket to read from. Read timeout is set to 1. -/// * `exit` - Boolean to signal system exit. -/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. -/// * `recycler` - Blob recycler. -/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. -pub fn retransmitter( - sock: UdpSocket, - crdt: Arc>, - recycler: BlobRecycler, - r: BlobReceiver, -) -> JoinHandle<()> { - Builder::new() - .name("solana-retransmitter".to_string()) - .spawn(move || { - trace!("retransmitter started"); - loop { - if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => { - inc_new_counter_info!("streamer-retransmit-error", 1, 1); - error!("retransmitter error: {:?}", e); - } - } - } - } - trace!("exiting retransmitter"); - }) - .unwrap() -} diff --git a/src/fullnode.rs b/src/fullnode.rs index 48d7f07e91..f3ee1f4754 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -1,7 +1,7 @@ //! The `fullnode` module hosts all the fullnode microservices. use bank::Bank; -use broadcaster; +use broadcast_stage; use crdt::{Crdt, NodeInfo, TestNode}; use entry::Entry; use ledger::read_ledger; @@ -230,7 +230,7 @@ impl FullNode { ).expect("Ncp::new"); thread_hdls.extend(ncp.thread_hdls()); - let t_broadcast = broadcaster::broadcaster( + let t_broadcast = broadcast_stage::broadcaster( node.sockets.broadcast, crdt, window, diff --git a/src/lib.rs b/src/lib.rs index 15a53a063a..1dd4b30531 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ pub mod counter; pub mod bank; pub mod banking_stage; pub mod blob_fetch_stage; -pub mod broadcaster; +pub mod broadcast_stage; pub mod budget; pub mod choose_gossip_peer_strategy; pub mod client; diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index dc80a48226..587cdc4459 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -1,16 +1,80 @@ //! The `retransmit_stage` retransmits blobs between validators -use broadcaster; +use counter::Counter; use crdt::Crdt; +#[cfg(feature = "erasure")] +use erasure; +use log::Level; use packet::BlobRecycler; +use result::{Error, Result}; use service::Service; use std::net::UdpSocket; +use std::sync::atomic::AtomicUsize; use std::sync::mpsc::channel; +use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; +use std::thread::Builder; use std::thread::{self, JoinHandle}; +use std::time::Duration; use streamer::BlobReceiver; use window::{self, SharedWindow}; +fn retransmit( + crdt: &Arc>, + recycler: &BlobRecycler, + r: &BlobReceiver, + sock: &UdpSocket, +) -> Result<()> { + let timer = Duration::new(1, 0); + let mut dq = r.recv_timeout(timer)?; + while let Ok(mut nq) = r.try_recv() { + dq.append(&mut nq); + } + { + for b in &dq { + Crdt::retransmit(&crdt, b, sock)?; + } + } + while let Some(b) = dq.pop_front() { + recycler.recycle(b); + } + Ok(()) +} + +/// Service to retransmit messages from the leader to layer 1 nodes. +/// See `crdt` for network layer definitions. +/// # Arguments +/// * `sock` - Socket to read from. Read timeout is set to 1. +/// * `exit` - Boolean to signal system exit. +/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. +/// * `recycler` - Blob recycler. +/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. +pub fn retransmitter( + sock: UdpSocket, + crdt: Arc>, + recycler: BlobRecycler, + r: BlobReceiver, +) -> JoinHandle<()> { + Builder::new() + .name("solana-retransmitter".to_string()) + .spawn(move || { + trace!("retransmitter started"); + loop { + if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => { + inc_new_counter_info!("streamer-retransmit-error", 1, 1); + } + } + } + } + trace!("exiting retransmitter"); + }) + .unwrap() +} + pub struct RetransmitStage { thread_hdls: Vec>, } @@ -26,7 +90,7 @@ impl RetransmitStage { ) -> (Self, BlobReceiver) { let (retransmit_sender, retransmit_receiver) = channel(); - let t_retransmit = broadcaster::retransmitter( + let t_retransmit = retransmitter( retransmit_socket, crdt.clone(), blob_recycler.clone(),