Make BroadcastStage an actual stage

TODO: Why isn't BroadcastStage/RetransmitStage managed by the NCP?
This commit is contained in:
Greg Fitzgerald 2018-08-09 15:17:50 -06:00
parent 0e66606c7f
commit c830c604f4
3 changed files with 87 additions and 58 deletions

View File

@ -7,12 +7,13 @@ use erasure;
use log::Level; use log::Level;
use packet::BlobRecycler; use packet::BlobRecycler;
use result::{Error, Result}; use result::{Error, Result};
use service::Service;
use std::mem; use std::mem;
use std::net::UdpSocket; use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::{Builder, JoinHandle}; use std::thread::{self, Builder, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::BlobReceiver; use streamer::BlobReceiver;
use window::{self, SharedWindow, WindowIndex, WINDOW_SIZE}; use window::{self, SharedWindow, WindowIndex, WINDOW_SIZE};
@ -22,15 +23,15 @@ fn broadcast(
broadcast_table: &[NodeInfo], broadcast_table: &[NodeInfo],
window: &SharedWindow, window: &SharedWindow,
recycler: &BlobRecycler, recycler: &BlobRecycler,
r: &BlobReceiver, receiver: &BlobReceiver,
sock: &UdpSocket, sock: &UdpSocket,
transmit_index: &mut WindowIndex, transmit_index: &mut WindowIndex,
receive_index: &mut u64, receive_index: &mut u64,
) -> Result<()> { ) -> Result<()> {
let debug_id = node_info.debug_id(); let debug_id = node_info.debug_id();
let timer = Duration::new(1, 0); let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?; let mut dq = receiver.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() { while let Ok(mut nq) = receiver.try_recv() {
dq.append(&mut nq); dq.append(&mut nq);
} }
@ -119,55 +120,84 @@ fn broadcast(
Ok(()) Ok(())
} }
/// Service to broadcast messages from the leader to layer 1 nodes. pub struct BroadcastStage {
/// See `crdt` for network layer definitions. thread_hdl: JoinHandle<()>,
/// # Arguments }
/// * `sock` - Socket to send from.
/// * `exit` - Boolean to signal system exit. impl BroadcastStage {
/// * `crdt` - CRDT structure fn run(
/// * `window` - Cache of blobs that we have broadcast sock: UdpSocket,
/// * `recycler` - Blob recycler. crdt: Arc<RwLock<Crdt>>,
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. window: SharedWindow,
pub fn broadcaster( entry_height: u64,
sock: UdpSocket, recycler: BlobRecycler,
crdt: Arc<RwLock<Crdt>>, receiver: BlobReceiver,
window: SharedWindow, ) {
entry_height: u64, let mut transmit_index = WindowIndex {
recycler: BlobRecycler, data: entry_height,
r: BlobReceiver, coding: entry_height,
) -> JoinHandle<()> { };
Builder::new() let mut receive_index = entry_height;
.name("solana-broadcaster".to_string()) let me = crdt.read().unwrap().my_data().clone();
.spawn(move || { loop {
let mut transmit_index = WindowIndex { let broadcast_table = crdt.read().unwrap().compute_broadcast_table();
data: entry_height, if let Err(e) = broadcast(
coding: entry_height, &me,
}; &broadcast_table,
let mut receive_index = entry_height; &window,
let me = crdt.read().unwrap().my_data().clone(); &recycler,
loop { &receiver,
let broadcast_table = crdt.read().unwrap().compute_broadcast_table(); &sock,
if let Err(e) = broadcast( &mut transmit_index,
&me, &mut receive_index,
&broadcast_table, ) {
&window, match e {
&recycler, Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
&r, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
&sock, Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
&mut transmit_index, _ => {
&mut receive_index, inc_new_counter_info!("streamer-broadcaster-error", 1, 1);
) { error!("broadcaster error: {:?}", e);
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::CrdtError(CrdtError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
_ => {
inc_new_counter_info!("streamer-broadcaster-error", 1, 1);
error!("broadcaster error: {:?}", e);
}
} }
} }
} }
}) }
.unwrap() }
/// Service to broadcast messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to send from.
/// * `exit` - Boolean to signal system exit.
/// * `crdt` - CRDT structure
/// * `window` - Cache of blobs that we have broadcast
/// * `recycler` - Blob recycler.
/// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn new(
sock: UdpSocket,
crdt: Arc<RwLock<Crdt>>,
window: SharedWindow,
entry_height: u64,
recycler: BlobRecycler,
receiver: BlobReceiver,
) -> Self {
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
Self::run(sock, crdt, window, entry_height, recycler, receiver);
})
.unwrap();
BroadcastStage { thread_hdl }
}
}
impl Service for BroadcastStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
vec![self.thread_hdl]
}
fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
} }

View File

@ -1,7 +1,7 @@
//! The `fullnode` module hosts all the fullnode microservices. //! The `fullnode` module hosts all the fullnode microservices.
use bank::Bank; use bank::Bank;
use broadcast_stage; use broadcast_stage::BroadcastStage;
use crdt::{Crdt, NodeInfo, TestNode}; use crdt::{Crdt, NodeInfo, TestNode};
use entry::Entry; use entry::Entry;
use ledger::read_ledger; use ledger::read_ledger;
@ -230,7 +230,7 @@ impl FullNode {
).expect("Ncp::new"); ).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls()); thread_hdls.extend(ncp.thread_hdls());
let t_broadcast = broadcast_stage::broadcaster( let broadcast_stage = BroadcastStage::new(
node.sockets.broadcast, node.sockets.broadcast,
crdt, crdt,
window, window,
@ -238,7 +238,7 @@ impl FullNode {
blob_recycler.clone(), blob_recycler.clone(),
blob_receiver, blob_receiver,
); );
thread_hdls.extend(vec![t_broadcast]); thread_hdls.extend(broadcast_stage.thread_hdls());
FullNode { exit, thread_hdls } FullNode { exit, thread_hdls }
} }

View File

@ -13,8 +13,7 @@ use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError; use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread::Builder; use std::thread::{self, Builder, JoinHandle};
use std::thread::{self, JoinHandle};
use std::time::Duration; use std::time::Duration;
use streamer::BlobReceiver; use streamer::BlobReceiver;
use window::{self, SharedWindow}; use window::{self, SharedWindow};
@ -49,7 +48,7 @@ fn retransmit(
/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. /// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler. /// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter( fn retransmitter(
sock: UdpSocket, sock: UdpSocket,
crdt: Arc<RwLock<Crdt>>, crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler, recycler: BlobRecycler,