make sending of accounts on blocksubscription optional, false by default

This commit is contained in:
godmodegalactus 2024-06-21 10:37:08 +02:00
parent 85b5b9f182
commit d90988ef95
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
8 changed files with 42 additions and 21 deletions

View File

@ -18,9 +18,15 @@ pub fn start_block_building_thread(
channel_messages: Receiver<ChannelMessage>, channel_messages: Receiver<ChannelMessage>,
output: Sender<ChannelMessage>, output: Sender<ChannelMessage>,
compression_type: CompressionType, compression_type: CompressionType,
build_blocks_with_accounts: bool,
) { ) {
std::thread::spawn(move || { std::thread::spawn(move || {
build_blocks(channel_messages, output, compression_type); build_blocks(
channel_messages,
output,
compression_type,
build_blocks_with_accounts,
);
}); });
} }
@ -35,32 +41,35 @@ pub fn build_blocks(
channel_messages: Receiver<ChannelMessage>, channel_messages: Receiver<ChannelMessage>,
output: Sender<ChannelMessage>, output: Sender<ChannelMessage>,
compression_type: CompressionType, compression_type: CompressionType,
build_blocks_with_accounts: bool,
) { ) {
let mut partially_build_blocks = BTreeMap::<u64, PartialBlock>::new(); let mut partially_build_blocks = BTreeMap::<u64, PartialBlock>::new();
while let Ok(channel_message) = channel_messages.recv() { while let Ok(channel_message) = channel_messages.recv() {
match channel_message { match channel_message {
ChannelMessage::Account(account_data, slot) => { ChannelMessage::Account(account_data, slot) => {
if let Some(lowest) = partially_build_blocks.first_entry() { if build_blocks_with_accounts {
if *lowest.key() > slot { if let Some(lowest) = partially_build_blocks.first_entry() {
log::error!("Account update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), slot); if *lowest.key() > slot {
log::error!("Account update is too late the slot data has already been dispactched lowest slot: {}, account slot: {}", lowest.key(), slot);
}
} }
} // save account updates
// save account updates let partial_block = match partially_build_blocks.get_mut(&slot) {
let partial_block = match partially_build_blocks.get_mut(&slot) { Some(pb) => pb,
Some(pb) => pb, None => {
None => { partially_build_blocks.insert(slot, PartialBlock::default());
partially_build_blocks.insert(slot, PartialBlock::default()); partially_build_blocks.get_mut(&slot).unwrap()
partially_build_blocks.get_mut(&slot).unwrap() }
};
let update = match partial_block.account_updates.get(&account_data.pubkey) {
Some(prev_update) => prev_update.write_version < account_data.write_version,
None => true,
};
if update {
partial_block
.account_updates
.insert(account_data.pubkey, account_data);
} }
};
let update = match partial_block.account_updates.get(&account_data.pubkey) {
Some(prev_update) => prev_update.write_version < account_data.write_version,
None => true,
};
if update {
partial_block
.account_updates
.insert(account_data.pubkey, account_data);
} }
} }
ChannelMessage::Slot(slot, _parent_slot, commitment) => { ChannelMessage::Slot(slot, _parent_slot, commitment) => {

View File

@ -1,4 +1,3 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::mpsc::{channel, TryRecvError}, sync::mpsc::{channel, TryRecvError},
@ -38,6 +37,7 @@ fn test_block_creation_transactions_after_blockmeta() {
cm_rx, cm_rx,
ms_sx, ms_sx,
quic_geyser_common::compression::CompressionType::None, quic_geyser_common::compression::CompressionType::None,
true,
); );
let acc1_pk = Pubkey::new_unique(); let acc1_pk = Pubkey::new_unique();
@ -261,6 +261,7 @@ fn test_block_creation_blockmeta_after_transactions() {
cm_rx, cm_rx,
ms_sx, ms_sx,
quic_geyser_common::compression::CompressionType::None, quic_geyser_common::compression::CompressionType::None,
true,
); );
let acc1_pk = Pubkey::new_unique(); let acc1_pk = Pubkey::new_unique();
@ -485,6 +486,7 @@ fn test_block_creation_incomplete_block_after_slot_notification() {
cm_rx, cm_rx,
ms_sx, ms_sx,
quic_geyser_common::compression::CompressionType::None, quic_geyser_common::compression::CompressionType::None,
true,
); );
let acc1_pk = Pubkey::new_unique(); let acc1_pk = Pubkey::new_unique();
@ -708,6 +710,7 @@ fn test_block_creation_incomplete_slot() {
cm_rx, cm_rx,
ms_sx, ms_sx,
quic_geyser_common::compression::CompressionType::None, quic_geyser_common::compression::CompressionType::None,
true,
); );
let acc1_pk = Pubkey::new_unique(); let acc1_pk = Pubkey::new_unique();

View File

@ -127,6 +127,7 @@ mod tests {
allow_accounts: true, allow_accounts: true,
allow_accounts_at_startup: false, allow_accounts_at_startup: false,
enable_block_builder: false, enable_block_builder: false,
build_blocks_with_accounts: false,
}; };
let quic_server = QuicServer::new(config).unwrap(); let quic_server = QuicServer::new(config).unwrap();
// wait for client to connect and subscribe // wait for client to connect and subscribe

View File

@ -256,6 +256,7 @@ mod tests {
allow_accounts: true, allow_accounts: true,
allow_accounts_at_startup: false, allow_accounts_at_startup: false,
enable_block_builder: false, enable_block_builder: false,
build_blocks_with_accounts: false,
}; };
let quic_server = QuicServer::new(config).unwrap(); let quic_server = QuicServer::new(config).unwrap();
// wait for client to connect and subscribe // wait for client to connect and subscribe

View File

@ -32,6 +32,8 @@ pub struct ConfigQuicPlugin {
pub allow_accounts_at_startup: bool, pub allow_accounts_at_startup: bool,
#[serde(default = "ConfigQuicPlugin::default_enable_block_builder")] #[serde(default = "ConfigQuicPlugin::default_enable_block_builder")]
pub enable_block_builder: bool, pub enable_block_builder: bool,
#[serde(default)]
pub build_blocks_with_accounts: bool,
} }
impl ConfigQuicPlugin { impl ConfigQuicPlugin {

View File

@ -35,6 +35,7 @@ pub fn main() {
allow_accounts: true, allow_accounts: true,
allow_accounts_at_startup: false, allow_accounts_at_startup: false,
enable_block_builder: true, enable_block_builder: true,
build_blocks_with_accounts: true,
}; };
let quic_server = QuicServer::new(config).unwrap(); let quic_server = QuicServer::new(config).unwrap();

View File

@ -36,6 +36,7 @@ impl GeyserPlugin for QuicGeyserPlugin {
let config = Config::load_from_file(config_file)?; let config = Config::load_from_file(config_file)?;
let compression_type = config.quic_plugin.compression_parameters.compression_type; let compression_type = config.quic_plugin.compression_parameters.compression_type;
let enable_block_builder = config.quic_plugin.enable_block_builder; let enable_block_builder = config.quic_plugin.enable_block_builder;
let build_blocks_with_accounts = config.quic_plugin.build_blocks_with_accounts;
log::info!("Quic plugin config correctly loaded"); log::info!("Quic plugin config correctly loaded");
solana_logger::setup_with_default(&config.quic_plugin.log_level); solana_logger::setup_with_default(&config.quic_plugin.log_level);
let quic_server = QuicServer::new(config.quic_plugin).map_err(|_| { let quic_server = QuicServer::new(config.quic_plugin).map_err(|_| {
@ -47,6 +48,7 @@ impl GeyserPlugin for QuicGeyserPlugin {
rx, rx,
quic_server.data_channel_sender.clone(), quic_server.data_channel_sender.clone(),
compression_type, compression_type,
build_blocks_with_accounts,
); );
self.block_builder_channel = Some(sx); self.block_builder_channel = Some(sx);
} }

View File

@ -34,6 +34,7 @@ pub fn main() -> anyhow::Result<()> {
Filter::TransactionsAll, Filter::TransactionsAll,
Filter::Slot, Filter::Slot,
Filter::BlockMeta, Filter::BlockMeta,
Filter::BlockAll,
])?; ])?;
let quic_config = ConfigQuicPlugin { let quic_config = ConfigQuicPlugin {
@ -57,6 +58,7 @@ pub fn main() -> anyhow::Result<()> {
allow_accounts: true, allow_accounts: true,
allow_accounts_at_startup: false, allow_accounts_at_startup: false,
enable_block_builder: false, enable_block_builder: false,
build_blocks_with_accounts: false,
}; };
let (server_sender, server_reciever) = std::sync::mpsc::channel::<ChannelMessage>(); let (server_sender, server_reciever) = std::sync::mpsc::channel::<ChannelMessage>();