parallel writes
This commit is contained in:
parent
65ce71cc2d
commit
7a1bc411b7
|
@ -17,6 +17,7 @@ use prometheus::{opts, register_int_gauge, IntGauge};
|
|||
use serde::Serialize;
|
||||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::pubkey::Pubkey;
|
||||
use solana_sdk::signature::Signature;
|
||||
use solana_sdk::transaction::TransactionError;
|
||||
use tokio::sync::mpsc::error::SendTimeoutError;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
|
@ -936,34 +937,26 @@ impl PostgresSession {
|
|||
// 750ms
|
||||
let _span = tracing::info_span!("save_block", slot = block_info.slot);
|
||||
let instant = Instant::now();
|
||||
let signatures = {
|
||||
let fut_signatures = async {
|
||||
// .3ms
|
||||
let _span = tracing::debug_span!("map_signatures", slot = block_info.slot);
|
||||
block_info
|
||||
.transactions
|
||||
.iter()
|
||||
.map(|transaction| transaction.signature.clone())
|
||||
.collect()
|
||||
.collect_vec()
|
||||
};
|
||||
let accounts = {
|
||||
let fut_accounts = async {
|
||||
// .6ms
|
||||
let _span = tracing::debug_span!("map_accounts", slot = block_info.slot);
|
||||
block_info
|
||||
.heavily_locked_accounts
|
||||
.iter()
|
||||
.map(|acc| acc.key.clone())
|
||||
.collect()
|
||||
.collect::<HashSet<String>>()
|
||||
};
|
||||
|
||||
let both_started_at = Instant::now();
|
||||
let create_tx_ids = self.create_transaction_ids(signatures, slot);
|
||||
let create_accs = self.create_accounts_for_transaction(accounts, slot);
|
||||
try_join!(create_tx_ids, create_accs)?;
|
||||
TIME_TO_SAVE_TRANSACTION.set(both_started_at.elapsed().as_millis() as i64);
|
||||
ACCOUNT_SAVE_TIME.set(both_started_at.elapsed().as_millis() as i64);
|
||||
|
||||
let instant_acc_tx: Instant = Instant::now();
|
||||
let txs_accounts = {
|
||||
let fut_txs_accounts = async {
|
||||
// 90ms
|
||||
let _span = tracing::debug_span!("map_txs_accounts", slot = block_info.slot);
|
||||
block_info
|
||||
|
@ -980,28 +973,37 @@ impl PostgresSession {
|
|||
is_signer: acc.is_signer,
|
||||
is_atl: acc.is_alt,
|
||||
})
|
||||
.collect(),
|
||||
.collect_vec(),
|
||||
})
|
||||
.collect_vec()
|
||||
.collect()
|
||||
};
|
||||
|
||||
if let Err(e) = self.insert_accounts_for_transaction(txs_accounts, slot).await {
|
||||
error!("Error inserting accounts for transactions : {e:?}");
|
||||
}
|
||||
TIME_TO_STORE_TX_ACCOUNT.set(instant_acc_tx.elapsed().as_millis() as i64);
|
||||
let (signatures, accounts, txs_accounts) = join!(fut_signatures, fut_accounts, fut_txs_accounts);
|
||||
|
||||
// insert transactions
|
||||
let instant_save_tx = Instant::now();
|
||||
self.insert_transactions_for_block(&block_info.transactions, slot)
|
||||
.await?;
|
||||
TIME_TO_SAVE_TRANSACTION_DATA.set(instant_save_tx.elapsed().as_millis() as i64);
|
||||
let both_started_at = Instant::now();
|
||||
let fut_create_tx_ids = self.create_transaction_ids(signatures, slot);
|
||||
let fut_create_accs = self.create_accounts_for_transaction(accounts, slot);
|
||||
try_join!(fut_create_tx_ids, fut_create_accs)?;
|
||||
TIME_TO_SAVE_TRANSACTION.set(both_started_at.elapsed().as_millis() as i64);
|
||||
ACCOUNT_SAVE_TIME.set(both_started_at.elapsed().as_millis() as i64);
|
||||
|
||||
let both_started_at = Instant::now();
|
||||
// depends on transactions and accounts mapping tables
|
||||
let fut_insert_accounts_for_transaction = self.insert_accounts_for_transaction(txs_accounts, slot);
|
||||
// depends on transactions mapping table
|
||||
let fut_insert_transactions_for_block = self.insert_transactions_for_block(&block_info.transactions, slot);
|
||||
let ((), ()) = try_join!(fut_insert_accounts_for_transaction, fut_insert_transactions_for_block)?;
|
||||
TIME_TO_STORE_TX_ACCOUNT.set(both_started_at.elapsed().as_millis() as i64);
|
||||
TIME_TO_SAVE_TRANSACTION_DATA.set(both_started_at.elapsed().as_millis() as i64);
|
||||
|
||||
// save account usage in blocks
|
||||
let ins = Instant::now();
|
||||
// depends on accounts mapping table
|
||||
self.save_account_usage_in_block(&block_info).await?;
|
||||
TIME_TO_SAVE_BLOCK_ACCOUNTS.set(ins.elapsed().as_millis() as i64);
|
||||
|
||||
let inst_block_info = Instant::now();
|
||||
// no dependencies
|
||||
self.save_block_info(&block_info).await?;
|
||||
BLOCK_INFO_SAVE_TIME.set(inst_block_info.elapsed().as_millis() as i64);
|
||||
|
||||
|
|
Loading…
Reference in New Issue