BankingStage Refactor: Consumer State (#30288)
* BankingStage Refactor: Consumer add state * remove trailing comma
This commit is contained in:
parent
120b0c92d1
commit
bba0ed702f
|
@ -94,19 +94,15 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
|
||||||
);
|
);
|
||||||
let (s, _r) = unbounded();
|
let (s, _r) = unbounded();
|
||||||
let committer = Committer::new(None, s);
|
let committer = Committer::new(None, s);
|
||||||
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||||
// This tests the performance of buffering packets.
|
// This tests the performance of buffering packets.
|
||||||
// If the packet buffers are copied, performance will be poor.
|
// If the packet buffers are copied, performance will be poor.
|
||||||
bencher.iter(move || {
|
bencher.iter(move || {
|
||||||
Consumer::consume_buffered_packets(
|
consumer.consume_buffered_packets(
|
||||||
&bank_start,
|
&bank_start,
|
||||||
&mut transaction_buffer,
|
&mut transaction_buffer,
|
||||||
None::<Box<dyn Fn()>>,
|
|
||||||
&BankingStageStats::default(),
|
&BankingStageStats::default(),
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
&QosService::new(1),
|
|
||||||
&mut LeaderSlotMetricsTracker::new(0),
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
None,
|
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ use {
|
||||||
solana_ledger::blockstore_processor::TransactionStatusSender,
|
solana_ledger::blockstore_processor::TransactionStatusSender,
|
||||||
solana_measure::{measure, measure_us},
|
solana_measure::{measure, measure_us},
|
||||||
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
|
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
|
||||||
solana_poh::poh_recorder::{PohRecorder, TransactionRecorder},
|
solana_poh::poh_recorder::PohRecorder,
|
||||||
solana_runtime::{bank_forks::BankForks, vote_sender_types::ReplayVoteSender},
|
solana_runtime::{bank_forks::BankForks, vote_sender_types::ReplayVoteSender},
|
||||||
solana_sdk::{feature_set::allow_votes_to_directly_update_vote_state, timing::AtomicInterval},
|
solana_sdk::{feature_set::allow_votes_to_directly_update_vote_state, timing::AtomicInterval},
|
||||||
std::{
|
std::{
|
||||||
|
@ -394,6 +394,13 @@ impl BankingStage {
|
||||||
connection_cache.clone(),
|
connection_cache.clone(),
|
||||||
data_budget.clone(),
|
data_budget.clone(),
|
||||||
);
|
);
|
||||||
|
let consumer = Consumer::new(
|
||||||
|
committer,
|
||||||
|
poh_recorder.read().unwrap().recorder(),
|
||||||
|
QosService::new(id),
|
||||||
|
log_messages_bytes_limit,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name(format!("solBanknStgTx{id:02}"))
|
.name(format!("solBanknStgTx{id:02}"))
|
||||||
|
@ -402,10 +409,8 @@ impl BankingStage {
|
||||||
&mut packet_receiver,
|
&mut packet_receiver,
|
||||||
&decision_maker,
|
&decision_maker,
|
||||||
&forwarder,
|
&forwarder,
|
||||||
&committer,
|
&consumer,
|
||||||
&poh_recorder,
|
|
||||||
id,
|
id,
|
||||||
log_messages_bytes_limit,
|
|
||||||
unprocessed_transaction_storage,
|
unprocessed_transaction_storage,
|
||||||
);
|
);
|
||||||
})
|
})
|
||||||
|
@ -419,13 +424,10 @@ impl BankingStage {
|
||||||
fn process_buffered_packets(
|
fn process_buffered_packets(
|
||||||
decision_maker: &DecisionMaker,
|
decision_maker: &DecisionMaker,
|
||||||
forwarder: &Forwarder,
|
forwarder: &Forwarder,
|
||||||
committer: &Committer,
|
consumer: &Consumer,
|
||||||
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
|
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
recorder: &TransactionRecorder,
|
|
||||||
qos_service: &QosService,
|
|
||||||
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
log_messages_bytes_limit: Option<usize>,
|
|
||||||
tracer_packet_stats: &mut TracerPacketStats,
|
tracer_packet_stats: &mut TracerPacketStats,
|
||||||
) {
|
) {
|
||||||
if unprocessed_transaction_storage.should_not_process() {
|
if unprocessed_transaction_storage.should_not_process() {
|
||||||
|
@ -443,16 +445,11 @@ impl BankingStage {
|
||||||
// of the previous slot
|
// of the previous slot
|
||||||
slot_metrics_tracker.apply_action(metrics_action);
|
slot_metrics_tracker.apply_action(metrics_action);
|
||||||
let (_, consume_buffered_packets_time) = measure!(
|
let (_, consume_buffered_packets_time) = measure!(
|
||||||
Consumer::consume_buffered_packets(
|
consumer.consume_buffered_packets(
|
||||||
&bank_start,
|
&bank_start,
|
||||||
unprocessed_transaction_storage,
|
unprocessed_transaction_storage,
|
||||||
None::<Box<dyn Fn()>>,
|
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
committer,
|
|
||||||
recorder,
|
|
||||||
qos_service,
|
|
||||||
slot_metrics_tracker,
|
slot_metrics_tracker,
|
||||||
log_messages_bytes_limit
|
|
||||||
),
|
),
|
||||||
"consume_buffered_packets",
|
"consume_buffered_packets",
|
||||||
);
|
);
|
||||||
|
@ -492,16 +489,12 @@ impl BankingStage {
|
||||||
packet_receiver: &mut PacketReceiver,
|
packet_receiver: &mut PacketReceiver,
|
||||||
decision_maker: &DecisionMaker,
|
decision_maker: &DecisionMaker,
|
||||||
forwarder: &Forwarder,
|
forwarder: &Forwarder,
|
||||||
committer: &Committer,
|
consumer: &Consumer,
|
||||||
poh_recorder: &Arc<RwLock<PohRecorder>>,
|
|
||||||
id: u32,
|
id: u32,
|
||||||
log_messages_bytes_limit: Option<usize>,
|
|
||||||
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
|
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
|
||||||
) {
|
) {
|
||||||
let recorder = poh_recorder.read().unwrap().recorder();
|
|
||||||
let mut banking_stage_stats = BankingStageStats::new(id);
|
let mut banking_stage_stats = BankingStageStats::new(id);
|
||||||
let mut tracer_packet_stats = TracerPacketStats::new(id);
|
let mut tracer_packet_stats = TracerPacketStats::new(id);
|
||||||
let qos_service = QosService::new(id);
|
|
||||||
|
|
||||||
let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
|
let mut slot_metrics_tracker = LeaderSlotMetricsTracker::new(id);
|
||||||
let mut last_metrics_update = Instant::now();
|
let mut last_metrics_update = Instant::now();
|
||||||
|
@ -514,13 +507,10 @@ impl BankingStage {
|
||||||
Self::process_buffered_packets(
|
Self::process_buffered_packets(
|
||||||
decision_maker,
|
decision_maker,
|
||||||
forwarder,
|
forwarder,
|
||||||
committer,
|
consumer,
|
||||||
&mut unprocessed_transaction_storage,
|
&mut unprocessed_transaction_storage,
|
||||||
&banking_stage_stats,
|
&banking_stage_stats,
|
||||||
&recorder,
|
|
||||||
&qos_service,
|
|
||||||
&mut slot_metrics_tracker,
|
&mut slot_metrics_tracker,
|
||||||
log_messages_bytes_limit,
|
|
||||||
&mut tracer_packet_stats,
|
&mut tracer_packet_stats,
|
||||||
),
|
),
|
||||||
"process_buffered_packets",
|
"process_buffered_packets",
|
||||||
|
|
|
@ -65,19 +65,37 @@ pub struct ExecuteAndCommitTransactionsOutput {
|
||||||
error_counters: TransactionErrorMetrics,
|
error_counters: TransactionErrorMetrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Consumer;
|
pub struct Consumer {
|
||||||
|
committer: Committer,
|
||||||
|
transaction_recorder: TransactionRecorder,
|
||||||
|
qos_service: QosService,
|
||||||
|
log_messages_bytes_limit: Option<usize>,
|
||||||
|
test_fn: Option<Box<dyn Fn() + Send>>,
|
||||||
|
}
|
||||||
|
|
||||||
impl Consumer {
|
impl Consumer {
|
||||||
|
pub fn new(
|
||||||
|
committer: Committer,
|
||||||
|
transaction_recorder: TransactionRecorder,
|
||||||
|
qos_service: QosService,
|
||||||
|
log_messages_bytes_limit: Option<usize>,
|
||||||
|
test_fn: Option<Box<dyn Fn() + Send>>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
committer,
|
||||||
|
transaction_recorder,
|
||||||
|
qos_service,
|
||||||
|
log_messages_bytes_limit,
|
||||||
|
test_fn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn consume_buffered_packets(
|
pub fn consume_buffered_packets(
|
||||||
|
&self,
|
||||||
bank_start: &BankStart,
|
bank_start: &BankStart,
|
||||||
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
|
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
|
||||||
test_fn: Option<impl Fn()>,
|
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
committer: &Committer,
|
|
||||||
transaction_recorder: &TransactionRecorder,
|
|
||||||
qos_service: &QosService,
|
|
||||||
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
log_messages_bytes_limit: Option<usize>,
|
|
||||||
) {
|
) {
|
||||||
let mut rebuffered_packet_count = 0;
|
let mut rebuffered_packet_count = 0;
|
||||||
let mut consumed_buffered_packets_count = 0;
|
let mut consumed_buffered_packets_count = 0;
|
||||||
|
@ -89,17 +107,12 @@ impl Consumer {
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
slot_metrics_tracker,
|
slot_metrics_tracker,
|
||||||
|packets_to_process, payload| {
|
|packets_to_process, payload| {
|
||||||
Self::do_process_packets(
|
self.do_process_packets(
|
||||||
bank_start,
|
bank_start,
|
||||||
payload,
|
payload,
|
||||||
committer,
|
|
||||||
transaction_recorder,
|
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
qos_service,
|
|
||||||
log_messages_bytes_limit,
|
|
||||||
&mut consumed_buffered_packets_count,
|
&mut consumed_buffered_packets_count,
|
||||||
&mut rebuffered_packet_count,
|
&mut rebuffered_packet_count,
|
||||||
&test_fn,
|
|
||||||
packets_to_process,
|
packets_to_process,
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -132,18 +145,13 @@ impl Consumer {
|
||||||
.fetch_add(consumed_buffered_packets_count, Ordering::Relaxed);
|
.fetch_add(consumed_buffered_packets_count, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
|
||||||
fn do_process_packets(
|
fn do_process_packets(
|
||||||
|
&self,
|
||||||
bank_start: &BankStart,
|
bank_start: &BankStart,
|
||||||
payload: &mut ConsumeScannerPayload,
|
payload: &mut ConsumeScannerPayload,
|
||||||
committer: &Committer,
|
|
||||||
transaction_recorder: &TransactionRecorder,
|
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
qos_service: &QosService,
|
|
||||||
log_messages_bytes_limit: Option<usize>,
|
|
||||||
consumed_buffered_packets_count: &mut usize,
|
consumed_buffered_packets_count: &mut usize,
|
||||||
rebuffered_packet_count: &mut usize,
|
rebuffered_packet_count: &mut usize,
|
||||||
test_fn: &Option<impl Fn()>,
|
|
||||||
packets_to_process: &Vec<Arc<ImmutableDeserializedPacket>>,
|
packets_to_process: &Vec<Arc<ImmutableDeserializedPacket>>,
|
||||||
) -> Option<Vec<usize>> {
|
) -> Option<Vec<usize>> {
|
||||||
if payload.reached_end_of_slot {
|
if payload.reached_end_of_slot {
|
||||||
|
@ -151,17 +159,13 @@ impl Consumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
let packets_to_process_len = packets_to_process.len();
|
let packets_to_process_len = packets_to_process.len();
|
||||||
let (process_transactions_summary, process_packets_transactions_us) =
|
let (process_transactions_summary, process_packets_transactions_us) = measure_us!(self
|
||||||
measure_us!(Self::process_packets_transactions(
|
.process_packets_transactions(
|
||||||
&bank_start.working_bank,
|
&bank_start.working_bank,
|
||||||
&bank_start.bank_creation_time,
|
&bank_start.bank_creation_time,
|
||||||
committer,
|
|
||||||
transaction_recorder,
|
|
||||||
&payload.sanitized_transactions,
|
&payload.sanitized_transactions,
|
||||||
banking_stage_stats,
|
banking_stage_stats,
|
||||||
qos_service,
|
|
||||||
payload.slot_metrics_tracker,
|
payload.slot_metrics_tracker,
|
||||||
log_messages_bytes_limit
|
|
||||||
));
|
));
|
||||||
payload
|
payload
|
||||||
.slot_metrics_tracker
|
.slot_metrics_tracker
|
||||||
|
@ -194,7 +198,7 @@ impl Consumer {
|
||||||
// Out of the buffered packets just retried, collect any still unprocessed
|
// Out of the buffered packets just retried, collect any still unprocessed
|
||||||
// transactions in this batch for forwarding
|
// transactions in this batch for forwarding
|
||||||
*rebuffered_packet_count += retryable_transaction_indexes.len();
|
*rebuffered_packet_count += retryable_transaction_indexes.len();
|
||||||
if let Some(test_fn) = test_fn {
|
if let Some(test_fn) = &self.test_fn {
|
||||||
test_fn();
|
test_fn();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -206,27 +210,16 @@ impl Consumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_packets_transactions(
|
fn process_packets_transactions(
|
||||||
|
&self,
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
bank_creation_time: &Instant,
|
bank_creation_time: &Instant,
|
||||||
committer: &Committer,
|
|
||||||
transaction_recorder: &TransactionRecorder,
|
|
||||||
sanitized_transactions: &[SanitizedTransaction],
|
sanitized_transactions: &[SanitizedTransaction],
|
||||||
banking_stage_stats: &BankingStageStats,
|
banking_stage_stats: &BankingStageStats,
|
||||||
qos_service: &QosService,
|
|
||||||
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
|
||||||
log_messages_bytes_limit: Option<usize>,
|
|
||||||
) -> ProcessTransactionsSummary {
|
) -> ProcessTransactionsSummary {
|
||||||
// Process transactions
|
let (mut process_transactions_summary, process_transactions_us) = measure_us!(
|
||||||
let (mut process_transactions_summary, process_transactions_us) =
|
self.process_transactions(bank, bank_creation_time, sanitized_transactions)
|
||||||
measure_us!(Self::process_transactions(
|
);
|
||||||
bank,
|
|
||||||
bank_creation_time,
|
|
||||||
sanitized_transactions,
|
|
||||||
committer,
|
|
||||||
transaction_recorder,
|
|
||||||
qos_service,
|
|
||||||
log_messages_bytes_limit,
|
|
||||||
));
|
|
||||||
slot_metrics_tracker.increment_process_transactions_us(process_transactions_us);
|
slot_metrics_tracker.increment_process_transactions_us(process_transactions_us);
|
||||||
banking_stage_stats
|
banking_stage_stats
|
||||||
.transaction_processing_elapsed
|
.transaction_processing_elapsed
|
||||||
|
@ -277,13 +270,10 @@ impl Consumer {
|
||||||
/// Returns the number of transactions successfully processed by the bank, which may be less
|
/// Returns the number of transactions successfully processed by the bank, which may be less
|
||||||
/// than the total number if max PoH height was reached and the bank halted
|
/// than the total number if max PoH height was reached and the bank halted
|
||||||
fn process_transactions(
|
fn process_transactions(
|
||||||
|
&self,
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
bank_creation_time: &Instant,
|
bank_creation_time: &Instant,
|
||||||
transactions: &[SanitizedTransaction],
|
transactions: &[SanitizedTransaction],
|
||||||
committer: &Committer,
|
|
||||||
transaction_recorder: &TransactionRecorder,
|
|
||||||
qos_service: &QosService,
|
|
||||||
log_messages_bytes_limit: Option<usize>,
|
|
||||||
) -> ProcessTransactionsSummary {
|
) -> ProcessTransactionsSummary {
|
||||||
let mut chunk_start = 0;
|
let mut chunk_start = 0;
|
||||||
let mut all_retryable_tx_indexes = vec![];
|
let mut all_retryable_tx_indexes = vec![];
|
||||||
|
@ -307,14 +297,10 @@ impl Consumer {
|
||||||
transactions.len(),
|
transactions.len(),
|
||||||
chunk_start + MAX_NUM_TRANSACTIONS_PER_BATCH,
|
chunk_start + MAX_NUM_TRANSACTIONS_PER_BATCH,
|
||||||
);
|
);
|
||||||
let process_transaction_batch_output = Self::process_and_record_transactions(
|
let process_transaction_batch_output = self.process_and_record_transactions(
|
||||||
bank,
|
bank,
|
||||||
&transactions[chunk_start..chunk_end],
|
&transactions[chunk_start..chunk_end],
|
||||||
committer,
|
|
||||||
transaction_recorder,
|
|
||||||
chunk_start,
|
chunk_start,
|
||||||
qos_service,
|
|
||||||
log_messages_bytes_limit,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
let ProcessTransactionBatchOutput {
|
let ProcessTransactionBatchOutput {
|
||||||
|
@ -409,18 +395,17 @@ impl Consumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn process_and_record_transactions(
|
pub fn process_and_record_transactions(
|
||||||
|
&self,
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
txs: &[SanitizedTransaction],
|
txs: &[SanitizedTransaction],
|
||||||
committer: &Committer,
|
|
||||||
transaction_recorder: &TransactionRecorder,
|
|
||||||
chunk_offset: usize,
|
chunk_offset: usize,
|
||||||
qos_service: &QosService,
|
|
||||||
log_messages_bytes_limit: Option<usize>,
|
|
||||||
) -> ProcessTransactionBatchOutput {
|
) -> ProcessTransactionBatchOutput {
|
||||||
let (
|
let (
|
||||||
(transaction_costs, transactions_qos_results, cost_model_throttled_transactions_count),
|
(transaction_costs, transactions_qos_results, cost_model_throttled_transactions_count),
|
||||||
cost_model_us,
|
cost_model_us,
|
||||||
) = measure_us!(qos_service.select_and_accumulate_transaction_costs(bank, txs));
|
) = measure_us!(self
|
||||||
|
.qos_service
|
||||||
|
.select_and_accumulate_transaction_costs(bank, txs));
|
||||||
|
|
||||||
// Only lock accounts for those transactions are selected for the block;
|
// Only lock accounts for those transactions are selected for the block;
|
||||||
// Once accounts are locked, other threads cannot encode transactions that will modify the
|
// Once accounts are locked, other threads cannot encode transactions that will modify the
|
||||||
|
@ -433,13 +418,7 @@ impl Consumer {
|
||||||
// WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
|
// WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
|
||||||
// and WouldExceedMaxAccountDataCostLimit
|
// and WouldExceedMaxAccountDataCostLimit
|
||||||
let mut execute_and_commit_transactions_output =
|
let mut execute_and_commit_transactions_output =
|
||||||
Self::execute_and_commit_transactions_locked(
|
self.execute_and_commit_transactions_locked(bank, &batch);
|
||||||
bank,
|
|
||||||
committer,
|
|
||||||
transaction_recorder,
|
|
||||||
&batch,
|
|
||||||
log_messages_bytes_limit,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Once the accounts are new transactions can enter the pipeline to process them
|
// Once the accounts are new transactions can enter the pipeline to process them
|
||||||
let (_, unlock_us) = measure_us!(drop(batch));
|
let (_, unlock_us) = measure_us!(drop(batch));
|
||||||
|
@ -464,11 +443,11 @@ impl Consumer {
|
||||||
|
|
||||||
let (cu, us) =
|
let (cu, us) =
|
||||||
Self::accumulate_execute_units_and_time(&execute_and_commit_timings.execute_timings);
|
Self::accumulate_execute_units_and_time(&execute_and_commit_timings.execute_timings);
|
||||||
qos_service.accumulate_actual_execute_cu(cu);
|
self.qos_service.accumulate_actual_execute_cu(cu);
|
||||||
qos_service.accumulate_actual_execute_time(us);
|
self.qos_service.accumulate_actual_execute_time(us);
|
||||||
|
|
||||||
// reports qos service stats for this batch
|
// reports qos service stats for this batch
|
||||||
qos_service.report_metrics(bank.clone());
|
self.qos_service.report_metrics(bank.clone());
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"bank: {} lock: {}us unlock: {}us txs_len: {}",
|
"bank: {} lock: {}us unlock: {}us txs_len: {}",
|
||||||
|
@ -486,13 +465,11 @@ impl Consumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn execute_and_commit_transactions_locked(
|
fn execute_and_commit_transactions_locked(
|
||||||
|
&self,
|
||||||
bank: &Arc<Bank>,
|
bank: &Arc<Bank>,
|
||||||
committer: &Committer,
|
|
||||||
transaction_recorder: &TransactionRecorder,
|
|
||||||
batch: &TransactionBatch,
|
batch: &TransactionBatch,
|
||||||
log_messages_bytes_limit: Option<usize>,
|
|
||||||
) -> ExecuteAndCommitTransactionsOutput {
|
) -> ExecuteAndCommitTransactionsOutput {
|
||||||
let transaction_status_sender_enabled = committer.transaction_status_sender_enabled();
|
let transaction_status_sender_enabled = self.committer.transaction_status_sender_enabled();
|
||||||
let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
|
let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
|
||||||
|
|
||||||
let mut pre_balance_info = PreBalanceInfo::default();
|
let mut pre_balance_info = PreBalanceInfo::default();
|
||||||
|
@ -516,7 +493,7 @@ impl Consumer {
|
||||||
transaction_status_sender_enabled,
|
transaction_status_sender_enabled,
|
||||||
&mut execute_and_commit_timings.execute_timings,
|
&mut execute_and_commit_timings.execute_timings,
|
||||||
None, // account_overrides
|
None, // account_overrides
|
||||||
log_messages_bytes_limit
|
self.log_messages_bytes_limit
|
||||||
));
|
));
|
||||||
execute_and_commit_timings.load_execute_us = load_execute_us;
|
execute_and_commit_timings.load_execute_us = load_execute_us;
|
||||||
|
|
||||||
|
@ -556,9 +533,9 @@ impl Consumer {
|
||||||
executed_transactions_count
|
executed_transactions_count
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
let (record_transactions_summary, record_us) = measure_us!(
|
let (record_transactions_summary, record_us) = measure_us!(self
|
||||||
transaction_recorder.record_transactions(bank.slot(), executed_transactions)
|
.transaction_recorder
|
||||||
);
|
.record_transactions(bank.slot(), executed_transactions));
|
||||||
execute_and_commit_timings.record_us = record_us;
|
execute_and_commit_timings.record_us = record_us;
|
||||||
|
|
||||||
let RecordTransactionsSummary {
|
let RecordTransactionsSummary {
|
||||||
|
@ -594,7 +571,7 @@ impl Consumer {
|
||||||
}
|
}
|
||||||
|
|
||||||
let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 {
|
let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 {
|
||||||
committer.commit_transactions(
|
self.committer.commit_transactions(
|
||||||
batch,
|
batch,
|
||||||
&mut loaded_transactions,
|
&mut loaded_transactions,
|
||||||
execution_results,
|
execution_results,
|
||||||
|
@ -781,16 +758,9 @@ mod tests {
|
||||||
|
|
||||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||||
let committer = Committer::new(None, replay_vote_sender);
|
let committer = Committer::new(None, replay_vote_sender);
|
||||||
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||||
let process_transactions_summary = Consumer::process_transactions(
|
let process_transactions_summary =
|
||||||
&bank,
|
consumer.process_transactions(&bank, &Instant::now(), &transactions);
|
||||||
&Instant::now(),
|
|
||||||
&transactions,
|
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
&QosService::new(1),
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
poh_recorder
|
poh_recorder
|
||||||
.read()
|
.read()
|
||||||
|
@ -928,16 +898,10 @@ mod tests {
|
||||||
poh_recorder.write().unwrap().set_bank(&bank, false);
|
poh_recorder.write().unwrap().set_bank(&bank, false);
|
||||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||||
let committer = Committer::new(None, replay_vote_sender);
|
let committer = Committer::new(None, replay_vote_sender);
|
||||||
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||||
|
|
||||||
let process_transactions_batch_output = Consumer::process_and_record_transactions(
|
let process_transactions_batch_output =
|
||||||
&bank,
|
consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||||
&transactions,
|
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
0,
|
|
||||||
&QosService::new(1),
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
let ExecuteAndCommitTransactionsOutput {
|
let ExecuteAndCommitTransactionsOutput {
|
||||||
transactions_attempted_execution_count,
|
transactions_attempted_execution_count,
|
||||||
|
@ -981,15 +945,8 @@ mod tests {
|
||||||
genesis_config.hash(),
|
genesis_config.hash(),
|
||||||
)]);
|
)]);
|
||||||
|
|
||||||
let process_transactions_batch_output = Consumer::process_and_record_transactions(
|
let process_transactions_batch_output =
|
||||||
&bank,
|
consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||||
&transactions,
|
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
0,
|
|
||||||
&QosService::new(1),
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
let ExecuteAndCommitTransactionsOutput {
|
let ExecuteAndCommitTransactionsOutput {
|
||||||
transactions_attempted_execution_count,
|
transactions_attempted_execution_count,
|
||||||
|
@ -1064,16 +1021,10 @@ mod tests {
|
||||||
poh_recorder.write().unwrap().set_bank(&bank, false);
|
poh_recorder.write().unwrap().set_bank(&bank, false);
|
||||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||||
let committer = Committer::new(None, replay_vote_sender);
|
let committer = Committer::new(None, replay_vote_sender);
|
||||||
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||||
|
|
||||||
let process_transactions_batch_output = Consumer::process_and_record_transactions(
|
let process_transactions_batch_output =
|
||||||
&bank,
|
consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||||
&transactions,
|
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
0,
|
|
||||||
&QosService::new(1),
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
let ExecuteAndCommitTransactionsOutput {
|
let ExecuteAndCommitTransactionsOutput {
|
||||||
transactions_attempted_execution_count,
|
transactions_attempted_execution_count,
|
||||||
|
@ -1138,7 +1089,7 @@ mod tests {
|
||||||
poh_recorder.write().unwrap().set_bank(&bank, false);
|
poh_recorder.write().unwrap().set_bank(&bank, false);
|
||||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||||
let committer = Committer::new(None, replay_vote_sender);
|
let committer = Committer::new(None, replay_vote_sender);
|
||||||
let qos_service = QosService::new(1);
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||||
|
|
||||||
let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost();
|
let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost();
|
||||||
let get_tx_count = || bank.read_cost_tracker().unwrap().transaction_count();
|
let get_tx_count = || bank.read_cost_tracker().unwrap().transaction_count();
|
||||||
|
@ -1156,15 +1107,8 @@ mod tests {
|
||||||
genesis_config.hash(),
|
genesis_config.hash(),
|
||||||
)]);
|
)]);
|
||||||
|
|
||||||
let process_transactions_batch_output = Consumer::process_and_record_transactions(
|
let process_transactions_batch_output =
|
||||||
&bank,
|
consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||||
&transactions,
|
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
0,
|
|
||||||
&qos_service,
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
let ExecuteAndCommitTransactionsOutput {
|
let ExecuteAndCommitTransactionsOutput {
|
||||||
executed_with_successful_result_count,
|
executed_with_successful_result_count,
|
||||||
|
@ -1195,15 +1139,8 @@ mod tests {
|
||||||
),
|
),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
let process_transactions_batch_output = Consumer::process_and_record_transactions(
|
let process_transactions_batch_output =
|
||||||
&bank,
|
consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||||
&transactions,
|
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
0,
|
|
||||||
&qos_service,
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
let ExecuteAndCommitTransactionsOutput {
|
let ExecuteAndCommitTransactionsOutput {
|
||||||
executed_with_successful_result_count,
|
executed_with_successful_result_count,
|
||||||
|
@ -1270,16 +1207,10 @@ mod tests {
|
||||||
|
|
||||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||||
let committer = Committer::new(None, replay_vote_sender);
|
let committer = Committer::new(None, replay_vote_sender);
|
||||||
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||||
|
|
||||||
let process_transactions_batch_output = Consumer::process_and_record_transactions(
|
let process_transactions_batch_output =
|
||||||
&bank,
|
consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||||
&transactions,
|
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
0,
|
|
||||||
&QosService::new(1),
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
poh_recorder
|
poh_recorder
|
||||||
.read()
|
.read()
|
||||||
|
@ -1469,16 +1400,11 @@ mod tests {
|
||||||
|
|
||||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||||
let committer = Committer::new(None, replay_vote_sender);
|
let committer = Committer::new(None, replay_vote_sender);
|
||||||
|
let consumer =
|
||||||
|
Consumer::new(committer, recorder.clone(), QosService::new(1), None, None);
|
||||||
|
|
||||||
let process_transactions_summary = Consumer::process_transactions(
|
let process_transactions_summary =
|
||||||
&bank,
|
consumer.process_transactions(&bank, &Instant::now(), &transactions);
|
||||||
&Instant::now(),
|
|
||||||
&transactions,
|
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
&QosService::new(1),
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
let ProcessTransactionsSummary {
|
let ProcessTransactionsSummary {
|
||||||
reached_max_poh_height,
|
reached_max_poh_height,
|
||||||
|
@ -1599,18 +1525,11 @@ mod tests {
|
||||||
}),
|
}),
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
);
|
);
|
||||||
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||||
|
|
||||||
let _ = Consumer::process_and_record_transactions(
|
let _ = consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||||
&bank,
|
|
||||||
&transactions,
|
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
0,
|
|
||||||
&QosService::new(1),
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
drop(committer); // drop/disconnect transaction_status_sender
|
drop(consumer); // drop/disconnect transaction_status_sender
|
||||||
transaction_status_service.join().unwrap();
|
transaction_status_service.join().unwrap();
|
||||||
|
|
||||||
let confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap();
|
let confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap();
|
||||||
|
@ -1743,18 +1662,11 @@ mod tests {
|
||||||
}),
|
}),
|
||||||
replay_vote_sender,
|
replay_vote_sender,
|
||||||
);
|
);
|
||||||
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||||
|
|
||||||
let _ = Consumer::process_and_record_transactions(
|
let _ = consumer.process_and_record_transactions(&bank, &[sanitized_tx.clone()], 0);
|
||||||
&bank,
|
|
||||||
&[sanitized_tx.clone()],
|
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
0,
|
|
||||||
&QosService::new(1),
|
|
||||||
None,
|
|
||||||
);
|
|
||||||
|
|
||||||
drop(committer); // drop/disconnect transaction_status_sender
|
drop(consumer); // drop/disconnect transaction_status_sender
|
||||||
transaction_status_service.join().unwrap();
|
transaction_status_service.join().unwrap();
|
||||||
|
|
||||||
let mut confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap();
|
let mut confirmed_block = blockstore.get_rooted_block(bank.slot(), false).unwrap();
|
||||||
|
@ -1808,6 +1720,7 @@ mod tests {
|
||||||
|
|
||||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||||
let committer = Committer::new(None, replay_vote_sender);
|
let committer = Committer::new(None, replay_vote_sender);
|
||||||
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||||
|
|
||||||
// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
|
// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
|
||||||
assert!(!poh_recorder.read().unwrap().has_bank());
|
assert!(!poh_recorder.read().unwrap().has_bank());
|
||||||
|
@ -1816,16 +1729,11 @@ mod tests {
|
||||||
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
|
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
|
||||||
poh_recorder.write().unwrap().set_bank(&bank, false);
|
poh_recorder.write().unwrap().set_bank(&bank, false);
|
||||||
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
|
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
|
||||||
Consumer::consume_buffered_packets(
|
consumer.consume_buffered_packets(
|
||||||
&bank_start,
|
&bank_start,
|
||||||
&mut buffered_packet_batches,
|
&mut buffered_packet_batches,
|
||||||
None::<Box<dyn Fn()>>,
|
|
||||||
&BankingStageStats::default(),
|
&BankingStageStats::default(),
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
&QosService::new(1),
|
|
||||||
&mut LeaderSlotMetricsTracker::new(0),
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
None,
|
|
||||||
);
|
);
|
||||||
assert!(buffered_packet_batches.is_empty());
|
assert!(buffered_packet_batches.is_empty());
|
||||||
poh_recorder
|
poh_recorder
|
||||||
|
@ -1866,6 +1774,7 @@ mod tests {
|
||||||
|
|
||||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||||
let committer = Committer::new(None, replay_vote_sender);
|
let committer = Committer::new(None, replay_vote_sender);
|
||||||
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||||
|
|
||||||
// When the working bank in poh_recorder is None, no packets should be processed
|
// When the working bank in poh_recorder is None, no packets should be processed
|
||||||
assert!(!poh_recorder.read().unwrap().has_bank());
|
assert!(!poh_recorder.read().unwrap().has_bank());
|
||||||
|
@ -1874,16 +1783,11 @@ mod tests {
|
||||||
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
|
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
|
||||||
poh_recorder.write().unwrap().set_bank(&bank, false);
|
poh_recorder.write().unwrap().set_bank(&bank, false);
|
||||||
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
|
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
|
||||||
Consumer::consume_buffered_packets(
|
consumer.consume_buffered_packets(
|
||||||
&bank_start,
|
&bank_start,
|
||||||
&mut buffered_packet_batches,
|
&mut buffered_packet_batches,
|
||||||
None::<Box<dyn Fn()>>,
|
|
||||||
&BankingStageStats::default(),
|
&BankingStageStats::default(),
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
&QosService::new(1),
|
|
||||||
&mut LeaderSlotMetricsTracker::new(0),
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
None,
|
|
||||||
);
|
);
|
||||||
assert!(buffered_packet_batches.is_empty());
|
assert!(buffered_packet_batches.is_empty());
|
||||||
poh_recorder
|
poh_recorder
|
||||||
|
@ -1905,10 +1809,10 @@ mod tests {
|
||||||
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
|
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
|
||||||
setup_conflicting_transactions(ledger_path.path());
|
setup_conflicting_transactions(ledger_path.path());
|
||||||
|
|
||||||
let test_fn = Some(move || {
|
let test_fn: Option<Box<dyn Fn() + Send>> = Some(Box::new(move || {
|
||||||
finished_packet_sender.send(()).unwrap();
|
finished_packet_sender.send(()).unwrap();
|
||||||
continue_receiver.recv().unwrap();
|
continue_receiver.recv().unwrap();
|
||||||
});
|
}));
|
||||||
// When the poh recorder has a bank, it should process all buffered packets.
|
// When the poh recorder has a bank, it should process all buffered packets.
|
||||||
let num_conflicting_transactions = transactions.len();
|
let num_conflicting_transactions = transactions.len();
|
||||||
poh_recorder.write().unwrap().set_bank(&bank, false);
|
poh_recorder.write().unwrap().set_bank(&bank, false);
|
||||||
|
@ -1917,6 +1821,7 @@ mod tests {
|
||||||
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
|
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
|
||||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||||
let committer = Committer::new(None, replay_vote_sender);
|
let committer = Committer::new(None, replay_vote_sender);
|
||||||
|
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, test_fn);
|
||||||
|
|
||||||
// Start up thread to process the banks
|
// Start up thread to process the banks
|
||||||
let t_consume = Builder::new()
|
let t_consume = Builder::new()
|
||||||
|
@ -1937,16 +1842,11 @@ mod tests {
|
||||||
),
|
),
|
||||||
ThreadType::Transactions,
|
ThreadType::Transactions,
|
||||||
);
|
);
|
||||||
Consumer::consume_buffered_packets(
|
consumer.consume_buffered_packets(
|
||||||
&bank_start,
|
&bank_start,
|
||||||
&mut buffered_packet_batches,
|
&mut buffered_packet_batches,
|
||||||
test_fn,
|
|
||||||
&BankingStageStats::default(),
|
&BankingStageStats::default(),
|
||||||
&committer,
|
|
||||||
&recorder,
|
|
||||||
&QosService::new(1),
|
|
||||||
&mut LeaderSlotMetricsTracker::new(0),
|
&mut LeaderSlotMetricsTracker::new(0),
|
||||||
None,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// Check everything is correct. All valid packets should be processed.
|
// Check everything is correct. All valid packets should be processed.
|
||||||
|
|
Loading…
Reference in New Issue