From 5be138826b78577b5b9448e185f6f2f220994b61 Mon Sep 17 00:00:00 2001 From: sakridge Date: Sat, 7 May 2022 16:45:03 +0200 Subject: [PATCH] Add sender stake to quic packets (#25054) --- streamer/src/quic.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 9afceeb35..91f3a359e 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -154,6 +154,7 @@ fn handle_chunk( remote_addr: &SocketAddr, packet_sender: &Sender, stats: Arc, + stake: u64, ) -> bool { match chunk { Ok(maybe_chunk) => { @@ -178,6 +179,7 @@ fn handle_chunk( let mut batch = PacketBatch::with_capacity(1); let mut packet = Packet::default(); packet.meta.set_addr(remote_addr); + packet.meta.sender_stake = stake; batch.packets.push(packet); *maybe_batch = Some(batch); stats @@ -433,6 +435,7 @@ fn handle_connection( connection_table: Arc>, stream_exit: Arc, stats: Arc, + stake: u64, ) { tokio::spawn(async move { debug!( @@ -455,6 +458,7 @@ fn handle_connection( &remote_addr, &packet_sender, stats.clone(), + stake, ) { last_update.store(timing::timestamp(), Ordering::Relaxed); break; @@ -535,21 +539,26 @@ pub fn spawn_server( let remote_addr = connection.remote_address(); - let mut connection_table_l = - if staked_nodes.read().unwrap().contains_key(&remote_addr.ip()) { + let (mut connection_table_l, stake) = { + let staked_nodes = staked_nodes.read().unwrap(); + if let Some(stake) = staked_nodes.get(&remote_addr.ip()) { + let stake = *stake; + drop(staked_nodes); let mut connection_table_l = staked_connection_table.lock().unwrap(); let num_pruned = connection_table_l.prune_oldest(max_staked_connections); stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); - connection_table_l + (connection_table_l, stake) } else { + drop(staked_nodes); let mut connection_table_l = connection_table.lock().unwrap(); let num_pruned = connection_table_l.prune_oldest(max_unstaked_connections); stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); - connection_table_l - }; + (connection_table_l, 0) + } + }; if let Some((last_update, stream_exit)) = connection_table_l .try_add_connection( @@ -570,6 +579,7 @@ pub fn spawn_server( connection_table1, stream_exit, stats, + stake, ); } else { stats.connection_add_failed.fetch_add(1, Ordering::Relaxed);