From 1d50832389d0b61646346b3a22a80490c5a3399d Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Mon, 18 Apr 2022 23:14:59 +0000 Subject: [PATCH] replaces counters with datapoints in gossip metrics (#24451) --- core/src/retransmit_stage.rs | 7 +- gossip/src/cluster_info.rs | 108 +++++++++++------------------ gossip/src/cluster_info_metrics.rs | 104 ++++++++++++++++++++++++++- gossip/src/crds_gossip.rs | 3 + gossip/src/crds_gossip_pull.rs | 24 ++++--- gossip/src/lib.rs | 2 +- gossip/tests/crds_gossip.rs | 2 + gossip/tests/gossip.rs | 42 ++++++++++- 8 files changed, 209 insertions(+), 83 deletions(-) diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index d6f6906c95..d837a64553 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -71,6 +71,7 @@ impl AddAssign for RetransmitSlotStats { struct RetransmitStats { since: Option, num_nodes: AtomicUsize, + num_addrs_failed: AtomicUsize, num_shreds: usize, num_shreds_skipped: AtomicUsize, total_batches: usize, @@ -114,6 +115,7 @@ impl RetransmitStats { ("epoch_cache_update", stats.epoch_cache_update, i64), ("total_batches", stats.total_batches, i64), ("num_nodes", stats.num_nodes.into_inner(), i64), + ("num_addrs_failed", stats.num_addrs_failed.into_inner(), i64), ("num_shreds", stats.num_shreds, i64), ( "num_shreds_skipped", @@ -296,8 +298,9 @@ fn retransmit( let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) { Ok(()) => addrs.len(), Err(SendPktsError::IoError(ioerr, num_failed)) => { - inc_new_counter_info!("cluster_info-retransmit-packets", addrs.len(), 1); - inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1); + stats + .num_addrs_failed + .fetch_add(num_failed, Ordering::Relaxed); error!( "retransmit_to multi_target_send error: {:?}, {}/{} packets failed", ioerr, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 541d71f446..426829e89d 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs 
@@ -47,7 +47,6 @@ use { serde::ser::Serialize, solana_ledger::shred::Shred, solana_measure::measure::Measure, - solana_metrics::{inc_new_counter_debug, inc_new_counter_error}, solana_net_utils::{ bind_common, bind_common_in_range, bind_in_range, bind_two_consecutive_in_range, find_available_port_in_range, multi_bind_in_range, PortRange, @@ -74,7 +73,6 @@ use { }, solana_streamer::{ packet, - sendmmsg::{multi_target_send, SendPktsError}, socket::SocketAddrSpace, streamer::{PacketBatchReceiver, PacketBatchSender}, }, @@ -280,13 +278,13 @@ pub(crate) enum Protocol { } impl Protocol { - fn par_verify(self) -> Option<Self> { + fn par_verify(self, stats: &GossipStats) -> Option<Self> { match self { Protocol::PullRequest(_, ref caller) => { if caller.verify() { Some(self) } else { - inc_new_counter_info!("cluster_info-gossip_pull_request_verify_fail", 1); + stats.gossip_pull_request_verify_fail.add_relaxed(1); None } } @@ -294,10 +292,9 @@ impl Protocol { let size = data.len(); let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect(); if size != data.len() { - inc_new_counter_info!( - "cluster_info-gossip_pull_response_verify_fail", - size - data.len() - ); + stats + .gossip_pull_response_verify_fail + .add_relaxed((size - data.len()) as u64); } if data.is_empty() { None @@ -309,10 +306,9 @@ impl Protocol { let size = data.len(); let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect(); if size != data.len() { - inc_new_counter_info!( - "cluster_info-gossip_push_msg_verify_fail", - size - data.len() - ); + stats + .gossip_push_msg_verify_fail + .add_relaxed((size - data.len()) as u64); } if data.is_empty() { None @@ -324,7 +320,7 @@ impl Protocol { if data.verify() { Some(self) } else { - inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1); + stats.gossip_prune_msg_verify_fail.add_relaxed(1); None } } @@ -332,7 +328,7 @@ impl Protocol { if ping.verify() { Some(self) } else { - 
inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", 1); + stats.gossip_ping_msg_verify_fail.add_relaxed(1); None } } @@ -340,7 +336,7 @@ impl Protocol { if pong.verify() { Some(self) } else { - inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", 1); + stats.gossip_pong_msg_verify_fail.add_relaxed(1); None } } @@ -884,7 +880,7 @@ impl ClusterInfo { if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots && crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len() { - inc_new_counter_warn!("cluster_info-epoch_slots-filled", 1); + self.stats.epoch_slots_filled.add_relaxed(1); warn!( "EPOCH_SLOTS are filling up FAST {}/{}", total_slots, @@ -1114,7 +1110,7 @@ impl ClusterInfo { transaction }) .collect(); - inc_new_counter_info!("cluster_info-get_votes-count", txs.len()); + self.stats.get_votes_count.add_relaxed(txs.len() as u64); txs } @@ -1134,7 +1130,7 @@ impl ClusterInfo { (vote.value.label(), transaction) }) .unzip(); - inc_new_counter_info!("cluster_info-get_votes-count", txs.len()); + self.stats.get_votes_count.add_relaxed(txs.len() as u64); (labels, txs) } @@ -1317,42 +1313,6 @@ impl ClusterInfo { .collect() } - /// retransmit messages to a list of nodes - /// # Remarks - /// We need to avoid having obj locked while doing a io, such as the `send_to` - pub fn retransmit_to( - peers: &[&ContactInfo], - data: &[u8], - s: &UdpSocket, - forwarded: bool, - socket_addr_space: &SocketAddrSpace, - ) { - trace!("retransmit orders {}", peers.len()); - let dests: Vec<_> = if forwarded { - peers - .iter() - .map(|peer| peer.tvu_forwards) - .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) - .collect() - } else { - peers - .iter() - .map(|peer| peer.tvu) - .filter(|addr| socket_addr_space.check(addr)) - .collect() - }; - if let Err(SendPktsError::IoError(ioerr, num_failed)) = multi_target_send(s, data, &dests) { - inc_new_counter_info!("cluster_info-retransmit-packets", dests.len(), 1); - 
inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1); - error!( - "retransmit_to multi_target_send error: {:?}, {}/{} packets failed", - ioerr, - num_failed, - dests.len(), - ); - } - } - fn insert_self(&self) { let value = CrdsValue::new_signed( CrdsData::ContactInfo(self.my_contact_info()), @@ -1659,7 +1619,7 @@ impl ClusterInfo { stakes: &HashMap, ) { let self_pubkey = self.id(); - let epoch_duration = get_epoch_duration(bank_forks); + let epoch_duration = get_epoch_duration(bank_forks, &self.stats); let timeouts = self .gossip .make_timeouts(self_pubkey, stakes, epoch_duration); @@ -1668,7 +1628,7 @@ impl ClusterInfo { self.gossip .purge(&self_pubkey, thread_pool, timestamp(), &timeouts) }; - inc_new_counter_info!("cluster_info-purge-count", num_purged); + self.stats.purge_count.add_relaxed(num_purged as u64); } // Trims the CRDS table by dropping all values associated with the pubkeys @@ -1835,10 +1795,14 @@ impl ClusterInfo { } } if prune_message_timeout != 0 { - inc_new_counter_debug!("cluster_info-prune_message_timeout", prune_message_timeout); + self.stats + .prune_message_timeout + .add_relaxed(prune_message_timeout); } if bad_prune_destination != 0 { - inc_new_counter_debug!("cluster_info-bad_prune_destination", bad_prune_destination); + self.stats + .bad_prune_destination + .add_relaxed(bad_prune_destination); } } @@ -1864,7 +1828,7 @@ impl ClusterInfo { None => false, Some(caller) if caller.id == self_pubkey => { warn!("PullRequest ignored, I'm talking to myself"); - inc_new_counter_debug!("cluster_info-window-request-loopback", 1); + self.stats.window_request_loopback.add_relaxed(1); false } Some(_) => true, @@ -1986,6 +1950,7 @@ impl ClusterInfo { &caller_and_filters, output_size_limit, now, + &self.stats, ) }; if self.require_stake_for_gossip(stakes) { @@ -2033,7 +1998,7 @@ impl ClusterInfo { packet_batch.packets.push(packet); sent += 1; } else { - inc_new_counter_info!("gossip_pull_request-no_budget", 1); + 
self.stats.gossip_pull_request_no_budget.add_relaxed(1); break; } } @@ -2041,8 +2006,12 @@ impl ClusterInfo { } time.stop(); let dropped_responses = responses.len() - sent; - inc_new_counter_info!("gossip_pull_request-sent_requests", sent); - inc_new_counter_info!("gossip_pull_request-dropped_requests", dropped_responses); + self.stats + .gossip_pull_request_sent_requests + .add_relaxed(sent as u64); + self.stats + .gossip_pull_request_dropped_requests + .add_relaxed(dropped_responses as u64); debug!( "handle_pull_requests: {} sent: {} total: {} total_bytes: {}", time, @@ -2321,7 +2290,9 @@ impl ClusterInfo { .push_response_count .add_relaxed(packet_batch.packets.len() as u64); let new_push_requests = self.new_push_requests(stakes); - inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len()); + self.stats + .push_message_pushes + .add_relaxed(new_push_requests.len() as u64); for (address, request) in new_push_requests { if ContactInfo::is_valid_address(&address, &self.socket_addr_space) { match Packet::from_data(Some(&address), &request) { @@ -2498,7 +2469,7 @@ impl ClusterInfo { let data = &packet.data[..packet.meta.size]; let protocol: Protocol = limited_deserialize(data).ok()?; protocol.sanitize().ok()?; - let protocol = protocol.par_verify()?; + let protocol = protocol.par_verify(&self.stats)?; Some((packet.meta.addr(), protocol)) }; let packets: Vec<_> = { @@ -2553,7 +2524,7 @@ impl ClusterInfo { response_sender, &stakes, feature_set.as_deref(), - get_epoch_duration(bank_forks), + get_epoch_duration(bank_forks, &self.stats), should_check_duplicate_instance, )?; if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL { @@ -2688,10 +2659,10 @@ impl ClusterInfo { // Returns root bank's epoch duration. Falls back on // DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT // if there are no working banks. 
-fn get_epoch_duration(bank_forks: Option<&RwLock<BankForks>>) -> Duration { +fn get_epoch_duration(bank_forks: Option<&RwLock<BankForks>>, stats: &GossipStats) -> Duration { let num_slots = match bank_forks { None => { - inc_new_counter_info!("cluster_info-purge-no_working_bank", 1); + stats.get_epoch_duration_no_working_bank.add_relaxed(1); DEFAULT_SLOTS_PER_EPOCH } Some(bank_forks) => { @@ -4554,8 +4525,9 @@ mod tests { #[test] fn test_get_epoch_millis_no_bank() { + let epoch_duration = get_epoch_duration(/*bank_forks:*/ None, &GossipStats::default()); assert_eq!( - get_epoch_duration(/*bank_forks=*/ None).as_millis() as u64, + epoch_duration.as_millis() as u64, DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT // 48 hours ); } diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 723bd03c07..27bd8b98b2 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -88,16 +88,31 @@ impl<'a, T> Drop for TimedGuard<'a, T> { } #[derive(Default)] -pub(crate) struct GossipStats { +pub struct GossipStats { pub(crate) all_tvu_peers: Counter, + pub(crate) bad_prune_destination: Counter, pub(crate) entrypoint2: Counter, pub(crate) entrypoint: Counter, + pub(crate) epoch_slots_filled: Counter, pub(crate) epoch_slots_lookup: Counter, + pub(crate) filter_crds_values_dropped_requests: Counter, + pub(crate) filter_crds_values_dropped_values: Counter, pub(crate) filter_pull_response: Counter, pub(crate) generate_pull_responses: Counter, pub(crate) get_accounts_hash: Counter, + pub(crate) get_epoch_duration_no_working_bank: Counter, pub(crate) get_votes: Counter, + pub(crate) get_votes_count: Counter, pub(crate) gossip_packets_dropped_count: Counter, + pub(crate) gossip_ping_msg_verify_fail: Counter, + pub(crate) gossip_pong_msg_verify_fail: Counter, + pub(crate) gossip_prune_msg_verify_fail: Counter, + pub(crate) gossip_pull_request_dropped_requests: Counter, + pub(crate) gossip_pull_request_no_budget: Counter, + pub(crate) 
gossip_pull_request_sent_requests: Counter, + pub(crate) gossip_pull_request_verify_fail: Counter, + pub(crate) gossip_pull_response_verify_fail: Counter, + pub(crate) gossip_push_msg_verify_fail: Counter, pub(crate) handle_batch_ping_messages_time: Counter, pub(crate) handle_batch_pong_messages_time: Counter, pub(crate) handle_batch_prune_messages_time: Counter, @@ -136,12 +151,15 @@ pub(crate) struct GossipStats { pub(crate) process_push_success: Counter, pub(crate) prune_message_count: Counter, pub(crate) prune_message_len: Counter, + pub(crate) prune_message_timeout: Counter, pub(crate) prune_received_cache: Counter, pub(crate) pull_from_entrypoint_count: Counter, pub(crate) pull_request_ping_pong_check_failed_count: Counter, pub(crate) pull_requests_count: Counter, pub(crate) purge: Counter, + pub(crate) purge_count: Counter, pub(crate) push_message_count: Counter, + pub(crate) push_message_pushes: Counter, pub(crate) push_message_value_count: Counter, pub(crate) push_response_count: Counter, pub(crate) push_vote_read: Counter, @@ -155,6 +173,7 @@ pub(crate) struct GossipStats { pub(crate) trim_crds_table_purged_values_count: Counter, pub(crate) tvu_peers: Counter, pub(crate) verify_gossip_packets_time: Counter, + pub(crate) window_request_loopback: Counter, } pub(crate) fn submit_gossip_stats( @@ -180,6 +199,7 @@ pub(crate) fn submit_gossip_stats( ("entrypoint2", stats.entrypoint2.clear(), i64), ("push_vote_read", stats.push_vote_read.clear(), i64), ("get_votes", stats.get_votes.clear(), i64), + ("get_votes_count", stats.get_votes_count.clear(), i64), ("get_accounts_hash", stats.get_accounts_hash.clear(), i64), ("all_tvu_peers", stats.all_tvu_peers.clear(), i64), ("tvu_peers", stats.tvu_peers.clear(), i64), @@ -211,6 +231,7 @@ pub(crate) fn submit_gossip_stats( i64 ), ("purge", stats.purge.clear(), i64), + ("purge_count", stats.purge_count.clear(), i64), ( "process_gossip_packets_time", stats.process_gossip_packets_time.clear(), @@ -257,6 +278,16 @@ 
pub(crate) fn submit_gossip_stats( i64 ), ("filter_pull_resp", stats.filter_pull_response.clear(), i64), + ( + "filter_crds_values_dropped_requests", + stats.filter_crds_values_dropped_requests.clear(), + i64 + ), + ( + "filter_crds_values_dropped_values", + stats.filter_crds_values_dropped_values.clear(), + i64 + ), ( "process_pull_resp_count", stats.process_pull_response_count.clear(), @@ -316,6 +347,16 @@ pub(crate) fn submit_gossip_stats( i64 ), ("process_prune", stats.process_prune.clear(), i64), + ( + "prune_message_timeout", + stats.prune_message_timeout.clear(), + i64 + ), + ( + "bad_prune_destination", + stats.bad_prune_destination.clear(), + i64 + ), ( "process_push_message", stats.process_push_message.clear(), @@ -329,6 +370,21 @@ pub(crate) fn submit_gossip_stats( ("epoch_slots_lookup", stats.epoch_slots_lookup.clear(), i64), ("new_pull_requests", stats.new_pull_requests.clear(), i64), ("mark_pull_request", stats.mark_pull_request.clear(), i64), + ( + "gossip_pull_request_no_budget", + stats.gossip_pull_request_no_budget.clear(), + i64 + ), + ( + "gossip_pull_request_sent_requests", + stats.gossip_pull_request_sent_requests.clear(), + i64 + ), + ( + "gossip_pull_request_dropped_requests", + stats.gossip_pull_request_dropped_requests.clear(), + i64 + ), ); datapoint_info!( "cluster_info_stats4", @@ -348,6 +404,11 @@ pub(crate) fn submit_gossip_stats( i64 ), ("push_message_count", stats.push_message_count.clear(), i64), + ( + "push_message_pushes", + stats.push_message_pushes.clear(), + i64 + ), ( "push_message_value_count", stats.push_message_value_count.clear(), @@ -369,6 +430,17 @@ pub(crate) fn submit_gossip_stats( i64 ), ("prune_message_len", stats.prune_message_len.clear(), i64), + ("epoch_slots_filled", stats.epoch_slots_filled.clear(), i64), + ( + "window_request_loopback", + stats.window_request_loopback.clear(), + i64 + ), + ( + "get_epoch_duration_no_working_bank", + stats.get_epoch_duration_no_working_bank.clear(), + i64 + ), ); 
datapoint_info!( "cluster_info_stats5", @@ -448,6 +520,36 @@ pub(crate) fn submit_gossip_stats( stats.trim_crds_table_purged_values_count.clear(), i64 ), + ( + "gossip_pull_request_verify_fail", + stats.gossip_pull_request_verify_fail.clear(), + i64 + ), + ( + "gossip_pull_response_verify_fail", + stats.gossip_pull_response_verify_fail.clear(), + i64 + ), + ( + "gossip_push_msg_verify_fail", + stats.gossip_push_msg_verify_fail.clear(), + i64 + ), + ( + "gossip_prune_msg_verify_fail", + stats.gossip_prune_msg_verify_fail.clear(), + i64 + ), + ( + "gossip_ping_msg_verify_fail", + stats.gossip_ping_msg_verify_fail.clear(), + i64 + ), + ( + "gossip_pong_msg_verify_fail", + stats.gossip_pong_msg_verify_fail.clear(), + i64 + ), ); datapoint_info!( "cluster_info_crds_stats", diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index 199c881c7f..0820ab75a4 100644 --- a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -7,6 +7,7 @@ use { crate::{ cluster_info::Ping, + cluster_info_metrics::GossipStats, contact_info::ContactInfo, crds::{Crds, GossipRoute}, crds_gossip_error::CrdsGossipError, @@ -254,6 +255,7 @@ impl CrdsGossip { filters: &[(CrdsValue, CrdsFilter)], output_size_limit: usize, // Limit number of crds values returned. 
now: u64, + stats: &GossipStats, ) -> Vec<Vec<CrdsValue>> { CrdsGossipPull::generate_pull_responses( thread_pool, @@ -261,6 +263,7 @@ impl CrdsGossip { filters, output_size_limit, now, + stats, ) } diff --git a/gossip/src/crds_gossip_pull.rs b/gossip/src/crds_gossip_pull.rs index de38c75589..0abc374e04 100644 --- a/gossip/src/crds_gossip_pull.rs +++ b/gossip/src/crds_gossip_pull.rs @@ -14,6 +14,7 @@ use { crate::{ cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, + cluster_info_metrics::GossipStats, contact_info::ContactInfo, crds::{Crds, GossipRoute, VersionedCrdsValue}, crds_gossip::{get_stake, get_weight}, @@ -360,8 +361,9 @@ impl CrdsGossipPull { requests: &[(CrdsValue, CrdsFilter)], output_size_limit: usize, // Limit number of crds values returned. now: u64, + stats: &GossipStats, ) -> Vec<Vec<CrdsValue>> { - Self::filter_crds_values(thread_pool, crds, requests, output_size_limit, now) + Self::filter_crds_values(thread_pool, crds, requests, output_size_limit, now, stats) } // Checks if responses should be inserted and @@ -513,6 +515,7 @@ impl CrdsGossipPull { filters: &[(CrdsValue, CrdsFilter)], output_size_limit: usize, // Limit number of crds values returned. 
now: u64, + stats: &GossipStats, ) -> Vec<Vec<CrdsValue>> { let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4); @@ -559,14 +562,12 @@ impl CrdsGossipPull { .map(|(caller, filter)| apply_filter(caller, filter)) .collect() }); - inc_new_counter_info!( - "gossip_filter_crds_values-dropped_requests", - dropped_requests.into_inner() - ); - inc_new_counter_info!( - "gossip_filter_crds_values-dropped_values", - total_skipped.into_inner() - ); + stats + .filter_crds_values_dropped_requests + .add_relaxed(dropped_requests.into_inner() as u64); + stats + .filter_crds_values_dropped_values + .add_relaxed(total_skipped.into_inner() as u64); ret } @@ -1257,6 +1258,7 @@ pub(crate) mod tests { &filters, usize::MAX, // output_size_limit 0, // now + &GossipStats::default(), ); assert_eq!(rsp[0].len(), 0); @@ -1282,6 +1284,7 @@ pub(crate) mod tests { &filters, usize::MAX, // output_size_limit CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, // now + &GossipStats::default(), ); assert_eq!(rsp[0].len(), 0); assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS); @@ -1301,6 +1304,7 @@ pub(crate) mod tests { &filters, usize::MAX, // output_size_limit CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, + &GossipStats::default(), ); assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS); // There should be only one non-empty response in the 2nd half. 
@@ -1357,6 +1361,7 @@ pub(crate) mod tests { &filters, usize::MAX, // output_size_limit 0, // now + &GossipStats::default(), ); let callers = filters.into_iter().map(|(caller, _)| caller); CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1); @@ -1442,6 +1447,7 @@ pub(crate) mod tests { &filters, usize::MAX, // output_size_limit 0, // now + &GossipStats::default(), ); CrdsGossipPull::process_pull_requests( &dest_crds, diff --git a/gossip/src/lib.rs b/gossip/src/lib.rs index 142ebf40cc..7b37240f89 100644 --- a/gossip/src/lib.rs +++ b/gossip/src/lib.rs @@ -2,7 +2,7 @@ #![allow(clippy::integer_arithmetic)] pub mod cluster_info; -mod cluster_info_metrics; +pub mod cluster_info_metrics; #[macro_use] pub mod contact_info; pub mod crds; diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index fb0f6b2f34..9fb2770b29 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -6,6 +6,7 @@ use { serial_test::serial, solana_gossip::{ cluster_info, + cluster_info_metrics::GossipStats, contact_info::ContactInfo, crds::GossipRoute, crds_gossip::*, @@ -541,6 +542,7 @@ fn network_run_pull( &filters, usize::MAX, // output_size_limit now, + &GossipStats::default(), ) .into_iter() .flatten() diff --git a/gossip/tests/gossip.rs b/gossip/tests/gossip.rs index 5fe93e43bf..00044f1549 100644 --- a/gossip/tests/gossip.rs +++ b/gossip/tests/gossip.rs @@ -6,6 +6,7 @@ use { rayon::iter::*, solana_gossip::{ cluster_info::{ClusterInfo, Node}, + contact_info::ContactInfo, crds::Cursor, gossip_service::GossipService, }, @@ -18,7 +19,10 @@ use { timing::timestamp, transaction::Transaction, }, - solana_streamer::socket::SocketAddrSpace, + solana_streamer::{ + sendmmsg::{multi_target_send, SendPktsError}, + socket::SocketAddrSpace, + }, solana_vote_program::{vote_instruction, vote_state::Vote}, std::{ net::UdpSocket, @@ -113,6 +117,40 @@ where } assert!(done); } + +/// retransmit messages to a list of nodes +fn retransmit_to( + peers: &[&ContactInfo], + 
data: &[u8], + socket: &UdpSocket, + forwarded: bool, + socket_addr_space: &SocketAddrSpace, +) { + trace!("retransmit orders {}", peers.len()); + let dests: Vec<_> = if forwarded { + peers + .iter() + .map(|peer| peer.tvu_forwards) + .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) + .collect() + } else { + peers + .iter() + .map(|peer| peer.tvu) + .filter(|addr| socket_addr_space.check(addr)) + .collect() + }; + if let Err(SendPktsError::IoError(ioerr, num_failed)) = multi_target_send(socket, data, &dests) + { + error!( + "retransmit_to multi_target_send error: {:?}, {}/{} packets failed", + ioerr, + num_failed, + dests.len(), + ); + } +} + /// ring a -> b -> c -> d -> e -> a #[test] fn gossip_ring() { @@ -220,7 +258,7 @@ pub fn cluster_info_retransmit() { p.meta.size = 10; let peers = c1.tvu_peers(); let retransmit_peers: Vec<_> = peers.iter().collect(); - ClusterInfo::retransmit_to( + retransmit_to( &retransmit_peers, &p.data[..p.meta.size], &tn1,