From 5c95d8e963ea2d62dfb280a70ec87a0f8421dfba Mon Sep 17 00:00:00 2001
From: sakridge
Date: Thu, 10 Dec 2020 07:54:15 -0800
Subject: [PATCH] Shred filter (#14030)

---
 core/benches/retransmit_stage.rs | 50 ++++++++++++++++------
 core/src/retransmit_stage.rs     | 72 +++++++++++++++++++++++++++-----
 core/src/shred_fetch_stage.rs    |  4 +-
 3 files changed, 101 insertions(+), 25 deletions(-)

diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs
index 1a6be80188..9a02721240 100644
--- a/core/benches/retransmit_stage.rs
+++ b/core/benches/retransmit_stage.rs
@@ -7,14 +7,18 @@ use log::*;
 use solana_core::cluster_info::{ClusterInfo, Node};
 use solana_core::contact_info::ContactInfo;
 use solana_core::retransmit_stage::retransmitter;
+use solana_ledger::entry::Entry;
 use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
 use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
+use solana_ledger::shred::Shredder;
 use solana_measure::measure::Measure;
-use solana_perf::packet::to_packets_chunked;
-use solana_perf::test_tx::test_tx;
+use solana_perf::packet::{Packet, Packets};
 use solana_runtime::bank::Bank;
 use solana_runtime::bank_forks::BankForks;
+use solana_sdk::hash::Hash;
 use solana_sdk::pubkey;
+use solana_sdk::signature::{Keypair, Signer};
+use solana_sdk::system_transaction;
 use solana_sdk::timing::timestamp;
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicUsize, Ordering};
@@ -63,14 +67,24 @@ fn bench_retransmitter(bencher: &mut Bencher) {
     let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));

     // To work reliably with higher values, this needs larger udp rmem size
-    let tx = test_tx();
-    const NUM_PACKETS: usize = 50;
-    let chunk_size = NUM_PACKETS / (4 * NUM_THREADS);
-    let batches = to_packets_chunked(
-        &std::iter::repeat(tx).take(NUM_PACKETS).collect::<Vec<_>>(),
-        chunk_size,
-    );
-    info!("batches: {}", batches.len());
+    let entries: Vec<_> = (0..5)
+        .map(|_| {
+            let keypair0 = Keypair::new();
+            let keypair1 = Keypair::new();
+            let tx0 =
+                system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
+            Entry::new(&Hash::default(), 1, vec![tx0])
+        })
+        .collect();
+
+    let keypair = Arc::new(Keypair::new());
+    let slot = 0;
+    let parent = 0;
+    let shredder =
+        Shredder::new(slot, parent, 0.0, keypair, 0, 0).expect("Failed to create entry shredder");
+    let mut data_shreds = shredder.entries_to_shreds(&entries, true, 0).0;
+
+    let num_packets = data_shreds.len();

     let retransmitter_handles = retransmitter(
         Arc::new(sockets),
@@ -80,6 +94,8 @@ fn bench_retransmitter(bencher: &mut Bencher) {
         packet_receiver,
     );

+    let mut index = 0;
+    let mut slot = 0;
     let total = Arc::new(AtomicUsize::new(0));
     bencher.iter(move || {
         let peer_sockets1 = peer_sockets.clone();
@@ -96,7 +112,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
                             while peer_sockets2[p].recv(&mut buf).is_ok() {
                                 total2.fetch_add(1, Ordering::Relaxed);
                             }
-                            if total2.load(Ordering::Relaxed) >= NUM_PACKETS {
+                            if total2.load(Ordering::Relaxed) >= num_packets {
                                 break;
                             }
                             info!("{} recv", total2.load(Ordering::Relaxed));
@@ -107,9 +123,17 @@ fn bench_retransmitter(bencher: &mut Bencher) {
             })
             .collect();

-        for packets in batches.clone() {
-            packet_sender.send(packets).unwrap();
+        for shred in data_shreds.iter_mut() {
+            shred.set_slot(slot);
+            shred.set_index(index);
+            index += 1;
+            index %= 200;
+            let mut p = Packet::default();
+            shred.copy_to_packet(&mut p);
+            let _ = packet_sender.send(Packets::new(vec![p]));
         }
+        slot += 1;
+        info!("sent...");

         let mut join_time = Measure::start("join");
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 1e75c45644..38f7238117 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -1,5 +1,7 @@
 //! The `retransmit_stage` retransmits shreds between validators

+use crate::shred_fetch_stage::ShredFetchStage;
+use crate::shred_fetch_stage::ShredFetchStats;
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
     cluster_info_vote_listener::VerifiedVoteReceiver,
@@ -12,7 +14,10 @@ use crate::{
     result::{Error, Result},
     window_service::{should_retransmit_and_persist, WindowService},
 };
+use ahash::AHasher;
 use crossbeam_channel::Receiver;
+use lru::LruCache;
+use rand::{thread_rng, Rng};
 use solana_ledger::{
     blockstore::{Blockstore, CompletedSlotsReceiver},
     leader_schedule_cache::LeaderScheduleCache,
@@ -27,6 +32,7 @@ use solana_sdk::epoch_schedule::EpochSchedule;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::timing::timestamp;
 use solana_streamer::streamer::PacketReceiver;
+use std::hash::Hasher;
 use std::{
     cmp,
     collections::hash_set::HashSet,
@@ -41,6 +47,9 @@ use std::{
     time::Duration,
 };

+const MAX_DUPLICATE_COUNT: usize = 2;
+const DEFAULT_LRU_SIZE: usize = 10_000;
+
 // Limit a given thread to consume about this many packets so that
 // it doesn't pull up too much work.
 const MAX_PACKET_BATCH_SIZE: usize = 100;
@@ -196,6 +205,9 @@ struct EpochStakesCache {
     stakes_and_index: Vec<(u64, usize)>,
 }

+pub type ShredFilterAndSeeds = (LruCache<(Slot, u32), Vec<u64>>, u128, u128);
+
+#[allow(clippy::too_many_arguments)]
 fn retransmit(
     bank_forks: &Arc<RwLock<BankForks>>,
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
@@ -206,6 +218,7 @@ fn retransmit(
     stats: &Arc<RetransmitStats>,
     epoch_stakes_cache: &Arc<RwLock<EpochStakesCache>>,
     last_peer_update: &Arc<AtomicU64>,
+    shreds_received: &Arc<Mutex<ShredFilterAndSeeds>>,
 ) -> Result<()> {
     let timer = Duration::new(1, 0);
     let r_lock = r.lock().unwrap();
@@ -254,6 +267,12 @@ fn retransmit(
         w_epoch_stakes_cache.stakes_and_index = stakes_and_index;
         drop(w_epoch_stakes_cache);
         r_epoch_stakes_cache = epoch_stakes_cache.read().unwrap();
+        {
+            let mut sr = shreds_received.lock().unwrap();
+            sr.0.clear();
+            sr.1 = thread_rng().gen::<u128>();
+            sr.2 = thread_rng().gen::<u128>();
+        }
     }
     let mut peers_len = 0;
     epoch_cache_update.stop();
@@ -279,6 +298,33 @@ fn retransmit(
             continue;
         }

+        match ShredFetchStage::get_slot_index(packet, &mut ShredFetchStats::default()) {
+            Some(slot_index) => {
+                let mut received = shreds_received.lock().unwrap();
+                let seed1 = received.1;
+                let seed2 = received.2;
+                if let Some(sent) = received.0.get_mut(&slot_index) {
+                    if sent.len() < MAX_DUPLICATE_COUNT {
+                        let mut hasher = AHasher::new_with_keys(seed1, seed2);
+                        hasher.write(&packet.data[0..packet.meta.size]);
+                        let hash = hasher.finish();
+                        if sent.contains(&hash) {
+                            continue;
+                        }
+
+                        sent.push(hash);
+                    } else {
+                        continue;
+                    }
+                } else {
+                    let mut hasher = AHasher::new_with_keys(seed1, seed2);
+                    hasher.write(&packet.data[0..packet.meta.size]);
+                    let hash = hasher.finish();
+                    received.0.put(slot_index, vec![hash]);
+                }
+            }
+            None => continue,
+        }
         let mut compute_turbine_peers = Measure::start("turbine_start");
         let (my_index, mut shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
             &my_id,
@@ -367,6 +413,7 @@ pub fn retransmitter(
     r: Arc<Mutex<PacketReceiver>>,
 ) -> Vec<JoinHandle<()>> {
     let stats = Arc::new(RetransmitStats::default());
+    let shreds_received = Arc::new(Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), 0, 0)));
     (0..sockets.len())
         .map(|s| {
             let sockets = sockets.clone();
@@ -377,6 +424,7 @@ pub fn retransmitter(
             let stats = stats.clone();
             let epoch_stakes_cache = Arc::new(RwLock::new(EpochStakesCache::default()));
             let last_peer_update = Arc::new(AtomicU64::new(0));
+            let shreds_received = shreds_received.clone();

             Builder::new()
                 .name("solana-retransmitter".to_string())
@@ -393,6 +441,7 @@ pub fn retransmitter(
                             &stats,
                             &epoch_stakes_cache,
                             &last_peer_update,
+                            &shreds_received,
                         ) {
                             match e {
                                 Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@@ -519,11 +568,12 @@ mod tests {
     use solana_ledger::create_new_tmp_ledger;
     use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
     use solana_net_utils::find_available_port_in_range;
-    use solana_perf::packet::{Meta, Packet, Packets};
+    use solana_perf::packet::{Packet, Packets};
     use std::net::{IpAddr, Ipv4Addr};

     #[test]
     fn test_skip_repair() {
+        solana_logger::setup();
         let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(123);
         let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_config);
         let blockstore = Blockstore::open(&ledger_path).unwrap();
@@ -565,7 +615,12 @@ mod tests {
         );
         let _thread_hdls = vec![t_retransmit];

-        let packets = Packets::new(vec![Packet::default()]);
+        let mut shred =
+            solana_ledger::shred::Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0);
+        let mut packet = Packet::default();
+        shred.copy_to_packet(&mut packet);
+
+        let packets = Packets::new(vec![packet.clone()]);
         // it should send this over the sockets.
         retransmit_sender.send(packets).unwrap();
         let mut packets = Packets::new(vec![]);
@@ -573,16 +628,13 @@ mod tests {
         solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
         assert_eq!(packets.packets.len(), 1);
         assert_eq!(packets.packets[0].meta.repair, false);

-        let repair = Packet {
-            meta: Meta {
-                repair: true,
-                ..Meta::default()
-            },
-            ..Packet::default()
-        };
+        let mut repair = packet.clone();
+        repair.meta.repair = true;
+        shred.set_slot(1);
+        shred.copy_to_packet(&mut packet);

         // send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from
-        let packets = Packets::new(vec![repair, Packet::default()]);
+        let packets = Packets::new(vec![repair, packet]);
         retransmit_sender.send(packets).unwrap();
         let mut packets = Packets::new(vec![]);
         solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs
index 781e0c9095..b91e6233b1 100644
--- a/core/src/shred_fetch_stage.rs
+++ b/core/src/shred_fetch_stage.rs
@@ -28,7 +28,7 @@ const DEFAULT_LRU_SIZE: usize = 10_000;
 pub type ShredsReceived = LruCache<u64, ()>;

 #[derive(Default)]
-struct ShredFetchStats {
+pub struct ShredFetchStats {
     index_overrun: usize,
     shred_count: usize,
     index_bad_deserialize: usize,
@@ -43,7 +43,7 @@ pub struct ShredFetchStage {
     thread_hdls: Vec<JoinHandle<()>>,
 }
 impl ShredFetchStage {
-    fn get_slot_index(p: &Packet, stats: &mut ShredFetchStats) -> Option<(u64, u32)> {
+    pub fn get_slot_index(p: &Packet, stats: &mut ShredFetchStats) -> Option<(u64, u32)> {
         let index_start = OFFSET_OF_SHRED_INDEX;
         let index_end = index_start + SIZE_OF_SHRED_INDEX;
         let slot_start = OFFSET_OF_SHRED_SLOT;
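
For reference, the filtering rule this patch adds to retransmit() reads as
follows in isolation: an LRU cache keyed by (slot, index) remembers the seeded
ahash digests of shred payloads already retransmitted, drops byte-exact
duplicates, and forwards at most MAX_DUPLICATE_COUNT distinct payloads per key.
Below is a minimal standalone sketch of that rule, not part of the patch:
ShredFilter, should_retransmit(), and main() are illustrative names, the seeds
are fixed and the payload is hashed unconditionally for brevity, and it assumes
the lru/ahash APIs the patch itself uses (LruCache::new(usize),
AHasher::new_with_keys(u128, u128)).

    use ahash::AHasher;
    use lru::LruCache;
    use std::hash::Hasher;

    type Slot = u64;
    const MAX_DUPLICATE_COUNT: usize = 2;

    // Same shape as the patch's ShredFilterAndSeeds tuple, wrapped in a
    // struct for readability (hypothetical name, not in the patch).
    struct ShredFilter {
        cache: LruCache<(Slot, u32), Vec<u64>>, // payload hashes seen per shred
        seed1: u128,
        seed2: u128,
    }

    impl ShredFilter {
        fn new(capacity: usize) -> Self {
            Self {
                cache: LruCache::new(capacity),
                // Fixed seeds for the sketch; the patch draws these from
                // thread_rng() and reseeds them on epoch-cache refresh.
                seed1: 0,
                seed2: 1,
            }
        }

        // True if this payload should be retransmitted: it is not a
        // byte-exact duplicate, and fewer than MAX_DUPLICATE_COUNT variants
        // of this (slot, index) have been forwarded so far.
        fn should_retransmit(&mut self, slot: Slot, index: u32, payload: &[u8]) -> bool {
            let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2);
            hasher.write(payload);
            let hash = hasher.finish();
            match self.cache.get_mut(&(slot, index)) {
                Some(seen) if seen.contains(&hash) => false, // exact duplicate
                Some(seen) if seen.len() >= MAX_DUPLICATE_COUNT => false, // too many variants
                Some(seen) => {
                    seen.push(hash);
                    true
                }
                None => {
                    self.cache.put((slot, index), vec![hash]);
                    true
                }
            }
        }
    }

    fn main() {
        let mut filter = ShredFilter::new(10_000);
        assert!(filter.should_retransmit(0, 5, &[1; 10])); // first copy: forward
        assert!(!filter.should_retransmit(0, 5, &[1; 10])); // exact duplicate: drop
        assert!(filter.should_retransmit(0, 5, &[2; 10])); // second variant: forward
        assert!(!filter.should_retransmit(0, 5, &[3; 10])); // third variant: drop
    }

Drawing seed1/seed2 from thread_rng() and reseeding them whenever the epoch
stakes cache refreshes, as the patch does in retransmit(), keeps the hash keys
unpredictable, so a peer cannot precompute payloads that collide inside the
filter.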