uses cluster-nodes cache in broadcast-stage

* The current caching mechanism does not update cluster-nodes when the
  epoch (and therefore the epoch staked nodes) changes:
  https://github.com/solana-labs/solana/blob/19bd30262/core/src/broadcast_stage/standard_broadcast_run.rs#L332-L344

* Additionally, the cache update has a concurrency bug: the thread that
  wins the compare_and_swap on the atomic timestamp may block while it
  tries to obtain the write-lock on the cache, while other threads keep
  running ahead with the outdated cache (since the atomic timestamp is
  already updated). The sketch right after this list illustrates the race.
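
A minimal reconstruction of that racy pattern, assuming hypothetical
names (StaleCache, read_nodes, and recompute_nodes are illustrative,
not the actual code):

    use std::sync::{
        atomic::{AtomicU64, Ordering},
        RwLock,
    };

    struct StaleCache {
        // Timestamp (ms) gating how often the cache is refreshed.
        last_update_ms: AtomicU64,
        // Stand-in for the cached cluster-nodes.
        nodes: RwLock<Vec<u64>>,
    }

    impl StaleCache {
        fn read_nodes(&self, now_ms: u64, ttl_ms: u64) -> Vec<u64> {
            let last = self.last_update_ms.load(Ordering::Relaxed);
            if now_ms.saturating_sub(last) > ttl_ms
                && self
                    .last_update_ms
                    .compare_exchange(last, now_ms, Ordering::Relaxed, Ordering::Relaxed)
                    .is_ok()
            {
                // BUG: the timestamp is already bumped, so while this thread
                // blocks waiting for the write-lock, other threads observe a
                // "fresh" timestamp, skip the refresh, and read the outdated
                // value below.
                *self.nodes.write().unwrap() = recompute_nodes();
            }
            self.nodes.read().unwrap().clone() // may still be stale
        }
    }

    fn recompute_nodes() -> Vec<u64> {
        Vec::new() // placeholder for the expensive recomputation
    }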

In the new ClusterNodesCache, entries are keyed by epoch, so when the
epoch changes the cluster-nodes are recalculated. The time-to-live
eviction policy is also encapsulated within the cache and strictly
enforced; a sketch of the idea follows.
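
To illustrate the approach, a minimal epoch-keyed, TTL-enforcing cache
might look like the sketch below. EpochCache and its API are
hypothetical, not the actual ClusterNodesCache, and the cap on the
number of cached epochs (CLUSTER_NODES_CACHE_NUM_EPOCH_CAP) is omitted
for brevity:

    use std::{
        collections::HashMap,
        sync::{Arc, Mutex},
        time::{Duration, Instant},
    };

    type Epoch = u64;

    struct EpochCache<T> {
        ttl: Duration,
        entries: Mutex<HashMap<Epoch, (Instant, Arc<T>)>>,
    }

    impl<T> EpochCache<T> {
        fn new(ttl: Duration) -> Self {
            Self {
                ttl,
                entries: Mutex::new(HashMap::new()),
            }
        }

        // Returns the cached value for `epoch`, recomputing it when the
        // entry is missing or older than the TTL. Keying by epoch means
        // an epoch change always forces a recomputation.
        fn get(&self, epoch: Epoch, make: impl FnOnce() -> T) -> Arc<T> {
            let mut entries = self.entries.lock().unwrap();
            if let Some((stamp, value)) = entries.get(&epoch) {
                if stamp.elapsed() < self.ttl {
                    return value.clone();
                }
            }
            // Missing or expired: recompute while holding the lock, so no
            // thread can run ahead with a stale value.
            let value = Arc::new(make());
            entries.insert(epoch, (Instant::now(), value.clone()));
            value
        }
    }

Holding one lock across lookup and recomputation trades some
parallelism for correctness: it removes the window in which the
timestamp says "fresh" but the data is still old.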
behzad nouri 2021-07-29 12:25:56 -04:00
parent 30bec3921e
commit aa32738dd5
4 changed files with 45 additions and 31 deletions

core/src/broadcast_stage.rs

@@ -44,6 +44,9 @@ pub(crate) mod broadcast_utils;
 mod fail_entry_verification_broadcast_run;
 mod standard_broadcast_run;
 
+const CLUSTER_NODES_CACHE_NUM_EPOCH_CAP: usize = 8;
+const CLUSTER_NODES_CACHE_TTL: Duration = Duration::from_secs(5);
+
 pub(crate) const NUM_INSERT_THREADS: usize = 2;
 pub(crate) type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
 pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;

core/src/broadcast_stage/broadcast_duplicates_run.rs

@@ -1,11 +1,14 @@
-use super::*;
-use solana_entry::entry::Entry;
-use solana_ledger::shred::Shredder;
-use solana_runtime::blockhash_queue::BlockhashQueue;
-use solana_sdk::{
-    hash::Hash,
-    signature::{Keypair, Signer},
-    system_transaction,
+use {
+    super::*,
+    crate::cluster_nodes::ClusterNodesCache,
+    solana_entry::entry::Entry,
+    solana_ledger::shred::Shredder,
+    solana_runtime::blockhash_queue::BlockhashQueue,
+    solana_sdk::{
+        hash::Hash,
+        signature::{Keypair, Signer},
+        system_transaction,
+    },
 };
 
 pub const MINIMUM_DUPLICATE_SLOT: Slot = 20;
@@ -32,10 +35,15 @@ pub(super) struct BroadcastDuplicatesRun {
     recent_blockhash: Option<Hash>,
     prev_entry_hash: Option<Hash>,
     num_slots_broadcasted: usize,
+    cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
 }
 
 impl BroadcastDuplicatesRun {
     pub(super) fn new(shred_version: u16, config: BroadcastDuplicatesConfig) -> Self {
+        let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
+            CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+            CLUSTER_NODES_CACHE_TTL,
+        ));
         Self {
             config,
             duplicate_queue: BlockhashQueue::default(),
@@ -47,6 +55,7 @@ impl BroadcastDuplicatesRun {
             recent_blockhash: None,
             prev_entry_hash: None,
             num_slots_broadcasted: 0,
+            cluster_nodes_cache,
         }
     }
 }
@@ -220,11 +229,8 @@ impl BroadcastRun for BroadcastDuplicatesRun {
     ) -> Result<()> {
         let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
         let root_bank = bank_forks.read().unwrap().root_bank();
-        let epoch = root_bank.get_leader_schedule_epoch(slot);
-        let stakes = root_bank.epoch_staked_nodes(epoch);
         // Broadcast data
-        let cluster_nodes =
-            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
+        let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info);
         broadcast_shreds(
             sock,
             &shreds,

core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs

@@ -1,9 +1,10 @@
-use super::*;
-use crate::cluster_nodes::ClusterNodes;
-use solana_ledger::shred::Shredder;
-use solana_sdk::hash::Hash;
-use solana_sdk::signature::Keypair;
-use std::{thread::sleep, time::Duration};
+use {
+    super::*,
+    crate::cluster_nodes::ClusterNodesCache,
+    solana_ledger::shred::Shredder,
+    solana_sdk::{hash::Hash, signature::Keypair},
+    std::{thread::sleep, time::Duration},
+};
 
 pub const NUM_BAD_SLOTS: u64 = 10;
 pub const SLOT_TO_RESOLVE: u64 = 32;
@@ -14,15 +15,21 @@ pub(super) struct FailEntryVerificationBroadcastRun {
     good_shreds: Vec<Shred>,
     current_slot: Slot,
     next_shred_index: u32,
+    cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
 }
 
 impl FailEntryVerificationBroadcastRun {
     pub(super) fn new(shred_version: u16) -> Self {
+        let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
+            CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+            CLUSTER_NODES_CACHE_TTL,
+        ));
         Self {
             shred_version,
             good_shreds: vec![],
             current_slot: 0,
             next_shred_index: 0,
+            cluster_nodes_cache,
         }
     }
 }
@@ -132,11 +139,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
     ) -> Result<()> {
         let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
         let root_bank = bank_forks.read().unwrap().root_bank();
-        let epoch = root_bank.get_leader_schedule_epoch(slot);
-        let stakes = root_bank.epoch_staked_nodes(epoch);
         // Broadcast data
-        let cluster_nodes =
-            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
+        let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info);
         broadcast_shreds(
             sock,
             &shreds,

core/src/broadcast_stage/standard_broadcast_run.rs

@@ -5,7 +5,9 @@ use {
         broadcast_utils::{self, ReceiveResults},
         *,
     },
-    crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes},
+    crate::{
+        broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodesCache,
+    },
     solana_entry::entry::Entry,
     solana_ledger::shred::{
         ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
@@ -29,12 +31,16 @@ pub struct StandardBroadcastRun {
     shred_version: u16,
     last_datapoint_submit: Arc<AtomicInterval>,
     num_batches: usize,
-    cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
+    cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
     last_peer_update: Arc<AtomicInterval>,
 }
 
 impl StandardBroadcastRun {
     pub(super) fn new(shred_version: u16) -> Self {
+        let cluster_nodes_cache = Arc::new(ClusterNodesCache::<BroadcastStage>::new(
+            CLUSTER_NODES_CACHE_NUM_EPOCH_CAP,
+            CLUSTER_NODES_CACHE_TTL,
+        ));
         Self {
             process_shreds_stats: ProcessShredsStats::default(),
             transmit_shreds_stats: Arc::default(),
@@ -45,7 +51,7 @@ impl StandardBroadcastRun {
             shred_version,
             last_datapoint_submit: Arc::default(),
             num_batches: 0,
-            cluster_nodes: Arc::default(),
+            cluster_nodes_cache,
             last_peer_update: Arc::new(AtomicInterval::default()),
         }
     }
@@ -342,12 +348,8 @@ impl StandardBroadcastRun {
         // Get the list of peers to broadcast to
         let mut get_peers_time = Measure::start("broadcast::get_peers");
         let root_bank = bank_forks.read().unwrap().root_bank();
-        let epoch = root_bank.get_leader_schedule_epoch(slot);
-        let stakes = root_bank.epoch_staked_nodes(epoch);
-        *self.cluster_nodes.write().unwrap() =
-            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
+        let cluster_nodes = self.cluster_nodes_cache.get(slot, &root_bank, cluster_info);
         get_peers_time.stop();
-        let cluster_nodes = self.cluster_nodes.read().unwrap();
 
         let mut transmit_stats = TransmitShredsStats::default();
         // Broadcast the shreds
@@ -363,7 +365,6 @@
             bank_forks,
             cluster_info.socket_addr_space(),
         )?;
-        drop(cluster_nodes);
         transmit_time.stop();
         transmit_stats.transmit_elapsed = transmit_time.as_us();