diff --git a/Cargo.lock b/Cargo.lock index 01e4ebfe21..0368819422 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2949,18 +2949,18 @@ dependencies = [ [[package]] name = "phf" -version = "0.8.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12" +checksum = "b9fc3db1018c4b59d7d582a739436478b6035138b6aecbce989fc91c3e98409f" dependencies = [ "phf_shared", ] [[package]] name = "phf_shared" -version = "0.8.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7" +checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" dependencies = [ "siphasher", ] @@ -3076,9 +3076,9 @@ dependencies = [ [[package]] name = "postgres-types" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f" +checksum = "04619f94ba0cc80999f4fc7073607cb825bc739a883cb6d20900fc5e009d6b0d" dependencies = [ "bytes 1.0.1", "chrono", @@ -4287,9 +4287,11 @@ dependencies = [ "serde_json", "solana-accountsdb-plugin-interface", "solana-logger 1.9.0", + "solana-measure", "solana-metrics", "solana-sdk", "thiserror", + "tokio-postgres", ] [[package]] @@ -6452,9 +6454,9 @@ dependencies = [ [[package]] name = "tokio-postgres" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2b1383c7e4fb9a09e292c7c6afb7da54418d53b045f1c1fac7a911411a2b8b" +checksum = "2f916ee7e52c8a74dfe4162dd73a073d0d7d4b387ea7b97a774c0c10b0776531" dependencies = [ "async-trait", "byteorder", diff --git a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs index b75914bf34..6e5c8637c9 100644 --- a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs +++ b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs @@ -25,19 +25,19 @@ pub enum ReplicaAccountInfoVersions<'a> { #[derive(Error, Debug)] pub enum AccountsDbPluginError { - #[error("Error opening config file.")] + #[error("Error opening config file. Error detail: ({0}).")] ConfigFileOpenError(#[from] io::Error), - #[error("Error reading config file.")] + #[error("Error reading config file. Error message: ({msg})")] ConfigFileReadError { msg: String }, - #[error("Error updating account.")] + #[error("Error updating account. Error message: ({msg})")] AccountsUpdateError { msg: String }, - #[error("Error updating slot status.")] + #[error("Error updating slot status. Error message: ({msg})")] SlotStatusUpdateError { msg: String }, - #[error("Plugin-defined custom error.")] + #[error("Plugin-defined custom error. Error message: ({0})")] Custom(Box), } @@ -78,7 +78,15 @@ pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug { fn on_unload(&mut self) {} /// Called when an account is updated at a slot. - fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()>; + fn update_account( + &mut self, + account: ReplicaAccountInfoVersions, + slot: u64, + is_startup: bool, + ) -> Result<()>; + + /// Called when all accounts are notified of during startup. + fn notify_end_of_startup(&mut self) -> Result<()>; /// Called when a slot status is updated fn update_slot_status( diff --git a/accountsdb-plugin-manager/src/accounts_update_notifier.rs b/accountsdb-plugin-manager/src/accounts_update_notifier.rs index dc1204703d..7a835e09fb 100644 --- a/accountsdb-plugin-manager/src/accounts_update_notifier.rs +++ b/accountsdb-plugin-manager/src/accounts_update_notifier.rs @@ -26,13 +26,65 @@ pub(crate) struct AccountsUpdateNotifierImpl { impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) { if let Some(account_info) = self.accountinfo_from_shared_account_data(pubkey, account) { - self.notify_plugins_of_account_update(account_info, slot); + self.notify_plugins_of_account_update(account_info, slot, false); } } fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) { - if let Some(account_info) = self.accountinfo_from_stored_account_meta(account) { - self.notify_plugins_of_account_update(account_info, slot); + let mut measure_all = Measure::start("accountsdb-plugin-notify-account-restore-all"); + let mut measure_copy = Measure::start("accountsdb-plugin-copy-stored-account-info"); + + let account = self.accountinfo_from_stored_account_meta(account); + measure_copy.stop(); + + inc_new_counter_debug!( + "accountsdb-plugin-copy-stored-account-info-us", + measure_copy.as_us() as usize, + 100000, + 100000 + ); + + if let Some(account_info) = account { + self.notify_plugins_of_account_update(account_info, slot, true); + } + measure_all.stop(); + + inc_new_counter_debug!( + "accountsdb-plugin-notify-account-restore-all-us", + measure_all.as_us() as usize, + 100000, + 100000 + ); + } + + fn notify_end_of_restore_from_snapshot(&self) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + if plugin_manager.plugins.is_empty() { + return; + } + + for plugin in plugin_manager.plugins.iter_mut() { + let mut measure = Measure::start("accountsdb-plugin-end-of-restore-from-snapshot"); + match plugin.notify_end_of_startup() { + Err(err) => { + error!( + "Failed to notify the end of restore from snapshot, error: {} to plugin {}", + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully notified the end of restore from snapshot to plugin {}", + plugin.name() + ); + } + } + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-end-of-restore-from-snapshot", + measure.as_us() as usize + ); } } @@ -83,7 +135,13 @@ impl AccountsUpdateNotifierImpl { }) } - fn notify_plugins_of_account_update(&self, account: ReplicaAccountInfo, slot: Slot) { + fn notify_plugins_of_account_update( + &self, + account: ReplicaAccountInfo, + slot: Slot, + is_startup: bool, + ) { + let mut measure2 = Measure::start("accountsdb-plugin-notify_plugins_of_account_update"); let mut plugin_manager = self.plugin_manager.write().unwrap(); if plugin_manager.plugins.is_empty() { @@ -91,7 +149,11 @@ impl AccountsUpdateNotifierImpl { } for plugin in plugin_manager.plugins.iter_mut() { let mut measure = Measure::start("accountsdb-plugin-update-account"); - match plugin.update_account(ReplicaAccountInfoVersions::V0_0_1(&account), slot) { + match plugin.update_account( + ReplicaAccountInfoVersions::V0_0_1(&account), + slot, + is_startup, + ) { Err(err) => { error!( "Failed to update account {} at slot {}, error: {} to plugin {}", @@ -111,13 +173,20 @@ impl AccountsUpdateNotifierImpl { } } measure.stop(); - inc_new_counter_info!( - "accountsdb-plugin-update-account-ms", - measure.as_ms() as usize, + inc_new_counter_debug!( + "accountsdb-plugin-update-account-us", + measure.as_us() as usize, 100000, 100000 ); } + measure2.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-notify_plugins_of_account_update-us", + measure2.as_us() as usize, + 100000, + 100000 + ); } pub fn notify_slot_status(&self, slot: Slot, parent: Option, slot_status: SlotStatus) { @@ -146,9 +215,9 @@ impl AccountsUpdateNotifierImpl { } } measure.stop(); - inc_new_counter_info!( - "accountsdb-plugin-update-slot-ms", - measure.as_ms() as usize, + inc_new_counter_debug!( + "accountsdb-plugin-update-slot-us", + measure.as_us() as usize, 1000, 1000 ); diff --git a/accountsdb-plugin-postgres/Cargo.toml b/accountsdb-plugin-postgres/Cargo.toml index e9e2d86a79..434383d3d6 100644 --- a/accountsdb-plugin-postgres/Cargo.toml +++ b/accountsdb-plugin-postgres/Cargo.toml @@ -23,9 +23,10 @@ serde_derive = "1.0.103" serde_json = "1.0.67" solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.9.0" } solana-logger = { path = "../logger", version = "=1.9.0" } +solana-measure = { path = "../measure", version = "=1.9.0" } solana-metrics = { path = "../metrics", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" } thiserror = "1.0.30" - +tokio-postgres = "0.7.3" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs index 950e8b7ed8..8dfa719e4f 100644 --- a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -1,10 +1,10 @@ +use solana_measure::measure::Measure; + /// Main entry for the PostgreSQL plugin use { crate::{ accounts_selector::AccountsSelector, - postgres_client::{ - ParallelPostgresClient, PostgresClient, PostgresClientBuilder, SimplePostgresClient, - }, + postgres_client::{ParallelPostgresClient, PostgresClientBuilder}, }, bs58, log::*, @@ -13,19 +13,14 @@ use { solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, Result, SlotStatus, }, + solana_metrics::*, std::{fs::File, io::Read}, thiserror::Error, }; -#[allow(clippy::large_enum_variant)] -enum PostgresClientEnum { - Simple(SimplePostgresClient), - Parallel(ParallelPostgresClient), -} - #[derive(Default)] pub struct AccountsDbPluginPostgres { - client: Option, + client: Option, accounts_selector: Option, } @@ -41,14 +36,15 @@ pub struct AccountsDbPluginPostgresConfig { pub user: String, pub threads: Option, pub port: Option, + pub batch_size: Option, } #[derive(Error, Debug)] pub enum AccountsDbPluginPostgresError { - #[error("Error connecting to the backend data store.")] + #[error("Error connecting to the backend data store. Error message: ({msg})")] DataStoreConnectionError { msg: String }, - #[error("Error preparing data store schema.")] + #[error("Error preparing data store schema. Error message: ({msg})")] DataSchemaError { msg: String }, } @@ -79,7 +75,9 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { /// "host" specifies the PostgreSQL server. /// "user" specifies the PostgreSQL user. /// "threads" optional, specifies the number of worker threads for the plugin. A thread - /// maintains a PostgreSQL connection to the server. + /// maintains a PostgreSQL connection to the server. The default is 10. + /// "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created + /// from restoring a snapshot. The default is "10". /// # Examples /// { /// "libpath": "/home/solana/target/release/libsolana_accountsdb_plugin_postgres.so", @@ -116,13 +114,8 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { }) } Ok(config) => { - self.client = if config.threads.is_some() && config.threads.unwrap() > 1 { - let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?; - Some(PostgresClientEnum::Parallel(client)) - } else { - let client = PostgresClientBuilder::build_simple_postgres_client(&config)?; - Some(PostgresClientEnum::Simple(client)) - }; + let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?; + self.client = Some(client); } } @@ -134,18 +127,23 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { match &mut self.client { None => {} - Some(client) => match client { - PostgresClientEnum::Parallel(client) => client.join().unwrap(), - PostgresClientEnum::Simple(client) => { - client.join().unwrap(); - } - }, + Some(client) => { + client.join().unwrap(); + } } } - fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()> { + fn update_account( + &mut self, + account: ReplicaAccountInfoVersions, + slot: u64, + is_startup: bool, + ) -> Result<()> { + let mut measure_all = Measure::start("accountsdb-plugin-postgres-update-account-main"); match account { ReplicaAccountInfoVersions::V0_0_1(account) => { + let mut measure_select = + Measure::start("accountsdb-plugin-postgres-update-account-select"); if let Some(accounts_selector) = &self.accounts_selector { if !accounts_selector.is_account_selected(account.pubkey, account.owner) { return Ok(()); @@ -153,6 +151,13 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { } else { return Ok(()); } + measure_select.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-postgres-update-account-select-us", + measure_select.as_us() as usize, + 100000, + 100000 + ); debug!( "Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}", @@ -172,14 +177,17 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { ))); } Some(client) => { - let result = match client { - PostgresClientEnum::Parallel(client) => { - client.update_account(account, slot) - } - PostgresClientEnum::Simple(client) => { - client.update_account(account, slot) - } - }; + let mut measure_update = + Measure::start("accountsdb-plugin-postgres-update-account-client"); + let result = { client.update_account(account, slot, is_startup) }; + measure_update.stop(); + + inc_new_counter_debug!( + "accountsdb-plugin-postgres-update-account-client-us", + measure_update.as_us() as usize, + 100000, + 100000 + ); if let Err(err) = result { return Err(AccountsDbPluginError::AccountsUpdateError { @@ -190,6 +198,16 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { } } } + + measure_all.stop(); + + inc_new_counter_debug!( + "accountsdb-plugin-postgres-update-account-main-us", + measure_all.as_us() as usize, + 100000, + 100000 + ); + Ok(()) } @@ -210,14 +228,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { ))); } Some(client) => { - let result = match client { - PostgresClientEnum::Parallel(client) => { - client.update_slot_status(slot, parent, status) - } - PostgresClientEnum::Simple(client) => { - client.update_slot_status(slot, parent, status) - } - }; + let result = client.update_slot_status(slot, parent, status); if let Err(err) = result { return Err(AccountsDbPluginError::SlotStatusUpdateError{ @@ -229,6 +240,29 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { Ok(()) } + + fn notify_end_of_startup(&mut self) -> Result<()> { + info!("Notifying the end of startup for accounts notifications"); + match &mut self.client { + None => { + return Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: "There is no connection to the PostgreSQL database.".to_string(), + }, + ))); + } + Some(client) => { + let result = client.notify_end_of_startup(); + + if let Err(err) = result { + return Err(AccountsDbPluginError::SlotStatusUpdateError{ + msg: format!("Failed to notify the end of startup for accounts notifications. Error: {:?}", err) + }); + } + } + } + Ok(()) + } } impl AccountsDbPluginPostgres { diff --git a/accountsdb-plugin-postgres/src/postgres_client.rs b/accountsdb-plugin-postgres/src/postgres_client.rs index 05e3043e29..1474a6858d 100644 --- a/accountsdb-plugin-postgres/src/postgres_client.rs +++ b/accountsdb-plugin-postgres/src/postgres_client.rs @@ -1,3 +1,5 @@ +#![allow(clippy::integer_arithmetic)] + /// A concurrent implementation for writing accounts into the PostgreSQL in parallel. use { crate::accountsdb_plugin_postgres::{ @@ -10,34 +12,44 @@ use { solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ AccountsDbPluginError, ReplicaAccountInfo, SlotStatus, }, - solana_metrics::datapoint_info, + solana_measure::measure::Measure, + solana_metrics::*, solana_sdk::timing::AtomicInterval, std::{ sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex, }, - thread::{self, Builder, JoinHandle}, + thread::{self, sleep, Builder, JoinHandle}, time::Duration, }, + tokio_postgres::types::ToSql, }; /// The maximum asynchronous requests allowed in the channel to avoid excessive /// memory usage. The downside -- calls after this threshold is reached can get blocked. -const MAX_ASYNC_REQUESTS: usize = 10240; +const MAX_ASYNC_REQUESTS: usize = 40960; const DEFAULT_POSTGRES_PORT: u16 = 5432; +const DEFAULT_THREADS_COUNT: usize = 100; +const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10; +const ACCOUNT_COLUMN_COUNT: usize = 8; struct PostgresSqlClientWrapper { client: Client, update_account_stmt: Statement, + bulk_account_insert_stmt: Statement, } pub struct SimplePostgresClient { + batch_size: usize, + pending_account_updates: Vec, client: Mutex, } struct PostgresClientWorker { client: SimplePostgresClient, + /// Indicating if accounts notification during startup is done. + is_startup_done: bool, } impl Eq for DbAccountInfo {} @@ -45,23 +57,25 @@ impl Eq for DbAccountInfo {} #[derive(Clone, PartialEq, Debug)] pub struct DbAccountInfo { pub pubkey: Vec, - pub lamports: u64, + pub lamports: i64, pub owner: Vec, pub executable: bool, - pub rent_epoch: u64, + pub rent_epoch: i64, pub data: Vec, + pub slot: i64, } impl DbAccountInfo { - fn new(account: &T) -> DbAccountInfo { + fn new(account: &T, slot: u64) -> DbAccountInfo { let data = account.data().to_vec(); Self { pubkey: account.pubkey().to_vec(), - lamports: account.lamports(), + lamports: account.lamports() as i64, owner: account.owner().to_vec(), executable: account.executable(), - rent_epoch: account.rent_epoch(), + rent_epoch: account.rent_epoch() as i64, data, + slot: slot as i64, } } } @@ -69,9 +83,9 @@ impl DbAccountInfo { pub trait ReadableAccountInfo: Sized { fn pubkey(&self) -> &[u8]; fn owner(&self) -> &[u8]; - fn lamports(&self) -> u64; + fn lamports(&self) -> i64; fn executable(&self) -> bool; - fn rent_epoch(&self) -> u64; + fn rent_epoch(&self) -> i64; fn data(&self) -> &[u8]; } @@ -84,7 +98,7 @@ impl ReadableAccountInfo for DbAccountInfo { &self.owner } - fn lamports(&self) -> u64 { + fn lamports(&self) -> i64 { self.lamports } @@ -92,7 +106,7 @@ impl ReadableAccountInfo for DbAccountInfo { self.executable } - fn rent_epoch(&self) -> u64 { + fn rent_epoch(&self) -> i64 { self.rent_epoch } @@ -110,16 +124,16 @@ impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> { self.owner } - fn lamports(&self) -> u64 { - self.lamports + fn lamports(&self) -> i64 { + self.lamports as i64 } fn executable(&self) -> bool { self.executable } - fn rent_epoch(&self) -> u64 { - self.rent_epoch + fn rent_epoch(&self) -> i64 { + self.rent_epoch as i64 } fn data(&self) -> &[u8] { @@ -132,10 +146,10 @@ pub trait PostgresClient { Ok(()) } - fn update_account( + fn update_account( &mut self, - account: &T, - slot: u64, + account: DbAccountInfo, + is_startup: bool, ) -> Result<(), AccountsDbPluginError>; fn update_slot_status( @@ -144,73 +158,120 @@ pub trait PostgresClient { parent: Option, status: SlotStatus, ) -> Result<(), AccountsDbPluginError>; + + fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError>; } impl SimplePostgresClient { - pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result { + fn connect_to_db( + config: &AccountsDbPluginPostgresConfig, + ) -> Result { let port = config.port.unwrap_or(DEFAULT_POSTGRES_PORT); let connection_str = format!("host={} user={} port={}", config.host, config.user, port); match Client::connect(&connection_str, NoTls) { Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataStoreConnectionError { - msg: format!( + let msg = format!( "Error in connecting to the PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, connection_str + err, config.host, config.user, connection_str); + error!("{}", msg); + Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { msg }, + ))) + } + Ok(client) => Ok(client), + } + } + + fn build_bulk_account_insert_statement( + client: &mut Client, + config: &AccountsDbPluginPostgresConfig, + ) -> Result { + let batch_size = config + .batch_size + .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE); + let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) VALUES"); + for j in 0..batch_size { + let row = j * ACCOUNT_COLUMN_COUNT; + let val_str = format!( + "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})", + row + 1, + row + 2, + row + 3, + row + 4, + row + 5, + row + 6, + row + 7, + row + 8, + ); + + if j == 0 { + stmt = format!("{} {}", &stmt, val_str); + } else { + stmt = format!("{}, {}", &stmt, val_str); + } + } + + let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \ + data=excluded.data, updated_on=excluded.updated_on WHERE acct.slot <= excluded.slot"; + + stmt = format!("{} {}", stmt, handle_conflict); + + info!("{}", stmt); + let bulk_stmt = client.prepare(&stmt); + + match bulk_stmt { + Err(err) => { + return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the accounts update PostgreSQL database: {} host: {} user: {} config: {:?}", + err, config.host, config.user, config ), }))); } - Ok(mut client) => { - let result = client.prepare("INSERT INTO account (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ - ON CONFLICT (pubkey) DO UPDATE SET slot=$2, owner=$3, lamports=$4, executable=$5, rent_epoch=$6, \ - data=$7, updated_on=$8"); - - match result { - Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { - msg: format!( - "Error in preparing for the accounts update PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, connection_str - ), - }))); - } - Ok(update_account_stmt) => Ok(Self { - client: Mutex::new(PostgresSqlClientWrapper { - client, - update_account_stmt, - }), - }), - } - } + Ok(update_account_stmt) => Ok(update_account_stmt), } } -} -impl PostgresClient for SimplePostgresClient { - fn update_account( - &mut self, - account: &T, - slot: u64, + fn build_single_account_upsert_statement( + client: &mut Client, + config: &AccountsDbPluginPostgresConfig, + ) -> Result { + let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ + ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \ + data=excluded.data, updated_on=excluded.updated_on WHERE acct.slot <= excluded.slot"; + + let stmt = client.prepare(stmt); + + match stmt { + Err(err) => { + return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the accounts update PostgreSQL database: {} host: {} user: {} config: {:?}", + err, config.host, config.user, config + ), + }))); + } + Ok(update_account_stmt) => Ok(update_account_stmt), + } + } + + /// Internal function for updating or inserting a single account + fn upsert_account_internal( + account: &DbAccountInfo, + statement: &Statement, + client: &mut Client, ) -> Result<(), AccountsDbPluginError> { - trace!( - "Updating account {} with owner {} at slot {}", - bs58::encode(account.pubkey()).into_string(), - bs58::encode(account.owner()).into_string(), - slot, - ); - - let slot = slot as i64; // postgres only supports i64 let lamports = account.lamports() as i64; let rent_epoch = account.rent_epoch() as i64; let updated_on = Utc::now().naive_utc(); - let client = self.client.get_mut().unwrap(); - let result = client.client.query( - &client.update_account_stmt, + let result = client.query( + statement, &[ &account.pubkey(), - &slot, + &account.slot, &account.owner(), &lamports, &account.executable(), @@ -228,9 +289,141 @@ impl PostgresClient for SimplePostgresClient { error!("{}", msg); return Err(AccountsDbPluginError::AccountsUpdateError { msg }); } + Ok(()) } + /// Update or insert a single account + fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> { + let client = self.client.get_mut().unwrap(); + let statement = &client.update_account_stmt; + let client = &mut client.client; + Self::upsert_account_internal(account, statement, client) + } + + /// Insert accounts in batch to reduce network overhead + fn insert_accounts_in_batch( + &mut self, + account: DbAccountInfo, + ) -> Result<(), AccountsDbPluginError> { + self.pending_account_updates.push(account); + + if self.pending_account_updates.len() == self.batch_size { + let mut measure = Measure::start("accountsdb-plugin-postgres-prepare-values"); + + let mut values: Vec<&(dyn ToSql + Sync)> = + Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT); + let updated_on = Utc::now().naive_utc(); + for j in 0..self.batch_size { + let account = &self.pending_account_updates[j]; + + values.push(&account.pubkey); + values.push(&account.slot); + values.push(&account.owner); + values.push(&account.lamports); + values.push(&account.executable); + values.push(&account.rent_epoch); + values.push(&account.data); + values.push(&updated_on); + } + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-postgres-prepare-values-us", + measure.as_us() as usize, + 10000, + 10000 + ); + + let mut measure = Measure::start("accountsdb-plugin-postgres-update-account"); + let client = self.client.get_mut().unwrap(); + let result = client + .client + .query(&client.bulk_account_insert_stmt, &values); + + self.pending_account_updates.clear(); + if let Err(err) = result { + let msg = format!( + "Failed to persist the update of account to the PostgreSQL database. Error: {:?}", + err + ); + error!("{}", msg); + return Err(AccountsDbPluginError::AccountsUpdateError { msg }); + } + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-postgres-update-account-us", + measure.as_us() as usize, + 10000, + 10000 + ); + inc_new_counter_debug!( + "accountsdb-plugin-postgres-update-account-count", + self.batch_size, + 10000, + 10000 + ); + } + Ok(()) + } + + /// Flush any left over accounts in batch which are not processed in the last batch + fn flush_buffered_writes(&mut self) -> Result<(), AccountsDbPluginError> { + if self.pending_account_updates.is_empty() { + return Ok(()); + } + + let client = self.client.get_mut().unwrap(); + let statement = &client.update_account_stmt; + let client = &mut client.client; + + for account in self.pending_account_updates.drain(..) { + Self::upsert_account_internal(&account, statement, client)?; + } + + Ok(()) + } + + pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result { + info!("Creating SimplePostgresClient..."); + let mut client = Self::connect_to_db(config)?; + let bulk_account_insert_stmt = + Self::build_bulk_account_insert_statement(&mut client, config)?; + let update_account_stmt = Self::build_single_account_upsert_statement(&mut client, config)?; + + let batch_size = config + .batch_size + .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE); + info!("Created SimplePostgresClient."); + Ok(Self { + batch_size, + pending_account_updates: Vec::with_capacity(batch_size), + client: Mutex::new(PostgresSqlClientWrapper { + client, + update_account_stmt, + bulk_account_insert_stmt, + }), + }) + } +} + +impl PostgresClient for SimplePostgresClient { + fn update_account( + &mut self, + account: DbAccountInfo, + is_startup: bool, + ) -> Result<(), AccountsDbPluginError> { + trace!( + "Updating account {} with owner {} at slot {}", + bs58::encode(account.pubkey()).into_string(), + bs58::encode(account.owner()).into_string(), + account.slot, + ); + if !is_startup { + return self.upsert_account(&account); + } + self.insert_accounts_in_batch(account) + } + fn update_slot_status( &mut self, slot: u64, @@ -289,11 +482,15 @@ impl PostgresClient for SimplePostgresClient { Ok(()) } + + fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> { + self.flush_buffered_writes() + } } struct UpdateAccountRequest { account: DbAccountInfo, - slot: u64, + is_startup: bool, } struct UpdateSlotRequest { @@ -309,22 +506,41 @@ enum DbWorkItem { impl PostgresClientWorker { fn new(config: AccountsDbPluginPostgresConfig) -> Result { - let client = SimplePostgresClient::new(&config)?; - Ok(PostgresClientWorker { client }) + let result = SimplePostgresClient::new(&config); + match result { + Ok(client) => Ok(PostgresClientWorker { + client, + is_startup_done: false, + }), + Err(err) => { + error!("Error in creating SimplePostgresClient: {}", err); + Err(err) + } + } } fn do_work( &mut self, receiver: Receiver, exit_worker: Arc, + is_startup_done: Arc, + startup_done_count: Arc, ) -> Result<(), AccountsDbPluginError> { while !exit_worker.load(Ordering::Relaxed) { + let mut measure = Measure::start("accountsdb-plugin-postgres-worker-recv"); let work = receiver.recv_timeout(Duration::from_millis(500)); - + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-postgres-worker-recv-us", + measure.as_us() as usize, + 100000, + 100000 + ); match work { Ok(work) => match work { DbWorkItem::UpdateAccount(request) => { - self.client.update_account(&request.account, request.slot)?; + self.client + .update_account(request.account, request.is_startup)?; } DbWorkItem::UpdateSlot(request) => { self.client.update_slot_status( @@ -336,6 +552,12 @@ impl PostgresClientWorker { }, Err(err) => match err { RecvTimeoutError::Timeout => { + if !self.is_startup_done && is_startup_done.load(Ordering::Relaxed) { + self.client.notify_end_of_startup()?; + self.is_startup_done = true; + startup_done_count.fetch_add(1, Ordering::Relaxed); + } + continue; } _ => { @@ -351,43 +573,67 @@ impl PostgresClientWorker { pub struct ParallelPostgresClient { workers: Vec>>, exit_worker: Arc, + is_startup_done: Arc, + startup_done_count: Arc, + initialized_worker_count: Arc, sender: Sender, last_report: AtomicInterval, } impl ParallelPostgresClient { pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result { + info!("Creating ParallelPostgresClient..."); let (sender, receiver) = bounded(MAX_ASYNC_REQUESTS); let exit_worker = Arc::new(AtomicBool::new(false)); let mut workers = Vec::default(); - - for i in 0..config.threads.unwrap() { + let is_startup_done = Arc::new(AtomicBool::new(false)); + let startup_done_count = Arc::new(AtomicUsize::new(0)); + let worker_count = config.threads.unwrap_or(DEFAULT_THREADS_COUNT); + let initialized_worker_count = Arc::new(AtomicUsize::new(0)); + for i in 0..worker_count { let cloned_receiver = receiver.clone(); let exit_clone = exit_worker.clone(); + let is_startup_done_clone = is_startup_done.clone(); + let startup_done_count_clone = startup_done_count.clone(); + let initialized_worker_count_clone = initialized_worker_count.clone(); let config = config.clone(); let worker = Builder::new() .name(format!("worker-{}", i)) .spawn(move || -> Result<(), AccountsDbPluginError> { - let mut worker = PostgresClientWorker::new(config)?; - worker.do_work(cloned_receiver, exit_clone)?; - Ok(()) + let result = PostgresClientWorker::new(config); + + match result { + Ok(mut worker) => { + initialized_worker_count_clone.fetch_add(1, Ordering::Relaxed); + worker.do_work( + cloned_receiver, + exit_clone, + is_startup_done_clone, + startup_done_count_clone, + )?; + Ok(()) + } + Err(err) => Err(err), + } }) .unwrap(); workers.push(worker); } + info!("Created ParallelPostgresClient."); Ok(Self { last_report: AtomicInterval::default(), workers, exit_worker, + is_startup_done, + startup_done_count, + initialized_worker_count, sender, }) } -} -impl PostgresClient for ParallelPostgresClient { - fn join(&mut self) -> thread::Result<()> { + pub fn join(&mut self) -> thread::Result<()> { self.exit_worker.store(true, Ordering::Relaxed); while !self.workers.is_empty() { let worker = self.workers.pop(); @@ -404,24 +650,36 @@ impl PostgresClient for ParallelPostgresClient { Ok(()) } - fn update_account( + pub fn update_account( &mut self, - account: &T, + account: &ReplicaAccountInfo, slot: u64, + is_startup: bool, ) -> Result<(), AccountsDbPluginError> { if self.last_report.should_update(30000) { - datapoint_info!( + datapoint_debug!( "postgres-plugin-stats", ("message-queue-length", self.sender.len() as i64, i64), ); } - if let Err(err) = self - .sender - .send(DbWorkItem::UpdateAccount(UpdateAccountRequest { - account: DbAccountInfo::new(account), - slot, - })) - { + let mut measure = Measure::start("accountsdb-plugin-posgres-create-work-item"); + let wrk_item = DbWorkItem::UpdateAccount(UpdateAccountRequest { + account: DbAccountInfo::new(account, slot), + is_startup, + }); + + measure.stop(); + + inc_new_counter_debug!( + "accountsdb-plugin-posgres-create-work-item-us", + measure.as_us() as usize, + 100000, + 100000 + ); + + let mut measure = Measure::start("accountsdb-plugin-posgres-send-msg"); + + if let Err(err) = self.sender.send(wrk_item) { return Err(AccountsDbPluginError::AccountsUpdateError { msg: format!( "Failed to update the account {:?}, error: {:?}", @@ -430,10 +688,19 @@ impl PostgresClient for ParallelPostgresClient { ), }); } + + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-posgres-send-msg-us", + measure.as_us() as usize, + 100000, + 100000 + ); + Ok(()) } - fn update_slot_status( + pub fn update_slot_status( &mut self, slot: u64, parent: Option, @@ -450,6 +717,30 @@ impl PostgresClient for ParallelPostgresClient { } Ok(()) } + + pub fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> { + info!("Notifying the end of startup"); + // Ensure all items in the queue has been received by the workers + while !self.sender.is_empty() { + sleep(Duration::from_millis(100)); + } + self.is_startup_done.store(true, Ordering::Relaxed); + + // Wait for all worker threads to be done with flushing + while self.startup_done_count.load(Ordering::Relaxed) + != self.initialized_worker_count.load(Ordering::Relaxed) + { + info!( + "Startup done count: {}, good worker thread count: {}", + self.startup_done_count.load(Ordering::Relaxed), + self.initialized_worker_count.load(Ordering::Relaxed) + ); + sleep(Duration::from_millis(100)); + } + + info!("Done with notifying the end of startup"); + Ok(()) + } } pub struct PostgresClientBuilder {} diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 0e890213fc..a80a327196 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -400,6 +400,8 @@ pub enum LoadedAccountAccessor<'a> { Cached(Option<(Pubkey, Cow<'a, CachedAccount>)>), } +mod accountsdb_plugin_utils; + impl<'a> LoadedAccountAccessor<'a> { fn check_and_get_loaded_account(&mut self) -> LoadedAccount { // all of these following .expect() and .unwrap() are like serious logic errors, @@ -6193,15 +6195,7 @@ impl AccountsDb { pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) { self.store(slot, accounts, self.caching_enabled); - if let Some(accounts_update_notifier) = &self.accounts_update_notifier { - let notifier = &accounts_update_notifier.read().unwrap(); - - for account in accounts { - let pubkey = account.0; - let account = account.1; - notifier.notify_account_update(slot, pubkey, account); - } - } + self.notify_account_at_accounts_update(slot, accounts); } /// Store the account update. @@ -7067,24 +7061,6 @@ impl AccountsDb { } } } - - pub fn notify_account_restore_from_snapshot(&self) { - if let Some(accounts_update_notifier) = &self.accounts_update_notifier { - let notifier = &accounts_update_notifier.read().unwrap(); - let slots = self.storage.all_slots(); - for slot in &slots { - let slot_stores = self.storage.get_slot_stores(*slot).unwrap(); - - let slot_stores = slot_stores.read().unwrap(); - for (_, storage_entry) in slot_stores.iter() { - let accounts = storage_entry.all_accounts(); - for account in &accounts { - notifier.notify_account_restore_from_snapshot(*slot, account); - } - } - } - } - } } #[cfg(test)] diff --git a/runtime/src/accounts_db/accountsdb_plugin_utils.rs b/runtime/src/accounts_db/accountsdb_plugin_utils.rs new file mode 100644 index 0000000000..2223c8d2e6 --- /dev/null +++ b/runtime/src/accounts_db/accountsdb_plugin_utils.rs @@ -0,0 +1,480 @@ +use { + crate::{accounts_db::AccountsDb, append_vec::StoredAccountMeta}, + solana_measure::measure::Measure, + solana_metrics::*, + solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey}, + std::collections::{hash_map::Entry, HashMap, HashSet}, +}; + +#[derive(Default)] +pub struct AccountsDbPluginNotifyAtSnapshotRestoreStats { + pub total_accounts: usize, + pub skipped_accounts: usize, + pub notified_accounts: usize, + pub elapsed_filtering_us: usize, + pub total_pure_notify: usize, + pub total_pure_bookeeping: usize, + pub elapsed_notifying_us: usize, +} + +impl AccountsDbPluginNotifyAtSnapshotRestoreStats { + pub fn report(&self) { + datapoint_info!( + "accountsdb_plugin_notify_account_restore_from_snapshot_summary", + ("total_accounts", self.total_accounts, i64), + ("skipped_accounts", self.skipped_accounts, i64), + ("notified_accounts", self.notified_accounts, i64), + ("elapsed_filtering_us", self.elapsed_filtering_us, i64), + ("elapsed_notifying_us", self.elapsed_notifying_us, i64), + ("total_pure_notify_us", self.total_pure_notify, i64), + ("total_pure_bookeeping_us", self.total_pure_bookeeping, i64), + ); + } +} + +impl AccountsDb { + /// Notify the plugins of of account data when AccountsDb is restored from a snapshot. The data is streamed + /// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated + /// multiple times only the last write (with highest write_version) is notified. + pub fn notify_account_restore_from_snapshot(&self) { + if self.accounts_update_notifier.is_none() { + return; + } + + let mut slots = self.storage.all_slots(); + let mut notified_accounts: HashSet = HashSet::default(); + let mut notify_stats = AccountsDbPluginNotifyAtSnapshotRestoreStats::default(); + + slots.sort_by(|a, b| b.cmp(a)); + for slot in slots { + self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats); + } + + let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap(); + let notifier = &accounts_update_notifier.read().unwrap(); + notifier.notify_end_of_restore_from_snapshot(); + notify_stats.report(); + } + + pub fn notify_account_at_accounts_update( + &self, + slot: Slot, + accounts: &[(&Pubkey, &AccountSharedData)], + ) { + if let Some(accounts_update_notifier) = &self.accounts_update_notifier { + let notifier = &accounts_update_notifier.read().unwrap(); + + for account in accounts { + let pubkey = account.0; + let account = account.1; + notifier.notify_account_update(slot, pubkey, account); + } + } + } + + fn notify_accounts_in_slot( + &self, + slot: Slot, + notified_accounts: &mut HashSet, + notify_stats: &mut AccountsDbPluginNotifyAtSnapshotRestoreStats, + ) { + let slot_stores = self.storage.get_slot_stores(slot).unwrap(); + + let slot_stores = slot_stores.read().unwrap(); + let mut accounts_to_stream: HashMap = HashMap::default(); + let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts"); + for (_, storage_entry) in slot_stores.iter() { + let mut accounts = storage_entry.all_accounts(); + let account_len = accounts.len(); + notify_stats.total_accounts += account_len; + accounts.drain(..).into_iter().for_each(|account| { + if notified_accounts.contains(&account.meta.pubkey) { + notify_stats.skipped_accounts += 1; + return; + } + match accounts_to_stream.entry(account.meta.pubkey) { + Entry::Occupied(mut entry) => { + let existing_account = entry.get(); + if account.meta.write_version > existing_account.meta.write_version { + entry.insert(account); + } else { + notify_stats.skipped_accounts += 1; + } + } + Entry::Vacant(entry) => { + entry.insert(account); + } + } + }); + } + measure_filter.stop(); + notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize; + + self.notify_filtered_accounts(slot, notified_accounts, &accounts_to_stream, notify_stats); + } + + fn notify_filtered_accounts( + &self, + slot: Slot, + notified_accounts: &mut HashSet, + accounts_to_stream: &HashMap, + notify_stats: &mut AccountsDbPluginNotifyAtSnapshotRestoreStats, + ) { + let notifier = self + .accounts_update_notifier + .as_ref() + .unwrap() + .read() + .unwrap(); + + let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts"); + for account in accounts_to_stream.values() { + let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts"); + notifier.notify_account_restore_from_snapshot(slot, account); + measure_pure_notify.stop(); + + notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize; + + let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping"); + notified_accounts.insert(account.meta.pubkey); + measure_bookkeep.stop(); + notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize; + } + notify_stats.notified_accounts += accounts_to_stream.len(); + measure_notify.stop(); + notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize; + } +} + +#[cfg(test)] +pub mod tests { + use { + crate::{ + accounts_db::AccountsDb, + accounts_update_notifier_interface::{ + AccountsUpdateNotifier, AccountsUpdateNotifierInterface, + }, + append_vec::StoredAccountMeta, + }, + dashmap::DashMap, + solana_sdk::{ + account::{AccountSharedData, ReadableAccount}, + clock::Slot, + pubkey::Pubkey, + }, + std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + }; + + impl AccountsDb { + pub fn set_accountsdb_plugin_notifer(&mut self, notifier: Option) { + self.accounts_update_notifier = notifier; + } + } + + #[derive(Debug, Default)] + struct AccountsDbTestPlugin { + pub accounts_at_snapshot_restore: DashMap>, + pub is_startup_done: AtomicBool, + } + + impl AccountsUpdateNotifierInterface for AccountsDbTestPlugin { + /// Notified when an account is updated at runtime, due to transaction activities + fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) { + self.accounts_at_snapshot_restore + .entry(*pubkey) + .or_insert(Vec::default()) + .push((slot, account.clone())); + } + + /// Notified when the AccountsDb is initialized at start when restored + /// from a snapshot. + fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) { + self.accounts_at_snapshot_restore + .entry(account.meta.pubkey) + .or_insert(Vec::default()) + .push((slot, account.clone_account())); + } + + /// Notified when a slot is optimistically confirmed + fn notify_slot_confirmed(&self, _slot: Slot, _parent: Option) {} + + /// Notified when a slot is marked frozen. + fn notify_slot_processed(&self, _slot: Slot, _parent: Option) {} + + /// Notified when a slot is rooted. + fn notify_slot_rooted(&self, _slot: Slot, _parent: Option) {} + + fn notify_end_of_restore_from_snapshot(&self) { + self.is_startup_done.store(true, Ordering::Relaxed); + } + } + + #[test] + fn test_notify_account_restore_from_snapshot_once_per_slot() { + let mut accounts = AccountsDb::new_single_for_tests(); + // Account with key1 is updated twice in the store -- should only get notified once. + let key1 = solana_sdk::pubkey::new_rand(); + let mut account1_lamports: u64 = 1; + let account1 = + AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner()); + let slot0 = 0; + accounts.store_uncached(slot0, &[(&key1, &account1)]); + + account1_lamports = 2; + let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner()); + accounts.store_uncached(slot0, &[(&key1, &account1)]); + let notifier = AccountsDbTestPlugin::default(); + + let key2 = solana_sdk::pubkey::new_rand(); + let account2_lamports: u64 = 100; + let account2 = + AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner()); + + accounts.store_uncached(slot0, &[(&key2, &account2)]); + + let notifier = Arc::new(RwLock::new(notifier)); + accounts.set_accountsdb_plugin_notifer(Some(notifier.clone())); + + accounts.notify_account_restore_from_snapshot(); + + let notifier = notifier.write().unwrap(); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key1) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0] + .1 + .lamports(), + account1_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0, + slot0 + ); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key2) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0] + .1 + .lamports(), + account2_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0, + slot0 + ); + + assert!(notifier.is_startup_done.load(Ordering::Relaxed)); + } + + #[test] + fn test_notify_account_restore_from_snapshot_once_across_slots() { + let mut accounts = AccountsDb::new_single_for_tests(); + // Account with key1 is updated twice in two different slots -- should only get notified once. + // Account with key2 is updated slot0, should get notified once + // Account with key3 is updated in slot1, should get notified once + let key1 = solana_sdk::pubkey::new_rand(); + let mut account1_lamports: u64 = 1; + let account1 = + AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner()); + let slot0 = 0; + accounts.store_uncached(slot0, &[(&key1, &account1)]); + + let key2 = solana_sdk::pubkey::new_rand(); + let account2_lamports: u64 = 200; + let account2 = + AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner()); + accounts.store_uncached(slot0, &[(&key2, &account2)]); + + account1_lamports = 2; + let slot1 = 1; + let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner()); + accounts.store_uncached(slot1, &[(&key1, &account1)]); + let notifier = AccountsDbTestPlugin::default(); + + let key3 = solana_sdk::pubkey::new_rand(); + let account3_lamports: u64 = 300; + let account3 = + AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner()); + accounts.store_uncached(slot1, &[(&key3, &account3)]); + + let notifier = Arc::new(RwLock::new(notifier)); + accounts.set_accountsdb_plugin_notifer(Some(notifier.clone())); + + accounts.notify_account_restore_from_snapshot(); + + let notifier = notifier.write().unwrap(); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key1) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0] + .1 + .lamports(), + account1_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0, + slot1 + ); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key2) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0] + .1 + .lamports(), + account2_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0, + slot0 + ); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key3) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0] + .1 + .lamports(), + account3_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0].0, + slot1 + ); + assert!(notifier.is_startup_done.load(Ordering::Relaxed)); + } + + #[test] + fn test_notify_account_at_accounts_update() { + let mut accounts = AccountsDb::new_single_for_tests(); + let notifier = AccountsDbTestPlugin::default(); + + let notifier = Arc::new(RwLock::new(notifier)); + accounts.set_accountsdb_plugin_notifer(Some(notifier.clone())); + + // Account with key1 is updated twice in two different slots -- should only get notified twice. + // Account with key2 is updated slot0, should get notified once + // Account with key3 is updated in slot1, should get notified once + let key1 = solana_sdk::pubkey::new_rand(); + let account1_lamports1: u64 = 1; + let account1 = + AccountSharedData::new(account1_lamports1, 1, AccountSharedData::default().owner()); + let slot0 = 0; + accounts.store_cached(slot0, &[(&key1, &account1)]); + + let key2 = solana_sdk::pubkey::new_rand(); + let account2_lamports: u64 = 200; + let account2 = + AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner()); + accounts.store_cached(slot0, &[(&key2, &account2)]); + + let account1_lamports2 = 2; + let slot1 = 1; + let account1 = AccountSharedData::new(account1_lamports2, 1, account1.owner()); + accounts.store_cached(slot1, &[(&key1, &account1)]); + + let key3 = solana_sdk::pubkey::new_rand(); + let account3_lamports: u64 = 300; + let account3 = + AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner()); + accounts.store_cached(slot1, &[(&key3, &account3)]); + + let notifier = notifier.write().unwrap(); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key1) + .unwrap() + .len(), + 2 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0] + .1 + .lamports(), + account1_lamports1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0, + slot0 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[1] + .1 + .lamports(), + account1_lamports2 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[1].0, + slot1 + ); + + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key2) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0] + .1 + .lamports(), + account2_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0, + slot0 + ); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key3) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0] + .1 + .lamports(), + account3_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0].0, + slot1 + ); + } +} diff --git a/runtime/src/accounts_update_notifier_interface.rs b/runtime/src/accounts_update_notifier_interface.rs index 70730ef01c..bfd62e9b15 100644 --- a/runtime/src/accounts_update_notifier_interface.rs +++ b/runtime/src/accounts_update_notifier_interface.rs @@ -12,6 +12,9 @@ pub trait AccountsUpdateNotifierInterface: std::fmt::Debug { /// from a snapshot. fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta); + /// Notified when all accounts have been notified when restoring from a snapshot. + fn notify_end_of_restore_from_snapshot(&self); + /// Notified when a slot is optimistically confirmed fn notify_slot_confirmed(&self, slot: Slot, parent: Option); diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index f744848d4f..0ac7719b6c 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -44,6 +44,7 @@ use { atomic::{AtomicUsize, Ordering}, Arc, RwLock, }, + thread::Builder, }, }; @@ -534,11 +535,22 @@ where accounts_db .write_version .fetch_add(snapshot_version, Ordering::Relaxed); + + let mut measure_notify = Measure::start("accounts_notify"); + + let accounts_db = Arc::new(accounts_db); + let accoounts_db_clone = accounts_db.clone(); + let handle = Builder::new() + .name("notify_account_restore_from_snapshot".to_string()) + .spawn(move || { + accoounts_db_clone.notify_account_restore_from_snapshot(); + }) + .unwrap(); + accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index); accounts_db.maybe_add_filler_accounts(genesis_config.ticks_per_slot()); - let mut measure_notify = Measure::start("accounts_notify"); - accounts_db.notify_account_restore_from_snapshot(); + handle.join().unwrap(); measure_notify.stop(); datapoint_info!( @@ -552,5 +564,5 @@ where ("accountsdb-notify-at-start-us", measure_notify.as_us(), i64), ); - Ok(accounts_db) + Ok(Arc::try_unwrap(accounts_db).unwrap()) }