Implementing geyser plugin and testers with quiche

This commit is contained in:
godmodegalactus 2024-05-22 15:21:00 +02:00
parent 660299f3fc
commit 389701c899
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
7 changed files with 289 additions and 301 deletions

View File

@ -159,7 +159,7 @@ mod tests {
let (client, reciever) = Client::new(
url,
ConnectionParameters {
max_number_of_streams: 3,
max_number_of_streams: 10,
recieve_window_size: 1_000_000,
timeout_in_seconds: 10,
},

View File

@ -2,7 +2,7 @@ use crate::quic::configure_server::ALPN_GEYSER_PROTOCOL_ID;
use super::configure_server::MAX_DATAGRAM_SIZE;
pub const DEFAULT_MAX_STREAMS: u32 = 4096;
pub const DEFAULT_MAX_STREAMS: u32 = 32 * 1024;
pub const DEFAULT_MAX_RECIEVE_WINDOW_SIZE: u64 = 64_000_000; // 64 MBs
pub fn configure_client(

View File

@ -29,7 +29,13 @@ impl QuicServer {
let (data_channel_sender, data_channel_tx) = mio_channel::channel();
let _server_loop_jh = std::thread::spawn(move || {
if let Err(e) = server_loop(server_config, socket, data_channel_tx, compression_type) {
if let Err(e) = server_loop(
server_config,
socket,
data_channel_tx,
compression_type,
true,
) {
panic!("Server loop closed by error : {e}");
}
});

View File

@ -308,6 +308,7 @@ mod tests {
socket_addr,
rx_sent_queue,
CompressionType::Lz4Fast(8),
true,
) {
println!("Server loop closed by error : {e}");
}

View File

@ -38,6 +38,7 @@ pub fn server_loop(
socket_addr: SocketAddr,
mut message_send_queue: mio_channel::Receiver<ChannelMessage>,
compression_type: CompressionType,
stop_laggy_client: bool,
) -> anyhow::Result<()> {
let mut socket = UdpSocket::bind(socket_addr)?;
@ -320,6 +321,15 @@ pub fn server_loop(
&binary,
) {
log::error!("Error sending message : {e}");
if stop_laggy_client {
log::info!(
"Stopping laggy client : {}",
client.conn.trace_id()
);
if let Err(e) = client.conn.close(true, 1, b"laggy client") {
log::error!("error closing client : {}", e);
}
}
}
}
}

View File

@ -1,77 +1,64 @@
pub fn main() -> anyhow::Result<()> {
Ok(())
use std::{
net::SocketAddr,
str::FromStr,
time::{Duration, Instant},
};
use clap::Parser;
use cli::Args;
use itertools::Itertools;
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
quic::quic_server::QuicServer,
};
use rand::{thread_rng, Rng};
use solana_sdk::{account::Account, pubkey::Pubkey};
pub mod cli;
pub fn main() {
let args = Args::parse();
let config = ConfigQuicPlugin {
address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(),
quic_parameters: QuicParameters::default(),
compression_parameters: CompressionParameters {
compression_type: quic_geyser_common::compression::CompressionType::None,
},
number_of_retries: 100,
};
let quic_server = QuicServer::new(config).unwrap();
let mut instant = Instant::now();
// to avoid errors
std::thread::sleep(Duration::from_millis(500));
let mut slot = 1;
let mut write_version = 1;
let mut rand = thread_rng();
let data = (0..args.account_data_size as usize)
.map(|_| rand.gen::<u8>())
.collect_vec();
loop {
std::thread::sleep(Duration::from_secs(1) - Instant::now().duration_since(instant));
instant = Instant::now();
slot += 1;
for _ in 0..args.accounts_per_second {
write_version += 1;
let account = AccountData {
pubkey: Pubkey::new_unique(),
account: Account {
lamports: rand.gen(),
data: data.clone(),
owner: Pubkey::new_unique(),
executable: false,
rent_epoch: u64::MAX,
},
write_version,
};
let channel_message = ChannelMessage::Account(account, slot, false);
quic_server.send_message(channel_message).unwrap();
}
}
}
// use std::{
// net::SocketAddr,
// str::FromStr,
// time::{Duration, Instant},
// };
// use clap::Parser;
// use cli::Args;
// use itertools::Itertools;
// use quic_geyser_common::{
// config::{CompressionParameters, ConfigQuicPlugin, QuicParameters},
// quic::quic_server::{AccountData, ChannelMessage, QuicServer},
// };
// use rand::{thread_rng, Rng};
// use solana_sdk::{account::Account, pubkey::Pubkey};
// use tokio::runtime::Builder;
// pub mod cli;
// pub fn main() -> anyhow::Result<()> {
// let args = Args::parse();
// let runtime = Builder::new_multi_thread()
// .thread_name_fn(|| "solGeyserQuic".to_string())
// .enable_all()
// .build()
// .map_err(|error| {
// let s = error.to_string();
// log::error!("Runtime Error : {}", s);
// error
// })?;
// let config = ConfigQuicPlugin {
// address: SocketAddr::from_str(format!("0.0.0.0:{}", args.port).as_str()).unwrap(),
// quic_parameters: QuicParameters::default(),
// compression_parameters: CompressionParameters {
// compression_type: quic_geyser_common::compression::CompressionType::None,
// },
// number_of_retries: 100,
// };
// let quic_server = QuicServer::new(runtime, config, args.max_lagging).unwrap();
// let mut instant = Instant::now();
// // to avoid errors
// std::thread::sleep(Duration::from_millis(500));
// let mut slot = 1;
// let mut write_version = 1;
// let mut rand = thread_rng();
// let data = (0..args.account_data_size as usize)
// .map(|_| rand.gen::<u8>())
// .collect_vec();
// loop {
// std::thread::sleep(Duration::from_secs(1) - Instant::now().duration_since(instant));
// instant = Instant::now();
// slot += 1;
// for _ in 0..args.accounts_per_second {
// write_version += 1;
// let account = AccountData {
// pubkey: Pubkey::new_unique(),
// account: Account {
// lamports: rand.gen(),
// data: data.clone(),
// owner: Pubkey::new_unique(),
// executable: false,
// rent_epoch: u64::MAX,
// },
// write_version,
// };
// let channel_message = ChannelMessage::Account(account, slot, false);
// quic_server.send_message(channel_message)?;
// }
// }
// }

View File

@ -1,247 +1,231 @@
// use std::sync::atomic::{AtomicUsize, Ordering};
use agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
};
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
plugin_error::QuicGeyserError,
quic::quic_server::QuicServer,
types::{
block_meta::BlockMeta,
slot_identifier::SlotIdentifier,
transaction::{Transaction, TransactionMeta},
},
};
use solana_sdk::{
account::Account, clock::Slot, commitment_config::CommitmentLevel, message::v0::Message,
pubkey::Pubkey,
};
// use agave_geyser_plugin_interface::geyser_plugin_interface::{
// GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
// ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
// };
// use quic_geyser_common::{
// plugin_error::QuicGeyserError,
// quic::quic_server::{AccountData, ChannelMessage, QuicServer},
// types::{
// block_meta::BlockMeta,
// slot_identifier::SlotIdentifier,
// transaction::{Transaction, TransactionMeta},
// },
// };
// use solana_sdk::{
// account::Account, clock::Slot, commitment_config::CommitmentLevel, message::v0::Message,
// pubkey::Pubkey,
// };
// use tokio::runtime::Builder;
use crate::config::Config;
// use crate::config::Config;
#[derive(Debug, Default)]
pub struct QuicGeyserPlugin {
quic_server: Option<QuicServer>,
}
// #[derive(Debug, Default)]
// pub struct QuicGeyserPlugin {
// quic_server: Option<QuicServer>,
// }
impl GeyserPlugin for QuicGeyserPlugin {
fn name(&self) -> &'static str {
"quic_geyser_plugin"
}
// impl GeyserPlugin for QuicGeyserPlugin {
// fn name(&self) -> &'static str {
// "quic_geyser_plugin"
// }
fn on_load(&mut self, config_file: &str) -> PluginResult<()> {
solana_logger::setup_with_default("info");
log::info!("loading quic_geyser plugin");
let config = Config::load_from_file(config_file)?;
log::info!("Quic plugin config correctly loaded");
let quic_server = QuicServer::new(config.quic_plugin).map_err(|_| {
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
})?;
self.quic_server = Some(quic_server);
// fn on_load(&mut self, config_file: &str) -> PluginResult<()> {
// solana_logger::setup_with_default("info");
// log::info!("loading quic_geyser plugin");
// let config = Config::load_from_file(config_file)?;
// log::info!("Quic plugin config correctly loaded");
// let runtime = Builder::new_multi_thread()
// .thread_name_fn(|| {
// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
// let id = ATOMIC_ID.fetch_add(1, Ordering::Relaxed);
// format!("solGeyserQuic{id:02}")
// })
// .enable_all()
// .build()
// .map_err(|error| {
// let s = error.to_string();
// log::error!("Runtime Error : {}", s);
// GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
// })?;
Ok(())
}
// let quic_server = QuicServer::new(runtime, config.quic_plugin, 20_000).map_err(|_| {
// GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
// })?;
// self.quic_server = Some(quic_server);
fn on_unload(&mut self) {
self.quic_server = None;
}
// Ok(())
// }
fn update_account(
&self,
account: ReplicaAccountInfoVersions,
slot: Slot,
is_startup: bool,
) -> PluginResult<()> {
let Some(quic_server) = &self.quic_server else {
return Ok(());
};
let ReplicaAccountInfoVersions::V0_0_3(account_info) = account else {
return Err(GeyserPluginError::AccountsUpdateError {
msg: "Unsupported account info version".to_string(),
});
};
let account = Account {
lamports: account_info.lamports,
data: account_info.data.to_vec(),
owner: Pubkey::try_from(account_info.owner).expect("valid pubkey"),
executable: account_info.executable,
rent_epoch: account_info.rent_epoch,
};
let pubkey: Pubkey = Pubkey::try_from(account_info.pubkey).expect("valid pubkey");
quic_server
.send_message(ChannelMessage::Account(
AccountData {
pubkey,
account,
write_version: account_info.write_version,
},
slot,
is_startup,
))
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
Ok(())
}
// fn on_unload(&mut self) {
// self.quic_server = None;
// }
fn notify_end_of_startup(&self) -> PluginResult<()> {
Ok(())
}
// fn update_account(
// &self,
// account: ReplicaAccountInfoVersions,
// slot: Slot,
// is_startup: bool,
// ) -> PluginResult<()> {
// let Some(quic_server) = &self.quic_server else {
// return Ok(());
// };
// let ReplicaAccountInfoVersions::V0_0_3(account_info) = account else {
// return Err(GeyserPluginError::AccountsUpdateError {
// msg: "Unsupported account info version".to_string(),
// });
// };
// let account = Account {
// lamports: account_info.lamports,
// data: account_info.data.to_vec(),
// owner: Pubkey::try_from(account_info.owner).expect("valid pubkey"),
// executable: account_info.executable,
// rent_epoch: account_info.rent_epoch,
// };
// let pubkey: Pubkey = Pubkey::try_from(account_info.pubkey).expect("valid pubkey");
// quic_server
// .send_message(ChannelMessage::Account(
// AccountData {
// pubkey,
// account,
// write_version: account_info.write_version,
// },
// slot,
// is_startup,
// ))
// .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
// Ok(())
// }
fn update_slot_status(
&self,
slot: Slot,
parent: Option<u64>,
status: SlotStatus,
) -> PluginResult<()> {
// Todo
let Some(quic_server) = &self.quic_server else {
return Ok(());
};
let commitment_level = match status {
SlotStatus::Processed => CommitmentLevel::Processed,
SlotStatus::Rooted => CommitmentLevel::Finalized,
SlotStatus::Confirmed => CommitmentLevel::Confirmed,
};
let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level);
quic_server
.send_message(slot_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
Ok(())
}
// fn notify_end_of_startup(&self) -> PluginResult<()> {
// Ok(())
// }
fn notify_transaction(
&self,
transaction: ReplicaTransactionInfoVersions,
slot: Slot,
) -> PluginResult<()> {
let Some(quic_server) = &self.quic_server else {
return Ok(());
};
let ReplicaTransactionInfoVersions::V0_0_2(solana_transaction) = transaction else {
return Err(GeyserPluginError::TransactionUpdateError {
msg: "Unsupported transaction version".to_string(),
});
};
// fn update_slot_status(
// &self,
// slot: Slot,
// parent: Option<u64>,
// status: SlotStatus,
// ) -> PluginResult<()> {
// // Todo
// let Some(quic_server) = &self.quic_server else {
// return Ok(());
// };
// let commitment_level = match status {
// SlotStatus::Processed => CommitmentLevel::Processed,
// SlotStatus::Rooted => CommitmentLevel::Finalized,
// SlotStatus::Confirmed => CommitmentLevel::Confirmed,
// };
// let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level);
// quic_server
// .send_message(slot_message)
// .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
// Ok(())
// }
let message = solana_transaction.transaction.message();
let mut account_keys = vec![];
// fn notify_transaction(
// &self,
// transaction: ReplicaTransactionInfoVersions,
// slot: Slot,
// ) -> PluginResult<()> {
// let Some(quic_server) = &self.quic_server else {
// return Ok(());
// };
// let ReplicaTransactionInfoVersions::V0_0_2(solana_transaction) = transaction else {
// return Err(GeyserPluginError::TransactionUpdateError {
// msg: "Unsupported transaction version".to_string(),
// });
// };
for index in 0.. {
let account = message.account_keys().get(index);
match account {
Some(account) => account_keys.push(*account),
None => break,
}
}
// let message = solana_transaction.transaction.message();
// let mut account_keys = vec![];
let v0_message = Message {
header: *message.header(),
account_keys,
recent_blockhash: *message.recent_blockhash(),
instructions: message.instructions().to_vec(),
address_table_lookups: message.message_address_table_lookups().to_vec(),
};
// for index in 0.. {
// let account = message.account_keys().get(index);
// match account {
// Some(account) => account_keys.push(*account),
// None => break,
// }
// }
let status_meta = solana_transaction.transaction_status_meta;
// let v0_message = Message {
// header: *message.header(),
// account_keys,
// recent_blockhash: *message.recent_blockhash(),
// instructions: message.instructions().to_vec(),
// address_table_lookups: message.message_address_table_lookups().to_vec(),
// };
let transaction = Transaction {
slot_identifier: SlotIdentifier { slot },
signatures: solana_transaction.transaction.signatures().to_vec(),
message: v0_message,
is_vote: solana_transaction.is_vote,
transasction_meta: TransactionMeta {
error: match &status_meta.status {
Ok(_) => None,
Err(e) => Some(e.clone()),
},
fee: status_meta.fee,
pre_balances: status_meta.pre_balances.clone(),
post_balances: status_meta.post_balances.clone(),
inner_instructions: status_meta.inner_instructions.clone(),
log_messages: status_meta.log_messages.clone(),
rewards: status_meta.rewards.clone(),
loaded_addresses: status_meta.loaded_addresses.clone(),
return_data: status_meta.return_data.clone(),
compute_units_consumed: status_meta.compute_units_consumed,
},
index: solana_transaction.index as u64,
};
// let status_meta = solana_transaction.transaction_status_meta;
let transaction_message = ChannelMessage::Transaction(Box::new(transaction));
quic_server
.send_message(transaction_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
Ok(())
}
// let transaction = Transaction {
// slot_identifier: SlotIdentifier { slot },
// signatures: solana_transaction.transaction.signatures().to_vec(),
// message: v0_message,
// is_vote: solana_transaction.is_vote,
// transasction_meta: TransactionMeta {
// error: match &status_meta.status {
// Ok(_) => None,
// Err(e) => Some(e.clone()),
// },
// fee: status_meta.fee,
// pre_balances: status_meta.pre_balances.clone(),
// post_balances: status_meta.post_balances.clone(),
// inner_instructions: status_meta.inner_instructions.clone(),
// log_messages: status_meta.log_messages.clone(),
// rewards: status_meta.rewards.clone(),
// loaded_addresses: status_meta.loaded_addresses.clone(),
// return_data: status_meta.return_data.clone(),
// compute_units_consumed: status_meta.compute_units_consumed,
// },
// index: solana_transaction.index as u64,
// };
fn notify_entry(&self, _entry: ReplicaEntryInfoVersions) -> PluginResult<()> {
// Not required
Ok(())
}
// let transaction_message = ChannelMessage::Transaction(Box::new(transaction));
// quic_server
// .send_message(transaction_message)
// .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
// Ok(())
// }
fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> PluginResult<()> {
let Some(quic_server) = &self.quic_server else {
return Ok(());
};
// fn notify_entry(&self, _entry: ReplicaEntryInfoVersions) -> PluginResult<()> {
// // Not required
// Ok(())
// }
let ReplicaBlockInfoVersions::V0_0_3(blockinfo) = blockinfo else {
return Err(GeyserPluginError::AccountsUpdateError {
msg: "Unsupported account info version".to_string(),
});
};
// fn notify_block_metadata(&self, blockinfo: ReplicaBlockInfoVersions) -> PluginResult<()> {
// let Some(quic_server) = &self.quic_server else {
// return Ok(());
// };
let block_meta = BlockMeta {
parent_slot: blockinfo.parent_slot,
slot: blockinfo.slot,
parent_blockhash: blockinfo.parent_blockhash.to_string(),
blockhash: blockinfo.blockhash.to_string(),
rewards: blockinfo.rewards.to_vec(),
block_height: blockinfo.block_height,
executed_transaction_count: blockinfo.executed_transaction_count,
entries_count: blockinfo.entry_count,
};
// let ReplicaBlockInfoVersions::V0_0_3(blockinfo) = blockinfo else {
// return Err(GeyserPluginError::AccountsUpdateError {
// msg: "Unsupported account info version".to_string(),
// });
// };
quic_server
.send_message(ChannelMessage::BlockMeta(block_meta))
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
Ok(())
}
// let block_meta = BlockMeta {
// parent_slot: blockinfo.parent_slot,
// slot: blockinfo.slot,
// parent_blockhash: blockinfo.parent_blockhash.to_string(),
// blockhash: blockinfo.blockhash.to_string(),
// rewards: blockinfo.rewards.to_vec(),
// block_height: blockinfo.block_height,
// executed_transaction_count: blockinfo.executed_transaction_count,
// entries_count: blockinfo.entry_count,
// };
fn account_data_notifications_enabled(&self) -> bool {
true
}
// quic_server
// .send_message(ChannelMessage::BlockMeta(block_meta))
// .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
// Ok(())
// }
fn transaction_notifications_enabled(&self) -> bool {
true
}
// fn account_data_notifications_enabled(&self) -> bool {
// true
// }
fn entry_notifications_enabled(&self) -> bool {
false
}
}
// fn transaction_notifications_enabled(&self) -> bool {
// true
// }
// fn entry_notifications_enabled(&self) -> bool {
// false
// }
// }
// #[no_mangle]
// #[allow(improper_ctypes_definitions)]
// /// # Safety
// ///
// /// This function returns the Plugin pointer as trait GeyserPlugin.
// pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
// let plugin = QuicGeyserPlugin::default();
// let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
// Box::into_raw(plugin)
// }
#[no_mangle]
#[allow(improper_ctypes_definitions)]
/// # Safety
///
/// This function returns the Plugin pointer as trait GeyserPlugin.
pub unsafe extern "C" fn _create_plugin() -> *mut dyn GeyserPlugin {
let plugin = QuicGeyserPlugin::default();
let plugin: Box<dyn GeyserPlugin> = Box::new(plugin);
Box::into_raw(plugin)
}