From 6d9818b8e4133faef3bc41cfdadab0fd1cf2e1f7 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Wed, 1 Sep 2021 15:44:26 +0000 Subject: [PATCH] skips retransmit for shreds with unknown slot leader (#19472) Shreds' signatures should be verified before they reach retransmit stage, and if the leader is unknown they should fail signature check. Therefore retransmit-stage can as well expect to know who the slot leader is and otherwise just skip the shred. Blockstore checking signature of recovered shreds before sending them to retransmit stage: https://github.com/solana-labs/solana/blob/4305d4b7b/ledger/src/blockstore.rs#L884-L930 Shred signature verifier: https://github.com/solana-labs/solana/blob/4305d4b7b/core/src/sigverify_shreds.rs#L41-L57 https://github.com/solana-labs/solana/blob/4305d4b7b/ledger/src/sigverify_shreds.rs#L105 --- core/src/broadcast_stage.rs | 2 +- .../broadcast_duplicates_run.rs | 2 +- core/src/cluster_nodes.rs | 24 +++--- core/src/retransmit_stage.rs | 9 ++- ledger/src/shred.rs | 76 +++++++++---------- 5 files changed, 56 insertions(+), 57 deletions(-) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index f8fdc7a09a..84ffae41fb 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -413,7 +413,7 @@ pub fn broadcast_shreds( let mut result = Ok(()); let mut shred_select = Measure::start("shred_select"); // Only the leader broadcasts shreds. 
- let leader = Some(cluster_info.id()); + let leader = cluster_info.id(); let (root_bank, working_bank) = { let bank_forks = bank_forks.read().unwrap(); (bank_forks.root_bank(), bank_forks.working_bank()) diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index ba55a306e2..978d83df1f 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -282,7 +282,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { let packets: Vec<_> = shreds .iter() .filter_map(|shred| { - let seed = shred.seed(Some(self_pubkey), &root_bank); + let seed = shred.seed(self_pubkey, &root_bank); let node = cluster_nodes.get_broadcast_peer(seed)?; if !socket_addr_space.check(&node.tvu) { return None; diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 47ce908290..ec982db518 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -122,25 +122,21 @@ impl ClusterNodes<RetransmitStage> { &self, shred_seed: [u8; 32], fanout: usize, - slot_leader: Option<Pubkey>, + slot_leader: Pubkey, ) -> ( Vec<&ContactInfo>, // neighbors Vec<&ContactInfo>, // children ) { // Exclude leader from list of nodes. 
- let index = self.index.iter().copied(); - let (weights, index): (Vec<u64>, Vec<usize>) = match slot_leader { - None => { - error!("unknown leader for shred slot"); - index.unzip() - } - Some(slot_leader) if slot_leader == self.pubkey => { - error!("retransmit from slot leader: {}", slot_leader); - index.unzip() - } - Some(slot_leader) => index + let (weights, index): (Vec<u64>, Vec<usize>) = if slot_leader == self.pubkey { + error!("retransmit from slot leader: {}", slot_leader); + self.index.iter().copied().unzip() + } else { + self.index + .iter() .filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader) - .unzip(), + .copied() + .unzip() }; let index: Vec<_> = { let shuffle = weighted_shuffle(weights.into_iter(), shred_seed); @@ -462,7 +458,7 @@ mod tests { let (neighbors_indices, children_indices) = compute_retransmit_peers(fanout, self_index, &shuffled_index); let (neighbors, children) = - cluster_nodes.get_retransmit_peers(shred_seed, fanout, Some(slot_leader)); + cluster_nodes.get_retransmit_peers(shred_seed, fanout, slot_leader); assert_eq!(children.len(), children_indices.len()); for (node, index) in children.into_iter().zip(children_indices) { assert_eq!(*node, peers[index]); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index ec86d0913e..9b2f2b0a47 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -287,7 +287,14 @@ fn retransmit( let mut compute_turbine_peers = Measure::start("turbine_start"); // TODO: consider using root-bank here for leader lookup! - let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)); + // Shreds' signatures should be verified before they reach here, and if + // the leader is unknown they should fail signature check. So here we + // should expect to know the slot leader and otherwise skip the shred. 
+ let slot_leader = + match leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)) { + Some(pubkey) => pubkey, + None => continue, + }; let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info); let shred_seed = shred.seed(slot_leader, &root_bank); diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 2e4310f992..30a2c7d877 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -49,31 +49,31 @@ //! So, given a) - c), we must restrict data shred's payload length such that the entire coding //! payload can fit into one coding shred / packet. -use crate::{blockstore::MAX_DATA_SHREDS_PER_SLOT, erasure::Session}; -use bincode::config::Options; -use core::cell::RefCell; -use rayon::{ - iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}, - slice::ParallelSlice, - ThreadPool, +use { + crate::{blockstore::MAX_DATA_SHREDS_PER_SLOT, erasure::Session}, + bincode::config::Options, + rayon::{ + iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator}, + slice::ParallelSlice, + ThreadPool, + }, + serde::{Deserialize, Serialize}, + solana_entry::entry::{create_ticks, Entry}, + solana_measure::measure::Measure, + solana_perf::packet::{limited_deserialize, Packet}, + solana_rayon_threadlimit::get_thread_count, + solana_runtime::bank::Bank, + solana_sdk::{ + clock::Slot, + feature_set, + hash::{hashv, Hash}, + packet::PACKET_DATA_SIZE, + pubkey::Pubkey, + signature::{Keypair, Signature, Signer}, + }, + std::{cell::RefCell, convert::TryInto, mem::size_of}, + thiserror::Error, }; -use serde::{Deserialize, Serialize}; -use solana_entry::entry::{create_ticks, Entry}; -use solana_measure::measure::Measure; -use solana_perf::packet::{limited_deserialize, Packet}; -use solana_rayon_threadlimit::get_thread_count; -use solana_runtime::bank::Bank; -use solana_sdk::{ - clock::Slot, - feature_set, - hash::hashv, - hash::Hash, - packet::PACKET_DATA_SIZE, - pubkey::Pubkey, - 
signature::{Keypair, Signature, Signer}, -}; -use std::mem::size_of; -use thiserror::Error; #[derive(Default, Clone)] pub struct ProcessShredsStats { @@ -466,23 +466,19 @@ impl Shred { self.common_header.signature } - pub fn seed(&self, leader_pubkey: Option<Pubkey>, root_bank: &Bank) -> [u8; 32] { - if let Some(leader_pubkey) = leader_pubkey { - if enable_deterministic_seed(self.slot(), root_bank) { - let h = hashv(&[ - &self.slot().to_le_bytes(), - &self.index().to_le_bytes(), - &leader_pubkey.to_bytes(), - ]); - return h.to_bytes(); - } + pub fn seed(&self, leader_pubkey: Pubkey, root_bank: &Bank) -> [u8; 32] { + if enable_deterministic_seed(self.slot(), root_bank) { + hashv(&[ + &self.slot().to_le_bytes(), + &self.index().to_le_bytes(), + &leader_pubkey.to_bytes(), + ]) + .to_bytes() + } else { + let signature = self.common_header.signature.as_ref(); + let offset = signature.len().checked_sub(32).unwrap(); + signature[offset..].try_into().unwrap() } - - let mut seed = [0; 32]; - let seed_len = seed.len(); - let sig = self.common_header.signature.as_ref(); - seed[0..seed_len].copy_from_slice(&sig[(sig.len() - seed_len)..]); - seed } pub fn is_data(&self) -> bool {