Accountsdb plugin postgres -- bulk insertion at startup (#20763)

Use bulk insertion to Postgres at startup to reduce time taken for initial snapshot restore for postgres plugin. Avoid duplicate writes of accounts at startup. Doing account plugin notification and indexing in parallel.

Improved error handling for postgres plugin to show the real db issues for debug purpose
Added more metrics for postgres plugin.
Refactored plugin centric code out to a sub module from accounts_db and added unit tests
This commit is contained in:
Lijun Wang 2021-10-24 12:43:33 -07:00 committed by GitHub
parent 5e1cf39c74
commit f14365f4b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1064 additions and 188 deletions

18
Cargo.lock generated
View File

@ -2949,18 +2949,18 @@ dependencies = [
[[package]]
name = "phf"
version = "0.8.0"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dfb61232e34fcb633f43d12c58f83c1df82962dcdfa565a4e866ffc17dafe12"
checksum = "b9fc3db1018c4b59d7d582a739436478b6035138b6aecbce989fc91c3e98409f"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_shared"
version = "0.8.0"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c00cf8b9eafe68dde5e9eaa2cef8ee84a9336a47d566ec55ca16589633b65af7"
checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096"
dependencies = [
"siphasher",
]
@ -3076,9 +3076,9 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f"
checksum = "04619f94ba0cc80999f4fc7073607cb825bc739a883cb6d20900fc5e009d6b0d"
dependencies = [
"bytes 1.0.1",
"chrono",
@ -4287,9 +4287,11 @@ dependencies = [
"serde_json",
"solana-accountsdb-plugin-interface",
"solana-logger 1.9.0",
"solana-measure",
"solana-metrics",
"solana-sdk",
"thiserror",
"tokio-postgres",
]
[[package]]
@ -6452,9 +6454,9 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.2"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2b1383c7e4fb9a09e292c7c6afb7da54418d53b045f1c1fac7a911411a2b8b"
checksum = "2f916ee7e52c8a74dfe4162dd73a073d0d7d4b387ea7b97a774c0c10b0776531"
dependencies = [
"async-trait",
"byteorder",

View File

@ -25,19 +25,19 @@ pub enum ReplicaAccountInfoVersions<'a> {
#[derive(Error, Debug)]
pub enum AccountsDbPluginError {
#[error("Error opening config file.")]
#[error("Error opening config file. Error detail: ({0}).")]
ConfigFileOpenError(#[from] io::Error),
#[error("Error reading config file.")]
#[error("Error reading config file. Error message: ({msg})")]
ConfigFileReadError { msg: String },
#[error("Error updating account.")]
#[error("Error updating account. Error message: ({msg})")]
AccountsUpdateError { msg: String },
#[error("Error updating slot status.")]
#[error("Error updating slot status. Error message: ({msg})")]
SlotStatusUpdateError { msg: String },
#[error("Plugin-defined custom error.")]
#[error("Plugin-defined custom error. Error message: ({0})")]
Custom(Box<dyn error::Error + Send + Sync>),
}
@ -78,7 +78,15 @@ pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug {
fn on_unload(&mut self) {}
/// Called when an account is updated at a slot.
fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()>;
fn update_account(
&mut self,
account: ReplicaAccountInfoVersions,
slot: u64,
is_startup: bool,
) -> Result<()>;
/// Called when all accounts are notified of during startup.
fn notify_end_of_startup(&mut self) -> Result<()>;
/// Called when a slot status is updated
fn update_slot_status(

View File

@ -26,13 +26,65 @@ pub(crate) struct AccountsUpdateNotifierImpl {
impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) {
if let Some(account_info) = self.accountinfo_from_shared_account_data(pubkey, account) {
self.notify_plugins_of_account_update(account_info, slot);
self.notify_plugins_of_account_update(account_info, slot, false);
}
}
fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
if let Some(account_info) = self.accountinfo_from_stored_account_meta(account) {
self.notify_plugins_of_account_update(account_info, slot);
let mut measure_all = Measure::start("accountsdb-plugin-notify-account-restore-all");
let mut measure_copy = Measure::start("accountsdb-plugin-copy-stored-account-info");
let account = self.accountinfo_from_stored_account_meta(account);
measure_copy.stop();
inc_new_counter_debug!(
"accountsdb-plugin-copy-stored-account-info-us",
measure_copy.as_us() as usize,
100000,
100000
);
if let Some(account_info) = account {
self.notify_plugins_of_account_update(account_info, slot, true);
}
measure_all.stop();
inc_new_counter_debug!(
"accountsdb-plugin-notify-account-restore-all-us",
measure_all.as_us() as usize,
100000,
100000
);
}
fn notify_end_of_restore_from_snapshot(&self) {
let mut plugin_manager = self.plugin_manager.write().unwrap();
if plugin_manager.plugins.is_empty() {
return;
}
for plugin in plugin_manager.plugins.iter_mut() {
let mut measure = Measure::start("accountsdb-plugin-end-of-restore-from-snapshot");
match plugin.notify_end_of_startup() {
Err(err) => {
error!(
"Failed to notify the end of restore from snapshot, error: {} to plugin {}",
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully notified the end of restore from snapshot to plugin {}",
plugin.name()
);
}
}
measure.stop();
inc_new_counter_debug!(
"accountsdb-plugin-end-of-restore-from-snapshot",
measure.as_us() as usize
);
}
}
@ -83,7 +135,13 @@ impl AccountsUpdateNotifierImpl {
})
}
fn notify_plugins_of_account_update(&self, account: ReplicaAccountInfo, slot: Slot) {
fn notify_plugins_of_account_update(
&self,
account: ReplicaAccountInfo,
slot: Slot,
is_startup: bool,
) {
let mut measure2 = Measure::start("accountsdb-plugin-notify_plugins_of_account_update");
let mut plugin_manager = self.plugin_manager.write().unwrap();
if plugin_manager.plugins.is_empty() {
@ -91,7 +149,11 @@ impl AccountsUpdateNotifierImpl {
}
for plugin in plugin_manager.plugins.iter_mut() {
let mut measure = Measure::start("accountsdb-plugin-update-account");
match plugin.update_account(ReplicaAccountInfoVersions::V0_0_1(&account), slot) {
match plugin.update_account(
ReplicaAccountInfoVersions::V0_0_1(&account),
slot,
is_startup,
) {
Err(err) => {
error!(
"Failed to update account {} at slot {}, error: {} to plugin {}",
@ -111,13 +173,20 @@ impl AccountsUpdateNotifierImpl {
}
}
measure.stop();
inc_new_counter_info!(
"accountsdb-plugin-update-account-ms",
measure.as_ms() as usize,
inc_new_counter_debug!(
"accountsdb-plugin-update-account-us",
measure.as_us() as usize,
100000,
100000
);
}
measure2.stop();
inc_new_counter_debug!(
"accountsdb-plugin-notify_plugins_of_account_update-us",
measure2.as_us() as usize,
100000,
100000
);
}
pub fn notify_slot_status(&self, slot: Slot, parent: Option<Slot>, slot_status: SlotStatus) {
@ -146,9 +215,9 @@ impl AccountsUpdateNotifierImpl {
}
}
measure.stop();
inc_new_counter_info!(
"accountsdb-plugin-update-slot-ms",
measure.as_ms() as usize,
inc_new_counter_debug!(
"accountsdb-plugin-update-slot-us",
measure.as_us() as usize,
1000,
1000
);

View File

@ -23,9 +23,10 @@ serde_derive = "1.0.103"
serde_json = "1.0.67"
solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.9.0" }
solana-logger = { path = "../logger", version = "=1.9.0" }
solana-measure = { path = "../measure", version = "=1.9.0" }
solana-metrics = { path = "../metrics", version = "=1.9.0" }
solana-sdk = { path = "../sdk", version = "=1.9.0" }
thiserror = "1.0.30"
tokio-postgres = "0.7.3"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

View File

@ -1,10 +1,10 @@
use solana_measure::measure::Measure;
/// Main entry for the PostgreSQL plugin
use {
crate::{
accounts_selector::AccountsSelector,
postgres_client::{
ParallelPostgresClient, PostgresClient, PostgresClientBuilder, SimplePostgresClient,
},
postgres_client::{ParallelPostgresClient, PostgresClientBuilder},
},
bs58,
log::*,
@ -13,19 +13,14 @@ use {
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, Result, SlotStatus,
},
solana_metrics::*,
std::{fs::File, io::Read},
thiserror::Error,
};
#[allow(clippy::large_enum_variant)]
enum PostgresClientEnum {
Simple(SimplePostgresClient),
Parallel(ParallelPostgresClient),
}
#[derive(Default)]
pub struct AccountsDbPluginPostgres {
client: Option<PostgresClientEnum>,
client: Option<ParallelPostgresClient>,
accounts_selector: Option<AccountsSelector>,
}
@ -41,14 +36,15 @@ pub struct AccountsDbPluginPostgresConfig {
pub user: String,
pub threads: Option<usize>,
pub port: Option<u16>,
pub batch_size: Option<usize>,
}
#[derive(Error, Debug)]
pub enum AccountsDbPluginPostgresError {
#[error("Error connecting to the backend data store.")]
#[error("Error connecting to the backend data store. Error message: ({msg})")]
DataStoreConnectionError { msg: String },
#[error("Error preparing data store schema.")]
#[error("Error preparing data store schema. Error message: ({msg})")]
DataSchemaError { msg: String },
}
@ -79,7 +75,9 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
/// "host" specifies the PostgreSQL server.
/// "user" specifies the PostgreSQL user.
/// "threads" optional, specifies the number of worker threads for the plugin. A thread
/// maintains a PostgreSQL connection to the server.
/// maintains a PostgreSQL connection to the server. The default is 10.
/// "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created
/// from restoring a snapshot. The default is "10".
/// # Examples
/// {
/// "libpath": "/home/solana/target/release/libsolana_accountsdb_plugin_postgres.so",
@ -116,13 +114,8 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
})
}
Ok(config) => {
self.client = if config.threads.is_some() && config.threads.unwrap() > 1 {
let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?;
Some(PostgresClientEnum::Parallel(client))
} else {
let client = PostgresClientBuilder::build_simple_postgres_client(&config)?;
Some(PostgresClientEnum::Simple(client))
};
let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?;
self.client = Some(client);
}
}
@ -134,18 +127,23 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
match &mut self.client {
None => {}
Some(client) => match client {
PostgresClientEnum::Parallel(client) => client.join().unwrap(),
PostgresClientEnum::Simple(client) => {
client.join().unwrap();
}
},
Some(client) => {
client.join().unwrap();
}
}
}
fn update_account(&mut self, account: ReplicaAccountInfoVersions, slot: u64) -> Result<()> {
fn update_account(
&mut self,
account: ReplicaAccountInfoVersions,
slot: u64,
is_startup: bool,
) -> Result<()> {
let mut measure_all = Measure::start("accountsdb-plugin-postgres-update-account-main");
match account {
ReplicaAccountInfoVersions::V0_0_1(account) => {
let mut measure_select =
Measure::start("accountsdb-plugin-postgres-update-account-select");
if let Some(accounts_selector) = &self.accounts_selector {
if !accounts_selector.is_account_selected(account.pubkey, account.owner) {
return Ok(());
@ -153,6 +151,13 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
} else {
return Ok(());
}
measure_select.stop();
inc_new_counter_debug!(
"accountsdb-plugin-postgres-update-account-select-us",
measure_select.as_us() as usize,
100000,
100000
);
debug!(
"Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}",
@ -172,14 +177,17 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
)));
}
Some(client) => {
let result = match client {
PostgresClientEnum::Parallel(client) => {
client.update_account(account, slot)
}
PostgresClientEnum::Simple(client) => {
client.update_account(account, slot)
}
};
let mut measure_update =
Measure::start("accountsdb-plugin-postgres-update-account-client");
let result = { client.update_account(account, slot, is_startup) };
measure_update.stop();
inc_new_counter_debug!(
"accountsdb-plugin-postgres-update-account-client-us",
measure_update.as_us() as usize,
100000,
100000
);
if let Err(err) = result {
return Err(AccountsDbPluginError::AccountsUpdateError {
@ -190,6 +198,16 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
}
}
}
measure_all.stop();
inc_new_counter_debug!(
"accountsdb-plugin-postgres-update-account-main-us",
measure_all.as_us() as usize,
100000,
100000
);
Ok(())
}
@ -210,14 +228,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
)));
}
Some(client) => {
let result = match client {
PostgresClientEnum::Parallel(client) => {
client.update_slot_status(slot, parent, status)
}
PostgresClientEnum::Simple(client) => {
client.update_slot_status(slot, parent, status)
}
};
let result = client.update_slot_status(slot, parent, status);
if let Err(err) = result {
return Err(AccountsDbPluginError::SlotStatusUpdateError{
@ -229,6 +240,29 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
Ok(())
}
fn notify_end_of_startup(&mut self) -> Result<()> {
info!("Notifying the end of startup for accounts notifications");
match &mut self.client {
None => {
return Err(AccountsDbPluginError::Custom(Box::new(
AccountsDbPluginPostgresError::DataStoreConnectionError {
msg: "There is no connection to the PostgreSQL database.".to_string(),
},
)));
}
Some(client) => {
let result = client.notify_end_of_startup();
if let Err(err) = result {
return Err(AccountsDbPluginError::SlotStatusUpdateError{
msg: format!("Failed to notify the end of startup for accounts notifications. Error: {:?}", err)
});
}
}
}
Ok(())
}
}
impl AccountsDbPluginPostgres {

View File

@ -1,3 +1,5 @@
#![allow(clippy::integer_arithmetic)]
/// A concurrent implementation for writing accounts into the PostgreSQL in parallel.
use {
crate::accountsdb_plugin_postgres::{
@ -10,34 +12,44 @@ use {
solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{
AccountsDbPluginError, ReplicaAccountInfo, SlotStatus,
},
solana_metrics::datapoint_info,
solana_measure::measure::Measure,
solana_metrics::*,
solana_sdk::timing::AtomicInterval,
std::{
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex,
},
thread::{self, Builder, JoinHandle},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
tokio_postgres::types::ToSql,
};
/// The maximum asynchronous requests allowed in the channel to avoid excessive
/// memory usage. The downside -- calls after this threshold is reached can get blocked.
const MAX_ASYNC_REQUESTS: usize = 10240;
const MAX_ASYNC_REQUESTS: usize = 40960;
const DEFAULT_POSTGRES_PORT: u16 = 5432;
const DEFAULT_THREADS_COUNT: usize = 100;
const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
const ACCOUNT_COLUMN_COUNT: usize = 8;
struct PostgresSqlClientWrapper {
client: Client,
update_account_stmt: Statement,
bulk_account_insert_stmt: Statement,
}
pub struct SimplePostgresClient {
batch_size: usize,
pending_account_updates: Vec<DbAccountInfo>,
client: Mutex<PostgresSqlClientWrapper>,
}
struct PostgresClientWorker {
client: SimplePostgresClient,
/// Indicating if accounts notification during startup is done.
is_startup_done: bool,
}
impl Eq for DbAccountInfo {}
@ -45,23 +57,25 @@ impl Eq for DbAccountInfo {}
#[derive(Clone, PartialEq, Debug)]
pub struct DbAccountInfo {
pub pubkey: Vec<u8>,
pub lamports: u64,
pub lamports: i64,
pub owner: Vec<u8>,
pub executable: bool,
pub rent_epoch: u64,
pub rent_epoch: i64,
pub data: Vec<u8>,
pub slot: i64,
}
impl DbAccountInfo {
fn new<T: ReadableAccountInfo>(account: &T) -> DbAccountInfo {
fn new<T: ReadableAccountInfo>(account: &T, slot: u64) -> DbAccountInfo {
let data = account.data().to_vec();
Self {
pubkey: account.pubkey().to_vec(),
lamports: account.lamports(),
lamports: account.lamports() as i64,
owner: account.owner().to_vec(),
executable: account.executable(),
rent_epoch: account.rent_epoch(),
rent_epoch: account.rent_epoch() as i64,
data,
slot: slot as i64,
}
}
}
@ -69,9 +83,9 @@ impl DbAccountInfo {
pub trait ReadableAccountInfo: Sized {
fn pubkey(&self) -> &[u8];
fn owner(&self) -> &[u8];
fn lamports(&self) -> u64;
fn lamports(&self) -> i64;
fn executable(&self) -> bool;
fn rent_epoch(&self) -> u64;
fn rent_epoch(&self) -> i64;
fn data(&self) -> &[u8];
}
@ -84,7 +98,7 @@ impl ReadableAccountInfo for DbAccountInfo {
&self.owner
}
fn lamports(&self) -> u64 {
fn lamports(&self) -> i64 {
self.lamports
}
@ -92,7 +106,7 @@ impl ReadableAccountInfo for DbAccountInfo {
self.executable
}
fn rent_epoch(&self) -> u64 {
fn rent_epoch(&self) -> i64 {
self.rent_epoch
}
@ -110,16 +124,16 @@ impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> {
self.owner
}
fn lamports(&self) -> u64 {
self.lamports
fn lamports(&self) -> i64 {
self.lamports as i64
}
fn executable(&self) -> bool {
self.executable
}
fn rent_epoch(&self) -> u64 {
self.rent_epoch
fn rent_epoch(&self) -> i64 {
self.rent_epoch as i64
}
fn data(&self) -> &[u8] {
@ -132,10 +146,10 @@ pub trait PostgresClient {
Ok(())
}
fn update_account<T: ReadableAccountInfo>(
fn update_account(
&mut self,
account: &T,
slot: u64,
account: DbAccountInfo,
is_startup: bool,
) -> Result<(), AccountsDbPluginError>;
fn update_slot_status(
@ -144,73 +158,120 @@ pub trait PostgresClient {
parent: Option<u64>,
status: SlotStatus,
) -> Result<(), AccountsDbPluginError>;
fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError>;
}
impl SimplePostgresClient {
pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
fn connect_to_db(
config: &AccountsDbPluginPostgresConfig,
) -> Result<Client, AccountsDbPluginError> {
let port = config.port.unwrap_or(DEFAULT_POSTGRES_PORT);
let connection_str = format!("host={} user={} port={}", config.host, config.user, port);
match Client::connect(&connection_str, NoTls) {
Err(err) => {
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataStoreConnectionError {
msg: format!(
let msg = format!(
"Error in connecting to the PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}",
err, config.host, config.user, connection_str
err, config.host, config.user, connection_str);
error!("{}", msg);
Err(AccountsDbPluginError::Custom(Box::new(
AccountsDbPluginPostgresError::DataStoreConnectionError { msg },
)))
}
Ok(client) => Ok(client),
}
}
fn build_bulk_account_insert_statement(
client: &mut Client,
config: &AccountsDbPluginPostgresConfig,
) -> Result<Statement, AccountsDbPluginError> {
let batch_size = config
.batch_size
.unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) VALUES");
for j in 0..batch_size {
let row = j * ACCOUNT_COLUMN_COUNT;
let val_str = format!(
"(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})",
row + 1,
row + 2,
row + 3,
row + 4,
row + 5,
row + 6,
row + 7,
row + 8,
);
if j == 0 {
stmt = format!("{} {}", &stmt, val_str);
} else {
stmt = format!("{}, {}", &stmt, val_str);
}
}
let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
data=excluded.data, updated_on=excluded.updated_on WHERE acct.slot <= excluded.slot";
stmt = format!("{} {}", stmt, handle_conflict);
info!("{}", stmt);
let bulk_stmt = client.prepare(&stmt);
match bulk_stmt {
Err(err) => {
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
msg: format!(
"Error in preparing for the accounts update PostgreSQL database: {} host: {} user: {} config: {:?}",
err, config.host, config.user, config
),
})));
}
Ok(mut client) => {
let result = client.prepare("INSERT INTO account (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
ON CONFLICT (pubkey) DO UPDATE SET slot=$2, owner=$3, lamports=$4, executable=$5, rent_epoch=$6, \
data=$7, updated_on=$8");
match result {
Err(err) => {
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
msg: format!(
"Error in preparing for the accounts update PostgreSQL database: {:?} host: {:?} user: {:?} config: {:?}",
err, config.host, config.user, connection_str
),
})));
}
Ok(update_account_stmt) => Ok(Self {
client: Mutex::new(PostgresSqlClientWrapper {
client,
update_account_stmt,
}),
}),
}
}
Ok(update_account_stmt) => Ok(update_account_stmt),
}
}
}
impl PostgresClient for SimplePostgresClient {
fn update_account<T: ReadableAccountInfo>(
&mut self,
account: &T,
slot: u64,
fn build_single_account_upsert_statement(
client: &mut Client,
config: &AccountsDbPluginPostgresConfig,
) -> Result<Statement, AccountsDbPluginError> {
let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, updated_on) \
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) \
ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \
data=excluded.data, updated_on=excluded.updated_on WHERE acct.slot <= excluded.slot";
let stmt = client.prepare(stmt);
match stmt {
Err(err) => {
return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError {
msg: format!(
"Error in preparing for the accounts update PostgreSQL database: {} host: {} user: {} config: {:?}",
err, config.host, config.user, config
),
})));
}
Ok(update_account_stmt) => Ok(update_account_stmt),
}
}
/// Internal function for updating or inserting a single account
fn upsert_account_internal(
account: &DbAccountInfo,
statement: &Statement,
client: &mut Client,
) -> Result<(), AccountsDbPluginError> {
trace!(
"Updating account {} with owner {} at slot {}",
bs58::encode(account.pubkey()).into_string(),
bs58::encode(account.owner()).into_string(),
slot,
);
let slot = slot as i64; // postgres only supports i64
let lamports = account.lamports() as i64;
let rent_epoch = account.rent_epoch() as i64;
let updated_on = Utc::now().naive_utc();
let client = self.client.get_mut().unwrap();
let result = client.client.query(
&client.update_account_stmt,
let result = client.query(
statement,
&[
&account.pubkey(),
&slot,
&account.slot,
&account.owner(),
&lamports,
&account.executable(),
@ -228,9 +289,141 @@ impl PostgresClient for SimplePostgresClient {
error!("{}", msg);
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
}
Ok(())
}
/// Update or insert a single account
fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> {
let client = self.client.get_mut().unwrap();
let statement = &client.update_account_stmt;
let client = &mut client.client;
Self::upsert_account_internal(account, statement, client)
}
/// Insert accounts in batch to reduce network overhead
fn insert_accounts_in_batch(
&mut self,
account: DbAccountInfo,
) -> Result<(), AccountsDbPluginError> {
self.pending_account_updates.push(account);
if self.pending_account_updates.len() == self.batch_size {
let mut measure = Measure::start("accountsdb-plugin-postgres-prepare-values");
let mut values: Vec<&(dyn ToSql + Sync)> =
Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT);
let updated_on = Utc::now().naive_utc();
for j in 0..self.batch_size {
let account = &self.pending_account_updates[j];
values.push(&account.pubkey);
values.push(&account.slot);
values.push(&account.owner);
values.push(&account.lamports);
values.push(&account.executable);
values.push(&account.rent_epoch);
values.push(&account.data);
values.push(&updated_on);
}
measure.stop();
inc_new_counter_debug!(
"accountsdb-plugin-postgres-prepare-values-us",
measure.as_us() as usize,
10000,
10000
);
let mut measure = Measure::start("accountsdb-plugin-postgres-update-account");
let client = self.client.get_mut().unwrap();
let result = client
.client
.query(&client.bulk_account_insert_stmt, &values);
self.pending_account_updates.clear();
if let Err(err) = result {
let msg = format!(
"Failed to persist the update of account to the PostgreSQL database. Error: {:?}",
err
);
error!("{}", msg);
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
}
measure.stop();
inc_new_counter_debug!(
"accountsdb-plugin-postgres-update-account-us",
measure.as_us() as usize,
10000,
10000
);
inc_new_counter_debug!(
"accountsdb-plugin-postgres-update-account-count",
self.batch_size,
10000,
10000
);
}
Ok(())
}
/// Flush any left over accounts in batch which are not processed in the last batch
fn flush_buffered_writes(&mut self) -> Result<(), AccountsDbPluginError> {
if self.pending_account_updates.is_empty() {
return Ok(());
}
let client = self.client.get_mut().unwrap();
let statement = &client.update_account_stmt;
let client = &mut client.client;
for account in self.pending_account_updates.drain(..) {
Self::upsert_account_internal(&account, statement, client)?;
}
Ok(())
}
pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
info!("Creating SimplePostgresClient...");
let mut client = Self::connect_to_db(config)?;
let bulk_account_insert_stmt =
Self::build_bulk_account_insert_statement(&mut client, config)?;
let update_account_stmt = Self::build_single_account_upsert_statement(&mut client, config)?;
let batch_size = config
.batch_size
.unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
info!("Created SimplePostgresClient.");
Ok(Self {
batch_size,
pending_account_updates: Vec::with_capacity(batch_size),
client: Mutex::new(PostgresSqlClientWrapper {
client,
update_account_stmt,
bulk_account_insert_stmt,
}),
})
}
}
impl PostgresClient for SimplePostgresClient {
fn update_account(
&mut self,
account: DbAccountInfo,
is_startup: bool,
) -> Result<(), AccountsDbPluginError> {
trace!(
"Updating account {} with owner {} at slot {}",
bs58::encode(account.pubkey()).into_string(),
bs58::encode(account.owner()).into_string(),
account.slot,
);
if !is_startup {
return self.upsert_account(&account);
}
self.insert_accounts_in_batch(account)
}
fn update_slot_status(
&mut self,
slot: u64,
@ -289,11 +482,15 @@ impl PostgresClient for SimplePostgresClient {
Ok(())
}
fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> {
self.flush_buffered_writes()
}
}
struct UpdateAccountRequest {
account: DbAccountInfo,
slot: u64,
is_startup: bool,
}
struct UpdateSlotRequest {
@ -309,22 +506,41 @@ enum DbWorkItem {
impl PostgresClientWorker {
fn new(config: AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
let client = SimplePostgresClient::new(&config)?;
Ok(PostgresClientWorker { client })
let result = SimplePostgresClient::new(&config);
match result {
Ok(client) => Ok(PostgresClientWorker {
client,
is_startup_done: false,
}),
Err(err) => {
error!("Error in creating SimplePostgresClient: {}", err);
Err(err)
}
}
}
fn do_work(
&mut self,
receiver: Receiver<DbWorkItem>,
exit_worker: Arc<AtomicBool>,
is_startup_done: Arc<AtomicBool>,
startup_done_count: Arc<AtomicUsize>,
) -> Result<(), AccountsDbPluginError> {
while !exit_worker.load(Ordering::Relaxed) {
let mut measure = Measure::start("accountsdb-plugin-postgres-worker-recv");
let work = receiver.recv_timeout(Duration::from_millis(500));
measure.stop();
inc_new_counter_debug!(
"accountsdb-plugin-postgres-worker-recv-us",
measure.as_us() as usize,
100000,
100000
);
match work {
Ok(work) => match work {
DbWorkItem::UpdateAccount(request) => {
self.client.update_account(&request.account, request.slot)?;
self.client
.update_account(request.account, request.is_startup)?;
}
DbWorkItem::UpdateSlot(request) => {
self.client.update_slot_status(
@ -336,6 +552,12 @@ impl PostgresClientWorker {
},
Err(err) => match err {
RecvTimeoutError::Timeout => {
if !self.is_startup_done && is_startup_done.load(Ordering::Relaxed) {
self.client.notify_end_of_startup()?;
self.is_startup_done = true;
startup_done_count.fetch_add(1, Ordering::Relaxed);
}
continue;
}
_ => {
@ -351,43 +573,67 @@ impl PostgresClientWorker {
pub struct ParallelPostgresClient {
workers: Vec<JoinHandle<Result<(), AccountsDbPluginError>>>,
exit_worker: Arc<AtomicBool>,
is_startup_done: Arc<AtomicBool>,
startup_done_count: Arc<AtomicUsize>,
initialized_worker_count: Arc<AtomicUsize>,
sender: Sender<DbWorkItem>,
last_report: AtomicInterval,
}
impl ParallelPostgresClient {
pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result<Self, AccountsDbPluginError> {
info!("Creating ParallelPostgresClient...");
let (sender, receiver) = bounded(MAX_ASYNC_REQUESTS);
let exit_worker = Arc::new(AtomicBool::new(false));
let mut workers = Vec::default();
for i in 0..config.threads.unwrap() {
let is_startup_done = Arc::new(AtomicBool::new(false));
let startup_done_count = Arc::new(AtomicUsize::new(0));
let worker_count = config.threads.unwrap_or(DEFAULT_THREADS_COUNT);
let initialized_worker_count = Arc::new(AtomicUsize::new(0));
for i in 0..worker_count {
let cloned_receiver = receiver.clone();
let exit_clone = exit_worker.clone();
let is_startup_done_clone = is_startup_done.clone();
let startup_done_count_clone = startup_done_count.clone();
let initialized_worker_count_clone = initialized_worker_count.clone();
let config = config.clone();
let worker = Builder::new()
.name(format!("worker-{}", i))
.spawn(move || -> Result<(), AccountsDbPluginError> {
let mut worker = PostgresClientWorker::new(config)?;
worker.do_work(cloned_receiver, exit_clone)?;
Ok(())
let result = PostgresClientWorker::new(config);
match result {
Ok(mut worker) => {
initialized_worker_count_clone.fetch_add(1, Ordering::Relaxed);
worker.do_work(
cloned_receiver,
exit_clone,
is_startup_done_clone,
startup_done_count_clone,
)?;
Ok(())
}
Err(err) => Err(err),
}
})
.unwrap();
workers.push(worker);
}
info!("Created ParallelPostgresClient.");
Ok(Self {
last_report: AtomicInterval::default(),
workers,
exit_worker,
is_startup_done,
startup_done_count,
initialized_worker_count,
sender,
})
}
}
impl PostgresClient for ParallelPostgresClient {
fn join(&mut self) -> thread::Result<()> {
pub fn join(&mut self) -> thread::Result<()> {
self.exit_worker.store(true, Ordering::Relaxed);
while !self.workers.is_empty() {
let worker = self.workers.pop();
@ -404,24 +650,36 @@ impl PostgresClient for ParallelPostgresClient {
Ok(())
}
fn update_account<T: ReadableAccountInfo>(
pub fn update_account(
&mut self,
account: &T,
account: &ReplicaAccountInfo,
slot: u64,
is_startup: bool,
) -> Result<(), AccountsDbPluginError> {
if self.last_report.should_update(30000) {
datapoint_info!(
datapoint_debug!(
"postgres-plugin-stats",
("message-queue-length", self.sender.len() as i64, i64),
);
}
if let Err(err) = self
.sender
.send(DbWorkItem::UpdateAccount(UpdateAccountRequest {
account: DbAccountInfo::new(account),
slot,
}))
{
let mut measure = Measure::start("accountsdb-plugin-posgres-create-work-item");
let wrk_item = DbWorkItem::UpdateAccount(UpdateAccountRequest {
account: DbAccountInfo::new(account, slot),
is_startup,
});
measure.stop();
inc_new_counter_debug!(
"accountsdb-plugin-posgres-create-work-item-us",
measure.as_us() as usize,
100000,
100000
);
let mut measure = Measure::start("accountsdb-plugin-posgres-send-msg");
if let Err(err) = self.sender.send(wrk_item) {
return Err(AccountsDbPluginError::AccountsUpdateError {
msg: format!(
"Failed to update the account {:?}, error: {:?}",
@ -430,10 +688,19 @@ impl PostgresClient for ParallelPostgresClient {
),
});
}
measure.stop();
inc_new_counter_debug!(
"accountsdb-plugin-posgres-send-msg-us",
measure.as_us() as usize,
100000,
100000
);
Ok(())
}
fn update_slot_status(
pub fn update_slot_status(
&mut self,
slot: u64,
parent: Option<u64>,
@ -450,6 +717,30 @@ impl PostgresClient for ParallelPostgresClient {
}
Ok(())
}
pub fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> {
info!("Notifying the end of startup");
// Ensure all items in the queue has been received by the workers
while !self.sender.is_empty() {
sleep(Duration::from_millis(100));
}
self.is_startup_done.store(true, Ordering::Relaxed);
// Wait for all worker threads to be done with flushing
while self.startup_done_count.load(Ordering::Relaxed)
!= self.initialized_worker_count.load(Ordering::Relaxed)
{
info!(
"Startup done count: {}, good worker thread count: {}",
self.startup_done_count.load(Ordering::Relaxed),
self.initialized_worker_count.load(Ordering::Relaxed)
);
sleep(Duration::from_millis(100));
}
info!("Done with notifying the end of startup");
Ok(())
}
}
pub struct PostgresClientBuilder {}

View File

@ -400,6 +400,8 @@ pub enum LoadedAccountAccessor<'a> {
Cached(Option<(Pubkey, Cow<'a, CachedAccount>)>),
}
mod accountsdb_plugin_utils;
impl<'a> LoadedAccountAccessor<'a> {
fn check_and_get_loaded_account(&mut self) -> LoadedAccount {
// all of these following .expect() and .unwrap() are like serious logic errors,
@ -6193,15 +6195,7 @@ impl AccountsDb {
pub fn store_cached(&self, slot: Slot, accounts: &[(&Pubkey, &AccountSharedData)]) {
self.store(slot, accounts, self.caching_enabled);
if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
let notifier = &accounts_update_notifier.read().unwrap();
for account in accounts {
let pubkey = account.0;
let account = account.1;
notifier.notify_account_update(slot, pubkey, account);
}
}
self.notify_account_at_accounts_update(slot, accounts);
}
/// Store the account update.
@ -7067,24 +7061,6 @@ impl AccountsDb {
}
}
}
pub fn notify_account_restore_from_snapshot(&self) {
if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
let notifier = &accounts_update_notifier.read().unwrap();
let slots = self.storage.all_slots();
for slot in &slots {
let slot_stores = self.storage.get_slot_stores(*slot).unwrap();
let slot_stores = slot_stores.read().unwrap();
for (_, storage_entry) in slot_stores.iter() {
let accounts = storage_entry.all_accounts();
for account in &accounts {
notifier.notify_account_restore_from_snapshot(*slot, account);
}
}
}
}
}
}
#[cfg(test)]

View File

@ -0,0 +1,480 @@
use {
crate::{accounts_db::AccountsDb, append_vec::StoredAccountMeta},
solana_measure::measure::Measure,
solana_metrics::*,
solana_sdk::{account::AccountSharedData, clock::Slot, pubkey::Pubkey},
std::collections::{hash_map::Entry, HashMap, HashSet},
};
#[derive(Default)]
pub struct AccountsDbPluginNotifyAtSnapshotRestoreStats {
pub total_accounts: usize,
pub skipped_accounts: usize,
pub notified_accounts: usize,
pub elapsed_filtering_us: usize,
pub total_pure_notify: usize,
pub total_pure_bookeeping: usize,
pub elapsed_notifying_us: usize,
}
impl AccountsDbPluginNotifyAtSnapshotRestoreStats {
pub fn report(&self) {
datapoint_info!(
"accountsdb_plugin_notify_account_restore_from_snapshot_summary",
("total_accounts", self.total_accounts, i64),
("skipped_accounts", self.skipped_accounts, i64),
("notified_accounts", self.notified_accounts, i64),
("elapsed_filtering_us", self.elapsed_filtering_us, i64),
("elapsed_notifying_us", self.elapsed_notifying_us, i64),
("total_pure_notify_us", self.total_pure_notify, i64),
("total_pure_bookeeping_us", self.total_pure_bookeeping, i64),
);
}
}
impl AccountsDb {
/// Notify the plugins of of account data when AccountsDb is restored from a snapshot. The data is streamed
/// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated
/// multiple times only the last write (with highest write_version) is notified.
pub fn notify_account_restore_from_snapshot(&self) {
if self.accounts_update_notifier.is_none() {
return;
}
let mut slots = self.storage.all_slots();
let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
let mut notify_stats = AccountsDbPluginNotifyAtSnapshotRestoreStats::default();
slots.sort_by(|a, b| b.cmp(a));
for slot in slots {
self.notify_accounts_in_slot(slot, &mut notified_accounts, &mut notify_stats);
}
let accounts_update_notifier = self.accounts_update_notifier.as_ref().unwrap();
let notifier = &accounts_update_notifier.read().unwrap();
notifier.notify_end_of_restore_from_snapshot();
notify_stats.report();
}
pub fn notify_account_at_accounts_update(
&self,
slot: Slot,
accounts: &[(&Pubkey, &AccountSharedData)],
) {
if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
let notifier = &accounts_update_notifier.read().unwrap();
for account in accounts {
let pubkey = account.0;
let account = account.1;
notifier.notify_account_update(slot, pubkey, account);
}
}
}
fn notify_accounts_in_slot(
&self,
slot: Slot,
notified_accounts: &mut HashSet<Pubkey>,
notify_stats: &mut AccountsDbPluginNotifyAtSnapshotRestoreStats,
) {
let slot_stores = self.storage.get_slot_stores(slot).unwrap();
let slot_stores = slot_stores.read().unwrap();
let mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta> = HashMap::default();
let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts");
for (_, storage_entry) in slot_stores.iter() {
let mut accounts = storage_entry.all_accounts();
let account_len = accounts.len();
notify_stats.total_accounts += account_len;
accounts.drain(..).into_iter().for_each(|account| {
if notified_accounts.contains(&account.meta.pubkey) {
notify_stats.skipped_accounts += 1;
return;
}
match accounts_to_stream.entry(account.meta.pubkey) {
Entry::Occupied(mut entry) => {
let existing_account = entry.get();
if account.meta.write_version > existing_account.meta.write_version {
entry.insert(account);
} else {
notify_stats.skipped_accounts += 1;
}
}
Entry::Vacant(entry) => {
entry.insert(account);
}
}
});
}
measure_filter.stop();
notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;
self.notify_filtered_accounts(slot, notified_accounts, &accounts_to_stream, notify_stats);
}
fn notify_filtered_accounts(
&self,
slot: Slot,
notified_accounts: &mut HashSet<Pubkey>,
accounts_to_stream: &HashMap<Pubkey, StoredAccountMeta>,
notify_stats: &mut AccountsDbPluginNotifyAtSnapshotRestoreStats,
) {
let notifier = self
.accounts_update_notifier
.as_ref()
.unwrap()
.read()
.unwrap();
let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
for account in accounts_to_stream.values() {
let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
notifier.notify_account_restore_from_snapshot(slot, account);
measure_pure_notify.stop();
notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
notified_accounts.insert(account.meta.pubkey);
measure_bookkeep.stop();
notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
}
notify_stats.notified_accounts += accounts_to_stream.len();
measure_notify.stop();
notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
}
}
#[cfg(test)]
pub mod tests {
use {
crate::{
accounts_db::AccountsDb,
accounts_update_notifier_interface::{
AccountsUpdateNotifier, AccountsUpdateNotifierInterface,
},
append_vec::StoredAccountMeta,
},
dashmap::DashMap,
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::Slot,
pubkey::Pubkey,
},
std::sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
};
impl AccountsDb {
pub fn set_accountsdb_plugin_notifer(&mut self, notifier: Option<AccountsUpdateNotifier>) {
self.accounts_update_notifier = notifier;
}
}
#[derive(Debug, Default)]
struct AccountsDbTestPlugin {
pub accounts_at_snapshot_restore: DashMap<Pubkey, Vec<(Slot, AccountSharedData)>>,
pub is_startup_done: AtomicBool,
}
impl AccountsUpdateNotifierInterface for AccountsDbTestPlugin {
/// Notified when an account is updated at runtime, due to transaction activities
fn notify_account_update(&self, slot: Slot, pubkey: &Pubkey, account: &AccountSharedData) {
self.accounts_at_snapshot_restore
.entry(*pubkey)
.or_insert(Vec::default())
.push((slot, account.clone()));
}
/// Notified when the AccountsDb is initialized at start when restored
/// from a snapshot.
fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta) {
self.accounts_at_snapshot_restore
.entry(account.meta.pubkey)
.or_insert(Vec::default())
.push((slot, account.clone_account()));
}
/// Notified when a slot is optimistically confirmed
fn notify_slot_confirmed(&self, _slot: Slot, _parent: Option<Slot>) {}
/// Notified when a slot is marked frozen.
fn notify_slot_processed(&self, _slot: Slot, _parent: Option<Slot>) {}
/// Notified when a slot is rooted.
fn notify_slot_rooted(&self, _slot: Slot, _parent: Option<Slot>) {}
fn notify_end_of_restore_from_snapshot(&self) {
self.is_startup_done.store(true, Ordering::Relaxed);
}
}
#[test]
fn test_notify_account_restore_from_snapshot_once_per_slot() {
let mut accounts = AccountsDb::new_single_for_tests();
// Account with key1 is updated twice in the store -- should only get notified once.
let key1 = solana_sdk::pubkey::new_rand();
let mut account1_lamports: u64 = 1;
let account1 =
AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner());
let slot0 = 0;
accounts.store_uncached(slot0, &[(&key1, &account1)]);
account1_lamports = 2;
let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
accounts.store_uncached(slot0, &[(&key1, &account1)]);
let notifier = AccountsDbTestPlugin::default();
let key2 = solana_sdk::pubkey::new_rand();
let account2_lamports: u64 = 100;
let account2 =
AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
accounts.store_uncached(slot0, &[(&key2, &account2)]);
let notifier = Arc::new(RwLock::new(notifier));
accounts.set_accountsdb_plugin_notifer(Some(notifier.clone()));
accounts.notify_account_restore_from_snapshot();
let notifier = notifier.write().unwrap();
assert_eq!(
notifier
.accounts_at_snapshot_restore
.get(&key1)
.unwrap()
.len(),
1
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0]
.1
.lamports(),
account1_lamports
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0,
slot0
);
assert_eq!(
notifier
.accounts_at_snapshot_restore
.get(&key2)
.unwrap()
.len(),
1
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0]
.1
.lamports(),
account2_lamports
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0,
slot0
);
assert!(notifier.is_startup_done.load(Ordering::Relaxed));
}
#[test]
fn test_notify_account_restore_from_snapshot_once_across_slots() {
let mut accounts = AccountsDb::new_single_for_tests();
// Account with key1 is updated twice in two different slots -- should only get notified once.
// Account with key2 is updated slot0, should get notified once
// Account with key3 is updated in slot1, should get notified once
let key1 = solana_sdk::pubkey::new_rand();
let mut account1_lamports: u64 = 1;
let account1 =
AccountSharedData::new(account1_lamports, 1, AccountSharedData::default().owner());
let slot0 = 0;
accounts.store_uncached(slot0, &[(&key1, &account1)]);
let key2 = solana_sdk::pubkey::new_rand();
let account2_lamports: u64 = 200;
let account2 =
AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
accounts.store_uncached(slot0, &[(&key2, &account2)]);
account1_lamports = 2;
let slot1 = 1;
let account1 = AccountSharedData::new(account1_lamports, 1, account1.owner());
accounts.store_uncached(slot1, &[(&key1, &account1)]);
let notifier = AccountsDbTestPlugin::default();
let key3 = solana_sdk::pubkey::new_rand();
let account3_lamports: u64 = 300;
let account3 =
AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
accounts.store_uncached(slot1, &[(&key3, &account3)]);
let notifier = Arc::new(RwLock::new(notifier));
accounts.set_accountsdb_plugin_notifer(Some(notifier.clone()));
accounts.notify_account_restore_from_snapshot();
let notifier = notifier.write().unwrap();
assert_eq!(
notifier
.accounts_at_snapshot_restore
.get(&key1)
.unwrap()
.len(),
1
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0]
.1
.lamports(),
account1_lamports
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0,
slot1
);
assert_eq!(
notifier
.accounts_at_snapshot_restore
.get(&key2)
.unwrap()
.len(),
1
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0]
.1
.lamports(),
account2_lamports
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0,
slot0
);
assert_eq!(
notifier
.accounts_at_snapshot_restore
.get(&key3)
.unwrap()
.len(),
1
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0]
.1
.lamports(),
account3_lamports
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0].0,
slot1
);
assert!(notifier.is_startup_done.load(Ordering::Relaxed));
}
#[test]
fn test_notify_account_at_accounts_update() {
let mut accounts = AccountsDb::new_single_for_tests();
let notifier = AccountsDbTestPlugin::default();
let notifier = Arc::new(RwLock::new(notifier));
accounts.set_accountsdb_plugin_notifer(Some(notifier.clone()));
// Account with key1 is updated twice in two different slots -- should only get notified twice.
// Account with key2 is updated slot0, should get notified once
// Account with key3 is updated in slot1, should get notified once
let key1 = solana_sdk::pubkey::new_rand();
let account1_lamports1: u64 = 1;
let account1 =
AccountSharedData::new(account1_lamports1, 1, AccountSharedData::default().owner());
let slot0 = 0;
accounts.store_cached(slot0, &[(&key1, &account1)]);
let key2 = solana_sdk::pubkey::new_rand();
let account2_lamports: u64 = 200;
let account2 =
AccountSharedData::new(account2_lamports, 1, AccountSharedData::default().owner());
accounts.store_cached(slot0, &[(&key2, &account2)]);
let account1_lamports2 = 2;
let slot1 = 1;
let account1 = AccountSharedData::new(account1_lamports2, 1, account1.owner());
accounts.store_cached(slot1, &[(&key1, &account1)]);
let key3 = solana_sdk::pubkey::new_rand();
let account3_lamports: u64 = 300;
let account3 =
AccountSharedData::new(account3_lamports, 1, AccountSharedData::default().owner());
accounts.store_cached(slot1, &[(&key3, &account3)]);
let notifier = notifier.write().unwrap();
assert_eq!(
notifier
.accounts_at_snapshot_restore
.get(&key1)
.unwrap()
.len(),
2
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0]
.1
.lamports(),
account1_lamports1
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[0].0,
slot0
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[1]
.1
.lamports(),
account1_lamports2
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key1).unwrap()[1].0,
slot1
);
assert_eq!(
notifier
.accounts_at_snapshot_restore
.get(&key2)
.unwrap()
.len(),
1
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0]
.1
.lamports(),
account2_lamports
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key2).unwrap()[0].0,
slot0
);
assert_eq!(
notifier
.accounts_at_snapshot_restore
.get(&key3)
.unwrap()
.len(),
1
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0]
.1
.lamports(),
account3_lamports
);
assert_eq!(
notifier.accounts_at_snapshot_restore.get(&key3).unwrap()[0].0,
slot1
);
}
}

View File

@ -12,6 +12,9 @@ pub trait AccountsUpdateNotifierInterface: std::fmt::Debug {
/// from a snapshot.
fn notify_account_restore_from_snapshot(&self, slot: Slot, account: &StoredAccountMeta);
/// Notified when all accounts have been notified when restoring from a snapshot.
fn notify_end_of_restore_from_snapshot(&self);
/// Notified when a slot is optimistically confirmed
fn notify_slot_confirmed(&self, slot: Slot, parent: Option<Slot>);

View File

@ -44,6 +44,7 @@ use {
atomic::{AtomicUsize, Ordering},
Arc, RwLock,
},
thread::Builder,
},
};
@ -534,11 +535,22 @@ where
accounts_db
.write_version
.fetch_add(snapshot_version, Ordering::Relaxed);
let mut measure_notify = Measure::start("accounts_notify");
let accounts_db = Arc::new(accounts_db);
let accoounts_db_clone = accounts_db.clone();
let handle = Builder::new()
.name("notify_account_restore_from_snapshot".to_string())
.spawn(move || {
accoounts_db_clone.notify_account_restore_from_snapshot();
})
.unwrap();
accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index);
accounts_db.maybe_add_filler_accounts(genesis_config.ticks_per_slot());
let mut measure_notify = Measure::start("accounts_notify");
accounts_db.notify_account_restore_from_snapshot();
handle.join().unwrap();
measure_notify.stop();
datapoint_info!(
@ -552,5 +564,5 @@ where
("accountsdb-notify-at-start-us", measure_notify.as_us(), i64),
);
Ok(accounts_db)
Ok(Arc::try_unwrap(accounts_db).unwrap())
}