shares cluster-nodes between retransmit threads (#18947)

cluster_nodes and last_peer_update are not shared between retransmit
threads; each thread has its own copy:
https://github.com/solana-labs/solana/blob/65ccfed86/core/src/retransmit_stage.rs#L476-L477

Additionally, even with shared references, this code:
https://github.com/solana-labs/solana/blob/0167daa11/core/src/retransmit_stage.rs#L315-L328
has a concurrency bug: the thread which wins the compare_and_swap may
update cluster_nodes only much later (in particular, the write-lock
there may block), while the other threads keep running with an outdated
cluster_nodes in the meantime.
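
For illustration only, below is a minimal standalone sketch of the sharing pattern this commit adopts; PeersCache, timestamp_ms and the dummy epoch value are stand-ins invented for the sketch, not the real ClusterNodes, ClusterInfo or epoch-staked-nodes types. Every worker thread clones the same Arc<RwLock<..>> and Arc<AtomicU64>, and the refresh re-checks the interval after taking the write-lock, so only one thread per interval rebuilds the cache while the others keep reading the shared value.

use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc, RwLock,
    },
    thread,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

// Simplified stand-in for ClusterNodes<RetransmitStage>; rebuilding it is
// assumed to be expensive.
#[derive(Default)]
struct PeersCache {
    epoch: u64,
}

fn timestamp_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

// Double-checked refresh: a cheap timestamp check first, then a second check
// under the write-lock so only one thread per interval rebuilds the cache.
fn maybe_update_peers_cache(cache: &RwLock<PeersCache>, last_update: &AtomicU64, epoch: u64) {
    const UPDATE_INTERVAL_MS: u64 = 1000;
    if timestamp_ms().saturating_sub(last_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
        return;
    }
    let mut cache = cache.write().unwrap();
    let now = timestamp_ms();
    if now.saturating_sub(last_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
        return; // Another thread refreshed the cache while we waited for the lock.
    }
    *cache = PeersCache { epoch }; // The expensive rebuild happens once per interval.
    last_update.store(now, Ordering::Release);
}

fn main() {
    // Created once and cloned into every thread, unlike the old code where
    // each retransmit thread constructed its own cluster_nodes and last_peer_update.
    let cache = Arc::new(RwLock::new(PeersCache::default()));
    let last_update = Arc::new(AtomicU64::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let cache = Arc::clone(&cache);
            let last_update = Arc::clone(&last_update);
            thread::spawn(move || {
                for epoch in 0..10 {
                    maybe_update_peers_cache(&cache, &last_update, epoch);
                    let _epoch = cache.read().unwrap().epoch; // All threads read the shared cache.
                    thread::sleep(Duration::from_millis(200));
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}

The actual maybe_update_peers_cache added in the diff below does the same with ClusterNodes<RetransmitStage>, the epoch staked nodes, and a reset of the shreds-received filter.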
behzad nouri 2021-07-29 16:20:15 +00:00 committed by GitHub
parent 1f788781dd
commit d06dc6c8a6
2 changed files with 136 additions and 92 deletions

core/benches/retransmit_stage.rs

@@ -3,34 +3,42 @@
 extern crate solana_core;
 extern crate test;
-use log::*;
-use solana_core::retransmit_stage::retransmitter;
-use solana_entry::entry::Entry;
-use solana_gossip::cluster_info::{ClusterInfo, Node};
-use solana_gossip::contact_info::ContactInfo;
-use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
-use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
-use solana_ledger::shred::Shredder;
-use solana_measure::measure::Measure;
-use solana_perf::packet::{Packet, Packets};
-use solana_rpc::max_slots::MaxSlots;
-use solana_runtime::bank::Bank;
-use solana_runtime::bank_forks::BankForks;
-use solana_sdk::hash::Hash;
-use solana_sdk::pubkey;
-use solana_sdk::signature::{Keypair, Signer};
-use solana_sdk::system_transaction;
-use solana_sdk::timing::timestamp;
-use solana_streamer::socket::SocketAddrSpace;
-use std::net::UdpSocket;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::mpsc::channel;
-use std::sync::Mutex;
-use std::sync::{Arc, RwLock};
-use std::thread::sleep;
-use std::thread::Builder;
-use std::time::Duration;
-use test::Bencher;
+use {
+    log::*,
+    solana_core::retransmit_stage::retransmitter,
+    solana_entry::entry::Entry,
+    solana_gossip::{
+        cluster_info::{ClusterInfo, Node},
+        contact_info::ContactInfo,
+    },
+    solana_ledger::{
+        genesis_utils::{create_genesis_config, GenesisConfigInfo},
+        leader_schedule_cache::LeaderScheduleCache,
+        shred::Shredder,
+    },
+    solana_measure::measure::Measure,
+    solana_perf::packet::{Packet, Packets},
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{
+        hash::Hash,
+        pubkey,
+        signature::{Keypair, Signer},
+        system_transaction,
+        timing::timestamp,
+    },
+    solana_streamer::socket::SocketAddrSpace,
+    std::{
+        net::UdpSocket,
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            mpsc::channel,
+            Arc, Mutex, RwLock,
+        },
+        thread::{sleep, Builder},
+        time::Duration,
+    },
+    test::Bencher,
+};
 
 #[bench]
 #[allow(clippy::same_item_push)]
@@ -102,7 +110,7 @@ fn bench_retransmitter(bencher: &mut Bencher) {
         &leader_schedule_cache,
         cluster_info,
         packet_receiver,
-        &Arc::new(MaxSlots::default()),
+        Arc::default(), // solana_rpc::max_slots::MaxSlots
         None,
     );

core/src/retransmit_stage.rs

@@ -1,47 +1,54 @@
 //! The `retransmit_stage` retransmits shreds between validators
 #![allow(clippy::rc_buffer)]
-use crate::{
-    ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
-    cluster_info_vote_listener::VerifiedVoteReceiver,
-    cluster_nodes::ClusterNodes,
-    cluster_slots::ClusterSlots,
-    cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
-    completed_data_sets_service::CompletedDataSetsSender,
-    repair_service::{DuplicateSlotsResetSender, RepairInfo},
-    result::{Error, Result},
-    window_service::{should_retransmit_and_persist, WindowService},
-};
-use crossbeam_channel::{Receiver, Sender};
-use lru::LruCache;
-use solana_client::rpc_response::SlotUpdate;
-use solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT};
-use solana_ledger::shred::{get_shred_slot_index_type, ShredFetchStats};
-use solana_ledger::{blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache};
-use solana_measure::measure::Measure;
-use solana_metrics::inc_new_counter_error;
-use solana_perf::packet::{Packet, Packets};
-use solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions};
-use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{
-    clock::Slot,
-    epoch_schedule::EpochSchedule,
-    pubkey::Pubkey,
-    timing::{timestamp, AtomicInterval},
-};
-use solana_streamer::streamer::PacketReceiver;
-use std::{
-    collections::hash_set::HashSet,
-    collections::{BTreeMap, BTreeSet, HashMap},
-    net::UdpSocket,
-    ops::{Deref, DerefMut},
-    sync::atomic::{AtomicBool, AtomicU64, Ordering},
-    sync::mpsc::channel,
-    sync::mpsc::RecvTimeoutError,
-    sync::Mutex,
-    sync::{Arc, RwLock},
-    thread::{self, Builder, JoinHandle},
-    time::Duration,
-};
+use {
+    crate::{
+        ancestor_hashes_service::AncestorHashesReplayUpdateReceiver,
+        cluster_info_vote_listener::VerifiedVoteReceiver,
+        cluster_nodes::ClusterNodes,
+        cluster_slots::ClusterSlots,
+        cluster_slots_service::{ClusterSlotsService, ClusterSlotsUpdateReceiver},
+        completed_data_sets_service::CompletedDataSetsSender,
+        packet_hasher::PacketHasher,
+        repair_service::{DuplicateSlotsResetSender, RepairInfo},
+        result::{Error, Result},
+        window_service::{should_retransmit_and_persist, WindowService},
+    },
+    crossbeam_channel::{Receiver, Sender},
+    lru::LruCache,
+    solana_client::rpc_response::SlotUpdate,
+    solana_gossip::cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
+    solana_ledger::{
+        shred::{get_shred_slot_index_type, ShredFetchStats},
+        {blockstore::Blockstore, leader_schedule_cache::LeaderScheduleCache},
+    },
+    solana_measure::measure::Measure,
+    solana_metrics::inc_new_counter_error,
+    solana_perf::packet::{Packet, Packets},
+    solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions},
+    solana_runtime::{bank::Bank, bank_forks::BankForks},
+    solana_sdk::{
+        clock::{Epoch, Slot},
+        epoch_schedule::EpochSchedule,
+        pubkey::Pubkey,
+        timing::{timestamp, AtomicInterval},
+    },
+    solana_streamer::streamer::PacketReceiver,
+    std::{
+        collections::{
+            hash_set::HashSet,
+            {BTreeMap, BTreeSet, HashMap},
+        },
+        net::UdpSocket,
+        ops::DerefMut,
+        sync::{
+            atomic::{AtomicBool, AtomicU64, Ordering},
+            mpsc::{channel, RecvTimeoutError},
+            Arc, Mutex, RwLock,
+        },
+        thread::{self, Builder, JoinHandle},
+        time::Duration,
+    },
+};
 
 const MAX_DUPLICATE_COUNT: usize = 2;
@@ -209,7 +216,6 @@ fn update_retransmit_stats(
     }
 }
 
-use crate::packet_hasher::PacketHasher;
 
 // Map of shred (slot, index, is_data) => list of hash values seen for that key.
 pub type ShredFilter = LruCache<(Slot, u32, bool), Vec<u64>>;
@@ -269,6 +275,38 @@ fn check_if_first_shred_received(
     }
 }
 
+fn maybe_update_peers_cache(
+    cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
+    shreds_received: &Mutex<ShredFilterAndHasher>,
+    last_peer_update: &AtomicU64,
+    cluster_info: &ClusterInfo,
+    bank_epoch: Epoch,
+    working_bank: &Bank,
+) {
+    const UPDATE_INTERVAL_MS: u64 = 1000;
+    if timestamp().saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
+        return;
+    }
+    {
+        // Write-lock cluster-nodes here so that only one thread does the
+        // computations to update peers.
+        let mut cluster_nodes = cluster_nodes.write().unwrap();
+        let now = timestamp();
+        if now.saturating_sub(last_peer_update.load(Ordering::Acquire)) < UPDATE_INTERVAL_MS {
+            return; // Some other thread has already done the update.
+        }
+        let epoch_staked_nodes = working_bank
+            .epoch_staked_nodes(bank_epoch)
+            .unwrap_or_default();
+        *cluster_nodes = ClusterNodes::<RetransmitStage>::new(cluster_info, &epoch_staked_nodes);
+        last_peer_update.store(now, Ordering::Release);
+    }
+    let mut shreds_received = shreds_received.lock().unwrap();
+    let (cache, hasher) = shreds_received.deref_mut();
+    cache.clear();
+    hasher.reset();
+}
+
 #[allow(clippy::too_many_arguments)]
 fn retransmit(
     bank_forks: &RwLock<BankForks>,
@@ -279,7 +317,7 @@ fn retransmit(
     id: u32,
     stats: &RetransmitStats,
     cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
-    last_peer_update: &AtomicInterval,
+    last_peer_update: &AtomicU64,
     shreds_received: &Mutex<ShredFilterAndHasher>,
     max_slots: &MaxSlots,
     first_shreds_received: &Mutex<BTreeSet<Slot>>,
@@ -301,26 +339,22 @@ fn retransmit(
     drop(r_lock);
 
     let mut epoch_fetch = Measure::start("retransmit_epoch_fetch");
-    let (r_bank, root_bank) = {
+    let (working_bank, root_bank) = {
         let bank_forks = bank_forks.read().unwrap();
         (bank_forks.working_bank(), bank_forks.root_bank())
     };
-    let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot());
+    let bank_epoch = working_bank.get_leader_schedule_epoch(working_bank.slot());
     epoch_fetch.stop();
 
     let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
-    if last_peer_update.should_update_ext(1000, false) {
-        let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch);
-        *cluster_nodes.write().unwrap() = ClusterNodes::<RetransmitStage>::new(
-            cluster_info,
-            &epoch_staked_nodes.unwrap_or_default(),
-        );
-        {
-            let mut sr = shreds_received.lock().unwrap();
-            sr.0.clear();
-            sr.1.reset();
-        }
-    }
+    maybe_update_peers_cache(
+        cluster_nodes,
+        shreds_received,
+        last_peer_update,
+        cluster_info,
+        bank_epoch,
+        &working_bank,
+    );
     let cluster_nodes = cluster_nodes.read().unwrap();
     let mut peers_len = 0;
     epoch_cache_update.stop();
@@ -363,7 +397,7 @@ fn retransmit(
         }
 
         let mut compute_turbine_peers = Measure::start("turbine_start");
-        let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(r_bank.deref()));
+        let slot_leader = leader_schedule_cache.slot_leader_at(shred_slot, Some(&working_bank));
         let (neighbors, children) =
             cluster_nodes.get_retransmit_peers(packet.meta.seed, DATA_PLANE_FANOUT, slot_leader);
         // If the node is on the critical path (i.e. the first node in each
@@ -451,9 +485,11 @@ pub fn retransmitter(
     leader_schedule_cache: &Arc<LeaderScheduleCache>,
     cluster_info: Arc<ClusterInfo>,
     r: Arc<Mutex<PacketReceiver>>,
-    max_slots: &Arc<MaxSlots>,
+    max_slots: Arc<MaxSlots>,
     rpc_subscriptions: Option<Arc<RpcSubscriptions>>,
 ) -> Vec<JoinHandle<()>> {
+    let cluster_nodes = Arc::default();
+    let last_peer_update = Arc::default();
     let stats = Arc::new(RetransmitStats::default());
     let shreds_received = Arc::new(Mutex::new((
         LruCache::new(DEFAULT_LRU_SIZE),
@@ -468,8 +504,8 @@ pub fn retransmitter(
            let r = r.clone();
            let cluster_info = cluster_info.clone();
            let stats = stats.clone();
-           let cluster_nodes = Arc::default();
-           let last_peer_update = Arc::new(AtomicInterval::default());
+           let cluster_nodes = Arc::clone(&cluster_nodes);
+           let last_peer_update = Arc::clone(&last_peer_update);
            let shreds_received = shreds_received.clone();
            let max_slots = max_slots.clone();
            let first_shreds_received = first_shreds_received.clone();
@@ -552,7 +588,7 @@ impl RetransmitStage {
             leader_schedule_cache,
             cluster_info.clone(),
             retransmit_receiver,
-            max_slots,
+            Arc::clone(max_slots),
             rpc_subscriptions,
         );
@@ -686,7 +722,7 @@ mod tests {
             &leader_schedule_cache,
             cluster_info,
             Arc::new(Mutex::new(retransmit_receiver)),
-            &Arc::new(MaxSlots::default()),
+            Arc::default(), // MaxSlots
             None,
         );