diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs
index ac4a6f41b..e88d04cbf 100644
--- a/ledger/src/bank_forks_utils.rs
+++ b/ledger/src/bank_forks_utils.rs
@@ -7,7 +7,7 @@ use {
         },
         leader_schedule_cache::LeaderScheduleCache,
     },
-    crossbeam_channel::unbounded,
+    crossbeam_channel::bounded,
     log::*,
     solana_runtime::{
         accounts_background_service::DroppedSlotsReceiver,
@@ -32,6 +32,9 @@ pub type LoadResult = result::Result<
     BlockstoreProcessorError,
 >;
 
+/// maximum drop bank signal queue length
+const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
+
 /// Load the banks via genesis or a snapshot then processes all full blocks in blockstore
 ///
 /// If a snapshot config is given, and a snapshot is found, it will be loaded. Otherwise, load
@@ -158,7 +161,7 @@ pub fn load_bank_forks(
     // BankForks from now on will be descended from the root bank and thus will inherit
     // the bank drop callback.
     assert_eq!(bank_forks.banks().len(), 1);
-    let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
+    let (pruned_banks_sender, pruned_banks_receiver) = bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
     let root_bank = bank_forks.root_bank();
     let callback = root_bank
         .rc
diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs
--- a/runtime/src/accounts_background_service.rs
+++ b/runtime/src/accounts_background_service.rs
@@ -11,7 +11,7 @@ use {
         snapshot_package::{PendingAccountsPackage, SnapshotType},
         snapshot_utils::{self, SnapshotError},
     },
-    crossbeam_channel::{Receiver, SendError, Sender},
+    crossbeam_channel::{Receiver, SendError, Sender, TrySendError},
     log::*,
     rand::{thread_rng, Rng},
     solana_measure::measure::Measure,
@@ -23,7 +23,7 @@ use {
         boxed::Box,
         fmt::{Debug, Formatter},
         sync::{
-            atomic::{AtomicBool, Ordering},
+            atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
             Arc, RwLock,
         },
         thread::{self, sleep, Builder, JoinHandle},
@@ -49,6 +49,73 @@ pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
 pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
 pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
 
+/// Minimum interval between bank drop queue event reports, in milliseconds (60s).
+const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
+
+/// Bank drop signal queue events
+enum BankDropQueueEvent {
+    /// The bounded queue had no room for a new signal.
+    Full,
+    /// The receiving end of the queue has been dropped.
+    Disconnected,
+}
+
+/// Bank drop signal queue event statistics
+#[derive(Debug, Default)]
+struct BankDropQueueStats {
+    /// Timestamp (ms) at which the counters were last submitted.
+    report_time: AtomicU64,
+    queue_full: AtomicUsize,
+    queue_disconnected: AtomicUsize,
+}
+
+impl BankDropQueueStats {
+    /// Increase the counter for `event`.
+    fn increase(&self, event: BankDropQueueEvent) {
+        let counter = match event {
+            BankDropQueueEvent::Full => &self.queue_full,
+            BankDropQueueEvent::Disconnected => &self.queue_disconnected,
+        };
+
+        counter.fetch_add(1, Ordering::Relaxed);
+    }
+
+    /// Submit and reset all event counters, at most once per report interval.
+    fn report(&self) {
+        let ts = solana_sdk::timing::timestamp();
+        let last_report_time = self.report_time.load(Ordering::Acquire);
+        if ts.saturating_sub(last_report_time) <= BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL {
+            return;
+        }
+
+        // Claim this interval. If another thread updated `report_time` first, it is the one
+        // reporting, so there is nothing left to do here.
+        if self
+            .report_time
+            .compare_exchange(last_report_time, ts, Ordering::AcqRel, Ordering::Acquire)
+            .is_err()
+        {
+            return;
+        }
+
+        Self::submit("full", &self.queue_full);
+        Self::submit("disconnected", &self.queue_disconnected);
+    }
+
+    /// Atomically take the value of `counter` and submit it as field `name` if it is non-zero.
+    fn submit(name: &'static str, counter: &AtomicUsize) {
+        // `swap` cannot fail spuriously and never loses a concurrent increment.
+        let val = counter.swap(0, Ordering::Relaxed);
+        if val > 0 {
+            datapoint_info!("bank_drop_queue_event", (name, val, i64));
+        }
+    }
+}
+
+lazy_static! {
+    static ref BANK_DROP_QUEUE_STATS: BankDropQueueStats = BankDropQueueStats::default();
+}
+
 #[derive(Clone)]
 pub struct SendDroppedBankCallback {
     sender: DroppedSlotsSender,
@@ -56,8 +123,30 @@ pub struct SendDroppedBankCallback {
 
 impl DropCallback for SendDroppedBankCallback {
     fn callback(&self, bank: &Bank) {
-        if let Err(e) = self.sender.send((bank.slot(), bank.bank_id())) {
-            warn!("Error sending dropped banks: {:?}", e);
+        // On failure `try_send` hands the signal back, so it can be re-sent as is.
+        let unsent = match self.sender.try_send((bank.slot(), bank.bank_id())) {
+            Ok(()) => None,
+            Err(TrySendError::Full(signal)) => {
+                BANK_DROP_QUEUE_STATS.increase(BankDropQueueEvent::Full);
+                Some(signal)
+            }
+            Err(TrySendError::Disconnected(_)) => {
+                BANK_DROP_QUEUE_STATS.increase(BankDropQueueEvent::Disconnected);
+                info!("bank DropCallback signal queue disconnected.");
+                None
+            }
+        };
+
+        // Report before the (possibly long) blocking send below.
+        BANK_DROP_QUEUE_STATS.report();
+
+        if let Some(signal) = unsent {
+            // The queue was full: send again and block until there is room. This only fails if
+            // the receiver was dropped while waiting.
+            if self.sender.send(signal).is_err() {
+                BANK_DROP_QUEUE_STATS.increase(BankDropQueueEvent::Disconnected);
+                info!("bank DropCallback signal queue disconnected.");
+            }
         }
     }
 
diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs
index 5295ef97b..aede6f7ee 100644
--- a/runtime/src/lib.rs
+++ b/runtime/src/lib.rs
@@ -1,5 +1,9 @@
 #![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))]
 #![allow(clippy::integer_arithmetic)]
+
+#[macro_use]
+extern crate lazy_static;
+
 pub mod account_info;
 pub mod account_rent_state;
 pub mod accounts;