unifies cluster-nodes computation & caching across turbine stages

Broadcast-stage uses epoch_staked_nodes based on the slot that the
shreds themselves belong to:
https://github.com/solana-labs/solana/blob/049fb0417/core/src/broadcast_stage/standard_broadcast_run.rs#L208-L228
https://github.com/solana-labs/solana/blob/0cf52e206/core/src/broadcast_stage.rs#L342-L349

But retransmit-stage uses the bank-epoch of the working bank:
https://github.com/solana-labs/solana/blob/19bd30262/core/src/retransmit_stage.rs#L272-L289

So the two are inconsistent at epoch boundaries, where some nodes may
have a working bank (or, similarly, a root bank) lagging behind other
nodes. As a result, a node that receives a packet may construct the
turbine broadcast tree inconsistently with its parent node in the tree,
and so some packets may fail to reach all nodes in the tree.
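
As a rough sketch of the fix (the helper name below is made up for
illustration; the Bank calls are the ones this diff itself uses), both
stages now derive the epoch from the slot the shred belongs to, so a
node whose bank lags across an epoch boundary still resolves the same
epoch stakes for that shred as its peers:

    use std::collections::HashMap;
    use solana_runtime::bank::Bank;
    use solana_sdk::{clock::Slot, pubkey::Pubkey};

    // Hypothetical helper: key the stakes lookup off the shred's slot,
    // not off the bank's own epoch, mirroring ClusterNodesCache::get below.
    fn shred_epoch_stakes(root_bank: &Bank, shred_slot: Slot) -> HashMap<Pubkey, u64> {
        let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
        root_bank.epoch_staked_nodes(epoch).unwrap_or_default()
    }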
behzad nouri 2021-07-29 12:51:00 -04:00
parent aa32738dd5
commit 50d0e830c9
2 changed files with 13 additions and 44 deletions

core/src/cluster_nodes.rs

@@ -47,7 +47,7 @@ pub struct ClusterNodes<T> {
     _phantom: PhantomData<T>,
 }
 
-pub struct ClusterNodesCache<T> {
+pub(crate) struct ClusterNodesCache<T> {
     #[allow(clippy::type_complexity)]
     cache: Mutex<LruCache<Epoch, (Instant, Arc<ClusterNodes<T>>)>>,
     ttl: Duration, // Time to live.
@@ -72,12 +72,12 @@ impl Node {
 }
 
 impl<T> ClusterNodes<T> {
-    pub fn num_peers(&self) -> usize {
+    pub(crate) fn num_peers(&self) -> usize {
         self.index.len()
     }
 
     // A peer is considered live if they generated their contact info recently.
-    pub fn num_peers_live(&self, now: u64) -> usize {
+    pub(crate) fn num_peers_live(&self, now: u64) -> usize {
         self.index
             .iter()
             .filter_map(|(_, index)| self.nodes[*index].contact_info())
@@ -100,7 +100,7 @@ impl ClusterNodes<BroadcastStage> {
     /// Returns the root of turbine broadcast tree, which the leader sends the
     /// shred to.
-    pub fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
+    pub(crate) fn get_broadcast_peer(&self, shred_seed: [u8; 32]) -> Option<&ContactInfo> {
         if self.index.is_empty() {
             None
         } else {
@@ -114,11 +114,7 @@ impl ClusterNodes<BroadcastStage> {
 }
 
 impl ClusterNodes<RetransmitStage> {
-    pub fn new(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Self {
-        new_cluster_nodes(cluster_info, stakes)
-    }
-
-    pub fn get_retransmit_peers(
+    pub(crate) fn get_retransmit_peers(
         &self,
         shred_seed: [u8; 32],
         fanout: usize,
@@ -230,7 +226,7 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
 }
 
 impl<T> ClusterNodesCache<T> {
-    pub fn new(
+    pub(crate) fn new(
         // Capacity of underlying LRU-cache in terms of number of epochs.
         cap: usize,
         // A time-to-live eviction policy is enforced to refresh entries in
@@ -244,41 +240,13 @@ impl<T> ClusterNodesCache<T> {
     }
 }
 
-impl ClusterNodesCache<RetransmitStage> {
-    pub fn get(
-        &self,
-        cluster_info: &ClusterInfo,
-        working_bank: &Bank,
-    ) -> Arc<ClusterNodes<RetransmitStage>> {
-        let slot = working_bank.slot();
-        let epoch = working_bank.get_leader_schedule_epoch(slot);
-        {
-            let mut cache = self.cache.lock().unwrap();
-            if let Some((asof, nodes)) = cache.get(&epoch) {
-                if asof.elapsed() < self.ttl {
-                    return Arc::clone(nodes);
-                }
-                cache.pop(&epoch);
-            }
-        }
-        let epoch_staked_nodes = working_bank.epoch_staked_nodes(epoch).unwrap_or_default();
-        let nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, &epoch_staked_nodes);
-        let nodes = Arc::new(nodes);
-        {
-            let mut cache = self.cache.lock().unwrap();
-            cache.put(epoch, (Instant::now(), Arc::clone(&nodes)));
-        }
-        nodes
-    }
-}
-
-impl ClusterNodesCache<BroadcastStage> {
-    pub fn get(
+impl<T: 'static> ClusterNodesCache<T> {
+    pub(crate) fn get(
         &self,
         shred_slot: Slot,
         root_bank: &Bank,
         cluster_info: &ClusterInfo,
-    ) -> Arc<ClusterNodes<BroadcastStage>> {
+    ) -> Arc<ClusterNodes<T>> {
         let epoch = root_bank.get_leader_schedule_epoch(shred_slot);
         {
             let mut cache = self.cache.lock().unwrap();
@@ -290,7 +258,7 @@ impl ClusterNodesCache<BroadcastStage> {
             }
         }
         let epoch_staked_nodes = root_bank.epoch_staked_nodes(epoch).unwrap_or_default();
-        let nodes = ClusterNodes::<BroadcastStage>::new(cluster_info, &epoch_staked_nodes);
+        let nodes = new_cluster_nodes::<T>(cluster_info, &epoch_staked_nodes);
         let nodes = Arc::new(nodes);
         {
             let mut cache = self.cache.lock().unwrap();
@@ -408,7 +376,7 @@ mod tests {
         let this_node = cluster_info.my_contact_info();
         // ClusterInfo::tvu_peers excludes the node itself.
         assert_eq!(cluster_info.tvu_peers().len(), nodes.len() - 1);
-        let cluster_nodes = ClusterNodes::<RetransmitStage>::new(&cluster_info, &stakes);
+        let cluster_nodes = new_cluster_nodes::<RetransmitStage>(&cluster_info, &stakes);
         // All nodes with contact-info should be in the index.
         assert_eq!(cluster_nodes.index.len(), nodes.len());
         // Staked nodes with no contact-info should be included.

core/src/retransmit_stage.rs

@@ -337,7 +337,6 @@ fn retransmit(
     let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
     maybe_reset_shreds_received_cache(shreds_received, hasher_reset_ts);
-    let cluster_nodes = cluster_nodes_cache.get(cluster_info, &working_bank);
     epoch_cache_update.stop();
 
     let my_id = cluster_info.id();
@@ -379,6 +378,7 @@ fn retransmit(
         let mut compute_turbine_peers = Measure::start("turbine_start");
         let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
+        let cluster_nodes = cluster_nodes_cache.get(shred_slot, &root_bank, cluster_info);
         let (neighbors, children) =
             cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
         // If the node is on the critical path (i.e. the first node in each
@@ -432,6 +432,7 @@ fn retransmit(
         retransmit_total,
         id,
     );
+    let cluster_nodes = cluster_nodes_cache.get(root_bank.slot(), &root_bank, cluster_info);
     update_retransmit_stats(
         stats,
         timer_start.as_us(),