reenable block building code, avoid droping init accounts in filters

This commit is contained in:
Godmode Galactus 2024-11-14 11:13:06 +01:00
parent 7c2a6ee38f
commit 36f1aeba92
No known key found for this signature in database
GPG Key ID: A6B75566742EA987
7 changed files with 34 additions and 35 deletions

1
Cargo.lock generated
View File

@ -2683,6 +2683,7 @@ dependencies = [
"bincode",
"itertools 0.10.5",
"log",
"mio_channel",
"quic-geyser-common",
"quic-geyser-server",
"rand 0.8.5",

View File

@ -10,6 +10,7 @@ edition = "2021"
solana-sdk = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
mio_channel = {workspace = true}
quic-geyser-common = { workspace = true }
bincode = { workspace = true }

View File

@ -1,6 +1,6 @@
use std::{
collections::{BTreeMap, HashMap},
sync::mpsc::{Receiver, Sender},
sync::mpsc::Receiver,
};
use itertools::Itertools;
@ -16,7 +16,7 @@ use solana_sdk::pubkey::Pubkey;
pub fn start_block_building_thread(
channel_messages: Receiver<ChannelMessage>,
output: Sender<ChannelMessage>,
output: mio_channel::Sender<ChannelMessage>,
compression_type: CompressionType,
build_blocks_with_accounts: bool,
) {
@ -39,7 +39,7 @@ struct PartialBlock {
pub fn build_blocks(
channel_messages: Receiver<ChannelMessage>,
output: Sender<ChannelMessage>,
output: mio_channel::Sender<ChannelMessage>,
compression_type: CompressionType,
build_blocks_with_accounts: bool,
) {
@ -169,7 +169,7 @@ pub fn build_blocks(
fn dispatch_partial_block(
partial_blocks: &mut BTreeMap<u64, PartialBlock>,
slot: u64,
output: &Sender<ChannelMessage>,
output: &mio_channel::Sender<ChannelMessage>,
compression_type: CompressionType,
) {
if let Some(dispatched_partial_block) = partial_blocks.remove(&slot) {

View File

@ -32,7 +32,7 @@ use crate::block_builder::start_block_building_thread;
#[test]
fn test_block_creation_transactions_after_blockmeta() {
let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel();
let (ms_sx, msg_rx) = mio_channel::channel();
start_block_building_thread(
cm_rx,
ms_sx,
@ -231,7 +231,8 @@ fn test_block_creation_transactions_after_blockmeta() {
.send(ChannelMessage::Transaction(Box::new(tx3.clone())))
.unwrap();
let block_message = msg_rx.recv().unwrap();
sleep(Duration::from_millis(1));
let block_message = msg_rx.try_recv().unwrap();
let ChannelMessage::Block(block) = block_message else {
unreachable!();
};
@ -260,7 +261,7 @@ fn test_block_creation_transactions_after_blockmeta() {
#[test]
fn test_block_creation_blockmeta_after_transactions() {
let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel();
let (ms_sx, msg_rx) = mio_channel::channel();
start_block_building_thread(
cm_rx,
ms_sx,
@ -460,7 +461,8 @@ fn test_block_creation_blockmeta_after_transactions() {
.send(ChannelMessage::BlockMeta(block_meta.clone()))
.unwrap();
let block_message = msg_rx.recv().unwrap();
sleep(Duration::from_millis(1));
let block_message = msg_rx.try_recv().unwrap();
let ChannelMessage::Block(block) = block_message else {
unreachable!();
};
@ -489,7 +491,7 @@ fn test_block_creation_blockmeta_after_transactions() {
#[test]
fn test_block_creation_incomplete_block_after_slot_notification() {
let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel();
let (ms_sx, msg_rx) = mio_channel::channel();
start_block_building_thread(
cm_rx,
ms_sx,
@ -688,7 +690,8 @@ fn test_block_creation_incomplete_block_after_slot_notification() {
.send(ChannelMessage::Transaction(Box::new(tx3.clone())))
.unwrap();
let block_message = msg_rx.recv().unwrap();
sleep(Duration::from_millis(1));
let block_message = msg_rx.try_recv().unwrap();
let ChannelMessage::Block(block) = block_message else {
unreachable!();
};
@ -717,7 +720,7 @@ fn test_block_creation_incomplete_block_after_slot_notification() {
#[test]
fn test_block_creation_incomplete_slot() {
let (channelmsg_sx, cm_rx) = channel();
let (ms_sx, msg_rx) = channel();
let (ms_sx, msg_rx) = mio_channel::channel();
start_block_building_thread(
cm_rx,
ms_sx,

View File

@ -25,8 +25,7 @@ impl Filter {
match &self {
Filter::Account(account) => account.allows(message),
Filter::AccountsAll => match message {
ChannelMessage::Account(account, _, init) => {
!init &&
ChannelMessage::Account(account, _, _init) => {
account.account.owner != solana_program::vote::program::ID // does not belong to vote program
&& account.account.owner != solana_program::stake::program::ID
// does not belong to stake program
@ -86,10 +85,7 @@ pub struct AccountFilter {
impl AccountFilter {
pub fn allows(&self, message: &ChannelMessage) -> bool {
if let ChannelMessage::Account(account, _, init) = message {
if *init {
return false;
}
if let ChannelMessage::Account(account, _, _init) = message {
if let Some(owner) = self.owner {
if owner == account.account.owner {
// to do move the filtering somewhere else because here we need to decode the account data
@ -228,7 +224,7 @@ mod tests {
filters: None,
};
assert_eq!(f1.allows(&msg_0), false);
assert_eq!(f1.allows(&msg_0), true);
assert_eq!(f1.allows(&msg_1), true);
assert_eq!(f1.allows(&msg_2), true);
assert_eq!(f1.allows(&msg_3), false);
@ -250,7 +246,7 @@ mod tests {
accounts: None,
filters: Some(vec![AccountFilterType::Datasize(10)]),
};
assert_eq!(f3.allows(&msg_0), false);
assert_eq!(f3.allows(&msg_0), true);
assert_eq!(f3.allows(&msg_1), true);
assert_eq!(f3.allows(&msg_2), true);
assert_eq!(f3.allows(&msg_3), false);
@ -264,7 +260,7 @@ mod tests {
data: crate::filters::MemcmpFilterData::Bytes(vec![3, 4, 5]),
})]),
};
assert_eq!(f4.allows(&msg_0), false);
assert_eq!(f4.allows(&msg_0), true);
assert_eq!(f4.allows(&msg_1), true);
assert_eq!(f4.allows(&msg_2), false);
assert_eq!(f4.allows(&msg_3), false);

View File

@ -1,5 +1,5 @@
{
"libpath": "target/debug/libquic_geyser_plugin.so",
"libpath": "target/release/libquic_geyser_plugin.so",
"quic_plugin": {
"address": "[::]:10800",
"compression_parameters": {
@ -7,8 +7,5 @@
"Lz4Fast": 8
}
}
},
"rpc_server" : {
"enable" : false
}
}

View File

@ -3,6 +3,7 @@ use agave_geyser_plugin_interface::geyser_plugin_interface::{
GeyserPlugin, GeyserPluginError, ReplicaAccountInfoVersions, ReplicaBlockInfoVersions,
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult, SlotStatus,
};
use quic_geyser_block_builder::block_builder::start_block_building_thread;
use quic_geyser_common::{
channel_message::{AccountData, ChannelMessage},
plugin_error::QuicGeyserError,
@ -33,24 +34,24 @@ impl GeyserPlugin for QuicGeyserPlugin {
fn on_load(&mut self, config_file: &str, _is_reload: bool) -> PluginResult<()> {
log::info!("loading quic_geyser plugin");
let config = Config::load_from_file(config_file)?;
// let compression_type = config.quic_plugin.compression_parameters.compression_type;
let compression_type = config.quic_plugin.compression_parameters.compression_type;
let enable_block_builder = config.quic_plugin.enable_block_builder;
// let build_blocks_with_accounts = config.quic_plugin.build_blocks_with_accounts;
let build_blocks_with_accounts = config.quic_plugin.build_blocks_with_accounts;
log::info!("Quic plugin config correctly loaded");
solana_logger::setup_with_default(&config.quic_plugin.log_level);
let quic_server = QuicServer::new(config.quic_plugin).map_err(|_| {
GeyserPluginError::Custom(Box::new(QuicGeyserError::ErrorConfiguringServer))
})?;
if enable_block_builder {
// // disable block building for now
// let (sx, rx) = std::sync::mpsc::channel();
// start_block_building_thread(
// rx,
// quic_server.data_channel_sender.clone(),
// compression_type,
// build_blocks_with_accounts,
// );
// self.block_builder_channel = Some(sx);
// disable block building for now
let (sx, rx) = std::sync::mpsc::channel();
start_block_building_thread(
rx,
quic_server.data_channel_sender.clone(),
compression_type,
build_blocks_with_accounts,
);
self.block_builder_channel = Some(sx);
}
self.quic_server = Some(quic_server);