bypasses window-service stage before retransmitting shreds (#26291)

With recent patches, window-service recv-window does little more than
redirect packets/shreds to downstream channels.
This commit removes the window-service recv-window thread and instead
sends packets/shreds directly from sigverify to the retransmit stage
and the window-service insert thread.
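
For orientation, here is a minimal sketch of the resulting fan-out using simplified stand-in types (the `Packet` struct, its fields, and `forward_verified` below are illustrative placeholders, not the real solana-perf/solana-core APIs): after verification, shred payloads from non-discarded, non-repair packets go straight to the retransmit channel, while the full packet batches go to the window-service insert thread.

    // Sketch only: stand-in types, not the actual solana-perf Packet/PacketBatch.
    use crossbeam_channel::{unbounded, Sender};

    struct Packet {
        discard: bool,          // failed sigverify
        repair: bool,           // arrived via repair, not turbine
        shred_payload: Vec<u8>, // serialized shred bytes
    }

    type PacketBatch = Vec<Packet>;

    // Sigverify fans out directly: shreds (excluding discarded and repair
    // packets) go to the retransmit thread, full batches to the insert thread.
    fn forward_verified(
        batches: Vec<PacketBatch>,
        retransmit_sender: &Sender<Vec<Vec<u8>>>,
        insert_sender: &Sender<Vec<PacketBatch>>,
    ) {
        let shreds: Vec<Vec<u8>> = batches
            .iter()
            .flatten()
            .filter(|packet| !packet.discard && !packet.repair)
            .map(|packet| packet.shred_payload.clone())
            .collect();
        let _ = retransmit_sender.send(shreds); // retransmit-stage input
        let _ = insert_sender.send(batches); // window-service insert input
    }

    fn main() {
        let (retransmit_sender, _retransmit_receiver) = unbounded();
        let (insert_sender, _insert_receiver) = unbounded();
        forward_verified(Vec::new(), &retransmit_sender, &insert_sender);
    }

The former recv-window hop (its thread, channel, and stats) goes away entirely; repair packets still reach the insert thread inside the same batches but are excluded from retransmit.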
behzad nouri 2022-07-06 11:49:58 +00:00 committed by GitHub
parent 37f4621c06
commit d33c548660
4 changed files with 209 additions and 368 deletions


@@ -3,17 +3,10 @@
use {
crate::{
ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
cluster_info_vote_listener::VerifiedVoteReceiver,
cluster_nodes::{ClusterNodes, ClusterNodesCache},
cluster_slots::ClusterSlots,
cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
completed_data_sets_service::CompletedDataSetsSender,
packet_hasher::PacketHasher,
repair_service::{DuplicateSlotsResetSender, RepairInfo},
window_service::WindowService,
},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
crossbeam_channel::{Receiver, RecvTimeoutError},
itertools::{izip, Itertools},
lru::LruCache,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
@@ -23,27 +16,25 @@ use {
contact_info::ContactInfo,
},
solana_ledger::{
blockstore::Blockstore,
leader_schedule_cache::LeaderScheduleCache,
shred::{self, ShredId},
},
solana_measure::measure::Measure,
solana_perf::packet::PacketBatch,
solana_rayon_threadlimit::get_thread_count,
solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp},
solana_sdk::{clock::Slot, pubkey::Pubkey, timing::timestamp},
solana_streamer::{
sendmmsg::{multi_target_send, SendPktsError},
socket::SocketAddrSpace,
},
std::{
collections::{HashMap, HashSet},
collections::HashMap,
iter::repeat,
net::UdpSocket,
ops::AddAssign,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
@@ -378,122 +369,61 @@ pub fn retransmitter(
.unwrap();
Builder::new()
.name("solana-retransmitter".to_string())
.spawn(move || {
trace!("retransmitter started");
loop {
match retransmit(
&thread_pool,
&bank_forks,
&leader_schedule_cache,
&cluster_info,
&shreds_receiver,
&sockets,
&mut stats,
&cluster_nodes_cache,
&mut hasher_reset_ts,
&mut shreds_received,
&mut packet_hasher,
&max_slots,
rpc_subscriptions.as_deref(),
) {
Ok(()) => (),
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
}
.spawn(move || loop {
match retransmit(
&thread_pool,
&bank_forks,
&leader_schedule_cache,
&cluster_info,
&shreds_receiver,
&sockets,
&mut stats,
&cluster_nodes_cache,
&mut hasher_reset_ts,
&mut shreds_received,
&mut packet_hasher,
&max_slots,
rpc_subscriptions.as_deref(),
) {
Ok(()) => (),
Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
}
trace!("exiting retransmitter");
})
.unwrap()
}
pub struct RetransmitStage {
retransmit_thread_handle: JoinHandle<()>,
window_service: WindowService,
cluster_slots_service: ClusterSlotsService,
}
impl RetransmitStage {
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
blockstore: Arc<Blockstore>,
cluster_info: Arc<ClusterInfo>,
retransmit_sockets: Arc<Vec<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
verified_receiver: Receiver<Vec<PacketBatch>>,
exit: Arc<AtomicBool>,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
turbine_disabled: Arc<AtomicBool>,
cluster_slots: Arc<ClusterSlots>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
verified_vote_receiver: VerifiedVoteReceiver,
repair_validators: Option<HashSet<Pubkey>>,
completed_data_sets_sender: CompletedDataSetsSender,
retransmit_receiver: Receiver<Vec</*shred:*/ Vec<u8>>>,
max_slots: Arc<MaxSlots>,
rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
duplicate_slots_sender: Sender<Slot>,
ancestor_hashes_replay_update_receiver: AncestorHashesReplayUpdateReceiver,
) -> Self {
let (retransmit_sender, retransmit_receiver) = unbounded();
let retransmit_thread_handle = retransmitter(
retransmit_sockets,
bank_forks.clone(),
leader_schedule_cache.clone(),
cluster_info.clone(),
bank_forks,
leader_schedule_cache,
cluster_info,
retransmit_receiver,
max_slots,
rpc_subscriptions,
);
let cluster_slots_service = ClusterSlotsService::new(
blockstore.clone(),
cluster_slots.clone(),
bank_forks.clone(),
cluster_info.clone(),
cluster_slots_update_receiver,
exit.clone(),
);
let repair_info = RepairInfo {
bank_forks,
epoch_schedule,
duplicate_slots_reset_sender,
repair_validators,
cluster_info,
cluster_slots,
};
let window_service = WindowService::new(
blockstore,
verified_receiver,
retransmit_sender,
repair_socket,
ancestor_hashes_socket,
exit,
repair_info,
leader_schedule_cache,
turbine_disabled,
verified_vote_receiver,
completed_data_sets_sender,
duplicate_slots_sender,
ancestor_hashes_replay_update_receiver,
);
Self {
retransmit_thread_handle,
window_service,
cluster_slots_service,
}
}
pub(crate) fn join(self) -> thread::Result<()> {
self.retransmit_thread_handle.join()?;
self.window_service.join()?;
self.cluster_slots_service.join()
self.retransmit_thread_handle.join()
}
}


@@ -14,7 +14,10 @@ use {
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
collections::HashMap,
sync::{Arc, RwLock},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
},
};
@@ -24,7 +27,9 @@ pub struct ShredSigVerifier {
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
recycler_cache: RecyclerCache,
retransmit_sender: Sender<Vec</*shred:*/ Vec<u8>>>,
packet_sender: Sender<Vec<PacketBatch>>,
turbine_disabled: Arc<AtomicBool>,
}
impl ShredSigVerifier {
@@ -32,7 +37,9 @@
pubkey: Pubkey,
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
retransmit_sender: Sender<Vec</*shred:*/ Vec<u8>>>,
packet_sender: Sender<Vec<PacketBatch>>,
turbine_disabled: Arc<AtomicBool>,
) -> Self {
sigverify::init();
Self {
@@ -40,7 +47,9 @@
bank_forks,
leader_schedule_cache,
recycler_cache: RecyclerCache::warmed(),
retransmit_sender,
packet_sender,
turbine_disabled,
}
}
}
@@ -52,6 +61,20 @@ impl SigVerifier for ShredSigVerifier {
&mut self,
packet_batches: Vec<PacketBatch>,
) -> Result<(), SigVerifyServiceError<Self::SendType>> {
if self.turbine_disabled.load(Ordering::Relaxed) {
return Ok(());
}
// Exclude repair packets from retransmit.
// TODO: return the error here!
let _ = self.retransmit_sender.send(
packet_batches
.iter()
.flat_map(PacketBatch::iter)
.filter(|packet| !packet.meta.discard() && !packet.meta.repair())
.filter_map(shred::layout::get_shred)
.map(<[u8]>::to_vec)
.collect(),
);
self.packet_sender.send(packet_batches)?;
Ok(())
}
@@ -140,8 +163,15 @@ pub mod tests {
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let bf = Arc::new(RwLock::new(BankForks::new(bank)));
let (sender, receiver) = unbounded();
let mut verifier = ShredSigVerifier::new(Pubkey::new_unique(), bf, cache, sender);
let (retransmit_sender, _retransmit_receiver) = unbounded();
let mut verifier = ShredSigVerifier::new(
Pubkey::new_unique(),
bf,
cache,
retransmit_sender,
sender,
Arc::<AtomicBool>::default(), // turbine_disabled
);
let batch_size = 2;
let mut batch = PacketBatch::with_capacity(batch_size);
batch.resize(batch_size, Packet::default());


@@ -10,10 +10,12 @@ use {
VerifiedVoteReceiver, VoteTracker,
},
cluster_slots::ClusterSlots,
cluster_slots_service::ClusterSlotsService,
completed_data_sets_service::CompletedDataSetsSender,
cost_update_service::CostUpdateService,
drop_bank_service::DropBankService,
ledger_cleanup_service::LedgerCleanupService,
repair_service::RepairInfo,
replay_stage::{ReplayStage, ReplayStageConfig},
retransmit_stage::RetransmitStage,
rewards_recorder_service::RewardsRecorderSender,
@@ -24,6 +26,7 @@ use {
validator::ProcessBlockStore,
voting_service::VotingService,
warm_quic_cache_service::WarmQuicCacheService,
window_service::WindowService,
},
crossbeam_channel::{unbounded, Receiver},
solana_client::connection_cache::ConnectionCache,
@@ -61,6 +64,8 @@ pub struct Tvu {
fetch_stage: ShredFetchStage,
sigverify_stage: SigVerifyStage,
retransmit_stage: RetransmitStage,
window_service: WindowService,
cluster_slots_service: ClusterSlotsService,
replay_stage: ReplayStage,
ledger_cleanup_service: Option<LedgerCleanupService>,
cost_update_service: CostUpdateService,
@@ -157,45 +162,69 @@ impl Tvu {
);
let (verified_sender, verified_receiver) = unbounded();
let (retransmit_sender, retransmit_receiver) = unbounded();
let sigverify_stage = SigVerifyStage::new(
fetch_receiver,
ShredSigVerifier::new(
cluster_info.id(),
bank_forks.clone(),
leader_schedule_cache.clone(),
retransmit_sender.clone(),
verified_sender,
turbine_disabled,
),
"shred-verifier",
);
let retransmit_stage = RetransmitStage::new(
bank_forks.clone(),
leader_schedule_cache.clone(),
cluster_info.clone(),
Arc::new(retransmit_sockets),
retransmit_receiver,
max_slots.clone(),
Some(rpc_subscriptions.clone()),
);
let cluster_slots = Arc::new(ClusterSlots::default());
let (duplicate_slots_reset_sender, duplicate_slots_reset_receiver) = unbounded();
let (duplicate_slots_sender, duplicate_slots_receiver) = unbounded();
let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
let (ancestor_hashes_replay_update_sender, ancestor_hashes_replay_update_receiver) =
unbounded();
let retransmit_stage = RetransmitStage::new(
bank_forks.clone(),
leader_schedule_cache.clone(),
let window_service = {
let epoch_schedule = *bank_forks.read().unwrap().working_bank().epoch_schedule();
let repair_info = RepairInfo {
bank_forks: bank_forks.clone(),
epoch_schedule,
duplicate_slots_reset_sender,
repair_validators: tvu_config.repair_validators,
cluster_info: cluster_info.clone(),
cluster_slots: cluster_slots.clone(),
};
WindowService::new(
blockstore.clone(),
verified_receiver,
retransmit_sender,
repair_socket,
ancestor_hashes_socket,
exit.clone(),
repair_info,
leader_schedule_cache.clone(),
verified_vote_receiver,
completed_data_sets_sender,
duplicate_slots_sender,
ancestor_hashes_replay_update_receiver,
)
};
let (cluster_slots_update_sender, cluster_slots_update_receiver) = unbounded();
let cluster_slots_service = ClusterSlotsService::new(
blockstore.clone(),
cluster_info.clone(),
Arc::new(retransmit_sockets),
repair_socket,
ancestor_hashes_socket,
verified_receiver,
exit.clone(),
cluster_slots_update_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
turbine_disabled,
cluster_slots.clone(),
duplicate_slots_reset_sender,
verified_vote_receiver,
tvu_config.repair_validators,
completed_data_sets_sender,
max_slots.clone(),
Some(rpc_subscriptions.clone()),
duplicate_slots_sender,
ancestor_hashes_replay_update_receiver,
bank_forks.clone(),
cluster_info.clone(),
cluster_slots_update_receiver,
exit.clone(),
);
let (ledger_cleanup_slot_sender, ledger_cleanup_slot_receiver) = unbounded();
@@ -292,6 +321,8 @@ impl Tvu {
fetch_stage,
sigverify_stage,
retransmit_stage,
window_service,
cluster_slots_service,
replay_stage,
ledger_cleanup_service,
cost_update_service,
@@ -304,6 +335,8 @@ impl Tvu {
pub fn join(self) -> thread::Result<()> {
self.retransmit_stage.join()?;
self.window_service.join()?;
self.cluster_slots_service.join()?;
self.fetch_stage.join()?;
self.sigverify_stage.join()?;
if self.ledger_cleanup_service.is_some() {


@@ -22,7 +22,7 @@ use {
solana_metrics::inc_new_counter_error,
solana_perf::packet::{Packet, PacketBatch},
solana_rayon_threadlimit::get_thread_count,
solana_sdk::{clock::Slot, pubkey::Pubkey},
solana_sdk::clock::Slot,
std::{
cmp::Reverse,
collections::{HashMap, HashSet},
@@ -43,7 +43,10 @@ pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;
#[derive(Default)]
struct WindowServiceMetrics {
run_insert_count: u64,
num_shreds_received: u64,
num_packets: usize,
num_repairs: usize,
num_shreds_received: usize,
handle_packets_elapsed_us: u64,
shred_receiver_elapsed_us: u64,
prune_shreds_elapsed_us: u64,
num_shreds_pruned_invalid_repair: usize,
@@ -52,14 +55,23 @@ struct WindowServiceMetrics {
num_errors_cross_beam_recv_timeout: u64,
num_errors_other: u64,
num_errors_try_crossbeam_send: u64,
addrs: HashMap</*source:*/ SocketAddr, /*num packets:*/ usize>,
}
impl WindowServiceMetrics {
fn report_metrics(&self, metric_name: &'static str) {
const MAX_NUM_ADDRS: usize = 5;
datapoint_info!(
metric_name,
(
"handle_packets_elapsed_us",
self.handle_packets_elapsed_us,
i64
),
("run_insert_count", self.run_insert_count as i64, i64),
("num_shreds_received", self.num_shreds_received as i64, i64),
("num_packets", self.num_packets, i64),
("num_repairs", self.num_repairs, i64),
("num_shreds_received", self.num_shreds_received, i64),
(
"shred_receiver_elapsed_us",
self.shred_receiver_elapsed_us as i64,
@@ -89,6 +101,19 @@ impl WindowServiceMetrics {
i64
),
);
let mut addrs: Vec<_> = self.addrs.iter().collect();
let reverse_count = |(_addr, count): &_| Reverse(*count);
if addrs.len() > MAX_NUM_ADDRS {
addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count);
addrs.truncate(MAX_NUM_ADDRS);
}
addrs.sort_unstable_by_key(reverse_count);
info!(
"num addresses: {}, top packets by source: {:?}",
self.addrs.len(),
addrs
);
}
fn record_error(&mut self, err: &Error) {
@@ -105,52 +130,6 @@ impl WindowServiceMetrics {
}
}
#[derive(Default)]
struct ReceiveWindowStats {
num_iters: usize,
num_packets: usize,
num_repairs: usize,
num_shreds: usize, // num_discards: num_packets - num_shreds
elapsed: Duration, // excludes waiting time on the receiver channel.
addrs: HashMap</*source:*/ SocketAddr, /*num packets:*/ usize>,
since: Option<Instant>,
}
impl ReceiveWindowStats {
fn maybe_submit(&mut self) {
const MAX_NUM_ADDRS: usize = 5;
const SUBMIT_CADENCE: Duration = Duration::from_secs(2);
let elapsed = self.since.as_ref().map(Instant::elapsed);
if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
return;
}
datapoint_info!(
"receive_window_stats",
("num_iters", self.num_iters, i64),
("num_packets", self.num_packets, i64),
("num_shreds", self.num_shreds, i64),
("num_repairs", self.num_repairs, i64),
("elapsed_micros", self.elapsed.as_micros(), i64),
);
let mut addrs: Vec<_> = std::mem::take(&mut self.addrs).into_iter().collect();
let reverse_count = |(_addr, count): &_| Reverse(*count);
if addrs.len() > MAX_NUM_ADDRS {
addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count);
addrs.truncate(MAX_NUM_ADDRS);
}
addrs.sort_unstable_by_key(reverse_count);
info!(
"num addresses: {}, top packets by source: {:?}",
self.addrs.len(),
addrs
);
*self = Self {
since: Some(Instant::now()),
..Self::default()
};
}
}
fn run_check_duplicate(
cluster_info: &ClusterInfo,
blockstore: &Blockstore,
@@ -229,8 +208,10 @@ fn prune_shreds_invalid_repair(
assert_eq!(shreds.len(), repair_infos.len());
}
#[allow(clippy::too_many_arguments)]
fn run_insert<F>(
shred_receiver: &Receiver<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
thread_pool: &ThreadPool,
verified_receiver: &Receiver<Vec<PacketBatch>>,
blockstore: &Blockstore,
leader_schedule_cache: &LeaderScheduleCache,
handle_duplicate: F,
@@ -243,26 +224,46 @@ fn run_insert<F>(
where
F: Fn(Shred),
{
ws_metrics.run_insert_count += 1;
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
let timer = Duration::from_millis(200);
let (mut shreds, mut repair_infos) = shred_receiver.recv_timeout(timer)?;
while let Ok((more_shreds, more_repair_infos)) = shred_receiver.try_recv() {
shreds.extend(more_shreds);
repair_infos.extend(more_repair_infos);
}
let mut packets = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
packets.extend(verified_receiver.try_iter().flatten());
shred_receiver_elapsed.stop();
ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
ws_metrics.num_shreds_received += shreds.len() as u64;
// TODO: Consider using thread-pool here instead of recv_window.
let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = shreds
.into_iter()
.zip(repair_infos)
.filter_map(|(shred, repair_info)| {
let shred = Shred::new_from_serialized_shred(shred).ok()?;
Some((shred, repair_info))
})
.unzip();
ws_metrics.run_insert_count += 1;
let handle_packet = |packet: &Packet| {
if packet.meta.discard() {
return None;
}
let shred = shred::layout::get_shred(packet)?;
let shred = Shred::new_from_serialized_shred(shred.to_vec()).ok()?;
if packet.meta.repair() {
let repair_info = RepairMeta {
_from_addr: packet.meta.socket_addr(),
// If can't parse the nonce, dump the packet.
nonce: repair_response::nonce(packet)?,
};
Some((shred, Some(repair_info)))
} else {
Some((shred, None))
}
};
let now = Instant::now();
let (mut shreds, mut repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
packets
.par_iter()
.flat_map_iter(|packets| packets.iter().filter_map(handle_packet))
.unzip()
});
ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
ws_metrics.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
ws_metrics.num_shreds_received += shreds.len();
for packet in packets.iter().flat_map(PacketBatch::iter) {
let addr = packet.meta.socket_addr();
*ws_metrics.addrs.entry(addr).or_default() += 1;
}
let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed");
let num_shreds = shreds.len();
prune_shreds_invalid_repair(&mut shreds, &mut repair_infos, outstanding_requests);
@@ -293,90 +294,12 @@ where
Ok(())
}
fn recv_window(
insert_shred_sender: &Sender<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
verified_receiver: &Receiver<Vec<PacketBatch>>,
retransmit_sender: &Sender<Vec<ShredPayload>>,
turbine_disabled: &AtomicBool,
thread_pool: &ThreadPool,
stats: &mut ReceiveWindowStats,
) -> Result<()> {
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let mut packet_batches = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
packet_batches.extend(verified_receiver.try_iter().flatten());
let now = Instant::now();
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
let handle_packet = |packet: &Packet| {
if turbine_disabled || packet.meta.discard() {
return None;
}
let shred = shred::layout::get_shred(packet)?;
if packet.meta.repair() {
let repair_info = RepairMeta {
_from_addr: packet.meta.socket_addr(),
// If can't parse the nonce, dump the packet.
nonce: repair_response::nonce(packet)?,
};
Some((shred.to_vec(), Some(repair_info)))
} else {
Some((shred.to_vec(), None))
}
};
let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
packet_batches
.par_iter()
.flat_map_iter(|packet_batch| packet_batch.iter().filter_map(handle_packet))
.unzip()
});
// Exclude repair packets from retransmit.
let _ = retransmit_sender.send(
shreds
.iter()
.zip(&repair_infos)
.filter(|(_, repair_info)| repair_info.is_none())
.map(|(shred, _)| shred)
.cloned()
.collect(),
);
stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count();
stats.num_shreds += shreds.len();
insert_shred_sender.send((shreds, repair_infos))?;
stats.num_iters += 1;
stats.num_packets += packet_batches.iter().map(PacketBatch::len).sum::<usize>();
for packet in packet_batches.iter().flat_map(PacketBatch::iter) {
let addr = packet.meta.socket_addr();
*stats.addrs.entry(addr).or_default() += 1;
}
stats.elapsed += now.elapsed();
Ok(())
}
struct RepairMeta {
_from_addr: SocketAddr,
nonce: Nonce,
}
// Implement a destructor for the window_service thread to signal it exited
// even on panics
struct Finalizer {
exit_sender: Arc<AtomicBool>,
}
impl Finalizer {
fn new(exit_sender: Arc<AtomicBool>) -> Self {
Finalizer { exit_sender }
}
}
// Implement a destructor for Finalizer.
impl Drop for Finalizer {
fn drop(&mut self) {
self.exit_sender.clone().store(true, Ordering::Relaxed);
}
}
pub(crate) struct WindowService {
t_window: JoinHandle<()>,
t_insert: JoinHandle<()>,
t_check_duplicate: JoinHandle<()>,
repair_service: RepairService,
@@ -393,7 +316,6 @@ impl WindowService {
exit: Arc<AtomicBool>,
repair_info: RepairInfo,
leader_schedule_cache: Arc<LeaderScheduleCache>,
turbine_disabled: Arc<AtomicBool>,
verified_vote_receiver: VerifiedVoteReceiver,
completed_data_sets_sender: CompletedDataSetsSender,
duplicate_slots_sender: DuplicateSlotSender,
@@ -402,7 +324,6 @@ impl WindowService {
let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
let cluster_info = repair_info.cluster_info.clone();
let id = cluster_info.id();
let repair_service = RepairService::new(
blockstore.clone(),
@@ -415,7 +336,6 @@ impl WindowService {
ancestor_hashes_replay_update_receiver,
);
let (insert_sender, insert_receiver) = unbounded();
let (duplicate_sender, duplicate_receiver) = unbounded();
let t_check_duplicate = Self::start_check_duplicate_thread(
@@ -427,27 +347,17 @@ impl WindowService {
);
let t_insert = Self::start_window_insert_thread(
exit.clone(),
exit,
blockstore,
leader_schedule_cache,
insert_receiver,
verified_receiver,
duplicate_sender,
completed_data_sets_sender,
retransmit_sender.clone(),
retransmit_sender,
outstanding_requests,
);
let t_window = Self::start_recv_window_thread(
id,
exit,
insert_sender,
verified_receiver,
turbine_disabled,
retransmit_sender,
);
WindowService {
t_window,
t_insert,
t_check_duplicate,
repair_service,
@@ -466,20 +376,17 @@ impl WindowService {
};
Builder::new()
.name("solana-check-duplicate".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let mut noop = || {};
if let Err(e) = run_check_duplicate(
&cluster_info,
&blockstore,
&duplicate_receiver,
&duplicate_slots_sender,
) {
if Self::should_exit_on_error(e, &mut noop, &handle_error) {
break;
.spawn(move || {
while !exit.load(Ordering::Relaxed) {
if let Err(e) = run_check_duplicate(
&cluster_info,
&blockstore,
&duplicate_receiver,
&duplicate_slots_sender,
) {
if Self::should_exit_on_error(e, &handle_error) {
break;
}
}
}
})
@@ -490,17 +397,20 @@ impl WindowService {
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
insert_receiver: Receiver<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
verified_receiver: Receiver<Vec<PacketBatch>>,
check_duplicate_sender: Sender<Shred>,
completed_data_sets_sender: CompletedDataSetsSender,
retransmit_sender: Sender<Vec<ShredPayload>>,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
) -> JoinHandle<()> {
let mut handle_timeout = || {};
let handle_error = || {
inc_new_counter_error!("solana-window-insert-error", 1, 1);
};
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count().min(8))
.thread_name(|i| format!("window-insert-{}", i))
.build()
.unwrap();
Builder::new()
.name("solana-window-insert".to_string())
.spawn(move || {
@@ -510,13 +420,10 @@ impl WindowService {
let mut metrics = BlockstoreInsertionMetrics::default();
let mut ws_metrics = WindowServiceMetrics::default();
let mut last_print = Instant::now();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
while !exit.load(Ordering::Relaxed) {
if let Err(e) = run_insert(
&insert_receiver,
&thread_pool,
&verified_receiver,
&blockstore,
&leader_schedule_cache,
&handle_duplicate,
@@ -527,7 +434,7 @@ impl WindowService {
&outstanding_requests,
) {
ws_metrics.record_error(&e);
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
if Self::should_exit_on_error(e, &handle_error) {
break;
}
}
@@ -544,71 +451,13 @@ impl WindowService {
.unwrap()
}
#[allow(clippy::too_many_arguments)]
fn start_recv_window_thread(
id: Pubkey,
exit: Arc<AtomicBool>,
insert_sender: Sender<(Vec<ShredPayload>, Vec<Option<RepairMeta>>)>,
verified_receiver: Receiver<Vec<PacketBatch>>,
turbine_disabled: Arc<AtomicBool>,
retransmit_sender: Sender<Vec<ShredPayload>>,
) -> JoinHandle<()> {
let mut stats = ReceiveWindowStats::default();
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit.clone());
trace!("{}: RECV_WINDOW started", id);
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.build()
.unwrap();
let mut now = Instant::now();
let handle_error = || {
inc_new_counter_error!("solana-window-error", 1, 1);
};
while !exit.load(Ordering::Relaxed) {
let mut handle_timeout = || {
if now.elapsed() > Duration::from_secs(30) {
warn!(
"Window does not seem to be receiving data. \
Ensure port configuration is correct..."
);
now = Instant::now();
}
};
if let Err(e) = recv_window(
&insert_sender,
&verified_receiver,
&retransmit_sender,
&turbine_disabled,
&thread_pool,
&mut stats,
) {
if Self::should_exit_on_error(e, &mut handle_timeout, &handle_error) {
break;
}
} else {
now = Instant::now();
}
stats.maybe_submit();
}
})
.unwrap()
}
fn should_exit_on_error<F, H>(e: Error, handle_timeout: &mut F, handle_error: &H) -> bool
fn should_exit_on_error<H>(e: Error, handle_error: &H) -> bool
where
F: FnMut(),
H: Fn(),
{
match e {
Error::RecvTimeout(RecvTimeoutError::Disconnected) => true,
Error::RecvTimeout(RecvTimeoutError::Timeout) => {
handle_timeout();
false
}
Error::RecvTimeout(RecvTimeoutError::Timeout) => false,
Error::Send => true,
_ => {
handle_error();
@@ -619,7 +468,6 @@ impl WindowService {
}
pub(crate) fn join(self) -> thread::Result<()> {
self.t_window.join()?;
self.t_insert.join()?;
self.t_check_duplicate.join()?;
self.repair_service.join()