2022-09-01 08:00:48 -07:00
|
|
|
//! Deserializes packets from sigverify stage. Owned by banking stage.
|
|
|
|
|
|
|
|
use {
|
|
|
|
crate::{
|
|
|
|
immutable_deserialized_packet::ImmutableDeserializedPacket,
|
|
|
|
sigverify::SigverifyTracerPacketStats,
|
|
|
|
},
|
|
|
|
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
|
|
|
|
solana_perf::packet::PacketBatch,
|
|
|
|
std::time::{Duration, Instant},
|
|
|
|
};
|
|
|
|
|
|
|
|
/// One message received from the sigverify stage: a group of packet batches paired with
/// optional tracer packet stats.
pub type BankingPacketBatch = (Vec<PacketBatch>, Option<SigverifyTracerPacketStats>);
|
|
|
|
/// Crossbeam channel receiver that yields `BankingPacketBatch` messages.
pub type BankingPacketReceiver = CrossbeamReceiver<BankingPacketBatch>;
|
|
|
|
|
|
|
|
/// Results from deserializing packet batches.
|
|
|
|
pub struct ReceivePacketResults {
|
|
|
|
/// Deserialized packets from all received packet batches
|
|
|
|
pub deserialized_packets: Vec<ImmutableDeserializedPacket>,
|
|
|
|
/// Aggregate tracer stats for all received packet batches
|
|
|
|
pub new_tracer_stats_option: Option<SigverifyTracerPacketStats>,
|
|
|
|
/// Number of packets passing sigverify
|
|
|
|
pub passed_sigverify_count: u64,
|
|
|
|
/// Number of packets failing sigverify
|
|
|
|
pub failed_sigverify_count: u64,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct PacketDeserializer {
|
|
|
|
/// Receiver for packet batches from sigverify stage
|
|
|
|
packet_batch_receiver: BankingPacketReceiver,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl PacketDeserializer {
|
|
|
|
pub fn new(packet_batch_receiver: BankingPacketReceiver) -> Self {
|
|
|
|
Self {
|
|
|
|
packet_batch_receiver,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Handles receiving packet batches from sigverify and returns a vector of deserialized packets
|
|
|
|
pub fn handle_received_packets(
|
|
|
|
&self,
|
|
|
|
recv_timeout: Duration,
|
|
|
|
capacity: usize,
|
|
|
|
) -> Result<ReceivePacketResults, RecvTimeoutError> {
|
|
|
|
let (packet_batches, sigverify_tracer_stats_option) =
|
|
|
|
self.receive_until(recv_timeout, capacity)?;
|
2022-09-06 13:54:31 -07:00
|
|
|
Ok(Self::deserialize_and_collect_packets(
|
|
|
|
&packet_batches,
|
|
|
|
sigverify_tracer_stats_option,
|
|
|
|
))
|
|
|
|
}
|
2022-09-01 08:00:48 -07:00
|
|
|
|
2022-09-06 13:54:31 -07:00
|
|
|
/// Deserialize packet batches and collect them into ReceivePacketResults
|
|
|
|
fn deserialize_and_collect_packets(
|
|
|
|
packet_batches: &[PacketBatch],
|
|
|
|
sigverify_tracer_stats_option: Option<SigverifyTracerPacketStats>,
|
|
|
|
) -> ReceivePacketResults {
|
2022-09-01 08:00:48 -07:00
|
|
|
let packet_count: usize = packet_batches.iter().map(|x| x.len()).sum();
|
|
|
|
let mut passed_sigverify_count: usize = 0;
|
|
|
|
let mut failed_sigverify_count: usize = 0;
|
|
|
|
let mut deserialized_packets = Vec::with_capacity(packet_count);
|
|
|
|
for packet_batch in packet_batches {
|
2022-09-06 13:54:31 -07:00
|
|
|
let packet_indexes = Self::generate_packet_indexes(packet_batch);
|
2022-09-01 08:00:48 -07:00
|
|
|
|
|
|
|
passed_sigverify_count += packet_indexes.len();
|
|
|
|
failed_sigverify_count += packet_batch.len().saturating_sub(packet_indexes.len());
|
|
|
|
|
2022-09-06 13:54:31 -07:00
|
|
|
deserialized_packets.extend(Self::deserialize_packets(packet_batch, &packet_indexes));
|
2022-09-01 08:00:48 -07:00
|
|
|
}
|
|
|
|
|
2022-09-06 13:54:31 -07:00
|
|
|
ReceivePacketResults {
|
2022-09-01 08:00:48 -07:00
|
|
|
deserialized_packets,
|
|
|
|
new_tracer_stats_option: sigverify_tracer_stats_option,
|
|
|
|
passed_sigverify_count: passed_sigverify_count as u64,
|
|
|
|
failed_sigverify_count: failed_sigverify_count as u64,
|
2022-09-06 13:54:31 -07:00
|
|
|
}
|
2022-09-01 08:00:48 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Receives packet batches from sigverify stage with a timeout, and aggregates tracer packet stats
|
|
|
|
fn receive_until(
|
|
|
|
&self,
|
|
|
|
recv_timeout: Duration,
|
|
|
|
packet_count_upperbound: usize,
|
|
|
|
) -> Result<(Vec<PacketBatch>, Option<SigverifyTracerPacketStats>), RecvTimeoutError> {
|
|
|
|
let start = Instant::now();
|
2023-01-16 21:21:17 -08:00
|
|
|
|
|
|
|
let (mut packet_batches_received_so_far, mut aggregated_tracer_packet_stats_option) =
|
2022-09-01 08:00:48 -07:00
|
|
|
self.packet_batch_receiver.recv_timeout(recv_timeout)?;
|
2023-01-16 21:21:17 -08:00
|
|
|
let mut num_packets_received = packet_batches_received_so_far
|
|
|
|
.iter()
|
|
|
|
.map(|batch| batch.len())
|
|
|
|
.sum::<usize>();
|
2022-09-01 08:00:48 -07:00
|
|
|
|
2023-01-16 21:21:17 -08:00
|
|
|
while let Ok((packet_batches, tracer_packet_stats_option)) =
|
2022-09-01 08:00:48 -07:00
|
|
|
self.packet_batch_receiver.try_recv()
|
|
|
|
{
|
|
|
|
trace!("got more packet batches in packet deserializer");
|
2023-01-16 21:21:17 -08:00
|
|
|
num_packets_received += packet_batches
|
|
|
|
.iter()
|
|
|
|
.map(|batch| batch.len())
|
|
|
|
.sum::<usize>();
|
|
|
|
packet_batches_received_so_far.extend(packet_batches);
|
2022-09-01 08:00:48 -07:00
|
|
|
|
|
|
|
if let Some(tracer_packet_stats) = &tracer_packet_stats_option {
|
|
|
|
if let Some(aggregated_tracer_packet_stats) =
|
|
|
|
&mut aggregated_tracer_packet_stats_option
|
|
|
|
{
|
|
|
|
aggregated_tracer_packet_stats.aggregate(tracer_packet_stats);
|
|
|
|
} else {
|
|
|
|
aggregated_tracer_packet_stats_option = tracer_packet_stats_option;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-01-16 21:21:17 -08:00
|
|
|
if start.elapsed() >= recv_timeout || num_packets_received >= packet_count_upperbound {
|
2022-09-01 08:00:48 -07:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-01-16 21:21:17 -08:00
|
|
|
Ok((
|
|
|
|
packet_batches_received_so_far,
|
|
|
|
aggregated_tracer_packet_stats_option,
|
|
|
|
))
|
2022-09-01 08:00:48 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
fn generate_packet_indexes(packet_batch: &PacketBatch) -> Vec<usize> {
|
|
|
|
packet_batch
|
|
|
|
.iter()
|
|
|
|
.enumerate()
|
2022-12-06 03:54:49 -08:00
|
|
|
.filter(|(_, pkt)| !pkt.meta().discard())
|
2022-09-01 08:00:48 -07:00
|
|
|
.map(|(index, _)| index)
|
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn deserialize_packets<'a>(
|
|
|
|
packet_batch: &'a PacketBatch,
|
|
|
|
packet_indexes: &'a [usize],
|
|
|
|
) -> impl Iterator<Item = ImmutableDeserializedPacket> + 'a {
|
|
|
|
packet_indexes.iter().filter_map(move |packet_index| {
|
|
|
|
ImmutableDeserializedPacket::new(packet_batch[*packet_index].clone(), None).ok()
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
2022-09-07 10:52:18 -07:00
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use {
        super::*,
        solana_perf::packet::to_packet_batches,
        solana_sdk::{
            hash::Hash, pubkey::Pubkey, signature::Keypair, system_transaction,
            transaction::Transaction,
        },
    };

    /// Builds a transfer transaction from a fresh keypair to a unique pubkey, with an amount
    /// of 1 and the default hash.
    fn random_transfer() -> Transaction {
        let sender = Keypair::new();
        let recipient = Pubkey::new_unique();
        system_transaction::transfer(&sender, &recipient, 1, Hash::default())
    }

    /// Checks the deserialized-packet count and both sigverify counters of `results`, and
    /// that no tracer stats were attached.
    fn assert_results(
        results: &ReceivePacketResults,
        expected_deserialized: usize,
        expected_passed: u64,
        expected_failed: u64,
    ) {
        assert_eq!(results.deserialized_packets.len(), expected_deserialized);
        assert!(results.new_tracer_stats_option.is_none());
        assert_eq!(results.passed_sigverify_count, expected_passed);
        assert_eq!(results.failed_sigverify_count, expected_failed);
    }

    /// No batches in, nothing out.
    #[test]
    fn test_deserialize_and_collect_packets_empty() {
        let results = PacketDeserializer::deserialize_and_collect_packets(&[], None);
        assert_results(&results, 0, 0, 0);
    }

    /// Two single-packet batches, both of which pass sigverify and deserialize.
    #[test]
    fn test_deserialize_and_collect_packets_simple_batches() {
        let txs = vec![random_transfer(), random_transfer()];
        let batches = to_packet_batches(&txs, 1);
        assert_eq!(batches.len(), 2);

        let results = PacketDeserializer::deserialize_and_collect_packets(&batches, None);
        assert_results(&results, 2, 2, 0);
    }

    /// Two single-packet batches, the first of which is flagged as discard.
    #[test]
    fn test_deserialize_and_collect_packets_simple_batches_with_failure() {
        let txs = vec![random_transfer(), random_transfer()];
        let mut batches = to_packet_batches(&txs, 1);
        assert_eq!(batches.len(), 2);
        batches[0][0].meta_mut().set_discard(true);

        let results = PacketDeserializer::deserialize_and_collect_packets(&batches, None);
        assert_results(&results, 1, 1, 1);
    }
}
|