feature gates turbine retransmit peers patch (#14631)

behzad nouri 2021-01-19 04:16:19 +00:00 committed by GitHub
parent 5d9dc609b1
commit c6ae0667e6
2 changed files with 95 additions and 55 deletions
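
In short, this commit feature-gates the retransmit-peers fix from #14565: the patched peer selection is applied only to shreds whose slot falls in an epoch strictly later than the epoch in which the feature was activated on the root bank; until then the old (buggy) behavior of removing the node's own index from the shuffled peer list is kept. A minimal sketch of that gating rule, simplified to plain epoch numbers (the real code in the diff below derives the epochs from the root bank's EpochSchedule):

// Simplified sketch of the gating rule added in this commit (not the actual
// function): the patch applies only to shreds from an epoch strictly later
// than the feature's activation epoch; `None` means the feature is not active.
fn patch_enabled_for_shred(feature_activation_epoch: Option<u64>, shred_epoch: u64) -> bool {
    match feature_activation_epoch {
        None => false, // feature not activated yet: keep the old behavior
        Some(feature_epoch) => feature_epoch < shred_epoch,
    }
}

fn main() {
    // If the feature activates in epoch 200, shreds from epoch 200 still use
    // the old peer selection; shreds from epoch 201 onward use the patch.
    assert!(!patch_enabled_for_shred(None, 201));
    assert!(!patch_enabled_for_shred(Some(200), 200));
    assert!(patch_enabled_for_shred(Some(200), 201));
}

Gating on the shred's epoch rather than flipping immediately at the activation slot presumably lets the whole cluster switch retransmit behavior at an epoch boundary instead of mid-epoch.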

core/src/retransmit_stage.rs

@@ -23,17 +23,21 @@ use solana_ledger::{
 use solana_measure::measure::Measure;
 use solana_metrics::inc_new_counter_error;
 use solana_perf::packet::{Packet, Packets};
-use solana_runtime::bank_forks::BankForks;
-use solana_sdk::clock::{Epoch, Slot};
-use solana_sdk::epoch_schedule::EpochSchedule;
-use solana_sdk::pubkey::Pubkey;
-use solana_sdk::timing::timestamp;
+use solana_runtime::{bank::Bank, bank_forks::BankForks};
+use solana_sdk::{
+    clock::{Epoch, Slot},
+    epoch_schedule::EpochSchedule,
+    feature_set,
+    pubkey::Pubkey,
+    timing::timestamp,
+};
 use solana_streamer::streamer::PacketReceiver;
 use std::{
     cmp,
     collections::hash_set::HashSet,
     collections::{BTreeMap, HashMap},
     net::UdpSocket,
+    ops::{Deref, DerefMut},
     sync::atomic::{AtomicBool, AtomicU64, Ordering},
     sync::mpsc::channel,
     sync::mpsc::RecvTimeoutError,
@@ -68,7 +72,7 @@ struct RetransmitStats {
 
 #[allow(clippy::too_many_arguments)]
 fn update_retransmit_stats(
-    stats: &Arc<RetransmitStats>,
+    stats: &RetransmitStats,
     total_time: u64,
     total_packets: usize,
     retransmit_total: u64,
@@ -209,49 +213,62 @@ pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
 pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
 pub type ShredFilterAndHasher = (ShredFilter, PacketHasher);
 
-// Return true if shred is already received and should skip retransmit
+// Returns None if shred is already received and should skip retransmit.
+// Otherwise returns shred's slot.
 fn check_if_already_received(
     packet: &Packet,
-    shreds_received: &Arc<Mutex<ShredFilterAndHasher>>,
-) -> bool {
-    match get_shred_slot_index_type(packet, &mut ShredFetchStats::default()) {
-        Some(slot_index) => {
-            let mut received = shreds_received.lock().unwrap();
-            let hasher = received.1.clone();
-            if let Some(sent) = received.0.get_mut(&slot_index) {
-                if sent.len() < MAX_DUPLICATE_COUNT {
-                    let hash = hasher.hash_packet(packet);
-                    if sent.contains(&hash) {
-                        return true;
-                    }
-                    sent.push(hash);
-                } else {
-                    return true;
-                }
-            } else {
-                let hash = hasher.hash_packet(&packet);
-                received.0.put(slot_index, vec![hash]);
-            }
-            false
-        }
-        None => true,
+    shreds_received: &Mutex<ShredFilterAndHasher>,
+) -> Option<Slot> {
+    let shred = get_shred_slot_index_type(packet, &mut ShredFetchStats::default())?;
+    let mut shreds_received = shreds_received.lock().unwrap();
+    let (cache, hasher) = shreds_received.deref_mut();
+    match cache.get_mut(&shred) {
+        Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => None,
+        Some(sent) => {
+            let hash = hasher.hash_packet(packet);
+            if sent.contains(&hash) {
+                None
+            } else {
+                sent.push(hash);
+                Some(shred.0)
+            }
+        }
+        None => {
+            let hash = hasher.hash_packet(packet);
+            cache.put(shred, vec![hash]);
+            Some(shred.0)
+        }
+    }
+}
+
+// Returns true if turbine retransmit peers patch (#14565) is enabled.
+fn enable_turbine_retransmit_peers_patch(shred_slot: Slot, root_bank: &Bank) -> bool {
+    let feature_slot = root_bank
+        .feature_set
+        .activated_slot(&feature_set::turbine_retransmit_peers_patch::id());
+    match feature_slot {
+        None => false,
+        Some(feature_slot) => {
+            let epoch_schedule = root_bank.epoch_schedule();
+            let feature_epoch = epoch_schedule.get_epoch(feature_slot);
+            let shred_epoch = epoch_schedule.get_epoch(shred_slot);
+            feature_epoch < shred_epoch
+        }
     }
 }
 
 #[allow(clippy::too_many_arguments)]
 fn retransmit(
-    bank_forks: &Arc<RwLock<BankForks>>,
-    leader_schedule_cache: &Arc<LeaderScheduleCache>,
+    bank_forks: &RwLock<BankForks>,
+    leader_schedule_cache: &LeaderScheduleCache,
     cluster_info: &ClusterInfo,
-    r: &Arc<Mutex<PacketReceiver>>,
+    r: &Mutex<PacketReceiver>,
     sock: &UdpSocket,
     id: u32,
-    stats: &Arc<RetransmitStats>,
-    epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
-    last_peer_update: &Arc<AtomicU64>,
-    shreds_received: &Arc<Mutex<ShredFilterAndHasher>>,
+    stats: &RetransmitStats,
+    epoch_stakes_cache: &RwLock<EpochStakesCache>,
+    last_peer_update: &AtomicU64,
+    shreds_received: &Mutex<ShredFilterAndHasher>,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
     let r_lock = r.lock().unwrap();
@@ -269,7 +286,10 @@ fn retransmit(
     drop(r_lock);
 
     let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
-    let r_bank = bank_forks.read().unwrap().working_bank();
+    let (r_bank, root_bank) = {
+        let bank_forks = bank_forks.read().unwrap();
+        (bank_forks.working_bank(), bank_forks.root_bank())
+    };
     let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
     epoch_fetch.stop();
 
@@ -330,19 +350,22 @@ fn retransmit(
                 repair_total += 1;
                 continue;
             }
-
-            if check_if_already_received(packet, shreds_received) {
-                continue;
-            }
-
+            let shred_slot = match check_if_already_received(packet, shreds_received) {
+                Some(slot) => slot,
+                None => continue,
+            };
             let mut compute_turbine_peers = Measure::start("turbine_start");
-            let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
+            let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
                 &my_id,
                 &r_epoch_stakes_cache.peers,
                 &r_epoch_stakes_cache.stakes_and_index,
                 packet.meta.seed,
             );
             peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
+            // Until the patch is activated, do the old buggy thing.
+            if !enable_turbine_retransmit_peers_patch(shred_slot, root_bank.deref()) {
+                shuffled_stakes_and_index.remove(my_index);
+            }
             // split off the indexes, we don't need the stakes anymore
             let indexes = shuffled_stakes_and_index
                 .into_iter()
@@ -671,41 +694,53 @@ mod tests {
         shred.copy_to_packet(&mut packet);
         let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default())));
         // unique shred for (1, 5) should pass
-        assert!(!check_if_already_received(&packet, &shreds_received));
+        assert_eq!(
+            check_if_already_received(&packet, &shreds_received),
+            Some(slot)
+        );
         // duplicate shred for (1, 5) blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
 
         let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0);
         shred.copy_to_packet(&mut packet);
         // first duplicate shred for (1, 5) passed
-        assert!(!check_if_already_received(&packet, &shreds_received));
+        assert_eq!(
+            check_if_already_received(&packet, &shreds_received),
+            Some(slot)
+        );
         // then blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
 
         let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0);
         shred.copy_to_packet(&mut packet);
         // 2nd duplicate shred for (1, 5) blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
 
         let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, 0, version);
         shred.copy_to_packet(&mut packet);
         // Coding at (1, 5) passes
-        assert!(!check_if_already_received(&packet, &shreds_received));
+        assert_eq!(
+            check_if_already_received(&packet, &shreds_received),
+            Some(slot)
+        );
         // then blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
 
         let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, 0, version);
         shred.copy_to_packet(&mut packet);
         // 2nd unique coding at (1, 5) passes
-        assert!(!check_if_already_received(&packet, &shreds_received));
+        assert_eq!(
+            check_if_already_received(&packet, &shreds_received),
+            Some(slot)
+        );
         // same again is blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
 
         let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, 0, version);
         shred.copy_to_packet(&mut packet);
         // Another unique coding at (1, 5) always blocked
-        assert!(check_if_already_received(&packet, &shreds_received));
-        assert!(check_if_already_received(&packet, &shreds_received));
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
+        assert_eq!(check_if_already_received(&packet, &shreds_received), None);
     }
 }

sdk/src/feature_set.rs

@@ -138,6 +138,10 @@ pub mod use_loaded_executables {
     solana_sdk::declare_id!("2SLL2KLakB83YAnF1TwFb1hpycrWeHAfHYyLhwk2JRGn");
 }
 
+pub mod turbine_retransmit_peers_patch {
+    solana_sdk::declare_id!("5Lu3JnWSFwRYpXzwDMkanWSk6XqSuF2i5fpnVhzB5CTc");
+}
+
 lazy_static! {
     /// Map of feature identifiers to user-visible description
     pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
@@ -174,6 +178,7 @@ lazy_static! {
         (use_loaded_program_accounts::id(), "Use loaded program accounts"),
         (abort_on_all_cpi_failures::id(), "Abort on all CPI failures"),
         (use_loaded_executables::id(), "Use loaded executable accounts"),
+        (turbine_retransmit_peers_patch::id(), "turbine retransmit peers patch #14631"),
         /*************** ADD NEW FEATURES HERE ***************/
     ]
     .iter()
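
The feature_set.rs hunks above follow the usual two-step pattern for introducing a runtime feature gate: declare a module whose declare_id! carries the feature's pubkey, then register that id with a human-readable description in FEATURE_NAMES (at the ADD NEW FEATURES HERE marker). A hypothetical sketch of that pattern, with a placeholder module name and description that are not part of this commit:

// Hypothetical illustration of the feature-gate registration pattern;
// `my_new_feature` and its pubkey are placeholders, not from this commit.
use lazy_static::lazy_static;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;

pub mod my_new_feature {
    // Placeholder id reusing the feature program's address for illustration.
    solana_sdk::declare_id!("Feature111111111111111111111111111111111111");
}

lazy_static! {
    /// Map of feature identifiers to user-visible description
    pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
        (my_new_feature::id(), "description shown by feature tooling"),
        /*************** ADD NEW FEATURES HERE ***************/
    ]
    .iter()
    .cloned()
    .collect();
}

Runtime code can then ask the root bank's feature set whether (and at which slot) the gate activated, which is exactly what enable_turbine_retransmit_peers_patch does in the first file.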