From e4be00fece487d5413f7569b2e46c770b9b4d12e Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Wed, 4 Aug 2021 18:06:42 -0400
Subject: [PATCH] falls back on working-bank if root-bank::epoch-staked-nodes
 is none

bank.get_leader_schedule_epoch(shred_slot) is one epoch after
epoch_schedule.get_epoch(shred_slot), and at epoch boundaries the shred is
already one epoch ahead of the root slot, so epoch-stakes two epochs ahead
of the root are needed. But the root bank only has epoch-stakes for one
epoch ahead, and as a result looking up epoch staked-nodes from the root
bank fails.

To stay backward compatible with the current master code, this commit falls
back on the working bank if the epoch staked-nodes obtained from the root
bank are none.
---
 core/src/broadcast_stage/broadcast_duplicates_run.rs | 9 +++++++--
 .../fail_entry_verification_broadcast_run.rs         | 9 +++++++--
 core/src/broadcast_stage/standard_broadcast_run.rs   | 9 +++++++--
 core/src/cluster_nodes.rs                            | 7 +++++--
 core/src/retransmit_stage.rs                         | 6 ++++--
 5 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index a593973a5e..ccf8af822b 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -228,9 +228,14 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
-        let root_bank = bank_forks.read().unwrap().root_bank();
+        let (root_bank, working_bank) = {
+            let bank_forks = bank_forks.read().unwrap();
+            (bank_forks.root_bank(), bank_forks.working_bank())
+        };
         // Broadcast data
-        let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info);
+        let cluster_nodes =
+            self.cluster_nodes_cache
+                .get(slot, &root_bank, &working_bank, cluster_info);
         broadcast_shreds(
             sock,
             &shreds,
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index 978421d33d..43b354c179 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -138,9 +138,14 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
-        let root_bank = bank_forks.read().unwrap().root_bank();
+        let (root_bank, working_bank) = {
+            let bank_forks = bank_forks.read().unwrap();
+            (bank_forks.root_bank(), bank_forks.working_bank())
+        };
         // Broadcast data
-        let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info);
+        let cluster_nodes =
+            self.cluster_nodes_cache
+                .get(slot, &root_bank, &working_bank, cluster_info);
         broadcast_shreds(
             sock,
             &shreds,
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 7688abefcc..fc3d11e745 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -347,8 +347,13 @@ impl StandardBroadcastRun {
         trace!("Broadcasting {:?} shreds", shreds.len());
         // Get the list of peers to broadcast to
         let mut get_peers_time = Measure::start("broadcast::get_peers");
-        let root_bank = bank_forks.read().unwrap().root_bank();
-        let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info);
+        let (root_bank, working_bank) = {
+            let bank_forks = bank_forks.read().unwrap();
+            (bank_forks.root_bank(), bank_forks.working_bank())
+        };
+        let cluster_nodes =
+            self.cluster_nodes_cache
+                .get(slot, &root_bank, &working_bank, cluster_info);
         get_peers_time.stop();

         let mut transmit_stats = TransmitShredsStats::default();
diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs
index 5729e0a09f..82754ea725 100644
--- a/core/src/cluster_nodes.rs
+++ b/core/src/cluster_nodes.rs
@@ -261,6 +261,7 @@ impl<T: 'static> ClusterNodesCache<T> {
         &self,
         shred_slot: Slot,
         root_bank: &Bank,
+        working_bank: &Bank,
         cluster_info: &ClusterInfo,
     ) -> Arc<ClusterNodes<T>> {
         let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
@@ -273,11 +274,13 @@ impl<T: 'static> ClusterNodesCache<T> {
                 return Arc::clone(nodes);
             }
         }
-        let epoch_staked_nodes = root_bank.epoch_staked_nodes(epoch);
+        let epoch_staked_nodes = [root_bank, working_bank]
+            .iter()
+            .find_map(|bank| bank.epoch_staked_nodes(epoch));
         if epoch_staked_nodes.is_none() {
             inc_new_counter_info!("cluster_nodes-unknown_epoch_staked_nodes", 1);
             if epoch != root_bank.get_leader_schedule_epoch(root_bank.slot()) {
-                return self.get(root_bank.slot(), root_bank, cluster_info);
+                return self.get(root_bank.slot(), root_bank, working_bank, cluster_info);
             }
             inc_new_counter_info!("cluster_nodes-unknown_epoch_staked_nodes_root", 1);
         }
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index f50a943d0a..381c443c5b 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -378,7 +378,8 @@ fn retransmit(
             let mut compute_turbine_peers = Measure::start("turbine_start");
             let slot_leader =
                 leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
-            let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, cluster_info);
+            let cluster_nodes =
+                cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
             let (neighbors, children) =
                 cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
             // If the node is on the critical path (i.e. the first node in each
@@ -432,7 +433,8 @@
         retransmit_total,
         id,
     );
-    let cluster_nodes = cluster_nodes_cache.get(root_bank.slot(), &root_bank, cluster_info);
+    let cluster_nodes =
+        cluster_nodes_cache.get(root_bank.slot(), &root_bank, &working_bank, cluster_info);
     update_retransmit_stats(
         stats,
         timer_start.as_us(),
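
As an aside on the cluster_nodes.rs hunk above: below is a minimal, standalone Rust
sketch of the same root-bank-then-working-bank fallback. StakeSource,
lookup_epoch_staked_nodes, and their fields are hypothetical stand-ins invented for
illustration, not Solana's Bank or ClusterNodesCache::get API; only the
[a, b].iter().find_map(..) shape mirrors the patch.

// Sketch only: hypothetical stand-ins showing the fallback the patch adds in
// ClusterNodesCache::get -- try the root bank first, then the working bank,
// and give up only if neither knows the epoch's staked nodes.
use std::collections::HashMap;

type Epoch = u64;
type Stakes = HashMap<[u8; 32], u64>; // pubkey bytes -> staked lamports

struct StakeSource {
    // Hypothetical field: epochs this bank-like source has stakes for.
    staked_nodes_by_epoch: HashMap<Epoch, Stakes>,
}

impl StakeSource {
    fn epoch_staked_nodes(&self, epoch: Epoch) -> Option<Stakes> {
        self.staked_nodes_by_epoch.get(&epoch).cloned()
    }
}

// Mirrors `[root_bank, working_bank].iter().find_map(..)` from the patch: the
// first source that knows the epoch wins, so the root bank stays the primary
// source and the working bank is only consulted as a fallback.
fn lookup_epoch_staked_nodes(
    root: &StakeSource,
    working: &StakeSource,
    epoch: Epoch,
) -> Option<Stakes> {
    [root, working]
        .iter()
        .find_map(|source| source.epoch_staked_nodes(epoch))
}

fn main() {
    // Root bank has no stakes for epoch 7 (two epochs ahead at the boundary).
    let root = StakeSource {
        staked_nodes_by_epoch: HashMap::new(),
    };
    // Working bank does know epoch 7, so the fallback succeeds.
    let working = StakeSource {
        staked_nodes_by_epoch: HashMap::from([(7, Stakes::new())]),
    };
    assert!(lookup_epoch_staked_nodes(&root, &working, 7).is_some());
    assert!(lookup_epoch_staked_nodes(&root, &working, 8).is_none());
}

Because find_map short-circuits, the working bank is consulted only when the root
bank's lookup returns none, which is the backward-compatible behavior described in
the commit message.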