adding blocking client as it is faster than quinn

This commit is contained in:
godmodegalactus 2024-05-29 20:50:33 +02:00
parent c618840f11
commit f94e73954f
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
21 changed files with 299 additions and 223 deletions

42
Cargo.lock generated
View File

@ -2437,6 +2437,26 @@ dependencies = [
"syn 2.0.65",
]
[[package]]
name = "quic-geyser-blocking-client"
version = "0.1.3"
dependencies = [
"anyhow",
"bincode",
"itertools",
"log",
"mio",
"mio_channel",
"quic-geyser-common",
"quic-geyser-quiche-utils",
"quic-geyser-server",
"quiche",
"rand 0.8.5",
"ring 0.17.8",
"solana-sdk",
"tracing-subscriber",
]
[[package]]
name = "quic-geyser-client"
version = "0.1.3"
@ -2501,14 +2521,31 @@ dependencies = [
"bincode",
"clap",
"log",
"quic-geyser-client",
"quic-geyser-blocking-client",
"quic-geyser-common",
"quic-geyser-server",
"serde",
"serde_json",
"solana-rpc-client",
"solana-sdk",
"tokio",
"tracing-subscriber",
]
[[package]]
name = "quic-geyser-quiche-utils"
version = "0.1.3"
dependencies = [
"anyhow",
"bincode",
"itertools",
"log",
"mio",
"quic-geyser-common",
"quic-geyser-server",
"quiche",
"rand 0.8.5",
"ring 0.17.8",
"solana-sdk",
"tracing-subscriber",
]
@ -2524,6 +2561,7 @@ dependencies = [
"mio",
"mio_channel",
"quic-geyser-common",
"quic-geyser-quiche-utils",
"quiche",
"rand 0.8.5",
"rcgen",

View File

@ -6,6 +6,7 @@ members = [
"client",
"common",
"server",
"blocking_client",
"examples/tester-client",
"examples/tester-server",
"proxy",
@ -59,6 +60,8 @@ quic-geyser-common = {path = "common", version="0.1.3"}
quic-geyser-client = {path = "client", version="0.1.3"}
quic-geyser-plugin = {path = "plugin", version="0.1.3"}
quic-geyser-server = {path = "server", version="0.1.3"}
quic-geyser-quiche-utils = {path = "quiche", version = "0.1.3"}
quic-geyser-blocking-client = {path = "blocking_client", version = "0.1.3"}
[profile.release]
debug = true

View File

@ -0,0 +1,26 @@
[package]
name = "quic-geyser-blocking-client"
version = "0.1.3"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
solana-sdk = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
quic-geyser-common = { workspace = true }
quic-geyser-quiche-utils = { workspace = true }
bincode = { workspace = true }
ring = {workspace = true}
quiche = { workspace = true }
mio = { workspace = true }
mio_channel = { workspace = true }
[dev-dependencies]
rand = { workspace = true }
tracing-subscriber = { workspace = true }
itertools = { workspace = true }
quic-geyser-server = { workspace = true }

View File

@ -0,0 +1,180 @@
use crate::configure_client::configure_client;
use crate::quiche_client_loop::client_loop;
use quic_geyser_common::filters::Filter;
use quic_geyser_common::message::Message;
use quic_geyser_common::types::connections_parameters::ConnectionParameters;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
pub struct Client {
is_connected: Arc<AtomicBool>,
filters_sender: std::sync::mpsc::Sender<Message>,
}
impl Client {
pub fn new(
server_address: String,
connection_parameters: ConnectionParameters,
) -> anyhow::Result<(Client, std::sync::mpsc::Receiver<Message>)> {
let config = configure_client(
connection_parameters.max_number_of_streams,
connection_parameters.recieve_window_size,
connection_parameters.timeout_in_seconds,
connection_parameters.max_ack_delay,
connection_parameters.ack_exponent,
)?;
let server_address: SocketAddr = server_address.parse()?;
let socket_addr: SocketAddr = "0.0.0.0:0"
.parse()
.expect("Socket address should be returned");
let is_connected = Arc::new(AtomicBool::new(false));
let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel();
let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel();
let is_connected_client = is_connected.clone();
let _client_loop_jh = std::thread::spawn(move || {
if let Err(e) = client_loop(
config,
socket_addr,
server_address,
rx_sent_queue,
sx_recv_queue,
is_connected_client.clone(),
) {
log::error!("client stopped with error {e}");
}
is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed);
});
Ok((
Client {
is_connected,
filters_sender,
},
client_rx_queue,
))
}
pub fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
let message = Message::Filters(filters);
self.filters_sender.send(message)?;
Ok(())
}
pub fn is_connected(&self) -> bool {
self.is_connected.load(std::sync::atomic::Ordering::Relaxed)
}
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use quic_geyser_common::{
channel_message::AccountData,
compression::CompressionType,
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
filters::Filter,
message::Message,
types::{
account::Account, connections_parameters::ConnectionParameters,
slot_identifier::SlotIdentifier,
},
};
use quic_geyser_server::quic_server::QuicServer;
use solana_sdk::pubkey::Pubkey;
use std::{net::SocketAddr, thread::sleep, time::Duration};
pub fn get_account_for_test(slot: u64, data_size: usize) -> Account {
Account {
slot_identifier: SlotIdentifier { slot },
pubkey: Pubkey::new_unique(),
owner: Pubkey::new_unique(),
write_version: 0,
lamports: 12345,
rent_epoch: u64::MAX,
executable: false,
data: (0..data_size).map(|_| rand::random::<u8>()).collect_vec(),
compression_type: CompressionType::None,
data_length: data_size as u64,
}
}
use crate::client::Client;
#[test]
pub fn test_client() {
let server_sock: SocketAddr = "0.0.0.0:30000".parse().unwrap();
let url = format!("127.0.0.1:{}", server_sock.port());
let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2));
let msg_acc_2 = Message::AccountMsg(get_account_for_test(1, 20));
let msg_acc_3 = Message::AccountMsg(get_account_for_test(2, 100));
let msg_acc_4 = Message::AccountMsg(get_account_for_test(3, 1_000));
let msg_acc_5 = Message::AccountMsg(get_account_for_test(4, 10_000));
let msgs = [msg_acc_1, msg_acc_2, msg_acc_3, msg_acc_4, msg_acc_5];
let jh = {
let msgs = msgs.clone();
let server_sock = server_sock.clone();
std::thread::spawn(move || {
let config = ConfigQuicPlugin {
address: server_sock,
quic_parameters: QuicParameters::default(),
compression_parameters: CompressionParameters {
compression_type: CompressionType::None,
},
number_of_retries: 100,
log_level: "debug".to_string(),
allow_accounts: true,
allow_accounts_at_startup: false,
};
let quic_server = QuicServer::new(config).unwrap();
// wait for client to connect and subscribe
sleep(Duration::from_secs(2));
for msg in msgs {
let Message::AccountMsg(account) = msg else {
panic!("should never happen");
};
quic_server
.send_message(
quic_geyser_common::channel_message::ChannelMessage::Account(
AccountData {
pubkey: account.pubkey,
account: account.solana_account(),
write_version: account.write_version,
},
account.slot_identifier.slot,
),
)
.unwrap();
}
sleep(Duration::from_secs(1));
})
};
// wait for server to start
sleep(Duration::from_secs(1));
// server started
let (client, reciever) = Client::new(
url,
ConnectionParameters {
max_number_of_streams: 10,
recieve_window_size: 1_000_000,
timeout_in_seconds: 10,
max_ack_delay: 25,
ack_exponent: 3,
},
)
.unwrap();
client.subscribe(vec![Filter::AccountsAll]).unwrap();
let mut cnt = 0;
for message_sent in msgs {
let msg = reciever.recv().unwrap();
log::info!("got message : {}", cnt);
cnt += 1;
assert_eq!(message_sent, msg);
}
jh.join().unwrap();
}
}

View File

@ -0,0 +1,3 @@
pub mod client;
pub mod configure_client;
pub mod quiche_client_loop;

View File

@ -6,7 +6,7 @@ use std::{
use quic_geyser_common::{defaults::MAX_DATAGRAM_SIZE, message::Message};
use crate::{
use quic_geyser_quiche_utils::{
quiche_reciever::{recv_message, ReadStreams},
quiche_sender::{handle_writable, send_message},
quiche_utils::{get_next_unidi, PartialResponses},
@ -275,6 +275,7 @@ mod tests {
};
use itertools::Itertools;
use quic_geyser_server::{configure_server::configure_server, quiche_server_loop::server_loop};
use solana_sdk::{account::Account, pubkey::Pubkey};
use quic_geyser_common::{
@ -286,10 +287,7 @@ mod tests {
types::block_meta::SlotMeta,
};
use crate::{
configure_client::configure_client, configure_server::configure_server,
quiche_server_loop::server_loop,
};
use crate::configure_client::configure_client;
use super::client_loop;

View File

@ -1,183 +0,0 @@
// THIS CODE IS DEPENDENT ON QUICHE
// use quic_geyser_common::filters::Filter;
// use quic_geyser_common::message::Message;
// use quic_geyser_common::quic::configure_client::configure_client;
// use quic_geyser_common::quic::quiche_client_loop::client_loop;
// use quic_geyser_common::types::connections_parameters::ConnectionParameters;
// use std::net::SocketAddr;
// use std::sync::atomic::AtomicBool;
// use std::sync::Arc;
// pub struct Client {
// is_connected: Arc<AtomicBool>,
// filters_sender: std::sync::mpsc::Sender<Message>,
// }
// impl Client {
// pub fn new(
// server_address: String,
// connection_parameters: ConnectionParameters,
// ) -> anyhow::Result<(Client, std::sync::mpsc::Receiver<Message>)> {
// let config = configure_client(
// connection_parameters.max_number_of_streams,
// connection_parameters.recieve_window_size,
// connection_parameters.timeout_in_seconds,
// connection_parameters.max_ack_delay,
// connection_parameters.ack_exponent,
// )?;
// let server_address: SocketAddr = server_address.parse()?;
// let socket_addr: SocketAddr = "0.0.0.0:0"
// .parse()
// .expect("Socket address should be returned");
// let is_connected = Arc::new(AtomicBool::new(false));
// let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel();
// let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel();
// let is_connected_client = is_connected.clone();
// let _client_loop_jh = std::thread::spawn(move || {
// if let Err(e) = client_loop(
// config,
// socket_addr,
// server_address,
// rx_sent_queue,
// sx_recv_queue,
// is_connected_client.clone(),
// ) {
// log::error!("client stopped with error {e}");
// }
// is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed);
// });
// Ok((
// Client {
// is_connected,
// filters_sender,
// },
// client_rx_queue,
// ))
// }
// pub fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
// let message = Message::Filters(filters);
// self.filters_sender.send(message)?;
// Ok(())
// }
// pub fn is_connected(&self) -> bool {
// self.is_connected.load(std::sync::atomic::Ordering::Relaxed)
// }
// }
// #[cfg(test)]
// mod tests {
// use itertools::Itertools;
// use quic_geyser_common::{
// channel_message::AccountData,
// compression::CompressionType,
// config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
// filters::Filter,
// message::Message,
// quic::quic_server::QuicServer,
// types::{
// account::Account, connections_parameters::ConnectionParameters,
// slot_identifier::SlotIdentifier,
// },
// };
// use solana_sdk::pubkey::Pubkey;
// use std::{net::SocketAddr, thread::sleep, time::Duration};
// pub fn get_account_for_test(slot: u64, data_size: usize) -> Account {
// Account {
// slot_identifier: SlotIdentifier { slot },
// pubkey: Pubkey::new_unique(),
// owner: Pubkey::new_unique(),
// write_version: 0,
// lamports: 12345,
// rent_epoch: u64::MAX,
// executable: false,
// data: (0..data_size).map(|_| rand::random::<u8>()).collect_vec(),
// compression_type: CompressionType::None,
// data_length: data_size as u64,
// }
// }
// use crate::blocking::client::Client;
// #[test]
// pub fn test_client() {
// tracing_subscriber::fmt::init();
// let server_sock: SocketAddr = "0.0.0.0:30000".parse().unwrap();
// let url = format!("127.0.0.1:{}", server_sock.port());
// let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2));
// let msg_acc_2 = Message::AccountMsg(get_account_for_test(1, 20));
// let msg_acc_3 = Message::AccountMsg(get_account_for_test(2, 100));
// let msg_acc_4 = Message::AccountMsg(get_account_for_test(3, 1_000));
// let msg_acc_5 = Message::AccountMsg(get_account_for_test(4, 10_000));
// let msgs = [msg_acc_1, msg_acc_2, msg_acc_3, msg_acc_4, msg_acc_5];
// let jh = {
// let msgs = msgs.clone();
// let server_sock = server_sock.clone();
// std::thread::spawn(move || {
// let config = ConfigQuicPlugin {
// address: server_sock,
// quic_parameters: QuicParameters::default(),
// compression_parameters: CompressionParameters {
// compression_type: CompressionType::None,
// },
// number_of_retries: 100,
// log_level: "debug".to_string(),
// allow_accounts: true,
// allow_accounts_at_startup: false,
// };
// let quic_server = QuicServer::new(config).unwrap();
// // wait for client to connect and subscribe
// sleep(Duration::from_secs(2));
// for msg in msgs {
// let Message::AccountMsg(account) = msg else {
// panic!("should never happen");
// };
// quic_server
// .send_message(
// quic_geyser_common::channel_message::ChannelMessage::Account(
// AccountData {
// pubkey: account.pubkey,
// account: account.solana_account(),
// write_version: account.write_version,
// },
// account.slot_identifier.slot,
// ),
// )
// .unwrap();
// }
// sleep(Duration::from_secs(1));
// })
// };
// // wait for server to start
// sleep(Duration::from_secs(1));
// // server started
// let (client, reciever) = Client::new(
// url,
// ConnectionParameters {
// max_number_of_streams: 10,
// recieve_window_size: 1_000_000,
// timeout_in_seconds: 10,
// max_ack_delay: 25,
// ack_exponent: 3,
// },
// )
// .unwrap();
// client.subscribe(vec![Filter::AccountsAll]).unwrap();
// let mut cnt = 0;
// for message_sent in msgs {
// let msg = reciever.recv().unwrap();
// log::info!("got message : {}", cnt);
// cnt += 1;
// assert_eq!(message_sent, msg);
// }
// jh.join().unwrap();
// }
// }

View File

@ -1 +0,0 @@
pub mod client;

View File

@ -1,2 +1 @@
pub mod blocking;
pub mod non_blocking;

View File

@ -6,4 +6,4 @@ pub const DEFAULT_MAX_NB_CONNECTIONS: u64 = 10;
pub const DEFAULT_MAX_ACK_DELAY: u64 = 400;
pub const DEFAULT_ACK_EXPONENT: u64 = 3;
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser";
pub const MAX_DATAGRAM_SIZE: usize = 2000; // MAX: 65527
pub const MAX_DATAGRAM_SIZE: usize = 1350; // MAX: 65527

View File

@ -17,8 +17,7 @@ anyhow = { workspace = true }
log = { workspace = true }
bincode = { workspace = true }
tracing-subscriber = { workspace = true }
tokio = { workspace = true }
quic-geyser-client = { workspace = true }
quic-geyser-blocking-client = { workspace = true }
quic-geyser-common = { workspace = true }
quic-geyser-server = { workspace = true }

View File

@ -2,7 +2,7 @@ use std::{net::SocketAddr, str::FromStr};
use clap::Parser;
use cli::Args;
use quic_geyser_client::non_blocking::client::Client;
use quic_geyser_blocking_client::client::Client;
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
@ -16,12 +16,11 @@ use quic_geyser_server::quic_server::QuicServer;
pub mod cli;
#[tokio::main()]
pub async fn main() -> anyhow::Result<()> {
pub fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args = Args::parse();
let (client, mut message_channel) = Client::new(
let (client, message_channel) = Client::new(
args.source_url,
ConnectionParameters {
max_number_of_streams: args.max_number_of_streams_per_client,
@ -30,18 +29,15 @@ pub async fn main() -> anyhow::Result<()> {
max_ack_delay: args.max_ack_delay,
ack_exponent: args.ack_exponent,
},
)
.await?;
)?;
log::info!("Subscribing");
client
.subscribe(vec![
Filter::AccountsAll,
Filter::TransactionsAll,
Filter::Slot,
Filter::BlockMeta,
])
.await?;
client.subscribe(vec![
Filter::AccountsAll,
Filter::TransactionsAll,
Filter::Slot,
Filter::BlockMeta,
])?;
let quic_config = ConfigQuicPlugin {
address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(),
@ -83,7 +79,7 @@ pub async fn main() -> anyhow::Result<()> {
}
});
while let Some(message) = message_channel.recv().await {
while let Ok(message) = message_channel.recv() {
let channel_message = match message {
quic_geyser_common::message::Message::AccountMsg(account_message) => {
ChannelMessage::Account(

24
quiche/Cargo.toml Normal file
View File

@ -0,0 +1,24 @@
[package]
name = "quic-geyser-quiche-utils"
version = "0.1.3"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
solana-sdk = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
quic-geyser-common = { workspace = true }
bincode = { workspace = true }
ring = {workspace = true}
quiche = { workspace = true }
mio = { workspace = true }
[dev-dependencies]
rand = { workspace = true }
tracing-subscriber = { workspace = true }
itertools = { workspace = true }
quic-geyser-server = { workspace = true }

3
quiche/src/lib.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod quiche_reciever;
pub mod quiche_sender;
pub mod quiche_utils;

View File

@ -15,6 +15,7 @@ itertools = { workspace = true }
bincode = { workspace = true }
ring = {workspace = true}
quiche = { workspace = true, features = ["boringssl-boring-crate"] }
quic-geyser-quiche-utils = { workspace = true }
rcgen = { workspace = true }
boring = { workspace = true }

View File

@ -1,8 +1,3 @@
pub mod configure_client;
pub mod configure_server;
pub mod quic_server;
pub mod quiche_client_loop;
pub mod quiche_reciever;
pub mod quiche_sender;
pub mod quiche_server_loop;
pub mod quiche_utils;

View File

@ -23,15 +23,10 @@ use quic_geyser_common::{
types::{account::Account, block_meta::SlotMeta, slot_identifier::SlotIdentifier},
};
use crate::{
quiche_reciever::recv_message,
use quic_geyser_quiche_utils::{
quiche_reciever::{recv_message, ReadStreams},
quiche_sender::{handle_writable, send_message},
quiche_utils::{get_next_unidi, mint_token, validate_token},
};
use super::{
quiche_reciever::ReadStreams,
quiche_utils::{write_to_socket, PartialResponses},
quiche_utils::{get_next_unidi, mint_token, validate_token, write_to_socket, PartialResponses},
};
struct DispatchingData {