enable repair ping/pong cache (#28408)

This commit is contained in:
Jeff Biseda 2022-10-19 14:55:55 -07:00 committed by GitHub
parent 09ede8b85c
commit 0df4be06a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 71 additions and 1 deletions

View File

@ -166,6 +166,8 @@ struct ServeRepairStats {
orphan: usize,
pong: usize,
ancestor_hashes: usize,
ping_cache_check_failed: usize,
pings_sent: usize,
err_time_skew: usize,
err_malformed: usize,
err_sig_verify: usize,
@ -531,6 +533,12 @@ impl ServeRepair {
i64
),
("pong", stats.pong, i64),
(
"ping_cache_check_failed",
stats.ping_cache_check_failed,
i64
),
("pings_sent", stats.pings_sent, i64),
("err_time_skew", stats.err_time_skew, i64),
("err_malformed", stats.err_malformed, i64),
("err_sig_verify", stats.err_sig_verify, i64),
@ -659,6 +667,43 @@ impl ServeRepair {
true
}
fn check_ping_cache(
ping_cache: &mut PingCache,
request: &RepairProtocol,
from_addr: &SocketAddr,
identity_keypair: &Keypair,
) -> (bool, Option<Packet>) {
let mut rng = rand::thread_rng();
let mut pingf = move || Ping::new_rand(&mut rng, identity_keypair).ok();
let (check, ping) =
ping_cache.check(Instant::now(), (*request.sender(), *from_addr), &mut pingf);
let ping_pkt = if let Some(ping) = ping {
match request {
RepairProtocol::LegacyWindowIndex(_, _, _)
| RepairProtocol::LegacyHighestWindowIndex(_, _, _)
| RepairProtocol::LegacyOrphan(_, _)
| RepairProtocol::LegacyWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::LegacyHighestWindowIndexWithNonce(_, _, _, _)
| RepairProtocol::LegacyOrphanWithNonce(_, _, _)
| RepairProtocol::WindowIndex { .. }
| RepairProtocol::HighestWindowIndex { .. }
| RepairProtocol::Orphan { .. } => {
let ping = RepairResponse::Ping(ping);
Packet::from_data(Some(from_addr), ping).ok()
}
RepairProtocol::LegacyAncestorHashes(_, _, _)
| RepairProtocol::AncestorHashes { .. } => {
let ping = AncestorHashesResponse::Ping(ping);
Packet::from_data(Some(from_addr), ping).ok()
}
RepairProtocol::Pong(_) => None,
}
} else {
None
};
(check, ping_pkt)
}
fn handle_packets(
&self,
ping_cache: &mut PingCache,
@ -672,6 +717,8 @@ impl ServeRepair {
) {
let identity_keypair = self.cluster_info.keypair().clone();
let my_id = identity_keypair.pubkey();
let socket_addr_space = *self.cluster_info.socket_addr_space();
let mut pending_pings = Vec::default();
// iter over the packets
for (i, packet) in packet_batch.iter().enumerate() {
@ -683,6 +730,12 @@ impl ServeRepair {
}
};
let from_addr = packet.meta.socket_addr();
if !ContactInfo::is_valid_address(&from_addr, &socket_addr_space) {
stats.err_malformed += 1;
continue;
}
let staked = epoch_staked_nodes
.as_ref()
.map(|nodes| nodes.contains_key(request.sender()))
@ -704,7 +757,18 @@ impl ServeRepair {
stats.unsigned_requests += 1;
}
let from_addr = packet.meta.socket_addr();
if !matches!(&request, RepairProtocol::Pong(_)) {
let (check, ping_pkt) =
Self::check_ping_cache(ping_cache, &request, &from_addr, &identity_keypair);
if let Some(ping_pkt) = ping_pkt {
pending_pings.push(ping_pkt);
}
if !check {
// collect stats for ping/pong verification
stats.ping_cache_check_failed += 1;
}
}
stats.processed += 1;
let rsp = match Self::handle_repair(
recycler, &from_addr, blockstore, request, stats, ping_cache,
@ -726,6 +790,12 @@ impl ServeRepair {
break;
}
}
if !pending_pings.is_empty() {
stats.pings_sent += pending_pings.len();
let batch = PacketBatch::new(pending_pings);
let _ignore = response_sender.send(batch);
}
}
pub fn ancestor_repair_request_bytes(