Fixing the tests for send and recv large and small accounts

This commit is contained in:
godmodegalactus 2024-05-21 10:18:12 +02:00
parent 9c36f14cfc
commit 7d276c2ed7
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
5 changed files with 315 additions and 150 deletions

View File

@ -1,5 +1,3 @@
use boring::ssl::SslMethod;
use crate::quic::configure_server::ALPN_GEYSER_PROTOCOL_ID;
use super::configure_server::MAX_DATAGRAM_SIZE;
@ -10,16 +8,23 @@ pub const DEFAULT_MAX_TRANSACTION_STREAMS: u32 = 32;
pub const DEFAULT_MAX_ACCOUNT_STREAMS: u32 =
DEFAULT_MAX_STREAMS - DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS - DEFAULT_MAX_TRANSACTION_STREAMS;
pub fn configure_client(maximum_concurrent_streams: u32) -> anyhow::Result<quiche::Config> {
pub fn configure_client(
maximum_concurrent_streams: u32,
recieve_window_size: u64,
timeout_in_seconds: u64,
) -> anyhow::Result<quiche::Config> {
let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
config
.set_application_protos(&[ALPN_GEYSER_PROTOCOL_ID])
.unwrap();
config.set_max_idle_timeout(5000);
config.set_max_idle_timeout(timeout_in_seconds * 1000);
config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_initial_max_data(10_000_000);
config.set_initial_max_stream_data_bidi_local(10_000_000);
config.set_initial_max_stream_data_bidi_remote(10_000_000);
config.set_initial_max_stream_data_uni(10_000_000);
config.set_initial_max_data(recieve_window_size);
config.set_initial_max_stream_data_bidi_local(recieve_window_size);
config.set_initial_max_stream_data_bidi_remote(recieve_window_size);
config.set_initial_max_stream_data_uni(recieve_window_size);
config.set_initial_max_streams_bidi(maximum_concurrent_streams as u64);
config.set_initial_max_streams_uni(maximum_concurrent_streams as u64);
config.set_disable_active_migration(true);

View File

@ -1,11 +1,11 @@
use boring::ssl::SslMethod;
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"quic_geyser_plugin";
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser";
pub const MAX_DATAGRAM_SIZE: usize = 1350;
pub fn configure_server(
max_concurrent_streams: u32,
recieve_window_size: u32,
recieve_window_size: u64,
connection_timeout: u64,
) -> anyhow::Result<quiche::Config> {
let cert = rcgen::generate_simple_self_signed(vec!["quic_geyser".into()]).unwrap();
@ -20,15 +20,19 @@ pub fn configure_server(
let mut config =
quiche::Config::with_boring_ssl_ctx_builder(quiche::PROTOCOL_VERSION, boring_ssl_context)
.unwrap();
.expect("Should create config struct");
config
.set_application_protos(&[ALPN_GEYSER_PROTOCOL_ID])
.unwrap();
config.set_max_idle_timeout(connection_timeout * 1000);
config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE);
config.set_initial_max_data(10_000_000);
config.set_initial_max_stream_data_bidi_local(10_000_000);
config.set_initial_max_stream_data_bidi_remote(10_000_000);
config.set_initial_max_stream_data_uni(10_000_000);
config.set_initial_max_data(recieve_window_size);
config.set_initial_max_stream_data_bidi_local(recieve_window_size);
config.set_initial_max_stream_data_bidi_remote(recieve_window_size);
config.set_initial_max_stream_data_uni(recieve_window_size);
config.set_initial_max_streams_bidi(max_concurrent_streams as u64);
config.set_initial_max_streams_uni(max_concurrent_streams as u64);
config.set_disable_active_migration(true);

View File

@ -4,4 +4,5 @@ pub mod connection_manager;
pub mod quic_server;
pub mod quiche_reciever;
pub mod quiche_sender;
pub mod quiche_utils;
pub mod skip_verification;

View File

@ -37,6 +37,7 @@ mod tests {
time::Duration,
};
use itertools::Itertools;
use quiche::ConnectionId;
use ring::rand::{SecureRandom, SystemRandom};
use std::net::UdpSocket;
@ -48,69 +49,21 @@ mod tests {
configure_server::{configure_server, MAX_DATAGRAM_SIZE},
quiche_reciever::recv_message,
quiche_sender::send_message,
quiche_utils::{mint_token, validate_token},
},
types::account::Account,
};
fn validate_token<'a>(
src: &std::net::SocketAddr,
token: &'a [u8],
) -> Option<quiche::ConnectionId<'a>> {
if token.len() < 6 {
return None;
}
if &token[..6] != b"quiche" {
return None;
}
let token = &token[6..];
let addr = match src.ip() {
std::net::IpAddr::V4(a) => a.octets().to_vec(),
std::net::IpAddr::V6(a) => a.octets().to_vec(),
};
if token.len() < addr.len() || &token[..addr.len()] != addr.as_slice() {
return None;
}
Some(quiche::ConnectionId::from_ref(&token[addr.len()..]))
}
fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec<u8> {
let mut token = Vec::new();
token.extend_from_slice(b"quiche");
let addr = match src.ip() {
std::net::IpAddr::V4(a) => a.octets().to_vec(),
std::net::IpAddr::V6(a) => a.octets().to_vec(),
};
token.extend_from_slice(&addr);
token.extend_from_slice(&hdr.dcid);
token
}
#[test]
fn test_send_and_recieve_of_small_account() {
let mut config = configure_server(1, 100000, 1).unwrap();
// Setup the event loop.
let mut poll = mio::Poll::new().unwrap();
let mut events = mio::Events::with_capacity(1024);
let socket_addr = SocketAddr::from_str("0.0.0.0:0").unwrap();
let mut socket = UdpSocket::bind(socket_addr).unwrap();
// poll.registry()
// .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
// .unwrap();
let socket = UdpSocket::bind(socket_addr).unwrap();
let port = socket.local_addr().unwrap().port();
let local_addr = socket.local_addr().unwrap();
let mut buf = [0; 65535];
let mut out = [0; MAX_DATAGRAM_SIZE];
let account = Account::get_account_for_test(123456, 2);
let message = Message::AccountMsg(account);
@ -119,16 +72,11 @@ mod tests {
let message_to_send = message.clone();
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
std::thread::spawn(move || {
let mut client_config = configure_client(1).unwrap();
let mut client_config = configure_client(1, 1000, 10).unwrap();
// Setup the event loop.
let mut poll = mio::Poll::new().unwrap();
let mut events = mio::Events::with_capacity(1024);
let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
let mut socket = std::net::UdpSocket::bind(socket_addr).unwrap();
// poll.registry()
// .register(&mut socket, mio::Token(0), mio::Interest::READABLE)
// .unwrap();
let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
SystemRandom::new().fill(&mut scid[..]).unwrap();
@ -141,7 +89,7 @@ mod tests {
let mut conn =
quiche::connect(None, &scid, local_addr, server_addr, &mut client_config)
.unwrap();
//let mut out = [0; MAX_DATAGRAM_SIZE];
let mut out = [0; MAX_DATAGRAM_SIZE];
println!("sending message");
let (write, send_info) = conn.send(&mut out).expect("initial send failed");
@ -154,104 +102,270 @@ mod tests {
})
};
let (len, from) = match socket.recv_from(&mut buf) {
Ok(v) => v,
Err(e) => {
panic!("recv() failed: {:?}", e);
loop {
let mut buf = [0; 65535];
let mut out = [0; MAX_DATAGRAM_SIZE];
let (len, from) = match socket.recv_from(&mut buf) {
Ok(v) => v,
Err(e) => {
panic!("recv() failed: {:?}", e);
}
};
println!("recieved first packet");
log::debug!("got {} bytes", len);
let pkt_buf = &mut buf[..len];
// Parse the QUIC packet's header.
let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) {
Ok(header) => header,
Err(e) => {
panic!("Parsing packet header failed: {:?}", e);
}
};
let rng = SystemRandom::new();
let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap();
let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid);
let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN];
let conn_id: ConnectionId<'static> = conn_id.to_vec().into();
if hdr.ty != quiche::Type::Initial {
panic!("Packet is not Initial");
}
if !quiche::version_is_supported(hdr.version) {
log::warn!("Doing version negotiation");
let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap();
let out = &out[..len];
if let Err(e) = socket.send_to(out, from) {
panic!("send() failed: {:?}", e);
}
}
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
scid.copy_from_slice(&conn_id);
let scid = quiche::ConnectionId::from_ref(&scid);
// Token is always present in Initial packets.
let token = hdr.token.as_ref().unwrap();
println!("token: {}", token.iter().map(|x| x.to_string()).join(", "));
// Do stateless retry if the client didn't send a token.
if token.is_empty() {
log::warn!("Doing stateless retry");
let new_token = mint_token(&hdr, &from);
let len = quiche::retry(
&hdr.scid,
&hdr.dcid,
&scid,
&new_token,
hdr.version,
&mut out,
)
.unwrap();
let out = &out[..len];
if let Err(e) = socket.send_to(out, from) {
panic!("send() failed: {:?}", e);
} else {
break;
}
}
let odcid = validate_token(&from, token);
// The token was not valid, meaning the retry failed, so
// drop the packet.
if odcid.is_none() {
panic!("Invalid address validation token");
}
if scid.len() != hdr.dcid.len() {
panic!("Invalid destination connection ID");
}
// Reuse the source connection ID we sent in the Retry packet,
// instead of changing it again.
let scid = hdr.dcid.clone();
log::debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid);
let mut conn =
quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config).unwrap();
let recvd_message = recv_message(&mut conn, 0).unwrap();
assert_eq!(recvd_message, message);
std::thread::sleep(Duration::from_secs(1));
assert_eq!(conn.is_closed(), true);
jh.join().unwrap();
break;
}
}
#[test]
fn test_send_and_recieve_of_large_account() {
let mut config = configure_server(1, 100000, 1).unwrap();
// Setup the event loop.
let socket_addr = SocketAddr::from_str("0.0.0.0:0").unwrap();
let socket = UdpSocket::bind(socket_addr).unwrap();
let port = socket.local_addr().unwrap().port();
let local_addr = socket.local_addr().unwrap();
let account = Account::get_account_for_test(123456, 10_000_000);
let message = Message::AccountMsg(account);
let jh = {
let message = message.clone();
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
std::thread::spawn(move || {
let mut client_config = configure_client(1, 12_000_000, 10).unwrap();
// Setup the event loop.
let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
SystemRandom::new().fill(&mut scid[..]).unwrap();
let scid = quiche::ConnectionId::from_ref(&scid);
// Get local address.
let local_addr = socket.local_addr().unwrap();
println!("connecting");
let mut conn =
quiche::connect(None, &scid, local_addr, server_addr, &mut client_config)
.unwrap();
let mut out = [0; MAX_DATAGRAM_SIZE];
println!("sending message");
let (write, send_info) = conn.send(&mut out).expect("initial send failed");
while let Err(e) = socket.send_to(&out[..write], send_info.to) {
panic!("send() failed: {:?}", e);
}
let recvd_message = recv_message(&mut conn, 0).unwrap();
assert_eq!(recvd_message, message);
std::thread::sleep(Duration::from_secs(1));
assert_eq!(conn.is_closed(), true);
})
};
println!("recieved first packet");
log::debug!("got {} bytes", len);
loop {
let mut buf = [0; 65535];
let mut out = [0; MAX_DATAGRAM_SIZE];
let pkt_buf = &mut buf[..len];
let (len, from) = match socket.recv_from(&mut buf) {
Ok(v) => v,
Err(e) => {
panic!("recv() failed: {:?}", e);
}
};
println!("recieved first packet");
// Parse the QUIC packet's header.
let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) {
Ok(v) => v,
log::debug!("got {} bytes", len);
Err(e) => {
panic!("Parsing packet header failed: {:?}", e);
let pkt_buf = &mut buf[..len];
// Parse the QUIC packet's header.
let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) {
Ok(header) => header,
Err(e) => {
panic!("Parsing packet header failed: {:?}", e);
}
};
let rng = SystemRandom::new();
let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap();
let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid);
let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN];
let conn_id: ConnectionId<'static> = conn_id.to_vec().into();
if hdr.ty != quiche::Type::Initial {
panic!("Packet is not Initial");
}
};
let rng = SystemRandom::new();
let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap();
let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid);
let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN];
let conn_id: ConnectionId<'static> = conn_id.to_vec().into();
if hdr.ty != quiche::Type::Initial {
panic!("Packet is not Initial");
}
if !quiche::version_is_supported(hdr.version) {
log::warn!("Doing version negotiation");
if !quiche::version_is_supported(hdr.version) {
log::warn!("Doing version negotiation");
let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap();
let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap();
let out = &out[..len];
let out = &out[..len];
if let Err(e) = socket.send_to(out, from) {
panic!("send() failed: {:?}", e);
if let Err(e) = socket.send_to(out, from) {
panic!("send() failed: {:?}", e);
}
}
}
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
scid.copy_from_slice(&conn_id);
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
scid.copy_from_slice(&conn_id);
let scid = quiche::ConnectionId::from_ref(&scid);
let scid = quiche::ConnectionId::from_ref(&scid);
// Token is always present in Initial packets.
let token = hdr.token.as_ref().unwrap();
// Token is always present in Initial packets.
let token = hdr.token.as_ref().unwrap();
// Do stateless retry if the client didn't send a token.
if token.is_empty() {
log::warn!("Doing stateless retry");
println!("token: {}", token.iter().map(|x| x.to_string()).join(", "));
let new_token = mint_token(&hdr, &from);
// Do stateless retry if the client didn't send a token.
if token.is_empty() {
log::warn!("Doing stateless retry");
let len = quiche::retry(
&hdr.scid,
&hdr.dcid,
&scid,
&new_token,
hdr.version,
&mut out,
)
.unwrap();
let new_token = mint_token(&hdr, &from);
let out = &out[..len];
let len = quiche::retry(
&hdr.scid,
&hdr.dcid,
&scid,
&new_token,
hdr.version,
&mut out,
)
.unwrap();
if let Err(e) = socket.send_to(out, from) {
panic!("send() failed: {:?}", e);
let out = &out[..len];
if let Err(e) = socket.send_to(out, from) {
panic!("send() failed: {:?}", e);
} else {
break;
}
}
let odcid = validate_token(&from, token);
// The token was not valid, meaning the retry failed, so
// drop the packet.
if odcid.is_none() {
panic!("Invalid address validation token");
}
if scid.len() != hdr.dcid.len() {
panic!("Invalid destination connection ID");
}
// Reuse the source connection ID we sent in the Retry packet,
// instead of changing it again.
let scid = hdr.dcid.clone();
log::debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid);
let mut conn =
quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config).unwrap();
send_message(&mut conn, 0, &message).unwrap();
conn.close(true, 0, b"not required").unwrap();
jh.join().unwrap();
break;
}
let odcid = validate_token(&from, token);
// The token was not valid, meaning the retry failed, so
// drop the packet.
if odcid.is_none() {
panic!("Invalid address validation token");
}
if scid.len() != hdr.dcid.len() {
panic!("Invalid destination connection ID");
}
// Reuse the source connection ID we sent in the Retry packet,
// instead of changing it again.
let scid = hdr.dcid.clone();
log::debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid);
let mut conn =
quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config).unwrap();
let recvd_message = recv_message(&mut conn, 0).unwrap();
assert_eq!(recvd_message, message);
std::thread::sleep(Duration::from_secs(1));
assert_eq!(conn.is_closed(), true);
jh.join().unwrap();
}
}

View File

@ -0,0 +1,41 @@
pub fn validate_token<'a>(
src: &std::net::SocketAddr,
token: &'a [u8],
) -> Option<quiche::ConnectionId<'a>> {
if token.len() < 6 {
return None;
}
if &token[..6] != b"quiche" {
return None;
}
let token = &token[6..];
let addr = match src.ip() {
std::net::IpAddr::V4(a) => a.octets().to_vec(),
std::net::IpAddr::V6(a) => a.octets().to_vec(),
};
if token.len() < addr.len() || &token[..addr.len()] != addr.as_slice() {
return None;
}
Some(quiche::ConnectionId::from_ref(&token[addr.len()..]))
}
pub fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec<u8> {
let mut token = Vec::new();
token.extend_from_slice(b"quiche");
let addr = match src.ip() {
std::net::IpAddr::V4(a) => a.octets().to_vec(),
std::net::IpAddr::V6(a) => a.octets().to_vec(),
};
token.extend_from_slice(&addr);
token.extend_from_slice(&hdr.dcid);
token
}