replaces counters with datapoints in gossip metrics (#24451)

This commit is contained in:
behzad nouri 2022-04-18 23:14:59 +00:00 committed by GitHub
parent 6ff7de7cab
commit 1d50832389
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 209 additions and 83 deletions

View File

@ -71,6 +71,7 @@ impl AddAssign for RetransmitSlotStats {
struct RetransmitStats { struct RetransmitStats {
since: Option<Instant>, since: Option<Instant>,
num_nodes: AtomicUsize, num_nodes: AtomicUsize,
num_addrs_failed: AtomicUsize,
num_shreds: usize, num_shreds: usize,
num_shreds_skipped: AtomicUsize, num_shreds_skipped: AtomicUsize,
total_batches: usize, total_batches: usize,
@ -114,6 +115,7 @@ impl RetransmitStats {
("epoch_cache_update", stats.epoch_cache_update, i64), ("epoch_cache_update", stats.epoch_cache_update, i64),
("total_batches", stats.total_batches, i64), ("total_batches", stats.total_batches, i64),
("num_nodes", stats.num_nodes.into_inner(), i64), ("num_nodes", stats.num_nodes.into_inner(), i64),
("num_addrs_failed", stats.num_addrs_failed.into_inner(), i64),
("num_shreds", stats.num_shreds, i64), ("num_shreds", stats.num_shreds, i64),
( (
"num_shreds_skipped", "num_shreds_skipped",
@ -296,8 +298,9 @@ fn retransmit(
let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) { let num_nodes = match multi_target_send(socket, &shred.payload, &addrs) {
Ok(()) => addrs.len(), Ok(()) => addrs.len(),
Err(SendPktsError::IoError(ioerr, num_failed)) => { Err(SendPktsError::IoError(ioerr, num_failed)) => {
inc_new_counter_info!("cluster_info-retransmit-packets", addrs.len(), 1); stats
inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1); .num_addrs_failed
.fetch_add(num_failed, Ordering::Relaxed);
error!( error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed", "retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr, ioerr,

View File

@ -47,7 +47,6 @@ use {
serde::ser::Serialize, serde::ser::Serialize,
solana_ledger::shred::Shred, solana_ledger::shred::Shred,
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
solana_net_utils::{ solana_net_utils::{
bind_common, bind_common_in_range, bind_in_range, bind_two_consecutive_in_range, bind_common, bind_common_in_range, bind_in_range, bind_two_consecutive_in_range,
find_available_port_in_range, multi_bind_in_range, PortRange, find_available_port_in_range, multi_bind_in_range, PortRange,
@ -74,7 +73,6 @@ use {
}, },
solana_streamer::{ solana_streamer::{
packet, packet,
sendmmsg::{multi_target_send, SendPktsError},
socket::SocketAddrSpace, socket::SocketAddrSpace,
streamer::{PacketBatchReceiver, PacketBatchSender}, streamer::{PacketBatchReceiver, PacketBatchSender},
}, },
@ -280,13 +278,13 @@ pub(crate) enum Protocol {
} }
impl Protocol { impl Protocol {
fn par_verify(self) -> Option<Self> { fn par_verify(self, stats: &GossipStats) -> Option<Self> {
match self { match self {
Protocol::PullRequest(_, ref caller) => { Protocol::PullRequest(_, ref caller) => {
if caller.verify() { if caller.verify() {
Some(self) Some(self)
} else { } else {
inc_new_counter_info!("cluster_info-gossip_pull_request_verify_fail", 1); stats.gossip_pull_request_verify_fail.add_relaxed(1);
None None
} }
} }
@ -294,10 +292,9 @@ impl Protocol {
let size = data.len(); let size = data.len();
let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect(); let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect();
if size != data.len() { if size != data.len() {
inc_new_counter_info!( stats
"cluster_info-gossip_pull_response_verify_fail", .gossip_pull_response_verify_fail
size - data.len() .add_relaxed((size - data.len()) as u64);
);
} }
if data.is_empty() { if data.is_empty() {
None None
@ -309,10 +306,9 @@ impl Protocol {
let size = data.len(); let size = data.len();
let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect(); let data: Vec<_> = data.into_par_iter().filter(Signable::verify).collect();
if size != data.len() { if size != data.len() {
inc_new_counter_info!( stats
"cluster_info-gossip_push_msg_verify_fail", .gossip_push_msg_verify_fail
size - data.len() .add_relaxed((size - data.len()) as u64);
);
} }
if data.is_empty() { if data.is_empty() {
None None
@ -324,7 +320,7 @@ impl Protocol {
if data.verify() { if data.verify() {
Some(self) Some(self)
} else { } else {
inc_new_counter_debug!("cluster_info-gossip_prune_msg_verify_fail", 1); stats.gossip_prune_msg_verify_fail.add_relaxed(1);
None None
} }
} }
@ -332,7 +328,7 @@ impl Protocol {
if ping.verify() { if ping.verify() {
Some(self) Some(self)
} else { } else {
inc_new_counter_info!("cluster_info-gossip_ping_msg_verify_fail", 1); stats.gossip_ping_msg_verify_fail.add_relaxed(1);
None None
} }
} }
@ -340,7 +336,7 @@ impl Protocol {
if pong.verify() { if pong.verify() {
Some(self) Some(self)
} else { } else {
inc_new_counter_info!("cluster_info-gossip_pong_msg_verify_fail", 1); stats.gossip_pong_msg_verify_fail.add_relaxed(1);
None None
} }
} }
@ -884,7 +880,7 @@ impl ClusterInfo {
if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots if DEFAULT_SLOTS_PER_EPOCH as isize > total_slots
&& crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len() && crds_value::MAX_EPOCH_SLOTS as usize <= current_slots.len()
{ {
inc_new_counter_warn!("cluster_info-epoch_slots-filled", 1); self.stats.epoch_slots_filled.add_relaxed(1);
warn!( warn!(
"EPOCH_SLOTS are filling up FAST {}/{}", "EPOCH_SLOTS are filling up FAST {}/{}",
total_slots, total_slots,
@ -1114,7 +1110,7 @@ impl ClusterInfo {
transaction transaction
}) })
.collect(); .collect();
inc_new_counter_info!("cluster_info-get_votes-count", txs.len()); self.stats.get_votes_count.add_relaxed(txs.len() as u64);
txs txs
} }
@ -1134,7 +1130,7 @@ impl ClusterInfo {
(vote.value.label(), transaction) (vote.value.label(), transaction)
}) })
.unzip(); .unzip();
inc_new_counter_info!("cluster_info-get_votes-count", txs.len()); self.stats.get_votes_count.add_relaxed(txs.len() as u64);
(labels, txs) (labels, txs)
} }
@ -1317,42 +1313,6 @@ impl ClusterInfo {
.collect() .collect()
} }
/// retransmit messages to a list of nodes
/// # Remarks
/// We need to avoid having obj locked while doing a io, such as the `send_to`
pub fn retransmit_to(
peers: &[&ContactInfo],
data: &[u8],
s: &UdpSocket,
forwarded: bool,
socket_addr_space: &SocketAddrSpace,
) {
trace!("retransmit orders {}", peers.len());
// Choose destination socket addresses: TVU-forwards ports when this shred
// has already been forwarded once, plain TVU ports otherwise. Addresses
// that fail the socket-address-space check are silently dropped.
let dests: Vec<_> = if forwarded {
peers
.iter()
.map(|peer| peer.tvu_forwards)
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect()
} else {
peers
.iter()
.map(|peer| peer.tvu)
.filter(|addr| socket_addr_space.check(addr))
.collect()
};
// Best-effort batched send: on a partial I/O failure, bump the metrics
// counters and log how many of the packets failed; no error is returned
// to the caller.
if let Err(SendPktsError::IoError(ioerr, num_failed)) = multi_target_send(s, data, &dests) {
inc_new_counter_info!("cluster_info-retransmit-packets", dests.len(), 1);
inc_new_counter_error!("cluster_info-retransmit-error", num_failed, 1);
error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr,
num_failed,
dests.len(),
);
}
}
fn insert_self(&self) { fn insert_self(&self) {
let value = CrdsValue::new_signed( let value = CrdsValue::new_signed(
CrdsData::ContactInfo(self.my_contact_info()), CrdsData::ContactInfo(self.my_contact_info()),
@ -1659,7 +1619,7 @@ impl ClusterInfo {
stakes: &HashMap<Pubkey, u64>, stakes: &HashMap<Pubkey, u64>,
) { ) {
let self_pubkey = self.id(); let self_pubkey = self.id();
let epoch_duration = get_epoch_duration(bank_forks); let epoch_duration = get_epoch_duration(bank_forks, &self.stats);
let timeouts = self let timeouts = self
.gossip .gossip
.make_timeouts(self_pubkey, stakes, epoch_duration); .make_timeouts(self_pubkey, stakes, epoch_duration);
@ -1668,7 +1628,7 @@ impl ClusterInfo {
self.gossip self.gossip
.purge(&self_pubkey, thread_pool, timestamp(), &timeouts) .purge(&self_pubkey, thread_pool, timestamp(), &timeouts)
}; };
inc_new_counter_info!("cluster_info-purge-count", num_purged); self.stats.purge_count.add_relaxed(num_purged as u64);
} }
// Trims the CRDS table by dropping all values associated with the pubkeys // Trims the CRDS table by dropping all values associated with the pubkeys
@ -1835,10 +1795,14 @@ impl ClusterInfo {
} }
} }
if prune_message_timeout != 0 { if prune_message_timeout != 0 {
inc_new_counter_debug!("cluster_info-prune_message_timeout", prune_message_timeout); self.stats
.prune_message_timeout
.add_relaxed(prune_message_timeout);
} }
if bad_prune_destination != 0 { if bad_prune_destination != 0 {
inc_new_counter_debug!("cluster_info-bad_prune_destination", bad_prune_destination); self.stats
.bad_prune_destination
.add_relaxed(bad_prune_destination);
} }
} }
@ -1864,7 +1828,7 @@ impl ClusterInfo {
None => false, None => false,
Some(caller) if caller.id == self_pubkey => { Some(caller) if caller.id == self_pubkey => {
warn!("PullRequest ignored, I'm talking to myself"); warn!("PullRequest ignored, I'm talking to myself");
inc_new_counter_debug!("cluster_info-window-request-loopback", 1); self.stats.window_request_loopback.add_relaxed(1);
false false
} }
Some(_) => true, Some(_) => true,
@ -1986,6 +1950,7 @@ impl ClusterInfo {
&caller_and_filters, &caller_and_filters,
output_size_limit, output_size_limit,
now, now,
&self.stats,
) )
}; };
if self.require_stake_for_gossip(stakes) { if self.require_stake_for_gossip(stakes) {
@ -2033,7 +1998,7 @@ impl ClusterInfo {
packet_batch.packets.push(packet); packet_batch.packets.push(packet);
sent += 1; sent += 1;
} else { } else {
inc_new_counter_info!("gossip_pull_request-no_budget", 1); self.stats.gossip_pull_request_no_budget.add_relaxed(1);
break; break;
} }
} }
@ -2041,8 +2006,12 @@ impl ClusterInfo {
} }
time.stop(); time.stop();
let dropped_responses = responses.len() - sent; let dropped_responses = responses.len() - sent;
inc_new_counter_info!("gossip_pull_request-sent_requests", sent); self.stats
inc_new_counter_info!("gossip_pull_request-dropped_requests", dropped_responses); .gossip_pull_request_sent_requests
.add_relaxed(sent as u64);
self.stats
.gossip_pull_request_dropped_requests
.add_relaxed(dropped_responses as u64);
debug!( debug!(
"handle_pull_requests: {} sent: {} total: {} total_bytes: {}", "handle_pull_requests: {} sent: {} total: {} total_bytes: {}",
time, time,
@ -2321,7 +2290,9 @@ impl ClusterInfo {
.push_response_count .push_response_count
.add_relaxed(packet_batch.packets.len() as u64); .add_relaxed(packet_batch.packets.len() as u64);
let new_push_requests = self.new_push_requests(stakes); let new_push_requests = self.new_push_requests(stakes);
inc_new_counter_debug!("cluster_info-push_message-pushes", new_push_requests.len()); self.stats
.push_message_pushes
.add_relaxed(new_push_requests.len() as u64);
for (address, request) in new_push_requests { for (address, request) in new_push_requests {
if ContactInfo::is_valid_address(&address, &self.socket_addr_space) { if ContactInfo::is_valid_address(&address, &self.socket_addr_space) {
match Packet::from_data(Some(&address), &request) { match Packet::from_data(Some(&address), &request) {
@ -2498,7 +2469,7 @@ impl ClusterInfo {
let data = &packet.data[..packet.meta.size]; let data = &packet.data[..packet.meta.size];
let protocol: Protocol = limited_deserialize(data).ok()?; let protocol: Protocol = limited_deserialize(data).ok()?;
protocol.sanitize().ok()?; protocol.sanitize().ok()?;
let protocol = protocol.par_verify()?; let protocol = protocol.par_verify(&self.stats)?;
Some((packet.meta.addr(), protocol)) Some((packet.meta.addr(), protocol))
}; };
let packets: Vec<_> = { let packets: Vec<_> = {
@ -2553,7 +2524,7 @@ impl ClusterInfo {
response_sender, response_sender,
&stakes, &stakes,
feature_set.as_deref(), feature_set.as_deref(),
get_epoch_duration(bank_forks), get_epoch_duration(bank_forks, &self.stats),
should_check_duplicate_instance, should_check_duplicate_instance,
)?; )?;
if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL { if last_print.elapsed() > SUBMIT_GOSSIP_STATS_INTERVAL {
@ -2688,10 +2659,10 @@ impl ClusterInfo {
// Returns root bank's epoch duration. Falls back on // Returns root bank's epoch duration. Falls back on
// DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT // DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT
// if there are no working banks. // if there are no working banks.
fn get_epoch_duration(bank_forks: Option<&RwLock<BankForks>>) -> Duration { fn get_epoch_duration(bank_forks: Option<&RwLock<BankForks>>, stats: &GossipStats) -> Duration {
let num_slots = match bank_forks { let num_slots = match bank_forks {
None => { None => {
inc_new_counter_info!("cluster_info-purge-no_working_bank", 1); stats.get_epoch_duration_no_working_bank.add_relaxed(1);
DEFAULT_SLOTS_PER_EPOCH DEFAULT_SLOTS_PER_EPOCH
} }
Some(bank_forks) => { Some(bank_forks) => {
@ -4554,8 +4525,9 @@ mod tests {
#[test] #[test]
fn test_get_epoch_millis_no_bank() { fn test_get_epoch_millis_no_bank() {
let epoch_duration = get_epoch_duration(/*bank_forks:*/ None, &GossipStats::default());
assert_eq!( assert_eq!(
get_epoch_duration(/*bank_forks=*/ None).as_millis() as u64, epoch_duration.as_millis() as u64,
DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT // 48 hours DEFAULT_SLOTS_PER_EPOCH * DEFAULT_MS_PER_SLOT // 48 hours
); );
} }

View File

@ -88,16 +88,31 @@ impl<'a, T> Drop for TimedGuard<'a, T> {
} }
#[derive(Default)] #[derive(Default)]
pub(crate) struct GossipStats { pub struct GossipStats {
pub(crate) all_tvu_peers: Counter, pub(crate) all_tvu_peers: Counter,
pub(crate) bad_prune_destination: Counter,
pub(crate) entrypoint2: Counter, pub(crate) entrypoint2: Counter,
pub(crate) entrypoint: Counter, pub(crate) entrypoint: Counter,
pub(crate) epoch_slots_filled: Counter,
pub(crate) epoch_slots_lookup: Counter, pub(crate) epoch_slots_lookup: Counter,
pub(crate) filter_crds_values_dropped_requests: Counter,
pub(crate) filter_crds_values_dropped_values: Counter,
pub(crate) filter_pull_response: Counter, pub(crate) filter_pull_response: Counter,
pub(crate) generate_pull_responses: Counter, pub(crate) generate_pull_responses: Counter,
pub(crate) get_accounts_hash: Counter, pub(crate) get_accounts_hash: Counter,
pub(crate) get_epoch_duration_no_working_bank: Counter,
pub(crate) get_votes: Counter, pub(crate) get_votes: Counter,
pub(crate) get_votes_count: Counter,
pub(crate) gossip_packets_dropped_count: Counter, pub(crate) gossip_packets_dropped_count: Counter,
pub(crate) gossip_ping_msg_verify_fail: Counter,
pub(crate) gossip_pong_msg_verify_fail: Counter,
pub(crate) gossip_prune_msg_verify_fail: Counter,
pub(crate) gossip_pull_request_dropped_requests: Counter,
pub(crate) gossip_pull_request_no_budget: Counter,
pub(crate) gossip_pull_request_sent_requests: Counter,
pub(crate) gossip_pull_request_verify_fail: Counter,
pub(crate) gossip_pull_response_verify_fail: Counter,
pub(crate) gossip_push_msg_verify_fail: Counter,
pub(crate) handle_batch_ping_messages_time: Counter, pub(crate) handle_batch_ping_messages_time: Counter,
pub(crate) handle_batch_pong_messages_time: Counter, pub(crate) handle_batch_pong_messages_time: Counter,
pub(crate) handle_batch_prune_messages_time: Counter, pub(crate) handle_batch_prune_messages_time: Counter,
@ -136,12 +151,15 @@ pub(crate) struct GossipStats {
pub(crate) process_push_success: Counter, pub(crate) process_push_success: Counter,
pub(crate) prune_message_count: Counter, pub(crate) prune_message_count: Counter,
pub(crate) prune_message_len: Counter, pub(crate) prune_message_len: Counter,
pub(crate) prune_message_timeout: Counter,
pub(crate) prune_received_cache: Counter, pub(crate) prune_received_cache: Counter,
pub(crate) pull_from_entrypoint_count: Counter, pub(crate) pull_from_entrypoint_count: Counter,
pub(crate) pull_request_ping_pong_check_failed_count: Counter, pub(crate) pull_request_ping_pong_check_failed_count: Counter,
pub(crate) pull_requests_count: Counter, pub(crate) pull_requests_count: Counter,
pub(crate) purge: Counter, pub(crate) purge: Counter,
pub(crate) purge_count: Counter,
pub(crate) push_message_count: Counter, pub(crate) push_message_count: Counter,
pub(crate) push_message_pushes: Counter,
pub(crate) push_message_value_count: Counter, pub(crate) push_message_value_count: Counter,
pub(crate) push_response_count: Counter, pub(crate) push_response_count: Counter,
pub(crate) push_vote_read: Counter, pub(crate) push_vote_read: Counter,
@ -155,6 +173,7 @@ pub(crate) struct GossipStats {
pub(crate) trim_crds_table_purged_values_count: Counter, pub(crate) trim_crds_table_purged_values_count: Counter,
pub(crate) tvu_peers: Counter, pub(crate) tvu_peers: Counter,
pub(crate) verify_gossip_packets_time: Counter, pub(crate) verify_gossip_packets_time: Counter,
pub(crate) window_request_loopback: Counter,
} }
pub(crate) fn submit_gossip_stats( pub(crate) fn submit_gossip_stats(
@ -180,6 +199,7 @@ pub(crate) fn submit_gossip_stats(
("entrypoint2", stats.entrypoint2.clear(), i64), ("entrypoint2", stats.entrypoint2.clear(), i64),
("push_vote_read", stats.push_vote_read.clear(), i64), ("push_vote_read", stats.push_vote_read.clear(), i64),
("get_votes", stats.get_votes.clear(), i64), ("get_votes", stats.get_votes.clear(), i64),
("get_votes_count", stats.get_votes_count.clear(), i64),
("get_accounts_hash", stats.get_accounts_hash.clear(), i64), ("get_accounts_hash", stats.get_accounts_hash.clear(), i64),
("all_tvu_peers", stats.all_tvu_peers.clear(), i64), ("all_tvu_peers", stats.all_tvu_peers.clear(), i64),
("tvu_peers", stats.tvu_peers.clear(), i64), ("tvu_peers", stats.tvu_peers.clear(), i64),
@ -211,6 +231,7 @@ pub(crate) fn submit_gossip_stats(
i64 i64
), ),
("purge", stats.purge.clear(), i64), ("purge", stats.purge.clear(), i64),
("purge_count", stats.purge_count.clear(), i64),
( (
"process_gossip_packets_time", "process_gossip_packets_time",
stats.process_gossip_packets_time.clear(), stats.process_gossip_packets_time.clear(),
@ -257,6 +278,16 @@ pub(crate) fn submit_gossip_stats(
i64 i64
), ),
("filter_pull_resp", stats.filter_pull_response.clear(), i64), ("filter_pull_resp", stats.filter_pull_response.clear(), i64),
(
"filter_crds_values_dropped_requests",
stats.filter_crds_values_dropped_requests.clear(),
i64
),
(
"filter_crds_values_dropped_values",
stats.filter_crds_values_dropped_values.clear(),
i64
),
( (
"process_pull_resp_count", "process_pull_resp_count",
stats.process_pull_response_count.clear(), stats.process_pull_response_count.clear(),
@ -316,6 +347,16 @@ pub(crate) fn submit_gossip_stats(
i64 i64
), ),
("process_prune", stats.process_prune.clear(), i64), ("process_prune", stats.process_prune.clear(), i64),
(
"prune_message_timeout",
stats.prune_message_timeout.clear(),
i64
),
(
"bad_prune_destination",
stats.bad_prune_destination.clear(),
i64
),
( (
"process_push_message", "process_push_message",
stats.process_push_message.clear(), stats.process_push_message.clear(),
@ -329,6 +370,21 @@ pub(crate) fn submit_gossip_stats(
("epoch_slots_lookup", stats.epoch_slots_lookup.clear(), i64), ("epoch_slots_lookup", stats.epoch_slots_lookup.clear(), i64),
("new_pull_requests", stats.new_pull_requests.clear(), i64), ("new_pull_requests", stats.new_pull_requests.clear(), i64),
("mark_pull_request", stats.mark_pull_request.clear(), i64), ("mark_pull_request", stats.mark_pull_request.clear(), i64),
(
"gossip_pull_request_no_budget",
stats.gossip_pull_request_no_budget.clear(),
i64
),
(
"gossip_pull_request_sent_requests",
stats.gossip_pull_request_sent_requests.clear(),
i64
),
(
"gossip_pull_request_dropped_requests",
stats.gossip_pull_request_dropped_requests.clear(),
i64
),
); );
datapoint_info!( datapoint_info!(
"cluster_info_stats4", "cluster_info_stats4",
@ -348,6 +404,11 @@ pub(crate) fn submit_gossip_stats(
i64 i64
), ),
("push_message_count", stats.push_message_count.clear(), i64), ("push_message_count", stats.push_message_count.clear(), i64),
(
"push_message_pushes",
stats.push_message_pushes.clear(),
i64
),
( (
"push_message_value_count", "push_message_value_count",
stats.push_message_value_count.clear(), stats.push_message_value_count.clear(),
@ -369,6 +430,17 @@ pub(crate) fn submit_gossip_stats(
i64 i64
), ),
("prune_message_len", stats.prune_message_len.clear(), i64), ("prune_message_len", stats.prune_message_len.clear(), i64),
("epoch_slots_filled", stats.epoch_slots_filled.clear(), i64),
(
"window_request_loopback",
stats.window_request_loopback.clear(),
i64
),
(
"get_epoch_duration_no_working_bank",
stats.get_epoch_duration_no_working_bank.clear(),
i64
),
); );
datapoint_info!( datapoint_info!(
"cluster_info_stats5", "cluster_info_stats5",
@ -448,6 +520,36 @@ pub(crate) fn submit_gossip_stats(
stats.trim_crds_table_purged_values_count.clear(), stats.trim_crds_table_purged_values_count.clear(),
i64 i64
), ),
(
"gossip_pull_request_verify_fail",
stats.gossip_pull_request_verify_fail.clear(),
i64
),
(
"gossip_pull_response_verify_fail",
stats.gossip_pull_response_verify_fail.clear(),
i64
),
(
"gossip_push_msg_verify_fail",
stats.gossip_push_msg_verify_fail.clear(),
i64
),
(
"gossip_prune_msg_verify_fail",
stats.gossip_prune_msg_verify_fail.clear(),
i64
),
(
"gossip_ping_msg_verify_fail",
stats.gossip_ping_msg_verify_fail.clear(),
i64
),
(
"gossip_pong_msg_verify_fail",
stats.gossip_pong_msg_verify_fail.clear(),
i64
),
); );
datapoint_info!( datapoint_info!(
"cluster_info_crds_stats", "cluster_info_crds_stats",

View File

@ -7,6 +7,7 @@
use { use {
crate::{ crate::{
cluster_info::Ping, cluster_info::Ping,
cluster_info_metrics::GossipStats,
contact_info::ContactInfo, contact_info::ContactInfo,
crds::{Crds, GossipRoute}, crds::{Crds, GossipRoute},
crds_gossip_error::CrdsGossipError, crds_gossip_error::CrdsGossipError,
@ -254,6 +255,7 @@ impl CrdsGossip {
filters: &[(CrdsValue, CrdsFilter)], filters: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned. output_size_limit: usize, // Limit number of crds values returned.
now: u64, now: u64,
stats: &GossipStats,
) -> Vec<Vec<CrdsValue>> { ) -> Vec<Vec<CrdsValue>> {
CrdsGossipPull::generate_pull_responses( CrdsGossipPull::generate_pull_responses(
thread_pool, thread_pool,
@ -261,6 +263,7 @@ impl CrdsGossip {
filters, filters,
output_size_limit, output_size_limit,
now, now,
stats,
) )
} }

View File

@ -14,6 +14,7 @@
use { use {
crate::{ crate::{
cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY}, cluster_info::{Ping, CRDS_UNIQUE_PUBKEY_CAPACITY},
cluster_info_metrics::GossipStats,
contact_info::ContactInfo, contact_info::ContactInfo,
crds::{Crds, GossipRoute, VersionedCrdsValue}, crds::{Crds, GossipRoute, VersionedCrdsValue},
crds_gossip::{get_stake, get_weight}, crds_gossip::{get_stake, get_weight},
@ -360,8 +361,9 @@ impl CrdsGossipPull {
requests: &[(CrdsValue, CrdsFilter)], requests: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned. output_size_limit: usize, // Limit number of crds values returned.
now: u64, now: u64,
stats: &GossipStats,
) -> Vec<Vec<CrdsValue>> { ) -> Vec<Vec<CrdsValue>> {
Self::filter_crds_values(thread_pool, crds, requests, output_size_limit, now) Self::filter_crds_values(thread_pool, crds, requests, output_size_limit, now, stats)
} }
// Checks if responses should be inserted and // Checks if responses should be inserted and
@ -513,6 +515,7 @@ impl CrdsGossipPull {
filters: &[(CrdsValue, CrdsFilter)], filters: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned. output_size_limit: usize, // Limit number of crds values returned.
now: u64, now: u64,
stats: &GossipStats,
) -> Vec<Vec<CrdsValue>> { ) -> Vec<Vec<CrdsValue>> {
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS; let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4); let jitter = rand::thread_rng().gen_range(0, msg_timeout / 4);
@ -559,14 +562,12 @@ impl CrdsGossipPull {
.map(|(caller, filter)| apply_filter(caller, filter)) .map(|(caller, filter)| apply_filter(caller, filter))
.collect() .collect()
}); });
inc_new_counter_info!( stats
"gossip_filter_crds_values-dropped_requests", .filter_crds_values_dropped_requests
dropped_requests.into_inner() .add_relaxed(dropped_requests.into_inner() as u64);
); stats
inc_new_counter_info!( .filter_crds_values_dropped_values
"gossip_filter_crds_values-dropped_values", .add_relaxed(total_skipped.into_inner() as u64);
total_skipped.into_inner()
);
ret ret
} }
@ -1257,6 +1258,7 @@ pub(crate) mod tests {
&filters, &filters,
usize::MAX, // output_size_limit usize::MAX, // output_size_limit
0, // now 0, // now
&GossipStats::default(),
); );
assert_eq!(rsp[0].len(), 0); assert_eq!(rsp[0].len(), 0);
@ -1282,6 +1284,7 @@ pub(crate) mod tests {
&filters, &filters,
usize::MAX, // output_size_limit usize::MAX, // output_size_limit
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, // now CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, // now
&GossipStats::default(),
); );
assert_eq!(rsp[0].len(), 0); assert_eq!(rsp[0].len(), 0);
assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS); assert_eq!(filters.len(), MIN_NUM_BLOOM_FILTERS);
@ -1301,6 +1304,7 @@ pub(crate) mod tests {
&filters, &filters,
usize::MAX, // output_size_limit usize::MAX, // output_size_limit
CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS, CRDS_GOSSIP_PULL_MSG_TIMEOUT_MS,
&GossipStats::default(),
); );
assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS); assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS);
// There should be only one non-empty response in the 2nd half. // There should be only one non-empty response in the 2nd half.
@ -1357,6 +1361,7 @@ pub(crate) mod tests {
&filters, &filters,
usize::MAX, // output_size_limit usize::MAX, // output_size_limit
0, // now 0, // now
&GossipStats::default(),
); );
let callers = filters.into_iter().map(|(caller, _)| caller); let callers = filters.into_iter().map(|(caller, _)| caller);
CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1); CrdsGossipPull::process_pull_requests(&dest_crds, callers, 1);
@ -1442,6 +1447,7 @@ pub(crate) mod tests {
&filters, &filters,
usize::MAX, // output_size_limit usize::MAX, // output_size_limit
0, // now 0, // now
&GossipStats::default(),
); );
CrdsGossipPull::process_pull_requests( CrdsGossipPull::process_pull_requests(
&dest_crds, &dest_crds,

View File

@ -2,7 +2,7 @@
#![allow(clippy::integer_arithmetic)] #![allow(clippy::integer_arithmetic)]
pub mod cluster_info; pub mod cluster_info;
mod cluster_info_metrics; pub mod cluster_info_metrics;
#[macro_use] #[macro_use]
pub mod contact_info; pub mod contact_info;
pub mod crds; pub mod crds;

View File

@ -6,6 +6,7 @@ use {
serial_test::serial, serial_test::serial,
solana_gossip::{ solana_gossip::{
cluster_info, cluster_info,
cluster_info_metrics::GossipStats,
contact_info::ContactInfo, contact_info::ContactInfo,
crds::GossipRoute, crds::GossipRoute,
crds_gossip::*, crds_gossip::*,
@ -541,6 +542,7 @@ fn network_run_pull(
&filters, &filters,
usize::MAX, // output_size_limit usize::MAX, // output_size_limit
now, now,
&GossipStats::default(),
) )
.into_iter() .into_iter()
.flatten() .flatten()

View File

@ -6,6 +6,7 @@ use {
rayon::iter::*, rayon::iter::*,
solana_gossip::{ solana_gossip::{
cluster_info::{ClusterInfo, Node}, cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
crds::Cursor, crds::Cursor,
gossip_service::GossipService, gossip_service::GossipService,
}, },
@ -18,7 +19,10 @@ use {
timing::timestamp, timing::timestamp,
transaction::Transaction, transaction::Transaction,
}, },
solana_streamer::socket::SocketAddrSpace, solana_streamer::{
sendmmsg::{multi_target_send, SendPktsError},
socket::SocketAddrSpace,
},
solana_vote_program::{vote_instruction, vote_state::Vote}, solana_vote_program::{vote_instruction, vote_state::Vote},
std::{ std::{
net::UdpSocket, net::UdpSocket,
@ -113,6 +117,40 @@ where
} }
assert!(done); assert!(done);
} }
/// retransmit messages to a list of nodes
/// NOTE(review): this appears to be a test-local copy of the
/// `ClusterInfo::retransmit_to` removed in this same commit, with the
/// metrics counters stripped — confirm against the deleted version.
fn retransmit_to(
peers: &[&ContactInfo],
data: &[u8],
socket: &UdpSocket,
forwarded: bool,
socket_addr_space: &SocketAddrSpace,
) {
trace!("retransmit orders {}", peers.len());
// Select TVU-forwards addresses when `forwarded` is set, plain TVU
// addresses otherwise; filter out addresses that are invalid for the
// configured socket address space.
let dests: Vec<_> = if forwarded {
peers
.iter()
.map(|peer| peer.tvu_forwards)
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
.collect()
} else {
peers
.iter()
.map(|peer| peer.tvu)
.filter(|addr| socket_addr_space.check(addr))
.collect()
};
// Best-effort batched send: a partial failure is only logged (with the
// failed/total packet counts), never propagated to the caller.
if let Err(SendPktsError::IoError(ioerr, num_failed)) = multi_target_send(socket, data, &dests)
{
error!(
"retransmit_to multi_target_send error: {:?}, {}/{} packets failed",
ioerr,
num_failed,
dests.len(),
);
}
}
/// ring a -> b -> c -> d -> e -> a /// ring a -> b -> c -> d -> e -> a
#[test] #[test]
fn gossip_ring() { fn gossip_ring() {
@ -220,7 +258,7 @@ pub fn cluster_info_retransmit() {
p.meta.size = 10; p.meta.size = 10;
let peers = c1.tvu_peers(); let peers = c1.tvu_peers();
let retransmit_peers: Vec<_> = peers.iter().collect(); let retransmit_peers: Vec<_> = peers.iter().collect();
ClusterInfo::retransmit_to( retransmit_to(
&retransmit_peers, &retransmit_peers,
&p.data[..p.meta.size], &p.data[..p.meta.size],
&tn1, &tn1,