decouples shreds sig-verify from tpu vote and transaction packets (#26300)

Shreds have a different workload and traffic pattern from TPU vote and
transaction packets. Some of the recent changes to SigVerifyStage are not
suitable, or at least not optimal, for shreds sig-verify; e.g. random
discard, dedup with false positives, discarding excess packets by IP address, ...

The SigVerifier trait is meant to abstract away the distinctions between the
two pipelines, but in practice it has led to more verbose and convoluted
code.

This commit discards the SigVerifier implementation for shreds sig-verify
and instead provides a standalone stage for verifying shred signatures.
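
Below is a minimal, self-contained sketch of the pattern the new stage follows:
a dedicated thread drains a crossbeam channel with a receive timeout, coalesces
whatever batches are already queued, verifies them, and forwards the survivors
downstream. This is not the solana-core code: PacketBatch here is a stand-in
type, spawn_verifier is a hypothetical helper, and the verification step is
left as a pass-through.

use {
    crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
    std::{
        thread::{Builder, JoinHandle},
        time::Duration,
    },
};

// Stand-in for solana_perf::packet::PacketBatch.
type PacketBatch = Vec<Vec<u8>>;

fn spawn_verifier(
    receiver: Receiver<PacketBatch>,
    verified_sender: Sender<Vec<PacketBatch>>,
) -> JoinHandle<()> {
    Builder::new()
        .name("shred-verifier".to_string())
        .spawn(move || loop {
            // Block up to a second for the first batch, then greedily drain
            // anything else already queued so verification runs on larger
            // batches.
            let first = match receiver.recv_timeout(Duration::from_secs(1)) {
                Ok(batch) => batch,
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            };
            let batches: Vec<PacketBatch> = std::iter::once(first)
                .chain(receiver.try_iter())
                .collect();
            // Signature verification would happen here; the real code marks
            // failing packets as discarded rather than dropping whole batches.
            if verified_sender.send(batches).is_err() {
                break; // downstream receiver hung up
            }
        })
        .unwrap()
}

fn main() {
    let (fetch_sender, fetch_receiver) = unbounded();
    let (verified_sender, verified_receiver) = unbounded();
    let verifier = spawn_verifier(fetch_receiver, verified_sender);
    fetch_sender.send(vec![vec![0u8; 1203]]).unwrap();
    let verified = verified_receiver.recv().unwrap();
    println!("received {} coalesced batch group(s)", verified.len());
    drop(fetch_sender); // disconnect the channel so the verifier thread exits
    verifier.join().unwrap();
}

Coalescing with recv_timeout plus try_iter keeps the verifier working on larger
batches under bursty traffic, which mirrors what run_shred_sigverify in this
commit does before handing packets to verify_shreds_gpu.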
behzad nouri 2022-07-07 11:13:13 +00:00 committed by GitHub
parent 9723a33d2f
commit 6f4838719b
3 changed files with 193 additions and 127 deletions

core/src/shred_fetch_stage.rs

@@ -28,7 +28,7 @@ impl ShredFetchStage {
// updates packets received on a channel and sends them on another channel
fn modify_packets(
recvr: PacketBatchReceiver,
sendr: Sender<Vec<PacketBatch>>,
sendr: Sender<PacketBatch>,
bank_forks: &RwLock<BankForks>,
shred_version: u16,
name: &'static str,
@@ -46,7 +46,7 @@ impl ShredFetchStage {
let mut stats = ShredFetchStats::default();
let mut packet_hasher = PacketHasher::default();
while let Some(mut packet_batch) = recvr.iter().next() {
for mut packet_batch in recvr {
if last_updated.elapsed().as_millis() as u64 > DEFAULT_MS_PER_SLOT {
last_updated = Instant::now();
packet_hasher.reset();
@@ -79,7 +79,7 @@ impl ShredFetchStage {
}
}
stats.maybe_submit(name, STATS_SUBMIT_CADENCE);
if sendr.send(vec![packet_batch]).is_err() {
if sendr.send(packet_batch).is_err() {
break;
}
}
@@ -88,7 +88,7 @@ impl ShredFetchStage {
fn packet_modifier(
sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
sender: Sender<Vec<PacketBatch>>,
sender: Sender<PacketBatch>,
recycler: PacketBatchRecycler,
bank_forks: Arc<RwLock<BankForks>>,
shred_version: u16,
@@ -132,7 +132,7 @@ impl ShredFetchStage {
sockets: Vec<Arc<UdpSocket>>,
forward_sockets: Vec<Arc<UdpSocket>>,
repair_socket: Arc<UdpSocket>,
sender: Sender<Vec<PacketBatch>>,
sender: Sender<PacketBatch>,
shred_version: u16,
bank_forks: Arc<RwLock<BankForks>>,
exit: &Arc<AtomicBool>,

core/src/sigverify_shreds.rs

@@ -1,11 +1,5 @@
#![allow(clippy::implicit_hasher)]
use {
crate::{
sigverify,
sigverify_stage::{SigVerifier, SigVerifyServiceError},
},
crossbeam_channel::Sender,
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
},
@@ -18,87 +12,115 @@ use {
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{Builder, JoinHandle},
time::{Duration, Instant},
},
};
#[derive(Clone)]
pub struct ShredSigVerifier {
pubkey: Pubkey, // TODO: Hot swap will change pubkey.
#[allow(clippy::enum_variant_names)]
enum Error {
RecvDisconnected,
RecvTimeout,
SendError,
}
pub(crate) fn spawn_shred_sigverify(
// TODO: Hot swap will change pubkey.
self_pubkey: Pubkey,
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
recycler_cache: RecyclerCache,
shred_fetch_receiver: Receiver<PacketBatch>,
retransmit_sender: Sender<Vec</*shred:*/ Vec<u8>>>,
packet_sender: Sender<Vec<PacketBatch>>,
verified_sender: Sender<Vec<PacketBatch>>,
turbine_disabled: Arc<AtomicBool>,
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
Builder::new()
.name("shred-verifier".to_string())
.spawn(move || loop {
match run_shred_sigverify(
&self_pubkey,
&bank_forks,
&leader_schedule_cache,
&recycler_cache,
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
&turbine_disabled,
&mut stats,
) {
Ok(()) => (),
Err(Error::RecvTimeout) => (),
Err(Error::RecvDisconnected) => break,
Err(Error::SendError) => break,
}
stats.maybe_submit();
})
.unwrap()
}
impl ShredSigVerifier {
pub fn new(
pubkey: Pubkey,
bank_forks: Arc<RwLock<BankForks>>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
retransmit_sender: Sender<Vec</*shred:*/ Vec<u8>>>,
packet_sender: Sender<Vec<PacketBatch>>,
turbine_disabled: Arc<AtomicBool>,
) -> Self {
sigverify::init();
Self {
pubkey,
bank_forks,
leader_schedule_cache,
recycler_cache: RecyclerCache::warmed(),
retransmit_sender,
packet_sender,
turbine_disabled,
}
}
}
impl SigVerifier for ShredSigVerifier {
type SendType = Vec<PacketBatch>;
fn send_packets(
&mut self,
packet_batches: Vec<PacketBatch>,
) -> Result<(), SigVerifyServiceError<Self::SendType>> {
if self.turbine_disabled.load(Ordering::Relaxed) {
return Ok(());
}
// Exclude repair packets from retransmit.
// TODO: return the error here!
let _ = self.retransmit_sender.send(
packet_batches
.iter()
.flat_map(PacketBatch::iter)
.filter(|packet| !packet.meta.discard() && !packet.meta.repair())
.filter_map(shred::layout::get_shred)
.map(<[u8]>::to_vec)
.collect(),
);
self.packet_sender.send(packet_batches)?;
Ok(())
}
fn verify_batches(
&self,
mut batches: Vec<PacketBatch>,
_valid_packets: usize,
) -> Vec<PacketBatch> {
let working_bank = self.bank_forks.read().unwrap().working_bank();
let leader_slots: HashMap<Slot, [u8; 32]> = get_slot_leaders(
&self.pubkey,
&mut batches,
&self.leader_schedule_cache,
&working_bank,
)
.into_iter()
.filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes())))
.chain(std::iter::once((Slot::MAX, [0u8; 32])))
fn run_shred_sigverify(
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
turbine_disabled: &AtomicBool,
stats: &mut ShredSigVerifyStats,
) -> Result<(), Error> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let packets = shred_fetch_receiver.recv_timeout(RECV_TIMEOUT)?;
let mut packets: Vec<_> = std::iter::once(packets)
.chain(shred_fetch_receiver.try_iter())
.collect();
let r = verify_shreds_gpu(&batches, &leader_slots, &self.recycler_cache);
solana_perf::sigverify::mark_disabled(&mut batches, &r);
batches
let now = Instant::now();
stats.num_iters += 1;
stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
stats.num_discards_pre += count_discards(&packets);
verify_packets(
self_pubkey,
bank_forks,
leader_schedule_cache,
recycler_cache,
&mut packets,
);
stats.num_discards_post += count_discards(&packets);
// Exclude repair packets from retransmit.
let shreds: Vec<_> = packets
.iter()
.flat_map(PacketBatch::iter)
.filter(|packet| !packet.meta.discard() && !packet.meta.repair())
.filter_map(shred::layout::get_shred)
.map(<[u8]>::to_vec)
.collect();
stats.num_retransmit_shreds += shreds.len();
if !turbine_disabled.load(Ordering::Relaxed) {
retransmit_sender.send(shreds)?;
verified_sender.send(packets)?;
}
stats.elapsed_micros += now.elapsed().as_micros() as u64;
Ok(())
}
fn verify_packets(
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
recycler_cache: &RecyclerCache,
packets: &mut [PacketBatch],
) {
let working_bank = bank_forks.read().unwrap().working_bank();
let leader_slots: HashMap<Slot, [u8; 32]> =
get_slot_leaders(self_pubkey, packets, leader_schedule_cache, &working_bank)
.into_iter()
.filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes())))
.chain(std::iter::once((Slot::MAX, [0u8; 32])))
.collect();
let out = verify_shreds_gpu(packets, &leader_slots, recycler_cache);
solana_perf::sigverify::mark_disabled(packets, &out);
}
// Returns pubkeys of leaders for shred slots referenced in the packets.
@@ -139,11 +161,75 @@ fn get_slot_leaders(
leaders
}
fn count_discards(packets: &[PacketBatch]) -> usize {
packets
.iter()
.flat_map(PacketBatch::iter)
.filter(|packet| packet.meta.discard())
.count()
}
impl From<RecvTimeoutError> for Error {
fn from(err: RecvTimeoutError) -> Self {
match err {
RecvTimeoutError::Timeout => Self::RecvTimeout,
RecvTimeoutError::Disconnected => Self::RecvDisconnected,
}
}
}
impl<T> From<SendError<T>> for Error {
fn from(_: SendError<T>) -> Self {
Self::SendError
}
}
struct ShredSigVerifyStats {
since: Instant,
num_iters: usize,
num_packets: usize,
num_discards_pre: usize,
num_discards_post: usize,
num_retransmit_shreds: usize,
elapsed_micros: u64,
}
impl ShredSigVerifyStats {
const METRICS_SUBMIT_CADENCE: Duration = Duration::from_secs(2);
fn new(now: Instant) -> Self {
Self {
since: now,
num_iters: 0usize,
num_packets: 0usize,
num_discards_pre: 0usize,
num_discards_post: 0usize,
num_retransmit_shreds: 0usize,
elapsed_micros: 0u64,
}
}
fn maybe_submit(&mut self) {
if self.since.elapsed() <= Self::METRICS_SUBMIT_CADENCE {
return;
}
datapoint_info!(
"shred_sigverify",
("num_iters", self.num_iters, i64),
("num_packets", self.num_packets, i64),
("num_discards_pre", self.num_discards_pre, i64),
("num_discards_post", self.num_discards_post, i64),
("num_retransmit_shreds", self.num_retransmit_shreds, i64),
("elapsed_micros", self.elapsed_micros, i64),
);
*self = Self::new(Instant::now());
}
}
#[cfg(test)]
pub mod tests {
mod tests {
use {
super::*,
crossbeam_channel::unbounded,
solana_ledger::{
genesis_utils::create_genesis_config_with_leader,
shred::{Shred, ShredFlags},
@@ -160,18 +246,8 @@ pub mod tests {
let bank = Bank::new_for_tests(
&create_genesis_config_with_leader(100, &leader_pubkey, 10).genesis_config,
);
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let bf = Arc::new(RwLock::new(BankForks::new(bank)));
let (sender, receiver) = unbounded();
let (retransmit_sender, _retransmit_receiver) = unbounded();
let mut verifier = ShredSigVerifier::new(
Pubkey::new_unique(),
bf,
cache,
retransmit_sender,
sender,
Arc::<AtomicBool>::default(), // turbine_disabled
);
let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
let bank_forks = RwLock::new(BankForks::new(bank));
let batch_size = 2;
let mut batch = PacketBatch::with_capacity(batch_size);
batch.resize(batch_size, Packet::default());
@@ -206,20 +282,14 @@ pub mod tests {
batches[0][1].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload());
batches[0][1].meta.size = shred.payload().len();
let num_packets = solana_perf::sigverify::count_packets_in_batches(&batches);
let rv = verifier.verify_batches(batches, num_packets);
assert!(!rv[0][0].meta.discard());
assert!(rv[0][1].meta.discard());
verifier.send_packets(rv.clone()).unwrap();
let received_packets = receiver.recv().unwrap();
assert_eq!(received_packets.len(), rv.len());
for (received_packet_batch, original_packet_batch) in received_packets.iter().zip(rv.iter())
{
assert_eq!(
received_packet_batch.iter().collect::<Vec<_>>(),
original_packet_batch.iter().collect::<Vec<_>>()
);
}
verify_packets(
&Pubkey::new_unique(), // self_pubkey
&bank_forks,
&leader_schedule_cache,
&RecyclerCache::warmed(),
&mut batches,
);
assert!(!batches[0][0].meta.discard());
assert!(batches[0][1].meta.discard());
}
}

core/src/tvu.rs

@@ -20,8 +20,7 @@ use {
retransmit_stage::RetransmitStage,
rewards_recorder_service::RewardsRecorderSender,
shred_fetch_stage::ShredFetchStage,
sigverify_shreds::ShredSigVerifier,
sigverify_stage::SigVerifyStage,
sigverify_shreds,
tower_storage::TowerStorage,
validator::ProcessBlockStore,
voting_service::VotingService,
@@ -56,13 +55,13 @@ use {
collections::HashSet,
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, RwLock},
thread,
thread::{self, JoinHandle},
},
};
pub struct Tvu {
fetch_stage: ShredFetchStage,
sigverify_stage: SigVerifyStage,
shred_sigverify: JoinHandle<()>,
retransmit_stage: RetransmitStage,
window_service: WindowService,
cluster_slots_service: ClusterSlotsService,
@@ -163,17 +162,14 @@ impl Tvu {
let (verified_sender, verified_receiver) = unbounded();
let (retransmit_sender, retransmit_receiver) = unbounded();
let sigverify_stage = SigVerifyStage::new(
let shred_sigverify = sigverify_shreds::spawn_shred_sigverify(
cluster_info.id(),
bank_forks.clone(),
leader_schedule_cache.clone(),
fetch_receiver,
ShredSigVerifier::new(
cluster_info.id(),
bank_forks.clone(),
leader_schedule_cache.clone(),
retransmit_sender.clone(),
verified_sender,
turbine_disabled,
),
"shred-verifier",
retransmit_sender.clone(),
verified_sender,
turbine_disabled,
);
let retransmit_stage = RetransmitStage::new(
@@ -319,7 +315,7 @@ impl Tvu {
Tvu {
fetch_stage,
sigverify_stage,
shred_sigverify,
retransmit_stage,
window_service,
cluster_slots_service,
@@ -338,7 +334,7 @@ impl Tvu {
self.window_service.join()?;
self.cluster_slots_service.join()?;
self.fetch_stage.join()?;
self.sigverify_stage.join()?;
self.shred_sigverify.join()?;
if self.ledger_cleanup_service.is_some() {
self.ledger_cleanup_service.unwrap().join()?;
}