ChainData: Remove the metrics into its separate thing
Until they're refactored away completely.
This commit is contained in:
parent
53eefa1395
commit
12e57ef765
|
@ -1,5 +1,5 @@
|
|||
use crate::{
|
||||
chain_data::{AccountData, ChainData, SlotData},
|
||||
chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
|
||||
metrics::Metrics,
|
||||
AccountWrite, SlotUpdate,
|
||||
};
|
||||
|
@ -46,7 +46,9 @@ pub fn init(
|
|||
// there they'll flow into the postgres sending thread.
|
||||
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
|
||||
|
||||
let mut chain_data = ChainData::new(metrics_sender);
|
||||
let mut chain_data = ChainData::new();
|
||||
let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender);
|
||||
|
||||
let mut last_updated = HashMap::<String, AcountWriteRecord>::new();
|
||||
|
||||
let all_queue_pks: BTreeSet<Pubkey> = routes
|
||||
|
@ -90,6 +92,8 @@ pub fn init(
|
|||
}
|
||||
}
|
||||
|
||||
chain_data_metrics.report(&chain_data);
|
||||
|
||||
for route in routes.iter() {
|
||||
for pk in route.matched_pubkeys.iter() {
|
||||
match chain_data.account(&pk) {
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
use crate::metrics::{MetricType, MetricU64, Metrics};
|
||||
|
||||
use {
|
||||
solana_sdk::account::{AccountSharedData, ReadableAccount},
|
||||
solana_sdk::pubkey::Pubkey,
|
||||
std::collections::HashMap,
|
||||
};
|
||||
|
||||
use crate::metrics::*;
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
pub enum SlotStatus {
|
||||
Rooted,
|
||||
|
@ -42,13 +42,10 @@ pub struct ChainData {
|
|||
best_chain_slot: u64,
|
||||
account_versions_stored: usize,
|
||||
account_bytes_stored: usize,
|
||||
metric_accounts_stored: MetricU64,
|
||||
metric_account_versions_stored: MetricU64,
|
||||
metric_account_bytes_stored: MetricU64,
|
||||
}
|
||||
|
||||
impl ChainData {
|
||||
pub fn new(metrics_sender: Metrics) -> Self {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
slots: HashMap::new(),
|
||||
accounts: HashMap::new(),
|
||||
|
@ -57,14 +54,6 @@ impl ChainData {
|
|||
best_chain_slot: 0,
|
||||
account_versions_stored: 0,
|
||||
account_bytes_stored: 0,
|
||||
metric_accounts_stored: metrics_sender
|
||||
.register_u64("chaindata_accounts_stored".into(), MetricType::Gauge),
|
||||
metric_account_versions_stored: metrics_sender.register_u64(
|
||||
"chaindata_account_versions_stored".into(),
|
||||
MetricType::Gauge,
|
||||
),
|
||||
metric_account_bytes_stored: metrics_sender
|
||||
.register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -164,12 +153,6 @@ impl ChainData {
|
|||
// now it's fine to drop any slots before the new rooted head
|
||||
// as account writes for non-rooted slots before it have been dropped
|
||||
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
|
||||
|
||||
self.metric_accounts_stored.set(self.accounts.len() as u64);
|
||||
self.metric_account_versions_stored
|
||||
.set(self.account_versions_stored as u64);
|
||||
self.metric_account_bytes_stored
|
||||
.set(self.account_bytes_stored as u64);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -276,3 +259,51 @@ impl ChainData {
|
|||
self.newest_rooted_slot
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ChainDataMetrics {
|
||||
slots_stored: MetricU64,
|
||||
accounts_stored: MetricU64,
|
||||
account_versions_stored: MetricU64,
|
||||
account_bytes_stored: MetricU64,
|
||||
}
|
||||
|
||||
impl ChainDataMetrics {
|
||||
pub fn new(metrics: &Metrics) -> Self {
|
||||
Self {
|
||||
slots_stored: metrics.register_u64("chaindata_slots_stored".into(), MetricType::Gauge),
|
||||
accounts_stored: metrics
|
||||
.register_u64("chaindata_accounts_stored".into(), MetricType::Gauge),
|
||||
account_versions_stored: metrics.register_u64(
|
||||
"chaindata_account_versions_stored".into(),
|
||||
MetricType::Gauge,
|
||||
),
|
||||
account_bytes_stored: metrics
|
||||
.register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn report(&mut self, chain: &ChainData) {
|
||||
self.slots_stored.set(chain.slots_count() as u64);
|
||||
self.accounts_stored.set(chain.accounts_count() as u64);
|
||||
self.account_versions_stored
|
||||
.set(chain.account_writes_count() as u64);
|
||||
self.account_bytes_stored.set(chain.account_bytes() as u64);
|
||||
}
|
||||
|
||||
pub fn spawn_report_job(
|
||||
chain: std::sync::Arc<std::sync::RwLock<ChainData>>,
|
||||
metrics: &Metrics,
|
||||
interval: std::time::Duration,
|
||||
) {
|
||||
let mut m = Self::new(metrics);
|
||||
|
||||
let mut interval = tokio::time::interval(interval);
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let chain_lock = chain.read().unwrap();
|
||||
m.report(&chain_lock);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use crate::{
|
||||
chain_data::{AccountData, ChainData, SlotData},
|
||||
chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
|
||||
metrics::{MetricType, Metrics},
|
||||
orderbook_filter::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide},
|
||||
serum::SerumEventQueueHeader,
|
||||
|
@ -662,7 +662,8 @@ pub async fn init(
|
|||
|
||||
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
|
||||
|
||||
let mut chain_cache = ChainData::new(metrics_sender);
|
||||
let mut chain_cache = ChainData::new();
|
||||
let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender);
|
||||
let mut perp_events_cache: HashMap<String, EventQueueEvents> = HashMap::new();
|
||||
let mut serum_events_cache: HashMap<String, Vec<serum_dex::state::Event>> = HashMap::new();
|
||||
let mut seq_num_cache = HashMap::new();
|
||||
|
@ -721,6 +722,8 @@ pub async fn init(
|
|||
}
|
||||
}
|
||||
|
||||
chain_data_metrics.report(&chain_cache);
|
||||
|
||||
for mkt in all_market_configs.iter() {
|
||||
let evq_pk = mkt.1.event_queue;
|
||||
let evq_pk_string = evq_pk.to_string();
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use crate::metrics::MetricU64;
|
||||
use crate::{
|
||||
chain_data::{AccountData, ChainData, SlotData},
|
||||
chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
|
||||
metrics::{MetricType, Metrics},
|
||||
AccountWrite, SlotUpdate,
|
||||
};
|
||||
|
@ -270,7 +270,8 @@ pub async fn init(
|
|||
|
||||
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
|
||||
|
||||
let mut chain_cache = ChainData::new(metrics_sender);
|
||||
let mut chain_cache = ChainData::new();
|
||||
let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender);
|
||||
let mut bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
|
||||
let mut serum_bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
|
||||
let mut last_write_versions = HashMap::<String, (u64, u64)>::new();
|
||||
|
@ -315,6 +316,8 @@ pub async fn init(
|
|||
}
|
||||
}
|
||||
|
||||
chain_data_metrics.report(&chain_cache);
|
||||
|
||||
for mkt in market_configs.iter() {
|
||||
for side in 0..2 {
|
||||
let mkt_pk = mkt.0;
|
||||
|
|
|
@ -271,7 +271,8 @@ async fn main() -> anyhow::Result<()> {
|
|||
metrics_tx.register_u64("pnl_jsonrpc_reqs_invalid_total".into(), MetricType::Counter);
|
||||
let metrics_pnls_tracked = metrics_tx.register_u64("pnl_num_tracked".into(), MetricType::Gauge);
|
||||
|
||||
let chain_data = Arc::new(RwLock::new(ChainData::new(metrics_tx.clone())));
|
||||
// BUG: This shadows the previous chain_data and means this can't actually get data!
|
||||
let chain_data = Arc::new(RwLock::new(ChainData::new()));
|
||||
let pnl_data = Arc::new(RwLock::new(PnlData::new()));
|
||||
|
||||
start_pnl_updater(
|
||||
|
|
Loading…
Reference in New Issue