discards shreds in sigverify if the slot leader is the node itself (#26229)

Shreds are dropped in window-service if the slot leader is the node
itself:
https://github.com/solana-labs/solana/blob/cd2878acf/core/src/window_service.rs#L181-L185

However, this check runs only after resources have already been spent verifying the signatures
on these shreds, and it requires a redundant second lookup of the slot leader.

This commit instead discards such shreds in the sigverify stage, where the
leader for the slot is already looked up.
behzad nouri 2022-06-27 20:12:23 +00:00 committed by GitHub
parent 2cc48a650b
commit 39ca788b95
4 changed files with 75 additions and 186 deletions
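
Before the per-file diffs, here is an editor's illustration of the new flow; it is not code from this commit. The `ShredPacket` struct, `discard_self_leader_shreds` function, and `slot_leader` closure below are simplified stand-ins for the real `PacketBatch`, `shred::layout` helpers, and `LeaderScheduleCache` used by `get_slot_leaders` in the sigverify_shreds.rs diff further down. The idea is the same: look up each slot's leader once, cache it, and mark a packet for discard when the slot cannot be read, the leader is unknown, or the leader is this node itself, so such shreds never reach signature verification.

use std::collections::HashMap;

type Slot = u64;
type Pubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey

// Stand-in for a received shred packet: the parsed slot (None if the slot
// field could not be deserialized) and a discard flag.
struct ShredPacket {
    slot: Option<Slot>,
    discard: bool,
}

// Mark packets for discard when the slot is unreadable, the leader is
// unknown, or the leader is this node itself (circular transmission),
// caching one leader lookup per slot.
fn discard_self_leader_shreds(
    self_pubkey: &Pubkey,
    packets: &mut [ShredPacket],
    slot_leader: impl Fn(Slot) -> Option<Pubkey>,
) -> HashMap<Slot, Option<Pubkey>> {
    let mut leaders: HashMap<Slot, Option<Pubkey>> = HashMap::new();
    for packet in packets.iter_mut().filter(|p| !p.discard) {
        let slot = match packet.slot {
            Some(slot) => slot,
            None => {
                packet.discard = true;
                continue;
            }
        };
        // One lookup per slot; a slot led by this node is cached as None.
        let leader = leaders
            .entry(slot)
            .or_insert_with(|| slot_leader(slot).filter(|leader| leader != self_pubkey));
        if leader.is_none() {
            packet.discard = true;
        }
    }
    leaders
}

fn main() {
    let me: Pubkey = [1; 32];
    let other: Pubkey = [2; 32];
    let mut packets = vec![
        ShredPacket { slot: Some(7), discard: false }, // slot led by this node
        ShredPacket { slot: Some(8), discard: false }, // slot led by another node
        ShredPacket { slot: None, discard: false },    // slot field unreadable
    ];
    let leaders = discard_self_leader_shreds(&me, &mut packets, |slot| {
        if slot == 7 { Some(me) } else { Some(other) }
    });
    assert!(packets[0].discard && !packets[1].discard && packets[2].discard);
    assert_eq!(leaders[&7], None); // self-led slot cached as "no valid leader"
    assert_eq!(leaders[&8], Some(other));
}

In the real code the returned map doubles as the leader table that verify_shreds_gpu needs, which is why this single pass can replace the separate read_slots step removed in the diff below.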

core/src/retransmit_stage.rs

@@ -453,7 +453,6 @@ impl RetransmitStage {
             exit.clone(),
         );

-        let leader_schedule_cache_clone = leader_schedule_cache.clone();
         let repair_info = RepairInfo {
             bank_forks,
             epoch_schedule,
@@ -471,18 +470,12 @@ impl RetransmitStage {
             exit,
             repair_info,
             leader_schedule_cache,
-            move |id, shred, working_bank, last_root| {
+            move |shred, last_root| {
                 let turbine_disabled = turbine_disabled
                     .as_ref()
                     .map(|x| x.load(Ordering::Relaxed))
                     .unwrap_or(false);
-                let rv = should_retransmit_and_persist(
-                    shred,
-                    working_bank,
-                    &leader_schedule_cache_clone,
-                    id,
-                    last_root,
-                );
+                let rv = should_retransmit_and_persist(shred, last_root);
                 rv && !turbine_disabled
             },
             verified_vote_receiver,

core/src/sigverify_shreds.rs

@@ -10,16 +10,17 @@ use {
         leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
     },
     solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
-    solana_runtime::bank_forks::BankForks,
-    solana_sdk::clock::Slot,
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{clock::Slot, pubkey::Pubkey},
     std::{
-        collections::{HashMap, HashSet},
+        collections::HashMap,
         sync::{Arc, RwLock},
     },
 };

 #[derive(Clone)]
 pub struct ShredSigVerifier {
+    pubkey: Pubkey, // TODO: Hot swap will change pubkey.
     bank_forks: Arc<RwLock<BankForks>>,
     leader_schedule_cache: Arc<LeaderScheduleCache>,
     recycler_cache: RecyclerCache,
@@ -28,27 +29,20 @@ pub struct ShredSigVerifier {
 impl ShredSigVerifier {
     pub fn new(
+        pubkey: Pubkey,
         bank_forks: Arc<RwLock<BankForks>>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
         packet_sender: Sender<Vec<PacketBatch>>,
     ) -> Self {
         sigverify::init();
         Self {
+            pubkey,
             bank_forks,
             leader_schedule_cache,
             recycler_cache: RecyclerCache::warmed(),
             packet_sender,
         }
     }
-    fn read_slots(batches: &[PacketBatch]) -> HashSet<Slot> {
-        batches
-            .iter()
-            .flat_map(PacketBatch::iter)
-            .filter(|packet| !packet.meta.discard())
-            .filter_map(shred::layout::get_shred)
-            .filter_map(shred::layout::get_slot)
-            .collect()
-    }
 }

 impl SigVerifier for ShredSigVerifier {
@@ -67,25 +61,61 @@ impl SigVerifier for ShredSigVerifier {
         mut batches: Vec<PacketBatch>,
         _valid_packets: usize,
     ) -> Vec<PacketBatch> {
-        let r_bank = self.bank_forks.read().unwrap().working_bank();
-        let slots: HashSet<u64> = Self::read_slots(&batches);
-        let mut leader_slots: HashMap<u64, [u8; 32]> = slots
-            .into_iter()
-            .filter_map(|slot| {
-                let key = self
-                    .leader_schedule_cache
-                    .slot_leader_at(slot, Some(&r_bank))?;
-                Some((slot, key.to_bytes()))
-            })
-            .collect();
-        leader_slots.insert(std::u64::MAX, [0u8; 32]);
+        let working_bank = self.bank_forks.read().unwrap().working_bank();
+        let leader_slots: HashMap<Slot, [u8; 32]> = get_slot_leaders(
+            &self.pubkey,
+            &mut batches,
+            &self.leader_schedule_cache,
+            &working_bank,
+        )
+        .into_iter()
+        .filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes())))
+        .chain(std::iter::once((Slot::MAX, [0u8; 32])))
+        .collect();
         let r = verify_shreds_gpu(&batches, &leader_slots, &self.recycler_cache);
         solana_perf::sigverify::mark_disabled(&mut batches, &r);
         batches
     }
 }

+// Returns pubkey of leaders for shred slots refrenced in the packets.
+// Marks packets as discard if:
+// - fails to deserialize the shred slot.
+// - slot leader is unknown.
+// - slot leader is the node itself (circular transmission).
+fn get_slot_leaders(
+    self_pubkey: &Pubkey,
+    batches: &mut [PacketBatch],
+    leader_schedule_cache: &LeaderScheduleCache,
+    bank: &Bank,
+) -> HashMap<Slot, Option<Pubkey>> {
+    let mut leaders = HashMap::<Slot, Option<Pubkey>>::new();
+    for batch in batches {
+        for packet in batch.iter_mut() {
+            if packet.meta.discard() {
+                continue;
+            }
+            let shred = shred::layout::get_shred(packet);
+            let slot = match shred.and_then(shred::layout::get_slot) {
+                None => {
+                    packet.meta.set_discard(true);
+                    continue;
+                }
+                Some(slot) => slot,
+            };
+            let leader = leaders.entry(slot).or_insert_with(|| {
+                let leader = leader_schedule_cache.slot_leader_at(slot, Some(bank))?;
+                // Discard the shred if the slot leader is the node itself.
+                (&leader != self_pubkey).then(|| leader)
+            });
+            if leader.is_none() {
+                packet.meta.set_discard(true);
+            }
+        }
+    }
+    leaders
+}
+
 #[cfg(test)]
 pub mod tests {
     use {
@@ -100,50 +130,6 @@ pub mod tests {
         solana_sdk::signature::{Keypair, Signer},
     };

-    #[test]
-    fn test_sigverify_shreds_read_slots() {
-        solana_logger::setup();
-        let mut shred = Shred::new_from_data(
-            0xdead_c0de,
-            0xc0de,
-            0xdead,
-            &[1, 2, 3, 4],
-            ShredFlags::LAST_SHRED_IN_SLOT,
-            0,
-            0,
-            0xc0de,
-        );
-        let mut batches: Vec<_> = (0..2)
-            .map(|_| {
-                let mut batch = PacketBatch::with_capacity(1);
-                batch.resize(1, Packet::default());
-                batch
-            })
-            .collect();
-
-        let keypair = Keypair::new();
-        shred.sign(&keypair);
-        batches[0][0].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload());
-        batches[0][0].meta.size = shred.payload().len();
-
-        let mut shred = Shred::new_from_data(
-            0xc0de_dead,
-            0xc0de,
-            0xdead,
-            &[1, 2, 3, 4],
-            ShredFlags::LAST_SHRED_IN_SLOT,
-            0,
-            0,
-            0xc0de,
-        );
-        shred.sign(&keypair);
-        batches[1][0].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload());
-        batches[1][0].meta.size = shred.payload().len();
-
-        let expected: HashSet<u64> = [0xc0de_dead, 0xdead_c0de].iter().cloned().collect();
-        assert_eq!(ShredSigVerifier::read_slots(&batches), expected);
-    }
-
     #[test]
     fn test_sigverify_shreds_verify_batches() {
         let leader_keypair = Arc::new(Keypair::new());
@@ -154,7 +140,7 @@ pub mod tests {
         let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let bf = Arc::new(RwLock::new(BankForks::new(bank)));
         let (sender, receiver) = unbounded();
-        let mut verifier = ShredSigVerifier::new(bf, cache, sender);
+        let mut verifier = ShredSigVerifier::new(Pubkey::new_unique(), bf, cache, sender);
         let batch_size = 2;
         let mut batch = PacketBatch::with_capacity(batch_size);

core/src/tvu.rs

@@ -160,6 +160,7 @@ impl Tvu {
         let sigverify_stage = SigVerifyStage::new(
             fetch_receiver,
             ShredSigVerifier::new(
+                cluster_info.id(),
                 bank_forks.clone(),
                 leader_schedule_cache.clone(),
                 verified_sender,

core/src/window_service.rs

@@ -22,7 +22,6 @@ use {
     solana_metrics::{inc_new_counter_debug, inc_new_counter_error},
     solana_perf::packet::{Packet, PacketBatch},
     solana_rayon_threadlimit::get_thread_count,
-    solana_runtime::{bank::Bank, bank_forks::BankForks},
     solana_sdk::{clock::Slot, pubkey::Pubkey},
     std::{
         cmp::Reverse,
@@ -171,26 +170,11 @@ fn verify_shred_slot(shred: &Shred, root: u64) -> bool {
 /// drop shreds that are from myself or not from the correct leader for the
 /// shred's slot
-pub(crate) fn should_retransmit_and_persist(
-    shred: &Shred,
-    bank: Option<Arc<Bank>>,
-    leader_schedule_cache: &LeaderScheduleCache,
-    my_pubkey: &Pubkey,
-    root: u64,
-) -> bool {
-    let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), bank.as_deref());
-    if let Some(leader_id) = slot_leader_pubkey {
-        if leader_id == *my_pubkey {
-            inc_new_counter_debug!("streamer-recv_window-circular_transmission", 1);
-            false
-        } else if !verify_shred_slot(shred, root) {
-            inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1);
-            false
-        } else {
-            true
-        }
+pub(crate) fn should_retransmit_and_persist(shred: &Shred, root: u64) -> bool {
+    if verify_shred_slot(shred, root) {
+        true
     } else {
-        inc_new_counter_debug!("streamer-recv_window-unknown_leader", 1);
+        inc_new_counter_debug!("streamer-recv_window-outdated_transmission", 1);
         false
     }
 }
@@ -331,7 +315,6 @@ where
 fn recv_window<F>(
     blockstore: &Blockstore,
-    bank_forks: &RwLock<BankForks>,
     insert_shred_sender: &Sender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
     verified_receiver: &Receiver<Vec<PacketBatch>>,
     retransmit_sender: &Sender<Vec<Shred>>,
@@ -340,14 +323,13 @@ fn recv_window<F>(
     stats: &mut ReceiveWindowStats,
 ) -> Result<()>
 where
-    F: Fn(&Shred, Arc<Bank>, /*last root:*/ Slot) -> bool + Sync,
+    F: Fn(&Shred, /*last root:*/ Slot) -> bool + Sync,
 {
     let timer = Duration::from_millis(200);
     let mut packet_batches = verified_receiver.recv_timeout(timer)?;
     packet_batches.extend(verified_receiver.try_iter().flatten());
     let now = Instant::now();
     let last_root = blockstore.last_root();
-    let working_bank = bank_forks.read().unwrap().working_bank();
     let handle_packet = |packet: &Packet| {
         if packet.meta.discard() {
             inc_new_counter_debug!("streamer-recv_window-invalid_or_unnecessary_packet", 1);
@@ -355,7 +337,7 @@ where
         }
         let serialized_shred = packet.data(..)?.to_vec();
         let shred = Shred::new_from_serialized_shred(serialized_shred).ok()?;
-        if !shred_filter(&shred, working_bank.clone(), last_root) {
+        if !shred_filter(&shred, last_root) {
             return None;
         }
         if packet.meta.repair() {
@@ -456,13 +438,12 @@ impl WindowService {
     ) -> WindowService
     where
         F: 'static
-            + Fn(&Pubkey, &Shred, Option<Arc<Bank>>, /*last root:*/ Slot) -> bool
+            + Fn(&Shred, /*last root:*/ Slot) -> bool
             + std::marker::Send
             + std::marker::Sync,
     {
         let outstanding_requests = Arc::<RwLock<OutstandingShredRepairs>>::default();
-        let bank_forks = repair_info.bank_forks.clone();
         let cluster_info = repair_info.cluster_info.clone();
         let id = cluster_info.id();
@@ -506,7 +487,6 @@ impl WindowService {
             insert_sender,
             verified_receiver,
             shred_filter,
-            bank_forks,
             retransmit_sender,
         );
@@ -616,14 +596,10 @@ impl WindowService {
         insert_sender: Sender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
         verified_receiver: Receiver<Vec<PacketBatch>>,
         shred_filter: F,
-        bank_forks: Arc<RwLock<BankForks>>,
         retransmit_sender: Sender<Vec<Shred>>,
     ) -> JoinHandle<()>
     where
-        F: 'static
-            + Fn(&Pubkey, &Shred, Option<Arc<Bank>>, u64) -> bool
-            + std::marker::Send
-            + std::marker::Sync,
+        F: 'static + Fn(&Shred, u64) -> bool + std::marker::Send + std::marker::Sync,
     {
         let mut stats = ReceiveWindowStats::default();
         Builder::new()
@@ -652,11 +628,10 @@ impl WindowService {
                 };
                 if let Err(e) = recv_window(
                     &blockstore,
-                    &bank_forks,
                     &insert_sender,
                     &verified_receiver,
                     &retransmit_sender,
-                    |shred, bank, last_root| shred_filter(&id, shred, Some(bank), last_root),
+                    |shred, last_root| shred_filter(shred, last_root),
                     &thread_pool,
                     &mut stats,
                 ) {
@@ -708,7 +683,6 @@ mod test {
         solana_gossip::contact_info::ContactInfo,
         solana_ledger::{
             blockstore::{make_many_slot_entries, Blockstore},
-            genesis_utils::create_genesis_config_with_leader,
             get_tmp_ledger_path,
             shred::{ProcessShredsStats, Shredder},
         },
@@ -759,76 +733,29 @@ mod test {
     #[test]
     fn test_should_retransmit_and_persist() {
-        let me_id = solana_sdk::pubkey::new_rand();
         let leader_keypair = Arc::new(Keypair::new());
-        let leader_pubkey = leader_keypair.pubkey();
-        let bank = Arc::new(Bank::new_for_tests(
-            &create_genesis_config_with_leader(100, &leader_pubkey, 10).genesis_config,
-        ));
-        let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
         let shreds = local_entries_to_shred(&[Entry::default()], 0, 0, &leader_keypair);

         // with a Bank for slot 0, shred continues
-        assert!(should_retransmit_and_persist(
-            &shreds[0],
-            Some(bank.clone()),
-            &cache,
-            &me_id,
-            0,
-        ));
-
-        // substitute leader_pubkey for me_id so it looks I was the leader
-        // if the shred came back from me, it doesn't continue, whether or not I have a bank
-        assert!(!should_retransmit_and_persist(
-            &shreds[0],
-            Some(bank.clone()),
-            &cache,
-            &leader_pubkey,
-            0,
-        ));
-        assert!(!should_retransmit_and_persist(
-            &shreds[0],
-            None,
-            &cache,
-            &leader_pubkey,
-            0,
-        ));
+        assert!(should_retransmit_and_persist(&shreds[0], 0));

         // change the shred's slot so leader lookup fails
         // with a Bank and no idea who leader is, shred gets thrown out
         let mut bad_slot_shred = shreds[0].clone();
         bad_slot_shred.set_slot(MINIMUM_SLOTS_PER_EPOCH as u64 * 3);
-        assert!(!should_retransmit_and_persist(
-            &bad_slot_shred,
-            Some(bank.clone()),
-            &cache,
-            &me_id,
-            0,
-        ));
+        assert!(!should_retransmit_and_persist(&bad_slot_shred, 0));

         // with a shred where shred.slot() == root, shred gets thrown out
         let root = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
         let shreds = local_entries_to_shred(&[Entry::default()], root, root - 1, &leader_keypair);
-        assert!(!should_retransmit_and_persist(
-            &shreds[0],
-            Some(bank.clone()),
-            &cache,
-            &me_id,
-            root,
-        ));
+        assert!(!should_retransmit_and_persist(&shreds[0], root));

         // with a shred where shred.parent() < root, shred gets thrown out
         let root = MINIMUM_SLOTS_PER_EPOCH as u64 * 3;
         let shreds =
             local_entries_to_shred(&[Entry::default()], root + 1, root - 1, &leader_keypair);
-        assert!(!should_retransmit_and_persist(
-            &shreds[0],
-            Some(bank.clone()),
-            &cache,
-            &me_id,
-            root,
-        ));
+        assert!(!should_retransmit_and_persist(&shreds[0], root));

         // coding shreds don't contain parent slot information, test that slot >= root
         let mut coding_shred = Shred::new_from_parity_shard(
@@ -843,29 +770,11 @@ mod test {
         );
         coding_shred.sign(&leader_keypair);
         // shred.slot() > root, shred continues
-        assert!(should_retransmit_and_persist(
-            &coding_shred,
-            Some(bank.clone()),
-            &cache,
-            &me_id,
-            0,
-        ));
+        assert!(should_retransmit_and_persist(&coding_shred, 0));
         // shred.slot() == root, shred continues
-        assert!(should_retransmit_and_persist(
-            &coding_shred,
-            Some(bank.clone()),
-            &cache,
-            &me_id,
-            5,
-        ));
+        assert!(should_retransmit_and_persist(&coding_shred, 5));
         // shred.slot() < root, shred gets thrown out
-        assert!(!should_retransmit_and_persist(
-            &coding_shred,
-            Some(bank),
-            &cache,
-            &me_id,
-            6,
-        ));
+        assert!(!should_retransmit_and_persist(&coding_shred, 6));
     }

     #[test]