From 3efccbffab6393358ad3728742d9e70d40d4e560 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 12 Aug 2021 12:04:01 -0400 Subject: [PATCH] sends shreds (instead of packets) to retransmit stage Working towards channelling shreds recovered from erasure codes through to the retransmit stage. --- core/benches/retransmit_stage.rs | 27 ++-- core/src/packet_hasher.rs | 24 +++- core/src/retransmit_stage.rs | 229 ++++++++++++------------------- core/src/window_service.rs | 72 ++++------ gossip/src/cluster_info.rs | 4 +- gossip/tests/gossip.rs | 2 +- 6 files changed, 148 insertions(+), 210 deletions(-) diff --git a/core/benches/retransmit_stage.rs b/core/benches/retransmit_stage.rs index fbe842af7..cc9484318 100644 --- a/core/benches/retransmit_stage.rs +++ b/core/benches/retransmit_stage.rs @@ -17,11 +17,10 @@ use { shred::Shredder, }, solana_measure::measure::Measure, - solana_perf::packet::{Packet, Packets}, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ hash::Hash, - pubkey, + pubkey::Pubkey, signature::{Keypair, Signer}, system_transaction, timing::timestamp, @@ -40,6 +39,13 @@ use { test::Bencher, }; +// TODO: The benchmark is ignored as it currently may block indefinitely. +// The code incorrectly expects that the node receiving the shred on the tvu +// socket retransmits it to other nodes in its neighborhood. But that is no +// longer the case since https://github.com/solana-labs/solana/pull/17716. +// So, depending on the shred seed, peers may not receive packets and the +// receive threads loop indefinitely. +#[ignore] #[bench] #[allow(clippy::same_item_push)] fn bench_retransmitter(bencher: &mut Bencher) { @@ -52,12 +58,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { const NUM_PEERS: usize = 4; let mut peer_sockets = Vec::new(); for _ in 0..NUM_PEERS { - // This ensures that cluster_info.id() is the root of turbine - // retransmit tree and so the shreds are retransmited to all other - // nodes in the cluster.
- let id = std::iter::repeat_with(pubkey::new_rand) - .find(|pk| cluster_info.id() < *pk) - .unwrap(); + let id = Pubkey::new_unique(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut contact_info = ContactInfo::new_localhost(&id, timestamp()); contact_info.tvu = socket.local_addr().unwrap(); @@ -76,8 +77,8 @@ fn bench_retransmitter(bencher: &mut Bencher) { let bank_forks = BankForks::new(bank0); let bank = bank_forks.working_bank(); let bank_forks = Arc::new(RwLock::new(bank_forks)); - let (packet_sender, packet_receiver) = channel(); - let packet_receiver = Arc::new(Mutex::new(packet_receiver)); + let (shreds_sender, shreds_receiver) = channel(); + let shreds_receiver = Arc::new(Mutex::new(shreds_receiver)); const NUM_THREADS: usize = 2; let sockets = (0..NUM_THREADS) .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap()) @@ -109,7 +110,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { bank_forks, leader_schedule_cache, cluster_info, - packet_receiver, + shreds_receiver, Arc::default(), // solana_rpc::max_slots::MaxSlots None, ); @@ -148,9 +149,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { shred.set_index(index); index += 1; index %= 200; - let mut p = Packet::default(); - shred.copy_to_packet(&mut p); - let _ = packet_sender.send(Packets::new(vec![p])); + let _ = shreds_sender.send(vec![shred.clone()]); } slot += 1; diff --git a/core/src/packet_hasher.rs b/core/src/packet_hasher.rs index 77612428c..575c9733f 100644 --- a/core/src/packet_hasher.rs +++ b/core/src/packet_hasher.rs @@ -1,10 +1,13 @@ // Get a unique hash value for a packet // Used in retransmit and shred fetch to prevent dos with same packet data. -use ahash::AHasher; -use rand::{thread_rng, Rng}; -use solana_perf::packet::Packet; -use std::hash::Hasher; +use { + ahash::AHasher, + rand::{thread_rng, Rng}, + solana_ledger::shred::Shred, + solana_perf::packet::Packet, + std::hash::Hasher, +}; #[derive(Clone)] pub struct PacketHasher { @@ -22,9 +25,18 @@ impl Default for PacketHasher { } impl PacketHasher { - pub fn hash_packet(&self, packet: &Packet) -> u64 { + pub(crate) fn hash_packet(&self, packet: &Packet) -> u64 { + let size = packet.data.len().min(packet.meta.size); + self.hash_data(&packet.data[..size]) + } + + pub(crate) fn hash_shred(&self, shred: &Shred) -> u64 { + self.hash_data(&shred.payload) + } + + fn hash_data(&self, data: &[u8]) -> u64 { let mut hasher = AHasher::new_with_keys(self.seed1, self.seed2); - hasher.write(&packet.data[0..packet.meta.size]); + hasher.write(data); hasher.finish() } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 36aed021a..cd78ef95d 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -19,12 +19,12 @@ use { solana_client::rpc_response::SlotUpdate, solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}, solana_ledger::{ - shred::{get_shred_slot_index_type, ShredFetchStats}, + shred::Shred, {blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache}, }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_error, - solana_perf::packet::{Packet, Packets}, + solana_perf::packet::Packets, solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ @@ -33,14 +33,13 @@ use { pubkey::Pubkey, timing::{timestamp, AtomicInterval}, }, - solana_streamer::streamer::PacketReceiver, std::{ collections::{BTreeSet, HashSet}, net::UdpSocket, ops::DerefMut, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - mpsc::{channel, 
RecvTimeoutError}, + mpsc::{self, channel, RecvTimeoutError}, Arc, Mutex, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -51,16 +50,17 @@ use { const MAX_DUPLICATE_COUNT: usize = 2; const DEFAULT_LRU_SIZE: usize = 10_000; -// Limit a given thread to consume about this many packets so that +// Limit a given thread to consume about this many shreds so that // it doesn't pull up too much work. -const MAX_PACKET_BATCH_SIZE: usize = 100; +const MAX_SHREDS_BATCH_SIZE: usize = 100; const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8; const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5); #[derive(Default)] struct RetransmitStats { - total_packets: AtomicU64, + num_shreds: AtomicU64, + num_shreds_skipped: AtomicU64, total_batches: AtomicU64, total_time: AtomicU64, epoch_fetch: AtomicU64, @@ -68,25 +68,27 @@ struct RetransmitStats { retransmit_total: AtomicU64, last_ts: AtomicInterval, compute_turbine_peers_total: AtomicU64, - retransmit_tree_mismatch: AtomicU64, } #[allow(clippy::too_many_arguments)] fn update_retransmit_stats( stats: &RetransmitStats, total_time: u64, - total_packets: usize, + num_shreds: usize, + num_shreds_skipped: usize, retransmit_total: u64, compute_turbine_peers_total: u64, peers_len: usize, epoch_fetch: u64, epoch_cach_update: u64, - retransmit_tree_mismatch: u64, ) { stats.total_time.fetch_add(total_time, Ordering::Relaxed); stats - .total_packets - .fetch_add(total_packets as u64, Ordering::Relaxed); + .num_shreds + .fetch_add(num_shreds as u64, Ordering::Relaxed); + stats + .num_shreds_skipped + .fetch_add(num_shreds_skipped as u64, Ordering::Relaxed); stats .retransmit_total .fetch_add(retransmit_total, Ordering::Relaxed); @@ -98,9 +100,6 @@ fn update_retransmit_stats( stats .epoch_cache_update .fetch_add(epoch_cach_update, Ordering::Relaxed); - stats - .retransmit_tree_mismatch - .fetch_add(retransmit_tree_mismatch, Ordering::Relaxed); if stats.last_ts.should_update(2000) { datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64)); datapoint_info!( @@ -126,8 +125,13 @@ fn update_retransmit_stats( i64 ), ( - "total_packets", - stats.total_packets.swap(0, Ordering::Relaxed) as i64, + "num_shreds", + stats.num_shreds.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "num_shreds_skipped", + stats.num_shreds_skipped.swap(0, Ordering::Relaxed) as i64, i64 ), ( @@ -135,11 +139,6 @@ fn update_retransmit_stats( stats.retransmit_total.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "retransmit_tree_mismatch", - stats.retransmit_tree_mismatch.swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "compute_turbine", stats.compute_turbine_peers_total.swap(0, Ordering::Relaxed) as i64, @@ -150,34 +149,30 @@ fn update_retransmit_stats( } // Map of shred (slot, index, is_data) => list of hash values seen for that key. -pub type ShredFilter = LruCache<(Slot, u32, bool), Vec>; +type ShredFilter = LruCache<(Slot, u32, bool), Vec>; -pub type ShredFilterAndHasher = (ShredFilter, PacketHasher); +type ShredFilterAndHasher = (ShredFilter, PacketHasher); -// Returns None if shred is already received and should skip retransmit. -// Otherwise returns shred's slot and whether the shred is a data shred. -fn check_if_already_received( - packet: &Packet, - shreds_received: &Mutex, -) -> Option { - let shred = get_shred_slot_index_type(packet, &mut ShredFetchStats::default())?; +// Returns true if shred is already received and should skip retransmit. 
+fn should_skip_retransmit(shred: &Shred, shreds_received: &Mutex) -> bool { + let key = (shred.slot(), shred.index(), shred.is_data()); let mut shreds_received = shreds_received.lock().unwrap(); let (cache, hasher) = shreds_received.deref_mut(); - match cache.get_mut(&shred) { - Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => None, + match cache.get_mut(&key) { + Some(sent) if sent.len() >= MAX_DUPLICATE_COUNT => true, Some(sent) => { - let hash = hasher.hash_packet(packet); + let hash = hasher.hash_shred(shred); if sent.contains(&hash) { - None + true } else { sent.push(hash); - Some(shred.0) + false } } None => { - let hash = hasher.hash_packet(packet); - cache.put(shred, vec![hash]); - Some(shred.0) + let hash = hasher.hash_shred(shred); + cache.put(key, vec![hash]); + false } } } @@ -232,7 +227,7 @@ fn retransmit( bank_forks: &RwLock, leader_schedule_cache: &LeaderScheduleCache, cluster_info: &ClusterInfo, - r: &Mutex, + shreds_receiver: &Mutex>>, sock: &UdpSocket, id: u32, stats: &RetransmitStats, @@ -244,19 +239,16 @@ fn retransmit( rpc_subscriptions: Option<&RpcSubscriptions>, ) -> Result<()> { const RECV_TIMEOUT: Duration = Duration::from_secs(1); - let r_lock = r.lock().unwrap(); - let packets = r_lock.recv_timeout(RECV_TIMEOUT)?; + let shreds_receiver = shreds_receiver.lock().unwrap(); + let mut shreds = shreds_receiver.recv_timeout(RECV_TIMEOUT)?; let mut timer_start = Measure::start("retransmit"); - let mut total_packets = packets.packets.len(); - let mut packets = vec![packets]; - while let Ok(nq) = r_lock.try_recv() { - total_packets += nq.packets.len(); - packets.push(nq); - if total_packets >= MAX_PACKET_BATCH_SIZE { + while let Ok(more_shreds) = shreds_receiver.try_recv() { + shreds.extend(more_shreds); + if shreds.len() >= MAX_SHREDS_BATCH_SIZE { break; } } - drop(r_lock); + drop(shreds_receiver); let mut epoch_fetch = Measure::start("retransmit_epoch_fetch"); let (working_bank, root_bank) = { @@ -269,26 +261,19 @@ fn retransmit( maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts); epoch_cache_update.stop(); + let num_shreds = shreds.len(); let my_id = cluster_info.id(); let socket_addr_space = cluster_info.socket_addr_space(); let mut retransmit_total = 0; + let mut num_shreds_skipped = 0; let mut compute_turbine_peers_total = 0; - let mut retransmit_tree_mismatch = 0; let mut max_slot = 0; - for packet in packets.iter().flat_map(|p| p.packets.iter()) { - // skip discarded packets and repair packets - if packet.meta.discard { - total_packets -= 1; + for shred in shreds { + if should_skip_retransmit(&shred, shreds_received) { + num_shreds_skipped += 1; continue; } - if packet.meta.repair { - total_packets -= 1; - continue; - } - let shred_slot = match check_if_already_received(packet, shreds_received) { - Some(slot) => slot, - None => continue, - }; + let shred_slot = shred.slot(); max_slot = max_slot.max(shred_slot); if let Some(rpc_subscriptions) = rpc_subscriptions { @@ -301,20 +286,14 @@ fn retransmit( } let mut compute_turbine_peers = Measure::start("turbine_start"); + // TODO: consider using root-bank here for leader lookup! let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)); let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info); + let shred_seed = shred.seed(slot_leader, &root_bank); let (neighbors, children) = - cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader); - // If the node is on the critical path (i.e. 
the first node in each - // neighborhood), then we expect that the packet arrives at tvu socket - // as opposed to tvu-forwards. If this is not the case, then the - // turbine broadcast/retransmit tree is mismatched across nodes. + cluster_nodes.get_retransmit_peers(shred_seed, DATA_PLANE_FANOUT, slot_leader); let anchor_node = neighbors[0].id == my_id; - if packet.meta.forward == anchor_node { - // TODO: Consider forwarding the packet to the root node here. - retransmit_tree_mismatch += 1; - } compute_turbine_peers.stop(); compute_turbine_peers_total += compute_turbine_peers.as_us(); @@ -327,15 +306,15 @@ fn retransmit( // First neighbor is this node itself, so skip it. ClusterInfo::retransmit_to( &neighbors[1..], - packet, + &shred.payload, sock, - /*forward socket=*/ true, + true, // forward socket socket_addr_space, ); } ClusterInfo::retransmit_to( &children, - packet, + &shred.payload, sock, !anchor_node, // send to forward socket! socket_addr_space, @@ -346,8 +325,8 @@ fn retransmit( max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed); timer_start.stop(); debug!( - "retransmitted {} packets in {}ms retransmit_time: {}ms id: {}", - total_packets, + "retransmitted {} shreds in {}ms retransmit_time: {}ms id: {}", + num_shreds, timer_start.as_ms(), retransmit_total, id, @@ -357,13 +336,13 @@ fn retransmit( update_retransmit_stats( stats, timer_start.as_us(), - total_packets, + num_shreds, + num_shreds_skipped, retransmit_total, compute_turbine_peers_total, cluster_nodes.num_peers(), epoch_fetch.as_us(), epoch_cache_update.as_us(), - retransmit_tree_mismatch, ); Ok(()) @@ -382,7 +361,7 @@ pub fn retransmitter( bank_forks: Arc>, leader_schedule_cache: Arc, cluster_info: Arc, - r: Arc>, + shreds_receiver: Arc>>>, max_slots: Arc, rpc_subscriptions: Option>, ) -> Vec> { @@ -402,7 +381,7 @@ pub fn retransmitter( let sockets = sockets.clone(); let bank_forks = bank_forks.clone(); let leader_schedule_cache = leader_schedule_cache.clone(); - let r = r.clone(); + let shreds_receiver = shreds_receiver.clone(); let cluster_info = cluster_info.clone(); let stats = stats.clone(); let cluster_nodes_cache = Arc::clone(&cluster_nodes_cache); @@ -421,7 +400,7 @@ pub fn retransmitter( &bank_forks, &leader_schedule_cache, &cluster_info, - &r, + &shreds_receiver, &sockets[s], s as u32, &stats, @@ -559,17 +538,19 @@ impl RetransmitStage { #[cfg(test)] mod tests { - use super::*; - use solana_gossip::contact_info::ContactInfo; - use solana_ledger::blockstore_processor::{process_blockstore, ProcessOptions}; - use solana_ledger::create_new_tmp_ledger; - use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use solana_ledger::shred::Shred; - use solana_net_utils::find_available_port_in_range; - use solana_perf::packet::{Packet, Packets}; - use solana_sdk::signature::Keypair; - use solana_streamer::socket::SocketAddrSpace; - use std::net::{IpAddr, Ipv4Addr}; + use { + super::*, + solana_gossip::contact_info::ContactInfo, + solana_ledger::{ + blockstore_processor::{process_blockstore, ProcessOptions}, + create_new_tmp_ledger, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }, + solana_net_utils::find_available_port_in_range, + solana_sdk::signature::Keypair, + solana_streamer::socket::SocketAddrSpace, + std::net::{IpAddr, Ipv4Addr}, + }; #[test] fn test_skip_repair() { @@ -627,26 +608,9 @@ mod tests { None, ); - let mut shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0); - let mut packet = Packet::default(); - shred.copy_to_packet(&mut packet); - - 
let packets = Packets::new(vec![packet.clone()]); + let shred = Shred::new_from_data(0, 0, 0, None, true, true, 0, 0x20, 0); // it should send this over the sockets. - retransmit_sender.send(packets).unwrap(); - let mut packets = Packets::new(vec![]); - solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); - assert_eq!(packets.packets.len(), 1); - assert!(!packets.packets[0].meta.repair); - - let mut repair = packet.clone(); - repair.meta.repair = true; - - shred.set_slot(1); - shred.copy_to_packet(&mut packet); - // send 1 repair and 1 "regular" packet so that we don't block forever on the recv_from - let packets = Packets::new(vec![repair, packet]); - retransmit_sender.send(packets).unwrap(); + retransmit_sender.send(vec![shred]).unwrap(); let mut packets = Packets::new(vec![]); solana_streamer::packet::recv_from(&mut packets, &me_retransmit, 1).unwrap(); assert_eq!(packets.packets.len(), 1); @@ -655,61 +619,42 @@ mod tests { #[test] fn test_already_received() { - let mut packet = Packet::default(); let slot = 1; let index = 5; let version = 0x40; let shred = Shred::new_from_data(slot, index, 0, None, true, true, 0, version, 0); - shred.copy_to_packet(&mut packet); let shreds_received = Arc::new(Mutex::new((LruCache::new(100), PacketHasher::default()))); // unique shred for (1, 5) should pass - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // duplicate shred for (1, 5) blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_from_data(slot, index, 2, None, true, true, 0, version, 0); - shred.copy_to_packet(&mut packet); // first duplicate shred for (1, 5) passed - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // then blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_from_data(slot, index, 8, None, true, true, 0, version, 0); - shred.copy_to_packet(&mut packet); // 2nd duplicate shred for (1, 5) blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_empty_coding(slot, index, 0, 1, 1, version); - shred.copy_to_packet(&mut packet); // Coding at (1, 5) passes - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // then blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_empty_coding(slot, index, 2, 1, 1, version); - shred.copy_to_packet(&mut packet); // 2nd unique coding at (1, 5) passes - assert_eq!( - check_if_already_received(&packet, &shreds_received), - Some(slot) - ); + assert!(!should_skip_retransmit(&shred, &shreds_received)); // same again is blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); let shred = Shred::new_empty_coding(slot, index, 3, 1, 1, version); - shred.copy_to_packet(&mut packet); // 
Another unique coding at (1, 5) always blocked - assert_eq!(check_if_already_received(&packet, &shreds_received), None); - assert_eq!(check_if_already_received(&packet, &shreds_received), None); + assert!(should_skip_retransmit(&shred, &shreds_received)); + assert!(should_skip_retransmit(&shred, &shreds_received)); } } diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 9ea6f030a..0ea699dad 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -26,15 +26,14 @@ use { solana_rayon_threadlimit::get_thread_count, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{clock::Slot, packet::PACKET_DATA_SIZE, pubkey::Pubkey}, - solana_streamer::streamer::PacketSender, std::collections::HashSet, std::{ cmp::Reverse, collections::HashMap, net::{SocketAddr, UdpSocket}, - ops::Deref, sync::{ atomic::{AtomicBool, Ordering}, + mpsc::Sender, Arc, RwLock, }, thread::{self, Builder, JoinHandle}, @@ -309,11 +308,10 @@ where fn recv_window( blockstore: &Blockstore, - leader_schedule_cache: &LeaderScheduleCache, bank_forks: &RwLock, insert_shred_sender: &CrossbeamSender<(Vec, Vec>)>, verified_receiver: &CrossbeamReceiver>, - retransmit: &PacketSender, + retransmit_sender: &Sender>, shred_filter: F, thread_pool: &ThreadPool, stats: &mut ReceiveWindowStats, @@ -325,13 +323,9 @@ where let mut packets = verified_receiver.recv_timeout(timer)?; packets.extend(verified_receiver.try_iter().flatten()); let now = Instant::now(); - - let (root_bank, working_bank) = { - let bank_forks = bank_forks.read().unwrap(); - (bank_forks.root_bank(), bank_forks.working_bank()) - }; let last_root = blockstore.last_root(); - let handle_packet = |packet: &mut Packet| { + let working_bank = bank_forks.read().unwrap().working_bank(); + let handle_packet = |packet: &Packet| { if packet.meta.discard { inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1); return None; @@ -341,20 +335,10 @@ where // call to `new_from_serialized_shred` is safe. assert_eq!(packet.data.len(), PACKET_DATA_SIZE); let serialized_shred = packet.data.to_vec(); - let working_bank = Arc::clone(&working_bank); - let shred = match Shred::new_from_serialized_shred(serialized_shred) { - Ok(shred) if shred_filter(&shred, working_bank, last_root) => { - let leader_pubkey = - leader_schedule_cache.slot_leader_at(shred.slot(), Some(root_bank.deref())); - packet.meta.slot = shred.slot(); - packet.meta.seed = shred.seed(leader_pubkey, root_bank.deref()); - shred - } - Ok(_) | Err(_) => { - packet.meta.discard = true; - return None; - } - }; + let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?; + if !shred_filter(&shred, working_bank.clone(), last_root) { + return None; + } if packet.meta.repair { let repair_info = RepairMeta { _from_addr: packet.meta.addr(), @@ -368,28 +352,31 @@ where }; let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| { packets - .par_iter_mut() - .flat_map_iter(|packet| packet.packets.iter_mut().filter_map(handle_packet)) + .par_iter() + .flat_map_iter(|pkt| pkt.packets.iter().filter_map(handle_packet)) .unzip() }); - stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::(); + // Exclude repair packets from retransmit. 
+ let _ = retransmit_sender.send( + shreds + .iter() + .zip(&repair_infos) + .filter(|(_, repair_info)| repair_info.is_none()) + .map(|(shred, _)| shred) + .cloned() + .collect(), + ); stats.num_repairs += repair_infos.iter().filter(|r| r.is_some()).count(); stats.num_shreds += shreds.len(); for shred in &shreds { *stats.slots.entry(shred.slot()).or_default() += 1; } + insert_shred_sender.send((shreds, repair_infos))?; + + stats.num_packets += packets.iter().map(|pkt| pkt.packets.len()).sum::(); for packet in packets.iter().flat_map(|pkt| pkt.packets.iter()) { *stats.addrs.entry(packet.meta.addr()).or_default() += 1; } - - for packets in packets.into_iter() { - if !packets.is_empty() { - // Ignore the send error, as the retransmit is optional (e.g. archivers don't retransmit) - let _ = retransmit.send(packets); - } - } - - insert_shred_sender.send((shreds, repair_infos))?; stats.elapsed += now.elapsed(); Ok(()) } @@ -429,7 +416,7 @@ impl WindowService { pub(crate) fn new( blockstore: Arc, verified_receiver: CrossbeamReceiver>, - retransmit: PacketSender, + retransmit_sender: Sender>, repair_socket: Arc, exit: Arc, repair_info: RepairInfo, @@ -476,7 +463,7 @@ impl WindowService { let t_insert = Self::start_window_insert_thread( exit.clone(), blockstore.clone(), - leader_schedule_cache.clone(), + leader_schedule_cache, insert_receiver, duplicate_sender, completed_data_sets_sender, @@ -490,9 +477,8 @@ impl WindowService { insert_sender, verified_receiver, shred_filter, - leader_schedule_cache, bank_forks, - retransmit, + retransmit_sender, ); WindowService { @@ -598,9 +584,8 @@ impl WindowService { insert_sender: CrossbeamSender<(Vec, Vec>)>, verified_receiver: CrossbeamReceiver>, shred_filter: F, - leader_schedule_cache: Arc, bank_forks: Arc>, - retransmit: PacketSender, + retransmit_sender: Sender>, ) -> JoinHandle<()> where F: 'static @@ -635,11 +620,10 @@ impl WindowService { }; if let Err(e) = recv_window( &blockstore, - &leader_schedule_cache, &bank_forks, &insert_sender, &verified_receiver, - &retransmit, + &retransmit_sender, |shred, bank, last_root| shred_filter(&id, shred, Some(bank), last_root), &thread_pool, &mut stats, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index a9a0c0555..76caff301 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1241,7 +1241,7 @@ impl ClusterInfo { /// We need to avoid having obj locked while doing a io, such as the `send_to` pub fn retransmit_to( peers: &[&ContactInfo], - packet: &Packet, + data: &[u8], s: &UdpSocket, forwarded: bool, socket_addr_space: &SocketAddrSpace, @@ -1260,8 +1260,6 @@ impl ClusterInfo { .filter(|addr| socket_addr_space.check(addr)) .collect() }; - let data = &packet.data[..packet.meta.size]; - if let Err(SendPktsError::IoError(ioerr, num_failed)) = multi_target_send(s, data, &dests) { inc_new_counter_info!("cluster_info-retransmit-packets", dests.len(), 1); inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1); diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index 265c67775..2365c1caa 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -220,7 +220,7 @@ pub fn cluster_info_retransmit() { let retransmit_peers: Vec<_> = peers.iter().collect(); ClusterInfo::retransmit_to( &retransmit_peers, - &p, + &p.data[..p.meta.size], &tn1, false, &SocketAddrSpace::Unspecified,