Add sender stake to quic packets (#25054)

This commit is contained in:
sakridge 2022-05-07 16:45:03 +02:00 committed by GitHub
parent d9deab4d2c
commit 5be138826b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 5 deletions

View File

@ -154,6 +154,7 @@ fn handle_chunk(
remote_addr: &SocketAddr,
packet_sender: &Sender<PacketBatch>,
stats: Arc<StreamStats>,
stake: u64,
) -> bool {
match chunk {
Ok(maybe_chunk) => {
@ -178,6 +179,7 @@ fn handle_chunk(
let mut batch = PacketBatch::with_capacity(1);
let mut packet = Packet::default();
packet.meta.set_addr(remote_addr);
packet.meta.sender_stake = stake;
batch.packets.push(packet);
*maybe_batch = Some(batch);
stats
@ -433,6 +435,7 @@ fn handle_connection(
connection_table: Arc<Mutex<ConnectionTable>>,
stream_exit: Arc<AtomicBool>,
stats: Arc<StreamStats>,
stake: u64,
) {
tokio::spawn(async move {
debug!(
@ -455,6 +458,7 @@ fn handle_connection(
&remote_addr,
&packet_sender,
stats.clone(),
stake,
) {
last_update.store(timing::timestamp(), Ordering::Relaxed);
break;
@ -535,21 +539,26 @@ pub fn spawn_server(
let remote_addr = connection.remote_address();
let mut connection_table_l =
if staked_nodes.read().unwrap().contains_key(&remote_addr.ip()) {
let (mut connection_table_l, stake) = {
let staked_nodes = staked_nodes.read().unwrap();
if let Some(stake) = staked_nodes.get(&remote_addr.ip()) {
let stake = *stake;
drop(staked_nodes);
let mut connection_table_l =
staked_connection_table.lock().unwrap();
let num_pruned =
connection_table_l.prune_oldest(max_staked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection_table_l
(connection_table_l, stake)
} else {
drop(staked_nodes);
let mut connection_table_l = connection_table.lock().unwrap();
let num_pruned =
connection_table_l.prune_oldest(max_unstaked_connections);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
connection_table_l
};
(connection_table_l, 0)
}
};
if let Some((last_update, stream_exit)) = connection_table_l
.try_add_connection(
@ -570,6 +579,7 @@ pub fn spawn_server(
connection_table1,
stream_exit,
stats,
stake,
);
} else {
stats.connection_add_failed.fetch_add(1, Ordering::Relaxed);