Make the QUIC server and client use quiche; add client tests

godmodegalactus 2024-05-22 14:51:21 +02:00
parent 55245dfe29
commit 660299f3fc
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
20 changed files with 305 additions and 725 deletions

Cargo.lock generated
View File

@ -348,28 +348,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-stream"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.63",
]
[[package]]
name = "async-trait"
version = "0.1.80"
@ -2458,12 +2436,14 @@ name = "quic-geyser-client"
version = "0.1.0"
dependencies = [
"anyhow",
"async-stream",
"futures",
"itertools",
"log",
"mio",
"mio_channel",
"quic-geyser-common",
"rand 0.8.5",
"solana-sdk",
"tokio",
"tracing-subscriber",
]
[[package]]
@ -2488,7 +2468,6 @@ dependencies = [
"solana-sdk",
"solana-transaction-status",
"thiserror",
"tokio",
"tracing-subscriber",
]
@ -2508,7 +2487,6 @@ dependencies = [
"solana-logger",
"solana-sdk",
"thiserror",
"tokio",
"vergen",
]
@ -2519,7 +2497,6 @@ dependencies = [
"anyhow",
"bincode",
"clap",
"futures",
"log",
"quic-geyser-client",
"quic-geyser-common",
@ -2528,7 +2505,6 @@ dependencies = [
"serde_json",
"solana-rpc-client",
"solana-sdk",
"tokio",
]
[[package]]
@ -2538,7 +2514,6 @@ dependencies = [
"anyhow",
"bincode",
"clap",
"futures",
"itertools",
"log",
"quic-geyser-client",
@ -2548,7 +2523,6 @@ dependencies = [
"serde",
"serde_json",
"solana-sdk",
"tokio",
]
[[package]]

View File

@ -17,7 +17,6 @@ license = "AGPL"
edition = "2021"
[workspace.dependencies]
tokio = "1.37.0"
solana-sdk = "=1.17.31"
agave-geyser-plugin-interface = "=1.17.31"
solana-transaction-status = "=1.17.31"
@ -31,7 +30,6 @@ bincode = "=1.3.3"
bs58 = "0.4.0"
base64 = "0.21.0"
thiserror = "1.0.40"
futures = "0.3.28"
bytes = "1.4.0"
anyhow = "1.0.70"
log = "0.4.17"
@ -44,7 +42,6 @@ rustls = "=0.21.7"
rcgen = "0.10.0"
pkcs8 = "0.8.0"
lz4 = "1.24.0"
async-stream = "0.3.5"
mio = "0.8.11"
mio_channel = "0.1.3"

View File

@ -9,9 +9,13 @@ edition = "2021"
solana-sdk = { workspace = true }
anyhow = { workspace = true }
futures = { workspace = true }
async-stream = { workspace = true }
tokio = { workspace = true }
log = { workspace = true }
mio = { workspace = true }
mio_channel = { workspace = true }
quic-geyser-common = { workspace = true }
quic-geyser-common = { workspace = true }
[dev-dependencies]
rand = { workspace = true }
tracing-subscriber = { workspace = true }
itertools = { workspace = true }

View File

@ -1,172 +1,179 @@
use std::{net::SocketAddr, str::FromStr};
use async_stream::stream;
use futures::Stream;
use quic_geyser_common::filters::Filter;
use quic_geyser_common::message::Message;
use quic_geyser_common::quic::configure_client::configure_client;
use quic_geyser_common::quic::quiche_reciever::recv_message;
use quic_geyser_common::quic::quiche_sender::send_message;
use quic_geyser_common::{filters::Filter, types::connections_parameters::ConnectionParameters};
//use quinn::{Connection, ConnectionError};
use quic_geyser_common::quic::quiche_client_loop::client_loop;
use quic_geyser_common::types::connections_parameters::ConnectionParameters;
use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
pub struct Client {
pub address: String,
is_connected: Arc<AtomicBool>,
filters_sender: mio_channel::Sender<Message>,
}
impl Client {
pub async fn new(
pub fn new(
server_address: String,
connection_parameters: ConnectionParameters,
) -> anyhow::Result<Client> {
// let endpoint = configure_client(connection_parameters.max_number_of_streams).await?;
// let socket_addr = SocketAddr::from_str(&server_address)?;
// let connecting = endpoint.connect(socket_addr, "quic_geyser_client")?;
// let connection = connecting.await?;
// let send_stream = connection.open_uni().await?;
// send_message(
// send_stream,
// &Message::ConnectionParameters(connection_parameters),
// )
// .await?;
) -> anyhow::Result<(Client, std::sync::mpsc::Receiver<Message>)> {
let config = configure_client(
connection_parameters.max_number_of_streams,
connection_parameters.recieve_window_size,
connection_parameters.timeout_in_seconds,
)?;
let server_address: SocketAddr = server_address.parse()?;
let socket_addr: SocketAddr = "0.0.0.0:0"
.parse()
.expect("Socket address should be returned");
let is_connected = Arc::new(AtomicBool::new(false));
let (filters_sender, rx_sent_queue) = mio_channel::channel();
let (sx_recv_queue, client_rx_queue) = std::sync::mpsc::channel();
Ok(Client {
address: server_address,
})
let is_connected_client = is_connected.clone();
let _client_loop_jh = std::thread::spawn(move || {
if let Err(e) = client_loop(
config,
socket_addr,
server_address,
rx_sent_queue,
sx_recv_queue,
is_connected_client.clone(),
) {
log::error!("client stopped with error {e}");
}
is_connected_client.store(false, std::sync::atomic::Ordering::Relaxed);
});
Ok((
Client {
is_connected,
filters_sender,
},
client_rx_queue,
))
}
pub async fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
// let send_stream = self.connection.open_uni().await?;
// send_message(send_stream, &Message::Filters(filters)).await?;
pub fn subscribe(&self, filters: Vec<Filter>) -> anyhow::Result<()> {
let message = Message::Filters(filters);
self.filters_sender.send(message)?;
Ok(())
}
pub fn create_stream(&self) -> impl Stream<Item = Message> {
//let connection = self.connection.clone();
let (sender, mut reciever) = tokio::sync::mpsc::unbounded_channel::<Message>();
// tokio::spawn(async move {
// loop {
// let stream = connection.accept_uni().await;
// match stream {
// Ok(recv_stream) => {
// let sender = sender.clone();
// tokio::spawn(async move {
// let message = recv_message(recv_stream, 10).await;
// match message {
// Ok(message) => {
// let _ = sender.send(message);
// }
// Err(e) => {
// log::error!("Error getting message {}", e);
// }
// }
// });
// }
// Err(e) => match &e {
// ConnectionError::ConnectionClosed(_)
// | ConnectionError::ApplicationClosed(_)
// | ConnectionError::LocallyClosed => {
// break;
// }
// _ => {
// log::error!("Got {} while listing to the connection", e);
// }
// },
// }
// }
// });
stream! {
while let Some(message) = reciever.recv().await {
yield message;
}
}
pub fn is_connected(&self) -> bool {
self.is_connected.load(std::sync::atomic::Ordering::Relaxed)
}
}
// #[cfg(test)]
// mod tests {
// use std::{net::UdpSocket, sync::Arc};
#[cfg(test)]
mod tests {
use itertools::Itertools;
use quic_geyser_common::{
channel_message::AccountData,
compression::CompressionType,
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
filters::Filter,
message::Message,
quic::quic_server::QuicServer,
types::{
account::Account, connections_parameters::ConnectionParameters,
slot_identifier::SlotIdentifier,
},
};
use solana_sdk::pubkey::Pubkey;
use std::{net::SocketAddr, thread::sleep, time::Duration};
// use futures::StreamExt;
// use quic_geyser_common::{
// filters::Filter,
// message::Message,
// quic::{configure_server::configure_server, connection_manager::ConnectionManager},
// types::{account::Account, connections_parameters::ConnectionParameters},
// };
// use tokio::{pin, sync::Notify};
pub fn get_account_for_test(slot: u64, data_size: usize) -> Account {
Account {
slot_identifier: SlotIdentifier { slot },
pubkey: Pubkey::new_unique(),
owner: Pubkey::new_unique(),
write_version: 0,
lamports: 12345,
rent_epoch: u64::MAX,
executable: false,
data: (0..data_size).map(|_| rand::random::<u8>()).collect_vec(),
compression_type: CompressionType::None,
data_length: data_size as u64,
}
}
// use crate::client::Client;
use crate::client::Client;
// #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// pub async fn test_client() {
// let config = configure_server(1, 100000, 1).unwrap();
#[test]
pub fn test_client() {
tracing_subscriber::fmt::init();
let server_sock: SocketAddr = "0.0.0.0:20000".parse().unwrap();
let url = format!("127.0.0.1:{}", server_sock.port());
// let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
// let port = sock.local_addr().unwrap().port();
// let url = format!("127.0.0.1:{}", port);
// let notify_server_start = Arc::new(Notify::new());
// let notify_subscription = Arc::new(Notify::new());
let msg_acc_1 = Message::AccountMsg(get_account_for_test(0, 2));
let msg_acc_2 = Message::AccountMsg(get_account_for_test(1, 20));
let msg_acc_3 = Message::AccountMsg(get_account_for_test(2, 100));
let msg_acc_4 = Message::AccountMsg(get_account_for_test(3, 1_000));
let msg_acc_5 = Message::AccountMsg(get_account_for_test(4, 10_000));
let msgs = [msg_acc_1, msg_acc_2, msg_acc_3, msg_acc_4, msg_acc_5];
// let msg_acc_1 = Message::AccountMsg(Account::get_account_for_test(0, 2));
// let msg_acc_2 = Message::AccountMsg(Account::get_account_for_test(1, 20));
// let msg_acc_3 = Message::AccountMsg(Account::get_account_for_test(2, 100));
// let msg_acc_4 = Message::AccountMsg(Account::get_account_for_test(3, 1000));
// let msg_acc_5 = Message::AccountMsg(Account::get_account_for_test(4, 10000));
// let msgs = [msg_acc_1, msg_acc_2, msg_acc_3, msg_acc_4, msg_acc_5];
let jh = {
let msgs = msgs.clone();
let server_sock = server_sock.clone();
std::thread::spawn(move || {
let config = ConfigQuicPlugin {
address: server_sock,
quic_parameters: QuicParameters {
max_number_of_streams_per_client: 10,
recieve_window_size: 1_000_000,
connection_timeout: 10,
},
compression_parameters: CompressionParameters {
compression_type: CompressionType::None,
},
number_of_retries: 100,
};
let quic_server = QuicServer::new(config).unwrap();
// wait for client to connect and subscribe
sleep(Duration::from_secs(2));
for msg in msgs {
let Message::AccountMsg(account) = msg else {
panic!("should never happen");
};
quic_server
.send_message(
quic_geyser_common::channel_message::ChannelMessage::Account(
AccountData {
pubkey: account.pubkey,
account: account.solana_account(),
write_version: account.write_version,
},
account.slot_identifier.slot,
false,
),
)
.unwrap();
}
sleep(Duration::from_secs(1));
})
};
// wait for server to start
sleep(Duration::from_secs(1));
// {
// let msgs = msgs.clone();
// let notify_server_start = notify_server_start.clone();
// let notify_subscription = notify_subscription.clone();
// tokio::spawn(async move {
// let endpoint = Endpoint::new(
// EndpointConfig::default(),
// Some(config),
// sock,
// Arc::new(TokioRuntime),
// )
// .unwrap();
// server started
let (client, reciever) = Client::new(
url,
ConnectionParameters {
max_number_of_streams: 3,
recieve_window_size: 1_000_000,
timeout_in_seconds: 10,
},
)
.unwrap();
client.subscribe(vec![Filter::AccountsAll]).unwrap();
// let (connection_manager, _jh) = ConnectionManager::new(endpoint, 10);
// notify_server_start.notify_one();
// notify_subscription.notified().await;
// for msg in msgs {
// connection_manager.dispatch(msg, 10).await;
// }
// });
// }
// notify_server_start.notified().await;
// // server started
// let client = Client::new(
// url,
// ConnectionParameters {
// max_number_of_streams: 3,
// streams_for_slot_data: 1,
// streams_for_transactions: 1,
// },
// )
// .await
// .unwrap();
// client.subscribe(vec![Filter::AccountsAll]).await.unwrap();
// notify_subscription.notify_one();
// let stream = client.create_stream();
// pin!(stream);
// for _ in 0..5 {
// let msg = stream.next().await.unwrap();
// match &msg {
// Message::AccountMsg(account) => {
// let index = account.slot_identifier.slot as usize;
// let sent_message = &msgs[index];
// assert_eq!(*sent_message, msg);
// }
// _ => {
// panic!("should only get account messages")
// }
// }
// }
// }
// }
let mut cnt = 0;
for message_sent in msgs {
let msg = reciever.recv().unwrap();
log::info!("got message : {}", cnt);
cnt += 1;
assert_eq!(message_sent, msg);
}
jh.join().unwrap();
}
}
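For reference, a minimal consumer of the new blocking client API could look like the following sketch. It assumes a server is already listening on the given address; the import paths follow the crates in this diff, and error handling is reduced to anyhow.

use quic_geyser_client::client::Client;
use quic_geyser_common::filters::Filter;
use quic_geyser_common::message::Message;
use quic_geyser_common::types::connections_parameters::ConnectionParameters;

fn run() -> anyhow::Result<()> {
    // Client::new now spawns the quiche client loop on a background thread and
    // hands back the receiving half of a std::sync::mpsc channel instead of an
    // async stream.
    let (client, receiver) = Client::new(
        "127.0.0.1:20000".to_string(), // assumed server address
        ConnectionParameters::default(),
    )?;

    // Optionally wait until the client loop reports an established connection.
    while !client.is_connected() {
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    // Subscribing is a plain channel send; no .await is involved anymore.
    client.subscribe(vec![Filter::AccountsAll, Filter::Slot])?;

    // Messages are consumed with blocking recv() calls.
    while let Ok(message) = receiver.recv() {
        if let Message::AccountMsg(account) = message {
            log::info!("account update at slot {}", account.slot_identifier.slot);
        }
    }
    Ok(())
}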

View File

@ -16,7 +16,6 @@ rustls = { workspace = true, default-features = false }
rcgen = { workspace = true }
pkcs8 = { workspace = true }
anyhow = { workspace = true }
tokio = { workspace = true }
log = { workspace = true }
thiserror = {workspace = true}
itertools = { workspace = true }

View File

@ -2,10 +2,12 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use serde::{Deserialize, Serialize};
use crate::{compression::CompressionType, quic::configure_client::DEFAULT_MAX_STREAMS};
use crate::{
compression::CompressionType,
quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS},
};
pub const DEFAULT_WINDOW_SIZE: u32 = 1_000_000;
pub const DEFAULT_CONNECTION_TIMEOUT: u32 = 10;
pub const DEFAULT_CONNECTION_TIMEOUT: u64 = 10;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
@ -34,16 +36,16 @@ impl ConfigQuicPlugin {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuicParameters {
pub max_number_of_streams_per_client: u32,
pub recieve_window_size: u32,
pub connection_timeout: u32,
pub recieve_window_size: u64,
pub connection_timeout: u64,
}
impl Default for QuicParameters {
fn default() -> Self {
Self {
max_number_of_streams_per_client: DEFAULT_MAX_STREAMS,
recieve_window_size: DEFAULT_WINDOW_SIZE, // 1 Mb
connection_timeout: DEFAULT_CONNECTION_TIMEOUT, // 10s
recieve_window_size: DEFAULT_MAX_RECIEVE_WINDOW_SIZE, // 1 Mb
connection_timeout: DEFAULT_CONNECTION_TIMEOUT, // 10s
}
}
}
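Since the receive window and timeout are now 64-bit, a QuicParameters config fragment deserializes as in the sketch below. The field names come from the struct in this hunk; the use of serde_json here is only for illustration and is not implied to be a dependency of this crate.

use quic_geyser_common::config::QuicParameters;

fn parse_quic_parameters() -> anyhow::Result<()> {
    // Values mirror the new defaults: 4096 streams, 64 MB receive window, 10 s timeout.
    let json = r#"{
        "max_number_of_streams_per_client": 4096,
        "recieve_window_size": 64000000,
        "connection_timeout": 10
    }"#;
    let params: QuicParameters = serde_json::from_str(json)?;
    assert_eq!(params.connection_timeout, 10u64);
    Ok(())
}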

View File

@ -10,7 +10,7 @@ use crate::{
};
// current maximum message size
const MAX_MESSAGE_SIZE: u64 = 20_000_000;
pub const MAX_MESSAGE_SIZE: u64 = 20_000_000;
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[repr(C)]
@ -20,5 +20,4 @@ pub enum Message {
BlockMetaMsg(BlockMeta),
TransactionMsg(Box<Transaction>),
Filters(Vec<Filter>), // sent from client to server
AddStream(u64),
}
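MAX_MESSAGE_SIZE is now public, which presumably lets other crates enforce the 20 MB cap before handing a payload to the QUIC loop. A hedged sketch of such a check, assuming the constant lives alongside Message in the message module:

use quic_geyser_common::message::{Message, MAX_MESSAGE_SIZE};

fn serialize_checked(message: &Message) -> anyhow::Result<Vec<u8>> {
    // bincode is already a workspace dependency; reject anything larger than
    // the advertised maximum instead of letting the stream writer choke on it.
    let bytes = bincode::serialize(message)?;
    if bytes.len() as u64 > MAX_MESSAGE_SIZE {
        anyhow::bail!("serialized message of {} bytes exceeds MAX_MESSAGE_SIZE", bytes.len());
    }
    Ok(bytes)
}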

View File

@ -3,10 +3,7 @@ use crate::quic::configure_server::ALPN_GEYSER_PROTOCOL_ID;
use super::configure_server::MAX_DATAGRAM_SIZE;
pub const DEFAULT_MAX_STREAMS: u32 = 4096;
pub const DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS: u32 = 4;
pub const DEFAULT_MAX_TRANSACTION_STREAMS: u32 = 32;
pub const DEFAULT_MAX_ACCOUNT_STREAMS: u32 =
DEFAULT_MAX_STREAMS - DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS - DEFAULT_MAX_TRANSACTION_STREAMS;
pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 64_000_000; // 64 MBs
pub fn configure_client(
maximum_concurrent_streams: u32,

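The body of configure_client is not shown in this hunk. Purely as an illustration, and assuming a recent quiche release, parameters like these usually map onto quiche::Config roughly as follows; this is not the function body from the commit.

fn configure_client_sketch(
    maximum_concurrent_streams: u32,
    recieve_window_size: u64,
    timeout_in_seconds: u64,
) -> anyhow::Result<quiche::Config> {
    let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION)?;
    // The server presumably uses a self-signed test certificate, so peer
    // verification is assumed to be off here.
    config.verify_peer(false);
    config.set_application_protos(&[b"geyser"])?; // ALPN id is an assumption
    config.set_max_idle_timeout(timeout_in_seconds * 1000);
    config.set_initial_max_data(recieve_window_size);
    config.set_initial_max_stream_data_uni(recieve_window_size);
    config.set_initial_max_streams_uni(maximum_concurrent_streams as u64);
    Ok(config)
}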
View File

@ -1,286 +0,0 @@
// use std::fmt::Debug;
// use std::sync::atomic::AtomicU64;
// use std::sync::Arc;
// use std::{collections::VecDeque, time::Duration};
// use tokio::sync::Semaphore;
// use tokio::{sync::RwLock, task::JoinHandle, time::Instant};
// use crate::types::connections_parameters::ConnectionParameters;
// use crate::{filters::Filter, message::Message};
// use super::{quiche_reciever::recv_message, quiche_sender::send_message};
// pub struct ConnectionData {
// pub id: u64,
// pub connection: quiche::Connection,
// pub filters: Vec<Filter>,
// pub since: Instant,
// stream_semaphore_for_accounts: Arc<Semaphore>,
// stream_semaphore_for_slot_data: Arc<Semaphore>,
// stream_semaphore_for_transactions: Arc<Semaphore>,
// lagging_count: Arc<AtomicU64>,
// max_lagging_stream: u64,
// }
// impl ConnectionData {
// pub fn new(
// id: u64,
// connection: quiche::Connection,
// connections_parameters: ConnectionParameters,
// max_lagging_stream: u64,
// ) -> Self {
// let accounts_streams_count = connections_parameters
// .max_number_of_streams
// .saturating_sub(connections_parameters.streams_for_slot_data)
// .saturating_sub(connections_parameters.streams_for_transactions);
// Self {
// id,
// connection,
// filters: vec![],
// since: Instant::now(),
// stream_semaphore_for_accounts: Arc::new(Semaphore::new(
// accounts_streams_count as usize,
// )),
// stream_semaphore_for_slot_data: Arc::new(Semaphore::new(
// connections_parameters.streams_for_slot_data as usize,
// )),
// stream_semaphore_for_transactions: Arc::new(Semaphore::new(
// connections_parameters.streams_for_transactions as usize,
// )),
// lagging_count: Arc::new(AtomicU64::new(0)),
// max_lagging_stream,
// }
// }
// }
// /*
// This class will take care of adding connections and filters etc
// */
// #[derive(Clone)]
// pub struct ConnectionManager {
// connections: Arc<RwLock<VecDeque<ConnectionData>>>,
// }
// impl Debug for ConnectionManager {
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// f.debug_struct("ConnectionManager").finish()
// }
// }
// impl ConnectionManager {
// pub fn new(endpoint: Endpoint, max_lagging_stream: u64) -> (Self, JoinHandle<()>) {
// let connections: Arc<RwLock<VecDeque<ConnectionData>>> =
// Arc::new(RwLock::new(VecDeque::new()));
// // create a task to add incoming connections
// let connection_adder_jh = {
// let connections = connections.clone();
// tokio::spawn(async move {
// let mut id: u64 = 0;
// loop {
// // accept incoming connections
// if let Some(connecting) = endpoint.accept().await {
// let connections = connections.clone();
// id += 1;
// let current_id = id;
// tokio::spawn(async move {
// let connection_result =
// tokio::time::timeout(Duration::from_secs(10), connecting).await;
// match connection_result {
// Ok(Ok(connection)) => {
// let parameter_stream = tokio::time::timeout(
// Duration::from_secs(10),
// connection.accept_uni(),
// )
// .await;
// match parameter_stream {
// Ok(Ok(recv_stream)) => {
// let message = recv_message(recv_stream, 10).await;
// if let Ok(Message::ConnectionParameters(
// connections_parameters,
// )) = message
// {
// Self::manage_new_connection(
// connections,
// connection,
// current_id,
// connections_parameters,
// max_lagging_stream,
// )
// .await;
// }
// }
// Ok(Err(e)) => {
// log::error!(
// "connection params unistream errored {}",
// e
// );
// }
// Err(_timeout) => {
// log::error!("connection params unistream timeout")
// }
// }
// }
// Ok(Err(e)) => log::error!("Error connecting {}", e),
// Err(_elapsed) => log::error!("Connection timeout"),
// }
// });
// }
// }
// })
// };
// (Self { connections }, connection_adder_jh)
// }
// async fn manage_new_connection(
// connections: Arc<RwLock<VecDeque<ConnectionData>>>,
// connection: quiche::Connection,
// current_id: u64,
// connections_parameters: ConnectionParameters,
// max_lagging_stream: u64,
// ) {
// // connection established
// // add the connection in the connections list
// let mut lk = connections.write().await;
// log::info!("New connection id: {}", current_id);
// lk.push_back(ConnectionData::new(
// current_id,
// connection.clone(),
// connections_parameters,
// max_lagging_stream,
// ));
// drop(lk);
// let connections_tmp = connections.clone();
// // task to add filters
// let connection_to_listen = connection.clone();
// tokio::spawn(async move {
// loop {
// if let Ok(recv_stream) = connection_to_listen.accept_uni().await {
// log::info!("new unistream connection to update filters");
// match tokio::time::timeout(
// Duration::from_secs(10),
// recv_message(recv_stream, 10),
// )
// .await
// {
// Ok(Ok(filters)) => {
// let Message::Filters(mut filters) = filters else {
// continue;
// };
// let mut lk = connections_tmp.write().await;
// let connection_data = lk.iter_mut().find(|x| x.id == current_id);
// if let Some(connection_data) = connection_data {
// connection_data.filters.append(&mut filters);
// }
// }
// Ok(Err(e)) => {
// log::error!("error getting message from the client : {}", e);
// }
// Err(_timeout) => {
// log::warn!("Client request timeout");
// }
// }
// }
// }
// });
// let connections = connections.clone();
// // create a connection removing task
// tokio::spawn(async move {
// // if connection is closed remove it
// let closed_error = connection.closed().await;
// log::info!(
// "connection closed with id {} error {}",
// closed_error,
// current_id
// );
// let mut lk = connections.write().await;
// lk.retain(|x| x.id != current_id);
// });
// }
// pub async fn dispatch(&self, message: Message, retry_count: u64) {
// let lk = self.connections.read().await;
// for connection_data in lk.iter() {
// if connection_data.filters.iter().any(|x| x.allows(&message)) {
// let connection = connection_data.connection.clone();
// let message = message.clone();
// let (message_type, semaphore) = match &message {
// Message::SlotMsg(_) => (
// "slot",
// connection_data.stream_semaphore_for_slot_data.clone(),
// ),
// Message::BlockMetaMsg(_) => (
// "block meta",
// connection_data.stream_semaphore_for_slot_data.clone(),
// ),
// Message::TransactionMsg(_) => (
// "transactions",
// connection_data.stream_semaphore_for_transactions.clone(),
// ),
// _ => (
// "accounts",
// connection_data.stream_semaphore_for_accounts.clone(),
// ),
// };
// let id = connection_data.id;
// let lagging_count = connection_data.lagging_count.clone();
// let max_lagging_stream = connection_data.max_lagging_stream;
// tokio::spawn(async move {
// let permit_result = semaphore.clone().try_acquire_owned();
// let _permit = match permit_result {
// Ok(permit) => permit,
// Err(_) => {
// // all permits are taken wait log warning and wait for permit
// log::warn!(
// "Stream {} seems to be lagging for {} message type",
// id,
// message_type
// );
// let lc =
// lagging_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// if lc > max_lagging_stream {
// connection.close(VarInt::from_u32(0), b"laggy client");
// return;
// }
// let p = semaphore.acquire_owned().await.expect("Permit is aquired");
// lagging_count.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
// p
// }
// };
// for _ in 0..retry_count {
// let send_stream = connection.open_uni().await;
// match send_stream {
// Ok(send_stream) => match send_message(send_stream, &message).await {
// Ok(_) => {
// log::debug!("Message sucessfully sent");
// break;
// }
// Err(e) => {
// log::error!(
// "error dispatching message and sending data : {}",
// e
// )
// }
// },
// Err(e) => {
// log::error!(
// "error dispatching message while creating stream : {}",
// e
// );
// break;
// }
// }
// }
// });
// }
// }
// }
// }

View File

@ -1,6 +1,5 @@
pub mod configure_client;
pub mod configure_server;
pub mod connection_manager;
pub mod quic_server;
pub mod quiche_client_loop;
pub mod quiche_reciever;

View File

@ -1,138 +1,47 @@
// use std::{net::UdpSocket, sync::Arc};
use std::fmt::Debug;
// use crate::{
// compression::CompressionType,
// config::ConfigQuicPlugin,
// message::Message,
// plugin_error::QuicGeyserError,
// quic::{configure_server::configure_server, connection_manager::ConnectionManager},
// types::{
// account::Account as GeyserAccount,
// block_meta::{BlockMeta, SlotMeta},
// slot_identifier::SlotIdentifier,
// transaction::Transaction,
// },
// };
// use quinn::{Endpoint, EndpointConfig, TokioRuntime};
// use solana_sdk::{
// account::Account, clock::Slot, commitment_config::CommitmentLevel, pubkey::Pubkey,
// };
// use tokio::{runtime::Runtime, sync::mpsc::UnboundedSender};
use crate::{
channel_message::ChannelMessage, config::ConfigQuicPlugin, plugin_error::QuicGeyserError,
quic::configure_server::configure_server,
};
// pub struct AccountData {
// pub pubkey: Pubkey,
// pub account: Account,
// pub write_version: u64,
// }
use super::quiche_server_loop::server_loop;
pub struct QuicServer {
data_channel_sender: mio_channel::Sender<ChannelMessage>,
}
// pub enum ChannelMessage {
// Account(AccountData, Slot, bool),
// Slot(u64, u64, CommitmentLevel),
// BlockMeta(BlockMeta),
// Transaction(Box<Transaction>),
// }
impl Debug for QuicServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QuicServer").finish()
}
}
// #[derive(Debug)]
// pub struct QuicServer {
// _runtime: Runtime,
// data_channel_sender: UnboundedSender<ChannelMessage>,
// }
impl QuicServer {
pub fn new(config: ConfigQuicPlugin) -> anyhow::Result<Self> {
let server_config = configure_server(
config.quic_parameters.max_number_of_streams_per_client,
config.quic_parameters.recieve_window_size,
config.quic_parameters.connection_timeout,
)?;
let socket = config.address;
let compression_type = config.compression_parameters.compression_type;
// impl QuicServer {
// pub fn new(
// runtime: Runtime,
// config: ConfigQuicPlugin,
// max_lagging: u64,
// ) -> anyhow::Result<Self> {
// let server_config = configure_server(
// config.quic_parameters.max_number_of_streams_per_client,
// config.quic_parameters.recieve_window_size,
// config.quic_parameters.connection_timeout as u64,
// )?;
// let socket = UdpSocket::bind(config.address)?;
// let compression_type = config.compression_parameters.compression_type;
let (data_channel_sender, data_channel_tx) = mio_channel::channel();
// let (data_channel_sender, mut data_channel_tx) = tokio::sync::mpsc::unbounded_channel();
let _server_loop_jh = std::thread::spawn(move || {
if let Err(e) = server_loop(server_config, socket, data_channel_tx, compression_type) {
panic!("Server loop closed by error : {e}");
}
});
// {
// runtime.spawn(async move {
// let endpoint = Endpoint::new(
// EndpointConfig::default(),
// Some(server_config),
// socket,
// Arc::new(TokioRuntime),
// )
// .unwrap();
// let retry_count = config.number_of_retries;
Ok(QuicServer {
data_channel_sender,
})
}
// let (quic_connection_manager, _jh) = ConnectionManager::new(endpoint, max_lagging);
// log::info!("Connection manager sucessfully started");
// while let Some(channel_message) = data_channel_tx.recv().await {
// match channel_message {
// ChannelMessage::Account(account, slot, is_startup) => {
// // avoid sending messages at startup
// if !is_startup {
// process_account_message(
// quic_connection_manager.clone(),
// account,
// slot,
// compression_type,
// retry_count,
// )
// .await;
// }
// }
// ChannelMessage::Slot(slot, parent, commitment_level) => {
// let message = Message::SlotMsg(SlotMeta {
// slot,
// parent,
// commitment_level,
// });
// quic_connection_manager.dispatch(message, retry_count).await;
// }
// ChannelMessage::BlockMeta(block_meta) => {
// let message = Message::BlockMetaMsg(block_meta);
// quic_connection_manager.dispatch(message, retry_count).await;
// }
// ChannelMessage::Transaction(transaction) => {
// let message = Message::TransactionMsg(transaction);
// quic_connection_manager.dispatch(message, retry_count).await;
// }
// }
// }
// log::error!("quic server dispatch task stopped");
// });
// }
// Ok(QuicServer {
// data_channel_sender,
// _runtime: runtime,
// })
// }
// pub fn send_message(&self, message: ChannelMessage) -> Result<(), QuicGeyserError> {
// self.data_channel_sender
// .send(message)
// .map_err(|_| QuicGeyserError::MessageChannelClosed)
// }
// }
// async fn process_account_message(
// quic_connection_manager: ConnectionManager,
// account: AccountData,
// slot: Slot,
// compression_type: CompressionType,
// retry_count: u64,
// ) {
// let slot_identifier = SlotIdentifier { slot };
// let geyser_account = GeyserAccount::new(
// account.pubkey,
// account.account,
// compression_type,
// slot_identifier,
// account.write_version,
// );
// let message = Message::AccountMsg(geyser_account);
// quic_connection_manager.dispatch(message, retry_count).await;
// }
pub fn send_message(&self, message: ChannelMessage) -> Result<(), QuicGeyserError> {
self.data_channel_sender
.send(message)
.map_err(|_| QuicGeyserError::MessageChannelClosed)
}
}
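On the server side the flow is symmetrical: QuicServer::new starts server_loop on its own thread, and send_message only pushes onto the mio channel. A minimal sketch, where the slot numbers and commitment level are illustrative values and the paths follow the client test earlier in this diff:

use quic_geyser_common::channel_message::ChannelMessage;
use quic_geyser_common::config::ConfigQuicPlugin;
use quic_geyser_common::quic::quic_server::QuicServer;
use solana_sdk::commitment_config::CommitmentLevel;

fn start_server(config: ConfigQuicPlugin) -> anyhow::Result<()> {
    // The constructor spawns the blocking quiche server loop in the background.
    let server = QuicServer::new(config)?;
    // Sending never touches the network directly; it only enqueues the update.
    server
        .send_message(ChannelMessage::Slot(100, 99, CommitmentLevel::Processed))
        .map_err(|_| anyhow::anyhow!("data channel closed"))?;
    Ok(())
}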

View File

@ -1,4 +1,7 @@
use std::net::SocketAddr;
use std::{
net::SocketAddr,
sync::{atomic::AtomicBool, Arc},
};
use crate::{
message::Message,
@ -18,6 +21,7 @@ pub fn client_loop(
server_address: SocketAddr,
mut message_send_queue: mio_channel::Receiver<Message>,
message_recv_queue: std::sync::mpsc::Sender<Message>,
is_connected: Arc<AtomicBool>,
) -> anyhow::Result<()> {
let mut socket = mio::net::UdpSocket::bind(socket_addr)?;
let mut poll = mio::Poll::new()?;
@ -47,7 +51,7 @@ pub fn client_loop(
let mut out = [0; MAX_DATAGRAM_SIZE];
let (write, send_info) = conn.send(&mut out).expect("initial send failed");
while let Err(e) = socket.send_to(&out[..write], send_info.to) {
if let Err(e) = socket.send_to(&out[..write], send_info.to) {
bail!("send() failed: {:?}", e);
}
@ -56,6 +60,7 @@ pub fn client_loop(
let mut out = [0; MAX_DATAGRAM_SIZE];
let mut partial_responses = PartialResponses::new();
let mut read_streams = ReadStreams::new();
let mut connected = false;
'client: loop {
poll.poll(&mut events, conn.timeout()).unwrap();
@ -134,6 +139,11 @@ pub fn client_loop(
}
}
if !connected && conn.is_established() {
is_connected.store(true, std::sync::atomic::Ordering::Relaxed);
connected = true;
}
// chanel updates
if channel_updates && conn.is_established() {
// channel events
@ -190,7 +200,7 @@ mod tests {
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
str::FromStr,
sync::mpsc,
sync::{atomic::AtomicBool, mpsc, Arc},
thread::sleep,
time::Duration,
};
@ -311,12 +321,14 @@ mod tests {
let _client_loop_jh = std::thread::spawn(move || {
let client_config = configure_client(100, 20_000_000, 1).unwrap();
let socket_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
let is_connected = Arc::new(AtomicBool::new(false));
if let Err(e) = client_loop(
client_config,
socket_addr,
server_addr,
rx_sent_queue,
sx_recv_queue,
is_connected,
) {
println!("client stopped with error {e}");
}

View File

@ -15,7 +15,7 @@ pub fn send_message(
stream_id: u64,
message: &Vec<u8>,
) -> anyhow::Result<()> {
let written = match connection.stream_send(stream_id, &message, true) {
let written = match connection.stream_send(stream_id, message, true) {
Ok(v) => v,
Err(quiche::Error::Done) => 0,

View File

@ -235,19 +235,14 @@ pub fn server_loop(
let message =
recv_message(&mut client.conn, &mut client.read_streams, stream);
match message {
Ok(Some(message)) => {
match message {
Message::Filters(mut filters) => {
client.filters.append(&mut filters);
}
Message::AddStream(_) => {
// do nothing
}
_ => {
log::error!("unknown message from the client");
}
Ok(Some(message)) => match message {
Message::Filters(mut filters) => {
client.filters.append(&mut filters);
}
}
_ => {
log::error!("unknown message from the client");
}
},
Ok(None) => {}
Err(e) => {
log::error!("Error recieving message : {e}")
@ -267,8 +262,8 @@ pub fn server_loop(
})
.map(|x| x.1)
.collect_vec();
if dispatch_to.len() > 0 {
let message = match message {
if !dispatch_to.is_empty() {
let (message, priority) = match message {
ChannelMessage::Account(account, slot, _) => {
let slot_identifier = SlotIdentifier { slot };
let geyser_account = Account::new(
@ -279,26 +274,39 @@ pub fn server_loop(
account.write_version,
);
Message::AccountMsg(geyser_account)
(Message::AccountMsg(geyser_account), 4)
}
ChannelMessage::Slot(slot, parent, commitment_level) => {
ChannelMessage::Slot(slot, parent, commitment_level) => (
Message::SlotMsg(SlotMeta {
slot,
parent,
commitment_level,
})
}
}),
1,
),
ChannelMessage::BlockMeta(block_meta) => {
Message::BlockMetaMsg(block_meta)
(Message::BlockMetaMsg(block_meta), 2)
}
ChannelMessage::Transaction(transaction) => {
Message::TransactionMsg(transaction)
(Message::TransactionMsg(transaction), 3)
}
};
let binary = bincode::serialize(&message)
.expect("Message should be serializable in binary");
for client in dispatch_to {
let stream_id = client.next_stream;
match client.conn.stream_priority(stream_id, priority, true) {
Ok(_) => {
log::trace!("priority was set correctly");
}
Err(e) => {
log::error!(
"Unable to set priority for the stream {}, error {}",
stream_id,
e
);
}
}
client.next_stream = get_next_unidi(stream_id, true);
log::debug!(
"dispatching {} on stream id : {}",

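Each dispatched message now gets its own server-initiated unidirectional stream with a per-type urgency (slot 1, block meta 2, transaction 3, accounts 4; quiche schedules lower urgency values first). get_next_unidi is not part of this hunk; the helper below is only an assumption of how such stream-id stepping typically works, based on QUIC's stream-id layout.

// Assumed helper, not the body from this commit: QUIC stream ids encode the
// initiator in bit 0 and uni/bidi in bit 1, so ids of one kind are spaced four
// apart (server-initiated unidirectional streams are 3, 7, 11, ...).
fn get_next_unidi(current_stream_id: u64, is_server: bool) -> u64 {
    let expected_rem: u64 = if is_server { 3 } else { 2 };
    let next = current_stream_id + 4;
    debug_assert_eq!(next % 4, expected_rem);
    next
}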
View File

@ -21,24 +21,6 @@ pub struct Account {
}
impl Account {
#[cfg(test)]
pub fn get_account_for_test(slot: u64, data_size: usize) -> Self {
use itertools::Itertools;
Account {
slot_identifier: SlotIdentifier { slot },
pubkey: Pubkey::new_unique(),
owner: Pubkey::new_unique(),
write_version: 0,
lamports: 12345,
rent_epoch: u64::MAX,
executable: false,
data: (0..data_size).map(|_| rand::random::<u8>()).collect_vec(),
compression_type: CompressionType::None,
data_length: data_size as u64,
}
}
pub fn new(
pubkey: Pubkey,
solana_account: SolanaAccount,
@ -48,7 +30,7 @@ impl Account {
) -> Self {
let data_length = solana_account.data.len() as u64;
let data = if solana_account.data.len() > 0 {
let data = if !solana_account.data.is_empty() {
match compression_type {
CompressionType::None => solana_account.data,
CompressionType::Lz4Fast(speed) => lz4::block::compress(

View File

@ -1,23 +1,21 @@
use serde::{Deserialize, Serialize};
use crate::quic::configure_client::{
DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS, DEFAULT_MAX_STREAMS, DEFAULT_MAX_TRANSACTION_STREAMS,
};
use crate::quic::configure_client::{DEFAULT_MAX_RECIEVE_WINDOW_SIZE, DEFAULT_MAX_STREAMS};
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[repr(C)]
pub struct ConnectionParameters {
pub max_number_of_streams: u32,
pub streams_for_slot_data: u32,
pub streams_for_transactions: u32,
pub recieve_window_size: u64,
pub timeout_in_seconds: u64,
}
impl Default for ConnectionParameters {
fn default() -> Self {
Self {
max_number_of_streams: DEFAULT_MAX_STREAMS,
streams_for_slot_data: DEFAULT_MAX_SLOT_BLOCKMETA_STREAMS,
streams_for_transactions: DEFAULT_MAX_TRANSACTION_STREAMS,
recieve_window_size: DEFAULT_MAX_RECIEVE_WINDOW_SIZE,
timeout_in_seconds: 10,
}
}
}

View File

@ -9,14 +9,12 @@ authors = ["Godmode Galactus"]
[dependencies]
solana-rpc-client = "~1.17.28"
tokio = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
serde = { workspace = true }
solana-sdk = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
futures = { workspace = true }
bincode = { workspace = true }
quic-geyser-client = { workspace = true }

View File

@ -1,16 +1,15 @@
use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
thread::sleep,
time::{Duration, Instant},
};
use clap::Parser;
use cli::Args;
use futures::StreamExt;
use quic_geyser_client::client::Client;
use quic_geyser_common::{filters::Filter, types::connections_parameters::ConnectionParameters};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::{pin, time::Instant};
pub mod cli;
@ -36,13 +35,10 @@ pub mod cli;
// let config_json = json!(config);
//println!("{}", config_json);
#[tokio::main]
async fn main() {
pub fn main() {
let args = Args::parse();
println!("Connecting");
let client = Client::new(args.url, ConnectionParameters::default())
.await
.unwrap();
let (client, reciever) = Client::new(args.url, ConnectionParameters::default()).unwrap();
println!("Connected");
let bytes_transfered = Arc::new(AtomicU64::new(0));
@ -60,15 +56,12 @@ async fn main() {
if let Some(rpc_url) = args.rpc_url {
let cluster_slot = cluster_slot.clone();
let rpc = RpcClient::new(rpc_url);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
let slot = rpc
.get_slot_with_commitment(CommitmentConfig::processed())
.await
.unwrap();
cluster_slot.store(slot, std::sync::atomic::Ordering::Relaxed);
}
std::thread::spawn(move || loop {
sleep(Duration::from_millis(100));
let slot = rpc
.get_slot_with_commitment(CommitmentConfig::processed())
.unwrap();
cluster_slot.store(slot, std::sync::atomic::Ordering::Relaxed);
});
}
@ -84,36 +77,33 @@ async fn main() {
let account_slot = account_slot.clone();
let slot_slot = slot_slot.clone();
let blockmeta_slot = blockmeta_slot.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let bytes_transfered =
bytes_transfered.swap(0, std::sync::atomic::Ordering::Relaxed);
println!("------------------------------------------");
println!(" Bytes Transfered : {}", bytes_transfered);
println!(
" Accounts transfered size (uncompressed) : {}",
total_accounts_size.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
" Accounts Notified : {}",
account_notification.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
" Slots Notified : {}",
slot_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
" Blockmeta notified : {}",
blockmeta_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
" Transactions notified : {}",
transaction_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
std::thread::spawn(move || loop {
sleep(Duration::from_secs(1));
let bytes_transfered = bytes_transfered.swap(0, std::sync::atomic::Ordering::Relaxed);
println!("------------------------------------------");
println!(" Bytes Transfered : {}", bytes_transfered);
println!(
" Accounts transfered size (uncompressed) : {}",
total_accounts_size.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
" Accounts Notified : {}",
account_notification.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
" Slots Notified : {}",
slot_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
" Blockmeta notified : {}",
blockmeta_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(
" Transactions notified : {}",
transaction_notifications.swap(0, std::sync::atomic::Ordering::Relaxed)
);
println!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {} ", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed));
}
println!(" Cluster Slots: {}, Account Slot: {}, Slot Notification slot: {}, BlockMeta slot: {} ", cluster_slot.load(std::sync::atomic::Ordering::Relaxed), account_slot.load(std::sync::atomic::Ordering::Relaxed), slot_slot.load(std::sync::atomic::Ordering::Relaxed), blockmeta_slot.load(std::sync::atomic::Ordering::Relaxed));
});
}
@ -125,15 +115,12 @@ async fn main() {
Filter::Slot,
Filter::BlockMeta,
])
.await
.unwrap();
println!("Subscribed");
let stream = client.create_stream();
pin!(stream);
let instant = Instant::now();
while let Some(message) = stream.next().await {
while let Ok(message) = reciever.recv() {
let message_size = bincode::serialize(&message).unwrap().len();
bytes_transfered.fetch_add(message_size as u64, std::sync::atomic::Ordering::Relaxed);
match message {
@ -183,9 +170,6 @@ async fn main() {
quic_geyser_common::message::Message::Filters(_) => {
// Not supported
}
quic_geyser_common::message::Message::AddStream(_) => {
// Not supported
}
}
}
println!(

View File

@ -8,14 +8,12 @@ authors = ["Godmode Galactus"]
[dependencies]
tokio = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
serde = { workspace = true }
solana-sdk = { workspace = true }
serde_json = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
futures = { workspace = true }
bincode = { workspace = true }
itertools = {workspace = true}

View File

@ -13,7 +13,6 @@ name = "config-check"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { workspace = true }
clap = { workspace = true, features = ["derive"] }
serde = { workspace = true }
solana-sdk = { workspace = true }