diff --git a/client/src/client.rs b/client/src/client.rs index 1deda09..0d8f600 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -159,7 +159,7 @@ mod tests { let (client, reciever) = Client::new( url, ConnectionParameters { - max_number_of_streams: 3, + max_number_of_streams: 10, recieve_window_size: 1_000_000, timeout_in_seconds: 10, }, diff --git a/common/src/quic/configure_client.rs b/common/src/quic/configure_client.rs index 65e2f93..fc63032 100644 --- a/common/src/quic/configure_client.rs +++ b/common/src/quic/configure_client.rs @@ -2,7 +2,7 @@ use crate::quic::configure_server::ALPN_GEYSER_PROTOCOL_ID; use super::configure_server::MAX_DATAGRAM_SIZE; -pub const DEFAULT_MAX_STREAMS: u32 = 4096; +pub const DEFAULT_MAX_STREAMS: u32 = 32 * 1024; pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 64_000_000; // 64 MBs pub fn configure_client( diff --git a/common/src/quic/quic_server.rs b/common/src/quic/quic_server.rs index 8b68c59..17bf02a 100644 --- a/common/src/quic/quic_server.rs +++ b/common/src/quic/quic_server.rs @@ -29,7 +29,13 @@ impl QuicServer { let (data_channel_sender, data_channel_tx) = mio_channel::channel(); let _server_loop_jh = std::thread::spawn(move || { - if let Err(e) = server_loop(server_config, socket, data_channel_tx, compression_type) { + if let Err(e) = server_loop( + server_config, + socket, + data_channel_tx, + compression_type, + true, + ) { panic!("Server loop closed by error : {e}"); } }); diff --git a/common/src/quic/quiche_client_loop.rs b/common/src/quic/quiche_client_loop.rs index 388cd86..35f0535 100644 --- a/common/src/quic/quiche_client_loop.rs +++ b/common/src/quic/quiche_client_loop.rs @@ -308,6 +308,7 @@ mod tests { socket_addr, rx_sent_queue, CompressionType::Lz4Fast(8), + true, ) { println!("Server loop closed by error : {e}"); } diff --git a/common/src/quic/quiche_server_loop.rs b/common/src/quic/quiche_server_loop.rs index 2db8815..aba66e5 100644 --- a/common/src/quic/quiche_server_loop.rs +++ b/common/src/quic/quiche_server_loop.rs @@ -38,6 +38,7 @@ pub fn server_loop( socket_addr: SocketAddr, mut message_send_queue: mio_channel::Receiver, compression_type: CompressionType, + stop_laggy_client: bool, ) -> anyhow::Result<()> { let mut socket = UdpSocket::bind(socket_addr)?; @@ -320,6 +321,15 @@ pub fn server_loop( &binary, ) { log::error!("Error sending message : {e}"); + if stop_laggy_client { + log::info!( + "Stopping laggy client : {}", + client.conn.trace_id() + ); + if let Err(e) = client.conn.close(true, 1, b"laggy client") { + log::error!("error closing client : {}", e); + } + } } } } diff --git a/examples/tester-server/src/main.rs b/examples/tester-server/src/main.rs index e372b61..d98a25b 100644 --- a/examples/tester-server/src/main.rs +++ b/examples/tester-server/src/main.rs @@ -1,77 +1,64 @@ -pub fn main() -> anyhow::Result<()> { - Ok(()) +use std::{ + net::SocketAddr, + str::FromStr, + time::{Duration, Instant}, +}; + +use clap::Parser; +use cli::Args; +use itertools::Itertools; +use quic_geyser_common::{ + channel_message::{AccountData, ChannelMessage}, + config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, + quic::quic_server::QuicServer, +}; +use rand::{thread_rng, Rng}; +use solana_sdk::{account::Account, pubkey::Pubkey}; + +pub mod cli; + +pub fn main() { + let args = Args::parse(); + + let config = ConfigQuicPlugin { + address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(), + quic_parameters: QuicParameters::default(), + compression_parameters: CompressionParameters { + compression_type: quic_geyser_common::compression::CompressionType::None, + }, + number_of_retries: 100, + }; + let quic_server = QuicServer::new(config).unwrap(); + + let mut instant = Instant::now(); + // to avoid errors + std::thread::sleep(Duration::from_millis(500)); + + let mut slot = 1; + let mut write_version = 1; + let mut rand = thread_rng(); + let data = (0..args.account_data_size as usize) + .map(|_| rand.gen::()) + .collect_vec(); + loop { + std::thread::sleep(Duration::from_secs(1) - Instant::now().duration_since(instant)); + instant = Instant::now(); + slot += 1; + for _ in 0..args.accounts_per_second { + write_version += 1; + let account = AccountData { + pubkey: Pubkey::new_unique(), + account: Account { + lamports: rand.gen(), + data: data.clone(), + owner: Pubkey::new_unique(), + executable: false, + rent_epoch: u64::MAX, + }, + write_version, + }; + let channel_message = ChannelMessage::Account(account, slot, false); + quic_server.send_message(channel_message).unwrap(); + } + } } -// use std::{ -// net::SocketAddr, -// str::FromStr, -// time::{Duration, Instant}, -// }; - -// use clap::Parser; -// use cli::Args; -// use itertools::Itertools; -// use quic_geyser_common::{ -// config::{CompressionParameters, ConfigQuicPlugin, QuicParameters}, -// quic::quic_server::{AccountData, ChannelMessage, QuicServer}, -// }; -// use rand::{thread_rng, Rng}; -// use solana_sdk::{account::Account, pubkey::Pubkey}; -// use tokio::runtime::Builder; - -// pub mod cli; - -// pub fn main() -> anyhow::Result<()> { -// let args = Args::parse(); - -// let runtime = Builder::new_multi_thread() -// .thread_name_fn(|| "solGeyserQuic".to_string()) -// .enable_all() -// .build() -// .map_err(|error| { -// let s = error.to_string(); -// log::error!("Runtime Error : {}", s); -// error -// })?; - -// let config = ConfigQuicPlugin { -// address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(), -// quic_parameters: QuicParameters::default(), -// compression_parameters: CompressionParameters { -// compression_type: quic_geyser_common::compression::CompressionType::None, -// }, -// number_of_retries: 100, -// }; -// let quic_server = QuicServer::new(runtime, config, args.max_lagging).unwrap(); - -// let mut instant = Instant::now(); -// // to avoid errors -// std::thread::sleep(Duration::from_millis(500)); - -// let mut slot = 1; -// let mut write_version = 1; -// let mut rand = thread_rng(); -// let data = (0..args.account_data_size as usize) -// .map(|_| rand.gen::()) -// .collect_vec(); -// loop { -// std::thread::sleep(Duration::from_secs(1) - Instant::now().duration_since(instant)); -// instant = Instant::now(); -// slot += 1; -// for _ in 0..args.accounts_per_second { -// write_version += 1; -// let account = AccountData { -// pubkey: Pubkey::new_unique(), -// account: Account { -// lamports: rand.gen(), -// data: data.clone(), -// owner: Pubkey::new_unique(), -// executable: false, -// rent_epoch: u64::MAX, -// }, -// write_version, -// }; -// let channel_message = ChannelMessage::Account(account, slot, false); -// quic_server.send_message(channel_message)?; -// } -// } -// } diff --git a/plugin/src/quic_plugin.rs b/plugin/src/quic_plugin.rs index e3f5291..83a7ff5 100644 --- a/plugin/src/quic_plugin.rs +++ b/plugin/src/quic_plugin.rs @@ -1,247 +1,231 @@ -// use std::sync::atomic::{AtomicUsize, Ordering}; +use agave_geyser_plugin_interface::geyser_plugin_interface::{ + GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, + ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, +}; +use quic_geyser_common::{ + channel_message::{AccountData, ChannelMessage}, + plugin_error::QuicGeyserError, + quic::quic_server::QuicServer, + types::{ + block_meta::BlockMeta, + slot_identifier::SlotIdentifier, + transaction::{Transaction, TransactionMeta}, + }, +}; +use solana_sdk::{ + account::Account, clock::Slot, commitment_config::CommitmentLevel, message::v0::Message, + pubkey::Pubkey, +}; -// use agave_geyser_plugin_interface::geyser_plugin_interface::{ -// GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, -// ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, -// }; -// use quic_geyser_common::{ -// plugin_error::QuicGeyserError, -// quic::quic_server::{AccountData, ChannelMessage, QuicServer}, -// types::{ -// block_meta::BlockMeta, -// slot_identifier::SlotIdentifier, -// transaction::{Transaction, TransactionMeta}, -// }, -// }; -// use solana_sdk::{ -// account::Account, clock::Slot, commitment_config::CommitmentLevel, message::v0::Message, -// pubkey::Pubkey, -// }; -// use tokio::runtime::Builder; +use crate::config::Config; -// use crate::config::Config; +#[derive(Debug, Default)] +pub struct QuicGeyserPlugin { + quic_server: Option, +} -// #[derive(Debug, Default)] -// pub struct QuicGeyserPlugin { -// quic_server: Option, -// } +impl GeyserPlugin for QuicGeyserPlugin { + fn name(&self) -> &'static str { + "quic_geyser_plugin" + } -// impl GeyserPlugin for QuicGeyserPlugin { -// fn name(&self) -> &'static str { -// "quic_geyser_plugin" -// } + fn on_load(&mut self, config_file: &str) -> PluginResult<()> { + solana_logger::setup_with_default("info"); + log::info!("loading quic_geyser plugin"); + let config = Config::load_from_file(config_file)?; + log::info!("Quic plugin config correctly loaded"); + let quic_server = QuicServer::new(config.quic_plugin).map_err(|_| { + GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer)) + })?; + self.quic_server = Some(quic_server); -// fn on_load(&mut self, config_file: &str) -> PluginResult<()> { -// solana_logger::setup_with_default("info"); -// log::info!("loading quic_geyser plugin"); -// let config = Config::load_from_file(config_file)?; -// log::info!("Quic plugin config correctly loaded"); -// let runtime = Builder::new_multi_thread() -// .thread_name_fn(|| { -// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); -// let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed); -// format!("solGeyserQuic{id:02}") -// }) -// .enable_all() -// .build() -// .map_err(|error| { -// let s = error.to_string(); -// log::error!("Runtime Error : {}", s); -// GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer)) -// })?; + Ok(()) + } -// let quic_server = QuicServer::new(runtime, config.quic_plugin, 20_000).map_err(|_| { -// GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer)) -// })?; -// self.quic_server = Some(quic_server); + fn on_unload(&mut self) { + self.quic_server = None; + } -// Ok(()) -// } + fn update_account( + &self, + account: ReplicaAccountInfoVersions, + slot: Slot, + is_startup: bool, + ) -> PluginResult<()> { + let Some(quic_server) = &self.quic_server else { + return Ok(()); + }; + let ReplicaAccountInfoVersions::V0_0_3(account_info) = account else { + return Err(GeyserPluginError::AccountsUpdateError { + msg: "Unsupported account info version".to_string(), + }); + }; + let account = Account { + lamports: account_info.lamports, + data: account_info.data.to_vec(), + owner: Pubkey::try_from(account_info.owner).expect("valid pubkey"), + executable: account_info.executable, + rent_epoch: account_info.rent_epoch, + }; + let pubkey: Pubkey = Pubkey::try_from(account_info.pubkey).expect("valid pubkey"); + quic_server + .send_message(ChannelMessage::Account( + AccountData { + pubkey, + account, + write_version: account_info.write_version, + }, + slot, + is_startup, + )) + .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; + Ok(()) + } -// fn on_unload(&mut self) { -// self.quic_server = None; -// } + fn notify_end_of_startup(&self) -> PluginResult<()> { + Ok(()) + } -// fn update_account( -// &self, -// account: ReplicaAccountInfoVersions, -// slot: Slot, -// is_startup: bool, -// ) -> PluginResult<()> { -// let Some(quic_server) = &self.quic_server else { -// return Ok(()); -// }; -// let ReplicaAccountInfoVersions::V0_0_3(account_info) = account else { -// return Err(GeyserPluginError::AccountsUpdateError { -// msg: "Unsupported account info version".to_string(), -// }); -// }; -// let account = Account { -// lamports: account_info.lamports, -// data: account_info.data.to_vec(), -// owner: Pubkey::try_from(account_info.owner).expect("valid pubkey"), -// executable: account_info.executable, -// rent_epoch: account_info.rent_epoch, -// }; -// let pubkey: Pubkey = Pubkey::try_from(account_info.pubkey).expect("valid pubkey"); -// quic_server -// .send_message(ChannelMessage::Account( -// AccountData { -// pubkey, -// account, -// write_version: account_info.write_version, -// }, -// slot, -// is_startup, -// )) -// .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; -// Ok(()) -// } + fn update_slot_status( + &self, + slot: Slot, + parent: Option, + status: SlotStatus, + ) -> PluginResult<()> { + // Todo + let Some(quic_server) = &self.quic_server else { + return Ok(()); + }; + let commitment_level = match status { + SlotStatus::Processed => CommitmentLevel::Processed, + SlotStatus::Rooted => CommitmentLevel::Finalized, + SlotStatus::Confirmed => CommitmentLevel::Confirmed, + }; + let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level); + quic_server + .send_message(slot_message) + .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; + Ok(()) + } -// fn notify_end_of_startup(&self) -> PluginResult<()> { -// Ok(()) -// } + fn notify_transaction( + &self, + transaction: ReplicaTransactionInfoVersions, + slot: Slot, + ) -> PluginResult<()> { + let Some(quic_server) = &self.quic_server else { + return Ok(()); + }; + let ReplicaTransactionInfoVersions::V0_0_2(solana_transaction) = transaction else { + return Err(GeyserPluginError::TransactionUpdateError { + msg: "Unsupported transaction version".to_string(), + }); + }; -// fn update_slot_status( -// &self, -// slot: Slot, -// parent: Option, -// status: SlotStatus, -// ) -> PluginResult<()> { -// // Todo -// let Some(quic_server) = &self.quic_server else { -// return Ok(()); -// }; -// let commitment_level = match status { -// SlotStatus::Processed => CommitmentLevel::Processed, -// SlotStatus::Rooted => CommitmentLevel::Finalized, -// SlotStatus::Confirmed => CommitmentLevel::Confirmed, -// }; -// let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level); -// quic_server -// .send_message(slot_message) -// .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; -// Ok(()) -// } + let message = solana_transaction.transaction.message(); + let mut account_keys = vec![]; -// fn notify_transaction( -// &self, -// transaction: ReplicaTransactionInfoVersions, -// slot: Slot, -// ) -> PluginResult<()> { -// let Some(quic_server) = &self.quic_server else { -// return Ok(()); -// }; -// let ReplicaTransactionInfoVersions::V0_0_2(solana_transaction) = transaction else { -// return Err(GeyserPluginError::TransactionUpdateError { -// msg: "Unsupported transaction version".to_string(), -// }); -// }; + for index in 0.. { + let account = message.account_keys().get(index); + match account { + Some(account) => account_keys.push(*account), + None => break, + } + } -// let message = solana_transaction.transaction.message(); -// let mut account_keys = vec![]; + let v0_message = Message { + header: *message.header(), + account_keys, + recent_blockhash: *message.recent_blockhash(), + instructions: message.instructions().to_vec(), + address_table_lookups: message.message_address_table_lookups().to_vec(), + }; -// for index in 0.. { -// let account = message.account_keys().get(index); -// match account { -// Some(account) => account_keys.push(*account), -// None => break, -// } -// } + let status_meta = solana_transaction.transaction_status_meta; -// let v0_message = Message { -// header: *message.header(), -// account_keys, -// recent_blockhash: *message.recent_blockhash(), -// instructions: message.instructions().to_vec(), -// address_table_lookups: message.message_address_table_lookups().to_vec(), -// }; + let transaction = Transaction { + slot_identifier: SlotIdentifier { slot }, + signatures: solana_transaction.transaction.signatures().to_vec(), + message: v0_message, + is_vote: solana_transaction.is_vote, + transasction_meta: TransactionMeta { + error: match &status_meta.status { + Ok(_) => None, + Err(e) => Some(e.clone()), + }, + fee: status_meta.fee, + pre_balances: status_meta.pre_balances.clone(), + post_balances: status_meta.post_balances.clone(), + inner_instructions: status_meta.inner_instructions.clone(), + log_messages: status_meta.log_messages.clone(), + rewards: status_meta.rewards.clone(), + loaded_addresses: status_meta.loaded_addresses.clone(), + return_data: status_meta.return_data.clone(), + compute_units_consumed: status_meta.compute_units_consumed, + }, + index: solana_transaction.index as u64, + }; -// let status_meta = solana_transaction.transaction_status_meta; + let transaction_message = ChannelMessage::Transaction(Box::new(transaction)); + quic_server + .send_message(transaction_message) + .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; + Ok(()) + } -// let transaction = Transaction { -// slot_identifier: SlotIdentifier { slot }, -// signatures: solana_transaction.transaction.signatures().to_vec(), -// message: v0_message, -// is_vote: solana_transaction.is_vote, -// transasction_meta: TransactionMeta { -// error: match &status_meta.status { -// Ok(_) => None, -// Err(e) => Some(e.clone()), -// }, -// fee: status_meta.fee, -// pre_balances: status_meta.pre_balances.clone(), -// post_balances: status_meta.post_balances.clone(), -// inner_instructions: status_meta.inner_instructions.clone(), -// log_messages: status_meta.log_messages.clone(), -// rewards: status_meta.rewards.clone(), -// loaded_addresses: status_meta.loaded_addresses.clone(), -// return_data: status_meta.return_data.clone(), -// compute_units_consumed: status_meta.compute_units_consumed, -// }, -// index: solana_transaction.index as u64, -// }; + fn notify_entry(&self, _entry: ReplicaEntryInfoVersions) -> PluginResult<()> { + // Not required + Ok(()) + } -// let transaction_message = ChannelMessage::Transaction(Box::new(transaction)); -// quic_server -// .send_message(transaction_message) -// .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; -// Ok(()) -// } + fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> PluginResult<()> { + let Some(quic_server) = &self.quic_server else { + return Ok(()); + }; -// fn notify_entry(&self, _entry: ReplicaEntryInfoVersions) -> PluginResult<()> { -// // Not required -// Ok(()) -// } + let ReplicaBlockInfoVersions::V0_0_3(blockinfo) = blockinfo else { + return Err(GeyserPluginError::AccountsUpdateError { + msg: "Unsupported account info version".to_string(), + }); + }; -// fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> PluginResult<()> { -// let Some(quic_server) = &self.quic_server else { -// return Ok(()); -// }; + let block_meta = BlockMeta { + parent_slot: blockinfo.parent_slot, + slot: blockinfo.slot, + parent_blockhash: blockinfo.parent_blockhash.to_string(), + blockhash: blockinfo.blockhash.to_string(), + rewards: blockinfo.rewards.to_vec(), + block_height: blockinfo.block_height, + executed_transaction_count: blockinfo.executed_transaction_count, + entries_count: blockinfo.entry_count, + }; -// let ReplicaBlockInfoVersions::V0_0_3(blockinfo) = blockinfo else { -// return Err(GeyserPluginError::AccountsUpdateError { -// msg: "Unsupported account info version".to_string(), -// }); -// }; + quic_server + .send_message(ChannelMessage::BlockMeta(block_meta)) + .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; + Ok(()) + } -// let block_meta = BlockMeta { -// parent_slot: blockinfo.parent_slot, -// slot: blockinfo.slot, -// parent_blockhash: blockinfo.parent_blockhash.to_string(), -// blockhash: blockinfo.blockhash.to_string(), -// rewards: blockinfo.rewards.to_vec(), -// block_height: blockinfo.block_height, -// executed_transaction_count: blockinfo.executed_transaction_count, -// entries_count: blockinfo.entry_count, -// }; + fn account_data_notifications_enabled(&self) -> bool { + true + } -// quic_server -// .send_message(ChannelMessage::BlockMeta(block_meta)) -// .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; -// Ok(()) -// } + fn transaction_notifications_enabled(&self) -> bool { + true + } -// fn account_data_notifications_enabled(&self) -> bool { -// true -// } + fn entry_notifications_enabled(&self) -> bool { + false + } +} -// fn transaction_notifications_enabled(&self) -> bool { -// true -// } - -// fn entry_notifications_enabled(&self) -> bool { -// false -// } -// } - -// #[no_mangle] -// #[allow(improper_ctypes_definitions)] -// /// # Safety -// /// -// /// This function returns the Plugin pointer as trait GeyserPlugin. -// pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin { -// let plugin = QuicGeyserPlugin::default(); -// let plugin: Box = Box::new(plugin); -// Box::into_raw(plugin) -// } +#[no_mangle] +#[allow(improper_ctypes_definitions)] +/// # Safety +/// +/// This function returns the Plugin pointer as trait GeyserPlugin. +pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin { + let plugin = QuicGeyserPlugin::default(); + let plugin: Box = Box::new(plugin); + Box::into_raw(plugin) +}