Rename broadcaster to broadcast_stage

And move retransmitter code into retransmit_stage.

TODO: Add a BroadcastStage service
This commit is contained in:
Greg Fitzgerald 2018-08-09 14:41:21 -06:00
parent 8707abe091
commit 0e66606c7f
4 changed files with 70 additions and 63 deletions

View File

@ -1,4 +1,4 @@
//! The `broadcaster` broadcasts data from a leader node to validators
//! The `broadcast_stage` broadcasts data from a leader node to validators
//!
use counter::Counter;
use crdt::{Crdt, CrdtError, NodeInfo};
@ -171,60 +171,3 @@ pub fn broadcaster(
})
.unwrap()
}
fn retransmit(
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
{
for b in &dq {
Crdt::retransmit(&crdt, b, sock)?;
}
}
while let Some(b) = dq.pop_front() {
recycler.recycle(b);
}
Ok(())
}
/// Service to retransmit messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit.
/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
sock: UdpSocket,
crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
Builder::new()
.name("solana-retransmitter".to_string())
.spawn(move || {
trace!("retransmitter started");
loop {
if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!("streamer-retransmit-error", 1, 1);
error!("retransmitter error: {:?}", e);
}
}
}
}
trace!("exiting retransmitter");
})
.unwrap()
}

View File

@ -1,7 +1,7 @@
//! The `fullnode` module hosts all the fullnode microservices.
use bank::Bank;
use broadcaster;
use broadcast_stage;
use crdt::{Crdt, NodeInfo, TestNode};
use entry::Entry;
use ledger::read_ledger;
@ -230,7 +230,7 @@ impl FullNode {
).expect("Ncp::new");
thread_hdls.extend(ncp.thread_hdls());
let t_broadcast = broadcaster::broadcaster(
let t_broadcast = broadcast_stage::broadcaster(
node.sockets.broadcast,
crdt,
window,

View File

@ -12,7 +12,7 @@ pub mod counter;
pub mod bank;
pub mod banking_stage;
pub mod blob_fetch_stage;
pub mod broadcaster;
pub mod broadcast_stage;
pub mod budget;
pub mod choose_gossip_peer_strategy;
pub mod client;

View File

@ -1,16 +1,80 @@
//! The `retransmit_stage` retransmits blobs between validators
use broadcaster;
use counter::Counter;
use crdt::Crdt;
#[cfg(feature = "erasure")]
use erasure;
use log::Level;
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::Builder;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use window::{self, SharedWindow};
fn retransmit(
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
{
for b in &dq {
Crdt::retransmit(&crdt, b, sock)?;
}
}
while let Some(b) = dq.pop_front() {
recycler.recycle(b);
}
Ok(())
}
/// Service to retransmit messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit.
/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(
sock: UdpSocket,
crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
Builder::new()
.name("solana-retransmitter".to_string())
.spawn(move || {
trace!("retransmitter started");
loop {
if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!("streamer-retransmit-error", 1, 1);
}
}
}
}
trace!("exiting retransmitter");
})
.unwrap()
}
pub struct RetransmitStage {
thread_hdls: Vec<JoinHandle<()>>,
}
@ -26,7 +90,7 @@ impl RetransmitStage {
) -> (Self, BlobReceiver) {
let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = broadcaster::retransmitter(
let t_retransmit = retransmitter(
retransmit_socket,
crdt.clone(),
blob_recycler.clone(),