stats for staked/unstaked repair requests (#28215)
This commit is contained in:
parent
ac983b725f
commit
e3e888c0e0
|
@ -46,7 +46,7 @@ use {
|
|||
streamer::{PacketBatchReceiver, PacketBatchSender},
|
||||
},
|
||||
std::{
|
||||
collections::HashSet,
|
||||
collections::{HashMap, HashSet},
|
||||
net::{SocketAddr, UdpSocket},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
|
@ -157,7 +157,10 @@ struct ServeRepairStats {
|
|||
dropped_requests_load_shed: usize,
|
||||
total_dropped_response_packets: usize,
|
||||
total_response_packets: usize,
|
||||
total_response_bytes: usize,
|
||||
total_response_bytes_staked: usize,
|
||||
total_response_bytes_unstaked: usize,
|
||||
handle_requests_staked: usize,
|
||||
handle_requests_unstaked: usize,
|
||||
processed: usize,
|
||||
self_repair: usize,
|
||||
window_index: usize,
|
||||
|
@ -475,6 +478,7 @@ impl ServeRepair {
|
|||
stats.total_requests += total_requests;
|
||||
|
||||
let root_bank = self.bank_forks.read().unwrap().root_bank();
|
||||
let epoch_staked_nodes = root_bank.epoch_staked_nodes(root_bank.epoch());
|
||||
for reqs in reqs_v {
|
||||
self.handle_packets(
|
||||
ping_cache,
|
||||
|
@ -482,9 +486,9 @@ impl ServeRepair {
|
|||
blockstore,
|
||||
reqs,
|
||||
response_sender,
|
||||
&root_bank,
|
||||
stats,
|
||||
data_budget,
|
||||
&epoch_staked_nodes,
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
|
@ -519,8 +523,24 @@ impl ServeRepair {
|
|||
stats.total_dropped_response_packets,
|
||||
i64
|
||||
),
|
||||
("handle_requests_staked", stats.handle_requests_staked, i64),
|
||||
(
|
||||
"handle_requests_unstaked",
|
||||
stats.handle_requests_unstaked,
|
||||
i64
|
||||
),
|
||||
("processed", stats.processed, i64),
|
||||
("total_response_packets", stats.total_response_packets, i64),
|
||||
("total_response_bytes", stats.total_response_bytes, i64),
|
||||
(
|
||||
"total_response_bytes_staked",
|
||||
stats.total_response_bytes_staked,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"total_response_bytes_unstaked",
|
||||
stats.total_response_bytes_unstaked,
|
||||
i64
|
||||
),
|
||||
("self_repair", stats.self_repair, i64),
|
||||
("window_index", stats.window_index, i64),
|
||||
(
|
||||
|
@ -670,9 +690,9 @@ impl ServeRepair {
|
|||
blockstore: &Blockstore,
|
||||
packet_batch: PacketBatch,
|
||||
response_sender: &PacketBatchSender,
|
||||
_root_bank: &Bank,
|
||||
stats: &mut ServeRepairStats,
|
||||
data_budget: &DataBudget,
|
||||
epoch_staked_nodes: &Option<Arc<HashMap<Pubkey, u64>>>,
|
||||
) {
|
||||
let identity_keypair = self.cluster_info.keypair().clone();
|
||||
let my_id = identity_keypair.pubkey();
|
||||
|
@ -687,6 +707,15 @@ impl ServeRepair {
|
|||
}
|
||||
};
|
||||
|
||||
let staked = epoch_staked_nodes
|
||||
.as_ref()
|
||||
.map(|nodes| nodes.contains_key(request.sender()))
|
||||
.unwrap_or_default();
|
||||
match staked {
|
||||
true => stats.handle_requests_staked += 1,
|
||||
false => stats.handle_requests_unstaked += 1,
|
||||
}
|
||||
|
||||
if request.sender() == &my_id {
|
||||
stats.self_repair += 1;
|
||||
continue;
|
||||
|
@ -710,8 +739,11 @@ impl ServeRepair {
|
|||
let num_response_packets = rsp.len();
|
||||
let num_response_bytes = rsp.iter().map(|p| p.meta.size).sum();
|
||||
if data_budget.take(num_response_bytes) && response_sender.send(rsp).is_ok() {
|
||||
stats.total_response_bytes += num_response_bytes;
|
||||
stats.total_response_packets += num_response_packets;
|
||||
match staked {
|
||||
true => stats.total_response_bytes_staked += num_response_bytes,
|
||||
false => stats.total_response_bytes_unstaked += num_response_bytes,
|
||||
}
|
||||
} else {
|
||||
stats.dropped_requests_outbound_bandwidth += packet_batch.len() - i;
|
||||
stats.total_dropped_response_packets += num_response_packets;
|
||||
|
|
Loading…
Reference in New Issue