From e66edeb180334ba20f712e05c03e7cb7c4b3993d Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 20 Mar 2023 20:34:41 +0000 Subject: [PATCH] moves turbine-disabled check to shred-fetch-stage (#30799) If turbine_disabled is true, the commit discards turbine packets earlier in the pipeline so that they won't interfere with the deduper and the packets can get through once turbine is enabled again. This is a prerequisite of: https://github.com/solana-labs/solana/pull/30786 so that local-cluster tests pass. --- core/src/shred_fetch_stage.rs | 34 ++++++++++++++++++++++++---------- core/src/sigverify_shreds.rs | 14 +++----------- core/src/tvu.rs | 2 +- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index c69440b101..325f955bf0 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -17,7 +17,10 @@ use { solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats}, std::{ net::UdpSocket, - sync::{atomic::AtomicBool, Arc, RwLock}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, thread::{self, Builder, JoinHandle}, time::{Duration, Instant}, }, @@ -41,6 +44,7 @@ impl ShredFetchStage { name: &'static str, flags: PacketFlags, repair_context: Option<(&UdpSocket, &ClusterInfo)>, + turbine_disabled: Arc, ) { const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1); let mut rng = rand::thread_rng(); @@ -95,16 +99,19 @@ impl ShredFetchStage { let max_slot = last_slot + 2 * slots_per_epoch; let should_drop_merkle_shreds = |shred_slot| should_drop_merkle_shreds(shred_slot, &root_bank); + let turbine_disabled = turbine_disabled.load(Ordering::Relaxed); for packet in packet_batch.iter_mut() { - if should_discard_packet( - packet, - last_root, - max_slot, - shred_version, - &deduper, - should_drop_merkle_shreds, - &mut stats, - ) { + if turbine_disabled + || should_discard_packet( + packet, + last_root, + max_slot, + shred_version, + &deduper, + should_drop_merkle_shreds, + &mut stats, + ) + { packet.meta_mut().set_discard(true); } else { packet.meta_mut().flags.insert(flags); @@ -117,6 +124,7 @@ impl ShredFetchStage { } } + #[allow(clippy::too_many_arguments)] fn packet_modifier( sockets: Vec>, exit: &Arc, @@ -127,6 +135,7 @@ impl ShredFetchStage { name: &'static str, flags: PacketFlags, repair_context: Option<(Arc, Arc)>, + turbine_disabled: Arc, ) -> (Vec>, JoinHandle<()>) { let (packet_sender, packet_receiver) = unbounded(); let streamers = sockets @@ -158,6 +167,7 @@ impl ShredFetchStage { name, flags, repair_context, + turbine_disabled, ) }) .unwrap(); @@ -172,6 +182,7 @@ impl ShredFetchStage { shred_version: u16, bank_forks: Arc>, cluster_info: Arc, + turbine_disabled: Arc, exit: &Arc, ) -> Self { let recycler = PacketBatchRecycler::warmed(100, 1024); @@ -186,6 +197,7 @@ impl ShredFetchStage { "shred_fetch", PacketFlags::empty(), None, // repair_context + turbine_disabled.clone(), ); let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier( @@ -198,6 +210,7 @@ impl ShredFetchStage { "shred_fetch_tvu_forwards", PacketFlags::FORWARDED, None, // repair_context + turbine_disabled.clone(), ); let (repair_receiver, repair_handler) = Self::packet_modifier( @@ -210,6 +223,7 @@ impl ShredFetchStage { "shred_fetch_repair", PacketFlags::REPAIR, Some((repair_socket, cluster_info)), + turbine_disabled, ); tvu_threads.extend(tvu_forwards_threads.into_iter()); diff --git a/core/src/sigverify_shreds.rs b/core/src/sigverify_shreds.rs index df92403c91..ac6093741a 100644 --- a/core/src/sigverify_shreds.rs +++ b/core/src/sigverify_shreds.rs @@ -11,10 +11,7 @@ use { solana_sdk::{clock::Slot, pubkey::Pubkey}, std::{ collections::HashMap, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, RwLock, - }, + sync::{Arc, RwLock}, thread::{Builder, JoinHandle}, time::{Duration, Instant}, }, @@ -34,7 +31,6 @@ pub(crate) fn spawn_shred_sigverify( shred_fetch_receiver: Receiver, retransmit_sender: Sender>>, verified_sender: Sender>, - turbine_disabled: Arc, ) -> JoinHandle<()> { let recycler_cache = RecyclerCache::warmed(); let mut stats = ShredSigVerifyStats::new(Instant::now()); @@ -56,7 +52,6 @@ pub(crate) fn spawn_shred_sigverify( &shred_fetch_receiver, &retransmit_sender, &verified_sender, - &turbine_disabled, &mut stats, ) { Ok(()) => (), @@ -83,7 +78,6 @@ fn run_shred_sigverify( shred_fetch_receiver: &Receiver, retransmit_sender: &Sender>>, verified_sender: &Sender>, - turbine_disabled: &AtomicBool, stats: &mut ShredSigVerifyStats, ) -> Result<(), Error> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); @@ -113,10 +107,8 @@ fn run_shred_sigverify( .map(<[u8]>::to_vec) .collect(); stats.num_retransmit_shreds += shreds.len(); - if !turbine_disabled.load(Ordering::Relaxed) { - retransmit_sender.send(shreds)?; - verified_sender.send(packets)?; - } + retransmit_sender.send(shreds)?; + verified_sender.send(packets)?; stats.elapsed_micros += now.elapsed().as_micros() as u64; Ok(()) } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 39f20c1b23..5bcc13f901 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -161,6 +161,7 @@ impl Tvu { tvu_config.shred_version, bank_forks.clone(), cluster_info.clone(), + turbine_disabled, exit, ); @@ -173,7 +174,6 @@ impl Tvu { fetch_receiver, retransmit_sender.clone(), verified_sender, - turbine_disabled, ); let retransmit_stage = RetransmitStage::new(