Add transaction index in slot to geyser plugin TransactionInfo (#25688)

* Define shuffle to prep using same shuffle for multiple slices

* Determine transaction indexes and plumb to execute_batch

* Pair transaction_index with transaction in TransactionStatusService

* Add new ReplicaTransactionInfoVersion

* Plumb transaction_indexes through BankingStage

* Prepare BankingStage to receive transaction indexes from PohRecorder

* Determine transaction indexes in PohRecorder; add field to WorkingBank

* Add PohRecorder::record unit test

* Only pass starting_transaction_index around PohRecorder

* Add helper structs to simplify test DashMap

* Pass entry and starting-index into process_entries_with_callback together

* Add tx-index checks to test_rebatch_transactions

* Revert shuffle definition and use zip/unzip

* Only zip/unzip if randomize

* Add confirm_slot_entries test

* Review nits

* Add type alias to make sender docs more clear
This commit is contained in:
Tyera Eulberg 2022-06-23 13:37:38 -06:00 committed by GitHub
parent c01b985717
commit a6ba5a9a05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 465 additions and 69 deletions

View File

@ -358,7 +358,7 @@ fn main() {
DEFAULT_TPU_CONNECTION_POOL_SIZE,
)),
);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
// This is so that the signal_receiver does not go out of scope after the closure.
// If it is dropped before poh_service, then poh_service will error when
@ -439,7 +439,7 @@ fn main() {
std::u64::MAX,
);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
assert!(poh_recorder.lock().unwrap().bank().is_some());
if bank.slot() > 32 {
leader_schedule_cache.set_root(&bank);

View File

@ -233,7 +233,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
Arc::new(RwLock::new(CostModel::default())),
Arc::new(ConnectionCache::default()),
);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let chunk_len = verified.len() / CHUNKS;
let mut start = 0;

View File

@ -110,6 +110,8 @@ struct RecordTransactionsSummary {
record_transactions_timings: RecordTransactionsTimings,
// Result of trying to record the transactions into the PoH stream
result: Result<(), PohRecorderError>,
// Index in the slot of the first transaction recorded
starting_transaction_index: Option<usize>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
@ -1203,6 +1205,7 @@ impl BankingStage {
recorder: &TransactionRecorder,
) -> RecordTransactionsSummary {
let mut record_transactions_timings = RecordTransactionsTimings::default();
let mut starting_transaction_index = None;
if !transactions.is_empty() {
let num_to_record = transactions.len();
@ -1217,7 +1220,9 @@ impl BankingStage {
record_transactions_timings.poh_record_us = poh_record_time.as_us();
match res {
Ok(()) => (),
Ok(starting_index) => {
starting_transaction_index = starting_index;
}
Err(PohRecorderError::MaxHeightReached) => {
inc_new_counter_info!("banking_stage-max_height_reached", 1);
inc_new_counter_info!(
@ -1227,6 +1232,7 @@ impl BankingStage {
return RecordTransactionsSummary {
record_transactions_timings,
result: Err(PohRecorderError::MaxHeightReached),
starting_transaction_index: None,
};
}
Err(e) => panic!("Poh recorder returned unexpected error: {:?}", e),
@ -1236,6 +1242,7 @@ impl BankingStage {
RecordTransactionsSummary {
record_transactions_timings,
result: Ok(()),
starting_transaction_index,
}
}
@ -1328,6 +1335,7 @@ impl BankingStage {
let RecordTransactionsSummary {
result: record_transactions_result,
record_transactions_timings,
starting_transaction_index,
} = record_transactions_summary;
execute_and_commit_timings.record_transactions_timings = RecordTransactionsTimings {
execution_results_to_transactions_us: execution_results_to_transactions_time.as_us(),
@ -1406,6 +1414,20 @@ impl BankingStage {
let post_balances = bank.collect_balances(batch);
let post_token_balances =
collect_token_balances(bank, batch, &mut mint_decimals);
let mut transaction_index = starting_transaction_index.unwrap_or_default();
let batch_transaction_indexes: Vec<_> = tx_results
.execution_results
.iter()
.map(|result| {
if result.was_executed() {
let this_transaction_index = transaction_index;
saturating_add_assign!(transaction_index, 1);
this_transaction_index
} else {
0
}
})
.collect();
transaction_status_sender.send_transaction_status_batch(
bank.clone(),
txs,
@ -1416,6 +1438,7 @@ impl BankingStage {
post_token_balances,
),
tx_results.rent_debits,
batch_transaction_indexes,
);
}
},
@ -2692,7 +2715,7 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
@ -2914,7 +2937,7 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let process_transactions_batch_output = BankingStage::process_and_record_transactions(
@ -3049,7 +3072,7 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let process_transactions_batch_output = BankingStage::process_and_record_transactions(
@ -3122,7 +3145,7 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1);
@ -3272,7 +3295,7 @@ mod tests {
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
@ -3472,7 +3495,7 @@ mod tests {
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
@ -3679,7 +3702,7 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let shreds = entries_to_test_shreds(&entries, bank.slot(), 0, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
@ -3840,7 +3863,7 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let shreds = entries_to_test_shreds(&entries, bank.slot(), 0, true, 0);
blockstore.insert_shreds(shreds, None, false).unwrap();
@ -3997,7 +4020,7 @@ mod tests {
// Processes one packet per iteration of the loop
let num_packets_to_process_per_iteration = num_conflicting_transactions;
for num_expected_unprocessed in (0..num_conflicting_transactions).rev() {
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
BankingStage::consume_buffered_packets(
&Pubkey::default(),
max_tx_processing_ns,
@ -4046,7 +4069,7 @@ mod tests {
// each iteration of this loop will process one element of the batch per iteration of the
// loop.
let interrupted_iteration = 1;
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
let poh_recorder_ = poh_recorder.clone();
let recorder = poh_recorder_.lock().unwrap().recorder();
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();

View File

@ -858,6 +858,7 @@ impl ReplayStage {
&retransmit_slots_sender,
&mut skipped_slots_info,
has_new_vote_been_rooted,
transaction_status_sender.is_some(),
);
let poh_bank = poh_recorder.lock().unwrap().bank();
@ -1544,6 +1545,7 @@ impl ReplayStage {
retransmit_slots_sender: &RetransmitSlotsSender,
skipped_slots_info: &mut SkippedSlotsInfo,
has_new_vote_been_rooted: bool,
track_transaction_indexes: bool,
) {
// all the individual calls to poh_recorder.lock() are designed to
// increase granularity, decrease contention
@ -1659,7 +1661,10 @@ impl ReplayStage {
);
let tpu_bank = bank_forks.write().unwrap().insert(tpu_bank);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
poh_recorder
.lock()
.unwrap()
.set_bank(&tpu_bank, track_transaction_indexes);
} else {
error!("{} No next leader found", my_pubkey);
}

View File

@ -96,12 +96,32 @@ pub struct ReplicaTransactionInfo<'a> {
pub transaction_status_meta: &'a TransactionStatusMeta,
}
/// Information about a transaction, including index in block
#[derive(Clone, Debug)]
pub struct ReplicaTransactionInfoV2<'a> {
/// The first signature of the transaction, used for identifying the transaction.
pub signature: &'a Signature,
/// Indicates if the transaction is a simple vote transaction.
pub is_vote: bool,
/// The sanitized transaction.
pub transaction: &'a SanitizedTransaction,
/// Metadata of the transaction status.
pub transaction_status_meta: &'a TransactionStatusMeta,
/// The transaction's index in the block
pub index: usize,
}
/// A wrapper to future-proof ReplicaTransactionInfo handling.
/// If there were a change to the structure of ReplicaTransactionInfo,
/// there would be new enum entry for the newer version, forcing
/// plugin implementations to handle the change.
pub enum ReplicaTransactionInfoVersions<'a> {
V0_0_1(&'a ReplicaTransactionInfo<'a>),
V0_0_2(&'a ReplicaTransactionInfoV2<'a>),
}
#[derive(Clone, Debug)]

View File

@ -3,7 +3,7 @@ use {
crate::geyser_plugin_manager::GeyserPluginManager,
log::*,
solana_geyser_plugin_interface::geyser_plugin_interface::{
ReplicaTransactionInfo, ReplicaTransactionInfoVersions,
ReplicaTransactionInfoV2, ReplicaTransactionInfoVersions,
},
solana_measure::measure::Measure,
solana_metrics::*,
@ -25,13 +25,18 @@ impl TransactionNotifier for TransactionNotifierImpl {
fn notify_transaction(
&self,
slot: Slot,
index: usize,
signature: &Signature,
transaction_status_meta: &TransactionStatusMeta,
transaction: &SanitizedTransaction,
) {
let mut measure = Measure::start("geyser-plugin-notify_plugins_of_transaction_info");
let transaction_log_info =
Self::build_replica_transaction_info(signature, transaction_status_meta, transaction);
let transaction_log_info = Self::build_replica_transaction_info(
index,
signature,
transaction_status_meta,
transaction,
);
let mut plugin_manager = self.plugin_manager.write().unwrap();
@ -44,7 +49,7 @@ impl TransactionNotifier for TransactionNotifierImpl {
continue;
}
match plugin.notify_transaction(
ReplicaTransactionInfoVersions::V0_0_1(&transaction_log_info),
ReplicaTransactionInfoVersions::V0_0_2(&transaction_log_info),
slot,
) {
Err(err) => {
@ -78,11 +83,13 @@ impl TransactionNotifierImpl {
}
fn build_replica_transaction_info<'a>(
index: usize,
signature: &'a Signature,
transaction_status_meta: &'a TransactionStatusMeta,
transaction: &'a SanitizedTransaction,
) -> ReplicaTransactionInfo<'a> {
ReplicaTransactionInfo {
) -> ReplicaTransactionInfoV2<'a> {
ReplicaTransactionInfoV2 {
index,
signature,
is_vote: transaction.is_simple_vote_transaction(),
transaction,

View File

@ -93,6 +93,16 @@ impl BlockCostCapacityMeter {
}
}
struct TransactionBatchWithIndexes<'a, 'b> {
pub batch: TransactionBatch<'a, 'b>,
pub transaction_indexes: Vec<usize>,
}
struct ReplayEntry {
entry: EntryType,
starting_index: usize,
}
// get_max_thread_count to match number of threads in the old code.
// see: https://github.com/solana-labs/solana/pull/24853
lazy_static! {
@ -157,7 +167,7 @@ fn aggregate_total_execution_units(execute_timings: &ExecuteTimings) -> u64 {
}
fn execute_batch(
batch: &TransactionBatch,
batch: &TransactionBatchWithIndexes,
bank: &Arc<Bank>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
@ -165,6 +175,10 @@ fn execute_batch(
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
tx_cost: u64,
) -> Result<()> {
let TransactionBatchWithIndexes {
batch,
transaction_indexes,
} = batch;
let record_token_balances = transaction_status_sender.is_some();
let mut mint_decimals: HashMap<Pubkey, u8> = HashMap::new();
@ -244,6 +258,7 @@ fn execute_batch(
balances,
token_balances,
rent_debits,
transaction_indexes.to_vec(),
);
}
@ -253,7 +268,7 @@ fn execute_batch(
fn execute_batches_internal(
bank: &Arc<Bank>,
batches: &[TransactionBatch],
batches: &[TransactionBatchWithIndexes],
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
@ -300,18 +315,23 @@ fn rebatch_transactions<'a>(
sanitized_txs: &'a [SanitizedTransaction],
start: usize,
end: usize,
) -> TransactionBatch<'a, 'a> {
transaction_indexes: &'a [usize],
) -> TransactionBatchWithIndexes<'a, 'a> {
let txs = &sanitized_txs[start..=end];
let results = &lock_results[start..=end];
let mut tx_batch = TransactionBatch::new(results.to_vec(), bank, Cow::from(txs));
tx_batch.set_needs_unlock(false);
tx_batch
let transaction_indexes = transaction_indexes[start..=end].to_vec();
TransactionBatchWithIndexes {
batch: tx_batch,
transaction_indexes,
}
}
fn execute_batches(
bank: &Arc<Bank>,
batches: &[TransactionBatch],
batches: &[TransactionBatchWithIndexes],
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
@ -319,14 +339,16 @@ fn execute_batches(
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
cost_model: &CostModel,
) -> Result<()> {
let (lock_results, sanitized_txs): (Vec<_>, Vec<_>) = batches
let ((lock_results, sanitized_txs), transaction_indexes): ((Vec<_>, Vec<_>), Vec<_>) = batches
.iter()
.flat_map(|batch| {
batch
.batch
.lock_results()
.iter()
.cloned()
.zip(batch.sanitized_transactions().to_vec())
.zip(batch.batch.sanitized_transactions().to_vec())
.zip(batch.transaction_indexes.to_vec())
})
.unzip();
@ -352,7 +374,7 @@ fn execute_batches(
let target_batch_count = get_thread_count() as u64;
let mut tx_batches: Vec<TransactionBatch> = vec![];
let mut tx_batches: Vec<TransactionBatchWithIndexes> = vec![];
let mut tx_batch_costs: Vec<u64> = vec![];
let rebatched_txs = if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
let target_batch_cost = total_cost / target_batch_count;
@ -373,6 +395,7 @@ fn execute_batches(
&sanitized_txs,
slice_start,
index,
&transaction_indexes,
);
slice_start = next_index;
tx_batches.push(tx_batch);
@ -408,6 +431,9 @@ fn execute_batches(
/// 2. Process the locked group in parallel
/// 3. Register the `Tick` if it's available
/// 4. Update the leader scheduler, goto 1
///
/// This method is for use testing against a single Bank, and assumes `Bank::transaction_count()`
/// represents the number of transactions executed in this Bank
pub fn process_entries_for_tests(
bank: &Arc<Bank>,
entries: Vec<Entry>,
@ -423,10 +449,24 @@ pub fn process_entries_for_tests(
};
let mut timings = ExecuteTimings::default();
let mut entries = entry::verify_transactions(entries, Arc::new(verify_transaction))?;
let mut entry_starting_index: usize = bank.transaction_count().try_into().unwrap();
let mut replay_entries: Vec<_> =
entry::verify_transactions(entries, Arc::new(verify_transaction))?
.into_iter()
.map(|entry| {
let starting_index = entry_starting_index;
if let EntryType::Transactions(ref transactions) = entry {
entry_starting_index = entry_starting_index.saturating_add(transactions.len());
}
ReplayEntry {
entry,
starting_index,
}
})
.collect();
let result = process_entries_with_callback(
bank,
&mut entries,
&mut replay_entries,
randomize,
None,
transaction_status_sender,
@ -441,9 +481,10 @@ pub fn process_entries_for_tests(
}
// Note: If randomize is true this will shuffle entries' transactions in-place.
#[allow(clippy::too_many_arguments)]
fn process_entries_with_callback(
bank: &Arc<Bank>,
entries: &mut [EntryType],
entries: &mut [ReplayEntry],
randomize: bool,
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<&TransactionStatusSender>,
@ -458,7 +499,11 @@ fn process_entries_with_callback(
let mut rng = thread_rng();
let cost_model = CostModel::new();
for entry in entries {
for ReplayEntry {
entry,
starting_index,
} in entries
{
match entry {
EntryType::Tick(hash) => {
// If it's a tick, save it for later
@ -489,9 +534,18 @@ fn process_entries_with_callback(
.send_cost_details(bank.clone(), transactions.iter());
}
if randomize {
transactions.shuffle(&mut rng);
}
let starting_index = *starting_index;
let transaction_indexes = if randomize {
let mut transactions_and_indexes: Vec<(SanitizedTransaction, usize)> =
transactions.drain(..).zip(starting_index..).collect();
transactions_and_indexes.shuffle(&mut rng);
let (txs, indexes): (Vec<_>, Vec<_>) =
transactions_and_indexes.into_iter().unzip();
*transactions = txs;
indexes
} else {
(starting_index..starting_index.saturating_add(transactions.len())).collect()
};
loop {
// try to lock the accounts
@ -500,7 +554,10 @@ fn process_entries_with_callback(
// if locking worked
if first_lock_err.is_ok() {
batches.push(batch);
batches.push(TransactionBatchWithIndexes {
batch,
transaction_indexes,
});
// done with this entry
break;
}
@ -968,7 +1025,18 @@ fn confirm_slot_entries(
let slot = bank.slot();
let (entries, num_shreds, slot_full) = slot_entries_load_result;
let num_entries = entries.len();
let num_txs = entries.iter().map(|e| e.transactions.len()).sum::<usize>();
let mut entry_starting_indexes = Vec::with_capacity(num_entries);
let mut entry_starting_index = progress.num_txs;
let num_txs = entries
.iter()
.map(|e| {
let num_txs = e.transactions.len();
let next_starting_index = entry_starting_index.saturating_add(num_txs);
entry_starting_indexes.push(entry_starting_index);
entry_starting_index = next_starting_index;
num_txs
})
.sum::<usize>();
trace!(
"Fetched entries for slot {}, num_entries: {}, num_shreds: {}, num_txs: {}, slot_full: {}",
slot,
@ -1035,10 +1103,19 @@ fn confirm_slot_entries(
let mut replay_elapsed = Measure::start("replay_elapsed");
let mut execute_timings = ExecuteTimings::default();
let cost_capacity_meter = Arc::new(RwLock::new(BlockCostCapacityMeter::default()));
let mut replay_entries: Vec<_> = entries
.unwrap()
.into_iter()
.zip(entry_starting_indexes)
.map(|(entry, starting_index)| ReplayEntry {
entry,
starting_index,
})
.collect();
// Note: This will shuffle entries' transactions in-place.
let process_result = process_entries_with_callback(
bank,
&mut entries.unwrap(),
&mut replay_entries,
true, // shuffle transactions.
entry_callback,
transaction_status_sender,
@ -1468,6 +1545,7 @@ pub struct TransactionStatusBatch {
pub balances: TransactionBalancesSet,
pub token_balances: TransactionTokenBalancesSet,
pub rent_debits: Vec<RentDebits>,
pub transaction_indexes: Vec<usize>,
}
#[derive(Clone)]
@ -1484,6 +1562,7 @@ impl TransactionStatusSender {
balances: TransactionBalancesSet,
token_balances: TransactionTokenBalancesSet,
rent_debits: Vec<RentDebits>,
transaction_indexes: Vec<usize>,
) {
let slot = bank.slot();
@ -1502,6 +1581,7 @@ impl TransactionStatusSender {
balances,
token_balances,
rent_debits,
transaction_indexes,
}))
{
trace!(
@ -4005,6 +4085,121 @@ pub mod tests {
assert_eq!(slot_2_bank.get_hash_age(&slot_2_hash), Some(0));
}
#[test]
fn test_confirm_slot_entries_progress_num_txs_indexes() {
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(100 * LAMPORTS_PER_SOL);
let genesis_hash = genesis_config.hash();
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let mut timing = ConfirmationTiming::default();
let mut progress = ConfirmationProgress::new(genesis_hash);
let amount = genesis_config.rent.minimum_balance(0);
let keypair1 = Keypair::new();
let keypair2 = Keypair::new();
let keypair3 = Keypair::new();
let keypair4 = Keypair::new();
bank.transfer(LAMPORTS_PER_SOL, &mint_keypair, &keypair1.pubkey())
.unwrap();
bank.transfer(LAMPORTS_PER_SOL, &mint_keypair, &keypair2.pubkey())
.unwrap();
let (transaction_status_sender, transaction_status_receiver) =
crossbeam_channel::unbounded();
let transaction_status_sender = TransactionStatusSender {
sender: transaction_status_sender,
};
let blockhash = bank.last_blockhash();
let tx1 = system_transaction::transfer(
&keypair1,
&keypair3.pubkey(),
amount,
bank.last_blockhash(),
);
let tx2 = system_transaction::transfer(
&keypair2,
&keypair4.pubkey(),
amount,
bank.last_blockhash(),
);
let entry = next_entry(&blockhash, 1, vec![tx1, tx2]);
let new_hash = entry.hash;
confirm_slot_entries(
&bank,
(vec![entry], 0, false),
&mut timing,
&mut progress,
false,
Some(&transaction_status_sender),
None,
None,
None,
&VerifyRecyclers::default(),
)
.unwrap();
assert_eq!(progress.num_txs, 2);
let batch = transaction_status_receiver.recv().unwrap();
if let TransactionStatusMessage::Batch(batch) = batch {
assert_eq!(batch.transactions.len(), 2);
assert_eq!(batch.transaction_indexes.len(), 2);
// Assert contains instead of the actual vec due to randomize
assert!(batch.transaction_indexes.contains(&0));
assert!(batch.transaction_indexes.contains(&1));
} else {
panic!("batch should have been sent");
}
let tx1 = system_transaction::transfer(
&keypair1,
&keypair3.pubkey(),
amount + 1,
bank.last_blockhash(),
);
let tx2 = system_transaction::transfer(
&keypair2,
&keypair4.pubkey(),
amount + 1,
bank.last_blockhash(),
);
let tx3 = system_transaction::transfer(
&mint_keypair,
&Pubkey::new_unique(),
amount,
bank.last_blockhash(),
);
let entry = next_entry(&new_hash, 1, vec![tx1, tx2, tx3]);
confirm_slot_entries(
&bank,
(vec![entry], 0, false),
&mut timing,
&mut progress,
false,
Some(&transaction_status_sender),
None,
None,
None,
&VerifyRecyclers::default(),
)
.unwrap();
assert_eq!(progress.num_txs, 5);
let batch = transaction_status_receiver.recv().unwrap();
if let TransactionStatusMessage::Batch(batch) = batch {
assert_eq!(batch.transactions.len(), 3);
assert_eq!(batch.transaction_indexes.len(), 3);
// Assert contains instead of the actual vec due to randomize
assert!(batch.transaction_indexes.contains(&2));
assert!(batch.transaction_indexes.contains(&3));
assert!(batch.transaction_indexes.contains(&4));
} else {
panic!("batch should have been sent");
}
}
#[test]
fn test_rebatch_transactions() {
let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
@ -4018,6 +4213,8 @@ pub mod tests {
let pubkey = solana_sdk::pubkey::new_rand();
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
let keypair3 = Keypair::new();
let pubkey3 = solana_sdk::pubkey::new_rand();
let txs = vec![
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
@ -4032,20 +4229,40 @@ pub mod tests {
1,
genesis_config.hash(),
)),
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&keypair3,
&pubkey3,
1,
genesis_config.hash(),
)),
];
let batch = bank.prepare_sanitized_batch(&txs);
assert!(batch.needs_unlock());
let transaction_indexes = vec![42, 43, 44];
let batch2 = rebatch_transactions(
batch.lock_results(),
&bank,
batch.sanitized_transactions(),
0,
1,
0,
&transaction_indexes,
);
assert!(batch.needs_unlock());
assert!(!batch2.needs_unlock());
assert!(!batch2.batch.needs_unlock());
assert_eq!(batch2.transaction_indexes, vec![42]);
let batch3 = rebatch_transactions(
batch.lock_results(),
&bank,
batch.sanitized_transactions(),
1,
2,
&transaction_indexes,
);
assert!(!batch3.batch.needs_unlock());
assert_eq!(batch3.transaction_indexes, vec![43, 44]);
}
#[test]

View File

@ -80,18 +80,22 @@ impl BankStart {
}
}
// Sends the Result of the record operation, including the index in the slot of the first
// transaction, if being tracked by WorkingBank
type RecordResultSender = Sender<Result<Option<usize>>>;
pub struct Record {
pub mixin: Hash,
pub transactions: Vec<VersionedTransaction>,
pub slot: Slot,
pub sender: Sender<Result<()>>,
pub sender: RecordResultSender,
}
impl Record {
pub fn new(
mixin: Hash,
transactions: Vec<VersionedTransaction>,
slot: Slot,
sender: Sender<Result<()>>,
sender: RecordResultSender,
) -> Self {
Self {
mixin,
@ -123,12 +127,13 @@ impl TransactionRecorder {
is_exited,
}
}
// Returns the index of `transactions.first()` in the slot, if being tracked by WorkingBank
pub fn record(
&self,
bank_slot: Slot,
mixin: Hash,
transactions: Vec<VersionedTransaction>,
) -> Result<()> {
) -> Result<Option<usize>> {
// create a new channel so that there is only 1 sender and when it goes out of scope, the receiver fails
let (result_sender, result_receiver) = unbounded();
let res =
@ -191,6 +196,7 @@ pub struct WorkingBank {
pub start: Arc<Instant>,
pub min_tick_height: u64,
pub max_tick_height: u64,
pub transaction_index: Option<usize>,
}
#[derive(Debug, PartialEq, Eq)]
@ -495,12 +501,13 @@ impl PohRecorder {
self.leader_last_tick_height = leader_last_tick_height;
}
pub fn set_bank(&mut self, bank: &Arc<Bank>) {
pub fn set_bank(&mut self, bank: &Arc<Bank>, track_transaction_indexes: bool) {
let working_bank = WorkingBank {
bank: bank.clone(),
start: Arc::new(Instant::now()),
min_tick_height: bank.tick_height(),
max_tick_height: bank.max_tick_height(),
transaction_index: track_transaction_indexes.then(|| 0),
};
trace!("new working bank");
assert_eq!(working_bank.bank.ticks_per_slot(), self.ticks_per_slot());
@ -737,12 +744,13 @@ impl PohRecorder {
}
}
// Returns the index of `transactions.first()` in the slot, if being tracked by WorkingBank
pub fn record(
&mut self,
bank_slot: Slot,
mixin: Hash,
transactions: Vec<VersionedTransaction>,
) -> Result<()> {
) -> Result<Option<usize>> {
// Entries without transactions are used to track real-time passing in the ledger and
// cannot be generated by `record()`
assert!(!transactions.is_empty(), "No transactions provided");
@ -758,7 +766,7 @@ impl PohRecorder {
let working_bank = self
.working_bank
.as_ref()
.as_mut()
.ok_or(PohRecorderError::MaxHeightReached)?;
if bank_slot != working_bank.bank.slot() {
return Err(PohRecorderError::MaxHeightReached);
@ -774,6 +782,7 @@ impl PohRecorder {
drop(poh_lock);
if let Some(poh_entry) = record_mixin_res {
let num_transactions = transactions.len();
let (send_entry_res, send_entry_time) = measure!(
{
let entry = Entry {
@ -787,7 +796,15 @@ impl PohRecorder {
"send_poh_entry",
);
self.send_entry_us += send_entry_time.as_us();
return Ok(send_entry_res?);
send_entry_res?;
let starting_transaction_index =
working_bank.transaction_index.map(|transaction_index| {
let next_starting_transaction_index =
transaction_index.saturating_add(num_transactions);
working_bank.transaction_index = Some(next_starting_transaction_index);
transaction_index
});
return Ok(starting_transaction_index);
}
// record() might fail if the next PoH hash needs to be a tick. But that's ok, tick()
@ -954,7 +971,7 @@ pub fn create_test_recorder(
&poh_config,
exit.clone(),
);
poh_recorder.set_bank(bank);
poh_recorder.set_bank(bank, false);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(
@ -1092,7 +1109,7 @@ mod tests {
Arc::new(AtomicBool::default()),
);
poh_recorder.set_bank(&bank);
poh_recorder.set_bank(&bank, false);
assert!(poh_recorder.working_bank.is_some());
poh_recorder.clear_bank();
assert!(poh_recorder.working_bank.is_none());
@ -1126,7 +1143,7 @@ mod tests {
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
// Set a working bank
poh_recorder.set_bank(&bank1);
poh_recorder.set_bank(&bank1, false);
// Tick until poh_recorder.tick_height == working bank's min_tick_height
let num_new_ticks = bank1.tick_height() - poh_recorder.tick_height();
@ -1195,7 +1212,7 @@ mod tests {
);
assert_eq!(poh_recorder.tick_height, bank.max_tick_height() + 1);
poh_recorder.set_bank(&bank);
poh_recorder.set_bank(&bank, false);
poh_recorder.tick();
assert_eq!(poh_recorder.tick_height, bank.max_tick_height() + 2);
@ -1236,7 +1253,7 @@ mod tests {
bank0.fill_bank_with_ticks_for_tests();
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
poh_recorder.set_bank(&bank1);
poh_recorder.set_bank(&bank1, false);
// Let poh_recorder tick up to bank1.tick_height() - 1
for _ in 0..bank1.tick_height() - 1 {
poh_recorder.tick()
@ -1277,7 +1294,7 @@ mod tests {
Arc::new(AtomicBool::default()),
);
poh_recorder.set_bank(&bank);
poh_recorder.set_bank(&bank, false);
let tx = test_tx();
let h1 = hash(b"hello world!");
@ -1321,7 +1338,7 @@ mod tests {
bank0.fill_bank_with_ticks_for_tests();
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
poh_recorder.set_bank(&bank1);
poh_recorder.set_bank(&bank1, false);
// Record up to exactly min tick height
let min_tick_height = poh_recorder.working_bank.as_ref().unwrap().min_tick_height;
@ -1375,7 +1392,7 @@ mod tests {
Arc::new(AtomicBool::default()),
);
poh_recorder.set_bank(&bank);
poh_recorder.set_bank(&bank, false);
let num_ticks_to_max = bank.max_tick_height() - poh_recorder.tick_height;
for _ in 0..num_ticks_to_max {
poh_recorder.tick();
@ -1393,6 +1410,78 @@ mod tests {
Blockstore::destroy(&ledger_path).unwrap();
}
#[test]
fn test_poh_recorder_record_transaction_index() {
let ledger_path = get_tmp_ledger_path!();
{
let blockstore = Blockstore::open(&ledger_path)
.expect("Expected to be able to open database ledger");
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(2);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
let prev_hash = bank.last_blockhash();
let (mut poh_recorder, _entry_receiver, _record_receiver) = PohRecorder::new(
0,
prev_hash,
bank.clone(),
Some((4, 4)),
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blockstore),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
Arc::new(AtomicBool::default()),
);
poh_recorder.set_bank(&bank, true);
poh_recorder.tick();
assert_eq!(
poh_recorder
.working_bank
.as_ref()
.unwrap()
.transaction_index
.unwrap(),
0
);
let tx0 = test_tx();
let tx1 = test_tx();
let h1 = hash(b"hello world!");
let record_result = poh_recorder
.record(bank.slot(), h1, vec![tx0.into(), tx1.into()])
.unwrap()
.unwrap();
assert_eq!(record_result, 0);
assert_eq!(
poh_recorder
.working_bank
.as_ref()
.unwrap()
.transaction_index
.unwrap(),
2
);
let tx = test_tx();
let h2 = hash(b"foobar");
let record_result = poh_recorder
.record(bank.slot(), h2, vec![tx.into()])
.unwrap()
.unwrap();
assert_eq!(record_result, 2);
assert_eq!(
poh_recorder
.working_bank
.as_ref()
.unwrap()
.transaction_index
.unwrap(),
3
);
}
Blockstore::destroy(&ledger_path).unwrap();
}
#[test]
fn test_poh_cache_on_disconnect() {
let ledger_path = get_tmp_ledger_path!();
@ -1417,7 +1506,7 @@ mod tests {
bank0.fill_bank_with_ticks_for_tests();
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
poh_recorder.set_bank(&bank1);
poh_recorder.set_bank(&bank1, false);
// Check we can make two ticks without hitting min_tick_height
let remaining_ticks_to_min =
@ -1565,7 +1654,7 @@ mod tests {
Arc::new(AtomicBool::default()),
);
poh_recorder.set_bank(&bank);
poh_recorder.set_bank(&bank, false);
assert_eq!(bank.slot(), 0);
poh_recorder.reset(bank, Some((4, 4)));
assert!(poh_recorder.working_bank.is_none());
@ -1597,7 +1686,7 @@ mod tests {
None,
Arc::new(AtomicBool::default()),
);
poh_recorder.set_bank(&bank);
poh_recorder.set_bank(&bank, false);
poh_recorder.clear_bank();
assert!(receiver.try_recv().is_ok());
}
@ -1632,7 +1721,7 @@ mod tests {
Arc::new(AtomicBool::default()),
);
poh_recorder.set_bank(&bank);
poh_recorder.set_bank(&bank, false);
// Simulate ticking much further than working_bank.max_tick_height
let max_tick_height = poh_recorder.working_bank.as_ref().unwrap().max_tick_height;
@ -1927,7 +2016,7 @@ mod tests {
// Move the bank up a slot (so that max_tick_height > slot 0's tick_height)
let bank = Arc::new(Bank::new_from_parent(&bank, &Pubkey::default(), 1));
// If we set the working bank, the node should be leader within next 2 slots
poh_recorder.set_bank(&bank);
poh_recorder.set_bank(&bank, false);
assert!(poh_recorder.would_be_leader(2 * bank.ticks_per_slot()));
}
}
@ -1961,7 +2050,7 @@ mod tests {
for _ in 0..(bank.ticks_per_slot() * 3) {
poh_recorder.tick();
}
poh_recorder.set_bank(&bank);
poh_recorder.set_bank(&bank, false);
assert!(!bank.is_hash_valid_for_age(&genesis_hash, 0));
assert!(bank.is_hash_valid_for_age(&genesis_hash, 1));
}

View File

@ -500,7 +500,7 @@ mod tests {
hashes_per_batch,
record_receiver,
);
poh_recorder.lock().unwrap().set_bank(&bank);
poh_recorder.lock().unwrap().set_bank(&bank, false);
// get some events
let mut hashes = 0;

View File

@ -8,6 +8,7 @@ pub trait TransactionNotifier {
fn notify_transaction(
&self,
slot: Slot,
transaction_slot_index: usize,
signature: &Signature,
transaction_status_meta: &TransactionStatusMeta,
transaction: &SanitizedTransaction,

View File

@ -74,6 +74,7 @@ impl TransactionStatusService {
balances,
token_balances,
rent_debits,
transaction_indexes,
}) => {
let slot = bank.slot();
for (
@ -84,6 +85,7 @@ impl TransactionStatusService {
pre_token_balances,
post_token_balances,
rent_debits,
transaction_index,
) in izip!(
transactions,
execution_results,
@ -92,6 +94,7 @@ impl TransactionStatusService {
token_balances.pre_token_balances,
token_balances.post_token_balances,
rent_debits,
transaction_indexes,
) {
if let Some(details) = execution_result {
let TransactionExecutionDetails {
@ -162,6 +165,7 @@ impl TransactionStatusService {
if let Some(transaction_notifier) = transaction_notifier.as_ref() {
transaction_notifier.write().unwrap().notify_transaction(
slot,
transaction_index,
transaction.signature(),
&transaction_status_meta,
&transaction,
@ -247,8 +251,20 @@ pub(crate) mod tests {
},
};
#[derive(Eq, Hash, PartialEq)]
struct TestNotifierKey {
slot: Slot,
transaction_index: usize,
signature: Signature,
}
struct TestNotification {
_meta: TransactionStatusMeta,
transaction: SanitizedTransaction,
}
struct TestTransactionNotifier {
notifications: DashMap<(Slot, Signature), (TransactionStatusMeta, SanitizedTransaction)>,
notifications: DashMap<TestNotifierKey, TestNotification>,
}
impl TestTransactionNotifier {
@ -263,13 +279,21 @@ pub(crate) mod tests {
fn notify_transaction(
&self,
slot: Slot,
transaction_index: usize,
signature: &Signature,
transaction_status_meta: &TransactionStatusMeta,
transaction: &SanitizedTransaction,
) {
self.notifications.insert(
(slot, *signature),
(transaction_status_meta.clone(), transaction.clone()),
TestNotifierKey {
slot,
transaction_index,
signature: *signature,
},
TestNotification {
_meta: transaction_status_meta.clone(),
transaction: transaction.clone(),
},
);
}
}
@ -390,6 +414,7 @@ pub(crate) mod tests {
let slot = bank.slot();
let signature = *transaction.signature();
let transaction_index: usize = bank.transaction_count().try_into().unwrap();
let transaction_status_batch = TransactionStatusBatch {
bank,
transactions: vec![transaction],
@ -397,6 +422,7 @@ pub(crate) mod tests {
balances,
token_balances,
rent_debits: vec![rent_debits],
transaction_indexes: vec![transaction_index],
};
let test_notifier = Arc::new(RwLock::new(TestTransactionNotifier::new()));
@ -421,9 +447,17 @@ pub(crate) mod tests {
transaction_status_service.join().unwrap();
let notifier = test_notifier.read().unwrap();
assert_eq!(notifier.notifications.len(), 1);
assert!(notifier.notifications.contains_key(&(slot, signature)));
let key = TestNotifierKey {
slot,
transaction_index,
signature,
};
assert!(notifier.notifications.contains_key(&key));
let result = &*notifier.notifications.get(&(slot, signature)).unwrap();
assert_eq!(expected_transaction.signature(), result.1.signature());
let result = &*notifier.notifications.get(&key).unwrap();
assert_eq!(
expected_transaction.signature(),
result.transaction.signature()
);
}
}