Fix duplicate broadcast test (#19365)

carllin authored 2021-08-27 17:53:24 -07:00, committed by GitHub
parent 6909a79b6f
commit 84db04ce6c
2 changed files with 50 additions and 25 deletions

View File

@@ -38,8 +38,8 @@ pub(super) struct BroadcastDuplicatesRun {
     prev_entry_hash: Option<Hash>,
     num_slots_broadcasted: usize,
     cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
-    original_last_data_shreds: HashSet<Signature>,
-    partition_last_data_shreds: HashSet<Signature>,
+    original_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
+    partition_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
 }
 
 impl BroadcastDuplicatesRun {
@@ -60,8 +60,8 @@ impl BroadcastDuplicatesRun {
             prev_entry_hash: None,
             num_slots_broadcasted: 0,
             cluster_nodes_cache,
-            original_last_data_shreds: HashSet::default(),
-            partition_last_data_shreds: HashSet::default(),
+            original_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
+            partition_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
         }
     }
 }
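
The type change in the two hunks above is the heart of the fix: the sets move from plain `HashSet<Signature>` fields to `Arc<Mutex<HashSet<Signature>>>`, presumably because the broadcast run is cloned across broadcast-stage threads, and a plain field would leave each clone mutating its own private copy. A minimal sketch of that sharing pattern; `DuplicatesRunState` and the `u64` stand-in for `Signature` are hypothetical names, not the crate's API:

```rust
use std::{
    collections::HashSet,
    sync::{Arc, Mutex},
    thread,
};

// Hypothetical stand-in for the broadcast run's bookkeeping state.
#[derive(Clone, Default)]
struct DuplicatesRunState {
    // Clones share one set through the Arc; the Mutex serializes access.
    original_last_data_shreds: Arc<Mutex<HashSet<u64>>>,
}

fn main() {
    let state = DuplicatesRunState::default();
    // Cloning copies the Arc handle, not the set: both handles alias one set.
    let worker = state.clone();

    thread::spawn(move || {
        worker.original_last_data_shreds.lock().unwrap().insert(42);
    })
    .join()
    .unwrap();

    // The insert made on the worker thread is visible through the original
    // handle. With a plain `HashSet` field, the clone would have populated
    // its own copy and this lookup would fail.
    assert!(state.original_last_data_shreds.lock().unwrap().contains(&42));
}
```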
@@ -205,16 +205,19 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         // Special handling of last shred to cause partition
         if let Some((original_last_data_shred, partition_last_data_shred)) = last_shreds {
             let pubkey = keypair.pubkey();
-            self.original_last_data_shreds
-                .extend(original_last_data_shred.iter().map(|shred| {
+            self.original_last_data_shreds.lock().unwrap().extend(
+                original_last_data_shred.iter().map(|shred| {
                     assert!(shred.verify(&pubkey));
                     shred.signature()
-                }));
-            self.partition_last_data_shreds
-                .extend(partition_last_data_shred.iter().map(|shred| {
+                }),
+            );
+            self.partition_last_data_shreds.lock().unwrap().extend(
+                partition_last_data_shred.iter().map(|shred| {
+                    info!("adding {} to partition set", shred.signature());
                     assert!(shred.verify(&pubkey));
                     shred.signature()
-                }));
+                }),
+            );
 
             let original_last_data_shred = Arc::new(original_last_data_shred);
             let partition_last_data_shred = Arc::new(partition_last_data_shred);
@@ -252,6 +255,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             (bank_forks.root_bank(), bank_forks.working_bank())
         };
         let self_pubkey = cluster_info.id();
+
         // Creat cluster partition.
         let cluster_partition: HashSet<Pubkey> = {
             let mut cumilative_stake = 0;
@@ -269,6 +273,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
                 .map(|(pubkey, _)| *pubkey)
                 .collect()
         };
+
         // Broadcast data
         let cluster_nodes =
             self.cluster_nodes_cache
@@ -282,8 +287,12 @@ impl BroadcastRun for BroadcastDuplicatesRun {
                 if !socket_addr_space.check(&node.tvu) {
                     return None;
                 }
-                if self.original_last_data_shreds.contains(&shred.signature()) {
-                    // If the node is within the partitin skip the shred.
+                if self
+                    .original_last_data_shreds
+                    .lock()
+                    .unwrap()
+                    .remove(&shred.signature())
+                {
                     if cluster_partition.contains(&node.id) {
                         info!(
                             "skipping node {} for original shred index {}, slot {}",
@@ -293,22 +302,33 @@ impl BroadcastRun for BroadcastDuplicatesRun {
                         );
                         return None;
                     }
+                } else if self
+                    .partition_last_data_shreds
+                    .lock()
+                    .unwrap()
+                    .remove(&shred.signature())
+                {
+                    // If the shred is part of the partition, broadcast it directly to the
+                    // partition node. This is to account for cases when the partition stake
+                    // is small such as in `test_duplicate_shreds_broadcast_leader()`, then
+                    // the partition node is never selected by get_broadcast_peer()
+                    return Some(
+                        cluster_partition
+                            .iter()
+                            .filter_map(|pubkey| {
+                                let tvu = cluster_info
+                                    .lookup_contact_info(pubkey, |contact_info| {
+                                        contact_info.tvu
+                                    })?;
+                                Some((&shred.payload, tvu))
+                            })
+                            .collect(),
+                    );
                 }
-                if self.partition_last_data_shreds.contains(&shred.signature()) {
-                    // If the node is not within the partitin skip the shred.
-                    if !cluster_partition.contains(&node.id) {
-                        info!(
-                            "skipping node {} for partition shred index {}, slot {}",
-                            node.id,
-                            shred.index(),
-                            shred.slot()
-                        );
-                        return None;
-                    }
-                }
-                Some((&shred.payload, node.tvu))
+                Some(vec![(&shred.payload, node.tvu)])
             })
+            .flatten()
             .collect();
         if let Err(SendPktsError::IoError(ioerr, _)) = batch_send(sock, &packets) {
             return Err(Error::Io(ioerr));
         }
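
Two behavioral changes in the hunks above are easy to miss: signatures are now consumed with `remove()` rather than tested with `contains()`, so the special handling fires at most once per recorded shred, and a shred whose signature is in the partition set is now sent directly to every partition node instead of being skipped for non-partition peers. A self-contained sketch of that routing decision; `route_shred`, the string addresses, and the `u64` signature are hypothetical names, not the crate's types:

```rust
use std::collections::HashSet;

// Decide where one shred goes. In the real code the set sits behind a
// Mutex and destinations are socket addresses; strings keep this runnable.
fn route_shred<'a>(
    payload: &'a str,
    sig: u64,
    partition_sigs: &mut HashSet<u64>,
    cluster_partition: &[&'a str],
    broadcast_peer: &'a str,
) -> Vec<(&'a str, &'a str)> {
    // `remove` (not `contains`) consumes the entry, so the fan-out below
    // can only trigger once per recorded last-data shred.
    if partition_sigs.remove(&sig) {
        // Send the partition's shred straight to every partition node; a
        // low-stake partition would rarely be picked as a broadcast peer.
        return cluster_partition
            .iter()
            .map(|node| (payload, *node))
            .collect();
    }
    // Normal path: a single (payload, destination) pair.
    vec![(payload, broadcast_peer)]
}

fn main() {
    let mut sigs = HashSet::from([7u64]);
    let partition = ["node_a", "node_b"];
    // First sighting of the marked shred fans out to the whole partition...
    assert_eq!(route_shred("shred", 7, &mut sigs, &partition, "peer").len(), 2);
    // ...while retransmits fall back to the regular broadcast peer.
    assert_eq!(route_shred("shred", 7, &mut sigs, &partition, "peer").len(), 1);
}
```

Returning a `Vec` of destinations per shred (with `.flatten()` before the final `.collect()`) is what lets a single shred map to several recipients in the real transmit path.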

View File

@@ -148,6 +148,11 @@ impl LocalCluster {
             .iter()
             .zip(&config.node_stakes)
             .filter_map(|((node_keypair, in_genesis), stake)| {
+                info!(
+                    "STARTING LOCAL CLUSTER: key {} has {} stake",
+                    node_keypair.pubkey(),
+                    stake
+                );
                 if *in_genesis {
                     Some((
                         ValidatorVoteKeypairs {