creating more SCIDs if needed by QUIC

This commit is contained in:
godmodegalactus 2024-06-19 11:18:00 +02:00
parent 70dfef9a2d
commit c7ca3b85c2
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
2 changed files with 134 additions and 19 deletions
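Why this change: a QUIC peer can only migrate between network paths if it holds spare source connection IDs (SCIDs) issued by the other side, each paired with a stateless reset token that lets it recognize a stateless reset if the issuer loses state for that CID. This commit makes the server mint new SCIDs whenever quiche reports spare capacity. A minimal sketch of the pattern, assuming a quiche::Connection named conn and ring's SystemRandom; the function name replenish_scids is hypothetical, the commit's own helpers follow in the diff:

    use ring::rand::{SecureRandom, SystemRandom};

    // Hypothetical helper sketching the SCID-replenish pattern this commit adds.
    fn replenish_scids(conn: &mut quiche::Connection, rng: &SystemRandom) {
        // Keep issuing SCIDs until quiche reports no more capacity.
        while conn.scids_left() > 0 {
            // Random connection ID of the maximum length quiche accepts.
            let mut cid = [0u8; quiche::MAX_CONN_ID_LEN];
            rng.fill(&mut cid).unwrap();
            let cid: quiche::ConnectionId<'static> = cid.to_vec().into();

            // Every SCID is advertised together with a 16-byte stateless reset token.
            let mut token = [0u8; 16];
            rng.fill(&mut token).unwrap();
            let token = u128::from_be_bytes(token);

            // retire_if_needed = false: do not force the peer to drop an older ID.
            if conn.new_scid(&cid, token, false).is_err() {
                break;
            }
        }
    }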

View File

@@ -1,5 +1,7 @@
 use std::collections::HashMap;
+use ring::rand::SecureRandom;

 pub fn validate_token<'a>(
     src: &std::net::SocketAddr,
     token: &'a [u8],
@@ -83,6 +85,83 @@ pub fn get_next_unidi(
     }
 }
+pub fn handle_path_events(conn: &mut quiche::Connection) {
+    while let Some(qe) = conn.path_event_next() {
+        match qe {
+            quiche::PathEvent::New(local_addr, peer_addr) => {
+                log::info!(
+                    "{} Seen new path ({}, {})",
+                    conn.trace_id(),
+                    local_addr,
+                    peer_addr
+                );
+
+                // Directly probe the new path.
+                conn.probe_path(local_addr, peer_addr)
+                    .expect("cannot probe");
+            }
+
+            quiche::PathEvent::Validated(local_addr, peer_addr) => {
+                log::info!(
+                    "{} Path ({}, {}) is now validated",
+                    conn.trace_id(),
+                    local_addr,
+                    peer_addr
+                );
+            }
+
+            quiche::PathEvent::FailedValidation(local_addr, peer_addr) => {
+                log::info!(
+                    "{} Path ({}, {}) failed validation",
+                    conn.trace_id(),
+                    local_addr,
+                    peer_addr
+                );
+            }
+
+            quiche::PathEvent::Closed(local_addr, peer_addr) => {
+                log::info!(
+                    "{} Path ({}, {}) is now closed and unusable",
+                    conn.trace_id(),
+                    local_addr,
+                    peer_addr
+                );
+            }
+
+            quiche::PathEvent::ReusedSourceConnectionId(cid_seq, old, new) => {
+                log::info!(
+                    "{} Peer reused cid seq {} (initially {:?}) on {:?}",
+                    conn.trace_id(),
+                    cid_seq,
+                    old,
+                    new
+                );
+            }
+
+            quiche::PathEvent::PeerMigrated(local_addr, peer_addr) => {
+                log::info!(
+                    "{} Connection migrated to ({}, {})",
+                    conn.trace_id(),
+                    local_addr,
+                    peer_addr
+                );
+            }
+        }
+    }
+}
+
+pub fn generate_cid_and_reset_token<T: SecureRandom>(
+    rng: &T,
+) -> (quiche::ConnectionId<'static>, u128) {
+    let mut scid = [0; quiche::MAX_CONN_ID_LEN];
+    rng.fill(&mut scid).unwrap();
+    let scid = scid.to_vec().into();
+
+    let mut reset_token = [0; 16];
+    rng.fill(&mut reset_token).unwrap();
+    let reset_token = u128::from_be_bytes(reset_token);
+
+    (scid, reset_token)
+}
 pub struct PartialResponse {
     pub binary: Vec<u8>,
     pub written: usize,
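Both new helpers are meant to run from each connection's housekeeping tick; the second file wires them in. A hedged usage sketch, where the function connection_tick, the scid_map routing table, and the client_id binding are assumed names for illustration:

    // Housekeeping for one client connection; scid_map maps every SCID the
    // server has issued back to the owning client's id (assumed names).
    fn connection_tick(
        connection: &mut quiche::Connection,
        rng: &ring::rand::SystemRandom,
        scid_map: &std::sync::Mutex<std::collections::HashMap<quiche::ConnectionId<'static>, u64>>,
        client_id: u64,
    ) {
        connection.on_timeout();
        handle_path_events(connection);

        // Forget SCIDs the peer has retired so the routing map stays small...
        while let Some(retired) = connection.retired_scid_next() {
            scid_map.lock().unwrap().remove(&retired);
        }

        // ...and top the pool back up with freshly minted IDs.
        while connection.scids_left() > 0 {
            let (scid, reset_token) = generate_cid_and_reset_token(rng);
            if connection.new_scid(&scid, reset_token, false).is_err() {
                break;
            }
            scid_map.lock().unwrap().insert(scid, client_id);
        }
    }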

View File

@@ -27,7 +27,10 @@ use quic_geyser_common::{
 use quic_geyser_quiche_utils::{
     quiche_reciever::{recv_message, ReadStreams},
     quiche_sender::{handle_writable, send_message},
-    quiche_utils::{get_next_unidi, mint_token, validate_token, PartialResponses},
+    quiche_utils::{
+        generate_cid_and_reset_token, get_next_unidi, handle_path_events, mint_token,
+        validate_token, PartialResponses,
+    },
 };

 use crate::configure_server::configure_server;
@@ -66,19 +69,15 @@ pub fn server_loop(
     let local_addr = socket.local_addr()?;
     let rng = SystemRandom::new();
     let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap();

-    let mut clients: HashMap<
-        quiche::ConnectionId<'static>,
-        mio_channel::Sender<(quiche::RecvInfo, Vec<u8>)>,
-    > = HashMap::new();
+    let mut client_messsage_channel_by_id: HashMap<
+        u64,
+        mio_channel::Sender<(quiche::RecvInfo, Vec<u8>)>,
+    > = HashMap::new();
+    let clients_by_id: Arc<Mutex<HashMap<ConnectionId<'static>, u64>>> =
+        Arc::new(Mutex::new(HashMap::new()));

     let (write_sender, write_reciver) = std::sync::mpsc::channel::<(quiche::SendInfo, Vec<u8>)>();
-    // poll.registry().register(
-    //     &mut write_reciver,
-    //     mio::Token(1),
-    //     mio::Interest::READABLE,
-    // )?;

     let enable_pacing = if quic_params.enable_pacing {
         set_txtime_sockopt(&socket).is_ok()
     } else {
@@ -95,6 +94,7 @@ pub fn server_loop(
         dispatching_connections.clone(),
         compression_type,
     );
+    let mut client_id_counter = 0;

     loop {
         poll.poll(&mut events, Some(Duration::from_millis(10)))?;
@@ -125,7 +125,8 @@ pub fn server_loop(
                 let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid);
                 let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN];
                 let conn_id: ConnectionId<'static> = conn_id.to_vec().into();
-                if !clients.contains_key(&hdr.dcid) && !clients.contains_key(&conn_id) {
+                let mut clients_lk = clients_by_id.lock().unwrap();
+                if !clients_lk.contains_key(&hdr.dcid) && !clients_lk.contains_key(&conn_id) {
                     if hdr.ty != quiche::Type::Initial {
                         log::error!("Packet is not Initial");
                         continue 'read;
@@ -211,10 +212,14 @@ pub fn server_loop(
                     let (client_sender, client_reciver) = mio_channel::channel();
                     let (client_message_sx, client_message_rx) = mpsc::channel();
                     let messages_in_queue = Arc::new(AtomicUsize::new(0));
+                    let current_client_id = client_id_counter;
+                    client_id_counter += 1;

                     let filters = Arc::new(RwLock::new(Vec::new()));
                     create_client_task(
                         conn,
+                        current_client_id,
+                        clients_by_id.clone(),
                         client_reciver,
                         write_sender.clone(),
                         client_message_rx,
@@ -223,6 +228,7 @@ pub fn server_loop(
                         stop_laggy_client,
                         messages_in_queue.clone(),
                         quic_params.incremental_priority,
+                        rng.clone(),
                     );

                     let mut lk = dispatching_connections.lock().unwrap();
                     lk.insert(
@@ -233,12 +239,13 @@ pub fn server_loop(
                             messages_in_queue,
                         },
                     );
-                    clients.insert(scid, client_sender);
+                    clients_lk.insert(scid, current_client_id);
+                    client_messsage_channel_by_id.insert(current_client_id, client_sender);
                 } else {
                     // get the existing client
-                    let client = match clients.get(&hdr.dcid) {
-                        Some(v) => v,
-                        None => clients
+                    let client_id = match clients_lk.get(&hdr.dcid) {
+                        Some(v) => *v,
+                        None => *clients_lk
                             .get(&conn_id)
                             .expect("The client should exist in the map"),
                     };
@@ -247,10 +254,18 @@ pub fn server_loop(
                         to: socket.local_addr().unwrap(),
                         from,
                     };
-                    if client.send((recv_info, pkt_buf.to_vec())).is_err() {
-                        // client is closed
-                        clients.remove(&hdr.dcid);
-                        clients.remove(&conn_id);
-                    }
+                    match client_messsage_channel_by_id.get_mut(&client_id) {
+                        Some(channel) => {
+                            if channel.send((recv_info, pkt_buf.to_vec())).is_err() {
+                                // client is closed
+                                clients_lk.remove(&hdr.dcid);
+                                clients_lk.remove(&conn_id);
+                                client_messsage_channel_by_id.remove(&client_id);
+                            }
+                        }
+                        None => {
+                            log::error!("channel with client id {client_id} not found");
+                        }
+                    }
                 };
             }
@@ -274,6 +289,8 @@ pub fn server_loop(
 #[allow(clippy::too_many_arguments)]
 fn create_client_task(
     connection: quiche::Connection,
+    client_id: u64,
+    client_id_by_scid: Arc<Mutex<HashMap<ConnectionId<'static>, u64>>>,
     mut receiver: mio_channel::Receiver<(quiche::RecvInfo, Vec<u8>)>,
     sender: mpsc::Sender<(quiche::SendInfo, Vec<u8>)>,
     message_channel: mpsc::Receiver<(Vec<u8>, u8)>,
@@ -282,6 +299,7 @@ fn create_client_task(
     stop_laggy_client: bool,
     messages_in_queue: Arc<AtomicUsize>,
     incremental_priority: bool,
+    rng: SystemRandom,
 ) {
     std::thread::spawn(move || {
         let mut partial_responses = PartialResponses::new();
@@ -483,9 +501,27 @@ fn create_client_task(
                 }
             }

-            if instance.elapsed() > Duration::from_secs(2) {
+            if instance.elapsed() > Duration::from_secs(1) {
                 instance = Instant::now();
                 connection.on_timeout();
+                handle_path_events(&mut connection);
+
+                // See whether source connection IDs have been retired.
+                while let Some(retired_scid) = connection.retired_scid_next() {
+                    log::info!("Retiring source CID {:?}", retired_scid);
+                    client_id_by_scid.lock().unwrap().remove(&retired_scid);
+                }
+
+                // Provide as many spare CIDs as possible.
+                while connection.scids_left() > 0 {
+                    let (scid, reset_token) = generate_cid_and_reset_token(&rng);
+                    log::info!("providing new scid {scid:?}");
+                    if connection.new_scid(&scid, reset_token, false).is_err() {
+                        break;
+                    }
+                    client_id_by_scid.lock().unwrap().insert(scid, client_id);
+                }
             }

             loop {
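Design note: because one connection can now own several SCIDs at once, the server no longer keys its packet channels by connection ID directly. Lookups go SCID -> client id (clients_by_id, shared with the per-client threads that mint and retire SCIDs) and then client id -> channel (client_messsage_channel_by_id, owned by the accept loop). A condensed sketch of the routing step as the diff leaves it, using the names from the code above and omitting the conn_id fallback:

    // Inside the server's packet loop: map the packet's destination CID to a
    // stable client id first...
    let mut clients_lk = clients_by_id.lock().unwrap();
    if let Some(client_id) = clients_lk.get(&hdr.dcid).copied() {
        // ...then forward the raw datagram to that client's worker thread.
        if let Some(channel) = client_messsage_channel_by_id.get_mut(&client_id) {
            if channel.send((recv_info, pkt_buf.to_vec())).is_err() {
                // The worker exited: drop every mapping that points at it.
                clients_lk.remove(&hdr.dcid);
                client_messsage_channel_by_id.remove(&client_id);
            }
        }
    }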