skips retransmit for shreds with unknown slot leader (#19472)
Shreds' signatures should be verified before they reach retransmit stage, and if the leader is unknown they should fail signature check. Therefore retransmit-stage can as well expect to know who the slot leader is and otherwise just skip the shred. Blockstore checking signature of recovered shreds before sending them to retransmit stage: https://github.com/solana-labs/solana/blob/4305d4b7b/ledger/src/blockstore.rs#L884-L930 Shred signature verifier: https://github.com/solana-labs/solana/blob/4305d4b7b/core/src/sigverify_shreds.rs#L41-L57 https://github.com/solana-labs/solana/blob/4305d4b7b/ledger/src/sigverify_shreds.rs#L105
This commit is contained in:
parent
82a6bbe068
commit
6d9818b8e4
|
@ -413,7 +413,7 @@ pub fn broadcast_shreds(
|
||||||
let mut result = Ok(());
|
let mut result = Ok(());
|
||||||
let mut shred_select = Measure::start("shred_select");
|
let mut shred_select = Measure::start("shred_select");
|
||||||
// Only the leader broadcasts shreds.
|
// Only the leader broadcasts shreds.
|
||||||
let leader = Some(cluster_info.id());
|
let leader = cluster_info.id();
|
||||||
let (root_bank, working_bank) = {
|
let (root_bank, working_bank) = {
|
||||||
let bank_forks = bank_forks.read().unwrap();
|
let bank_forks = bank_forks.read().unwrap();
|
||||||
(bank_forks.root_bank(), bank_forks.working_bank())
|
(bank_forks.root_bank(), bank_forks.working_bank())
|
||||||
|
|
|
@ -282,7 +282,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
||||||
let packets: Vec<_> = shreds
|
let packets: Vec<_> = shreds
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|shred| {
|
.filter_map(|shred| {
|
||||||
let seed = shred.seed(Some(self_pubkey), &root_bank);
|
let seed = shred.seed(self_pubkey, &root_bank);
|
||||||
let node = cluster_nodes.get_broadcast_peer(seed)?;
|
let node = cluster_nodes.get_broadcast_peer(seed)?;
|
||||||
if !socket_addr_space.check(&node.tvu) {
|
if !socket_addr_space.check(&node.tvu) {
|
||||||
return None;
|
return None;
|
||||||
|
|
|
@ -122,25 +122,21 @@ impl ClusterNodes<RetransmitStage> {
|
||||||
&self,
|
&self,
|
||||||
shred_seed: [u8; 32],
|
shred_seed: [u8; 32],
|
||||||
fanout: usize,
|
fanout: usize,
|
||||||
slot_leader: Option<Pubkey>,
|
slot_leader: Pubkey,
|
||||||
) -> (
|
) -> (
|
||||||
Vec<&ContactInfo>, // neighbors
|
Vec<&ContactInfo>, // neighbors
|
||||||
Vec<&ContactInfo>, // children
|
Vec<&ContactInfo>, // children
|
||||||
) {
|
) {
|
||||||
// Exclude leader from list of nodes.
|
// Exclude leader from list of nodes.
|
||||||
let index = self.index.iter().copied();
|
let (weights, index): (Vec<u64>, Vec<usize>) = if slot_leader == self.pubkey {
|
||||||
let (weights, index): (Vec<u64>, Vec<usize>) = match slot_leader {
|
|
||||||
None => {
|
|
||||||
error!("unknown leader for shred slot");
|
|
||||||
index.unzip()
|
|
||||||
}
|
|
||||||
Some(slot_leader) if slot_leader == self.pubkey => {
|
|
||||||
error!("retransmit from slot leader: {}", slot_leader);
|
error!("retransmit from slot leader: {}", slot_leader);
|
||||||
index.unzip()
|
self.index.iter().copied().unzip()
|
||||||
}
|
} else {
|
||||||
Some(slot_leader) => index
|
self.index
|
||||||
|
.iter()
|
||||||
.filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader)
|
.filter(|(_, i)| self.nodes[*i].pubkey() != slot_leader)
|
||||||
.unzip(),
|
.copied()
|
||||||
|
.unzip()
|
||||||
};
|
};
|
||||||
let index: Vec<_> = {
|
let index: Vec<_> = {
|
||||||
let shuffle = weighted_shuffle(weights.into_iter(), shred_seed);
|
let shuffle = weighted_shuffle(weights.into_iter(), shred_seed);
|
||||||
|
@ -462,7 +458,7 @@ mod tests {
|
||||||
let (neighbors_indices, children_indices) =
|
let (neighbors_indices, children_indices) =
|
||||||
compute_retransmit_peers(fanout, self_index, &shuffled_index);
|
compute_retransmit_peers(fanout, self_index, &shuffled_index);
|
||||||
let (neighbors, children) =
|
let (neighbors, children) =
|
||||||
cluster_nodes.get_retransmit_peers(shred_seed, fanout, Some(slot_leader));
|
cluster_nodes.get_retransmit_peers(shred_seed, fanout, slot_leader);
|
||||||
assert_eq!(children.len(), children_indices.len());
|
assert_eq!(children.len(), children_indices.len());
|
||||||
for (node, index) in children.into_iter().zip(children_indices) {
|
for (node, index) in children.into_iter().zip(children_indices) {
|
||||||
assert_eq!(*node, peers[index]);
|
assert_eq!(*node, peers[index]);
|
||||||
|
|
|
@ -287,7 +287,14 @@ fn retransmit(
|
||||||
|
|
||||||
let mut compute_turbine_peers = Measure::start("turbine_start");
|
let mut compute_turbine_peers = Measure::start("turbine_start");
|
||||||
// TODO: consider using root-bank here for leader lookup!
|
// TODO: consider using root-bank here for leader lookup!
|
||||||
let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
|
// Shreds' signatures should be verified before they reach here, and if
|
||||||
|
// the leader is unknown they should fail signature check. So here we
|
||||||
|
// should expect to know the slot leader and otherwise skip the shred.
|
||||||
|
let slot_leader =
|
||||||
|
match leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank)) {
|
||||||
|
Some(pubkey) => pubkey,
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
let cluster_nodes =
|
let cluster_nodes =
|
||||||
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
|
cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
|
||||||
let shred_seed = shred.seed(slot_leader, &root_bank);
|
let shred_seed = shred.seed(slot_leader, &root_bank);
|
||||||
|
|
|
@ -49,31 +49,31 @@
|
||||||
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
|
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
|
||||||
//! payload can fit into one coding shred / packet.
|
//! payload can fit into one coding shred / packet.
|
||||||
|
|
||||||
use crate::{blockstore::MAX_DATA_SHREDS_PER_SLOT, erasure::Session};
|
use {
|
||||||
use bincode::config::Options;
|
crate::{blockstore::MAX_DATA_SHREDS_PER_SLOT, erasure::Session},
|
||||||
use core::cell::RefCell;
|
bincode::config::Options,
|
||||||
use rayon::{
|
rayon::{
|
||||||
iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator},
|
iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator},
|
||||||
slice::ParallelSlice,
|
slice::ParallelSlice,
|
||||||
ThreadPool,
|
ThreadPool,
|
||||||
};
|
},
|
||||||
use serde::{Deserialize, Serialize};
|
serde::{Deserialize, Serialize},
|
||||||
use solana_entry::entry::{create_ticks, Entry};
|
solana_entry::entry::{create_ticks, Entry},
|
||||||
use solana_measure::measure::Measure;
|
solana_measure::measure::Measure,
|
||||||
use solana_perf::packet::{limited_deserialize, Packet};
|
solana_perf::packet::{limited_deserialize, Packet},
|
||||||
use solana_rayon_threadlimit::get_thread_count;
|
solana_rayon_threadlimit::get_thread_count,
|
||||||
use solana_runtime::bank::Bank;
|
solana_runtime::bank::Bank,
|
||||||
use solana_sdk::{
|
solana_sdk::{
|
||||||
clock::Slot,
|
clock::Slot,
|
||||||
feature_set,
|
feature_set,
|
||||||
hash::hashv,
|
hash::{hashv, Hash},
|
||||||
hash::Hash,
|
|
||||||
packet::PACKET_DATA_SIZE,
|
packet::PACKET_DATA_SIZE,
|
||||||
pubkey::Pubkey,
|
pubkey::Pubkey,
|
||||||
signature::{Keypair, Signature, Signer},
|
signature::{Keypair, Signature, Signer},
|
||||||
|
},
|
||||||
|
std::{cell::RefCell, convert::TryInto, mem::size_of},
|
||||||
|
thiserror::Error,
|
||||||
};
|
};
|
||||||
use std::mem::size_of;
|
|
||||||
use thiserror::Error;
|
|
||||||
|
|
||||||
#[derive(Default, Clone)]
|
#[derive(Default, Clone)]
|
||||||
pub struct ProcessShredsStats {
|
pub struct ProcessShredsStats {
|
||||||
|
@ -466,25 +466,21 @@ impl Shred {
|
||||||
self.common_header.signature
|
self.common_header.signature
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn seed(&self, leader_pubkey: Option<Pubkey>, root_bank: &Bank) -> [u8; 32] {
|
pub fn seed(&self, leader_pubkey: Pubkey, root_bank: &Bank) -> [u8; 32] {
|
||||||
if let Some(leader_pubkey) = leader_pubkey {
|
|
||||||
if enable_deterministic_seed(self.slot(), root_bank) {
|
if enable_deterministic_seed(self.slot(), root_bank) {
|
||||||
let h = hashv(&[
|
hashv(&[
|
||||||
&self.slot().to_le_bytes(),
|
&self.slot().to_le_bytes(),
|
||||||
&self.index().to_le_bytes(),
|
&self.index().to_le_bytes(),
|
||||||
&leader_pubkey.to_bytes(),
|
&leader_pubkey.to_bytes(),
|
||||||
]);
|
])
|
||||||
return h.to_bytes();
|
.to_bytes()
|
||||||
|
} else {
|
||||||
|
let signature = self.common_header.signature.as_ref();
|
||||||
|
let offset = signature.len().checked_sub(32).unwrap();
|
||||||
|
signature[offset..].try_into().unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut seed = [0; 32];
|
|
||||||
let seed_len = seed.len();
|
|
||||||
let sig = self.common_header.signature.as_ref();
|
|
||||||
seed[0..seed_len].copy_from_slice(&sig[(sig.len() - seed_len)..]);
|
|
||||||
seed
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn is_data(&self) -> bool {
|
pub fn is_data(&self) -> bool {
|
||||||
self.common_header.shred_type == ShredType(DATA_SHRED)
|
self.common_header.shred_type == ShredType(DATA_SHRED)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue