From ffe93036ff5cfd3a1eb4712a3f0f061e3c9c6aef Mon Sep 17 00:00:00 2001 From: godmodegalactus Date: Tue, 4 Jun 2024 18:48:38 +0200 Subject: [PATCH] Integrating block builder code with plugin --- Cargo.lock | 1 + Cargo.toml | 2 +- blocking_client/src/client.rs | 1 + client/src/non_blocking/client.rs | 1 + common/src/config.rs | 6 ++++ examples/tester-server/src/main.rs | 1 + plugin/Cargo.toml | 1 + plugin/src/quic_plugin.rs | 53 +++++++++++++++++++++++++----- proxy/src/main.rs | 1 + server/src/quic_server.rs | 2 +- 10 files changed, 58 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 76cded2..ab2b8b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2519,6 +2519,7 @@ dependencies = [ "clap", "git-version", "log", + "quic-geyser-block-builder", "quic-geyser-common", "quic-geyser-server", "serde", diff --git a/Cargo.toml b/Cargo.toml index 7cf2ea2..56e9c0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,7 +63,7 @@ quic-geyser-plugin = {path = "plugin", version="0.1.3"} quic-geyser-server = {path = "server", version="0.1.3"} quic-geyser-quiche-utils = {path = "quiche", version = "0.1.3"} quic-geyser-blocking-client = {path = "blocking_client", version = "0.1.3"} -quic-geyser-block-builder = {path = "blocking_client", version = "0.1.3"} +quic-geyser-block-builder = {path = "block-builder", version = "0.1.3"} [profile.release] debug = true diff --git a/blocking_client/src/client.rs b/blocking_client/src/client.rs index 880460c..49e8265 100644 --- a/blocking_client/src/client.rs +++ b/blocking_client/src/client.rs @@ -127,6 +127,7 @@ mod tests { log_level: "debug".to_string(), allow_accounts: true, allow_accounts_at_startup: false, + enable_block_builder: false, }; let quic_server = QuicServer::new(config).unwrap(); // wait for client to connect and subscribe diff --git a/client/src/non_blocking/client.rs b/client/src/non_blocking/client.rs index 77e2db7..4e7c8a9 100644 --- a/client/src/non_blocking/client.rs +++ b/client/src/non_blocking/client.rs @@ -241,6 +241,7 @@ mod tests { log_level: "debug".to_string(), allow_accounts: true, allow_accounts_at_startup: false, + enable_block_builder: false, }; let quic_server = QuicServer::new(config).unwrap(); // wait for client to connect and subscribe diff --git a/common/src/config.rs b/common/src/config.rs index 3a895d2..97878ae 100644 --- a/common/src/config.rs +++ b/common/src/config.rs @@ -28,6 +28,8 @@ pub struct ConfigQuicPlugin { pub allow_accounts: bool, #[serde(default)] pub allow_accounts_at_startup: bool, + #[serde(default = "ConfigQuicPlugin::default_enable_block_builder")] + pub enable_block_builder: bool, } impl ConfigQuicPlugin { @@ -46,6 +48,10 @@ impl ConfigQuicPlugin { fn default_allow_accounts() -> bool { true } + + fn default_enable_block_builder() -> bool { + true + } } #[derive(Debug, Clone, Serialize, Deserialize, Copy)] diff --git a/examples/tester-server/src/main.rs b/examples/tester-server/src/main.rs index f5bfe76..b72139d 100644 --- a/examples/tester-server/src/main.rs +++ b/examples/tester-server/src/main.rs @@ -31,6 +31,7 @@ pub fn main() { number_of_retries: 100, allow_accounts: true, allow_accounts_at_startup: false, + enable_block_builder: true, }; let quic_server = QuicServer::new(config).unwrap(); diff --git a/plugin/Cargo.toml b/plugin/Cargo.toml index e695a7c..de94f8c 100644 --- a/plugin/Cargo.toml +++ b/plugin/Cargo.toml @@ -25,6 +25,7 @@ thiserror = {workspace = true} quic-geyser-common = { workspace = true } quic-geyser-server = { workspace = true } +quic-geyser-block-builder = { workspace = true } [build-dependencies] anyhow = { workspace = true } diff --git a/plugin/src/quic_plugin.rs b/plugin/src/quic_plugin.rs index 7059563..53e4996 100644 --- a/plugin/src/quic_plugin.rs +++ b/plugin/src/quic_plugin.rs @@ -2,6 +2,7 @@ use agave_geyser_plugin_interface::geyser_plugin_interface::{ GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, }; +use quic_geyser_block_builder::block_builder::start_block_building_thread; use quic_geyser_common::{ channel_message::{AccountData, ChannelMessage}, plugin_error::QuicGeyserError, @@ -22,6 +23,7 @@ use crate::config::Config; #[derive(Debug, Default)] pub struct QuicGeyserPlugin { quic_server: Option, + block_builder_channel: Option>, } impl GeyserPlugin for QuicGeyserPlugin { @@ -32,11 +34,22 @@ impl GeyserPlugin for QuicGeyserPlugin { fn on_load(&mut self, config_file: &str) -> PluginResult<()> { log::info!("loading quic_geyser plugin"); let config = Config::load_from_file(config_file)?; + let compression_type = config.quic_plugin.compression_parameters.compression_type; + let enable_block_builder = config.quic_plugin.enable_block_builder; log::info!("Quic plugin config correctly loaded"); solana_logger::setup_with_default(&config.quic_plugin.log_level); let quic_server = QuicServer::new(config.quic_plugin).map_err(|_| { GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer)) })?; + if enable_block_builder { + let (sx, rx) = std::sync::mpsc::channel(); + start_block_building_thread( + rx, + quic_server.data_channel_sender.clone(), + compression_type, + ); + self.block_builder_channel = Some(sx); + } self.quic_server = Some(quic_server); Ok(()) @@ -75,15 +88,21 @@ impl GeyserPlugin for QuicGeyserPlugin { }; let pubkey: Pubkey = Pubkey::try_from(account_info.pubkey).expect("valid pubkey"); + let channel_message = ChannelMessage::Account( + AccountData { + pubkey, + account, + write_version: account_info.write_version, + }, + slot, + ); + + if let Some(block_channel) = &self.block_builder_channel { + let _ = block_channel.send(channel_message.clone()); + } + quic_server - .send_message(ChannelMessage::Account( - AccountData { - pubkey, - account, - write_version: account_info.write_version, - }, - slot, - )) + .send_message(channel_message) .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; Ok(()) } @@ -108,6 +127,11 @@ impl GeyserPlugin for QuicGeyserPlugin { SlotStatus::Confirmed => CommitmentConfig::confirmed(), }; let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level); + + if let Some(block_channel) = &self.block_builder_channel { + let _ = block_channel.send(slot_message.clone()); + } + quic_server .send_message(slot_message) .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; @@ -173,6 +197,11 @@ impl GeyserPlugin for QuicGeyserPlugin { }; let transaction_message = ChannelMessage::Transaction(Box::new(transaction)); + + if let Some(block_channel) = &self.block_builder_channel { + let _ = block_channel.send(transaction_message.clone()); + } + quic_server .send_message(transaction_message) .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; @@ -206,8 +235,14 @@ impl GeyserPlugin for QuicGeyserPlugin { entries_count: blockinfo.entry_count, }; + let block_meta_message = ChannelMessage::BlockMeta(block_meta); + + if let Some(block_channel) = &self.block_builder_channel { + let _ = block_channel.send(block_meta_message.clone()); + } + quic_server - .send_message(ChannelMessage::BlockMeta(block_meta)) + .send_message(block_meta_message) .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; Ok(()) } diff --git a/proxy/src/main.rs b/proxy/src/main.rs index a9e24ef..ba5813c 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -58,6 +58,7 @@ pub fn main() -> anyhow::Result<()> { number_of_retries: 100, allow_accounts: true, allow_accounts_at_startup: false, + enable_block_builder: false, }; let (server_sender, server_reciever) = std::sync::mpsc::channel::(); diff --git a/server/src/quic_server.rs b/server/src/quic_server.rs index 895dfd0..fb720c6 100644 --- a/server/src/quic_server.rs +++ b/server/src/quic_server.rs @@ -7,7 +7,7 @@ use quic_geyser_common::{ use super::quiche_server_loop::server_loop; pub struct QuicServer { - data_channel_sender: mpsc::Sender, + pub data_channel_sender: mpsc::Sender, pub quic_plugin_config: ConfigQuicPlugin, }