Prioritize BankingStage packets individually in min-max heap (#24187)

This commit is contained in:
carllin 2022-05-04 21:50:56 -05:00 committed by GitHub
parent 9089909995
commit 870ac80b79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1111 additions and 730 deletions

7
Cargo.lock generated
View File

@ -2402,6 +2402,12 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "min-max-heap"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2687e6cf9c00f48e9284cf9fd15f2ef341d03cc7743abf9df4c5f07fdee50b18"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -4615,6 +4621,7 @@ dependencies = [
"log",
"lru",
"matches",
"min-max-heap",
"rand 0.7.3",
"rand_chacha 0.2.2",
"raptorq",

View File

@ -27,6 +27,7 @@ histogram = "0.6.9"
itertools = "0.10.3"
log = "0.4.17"
lru = "0.7.5"
min-max-heap = "1.3.0"
rand = "0.7.0"
rand_chacha = "0.2.2"
rayon = "1.5.2"
@ -96,5 +97,8 @@ name = "sigverify_stage"
[[bench]]
name = "retransmit_stage"
[[bench]]
name = "unprocessed_packet_batches"
[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]

View File

@ -76,18 +76,11 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
let recorder = poh_recorder.lock().unwrap().recorder();
let tx = test_tx();
let len = 4096;
let chunk_size = 1024;
let batches = to_packet_batches(&vec![tx; len], chunk_size);
let mut packet_batches = UnprocessedPacketBatches::new();
for batch in batches {
let batch_len = batch.packets.len();
packet_batches.push_back(DeserializedPacketBatch::new(
batch,
vec![0usize; batch_len],
false,
));
}
let transactions = vec![tx; 4194304];
let batches = transactions_to_deserialized_packets(&transactions).unwrap();
let batches_len = batches.len();
let mut transaction_buffer =
UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len);
let (s, _r) = unbounded();
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
@ -96,7 +89,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&my_pubkey,
std::u128::MAX,
&poh_recorder,
&mut packet_batches,
&mut transaction_buffer,
None,
&s,
None::<Box<dyn Fn()>>,
@ -104,6 +97,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
&recorder,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
&mut LeaderSlotMetricsTracker::new(0),
10,
);
});

View File

@ -0,0 +1,176 @@
#![allow(clippy::integer_arithmetic)]
#![feature(test)]
extern crate test;
use {
rand::distributions::{Distribution, Uniform},
solana_core::unprocessed_packet_batches::*,
solana_measure::measure::Measure,
solana_perf::packet::{Packet, PacketBatch},
solana_sdk::{hash::Hash, signature::Keypair, system_transaction},
test::Bencher,
};
fn build_packet_batch(packet_per_batch_count: usize) -> (PacketBatch, Vec<usize>) {
let packet_batch = PacketBatch::new(
(0..packet_per_batch_count)
.map(|sender_stake| {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
);
let mut packet = Packet::from_data(None, &tx).unwrap();
packet.meta.sender_stake = sender_stake as u64;
packet
})
.collect(),
);
let packet_indexes: Vec<usize> = (0..packet_per_batch_count).collect();
(packet_batch, packet_indexes)
}
fn build_randomized_packet_batch(packet_per_batch_count: usize) -> (PacketBatch, Vec<usize>) {
let mut rng = rand::thread_rng();
let distribution = Uniform::from(0..200_000);
let packet_batch = PacketBatch::new(
(0..packet_per_batch_count)
.map(|_| {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
);
let mut packet = Packet::from_data(None, &tx).unwrap();
let sender_stake = distribution.sample(&mut rng);
packet.meta.sender_stake = sender_stake as u64;
packet
})
.collect(),
);
let packet_indexes: Vec<usize> = (0..packet_per_batch_count).collect();
(packet_batch, packet_indexes)
}
fn insert_packet_batches(
buffer_max_size: usize,
batch_count: usize,
packet_per_batch_count: usize,
randomize: bool,
) {
solana_logger::setup();
let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(buffer_max_size);
let mut timer = Measure::start("insert_batch");
(0..batch_count).for_each(|_| {
let (packet_batch, packet_indexes) = if randomize {
build_randomized_packet_batch(packet_per_batch_count)
} else {
build_packet_batch(packet_per_batch_count)
};
let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes, None);
unprocessed_packet_batches.insert_batch(deserialized_packets);
});
timer.stop();
log::info!(
"inserted {} batch, elapsed {}",
buffer_max_size,
timer.as_us()
);
}
#[bench]
#[allow(clippy::unit_arg)]
fn bench_packet_clone(bencher: &mut Bencher) {
let batch_count = 1000;
let packet_per_batch_count = 128;
let packet_batches: Vec<PacketBatch> = (0..batch_count)
.map(|_| build_packet_batch(packet_per_batch_count).0)
.collect();
bencher.iter(|| {
test::black_box(packet_batches.iter().for_each(|packet_batch| {
let mut outer_packet = Packet::default();
let mut timer = Measure::start("insert_batch");
packet_batch.packets.iter().for_each(|packet| {
let mut packet = packet.clone();
packet.meta.sender_stake *= 2;
if packet.meta.sender_stake > 2 {
outer_packet = packet;
}
});
timer.stop();
}));
});
}
//*
// v1, bench: 5,600,038,163 ns/iter (+/- 940,818,988)
// v2, bench: 5,265,382,750 ns/iter (+/- 153,623,264)
#[bench]
#[ignore]
fn bench_unprocessed_packet_batches_within_limit(bencher: &mut Bencher) {
let buffer_capacity = 1_000 * 128;
let batch_count = 1_000;
let packet_per_batch_count = 128;
bencher.iter(|| {
insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, false);
});
}
// v1, bench: 6,607,014,940 ns/iter (+/- 768,191,361)
// v2, bench: 5,692,753,323 ns/iter (+/- 548,959,624)
#[bench]
#[ignore]
fn bench_unprocessed_packet_batches_beyond_limit(bencher: &mut Bencher) {
let buffer_capacity = 1_000 * 128;
let batch_count = 1_100;
let packet_per_batch_count = 128;
// this is the worst scenario testing: all batches are uniformly populated with packets from
// priority 100..228, so in order to drop a batch, algo will have to drop all packets that has
// priority < 228, plus one 228. That's 2000 batch * 127 packets + 1
// Also, since all batches have same stake distribution, the new one is always the one got
// dropped. Tho it does not change algo complexity.
bencher.iter(|| {
insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, false);
});
}
// */
// v1, bench: 5,843,307,086 ns/iter (+/- 844,249,298)
// v2, bench: 5,139,525,951 ns/iter (+/- 48,005,521)
#[bench]
#[ignore]
fn bench_unprocessed_packet_batches_randomized_within_limit(bencher: &mut Bencher) {
let buffer_capacity = 1_000 * 128;
let batch_count = 1_000;
let packet_per_batch_count = 128;
bencher.iter(|| {
insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true);
});
}
// v1, bench: 6,497,623,849 ns/iter (+/- 3,206,382,212)
// v2, bench: 5,762,071,682 ns/iter (+/- 168,244,418)
#[bench]
#[ignore]
fn bench_unprocessed_packet_batches_randomized_beyond_limit(bencher: &mut Bencher) {
let buffer_capacity = 1_000 * 128;
let batch_count = 1_100;
let packet_per_batch_count = 128;
bencher.iter(|| {
insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true);
});
}

File diff suppressed because it is too large Load Diff

View File

@ -88,6 +88,13 @@ struct LeaderSlotPacketCountMetrics {
// queue becaus they were retryable errors
retryable_errored_transaction_count: u64,
// The size of the unprocessed buffer at the end of the slot
end_of_slot_unprocessed_buffer_len: u64,
// total number of transactions that were rebuffered into the queue after not being
// executed on a previous pass
retryable_packets_count: u64,
// total number of transactions that attempted execution due to some fatal error (too old, duplicate signature, etc.)
// AND were dropped from the buffered queue
nonretryable_errored_transactions_count: u64,
@ -174,6 +181,11 @@ impl LeaderSlotPacketCountMetrics {
self.retryable_errored_transaction_count as i64,
i64
),
(
"retryable_packets_count",
self.retryable_packets_count as i64,
i64
),
(
"nonretryable_errored_transactions_count",
self.nonretryable_errored_transactions_count as i64,
@ -214,6 +226,11 @@ impl LeaderSlotPacketCountMetrics {
self.end_of_slot_filtered_invalid_count as i64,
i64
),
(
"end_of_slot_unprocessed_buffer_len",
self.end_of_slot_unprocessed_buffer_len as i64,
i64
),
);
}
}
@ -524,6 +541,17 @@ impl LeaderSlotMetricsTracker {
}
}
pub(crate) fn increment_retryable_packets_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
leader_slot_metrics
.packet_count_metrics
.retryable_packets_count,
count
);
}
}
pub(crate) fn increment_end_of_slot_filtered_invalid_count(&mut self, count: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
saturating_add_assign!(
@ -535,6 +563,14 @@ impl LeaderSlotMetricsTracker {
}
}
pub(crate) fn set_end_of_slot_unprocessed_buffer_len(&mut self, len: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
leader_slot_metrics
.packet_count_metrics
.end_of_slot_unprocessed_buffer_len = len;
}
}
// Outermost banking thread's loop timing metrics
pub(crate) fn increment_process_buffered_packets_us(&mut self, us: u64) {
if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics {
@ -569,6 +605,13 @@ impl LeaderSlotMetricsTracker {
.receive_and_buffer_packets_us,
us
);
saturating_add_assign!(
leader_slot_metrics
.timing_metrics
.outer_loop_timings
.receive_and_buffer_packets_invoked_count,
1
);
}
}

View File

@ -134,6 +134,10 @@ pub(crate) struct OuterLoopTimings {
// Time spent processing new incoming packets to the banking thread
pub receive_and_buffer_packets_us: u64,
// The number of times the function to receive and buffer new packets
// was called
pub receive_and_buffer_packets_invoked_count: u64,
}
impl OuterLoopTimings {
@ -144,6 +148,7 @@ impl OuterLoopTimings {
process_buffered_packets_us: 0,
slot_metrics_check_slot_boundary_us: 0,
receive_and_buffer_packets_us: 0,
receive_and_buffer_packets_invoked_count: 0,
}
}
@ -179,6 +184,11 @@ impl OuterLoopTimings {
self.receive_and_buffer_packets_us,
i64
),
(
"receive_and_buffer_packets_invoked_count",
self.receive_and_buffer_packets_invoked_count,
i64
)
);
}
}

View File

@ -1,207 +1,388 @@
use {
retain_mut::RetainMut,
min_max_heap::MinMaxHeap,
solana_perf::packet::{limited_deserialize, Packet, PacketBatch},
solana_runtime::bank::Bank,
solana_sdk::{
hash::Hash, message::Message, short_vec::decode_shortu16_len, signature::Signature,
transaction::VersionedTransaction,
hash::Hash,
message::{Message, VersionedMessage},
short_vec::decode_shortu16_len,
signature::Signature,
transaction::{Transaction, VersionedTransaction},
},
std::{
collections::{HashMap, VecDeque},
cmp::Ordering,
collections::{hash_map::Entry, HashMap},
mem::size_of,
rc::Rc,
sync::Arc,
},
thiserror::Error,
};
#[derive(Debug, Error)]
pub enum DeserializedPacketError {
#[error("ShortVec Failed to Deserialize")]
// short_vec::decode_shortu16_len() currently returns () on error
ShortVecError(()),
#[error("Deserialization Error: {0}")]
DeserializationError(#[from] bincode::Error),
#[error("overflowed on signature size {0}")]
SignatureOverflowed(usize),
}
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ImmutableDeserializedPacket {
original_packet: Packet,
versioned_transaction: VersionedTransaction,
message_hash: Hash,
is_simple_vote: bool,
fee_per_cu: u64,
}
impl ImmutableDeserializedPacket {
pub fn original_packet(&self) -> &Packet {
&self.original_packet
}
pub fn versioned_transaction(&self) -> &VersionedTransaction {
&self.versioned_transaction
}
pub fn sender_stake(&self) -> u64 {
self.original_packet.meta.sender_stake
}
pub fn message_hash(&self) -> &Hash {
&self.message_hash
}
pub fn is_simple_vote(&self) -> bool {
self.is_simple_vote
}
pub fn fee_per_cu(&self) -> u64 {
self.fee_per_cu
}
}
/// Holds deserialized messages, as well as computed message_hash and other things needed to create
/// SanitizedTransaction
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DeserializedPacket {
pub versioned_transaction: VersionedTransaction,
pub message_hash: Hash,
pub is_simple_vote: bool,
}
/// Defines the type of entry in `UnprocessedPacketBatches`, it holds original packet_batch
/// for forwarding, as well as `forwarded` flag;
/// Each packet in packet_batch are deserialized upon receiving, the result are stored in
/// `DeserializedPacket` in the same order as packets in `packet_batch`.
#[derive(Debug, Default)]
pub struct DeserializedPacketBatch {
pub packet_batch: PacketBatch,
immutable_section: Rc<ImmutableDeserializedPacket>,
pub forwarded: bool,
// indexes of valid packets in batch, and their corresponding deserialized_packet
pub unprocessed_packets: HashMap<usize, DeserializedPacket>,
}
/// References to a packet in `UnprocessedPacketBatches`, where
/// - batch_index references to `DeserializedPacketBatch`,
/// - packet_index references to `packet` within `DeserializedPacketBatch.packet_batch`
#[derive(Debug, Default)]
pub struct PacketLocator {
#[allow(dead_code)]
batch_index: usize,
#[allow(dead_code)]
packet_index: usize,
impl DeserializedPacket {
pub fn new(packet: Packet, bank: Option<&Arc<Bank>>) -> Result<Self, DeserializedPacketError> {
Self::new_internal(packet, bank, None)
}
#[cfg(test)]
fn new_with_fee_per_cu(
packet: Packet,
fee_per_cu: u64,
) -> Result<Self, DeserializedPacketError> {
Self::new_internal(packet, None, Some(fee_per_cu))
}
pub fn new_internal(
packet: Packet,
bank: Option<&Arc<Bank>>,
fee_per_cu: Option<u64>,
) -> Result<Self, DeserializedPacketError> {
let versioned_transaction: VersionedTransaction =
limited_deserialize(&packet.data[0..packet.meta.size])?;
let message_bytes = packet_message(&packet)?;
let message_hash = Message::hash_raw_message(message_bytes);
let is_simple_vote = packet.meta.is_simple_vote_tx();
let fee_per_cu = fee_per_cu.unwrap_or_else(|| {
bank.as_ref()
.map(|bank| compute_fee_per_cu(&versioned_transaction.message, bank))
.unwrap_or(0)
});
Ok(Self {
immutable_section: Rc::new(ImmutableDeserializedPacket {
original_packet: packet,
versioned_transaction,
message_hash,
is_simple_vote,
fee_per_cu,
}),
forwarded: false,
})
}
pub fn immutable_section(&self) -> &Rc<ImmutableDeserializedPacket> {
&self.immutable_section
}
}
impl PartialOrd for DeserializedPacket {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for DeserializedPacket {
fn cmp(&self, other: &Self) -> Ordering {
match self
.immutable_section()
.fee_per_cu()
.cmp(&other.immutable_section().fee_per_cu())
{
Ordering::Equal => self
.immutable_section()
.sender_stake()
.cmp(&other.immutable_section().sender_stake()),
ordering => ordering,
}
}
}
impl PartialOrd for ImmutableDeserializedPacket {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for ImmutableDeserializedPacket {
fn cmp(&self, other: &Self) -> Ordering {
match self.fee_per_cu().cmp(&other.fee_per_cu()) {
Ordering::Equal => self.sender_stake().cmp(&other.sender_stake()),
ordering => ordering,
}
}
}
/// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store
/// PacketBatch's received from sigverify. Banking thread continuously scans the buffer
/// to pick proper packets to add to the block.
#[derive(Default)]
pub struct UnprocessedPacketBatches(VecDeque<DeserializedPacketBatch>);
impl std::ops::Deref for UnprocessedPacketBatches {
type Target = VecDeque<DeserializedPacketBatch>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for UnprocessedPacketBatches {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl RetainMut<DeserializedPacketBatch> for UnprocessedPacketBatches {
fn retain_mut<F>(&mut self, f: F)
where
F: FnMut(&mut DeserializedPacketBatch) -> bool,
{
RetainMut::retain_mut(&mut self.0, f);
}
}
impl FromIterator<DeserializedPacketBatch> for UnprocessedPacketBatches {
fn from_iter<I: IntoIterator<Item = DeserializedPacketBatch>>(iter: I) -> Self {
Self(iter.into_iter().collect())
}
pub struct UnprocessedPacketBatches {
pub packet_priority_queue: MinMaxHeap<Rc<ImmutableDeserializedPacket>>,
pub message_hash_to_transaction: HashMap<Hash, DeserializedPacket>,
batch_limit: usize,
}
impl UnprocessedPacketBatches {
pub fn new() -> Self {
Self::default()
pub fn from_iter<I: IntoIterator<Item = DeserializedPacket>>(iter: I, capacity: usize) -> Self {
let mut unprocessed_packet_batches = Self::with_capacity(capacity);
for deserialized_packet in iter.into_iter() {
unprocessed_packet_batches.push(deserialized_packet);
}
unprocessed_packet_batches
}
pub fn with_capacity(capacity: usize) -> Self {
UnprocessedPacketBatches(VecDeque::with_capacity(capacity))
}
/// Returns total number of all packets (including unprocessed and processed) in buffer
#[allow(dead_code)]
fn get_packets_count(&self) -> usize {
self.iter()
.map(|deserialized_packet_batch| deserialized_packet_batch.packet_batch.packets.len())
.sum()
}
/// Returns total number of unprocessed packets in buffer
#[allow(dead_code)]
fn get_unprocessed_packets_count(&self) -> usize {
self.iter()
.map(|deserialized_packet_batch| deserialized_packet_batch.unprocessed_packets.len())
.sum()
}
/// Iterates the inner `Vec<DeserializedPacketBatch>`.
/// Returns the flattened result of mapping each
/// `DeserializedPacketBatch` to a list the batch's inner
/// packets' sender's stake and their `PacketLocator`'s within the
/// `Vec<DeserializedPacketBatch>`.
#[allow(dead_code)]
fn get_stakes_and_locators(&self) -> (Vec<u64>, Vec<PacketLocator>) {
self.iter()
.enumerate()
.flat_map(|(batch_index, deserialized_packet_batch)| {
let packet_batch = &deserialized_packet_batch.packet_batch;
deserialized_packet_batch
.unprocessed_packets
.keys()
.map(move |packet_index| {
let p = &packet_batch.packets[*packet_index];
(
p.meta.sender_stake,
PacketLocator {
batch_index,
packet_index: *packet_index,
},
)
})
})
.unzip()
}
}
impl DeserializedPacketBatch {
pub fn new(packet_batch: PacketBatch, packet_indexes: Vec<usize>, forwarded: bool) -> Self {
let unprocessed_packets = Self::deserialize_packets(&packet_batch, &packet_indexes);
Self {
packet_batch,
unprocessed_packets,
forwarded,
UnprocessedPacketBatches {
packet_priority_queue: MinMaxHeap::with_capacity(capacity),
message_hash_to_transaction: HashMap::with_capacity(capacity),
batch_limit: capacity,
}
}
fn deserialize_packets(
packet_batch: &PacketBatch,
packet_indexes: &[usize],
) -> HashMap<usize, DeserializedPacket> {
packet_indexes
.iter()
.filter_map(|packet_index| {
let deserialized_packet =
Self::deserialize_packet(&packet_batch.packets[*packet_index])?;
Some((*packet_index, deserialized_packet))
})
.collect()
pub fn clear(&mut self) {
self.packet_priority_queue.clear();
self.message_hash_to_transaction.clear();
}
fn deserialize_packet(packet: &Packet) -> Option<DeserializedPacket> {
let versioned_transaction: VersionedTransaction =
match limited_deserialize(&packet.data[0..packet.meta.size]) {
Ok(tx) => tx,
Err(_) => return None,
};
/// Insert new `deserizlized_packet_batch` into inner `MinMaxHeap<DeserializedPacket>`,
/// weighted first by the fee-per-cu, then the stake of the sender.
/// If buffer is at the max limit, the lowest weighted packet is dropped
///
/// Returns tuple of number of packets dropped
pub fn insert_batch(
&mut self,
deserialized_packets: impl Iterator<Item = DeserializedPacket>,
) -> usize {
let mut num_dropped_packets = 0;
for deserialized_packet in deserialized_packets {
if self.push(deserialized_packet).is_some() {
num_dropped_packets += 1;
}
}
num_dropped_packets
}
if let Some(message_bytes) = Self::packet_message(packet) {
let message_hash = Message::hash_raw_message(message_bytes);
let is_simple_vote = packet.meta.is_simple_vote_tx();
Some(DeserializedPacket {
versioned_transaction,
message_hash,
is_simple_vote,
})
pub fn push(&mut self, deserialized_packet: DeserializedPacket) -> Option<DeserializedPacket> {
if self
.message_hash_to_transaction
.contains_key(deserialized_packet.immutable_section().message_hash())
{
return None;
}
if self.len() == self.batch_limit {
// Optimized to not allocate by calling `MinMaxHeap::push_pop_min()`
Some(self.push_pop_min(deserialized_packet))
} else {
self.push_internal(deserialized_packet);
None
}
}
/// Read the transaction message from packet data
pub fn packet_message(packet: &Packet) -> Option<&[u8]> {
let (sig_len, sig_size) = decode_shortu16_len(&packet.data).ok()?;
let msg_start = sig_len
.checked_mul(size_of::<Signature>())
.and_then(|v| v.checked_add(sig_size))?;
let msg_end = packet.meta.size;
Some(&packet.data[msg_start..msg_end])
pub fn iter(&mut self) -> impl Iterator<Item = &DeserializedPacket> {
self.message_hash_to_transaction.values()
}
/// Returns whether the given `PacketBatch` has any more remaining unprocessed
/// transactions
pub fn update_buffered_packets_with_new_unprocessed(
&mut self,
_original_unprocessed_indexes: &[usize],
new_unprocessed_indexes: &[usize],
) -> bool {
let has_more_unprocessed_transactions = !new_unprocessed_indexes.is_empty();
if has_more_unprocessed_transactions {
self.unprocessed_packets
.retain(|index, _| new_unprocessed_indexes.contains(index));
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut DeserializedPacket> {
self.message_hash_to_transaction.iter_mut().map(|(_k, v)| v)
}
pub fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&mut DeserializedPacket) -> bool,
{
// TODO: optimize this only when number of packets
// with oudated blockhash is high
let new_packet_priority_queue: MinMaxHeap<Rc<ImmutableDeserializedPacket>> = self
.packet_priority_queue
.drain()
.filter(|immutable_packet| {
match self
.message_hash_to_transaction
.entry(*immutable_packet.message_hash())
{
Entry::Vacant(_vacant_entry) => {
panic!(
"entry {} must exist to be consistent with `packet_priority_queue`",
immutable_packet.message_hash()
);
}
Entry::Occupied(mut occupied_entry) => {
let should_retain = f(occupied_entry.get_mut());
if !should_retain {
occupied_entry.remove_entry();
}
should_retain
}
}
})
.collect();
self.packet_priority_queue = new_packet_priority_queue;
}
pub fn len(&self) -> usize {
self.packet_priority_queue.len()
}
pub fn is_empty(&self) -> bool {
self.packet_priority_queue.is_empty()
}
fn push_internal(&mut self, deserialized_packet: DeserializedPacket) {
// Push into the priority queue
self.packet_priority_queue
.push(deserialized_packet.immutable_section().clone());
// Keep track of the original packet in the tracking hashmap
self.message_hash_to_transaction.insert(
*deserialized_packet.immutable_section().message_hash(),
deserialized_packet,
);
}
/// Returns the popped minimum packet from the priority queue.
fn push_pop_min(&mut self, deserialized_packet: DeserializedPacket) -> DeserializedPacket {
let immutable_packet = deserialized_packet.immutable_section().clone();
// Push into the priority queue
let popped_immutable_packet = self.packet_priority_queue.push_pop_min(immutable_packet);
if popped_immutable_packet.message_hash()
!= deserialized_packet.immutable_section().message_hash()
{
// Remove the popped entry from the tracking hashmap. Unwrap call is safe
// because the priority queue and hashmap are kept consistent at all times.
let removed_min = self
.message_hash_to_transaction
.remove(popped_immutable_packet.message_hash())
.unwrap();
// Keep track of the original packet in the tracking hashmap
self.message_hash_to_transaction.insert(
*deserialized_packet.immutable_section().message_hash(),
deserialized_packet,
);
removed_min
} else {
self.unprocessed_packets.clear();
deserialized_packet
}
has_more_unprocessed_transactions
}
pub fn pop_max(&mut self) -> Option<DeserializedPacket> {
self.packet_priority_queue
.pop_max()
.map(|immutable_packet| {
self.message_hash_to_transaction
.remove(immutable_packet.message_hash())
.unwrap()
})
}
/// Pop up to the next `n` highest priority transactions from the queue.
/// Returns `None` if the queue is empty
pub fn pop_max_n(&mut self, n: usize) -> Option<Vec<DeserializedPacket>> {
let current_len = self.len();
if self.is_empty() {
None
} else {
let num_to_pop = std::cmp::min(current_len, n);
Some(
std::iter::from_fn(|| Some(self.pop_max().unwrap()))
.take(num_to_pop)
.collect::<Vec<DeserializedPacket>>(),
)
}
}
pub fn capacity(&self) -> usize {
self.packet_priority_queue.capacity()
}
}
pub fn deserialize_packets<'a>(
packet_batch: &'a PacketBatch,
packet_indexes: &'a [usize],
bank: Option<&'a Arc<Bank>>,
) -> impl Iterator<Item = DeserializedPacket> + 'a {
packet_indexes.iter().filter_map(move |packet_index| {
DeserializedPacket::new(packet_batch.packets[*packet_index].clone(), bank).ok()
})
}
/// Read the transaction message from packet data
pub fn packet_message(packet: &Packet) -> Result<&[u8], DeserializedPacketError> {
let (sig_len, sig_size) =
decode_shortu16_len(&packet.data).map_err(DeserializedPacketError::ShortVecError)?;
sig_len
.checked_mul(size_of::<Signature>())
.and_then(|v| v.checked_add(sig_size))
.map(|msg_start| {
let msg_end = packet.meta.size;
&packet.data[msg_start..msg_end]
})
.ok_or(DeserializedPacketError::SignatureOverflowed(sig_size))
}
/// Computes `(addition_fee + base_fee / requested_cu)` for `deserialized_packet`
fn compute_fee_per_cu(_message: &VersionedMessage, _bank: &Bank) -> u64 {
1
}
pub fn transactions_to_deserialized_packets(
transactions: &[Transaction],
) -> Result<Vec<DeserializedPacket>, DeserializedPacketError> {
transactions
.iter()
.map(|transaction| {
let packet = Packet::from_data(None, transaction)?;
DeserializedPacket::new(packet, None)
})
.collect()
}
#[cfg(test)]
@ -212,7 +393,7 @@ mod tests {
std::net::IpAddr,
};
fn packet_with_sender_stake(sender_stake: u64, ip: Option<IpAddr>) -> Packet {
fn packet_with_sender_stake(sender_stake: u64, ip: Option<IpAddr>) -> DeserializedPacket {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
@ -224,109 +405,115 @@ mod tests {
if let Some(ip) = ip {
packet.meta.addr = ip;
}
packet
DeserializedPacket::new(packet, None).unwrap()
}
fn packet_with_fee_per_cu(fee_per_cu: u64) -> DeserializedPacket {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
);
let packet = Packet::from_data(None, &tx).unwrap();
DeserializedPacket::new_with_fee_per_cu(packet, fee_per_cu).unwrap()
}
#[test]
fn test_packet_message() {
let keypair = Keypair::new();
let pubkey = solana_sdk::pubkey::new_rand();
let blockhash = Hash::new_unique();
let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash);
let packet = Packet::from_data(None, &transaction).unwrap();
fn test_unprocessed_packet_batches_insert_pop_same_packet() {
let packet = packet_with_sender_stake(1, None);
let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(2);
unprocessed_packet_batches.push(packet.clone());
unprocessed_packet_batches.push(packet.clone());
// There was only one unique packet, so that one should be the
// only packet returned
assert_eq!(
DeserializedPacketBatch::packet_message(&packet)
.unwrap()
.to_vec(),
transaction.message_data()
unprocessed_packet_batches.pop_max_n(2).unwrap(),
vec![packet]
);
}
#[test]
fn test_get_packets_count() {
// create a buffer with 3 batches, each has 2 packets but only first one is valid
let batch_size = 2usize;
let batch_count = 3usize;
let unprocessed_packet_batches: UnprocessedPacketBatches = (0..batch_count)
.map(|_batch_index| {
DeserializedPacketBatch::new(
PacketBatch::new(
(0..batch_size)
.map(|packet_index| packet_with_sender_stake(packet_index as u64, None))
.collect(),
),
vec![0],
false,
)
})
.collect();
fn test_unprocessed_packet_batches_insert_minimum_packet_over_capacity() {
let heavier_packet_weight = 2;
let heavier_packet = packet_with_fee_per_cu(heavier_packet_weight);
// Assert total packets count, and unprocessed packets count
let lesser_packet_weight = heavier_packet_weight - 1;
let lesser_packet = packet_with_fee_per_cu(lesser_packet_weight);
// Test that the heavier packet is actually heavier
let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(2);
unprocessed_packet_batches.push(heavier_packet.clone());
unprocessed_packet_batches.push(lesser_packet.clone());
assert_eq!(
batch_size * batch_count,
unprocessed_packet_batches.get_packets_count()
unprocessed_packet_batches.pop_max().unwrap(),
heavier_packet
);
let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(1);
unprocessed_packet_batches.push(heavier_packet);
// Buffer is now at capacity, pushing the smaller weighted
// packet should immediately pop it
assert_eq!(
batch_count,
unprocessed_packet_batches.get_unprocessed_packets_count()
unprocessed_packet_batches
.push(lesser_packet.clone())
.unwrap(),
lesser_packet
);
}
#[test]
fn test_get_stakes_and_locators_from_empty_buffer() {
let unprocessed_packet_batches = UnprocessedPacketBatches::default();
let (stakes, locators) = unprocessed_packet_batches.get_stakes_and_locators();
fn test_unprocessed_packet_batches_pop_max_n() {
let num_packets = 10;
let packets_iter =
std::iter::repeat_with(|| packet_with_sender_stake(1, None)).take(num_packets);
let mut unprocessed_packet_batches =
UnprocessedPacketBatches::from_iter(packets_iter.clone(), num_packets);
assert!(stakes.is_empty());
assert!(locators.is_empty());
}
#[test]
fn test_get_stakes_and_locators() {
solana_logger::setup();
// setup senders' address and stake
let senders: Vec<(IpAddr, u64)> = vec![
(IpAddr::from([127, 0, 0, 1]), 1),
(IpAddr::from([127, 0, 0, 2]), 2),
(IpAddr::from([127, 0, 0, 3]), 3),
];
// create a buffer with 3 batches, each has 2 packet from above sender.
// buffer looks like:
// [127.0.0.1, 127.0.0.2]
// [127.0.0.3, 127.0.0.1]
// [127.0.0.2, 127.0.0.3]
let batch_size = 2usize;
let batch_count = 3usize;
let unprocessed_packet_batches: UnprocessedPacketBatches = (0..batch_count)
.map(|batch_index| {
DeserializedPacketBatch::new(
PacketBatch::new(
(0..batch_size)
.map(|packet_index| {
let n = (batch_index * batch_size + packet_index) % senders.len();
packet_with_sender_stake(senders[n].1, Some(senders[n].0))
})
.collect(),
),
(0..batch_size).collect(),
false,
)
})
.collect();
let (stakes, locators) = unprocessed_packet_batches.get_stakes_and_locators();
// Produced stakes and locators should both have "batch_size * batch_count" entries;
assert_eq!(batch_size * batch_count, stakes.len());
assert_eq!(batch_size * batch_count, locators.len());
// Assert stakes and locators are in good order
locators.iter().enumerate().for_each(|(index, locator)| {
// Test with small step size
let step_size = 1;
for _ in 0..num_packets {
assert_eq!(
stakes[index],
senders[(locator.batch_index * batch_size + locator.packet_index) % senders.len()]
.1
unprocessed_packet_batches
.pop_max_n(step_size)
.unwrap()
.len(),
step_size
);
});
}
assert!(unprocessed_packet_batches.is_empty());
assert!(unprocessed_packet_batches.pop_max_n(0).is_none());
assert!(unprocessed_packet_batches.pop_max_n(1).is_none());
// Test with step size larger than `num_packets`
let step_size = num_packets + 1;
let mut unprocessed_packet_batches =
UnprocessedPacketBatches::from_iter(packets_iter.clone(), num_packets);
assert_eq!(
unprocessed_packet_batches
.pop_max_n(step_size)
.unwrap()
.len(),
num_packets
);
assert!(unprocessed_packet_batches.is_empty());
assert!(unprocessed_packet_batches.pop_max_n(0).is_none());
// Test with step size equal to `num_packets`
let step_size = num_packets;
let mut unprocessed_packet_batches =
UnprocessedPacketBatches::from_iter(packets_iter, num_packets);
assert_eq!(
unprocessed_packet_batches
.pop_max_n(step_size)
.unwrap()
.len(),
step_size
);
assert!(unprocessed_packet_batches.is_empty());
assert!(unprocessed_packet_batches.pop_max_n(0).is_none());
}
}

View File

@ -2140,6 +2140,12 @@ version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "min-max-heap"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2687e6cf9c00f48e9284cf9fd15f2ef341d03cc7743abf9df4c5f07fdee50b18"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -4288,6 +4294,7 @@ dependencies = [
"itertools",
"log",
"lru",
"min-max-heap",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",

View File

@ -25,7 +25,7 @@ bitflags! {
}
}
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
#[repr(C)]
pub struct Meta {
pub size: usize,
@ -35,7 +35,7 @@ pub struct Meta {
pub sender_stake: u64,
}
#[derive(Clone)]
#[derive(Clone, Eq)]
#[repr(C)]
pub struct Packet {
pub data: [u8; PACKET_DATA_SIZE],