falls back on working-bank if root-bank::epoch-staked-nodes is none

bank.get_leader_schedule_epoch(shred_slot)
is one epoch after epoch_schedule.get_epoch(shred_slot).

At epoch boundaries, the shred is already one epoch ahead of the root
slot, so we need epoch-stakes two epochs ahead of the root. But the root
bank only has epoch-stakes for one epoch ahead, and as a result looking
up epoch staked-nodes from the root bank fails.
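
As a rough illustration with made-up epoch numbers (not taken from the
code; the real values come from EpochSchedule), the off-by-one looks like:

// Hypothetical numbers only, for illustration.
fn main() {
    let root_epoch: u64 = 10; // epoch_schedule.get_epoch(root_slot)
    let shred_epoch = root_epoch + 1; // shred lands just past the epoch boundary
    let leader_schedule_epoch = shred_epoch + 1; // bank.get_leader_schedule_epoch(shred_slot)

    // The root bank only carries epoch-stakes up to root_epoch + 1, so the
    // lookup for leader_schedule_epoch (= root_epoch + 2) comes back empty.
    let max_epoch_with_stakes_in_root_bank = root_epoch + 1;
    assert!(leader_schedule_epoch > max_epoch_with_stakes_in_root_bank);
}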

To stay backward compatible with the current master code, this commit
implements a fallback on the working bank if the epoch staked-nodes
obtained from the root bank are None.
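
The fallback itself boils down to a find_map over the two banks. A minimal
sketch, with a simplified Bank stand-in whose epoch_staked_nodes signature
only approximates the real runtime type:

use std::collections::HashMap;

// Simplified stand-in for the runtime Bank, for illustration only.
struct Bank {
    // epoch -> (node identity -> stake)
    epoch_stakes: HashMap<u64, HashMap<String, u64>>,
}

impl Bank {
    fn epoch_staked_nodes(&self, epoch: u64) -> Option<HashMap<String, u64>> {
        self.epoch_stakes.get(&epoch).cloned()
    }
}

// Try the root bank first; if it has no stakes for `epoch`, fall back on
// the working bank, mirroring the change to ClusterNodesCache::get below.
fn epoch_staked_nodes(
    root_bank: &Bank,
    working_bank: &Bank,
    epoch: u64,
) -> Option<HashMap<String, u64>> {
    [root_bank, working_bank]
        .iter()
        .find_map(|bank| bank.epoch_staked_nodes(epoch))
}
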
behzad nouri 2021-08-04 18:06:42 -04:00
parent eaf927cf49
commit e4be00fece
5 changed files with 30 additions and 10 deletions


@@ -228,9 +228,14 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
-        let root_bank = bank_forks.read().unwrap().root_bank();
+        let (root_bank, working_bank) = {
+            let bank_forks = bank_forks.read().unwrap();
+            (bank_forks.root_bank(), bank_forks.working_bank())
+        };
         // Broadcast data
-        let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info);
+        let cluster_nodes =
+            self.cluster_nodes_cache
+                .get(slot, &root_bank, &working_bank, cluster_info);
         broadcast_shreds(
             sock,
             &shreds,


@@ -138,9 +138,14 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
         let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
-        let root_bank = bank_forks.read().unwrap().root_bank();
+        let (root_bank, working_bank) = {
+            let bank_forks = bank_forks.read().unwrap();
+            (bank_forks.root_bank(), bank_forks.working_bank())
+        };
         // Broadcast data
-        let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info);
+        let cluster_nodes =
+            self.cluster_nodes_cache
+                .get(slot, &root_bank, &working_bank, cluster_info);
         broadcast_shreds(
             sock,
             &shreds,


@@ -347,8 +347,13 @@ impl StandardBroadcastRun {
         trace!("Broadcasting {:?} shreds", shreds.len());
         // Get the list of peers to broadcast to
         let mut get_peers_time = Measure::start("broadcast::get_peers");
-        let root_bank = bank_forks.read().unwrap().root_bank();
-        let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info);
+        let (root_bank, working_bank) = {
+            let bank_forks = bank_forks.read().unwrap();
+            (bank_forks.root_bank(), bank_forks.working_bank())
+        };
+        let cluster_nodes =
+            self.cluster_nodes_cache
+                .get(slot, &root_bank, &working_bank, cluster_info);
         get_peers_time.stop();
         let mut transmit_stats = TransmitShredsStats::default();


@@ -261,6 +261,7 @@ impl<T: 'static> ClusterNodesCache<T> {
         &self,
         shred_slot: Slot,
         root_bank: &Bank,
+        working_bank: &Bank,
         cluster_info: &ClusterInfo,
     ) -> Arc<ClusterNodes<T>> {
         let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
@@ -273,11 +274,13 @@
                 return Arc::clone(nodes);
             }
         }
-        let epoch_staked_nodes = root_bank.epoch_staked_nodes(epoch);
+        let epoch_staked_nodes = [root_bank, working_bank]
+            .iter()
+            .find_map(|bank| bank.epoch_staked_nodes(epoch));
         if epoch_staked_nodes.is_none() {
             inc_new_counter_info!("cluster_nodes-unknown_epoch_staked_nodes", 1);
             if epoch != root_bank.get_leader_schedule_epoch(root_bank.slot()) {
-                return self.get(root_bank.slot(), root_bank, cluster_info);
+                return self.get(root_bank.slot(), root_bank, working_bank, cluster_info);
             }
             inc_new_counter_info!("cluster_nodes-unknown_epoch_staked_nodes_root", 1);
         }


@@ -378,7 +378,8 @@ fn retransmit(
         let mut compute_turbine_peers = Measure::start("turbine_start");
         let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
-        let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, cluster_info);
+        let cluster_nodes =
+            cluster_nodes_cache.get(shred_slot, &root_bank, &working_bank, cluster_info);
         let (neighbors, children) =
             cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
         // If the node is on the critical path (i.e. the first node in each
@@ -432,7 +433,8 @@ fn retransmit(
         retransmit_total,
         id,
     );
-    let cluster_nodes = cluster_nodes_cache.get(root_bank.slot(), &root_bank, cluster_info);
+    let cluster_nodes =
+        cluster_nodes_cache.get(root_bank.slot(), &root_bank, &working_bank, cluster_info);
     update_retransmit_stats(
         stats,
         timer_start.as_us(),