2018-08-09 12:03:34 -07:00
|
|
|
//! The `retransmit_stage` retransmits shreds (received as packets) between validators.
|
2018-06-13 21:52:23 -07:00
|
|
|
|
2019-02-21 11:19:45 -08:00
|
|
|
use crate::bank_forks::BankForks;
|
2019-05-09 14:10:04 -07:00
|
|
|
use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
|
2019-05-07 13:24:58 -07:00
|
|
|
use crate::cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT};
|
2019-04-22 18:41:01 -07:00
|
|
|
use crate::leader_schedule_cache::LeaderScheduleCache;
|
2019-05-09 14:10:04 -07:00
|
|
|
use crate::repair_service::RepairStrategy;
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::result::{Error, Result};
|
|
|
|
use crate::service::Service;
|
2019-02-28 13:15:25 -08:00
|
|
|
use crate::staking_utils;
|
2019-08-20 17:16:06 -07:00
|
|
|
use crate::streamer::PacketReceiver;
|
2019-05-13 21:19:51 -07:00
|
|
|
use crate::window_service::{should_retransmit_and_persist, WindowService};
|
2019-06-01 07:55:43 -07:00
|
|
|
use rand::SeedableRng;
|
|
|
|
use rand_chacha::ChaChaRng;
|
2019-10-07 15:33:22 -07:00
|
|
|
use solana_measure::measure::Measure;
|
|
|
|
use solana_metrics::{datapoint_debug, inc_new_counter_error};
|
2019-05-13 16:24:32 -07:00
|
|
|
use solana_runtime::epoch_schedule::EpochSchedule;
|
2019-07-30 13:18:33 -07:00
|
|
|
use std::cmp;
|
2018-06-13 21:52:23 -07:00
|
|
|
use std::net::UdpSocket;
|
2019-02-13 20:04:20 -08:00
|
|
|
use std::sync::atomic::AtomicBool;
|
2019-02-04 15:33:43 -08:00
|
|
|
use std::sync::mpsc::channel;
|
2018-08-09 13:41:21 -07:00
|
|
|
use std::sync::mpsc::RecvTimeoutError;
|
2018-06-13 21:52:23 -07:00
|
|
|
use std::sync::{Arc, RwLock};
|
2018-08-09 14:17:50 -07:00
|
|
|
use std::thread::{self, Builder, JoinHandle};
|
2018-08-09 13:41:21 -07:00
|
|
|
use std::time::Duration;
|
2018-06-13 21:52:23 -07:00
|
|
|
|
2019-10-06 12:56:17 -07:00
|
|
|
pub fn retransmit(
|
2019-02-21 11:19:45 -08:00
|
|
|
bank_forks: &Arc<RwLock<BankForks>>,
|
2019-04-22 18:41:01 -07:00
|
|
|
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
2019-02-11 16:20:31 -08:00
|
|
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
2019-08-20 17:16:06 -07:00
|
|
|
r: &PacketReceiver,
|
2019-02-11 16:20:31 -08:00
|
|
|
sock: &UdpSocket,
|
|
|
|
) -> Result<()> {
|
|
|
|
let timer = Duration::new(1, 0);
|
2019-10-07 15:33:22 -07:00
|
|
|
let packets = r.recv_timeout(timer)?;
|
|
|
|
let mut timer_start = Measure::start("retransmit");
|
|
|
|
let mut total_packets = packets.packets.len();
|
|
|
|
let mut packet_v = vec![packets];
|
|
|
|
while let Ok(nq) = r.try_recv() {
|
|
|
|
total_packets += nq.packets.len();
|
|
|
|
packet_v.push(nq);
|
2019-02-11 16:20:31 -08:00
|
|
|
}
|
|
|
|
|
2019-10-07 15:33:22 -07:00
|
|
|
datapoint_debug!("retransmit-stage", ("count", total_packets, i64));
|
2019-05-10 08:33:58 -07:00
|
|
|
|
2019-04-19 22:31:40 -07:00
|
|
|
let r_bank = bank_forks.read().unwrap().working_bank();
|
|
|
|
let bank_epoch = r_bank.get_stakers_epoch(r_bank.slot());
|
2019-07-30 13:18:33 -07:00
|
|
|
let mut peers_len = 0;
|
2019-10-04 11:52:02 -07:00
|
|
|
let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch);
|
|
|
|
let (peers, stakes_and_index) = cluster_info
|
|
|
|
.read()
|
|
|
|
.unwrap()
|
|
|
|
.sorted_retransmit_peers_and_stakes(stakes.as_ref());
|
2019-10-07 15:33:22 -07:00
|
|
|
let mut retransmit_total = 0;
|
|
|
|
for packets in packet_v {
|
|
|
|
for packet in &packets.packets {
|
|
|
|
let (my_index, mut shuffled_stakes_and_index) =
|
|
|
|
cluster_info.read().unwrap().shuffle_peers_and_index(
|
|
|
|
&peers,
|
|
|
|
&stakes_and_index,
|
|
|
|
ChaChaRng::from_seed(packet.meta.seed),
|
|
|
|
);
|
|
|
|
peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
|
|
|
|
shuffled_stakes_and_index.remove(my_index);
|
|
|
|
// split off the indexes, we don't need the stakes anymore
|
|
|
|
let indexes = shuffled_stakes_and_index
|
|
|
|
.into_iter()
|
|
|
|
.map(|(_, index)| index)
|
|
|
|
.collect();
|
2019-06-03 20:38:05 -07:00
|
|
|
|
2019-10-07 15:33:22 -07:00
|
|
|
let (neighbors, children) =
|
|
|
|
compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, indexes);
|
|
|
|
let neighbors: Vec<_> = neighbors.into_iter().map(|index| &peers[index]).collect();
|
|
|
|
let children: Vec<_> = children.into_iter().map(|index| &peers[index]).collect();
|
2019-06-03 20:38:05 -07:00
|
|
|
|
2019-10-07 15:33:22 -07:00
|
|
|
let leader =
|
|
|
|
leader_schedule_cache.slot_leader_at(packet.meta.slot, Some(r_bank.as_ref()));
|
|
|
|
let mut retransmit_time = Measure::start("retransmit_to");
|
|
|
|
if !packet.meta.forward {
|
|
|
|
ClusterInfo::retransmit_to(&cluster_info, &neighbors, packet, leader, sock, true)?;
|
|
|
|
ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, false)?;
|
|
|
|
} else {
|
|
|
|
ClusterInfo::retransmit_to(&cluster_info, &children, packet, leader, sock, true)?;
|
|
|
|
}
|
|
|
|
retransmit_time.stop();
|
|
|
|
retransmit_total += retransmit_time.as_us();
|
2019-02-12 10:56:48 -08:00
|
|
|
}
|
2018-08-09 13:41:21 -07:00
|
|
|
}
|
2019-10-07 15:33:22 -07:00
|
|
|
timer_start.stop();
|
|
|
|
debug!(
|
|
|
|
"retransmitted {} packets in {}us retransmit_time: {}us",
|
|
|
|
total_packets,
|
|
|
|
timer_start.as_us(),
|
|
|
|
retransmit_total
|
|
|
|
);
|
2019-10-04 16:25:22 -07:00
|
|
|
datapoint_debug!("cluster_info-num_nodes", ("count", peers_len, i64));
|
2018-08-09 13:41:21 -07:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2019-01-02 00:46:15 -08:00
|
|
|
/// Service to retransmit messages from the leader or layer 1 to relevant peer nodes.
|
2018-10-08 19:55:54 -07:00
|
|
|
/// See `cluster_info` for network layer definitions.
|
2018-08-09 13:41:21 -07:00
|
|
|
/// # Arguments
|
|
|
|
/// * `sock` - Socket to read from. Read timeout is set to 1.
|
|
|
|
/// * `exit` - Boolean to signal system exit.
|
2018-10-08 19:55:54 -07:00
|
|
|
/// * `cluster_info` - This structure needs to be updated and populated by the bank and via gossip.
|
2018-08-09 13:41:21 -07:00
|
|
|
/// * `recycler` - Blob recycler.
|
|
|
|
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
|
2018-10-08 19:55:54 -07:00
|
|
|
fn retransmitter(
|
|
|
|
sock: Arc<UdpSocket>,
|
2019-02-21 11:19:45 -08:00
|
|
|
bank_forks: Arc<RwLock<BankForks>>,
|
2019-04-22 18:41:01 -07:00
|
|
|
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
2018-10-08 19:55:54 -07:00
|
|
|
cluster_info: Arc<RwLock<ClusterInfo>>,
|
2019-08-20 17:16:06 -07:00
|
|
|
r: PacketReceiver,
|
2018-10-08 19:55:54 -07:00
|
|
|
) -> JoinHandle<()> {
|
2019-04-22 15:21:10 -07:00
|
|
|
let bank_forks = bank_forks.clone();
|
2019-04-22 18:41:01 -07:00
|
|
|
let leader_schedule_cache = leader_schedule_cache.clone();
|
2018-08-09 13:41:21 -07:00
|
|
|
Builder::new()
|
|
|
|
.name("solana-retransmitter".to_string())
|
|
|
|
.spawn(move || {
|
|
|
|
trace!("retransmitter started");
|
|
|
|
loop {
|
2019-04-22 18:41:01 -07:00
|
|
|
if let Err(e) = retransmit(
|
|
|
|
&bank_forks,
|
|
|
|
&leader_schedule_cache,
|
|
|
|
&cluster_info,
|
|
|
|
&r,
|
|
|
|
&sock,
|
|
|
|
) {
|
2018-08-09 13:41:21 -07:00
|
|
|
match e {
|
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
|
|
|
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
|
|
|
_ => {
|
2019-05-17 07:00:06 -07:00
|
|
|
inc_new_counter_error!("streamer-retransmit-error", 1, 1);
|
2018-08-09 13:41:21 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
trace!("exiting retransmitter");
|
2018-12-07 19:01:28 -08:00
|
|
|
})
|
|
|
|
.unwrap()
|
2018-08-09 13:41:21 -07:00
|
|
|
}
|
|
|
|
|
2018-08-09 12:03:34 -07:00
|
|
|
pub struct RetransmitStage {
|
2018-10-10 16:49:41 -07:00
|
|
|
thread_hdls: Vec<JoinHandle<()>>,
|
2019-02-07 15:10:54 -08:00
|
|
|
window_service: WindowService,
|
2018-06-13 21:52:23 -07:00
|
|
|
}
|
|
|
|
|
2018-08-09 12:03:34 -07:00
|
|
|
impl RetransmitStage {
|
2019-02-07 15:10:54 -08:00
|
|
|
#[allow(clippy::new_ret_no_self)]
|
2019-05-09 14:10:04 -07:00
|
|
|
#[allow(clippy::too_many_arguments)]
|
2018-06-13 21:52:23 -07:00
|
|
|
pub fn new(
|
2019-04-22 15:21:10 -07:00
|
|
|
bank_forks: Arc<RwLock<BankForks>>,
|
2019-04-22 18:41:01 -07:00
|
|
|
leader_schedule_cache: &Arc<LeaderScheduleCache>,
|
2019-02-07 20:52:39 -08:00
|
|
|
blocktree: Arc<Blocktree>,
|
2018-10-08 19:55:54 -07:00
|
|
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
2018-08-28 16:32:40 -07:00
|
|
|
retransmit_socket: Arc<UdpSocket>,
|
2018-08-30 12:07:54 -07:00
|
|
|
repair_socket: Arc<UdpSocket>,
|
2019-08-20 17:16:06 -07:00
|
|
|
fetch_stage_receiver: PacketReceiver,
|
2019-03-04 20:50:02 -08:00
|
|
|
exit: &Arc<AtomicBool>,
|
2019-05-09 14:10:04 -07:00
|
|
|
completed_slots_receiver: CompletedSlotsReceiver,
|
2019-05-13 15:37:50 -07:00
|
|
|
epoch_schedule: EpochSchedule,
|
2019-02-04 15:33:43 -08:00
|
|
|
) -> Self {
|
2018-06-13 21:52:23 -07:00
|
|
|
let (retransmit_sender, retransmit_receiver) = channel();
|
|
|
|
|
2019-01-02 00:46:15 -08:00
|
|
|
let t_retransmit = retransmitter(
|
|
|
|
retransmit_socket,
|
2019-02-21 11:19:45 -08:00
|
|
|
bank_forks.clone(),
|
2019-04-22 18:41:01 -07:00
|
|
|
leader_schedule_cache,
|
2019-01-02 00:46:15 -08:00
|
|
|
cluster_info.clone(),
|
|
|
|
retransmit_receiver,
|
|
|
|
);
|
2019-05-09 14:10:04 -07:00
|
|
|
|
|
|
|
let repair_strategy = RepairStrategy::RepairAll {
|
|
|
|
bank_forks,
|
|
|
|
completed_slots_receiver,
|
2019-05-13 15:37:50 -07:00
|
|
|
epoch_schedule,
|
2019-05-09 14:10:04 -07:00
|
|
|
};
|
2019-05-13 21:19:51 -07:00
|
|
|
let leader_schedule_cache = leader_schedule_cache.clone();
|
2019-02-07 15:10:54 -08:00
|
|
|
let window_service = WindowService::new(
|
2019-02-07 20:52:39 -08:00
|
|
|
blocktree,
|
2018-10-08 19:55:54 -07:00
|
|
|
cluster_info.clone(),
|
2018-06-13 21:52:23 -07:00
|
|
|
fetch_stage_receiver,
|
|
|
|
retransmit_sender,
|
2018-08-30 12:07:54 -07:00
|
|
|
repair_socket,
|
2019-01-31 13:43:22 -08:00
|
|
|
exit,
|
2019-05-09 14:10:04 -07:00
|
|
|
repair_strategy,
|
2019-09-05 18:20:30 -07:00
|
|
|
&leader_schedule_cache.clone(),
|
2019-09-17 18:22:46 -07:00
|
|
|
move |id, shred, working_bank, last_root| {
|
2019-08-27 19:28:00 -07:00
|
|
|
should_retransmit_and_persist(
|
|
|
|
shred,
|
|
|
|
working_bank,
|
|
|
|
&leader_schedule_cache,
|
|
|
|
id,
|
2019-09-16 13:13:53 -07:00
|
|
|
last_root,
|
2019-08-27 19:28:00 -07:00
|
|
|
)
|
2019-05-13 21:19:51 -07:00
|
|
|
},
|
2018-06-13 21:52:23 -07:00
|
|
|
);
|
|
|
|
|
2019-02-07 15:10:54 -08:00
|
|
|
let thread_hdls = vec![t_retransmit];
|
|
|
|
Self {
|
|
|
|
thread_hdls,
|
|
|
|
window_service,
|
|
|
|
}
|
2018-06-13 21:52:23 -07:00
|
|
|
}
|
|
|
|
}
|
2018-07-03 21:14:08 -07:00
|
|
|
|
2018-08-09 12:03:34 -07:00
|
|
|
impl Service for RetransmitStage {
    type JoinReturnType = ();

    /// Waits for this stage's own threads, then for the window service.
    ///
    /// Returns the first join error encountered. Any handles not yet joined
    /// at that point are dropped without being joined.
    fn join(self) -> thread::Result<()> {
        self.thread_hdls
            .into_iter()
            .try_for_each(|handle| handle.join())?;
        self.window_service.join()?;
        Ok(())
    }
}
|