From 89c45a57f8b0a7d337e764ff19e97460002f769a Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Wed, 17 Nov 2021 17:11:38 -0800 Subject: [PATCH] Refactor slot status notification to decouple from accounts notifications (#21308) Problem Slot status can be used of in other scenarios in addition to account information such as transactions, blocks. The current implementation is too tightly coupled. Summary of Changes Decouple the slot status notification from accounts notification. Created a new slot status notification module. --- .../src/accountsdb_plugin_interface.rs | 21 ++++- .../src/accounts_update_notifier.rs | 49 +---------- .../src/accountsdb_plugin_manager.rs | 10 +++ .../src/accountsdb_plugin_service.rs | 38 ++++++--- accountsdb-plugin-manager/src/lib.rs | 1 + .../src/slot_status_notifier.rs | 81 +++++++++++++++++++ .../src/slot_status_observer.rs | 14 ++-- .../src/accounts_selector.rs | 5 ++ .../src/accountsdb_plugin_postgres.rs | 9 +++ core/src/validator.rs | 15 +++- .../accounts_db/accountsdb_plugin_utils.rs | 9 --- .../src/accounts_update_notifier_interface.rs | 9 --- 12 files changed, 171 insertions(+), 90 deletions(-) create mode 100644 accountsdb-plugin-manager/src/slot_status_notifier.rs diff --git a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs index 7b0163e784..11948e4322 100644 --- a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs +++ b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs @@ -79,21 +79,36 @@ pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug { fn on_unload(&mut self) {} /// Called when an account is updated at a slot. + #[allow(unused_variables)] fn update_account( &mut self, account: ReplicaAccountInfoVersions, slot: u64, is_startup: bool, - ) -> Result<()>; + ) -> Result<()> { + Ok(()) + } /// Called when all accounts are notified of during startup. - fn notify_end_of_startup(&mut self) -> Result<()>; + fn notify_end_of_startup(&mut self) -> Result<()> { + Ok(()) + } /// Called when a slot status is updated + #[allow(unused_variables)] fn update_slot_status( &mut self, slot: u64, parent: Option, status: SlotStatus, - ) -> Result<()>; + ) -> Result<()> { + Ok(()) + } + + /// Check if the plugin is interested in account data + /// Default is true -- if the plugin is not interested in + /// account data, please return false. + fn to_notify_account_data(&self) -> bool { + true + } } diff --git a/accountsdb-plugin-manager/src/accounts_update_notifier.rs b/accountsdb-plugin-manager/src/accounts_update_notifier.rs index d3a0d85143..c6eef85188 100644 --- a/accountsdb-plugin-manager/src/accounts_update_notifier.rs +++ b/accountsdb-plugin-manager/src/accounts_update_notifier.rs @@ -3,7 +3,7 @@ use { crate::accountsdb_plugin_manager::AccountsDbPluginManager, log::*, solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ - ReplicaAccountInfo, ReplicaAccountInfoVersions, SlotStatus, + ReplicaAccountInfo, ReplicaAccountInfoVersions, }, solana_measure::measure::Measure, solana_metrics::*, @@ -86,18 +86,6 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { ); } } - - fn notify_slot_confirmed(&self, slot: Slot, parent: Option) { - self.notify_slot_status(slot, parent, SlotStatus::Confirmed); - } - - fn notify_slot_processed(&self, slot: Slot, parent: Option) { - self.notify_slot_status(slot, parent, SlotStatus::Processed); - } - - fn notify_slot_rooted(&self, slot: Slot, parent: Option) { - self.notify_slot_status(slot, parent, SlotStatus::Rooted); - } } impl AccountsUpdateNotifierImpl { @@ -189,39 +177,4 @@ impl AccountsUpdateNotifierImpl { 100000 ); } - - pub fn notify_slot_status(&self, slot: Slot, parent: Option, slot_status: SlotStatus) { - let mut plugin_manager = self.plugin_manager.write().unwrap(); - if plugin_manager.plugins.is_empty() { - return; - } - - for plugin in plugin_manager.plugins.iter_mut() { - let mut measure = Measure::start("accountsdb-plugin-update-slot"); - match plugin.update_slot_status(slot, parent, slot_status.clone()) { - Err(err) => { - error!( - "Failed to update slot status at slot {}, error: {} to plugin {}", - slot, - err, - plugin.name() - ) - } - Ok(_) => { - trace!( - "Successfully updated slot status at slot {} to plugin {}", - slot, - plugin.name() - ); - } - } - measure.stop(); - inc_new_counter_debug!( - "accountsdb-plugin-update-slot-us", - measure.as_us() as usize, - 1000, - 1000 - ); - } - } } diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs index a6074362f8..d628814f23 100644 --- a/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs @@ -52,4 +52,14 @@ impl AccountsDbPluginManager { drop(lib); } } + + /// Check if there is any plugin interested in account data + pub fn to_notify_account_data(&self) -> bool { + for plugin in &self.plugins { + if plugin.to_notify_account_data() { + return true; + } + } + false + } } diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs index 0af928838b..2fced0442e 100644 --- a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs @@ -2,7 +2,7 @@ use { crate::{ accounts_update_notifier::AccountsUpdateNotifierImpl, accountsdb_plugin_manager::AccountsDbPluginManager, - slot_status_observer::SlotStatusObserver, + slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver, }, crossbeam_channel::Receiver, log::*, @@ -42,9 +42,9 @@ pub enum AccountsdbPluginServiceError { /// The service managing the AccountsDb plugin workflow. pub struct AccountsDbPluginService { - slot_status_observer: SlotStatusObserver, + slot_status_observer: Option, plugin_manager: Arc>, - accounts_update_notifier: AccountsUpdateNotifier, + accounts_update_notifier: Option, } impl AccountsDbPluginService { @@ -74,13 +74,27 @@ impl AccountsDbPluginService { for accountsdb_plugin_config_file in accountsdb_plugin_config_files { Self::load_plugin(&mut plugin_manager, accountsdb_plugin_config_file)?; } + let to_notify_account_data = plugin_manager.to_notify_account_data(); let plugin_manager = Arc::new(RwLock::new(plugin_manager)); - let accounts_update_notifier = Arc::new(RwLock::new(AccountsUpdateNotifierImpl::new( - plugin_manager.clone(), - ))); - let slot_status_observer = - SlotStatusObserver::new(confirmed_bank_receiver, accounts_update_notifier.clone()); + + let accounts_update_notifier: Option = if to_notify_account_data { + let accounts_update_notifier = AccountsUpdateNotifierImpl::new(plugin_manager.clone()); + Some(Arc::new(RwLock::new(accounts_update_notifier))) + } else { + None + }; + + let slot_status_observer = if to_notify_account_data { + let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone()); + let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier)); + Some(SlotStatusObserver::new( + confirmed_bank_receiver, + slot_status_notifier, + )) + } else { + None + }; info!("Started AccountsDbPluginService"); Ok(AccountsDbPluginService { @@ -145,12 +159,14 @@ impl AccountsDbPluginService { Ok(()) } - pub fn get_accounts_update_notifier(&self) -> AccountsUpdateNotifier { + pub fn get_accounts_update_notifier(&self) -> Option { self.accounts_update_notifier.clone() } - pub fn join(mut self) -> thread::Result<()> { - self.slot_status_observer.join()?; + pub fn join(self) -> thread::Result<()> { + if let Some(mut slot_status_observer) = self.slot_status_observer { + slot_status_observer.join()?; + } self.plugin_manager.write().unwrap().unload(); Ok(()) } diff --git a/accountsdb-plugin-manager/src/lib.rs b/accountsdb-plugin-manager/src/lib.rs index d2b38b57b3..89c305a29a 100644 --- a/accountsdb-plugin-manager/src/lib.rs +++ b/accountsdb-plugin-manager/src/lib.rs @@ -1,4 +1,5 @@ pub mod accounts_update_notifier; pub mod accountsdb_plugin_manager; pub mod accountsdb_plugin_service; +pub mod slot_status_notifier; pub mod slot_status_observer; diff --git a/accountsdb-plugin-manager/src/slot_status_notifier.rs b/accountsdb-plugin-manager/src/slot_status_notifier.rs new file mode 100644 index 0000000000..f1cc298dd7 --- /dev/null +++ b/accountsdb-plugin-manager/src/slot_status_notifier.rs @@ -0,0 +1,81 @@ +use { + crate::accountsdb_plugin_manager::AccountsDbPluginManager, + log::*, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::SlotStatus, + solana_measure::measure::Measure, + solana_metrics::*, + solana_sdk::clock::Slot, + std::sync::{Arc, RwLock}, +}; + +pub trait SlotStatusNotifierInterface { + /// Notified when a slot is optimistically confirmed + fn notify_slot_confirmed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is marked frozen. + fn notify_slot_processed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is rooted. + fn notify_slot_rooted(&self, slot: Slot, parent: Option); +} + +pub type SlotStatusNotifier = Arc>; + +pub struct SlotStatusNotifierImpl { + plugin_manager: Arc>, +} + +impl SlotStatusNotifierInterface for SlotStatusNotifierImpl { + fn notify_slot_confirmed(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Confirmed); + } + + fn notify_slot_processed(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Processed); + } + + fn notify_slot_rooted(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Rooted); + } +} + +impl SlotStatusNotifierImpl { + pub fn new(plugin_manager: Arc>) -> Self { + Self { plugin_manager } + } + + pub fn notify_slot_status(&self, slot: Slot, parent: Option, slot_status: SlotStatus) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + if plugin_manager.plugins.is_empty() { + return; + } + + for plugin in plugin_manager.plugins.iter_mut() { + let mut measure = Measure::start("accountsdb-plugin-update-slot"); + match plugin.update_slot_status(slot, parent, slot_status.clone()) { + Err(err) => { + error!( + "Failed to update slot status at slot {}, error: {} to plugin {}", + slot, + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully updated slot status at slot {} to plugin {}", + slot, + plugin.name() + ); + } + } + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-update-slot-us", + measure.as_us() as usize, + 1000, + 1000 + ); + } + } +} diff --git a/accountsdb-plugin-manager/src/slot_status_observer.rs b/accountsdb-plugin-manager/src/slot_status_observer.rs index 9d3b36879f..bad8fa90ec 100644 --- a/accountsdb-plugin-manager/src/slot_status_observer.rs +++ b/accountsdb-plugin-manager/src/slot_status_observer.rs @@ -1,7 +1,7 @@ use { + crate::slot_status_notifier::SlotStatusNotifier, crossbeam_channel::Receiver, solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, - solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, std::{ sync::{ atomic::{AtomicBool, Ordering}, @@ -20,7 +20,7 @@ pub(crate) struct SlotStatusObserver { impl SlotStatusObserver { pub fn new( bank_notification_receiver: Receiver, - accounts_update_notifier: AccountsUpdateNotifier, + slot_status_notifier: SlotStatusNotifier, ) -> Self { let exit_updated_slot_server = Arc::new(AtomicBool::new(false)); @@ -28,7 +28,7 @@ impl SlotStatusObserver { bank_notification_receiver_service: Some(Self::run_bank_notification_receiver( bank_notification_receiver, exit_updated_slot_server.clone(), - accounts_update_notifier, + slot_status_notifier, )), exit_updated_slot_server, } @@ -45,7 +45,7 @@ impl SlotStatusObserver { fn run_bank_notification_receiver( bank_notification_receiver: Receiver, exit: Arc, - accounts_update_notifier: AccountsUpdateNotifier, + slot_status_notifier: SlotStatusNotifier, ) -> JoinHandle<()> { Builder::new() .name("bank_notification_receiver".to_string()) @@ -54,19 +54,19 @@ impl SlotStatusObserver { if let Ok(slot) = bank_notification_receiver.recv() { match slot { BankNotification::OptimisticallyConfirmed(slot) => { - accounts_update_notifier + slot_status_notifier .read() .unwrap() .notify_slot_confirmed(slot, None); } BankNotification::Frozen(bank) => { - accounts_update_notifier + slot_status_notifier .read() .unwrap() .notify_slot_processed(bank.slot(), Some(bank.parent_slot())); } BankNotification::Root(bank) => { - accounts_update_notifier + slot_status_notifier .read() .unwrap() .notify_slot_rooted(bank.slot(), Some(bank.parent_slot())); diff --git a/accountsdb-plugin-postgres/src/accounts_selector.rs b/accountsdb-plugin-postgres/src/accounts_selector.rs index 91c669f70a..77398b76c1 100644 --- a/accountsdb-plugin-postgres/src/accounts_selector.rs +++ b/accountsdb-plugin-postgres/src/accounts_selector.rs @@ -48,6 +48,11 @@ impl AccountsSelector { pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool { self.select_all_accounts || self.accounts.contains(account) || self.owners.contains(owner) } + + /// Check if any account is of interested at all + pub fn is_enabled(&self) -> bool { + self.select_all_accounts || !self.accounts.is_empty() || !self.owners.is_empty() + } } #[cfg(test)] diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs index 6d9a4248cc..165f971e67 100644 --- a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -276,6 +276,15 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { } Ok(()) } + + /// Check if the plugin is interested in account data + /// Default is true -- if the plugin is not interested in + /// account data, please return false. + fn to_notify_account_data(&self) -> bool { + self.accounts_selector + .as_ref() + .map_or_else(|| false, |selector| selector.is_enabled()) + } } impl AccountsDbPluginPostgres { diff --git a/core/src/validator.rs b/core/src/validator.rs index 064c4ec428..5c22280eaf 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -414,6 +414,17 @@ impl Validator { let accounts_package_channel = channel(); + let accounts_update_notifier = + accountsdb_plugin_service + .as_ref() + .and_then(|accountsdb_plugin_service| { + accountsdb_plugin_service.get_accounts_update_notifier() + }); + info!( + "AccountsDb plugin: accounts_update_notifier: {}", + accounts_update_notifier.is_some() + ); + let ( genesis_config, bank_forks, @@ -444,9 +455,7 @@ impl Validator { &start_progress, config.no_poh_speed_test, accounts_package_channel.0.clone(), - accountsdb_plugin_service - .as_ref() - .map(|plugin_service| plugin_service.get_accounts_update_notifier()), + accounts_update_notifier, ); *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices; diff --git a/runtime/src/accounts_db/accountsdb_plugin_utils.rs b/runtime/src/accounts_db/accountsdb_plugin_utils.rs index f57894ffdb..6a37684574 100644 --- a/runtime/src/accounts_db/accountsdb_plugin_utils.rs +++ b/runtime/src/accounts_db/accountsdb_plugin_utils.rs @@ -202,15 +202,6 @@ pub mod tests { .push((slot, account.clone_account())); } - /// Notified when a slot is optimistically confirmed - fn notify_slot_confirmed(&self, _slot: Slot, _parent: Option) {} - - /// Notified when a slot is marked frozen. - fn notify_slot_processed(&self, _slot: Slot, _parent: Option) {} - - /// Notified when a slot is rooted. - fn notify_slot_rooted(&self, _slot: Slot, _parent: Option) {} - fn notify_end_of_restore_from_snapshot(&self) { self.is_startup_done.store(true, Ordering::Relaxed); } diff --git a/runtime/src/accounts_update_notifier_interface.rs b/runtime/src/accounts_update_notifier_interface.rs index b42c6d5c22..18ccd58f43 100644 --- a/runtime/src/accounts_update_notifier_interface.rs +++ b/runtime/src/accounts_update_notifier_interface.rs @@ -14,15 +14,6 @@ pub trait AccountsUpdateNotifierInterface: std::fmt::Debug { /// Notified when all accounts have been notified when restoring from a snapshot. fn notify_end_of_restore_from_snapshot(&self); - - /// Notified when a slot is optimistically confirmed - fn notify_slot_confirmed(&self, slot: Slot, parent: Option); - - /// Notified when a slot is marked frozen. - fn notify_slot_processed(&self, slot: Slot, parent: Option); - - /// Notified when a slot is rooted. - fn notify_slot_rooted(&self, slot: Slot, parent: Option); } pub type AccountsUpdateNotifier = Arc>;