Solving issues with quiche server and client, making tests work for small transfers

This commit is contained in:
godmodegalactus 2024-05-22 11:14:39 +02:00
parent 1a46617c64
commit b54e614de8
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
11 changed files with 333 additions and 204 deletions

69
Cargo.lock generated
View File

@ -2031,6 +2031,16 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]] [[package]]
name = "num" name = "num"
version = "0.2.1" version = "0.2.1"
@ -2239,6 +2249,12 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.12.2" version = "0.12.2"
@ -2464,6 +2480,7 @@ dependencies = [
"mio_channel", "mio_channel",
"pkcs8", "pkcs8",
"quiche", "quiche",
"rand 0.8.5",
"rcgen", "rcgen",
"ring 0.17.8", "ring 0.17.8",
"rustls", "rustls",
@ -2472,6 +2489,7 @@ dependencies = [
"solana-transaction-status", "solana-transaction-status",
"thiserror", "thiserror",
"tokio", "tokio",
"tracing-subscriber",
] ]
[[package]] [[package]]
@ -3025,6 +3043,15 @@ dependencies = [
"keccak", "keccak",
] ]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]] [[package]]
name = "shlex" name = "shlex"
version = "1.3.0" version = "1.3.0"
@ -3833,6 +3860,16 @@ dependencies = [
"syn 2.0.63", "syn 2.0.63",
] ]
[[package]]
name = "thread_local"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.3.36" version = "0.3.36"
@ -4030,6 +4067,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54"
dependencies = [ dependencies = [
"once_cell", "once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [
"nu-ansi-term",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
] ]
[[package]] [[package]]
@ -4129,6 +4192,12 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]] [[package]]
name = "vergen" name = "vergen"
version = "8.3.1" version = "8.3.1"

View File

@ -48,13 +48,14 @@ async-stream = "0.3.5"
mio = "0.8.11" mio = "0.8.11"
mio_channel = "0.1.3" mio_channel = "0.1.3"
quiche = "0.21.0" quiche = "=0.21.0"
boring = "4.6.0" boring = "4.6.0"
ring = "0.17.8" ring = "0.17.8"
cargo-lock = "9.0.0" cargo-lock = "9.0.0"
git-version = "0.3.5" git-version = "0.3.5"
vergen = "8.2.1" vergen = "8.2.1"
rand = "0.8.5"
quic-geyser-common = {path = "common", version="0.1.0"} quic-geyser-common = {path = "common", version="0.1.0"}
quic-geyser-client = {path = "client", version="0.1.0"} quic-geyser-client = {path = "client", version="0.1.0"}

View File

@ -24,4 +24,6 @@ mio = { workspace = true, features = ["net", "os-poll"] }
mio_channel = { workspace = true } mio_channel = { workspace = true }
ring = {workspace = true} ring = {workspace = true}
[dev-dependencies] [dev-dependencies]
rand = { workspace = true }
tracing-subscriber = { workspace = true }

View File

@ -4,14 +4,14 @@ use solana_sdk::{
use crate::types::{block_meta::BlockMeta, transaction::Transaction}; use crate::types::{block_meta::BlockMeta, transaction::Transaction};
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccountData { pub struct AccountData {
pub pubkey: Pubkey, pub pubkey: Pubkey,
pub account: Account, pub account: Account,
pub write_version: u64, pub write_version: u64,
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelMessage { pub enum ChannelMessage {
Account(AccountData, Slot, bool), Account(AccountData, Slot, bool),
Slot(u64, u64, CommitmentLevel), Slot(u64, u64, CommitmentLevel),

View File

@ -5,7 +5,6 @@ use crate::{
types::{ types::{
account::Account, account::Account,
block_meta::{BlockMeta, SlotMeta}, block_meta::{BlockMeta, SlotMeta},
connections_parameters::ConnectionParameters,
transaction::Transaction, transaction::Transaction,
}, },
}; };
@ -21,5 +20,5 @@ pub enum Message {
BlockMetaMsg(BlockMeta), BlockMetaMsg(BlockMeta),
TransactionMsg(Box<Transaction>), TransactionMsg(Box<Transaction>),
Filters(Vec<Filter>), // sent from client to server Filters(Vec<Filter>), // sent from client to server
ConnectionParameters(ConnectionParameters), AddStream(u64),
} }

View File

@ -4,7 +4,7 @@ use crate::{
message::Message, message::Message,
quic::{ quic::{
configure_server::MAX_DATAGRAM_SIZE, quiche_reciever::recv_message, configure_server::MAX_DATAGRAM_SIZE, quiche_reciever::recv_message,
quiche_sender::send_message, quiche_sender::send_message, quiche_utils::get_next_unidi,
}, },
}; };
use anyhow::bail; use anyhow::bail;
@ -49,7 +49,7 @@ pub fn client_loop(
bail!("send() failed: {:?}", e); bail!("send() failed: {:?}", e);
} }
let mut stream_send_id = 0; let mut current_stream_id = 3;
let mut buf = [0; 65535]; let mut buf = [0; 65535];
let mut out = [0; MAX_DATAGRAM_SIZE]; let mut out = [0; MAX_DATAGRAM_SIZE];
@ -66,8 +66,8 @@ pub fn client_loop(
break; break;
} }
let network_updates = events.iter().any(|x| x.token().0 == 0); let network_updates = true;
let channel_updates = events.iter().any(|x| x.token().0 == 1); let channel_updates = true;
if network_updates { if network_updates {
'read: loop { 'read: loop {
@ -114,7 +114,7 @@ pub fn client_loop(
let message = recv_message(&mut conn, stream); let message = recv_message(&mut conn, stream);
match message { match message {
Ok(message) => { Ok(message) => {
message_recv_queue.send(message)?; message_recv_queue.send(message).unwrap();
} }
Err(e) => { Err(e) => {
log::error!("Error recieving message : {e}") log::error!("Error recieving message : {e}")
@ -122,13 +122,15 @@ pub fn client_loop(
} }
} }
} }
// chanel updates // chanel updates
if channel_updates { if channel_updates && conn.is_established() {
// channel events // channel events
let message_to_send = message_send_queue.try_recv()?; if let Ok(message_to_send) = message_send_queue.try_recv() {
stream_send_id += 1; current_stream_id = get_next_unidi(current_stream_id, false);
if let Err(e) = send_message(&mut conn, stream_send_id, &message_to_send) { if let Err(e) = send_message(&mut conn, current_stream_id, &message_to_send) {
log::error!("Error sending message on stream : {}", e); log::error!("Error sending message on stream : {}", e);
}
} }
} }
@ -171,198 +173,210 @@ mod tests {
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, SocketAddr},
str::FromStr, str::FromStr,
sync::mpsc, sync::mpsc,
thread::sleep,
time::Duration,
}; };
use itertools::Itertools; use itertools::Itertools;
use quiche::ConnectionId; use solana_sdk::{account::Account, pubkey::Pubkey};
use ring::rand::SystemRandom;
use std::net::UdpSocket;
use crate::{ use crate::{
channel_message::{AccountData, ChannelMessage},
compression::CompressionType,
filters::Filter,
message::Message, message::Message,
quic::{ quic::{
configure_client::configure_client, configure_client::configure_client, configure_server::configure_server,
configure_server::{configure_server, MAX_DATAGRAM_SIZE}, quiche_server_loop::server_loop,
quiche_reciever::recv_message,
quiche_sender::send_message,
quiche_utils::{mint_token, validate_token},
}, },
types::{account::Account, block_meta::SlotMeta}, types::block_meta::SlotMeta,
}; };
use super::client_loop; use super::client_loop;
#[test] #[test]
fn test_send_and_recieve_of_large_account_with_client_loop() { fn test_send_and_recieve_of_large_account_with_client_loop() {
let mut config = configure_server(1, 100000, 1).unwrap(); tracing_subscriber::fmt::init();
// Setup the event loop. // Setup the event loop.
let socket_addr = SocketAddr::from_str("0.0.0.0:0").unwrap(); let socket_addr = SocketAddr::from_str("0.0.0.0:10900").unwrap();
let socket = UdpSocket::bind(socket_addr).unwrap();
let port = socket.local_addr().unwrap().port(); let port = 10900;
let local_addr = socket.local_addr().unwrap();
let account = Account::get_account_for_test(123456, 10_000_000); let message_1 = ChannelMessage::Slot(
let message_1 = Message::SlotMsg(SlotMeta { 3,
slot: 1, 2,
parent: 0, solana_sdk::commitment_config::CommitmentLevel::Confirmed,
commitment_level: solana_sdk::commitment_config::CommitmentLevel::Confirmed, );
}); let message_2 = ChannelMessage::Account(
let message_2 = Message::AccountMsg(account); AccountData {
let message_3 = Message::SlotMsg(SlotMeta { pubkey: Pubkey::new_unique(),
slot: 4, account: Account {
parent: 3, lamports: 12345,
commitment_level: solana_sdk::commitment_config::CommitmentLevel::Processed, data: (0..100).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
false,
);
let message_3 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
account: Account {
lamports: 23456,
data: (0..10_000).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
false,
);
let message_4 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
account: Account {
lamports: 34567,
data: (0..1_000_000).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
false,
);
let message_5 = ChannelMessage::Account(
AccountData {
pubkey: Pubkey::new_unique(),
account: Account {
lamports: 45678,
data: (0..10_000_000).map(|_| rand::random::<u8>()).collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version: 1,
},
5,
false,
);
// server loop
let (server_send_queue, rx_sent_queue) = mio_channel::channel::<ChannelMessage>();
let _server_loop_jh = std::thread::spawn(move || {
let config = configure_server(100, 20_000_000, 1).unwrap();
if let Err(e) = server_loop(
config,
socket_addr,
rx_sent_queue,
CompressionType::Lz4Fast(8),
) {
println!("Server loop closed by error : {e}");
}
}); });
let jh = { // client loop
let message_1 = message_1.clone(); let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let message_2 = message_2.clone(); let (client_sx_queue, rx_sent_queue) = mio_channel::channel();
let message_3 = message_3.clone(); let (sx_recv_queue, client_rx_queue) = mpsc::channel();
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let (sx_sent_queue, rx_sent_queue) = mio_channel::channel(); let _client_loop_jh = std::thread::spawn(move || {
let (sx_recv_queue, rx_recv_queue) = mpsc::channel(); let client_config = configure_client(100, 20_000_000, 1).unwrap();
std::thread::spawn(move || { let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
let jh = std::thread::spawn(move || { if let Err(e) = client_loop(
let client_config = configure_client(1, 12_000_000, 10).unwrap(); client_config,
let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap(); socket_addr,
if let Err(e) = client_loop( server_addr,
client_config, rx_sent_queue,
socket_addr, sx_recv_queue,
server_addr, ) {
rx_sent_queue, println!("client stopped with error {e}");
sx_recv_queue, }
) { });
println!("client stopped with error {e}"); client_sx_queue
} .send(Message::Filters(vec![
}); Filter::AccountsAll,
sx_sent_queue.send(message_1).unwrap(); Filter::TransactionsAll,
let rx_message = rx_recv_queue.recv().unwrap(); Filter::Slot,
assert_eq!(rx_message, message_2); ]))
println!("verified second message"); .unwrap();
sx_sent_queue.send(message_3).unwrap(); sleep(Duration::from_millis(100));
let rx_message = rx_recv_queue.recv().unwrap(); server_send_queue.send(message_1.clone()).unwrap();
assert_eq!(rx_message, message_2); server_send_queue.send(message_2.clone()).unwrap();
println!("verified fourth message"); server_send_queue.send(message_3.clone()).unwrap();
jh.join().unwrap(); sleep(Duration::from_millis(100));
server_send_queue.send(message_4.clone()).unwrap();
server_send_queue.send(message_5.clone()).unwrap();
sleep(Duration::from_millis(100));
let message_rx_1 = client_rx_queue.recv().unwrap();
assert_eq!(
message_rx_1,
Message::SlotMsg(SlotMeta {
slot: 3,
parent: 2,
commitment_level: solana_sdk::commitment_config::CommitmentLevel::Confirmed
}) })
);
let message_rx_2 = client_rx_queue.recv().unwrap();
let ChannelMessage::Account(account, slot, _) = &message_2 else {
panic!("message should be account");
}; };
let Message::AccountMsg(message_rx_2) = message_rx_2 else {
panic!("message should be account");
};
let message_account = message_rx_2.solana_account();
assert_eq!(account.pubkey, message_rx_2.pubkey);
assert_eq!(account.account, message_account);
assert_eq!(message_rx_2.slot_identifier.slot, *slot);
loop { let message_rx_3 = client_rx_queue.recv().unwrap();
let mut buf = [0; 65535];
let mut out = [0; MAX_DATAGRAM_SIZE];
let (len, from) = match socket.recv_from(&mut buf) { let ChannelMessage::Account(account, slot, _) = &message_3 else {
Ok(v) => v, panic!("message should be account");
Err(e) => { };
panic!("recv() failed: {:?}", e); let Message::AccountMsg(message_rx_3) = message_rx_3 else {
} panic!("message should be account");
}; };
println!("recieved first packet"); let message_account = message_rx_3.solana_account();
assert_eq!(account.pubkey, message_rx_3.pubkey);
assert_eq!(account.account, message_account);
assert_eq!(message_rx_3.slot_identifier.slot, *slot);
log::debug!("got {} bytes", len); let message_rx_4 = client_rx_queue.recv().unwrap();
let ChannelMessage::Account(account, slot, _) = &message_4 else {
panic!("message should be account");
};
let Message::AccountMsg(message_rx_4) = message_rx_4 else {
panic!("message should be account");
};
let message_account = message_rx_4.solana_account();
assert_eq!(account.pubkey, message_rx_4.pubkey);
assert_eq!(account.account, message_account);
assert_eq!(message_rx_4.slot_identifier.slot, *slot);
let pkt_buf = &mut buf[..len]; let message_rx_5 = client_rx_queue.recv().unwrap();
let ChannelMessage::Account(account, slot, _) = &message_5 else {
// Parse the QUIC packet's header. panic!("message should be account");
let hdr = match quiche::Header::from_slice(pkt_buf, quiche::MAX_CONN_ID_LEN) { };
Ok(header) => header, let Message::AccountMsg(message_rx_5) = message_rx_5 else {
panic!("message should be account");
Err(e) => { };
panic!("Parsing packet header failed: {:?}", e); let message_account = message_rx_5.solana_account();
} assert_eq!(account.pubkey, message_rx_5.pubkey);
}; assert_eq!(account.account, message_account);
let rng = SystemRandom::new(); assert_eq!(message_rx_5.slot_identifier.slot, *slot);
let conn_id_seed = ring::hmac::Key::generate(ring::hmac::HMAC_SHA256, &rng).unwrap();
let conn_id = ring::hmac::sign(&conn_id_seed, &hdr.dcid);
let conn_id = &conn_id.as_ref()[..quiche::MAX_CONN_ID_LEN];
let conn_id: ConnectionId<'static> = conn_id.to_vec().into();
if hdr.ty != quiche::Type::Initial {
panic!("Packet is not Initial");
}
if !quiche::version_is_supported(hdr.version) {
log::warn!("Doing version negotiation");
let len = quiche::negotiate_version(&hdr.scid, &hdr.dcid, &mut out).unwrap();
let out = &out[..len];
if let Err(e) = socket.send_to(out, from) {
panic!("send() failed: {:?}", e);
}
}
let mut scid = [0; quiche::MAX_CONN_ID_LEN];
scid.copy_from_slice(&conn_id);
let scid = quiche::ConnectionId::from_ref(&scid);
// Token is always present in Initial packets.
let token = hdr.token.as_ref().unwrap();
println!("token: {}", token.iter().map(|x| x.to_string()).join(", "));
// Do stateless retry if the client didn't send a token.
if token.is_empty() {
log::warn!("Doing stateless retry");
let new_token = mint_token(&hdr, &from);
let len = quiche::retry(
&hdr.scid,
&hdr.dcid,
&scid,
&new_token,
hdr.version,
&mut out,
)
.unwrap();
let out = &out[..len];
if let Err(e) = socket.send_to(out, from) {
panic!("send() failed: {:?}", e);
} else {
continue;
}
}
let odcid = validate_token(&from, token);
// The token was not valid, meaning the retry failed, so
// drop the packet.
if odcid.is_none() {
panic!("Invalid address validation token");
}
if scid.len() != hdr.dcid.len() {
panic!("Invalid destination connection ID");
}
// Reuse the source connection ID we sent in the Retry packet,
// instead of changing it again.
let scid = hdr.dcid.clone();
log::debug!("New connection: dcid={:?} scid={:?}", hdr.dcid, scid);
let mut conn =
quiche::accept(&scid, odcid.as_ref(), local_addr, from, &mut config).unwrap();
let r_m = recv_message(&mut conn, 1).unwrap();
assert_eq!(r_m, message_1);
println!("verified first message");
send_message(&mut conn, 1, &message_2).unwrap();
let r_m = recv_message(&mut conn, 2).unwrap();
assert_eq!(r_m, message_3);
println!("verified third message");
send_message(&mut conn, 2, &message_2).unwrap();
conn.close(true, 0, b"not required").unwrap();
jh.join().unwrap();
break;
}
} }
} }

View File

@ -17,13 +17,15 @@ pub fn recv_message(
let mut buf = [0; MAX_DATAGRAM_SIZE]; // 10kk buffer size let mut buf = [0; MAX_DATAGRAM_SIZE]; // 10kk buffer size
match connection.stream_recv(stream_id, &mut buf) { match connection.stream_recv(stream_id, &mut buf) {
Ok((read, fin)) => { Ok((read, fin)) => {
total_buf.append(&mut buf[0..read].to_vec()); log::debug!("read {} on stream {}", read, stream_id);
total_buf.extend_from_slice(&buf[..read]);
if fin { if fin {
log::debug!("fin stream : {}", stream_id);
return Ok(bincode::deserialize::<Message>(&total_buf)?); return Ok(bincode::deserialize::<Message>(&total_buf)?);
} }
} }
Err(_) => { Err(e) => {
bail!("Fail to read from stream {stream_id}"); bail!("Fail to read from stream {stream_id} : error : {e}");
} }
} }
} }

View File

@ -12,7 +12,7 @@ use crate::{
message::Message, message::Message,
quic::{ quic::{
quiche_reciever::recv_message, quiche_reciever::recv_message,
quiche_utils::{mint_token, validate_token}, quiche_utils::{get_next_unidi, mint_token, validate_token},
}, },
types::{account::Account, block_meta::SlotMeta, slot_identifier::SlotIdentifier}, types::{account::Account, block_meta::SlotMeta, slot_identifier::SlotIdentifier},
}; };
@ -28,7 +28,7 @@ struct Client {
pub conn: quiche::Connection, pub conn: quiche::Connection,
pub partial_responses: HashMap<u64, PartialResponse>, pub partial_responses: HashMap<u64, PartialResponse>,
pub filters: Vec<Filter>, pub filters: Vec<Filter>,
pub last_sent_stream_id: u64, pub next_stream: u64,
} }
type ClientMap = HashMap<quiche::ConnectionId<'static>, Client>; type ClientMap = HashMap<quiche::ConnectionId<'static>, Client>;
@ -47,8 +47,11 @@ pub fn server_loop(
let mut poll = mio::Poll::new()?; let mut poll = mio::Poll::new()?;
let mut events = mio::Events::with_capacity(1024); let mut events = mio::Events::with_capacity(1024);
poll.registry() poll.registry().register(
.register(&mut socket, mio::Token(0), mio::Interest::READABLE)?; &mut socket,
mio::Token(0),
mio::Interest::READABLE | mio::Interest::WRITABLE,
)?;
poll.registry().register( poll.registry().register(
&mut message_send_queue, &mut message_send_queue,
@ -62,11 +65,12 @@ pub fn server_loop(
let mut clients = ClientMap::new(); let mut clients = ClientMap::new();
loop { loop {
let timeout = clients.values().filter_map(|c| c.conn.timeout()).min(); let timeout = clients.values().filter_map(|c| c.conn.timeout()).min();
log::debug!("timeout : {}", timeout.unwrap_or_default().as_millis());
poll.poll(&mut events, timeout).unwrap(); poll.poll(&mut events, timeout).unwrap();
let network_updates = events.iter().any(|x| x.token().0 == 0); let network_updates = true;
let channel_updates = events.iter().any(|x| x.token().0 == 1); let channel_updates = true;
if network_updates { if network_updates {
'read: loop { 'read: loop {
if events.is_empty() { if events.is_empty() {
@ -139,7 +143,7 @@ pub fn server_loop(
// Do stateless retry if the client didn't send a token. // Do stateless retry if the client didn't send a token.
if token.is_empty() { if token.is_empty() {
log::warn!("Doing stateless retry"); log::debug!("Doing stateless retry");
let new_token = mint_token(&hdr, &from); let new_token = mint_token(&hdr, &from);
@ -189,7 +193,7 @@ pub fn server_loop(
conn, conn,
partial_responses: HashMap::new(), partial_responses: HashMap::new(),
filters: Vec::new(), filters: Vec::new(),
last_sent_stream_id: u64::MAX / 2, next_stream: get_next_unidi(0, true),
}; };
clients.insert(scid.clone(), client); clients.insert(scid.clone(), client);
clients clients
@ -234,8 +238,8 @@ pub fn server_loop(
Message::Filters(mut filters) => { Message::Filters(mut filters) => {
client.filters.append(&mut filters); client.filters.append(&mut filters);
} }
Message::ConnectionParameters(_) => { Message::AddStream(_) => {
// ignore for now not needed // do nothing
} }
_ => { _ => {
log::error!("unknown message from the client"); log::error!("unknown message from the client");
@ -254,7 +258,10 @@ pub fn server_loop(
while let Ok(message) = message_send_queue.try_recv() { while let Ok(message) = message_send_queue.try_recv() {
let dispatch_to = clients let dispatch_to = clients
.iter_mut() .iter_mut()
.filter(|(_, client)| client.filters.iter().any(|x| x.allows(&message))) .filter(|(_, client)| {
client.conn.is_established()
&& client.filters.iter().any(|x| x.allows(&message))
})
.map(|x| x.1) .map(|x| x.1)
.collect_vec(); .collect_vec();
if dispatch_to.len() > 0 { if dispatch_to.len() > 0 {
@ -288,8 +295,13 @@ pub fn server_loop(
let binary = convert_to_binary(&message) let binary = convert_to_binary(&message)
.expect("Message should be serializable in binary"); .expect("Message should be serializable in binary");
for client in dispatch_to { for client in dispatch_to {
client.last_sent_stream_id += 1; let stream_id = client.next_stream;
let stream_id = client.last_sent_stream_id; client.next_stream = get_next_unidi(stream_id, true);
log::debug!(
"dispatching {} on stream id : {}",
binary.len(),
stream_id
);
let written = match client.conn.stream_send(stream_id, &binary, true) { let written = match client.conn.stream_send(stream_id, &binary, true) {
Ok(v) => v, Ok(v) => v,
@ -304,6 +316,7 @@ pub fn server_loop(
continue; continue;
} }
}; };
log::debug!("dispatched {} on stream id : {}", written, stream_id);
if written < binary.len() { if written < binary.len() {
let response = PartialResponse { let response = PartialResponse {

View File

@ -39,3 +39,29 @@ pub fn mint_token(hdr: &quiche::Header, src: &std::net::SocketAddr) -> Vec<u8> {
token token
} }
pub fn is_bidi(stream_id: u64) -> bool {
(stream_id & 0x2) == 0
}
pub fn get_next_bidi(current_stream_id: u64) -> u64 {
for stream_id in current_stream_id + 1.. {
if is_bidi(stream_id) {
return stream_id;
}
}
panic!("stream not found");
}
pub fn is_unidi(stream_id: u64, is_server: bool) -> bool {
(stream_id & 0x1) == (is_server as u64)
}
pub fn get_next_unidi(current_stream_id: u64, is_server: bool) -> u64 {
for stream_id in current_stream_id + 1.. {
if is_unidi(stream_id, is_server) && !is_bidi(stream_id) {
return stream_id;
}
}
panic!("stream not found");
}

View File

@ -1,5 +1,5 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use solana_sdk::{account::Account as SolanaAccount, clock::Slot, pubkey::Pubkey}; use solana_sdk::{account::Account as SolanaAccount, pubkey::Pubkey};
use crate::compression::CompressionType; use crate::compression::CompressionType;
@ -21,7 +21,10 @@ pub struct Account {
} }
impl Account { impl Account {
pub fn get_account_for_test(slot: Slot, data_size: usize) -> Self { #[cfg(test)]
pub fn get_account_for_test(slot: u64, data_size: usize) -> Self {
use itertools::Itertools;
Account { Account {
slot_identifier: SlotIdentifier { slot }, slot_identifier: SlotIdentifier { slot },
pubkey: Pubkey::new_unique(), pubkey: Pubkey::new_unique(),
@ -30,7 +33,7 @@ impl Account {
lamports: 12345, lamports: 12345,
rent_epoch: u64::MAX, rent_epoch: u64::MAX,
executable: false, executable: false,
data: vec![178; data_size], data: (0..data_size).map(|_| rand::random::<u8>()).collect_vec(),
compression_type: CompressionType::None, compression_type: CompressionType::None,
data_length: data_size as u64, data_length: data_size as u64,
} }

View File

@ -183,7 +183,7 @@ async fn main() {
quic_geyser_common::message::Message::Filters(_) => { quic_geyser_common::message::Message::Filters(_) => {
// Not supported // Not supported
} }
quic_geyser_common::message::Message::ConnectionParameters(_) => { quic_geyser_common::message::Message::AddStream(_) => {
// Not supported // Not supported
} }
} }