diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs
index fe931db5b8..22eec5c9da 100644
--- a/core/benches/cluster_info.rs
+++ b/core/benches/cluster_info.rs
@@ -16,12 +16,16 @@ use solana_ledger::{
     shred::Shred,
 };
 use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{pubkey, signature::Keypair, timing::timestamp};
+use solana_sdk::{
+    pubkey,
+    signature::Keypair,
+    timing::{timestamp, AtomicInterval},
+};
 use solana_streamer::socket::SocketAddrSpace;
 use std::{
     collections::HashMap,
     net::UdpSocket,
-    sync::{atomic::AtomicU64, Arc, RwLock},
+    sync::{Arc, RwLock},
 };
 use test::Bencher;

@@ -54,7 +58,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
     let cluster_info = Arc::new(cluster_info);
     let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
     let shreds = Arc::new(shreds);
-    let last_datapoint = Arc::new(AtomicU64::new(0));
+    let last_datapoint = Arc::new(AtomicInterval::default());
     bencher.iter(move || {
         let shreds = shreds.clone();
         broadcast_shreds(
diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index 979ca10b49..74b509bb12 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -37,7 +37,7 @@ use solana_sdk::{
     sanitized_transaction::SanitizedTransaction,
     short_vec::decode_shortu16_len,
     signature::Signature,
-    timing::{duration_as_ms, timestamp},
+    timing::{duration_as_ms, timestamp, AtomicInterval},
     transaction::{self, Transaction, TransactionError},
 };
 use solana_transaction_status::token_balances::{
@@ -79,7 +79,7 @@ const DEFAULT_LRU_SIZE: usize = 200_000;

 #[derive(Debug, Default)]
 pub struct BankingStageStats {
-    last_report: AtomicU64,
+    last_report: AtomicInterval,
     id: u32,
     process_packets_count: AtomicUsize,
     new_tx_count: AtomicUsize,
@@ -115,19 +115,7 @@ impl BankingStageStats {
     }

     fn report(&self, report_interval_ms: u64) {
-        let should_report = {
-            let last = self.last_report.load(Ordering::Relaxed);
-            let now = solana_sdk::timing::timestamp();
-            now.saturating_sub(last) > report_interval_ms
-                && self.last_report.compare_exchange(
-                    last,
-                    now,
-                    Ordering::Relaxed,
-                    Ordering::Relaxed,
-                ) == Ok(last)
-        };
-
-        if should_report {
+        if self.last_report.should_update(report_interval_ms) {
             datapoint_info!(
                 "banking_stage-loop-stats",
                 ("id", self.id as i64, i64),
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index a50dd5c690..65714d35dc 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -21,13 +21,12 @@ use solana_measure::measure::Measure;
 use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
 use solana_poh::poh_recorder::WorkingBankEntry;
 use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::timing::timestamp;
+use solana_sdk::timing::{timestamp, AtomicInterval};
 use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
 use solana_streamer::{
     sendmmsg::{batch_send, SendPktsError},
     socket::SocketAddrSpace,
 };
-use std::sync::atomic::AtomicU64;
 use std::{
     collections::HashMap,
     net::UdpSocket,
@@ -378,14 +377,9 @@ impl BroadcastStage {
 fn update_peer_stats(
     num_live_peers: i64,
     broadcast_len: i64,
-    last_datapoint_submit: &Arc<AtomicU64>,
+    last_datapoint_submit: &Arc<AtomicInterval>,
 ) {
-    let now = timestamp();
-    let last = last_datapoint_submit.load(Ordering::Relaxed);
-    #[allow(deprecated)]
-    if now.saturating_sub(last) > 1000
-        && last_datapoint_submit.compare_and_swap(last, now, Ordering::Relaxed) == last
-    {
+    if last_datapoint_submit.should_update(1000) {
         datapoint_info!(
             "cluster_info-num_nodes",
             ("live_count", num_live_peers, i64),
@@ -400,7 +394,7 @@ pub fn broadcast_shreds(
     s: &UdpSocket,
     shreds: &[Shred],
     cluster_nodes: &ClusterNodes<BroadcastStage>,
-    last_datapoint_submit: &Arc<AtomicU64>,
+    last_datapoint_submit: &Arc<AtomicInterval>,
     transmit_stats: &mut TransmitShredsStats,
     self_pubkey: Pubkey,
     bank_forks: &Arc<RwLock<BankForks>>,
diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index 3adb10d126..638563d62e 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -291,7 +291,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             sock,
             &shreds,
             &cluster_nodes,
-            &Arc::new(AtomicU64::new(0)),
+            &Arc::new(AtomicInterval::default()),
             &mut TransmitShredsStats::default(),
             cluster_info.id(),
             bank_forks,
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index edc3b8d058..80daf8befb 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -143,7 +143,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             sock,
             &shreds,
             &cluster_nodes,
-            &Arc::new(AtomicU64::new(0)),
+            &Arc::new(AtomicInterval::default()),
             &mut TransmitShredsStats::default(),
             cluster_info.id(),
             bank_forks,
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index fd9ce574e1..529d7837f8 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -9,7 +9,11 @@ use solana_entry::entry::Entry;
 use solana_ledger::shred::{
     ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_TICK_REFERENCE_MASK,
 };
-use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
+use solana_sdk::{
+    pubkey::Pubkey,
+    signature::Keypair,
+    timing::{duration_as_us, AtomicInterval},
+};
 use std::{collections::HashMap, sync::RwLock, time::Duration};

 #[derive(Clone)]
@@ -21,10 +25,10 @@ pub struct StandardBroadcastRun {
     current_slot_and_parent: Option<(u64, u64)>,
     slot_broadcast_start: Option<Instant>,
     shred_version: u16,
-    last_datapoint_submit: Arc<AtomicU64>,
+    last_datapoint_submit: Arc<AtomicInterval>,
     num_batches: usize,
     cluster_nodes: Arc<RwLock<ClusterNodes<BroadcastStage>>>,
-    last_peer_update: Arc<AtomicU64>,
+    last_peer_update: Arc<AtomicInterval>,
 }

 impl StandardBroadcastRun {
@@ -40,7 +44,7 @@
             last_datapoint_submit: Arc::default(),
             num_batches: 0,
             cluster_nodes: Arc::default(),
-            last_peer_update: Arc::default(),
+            last_peer_update: Arc::new(AtomicInterval::default()),
         }
     }
@@ -338,14 +342,9 @@ impl StandardBroadcastRun {
         trace!("Broadcasting {:?} shreds", shreds.len());
         // Get the list of peers to broadcast to
         let mut get_peers_time = Measure::start("broadcast::get_peers");
-        let now = timestamp();
-        let last = self.last_peer_update.load(Ordering::Relaxed);
-        #[allow(deprecated)]
-        if now - last > BROADCAST_PEER_UPDATE_INTERVAL_MS
-            && self
-                .last_peer_update
-                .compare_and_swap(last, now, Ordering::Relaxed)
-                == last
+        if self
+            .last_peer_update
+            .should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)
         {
             *self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
                 cluster_info,
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 9964844cd8..1efd2ff21e 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -23,7 +23,12 @@ use solana_metrics::inc_new_counter_error;
 use solana_perf::packet::{Packet, Packets};
 use solana_rpc::{max_slots::MaxSlots, rpc_subscriptions::RpcSubscriptions};
 use solana_runtime::{bank::Bank, bank_forks::BankForks};
-use solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp};
+use solana_sdk::{
+    clock::Slot,
+    epoch_schedule::EpochSchedule,
+    pubkey::Pubkey,
+    timing::{timestamp, AtomicInterval},
+};
 use solana_streamer::streamer::PacketReceiver;
 use std::{
     collections::hash_set::HashSet,
@@ -56,7 +61,7 @@ struct RetransmitStats {
     repair_total: AtomicU64,
     discard_total: AtomicU64,
     retransmit_total: AtomicU64,
-    last_ts: AtomicU64,
+    last_ts: AtomicInterval,
     compute_turbine_peers_total: AtomicU64,
     retransmit_tree_mismatch: AtomicU64,
     packets_by_slot: Mutex<BTreeMap<Slot, usize>>,
@@ -116,12 +121,7 @@ fn update_retransmit_stats(
         }
     }

-    let now = timestamp();
-    let last = stats.last_ts.load(Ordering::Relaxed);
-    #[allow(deprecated)]
-    if now.saturating_sub(last) > 2000
-        && stats.last_ts.compare_and_swap(last, now, Ordering::Relaxed) == last
-    {
+    if stats.last_ts.should_update(2000) {
         datapoint_info!("retransmit-num_nodes", ("count", peers_len, i64));
         datapoint_info!(
             "retransmit-stage",
@@ -279,7 +279,7 @@ fn retransmit(
     id: u32,
     stats: &RetransmitStats,
     cluster_nodes: &RwLock<ClusterNodes<RetransmitStage>>,
-    last_peer_update: &AtomicU64,
+    last_peer_update: &AtomicInterval,
     shreds_received: &Mutex<ShredFilterAndHasher>,
     max_slots: &MaxSlots,
     first_shreds_received: &Mutex<BTreeSet<Slot>>,
@@ -309,12 +309,7 @@ fn retransmit(
     epoch_fetch.stop();

     let mut epoch_cache_update = Measure::start("retransmit_epoch_cach_update");
-    let now = timestamp();
-    let last = last_peer_update.load(Ordering::Relaxed);
-    #[allow(deprecated)]
-    if now.saturating_sub(last) > 1000
-        && last_peer_update.compare_and_swap(last, now, Ordering::Relaxed) == last
-    {
+    if last_peer_update.should_update_ext(1000, false) {
         let epoch_staked_nodes = r_bank.epoch_staked_nodes(bank_epoch);
         *cluster_nodes.write().unwrap() = ClusterNodes::<RetransmitStage>::new(
             cluster_info,
@@ -474,7 +469,7 @@ pub fn retransmitter(
             let cluster_info = cluster_info.clone();
             let stats = stats.clone();
             let cluster_nodes = Arc::default();
-            let last_peer_update = Arc::new(AtomicU64::new(0));
+            let last_peer_update = Arc::new(AtomicInterval::default());
             let shreds_received = shreds_received.clone();
             let max_slots = max_slots.clone();
             let first_shreds_received = first_shreds_received.clone();
@@ -661,6 +656,7 @@ mod tests {
         let me_retransmit = UdpSocket::bind(format!("127.0.0.1:{}", port)).unwrap();
         // need to make sure tvu and tpu are valid addresses
         me.tvu_forwards = me_retransmit.local_addr().unwrap();
+        let port = find_available_port_in_range(ip_addr, (8000, 10000)).unwrap();
         me.tvu = UdpSocket::bind(format!("127.0.0.1:{}", port))
             .unwrap()
diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs
index f7428e773d..ad7cc6549c 100644
--- a/runtime/src/accounts_db.rs
+++ b/runtime/src/accounts_db.rs
@@ -52,6 +52,7 @@ use solana_sdk::{
     genesis_config::ClusterType,
     hash::{Hash, Hasher},
     pubkey::Pubkey,
+    timing::AtomicInterval,
 };
 use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
 use std::{
@@ -977,7 +978,7 @@ struct AccountsStats {
     delta_hash_accumulate_time_total_us: AtomicU64,
     delta_hash_num: AtomicU64,

-    last_store_report: AtomicU64,
+    last_store_report: AtomicInterval,
     store_hash_accounts: AtomicU64,
     calc_stored_meta: AtomicU64,
     store_accounts: AtomicU64,
@@ -997,7 +998,7 @@ struct AccountsStats {

 #[derive(Debug, Default)]
 struct PurgeStats {
-    last_report: AtomicU64,
+    last_report: AtomicInterval,
     safety_checks_elapsed: AtomicU64,
     remove_cache_elapsed: AtomicU64,
     remove_storage_entries_elapsed: AtomicU64,
@@ -1016,18 +1017,7 @@ struct PurgeStats {
 impl PurgeStats {
     fn report(&self, metric_name: &'static str, report_interval_ms: Option<u64>) {
         let should_report = report_interval_ms
-            .map(|report_interval_ms| {
-                let last = self.last_report.load(Ordering::Relaxed);
-                let now = solana_sdk::timing::timestamp();
-                now.saturating_sub(last) > report_interval_ms
-                    && self.last_report.compare_exchange(
-                        last,
-                        now,
-                        Ordering::Relaxed,
-                        Ordering::Relaxed,
-                    ) == Ok(last)
-                    && last != 0
-            })
+            .map(|report_interval_ms| self.last_report.should_update(report_interval_ms))
             .unwrap_or(true);

         if should_report {
@@ -1201,7 +1191,7 @@ impl CleanAccountsStats {

 #[derive(Debug, Default)]
 struct ShrinkStats {
-    last_report: AtomicU64,
+    last_report: AtomicInterval,
     num_slots_shrunk: AtomicUsize,
     storage_read_elapsed: AtomicU64,
     index_read_elapsed: AtomicU64,
@@ -1222,20 +1212,7 @@ struct ShrinkStats {

 impl ShrinkStats {
     fn report(&self) {
-        let last = self.last_report.load(Ordering::Relaxed);
-        let now = solana_sdk::timing::timestamp();
-
-        // last is initialized to 0 by ::default()
-        // thus, the first 'report' call would always log.
-        // Instead, the first call now initialializes 'last_report' to now.
-        let is_first_call = last == 0;
-        let should_report = now.saturating_sub(last) > 1000
-            && self
-                .last_report
-                .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
-                == Ok(last);
-
-        if !is_first_call && should_report {
+        if self.last_report.should_update(1000) {
             datapoint_info!(
                 "shrink_stats",
                 (
@@ -5512,17 +5489,7 @@ impl AccountsDb {
     }

     fn report_store_timings(&self) {
-        let last = self.stats.last_store_report.load(Ordering::Relaxed);
-        let now = solana_sdk::timing::timestamp();
-
-        if now.saturating_sub(last) > 1000
-            && self.stats.last_store_report.compare_exchange(
-                last,
-                now,
-                Ordering::Relaxed,
-                Ordering::Relaxed,
-            ) == Ok(last)
-        {
+        if self.stats.last_store_report.should_update(1000) {
             let (read_only_cache_hits, read_only_cache_misses) =
                 self.read_only_accounts_cache.get_and_reset_stats();
             datapoint_info!(
diff --git a/runtime/src/secondary_index.rs b/runtime/src/secondary_index.rs
index 37199f2668..6b0e045423 100644
--- a/runtime/src/secondary_index.rs
+++ b/runtime/src/secondary_index.rs
@@ -1,5 +1,5 @@
 use dashmap::{mapref::entry::Entry::Occupied, DashMap};
-use solana_sdk::pubkey::Pubkey;
+use solana_sdk::{pubkey::Pubkey, timing::AtomicInterval};
 use std::{
     collections::HashSet,
     fmt::Debug,
@@ -26,7 +26,7 @@ pub trait SecondaryIndexEntry: Debug {

 #[derive(Debug, Default)]
 pub struct SecondaryIndexStats {
-    last_report: AtomicU64,
+    last_report: AtomicInterval,
     num_inner_keys: AtomicU64,
 }

@@ -142,17 +142,7 @@ impl
             }
         }

-        let now = solana_sdk::timing::timestamp();
-        let last = self.stats.last_report.load(Ordering::Relaxed);
-        let should_report = now.saturating_sub(last) > 1000
-            && self.stats.last_report.compare_exchange(
-                last,
-                now,
-                Ordering::Relaxed,
-                Ordering::Relaxed,
-            ) == Ok(last);
-
-        if should_report {
+        if self.stats.last_report.should_update(1000) {
             datapoint_info!(
                 self.metrics_name,
                 ("num_secondary_keys", self.index.len() as i64, i64),
diff --git a/sdk/src/timing.rs b/sdk/src/timing.rs
index 1edc1de325..f415848a49 100644
--- a/sdk/src/timing.rs
+++ b/sdk/src/timing.rs
@@ -1,5 +1,6 @@
 //! The `timing` module provides std::time utility functions.
 use crate::unchecked_div_by_const;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};

 pub fn duration_as_ns(d: &Duration) -> u64 {
@@ -59,10 +60,46 @@ pub fn slot_duration_from_slots_per_year(slots_per_year: f64) -> Duration {
     Duration::from_nanos(slot_in_ns as u64)
 }

+#[derive(Debug, Default)]
+pub struct AtomicInterval {
+    last_update: AtomicU64,
+}
+
+impl AtomicInterval {
+    pub fn should_update(&self, interval_time: u64) -> bool {
+        self.should_update_ext(interval_time, true)
+    }
+
+    pub fn should_update_ext(&self, interval_time: u64, skip_first: bool) -> bool {
+        let now = timestamp();
+        let last = self.last_update.load(Ordering::Relaxed);
+        now.saturating_sub(last) > interval_time
+            && self
+                .last_update
+                .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
+                == Ok(last)
+            && !(skip_first && last == 0)
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;

+    #[test]
+    fn test_interval_update() {
+        solana_logger::setup();
+        let i = AtomicInterval::default();
+        assert!(!i.should_update(1000));
+
+        let i = AtomicInterval::default();
+        assert!(i.should_update_ext(1000, false));
+
+        std::thread::sleep(Duration::from_millis(10));
+        assert!(i.should_update(9));
+        assert!(!i.should_update(100));
+    }
+
     #[test]
     #[allow(clippy::float_cmp)]
     fn test_years_as_slots() {
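Usage sketch (not part of the patch): every call site in this diff converges on the same pattern, "embed an AtomicInterval in a stats struct and gate metric submission on should_update". The snippet below illustrates that pattern against the should_update / should_update_ext API added in sdk/src/timing.rs above; the ExampleStats struct, its counter, and the println! reporting are hypothetical stand-ins for a real stats struct and a datapoint_info! call.

use solana_sdk::timing::AtomicInterval;
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical stats struct, mirroring how BankingStageStats, ShrinkStats,
// and the other structs in this diff embed an AtomicInterval to rate-limit
// their datapoint submissions.
#[derive(Default)]
struct ExampleStats {
    last_report: AtomicInterval,
    processed: AtomicU64,
}

impl ExampleStats {
    fn record(&self) {
        self.processed.fetch_add(1, Ordering::Relaxed);
    }

    fn report(&self, interval_ms: u64) {
        // `should_update` returns true at most once per `interval_ms` window
        // and is safe to call from many threads: only the thread that wins
        // the compare_exchange on the stored timestamp sees `true`. With the
        // default `skip_first = true` the very first call only seeds the
        // timestamp and returns `false`; call sites that want an immediate
        // first report use `should_update_ext(interval_ms, false)` instead.
        if self.last_report.should_update(interval_ms) {
            let processed = self.processed.swap(0, Ordering::Relaxed);
            // A real call site would emit `datapoint_info!` here.
            println!("example-stats processed={}", processed);
        }
    }
}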