From c7ca3b85c23d1a4969307d4fc4d2d07545d4986b Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Wed, 19 Jun 2024 11:18:00 +0200 Subject: [PATCH] creating more scid if needed by quic --- quiche/src/quiche_utils.rs | 79 ++++++++++++++++++++++++++++++++ server/src/quiche_server_loop.rs | 74 ++++++++++++++++++++++-------- 2 files changed, 134 insertions(+), 19 deletions(-) diff --git a/quiche/src/quiche_utils.rs b/quiche/src/quiche_utils.rs index 270395c..93c426a 100644 --- a/quiche/src/quiche_utils.rs +++ b/quiche/src/quiche_utils.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use ring::rand::SecureRandom; + pub fn validate_token<'a>( src: &std::net::SocketAddr, token: &'a [u8], @@ -83,6 +85,83 @@ pub fn get_next_unidi( } } +pub fn handle_path_events(conn: &mut quiche::Connection) { + while let Some(qe) = conn.path_event_next() { + match qe { + quiche::PathEvent::New(local_addr, peer_addr) => { + log::info!( + "{} Seen new path ({}, {})", + conn.trace_id(), + local_addr, + peer_addr + ); + + // Directly probe the new path. + conn.probe_path(local_addr, peer_addr) + .expect("cannot probe"); + } + + quiche::PathEvent::Validated(local_addr, peer_addr) => { + log::info!( + "{} Path ({}, {}) is now validated", + conn.trace_id(), + local_addr, + peer_addr + ); + } + + quiche::PathEvent::FailedValidation(local_addr, peer_addr) => { + log::info!( + "{} Path ({}, {}) failed validation", + conn.trace_id(), + local_addr, + peer_addr + ); + } + + quiche::PathEvent::Closed(local_addr, peer_addr) => { + log::info!( + "{} Path ({}, {}) is now closed and unusable", + conn.trace_id(), + local_addr, + peer_addr + ); + } + + quiche::PathEvent::ReusedSourceConnectionId(cid_seq, old, new) => { + log::info!( + "{} Peer reused cid seq {} (initially {:?}) on {:?}", + conn.trace_id(), + cid_seq, + old, + new + ); + } + + quiche::PathEvent::PeerMigrated(local_addr, peer_addr) => { + log::info!( + "{} Connection migrated to ({}, {})", + conn.trace_id(), + local_addr, + peer_addr + ); + } + } + } +} + +pub fn generate_cid_and_reset_token( + rng: &T, +) -> (quiche::ConnectionId<'static>, u128) { + let mut scid = [0; quiche::MAX_CONN_ID_LEN]; + rng.fill(&mut scid).unwrap(); + let scid = scid.to_vec().into(); + let mut reset_token = [0; 16]; + rng.fill(&mut reset_token).unwrap(); + let reset_token = u128::from_be_bytes(reset_token); + (scid, reset_token) +} + pub struct PartialResponse { pub binary: Vec, pub written: usize, diff --git a/server/src/quiche_server_loop.rs b/server/src/quiche_server_loop.rs index c36d5d9..3cc622f 100644 --- a/server/src/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -27,7 +27,10 @@ use quic_geyser_common::{ use quic_geyser_quiche_utils::{ quiche_reciever::{recv_message, ReadStreams}, quiche_sender::{handle_writable, send_message}, - quiche_utils::{get_next_unidi, mint_token, validate_token, PartialResponses}, + quiche_utils::{ + generate_cid_and_reset_token, get_next_unidi, handle_path_events, mint_token, + validate_token, PartialResponses, + }, }; use crate::configure_server::configure_server; @@ -66,19 +69,15 @@ pub fn server_loop( let local_addr = socket.local_addr()?; let rng = SystemRandom::new(); let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap(); - let mut clients: HashMap< - quiche::ConnectionId<'static>, + let mut client_messsage_channel_by_id: HashMap< + u64, mio_channel::Sender<(quiche::RecvInfo, Vec)>, > = HashMap::new(); + let clients_by_id: Arc, u64>>> = + Arc::new(Mutex::new(HashMap::new())); let (write_sender, write_reciver) = std::sync::mpsc::channel::<(quiche::SendInfo, Vec)>(); - // poll.registry().register( - // &mut write_reciver, - // mio::Token(1), - // mio::Interest::READABLE, - // )?; - let enable_pacing = if quic_params.enable_pacing { set_txtime_sockopt(&socket).is_ok() } else { @@ -95,6 +94,7 @@ pub fn server_loop( dispatching_connections.clone(), compression_type, ); + let mut client_id_counter = 0; loop { poll.poll(&mut events, Some(Duration::from_millis(10)))?; @@ -125,7 +125,8 @@ pub fn server_loop( let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid); let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; let conn_id: ConnectionId<'static> = conn_id.to_vec().into(); - if !clients.contains_key(&hdr.dcid) && !clients.contains_key(&conn_id) { + let mut clients_lk = clients_by_id.lock().unwrap(); + if !clients_lk.contains_key(&hdr.dcid) && !clients_lk.contains_key(&conn_id) { if hdr.ty != quiche::Type::Initial { log::error!("Packet is not Initial"); continue 'read; @@ -211,10 +212,14 @@ pub fn server_loop( let (client_sender, client_reciver) = mio_channel::channel(); let (client_message_sx, client_message_rx) = mpsc::channel(); let messages_in_queue = Arc::new(AtomicUsize::new(0)); + let current_client_id = client_id_counter; + client_id_counter += 1; let filters = Arc::new(RwLock::new(Vec::new())); create_client_task( conn, + current_client_id, + clients_by_id.clone(), client_reciver, write_sender.clone(), client_message_rx, @@ -223,6 +228,7 @@ pub fn server_loop( stop_laggy_client, messages_in_queue.clone(), quic_params.incremental_priority, + rng.clone(), ); let mut lk = dispatching_connections.lock().unwrap(); lk.insert( @@ -233,12 +239,13 @@ pub fn server_loop( messages_in_queue, }, ); - clients.insert(scid, client_sender); + clients_lk.insert(scid, current_client_id); + client_messsage_channel_by_id.insert(current_client_id, client_sender); } else { // get the existing client - let client = match clients.get(&hdr.dcid) { - Some(v) => v, - None => clients + let client_id = match clients_lk.get(&hdr.dcid) { + Some(v) => *v, + None => *clients_lk .get(&conn_id) .expect("The client should exist in the map"), }; @@ -247,10 +254,18 @@ pub fn server_loop( to: socket.local_addr().unwrap(), from, }; - if client.send((recv_info, pkt_buf.to_vec())).is_err() { - // client is closed - clients.remove(&hdr.dcid); - clients.remove(&conn_id); + match client_messsage_channel_by_id.get_mut(&client_id) { + Some(channel) => { + if channel.send((recv_info, pkt_buf.to_vec())).is_err() { + // client is closed + clients_lk.remove(&hdr.dcid); + clients_lk.remove(&conn_id); + client_messsage_channel_by_id.remove(&client_id); + } + } + None => { + log::error!("channel with client id {client_id} not found"); + } } }; } @@ -274,6 +289,8 @@ pub fn server_loop( #[allow(clippy::too_many_arguments)] fn create_client_task( connection: quiche::Connection, + client_id: u64, + client_id_by_scid: Arc, u64>>>, mut receiver: mio_channel::Receiver<(quiche::RecvInfo, Vec)>, sender: mpsc::Sender<(quiche::SendInfo, Vec)>, message_channel: mpsc::Receiver<(Vec, u8)>, @@ -282,6 +299,7 @@ fn create_client_task( stop_laggy_client: bool, messages_in_queue: Arc, incremental_priority: bool, + rng: SystemRandom, ) { std::thread::spawn(move || { let mut partial_responses = PartialResponses::new(); @@ -483,9 +501,27 @@ fn create_client_task( } } - if instance.elapsed() > Duration::from_secs(2) { + if instance.elapsed() > Duration::from_secs(1) { instance = Instant::now(); connection.on_timeout(); + handle_path_events(&mut connection); + + // See whether source Connection IDs have been retired. + while let Some(retired_scid) = connection.retired_scid_next() { + log::info!("Retiring source CID {:?}", retired_scid); + client_id_by_scid.lock().unwrap().remove(&retired_scid); + } + + // Provides as many CIDs as possible. + while connection.scids_left() > 0 { + let (scid, reset_token) = generate_cid_and_reset_token(&rng); + + log::info!("providing new scid {scid:?}"); + if connection.new_scid(&scid, reset_token, false).is_err() { + break; + } + client_id_by_scid.lock().unwrap().insert(scid, client_id); + } } loop {