From c6ae0667e6db973b3e871a1dcf0f7b7bcb4f4ea6 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Tue, 19 Jan 2021 04:16:19 +0000
Subject: [PATCH] feature gates turbine retransmit peers patch (#14631)

---
 core/src/retransmit_stage.rs | 145 ++++++++++++++++++++++-------------
 sdk/src/feature_set.rs       |   5 ++
 2 files changed, 95 insertions(+), 55 deletions(-)

diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 994f95356a..7075036d54 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -23,17 +23,21 @@ use solana_ledger::{
 use solana_measure::measure::Measure;
 use solana_metrics::inc_new_counter_error;
 use solana_perf::packet::{Packet, Packets};
-use solana_runtime::bank_forks::BankForks;
-use solana_sdk::clock::{Epoch, Slot};
-use solana_sdk::epoch_schedule::EpochSchedule;
-use solana_sdk::pubkey::Pubkey;
-use solana_sdk::timing::timestamp;
+use solana_runtime::{bank::Bank, bank_forks::BankForks};
+use solana_sdk::{
+    clock::{Epoch, Slot},
+    epoch_schedule::EpochSchedule,
+    feature_set,
+    pubkey::Pubkey,
+    timing::timestamp,
+};
 use solana_streamer::streamer::PacketReceiver;
 use std::{
     cmp,
     collections::hash_set::HashSet,
     collections::{BTreeMap, HashMap},
     net::UdpSocket,
+    ops::{Deref, DerefMut},
     sync::atomic::{AtomicBool, AtomicU64, Ordering},
     sync::mpsc::channel,
     sync::mpsc::RecvTimeoutError,
@@ -68,7 +72,7 @@ struct RetransmitStats {
 
 #[allow(clippy::too_many_arguments)]
 fn update_retransmit_stats(
-    stats: &Arc<RetransmitStats>,
+    stats: &RetransmitStats,
     total_time: u64,
     total_packets: usize,
     retransmit_total: u64,
@@ -209,49 +213,62 @@ pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
 
 pub type ShredFilterAndHasher = (ShredFilter, PacketHasher);
 
-// Return true if shred is already received and should skip retransmit
+// Returns None if shred is already received and should skip retransmit.
+// Otherwise returns shred's slot.
 fn check_if_already_received(
     packet: &Packet,
-    shreds_received: &Arc<Mutex<ShredFilterAndHasher>>,
-) -> bool {
-    match get_shred_slot_index_type(packet, &mut ShredFetchStats::default()) {
-        Some(slot_index) => {
-            let mut received = shreds_received.lock().unwrap();
-            let hasher = received.1.clone();
-            if let Some(sent) = received.0.get_mut(&slot_index) {
-                if sent.len() < MAX_DUPLICATE_COUNT {
-                    let hash = hasher.hash_packet(packet);
-                    if sent.contains(&hash) {
-                        return true;
-                    }
-
-                    sent.push(hash);
-                } else {
-                    return true;
-                }
+    shreds_received: &Mutex<ShredFilterAndHasher>,
+) -> Option<Slot> {
+    let shred = get_shred_slot_index_type(packet, &mut ShredFetchStats::default())?;
+    let mut shreds_received = shreds_received.lock().unwrap();
+    let (cache, hasher) = shreds_received.deref_mut();
+    match cache.get_mut(&shred) {
+        Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => None,
+        Some(sent) => {
+            let hash = hasher.hash_packet(packet);
+            if sent.contains(&hash) {
+                None
             } else {
-                let hash = hasher.hash_packet(&packet);
-                received.0.put(slot_index, vec![hash]);
+                sent.push(hash);
+                Some(shred.0)
             }
-
-            false
         }
-        None => true,
+        None => {
+            let hash = hasher.hash_packet(packet);
+            cache.put(shred, vec![hash]);
+            Some(shred.0)
+        }
+    }
+}
+
+// Returns true if turbine retransmit peers patch (#14565) is enabled.
+fn enable_turbine_retransmit_peers_patch(shred_slot: Slot, root_bank: &Bank) -> bool {
+    let feature_slot = root_bank
+        .feature_set
+        .activated_slot(&feature_set::turbine_retransmit_peers_patch::id());
+    match feature_slot {
+        None => false,
+        Some(feature_slot) => {
+            let epoch_schedule = root_bank.epoch_schedule();
+            let feature_epoch = epoch_schedule.get_epoch(feature_slot);
+            let shred_epoch = epoch_schedule.get_epoch(shred_slot);
+            feature_epoch < shred_epoch
+        }
     }
 }
 
 #[allow(clippy::too_many_arguments)]
 fn retransmit(
-    bank_forks: &Arc<RwLock<BankForks>>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    bank_forks: &RwLock<BankForks>,
+    leader_schedule_cache: &LeaderScheduleCache,
     cluster_info: &ClusterInfo,
-    r: &Arc<Mutex<PacketReceiver>>,
+    r: &Mutex<PacketReceiver>,
     sock: &UdpSocket,
     id: u32,
-    stats: &Arc<RetransmitStats>,
-    epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
-    last_peer_update: &Arc<AtomicU64>,
-    shreds_received: &Arc<Mutex<ShredFilterAndHasher>>,
+    stats: &RetransmitStats,
+    epoch_stakes_cache: &RwLock<EpochStakesCache>,
+    last_peer_update: &AtomicU64,
+    shreds_received: &Mutex<ShredFilterAndHasher>,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
     let r_lock = r.lock().unwrap();
@@ -269,7 +286,10 @@ fn retransmit(
     drop(r_lock);
 
     let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
-    let r_bank = bank_forks.read().unwrap().working_bank();
+    let (r_bank, root_bank) = {
+        let bank_forks = bank_forks.read().unwrap();
+        (bank_forks.working_bank(), bank_forks.root_bank())
+    };
     let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
     epoch_fetch.stop();
 
@@ -330,19 +350,22 @@ fn retransmit(
                 repair_total += 1;
                 continue;
             }
-
-            if check_if_already_received(packet, shreds_received) {
-                continue;
-            }
-
+            let shred_slot = match check_if_already_received(packet, shreds_received) {
+                Some(slot) => slot,
+                None => continue,
+            };
             let mut compute_turbine_peers = Measure::start("turbine_start");
-            let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
+            let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
                 &my_id,
                 &r_epoch_stakes_cache.peers,
                 &r_epoch_stakes_cache.stakes_and_index,
                 packet.meta.seed,
             );
             peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
+            // Until the patch is activated, do the old buggy thing.
+            if !enable_turbine_retransmit_peers_patch(shred_slot, root_bank.deref()) {
+                shuffled_stakes_and_index.remove(my_index);
+            }
             // split off the indexes, we don't need the stakes anymore
             let indexes = shuffled_stakes_and_index
                 .into_iter()
@@ -671,41 +694,53 @@ mod tests {
         shred.copy_to_packet(&mut packet);
         let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default())));
         // unique shred for (1, 5) should pass
-        assert!(!check_if_already_received(&packet, &shreds_received));
+        assert_eq!(
+            check_if_already_received(&packet, &shreds_received),
+            Some(slot)
+        );
         // duplicate shred for (1, 5) blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
 
         let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0);
         shred.copy_to_packet(&mut packet);
         // first duplicate shred for (1, 5) passed
-        assert!(!check_if_already_received(&packet, &shreds_received));
+        assert_eq!(
+            check_if_already_received(&packet, &shreds_received),
+            Some(slot)
+        );
         // then blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
 
         let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0);
         shred.copy_to_packet(&mut packet);
         // 2nd duplicate shred for (1, 5) blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
 
         let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, 0, version);
         shred.copy_to_packet(&mut packet);
         // Coding at (1, 5) passes
-        assert!(!check_if_already_received(&packet, &shreds_received));
+        assert_eq!(
+            check_if_already_received(&packet, &shreds_received),
+            Some(slot)
+        );
         // then blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
 
         let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, 0, version);
        shred.copy_to_packet(&mut packet);
         // 2nd unique coding at (1, 5) passes
-        assert!(!check_if_already_received(&packet, &shreds_received));
+        assert_eq!(
+            check_if_already_received(&packet, &shreds_received),
+            Some(slot)
+        );
         // same again is blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
 
         let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, 0, version);
         shred.copy_to_packet(&mut packet);
         // Another unique coding at (1, 5) always blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
     }
 }
diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs
index cc1bccdea9..05f5d6fc9d 100644
--- a/sdk/src/feature_set.rs
+++ b/sdk/src/feature_set.rs
@@ -138,6 +138,10 @@ pub mod use_loaded_executables {
     solana_sdk::declare_id!("2SLL2KLakB83YAnF1TwFb1hpycrWeHAfHYyLhwk2JRGn");
 }
 
+pub mod turbine_retransmit_peers_patch {
+    solana_sdk::declare_id!("5Lu3JnWSFwRYpXzwDMkanWSk6XqSuF2i5fpnVhzB5CTc");
+}
+
 lazy_static! {
     /// Map of feature identifiers to user-visible description
     pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -174,6 +178,7 @@ lazy_static! {
         (use_loaded_program_accounts::id(), "Use loaded program accounts"),
         (abort_on_all_cpi_failures::id(), "Abort on all CPI failures"),
         (use_loaded_executables::id(), "Use loaded executable accounts"),
+        (turbine_retransmit_peers_patch::id(), "turbine retransmit peers patch #14631"),
         /*************** ADD NEW FEATURES HERE ***************/
     ]
     .iter()
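
The gate in enable_turbine_retransmit_peers_patch is epoch aligned: the patched retransmit behavior turns on only for shreds whose epoch is strictly greater than the epoch containing the feature's activation slot, so validators switch at an epoch boundary rather than mid-epoch. Below is a minimal standalone sketch of that comparison, not part of the patch itself; it assumes a hypothetical fixed-length epoch schedule (and the names patch_enabled, get_epoch, SLOTS_PER_EPOCH are illustrative) in place of solana_sdk's EpochSchedule, which also models warmup epochs.

// Sketch only: a fixed-length epoch schedule standing in for EpochSchedule.
const SLOTS_PER_EPOCH: u64 = 432_000;

fn get_epoch(slot: u64) -> u64 {
    slot / SLOTS_PER_EPOCH
}

// Mirrors the shape of enable_turbine_retransmit_peers_patch: enabled only when
// the shred's epoch is strictly after the epoch of the feature's activation slot.
fn patch_enabled(shred_slot: u64, activation_slot: Option<u64>) -> bool {
    match activation_slot {
        None => false,
        Some(feature_slot) => get_epoch(feature_slot) < get_epoch(shred_slot),
    }
}

fn main() {
    let activation_slot = Some(1_000_000); // hypothetical activation in epoch 2
    assert!(!patch_enabled(1_100_000, activation_slot)); // same epoch: still off
    assert!(patch_enabled(1_300_000, activation_slot)); // next epoch: on
    assert!(!patch_enabled(1_300_000, None)); // feature not activated: off
}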