Fixed issue #22124 -- missing historical data if slot updated later. (#22193)

* Fixed issue #22124 -- missing historical data if slot updated later.

* Fixed a couple of comments
This commit is contained in:
Lijun Wang 2022-01-03 16:10:44 -08:00 committed by GitHub
parent ed0b47c6f8
commit 5b6027bef0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 95 additions and 12 deletions

View File

@ -41,6 +41,8 @@ pub struct AccountsDbPluginPostgresConfig {
pub threads: Option<usize>,
pub batch_size: Option<usize>,
pub panic_on_db_errors: Option<bool>,
/// Indicates whether to store historical data for accounts
pub store_account_historical_data: Option<bool>,
}
#[derive(Error, Debug)]
@ -74,7 +76,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
/// Accounts satisfying either the accounts condition or the owners condition will be selected.
/// When only owners is specified,
/// all accounts belonging to the owners will be streamed.
/// The accounts field support wildcard to select all accounts:
/// The accounts field supports wildcard to select all accounts:
/// "accounts_selector" : {
/// "accounts" : \["*"\],
/// }
@ -85,6 +87,8 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres {
/// Please refer to https://docs.rs/postgres/0.19.2/postgres/config/struct.Config.html for the connection configuration.
/// When `connection_str` is set, the values in "host", "user" and "port" are ignored. If `connection_str` is not given,
/// `host` and `user` must be given.
/// * "store_account_historical_data" optional, set it to 'true' to store historical account data in the
/// account_audit table.
/// * "threads" optional, specifies the number of worker threads for the plugin. A thread
/// maintains a PostgreSQL connection to the server. The default is '10'.
/// * "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created

View File

@ -39,6 +39,7 @@ const DEFAULT_THREADS_COUNT: usize = 100;
const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10;
const ACCOUNT_COLUMN_COUNT: usize = 9;
const DEFAULT_PANIC_ON_DB_ERROR: bool = false;
const DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA: bool = false;
struct PostgresSqlClientWrapper {
client: Client,
@ -48,6 +49,7 @@ struct PostgresSqlClientWrapper {
update_slot_without_parent_stmt: Statement,
update_transaction_log_stmt: Statement,
update_block_metadata_stmt: Statement,
insert_account_audit_stmt: Option<Statement>,
}
pub struct SimplePostgresClient {
@ -324,6 +326,28 @@ impl SimplePostgresClient {
}
}
/// Prepares the statement used to insert a row into the account_audit table.
///
/// Returns the prepared statement on success. When the PostgreSQL client fails
/// to prepare it, the failure is reported as a `DataSchemaError` wrapped in
/// `AccountsDbPluginError::Custom`, with the host, user and full config in the message.
fn build_account_audit_insert_statement(
    client: &mut Client,
    config: &AccountsDbPluginPostgresConfig,
) -> Result<Statement, AccountsDbPluginError> {
    let sql = "INSERT INTO account_audit (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \
    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)";

    // Translate a prepare failure into the plugin's error type; success passes through untouched.
    client.prepare(sql).map_err(|err| {
        let msg = format!(
            "Error in preparing for the account_audit update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}",
            err, config.host, config.user, config
        );
        AccountsDbPluginError::Custom(Box::new(
            AccountsDbPluginPostgresError::DataSchemaError { msg },
        ))
    })
}
fn build_slot_upsert_statement_with_parent(
client: &mut Client,
config: &AccountsDbPluginPostgresConfig,
@ -370,8 +394,8 @@ impl SimplePostgresClient {
}
}
/// Internal function for updating or inserting a single account
fn upsert_account_internal(
/// Internal function for inserting an account into account_audit table.
fn insert_account_audit(
account: &DbAccountInfo,
statement: &Statement,
client: &mut Client,
@ -379,7 +403,43 @@ impl SimplePostgresClient {
let lamports = account.lamports() as i64;
let rent_epoch = account.rent_epoch() as i64;
let updated_on = Utc::now().naive_utc();
let result = client.query(
let result = client.execute(
statement,
&[
&account.pubkey(),
&account.slot,
&account.owner(),
&lamports,
&account.executable(),
&rent_epoch,
&account.data(),
&account.write_version(),
&updated_on,
],
);
if let Err(err) = result {
let msg = format!(
"Failed to persist the insert of account_audit to the PostgreSQL database. Error: {:?}",
err
);
error!("{}", msg);
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
}
Ok(())
}
/// Internal function for updating or inserting a single account
fn upsert_account_internal(
account: &DbAccountInfo,
statement: &Statement,
client: &mut Client,
insert_account_audit_stmt: &Option<Statement>,
) -> Result<(), AccountsDbPluginError> {
let lamports = account.lamports() as i64;
let rent_epoch = account.rent_epoch() as i64;
let updated_on = Utc::now().naive_utc();
let result = client.execute(
statement,
&[
&account.pubkey(),
@ -401,6 +461,11 @@ impl SimplePostgresClient {
);
error!("{}", msg);
return Err(AccountsDbPluginError::AccountsUpdateError { msg });
} else if result.unwrap() == 0 && insert_account_audit_stmt.is_some() {
// If no records were modified (inserted or updated), it is because the account was updated
// at an older slot; in that case insert the record directly into the account_audit table.
let statement = insert_account_audit_stmt.as_ref().unwrap();
Self::insert_account_audit(account, statement, client)?;
}
Ok(())
@ -409,9 +474,10 @@ impl SimplePostgresClient {
/// Update or insert a single account
fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> {
let client = self.client.get_mut().unwrap();
let insert_account_audit_stmt = &client.insert_account_audit_stmt;
let statement = &client.update_account_stmt;
let client = &mut client.client;
Self::upsert_account_internal(account, statement, client)
Self::upsert_account_internal(account, statement, client, insert_account_audit_stmt)
}
/// Insert accounts in batch to reduce network overhead
@ -487,11 +553,12 @@ impl SimplePostgresClient {
}
let client = self.client.get_mut().unwrap();
let insert_account_audit_stmt = &client.insert_account_audit_stmt;
let statement = &client.update_account_stmt;
let client = &mut client.client;
for account in self.pending_account_updates.drain(..) {
Self::upsert_account_internal(&account, statement, client)?;
Self::upsert_account_internal(&account, statement, client, insert_account_audit_stmt)?;
}
Ok(())
@ -516,6 +583,18 @@ impl SimplePostgresClient {
let batch_size = config
.batch_size
.unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE);
let store_account_historical_data = config
.store_account_historical_data
.unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA);
let insert_account_audit_stmt = if store_account_historical_data {
let stmt = Self::build_account_audit_insert_statement(&mut client, config)?;
Some(stmt)
} else {
None
};
info!("Created SimplePostgresClient.");
Ok(Self {
batch_size,
@ -528,6 +607,7 @@ impl SimplePostgresClient {
update_slot_without_parent_stmt,
update_transaction_log_stmt,
update_block_metadata_stmt,
insert_account_audit_stmt,
}),
})
}

View File

@ -381,8 +381,11 @@ psql -U solana -p 5433 -h 10.138.0.9 -w -d solana -f drop_schema.sql
### Capture Historical Account Data
The account historical data is captured using a database trigger as shown in
`create_schema.sql`,
To capture account historical data, set `store_account_historical_data` to `true`
in the configuration file.
Also ensure that the database trigger that saves data to the `account_audit` table when
records in `account` are updated has been created, as shown in `create_schema.sql`:
```
CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$
@ -399,11 +402,8 @@ CREATE TRIGGER account_update_trigger AFTER UPDATE OR DELETE ON account
FOR EACH ROW EXECUTE PROCEDURE audit_account_update();
```
The historical data is stored in the account_audit table.
The trigger can be dropped to disable this feature, for example,
```
DROP TRIGGER account_update_trigger ON account;
```
@ -411,7 +411,6 @@ DROP TRIGGER account_update_trigger ON account;
Over time, the account_audit table can accumulate a large amount of data. You may choose to
limit that by deleting older historical data.
For example, the following SQL statement can be used to keep up to 1000 of the most
recent records for an account: