making client based on quinn instead of quiche

This commit is contained in:
godmodegalactus 2024-05-29 16:27:56 +02:00
parent 26be1bc5be
commit 24757c498b
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
29 changed files with 689 additions and 411 deletions

204
Cargo.lock generated
View File

@ -755,9 +755,9 @@ dependencies = [
[[package]]
name = "clang-sys"
version = "1.7.0"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67523a3b4be3ce1989d607a828d036249522dd9c1c8de7f4dd2dae43a37369d1"
checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4"
dependencies = [
"glob",
"libc",
@ -867,6 +867,12 @@ dependencies = [
"web-sys",
]
[[package]]
name = "const-oid"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4c78c047431fee22c1a7bb92e00ad095a02a983affe4d8a72e2a2c62c1b94f3"
[[package]]
name = "constant_time_eq"
version = "0.3.0"
@ -1025,6 +1031,15 @@ dependencies = [
"syn 2.0.65",
]
[[package]]
name = "der"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6919815d73839e7ad218de758883aae3a257ba6759ce7a9992501efbb53d705c"
dependencies = [
"const-oid",
]
[[package]]
name = "deranged"
version = "0.3.11"
@ -2212,6 +2227,12 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "overload"
version = "0.1.1"
@ -2307,6 +2328,16 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkcs8"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cabda3fb821068a9a4fab19a683eac3af12edf0f34b94a8be53c4972b8149d0"
dependencies = [
"der",
"spki",
]
[[package]]
name = "pkg-config"
version = "0.3.30"
@ -2408,13 +2439,19 @@ dependencies = [
[[package]]
name = "quic-geyser-client"
version = "0.1.0"
version = "0.1.3"
dependencies = [
"anyhow",
"bincode",
"itertools",
"log",
"pkcs8",
"quic-geyser-common",
"quic-geyser-server",
"quinn",
"rand 0.8.5",
"rcgen",
"rustls",
"solana-sdk",
"tokio",
"tracing-subscriber",
@ -2422,20 +2459,13 @@ dependencies = [
[[package]]
name = "quic-geyser-common"
version = "0.1.0"
version = "0.1.3"
dependencies = [
"anyhow",
"bincode",
"boring",
"itertools",
"log",
"lz4",
"mio",
"mio_channel",
"quiche",
"rand 0.8.5",
"rcgen",
"ring 0.17.8",
"serde",
"solana-sdk",
"solana-transaction-status",
@ -2445,7 +2475,7 @@ dependencies = [
[[package]]
name = "quic-geyser-plugin"
version = "0.1.0"
version = "0.1.3"
dependencies = [
"agave-geyser-plugin-interface",
"anyhow",
@ -2454,6 +2484,7 @@ dependencies = [
"git-version",
"log",
"quic-geyser-common",
"quic-geyser-server",
"serde",
"serde_json",
"solana-logger",
@ -2464,7 +2495,7 @@ dependencies = [
[[package]]
name = "quic-geyser-plugin-proxy"
version = "0.1.0"
version = "0.1.3"
dependencies = [
"anyhow",
"bincode",
@ -2472,10 +2503,35 @@ dependencies = [
"log",
"quic-geyser-client",
"quic-geyser-common",
"quic-geyser-server",
"serde",
"serde_json",
"solana-rpc-client",
"solana-sdk",
"tokio",
"tracing-subscriber",
]
[[package]]
name = "quic-geyser-server"
version = "0.1.3"
dependencies = [
"anyhow",
"bincode",
"boring",
"itertools",
"log",
"mio",
"mio_channel",
"quic-geyser-common",
"quiche",
"rand 0.8.5",
"rcgen",
"ring 0.17.8",
"serde",
"solana-sdk",
"solana-transaction-status",
"thiserror",
"tracing-subscriber",
]
@ -2493,6 +2549,7 @@ dependencies = [
"serde_json",
"solana-rpc-client",
"solana-sdk",
"tokio",
"tracing-subscriber",
]
@ -2506,6 +2563,7 @@ dependencies = [
"itertools",
"log",
"quic-geyser-common",
"quic-geyser-server",
"rand 0.8.5",
"serde",
"serde_json",
@ -2535,6 +2593,54 @@ dependencies = [
"winapi",
]
[[package]]
name = "quinn"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cc2c5017e4b43d5995dcea317bc46c1e09404c0a9664d2908f7f02dfe943d75"
dependencies = [
"bytes",
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "quinn-proto"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "141bf7dfde2fbc246bfd3fe12f2455aa24b0fbd9af535d8c86c7bd1381ff2b1a"
dependencies = [
"bytes",
"rand 0.8.5",
"ring 0.16.20",
"rustc-hash",
"rustls",
"rustls-native-certs",
"slab",
"thiserror",
"tinyvec",
"tracing",
]
[[package]]
name = "quinn-udp"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "055b4e778e8feb9f93c4e439f71dc2156ef13360b432b799e179a8c4cdf0b1d7"
dependencies = [
"bytes",
"libc",
"socket2",
"tracing",
"windows-sys 0.48.0",
]
[[package]]
name = "quote"
version = "1.0.36"
@ -2790,16 +2896,28 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.21.12"
version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e"
checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8"
dependencies = [
"log",
"ring 0.17.8",
"ring 0.16.20",
"rustls-webpki",
"sct",
]
[[package]]
name = "rustls-native-certs"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
dependencies = [
"openssl-probe",
"rustls-pemfile",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.4"
@ -2831,6 +2949,15 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "schannel"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534"
dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -2867,6 +2994,29 @@ dependencies = [
"untrusted 0.9.0",
]
[[package]]
name = "security-framework"
version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0"
dependencies = [
"bitflags 2.5.0",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "semver"
version = "1.0.23"
@ -3516,6 +3666,15 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "spki"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d01ac02a6ccf3e07db148d2be087da624fea0221a16152ed01f0496a6b0a27"
dependencies = [
"der",
]
[[package]]
name = "spl-associated-token-account"
version = "2.3.0"
@ -4015,10 +4174,23 @@ version = "0.1.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.65",
]
[[package]]
name = "tracing-core"
version = "0.1.32"

View File

@ -5,6 +5,7 @@ members = [
"plugin",
"client",
"common",
"server",
"examples/tester-client",
"examples/tester-server",
"proxy",
@ -52,10 +53,12 @@ cargo-lock = "9.0.0"
git-version = "0.3.5"
vergen = "8.2.1"
rand = "0.8.5"
tokio = "1.28.2"
quic-geyser-common = {path = "common", version="0.1.0"}
quic-geyser-client = {path = "client", version="0.1.0"}
quic-geyser-plugin = {path = "plugin", version="0.1.0"}
quic-geyser-common = {path = "common", version="0.1.3"}
quic-geyser-client = {path = "client", version="0.1.3"}
quic-geyser-plugin = {path = "plugin", version="0.1.3"}
quic-geyser-server = {path = "server", version="0.1.3"}
[profile.release]
debug = true

View File

@ -1,6 +1,6 @@
[package]
name = "quic-geyser-client"
version = "0.1.0"
version = "0.1.3"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -10,11 +10,17 @@ edition = "2021"
solana-sdk = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
tokio = "1.28.2"
quinn = "0.10.2"
rustls = "=0.21.7"
rcgen = "0.10.0"
pkcs8 = "0.8.0"
quic-geyser-common = { workspace = true }
bincode = { workspace = true }
tokio = { workspace = true }
[dev-dependencies]
rand = { workspace = true }
tracing-subscriber = { workspace = true }
itertools = { workspace = true }
itertools = { workspace = true }
quic-geyser-server = { workspace = true }

View File

@ -1,181 +1,183 @@
use quic_geyser_common::filters::Filter;
use quic_geyser_common::message::Message;
use quic_geyser_common::quic::configure_client::configure_client;
use quic_geyser_common::quic::quiche_client_loop::client_loop;
use quic_geyser_common::types::connections_parameters::ConnectionParameters;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
// THIS CODE IS DEPENDENT ON QUICHE
pub struct Client {
is_connected: Arc<AtomicBool>,
filters_sender: std::sync::mpsc::Sender<Message>,
}
// use quic_geyser_common::filters::Filter;
// use quic_geyser_common::message::Message;
// use quic_geyser_common::quic::configure_client::configure_client;
// use quic_geyser_common::quic::quiche_client_loop::client_loop;
// use quic_geyser_common::types::connections_parameters::ConnectionParameters;
// use std::net::SocketAddr;
// use std::sync::atomic::AtomicBool;
// use std::sync::Arc;
impl Client {
pub fn new(
server_address: String,
connection_parameters: ConnectionParameters,
) -> anyhow::Result<(Client, std::sync::mpsc::Receiver<Message>)> {
let config = configure_client(
connection_parameters.max_number_of_streams,
connection_parameters.recieve_window_size,
connection_parameters.timeout_in_seconds,
connection_parameters.max_ack_delay,
connection_parameters.ack_exponent,
)?;
let server_address: SocketAddr = server_address.parse()?;
let socket_addr: SocketAddr = "0.0.0.0:0"
.parse()
.expect("Socket address should be returned");
let is_connected = Arc::new(AtomicBool::new(false));
let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel();
let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel();
// pub struct Client {
// is_connected: Arc<AtomicBool>,
// filters_sender: std::sync::mpsc::Sender<Message>,
// }
let is_connected_client = is_connected.clone();
let _client_loop_jh = std::thread::spawn(move || {
if let Err(e) = client_loop(
config,
socket_addr,
server_address,
rx_sent_queue,
sx_recv_queue,
is_connected_client.clone(),
) {
log::error!("client stopped with error {e}");
}
is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed);
});
Ok((
Client {
is_connected,
filters_sender,
},
client_rx_queue,
))
}
// impl Client {
// pub fn new(
// server_address: String,
// connection_parameters: ConnectionParameters,
// ) -> anyhow::Result<(Client, std::sync::mpsc::Receiver<Message>)> {
// let config = configure_client(
// connection_parameters.max_number_of_streams,
// connection_parameters.recieve_window_size,
// connection_parameters.timeout_in_seconds,
// connection_parameters.max_ack_delay,
// connection_parameters.ack_exponent,
// )?;
// let server_address: SocketAddr = server_address.parse()?;
// let socket_addr: SocketAddr = "0.0.0.0:0"
// .parse()
// .expect("Socket address should be returned");
// let is_connected = Arc::new(AtomicBool::new(false));
// let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel();
// let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel();
pub fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
let message = Message::Filters(filters);
self.filters_sender.send(message)?;
Ok(())
}
// let is_connected_client = is_connected.clone();
// let _client_loop_jh = std::thread::spawn(move || {
// if let Err(e) = client_loop(
// config,
// socket_addr,
// server_address,
// rx_sent_queue,
// sx_recv_queue,
// is_connected_client.clone(),
// ) {
// log::error!("client stopped with error {e}");
// }
// is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed);
// });
// Ok((
// Client {
// is_connected,
// filters_sender,
// },
// client_rx_queue,
// ))
// }
pub fn is_connected(&self) -> bool {
self.is_connected.load(std::sync::atomic::Ordering::Relaxed)
}
}
// pub fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
// let message = Message::Filters(filters);
// self.filters_sender.send(message)?;
// Ok(())
// }
#[cfg(test)]
mod tests {
use itertools::Itertools;
use quic_geyser_common::{
channel_message::AccountData,
compression::CompressionType,
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
filters::Filter,
message::Message,
quic::quic_server::QuicServer,
types::{
account::Account, connections_parameters::ConnectionParameters,
slot_identifier::SlotIdentifier,
},
};
use solana_sdk::pubkey::Pubkey;
use std::{net::SocketAddr, thread::sleep, time::Duration};
// pub fn is_connected(&self) -> bool {
// self.is_connected.load(std::sync::atomic::Ordering::Relaxed)
// }
// }
pub fn get_account_for_test(slot: u64, data_size: usize) -> Account {
Account {
slot_identifier: SlotIdentifier { slot },
pubkey: Pubkey::new_unique(),
owner: Pubkey::new_unique(),
write_version: 0,
lamports: 12345,
rent_epoch: u64::MAX,
executable: false,
data: (0..data_size).map(|_| rand::random::<u8>()).collect_vec(),
compression_type: CompressionType::None,
data_length: data_size as u64,
}
}
// #[cfg(test)]
// mod tests {
// use itertools::Itertools;
// use quic_geyser_common::{
// channel_message::AccountData,
// compression::CompressionType,
// config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
// filters::Filter,
// message::Message,
// quic::quic_server::QuicServer,
// types::{
// account::Account, connections_parameters::ConnectionParameters,
// slot_identifier::SlotIdentifier,
// },
// };
// use solana_sdk::pubkey::Pubkey;
// use std::{net::SocketAddr, thread::sleep, time::Duration};
use crate::blocking::client::Client;
// pub fn get_account_for_test(slot: u64, data_size: usize) -> Account {
// Account {
// slot_identifier: SlotIdentifier { slot },
// pubkey: Pubkey::new_unique(),
// owner: Pubkey::new_unique(),
// write_version: 0,
// lamports: 12345,
// rent_epoch: u64::MAX,
// executable: false,
// data: (0..data_size).map(|_| rand::random::<u8>()).collect_vec(),
// compression_type: CompressionType::None,
// data_length: data_size as u64,
// }
// }
#[test]
pub fn test_client() {
tracing_subscriber::fmt::init();
let server_sock: SocketAddr = "0.0.0.0:30000".parse().unwrap();
let url = format!("127.0.0.1:{}", server_sock.port());
// use crate::blocking::client::Client;
let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2));
let msg_acc_2 = Message::AccountMsg(get_account_for_test(1, 20));
let msg_acc_3 = Message::AccountMsg(get_account_for_test(2, 100));
let msg_acc_4 = Message::AccountMsg(get_account_for_test(3, 1_000));
let msg_acc_5 = Message::AccountMsg(get_account_for_test(4, 10_000));
let msgs = [msg_acc_1, msg_acc_2, msg_acc_3, msg_acc_4, msg_acc_5];
// #[test]
// pub fn test_client() {
// tracing_subscriber::fmt::init();
// let server_sock: SocketAddr = "0.0.0.0:30000".parse().unwrap();
// let url = format!("127.0.0.1:{}", server_sock.port());
let jh = {
let msgs = msgs.clone();
let server_sock = server_sock.clone();
std::thread::spawn(move || {
let config = ConfigQuicPlugin {
address: server_sock,
quic_parameters: QuicParameters::default(),
compression_parameters: CompressionParameters {
compression_type: CompressionType::None,
},
number_of_retries: 100,
log_level: "debug".to_string(),
allow_accounts: true,
allow_accounts_at_startup: false,
};
let quic_server = QuicServer::new(config).unwrap();
// wait for client to connect and subscribe
sleep(Duration::from_secs(2));
for msg in msgs {
let Message::AccountMsg(account) = msg else {
panic!("should never happen");
};
quic_server
.send_message(
quic_geyser_common::channel_message::ChannelMessage::Account(
AccountData {
pubkey: account.pubkey,
account: account.solana_account(),
write_version: account.write_version,
},
account.slot_identifier.slot,
),
)
.unwrap();
}
sleep(Duration::from_secs(1));
})
};
// wait for server to start
sleep(Duration::from_secs(1));
// let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2));
// let msg_acc_2 = Message::AccountMsg(get_account_for_test(1, 20));
// let msg_acc_3 = Message::AccountMsg(get_account_for_test(2, 100));
// let msg_acc_4 = Message::AccountMsg(get_account_for_test(3, 1_000));
// let msg_acc_5 = Message::AccountMsg(get_account_for_test(4, 10_000));
// let msgs = [msg_acc_1, msg_acc_2, msg_acc_3, msg_acc_4, msg_acc_5];
// server started
let (client, reciever) = Client::new(
url,
ConnectionParameters {
max_number_of_streams: 10,
recieve_window_size: 1_000_000,
timeout_in_seconds: 10,
max_ack_delay: 25,
ack_exponent: 3,
},
)
.unwrap();
client.subscribe(vec![Filter::AccountsAll]).unwrap();
// let jh = {
// let msgs = msgs.clone();
// let server_sock = server_sock.clone();
// std::thread::spawn(move || {
// let config = ConfigQuicPlugin {
// address: server_sock,
// quic_parameters: QuicParameters::default(),
// compression_parameters: CompressionParameters {
// compression_type: CompressionType::None,
// },
// number_of_retries: 100,
// log_level: "debug".to_string(),
// allow_accounts: true,
// allow_accounts_at_startup: false,
// };
// let quic_server = QuicServer::new(config).unwrap();
// // wait for client to connect and subscribe
// sleep(Duration::from_secs(2));
// for msg in msgs {
// let Message::AccountMsg(account) = msg else {
// panic!("should never happen");
// };
// quic_server
// .send_message(
// quic_geyser_common::channel_message::ChannelMessage::Account(
// AccountData {
// pubkey: account.pubkey,
// account: account.solana_account(),
// write_version: account.write_version,
// },
// account.slot_identifier.slot,
// ),
// )
// .unwrap();
// }
// sleep(Duration::from_secs(1));
// })
// };
// // wait for server to start
// sleep(Duration::from_secs(1));
let mut cnt = 0;
for message_sent in msgs {
let msg = reciever.recv().unwrap();
log::info!("got message : {}", cnt);
cnt += 1;
assert_eq!(message_sent, msg);
}
jh.join().unwrap();
}
}
// // server started
// let (client, reciever) = Client::new(
// url,
// ConnectionParameters {
// max_number_of_streams: 10,
// recieve_window_size: 1_000_000,
// timeout_in_seconds: 10,
// max_ack_delay: 25,
// ack_exponent: 3,
// },
// )
// .unwrap();
// client.subscribe(vec![Filter::AccountsAll]).unwrap();
// let mut cnt = 0;
// for message_sent in msgs {
// let msg = reciever.recv().unwrap();
// log::info!("got message : {}", cnt);
// cnt += 1;
// assert_eq!(message_sent, msg);
// }
// jh.join().unwrap();
// }
// }

View File

@ -1,4 +1,2 @@
pub mod blocking;
pub mod non_blocking;
pub const DEFAULT_MAX_STREAM: u64 = quic_geyser_common::quic::configure_client::DEFAULT_MAX_STREAMS;

View File

@ -1,78 +1,177 @@
use quic_geyser_common::defaults::ALPN_GEYSER_PROTOCOL_ID;
use quic_geyser_common::defaults::DEFAULT_MAX_RECIEVE_WINDOW_SIZE;
use quic_geyser_common::defaults::MAX_DATAGRAM_SIZE;
use quic_geyser_common::filters::Filter;
use quic_geyser_common::message::Message;
use quic_geyser_common::quic::configure_client::configure_client;
use quic_geyser_common::quic::quiche_client_loop::client_loop;
use quic_geyser_common::types::connections_parameters::ConnectionParameters;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, RecvStream,
SendStream, TokioRuntime, TransportConfig, VarInt,
};
use std::net::{SocketAddr, UdpSocket};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
pub fn create_client_endpoint(connection_parameters: ConnectionParameters) -> Endpoint {
const MINIMUM_MAXIMUM_TRANSMISSION_UNIT: u16 = 2000;
const INITIAL_MAXIMUM_TRANSMISSION_UNIT: u16 = MINIMUM_MAXIMUM_TRANSMISSION_UNIT;
let mut endpoint = {
let client_socket = UdpSocket::bind("0.0.0.0:0").expect("Client socket should be binded");
let mut config = EndpointConfig::default();
config
.max_udp_payload_size(MAX_DATAGRAM_SIZE as u16)
.expect("Should set max MTU");
quinn::Endpoint::new(config, None, client_socket, Arc::new(TokioRuntime))
.expect("create_endpoint quinn::Endpoint::new")
};
let cert = rcgen::generate_simple_self_signed(vec!["quic_geyser_client".into()]).unwrap();
let key = rustls::PrivateKey(cert.serialize_private_key_der());
let cert = rustls::Certificate(cert.serialize_der().unwrap());
let mut crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(Arc::new(ClientSkipServerVerification {}))
.with_client_auth_cert(vec![cert], key)
.expect("Should create client config");
crypto.enable_early_data = true;
crypto.alpn_protocols = vec![ALPN_GEYSER_PROTOCOL_ID.to_vec()];
let mut config = ClientConfig::new(Arc::new(crypto));
let mut transport_config = TransportConfig::default();
let timeout = IdleTimeout::try_from(Duration::from_secs(30)).unwrap();
transport_config.max_idle_timeout(Some(timeout));
transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
transport_config
.datagram_receive_buffer_size(Some(connection_parameters.recieve_window_size as usize));
transport_config.datagram_send_buffer_size(connection_parameters.recieve_window_size as usize);
transport_config.initial_mtu(INITIAL_MAXIMUM_TRANSMISSION_UNIT);
transport_config.max_concurrent_bidi_streams(VarInt::from(
connection_parameters.max_number_of_streams as u32,
));
transport_config.max_concurrent_uni_streams(VarInt::from(
connection_parameters.max_number_of_streams as u32,
));
transport_config.min_mtu(MINIMUM_MAXIMUM_TRANSMISSION_UNIT);
transport_config.mtu_discovery_config(None);
transport_config.enable_segmentation_offload(false);
config.transport_config(Arc::new(transport_config));
endpoint.set_default_client_config(config);
endpoint
}
pub async fn recv_message(
mut recv_stream: RecvStream,
timeout_in_seconds: u64,
) -> anyhow::Result<Message> {
let mut buffer: Vec<u8> = vec![];
while let Some(data) = tokio::time::timeout(
Duration::from_secs(timeout_in_seconds),
recv_stream.read_chunk(DEFAULT_MAX_RECIEVE_WINDOW_SIZE as usize, true),
)
.await??
{
let bytes = data.bytes.to_vec();
buffer.extend_from_slice(&bytes);
}
Ok(bincode::deserialize::<Message>(&buffer)?)
}
pub struct Client {
is_connected: Arc<AtomicBool>,
filters_sender: std::sync::mpsc::Sender<Message>,
connection: Connection,
}
pub async fn send_message(mut send_stream: SendStream, message: &Message) -> anyhow::Result<()> {
let binary = bincode::serialize(&message)?;
send_stream.write_all(&binary).await?;
send_stream.finish().await?;
Ok(())
}
impl Client {
pub fn new(
pub async fn new(
server_address: String,
connection_parameters: ConnectionParameters,
) -> anyhow::Result<(Client, tokio::sync::mpsc::UnboundedReceiver<Message>)> {
let config = configure_client(
connection_parameters.max_number_of_streams,
connection_parameters.recieve_window_size,
connection_parameters.timeout_in_seconds,
connection_parameters.max_ack_delay,
connection_parameters.ack_exponent,
)?;
let server_address: SocketAddr = server_address.parse()?;
let socket_addr: SocketAddr = "0.0.0.0:0"
.parse()
.expect("Socket address should be returned");
let is_connected = Arc::new(AtomicBool::new(false));
let (filters_sender, rx_sent_queue) = std::sync::mpsc::channel();
let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel();
let endpoint = create_client_endpoint(connection_parameters);
let socket_addr = SocketAddr::from_str(&server_address)?;
let connecting = endpoint.connect(socket_addr, "quic_geyser_client")?;
let is_connected_client = is_connected.clone();
let _client_loop_jh = std::thread::spawn(move || {
if let Err(e) = client_loop(
config,
socket_addr,
server_address,
rx_sent_queue,
sx_recv_queue,
is_connected_client.clone(),
) {
log::error!("client stopped with error {e}");
}
is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed);
});
let (message_sx_queue, message_rx_queue) =
tokio::sync::mpsc::unbounded_channel::<Message>();
let (tokio_sx_queue, tokio_rx_queue) = tokio::sync::mpsc::unbounded_channel::<Message>();
let _tokio_depile_loop = std::thread::spawn(move || {
while let Ok(message) = client_rx_queue.recv() {
if tokio_sx_queue.send(message).is_err() {
break;
let connection = connecting.await?;
{
let connection = connection.clone();
tokio::spawn(async move {
loop {
let stream = connection.accept_uni().await;
match stream {
Ok(recv_stream) => {
let sender = message_sx_queue.clone();
tokio::spawn(async move {
let message = recv_message(recv_stream, 10).await;
match message {
Ok(message) => {
let _ = sender.send(message);
}
Err(e) => {
log::error!("Error getting message {}", e);
}
}
});
}
Err(e) => match &e {
ConnectionError::ConnectionClosed(_)
| ConnectionError::ApplicationClosed(_)
| ConnectionError::LocallyClosed => {
break;
}
_ => {
log::error!("Got {} while listing to the connection", e);
}
},
}
}
}
});
});
}
Ok((
Client {
is_connected,
filters_sender,
},
tokio_rx_queue,
))
Ok((Client { connection }, message_rx_queue))
}
pub fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
let message = Message::Filters(filters);
self.filters_sender.send(message)?;
pub async fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
let send_stream = self.connection.open_uni().await?;
send_message(send_stream, &Message::Filters(filters)).await?;
Ok(())
}
}
pub fn is_connected(&self) -> bool {
self.is_connected.load(std::sync::atomic::Ordering::Relaxed)
pub struct ClientSkipServerVerification;
impl ClientSkipServerVerification {
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
}
impl rustls::client::ServerCertVerifier for ClientSkipServerVerification {
fn verify_server_cert(
&self,
_end_entity: &rustls::Certificate,
_intermediates: &[rustls::Certificate],
_server_name: &rustls::ServerName,
_scts: &mut dyn Iterator<Item = &[u8]>,
_ocsp_response: &[u8],
_now: std::time::SystemTime,
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
Ok(rustls::client::ServerCertVerified::assertion())
}
}
@ -85,12 +184,12 @@ mod tests {
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
filters::Filter,
message::Message,
quic::quic_server::QuicServer,
types::{
account::Account, connections_parameters::ConnectionParameters,
slot_identifier::SlotIdentifier,
},
};
use quic_geyser_server::quic_server::QuicServer;
use solana_sdk::pubkey::Pubkey;
use std::{net::SocketAddr, thread::sleep, time::Duration};
@ -175,8 +274,9 @@ mod tests {
ack_exponent: 3,
},
)
.await
.unwrap();
client.subscribe(vec![Filter::AccountsAll]).unwrap();
client.subscribe(vec![Filter::AccountsAll]).await.unwrap();
let mut cnt = 0;
for message_sent in msgs {

View File

@ -1,6 +1,6 @@
[package]
name = "quic-geyser-common"
version = "0.1.0"
version = "0.1.3"
edition = "2021"
[dependencies]
@ -8,19 +8,11 @@ solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
serde = { workspace = true }
bincode = { workspace = true }
lz4 = { workspace = true }
quiche = { workspace = true, features = ["boringssl-boring-crate"] }
boring = { workspace = true }
rcgen = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
thiserror = {workspace = true}
itertools = { workspace = true }
ring = {workspace = true}
mio = { workspace = true }
mio_channel = { workspace = true }
itertools = { workspace = true }
lz4 = { workspace = true }
[dev-dependencies]
rand = { workspace = true }

View File

@ -4,14 +4,12 @@ use serde::{Deserialize, Serialize};
use crate::{
compression::CompressionType,
quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS},
defaults::{
DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY,
DEFAULT_MAX_NB_CONNECTIONS, DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS,
},
};
pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10;
pub const DEFAULT_MAX_NB_CONNECTIONS: u64 = 10;
pub const DEFAULT_MAX_ACK_DELAY: u64 = 250;
pub const DEFAULT_ACK_EXPONENT: u64 = 3;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigQuicPlugin {

View File

@ -1,8 +1,8 @@
pub mod channel_message;
pub mod compression;
pub mod config;
pub mod defaults;
pub mod filters;
pub mod message;
pub mod plugin_error;
pub mod quic;
pub mod types;

View File

@ -1,8 +1,8 @@
use serde::{Deserialize, Serialize};
use crate::{
config::{DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY},
quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS},
use crate::defaults::{
DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY,
DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS,
};
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]

View File

@ -7,7 +7,7 @@ authors = ["Godmode Galactus"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
solana-rpc-client = "~1.17.28"
solana-rpc-client = "~1.17.31"
clap = { workspace = true, features = ["derive", "env"] }
serde = { workspace = true }
@ -17,6 +17,7 @@ anyhow = { workspace = true }
log = { workspace = true }
bincode = { workspace = true }
tracing-subscriber = { workspace = true }
tokio = { workspace = true }
quic-geyser-client = { workspace = true }
quic-geyser-common = { workspace = true }

View File

@ -1,16 +1,13 @@
use std::{
sync::{atomic::AtomicU64, Arc},
thread::sleep,
time::{Duration, Instant},
time::Duration,
};
use clap::Parser;
use cli::Args;
use quic_geyser_client::blocking::client::Client;
use quic_geyser_common::{
filters::Filter, quic::configure_client::DEFAULT_MAX_RECIEVE_WINDOW_SIZE,
types::connections_parameters::ConnectionParameters,
};
use quic_geyser_client::non_blocking::client::Client;
use quic_geyser_common::{filters::Filter, types::connections_parameters::ConnectionParameters};
use solana_rpc_client::rpc_client::RpcClient;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
@ -38,21 +35,14 @@ pub mod cli;
// let config_json = json!(config);
//println!("{}", config_json);
pub fn main() {
#[tokio::main]
pub async fn main() {
tracing_subscriber::fmt::init();
let args = Args::parse();
println!("Connecting");
let (client, reciever) = Client::new(
args.url,
ConnectionParameters {
max_number_of_streams: 1024 * 1024,
recieve_window_size: DEFAULT_MAX_RECIEVE_WINDOW_SIZE,
timeout_in_seconds: 30,
max_ack_delay: 25,
ack_exponent: 3,
},
)
.unwrap();
let (client, mut reciever) = Client::new(args.url, ConnectionParameters::default())
.await
.unwrap();
println!("Connected");
let bytes_transfered = Arc::new(AtomicU64::new(0));
@ -141,78 +131,64 @@ pub fn main() {
Filter::Slot,
Filter::BlockMeta,
])
.await
.unwrap();
println!("Subscribed");
let instant = Instant::now();
while let Some(message) = reciever.recv().await {
let message_size = bincode::serialize(&message).unwrap().len();
bytes_transfered.fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed);
match message {
quic_geyser_common::message::Message::AccountMsg(account) => {
log::trace!("got account notification : {} ", account.pubkey);
account_notification.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let data_len = account.data_length as usize;
total_accounts_size
.fetch_add(account.data_length, std::sync::atomic::Ordering::Relaxed);
let solana_account = account.solana_account();
if solana_account.data.len() != data_len {
println!("data length different");
println!(
"Account : {}, owner: {}=={}, datalen: {}=={}, lamports: {}",
account.pubkey,
account.owner,
solana_account.owner,
data_len,
solana_account.data.len(),
solana_account.lamports
);
panic!("Wrong account data");
}
loop {
match reciever.recv() {
Ok(message) => {
let message_size = bincode::serialize(&message).unwrap().len();
bytes_transfered
.fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed);
match message {
quic_geyser_common::message::Message::AccountMsg(account) => {
log::trace!("got account notification : {} ", account.pubkey);
account_notification.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let data_len = account.data_length as usize;
total_accounts_size
.fetch_add(account.data_length, std::sync::atomic::Ordering::Relaxed);
let solana_account = account.solana_account();
if solana_account.data.len() != data_len {
println!("data length different");
println!(
"Account : {}, owner: {}=={}, datalen: {}=={}, lamports: {}",
account.pubkey,
account.owner,
solana_account.owner,
data_len,
solana_account.data.len(),
solana_account.lamports
);
panic!("Wrong account data");
}
account_slot.store(
account.slot_identifier.slot,
std::sync::atomic::Ordering::Relaxed,
);
}
quic_geyser_common::message::Message::SlotMsg(slot) => {
log::trace!("got slot notification : {} ", slot.slot);
slot_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if slot.commitment_level == CommitmentLevel::Processed {
slot_slot.store(slot.slot, std::sync::atomic::Ordering::Relaxed);
}
}
quic_geyser_common::message::Message::BlockMetaMsg(block_meta) => {
log::trace!("got blockmeta notification : {} ", block_meta.slot);
blockmeta_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
blockmeta_slot.store(block_meta.slot, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::TransactionMsg(tx) => {
log::trace!(
"got transaction notification: {}",
tx.signatures[0].to_string()
);
transaction_notifications
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::Filters(_) => {
// Not supported
}
quic_geyser_common::message::Message::Ping => {
// not supported ping
}
account_slot.store(
account.slot_identifier.slot,
std::sync::atomic::Ordering::Relaxed,
);
}
quic_geyser_common::message::Message::SlotMsg(slot) => {
log::trace!("got slot notification : {} ", slot.slot);
slot_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if slot.commitment_level == CommitmentLevel::Processed {
slot_slot.store(slot.slot, std::sync::atomic::Ordering::Relaxed);
}
}
Err(e) => {
println!(
"Conection closed and streaming stopped in {} seconds with error {}, connected : {}",
instant.elapsed().as_secs(), e, client.is_connected()
quic_geyser_common::message::Message::BlockMetaMsg(block_meta) => {
log::trace!("got blockmeta notification : {} ", block_meta.slot);
blockmeta_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
blockmeta_slot.store(block_meta.slot, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::TransactionMsg(tx) => {
log::trace!(
"got transaction notification: {}",
tx.signatures[0].to_string()
);
break;
transaction_notifications.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
quic_geyser_common::message::Message::Filters(_) => {
// Not supported
}
quic_geyser_common::message::Message::Ping => {
// not supported ping
}
}
}

View File

@ -20,4 +20,5 @@ tracing-subscriber = { workspace = true }
rand = "0.8.5"
quic-geyser-common = { workspace = true }
quic-geyser-common = { workspace = true }
quic-geyser-server = { workspace = true }

View File

@ -10,8 +10,8 @@ use itertools::Itertools;
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
quic::quic_server::QuicServer,
};
use quic_geyser_server::quic_server::QuicServer;
use rand::{thread_rng, Rng};
use solana_sdk::{account::Account, pubkey::Pubkey};

View File

@ -1,6 +1,6 @@
[package]
name = "quic-geyser-plugin"
version = "0.1.0"
version = "0.1.3"
edition = "2021"
authors = ["Godmode Galactus"]
@ -24,6 +24,7 @@ log = { workspace = true }
thiserror = {workspace = true}
quic-geyser-common = { workspace = true }
quic-geyser-server = { workspace = true }
[build-dependencies]
anyhow = { workspace = true }

View File

@ -5,13 +5,13 @@ use agave_geyser_plugin_interface::geyser_plugin_interface::{
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
plugin_error::QuicGeyserError,
quic::quic_server::QuicServer,
types::{
block_meta::BlockMeta,
slot_identifier::SlotIdentifier,
transaction::{Transaction, TransactionMeta},
},
};
use quic_geyser_server::quic_server::QuicServer;
use solana_sdk::{
account::Account, clock::Slot, commitment_config::CommitmentLevel, message::v0::Message,
pubkey::Pubkey,

View File

@ -1,6 +1,6 @@
[package]
name = "quic-geyser-plugin-proxy"
version = "0.1.0"
version = "0.1.3"
edition = "2021"
authors = ["Godmode Galactus"]
@ -17,6 +17,8 @@ anyhow = { workspace = true }
log = { workspace = true }
bincode = { workspace = true }
tracing-subscriber = { workspace = true }
tokio = { workspace = true }
quic-geyser-client = { workspace = true }
quic-geyser-common = { workspace = true }
quic-geyser-common = { workspace = true }
quic-geyser-server = { workspace = true }

View File

@ -1,7 +1,7 @@
use clap::Parser;
use quic_geyser_common::{
config::{DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY},
quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS},
use quic_geyser_common::defaults::{
DEFAULT_ACK_EXPONENT, DEFAULT_CONNECTION_TIMEOUT, DEFAULT_MAX_ACK_DELAY,
DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS,
};
#[derive(Parser, Debug, Clone)]

View File

@ -2,25 +2,26 @@ use std::{net::SocketAddr, str::FromStr};
use clap::Parser;
use cli::Args;
use quic_geyser_client::blocking::client::Client;
use quic_geyser_client::non_blocking::client::Client;
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
filters::Filter,
quic::quic_server::QuicServer,
types::{
block_meta::BlockMeta, connections_parameters::ConnectionParameters,
transaction::Transaction,
},
};
use quic_geyser_server::quic_server::QuicServer;
pub mod cli;
pub fn main() -> anyhow::Result<()> {
#[tokio::main()]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let args = Args::parse();
let (client, message_channel) = Client::new(
let (client, mut message_channel) = Client::new(
args.source_url,
ConnectionParameters {
max_number_of_streams: args.max_number_of_streams_per_client,
@ -29,15 +30,18 @@ pub fn main() -> anyhow::Result<()> {
max_ack_delay: args.max_ack_delay,
ack_exponent: args.ack_exponent,
},
)?;
)
.await?;
log::info!("Subscribing");
client.subscribe(vec![
Filter::AccountsAll,
Filter::TransactionsAll,
Filter::Slot,
Filter::BlockMeta,
])?;
client
.subscribe(vec![
Filter::AccountsAll,
Filter::TransactionsAll,
Filter::Slot,
Filter::BlockMeta,
])
.await?;
let quic_config = ConfigQuicPlugin {
address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(),
@ -79,7 +83,7 @@ pub fn main() -> anyhow::Result<()> {
}
});
while let Ok(message) = message_channel.recv() {
while let Some(message) = message_channel.recv().await {
let channel_message = match message {
quic_geyser_common::message::Message::AccountMsg(account_message) => {
ChannelMessage::Account(

28
server/Cargo.toml Normal file
View File

@ -0,0 +1,28 @@
[package]
name = "quic-geyser-server"
version = "0.1.3"
edition = "2021"
[dependencies]
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
serde = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
thiserror = {workspace = true}
itertools = { workspace = true }
bincode = { workspace = true }
ring = {workspace = true}
quiche = { workspace = true, features = ["boringssl-boring-crate"] }
rcgen = { workspace = true }
boring = { workspace = true }
mio = { workspace = true }
mio_channel = { workspace = true }
quic-geyser-common = { workspace = true }
[dev-dependencies]
rand = { workspace = true }
tracing-subscriber = { workspace = true }

View File

@ -1,9 +1,4 @@
use crate::quic::configure_server::ALPN_GEYSER_PROTOCOL_ID;
use super::configure_server::MAX_DATAGRAM_SIZE;
pub const DEFAULT_MAX_STREAMS: u64 = 64 * 1024;
pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 1_000_000; // 64 MBs
use quic_geyser_common::defaults::{ALPN_GEYSER_PROTOCOL_ID, MAX_DATAGRAM_SIZE};
pub fn configure_client(
maximum_concurrent_streams: u64,

View File

@ -1,9 +1,9 @@
use boring::ssl::SslMethod;
use crate::config::QuicParameters;
pub const ALPN_GEYSER_PROTOCOL_ID: &[u8] = b"geyser";
pub const MAX_DATAGRAM_SIZE: usize = 1350; // MAX: 65527
use quic_geyser_common::{
config::QuicParameters,
defaults::{ALPN_GEYSER_PROTOCOL_ID, MAX_DATAGRAM_SIZE},
};
pub fn configure_server(quic_parameter: QuicParameters) -> anyhow::Result<quiche::Config> {
let max_concurrent_streams = quic_parameter.max_number_of_streams_per_client;

View File

@ -1,8 +1,8 @@
use std::{fmt::Debug, sync::mpsc};
use crate::{
use crate::configure_server::configure_server;
use quic_geyser_common::{
channel_message::ChannelMessage, config::ConfigQuicPlugin, plugin_error::QuicGeyserError,
quic::configure_server::configure_server,
};
use super::quiche_server_loop::server_loop;

View File

@ -4,15 +4,14 @@ use std::{
time::{Duration, Instant},
};
use quic_geyser_common::{defaults::MAX_DATAGRAM_SIZE, message::Message};
use crate::{
message::Message,
quic::{
configure_server::MAX_DATAGRAM_SIZE,
quiche_reciever::{recv_message, ReadStreams},
quiche_sender::{handle_writable, send_message},
quiche_utils::{get_next_unidi, PartialResponses},
},
quiche_reciever::{recv_message, ReadStreams},
quiche_sender::{handle_writable, send_message},
quiche_utils::{get_next_unidi, PartialResponses},
};
use anyhow::bail;
use ring::rand::{SecureRandom, SystemRandom};
@ -278,19 +277,20 @@ mod tests {
use itertools::Itertools;
use solana_sdk::{account::Account, pubkey::Pubkey};
use crate::{
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
compression::CompressionType,
config::QuicParameters,
filters::Filter,
message::Message,
quic::{
configure_client::configure_client, configure_server::configure_server,
quiche_server_loop::server_loop,
},
types::block_meta::SlotMeta,
};
use crate::{
configure_client::configure_client, configure_server::configure_server,
quiche_server_loop::server_loop,
};
use super::client_loop;
#[test]

View File

@ -2,9 +2,7 @@ use std::collections::HashMap;
use anyhow::bail;
use crate::message::Message;
use super::configure_server::MAX_DATAGRAM_SIZE;
use quic_geyser_common::{defaults::MAX_DATAGRAM_SIZE, message::Message};
pub fn convert_binary_to_message(bytes: Vec<u8>) -> anyhow::Result<Message> {
Ok(bincode::deserialize::<Message>(&bytes)?)

View File

@ -1,5 +1,5 @@
use super::quiche_utils::PartialResponses;
use crate::{message::Message, quic::quiche_utils::PartialResponse};
use crate::quiche_utils::{PartialResponse, PartialResponses};
use quic_geyser_common::message::Message;
use quiche::Connection;
pub fn convert_to_binary(message: &Message) -> anyhow::Result<Vec<u8>> {

View File

@ -13,21 +13,22 @@ use itertools::Itertools;
use quiche::ConnectionId;
use ring::rand::SystemRandom;
use crate::{
use quic_geyser_common::{
channel_message::ChannelMessage,
compression::CompressionType,
defaults::MAX_DATAGRAM_SIZE,
filters::Filter,
message::Message,
quic::{
quiche_reciever::recv_message,
quiche_sender::{handle_writable, send_message},
quiche_utils::{get_next_unidi, mint_token, validate_token},
},
types::{account::Account, block_meta::SlotMeta, slot_identifier::SlotIdentifier},
};
use crate::{
quiche_reciever::recv_message,
quiche_sender::{handle_writable, send_message},
quiche_utils::{get_next_unidi, mint_token, validate_token},
};
use super::{
configure_server::MAX_DATAGRAM_SIZE,
quiche_reciever::ReadStreams,
quiche_utils::{write_to_socket, PartialResponses},
};
@ -304,7 +305,7 @@ fn create_client_task(
std::thread::spawn(move || {
let mut partial_responses = PartialResponses::new();
let mut read_streams = ReadStreams::new();
let mut next_stream: u64 = 1;
let mut next_stream: u64 = 3;
let mut connection = connection;
let mut instance = Instant::now();
let mut closed = false;