diff --git a/liquidator/src/chain_data.rs b/client/src/chain_data.rs
similarity index 66%
rename from liquidator/src/chain_data.rs
rename to client/src/chain_data.rs
index d50cb3553..b2580c7e4 100644
--- a/liquidator/src/chain_data.rs
+++ b/client/src/chain_data.rs
@@ -1,16 +1,8 @@
-use crate::FIRST_WEBSOCKET_SLOT;
-
 use {
-    log::*, solana_sdk::account::AccountSharedData, solana_sdk::pubkey::Pubkey,
-    std::collections::HashMap,
+    solana_sdk::account::AccountSharedData, solana_sdk::pubkey::Pubkey, std::collections::HashMap,
 };
 
-use {
-    // TODO: None of these should be here
-    crate::metrics,
-    crate::snapshot_source,
-    crate::websocket_source,
-};
+pub use crate::chain_data_fetcher::AccountFetcher;
 
 #[derive(Clone, Copy, Debug, PartialEq)]
 pub enum SlotStatus {
@@ -28,7 +20,7 @@ pub struct SlotData {
 }
 
 #[derive(Clone, Debug)]
-pub struct AccountData {
+pub struct AccountAndSlot {
     pub slot: u64,
     pub account: AccountSharedData,
 }
@@ -41,33 +33,24 @@ pub struct ChainData {
     /// only slots >= newest_rooted_slot are retained
     slots: HashMap<u64, SlotData>,
     /// writes to accounts, only the latest rooted write an newer are retained
-    accounts: HashMap<Pubkey, Vec<AccountData>>,
+    accounts: HashMap<Pubkey, Vec<AccountAndSlot>>,
     newest_rooted_slot: u64,
     newest_processed_slot: u64,
     best_chain_slot: u64,
-
-    // storing global metrics here is not good style
-    metric_slots_count: metrics::MetricU64,
-    metric_accounts_count: metrics::MetricU64,
-    metric_account_write_count: metrics::MetricU64,
 }
 
 impl ChainData {
-    pub fn new(metrics: &metrics::Metrics) -> Self {
+    pub fn new() -> Self {
         Self {
             slots: HashMap::new(),
             accounts: HashMap::new(),
             newest_rooted_slot: 0,
             newest_processed_slot: 0,
             best_chain_slot: 0,
-            metric_slots_count: metrics.register_u64("chain_data_slots_count".into()),
-            metric_accounts_count: metrics.register_u64("chain_data_accounts_count".into()),
-            metric_account_write_count: metrics
-                .register_u64("chain_data_account_write_count".into()),
         }
     }
 
-    fn update_slot(&mut self, new_slot: SlotData) {
+    pub fn update_slot(&mut self, new_slot: SlotData) {
         let new_processed_head = new_slot.slot > self.newest_processed_slot;
         if new_processed_head {
             self.newest_processed_slot = new_slot.slot;
@@ -154,19 +137,10 @@ impl ChainData {
             // now it's fine to drop any slots before the new rooted head
             // as account writes for non-rooted slots before it have been dropped
             self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
-
-            self.metric_slots_count.set(self.slots.len() as u64);
-            self.metric_accounts_count.set(self.accounts.len() as u64);
-            self.metric_account_write_count.set(
-                self.accounts
-                    .iter()
-                    .map(|(_key, writes)| writes.len() as u64)
-                    .sum(),
-            );
         }
     }
 
-    fn update_account(&mut self, pubkey: Pubkey, account: AccountData) {
+    pub fn update_account(&mut self, pubkey: Pubkey, account: AccountAndSlot) {
         use std::collections::hash_map::Entry;
         match self.accounts.entry(pubkey) {
             Entry::Vacant(v) => {
@@ -191,71 +165,7 @@ impl ChainData {
         };
     }
 
-    pub fn update_from_snapshot(&mut self, snapshot: snapshot_source::AccountSnapshot) {
-        for account_write in snapshot.accounts {
-            self.update_account(
-                account_write.pubkey,
-                AccountData {
-                    slot: account_write.slot,
-                    account: account_write.account,
-                },
-            );
-        }
-    }
-
-    pub fn update_from_websocket(&mut self, message: websocket_source::Message) {
-        match message {
-            websocket_source::Message::Account(account_write) => {
-                trace!("websocket account message");
-                self.update_account(
-                    account_write.pubkey,
-                    AccountData {
-                        slot: account_write.slot,
-                        account: account_write.account,
-                    },
-                );
-            }
-            websocket_source::Message::Slot(slot_update) => {
-                trace!("websocket slot message");
-                let slot_update = match *slot_update {
-                    solana_client::rpc_response::SlotUpdate::CreatedBank {
-                        slot, parent, ..
-                    } => Some(SlotData {
-                        slot,
-                        parent: Some(parent),
-                        status: SlotStatus::Processed,
-                        chain: 0,
-                    }),
-                    solana_client::rpc_response::SlotUpdate::OptimisticConfirmation {
-                        slot,
-                        ..
-                    } => Some(SlotData {
-                        slot,
-                        parent: None,
-                        status: SlotStatus::Confirmed,
-                        chain: 0,
-                    }),
-                    solana_client::rpc_response::SlotUpdate::Root { slot, .. } => Some(SlotData {
-                        slot,
-                        parent: None,
-                        status: SlotStatus::Rooted,
-                        chain: 0,
-                    }),
-                    _ => None,
-                };
-                if let Some(update) = slot_update {
-                    if FIRST_WEBSOCKET_SLOT.get().is_none() {
-                        FIRST_WEBSOCKET_SLOT.set(update.slot).expect("always Ok");
-                        log::debug!("first slot for websocket {}", update.slot);
-                    }
-
-                    self.update_slot(update);
-                }
-            }
-        }
-    }
-
-    pub fn update_from_rpc(&mut self, pubkey: &Pubkey, account: AccountData) {
+    pub fn update_from_rpc(&mut self, pubkey: &Pubkey, account: AccountAndSlot) {
         // Add a stub slot if the rpc has information about the future.
         // If it's in the past, either the slot already exists (and maybe we have
         // the data already), or it was a skipped slot and adding it now makes no difference.
@@ -271,7 +181,7 @@ impl ChainData {
         self.update_account(*pubkey, account)
     }
 
-    fn is_account_write_live(&self, write: &AccountData) -> bool {
+    fn is_account_write_live(&self, write: &AccountAndSlot) -> bool {
         self.slots
             .get(&write.slot)
             // either the slot is rooted, in the current chain or newer than the chain head
@@ -285,7 +195,7 @@ impl ChainData {
     }
 
     /// Cloned snapshot of all the most recent live writes per pubkey
-    pub fn accounts_snapshot(&self) -> HashMap<Pubkey, AccountData> {
+    pub fn accounts_snapshot(&self) -> HashMap<Pubkey, AccountAndSlot> {
         self.accounts
             .iter()
             .filter_map(|(pubkey, writes)| {
@@ -304,7 +214,7 @@ impl ChainData {
     }
 
     /// Ref to the most recent live write of the pubkey, along with the slot that it was for
-    pub fn account_and_slot<'a>(&'a self, pubkey: &Pubkey) -> anyhow::Result<&'a AccountData> {
+    pub fn account_and_slot<'a>(&'a self, pubkey: &Pubkey) -> anyhow::Result<&'a AccountAndSlot> {
         self.accounts
             .get(pubkey)
             .ok_or_else(|| anyhow::anyhow!("account {} not found", pubkey))?
@@ -313,4 +223,16 @@ impl ChainData {
             .find(|w| self.is_account_write_live(w))
             .ok_or_else(|| anyhow::anyhow!("account {} has no live data", pubkey))
     }
+
+    pub fn slots_count(&self) -> usize {
+        self.slots.len()
+    }
+
+    pub fn accounts_count(&self) -> usize {
+        self.accounts.len()
+    }
+
+    pub fn account_writes_count(&self) -> usize {
+        self.accounts.values().map(|v| v.len()).sum()
+    }
 }
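Usage sketch (not part of the patch): with the metric fields removed and update_slot/update_account made public, ChainData can be driven directly by any feed and queried through the new count accessors. The snippet only exercises the API visible in the hunks above; the slot numbers and account values are made up.

use client::chain_data::{AccountAndSlot, ChainData, SlotData, SlotStatus};
use solana_sdk::account::{AccountSharedData, ReadableAccount};
use solana_sdk::pubkey::Pubkey;

fn chain_data_example() {
    let mut chain = ChainData::new();

    // A processed slot arrives, e.g. from a websocket feed...
    chain.update_slot(SlotData {
        slot: 100,
        parent: Some(99),
        status: SlotStatus::Processed,
        chain: 0,
    });

    // ...followed by an account write observed in that slot.
    let pubkey = Pubkey::new_unique();
    chain.update_account(
        pubkey,
        AccountAndSlot {
            slot: 100,
            account: AccountSharedData::new(1_000_000, 0, &Pubkey::default()),
        },
    );

    // The counts that used to be pushed into metrics are plain accessors now.
    println!(
        "slots={} accounts={} writes={}",
        chain.slots_count(),
        chain.accounts_count(),
        chain.account_writes_count()
    );

    // Most recent live write for the pubkey, if the slot is considered live.
    if let Ok(latest) = chain.account_and_slot(&pubkey) {
        println!("slot {} lamports {}", latest.slot, latest.account.lamports());
    }
}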
diff --git a/liquidator/src/chain_data_fetcher.rs b/client/src/chain_data_fetcher.rs
similarity index 88%
rename from liquidator/src/chain_data_fetcher.rs
rename to client/src/chain_data_fetcher.rs
index bce92e90f..8d46b0c5b 100644
--- a/liquidator/src/chain_data_fetcher.rs
+++ b/client/src/chain_data_fetcher.rs
@@ -2,7 +2,6 @@ use std::sync::{Arc, RwLock};
 
 use crate::chain_data::*;
-use client::AccountFetcher;
 
 use mango_v4::accounts_zerocopy::LoadZeroCopy;
 use mango_v4::state::MangoAccountValue;
 
@@ -12,12 +11,12 @@ use solana_client::rpc_client::RpcClient;
 use solana_sdk::account::{AccountSharedData, ReadableAccount};
 use solana_sdk::pubkey::Pubkey;
 
-pub struct ChainDataAccountFetcher {
+pub struct AccountFetcher {
     pub chain_data: Arc<RwLock<ChainData>>,
     pub rpc: RpcClient,
 }
 
-impl ChainDataAccountFetcher {
+impl AccountFetcher {
     // loads from ChainData
     pub fn fetch(
         &self,
@@ -71,22 +70,17 @@ impl ChainDataAccountFetcher {
         let mut chain_data = self.chain_data.write().unwrap();
         chain_data.update_from_rpc(
             address,
-            AccountData {
+            AccountAndSlot {
                 slot: response.context.slot,
                 account: account.into(),
             },
         );
-        log::trace!(
-            "refreshed data of account {} via rpc, got context slot {}",
-            address,
-            response.context.slot
-        );
 
         Ok(())
     }
 }
 
-impl AccountFetcher for ChainDataAccountFetcher {
+impl crate::AccountFetcher for AccountFetcher {
     fn fetch_raw_account(&self, address: Pubkey) -> anyhow::Result<AccountSharedData> {
         self.fetch_raw(&address).map(|a| a.into())
     }
diff --git a/client/src/lib.rs b/client/src/lib.rs
index b6ccaaedc..c65e53a78 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -4,6 +4,8 @@ pub use context::*;
 pub use util::*;
 
 mod account_fetcher;
+pub mod chain_data;
+mod chain_data_fetcher;
 mod client;
 mod context;
 mod gpa;
diff --git a/liquidator/src/liquidate.rs b/liquidator/src/liquidate.rs
index 41f7221be..8e72fecf1 100644
--- a/liquidator/src/liquidate.rs
+++ b/liquidator/src/liquidate.rs
@@ -1,7 +1,6 @@
 use crate::account_shared_data::KeyedAccountSharedData;
-use crate::ChainDataAccountFetcher;
 
-use client::{AccountFetcher, MangoClient, MangoClientError, MangoGroupContext};
+use client::{chain_data, AccountFetcher, MangoClient, MangoClientError, MangoGroupContext};
 use mango_v4::state::{
     new_health_cache, oracle_price, Bank, FixedOrderAccountRetriever, HealthCache, HealthType,
     MangoAccountValue, TokenIndex,
@@ -11,7 +10,7 @@ use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
 
 pub fn new_health_cache_(
     context: &MangoGroupContext,
-    account_fetcher: &ChainDataAccountFetcher,
+    account_fetcher: &chain_data::AccountFetcher,
     account: &MangoAccountValue,
 ) -> anyhow::Result<HealthCache> {
     let active_token_len = account.token_iter_active().count();
@@ -40,7 +39,7 @@ pub fn new_health_cache_(
 #[allow(clippy::too_many_arguments)]
 pub fn process_account(
     mango_client: &MangoClient,
-    account_fetcher: &ChainDataAccountFetcher,
+    account_fetcher: &chain_data::AccountFetcher,
     pubkey: &Pubkey,
 ) -> anyhow::Result<()> {
     // TODO: configurable
@@ -165,7 +164,7 @@ pub fn process_account(
 #[allow(clippy::too_many_arguments)]
 pub fn process_accounts<'a>(
     mango_client: &MangoClient,
-    account_fetcher: &ChainDataAccountFetcher,
+    account_fetcher: &chain_data::AccountFetcher,
     accounts: impl Iterator<Item = &'a Pubkey>,
 ) -> anyhow::Result<()> {
     for pubkey in accounts {
diff --git a/liquidator/src/main.rs b/liquidator/src/main.rs
index b432deb89..245260e2c 100644
--- a/liquidator/src/main.rs
+++ b/liquidator/src/main.rs
@@ -4,12 +4,10 @@ use std::time::Duration;
 
 use anchor_client::Cluster;
 use clap::Parser;
-use client::{keypair_from_cli, MangoClient, MangoGroupContext};
+use client::{chain_data, keypair_from_cli, MangoClient, MangoGroupContext};
 use log::*;
 use mango_v4::state::{PerpMarketIndex, TokenIndex};
 
-use once_cell::sync::OnceCell;
-
 use solana_client::rpc_client::RpcClient;
 use solana_sdk::commitment_config::CommitmentConfig;
 use solana_sdk::pubkey::Pubkey;
@@ -17,16 +15,12 @@ use solana_sdk::signer::Signer;
 use std::collections::HashSet;
 
 pub mod account_shared_data;
-pub mod chain_data;
-pub mod chain_data_fetcher;
 pub mod liquidate;
 pub mod metrics;
 pub mod snapshot_source;
 pub mod util;
 pub mod websocket_source;
 
-use crate::chain_data::*;
-use crate::chain_data_fetcher::*;
 use crate::util::{is_mango_account, is_mango_bank, is_mint_info, is_perp_market};
 
 // jemalloc seems to be better at keeping the memory footprint reasonable over
@@ -34,9 +28,6 @@ use crate::util::{is_mango_account, is_mango_bank, is_mint_info, is_perp_market}
 #[global_allocator]
 static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
 
-// first slot received from websocket feed
-static FIRST_WEBSOCKET_SLOT: OnceCell<u64> = OnceCell::new();
-
 trait AnyhowWrap {
     type Value;
     fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
@@ -170,6 +161,12 @@ async fn main() -> anyhow::Result<()> {
         websocket_sender,
     );
 
+    let first_websocket_slot = websocket_source::get_next_create_bank_slot(
+        websocket_receiver.clone(),
+        Duration::from_secs(10),
+    )
+    .await?;
+
     // Getting solana account snapshots via jsonrpc
     let (snapshot_sender, snapshot_receiver) =
         async_channel::unbounded::<snapshot_source::AccountSnapshot>();
@@ -182,15 +179,16 @@ async fn main() -> anyhow::Result<()> {
             get_multiple_accounts_count: cli.get_multiple_accounts_count,
             parallel_rpc_requests: cli.parallel_rpc_requests,
             snapshot_interval: std::time::Duration::from_secs(cli.snapshot_interval_secs),
+            min_slot: first_websocket_slot + 10,
         },
         mango_pyth_oracles,
         snapshot_sender,
     );
 
     // The representation of current on-chain account data
-    let chain_data = Arc::new(RwLock::new(ChainData::new(&metrics)));
+    let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new()));
     // Reading accounts from chain_data
-    let account_fetcher = Arc::new(ChainDataAccountFetcher {
+    let account_fetcher = Arc::new(chain_data::AccountFetcher {
         chain_data: chain_data.clone(),
         rpc: RpcClient::new_with_timeout_and_commitment(
             cluster.url().to_string(),
@@ -199,6 +197,8 @@ async fn main() -> anyhow::Result<()> {
         ),
     });
 
+    start_chain_data_metrics(chain_data.clone(), &metrics);
+
     // Addresses of the MangoAccounts belonging to the mango program.
     // Needed to check health of them all when the cache updates.
     let mut mango_accounts = HashSet::<Pubkey>::new();
@@ -245,8 +245,7 @@ async fn main() -> anyhow::Result<()> {
                 let message = message.expect("channel not closed");
 
                 // build a model of slots and accounts in `chain_data`
-                // this code should be generic so it can be reused in future projects
-                chain_data.write().unwrap().update_from_websocket(message.clone());
+                websocket_source::update_chain_data(&mut chain_data.write().unwrap(), message.clone());
 
                 // specific program logic using the mirrored data
                 if let websocket_source::Message::Account(account_write) = message {
@@ -325,7 +324,7 @@ async fn main() -> anyhow::Result<()> {
                 }
                 metric_mango_accounts.set(mango_accounts.len() as u64);
 
-                chain_data.write().unwrap().update_from_snapshot(message);
+                snapshot_source::update_chain_data(&mut chain_data.write().unwrap(), message);
                 one_snapshot_done = true;
 
                 // trigger a full health check
@@ -340,3 +339,22 @@ async fn main() -> anyhow::Result<()> {
         }
     }
 }
+
+fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>, metrics: &metrics::Metrics) {
+    let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
+
+    let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into());
+    let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into());
+    let mut metric_account_write_count =
+        metrics.register_u64("chain_data_account_write_count".into());
+
+    tokio::spawn(async move {
+        loop {
+            interval.tick().await;
+            let chain_lock = chain.read().unwrap();
+            metric_slots_count.set(chain_lock.slots_count() as u64);
+            metric_accounts_count.set(chain_lock.accounts_count() as u64);
+            metric_account_write_count.set(chain_lock.account_writes_count() as u64);
+        }
+    });
+}
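Not in the patch — a hypothetical helper that spells out the startup ordering main() now encodes: subscribe to the websocket first, take the first CreatedBank slot via get_next_create_bank_slot, and derive the snapshot min_slot from it so a snapshot can never be older than the account stream it has to catch up with. The standalone function and its name are illustrative only; the +10 margin mirrors main() above.

use std::time::Duration;

// Assumed to live next to main() in the liquidator crate.
async fn snapshot_min_slot(
    websocket_receiver: &async_channel::Receiver<crate::websocket_source::Message>,
) -> anyhow::Result<u64> {
    let first_websocket_slot = crate::websocket_source::get_next_create_bank_slot(
        websocket_receiver.clone(),
        Duration::from_secs(10),
    )
    .await?;
    // Handed to snapshot_source::Config::min_slot, which feed_snapshots passes
    // on as min_context_slot for its RPC requests.
    Ok(first_websocket_slot + 10)
}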
diff --git a/liquidator/src/metrics.rs b/liquidator/src/metrics.rs
index 554426468..434d55d17 100644
--- a/liquidator/src/metrics.rs
+++ b/liquidator/src/metrics.rs
@@ -151,7 +151,7 @@ pub fn start() -> Metrics {
                         0
                     };
                     let diff = new_value.wrapping_sub(previous_value) as i64;
-                    log::debug!("metric: {}: {} ({:+})", name, new_value, diff);
+                    log::info!("metric: {}: {} ({:+})", name, new_value, diff);
                 }
                 Value::I64(v) => {
                     let new_value = v.load(atomic::Ordering::Acquire);
@@ -164,7 +164,7 @@ pub fn start() -> Metrics {
                         0
                     };
                     let diff = new_value - previous_value;
-                    log::debug!("metric: {}: {} ({:+})", name, new_value, diff);
+                    log::info!("metric: {}: {} ({:+})", name, new_value, diff);
                 }
                 Value::String(v) => {
                     let new_value = v.lock().unwrap();
@@ -178,9 +178,9 @@ pub fn start() -> Metrics {
                         "".into()
                     };
                     if *new_value == previous_value {
-                        log::debug!("metric: {}: {} (unchanged)", name, &*new_value);
+                        log::info!("metric: {}: {} (unchanged)", name, &*new_value);
                     } else {
-                        log::debug!(
+                        log::info!(
                             "metric: {}: {} (before: {})",
                             name,
                             &*new_value,
diff --git a/liquidator/src/snapshot_source.rs b/liquidator/src/snapshot_source.rs
index ef8a8ac8d..6d59f5bb7 100644
--- a/liquidator/src/snapshot_source.rs
+++ b/liquidator/src/snapshot_source.rs
@@ -18,7 +18,9 @@ use std::str::FromStr;
 use std::time::Duration;
 use tokio::time;
 
-use crate::{util::is_mango_account, AnyhowWrap, FIRST_WEBSOCKET_SLOT};
+use client::chain_data;
+
+use crate::{util::is_mango_account, AnyhowWrap};
 
 #[derive(Clone)]
 pub struct AccountUpdate {
@@ -79,6 +81,7 @@ pub struct Config {
     pub get_multiple_accounts_count: usize,
     pub parallel_rpc_requests: usize,
     pub snapshot_interval: Duration,
+    pub min_slot: u64,
 }
 
 async fn feed_snapshots(
@@ -94,7 +97,7 @@
         encoding: Some(UiAccountEncoding::Base64),
         commitment: Some(CommitmentConfig::finalized()),
         data_slice: None,
-        min_context_slot: None,
+        min_context_slot: Some(config.min_slot),
     };
     let all_accounts_config = RpcProgramAccountsConfig {
         filters: None,
@@ -215,6 +218,7 @@ pub fn start(
            .await
            .expect("always Ok");
 
+        // Wait for slot to exceed min_slot
         loop {
             poll_wait_first_snapshot.tick().await;
 
@@ -227,14 +231,9 @@ pub fn start(
                 .expect("always Ok");
 
             log::debug!("latest slot for snapshot {}", epoch_info.absolute_slot);
-            match FIRST_WEBSOCKET_SLOT.get() {
-                Some(first_websocket_slot) => {
-                    if first_websocket_slot < &epoch_info.absolute_slot {
-                        log::debug!("continuing to fetch snapshot now, first websocket feed slot {} is older than latest snapshot slot {}",first_websocket_slot, epoch_info.absolute_slot);
-                        break;
-                    }
-                }
-                None => {}
+            if epoch_info.absolute_slot > config.min_slot {
+                log::debug!("continuing to fetch snapshot now, min_slot {} is older than latest epoch slot {}", config.min_slot, epoch_info.absolute_slot);
+                break;
             }
         }
 
@@ -248,3 +247,15 @@ pub fn start(
         }
     });
 }
+
+pub fn update_chain_data(chain: &mut chain_data::ChainData, snapshot: AccountSnapshot) {
+    for account_update in snapshot.accounts {
+        chain.update_account(
+            account_update.pubkey,
+            chain_data::AccountAndSlot {
+                account: account_update.account,
+                slot: account_update.slot,
+            },
+        );
+    }
+}
diff --git a/liquidator/src/websocket_source.rs b/liquidator/src/websocket_source.rs
index d53b56d64..9f2462e79 100644
--- a/liquidator/src/websocket_source.rs
+++ b/liquidator/src/websocket_source.rs
@@ -10,10 +10,13 @@ use solana_client::{
 use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
 use solana_sdk::{account::AccountSharedData, commitment_config::CommitmentConfig, pubkey::Pubkey};
 
+use anyhow::Context;
 use log::*;
 use std::{str::FromStr, sync::Arc, time::Duration};
 use tokio_stream::StreamMap;
 
+use client::chain_data;
+
 use crate::AnyhowWrap;
 
 #[derive(Clone)]
@@ -182,3 +185,89 @@ pub fn start(
         }
     });
 }
+
+pub fn update_chain_data(chain: &mut chain_data::ChainData, message: Message) {
+    use chain_data::*;
+    match message {
+        Message::Account(account_write) => {
+            trace!("websocket account message");
+            chain.update_account(
+                account_write.pubkey,
+                AccountAndSlot {
+                    slot: account_write.slot,
+                    account: account_write.account,
+                },
+            );
+        }
+        Message::Slot(slot_update) => {
+            trace!("websocket slot message");
+            let slot_update = match *slot_update {
+                solana_client::rpc_response::SlotUpdate::CreatedBank { slot, parent, .. } => {
+                    Some(SlotData {
+                        slot,
+                        parent: Some(parent),
+                        status: SlotStatus::Processed,
+                        chain: 0,
+                    })
+                }
+                solana_client::rpc_response::SlotUpdate::OptimisticConfirmation {
+                    slot, ..
+                } => Some(SlotData {
+                    slot,
+                    parent: None,
+                    status: SlotStatus::Confirmed,
+                    chain: 0,
+                }),
+                solana_client::rpc_response::SlotUpdate::Root { slot, .. } => Some(SlotData {
+                    slot,
+                    parent: None,
+                    status: SlotStatus::Rooted,
+                    chain: 0,
+                }),
+                _ => None,
+            };
+            if let Some(update) = slot_update {
+                chain.update_slot(update);
+            }
+        }
+    }
+}
+
+pub async fn get_next_create_bank_slot(
+    receiver: async_channel::Receiver<Message>,
+    timeout: Duration,
+) -> anyhow::Result<u64> {
+    let start = std::time::Instant::now();
+    loop {
+        let elapsed = start.elapsed();
+        if elapsed > timeout {
+            anyhow::bail!(
+                "did not receive a slot from the websocket connection in {}s",
+                timeout.as_secs()
+            );
+        }
+        let remaining_timeout = timeout - elapsed;
+
+        let msg = match tokio::time::timeout(remaining_timeout, receiver.recv()).await {
+            // timeout
+            Err(_) => continue,
+            // channel close
+            Ok(Err(err)) => {
+                return Err(err).context("while waiting for first slot from websocket connection");
+            }
+            // success
+            Ok(Ok(msg)) => msg,
+        };
+
+        match msg {
+            Message::Slot(slot_update) => {
+                if let solana_client::rpc_response::SlotUpdate::CreatedBank { slot, .. } =
+                    *slot_update
+                {
+                    return Ok(slot);
+                }
+            }
+            _ => {}
+        }
+    }
+}
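Consumer sketch (not part of the patch): with update_chain_data split out of ChainData, any binary can mirror chain state by pumping both feeds into a shared Arc<RwLock<ChainData>>, which is what main() does above. Channel setup, shutdown and the mango-specific handling are elided; the receivers are assumed to come from the two start() functions.

use std::sync::{Arc, RwLock};

// Assumed to live in the liquidator crate alongside the two source modules.
async fn mirror_chain(
    chain_data: Arc<RwLock<client::chain_data::ChainData>>,
    websocket_receiver: async_channel::Receiver<crate::websocket_source::Message>,
    snapshot_receiver: async_channel::Receiver<crate::snapshot_source::AccountSnapshot>,
) -> anyhow::Result<()> {
    loop {
        tokio::select! {
            message = websocket_receiver.recv() => {
                // Slot and account updates from the websocket stream.
                crate::websocket_source::update_chain_data(&mut chain_data.write().unwrap(), message?);
            }
            snapshot = snapshot_receiver.recv() => {
                // Full program-account snapshots fetched over JSON-RPC.
                crate::snapshot_source::update_chain_data(&mut chain_data.write().unwrap(), snapshot?);
            }
        }
    }
}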