From 7d276c2ed77d158dda57eabac96e73d1f071012b Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Tue, 21 May 2024 10:18:12 +0200 Subject: [PATCH] Fixing the tests for send and recv large and small accounts --- common/src/quic/configure_client.rs | 21 +- common/src/quic/configure_server.rs | 18 +- common/src/quic/mod.rs | 1 + common/src/quic/quiche_reciever.rs | 384 ++++++++++++++++++---------- common/src/quic/quiche_utils.rs | 41 +++ 5 files changed, 315 insertions(+), 150 deletions(-) create mode 100644 common/src/quic/quiche_utils.rs diff --git a/common/src/quic/configure_client.rs b/common/src/quic/configure_client.rs index 78e7bf4..aaf4ca6 100644 --- a/common/src/quic/configure_client.rs +++ b/common/src/quic/configure_client.rs @@ -1,5 +1,3 @@ -use boring::ssl::SslMethod; - use crate::quic::configure_server::ALPN_GEYSER_PROTOCOL_ID; use super::configure_server::MAX_DATAGRAM_SIZE; @@ -10,16 +8,23 @@ pub const DEFAULT_MAX_TRANSACTION_STREAMS: u32 = 32; pub const DEFAULT_MAX_ACCOUNT_STREAMS: u32 = DEFAULT_MAX_STREAMS - DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS - DEFAULT_MAX_TRANSACTION_STREAMS; -pub fn configure_client(maximum_concurrent_streams: u32) -> anyhow::Result { +pub fn configure_client( + maximum_concurrent_streams: u32, + recieve_window_size: u64, + timeout_in_seconds: u64, +) -> anyhow::Result { let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap(); + config + .set_application_protos(&[ALPN_GEYSER_PROTOCOL_ID]) + .unwrap(); - config.set_max_idle_timeout(5000); + config.set_max_idle_timeout(timeout_in_seconds * 1000); config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE); - config.set_initial_max_data(10_000_000); - config.set_initial_max_stream_data_bidi_local(10_000_000); - config.set_initial_max_stream_data_bidi_remote(10_000_000); - config.set_initial_max_stream_data_uni(10_000_000); + config.set_initial_max_data(recieve_window_size); + config.set_initial_max_stream_data_bidi_local(recieve_window_size); + config.set_initial_max_stream_data_bidi_remote(recieve_window_size); + config.set_initial_max_stream_data_uni(recieve_window_size); config.set_initial_max_streams_bidi(maximum_concurrent_streams as u64); config.set_initial_max_streams_uni(maximum_concurrent_streams as u64); config.set_disable_active_migration(true); diff --git a/common/src/quic/configure_server.rs b/common/src/quic/configure_server.rs index 485e05e..e99f741 100644 --- a/common/src/quic/configure_server.rs +++ b/common/src/quic/configure_server.rs @@ -1,11 +1,11 @@ use boring::ssl::SslMethod; -pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"quic_geyser_plugin"; +pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser"; pub const MAX_DATAGRAM_SIZE: usize = 1350; pub fn configure_server( max_concurrent_streams: u32, - recieve_window_size: u32, + recieve_window_size: u64, connection_timeout: u64, ) -> anyhow::Result { let cert = rcgen::generate_simple_self_signed(vec!["quic_geyser".into()]).unwrap(); @@ -20,15 +20,19 @@ pub fn configure_server( let mut config = quiche::Config::with_boring_ssl_ctx_builder(quiche::PROTOCOL_VERSION, boring_ssl_context) - .unwrap(); + .expect("Should create config struct"); + + config + .set_application_protos(&[ALPN_GEYSER_PROTOCOL_ID]) + .unwrap(); config.set_max_idle_timeout(connection_timeout * 1000); config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE); - config.set_initial_max_data(10_000_000); - config.set_initial_max_stream_data_bidi_local(10_000_000); - config.set_initial_max_stream_data_bidi_remote(10_000_000); - config.set_initial_max_stream_data_uni(10_000_000); + config.set_initial_max_data(recieve_window_size); + config.set_initial_max_stream_data_bidi_local(recieve_window_size); + config.set_initial_max_stream_data_bidi_remote(recieve_window_size); + config.set_initial_max_stream_data_uni(recieve_window_size); config.set_initial_max_streams_bidi(max_concurrent_streams as u64); config.set_initial_max_streams_uni(max_concurrent_streams as u64); config.set_disable_active_migration(true); diff --git a/common/src/quic/mod.rs b/common/src/quic/mod.rs index fe7143d..96d0ef2 100644 --- a/common/src/quic/mod.rs +++ b/common/src/quic/mod.rs @@ -4,4 +4,5 @@ pub mod connection_manager; pub mod quic_server; pub mod quiche_reciever; pub mod quiche_sender; +pub mod quiche_utils; pub mod skip_verification; diff --git a/common/src/quic/quiche_reciever.rs b/common/src/quic/quiche_reciever.rs index fa5aded..c1d3f98 100644 --- a/common/src/quic/quiche_reciever.rs +++ b/common/src/quic/quiche_reciever.rs @@ -37,6 +37,7 @@ mod tests { time::Duration, }; + use itertools::Itertools; use quiche::ConnectionId; use ring::rand::{SecureRandom, SystemRandom}; use std::net::UdpSocket; @@ -48,69 +49,21 @@ mod tests { configure_server::{configure_server, MAX_DATAGRAM_SIZE}, quiche_reciever::recv_message, quiche_sender::send_message, + quiche_utils::{mint_token, validate_token}, }, types::account::Account, }; - fn validate_token<'a>( - src: &std::net::SocketAddr, - token: &'a [u8], - ) -> Option> { - if token.len() < 6 { - return None; - } - - if &token[..6] != b"quiche" { - return None; - } - - let token = &token[6..]; - - let addr = match src.ip() { - std::net::IpAddr::V4(a) => a.octets().to_vec(), - std::net::IpAddr::V6(a) => a.octets().to_vec(), - }; - - if token.len() < addr.len() || &token[..addr.len()] != addr.as_slice() { - return None; - } - - Some(quiche::ConnectionId::from_ref(&token[addr.len()..])) - } - - fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec { - let mut token = Vec::new(); - - token.extend_from_slice(b"quiche"); - - let addr = match src.ip() { - std::net::IpAddr::V4(a) => a.octets().to_vec(), - std::net::IpAddr::V6(a) => a.octets().to_vec(), - }; - - token.extend_from_slice(&addr); - token.extend_from_slice(&hdr.dcid); - - token - } - #[test] fn test_send_and_recieve_of_small_account() { let mut config = configure_server(1, 100000, 1).unwrap(); // Setup the event loop. - let mut poll = mio::Poll::new().unwrap(); - let mut events = mio::Events::with_capacity(1024); let socket_addr = SocketAddr::from_str("0.0.0.0:0").unwrap(); - let mut socket = UdpSocket::bind(socket_addr).unwrap(); - // poll.registry() - // .register(&mut socket, mio::Token(0), mio::Interest::READABLE) - // .unwrap(); + let socket = UdpSocket::bind(socket_addr).unwrap(); let port = socket.local_addr().unwrap().port(); let local_addr = socket.local_addr().unwrap(); - let mut buf = [0; 65535]; - let mut out = [0; MAX_DATAGRAM_SIZE]; let account = Account::get_account_for_test(123456, 2); let message = Message::AccountMsg(account); @@ -119,16 +72,11 @@ mod tests { let message_to_send = message.clone(); let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); std::thread::spawn(move || { - let mut client_config = configure_client(1).unwrap(); + let mut client_config = configure_client(1, 1000, 10).unwrap(); // Setup the event loop. - let mut poll = mio::Poll::new().unwrap(); - let mut events = mio::Events::with_capacity(1024); let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); - let mut socket = std::net::UdpSocket::bind(socket_addr).unwrap(); - // poll.registry() - // .register(&mut socket, mio::Token(0), mio::Interest::READABLE) - // .unwrap(); + let socket = std::net::UdpSocket::bind(socket_addr).unwrap(); let mut scid = [0; quiche::MAX_CONN_ID_LEN]; SystemRandom::new().fill(&mut scid[..]).unwrap(); @@ -141,7 +89,7 @@ mod tests { let mut conn = quiche::connect(None, &scid, local_addr, server_addr, &mut client_config) .unwrap(); - //let mut out = [0; MAX_DATAGRAM_SIZE]; + let mut out = [0; MAX_DATAGRAM_SIZE]; println!("sending message"); let (write, send_info) = conn.send(&mut out).expect("initial send failed"); @@ -154,104 +102,270 @@ mod tests { }) }; - let (len, from) = match socket.recv_from(&mut buf) { - Ok(v) => v, - Err(e) => { - panic!("recv() failed: {:?}", e); + loop { + let mut buf = [0; 65535]; + let mut out = [0; MAX_DATAGRAM_SIZE]; + + let (len, from) = match socket.recv_from(&mut buf) { + Ok(v) => v, + Err(e) => { + panic!("recv() failed: {:?}", e); + } + }; + println!("recieved first packet"); + + log::debug!("got {} bytes", len); + + let pkt_buf = &mut buf[..len]; + + // Parse the QUIC packet's header. + let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) { + Ok(header) => header, + + Err(e) => { + panic!("Parsing packet header failed: {:?}", e); + } + }; + let rng = SystemRandom::new(); + let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap(); + let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid); + let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; + let conn_id: ConnectionId<'static> = conn_id.to_vec().into(); + + if hdr.ty != quiche::Type::Initial { + panic!("Packet is not Initial"); } + + if !quiche::version_is_supported(hdr.version) { + log::warn!("Doing version negotiation"); + + let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap(); + + let out = &out[..len]; + + if let Err(e) = socket.send_to(out, from) { + panic!("send() failed: {:?}", e); + } + } + + let mut scid = [0; quiche::MAX_CONN_ID_LEN]; + scid.copy_from_slice(&conn_id); + + let scid = quiche::ConnectionId::from_ref(&scid); + + // Token is always present in Initial packets. + let token = hdr.token.as_ref().unwrap(); + + println!("token: {}", token.iter().map(|x| x.to_string()).join(", ")); + + // Do stateless retry if the client didn't send a token. + if token.is_empty() { + log::warn!("Doing stateless retry"); + + let new_token = mint_token(&hdr, &from); + + let len = quiche::retry( + &hdr.scid, + &hdr.dcid, + &scid, + &new_token, + hdr.version, + &mut out, + ) + .unwrap(); + + let out = &out[..len]; + + if let Err(e) = socket.send_to(out, from) { + panic!("send() failed: {:?}", e); + } else { + break; + } + } + let odcid = validate_token(&from, token); + // The token was not valid, meaning the retry failed, so + // drop the packet. + if odcid.is_none() { + panic!("Invalid address validation token"); + } + + if scid.len() != hdr.dcid.len() { + panic!("Invalid destination connection ID"); + } + + // Reuse the source connection ID we sent in the Retry packet, + // instead of changing it again. + let scid = hdr.dcid.clone(); + + log::debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid); + + let mut conn = + quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config).unwrap(); + + let recvd_message = recv_message(&mut conn, 0).unwrap(); + assert_eq!(recvd_message, message); + std::thread::sleep(Duration::from_secs(1)); + assert_eq!(conn.is_closed(), true); + jh.join().unwrap(); + break; + } + } + + #[test] + fn test_send_and_recieve_of_large_account() { + let mut config = configure_server(1, 100000, 1).unwrap(); + + // Setup the event loop. + let socket_addr = SocketAddr::from_str("0.0.0.0:0").unwrap(); + let socket = UdpSocket::bind(socket_addr).unwrap(); + + let port = socket.local_addr().unwrap().port(); + let local_addr = socket.local_addr().unwrap(); + + let account = Account::get_account_for_test(123456, 10_000_000); + let message = Message::AccountMsg(account); + + let jh = { + let message = message.clone(); + let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); + std::thread::spawn(move || { + let mut client_config = configure_client(1, 12_000_000, 10).unwrap(); + + // Setup the event loop. + let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + let socket = std::net::UdpSocket::bind(socket_addr).unwrap(); + + let mut scid = [0; quiche::MAX_CONN_ID_LEN]; + SystemRandom::new().fill(&mut scid[..]).unwrap(); + + let scid = quiche::ConnectionId::from_ref(&scid); + + // Get local address. + let local_addr = socket.local_addr().unwrap(); + println!("connecting"); + let mut conn = + quiche::connect(None, &scid, local_addr, server_addr, &mut client_config) + .unwrap(); + let mut out = [0; MAX_DATAGRAM_SIZE]; + println!("sending message"); + let (write, send_info) = conn.send(&mut out).expect("initial send failed"); + + while let Err(e) = socket.send_to(&out[..write], send_info.to) { + panic!("send() failed: {:?}", e); + } + + let recvd_message = recv_message(&mut conn, 0).unwrap(); + assert_eq!(recvd_message, message); + std::thread::sleep(Duration::from_secs(1)); + assert_eq!(conn.is_closed(), true); + }) }; - println!("recieved first packet"); - log::debug!("got {} bytes", len); + loop { + let mut buf = [0; 65535]; + let mut out = [0; MAX_DATAGRAM_SIZE]; - let pkt_buf = &mut buf[..len]; + let (len, from) = match socket.recv_from(&mut buf) { + Ok(v) => v, + Err(e) => { + panic!("recv() failed: {:?}", e); + } + }; + println!("recieved first packet"); - // Parse the QUIC packet's header. - let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) { - Ok(v) => v, + log::debug!("got {} bytes", len); - Err(e) => { - panic!("Parsing packet header failed: {:?}", e); + let pkt_buf = &mut buf[..len]; + + // Parse the QUIC packet's header. + let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) { + Ok(header) => header, + + Err(e) => { + panic!("Parsing packet header failed: {:?}", e); + } + }; + let rng = SystemRandom::new(); + let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap(); + let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid); + let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; + let conn_id: ConnectionId<'static> = conn_id.to_vec().into(); + + if hdr.ty != quiche::Type::Initial { + panic!("Packet is not Initial"); } - }; - let rng = SystemRandom::new(); - let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap(); - let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid); - let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; - let conn_id: ConnectionId<'static> = conn_id.to_vec().into(); - if hdr.ty != quiche::Type::Initial { - panic!("Packet is not Initial"); - } + if !quiche::version_is_supported(hdr.version) { + log::warn!("Doing version negotiation"); - if !quiche::version_is_supported(hdr.version) { - log::warn!("Doing version negotiation"); + let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap(); - let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap(); + let out = &out[..len]; - let out = &out[..len]; - - if let Err(e) = socket.send_to(out, from) { - panic!("send() failed: {:?}", e); + if let Err(e) = socket.send_to(out, from) { + panic!("send() failed: {:?}", e); + } } - } - let mut scid = [0; quiche::MAX_CONN_ID_LEN]; - scid.copy_from_slice(&conn_id); + let mut scid = [0; quiche::MAX_CONN_ID_LEN]; + scid.copy_from_slice(&conn_id); - let scid = quiche::ConnectionId::from_ref(&scid); + let scid = quiche::ConnectionId::from_ref(&scid); - // Token is always present in Initial packets. - let token = hdr.token.as_ref().unwrap(); + // Token is always present in Initial packets. + let token = hdr.token.as_ref().unwrap(); - // Do stateless retry if the client didn't send a token. - if token.is_empty() { - log::warn!("Doing stateless retry"); + println!("token: {}", token.iter().map(|x| x.to_string()).join(", ")); - let new_token = mint_token(&hdr, &from); + // Do stateless retry if the client didn't send a token. + if token.is_empty() { + log::warn!("Doing stateless retry"); - let len = quiche::retry( - &hdr.scid, - &hdr.dcid, - &scid, - &new_token, - hdr.version, - &mut out, - ) - .unwrap(); + let new_token = mint_token(&hdr, &from); - let out = &out[..len]; + let len = quiche::retry( + &hdr.scid, + &hdr.dcid, + &scid, + &new_token, + hdr.version, + &mut out, + ) + .unwrap(); - if let Err(e) = socket.send_to(out, from) { - panic!("send() failed: {:?}", e); + let out = &out[..len]; + + if let Err(e) = socket.send_to(out, from) { + panic!("send() failed: {:?}", e); + } else { + break; + } } + let odcid = validate_token(&from, token); + // The token was not valid, meaning the retry failed, so + // drop the packet. + if odcid.is_none() { + panic!("Invalid address validation token"); + } + + if scid.len() != hdr.dcid.len() { + panic!("Invalid destination connection ID"); + } + + // Reuse the source connection ID we sent in the Retry packet, + // instead of changing it again. + let scid = hdr.dcid.clone(); + + log::debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid); + + let mut conn = + quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config).unwrap(); + + send_message(&mut conn, 0, &message).unwrap(); + conn.close(true, 0, b"not required").unwrap(); + + jh.join().unwrap(); + break; } - - let odcid = validate_token(&from, token); - - // The token was not valid, meaning the retry failed, so - // drop the packet. - if odcid.is_none() { - panic!("Invalid address validation token"); - } - - if scid.len() != hdr.dcid.len() { - panic!("Invalid destination connection ID"); - } - - // Reuse the source connection ID we sent in the Retry packet, - // instead of changing it again. - let scid = hdr.dcid.clone(); - - log::debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid); - - let mut conn = - quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config).unwrap(); - - let recvd_message = recv_message(&mut conn, 0).unwrap(); - assert_eq!(recvd_message, message); - std::thread::sleep(Duration::from_secs(1)); - assert_eq!(conn.is_closed(), true); - jh.join().unwrap(); } } diff --git a/common/src/quic/quiche_utils.rs b/common/src/quic/quiche_utils.rs new file mode 100644 index 0000000..ac04918 --- /dev/null +++ b/common/src/quic/quiche_utils.rs @@ -0,0 +1,41 @@ +pub fn validate_token<'a>( + src: &std::net::SocketAddr, + token: &'a [u8], +) -> Option> { + if token.len() < 6 { + return None; + } + + if &token[..6] != b"quiche" { + return None; + } + + let token = &token[6..]; + + let addr = match src.ip() { + std::net::IpAddr::V4(a) => a.octets().to_vec(), + std::net::IpAddr::V6(a) => a.octets().to_vec(), + }; + + if token.len() < addr.len() || &token[..addr.len()] != addr.as_slice() { + return None; + } + + Some(quiche::ConnectionId::from_ref(&token[addr.len()..])) +} + +pub fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec { + let mut token = Vec::new(); + + token.extend_from_slice(b"quiche"); + + let addr = match src.ip() { + std::net::IpAddr::V4(a) => a.octets().to_vec(), + std::net::IpAddr::V6(a) => a.octets().to_vec(), + }; + + token.extend_from_slice(&addr); + token.extend_from_slice(&hdr.dcid); + + token +}