remove bank drop queue stat

remove try_send
add warn! for excessively long bank drop queue
This commit is contained in:
Haoran Yi 2022-08-26 13:43:59 -05:00 committed by HaoranYi
parent 5b64107626
commit 37bba64613
1 changed files with 8 additions and 75 deletions

View File

@ -12,7 +12,7 @@ use {
snapshot_package::{PendingAccountsPackage, SnapshotType},
snapshot_utils::{self, SnapshotError},
},
crossbeam_channel::{Receiver, SendError, Sender, TrySendError},
crossbeam_channel::{Receiver, SendError, Sender},
log::*,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
@ -25,7 +25,7 @@ use {
boxed::Box,
fmt::{Debug, Formatter},
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, sleep, Builder, JoinHandle},
@ -51,69 +51,6 @@ pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
/// Sending half of the channel that carries `(Slot, BankId)` pairs for dropped banks.
pub type DroppedSlotsSender = Sender<(Slot, BankId)>;
/// Receiving half of the channel that carries `(Slot, BankId)` pairs for dropped banks.
pub type DroppedSlotsReceiver = Receiver<(Slot, BankId)>;
/// Minimum interval between two reports of bank drop queue events: 60s.
///
/// Compared against the difference of two `solana_sdk::timing::timestamp()` values;
/// the literal `60_000` for "60s" implies milliseconds (NOTE(review): confirm the
/// unit returned by `timestamp()`).
const BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL: u64 = 60_000;
/// Bank drop signal queue events
///
/// Each variant selects one counter in `BankDropQueueStats`.
// NOTE(review): `dead_code` is presumably allowed because not every variant is
// constructed anywhere in the crate — confirm before removing the attribute.
#[allow(dead_code)]
enum BankDropQueueEvent {
/// The drop signal channel was full when a send was attempted.
Full,
/// The drop signal channel was disconnected.
Disconnected,
}
/// Bank drop signal queue event statistics
///
/// All fields are atomics, so the counters can be updated through a shared
/// reference (`&self`).
#[derive(Debug, Default)]
struct BankDropQueueStats {
/// Timestamp (as returned by `solana_sdk::timing::timestamp()`) of the last report.
report_time: AtomicU64,
/// Number of `BankDropQueueEvent::Full` events counted since the counter was last reset.
queue_full: AtomicUsize,
/// Number of `BankDropQueueEvent::Disconnected` events counted since the counter was last reset.
queue_disconnected: AtomicUsize,
}
impl BankDropQueueStats {
    /// Increments the counter that belongs to `event` by one.
    fn increase(&self, event: BankDropQueueEvent) {
        let counter = match event {
            BankDropQueueEvent::Full => &self.queue_full,
            BankDropQueueEvent::Disconnected => &self.queue_disconnected,
        };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Submits the counter that belongs to `event` as a `bank_drop_queue_event`
    /// datapoint and resets it to zero.
    ///
    /// Nothing happens unless more than `BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL`
    /// has elapsed since the last recorded report time. A datapoint is only
    /// emitted when the drained count is non-zero, but the report time is
    /// refreshed either way.
    fn report(&self, event: BankDropQueueEvent) {
        // Resolve the counter and its metric field name in a single match so the
        // two can never get out of sync.
        let (counter, name) = match event {
            BankDropQueueEvent::Full => (&self.queue_full, "full"),
            BankDropQueueEvent::Disconnected => (&self.queue_disconnected, "disconnected"),
        };

        let ts = solana_sdk::timing::timestamp();
        let last_report_time = self.report_time.load(Ordering::Acquire);
        if ts.saturating_sub(last_report_time) <= BANK_DROP_SIGNAL_CHANNEL_REPORT_INTERVAL {
            return;
        }

        // Atomically take the current count and reset it. A separate load followed
        // by a one-shot `compare_exchange_weak` could fail spuriously, or whenever
        // another thread incremented the counter in between, silently skipping a
        // report that was due. `swap` cannot fail and never loses increments.
        let val = counter.swap(0, Ordering::AcqRel);
        if val > 0 {
            datapoint_info!("bank_drop_queue_event", (name, val, i64));
        }
        self.report_time.store(ts, Ordering::Release);
    }
}
// Single shared statistics instance, lazily initialized from
// `BankDropQueueStats::default()` on first access.
lazy_static! {
static ref BANK_DROP_QUEUE_STATS: BankDropQueueStats = BankDropQueueStats::default();
}
#[derive(Clone)]
pub struct SendDroppedBankCallback {
sender: DroppedSlotsSender,
@ -121,17 +58,13 @@ pub struct SendDroppedBankCallback {
impl DropCallback for SendDroppedBankCallback {
fn callback(&self, bank: &Bank) {
BANK_DROP_QUEUE_STATS.report(BankDropQueueEvent::Full);
match self.sender.try_send((bank.slot(), bank.bank_id())) {
Err(TrySendError::Full(_)) => {
BANK_DROP_QUEUE_STATS.increase(BankDropQueueEvent::Full);
BANK_DROP_QUEUE_STATS.report(BankDropQueueEvent::Full);
let l = self.sender.len();
if l > 10_000 {
warn!("Excessive pruned_bank_channel_len: {}", l);
}
// send again and block until success
let _ = self.sender.send((bank.slot(), bank.bank_id()));
}
Err(TrySendError::Disconnected(_)) => {
match self.sender.send((bank.slot(), bank.bank_id())) {
Err(SendError(_)) => {
info!("bank DropCallback signal queue disconnected.");
}
// success