Client: Move chain_data to client for reuse

This commit is contained in:
Christian Kamm 2022-07-21 13:03:28 +02:00
parent 91d59c1918
commit 16e1e83e26
8 changed files with 180 additions and 145 deletions

View File

@ -1,16 +1,8 @@
use crate::FIRST_WEBSOCKET_SLOT;
use {
log::*, solana_sdk::account::AccountSharedData, solana_sdk::pubkey::Pubkey,
std::collections::HashMap,
solana_sdk::account::AccountSharedData, solana_sdk::pubkey::Pubkey, std::collections::HashMap,
};
use {
// TODO: None of these should be here
crate::metrics,
crate::snapshot_source,
crate::websocket_source,
};
pub use crate::chain_data_fetcher::AccountFetcher;
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SlotStatus {
@ -28,7 +20,7 @@ pub struct SlotData {
}
#[derive(Clone, Debug)]
pub struct AccountData {
pub struct AccountAndSlot {
pub slot: u64,
pub account: AccountSharedData,
}
@ -41,33 +33,24 @@ pub struct ChainData {
/// only slots >= newest_rooted_slot are retained
slots: HashMap<u64, SlotData>,
/// writes to accounts, only the latest rooted write an newer are retained
accounts: HashMap<Pubkey, Vec<AccountData>>,
accounts: HashMap<Pubkey, Vec<AccountAndSlot>>,
newest_rooted_slot: u64,
newest_processed_slot: u64,
best_chain_slot: u64,
// storing global metrics here is not good style
metric_slots_count: metrics::MetricU64,
metric_accounts_count: metrics::MetricU64,
metric_account_write_count: metrics::MetricU64,
}
impl ChainData {
pub fn new(metrics: &metrics::Metrics) -> Self {
pub fn new() -> Self {
Self {
slots: HashMap::new(),
accounts: HashMap::new(),
newest_rooted_slot: 0,
newest_processed_slot: 0,
best_chain_slot: 0,
metric_slots_count: metrics.register_u64("chain_data_slots_count".into()),
metric_accounts_count: metrics.register_u64("chain_data_accounts_count".into()),
metric_account_write_count: metrics
.register_u64("chain_data_account_write_count".into()),
}
}
fn update_slot(&mut self, new_slot: SlotData) {
pub fn update_slot(&mut self, new_slot: SlotData) {
let new_processed_head = new_slot.slot > self.newest_processed_slot;
if new_processed_head {
self.newest_processed_slot = new_slot.slot;
@ -154,19 +137,10 @@ impl ChainData {
// now it's fine to drop any slots before the new rooted head
// as account writes for non-rooted slots before it have been dropped
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
self.metric_slots_count.set(self.slots.len() as u64);
self.metric_accounts_count.set(self.accounts.len() as u64);
self.metric_account_write_count.set(
self.accounts
.iter()
.map(|(_key, writes)| writes.len() as u64)
.sum(),
);
}
}
fn update_account(&mut self, pubkey: Pubkey, account: AccountData) {
pub fn update_account(&mut self, pubkey: Pubkey, account: AccountAndSlot) {
use std::collections::hash_map::Entry;
match self.accounts.entry(pubkey) {
Entry::Vacant(v) => {
@ -191,71 +165,7 @@ impl ChainData {
};
}
pub fn update_from_snapshot(&mut self, snapshot: snapshot_source::AccountSnapshot) {
for account_write in snapshot.accounts {
self.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
account: account_write.account,
},
);
}
}
pub fn update_from_websocket(&mut self, message: websocket_source::Message) {
match message {
websocket_source::Message::Account(account_write) => {
trace!("websocket account message");
self.update_account(
account_write.pubkey,
AccountData {
slot: account_write.slot,
account: account_write.account,
},
);
}
websocket_source::Message::Slot(slot_update) => {
trace!("websocket slot message");
let slot_update = match *slot_update {
solana_client::rpc_response::SlotUpdate::CreatedBank {
slot, parent, ..
} => Some(SlotData {
slot,
parent: Some(parent),
status: SlotStatus::Processed,
chain: 0,
}),
solana_client::rpc_response::SlotUpdate::OptimisticConfirmation {
slot,
..
} => Some(SlotData {
slot,
parent: None,
status: SlotStatus::Confirmed,
chain: 0,
}),
solana_client::rpc_response::SlotUpdate::Root { slot, .. } => Some(SlotData {
slot,
parent: None,
status: SlotStatus::Rooted,
chain: 0,
}),
_ => None,
};
if let Some(update) = slot_update {
if FIRST_WEBSOCKET_SLOT.get().is_none() {
FIRST_WEBSOCKET_SLOT.set(update.slot).expect("always Ok");
log::debug!("first slot for websocket {}", update.slot);
}
self.update_slot(update);
}
}
}
}
pub fn update_from_rpc(&mut self, pubkey: &Pubkey, account: AccountData) {
pub fn update_from_rpc(&mut self, pubkey: &Pubkey, account: AccountAndSlot) {
// Add a stub slot if the rpc has information about the future.
// If it's in the past, either the slot already exists (and maybe we have
// the data already), or it was a skipped slot and adding it now makes no difference.
@ -271,7 +181,7 @@ impl ChainData {
self.update_account(*pubkey, account)
}
fn is_account_write_live(&self, write: &AccountData) -> bool {
fn is_account_write_live(&self, write: &AccountAndSlot) -> bool {
self.slots
.get(&write.slot)
// either the slot is rooted, in the current chain or newer than the chain head
@ -285,7 +195,7 @@ impl ChainData {
}
/// Cloned snapshot of all the most recent live writes per pubkey
pub fn accounts_snapshot(&self) -> HashMap<Pubkey, AccountData> {
pub fn accounts_snapshot(&self) -> HashMap<Pubkey, AccountAndSlot> {
self.accounts
.iter()
.filter_map(|(pubkey, writes)| {
@ -304,7 +214,7 @@ impl ChainData {
}
/// Ref to the most recent live write of the pubkey, along with the slot that it was for
pub fn account_and_slot<'a>(&'a self, pubkey: &Pubkey) -> anyhow::Result<&'a AccountData> {
pub fn account_and_slot<'a>(&'a self, pubkey: &Pubkey) -> anyhow::Result<&'a AccountAndSlot> {
self.accounts
.get(pubkey)
.ok_or_else(|| anyhow::anyhow!("account {} not found", pubkey))?
@ -313,4 +223,16 @@ impl ChainData {
.find(|w| self.is_account_write_live(w))
.ok_or_else(|| anyhow::anyhow!("account {} has no live data", pubkey))
}
pub fn slots_count(&self) -> usize {
self.slots.len()
}
pub fn accounts_count(&self) -> usize {
self.accounts.len()
}
pub fn account_writes_count(&self) -> usize {
self.accounts.values().map(|v| v.len()).sum()
}
}

View File

@ -2,7 +2,6 @@ use std::sync::{Arc, RwLock};
use crate::chain_data::*;
use client::AccountFetcher;
use mango_v4::accounts_zerocopy::LoadZeroCopy;
use mango_v4::state::MangoAccountValue;
@ -12,12 +11,12 @@ use solana_client::rpc_client::RpcClient;
use solana_sdk::account::{AccountSharedData, ReadableAccount};
use solana_sdk::pubkey::Pubkey;
pub struct ChainDataAccountFetcher {
pub struct AccountFetcher {
pub chain_data: Arc<RwLock<ChainData>>,
pub rpc: RpcClient,
}
impl ChainDataAccountFetcher {
impl AccountFetcher {
// loads from ChainData
pub fn fetch<T: anchor_lang::ZeroCopy + anchor_lang::Owner>(
&self,
@ -71,22 +70,17 @@ impl ChainDataAccountFetcher {
let mut chain_data = self.chain_data.write().unwrap();
chain_data.update_from_rpc(
address,
AccountData {
AccountAndSlot {
slot: response.context.slot,
account: account.into(),
},
);
log::trace!(
"refreshed data of account {} via rpc, got context slot {}",
address,
response.context.slot
);
Ok(())
}
}
impl AccountFetcher for ChainDataAccountFetcher {
impl crate::AccountFetcher for AccountFetcher {
fn fetch_raw_account(&self, address: Pubkey) -> anyhow::Result<solana_sdk::account::Account> {
self.fetch_raw(&address).map(|a| a.into())
}

View File

@ -4,6 +4,8 @@ pub use context::*;
pub use util::*;
mod account_fetcher;
pub mod chain_data;
mod chain_data_fetcher;
mod client;
mod context;
mod gpa;

View File

@ -1,7 +1,6 @@
use crate::account_shared_data::KeyedAccountSharedData;
use crate::ChainDataAccountFetcher;
use client::{AccountFetcher, MangoClient, MangoClientError, MangoGroupContext};
use client::{chain_data, AccountFetcher, MangoClient, MangoClientError, MangoGroupContext};
use mango_v4::state::{
new_health_cache, oracle_price, Bank, FixedOrderAccountRetriever, HealthCache, HealthType,
MangoAccountValue, TokenIndex,
@ -11,7 +10,7 @@ use {anyhow::Context, fixed::types::I80F48, solana_sdk::pubkey::Pubkey};
pub fn new_health_cache_(
context: &MangoGroupContext,
account_fetcher: &ChainDataAccountFetcher,
account_fetcher: &chain_data::AccountFetcher,
account: &MangoAccountValue,
) -> anyhow::Result<HealthCache> {
let active_token_len = account.token_iter_active().count();
@ -40,7 +39,7 @@ pub fn new_health_cache_(
#[allow(clippy::too_many_arguments)]
pub fn process_account(
mango_client: &MangoClient,
account_fetcher: &ChainDataAccountFetcher,
account_fetcher: &chain_data::AccountFetcher,
pubkey: &Pubkey,
) -> anyhow::Result<()> {
// TODO: configurable
@ -165,7 +164,7 @@ pub fn process_account(
#[allow(clippy::too_many_arguments)]
pub fn process_accounts<'a>(
mango_client: &MangoClient,
account_fetcher: &ChainDataAccountFetcher,
account_fetcher: &chain_data::AccountFetcher,
accounts: impl Iterator<Item = &'a Pubkey>,
) -> anyhow::Result<()> {
for pubkey in accounts {

View File

@ -4,12 +4,10 @@ use std::time::Duration;
use anchor_client::Cluster;
use clap::Parser;
use client::{keypair_from_cli, MangoClient, MangoGroupContext};
use client::{chain_data, keypair_from_cli, MangoClient, MangoGroupContext};
use log::*;
use mango_v4::state::{PerpMarketIndex, TokenIndex};
use once_cell::sync::OnceCell;
use solana_client::rpc_client::RpcClient;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
@ -17,16 +15,12 @@ use solana_sdk::signer::Signer;
use std::collections::HashSet;
pub mod account_shared_data;
pub mod chain_data;
pub mod chain_data_fetcher;
pub mod liquidate;
pub mod metrics;
pub mod snapshot_source;
pub mod util;
pub mod websocket_source;
use crate::chain_data::*;
use crate::chain_data_fetcher::*;
use crate::util::{is_mango_account, is_mango_bank, is_mint_info, is_perp_market};
// jemalloc seems to be better at keeping the memory footprint reasonable over
@ -34,9 +28,6 @@ use crate::util::{is_mango_account, is_mango_bank, is_mint_info, is_perp_market}
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
// first slot received from websocket feed
static FIRST_WEBSOCKET_SLOT: OnceCell<u64> = OnceCell::new();
trait AnyhowWrap {
type Value;
fn map_err_anyhow(self) -> anyhow::Result<Self::Value>;
@ -170,6 +161,12 @@ async fn main() -> anyhow::Result<()> {
websocket_sender,
);
let first_websocket_slot = websocket_source::get_next_create_bank_slot(
websocket_receiver.clone(),
Duration::from_secs(10),
)
.await?;
// Getting solana account snapshots via jsonrpc
let (snapshot_sender, snapshot_receiver) =
async_channel::unbounded::<snapshot_source::AccountSnapshot>();
@ -182,15 +179,16 @@ async fn main() -> anyhow::Result<()> {
get_multiple_accounts_count: cli.get_multiple_accounts_count,
parallel_rpc_requests: cli.parallel_rpc_requests,
snapshot_interval: std::time::Duration::from_secs(cli.snapshot_interval_secs),
min_slot: first_websocket_slot + 10,
},
mango_pyth_oracles,
snapshot_sender,
);
// The representation of current on-chain account data
let chain_data = Arc::new(RwLock::new(ChainData::new(&metrics)));
let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new()));
// Reading accounts from chain_data
let account_fetcher = Arc::new(ChainDataAccountFetcher {
let account_fetcher = Arc::new(chain_data::AccountFetcher {
chain_data: chain_data.clone(),
rpc: RpcClient::new_with_timeout_and_commitment(
cluster.url().to_string(),
@ -199,6 +197,8 @@ async fn main() -> anyhow::Result<()> {
),
});
start_chain_data_metrics(chain_data.clone(), &metrics);
// Addresses of the MangoAccounts belonging to the mango program.
// Needed to check health of them all when the cache updates.
let mut mango_accounts = HashSet::<Pubkey>::new();
@ -245,8 +245,7 @@ async fn main() -> anyhow::Result<()> {
let message = message.expect("channel not closed");
// build a model of slots and accounts in `chain_data`
// this code should be generic so it can be reused in future projects
chain_data.write().unwrap().update_from_websocket(message.clone());
websocket_source::update_chain_data(&mut chain_data.write().unwrap(), message.clone());
// specific program logic using the mirrored data
if let websocket_source::Message::Account(account_write) = message {
@ -325,7 +324,7 @@ async fn main() -> anyhow::Result<()> {
}
metric_mango_accounts.set(mango_accounts.len() as u64);
chain_data.write().unwrap().update_from_snapshot(message);
snapshot_source::update_chain_data(&mut chain_data.write().unwrap(), message);
one_snapshot_done = true;
// trigger a full health check
@ -340,3 +339,22 @@ async fn main() -> anyhow::Result<()> {
}
}
}
fn start_chain_data_metrics(chain: Arc<RwLock<chain_data::ChainData>>, metrics: &metrics::Metrics) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
let mut metric_slots_count = metrics.register_u64("chain_data_slots_count".into());
let mut metric_accounts_count = metrics.register_u64("chain_data_accounts_count".into());
let mut metric_account_write_count =
metrics.register_u64("chain_data_account_write_count".into());
tokio::spawn(async move {
loop {
interval.tick().await;
let chain_lock = chain.read().unwrap();
metric_slots_count.set(chain_lock.slots_count() as u64);
metric_accounts_count.set(chain_lock.accounts_count() as u64);
metric_account_write_count.set(chain_lock.account_writes_count() as u64);
}
});
}

View File

@ -151,7 +151,7 @@ pub fn start() -> Metrics {
0
};
let diff = new_value.wrapping_sub(previous_value) as i64;
log::debug!("metric: {}: {} ({:+})", name, new_value, diff);
log::info!("metric: {}: {} ({:+})", name, new_value, diff);
}
Value::I64(v) => {
let new_value = v.load(atomic::Ordering::Acquire);
@ -164,7 +164,7 @@ pub fn start() -> Metrics {
0
};
let diff = new_value - previous_value;
log::debug!("metric: {}: {} ({:+})", name, new_value, diff);
log::info!("metric: {}: {} ({:+})", name, new_value, diff);
}
Value::String(v) => {
let new_value = v.lock().unwrap();
@ -178,9 +178,9 @@ pub fn start() -> Metrics {
"".into()
};
if *new_value == previous_value {
log::debug!("metric: {}: {} (unchanged)", name, &*new_value);
log::info!("metric: {}: {} (unchanged)", name, &*new_value);
} else {
log::debug!(
log::info!(
"metric: {}: {} (before: {})",
name,
&*new_value,

View File

@ -18,7 +18,9 @@ use std::str::FromStr;
use std::time::Duration;
use tokio::time;
use crate::{util::is_mango_account, AnyhowWrap, FIRST_WEBSOCKET_SLOT};
use client::chain_data;
use crate::{util::is_mango_account, AnyhowWrap};
#[derive(Clone)]
pub struct AccountUpdate {
@ -79,6 +81,7 @@ pub struct Config {
pub get_multiple_accounts_count: usize,
pub parallel_rpc_requests: usize,
pub snapshot_interval: Duration,
pub min_slot: u64,
}
async fn feed_snapshots(
@ -94,7 +97,7 @@ async fn feed_snapshots(
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
min_context_slot: Some(config.min_slot),
};
let all_accounts_config = RpcProgramAccountsConfig {
filters: None,
@ -215,6 +218,7 @@ pub fn start(
.await
.expect("always Ok");
// Wait for slot to exceed min_slot
loop {
poll_wait_first_snapshot.tick().await;
@ -227,14 +231,9 @@ pub fn start(
.expect("always Ok");
log::debug!("latest slot for snapshot {}", epoch_info.absolute_slot);
match FIRST_WEBSOCKET_SLOT.get() {
Some(first_websocket_slot) => {
if first_websocket_slot < &epoch_info.absolute_slot {
log::debug!("continuing to fetch snapshot now, first websocket feed slot {} is older than latest snapshot slot {}",first_websocket_slot, epoch_info.absolute_slot);
break;
}
}
None => {}
if epoch_info.absolute_slot > config.min_slot {
log::debug!("continuing to fetch snapshot now, min_slot {} is older than latest epoch slot {}", config.min_slot, epoch_info.absolute_slot);
break;
}
}
@ -248,3 +247,15 @@ pub fn start(
}
});
}
pub fn update_chain_data(chain: &mut chain_data::ChainData, snapshot: AccountSnapshot) {
for account_update in snapshot.accounts {
chain.update_account(
account_update.pubkey,
chain_data::AccountAndSlot {
account: account_update.account,
slot: account_update.slot,
},
);
}
}

View File

@ -10,10 +10,13 @@ use solana_client::{
use solana_rpc::rpc_pubsub::RpcSolPubSubClient;
use solana_sdk::{account::AccountSharedData, commitment_config::CommitmentConfig, pubkey::Pubkey};
use anyhow::Context;
use log::*;
use std::{str::FromStr, sync::Arc, time::Duration};
use tokio_stream::StreamMap;
use client::chain_data;
use crate::AnyhowWrap;
#[derive(Clone)]
@ -182,3 +185,89 @@ pub fn start(
}
});
}
pub fn update_chain_data(chain: &mut chain_data::ChainData, message: Message) {
use chain_data::*;
match message {
Message::Account(account_write) => {
trace!("websocket account message");
chain.update_account(
account_write.pubkey,
AccountAndSlot {
slot: account_write.slot,
account: account_write.account,
},
);
}
Message::Slot(slot_update) => {
trace!("websocket slot message");
let slot_update = match *slot_update {
solana_client::rpc_response::SlotUpdate::CreatedBank { slot, parent, .. } => {
Some(SlotData {
slot,
parent: Some(parent),
status: SlotStatus::Processed,
chain: 0,
})
}
solana_client::rpc_response::SlotUpdate::OptimisticConfirmation {
slot, ..
} => Some(SlotData {
slot,
parent: None,
status: SlotStatus::Confirmed,
chain: 0,
}),
solana_client::rpc_response::SlotUpdate::Root { slot, .. } => Some(SlotData {
slot,
parent: None,
status: SlotStatus::Rooted,
chain: 0,
}),
_ => None,
};
if let Some(update) = slot_update {
chain.update_slot(update);
}
}
}
}
pub async fn get_next_create_bank_slot(
receiver: async_channel::Receiver<Message>,
timeout: Duration,
) -> anyhow::Result<u64> {
let start = std::time::Instant::now();
loop {
let elapsed = start.elapsed();
if elapsed > timeout {
anyhow::bail!(
"did not receive a slot from the websocket connection in {}s",
timeout.as_secs()
);
}
let remaining_timeout = timeout - elapsed;
let msg = match tokio::time::timeout(remaining_timeout, receiver.recv()).await {
// timeout
Err(_) => continue,
// channel close
Ok(Err(err)) => {
return Err(err).context("while waiting for first slot from websocket connection");
}
// success
Ok(Ok(msg)) => msg,
};
match msg {
Message::Slot(slot_update) => {
if let solana_client::rpc_response::SlotUpdate::CreatedBank { slot, .. } =
*slot_update
{
return Ok(slot);
}
}
_ => {}
}
}
}