parent
13107b4eb6
commit
5f6755f58b
|
@ -94,7 +94,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
|
|||
);
|
||||
let (s, _r) = unbounded();
|
||||
let committer = Committer::new(None, s);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
// This tests the performance of buffering packets.
|
||||
// If the packet buffers are copied, performance will be poor.
|
||||
bencher.iter(move || {
|
||||
|
|
|
@ -399,7 +399,6 @@ impl BankingStage {
|
|||
poh_recorder.read().unwrap().recorder(),
|
||||
QosService::new(id),
|
||||
log_messages_bytes_limit,
|
||||
None,
|
||||
);
|
||||
|
||||
Builder::new()
|
||||
|
|
|
@ -70,7 +70,6 @@ pub struct Consumer {
|
|||
transaction_recorder: TransactionRecorder,
|
||||
qos_service: QosService,
|
||||
log_messages_bytes_limit: Option<usize>,
|
||||
test_fn: Option<Box<dyn Fn() + Send>>,
|
||||
}
|
||||
|
||||
impl Consumer {
|
||||
|
@ -79,14 +78,12 @@ impl Consumer {
|
|||
transaction_recorder: TransactionRecorder,
|
||||
qos_service: QosService,
|
||||
log_messages_bytes_limit: Option<usize>,
|
||||
test_fn: Option<Box<dyn Fn() + Send>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
committer,
|
||||
transaction_recorder,
|
||||
qos_service,
|
||||
log_messages_bytes_limit,
|
||||
test_fn,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -198,9 +195,6 @@ impl Consumer {
|
|||
// Out of the buffered packets just retried, collect any still unprocessed
|
||||
// transactions in this batch for forwarding
|
||||
*rebuffered_packet_count += retryable_transaction_indexes.len();
|
||||
if let Some(test_fn) = &self.test_fn {
|
||||
test_fn();
|
||||
}
|
||||
|
||||
payload
|
||||
.slot_metrics_tracker
|
||||
|
@ -758,7 +752,7 @@ mod tests {
|
|||
|
||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||
let committer = Committer::new(None, replay_vote_sender);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
let process_transactions_summary =
|
||||
consumer.process_transactions(&bank, &Instant::now(), &transactions);
|
||||
|
||||
|
@ -898,7 +892,7 @@ mod tests {
|
|||
poh_recorder.write().unwrap().set_bank(&bank, false);
|
||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||
let committer = Committer::new(None, replay_vote_sender);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
|
||||
let process_transactions_batch_output =
|
||||
consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||
|
@ -1021,7 +1015,7 @@ mod tests {
|
|||
poh_recorder.write().unwrap().set_bank(&bank, false);
|
||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||
let committer = Committer::new(None, replay_vote_sender);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
|
||||
let process_transactions_batch_output =
|
||||
consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||
|
@ -1089,7 +1083,7 @@ mod tests {
|
|||
poh_recorder.write().unwrap().set_bank(&bank, false);
|
||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||
let committer = Committer::new(None, replay_vote_sender);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
|
||||
let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost();
|
||||
let get_tx_count = || bank.read_cost_tracker().unwrap().transaction_count();
|
||||
|
@ -1207,7 +1201,7 @@ mod tests {
|
|||
|
||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||
let committer = Committer::new(None, replay_vote_sender);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
|
||||
let process_transactions_batch_output =
|
||||
consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||
|
@ -1400,8 +1394,7 @@ mod tests {
|
|||
|
||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||
let committer = Committer::new(None, replay_vote_sender);
|
||||
let consumer =
|
||||
Consumer::new(committer, recorder.clone(), QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1), None);
|
||||
|
||||
let process_transactions_summary =
|
||||
consumer.process_transactions(&bank, &Instant::now(), &transactions);
|
||||
|
@ -1525,7 +1518,7 @@ mod tests {
|
|||
}),
|
||||
replay_vote_sender,
|
||||
);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
|
||||
let _ = consumer.process_and_record_transactions(&bank, &transactions, 0);
|
||||
|
||||
|
@ -1662,7 +1655,7 @@ mod tests {
|
|||
}),
|
||||
replay_vote_sender,
|
||||
);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
|
||||
let _ = consumer.process_and_record_transactions(&bank, &[sanitized_tx.clone()], 0);
|
||||
|
||||
|
@ -1720,7 +1713,7 @@ mod tests {
|
|||
|
||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||
let committer = Committer::new(None, replay_vote_sender);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
|
||||
// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
|
||||
assert!(!poh_recorder.read().unwrap().has_bank());
|
||||
|
@ -1796,7 +1789,7 @@ mod tests {
|
|||
|
||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||
let committer = Committer::new(None, replay_vote_sender);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
|
||||
// When the working bank in poh_recorder is None, no packets should be processed
|
||||
assert!(!poh_recorder.read().unwrap().has_bank());
|
||||
|
@ -1846,7 +1839,7 @@ mod tests {
|
|||
|
||||
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
|
||||
let committer = Committer::new(None, replay_vote_sender);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
|
||||
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
|
||||
|
||||
// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
|
||||
assert!(!poh_recorder.read().unwrap().has_bank());
|
||||
|
|
Loading…
Reference in New Issue