Adding config file, rust toolchain file and tester example

This commit is contained in:
Godmode Galactus 2024-05-14 19:34:09 +02:00
parent 71bde1cf80
commit 2808b8ddb8
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
8 changed files with 251 additions and 21 deletions

83
Cargo.lock generated
View File

@ -789,6 +789,7 @@ checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123"
dependencies = [
"atty",
"bitflags 1.3.2",
"clap_derive 3.2.25",
"clap_lex 0.2.4",
"indexmap 1.9.3",
"once_cell",
@ -804,7 +805,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0"
dependencies = [
"clap_builder",
"clap_derive",
"clap_derive 4.5.4",
]
[[package]]
@ -819,13 +820,26 @@ dependencies = [
"strsim 0.11.1",
]
[[package]]
name = "clap_derive"
version = "3.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008"
dependencies = [
"heck 0.4.1",
"proc-macro-error",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "clap_derive"
version = "4.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64"
dependencies = [
"heck",
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.61",
@ -1454,23 +1468,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "geyser-quic-plugin"
version = "0.1.0"
dependencies = [
"agave-geyser-plugin-interface",
"anyhow",
"clap 4.5.4",
"log",
"quic-geyser-common",
"quinn",
"serde",
"serde_json",
"solana-sdk",
"thiserror",
"tokio",
]
[[package]]
name = "geyser-quic-plugin-tester"
version = "0.1.0"
@ -1482,6 +1479,7 @@ dependencies = [
"log",
"quic-geyser-client",
"quic-geyser-common",
"quic-geyser-plugin",
"serde",
"serde_json",
"solana-rpc-client",
@ -1564,6 +1562,12 @@ version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "heck"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "heck"
version = "0.5.0"
@ -2461,6 +2465,30 @@ dependencies = [
"toml_edit 0.21.1",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn 1.0.109",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro2"
version = "1.0.82"
@ -2523,6 +2551,23 @@ dependencies = [
"tokio",
]
[[package]]
name = "quic-geyser-plugin"
version = "0.1.0"
dependencies = [
"agave-geyser-plugin-interface",
"anyhow",
"clap 3.2.25",
"log",
"quic-geyser-common",
"quinn",
"serde",
"serde_json",
"solana-sdk",
"thiserror",
"tokio",
]
[[package]]
name = "quinn"
version = "0.10.2"

3
config.json Normal file
View File

@ -0,0 +1,3 @@
{
"libpath": "target/debug/libquic_geyser_plugin.so"
}

View File

@ -1,5 +1,5 @@
[package]
name = "geyser-quic-plugin"
name = "quic-geyser-plugin"
version = "0.1.0"
edition = "2021"
authors = ["Godmode Galactus"]

View File

@ -1,4 +1,4 @@
use {clap::Parser, geyser_quic_plugin::config::Config};
use {clap::Parser, quic_geyser_plugin::config::Config};
#[derive(Debug, Parser)]
#[clap(author, version, about)]

2
rust-toolchain.toml Normal file
View File

@ -0,0 +1,2 @@
[toolchain]
channel = "1.75.0"

24
tester/Cargo.toml Normal file
View File

@ -0,0 +1,24 @@
[package]
name = "geyser-quic-plugin-tester"
version = "0.1.0"
edition = "2021"
authors = ["Godmode Galactus"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
solana-rpc-client = "~1.17.28"
tokio = { workspace = "true" }
clap = { workspace = "true", features = ["derive", "env"] }
serde = { workspace = "true" }
solana-sdk = { workspace = "true" }
serde_json = { workspace = "true" }
anyhow = { workspace = "true" }
log = { workspace = "true" }
futures = { workspace = "true" }
bincode = { workspace = "true" }
quic-geyser-client = { path = "../client" }
quic-geyser-common = { path = "../common" }
quic-geyser-plugin = { path = "../plugin" }

11
tester/src/cli.rs Normal file
View File

@ -0,0 +1,11 @@
use clap::Parser;
#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
#[arg(short, long)]
pub url: String,
#[arg(short, long)]
pub rpc_url: String,
}

145
tester/src/main.rs Normal file
View File

@ -0,0 +1,145 @@
use std::{
net::{Ipv4Addr, SocketAddrV4},
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use clap::Parser;
use cli::Args;
use futures::StreamExt;
use quic_geyser_client::{client::Client, DEFAULT_MAX_STREAM};
use quic_geyser_common::filters::{AccountFilter, Filter};
use quic_geyser_plugin::config::{CompressionParameters, Config, ConfigQuicPlugin, QuicParameters};
use serde_json::json;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair};
use tokio::pin;
pub mod cli;
#[tokio::main]
async fn main() {
let config = Config {
libpath: "temp".to_string(),
quic_plugin: ConfigQuicPlugin {
address: std::net::SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 10800)),
quic_parameters: QuicParameters {
max_number_of_streams_per_client: 1024,
recieve_window_size: 1_000_000,
connection_timeout: 600,
},
compression_parameters: CompressionParameters {
compression_type: quic_geyser_common::compression::CompressionType::Lz4Fast(8),
},
number_of_retries: 100,
},
};
let config_json = json!(config);
println!("{}", config_json);
let args = Args::parse();
let client = Client::new(args.url, &Keypair::new(), DEFAULT_MAX_STREAM)
.await
.unwrap();
let bytes_transfered = Arc::new(AtomicU64::new(0));
let slot_notifications = Arc::new(AtomicU64::new(0));
let account_notification = Arc::new(AtomicU64::new(0));
let blockmeta_notifications = Arc::new(AtomicU64::new(0));
let cluster_slot = Arc::new(AtomicU64::new(0));
let account_slot = Arc::new(AtomicU64::new(0));
let slot_slot = Arc::new(AtomicU64::new(0));
let blockmeta_slot = Arc::new(AtomicU64::new(0));
{
let cluster_slot = cluster_slot.clone();
let rpc = RpcClient::new(args.rpc_url);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
let slot = rpc
.get_slot_with_commitment(CommitmentConfig::processed())
.await
.unwrap();
cluster_slot.store(slot, std::sync::atomic::Ordering::Relaxed);
}
});
}
{
let bytes_transfered: Arc<AtomicU64> = bytes_transfered.clone();
let slot_notifications = slot_notifications.clone();
let account_notification = account_notification.clone();
let blockmeta_notifications = blockmeta_notifications.clone();
let cluster_slot = cluster_slot.clone();
let account_slot = account_slot.clone();
let slot_slot = slot_slot.clone();
let blockmeta_slot = blockmeta_slot.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let bytes_transfered =
bytes_transfered.swap(0, std::sync::atomic::Ordering::Relaxed);
log::info!("------------------------------------------");
log::info!(" Bytes Transfered : {}", bytes_transfered);
log::info!(
" Accounts Notified : {}",
account_notification.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::info!(
" Slots Notified : {}",
slot_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::info!(
" Blockmeta notified : {}",
blockmeta_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
log::info!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {} ", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed));
}
});
}
client
.subscribe(vec![
Filter::Account(AccountFilter {
owner: Some(Pubkey::default()),
accounts: None,
}),
Filter::Slot,
Filter::BlockMeta,
])
.await
.unwrap();
let stream = client.get_stream();
pin!(stream);
while let Some(message) = stream.next().await {
let message_size = bincode::serialize(&message).unwrap().len();
bytes_transfered.fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed);
match message {
quic_geyser_common::message::Message::AccountMsg(account) => {
log::debug!("got account notification : {} ", account.pubkey);
account_notification.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
account_slot.store(
account.slot_identifier.slot,
std::sync::atomic::Ordering::Relaxed,
);
}
quic_geyser_common::message::Message::SlotMsg(slot) => {
log::debug!("got slot notification : {} ", slot.slot);
slot_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
slot_slot.store(slot.slot, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::BlockMetaMsg(block_meta) => {
log::debug!("got blockmeta notification : {} ", block_meta.slot);
blockmeta_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
blockmeta_slot.store(block_meta.slot, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::Filters(_) => todo!(),
}
}
}