Adding a quic geyser proxy binary

This commit is contained in:
godmodegalactus 2024-05-27 12:22:12 +02:00
parent 8ad956eec1
commit 87e77c8b09
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
10 changed files with 260 additions and 20 deletions

17
Cargo.lock generated
View File

@ -2462,6 +2462,23 @@ dependencies = [
"vergen",
]
[[package]]
name = "quic-geyser-plugin-proxy"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"clap",
"log",
"quic-geyser-client",
"quic-geyser-common",
"serde",
"serde_json",
"solana-rpc-client",
"solana-sdk",
"tracing-subscriber",
]
[[package]]
name = "quic-plugin-tester-client"
version = "0.1.0"

View File

@ -7,6 +7,7 @@ members = [
"common",
"examples/tester-client",
"examples/tester-server",
"proxy",
]
[workspace.package]

View File

@ -9,7 +9,7 @@ use std::sync::Arc;
pub struct Client {
is_connected: Arc<AtomicBool>,
filters_sender: mio_channel::Sender<Message>,
filters_sender: std::sync::mpsc::Sender<Message>,
}
impl Client {
@ -21,13 +21,15 @@ impl Client {
connection_parameters.max_number_of_streams,
connection_parameters.recieve_window_size,
connection_parameters.timeout_in_seconds,
connection_parameters.max_ack_delay,
connection_parameters.ack_exponent,
)?;
let server_address: SocketAddr = server_address.parse()?;
let socket_addr: SocketAddr = "0.0.0.0:0"
.parse()
.expect("Socket address should be returned");
let is_connected = Arc::new(AtomicBool::new(false));
let (filters_sender, rx_sent_queue) = mio_channel::channel();
let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel();
let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel();
let is_connected_client = is_connected.clone();
@ -160,6 +162,8 @@ mod tests {
max_number_of_streams: 10,
recieve_window_size: 1_000_000,
timeout_in_seconds: 10,
max_ack_delay: 25,
ack_exponent: 3,
},
)
.unwrap();

View File

@ -9,6 +9,8 @@ pub fn configure_client(
maximum_concurrent_streams: u64,
recieve_window_size: u64,
timeout_in_seconds: u64,
maximum_ack_delay: u64,
ack_exponent: u64,
) -> anyhow::Result<quiche::Config> {
let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).unwrap();
config
@ -26,5 +28,7 @@ pub fn configure_client(
config.set_initial_max_streams_uni(maximum_concurrent_streams);
config.set_disable_active_migration(true);
config.set_cc_algorithm(quiche::CongestionControlAlgorithm::BBR2);
config.set_max_ack_delay(maximum_ack_delay);
config.set_ack_delay_exponent(ack_exponent);
Ok(config)
}

View File

@ -20,7 +20,7 @@ pub fn client_loop(
mut config: quiche::Config,
socket_addr: SocketAddr,
server_address: SocketAddr,
message_send_queue: mio_channel::Receiver<Message>,
message_send_queue: std::sync::mpsc::Receiver<Message>,
message_recv_queue: std::sync::mpsc::Sender<Message>,
is_connected: Arc<AtomicBool>,
) -> anyhow::Result<()> {
@ -125,7 +125,7 @@ pub fn create_quiche_client_thread(
connection: quiche::Connection,
mut receiver: mio_channel::Receiver<(quiche::RecvInfo, Vec<u8>)>,
sender: mio_channel::Sender<(quiche::SendInfo, Vec<u8>)>,
message_send_queue: mio_channel::Receiver<Message>,
message_send_queue: std::sync::mpsc::Receiver<Message>,
message_recv_queue: std::sync::mpsc::Sender<Message>,
is_connected: Arc<AtomicBool>,
) {
@ -210,18 +210,33 @@ pub fn create_quiche_client_thread(
}
}
// channel events
if let Ok(message_to_send) = message_send_queue.try_recv() {
current_stream_id = get_next_unidi(current_stream_id, false, maximum_streams);
let binary = bincode::serialize(&message_to_send)
.expect("Message should be serializable");
if let Err(e) = send_message(
&mut connection,
&mut partial_responses,
current_stream_id,
&binary,
) {
log::error!("Sending failed with error {e:?}");
loop {
match message_send_queue.try_recv() {
Ok(message_to_send) => {
current_stream_id =
get_next_unidi(current_stream_id, false, maximum_streams);
let binary = bincode::serialize(&message_to_send)
.expect("Message should be serializable");
if let Err(e) = send_message(
&mut connection,
&mut partial_responses,
current_stream_id,
&binary,
) {
log::error!("Sending failed with error {e:?}");
}
}
Err(e) => {
match e {
std::sync::mpsc::TryRecvError::Empty => {
// no more new messages
break;
}
std::sync::mpsc::TryRecvError::Disconnected => {
let _ = connection.close(true, 0, b"no longer needed");
}
}
}
}
}
}
@ -377,12 +392,12 @@ mod tests {
// client loop
let server_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
let (client_sx_queue, rx_sent_queue) = mio_channel::channel();
let (client_sx_queue, rx_sent_queue) = mpsc::channel();
let (sx_recv_queue, client_rx_queue) = mpsc::channel();
let _client_loop_jh = std::thread::spawn(move || {
let client_config =
configure_client(maximum_concurrent_streams, 20_000_000, 1).unwrap();
configure_client(maximum_concurrent_streams, 20_000_000, 1, 25, 3).unwrap();
let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
let is_connected = Arc::new(AtomicBool::new(false));
if let Err(e) = client_loop(

View File

@ -1,6 +1,9 @@
use serde::{Deserialize, Serialize};
use crate::quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS};
use crate::{
config::{DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY},
quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS},
};
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[repr(C)]
@ -8,6 +11,8 @@ pub struct ConnectionParameters {
pub max_number_of_streams: u64,
pub recieve_window_size: u64,
pub timeout_in_seconds: u64,
pub max_ack_delay: u64,
pub ack_exponent: u64,
}
impl Default for ConnectionParameters {
@ -15,7 +20,9 @@ impl Default for ConnectionParameters {
Self {
max_number_of_streams: DEFAULT_MAX_STREAMS,
recieve_window_size: DEFAULT_MAX_RECIEVE_WINDOW_SIZE,
timeout_in_seconds: 10,
timeout_in_seconds: DEFAULT_CONNECTION_TIMEOUT,
max_ack_delay: DEFAULT_MAX_ACK_DELAY,
ack_exponent: DEFAULT_ACK_EXPONENT,
}
}
}

View File

@ -48,6 +48,8 @@ pub fn main() {
max_number_of_streams: 1024 * 1024,
recieve_window_size: DEFAULT_MAX_RECIEVE_WINDOW_SIZE,
timeout_in_seconds: 30,
max_ack_delay: 25,
ack_exponent: 3,
},
)
.unwrap();

22
proxy/Cargo.toml Normal file
View File

@ -0,0 +1,22 @@
[package]
name = "quic-geyser-plugin-proxy"
version = "0.1.0"
edition = "2021"
authors = ["Godmode Galactus"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
solana-rpc-client = "~1.17.28"
clap = { workspace = true, features = ["derive", "env"] }
serde = { workspace = true }
solana-sdk = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
bincode = { workspace = true }
tracing-subscriber = { workspace = true }
quic-geyser-client = { workspace = true }
quic-geyser-common = { workspace = true }

36
proxy/src/cli.rs Normal file
View File

@ -0,0 +1,36 @@
use clap::Parser;
use quic_geyser_common::{
config::{DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY},
quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS},
};
#[derive(Parser, Debug, Clone)]
#[clap(name = "quic_plugin_tester")]
pub struct Args {
#[clap(short, long)]
pub source_url: String,
#[clap(short, long, default_value_t = 10800)]
pub port: u64,
#[clap(short, long, default_value_t = 8)]
pub compression_speed: u32,
#[clap(short, long, default_value_t = 50)]
pub max_number_of_connections: u64,
#[clap(long, default_value_t = DEFAULT_MAX_STREAMS)]
pub max_number_of_streams_per_client: u64,
#[clap(long, default_value_t = DEFAULT_MAX_RECIEVE_WINDOW_SIZE)]
pub recieve_window_size: u64,
#[clap(long, default_value_t = DEFAULT_CONNECTION_TIMEOUT)]
pub connection_timeout: u64,
#[clap(long, default_value_t = DEFAULT_MAX_ACK_DELAY)]
pub max_ack_delay: u64,
#[clap(long, default_value_t = DEFAULT_ACK_EXPONENT)]
pub ack_exponent: u64,
}

132
proxy/src/main.rs Normal file
View File

@ -0,0 +1,132 @@
use std::{net::SocketAddr, str::FromStr};
use clap::Parser;
use cli::Args;
use quic_geyser_client::client::Client;
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
filters::Filter,
quic::quic_server::QuicServer,
types::{
block_meta::BlockMeta, connections_parameters::ConnectionParameters,
transaction::Transaction,
},
};
pub mod cli;
pub fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args = Args::parse();
let (client, message_channel) = Client::new(
args.source_url,
ConnectionParameters {
max_number_of_streams: args.max_number_of_streams_per_client,
recieve_window_size: args.recieve_window_size,
timeout_in_seconds: args.connection_timeout,
max_ack_delay: args.max_ack_delay,
ack_exponent: args.ack_exponent,
},
)?;
log::info!("Subscribing");
client.subscribe(vec![
Filter::AccountsAll,
Filter::TransactionsAll,
Filter::Slot,
Filter::BlockMeta,
])?;
let quic_config = ConfigQuicPlugin {
address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(),
log_level: "info".to_string(),
quic_parameters: QuicParameters {
max_number_of_streams_per_client: args.max_number_of_streams_per_client,
recieve_window_size: args.recieve_window_size,
connection_timeout: args.connection_timeout,
max_number_of_connections: args.max_number_of_connections,
max_ack_delay: args.max_ack_delay,
ack_exponent: args.ack_exponent,
},
compression_parameters: CompressionParameters {
compression_type: quic_geyser_common::compression::CompressionType::Lz4Fast(
args.compression_speed,
),
},
number_of_retries: 100,
allow_accounts: true,
allow_accounts_at_startup: false,
};
let (server_sender, server_reciever) = std::sync::mpsc::channel::<ChannelMessage>();
std::thread::spawn(move || {
let quic_server = QuicServer::new(quic_config).unwrap();
loop {
match server_reciever.recv() {
Ok(channel_message) => {
if quic_server.send_message(channel_message).is_err() {
log::error!("server broken");
break;
}
}
Err(_) => {
log::info!("closing server");
break;
}
}
}
});
while let Ok(message) = message_channel.recv() {
let channel_message = match message {
quic_geyser_common::message::Message::AccountMsg(account_message) => {
ChannelMessage::Account(
AccountData {
pubkey: account_message.pubkey,
account: account_message.solana_account(),
write_version: account_message.write_version,
},
account_message.slot_identifier.slot,
)
}
quic_geyser_common::message::Message::SlotMsg(slot_message) => ChannelMessage::Slot(
slot_message.slot,
slot_message.parent,
slot_message.commitment_level,
),
quic_geyser_common::message::Message::BlockMetaMsg(block_meta_message) => {
ChannelMessage::BlockMeta(BlockMeta {
parent_slot: block_meta_message.parent_slot,
slot: block_meta_message.slot,
parent_blockhash: block_meta_message.parent_blockhash,
blockhash: block_meta_message.blockhash,
rewards: block_meta_message.rewards,
block_height: block_meta_message.block_height,
executed_transaction_count: block_meta_message.executed_transaction_count,
entries_count: block_meta_message.entries_count,
})
}
quic_geyser_common::message::Message::TransactionMsg(transaction_message) => {
ChannelMessage::Transaction(Box::new(Transaction {
slot_identifier: transaction_message.slot_identifier,
signatures: transaction_message.signatures,
message: transaction_message.message,
is_vote: transaction_message.is_vote,
transasction_meta: transaction_message.transasction_meta,
index: transaction_message.index,
}))
}
_ => {
unreachable!()
}
};
if server_sender.send(channel_message).is_err() {
log::error!("Server stopped");
break;
}
}
Ok(())
}