Refactor Sigverify trait (#25359)

This commit is contained in:
carllin 2022-05-24 16:01:41 -05:00 committed by GitHub
parent e1684e94f5
commit 9651cdad99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 361 additions and 117 deletions

View File

@ -373,7 +373,9 @@ fn main() {
packet_batch_index,
timestamp(),
);
verified_sender.send(vec![packet_batch.clone()]).unwrap();
verified_sender
.send((vec![packet_batch.clone()], None))
.unwrap();
}
for tx in &packets_for_this_iteration.transactions {

View File

@ -256,7 +256,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
for xv in v {
sent += xv.len();
}
verified_sender.send(v.to_vec()).unwrap();
verified_sender.send((v.to_vec(), None)).unwrap();
}
check_txs(&signal_receiver2, txes / CHUNKS);

View File

@ -51,7 +51,7 @@ fn run_bench_packet_discard(num_ips: usize, bencher: &mut Bencher) {
info!("total packets: {}", total);
bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
SigVerifyStage::discard_excess_packets(&mut batches, 10_000, |_| ());
let mut num_packets = 0;
for batch in batches.iter_mut() {
for p in batch.iter_mut() {
@ -98,7 +98,7 @@ fn bench_packet_discard_mixed_senders(bencher: &mut Bencher) {
}
}
bencher.iter(move || {
SigVerifyStage::discard_excess_packets(&mut batches, 10_000);
SigVerifyStage::discard_excess_packets(&mut batches, 10_000, |_| ());
let mut num_packets = 0;
for batch in batches.iter_mut() {
for packet in batch.iter_mut() {
@ -142,8 +142,8 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
trace!("start");
let (packet_s, packet_r) = unbounded();
let (verified_s, verified_r) = unbounded();
let verifier = TransactionSigVerifier::default();
let stage = SigVerifyStage::new(packet_r, verified_s, verifier, "bench");
let verifier = TransactionSigVerifier::new(verified_s);
let stage = SigVerifyStage::new(packet_r, verifier, "bench");
let use_same_tx = true;
bencher.iter(move || {
@ -165,7 +165,7 @@ fn bench_sigverify_stage(bencher: &mut Bencher) {
let mut received = 0;
trace!("sent: {}", sent_len);
loop {
if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) {
if let Ok((mut verifieds, _)) = verified_r.recv_timeout(Duration::from_millis(10)) {
while let Some(v) = verifieds.pop() {
received += v.len();
batches.push(v);

View File

@ -8,9 +8,12 @@ use {
LeaderExecuteAndCommitTimings, RecordTransactionsTimings,
},
qos_service::QosService,
sigverify::TransactionTracerPacketStats,
unprocessed_packet_batches::{self, *},
},
crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError},
crossbeam_channel::{
Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender,
},
histogram::Histogram,
itertools::Itertools,
min_max_heap::MinMaxHeap,
@ -87,6 +90,9 @@ const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING
const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128;
const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10);
pub type BankingPacketBatch = (Vec<PacketBatch>, Option<TransactionTracerPacketStats>);
pub type BankingPacketSender = CrossbeamSender<BankingPacketBatch>;
pub type BankingPacketReceiver = CrossbeamReceiver<BankingPacketBatch>;
pub struct ProcessTransactionBatchOutput {
// The number of transactions filtered out by the cost model
@ -381,9 +387,9 @@ impl BankingStage {
pub fn new(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: BankingPacketReceiver,
tpu_verified_vote_receiver: BankingPacketReceiver,
verified_vote_receiver: BankingPacketReceiver,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
cost_model: Arc<RwLock<CostModel>>,
@ -405,9 +411,9 @@ impl BankingStage {
pub fn new_num_threads(
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
tpu_verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
verified_vote_receiver: CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: BankingPacketReceiver,
tpu_verified_vote_receiver: BankingPacketReceiver,
verified_vote_receiver: BankingPacketReceiver,
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
gossip_vote_sender: ReplayVoteSender,
@ -989,7 +995,7 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
fn process_loop(
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: &BankingPacketReceiver,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &ClusterInfo,
recv_start: &mut Instant,
@ -1984,14 +1990,15 @@ impl BankingStage {
}
fn receive_until(
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: &BankingPacketReceiver,
recv_timeout: Duration,
packet_count_upperbound: usize,
) -> Result<Vec<PacketBatch>, RecvTimeoutError> {
let start = Instant::now();
let mut packet_batches = verified_receiver.recv_timeout(recv_timeout)?;
let (mut packet_batches, _tracer_packet_stats_option) =
verified_receiver.recv_timeout(recv_timeout)?;
let mut num_packets_received: usize = packet_batches.iter().map(|batch| batch.len()).sum();
while let Ok(packet_batch) = verified_receiver.try_recv() {
while let Ok((packet_batch, _tracer_packet_stats_option)) = verified_receiver.try_recv() {
trace!("got more packet batches in banking stage");
let (packets_received, packet_count_overflowed) = num_packets_received
.overflowing_add(packet_batch.iter().map(|batch| batch.len()).sum());
@ -2013,7 +2020,7 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
/// Receive incoming packets, push into unprocessed buffer with packet indexes
fn receive_and_buffer_packets(
verified_receiver: &CrossbeamReceiver<Vec<PacketBatch>>,
verified_receiver: &BankingPacketReceiver,
recv_start: &mut Instant,
recv_timeout: Duration,
id: u32,
@ -2414,7 +2421,7 @@ mod tests {
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
verified_sender // no_ver, anf, tx
.send(packet_batches)
.send((packet_batches, None))
.unwrap();
drop(verified_sender);
@ -2486,7 +2493,7 @@ mod tests {
.map(|batch| (batch, vec![1u8]))
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
verified_sender.send(packet_batches).unwrap();
verified_sender.send((packet_batches, None)).unwrap();
// Process a second batch that uses the same from account, so conflicts with above TX
let tx =
@ -2497,7 +2504,7 @@ mod tests {
.map(|batch| (batch, vec![1u8]))
.collect();
let packet_batches = convert_from_old_verified(packet_batches);
verified_sender.send(packet_batches).unwrap();
verified_sender.send((packet_batches, None)).unwrap();
let (vote_sender, vote_receiver) = unbounded();
let (tpu_vote_sender, tpu_vote_receiver) = unbounded();

View File

@ -1,5 +1,6 @@
use {
crate::{
banking_stage::BankingPacketSender,
optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
replay_stage::DUPLICATE_THRESHOLD,
result::{Error, Result},
@ -18,7 +19,7 @@ use {
solana_ledger::blockstore::Blockstore,
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_debug,
solana_perf::packet::{self, PacketBatch},
solana_perf::packet,
solana_poh::poh_recorder::PohRecorder,
solana_rpc::{
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender},
@ -190,7 +191,7 @@ impl ClusterInfoVoteListener {
pub fn new(
exit: Arc<AtomicBool>,
cluster_info: Arc<ClusterInfo>,
verified_packets_sender: Sender<Vec<PacketBatch>>,
verified_packets_sender: BankingPacketSender,
poh_recorder: Arc<Mutex<PohRecorder>>,
vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>,
@ -333,7 +334,7 @@ impl ClusterInfoVoteListener {
exit: Arc<AtomicBool>,
verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver,
poh_recorder: Arc<Mutex<PohRecorder>>,
verified_packets_sender: &Sender<Vec<PacketBatch>>,
verified_packets_sender: &BankingPacketSender,
) -> Result<()> {
let mut verified_vote_packets = VerifiedVotePackets::default();
let mut time_since_lock = Instant::now();
@ -382,7 +383,7 @@ impl ClusterInfoVoteListener {
fn check_for_leader_bank_and_send_votes(
bank_vote_sender_state_option: &mut Option<BankVoteSenderState>,
current_working_bank: Arc<Bank>,
verified_packets_sender: &Sender<Vec<PacketBatch>>,
verified_packets_sender: &BankingPacketSender,
verified_vote_packets: &VerifiedVotePackets,
) -> Result<()> {
// We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS`
@ -423,7 +424,7 @@ impl ClusterInfoVoteListener {
for single_validator_votes in gossip_votes_iterator {
bank_send_votes_stats.num_votes_sent += single_validator_votes.len();
bank_send_votes_stats.num_batches_sent += 1;
verified_packets_sender.send(single_validator_votes)?;
verified_packets_sender.send((single_validator_votes, None))?;
}
filter_gossip_votes_timing.stop();
bank_send_votes_stats.total_elapsed += filter_gossip_votes_timing.as_us();

View File

@ -8,30 +8,45 @@ pub use solana_perf::sigverify::{
count_packets_in_batches, ed25519_verify_cpu, ed25519_verify_disabled, init, TxOffset,
};
use {
crate::sigverify_stage::SigVerifier,
crate::{
banking_stage::BankingPacketBatch,
sigverify_stage::{SigVerifier, SigVerifyServiceError},
},
crossbeam_channel::Sender,
solana_perf::{cuda_runtime::PinnedVec, packet::PacketBatch, recycler::Recycler, sigverify},
solana_sdk::packet::Packet,
};
#[derive(Debug, Default, Clone)]
pub struct TransactionTracerPacketStats {
pub total_removed_before_sigverify_stage: usize,
pub total_tracer_packets_received_in_sigverify_stage: usize,
pub total_tracer_packets_deduped: usize,
pub total_excess_tracer_packets: usize,
pub total_tracker_packets_passed_sigverify: usize,
}
#[derive(Clone)]
pub struct TransactionSigVerifier {
packet_sender: Sender<<Self as SigVerifier>::SendType>,
tracer_packet_stats: TransactionTracerPacketStats,
recycler: Recycler<TxOffset>,
recycler_out: Recycler<PinnedVec<u8>>,
reject_non_vote: bool,
}
impl TransactionSigVerifier {
pub fn new_reject_non_vote() -> Self {
TransactionSigVerifier {
reject_non_vote: true,
..TransactionSigVerifier::default()
}
pub fn new_reject_non_vote(packet_sender: Sender<<Self as SigVerifier>::SendType>) -> Self {
let mut new_self = Self::new(packet_sender);
new_self.reject_non_vote = true;
new_self
}
}
impl Default for TransactionSigVerifier {
fn default() -> Self {
pub fn new(packet_sender: Sender<<Self as SigVerifier>::SendType>) -> Self {
init();
Self {
packet_sender,
tracer_packet_stats: TransactionTracerPacketStats::default(),
recycler: Recycler::warmed(50, 4096),
recycler_out: Recycler::warmed(50, 4096),
reject_non_vote: false,
@ -40,6 +55,58 @@ impl Default for TransactionSigVerifier {
}
impl SigVerifier for TransactionSigVerifier {
type SendType = BankingPacketBatch;
#[inline(always)]
fn process_received_packet(
&mut self,
packet: &mut Packet,
removed_before_sigverify_stage: bool,
is_dup: bool,
) {
if packet.meta.is_tracer_packet() {
if removed_before_sigverify_stage {
self.tracer_packet_stats
.total_removed_before_sigverify_stage += 1;
} else {
self.tracer_packet_stats
.total_tracer_packets_received_in_sigverify_stage += 1;
if is_dup {
self.tracer_packet_stats.total_tracer_packets_deduped += 1;
}
}
}
}
#[inline(always)]
fn process_excess_packet(&mut self, packet: &Packet) {
if packet.meta.is_tracer_packet() {
self.tracer_packet_stats.total_excess_tracer_packets += 1;
}
}
#[inline(always)]
fn process_passed_sigverify_packet(&mut self, packet: &Packet) {
if packet.meta.is_tracer_packet() {
self.tracer_packet_stats
.total_tracker_packets_passed_sigverify += 1;
}
}
fn send_packets(
&mut self,
packet_batches: Vec<PacketBatch>,
) -> Result<(), SigVerifyServiceError<Self::SendType>> {
let mut tracer_packet_stats_to_send = TransactionTracerPacketStats::default();
std::mem::swap(
&mut tracer_packet_stats_to_send,
&mut self.tracer_packet_stats,
);
self.packet_sender
.send((packet_batches, Some(tracer_packet_stats_to_send)))?;
Ok(())
}
fn verify_batches(
&self,
mut batches: Vec<PacketBatch>,

View File

@ -1,6 +1,11 @@
#![allow(clippy::implicit_hasher)]
use {
crate::{sigverify, sigverify_stage::SigVerifier},
crate::{
sigverify,
sigverify_stage::{SigVerifier, SigVerifyServiceError},
},
crossbeam_channel::Sender,
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache, shred::Shred,
sigverify_shreds::verify_shreds_gpu,
@ -18,18 +23,21 @@ pub struct ShredSigVerifier {
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
recycler_cache: RecyclerCache,
packet_sender: Sender<Vec<PacketBatch>>,
}
impl ShredSigVerifier {
pub fn new(
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
packet_sender: Sender<Vec<PacketBatch>>,
) -> Self {
sigverify::init();
Self {
bank_forks,
leader_schedule_cache,
recycler_cache: RecyclerCache::warmed(),
packet_sender,
}
}
fn read_slots(batches: &[PacketBatch]) -> HashSet<u64> {
@ -41,6 +49,16 @@ impl ShredSigVerifier {
}
impl SigVerifier for ShredSigVerifier {
type SendType = Vec<PacketBatch>;
fn send_packets(
&mut self,
packet_batches: Vec<PacketBatch>,
) -> Result<(), SigVerifyServiceError<Self::SendType>> {
self.packet_sender.send(packet_batches)?;
Ok(())
}
fn verify_batches(
&self,
mut batches: Vec<PacketBatch>,
@ -69,6 +87,7 @@ impl SigVerifier for ShredSigVerifier {
pub mod tests {
use {
super::*,
crossbeam_channel::unbounded,
solana_ledger::{
genesis_utils::create_genesis_config_with_leader,
shred::{Shred, ShredFlags},
@ -131,7 +150,8 @@ pub mod tests {
);
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let bf = Arc::new(RwLock::new(BankForks::new(bank)));
let verifier = ShredSigVerifier::new(bf, cache);
let (sender, receiver) = unbounded();
let mut verifier = ShredSigVerifier::new(bf, cache, sender);
let batch_size = 2;
let mut batch = PacketBatch::with_capacity(batch_size);
@ -171,5 +191,16 @@ pub mod tests {
let rv = verifier.verify_batches(batches, num_packets);
assert!(!rv[0][0].meta.discard());
assert!(rv[0][1].meta.discard());
verifier.send_packets(rv.clone()).unwrap();
let received_packets = receiver.recv().unwrap();
assert_eq!(received_packets.len(), rv.len());
for (received_packet_batch, original_packet_batch) in received_packets.iter().zip(rv.iter())
{
assert_eq!(
received_packet_batch.iter().collect::<Vec<_>>(),
original_packet_batch.iter().collect::<Vec<_>>()
);
}
}
}

View File

@ -8,11 +8,11 @@
use {
crate::{find_packet_sender_stake_stage, sigverify},
core::time::Duration,
crossbeam_channel::{RecvTimeoutError, SendError, Sender},
crossbeam_channel::{RecvTimeoutError, SendError},
itertools::Itertools,
solana_measure::measure::Measure,
solana_perf::{
packet::PacketBatch,
packet::{Packet, PacketBatch},
sigverify::{count_valid_packets, shrink_batches, Deduper},
},
solana_sdk::timing,
@ -33,22 +33,33 @@ const MAX_DEDUP_BATCH: usize = 165_000;
const MAX_SIGVERIFY_BATCH: usize = 2_000;
#[derive(Error, Debug)]
pub enum SigVerifyServiceError {
pub enum SigVerifyServiceError<SendType> {
#[error("send packets batch error")]
Send(#[from] SendError<Vec<PacketBatch>>),
Send(#[from] SendError<SendType>),
#[error("streamer error")]
Streamer(#[from] StreamerError),
}
type Result<T> = std::result::Result<T, SigVerifyServiceError>;
type Result<T, SendType> = std::result::Result<T, SigVerifyServiceError<SendType>>;
pub struct SigVerifyStage {
thread_hdl: JoinHandle<()>,
}
pub trait SigVerifier {
type SendType: std::fmt::Debug;
fn verify_batches(&self, batches: Vec<PacketBatch>, valid_packets: usize) -> Vec<PacketBatch>;
fn process_received_packet(
&mut self,
_packet: &mut Packet,
_removed_before_sigverify_stage: bool,
_is_dup: bool,
) {
}
fn process_excess_packet(&mut self, _packet: &Packet) {}
fn process_passed_sigverify_packet(&mut self, _packet: &Packet) {}
fn send_packets(&mut self, packet_batches: Vec<PacketBatch>) -> Result<(), Self::SendType>;
}
#[derive(Default, Clone)]
@ -199,6 +210,7 @@ impl SigVerifierStats {
}
impl SigVerifier for DisabledSigVerifier {
type SendType = ();
fn verify_batches(
&self,
mut batches: Vec<PacketBatch>,
@ -207,21 +219,28 @@ impl SigVerifier for DisabledSigVerifier {
sigverify::ed25519_verify_disabled(&mut batches);
batches
}
fn send_packets(&mut self, _packet_batches: Vec<PacketBatch>) -> Result<(), Self::SendType> {
Ok(())
}
}
impl SigVerifyStage {
#[allow(clippy::new_ret_no_self)]
pub fn new<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
verified_sender: Sender<Vec<PacketBatch>>,
verifier: T,
name: &'static str,
) -> Self {
let thread_hdl = Self::verifier_services(packet_receiver, verified_sender, verifier, name);
let thread_hdl = Self::verifier_services(packet_receiver, verifier, name);
Self { thread_hdl }
}
pub fn discard_excess_packets(batches: &mut [PacketBatch], mut max_packets: usize) {
pub fn discard_excess_packets(
batches: &mut [PacketBatch],
mut max_packets: usize,
mut process_excess_packet: impl FnMut(&Packet),
) {
// Group packets by their incoming IP address.
let mut addrs = batches
.iter_mut()
@ -242,6 +261,7 @@ impl SigVerifyStage {
}
// Discard excess packets from each address.
for packet in addrs.into_values().flatten() {
process_excess_packet(packet);
packet.meta.set_discard(true);
}
}
@ -249,10 +269,9 @@ impl SigVerifyStage {
fn verifier<T: SigVerifier>(
deduper: &Deduper,
recvr: &find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
sendr: &Sender<Vec<PacketBatch>>,
verifier: &T,
verifier: &mut T,
stats: &mut SigVerifierStats,
) -> Result<()> {
) -> Result<(), T::SendType> {
let (mut batches, num_packets, recv_duration) = streamer::recv_vec_packet_batches(recvr)?;
let batches_len = batches.len();
@ -272,14 +291,29 @@ impl SigVerifyStage {
discard_random_time.stop();
let mut dedup_time = Measure::start("sigverify_dedup_time");
let discard_or_dedup_fail = deduper.dedup_packets_and_count_discards(&mut batches) as usize;
let discard_or_dedup_fail = deduper.dedup_packets_and_count_discards(
&mut batches,
#[inline(always)]
|received_packet, removed_before_sigverify_stage, is_dup| {
verifier.process_received_packet(
received_packet,
removed_before_sigverify_stage,
is_dup,
);
},
) as usize;
dedup_time.stop();
let num_unique = non_discarded_packets.saturating_sub(discard_or_dedup_fail);
let mut discard_time = Measure::start("sigverify_discard_time");
let mut num_valid_packets = num_unique;
if num_unique > MAX_SIGVERIFY_BATCH {
Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH);
Self::discard_excess_packets(
&mut batches,
MAX_SIGVERIFY_BATCH,
#[inline(always)]
|excess_packet| verifier.process_excess_packet(excess_packet),
);
num_valid_packets = MAX_SIGVERIFY_BATCH;
}
let excess_fail = num_unique.saturating_sub(MAX_SIGVERIFY_BATCH);
@ -290,7 +324,11 @@ impl SigVerifyStage {
verify_time.stop();
let mut shrink_time = Measure::start("sigverify_shrink_time");
let num_valid_packets = count_valid_packets(&batches);
let num_valid_packets = count_valid_packets(
&batches,
#[inline(always)]
|valid_packet| verifier.process_passed_sigverify_packet(valid_packet),
);
let start_len = batches.len();
const MAX_EMPTY_BATCH_RATIO: usize = 4;
if non_discarded_packets > num_valid_packets.saturating_mul(MAX_EMPTY_BATCH_RATIO) {
@ -300,7 +338,7 @@ impl SigVerifyStage {
let total_shrinks = start_len.saturating_sub(batches.len());
shrink_time.stop();
sendr.send(batches)?;
verifier.send_packets(batches)?;
debug!(
"@{:?} verifier: done. batches: {} total verify time: {:?} verified: {} v/s {}",
@ -347,11 +385,9 @@ impl SigVerifyStage {
fn verifier_service<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
verified_sender: Sender<Vec<PacketBatch>>,
verifier: &T,
mut verifier: T,
name: &'static str,
) -> JoinHandle<()> {
let verifier = verifier.clone();
let mut stats = SigVerifierStats::default();
let mut last_print = Instant::now();
const MAX_DEDUPER_AGE: Duration = Duration::from_secs(2);
@ -362,13 +398,9 @@ impl SigVerifyStage {
let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE);
loop {
deduper.reset();
if let Err(e) = Self::verifier(
&deduper,
&packet_receiver,
&verified_sender,
&verifier,
&mut stats,
) {
if let Err(e) =
Self::verifier(&deduper, &packet_receiver, &mut verifier, &mut stats)
{
match e {
SigVerifyServiceError::Streamer(StreamerError::RecvTimeout(
RecvTimeoutError::Disconnected,
@ -394,11 +426,10 @@ impl SigVerifyStage {
fn verifier_services<T: SigVerifier + 'static + Send + Clone>(
packet_receiver: find_packet_sender_stake_stage::FindPacketSenderStakeReceiver,
verified_sender: Sender<Vec<PacketBatch>>,
verifier: T,
name: &'static str,
) -> JoinHandle<()> {
Self::verifier_service(packet_receiver, verified_sender, &verifier, name)
Self::verifier_service(packet_receiver, verifier, name)
}
pub fn join(self) -> thread::Result<()> {
@ -416,6 +447,7 @@ mod tests {
packet::{to_packet_batches, Packet},
test_tx::test_tx,
},
solana_sdk::packet::PacketFlags,
};
fn count_non_discard(packet_batches: &[PacketBatch]) -> usize {
@ -435,27 +467,48 @@ mod tests {
solana_logger::setup();
let batch_size = 10;
let mut batch = PacketBatch::with_capacity(batch_size);
batch.resize(batch_size, Packet::default());
let mut tracer_packet = Packet::default();
tracer_packet.meta.flags |= PacketFlags::TRACER_PACKET;
batch.resize(batch_size, tracer_packet);
batch[3].meta.addr = std::net::IpAddr::from([1u16; 8]);
batch[3].meta.set_discard(true);
let num_discarded_before_filter = 1;
batch[4].meta.addr = std::net::IpAddr::from([2u16; 8]);
let total_num_packets = batch.len();
let mut batches = vec![batch];
let max = 3;
SigVerifyStage::discard_excess_packets(&mut batches, max);
assert_eq!(count_non_discard(&batches), max);
let mut total_tracer_packets_discarded = 0;
SigVerifyStage::discard_excess_packets(&mut batches, max, |packet| {
if packet.meta.is_tracer_packet() {
total_tracer_packets_discarded += 1;
}
});
let total_non_discard = count_non_discard(&batches);
let total_discarded = total_num_packets - total_non_discard;
// Every packet except the packets already marked `discard` before the call
// to `discard_excess_packets()` should count towards the
// `total_tracer_packets_discarded`
assert_eq!(
total_tracer_packets_discarded,
total_discarded - num_discarded_before_filter
);
assert_eq!(total_non_discard, max);
assert!(!batches[0][0].meta.discard());
assert!(batches[0][3].meta.discard());
assert!(!batches[0][4].meta.discard());
}
fn gen_batches(use_same_tx: bool) -> Vec<PacketBatch> {
let len = 4096;
let chunk_size = 1024;
fn gen_batches(
use_same_tx: bool,
packets_per_batch: usize,
total_packets: usize,
) -> Vec<PacketBatch> {
if use_same_tx {
let tx = test_tx();
to_packet_batches(&vec![tx; len], chunk_size)
to_packet_batches(&vec![tx; total_packets], packets_per_batch)
} else {
let txs: Vec<_> = (0..len).map(|_| test_tx()).collect();
to_packet_batches(&txs, chunk_size)
let txs: Vec<_> = (0..total_packets).map(|_| test_tx()).collect();
to_packet_batches(&txs, packets_per_batch)
}
}
@ -465,12 +518,17 @@ mod tests {
trace!("start");
let (packet_s, packet_r) = unbounded();
let (verified_s, verified_r) = unbounded();
let verifier = TransactionSigVerifier::default();
let stage = SigVerifyStage::new(packet_r, verified_s, verifier, "test");
let verifier = TransactionSigVerifier::new(verified_s);
let stage = SigVerifyStage::new(packet_r, verifier, "test");
let use_same_tx = true;
let now = Instant::now();
let mut batches = gen_batches(use_same_tx);
let packets_per_batch = 128;
let total_packets = 1920;
// This is important so that we don't discard any packets and fail asserts below about
// `total_excess_tracer_packets`
assert!(total_packets < MAX_SIGVERIFY_BATCH);
let mut batches = gen_batches(use_same_tx, packets_per_batch, total_packets);
trace!(
"starting... generation took: {} ms batches: {}",
duration_as_ms(&now.elapsed()),
@ -479,25 +537,72 @@ mod tests {
let mut sent_len = 0;
for _ in 0..batches.len() {
if let Some(batch) = batches.pop() {
if let Some(mut batch) = batches.pop() {
sent_len += batch.len();
batch
.iter_mut()
.for_each(|packet| packet.meta.flags |= PacketFlags::TRACER_PACKET);
assert_eq!(batch.len(), packets_per_batch);
packet_s.send(vec![batch]).unwrap();
}
}
let mut received = 0;
let mut total_tracer_packets_received_in_sigverify_stage = 0;
trace!("sent: {}", sent_len);
loop {
if let Ok(mut verifieds) = verified_r.recv_timeout(Duration::from_millis(10)) {
if let Ok((mut verifieds, tracer_packet_stats_option)) = verified_r.recv() {
let tracer_packet_stats = tracer_packet_stats_option.unwrap();
total_tracer_packets_received_in_sigverify_stage +=
tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage;
assert_eq!(
tracer_packet_stats.total_tracer_packets_received_in_sigverify_stage
% packets_per_batch,
0,
);
if use_same_tx {
// Every transaction other than the very first one in the very first batch
// should be deduped.
// Also have to account for the fact that deduper could be cleared periodically,
// in which case the first transaction in the next batch won't be deduped
assert!(
(tracer_packet_stats.total_tracer_packets_deduped
== tracer_packet_stats
.total_tracer_packets_received_in_sigverify_stage
- 1)
|| (tracer_packet_stats.total_tracer_packets_deduped
== tracer_packet_stats
.total_tracer_packets_received_in_sigverify_stage)
);
assert!(
(tracer_packet_stats.total_tracker_packets_passed_sigverify == 1)
|| (tracer_packet_stats.total_tracker_packets_passed_sigverify == 0)
);
} else {
assert_eq!(tracer_packet_stats.total_tracer_packets_deduped, 0);
assert!(
(tracer_packet_stats.total_tracker_packets_passed_sigverify
== tracer_packet_stats
.total_tracer_packets_received_in_sigverify_stage)
);
}
assert_eq!(tracer_packet_stats.total_excess_tracer_packets, 0);
while let Some(v) = verifieds.pop() {
received += v.len();
batches.push(v);
}
if use_same_tx || received >= sent_len {
break;
}
}
if total_tracer_packets_received_in_sigverify_stage >= sent_len {
break;
}
}
trace!("received: {}", received);
assert_eq!(
total_tracer_packets_received_in_sigverify_stage,
total_packets
);
drop(packet_s);
stage.join().unwrap();
}

View File

@ -159,22 +159,17 @@ impl Tpu {
.unwrap();
let sigverify_stage = {
let verifier = TransactionSigVerifier::default();
SigVerifyStage::new(
find_packet_sender_stake_receiver,
verified_sender,
verifier,
"tpu-verifier",
)
let verifier = TransactionSigVerifier::new(verified_sender);
SigVerifyStage::new(find_packet_sender_stake_receiver, verifier, "tpu-verifier")
};
let (verified_tpu_vote_packets_sender, verified_tpu_vote_packets_receiver) = unbounded();
let vote_sigverify_stage = {
let verifier = TransactionSigVerifier::new_reject_non_vote();
let verifier =
TransactionSigVerifier::new_reject_non_vote(verified_tpu_vote_packets_sender);
SigVerifyStage::new(
vote_find_packet_sender_stake_receiver,
verified_tpu_vote_packets_sender,
verifier,
"tpu-vote-verifier",
)

View File

@ -161,8 +161,11 @@ impl Tvu {
let (verified_sender, verified_receiver) = unbounded();
let sigverify_stage = SigVerifyStage::new(
fetch_receiver,
verified_sender,
ShredSigVerifier::new(bank_forks.clone(), leader_schedule_cache.clone()),
ShredSigVerifier::new(
bank_forks.clone(),
leader_schedule_cache.clone(),
verified_sender,
),
"shred-verifier",
);

View File

@ -26,7 +26,7 @@ fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>)
// verify packets
let mut deduper = sigverify::Deduper::new(1_000_000, Duration::from_millis(2_000));
bencher.iter(|| {
let _ans = deduper.dedup_packets_and_count_discards(&mut batches);
let _ans = deduper.dedup_packets_and_count_discards(&mut batches, |_, _, _| ());
deduper.reset();
batches
.iter_mut()

View File

@ -79,6 +79,6 @@ fn bench_shrink_count_packets(bencher: &mut Bencher) {
});
bencher.iter(|| {
let _ = sigverify::count_valid_packets(&batches);
let _ = sigverify::count_valid_packets(&batches, |_| ());
});
}

View File

@ -19,6 +19,7 @@ use {
hash::Hash,
message::{MESSAGE_HEADER_LENGTH, MESSAGE_VERSION_PREFIX},
pubkey::Pubkey,
saturating_add_assign,
short_vec::decode_shortu16_len,
signature::Signature,
},
@ -152,10 +153,10 @@ fn verify_packet(packet: &mut Packet, reject_non_vote: bool) {
}
// Check for tracer pubkey
if !packet.meta.is_tracer_tx()
if !packet.meta.is_tracer_packet()
&& &packet.data[pubkey_start..pubkey_end] == TRACER_KEY.as_ref()
{
packet.meta.flags |= PacketFlags::TRACER_TX;
packet.meta.flags |= PacketFlags::TRACER_PACKET;
}
pubkey_start = pubkey_end;
@ -167,10 +168,24 @@ pub fn count_packets_in_batches(batches: &[PacketBatch]) -> usize {
batches.iter().map(|batch| batch.len()).sum()
}
pub fn count_valid_packets(batches: &[PacketBatch]) -> usize {
pub fn count_valid_packets(
batches: &[PacketBatch],
mut process_valid_packet: impl FnMut(&Packet),
) -> usize {
batches
.iter()
.map(|batch| batch.iter().filter(|p| !p.meta.discard()).count())
.map(|batch| {
batch
.iter()
.filter(|p| {
let should_keep = !p.meta.discard();
if should_keep {
process_valid_packet(p);
}
should_keep
})
.count()
})
.sum()
}
@ -495,11 +510,23 @@ impl Deduper {
0
}
pub fn dedup_packets_and_count_discards(&self, batches: &mut [PacketBatch]) -> u64 {
batches
.iter_mut()
.flat_map(|batch| batch.iter_mut().map(|p| self.dedup_packet(p)))
.sum()
pub fn dedup_packets_and_count_discards(
&self,
batches: &mut [PacketBatch],
mut process_received_packet: impl FnMut(&mut Packet, bool, bool),
) -> u64 {
let mut num_removed: u64 = 0;
batches.iter_mut().for_each(|batch| {
batch.iter_mut().for_each(|p| {
let removed_before_sigverify = p.meta.discard();
let is_duplicate = self.dedup_packet(p);
if is_duplicate == 1 {
saturating_add_assign!(num_removed, 1);
}
process_received_packet(p, removed_before_sigverify, is_duplicate == 1);
})
});
num_removed
}
}
@ -1401,7 +1428,14 @@ mod tests {
to_packet_batches(&std::iter::repeat(tx).take(1024).collect::<Vec<_>>(), 128);
let packet_count = sigverify::count_packets_in_batches(&batches);
let filter = Deduper::new(1_000_000, Duration::from_millis(0));
let discard = filter.dedup_packets_and_count_discards(&mut batches) as usize;
let mut num_deduped = 0;
let discard = filter.dedup_packets_and_count_discards(
&mut batches,
|_deduped_packet, _removed_before_sigverify_stage, _is_dup| {
num_deduped += 1;
},
) as usize;
assert_eq!(num_deduped, discard + 1);
assert_eq!(packet_count, discard + 1);
}
@ -1409,8 +1443,7 @@ mod tests {
fn test_dedup_diff() {
let mut filter = Deduper::new(1_000_000, Duration::from_millis(0));
let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
let discard = filter.dedup_packets_and_count_discards(&mut batches) as usize;
let discard = filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
// because dedup uses a threadpool, there maybe up to N threads of txs that go through
assert_eq!(discard, 0);
filter.reset();
@ -1428,7 +1461,7 @@ mod tests {
for i in 0..1000 {
let mut batches =
to_packet_batches(&(0..1000).map(|_| test_tx()).collect::<Vec<_>>(), 128);
discard += filter.dedup_packets_and_count_discards(&mut batches) as usize;
discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
debug!("{} {}", i, discard);
if filter.saturated.load(Ordering::Relaxed) {
break;
@ -1444,7 +1477,7 @@ mod tests {
for i in 0..10 {
let mut batches =
to_packet_batches(&(0..1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
discard += filter.dedup_packets_and_count_discards(&mut batches) as usize;
discard += filter.dedup_packets_and_count_discards(&mut batches, |_, _, _| ()) as usize;
debug!("false positive rate: {}/{}", discard, i * 1024);
}
//allow for 1 false positive even if extremely unlikely
@ -1473,7 +1506,7 @@ mod tests {
});
start.sort_by_key(|p| p.data);
let packet_count = count_valid_packets(&batches);
let packet_count = count_valid_packets(&batches, |_| ());
let res = shrink_batches(&mut batches);
batches.truncate(res);
@ -1485,7 +1518,7 @@ mod tests {
.for_each(|p| end.push(p.clone()))
});
end.sort_by_key(|p| p.data);
let packet_count2 = count_valid_packets(&batches);
let packet_count2 = count_valid_packets(&batches, |_| ());
assert_eq!(packet_count, packet_count2);
assert_eq!(start, end);
}
@ -1642,13 +1675,13 @@ mod tests {
PACKETS_PER_BATCH,
);
assert_eq!(batches.len(), BATCH_COUNT);
assert_eq!(count_valid_packets(&batches), PACKET_COUNT);
assert_eq!(count_valid_packets(&batches, |_| ()), PACKET_COUNT);
batches.iter_mut().enumerate().for_each(|(i, b)| {
b.iter_mut()
.enumerate()
.for_each(|(j, p)| p.meta.set_discard(set_discard(i, j)))
});
assert_eq!(count_valid_packets(&batches), *expect_valid_packets);
assert_eq!(count_valid_packets(&batches, |_| ()), *expect_valid_packets);
debug!("show valid packets for case {}", i);
batches.iter_mut().enumerate().for_each(|(i, b)| {
b.iter_mut().enumerate().for_each(|(j, p)| {
@ -1662,7 +1695,7 @@ mod tests {
debug!("shrunk batch test {} count: {}", i, shrunken_batch_count);
assert_eq!(shrunken_batch_count, *expect_batch_count);
batches.truncate(shrunken_batch_count);
assert_eq!(count_valid_packets(&batches), *expect_valid_packets);
assert_eq!(count_valid_packets(&batches, |_| ()), *expect_valid_packets);
}
}
}

View File

@ -21,7 +21,7 @@ bitflags! {
const FORWARDED = 0b0000_0010;
const REPAIR = 0b0000_0100;
const SIMPLE_VOTE_TX = 0b0000_1000;
const TRACER_TX = 0b0001_0000;
const TRACER_PACKET = 0b0001_0000;
}
}
@ -148,8 +148,8 @@ impl Meta {
}
#[inline]
pub fn is_tracer_tx(&self) -> bool {
self.flags.contains(PacketFlags::TRACER_TX)
pub fn is_tracer_packet(&self) -> bool {
self.flags.contains(PacketFlags::TRACER_PACKET)
}
}