From f14928a970b03334a18450e43e5b750f23a7c474 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 29 Dec 2021 15:12:01 -0800 Subject: [PATCH] Stream additional block metadata via plugin (#22023) * Stream additional block metadata through plugin blockhash, block_height, block_time, rewards are streamed --- .../src/accountsdb_plugin_interface.rs | 23 +++- .../src/accountsdb_plugin_service.rs | 31 ++++-- .../src/block_metadata_notifier.rs | 105 ++++++++++++++++++ .../src/block_metadata_notifier_interface.rs | 20 ++++ accountsdb-plugin-manager/src/lib.rs | 2 + .../scripts/create_schema.sql | 10 ++ .../scripts/drop_schema.sql | 1 + .../src/accountsdb_plugin_postgres.rs | 27 ++++- .../src/postgres_client.rs | 53 ++++++++- .../postgres_client_block_metadata.rs | 97 ++++++++++++++++ core/src/replay_stage.rs | 14 +++ core/src/tvu.rs | 4 + core/src/validator.rs | 8 ++ 13 files changed, 383 insertions(+), 12 deletions(-) create mode 100644 accountsdb-plugin-manager/src/block_metadata_notifier.rs create mode 100644 accountsdb-plugin-manager/src/block_metadata_notifier_interface.rs create mode 100644 accountsdb-plugin-postgres/src/postgres_client/postgres_client_block_metadata.rs diff --git a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs index 95e1d221fc..0f4fffb70e 100644 --- a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs +++ b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs @@ -3,8 +3,8 @@ /// In addition, the dynamic library must export a "C" function _create_plugin which /// creates the implementation of the plugin. use { - solana_sdk::{signature::Signature, transaction::SanitizedTransaction}, - solana_transaction_status::TransactionStatusMeta, + solana_sdk::{clock::UnixTimestamp, signature::Signature, transaction::SanitizedTransaction}, + solana_transaction_status::{Reward, TransactionStatusMeta}, std::{any::Any, error, io}, thiserror::Error, }; @@ -60,6 +60,19 @@ pub enum ReplicaTransactionInfoVersions<'a> { V0_0_1(&'a ReplicaTransactionInfo<'a>), } +#[derive(Clone, Debug)] +pub struct ReplicaBlockInfo<'a> { + pub slot: u64, + pub blockhash: &'a str, + pub rewards: &'a [Reward], + pub block_time: Option, + pub block_height: Option, +} + +pub enum ReplicaBlockInfoVersions<'a> { + V0_0_1(&'a ReplicaBlockInfo<'a>), +} + /// Errors returned by plugin calls #[derive(Error, Debug)] pub enum AccountsDbPluginError { @@ -173,6 +186,12 @@ pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug { Ok(()) } + /// Called when block's metadata is updated. + #[allow(unused_variables)] + fn notify_block_metadata(&mut self, blockinfo: ReplicaBlockInfoVersions) -> Result<()> { + Ok(()) + } + /// Check if the plugin is interested in account data /// Default is true -- if the plugin is not interested in /// account data, please return false. diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs index fefaa8c1b8..25c74bf262 100644 --- a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs @@ -2,6 +2,8 @@ use { crate::{ accounts_update_notifier::AccountsUpdateNotifierImpl, accountsdb_plugin_manager::AccountsDbPluginManager, + block_metadata_notifier::BlockMetadataNotifierImpl, + block_metadata_notifier_interface::BlockMetadataNotifierLock, slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver, transaction_notifier::TransactionNotifierImpl, }, @@ -50,6 +52,7 @@ pub struct AccountsDbPluginService { plugin_manager: Arc>, accounts_update_notifier: Option, transaction_notifier: Option, + block_metadata_notifier: Option, } impl AccountsDbPluginService { @@ -102,17 +105,24 @@ impl AccountsDbPluginService { None }; - let slot_status_observer = - if account_data_notifications_enabled || transaction_notifications_enabled { - let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone()); - let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier)); + let (slot_status_observer, block_metadata_notifier): ( + Option, + Option, + ) = if account_data_notifications_enabled || transaction_notifications_enabled { + let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone()); + let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier)); + ( Some(SlotStatusObserver::new( confirmed_bank_receiver, slot_status_notifier, - )) - } else { - None - }; + )), + Some(Arc::new(RwLock::new(BlockMetadataNotifierImpl::new( + plugin_manager.clone(), + )))), + ) + } else { + (None, None) + }; info!("Started AccountsDbPluginService"); Ok(AccountsDbPluginService { @@ -120,6 +130,7 @@ impl AccountsDbPluginService { plugin_manager, accounts_update_notifier, transaction_notifier, + block_metadata_notifier, }) } @@ -186,6 +197,10 @@ impl AccountsDbPluginService { self.transaction_notifier.clone() } + pub fn get_block_metadata_notifier(&self) -> Option { + self.block_metadata_notifier.clone() + } + pub fn join(self) -> thread::Result<()> { if let Some(mut slot_status_observer) = self.slot_status_observer { slot_status_observer.join()?; diff --git a/accountsdb-plugin-manager/src/block_metadata_notifier.rs b/accountsdb-plugin-manager/src/block_metadata_notifier.rs new file mode 100644 index 0000000000..8291e9f038 --- /dev/null +++ b/accountsdb-plugin-manager/src/block_metadata_notifier.rs @@ -0,0 +1,105 @@ +use { + crate::{ + accountsdb_plugin_manager::AccountsDbPluginManager, + block_metadata_notifier_interface::BlockMetadataNotifier, + }, + log::*, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + ReplicaBlockInfo, ReplicaBlockInfoVersions, + }, + solana_measure::measure::Measure, + solana_metrics::*, + solana_runtime::bank::RewardInfo, + solana_sdk::{clock::UnixTimestamp, pubkey::Pubkey}, + solana_transaction_status::{Reward, Rewards}, + std::sync::{Arc, RwLock}, +}; + +pub(crate) struct BlockMetadataNotifierImpl { + plugin_manager: Arc>, +} + +impl BlockMetadataNotifier for BlockMetadataNotifierImpl { + /// Notify the block metadata + fn notify_block_metadata( + &self, + slot: u64, + blockhash: &str, + rewards: &RwLock>, + block_time: Option, + block_height: Option, + ) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + if plugin_manager.plugins.is_empty() { + return; + } + let rewards = Self::build_rewards(rewards); + + for plugin in plugin_manager.plugins.iter_mut() { + let mut measure = Measure::start("accountsdb-plugin-update-slot"); + let block_info = + Self::build_replica_block_info(slot, blockhash, &rewards, block_time, block_height); + let block_info = ReplicaBlockInfoVersions::V0_0_1(&block_info); + match plugin.notify_block_metadata(block_info) { + Err(err) => { + error!( + "Failed to update block metadata at slot {}, error: {} to plugin {}", + slot, + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully updated block metadata at slot {} to plugin {}", + slot, + plugin.name() + ); + } + } + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-update-block-metadata-us", + measure.as_us() as usize, + 1000, + 1000 + ); + } + } +} + +impl BlockMetadataNotifierImpl { + fn build_rewards(rewards: &RwLock>) -> Rewards { + let rewards = rewards.read().unwrap(); + rewards + .iter() + .map(|(pubkey, reward)| Reward { + pubkey: pubkey.to_string(), + lamports: reward.lamports, + post_balance: reward.post_balance, + reward_type: Some(reward.reward_type), + commission: reward.commission, + }) + .collect() + } + + fn build_replica_block_info<'a>( + slot: u64, + blockhash: &'a str, + rewards: &'a [Reward], + block_time: Option, + block_height: Option, + ) -> ReplicaBlockInfo<'a> { + ReplicaBlockInfo { + slot, + blockhash, + rewards, + block_time, + block_height, + } + } + + pub fn new(plugin_manager: Arc>) -> Self { + Self { plugin_manager } + } +} diff --git a/accountsdb-plugin-manager/src/block_metadata_notifier_interface.rs b/accountsdb-plugin-manager/src/block_metadata_notifier_interface.rs new file mode 100644 index 0000000000..6d4b9f6ad2 --- /dev/null +++ b/accountsdb-plugin-manager/src/block_metadata_notifier_interface.rs @@ -0,0 +1,20 @@ +use { + solana_runtime::bank::RewardInfo, + solana_sdk::{clock::UnixTimestamp, pubkey::Pubkey}, + std::sync::{Arc, RwLock}, +}; + +/// Interface for notifying block metadata changes +pub trait BlockMetadataNotifier { + /// Notify the block metadata + fn notify_block_metadata( + &self, + slot: u64, + blockhash: &str, + rewards: &RwLock>, + block_time: Option, + block_height: Option, + ); +} + +pub type BlockMetadataNotifierLock = Arc>; diff --git a/accountsdb-plugin-manager/src/lib.rs b/accountsdb-plugin-manager/src/lib.rs index e12484eb05..5af39e804e 100644 --- a/accountsdb-plugin-manager/src/lib.rs +++ b/accountsdb-plugin-manager/src/lib.rs @@ -1,6 +1,8 @@ pub mod accounts_update_notifier; pub mod accountsdb_plugin_manager; pub mod accountsdb_plugin_service; +pub mod block_metadata_notifier; +pub mod block_metadata_notifier_interface; pub mod slot_status_notifier; pub mod slot_status_observer; pub mod transaction_notifier; diff --git a/accountsdb-plugin-postgres/scripts/create_schema.sql b/accountsdb-plugin-postgres/scripts/create_schema.sql index d62c7e31ab..994a2176cf 100644 --- a/accountsdb-plugin-postgres/scripts/create_schema.sql +++ b/accountsdb-plugin-postgres/scripts/create_schema.sql @@ -153,6 +153,16 @@ CREATE TABLE transaction ( CONSTRAINT transaction_pk PRIMARY KEY (slot, signature) ); +-- The table storing block metadata +CREATE TABLE block ( + slot BIGINT PRIMARY KEY, + blockhash VARCHAR(44), + rewards "Reward"[], + block_time BIGINT, + block_height BIGINT, + updated_on TIMESTAMP NOT NULL +); + /** * The following is for keeping historical data for accounts and is not required for plugin to work. */ diff --git a/accountsdb-plugin-postgres/scripts/drop_schema.sql b/accountsdb-plugin-postgres/scripts/drop_schema.sql index e5b756870d..448564f933 100644 --- a/accountsdb-plugin-postgres/scripts/drop_schema.sql +++ b/accountsdb-plugin-postgres/scripts/drop_schema.sql @@ -8,6 +8,7 @@ DROP TABLE account_audit; DROP TABLE account; DROP TABLE slot; DROP TABLE transaction; +DROP TABLE block; DROP TYPE "TransactionError" CASCADE; DROP TYPE "TransactionErrorCode" CASCADE; diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs index cecdbb6fa5..ede9b462e8 100644 --- a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -12,7 +12,7 @@ use { serde_json, solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, - ReplicaTransactionInfoVersions, Result, SlotStatus, + ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions, Result, SlotStatus, }, solana_metrics::*, std::{fs::File, io::Read}, @@ -334,6 +334,31 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { Ok(()) } + fn notify_block_metadata(&mut self, block_info: ReplicaBlockInfoVersions) -> Result<()> { + match &mut self.client { + None => { + return Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: "There is no connection to the PostgreSQL database.".to_string(), + }, + ))); + } + Some(client) => match block_info { + ReplicaBlockInfoVersions::V0_0_1(block_info) => { + let result = client.update_block_metadata(block_info); + + if let Err(err) = result { + return Err(AccountsDbPluginError::SlotStatusUpdateError{ + msg: format!("Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}", err) + }); + } + } + }, + } + + Ok(()) + } + /// Check if the plugin is interested in account data /// Default is true -- if the plugin is not interested in /// account data, please return false. diff --git a/accountsdb-plugin-postgres/src/postgres_client.rs b/accountsdb-plugin-postgres/src/postgres_client.rs index 73d708515e..a79ab91b58 100644 --- a/accountsdb-plugin-postgres/src/postgres_client.rs +++ b/accountsdb-plugin-postgres/src/postgres_client.rs @@ -1,4 +1,6 @@ #![allow(clippy::integer_arithmetic)] + +mod postgres_client_block_metadata; mod postgres_client_transaction; /// A concurrent implementation for writing accounts into the PostgreSQL in parallel. @@ -10,9 +12,10 @@ use { crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}, log::*, postgres::{Client, NoTls, Statement}, + postgres_client_block_metadata::DbBlockInfo, postgres_client_transaction::LogTransactionRequest, solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ - AccountsDbPluginError, ReplicaAccountInfo, SlotStatus, + AccountsDbPluginError, ReplicaAccountInfo, ReplicaBlockInfo, SlotStatus, }, solana_measure::measure::Measure, solana_metrics::*, @@ -44,6 +47,7 @@ struct PostgresSqlClientWrapper { update_slot_with_parent_stmt: Statement, update_slot_without_parent_stmt: Statement, update_transaction_log_stmt: Statement, + update_block_metadata_stmt: Statement, } pub struct SimplePostgresClient { @@ -195,6 +199,11 @@ pub trait PostgresClient { &mut self, transaction_log_info: LogTransactionRequest, ) -> Result<(), AccountsDbPluginError>; + + fn update_block_metadata( + &mut self, + block_info: UpdateBlockMetadataRequest, + ) -> Result<(), AccountsDbPluginError>; } impl SimplePostgresClient { @@ -501,6 +510,8 @@ impl SimplePostgresClient { Self::build_slot_upsert_statement_without_parent(&mut client, config)?; let update_transaction_log_stmt = Self::build_transaction_info_upsert_statement(&mut client, config)?; + let update_block_metadata_stmt = + Self::build_block_metadata_upsert_statement(&mut client, config)?; let batch_size = config .batch_size @@ -516,6 +527,7 @@ impl SimplePostgresClient { update_slot_with_parent_stmt, update_slot_without_parent_stmt, update_transaction_log_stmt, + update_block_metadata_stmt, }), }) } @@ -591,6 +603,13 @@ impl PostgresClient for SimplePostgresClient { ) -> Result<(), AccountsDbPluginError> { self.log_transaction_impl(transaction_log_info) } + + fn update_block_metadata( + &mut self, + block_info: UpdateBlockMetadataRequest, + ) -> Result<(), AccountsDbPluginError> { + self.update_block_metadata_impl(block_info) + } } struct UpdateAccountRequest { @@ -604,11 +623,16 @@ struct UpdateSlotRequest { slot_status: SlotStatus, } +pub struct UpdateBlockMetadataRequest { + pub block_info: DbBlockInfo, +} + #[warn(clippy::large_enum_variant)] enum DbWorkItem { UpdateAccount(Box), UpdateSlot(Box), LogTransaction(Box), + UpdateBlockMetadata(Box), } impl PostgresClientWorker { @@ -677,6 +701,14 @@ impl PostgresClientWorker { } } } + DbWorkItem::UpdateBlockMetadata(block_info) => { + if let Err(err) = self.client.update_block_metadata(*block_info) { + error!("Failed to update block metadata: ({})", err); + if panic_on_db_errors { + abort(); + } + } + } }, Err(err) => match err { RecvTimeoutError::Timeout => { @@ -868,6 +900,25 @@ impl ParallelPostgresClient { Ok(()) } + pub fn update_block_metadata( + &mut self, + block_info: &ReplicaBlockInfo, + ) -> Result<(), AccountsDbPluginError> { + if let Err(err) = self.sender.send(DbWorkItem::UpdateBlockMetadata(Box::new( + UpdateBlockMetadataRequest { + block_info: DbBlockInfo::from(block_info), + }, + ))) { + return Err(AccountsDbPluginError::SlotStatusUpdateError { + msg: format!( + "Failed to update the block metadata at slot {:?}, error: {:?}", + block_info.slot, err + ), + }); + } + Ok(()) + } + pub fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> { info!("Notifying the end of startup"); // Ensure all items in the queue has been received by the workers diff --git a/accountsdb-plugin-postgres/src/postgres_client/postgres_client_block_metadata.rs b/accountsdb-plugin-postgres/src/postgres_client/postgres_client_block_metadata.rs new file mode 100644 index 0000000000..a882e6767c --- /dev/null +++ b/accountsdb-plugin-postgres/src/postgres_client/postgres_client_block_metadata.rs @@ -0,0 +1,97 @@ +use { + crate::{ + accountsdb_plugin_postgres::{ + AccountsDbPluginPostgresConfig, AccountsDbPluginPostgresError, + }, + postgres_client::{ + postgres_client_transaction::DbReward, SimplePostgresClient, UpdateBlockMetadataRequest, + }, + }, + chrono::Utc, + log::*, + postgres::{Client, Statement}, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + AccountsDbPluginError, ReplicaBlockInfo, + }, +}; + +#[derive(Clone, Debug)] +pub struct DbBlockInfo { + pub slot: i64, + pub blockhash: String, + pub rewards: Vec, + pub block_time: Option, + pub block_height: Option, +} + +impl<'a> From<&ReplicaBlockInfo<'a>> for DbBlockInfo { + fn from(block_info: &ReplicaBlockInfo) -> Self { + Self { + slot: block_info.slot as i64, + blockhash: block_info.blockhash.to_string(), + rewards: block_info.rewards.iter().map(DbReward::from).collect(), + block_time: block_info.block_time, + block_height: block_info + .block_height + .map(|block_height| block_height as i64), + } + } +} + +impl SimplePostgresClient { + pub(crate) fn build_block_metadata_upsert_statement( + client: &mut Client, + config: &AccountsDbPluginPostgresConfig, + ) -> Result { + let stmt = + "INSERT INTO block (slot, blockhash, rewards, block_time, block_height, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6)"; + + let stmt = client.prepare(stmt); + + match stmt { + Err(err) => { + return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the block metadata update PostgreSQL database: ({}) host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, config + ), + }))); + } + Ok(stmt) => Ok(stmt), + } + } + + pub(crate) fn update_block_metadata_impl( + &mut self, + block_info: UpdateBlockMetadataRequest, + ) -> Result<(), AccountsDbPluginError> { + let client = self.client.get_mut().unwrap(); + let statement = &client.update_block_metadata_stmt; + let client = &mut client.client; + let updated_on = Utc::now().naive_utc(); + + let block_info = block_info.block_info; + let result = client.query( + statement, + &[ + &block_info.slot, + &block_info.blockhash, + &block_info.rewards, + &block_info.block_time, + &block_info.block_height, + &updated_on, + ], + ); + + if let Err(err) = result { + let msg = format!( + "Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}", + err); + error!("{}", msg); + return Err(AccountsDbPluginError::AccountsUpdateError { msg }); + } + + Ok(()) + } +} diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 14f3ced263..72c177b396 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -26,6 +26,7 @@ use { voting_service::VoteOp, window_service::DuplicateSlotReceiver, }, + solana_accountsdb_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock, solana_client::rpc_response::SlotUpdate, solana_entry::entry::VerifyRecyclers, solana_gossip::cluster_info::ClusterInfo, @@ -335,6 +336,7 @@ impl ReplayStage { cost_update_sender: Sender, voting_sender: Sender, drop_bank_sender: Sender>>, + block_metadata_notifier: Option, ) -> Self { let ReplayStageConfig { vote_account, @@ -440,6 +442,7 @@ impl ReplayStage { &cost_update_sender, &mut duplicate_slots_to_repair, &ancestor_hashes_replay_update_sender, + block_metadata_notifier.clone(), ); replay_active_banks_time.stop(); @@ -2042,6 +2045,7 @@ impl ReplayStage { cost_update_sender: &Sender, duplicate_slots_to_repair: &mut DuplicateSlotsToRepair, ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, + block_metadata_notifier: Option, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -2197,6 +2201,16 @@ impl ReplayStage { } } Self::record_rewards(&bank, rewards_recorder_sender); + if let Some(ref block_metadata_notifier) = block_metadata_notifier { + let block_metadata_notifier = block_metadata_notifier.read().unwrap(); + block_metadata_notifier.notify_block_metadata( + bank.slot(), + &bank.last_blockhash().to_string(), + &bank.rewards, + Some(bank.clock().unix_timestamp), + Some(bank.block_height()), + ) + } } else { trace!( "bank {} not completed tick_height: {}, max_tick_height: {}", diff --git a/core/src/tvu.rs b/core/src/tvu.rs index f04b5300ea..6b516f2fbe 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -26,6 +26,7 @@ use { voting_service::VotingService, }, crossbeam_channel::unbounded, + solana_accountsdb_plugin_manager::block_metadata_notifier_interface::BlockMetadataNotifierLock, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ blockstore::Blockstore, blockstore_processor::TransactionStatusSender, @@ -143,6 +144,7 @@ impl Tvu { cost_model: &Arc>, accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver), last_full_snapshot_slot: Option, + block_metadata_notifier: Option, ) -> Self { let Sockets { repair: repair_socket, @@ -333,6 +335,7 @@ impl Tvu { cost_update_sender, voting_sender, drop_bank_sender, + block_metadata_notifier, ); let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -501,6 +504,7 @@ pub mod tests { &Arc::new(RwLock::new(CostModel::default())), accounts_package_channel, None, + None, ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index cc4b29ae78..447ca9f12f 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -427,6 +427,13 @@ impl Validator { accountsdb_plugin_service.get_transaction_notifier() }); + let block_metadata_notifier = + accountsdb_plugin_service + .as_ref() + .and_then(|accountsdb_plugin_service| { + accountsdb_plugin_service.get_block_metadata_notifier() + }); + info!( "AccountsDb plugin: accounts_update_notifier: {} transaction_notifier: {}", accounts_update_notifier.is_some(), @@ -884,6 +891,7 @@ impl Validator { &cost_model, accounts_package_channel, last_full_snapshot_slot, + block_metadata_notifier, ); let tpu = Tpu::new(