generate deterministic seeds for shreds (#17950)

* generate shred seed from leader pubkey

* clippy

* clippy

* review

* review 2

* fmt

* review

* check

* review

* cleanup

* fmt
jbiseda 2021-07-07 08:21:12 -07:00 committed by GitHub
parent c039ec084b
commit a86ced0bac
10 changed files with 124 additions and 22 deletions
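
In brief: the seed that drives turbine's broadcast peer selection was previously taken from the tail of each shred's signature. This change derives it deterministically from (slot, index, leader pubkey) once the deterministic_shred_seed_enabled feature is active, so any node that knows the leader schedule can recompute it without seeing the signature. A minimal standalone sketch of the new derivation, mirroring the Shred::seed hunk in ledger/src/shred.rs below:

use solana_sdk::{hash::hashv, pubkey::Pubkey};

/// Sketch: the deterministic seed is a hash of (slot, index, leader pubkey).
/// Both the broadcasting leader and every receiving validator can compute it,
/// so peer selection no longer depends on the shred's signature.
fn deterministic_seed(slot: u64, index: u32, leader_pubkey: &Pubkey) -> [u8; 32] {
    hashv(&[
        &slot.to_le_bytes(),
        &index.to_le_bytes(),
        &leader_pubkey.to_bytes(),
    ])
    .to_bytes()
}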

View File: core/benches/cluster_info.rs

@@ -11,13 +11,17 @@ use solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
};
use solana_ledger::shred::Shred;
use solana_ledger::{
genesis_utils::{create_genesis_config, GenesisConfigInfo},
shred::Shred,
};
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::pubkey;
use solana_sdk::timing::timestamp;
use std::{
collections::HashMap,
net::UdpSocket,
sync::{atomic::AtomicU64, Arc},
sync::{atomic::AtomicU64, Arc, RwLock},
};
use test::Bencher;
@@ -29,6 +33,10 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
let cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info);
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
const NUM_SHREDS: usize = 32;
let shreds = vec![Shred::new_empty_data_shred(); NUM_SHREDS];
let mut stakes = HashMap::new();
@@ -51,6 +59,8 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
&cluster_nodes,
&last_datapoint,
&mut TransmitShredsStats::default(),
cluster_info.id(),
&bank_forks,
)
.unwrap();
});

View File: core/src/broadcast_stage.rs

@@ -19,7 +19,7 @@ use solana_ledger::{blockstore::Blockstore, shred::Shred};
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
use solana_poh::poh_recorder::WorkingBankEntry;
use solana_runtime::bank::Bank;
use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::timing::timestamp;
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
use solana_streamer::sendmmsg::send_mmsg;
@@ -29,7 +29,7 @@ use std::{
net::UdpSocket,
sync::atomic::{AtomicBool, Ordering},
sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender},
sync::{Arc, Mutex},
sync::{Arc, Mutex, RwLock},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
};
@@ -69,6 +69,7 @@ pub enum BroadcastStageType {
}
impl BroadcastStageType {
#[allow(clippy::too_many_arguments)]
pub fn new_broadcast_stage(
&self,
sock: Vec<UdpSocket>,
@@ -77,6 +78,7 @@ impl BroadcastStageType {
retransmit_slots_receiver: RetransmitSlotsReceiver,
exit_sender: &Arc<AtomicBool>,
blockstore: &Arc<Blockstore>,
bank_forks: &Arc<RwLock<BankForks>>,
shred_version: u16,
) -> BroadcastStage {
match self {
@@ -87,6 +89,7 @@
retransmit_slots_receiver,
exit_sender,
blockstore,
bank_forks,
StandardBroadcastRun::new(shred_version),
),
@@ -97,6 +100,7 @@
retransmit_slots_receiver,
exit_sender,
blockstore,
bank_forks,
FailEntryVerificationBroadcastRun::new(shred_version),
),
@@ -107,6 +111,7 @@
retransmit_slots_receiver,
exit_sender,
blockstore,
bank_forks,
BroadcastFakeShredsRun::new(0, shred_version),
),
@@ -117,6 +122,7 @@
retransmit_slots_receiver,
exit_sender,
blockstore,
bank_forks,
BroadcastDuplicatesRun::new(shred_version, config.clone()),
),
}
@@ -138,6 +144,7 @@ trait BroadcastRun {
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()>;
fn record(
&mut self,
@@ -237,6 +244,7 @@ impl BroadcastStage {
retransmit_slots_receiver: RetransmitSlotsReceiver,
exit_sender: &Arc<AtomicBool>,
blockstore: &Arc<Blockstore>,
bank_forks: &Arc<RwLock<BankForks>>,
broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone,
) -> Self {
let btree = blockstore.clone();
@@ -269,10 +277,12 @@ impl BroadcastStage {
let socket_receiver = socket_receiver.clone();
let mut bs_transmit = broadcast_stage_run.clone();
let cluster_info = cluster_info.clone();
let bank_forks = bank_forks.clone();
let t = Builder::new()
.name("solana-broadcaster-transmit".to_string())
.spawn(move || loop {
let res = bs_transmit.transmit(&socket_receiver, &cluster_info, &sock);
let res =
bs_transmit.transmit(&socket_receiver, &cluster_info, &sock, &bank_forks);
let res = Self::handle_error(res, "solana-broadcaster-transmit");
if let Some(res) = res {
return res;
@@ -396,6 +406,8 @@ pub fn broadcast_shreds(
cluster_nodes: &ClusterNodes<BroadcastStage>,
last_datapoint_submit: &Arc<AtomicU64>,
transmit_stats: &mut TransmitShredsStats,
self_pubkey: Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
let broadcast_len = cluster_nodes.num_peers();
if broadcast_len == 0 {
@@ -403,10 +415,12 @@
return Ok(());
}
let mut shred_select = Measure::start("shred_select");
let root_bank = bank_forks.read().unwrap().root_bank();
let packets: Vec<_> = shreds
.iter()
.filter_map(|shred| {
let node = cluster_nodes.get_broadcast_peer(shred.seed())?;
let seed = shred.seed(Some(self_pubkey), &root_bank);
let node = cluster_nodes.get_broadcast_peer(seed)?;
Some((&shred.payload, &node.tvu))
})
.collect();
@@ -598,7 +612,9 @@ pub mod test {
let exit_sender = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Arc::new(Bank::new(&genesis_config));
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank = bank_forks.read().unwrap().root_bank();
// Start up the broadcast stage
let broadcast_service = BroadcastStage::new(
@@ -608,6 +624,7 @@
retransmit_slots_receiver,
&exit_sender,
&blockstore,
&bank_forks,
StandardBroadcastRun::new(0),
);
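
A note on the broadcast_shreds hunks above: bank_forks is locked once per batch and every shred's seed is evaluated against the same root bank, so the feature gate cannot flip mid-batch; the leader passes its own identity (cluster_info.id()) because it is the node producing the shreds. A condensed sketch of the inner loop, names as in the diff:

// Sketch: one bank_forks read per batch, then one seed per shred.
let root_bank = bank_forks.read().unwrap().root_bank();
let packets: Vec<_> = shreds
    .iter()
    .filter_map(|shred| {
        let seed = shred.seed(Some(self_pubkey), &root_bank);
        let node = cluster_nodes.get_broadcast_peer(seed)?; // None => drop shred from batch
        Some((&shred.payload, &node.tvu))
    })
    .collect();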

View File: core/src/broadcast_stage/broadcast_duplicates_run.rs

@@ -284,6 +284,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
_bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
// Check the delay queue for shreds that are ready to be sent
let (delayed_recipient, delayed_shreds) = {

View File: core/src/broadcast_stage/broadcast_fake_shreds_run.rs

@@ -105,6 +105,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
_bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() {
let peers = cluster_info.tvu_peers();

View File: core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs

@@ -131,6 +131,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
// Broadcast data
@@ -144,6 +145,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
&cluster_nodes,
&Arc::new(AtomicU64::new(0)),
&mut TransmitShredsStats::default(),
cluster_info.id(),
bank_forks,
)?;
Ok(())

View File: core/src/broadcast_stage/standard_broadcast_run.rs

@@ -149,17 +149,19 @@ impl StandardBroadcastRun {
sock: &UdpSocket,
blockstore: &Arc<Blockstore>,
receive_results: ReceiveResults,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
let (bsend, brecv) = channel();
let (ssend, srecv) = channel();
self.process_receive_results(keypair, blockstore, &ssend, &bsend, receive_results)?;
let srecv = Arc::new(Mutex::new(srecv));
let brecv = Arc::new(Mutex::new(brecv));
//data
let _ = self.transmit(&srecv, cluster_info, sock);
let _ = self.transmit(&srecv, cluster_info, sock, bank_forks);
let _ = self.record(&brecv, blockstore);
//coding
let _ = self.transmit(&srecv, cluster_info, sock);
let _ = self.transmit(&srecv, cluster_info, sock, bank_forks);
let _ = self.record(&brecv, blockstore);
Ok(())
}
@@ -333,6 +335,7 @@ impl StandardBroadcastRun {
stakes: Option<&HashMap<Pubkey, u64>>,
shreds: Arc<Vec<Shred>>,
broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000;
trace!("Broadcasting {:?} shreds", shreds.len());
@@ -358,12 +361,15 @@
let mut transmit_stats = TransmitShredsStats::default();
// Broadcast the shreds
let mut transmit_time = Measure::start("broadcast_shreds");
broadcast_shreds(
sock,
&shreds,
&cluster_nodes,
&self.last_datapoint_submit,
&mut transmit_stats,
cluster_info.id(),
bank_forks,
)?;
drop(cluster_nodes);
transmit_time.stop();
@@ -470,9 +476,17 @@ impl BroadcastRun for StandardBroadcastRun {
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &ClusterInfo,
sock: &UdpSocket,
bank_forks: &Arc<RwLock<BankForks>>,
) -> Result<()> {
let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
self.broadcast(sock, cluster_info, stakes.as_deref(), shreds, slot_start_ts)
self.broadcast(
sock,
cluster_info,
stakes.as_deref(),
shreds,
slot_start_ts,
bank_forks,
)
}
fn record(
&mut self,
@@ -503,6 +517,7 @@ mod test {
use std::sync::Arc;
use std::time::Duration;
#[allow(clippy::type_complexity)]
fn setup(
num_shreds_per_slot: Slot,
) -> (
@@ -512,6 +527,7 @@
Arc<Bank>,
Arc<Keypair>,
UdpSocket,
Arc<RwLock<BankForks>>,
) {
// Setup
let ledger_path = get_tmp_ledger_path!();
@@ -525,7 +541,10 @@
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut genesis_config = create_genesis_config(10_000).genesis_config;
genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot, None) + 1;
let bank0 = Arc::new(Bank::new(&genesis_config));
let bank = Bank::new(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank0 = bank_forks.read().unwrap().root_bank();
(
blockstore,
genesis_config,
@@ -533,6 +552,7 @@
bank0,
leader_keypair,
socket,
bank_forks,
)
}
@@ -575,7 +595,7 @@
fn test_slot_interrupt() {
// Setup
let num_shreds_per_slot = 2;
let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket) =
let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) =
setup(num_shreds_per_slot);
// Insert 1 less than the number of ticks needed to finish the slot
@@ -596,6 +616,7 @@
&socket,
&blockstore,
receive_results,
&bank_forks,
)
.unwrap();
let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
@@ -660,6 +681,7 @@
&socket,
&blockstore,
receive_results,
&bank_forks,
)
.unwrap();
let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap();
@@ -701,7 +723,7 @@
#[test]
fn test_buffer_data_shreds() {
let num_shreds_per_slot = 2;
let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket) =
let (blockstore, genesis_config, _cluster_info, bank, leader_keypair, _socket, _bank_forks) =
setup(num_shreds_per_slot);
let (bsend, brecv) = channel();
let (ssend, _srecv) = channel();
@@ -752,7 +774,7 @@
fn test_slot_finish() {
// Setup
let num_shreds_per_slot = 2;
let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket) =
let (blockstore, genesis_config, cluster_info, bank0, leader_keypair, socket, bank_forks) =
setup(num_shreds_per_slot);
// Insert complete slot of ticks needed to finish the slot
@@ -772,6 +794,7 @@
&socket,
&blockstore,
receive_results,
&bank_forks,
)
.unwrap();
assert!(standard_broadcast_run.unfinished_slot.is_none())

View File: core/src/tpu.rs

@@ -96,7 +96,7 @@ impl Tpu {
verified_vote_packets_sender,
poh_recorder,
vote_tracker,
bank_forks,
bank_forks.clone(),
subscriptions.clone(),
verified_vote_sender,
gossip_verified_vote_hash_sender,
@@ -124,6 +124,7 @@ impl Tpu {
retransmit_slots_receiver,
exit,
blockstore,
&bank_forks,
shred_version,
);

View File: core/src/window_service.rs

@@ -213,6 +213,8 @@
fn recv_window<F>(
blockstore: &Arc<Blockstore>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
bank_forks: &Arc<RwLock<BankForks>>,
insert_shred_sender: &CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
my_pubkey: &Pubkey,
verified_receiver: &CrossbeamReceiver<Vec<Packets>>,
@@ -236,6 +238,7 @@
let now = Instant::now();
inc_new_counter_debug!("streamer-recv_window-recv", total_packets);
let root_bank = bank_forks.read().unwrap().root_bank();
let last_root = blockstore.last_root();
let (shreds, repair_infos): (Vec<_>, Vec<_>) = thread_pool.install(|| {
packets
@@ -275,8 +278,10 @@
}
};
if shred_filter(&shred, last_root) {
let leader_pubkey = leader_schedule_cache
.slot_leader_at(shred.slot(), Some(&root_bank));
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed();
packet.meta.seed = shred.seed(leader_pubkey, &root_bank);
Some((shred, repair_info))
} else {
packet.meta.discard = true;
@@ -370,7 +375,7 @@ impl WindowService {
let outstanding_requests: Arc<RwLock<OutstandingRepairs>> =
Arc::new(RwLock::new(OutstandingRequests::default()));
let bank_forks = Some(repair_info.bank_forks.clone());
let bank_forks = repair_info.bank_forks.clone();
let repair_service = RepairService::new(
blockstore.clone(),
@@ -411,7 +416,8 @@
insert_sender,
verified_receiver,
shred_filter,
bank_forks,
leader_schedule_cache,
&bank_forks,
retransmit,
);
@@ -509,6 +515,7 @@
.unwrap()
}
#[allow(clippy::too_many_arguments)]
fn start_recv_window_thread<F>(
id: Pubkey,
exit: &Arc<AtomicBool>,
@@ -516,7 +523,8 @@
insert_sender: CrossbeamSender<(Vec<Shred>, Vec<Option<RepairMeta>>)>,
verified_receiver: CrossbeamReceiver<Vec<Packets>>,
shred_filter: F,
bank_forks: Option<Arc<RwLock<BankForks>>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
bank_forks: &Arc<RwLock<BankForks>>,
retransmit: PacketSender,
) -> JoinHandle<()>
where
@@ -527,6 +535,9 @@
{
let exit = exit.clone();
let blockstore = blockstore.clone();
let bank_forks = bank_forks.clone();
let bank_forks_opt = Some(bank_forks.clone());
let leader_schedule_cache = leader_schedule_cache.clone();
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
@@ -554,6 +565,8 @@
};
if let Err(e) = recv_window(
&blockstore,
&leader_schedule_cache,
&bank_forks,
&insert_sender,
&id,
&verified_receiver,
@@ -562,7 +575,7 @@
shred_filter(
&id,
shred,
bank_forks
bank_forks_opt
.as_ref()
.map(|bank_forks| bank_forks.read().unwrap().working_bank()),
last_root,
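
Receive-side counterpart: recv_window now resolves the slot leader and recomputes the same seed before handing packets to retransmit. slot_leader_at returns Option<Pubkey>; when it is None, Shred::seed falls back to the legacy signature-derived seed, so receivers degrade gracefully instead of diverging. The step in isolation, as in the hunk above:

// Sketch: recompute the deterministic seed on receipt so that retransmit
// peer selection matches what the broadcasting leader computed.
let root_bank = bank_forks.read().unwrap().root_bank();
let leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), Some(&root_bank));
packet.meta.slot = shred.slot();
packet.meta.seed = shred.seed(leader_pubkey, &root_bank);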

View File: ledger/src/shred.rs

@@ -65,15 +65,17 @@ use serde::{Deserialize, Serialize};
use solana_measure::measure::Measure;
use solana_perf::packet::{limited_deserialize, Packet};
use solana_rayon_threadlimit::get_thread_count;
use solana_runtime::bank::Bank;
use solana_sdk::{
clock::Slot,
feature_set,
hash::hashv,
hash::Hash,
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
};
use std::mem::size_of;
use thiserror::Error;
#[derive(Default, Clone)]
@@ -467,7 +469,18 @@ impl Shred {
self.common_header.signature
}
pub fn seed(&self) -> [u8; 32] {
pub fn seed(&self, leader_pubkey: Option<Pubkey>, root_bank: &Bank) -> [u8; 32] {
if let Some(leader_pubkey) = leader_pubkey {
if enable_deterministic_seed(self.slot(), root_bank) {
let h = hashv(&[
&self.slot().to_le_bytes(),
&self.index().to_le_bytes(),
&leader_pubkey.to_bytes(),
]);
return h.to_bytes();
}
}
let mut seed = [0; 32];
let seed_len = seed.len();
let sig = self.common_header.signature.as_ref();
@@ -557,6 +570,21 @@
}
}
fn enable_deterministic_seed(shred_slot: Slot, bank: &Bank) -> bool {
let feature_slot = bank
.feature_set
.activated_slot(&feature_set::deterministic_shred_seed_enabled::id());
match feature_slot {
None => false,
Some(feature_slot) => {
let epoch_schedule = bank.epoch_schedule();
let feature_epoch = epoch_schedule.get_epoch(feature_slot);
let shred_epoch = epoch_schedule.get_epoch(shred_slot);
feature_epoch < shred_epoch
}
}
}
#[derive(Debug)]
pub struct Shredder {
pub slot: Slot,
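
On the enable_deterministic_seed gate above: activation is epoch-aligned, taking effect only for shreds in epochs strictly after the feature's activation epoch, so the whole cluster switches seeds at an epoch boundary rather than mid-epoch. A self-contained sketch of the same comparison, using a fixed epoch length purely for illustration (the real code asks bank.epoch_schedule(), which may include warmup epochs):

/// Sketch of the epoch gate, with fixed-size epochs for illustration only.
const SLOTS_PER_EPOCH: u64 = 432_000; // assumption: mainnet-beta default

fn deterministic_seed_active(shred_slot: u64, feature_slot: Option<u64>) -> bool {
    match feature_slot {
        None => false, // feature not activated: keep the legacy seed
        Some(feature_slot) => {
            let feature_epoch = feature_slot / SLOTS_PER_EPOCH;
            let shred_epoch = shred_slot / SLOTS_PER_EPOCH;
            // strictly later epoch => the whole cluster switches together
            feature_epoch < shred_epoch
        }
    }
}

// e.g. if the feature activates during epoch 100, shreds keep the legacy
// seed through epoch 100 and use the deterministic seed from epoch 101 on.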

View File: sdk/src/feature_set.rs

@@ -143,6 +143,10 @@ pub mod dedupe_config_program_signers {
solana_sdk::declare_id!("8kEuAshXLsgkUEdcFVLqrjCGGHVWFW99ZZpxvAzzMtBp");
}
pub mod deterministic_shred_seed_enabled {
solana_sdk::declare_id!("FjSRMpFe7mofQ3WrEMT7Smjk2sME1XdAoRxcv55V6M44");
}
pub mod verify_tx_signatures_len {
solana_sdk::declare_id!("EVW9B5xD9FFK7vw1SBARwMA4s5eRo5eKJdKpsBikzKBz");
}
@@ -187,6 +191,7 @@ lazy_static! {
(system_transfer_zero_check::id(), "perform all checks for transfers of 0 lamports"),
(blake3_syscall_enabled::id(), "blake3 syscall"),
(dedupe_config_program_signers::id(), "dedupe config program signers"),
(deterministic_shred_seed_enabled::id(), "deterministic shred seed"),
(vote_stake_checked_instructions::id(), "vote/state program checked instructions #18345"),
/*************** ADD NEW FEATURES HERE ***************/
]