BankingStage Refactor: Add state to Committer (#30107)

This commit is contained in:
Andrew Fitzgerald 2023-02-09 13:22:42 -08:00 committed by GitHub
parent e8c43aa0d1
commit 4b17acf64e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 89 additions and 85 deletions

View File

@ -10,7 +10,7 @@ use {
rayon::prelude::*, rayon::prelude::*,
solana_client::connection_cache::ConnectionCache, solana_client::connection_cache::ConnectionCache,
solana_core::{ solana_core::{
banking_stage::{BankingStage, BankingStageStats}, banking_stage::{committer::Committer, BankingStage, BankingStageStats},
banking_trace::{BankingPacketBatch, BankingTracer}, banking_trace::{BankingPacketBatch, BankingTracer},
leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
qos_service::QosService, qos_service::QosService,
@ -91,16 +91,16 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
ThreadType::Transactions, ThreadType::Transactions,
); );
let (s, _r) = unbounded(); let (s, _r) = unbounded();
let committer = Committer::new(None, s);
// This tests the performance of buffering packets. // This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor. // If the packet buffers are copied, performance will be poor.
bencher.iter(move || { bencher.iter(move || {
BankingStage::consume_buffered_packets( BankingStage::consume_buffered_packets(
&bank_start, &bank_start,
&mut transaction_buffer, &mut transaction_buffer,
&None,
&s,
None::<Box<dyn Fn()>>, None::<Box<dyn Fn()>>,
&BankingStageStats::default(), &BankingStageStats::default(),
&committer,
&recorder, &recorder,
&QosService::new(1), &QosService::new(1),
&mut LeaderSlotMetricsTracker::new(0), &mut LeaderSlotMetricsTracker::new(0),

View File

@ -443,9 +443,11 @@ impl BankingStage {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver); let mut packet_receiver = PacketReceiver::new(id, packet_receiver);
let poh_recorder = poh_recorder.clone(); let poh_recorder = poh_recorder.clone();
let transaction_status_sender = transaction_status_sender.clone();
let replay_vote_sender = replay_vote_sender.clone();
let committer = Committer::new(
transaction_status_sender.clone(),
replay_vote_sender.clone(),
);
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let forwarder = Forwarder::new( let forwarder = Forwarder::new(
poh_recorder.clone(), poh_recorder.clone(),
@ -462,10 +464,9 @@ impl BankingStage {
&mut packet_receiver, &mut packet_receiver,
&decision_maker, &decision_maker,
&forwarder, &forwarder,
&committer,
&poh_recorder, &poh_recorder,
id, id,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit, log_messages_bytes_limit,
unprocessed_transaction_storage, unprocessed_transaction_storage,
); );
@ -480,9 +481,8 @@ impl BankingStage {
fn do_process_packets( fn do_process_packets(
bank_start: &BankStart, bank_start: &BankStart,
payload: &mut ConsumeScannerPayload, payload: &mut ConsumeScannerPayload,
committer: &Committer,
recorder: &TransactionRecorder, recorder: &TransactionRecorder,
transaction_status_sender: &Option<TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
qos_service: &QosService, qos_service: &QosService,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
@ -500,10 +500,9 @@ impl BankingStage {
Self::process_packets_transactions( Self::process_packets_transactions(
&bank_start.working_bank, &bank_start.working_bank,
&bank_start.bank_creation_time, &bank_start.bank_creation_time,
committer,
recorder, recorder,
&payload.sanitized_transactions, &payload.sanitized_transactions,
transaction_status_sender,
replay_vote_sender,
banking_stage_stats, banking_stage_stats,
qos_service, qos_service,
payload.slot_metrics_tracker, payload.slot_metrics_tracker,
@ -553,14 +552,12 @@ impl BankingStage {
Some(retryable_transaction_indexes) Some(retryable_transaction_indexes)
} }
#[allow(clippy::too_many_arguments)]
pub fn consume_buffered_packets( pub fn consume_buffered_packets(
bank_start: &BankStart, bank_start: &BankStart,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
transaction_status_sender: &Option<TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
test_fn: Option<impl Fn()>, test_fn: Option<impl Fn()>,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
committer: &Committer,
recorder: &TransactionRecorder, recorder: &TransactionRecorder,
qos_service: &QosService, qos_service: &QosService,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker, slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
@ -579,9 +576,8 @@ impl BankingStage {
Self::do_process_packets( Self::do_process_packets(
bank_start, bank_start,
payload, payload,
committer,
recorder, recorder,
transaction_status_sender,
replay_vote_sender,
banking_stage_stats, banking_stage_stats,
qos_service, qos_service,
log_messages_bytes_limit, log_messages_bytes_limit,
@ -624,9 +620,8 @@ impl BankingStage {
fn process_buffered_packets( fn process_buffered_packets(
decision_maker: &DecisionMaker, decision_maker: &DecisionMaker,
forwarder: &Forwarder, forwarder: &Forwarder,
committer: &Committer,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
transaction_status_sender: &Option<TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
recorder: &TransactionRecorder, recorder: &TransactionRecorder,
qos_service: &QosService, qos_service: &QosService,
@ -652,10 +647,9 @@ impl BankingStage {
Self::consume_buffered_packets( Self::consume_buffered_packets(
&bank_start, &bank_start,
unprocessed_transaction_storage, unprocessed_transaction_storage,
transaction_status_sender,
replay_vote_sender,
None::<Box<dyn Fn()>>, None::<Box<dyn Fn()>>,
banking_stage_stats, banking_stage_stats,
committer,
recorder, recorder,
qos_service, qos_service,
slot_metrics_tracker, slot_metrics_tracker,
@ -699,10 +693,9 @@ impl BankingStage {
packet_receiver: &mut PacketReceiver, packet_receiver: &mut PacketReceiver,
decision_maker: &DecisionMaker, decision_maker: &DecisionMaker,
forwarder: &Forwarder, forwarder: &Forwarder,
committer: &Committer,
poh_recorder: &Arc<RwLock<PohRecorder>>, poh_recorder: &Arc<RwLock<PohRecorder>>,
id: u32, id: u32,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
mut unprocessed_transaction_storage: UnprocessedTransactionStorage, mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
) { ) {
@ -722,9 +715,8 @@ impl BankingStage {
Self::process_buffered_packets( Self::process_buffered_packets(
decision_maker, decision_maker,
forwarder, forwarder,
committer,
&mut unprocessed_transaction_storage, &mut unprocessed_transaction_storage,
&transaction_status_sender,
&replay_vote_sender,
&banking_stage_stats, &banking_stage_stats,
&recorder, &recorder,
&qos_service, &qos_service,
@ -765,12 +757,12 @@ impl BankingStage {
fn execute_and_commit_transactions_locked( fn execute_and_commit_transactions_locked(
bank: &Arc<Bank>, bank: &Arc<Bank>,
committer: &Committer,
poh: &TransactionRecorder, poh: &TransactionRecorder,
batch: &TransactionBatch, batch: &TransactionBatch,
transaction_status_sender: &Option<TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
) -> ExecuteAndCommitTransactionsOutput { ) -> ExecuteAndCommitTransactionsOutput {
let transaction_status_sender_enabled = committer.transaction_status_sender_enabled();
let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default(); let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
let mut pre_balance_info = PreBalanceInfo::default(); let mut pre_balance_info = PreBalanceInfo::default();
@ -778,7 +770,7 @@ impl BankingStage {
{ {
// If the extra meta-data services are enabled for RPC, collect the // If the extra meta-data services are enabled for RPC, collect the
// pre-balances for native and token programs. // pre-balances for native and token programs.
if transaction_status_sender.is_some() { if transaction_status_sender_enabled {
pre_balance_info.native = bank.collect_balances(batch); pre_balance_info.native = bank.collect_balances(batch);
pre_balance_info.token = pre_balance_info.token =
collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals) collect_token_balances(bank, batch, &mut pre_balance_info.mint_decimals)
@ -792,9 +784,9 @@ impl BankingStage {
bank.load_and_execute_transactions( bank.load_and_execute_transactions(
batch, batch,
MAX_PROCESSING_AGE, MAX_PROCESSING_AGE,
transaction_status_sender.is_some(), transaction_status_sender_enabled,
transaction_status_sender.is_some(), transaction_status_sender_enabled,
transaction_status_sender.is_some(), transaction_status_sender_enabled,
&mut execute_and_commit_timings.execute_timings, &mut execute_and_commit_timings.execute_timings,
None, // account_overrides None, // account_overrides
log_messages_bytes_limit log_messages_bytes_limit
@ -880,7 +872,7 @@ impl BankingStage {
} }
let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 { let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 {
Committer::commit_transactions( committer.commit_transactions(
batch, batch,
&mut loaded_transactions, &mut loaded_transactions,
execution_results, execution_results,
@ -888,8 +880,6 @@ impl BankingStage {
bank, bank,
&mut pre_balance_info, &mut pre_balance_info,
&mut execute_and_commit_timings, &mut execute_and_commit_timings,
transaction_status_sender,
replay_vote_sender,
signature_count, signature_count,
executed_transactions_count, executed_transactions_count,
executed_non_vote_transactions_count, executed_non_vote_transactions_count,
@ -937,10 +927,9 @@ impl BankingStage {
pub fn process_and_record_transactions( pub fn process_and_record_transactions(
bank: &Arc<Bank>, bank: &Arc<Bank>,
txs: &[SanitizedTransaction], txs: &[SanitizedTransaction],
committer: &Committer,
poh: &TransactionRecorder, poh: &TransactionRecorder,
chunk_offset: usize, chunk_offset: usize,
transaction_status_sender: &Option<TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
qos_service: &QosService, qos_service: &QosService,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
) -> ProcessTransactionBatchOutput { ) -> ProcessTransactionBatchOutput {
@ -962,10 +951,9 @@ impl BankingStage {
let mut execute_and_commit_transactions_output = let mut execute_and_commit_transactions_output =
Self::execute_and_commit_transactions_locked( Self::execute_and_commit_transactions_locked(
bank, bank,
committer,
poh, poh,
&batch, &batch,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit, log_messages_bytes_limit,
); );
@ -1036,9 +1024,8 @@ impl BankingStage {
bank: &Arc<Bank>, bank: &Arc<Bank>,
bank_creation_time: &Instant, bank_creation_time: &Instant,
transactions: &[SanitizedTransaction], transactions: &[SanitizedTransaction],
committer: &Committer,
poh: &TransactionRecorder, poh: &TransactionRecorder,
transaction_status_sender: &Option<TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
qos_service: &QosService, qos_service: &QosService,
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
) -> ProcessTransactionsSummary { ) -> ProcessTransactionsSummary {
@ -1067,10 +1054,9 @@ impl BankingStage {
let process_transaction_batch_output = Self::process_and_record_transactions( let process_transaction_batch_output = Self::process_and_record_transactions(
bank, bank,
&transactions[chunk_start..chunk_end], &transactions[chunk_start..chunk_end],
committer,
poh, poh,
chunk_start, chunk_start,
transaction_status_sender,
replay_vote_sender,
qos_service, qos_service,
log_messages_bytes_limit, log_messages_bytes_limit,
); );
@ -1206,14 +1192,12 @@ impl BankingStage {
Self::filter_valid_transaction_indexes(&results) Self::filter_valid_transaction_indexes(&results)
} }
#[allow(clippy::too_many_arguments)]
fn process_packets_transactions<'a>( fn process_packets_transactions<'a>(
bank: &'a Arc<Bank>, bank: &'a Arc<Bank>,
bank_creation_time: &Instant, bank_creation_time: &Instant,
committer: &'a Committer,
poh: &'a TransactionRecorder, poh: &'a TransactionRecorder,
sanitized_transactions: &[SanitizedTransaction], sanitized_transactions: &[SanitizedTransaction],
transaction_status_sender: &Option<TransactionStatusSender>,
replay_vote_sender: &'a ReplayVoteSender,
banking_stage_stats: &'a BankingStageStats, banking_stage_stats: &'a BankingStageStats,
qos_service: &'a QosService, qos_service: &'a QosService,
slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
@ -1225,9 +1209,8 @@ impl BankingStage {
bank, bank,
bank_creation_time, bank_creation_time,
sanitized_transactions, sanitized_transactions,
committer,
poh, poh,
transaction_status_sender,
replay_vote_sender,
qos_service, qos_service,
log_messages_bytes_limit, log_messages_bytes_limit,
), ),
@ -1910,14 +1893,14 @@ mod tests {
poh_recorder.write().unwrap().set_bank(&bank, false); poh_recorder.write().unwrap().set_bank(&bank, false);
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender);
let process_transactions_batch_output = BankingStage::process_and_record_transactions( let process_transactions_batch_output = BankingStage::process_and_record_transactions(
&bank, &bank,
&transactions, &transactions,
&committer,
&recorder, &recorder,
0, 0,
&None,
&replay_vote_sender,
&QosService::new(1), &QosService::new(1),
None, None,
); );
@ -1967,10 +1950,9 @@ mod tests {
let process_transactions_batch_output = BankingStage::process_and_record_transactions( let process_transactions_batch_output = BankingStage::process_and_record_transactions(
&bank, &bank,
&transactions, &transactions,
&committer,
&recorder, &recorder,
0, 0,
&None,
&replay_vote_sender,
&QosService::new(1), &QosService::new(1),
None, None,
); );
@ -2047,14 +2029,14 @@ mod tests {
poh_recorder.write().unwrap().set_bank(&bank, false); poh_recorder.write().unwrap().set_bank(&bank, false);
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender);
let process_transactions_batch_output = BankingStage::process_and_record_transactions( let process_transactions_batch_output = BankingStage::process_and_record_transactions(
&bank, &bank,
&transactions, &transactions,
&committer,
&recorder, &recorder,
0, 0,
&None,
&replay_vote_sender,
&QosService::new(1), &QosService::new(1),
None, None,
); );
@ -2121,7 +2103,7 @@ mod tests {
poh_recorder.write().unwrap().set_bank(&bank, false); poh_recorder.write().unwrap().set_bank(&bank, false);
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender);
let qos_service = QosService::new(1); let qos_service = QosService::new(1);
let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost(); let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost();
@ -2143,10 +2125,9 @@ mod tests {
let process_transactions_batch_output = BankingStage::process_and_record_transactions( let process_transactions_batch_output = BankingStage::process_and_record_transactions(
&bank, &bank,
&transactions, &transactions,
&committer,
&recorder, &recorder,
0, 0,
&None,
&replay_vote_sender,
&qos_service, &qos_service,
None, None,
); );
@ -2183,10 +2164,9 @@ mod tests {
let process_transactions_batch_output = BankingStage::process_and_record_transactions( let process_transactions_batch_output = BankingStage::process_and_record_transactions(
&bank, &bank,
&transactions, &transactions,
&committer,
&recorder, &recorder,
0, 0,
&None,
&replay_vote_sender,
&qos_service, &qos_service,
None, None,
); );
@ -2276,14 +2256,14 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender);
let process_transactions_batch_output = BankingStage::process_and_record_transactions( let process_transactions_batch_output = BankingStage::process_and_record_transactions(
&bank, &bank,
&transactions, &transactions,
&committer,
&recorder, &recorder,
0, 0,
&None,
&replay_vote_sender,
&QosService::new(1), &QosService::new(1),
None, None,
); );
@ -2354,14 +2334,14 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &Arc::new(RwLock::new(poh_recorder))); let poh_simulator = simulate_poh(record_receiver, &Arc::new(RwLock::new(poh_recorder)));
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender);
let process_transactions_summary = BankingStage::process_transactions( let process_transactions_summary = BankingStage::process_transactions(
&bank, &bank,
&Instant::now(), &Instant::now(),
&transactions, &transactions,
&committer,
&recorder, &recorder,
&None,
&replay_vote_sender,
&QosService::new(1), &QosService::new(1),
None, None,
); );
@ -2421,14 +2401,14 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender);
let process_transactions_summary = BankingStage::process_transactions( let process_transactions_summary = BankingStage::process_transactions(
&bank, &bank,
&Instant::now(), &Instant::now(),
&transactions, &transactions,
&committer,
&recorder, &recorder,
&None,
&replay_vote_sender,
&QosService::new(1), &QosService::new(1),
None, None,
); );
@ -2649,20 +2629,24 @@ mod tests {
); );
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
Some(TransactionStatusSender {
sender: transaction_status_sender,
}),
replay_vote_sender,
);
let _ = BankingStage::process_and_record_transactions( let _ = BankingStage::process_and_record_transactions(
&bank, &bank,
&transactions, &transactions,
&committer,
&recorder, &recorder,
0, 0,
&Some(TransactionStatusSender {
sender: transaction_status_sender,
}),
&replay_vote_sender,
&QosService::new(1), &QosService::new(1),
None, None,
); );
drop(committer); // drop/disconnect transaction_status_sender
transaction_status_service.join().unwrap(); transaction_status_service.join().unwrap();
let confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap(); let confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap();
@ -2818,20 +2802,24 @@ mod tests {
); );
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(
Some(TransactionStatusSender {
sender: transaction_status_sender,
}),
replay_vote_sender,
);
let _ = BankingStage::process_and_record_transactions( let _ = BankingStage::process_and_record_transactions(
&bank, &bank,
&[sanitized_tx.clone()], &[sanitized_tx.clone()],
&committer,
&recorder, &recorder,
0, 0,
&Some(TransactionStatusSender {
sender: transaction_status_sender,
}),
&replay_vote_sender,
&QosService::new(1), &QosService::new(1),
None, None,
); );
drop(committer); // drop/disconnect transaction_status_sender
transaction_status_service.join().unwrap(); transaction_status_service.join().unwrap();
let mut confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap(); let mut confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap();
@ -2939,6 +2927,7 @@ mod tests {
); );
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender);
// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank()); assert!(!poh_recorder.read().unwrap().has_bank());
@ -2950,10 +2939,9 @@ mod tests {
BankingStage::consume_buffered_packets( BankingStage::consume_buffered_packets(
&bank_start, &bank_start,
&mut buffered_packet_batches, &mut buffered_packet_batches,
&None,
&replay_vote_sender,
None::<Box<dyn Fn()>>, None::<Box<dyn Fn()>>,
&BankingStageStats::default(), &BankingStageStats::default(),
&committer,
&recorder, &recorder,
&QosService::new(1), &QosService::new(1),
&mut LeaderSlotMetricsTracker::new(0), &mut LeaderSlotMetricsTracker::new(0),
@ -2997,6 +2985,7 @@ mod tests {
); );
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender);
// When the working bank in poh_recorder is None, no packets should be processed // When the working bank in poh_recorder is None, no packets should be processed
assert!(!poh_recorder.read().unwrap().has_bank()); assert!(!poh_recorder.read().unwrap().has_bank());
@ -3008,10 +2997,9 @@ mod tests {
BankingStage::consume_buffered_packets( BankingStage::consume_buffered_packets(
&bank_start, &bank_start,
&mut buffered_packet_batches, &mut buffered_packet_batches,
&None,
&replay_vote_sender,
None::<Box<dyn Fn()>>, None::<Box<dyn Fn()>>,
&BankingStageStats::default(), &BankingStageStats::default(),
&committer,
&recorder, &recorder,
&QosService::new(1), &QosService::new(1),
&mut LeaderSlotMetricsTracker::new(0), &mut LeaderSlotMetricsTracker::new(0),
@ -3048,6 +3036,8 @@ mod tests {
let recorder = poh_recorder_.read().unwrap().recorder(); let recorder = poh_recorder_.read().unwrap().recorder();
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender);
// Start up thread to process the banks // Start up thread to process the banks
let t_consume = Builder::new() let t_consume = Builder::new()
.name("consume-buffered-packets".to_string()) .name("consume-buffered-packets".to_string())
@ -3070,10 +3060,9 @@ mod tests {
BankingStage::consume_buffered_packets( BankingStage::consume_buffered_packets(
&bank_start, &bank_start,
&mut buffered_packet_batches, &mut buffered_packet_batches,
&None,
&replay_vote_sender,
test_fn, test_fn,
&BankingStageStats::default(), &BankingStageStats::default(),
&committer,
&recorder, &recorder,
&QosService::new(1), &QosService::new(1),
&mut LeaderSlotMetricsTracker::new(0), &mut LeaderSlotMetricsTracker::new(0),

View File

@ -26,11 +26,29 @@ pub enum CommitTransactionDetails {
NotCommitted, NotCommitted,
} }
pub struct Committer; pub struct Committer {
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
}
impl Committer { impl Committer {
pub fn new(
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
) -> Self {
Self {
transaction_status_sender,
replay_vote_sender,
}
}
pub(super) fn transaction_status_sender_enabled(&self) -> bool {
self.transaction_status_sender.is_some()
}
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub(super) fn commit_transactions( pub(super) fn commit_transactions(
&self,
batch: &TransactionBatch, batch: &TransactionBatch,
loaded_transactions: &mut [TransactionLoadResult], loaded_transactions: &mut [TransactionLoadResult],
execution_results: Vec<TransactionExecutionResult>, execution_results: Vec<TransactionExecutionResult>,
@ -38,8 +56,6 @@ impl Committer {
bank: &Arc<Bank>, bank: &Arc<Bank>,
pre_balance_info: &mut PreBalanceInfo, pre_balance_info: &mut PreBalanceInfo,
execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings, execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
transaction_status_sender: &Option<TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
signature_count: u64, signature_count: u64,
executed_transactions_count: usize, executed_transactions_count: usize,
executed_non_vote_transactions_count: usize, executed_non_vote_transactions_count: usize,
@ -86,10 +102,9 @@ impl Committer {
bank_utils::find_and_send_votes( bank_utils::find_and_send_votes(
batch.sanitized_transactions(), batch.sanitized_transactions(),
&tx_results, &tx_results,
Some(replay_vote_sender), Some(&self.replay_vote_sender),
); );
Self::collect_balances_and_send_status_batch( self.collect_balances_and_send_status_batch(
transaction_status_sender,
tx_results, tx_results,
bank, bank,
batch, batch,
@ -102,14 +117,14 @@ impl Committer {
} }
fn collect_balances_and_send_status_batch( fn collect_balances_and_send_status_batch(
transaction_status_sender: &Option<TransactionStatusSender>, &self,
tx_results: TransactionResults, tx_results: TransactionResults,
bank: &Arc<Bank>, bank: &Arc<Bank>,
batch: &TransactionBatch, batch: &TransactionBatch,
pre_balance_info: &mut PreBalanceInfo, pre_balance_info: &mut PreBalanceInfo,
starting_transaction_index: Option<usize>, starting_transaction_index: Option<usize>,
) { ) {
if let Some(transaction_status_sender) = transaction_status_sender { if let Some(transaction_status_sender) = &self.transaction_status_sender {
let txs = batch.sanitized_transactions().to_vec(); let txs = batch.sanitized_transactions().to_vec();
let post_balances = bank.collect_balances(batch); let post_balances = bank.collect_balances(batch);
let post_token_balances = let post_token_balances =