Detect and report bank drop signal queue full and disconnect events (#24112)
* nonblocking send when when droping banks * detect and report drop signal queue full/disconnect events * comments * use counter for reporting bank_drop_queue events * reduce log * use datapoint to report stats * logging instead of reporting bank drop signal full * fix a corner case for reporting * fix build
This commit is contained in:
parent
1ffbb91a62
commit
c6a751d658
|
@ -7,7 +7,7 @@ use {
|
|||
},
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
},
|
||||
crossbeam_channel::unbounded,
|
||||
crossbeam_channel::bounded,
|
||||
log::*,
|
||||
solana_runtime::{
|
||||
accounts_background_service::DroppedSlotsReceiver,
|
||||
|
@ -32,6 +32,9 @@ pub type LoadResult = result::Result<
|
|||
BlockstoreProcessorError,
|
||||
>;
|
||||
|
||||
/// maximum drop bank signal queue length
|
||||
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
|
||||
|
||||
/// Load the banks via genesis or a snapshot then processes all full blocks in blockstore
|
||||
///
|
||||
/// If a snapshot config is given, and a snapshot is found, it will be loaded. Otherwise, load
|
||||
|
@ -158,7 +161,7 @@ pub fn load_bank_forks(
|
|||
// BankForks from now on will be descended from the root bank and thus will inherit
|
||||
// the bank drop callback.
|
||||
assert_eq!(bank_forks.banks().len(), 1);
|
||||
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
|
||||
let (pruned_banks_sender, pruned_banks_receiver) = bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
|
||||
let root_bank = bank_forks.root_bank();
|
||||
let callback = root_bank
|
||||
.rc
|
||||
|
|
|
@ -11,7 +11,7 @@ use {
|
|||
snapshot_package::{PendingAccountsPackage, SnapshotType},
|
||||
snapshot_utils::{self, SnapshotError},
|
||||
},
|
||||
crossbeam_channel::{Receiver, SendError, Sender},
|
||||
crossbeam_channel::{Receiver, SendError, Sender, TrySendError},
|
||||
log::*,
|
||||
rand::{thread_rng, Rng},
|
||||
solana_measure::measure::Measure,
|
||||
|
@ -23,7 +23,7 @@ use {
|
|||
boxed::Box,
|
||||
fmt::{Debug, Formatter},
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
|
||||
Arc, RwLock,
|
||||
},
|
||||
thread::{self, sleep, Builder, JoinHandle},
|
||||
|
@ -49,6 +49,69 @@ pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
|
|||
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
|
||||
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
|
||||
|
||||
/// interval to report bank_drop queue events: 60s
|
||||
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
|
||||
|
||||
/// Bank drop signal queue events
|
||||
#[allow(dead_code)]
|
||||
enum BankDropQueueEvent {
|
||||
Full,
|
||||
Disconnected,
|
||||
}
|
||||
|
||||
/// Bank drop signal queue event statistics
|
||||
#[derive(Debug, Default)]
|
||||
struct BankDropQueueStats {
|
||||
report_time: AtomicU64,
|
||||
queue_full: AtomicUsize,
|
||||
queue_disconnected: AtomicUsize,
|
||||
}
|
||||
|
||||
impl BankDropQueueStats {
|
||||
/// increase event counter
|
||||
fn increase(&self, event: BankDropQueueEvent) {
|
||||
let counter = match event {
|
||||
BankDropQueueEvent::Full => &self.queue_full,
|
||||
BankDropQueueEvent::Disconnected => &self.queue_disconnected,
|
||||
};
|
||||
|
||||
counter.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// submit bank drop signal queue event counters
|
||||
fn report(&self, event: BankDropQueueEvent) {
|
||||
let counter = match event {
|
||||
BankDropQueueEvent::Full => &self.queue_full,
|
||||
BankDropQueueEvent::Disconnected => &self.queue_disconnected,
|
||||
};
|
||||
|
||||
let name = match event {
|
||||
BankDropQueueEvent::Full => "full",
|
||||
BankDropQueueEvent::Disconnected => "disconnected",
|
||||
};
|
||||
|
||||
let ts = solana_sdk::timing::timestamp();
|
||||
let last_report_time = self.report_time.load(Ordering::Acquire);
|
||||
if ts.saturating_sub(last_report_time) > BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL {
|
||||
let val = counter.load(Ordering::Relaxed);
|
||||
|
||||
if counter
|
||||
.compare_exchange_weak(val, 0, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_ok()
|
||||
{
|
||||
if val > 0 {
|
||||
datapoint_info!("bank_drop_queue_event", (name, val, i64));
|
||||
}
|
||||
self.report_time.store(ts, Ordering::Release);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref BANK_DROP_QUEUE_STATS: BankDropQueueStats = BankDropQueueStats::default();
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SendDroppedBankCallback {
|
||||
sender: DroppedSlotsSender,
|
||||
|
@ -56,8 +119,21 @@ pub struct SendDroppedBankCallback {
|
|||
|
||||
impl DropCallback for SendDroppedBankCallback {
|
||||
fn callback(&self, bank: &Bank) {
|
||||
if let Err(e) = self.sender.send((bank.slot(), bank.bank_id())) {
|
||||
warn!("Error sending dropped banks: {:?}", e);
|
||||
BANK_DROP_QUEUE_STATS.report(BankDropQueueEvent::Full);
|
||||
match self.sender.try_send((bank.slot(), bank.bank_id())) {
|
||||
Err(TrySendError::Full(_)) => {
|
||||
BANK_DROP_QUEUE_STATS.increase(BankDropQueueEvent::Full);
|
||||
BANK_DROP_QUEUE_STATS.report(BankDropQueueEvent::Full);
|
||||
|
||||
// send again and block until success
|
||||
let _ = self.sender.send((bank.slot(), bank.bank_id()));
|
||||
}
|
||||
|
||||
Err(TrySendError::Disconnected(_)) => {
|
||||
info!("bank DropCallback signal queue disconnected.");
|
||||
}
|
||||
// success
|
||||
Ok(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
#![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))]
|
||||
#![allow(clippy::integer_arithmetic)]
|
||||
|
||||
#[macro_use]
|
||||
extern crate lazy_static;
|
||||
|
||||
pub mod account_info;
|
||||
pub mod account_rent_state;
|
||||
pub mod accounts;
|
||||
|
|
Loading…
Reference in New Issue