Integrating block builder code with plugin

This commit is contained in:
godmodegalactus 2024-06-04 18:48:38 +02:00
parent f64ece0b37
commit ffe93036ff
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
10 changed files with 58 additions and 11 deletions

1
Cargo.lock generated
View File

@ -2519,6 +2519,7 @@ dependencies = [
"clap", "clap",
"git-version", "git-version",
"log", "log",
"quic-geyser-block-builder",
"quic-geyser-common", "quic-geyser-common",
"quic-geyser-server", "quic-geyser-server",
"serde", "serde",

View File

@ -63,7 +63,7 @@ quic-geyser-plugin = {path = "plugin", version="0.1.3"}
quic-geyser-server = {path = "server", version="0.1.3"} quic-geyser-server = {path = "server", version="0.1.3"}
quic-geyser-quiche-utils = {path = "quiche", version = "0.1.3"} quic-geyser-quiche-utils = {path = "quiche", version = "0.1.3"}
quic-geyser-blocking-client = {path = "blocking_client", version = "0.1.3"} quic-geyser-blocking-client = {path = "blocking_client", version = "0.1.3"}
quic-geyser-block-builder = {path = "blocking_client", version = "0.1.3"} quic-geyser-block-builder = {path = "block-builder", version = "0.1.3"}
[profile.release] [profile.release]
debug = true debug = true

View File

@ -127,6 +127,7 @@ mod tests {
log_level: "debug".to_string(), log_level: "debug".to_string(),
allow_accounts: true, allow_accounts: true,
allow_accounts_at_startup: false, allow_accounts_at_startup: false,
enable_block_builder: false,
}; };
let quic_server = QuicServer::new(config).unwrap(); let quic_server = QuicServer::new(config).unwrap();
// wait for client to connect and subscribe // wait for client to connect and subscribe

View File

@ -241,6 +241,7 @@ mod tests {
log_level: "debug".to_string(), log_level: "debug".to_string(),
allow_accounts: true, allow_accounts: true,
allow_accounts_at_startup: false, allow_accounts_at_startup: false,
enable_block_builder: false,
}; };
let quic_server = QuicServer::new(config).unwrap(); let quic_server = QuicServer::new(config).unwrap();
// wait for client to connect and subscribe // wait for client to connect and subscribe

View File

@ -28,6 +28,8 @@ pub struct ConfigQuicPlugin {
pub allow_accounts: bool, pub allow_accounts: bool,
#[serde(default)] #[serde(default)]
pub allow_accounts_at_startup: bool, pub allow_accounts_at_startup: bool,
#[serde(default = "ConfigQuicPlugin::default_enable_block_builder")]
pub enable_block_builder: bool,
} }
impl ConfigQuicPlugin { impl ConfigQuicPlugin {
@ -46,6 +48,10 @@ impl ConfigQuicPlugin {
fn default_allow_accounts() -> bool { fn default_allow_accounts() -> bool {
true true
} }
fn default_enable_block_builder() -> bool {
true
}
} }
#[derive(Debug, Clone, Serialize, Deserialize, Copy)] #[derive(Debug, Clone, Serialize, Deserialize, Copy)]

View File

@ -31,6 +31,7 @@ pub fn main() {
number_of_retries: 100, number_of_retries: 100,
allow_accounts: true, allow_accounts: true,
allow_accounts_at_startup: false, allow_accounts_at_startup: false,
enable_block_builder: true,
}; };
let quic_server = QuicServer::new(config).unwrap(); let quic_server = QuicServer::new(config).unwrap();

View File

@ -25,6 +25,7 @@ thiserror = {workspace = true}
quic-geyser-common = { workspace = true } quic-geyser-common = { workspace = true }
quic-geyser-server = { workspace = true } quic-geyser-server = { workspace = true }
quic-geyser-block-builder = { workspace = true }
[build-dependencies] [build-dependencies]
anyhow = { workspace = true } anyhow = { workspace = true }

View File

@ -2,6 +2,7 @@ use agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions, GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus, ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
}; };
use quic_geyser_block_builder::block_builder::start_block_building_thread;
use quic_geyser_common::{ use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage}, channel_message::{AccountData, ChannelMessage},
plugin_error::QuicGeyserError, plugin_error::QuicGeyserError,
@ -22,6 +23,7 @@ use crate::config::Config;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct QuicGeyserPlugin { pub struct QuicGeyserPlugin {
quic_server: Option<QuicServer>, quic_server: Option<QuicServer>,
block_builder_channel: Option<std::sync::mpsc::Sender<ChannelMessage>>,
} }
impl GeyserPlugin for QuicGeyserPlugin { impl GeyserPlugin for QuicGeyserPlugin {
@ -32,11 +34,22 @@ impl GeyserPlugin for QuicGeyserPlugin {
fn on_load(&mut self, config_file: &str) -> PluginResult<()> { fn on_load(&mut self, config_file: &str) -> PluginResult<()> {
log::info!("loading quic_geyser plugin"); log::info!("loading quic_geyser plugin");
let config = Config::load_from_file(config_file)?; let config = Config::load_from_file(config_file)?;
let compression_type = config.quic_plugin.compression_parameters.compression_type;
let enable_block_builder = config.quic_plugin.enable_block_builder;
log::info!("Quic plugin config correctly loaded"); log::info!("Quic plugin config correctly loaded");
solana_logger::setup_with_default(&config.quic_plugin.log_level); solana_logger::setup_with_default(&config.quic_plugin.log_level);
let quic_server = QuicServer::new(config.quic_plugin).map_err(|_| { let quic_server = QuicServer::new(config.quic_plugin).map_err(|_| {
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer)) GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
})?; })?;
if enable_block_builder {
let (sx, rx) = std::sync::mpsc::channel();
start_block_building_thread(
rx,
quic_server.data_channel_sender.clone(),
compression_type,
);
self.block_builder_channel = Some(sx);
}
self.quic_server = Some(quic_server); self.quic_server = Some(quic_server);
Ok(()) Ok(())
@ -75,15 +88,21 @@ impl GeyserPlugin for QuicGeyserPlugin {
}; };
let pubkey: Pubkey = Pubkey::try_from(account_info.pubkey).expect("valid pubkey"); let pubkey: Pubkey = Pubkey::try_from(account_info.pubkey).expect("valid pubkey");
quic_server let channel_message = ChannelMessage::Account(
.send_message(ChannelMessage::Account(
AccountData { AccountData {
pubkey, pubkey,
account, account,
write_version: account_info.write_version, write_version: account_info.write_version,
}, },
slot, slot,
)) );
if let Some(block_channel) = &self.block_builder_channel {
let _ = block_channel.send(channel_message.clone());
}
quic_server
.send_message(channel_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
Ok(()) Ok(())
} }
@ -108,6 +127,11 @@ impl GeyserPlugin for QuicGeyserPlugin {
SlotStatus::Confirmed => CommitmentConfig::confirmed(), SlotStatus::Confirmed => CommitmentConfig::confirmed(),
}; };
let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level); let slot_message = ChannelMessage::Slot(slot, parent.unwrap_or_default(), commitment_level);
if let Some(block_channel) = &self.block_builder_channel {
let _ = block_channel.send(slot_message.clone());
}
quic_server quic_server
.send_message(slot_message) .send_message(slot_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
@ -173,6 +197,11 @@ impl GeyserPlugin for QuicGeyserPlugin {
}; };
let transaction_message = ChannelMessage::Transaction(Box::new(transaction)); let transaction_message = ChannelMessage::Transaction(Box::new(transaction));
if let Some(block_channel) = &self.block_builder_channel {
let _ = block_channel.send(transaction_message.clone());
}
quic_server quic_server
.send_message(transaction_message) .send_message(transaction_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
@ -206,8 +235,14 @@ impl GeyserPlugin for QuicGeyserPlugin {
entries_count: blockinfo.entry_count, entries_count: blockinfo.entry_count,
}; };
let block_meta_message = ChannelMessage::BlockMeta(block_meta);
if let Some(block_channel) = &self.block_builder_channel {
let _ = block_channel.send(block_meta_message.clone());
}
quic_server quic_server
.send_message(ChannelMessage::BlockMeta(block_meta)) .send_message(block_meta_message)
.map_err(|e| GeyserPluginError::Custom(Box::new(e)))?; .map_err(|e| GeyserPluginError::Custom(Box::new(e)))?;
Ok(()) Ok(())
} }

View File

@ -58,6 +58,7 @@ pub fn main() -> anyhow::Result<()> {
number_of_retries: 100, number_of_retries: 100,
allow_accounts: true, allow_accounts: true,
allow_accounts_at_startup: false, allow_accounts_at_startup: false,
enable_block_builder: false,
}; };
let (server_sender, server_reciever) = std::sync::mpsc::channel::<ChannelMessage>(); let (server_sender, server_reciever) = std::sync::mpsc::channel::<ChannelMessage>();

View File

@ -7,7 +7,7 @@ use quic_geyser_common::{
use super::quiche_server_loop::server_loop; use super::quiche_server_loop::server_loop;
pub struct QuicServer { pub struct QuicServer {
data_channel_sender: mpsc::Sender<ChannelMessage>, pub data_channel_sender: mpsc::Sender<ChannelMessage>,
pub quic_plugin_config: ConfigQuicPlugin, pub quic_plugin_config: ConfigQuicPlugin,
} }