Liquidator: cleanups
This commit is contained in:
parent
5c34b60105
commit
5c40c12cf5
|
@ -8,6 +8,10 @@ use mango_v4::state::{
|
||||||
|
|
||||||
use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
|
use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
|
||||||
|
|
||||||
|
pub struct Config {
|
||||||
|
pub min_health_ratio: f64,
|
||||||
|
}
|
||||||
|
|
||||||
pub fn new_health_cache_(
|
pub fn new_health_cache_(
|
||||||
context: &MangoGroupContext,
|
context: &MangoGroupContext,
|
||||||
account_fetcher: &chain_data::AccountFetcher,
|
account_fetcher: &chain_data::AccountFetcher,
|
||||||
|
@ -41,9 +45,9 @@ pub fn process_account(
|
||||||
mango_client: &MangoClient,
|
mango_client: &MangoClient,
|
||||||
account_fetcher: &chain_data::AccountFetcher,
|
account_fetcher: &chain_data::AccountFetcher,
|
||||||
pubkey: &Pubkey,
|
pubkey: &Pubkey,
|
||||||
|
config: &Config,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
// TODO: configurable
|
let min_health_ratio = I80F48::from_num(config.min_health_ratio);
|
||||||
let min_health_ratio = I80F48::from_num(50.0f64);
|
|
||||||
let quote_token_index = 0;
|
let quote_token_index = 0;
|
||||||
|
|
||||||
let account = account_fetcher.fetch_mango_account(pubkey)?;
|
let account = account_fetcher.fetch_mango_account(pubkey)?;
|
||||||
|
@ -166,9 +170,10 @@ pub fn process_accounts<'a>(
|
||||||
mango_client: &MangoClient,
|
mango_client: &MangoClient,
|
||||||
account_fetcher: &chain_data::AccountFetcher,
|
account_fetcher: &chain_data::AccountFetcher,
|
||||||
accounts: impl Iterator<Item = &'a Pubkey>,
|
accounts: impl Iterator<Item = &'a Pubkey>,
|
||||||
|
config: &Config,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
for pubkey in accounts {
|
for pubkey in accounts {
|
||||||
match process_account(mango_client, account_fetcher, pubkey) {
|
match process_account(mango_client, account_fetcher, pubkey, config) {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// Not all errors need to be raised to the user's attention.
|
// Not all errors need to be raised to the user's attention.
|
||||||
let mut log_level = log::Level::Error;
|
let mut log_level = log::Level::Error;
|
||||||
|
|
|
@ -67,13 +67,17 @@ struct Cli {
|
||||||
#[clap(long, env, default_value = "300")]
|
#[clap(long, env, default_value = "300")]
|
||||||
snapshot_interval_secs: u64,
|
snapshot_interval_secs: u64,
|
||||||
|
|
||||||
// how many getMultipleAccounts requests to send in parallel
|
/// how many getMultipleAccounts requests to send in parallel
|
||||||
#[clap(long, env, default_value = "10")]
|
#[clap(long, env, default_value = "10")]
|
||||||
parallel_rpc_requests: usize,
|
parallel_rpc_requests: usize,
|
||||||
|
|
||||||
// typically 100 is the max number for getMultipleAccounts
|
/// typically 100 is the max number of accounts getMultipleAccounts will retrieve at once
|
||||||
#[clap(long, env, default_value = "100")]
|
#[clap(long, env, default_value = "100")]
|
||||||
get_multiple_accounts_count: usize,
|
get_multiple_accounts_count: usize,
|
||||||
|
|
||||||
|
/// liquidator health ratio should not fall below this value
|
||||||
|
#[clap(long, env, default_value = "50")]
|
||||||
|
min_health_ratio: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn encode_address(addr: &Pubkey) -> String {
|
pub fn encode_address(addr: &Pubkey) -> String {
|
||||||
|
@ -114,8 +118,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let group_context = MangoGroupContext::new_from_rpc(mango_group, cluster.clone(), commitment)?;
|
let group_context = MangoGroupContext::new_from_rpc(mango_group, cluster.clone(), commitment)?;
|
||||||
|
|
||||||
// TODO: this is all oracles, not just pyth!
|
let mango_oracles = group_context
|
||||||
let mango_pyth_oracles = group_context
|
|
||||||
.tokens
|
.tokens
|
||||||
.values()
|
.values()
|
||||||
.map(|value| value.mint_info.oracle)
|
.map(|value| value.mint_info.oracle)
|
||||||
|
@ -145,7 +148,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
serum_program: cli.serum_program,
|
serum_program: cli.serum_program,
|
||||||
open_orders_authority: mango_group,
|
open_orders_authority: mango_group,
|
||||||
},
|
},
|
||||||
mango_pyth_oracles.clone(),
|
mango_oracles.clone(),
|
||||||
websocket_sender,
|
websocket_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -169,7 +172,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
snapshot_interval: std::time::Duration::from_secs(cli.snapshot_interval_secs),
|
snapshot_interval: std::time::Duration::from_secs(cli.snapshot_interval_secs),
|
||||||
min_slot: first_websocket_slot + 10,
|
min_slot: first_websocket_slot + 10,
|
||||||
},
|
},
|
||||||
mango_pyth_oracles,
|
mango_oracles,
|
||||||
snapshot_sender,
|
snapshot_sender,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -183,14 +186,6 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let mut oracles = HashSet::<Pubkey>::new();
|
let mut oracles = HashSet::<Pubkey>::new();
|
||||||
let mut perp_markets = HashMap::<PerpMarketIndex, Pubkey>::new();
|
let mut perp_markets = HashMap::<PerpMarketIndex, Pubkey>::new();
|
||||||
|
|
||||||
// List of accounts that are potentially liquidatable.
|
|
||||||
//
|
|
||||||
// Used to send a different message for newly liqudatable accounts and
|
|
||||||
// accounts that are still liquidatable but not fresh anymore.
|
|
||||||
//
|
|
||||||
// This should actually be done per connected websocket client, and not globally.
|
|
||||||
let _current_candidates = HashSet::<Pubkey>::new();
|
|
||||||
|
|
||||||
// Is the first snapshot done? Only start checking account health when it is.
|
// Is the first snapshot done? Only start checking account health when it is.
|
||||||
let mut one_snapshot_done = false;
|
let mut one_snapshot_done = false;
|
||||||
|
|
||||||
|
@ -211,6 +206,10 @@ async fn main() -> anyhow::Result<()> {
|
||||||
)?)
|
)?)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let liq_config = liquidate::Config {
|
||||||
|
min_health_ratio: cli.min_health_ratio,
|
||||||
|
};
|
||||||
|
|
||||||
info!("main loop");
|
info!("main loop");
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
@ -239,10 +238,10 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(err) = liquidate::process_accounts(
|
if let Err(err) = liquidate::process_accounts(
|
||||||
&mango_client,
|
&mango_client,
|
||||||
&account_fetcher,
|
&account_fetcher,
|
||||||
std::iter::once(&account_write.pubkey),
|
std::iter::once(&account_write.pubkey),
|
||||||
|
&liq_config,
|
||||||
) {
|
) {
|
||||||
warn!("could not process account {}: {:?}", account_write.pubkey, err);
|
warn!("could not process account {}: {:?}", account_write.pubkey, err);
|
||||||
}
|
}
|
||||||
|
@ -270,9 +269,10 @@ async fn main() -> anyhow::Result<()> {
|
||||||
// However, this currently takes like 50ms for me in release builds,
|
// However, this currently takes like 50ms for me in release builds,
|
||||||
// so optimizing much seems unnecessary.
|
// so optimizing much seems unnecessary.
|
||||||
if let Err(err) = liquidate::process_accounts(
|
if let Err(err) = liquidate::process_accounts(
|
||||||
&mango_client,
|
&mango_client,
|
||||||
&account_fetcher,
|
&account_fetcher,
|
||||||
mango_accounts.iter(),
|
mango_accounts.iter(),
|
||||||
|
&liq_config,
|
||||||
) {
|
) {
|
||||||
warn!("could not process accounts: {:?}", err);
|
warn!("could not process accounts: {:?}", err);
|
||||||
}
|
}
|
||||||
|
@ -304,9 +304,10 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
// trigger a full health check
|
// trigger a full health check
|
||||||
if let Err(err) = liquidate::process_accounts(
|
if let Err(err) = liquidate::process_accounts(
|
||||||
&mango_client,
|
&mango_client,
|
||||||
&account_fetcher,
|
&account_fetcher,
|
||||||
mango_accounts.iter(),
|
mango_accounts.iter(),
|
||||||
|
&liq_config,
|
||||||
) {
|
) {
|
||||||
warn!("could not process accounts: {:?}", err);
|
warn!("could not process accounts: {:?}", err);
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,7 +86,7 @@ pub struct Config {
|
||||||
|
|
||||||
async fn feed_snapshots(
|
async fn feed_snapshots(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
mango_pyth_oracles: Vec<Pubkey>,
|
mango_oracles: Vec<Pubkey>,
|
||||||
sender: &async_channel::Sender<AccountSnapshot>,
|
sender: &async_channel::Sender<AccountSnapshot>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let rpc_client = http::connect_with_options::<AccountsDataClient>(&config.rpc_http_url, true)
|
let rpc_client = http::connect_with_options::<AccountsDataClient>(&config.rpc_http_url, true)
|
||||||
|
@ -128,7 +128,7 @@ async fn feed_snapshots(
|
||||||
let results: Vec<(
|
let results: Vec<(
|
||||||
Vec<Pubkey>,
|
Vec<Pubkey>,
|
||||||
Result<Response<Vec<Option<UiAccount>>>, jsonrpc_core_client::RpcError>,
|
Result<Response<Vec<Option<UiAccount>>>, jsonrpc_core_client::RpcError>,
|
||||||
)> = stream::iter(mango_pyth_oracles)
|
)> = stream::iter(mango_oracles)
|
||||||
.chunks(config.get_multiple_accounts_count)
|
.chunks(config.get_multiple_accounts_count)
|
||||||
.map(|keys| {
|
.map(|keys| {
|
||||||
let rpc_client = &rpc_client;
|
let rpc_client = &rpc_client;
|
||||||
|
@ -207,7 +207,7 @@ async fn feed_snapshots(
|
||||||
|
|
||||||
pub fn start(
|
pub fn start(
|
||||||
config: Config,
|
config: Config,
|
||||||
mango_pyth_oracles: Vec<Pubkey>,
|
mango_oracles: Vec<Pubkey>,
|
||||||
sender: async_channel::Sender<AccountSnapshot>,
|
sender: async_channel::Sender<AccountSnapshot>,
|
||||||
) {
|
) {
|
||||||
let mut poll_wait_first_snapshot = time::interval(time::Duration::from_secs(2));
|
let mut poll_wait_first_snapshot = time::interval(time::Duration::from_secs(2));
|
||||||
|
@ -239,7 +239,7 @@ pub fn start(
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
interval_between_snapshots.tick().await;
|
interval_between_snapshots.tick().await;
|
||||||
if let Err(err) = feed_snapshots(&config, mango_pyth_oracles.clone(), &sender).await {
|
if let Err(err) = feed_snapshots(&config, mango_oracles.clone(), &sender).await {
|
||||||
warn!("snapshot error: {:?}", err);
|
warn!("snapshot error: {:?}", err);
|
||||||
} else {
|
} else {
|
||||||
info!("snapshot success");
|
info!("snapshot success");
|
||||||
|
|
|
@ -57,7 +57,7 @@ pub struct Config {
|
||||||
|
|
||||||
async fn feed_data(
|
async fn feed_data(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
mango_pyth_oracles: Vec<Pubkey>,
|
mango_oracles: Vec<Pubkey>,
|
||||||
sender: async_channel::Sender<Message>,
|
sender: async_channel::Sender<Message>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let connect = ws::try_connect::<RpcSolPubSubClient>(&config.rpc_ws_url).map_err_anyhow()?;
|
let connect = ws::try_connect::<RpcSolPubSubClient>(&config.rpc_ws_url).map_err_anyhow()?;
|
||||||
|
@ -101,7 +101,7 @@ async fn feed_data(
|
||||||
.map_err_anyhow()?;
|
.map_err_anyhow()?;
|
||||||
// TODO: mango_pyth_oracles should not contain stub mango_pyth_oracles, since they already sub'ed with mango_sub
|
// TODO: mango_pyth_oracles should not contain stub mango_pyth_oracles, since they already sub'ed with mango_sub
|
||||||
let mut mango_pyth_oracles_sub_map = StreamMap::new();
|
let mut mango_pyth_oracles_sub_map = StreamMap::new();
|
||||||
for oracle in mango_pyth_oracles.into_iter() {
|
for oracle in mango_oracles.into_iter() {
|
||||||
mango_pyth_oracles_sub_map.insert(
|
mango_pyth_oracles_sub_map.insert(
|
||||||
oracle,
|
oracle,
|
||||||
client
|
client
|
||||||
|
@ -171,16 +171,12 @@ async fn feed_data(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start(
|
pub fn start(config: Config, mango_oracles: Vec<Pubkey>, sender: async_channel::Sender<Message>) {
|
||||||
config: Config,
|
|
||||||
mango_pyth_oracles: Vec<Pubkey>,
|
|
||||||
sender: async_channel::Sender<Message>,
|
|
||||||
) {
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// if the websocket disconnects, we get no data in a while etc, reconnect and try again
|
// if the websocket disconnects, we get no data in a while etc, reconnect and try again
|
||||||
loop {
|
loop {
|
||||||
info!("connecting to solana websocket streams");
|
info!("connecting to solana websocket streams");
|
||||||
let out = feed_data(&config, mango_pyth_oracles.clone(), sender.clone());
|
let out = feed_data(&config, mango_oracles.clone(), sender.clone());
|
||||||
let _ = out.await;
|
let _ = out.await;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue