diff --git a/Cargo.lock b/Cargo.lock index 25c4d31..25274d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -539,9 +539,9 @@ dependencies = [ [[package]] name = "base-x" -version = "0.2.11" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" +checksum = "dc19a4937b4fbd3fe3379793130e42060d10627a360f2127802b10b87e7baf74" [[package]] name = "base64" @@ -4787,7 +4787,6 @@ dependencies = [ "tokio", "tokio-tungstenite", "toml", - "warp", "ws", ] @@ -5349,6 +5348,7 @@ dependencies = [ "tokio-stream", "tonic 0.6.2", "tonic-build 0.6.2", + "warp", ] [[package]] @@ -7425,9 +7425,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.32" +version = "0.4.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa76fb221a1f8acddf5b54ace85912606980ad661ac7a503b4570ffd3a624dad" +checksum = "6f741de44b75e14c35df886aff5f1eb73aa114fa5d4d00dcd37b5e01259bf3b2" dependencies = [ "cfg-if 1.0.0", "js-sys", diff --git a/connector-mango/src/main.rs b/connector-mango/src/main.rs index 374e788..b65316e 100644 --- a/connector-mango/src/main.rs +++ b/connector-mango/src/main.rs @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { Arc::new(mango::MangoCacheTable {}), ]; - let metrics_tx = metrics::start(); + let metrics_tx = metrics::start(config.metrics); let (account_write_queue_sender, slot_queue_sender) = postgres_target::init(&config.postgres_target, account_tables, metrics_tx.clone()).await?; diff --git a/connector-raw/src/main.rs b/connector-raw/src/main.rs index 6b2ab0c..c741e05 100644 --- a/connector-raw/src/main.rs +++ b/connector-raw/src/main.rs @@ -22,7 +22,7 @@ async fn main() -> anyhow::Result<()> { solana_logger::setup_with_default("info"); info!("startup"); - let metrics_tx = metrics::start(); + let metrics_tx = metrics::start(config.metrics); let account_tables: AccountTables = vec![Arc::new(RawAccountTable {})]; diff --git a/lib/Cargo.toml b/lib/Cargo.toml index ccb4cac..c939b64 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -53,6 +53,8 @@ async-stream = "0.2" async-channel = "1.6" async-trait = "0.1" +warp = "0.3" + solana-geyser-connector-plugin-grpc = { path = "../geyser-plugin-grpc" } [build-dependencies] diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index 1eb2f43..0169d37 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -1,6 +1,6 @@ use crate::{ chain_data::{AccountData, ChainData, SlotData}, - metrics, AccountWrite, SlotUpdate, + metrics::{Metrics, MetricType}, AccountWrite, SlotUpdate, }; use log::*; use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; @@ -237,7 +237,7 @@ fn publish_changes( pub async fn init( markets: Vec, - metrics_sender: metrics::Metrics, + metrics_sender: Metrics, ) -> anyhow::Result<( async_channel::Sender, async_channel::Sender, @@ -245,9 +245,9 @@ pub async fn init( )> { let metrics_sender = metrics_sender.clone(); - let mut metric_events_new = metrics_sender.register_u64("fills_feed_events_new".into()); - let mut metric_events_change = metrics_sender.register_u64("fills_feed_events_change".into()); - let mut metrics_events_drop = metrics_sender.register_u64("fills_feed_events_drop".into()); + let mut metric_events_new = metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Gauge); + let mut metric_events_change = metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Gauge); + let mut metrics_events_drop = metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Gauge); // The actual message may want to also contain a retry count, if it self-reinserts on failure? let (account_write_queue_sender, account_write_queue_receiver) = diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 3d9c4e1..61b1c34 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -19,7 +19,7 @@ pub mod geyser_proto { use geyser_proto::accounts_db_client::AccountsDbClient; use crate::{ - metrics, AccountWrite, AnyhowWrap, GrpcSourceConfig, SlotStatus, SlotUpdate, + metrics::{Metrics, MetricType}, AccountWrite, AnyhowWrap, GrpcSourceConfig, SlotStatus, SlotUpdate, SnapshotSourceConfig, SourceConfig, TlsConfig, }; @@ -276,7 +276,7 @@ pub async fn process_events( config: &SourceConfig, account_write_queue_sender: async_channel::Sender, slot_queue_sender: async_channel::Sender, - metrics_sender: metrics::Metrics, + metrics_sender: Metrics, ) { // Subscribe to geyser let (msg_sender, msg_receiver) = async_channel::bounded::(config.dedup_queue_size); @@ -290,15 +290,15 @@ pub async fn process_events( tokio::spawn(async move { let mut metric_retries = metrics_sender.register_u64(format!( - "grpc_source_{}_connection_retries_count", - grpc_source.name - )); - let metric_status = - metrics_sender.register_string(format!("grpc_source_{}_status", grpc_source.name)); + "grpc_source_{}_connection_retries", + grpc_source.name, + ), MetricType::Counter); + let metric_connected = + metrics_sender.register_bool(format!("grpc_source_{}_status", grpc_source.name)); // Continuously reconnect on failure loop { - metric_status.set("connected".into()); + metric_connected.set(true); let out = feed_data_geyser( &grpc_source, tls_config.clone(), @@ -314,7 +314,7 @@ pub async fn process_events( ); } - metric_status.set("disconnected".into()); + metric_connected.set(false); metric_retries.increment(); tokio::time::sleep(std::time::Duration::from_secs( @@ -335,13 +335,13 @@ pub async fn process_events( // Number of slots to retain in latest_write let latest_write_retention = 50; - let mut metric_account_writes = metrics_sender.register_u64("grpc_account_writes_count".into()); - let mut metric_account_queue = metrics_sender.register_u64("account_write_queue".into()); - let mut metric_slot_queue = metrics_sender.register_u64("slot_update_queue".into()); - let mut metric_slot_updates = metrics_sender.register_u64("grpc_slot_updates_count".into()); - let mut metric_snapshots = metrics_sender.register_u64("grpc_snapshots_count".into()); + let mut metric_account_writes = metrics_sender.register_u64("grpc_account_writes".into(), MetricType::Counter); + let mut metric_account_queue = metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge); + let mut metric_slot_queue = metrics_sender.register_u64("slot_update_queue".into(), MetricType::Gauge); + let mut metric_slot_updates = metrics_sender.register_u64("grpc_slot_updates".into(), MetricType::Counter); + let mut metric_snapshots = metrics_sender.register_u64("grpc_snapshots".into(), MetricType::Counter); let mut metric_snapshot_account_writes = - metrics_sender.register_u64("grpc_snapshot_account_writes_count".into()); + metrics_sender.register_u64("grpc_snapshot_account_writes".into(), MetricType::Counter); loop { let msg = msg_receiver.recv().await.expect("sender must not close"); diff --git a/lib/src/lib.rs b/lib/src/lib.rs index a643de4..14b58e4 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -123,10 +123,19 @@ pub struct SnapshotSourceConfig { pub program_id: String, } +#[derive(Clone, Debug, Deserialize)] +pub struct MetricsConfig { + pub output_stdout: bool, + pub output_http: bool, + // TODO: add configurable port and endpoint url + // TODO: add configurable write interval +} + #[derive(Clone, Debug, Deserialize)] pub struct Config { pub postgres_target: PostgresConfig, pub source: SourceConfig, + pub metrics: MetricsConfig, } #[async_trait] diff --git a/lib/src/metrics.rs b/lib/src/metrics.rs index 56582d4..31e7208 100644 --- a/lib/src/metrics.rs +++ b/lib/src/metrics.rs @@ -1,22 +1,44 @@ use { + crate::MetricsConfig, log::*, std::collections::HashMap, std::sync::{atomic, Arc, Mutex, RwLock}, tokio::time, + warp::{Filter, Rejection, Reply}, + std::fmt, }; #[derive(Debug)] enum Value { - U64(Arc), - I64(Arc), - String(Arc>), + U64 { value: Arc, metric_type: MetricType }, + I64 { value: Arc, metric_type: MetricType }, + Bool { value: Arc>, metric_type: MetricType }, +} + +#[derive(Debug, Clone)] +pub enum MetricType { + Counter, + Gauge, +} + +impl fmt::Display for MetricType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + MetricType::Counter => { + write!(f, "counter") + }, + MetricType::Gauge => { + write!(f, "gauge") + }, + } + } } #[derive(Debug)] enum PrevValue { U64(u64), I64(i64), - String(String), + Bool(bool), } #[derive(Clone)] @@ -68,12 +90,12 @@ impl MetricI64 { } #[derive(Clone)] -pub struct MetricString { - value: Arc>, +pub struct MetricBool { + value: Arc>, } -impl MetricString { - pub fn set(&self, value: String) { +impl MetricBool { + pub fn set(&self, value: bool) { *self.value.lock().unwrap() = value; } } @@ -84,133 +106,184 @@ pub struct Metrics { } impl Metrics { - pub fn register_u64(&self, name: String) -> MetricU64 { + pub fn register_u64(&self, name: String, metric_type: MetricType) -> MetricU64 { let mut registry = self.registry.write().unwrap(); let value = registry .entry(name) - .or_insert(Value::U64(Arc::new(atomic::AtomicU64::new(0)))); + .or_insert(Value::U64 { value: Arc::new(atomic::AtomicU64::new(0)), metric_type: metric_type }); MetricU64 { value: match value { - Value::U64(v) => v.clone(), + Value::U64 { value: v, metric_type: _ } => v.clone(), _ => panic!("bad metric type"), }, } } - pub fn register_i64(&self, name: String) -> MetricI64 { + pub fn register_i64(&self, name: String, metric_type: MetricType) -> MetricI64 { let mut registry = self.registry.write().unwrap(); let value = registry .entry(name) - .or_insert(Value::I64(Arc::new(atomic::AtomicI64::new(0)))); + .or_insert(Value::I64 { value: Arc::new(atomic::AtomicI64::new(0)), metric_type: metric_type }); MetricI64 { value: match value { - Value::I64(v) => v.clone(), + Value::I64 { value: v, metric_type: _ } => v.clone(), _ => panic!("bad metric type"), }, } } - pub fn register_string(&self, name: String) -> MetricString { + pub fn register_bool(&self, name: String) -> MetricBool { let mut registry = self.registry.write().unwrap(); let value = registry .entry(name) - .or_insert(Value::String(Arc::new(Mutex::new(String::new())))); - MetricString { + .or_insert(Value::Bool { value: Arc::new(Mutex::new(false)), metric_type: MetricType::Gauge }); + MetricBool { value: match value { - Value::String(v) => v.clone(), + Value::Bool { value: v, metric_type: _ } => v.clone(), _ => panic!("bad metric type"), }, } } - pub fn get_registry_vec(&self) -> Vec<(String, String)> { - let mut vec: Vec<(String, String)> = Vec::new(); + pub fn get_registry_vec(&self) -> Vec<(String, String, String)> { + let mut vec: Vec<(String, String, String)> = Vec::new(); let metrics = self.registry.read().unwrap(); for (name, value) in metrics.iter() { - let value_str = match value { - Value::U64(v) => { - format!("{}", v.load(atomic::Ordering::Acquire)) + let (value_str, type_str) = match value { + Value::U64 { value: v, metric_type: t } => { + (format!("{}", v.load(atomic::Ordering::Acquire)), t.to_string()) } - Value::I64(v) => { - format!("{}", v.load(atomic::Ordering::Acquire)) + Value::I64 { value: v, metric_type: t } => { + (format!("{}", v.load(atomic::Ordering::Acquire)), t.to_string()) } - Value::String(v) => { - format!("{}", v.lock().unwrap()) + Value::Bool { value: v, metric_type: t } => { + let bool_to_int = if *v.lock().unwrap() { + 1 + } else { + 0 + }; + (format!("{}", bool_to_int), t.to_string()) } }; - vec.push((name.clone(), value_str)); + vec.push((name.clone(), value_str, type_str)); } vec } } -pub fn start() -> Metrics { +async fn handle_prometheus_poll(metrics: Metrics) -> Result { + debug!("handle_prometheus_poll"); + let labels = HashMap::from([("process", "fills")]); + let label_strings_vec: Vec = labels + .iter() + .map(|(name, value)| format!("{}=\"{}\"", name, value)) + .collect(); + let lines: Vec = metrics + .get_registry_vec() + .iter() + .map(|(name, value, type_name)| { + format!( + "# TYPE {} {}\n{}{{{}}} {}", + name, + type_name, + name, + label_strings_vec.join(","), + value + ) + }) + .collect(); + Ok(format!("{}\n", lines.join("\n"))) +} + +pub fn with_metrics( + metrics: Metrics, +) -> impl Filter + Clone { + warp::any().map(move || metrics.clone()) +} + +pub fn start(config: MetricsConfig) -> Metrics { let mut write_interval = time::interval(time::Duration::from_secs(60)); let registry = Arc::new(RwLock::new(HashMap::::new())); let registry_c = Arc::clone(®istry); - tokio::spawn(async move { - let mut previous_values = HashMap::::new(); - loop { - write_interval.tick().await; + let metrics_tx = Metrics { registry }; + let metrics_route = warp::path!("metrics") + .and(with_metrics(metrics_tx.clone())) + .and_then(handle_prometheus_poll); - // Nested locking! Safe because the only other user locks registry for writing and doesn't - // acquire any interior locks. - let metrics = registry_c.read().unwrap(); - for (name, value) in metrics.iter() { - let previous_value = previous_values.get_mut(name); - match value { - Value::U64(v) => { - let new_value = v.load(atomic::Ordering::Acquire); - let previous_value = if let Some(PrevValue::U64(v)) = previous_value { - let prev = *v; - *v = new_value; - prev - } else { - previous_values.insert(name.clone(), PrevValue::U64(new_value)); - 0 - }; - let diff = new_value.wrapping_sub(previous_value) as i64; - info!("metric: {}: {} ({:+})", name, new_value, diff); - } - Value::I64(v) => { - let new_value = v.load(atomic::Ordering::Acquire); - let previous_value = if let Some(PrevValue::I64(v)) = previous_value { - let prev = *v; - *v = new_value; - prev - } else { - previous_values.insert(name.clone(), PrevValue::I64(new_value)); - 0 - }; - let diff = new_value - previous_value; - info!("metric: {}: {} ({:+})", name, new_value, diff); - } - Value::String(v) => { - let new_value = v.lock().unwrap(); - let previous_value = if let Some(PrevValue::String(v)) = previous_value { - let mut prev = new_value.clone(); - std::mem::swap(&mut prev, v); - prev - } else { - previous_values - .insert(name.clone(), PrevValue::String(new_value.clone())); - "".into() - }; - if *new_value == previous_value { - info!("metric: {}: {} (unchanged)", name, &*new_value); - } else { - info!( - "metric: {}: {} (before: {})", - name, &*new_value, previous_value - ); + if config.output_http { + // serve prometheus metrics endpoint + tokio::spawn(async move { + warp::serve(metrics_route).run(([0, 0, 0, 0], 9091)).await; + }); + } + + if config.output_stdout { + // periodically log to stdout + tokio::spawn(async move { + let mut previous_values = HashMap::::new(); + loop { + write_interval.tick().await; + + // Nested locking! Safe because the only other user locks registry for writing and doesn't + // acquire any interior locks. + let metrics = registry_c.read().unwrap(); + for (name, value) in metrics.iter() { + let previous_value = previous_values.get_mut(name); + match value { + Value::U64 { value: v, metric_type: _ } => { + let new_value = v.load(atomic::Ordering::Acquire); + let previous_value = if let Some(PrevValue::U64(v)) = previous_value { + let prev = *v; + *v = new_value; + prev + } else { + previous_values.insert(name.clone(), PrevValue::U64(new_value)); + 0 + }; + let diff = new_value.wrapping_sub(previous_value) as i64; + info!("metric: {}: {} ({:+})", name, new_value, diff); + } + Value::I64 { value: v, metric_type: _ } => { + let new_value = v.load(atomic::Ordering::Acquire); + let previous_value = if let Some(PrevValue::I64(v)) = previous_value { + let prev = *v; + *v = new_value; + prev + } else { + previous_values.insert(name.clone(), PrevValue::I64(new_value)); + 0 + }; + let diff = new_value - previous_value; + info!("metric: {}: {} ({:+})", name, new_value, diff); + } + Value::Bool { value: v, metric_type: _ } => { + let new_value = v.lock().unwrap(); + let previous_value = if let Some(PrevValue::Bool(v)) = previous_value + { + let mut prev = new_value.clone(); + std::mem::swap(&mut prev, v); + prev + } else { + previous_values + .insert(name.clone(), PrevValue::Bool(new_value.clone())); + false + }; + if *new_value == previous_value { + info!("metric: {}: {} (unchanged)", name, &*new_value); + } else { + info!( + "metric: {}: {} (before: {})", + name, &*new_value, previous_value + ); + } } } } } - } - }); + }); + } - Metrics { registry } + metrics_tx } diff --git a/lib/src/postgres_target.rs b/lib/src/postgres_target.rs index 7a587e7..e4e9ab7 100644 --- a/lib/src/postgres_target.rs +++ b/lib/src/postgres_target.rs @@ -5,7 +5,7 @@ use postgres_native_tls::MakeTlsConnector; use postgres_query::{query, query_dyn}; use std::{collections::HashMap, convert::TryFrom, time::Duration}; -use crate::{metrics, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate}; +use crate::{metrics::*, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate}; mod pg { #[derive(Clone, Copy, Debug, PartialEq, postgres_types::ToSql)] @@ -28,8 +28,8 @@ mod pg { async fn postgres_connection( config: &PostgresConfig, - metric_retries: metrics::MetricU64, - metric_live: metrics::MetricU64, + metric_retries: MetricU64, + metric_live: MetricU64, ) -> anyhow::Result>> { let (tx, rx) = async_channel::unbounded(); @@ -357,7 +357,7 @@ fn epoch_secs_to_time(secs: u64) -> std::time::SystemTime { pub async fn init( config: &PostgresConfig, account_tables: AccountTables, - metrics_sender: metrics::Metrics, + metrics_sender: Metrics, ) -> anyhow::Result<( async_channel::Sender, async_channel::Sender, @@ -372,8 +372,8 @@ pub async fn init( let (slot_inserter_sender, slot_inserter_receiver) = async_channel::unbounded::<(SlotUpdate, SlotPreprocessing)>(); - let metric_con_retries = metrics_sender.register_u64("postgres_connection_retries".into()); - let metric_con_live = metrics_sender.register_u64("postgres_connections_alive".into()); + let metric_con_retries = metrics_sender.register_u64("postgres_connection_retries".into(), MetricType::Counter); + let metric_con_live = metrics_sender.register_u64("postgres_connections_alive".into(), MetricType::Gauge); // postgres account write sending worker threads for _ in 0..config.account_write_connection_count { @@ -384,9 +384,9 @@ pub async fn init( let account_tables_c = account_tables.clone(); let config = config.clone(); let mut metric_retries = - metrics_sender.register_u64("postgres_account_write_retries".into()); + metrics_sender.register_u64("postgres_account_write_retries".into(), MetricType::Counter); let mut metric_last_write = - metrics_sender.register_u64("postgres_account_write_last_write_timestamp".into()); + metrics_sender.register_u64("postgres_account_write_last_write_timestamp".into(), MetricType::Gauge); tokio::spawn(async move { let mut client_opt = None; loop { @@ -449,7 +449,7 @@ pub async fn init( } // slot update handling thread - let mut metric_slot_queue = metrics_sender.register_u64("slot_insert_queue".into()); + let mut metric_slot_queue = metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge); tokio::spawn(async move { let mut slots = Slots::new(); @@ -486,9 +486,9 @@ pub async fn init( .await?; let receiver_c = slot_inserter_receiver.clone(); let config = config.clone(); - let mut metric_retries = metrics_sender.register_u64("postgres_slot_update_retries".into()); + let mut metric_retries = metrics_sender.register_u64("postgres_slot_update_retries".into(), MetricType::Counter); let mut metric_last_write = - metrics_sender.register_u64("postgres_slot_last_write_timestamp".into()); + metrics_sender.register_u64("postgres_slot_last_write_timestamp".into(), MetricType::Gauge); let slots_processing = slots_processing.clone(); tokio::spawn(async move { let mut client_opt = None; @@ -536,9 +536,9 @@ pub async fn init( postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone()) .await?; let mut metric_last_cleanup = - metrics_sender.register_u64("postgres_cleanup_last_success_timestamp".into()); + metrics_sender.register_u64("postgres_cleanup_last_success_timestamp".into(), MetricType::Gauge); let mut metric_cleanup_errors = - metrics_sender.register_u64("postgres_cleanup_errors".into()); + metrics_sender.register_u64("postgres_cleanup_errors".into(), MetricType::Counter); let config = config.clone(); tokio::spawn(async move { let mut client_opt = None; @@ -568,11 +568,11 @@ pub async fn init( postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone()) .await?; let metric_slot_last_write = - metrics_sender.register_u64("postgres_slot_last_write_timestamp".into()); + metrics_sender.register_u64("postgres_slot_last_write_timestamp".into(), MetricType::Gauge); let metric_account_write_last_write = - metrics_sender.register_u64("postgres_account_write_last_write_timestamp".into()); - let metric_account_queue = metrics_sender.register_u64("account_write_queue".into()); - let metric_slot_queue = metrics_sender.register_u64("slot_insert_queue".into()); + metrics_sender.register_u64("postgres_account_write_last_write_timestamp".into(), MetricType::Gauge); + let metric_account_queue = metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge); + let metric_slot_queue = metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge); let config = config.clone(); tokio::spawn(async move { let mut client_opt = None; diff --git a/service-mango-fills/Cargo.toml b/service-mango-fills/Cargo.toml index 1edc6e2..118287a 100644 --- a/service-mango-fills/Cargo.toml +++ b/service-mango-fills/Cargo.toml @@ -22,4 +22,3 @@ async-channel = "1.6" async-trait = "0.1" tokio = { version = "1", features = ["full"] } tokio-tungstenite = "0.17" -warp = "0.3" diff --git a/service-mango-fills/example-config.toml b/service-mango-fills/example-config.toml index 45fafd7..ee95545 100644 --- a/service-mango-fills/example-config.toml +++ b/service-mango-fills/example-config.toml @@ -1,5 +1,9 @@ bind_ws_addr = "127.0.0.1:8080" +[metrics] +output_stdout = true +output_http = true + [source] dedup_queue_size = 50000 rpc_ws_url = "" diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs index b58c436..978f295 100644 --- a/service-mango-fills/src/main.rs +++ b/service-mango-fills/src/main.rs @@ -9,14 +9,12 @@ use tokio::{ use tokio_tungstenite::tungstenite::{protocol::Message, Error}; use serde::Deserialize; -use solana_geyser_connector_lib::metrics::MetricU64; +use solana_geyser_connector_lib::metrics::{MetricU64, MetricType}; use solana_geyser_connector_lib::{ fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage, MarketConfig}, - grpc_plugin_source, metrics, websocket_source, SourceConfig, + grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig, }; -use crate::metrics::Metrics; -use warp::{Filter, Rejection, Reply}; type CheckpointMap = Arc>>; type PeerMap = Arc>>>; @@ -78,30 +76,10 @@ async fn handle_connection( Ok(()) } -async fn handle_metrics(metrics: Metrics) -> Result { - info!("handle_metrics"); - let labels = HashMap::from([("process", "fills")]); - let label_strings_vec: Vec = labels - .iter() - .map(|(name, value)| format!("{}=\"{}\"", name, value)) - .collect(); - let lines: Vec = metrics - .get_registry_vec() - .iter() - .map(|(name, value)| format!("# TYPE {} counter\n{}{{{}}} {}", name, name, label_strings_vec.join(","), value)) - .collect(); - Ok(format!("{}\n", lines.join("\n"))) -} - -pub fn with_metrics( - metrics: Metrics, -) -> impl Filter + Clone { - warp::any().map(move || metrics.clone()) -} - #[derive(Clone, Debug, Deserialize)] pub struct Config { pub source: SourceConfig, + pub metrics: MetricsConfig, pub markets: Vec, pub bind_ws_addr: String, } @@ -125,21 +103,13 @@ async fn main() -> anyhow::Result<()> { solana_logger::setup_with_default("info"); - let metrics_tx = metrics::start(); - let metrics_route = warp::path!("metrics") - .and(with_metrics(metrics_tx.clone())) - .and_then(handle_metrics); - - // serve prometheus metrics endpoint - tokio::spawn(async move { - warp::serve(metrics_route).run(([0, 0, 0, 0], 9091)).await; - }); + let metrics_tx = metrics::start(config.metrics); let metrics_opened_connections = - metrics_tx.register_u64("fills_feed_opened_connections_count".into()); + metrics_tx.register_u64("fills_feed_opened_connections".into(), MetricType::Gauge); let metrics_closed_connections = - metrics_tx.register_u64("fills_feed_closed_connections_count".into()); + metrics_tx.register_u64("fills_feed_closed_connections".into(), MetricType::Gauge); let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(config.markets.clone(), metrics_tx.clone()).await?; diff --git a/service-mango-fills/template-config.toml b/service-mango-fills/template-config.toml index f0bb59a..6bda961 100644 --- a/service-mango-fills/template-config.toml +++ b/service-mango-fills/template-config.toml @@ -1,5 +1,9 @@ bind_ws_addr = "0.0.0.0:8080" +[metrics] +output_stdout = true +output_http = true + [source] dedup_queue_size = 50000 rpc_ws_url = "" diff --git a/service-mango-pnl/example-config.toml b/service-mango-pnl/example-config.toml index 5ab10b2..6c6d48a 100644 --- a/service-mango-pnl/example-config.toml +++ b/service-mango-pnl/example-config.toml @@ -1,3 +1,7 @@ +[metrics] +output_stdout = true +output_http = true + [source] dedup_queue_size = 50000 rpc_ws_url = "" diff --git a/service-mango-pnl/src/main.rs b/service-mango-pnl/src/main.rs index 04f4233..c4246fc 100644 --- a/service-mango-pnl/src/main.rs +++ b/service-mango-pnl/src/main.rs @@ -33,6 +33,7 @@ pub struct JsonRpcConfig { #[derive(Clone, Debug, Deserialize)] pub struct Config { pub source: SourceConfig, + pub metrics: MetricsConfig, pub pnl: PnlConfig, pub jsonrpc_server: JsonRpcConfig, } @@ -197,7 +198,7 @@ async fn main() -> anyhow::Result<()> { solana_logger::setup_with_default("info"); info!("startup"); - let metrics_tx = metrics::start(); + let metrics_tx = metrics::start(config.metrics); let chain_data = Arc::new(RwLock::new(ChainData::new())); let pnl_data = Arc::new(RwLock::new(PnlData::new())); diff --git a/service-mango-pnl/template-config.toml b/service-mango-pnl/template-config.toml index 8177e4f..7f6c995 100644 --- a/service-mango-pnl/template-config.toml +++ b/service-mango-pnl/template-config.toml @@ -1,3 +1,7 @@ +[metrics] +output_stdout = true +output_http = true + [source] dedup_queue_size = 50000 rpc_ws_url = ""