From 47048e31d0ac6e9d17a57fa3d0ab91c5070c0961 Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Mon, 20 May 2024 11:28:30 +0200 Subject: [PATCH] Adding a new example server, other minor changes --- Cargo.lock | 58 ++++++++++----- Cargo.toml | 3 +- client/Cargo.toml | 2 +- client/src/client.rs | 11 ++- common/src/config.rs | 7 +- common/src/quic/connection_manager.rs | 11 +-- common/src/quic/quic_server.rs | 24 +++++-- {tester => examples/tester-client}/Cargo.toml | 8 +-- {tester => examples/tester-client}/src/cli.rs | 0 .../tester-client}/src/main.rs | 0 examples/tester-server/Cargo.toml | 26 +++++++ examples/tester-server/src/cli.rs | 17 +++++ examples/tester-server/src/main.rs | 71 +++++++++++++++++++ plugin/Cargo.toml | 2 +- plugin/src/quic_plugin.rs | 4 +- 15 files changed, 199 insertions(+), 45 deletions(-) rename {tester => examples/tester-client}/Cargo.toml (76%) rename {tester => examples/tester-client}/src/cli.rs (100%) rename {tester => examples/tester-client}/src/main.rs (100%) create mode 100644 examples/tester-server/Cargo.toml create mode 100644 examples/tester-server/src/cli.rs create mode 100644 examples/tester-server/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index bf922ed..2e0310e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1302,25 +1302,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "geyser-quic-plugin-tester" -version = "0.1.0" -dependencies = [ - "anyhow", - "bincode", - "clap", - "futures", - "log", - "quic-geyser-client", - "quic-geyser-common", - "quic-geyser-plugin", - "serde", - "serde_json", - "solana-rpc-client", - "solana-sdk", - "tokio", -] - [[package]] name = "gimli" version = "0.28.1" @@ -2324,6 +2305,45 @@ dependencies = [ "vergen", ] +[[package]] +name = "quic-plugin-tester-client" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "clap", + "futures", + "log", + "quic-geyser-client", + "quic-geyser-common", + "quic-geyser-plugin", + "serde", + "serde_json", + "solana-rpc-client", + "solana-sdk", + "tokio", +] + +[[package]] +name = "quic-plugin-tester-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "clap", + "futures", + "itertools", + "log", + "quic-geyser-client", + "quic-geyser-common", + "quic-geyser-plugin", + "rand 0.8.5", + "serde", + "serde_json", + "solana-sdk", + "tokio", +] + [[package]] name = "quinn" version = "0.10.2" diff --git a/Cargo.toml b/Cargo.toml index 7f60249..22965b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,8 @@ members = [ "plugin", "client", "common", - "tester", + "examples/tester-client", + "examples/tester-server", ] [workspace.package] diff --git a/client/Cargo.toml b/client/Cargo.toml index fd4f579..640643e 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -14,4 +14,4 @@ async-stream = { workspace = true } tokio = { workspace = true } log = { workspace = true } -quic-geyser-common = { path = "../common" } \ No newline at end of file +quic-geyser-common = { workspace = true } \ No newline at end of file diff --git a/client/src/client.rs b/client/src/client.rs index 1c92618..2242780 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -90,13 +90,12 @@ mod tests { use futures::StreamExt; use quic_geyser_common::{ - filters::{AccountFilter, Filter}, + filters::Filter, message::Message, quic::{configure_server::configure_server, connection_manager::ConnectionManager}, types::{account::Account, connections_parameters::ConnectionParameters}, }; use quinn::{Endpoint, EndpointConfig, TokioRuntime}; - use solana_sdk::pubkey::Pubkey; use tokio::{pin, sync::Notify}; use crate::client::Client; @@ -135,7 +134,7 @@ mod tests { notify_server_start.notify_one(); notify_subscription.notified().await; for msg in msgs { - connection_manager.dispatch(msg, 10).await; + connection_manager.dispatch(msg, 10, false).await; } }); } @@ -146,9 +145,9 @@ mod tests { let client = Client::new( url, ConnectionParameters { - max_number_of_streams: 30, - streams_for_slot_data: 10, - streams_for_transactions: 10, + max_number_of_streams: 3, + streams_for_slot_data: 1, + streams_for_transactions: 1, }, ) .await diff --git a/common/src/config.rs b/common/src/config.rs index d57c7a0..fe44628 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -4,6 +4,9 @@ use serde::{Deserialize, Serialize}; use crate::{compression::CompressionType, quic::configure_client::DEFAULT_MAX_STREAMS}; +pub const DEFAULT_WINDOW_SIZE: u32 = 1_000_000; +pub const DEFAULT_CONNECTION_TIMEOUT: u32 = 10; + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct ConfigQuicPlugin { @@ -39,8 +42,8 @@ impl Default for QuicParameters { fn default() -> Self { Self { max_number_of_streams_per_client: DEFAULT_MAX_STREAMS, - recieve_window_size: 1_000_000, // 1 Mb - connection_timeout: 10, // 10s + recieve_window_size: DEFAULT_WINDOW_SIZE, // 1 Mb + connection_timeout: DEFAULT_CONNECTION_TIMEOUT, // 10s } } } diff --git a/common/src/quic/connection_manager.rs b/common/src/quic/connection_manager.rs index f41476a..8cceeaa 100644 --- a/common/src/quic/connection_manager.rs +++ b/common/src/quic/connection_manager.rs @@ -187,7 +187,7 @@ impl ConnectionManager { }); } - pub async fn dispatch(&self, message: Message, retry_count: u64) { + pub async fn dispatch(&self, message: Message, retry_count: u64, drop_lagger: bool) { let lk = self.connections.read().await; for connection_data in lk.iter() { @@ -216,7 +216,7 @@ impl ConnectionManager { let id = connection_data.id; tokio::spawn(async move { - let permit_result = semaphore.try_acquire_owned(); + let permit_result = semaphore.clone().try_acquire_owned(); let _permit = match permit_result { Ok(permit) => permit, @@ -227,8 +227,11 @@ impl ConnectionManager { id, message_type ); - connection.close(VarInt::from_u32(0), b"laggy client"); - return; + if drop_lagger { + connection.close(VarInt::from_u32(0), b"laggy client"); + return; + } + semaphore.acquire_owned().await.expect("Permit is aquired") } }; diff --git a/common/src/quic/quic_server.rs b/common/src/quic/quic_server.rs index 6cb4457..ce149f2 100644 --- a/common/src/quic/quic_server.rs +++ b/common/src/quic/quic_server.rs @@ -39,7 +39,11 @@ pub struct QuicServer { } impl QuicServer { - pub fn new(runtime: Runtime, config: ConfigQuicPlugin) -> anyhow::Result { + pub fn new( + runtime: Runtime, + config: ConfigQuicPlugin, + drop_laggers: bool, + ) -> anyhow::Result { let server_config = configure_server( config.quic_parameters.max_number_of_streams_per_client, config.quic_parameters.recieve_window_size, @@ -74,6 +78,7 @@ impl QuicServer { slot, compression_type, retry_count, + drop_laggers, ); } } @@ -83,15 +88,21 @@ impl QuicServer { parent, commitment_level, }); - quic_connection_manager.dispatch(message, retry_count).await; + quic_connection_manager + .dispatch(message, retry_count, drop_laggers) + .await; } ChannelMessage::BlockMeta(block_meta) => { let message = Message::BlockMetaMsg(block_meta); - quic_connection_manager.dispatch(message, retry_count).await; + quic_connection_manager + .dispatch(message, retry_count, drop_laggers) + .await; } ChannelMessage::Transaction(transaction) => { let message = Message::TransactionMsg(transaction); - quic_connection_manager.dispatch(message, retry_count).await; + quic_connection_manager + .dispatch(message, retry_count, drop_laggers) + .await; } } } @@ -118,6 +129,7 @@ fn process_account_message( slot: Slot, compression_type: CompressionType, retry_count: u64, + drop_laggers: bool, ) { tokio::spawn(async move { let slot_identifier = SlotIdentifier { slot }; @@ -130,6 +142,8 @@ fn process_account_message( ); let message = Message::AccountMsg(geyser_account); - quic_connection_manager.dispatch(message, retry_count).await; + quic_connection_manager + .dispatch(message, retry_count, drop_laggers) + .await; }); } diff --git a/tester/Cargo.toml b/examples/tester-client/Cargo.toml similarity index 76% rename from tester/Cargo.toml rename to examples/tester-client/Cargo.toml index fce09c6..1841e36 100644 --- a/tester/Cargo.toml +++ b/examples/tester-client/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "geyser-quic-plugin-tester" +name = "quic-plugin-tester-client" version = "0.1.0" edition = "2021" authors = ["Godmode Galactus"] @@ -19,6 +19,6 @@ log = { workspace = true } futures = { workspace = true } bincode = { workspace = true } -quic-geyser-client = { path = "../client" } -quic-geyser-common = { path = "../common" } -quic-geyser-plugin = { path = "../plugin" } \ No newline at end of file +quic-geyser-client = { workspace = true } +quic-geyser-common = { workspace = true } +quic-geyser-plugin = { workspace = true } \ No newline at end of file diff --git a/tester/src/cli.rs b/examples/tester-client/src/cli.rs similarity index 100% rename from tester/src/cli.rs rename to examples/tester-client/src/cli.rs diff --git a/tester/src/main.rs b/examples/tester-client/src/main.rs similarity index 100% rename from tester/src/main.rs rename to examples/tester-client/src/main.rs diff --git a/examples/tester-server/Cargo.toml b/examples/tester-server/Cargo.toml new file mode 100644 index 0000000..9e3183e --- /dev/null +++ b/examples/tester-server/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "quic-plugin-tester-server" +version = "0.1.0" +edition = "2021" +authors = ["Godmode Galactus"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] + +tokio = { workspace = true } +clap = { workspace = true, features = ["derive", "env"] } +serde = { workspace = true } +solana-sdk = { workspace = true } +serde_json = { workspace = true } +anyhow = { workspace = true } +log = { workspace = true } +futures = { workspace = true } +bincode = { workspace = true } +itertools = {workspace = true} + +rand = "0.8.5" + +quic-geyser-client = { workspace = true } +quic-geyser-common = { workspace = true } +quic-geyser-plugin = { workspace = true } \ No newline at end of file diff --git a/examples/tester-server/src/cli.rs b/examples/tester-server/src/cli.rs new file mode 100644 index 0000000..fd91a3d --- /dev/null +++ b/examples/tester-server/src/cli.rs @@ -0,0 +1,17 @@ +use clap::Parser; + +#[derive(Parser, Debug, Clone)] +#[clap(name = "quic_plugin_tester")] +pub struct Args { + #[clap(short, long, default_value_t = 10900)] + pub port: u32, + + #[clap(short, long, default_value_t = 20_000)] + pub accounts_per_second: u32, + + #[clap(short = 'l', long, default_value_t = 1_000_000)] + pub account_data_size: u32, + + #[clap(short, long, default_value_t = false)] + pub drop_laggers: bool, +} diff --git a/examples/tester-server/src/main.rs b/examples/tester-server/src/main.rs new file mode 100644 index 0000000..aef3be0 --- /dev/null +++ b/examples/tester-server/src/main.rs @@ -0,0 +1,71 @@ +use std::{ + net::SocketAddr, + str::FromStr, + time::{Duration, Instant}, +}; + +use clap::Parser; +use cli::Args; +use itertools::Itertools; +use quic_geyser_common::{ + config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, + quic::quic_server::{AccountData, ChannelMessage, QuicServer}, +}; +use rand::{thread_rng, Rng}; +use solana_sdk::{account::Account, pubkey::Pubkey}; +use tokio::runtime::Builder; + +pub mod cli; + +pub fn main() -> anyhow::Result<()> { + let args = Args::parse(); + + let runtime = Builder::new_multi_thread() + .thread_name_fn(|| "solGeyserQuic".to_string()) + .enable_all() + .build() + .map_err(|error| { + let s = error.to_string(); + log::error!("Runtime Error : {}", s); + error + })?; + + let config = ConfigQuicPlugin { + address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(), + quic_parameters: QuicParameters::default(), + compression_parameters: CompressionParameters { + compression_type: quic_geyser_common::compression::CompressionType::None, + }, + number_of_retries: 100, + }; + let quic_server = QuicServer::new(runtime, config, args.drop_laggers).unwrap(); + + let mut instant = Instant::now(); + + let mut slot = 1; + let mut write_version = 1; + let mut rand = thread_rng(); + loop { + std::thread::sleep(Duration::from_secs(1) - Instant::now().duration_since(instant)); + instant = Instant::now(); + slot += 1; + for _ in 0..args.accounts_per_second { + write_version += 1; + let account = AccountData { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: rand.gen(), + data: (0..args.account_data_size as usize) + .map(|_| rand.gen::()) + .collect_vec(), + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: u64::MAX, + }, + write_version, + }; + let channel_message = ChannelMessage::Account(account, slot, false); + quic_server.send_message(channel_message)?; + } + } +} diff --git a/plugin/Cargo.toml b/plugin/Cargo.toml index 5b43d67..8d4be90 100644 --- a/plugin/Cargo.toml +++ b/plugin/Cargo.toml @@ -25,7 +25,7 @@ quinn = { workspace = true } log = { workspace = true } thiserror = {workspace = true} -quic-geyser-common = { path = "../common" } +quic-geyser-common = { workspace = true } [build-dependencies] anyhow = { workspace = true } diff --git a/plugin/src/quic_plugin.rs b/plugin/src/quic_plugin.rs index 103f153..05a02a3 100644 --- a/plugin/src/quic_plugin.rs +++ b/plugin/src/quic_plugin.rs @@ -40,7 +40,7 @@ impl GeyserPlugin for QuicGeyserPlugin { .thread_name_fn(|| { static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed); - format!("solGeyserGrpc{id:02}") + format!("solGeyserQuic{id:02}") }) .enable_all() .build() @@ -50,7 +50,7 @@ impl GeyserPlugin for QuicGeyserPlugin { GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer)) })?; - let quic_server = QuicServer::new(runtime, config.quic_plugin).map_err(|_| { + let quic_server = QuicServer::new(runtime, config.quic_plugin, true).map_err(|_| { GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer)) })?; self.quic_server = Some(quic_server);