Adding a new example server, other minor changes

This commit is contained in:
godmodegalactus 2024-05-20 11:28:30 +02:00
parent 17590072b1
commit 47048e31d0
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
15 changed files with 199 additions and 45 deletions

58
Cargo.lock generated
View File

@ -1302,25 +1302,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "geyser-quic-plugin-tester"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"clap",
"futures",
"log",
"quic-geyser-client",
"quic-geyser-common",
"quic-geyser-plugin",
"serde",
"serde_json",
"solana-rpc-client",
"solana-sdk",
"tokio",
]
[[package]]
name = "gimli"
version = "0.28.1"
@ -2324,6 +2305,45 @@ dependencies = [
"vergen",
]
[[package]]
name = "quic-plugin-tester-client"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"clap",
"futures",
"log",
"quic-geyser-client",
"quic-geyser-common",
"quic-geyser-plugin",
"serde",
"serde_json",
"solana-rpc-client",
"solana-sdk",
"tokio",
]
[[package]]
name = "quic-plugin-tester-server"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"clap",
"futures",
"itertools",
"log",
"quic-geyser-client",
"quic-geyser-common",
"quic-geyser-plugin",
"rand 0.8.5",
"serde",
"serde_json",
"solana-sdk",
"tokio",
]
[[package]]
name = "quinn"
version = "0.10.2"

View File

@ -5,7 +5,8 @@ members = [
"plugin",
"client",
"common",
"tester",
"examples/tester-client",
"examples/tester-server",
]
[workspace.package]

View File

@ -14,4 +14,4 @@ async-stream = { workspace = true }
tokio = { workspace = true }
log = { workspace = true }
quic-geyser-common = { path = "../common" }
quic-geyser-common = { workspace = true }

View File

@ -90,13 +90,12 @@ mod tests {
use futures::StreamExt;
use quic_geyser_common::{
filters::{AccountFilter, Filter},
filters::Filter,
message::Message,
quic::{configure_server::configure_server, connection_manager::ConnectionManager},
types::{account::Account, connections_parameters::ConnectionParameters},
};
use quinn::{Endpoint, EndpointConfig, TokioRuntime};
use solana_sdk::pubkey::Pubkey;
use tokio::{pin, sync::Notify};
use crate::client::Client;
@ -135,7 +134,7 @@ mod tests {
notify_server_start.notify_one();
notify_subscription.notified().await;
for msg in msgs {
connection_manager.dispatch(msg, 10).await;
connection_manager.dispatch(msg, 10, false).await;
}
});
}
@ -146,9 +145,9 @@ mod tests {
let client = Client::new(
url,
ConnectionParameters {
max_number_of_streams: 30,
streams_for_slot_data: 10,
streams_for_transactions: 10,
max_number_of_streams: 3,
streams_for_slot_data: 1,
streams_for_transactions: 1,
},
)
.await

View File

@ -4,6 +4,9 @@ use serde::{Deserialize, Serialize};
use crate::{compression::CompressionType, quic::configure_client::DEFAULT_MAX_STREAMS};
pub const DEFAULT_WINDOW_SIZE: u32 = 1_000_000;
pub const DEFAULT_CONNECTION_TIMEOUT: u32 = 10;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigQuicPlugin {
@ -39,8 +42,8 @@ impl Default for QuicParameters {
fn default() -> Self {
Self {
max_number_of_streams_per_client: DEFAULT_MAX_STREAMS,
recieve_window_size: 1_000_000, // 1 Mb
connection_timeout: 10, // 10s
recieve_window_size: DEFAULT_WINDOW_SIZE, // 1 Mb
connection_timeout: DEFAULT_CONNECTION_TIMEOUT, // 10s
}
}
}

View File

@ -187,7 +187,7 @@ impl ConnectionManager {
});
}
pub async fn dispatch(&self, message: Message, retry_count: u64) {
pub async fn dispatch(&self, message: Message, retry_count: u64, drop_lagger: bool) {
let lk = self.connections.read().await;
for connection_data in lk.iter() {
@ -216,7 +216,7 @@ impl ConnectionManager {
let id = connection_data.id;
tokio::spawn(async move {
let permit_result = semaphore.try_acquire_owned();
let permit_result = semaphore.clone().try_acquire_owned();
let _permit = match permit_result {
Ok(permit) => permit,
@ -227,8 +227,11 @@ impl ConnectionManager {
id,
message_type
);
connection.close(VarInt::from_u32(0), b"laggy client");
return;
if drop_lagger {
connection.close(VarInt::from_u32(0), b"laggy client");
return;
}
semaphore.acquire_owned().await.expect("Permit is aquired")
}
};

View File

@ -39,7 +39,11 @@ pub struct QuicServer {
}
impl QuicServer {
pub fn new(runtime: Runtime, config: ConfigQuicPlugin) -> anyhow::Result<Self> {
pub fn new(
runtime: Runtime,
config: ConfigQuicPlugin,
drop_laggers: bool,
) -> anyhow::Result<Self> {
let server_config = configure_server(
config.quic_parameters.max_number_of_streams_per_client,
config.quic_parameters.recieve_window_size,
@ -74,6 +78,7 @@ impl QuicServer {
slot,
compression_type,
retry_count,
drop_laggers,
);
}
}
@ -83,15 +88,21 @@ impl QuicServer {
parent,
commitment_level,
});
quic_connection_manager.dispatch(message, retry_count).await;
quic_connection_manager
.dispatch(message, retry_count, drop_laggers)
.await;
}
ChannelMessage::BlockMeta(block_meta) => {
let message = Message::BlockMetaMsg(block_meta);
quic_connection_manager.dispatch(message, retry_count).await;
quic_connection_manager
.dispatch(message, retry_count, drop_laggers)
.await;
}
ChannelMessage::Transaction(transaction) => {
let message = Message::TransactionMsg(transaction);
quic_connection_manager.dispatch(message, retry_count).await;
quic_connection_manager
.dispatch(message, retry_count, drop_laggers)
.await;
}
}
}
@ -118,6 +129,7 @@ fn process_account_message(
slot: Slot,
compression_type: CompressionType,
retry_count: u64,
drop_laggers: bool,
) {
tokio::spawn(async move {
let slot_identifier = SlotIdentifier { slot };
@ -130,6 +142,8 @@ fn process_account_message(
);
let message = Message::AccountMsg(geyser_account);
quic_connection_manager.dispatch(message, retry_count).await;
quic_connection_manager
.dispatch(message, retry_count, drop_laggers)
.await;
});
}

View File

@ -1,5 +1,5 @@
[package]
name = "geyser-quic-plugin-tester"
name = "quic-plugin-tester-client"
version = "0.1.0"
edition = "2021"
authors = ["Godmode Galactus"]
@ -19,6 +19,6 @@ log = { workspace = true }
futures = { workspace = true }
bincode = { workspace = true }
quic-geyser-client = { path = "../client" }
quic-geyser-common = { path = "../common" }
quic-geyser-plugin = { path = "../plugin" }
quic-geyser-client = { workspace = true }
quic-geyser-common = { workspace = true }
quic-geyser-plugin = { workspace = true }

View File

@ -0,0 +1,26 @@
[package]
name = "quic-plugin-tester-server"
version = "0.1.0"
edition = "2021"
authors = ["Godmode Galactus"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
serde = { workspace = true }
solana-sdk = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
futures = { workspace = true }
bincode = { workspace = true }
itertools = {workspace = true}
rand = "0.8.5"
quic-geyser-client = { workspace = true }
quic-geyser-common = { workspace = true }
quic-geyser-plugin = { workspace = true }

View File

@ -0,0 +1,17 @@
use clap::Parser;
#[derive(Parser, Debug, Clone)]
#[clap(name = "quic_plugin_tester")]
pub struct Args {
#[clap(short, long, default_value_t = 10900)]
pub port: u32,
#[clap(short, long, default_value_t = 20_000)]
pub accounts_per_second: u32,
#[clap(short = 'l', long, default_value_t = 1_000_000)]
pub account_data_size: u32,
#[clap(short, long, default_value_t = false)]
pub drop_laggers: bool,
}

View File

@ -0,0 +1,71 @@
use std::{
net::SocketAddr,
str::FromStr,
time::{Duration, Instant},
};
use clap::Parser;
use cli::Args;
use itertools::Itertools;
use quic_geyser_common::{
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
quic::quic_server::{AccountData, ChannelMessage, QuicServer},
};
use rand::{thread_rng, Rng};
use solana_sdk::{account::Account, pubkey::Pubkey};
use tokio::runtime::Builder;
pub mod cli;
pub fn main() -> anyhow::Result<()> {
let args = Args::parse();
let runtime = Builder::new_multi_thread()
.thread_name_fn(|| "solGeyserQuic".to_string())
.enable_all()
.build()
.map_err(|error| {
let s = error.to_string();
log::error!("Runtime Error : {}", s);
error
})?;
let config = ConfigQuicPlugin {
address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(),
quic_parameters: QuicParameters::default(),
compression_parameters: CompressionParameters {
compression_type: quic_geyser_common::compression::CompressionType::None,
},
number_of_retries: 100,
};
let quic_server = QuicServer::new(runtime, config, args.drop_laggers).unwrap();
let mut instant = Instant::now();
let mut slot = 1;
let mut write_version = 1;
let mut rand = thread_rng();
loop {
std::thread::sleep(Duration::from_secs(1) - Instant::now().duration_since(instant));
instant = Instant::now();
slot += 1;
for _ in 0..args.accounts_per_second {
write_version += 1;
let account = AccountData {
pubkey: Pubkey::new_unique(),
account: Account {
lamports: rand.gen(),
data: (0..args.account_data_size as usize)
.map(|_| rand.gen::<u8>())
.collect_vec(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version,
};
let channel_message = ChannelMessage::Account(account, slot, false);
quic_server.send_message(channel_message)?;
}
}
}

View File

@ -25,7 +25,7 @@ quinn = { workspace = true }
log = { workspace = true }
thiserror = {workspace = true}
quic-geyser-common = { path = "../common" }
quic-geyser-common = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }

View File

@ -40,7 +40,7 @@ impl GeyserPlugin for QuicGeyserPlugin {
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
format!("solGeyserGrpc{id:02}")
format!("solGeyserQuic{id:02}")
})
.enable_all()
.build()
@ -50,7 +50,7 @@ impl GeyserPlugin for QuicGeyserPlugin {
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
})?;
let quic_server = QuicServer::new(runtime, config.quic_plugin).map_err(|_| {
let quic_server = QuicServer::new(runtime, config.quic_plugin, true).map_err(|_| {
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
})?;
self.quic_server = Some(quic_server);