solana/src/retransmit_stage.rs

124 lines
3.5 KiB
Rust
Raw Normal View History

//! The `retransmit_stage` retransmits blobs between validators
use counter::Counter;
use crdt::Crdt;
use log::Level;
2018-06-27 11:33:56 -07:00
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel;
use std::sync::mpsc::RecvTimeoutError;
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
2018-08-09 12:31:34 -07:00
use streamer::BlobReceiver;
use window::{self, SharedWindow};
fn retransmit(
crdt: &Arc<RwLock<Crdt>>,
recycler: &BlobRecycler,
r: &BlobReceiver,
sock: &UdpSocket,
) -> Result<()> {
let timer = Duration::new(1, 0);
let mut dq = r.recv_timeout(timer)?;
while let Ok(mut nq) = r.try_recv() {
dq.append(&mut nq);
}
{
for b in &dq {
Crdt::retransmit(&crdt, b, sock)?;
}
}
while let Some(b) = dq.pop_front() {
recycler.recycle(b);
}
Ok(())
}
/// Service to retransmit messages from the leader to layer 1 nodes.
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit.
/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
fn retransmitter(
sock: Arc<UdpSocket>,
crdt: Arc<RwLock<Crdt>>,
recycler: BlobRecycler,
r: BlobReceiver,
) -> JoinHandle<()> {
Builder::new()
.name("solana-retransmitter".to_string())
.spawn(move || {
trace!("retransmitter started");
loop {
if let Err(e) = retransmit(&crdt, &recycler, &r, &sock) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
inc_new_counter_info!("streamer-retransmit-error", 1, 1);
}
}
}
}
trace!("exiting retransmitter");
})
.unwrap()
}
pub struct RetransmitStage {
thread_hdls: Vec<JoinHandle<()>>,
}
impl RetransmitStage {
pub fn new(
2018-07-11 13:40:46 -07:00
crdt: &Arc<RwLock<Crdt>>,
window: SharedWindow,
2018-06-27 12:35:58 -07:00
entry_height: u64,
retransmit_socket: Arc<UdpSocket>,
2018-07-11 13:40:46 -07:00
blob_recycler: &BlobRecycler,
2018-06-27 11:33:56 -07:00
fetch_stage_receiver: BlobReceiver,
) -> (Self, BlobReceiver) {
let (retransmit_sender, retransmit_receiver) = channel();
let t_retransmit = retransmitter(
retransmit_socket,
crdt.clone(),
blob_recycler.clone(),
retransmit_receiver,
);
let (blob_sender, blob_receiver) = channel();
2018-08-09 12:31:34 -07:00
let t_window = window::window(
crdt.clone(),
window,
2018-06-27 12:35:58 -07:00
entry_height,
blob_recycler.clone(),
fetch_stage_receiver,
blob_sender,
retransmit_sender,
);
let thread_hdls = vec![t_retransmit, t_window];
(RetransmitStage { thread_hdls }, blob_receiver)
}
}
impl Service for RetransmitStage {
fn thread_hdls(self) -> Vec<JoinHandle<()>> {
self.thread_hdls
}
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls() {
thread_hdl.join()?;
}
Ok(())
}
}