From 5c40c12cf585de4d6449d868d35efeca7fd5726a Mon Sep 17 00:00:00 2001
From: Christian Kamm
Date: Fri, 5 Aug 2022 13:50:44 +0200
Subject: [PATCH] Liquidator: cleanups

---
 liquidator/src/liquidate.rs        | 11 +++++--
 liquidator/src/main.rs             | 49 +++++++++++++++---------------
 liquidator/src/snapshot_source.rs  |  8 ++---
 liquidator/src/websocket_source.rs | 12 +++-----
 4 files changed, 41 insertions(+), 39 deletions(-)

diff --git a/liquidator/src/liquidate.rs b/liquidator/src/liquidate.rs
index f15d810ad..a37b31ed9 100644
--- a/liquidator/src/liquidate.rs
+++ b/liquidator/src/liquidate.rs
@@ -8,6 +8,10 @@ use mango_v4::state::{
 
 use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
 
+pub struct Config {
+    pub min_health_ratio: f64,
+}
+
 pub fn new_health_cache_(
     context: &MangoGroupContext,
     account_fetcher: &chain_data::AccountFetcher,
@@ -41,9 +45,9 @@ pub fn process_account(
     mango_client: &MangoClient,
     account_fetcher: &chain_data::AccountFetcher,
     pubkey: &Pubkey,
+    config: &Config,
 ) -> anyhow::Result<()> {
-    // TODO: configurable
-    let min_health_ratio = I80F48::from_num(50.0f64);
+    let min_health_ratio = I80F48::from_num(config.min_health_ratio);
     let quote_token_index = 0;
 
     let account = account_fetcher.fetch_mango_account(pubkey)?;
@@ -166,9 +170,10 @@ pub fn process_accounts<'a>(
     mango_client: &MangoClient,
     account_fetcher: &chain_data::AccountFetcher,
     accounts: impl Iterator<Item = &'a Pubkey>,
+    config: &Config,
 ) -> anyhow::Result<()> {
     for pubkey in accounts {
-        match process_account(mango_client, account_fetcher, pubkey) {
+        match process_account(mango_client, account_fetcher, pubkey, config) {
             Err(err) => {
                 // Not all errors need to be raised to the user's attention.
                 let mut log_level = log::Level::Error;
diff --git a/liquidator/src/main.rs b/liquidator/src/main.rs
index 11a6b9320..2a947dcc7 100644
--- a/liquidator/src/main.rs
+++ b/liquidator/src/main.rs
@@ -67,13 +67,17 @@ struct Cli {
     #[clap(long, env, default_value = "300")]
     snapshot_interval_secs: u64,
 
-    // how many getMultipleAccounts requests to send in parallel
+    /// how many getMultipleAccounts requests to send in parallel
     #[clap(long, env, default_value = "10")]
     parallel_rpc_requests: usize,
 
-    // typically 100 is the max number for getMultipleAccounts
+    /// typically 100 is the max number of accounts getMultipleAccounts will retrieve at once
     #[clap(long, env, default_value = "100")]
     get_multiple_accounts_count: usize,
+
+    /// liquidator health ratio should not fall below this value
+    #[clap(long, env, default_value = "50")]
+    min_health_ratio: f64,
 }
 
 pub fn encode_address(addr: &Pubkey) -> String {
@@ -114,8 +118,7 @@ async fn main() -> anyhow::Result<()> {
     let group_context =
         MangoGroupContext::new_from_rpc(mango_group, cluster.clone(), commitment)?;
 
-    // TODO: this is all oracles, not just pyth!
-    let mango_pyth_oracles = group_context
+    let mango_oracles = group_context
         .tokens
         .values()
         .map(|value| value.mint_info.oracle)
@@ -145,7 +148,7 @@ async fn main() -> anyhow::Result<()> {
             serum_program: cli.serum_program,
             open_orders_authority: mango_group,
         },
-        mango_pyth_oracles.clone(),
+        mango_oracles.clone(),
         websocket_sender,
     );
 
@@ -169,7 +172,7 @@ async fn main() -> anyhow::Result<()> {
             snapshot_interval: std::time::Duration::from_secs(cli.snapshot_interval_secs),
             min_slot: first_websocket_slot + 10,
         },
-        mango_pyth_oracles,
+        mango_oracles,
         snapshot_sender,
     );
 
@@ -183,14 +186,6 @@ async fn main() -> anyhow::Result<()> {
     let mut oracles = HashSet::<Pubkey>::new();
     let mut perp_markets = HashMap::<Pubkey, PerpMarket>::new();
 
-    // List of accounts that are potentially liquidatable.
-    //
-    // Used to send a different message for newly liqudatable accounts and
-    // accounts that are still liquidatable but not fresh anymore.
-    //
-    // This should actually be done per connected websocket client, and not globally.
-    let _current_candidates = HashSet::<Pubkey>::new();
-
     // Is the first snapshot done? Only start checking account health when it is.
     let mut one_snapshot_done = false;
 
@@ -211,6 +206,10 @@ async fn main() -> anyhow::Result<()> {
         )?)
     };
 
+    let liq_config = liquidate::Config {
+        min_health_ratio: cli.min_health_ratio,
+    };
+
     info!("main loop");
     loop {
         tokio::select! {
@@ -239,10 +238,10 @@ async fn main() -> anyhow::Result<()> {
                         }
 
                         if let Err(err) = liquidate::process_accounts(
-                            &mango_client,
-                            &account_fetcher,
-                            std::iter::once(&account_write.pubkey),
-
+                            &mango_client,
+                            &account_fetcher,
+                            std::iter::once(&account_write.pubkey),
+                            &liq_config,
                         ) {
                             warn!("could not process account {}: {:?}", account_write.pubkey, err);
                         }
@@ -270,9 +269,10 @@ async fn main() -> anyhow::Result<()> {
                     // However, this currently takes like 50ms for me in release builds,
                     // so optimizing much seems unnecessary.
                    if let Err(err) = liquidate::process_accounts(
-                        &mango_client,
-                        &account_fetcher,
-                        mango_accounts.iter(),
+                        &mango_client,
+                        &account_fetcher,
+                        mango_accounts.iter(),
+                        &liq_config,
                     ) {
                         warn!("could not process accounts: {:?}", err);
                     }
@@ -304,9 +304,10 @@ async fn main() -> anyhow::Result<()> {
 
                 // trigger a full health check
                 if let Err(err) = liquidate::process_accounts(
-                    &mango_client,
-                    &account_fetcher,
-                    mango_accounts.iter(),
+                    &mango_client,
+                    &account_fetcher,
+                    mango_accounts.iter(),
+                    &liq_config,
                 ) {
                     warn!("could not process accounts: {:?}", err);
                 }
diff --git a/liquidator/src/snapshot_source.rs b/liquidator/src/snapshot_source.rs
index 6d59f5bb7..71624d5f6 100644
--- a/liquidator/src/snapshot_source.rs
+++ b/liquidator/src/snapshot_source.rs
@@ -86,7 +86,7 @@ pub struct Config {
 
 async fn feed_snapshots(
     config: &Config,
-    mango_pyth_oracles: Vec<Pubkey>,
+    mango_oracles: Vec<Pubkey>,
     sender: &async_channel::Sender<AccountSnapshot>,
 ) -> anyhow::Result<()> {
     let rpc_client = http::connect_with_options::<AccountsDataClient>(&config.rpc_http_url, true)
@@ -128,7 +128,7 @@ async fn feed_snapshots(
     let results: Vec<(
         Vec<Pubkey>,
         Result<Response<Vec<Option<UiAccount>>>, jsonrpc_core_client::RpcError>,
-    )> = stream::iter(mango_pyth_oracles)
+    )> = stream::iter(mango_oracles)
         .chunks(config.get_multiple_accounts_count)
         .map(|keys| {
             let rpc_client = &rpc_client;
@@ -207,7 +207,7 @@ async fn feed_snapshots(
 
 pub fn start(
     config: Config,
-    mango_pyth_oracles: Vec<Pubkey>,
+    mango_oracles: Vec<Pubkey>,
     sender: async_channel::Sender<AccountSnapshot>,
 ) {
     let mut poll_wait_first_snapshot = time::interval(time::Duration::from_secs(2));
@@ -239,7 +239,7 @@ pub fn start(
 
         loop {
             interval_between_snapshots.tick().await;
-            if let Err(err) = feed_snapshots(&config, mango_pyth_oracles.clone(), &sender).await {
+            if let Err(err) = feed_snapshots(&config, mango_oracles.clone(), &sender).await {
                 warn!("snapshot error: {:?}", err);
             } else {
                 info!("snapshot success");
diff --git a/liquidator/src/websocket_source.rs b/liquidator/src/websocket_source.rs
index 9f2462e79..8af14dcbd 100644
--- a/liquidator/src/websocket_source.rs
+++ b/liquidator/src/websocket_source.rs
@@ -57,7 +57,7 @@ pub struct Config {
 
 async fn feed_data(
     config: &Config,
-    mango_pyth_oracles: Vec<Pubkey>,
+    mango_oracles: Vec<Pubkey>,
     sender: async_channel::Sender<Message>,
 ) -> anyhow::Result<()> {
     let connect = ws::try_connect::<RpcSolPubSubClient>(&config.rpc_ws_url).map_err_anyhow()?;
@@ -101,7 +101,7 @@ async fn feed_data(
         .map_err_anyhow()?;
     // TODO: mango_pyth_oracles should not contain stub mango_pyth_oracles, since they already sub'ed with mango_sub
     let mut mango_pyth_oracles_sub_map = StreamMap::new();
-    for oracle in mango_pyth_oracles.into_iter() {
+    for oracle in mango_oracles.into_iter() {
         mango_pyth_oracles_sub_map.insert(
             oracle,
             client
@@ -171,16 +171,12 @@ async fn feed_data(
     }
 }
 
-pub fn start(
-    config: Config,
-    mango_pyth_oracles: Vec<Pubkey>,
-    sender: async_channel::Sender<Message>,
-) {
+pub fn start(config: Config, mango_oracles: Vec<Pubkey>, sender: async_channel::Sender<Message>) {
     tokio::spawn(async move {
         // if the websocket disconnects, we get no data in a while etc, reconnect and try again
         loop {
             info!("connecting to solana websocket streams");
-            let out = feed_data(&config, mango_pyth_oracles.clone(), sender.clone());
+            let out = feed_data(&config, mango_oracles.clone(), sender.clone());
             let _ = out.await;
         }
     });
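
A note for reviewers, not part of the commit: since the new field is declared
with #[clap(long, env, default_value = "50")], clap accepts it either as the
--min-health-ratio command-line flag or as the MIN_HEALTH_RATIO environment
variable, defaulting to 50. A minimal sketch of how the value travels into the
liquidation check, assuming the types introduced in this patch:

    // Sketch only; mirrors the wiring in main.rs above.
    let cli = Cli::parse(); // --min-health-ratio / MIN_HEALTH_RATIO, default 50
    let liq_config = liquidate::Config {
        // f64 here; process_account converts it to I80F48 for the health math
        min_health_ratio: cli.min_health_ratio,
    };
    liquidate::process_accounts(
        &mango_client,
        &account_fetcher,
        mango_accounts.iter(),
        &liq_config,
    )?;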