diff --git a/Cargo.lock b/Cargo.lock index cefbaff..920a709 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -789,6 +789,7 @@ checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "atty", "bitflags 1.3.2", + "clap_derive 3.2.25", "clap_lex 0.2.4", "indexmap 1.9.3", "once_cell", @@ -804,7 +805,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" dependencies = [ "clap_builder", - "clap_derive", + "clap_derive 4.5.4", ] [[package]] @@ -819,13 +820,26 @@ dependencies = [ "strsim 0.11.1", ] +[[package]] +name = "clap_derive" +version = "3.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008" +dependencies = [ + "heck 0.4.1", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "clap_derive" version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.61", @@ -1454,23 +1468,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "geyser-quic-plugin" -version = "0.1.0" -dependencies = [ - "agave-geyser-plugin-interface", - "anyhow", - "clap 4.5.4", - "log", - "quic-geyser-common", - "quinn", - "serde", - "serde_json", - "solana-sdk", - "thiserror", - "tokio", -] - [[package]] name = "geyser-quic-plugin-tester" version = "0.1.0" @@ -1482,6 +1479,7 @@ dependencies = [ "log", "quic-geyser-client", "quic-geyser-common", + "quic-geyser-plugin", "serde", "serde_json", "solana-rpc-client", @@ -1564,6 +1562,12 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +[[package]] +name = "heck" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" + [[package]] name = "heck" version = "0.5.0" @@ -2461,6 +2465,30 @@ dependencies = [ "toml_edit 0.21.1", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.82" @@ -2523,6 +2551,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "quic-geyser-plugin" +version = "0.1.0" +dependencies = [ + "agave-geyser-plugin-interface", + "anyhow", + "clap 3.2.25", + "log", + "quic-geyser-common", + "quinn", + "serde", + "serde_json", + "solana-sdk", + "thiserror", + "tokio", +] + [[package]] name = "quinn" version = "0.10.2" diff --git a/config.json b/config.json new file mode 100644 index 0000000..e046790 --- /dev/null +++ b/config.json @@ -0,0 +1,3 @@ +{ + "libpath": "target/debug/libquic_geyser_plugin.so" +} diff --git a/plugin/Cargo.toml b/plugin/Cargo.toml index 9960fa7..6a20d43 100644 --- a/plugin/Cargo.toml +++ b/plugin/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "geyser-quic-plugin" +name = "quic-geyser-plugin" version = "0.1.0" edition = "2021" authors = ["Godmode Galactus"] diff --git a/plugin/src/bin/config-check.rs b/plugin/src/bin/config-check.rs index 485d7a8..9f6fa38 100644 --- a/plugin/src/bin/config-check.rs +++ b/plugin/src/bin/config-check.rs @@ -1,4 +1,4 @@ -use {clap::Parser, geyser_quic_plugin::config::Config}; +use {clap::Parser, quic_geyser_plugin::config::Config}; #[derive(Debug, Parser)] #[clap(author, version, about)] diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..7897a24 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "1.75.0" diff --git a/tester/Cargo.toml b/tester/Cargo.toml new file mode 100644 index 0000000..fb039e8 --- /dev/null +++ b/tester/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "geyser-quic-plugin-tester" +version = "0.1.0" +edition = "2021" +authors = ["Godmode Galactus"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +solana-rpc-client = "~1.17.28" + +tokio = { workspace = "true" } +clap = { workspace = "true", features = ["derive", "env"] } +serde = { workspace = "true" } +solana-sdk = { workspace = "true" } +serde_json = { workspace = "true" } +anyhow = { workspace = "true" } +log = { workspace = "true" } +futures = { workspace = "true" } +bincode = { workspace = "true" } + +quic-geyser-client = { path = "../client" } +quic-geyser-common = { path = "../common" } +quic-geyser-plugin = { path = "../plugin" } \ No newline at end of file diff --git a/tester/src/cli.rs b/tester/src/cli.rs new file mode 100644 index 0000000..ba2075a --- /dev/null +++ b/tester/src/cli.rs @@ -0,0 +1,11 @@ +use clap::Parser; + +#[derive(Parser, Debug, Clone)] +#[command(author, version, about, long_about = None)] +pub struct Args { + #[arg(short, long)] + pub url: String, + + #[arg(short, long)] + pub rpc_url: String, +} diff --git a/tester/src/main.rs b/tester/src/main.rs new file mode 100644 index 0000000..0578d3a --- /dev/null +++ b/tester/src/main.rs @@ -0,0 +1,145 @@ +use std::{ + net::{Ipv4Addr, SocketAddrV4}, + sync::{atomic::AtomicU64, Arc}, + time::Duration, +}; + +use clap::Parser; +use cli::Args; +use futures::StreamExt; +use quic_geyser_client::{client::Client, DEFAULT_MAX_STREAM}; +use quic_geyser_common::filters::{AccountFilter, Filter}; +use quic_geyser_plugin::config::{CompressionParameters, Config, ConfigQuicPlugin, QuicParameters}; +use serde_json::json; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair}; +use tokio::pin; + +pub mod cli; + +#[tokio::main] +async fn main() { + let config = Config { + libpath: "temp".to_string(), + quic_plugin: ConfigQuicPlugin { + address: std::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 10800)), + quic_parameters: QuicParameters { + max_number_of_streams_per_client: 1024, + recieve_window_size: 1_000_000, + connection_timeout: 600, + }, + compression_parameters: CompressionParameters { + compression_type: quic_geyser_common::compression::CompressionType::Lz4Fast(8), + }, + number_of_retries: 100, + }, + }; + let config_json = json!(config); + println!("{}", config_json); + + let args = Args::parse(); + let client = Client::new(args.url, &Keypair::new(), DEFAULT_MAX_STREAM) + .await + .unwrap(); + + let bytes_transfered = Arc::new(AtomicU64::new(0)); + let slot_notifications = Arc::new(AtomicU64::new(0)); + let account_notification = Arc::new(AtomicU64::new(0)); + let blockmeta_notifications = Arc::new(AtomicU64::new(0)); + + let cluster_slot = Arc::new(AtomicU64::new(0)); + let account_slot = Arc::new(AtomicU64::new(0)); + let slot_slot = Arc::new(AtomicU64::new(0)); + let blockmeta_slot = Arc::new(AtomicU64::new(0)); + + { + let cluster_slot = cluster_slot.clone(); + let rpc = RpcClient::new(args.rpc_url); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(100)).await; + let slot = rpc + .get_slot_with_commitment(CommitmentConfig::processed()) + .await + .unwrap(); + cluster_slot.store(slot, std::sync::atomic::Ordering::Relaxed); + } + }); + } + + { + let bytes_transfered: Arc = bytes_transfered.clone(); + let slot_notifications = slot_notifications.clone(); + let account_notification = account_notification.clone(); + let blockmeta_notifications = blockmeta_notifications.clone(); + + let cluster_slot = cluster_slot.clone(); + let account_slot = account_slot.clone(); + let slot_slot = slot_slot.clone(); + let blockmeta_slot = blockmeta_slot.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + let bytes_transfered = + bytes_transfered.swap(0, std::sync::atomic::Ordering::Relaxed); + log::info!("------------------------------------------"); + log::info!(" Bytes Transfered : {}", bytes_transfered); + log::info!( + " Accounts Notified : {}", + account_notification.swap(0, std::sync::atomic::Ordering::Relaxed) + ); + log::info!( + " Slots Notified : {}", + slot_notifications.swap(0, std::sync::atomic::Ordering::Relaxed) + ); + log::info!( + " Blockmeta notified : {}", + blockmeta_notifications.swap(0, std::sync::atomic::Ordering::Relaxed) + ); + + log::info!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {} ", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed)); + } + }); + } + + client + .subscribe(vec![ + Filter::Account(AccountFilter { + owner: Some(Pubkey::default()), + accounts: None, + }), + Filter::Slot, + Filter::BlockMeta, + ]) + .await + .unwrap(); + + let stream = client.get_stream(); + pin!(stream); + + while let Some(message) = stream.next().await { + let message_size = bincode::serialize(&message).unwrap().len(); + bytes_transfered.fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed); + match message { + quic_geyser_common::message::Message::AccountMsg(account) => { + log::debug!("got account notification : {} ", account.pubkey); + account_notification.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + account_slot.store( + account.slot_identifier.slot, + std::sync::atomic::Ordering::Relaxed, + ); + } + quic_geyser_common::message::Message::SlotMsg(slot) => { + log::debug!("got slot notification : {} ", slot.slot); + slot_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + slot_slot.store(slot.slot, std::sync::atomic::Ordering::Relaxed); + } + quic_geyser_common::message::Message::BlockMetaMsg(block_meta) => { + log::debug!("got blockmeta notification : {} ", block_meta.slot); + blockmeta_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + blockmeta_slot.store(block_meta.slot, std::sync::atomic::Ordering::Relaxed); + } + quic_geyser_common::message::Message::Filters(_) => todo!(), + } + } +}