From f14365f4b46b88c5d4e10e103f4ced55b10a9b49 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Sun, 24 Oct 2021 12:43:33 -0700 Subject: [PATCH] Accountsdb plugin postgres -- bulk insertion at startup (#20763) Use bulk insertion to Postgres at startup to reduce time taken for initial snapshot restore for postgres plugin. Avoid duplicate writes of accounts at startup. Doing account plugin notification and indexing in parallel. Improved error handling for postgres plugin to show the real db issues for debug purpose Added more metrics for postgres plugin. Refactored plugin centric code out to a sub module from accounts_db and added unit tests --- Cargo.lock | 18 +- .../src/accountsdb_plugin_interface.rs | 20 +- .../src/accounts_update_notifier.rs | 91 +++- accountsdb-plugin-postgres/Cargo.toml | 3 +- .../src/accountsdb_plugin_postgres.rs | 120 +++-- .../src/postgres_client.rs | 469 +++++++++++++---- runtime/src/accounts_db.rs | 30 +- .../accounts_db/accountsdb_plugin_utils.rs | 480 ++++++++++++++++++ .../src/accounts_update_notifier_interface.rs | 3 + runtime/src/serde_snapshot.rs | 18 +- 10 files changed, 1064 insertions(+), 188 deletions(-) create mode 100644 runtime/src/accounts_db/accountsdb_plugin_utils.rs diff --git a/Cargo.lock b/Cargo.lock index 01e4ebfe2..036881942 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2949,18 +2949,18 @@ dependencies = [ [[package]] name = "phf" -version = "0.8.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12" +checksum = "b9fc3db1018c4b59d7d582a739436478b6035138b6aecbce989fc91c3e98409f" dependencies = [ "phf_shared", ] [[package]] name = "phf_shared" -version = "0.8.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7" +checksum = 
"b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" dependencies = [ "siphasher", ] @@ -3076,9 +3076,9 @@ dependencies = [ [[package]] name = "postgres-types" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f" +checksum = "04619f94ba0cc80999f4fc7073607cb825bc739a883cb6d20900fc5e009d6b0d" dependencies = [ "bytes 1.0.1", "chrono", @@ -4287,9 +4287,11 @@ dependencies = [ "serde_json", "solana-accountsdb-plugin-interface", "solana-logger 1.9.0", + "solana-measure", "solana-metrics", "solana-sdk", "thiserror", + "tokio-postgres", ] [[package]] @@ -6452,9 +6454,9 @@ dependencies = [ [[package]] name = "tokio-postgres" -version = "0.7.2" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2b1383c7e4fb9a09e292c7c6afb7da54418d53b045f1c1fac7a911411a2b8b" +checksum = "2f916ee7e52c8a74dfe4162dd73a073d0d7d4b387ea7b97a774c0c10b0776531" dependencies = [ "async-trait", "byteorder", diff --git a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs index b75914bf3..6e5c8637c 100644 --- a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs +++ b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs @@ -25,19 +25,19 @@ pub enum ReplicaAccountInfoVersions<'a> { #[derive(Error, Debug)] pub enum AccountsDbPluginError { - #[error("Error opening config file.")] + #[error("Error opening config file. Error detail: ({0}).")] ConfigFileOpenError(#[from] io::Error), - #[error("Error reading config file.")] + #[error("Error reading config file. Error message: ({msg})")] ConfigFileReadError { msg: String }, - #[error("Error updating account.")] + #[error("Error updating account. 
Error message: ({msg})")] AccountsUpdateError { msg: String }, - #[error("Error updating slot status.")] + #[error("Error updating slot status. Error message: ({msg})")] SlotStatusUpdateError { msg: String }, - #[error("Plugin-defined custom error.")] + #[error("Plugin-defined custom error. Error message: ({0})")] Custom(Box), } @@ -78,7 +78,15 @@ pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug { fn on_unload(&mut self) {} /// Called when an account is updated at a slot. - fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()>; + fn update_account( + &mut self, + account: ReplicaAccountInfoVersions, + slot: u64, + is_startup: bool, + ) -> Result<()>; + + /// Called when all accounts have been notified during startup. + fn notify_end_of_startup(&mut self) -> Result<()>; /// Called when a slot status is updated fn update_slot_status( diff --git a/accountsdb-plugin-manager/src/accounts_update_notifier.rs b/accountsdb-plugin-manager/src/accounts_update_notifier.rs index dc1204703..7a835e09f 100644 --- a/accountsdb-plugin-manager/src/accounts_update_notifier.rs +++ b/accountsdb-plugin-manager/src/accounts_update_notifier.rs @@ -26,13 +26,65 @@ pub(crate) struct AccountsUpdateNotifierImpl { impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) { if let Some(account_info) = self.accountinfo_from_shared_account_data(pubkey, account) { - self.notify_plugins_of_account_update(account_info, slot); + self.notify_plugins_of_account_update(account_info, slot, false); } } fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) { - if let Some(account_info) = self.accountinfo_from_stored_account_meta(account) { - self.notify_plugins_of_account_update(account_info, slot); + let mut measure_all = Measure::start("accountsdb-plugin-notify-account-restore-all"); + let mut measure_copy =
Measure::start("accountsdb-plugin-copy-stored-account-info"); + + let account = self.accountinfo_from_stored_account_meta(account); + measure_copy.stop(); + + inc_new_counter_debug!( + "accountsdb-plugin-copy-stored-account-info-us", + measure_copy.as_us() as usize, + 100000, + 100000 + ); + + if let Some(account_info) = account { + self.notify_plugins_of_account_update(account_info, slot, true); + } + measure_all.stop(); + + inc_new_counter_debug!( + "accountsdb-plugin-notify-account-restore-all-us", + measure_all.as_us() as usize, + 100000, + 100000 + ); + } + + fn notify_end_of_restore_from_snapshot(&self) { + let mut plugin_manager = self.plugin_manager.write().unwrap(); + if plugin_manager.plugins.is_empty() { + return; + } + + for plugin in plugin_manager.plugins.iter_mut() { + let mut measure = Measure::start("accountsdb-plugin-end-of-restore-from-snapshot"); + match plugin.notify_end_of_startup() { + Err(err) => { + error!( + "Failed to notify the end of restore from snapshot, error: {} to plugin {}", + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully notified the end of restore from snapshot to plugin {}", + plugin.name() + ); + } + } + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-end-of-restore-from-snapshot", + measure.as_us() as usize + ); } } @@ -83,7 +135,13 @@ impl AccountsUpdateNotifierImpl { }) } - fn notify_plugins_of_account_update(&self, account: ReplicaAccountInfo, slot: Slot) { + fn notify_plugins_of_account_update( + &self, + account: ReplicaAccountInfo, + slot: Slot, + is_startup: bool, + ) { + let mut measure2 = Measure::start("accountsdb-plugin-notify_plugins_of_account_update"); let mut plugin_manager = self.plugin_manager.write().unwrap(); if plugin_manager.plugins.is_empty() { @@ -91,7 +149,11 @@ impl AccountsUpdateNotifierImpl { } for plugin in plugin_manager.plugins.iter_mut() { let mut measure = Measure::start("accountsdb-plugin-update-account"); - match 
plugin.update_account(ReplicaAccountInfoVersions::V0_0_1(&account), slot) { + match plugin.update_account( + ReplicaAccountInfoVersions::V0_0_1(&account), + slot, + is_startup, + ) { Err(err) => { error!( "Failed to update account {} at slot {}, error: {} to plugin {}", @@ -111,13 +173,20 @@ impl AccountsUpdateNotifierImpl { } } measure.stop(); - inc_new_counter_info!( - "accountsdb-plugin-update-account-ms", - measure.as_ms() as usize, + inc_new_counter_debug!( + "accountsdb-plugin-update-account-us", + measure.as_us() as usize, 100000, 100000 ); } + measure2.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-notify_plugins_of_account_update-us", + measure2.as_us() as usize, + 100000, + 100000 + ); } pub fn notify_slot_status(&self, slot: Slot, parent: Option, slot_status: SlotStatus) { @@ -146,9 +215,9 @@ impl AccountsUpdateNotifierImpl { } } measure.stop(); - inc_new_counter_info!( - "accountsdb-plugin-update-slot-ms", - measure.as_ms() as usize, + inc_new_counter_debug!( + "accountsdb-plugin-update-slot-us", + measure.as_us() as usize, 1000, 1000 ); diff --git a/accountsdb-plugin-postgres/Cargo.toml b/accountsdb-plugin-postgres/Cargo.toml index e9e2d86a7..434383d3d 100644 --- a/accountsdb-plugin-postgres/Cargo.toml +++ b/accountsdb-plugin-postgres/Cargo.toml @@ -23,9 +23,10 @@ serde_derive = "1.0.103" serde_json = "1.0.67" solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.9.0" } solana-logger = { path = "../logger", version = "=1.9.0" } +solana-measure = { path = "../measure", version = "=1.9.0" } solana-metrics = { path = "../metrics", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" } thiserror = "1.0.30" - +tokio-postgres = "0.7.3" [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs index 950e8b7ed..8dfa719e4 100644 --- 
a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -1,10 +1,10 @@ +use solana_measure::measure::Measure; + /// Main entry for the PostgreSQL plugin use { crate::{ accounts_selector::AccountsSelector, - postgres_client::{ - ParallelPostgresClient, PostgresClient, PostgresClientBuilder, SimplePostgresClient, - }, + postgres_client::{ParallelPostgresClient, PostgresClientBuilder}, }, bs58, log::*, @@ -13,19 +13,14 @@ use { solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, Result, SlotStatus, }, + solana_metrics::*, std::{fs::File, io::Read}, thiserror::Error, }; -#[allow(clippy::large_enum_variant)] -enum PostgresClientEnum { - Simple(SimplePostgresClient), - Parallel(ParallelPostgresClient), -} - #[derive(Default)] pub struct AccountsDbPluginPostgres { - client: Option, + client: Option, accounts_selector: Option, } @@ -41,14 +36,15 @@ pub struct AccountsDbPluginPostgresConfig { pub user: String, pub threads: Option, pub port: Option, + pub batch_size: Option, } #[derive(Error, Debug)] pub enum AccountsDbPluginPostgresError { - #[error("Error connecting to the backend data store.")] + #[error("Error connecting to the backend data store. Error message: ({msg})")] DataStoreConnectionError { msg: String }, - #[error("Error preparing data store schema.")] + #[error("Error preparing data store schema. Error message: ({msg})")] DataSchemaError { msg: String }, } @@ -79,7 +75,9 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { /// "host" specifies the PostgreSQL server. /// "user" specifies the PostgreSQL user. /// "threads" optional, specifies the number of worker threads for the plugin. A thread - /// maintains a PostgreSQL connection to the server. + /// maintains a PostgreSQL connection to the server. The default is 100.
+ /// "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created + /// from restoring a snapshot. The default is "10". /// # Examples /// { /// "libpath": "/home/solana/target/release/libsolana_accountsdb_plugin_postgres.so", @@ -116,13 +114,8 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { }) } Ok(config) => { - self.client = if config.threads.is_some() && config.threads.unwrap() > 1 { - let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?; - Some(PostgresClientEnum::Parallel(client)) - } else { - let client = PostgresClientBuilder::build_simple_postgres_client(&config)?; - Some(PostgresClientEnum::Simple(client)) - }; + let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?; + self.client = Some(client); } } @@ -134,18 +127,23 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { match &mut self.client { None => {} - Some(client) => match client { - PostgresClientEnum::Parallel(client) => client.join().unwrap(), - PostgresClientEnum::Simple(client) => { - client.join().unwrap(); - } - }, + Some(client) => { + client.join().unwrap(); + } } } - fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()> { + fn update_account( + &mut self, + account: ReplicaAccountInfoVersions, + slot: u64, + is_startup: bool, + ) -> Result<()> { + let mut measure_all = Measure::start("accountsdb-plugin-postgres-update-account-main"); match account { ReplicaAccountInfoVersions::V0_0_1(account) => { + let mut measure_select = + Measure::start("accountsdb-plugin-postgres-update-account-select"); if let Some(accounts_selector) = &self.accounts_selector { if !accounts_selector.is_account_selected(account.pubkey, account.owner) { return Ok(()); @@ -153,6 +151,13 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { } else { return Ok(()); } + measure_select.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-postgres-update-account-select-us", + 
measure_select.as_us() as usize, + 100000, + 100000 + ); debug!( "Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}", @@ -172,14 +177,17 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { ))); } Some(client) => { - let result = match client { - PostgresClientEnum::Parallel(client) => { - client.update_account(account, slot) - } - PostgresClientEnum::Simple(client) => { - client.update_account(account, slot) - } - }; + let mut measure_update = + Measure::start("accountsdb-plugin-postgres-update-account-client"); + let result = { client.update_account(account, slot, is_startup) }; + measure_update.stop(); + + inc_new_counter_debug!( + "accountsdb-plugin-postgres-update-account-client-us", + measure_update.as_us() as usize, + 100000, + 100000 + ); if let Err(err) = result { return Err(AccountsDbPluginError::AccountsUpdateError { @@ -190,6 +198,16 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { } } } + + measure_all.stop(); + + inc_new_counter_debug!( + "accountsdb-plugin-postgres-update-account-main-us", + measure_all.as_us() as usize, + 100000, + 100000 + ); + Ok(()) } @@ -210,14 +228,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { ))); } Some(client) => { - let result = match client { - PostgresClientEnum::Parallel(client) => { - client.update_slot_status(slot, parent, status) - } - PostgresClientEnum::Simple(client) => { - client.update_slot_status(slot, parent, status) - } - }; + let result = client.update_slot_status(slot, parent, status); if let Err(err) = result { return Err(AccountsDbPluginError::SlotStatusUpdateError{ @@ -229,6 +240,29 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { Ok(()) } + + fn notify_end_of_startup(&mut self) -> Result<()> { + info!("Notifying the end of startup for accounts notifications"); + match &mut self.client { + None => { + return Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { + msg: "There is no connection to the 
PostgreSQL database.".to_string(), + }, + ))); + } + Some(client) => { + let result = client.notify_end_of_startup(); + + if let Err(err) = result { + return Err(AccountsDbPluginError::SlotStatusUpdateError{ + msg: format!("Failed to notify the end of startup for accounts notifications. Error: {:?}", err) + }); + } + } + } + Ok(()) + } } impl AccountsDbPluginPostgres { diff --git a/accountsdb-plugin-postgres/src/postgres_client.rs b/accountsdb-plugin-postgres/src/postgres_client.rs index 05e3043e2..1474a6858 100644 --- a/accountsdb-plugin-postgres/src/postgres_client.rs +++ b/accountsdb-plugin-postgres/src/postgres_client.rs @@ -1,3 +1,5 @@ +#![allow(clippy::integer_arithmetic)] + /// A concurrent implementation for writing accounts into the PostgreSQL in parallel. use { crate::accountsdb_plugin_postgres::{ @@ -10,34 +12,44 @@ use { solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ AccountsDbPluginError, ReplicaAccountInfo, SlotStatus, }, - solana_metrics::datapoint_info, + solana_measure::measure::Measure, + solana_metrics::*, solana_sdk::timing::AtomicInterval, std::{ sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, Mutex, }, - thread::{self, Builder, JoinHandle}, + thread::{self, sleep, Builder, JoinHandle}, time::Duration, }, + tokio_postgres::types::ToSql, }; /// The maximum asynchronous requests allowed in the channel to avoid excessive /// memory usage. The downside -- calls after this threshold is reached can get blocked. 
-const MAX_ASYNC_REQUESTS: usize = 10240; +const MAX_ASYNC_REQUESTS: usize = 40960; const DEFAULT_POSTGRES_PORT: u16 = 5432; +const DEFAULT_THREADS_COUNT: usize = 100; +const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10; +const ACCOUNT_COLUMN_COUNT: usize = 8; struct PostgresSqlClientWrapper { client: Client, update_account_stmt: Statement, + bulk_account_insert_stmt: Statement, } pub struct SimplePostgresClient { + batch_size: usize, + pending_account_updates: Vec, client: Mutex, } struct PostgresClientWorker { client: SimplePostgresClient, + /// Indicating if accounts notification during startup is done. + is_startup_done: bool, } impl Eq for DbAccountInfo {} @@ -45,23 +57,25 @@ impl Eq for DbAccountInfo {} #[derive(Clone, PartialEq, Debug)] pub struct DbAccountInfo { pub pubkey: Vec, - pub lamports: u64, + pub lamports: i64, pub owner: Vec, pub executable: bool, - pub rent_epoch: u64, + pub rent_epoch: i64, pub data: Vec, + pub slot: i64, } impl DbAccountInfo { - fn new(account: &T) -> DbAccountInfo { + fn new(account: &T, slot: u64) -> DbAccountInfo { let data = account.data().to_vec(); Self { pubkey: account.pubkey().to_vec(), - lamports: account.lamports(), + lamports: account.lamports() as i64, owner: account.owner().to_vec(), executable: account.executable(), - rent_epoch: account.rent_epoch(), + rent_epoch: account.rent_epoch() as i64, data, + slot: slot as i64, } } } @@ -69,9 +83,9 @@ impl DbAccountInfo { pub trait ReadableAccountInfo: Sized { fn pubkey(&self) -> &[u8]; fn owner(&self) -> &[u8]; - fn lamports(&self) -> u64; + fn lamports(&self) -> i64; fn executable(&self) -> bool; - fn rent_epoch(&self) -> u64; + fn rent_epoch(&self) -> i64; fn data(&self) -> &[u8]; } @@ -84,7 +98,7 @@ impl ReadableAccountInfo for DbAccountInfo { &self.owner } - fn lamports(&self) -> u64 { + fn lamports(&self) -> i64 { self.lamports } @@ -92,7 +106,7 @@ impl ReadableAccountInfo for DbAccountInfo { self.executable } - fn rent_epoch(&self) -> u64 { + fn rent_epoch(&self) 
-> i64 { self.rent_epoch } @@ -110,16 +124,16 @@ impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> { self.owner } - fn lamports(&self) -> u64 { - self.lamports + fn lamports(&self) -> i64 { + self.lamports as i64 } fn executable(&self) -> bool { self.executable } - fn rent_epoch(&self) -> u64 { - self.rent_epoch + fn rent_epoch(&self) -> i64 { + self.rent_epoch as i64 } fn data(&self) -> &[u8] { @@ -132,10 +146,10 @@ pub trait PostgresClient { Ok(()) } - fn update_account( + fn update_account( &mut self, - account: &T, - slot: u64, + account: DbAccountInfo, + is_startup: bool, ) -> Result<(), AccountsDbPluginError>; fn update_slot_status( @@ -144,73 +158,120 @@ pub trait PostgresClient { parent: Option, status: SlotStatus, ) -> Result<(), AccountsDbPluginError>; + + fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError>; } impl SimplePostgresClient { - pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result { + fn connect_to_db( + config: &AccountsDbPluginPostgresConfig, + ) -> Result { let port = config.port.unwrap_or(DEFAULT_POSTGRES_PORT); let connection_str = format!("host={} user={} port={}", config.host, config.user, port); match Client::connect(&connection_str, NoTls) { Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataStoreConnectionError { - msg: format!( + let msg = format!( "Error in connecting to the PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, connection_str + err, config.host, config.user, connection_str); + error!("{}", msg); + Err(AccountsDbPluginError::Custom(Box::new( + AccountsDbPluginPostgresError::DataStoreConnectionError { msg }, + ))) + } + Ok(client) => Ok(client), + } + } + + fn build_bulk_account_insert_statement( + client: &mut Client, + config: &AccountsDbPluginPostgresConfig, + ) -> Result { + let batch_size = config + .batch_size + .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE); + let mut stmt = 
String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) VALUES"); + for j in 0..batch_size { + let row = j * ACCOUNT_COLUMN_COUNT; + let val_str = format!( + "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})", + row + 1, + row + 2, + row + 3, + row + 4, + row + 5, + row + 6, + row + 7, + row + 8, + ); + + if j == 0 { + stmt = format!("{} {}", &stmt, val_str); + } else { + stmt = format!("{}, {}", &stmt, val_str); + } + } + + let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \ + data=excluded.data, updated_on=excluded.updated_on WHERE acct.slot <= excluded.slot"; + + stmt = format!("{} {}", stmt, handle_conflict); + + info!("{}", stmt); + let bulk_stmt = client.prepare(&stmt); + + match bulk_stmt { + Err(err) => { + return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the accounts update PostgreSQL database: {} host: {} user: {} config: {:?}", + err, config.host, config.user, config ), }))); } - Ok(mut client) => { - let result = client.prepare("INSERT INTO account (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ - ON CONFLICT (pubkey) DO UPDATE SET slot=$2, owner=$3, lamports=$4, executable=$5, rent_epoch=$6, \ - data=$7, updated_on=$8"); - - match result { - Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { - msg: format!( - "Error in preparing for the accounts update PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, connection_str - ), - }))); - } - Ok(update_account_stmt) => Ok(Self { - client: Mutex::new(PostgresSqlClientWrapper { - client, - update_account_stmt, - }), - }), - } - } + 
Ok(update_account_stmt) => Ok(update_account_stmt), } } -} -impl PostgresClient for SimplePostgresClient { - fn update_account( - &mut self, - account: &T, - slot: u64, + fn build_single_account_upsert_statement( + client: &mut Client, + config: &AccountsDbPluginPostgresConfig, + ) -> Result { + let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \ + ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \ + data=excluded.data, updated_on=excluded.updated_on WHERE acct.slot <= excluded.slot"; + + let stmt = client.prepare(stmt); + + match stmt { + Err(err) => { + return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the accounts update PostgreSQL database: {} host: {} user: {} config: {:?}", + err, config.host, config.user, config + ), + }))); + } + Ok(update_account_stmt) => Ok(update_account_stmt), + } + } + + /// Internal function for updating or inserting a single account + fn upsert_account_internal( + account: &DbAccountInfo, + statement: &Statement, + client: &mut Client, ) -> Result<(), AccountsDbPluginError> { - trace!( - "Updating account {} with owner {} at slot {}", - bs58::encode(account.pubkey()).into_string(), - bs58::encode(account.owner()).into_string(), - slot, - ); - - let slot = slot as i64; // postgres only supports i64 let lamports = account.lamports() as i64; let rent_epoch = account.rent_epoch() as i64; let updated_on = Utc::now().naive_utc(); - let client = self.client.get_mut().unwrap(); - let result = client.client.query( - &client.update_account_stmt, + let result = client.query( + statement, &[ &account.pubkey(), - &slot, + &account.slot, &account.owner(), &lamports, &account.executable(), @@ -228,9 +289,141 @@ impl PostgresClient for 
SimplePostgresClient { error!("{}", msg); return Err(AccountsDbPluginError::AccountsUpdateError { msg }); } + Ok(()) } + /// Update or insert a single account + fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> { + let client = self.client.get_mut().unwrap(); + let statement = &client.update_account_stmt; + let client = &mut client.client; + Self::upsert_account_internal(account, statement, client) + } + + /// Insert accounts in batch to reduce network overhead + fn insert_accounts_in_batch( + &mut self, + account: DbAccountInfo, + ) -> Result<(), AccountsDbPluginError> { + self.pending_account_updates.push(account); + + if self.pending_account_updates.len() == self.batch_size { + let mut measure = Measure::start("accountsdb-plugin-postgres-prepare-values"); + + let mut values: Vec<&(dyn ToSql + Sync)> = + Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT); + let updated_on = Utc::now().naive_utc(); + for j in 0..self.batch_size { + let account = &self.pending_account_updates[j]; + + values.push(&account.pubkey); + values.push(&account.slot); + values.push(&account.owner); + values.push(&account.lamports); + values.push(&account.executable); + values.push(&account.rent_epoch); + values.push(&account.data); + values.push(&updated_on); + } + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-postgres-prepare-values-us", + measure.as_us() as usize, + 10000, + 10000 + ); + + let mut measure = Measure::start("accountsdb-plugin-postgres-update-account"); + let client = self.client.get_mut().unwrap(); + let result = client + .client + .query(&client.bulk_account_insert_stmt, &values); + + self.pending_account_updates.clear(); + if let Err(err) = result { + let msg = format!( + "Failed to persist the update of account to the PostgreSQL database. 
Error: {:?}", + err + ); + error!("{}", msg); + return Err(AccountsDbPluginError::AccountsUpdateError { msg }); + } + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-postgres-update-account-us", + measure.as_us() as usize, + 10000, + 10000 + ); + inc_new_counter_debug!( + "accountsdb-plugin-postgres-update-account-count", + self.batch_size, + 10000, + 10000 + ); + } + Ok(()) + } + + /// Flush any left over accounts in batch which are not processed in the last batch + fn flush_buffered_writes(&mut self) -> Result<(), AccountsDbPluginError> { + if self.pending_account_updates.is_empty() { + return Ok(()); + } + + let client = self.client.get_mut().unwrap(); + let statement = &client.update_account_stmt; + let client = &mut client.client; + + for account in self.pending_account_updates.drain(..) { + Self::upsert_account_internal(&account, statement, client)?; + } + + Ok(()) + } + + pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result { + info!("Creating SimplePostgresClient..."); + let mut client = Self::connect_to_db(config)?; + let bulk_account_insert_stmt = + Self::build_bulk_account_insert_statement(&mut client, config)?; + let update_account_stmt = Self::build_single_account_upsert_statement(&mut client, config)?; + + let batch_size = config + .batch_size + .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE); + info!("Created SimplePostgresClient."); + Ok(Self { + batch_size, + pending_account_updates: Vec::with_capacity(batch_size), + client: Mutex::new(PostgresSqlClientWrapper { + client, + update_account_stmt, + bulk_account_insert_stmt, + }), + }) + } +} + +impl PostgresClient for SimplePostgresClient { + fn update_account( + &mut self, + account: DbAccountInfo, + is_startup: bool, + ) -> Result<(), AccountsDbPluginError> { + trace!( + "Updating account {} with owner {} at slot {}", + bs58::encode(account.pubkey()).into_string(), + bs58::encode(account.owner()).into_string(), + account.slot, + ); + if !is_startup { + return 
self.upsert_account(&account); + } + self.insert_accounts_in_batch(account) + } + fn update_slot_status( &mut self, slot: u64, @@ -289,11 +482,15 @@ impl PostgresClient for SimplePostgresClient { Ok(()) } + + fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> { + self.flush_buffered_writes() + } } struct UpdateAccountRequest { account: DbAccountInfo, - slot: u64, + is_startup: bool, } struct UpdateSlotRequest { @@ -309,22 +506,41 @@ enum DbWorkItem { impl PostgresClientWorker { fn new(config: AccountsDbPluginPostgresConfig) -> Result { - let client = SimplePostgresClient::new(&config)?; - Ok(PostgresClientWorker { client }) + let result = SimplePostgresClient::new(&config); + match result { + Ok(client) => Ok(PostgresClientWorker { + client, + is_startup_done: false, + }), + Err(err) => { + error!("Error in creating SimplePostgresClient: {}", err); + Err(err) + } + } } fn do_work( &mut self, receiver: Receiver, exit_worker: Arc, + is_startup_done: Arc, + startup_done_count: Arc, ) -> Result<(), AccountsDbPluginError> { while !exit_worker.load(Ordering::Relaxed) { + let mut measure = Measure::start("accountsdb-plugin-postgres-worker-recv"); let work = receiver.recv_timeout(Duration::from_millis(500)); - + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-postgres-worker-recv-us", + measure.as_us() as usize, + 100000, + 100000 + ); match work { Ok(work) => match work { DbWorkItem::UpdateAccount(request) => { - self.client.update_account(&request.account, request.slot)?; + self.client + .update_account(request.account, request.is_startup)?; } DbWorkItem::UpdateSlot(request) => { self.client.update_slot_status( @@ -336,6 +552,12 @@ impl PostgresClientWorker { }, Err(err) => match err { RecvTimeoutError::Timeout => { + if !self.is_startup_done && is_startup_done.load(Ordering::Relaxed) { + self.client.notify_end_of_startup()?; + self.is_startup_done = true; + startup_done_count.fetch_add(1, Ordering::Relaxed); + } + continue; } _ => { 
@@ -351,43 +573,67 @@ impl PostgresClientWorker { pub struct ParallelPostgresClient { workers: Vec>>, exit_worker: Arc, + is_startup_done: Arc, + startup_done_count: Arc, + initialized_worker_count: Arc, sender: Sender, last_report: AtomicInterval, } impl ParallelPostgresClient { pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result { + info!("Creating ParallelPostgresClient..."); let (sender, receiver) = bounded(MAX_ASYNC_REQUESTS); let exit_worker = Arc::new(AtomicBool::new(false)); let mut workers = Vec::default(); - - for i in 0..config.threads.unwrap() { + let is_startup_done = Arc::new(AtomicBool::new(false)); + let startup_done_count = Arc::new(AtomicUsize::new(0)); + let worker_count = config.threads.unwrap_or(DEFAULT_THREADS_COUNT); + let initialized_worker_count = Arc::new(AtomicUsize::new(0)); + for i in 0..worker_count { let cloned_receiver = receiver.clone(); let exit_clone = exit_worker.clone(); + let is_startup_done_clone = is_startup_done.clone(); + let startup_done_count_clone = startup_done_count.clone(); + let initialized_worker_count_clone = initialized_worker_count.clone(); let config = config.clone(); let worker = Builder::new() .name(format!("worker-{}", i)) .spawn(move || -> Result<(), AccountsDbPluginError> { - let mut worker = PostgresClientWorker::new(config)?; - worker.do_work(cloned_receiver, exit_clone)?; - Ok(()) + let result = PostgresClientWorker::new(config); + + match result { + Ok(mut worker) => { + initialized_worker_count_clone.fetch_add(1, Ordering::Relaxed); + worker.do_work( + cloned_receiver, + exit_clone, + is_startup_done_clone, + startup_done_count_clone, + )?; + Ok(()) + } + Err(err) => Err(err), + } }) .unwrap(); workers.push(worker); } + info!("Created ParallelPostgresClient."); Ok(Self { last_report: AtomicInterval::default(), workers, exit_worker, + is_startup_done, + startup_done_count, + initialized_worker_count, sender, }) } -} -impl PostgresClient for ParallelPostgresClient { - fn join(&mut self) -> 
thread::Result<()> { + pub fn join(&mut self) -> thread::Result<()> { self.exit_worker.store(true, Ordering::Relaxed); while !self.workers.is_empty() { let worker = self.workers.pop(); @@ -404,24 +650,36 @@ impl PostgresClient for ParallelPostgresClient { Ok(()) } - fn update_account( + pub fn update_account( &mut self, - account: &T, + account: &ReplicaAccountInfo, slot: u64, + is_startup: bool, ) -> Result<(), AccountsDbPluginError> { if self.last_report.should_update(30000) { - datapoint_info!( + datapoint_debug!( "postgres-plugin-stats", ("message-queue-length", self.sender.len() as i64, i64), ); } - if let Err(err) = self - .sender - .send(DbWorkItem::UpdateAccount(UpdateAccountRequest { - account: DbAccountInfo::new(account), - slot, - })) - { + let mut measure = Measure::start("accountsdb-plugin-posgres-create-work-item"); + let wrk_item = DbWorkItem::UpdateAccount(UpdateAccountRequest { + account: DbAccountInfo::new(account, slot), + is_startup, + }); + + measure.stop(); + + inc_new_counter_debug!( + "accountsdb-plugin-posgres-create-work-item-us", + measure.as_us() as usize, + 100000, + 100000 + ); + + let mut measure = Measure::start("accountsdb-plugin-posgres-send-msg"); + + if let Err(err) = self.sender.send(wrk_item) { return Err(AccountsDbPluginError::AccountsUpdateError { msg: format!( "Failed to update the account {:?}, error: {:?}", @@ -430,10 +688,19 @@ impl PostgresClient for ParallelPostgresClient { ), }); } + + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-posgres-send-msg-us", + measure.as_us() as usize, + 100000, + 100000 + ); + Ok(()) } - fn update_slot_status( + pub fn update_slot_status( &mut self, slot: u64, parent: Option, @@ -450,6 +717,30 @@ impl PostgresClient for ParallelPostgresClient { } Ok(()) } + + pub fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> { + info!("Notifying the end of startup"); + // Ensure all items in the queue has been received by the workers + while 
!self.sender.is_empty() { + sleep(Duration::from_millis(100)); + } + self.is_startup_done.store(true, Ordering::Relaxed); + + // Wait for all worker threads to be done with flushing + while self.startup_done_count.load(Ordering::Relaxed) + != self.initialized_worker_count.load(Ordering::Relaxed) + { + info!( + "Startup done count: {}, good worker thread count: {}", + self.startup_done_count.load(Ordering::Relaxed), + self.initialized_worker_count.load(Ordering::Relaxed) + ); + sleep(Duration::from_millis(100)); + } + + info!("Done with notifying the end of startup"); + Ok(()) + } } pub struct PostgresClientBuilder {} diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 0e890213f..a80a32719 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -400,6 +400,8 @@ pub enum LoadedAccountAccessor<'a> { Cached(Option<(Pubkey, Cow<'a, CachedAccount>)>), } +mod accountsdb_plugin_utils; + impl<'a> LoadedAccountAccessor<'a> { fn check_and_get_loaded_account(&mut self) -> LoadedAccount { // all of these following .expect() and .unwrap() are like serious logic errors, @@ -6193,15 +6195,7 @@ impl AccountsDb { pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) { self.store(slot, accounts, self.caching_enabled); - if let Some(accounts_update_notifier) = &self.accounts_update_notifier { - let notifier = &accounts_update_notifier.read().unwrap(); - - for account in accounts { - let pubkey = account.0; - let account = account.1; - notifier.notify_account_update(slot, pubkey, account); - } - } + self.notify_account_at_accounts_update(slot, accounts); } /// Store the account update. 
@@ -7067,24 +7061,6 @@ impl AccountsDb { } } } - - pub fn notify_account_restore_from_snapshot(&self) { - if let Some(accounts_update_notifier) = &self.accounts_update_notifier { - let notifier = &accounts_update_notifier.read().unwrap(); - let slots = self.storage.all_slots(); - for slot in &slots { - let slot_stores = self.storage.get_slot_stores(*slot).unwrap(); - - let slot_stores = slot_stores.read().unwrap(); - for (_, storage_entry) in slot_stores.iter() { - let accounts = storage_entry.all_accounts(); - for account in &accounts { - notifier.notify_account_restore_from_snapshot(*slot, account); - } - } - } - } - } } #[cfg(test)] diff --git a/runtime/src/accounts_db/accountsdb_plugin_utils.rs b/runtime/src/accounts_db/accountsdb_plugin_utils.rs new file mode 100644 index 000000000..2223c8d2e --- /dev/null +++ b/runtime/src/accounts_db/accountsdb_plugin_utils.rs @@ -0,0 +1,480 @@ +use { + crate::{accounts_db::AccountsDb, append_vec::StoredAccountMeta}, + solana_measure::measure::Measure, + solana_metrics::*, + solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey}, + std::collections::{hash_map::Entry, HashMap, HashSet}, +}; + +#[derive(Default)] +pub struct AccountsDbPluginNotifyAtSnapshotRestoreStats { + pub total_accounts: usize, + pub skipped_accounts: usize, + pub notified_accounts: usize, + pub elapsed_filtering_us: usize, + pub total_pure_notify: usize, + pub total_pure_bookeeping: usize, + pub elapsed_notifying_us: usize, +} + +impl AccountsDbPluginNotifyAtSnapshotRestoreStats { + pub fn report(&self) { + datapoint_info!( + "accountsdb_plugin_notify_account_restore_from_snapshot_summary", + ("total_accounts", self.total_accounts, i64), + ("skipped_accounts", self.skipped_accounts, i64), + ("notified_accounts", self.notified_accounts, i64), + ("elapsed_filtering_us", self.elapsed_filtering_us, i64), + ("elapsed_notifying_us", self.elapsed_notifying_us, i64), + ("total_pure_notify_us", self.total_pure_notify, i64), + 
("total_pure_bookeeping_us", self.total_pure_bookeeping, i64), + ); + } +} + +impl AccountsDb { + /// Notify the plugins of of account data when AccountsDb is restored from a snapshot. The data is streamed + /// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated + /// multiple times only the last write (with highest write_version) is notified. + pub fn notify_account_restore_from_snapshot(&self) { + if self.accounts_update_notifier.is_none() { + return; + } + + let mut slots = self.storage.all_slots(); + let mut notified_accounts: HashSet = HashSet::default(); + let mut notify_stats = AccountsDbPluginNotifyAtSnapshotRestoreStats::default(); + + slots.sort_by(|a, b| b.cmp(a)); + for slot in slots { + self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats); + } + + let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap(); + let notifier = &accounts_update_notifier.read().unwrap(); + notifier.notify_end_of_restore_from_snapshot(); + notify_stats.report(); + } + + pub fn notify_account_at_accounts_update( + &self, + slot: Slot, + accounts: &[(&Pubkey, &AccountSharedData)], + ) { + if let Some(accounts_update_notifier) = &self.accounts_update_notifier { + let notifier = &accounts_update_notifier.read().unwrap(); + + for account in accounts { + let pubkey = account.0; + let account = account.1; + notifier.notify_account_update(slot, pubkey, account); + } + } + } + + fn notify_accounts_in_slot( + &self, + slot: Slot, + notified_accounts: &mut HashSet, + notify_stats: &mut AccountsDbPluginNotifyAtSnapshotRestoreStats, + ) { + let slot_stores = self.storage.get_slot_stores(slot).unwrap(); + + let slot_stores = slot_stores.read().unwrap(); + let mut accounts_to_stream: HashMap = HashMap::default(); + let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts"); + for (_, storage_entry) in slot_stores.iter() { + let mut accounts = 
storage_entry.all_accounts(); + let account_len = accounts.len(); + notify_stats.total_accounts += account_len; + accounts.drain(..).into_iter().for_each(|account| { + if notified_accounts.contains(&account.meta.pubkey) { + notify_stats.skipped_accounts += 1; + return; + } + match accounts_to_stream.entry(account.meta.pubkey) { + Entry::Occupied(mut entry) => { + let existing_account = entry.get(); + if account.meta.write_version > existing_account.meta.write_version { + entry.insert(account); + } else { + notify_stats.skipped_accounts += 1; + } + } + Entry::Vacant(entry) => { + entry.insert(account); + } + } + }); + } + measure_filter.stop(); + notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize; + + self.notify_filtered_accounts(slot, notified_accounts, &accounts_to_stream, notify_stats); + } + + fn notify_filtered_accounts( + &self, + slot: Slot, + notified_accounts: &mut HashSet, + accounts_to_stream: &HashMap, + notify_stats: &mut AccountsDbPluginNotifyAtSnapshotRestoreStats, + ) { + let notifier = self + .accounts_update_notifier + .as_ref() + .unwrap() + .read() + .unwrap(); + + let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts"); + for account in accounts_to_stream.values() { + let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts"); + notifier.notify_account_restore_from_snapshot(slot, account); + measure_pure_notify.stop(); + + notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize; + + let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping"); + notified_accounts.insert(account.meta.pubkey); + measure_bookkeep.stop(); + notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize; + } + notify_stats.notified_accounts += accounts_to_stream.len(); + measure_notify.stop(); + notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize; + } +} + +#[cfg(test)] +pub mod tests { + use { + crate::{ + accounts_db::AccountsDb, + 
accounts_update_notifier_interface::{ + AccountsUpdateNotifier, AccountsUpdateNotifierInterface, + }, + append_vec::StoredAccountMeta, + }, + dashmap::DashMap, + solana_sdk::{ + account::{AccountSharedData, ReadableAccount}, + clock::Slot, + pubkey::Pubkey, + }, + std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + }; + + impl AccountsDb { + pub fn set_accountsdb_plugin_notifer(&mut self, notifier: Option) { + self.accounts_update_notifier = notifier; + } + } + + #[derive(Debug, Default)] + struct AccountsDbTestPlugin { + pub accounts_at_snapshot_restore: DashMap>, + pub is_startup_done: AtomicBool, + } + + impl AccountsUpdateNotifierInterface for AccountsDbTestPlugin { + /// Notified when an account is updated at runtime, due to transaction activities + fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) { + self.accounts_at_snapshot_restore + .entry(*pubkey) + .or_insert(Vec::default()) + .push((slot, account.clone())); + } + + /// Notified when the AccountsDb is initialized at start when restored + /// from a snapshot. + fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) { + self.accounts_at_snapshot_restore + .entry(account.meta.pubkey) + .or_insert(Vec::default()) + .push((slot, account.clone_account())); + } + + /// Notified when a slot is optimistically confirmed + fn notify_slot_confirmed(&self, _slot: Slot, _parent: Option) {} + + /// Notified when a slot is marked frozen. + fn notify_slot_processed(&self, _slot: Slot, _parent: Option) {} + + /// Notified when a slot is rooted. 
+ fn notify_slot_rooted(&self, _slot: Slot, _parent: Option) {} + + fn notify_end_of_restore_from_snapshot(&self) { + self.is_startup_done.store(true, Ordering::Relaxed); + } + } + + #[test] + fn test_notify_account_restore_from_snapshot_once_per_slot() { + let mut accounts = AccountsDb::new_single_for_tests(); + // Account with key1 is updated twice in the store -- should only get notified once. + let key1 = solana_sdk::pubkey::new_rand(); + let mut account1_lamports: u64 = 1; + let account1 = + AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner()); + let slot0 = 0; + accounts.store_uncached(slot0, &[(&key1, &account1)]); + + account1_lamports = 2; + let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner()); + accounts.store_uncached(slot0, &[(&key1, &account1)]); + let notifier = AccountsDbTestPlugin::default(); + + let key2 = solana_sdk::pubkey::new_rand(); + let account2_lamports: u64 = 100; + let account2 = + AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner()); + + accounts.store_uncached(slot0, &[(&key2, &account2)]); + + let notifier = Arc::new(RwLock::new(notifier)); + accounts.set_accountsdb_plugin_notifer(Some(notifier.clone())); + + accounts.notify_account_restore_from_snapshot(); + + let notifier = notifier.write().unwrap(); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key1) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0] + .1 + .lamports(), + account1_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0, + slot0 + ); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key2) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0] + .1 + .lamports(), + account2_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0, + slot0 + ); + + 
assert!(notifier.is_startup_done.load(Ordering::Relaxed)); + } + + #[test] + fn test_notify_account_restore_from_snapshot_once_across_slots() { + let mut accounts = AccountsDb::new_single_for_tests(); + // Account with key1 is updated twice in two different slots -- should only get notified once. + // Account with key2 is updated slot0, should get notified once + // Account with key3 is updated in slot1, should get notified once + let key1 = solana_sdk::pubkey::new_rand(); + let mut account1_lamports: u64 = 1; + let account1 = + AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner()); + let slot0 = 0; + accounts.store_uncached(slot0, &[(&key1, &account1)]); + + let key2 = solana_sdk::pubkey::new_rand(); + let account2_lamports: u64 = 200; + let account2 = + AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner()); + accounts.store_uncached(slot0, &[(&key2, &account2)]); + + account1_lamports = 2; + let slot1 = 1; + let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner()); + accounts.store_uncached(slot1, &[(&key1, &account1)]); + let notifier = AccountsDbTestPlugin::default(); + + let key3 = solana_sdk::pubkey::new_rand(); + let account3_lamports: u64 = 300; + let account3 = + AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner()); + accounts.store_uncached(slot1, &[(&key3, &account3)]); + + let notifier = Arc::new(RwLock::new(notifier)); + accounts.set_accountsdb_plugin_notifer(Some(notifier.clone())); + + accounts.notify_account_restore_from_snapshot(); + + let notifier = notifier.write().unwrap(); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key1) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0] + .1 + .lamports(), + account1_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0, + slot1 + ); + assert_eq!( + notifier + 
.accounts_at_snapshot_restore + .get(&key2) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0] + .1 + .lamports(), + account2_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0, + slot0 + ); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key3) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0] + .1 + .lamports(), + account3_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0].0, + slot1 + ); + assert!(notifier.is_startup_done.load(Ordering::Relaxed)); + } + + #[test] + fn test_notify_account_at_accounts_update() { + let mut accounts = AccountsDb::new_single_for_tests(); + let notifier = AccountsDbTestPlugin::default(); + + let notifier = Arc::new(RwLock::new(notifier)); + accounts.set_accountsdb_plugin_notifer(Some(notifier.clone())); + + // Account with key1 is updated twice in two different slots -- should only get notified twice. 
+ // Account with key2 is updated slot0, should get notified once + // Account with key3 is updated in slot1, should get notified once + let key1 = solana_sdk::pubkey::new_rand(); + let account1_lamports1: u64 = 1; + let account1 = + AccountSharedData::new(account1_lamports1, 1, AccountSharedData::default().owner()); + let slot0 = 0; + accounts.store_cached(slot0, &[(&key1, &account1)]); + + let key2 = solana_sdk::pubkey::new_rand(); + let account2_lamports: u64 = 200; + let account2 = + AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner()); + accounts.store_cached(slot0, &[(&key2, &account2)]); + + let account1_lamports2 = 2; + let slot1 = 1; + let account1 = AccountSharedData::new(account1_lamports2, 1, account1.owner()); + accounts.store_cached(slot1, &[(&key1, &account1)]); + + let key3 = solana_sdk::pubkey::new_rand(); + let account3_lamports: u64 = 300; + let account3 = + AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner()); + accounts.store_cached(slot1, &[(&key3, &account3)]); + + let notifier = notifier.write().unwrap(); + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key1) + .unwrap() + .len(), + 2 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0] + .1 + .lamports(), + account1_lamports1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0, + slot0 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[1] + .1 + .lamports(), + account1_lamports2 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[1].0, + slot1 + ); + + assert_eq!( + notifier + .accounts_at_snapshot_restore + .get(&key2) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0] + .1 + .lamports(), + account2_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0, + slot0 + ); + assert_eq!( + notifier + 
.accounts_at_snapshot_restore + .get(&key3) + .unwrap() + .len(), + 1 + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0] + .1 + .lamports(), + account3_lamports + ); + assert_eq!( + notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0].0, + slot1 + ); + } +} diff --git a/runtime/src/accounts_update_notifier_interface.rs b/runtime/src/accounts_update_notifier_interface.rs index 70730ef01..bfd62e9b1 100644 --- a/runtime/src/accounts_update_notifier_interface.rs +++ b/runtime/src/accounts_update_notifier_interface.rs @@ -12,6 +12,9 @@ pub trait AccountsUpdateNotifierInterface: std::fmt::Debug { /// from a snapshot. fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta); + /// Notified when all accounts have been notified when restoring from a snapshot. + fn notify_end_of_restore_from_snapshot(&self); + /// Notified when a slot is optimistically confirmed fn notify_slot_confirmed(&self, slot: Slot, parent: Option); diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index f744848d4..0ac7719b6 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -44,6 +44,7 @@ use { atomic::{AtomicUsize, Ordering}, Arc, RwLock, }, + thread::Builder, }, }; @@ -534,11 +535,22 @@ where accounts_db .write_version .fetch_add(snapshot_version, Ordering::Relaxed); + + let mut measure_notify = Measure::start("accounts_notify"); + + let accounts_db = Arc::new(accounts_db); + let accoounts_db_clone = accounts_db.clone(); + let handle = Builder::new() + .name("notify_account_restore_from_snapshot".to_string()) + .spawn(move || { + accoounts_db_clone.notify_account_restore_from_snapshot(); + }) + .unwrap(); + accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index); accounts_db.maybe_add_filler_accounts(genesis_config.ticks_per_slot()); - let mut measure_notify = Measure::start("accounts_notify"); - accounts_db.notify_account_restore_from_snapshot(); + 
handle.join().unwrap(); measure_notify.stop(); datapoint_info!( @@ -552,5 +564,5 @@ where ("accountsdb-notify-at-start-us", measure_notify.as_us(), i64), ); - Ok(accounts_db) + Ok(Arc::try_unwrap(accounts_db).unwrap()) }