From 870ac80b7948f22dca0eceea0061b1f74c64f56b Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 4 May 2022 21:50:56 -0500 Subject: [PATCH] Prioritize BankingStage packets individually in min-max heap (#24187) --- Cargo.lock | 7 + core/Cargo.toml | 4 + core/benches/banking_stage.rs | 20 +- core/benches/unprocessed_packet_batches.rs | 176 ++++ core/src/banking_stage.rs | 889 +++++++++--------- core/src/leader_slot_banking_stage_metrics.rs | 43 + ...eader_slot_banking_stage_timing_metrics.rs | 10 + core/src/unprocessed_packet_batches.rs | 681 +++++++++----- programs/bpf/Cargo.lock | 7 + sdk/src/packet.rs | 4 +- 10 files changed, 1111 insertions(+), 730 deletions(-) create mode 100644 core/benches/unprocessed_packet_batches.rs diff --git a/Cargo.lock b/Cargo.lock index d63ffbbf4..87dcd547a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2402,6 +2402,12 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "min-max-heap" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2687e6cf9c00f48e9284cf9fd15f2ef341d03cc7743abf9df4c5f07fdee50b18" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -4615,6 +4621,7 @@ dependencies = [ "log", "lru", "matches", + "min-max-heap", "rand 0.7.3", "rand_chacha 0.2.2", "raptorq", diff --git a/core/Cargo.toml b/core/Cargo.toml index f9bdf15cb..14cb7bba5 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -27,6 +27,7 @@ histogram = "0.6.9" itertools = "0.10.3" log = "0.4.17" lru = "0.7.5" +min-max-heap = "1.3.0" rand = "0.7.0" rand_chacha = "0.2.2" rayon = "1.5.2" @@ -96,5 +97,8 @@ name = "sigverify_stage" [[bench]] name = "retransmit_stage" +[[bench]] +name = "unprocessed_packet_batches" + [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index da4a9a899..a665451c3 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -76,18 +76,11 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let recorder = poh_recorder.lock().unwrap().recorder(); let tx = test_tx(); - let len = 4096; - let chunk_size = 1024; - let batches = to_packet_batches(&vec![tx; len], chunk_size); - let mut packet_batches = UnprocessedPacketBatches::new(); - for batch in batches { - let batch_len = batch.packets.len(); - packet_batches.push_back(DeserializedPacketBatch::new( - batch, - vec![0usize; batch_len], - false, - )); - } + let transactions = vec![tx; 4194304]; + let batches = transactions_to_deserialized_packets(&transactions).unwrap(); + let batches_len = batches.len(); + let mut transaction_buffer = + UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len); let (s, _r) = unbounded(); // This tests the performance of buffering packets. // If the packet buffers are copied, performance will be poor. @@ -96,7 +89,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &my_pubkey, std::u128::MAX, &poh_recorder, - &mut packet_batches, + &mut transaction_buffer, None, &s, None::>, @@ -104,6 +97,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &recorder, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), &mut LeaderSlotMetricsTracker::new(0), + 10, ); }); diff --git a/core/benches/unprocessed_packet_batches.rs b/core/benches/unprocessed_packet_batches.rs new file mode 100644 index 000000000..e490db9ce --- /dev/null +++ b/core/benches/unprocessed_packet_batches.rs @@ -0,0 +1,176 @@ +#![allow(clippy::integer_arithmetic)] +#![feature(test)] + +extern crate test; + +use { + rand::distributions::{Distribution, Uniform}, + solana_core::unprocessed_packet_batches::*, + solana_measure::measure::Measure, + solana_perf::packet::{Packet, PacketBatch}, + solana_sdk::{hash::Hash, signature::Keypair, system_transaction}, + test::Bencher, +}; + +fn build_packet_batch(packet_per_batch_count: usize) -> (PacketBatch, Vec) { + let packet_batch = PacketBatch::new( + (0..packet_per_batch_count) + .map(|sender_stake| { + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut packet = Packet::from_data(None, &tx).unwrap(); + packet.meta.sender_stake = sender_stake as u64; + packet + }) + .collect(), + ); + let packet_indexes: Vec = (0..packet_per_batch_count).collect(); + + (packet_batch, packet_indexes) +} + +fn build_randomized_packet_batch(packet_per_batch_count: usize) -> (PacketBatch, Vec) { + let mut rng = rand::thread_rng(); + let distribution = Uniform::from(0..200_000); + + let packet_batch = PacketBatch::new( + (0..packet_per_batch_count) + .map(|_| { + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let mut packet = Packet::from_data(None, &tx).unwrap(); + let sender_stake = distribution.sample(&mut rng); + packet.meta.sender_stake = sender_stake as u64; + packet + }) + .collect(), + ); + let packet_indexes: Vec = (0..packet_per_batch_count).collect(); + + (packet_batch, packet_indexes) +} + +fn insert_packet_batches( + buffer_max_size: usize, + batch_count: usize, + packet_per_batch_count: usize, + randomize: bool, +) { + solana_logger::setup(); + let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(buffer_max_size); + + let mut timer = Measure::start("insert_batch"); + (0..batch_count).for_each(|_| { + let (packet_batch, packet_indexes) = if randomize { + build_randomized_packet_batch(packet_per_batch_count) + } else { + build_packet_batch(packet_per_batch_count) + }; + let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes, None); + unprocessed_packet_batches.insert_batch(deserialized_packets); + }); + timer.stop(); + log::info!( + "inserted {} batch, elapsed {}", + buffer_max_size, + timer.as_us() + ); +} + +#[bench] +#[allow(clippy::unit_arg)] +fn bench_packet_clone(bencher: &mut Bencher) { + let batch_count = 1000; + let packet_per_batch_count = 128; + + let packet_batches: Vec = (0..batch_count) + .map(|_| build_packet_batch(packet_per_batch_count).0) + .collect(); + + bencher.iter(|| { + test::black_box(packet_batches.iter().for_each(|packet_batch| { + let mut outer_packet = Packet::default(); + + let mut timer = Measure::start("insert_batch"); + packet_batch.packets.iter().for_each(|packet| { + let mut packet = packet.clone(); + packet.meta.sender_stake *= 2; + if packet.meta.sender_stake > 2 { + outer_packet = packet; + } + }); + + timer.stop(); + })); + }); +} + +//* +// v1, bench: 5,600,038,163 ns/iter (+/- 940,818,988) +// v2, bench: 5,265,382,750 ns/iter (+/- 153,623,264) +#[bench] +#[ignore] +fn bench_unprocessed_packet_batches_within_limit(bencher: &mut Bencher) { + let buffer_capacity = 1_000 * 128; + let batch_count = 1_000; + let packet_per_batch_count = 128; + + bencher.iter(|| { + insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, false); + }); +} + +// v1, bench: 6,607,014,940 ns/iter (+/- 768,191,361) +// v2, bench: 5,692,753,323 ns/iter (+/- 548,959,624) +#[bench] +#[ignore] +fn bench_unprocessed_packet_batches_beyond_limit(bencher: &mut Bencher) { + let buffer_capacity = 1_000 * 128; + let batch_count = 1_100; + let packet_per_batch_count = 128; + + // this is the worst scenario testing: all batches are uniformly populated with packets from + // priority 100..228, so in order to drop a batch, algo will have to drop all packets that has + // priority < 228, plus one 228. That's 2000 batch * 127 packets + 1 + // Also, since all batches have same stake distribution, the new one is always the one got + // dropped. Tho it does not change algo complexity. + bencher.iter(|| { + insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, false); + }); +} +// */ +// v1, bench: 5,843,307,086 ns/iter (+/- 844,249,298) +// v2, bench: 5,139,525,951 ns/iter (+/- 48,005,521) +#[bench] +#[ignore] +fn bench_unprocessed_packet_batches_randomized_within_limit(bencher: &mut Bencher) { + let buffer_capacity = 1_000 * 128; + let batch_count = 1_000; + let packet_per_batch_count = 128; + + bencher.iter(|| { + insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true); + }); +} + +// v1, bench: 6,497,623,849 ns/iter (+/- 3,206,382,212) +// v2, bench: 5,762,071,682 ns/iter (+/- 168,244,418) +#[bench] +#[ignore] +fn bench_unprocessed_packet_batches_randomized_beyond_limit(bencher: &mut Bencher) { + let buffer_capacity = 1_000 * 128; + let batch_count = 1_100; + let packet_per_batch_count = 128; + + bencher.iter(|| { + insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true); + }); +} diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 8656f1ece..da85044b8 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -8,12 +8,12 @@ use { LeaderExecuteAndCommitTimings, RecordTransactionsTimings, }, qos_service::QosService, - unprocessed_packet_batches::*, + unprocessed_packet_batches::{self, *}, }, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, histogram::Histogram, itertools::Itertools, - retain_mut::RetainMut, + min_max_heap::MinMaxHeap, solana_client::connection_cache::send_wire_transaction_batch, solana_entry::entry::hash_transactions, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, @@ -60,6 +60,7 @@ use { collections::HashMap, env, net::SocketAddr, + rc::Rc, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, RwLock, @@ -83,6 +84,7 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING; +const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128; pub struct ProcessTransactionBatchOutput { // The number of transactions filtered out by the cost model @@ -129,7 +131,6 @@ pub struct BankingStageStats { last_report: AtomicInterval, id: u32, receive_and_buffer_packets_count: AtomicUsize, - dropped_packet_batches_count: AtomicUsize, dropped_packets_count: AtomicUsize, pub(crate) dropped_duplicated_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize, @@ -166,7 +167,6 @@ impl BankingStageStats { 0 == self .receive_and_buffer_packets_count .load(Ordering::Relaxed) as u64 - + self.dropped_packet_batches_count.load(Ordering::Relaxed) as u64 + self.dropped_packets_count.load(Ordering::Relaxed) as u64 + self .dropped_duplicated_packets_count @@ -211,11 +211,6 @@ impl BankingStageStats { .swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "dropped_packet_batches_count", - self.dropped_packet_batches_count.swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "dropped_packets_count", self.dropped_packets_count.swap(0, Ordering::Relaxed) as i64, @@ -421,8 +416,8 @@ impl BankingStage { // This thread talks to poh_service and broadcasts the entries once they have been recorded. // Once an entry has been recorded, its blockhash is registered with the bank. let data_budget = Arc::new(DataBudget::default()); - let batch_limit = TOTAL_BUFFERED_PACKETS - / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize * PACKETS_PER_BATCH); + let batch_limit = + TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { @@ -470,15 +465,15 @@ impl BankingStage { } fn filter_valid_packets_for_forwarding<'a>( - packet_batches: impl Iterator, + deserialized_packets: impl Iterator, ) -> Vec<&'a Packet> { - packet_batches - .filter(|deserialized_packet_batch| !deserialized_packet_batch.forwarded) - .flat_map(|deserialized_packet_batch| { - deserialized_packet_batch - .unprocessed_packets - .iter() - .map(|(index, _)| &deserialized_packet_batch.packet_batch.packets[*index]) + deserialized_packets + .filter_map(|deserialized_packet| { + if !deserialized_packet.forwarded { + Some(deserialized_packet.immutable_section().original_packet()) + } else { + None + } }) .collect() } @@ -553,57 +548,28 @@ impl BankingStage { recorder: &TransactionRecorder, qos_service: &QosService, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, + num_packets_to_process_per_iteration: usize, ) { let mut rebuffered_packet_count = 0; let mut consumed_buffered_packets_count = 0; - let buffered_packet_batches_len = buffered_packet_batches.len(); + let buffered_packets_len = buffered_packet_batches.len(); let mut proc_start = Measure::start("consume_buffered_process"); let mut reached_end_of_slot: Option = None; - RetainMut::retain_mut(buffered_packet_batches, |deserialized_packet_batch| { - let original_unprocessed_indexes = deserialized_packet_batch - .unprocessed_packets - .keys() - .cloned() - .collect::>(); - if let Some(end_of_slot) = &reached_end_of_slot { - // We've hit the end of this slot, no need to perform more processing, - // just filter the remaining packets for the invalid (e.g. too old) ones - // if the working_bank is available - let mut end_of_slot_filtering_time = Measure::start("end_of_slot_filtering"); - let should_retain = if let Some(bank) = &end_of_slot.working_bank { - let new_unprocessed_indexes = Self::filter_unprocessed_packets_at_end_of_slot( - bank, - &deserialized_packet_batch.unprocessed_packets, - my_pubkey, - end_of_slot.next_slot_leader, - banking_stage_stats, - ); + let mut retryable_packets = MinMaxHeap::with_capacity(buffered_packet_batches.capacity()); + std::mem::swap( + &mut buffered_packet_batches.packet_priority_queue, + &mut retryable_packets, + ); - let end_of_slot_filtered_invalid_count = original_unprocessed_indexes - .len() - .saturating_sub(new_unprocessed_indexes.len()); - - slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( - end_of_slot_filtered_invalid_count as u64, - ); - - banking_stage_stats - .end_of_slot_filtered_invalid_count - .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); - - deserialized_packet_batch.update_buffered_packets_with_new_unprocessed( - &original_unprocessed_indexes, - &new_unprocessed_indexes, - ) - } else { - true - }; - end_of_slot_filtering_time.stop(); - slot_metrics_tracker - .increment_end_of_slot_filtering_us(end_of_slot_filtering_time.as_us()); - should_retain - } else { + let mut retryable_packets: MinMaxHeap> = retryable_packets + .drain_desc() + .chunks(num_packets_to_process_per_iteration) + .into_iter() + .flat_map(|packets_to_process| { + let packets_to_process = packets_to_process.into_iter().collect_vec(); + // TODO: Right now we iterate through buffer and try the highest weighted transaction once + // but we should retry the highest weighted transactions more often. let (bank_start, poh_recorder_lock_time) = Measure::this( |_| poh_recorder.lock().unwrap().bank_start(), (), @@ -613,6 +579,7 @@ impl BankingStage { poh_recorder_lock_time.as_us(), ); + let packets_to_process_len = packets_to_process.len(); if let Some(BankStart { working_bank, bank_creation_time, @@ -625,7 +592,7 @@ impl BankingStage { &working_bank, &bank_creation_time, recorder, - &deserialized_packet_batch.unprocessed_packets, + packets_to_process.iter().map(|p| &**p), transaction_status_sender.clone(), gossip_vote_sender, banking_stage_stats, @@ -679,22 +646,44 @@ impl BankingStage { // duplicate signature, etc.) // // Note: This assumes that every packet deserializes into one transaction! - consumed_buffered_packets_count += original_unprocessed_indexes - .len() - .saturating_sub(retryable_transaction_indexes.len()); + consumed_buffered_packets_count += + packets_to_process_len.saturating_sub(retryable_transaction_indexes.len()); // Out of the buffered packets just retried, collect any still unprocessed // transactions in this batch for forwarding rebuffered_packet_count += retryable_transaction_indexes.len(); - let has_more_unprocessed_transactions = deserialized_packet_batch - .update_buffered_packets_with_new_unprocessed( - &original_unprocessed_indexes, - &retryable_transaction_indexes, - ); if let Some(test_fn) = &test_fn { test_fn(); } - has_more_unprocessed_transactions + + slot_metrics_tracker.increment_retryable_packets_count( + retryable_transaction_indexes.len() as u64, + ); + + let result = retryable_transaction_indexes + .iter() + .map(|i| packets_to_process[*i].clone()) + .collect_vec(); + + // Remove the non-retryable packets, packets that were either: + // 1) Successfully processed + // 2) Failed but not retryable + Self::filter_processed_packets( + retryable_transaction_indexes + .iter() + .chain(std::iter::once(&packets_to_process.len())), + |start, end| { + for processed_packet in &packets_to_process[start..end] { + buffered_packet_batches + .message_hash_to_transaction + .remove(processed_packet.message_hash()); + } + }, + ); + + result + } else if reached_end_of_slot.is_some() { + packets_to_process } else { // mark as end-of-slot to avoid aggressively lock poh for the remaining for // packet batches in buffer @@ -715,24 +704,70 @@ impl BankingStage { poh_recorder_lock_time.as_us(), ); - // `original_unprocessed_indexes` must have remaining packets to process - // if not yet processed. - assert!(!original_unprocessed_indexes.is_empty()); - true + packets_to_process } - } - }); - proc_start.stop(); + }) + .collect(); + std::mem::swap( + &mut retryable_packets, + &mut buffered_packet_batches.packet_priority_queue, + ); + + if let Some(end_of_slot) = &reached_end_of_slot { + slot_metrics_tracker + .set_end_of_slot_unprocessed_buffer_len(buffered_packet_batches.len() as u64); + + // We've hit the end of this slot, no need to perform more processing, + // just filter the remaining packets for the invalid (e.g. too old) ones + // if the working_bank is available + let mut end_of_slot_filtering_time = Measure::start("end_of_slot_filtering"); + // TODO: This doesn't have to be done at the end of every slot, can instead + // hold multiple unbuffered queues without merging them + + // TODO: update this here to filter the rest of the packets remaining + // TODO: this needs to be done even if there is no end_of_slot.working_bank + // to put retryable packets back in buffer + let end_of_slot_filtered_invalid_count = + Self::filter_unprocessed_packets_at_end_of_slot( + &end_of_slot.working_bank, + buffered_packet_batches, + my_pubkey, + end_of_slot.next_slot_leader, + banking_stage_stats, + ); + + inc_new_counter_info!( + "banking_stage-dropped_tx_before_forwarding", + end_of_slot_filtered_invalid_count + ); + slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( + end_of_slot_filtered_invalid_count as u64, + ); + banking_stage_stats + .end_of_slot_filtered_invalid_count + .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); + + end_of_slot_filtering_time.stop(); + slot_metrics_tracker + .increment_end_of_slot_filtering_us(end_of_slot_filtering_time.as_us()); + } + + proc_start.stop(); debug!( "@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}", timestamp(), - buffered_packet_batches_len, + buffered_packets_len, proc_start.as_ms(), consumed_buffered_packets_count, (consumed_buffered_packets_count as f32) / (proc_start.as_s()) ); + // Assert unprocessed queue is still consistent + assert_eq!( + buffered_packet_batches.packet_priority_queue.len(), + buffered_packet_batches.message_hash_to_transaction.len() + ); banking_stage_stats .consume_buffered_packets_elapsed .fetch_add(proc_start.as_us(), Ordering::Relaxed); @@ -843,6 +878,7 @@ impl BankingStage { recorder, qos_service, slot_metrics_tracker, + UNPROCESSED_BUFFER_STEP_SIZE, ) }, (), @@ -940,11 +976,8 @@ impl BankingStage { } if hold { - buffered_packet_batches.retain(|deserialized_packet_batch| { - !deserialized_packet_batch.unprocessed_packets.is_empty() - }); - for deserialized_packet_batch in buffered_packet_batches.iter_mut() { - deserialized_packet_batch.forwarded = true; + for deserialized_packet in buffered_packet_batches.iter_mut() { + deserialized_packet.forwarded = true; } } else { slot_metrics_tracker @@ -1039,7 +1072,6 @@ impl BankingStage { recv_start, recv_timeout, id, - batch_limit, &mut buffered_packet_batches, &mut banking_stage_stats, &mut slot_metrics_tracker, @@ -1695,30 +1727,25 @@ impl BankingStage { // messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned // with their packet indexes. #[allow(clippy::needless_collect)] - fn transactions_from_packets( - deserialized_packet_batch: &HashMap, + fn transaction_from_deserialized_packet( + deserialized_packet: &ImmutableDeserializedPacket, feature_set: &Arc, votes_only: bool, address_loader: impl AddressLoader, - ) -> (Vec, Vec) { - deserialized_packet_batch - .iter() - .filter_map(|(&tx_index, deserialized_packet)| { - if votes_only && !deserialized_packet.is_simple_vote { - return None; - } + ) -> Option { + if votes_only && !deserialized_packet.is_simple_vote() { + return None; + } - let tx = SanitizedTransaction::try_create( - deserialized_packet.versioned_transaction.clone(), - deserialized_packet.message_hash, - Some(deserialized_packet.is_simple_vote), - address_loader.clone(), - ) - .ok()?; - tx.verify_precompiles(feature_set).ok()?; - Some((tx, tx_index)) - }) - .unzip() + let tx = SanitizedTransaction::try_create( + deserialized_packet.versioned_transaction().clone(), + *deserialized_packet.message_hash(), + Some(deserialized_packet.is_simple_vote()), + address_loader, + ) + .ok()?; + tx.verify_precompiles(feature_set).ok()?; + Some(tx) } /// This function filters pending packets that are still valid @@ -1760,31 +1787,60 @@ impl BankingStage { Self::filter_valid_transaction_indexes(&results, transaction_to_packet_indexes) } + fn filter_processed_packets<'a, F>( + retryable_transaction_indexes: impl Iterator, + mut f: F, + ) where + F: FnMut(usize, usize), + { + let mut prev_retryable_index = 0; + for (i, retryable_index) in retryable_transaction_indexes.enumerate() { + let start = if i == 0 { 0 } else { prev_retryable_index + 1 }; + + let end = *retryable_index; + prev_retryable_index = *retryable_index; + + if start < end { + f(start, end) + } + } + } + #[allow(clippy::too_many_arguments)] - fn process_packets_transactions( - bank: &Arc, + fn process_packets_transactions<'a>( + bank: &'a Arc, bank_creation_time: &Instant, - poh: &TransactionRecorder, - deserialized_packet_batch: &HashMap, + poh: &'a TransactionRecorder, + deserialized_packets: impl Iterator, transaction_status_sender: Option, - gossip_vote_sender: &ReplayVoteSender, - banking_stage_stats: &BankingStageStats, - qos_service: &QosService, - slot_metrics_tracker: &mut LeaderSlotMetricsTracker, + gossip_vote_sender: &'a ReplayVoteSender, + banking_stage_stats: &'a BankingStageStats, + qos_service: &'a QosService, + slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, ) -> ProcessTransactionsSummary { // Convert packets to transactions - let ((transactions, transaction_to_packet_indexes), packet_conversion_time) = Measure::this( + let ((transactions, transaction_to_packet_indexes), packet_conversion_time): ( + (Vec, Vec), + _, + ) = Measure::this( |_| { - Self::transactions_from_packets( - deserialized_packet_batch, - &bank.feature_set, - bank.vote_only_bank(), - bank.as_ref(), - ) + deserialized_packets + .enumerate() + .filter_map(|(i, deserialized_packet)| { + Self::transaction_from_deserialized_packet( + deserialized_packet, + &bank.feature_set, + bank.vote_only_bank(), + bank.as_ref(), + ) + .map(|transaction| (transaction, i)) + }) + .unzip() }, (), "packet_conversion", ); + let packet_conversion_us = packet_conversion_time.as_us(); slot_metrics_tracker.increment_transactions_from_packets_us(packet_conversion_us); banking_stage_stats @@ -1826,7 +1882,7 @@ impl BankingStage { let retryable_tx_count = retryable_transaction_indexes.len(); inc_new_counter_info!("banking_stage-unprocessed_transactions", retryable_tx_count); - // Filter out transactions that can't be retried + // Filter out the retryable transactions that are too old let (filtered_retryable_transaction_indexes, filter_retryable_packets_time) = Measure::this( |_| { Self::filter_pending_packets_from_pending_txs( @@ -1864,57 +1920,52 @@ impl BankingStage { process_transactions_summary } + // Returns the number of packets that were filtered out for + // no longer being valid (could be too old, a duplicate of something + // already processed, etc.) fn filter_unprocessed_packets_at_end_of_slot( - bank: &Arc, - deserialized_packet_batch: &HashMap, + bank: &Option>, + unprocessed_packets: &mut UnprocessedPacketBatches, my_pubkey: &Pubkey, next_leader: Option, banking_stage_stats: &BankingStageStats, - ) -> Vec { + ) -> usize { // Check if we are the next leader. If so, let's not filter the packets // as we'll filter it again while processing the packets. // Filtering helps if we were going to forward the packets to some other node - if let Some(leader) = next_leader { - if leader == *my_pubkey { - return deserialized_packet_batch - .keys() - .cloned() - .collect::>(); - } + let will_still_be_leader = next_leader + .map(|next_leader| next_leader == *my_pubkey) + .unwrap_or(false); + let should_filter_unprocessed_packets = !will_still_be_leader && bank.is_some(); + let original_unprocessed_packets_len = unprocessed_packets.len(); + + if should_filter_unprocessed_packets { + // If `should_filter_unprocessed_packets` is true, then the bank + // must be `Some` + let bank = bank.as_ref().unwrap(); + let mut unprocessed_packet_conversion_time = + Measure::start("unprocessed_packet_conversion"); + + let should_retain = |deserialized_packet: &mut DeserializedPacket| { + Self::transaction_from_deserialized_packet( + deserialized_packet.immutable_section(), + &bank.feature_set, + bank.vote_only_bank(), + bank.as_ref(), + ) + .is_some() + }; + unprocessed_packets.retain(should_retain); + unprocessed_packet_conversion_time.stop(); + banking_stage_stats + .unprocessed_packet_conversion_elapsed + .fetch_add( + unprocessed_packet_conversion_time.as_us(), + Ordering::Relaxed, + ); } - let mut unprocessed_packet_conversion_time = - Measure::start("unprocessed_packet_conversion"); - let (transactions, transaction_to_packet_indexes) = Self::transactions_from_packets( - deserialized_packet_batch, - &bank.feature_set, - bank.vote_only_bank(), - bank.as_ref(), - ); - unprocessed_packet_conversion_time.stop(); - - let unprocessed_tx_indexes = (0..transactions.len()).collect_vec(); - let filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( - bank, - &transactions, - &transaction_to_packet_indexes, - &unprocessed_tx_indexes, - ); - - inc_new_counter_info!( - "banking_stage-dropped_tx_before_forwarding", - unprocessed_tx_indexes - .len() - .saturating_sub(filtered_unprocessed_packet_indexes.len()) - ); - banking_stage_stats - .unprocessed_packet_conversion_elapsed - .fetch_add( - unprocessed_packet_conversion_time.as_us(), - Ordering::Relaxed, - ); - - filtered_unprocessed_packet_indexes + original_unprocessed_packets_len.saturating_sub(unprocessed_packets.len()) } fn generate_packet_indexes(vers: &PinnedVec) -> Vec { @@ -1932,7 +1983,6 @@ impl BankingStage { recv_start: &mut Instant, recv_timeout: Duration, id: u32, - batch_limit: usize, buffered_packet_batches: &mut UnprocessedPacketBatches, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, @@ -1954,7 +2004,6 @@ impl BankingStage { let packet_batch_iter = packet_batches.into_iter(); let mut dropped_packets_count = 0; - let mut dropped_packet_batches_count = 0; let mut newly_buffered_packets_count = 0; for packet_batch in packet_batch_iter { let packet_indexes = Self::generate_packet_indexes(&packet_batch.packets); @@ -1969,12 +2018,10 @@ impl BankingStage { Self::push_unprocessed( buffered_packet_batches, - packet_batch, - packet_indexes, - &mut dropped_packet_batches_count, + &packet_batch, + &packet_indexes, &mut dropped_packets_count, &mut newly_buffered_packets_count, - batch_limit, banking_stage_stats, slot_metrics_tracker, ) @@ -1995,9 +2042,6 @@ impl BankingStage { banking_stage_stats .receive_and_buffer_packets_count .fetch_add(packet_count, Ordering::Relaxed); - banking_stage_stats - .dropped_packet_batches_count - .fetch_add(dropped_packet_batches_count, Ordering::Relaxed); banking_stage_stats .dropped_packets_count .fetch_add(dropped_packets_count, Ordering::Relaxed); @@ -2007,40 +2051,23 @@ impl BankingStage { banking_stage_stats .current_buffered_packet_batches_count .swap(buffered_packet_batches.len(), Ordering::Relaxed); - banking_stage_stats.current_buffered_packets_count.swap( - buffered_packet_batches - .iter() - .map(|deserialized_packet_batch| { - deserialized_packet_batch.unprocessed_packets.len() - }) - .sum(), - Ordering::Relaxed, - ); + banking_stage_stats + .current_buffered_packets_count + .swap(buffered_packet_batches.len(), Ordering::Relaxed); *recv_start = Instant::now(); Ok(()) } fn push_unprocessed( unprocessed_packet_batches: &mut UnprocessedPacketBatches, - packet_batch: PacketBatch, - packet_indexes: Vec, - dropped_packet_batches_count: &mut usize, + packet_batch: &PacketBatch, + packet_indexes: &[usize], dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, - batch_limit: usize, banking_stage_stats: &mut BankingStageStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, ) { if !packet_indexes.is_empty() { - if unprocessed_packet_batches.len() >= batch_limit { - *dropped_packet_batches_count += 1; - if let Some(dropped_batch) = unprocessed_packet_batches.pop_front() { - *dropped_packets_count += dropped_batch.unprocessed_packets.len(); - slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count( - dropped_batch.unprocessed_packets.len() as u64, - ); - } - } let _ = banking_stage_stats .batch_packet_indexes_len .increment(packet_indexes.len() as u64); @@ -2049,11 +2076,15 @@ impl BankingStage { slot_metrics_tracker .increment_newly_buffered_packets_count(packet_indexes.len() as u64); - unprocessed_packet_batches.push_back(DeserializedPacketBatch::new( - packet_batch, - packet_indexes, - false, - )); + let number_of_dropped_packets = unprocessed_packet_batches.insert_batch( + // Passing `None` for bank for now will make all packet weights 0 + unprocessed_packet_batches::deserialize_packets(packet_batch, packet_indexes, None), + ); + + saturating_add_assign!(*dropped_packets_count, number_of_dropped_packets); + slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count( + number_of_dropped_packets as u64, + ); } } @@ -2148,11 +2179,12 @@ mod tests { solana_vote_program::vote_transaction, std::{ borrow::Cow, - net::SocketAddr, + collections::HashSet, path::Path, sync::atomic::{AtomicBool, Ordering}, thread::sleep, }, + unprocessed_packet_batches::DeserializedPacket, }; fn new_test_cluster_info(contact_info: ContactInfo) -> ClusterInfo { @@ -3162,54 +3194,36 @@ mod tests { #[test] fn test_filter_valid_packets() { solana_logger::setup(); - - let mut packet_batches = (0..16) + let mut packets: Vec = (0..256) .map(|packets_id| { - let packet_batch = PacketBatch::new( - (0..32) - .map(|packet_id| { - // packets are deserialized upon receiving, failed packets will not be - // forwarded; Therefore we need to create real packets here. - let keypair = Keypair::new(); - let pubkey = solana_sdk::pubkey::new_rand(); - let blockhash = Hash::new_unique(); - let transaction = - system_transaction::transfer(&keypair, &pubkey, 1, blockhash); - let mut p = Packet::from_data(None, &transaction).unwrap(); - p.meta.port = packets_id << 8 | packet_id; - p - }) - .collect_vec(), - ); - let valid_indexes = (0..32) - .filter_map(|x| if x % 2 != 0 { Some(x as usize) } else { None }) - .collect_vec(); - DeserializedPacketBatch::new(packet_batch, valid_indexes, false) + // packets are deserialized upon receiving, failed packets will not be + // forwarded; Therefore we need to create real packets here. + let keypair = Keypair::new(); + let pubkey = solana_sdk::pubkey::new_rand(); + let blockhash = Hash::new_unique(); + let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash); + let mut p = Packet::from_data(None, &transaction).unwrap(); + p.meta.port = packets_id; + DeserializedPacket::new(p, None).unwrap() }) .collect_vec(); - let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter()); - + let result = BankingStage::filter_valid_packets_for_forwarding(packets.iter()); assert_eq!(result.len(), 256); // packets in a batch are forwarded in arbitrary order; verify the ports match after // sorting - let expected_ports: Vec<_> = (0..16) - .flat_map(|packets_id| { - (0..16).map(move |packet_id| { - let packet_id = packet_id * 2 + 1; - (packets_id << 8 | packet_id) as u16 - }) - }) - .collect(); - + let expected_ports: Vec<_> = (0..256).collect(); let mut forwarded_ports: Vec<_> = result.into_iter().map(|p| p.meta.port).collect(); forwarded_ports.sort_unstable(); assert_eq!(expected_ports, forwarded_ports); - packet_batches[0].forwarded = true; - let result = BankingStage::filter_valid_packets_for_forwarding(packet_batches.iter()); - assert_eq!(result.len(), 240); + let num_already_forwarded = 16; + for packet in &mut packets[0..num_already_forwarded] { + packet.forwarded = true; + } + let result = BankingStage::filter_valid_packets_for_forwarding(packets.iter()); + assert_eq!(result.len(), packets.len() - num_already_forwarded); } #[test] @@ -3806,21 +3820,15 @@ mod tests { setup_conflicting_transactions(ledger_path.path()); let recorder = poh_recorder.lock().unwrap().recorder(); let num_conflicting_transactions = transactions.len(); - let mut packet_batches = to_packet_batches(&transactions, num_conflicting_transactions); - assert_eq!(packet_batches.len(), 1); - assert_eq!( - packet_batches[0].packets.len(), - num_conflicting_transactions - ); - let packet_batch = packet_batches.pop().unwrap(); + let deserialized_packets = + unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions) + .unwrap(); + assert_eq!(deserialized_packets.len(), num_conflicting_transactions); let mut buffered_packet_batches: UnprocessedPacketBatches = - vec![DeserializedPacketBatch::new( - packet_batch, - (0..num_conflicting_transactions).into_iter().collect(), - false, - )] - .into_iter() - .collect(); + UnprocessedPacketBatches::from_iter( + deserialized_packets.into_iter(), + num_conflicting_transactions, + ); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); @@ -3839,13 +3847,12 @@ mod tests { &recorder, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), &mut LeaderSlotMetricsTracker::new(0), + num_conflicting_transactions, ); - assert_eq!( - buffered_packet_batches[0].unprocessed_packets.len(), - num_conflicting_transactions - ); + assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); // When the poh recorder has a bank, should process all non conflicting buffered packets. // Processes one packet per iteration of the loop + let num_packets_to_process_per_iteration = num_conflicting_transactions; for num_expected_unprocessed in (0..num_conflicting_transactions).rev() { poh_recorder.lock().unwrap().set_bank(&bank); BankingStage::consume_buffered_packets( @@ -3860,14 +3867,12 @@ mod tests { &recorder, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), &mut LeaderSlotMetricsTracker::new(0), + num_packets_to_process_per_iteration, ); if num_expected_unprocessed == 0 { assert!(buffered_packet_batches.is_empty()) } else { - assert_eq!( - buffered_packet_batches[0].unprocessed_packets.len(), - num_expected_unprocessed - ); + assert_eq!(buffered_packet_batches.len(), num_expected_unprocessed); } } poh_recorder @@ -3884,22 +3889,10 @@ mod tests { fn test_consume_buffered_packets_interrupted() { let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = - setup_conflicting_transactions(ledger_path.path()); - let num_conflicting_transactions = transactions.len(); - let packet_batches = to_packet_batches(&transactions, 1); - assert_eq!(packet_batches.len(), num_conflicting_transactions); - for single_packet_batch in &packet_batches { - assert_eq!(single_packet_batch.packets.len(), 1); - } - let mut buffered_packet_batches: UnprocessedPacketBatches = packet_batches - .clone() - .into_iter() - .map(|single_packets| DeserializedPacketBatch::new(single_packets, vec![0], false)) - .collect(); - let (continue_sender, continue_receiver) = unbounded(); let (finished_packet_sender, finished_packet_receiver) = unbounded(); + let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = + setup_conflicting_transactions(ledger_path.path()); let test_fn = Some(move || { finished_packet_sender.send(()).unwrap(); @@ -3918,6 +3911,23 @@ mod tests { let t_consume = Builder::new() .name("consume-buffered-packets".to_string()) .spawn(move || { + let num_conflicting_transactions = transactions.len(); + let deserialized_packets = + unprocessed_packet_batches::transactions_to_deserialized_packets( + &transactions, + ) + .unwrap(); + assert_eq!(deserialized_packets.len(), num_conflicting_transactions); + let num_packets_to_process_per_iteration = 1; + let mut buffered_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter( + deserialized_packets.clone().into_iter(), + num_conflicting_transactions, + ); + let all_packet_message_hashes: HashSet = buffered_packet_batches + .iter() + .map(|packet| *packet.immutable_section().message_hash()) + .collect(); BankingStage::consume_buffered_packets( &Pubkey::default(), std::u128::MAX, @@ -3930,22 +3940,18 @@ mod tests { &recorder, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), &mut LeaderSlotMetricsTracker::new(0), + num_packets_to_process_per_iteration, ); // Check everything is correct. All indexes after `interrupted_iteration` // should still be unprocessed assert_eq!( buffered_packet_batches.len(), - packet_batches[interrupted_iteration + 1..].len() + deserialized_packets[interrupted_iteration + 1..].len() ); - for (deserialized_packet_batch, original_packet) in buffered_packet_batches - .iter() - .zip(&packet_batches[interrupted_iteration + 1..]) - { - assert_eq!( - deserialized_packet_batch.packet_batch.packets[0], - original_packet.packets[0] - ); + for packet in buffered_packet_batches.iter() { + assert!(all_packet_message_hashes + .contains(packet.immutable_section().message_hash())); } }) .unwrap(); @@ -3983,7 +3989,7 @@ mod tests { Hash::new_unique(), ); let packet = Packet::from_data(None, &tx).unwrap(); - let single_packet_batch = PacketBatch::new(vec![packet]); + let deserialized_packet = DeserializedPacket::new(packet, None).unwrap(); let genesis_config_info = create_slow_genesis_config(10_000); let GenesisConfigInfo { @@ -4020,13 +4026,10 @@ mod tests { for (name, data_budget, expected_num_forwarded) in test_cases { let mut unprocessed_packet_batches: UnprocessedPacketBatches = - vec![DeserializedPacketBatch::new( - single_packet_batch.clone(), - vec![0], - false, - )] - .into_iter() - .collect(); + UnprocessedPacketBatches::from_iter( + vec![deserialized_packet.clone()].into_iter(), + 1, + ); BankingStage::handle_forwarding( &ForwardOption::ForwardTransaction, &cluster_info, @@ -4065,24 +4068,21 @@ mod tests { let transaction = system_transaction::transfer(&keypair, &pubkey, 1, fwd_block_hash); let mut packet = Packet::from_data(None, &transaction).unwrap(); packet.meta.flags |= PacketFlags::FORWARDED; - packet + DeserializedPacket::new(packet, None).unwrap() }; let normal_block_hash = Hash::new_unique(); let normal_packet = { let transaction = system_transaction::transfer(&keypair, &pubkey, 1, normal_block_hash); - Packet::from_data(None, &transaction).unwrap() + let packet = Packet::from_data(None, &transaction).unwrap(); + DeserializedPacket::new(packet, None).unwrap() }; - let packet_batch = PacketBatch::new(vec![forwarded_packet, normal_packet]); let mut unprocessed_packet_batches: UnprocessedPacketBatches = - vec![DeserializedPacketBatch::new( - packet_batch, - vec![0, 1], - false, - )] - .into_iter() - .collect(); + UnprocessedPacketBatches::from_iter( + vec![forwarded_packet, normal_packet].into_iter(), + 2, + ); let genesis_config_info = create_slow_genesis_config(10_000); let GenesisConfigInfo { @@ -4166,10 +4166,7 @@ mod tests { ); } - let num_unprocessed_packets: usize = unprocessed_packet_batches - .iter() - .map(|b| b.packet_batch.packets.len()) - .sum(); + let num_unprocessed_packets: usize = unprocessed_packet_batches.len(); assert_eq!( num_unprocessed_packets, expected_num_unprocessed, "{}", @@ -4183,125 +4180,28 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } - #[test] - fn test_push_unprocessed_batch_limit() { - solana_logger::setup(); - // Create `PacketBatch` with 2 unprocessed packets - let tx = system_transaction::transfer( - &Keypair::new(), - &solana_sdk::pubkey::new_rand(), - 1, - Hash::new_unique(), - ); - let packet = Packet::from_data(None, &tx).unwrap(); - let new_packet_batch = PacketBatch::new(vec![packet; 2]); - let mut unprocessed_packets: UnprocessedPacketBatches = vec![DeserializedPacketBatch::new( - new_packet_batch, - vec![0, 1], - false, - )] - .into_iter() - .collect(); - assert_eq!(unprocessed_packets.len(), 1); - assert_eq!(unprocessed_packets[0].unprocessed_packets.len(), 2); - - // Set the limit to 2 - let batch_limit = 2; - // Create new unprocessed packets and add to a batch - let new_packet_batch = PacketBatch::new(vec![Packet::default()]); - let packet_indexes = vec![]; - - let mut dropped_packet_batches_count = 0; - let mut dropped_packets_count = 0; - let mut newly_buffered_packets_count = 0; - let mut banking_stage_stats = BankingStageStats::default(); - // Because the set of unprocessed `packet_indexes` is empty, the - // packets are not added to the unprocessed queue - BankingStage::push_unprocessed( - &mut unprocessed_packets, - new_packet_batch.clone(), - packet_indexes, - &mut dropped_packet_batches_count, - &mut dropped_packets_count, - &mut newly_buffered_packets_count, - batch_limit, - &mut banking_stage_stats, - &mut LeaderSlotMetricsTracker::new(0), - ); - assert_eq!(unprocessed_packets.len(), 1); - assert_eq!(dropped_packet_batches_count, 0); - assert_eq!(dropped_packets_count, 0); - assert_eq!(newly_buffered_packets_count, 0); - - // Because the set of unprocessed `packet_indexes` is non-empty, the - // packets are added to the unprocessed queue - let packet_indexes = vec![0]; - BankingStage::push_unprocessed( - &mut unprocessed_packets, - new_packet_batch, - packet_indexes.clone(), - &mut dropped_packet_batches_count, - &mut dropped_packets_count, - &mut newly_buffered_packets_count, - batch_limit, - &mut banking_stage_stats, - &mut LeaderSlotMetricsTracker::new(0), - ); - assert_eq!(unprocessed_packets.len(), 2); - assert_eq!(dropped_packet_batches_count, 0); - assert_eq!(dropped_packets_count, 0); - assert_eq!(newly_buffered_packets_count, 1); - - // Because we've reached the batch limit, old unprocessed packets are - // dropped and the new one is appended to the end - let new_packet_batch = PacketBatch::new(vec![Packet::from_data( - Some(&SocketAddr::from(([127, 0, 0, 1], 8001))), - 42, - ) - .unwrap()]); - assert_eq!(unprocessed_packets.len(), batch_limit); - BankingStage::push_unprocessed( - &mut unprocessed_packets, - new_packet_batch.clone(), - packet_indexes, - &mut dropped_packet_batches_count, - &mut dropped_packets_count, - &mut newly_buffered_packets_count, - batch_limit, - &mut banking_stage_stats, - &mut LeaderSlotMetricsTracker::new(0), - ); - assert_eq!(unprocessed_packets.len(), 2); - assert_eq!( - unprocessed_packets[1].packet_batch.packets[0], - new_packet_batch.packets[0] - ); - assert_eq!(dropped_packet_batches_count, 1); - assert_eq!(dropped_packets_count, 2); - assert_eq!(newly_buffered_packets_count, 2); - } - #[cfg(test)] fn make_test_packets( transactions: Vec, vote_indexes: Vec, - ) -> DeserializedPacketBatch { + ) -> Vec { let capacity = transactions.len(); - let mut packet_batch = PacketBatch::with_capacity(capacity); - let mut packet_indexes = Vec::with_capacity(capacity); - packet_batch.packets.resize(capacity, Packet::default()); - for (index, tx) in transactions.iter().enumerate() { - Packet::populate_packet(&mut packet_batch.packets[index], None, tx).ok(); - packet_indexes.push(index); + let mut packet_vector = Vec::with_capacity(capacity); + for tx in transactions.iter() { + packet_vector.push(Packet::from_data(None, &tx).unwrap()); } for index in vote_indexes.iter() { - packet_batch.packets[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX; + packet_vector[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX; } - DeserializedPacketBatch::new(packet_batch, packet_indexes, false) + + packet_vector + .into_iter() + .map(|p| DeserializedPacket::new(p, None).unwrap()) + .collect() } #[test] - fn test_transactions_from_packets() { + fn test_transaction_from_deserialized_packet() { use solana_sdk::feature_set::FeatureSet; let keypair = Keypair::new(); let transfer_tx = @@ -4315,94 +4215,96 @@ mod tests { &keypair, None, ); - let sorted = |mut v: Vec| { - v.sort_unstable(); - v - }; // packets with no votes { let vote_indexes = vec![]; - let deserialized_packet_batch = + let packet_vector = make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); let mut votes_only = false; - let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &deserialized_packet_batch.unprocessed_packets, - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ); - assert_eq!(2, txs.len()); - assert_eq!(vec![0, 1], sorted(tx_packet_index)); + let txs = packet_vector.iter().filter_map(|tx| { + BankingStage::transaction_from_deserialized_packet( + tx.immutable_section(), + &Arc::new(FeatureSet::default()), + votes_only, + SimpleAddressLoader::Disabled, + ) + }); + assert_eq!(2, txs.count()); votes_only = true; - let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &deserialized_packet_batch.unprocessed_packets, - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ); - assert_eq!(0, txs.len()); - assert_eq!(0, tx_packet_index.len()); + let txs = packet_vector.iter().filter_map(|tx| { + BankingStage::transaction_from_deserialized_packet( + tx.immutable_section(), + &Arc::new(FeatureSet::default()), + votes_only, + SimpleAddressLoader::Disabled, + ) + }); + assert_eq!(0, txs.count()); } // packets with some votes { let vote_indexes = vec![0, 2]; - let deserialized_packet_batch = make_test_packets( + let packet_vector = make_test_packets( vec![vote_tx.clone(), transfer_tx, vote_tx.clone()], vote_indexes, ); let mut votes_only = false; - let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &deserialized_packet_batch.unprocessed_packets, - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ); - assert_eq!(3, txs.len()); - assert_eq!(vec![0, 1, 2], sorted(tx_packet_index)); + let txs = packet_vector.iter().filter_map(|tx| { + BankingStage::transaction_from_deserialized_packet( + tx.immutable_section(), + &Arc::new(FeatureSet::default()), + votes_only, + SimpleAddressLoader::Disabled, + ) + }); + assert_eq!(3, txs.count()); votes_only = true; - let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &deserialized_packet_batch.unprocessed_packets, - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ); - assert_eq!(2, txs.len()); - assert_eq!(vec![0, 2], sorted(tx_packet_index)); + let txs = packet_vector.iter().filter_map(|tx| { + BankingStage::transaction_from_deserialized_packet( + tx.immutable_section(), + &Arc::new(FeatureSet::default()), + votes_only, + SimpleAddressLoader::Disabled, + ) + }); + assert_eq!(2, txs.count()); } // packets with all votes { let vote_indexes = vec![0, 1, 2]; - let deserialized_packet_batch = make_test_packets( + let packet_vector = make_test_packets( vec![vote_tx.clone(), vote_tx.clone(), vote_tx], vote_indexes, ); let mut votes_only = false; - let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &deserialized_packet_batch.unprocessed_packets, - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ); - assert_eq!(3, txs.len()); - assert_eq!(vec![0, 1, 2], sorted(tx_packet_index)); + let txs = packet_vector.iter().filter_map(|tx| { + BankingStage::transaction_from_deserialized_packet( + tx.immutable_section(), + &Arc::new(FeatureSet::default()), + votes_only, + SimpleAddressLoader::Disabled, + ) + }); + assert_eq!(3, txs.count()); votes_only = true; - let (txs, tx_packet_index) = BankingStage::transactions_from_packets( - &deserialized_packet_batch.unprocessed_packets, - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ); - assert_eq!(3, txs.len()); - assert_eq!(vec![0, 1, 2], sorted(tx_packet_index)); + let txs = packet_vector.iter().filter_map(|tx| { + BankingStage::transaction_from_deserialized_packet( + tx.immutable_section(), + &Arc::new(FeatureSet::default()), + votes_only, + SimpleAddressLoader::Disabled, + ) + }); + assert_eq!(3, txs.count()); } } @@ -4492,4 +4394,55 @@ mod tests { assert_eq!(expected_units, units); assert_eq!(expected_us, us); } + + #[test] + fn test_filter_processed_packets() { + let retryable_indexes = [0, 1, 2, 3]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + BankingStage::filter_processed_packets(retryable_indexes.iter(), f); + assert!(non_retryable_indexes.is_empty()); + + let retryable_indexes = [0, 1, 2, 3, 5]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + BankingStage::filter_processed_packets(retryable_indexes.iter(), f); + assert_eq!(non_retryable_indexes, vec![(4, 5)]); + + let retryable_indexes = [1, 2, 3]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + BankingStage::filter_processed_packets(retryable_indexes.iter(), f); + assert_eq!(non_retryable_indexes, vec![(0, 1)]); + + let retryable_indexes = [1, 2, 3, 5]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + BankingStage::filter_processed_packets(retryable_indexes.iter(), f); + assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5)]); + + let retryable_indexes = [1, 2, 3, 5, 8]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + BankingStage::filter_processed_packets(retryable_indexes.iter(), f); + assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); + + let retryable_indexes = [1, 2, 3, 5, 8, 8]; + let mut non_retryable_indexes = vec![]; + let f = |start, end| { + non_retryable_indexes.push((start, end)); + }; + BankingStage::filter_processed_packets(retryable_indexes.iter(), f); + assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); + } } diff --git a/core/src/leader_slot_banking_stage_metrics.rs b/core/src/leader_slot_banking_stage_metrics.rs index 17768aa86..ed11cfb11 100644 --- a/core/src/leader_slot_banking_stage_metrics.rs +++ b/core/src/leader_slot_banking_stage_metrics.rs @@ -88,6 +88,13 @@ struct LeaderSlotPacketCountMetrics { // queue becaus they were retryable errors retryable_errored_transaction_count: u64, + // The size of the unprocessed buffer at the end of the slot + end_of_slot_unprocessed_buffer_len: u64, + + // total number of transactions that were rebuffered into the queue after not being + // executed on a previous pass + retryable_packets_count: u64, + // total number of transactions that attempted execution due to some fatal error (too old, duplicate signature, etc.) // AND were dropped from the buffered queue nonretryable_errored_transactions_count: u64, @@ -174,6 +181,11 @@ impl LeaderSlotPacketCountMetrics { self.retryable_errored_transaction_count as i64, i64 ), + ( + "retryable_packets_count", + self.retryable_packets_count as i64, + i64 + ), ( "nonretryable_errored_transactions_count", self.nonretryable_errored_transactions_count as i64, @@ -214,6 +226,11 @@ impl LeaderSlotPacketCountMetrics { self.end_of_slot_filtered_invalid_count as i64, i64 ), + ( + "end_of_slot_unprocessed_buffer_len", + self.end_of_slot_unprocessed_buffer_len as i64, + i64 + ), ); } } @@ -524,6 +541,17 @@ impl LeaderSlotMetricsTracker { } } + pub(crate) fn increment_retryable_packets_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .retryable_packets_count, + count + ); + } + } + pub(crate) fn increment_end_of_slot_filtered_invalid_count(&mut self, count: u64) { if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { saturating_add_assign!( @@ -535,6 +563,14 @@ impl LeaderSlotMetricsTracker { } } + pub(crate) fn set_end_of_slot_unprocessed_buffer_len(&mut self, len: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + leader_slot_metrics + .packet_count_metrics + .end_of_slot_unprocessed_buffer_len = len; + } + } + // Outermost banking thread's loop timing metrics pub(crate) fn increment_process_buffered_packets_us(&mut self, us: u64) { if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { @@ -569,6 +605,13 @@ impl LeaderSlotMetricsTracker { .receive_and_buffer_packets_us, us ); + saturating_add_assign!( + leader_slot_metrics + .timing_metrics + .outer_loop_timings + .receive_and_buffer_packets_invoked_count, + 1 + ); } } diff --git a/core/src/leader_slot_banking_stage_timing_metrics.rs b/core/src/leader_slot_banking_stage_timing_metrics.rs index 157e925b2..5c337e1f4 100644 --- a/core/src/leader_slot_banking_stage_timing_metrics.rs +++ b/core/src/leader_slot_banking_stage_timing_metrics.rs @@ -134,6 +134,10 @@ pub(crate) struct OuterLoopTimings { // Time spent processing new incoming packets to the banking thread pub receive_and_buffer_packets_us: u64, + + // The number of times the function to receive and buffer new packets + // was called + pub receive_and_buffer_packets_invoked_count: u64, } impl OuterLoopTimings { @@ -144,6 +148,7 @@ impl OuterLoopTimings { process_buffered_packets_us: 0, slot_metrics_check_slot_boundary_us: 0, receive_and_buffer_packets_us: 0, + receive_and_buffer_packets_invoked_count: 0, } } @@ -179,6 +184,11 @@ impl OuterLoopTimings { self.receive_and_buffer_packets_us, i64 ), + ( + "receive_and_buffer_packets_invoked_count", + self.receive_and_buffer_packets_invoked_count, + i64 + ) ); } } diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 56aae6b44..36c1eba70 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -1,207 +1,388 @@ use { - retain_mut::RetainMut, + min_max_heap::MinMaxHeap, solana_perf::packet::{limited_deserialize, Packet, PacketBatch}, + solana_runtime::bank::Bank, solana_sdk::{ - hash::Hash, message::Message, short_vec::decode_shortu16_len, signature::Signature, - transaction::VersionedTransaction, + hash::Hash, + message::{Message, VersionedMessage}, + short_vec::decode_shortu16_len, + signature::Signature, + transaction::{Transaction, VersionedTransaction}, }, std::{ - collections::{HashMap, VecDeque}, + cmp::Ordering, + collections::{hash_map::Entry, HashMap}, mem::size_of, + rc::Rc, + sync::Arc, }, + thiserror::Error, }; +#[derive(Debug, Error)] +pub enum DeserializedPacketError { + #[error("ShortVec Failed to Deserialize")] + // short_vec::decode_shortu16_len() currently returns () on error + ShortVecError(()), + #[error("Deserialization Error: {0}")] + DeserializationError(#[from] bincode::Error), + #[error("overflowed on signature size {0}")] + SignatureOverflowed(usize), +} + +#[derive(Debug, Default, PartialEq, Eq)] +pub struct ImmutableDeserializedPacket { + original_packet: Packet, + versioned_transaction: VersionedTransaction, + message_hash: Hash, + is_simple_vote: bool, + fee_per_cu: u64, +} + +impl ImmutableDeserializedPacket { + pub fn original_packet(&self) -> &Packet { + &self.original_packet + } + + pub fn versioned_transaction(&self) -> &VersionedTransaction { + &self.versioned_transaction + } + + pub fn sender_stake(&self) -> u64 { + self.original_packet.meta.sender_stake + } + + pub fn message_hash(&self) -> &Hash { + &self.message_hash + } + + pub fn is_simple_vote(&self) -> bool { + self.is_simple_vote + } + + pub fn fee_per_cu(&self) -> u64 { + self.fee_per_cu + } +} + /// Holds deserialized messages, as well as computed message_hash and other things needed to create /// SanitizedTransaction -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone, PartialEq, Eq)] pub struct DeserializedPacket { - pub versioned_transaction: VersionedTransaction, - pub message_hash: Hash, - pub is_simple_vote: bool, -} - -/// Defines the type of entry in `UnprocessedPacketBatches`, it holds original packet_batch -/// for forwarding, as well as `forwarded` flag; -/// Each packet in packet_batch are deserialized upon receiving, the result are stored in -/// `DeserializedPacket` in the same order as packets in `packet_batch`. -#[derive(Debug, Default)] -pub struct DeserializedPacketBatch { - pub packet_batch: PacketBatch, + immutable_section: Rc, pub forwarded: bool, - // indexes of valid packets in batch, and their corresponding deserialized_packet - pub unprocessed_packets: HashMap, } -/// References to a packet in `UnprocessedPacketBatches`, where -/// - batch_index references to `DeserializedPacketBatch`, -/// - packet_index references to `packet` within `DeserializedPacketBatch.packet_batch` -#[derive(Debug, Default)] -pub struct PacketLocator { - #[allow(dead_code)] - batch_index: usize, - #[allow(dead_code)] - packet_index: usize, +impl DeserializedPacket { + pub fn new(packet: Packet, bank: Option<&Arc>) -> Result { + Self::new_internal(packet, bank, None) + } + + #[cfg(test)] + fn new_with_fee_per_cu( + packet: Packet, + fee_per_cu: u64, + ) -> Result { + Self::new_internal(packet, None, Some(fee_per_cu)) + } + + pub fn new_internal( + packet: Packet, + bank: Option<&Arc>, + fee_per_cu: Option, + ) -> Result { + let versioned_transaction: VersionedTransaction = + limited_deserialize(&packet.data[0..packet.meta.size])?; + let message_bytes = packet_message(&packet)?; + let message_hash = Message::hash_raw_message(message_bytes); + let is_simple_vote = packet.meta.is_simple_vote_tx(); + + let fee_per_cu = fee_per_cu.unwrap_or_else(|| { + bank.as_ref() + .map(|bank| compute_fee_per_cu(&versioned_transaction.message, bank)) + .unwrap_or(0) + }); + Ok(Self { + immutable_section: Rc::new(ImmutableDeserializedPacket { + original_packet: packet, + versioned_transaction, + message_hash, + is_simple_vote, + fee_per_cu, + }), + forwarded: false, + }) + } + + pub fn immutable_section(&self) -> &Rc { + &self.immutable_section + } +} + +impl PartialOrd for DeserializedPacket { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for DeserializedPacket { + fn cmp(&self, other: &Self) -> Ordering { + match self + .immutable_section() + .fee_per_cu() + .cmp(&other.immutable_section().fee_per_cu()) + { + Ordering::Equal => self + .immutable_section() + .sender_stake() + .cmp(&other.immutable_section().sender_stake()), + ordering => ordering, + } + } +} + +impl PartialOrd for ImmutableDeserializedPacket { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ImmutableDeserializedPacket { + fn cmp(&self, other: &Self) -> Ordering { + match self.fee_per_cu().cmp(&other.fee_per_cu()) { + Ordering::Equal => self.sender_stake().cmp(&other.sender_stake()), + ordering => ordering, + } + } } /// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store /// PacketBatch's received from sigverify. Banking thread continuously scans the buffer /// to pick proper packets to add to the block. #[derive(Default)] -pub struct UnprocessedPacketBatches(VecDeque); - -impl std::ops::Deref for UnprocessedPacketBatches { - type Target = VecDeque; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl std::ops::DerefMut for UnprocessedPacketBatches { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -impl RetainMut for UnprocessedPacketBatches { - fn retain_mut(&mut self, f: F) - where - F: FnMut(&mut DeserializedPacketBatch) -> bool, - { - RetainMut::retain_mut(&mut self.0, f); - } -} - -impl FromIterator for UnprocessedPacketBatches { - fn from_iter>(iter: I) -> Self { - Self(iter.into_iter().collect()) - } +pub struct UnprocessedPacketBatches { + pub packet_priority_queue: MinMaxHeap>, + pub message_hash_to_transaction: HashMap, + batch_limit: usize, } impl UnprocessedPacketBatches { - pub fn new() -> Self { - Self::default() + pub fn from_iter>(iter: I, capacity: usize) -> Self { + let mut unprocessed_packet_batches = Self::with_capacity(capacity); + for deserialized_packet in iter.into_iter() { + unprocessed_packet_batches.push(deserialized_packet); + } + + unprocessed_packet_batches } pub fn with_capacity(capacity: usize) -> Self { - UnprocessedPacketBatches(VecDeque::with_capacity(capacity)) - } - - /// Returns total number of all packets (including unprocessed and processed) in buffer - #[allow(dead_code)] - fn get_packets_count(&self) -> usize { - self.iter() - .map(|deserialized_packet_batch| deserialized_packet_batch.packet_batch.packets.len()) - .sum() - } - - /// Returns total number of unprocessed packets in buffer - #[allow(dead_code)] - fn get_unprocessed_packets_count(&self) -> usize { - self.iter() - .map(|deserialized_packet_batch| deserialized_packet_batch.unprocessed_packets.len()) - .sum() - } - - /// Iterates the inner `Vec`. - /// Returns the flattened result of mapping each - /// `DeserializedPacketBatch` to a list the batch's inner - /// packets' sender's stake and their `PacketLocator`'s within the - /// `Vec`. - #[allow(dead_code)] - fn get_stakes_and_locators(&self) -> (Vec, Vec) { - self.iter() - .enumerate() - .flat_map(|(batch_index, deserialized_packet_batch)| { - let packet_batch = &deserialized_packet_batch.packet_batch; - deserialized_packet_batch - .unprocessed_packets - .keys() - .map(move |packet_index| { - let p = &packet_batch.packets[*packet_index]; - ( - p.meta.sender_stake, - PacketLocator { - batch_index, - packet_index: *packet_index, - }, - ) - }) - }) - .unzip() - } -} - -impl DeserializedPacketBatch { - pub fn new(packet_batch: PacketBatch, packet_indexes: Vec, forwarded: bool) -> Self { - let unprocessed_packets = Self::deserialize_packets(&packet_batch, &packet_indexes); - Self { - packet_batch, - unprocessed_packets, - forwarded, + UnprocessedPacketBatches { + packet_priority_queue: MinMaxHeap::with_capacity(capacity), + message_hash_to_transaction: HashMap::with_capacity(capacity), + batch_limit: capacity, } } - fn deserialize_packets( - packet_batch: &PacketBatch, - packet_indexes: &[usize], - ) -> HashMap { - packet_indexes - .iter() - .filter_map(|packet_index| { - let deserialized_packet = - Self::deserialize_packet(&packet_batch.packets[*packet_index])?; - Some((*packet_index, deserialized_packet)) - }) - .collect() + pub fn clear(&mut self) { + self.packet_priority_queue.clear(); + self.message_hash_to_transaction.clear(); } - fn deserialize_packet(packet: &Packet) -> Option { - let versioned_transaction: VersionedTransaction = - match limited_deserialize(&packet.data[0..packet.meta.size]) { - Ok(tx) => tx, - Err(_) => return None, - }; + /// Insert new `deserizlized_packet_batch` into inner `MinMaxHeap`, + /// weighted first by the fee-per-cu, then the stake of the sender. + /// If buffer is at the max limit, the lowest weighted packet is dropped + /// + /// Returns tuple of number of packets dropped + pub fn insert_batch( + &mut self, + deserialized_packets: impl Iterator, + ) -> usize { + let mut num_dropped_packets = 0; + for deserialized_packet in deserialized_packets { + if self.push(deserialized_packet).is_some() { + num_dropped_packets += 1; + } + } + num_dropped_packets + } - if let Some(message_bytes) = Self::packet_message(packet) { - let message_hash = Message::hash_raw_message(message_bytes); - let is_simple_vote = packet.meta.is_simple_vote_tx(); - Some(DeserializedPacket { - versioned_transaction, - message_hash, - is_simple_vote, - }) + pub fn push(&mut self, deserialized_packet: DeserializedPacket) -> Option { + if self + .message_hash_to_transaction + .contains_key(deserialized_packet.immutable_section().message_hash()) + { + return None; + } + + if self.len() == self.batch_limit { + // Optimized to not allocate by calling `MinMaxHeap::push_pop_min()` + Some(self.push_pop_min(deserialized_packet)) } else { + self.push_internal(deserialized_packet); None } } - /// Read the transaction message from packet data - pub fn packet_message(packet: &Packet) -> Option<&[u8]> { - let (sig_len, sig_size) = decode_shortu16_len(&packet.data).ok()?; - let msg_start = sig_len - .checked_mul(size_of::()) - .and_then(|v| v.checked_add(sig_size))?; - let msg_end = packet.meta.size; - Some(&packet.data[msg_start..msg_end]) + pub fn iter(&mut self) -> impl Iterator { + self.message_hash_to_transaction.values() } - /// Returns whether the given `PacketBatch` has any more remaining unprocessed - /// transactions - pub fn update_buffered_packets_with_new_unprocessed( - &mut self, - _original_unprocessed_indexes: &[usize], - new_unprocessed_indexes: &[usize], - ) -> bool { - let has_more_unprocessed_transactions = !new_unprocessed_indexes.is_empty(); - if has_more_unprocessed_transactions { - self.unprocessed_packets - .retain(|index, _| new_unprocessed_indexes.contains(index)); + pub fn iter_mut(&mut self) -> impl Iterator { + self.message_hash_to_transaction.iter_mut().map(|(_k, v)| v) + } + + pub fn retain(&mut self, mut f: F) + where + F: FnMut(&mut DeserializedPacket) -> bool, + { + // TODO: optimize this only when number of packets + // with oudated blockhash is high + let new_packet_priority_queue: MinMaxHeap> = self + .packet_priority_queue + .drain() + .filter(|immutable_packet| { + match self + .message_hash_to_transaction + .entry(*immutable_packet.message_hash()) + { + Entry::Vacant(_vacant_entry) => { + panic!( + "entry {} must exist to be consistent with `packet_priority_queue`", + immutable_packet.message_hash() + ); + } + Entry::Occupied(mut occupied_entry) => { + let should_retain = f(occupied_entry.get_mut()); + if !should_retain { + occupied_entry.remove_entry(); + } + should_retain + } + } + }) + .collect(); + self.packet_priority_queue = new_packet_priority_queue; + } + + pub fn len(&self) -> usize { + self.packet_priority_queue.len() + } + + pub fn is_empty(&self) -> bool { + self.packet_priority_queue.is_empty() + } + + fn push_internal(&mut self, deserialized_packet: DeserializedPacket) { + // Push into the priority queue + self.packet_priority_queue + .push(deserialized_packet.immutable_section().clone()); + + // Keep track of the original packet in the tracking hashmap + self.message_hash_to_transaction.insert( + *deserialized_packet.immutable_section().message_hash(), + deserialized_packet, + ); + } + + /// Returns the popped minimum packet from the priority queue. + fn push_pop_min(&mut self, deserialized_packet: DeserializedPacket) -> DeserializedPacket { + let immutable_packet = deserialized_packet.immutable_section().clone(); + + // Push into the priority queue + let popped_immutable_packet = self.packet_priority_queue.push_pop_min(immutable_packet); + + if popped_immutable_packet.message_hash() + != deserialized_packet.immutable_section().message_hash() + { + // Remove the popped entry from the tracking hashmap. Unwrap call is safe + // because the priority queue and hashmap are kept consistent at all times. + let removed_min = self + .message_hash_to_transaction + .remove(popped_immutable_packet.message_hash()) + .unwrap(); + + // Keep track of the original packet in the tracking hashmap + self.message_hash_to_transaction.insert( + *deserialized_packet.immutable_section().message_hash(), + deserialized_packet, + ); + removed_min } else { - self.unprocessed_packets.clear(); + deserialized_packet } - - has_more_unprocessed_transactions } + + pub fn pop_max(&mut self) -> Option { + self.packet_priority_queue + .pop_max() + .map(|immutable_packet| { + self.message_hash_to_transaction + .remove(immutable_packet.message_hash()) + .unwrap() + }) + } + + /// Pop up to the next `n` highest priority transactions from the queue. + /// Returns `None` if the queue is empty + pub fn pop_max_n(&mut self, n: usize) -> Option> { + let current_len = self.len(); + if self.is_empty() { + None + } else { + let num_to_pop = std::cmp::min(current_len, n); + Some( + std::iter::from_fn(|| Some(self.pop_max().unwrap())) + .take(num_to_pop) + .collect::>(), + ) + } + } + + pub fn capacity(&self) -> usize { + self.packet_priority_queue.capacity() + } +} + +pub fn deserialize_packets<'a>( + packet_batch: &'a PacketBatch, + packet_indexes: &'a [usize], + bank: Option<&'a Arc>, +) -> impl Iterator + 'a { + packet_indexes.iter().filter_map(move |packet_index| { + DeserializedPacket::new(packet_batch.packets[*packet_index].clone(), bank).ok() + }) +} + +/// Read the transaction message from packet data +pub fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> { + let (sig_len, sig_size) = + decode_shortu16_len(&packet.data).map_err(DeserializedPacketError::ShortVecError)?; + sig_len + .checked_mul(size_of::()) + .and_then(|v| v.checked_add(sig_size)) + .map(|msg_start| { + let msg_end = packet.meta.size; + &packet.data[msg_start..msg_end] + }) + .ok_or(DeserializedPacketError::SignatureOverflowed(sig_size)) +} + +/// Computes `(addition_fee + base_fee / requested_cu)` for `deserialized_packet` +fn compute_fee_per_cu(_message: &VersionedMessage, _bank: &Bank) -> u64 { + 1 +} + +pub fn transactions_to_deserialized_packets( + transactions: &[Transaction], +) -> Result, DeserializedPacketError> { + transactions + .iter() + .map(|transaction| { + let packet = Packet::from_data(None, transaction)?; + DeserializedPacket::new(packet, None) + }) + .collect() } #[cfg(test)] @@ -212,7 +393,7 @@ mod tests { std::net::IpAddr, }; - fn packet_with_sender_stake(sender_stake: u64, ip: Option) -> Packet { + fn packet_with_sender_stake(sender_stake: u64, ip: Option) -> DeserializedPacket { let tx = system_transaction::transfer( &Keypair::new(), &solana_sdk::pubkey::new_rand(), @@ -224,109 +405,115 @@ mod tests { if let Some(ip) = ip { packet.meta.addr = ip; } - packet + DeserializedPacket::new(packet, None).unwrap() + } + + fn packet_with_fee_per_cu(fee_per_cu: u64) -> DeserializedPacket { + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, &tx).unwrap(); + DeserializedPacket::new_with_fee_per_cu(packet, fee_per_cu).unwrap() } #[test] - fn test_packet_message() { - let keypair = Keypair::new(); - let pubkey = solana_sdk::pubkey::new_rand(); - let blockhash = Hash::new_unique(); - let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash); - let packet = Packet::from_data(None, &transaction).unwrap(); + fn test_unprocessed_packet_batches_insert_pop_same_packet() { + let packet = packet_with_sender_stake(1, None); + let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(2); + unprocessed_packet_batches.push(packet.clone()); + unprocessed_packet_batches.push(packet.clone()); + + // There was only one unique packet, so that one should be the + // only packet returned assert_eq!( - DeserializedPacketBatch::packet_message(&packet) - .unwrap() - .to_vec(), - transaction.message_data() + unprocessed_packet_batches.pop_max_n(2).unwrap(), + vec![packet] ); } #[test] - fn test_get_packets_count() { - // create a buffer with 3 batches, each has 2 packets but only first one is valid - let batch_size = 2usize; - let batch_count = 3usize; - let unprocessed_packet_batches: UnprocessedPacketBatches = (0..batch_count) - .map(|_batch_index| { - DeserializedPacketBatch::new( - PacketBatch::new( - (0..batch_size) - .map(|packet_index| packet_with_sender_stake(packet_index as u64, None)) - .collect(), - ), - vec![0], - false, - ) - }) - .collect(); + fn test_unprocessed_packet_batches_insert_minimum_packet_over_capacity() { + let heavier_packet_weight = 2; + let heavier_packet = packet_with_fee_per_cu(heavier_packet_weight); - // Assert total packets count, and unprocessed packets count + let lesser_packet_weight = heavier_packet_weight - 1; + let lesser_packet = packet_with_fee_per_cu(lesser_packet_weight); + + // Test that the heavier packet is actually heavier + let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(2); + unprocessed_packet_batches.push(heavier_packet.clone()); + unprocessed_packet_batches.push(lesser_packet.clone()); assert_eq!( - batch_size * batch_count, - unprocessed_packet_batches.get_packets_count() + unprocessed_packet_batches.pop_max().unwrap(), + heavier_packet ); + + let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(1); + unprocessed_packet_batches.push(heavier_packet); + + // Buffer is now at capacity, pushing the smaller weighted + // packet should immediately pop it assert_eq!( - batch_count, - unprocessed_packet_batches.get_unprocessed_packets_count() + unprocessed_packet_batches + .push(lesser_packet.clone()) + .unwrap(), + lesser_packet ); } #[test] - fn test_get_stakes_and_locators_from_empty_buffer() { - let unprocessed_packet_batches = UnprocessedPacketBatches::default(); - let (stakes, locators) = unprocessed_packet_batches.get_stakes_and_locators(); + fn test_unprocessed_packet_batches_pop_max_n() { + let num_packets = 10; + let packets_iter = + std::iter::repeat_with(|| packet_with_sender_stake(1, None)).take(num_packets); + let mut unprocessed_packet_batches = + UnprocessedPacketBatches::from_iter(packets_iter.clone(), num_packets); - assert!(stakes.is_empty()); - assert!(locators.is_empty()); - } - - #[test] - fn test_get_stakes_and_locators() { - solana_logger::setup(); - - // setup senders' address and stake - let senders: Vec<(IpAddr, u64)> = vec![ - (IpAddr::from([127, 0, 0, 1]), 1), - (IpAddr::from([127, 0, 0, 2]), 2), - (IpAddr::from([127, 0, 0, 3]), 3), - ]; - // create a buffer with 3 batches, each has 2 packet from above sender. - // buffer looks like: - // [127.0.0.1, 127.0.0.2] - // [127.0.0.3, 127.0.0.1] - // [127.0.0.2, 127.0.0.3] - let batch_size = 2usize; - let batch_count = 3usize; - let unprocessed_packet_batches: UnprocessedPacketBatches = (0..batch_count) - .map(|batch_index| { - DeserializedPacketBatch::new( - PacketBatch::new( - (0..batch_size) - .map(|packet_index| { - let n = (batch_index * batch_size + packet_index) % senders.len(); - packet_with_sender_stake(senders[n].1, Some(senders[n].0)) - }) - .collect(), - ), - (0..batch_size).collect(), - false, - ) - }) - .collect(); - - let (stakes, locators) = unprocessed_packet_batches.get_stakes_and_locators(); - - // Produced stakes and locators should both have "batch_size * batch_count" entries; - assert_eq!(batch_size * batch_count, stakes.len()); - assert_eq!(batch_size * batch_count, locators.len()); - // Assert stakes and locators are in good order - locators.iter().enumerate().for_each(|(index, locator)| { + // Test with small step size + let step_size = 1; + for _ in 0..num_packets { assert_eq!( - stakes[index], - senders[(locator.batch_index * batch_size + locator.packet_index) % senders.len()] - .1 + unprocessed_packet_batches + .pop_max_n(step_size) + .unwrap() + .len(), + step_size ); - }); + } + + assert!(unprocessed_packet_batches.is_empty()); + assert!(unprocessed_packet_batches.pop_max_n(0).is_none()); + assert!(unprocessed_packet_batches.pop_max_n(1).is_none()); + + // Test with step size larger than `num_packets` + let step_size = num_packets + 1; + let mut unprocessed_packet_batches = + UnprocessedPacketBatches::from_iter(packets_iter.clone(), num_packets); + assert_eq!( + unprocessed_packet_batches + .pop_max_n(step_size) + .unwrap() + .len(), + num_packets + ); + assert!(unprocessed_packet_batches.is_empty()); + assert!(unprocessed_packet_batches.pop_max_n(0).is_none()); + + // Test with step size equal to `num_packets` + let step_size = num_packets; + let mut unprocessed_packet_batches = + UnprocessedPacketBatches::from_iter(packets_iter, num_packets); + assert_eq!( + unprocessed_packet_batches + .pop_max_n(step_size) + .unwrap() + .len(), + step_size + ); + assert!(unprocessed_packet_batches.is_empty()); + assert!(unprocessed_packet_batches.pop_max_n(0).is_none()); } } diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 87495a222..91f51fb8f 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -2140,6 +2140,12 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "min-max-heap" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2687e6cf9c00f48e9284cf9fd15f2ef341d03cc7743abf9df4c5f07fdee50b18" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -4288,6 +4294,7 @@ dependencies = [ "itertools", "log", "lru", + "min-max-heap", "rand 0.7.3", "rand_chacha 0.2.2", "rayon", diff --git a/sdk/src/packet.rs b/sdk/src/packet.rs index b73590da1..760ab0e1c 100644 --- a/sdk/src/packet.rs +++ b/sdk/src/packet.rs @@ -25,7 +25,7 @@ bitflags! { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Eq)] #[repr(C)] pub struct Meta { pub size: usize, @@ -35,7 +35,7 @@ pub struct Meta { pub sender_stake: u64, } -#[derive(Clone)] +#[derive(Clone, Eq)] #[repr(C)] pub struct Packet { pub data: [u8; PACKET_DATA_SIZE],