diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index 1e5edb5212..ba55a306e2 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -38,8 +38,8 @@ pub(super) struct BroadcastDuplicatesRun {
     prev_entry_hash: Option<Hash>,
     num_slots_broadcasted: usize,
     cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
-    original_last_data_shreds: HashSet<Signature>,
-    partition_last_data_shreds: HashSet<Signature>,
+    original_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
+    partition_last_data_shreds: Arc<Mutex<HashSet<Signature>>>,
 }
 
 impl BroadcastDuplicatesRun {
@@ -60,8 +60,8 @@ impl BroadcastDuplicatesRun {
             prev_entry_hash: None,
             num_slots_broadcasted: 0,
             cluster_nodes_cache,
-            original_last_data_shreds: HashSet::default(),
-            partition_last_data_shreds: HashSet::default(),
+            original_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
+            partition_last_data_shreds: Arc::<Mutex<HashSet<Signature>>>::default(),
         }
     }
 }
@@ -205,16 +205,19 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         // Special handling of last shred to cause partition
         if let Some((original_last_data_shred, partition_last_data_shred)) = last_shreds {
             let pubkey = keypair.pubkey();
-            self.original_last_data_shreds
-                .extend(original_last_data_shred.iter().map(|shred| {
+            self.original_last_data_shreds.lock().unwrap().extend(
+                original_last_data_shred.iter().map(|shred| {
                     assert!(shred.verify(&pubkey));
                     shred.signature()
-                }));
-            self.partition_last_data_shreds
-                .extend(partition_last_data_shred.iter().map(|shred| {
+                }),
+            );
+            self.partition_last_data_shreds.lock().unwrap().extend(
+                partition_last_data_shred.iter().map(|shred| {
+                    info!("adding {} to partition set", shred.signature());
                     assert!(shred.verify(&pubkey));
                     shred.signature()
-                }));
+                }),
+            );
             let original_last_data_shred = Arc::new(original_last_data_shred);
             let partition_last_data_shred = Arc::new(partition_last_data_shred);
 
@@ -252,6 +255,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             (bank_forks.root_bank(), bank_forks.working_bank())
         };
         let self_pubkey = cluster_info.id();
+
         // Creat cluster partition.
         let cluster_partition: HashSet<Pubkey> = {
             let mut cumilative_stake = 0;
@@ -269,6 +273,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
                 .map(|(pubkey, _)| *pubkey)
                 .collect()
         };
+
         // Broadcast data
         let cluster_nodes = self
             .cluster_nodes_cache
@@ -282,8 +287,12 @@ impl BroadcastRun for BroadcastDuplicatesRun {
                     if !socket_addr_space.check(&node.tvu) {
                         return None;
                     }
-                    if self.original_last_data_shreds.contains(&shred.signature()) {
-                        // If the node is within the partitin skip the shred.
+                    if self
+                        .original_last_data_shreds
+                        .lock()
+                        .unwrap()
+                        .remove(&shred.signature())
+                    {
                         if cluster_partition.contains(&node.id) {
                             info!(
                                 "skipping node {} for original shred index {}, slot {}",
@@ -293,22 +302,33 @@ impl BroadcastRun for BroadcastDuplicatesRun {
                             );
                             return None;
                         }
+                    } else if self
+                        .partition_last_data_shreds
+                        .lock()
+                        .unwrap()
+                        .remove(&shred.signature())
+                    {
+                        // If the shred is part of the partition, broadcast it directly to the
+                        // partition node. This is to account for cases when the partition stake
+                        // is small such as in `test_duplicate_shreds_broadcast_leader()`, then
+                        // the partition node is never selected by get_broadcast_peer()
+                        return Some(
+                            cluster_partition
+                                .iter()
+                                .filter_map(|pubkey| {
+                                    let tvu = cluster_info
+                                        .lookup_contact_info(pubkey, |contact_info| contact_info.tvu)?;
+                                    Some((&shred.payload, tvu))
+                                })
+                                .collect(),
+                        );
                     }
-                    if self.partition_last_data_shreds.contains(&shred.signature()) {
-                        // If the node is not within the partitin skip the shred.
-                        if !cluster_partition.contains(&node.id) {
-                            info!(
-                                "skipping node {} for partition shred index {}, slot {}",
-                                node.id,
-                                shred.index(),
-                                shred.slot()
-                            );
-                            return None;
-                        }
-                    }
-                    Some((&shred.payload, node.tvu))
+
+                    Some(vec![(&shred.payload, node.tvu)])
                 })
+                .flatten()
                 .collect();
+
             if let Err(SendPktsError::IoError(ioerr, _)) = batch_send(sock, &packets) {
                 return Err(Error::Io(ioerr));
             }
diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs
index fb209611b3..9129773b98 100644
--- a/local-cluster/src/local_cluster.rs
+++ b/local-cluster/src/local_cluster.rs
@@ -148,6 +148,11 @@ impl LocalCluster {
             .iter()
             .zip(&config.node_stakes)
             .filter_map(|((node_keypair, in_genesis), stake)| {
+                info!(
+                    "STARTING LOCAL CLUSTER: key {} has {} stake",
+                    node_keypair.pubkey(),
+                    stake
+                );
                 if *in_genesis {
                     Some((
                         ValidatorVoteKeypairs {
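Reviewer note (commentary, not part of the patch): the struct change moves both signature sets behind `Arc<Mutex<..>>`, presumably because one path now writes the sets (`extend` under the lock when the last shreds are recorded) while another consumes them (`remove` under the lock at transmit time), and those paths can hold separate clones of the same run state. The switch from `contains` to `remove` also means each signature's special handling fires at most once. A minimal sketch of that sharing pattern, using `u64` stand-ins for `Signature`:

```rust
use std::{
    collections::HashSet,
    sync::{Arc, Mutex},
    thread,
};

fn main() {
    // Shared set, as in the patch: one side extends it, the other drains it.
    let set: Arc<Mutex<HashSet<u64>>> = Arc::default();

    let inserter = {
        let set = Arc::clone(&set);
        // Stands in for the record path calling `.lock().unwrap().extend(..)`.
        thread::spawn(move || set.lock().unwrap().extend([1u64, 2, 3]))
    };
    inserter.join().unwrap();

    // Stands in for the transmit path: `remove` returns true only on first sight,
    // so a given signature can trigger the special handling exactly once.
    assert!(set.lock().unwrap().remove(&2));
    assert!(!set.lock().unwrap().remove(&2));
    assert_eq!(set.lock().unwrap().len(), 2);
}
```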
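The transmit rewrite also changes the closure's return shape: instead of yielding a single `(&shred.payload, node.tvu)` pair, it yields an `Option<Vec<..>>` that is then `.flatten()`-ed, so one shred can fan out to every node in `cluster_partition`. A rough standalone model of that routing decision follows; the `route` helper and the stub types are illustrative, not from the patch:

```rust
use std::{
    collections::HashSet,
    sync::{Arc, Mutex},
};

type Signature = u64; // stub for solana_sdk::signature::Signature
type Payload = Vec<u8>;
type Addr = &'static str; // stub for a socket address

// One shred maps to zero or more (payload, destination) pairs; flattening the
// per-shred vectors reproduces the patch's `filter_map(..).flatten().collect()`.
fn route(
    partition_set: &Arc<Mutex<HashSet<Signature>>>,
    partition_nodes: &[Addr],
    (sig, payload): (Signature, Payload),
    default_node: Addr,
) -> Vec<(Payload, Addr)> {
    if partition_set.lock().unwrap().remove(&sig) {
        // First (and only) time this signature is seen: send straight to all
        // partition nodes, mirroring the patch's direct-broadcast branch.
        partition_nodes
            .iter()
            .map(|addr| (payload.clone(), *addr))
            .collect()
    } else {
        // Normal path: a single (payload, addr) pair for the selected peer.
        vec![(payload, default_node)]
    }
}

fn main() {
    let set = Arc::new(Mutex::new(HashSet::from([42])));
    let partition = ["partition:8001", "partition:8002"];
    // First sighting fans out to both partition nodes.
    assert_eq!(route(&set, &partition, (42, vec![1]), "peer:8000").len(), 2);
    // The signature was consumed by `remove`, so a retransmit routes normally.
    assert_eq!(route(&set, &partition, (42, vec![1]), "peer:8000").len(), 1);
}
```

Broadcasting directly to the partition nodes here sidesteps the case called out in the patch comment: when the partition's stake is tiny, the normal peer selection may never pick those nodes at all.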