diff --git a/core/benches/cluster_nodes.rs b/core/benches/cluster_nodes.rs
index 7f69930d0..662902d2b 100644
--- a/core/benches/cluster_nodes.rs
+++ b/core/benches/cluster_nodes.rs
@@ -49,7 +49,7 @@ fn get_retransmit_peers_deterministic(
             0,
             0,
         );
-        let (_neighbors, _children) = cluster_nodes.get_retransmit_peers(
+        let (_root_distance, _neighbors, _children) = cluster_nodes.get_retransmit_peers(
             *slot_leader,
             &shred,
             root_bank,
diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index 6786b47c4..7fda96f0c 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -180,33 +180,34 @@ impl ClusterNodes<RetransmitStage> {
         shred: &Shred,
         root_bank: &Bank,
         fanout: usize,
-    ) -> Vec<SocketAddr> {
-        let (neighbors, children) =
+    ) -> (/*root_distance:*/ usize, Vec<SocketAddr>) {
+        let (root_distance, neighbors, children) =
             self.get_retransmit_peers(slot_leader, shred, root_bank, fanout);
         if neighbors.is_empty() {
             let peers = children.into_iter().filter_map(Node::contact_info);
-            return peers.map(|peer| peer.tvu).collect();
+            let addrs = peers.map(|peer| peer.tvu).collect();
+            return (root_distance, addrs);
         }
         // If the node is on the critical path (i.e. the first node in each
         // neighborhood), it should send the packet to tvu socket of its
         // children and also tvu_forward socket of its neighbors. Otherwise it
         // should only forward to tvu_forwards socket of its children.
         if neighbors[0].pubkey() != self.pubkey {
-            return children
+            let addrs = children
                 .iter()
-                .filter_map(|node| Some(node.contact_info()?.tvu_forwards))
-                .collect();
+                .filter_map(|node| Some(node.contact_info()?.tvu_forwards));
+            return (root_distance, addrs.collect());
         }
         // First neighbor is this node itself, so skip it.
-        neighbors[1..]
+        let addrs = neighbors[1..]
             .iter()
             .filter_map(|node| Some(node.contact_info()?.tvu_forwards))
             .chain(
                 children
                     .iter()
                     .filter_map(|node| Some(node.contact_info()?.tvu)),
-            )
-            .collect()
+            );
+        (root_distance, addrs.collect())
     }
 
     pub fn get_retransmit_peers(
@@ -216,6 +217,7 @@ impl ClusterNodes<RetransmitStage> {
         root_bank: &Bank,
         fanout: usize,
     ) -> (
+        usize, // distance from the root node
         Vec<&Node>, // neighbors
         Vec<&Node>, // children
     ) {
@@ -237,14 +239,28 @@ impl ClusterNodes<RetransmitStage> {
             .position(|node| node.pubkey() == self.pubkey)
             .unwrap();
         if drop_redundant_turbine_path(shred.slot(), root_bank) {
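+            // Bucket this node's distance from the broadcast root as 0, 1
+            // or 2; it indexes the [usize; 3] counters in
+            // RetransmitSlotStats. Index 0 is the root itself and indices
+            // 1..=fanout are its children; all deeper nodes count as 2.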
+            let root_distance = if self_index == 0 {
+                0
+            } else if self_index <= fanout {
+                1
+            } else {
+                2
+            };
             let peers = get_retransmit_peers(fanout, self_index, &nodes);
-            return (Vec::default(), peers.collect());
+            return (root_distance, Vec::default(), peers.collect());
         }
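+        // Same bucketing for the legacy (neighborhood) layout, where the
+        // root's neighborhood spans indices 0..fanout; hence the strict `<`
+        // below versus `<=` above.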
+        let root_distance = if self_index == 0 {
+            0
+        } else if self_index < fanout {
+            1
+        } else {
+            2
+        };
         let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes);
         // Assert that the node itself is included in the set of neighbors, at
         // the right offset.
         debug_assert_eq!(neighbors[self_index % fanout].pubkey(), self.pubkey);
-        (neighbors, children)
+        (root_distance, neighbors, children)
     }
 }
 
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index c05a434d1..39849ef46 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -54,22 +54,16 @@ const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
 
 #[derive(Default)]
 struct RetransmitSlotStats {
-    num_shreds: usize,
-    num_nodes: usize,
+    asof: u64,   // Latest timestamp struct was updated.
+    outset: u64, // 1st shred retransmit timestamp.
+    // Number of shreds sent and received at different
+    // distances from the turbine broadcast root.
+    num_shreds_received: [usize; 3],
+    num_shreds_sent: [usize; 3],
 }
 
-impl AddAssign for RetransmitSlotStats {
-    fn add_assign(&mut self, other: Self) {
-        *self = Self {
-            num_shreds: self.num_shreds + other.num_shreds,
-            num_nodes: self.num_nodes + other.num_nodes,
-        }
-    }
-}
-
-#[derive(Default)]
 struct RetransmitStats {
-    since: Option<Instant>,
+    since: Instant,
     num_nodes: AtomicUsize,
     num_addrs_failed: AtomicUsize,
     num_shreds: usize,
@@ -80,7 +74,7 @@ struct RetransmitStats {
     epoch_cache_update: u64,
     retransmit_total: AtomicU64,
     compute_turbine_peers_total: AtomicU64,
-    slot_stats: HashMap<Slot, RetransmitSlotStats>,
+    slot_stats: LruCache<Slot, RetransmitSlotStats>,
     unknown_shred_slot_leader: AtomicUsize,
 }
 
@@ -93,20 +87,13 @@ impl RetransmitStats {
         cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
     ) {
         const SUBMIT_CADENCE: Duration = Duration::from_secs(2);
-        let elapsed = self.since.as_ref().map(Instant::elapsed);
-        if elapsed.unwrap_or(Duration::MAX) < SUBMIT_CADENCE {
+        if self.since.elapsed() < SUBMIT_CADENCE {
             return;
         }
         let num_peers = cluster_nodes_cache
             .get(root_bank.slot(), root_bank, working_bank, cluster_info)
             .num_peers();
-        let stats = std::mem::replace(
-            self,
-            Self {
-                since: Some(Instant::now()),
-                ..Self::default()
-            },
-        );
+        let stats = std::mem::replace(self, Self::new(Instant::now()));
         datapoint_info!("retransmit-num_nodes", ("count", num_peers, i64));
         datapoint_info!(
             "retransmit-stage",
@@ -134,14 +121,6 @@ impl RetransmitStats {
                 i64
             ),
         );
-        for (slot, stats) in stats.slot_stats {
-            datapoint_info!(
-                "retransmit-stage-slot-stats",
-                ("slot", slot, i64),
-                ("num_shreds", stats.num_shreds, i64),
-                ("num_nodes", stats.num_nodes, i64),
-            );
-        }
     }
 }
 
@@ -251,7 +230,7 @@ fn retransmit(
     let retransmit_shred = |shred: &Shred, socket: &UdpSocket| {
         if should_skip_retransmit(shred, shreds_received) {
             stats.num_shreds_skipped.fetch_add(1, Ordering::Relaxed);
-            return 0;
+            return None;
         }
         let shred_slot = shred.slot();
         max_slots
@@ -279,13 +258,14 @@ fn retransmit(
                 stats
                     .unknown_shred_slot_leader
                     .fetch_add(1, Ordering::Relaxed);
-                return 0;
+                return None;
             }
         };
         let cluster_nodes =
             cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
-        let addrs: Vec<_> = cluster_nodes
-            .get_retransmit_addrs(slot_leader, shred, &root_bank, DATA_PLANE_FANOUT)
+        let (root_distance, addrs) =
+            cluster_nodes.get_retransmit_addrs(slot_leader, shred, &root_bank, DATA_PLANE_FANOUT);
+        let addrs: Vec<_> = addrs
             .into_iter()
             .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
             .collect();
@@ -315,43 +295,29 @@ fn retransmit(
         stats
             .retransmit_total
             .fetch_add(retransmit_time.as_us(), Ordering::Relaxed);
-        num_nodes
+        Some((root_distance, num_nodes))
     };
-    fn merge<K, V>(mut acc: HashMap<K, V>, other: HashMap<K, V>) -> HashMap<K, V>
-    where
-        K: Eq + std::hash::Hash,
-        V: Default + AddAssign,
-    {
-        if acc.len() < other.len() {
-            return merge(other, acc);
-        }
-        for (key, value) in other {
-            *acc.entry(key).or_default() += value;
-        }
-        acc
-    }
     let slot_stats = thread_pool.install(|| {
         shreds
             .into_par_iter()
             .with_min_len(4)
-            .map(|shred| {
+            .filter_map(|shred| {
                 let index = thread_pool.current_thread_index().unwrap();
                 let socket = &sockets[index % sockets.len()];
-                let num_nodes = retransmit_shred(&shred, socket);
-                (shred.slot(), num_nodes)
+                Some((shred.slot(), retransmit_shred(&shred, socket)?))
             })
             .fold(
                 HashMap::<Slot, RetransmitSlotStats>::new,
-                |mut acc, (slot, num_nodes)| {
-                    let stats = acc.entry(slot).or_default();
-                    stats.num_nodes += num_nodes;
-                    stats.num_shreds += 1;
+                |mut acc, (slot, (root_distance, num_nodes))| {
+                    let now = timestamp();
+                    let slot_stats = acc.entry(slot).or_default();
+                    slot_stats.record(now, root_distance, num_nodes);
                     acc
                 },
             )
-            .reduce(HashMap::new, merge)
+            .reduce(HashMap::new, RetransmitSlotStats::merge)
     });
-    stats.slot_stats = merge(std::mem::take(&mut stats.slot_stats), slot_stats);
+    stats.upsert_slot_stats(slot_stats);
     timer_start.stop();
     stats.total_time += timer_start.as_us();
     stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache);
@@ -380,7 +346,7 @@ pub fn retransmitter(
         CLUSTER_NODES_CACHE_TTL,
     );
     let mut hasher_reset_ts = Instant::now();
-    let mut stats = RetransmitStats::default();
+    let mut stats = RetransmitStats::new(Instant::now());
     let shreds_received = Mutex::new((LruCache::new(DEFAULT_LRU_SIZE), PacketHasher::default()));
     let first_shreds_received = Mutex::<Option<Slot>>::default();
     let num_threads = get_thread_count().min(8).max(sockets.len());
@@ -526,6 +492,126 @@ impl RetransmitStage {
     }
 }
 
+impl AddAssign for RetransmitSlotStats {
+    fn add_assign(&mut self, other: Self) {
+        let Self {
+            asof,
+            outset,
+            num_shreds_received,
+            num_shreds_sent,
+        } = other;
+        self.asof = self.asof.max(asof);
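+        // An outset of zero means no shred has been recorded yet; otherwise
+        // keep the earlier of the two timestamps.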
+        self.outset = if self.outset == 0 {
+            outset
+        } else {
+            self.outset.min(outset)
+        };
+        for k in 0..3 {
+            self.num_shreds_received[k] += num_shreds_received[k];
+            self.num_shreds_sent[k] += num_shreds_sent[k];
+        }
+    }
+}
+
+impl RetransmitStats {
+    const SLOT_STATS_CACHE_CAPACITY: usize = 750;
+
+    fn new(now: Instant) -> Self {
+        Self {
+            since: now,
+            num_nodes: AtomicUsize::default(),
+            num_addrs_failed: AtomicUsize::default(),
+            num_shreds: 0usize,
+            num_shreds_skipped: AtomicUsize::default(),
+            total_batches: 0usize,
+            total_time: 0u64,
+            epoch_fetch: 0u64,
+            epoch_cache_update: 0u64,
+            retransmit_total: AtomicU64::default(),
+            compute_turbine_peers_total: AtomicU64::default(),
+            // Cache capacity is manually enforced.
+            slot_stats: LruCache::<Slot, RetransmitSlotStats>::unbounded(),
+            unknown_shred_slot_leader: AtomicUsize::default(),
+        }
+    }
+
+    fn upsert_slot_stats<I>(&mut self, feed: I)
+    where
+        I: IntoIterator<Item = (Slot, RetransmitSlotStats)>,
+    {
+        for (slot, slot_stats) in feed {
+            match self.slot_stats.get_mut(&slot) {
+                None => {
+                    self.slot_stats.put(slot, slot_stats);
+                }
+                Some(entry) => {
+                    *entry += slot_stats;
+                }
+            }
+        }
+        while self.slot_stats.len() > Self::SLOT_STATS_CACHE_CAPACITY {
+            // Pop and submit metrics for the slot which was updated least
+            // recently. At this point the node most likely will not receive
+            // and retransmit any more shreds for this slot.
+            match self.slot_stats.pop_lru() {
+                Some((slot, stats)) => stats.submit(slot),
+                None => break,
+            }
+        }
+    }
+}
+
+impl RetransmitSlotStats {
+    fn record(&mut self, now: u64, root_distance: usize, num_nodes: usize) {
+        self.outset = if self.outset == 0 {
+            now
+        } else {
+            self.outset.min(now)
+        };
+        self.asof = self.asof.max(now);
+        self.num_shreds_received[root_distance] += 1;
+        self.num_shreds_sent[root_distance] += num_nodes;
+    }
+
+    fn merge(mut acc: HashMap<Slot, Self>, other: HashMap<Slot, Self>) -> HashMap<Slot, Self> {
+        if acc.len() < other.len() {
+            return Self::merge(other, acc);
+        }
+        for (key, value) in other {
+            *acc.entry(key).or_default() += value;
+        }
+        acc
+    }
+
+    fn submit(&self, slot: Slot) {
+        let num_shreds: usize = self.num_shreds_received.iter().sum();
+        let num_nodes: usize = self.num_shreds_sent.iter().sum();
+        let elapsed_millis = self.asof.saturating_sub(self.outset);
+        datapoint_info!(
+            "retransmit-stage-slot-stats",
+            ("slot", slot, i64),
+            ("outset_timestamp", self.outset, i64),
+            ("elapsed_millis", elapsed_millis, i64),
+            ("num_shreds", num_shreds, i64),
+            ("num_nodes", num_nodes, i64),
+            ("num_shreds_received_root", self.num_shreds_received[0], i64),
+            (
+                "num_shreds_received_1st_layer",
+                self.num_shreds_received[1],
+                i64
+            ),
+            (
+                "num_shreds_received_2nd_layer",
+                self.num_shreds_received[2],
+                i64
+            ),
+            ("num_shreds_sent_root", self.num_shreds_sent[0], i64),
+            ("num_shreds_sent_1st_layer", self.num_shreds_sent[1], i64),
+            ("num_shreds_sent_2nd_layer", self.num_shreds_sent[2], i64),
+        );
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use {super::*, solana_ledger::shred::ShredFlags};