adds back cluster partitions to broadcast-duplicates (#19253)

An earlier version of this code aimed to create a partition by
manipulating stakes and setting some of them to zero:
https://github.com/solana-labs/solana/blob/cde146155/core/src/broadcast_stage/broadcast_duplicates_run.rs#L65-L116

https://github.com/solana-labs/solana/pull/18971
moved the stakes computation further downstream, so that logic could
no longer live there. This commit adds cluster partitions back by
intercepting packets right before they are sent.
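
The following is a minimal, standalone sketch of the idea (not the
upstream code): pick a set of low-stake nodes whose cumulative stake
stays at or below a threshold, then decide per (shred, destination)
pair whether the packet is dropped before send. The types Node and
ShredKind and the helper names are illustrative only.

    use std::collections::HashSet;

    #[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
    struct Pubkey(u64); // stand-in for solana_sdk::pubkey::Pubkey

    struct Node {
        id: Pubkey,
        stake: u64,
    }

    enum ShredKind {
        Original,  // last data shred the leader itself replays
        Partition, // duplicate last data shred meant only for the partition
        Other,     // everything else is broadcast normally
    }

    /// Nodes (excluding `self_pubkey`), taken in increasing stake order,
    /// until their cumulative stake would exceed `stake_partition`.
    fn cluster_partition(
        nodes: &[Node],
        self_pubkey: &Pubkey,
        stake_partition: u64,
    ) -> HashSet<Pubkey> {
        let mut sorted: Vec<&Node> = nodes.iter().filter(|n| &n.id != self_pubkey).collect();
        sorted.sort_by_key(|n| (n.stake, n.id.clone()));
        let mut cumulative_stake = 0u64;
        sorted
            .into_iter()
            .take_while(|n| {
                cumulative_stake += n.stake;
                cumulative_stake <= stake_partition
            })
            .map(|n| n.id.clone())
            .collect()
    }

    /// Interception rule applied right before send: original last shreds
    /// are withheld from partition nodes, duplicate last shreds are sent
    /// only to partition nodes, all other shreds go everywhere.
    fn should_send(kind: &ShredKind, dest: &Pubkey, partition: &HashSet<Pubkey>) -> bool {
        match kind {
            ShredKind::Original => !partition.contains(dest),
            ShredKind::Partition => partition.contains(dest),
            ShredKind::Other => true,
        }
    }

    fn main() {
        let nodes = vec![
            Node { id: Pubkey(1), stake: 10 },
            Node { id: Pubkey(2), stake: 100 },
            Node { id: Pubkey(3), stake: 1_000 },
        ];
        // With a threshold of 150, the two lowest-staked nodes (cumulative
        // stake 110) form the partition; adding the third would exceed it.
        let partition = cluster_partition(&nodes, &Pubkey(0), 150);
        assert!(partition.contains(&Pubkey(1)) && partition.contains(&Pubkey(2)));
        assert!(!partition.contains(&Pubkey(3)));
        // Partition nodes see only the duplicate version of the last shred.
        assert!(!should_send(&ShredKind::Original, &Pubkey(1), &partition));
        assert!(should_send(&ShredKind::Partition, &Pubkey(1), &partition));
        assert!(should_send(&ShredKind::Original, &Pubkey(3), &partition));
        assert!(should_send(&ShredKind::Other, &Pubkey(1), &partition));
    }

The real code below applies the same rule while iterating over shreds
and their broadcast peers, and hands the surviving packets to batch_send.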
behzad nouri 2021-08-16 22:24:30 +00:00 committed by GitHub
parent 692aa99147
commit f33b7abffb
1 changed file with 75 additions and 19 deletions

@@ -1,14 +1,16 @@
use {
super::*,
crate::cluster_nodes::ClusterNodesCache,
itertools::Itertools,
solana_entry::entry::Entry,
solana_ledger::shred::Shredder,
solana_runtime::blockhash_queue::BlockhashQueue,
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
signature::{Keypair, Signature, Signer},
system_transaction,
},
std::collections::HashSet,
};
pub const MINIMUM_DUPLICATE_SLOT: Slot = 20;
@@ -36,6 +38,8 @@ pub(super) struct BroadcastDuplicatesRun {
prev_entry_hash: Option<Hash>,
num_slots_broadcasted: usize,
cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
original_last_data_shreds: HashSet<Signature>,
partition_last_data_shreds: HashSet<Signature>,
}
impl BroadcastDuplicatesRun {
@@ -56,6 +60,8 @@ impl BroadcastDuplicatesRun {
prev_entry_hash: None,
num_slots_broadcasted: 0,
cluster_nodes_cache,
original_last_data_shreds: HashSet::default(),
partition_last_data_shreds: HashSet::default(),
}
}
}
@@ -198,19 +204,23 @@ impl BroadcastRun for BroadcastDuplicatesRun {
// Special handling of last shred to cause partition
if let Some((original_last_data_shred, partition_last_data_shred)) = last_shreds {
let pubkey = keypair.pubkey();
self.original_last_data_shreds
.extend(original_last_data_shred.iter().map(|shred| {
assert!(shred.verify(&pubkey));
shred.signature()
}));
self.partition_last_data_shreds
.extend(partition_last_data_shred.iter().map(|shred| {
assert!(shred.verify(&pubkey));
shred.signature()
}));
let original_last_data_shred = Arc::new(original_last_data_shred);
let partition_last_data_shred = Arc::new(partition_last_data_shred);
// Store the original shreds that this node replayed
blockstore_sender.send((original_last_data_shred.clone(), None))?;
// TODO: Previously, on the last shred, the code here was using
// stakes to partition the network with duplicate and real shreds
// at self.config.stake_partition of cumulative stake. This is no
// longer possible here as stakes are computed elsewhere further
// down the stream. Figure out how to replicate old behaviour to
// preserve test coverage.
// https://github.com/solana-labs/solana/blob/cde146155/core/src/broadcast_stage/broadcast_duplicates_run.rs#L65-L116
let original_transmit_shreds = (bank.slot(), original_last_data_shred);
let partition_transmit_shreds = (bank.slot(), partition_last_data_shred);
@@ -232,21 +242,67 @@ impl BroadcastRun for BroadcastDuplicatesRun {
let bank_forks = bank_forks.read().unwrap();
(bank_forks.root_bank(), bank_forks.working_bank())
};
let self_pubkey = cluster_info.id();
// Create cluster partition.
let cluster_partition: HashSet<Pubkey> = {
let mut cumulative_stake = 0;
let epoch = root_bank.get_leader_schedule_epoch(slot);
root_bank
.epoch_staked_nodes(epoch)
.unwrap()
.iter()
.filter(|(pubkey, _)| **pubkey != self_pubkey)
.sorted_by_key(|(pubkey, stake)| (**stake, **pubkey))
.take_while(|(_, stake)| {
cumulative_stake += *stake;
cumulative_stake <= self.config.stake_partition
})
.map(|(pubkey, _)| *pubkey)
.collect()
};
// Broadcast data
let cluster_nodes =
self.cluster_nodes_cache
.get(slot, &root_bank, &working_bank, cluster_info);
broadcast_shreds(
sock,
&shreds,
&cluster_nodes,
&Arc::new(AtomicInterval::default()),
&mut TransmitShredsStats::default(),
cluster_info.id(),
bank_forks,
cluster_info.socket_addr_space(),
)?;
let socket_addr_space = cluster_info.socket_addr_space();
let packets: Vec<_> = shreds
.iter()
.filter_map(|shred| {
let seed = shred.seed(Some(self_pubkey), &root_bank);
let node = cluster_nodes.get_broadcast_peer(seed)?;
if !socket_addr_space.check(&node.tvu) {
return None;
}
if self.original_last_data_shreds.contains(&shred.signature()) {
// If the node is within the partition, skip the shred.
if cluster_partition.contains(&node.id) {
info!(
"skipping node {} for original shred index {}, slot {}",
node.id,
shred.index(),
shred.slot()
);
return None;
}
}
if self.partition_last_data_shreds.contains(&shred.signature()) {
// If the node is not within the partition, skip the shred.
if !cluster_partition.contains(&node.id) {
info!(
"skipping node {} for partition shred index {}, slot {}",
node.id,
shred.index(),
shred.slot()
);
return None;
}
}
Some((&shred.payload, node.tvu))
})
.collect();
if let Err(SendPktsError::IoError(ioerr, _)) = batch_send(sock, &packets) {
return Err(Error::Io(ioerr));
}
Ok(())
}