cargo fmt

This commit is contained in:
Riordan Panayides 2022-10-07 11:46:19 +01:00
parent 2881378b7f
commit e2729306b8
5 changed files with 131 additions and 69 deletions

View File

@ -1,6 +1,7 @@
use crate::{ use crate::{
chain_data::{AccountData, ChainData, SlotData}, chain_data::{AccountData, ChainData, SlotData},
metrics::{Metrics, MetricType}, AccountWrite, SlotUpdate, metrics::{MetricType, Metrics},
AccountWrite, SlotUpdate,
}; };
use log::*; use log::*;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
@ -245,9 +246,12 @@ pub async fn init(
)> { )> {
let metrics_sender = metrics_sender.clone(); let metrics_sender = metrics_sender.clone();
let mut metric_events_new = metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Gauge); let mut metric_events_new =
let mut metric_events_change = metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Gauge); metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Gauge);
let mut metrics_events_drop = metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Gauge); let mut metric_events_change =
metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Gauge);
let mut metrics_events_drop =
metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Gauge);
// The actual message may want to also contain a retry count, if it self-reinserts on failure? // The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) = let (account_write_queue_sender, account_write_queue_receiver) =

View File

@ -19,8 +19,9 @@ pub mod geyser_proto {
use geyser_proto::accounts_db_client::AccountsDbClient; use geyser_proto::accounts_db_client::AccountsDbClient;
use crate::{ use crate::{
metrics::{Metrics, MetricType}, AccountWrite, AnyhowWrap, GrpcSourceConfig, SlotStatus, SlotUpdate, metrics::{MetricType, Metrics},
SnapshotSourceConfig, SourceConfig, TlsConfig, AccountWrite, AnyhowWrap, GrpcSourceConfig, SlotStatus, SlotUpdate, SnapshotSourceConfig,
SourceConfig, TlsConfig,
}; };
use solana_geyser_connector_plugin_grpc::compression::zstd_decompress; use solana_geyser_connector_plugin_grpc::compression::zstd_decompress;
@ -289,10 +290,10 @@ pub async fn process_events(
let tls_config = grpc_source.tls.as_ref().map(make_tls_config); let tls_config = grpc_source.tls.as_ref().map(make_tls_config);
tokio::spawn(async move { tokio::spawn(async move {
let mut metric_retries = metrics_sender.register_u64(format!( let mut metric_retries = metrics_sender.register_u64(
"grpc_source_{}_connection_retries", format!("grpc_source_{}_connection_retries", grpc_source.name,),
grpc_source.name, MetricType::Counter,
), MetricType::Counter); );
let metric_connected = let metric_connected =
metrics_sender.register_bool(format!("grpc_source_{}_status", grpc_source.name)); metrics_sender.register_bool(format!("grpc_source_{}_status", grpc_source.name));
@ -335,11 +336,16 @@ pub async fn process_events(
// Number of slots to retain in latest_write // Number of slots to retain in latest_write
let latest_write_retention = 50; let latest_write_retention = 50;
let mut metric_account_writes = metrics_sender.register_u64("grpc_account_writes".into(), MetricType::Counter); let mut metric_account_writes =
let mut metric_account_queue = metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge); metrics_sender.register_u64("grpc_account_writes".into(), MetricType::Counter);
let mut metric_slot_queue = metrics_sender.register_u64("slot_update_queue".into(), MetricType::Gauge); let mut metric_account_queue =
let mut metric_slot_updates = metrics_sender.register_u64("grpc_slot_updates".into(), MetricType::Counter); metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge);
let mut metric_snapshots = metrics_sender.register_u64("grpc_snapshots".into(), MetricType::Counter); let mut metric_slot_queue =
metrics_sender.register_u64("slot_update_queue".into(), MetricType::Gauge);
let mut metric_slot_updates =
metrics_sender.register_u64("grpc_slot_updates".into(), MetricType::Counter);
let mut metric_snapshots =
metrics_sender.register_u64("grpc_snapshots".into(), MetricType::Counter);
let mut metric_snapshot_account_writes = let mut metric_snapshot_account_writes =
metrics_sender.register_u64("grpc_snapshot_account_writes".into(), MetricType::Counter); metrics_sender.register_u64("grpc_snapshot_account_writes".into(), MetricType::Counter);

View File

@ -2,17 +2,26 @@ use {
crate::MetricsConfig, crate::MetricsConfig,
log::*, log::*,
std::collections::HashMap, std::collections::HashMap,
std::fmt,
std::sync::{atomic, Arc, Mutex, RwLock}, std::sync::{atomic, Arc, Mutex, RwLock},
tokio::time, tokio::time,
warp::{Filter, Rejection, Reply}, warp::{Filter, Rejection, Reply},
std::fmt,
}; };
#[derive(Debug)] #[derive(Debug)]
enum Value { enum Value {
U64 { value: Arc<atomic::AtomicU64>, metric_type: MetricType }, U64 {
I64 { value: Arc<atomic::AtomicI64>, metric_type: MetricType }, value: Arc<atomic::AtomicU64>,
Bool { value: Arc<Mutex<bool>>, metric_type: MetricType }, metric_type: MetricType,
},
I64 {
value: Arc<atomic::AtomicI64>,
metric_type: MetricType,
},
Bool {
value: Arc<Mutex<bool>>,
metric_type: MetricType,
},
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -26,10 +35,10 @@ impl fmt::Display for MetricType {
match self { match self {
MetricType::Counter => { MetricType::Counter => {
write!(f, "counter") write!(f, "counter")
}, }
MetricType::Gauge => { MetricType::Gauge => {
write!(f, "gauge") write!(f, "gauge")
}, }
} }
} }
} }
@ -108,12 +117,16 @@ pub struct Metrics {
impl Metrics { impl Metrics {
pub fn register_u64(&self, name: String, metric_type: MetricType) -> MetricU64 { pub fn register_u64(&self, name: String, metric_type: MetricType) -> MetricU64 {
let mut registry = self.registry.write().unwrap(); let mut registry = self.registry.write().unwrap();
let value = registry let value = registry.entry(name).or_insert(Value::U64 {
.entry(name) value: Arc::new(atomic::AtomicU64::new(0)),
.or_insert(Value::U64 { value: Arc::new(atomic::AtomicU64::new(0)), metric_type: metric_type }); metric_type: metric_type,
});
MetricU64 { MetricU64 {
value: match value { value: match value {
Value::U64 { value: v, metric_type: _ } => v.clone(), Value::U64 {
value: v,
metric_type: _,
} => v.clone(),
_ => panic!("bad metric type"), _ => panic!("bad metric type"),
}, },
} }
@ -121,12 +134,16 @@ impl Metrics {
pub fn register_i64(&self, name: String, metric_type: MetricType) -> MetricI64 { pub fn register_i64(&self, name: String, metric_type: MetricType) -> MetricI64 {
let mut registry = self.registry.write().unwrap(); let mut registry = self.registry.write().unwrap();
let value = registry let value = registry.entry(name).or_insert(Value::I64 {
.entry(name) value: Arc::new(atomic::AtomicI64::new(0)),
.or_insert(Value::I64 { value: Arc::new(atomic::AtomicI64::new(0)), metric_type: metric_type }); metric_type: metric_type,
});
MetricI64 { MetricI64 {
value: match value { value: match value {
Value::I64 { value: v, metric_type: _ } => v.clone(), Value::I64 {
value: v,
metric_type: _,
} => v.clone(),
_ => panic!("bad metric type"), _ => panic!("bad metric type"),
}, },
} }
@ -134,12 +151,16 @@ impl Metrics {
pub fn register_bool(&self, name: String) -> MetricBool { pub fn register_bool(&self, name: String) -> MetricBool {
let mut registry = self.registry.write().unwrap(); let mut registry = self.registry.write().unwrap();
let value = registry let value = registry.entry(name).or_insert(Value::Bool {
.entry(name) value: Arc::new(Mutex::new(false)),
.or_insert(Value::Bool { value: Arc::new(Mutex::new(false)), metric_type: MetricType::Gauge }); metric_type: MetricType::Gauge,
});
MetricBool { MetricBool {
value: match value { value: match value {
Value::Bool { value: v, metric_type: _ } => v.clone(), Value::Bool {
value: v,
metric_type: _,
} => v.clone(),
_ => panic!("bad metric type"), _ => panic!("bad metric type"),
}, },
} }
@ -150,18 +171,25 @@ impl Metrics {
let metrics = self.registry.read().unwrap(); let metrics = self.registry.read().unwrap();
for (name, value) in metrics.iter() { for (name, value) in metrics.iter() {
let (value_str, type_str) = match value { let (value_str, type_str) = match value {
Value::U64 { value: v, metric_type: t } => { Value::U64 {
(format!("{}", v.load(atomic::Ordering::Acquire)), t.to_string()) value: v,
} metric_type: t,
Value::I64 { value: v, metric_type: t } => { } => (
(format!("{}", v.load(atomic::Ordering::Acquire)), t.to_string()) format!("{}", v.load(atomic::Ordering::Acquire)),
} t.to_string(),
Value::Bool { value: v, metric_type: t } => { ),
let bool_to_int = if *v.lock().unwrap() { Value::I64 {
1 value: v,
} else { metric_type: t,
0 } => (
}; format!("{}", v.load(atomic::Ordering::Acquire)),
t.to_string(),
),
Value::Bool {
value: v,
metric_type: t,
} => {
let bool_to_int = if *v.lock().unwrap() { 1 } else { 0 };
(format!("{}", bool_to_int), t.to_string()) (format!("{}", bool_to_int), t.to_string())
} }
}; };
@ -232,7 +260,10 @@ pub fn start(config: MetricsConfig) -> Metrics {
for (name, value) in metrics.iter() { for (name, value) in metrics.iter() {
let previous_value = previous_values.get_mut(name); let previous_value = previous_values.get_mut(name);
match value { match value {
Value::U64 { value: v, metric_type: _ } => { Value::U64 {
value: v,
metric_type: _,
} => {
let new_value = v.load(atomic::Ordering::Acquire); let new_value = v.load(atomic::Ordering::Acquire);
let previous_value = if let Some(PrevValue::U64(v)) = previous_value { let previous_value = if let Some(PrevValue::U64(v)) = previous_value {
let prev = *v; let prev = *v;
@ -245,7 +276,10 @@ pub fn start(config: MetricsConfig) -> Metrics {
let diff = new_value.wrapping_sub(previous_value) as i64; let diff = new_value.wrapping_sub(previous_value) as i64;
info!("metric: {}: {} ({:+})", name, new_value, diff); info!("metric: {}: {} ({:+})", name, new_value, diff);
} }
Value::I64 { value: v, metric_type: _ } => { Value::I64 {
value: v,
metric_type: _,
} => {
let new_value = v.load(atomic::Ordering::Acquire); let new_value = v.load(atomic::Ordering::Acquire);
let previous_value = if let Some(PrevValue::I64(v)) = previous_value { let previous_value = if let Some(PrevValue::I64(v)) = previous_value {
let prev = *v; let prev = *v;
@ -258,10 +292,12 @@ pub fn start(config: MetricsConfig) -> Metrics {
let diff = new_value - previous_value; let diff = new_value - previous_value;
info!("metric: {}: {} ({:+})", name, new_value, diff); info!("metric: {}: {} ({:+})", name, new_value, diff);
} }
Value::Bool { value: v, metric_type: _ } => { Value::Bool {
value: v,
metric_type: _,
} => {
let new_value = v.lock().unwrap(); let new_value = v.lock().unwrap();
let previous_value = if let Some(PrevValue::Bool(v)) = previous_value let previous_value = if let Some(PrevValue::Bool(v)) = previous_value {
{
let mut prev = new_value.clone(); let mut prev = new_value.clone();
std::mem::swap(&mut prev, v); std::mem::swap(&mut prev, v);
prev prev

View File

@ -372,8 +372,10 @@ pub async fn init(
let (slot_inserter_sender, slot_inserter_receiver) = let (slot_inserter_sender, slot_inserter_receiver) =
async_channel::unbounded::<(SlotUpdate, SlotPreprocessing)>(); async_channel::unbounded::<(SlotUpdate, SlotPreprocessing)>();
let metric_con_retries = metrics_sender.register_u64("postgres_connection_retries".into(), MetricType::Counter); let metric_con_retries =
let metric_con_live = metrics_sender.register_u64("postgres_connections_alive".into(), MetricType::Gauge); metrics_sender.register_u64("postgres_connection_retries".into(), MetricType::Counter);
let metric_con_live =
metrics_sender.register_u64("postgres_connections_alive".into(), MetricType::Gauge);
// postgres account write sending worker threads // postgres account write sending worker threads
for _ in 0..config.account_write_connection_count { for _ in 0..config.account_write_connection_count {
@ -383,10 +385,12 @@ pub async fn init(
let account_write_queue_receiver_c = account_write_queue_receiver.clone(); let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let account_tables_c = account_tables.clone(); let account_tables_c = account_tables.clone();
let config = config.clone(); let config = config.clone();
let mut metric_retries = let mut metric_retries = metrics_sender
metrics_sender.register_u64("postgres_account_write_retries".into(), MetricType::Counter); .register_u64("postgres_account_write_retries".into(), MetricType::Counter);
let mut metric_last_write = let mut metric_last_write = metrics_sender.register_u64(
metrics_sender.register_u64("postgres_account_write_last_write_timestamp".into(), MetricType::Gauge); "postgres_account_write_last_write_timestamp".into(),
MetricType::Gauge,
);
tokio::spawn(async move { tokio::spawn(async move {
let mut client_opt = None; let mut client_opt = None;
loop { loop {
@ -449,7 +453,8 @@ pub async fn init(
} }
// slot update handling thread // slot update handling thread
let mut metric_slot_queue = metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge); let mut metric_slot_queue =
metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge);
tokio::spawn(async move { tokio::spawn(async move {
let mut slots = Slots::new(); let mut slots = Slots::new();
@ -486,9 +491,12 @@ pub async fn init(
.await?; .await?;
let receiver_c = slot_inserter_receiver.clone(); let receiver_c = slot_inserter_receiver.clone();
let config = config.clone(); let config = config.clone();
let mut metric_retries = metrics_sender.register_u64("postgres_slot_update_retries".into(), MetricType::Counter); let mut metric_retries =
let mut metric_last_write = metrics_sender.register_u64("postgres_slot_update_retries".into(), MetricType::Counter);
metrics_sender.register_u64("postgres_slot_last_write_timestamp".into(), MetricType::Gauge); let mut metric_last_write = metrics_sender.register_u64(
"postgres_slot_last_write_timestamp".into(),
MetricType::Gauge,
);
let slots_processing = slots_processing.clone(); let slots_processing = slots_processing.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut client_opt = None; let mut client_opt = None;
@ -535,8 +543,10 @@ pub async fn init(
let postgres_con = let postgres_con =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone()) postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?; .await?;
let mut metric_last_cleanup = let mut metric_last_cleanup = metrics_sender.register_u64(
metrics_sender.register_u64("postgres_cleanup_last_success_timestamp".into(), MetricType::Gauge); "postgres_cleanup_last_success_timestamp".into(),
MetricType::Gauge,
);
let mut metric_cleanup_errors = let mut metric_cleanup_errors =
metrics_sender.register_u64("postgres_cleanup_errors".into(), MetricType::Counter); metrics_sender.register_u64("postgres_cleanup_errors".into(), MetricType::Counter);
let config = config.clone(); let config = config.clone();
@ -567,12 +577,18 @@ pub async fn init(
let postgres_con = let postgres_con =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone()) postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?; .await?;
let metric_slot_last_write = let metric_slot_last_write = metrics_sender.register_u64(
metrics_sender.register_u64("postgres_slot_last_write_timestamp".into(), MetricType::Gauge); "postgres_slot_last_write_timestamp".into(),
let metric_account_write_last_write = MetricType::Gauge,
metrics_sender.register_u64("postgres_account_write_last_write_timestamp".into(), MetricType::Gauge); );
let metric_account_queue = metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge); let metric_account_write_last_write = metrics_sender.register_u64(
let metric_slot_queue = metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge); "postgres_account_write_last_write_timestamp".into(),
MetricType::Gauge,
);
let metric_account_queue =
metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge);
let metric_slot_queue =
metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge);
let config = config.clone(); let config = config.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut client_opt = None; let mut client_opt = None;

View File

@ -9,7 +9,7 @@ use tokio::{
use tokio_tungstenite::tungstenite::{protocol::Message, Error}; use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::Deserialize; use serde::Deserialize;
use solana_geyser_connector_lib::metrics::{MetricU64, MetricType}; use solana_geyser_connector_lib::metrics::{MetricType, MetricU64};
use solana_geyser_connector_lib::{ use solana_geyser_connector_lib::{
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage, MarketConfig}, fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage, MarketConfig},
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig, grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,