adds metrics for gossip push fanout (#29065)

This commit is contained in:
behzad nouri 2022-12-04 15:20:51 +00:00 committed by GitHub
parent fcde8c8366
commit 718f433206
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 37 additions and 11 deletions

View File

@ -1511,11 +1511,17 @@ impl ClusterInfo {
}
fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let self_id = self.id();
let mut push_messages = {
let (mut push_messages, num_entries, num_nodes) = {
let _st = ScopedTimer::from(&self.stats.new_push_requests);
self.gossip
.new_push_messages(self.drain_push_queue(), timestamp())
};
self.stats
.push_fanout_num_entries
.add_relaxed(num_entries as u64);
self.stats
.push_fanout_num_nodes
.add_relaxed(num_nodes as u64);
if self.require_stake_for_gossip(stakes) {
push_messages.retain(|_, data| {
retain_staked(data, stakes);
@ -3723,7 +3729,7 @@ RPC Enabled Nodes: 1"#;
&SocketAddrSpace::Unspecified,
);
//check that all types of gossip messages are signed correctly
let push_messages = cluster_info
let (push_messages, _, _) = cluster_info
.gossip
.new_push_messages(cluster_info.drain_push_queue(), timestamp());
// there should be some pushes ready

View File

@ -166,6 +166,8 @@ pub struct GossipStats {
pub(crate) pull_requests_count: Counter,
pub(crate) purge: Counter,
pub(crate) purge_count: Counter,
pub(crate) push_fanout_num_entries: Counter,
pub(crate) push_fanout_num_nodes: Counter,
pub(crate) push_message_count: Counter,
pub(crate) push_message_pushes: Counter,
pub(crate) push_message_value_count: Counter,
@ -443,6 +445,16 @@ pub(crate) fn submit_gossip_stats(
i64
),
("push_message_count", stats.push_message_count.clear(), i64),
(
"push_fanout_num_entries",
stats.push_fanout_num_entries.clear(),
i64
),
(
"push_fanout_num_nodes",
stats.push_fanout_num_nodes.clear(),
i64
),
(
"push_message_pushes",
stats.push_message_pushes.clear(),

View File

@ -86,7 +86,11 @@ impl CrdsGossip {
&self,
pending_push_messages: Vec<CrdsValue>,
now: u64,
) -> HashMap<Pubkey, Vec<CrdsValue>> {
) -> (
HashMap<Pubkey, Vec<CrdsValue>>,
usize, // number of values
usize, // number of push messages
) {
{
let mut crds = self.crds.write().unwrap();
for entry in pending_push_messages {

View File

@ -269,12 +269,16 @@ impl CrdsGossipPush {
&self,
crds: &RwLock<Crds>,
now: u64,
) -> HashMap<Pubkey, Vec<CrdsValue>> {
) -> (
HashMap<Pubkey, Vec<CrdsValue>>,
usize, // number of values
usize, // number of push messages
) {
let active_set = self.active_set.read().unwrap();
let active_set_len = active_set.len();
let push_fanout = self.push_fanout.min(active_set_len);
if push_fanout == 0 {
return HashMap::default();
return (HashMap::default(), 0, 0);
}
let mut num_pushes = 0;
let mut num_values = 0;
@ -318,7 +322,7 @@ impl CrdsGossipPush {
for target_pubkey in push_messages.keys().copied() {
last_pushed_to.put(target_pubkey, now);
}
push_messages
(push_messages, num_values, num_pushes)
}
/// Add the `from` to the peer's filter of nodes.
@ -997,7 +1001,7 @@ mod tests {
[Ok(origin)]
);
assert_eq!(push.active_set.read().unwrap().len(), 1);
assert_eq!(push.new_push_messages(&crds, 0), expected);
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
}
#[test]
fn test_personalized_push_messages() {
@ -1051,7 +1055,7 @@ mod tests {
.into_iter()
.collect();
assert_eq!(push.active_set.read().unwrap().len(), 3);
assert_eq!(push.new_push_messages(&crds, now), expected);
assert_eq!(push.new_push_messages(&crds, now).0, expected);
}
#[test]
fn test_process_prune() {
@ -1096,7 +1100,7 @@ mod tests {
&peer.label().pubkey(),
&[new_msg.label().pubkey()],
);
assert_eq!(push.new_push_messages(&crds, 0), expected);
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
}
#[test]
fn test_purge_old_pending_push_messages() {
@ -1131,7 +1135,7 @@ mod tests {
push.process_push_message(&crds, &Pubkey::default(), vec![new_msg], 1),
[Ok(origin)],
);
assert_eq!(push.new_push_messages(&crds, 0), expected);
assert_eq!(push.new_push_messages(&crds, 0).0, expected);
}
#[test]

View File

@ -347,7 +347,7 @@ fn network_run_push(
Duration::from_millis(node.gossip.pull.crds_timeout),
);
node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts);
(node_pubkey, node.gossip.new_push_messages(vec![], now))
(node_pubkey, node.gossip.new_push_messages(vec![], now).0)
})
.collect();
let transfered: Vec<_> = requests