diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index c35216ea26..7ab9b21a56 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -7,12 +7,13 @@ use erasure; use log::Level; use packet::BlobRecycler; use result::{Error, Result}; +use service::Service; use std::mem; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; -use std::thread::{Builder, JoinHandle}; +use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use streamer::BlobReceiver; use window::{self, SharedWindow, WindowIndex, WINDOW_SIZE}; @@ -22,15 +23,15 @@ fn broadcast( broadcast_table: &[NodeInfo], window: &SharedWindow, recycler: &BlobRecycler, - r: &BlobReceiver, + receiver: &BlobReceiver, sock: &UdpSocket, transmit_index: &mut WindowIndex, receive_index: &mut u64, ) -> Result<()> { let debug_id = node_info.debug_id(); let timer = Duration::new(1, 0); - let mut dq = r.recv_timeout(timer)?; - while let Ok(mut nq) = r.try_recv() { + let mut dq = receiver.recv_timeout(timer)?; + while let Ok(mut nq) = receiver.try_recv() { dq.append(&mut nq); } @@ -119,55 +120,84 @@ fn broadcast( Ok(()) } -/// Service to broadcast messages from the leader to layer 1 nodes. -/// See `crdt` for network layer definitions. -/// # Arguments -/// * `sock` - Socket to send from. -/// * `exit` - Boolean to signal system exit. -/// * `crdt` - CRDT structure -/// * `window` - Cache of blobs that we have broadcast -/// * `recycler` - Blob recycler. -/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. -pub fn broadcaster( - sock: UdpSocket, - crdt: Arc>, - window: SharedWindow, - entry_height: u64, - recycler: BlobRecycler, - r: BlobReceiver, -) -> JoinHandle<()> { - Builder::new() - .name("solana-broadcaster".to_string()) - .spawn(move || { - let mut transmit_index = WindowIndex { - data: entry_height, - coding: entry_height, - }; - let mut receive_index = entry_height; - let me = crdt.read().unwrap().my_data().clone(); - loop { - let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); - if let Err(e) = broadcast( - &me, - &broadcast_table, - &window, - &recycler, - &r, - &sock, - &mut transmit_index, - &mut receive_index, - ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? - _ => { - inc_new_counter_info!("streamer-broadcaster-error", 1, 1); - error!("broadcaster error: {:?}", e); - } +pub struct BroadcastStage { + thread_hdl: JoinHandle<()>, +} + +impl BroadcastStage { + fn run( + sock: UdpSocket, + crdt: Arc>, + window: SharedWindow, + entry_height: u64, + recycler: BlobRecycler, + receiver: BlobReceiver, + ) { + let mut transmit_index = WindowIndex { + data: entry_height, + coding: entry_height, + }; + let mut receive_index = entry_height; + let me = crdt.read().unwrap().my_data().clone(); + loop { + let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); + if let Err(e) = broadcast( + &me, + &broadcast_table, + &window, + &recycler, + &receiver, + &sock, + &mut transmit_index, + &mut receive_index, + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? + _ => { + inc_new_counter_info!("streamer-broadcaster-error", 1, 1); + error!("broadcaster error: {:?}", e); } } } - }) - .unwrap() + } + } + + /// Service to broadcast messages from the leader to layer 1 nodes. + /// See `crdt` for network layer definitions. + /// # Arguments + /// * `sock` - Socket to send from. + /// * `exit` - Boolean to signal system exit. + /// * `crdt` - CRDT structure + /// * `window` - Cache of blobs that we have broadcast + /// * `recycler` - Blob recycler. + /// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. + pub fn new( + sock: UdpSocket, + crdt: Arc>, + window: SharedWindow, + entry_height: u64, + recycler: BlobRecycler, + receiver: BlobReceiver, + ) -> Self { + let thread_hdl = Builder::new() + .name("solana-broadcaster".to_string()) + .spawn(move || { + Self::run(sock, crdt, window, entry_height, recycler, receiver); + }) + .unwrap(); + + BroadcastStage { thread_hdl } + } +} + +impl Service for BroadcastStage { + fn thread_hdls(self) -> Vec> { + vec![self.thread_hdl] + } + + fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } } diff --git a/src/fullnode.rs b/src/fullnode.rs index f3ee1f4754..badb533c96 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -1,7 +1,7 @@ //! The `fullnode` module hosts all the fullnode microservices. use bank::Bank; -use broadcast_stage; +use broadcast_stage::BroadcastStage; use crdt::{Crdt, NodeInfo, TestNode}; use entry::Entry; use ledger::read_ledger; @@ -230,7 +230,7 @@ impl FullNode { ).expect("Ncp::new"); thread_hdls.extend(ncp.thread_hdls()); - let t_broadcast = broadcast_stage::broadcaster( + let broadcast_stage = BroadcastStage::new( node.sockets.broadcast, crdt, window, @@ -238,7 +238,7 @@ impl FullNode { blob_recycler.clone(), blob_receiver, ); - thread_hdls.extend(vec![t_broadcast]); + thread_hdls.extend(broadcast_stage.thread_hdls()); FullNode { exit, thread_hdls } } diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 587cdc4459..1e770f8a30 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -13,8 +13,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::mpsc::channel; use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, RwLock}; -use std::thread::Builder; -use std::thread::{self, JoinHandle}; +use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use streamer::BlobReceiver; use window::{self, SharedWindow}; @@ -49,7 +48,7 @@ fn retransmit( /// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. /// * `recycler` - Blob recycler. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. -pub fn retransmitter( +fn retransmitter( sock: UdpSocket, crdt: Arc>, recycler: BlobRecycler,