From 24757c498b8584fcecee00d6f31f2a1ad6437907 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Wed, 29 May 2024 16:27:56 +0200 Subject: [PATCH] making client based on quinn instead of quiche --- Cargo.lock | 204 ++++++++++- Cargo.toml | 9 +- client/Cargo.toml | 12 +- client/src/blocking/client.rs | 338 +++++++++--------- client/src/lib.rs | 2 - client/src/non_blocking/client.rs | 212 ++++++++--- common/Cargo.toml | 14 +- common/src/config.rs | 10 +- common/src/lib.rs | 2 +- common/src/types/connections_parameters.rs | 6 +- examples/tester-client/Cargo.toml | 3 +- examples/tester-client/src/main.rs | 142 +++----- examples/tester-server/Cargo.toml | 3 +- examples/tester-server/src/main.rs | 2 +- plugin/Cargo.toml | 3 +- plugin/src/quic_plugin.rs | 2 +- proxy/Cargo.toml | 6 +- proxy/src/cli.rs | 6 +- proxy/src/main.rs | 28 +- server/Cargo.toml | 28 ++ .../quic => server/src}/configure_client.rs | 7 +- .../quic => server/src}/configure_server.rs | 8 +- common/src/quic/mod.rs => server/src/lib.rs | 0 .../src/quic => server/src}/quic_server.rs | 4 +- .../quic => server/src}/quiche_client_loop.rs | 24 +- .../quic => server/src}/quiche_reciever.rs | 4 +- .../src/quic => server/src}/quiche_sender.rs | 4 +- .../quic => server/src}/quiche_server_loop.rs | 17 +- .../src/quic => server/src}/quiche_utils.rs | 0 29 files changed, 689 insertions(+), 411 deletions(-) create mode 100644 server/Cargo.toml rename {common/src/quic => server/src}/configure_client.rs (83%) rename {common/src/quic => server/src}/configure_server.rs (93%) rename common/src/quic/mod.rs => server/src/lib.rs (100%) rename {common/src/quic => server/src}/quic_server.rs (95%) rename {common/src/quic => server/src}/quiche_client_loop.rs (97%) rename {common/src/quic => server/src}/quiche_reciever.rs (95%) rename {common/src/quic => server/src}/quiche_sender.rs (95%) rename {common/src/quic => server/src}/quiche_server_loop.rs (98%) rename {common/src/quic => server/src}/quiche_utils.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 6ca3337..2a8b729 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -755,9 +755,9 @@ dependencies = [ [[package]] name = "clang-sys" -version = "1.7.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" dependencies = [ "glob", "libc", @@ -867,6 +867,12 @@ dependencies = [ "web-sys", ] +[[package]] +name = "const-oid" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3" + [[package]] name = "constant_time_eq" version = "0.3.0" @@ -1025,6 +1031,15 @@ dependencies = [ "syn 2.0.65", ] +[[package]] +name = "der" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c" +dependencies = [ + "const-oid", +] + [[package]] name = "deranged" version = "0.3.11" @@ -2212,6 +2227,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "overload" version = "0.1.1" @@ -2307,6 +2328,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkcs8" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0" +dependencies = [ + "der", + "spki", +] + [[package]] name = "pkg-config" version = "0.3.30" @@ -2408,13 +2439,19 @@ dependencies = [ [[package]] name = "quic-geyser-client" -version = "0.1.0" +version = "0.1.3" dependencies = [ "anyhow", + "bincode", "itertools", "log", + "pkcs8", "quic-geyser-common", + "quic-geyser-server", + "quinn", "rand 0.8.5", + "rcgen", + "rustls", "solana-sdk", "tokio", "tracing-subscriber", @@ -2422,20 +2459,13 @@ dependencies = [ [[package]] name = "quic-geyser-common" -version = "0.1.0" +version = "0.1.3" dependencies = [ "anyhow", - "bincode", - "boring", "itertools", "log", "lz4", - "mio", - "mio_channel", - "quiche", "rand 0.8.5", - "rcgen", - "ring 0.17.8", "serde", "solana-sdk", "solana-transaction-status", @@ -2445,7 +2475,7 @@ dependencies = [ [[package]] name = "quic-geyser-plugin" -version = "0.1.0" +version = "0.1.3" dependencies = [ "agave-geyser-plugin-interface", "anyhow", @@ -2454,6 +2484,7 @@ dependencies = [ "git-version", "log", "quic-geyser-common", + "quic-geyser-server", "serde", "serde_json", "solana-logger", @@ -2464,7 +2495,7 @@ dependencies = [ [[package]] name = "quic-geyser-plugin-proxy" -version = "0.1.0" +version = "0.1.3" dependencies = [ "anyhow", "bincode", @@ -2472,10 +2503,35 @@ dependencies = [ "log", "quic-geyser-client", "quic-geyser-common", + "quic-geyser-server", "serde", "serde_json", "solana-rpc-client", "solana-sdk", + "tokio", + "tracing-subscriber", +] + +[[package]] +name = "quic-geyser-server" +version = "0.1.3" +dependencies = [ + "anyhow", + "bincode", + "boring", + "itertools", + "log", + "mio", + "mio_channel", + "quic-geyser-common", + "quiche", + "rand 0.8.5", + "rcgen", + "ring 0.17.8", + "serde", + "solana-sdk", + "solana-transaction-status", + "thiserror", "tracing-subscriber", ] @@ -2493,6 +2549,7 @@ dependencies = [ "serde_json", "solana-rpc-client", "solana-sdk", + "tokio", "tracing-subscriber", ] @@ -2506,6 +2563,7 @@ dependencies = [ "itertools", "log", "quic-geyser-common", + "quic-geyser-server", "rand 0.8.5", "serde", "serde_json", @@ -2535,6 +2593,54 @@ dependencies = [ "winapi", ] +[[package]] +name = "quinn" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cc2c5017e4b43d5995dcea317bc46c1e09404c0a9664d2908f7f02dfe943d75" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "141bf7dfde2fbc246bfd3fe12f2455aa24b0fbd9af535d8c86c7bd1381ff2b1a" +dependencies = [ + "bytes", + "rand 0.8.5", + "ring 0.16.20", + "rustc-hash", + "rustls", + "rustls-native-certs", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7" +dependencies = [ + "bytes", + "libc", + "socket2", + "tracing", + "windows-sys 0.48.0", +] + [[package]] name = "quote" version = "1.0.36" @@ -2790,16 +2896,28 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.12" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", - "ring 0.17.8", + "ring 0.16.20", "rustls-webpki", "sct", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -2831,6 +2949,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -2867,6 +2994,29 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "security-framework" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +dependencies = [ + "bitflags 2.5.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.23" @@ -3516,6 +3666,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spki" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27" +dependencies = [ + "der", +] + [[package]] name = "spl-associated-token-account" version = "2.3.0" @@ -4015,10 +4174,23 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.65", +] + [[package]] name = "tracing-core" version = "0.1.32" diff --git a/Cargo.toml b/Cargo.toml index d8acc2f..5603fe5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "plugin", "client", "common", + "server", "examples/tester-client", "examples/tester-server", "proxy", @@ -52,10 +53,12 @@ cargo-lock = "9.0.0" git-version = "0.3.5" vergen = "8.2.1" rand = "0.8.5" +tokio = "1.28.2" -quic-geyser-common = {path = "common", version="0.1.0"} -quic-geyser-client = {path = "client", version="0.1.0"} -quic-geyser-plugin = {path = "plugin", version="0.1.0"} +quic-geyser-common = {path = "common", version="0.1.3"} +quic-geyser-client = {path = "client", version="0.1.3"} +quic-geyser-plugin = {path = "plugin", version="0.1.3"} +quic-geyser-server = {path = "server", version="0.1.3"} [profile.release] debug = true diff --git a/client/Cargo.toml b/client/Cargo.toml index 66dd1a4..7cc134e 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quic-geyser-client" -version = "0.1.0" +version = "0.1.3" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -10,11 +10,17 @@ edition = "2021" solana-sdk = { workspace = true } anyhow = { workspace = true } log = { workspace = true } -tokio = "1.28.2" +quinn = "0.10.2" +rustls = "=0.21.7" +rcgen = "0.10.0" +pkcs8 = "0.8.0" quic-geyser-common = { workspace = true } +bincode = { workspace = true } +tokio = { workspace = true } [dev-dependencies] rand = { workspace = true } tracing-subscriber = { workspace = true } -itertools = { workspace = true } \ No newline at end of file +itertools = { workspace = true } +quic-geyser-server = { workspace = true } \ No newline at end of file diff --git a/client/src/blocking/client.rs b/client/src/blocking/client.rs index ccfd652..607c293 100644 --- a/client/src/blocking/client.rs +++ b/client/src/blocking/client.rs @@ -1,181 +1,183 @@ -use quic_geyser_common::filters::Filter; -use quic_geyser_common::message::Message; -use quic_geyser_common::quic::configure_client::configure_client; -use quic_geyser_common::quic::quiche_client_loop::client_loop; -use quic_geyser_common::types::connections_parameters::ConnectionParameters; -use std::net::SocketAddr; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; +// THIS CODE IS DEPENDENT ON QUICHE -pub struct Client { - is_connected: Arc, - filters_sender: std::sync::mpsc::Sender, -} +// use quic_geyser_common::filters::Filter; +// use quic_geyser_common::message::Message; +// use quic_geyser_common::quic::configure_client::configure_client; +// use quic_geyser_common::quic::quiche_client_loop::client_loop; +// use quic_geyser_common::types::connections_parameters::ConnectionParameters; +// use std::net::SocketAddr; +// use std::sync::atomic::AtomicBool; +// use std::sync::Arc; -impl Client { - pub fn new( - server_address: String, - connection_parameters: ConnectionParameters, - ) -> anyhow::Result<(Client, std::sync::mpsc::Receiver)> { - let config = configure_client( - connection_parameters.max_number_of_streams, - connection_parameters.recieve_window_size, - connection_parameters.timeout_in_seconds, - connection_parameters.max_ack_delay, - connection_parameters.ack_exponent, - )?; - let server_address: SocketAddr = server_address.parse()?; - let socket_addr: SocketAddr = "0.0.0.0:0" - .parse() - .expect("Socket address should be returned"); - let is_connected = Arc::new(AtomicBool::new(false)); - let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel(); - let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel(); +// pub struct Client { +// is_connected: Arc, +// filters_sender: std::sync::mpsc::Sender, +// } - let is_connected_client = is_connected.clone(); - let _client_loop_jh = std::thread::spawn(move || { - if let Err(e) = client_loop( - config, - socket_addr, - server_address, - rx_sent_queue, - sx_recv_queue, - is_connected_client.clone(), - ) { - log::error!("client stopped with error {e}"); - } - is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed); - }); - Ok(( - Client { - is_connected, - filters_sender, - }, - client_rx_queue, - )) - } +// impl Client { +// pub fn new( +// server_address: String, +// connection_parameters: ConnectionParameters, +// ) -> anyhow::Result<(Client, std::sync::mpsc::Receiver)> { +// let config = configure_client( +// connection_parameters.max_number_of_streams, +// connection_parameters.recieve_window_size, +// connection_parameters.timeout_in_seconds, +// connection_parameters.max_ack_delay, +// connection_parameters.ack_exponent, +// )?; +// let server_address: SocketAddr = server_address.parse()?; +// let socket_addr: SocketAddr = "0.0.0.0:0" +// .parse() +// .expect("Socket address should be returned"); +// let is_connected = Arc::new(AtomicBool::new(false)); +// let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel(); +// let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel(); - pub fn subscribe(&self, filters: Vec) -> anyhow::Result<()> { - let message = Message::Filters(filters); - self.filters_sender.send(message)?; - Ok(()) - } +// let is_connected_client = is_connected.clone(); +// let _client_loop_jh = std::thread::spawn(move || { +// if let Err(e) = client_loop( +// config, +// socket_addr, +// server_address, +// rx_sent_queue, +// sx_recv_queue, +// is_connected_client.clone(), +// ) { +// log::error!("client stopped with error {e}"); +// } +// is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed); +// }); +// Ok(( +// Client { +// is_connected, +// filters_sender, +// }, +// client_rx_queue, +// )) +// } - pub fn is_connected(&self) -> bool { - self.is_connected.load(std::sync::atomic::Ordering::Relaxed) - } -} +// pub fn subscribe(&self, filters: Vec) -> anyhow::Result<()> { +// let message = Message::Filters(filters); +// self.filters_sender.send(message)?; +// Ok(()) +// } -#[cfg(test)] -mod tests { - use itertools::Itertools; - use quic_geyser_common::{ - channel_message::AccountData, - compression::CompressionType, - config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, - filters::Filter, - message::Message, - quic::quic_server::QuicServer, - types::{ - account::Account, connections_parameters::ConnectionParameters, - slot_identifier::SlotIdentifier, - }, - }; - use solana_sdk::pubkey::Pubkey; - use std::{net::SocketAddr, thread::sleep, time::Duration}; +// pub fn is_connected(&self) -> bool { +// self.is_connected.load(std::sync::atomic::Ordering::Relaxed) +// } +// } - pub fn get_account_for_test(slot: u64, data_size: usize) -> Account { - Account { - slot_identifier: SlotIdentifier { slot }, - pubkey: Pubkey::new_unique(), - owner: Pubkey::new_unique(), - write_version: 0, - lamports: 12345, - rent_epoch: u64::MAX, - executable: false, - data: (0..data_size).map(|_| rand::random::()).collect_vec(), - compression_type: CompressionType::None, - data_length: data_size as u64, - } - } +// #[cfg(test)] +// mod tests { +// use itertools::Itertools; +// use quic_geyser_common::{ +// channel_message::AccountData, +// compression::CompressionType, +// config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, +// filters::Filter, +// message::Message, +// quic::quic_server::QuicServer, +// types::{ +// account::Account, connections_parameters::ConnectionParameters, +// slot_identifier::SlotIdentifier, +// }, +// }; +// use solana_sdk::pubkey::Pubkey; +// use std::{net::SocketAddr, thread::sleep, time::Duration}; - use crate::blocking::client::Client; +// pub fn get_account_for_test(slot: u64, data_size: usize) -> Account { +// Account { +// slot_identifier: SlotIdentifier { slot }, +// pubkey: Pubkey::new_unique(), +// owner: Pubkey::new_unique(), +// write_version: 0, +// lamports: 12345, +// rent_epoch: u64::MAX, +// executable: false, +// data: (0..data_size).map(|_| rand::random::()).collect_vec(), +// compression_type: CompressionType::None, +// data_length: data_size as u64, +// } +// } - #[test] - pub fn test_client() { - tracing_subscriber::fmt::init(); - let server_sock: SocketAddr = "0.0.0.0:30000".parse().unwrap(); - let url = format!("127.0.0.1:{}", server_sock.port()); +// use crate::blocking::client::Client; - let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2)); - let msg_acc_2 = Message::AccountMsg(get_account_for_test(1, 20)); - let msg_acc_3 = Message::AccountMsg(get_account_for_test(2, 100)); - let msg_acc_4 = Message::AccountMsg(get_account_for_test(3, 1_000)); - let msg_acc_5 = Message::AccountMsg(get_account_for_test(4, 10_000)); - let msgs = [msg_acc_1, msg_acc_2, msg_acc_3, msg_acc_4, msg_acc_5]; +// #[test] +// pub fn test_client() { +// tracing_subscriber::fmt::init(); +// let server_sock: SocketAddr = "0.0.0.0:30000".parse().unwrap(); +// let url = format!("127.0.0.1:{}", server_sock.port()); - let jh = { - let msgs = msgs.clone(); - let server_sock = server_sock.clone(); - std::thread::spawn(move || { - let config = ConfigQuicPlugin { - address: server_sock, - quic_parameters: QuicParameters::default(), - compression_parameters: CompressionParameters { - compression_type: CompressionType::None, - }, - number_of_retries: 100, - log_level: "debug".to_string(), - allow_accounts: true, - allow_accounts_at_startup: false, - }; - let quic_server = QuicServer::new(config).unwrap(); - // wait for client to connect and subscribe - sleep(Duration::from_secs(2)); - for msg in msgs { - let Message::AccountMsg(account) = msg else { - panic!("should never happen"); - }; - quic_server - .send_message( - quic_geyser_common::channel_message::ChannelMessage::Account( - AccountData { - pubkey: account.pubkey, - account: account.solana_account(), - write_version: account.write_version, - }, - account.slot_identifier.slot, - ), - ) - .unwrap(); - } - sleep(Duration::from_secs(1)); - }) - }; - // wait for server to start - sleep(Duration::from_secs(1)); +// let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2)); +// let msg_acc_2 = Message::AccountMsg(get_account_for_test(1, 20)); +// let msg_acc_3 = Message::AccountMsg(get_account_for_test(2, 100)); +// let msg_acc_4 = Message::AccountMsg(get_account_for_test(3, 1_000)); +// let msg_acc_5 = Message::AccountMsg(get_account_for_test(4, 10_000)); +// let msgs = [msg_acc_1, msg_acc_2, msg_acc_3, msg_acc_4, msg_acc_5]; - // server started - let (client, reciever) = Client::new( - url, - ConnectionParameters { - max_number_of_streams: 10, - recieve_window_size: 1_000_000, - timeout_in_seconds: 10, - max_ack_delay: 25, - ack_exponent: 3, - }, - ) - .unwrap(); - client.subscribe(vec![Filter::AccountsAll]).unwrap(); +// let jh = { +// let msgs = msgs.clone(); +// let server_sock = server_sock.clone(); +// std::thread::spawn(move || { +// let config = ConfigQuicPlugin { +// address: server_sock, +// quic_parameters: QuicParameters::default(), +// compression_parameters: CompressionParameters { +// compression_type: CompressionType::None, +// }, +// number_of_retries: 100, +// log_level: "debug".to_string(), +// allow_accounts: true, +// allow_accounts_at_startup: false, +// }; +// let quic_server = QuicServer::new(config).unwrap(); +// // wait for client to connect and subscribe +// sleep(Duration::from_secs(2)); +// for msg in msgs { +// let Message::AccountMsg(account) = msg else { +// panic!("should never happen"); +// }; +// quic_server +// .send_message( +// quic_geyser_common::channel_message::ChannelMessage::Account( +// AccountData { +// pubkey: account.pubkey, +// account: account.solana_account(), +// write_version: account.write_version, +// }, +// account.slot_identifier.slot, +// ), +// ) +// .unwrap(); +// } +// sleep(Duration::from_secs(1)); +// }) +// }; +// // wait for server to start +// sleep(Duration::from_secs(1)); - let mut cnt = 0; - for message_sent in msgs { - let msg = reciever.recv().unwrap(); - log::info!("got message : {}", cnt); - cnt += 1; - assert_eq!(message_sent, msg); - } - jh.join().unwrap(); - } -} +// // server started +// let (client, reciever) = Client::new( +// url, +// ConnectionParameters { +// max_number_of_streams: 10, +// recieve_window_size: 1_000_000, +// timeout_in_seconds: 10, +// max_ack_delay: 25, +// ack_exponent: 3, +// }, +// ) +// .unwrap(); +// client.subscribe(vec![Filter::AccountsAll]).unwrap(); + +// let mut cnt = 0; +// for message_sent in msgs { +// let msg = reciever.recv().unwrap(); +// log::info!("got message : {}", cnt); +// cnt += 1; +// assert_eq!(message_sent, msg); +// } +// jh.join().unwrap(); +// } +// } diff --git a/client/src/lib.rs b/client/src/lib.rs index 433c623..47a0d7f 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,4 +1,2 @@ pub mod blocking; pub mod non_blocking; - -pub const DEFAULT_MAX_STREAM: u64 = quic_geyser_common::quic::configure_client::DEFAULT_MAX_STREAMS; diff --git a/client/src/non_blocking/client.rs b/client/src/non_blocking/client.rs index 710efdf..5726e96 100644 --- a/client/src/non_blocking/client.rs +++ b/client/src/non_blocking/client.rs @@ -1,78 +1,177 @@ +use quic_geyser_common::defaults::ALPN_GEYSER_PROTOCOL_ID; +use quic_geyser_common::defaults::DEFAULT_MAX_RECIEVE_WINDOW_SIZE; +use quic_geyser_common::defaults::MAX_DATAGRAM_SIZE; use quic_geyser_common::filters::Filter; use quic_geyser_common::message::Message; -use quic_geyser_common::quic::configure_client::configure_client; -use quic_geyser_common::quic::quiche_client_loop::client_loop; use quic_geyser_common::types::connections_parameters::ConnectionParameters; -use std::net::SocketAddr; -use std::sync::atomic::AtomicBool; +use quinn::{ + ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, RecvStream, + SendStream, TokioRuntime, TransportConfig, VarInt, +}; +use std::net::{SocketAddr, UdpSocket}; +use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; + +pub fn create_client_endpoint(connection_parameters: ConnectionParameters) -> Endpoint { + const MINIMUM_MAXIMUM_TRANSMISSION_UNIT: u16 = 2000; + const INITIAL_MAXIMUM_TRANSMISSION_UNIT: u16 = MINIMUM_MAXIMUM_TRANSMISSION_UNIT; + + let mut endpoint = { + let client_socket = UdpSocket::bind("0.0.0.0:0").expect("Client socket should be binded"); + let mut config = EndpointConfig::default(); + config + .max_udp_payload_size(MAX_DATAGRAM_SIZE as u16) + .expect("Should set max MTU"); + quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime)) + .expect("create_endpoint quinn::Endpoint::new") + }; + + let cert = rcgen::generate_simple_self_signed(vec!["quic_geyser_client".into()]).unwrap(); + let key = rustls::PrivateKey(cert.serialize_private_key_der()); + let cert = rustls::Certificate(cert.serialize_der().unwrap()); + + let mut crypto = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_custom_certificate_verifier(Arc::new(ClientSkipServerVerification {})) + .with_client_auth_cert(vec![cert], key) + .expect("Should create client config"); + + crypto.enable_early_data = true; + crypto.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()]; + + let mut config = ClientConfig::new(Arc::new(crypto)); + let mut transport_config = TransportConfig::default(); + + let timeout = IdleTimeout::try_from(Duration::from_secs(30)).unwrap(); + transport_config.max_idle_timeout(Some(timeout)); + transport_config.keep_alive_interval(Some(Duration::from_secs(1))); + transport_config + .datagram_receive_buffer_size(Some(connection_parameters.recieve_window_size as usize)); + transport_config.datagram_send_buffer_size(connection_parameters.recieve_window_size as usize); + transport_config.initial_mtu(INITIAL_MAXIMUM_TRANSMISSION_UNIT); + transport_config.max_concurrent_bidi_streams(VarInt::from( + connection_parameters.max_number_of_streams as u32, + )); + transport_config.max_concurrent_uni_streams(VarInt::from( + connection_parameters.max_number_of_streams as u32, + )); + transport_config.min_mtu(MINIMUM_MAXIMUM_TRANSMISSION_UNIT); + transport_config.mtu_discovery_config(None); + transport_config.enable_segmentation_offload(false); + config.transport_config(Arc::new(transport_config)); + + endpoint.set_default_client_config(config); + + endpoint +} + +pub async fn recv_message( + mut recv_stream: RecvStream, + timeout_in_seconds: u64, +) -> anyhow::Result { + let mut buffer: Vec = vec![]; + + while let Some(data) = tokio::time::timeout( + Duration::from_secs(timeout_in_seconds), + recv_stream.read_chunk(DEFAULT_MAX_RECIEVE_WINDOW_SIZE as usize, true), + ) + .await?? + { + let bytes = data.bytes.to_vec(); + buffer.extend_from_slice(&bytes); + } + Ok(bincode::deserialize::(&buffer)?) +} pub struct Client { - is_connected: Arc, - filters_sender: std::sync::mpsc::Sender, + connection: Connection, +} + +pub async fn send_message(mut send_stream: SendStream, message: &Message) -> anyhow::Result<()> { + let binary = bincode::serialize(&message)?; + send_stream.write_all(&binary).await?; + send_stream.finish().await?; + Ok(()) } impl Client { - pub fn new( + pub async fn new( server_address: String, connection_parameters: ConnectionParameters, ) -> anyhow::Result<(Client, tokio::sync::mpsc::UnboundedReceiver)> { - let config = configure_client( - connection_parameters.max_number_of_streams, - connection_parameters.recieve_window_size, - connection_parameters.timeout_in_seconds, - connection_parameters.max_ack_delay, - connection_parameters.ack_exponent, - )?; - let server_address: SocketAddr = server_address.parse()?; - let socket_addr: SocketAddr = "0.0.0.0:0" - .parse() - .expect("Socket address should be returned"); - let is_connected = Arc::new(AtomicBool::new(false)); - let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel(); - let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel(); + let endpoint = create_client_endpoint(connection_parameters); + let socket_addr = SocketAddr::from_str(&server_address)?; + let connecting = endpoint.connect(socket_addr, "quic_geyser_client")?; - let is_connected_client = is_connected.clone(); - let _client_loop_jh = std::thread::spawn(move || { - if let Err(e) = client_loop( - config, - socket_addr, - server_address, - rx_sent_queue, - sx_recv_queue, - is_connected_client.clone(), - ) { - log::error!("client stopped with error {e}"); - } - is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed); - }); + let (message_sx_queue, message_rx_queue) = + tokio::sync::mpsc::unbounded_channel::(); - let (tokio_sx_queue, tokio_rx_queue) = tokio::sync::mpsc::unbounded_channel::(); - let _tokio_depile_loop = std::thread::spawn(move || { - while let Ok(message) = client_rx_queue.recv() { - if tokio_sx_queue.send(message).is_err() { - break; + let connection = connecting.await?; + { + let connection = connection.clone(); + tokio::spawn(async move { + loop { + let stream = connection.accept_uni().await; + match stream { + Ok(recv_stream) => { + let sender = message_sx_queue.clone(); + tokio::spawn(async move { + let message = recv_message(recv_stream, 10).await; + match message { + Ok(message) => { + let _ = sender.send(message); + } + Err(e) => { + log::error!("Error getting message {}", e); + } + } + }); + } + Err(e) => match &e { + ConnectionError::ConnectionClosed(_) + | ConnectionError::ApplicationClosed(_) + | ConnectionError::LocallyClosed => { + break; + } + _ => { + log::error!("Got {} while listing to the connection", e); + } + }, + } } - } - }); + }); + } - Ok(( - Client { - is_connected, - filters_sender, - }, - tokio_rx_queue, - )) + Ok((Client { connection }, message_rx_queue)) } - pub fn subscribe(&self, filters: Vec) -> anyhow::Result<()> { - let message = Message::Filters(filters); - self.filters_sender.send(message)?; + pub async fn subscribe(&self, filters: Vec) -> anyhow::Result<()> { + let send_stream = self.connection.open_uni().await?; + send_message(send_stream, &Message::Filters(filters)).await?; Ok(()) } +} - pub fn is_connected(&self) -> bool { - self.is_connected.load(std::sync::atomic::Ordering::Relaxed) +pub struct ClientSkipServerVerification; + +impl ClientSkipServerVerification { + pub fn new() -> Arc { + Arc::new(Self) + } +} + +impl rustls::client::ServerCertVerifier for ClientSkipServerVerification { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) } } @@ -85,12 +184,12 @@ mod tests { config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, filters::Filter, message::Message, - quic::quic_server::QuicServer, types::{ account::Account, connections_parameters::ConnectionParameters, slot_identifier::SlotIdentifier, }, }; + use quic_geyser_server::quic_server::QuicServer; use solana_sdk::pubkey::Pubkey; use std::{net::SocketAddr, thread::sleep, time::Duration}; @@ -175,8 +274,9 @@ mod tests { ack_exponent: 3, }, ) + .await .unwrap(); - client.subscribe(vec![Filter::AccountsAll]).unwrap(); + client.subscribe(vec![Filter::AccountsAll]).await.unwrap(); let mut cnt = 0; for message_sent in msgs { diff --git a/common/Cargo.toml b/common/Cargo.toml index 79b7061..f648819 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quic-geyser-common" -version = "0.1.0" +version = "0.1.3" edition = "2021" [dependencies] @@ -8,19 +8,11 @@ solana-sdk = { workspace = true } solana-transaction-status = { workspace = true } serde = { workspace = true } -bincode = { workspace = true } -lz4 = { workspace = true } -quiche = { workspace = true, features = ["boringssl-boring-crate"] } -boring = { workspace = true } -rcgen = { workspace = true } anyhow = { workspace = true } log = { workspace = true } thiserror = {workspace = true} -itertools = { workspace = true } -ring = {workspace = true} - -mio = { workspace = true } -mio_channel = { workspace = true } +itertools = { workspace = true } +lz4 = { workspace = true } [dev-dependencies] rand = { workspace = true } diff --git a/common/src/config.rs b/common/src/config.rs index 296d904..3a895d2 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -4,14 +4,12 @@ use serde::{Deserialize, Serialize}; use crate::{ compression::CompressionType, - quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS}, + defaults::{ + DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY, + DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS, + }, }; -pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10; -pub const DEFAULT_MAX_NB_CONNECTIONS: u64 = 10; -pub const DEFAULT_MAX_ACK_DELAY: u64 = 250; -pub const DEFAULT_ACK_EXPONENT: u64 = 3; - #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct ConfigQuicPlugin { diff --git a/common/src/lib.rs b/common/src/lib.rs index 2a9196c..4b8998b 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,8 +1,8 @@ pub mod channel_message; pub mod compression; pub mod config; +pub mod defaults; pub mod filters; pub mod message; pub mod plugin_error; -pub mod quic; pub mod types; diff --git a/common/src/types/connections_parameters.rs b/common/src/types/connections_parameters.rs index f5b1ca1..941ad34 100644 --- a/common/src/types/connections_parameters.rs +++ b/common/src/types/connections_parameters.rs @@ -1,8 +1,8 @@ use serde::{Deserialize, Serialize}; -use crate::{ - config::{DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY}, - quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS}, +use crate::defaults::{ + DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY, + DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS, }; #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] diff --git a/examples/tester-client/Cargo.toml b/examples/tester-client/Cargo.toml index c73c729..9890fe9 100644 --- a/examples/tester-client/Cargo.toml +++ b/examples/tester-client/Cargo.toml @@ -7,7 +7,7 @@ authors = ["Godmode Galactus"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -solana-rpc-client = "~1.17.28" +solana-rpc-client = "~1.17.31" clap = { workspace = true, features = ["derive", "env"] } serde = { workspace = true } @@ -17,6 +17,7 @@ anyhow = { workspace = true } log = { workspace = true } bincode = { workspace = true } tracing-subscriber = { workspace = true } +tokio = { workspace = true } quic-geyser-client = { workspace = true } quic-geyser-common = { workspace = true } \ No newline at end of file diff --git a/examples/tester-client/src/main.rs b/examples/tester-client/src/main.rs index 31c73a2..e56c1b5 100644 --- a/examples/tester-client/src/main.rs +++ b/examples/tester-client/src/main.rs @@ -1,16 +1,13 @@ use std::{ sync::{atomic::AtomicU64, Arc}, thread::sleep, - time::{Duration, Instant}, + time::Duration, }; use clap::Parser; use cli::Args; -use quic_geyser_client::blocking::client::Client; -use quic_geyser_common::{ - filters::Filter, quic::configure_client::DEFAULT_MAX_RECIEVE_WINDOW_SIZE, - types::connections_parameters::ConnectionParameters, -}; +use quic_geyser_client::non_blocking::client::Client; +use quic_geyser_common::{filters::Filter, types::connections_parameters::ConnectionParameters}; use solana_rpc_client::rpc_client::RpcClient; use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel}; @@ -38,21 +35,14 @@ pub mod cli; // let config_json = json!(config); //println!("{}", config_json); -pub fn main() { +#[tokio::main] +pub async fn main() { tracing_subscriber::fmt::init(); let args = Args::parse(); println!("Connecting"); - let (client, reciever) = Client::new( - args.url, - ConnectionParameters { - max_number_of_streams: 1024 * 1024, - recieve_window_size: DEFAULT_MAX_RECIEVE_WINDOW_SIZE, - timeout_in_seconds: 30, - max_ack_delay: 25, - ack_exponent: 3, - }, - ) - .unwrap(); + let (client, mut reciever) = Client::new(args.url, ConnectionParameters::default()) + .await + .unwrap(); println!("Connected"); let bytes_transfered = Arc::new(AtomicU64::new(0)); @@ -141,78 +131,64 @@ pub fn main() { Filter::Slot, Filter::BlockMeta, ]) + .await .unwrap(); println!("Subscribed"); - let instant = Instant::now(); + while let Some(message) = reciever.recv().await { + let message_size = bincode::serialize(&message).unwrap().len(); + bytes_transfered.fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed); + match message { + quic_geyser_common::message::Message::AccountMsg(account) => { + log::trace!("got account notification : {} ", account.pubkey); + account_notification.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let data_len = account.data_length as usize; + total_accounts_size + .fetch_add(account.data_length, std::sync::atomic::Ordering::Relaxed); + let solana_account = account.solana_account(); + if solana_account.data.len() != data_len { + println!("data length different"); + println!( + "Account : {}, owner: {}=={}, datalen: {}=={}, lamports: {}", + account.pubkey, + account.owner, + solana_account.owner, + data_len, + solana_account.data.len(), + solana_account.lamports + ); + panic!("Wrong account data"); + } - loop { - match reciever.recv() { - Ok(message) => { - let message_size = bincode::serialize(&message).unwrap().len(); - bytes_transfered - .fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed); - match message { - quic_geyser_common::message::Message::AccountMsg(account) => { - log::trace!("got account notification : {} ", account.pubkey); - account_notification.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let data_len = account.data_length as usize; - total_accounts_size - .fetch_add(account.data_length, std::sync::atomic::Ordering::Relaxed); - let solana_account = account.solana_account(); - if solana_account.data.len() != data_len { - println!("data length different"); - println!( - "Account : {}, owner: {}=={}, datalen: {}=={}, lamports: {}", - account.pubkey, - account.owner, - solana_account.owner, - data_len, - solana_account.data.len(), - solana_account.lamports - ); - panic!("Wrong account data"); - } - - account_slot.store( - account.slot_identifier.slot, - std::sync::atomic::Ordering::Relaxed, - ); - } - quic_geyser_common::message::Message::SlotMsg(slot) => { - log::trace!("got slot notification : {} ", slot.slot); - slot_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if slot.commitment_level == CommitmentLevel::Processed { - slot_slot.store(slot.slot, std::sync::atomic::Ordering::Relaxed); - } - } - quic_geyser_common::message::Message::BlockMetaMsg(block_meta) => { - log::trace!("got blockmeta notification : {} ", block_meta.slot); - blockmeta_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - blockmeta_slot.store(block_meta.slot, std::sync::atomic::Ordering::Relaxed); - } - quic_geyser_common::message::Message::TransactionMsg(tx) => { - log::trace!( - "got transaction notification: {}", - tx.signatures[0].to_string() - ); - transaction_notifications - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - quic_geyser_common::message::Message::Filters(_) => { - // Not supported - } - quic_geyser_common::message::Message::Ping => { - // not supported ping - } + account_slot.store( + account.slot_identifier.slot, + std::sync::atomic::Ordering::Relaxed, + ); + } + quic_geyser_common::message::Message::SlotMsg(slot) => { + log::trace!("got slot notification : {} ", slot.slot); + slot_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if slot.commitment_level == CommitmentLevel::Processed { + slot_slot.store(slot.slot, std::sync::atomic::Ordering::Relaxed); } } - Err(e) => { - println!( - "Conection closed and streaming stopped in {} seconds with error {}, connected : {}", - instant.elapsed().as_secs(), e, client.is_connected() + quic_geyser_common::message::Message::BlockMetaMsg(block_meta) => { + log::trace!("got blockmeta notification : {} ", block_meta.slot); + blockmeta_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + blockmeta_slot.store(block_meta.slot, std::sync::atomic::Ordering::Relaxed); + } + quic_geyser_common::message::Message::TransactionMsg(tx) => { + log::trace!( + "got transaction notification: {}", + tx.signatures[0].to_string() ); - break; + transaction_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + quic_geyser_common::message::Message::Filters(_) => { + // Not supported + } + quic_geyser_common::message::Message::Ping => { + // not supported ping } } } diff --git a/examples/tester-server/Cargo.toml b/examples/tester-server/Cargo.toml index 404ddca..0975900 100644 --- a/examples/tester-server/Cargo.toml +++ b/examples/tester-server/Cargo.toml @@ -20,4 +20,5 @@ tracing-subscriber = { workspace = true } rand = "0.8.5" -quic-geyser-common = { workspace = true } \ No newline at end of file +quic-geyser-common = { workspace = true } +quic-geyser-server = { workspace = true } \ No newline at end of file diff --git a/examples/tester-server/src/main.rs b/examples/tester-server/src/main.rs index d5ae80f..f5bfe76 100644 --- a/examples/tester-server/src/main.rs +++ b/examples/tester-server/src/main.rs @@ -10,8 +10,8 @@ use itertools::Itertools; use quic_geyser_common::{ channel_message::{AccountData, ChannelMessage}, config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, - quic::quic_server::QuicServer, }; +use quic_geyser_server::quic_server::QuicServer; use rand::{thread_rng, Rng}; use solana_sdk::{account::Account, pubkey::Pubkey}; diff --git a/plugin/Cargo.toml b/plugin/Cargo.toml index dfa358d..e695a7c 100644 --- a/plugin/Cargo.toml +++ b/plugin/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quic-geyser-plugin" -version = "0.1.0" +version = "0.1.3" edition = "2021" authors = ["Godmode Galactus"] @@ -24,6 +24,7 @@ log = { workspace = true } thiserror = {workspace = true} quic-geyser-common = { workspace = true } +quic-geyser-server = { workspace = true } [build-dependencies] anyhow = { workspace = true } diff --git a/plugin/src/quic_plugin.rs b/plugin/src/quic_plugin.rs index f64abfa..3b0f7ef 100644 --- a/plugin/src/quic_plugin.rs +++ b/plugin/src/quic_plugin.rs @@ -5,13 +5,13 @@ use agave_geyser_plugin_interface::geyser_plugin_interface::{ use quic_geyser_common::{ channel_message::{AccountData, ChannelMessage}, plugin_error::QuicGeyserError, - quic::quic_server::QuicServer, types::{ block_meta::BlockMeta, slot_identifier::SlotIdentifier, transaction::{Transaction, TransactionMeta}, }, }; +use quic_geyser_server::quic_server::QuicServer; use solana_sdk::{ account::Account, clock::Slot, commitment_config::CommitmentLevel, message::v0::Message, pubkey::Pubkey, diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 677432b..91cbebc 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "quic-geyser-plugin-proxy" -version = "0.1.0" +version = "0.1.3" edition = "2021" authors = ["Godmode Galactus"] @@ -17,6 +17,8 @@ anyhow = { workspace = true } log = { workspace = true } bincode = { workspace = true } tracing-subscriber = { workspace = true } +tokio = { workspace = true } quic-geyser-client = { workspace = true } -quic-geyser-common = { workspace = true } \ No newline at end of file +quic-geyser-common = { workspace = true } +quic-geyser-server = { workspace = true } \ No newline at end of file diff --git a/proxy/src/cli.rs b/proxy/src/cli.rs index a1ed73e..1e0c1ac 100644 --- a/proxy/src/cli.rs +++ b/proxy/src/cli.rs @@ -1,7 +1,7 @@ use clap::Parser; -use quic_geyser_common::{ - config::{DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY}, - quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS}, +use quic_geyser_common::defaults::{ + DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY, + DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS, }; #[derive(Parser, Debug, Clone)] diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 8faa1df..03c77d9 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -2,25 +2,26 @@ use std::{net::SocketAddr, str::FromStr}; use clap::Parser; use cli::Args; -use quic_geyser_client::blocking::client::Client; +use quic_geyser_client::non_blocking::client::Client; use quic_geyser_common::{ channel_message::{AccountData, ChannelMessage}, config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, filters::Filter, - quic::quic_server::QuicServer, types::{ block_meta::BlockMeta, connections_parameters::ConnectionParameters, transaction::Transaction, }, }; +use quic_geyser_server::quic_server::QuicServer; pub mod cli; -pub fn main() -> anyhow::Result<()> { +#[tokio::main()] +pub async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); let args = Args::parse(); - let (client, message_channel) = Client::new( + let (client, mut message_channel) = Client::new( args.source_url, ConnectionParameters { max_number_of_streams: args.max_number_of_streams_per_client, @@ -29,15 +30,18 @@ pub fn main() -> anyhow::Result<()> { max_ack_delay: args.max_ack_delay, ack_exponent: args.ack_exponent, }, - )?; + ) + .await?; log::info!("Subscribing"); - client.subscribe(vec![ - Filter::AccountsAll, - Filter::TransactionsAll, - Filter::Slot, - Filter::BlockMeta, - ])?; + client + .subscribe(vec![ + Filter::AccountsAll, + Filter::TransactionsAll, + Filter::Slot, + Filter::BlockMeta, + ]) + .await?; let quic_config = ConfigQuicPlugin { address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(), @@ -79,7 +83,7 @@ pub fn main() -> anyhow::Result<()> { } }); - while let Ok(message) = message_channel.recv() { + while let Some(message) = message_channel.recv().await { let channel_message = match message { quic_geyser_common::message::Message::AccountMsg(account_message) => { ChannelMessage::Account( diff --git a/server/Cargo.toml b/server/Cargo.toml new file mode 100644 index 0000000..e2b99d6 --- /dev/null +++ b/server/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "quic-geyser-server" +version = "0.1.3" +edition = "2021" + +[dependencies] +solana-sdk = { workspace = true } +solana-transaction-status = { workspace = true } + +serde = { workspace = true } +anyhow = { workspace = true } +log = { workspace = true } +thiserror = {workspace = true} +itertools = { workspace = true } +bincode = { workspace = true } +ring = {workspace = true} +quiche = { workspace = true, features = ["boringssl-boring-crate"] } +rcgen = { workspace = true } +boring = { workspace = true } + +mio = { workspace = true } +mio_channel = { workspace = true } + +quic-geyser-common = { workspace = true } + +[dev-dependencies] +rand = { workspace = true } +tracing-subscriber = { workspace = true } \ No newline at end of file diff --git a/common/src/quic/configure_client.rs b/server/src/configure_client.rs similarity index 83% rename from common/src/quic/configure_client.rs rename to server/src/configure_client.rs index 7121bd7..cb1832e 100644 --- a/common/src/quic/configure_client.rs +++ b/server/src/configure_client.rs @@ -1,9 +1,4 @@ -use crate::quic::configure_server::ALPN_GEYSER_PROTOCOL_ID; - -use super::configure_server::MAX_DATAGRAM_SIZE; - -pub const DEFAULT_MAX_STREAMS: u64 = 64 * 1024; -pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 1_000_000; // 64 MBs +use quic_geyser_common::defaults::{ALPN_GEYSER_PROTOCOL_ID, MAX_DATAGRAM_SIZE}; pub fn configure_client( maximum_concurrent_streams: u64, diff --git a/common/src/quic/configure_server.rs b/server/src/configure_server.rs similarity index 93% rename from common/src/quic/configure_server.rs rename to server/src/configure_server.rs index 9aadf79..3491c4c 100644 --- a/common/src/quic/configure_server.rs +++ b/server/src/configure_server.rs @@ -1,9 +1,9 @@ use boring::ssl::SslMethod; -use crate::config::QuicParameters; - -pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser"; -pub const MAX_DATAGRAM_SIZE: usize = 1350; // MAX: 65527 +use quic_geyser_common::{ + config::QuicParameters, + defaults::{ALPN_GEYSER_PROTOCOL_ID, MAX_DATAGRAM_SIZE}, +}; pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result { let max_concurrent_streams = quic_parameter.max_number_of_streams_per_client; diff --git a/common/src/quic/mod.rs b/server/src/lib.rs similarity index 100% rename from common/src/quic/mod.rs rename to server/src/lib.rs diff --git a/common/src/quic/quic_server.rs b/server/src/quic_server.rs similarity index 95% rename from common/src/quic/quic_server.rs rename to server/src/quic_server.rs index 9f7a78f..1a70d43 100644 --- a/common/src/quic/quic_server.rs +++ b/server/src/quic_server.rs @@ -1,8 +1,8 @@ use std::{fmt::Debug, sync::mpsc}; -use crate::{ +use crate::configure_server::configure_server; +use quic_geyser_common::{ channel_message::ChannelMessage, config::ConfigQuicPlugin, plugin_error::QuicGeyserError, - quic::configure_server::configure_server, }; use super::quiche_server_loop::server_loop; diff --git a/common/src/quic/quiche_client_loop.rs b/server/src/quiche_client_loop.rs similarity index 97% rename from common/src/quic/quiche_client_loop.rs rename to server/src/quiche_client_loop.rs index 62d6519..87d1882 100644 --- a/common/src/quic/quiche_client_loop.rs +++ b/server/src/quiche_client_loop.rs @@ -4,15 +4,14 @@ use std::{ time::{Duration, Instant}, }; +use quic_geyser_common::{defaults::MAX_DATAGRAM_SIZE, message::Message}; + use crate::{ - message::Message, - quic::{ - configure_server::MAX_DATAGRAM_SIZE, - quiche_reciever::{recv_message, ReadStreams}, - quiche_sender::{handle_writable, send_message}, - quiche_utils::{get_next_unidi, PartialResponses}, - }, + quiche_reciever::{recv_message, ReadStreams}, + quiche_sender::{handle_writable, send_message}, + quiche_utils::{get_next_unidi, PartialResponses}, }; + use anyhow::bail; use ring::rand::{SecureRandom, SystemRandom}; @@ -278,19 +277,20 @@ mod tests { use itertools::Itertools; use solana_sdk::{account::Account, pubkey::Pubkey}; - use crate::{ + use quic_geyser_common::{ channel_message::{AccountData, ChannelMessage}, compression::CompressionType, config::QuicParameters, filters::Filter, message::Message, - quic::{ - configure_client::configure_client, configure_server::configure_server, - quiche_server_loop::server_loop, - }, types::block_meta::SlotMeta, }; + use crate::{ + configure_client::configure_client, configure_server::configure_server, + quiche_server_loop::server_loop, + }; + use super::client_loop; #[test] diff --git a/common/src/quic/quiche_reciever.rs b/server/src/quiche_reciever.rs similarity index 95% rename from common/src/quic/quiche_reciever.rs rename to server/src/quiche_reciever.rs index c5c5758..5254068 100644 --- a/common/src/quic/quiche_reciever.rs +++ b/server/src/quiche_reciever.rs @@ -2,9 +2,7 @@ use std::collections::HashMap; use anyhow::bail; -use crate::message::Message; - -use super::configure_server::MAX_DATAGRAM_SIZE; +use quic_geyser_common::{defaults::MAX_DATAGRAM_SIZE, message::Message}; pub fn convert_binary_to_message(bytes: Vec) -> anyhow::Result { Ok(bincode::deserialize::(&bytes)?) diff --git a/common/src/quic/quiche_sender.rs b/server/src/quiche_sender.rs similarity index 95% rename from common/src/quic/quiche_sender.rs rename to server/src/quiche_sender.rs index bc94a97..bc5ea9a 100644 --- a/common/src/quic/quiche_sender.rs +++ b/server/src/quiche_sender.rs @@ -1,5 +1,5 @@ -use super::quiche_utils::PartialResponses; -use crate::{message::Message, quic::quiche_utils::PartialResponse}; +use crate::quiche_utils::{PartialResponse, PartialResponses}; +use quic_geyser_common::message::Message; use quiche::Connection; pub fn convert_to_binary(message: &Message) -> anyhow::Result> { diff --git a/common/src/quic/quiche_server_loop.rs b/server/src/quiche_server_loop.rs similarity index 98% rename from common/src/quic/quiche_server_loop.rs rename to server/src/quiche_server_loop.rs index 9c6ab3c..5a1b2b8 100644 --- a/common/src/quic/quiche_server_loop.rs +++ b/server/src/quiche_server_loop.rs @@ -13,21 +13,22 @@ use itertools::Itertools; use quiche::ConnectionId; use ring::rand::SystemRandom; -use crate::{ +use quic_geyser_common::{ channel_message::ChannelMessage, compression::CompressionType, + defaults::MAX_DATAGRAM_SIZE, filters::Filter, message::Message, - quic::{ - quiche_reciever::recv_message, - quiche_sender::{handle_writable, send_message}, - quiche_utils::{get_next_unidi, mint_token, validate_token}, - }, types::{account::Account, block_meta::SlotMeta, slot_identifier::SlotIdentifier}, }; +use crate::{ + quiche_reciever::recv_message, + quiche_sender::{handle_writable, send_message}, + quiche_utils::{get_next_unidi, mint_token, validate_token}, +}; + use super::{ - configure_server::MAX_DATAGRAM_SIZE, quiche_reciever::ReadStreams, quiche_utils::{write_to_socket, PartialResponses}, }; @@ -304,7 +305,7 @@ fn create_client_task( std::thread::spawn(move || { let mut partial_responses = PartialResponses::new(); let mut read_streams = ReadStreams::new(); - let mut next_stream: u64 = 1; + let mut next_stream: u64 = 3; let mut connection = connection; let mut instance = Instant::now(); let mut closed = false; diff --git a/common/src/quic/quiche_utils.rs b/server/src/quiche_utils.rs similarity index 100% rename from common/src/quic/quiche_utils.rs rename to server/src/quiche_utils.rs