From b54e614de896672854a8e68c2f759bd5d097a6d0 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Wed, 22 May 2024 11:14:39 +0200 Subject: [PATCH] Solving issues with quiche server and client, making tests work for small transfers --- Cargo.lock | 69 +++++ Cargo.toml | 3 +- common/Cargo.toml | 4 +- common/src/channel_message.rs | 4 +- common/src/message.rs | 3 +- common/src/quic/quiche_client_loop.rs | 370 +++++++++++++------------- common/src/quic/quiche_reciever.rs | 8 +- common/src/quic/quiche_server_loop.rs | 39 ++- common/src/quic/quiche_utils.rs | 26 ++ common/src/types/account.rs | 9 +- examples/tester-client/src/main.rs | 2 +- 11 files changed, 333 insertions(+), 204 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65017b9..3ae0691 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2031,6 +2031,16 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.2.1" @@ -2239,6 +2249,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking_lot" version = "0.12.2" @@ -2464,6 +2480,7 @@ dependencies = [ "mio_channel", "pkcs8", "quiche", + "rand 0.8.5", "rcgen", "ring 0.17.8", "rustls", @@ -2472,6 +2489,7 @@ dependencies = [ "solana-transaction-status", "thiserror", "tokio", + "tracing-subscriber", ] [[package]] @@ -3025,6 +3043,15 @@ dependencies = [ "keccak", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3833,6 +3860,16 @@ dependencies = [ "syn 2.0.63", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.3.36" @@ -4030,6 +4067,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -4129,6 +4192,12 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "vergen" version = "8.3.1" diff --git a/Cargo.toml b/Cargo.toml index 50820d7..90d9c00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,13 +48,14 @@ async-stream = "0.3.5" mio = "0.8.11" mio_channel = "0.1.3" -quiche = "0.21.0" +quiche = "=0.21.0" boring = "4.6.0" ring = "0.17.8" cargo-lock = "9.0.0" git-version = "0.3.5" vergen = "8.2.1" +rand = "0.8.5" quic-geyser-common = {path = "common", version="0.1.0"} quic-geyser-client = {path = "client", version="0.1.0"} diff --git a/common/Cargo.toml b/common/Cargo.toml index 66ac805..e825f69 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -24,4 +24,6 @@ mio = { workspace = true, features = ["net", "os-poll"] } mio_channel = { workspace = true } ring = {workspace = true} -[dev-dependencies] \ No newline at end of file +[dev-dependencies] +rand = { workspace = true } +tracing-subscriber = { workspace = true } \ No newline at end of file diff --git a/common/src/channel_message.rs b/common/src/channel_message.rs index 9adb518..2ef7400 100644 --- a/common/src/channel_message.rs +++ b/common/src/channel_message.rs @@ -4,14 +4,14 @@ use solana_sdk::{ use crate::types::{block_meta::BlockMeta, transaction::Transaction}; -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct AccountData { pub pubkey: Pubkey, pub account: Account, pub write_version: u64, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum ChannelMessage { Account(AccountData, Slot, bool), Slot(u64, u64, CommitmentLevel), diff --git a/common/src/message.rs b/common/src/message.rs index f6d4d65..0e7a762 100644 --- a/common/src/message.rs +++ b/common/src/message.rs @@ -5,7 +5,6 @@ use crate::{ types::{ account::Account, block_meta::{BlockMeta, SlotMeta}, - connections_parameters::ConnectionParameters, transaction::Transaction, }, }; @@ -21,5 +20,5 @@ pub enum Message { BlockMetaMsg(BlockMeta), TransactionMsg(Box), Filters(Vec), // sent from client to server - ConnectionParameters(ConnectionParameters), + AddStream(u64), } diff --git a/common/src/quic/quiche_client_loop.rs b/common/src/quic/quiche_client_loop.rs index 615dfa7..5ca2b65 100644 --- a/common/src/quic/quiche_client_loop.rs +++ b/common/src/quic/quiche_client_loop.rs @@ -4,7 +4,7 @@ use crate::{ message::Message, quic::{ configure_server::MAX_DATAGRAM_SIZE, quiche_reciever::recv_message, - quiche_sender::send_message, + quiche_sender::send_message, quiche_utils::get_next_unidi, }, }; use anyhow::bail; @@ -49,7 +49,7 @@ pub fn client_loop( bail!("send() failed: {:?}", e); } - let mut stream_send_id = 0; + let mut current_stream_id = 3; let mut buf = [0; 65535]; let mut out = [0; MAX_DATAGRAM_SIZE]; @@ -66,8 +66,8 @@ pub fn client_loop( break; } - let network_updates = events.iter().any(|x| x.token().0 == 0); - let channel_updates = events.iter().any(|x| x.token().0 == 1); + let network_updates = true; + let channel_updates = true; if network_updates { 'read: loop { @@ -114,7 +114,7 @@ pub fn client_loop( let message = recv_message(&mut conn, stream); match message { Ok(message) => { - message_recv_queue.send(message)?; + message_recv_queue.send(message).unwrap(); } Err(e) => { log::error!("Error recieving message : {e}") @@ -122,13 +122,15 @@ pub fn client_loop( } } } + // chanel updates - if channel_updates { + if channel_updates && conn.is_established() { // channel events - let message_to_send = message_send_queue.try_recv()?; - stream_send_id += 1; - if let Err(e) = send_message(&mut conn, stream_send_id, &message_to_send) { - log::error!("Error sending message on stream : {}", e); + if let Ok(message_to_send) = message_send_queue.try_recv() { + current_stream_id = get_next_unidi(current_stream_id, false); + if let Err(e) = send_message(&mut conn, current_stream_id, &message_to_send) { + log::error!("Error sending message on stream : {}", e); + } } } @@ -171,198 +173,210 @@ mod tests { net::{IpAddr, Ipv4Addr, SocketAddr}, str::FromStr, sync::mpsc, + thread::sleep, + time::Duration, }; use itertools::Itertools; - use quiche::ConnectionId; - use ring::rand::SystemRandom; - use std::net::UdpSocket; + use solana_sdk::{account::Account, pubkey::Pubkey}; use crate::{ + channel_message::{AccountData, ChannelMessage}, + compression::CompressionType, + filters::Filter, message::Message, quic::{ - configure_client::configure_client, - configure_server::{configure_server, MAX_DATAGRAM_SIZE}, - quiche_reciever::recv_message, - quiche_sender::send_message, - quiche_utils::{mint_token, validate_token}, + configure_client::configure_client, configure_server::configure_server, + quiche_server_loop::server_loop, }, - types::{account::Account, block_meta::SlotMeta}, + types::block_meta::SlotMeta, }; use super::client_loop; #[test] fn test_send_and_recieve_of_large_account_with_client_loop() { - let mut config = configure_server(1, 100000, 1).unwrap(); - + tracing_subscriber::fmt::init(); // Setup the event loop. - let socket_addr = SocketAddr::from_str("0.0.0.0:0").unwrap(); - let socket = UdpSocket::bind(socket_addr).unwrap(); + let socket_addr = SocketAddr::from_str("0.0.0.0:10900").unwrap(); - let port = socket.local_addr().unwrap().port(); - let local_addr = socket.local_addr().unwrap(); + let port = 10900; - let account = Account::get_account_for_test(123456, 10_000_000); - let message_1 = Message::SlotMsg(SlotMeta { - slot: 1, - parent: 0, - commitment_level: solana_sdk::commitment_config::CommitmentLevel::Confirmed, - }); - let message_2 = Message::AccountMsg(account); - let message_3 = Message::SlotMsg(SlotMeta { - slot: 4, - parent: 3, - commitment_level: solana_sdk::commitment_config::CommitmentLevel::Processed, + let message_1 = ChannelMessage::Slot( + 3, + 2, + solana_sdk::commitment_config::CommitmentLevel::Confirmed, + ); + let message_2 = ChannelMessage::Account( + AccountData { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: 12345, + data: (0..100).map(|_| rand::random::()).collect_vec(), + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: u64::MAX, + }, + write_version: 1, + }, + 5, + false, + ); + + let message_3 = ChannelMessage::Account( + AccountData { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: 23456, + data: (0..10_000).map(|_| rand::random::()).collect_vec(), + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: u64::MAX, + }, + write_version: 1, + }, + 5, + false, + ); + + let message_4 = ChannelMessage::Account( + AccountData { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: 34567, + data: (0..1_000_000).map(|_| rand::random::()).collect_vec(), + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: u64::MAX, + }, + write_version: 1, + }, + 5, + false, + ); + + let message_5 = ChannelMessage::Account( + AccountData { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: 45678, + data: (0..10_000_000).map(|_| rand::random::()).collect_vec(), + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: u64::MAX, + }, + write_version: 1, + }, + 5, + false, + ); + + // server loop + let (server_send_queue, rx_sent_queue) = mio_channel::channel::(); + let _server_loop_jh = std::thread::spawn(move || { + let config = configure_server(100, 20_000_000, 1).unwrap(); + if let Err(e) = server_loop( + config, + socket_addr, + rx_sent_queue, + CompressionType::Lz4Fast(8), + ) { + println!("Server loop closed by error : {e}"); + } }); - let jh = { - let message_1 = message_1.clone(); - let message_2 = message_2.clone(); - let message_3 = message_3.clone(); - let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); - let (sx_sent_queue, rx_sent_queue) = mio_channel::channel(); - let (sx_recv_queue, rx_recv_queue) = mpsc::channel(); - std::thread::spawn(move || { - let jh = std::thread::spawn(move || { - let client_config = configure_client(1, 12_000_000, 10).unwrap(); - let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); - if let Err(e) = client_loop( - client_config, - socket_addr, - server_addr, - rx_sent_queue, - sx_recv_queue, - ) { - println!("client stopped with error {e}"); - } - }); - sx_sent_queue.send(message_1).unwrap(); - let rx_message = rx_recv_queue.recv().unwrap(); - assert_eq!(rx_message, message_2); - println!("verified second message"); - sx_sent_queue.send(message_3).unwrap(); - let rx_message = rx_recv_queue.recv().unwrap(); - assert_eq!(rx_message, message_2); - println!("verified fourth message"); - jh.join().unwrap(); + // client loop + let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port); + let (client_sx_queue, rx_sent_queue) = mio_channel::channel(); + let (sx_recv_queue, client_rx_queue) = mpsc::channel(); + + let _client_loop_jh = std::thread::spawn(move || { + let client_config = configure_client(100, 20_000_000, 1).unwrap(); + let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); + if let Err(e) = client_loop( + client_config, + socket_addr, + server_addr, + rx_sent_queue, + sx_recv_queue, + ) { + println!("client stopped with error {e}"); + } + }); + client_sx_queue + .send(Message::Filters(vec![ + Filter::AccountsAll, + Filter::TransactionsAll, + Filter::Slot, + ])) + .unwrap(); + sleep(Duration::from_millis(100)); + server_send_queue.send(message_1.clone()).unwrap(); + server_send_queue.send(message_2.clone()).unwrap(); + server_send_queue.send(message_3.clone()).unwrap(); + sleep(Duration::from_millis(100)); + server_send_queue.send(message_4.clone()).unwrap(); + server_send_queue.send(message_5.clone()).unwrap(); + sleep(Duration::from_millis(100)); + + let message_rx_1 = client_rx_queue.recv().unwrap(); + assert_eq!( + message_rx_1, + Message::SlotMsg(SlotMeta { + slot: 3, + parent: 2, + commitment_level: solana_sdk::commitment_config::CommitmentLevel::Confirmed }) + ); + + let message_rx_2 = client_rx_queue.recv().unwrap(); + + let ChannelMessage::Account(account, slot, _) = &message_2 else { + panic!("message should be account"); }; + let Message::AccountMsg(message_rx_2) = message_rx_2 else { + panic!("message should be account"); + }; + let message_account = message_rx_2.solana_account(); + assert_eq!(account.pubkey, message_rx_2.pubkey); + assert_eq!(account.account, message_account); + assert_eq!(message_rx_2.slot_identifier.slot, *slot); - loop { - let mut buf = [0; 65535]; - let mut out = [0; MAX_DATAGRAM_SIZE]; + let message_rx_3 = client_rx_queue.recv().unwrap(); - let (len, from) = match socket.recv_from(&mut buf) { - Ok(v) => v, - Err(e) => { - panic!("recv() failed: {:?}", e); - } - }; - println!("recieved first packet"); + let ChannelMessage::Account(account, slot, _) = &message_3 else { + panic!("message should be account"); + }; + let Message::AccountMsg(message_rx_3) = message_rx_3 else { + panic!("message should be account"); + }; + let message_account = message_rx_3.solana_account(); + assert_eq!(account.pubkey, message_rx_3.pubkey); + assert_eq!(account.account, message_account); + assert_eq!(message_rx_3.slot_identifier.slot, *slot); - log::debug!("got {} bytes", len); + let message_rx_4 = client_rx_queue.recv().unwrap(); + let ChannelMessage::Account(account, slot, _) = &message_4 else { + panic!("message should be account"); + }; + let Message::AccountMsg(message_rx_4) = message_rx_4 else { + panic!("message should be account"); + }; + let message_account = message_rx_4.solana_account(); + assert_eq!(account.pubkey, message_rx_4.pubkey); + assert_eq!(account.account, message_account); + assert_eq!(message_rx_4.slot_identifier.slot, *slot); - let pkt_buf = &mut buf[..len]; - - // Parse the QUIC packet's header. - let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) { - Ok(header) => header, - - Err(e) => { - panic!("Parsing packet header failed: {:?}", e); - } - }; - let rng = SystemRandom::new(); - let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap(); - let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid); - let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN]; - let conn_id: ConnectionId<'static> = conn_id.to_vec().into(); - - if hdr.ty != quiche::Type::Initial { - panic!("Packet is not Initial"); - } - - if !quiche::version_is_supported(hdr.version) { - log::warn!("Doing version negotiation"); - - let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap(); - - let out = &out[..len]; - - if let Err(e) = socket.send_to(out, from) { - panic!("send() failed: {:?}", e); - } - } - - let mut scid = [0; quiche::MAX_CONN_ID_LEN]; - scid.copy_from_slice(&conn_id); - - let scid = quiche::ConnectionId::from_ref(&scid); - - // Token is always present in Initial packets. - let token = hdr.token.as_ref().unwrap(); - - println!("token: {}", token.iter().map(|x| x.to_string()).join(", ")); - - // Do stateless retry if the client didn't send a token. - if token.is_empty() { - log::warn!("Doing stateless retry"); - - let new_token = mint_token(&hdr, &from); - - let len = quiche::retry( - &hdr.scid, - &hdr.dcid, - &scid, - &new_token, - hdr.version, - &mut out, - ) - .unwrap(); - - let out = &out[..len]; - - if let Err(e) = socket.send_to(out, from) { - panic!("send() failed: {:?}", e); - } else { - continue; - } - } - let odcid = validate_token(&from, token); - // The token was not valid, meaning the retry failed, so - // drop the packet. - if odcid.is_none() { - panic!("Invalid address validation token"); - } - - if scid.len() != hdr.dcid.len() { - panic!("Invalid destination connection ID"); - } - - // Reuse the source connection ID we sent in the Retry packet, - // instead of changing it again. - let scid = hdr.dcid.clone(); - - log::debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid); - - let mut conn = - quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config).unwrap(); - - let r_m = recv_message(&mut conn, 1).unwrap(); - assert_eq!(r_m, message_1); - println!("verified first message"); - - send_message(&mut conn, 1, &message_2).unwrap(); - - let r_m = recv_message(&mut conn, 2).unwrap(); - assert_eq!(r_m, message_3); - println!("verified third message"); - - send_message(&mut conn, 2, &message_2).unwrap(); - conn.close(true, 0, b"not required").unwrap(); - jh.join().unwrap(); - break; - } + let message_rx_5 = client_rx_queue.recv().unwrap(); + let ChannelMessage::Account(account, slot, _) = &message_5 else { + panic!("message should be account"); + }; + let Message::AccountMsg(message_rx_5) = message_rx_5 else { + panic!("message should be account"); + }; + let message_account = message_rx_5.solana_account(); + assert_eq!(account.pubkey, message_rx_5.pubkey); + assert_eq!(account.account, message_account); + assert_eq!(message_rx_5.slot_identifier.slot, *slot); } } diff --git a/common/src/quic/quiche_reciever.rs b/common/src/quic/quiche_reciever.rs index 315968c..22e29a9 100644 --- a/common/src/quic/quiche_reciever.rs +++ b/common/src/quic/quiche_reciever.rs @@ -17,13 +17,15 @@ pub fn recv_message( let mut buf = [0; MAX_DATAGRAM_SIZE]; // 10kk buffer size match connection.stream_recv(stream_id, &mut buf) { Ok((read, fin)) => { - total_buf.append(&mut buf[0..read].to_vec()); + log::debug!("read {} on stream {}", read, stream_id); + total_buf.extend_from_slice(&buf[..read]); if fin { + log::debug!("fin stream : {}", stream_id); return Ok(bincode::deserialize::(&total_buf)?); } } - Err(_) => { - bail!("Fail to read from stream {stream_id}"); + Err(e) => { + bail!("Fail to read from stream {stream_id} : error : {e}"); } } } diff --git a/common/src/quic/quiche_server_loop.rs b/common/src/quic/quiche_server_loop.rs index 4a647a3..0de2a97 100644 --- a/common/src/quic/quiche_server_loop.rs +++ b/common/src/quic/quiche_server_loop.rs @@ -12,7 +12,7 @@ use crate::{ message::Message, quic::{ quiche_reciever::recv_message, - quiche_utils::{mint_token, validate_token}, + quiche_utils::{get_next_unidi, mint_token, validate_token}, }, types::{account::Account, block_meta::SlotMeta, slot_identifier::SlotIdentifier}, }; @@ -28,7 +28,7 @@ struct Client { pub conn: quiche::Connection, pub partial_responses: HashMap, pub filters: Vec, - pub last_sent_stream_id: u64, + pub next_stream: u64, } type ClientMap = HashMap, Client>; @@ -47,8 +47,11 @@ pub fn server_loop( let mut poll = mio::Poll::new()?; let mut events = mio::Events::with_capacity(1024); - poll.registry() - .register(&mut socket, mio::Token(0), mio::Interest::READABLE)?; + poll.registry().register( + &mut socket, + mio::Token(0), + mio::Interest::READABLE | mio::Interest::WRITABLE, + )?; poll.registry().register( &mut message_send_queue, @@ -62,11 +65,12 @@ pub fn server_loop( let mut clients = ClientMap::new(); loop { let timeout = clients.values().filter_map(|c| c.conn.timeout()).min(); + log::debug!("timeout : {}", timeout.unwrap_or_default().as_millis()); poll.poll(&mut events, timeout).unwrap(); - let network_updates = events.iter().any(|x| x.token().0 == 0); - let channel_updates = events.iter().any(|x| x.token().0 == 1); + let network_updates = true; + let channel_updates = true; if network_updates { 'read: loop { if events.is_empty() { @@ -139,7 +143,7 @@ pub fn server_loop( // Do stateless retry if the client didn't send a token. if token.is_empty() { - log::warn!("Doing stateless retry"); + log::debug!("Doing stateless retry"); let new_token = mint_token(&hdr, &from); @@ -189,7 +193,7 @@ pub fn server_loop( conn, partial_responses: HashMap::new(), filters: Vec::new(), - last_sent_stream_id: u64::MAX / 2, + next_stream: get_next_unidi(0, true), }; clients.insert(scid.clone(), client); clients @@ -234,8 +238,8 @@ pub fn server_loop( Message::Filters(mut filters) => { client.filters.append(&mut filters); } - Message::ConnectionParameters(_) => { - // ignore for now not needed + Message::AddStream(_) => { + // do nothing } _ => { log::error!("unknown message from the client"); @@ -254,7 +258,10 @@ pub fn server_loop( while let Ok(message) = message_send_queue.try_recv() { let dispatch_to = clients .iter_mut() - .filter(|(_, client)| client.filters.iter().any(|x| x.allows(&message))) + .filter(|(_, client)| { + client.conn.is_established() + && client.filters.iter().any(|x| x.allows(&message)) + }) .map(|x| x.1) .collect_vec(); if dispatch_to.len() > 0 { @@ -288,8 +295,13 @@ pub fn server_loop( let binary = convert_to_binary(&message) .expect("Message should be serializable in binary"); for client in dispatch_to { - client.last_sent_stream_id += 1; - let stream_id = client.last_sent_stream_id; + let stream_id = client.next_stream; + client.next_stream = get_next_unidi(stream_id, true); + log::debug!( + "dispatching {} on stream id : {}", + binary.len(), + stream_id + ); let written = match client.conn.stream_send(stream_id, &binary, true) { Ok(v) => v, @@ -304,6 +316,7 @@ pub fn server_loop( continue; } }; + log::debug!("dispatched {} on stream id : {}", written, stream_id); if written < binary.len() { let response = PartialResponse { diff --git a/common/src/quic/quiche_utils.rs b/common/src/quic/quiche_utils.rs index ac04918..af414a3 100644 --- a/common/src/quic/quiche_utils.rs +++ b/common/src/quic/quiche_utils.rs @@ -39,3 +39,29 @@ pub fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec { token } + +pub fn is_bidi(stream_id: u64) -> bool { + (stream_id & 0x2) == 0 +} + +pub fn get_next_bidi(current_stream_id: u64) -> u64 { + for stream_id in current_stream_id + 1.. { + if is_bidi(stream_id) { + return stream_id; + } + } + panic!("stream not found"); +} + +pub fn is_unidi(stream_id: u64, is_server: bool) -> bool { + (stream_id & 0x1) == (is_server as u64) +} + +pub fn get_next_unidi(current_stream_id: u64, is_server: bool) -> u64 { + for stream_id in current_stream_id + 1.. { + if is_unidi(stream_id, is_server) && !is_bidi(stream_id) { + return stream_id; + } + } + panic!("stream not found"); +} diff --git a/common/src/types/account.rs b/common/src/types/account.rs index df899ba..2c64c79 100644 --- a/common/src/types/account.rs +++ b/common/src/types/account.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use solana_sdk::{account::Account as SolanaAccount, clock::Slot, pubkey::Pubkey}; +use solana_sdk::{account::Account as SolanaAccount, pubkey::Pubkey}; use crate::compression::CompressionType; @@ -21,7 +21,10 @@ pub struct Account { } impl Account { - pub fn get_account_for_test(slot: Slot, data_size: usize) -> Self { + #[cfg(test)] + pub fn get_account_for_test(slot: u64, data_size: usize) -> Self { + use itertools::Itertools; + Account { slot_identifier: SlotIdentifier { slot }, pubkey: Pubkey::new_unique(), @@ -30,7 +33,7 @@ impl Account { lamports: 12345, rent_epoch: u64::MAX, executable: false, - data: vec![178; data_size], + data: (0..data_size).map(|_| rand::random::()).collect_vec(), compression_type: CompressionType::None, data_length: data_size as u64, } diff --git a/examples/tester-client/src/main.rs b/examples/tester-client/src/main.rs index 6a787af..8d77da7 100644 --- a/examples/tester-client/src/main.rs +++ b/examples/tester-client/src/main.rs @@ -183,7 +183,7 @@ async fn main() { quic_geyser_common::message::Message::Filters(_) => { // Not supported } - quic_geyser_common::message::Message::ConnectionParameters(_) => { + quic_geyser_common::message::Message::AddStream(_) => { // Not supported } }