diff --git a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs index 7b0163e784..11948e4322 100644 --- a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs +++ b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs @@ -79,21 +79,36 @@ pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug { fn on_unload(&mut self) {} /// Called when an account is updated at a slot. + #[allow(unused_variables)] fn update_account( &mut self, account: ReplicaAccountInfoVersions, slot: u64, is_startup: bool, - ) -> Result<()>; + ) -> Result<()> { + Ok(()) + } /// Called when all accounts are notified of during startup. - fn notify_end_of_startup(&mut self) -> Result<()>; + fn notify_end_of_startup(&mut self) -> Result<()> { + Ok(()) + } /// Called when a slot status is updated + #[allow(unused_variables)] fn update_slot_status( &mut self, slot: u64, parent: Option, status: SlotStatus, - ) -> Result<()>; + ) -> Result<()> { + Ok(()) + } + + /// Check if the plugin is interested in account data + /// Default is true -- if the plugin is not interested in + /// account data, please return false. + fn to_notify_account_data(&self) -> bool { + true + } } diff --git a/accountsdb-plugin-manager/src/accounts_update_notifier.rs b/accountsdb-plugin-manager/src/accounts_update_notifier.rs index d3a0d85143..c6eef85188 100644 --- a/accountsdb-plugin-manager/src/accounts_update_notifier.rs +++ b/accountsdb-plugin-manager/src/accounts_update_notifier.rs @@ -3,7 +3,7 @@ use { crate::accountsdb_plugin_manager::AccountsDbPluginManager, log::*, solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ - ReplicaAccountInfo, ReplicaAccountInfoVersions, SlotStatus, + ReplicaAccountInfo, ReplicaAccountInfoVersions, }, solana_measure::measure::Measure, solana_metrics::*, @@ -86,18 +86,6 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { ); } } - - fn notify_slot_confirmed(&self, slot: Slot, parent: Option) { - self.notify_slot_status(slot, parent, SlotStatus::Confirmed); - } - - fn notify_slot_processed(&self, slot: Slot, parent: Option) { - self.notify_slot_status(slot, parent, SlotStatus::Processed); - } - - fn notify_slot_rooted(&self, slot: Slot, parent: Option) { - self.notify_slot_status(slot, parent, SlotStatus::Rooted); - } } impl AccountsUpdateNotifierImpl { @@ -189,39 +177,4 @@ impl AccountsUpdateNotifierImpl { 100000 ); } - - pub fn notify_slot_status(&self, slot: Slot, parent: Option, slot_status: SlotStatus) { - let mut plugin_manager = self.plugin_manager.write().unwrap(); - if plugin_manager.plugins.is_empty() { - return; - } - - for plugin in plugin_manager.plugins.iter_mut() { - let mut measure = Measure::start("accountsdb-plugin-update-slot"); - match plugin.update_slot_status(slot, parent, slot_status.clone()) { - Err(err) => { - error!( - "Failed to update slot status at slot {}, error: {} to plugin {}", - slot, - err, - plugin.name() - ) - } - Ok(_) => { - trace!( - "Successfully updated slot status at slot {} to plugin {}", - slot, - plugin.name() - ); - } - } - measure.stop(); - inc_new_counter_debug!( - "accountsdb-plugin-update-slot-us", - measure.as_us() as usize, - 1000, - 1000 - ); - } - } } diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs index a6074362f8..d628814f23 100644 --- a/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs @@ -52,4 +52,14 @@ impl AccountsDbPluginManager { drop(lib); } } + + /// Check if there is any plugin interested in account data + pub fn to_notify_account_data(&self) -> bool { + for plugin in &self.plugins { + if plugin.to_notify_account_data() { + return true; + } + } + false + } } diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs index 0af928838b..2fced0442e 100644 --- a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs @@ -2,7 +2,7 @@ use { crate::{ accounts_update_notifier::AccountsUpdateNotifierImpl, accountsdb_plugin_manager::AccountsDbPluginManager, - slot_status_observer::SlotStatusObserver, + slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver, }, crossbeam_channel::Receiver, log::*, @@ -42,9 +42,9 @@ pub enum AccountsdbPluginServiceError { /// The service managing the AccountsDb plugin workflow. pub struct AccountsDbPluginService { - slot_status_observer: SlotStatusObserver, + slot_status_observer: Option, plugin_manager: Arc>, - accounts_update_notifier: AccountsUpdateNotifier, + accounts_update_notifier: Option, } impl AccountsDbPluginService { @@ -74,13 +74,27 @@ impl AccountsDbPluginService { for accountsdb_plugin_config_file in accountsdb_plugin_config_files { Self::load_plugin(&mut plugin_manager, accountsdb_plugin_config_file)?; } + let to_notify_account_data = plugin_manager.to_notify_account_data(); let plugin_manager = Arc::new(RwLock::new(plugin_manager)); - let accounts_update_notifier = Arc::new(RwLock::new(AccountsUpdateNotifierImpl::new( - plugin_manager.clone(), - ))); - let slot_status_observer = - SlotStatusObserver::new(confirmed_bank_receiver, accounts_update_notifier.clone()); + + let accounts_update_notifier: Option = if to_notify_account_data { + let accounts_update_notifier = AccountsUpdateNotifierImpl::new(plugin_manager.clone()); + Some(Arc::new(RwLock::new(accounts_update_notifier))) + } else { + None + }; + + let slot_status_observer = if to_notify_account_data { + let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone()); + let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier)); + Some(SlotStatusObserver::new( + confirmed_bank_receiver, + slot_status_notifier, + )) + } else { + None + }; info!("Started AccountsDbPluginService"); Ok(AccountsDbPluginService { @@ -145,12 +159,14 @@ impl AccountsDbPluginService { Ok(()) } - pub fn get_accounts_update_notifier(&self) -> AccountsUpdateNotifier { + pub fn get_accounts_update_notifier(&self) -> Option { self.accounts_update_notifier.clone() } - pub fn join(mut self) -> thread::Result<()> { - self.slot_status_observer.join()?; + pub fn join(self) -> thread::Result<()> { + if let Some(mut slot_status_observer) = self.slot_status_observer { + slot_status_observer.join()?; + } self.plugin_manager.write().unwrap().unload(); Ok(()) } diff --git a/accountsdb-plugin-manager/src/lib.rs b/accountsdb-plugin-manager/src/lib.rs index d2b38b57b3..89c305a29a 100644 --- a/accountsdb-plugin-manager/src/lib.rs +++ b/accountsdb-plugin-manager/src/lib.rs @@ -1,4 +1,5 @@ pub mod accounts_update_notifier; pub mod accountsdb_plugin_manager; pub mod accountsdb_plugin_service; +pub mod slot_status_notifier; pub mod slot_status_observer; diff --git a/accountsdb-plugin-manager/src/slot_status_notifier.rs b/accountsdb-plugin-manager/src/slot_status_notifier.rs new file mode 100644 index 0000000000..f1cc298dd7 --- /dev/null +++ b/accountsdb-plugin-manager/src/slot_status_notifier.rs @@ -0,0 +1,81 @@ +use { + crate::accountsdb_plugin_manager::AccountsDbPluginManager, + log::*, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::SlotStatus, + solana_measure::measure::Measure, + solana_metrics::*, + solana_sdk::clock::Slot, + std::sync::{Arc, RwLock}, +}; + +pub trait SlotStatusNotifierInterface { + /// Notified when a slot is optimistically confirmed + fn notify_slot_confirmed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is marked frozen. + fn notify_slot_processed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is rooted. + fn notify_slot_rooted(&self, slot: Slot, parent: Option); +} + +pub type SlotStatusNotifier = Arc>; + +pub struct SlotStatusNotifierImpl { + plugin_manager: Arc>, +} + +impl SlotStatusNotifierInterface for SlotStatusNotifierImpl { + fn notify_slot_confirmed(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Confirmed); + } + + fn notify_slot_processed(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Processed); + } + + fn notify_slot_rooted(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Rooted); + } +} + +impl SlotStatusNotifierImpl { + pub fn new(plugin_manager: Arc>) -> Self { + Self { plugin_manager } + } + + pub fn notify_slot_status(&self, slot: Slot, parent: Option, slot_status: SlotStatus) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + if plugin_manager.plugins.is_empty() { + return; + } + + for plugin in plugin_manager.plugins.iter_mut() { + let mut measure = Measure::start("accountsdb-plugin-update-slot"); + match plugin.update_slot_status(slot, parent, slot_status.clone()) { + Err(err) => { + error!( + "Failed to update slot status at slot {}, error: {} to plugin {}", + slot, + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully updated slot status at slot {} to plugin {}", + slot, + plugin.name() + ); + } + } + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-update-slot-us", + measure.as_us() as usize, + 1000, + 1000 + ); + } + } +} diff --git a/accountsdb-plugin-manager/src/slot_status_observer.rs b/accountsdb-plugin-manager/src/slot_status_observer.rs index 9d3b36879f..bad8fa90ec 100644 --- a/accountsdb-plugin-manager/src/slot_status_observer.rs +++ b/accountsdb-plugin-manager/src/slot_status_observer.rs @@ -1,7 +1,7 @@ use { + crate::slot_status_notifier::SlotStatusNotifier, crossbeam_channel::Receiver, solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, - solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, std::{ sync::{ atomic::{AtomicBool, Ordering}, @@ -20,7 +20,7 @@ pub(crate) struct SlotStatusObserver { impl SlotStatusObserver { pub fn new( bank_notification_receiver: Receiver, - accounts_update_notifier: AccountsUpdateNotifier, + slot_status_notifier: SlotStatusNotifier, ) -> Self { let exit_updated_slot_server = Arc::new(AtomicBool::new(false)); @@ -28,7 +28,7 @@ impl SlotStatusObserver { bank_notification_receiver_service: Some(Self::run_bank_notification_receiver( bank_notification_receiver, exit_updated_slot_server.clone(), - accounts_update_notifier, + slot_status_notifier, )), exit_updated_slot_server, } @@ -45,7 +45,7 @@ impl SlotStatusObserver { fn run_bank_notification_receiver( bank_notification_receiver: Receiver, exit: Arc, - accounts_update_notifier: AccountsUpdateNotifier, + slot_status_notifier: SlotStatusNotifier, ) -> JoinHandle<()> { Builder::new() .name("bank_notification_receiver".to_string()) @@ -54,19 +54,19 @@ impl SlotStatusObserver { if let Ok(slot) = bank_notification_receiver.recv() { match slot { BankNotification::OptimisticallyConfirmed(slot) => { - accounts_update_notifier + slot_status_notifier .read() .unwrap() .notify_slot_confirmed(slot, None); } BankNotification::Frozen(bank) => { - accounts_update_notifier + slot_status_notifier .read() .unwrap() .notify_slot_processed(bank.slot(), Some(bank.parent_slot())); } BankNotification::Root(bank) => { - accounts_update_notifier + slot_status_notifier .read() .unwrap() .notify_slot_rooted(bank.slot(), Some(bank.parent_slot())); diff --git a/accountsdb-plugin-postgres/src/accounts_selector.rs b/accountsdb-plugin-postgres/src/accounts_selector.rs index 91c669f70a..77398b76c1 100644 --- a/accountsdb-plugin-postgres/src/accounts_selector.rs +++ b/accountsdb-plugin-postgres/src/accounts_selector.rs @@ -48,6 +48,11 @@ impl AccountsSelector { pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool { self.select_all_accounts || self.accounts.contains(account) || self.owners.contains(owner) } + + /// Check if any account is of interested at all + pub fn is_enabled(&self) -> bool { + self.select_all_accounts || !self.accounts.is_empty() || !self.owners.is_empty() + } } #[cfg(test)] diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs index 6d9a4248cc..165f971e67 100644 --- a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -276,6 +276,15 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { } Ok(()) } + + /// Check if the plugin is interested in account data + /// Default is true -- if the plugin is not interested in + /// account data, please return false. + fn to_notify_account_data(&self) -> bool { + self.accounts_selector + .as_ref() + .map_or_else(|| false, |selector| selector.is_enabled()) + } } impl AccountsDbPluginPostgres { diff --git a/core/src/validator.rs b/core/src/validator.rs index 064c4ec428..5c22280eaf 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -414,6 +414,17 @@ impl Validator { let accounts_package_channel = channel(); + let accounts_update_notifier = + accountsdb_plugin_service + .as_ref() + .and_then(|accountsdb_plugin_service| { + accountsdb_plugin_service.get_accounts_update_notifier() + }); + info!( + "AccountsDb plugin: accounts_update_notifier: {}", + accounts_update_notifier.is_some() + ); + let ( genesis_config, bank_forks, @@ -444,9 +455,7 @@ impl Validator { &start_progress, config.no_poh_speed_test, accounts_package_channel.0.clone(), - accountsdb_plugin_service - .as_ref() - .map(|plugin_service| plugin_service.get_accounts_update_notifier()), + accounts_update_notifier, ); *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices; diff --git a/runtime/src/accounts_db/accountsdb_plugin_utils.rs b/runtime/src/accounts_db/accountsdb_plugin_utils.rs index f57894ffdb..6a37684574 100644 --- a/runtime/src/accounts_db/accountsdb_plugin_utils.rs +++ b/runtime/src/accounts_db/accountsdb_plugin_utils.rs @@ -202,15 +202,6 @@ pub mod tests { .push((slot, account.clone_account())); } - /// Notified when a slot is optimistically confirmed - fn notify_slot_confirmed(&self, _slot: Slot, _parent: Option) {} - - /// Notified when a slot is marked frozen. - fn notify_slot_processed(&self, _slot: Slot, _parent: Option) {} - - /// Notified when a slot is rooted. - fn notify_slot_rooted(&self, _slot: Slot, _parent: Option) {} - fn notify_end_of_restore_from_snapshot(&self) { self.is_startup_done.store(true, Ordering::Relaxed); } diff --git a/runtime/src/accounts_update_notifier_interface.rs b/runtime/src/accounts_update_notifier_interface.rs index b42c6d5c22..18ccd58f43 100644 --- a/runtime/src/accounts_update_notifier_interface.rs +++ b/runtime/src/accounts_update_notifier_interface.rs @@ -14,15 +14,6 @@ pub trait AccountsUpdateNotifierInterface: std::fmt::Debug { /// Notified when all accounts have been notified when restoring from a snapshot. fn notify_end_of_restore_from_snapshot(&self); - - /// Notified when a slot is optimistically confirmed - fn notify_slot_confirmed(&self, slot: Slot, parent: Option); - - /// Notified when a slot is marked frozen. - fn notify_slot_processed(&self, slot: Slot, parent: Option); - - /// Notified when a slot is rooted. - fn notify_slot_rooted(&self, slot: Slot, parent: Option); } pub type AccountsUpdateNotifier = Arc>;