From a6ba5a9a05d2c222c1fe5c93d6a3e2731bff04a2 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Thu, 23 Jun 2022 13:37:38 -0600 Subject: [PATCH] Add transaction index in slot to geyser plugin TransactionInfo (#25688) * Define shuffle to prep using same shuffle for multiple slices * Determine transaction indexes and plumb to execute_batch * Pair transaction_index with transaction in TransactionStatusService * Add new ReplicaTransactionInfoVersion * Plumb transaction_indexes through BankingStage * Prepare BankingStage to receive transaction indexes from PohRecorder * Determine transaction indexes in PohRecorder; add field to WorkingBank * Add PohRecorder::record unit test * Only pass starting_transaction_index around PohRecorder * Add helper structs to simplify test DashMap * Pass entry and starting-index into process_entries_with_callback together * Add tx-index checks to test_rebatch_transactions * Revert shuffle definition and use zip/unzip * Only zip/unzip if randomize * Add confirm_slot_entries test * Review nits * Add type alias to make sender docs more clear --- banking-bench/src/main.rs | 4 +- core/benches/banking_stage.rs | 2 +- core/src/banking_stage.rs | 45 ++- core/src/replay_stage.rs | 7 +- .../src/geyser_plugin_interface.rs | 20 ++ .../src/transaction_notifier.rs | 19 +- ledger/src/blockstore_processor.rs | 257 ++++++++++++++++-- poh/src/poh_recorder.rs | 131 +++++++-- poh/src/poh_service.rs | 2 +- rpc/src/transaction_notifier_interface.rs | 1 + rpc/src/transaction_status_service.rs | 46 +++- 11 files changed, 465 insertions(+), 69 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index bad140ac4f..ea0438c36a 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -358,7 +358,7 @@ fn main() { DEFAULT_TPU_CONNECTION_POOL_SIZE, )), ); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); // This is so that the signal_receiver does not go out of scope after the closure. 
// If it is dropped before poh_service, then poh_service will error when @@ -439,7 +439,7 @@ fn main() { std::u64::MAX, ); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); assert!(poh_recorder.lock().unwrap().bank().is_some()); if bank.slot() > 32 { leader_schedule_cache.set_root(&bank); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 90da9e0cba..fe788dca2c 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -233,7 +233,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { Arc::new(RwLock::new(CostModel::default())), Arc::new(ConnectionCache::default()), ); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); let chunk_len = verified.len() / CHUNKS; let mut start = 0; diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 41d3e331a5..7cde63c052 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -110,6 +110,8 @@ struct RecordTransactionsSummary { record_transactions_timings: RecordTransactionsTimings, // Result of trying to record the transactions into the PoH stream result: Result<(), PohRecorderError>, + // Index in the slot of the first transaction recorded + starting_transaction_index: Option<usize>, } #[derive(Clone, Debug, PartialEq, Eq)] @@ -1203,6 +1205,7 @@ impl BankingStage { recorder: &TransactionRecorder, ) -> RecordTransactionsSummary { let mut record_transactions_timings = RecordTransactionsTimings::default(); + let mut starting_transaction_index = None; if !transactions.is_empty() { let num_to_record = transactions.len(); @@ -1217,7 +1220,9 @@ impl BankingStage { record_transactions_timings.poh_record_us = poh_record_time.as_us(); match res { - Ok(()) => (), + Ok(starting_index) => { + starting_transaction_index = starting_index; + } Err(PohRecorderError::MaxHeightReached) => { inc_new_counter_info!("banking_stage-max_height_reached", 1); inc_new_counter_info!( @@ -1227,6 +1232,7 @@ impl BankingStage { return RecordTransactionsSummary { record_transactions_timings, result: Err(PohRecorderError::MaxHeightReached), + starting_transaction_index: None, }; } Err(e) => panic!("Poh recorder returned unexpected error: {:?}", e), @@ -1236,6 +1242,7 @@ impl BankingStage { RecordTransactionsSummary { record_transactions_timings, result: Ok(()), + starting_transaction_index, } } @@ -1328,6 +1335,7 @@ impl BankingStage { let RecordTransactionsSummary { result: record_transactions_result, record_transactions_timings, + starting_transaction_index, } = record_transactions_summary; execute_and_commit_timings.record_transactions_timings = RecordTransactionsTimings { execution_results_to_transactions_us: execution_results_to_transactions_time.as_us(), @@ -1406,6 +1414,20 @@ impl BankingStage { let post_balances = bank.collect_balances(batch); let post_token_balances = collect_token_balances(bank, batch, &mut mint_decimals); + let mut transaction_index = starting_transaction_index.unwrap_or_default(); + let batch_transaction_indexes: Vec<_> = tx_results + .execution_results + .iter() + .map(|result| { + if result.was_executed() { + let this_transaction_index = transaction_index; + saturating_add_assign!(transaction_index, 1); + this_transaction_index + } else { + 0 + } + }) + .collect(); transaction_status_sender.send_transaction_status_batch( bank.clone(), txs, @@ -1416,6 +1438,7 @@ impl BankingStage { post_token_balances, ), tx_results.rent_debits, + batch_transaction_indexes,
); } }, @@ -2692,7 +2715,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); let pubkey = solana_sdk::pubkey::new_rand(); let keypair2 = Keypair::new(); let pubkey2 = solana_sdk::pubkey::new_rand(); @@ -2914,7 +2937,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); let process_transactions_batch_output = BankingStage::process_and_record_transactions( @@ -3049,7 +3072,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); let process_transactions_batch_output = BankingStage::process_and_record_transactions( @@ -3122,7 +3145,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1); @@ -3272,7 +3295,7 @@ mod tests { let recorder = poh_recorder.recorder(); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); @@ -3472,7 +3495,7 @@ mod tests { let recorder = poh_recorder.recorder(); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); let poh_simulator = simulate_poh(record_receiver, &poh_recorder); @@ -3679,7 +3702,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); let shreds = entries_to_test_shreds(&entries, bank.slot(), 0, true, 0); blockstore.insert_shreds(shreds, None, false).unwrap(); @@ -3840,7 +3863,7 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); let shreds = entries_to_test_shreds(&entries, bank.slot(), 0, true, 0); blockstore.insert_shreds(shreds, None, false).unwrap(); @@ -3997,7 +4020,7 @@ mod tests { // Processes one packet per iteration of the loop let num_packets_to_process_per_iteration = num_conflicting_transactions; for num_expected_unprocessed in (0..num_conflicting_transactions).rev() { - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); BankingStage::consume_buffered_packets( &Pubkey::default(), max_tx_processing_ns, @@ -4046,7 +4069,7 @@ mod tests { // each iteration of this loop will process one element of the batch per iteration of the // loop. 
let interrupted_iteration = 1; - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); let poh_recorder_ = poh_recorder.clone(); let recorder = poh_recorder_.lock().unwrap().recorder(); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 30c1f356ea..5d69896cba 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -858,6 +858,7 @@ impl ReplayStage { &retransmit_slots_sender, &mut skipped_slots_info, has_new_vote_been_rooted, + transaction_status_sender.is_some(), ); let poh_bank = poh_recorder.lock().unwrap().bank(); @@ -1544,6 +1545,7 @@ impl ReplayStage { retransmit_slots_sender: &RetransmitSlotsSender, skipped_slots_info: &mut SkippedSlotsInfo, has_new_vote_been_rooted: bool, + track_transaction_indexes: bool, ) { // all the individual calls to poh_recorder.lock() are designed to // increase granularity, decrease contention @@ -1659,7 +1661,10 @@ impl ReplayStage { ); let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank); - poh_recorder.lock().unwrap().set_bank(&tpu_bank); + poh_recorder + .lock() + .unwrap() + .set_bank(&tpu_bank, track_transaction_indexes); } else { error!("{} No next leader found", my_pubkey); } diff --git a/geyser-plugin-interface/src/geyser_plugin_interface.rs b/geyser-plugin-interface/src/geyser_plugin_interface.rs index 8dd9bdc54d..4b49701b7f 100644 --- a/geyser-plugin-interface/src/geyser_plugin_interface.rs +++ b/geyser-plugin-interface/src/geyser_plugin_interface.rs @@ -96,12 +96,32 @@ pub struct ReplicaTransactionInfo<'a> { pub transaction_status_meta: &'a TransactionStatusMeta, } +/// Information about a transaction, including index in block +#[derive(Clone, Debug)] +pub struct ReplicaTransactionInfoV2<'a> { + /// The first signature of the transaction, used for identifying the transaction. + pub signature: &'a Signature, + + /// Indicates if the transaction is a simple vote transaction. + pub is_vote: bool, + + /// The sanitized transaction. + pub transaction: &'a SanitizedTransaction, + + /// Metadata of the transaction status. + pub transaction_status_meta: &'a TransactionStatusMeta, + + /// The transaction's index in the block + pub index: usize, +} + /// A wrapper to future-proof ReplicaTransactionInfo handling. /// If there were a change to the structure of ReplicaTransactionInfo, /// there would be new enum entry for the newer version, forcing /// plugin implementations to handle the change. 
pub enum ReplicaTransactionInfoVersions<'a> { V0_0_1(&'a ReplicaTransactionInfo<'a>), + V0_0_2(&'a ReplicaTransactionInfoV2<'a>), } #[derive(Clone, Debug)] diff --git a/geyser-plugin-manager/src/transaction_notifier.rs b/geyser-plugin-manager/src/transaction_notifier.rs index e30b728465..6fff1cd9f9 100644 --- a/geyser-plugin-manager/src/transaction_notifier.rs +++ b/geyser-plugin-manager/src/transaction_notifier.rs @@ -3,7 +3,7 @@ use { crate::geyser_plugin_manager::GeyserPluginManager, log::*, solana_geyser_plugin_interface::geyser_plugin_interface::{ - ReplicaTransactionInfo, ReplicaTransactionInfoVersions, + ReplicaTransactionInfoV2, ReplicaTransactionInfoVersions, }, solana_measure::measure::Measure, solana_metrics::*, @@ -25,13 +25,18 @@ impl TransactionNotifier for TransactionNotifierImpl { fn notify_transaction( &self, slot: Slot, + index: usize, signature: &Signature, transaction_status_meta: &TransactionStatusMeta, transaction: &SanitizedTransaction, ) { let mut measure = Measure::start("geyser-plugin-notify_plugins_of_transaction_info"); - let transaction_log_info = - Self::build_replica_transaction_info(signature, transaction_status_meta, transaction); + let transaction_log_info = Self::build_replica_transaction_info( + index, + signature, + transaction_status_meta, + transaction, + ); let mut plugin_manager = self.plugin_manager.write().unwrap(); @@ -44,7 +49,7 @@ impl TransactionNotifier for TransactionNotifierImpl { continue; } match plugin.notify_transaction( - ReplicaTransactionInfoVersions::V0_0_1(&transaction_log_info), + ReplicaTransactionInfoVersions::V0_0_2(&transaction_log_info), slot, ) { Err(err) => { @@ -78,11 +83,13 @@ impl TransactionNotifierImpl { } fn build_replica_transaction_info<'a>( + index: usize, signature: &'a Signature, transaction_status_meta: &'a TransactionStatusMeta, transaction: &'a SanitizedTransaction, - ) -> ReplicaTransactionInfo<'a> { - ReplicaTransactionInfo { + ) -> ReplicaTransactionInfoV2<'a> { + ReplicaTransactionInfoV2 { + index, signature, is_vote: transaction.is_simple_vote_transaction(), transaction, diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index f4c51c16cd..61be723952 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -93,6 +93,16 @@ impl BlockCostCapacityMeter { } } +struct TransactionBatchWithIndexes<'a, 'b> { + pub batch: TransactionBatch<'a, 'b>, + pub transaction_indexes: Vec<usize>, +} + +struct ReplayEntry { + entry: EntryType, + starting_index: usize, +} + // get_max_thread_count to match number of threads in the old code. // see: https://github.com/solana-labs/solana/pull/24853 lazy_static!
{ @@ -157,7 +167,7 @@ fn aggregate_total_execution_units(execute_timings: &ExecuteTimings) -> u64 { } fn execute_batch( - batch: &TransactionBatch, + batch: &TransactionBatchWithIndexes, bank: &Arc<Bank>, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, @@ -165,6 +175,10 @@ fn execute_batch( cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>, tx_cost: u64, ) -> Result<()> { + let TransactionBatchWithIndexes { + batch, + transaction_indexes, + } = batch; let record_token_balances = transaction_status_sender.is_some(); let mut mint_decimals: HashMap<Pubkey, u8> = HashMap::new(); @@ -244,6 +258,7 @@ fn execute_batch( balances, token_balances, rent_debits, + transaction_indexes.to_vec(), ); } @@ -253,7 +268,7 @@ fn execute_batch( fn execute_batches_internal( bank: &Arc<Bank>, - batches: &[TransactionBatch], + batches: &[TransactionBatchWithIndexes], entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, @@ -300,18 +315,23 @@ fn rebatch_transactions<'a>( sanitized_txs: &'a [SanitizedTransaction], start: usize, end: usize, -) -> TransactionBatch<'a, 'a> { + transaction_indexes: &'a [usize], +) -> TransactionBatchWithIndexes<'a, 'a> { let txs = &sanitized_txs[start..=end]; let results = &lock_results[start..=end]; let mut tx_batch = TransactionBatch::new(results.to_vec(), bank, Cow::from(txs)); tx_batch.set_needs_unlock(false); - tx_batch + let transaction_indexes = transaction_indexes[start..=end].to_vec(); + TransactionBatchWithIndexes { + batch: tx_batch, + transaction_indexes, + } } fn execute_batches( bank: &Arc<Bank>, - batches: &[TransactionBatch], + batches: &[TransactionBatchWithIndexes], entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option<&TransactionStatusSender>, replay_vote_sender: Option<&ReplayVoteSender>, @@ -319,14 +339,16 @@ fn execute_batches( cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>, cost_model: &CostModel, ) -> Result<()> { - let (lock_results, sanitized_txs): (Vec<_>, Vec<_>) = batches + let ((lock_results, sanitized_txs), transaction_indexes): ((Vec<_>, Vec<_>), Vec<_>) = batches .iter() .flat_map(|batch| { batch + .batch .lock_results() .iter() .cloned() - .zip(batch.sanitized_transactions().to_vec()) + .zip(batch.batch.sanitized_transactions().to_vec()) + .zip(batch.transaction_indexes.to_vec()) }) .unzip(); @@ -352,7 +374,7 @@ fn execute_batches( let target_batch_count = get_thread_count() as u64; - let mut tx_batches: Vec<TransactionBatch> = vec![]; + let mut tx_batches: Vec<TransactionBatchWithIndexes> = vec![]; let mut tx_batch_costs: Vec<u64> = vec![]; let rebatched_txs = if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) { let target_batch_cost = total_cost / target_batch_count; @@ -373,6 +395,7 @@ fn execute_batches( &sanitized_txs, slice_start, index, + &transaction_indexes, ); slice_start = next_index; tx_batches.push(tx_batch); @@ -408,6 +431,9 @@ fn execute_batches( /// 2. Process the locked group in parallel /// 3. Register the `Tick` if it's available /// 4.
Update the leader scheduler, goto 1 +/// +/// This method is for use in testing against a single Bank, and assumes `Bank::transaction_count()` +/// represents the number of transactions executed in this Bank pub fn process_entries_for_tests( bank: &Arc<Bank>, entries: Vec<Entry>, @@ -423,10 +449,24 @@ pub fn process_entries_for_tests( }; let mut timings = ExecuteTimings::default(); - let mut entries = entry::verify_transactions(entries, Arc::new(verify_transaction))?; + let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap(); + let mut replay_entries: Vec<_> = + entry::verify_transactions(entries, Arc::new(verify_transaction))? + .into_iter() + .map(|entry| { + let starting_index = entry_starting_index; + if let EntryType::Transactions(ref transactions) = entry { + entry_starting_index = entry_starting_index.saturating_add(transactions.len()); + } + ReplayEntry { + entry, + starting_index, + } + }) + .collect(); let result = process_entries_with_callback( bank, - &mut entries, + &mut replay_entries, randomize, None, transaction_status_sender, @@ -441,9 +481,10 @@ pub fn process_entries_for_tests( } // Note: If randomize is true this will shuffle entries' transactions in-place. +#[allow(clippy::too_many_arguments)] fn process_entries_with_callback( bank: &Arc<Bank>, - entries: &mut [EntryType], + entries: &mut [ReplayEntry], randomize: bool, entry_callback: Option<&ProcessCallback>, transaction_status_sender: Option<&TransactionStatusSender>, @@ -458,7 +499,11 @@ fn process_entries_with_callback( let mut rng = thread_rng(); let cost_model = CostModel::new(); - for entry in entries { + for ReplayEntry { + entry, + starting_index, + } in entries + { match entry { EntryType::Tick(hash) => { // If it's a tick, save it for later @@ -489,9 +534,18 @@ fn process_entries_with_callback( .send_cost_details(bank.clone(), transactions.iter()); } - if randomize { - transactions.shuffle(&mut rng); - } + let starting_index = *starting_index; + let transaction_indexes = if randomize { + let mut transactions_and_indexes: Vec<(SanitizedTransaction, usize)> = + transactions.drain(..).zip(starting_index..).collect(); + transactions_and_indexes.shuffle(&mut rng); + let (txs, indexes): (Vec<_>, Vec<_>) = + transactions_and_indexes.into_iter().unzip(); + *transactions = txs; + indexes + } else { + (starting_index..starting_index.saturating_add(transactions.len())).collect() + }; loop { // try to lock the accounts + let (lock_results, batch) = { + // if locking worked if first_lock_err.is_ok() { - batches.push(batch); + batches.push(TransactionBatchWithIndexes { + batch, + transaction_indexes, + }); // done with this entry break; } @@ -968,7 +1025,18 @@ fn confirm_slot_entries( let slot = bank.slot(); let (entries, num_shreds, slot_full) = slot_entries_load_result; let num_entries = entries.len(); - let num_txs = entries.iter().map(|e| e.transactions.len()).sum::<usize>(); + let mut entry_starting_indexes = Vec::with_capacity(num_entries); + let mut entry_starting_index = progress.num_txs; + let num_txs = entries + .iter() + .map(|e| { + let num_txs = e.transactions.len(); + let next_starting_index = entry_starting_index.saturating_add(num_txs); + entry_starting_indexes.push(entry_starting_index); + entry_starting_index = next_starting_index; + num_txs + }) + .sum::<usize>(); trace!( "Fetched entries for slot {}, num_entries: {}, num_shreds: {}, num_txs: {}, slot_full: {}", slot, @@ -1035,10 +1103,19 @@ fn confirm_slot_entries( let mut replay_elapsed = Measure::start("replay_elapsed"); let mut
execute_timings = ExecuteTimings::default(); let cost_capacity_meter = Arc::new(RwLock::new(BlockCostCapacityMeter::default())); + let mut replay_entries: Vec<_> = entries + .unwrap() + .into_iter() + .zip(entry_starting_indexes) + .map(|(entry, starting_index)| ReplayEntry { + entry, + starting_index, + }) + .collect(); // Note: This will shuffle entries' transactions in-place. let process_result = process_entries_with_callback( bank, - &mut entries.unwrap(), + &mut replay_entries, true, // shuffle transactions. entry_callback, transaction_status_sender, @@ -1468,6 +1545,7 @@ pub struct TransactionStatusBatch { pub balances: TransactionBalancesSet, pub token_balances: TransactionTokenBalancesSet, pub rent_debits: Vec<RentDebits>, + pub transaction_indexes: Vec<usize>, } #[derive(Clone)] @@ -1484,6 +1562,7 @@ impl TransactionStatusSender { balances: TransactionBalancesSet, token_balances: TransactionTokenBalancesSet, rent_debits: Vec<RentDebits>, + transaction_indexes: Vec<usize>, ) { let slot = bank.slot(); @@ -1502,6 +1581,7 @@ impl TransactionStatusSender { balances, token_balances, rent_debits, + transaction_indexes, })) { trace!( @@ -4005,6 +4085,121 @@ pub mod tests { assert_eq!(slot_2_bank.get_hash_age(&slot_2_hash), Some(0)); } + #[test] + fn test_confirm_slot_entries_progress_num_txs_indexes() { + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(100 * LAMPORTS_PER_SOL); + let genesis_hash = genesis_config.hash(); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let mut timing = ConfirmationTiming::default(); + let mut progress = ConfirmationProgress::new(genesis_hash); + let amount = genesis_config.rent.minimum_balance(0); + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + let keypair3 = Keypair::new(); + let keypair4 = Keypair::new(); + bank.transfer(LAMPORTS_PER_SOL, &mint_keypair, &keypair1.pubkey()) + .unwrap(); + bank.transfer(LAMPORTS_PER_SOL, &mint_keypair, &keypair2.pubkey()) + .unwrap(); + + let (transaction_status_sender, transaction_status_receiver) = + crossbeam_channel::unbounded(); + let transaction_status_sender = TransactionStatusSender { + sender: transaction_status_sender, + }; + + let blockhash = bank.last_blockhash(); + let tx1 = system_transaction::transfer( + &keypair1, + &keypair3.pubkey(), + amount, + bank.last_blockhash(), + ); + let tx2 = system_transaction::transfer( + &keypair2, + &keypair4.pubkey(), + amount, + bank.last_blockhash(), + ); + let entry = next_entry(&blockhash, 1, vec![tx1, tx2]); + let new_hash = entry.hash; + + confirm_slot_entries( + &bank, + (vec![entry], 0, false), + &mut timing, + &mut progress, + false, + Some(&transaction_status_sender), + None, + None, + None, + &VerifyRecyclers::default(), + ) + .unwrap(); + assert_eq!(progress.num_txs, 2); + let batch = transaction_status_receiver.recv().unwrap(); + if let TransactionStatusMessage::Batch(batch) = batch { + assert_eq!(batch.transactions.len(), 2); + assert_eq!(batch.transaction_indexes.len(), 2); + // Assert contains instead of the actual vec due to randomize + assert!(batch.transaction_indexes.contains(&0)); + assert!(batch.transaction_indexes.contains(&1)); + } else { + panic!("batch should have been sent"); + } + + let tx1 = system_transaction::transfer( + &keypair1, + &keypair3.pubkey(), + amount + 1, + bank.last_blockhash(), + ); + let tx2 = system_transaction::transfer( + &keypair2, + &keypair4.pubkey(), + amount + 1, + bank.last_blockhash(), + ); + let tx3 = system_transaction::transfer( + &mint_keypair, + &Pubkey::new_unique(), +
amount, + bank.last_blockhash(), + ); + let entry = next_entry(&new_hash, 1, vec![tx1, tx2, tx3]); + + confirm_slot_entries( + &bank, + (vec![entry], 0, false), + &mut timing, + &mut progress, + false, + Some(&transaction_status_sender), + None, + None, + None, + &VerifyRecyclers::default(), + ) + .unwrap(); + assert_eq!(progress.num_txs, 5); + let batch = transaction_status_receiver.recv().unwrap(); + if let TransactionStatusMessage::Batch(batch) = batch { + assert_eq!(batch.transactions.len(), 3); + assert_eq!(batch.transaction_indexes.len(), 3); + // Assert contains instead of the actual vec due to randomize + assert!(batch.transaction_indexes.contains(&2)); + assert!(batch.transaction_indexes.contains(&3)); + assert!(batch.transaction_indexes.contains(&4)); + } else { + panic!("batch should have been sent"); + } + } + #[test] fn test_rebatch_transactions() { let dummy_leader_pubkey = solana_sdk::pubkey::new_rand(); @@ -4018,6 +4213,8 @@ pub mod tests { let pubkey = solana_sdk::pubkey::new_rand(); let keypair2 = Keypair::new(); let pubkey2 = solana_sdk::pubkey::new_rand(); + let keypair3 = Keypair::new(); + let pubkey3 = solana_sdk::pubkey::new_rand(); let txs = vec![ SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &keypair, &pubkey, 1, genesis_config.hash(), )), SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( &keypair2, &pubkey2, 1, genesis_config.hash(), )), + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &keypair3, + &pubkey3, + 1, + genesis_config.hash(), + )), ]; let batch = bank.prepare_sanitized_batch(&txs); assert!(batch.needs_unlock()); + let transaction_indexes = vec![42, 43, 44]; let batch2 = rebatch_transactions( batch.lock_results(), &bank, batch.sanitized_transactions(), 0, - 1, + 0, + &transaction_indexes, ); assert!(batch.needs_unlock()); - assert!(!batch2.needs_unlock()); + assert!(!batch2.batch.needs_unlock()); + assert_eq!(batch2.transaction_indexes, vec![42]); + + let batch3 = rebatch_transactions( + batch.lock_results(), + &bank, + batch.sanitized_transactions(), + 1, + 2, + &transaction_indexes, + ); + assert!(!batch3.batch.needs_unlock()); + assert_eq!(batch3.transaction_indexes, vec![43, 44]); } #[test] diff --git a/poh/src/poh_recorder.rs b/poh/src/poh_recorder.rs index df2112b6ad..000d58ab46 100644 --- a/poh/src/poh_recorder.rs +++ b/poh/src/poh_recorder.rs @@ -80,18 +80,22 @@ impl BankStart { } } +// Sends the Result of the record operation, including the index in the slot of the first +// transaction, if being tracked by WorkingBank +type RecordResultSender = Sender<Result<Option<usize>>>; + pub struct Record { pub mixin: Hash, pub transactions: Vec<VersionedTransaction>, pub slot: Slot, - pub sender: Sender<Result<()>>, + pub sender: RecordResultSender, } impl Record { pub fn new( mixin: Hash, transactions: Vec<VersionedTransaction>, slot: Slot, - sender: Sender<Result<()>>, + sender: RecordResultSender, ) -> Self { Self { mixin, transactions, @@ -123,12 +127,13 @@ impl TransactionRecorder { is_exited, } } + // Returns the index of `transactions.first()` in the slot, if being tracked by WorkingBank pub fn record( &self, bank_slot: Slot, mixin: Hash, transactions: Vec<VersionedTransaction>, - ) -> Result<()> { + ) -> Result<Option<usize>> { // create a new channel so that there is only 1 sender and when it goes out of scope, the receiver fails let (result_sender, result_receiver) = unbounded(); let res = @@ -191,6 +196,7 @@ pub struct WorkingBank { pub start: Arc<Instant>, pub min_tick_height: u64, pub max_tick_height: u64, + pub transaction_index: Option<usize>, } #[derive(Debug, PartialEq, Eq)] @@ -495,12 +501,13 @@ impl PohRecorder { self.leader_last_tick_height = leader_last_tick_height; } - pub fn set_bank(&mut
self, bank: &Arc<Bank>) { + pub fn set_bank(&mut self, bank: &Arc<Bank>, track_transaction_indexes: bool) { let working_bank = WorkingBank { bank: bank.clone(), start: Arc::new(Instant::now()), min_tick_height: bank.tick_height(), max_tick_height: bank.max_tick_height(), + transaction_index: track_transaction_indexes.then(|| 0), }; trace!("new working bank"); assert_eq!(working_bank.bank.ticks_per_slot(), self.ticks_per_slot()); @@ -737,12 +744,13 @@ impl PohRecorder { } } + // Returns the index of `transactions.first()` in the slot, if being tracked by WorkingBank pub fn record( &mut self, bank_slot: Slot, mixin: Hash, transactions: Vec<VersionedTransaction>, - ) -> Result<()> { + ) -> Result<Option<usize>> { // Entries without transactions are used to track real-time passing in the ledger and // cannot be generated by `record()` assert!(!transactions.is_empty(), "No transactions provided"); @@ -758,7 +766,7 @@ impl PohRecorder { let working_bank = self .working_bank - .as_ref() + .as_mut() .ok_or(PohRecorderError::MaxHeightReached)?; if bank_slot != working_bank.bank.slot() { return Err(PohRecorderError::MaxHeightReached); @@ -774,6 +782,7 @@ impl PohRecorder { drop(poh_lock); if let Some(poh_entry) = record_mixin_res { + let num_transactions = transactions.len(); let (send_entry_res, send_entry_time) = measure!( { let entry = Entry { @@ -787,7 +796,15 @@ impl PohRecorder { "send_poh_entry", ); self.send_entry_us += send_entry_time.as_us(); - return Ok(send_entry_res?); + send_entry_res?; + let starting_transaction_index = + working_bank.transaction_index.map(|transaction_index| { + let next_starting_transaction_index = + transaction_index.saturating_add(num_transactions); + working_bank.transaction_index = Some(next_starting_transaction_index); + transaction_index + }); + return Ok(starting_transaction_index); } // record() might fail if the next PoH hash needs to be a tick.
But that's ok, tick() @@ -954,7 +971,7 @@ pub fn create_test_recorder( &poh_config, exit.clone(), ); - poh_recorder.set_bank(bank); + poh_recorder.set_bank(bank, false); let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let poh_service = PohService::new( @@ -1092,7 +1109,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank); + poh_recorder.set_bank(&bank, false); assert!(poh_recorder.working_bank.is_some()); poh_recorder.clear_bank(); assert!(poh_recorder.working_bank.is_none()); @@ -1126,7 +1143,7 @@ mod tests { let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); // Set a working bank - poh_recorder.set_bank(&bank1); + poh_recorder.set_bank(&bank1, false); // Tick until poh_recorder.tick_height == working bank's min_tick_height let num_new_ticks = bank1.tick_height() - poh_recorder.tick_height(); @@ -1195,7 +1212,7 @@ mod tests { ); assert_eq!(poh_recorder.tick_height, bank.max_tick_height() + 1); - poh_recorder.set_bank(&bank); + poh_recorder.set_bank(&bank, false); poh_recorder.tick(); assert_eq!(poh_recorder.tick_height, bank.max_tick_height() + 2); @@ -1236,7 +1253,7 @@ mod tests { bank0.fill_bank_with_ticks_for_tests(); let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); - poh_recorder.set_bank(&bank1); + poh_recorder.set_bank(&bank1, false); // Let poh_recorder tick up to bank1.tick_height() - 1 for _ in 0..bank1.tick_height() - 1 { poh_recorder.tick() @@ -1277,7 +1294,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank); + poh_recorder.set_bank(&bank, false); let tx = test_tx(); let h1 = hash(b"hello world!"); @@ -1321,7 +1338,7 @@ mod tests { bank0.fill_bank_with_ticks_for_tests(); let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); - poh_recorder.set_bank(&bank1); + poh_recorder.set_bank(&bank1, false); // Record up to exactly min tick height let min_tick_height = poh_recorder.working_bank.as_ref().unwrap().min_tick_height; @@ -1375,7 +1392,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank); + poh_recorder.set_bank(&bank, false); let num_ticks_to_max = bank.max_tick_height() - poh_recorder.tick_height; for _ in 0..num_ticks_to_max { poh_recorder.tick(); @@ -1393,6 +1410,78 @@ mod tests { Blockstore::destroy(&ledger_path).unwrap(); } + #[test] + fn test_poh_recorder_record_transaction_index() { + let ledger_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&ledger_path) + .expect("Expected to be able to open database ledger"); + let GenesisConfigInfo { genesis_config, .. 
} = create_genesis_config(2); + let bank = Arc::new(Bank::new_for_tests(&genesis_config)); + let prev_hash = bank.last_blockhash(); + let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new( + 0, + prev_hash, + bank.clone(), + Some((4, 4)), + bank.ticks_per_slot(), + &Pubkey::default(), + &Arc::new(blockstore), + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &Arc::new(PohConfig::default()), + Arc::new(AtomicBool::default()), + ); + + poh_recorder.set_bank(&bank, true); + poh_recorder.tick(); + assert_eq!( + poh_recorder + .working_bank + .as_ref() + .unwrap() + .transaction_index + .unwrap(), + 0 + ); + + let tx0 = test_tx(); + let tx1 = test_tx(); + let h1 = hash(b"hello world!"); + let record_result = poh_recorder + .record(bank.slot(), h1, vec![tx0.into(), tx1.into()]) + .unwrap() + .unwrap(); + assert_eq!(record_result, 0); + assert_eq!( + poh_recorder + .working_bank + .as_ref() + .unwrap() + .transaction_index + .unwrap(), + 2 + ); + + let tx = test_tx(); + let h2 = hash(b"foobar"); + let record_result = poh_recorder + .record(bank.slot(), h2, vec![tx.into()]) + .unwrap() + .unwrap(); + assert_eq!(record_result, 2); + assert_eq!( + poh_recorder + .working_bank + .as_ref() + .unwrap() + .transaction_index + .unwrap(), + 3 + ); + } + Blockstore::destroy(&ledger_path).unwrap(); + } + #[test] fn test_poh_cache_on_disconnect() { let ledger_path = get_tmp_ledger_path!(); @@ -1417,7 +1506,7 @@ mod tests { bank0.fill_bank_with_ticks_for_tests(); let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); - poh_recorder.set_bank(&bank1); + poh_recorder.set_bank(&bank1, false); // Check we can make two ticks without hitting min_tick_height let remaining_ticks_to_min = @@ -1565,7 +1654,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank); + poh_recorder.set_bank(&bank, false); assert_eq!(bank.slot(), 0); poh_recorder.reset(bank, Some((4, 4))); assert!(poh_recorder.working_bank.is_none()); @@ -1597,7 +1686,7 @@ mod tests { None, Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank); + poh_recorder.set_bank(&bank, false); poh_recorder.clear_bank(); assert!(receiver.try_recv().is_ok()); } @@ -1632,7 +1721,7 @@ mod tests { Arc::new(AtomicBool::default()), ); - poh_recorder.set_bank(&bank); + poh_recorder.set_bank(&bank, false); // Simulate ticking much further than working_bank.max_tick_height let max_tick_height = poh_recorder.working_bank.as_ref().unwrap().max_tick_height; @@ -1927,7 +2016,7 @@ mod tests { // Move the bank up a slot (so that max_tick_height > slot 0's tick_height) let bank = Arc::new(Bank::new_from_parent(&bank, &Pubkey::default(), 1)); // If we set the working bank, the node should be leader within next 2 slots - poh_recorder.set_bank(&bank); + poh_recorder.set_bank(&bank, false); assert!(poh_recorder.would_be_leader(2 * bank.ticks_per_slot())); } } @@ -1961,7 +2050,7 @@ mod tests { for _ in 0..(bank.ticks_per_slot() * 3) { poh_recorder.tick(); } - poh_recorder.set_bank(&bank); + poh_recorder.set_bank(&bank, false); assert!(!bank.is_hash_valid_for_age(&genesis_hash, 0)); assert!(bank.is_hash_valid_for_age(&genesis_hash, 1)); } diff --git a/poh/src/poh_service.rs b/poh/src/poh_service.rs index 925d053dff..fda686f115 100644 --- a/poh/src/poh_service.rs +++ b/poh/src/poh_service.rs @@ -500,7 +500,7 @@ mod tests { hashes_per_batch, record_receiver, ); - poh_recorder.lock().unwrap().set_bank(&bank); + poh_recorder.lock().unwrap().set_bank(&bank, false); // get some events let mut hashes = 0; 
diff --git a/rpc/src/transaction_notifier_interface.rs b/rpc/src/transaction_notifier_interface.rs index 5d67674e03..ab765d1207 100644 --- a/rpc/src/transaction_notifier_interface.rs +++ b/rpc/src/transaction_notifier_interface.rs @@ -8,6 +8,7 @@ pub trait TransactionNotifier { fn notify_transaction( &self, slot: Slot, + transaction_slot_index: usize, signature: &Signature, transaction_status_meta: &TransactionStatusMeta, transaction: &SanitizedTransaction, diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index 73382de629..abdb96b175 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -74,6 +74,7 @@ impl TransactionStatusService { balances, token_balances, rent_debits, + transaction_indexes, }) => { let slot = bank.slot(); for ( @@ -84,6 +85,7 @@ impl TransactionStatusService { pre_token_balances, post_token_balances, rent_debits, + transaction_index, ) in izip!( transactions, execution_results, @@ -92,6 +94,7 @@ impl TransactionStatusService { token_balances.pre_token_balances, token_balances.post_token_balances, rent_debits, + transaction_indexes, ) { if let Some(details) = execution_result { let TransactionExecutionDetails { @@ -162,6 +165,7 @@ impl TransactionStatusService { if let Some(transaction_notifier) = transaction_notifier.as_ref() { transaction_notifier.write().unwrap().notify_transaction( slot, + transaction_index, transaction.signature(), &transaction_status_meta, &transaction, @@ -247,8 +251,20 @@ pub(crate) mod tests { }, }; + #[derive(Eq, Hash, PartialEq)] + struct TestNotifierKey { + slot: Slot, + transaction_index: usize, + signature: Signature, + } + + struct TestNotification { + _meta: TransactionStatusMeta, + transaction: SanitizedTransaction, + } + struct TestTransactionNotifier { - notifications: DashMap<(Slot, Signature), (TransactionStatusMeta, SanitizedTransaction)>, + notifications: DashMap<TestNotifierKey, TestNotification>, } impl TestTransactionNotifier { @@ -263,13 +279,21 @@ pub(crate) mod tests { fn notify_transaction( &self, slot: Slot, + transaction_index: usize, signature: &Signature, transaction_status_meta: &TransactionStatusMeta, transaction: &SanitizedTransaction, ) { self.notifications.insert( - (slot, *signature), - (transaction_status_meta.clone(), transaction.clone()), + TestNotifierKey { + slot, + transaction_index, + signature: *signature, + }, + TestNotification { + _meta: transaction_status_meta.clone(), + transaction: transaction.clone(), + }, ); } } @@ -390,6 +414,7 @@ pub(crate) mod tests { let slot = bank.slot(); let signature = *transaction.signature(); + let transaction_index: usize = bank.transaction_count().try_into().unwrap(); let transaction_status_batch = TransactionStatusBatch { bank, transactions: vec![transaction], @@ -397,6 +422,7 @@ pub(crate) mod tests { balances, token_balances, rent_debits: vec![rent_debits], + transaction_indexes: vec![transaction_index], }; let test_notifier = Arc::new(RwLock::new(TestTransactionNotifier::new())); @@ -421,9 +447,17 @@ pub(crate) mod tests { transaction_status_service.join().unwrap(); let notifier = test_notifier.read().unwrap(); assert_eq!(notifier.notifications.len(), 1); - assert!(notifier.notifications.contains_key(&(slot, signature))); + let key = TestNotifierKey { + slot, + transaction_index, + signature, + }; + assert!(notifier.notifications.contains_key(&key)); - let result = &*notifier.notifications.get(&(slot, signature)).unwrap(); - assert_eq!(expected_transaction.signature(), result.1.signature()); + let result =
&*notifier.notifications.get(&key).unwrap(); + assert_eq!( + expected_transaction.signature(), + result.transaction.signature() + ); } }
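
Note for plugin authors consuming this change: once a validator runs with this patch, geyser plugins receive the `V0_0_2` variant of `ReplicaTransactionInfoVersions`, which carries the new in-block `index` field. The sketch below is a minimal, hypothetical helper (it is not part of this patch); only the crate path, enum variants, and the `index` field come from the interface defined above, and the helper name is illustrative.

// Illustrative only, not part of this patch: read the in-slot transaction
// index from a notification while staying compatible with older plugins
// that still receive V0_0_1.
use solana_geyser_plugin_interface::geyser_plugin_interface::ReplicaTransactionInfoVersions;

fn transaction_index_in_slot(notification: &ReplicaTransactionInfoVersions<'_>) -> Option<usize> {
    match notification {
        // V0_0_1 notifications carry no index; callers must track ordering themselves.
        ReplicaTransactionInfoVersions::V0_0_1(_info) => None,
        // V0_0_2 (added in this patch) exposes the transaction's index in the block.
        ReplicaTransactionInfoVersions::V0_0_2(info) => Some(info.index),
    }
}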