From 5b6027bef0430047a4b2b7ab26b1366a798abd32 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Mon, 3 Jan 2022 16:10:44 -0800 Subject: [PATCH] Fixed issue #22124 -- missing historical data if slot updated later. (#22193) * Fixed issue #22124 -- missing historical data if slot updated later. * Fixed a couple of comments --- .../src/accountsdb_plugin_postgres.rs | 6 +- .../src/postgres_client.rs | 90 +++++++++++++++++-- .../developing/plugins/accountsdb_plugin.md | 11 ++- 3 files changed, 95 insertions(+), 12 deletions(-) diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs index ede9b462e..45bb441bd 100644 --- a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -41,6 +41,8 @@ pub struct AccountsDbPluginPostgresConfig { pub threads: Option, pub batch_size: Option, pub panic_on_db_errors: Option, + /// Indicates whether to store historical data for accounts + pub store_account_historical_data: Option, } #[derive(Error, Debug)] @@ -74,7 +76,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { /// Accounts either satisyfing the accounts condition or owners condition will be selected. /// When only owners is specified, /// all accounts belonging to the owners will be streamed. - /// The accounts field support wildcard to select all accounts: + /// The accounts field supports wildcard to select all accounts: /// "accounts_selector" : { /// "accounts" : \["*"\], /// } @@ -85,6 +87,8 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { /// Please refer to https://docs.rs/postgres/0.19.2/postgres/config/struct.Config.html for the connection configuration. /// When `connection_str` is set, the values in "host", "user" and "port" are ignored. If `connection_str` is not given, /// `host` and `user` must be given.
+ /// * "store_account_historical_data" optional, set it to 'true' to store historical account data in the account_audit + /// table. /// * "threads" optional, specifies the number of worker threads for the plugin. A thread /// maintains a PostgreSQL connection to the server. The default is '10'. /// * "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created diff --git a/accountsdb-plugin-postgres/src/postgres_client.rs b/accountsdb-plugin-postgres/src/postgres_client.rs index a79ab91b5..c151c5c33 100644 --- a/accountsdb-plugin-postgres/src/postgres_client.rs +++ b/accountsdb-plugin-postgres/src/postgres_client.rs @@ -39,6 +39,7 @@ const DEFAULT_THREADS_COUNT: usize = 100; const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10; const ACCOUNT_COLUMN_COUNT: usize = 9; const DEFAULT_PANIC_ON_DB_ERROR: bool = false; +const DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA: bool = false; struct PostgresSqlClientWrapper { client: Client, @@ -48,6 +49,7 @@ struct PostgresSqlClientWrapper { update_slot_without_parent_stmt: Statement, update_transaction_log_stmt: Statement, update_block_metadata_stmt: Statement, + insert_account_audit_stmt: Option, } pub struct SimplePostgresClient { @@ -324,6 +326,28 @@ impl SimplePostgresClient { } } + fn build_account_audit_insert_statement( + client: &mut Client, + config: &AccountsDbPluginPostgresConfig, + ) -> Result { + let stmt = "INSERT INTO account_audit (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"; + + let stmt = client.prepare(stmt); + + match stmt { + Err(err) => { + return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { + msg: format!( + "Error in preparing for the account_audit update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}", + err, config.host, config.user, config + ), + }))); + } + Ok(stmt) => Ok(stmt), + } + } + fn
build_slot_upsert_statement_with_parent( client: &mut Client, config: &AccountsDbPluginPostgresConfig, @@ -370,8 +394,8 @@ impl SimplePostgresClient { } } - /// Internal function for updating or inserting a single account - fn upsert_account_internal( + /// Internal function for inserting an account into account_audit table. + fn insert_account_audit( account: &DbAccountInfo, statement: &Statement, client: &mut Client, @@ -379,7 +403,43 @@ impl SimplePostgresClient { let lamports = account.lamports() as i64; let rent_epoch = account.rent_epoch() as i64; let updated_on = Utc::now().naive_utc(); - let result = client.query( + let result = client.execute( + statement, + &[ + &account.pubkey(), + &account.slot, + &account.owner(), + &lamports, + &account.executable(), + &rent_epoch, + &account.data(), + &account.write_version(), + &updated_on, + ], + ); + + if let Err(err) = result { + let msg = format!( + "Failed to persist the insert of account_audit to the PostgreSQL database. Error: {:?}", + err + ); + error!("{}", msg); + return Err(AccountsDbPluginError::AccountsUpdateError { msg }); + } + Ok(()) + } + + /// Internal function for updating or inserting a single account + fn upsert_account_internal( + account: &DbAccountInfo, + statement: &Statement, + client: &mut Client, + insert_account_audit_stmt: &Option, + ) -> Result<(), AccountsDbPluginError> { + let lamports = account.lamports() as i64; + let rent_epoch = account.rent_epoch() as i64; + let updated_on = Utc::now().naive_utc(); + let result = client.execute( statement, &[ &account.pubkey(), @@ -401,6 +461,11 @@ impl SimplePostgresClient { ); error!("{}", msg); return Err(AccountsDbPluginError::AccountsUpdateError { msg }); + } else if result.unwrap() == 0 && insert_account_audit_stmt.is_some() { + // If no records modified (inserted or updated), it is because the account is updated + // at an older slot, insert the record directly into the account_audit table. 
+ let statement = insert_account_audit_stmt.as_ref().unwrap(); + Self::insert_account_audit(account, statement, client)?; } Ok(()) @@ -409,9 +474,10 @@ impl SimplePostgresClient { /// Update or insert a single account fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> { let client = self.client.get_mut().unwrap(); + let insert_account_audit_stmt = &client.insert_account_audit_stmt; let statement = &client.update_account_stmt; let client = &mut client.client; - Self::upsert_account_internal(account, statement, client) + Self::upsert_account_internal(account, statement, client, insert_account_audit_stmt) } /// Insert accounts in batch to reduce network overhead @@ -487,11 +553,12 @@ impl SimplePostgresClient { } let client = self.client.get_mut().unwrap(); + let insert_account_audit_stmt = &client.insert_account_audit_stmt; let statement = &client.update_account_stmt; let client = &mut client.client; for account in self.pending_account_updates.drain(..) 
{ - Self::upsert_account_internal(&account, statement, client)?; + Self::upsert_account_internal(&account, statement, client, insert_account_audit_stmt)?; } Ok(()) @@ -516,6 +583,18 @@ impl SimplePostgresClient { let batch_size = config .batch_size .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE); + + let store_account_historical_data = config + .store_account_historical_data + .unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA); + + let insert_account_audit_stmt = if store_account_historical_data { + let stmt = Self::build_account_audit_insert_statement(&mut client, config)?; + Some(stmt) + } else { + None + }; + info!("Created SimplePostgresClient."); Ok(Self { batch_size, @@ -528,6 +607,7 @@ impl SimplePostgresClient { update_slot_without_parent_stmt, update_transaction_log_stmt, update_block_metadata_stmt, + insert_account_audit_stmt, }), }) } diff --git a/docs/src/developing/plugins/accountsdb_plugin.md b/docs/src/developing/plugins/accountsdb_plugin.md index 124d3551b..0a5e51d36 100644 --- a/docs/src/developing/plugins/accountsdb_plugin.md +++ b/docs/src/developing/plugins/accountsdb_plugin.md @@ -381,8 +381,11 @@ psql -U solana -p 5433 -h 10.138.0.9 -w -d solana -f drop_schema.sql ### Capture Historical Account Data -The account historical data is captured using a database trigger as shown in -`create_schema.sql`, +To capture account historical data, set +`store_account_historical_data` to true in the configuration file. + +Also ensure the database trigger is created to save data in the `account_audit` table when +records in `account` are updated, as shown in `create_schema.sql`, ``` CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$ @@ -399,11 +402,8 @@ CREATE TRIGGER account_update_trigger AFTER UPDATE OR DELETE ON account FOR EACH ROW EXECUTE PROCEDURE audit_account_update(); ``` -The historical data is stored in the account_audit table.
- The trigger can be dropped to disable this feature, for example, - ``` DROP TRIGGER account_update_trigger ON account; ``` @@ -411,7 +411,6 @@ DROP TRIGGER account_update_trigger ON account; Over time, the account_audit can accumulate large amount of data. You may choose to limit that by deleting older historical data. - For example, the following SQL statement can be used to keep up to 1000 of the most recent records for an account: