* Add chaindata metrics

* jemalloc for fills
* Reenable dropped fill processing
* Add gMA snapshot support
* Tidy up serum orderbook change detection
* cargo fmt
This commit is contained in:
Riordan Panayides 2023-01-20 14:52:01 +00:00
parent 0d41c04de5
commit 3dc7ae1246
11 changed files with 334 additions and 286 deletions

23
Cargo.lock generated
View File

@ -2576,6 +2576,27 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
[[package]]
name = "jemalloc-sys"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d3b9f3f5c9b31aa0f5ed3260385ac205db665baa41d49bb8338008ae94ede45"
dependencies = [
"cc",
"fs_extra",
"libc",
]
[[package]]
name = "jemallocator"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43ae63fcfc45e99ab3d1b29a46782ad679e98436c3169d15a167a1108a724b69"
dependencies = [
"jemalloc-sys",
"libc",
]
[[package]]
name = "jobserver"
version = "0.1.25"
@ -5150,6 +5171,7 @@ dependencies = [
"client",
"futures-channel",
"futures-util",
"jemallocator",
"log 0.4.17",
"mango-v4",
"serde",
@ -5749,6 +5771,7 @@ dependencies = [
"futures-core",
"futures-util",
"itertools 0.10.5",
"jemallocator",
"jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"jsonrpc-core-client",
"log 0.4.17",

View File

@ -1,5 +1,9 @@
use crate::metrics::{MetricType, MetricU64, Metrics};
use {
solana_sdk::account::{AccountSharedData, ReadableAccount}, solana_sdk::pubkey::Pubkey, std::collections::HashMap,
solana_sdk::account::{AccountSharedData, ReadableAccount},
solana_sdk::pubkey::Pubkey,
std::collections::HashMap,
};
#[derive(Clone, Copy, Debug, PartialEq)]
@ -37,10 +41,13 @@ pub struct ChainData {
newest_processed_slot: u64,
account_versions_stored: usize,
account_bytes_stored: usize,
metric_accounts_stored: MetricU64,
metric_account_versions_stored: MetricU64,
metric_account_bytes_stored: MetricU64,
}
impl ChainData {
pub fn new() -> Self {
pub fn new(metrics_sender: Metrics) -> Self {
Self {
slots: HashMap::new(),
accounts: HashMap::new(),
@ -48,6 +55,18 @@ impl ChainData {
newest_processed_slot: 0,
account_versions_stored: 0,
account_bytes_stored: 0,
metric_accounts_stored: metrics_sender.register_u64(
"fills_feed_chaindata_accounts_stored".into(),
MetricType::Gauge,
),
metric_account_versions_stored: metrics_sender.register_u64(
"fills_feed_chaindata_account_versions_stored".into(),
MetricType::Gauge,
),
metric_account_bytes_stored: metrics_sender.register_u64(
"fills_feed_chaindata_account_bytes_stored".into(),
MetricType::Gauge,
),
}
}
@ -128,15 +147,21 @@ impl ChainData {
writes
.retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot);
self.account_versions_stored += writes.len();
self.account_bytes_stored += writes.iter().map(|w| w.account.data().len()).fold(0, |acc, l| acc + l)
self.account_bytes_stored += writes
.iter()
.map(|w| w.account.data().len())
.fold(0, |acc, l| acc + l)
}
// now it's fine to drop any slots before the new rooted head
// as account writes for non-rooted slots before it have been dropped
self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
// TODO: move this to prom
println!("[chain_data] account_versions_stored = {} account_bytes_stored = {}", self.account_versions_stored, self.account_bytes_stored);
self.metric_accounts_stored.set(self.accounts.len() as u64);
self.metric_account_versions_stored
.set(self.account_versions_stored as u64);
self.metric_account_bytes_stored
.set(self.account_bytes_stored as u64);
}
}

View File

@ -355,77 +355,77 @@ fn publish_changes_serum(
continue;
}
// match old_event_view {
// EventView::Fill { .. } => {
// // every already published event is recorded in checkpoint
// checkpoint.push(events[idx]);
// }
// EventView::Out { .. } => {
// debug!(
// "found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
// mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
// );
match old_event_view {
EventView::Fill { .. } => {
// every already published event is recorded in checkpoint
checkpoint.push(events[idx]);
}
EventView::Out { .. } => {
debug!(
"found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
// metric_events_change.increment();
metric_events_change.increment();
// // first revoke old event
// fill_update_sender
// .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
// slot,
// write_version,
// event: old_events[idx],
// status: FillUpdateStatus::Revoke,
// market: mkt_pk_string.clone(),
// queue: evq_pk_string.clone(),
// }))
// .unwrap(); // TODO: use anyhow to bubble up error
// first revoke old event
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
slot,
write_version,
event: old_events[idx],
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
// // then publish new if its a fill and record in checkpoint
// fill_update_sender
// .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
// slot,
// write_version,
// event: events[idx],
// status: FillUpdateStatus::New,
// market: mkt_pk_string.clone(),
// queue: evq_pk_string.clone(),
// }))
// .unwrap(); // TODO: use anyhow to bubble up error
// checkpoint.push(events[idx]);
// }
// }
// then publish new if its a fill and record in checkpoint
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
slot,
write_version,
event: events[idx],
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(events[idx]);
}
}
}
_ => continue,
}
}
// // in case queue size shrunk due to a fork we need revoke all previous fills
// for seq_num in header_seq_num..old_seq_num {
// let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
// let old_event_view = old_events[idx].as_view().unwrap();
// debug!(
// "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
// mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
// );
// in case queue size shrunk due to a fork we need revoke all previous fills
for seq_num in header_seq_num..old_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
let old_event_view = old_events[idx].as_view().unwrap();
debug!(
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
// metric_events_drop.increment();
metric_events_drop.increment();
// match old_event_view {
// EventView::Fill { .. } => {
// fill_update_sender
// .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
// slot,
// event: old_events[idx],
// write_version,
// status: FillUpdateStatus::Revoke,
// market: mkt_pk_string.clone(),
// queue: evq_pk_string.clone(),
// }))
// .unwrap(); // TODO: use anyhow to bubble up error
// }
// EventView::Out { .. } => { continue }
// }
// }
match old_event_view {
EventView::Fill { .. } => {
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
slot,
event: old_events[idx],
write_version,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
EventView::Out { .. } => continue,
}
}
fill_update_sender
.try_send(FillEventFilterMessage::SerumCheckpoint(
@ -474,7 +474,7 @@ pub async fn init(
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let mut chain_cache = ChainData::new();
let mut chain_cache = ChainData::new(metrics_sender);
let mut perp_events_cache: HashMap<String, EventQueueEvents> = HashMap::new();
let mut serum_events_cache: HashMap<String, Vec<serum_dex::state::Event>> = HashMap::new();
let mut seq_num_cache = HashMap::new();

View File

@ -3,16 +3,17 @@ use geyser::geyser_client::GeyserClient;
use jsonrpc_core::futures::StreamExt;
use jsonrpc_core_client::transports::http;
use solana_account_decoder::UiAccountEncoding;
use solana_account_decoder::{UiAccount, UiAccountEncoding};
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
use solana_client::rpc_response::{OptionalContext, Response, RpcKeyedAccount};
use solana_rpc::{rpc::rpc_accounts::AccountsDataClient};
use solana_client::rpc_response::{OptionalContext, RpcKeyedAccount};
use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use futures::{future, future::FutureExt};
use tonic::{
metadata::MetadataValue, Request,
transport::{Channel, Certificate, Identity, ClientTlsConfig},
metadata::MetadataValue,
transport::{Certificate, Channel, ClientTlsConfig, Identity},
Request,
};
use log::*;
@ -42,16 +43,18 @@ use crate::{
//use solana_geyser_connector_plugin_grpc::compression::zstd_decompress;
type SnapshotData = Response<Vec<RpcKeyedAccount>>;
struct SnapshotData {
slot: u64,
accounts: Vec<(String, Option<UiAccount>)>,
}
enum Message {
GrpcUpdate(geyser::SubscribeUpdate),
Snapshot(SnapshotData),
}
async fn get_snapshot(
async fn get_snapshot_gpa(
rpc_http_url: String,
program_id: Pubkey,
program_id: String,
) -> anyhow::Result<OptionalContext<Vec<RpcKeyedAccount>>> {
let rpc_client = http::connect_with_options::<AccountsDataClient>(&rpc_http_url, true)
.await
@ -71,16 +74,37 @@ async fn get_snapshot(
info!("requesting snapshot {}", program_id);
let account_snapshot = rpc_client
.get_program_accounts(
program_id.to_string(),
Some(program_accounts_config.clone()),
)
.get_program_accounts(program_id.clone(), Some(program_accounts_config.clone()))
.await
.map_err_anyhow()?;
info!("snapshot received {}", program_id);
Ok(account_snapshot)
}
async fn get_snapshot_gma(
rpc_http_url: String,
ids: Vec<String>,
) -> anyhow::Result<solana_client::rpc_response::Response<Vec<Option<UiAccount>>>> {
let rpc_client = http::connect_with_options::<AccountsDataClient>(&rpc_http_url, true)
.await
.map_err_anyhow()?;
let account_info_config = RpcAccountInfoConfig {
encoding: Some(UiAccountEncoding::Base64),
commitment: Some(CommitmentConfig::finalized()),
data_slice: None,
min_context_slot: None,
};
info!("requesting snapshot {:?}", ids);
let account_snapshot = rpc_client
.get_multiple_accounts(ids.clone(), Some(account_info_config))
.await
.map_err_anyhow()?;
info!("snapshot received {:?}", ids);
Ok(account_snapshot)
}
async fn feed_data_geyser(
grpc_config: &GrpcSourceConfig,
tls_config: Option<ClientTlsConfig>,
@ -107,25 +131,27 @@ async fn feed_data_geyser(
}
.connect()
.await?;
let token: MetadataValue<_> = "dbbf36253d0b2e6a85618a4ef2fa".parse()?;
let token: MetadataValue<_> = "eed31807f710e4bb098779fb9f67".parse()?;
let mut client = GeyserClient::with_interceptor(channel, move |mut req: Request<()>| {
req.metadata_mut().insert("x-token", token.clone());
Ok(req)
});
// If account_ids are provided, snapshot will be gMA. If only program_ids, then only the first id will be snapshot
// TODO: handle this better
if filter_config.program_ids.len() > 1 {
warn!("only one program id is supported for gPA snapshots")
}
let mut accounts = HashMap::new();
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
account: Vec::new(),
account: filter_config.account_ids.clone(),
owner: filter_config.program_ids.clone(),
},
);
let mut slots = HashMap::new();
slots.insert(
"client".to_owned(),
SubscribeRequestFilterSlots {},
);
slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
let blocks = HashMap::new();
let transactions = HashMap::new();
@ -171,7 +197,8 @@ async fn feed_data_geyser(
// which will have "finalized" commitment.
let mut rooted_to_finalized_slots = 30;
let mut snapshot_future = future::Fuse::terminated();
let mut snapshot_gma = future::Fuse::terminated();
let mut snapshot_gpa = future::Fuse::terminated();
// The plugin sends a ping every 5s or so
let fatal_idle_timeout = Duration::from_secs(60);
@ -214,10 +241,13 @@ async fn feed_data_geyser(
// drop data for slots that are well beyond rooted
slot_pubkey_writes.retain(|&k, _| k >= max_rooted_slot - max_out_of_order_slots);
}
if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
snapshot_needed = false;
for program_id in filter_config.program_ids.clone() {
snapshot_future = tokio::spawn(get_snapshot(rpc_http_url.clone(), Pubkey::from_str(&program_id).unwrap())).fuse();
if filter_config.account_ids.len() > 0 {
snapshot_gma = tokio::spawn(get_snapshot_gma(rpc_http_url.clone(), filter_config.account_ids.clone())).fuse();
} else if filter_config.program_ids.len() > 0 {
snapshot_gpa = tokio::spawn(get_snapshot_gpa(rpc_http_url.clone(), filter_config.program_ids[0].clone())).fuse();
}
}
}
@ -243,7 +273,7 @@ async fn feed_data_geyser(
continue;
},
};
let pubkey_bytes = Pubkey::new(&write.pubkey).to_bytes();
let write_version_mapping = pubkey_writes.entry(pubkey_bytes).or_insert(WriteVersion {
global: write.write_version,
@ -266,13 +296,43 @@ async fn feed_data_geyser(
}
sender.send(Message::GrpcUpdate(update)).await.expect("send success");
},
snapshot = &mut snapshot_future => {
snapshot = &mut snapshot_gma => {
let snapshot = snapshot??;
info!("snapshot is for slot {}, first full slot was {}", snapshot.context.slot, first_full_slot);
if snapshot.context.slot >= first_full_slot {
let accounts: Vec<(String, Option<UiAccount>)> = filter_config.account_ids.iter().zip(snapshot.value).map(|x| (x.0.clone(), x.1)).collect();
sender
.send(Message::Snapshot(SnapshotData {
accounts,
slot: snapshot.context.slot,
}))
.await
.expect("send success");
} else {
info!(
"snapshot is too old: has slot {}, expected {} minimum",
snapshot.context.slot,
first_full_slot
);
// try again in another 10 slots
snapshot_needed = true;
rooted_to_finalized_slots += 10;
}
},
snapshot = &mut snapshot_gpa => {
let snapshot = snapshot??;
if let OptionalContext::Context(snapshot_data) = snapshot {
info!("snapshot is for slot {}, first full slot was {}", snapshot_data.context.slot, first_full_slot);
if snapshot_data.context.slot >= first_full_slot {
let accounts: Vec<(String, Option<UiAccount>)> = snapshot_data.value.iter().map(|x| {
let deref = x.clone();
(deref.pubkey, Some(deref.account))
}).collect();
sender
.send(Message::Snapshot(snapshot_data))
.send(Message::Snapshot(SnapshotData {
accounts,
slot: snapshot_data.context.slot,
}))
.await
.expect("send success");
} else {
@ -411,7 +471,7 @@ pub async fn process_events(
loop {
metric_dedup_queue.set(msg_receiver.len() as u64);
let msg = msg_receiver.recv().await.expect("sender must not close");
use geyser::{subscribe_update::UpdateOneof};
use geyser::subscribe_update::UpdateOneof;
match msg {
Message::GrpcUpdate(update) => {
match update.update_oneof.expect("invalid grpc") {
@ -421,7 +481,7 @@ pub async fn process_events(
None => {
// TODO: handle error
continue;
},
}
};
assert!(update.pubkey.len() == 32);
assert!(update.owner.len() == 32);
@ -460,11 +520,12 @@ pub async fn process_events(
metric_slot_updates.increment();
metric_slot_queue.set(slot_queue_sender.len() as u64);
let status = SubscribeUpdateSlotStatus::from_i32(update.status).map(|v| match v {
SubscribeUpdateSlotStatus::Processed => SlotStatus::Processed,
SubscribeUpdateSlotStatus::Confirmed => SlotStatus::Confirmed,
SubscribeUpdateSlotStatus::Finalized => SlotStatus::Rooted,
});
let status =
SubscribeUpdateSlotStatus::from_i32(update.status).map(|v| match v {
SubscribeUpdateSlotStatus::Processed => SlotStatus::Processed,
SubscribeUpdateSlotStatus::Confirmed => SlotStatus::Confirmed,
SubscribeUpdateSlotStatus::Finalized => SlotStatus::Rooted,
});
if status.is_none() {
error!("unexpected slot status: {}", update.status);
continue;
@ -480,24 +541,29 @@ pub async fn process_events(
.await
.expect("send success");
}
UpdateOneof::Block(_) => {},
UpdateOneof::Transaction(_) => {},
UpdateOneof::Block(_) => {}
UpdateOneof::Transaction(_) => {}
}
}
Message::Snapshot(update) => {
metric_snapshots.increment();
info!("processing snapshot...");
for keyed_account in update.value {
for account in update.accounts.iter() {
metric_snapshot_account_writes.increment();
metric_account_queue.set(account_write_queue_sender.len() as u64);
// TODO: Resnapshot on invalid data?
let account: Account = keyed_account.account.decode().unwrap();
let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap();
account_write_queue_sender
.send(AccountWrite::from(pubkey, update.context.slot, 0, account))
.await
.expect("send success");
match account {
(key, Some(ui_account)) => {
// TODO: Resnapshot on invalid data?
let pubkey = Pubkey::from_str(key).unwrap();
let account: Account = ui_account.decode().unwrap();
account_write_queue_sender
.send(AccountWrite::from(pubkey, update.slot, 0, account))
.await
.expect("send success");
}
(key, None) => warn!("account not found {}", key),
}
}
info!("processing snapshot done");
}

View File

@ -122,6 +122,7 @@ pub struct SourceConfig {
#[derive(Clone, Debug, Deserialize)]
pub struct FilterConfig {
pub program_ids: Vec<String>,
pub account_ids: Vec<String>,
}
#[derive(Clone, Debug)]

View File

@ -126,18 +126,12 @@ pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, base_lot_size: i64)
res
}
pub fn price_lots_to_ui(
native: i64,
base_decimals: u8,
quote_decimals: u8,
) -> f64 {
pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
let decimals = base_decimals - quote_decimals;
// let res = native as f64
// * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
// as f64;
let res = native as f64
/ (10u64.pow(decimals.into()))
as f64;
let res = native as f64 / (10u64.pow(decimals.into())) as f64;
res
}
@ -158,7 +152,6 @@ pub fn price_lots_to_ui_perp(
res
}
fn publish_changes(
slot: u64,
write_version: u64,
@ -172,6 +165,13 @@ fn publish_changes(
) {
let mut update: Vec<OrderbookLevel> = vec![];
// push diff for levels that are no longer present
if current_bookside.len() != previous_bookside.len() {
info!(
"L {}",
current_bookside.len() as i64 - previous_bookside.len() as i64
)
}
for previous_order in previous_bookside.iter() {
let peer = current_bookside
.iter()
@ -179,7 +179,7 @@ fn publish_changes(
match peer {
None => {
info!("removed level {}", previous_order[0]);
info!("R {} {}", previous_order[0], previous_order[1]);
update.push([previous_order[0], 0f64]);
}
_ => continue,
@ -197,11 +197,14 @@ fn publish_changes(
if previous_order[1] == current_order[1] {
continue;
}
info!("size changed {} -> {}", previous_order[1], current_order[1]);
info!(
"C {} {} -> {}",
current_order[0], previous_order[1], current_order[1]
);
update.push(current_order.clone());
}
None => {
info!("new level {},{}", current_order[0], current_order[1]);
info!("A {} {}", current_order[0], current_order[1]);
update.push(current_order.clone())
}
}
@ -242,88 +245,6 @@ fn publish_changes(
metric_updates.increment();
}
fn publish_changes_serum(
slot: u64,
write_version: u64,
mkt: &(Pubkey, MarketConfig),
side: OrderbookSide,
current_bookside: &Vec<OrderbookLevel>,
previous_bookside: &Vec<OrderbookLevel>,
maybe_other_bookside: Option<&Vec<OrderbookLevel>>,
orderbook_update_sender: &async_channel::Sender<OrderbookFilterMessage>,
metric_updates: &mut MetricU64,
) {
let mut update: Vec<OrderbookLevel> = vec![];
// push diff for levels that are no longer present
for previous_order in previous_bookside.iter() {
let peer = current_bookside
.iter()
.find(|level| previous_order[0] == level[0]);
match peer {
None => {
info!("removed level s {}", previous_order[0]);
update.push([previous_order[0], 0f64]);
}
_ => continue,
}
}
// push diff where there's a new level or size has changed
for current_order in current_bookside {
let peer = previous_bookside
.iter()
.find(|item| item[0] == current_order[0]);
match peer {
Some(previous_order) => {
if previous_order[1] == current_order[1] {
continue;
}
info!("size changed {} -> {}", previous_order[1], current_order[1]);
update.push(current_order.clone());
}
None => {
info!("new level {},{}", current_order[0], current_order[1]);
update.push(current_order.clone())
}
}
}
match maybe_other_bookside {
Some(other_bookside) => {
let (bids, asks) = match side {
OrderbookSide::Bid => (current_bookside, other_bookside),
OrderbookSide::Ask => (other_bookside, current_bookside),
};
orderbook_update_sender
.try_send(OrderbookFilterMessage::Checkpoint(OrderbookCheckpoint {
slot,
write_version,
bids: bids.clone(),
asks: asks.clone(),
market: mkt.0.to_string(),
}))
.unwrap()
}
None => info!("other bookside not in cache"),
}
if update.len() > 0 {
orderbook_update_sender
.try_send(OrderbookFilterMessage::Update(OrderbookUpdate {
market: mkt.0.to_string(),
side: side.clone(),
update,
slot,
write_version,
}))
.unwrap(); // TODO: use anyhow to bubble up error
metric_updates.increment();
}
}
pub async fn init(
market_configs: Vec<(Pubkey, MarketConfig)>,
serum_market_configs: Vec<(Pubkey, MarketConfig)>,
@ -352,7 +273,7 @@ pub async fn init(
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let mut chain_cache = ChainData::new();
let mut chain_cache = ChainData::new(metrics_sender);
let mut bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
let mut serum_bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
let mut last_write_versions = HashMap::<String, (u64, u64)>::new();
@ -412,7 +333,7 @@ pub async fn init(
let write_version = (account_info.slot, account_info.write_version);
// todo: should this be <= so we don't overwrite with old data received late?
if write_version == *last_write_version {
if write_version <= *last_write_version {
continue;
}
last_write_versions.insert(side_pk_string.clone(), write_version);
@ -492,11 +413,11 @@ pub async fn init(
let write_version = (account_info.slot, account_info.write_version);
// todo: should this be <= so we don't overwrite with old data received late?
if write_version == *last_write_version {
if write_version <= *last_write_version {
continue;
}
last_write_versions.insert(side_pk_string.clone(), write_version);
info!("W {}", mkt.1.name);
let account = &mut account_info.account.clone();
let data = account.data_as_mut_slice();
let len = data.len();
@ -532,7 +453,7 @@ pub async fn init(
serum_bookside_cache.get(&other_side_pk.to_string());
match serum_bookside_cache.get(&side_pk_string) {
Some(old_bookside) => publish_changes_serum(
Some(old_bookside) => publish_changes(
account_info.slot,
account_info.write_version,
mkt,

View File

@ -6,9 +6,7 @@ use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_response::{OptionalContext, Response, RpcKeyedAccount},
};
use solana_rpc::{
rpc::rpc_accounts::AccountsDataClient, rpc_pubsub::RpcSolPubSubClient,
};
use solana_rpc::{rpc::rpc_accounts::AccountsDataClient, rpc_pubsub::RpcSolPubSubClient};
use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
use log::*;

View File

@ -23,6 +23,7 @@ async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
bytemuck = "1.7.2"
jemallocator = "0.3.2"
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }

View File

@ -43,6 +43,11 @@ type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
type SerumCheckpointMap = Arc<Mutex<HashMap<String, SerumFillCheckpoint>>>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;
// jemalloc seems to be better at keeping the memory footprint reasonable over
// longer periods of time
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "command")]
pub enum Command {
@ -127,27 +132,25 @@ async fn handle_connection(
);
}
let receive_commands = ws_rx.try_for_each(|msg| {
match msg {
Message::Text(_) => {
handle_commands(
addr,
msg,
peer_map.clone(),
checkpoint_map.clone(),
serum_checkpoint_map.clone(),
market_ids.clone(),
)
},
Message::Ping(_) => {
let peers = peer_map.clone();
let mut peers_lock = peers.lock().unwrap();
let peer = peers_lock.get_mut(&addr).expect("peer should be in map");
peer.sender.unbounded_send(Message::Pong(Vec::new())).unwrap();
future::ready(Ok(()))
}
_ => future::ready(Ok(())),
let receive_commands = ws_rx.try_for_each(|msg| match msg {
Message::Text(_) => handle_commands(
addr,
msg,
peer_map.clone(),
checkpoint_map.clone(),
serum_checkpoint_map.clone(),
market_ids.clone(),
),
Message::Ping(_) => {
let peers = peer_map.clone();
let mut peers_lock = peers.lock().unwrap();
let peer = peers_lock.get_mut(&addr).expect("peer should be in map");
peer.sender
.unbounded_send(Message::Pong(Vec::new()))
.unwrap();
future::ready(Ok(()))
}
_ => future::ready(Ok(())),
});
let forward_updates = chan_rx.map(Ok).forward(ws_tx);
@ -454,40 +457,40 @@ async fn main() -> anyhow::Result<()> {
let try_socket = TcpListener::bind(&config.bind_ws_addr).await;
let listener = try_socket.expect("Failed to bind");
{
tokio::spawn(async move {
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection_error(
checkpoints.clone(),
serum_checkpoints.clone(),
peers.clone(),
market_pubkey_strings.clone(),
stream,
addr,
metrics_opened_connections.clone(),
metrics_closed_connections.clone(),
));
}
});
tokio::spawn(async move {
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection_error(
checkpoints.clone(),
serum_checkpoints.clone(),
peers.clone(),
market_pubkey_strings.clone(),
stream,
addr,
metrics_opened_connections.clone(),
metrics_closed_connections.clone(),
));
}
});
}
// keepalive
{
tokio::spawn(async move {
let mut write_interval = time::interval(time::Duration::from_secs(30));
tokio::spawn(async move {
let mut write_interval = time::interval(time::Duration::from_secs(30));
loop {
write_interval.tick().await;
let peers_copy = peers_ref_thread1.lock().unwrap().clone();
for (addr, peer) in peers_copy.iter() {
let pl = Vec::new();
let result = peer.clone().sender.send(Message::Ping(pl)).await;
if result.is_err() {
error!("ws ping could not reach {}", addr);
loop {
write_interval.tick().await;
let peers_copy = peers_ref_thread1.lock().unwrap().clone();
for (addr, peer) in peers_copy.iter() {
let pl = Vec::new();
let result = peer.clone().sender.send(Message::Ping(pl)).await;
if result.is_err() {
error!("ws ping could not reach {}", addr);
}
}
}
}
});
});
}
info!(
"rpc connect: {}",
@ -499,11 +502,11 @@ async fn main() -> anyhow::Result<()> {
.collect::<String>()
);
let use_geyser = true;
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect();
let filter_config = FilterConfig {
program_ids: vec![
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
],
program_ids: vec![],
account_ids: relevant_pubkeys,
};
if use_geyser {
grpc_plugin_source::process_events(

View File

@ -285,19 +285,23 @@ async fn main() -> anyhow::Result<()> {
&Keypair::new(),
Some(rpc_timeout),
);
let group_context = Arc::new(MangoGroupContext::new_from_rpc(
&client.rpc_async(),
Pubkey::from_str(&config.mango_group).unwrap(),
).await?);
let group_context = Arc::new(
MangoGroupContext::new_from_rpc(
&client.rpc_async(),
Pubkey::from_str(&config.mango_group).unwrap(),
)
.await?,
);
// todo: reload markets at intervals
let market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.perp_markets
.iter()
.map(|(_, context)| {
let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index) {
let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index)
{
Some(token) => token.decimals,
None => panic!("token not found for market") // todo: default to 6 for usdc?
None => panic!("token not found for market"), // todo: default to 6 for usdc?
};
(
context.address,
@ -320,11 +324,11 @@ async fn main() -> anyhow::Result<()> {
.map(|(_, context)| {
let base_decimals = match group_context.tokens.get(&context.market.base_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market") // todo: default?
None => panic!("token not found for market"), // todo: default?
};
let quote_decimals = match group_context.tokens.get(&context.market.quote_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market") // todo: default to 6 for usdc?
None => panic!("token not found for market"), // todo: default to 6 for usdc?
};
(
context.market.serum_market_external,
@ -341,13 +345,14 @@ async fn main() -> anyhow::Result<()> {
})
.collect();
let market_pubkey_strings: HashMap<String, String> = [market_configs.clone(), serum_market_configs.clone()]
.concat()
.iter()
.map(|market| (market.0.to_string(), market.1.name.clone()))
.collect::<Vec<(String, String)>>()
.into_iter()
.collect();
let market_pubkey_strings: HashMap<String, String> =
[market_configs.clone(), serum_market_configs.clone()]
.concat()
.iter()
.map(|market| (market.0.to_string(), market.1.name.clone()))
.collect::<Vec<(String, String)>>()
.into_iter()
.collect();
let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) =
orderbook_filter::init(market_configs, serum_market_configs, metrics_tx.clone()).await?;
@ -422,6 +427,7 @@ async fn main() -> anyhow::Result<()> {
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
],
account_ids: vec![],
};
grpc_plugin_source::process_events(
&config.source,

View File

@ -119,7 +119,9 @@ fn start_pnl_updater(
}
let pnl_vals =
compute_pnl(context.clone(), account_fetcher.clone(), &mango_account).await.unwrap();
compute_pnl(context.clone(), account_fetcher.clone(), &mango_account)
.await
.unwrap();
// Alternatively, we could prepare the sorted and limited lists for each
// market here. That would be faster and cause less contention on the pnl_data
@ -247,10 +249,13 @@ async fn main() -> anyhow::Result<()> {
&Keypair::new(),
Some(rpc_timeout),
);
let group_context = Arc::new(MangoGroupContext::new_from_rpc(
&client.rpc_async(),
Pubkey::from_str(&config.pnl.mango_group).unwrap(),
).await?);
let group_context = Arc::new(
MangoGroupContext::new_from_rpc(
&client.rpc_async(),
Pubkey::from_str(&config.pnl.mango_group).unwrap(),
)
.await?,
);
let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new()));
let account_fetcher = Arc::new(chain_data::AccountFetcher {
chain_data: chain_data.clone(),
@ -265,7 +270,7 @@ async fn main() -> anyhow::Result<()> {
metrics_tx.register_u64("pnl_jsonrpc_reqs_invalid_total".into(), MetricType::Counter);
let metrics_pnls_tracked = metrics_tx.register_u64("pnl_num_tracked".into(), MetricType::Gauge);
let chain_data = Arc::new(RwLock::new(ChainData::new()));
let chain_data = Arc::new(RwLock::new(ChainData::new(metrics_tx.clone())));
let pnl_data = Arc::new(RwLock::new(PnlData::new()));
start_pnl_updater(
@ -288,9 +293,8 @@ async fn main() -> anyhow::Result<()> {
// start filling chain_data from the grpc plugin source
let (account_write_queue_sender, slot_queue_sender) = memory_target::init(chain_data).await?;
let filter_config = FilterConfig {
program_ids: vec![
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
],
program_ids: vec!["4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into()],
account_ids: vec![],
};
grpc_plugin_source::process_events(
&config.source,