diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 0e474d3cf..74dc0c43e 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -315,6 +315,11 @@ pub(crate) fn submit_gossip_stats( stats.process_pull_response_timeout.clear(), i64 ), + ( + "num_redundant_pull_responses", + crds_stats.num_redundant_pull_responses, + i64 + ), ( "push_response_count", stats.push_response_count.clear(), diff --git a/gossip/src/crds.rs b/gossip/src/crds.rs index 719bc1384..dbb6c43c0 100644 --- a/gossip/src/crds.rs +++ b/gossip/src/crds.rs @@ -115,6 +115,9 @@ pub(crate) struct CrdsDataStats { pub(crate) struct CrdsStats { pub(crate) pull: CrdsDataStats, pub(crate) push: CrdsDataStats, + /// number of times a message was first received via a PullResponse + /// and that message was later received via a PushMessage + pub(crate) num_redundant_pull_responses: u64, } /// This structure stores some local metadata associated with the CrdsValue @@ -127,8 +130,10 @@ pub struct VersionedCrdsValue { pub(crate) local_timestamp: u64, /// value hash pub(crate) value_hash: Hash, - /// Number of times duplicates of this value are recevied from gossip push. - num_push_dups: u8, + /// None -> value upserted by GossipRoute::{LocalMessage,PullRequest} + /// Some(0) -> value upserted by GossipRoute::PullResponse + /// Some(k) if k > 0 -> value upserted by GossipRoute::PushMessage w/ k - 1 push duplicates + num_push_recv: Option, } #[derive(Clone, Copy, Default)] @@ -147,14 +152,21 @@ impl Cursor { } impl VersionedCrdsValue { - fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64) -> Self { + fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64, route: GossipRoute) -> Self { let value_hash = hash(&serialize(&value).unwrap()); + let num_push_recv = match route { + GossipRoute::LocalMessage => None, + GossipRoute::PullRequest => None, + GossipRoute::PullResponse => Some(0), + GossipRoute::PushMessage(_) => Some(1), + }; + VersionedCrdsValue { ordinal: cursor.ordinal(), value, local_timestamp, value_hash, - num_push_dups: 0u8, + num_push_recv, } } } @@ -222,7 +234,7 @@ impl Crds { ) -> Result<(), CrdsError> { let label = value.label(); let pubkey = value.pubkey(); - let value = VersionedCrdsValue::new(value, self.cursor, now); + let value = VersionedCrdsValue::new(value, self.cursor, now, route); match self.table.entry(label) { Entry::Vacant(entry) => { self.stats.lock().unwrap().record_insert(&value, route); @@ -303,8 +315,12 @@ impl Crds { Err(CrdsError::InsertFailed) } else if matches!(route, GossipRoute::PushMessage(_)) { let entry = entry.get_mut(); - entry.num_push_dups = entry.num_push_dups.saturating_add(1); - Err(CrdsError::DuplicatePush(entry.num_push_dups)) + if entry.num_push_recv == Some(0) { + self.stats.lock().unwrap().num_redundant_pull_responses += 1; + } + let num_push_dups = entry.num_push_recv.unwrap_or_default(); + entry.num_push_recv = Some(num_push_dups.saturating_add(1)); + Err(CrdsError::DuplicatePush(num_push_dups)) } else { Err(CrdsError::InsertFailed) } @@ -1450,8 +1466,9 @@ mod tests { #[allow(clippy::neg_cmp_op_on_partial_ord)] fn test_equal() { let val = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::default())); - let v1 = VersionedCrdsValue::new(val.clone(), Cursor::default(), 1); - let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1); + let v1 = + VersionedCrdsValue::new(val.clone(), Cursor::default(), 1, GossipRoute::LocalMessage); + let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1, GossipRoute::LocalMessage); assert_eq!(v1, v2); assert!(!(v1 != v2)); assert!(!overrides(&v1.value, &v2)); @@ -1467,6 +1484,7 @@ mod tests { ))), Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); let v2 = VersionedCrdsValue::new( { @@ -1476,6 +1494,7 @@ mod tests { }, Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); assert_eq!(v1.value.label(), v2.value.label()); @@ -1501,6 +1520,7 @@ mod tests { ))), Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); let v2 = VersionedCrdsValue::new( CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost( @@ -1509,6 +1529,7 @@ mod tests { ))), Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); assert_eq!(v1.value.label(), v2.value.label()); assert!(overrides(&v1.value, &v2)); @@ -1527,6 +1548,7 @@ mod tests { ))), Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); let v2 = VersionedCrdsValue::new( CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost( @@ -1535,6 +1557,7 @@ mod tests { ))), Cursor::default(), 1, // local_timestamp + GossipRoute::LocalMessage, ); assert_ne!(v1, v2); assert!(!(v1 == v2));