diff --git a/Cargo.lock b/Cargo.lock index 999361c9c..ad2a321f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -946,6 +946,16 @@ dependencies = [ "subtle", ] +[[package]] +name = "crypto-mac" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6" +dependencies = [ + "generic-array 0.14.4", + "subtle", +] + [[package]] name = "crypto-mac" version = "0.11.0" @@ -1336,6 +1346,12 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fast-math" version = "0.1.1" @@ -1801,6 +1817,16 @@ dependencies = [ "digest 0.9.0", ] +[[package]] +name = "hmac" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15" +dependencies = [ + "crypto-mac 0.10.0", + "digest 0.9.0", +] + [[package]] name = "hmac" version = "0.11.0" @@ -2406,6 +2432,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" +[[package]] +name = "md-5" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" +dependencies = [ + "block-buffer 0.9.0", + "digest 0.9.0", + "opaque-debug 0.3.0", +] + [[package]] name = "memchr" version = "2.4.0" @@ -2909,6 +2946,24 @@ dependencies = [ "indexmap", ] +[[package]] +name = "phf" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7" +dependencies = [ + "siphasher", +] + [[package]] name = "pickledb" version = "0.4.1" @@ -2986,6 +3041,50 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" +[[package]] +name = "postgres" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7871ee579860d8183f542e387b176a25f2656b9fb5211e045397f745a68d1c2" +dependencies = [ + "bytes 1.0.1", + "fallible-iterator", + "futures 0.3.17", + "log 0.4.14", + "tokio", + "tokio-postgres", +] + +[[package]] +name = "postgres-protocol" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes 1.0.1", + "fallible-iterator", + "hmac 0.10.1", + "md-5", + "memchr", + "rand 0.8.3", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f" +dependencies = [ + "bytes 1.0.1", + "chrono", + "fallible-iterator", + "postgres-protocol", +] + [[package]] name = "ppv-lite86" version = "0.2.8" @@ -4015,6 +4114,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a30f10c911c0355f80f1c2faa8096efc4a58cdf8590b954d5b395efa071c711" +[[package]] +name = "siphasher" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b" + [[package]] name = "slab" version = "0.4.2" @@ -4138,6 +4243,51 @@ dependencies = [ "spl-token", ] +[[package]] +name = "solana-accountsdb-plugin-interface" +version = "1.8.0" +dependencies = [ + "log 0.4.14", + "thiserror", +] + +[[package]] +name = "solana-accountsdb-plugin-manager" +version = "1.8.0" +dependencies = [ + "bs58 0.4.0", + "crossbeam-channel", + "libloading", + "log 0.4.14", + "serde", + "serde_derive", + "serde_json", + "solana-accountsdb-plugin-interface", + "solana-logger 1.8.0", + "solana-metrics", + "solana-rpc", + "solana-runtime", + "solana-sdk", + "thiserror", +] + +[[package]] +name = "solana-accountsdb-plugin-postgres" +version = "1.8.0" +dependencies = [ + "bs58 0.4.0", + "chrono", + "libloading", + "log 0.4.14", + "postgres", + "serde", + "serde_derive", + "serde_json", + "solana-accountsdb-plugin-interface", + "solana-logger 1.8.0", + "thiserror", +] + [[package]] name = "solana-banking-bench" version = "1.8.0" @@ -4473,6 +4623,7 @@ dependencies = [ "serde_json", "serial_test", "solana-account-decoder", + "solana-accountsdb-plugin-manager", "solana-client", "solana-config-program", "solana-entry", @@ -5249,6 +5400,7 @@ dependencies = [ "futures-util", "log 0.4.14", "prost", + "solana-rpc", "solana-runtime", "solana-sdk", "tokio", @@ -5891,6 +6043,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "stringprep" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "strsim" version = "0.8.0" @@ -6310,6 +6472,29 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-postgres" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2b1383c7e4fb9a09e292c7c6afb7da54418d53b045f1c1fac7a911411a2b8b" +dependencies = [ + "async-trait", + "byteorder", + "bytes 1.0.1", + "fallible-iterator", + "futures 0.3.17", + "log 0.4.14", + "parking_lot 0.11.2", + "percent-encoding 2.1.0", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "socket2", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-reactor" version = "0.1.12" diff --git a/Cargo.toml b/Cargo.toml index 434e97e89..75e763d7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,8 @@ [workspace] members = [ + "accountsdb-plugin-interface", + "accountsdb-plugin-manager", + "accountsdb-plugin-postgres", "accounts-cluster-bench", "bench-streamer", "bench-tps", diff --git a/accountsdb-plugin-interface/Cargo.toml b/accountsdb-plugin-interface/Cargo.toml new file mode 100644 index 000000000..c2b361863 --- /dev/null +++ b/accountsdb-plugin-interface/Cargo.toml @@ -0,0 +1,17 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-accountsdb-plugin-interface" +description = "The Solana AccountsDb plugin interface." +version = "1.8.0" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[dependencies] +log = "0.4.11" +thiserror = "1.0.29" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-interface/README.md b/accountsdb-plugin-interface/README.md new file mode 100644 index 000000000..6646b9908 --- /dev/null +++ b/accountsdb-plugin-interface/README.md @@ -0,0 +1,20 @@ +

+ + Solana + +

+ +# Solana AccountsDb Plugin Interface + +This crate enables an AccountsDb plugin to be plugged into the Solana Validator runtime to take actions +at the time of each account update; for example, saving the account state to an external database. The plugin must implement the `AccountsDbPlugin` trait. Please see the detail of the `accountsdb_plugin_interface.rs` for the interface definition. + +The plugin should produce a `cdylib` dynamic library, which must expose a `C` function `_create_plugin()` that +instantiates the implementation of the interface. + +The `solana-accountsdb-plugin-postgres` crate provides an example of how to create a plugin which saves the accounts data into an +external PostgreSQL databases. + +More information about Solana is available in the [Solana documentation](https://docs.solana.com/). + +Still have questions? Ask us on [Discord](https://discordapp.com/invite/pquxPsq) diff --git a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs new file mode 100644 index 000000000..b75914bf3 --- /dev/null +++ b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs @@ -0,0 +1,90 @@ +/// The interface for AccountsDb plugins. A plugin must implement +/// the AccountsDbPlugin trait to work with the runtime. +/// In addition, the dynamic library must export a "C" function _create_plugin which +/// creates the implementation of the plugin. +use { + std::{any::Any, error, io}, + thiserror::Error, +}; + +impl Eq for ReplicaAccountInfo<'_> {} + +#[derive(Clone, PartialEq, Debug)] +pub struct ReplicaAccountInfo<'a> { + pub pubkey: &'a [u8], + pub lamports: u64, + pub owner: &'a [u8], + pub executable: bool, + pub rent_epoch: u64, + pub data: &'a [u8], +} + +pub enum ReplicaAccountInfoVersions<'a> { + V0_0_1(&'a ReplicaAccountInfo<'a>), +} + +#[derive(Error, Debug)] +pub enum AccountsDbPluginError { + #[error("Error opening config file.")] + ConfigFileOpenError(#[from] io::Error), + + #[error("Error reading config file.")] + ConfigFileReadError { msg: String }, + + #[error("Error updating account.")] + AccountsUpdateError { msg: String }, + + #[error("Error updating slot status.")] + SlotStatusUpdateError { msg: String }, + + #[error("Plugin-defined custom error.")] + Custom(Box), +} + +#[derive(Debug, Clone)] +pub enum SlotStatus { + Processed, + Rooted, + Confirmed, +} + +impl SlotStatus { + pub fn as_str(&self) -> &'static str { + match self { + SlotStatus::Confirmed => "confirmed", + SlotStatus::Processed => "processed", + SlotStatus::Rooted => "rooted", + } + } +} + +pub type Result = std::result::Result; + +pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug { + fn name(&self) -> &'static str; + + /// The callback called when a plugin is loaded by the system, + /// used for doing whatever initialization is required by the plugin. + /// The _config_file contains the name of the + /// of the config file. The config must be in JSON format and + /// include a field "libpath" indicating the full path + /// name of the shared library implementing this interface. + fn on_load(&mut self, _config_file: &str) -> Result<()> { + Ok(()) + } + + /// The callback called right before a plugin is unloaded by the system + /// Used for doing cleanup before unload. + fn on_unload(&mut self) {} + + /// Called when an account is updated at a slot. + fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()>; + + /// Called when a slot status is updated + fn update_slot_status( + &mut self, + slot: u64, + parent: Option, + status: SlotStatus, + ) -> Result<()>; +} diff --git a/accountsdb-plugin-interface/src/lib.rs b/accountsdb-plugin-interface/src/lib.rs new file mode 100644 index 000000000..0c3e43c20 --- /dev/null +++ b/accountsdb-plugin-interface/src/lib.rs @@ -0,0 +1 @@ +pub mod accountsdb_plugin_interface; diff --git a/accountsdb-plugin-manager/Cargo.toml b/accountsdb-plugin-manager/Cargo.toml new file mode 100644 index 000000000..65e741532 --- /dev/null +++ b/accountsdb-plugin-manager/Cargo.toml @@ -0,0 +1,29 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-accountsdb-plugin-manager" +description = "The Solana AccountsDb plugin manager." +version = "1.8.0" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[dependencies] +bs58 = "0.4.0" +crossbeam-channel = "0.5" +libloading = "0.7.0" +log = "0.4.11" +serde = "1.0.130" +serde_derive = "1.0.103" +serde_json = "1.0.67" +solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.8.0" } +solana-logger = { path = "../logger", version = "=1.8.0" } +solana-metrics = { path = "../metrics", version = "=1.8.0" } +solana-rpc = { path = "../rpc", version = "=1.8.0" } +solana-runtime = { path = "../runtime", version = "=1.8.0" } +solana-sdk = { path = "../sdk", version = "=1.8.0" } +thiserror = "1.0.29" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-manager/src/accounts_update_notifier.rs b/accountsdb-plugin-manager/src/accounts_update_notifier.rs new file mode 100644 index 000000000..1896dca01 --- /dev/null +++ b/accountsdb-plugin-manager/src/accounts_update_notifier.rs @@ -0,0 +1,129 @@ +/// Module responsible for notifying plugins of account updates +use { + crate::accountsdb_plugin_manager::AccountsDbPluginManager, + log::*, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + ReplicaAccountInfo, ReplicaAccountInfoVersions, SlotStatus, + }, + solana_runtime::{ + accounts_update_notifier_interface::AccountsUpdateNotifierInterface, + append_vec::StoredAccountMeta, + }, + solana_sdk::{ + account::{AccountSharedData, ReadableAccount}, + clock::Slot, + pubkey::Pubkey, + }, + std::sync::{Arc, RwLock}, +}; +#[derive(Debug)] +pub(crate) struct AccountsUpdateNotifierImpl { + plugin_manager: Arc>, +} + +impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { + fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) { + if let Some(account_info) = self.accountinfo_from_shared_account_data(pubkey, account) { + self.notify_plugins_of_account_update(account_info, slot); + } + } + + fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) { + if let Some(account_info) = self.accountinfo_from_stored_account_meta(account) { + self.notify_plugins_of_account_update(account_info, slot); + } + } + + fn notify_slot_confirmed(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Confirmed); + } + + fn notify_slot_processed(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Processed); + } + + fn notify_slot_rooted(&self, slot: Slot, parent: Option) { + self.notify_slot_status(slot, parent, SlotStatus::Rooted); + } +} + +impl AccountsUpdateNotifierImpl { + pub fn new(plugin_manager: Arc>) -> Self { + AccountsUpdateNotifierImpl { plugin_manager } + } + + fn accountinfo_from_shared_account_data<'a>( + &self, + pubkey: &'a Pubkey, + account: &'a AccountSharedData, + ) -> Option> { + Some(ReplicaAccountInfo { + pubkey: pubkey.as_ref(), + lamports: account.lamports(), + owner: account.owner().as_ref(), + executable: account.executable(), + rent_epoch: account.rent_epoch(), + data: account.data(), + }) + } + + fn accountinfo_from_stored_account_meta<'a>( + &self, + stored_account_meta: &'a StoredAccountMeta, + ) -> Option> { + Some(ReplicaAccountInfo { + pubkey: stored_account_meta.meta.pubkey.as_ref(), + lamports: stored_account_meta.account_meta.lamports, + owner: stored_account_meta.account_meta.owner.as_ref(), + executable: stored_account_meta.account_meta.executable, + rent_epoch: stored_account_meta.account_meta.rent_epoch, + data: stored_account_meta.data, + }) + } + + fn notify_plugins_of_account_update(&self, account: ReplicaAccountInfo, slot: Slot) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + + if plugin_manager.plugins.is_empty() { + return; + } + for plugin in plugin_manager.plugins.iter_mut() { + match plugin.update_account(ReplicaAccountInfoVersions::V0_0_1(&account), slot) { + Err(err) => { + error!( + "Failed to update account {:?} at slot {:?}, error: {:?}", + account.pubkey, slot, err + ) + } + Ok(_) => { + trace!( + "Successfully updated account {:?} at slot {:?}", + account.pubkey, + slot + ); + } + } + } + } + + pub fn notify_slot_status(&self, slot: Slot, parent: Option, slot_status: SlotStatus) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + if plugin_manager.plugins.is_empty() { + return; + } + + for plugin in plugin_manager.plugins.iter_mut() { + match plugin.update_slot_status(slot, parent, slot_status.clone()) { + Err(err) => { + error!( + "Failed to update slot status at slot {:?}, error: {:?}", + slot, err + ) + } + Ok(_) => { + trace!("Successfully updated slot status at slot {:?}", slot); + } + } + } + } +} diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs new file mode 100644 index 000000000..a6074362f --- /dev/null +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs @@ -0,0 +1,55 @@ +/// Managing the AccountsDb plugins +use { + libloading::{Library, Symbol}, + log::*, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::AccountsDbPlugin, + std::error::Error, +}; + +#[derive(Default, Debug)] +pub struct AccountsDbPluginManager { + pub plugins: Vec>, + libs: Vec, +} + +impl AccountsDbPluginManager { + pub fn new() -> Self { + AccountsDbPluginManager { + plugins: Vec::default(), + libs: Vec::default(), + } + } + + /// # Safety + /// + /// This function loads the dynamically linked library specified in the path. The library + /// must do necessary initializations. + pub unsafe fn load_plugin( + &mut self, + libpath: &str, + config_file: &str, + ) -> Result<(), Box> { + type PluginConstructor = unsafe fn() -> *mut dyn AccountsDbPlugin; + let lib = Library::new(libpath)?; + let constructor: Symbol = lib.get(b"_create_plugin")?; + let plugin_raw = constructor(); + let mut plugin = Box::from_raw(plugin_raw); + plugin.on_load(config_file)?; + self.plugins.push(plugin); + self.libs.push(lib); + Ok(()) + } + + /// Unload all plugins and loaded plugin libraries, making sure to fire + /// their `on_plugin_unload()` methods so they can do any necessary cleanup. + pub fn unload(&mut self) { + for mut plugin in self.plugins.drain(..) { + info!("Unloading plugin for {:?}", plugin.name()); + plugin.on_unload(); + } + + for lib in self.libs.drain(..) { + drop(lib); + } + } +} diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs new file mode 100644 index 000000000..ed1043309 --- /dev/null +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs @@ -0,0 +1,150 @@ +use { + crate::{ + accounts_update_notifier::AccountsUpdateNotifierImpl, + accountsdb_plugin_manager::AccountsDbPluginManager, + slot_status_observer::SlotStatusObserver, + }, + crossbeam_channel::Receiver, + log::*, + serde_json, + solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, + solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, + std::{ + fs::File, + io::Read, + path::Path, + sync::{Arc, RwLock}, + thread, + }, + thiserror::Error, +}; + +#[derive(Error, Debug)] +pub enum AccountsdbPluginServiceError { + #[error("Cannot open the the plugin config file")] + CannotOpenConfigFile(String), + + #[error("Cannot read the the plugin config file")] + CannotReadConfigFile(String), + + #[error("The config file is not in a valid Json format")] + InvalidConfigFileFormat(String), + + #[error("Plugin library path is not specified in the config file")] + LibPathNotSet, + + #[error("Invalid plugin path")] + InvalidPluginPath, + + #[error("Cannot load plugin shared library")] + PluginLoadError(String), +} + +/// The service managing the AccountsDb plugin workflow. +pub struct AccountsDbPluginService { + slot_status_observer: SlotStatusObserver, + plugin_manager: Arc>, + accounts_update_notifier: AccountsUpdateNotifier, +} + +impl AccountsDbPluginService { + /// Creates and returns the AccountsDbPluginService. + /// # Arguments + /// * `confirmed_bank_receiver` - The receiver for confirmed bank notification + /// * `accountsdb_plugin_config_file` - The config file path for the plugin. The + /// config file controls the plugin responsible + /// for transporting the data to external data stores. It is defined in JSON format. + /// The `libpath` field should be pointed to the full path of the dynamic shared library + /// (.so file) to be loaded. The shared library must implement the `AccountsDbPlugin` + /// trait. And the shared library shall export a `C` function `_create_plugin` which + /// shall create the implementation of `AccountsDbPlugin` and returns to the caller. + /// The rest of the JSON fields' definition is up to to the concrete plugin implementation + /// It is usually used to configure the connection information for the external data store. + + pub fn new( + confirmed_bank_receiver: Receiver, + accountsdb_plugin_config_file: &Path, + ) -> Result { + info!( + "Starting AccountsDbPluginService from config file: {:?}", + accountsdb_plugin_config_file + ); + let plugin_manager = AccountsDbPluginManager::new(); + let plugin_manager = Arc::new(RwLock::new(plugin_manager)); + + let mut file = match File::open(accountsdb_plugin_config_file) { + Ok(file) => file, + Err(err) => { + return Err(AccountsdbPluginServiceError::CannotOpenConfigFile(format!( + "Failed to open the plugin config file {:?}, error: {:?}", + accountsdb_plugin_config_file, err + ))); + } + }; + + let mut contents = String::new(); + if let Err(err) = file.read_to_string(&mut contents) { + return Err(AccountsdbPluginServiceError::CannotReadConfigFile(format!( + "Failed to read the plugin config file {:?}, error: {:?}", + accountsdb_plugin_config_file, err + ))); + } + + let result: serde_json::Value = match serde_json::from_str(&contents) { + Ok(value) => value, + Err(err) => { + return Err(AccountsdbPluginServiceError::InvalidConfigFileFormat( + format!( + "The config file {:?} is not in a valid Json format, error: {:?}", + accountsdb_plugin_config_file, err + ), + )); + } + }; + + let accounts_update_notifier = Arc::new(RwLock::new(AccountsUpdateNotifierImpl::new( + plugin_manager.clone(), + ))); + let slot_status_observer = + SlotStatusObserver::new(confirmed_bank_receiver, accounts_update_notifier.clone()); + + let libpath = result["libpath"] + .as_str() + .ok_or(AccountsdbPluginServiceError::LibPathNotSet)?; + let config_file = accountsdb_plugin_config_file + .as_os_str() + .to_str() + .ok_or(AccountsdbPluginServiceError::InvalidPluginPath)?; + + unsafe { + let result = plugin_manager + .write() + .unwrap() + .load_plugin(libpath, config_file); + if let Err(err) = result { + let msg = format!( + "Failed to load the plugin library: {:?}, error: {:?}", + libpath, err + ); + return Err(AccountsdbPluginServiceError::PluginLoadError(msg)); + } + } + + info!("Started AccountsDbPluginService"); + Ok(AccountsDbPluginService { + slot_status_observer, + plugin_manager, + accounts_update_notifier, + }) + } + + pub fn get_accounts_update_notifier(&self) -> AccountsUpdateNotifier { + self.accounts_update_notifier.clone() + } + + pub fn join(mut self) -> thread::Result<()> { + self.slot_status_observer.join()?; + self.plugin_manager.write().unwrap().unload(); + Ok(()) + } +} diff --git a/accountsdb-plugin-manager/src/lib.rs b/accountsdb-plugin-manager/src/lib.rs new file mode 100644 index 000000000..d2b38b57b --- /dev/null +++ b/accountsdb-plugin-manager/src/lib.rs @@ -0,0 +1,4 @@ +pub mod accounts_update_notifier; +pub mod accountsdb_plugin_manager; +pub mod accountsdb_plugin_service; +pub mod slot_status_observer; diff --git a/accountsdb-plugin-manager/src/slot_status_observer.rs b/accountsdb-plugin-manager/src/slot_status_observer.rs new file mode 100644 index 000000000..9d3b36879 --- /dev/null +++ b/accountsdb-plugin-manager/src/slot_status_observer.rs @@ -0,0 +1,80 @@ +use { + crossbeam_channel::Receiver, + solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, + solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + }, +}; + +#[derive(Debug)] +pub(crate) struct SlotStatusObserver { + bank_notification_receiver_service: Option>, + exit_updated_slot_server: Arc, +} + +impl SlotStatusObserver { + pub fn new( + bank_notification_receiver: Receiver, + accounts_update_notifier: AccountsUpdateNotifier, + ) -> Self { + let exit_updated_slot_server = Arc::new(AtomicBool::new(false)); + + Self { + bank_notification_receiver_service: Some(Self::run_bank_notification_receiver( + bank_notification_receiver, + exit_updated_slot_server.clone(), + accounts_update_notifier, + )), + exit_updated_slot_server, + } + } + + pub fn join(&mut self) -> thread::Result<()> { + self.exit_updated_slot_server.store(true, Ordering::Relaxed); + self.bank_notification_receiver_service + .take() + .map(JoinHandle::join) + .unwrap() + } + + fn run_bank_notification_receiver( + bank_notification_receiver: Receiver, + exit: Arc, + accounts_update_notifier: AccountsUpdateNotifier, + ) -> JoinHandle<()> { + Builder::new() + .name("bank_notification_receiver".to_string()) + .spawn(move || { + while !exit.load(Ordering::Relaxed) { + if let Ok(slot) = bank_notification_receiver.recv() { + match slot { + BankNotification::OptimisticallyConfirmed(slot) => { + accounts_update_notifier + .read() + .unwrap() + .notify_slot_confirmed(slot, None); + } + BankNotification::Frozen(bank) => { + accounts_update_notifier + .read() + .unwrap() + .notify_slot_processed(bank.slot(), Some(bank.parent_slot())); + } + BankNotification::Root(bank) => { + accounts_update_notifier + .read() + .unwrap() + .notify_slot_rooted(bank.slot(), Some(bank.parent_slot())); + } + } + } + } + }) + .unwrap() + } +} diff --git a/accountsdb-plugin-postgres/Cargo.toml b/accountsdb-plugin-postgres/Cargo.toml new file mode 100644 index 000000000..e750dae9f --- /dev/null +++ b/accountsdb-plugin-postgres/Cargo.toml @@ -0,0 +1,29 @@ +[package] +authors = ["Solana Maintainers "] +edition = "2018" +name = "solana-accountsdb-plugin-postgres" +description = "The Solana AccountsDb plugin for PostgreSQL database." +version = "1.8.0" +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +documentation = "https://docs.rs/solana-validator" + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +bs58 = "0.4.0" +chrono = { version = "0.4.11", features = ["serde"] } +libloading = "0.7.0" +log = "0.4.14" +postgres = { version = "0.19.1", features = ["with-chrono-0_4"] } +serde = "1.0.130" +serde_derive = "1.0.103" +serde_json = "1.0.67" +solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.8.0" } +solana-logger = { path = "../logger", version = "=1.8.0" } +thiserror = "1.0.29" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-postgres/scripts/create_schema.sql b/accountsdb-plugin-postgres/scripts/create_schema.sql new file mode 100644 index 000000000..41be52f5b --- /dev/null +++ b/accountsdb-plugin-postgres/scripts/create_schema.sql @@ -0,0 +1,52 @@ +/** + * This plugin implementation for PostgreSQL requires the following tables + */ +-- The table storing accounts + + +CREATE TABLE account ( + pubkey BYTEA PRIMARY KEY, + owner BYTEA, + lamports BIGINT NOT NULL, + slot BIGINT NOT NULL, + executable BOOL NOT NULL, + rent_epoch BIGINT NOT NULL, + data BYTEA, + updated_on TIMESTAMP NOT NULL +); + +-- The table storing slot information +CREATE TABLE slot ( + slot BIGINT PRIMARY KEY, + parent BIGINT, + status varchar(16) NOT NULL, + updated_on TIMESTAMP NOT NULL +); + +/** + * The following is for keeping historical data for accounts and is not required for plugin to work. + */ +-- The table storing historical data for accounts +CREATE TABLE account_audit ( + pubkey BYTEA, + owner BYTEA, + lamports BIGINT NOT NULL, + slot BIGINT NOT NULL, + executable BOOL NOT NULL, + rent_epoch BIGINT NOT NULL, + data BYTEA, + updated_on TIMESTAMP NOT NULL +); + +CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$ + BEGIN + INSERT INTO account_audit (pubkey, owner, lamports, slot, executable, rent_epoch, data, updated_on) + VALUES (OLD.pubkey, OLD.owner, OLD.lamports, OLD.slot, + OLD.executable, OLD.rent_epoch, OLD.data, OLD.updated_on); + RETURN NEW; + END; + +$audit_account_update$ LANGUAGE plpgsql; + +CREATE TRIGGER account_update_trigger AFTER UPDATE OR DELETE ON account + FOR EACH ROW EXECUTE PROCEDURE audit_account_update(); diff --git a/accountsdb-plugin-postgres/scripts/drop_schema.sql b/accountsdb-plugin-postgres/scripts/drop_schema.sql new file mode 100644 index 000000000..e76a0dc16 --- /dev/null +++ b/accountsdb-plugin-postgres/scripts/drop_schema.sql @@ -0,0 +1,7 @@ +/** + * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin. + */ + +DROP FUNCTION audit_account_update; +DROP TABLE account_audit; +DROP TABLE account; \ No newline at end of file diff --git a/accountsdb-plugin-postgres/src/accounts_selector.rs b/accountsdb-plugin-postgres/src/accounts_selector.rs new file mode 100644 index 000000000..91c669f70 --- /dev/null +++ b/accountsdb-plugin-postgres/src/accounts_selector.rs @@ -0,0 +1,69 @@ +use {log::*, std::collections::HashSet}; + +#[derive(Debug)] +pub(crate) struct AccountsSelector { + pub accounts: HashSet>, + pub owners: HashSet>, + pub select_all_accounts: bool, +} + +impl AccountsSelector { + pub fn default() -> Self { + AccountsSelector { + accounts: HashSet::default(), + owners: HashSet::default(), + select_all_accounts: true, + } + } + + pub fn new(accounts: &[String], owners: &[String]) -> Self { + info!( + "Creating AccountsSelector from accounts: {:?}, owners: {:?}", + accounts, owners + ); + + let select_all_accounts = accounts.iter().any(|key| key == "*"); + if select_all_accounts { + return AccountsSelector { + accounts: HashSet::default(), + owners: HashSet::default(), + select_all_accounts, + }; + } + let accounts = accounts + .iter() + .map(|key| bs58::decode(key).into_vec().unwrap()) + .collect(); + let owners = owners + .iter() + .map(|key| bs58::decode(key).into_vec().unwrap()) + .collect(); + AccountsSelector { + accounts, + owners, + select_all_accounts, + } + } + + pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool { + self.select_all_accounts || self.accounts.contains(account) || self.owners.contains(owner) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + + #[test] + fn test_create_accounts_selector() { + AccountsSelector::new( + &["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_string()], + &[], + ); + + AccountsSelector::new( + &[], + &["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_string()], + ); + } +} diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs new file mode 100644 index 000000000..1663a0e2d --- /dev/null +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -0,0 +1,349 @@ +/// Main entry for the PostgreSQL plugin +use { + crate::accounts_selector::AccountsSelector, + bs58, + chrono::Utc, + log::*, + postgres::{Client, NoTls, Statement}, + serde_derive::{Deserialize, Serialize}, + serde_json, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, Result, SlotStatus, + }, + std::{fs::File, io::Read, sync::Mutex}, + thiserror::Error, +}; + +struct PostgresSqlClientWrapper { + client: Client, + update_account_stmt: Statement, +} + +#[derive(Default)] +pub struct AccountsDbPluginPostgres { + client: Option>, + accounts_selector: Option, +} + +impl std::fmt::Debug for AccountsDbPluginPostgres { + fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + Ok(()) + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +struct AccountsDbPluginPostgresConfig { + host: String, + user: String, +} + +#[derive(Error, Debug)] +enum AccountsDbPluginPostgresError { + #[error("Error connecting to the backend data store.")] + DataStoreConnectionError { msg: String }, + + #[error("Error preparing data store schema.")] + DataSchemaError { msg: String }, +} + +impl AccountsDbPlugin for AccountsDbPluginPostgres { + fn name(&self) -> &'static str { + "AccountsDbPluginPostgres" + } + + /// Do initialization for the PostgreSQL plugin. + /// # Arguments + /// + /// Format of the config file: + /// The `accounts_selector` section allows the user to controls accounts selections. + /// "accounts_selector" : { + /// "accounts" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\], + /// } + /// or: + /// "accounts_selector" = { + /// "owners" : \["pubkey-1', 'pubkey-2", ..., "pubkey-m"\] + /// } + /// Accounts either satisyfing the accounts condition or owners condition will be selected. + /// When only owners is specified, + /// all accounts belonging to the owners will be streamed. + /// The accounts field support wildcard to select all accounts: + /// "accounts_selector" : { + /// "accounts" : \["*"\], + /// } + /// "host" specifies the PostgreSQL server. + /// "user" specifies the PostgreSQL user. + /// # Examples + /// { + /// "libpath": "/home/solana/target/release/libsolana_accountsdb_plugin_postgres.so", + /// "host": "host_foo", + /// "user": "solana", + /// "accounts_selector" : { + /// "owners" : ["9oT9R5ZyRovSVnt37QvVoBttGpNqR3J7unkb567NP8k3"] + /// } + + fn on_load(&mut self, config_file: &str) -> Result<()> { + solana_logger::setup_with_default("info"); + info!( + "Loading plugin {:?} from config_file {:?}", + self.name(), + config_file + ); + let mut file = File::open(config_file)?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + + let result: serde_json::Value = serde_json::from_str(&contents).unwrap(); + self.accounts_selector = Some(Self::create_accounts_selector_from_config(&result)); + + let result: serde_json::Result = + serde_json::from_str(&contents); + match result { + Err(err) => { + return Err(AccountsDbPluginError::ConfigFileReadError { + msg: format!( + "The config file is not in the JSON format expected: {:?}", + err + ), + }) + } + Ok(config) => { + let connection_str = format!("host={} user={}", config.host, config.user); + match Client::connect(&connection_str, NoTls) { + Err(err) => { + return Err(AccountsDbPluginError::Custom( + Box::new(AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: format!( + "Error in connecting to the PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, connection_str), + }))); + } + Ok(mut client) => { + let result = client.prepare("INSERT INTO account (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ + ON CONFLICT (pubkey) DO UPDATE SET slot=$2, owner=$3, lamports=$4, executable=$5, rent_epoch=$6, \ + data=$7, updated_on=$8"); + + match result { + Err(err) => { + return Err(AccountsDbPluginError::Custom( + Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the accounts update PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, connection_str + ), + }))); + } + Ok(update_account_stmt) => { + self.client = Some(Mutex::new(PostgresSqlClientWrapper { + client, + update_account_stmt, + })); + } + } + } + } + } + } + + Ok(()) + } + + /// Unload all plugins and loaded plugin libraries, making sure to fire + /// their `on_plugin_unload()` methods so they can do any necessary cleanup. + fn on_unload(&mut self) { + info!("Unloading plugin: {:?}", self.name()); + } + + fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()> { + match account { + ReplicaAccountInfoVersions::V0_0_1(account) => { + if let Some(accounts_selector) = &self.accounts_selector { + if !accounts_selector.is_account_selected(account.pubkey, account.owner) { + return Ok(()); + } + } else { + return Ok(()); + } + + debug!( + "Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}", + bs58::encode(account.pubkey).into_string(), + bs58::encode(account.owner).into_string(), + slot, + self.accounts_selector.as_ref().unwrap() + ); + + match &mut self.client { + None => { + return Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: "There is no connection to the PostgreSQL database." + .to_string(), + }, + ))); + } + Some(client) => { + let slot = slot as i64; // postgres only supports i64 + let lamports = account.lamports as i64; + let rent_epoch = account.rent_epoch as i64; + let updated_on = Utc::now().naive_utc(); + let client = client.get_mut().unwrap(); + let result = client.client.query( + &client.update_account_stmt, + &[ + &account.pubkey, + &slot, + &account.owner, + &lamports, + &account.executable, + &rent_epoch, + &account.data, + &updated_on, + ], + ); + + if let Err(err) = result { + return Err(AccountsDbPluginError::AccountsUpdateError { + msg: format!("Failed to persist the update of account to the PostgreSQL database. Error: {:?}", err) + }); + } + } + } + } + } + Ok(()) + } + + fn update_slot_status( + &mut self, + slot: u64, + parent: Option, + status: SlotStatus, + ) -> Result<()> { + info!("Updating slot {:?} at with status {:?}", slot, status); + + match &mut self.client { + None => { + return Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: "There is no connection to the PostgreSQL database.".to_string(), + }, + ))); + } + Some(client) => { + let slot = slot as i64; // postgres only supports i64 + let parent = parent.map(|parent| parent as i64); + let updated_on = Utc::now().naive_utc(); + let status_str = status.as_str(); + + let result = match parent { + Some(parent) => { + client.get_mut().unwrap().client.execute( + "INSERT INTO slot (slot, parent, status, updated_on) \ + VALUES ($1, $2, $3, $4) \ + ON CONFLICT (slot) DO UPDATE SET parent=$2, status=$3, updated_on=$4", + &[ + &slot, + &parent, + &status_str, + &updated_on, + ], + ) + } + None => { + client.get_mut().unwrap().client.execute( + "INSERT INTO slot (slot, status, updated_on) \ + VALUES ($1, $2, $3) \ + ON CONFLICT (slot) DO UPDATE SET status=$2, updated_on=$3", + &[ + &slot, + &status_str, + &updated_on, + ], + ) + } + }; + + match result { + Err(err) => { + return Err(AccountsDbPluginError::SlotStatusUpdateError{ + msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err) + }); + } + Ok(rows) => { + assert_eq!(1, rows, "Expected one rows to be updated a time"); + } + } + } + } + + Ok(()) + } +} + +impl AccountsDbPluginPostgres { + fn create_accounts_selector_from_config(config: &serde_json::Value) -> AccountsSelector { + let accounts_selector = &config["accounts_selector"]; + + if accounts_selector.is_null() { + AccountsSelector::default() + } else { + let accounts = &accounts_selector["accounts"]; + let accounts: Vec = if accounts.is_array() { + accounts + .as_array() + .unwrap() + .iter() + .map(|val| val.as_str().unwrap().to_string()) + .collect() + } else { + Vec::default() + }; + let owners = &accounts_selector["owners"]; + let owners: Vec = if owners.is_array() { + owners + .as_array() + .unwrap() + .iter() + .map(|val| val.as_str().unwrap().to_string()) + .collect() + } else { + Vec::default() + }; + AccountsSelector::new(&accounts, &owners) + } + } + + pub fn new() -> Self { + AccountsDbPluginPostgres { + client: None, + accounts_selector: None, + } + } +} + +#[no_mangle] +#[allow(improper_ctypes_definitions)] +/// # Safety +/// +/// This function returns the AccountsDbPluginPostgres pointer as trait AccountsDbPlugin. +pub unsafe extern "C" fn _create_plugin() -> *mut dyn AccountsDbPlugin { + let plugin = AccountsDbPluginPostgres::new(); + let plugin: Box = Box::new(plugin); + Box::into_raw(plugin) +} + +#[cfg(test)] +pub(crate) mod tests { + use {super::*, serde_json}; + + #[test] + fn test_accounts_selector_from_config() { + let config = "{\"accounts_selector\" : { \ + \"owners\" : [\"9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin\"] \ + }}"; + + let config: serde_json::Value = serde_json::from_str(config).unwrap(); + AccountsDbPluginPostgres::create_accounts_selector_from_config(&config); + } +} diff --git a/accountsdb-plugin-postgres/src/lib.rs b/accountsdb-plugin-postgres/src/lib.rs new file mode 100644 index 000000000..ccc4f1790 --- /dev/null +++ b/accountsdb-plugin-postgres/src/lib.rs @@ -0,0 +1,2 @@ +pub mod accounts_selector; +pub mod accountsdb_plugin_postgres; diff --git a/core/Cargo.toml b/core/Cargo.toml index 0aebf09f2..2d45fc6d9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -35,6 +35,7 @@ retain_mut = "0.1.4" serde = "1.0.130" serde_derive = "1.0.103" solana-account-decoder = { path = "../account-decoder", version = "=1.8.0" } +solana-accountsdb-plugin-manager = { path = "../accountsdb-plugin-manager", version = "=1.8.0" } solana-client = { path = "../client", version = "=1.8.0" } solana-config-program = { path = "../programs/config", version = "=1.8.0" } solana-entry = { path = "../entry", version = "=1.8.0" } diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index fc99d7052..3770f532d 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -506,6 +506,7 @@ mod tests { None, None, accounts_package_sender, + None, ) .unwrap(); let leader_schedule_cache = Arc::new(cached_leader_schedule); diff --git a/core/src/validator.rs b/core/src/validator.rs index 9bd7bd072..5bdf99edc 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -20,6 +20,7 @@ use { }, crossbeam_channel::{bounded, unbounded}, rand::{thread_rng, Rng}, + solana_accountsdb_plugin_manager::accountsdb_plugin_service::AccountsDbPluginService, solana_entry::poh::compute_hash_time_ns, solana_gossip::{ cluster_info::{ @@ -63,6 +64,7 @@ use { solana_runtime::{ accounts_db::{AccountShrinkThreshold, AccountsDbConfig}, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, bank::Bank, bank_forks::BankForks, commitment::BlockCommitmentCache, @@ -113,6 +115,7 @@ pub struct ValidatorConfig { pub account_shrink_paths: Option>, pub rpc_config: JsonRpcConfig, pub accountsdb_repl_service_config: Option, + pub accountsdb_plugin_config_file: Option, pub rpc_addrs: Option<(SocketAddr, SocketAddr)>, // (JsonRpc, JsonRpcPubSub) pub pubsub_config: PubSubConfig, pub snapshot_config: Option, @@ -173,6 +176,7 @@ impl Default for ValidatorConfig { account_shrink_paths: None, rpc_config: JsonRpcConfig::default(), accountsdb_repl_service_config: None, + accountsdb_plugin_config_file: None, rpc_addrs: None, pubsub_config: PubSubConfig::default(), snapshot_config: None, @@ -279,6 +283,7 @@ pub struct Validator { ip_echo_server: Option, pub cluster_info: Arc, accountsdb_repl_service: Option, + accountsdb_plugin_service: Option, } // in the distant future, get rid of ::new()/exit() and use Result properly... @@ -315,6 +320,27 @@ impl Validator { warn!("identity: {}", id); warn!("vote account: {}", vote_account); + let mut bank_notification_senders = Vec::new(); + + let accountsdb_plugin_service = + if let Some(accountsdb_plugin_config_file) = &config.accountsdb_plugin_config_file { + let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); + bank_notification_senders.push(confirmed_bank_sender); + let result = AccountsDbPluginService::new( + confirmed_bank_receiver, + accountsdb_plugin_config_file, + ); + match result { + Ok(accountsdb_plugin_service) => Some(accountsdb_plugin_service), + Err(err) => { + error!("Failed to load the AccountsDb plugin: {:?}", err); + abort(); + } + } + } else { + None + }; + if config.voting_disabled { warn!("voting disabled"); authorized_voter_keypairs.write().unwrap().clear(); @@ -380,6 +406,7 @@ impl Validator { } let accounts_package_channel = channel(); + let ( genesis_config, bank_forks, @@ -410,6 +437,9 @@ impl Validator { &start_progress, config.no_poh_speed_test, accounts_package_channel.0.clone(), + accountsdb_plugin_service + .as_ref() + .map(|plugin_service| plugin_service.get_accounts_update_notifier()), ); *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices; @@ -545,13 +575,19 @@ impl Validator { )); } - let (confirmed_bank_sender, confirmed_bank_receiver) = unbounded(); - let accountsdb_repl_service = config.accountsdb_repl_service_config.as_ref().map(|accountsdb_repl_service_config| { + let (bank_notification_sender, bank_notification_receiver) = unbounded(); + bank_notification_senders.push(bank_notification_sender); accountsdb_repl_server_factory::AccountsDbReplServerFactory::build_accountsdb_repl_server( - accountsdb_repl_service_config.clone(), confirmed_bank_receiver, bank_forks.clone())}); + accountsdb_repl_service_config.clone(), bank_notification_receiver, bank_forks.clone()) + }); let (bank_notification_sender, bank_notification_receiver) = unbounded(); + let confirmed_bank_subscribers = if !bank_notification_senders.is_empty() { + Some(Arc::new(RwLock::new(bank_notification_senders))) + } else { + None + }; ( Some(JsonRpcService::new( rpc_addr, @@ -596,7 +632,7 @@ impl Validator { bank_forks.clone(), optimistically_confirmed_bank, rpc_subscriptions.clone(), - Some(Arc::new(RwLock::new(vec![confirmed_bank_sender]))), + confirmed_bank_subscribers, )), Some(bank_notification_sender), accountsdb_repl_service, @@ -841,6 +877,7 @@ impl Validator { validator_exit: config.validator_exit.clone(), cluster_info, accountsdb_repl_service, + accountsdb_plugin_service, } } @@ -951,6 +988,12 @@ impl Validator { .join() .expect("accountsdb_repl_service"); } + + if let Some(accountsdb_plugin_service) = self.accountsdb_plugin_service { + accountsdb_plugin_service + .join() + .expect("accountsdb_plugin_service"); + } } } @@ -1093,6 +1136,7 @@ fn new_banks_from_ledger( start_progress: &Arc>, no_poh_speed_test: bool, accounts_package_sender: AccountsPackageSender, + accounts_update_notifier: Option, ) -> ( GenesisConfig, BankForks, @@ -1211,6 +1255,7 @@ fn new_banks_from_ledger( .cache_block_meta_sender .as_ref(), accounts_package_sender, + accounts_update_notifier, ) .unwrap_or_else(|err| { error!("Failed to load ledger: {:?}", err); diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 1cb323ee5..c0e22b8fe 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -203,6 +203,7 @@ mod tests { false, false, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) .unwrap(); @@ -838,6 +839,7 @@ mod tests { false, false, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, )?; assert_eq!(bank, &deserialized_bank); @@ -1016,6 +1018,7 @@ mod tests { false, false, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) .unwrap(); diff --git a/docs/src/developing/backwards-compatibility.md b/docs/src/developing/backwards-compatibility.md index 9f26ad03f..a6db1f094 100644 --- a/docs/src/developing/backwards-compatibility.md +++ b/docs/src/developing/backwards-compatibility.md @@ -76,6 +76,7 @@ Major releases: - [`solana-program`](https://docs.rs/solana-program/) - Rust SDK for writing programs - [`solana-client`](https://docs.rs/solana-client/) - Rust client for connecting to RPC API - [`solana-cli-config`](https://docs.rs/solana-cli-config/) - Rust client for managing Solana CLI config files +- [`solana-accountsdb-plugin-interface`](https://docs.rs/solana-accountsdb-plugin-interface/) - Rust interface for developing Solana AccountsDb plugins. Patch releases: diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index fa0cc8af5..2e334dfe0 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -752,6 +752,7 @@ fn load_bank_forks( None, None, accounts_package_sender, + None, ) } diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 82a9bb8f0..0f2cdf5ef 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -6,11 +6,13 @@ use crate::{ }, leader_schedule_cache::LeaderScheduleCache, }; + use log::*; use solana_entry::entry::VerifyRecyclers; use solana_runtime::{ - bank_forks::BankForks, snapshot_archive_info::SnapshotArchiveInfoGetter, - snapshot_config::SnapshotConfig, snapshot_package::AccountsPackageSender, snapshot_utils, + accounts_update_notifier_interface::AccountsUpdateNotifier, bank_forks::BankForks, + snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, + snapshot_package::AccountsPackageSender, snapshot_utils, }; use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash}; use std::{fs, path::PathBuf, process, result}; @@ -45,6 +47,7 @@ fn to_loadresult( /// /// If a snapshot config is given, and a snapshot is found, it will be loaded. Otherwise, load /// from genesis. +#[allow(clippy::too_many_arguments)] pub fn load( genesis_config: &GenesisConfig, blockstore: &Blockstore, @@ -55,6 +58,7 @@ pub fn load( transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, accounts_package_sender: AccountsPackageSender, + accounts_update_notifier: Option, ) -> LoadResult { if let Some(snapshot_config) = snapshot_config { info!( @@ -80,6 +84,7 @@ pub fn load( transaction_status_sender, cache_block_meta_sender, accounts_package_sender, + accounts_update_notifier, ); } else { info!("No snapshot package available; will load from genesis"); @@ -96,6 +101,7 @@ pub fn load( cache_block_meta_sender, snapshot_config, accounts_package_sender, + accounts_update_notifier, ) } @@ -107,6 +113,7 @@ fn load_from_genesis( cache_block_meta_sender: Option<&CacheBlockMetaSender>, snapshot_config: Option<&SnapshotConfig>, accounts_package_sender: AccountsPackageSender, + accounts_update_notifier: Option, ) -> LoadResult { info!("Processing ledger from genesis"); to_loadresult( @@ -118,6 +125,7 @@ fn load_from_genesis( cache_block_meta_sender, snapshot_config, accounts_package_sender, + accounts_update_notifier, ), None, ) @@ -134,6 +142,7 @@ fn load_from_snapshot( transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, accounts_package_sender: AccountsPackageSender, + accounts_update_notifier: Option, ) -> LoadResult { // Fail hard here if snapshot fails to load, don't silently continue if account_paths.is_empty() { @@ -158,6 +167,7 @@ fn load_from_snapshot( process_options.accounts_db_skip_shrink, process_options.verify_index, process_options.accounts_db_config.clone(), + accounts_update_notifier, ) .expect("Load from snapshot failed"); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 142e0e1c4..6719e0c75 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -18,6 +18,7 @@ use solana_rayon_threadlimit::get_thread_count; use solana_runtime::{ accounts_db::{AccountShrinkThreshold, AccountsDbConfig}, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, bank::{ Bank, ExecuteTimings, InnerInstructionsList, RentDebits, TransactionBalancesSet, TransactionExecutionResult, TransactionLogMessages, TransactionResults, @@ -483,6 +484,7 @@ pub fn process_blockstore( cache_block_meta_sender: Option<&CacheBlockMetaSender>, snapshot_config: Option<&SnapshotConfig>, accounts_package_sender: AccountsPackageSender, + accounts_update_notifier: Option, ) -> BlockstoreProcessorResult { if let Some(num_threads) = opts.override_num_threads { PAR_THREAD_POOL.with(|pool| { @@ -505,6 +507,7 @@ pub fn process_blockstore( opts.shrink_ratio, false, opts.accounts_db_config.clone(), + accounts_update_notifier, ); let bank0 = Arc::new(bank0); info!("processing ledger for slot 0..."); @@ -1513,6 +1516,7 @@ pub mod tests { None, None, accounts_package_sender, + None, ) .unwrap() } diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 581b7763e..50a5943ac 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -13,6 +13,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { account_shrink_paths: config.account_shrink_paths.clone(), rpc_config: config.rpc_config.clone(), accountsdb_repl_service_config: config.accountsdb_repl_service_config.clone(), + accountsdb_plugin_config_file: config.accountsdb_plugin_config_file.clone(), rpc_addrs: config.rpc_addrs, pubsub_config: config.pubsub_config.clone(), snapshot_config: config.snapshot_config.clone(), diff --git a/replica-lib/Cargo.toml b/replica-lib/Cargo.toml index 0fe45d7d8..addd34489 100644 --- a/replica-lib/Cargo.toml +++ b/replica-lib/Cargo.toml @@ -14,6 +14,7 @@ crossbeam-channel = "0.5" futures-util = "0.3" log = "0.4.11" prost = "0.8.0" +solana-rpc = { path = "../rpc", version = "=1.8.0" } solana-runtime = { path = "../runtime", version = "=1.8.0" } solana-sdk = { path = "../sdk", version = "=1.8.0" } tokio = { version = "1", features = ["full"] } diff --git a/replica-lib/src/accountsdb_repl_server_factory.rs b/replica-lib/src/accountsdb_repl_server_factory.rs index e35d26bbf..5cc1a9008 100644 --- a/replica-lib/src/accountsdb_repl_server_factory.rs +++ b/replica-lib/src/accountsdb_repl_server_factory.rs @@ -5,8 +5,8 @@ use { replica_confirmed_slots_server::ReplicaSlotConfirmationServerImpl, }, crossbeam_channel::Receiver, + solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, solana_runtime::bank_forks::BankForks, - solana_sdk::clock::Slot, std::sync::{Arc, RwLock}, }; @@ -15,7 +15,7 @@ pub struct AccountsDbReplServerFactory {} impl AccountsDbReplServerFactory { pub fn build_accountsdb_repl_server( config: AccountsDbReplServiceConfig, - confirmed_bank_receiver: Receiver, + confirmed_bank_receiver: Receiver, bank_forks: Arc>, ) -> AccountsDbReplService { AccountsDbReplService::new( diff --git a/replica-lib/src/replica_confirmed_slots_server.rs b/replica-lib/src/replica_confirmed_slots_server.rs index 37963632b..017970bd4 100644 --- a/replica-lib/src/replica_confirmed_slots_server.rs +++ b/replica-lib/src/replica_confirmed_slots_server.rs @@ -1,6 +1,7 @@ use { crate::accountsdb_repl_server::{self, ReplicaSlotConfirmationServer}, crossbeam_channel::Receiver, + solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, solana_sdk::{clock::Slot, commitment_config::CommitmentLevel}, std::{ collections::VecDeque, @@ -58,7 +59,7 @@ impl ReplicaSlotConfirmationServer for ReplicaSlotConfirmationServerImpl { const MAX_ELIGIBLE_SLOT_SET_SIZE: usize = 262144; impl ReplicaSlotConfirmationServerImpl { - pub fn new(confirmed_bank_receiver: Receiver) -> Self { + pub fn new(confirmed_bank_receiver: Receiver) -> Self { let eligible_slot_set = ReplicaEligibleSlotSet::default(); let exit_updated_slot_server = Arc::new(AtomicBool::new(false)); @@ -79,7 +80,7 @@ impl ReplicaSlotConfirmationServerImpl { } fn run_confirmed_bank_receiver( - confirmed_bank_receiver: Receiver, + confirmed_bank_receiver: Receiver, eligible_slot_set: ReplicaEligibleSlotSet, exit: Arc, ) -> JoinHandle<()> { @@ -87,7 +88,9 @@ impl ReplicaSlotConfirmationServerImpl { .name("confirmed_bank_receiver".to_string()) .spawn(move || { while !exit.load(Ordering::Relaxed) { - if let Ok(slot) = confirmed_bank_receiver.recv() { + if let Ok(BankNotification::OptimisticallyConfirmed(slot)) = + confirmed_bank_receiver.recv() + { let mut slot_set = eligible_slot_set.slot_set.write().unwrap(); slot_set.push_back((slot, CommitmentLevel::Confirmed)); } diff --git a/replica-node/src/replica_node.rs b/replica-node/src/replica_node.rs index a7a9d2d1a..720692f0d 100644 --- a/replica-node/src/replica_node.rs +++ b/replica-node/src/replica_node.rs @@ -133,6 +133,7 @@ fn initialize_from_snapshot( false, process_options.verify_index, process_options.accounts_db_config, + None, ) .unwrap(); diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index 800f9714d..d5efa8049 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -31,6 +31,7 @@ impl OptimisticallyConfirmedBank { } } +#[derive(Clone)] pub enum BankNotification { OptimisticallyConfirmed(Slot), Frozen(Arc), @@ -63,7 +64,7 @@ impl OptimisticallyConfirmedBankTracker { bank_forks: Arc>, optimistically_confirmed_bank: Arc>, subscriptions: Arc, - confirmed_bank_subscribers: Option>>>>, + bank_notification_subscribers: Option>>>, ) -> Self { let exit_ = exit.clone(); let mut pending_optimistically_confirmed_banks = HashSet::new(); @@ -84,7 +85,7 @@ impl OptimisticallyConfirmedBankTracker { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, - &confirmed_bank_subscribers, + &bank_notification_subscribers, ) { break; } @@ -101,7 +102,7 @@ impl OptimisticallyConfirmedBankTracker { mut pending_optimistically_confirmed_banks: &mut HashSet, mut last_notified_confirmed_slot: &mut Slot, mut highest_confirmed_slot: &mut Slot, - confirmed_bank_subscribers: &Option>>>>, + bank_notification_subscribers: &Option>>>, ) -> Result<(), RecvTimeoutError> { let notification = receiver.recv_timeout(Duration::from_secs(1))?; Self::process_notification( @@ -112,18 +113,37 @@ impl OptimisticallyConfirmedBankTracker { &mut pending_optimistically_confirmed_banks, &mut last_notified_confirmed_slot, &mut highest_confirmed_slot, - confirmed_bank_subscribers, + bank_notification_subscribers, ); Ok(()) } + fn notify_slot_status( + bank_notification_subscribers: &Option>>>, + notifcation: BankNotification, + ) { + if let Some(bank_notification_subscribers) = bank_notification_subscribers { + for sender in bank_notification_subscribers.read().unwrap().iter() { + match sender.send(notifcation.clone()) { + Ok(_) => {} + Err(err) => { + info!( + "Failed to send notification {:?}, error: {:?}", + notifcation, err + ); + } + } + } + } + } + fn notify_or_defer( subscriptions: &Arc, bank_forks: &Arc>, bank: &Arc, last_notified_confirmed_slot: &mut Slot, pending_optimistically_confirmed_banks: &mut HashSet, - confirmed_bank_subscribers: &Option>>>>, + bank_notification_subscribers: &Option>>>, ) { if bank.is_frozen() { if bank.slot() > *last_notified_confirmed_slot { @@ -133,20 +153,10 @@ impl OptimisticallyConfirmedBankTracker { ); subscriptions.notify_gossip_subscribers(bank.slot()); *last_notified_confirmed_slot = bank.slot(); - if let Some(confirmed_bank_subscribers) = confirmed_bank_subscribers { - for sender in confirmed_bank_subscribers.read().unwrap().iter() { - match sender.send(bank.slot()) { - Ok(_) => {} - Err(err) => { - info!( - "Failed to send slot {:} update, error: {:?}", - bank.slot(), - err - ); - } - } - } - } + Self::notify_slot_status( + bank_notification_subscribers, + BankNotification::OptimisticallyConfirmed(bank.slot()), + ); } } else if bank.slot() > bank_forks.read().unwrap().root_bank().slot() { pending_optimistically_confirmed_banks.insert(bank.slot()); @@ -161,7 +171,7 @@ impl OptimisticallyConfirmedBankTracker { slot_threshold: Slot, mut last_notified_confirmed_slot: &mut Slot, mut pending_optimistically_confirmed_banks: &mut HashSet, - confirmed_bank_subscribers: &Option>>>>, + bank_notification_subscribers: &Option>>>, ) { for confirmed_bank in bank.clone().parents_inclusive().iter().rev() { if confirmed_bank.slot() > slot_threshold { @@ -175,7 +185,7 @@ impl OptimisticallyConfirmedBankTracker { confirmed_bank, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, - confirmed_bank_subscribers, + bank_notification_subscribers, ); } } @@ -189,7 +199,7 @@ impl OptimisticallyConfirmedBankTracker { mut pending_optimistically_confirmed_banks: &mut HashSet, mut last_notified_confirmed_slot: &mut Slot, highest_confirmed_slot: &mut Slot, - confirmed_bank_subscribers: &Option>>>>, + bank_notification_subscribers: &Option>>>, ) { debug!("received bank notification: {:?}", notification); match notification { @@ -211,7 +221,7 @@ impl OptimisticallyConfirmedBankTracker { *highest_confirmed_slot, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, - confirmed_bank_subscribers, + bank_notification_subscribers, ); *highest_confirmed_slot = slot; @@ -245,6 +255,11 @@ impl OptimisticallyConfirmedBankTracker { max_transactions_per_entry: bank.transactions_per_entry_max(), }, }); + + Self::notify_slot_status( + bank_notification_subscribers, + BankNotification::Frozen(bank.clone()), + ); } if pending_optimistically_confirmed_banks.remove(&bank.slot()) { @@ -260,7 +275,7 @@ impl OptimisticallyConfirmedBankTracker { *last_notified_confirmed_slot, &mut last_notified_confirmed_slot, &mut pending_optimistically_confirmed_banks, - confirmed_bank_subscribers, + bank_notification_subscribers, ); let mut w_optimistically_confirmed_bank = @@ -272,6 +287,10 @@ impl OptimisticallyConfirmedBankTracker { } } BankNotification::Root(bank) => { + Self::notify_slot_status( + bank_notification_subscribers, + BankNotification::Root(bank.clone()), + ); let root_slot = bank.slot(); let mut w_optimistically_confirmed_bank = optimistically_confirmed_bank.write().unwrap(); diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index f32e769f0..6d1e34c1f 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -5,6 +5,7 @@ use crate::{ ACCOUNTS_DB_CONFIG_FOR_TESTING, }, accounts_index::{AccountSecondaryIndexes, IndexKey, ScanResult}, + accounts_update_notifier_interface::AccountsUpdateNotifier, ancestors::Ancestors, bank::{ NonceRollbackFull, NonceRollbackInfo, RentDebits, TransactionCheckResult, @@ -139,6 +140,7 @@ impl Accounts { caching_enabled, shrink_ratio, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) } @@ -156,6 +158,7 @@ impl Accounts { caching_enabled, shrink_ratio, Some(ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS), + None, ) } @@ -166,6 +169,7 @@ impl Accounts { caching_enabled: bool, shrink_ratio: AccountShrinkThreshold, accounts_db_config: Option, + accounts_update_notifier: Option, ) -> Self { Self { accounts_db: Arc::new(AccountsDb::new_with_config( @@ -175,6 +179,7 @@ impl Accounts { caching_enabled, shrink_ratio, accounts_db_config, + accounts_update_notifier, )), account_locks: Mutex::new(AccountLocks::default()), } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index d84f051ec..37efea90a 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -28,6 +28,7 @@ use crate::{ SlotSlice, ZeroLamport, ACCOUNTS_INDEX_CONFIG_FOR_BENCHMARKS, ACCOUNTS_INDEX_CONFIG_FOR_TESTING, }, + accounts_update_notifier_interface::AccountsUpdateNotifier, ancestors::Ancestors, append_vec::{AppendVec, StoredAccountMeta, StoredMeta, StoredMetaWriteVersion}, cache_hash_data::CacheHashData, @@ -1037,6 +1038,9 @@ pub struct AccountsDb { /// Zero-lamport accounts that are *not* purged during clean because they need to stay alive /// for incremental snapshot support. zero_lamport_accounts_to_purge_after_full_snapshot: DashSet<(Slot, Pubkey)>, + + /// AccountsDbPlugin accounts update notifier + accounts_update_notifier: Option, } #[derive(Debug, Default)] @@ -1504,6 +1508,7 @@ impl AccountsDb { shrink_ratio: AccountShrinkThreshold::default(), dirty_stores: DashMap::default(), zero_lamport_accounts_to_purge_after_full_snapshot: DashSet::default(), + accounts_update_notifier: None, } } @@ -1515,6 +1520,7 @@ impl AccountsDb { false, AccountShrinkThreshold::default(), Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) } @@ -1525,6 +1531,7 @@ impl AccountsDb { caching_enabled: bool, shrink_ratio: AccountShrinkThreshold, accounts_db_config: Option, + accounts_update_notifier: Option, ) -> Self { let accounts_index = AccountsIndex::new(accounts_db_config.as_ref().and_then(|x| x.index.clone())); @@ -1539,6 +1546,7 @@ impl AccountsDb { account_indexes, caching_enabled, shrink_ratio, + accounts_update_notifier, ..Self::default_with_accounts_index(accounts_index, accounts_hash_cache_path) } } else { @@ -4205,6 +4213,7 @@ impl AccountsDb { { let stored_size = offsets[1] - offsets[0]; storage.add_account(stored_size); + infos.push(AccountInfo { store_id: storage.append_vec_id(), offset: offsets[0], @@ -4224,6 +4233,7 @@ impl AccountsDb { self.stats .store_find_store .fetch_add(total_storage_find_us, Ordering::Relaxed); + infos } @@ -5921,6 +5931,16 @@ impl AccountsDb { pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) { self.store(slot, accounts, self.caching_enabled); + + if let Some(accounts_update_notifier) = &self.accounts_update_notifier { + let notifier = &accounts_update_notifier.read().unwrap(); + + for account in accounts { + let pubkey = account.0; + let account = account.1; + notifier.notify_account_update(slot, pubkey, account); + } + } } /// Store the account update. @@ -6662,6 +6682,24 @@ impl AccountsDb { } } } + + pub fn notify_account_restore_from_snapshot(&self) { + if let Some(accounts_update_notifier) = &self.accounts_update_notifier { + let notifier = &accounts_update_notifier.read().unwrap(); + let slots = self.storage.all_slots(); + for slot in &slots { + let slot_stores = self.storage.get_slot_stores(*slot).unwrap(); + + let slot_stores = slot_stores.read().unwrap(); + for (_, storage_entry) in slot_stores.iter() { + let accounts = storage_entry.all_accounts(); + for account in &accounts { + notifier.notify_account_restore_from_snapshot(*slot, account); + } + } + } + } + } } #[cfg(test)] @@ -6684,6 +6722,7 @@ impl AccountsDb { caching_enabled, shrink_ratio, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) } diff --git a/runtime/src/accounts_update_notifier_interface.rs b/runtime/src/accounts_update_notifier_interface.rs new file mode 100644 index 000000000..70730ef01 --- /dev/null +++ b/runtime/src/accounts_update_notifier_interface.rs @@ -0,0 +1,25 @@ +use { + crate::append_vec::StoredAccountMeta, + solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey}, + std::sync::{Arc, RwLock}, +}; + +pub trait AccountsUpdateNotifierInterface: std::fmt::Debug { + /// Notified when an account is updated at runtime, due to transaction activities + fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData); + + /// Notified when the AccountsDb is initialized at start when restored + /// from a snapshot. + fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta); + + /// Notified when a slot is optimistically confirmed + fn notify_slot_confirmed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is marked frozen. + fn notify_slot_processed(&self, slot: Slot, parent: Option); + + /// Notified when a slot is rooted. + fn notify_slot_rooted(&self, slot: Slot, parent: Option); +} + +pub type AccountsUpdateNotifier = Arc>; diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index c4316841d..3a92d50b9 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -40,6 +40,7 @@ use crate::{ ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS, ACCOUNTS_DB_CONFIG_FOR_TESTING, }, accounts_index::{AccountSecondaryIndexes, IndexKey, ScanResult}, + accounts_update_notifier_interface::AccountsUpdateNotifier, ancestors::{Ancestors, AncestorsForSerialization}, blockhash_queue::BlockhashQueue, builtins::{self, ActivationType, Builtin, Builtins}, @@ -1138,6 +1139,7 @@ impl Bank { shrink_ratio, debug_do_not_add_builtins, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) } @@ -1163,6 +1165,7 @@ impl Bank { shrink_ratio, debug_do_not_add_builtins, Some(ACCOUNTS_DB_CONFIG_FOR_BENCHMARKS), + None, ) } @@ -1178,6 +1181,7 @@ impl Bank { shrink_ratio: AccountShrinkThreshold, debug_do_not_add_builtins: bool, accounts_db_config: Option, + accounts_update_notifier: Option, ) -> Self { let accounts = Accounts::new_with_config( paths, @@ -1186,6 +1190,7 @@ impl Bank { accounts_db_caching_enabled, shrink_ratio, accounts_db_config, + accounts_update_notifier, ); let mut bank = Self::default_with_accounts(accounts); bank.ancestors = Ancestors::from(vec![bank.slot()]); diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 4e1a464a2..6ff0f4ea4 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -7,6 +7,7 @@ pub mod accounts_db; pub mod accounts_hash; pub mod accounts_index; pub mod accounts_index_storage; +pub mod accounts_update_notifier_interface; pub mod ancestors; pub mod append_vec; pub mod bank; diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 40743bf4c..6e0378a45 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -6,6 +6,7 @@ use { BankHashInfo, }, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, ancestors::Ancestors, append_vec::{AppendVec, StoredMetaWriteVersion}, bank::{Bank, BankFieldsToDeserialize, BankRc}, @@ -204,6 +205,7 @@ pub(crate) fn bank_from_streams( shrink_ratio: AccountShrinkThreshold, verify_index: bool, accounts_db_config: Option, + accounts_update_notifier: Option, ) -> std::result::Result where R: Read, @@ -242,6 +244,7 @@ where shrink_ratio, verify_index, accounts_db_config, + accounts_update_notifier, )?; Ok(bank) }}; @@ -335,6 +338,7 @@ fn reconstruct_bank_from_fields( shrink_ratio: AccountShrinkThreshold, verify_index: bool, accounts_db_config: Option, + accounts_update_notifier: Option, ) -> Result where E: SerializableStorage + std::marker::Sync, @@ -350,6 +354,7 @@ where shrink_ratio, verify_index, accounts_db_config, + accounts_update_notifier, )?; accounts_db.freeze_accounts( &Ancestors::from(&bank_fields.ancestors), @@ -404,6 +409,7 @@ fn reconstruct_accountsdb_from_fields( shrink_ratio: AccountShrinkThreshold, verify_index: bool, accounts_db_config: Option, + accounts_update_notifier: Option, ) -> Result where E: SerializableStorage + std::marker::Sync, @@ -415,6 +421,7 @@ where caching_enabled, shrink_ratio, accounts_db_config, + accounts_update_notifier, ); let AccountsDbFields( @@ -530,6 +537,10 @@ where .fetch_add(snapshot_version, Ordering::Relaxed); accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index); + let mut measure_notify = Measure::start("accounts_notify"); + accounts_db.notify_account_restore_from_snapshot(); + measure_notify.stop(); + datapoint_info!( "reconstruct_accountsdb_from_fields()", ("remap-time-us", measure_remap.as_us(), i64), @@ -538,6 +549,7 @@ where num_collisions.load(Ordering::Relaxed), i64 ), + ("accountsdb-notify-at-start-us", measure_notify.as_us(), i64), ); Ok(accounts_db) diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index 5e4bdfab4..c79812f24 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -84,6 +84,7 @@ where AccountShrinkThreshold::default(), false, Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) } @@ -246,6 +247,7 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) { AccountShrinkThreshold::default(), false, Some(crate::accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) .unwrap(); dbank.src = ref_sc; diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index d27623aec..a75c99e2f 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -2,6 +2,7 @@ use { crate::{ accounts_db::{AccountShrinkThreshold, AccountsDbConfig}, accounts_index::AccountSecondaryIndexes, + accounts_update_notifier_interface::AccountsUpdateNotifier, bank::{Bank, BankSlotDelta}, builtins::Builtins, hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap}, @@ -734,6 +735,7 @@ pub fn bank_from_snapshot_archives( accounts_db_skip_shrink: bool, verify_index: bool, accounts_db_config: Option, + accounts_update_notifier: Option, ) -> Result<(Bank, BankFromArchiveTimings)> { check_are_snapshots_compatible( full_snapshot_archive_info, @@ -798,6 +800,7 @@ pub fn bank_from_snapshot_archives( shrink_ratio, verify_index, accounts_db_config, + accounts_update_notifier, )?; measure_rebuild.stop(); info!("{}", measure_rebuild); @@ -844,6 +847,7 @@ pub fn bank_from_latest_snapshot_archives( accounts_db_skip_shrink: bool, verify_index: bool, accounts_db_config: Option, + accounts_update_notifier: Option, ) -> Result<( Bank, BankFromArchiveTimings, @@ -887,6 +891,7 @@ pub fn bank_from_latest_snapshot_archives( accounts_db_skip_shrink, verify_index, accounts_db_config, + accounts_update_notifier, )?; verify_bank_against_expected_slot_hash( @@ -1424,6 +1429,7 @@ fn rebuild_bank_from_snapshots( shrink_ratio: AccountShrinkThreshold, verify_index: bool, accounts_db_config: Option, + accounts_update_notifier: Option, ) -> Result { let (full_snapshot_version, full_snapshot_root_paths) = verify_unpacked_snapshots_dir_and_version( @@ -1472,6 +1478,7 @@ fn rebuild_bank_from_snapshots( shrink_ratio, verify_index, accounts_db_config, + accounts_update_notifier, ), }?, ) @@ -2645,6 +2652,7 @@ mod tests { false, false, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) .unwrap(); @@ -2736,6 +2744,7 @@ mod tests { false, false, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) .unwrap(); @@ -2846,6 +2855,7 @@ mod tests { false, false, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) .unwrap(); @@ -2945,6 +2955,7 @@ mod tests { false, false, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) .unwrap(); @@ -3086,6 +3097,7 @@ mod tests { false, false, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) .unwrap(); assert_eq!( @@ -3148,6 +3160,7 @@ mod tests { false, false, Some(ACCOUNTS_DB_CONFIG_FOR_TESTING), + None, ) .unwrap(); assert_eq!( diff --git a/validator/src/main.rs b/validator/src/main.rs index 877de430d..d68d0a13b 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1865,6 +1865,14 @@ pub fn main() { .hidden(true) .help("Number of threads to use for servicing AccountsDb Replication requests"), ) + .arg( + Arg::with_name("accountsdb_plugin_config") + .long("accountsdb-plugin-config") + .value_name("FILE") + .takes_value(true) + .hidden(true) + .help("Specify the configuration file for the AccountsDb plugin."), + ) .arg( Arg::with_name("halt_on_trusted_validators_accounts_hash_mismatch") .alias("halt-on-trusted-validators-accounts-hash-mismatch") @@ -2579,6 +2587,10 @@ pub fn main() { None }; + let accountsdb_plugin_config_file = matches + .value_of("accountsdb_plugin_config") + .map(PathBuf::from); + let mut validator_config = ValidatorConfig { require_tower: matches.is_present("require_tower"), tower_storage, @@ -2620,6 +2632,7 @@ pub fn main() { rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"), }, accountsdb_repl_service_config, + accountsdb_plugin_config_file, rpc_addrs: value_t!(matches, "rpc_port", u16).ok().map(|rpc_port| { ( SocketAddr::new(rpc_bind_address, rpc_port),