moves turbine-disabled check to shred-fetch-stage (#30799)

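When turbine_disabled is true, this commit discards turbine packets
in shred-fetch-stage, before they are checked against the deduper,
instead of dropping them later in shred-sigverify. The discarded
packets therefore do not interfere with the deduper, and the same
packets can get through once turbine is enabled again.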

This is a prerequisite for:
https://github.com/solana-labs/solana/pull/30786
It is needed so that the local-cluster tests pass with that change.
This commit is contained in:
behzad nouri 2023-03-20 20:34:41 +00:00 committed by GitHub
parent c6e7aaf96c
commit e66edeb180
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 22 deletions

View File

@ -17,7 +17,10 @@ use {
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, RwLock},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
@ -41,6 +44,7 @@ impl ShredFetchStage {
name: &'static str,
flags: PacketFlags,
repair_context: Option<(&UdpSocket, &ClusterInfo)>,
turbine_disabled: Arc<AtomicBool>,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut rng = rand::thread_rng();
@ -95,16 +99,19 @@ impl ShredFetchStage {
let max_slot = last_slot + 2 * slots_per_epoch;
let should_drop_merkle_shreds =
|shred_slot| should_drop_merkle_shreds(shred_slot, &root_bank);
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
for packet in packet_batch.iter_mut() {
if should_discard_packet(
packet,
last_root,
max_slot,
shred_version,
&deduper,
should_drop_merkle_shreds,
&mut stats,
) {
if turbine_disabled
|| should_discard_packet(
packet,
last_root,
max_slot,
shred_version,
&deduper,
should_drop_merkle_shreds,
&mut stats,
)
{
packet.meta_mut().set_discard(true);
} else {
packet.meta_mut().flags.insert(flags);
@ -117,6 +124,7 @@ impl ShredFetchStage {
}
}
#[allow(clippy::too_many_arguments)]
fn packet_modifier(
sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
@ -127,6 +135,7 @@ impl ShredFetchStage {
name: &'static str,
flags: PacketFlags,
repair_context: Option<(Arc<UdpSocket>, Arc<ClusterInfo>)>,
turbine_disabled: Arc<AtomicBool>,
) -> (Vec<JoinHandle<()>>, JoinHandle<()>) {
let (packet_sender, packet_receiver) = unbounded();
let streamers = sockets
@ -158,6 +167,7 @@ impl ShredFetchStage {
name,
flags,
repair_context,
turbine_disabled,
)
})
.unwrap();
@ -172,6 +182,7 @@ impl ShredFetchStage {
shred_version: u16,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
turbine_disabled: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> Self {
let recycler = PacketBatchRecycler::warmed(100, 1024);
@ -186,6 +197,7 @@ impl ShredFetchStage {
"shred_fetch",
PacketFlags::empty(),
None, // repair_context
turbine_disabled.clone(),
);
let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier(
@ -198,6 +210,7 @@ impl ShredFetchStage {
"shred_fetch_tvu_forwards",
PacketFlags::FORWARDED,
None, // repair_context
turbine_disabled.clone(),
);
let (repair_receiver, repair_handler) = Self::packet_modifier(
@ -210,6 +223,7 @@ impl ShredFetchStage {
"shred_fetch_repair",
PacketFlags::REPAIR,
Some((repair_socket, cluster_info)),
turbine_disabled,
);
tvu_threads.extend(tvu_forwards_threads.into_iter());

View File

@ -11,10 +11,7 @@ use {
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
sync::{Arc, RwLock},
thread::{Builder, JoinHandle},
time::{Duration, Instant},
},
@ -34,7 +31,6 @@ pub(crate) fn spawn_shred_sigverify(
shred_fetch_receiver: Receiver<PacketBatch>,
retransmit_sender: Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: Sender<Vec<PacketBatch>>,
turbine_disabled: Arc<AtomicBool>,
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
@ -56,7 +52,6 @@ pub(crate) fn spawn_shred_sigverify(
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
&turbine_disabled,
&mut stats,
) {
Ok(()) => (),
@ -83,7 +78,6 @@ fn run_shred_sigverify(
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
turbine_disabled: &AtomicBool,
stats: &mut ShredSigVerifyStats,
) -> Result<(), Error> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
@ -113,10 +107,8 @@ fn run_shred_sigverify(
.map(<[u8]>::to_vec)
.collect();
stats.num_retransmit_shreds += shreds.len();
if !turbine_disabled.load(Ordering::Relaxed) {
retransmit_sender.send(shreds)?;
verified_sender.send(packets)?;
}
retransmit_sender.send(shreds)?;
verified_sender.send(packets)?;
stats.elapsed_micros += now.elapsed().as_micros() as u64;
Ok(())
}

View File

@ -161,6 +161,7 @@ impl Tvu {
tvu_config.shred_version,
bank_forks.clone(),
cluster_info.clone(),
turbine_disabled,
exit,
);
@ -173,7 +174,6 @@ impl Tvu {
fetch_receiver,
retransmit_sender.clone(),
verified_sender,
turbine_disabled,
);
let retransmit_stage = RetransmitStage::new(