liquidator: add some latency metrics (#904)

liquidator: add some latency metrics
This commit is contained in:
Serge Farny 2024-03-07 11:44:23 +01:00 committed by GitHub
parent a7aaaff07e
commit 5affbb9cee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 157 additions and 9 deletions

15
Cargo.lock generated
View File

@ -2455,6 +2455,20 @@ version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
[[package]]
name = "hdrhistogram"
version = "7.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
dependencies = [
"base64 0.21.4",
"byteorder",
"crossbeam-channel",
"flate2 1.0.27",
"nom 7.1.3",
"num-traits",
]
[[package]]
name = "headers"
version = "0.3.9"
@ -3522,6 +3536,7 @@ dependencies = [
"futures 0.3.28",
"futures-core",
"futures-util",
"hdrhistogram",
"itertools",
"jemallocator",
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",

View File

@ -49,3 +49,4 @@ tokio-stream = { version = "0.1.9"}
tokio-tungstenite = "0.16.1"
tracing = "0.1"
regex = "1.9.5"
hdrhistogram = "7.5.4"

View File

@ -282,6 +282,8 @@ async fn main() -> anyhow::Result<()> {
let mut metric_account_update_queue_len =
metrics.register_u64("account_update_queue_length".into());
let mut metric_chain_update_latency =
metrics.register_latency("in-memory chain update".into());
let mut metric_mango_accounts = metrics.register_u64("mango_accounts".into());
let mut mint_infos = HashMap::<TokenIndex, Pubkey>::new();
@ -294,6 +296,7 @@ async fn main() -> anyhow::Result<()> {
.recv()
.await
.expect("channel not closed");
let current_time = Instant::now();
metric_account_update_queue_len.set(account_update_receiver.len() as u64);
message.update_chain_data(&mut chain_data.write().unwrap());
@ -301,6 +304,15 @@ async fn main() -> anyhow::Result<()> {
match message {
Message::Account(account_write) => {
let mut state = shared_state.write().unwrap();
let reception_time = account_write.reception_time;
state.oldest_chain_event_reception_time = Some(
state
.oldest_chain_event_reception_time
.unwrap_or(reception_time),
);
metric_chain_update_latency.push(current_time - reception_time);
if is_mango_account(&account_write.account, &mango_group).is_some() {
// e.g. to render debug logs RUST_LOG="liquidator=debug"
debug!(
@ -315,8 +327,21 @@ async fn main() -> anyhow::Result<()> {
}
Message::Snapshot(snapshot) => {
let mut state = shared_state.write().unwrap();
let mut reception_time = None;
// Track all mango account pubkeys
for update in snapshot.iter() {
reception_time = Some(
update
.reception_time
.min(reception_time.unwrap_or(update.reception_time)),
);
state.oldest_chain_event_reception_time = Some(
state
.oldest_chain_event_reception_time
.unwrap_or(update.reception_time),
);
if is_mango_account(&update.account, &mango_group).is_some() {
state.mango_accounts.insert(update.pubkey);
}
@ -330,6 +355,11 @@ async fn main() -> anyhow::Result<()> {
oracles.insert(perp_market.oracle);
}
}
if reception_time.is_some() {
metric_chain_update_latency
.push(current_time - reception_time.unwrap());
}
metric_mango_accounts.set(state.mango_accounts.len() as u64);
state.one_snapshot_done = true;
@ -369,35 +399,82 @@ async fn main() -> anyhow::Result<()> {
let liquidation_job = tokio::spawn({
let mut interval =
mango_v4_client::delay_interval(Duration::from_millis(cli.check_interval_ms));
let mut metric_liquidation_check = metrics.register_latency("liquidation_check".into());
let mut metric_liquidation_start_end =
metrics.register_latency("liquidation_start_end".into());
let mut liquidation_start_time = None;
let mut tcs_start_time = None;
let shared_state = shared_state.clone();
async move {
loop {
interval.tick().await;
let account_addresses = {
let state = shared_state.write().unwrap();
let mut state = shared_state.write().unwrap();
if !state.one_snapshot_done {
// discard first latency info as it will skew data too much
state.oldest_chain_event_reception_time = None;
continue;
}
if state.oldest_chain_event_reception_time.is_none()
&& liquidation_start_time.is_none()
{
// no new update, skip computing
continue;
}
state.mango_accounts.iter().cloned().collect_vec()
};
liquidation.errors.update();
liquidation.oracle_errors.update();
if liquidation_start_time.is_none() {
liquidation_start_time = Some(Instant::now());
}
let liquidated = liquidation
.maybe_liquidate_one(account_addresses.iter())
.await;
if !liquidated {
// This will be incorrect if we liquidate the last checked account
// (We will wait for next full run, skewing latency metrics)
// Probability is very low, might not need to be fixed
let mut state = shared_state.write().unwrap();
let reception_time = state.oldest_chain_event_reception_time.unwrap();
let current_time = Instant::now();
state.oldest_chain_event_reception_time = None;
metric_liquidation_check.push(current_time - reception_time);
metric_liquidation_start_end
.push(current_time - liquidation_start_time.unwrap());
liquidation_start_time = None;
}
let mut took_tcs = false;
if !liquidated && cli.take_tcs == BoolArg::True {
tcs_start_time = Some(tcs_start_time.unwrap_or(Instant::now()));
took_tcs = liquidation
.maybe_take_token_conditional_swap(account_addresses.iter())
.await
.unwrap_or_else(|err| {
error!("error during maybe_take_token_conditional_swap: {err}");
false
})
});
if !took_tcs {
let current_time = Instant::now();
let mut metric_tcs_start_end =
metrics.register_latency("tcs_start_end".into());
metric_tcs_start_end.push(current_time - tcs_start_time.unwrap());
tcs_start_time = None;
}
}
if liquidated || took_tcs {
@ -477,6 +554,9 @@ struct SharedState {
/// Is the first snapshot done? Only start checking account health when it is.
one_snapshot_done: bool,
/// Oldest chain event not processed yet
oldest_chain_event_reception_time: Option<Instant>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]

View File

@ -1,3 +1,5 @@
use hdrhistogram::Histogram;
use std::time::Duration;
use {
std::collections::HashMap,
std::sync::{atomic, Arc, Mutex, RwLock},
@ -10,6 +12,7 @@ enum Value {
U64(Arc<atomic::AtomicU64>),
I64(Arc<atomic::AtomicI64>),
String(Arc<Mutex<String>>),
Latency(Arc<Mutex<Histogram<u64>>>),
}
#[derive(Debug)]
@ -49,6 +52,18 @@ impl MetricU64 {
}
}
#[derive(Clone)]
pub struct MetricLatency {
value: Arc<Mutex<Histogram<u64>>>,
}
impl MetricLatency {
pub fn push(&mut self, duration: std::time::Duration) {
let mut guard = self.value.lock().unwrap();
let ns: u64 = duration.as_nanos().try_into().unwrap();
guard.record(ns).expect("latency error");
}
}
#[derive(Clone)]
pub struct MetricI64 {
value: Arc<atomic::AtomicI64>,
@ -110,6 +125,19 @@ impl Metrics {
}
}
pub fn register_latency(&self, name: String) -> MetricLatency {
let mut registry = self.registry.write().unwrap();
let value = registry.entry(name).or_insert_with(|| {
Value::Latency(Arc::new(Mutex::new(Histogram::<u64>::new(3).unwrap())))
});
MetricLatency {
value: match value {
Value::Latency(v) => v.clone(),
_ => panic!("bad metric type"),
},
}
}
pub fn register_string(&self, name: String) -> MetricString {
let mut registry = self.registry.write().unwrap();
let value = registry
@ -187,6 +215,16 @@ pub fn start() -> Metrics {
);
}
}
Value::Latency(v) => {
let hist = v.lock().unwrap();
info!(
"metric: {}: 99'th percentile: {:?}, 99,9'th percentile: {:?}",
name,
Duration::from_nanos(hist.value_at_quantile(0.99)),
Duration::from_nanos(hist.value_at_quantile(0.999))
);
}
}
}
}

View File

@ -1,6 +1,7 @@
use solana_client::rpc_response::{Response, RpcKeyedAccount};
use solana_sdk::{account::AccountSharedData, pubkey::Pubkey};
use std::time::Instant;
use std::{str::FromStr, sync::Arc};
use tracing::*;
@ -11,6 +12,7 @@ pub struct AccountUpdate {
pub pubkey: Pubkey,
pub slot: u64,
pub account: AccountSharedData,
pub reception_time: Instant,
}
impl AccountUpdate {
@ -25,15 +27,22 @@ impl AccountUpdate {
pubkey,
slot: rpc.context.slot,
account,
reception_time: Instant::now(),
})
}
}
#[derive(Clone)]
pub struct ChainSlotUpdate {
pub slot_update: Arc<solana_client::rpc_response::SlotUpdate>,
pub reception_time: Instant,
}
#[derive(Clone)]
pub enum Message {
Account(AccountUpdate),
Snapshot(Vec<AccountUpdate>),
Slot(Arc<solana_client::rpc_response::SlotUpdate>),
Slot(ChainSlotUpdate),
}
impl Message {
@ -65,7 +74,7 @@ impl Message {
}
Message::Slot(slot_update) => {
trace!("websocket slot message");
let slot_update = match **slot_update {
let slot_update = match *(slot_update.slot_update) {
solana_client::rpc_response::SlotUpdate::CreatedBank {
slot, parent, ..
} => Some(SlotData {

View File

@ -15,7 +15,7 @@ use futures::{stream, StreamExt};
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_rpc::rpc::rpc_accounts_scan::AccountsScanClient;
use std::str::FromStr;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::time;
use tracing::*;
@ -55,6 +55,7 @@ impl AccountSnapshot {
.account
.decode()
.ok_or_else(|| anyhow::anyhow!("could not decode account"))?,
reception_time: Instant::now(),
});
}
Ok(())
@ -74,6 +75,7 @@ impl AccountSnapshot {
account: ui_account
.decode()
.ok_or_else(|| anyhow::anyhow!("could not decode account"))?,
reception_time: Instant::now(),
});
}
}

View File

@ -11,11 +11,11 @@ use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use anyhow::Context;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio_stream::StreamMap;
use tracing::*;
use crate::account_update_stream::{AccountUpdate, Message};
use crate::account_update_stream::{AccountUpdate, ChainSlotUpdate, Message};
use crate::AnyhowWrap;
pub struct Config {
@ -143,7 +143,10 @@ async fn feed_data(
},
message = slot_sub.next() => {
if let Some(data) = message {
sender.send(Message::Slot(data.map_err_anyhow()?)).await.expect("sending must succeed");
sender.send(Message::Slot(ChainSlotUpdate{
slot_update: data.map_err_anyhow()?,
reception_time: Instant::now()
})).await.expect("sending must succeed");
} else {
warn!("slot update stream closed");
return Ok(());
@ -200,7 +203,7 @@ pub async fn get_next_create_bank_slot(
match msg {
Message::Slot(slot_update) => {
if let solana_client::rpc_response::SlotUpdate::CreatedBank { slot, .. } =
*slot_update
*slot_update.slot_update
{
return Ok(slot);
}