From 12e57ef76559ab3739425414c0ddb80669546d76 Mon Sep 17 00:00:00 2001
From: Christian Kamm
Date: Thu, 16 Mar 2023 17:10:34 +0100
Subject: [PATCH] ChainData: Move the metrics into a separate struct

Until they're refactored away completely.
---
 data-streams/src/account_write_filter.rs |  8 ++-
 data-streams/src/chain_data.rs           | 71 +++++++++++++++++-------
 lib/src/fill_event_filter.rs             |  7 ++-
 lib/src/orderbook_filter.rs              |  7 ++-
 service-mango-pnl/src/main.rs            |  3 +-
 5 files changed, 69 insertions(+), 27 deletions(-)

diff --git a/data-streams/src/account_write_filter.rs b/data-streams/src/account_write_filter.rs
index 1a677f9..79aad21 100644
--- a/data-streams/src/account_write_filter.rs
+++ b/data-streams/src/account_write_filter.rs
@@ -1,5 +1,5 @@
 use crate::{
-    chain_data::{AccountData, ChainData, SlotData},
+    chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
     metrics::Metrics,
     AccountWrite, SlotUpdate,
 };
@@ -46,7 +46,9 @@
     // there they'll flow into the postgres sending thread.
     let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
 
-    let mut chain_data = ChainData::new(metrics_sender);
+    let mut chain_data = ChainData::new();
+    let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender);
+
     let mut last_updated = HashMap::::new();
 
     let all_queue_pks: BTreeSet = routes
@@ -90,6 +92,8 @@
                 }
             }
 
+            chain_data_metrics.report(&chain_data);
+
             for route in routes.iter() {
                 for pk in route.matched_pubkeys.iter() {
                     match chain_data.account(&pk) {
diff --git a/data-streams/src/chain_data.rs b/data-streams/src/chain_data.rs
index af81f2f..dff11c3 100644
--- a/data-streams/src/chain_data.rs
+++ b/data-streams/src/chain_data.rs
@@ -1,11 +1,11 @@
-use crate::metrics::{MetricType, MetricU64, Metrics};
-
 use {
     solana_sdk::account::{AccountSharedData, ReadableAccount},
     solana_sdk::pubkey::Pubkey,
     std::collections::HashMap,
 };
 
+use crate::metrics::*;
+
 #[derive(Clone, Copy, Debug, PartialEq)]
 pub enum SlotStatus {
     Rooted,
@@ -42,13 +42,10 @@ pub struct ChainData {
     best_chain_slot: u64,
     account_versions_stored: usize,
     account_bytes_stored: usize,
-    metric_accounts_stored: MetricU64,
-    metric_account_versions_stored: MetricU64,
-    metric_account_bytes_stored: MetricU64,
 }
 
 impl ChainData {
-    pub fn new(metrics_sender: Metrics) -> Self {
+    pub fn new() -> Self {
         Self {
             slots: HashMap::new(),
             accounts: HashMap::new(),
@@ -57,14 +54,6 @@
             best_chain_slot: 0,
             account_versions_stored: 0,
             account_bytes_stored: 0,
-            metric_accounts_stored: metrics_sender
-                .register_u64("chaindata_accounts_stored".into(), MetricType::Gauge),
-            metric_account_versions_stored: metrics_sender.register_u64(
-                "chaindata_account_versions_stored".into(),
-                MetricType::Gauge,
-            ),
-            metric_account_bytes_stored: metrics_sender
-                .register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge),
         }
     }
 
@@ -164,12 +153,6 @@
         // now it's fine to drop any slots before the new rooted head
        // as account writes for non-rooted slots before it have been dropped
         self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
-
-        self.metric_accounts_stored.set(self.accounts.len() as u64);
-        self.metric_account_versions_stored
-            .set(self.account_versions_stored as u64);
-        self.metric_account_bytes_stored
-            .set(self.account_bytes_stored as u64);
     }
 }
 
@@ -276,3 +259,51 @@ impl ChainData {
         self.newest_rooted_slot
     }
 }
+
+pub struct ChainDataMetrics {
+    slots_stored: MetricU64,
+    accounts_stored: MetricU64,
+    account_versions_stored: MetricU64,
+    account_bytes_stored: MetricU64,
+}
+
+impl ChainDataMetrics {
+    pub fn new(metrics: &Metrics) -> Self {
+        Self {
+            slots_stored: metrics.register_u64("chaindata_slots_stored".into(), MetricType::Gauge),
+            accounts_stored: metrics
+                .register_u64("chaindata_accounts_stored".into(), MetricType::Gauge),
+            account_versions_stored: metrics.register_u64(
+                "chaindata_account_versions_stored".into(),
+                MetricType::Gauge,
+            ),
+            account_bytes_stored: metrics
+                .register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge),
+        }
+    }
+
+    pub fn report(&mut self, chain: &ChainData) {
+        self.slots_stored.set(chain.slots_count() as u64);
+        self.accounts_stored.set(chain.accounts_count() as u64);
+        self.account_versions_stored
+            .set(chain.account_writes_count() as u64);
+        self.account_bytes_stored.set(chain.account_bytes() as u64);
+    }
+
+    pub fn spawn_report_job(
+        chain: std::sync::Arc<std::sync::RwLock<ChainData>>,
+        metrics: &Metrics,
+        interval: std::time::Duration,
+    ) {
+        let mut m = Self::new(metrics);
+
+        let mut interval = tokio::time::interval(interval);
+        tokio::spawn(async move {
+            loop {
+                interval.tick().await;
+                let chain_lock = chain.read().unwrap();
+                m.report(&chain_lock);
+            }
+        });
+    }
+}
diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs
index 3cdf62b..2eab6d5 100644
--- a/lib/src/fill_event_filter.rs
+++ b/lib/src/fill_event_filter.rs
@@ -1,5 +1,5 @@
 use crate::{
-    chain_data::{AccountData, ChainData, SlotData},
+    chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
     metrics::{MetricType, Metrics},
     orderbook_filter::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide},
     serum::SerumEventQueueHeader,
@@ -662,7 +662,8 @@ pub async fn init(
 
     let account_write_queue_receiver_c = account_write_queue_receiver.clone();
 
-    let mut chain_cache = ChainData::new(metrics_sender);
+    let mut chain_cache = ChainData::new();
+    let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender);
     let mut perp_events_cache: HashMap = HashMap::new();
     let mut serum_events_cache: HashMap> = HashMap::new();
     let mut seq_num_cache = HashMap::new();
@@ -721,6 +722,8 @@
             }
         }
 
+        chain_data_metrics.report(&chain_cache);
+
         for mkt in all_market_configs.iter() {
             let evq_pk = mkt.1.event_queue;
             let evq_pk_string = evq_pk.to_string();
diff --git a/lib/src/orderbook_filter.rs b/lib/src/orderbook_filter.rs
index f9d5a86..00af473 100644
--- a/lib/src/orderbook_filter.rs
+++ b/lib/src/orderbook_filter.rs
@@ -1,6 +1,6 @@
 use crate::metrics::MetricU64;
 use crate::{
-    chain_data::{AccountData, ChainData, SlotData},
+    chain_data::{AccountData, ChainData, ChainDataMetrics, SlotData},
     metrics::{MetricType, Metrics},
     AccountWrite, SlotUpdate,
 };
@@ -270,7 +270,8 @@ pub async fn init(
 
     let account_write_queue_receiver_c = account_write_queue_receiver.clone();
 
-    let mut chain_cache = ChainData::new(metrics_sender);
+    let mut chain_cache = ChainData::new();
+    let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender);
     let mut bookside_cache: HashMap> = HashMap::new();
     let mut serum_bookside_cache: HashMap> = HashMap::new();
     let mut last_write_versions = HashMap::::new();
@@ -315,6 +316,8 @@
             }
         }
 
+        chain_data_metrics.report(&chain_cache);
+
         for mkt in market_configs.iter() {
             for side in 0..2 {
                 let mkt_pk = mkt.0;
diff --git a/service-mango-pnl/src/main.rs b/service-mango-pnl/src/main.rs
index 0c5394d..ae80e6b 100644
--- a/service-mango-pnl/src/main.rs
+++ b/service-mango-pnl/src/main.rs
@@ -271,7 +271,8 @@ async fn main() -> anyhow::Result<()> {
metrics_tx.register_u64("pnl_jsonrpc_reqs_invalid_total".into(), MetricType::Counter); let metrics_pnls_tracked = metrics_tx.register_u64("pnl_num_tracked".into(), MetricType::Gauge); - let chain_data = Arc::new(RwLock::new(ChainData::new(metrics_tx.clone()))); + // BUG: This shadows the previous chain_data and means this can't actually get data! + let chain_data = Arc::new(RwLock::new(ChainData::new())); let pnl_data = Arc::new(RwLock::new(PnlData::new())); start_pnl_updater(