Greedy receive in banking stage (#25060)

* Greedy receive in banking stage

* add upperbound to batch size and batching time

* update test_banking_stage_entryfication test
This commit is contained in:
Pankaj Garg 2022-05-08 10:47:55 -07:00 committed by GitHub
parent 032a2b8215
commit 362b0526cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 60 additions and 23 deletions

View File

@ -86,6 +86,9 @@ const MIN_THREADS_BANKING: u32 = 1;
const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING;
const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128;
const MAX_RECEIVE_BATCH_SIZE_PER_ITERATION: usize = 50_000;
const MAX_RECEIVE_TIME_MS_PER_ITERATION: u64 = 50;
pub struct ProcessTransactionBatchOutput {
// The number of transactions filtered out by the cost model
cost_model_throttled_transactions_count: usize,
@ -1977,6 +1980,25 @@ impl BankingStage {
.collect()
}
fn receive_until(
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
recv_timeout: Duration,
batching_timeout: Duration,
batch_size_upperbound: usize,
) -> Result<Vec<PacketBatch>, RecvTimeoutError> {
let start = Instant::now();
let mut packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
while let Ok(packet_batch) = verified_receiver.try_recv() {
trace!("got more packets");
packet_batches.extend(packet_batch);
if start.elapsed() >= batching_timeout || packet_batches.len() >= batch_size_upperbound
{
break;
}
}
Ok(packet_batches)
}
#[allow(clippy::too_many_arguments)]
/// Receive incoming packets, push into unprocessed buffer with packet indexes
fn receive_and_buffer_packets(
@ -1989,7 +2011,12 @@ impl BankingStage {
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> Result<(), RecvTimeoutError> {
let mut recv_time = Measure::start("receive_and_buffer_packets_recv");
let packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
let packet_batches = Self::receive_until(
verified_receiver,
recv_timeout,
Duration::from_millis(MAX_RECEIVE_TIME_MS_PER_ITERATION),
MAX_RECEIVE_BATCH_SIZE_PER_ITERATION,
)?;
recv_time.stop();
let packet_batches_len = packet_batches.len();
@ -2442,29 +2469,7 @@ mod tests {
} = create_slow_genesis_config(2);
let (verified_sender, verified_receiver) = unbounded();
// Process a batch that includes a transaction that receives two lamports.
let alice = Keypair::new();
let tx =
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 2, genesis_config.hash());
let packet_batches = to_packet_batches(&[tx], 1);
let packet_batches = packet_batches
.into_iter()
.map(|batch| (batch, vec![1u8]))
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
verified_sender.send(packet_batches).unwrap();
// Process a second batch that uses the same from account, so conflicts with above TX
let tx =
system_transaction::transfer(&mint_keypair, &alice.pubkey(), 1, genesis_config.hash());
let packet_batches = to_packet_batches(&[tx], 1);
let packet_batches = packet_batches
.into_iter()
.map(|batch| (batch, vec![1u8]))
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
verified_sender.send(packet_batches).unwrap();
let (vote_sender, vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();
@ -2501,6 +2506,38 @@ mod tests {
Arc::new(RwLock::new(CostModel::default())),
);
// Process a batch that includes a transaction that receives two lamports.
let tx = system_transaction::transfer(
&mint_keypair,
&alice.pubkey(),
2,
genesis_config.hash(),
);
let packet_batches = to_packet_batches(&[tx], 1);
let packet_batches = packet_batches
.into_iter()
.map(|batch| (batch, vec![1u8]))
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
verified_sender.send(packet_batches).unwrap();
sleep(Duration::from_millis(200));
// Process a second batch that uses the same from account, so conflicts with above TX
let tx = system_transaction::transfer(
&mint_keypair,
&alice.pubkey(),
1,
genesis_config.hash(),
);
let packet_batches = to_packet_batches(&[tx], 1);
let packet_batches = packet_batches
.into_iter()
.map(|batch| (batch, vec![1u8]))
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
verified_sender.send(packet_batches).unwrap();
// wait for banking_stage to eat the packets
while bank.get_balance(&alice.pubkey()) < 2 {
sleep(Duration::from_millis(100));