diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 7c7676e150..4947256932 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -151,6 +151,7 @@ fn main() { &poh_recorder, verified_receiver, vote_receiver, + None, ); poh_recorder.lock().unwrap().set_bank(&bank); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index ee8954aef2..6da3c8fbb5 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -80,6 +80,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &poh_recorder, &mut packets, 10_000, + None, ); }); @@ -195,6 +196,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { &poh_recorder, verified_receiver, vote_receiver, + None, ); poh_recorder.lock().unwrap().set_bank(&bank); @@ -286,7 +288,7 @@ fn simulate_process_entries( hash: next_hash(&bank.last_blockhash(), 1, &tx_vector), transactions: tx_vector, }; - process_entries(&bank, &vec![entry], randomize_txs).unwrap(); + process_entries(&bank, &vec![entry], randomize_txs, None).unwrap(); } fn bench_process_entries(randomize_txs: bool, bencher: &mut Bencher) { diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index ac50b44144..d5b7110eab 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -12,12 +12,14 @@ use crate::{ use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; use solana_ledger::{ - blocktree::Blocktree, entry::hash_transactions, leader_schedule_cache::LeaderScheduleCache, + blocktree::Blocktree, + blocktree_processor::{send_transaction_status_batch, TransactionStatusSender}, + entry::hash_transactions, + leader_schedule_cache::LeaderScheduleCache, }; use solana_measure::measure::Measure; use solana_metrics::{inc_new_counter_debug, inc_new_counter_info, inc_new_counter_warn}; -use solana_perf::cuda_runtime::PinnedVec; -use solana_perf::perf_libs; +use solana_perf::{cuda_runtime::PinnedVec, perf_libs}; use solana_runtime::{accounts_db::ErrorCounters, bank::Bank, transaction_batch::TransactionBatch}; use solana_sdk::{ clock::{ @@ -73,6 +75,7 @@ impl BankingStage { poh_recorder: &Arc>, verified_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, + transaction_status_sender: Option, ) -> Self { Self::new_num_threads( cluster_info, @@ -80,6 +83,7 @@ impl BankingStage { verified_receiver, verified_vote_receiver, Self::num_threads(), + transaction_status_sender, ) } @@ -89,6 +93,7 @@ impl BankingStage { verified_receiver: CrossbeamReceiver>, verified_vote_receiver: CrossbeamReceiver>, num_threads: u32, + transaction_status_sender: Option, ) -> Self { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); // Single thread to generate entries from many banks. @@ -108,6 +113,7 @@ impl BankingStage { let poh_recorder = poh_recorder.clone(); let cluster_info = cluster_info.clone(); let mut recv_start = Instant::now(); + let transaction_status_sender = transaction_status_sender.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) .spawn(move || { @@ -121,6 +127,7 @@ impl BankingStage { enable_forwarding, i, batch_limit, + transaction_status_sender.clone(), ); }) .unwrap() @@ -155,6 +162,7 @@ impl BankingStage { poh_recorder: &Arc>, buffered_packets: &mut Vec, batch_limit: usize, + transaction_status_sender: Option, ) -> Result { let mut unprocessed_packets = vec![]; let mut rebuffered_packets = 0; @@ -185,6 +193,7 @@ impl BankingStage { &poh_recorder, &msgs, unprocessed_indexes.to_owned(), + transaction_status_sender.clone(), ); new_tx_count += processed; @@ -277,6 +286,7 @@ impl BankingStage { buffered_packets: &mut Vec, enable_forwarding: bool, batch_limit: usize, + transaction_status_sender: Option, ) -> Result<()> { let (leader_at_slot_offset, poh_has_bank, would_be_leader) = { let poh = poh_recorder.lock().unwrap(); @@ -303,6 +313,7 @@ impl BankingStage { poh_recorder, buffered_packets, batch_limit, + transaction_status_sender, )?; buffered_packets.append(&mut unprocessed); Ok(()) @@ -350,6 +361,7 @@ impl BankingStage { enable_forwarding: bool, id: u32, batch_limit: usize, + transaction_status_sender: Option, ) { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = vec![]; @@ -363,6 +375,7 @@ impl BankingStage { &mut buffered_packets, enable_forwarding, batch_limit, + transaction_status_sender.clone(), ) .unwrap_or_else(|_| buffered_packets.clear()); } @@ -385,6 +398,7 @@ impl BankingStage { recv_timeout, id, batch_limit, + transaction_status_sender.clone(), ) { Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout)) => (), Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected)) => break, @@ -482,9 +496,10 @@ impl BankingStage { } fn process_and_record_transactions_locked( - bank: &Bank, + bank: &Arc, poh: &Arc>, batch: &TransactionBatch, + transaction_status_sender: Option, ) -> (Result, Vec) { let mut load_execute_time = Measure::start("load_execute_time"); // Use a shorter maximum age when adding transactions into the pipeline. This will reduce @@ -512,14 +527,24 @@ impl BankingStage { let num_to_commit = num_to_commit.unwrap(); if num_to_commit != 0 { - bank.commit_transactions( - txs, - None, - &mut loaded_accounts, - &results, - tx_count, - signature_count, - ); + let transaction_statuses = bank + .commit_transactions( + txs, + None, + &mut loaded_accounts, + &results, + tx_count, + signature_count, + ) + .processing_results; + if let Some(sender) = transaction_status_sender { + send_transaction_status_batch( + bank.clone(), + batch.transactions(), + transaction_statuses, + sender, + ); + } } commit_time.stop(); @@ -538,10 +563,11 @@ impl BankingStage { } pub fn process_and_record_transactions( - bank: &Bank, + bank: &Arc, txs: &[Transaction], poh: &Arc>, chunk_offset: usize, + transaction_status_sender: Option, ) -> (Result, Vec) { let mut lock_time = Measure::start("lock_time"); // Once accounts are locked, other threads cannot encode transactions that will modify the @@ -549,8 +575,12 @@ impl BankingStage { let batch = bank.prepare_batch(txs, None); lock_time.stop(); - let (result, mut retryable_txs) = - Self::process_and_record_transactions_locked(bank, poh, &batch); + let (result, mut retryable_txs) = Self::process_and_record_transactions_locked( + bank, + poh, + &batch, + transaction_status_sender, + ); retryable_txs.iter_mut().for_each(|x| *x += chunk_offset); let mut unlock_time = Measure::start("unlock_time"); @@ -574,9 +604,10 @@ impl BankingStage { /// Returns the number of transactions successfully processed by the bank, which may be less /// than the total number if max PoH height was reached and the bank halted fn process_transactions( - bank: &Bank, + bank: &Arc, transactions: &[Transaction], poh: &Arc>, + transaction_status_sender: Option, ) -> (usize, Vec) { let mut chunk_start = 0; let mut unprocessed_txs = vec![]; @@ -591,6 +622,7 @@ impl BankingStage { &transactions[chunk_start..chunk_end], poh, chunk_start, + transaction_status_sender.clone(), ); trace!("process_transactions result: {:?}", result); @@ -724,6 +756,7 @@ impl BankingStage { poh: &Arc>, msgs: &Packets, packet_indexes: Vec, + transaction_status_sender: Option, ) -> (usize, usize, Vec) { let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets(msgs, &packet_indexes); @@ -736,7 +769,7 @@ impl BankingStage { let tx_len = transactions.len(); let (processed, unprocessed_tx_indexes) = - Self::process_transactions(bank, &transactions, poh); + Self::process_transactions(bank, &transactions, poh, transaction_status_sender); let unprocessed_tx_count = unprocessed_tx_indexes.len(); @@ -815,6 +848,7 @@ impl BankingStage { recv_timeout: Duration, id: u32, batch_limit: usize, + transaction_status_sender: Option, ) -> Result { let mut recv_time = Measure::start("process_packets_recv"); let mms = verified_receiver.recv_timeout(recv_timeout)?; @@ -851,8 +885,13 @@ impl BankingStage { } let bank = bank.unwrap(); - let (processed, verified_txs_len, unprocessed_indexes) = - Self::process_received_packets(&bank, &poh, &msgs, packet_indexes); + let (processed, verified_txs_len, unprocessed_indexes) = Self::process_received_packets( + &bank, + &poh, + &msgs, + packet_indexes, + transaction_status_sender.clone(), + ); new_tx_count += processed; @@ -969,20 +1008,30 @@ pub fn create_test_recorder( #[cfg(test)] mod tests { use super::*; - use crate::cluster_info::Node; - use crate::genesis_utils::{create_genesis_config, GenesisConfigInfo}; - use crate::packet::to_packets; - use crate::poh_recorder::WorkingBank; + use crate::{ + cluster_info::Node, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + packet::to_packets, + poh_recorder::WorkingBank, + transaction_status_service::TransactionStatusService, + }; use crossbeam_channel::unbounded; use itertools::Itertools; - use solana_ledger::entry::{Entry, EntrySlice}; - use solana_ledger::get_tmp_ledger_path; - use solana_sdk::instruction::InstructionError; - use solana_sdk::signature::{Keypair, KeypairUtil}; - use solana_sdk::system_transaction; - use solana_sdk::transaction::TransactionError; - use std::sync::atomic::Ordering; - use std::thread::sleep; + use solana_ledger::{ + blocktree::entries_to_test_shreds, + entry::{next_entry, Entry, EntrySlice}, + get_tmp_ledger_path, + }; + use solana_sdk::{ + instruction::InstructionError, + signature::{Keypair, KeypairUtil}, + system_transaction, + transaction::TransactionError, + }; + use std::{ + sync::{atomic::Ordering, mpsc::channel}, + thread::sleep, + }; #[test] fn test_banking_stage_shutdown1() { @@ -1004,6 +1053,7 @@ mod tests { &poh_recorder, verified_receiver, vote_receiver, + None, ); drop(verified_sender); drop(vote_sender); @@ -1042,6 +1092,7 @@ mod tests { &poh_recorder, verified_receiver, vote_receiver, + None, ); trace!("sending bank"); drop(verified_sender); @@ -1103,6 +1154,7 @@ mod tests { &poh_recorder, verified_receiver, vote_receiver, + None, ); // fund another account so we can send 2 good transactions in a single batch. @@ -1244,6 +1296,7 @@ mod tests { verified_receiver, vote_receiver, 2, + None, ); // wait for banking_stage to eat the packets @@ -1644,9 +1697,15 @@ mod tests { poh_recorder.lock().unwrap().set_working_bank(working_bank); - BankingStage::process_and_record_transactions(&bank, &transactions, &poh_recorder, 0) - .0 - .unwrap(); + BankingStage::process_and_record_transactions( + &bank, + &transactions, + &poh_recorder, + 0, + None, + ) + .0 + .unwrap(); poh_recorder.lock().unwrap().tick(); let mut done = false; @@ -1678,7 +1737,8 @@ mod tests { &bank, &transactions, &poh_recorder, - 0 + 0, + None, ) .0, Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) @@ -1735,6 +1795,7 @@ mod tests { &transactions, &poh_recorder, 0, + None, ); assert!(result.is_ok()); @@ -1819,7 +1880,7 @@ mod tests { let poh_recorder = Arc::new(Mutex::new(poh_recorder)); let (processed_transactions_count, mut retryable_txs) = - BankingStage::process_transactions(&bank, &transactions, &poh_recorder); + BankingStage::process_transactions(&bank, &transactions, &poh_recorder, None); assert_eq!(processed_transactions_count, 0,); @@ -1830,4 +1891,101 @@ mod tests { Blocktree::destroy(&ledger_path).unwrap(); } + + #[test] + fn test_write_persist_transaction_status() { + solana_logger::setup(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10_000); + let bank = Arc::new(Bank::new(&genesis_config)); + let pubkey = Pubkey::new_rand(); + let pubkey1 = Pubkey::new_rand(); + let keypair1 = Keypair::new(); + + let success_tx = + system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()); + let success_signature = success_tx.signatures[0]; + let entry_1 = next_entry(&genesis_config.hash(), 1, vec![success_tx.clone()]); + let ix_error_tx = + system_transaction::transfer(&keypair1, &pubkey1, 10, genesis_config.hash()); + let ix_error_signature = ix_error_tx.signatures[0]; + let entry_2 = next_entry(&entry_1.hash, 1, vec![ix_error_tx.clone()]); + let fail_tx = + system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()); + let entry_3 = next_entry(&entry_2.hash, 1, vec![fail_tx.clone()]); + let entries = vec![entry_1, entry_2, entry_3]; + + let transactions = vec![success_tx, ix_error_tx, fail_tx]; + bank.transfer(4, &mint_keypair, &keypair1.pubkey()).unwrap(); + + let working_bank = WorkingBank { + bank: bank.clone(), + min_tick_height: bank.tick_height(), + max_tick_height: bank.tick_height() + 1, + }; + let ledger_path = get_tmp_ledger_path!(); + { + let blocktree = + Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"); + let blocktree = Arc::new(blocktree); + let (poh_recorder, _entry_receiver) = PohRecorder::new( + bank.tick_height(), + bank.last_blockhash(), + bank.slot(), + Some((4, 4)), + bank.ticks_per_slot(), + &pubkey, + &blocktree, + &Arc::new(LeaderScheduleCache::new_from_bank(&bank)), + &Arc::new(PohConfig::default()), + ); + let poh_recorder = Arc::new(Mutex::new(poh_recorder)); + + poh_recorder.lock().unwrap().set_working_bank(working_bank); + + let shreds = entries_to_test_shreds(entries.clone(), bank.slot(), 0, true, 0); + blocktree.insert_shreds(shreds, None, false).unwrap(); + blocktree.set_roots(&[bank.slot()]).unwrap(); + + let (transaction_status_sender, transaction_status_receiver) = channel(); + let transaction_status_service = TransactionStatusService::new( + transaction_status_receiver, + blocktree.clone(), + &Arc::new(AtomicBool::new(false)), + ); + + let _ = BankingStage::process_and_record_transactions( + &bank, + &transactions, + &poh_recorder, + 0, + Some(transaction_status_sender), + ); + + transaction_status_service.join().unwrap(); + + let confirmed_block = blocktree.get_confirmed_block(bank.slot()).unwrap(); + assert_eq!(confirmed_block.transactions.len(), 3); + + for (transaction, result) in confirmed_block.transactions.into_iter() { + if transaction.signatures[0] == success_signature { + assert_eq!(result.unwrap().status, Ok(())); + } else if transaction.signatures[0] == ix_error_signature { + assert_eq!( + result.unwrap().status, + Err(TransactionError::InstructionError( + 0, + InstructionError::CustomError(1) + )) + ); + } else { + assert_eq!(result, None); + } + } + } + Blocktree::destroy(&ledger_path).unwrap(); + } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 78f90e3b38..0684012716 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -55,6 +55,7 @@ pub mod snapshot_packager_service; pub mod storage_stage; pub mod streamer; pub mod tpu; +pub mod transaction_status_service; pub mod tvu; pub mod validator; pub mod weighted_shuffle; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 135584b905..8748487178 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1,20 +1,20 @@ //! The `replay_stage` replays transactions broadcast by the leader. -use crate::cluster_info::ClusterInfo; -use crate::commitment::{ - AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData, +use crate::{ + cluster_info::ClusterInfo, + commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData}, + consensus::{StakeLockout, Tower}, + poh_recorder::PohRecorder, + result::{Error, Result}, + rpc_subscriptions::RpcSubscriptions, + thread_mem_usage, }; -use crate::consensus::{StakeLockout, Tower}; -use crate::poh_recorder::PohRecorder; -use crate::result::{Error, Result}; -use crate::rpc_subscriptions::RpcSubscriptions; -use crate::thread_mem_usage; use jemalloc_ctl::thread::allocatedp; use solana_ledger::{ bank_forks::BankForks, block_error::BlockError, blocktree::{Blocktree, BlocktreeError}, - blocktree_processor, + blocktree_processor::{self, TransactionStatusSender}, entry::{Entry, EntrySlice}, leader_schedule_cache::LeaderScheduleCache, snapshot_package::SnapshotPackageSender, @@ -182,6 +182,7 @@ impl ReplayStage { slot_full_senders: Vec>, snapshot_package_sender: Option, block_commitment_cache: Arc>, + transaction_status_sender: Option, ) -> (Self, Receiver>>) where T: 'static + KeypairUtil + Send + Sync, @@ -245,6 +246,7 @@ impl ReplayStage { &my_pubkey, &mut progress, &slot_full_senders, + transaction_status_sender.clone(), ); datapoint_debug!( "replay_stage-memory", @@ -493,6 +495,7 @@ impl ReplayStage { bank: &Arc, blocktree: &Blocktree, bank_progress: &mut ForkProgress, + transaction_status_sender: Option, ) -> (Result<()>, usize) { let mut tx_count = 0; let now = Instant::now(); @@ -514,7 +517,14 @@ impl ReplayStage { slot_full, ); tx_count += entries.iter().map(|e| e.transactions.len()).sum::(); - Self::replay_entries_into_bank(bank, bank_progress, entries, num_shreds, slot_full) + Self::replay_entries_into_bank( + bank, + bank_progress, + entries, + num_shreds, + slot_full, + transaction_status_sender, + ) }); if Self::is_replay_result_fatal(&replay_result) { @@ -663,6 +673,7 @@ impl ReplayStage { my_pubkey: &Pubkey, progress: &mut HashMap, slot_full_senders: &[Sender<(u64, Pubkey)>], + transaction_status_sender: Option, ) -> bool { let mut did_complete_bank = false; let mut tx_count = 0; @@ -685,8 +696,12 @@ impl ReplayStage { .entry(bank.slot()) .or_insert_with(|| ForkProgress::new(bank.slot(), bank.last_blockhash())); if bank.collector_id() != my_pubkey { - let (replay_result, replay_tx_count) = - Self::replay_blocktree_into_bank(&bank, &blocktree, bank_progress); + let (replay_result, replay_tx_count) = Self::replay_blocktree_into_bank( + &bank, + &blocktree, + bank_progress, + transaction_status_sender.clone(), + ); tx_count += replay_tx_count; if Self::is_replay_result_fatal(&replay_result) { trace!("replay_result_fatal slot {}", bank_slot); @@ -950,6 +965,7 @@ impl ReplayStage { entries: Vec, num_shreds: usize, slot_full: bool, + transaction_status_sender: Option, ) -> Result<()> { let result = Self::verify_and_process_entries( &bank, @@ -957,6 +973,7 @@ impl ReplayStage { slot_full, bank_progress.num_shreds, bank_progress, + transaction_status_sender, ); bank_progress.num_shreds += num_shreds; bank_progress.num_entries += entries.len(); @@ -1008,6 +1025,7 @@ impl ReplayStage { slot_full: bool, shred_index: usize, bank_progress: &mut ForkProgress, + transaction_status_sender: Option, ) -> Result<()> { let last_entry = &bank_progress.last_entry; let tick_hash_count = &mut bank_progress.tick_hash_count; @@ -1042,7 +1060,8 @@ impl ReplayStage { let mut entry_state = entries.start_verify(last_entry); let mut replay_elapsed = Measure::start("replay_elapsed"); - let res = blocktree_processor::process_entries(bank, entries, true); + let res = + blocktree_processor::process_entries(bank, entries, true, transaction_status_sender); replay_elapsed.stop(); bank_progress.stats.replay_elapsed += replay_elapsed.as_us(); @@ -1125,29 +1144,38 @@ impl ReplayStage { #[cfg(test)] mod test { use super::*; - use crate::commitment::BlockCommitment; - use crate::genesis_utils::{create_genesis_config, create_genesis_config_with_leader}; - use crate::replay_stage::ReplayStage; - use solana_ledger::blocktree::make_slot_entries; - use solana_ledger::entry; - use solana_ledger::shred::{ - CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED, - SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD, + use crate::{ + commitment::BlockCommitment, + genesis_utils::{create_genesis_config, create_genesis_config_with_leader}, + replay_stage::ReplayStage, + transaction_status_service::TransactionStatusService, }; use solana_ledger::{ + blocktree::make_slot_entries, blocktree::{entries_to_test_shreds, BlocktreeError}, + create_new_tmp_ledger, + entry::{self, next_entry}, get_tmp_ledger_path, + shred::{ + CodingShredHeader, DataShredHeader, Shred, ShredCommonHeader, DATA_COMPLETE_SHRED, + SIZE_OF_COMMON_SHRED_HEADER, SIZE_OF_DATA_SHRED_HEADER, SIZE_OF_DATA_SHRED_PAYLOAD, + }, }; use solana_runtime::genesis_utils::GenesisConfigInfo; - use solana_sdk::hash::{hash, Hash}; - use solana_sdk::packet::PACKET_DATA_SIZE; - use solana_sdk::signature::{Keypair, KeypairUtil}; - use solana_sdk::system_transaction; - use solana_sdk::transaction::TransactionError; + use solana_sdk::{ + hash::{hash, Hash}, + instruction::InstructionError, + packet::PACKET_DATA_SIZE, + signature::{Keypair, KeypairUtil}, + system_transaction, + transaction::TransactionError, + }; use solana_vote_program::vote_state::VoteState; - use std::fs::remove_dir_all; - use std::iter::FromIterator; - use std::sync::{Arc, RwLock}; + use std::{ + fs::remove_dir_all, + iter::FromIterator, + sync::{Arc, RwLock}, + }; #[test] fn test_child_slots_of_same_parent() { @@ -1429,8 +1457,12 @@ mod test { .or_insert_with(|| ForkProgress::new(0, last_blockhash)); let shreds = shred_to_insert(&mint_keypair, bank0.clone()); blocktree.insert_shreds(shreds, None, false).unwrap(); - let (res, _tx_count) = - ReplayStage::replay_blocktree_into_bank(&bank0, &blocktree, &mut bank0_progress); + let (res, _tx_count) = ReplayStage::replay_blocktree_into_bank( + &bank0, + &blocktree, + &mut bank0_progress, + None, + ); // Check that the erroring bank was marked as dead in the progress map assert!(progress @@ -1675,4 +1707,90 @@ mod test { assert!(res.1.is_some()); assert_eq!(res.1.unwrap().slot(), 11); } + + #[test] + fn test_write_persist_transaction_status() { + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(1000); + let (ledger_path, blockhash) = create_new_tmp_ledger!(&genesis_config); + { + let blocktree = Blocktree::open(&ledger_path) + .expect("Expected to successfully open database ledger"); + let blocktree = Arc::new(blocktree); + + let keypair1 = Keypair::new(); + let keypair2 = Keypair::new(); + let keypair3 = Keypair::new(); + + let bank0 = Arc::new(Bank::new(&genesis_config)); + bank0 + .transfer(4, &mint_keypair, &keypair2.pubkey()) + .unwrap(); + + let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1)); + let slot = bank1.slot(); + + // Generate transactions for processing + // Successful transaction + let success_tx = + system_transaction::transfer(&mint_keypair, &keypair1.pubkey(), 2, blockhash); + let success_signature = success_tx.signatures[0]; + let entry_1 = next_entry(&blockhash, 1, vec![success_tx]); + // Failed transaction, InstructionError + let ix_error_tx = + system_transaction::transfer(&keypair2, &keypair3.pubkey(), 10, blockhash); + let ix_error_signature = ix_error_tx.signatures[0]; + let entry_2 = next_entry(&entry_1.hash, 1, vec![ix_error_tx]); + // Failed transaction + let fail_tx = + system_transaction::transfer(&mint_keypair, &keypair2.pubkey(), 2, Hash::default()); + let entry_3 = next_entry(&entry_2.hash, 1, vec![fail_tx]); + let entries = vec![entry_1, entry_2, entry_3]; + + let shreds = entries_to_test_shreds(entries.clone(), slot, bank0.slot(), true, 0); + blocktree.insert_shreds(shreds, None, false).unwrap(); + blocktree.set_roots(&[slot]).unwrap(); + + let (transaction_status_sender, transaction_status_receiver) = channel(); + let transaction_status_service = TransactionStatusService::new( + transaction_status_receiver, + blocktree.clone(), + &Arc::new(AtomicBool::new(false)), + ); + + // Check that process_entries successfully writes can_commit transactions statuses, and + // that they are matched properly by get_confirmed_block + let _result = blocktree_processor::process_entries( + &bank1, + &entries, + true, + Some(transaction_status_sender), + ); + + transaction_status_service.join().unwrap(); + + let confirmed_block = blocktree.get_confirmed_block(slot).unwrap(); + assert_eq!(confirmed_block.transactions.len(), 3); + + for (transaction, result) in confirmed_block.transactions.into_iter() { + if transaction.signatures[0] == success_signature { + assert_eq!(result.unwrap().status, Ok(())); + } else if transaction.signatures[0] == ix_error_signature { + assert_eq!( + result.unwrap().status, + Err(TransactionError::InstructionError( + 0, + InstructionError::CustomError(1) + )) + ); + } else { + assert_eq!(result, None); + } + } + } + Blocktree::destroy(&ledger_path).unwrap(); + } } diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 330b151720..f370890945 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -1,21 +1,27 @@ //! The `tpu` module implements the Transaction Processing Unit, a //! multi-stage transaction processing pipeline in software. -use crate::banking_stage::BankingStage; -use crate::broadcast_stage::{BroadcastStage, BroadcastStageType}; -use crate::cluster_info::ClusterInfo; -use crate::cluster_info_vote_listener::ClusterInfoVoteListener; -use crate::fetch_stage::FetchStage; -use crate::poh_recorder::{PohRecorder, WorkingBankEntry}; -use crate::sigverify::TransactionSigVerifier; -use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage}; +use crate::{ + banking_stage::BankingStage, + broadcast_stage::{BroadcastStage, BroadcastStageType}, + cluster_info::ClusterInfo, + cluster_info_vote_listener::ClusterInfoVoteListener, + fetch_stage::FetchStage, + poh_recorder::{PohRecorder, WorkingBankEntry}, + sigverify::TransactionSigVerifier, + sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, +}; use crossbeam_channel::unbounded; -use solana_ledger::blocktree::Blocktree; -use std::net::UdpSocket; -use std::sync::atomic::AtomicBool; -use std::sync::mpsc::{channel, Receiver}; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread; +use solana_ledger::{blocktree::Blocktree, blocktree_processor::TransactionStatusSender}; +use std::{ + net::UdpSocket, + sync::{ + atomic::AtomicBool, + mpsc::{channel, Receiver}, + Arc, Mutex, RwLock, + }, + thread, +}; pub struct Tpu { fetch_stage: FetchStage, @@ -35,6 +41,7 @@ impl Tpu { tpu_forwards_sockets: Vec, broadcast_socket: UdpSocket, sigverify_disabled: bool, + transaction_status_sender: Option, blocktree: &Arc, broadcast_type: &BroadcastStageType, exit: &Arc, @@ -72,6 +79,7 @@ impl Tpu { poh_recorder, verified_receiver, verified_vote_receiver, + transaction_status_sender, ); let broadcast_stage = broadcast_type.new_broadcast_stage( diff --git a/core/src/transaction_status_service.rs b/core/src/transaction_status_service.rs new file mode 100644 index 0000000000..da8a24a21b --- /dev/null +++ b/core/src/transaction_status_service.rs @@ -0,0 +1,79 @@ +use crate::result::{Error, Result}; +use solana_client::rpc_request::RpcTransactionStatus; +use solana_ledger::{blocktree::Blocktree, blocktree_processor::TransactionStatusBatch}; +use solana_runtime::bank::Bank; +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{Receiver, RecvTimeoutError}, + Arc, + }, + thread::{self, Builder, JoinHandle}, + time::Duration, +}; + +pub struct TransactionStatusService { + thread_hdl: JoinHandle<()>, +} + +impl TransactionStatusService { + #[allow(clippy::new_ret_no_self)] + pub fn new( + write_transaction_status_receiver: Receiver, + blocktree: Arc, + exit: &Arc, + ) -> Self { + let exit = exit.clone(); + let thread_hdl = Builder::new() + .name("solana-transaction-status-writer".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + break; + } + if let Err(e) = Self::write_transaction_status_batch( + &write_transaction_status_receiver, + &blocktree, + ) { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => info!("Error from write_transaction_statuses: {:?}", e), + } + } + }) + .unwrap(); + Self { thread_hdl } + } + + fn write_transaction_status_batch( + write_transaction_status_receiver: &Receiver, + blocktree: &Arc, + ) -> Result<()> { + let TransactionStatusBatch { + bank, + transactions, + statuses, + } = write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))?; + + let slot = bank.slot(); + for (transaction, status) in transactions.iter().zip(statuses) { + if Bank::can_commit(&status) && !transaction.signatures.is_empty() { + let fee_calculator = bank + .get_fee_calculator(&transaction.message().recent_blockhash) + .expect("FeeCalculator must exist"); + let fee = fee_calculator.calculate_fee(transaction.message()); + blocktree + .write_transaction_status( + (slot, transaction.signatures[0]), + &RpcTransactionStatus { status, fee }, + ) + .expect("Expect database write to succeed"); + } + } + Ok(()) + } + + pub fn join(self) -> thread::Result<()> { + self.thread_hdl.join() + } +} diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 46ae55b56f..456b12178c 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -1,32 +1,43 @@ //! The `tvu` module implements the Transaction Validation Unit, a multi-stage transaction //! validation pipeline in software. -use crate::blockstream_service::BlockstreamService; -use crate::cluster_info::ClusterInfo; -use crate::commitment::BlockCommitmentCache; -use crate::ledger_cleanup_service::LedgerCleanupService; -use crate::partition_cfg::PartitionCfg; -use crate::poh_recorder::PohRecorder; -use crate::replay_stage::ReplayStage; -use crate::retransmit_stage::RetransmitStage; -use crate::rpc_subscriptions::RpcSubscriptions; -use crate::shred_fetch_stage::ShredFetchStage; -use crate::sigverify_shreds::ShredSigVerifier; -use crate::sigverify_stage::{DisabledSigVerifier, SigVerifyStage}; -use crate::snapshot_packager_service::SnapshotPackagerService; -use crate::storage_stage::{StorageStage, StorageState}; +use crate::{ + blockstream_service::BlockstreamService, + cluster_info::ClusterInfo, + commitment::BlockCommitmentCache, + ledger_cleanup_service::LedgerCleanupService, + partition_cfg::PartitionCfg, + poh_recorder::PohRecorder, + replay_stage::ReplayStage, + retransmit_stage::RetransmitStage, + rpc_subscriptions::RpcSubscriptions, + shred_fetch_stage::ShredFetchStage, + sigverify_shreds::ShredSigVerifier, + sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, + snapshot_packager_service::SnapshotPackagerService, + storage_stage::{StorageStage, StorageState}, +}; use crossbeam_channel::unbounded; -use solana_ledger::bank_forks::BankForks; -use solana_ledger::blocktree::{Blocktree, CompletedSlotsReceiver}; use solana_ledger::leader_schedule_cache::LeaderScheduleCache; -use solana_sdk::pubkey::Pubkey; -use solana_sdk::signature::{Keypair, KeypairUtil}; -use std::net::UdpSocket; -use std::path::PathBuf; -use std::sync::atomic::AtomicBool; -use std::sync::mpsc::{channel, Receiver}; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread; +use solana_ledger::{ + bank_forks::BankForks, + blocktree::{Blocktree, CompletedSlotsReceiver}, + blocktree_processor::TransactionStatusSender, +}; +use solana_sdk::{ + pubkey::Pubkey, + signature::{Keypair, KeypairUtil}, +}; +use std::{ + net::UdpSocket, + path::PathBuf, + sync::{ + atomic::AtomicBool, + mpsc::{channel, Receiver}, + Arc, Mutex, RwLock, + }, + thread, +}; pub struct Tvu { fetch_stage: ShredFetchStage, @@ -75,6 +86,7 @@ impl Tvu { sigverify_disabled: bool, cfg: Option, shred_version: u16, + transaction_status_sender: Option, ) -> Self where T: 'static + KeypairUtil + Sync + Send, @@ -165,6 +177,7 @@ impl Tvu { vec![blockstream_slot_sender, ledger_cleanup_slot_sender], snapshot_package_sender, block_commitment_cache, + transaction_status_sender, ); let blockstream_service = if let Some(blockstream_unix_socket) = blockstream_unix_socket { @@ -297,6 +310,7 @@ pub mod tests { false, None, 0, + None, ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index 966986f552..700d122ac5 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -16,6 +16,7 @@ use crate::{ sigverify, storage_stage::StorageState, tpu::Tpu, + transaction_status_service::TransactionStatusService, tvu::{Sockets, Tvu}, }; use solana_ledger::{ @@ -43,7 +44,7 @@ use std::{ path::{Path, PathBuf}, process, sync::atomic::{AtomicBool, Ordering}, - sync::mpsc::Receiver, + sync::mpsc::{channel, Receiver}, sync::{Arc, Mutex, RwLock}, thread::Result, }; @@ -54,6 +55,7 @@ pub struct ValidatorConfig { pub dev_halt_at_slot: Option, pub expected_genesis_hash: Option, pub voting_disabled: bool, + pub transaction_status_service_disabled: bool, pub blockstream_unix_socket: Option, pub storage_slots_per_turn: u64, pub account_paths: Option, @@ -71,6 +73,7 @@ impl Default for ValidatorConfig { dev_halt_at_slot: None, expected_genesis_hash: None, voting_disabled: false, + transaction_status_service_disabled: false, blockstream_unix_socket: None, storage_slots_per_turn: DEFAULT_SLOTS_PER_TURN, max_ledger_slots: None, @@ -105,6 +108,7 @@ pub struct Validator { validator_exit: Arc>>, rpc_service: Option, rpc_pubsub_service: Option, + transaction_status_service: Option, gossip_service: GossipService, poh_recorder: Arc>, poh_service: PohService, @@ -238,6 +242,21 @@ impl Validator { )) }; + let (transaction_status_sender, transaction_status_service) = + if rpc_service.is_some() && !config.transaction_status_service_disabled { + let (transaction_status_sender, transaction_status_receiver) = channel(); + ( + Some(transaction_status_sender), + Some(TransactionStatusService::new( + transaction_status_receiver, + blocktree.clone(), + &exit, + )), + ) + } else { + (None, None) + }; + info!( "Starting PoH: epoch={} slot={} tick_height={} blockhash={} leader={:?}", bank.epoch(), @@ -350,6 +369,7 @@ impl Validator { config.dev_sigverify_disabled, config.partition_cfg.clone(), shred_version, + transaction_status_sender.clone(), ); if config.dev_sigverify_disabled { @@ -364,6 +384,7 @@ impl Validator { node.sockets.tpu_forwards, node.sockets.broadcast, config.dev_sigverify_disabled, + transaction_status_sender, &blocktree, &config.broadcast_stage_type, &exit, @@ -376,6 +397,7 @@ impl Validator { gossip_service, rpc_service, rpc_pubsub_service, + transaction_status_service, tpu, tvu, poh_service, @@ -426,6 +448,9 @@ impl Validator { if let Some(rpc_pubsub_service) = self.rpc_pubsub_service { rpc_pubsub_service.join()?; } + if let Some(transaction_status_service) = self.transaction_status_service { + transaction_status_service.join()?; + } self.gossip_service.join()?; self.tpu.join()?; @@ -529,6 +554,8 @@ pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) { let leader_voting_keypair = Arc::new(voting_keypair); let storage_keypair = Arc::new(Keypair::new()); + let mut config = ValidatorConfig::default(); + config.transaction_status_service_disabled = true; let node = Validator::new( node, &node_keypair, @@ -538,7 +565,7 @@ pub fn new_validator_for_tests() -> (Validator, ContactInfo, Keypair, PathBuf) { &storage_keypair, None, true, - &ValidatorConfig::default(), + &config, ); discover_cluster(&contact_info.gossip, 1).expect("Node startup failed"); (node, contact_info, mint_keypair, ledger_path) @@ -565,6 +592,8 @@ mod tests { let voting_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new()); + let mut config = ValidatorConfig::default(); + config.transaction_status_service_disabled = true; let validator = Validator::new( validator_node, &Arc::new(validator_keypair), @@ -574,7 +603,7 @@ mod tests { &storage_keypair, Some(&leader_node.info), true, - &ValidatorConfig::default(), + &config, ); validator.close().unwrap(); remove_dir_all(validator_ledger_path).unwrap(); @@ -597,6 +626,8 @@ mod tests { ledger_paths.push(validator_ledger_path.clone()); let voting_keypair = Arc::new(Keypair::new()); let storage_keypair = Arc::new(Keypair::new()); + let mut config = ValidatorConfig::default(); + config.transaction_status_service_disabled = true; Validator::new( validator_node, &Arc::new(validator_keypair), @@ -606,7 +637,7 @@ mod tests { &storage_keypair, Some(&leader_node.info), true, - &ValidatorConfig::default(), + &config, ) }) .collect(); diff --git a/core/tests/storage_stage.rs b/core/tests/storage_stage.rs index 1d31a3519a..929d110d2a 100644 --- a/core/tests/storage_stage.rs +++ b/core/tests/storage_stage.rs @@ -117,6 +117,7 @@ mod tests { bank.last_blockhash(), ), true, + None, ) .unwrap(); let message = Message::new_with_payer(vec![mining_proof_ix], Some(&mint_keypair.pubkey())); @@ -206,6 +207,7 @@ mod tests { &bank, &entry::create_ticks(64, 0, bank.last_blockhash()), true, + None, ) .expect("failed process entries"); last_bank = bank; diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs index 55d73dc5c9..85243321f4 100644 --- a/ledger/src/blocktree.rs +++ b/ledger/src/blocktree.rs @@ -30,7 +30,7 @@ use solana_sdk::{ clock::{Slot, DEFAULT_TICKS_PER_SECOND}, genesis_config::GenesisConfig, hash::Hash, - signature::{Keypair, KeypairUtil}, + signature::{Keypair, KeypairUtil, Signature}, timing::timestamp, transaction::Transaction, }; @@ -1175,6 +1175,14 @@ impl Blocktree { .collect() } + pub fn write_transaction_status( + &self, + index: (Slot, Signature), + status: &RpcTransactionStatus, + ) -> Result<()> { + self.transaction_status_cf.put(index, status) + } + /// Returns the entry vector for the slot starting with `shred_start_index` pub fn get_slot_entries( &self, diff --git a/ledger/src/blocktree_processor.rs b/ledger/src/blocktree_processor.rs index 4863e94487..bce8c5afe1 100644 --- a/ledger/src/blocktree_processor.rs +++ b/ledger/src/blocktree_processor.rs @@ -12,19 +12,22 @@ use rand::{seq::SliceRandom, thread_rng}; use rayon::{prelude::*, ThreadPool}; use solana_metrics::{datapoint, datapoint_error, inc_new_counter_debug}; use solana_rayon_threadlimit::get_thread_count; -use solana_runtime::{bank::Bank, transaction_batch::TransactionBatch}; +use solana_runtime::{ + bank::{Bank, TransactionResults}, + transaction_batch::TransactionBatch, +}; use solana_sdk::{ clock::{Slot, MAX_RECENT_BLOCKHASHES}, genesis_config::GenesisConfig, hash::Hash, signature::{Keypair, KeypairUtil}, timing::duration_as_ms, - transaction::Result, + transaction::{Result, Transaction}, }; use std::{ cell::RefCell, result, - sync::Arc, + sync::{mpsc::Sender, Arc}, time::{Duration, Instant}, }; @@ -43,13 +46,29 @@ fn first_err(results: &[Result<()>]) -> Result<()> { Ok(()) } -fn execute_batch(batch: &TransactionBatch) -> Result<()> { - let results = batch +fn execute_batch( + batch: &TransactionBatch, + bank: &Arc, + transaction_status_sender: Option, +) -> Result<()> { + let TransactionResults { + fee_collection_results, + processing_results, + } = batch .bank() .load_execute_and_commit_transactions(batch, MAX_RECENT_BLOCKHASHES); + if let Some(sender) = transaction_status_sender { + send_transaction_status_batch( + bank.clone(), + batch.transactions(), + processing_results, + sender, + ); + } + let mut first_err = None; - for (result, transaction) in results.iter().zip(batch.transactions()) { + for (result, transaction) in fee_collection_results.iter().zip(batch.transactions()) { if let Err(ref err) = result { if first_err.is_none() { first_err = Some(result.clone()); @@ -75,14 +94,15 @@ fn execute_batches( bank: &Arc, batches: &[TransactionBatch], entry_callback: Option<&ProcessCallback>, + transaction_status_sender: Option, ) -> Result<()> { inc_new_counter_debug!("bank-par_execute_entries-count", batches.len()); let results: Vec> = PAR_THREAD_POOL.with(|thread_pool| { thread_pool.borrow().install(|| { batches .into_par_iter() - .map(|batch| { - let result = execute_batch(batch); + .map_with(transaction_status_sender, |sender, batch| { + let result = execute_batch(batch, bank, sender.clone()); if let Some(entry_callback) = entry_callback { entry_callback(bank); } @@ -100,8 +120,13 @@ fn execute_batches( /// 2. Process the locked group in parallel /// 3. Register the `Tick` if it's available /// 4. Update the leader scheduler, goto 1 -pub fn process_entries(bank: &Arc, entries: &[Entry], randomize: bool) -> Result<()> { - process_entries_with_callback(bank, entries, randomize, None) +pub fn process_entries( + bank: &Arc, + entries: &[Entry], + randomize: bool, + transaction_status_sender: Option, +) -> Result<()> { + process_entries_with_callback(bank, entries, randomize, None, transaction_status_sender) } fn process_entries_with_callback( @@ -109,6 +134,7 @@ fn process_entries_with_callback( entries: &[Entry], randomize: bool, entry_callback: Option<&ProcessCallback>, + transaction_status_sender: Option, ) -> Result<()> { // accumulator for entries that can be processed in parallel let mut batches = vec![]; @@ -120,7 +146,12 @@ fn process_entries_with_callback( if bank.is_block_boundary(bank.tick_height() + tick_hashes.len() as u64) { // If it's a tick that will cause a new blockhash to be created, // execute the group and register the tick - execute_batches(bank, &batches, entry_callback)?; + execute_batches( + bank, + &batches, + entry_callback, + transaction_status_sender.clone(), + )?; batches.clear(); for hash in &tick_hashes { bank.register_tick(hash); @@ -170,12 +201,17 @@ fn process_entries_with_callback( } else { // else we have an entry that conflicts with a prior entry // execute the current queue and try to process this entry again - execute_batches(bank, &batches, entry_callback)?; + execute_batches( + bank, + &batches, + entry_callback, + transaction_status_sender.clone(), + )?; batches.clear(); } } } - execute_batches(bank, &batches, entry_callback)?; + execute_batches(bank, &batches, entry_callback, transaction_status_sender)?; for hash in tick_hashes { bank.register_tick(&hash); } @@ -343,16 +379,15 @@ fn verify_and_process_slot_entries( } } - process_entries_with_callback(bank, &entries, true, opts.entry_callback.as_ref()).map_err( - |err| { + process_entries_with_callback(bank, &entries, true, opts.entry_callback.as_ref(), None) + .map_err(|err| { warn!( "Failed to process entries for slot {}: {:?}", bank.slot(), err ); BlocktreeProcessorError::InvalidTransaction - }, - )?; + })?; Ok(entries.last().unwrap().hash) } @@ -508,6 +543,33 @@ fn process_pending_slots( Ok(fork_info) } +pub struct TransactionStatusBatch { + pub bank: Arc, + pub transactions: Vec, + pub statuses: Vec>, +} +pub type TransactionStatusSender = Sender; + +pub fn send_transaction_status_batch( + bank: Arc, + transactions: &[Transaction], + statuses: Vec>, + transaction_status_sender: TransactionStatusSender, +) { + let slot = bank.slot(); + if let Err(e) = transaction_status_sender.send(TransactionStatusBatch { + bank, + transactions: transactions.to_vec(), + statuses, + }) { + trace!( + "Slot {} transaction_status send batch failed: {:?}", + slot, + e + ); + } +} + // used for tests only pub fn fill_blocktree_slot_with_ticks( blocktree: &Blocktree, @@ -542,9 +604,11 @@ pub fn fill_blocktree_slot_with_ticks( #[cfg(test)] pub mod tests { use super::*; - use crate::entry::{create_ticks, next_entry, next_entry_mut}; - use crate::genesis_utils::{ - create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo, + use crate::{ + entry::{create_ticks, next_entry, next_entry_mut}, + genesis_utils::{ + create_genesis_config, create_genesis_config_with_leader, GenesisConfigInfo, + }, }; use matches::assert_matches; use rand::{thread_rng, Rng}; @@ -1094,7 +1158,7 @@ pub mod tests { ); // Now ensure the TX is accepted despite pointing to the ID of an empty entry. - process_entries(&bank, &slot_entries, true).unwrap(); + process_entries(&bank, &slot_entries, true, None).unwrap(); assert_eq!(bank.process_transaction(&tx), Ok(())); } @@ -1300,7 +1364,7 @@ pub mod tests { // ensure bank can process a tick assert_eq!(bank.tick_height(), 0); let tick = next_entry(&genesis_config.hash(), 1, vec![]); - assert_eq!(process_entries(&bank, &[tick.clone()], true), Ok(())); + assert_eq!(process_entries(&bank, &[tick.clone()], true, None), Ok(())); assert_eq!(bank.tick_height(), 1); } @@ -1332,7 +1396,10 @@ pub mod tests { bank.last_blockhash(), ); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); - assert_eq!(process_entries(&bank, &[entry_1, entry_2], true), Ok(())); + assert_eq!( + process_entries(&bank, &[entry_1, entry_2], true, None), + Ok(()) + ); assert_eq!(bank.get_balance(&keypair1.pubkey()), 2); assert_eq!(bank.get_balance(&keypair2.pubkey()), 2); assert_eq!(bank.last_blockhash(), blockhash); @@ -1386,7 +1453,12 @@ pub mod tests { ); assert_eq!( - process_entries(&bank, &[entry_1_to_mint, entry_2_to_3_mint_to_1], false), + process_entries( + &bank, + &[entry_1_to_mint, entry_2_to_3_mint_to_1], + false, + None + ), Ok(()) ); @@ -1456,6 +1528,7 @@ pub mod tests { &bank, &[entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()], false, + None, ) .is_err()); @@ -1566,6 +1639,7 @@ pub mod tests { entry_conflict_itself.clone() ], false, + None, ) .is_err()); @@ -1612,7 +1686,10 @@ pub mod tests { let tx = system_transaction::transfer(&keypair2, &keypair4.pubkey(), 1, bank.last_blockhash()); let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]); - assert_eq!(process_entries(&bank, &[entry_1, entry_2], true), Ok(())); + assert_eq!( + process_entries(&bank, &[entry_1, entry_2], true, None), + Ok(()) + ); assert_eq!(bank.get_balance(&keypair3.pubkey()), 1); assert_eq!(bank.get_balance(&keypair4.pubkey()), 1); assert_eq!(bank.last_blockhash(), blockhash); @@ -1670,7 +1747,7 @@ pub mod tests { next_entry_mut(&mut hash, 0, transactions) }) .collect(); - assert_eq!(process_entries(&bank, &entries, true), Ok(())); + assert_eq!(process_entries(&bank, &entries, true, None), Ok(())); } #[test] @@ -1730,7 +1807,7 @@ pub mod tests { // Transfer lamports to each other let entry = next_entry(&bank.last_blockhash(), 1, tx_vector); - assert_eq!(process_entries(&bank, &vec![entry], true), Ok(())); + assert_eq!(process_entries(&bank, &vec![entry], true, None), Ok(())); bank.squash(); // Even number keypair should have balance of 2 * initial_lamports and @@ -1794,7 +1871,8 @@ pub mod tests { process_entries( &bank, &[entry_1.clone(), tick.clone(), entry_2.clone()], - true + true, + None ), Ok(()) ); @@ -1806,7 +1884,7 @@ pub mod tests { system_transaction::transfer(&keypair2, &keypair3.pubkey(), 1, bank.last_blockhash()); let entry_3 = next_entry(&entry_2.hash, 1, vec![tx]); assert_eq!( - process_entries(&bank, &[entry_3], true), + process_entries(&bank, &[entry_3], true, None), Err(TransactionError::AccountNotFound) ); } @@ -1886,7 +1964,7 @@ pub mod tests { ); assert_eq!( - process_entries(&bank, &[entry_1_to_mint], false), + process_entries(&bank, &[entry_1_to_mint], false, None), Err(TransactionError::AccountInUse) ); @@ -2030,7 +2108,7 @@ pub mod tests { }) .collect(); info!("paying iteration {}", i); - process_entries(&bank, &entries, true).expect("paying failed"); + process_entries(&bank, &entries, true, None).expect("paying failed"); let entries: Vec<_> = (0..NUM_TRANSFERS) .step_by(NUM_TRANSFERS_PER_ENTRY) @@ -2053,7 +2131,7 @@ pub mod tests { .collect(); info!("refunding iteration {}", i); - process_entries(&bank, &entries, true).expect("refunding failed"); + process_entries(&bank, &entries, true, None).expect("refunding failed"); // advance to next block process_entries( @@ -2062,6 +2140,7 @@ pub mod tests { .map(|_| next_entry_mut(&mut hash, 1, vec![])) .collect::>(), true, + None, ) .expect("process ticks failed"); @@ -2102,7 +2181,7 @@ pub mod tests { let entry = next_entry(&new_blockhash, 1, vec![tx]); entries.push(entry); - process_entries_with_callback(&bank0, &entries, true, None).unwrap(); + process_entries_with_callback(&bank0, &entries, true, None, None).unwrap(); assert_eq!(bank0.get_balance(&keypair.pubkey()), 1) } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 058e398714..61088154bd 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -196,6 +196,8 @@ impl LocalCluster { let leader_contact_info = leader_node.info.clone(); let leader_storage_keypair = Arc::new(storage_keypair); let leader_voting_keypair = Arc::new(voting_keypair); + let mut leader_config = config.validator_configs[0].clone(); + leader_config.transaction_status_service_disabled = true; let leader_server = Validator::new( leader_node, &leader_keypair, @@ -205,7 +207,7 @@ impl LocalCluster { &leader_storage_keypair, None, true, - &config.validator_configs[0], + &leader_config, ); let mut validators = HashMap::new(); @@ -327,6 +329,8 @@ impl LocalCluster { .unwrap(); } + let mut config = validator_config.clone(); + config.transaction_status_service_disabled = true; let voting_keypair = Arc::new(voting_keypair); let validator_server = Validator::new( validator_node, @@ -337,7 +341,7 @@ impl LocalCluster { &storage_keypair, Some(&self.entry_point_info), true, - &validator_config, + &config, ); self.validators @@ -637,6 +641,9 @@ impl Cluster for LocalCluster { // Update the stored ContactInfo for this node let node = Node::new_localhost_with_pubkey(&pubkey); cluster_validator_info.info.contact_info = node.info.clone(); + cluster_validator_info + .config + .transaction_status_service_disabled = true; let entry_point_info = { if *pubkey == self.entry_point_info.id { diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 61b1e67c34..414ff5f519 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -153,6 +153,11 @@ impl StatusCacheRc { pub type EnteredEpochCallback = Box () + Sync + Send>; +pub struct TransactionResults { + pub fee_collection_results: Vec>, + pub processing_results: Vec>, +} + /// Manager for the state of all accounts and programs after processing its entries. #[derive(Default, Deserialize, Serialize)] pub struct Bank { @@ -745,6 +750,11 @@ impl Bank { ) } + pub fn get_fee_calculator(&self, hash: &Hash) -> Option { + let blockhash_queue = self.blockhash_queue.read().unwrap(); + blockhash_queue.get_fee_calculator(hash).cloned() + } + pub fn confirmed_last_blockhash(&self) -> (Hash, FeeCalculator) { const NUM_BLOCKHASH_CONFIRMATIONS: usize = 3; @@ -1153,7 +1163,7 @@ impl Bank { executed: &[Result<()>], tx_count: u64, signature_count: u64, - ) -> Vec> { + ) -> TransactionResults { assert!( !self.is_frozen(), "commit_transactions() working on a frozen bank!" @@ -1186,7 +1196,12 @@ impl Bank { write_time.stop(); debug!("store: {}us txs_len={}", write_time.as_us(), txs.len(),); self.update_transaction_statuses(txs, iteration_order, &executed); - self.filter_program_errors_and_collect_fee(txs, iteration_order, executed) + let fee_collection_results = + self.filter_program_errors_and_collect_fee(txs, iteration_order, executed); + TransactionResults { + fee_collection_results, + processing_results: executed.to_vec(), + } } fn distribute_rent(&self) { @@ -1223,7 +1238,7 @@ impl Bank { &self, batch: &TransactionBatch, max_age: usize, - ) -> Vec> { + ) -> TransactionResults { let (mut loaded_accounts, executed, _, tx_count, signature_count) = self.load_and_execute_transactions(batch, max_age); @@ -1241,6 +1256,7 @@ impl Bank { pub fn process_transactions(&self, txs: &[Transaction]) -> Vec> { let batch = self.prepare_batch(txs, None); self.load_execute_and_commit_transactions(&batch, MAX_RECENT_BLOCKHASHES) + .fee_collection_results } /// Create, sign, and process a Transaction from `keypair` to `to` of @@ -2786,8 +2802,9 @@ mod tests { let pay_alice = vec![tx1]; let lock_result = bank.prepare_batch(&pay_alice, None); - let results_alice = - bank.load_execute_and_commit_transactions(&lock_result, MAX_RECENT_BLOCKHASHES); + let results_alice = bank + .load_execute_and_commit_transactions(&lock_result, MAX_RECENT_BLOCKHASHES) + .fee_collection_results; assert_eq!(results_alice[0], Ok(())); // try executing an interleaved transfer twice