Add in metrics for detecting Redundant Pulls (#199)
This commit is contained in:
parent
6bcb77dcfa
commit
d49ceb0e3f
|
@ -315,6 +315,11 @@ pub(crate) fn submit_gossip_stats(
|
||||||
stats.process_pull_response_timeout.clear(),
|
stats.process_pull_response_timeout.clear(),
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"num_redundant_pull_responses",
|
||||||
|
crds_stats.num_redundant_pull_responses,
|
||||||
|
i64
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"push_response_count",
|
"push_response_count",
|
||||||
stats.push_response_count.clear(),
|
stats.push_response_count.clear(),
|
||||||
|
|
|
@ -115,6 +115,9 @@ pub(crate) struct CrdsDataStats {
|
||||||
pub(crate) struct CrdsStats {
|
pub(crate) struct CrdsStats {
|
||||||
pub(crate) pull: CrdsDataStats,
|
pub(crate) pull: CrdsDataStats,
|
||||||
pub(crate) push: CrdsDataStats,
|
pub(crate) push: CrdsDataStats,
|
||||||
|
/// number of times a message was first received via a PullResponse
|
||||||
|
/// and that message was later received via a PushMessage
|
||||||
|
pub(crate) num_redundant_pull_responses: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This structure stores some local metadata associated with the CrdsValue
|
/// This structure stores some local metadata associated with the CrdsValue
|
||||||
|
@ -127,8 +130,10 @@ pub struct VersionedCrdsValue {
|
||||||
pub(crate) local_timestamp: u64,
|
pub(crate) local_timestamp: u64,
|
||||||
/// value hash
|
/// value hash
|
||||||
pub(crate) value_hash: Hash,
|
pub(crate) value_hash: Hash,
|
||||||
/// Number of times duplicates of this value are recevied from gossip push.
|
/// None -> value upserted by GossipRoute::{LocalMessage,PullRequest}
|
||||||
num_push_dups: u8,
|
/// Some(0) -> value upserted by GossipRoute::PullResponse
|
||||||
|
/// Some(k) if k > 0 -> value upserted by GossipRoute::PushMessage w/ k - 1 push duplicates
|
||||||
|
num_push_recv: Option<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, Default)]
|
#[derive(Clone, Copy, Default)]
|
||||||
|
@ -147,14 +152,21 @@ impl Cursor {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl VersionedCrdsValue {
|
impl VersionedCrdsValue {
|
||||||
fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64) -> Self {
|
fn new(value: CrdsValue, cursor: Cursor, local_timestamp: u64, route: GossipRoute) -> Self {
|
||||||
let value_hash = hash(&serialize(&value).unwrap());
|
let value_hash = hash(&serialize(&value).unwrap());
|
||||||
|
let num_push_recv = match route {
|
||||||
|
GossipRoute::LocalMessage => None,
|
||||||
|
GossipRoute::PullRequest => None,
|
||||||
|
GossipRoute::PullResponse => Some(0),
|
||||||
|
GossipRoute::PushMessage(_) => Some(1),
|
||||||
|
};
|
||||||
|
|
||||||
VersionedCrdsValue {
|
VersionedCrdsValue {
|
||||||
ordinal: cursor.ordinal(),
|
ordinal: cursor.ordinal(),
|
||||||
value,
|
value,
|
||||||
local_timestamp,
|
local_timestamp,
|
||||||
value_hash,
|
value_hash,
|
||||||
num_push_dups: 0u8,
|
num_push_recv,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -222,7 +234,7 @@ impl Crds {
|
||||||
) -> Result<(), CrdsError> {
|
) -> Result<(), CrdsError> {
|
||||||
let label = value.label();
|
let label = value.label();
|
||||||
let pubkey = value.pubkey();
|
let pubkey = value.pubkey();
|
||||||
let value = VersionedCrdsValue::new(value, self.cursor, now);
|
let value = VersionedCrdsValue::new(value, self.cursor, now, route);
|
||||||
match self.table.entry(label) {
|
match self.table.entry(label) {
|
||||||
Entry::Vacant(entry) => {
|
Entry::Vacant(entry) => {
|
||||||
self.stats.lock().unwrap().record_insert(&value, route);
|
self.stats.lock().unwrap().record_insert(&value, route);
|
||||||
|
@ -303,8 +315,12 @@ impl Crds {
|
||||||
Err(CrdsError::InsertFailed)
|
Err(CrdsError::InsertFailed)
|
||||||
} else if matches!(route, GossipRoute::PushMessage(_)) {
|
} else if matches!(route, GossipRoute::PushMessage(_)) {
|
||||||
let entry = entry.get_mut();
|
let entry = entry.get_mut();
|
||||||
entry.num_push_dups = entry.num_push_dups.saturating_add(1);
|
if entry.num_push_recv == Some(0) {
|
||||||
Err(CrdsError::DuplicatePush(entry.num_push_dups))
|
self.stats.lock().unwrap().num_redundant_pull_responses += 1;
|
||||||
|
}
|
||||||
|
let num_push_dups = entry.num_push_recv.unwrap_or_default();
|
||||||
|
entry.num_push_recv = Some(num_push_dups.saturating_add(1));
|
||||||
|
Err(CrdsError::DuplicatePush(num_push_dups))
|
||||||
} else {
|
} else {
|
||||||
Err(CrdsError::InsertFailed)
|
Err(CrdsError::InsertFailed)
|
||||||
}
|
}
|
||||||
|
@ -1450,8 +1466,9 @@ mod tests {
|
||||||
#[allow(clippy::neg_cmp_op_on_partial_ord)]
|
#[allow(clippy::neg_cmp_op_on_partial_ord)]
|
||||||
fn test_equal() {
|
fn test_equal() {
|
||||||
let val = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::default()));
|
let val = CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::default()));
|
||||||
let v1 = VersionedCrdsValue::new(val.clone(), Cursor::default(), 1);
|
let v1 =
|
||||||
let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1);
|
VersionedCrdsValue::new(val.clone(), Cursor::default(), 1, GossipRoute::LocalMessage);
|
||||||
|
let v2 = VersionedCrdsValue::new(val, Cursor::default(), 1, GossipRoute::LocalMessage);
|
||||||
assert_eq!(v1, v2);
|
assert_eq!(v1, v2);
|
||||||
assert!(!(v1 != v2));
|
assert!(!(v1 != v2));
|
||||||
assert!(!overrides(&v1.value, &v2));
|
assert!(!overrides(&v1.value, &v2));
|
||||||
|
@ -1467,6 +1484,7 @@ mod tests {
|
||||||
))),
|
))),
|
||||||
Cursor::default(),
|
Cursor::default(),
|
||||||
1, // local_timestamp
|
1, // local_timestamp
|
||||||
|
GossipRoute::LocalMessage,
|
||||||
);
|
);
|
||||||
let v2 = VersionedCrdsValue::new(
|
let v2 = VersionedCrdsValue::new(
|
||||||
{
|
{
|
||||||
|
@ -1476,6 +1494,7 @@ mod tests {
|
||||||
},
|
},
|
||||||
Cursor::default(),
|
Cursor::default(),
|
||||||
1, // local_timestamp
|
1, // local_timestamp
|
||||||
|
GossipRoute::LocalMessage,
|
||||||
);
|
);
|
||||||
|
|
||||||
assert_eq!(v1.value.label(), v2.value.label());
|
assert_eq!(v1.value.label(), v2.value.label());
|
||||||
|
@ -1501,6 +1520,7 @@ mod tests {
|
||||||
))),
|
))),
|
||||||
Cursor::default(),
|
Cursor::default(),
|
||||||
1, // local_timestamp
|
1, // local_timestamp
|
||||||
|
GossipRoute::LocalMessage,
|
||||||
);
|
);
|
||||||
let v2 = VersionedCrdsValue::new(
|
let v2 = VersionedCrdsValue::new(
|
||||||
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost(
|
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost(
|
||||||
|
@ -1509,6 +1529,7 @@ mod tests {
|
||||||
))),
|
))),
|
||||||
Cursor::default(),
|
Cursor::default(),
|
||||||
1, // local_timestamp
|
1, // local_timestamp
|
||||||
|
GossipRoute::LocalMessage,
|
||||||
);
|
);
|
||||||
assert_eq!(v1.value.label(), v2.value.label());
|
assert_eq!(v1.value.label(), v2.value.label());
|
||||||
assert!(overrides(&v1.value, &v2));
|
assert!(overrides(&v1.value, &v2));
|
||||||
|
@ -1527,6 +1548,7 @@ mod tests {
|
||||||
))),
|
))),
|
||||||
Cursor::default(),
|
Cursor::default(),
|
||||||
1, // local_timestamp
|
1, // local_timestamp
|
||||||
|
GossipRoute::LocalMessage,
|
||||||
);
|
);
|
||||||
let v2 = VersionedCrdsValue::new(
|
let v2 = VersionedCrdsValue::new(
|
||||||
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost(
|
CrdsValue::new_unsigned(CrdsData::LegacyContactInfo(ContactInfo::new_localhost(
|
||||||
|
@ -1535,6 +1557,7 @@ mod tests {
|
||||||
))),
|
))),
|
||||||
Cursor::default(),
|
Cursor::default(),
|
||||||
1, // local_timestamp
|
1, // local_timestamp
|
||||||
|
GossipRoute::LocalMessage,
|
||||||
);
|
);
|
||||||
assert_ne!(v1, v2);
|
assert_ne!(v1, v2);
|
||||||
assert!(!(v1 == v2));
|
assert!(!(v1 == v2));
|
||||||
|
|
Loading…
Reference in New Issue