Improve snapshotting: avoid getProgramAccounts for Serum OpenOrders

Instead, send getMultipleAccounts for the accounts that are actually needed.
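
The gist of the change: the MangoAccount data obtained via getProgramAccounts already records which Serum OpenOrders accounts are in use, so those can be fetched directly with getMultipleAccounts instead of scanning the entire Serum program. Below is a minimal sketch of the batching pattern, not the commit's code: fetch_chunk is a hypothetical stand-in for the getMultipleAccounts RPC call, and chunk_size / parallelism correspond to the new get_multiple_accounts_count and parallel_rpc_requests settings.

use futures::{stream, StreamExt};

// Fetch accounts in chunks of `chunk_size` keys, keeping at most
// `parallelism` requests in flight at any time.
async fn fetch_in_chunks<T, F, Fut>(
    keys: Vec<String>,
    chunk_size: usize,  // e.g. get_multiple_accounts_count = 100
    parallelism: usize, // e.g. parallel_rpc_requests = 10
    fetch_chunk: F,     // hypothetical stand-in for the getMultipleAccounts call
) -> Vec<(Vec<String>, anyhow::Result<T>)>
where
    F: Fn(Vec<String>) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<T>>,
{
    stream::iter(keys)
        .chunks(chunk_size)
        .map(|chunk| {
            // start the request for this chunk and keep the keys alongside the result
            let request = fetch_chunk(chunk.clone());
            async move { (chunk, request.await) }
        })
        .buffer_unordered(parallelism)
        .collect()
        .await
}

In the commit itself this slot is filled by FullClient::get_multiple_accounts, as the feed_snapshots changes below show.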
Christian Kamm 2022-04-25 09:24:05 +02:00
parent 24bb725759
commit 1e660667ed
6 changed files with 109 additions and 47 deletions

View File

@@ -6,6 +6,8 @@ mango_cache_id = "Ge5isD68ThYorC6CoL4xdSK9eShEg2KXrnwABSzyFP4j"
mango_signer_id = "DUWfSgNPHJfmTEhnzE19W2Jchdno69M1mJ1mwHkAu7GK"
serum_program_id = "DESVgJVGajEgKGXhb6XmqDHGz3VjdgP7rEVESBgxmroY"
snapshot_interval_secs = 240
parallel_rpc_requests = 10
get_multiple_accounts_count = 100
websocket_server_bind_address = "localhost:9123"
early_candidate_percentage = 1.0

View File

@@ -18,6 +18,12 @@ serum_program_id = "9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin"
# Interval between requesting getProgramAccounts data snapshots
snapshot_interval_secs = 240
# Number of parallel getMultipleAccounts requests to send during snapshotting.
parallel_rpc_requests = 10
# Number of accounts that can be retrieved in a getMultipleAccounts call.
# Typically 100 for normal RPC nodes.
get_multiple_accounts_count = 100
# Address at which a websocket server will be opened. Connect
# to it to listen for information about liquidatable accounts.
websocket_server_bind_address = "localhost:9123"
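
As a rough illustration of how these two settings interact, a sketch with made-up numbers: 2,500 OpenOrders keys split into chunks of 100 need 25 getMultipleAccounts calls, of which at most 10 are in flight at once.

// Hypothetical sizing example for get_multiple_accounts_count (chunk size)
// and parallel_rpc_requests (concurrency); the key count is invented.
fn request_count(num_keys: usize, chunk_size: usize) -> usize {
    (num_keys + chunk_size - 1) / chunk_size // ceiling division
}

fn main() {
    assert_eq!(request_count(2_500, 100), 25);
    assert_eq!(request_count(2_501, 100), 26); // a partial chunk still costs a call
}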

View File

@@ -1,7 +1,7 @@
use {
crate::Config,
crate::chain_data::ChainData,
crate::websocket_sink::{LiquidationCanditate, HealthInfo},
crate::websocket_sink::{HealthInfo, LiquidationCanditate},
crate::Config,
anyhow::Context,
fixed::types::I80F48,
log::*,
@@ -17,7 +17,7 @@ use {
};
// FUTURE: It'd be very nice if I could map T to the DataType::T constant!
fn load_mango_account<T: Loadable + Sized>(
pub fn load_mango_account<T: Loadable + Sized>(
data_type: DataType,
account: &AccountSharedData,
) -> anyhow::Result<&T> {
@@ -94,8 +94,8 @@ struct Health {
candidate: bool,
being_liquidated: bool,
health_fraction: I80F48, // always maint
assets: I80F48, // always maint
liabilities: I80F48, // always maint
assets: I80F48, // always maint
liabilities: I80F48, // always maint
}
fn check_health(
@@ -116,7 +116,8 @@ fn check_health(
I80F48::MAX
};
let still_being_liquidated = account.being_liquidated && health_cache.get_health(group, HealthType::Init) < 0;
let still_being_liquidated =
account.being_liquidated && health_cache.get_health(group, HealthType::Init) < 0;
let threshold = 1.0 + config.early_candidate_percentage / 100.0;
let candidate = health_fraction < threshold || still_being_liquidated;
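
To make the candidate condition concrete, a small sketch using plain f64 in place of I80F48, with invented values: with early_candidate_percentage = 1.0 (as in the example config above), the threshold is 1.0 + 1.0 / 100.0 = 1.01, so accounts whose maint health fraction is already below 1.01, or that are still being liquidated, get reported as candidates.

// Simplified restatement of the check above; not the crate's actual types.
fn is_candidate(
    health_fraction: f64,
    early_candidate_percentage: f64,
    still_being_liquidated: bool,
) -> bool {
    let threshold = 1.0 + early_candidate_percentage / 100.0;
    health_fraction < threshold || still_being_liquidated
}

fn main() {
    assert!(is_candidate(1.005, 1.0, false)); // close to unhealthy: early candidate
    assert!(!is_candidate(1.05, 1.0, false)); // comfortably healthy
    assert!(is_candidate(1.05, 1.0, true));   // still being liquidated
}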

View File

@@ -47,6 +47,10 @@ pub struct Config {
pub serum_program_id: String,
pub snapshot_interval_secs: u64,
pub websocket_server_bind_address: String,
// how many getMultipleAccounts requests to send in parallel
pub parallel_rpc_requests: usize,
// typically 100 is the max number for getMultipleAccounts
pub get_multiple_accounts_count: usize,
pub early_candidate_percentage: f64,
}

View File

@@ -1,19 +1,20 @@
use jsonrpc_core_client::transports::http;
use solana_account_decoder::UiAccountEncoding;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::{Memcmp, MemcmpEncodedBytes, RpcFilterType},
rpc_response::{Response, RpcKeyedAccount},
};
use solana_rpc::{rpc::rpc_full::FullClient, rpc::OptionalContext};
use solana_sdk::{account::AccountSharedData, commitment_config::CommitmentConfig, pubkey::Pubkey};
use anyhow::Context;
use futures::{stream, StreamExt};
use log::*;
use std::str::FromStr;
use tokio::time;
use crate::{AnyhowWrap, Config};
use crate::{healthcheck, AnyhowWrap, Config};
#[derive(Clone)]
pub struct AccountUpdate {
@@ -28,7 +29,10 @@ pub struct AccountSnapshot {
}
impl AccountSnapshot {
pub fn extend_from_rpc(&mut self, rpc: Response<Vec<RpcKeyedAccount>>) -> anyhow::Result<()> {
pub fn extend_from_gpa_rpc(
&mut self,
rpc: Response<Vec<RpcKeyedAccount>>,
) -> anyhow::Result<()> {
self.accounts.reserve(rpc.value.len());
for a in rpc.value {
self.accounts.push(AccountUpdate {
@@ -42,6 +46,26 @@ impl AccountSnapshot {
}
Ok(())
}
pub fn extend_from_gma_rpc(
&mut self,
keys: &[Pubkey],
rpc: Response<Vec<Option<UiAccount>>>,
) -> anyhow::Result<()> {
self.accounts.reserve(rpc.value.len());
for (&pubkey, a) in keys.iter().zip(rpc.value.iter()) {
if let Some(ui_account) = a {
self.accounts.push(AccountUpdate {
slot: rpc.context.slot,
pubkey,
account: ui_account
.decode()
.ok_or(anyhow::anyhow!("could not decode account"))?,
});
}
}
Ok(())
}
}
async fn feed_snapshots(
@@ -49,8 +73,6 @@ async fn feed_snapshots(
sender: &async_channel::Sender<AccountSnapshot>,
) -> anyhow::Result<()> {
let mango_program_id = Pubkey::from_str(&config.mango_program_id)?;
let serum_program_id = Pubkey::from_str(&config.serum_program_id)?;
let mango_signer_id = Pubkey::from_str(&config.mango_signer_id)?;
let rpc_client = http::connect_with_options::<FullClient>(&config.rpc_http_url, true)
.await
@@ -66,54 +88,81 @@ async fn feed_snapshots(
with_context: Some(true),
account_config: account_info_config.clone(),
};
let open_orders_accounts_config = RpcProgramAccountsConfig {
// filter for only OpenOrders with mango_signer as owner
filters: Some(vec![
RpcFilterType::DataSize(3228), // open orders size
RpcFilterType::Memcmp(Memcmp {
offset: 0,
// "serum" + u64 that is Initialized (1) + OpenOrders (4)
bytes: MemcmpEncodedBytes::Base58("AcUQf4PGf6fCHGwmpB".into()),
encoding: None,
}),
RpcFilterType::Memcmp(Memcmp {
offset: 45, // owner is the 4th field, after "serum" (header), account_flags: u64 and market: Pubkey
bytes: MemcmpEncodedBytes::Bytes(mango_signer_id.to_bytes().into()),
encoding: None,
}),
]),
with_context: Some(true),
account_config: account_info_config.clone(),
};
// TODO: This way the snapshots are done sequentially, and a failing snapshot prevents the second one from being attempted
let mut snapshot = AccountSnapshot::default();
// Get all accounts of the mango program
let response = rpc_client
.get_program_accounts(
mango_program_id.to_string(),
Some(all_accounts_config.clone()),
)
.await
.map_err_anyhow()?;
.map_err_anyhow()
.context("error during getProgamAccounts for mango program")?;
if let OptionalContext::Context(account_snapshot_response) = response {
snapshot.extend_from_rpc(account_snapshot_response)?;
snapshot.extend_from_gpa_rpc(account_snapshot_response)?;
} else {
anyhow::bail!("did not receive context");
}
let response = rpc_client
.get_program_accounts(
serum_program_id.to_string(),
Some(open_orders_accounts_config.clone()),
)
.await
.map_err_anyhow()?;
if let OptionalContext::Context(account_snapshot_response) = response {
snapshot.extend_from_rpc(account_snapshot_response)?;
} else {
anyhow::bail!("did not receive context");
// Get all the active open orders account keys
let oo_account_pubkeys =
snapshot
.accounts
.iter()
.filter_map(|update| {
if let Ok(mango_account) = healthcheck::load_mango_account::<
mango::state::MangoAccount,
>(
mango::state::DataType::MangoAccount, &update.account
) {
if mango_account.mango_group.to_string() == config.mango_group_id {
Some(mango_account)
} else {
None
}
} else {
None
}
})
.flat_map(|mango_account| {
mango_account
.in_margin_basket
.iter()
.zip(mango_account.spot_open_orders.iter())
.filter_map(|(in_basket, oo)| in_basket.then(|| *oo))
})
.collect::<Vec<Pubkey>>();
// Retrieve all the open orders accounts
let results = stream::iter(oo_account_pubkeys)
.chunks(config.get_multiple_accounts_count)
.map(|keys| {
let rpc_client = &rpc_client;
let account_info_config = account_info_config.clone();
async move {
let string_keys = keys.iter().map(|k| k.to_string()).collect::<Vec<_>>();
(
keys,
rpc_client
.get_multiple_accounts(string_keys, Some(account_info_config))
.await,
)
}
})
.buffer_unordered(config.parallel_rpc_requests)
.collect::<Vec<_>>()
.await;
for (keys, result) in results {
snapshot.extend_from_gma_rpc(
&keys,
result
.map_err_anyhow()
.context("error during getMultipleAccounts for OpenOrders accounts")?,
)?;
}
sender.send(snapshot).await.expect("sending must succeed");

View File

@@ -1,6 +1,7 @@
use {
crate::Config,
anyhow::Context,
fixed::types::I80F48,
futures_util::{SinkExt, StreamExt},
log::*,
serde::Serialize,
@@ -9,7 +10,6 @@ use {
tokio::net::{TcpListener, TcpStream},
//std::str::FromStr,
tokio::sync::broadcast,
fixed::types::I80F48,
};
#[derive(Clone, Debug)]
@@ -17,8 +17,8 @@ pub struct HealthInfo {
pub account: Pubkey,
pub being_liquidated: bool,
pub health_fraction: I80F48, // always maint
pub assets: I80F48, // always maint
pub liabilities: I80F48, // always maint
pub assets: I80F48, // always maint
pub liabilities: I80F48, // always maint
}
#[derive(Clone, Debug)]