Different staked vs unstaked chunks_received (#27033)
* Different staked vs unstaked chunks_received * Suppress a clippy warning
This commit is contained in:
parent
52d8a20a84
commit
ddd660e2d3
|
@ -275,6 +275,7 @@ fn handle_and_cache_new_connection(
|
||||||
timing::timestamp(),
|
timing::timestamp(),
|
||||||
params.max_connections_per_peer,
|
params.max_connections_per_peer,
|
||||||
) {
|
) {
|
||||||
|
let peer_type = connection_table_l.peer_type;
|
||||||
drop(connection_table_l);
|
drop(connection_table_l);
|
||||||
tokio::spawn(handle_connection(
|
tokio::spawn(handle_connection(
|
||||||
uni_streams,
|
uni_streams,
|
||||||
|
@ -286,6 +287,7 @@ fn handle_and_cache_new_connection(
|
||||||
stream_exit,
|
stream_exit,
|
||||||
params.stats.clone(),
|
params.stats.clone(),
|
||||||
params.stake,
|
params.stake,
|
||||||
|
peer_type,
|
||||||
));
|
));
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
|
@ -478,6 +480,7 @@ async fn setup_connection(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
async fn handle_connection(
|
async fn handle_connection(
|
||||||
mut uni_streams: IncomingUniStreams,
|
mut uni_streams: IncomingUniStreams,
|
||||||
packet_sender: Sender<PacketBatch>,
|
packet_sender: Sender<PacketBatch>,
|
||||||
|
@ -488,6 +491,7 @@ async fn handle_connection(
|
||||||
stream_exit: Arc<AtomicBool>,
|
stream_exit: Arc<AtomicBool>,
|
||||||
stats: Arc<StreamStats>,
|
stats: Arc<StreamStats>,
|
||||||
stake: u64,
|
stake: u64,
|
||||||
|
peer_type: ConnectionPeerType,
|
||||||
) {
|
) {
|
||||||
debug!(
|
debug!(
|
||||||
"quic new connection {} streams: {} connections: {}",
|
"quic new connection {} streams: {} connections: {}",
|
||||||
|
@ -527,6 +531,7 @@ async fn handle_connection(
|
||||||
&packet_sender,
|
&packet_sender,
|
||||||
stats.clone(),
|
stats.clone(),
|
||||||
stake,
|
stake,
|
||||||
|
peer_type,
|
||||||
) {
|
) {
|
||||||
last_update.store(timing::timestamp(), Ordering::Relaxed);
|
last_update.store(timing::timestamp(), Ordering::Relaxed);
|
||||||
break;
|
break;
|
||||||
|
@ -574,6 +579,7 @@ fn handle_chunk(
|
||||||
packet_sender: &Sender<PacketBatch>,
|
packet_sender: &Sender<PacketBatch>,
|
||||||
stats: Arc<StreamStats>,
|
stats: Arc<StreamStats>,
|
||||||
stake: u64,
|
stake: u64,
|
||||||
|
peer_type: ConnectionPeerType,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
match chunk {
|
match chunk {
|
||||||
Ok(maybe_chunk) => {
|
Ok(maybe_chunk) => {
|
||||||
|
@ -620,6 +626,18 @@ fn handle_chunk(
|
||||||
.copy_from_slice(&chunk.bytes);
|
.copy_from_slice(&chunk.bytes);
|
||||||
batch[0].meta.size = std::cmp::max(batch[0].meta.size, end_of_chunk);
|
batch[0].meta.size = std::cmp::max(batch[0].meta.size, end_of_chunk);
|
||||||
stats.total_chunks_received.fetch_add(1, Ordering::Relaxed);
|
stats.total_chunks_received.fetch_add(1, Ordering::Relaxed);
|
||||||
|
match peer_type {
|
||||||
|
ConnectionPeerType::Staked => {
|
||||||
|
stats
|
||||||
|
.total_staked_chunks_received
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
ConnectionPeerType::Unstaked => {
|
||||||
|
stats
|
||||||
|
.total_unstaked_chunks_received
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
trace!("chunk is none");
|
trace!("chunk is none");
|
||||||
|
|
|
@ -123,6 +123,8 @@ pub struct StreamStats {
|
||||||
pub(crate) total_invalid_chunk_size: AtomicUsize,
|
pub(crate) total_invalid_chunk_size: AtomicUsize,
|
||||||
pub(crate) total_packets_allocated: AtomicUsize,
|
pub(crate) total_packets_allocated: AtomicUsize,
|
||||||
pub(crate) total_chunks_received: AtomicUsize,
|
pub(crate) total_chunks_received: AtomicUsize,
|
||||||
|
pub(crate) total_staked_chunks_received: AtomicUsize,
|
||||||
|
pub(crate) total_unstaked_chunks_received: AtomicUsize,
|
||||||
pub(crate) total_packet_batch_send_err: AtomicUsize,
|
pub(crate) total_packet_batch_send_err: AtomicUsize,
|
||||||
pub(crate) total_packet_batches_sent: AtomicUsize,
|
pub(crate) total_packet_batches_sent: AtomicUsize,
|
||||||
pub(crate) total_packet_batches_none: AtomicUsize,
|
pub(crate) total_packet_batches_none: AtomicUsize,
|
||||||
|
@ -252,6 +254,17 @@ impl StreamStats {
|
||||||
self.total_chunks_received.swap(0, Ordering::Relaxed),
|
self.total_chunks_received.swap(0, Ordering::Relaxed),
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"staked_chunks_received",
|
||||||
|
self.total_staked_chunks_received.swap(0, Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"unstaked_chunks_received",
|
||||||
|
self.total_unstaked_chunks_received
|
||||||
|
.swap(0, Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
(
|
(
|
||||||
"packet_batch_send_error",
|
"packet_batch_send_error",
|
||||||
self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
|
self.total_packet_batch_send_err.swap(0, Ordering::Relaxed),
|
||||||
|
|
Loading…
Reference in New Issue