From 0d41c04de589c7a91513eb37c130c267eb48a7a9 Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Thu, 19 Jan 2023 02:44:54 +0000 Subject: [PATCH 1/5] Snapshot all program ids in fills --- lib/src/grpc_plugin_source.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 2642dad..5aa432f 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -69,7 +69,7 @@ async fn get_snapshot( account_config: account_info_config.clone(), }; - info!("requesting snapshot"); + info!("requesting snapshot {}", program_id); let account_snapshot = rpc_client .get_program_accounts( program_id.to_string(), @@ -77,7 +77,7 @@ async fn get_snapshot( ) .await .map_err_anyhow()?; - info!("snapshot received"); + info!("snapshot received {}", program_id); Ok(account_snapshot) } @@ -88,7 +88,6 @@ async fn feed_data_geyser( filter_config: &FilterConfig, sender: async_channel::Sender, ) -> anyhow::Result<()> { - let program_id = Pubkey::from_str(&snapshot_config.program_id)?; let connection_string = match &grpc_config.connection_string.chars().next().unwrap() { '$' => env::var(&grpc_config.connection_string[1..]) .expect("reading connection string from env"), @@ -217,7 +216,9 @@ async fn feed_data_geyser( } if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot { snapshot_needed = false; - snapshot_future = tokio::spawn(get_snapshot(rpc_http_url.clone(), program_id)).fuse(); + for program_id in filter_config.program_ids.clone() { + snapshot_future = tokio::spawn(get_snapshot(rpc_http_url.clone(), Pubkey::from_str(&program_id).unwrap())).fuse(); + } } } }, From 3dc7ae124676994829fd91138c133a0541710277 Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Fri, 20 Jan 2023 14:52:01 +0000 Subject: [PATCH 2/5] * Add chaindata metrics * jemalloc for fills * Reenable dropped fill processing * Add gMA snapshot support * Tidy up serum orderbook change detection * cargo fmt --- Cargo.lock | 23 +++++ lib/src/chain_data.rs | 35 ++++++- lib/src/fill_event_filter.rs | 124 +++++++++++------------ lib/src/grpc_plugin_source.rs | 150 ++++++++++++++++++++-------- lib/src/lib.rs | 1 + lib/src/orderbook_filter.rs | 119 ++++------------------ lib/src/websocket_source.rs | 4 +- service-mango-fills/Cargo.toml | 1 + service-mango-fills/src/main.rs | 105 +++++++++---------- service-mango-orderbook/src/main.rs | 36 ++++--- service-mango-pnl/src/main.rs | 22 ++-- 11 files changed, 334 insertions(+), 286 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f099d04..c7bc001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2576,6 +2576,27 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" +[[package]] +name = "jemalloc-sys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d3b9f3f5c9b31aa0f5ed3260385ac205db665baa41d49bb8338008ae94ede45" +dependencies = [ + "cc", + "fs_extra", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43ae63fcfc45e99ab3d1b29a46782ad679e98436c3169d15a167a1108a724b69" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "jobserver" version = "0.1.25" @@ -5150,6 +5171,7 @@ dependencies = [ "client", "futures-channel", "futures-util", + "jemallocator", "log 0.4.17", "mango-v4", "serde", @@ -5749,6 +5771,7 @@ 
dependencies = [ "futures-core", "futures-util", "itertools 0.10.5", + "jemallocator", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client", "log 0.4.17", diff --git a/lib/src/chain_data.rs b/lib/src/chain_data.rs index eaa770f..ccdd543 100644 --- a/lib/src/chain_data.rs +++ b/lib/src/chain_data.rs @@ -1,5 +1,9 @@ +use crate::metrics::{MetricType, MetricU64, Metrics}; + use { - solana_sdk::account::{AccountSharedData, ReadableAccount}, solana_sdk::pubkey::Pubkey, std::collections::HashMap, + solana_sdk::account::{AccountSharedData, ReadableAccount}, + solana_sdk::pubkey::Pubkey, + std::collections::HashMap, }; #[derive(Clone, Copy, Debug, PartialEq)] @@ -37,10 +41,13 @@ pub struct ChainData { newest_processed_slot: u64, account_versions_stored: usize, account_bytes_stored: usize, + metric_accounts_stored: MetricU64, + metric_account_versions_stored: MetricU64, + metric_account_bytes_stored: MetricU64, } impl ChainData { - pub fn new() -> Self { + pub fn new(metrics_sender: Metrics) -> Self { Self { slots: HashMap::new(), accounts: HashMap::new(), @@ -48,6 +55,18 @@ impl ChainData { newest_processed_slot: 0, account_versions_stored: 0, account_bytes_stored: 0, + metric_accounts_stored: metrics_sender.register_u64( + "fills_feed_chaindata_accounts_stored".into(), + MetricType::Gauge, + ), + metric_account_versions_stored: metrics_sender.register_u64( + "fills_feed_chaindata_account_versions_stored".into(), + MetricType::Gauge, + ), + metric_account_bytes_stored: metrics_sender.register_u64( + "fills_feed_chaindata_account_bytes_stored".into(), + MetricType::Gauge, + ), } } @@ -128,15 +147,21 @@ impl ChainData { writes .retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot); self.account_versions_stored += writes.len(); - self.account_bytes_stored += writes.iter().map(|w| w.account.data().len()).fold(0, |acc, l| acc + l) + self.account_bytes_stored += writes + .iter() + .map(|w| w.account.data().len()) + .fold(0, |acc, l| acc + l) } // now it's fine to drop any slots before the new rooted head // as account writes for non-rooted slots before it have been dropped self.slots.retain(|s, _| *s >= self.newest_rooted_slot); - // TODO: move this to prom - println!("[chain_data] account_versions_stored = {} account_bytes_stored = {}", self.account_versions_stored, self.account_bytes_stored); + self.metric_accounts_stored.set(self.accounts.len() as u64); + self.metric_account_versions_stored + .set(self.account_versions_stored as u64); + self.metric_account_bytes_stored + .set(self.account_bytes_stored as u64); } } diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index 9508fdc..b108166 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -355,77 +355,77 @@ fn publish_changes_serum( continue; } - // match old_event_view { - // EventView::Fill { .. } => { - // // every already published event is recorded in checkpoint - // checkpoint.push(events[idx]); - // } - // EventView::Out { .. } => { - // debug!( - // "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", - // mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num - // ); + match old_event_view { + EventView::Fill { .. } => { + // every already published event is recorded in checkpoint + checkpoint.push(events[idx]); + } + EventView::Out { .. 
} => { + debug!( + "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", + mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num + ); - // metric_events_change.increment(); + metric_events_change.increment(); - // // first revoke old event - // fill_update_sender - // .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { - // slot, - // write_version, - // event: old_events[idx], - // status: FillUpdateStatus::Revoke, - // market: mkt_pk_string.clone(), - // queue: evq_pk_string.clone(), - // })) - // .unwrap(); // TODO: use anyhow to bubble up error + // first revoke old event + fill_update_sender + .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + slot, + write_version, + event: old_events[idx], + status: FillUpdateStatus::Revoke, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error - // // then publish new if its a fill and record in checkpoint - // fill_update_sender - // .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { - // slot, - // write_version, - // event: events[idx], - // status: FillUpdateStatus::New, - // market: mkt_pk_string.clone(), - // queue: evq_pk_string.clone(), - // })) - // .unwrap(); // TODO: use anyhow to bubble up error - // checkpoint.push(events[idx]); - // } - // } + // then publish new if its a fill and record in checkpoint + fill_update_sender + .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + slot, + write_version, + event: events[idx], + status: FillUpdateStatus::New, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(events[idx]); + } + } } _ => continue, } } - // // in case queue size shrunk due to a fork we need revoke all previous fills - // for seq_num in header_seq_num..old_seq_num { - // let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; - // let old_event_view = old_events[idx].as_view().unwrap(); - // debug!( - // "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", - // mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num - // ); + // in case queue size shrunk due to a fork we need revoke all previous fills + for seq_num in header_seq_num..old_seq_num { + let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; + let old_event_view = old_events[idx].as_view().unwrap(); + debug!( + "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", + mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num + ); - // metric_events_drop.increment(); + metric_events_drop.increment(); - // match old_event_view { - // EventView::Fill { .. } => { - // fill_update_sender - // .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { - // slot, - // event: old_events[idx], - // write_version, - // status: FillUpdateStatus::Revoke, - // market: mkt_pk_string.clone(), - // queue: evq_pk_string.clone(), - // })) - // .unwrap(); // TODO: use anyhow to bubble up error - // } - // EventView::Out { .. } => { continue } - // } - // } + match old_event_view { + EventView::Fill { .. } => { + fill_update_sender + .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + slot, + event: old_events[idx], + write_version, + status: FillUpdateStatus::Revoke, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } + EventView::Out { .. 
} => continue, + } + } fill_update_sender .try_send(FillEventFilterMessage::SerumCheckpoint( @@ -474,7 +474,7 @@ pub async fn init( let account_write_queue_receiver_c = account_write_queue_receiver.clone(); - let mut chain_cache = ChainData::new(); + let mut chain_cache = ChainData::new(metrics_sender); let mut perp_events_cache: HashMap = HashMap::new(); let mut serum_events_cache: HashMap> = HashMap::new(); let mut seq_num_cache = HashMap::new(); diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs index 5aa432f..864981d 100644 --- a/lib/src/grpc_plugin_source.rs +++ b/lib/src/grpc_plugin_source.rs @@ -3,16 +3,17 @@ use geyser::geyser_client::GeyserClient; use jsonrpc_core::futures::StreamExt; use jsonrpc_core_client::transports::http; -use solana_account_decoder::UiAccountEncoding; +use solana_account_decoder::{UiAccount, UiAccountEncoding}; use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}; -use solana_client::rpc_response::{OptionalContext, Response, RpcKeyedAccount}; -use solana_rpc::{rpc::rpc_accounts::AccountsDataClient}; +use solana_client::rpc_response::{OptionalContext, RpcKeyedAccount}; +use solana_rpc::rpc::rpc_accounts::AccountsDataClient; use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey}; use futures::{future, future::FutureExt}; use tonic::{ - metadata::MetadataValue, Request, - transport::{Channel, Certificate, Identity, ClientTlsConfig}, + metadata::MetadataValue, + transport::{Certificate, Channel, ClientTlsConfig, Identity}, + Request, }; use log::*; @@ -42,16 +43,18 @@ use crate::{ //use solana_geyser_connector_plugin_grpc::compression::zstd_decompress; -type SnapshotData = Response>; - +struct SnapshotData { + slot: u64, + accounts: Vec<(String, Option)>, +} enum Message { GrpcUpdate(geyser::SubscribeUpdate), Snapshot(SnapshotData), } -async fn get_snapshot( +async fn get_snapshot_gpa( rpc_http_url: String, - program_id: Pubkey, + program_id: String, ) -> anyhow::Result>> { let rpc_client = http::connect_with_options::(&rpc_http_url, true) .await @@ -71,16 +74,37 @@ async fn get_snapshot( info!("requesting snapshot {}", program_id); let account_snapshot = rpc_client - .get_program_accounts( - program_id.to_string(), - Some(program_accounts_config.clone()), - ) + .get_program_accounts(program_id.clone(), Some(program_accounts_config.clone())) .await .map_err_anyhow()?; info!("snapshot received {}", program_id); Ok(account_snapshot) } +async fn get_snapshot_gma( + rpc_http_url: String, + ids: Vec, +) -> anyhow::Result>>> { + let rpc_client = http::connect_with_options::(&rpc_http_url, true) + .await + .map_err_anyhow()?; + + let account_info_config = RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + commitment: Some(CommitmentConfig::finalized()), + data_slice: None, + min_context_slot: None, + }; + + info!("requesting snapshot {:?}", ids); + let account_snapshot = rpc_client + .get_multiple_accounts(ids.clone(), Some(account_info_config)) + .await + .map_err_anyhow()?; + info!("snapshot received {:?}", ids); + Ok(account_snapshot) +} + async fn feed_data_geyser( grpc_config: &GrpcSourceConfig, tls_config: Option, @@ -107,25 +131,27 @@ async fn feed_data_geyser( } .connect() .await?; - let token: MetadataValue<_> = "dbbf36253d0b2e6a85618a4ef2fa".parse()?; + let token: MetadataValue<_> = "eed31807f710e4bb098779fb9f67".parse()?; let mut client = GeyserClient::with_interceptor(channel, move |mut req: Request<()>| { req.metadata_mut().insert("x-token", 
token.clone()); Ok(req) }); + // If account_ids are provided, snapshot will be gMA. If only program_ids, then only the first id will be snapshot + // TODO: handle this better + if filter_config.program_ids.len() > 1 { + warn!("only one program id is supported for gPA snapshots") + } let mut accounts = HashMap::new(); accounts.insert( "client".to_owned(), SubscribeRequestFilterAccounts { - account: Vec::new(), + account: filter_config.account_ids.clone(), owner: filter_config.program_ids.clone(), }, ); let mut slots = HashMap::new(); - slots.insert( - "client".to_owned(), - SubscribeRequestFilterSlots {}, - ); + slots.insert("client".to_owned(), SubscribeRequestFilterSlots {}); let blocks = HashMap::new(); let transactions = HashMap::new(); @@ -171,7 +197,8 @@ async fn feed_data_geyser( // which will have "finalized" commitment. let mut rooted_to_finalized_slots = 30; - let mut snapshot_future = future::Fuse::terminated(); + let mut snapshot_gma = future::Fuse::terminated(); + let mut snapshot_gpa = future::Fuse::terminated(); // The plugin sends a ping every 5s or so let fatal_idle_timeout = Duration::from_secs(60); @@ -214,10 +241,13 @@ async fn feed_data_geyser( // drop data for slots that are well beyond rooted slot_pubkey_writes.retain(|&k, _| k >= max_rooted_slot - max_out_of_order_slots); } + if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot { snapshot_needed = false; - for program_id in filter_config.program_ids.clone() { - snapshot_future = tokio::spawn(get_snapshot(rpc_http_url.clone(), Pubkey::from_str(&program_id).unwrap())).fuse(); + if filter_config.account_ids.len() > 0 { + snapshot_gma = tokio::spawn(get_snapshot_gma(rpc_http_url.clone(), filter_config.account_ids.clone())).fuse(); + } else if filter_config.program_ids.len() > 0 { + snapshot_gpa = tokio::spawn(get_snapshot_gpa(rpc_http_url.clone(), filter_config.program_ids[0].clone())).fuse(); } } } @@ -243,7 +273,7 @@ async fn feed_data_geyser( continue; }, }; - + let pubkey_bytes = Pubkey::new(&write.pubkey).to_bytes(); let write_version_mapping = pubkey_writes.entry(pubkey_bytes).or_insert(WriteVersion { global: write.write_version, @@ -266,13 +296,43 @@ async fn feed_data_geyser( } sender.send(Message::GrpcUpdate(update)).await.expect("send success"); }, - snapshot = &mut snapshot_future => { + snapshot = &mut snapshot_gma => { + let snapshot = snapshot??; + info!("snapshot is for slot {}, first full slot was {}", snapshot.context.slot, first_full_slot); + if snapshot.context.slot >= first_full_slot { + let accounts: Vec<(String, Option)> = filter_config.account_ids.iter().zip(snapshot.value).map(|x| (x.0.clone(), x.1)).collect(); + sender + .send(Message::Snapshot(SnapshotData { + accounts, + slot: snapshot.context.slot, + })) + .await + .expect("send success"); + } else { + info!( + "snapshot is too old: has slot {}, expected {} minimum", + snapshot.context.slot, + first_full_slot + ); + // try again in another 10 slots + snapshot_needed = true; + rooted_to_finalized_slots += 10; + } + }, + snapshot = &mut snapshot_gpa => { let snapshot = snapshot??; if let OptionalContext::Context(snapshot_data) = snapshot { info!("snapshot is for slot {}, first full slot was {}", snapshot_data.context.slot, first_full_slot); if snapshot_data.context.slot >= first_full_slot { + let accounts: Vec<(String, Option)> = snapshot_data.value.iter().map(|x| { + let deref = x.clone(); + (deref.pubkey, Some(deref.account)) + }).collect(); sender - .send(Message::Snapshot(snapshot_data)) + 
.send(Message::Snapshot(SnapshotData { + accounts, + slot: snapshot_data.context.slot, + })) .await .expect("send success"); } else { @@ -411,7 +471,7 @@ pub async fn process_events( loop { metric_dedup_queue.set(msg_receiver.len() as u64); let msg = msg_receiver.recv().await.expect("sender must not close"); - use geyser::{subscribe_update::UpdateOneof}; + use geyser::subscribe_update::UpdateOneof; match msg { Message::GrpcUpdate(update) => { match update.update_oneof.expect("invalid grpc") { @@ -421,7 +481,7 @@ pub async fn process_events( None => { // TODO: handle error continue; - }, + } }; assert!(update.pubkey.len() == 32); assert!(update.owner.len() == 32); @@ -460,11 +520,12 @@ pub async fn process_events( metric_slot_updates.increment(); metric_slot_queue.set(slot_queue_sender.len() as u64); - let status = SubscribeUpdateSlotStatus::from_i32(update.status).map(|v| match v { - SubscribeUpdateSlotStatus::Processed => SlotStatus::Processed, - SubscribeUpdateSlotStatus::Confirmed => SlotStatus::Confirmed, - SubscribeUpdateSlotStatus::Finalized => SlotStatus::Rooted, - }); + let status = + SubscribeUpdateSlotStatus::from_i32(update.status).map(|v| match v { + SubscribeUpdateSlotStatus::Processed => SlotStatus::Processed, + SubscribeUpdateSlotStatus::Confirmed => SlotStatus::Confirmed, + SubscribeUpdateSlotStatus::Finalized => SlotStatus::Rooted, + }); if status.is_none() { error!("unexpected slot status: {}", update.status); continue; @@ -480,24 +541,29 @@ pub async fn process_events( .await .expect("send success"); } - UpdateOneof::Block(_) => {}, - UpdateOneof::Transaction(_) => {}, + UpdateOneof::Block(_) => {} + UpdateOneof::Transaction(_) => {} } } Message::Snapshot(update) => { metric_snapshots.increment(); info!("processing snapshot..."); - for keyed_account in update.value { + for account in update.accounts.iter() { metric_snapshot_account_writes.increment(); metric_account_queue.set(account_write_queue_sender.len() as u64); - // TODO: Resnapshot on invalid data? - let account: Account = keyed_account.account.decode().unwrap(); - let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap(); - account_write_queue_sender - .send(AccountWrite::from(pubkey, update.context.slot, 0, account)) - .await - .expect("send success"); + match account { + (key, Some(ui_account)) => { + // TODO: Resnapshot on invalid data? 
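A minimal sketch of how the re-snapshot TODO above could be handled, assuming the surrounding bindings (key, ui_account, update, account_write_queue_sender) and relying on UiAccount::decode() returning None when the payload cannot be decoded; illustrative only, not part of the patch:

    match ui_account.decode::<Account>() {
        Some(account) => {
            // decoded fine: forward as an account write at the snapshot slot
            let pubkey = Pubkey::from_str(key).unwrap();
            account_write_queue_sender
                .send(AccountWrite::from(pubkey, update.slot, 0, account))
                .await
                .expect("send success");
        }
        // decode failed: log it and leave the account for a later re-snapshot
        None => warn!("could not decode snapshot account {}", key),
    }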
+ let pubkey = Pubkey::from_str(key).unwrap(); + let account: Account = ui_account.decode().unwrap(); + account_write_queue_sender + .send(AccountWrite::from(pubkey, update.slot, 0, account)) + .await + .expect("send success"); + } + (key, None) => warn!("account not found {}", key), + } } info!("processing snapshot done"); } diff --git a/lib/src/lib.rs b/lib/src/lib.rs index ed77518..df30ca3 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -122,6 +122,7 @@ pub struct SourceConfig { #[derive(Clone, Debug, Deserialize)] pub struct FilterConfig { pub program_ids: Vec, + pub account_ids: Vec, } #[derive(Clone, Debug)] diff --git a/lib/src/orderbook_filter.rs b/lib/src/orderbook_filter.rs index 887d248..70174c7 100644 --- a/lib/src/orderbook_filter.rs +++ b/lib/src/orderbook_filter.rs @@ -126,18 +126,12 @@ pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, base_lot_size: i64) res } -pub fn price_lots_to_ui( - native: i64, - base_decimals: u8, - quote_decimals: u8, -) -> f64 { +pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 { let decimals = base_decimals - quote_decimals; // let res = native as f64 // * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64) // as f64; - let res = native as f64 - / (10u64.pow(decimals.into())) - as f64; + let res = native as f64 / (10u64.pow(decimals.into())) as f64; res } @@ -158,7 +152,6 @@ pub fn price_lots_to_ui_perp( res } - fn publish_changes( slot: u64, write_version: u64, @@ -172,6 +165,13 @@ fn publish_changes( ) { let mut update: Vec = vec![]; // push diff for levels that are no longer present + if current_bookside.len() != previous_bookside.len() { + info!( + "L {}", + current_bookside.len() as i64 - previous_bookside.len() as i64 + ) + } + for previous_order in previous_bookside.iter() { let peer = current_bookside .iter() @@ -179,7 +179,7 @@ fn publish_changes( match peer { None => { - info!("removed level {}", previous_order[0]); + info!("R {} {}", previous_order[0], previous_order[1]); update.push([previous_order[0], 0f64]); } _ => continue, @@ -197,11 +197,14 @@ fn publish_changes( if previous_order[1] == current_order[1] { continue; } - info!("size changed {} -> {}", previous_order[1], current_order[1]); + info!( + "C {} {} -> {}", + current_order[0], previous_order[1], current_order[1] + ); update.push(current_order.clone()); } None => { - info!("new level {},{}", current_order[0], current_order[1]); + info!("A {} {}", current_order[0], current_order[1]); update.push(current_order.clone()) } } @@ -242,88 +245,6 @@ fn publish_changes( metric_updates.increment(); } -fn publish_changes_serum( - slot: u64, - write_version: u64, - mkt: &(Pubkey, MarketConfig), - side: OrderbookSide, - current_bookside: &Vec, - previous_bookside: &Vec, - maybe_other_bookside: Option<&Vec>, - orderbook_update_sender: &async_channel::Sender, - metric_updates: &mut MetricU64, -) { - let mut update: Vec = vec![]; - - // push diff for levels that are no longer present - for previous_order in previous_bookside.iter() { - let peer = current_bookside - .iter() - .find(|level| previous_order[0] == level[0]); - - match peer { - None => { - info!("removed level s {}", previous_order[0]); - update.push([previous_order[0], 0f64]); - } - _ => continue, - } - } - - // push diff where there's a new level or size has changed - for current_order in current_bookside { - let peer = previous_bookside - .iter() - .find(|item| item[0] == current_order[0]); - - match peer { - Some(previous_order) => { - if 
previous_order[1] == current_order[1] { - continue; - } - info!("size changed {} -> {}", previous_order[1], current_order[1]); - update.push(current_order.clone()); - } - None => { - info!("new level {},{}", current_order[0], current_order[1]); - update.push(current_order.clone()) - } - } - } - - match maybe_other_bookside { - Some(other_bookside) => { - let (bids, asks) = match side { - OrderbookSide::Bid => (current_bookside, other_bookside), - OrderbookSide::Ask => (other_bookside, current_bookside), - }; - orderbook_update_sender - .try_send(OrderbookFilterMessage::Checkpoint(OrderbookCheckpoint { - slot, - write_version, - bids: bids.clone(), - asks: asks.clone(), - market: mkt.0.to_string(), - })) - .unwrap() - } - None => info!("other bookside not in cache"), - } - - if update.len() > 0 { - orderbook_update_sender - .try_send(OrderbookFilterMessage::Update(OrderbookUpdate { - market: mkt.0.to_string(), - side: side.clone(), - update, - slot, - write_version, - })) - .unwrap(); // TODO: use anyhow to bubble up error - metric_updates.increment(); - } -} - pub async fn init( market_configs: Vec<(Pubkey, MarketConfig)>, serum_market_configs: Vec<(Pubkey, MarketConfig)>, @@ -352,7 +273,7 @@ pub async fn init( let account_write_queue_receiver_c = account_write_queue_receiver.clone(); - let mut chain_cache = ChainData::new(); + let mut chain_cache = ChainData::new(metrics_sender); let mut bookside_cache: HashMap> = HashMap::new(); let mut serum_bookside_cache: HashMap> = HashMap::new(); let mut last_write_versions = HashMap::::new(); @@ -412,7 +333,7 @@ pub async fn init( let write_version = (account_info.slot, account_info.write_version); // todo: should this be <= so we don't overwrite with old data received late? - if write_version == *last_write_version { + if write_version <= *last_write_version { continue; } last_write_versions.insert(side_pk_string.clone(), write_version); @@ -492,11 +413,11 @@ pub async fn init( let write_version = (account_info.slot, account_info.write_version); // todo: should this be <= so we don't overwrite with old data received late? 
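The todo just above is resolved by the change that follows: write versions are (slot, write_version) tuples, and tuples order lexicographically, so a `<=` guard skips exact duplicates and late-arriving older data alike. A self-contained illustration of that ordering (values assumed for illustration only):

    fn main() {
        let last_write_version: (u64, u64) = (10, 5); // (slot, write_version)
        assert!((10, 5) <= last_write_version); // exact duplicate -> skipped
        assert!((9, 9) <= last_write_version); // older slot -> skipped
        assert!((10, 6) > last_write_version); // newer write in same slot -> processed
    }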
- if write_version == *last_write_version { + if write_version <= *last_write_version { continue; } last_write_versions.insert(side_pk_string.clone(), write_version); - + info!("W {}", mkt.1.name); let account = &mut account_info.account.clone(); let data = account.data_as_mut_slice(); let len = data.len(); @@ -532,7 +453,7 @@ pub async fn init( serum_bookside_cache.get(&other_side_pk.to_string()); match serum_bookside_cache.get(&side_pk_string) { - Some(old_bookside) => publish_changes_serum( + Some(old_bookside) => publish_changes( account_info.slot, account_info.write_version, mkt, diff --git a/lib/src/websocket_source.rs b/lib/src/websocket_source.rs index beb8d5b..765f21f 100644 --- a/lib/src/websocket_source.rs +++ b/lib/src/websocket_source.rs @@ -6,9 +6,7 @@ use solana_client::{ rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, rpc_response::{OptionalContext, Response, RpcKeyedAccount}, }; -use solana_rpc::{ - rpc::rpc_accounts::AccountsDataClient, rpc_pubsub::RpcSolPubSubClient, -}; +use solana_rpc::{rpc::rpc_accounts::AccountsDataClient, rpc_pubsub::RpcSolPubSubClient}; use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey}; use log::*; diff --git a/service-mango-fills/Cargo.toml b/service-mango-fills/Cargo.toml index 2e94ff9..24fc4a0 100644 --- a/service-mango-fills/Cargo.toml +++ b/service-mango-fills/Cargo.toml @@ -23,6 +23,7 @@ async-trait = "0.1" tokio = { version = "1", features = ["full"] } tokio-tungstenite = "0.17" bytemuck = "1.7.2" +jemallocator = "0.3.2" mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" } client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" } diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs index a41d288..f4c7f08 100644 --- a/service-mango-fills/src/main.rs +++ b/service-mango-fills/src/main.rs @@ -43,6 +43,11 @@ type CheckpointMap = Arc>>; type SerumCheckpointMap = Arc>>; type PeerMap = Arc>>; +// jemalloc seems to be better at keeping the memory footprint reasonable over +// longer periods of time +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + #[derive(Clone, Debug, Deserialize)] #[serde(tag = "command")] pub enum Command { @@ -127,27 +132,25 @@ async fn handle_connection( ); } - let receive_commands = ws_rx.try_for_each(|msg| { - match msg { - Message::Text(_) => { - handle_commands( - addr, - msg, - peer_map.clone(), - checkpoint_map.clone(), - serum_checkpoint_map.clone(), - market_ids.clone(), - ) - }, - Message::Ping(_) => { - let peers = peer_map.clone(); - let mut peers_lock = peers.lock().unwrap(); - let peer = peers_lock.get_mut(&addr).expect("peer should be in map"); - peer.sender.unbounded_send(Message::Pong(Vec::new())).unwrap(); - future::ready(Ok(())) - } - _ => future::ready(Ok(())), + let receive_commands = ws_rx.try_for_each(|msg| match msg { + Message::Text(_) => handle_commands( + addr, + msg, + peer_map.clone(), + checkpoint_map.clone(), + serum_checkpoint_map.clone(), + market_ids.clone(), + ), + Message::Ping(_) => { + let peers = peer_map.clone(); + let mut peers_lock = peers.lock().unwrap(); + let peer = peers_lock.get_mut(&addr).expect("peer should be in map"); + peer.sender + .unbounded_send(Message::Pong(Vec::new())) + .unwrap(); + future::ready(Ok(())) } + _ => future::ready(Ok(())), }); let forward_updates = chan_rx.map(Ok).forward(ws_tx); @@ -454,40 +457,40 @@ async fn main() -> anyhow::Result<()> { let try_socket = 
TcpListener::bind(&config.bind_ws_addr).await; let listener = try_socket.expect("Failed to bind"); { - tokio::spawn(async move { - // Let's spawn the handling of each connection in a separate task. - while let Ok((stream, addr)) = listener.accept().await { - tokio::spawn(handle_connection_error( - checkpoints.clone(), - serum_checkpoints.clone(), - peers.clone(), - market_pubkey_strings.clone(), - stream, - addr, - metrics_opened_connections.clone(), - metrics_closed_connections.clone(), - )); - } - }); + tokio::spawn(async move { + // Let's spawn the handling of each connection in a separate task. + while let Ok((stream, addr)) = listener.accept().await { + tokio::spawn(handle_connection_error( + checkpoints.clone(), + serum_checkpoints.clone(), + peers.clone(), + market_pubkey_strings.clone(), + stream, + addr, + metrics_opened_connections.clone(), + metrics_closed_connections.clone(), + )); + } + }); } // keepalive { - tokio::spawn(async move { - let mut write_interval = time::interval(time::Duration::from_secs(30)); + tokio::spawn(async move { + let mut write_interval = time::interval(time::Duration::from_secs(30)); - loop { - write_interval.tick().await; - let peers_copy = peers_ref_thread1.lock().unwrap().clone(); - for (addr, peer) in peers_copy.iter() { - let pl = Vec::new(); - let result = peer.clone().sender.send(Message::Ping(pl)).await; - if result.is_err() { - error!("ws ping could not reach {}", addr); + loop { + write_interval.tick().await; + let peers_copy = peers_ref_thread1.lock().unwrap().clone(); + for (addr, peer) in peers_copy.iter() { + let pl = Vec::new(); + let result = peer.clone().sender.send(Message::Ping(pl)).await; + if result.is_err() { + error!("ws ping could not reach {}", addr); + } } } - } - }); + }); } info!( "rpc connect: {}", @@ -499,11 +502,11 @@ async fn main() -> anyhow::Result<()> { .collect::() ); let use_geyser = true; + let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat(); + let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect(); let filter_config = FilterConfig { - program_ids: vec![ - "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(), - "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(), - ], + program_ids: vec![], + account_ids: relevant_pubkeys, }; if use_geyser { grpc_plugin_source::process_events( diff --git a/service-mango-orderbook/src/main.rs b/service-mango-orderbook/src/main.rs index 838da03..ee5b280 100644 --- a/service-mango-orderbook/src/main.rs +++ b/service-mango-orderbook/src/main.rs @@ -285,19 +285,23 @@ async fn main() -> anyhow::Result<()> { &Keypair::new(), Some(rpc_timeout), ); - let group_context = Arc::new(MangoGroupContext::new_from_rpc( - &client.rpc_async(), - Pubkey::from_str(&config.mango_group).unwrap(), - ).await?); + let group_context = Arc::new( + MangoGroupContext::new_from_rpc( + &client.rpc_async(), + Pubkey::from_str(&config.mango_group).unwrap(), + ) + .await?, + ); // todo: reload markets at intervals let market_configs: Vec<(Pubkey, MarketConfig)> = group_context .perp_markets .iter() .map(|(_, context)| { - let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index) { + let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index) + { Some(token) => token.decimals, - None => panic!("token not found for market") // todo: default to 6 for usdc? + None => panic!("token not found for market"), // todo: default to 6 for usdc? 
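The token decimals looked up here feed the UI price conversion reshaped by patch 2 in lib/src/orderbook_filter.rs; a self-contained sketch of that arithmetic, with illustrative decimal values:

    fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
        // divide the native price by 10^(base_decimals - quote_decimals)
        let decimals = base_decimals - quote_decimals;
        native as f64 / (10u64.pow(decimals.into())) as f64
    }

    fn main() {
        // e.g. a base token with 9 decimals quoted in a 6-decimal token
        assert_eq!(price_lots_to_ui(22_500, 9, 6), 22.5);
    }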
}; ( context.address, @@ -320,11 +324,11 @@ async fn main() -> anyhow::Result<()> { .map(|(_, context)| { let base_decimals = match group_context.tokens.get(&context.market.base_token_index) { Some(token) => token.decimals, - None => panic!("token not found for market") // todo: default? + None => panic!("token not found for market"), // todo: default? }; let quote_decimals = match group_context.tokens.get(&context.market.quote_token_index) { Some(token) => token.decimals, - None => panic!("token not found for market") // todo: default to 6 for usdc? + None => panic!("token not found for market"), // todo: default to 6 for usdc? }; ( context.market.serum_market_external, @@ -341,13 +345,14 @@ async fn main() -> anyhow::Result<()> { }) .collect(); - let market_pubkey_strings: HashMap = [market_configs.clone(), serum_market_configs.clone()] - .concat() - .iter() - .map(|market| (market.0.to_string(), market.1.name.clone())) - .collect::>() - .into_iter() - .collect(); + let market_pubkey_strings: HashMap = + [market_configs.clone(), serum_market_configs.clone()] + .concat() + .iter() + .map(|market| (market.0.to_string(), market.1.name.clone())) + .collect::>() + .into_iter() + .collect(); let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) = orderbook_filter::init(market_configs, serum_market_configs, metrics_tx.clone()).await?; @@ -422,6 +427,7 @@ async fn main() -> anyhow::Result<()> { "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(), "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(), ], + account_ids: vec![], }; grpc_plugin_source::process_events( &config.source, diff --git a/service-mango-pnl/src/main.rs b/service-mango-pnl/src/main.rs index 1e36db6..b6a68e4 100644 --- a/service-mango-pnl/src/main.rs +++ b/service-mango-pnl/src/main.rs @@ -119,7 +119,9 @@ fn start_pnl_updater( } let pnl_vals = - compute_pnl(context.clone(), account_fetcher.clone(), &mango_account).await.unwrap(); + compute_pnl(context.clone(), account_fetcher.clone(), &mango_account) + .await + .unwrap(); // Alternatively, we could prepare the sorted and limited lists for each // market here. 
That would be faster and cause less contention on the pnl_data @@ -247,10 +249,13 @@ async fn main() -> anyhow::Result<()> { &Keypair::new(), Some(rpc_timeout), ); - let group_context = Arc::new(MangoGroupContext::new_from_rpc( - &client.rpc_async(), - Pubkey::from_str(&config.pnl.mango_group).unwrap(), - ).await?); + let group_context = Arc::new( + MangoGroupContext::new_from_rpc( + &client.rpc_async(), + Pubkey::from_str(&config.pnl.mango_group).unwrap(), + ) + .await?, + ); let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new())); let account_fetcher = Arc::new(chain_data::AccountFetcher { chain_data: chain_data.clone(), @@ -265,7 +270,7 @@ async fn main() -> anyhow::Result<()> { metrics_tx.register_u64("pnl_jsonrpc_reqs_invalid_total".into(), MetricType::Counter); let metrics_pnls_tracked = metrics_tx.register_u64("pnl_num_tracked".into(), MetricType::Gauge); - let chain_data = Arc::new(RwLock::new(ChainData::new())); + let chain_data = Arc::new(RwLock::new(ChainData::new(metrics_tx.clone()))); let pnl_data = Arc::new(RwLock::new(PnlData::new())); start_pnl_updater( @@ -288,9 +293,8 @@ async fn main() -> anyhow::Result<()> { // start filling chain_data from the grpc plugin source let (account_write_queue_sender, slot_queue_sender) = memory_target::init(chain_data).await?; let filter_config = FilterConfig { - program_ids: vec![ - "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(), - ], + program_ids: vec!["4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into()], + account_ids: vec![], }; grpc_plugin_source::process_events( &config.source, From f88ec6a53ce084c3a7e1b687fa4f377fcd88f1ca Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Fri, 20 Jan 2023 16:50:19 +0000 Subject: [PATCH 3/5] Add new common event schema --- Cargo.lock | 1 - lib/src/fill_event_filter.rs | 348 +++++++++++++++++++++-------------- 2 files changed, 209 insertions(+), 140 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7bc001..0df8624 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5771,7 +5771,6 @@ dependencies = [ "futures-core", "futures-util", "itertools 0.10.5", - "jemallocator", "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client", "log 0.4.17", diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index b108166..5371ad6 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -1,12 +1,12 @@ use crate::{ chain_data::{AccountData, ChainData, SlotData}, metrics::{MetricType, Metrics}, - AccountWrite, SlotUpdate, + AccountWrite, SlotUpdate, orderbook_filter::OrderbookSide, }; -use bytemuck::{Pod, Zeroable}; +use bytemuck::{Pod, Zeroable, cast_slice}; use log::*; use serde::{ser::SerializeStruct, Serialize, Serializer}; -use serum_dex::state::EventView; +use serum_dex::state::EventView as SpotEvent; use solana_sdk::{ account::{ReadableAccount, WritableAccount}, clock::Epoch, @@ -15,13 +15,14 @@ use solana_sdk::{ use std::{ borrow::BorrowMut, cmp::max, - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet}, convert::identity, time::SystemTime, }; use crate::metrics::MetricU64; use anchor_lang::AccountDeserialize; use mango_v4::state::{ - AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent, MAX_NUM_EVENTS, + AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent as PerpFillEvent, Side, + MAX_NUM_EVENTS, }; #[derive(Clone, Copy, Debug, Serialize)] @@ -30,19 +31,122 @@ pub enum FillUpdateStatus { Revoke, } -#[derive(Clone, Debug)] -pub struct FillUpdate { - pub event: FillEvent, 
- pub status: FillUpdateStatus, - pub market: String, - pub queue: String, - pub slot: u64, - pub write_version: u64, +#[derive(Clone, Copy, Debug, Serialize)] +pub enum FillEventType { + Spot, + Perp, } #[derive(Clone, Debug)] -pub struct SerumFillUpdate { - pub event: serum_dex::state::Event, +pub struct FillEvent { + pub event_type: FillEventType, + pub maker: bool, + pub side: OrderbookSide, + pub timestamp: u64, + pub seq_num: u64, + pub owner: String, + pub order_id: u128, + pub client_order_id: u64, + pub fee: f32, + pub price: i64, + pub quantity: i64, +} + +impl Serialize for FillEvent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("FillEvent", 12)?; + state.serialize_field("eventType", &self.event_type)?; + state.serialize_field("maker", &self.maker)?; + state.serialize_field("side", &self.side)?; + state.serialize_field("timestamp", &self.timestamp)?; + state.serialize_field("seqNum", &self.seq_num)?; + state.serialize_field("owner", &self.owner)?; + state.serialize_field("orderId", &self.order_id)?; + state.serialize_field("clientOrderId", &self.client_order_id)?; + state.serialize_field("fee", &self.fee)?; + state.serialize_field("price", &self.price)?; + state.serialize_field("quantity", &self.quantity)?; + state.end() + } +} + +impl FillEvent { + pub fn new_from_perp(event: PerpFillEvent) -> [Self; 2] { + let taker_side = match event.taker_side() { + Side::Ask => OrderbookSide::Ask, + Side::Bid => OrderbookSide::Bid, + }; + let maker_side = match event.taker_side() { + Side::Ask => OrderbookSide::Bid, + Side::Bid => OrderbookSide::Ask, + }; + [FillEvent { + event_type: FillEventType::Perp, + maker: true, + side: maker_side, + timestamp: event.timestamp, + seq_num: event.seq_num, + owner: event.maker.to_string(), + order_id: event.maker_order_id, + client_order_id: 0u64, + fee: event.maker_fee.to_num(), + price: event.price, + quantity: event.quantity, + }, + FillEvent { + event_type: FillEventType::Perp, + maker: false, + side: taker_side, + timestamp: event.timestamp, + seq_num: event.seq_num, + owner: event.taker.to_string(), + order_id: event.taker_order_id, + client_order_id: event.taker_client_order_id, + fee: event.taker_fee.to_num(), + price: event.price, + quantity: event.quantity, + + }] + } + pub fn new_from_spot(event: SpotEvent, timestamp: u64, seq_num: u64) -> Self { + match event { + SpotEvent::Fill { side, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, order_id, owner, client_order_id, .. } => { + let side = match side as u8 { + 0 => OrderbookSide::Bid, + 1 => OrderbookSide::Ask, + _ => panic!("invalid side"), + }; + let client_order_id: u64 = match client_order_id { + Some(id) => id.into(), + None => 0u64, + }; + // TODO: native to ui + let price = (native_qty_paid / native_qty_received) as i64; + FillEvent { + event_type: FillEventType::Spot, + maker: maker, + side, + timestamp, + seq_num, + owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(), + order_id: order_id, + client_order_id: client_order_id, + fee: native_fee_or_rebate as f32, + price, + quantity: native_qty_received as i64, + } + } + SpotEvent::Out { .. 
} => { panic!("Can't build FillEvent from SpotEvent::Out")} + } + } +} + +#[derive(Clone, Debug)] +pub struct FillUpdate { + pub event: FillEvent, pub status: FillUpdateStatus, pub market: String, pub queue: String, @@ -66,27 +170,8 @@ impl Serialize for FillUpdate { where S: Serializer, { - let event = base64::encode(bytemuck::bytes_of(&self.event)); let mut state = serializer.serialize_struct("FillUpdate", 4)?; - state.serialize_field("event", &event)?; - state.serialize_field("market", &self.market)?; - state.serialize_field("queue", &self.queue)?; - state.serialize_field("status", &self.status)?; - state.serialize_field("slot", &self.slot)?; - state.serialize_field("write_version", &self.write_version)?; - - state.end() - } -} - -impl Serialize for SerumFillUpdate { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let event = base64::encode(bytemuck::bytes_of(&self.event)); - let mut state = serializer.serialize_struct("SerumFillUpdate", 4)?; - state.serialize_field("event", &event)?; + state.serialize_field("event", &self.event)?; state.serialize_field("market", &self.market)?; state.serialize_field("queue", &self.queue)?; state.serialize_field("status", &self.status)?; @@ -106,48 +191,13 @@ pub struct FillCheckpoint { pub write_version: u64, } -#[derive(Clone, Debug)] -pub struct SerumFillCheckpoint { - pub market: String, - pub queue: String, - pub events: Vec, - pub slot: u64, - pub write_version: u64, -} - impl Serialize for FillCheckpoint { fn serialize(&self, serializer: S) -> Result where S: Serializer, { - let events: Vec = self - .events - .iter() - .map(|e| base64::encode(bytemuck::bytes_of(e))) - .collect(); let mut state = serializer.serialize_struct("FillCheckpoint", 3)?; - state.serialize_field("events", &events)?; - state.serialize_field("market", &self.market)?; - state.serialize_field("queue", &self.queue)?; - state.serialize_field("slot", &self.slot)?; - state.serialize_field("write_version", &self.write_version)?; - - state.end() - } -} - -impl Serialize for SerumFillCheckpoint { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let events: Vec = self - .events - .iter() - .map(|e| base64::encode(bytemuck::bytes_of(e))) - .collect(); - let mut state = serializer.serialize_struct("SerumFillCheckpoint", 3)?; - state.serialize_field("events", &events)?; + state.serialize_field("events", &self.events)?; state.serialize_field("market", &self.market)?; state.serialize_field("queue", &self.queue)?; state.serialize_field("slot", &self.slot)?; @@ -159,9 +209,7 @@ impl Serialize for SerumFillCheckpoint { pub enum FillEventFilterMessage { Update(FillUpdate), - SerumUpdate(SerumFillUpdate), Checkpoint(FillCheckpoint), - SerumCheckpoint(SerumFillCheckpoint), } // couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue @@ -205,18 +253,22 @@ fn publish_changes_perp( // new fills are published and recorded in checkpoint if events[idx].event_type == EventType::Fill as u8 { - let fill: FillEvent = bytemuck::cast(events[idx]); - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill, - status: FillUpdateStatus::New, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(fill); + let fill: PerpFillEvent = bytemuck::cast(events[idx]); + let fills = FillEvent::new_from_perp(fill); + // send event for both maker and taker + for fill in fills { + 
fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill.clone(), + status: FillUpdateStatus::New, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(fill); + } } } else if old_events[idx].event_type != events[idx].event_type || old_events[idx].padding != events[idx].padding @@ -230,39 +282,48 @@ fn publish_changes_perp( // first revoke old event if a fill if old_events[idx].event_type == EventType::Fill as u8 { - let fill: FillEvent = bytemuck::cast(old_events[idx]); - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill, - status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error + let fill: PerpFillEvent = bytemuck::cast(old_events[idx]); + let fills = FillEvent::new_from_perp(fill); + for fill in fills { + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill, + status: FillUpdateStatus::Revoke, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } } // then publish new if its a fill and record in checkpoint if events[idx].event_type == EventType::Fill as u8 { - let fill: FillEvent = bytemuck::cast(events[idx]); - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: fill, - status: FillUpdateStatus::New, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(fill); + let fill: PerpFillEvent = bytemuck::cast(events[idx]); + let fills = FillEvent::new_from_perp(fill); + for fill in fills { + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill.clone(), + status: FillUpdateStatus::New, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(fill); + } } } else { // every already published event is recorded in checkpoint if a fill if events[idx].event_type == EventType::Fill as u8 { - let fill: FillEvent = bytemuck::cast(events[idx]); - checkpoint.push(fill); + let fill: PerpFillEvent = bytemuck::cast(events[idx]); + let fills = FillEvent::new_from_perp(fill); + for fill in fills { + checkpoint.push(fill); + } } } } @@ -278,17 +339,20 @@ fn publish_changes_perp( metric_events_drop.increment(); if old_events[idx].event_type == EventType::Fill as u8 { - let fill: FillEvent = bytemuck::cast(old_events[idx]); - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - event: fill, - write_version, - status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error + let fill: PerpFillEvent = bytemuck::cast(old_events[idx]); + let fills = FillEvent::new_from_perp(fill); + for fill in fills { + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + event: fill, + write_version, + status: FillUpdateStatus::Revoke, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } } } @@ -325,42 +389,46 @@ fn publish_changes_serum( let evq_pk_string = mkt.1.to_string(); let 
header_seq_num = header.seq_num; debug!("start seq {} header seq {}", start_seq_num, header_seq_num); + + // Timestamp for spot events is time scraped + let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); for seq_num in start_seq_num..header_seq_num { let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize; let event_view = events[idx].as_view().unwrap(); let old_event_view = old_events[idx].as_view().unwrap(); match event_view { - EventView::Fill { .. } => { + SpotEvent::Fill { .. } => { // there are three possible cases: // 1) the event is past the old seq num, hence guaranteed new event // 2) the event is not matching the old event queue // 3) all other events are matching the old event queue // the order of these checks is important so they are exhaustive + let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num); if seq_num >= old_seq_num { debug!("found new serum fill {} idx {}", mkt_pk_string, idx,); metric_events_new.increment(); fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + .try_send(FillEventFilterMessage::Update(FillUpdate { slot, write_version, - event: events[idx], + event: fill.clone(), status: FillUpdateStatus::New, market: mkt_pk_string.clone(), queue: evq_pk_string.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(events[idx]); + checkpoint.push(fill); continue; } match old_event_view { - EventView::Fill { .. } => { + SpotEvent::Fill { .. } => { // every already published event is recorded in checkpoint - checkpoint.push(events[idx]); + checkpoint.push(fill); } - EventView::Out { .. } => { + SpotEvent::Out { .. } => { debug!( "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num @@ -368,12 +436,13 @@ fn publish_changes_serum( metric_events_change.increment(); + let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num); // first revoke old event fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + .try_send(FillEventFilterMessage::Update(FillUpdate { slot, write_version, - event: old_events[idx], + event: old_fill, status: FillUpdateStatus::Revoke, market: mkt_pk_string.clone(), queue: evq_pk_string.clone(), @@ -382,16 +451,16 @@ fn publish_changes_serum( // then publish new if its a fill and record in checkpoint fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + .try_send(FillEventFilterMessage::Update(FillUpdate { slot, write_version, - event: events[idx], + event: fill.clone(), status: FillUpdateStatus::New, market: mkt_pk_string.clone(), queue: evq_pk_string.clone(), })) .unwrap(); // TODO: use anyhow to bubble up error - checkpoint.push(events[idx]); + checkpoint.push(fill); } } } @@ -411,11 +480,12 @@ fn publish_changes_serum( metric_events_drop.increment(); match old_event_view { - EventView::Fill { .. } => { + SpotEvent::Fill { .. } => { + let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num); fill_update_sender - .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate { + .try_send(FillEventFilterMessage::Update(FillUpdate { slot, - event: old_events[idx], + event: old_fill, write_version, status: FillUpdateStatus::Revoke, market: mkt_pk_string.clone(), @@ -423,13 +493,13 @@ fn publish_changes_serum( })) .unwrap(); // TODO: use anyhow to bubble up error } - EventView::Out { .. } => continue, + SpotEvent::Out { .. 
} => continue, } } fill_update_sender - .try_send(FillEventFilterMessage::SerumCheckpoint( - SerumFillCheckpoint { + .try_send(FillEventFilterMessage::Checkpoint( + FillCheckpoint { slot, write_version, events: checkpoint, From 8d6b6f5f38ac9b9b630712a76b25951ac87c4577 Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Fri, 20 Jan 2023 16:50:39 +0000 Subject: [PATCH 4/5] Fix serum event change detection --- lib/src/fill_event_filter.rs | 56 +++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index 5371ad6..2af48b7 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -424,32 +424,54 @@ fn publish_changes_serum( } match old_event_view { - SpotEvent::Fill { .. } => { - // every already published event is recorded in checkpoint + SpotEvent::Fill { order_id, .. } => { + if order_id != fill.order_id { + debug!( + "found changed id event {} idx {} seq_num {} header seq num {} old seq num {}", + mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num + ); + + metric_events_change.increment(); + + + let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num); + // first revoke old event + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: old_fill, + status: FillUpdateStatus::Revoke, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + + // then publish new + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + slot, + write_version, + event: fill.clone(), + status: FillUpdateStatus::New, + market: mkt_pk_string.clone(), + queue: evq_pk_string.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } + + // record new event in checkpoint checkpoint.push(fill); } SpotEvent::Out { .. 
} => { debug!( - "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", + "found changed type event {} idx {} seq_num {} header seq num {} old seq num {}", mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num ); metric_events_change.increment(); - let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num); - // first revoke old event - fill_update_sender - .try_send(FillEventFilterMessage::Update(FillUpdate { - slot, - write_version, - event: old_fill, - status: FillUpdateStatus::Revoke, - market: mkt_pk_string.clone(), - queue: evq_pk_string.clone(), - })) - .unwrap(); // TODO: use anyhow to bubble up error - - // then publish new if its a fill and record in checkpoint + // publish new fill and record in checkpoint fill_update_sender .try_send(FillEventFilterMessage::Update(FillUpdate { slot, From 1233cc58f31f39e5be97bc266d5eb89707d4b088 Mon Sep 17 00:00:00 2001 From: Riordan Panayides Date: Fri, 20 Jan 2023 17:29:02 +0000 Subject: [PATCH 5/5] Update chaindata metric names, finish serum events unified schema --- lib/src/chain_data.rs | 6 ++--- service-mango-fills/src/main.rs | 47 +++------------------------------ 2 files changed, 6 insertions(+), 47 deletions(-) diff --git a/lib/src/chain_data.rs b/lib/src/chain_data.rs index ccdd543..f4a3888 100644 --- a/lib/src/chain_data.rs +++ b/lib/src/chain_data.rs @@ -56,15 +56,15 @@ impl ChainData { account_versions_stored: 0, account_bytes_stored: 0, metric_accounts_stored: metrics_sender.register_u64( - "fills_feed_chaindata_accounts_stored".into(), + "chaindata_accounts_stored".into(), MetricType::Gauge, ), metric_account_versions_stored: metrics_sender.register_u64( - "fills_feed_chaindata_account_versions_stored".into(), + "chaindata_account_versions_stored".into(), MetricType::Gauge, ), metric_account_bytes_stored: metrics_sender.register_u64( - "fills_feed_chaindata_account_bytes_stored".into(), + "chaindata_account_bytes_stored".into(), MetricType::Gauge, ), } diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs index f4c7f08..158c618 100644 --- a/service-mango-fills/src/main.rs +++ b/service-mango-fills/src/main.rs @@ -30,7 +30,6 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error}; use serde::Deserialize; use solana_geyser_connector_lib::{ - fill_event_filter::SerumFillCheckpoint, metrics::{MetricType, MetricU64}, FilterConfig, StatusResponse, }; @@ -40,7 +39,6 @@ use solana_geyser_connector_lib::{ }; type CheckpointMap = Arc>>; -type SerumCheckpointMap = Arc>>; type PeerMap = Arc>>; // jemalloc seems to be better at keeping the memory footprint reasonable over @@ -79,7 +77,6 @@ pub struct Peer { async fn handle_connection_error( checkpoint_map: CheckpointMap, - serum_checkpoint_map: SerumCheckpointMap, peer_map: PeerMap, market_ids: HashMap, raw_stream: TcpStream, @@ -91,7 +88,6 @@ async fn handle_connection_error( let result = handle_connection( checkpoint_map, - serum_checkpoint_map, peer_map.clone(), market_ids, raw_stream, @@ -109,7 +105,6 @@ async fn handle_connection_error( async fn handle_connection( checkpoint_map: CheckpointMap, - serum_checkpoint_map: SerumCheckpointMap, peer_map: PeerMap, market_ids: HashMap, raw_stream: TcpStream, @@ -138,7 +133,6 @@ async fn handle_connection( msg, peer_map.clone(), checkpoint_map.clone(), - serum_checkpoint_map.clone(), market_ids.clone(), ), Message::Ping(_) => { @@ -167,7 +161,6 @@ fn handle_commands( msg: Message, peer_map: PeerMap, checkpoint_map: CheckpointMap, - serum_checkpoint_map: SerumCheckpointMap, 
market_ids: HashMap, ) -> Ready> { let msg_str = msg.clone().into_text().unwrap(); @@ -209,7 +202,6 @@ fn handle_commands( if subscribed { let checkpoint_map = checkpoint_map.lock().unwrap(); - let serum_checkpoint_map = serum_checkpoint_map.lock().unwrap(); let checkpoint = checkpoint_map.get(&market_id); match checkpoint { Some(checkpoint) => { @@ -219,17 +211,8 @@ fn handle_commands( )) .unwrap(); } - None => match serum_checkpoint_map.get(&market_id) { - Some(checkpoint) => { - peer.sender - .unbounded_send(Message::Text( - serde_json::to_string(&checkpoint).unwrap(), - )) - .unwrap(); - } - None => info!("no checkpoint available on client subscription"), - }, - } + None => info!("no checkpoint available on client subscription"), + }; } } Ok(Command::Unsubscribe(cmd)) => { @@ -393,11 +376,9 @@ async fn main() -> anyhow::Result<()> { .await?; let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new())); - let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new())); let peers = PeerMap::new(Mutex::new(HashMap::new())); let checkpoints_ref_thread = checkpoints.clone(); - let serum_checkpoints_ref_thread = serum_checkpoints.clone(); let peers_ref_thread = peers.clone(); let peers_ref_thread1 = peers.clone(); @@ -408,7 +389,7 @@ async fn main() -> anyhow::Result<()> { let message = fill_receiver.recv().await.unwrap(); match message { FillEventFilterMessage::Update(update) => { - debug!("ws update {} {:?} fill", update.market, update.status); + debug!("ws update {} {:?} {:?} fill", update.market, update.status, update.event.event_type); let mut peer_copy = peers_ref_thread.lock().unwrap().clone(); for (addr, peer) in peer_copy.iter_mut() { let json = serde_json::to_string(&update).unwrap(); @@ -428,27 +409,6 @@ async fn main() -> anyhow::Result<()> { .unwrap() .insert(checkpoint.queue.clone(), checkpoint); } - FillEventFilterMessage::SerumUpdate(update) => { - debug!("ws update {} {:?} serum fill", update.market, update.status); - let mut peers_copy = peers_ref_thread.lock().unwrap().clone(); - for (addr, peer) in peers_copy.iter_mut() { - let json = serde_json::to_string(&update).unwrap(); - - // only send updates if the peer is subscribed - if peer.subscriptions.contains(&update.market) { - let result = peer.sender.send(Message::Text(json)).await; - if result.is_err() { - error!("ws update {} fill could not reach {}", update.market, addr); - } - } - } - } - FillEventFilterMessage::SerumCheckpoint(checkpoint) => { - serum_checkpoints_ref_thread - .lock() - .unwrap() - .insert(checkpoint.queue.clone(), checkpoint); - } } } }); @@ -462,7 +422,6 @@ async fn main() -> anyhow::Result<()> { while let Ok((stream, addr)) = listener.accept().await { tokio::spawn(handle_connection_error( checkpoints.clone(), - serum_checkpoints.clone(), peers.clone(), market_pubkey_strings.clone(), stream,
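Patches 3-5 replace the perp- and serum-specific updates with the unified FillEvent schema, in which every perp fill fans out into two feed events, one per counterparty, with the maker placed on the side opposite the taker. A self-contained sketch of that side flip (Side here is a local stand-in for the crate's enum, not the crate's type):

    #[derive(Clone, Copy, Debug, PartialEq)]
    enum Side { Bid, Ask }

    // FillEvent::new_from_perp emits two events per fill: the taker keeps
    // the event's taker_side, the maker gets the opposite side.
    fn maker_side(taker_side: Side) -> Side {
        match taker_side {
            Side::Bid => Side::Ask,
            Side::Ask => Side::Bid,
        }
    }

    fn main() {
        assert_eq!(maker_side(Side::Bid), Side::Ask); // taker buy -> maker sell
        assert_eq!(maker_side(Side::Ask), Side::Bid); // taker sell -> maker buy
    }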