Write helper for multithread update (#18808)

This commit is contained in:
sakridge 2021-07-28 18:16:36 -07:00 committed by GitHub
parent 4d0cd9b283
commit 84e78316b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 86 additions and 111 deletions

View File

@ -16,12 +16,16 @@ use solana_ledger::{
shred::Shred, shred::Shred,
}; };
use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{pubkey, signature::Keypair, timing::timestamp}; use solana_sdk::{
pubkey,
signature::Keypair,
timing::{timestamp, AtomicInterval},
};
use solana_streamer::socket::SocketAddrSpace; use solana_streamer::socket::SocketAddrSpace;
use std::{ use std::{
collections::HashMap, collections::HashMap,
net::UdpSocket, net::UdpSocket,
sync::{atomic::AtomicU64, Arc, RwLock}, sync::{Arc, RwLock},
}; };
use test::Bencher; use test::Bencher;
@ -54,7 +58,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
let cluster_info = Arc::new(cluster_info); let cluster_info = Arc::new(cluster_info);
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes); let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
let shreds = Arc::new(shreds); let shreds = Arc::new(shreds);
let last_datapoint = Arc::new(AtomicU64::new(0)); let last_datapoint = Arc::new(AtomicInterval::default());
bencher.iter(move || { bencher.iter(move || {
let shreds = shreds.clone(); let shreds = shreds.clone();
broadcast_shreds( broadcast_shreds(

View File

@ -37,7 +37,7 @@ use solana_sdk::{
sanitized_transaction::SanitizedTransaction, sanitized_transaction::SanitizedTransaction,
short_vec::decode_shortu16_len, short_vec::decode_shortu16_len,
signature::Signature, signature::Signature,
timing::{duration_as_ms, timestamp}, timing::{duration_as_ms, timestamp, AtomicInterval},
transaction::{self, Transaction, TransactionError}, transaction::{self, Transaction, TransactionError},
}; };
use solana_transaction_status::token_balances::{ use solana_transaction_status::token_balances::{
@ -79,7 +79,7 @@ const DEFAULT_LRU_SIZE: usize = 200_000;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct BankingStageStats { pub struct BankingStageStats {
last_report: AtomicU64, last_report: AtomicInterval,
id: u32, id: u32,
process_packets_count: AtomicUsize, process_packets_count: AtomicUsize,
new_tx_count: AtomicUsize, new_tx_count: AtomicUsize,
@ -115,19 +115,7 @@ impl BankingStageStats {
} }
fn report(&self, report_interval_ms: u64) { fn report(&self, report_interval_ms: u64) {
let should_report = { if self.last_report.should_update(report_interval_ms) {
let last = self.last_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp();
now.saturating_sub(last) > report_interval_ms
&& self.last_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last)
};
if should_report {
datapoint_info!( datapoint_info!(
"banking_stage-loop-stats", "banking_stage-loop-stats",
("id", self.id as i64, i64), ("id", self.id as i64, i64),

View File

@ -21,13 +21,12 @@ use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
use solana_poh::poh_recorder::WorkingBankEntry; use solana_poh::poh_recorder::WorkingBankEntry;
use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::timing::timestamp; use solana_sdk::timing::{timestamp, AtomicInterval};
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}; use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
use solana_streamer::{ use solana_streamer::{
sendmmsg::{batch_send, SendPktsError}, sendmmsg::{batch_send, SendPktsError},
socket::SocketAddrSpace, socket::SocketAddrSpace,
}; };
use std::sync::atomic::AtomicU64;
use std::{ use std::{
collections::HashMap, collections::HashMap,
net::UdpSocket, net::UdpSocket,
@ -378,14 +377,9 @@ impl BroadcastStage {
fn update_peer_stats( fn update_peer_stats(
num_live_peers: i64, num_live_peers: i64,
broadcast_len: i64, broadcast_len: i64,
last_datapoint_submit: &Arc<AtomicU64>, last_datapoint_submit: &Arc<AtomicInterval>,
) { ) {
let now = timestamp(); if last_datapoint_submit.should_update(1000) {
let last = last_datapoint_submit.load(Ordering::Relaxed);
#[allow(deprecated)]
if now.saturating_sub(last) > 1000
&& last_datapoint_submit.compare_and_swap(last, now, Ordering::Relaxed) == last
{
datapoint_info!( datapoint_info!(
"cluster_info-num_nodes", "cluster_info-num_nodes",
("live_count", num_live_peers, i64), ("live_count", num_live_peers, i64),
@ -400,7 +394,7 @@ pub fn broadcast_shreds(
s: &UdpSocket, s: &UdpSocket,
shreds: &[Shred], shreds: &[Shred],
cluster_nodes: &ClusterNodes<BroadcastStage>, cluster_nodes: &ClusterNodes<BroadcastStage>,
last_datapoint_submit: &Arc<AtomicU64>, last_datapoint_submit: &Arc<AtomicInterval>,
transmit_stats: &mut TransmitShredsStats, transmit_stats: &mut TransmitShredsStats,
self_pubkey: Pubkey, self_pubkey: Pubkey,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,

View File

@ -291,7 +291,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
sock, sock,
&shreds, &shreds,
&cluster_nodes, &cluster_nodes,
&Arc::new(AtomicU64::new(0)), &Arc::new(AtomicInterval::default()),
&mut TransmitShredsStats::default(), &mut TransmitShredsStats::default(),
cluster_info.id(), cluster_info.id(),
bank_forks, bank_forks,

View File

@ -143,7 +143,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
sock, sock,
&shreds, &shreds,
&cluster_nodes, &cluster_nodes,
&Arc::new(AtomicU64::new(0)), &Arc::new(AtomicInterval::default()),
&mut TransmitShredsStats::default(), &mut TransmitShredsStats::default(),
cluster_info.id(), cluster_info.id(),
bank_forks, bank_forks,

View File

@ -9,7 +9,11 @@ use solana_entry::entry::Entry;
use solana_ledger::shred::{ use solana_ledger::shred::{
ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_TICK_REFERENCE_MASK, ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_TICK_REFERENCE_MASK,
}; };
use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us}; use solana_sdk::{
pubkey::Pubkey,
signature::Keypair,
timing::{duration_as_us, AtomicInterval},
};
use std::{collections::HashMap, sync::RwLock, time::Duration}; use std::{collections::HashMap, sync::RwLock, time::Duration};
#[derive(Clone)] #[derive(Clone)]
@ -21,10 +25,10 @@ pub struct StandardBroadcastRun {
current_slot_and_parent: Option<(u64, u64)>, current_slot_and_parent: Option<(u64, u64)>,
slot_broadcast_start: Option<Instant>, slot_broadcast_start: Option<Instant>,
shred_version: u16, shred_version: u16,
last_datapoint_submit: Arc<AtomicU64>, last_datapoint_submit: Arc<AtomicInterval>,
num_batches: usize, num_batches: usize,
cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>, cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
last_peer_update: Arc<AtomicU64>, last_peer_update: Arc<AtomicInterval>,
} }
impl StandardBroadcastRun { impl StandardBroadcastRun {
@ -40,7 +44,7 @@ impl StandardBroadcastRun {
last_datapoint_submit: Arc::default(), last_datapoint_submit: Arc::default(),
num_batches: 0, num_batches: 0,
cluster_nodes: Arc::default(), cluster_nodes: Arc::default(),
last_peer_update: Arc::default(), last_peer_update: Arc::new(AtomicInterval::default()),
} }
} }
@ -338,14 +342,9 @@ impl StandardBroadcastRun {
trace!("Broadcasting {:?} shreds", shreds.len()); trace!("Broadcasting {:?} shreds", shreds.len());
// Get the list of peers to broadcast to // Get the list of peers to broadcast to
let mut get_peers_time = Measure::start("broadcast::get_peers"); let mut get_peers_time = Measure::start("broadcast::get_peers");
let now = timestamp(); if self
let last = self.last_peer_update.load(Ordering::Relaxed); .last_peer_update
#[allow(deprecated)] .should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)
if now - last > BROADCAST_PEER_UPDATE_INTERVAL_MS
&& self
.last_peer_update
.compare_and_swap(last, now, Ordering::Relaxed)
== last
{ {
*self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new( *self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
cluster_info, cluster_info,

View File

@ -23,7 +23,12 @@ use solana_metrics::inc_new_counter_error;
use solana_perf::packet::{Packet, Packets}; use solana_perf::packet::{Packet, Packets};
use solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions}; use solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions};
use solana_runtime::{bank::Bank, bank_forks::BankForks}; use solana_runtime::{bank::Bank, bank_forks::BankForks};
use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp}; use solana_sdk::{
clock::Slot,
epoch_schedule::EpochSchedule,
pubkey::Pubkey,
timing::{timestamp, AtomicInterval},
};
use solana_streamer::streamer::PacketReceiver; use solana_streamer::streamer::PacketReceiver;
use std::{ use std::{
collections::hash_set::HashSet, collections::hash_set::HashSet,
@ -56,7 +61,7 @@ struct RetransmitStats {
repair_total: AtomicU64, repair_total: AtomicU64,
discard_total: AtomicU64, discard_total: AtomicU64,
retransmit_total: AtomicU64, retransmit_total: AtomicU64,
last_ts: AtomicU64, last_ts: AtomicInterval,
compute_turbine_peers_total: AtomicU64, compute_turbine_peers_total: AtomicU64,
retransmit_tree_mismatch: AtomicU64, retransmit_tree_mismatch: AtomicU64,
packets_by_slot: Mutex<BTreeMap<Slot, usize>>, packets_by_slot: Mutex<BTreeMap<Slot, usize>>,
@ -116,12 +121,7 @@ fn update_retransmit_stats(
} }
} }
let now = timestamp(); if stats.last_ts.should_update(2000) {
let last = stats.last_ts.load(Ordering::Relaxed);
#[allow(deprecated)]
if now.saturating_sub(last) > 2000
&& stats.last_ts.compare_and_swap(last, now, Ordering::Relaxed) == last
{
datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64)); datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
datapoint_info!( datapoint_info!(
"retransmit-stage", "retransmit-stage",
@ -279,7 +279,7 @@ fn retransmit(
id: u32, id: u32,
stats: &RetransmitStats, stats: &RetransmitStats,
cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>, cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
last_peer_update: &AtomicU64, last_peer_update: &AtomicInterval,
shreds_received: &Mutex<ShredFilterAndHasher>, shreds_received: &Mutex<ShredFilterAndHasher>,
max_slots: &MaxSlots, max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>, first_shreds_received: &Mutex<BTreeSet<Slot>>,
@ -309,12 +309,7 @@ fn retransmit(
epoch_fetch.stop(); epoch_fetch.stop();
let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update"); let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
let now = timestamp(); if last_peer_update.should_update_ext(1000, false) {
let last = last_peer_update.load(Ordering::Relaxed);
#[allow(deprecated)]
if now.saturating_sub(last) > 1000
&& last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
{
let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch); let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch);
*cluster_nodes.write().unwrap() = ClusterNodes::<RetransmitStage>::new( *cluster_nodes.write().unwrap() = ClusterNodes::<RetransmitStage>::new(
cluster_info, cluster_info,
@ -474,7 +469,7 @@ pub fn retransmitter(
let cluster_info = cluster_info.clone(); let cluster_info = cluster_info.clone();
let stats = stats.clone(); let stats = stats.clone();
let cluster_nodes = Arc::default(); let cluster_nodes = Arc::default();
let last_peer_update = Arc::new(AtomicU64::new(0)); let last_peer_update = Arc::new(AtomicInterval::default());
let shreds_received = shreds_received.clone(); let shreds_received = shreds_received.clone();
let max_slots = max_slots.clone(); let max_slots = max_slots.clone();
let first_shreds_received = first_shreds_received.clone(); let first_shreds_received = first_shreds_received.clone();
@ -661,6 +656,7 @@ mod tests {
let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap(); let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap();
// need to make sure tvu and tpu are valid addresses // need to make sure tvu and tpu are valid addresses
me.tvu_forwards = me_retransmit.local_addr().unwrap(); me.tvu_forwards = me_retransmit.local_addr().unwrap();
let port = find_available_port_in_range(ip_addr, (8000, 10000)).unwrap(); let port = find_available_port_in_range(ip_addr, (8000, 10000)).unwrap();
me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port)) me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port))
.unwrap() .unwrap()

View File

@ -52,6 +52,7 @@ use solana_sdk::{
genesis_config::ClusterType, genesis_config::ClusterType,
hash::{Hash, Hasher}, hash::{Hash, Hasher},
pubkey::Pubkey, pubkey::Pubkey,
timing::AtomicInterval,
}; };
use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY; use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
use std::{ use std::{
@ -977,7 +978,7 @@ struct AccountsStats {
delta_hash_accumulate_time_total_us: AtomicU64, delta_hash_accumulate_time_total_us: AtomicU64,
delta_hash_num: AtomicU64, delta_hash_num: AtomicU64,
last_store_report: AtomicU64, last_store_report: AtomicInterval,
store_hash_accounts: AtomicU64, store_hash_accounts: AtomicU64,
calc_stored_meta: AtomicU64, calc_stored_meta: AtomicU64,
store_accounts: AtomicU64, store_accounts: AtomicU64,
@ -997,7 +998,7 @@ struct AccountsStats {
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct PurgeStats { struct PurgeStats {
last_report: AtomicU64, last_report: AtomicInterval,
safety_checks_elapsed: AtomicU64, safety_checks_elapsed: AtomicU64,
remove_cache_elapsed: AtomicU64, remove_cache_elapsed: AtomicU64,
remove_storage_entries_elapsed: AtomicU64, remove_storage_entries_elapsed: AtomicU64,
@ -1016,18 +1017,7 @@ struct PurgeStats {
impl PurgeStats { impl PurgeStats {
fn report(&self, metric_name: &'static str, report_interval_ms: Option<u64>) { fn report(&self, metric_name: &'static str, report_interval_ms: Option<u64>) {
let should_report = report_interval_ms let should_report = report_interval_ms
.map(|report_interval_ms| { .map(|report_interval_ms| self.last_report.should_update(report_interval_ms))
let last = self.last_report.load(Ordering::Relaxed);
let now = solana_sdk::timing::timestamp();
now.saturating_sub(last) > report_interval_ms
&& self.last_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last)
&& last != 0
})
.unwrap_or(true); .unwrap_or(true);
if should_report { if should_report {
@ -1201,7 +1191,7 @@ impl CleanAccountsStats {
#[derive(Debug, Default)] #[derive(Debug, Default)]
struct ShrinkStats { struct ShrinkStats {
last_report: AtomicU64, last_report: AtomicInterval,
num_slots_shrunk: AtomicUsize, num_slots_shrunk: AtomicUsize,
storage_read_elapsed: AtomicU64, storage_read_elapsed: AtomicU64,
index_read_elapsed: AtomicU64, index_read_elapsed: AtomicU64,
@ -1222,20 +1212,7 @@ struct ShrinkStats {
impl ShrinkStats { impl ShrinkStats {
fn report(&self) { fn report(&self) {
let last = self.last_report.load(Ordering::Relaxed); if self.last_report.should_update(1000) {
let now = solana_sdk::timing::timestamp();
// last is initialized to 0 by ::default()
// thus, the first 'report' call would always log.
// Instead, the first call now initialializes 'last_report' to now.
let is_first_call = last == 0;
let should_report = now.saturating_sub(last) > 1000
&& self
.last_report
.compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
== Ok(last);
if !is_first_call && should_report {
datapoint_info!( datapoint_info!(
"shrink_stats", "shrink_stats",
( (
@ -5512,17 +5489,7 @@ impl AccountsDb {
} }
fn report_store_timings(&self) { fn report_store_timings(&self) {
let last = self.stats.last_store_report.load(Ordering::Relaxed); if self.stats.last_store_report.should_update(1000) {
let now = solana_sdk::timing::timestamp();
if now.saturating_sub(last) > 1000
&& self.stats.last_store_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last)
{
let (read_only_cache_hits, read_only_cache_misses) = let (read_only_cache_hits, read_only_cache_misses) =
self.read_only_accounts_cache.get_and_reset_stats(); self.read_only_accounts_cache.get_and_reset_stats();
datapoint_info!( datapoint_info!(

View File

@ -1,5 +1,5 @@
use dashmap::{mapref::entry::Entry::Occupied, DashMap}; use dashmap::{mapref::entry::Entry::Occupied, DashMap};
use solana_sdk::pubkey::Pubkey; use solana_sdk::{pubkey::Pubkey, timing::AtomicInterval};
use std::{ use std::{
collections::HashSet, collections::HashSet,
fmt::Debug, fmt::Debug,
@ -26,7 +26,7 @@ pub trait SecondaryIndexEntry: Debug {
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct SecondaryIndexStats { pub struct SecondaryIndexStats {
last_report: AtomicU64, last_report: AtomicInterval,
num_inner_keys: AtomicU64, num_inner_keys: AtomicU64,
} }
@ -142,17 +142,7 @@ impl<SecondaryIndexEntryType: SecondaryIndexEntry + Default + Sync + Send>
} }
} }
let now = solana_sdk::timing::timestamp(); if self.stats.last_report.should_update(1000) {
let last = self.stats.last_report.load(Ordering::Relaxed);
let should_report = now.saturating_sub(last) > 1000
&& self.stats.last_report.compare_exchange(
last,
now,
Ordering::Relaxed,
Ordering::Relaxed,
) == Ok(last);
if should_report {
datapoint_info!( datapoint_info!(
self.metrics_name, self.metrics_name,
("num_secondary_keys", self.index.len() as i64, i64), ("num_secondary_keys", self.index.len() as i64, i64),

View File

@ -1,5 +1,6 @@
//! The `timing` module provides std::time utility functions. //! The `timing` module provides std::time utility functions.
use crate::unchecked_div_by_const; use crate::unchecked_div_by_const;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub fn duration_as_ns(d: &Duration) -> u64 { pub fn duration_as_ns(d: &Duration) -> u64 {
@ -59,10 +60,46 @@ pub fn slot_duration_from_slots_per_year(slots_per_year: f64) -> Duration {
Duration::from_nanos(slot_in_ns as u64) Duration::from_nanos(slot_in_ns as u64)
} }
/// Lock-free tracker of when a periodic action (e.g. metrics reporting) last
/// ran, so the action can be rate-limited across threads.
#[derive(Debug, Default)]
pub struct AtomicInterval {
    // Millisecond timestamp of the last recorded update; 0 means "never"
    // (the `Default` state).
    last_update: AtomicU64,
}
impl AtomicInterval {
    /// Returns true for at most one caller per `interval_time` milliseconds.
    /// The very first call only records the current time and returns false
    /// (skip-first semantics — see `should_update_ext`).
    pub fn should_update(&self, interval_time: u64) -> bool {
        self.should_update_ext(interval_time, true)
    }
    /// Like `should_update`, but `skip_first` selects whether the initial
    /// call (no update ever recorded) is allowed to report true.
    pub fn should_update_ext(&self, interval_time: u64, skip_first: bool) -> bool {
        let now = timestamp();
        let last = self.last_update.load(Ordering::Relaxed);
        // Too soon since the last recorded update: bail out without touching
        // the stored timestamp.
        if now.saturating_sub(last) <= interval_time {
            return false;
        }
        // Exactly one racing caller wins this CAS and claims the interval.
        // NOTE: the CAS runs even when `skip_first` suppresses the result
        // below, so the first call still initializes `last_update`.
        let claimed = self
            .last_update
            .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
            == Ok(last);
        claimed && !(skip_first && last == 0)
    }
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
#[test]
fn test_interval_update() {
    solana_logger::setup();
    // Default (skip_first = true): the very first call is swallowed even
    // though no update has ever been recorded.
    let interval = AtomicInterval::default();
    assert!(!interval.should_update(1000));
    // With skip_first disabled, a fresh tracker reports immediately.
    let interval = AtomicInterval::default();
    assert!(interval.should_update_ext(1000, false));
    std::thread::sleep(Duration::from_millis(10));
    // ~10 ms elapsed, which exceeds a 9 ms interval...
    assert!(interval.should_update(9));
    // ...but that call reset the clock, so 100 ms has not elapsed yet.
    assert!(!interval.should_update(100));
}
#[test] #[test]
#[allow(clippy::float_cmp)] #[allow(clippy::float_cmp)]
fn test_years_as_slots() { fn test_years_as_slots() {