diff --git a/Cargo.lock b/Cargo.lock
index f099d04..c7bc001 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2576,6 +2576,27 @@
 version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc"
 
+[[package]]
+name = "jemalloc-sys"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0d3b9f3f5c9b31aa0f5ed3260385ac205db665baa41d49bb8338008ae94ede45"
+dependencies = [
+ "cc",
+ "fs_extra",
+ "libc",
+]
+
+[[package]]
+name = "jemallocator"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "43ae63fcfc45e99ab3d1b29a46782ad679e98436c3169d15a167a1108a724b69"
+dependencies = [
+ "jemalloc-sys",
+ "libc",
+]
+
 [[package]]
 name = "jobserver"
 version = "0.1.25"
@@ -5150,6 +5171,7 @@ dependencies = [
  "client",
  "futures-channel",
  "futures-util",
+ "jemallocator",
  "log 0.4.17",
  "mango-v4",
  "serde",
@@ -5749,6 +5771,7 @@ dependencies = [
  "futures-core",
  "futures-util",
  "itertools 0.10.5",
+ "jemallocator",
  "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "jsonrpc-core-client",
  "log 0.4.17",
diff --git a/lib/src/chain_data.rs b/lib/src/chain_data.rs
index eaa770f..ccdd543 100644
--- a/lib/src/chain_data.rs
+++ b/lib/src/chain_data.rs
@@ -1,5 +1,9 @@
+use crate::metrics::{MetricType, MetricU64, Metrics};
+
 use {
-    solana_sdk::account::{AccountSharedData, ReadableAccount}, solana_sdk::pubkey::Pubkey, std::collections::HashMap,
+    solana_sdk::account::{AccountSharedData, ReadableAccount},
+    solana_sdk::pubkey::Pubkey,
+    std::collections::HashMap,
 };
 
 #[derive(Clone, Copy, Debug, PartialEq)]
@@ -37,10 +41,13 @@ pub struct ChainData {
     newest_processed_slot: u64,
     account_versions_stored: usize,
     account_bytes_stored: usize,
+    metric_accounts_stored: MetricU64,
+    metric_account_versions_stored: MetricU64,
+    metric_account_bytes_stored: MetricU64,
 }
 
 impl ChainData {
-    pub fn new() -> Self {
+    pub fn new(metrics_sender: Metrics) -> Self {
         Self {
             slots: HashMap::new(),
             accounts: HashMap::new(),
@@ -48,6 +55,18 @@ impl ChainData {
             newest_processed_slot: 0,
             account_versions_stored: 0,
             account_bytes_stored: 0,
+            metric_accounts_stored: metrics_sender.register_u64(
+                "fills_feed_chaindata_accounts_stored".into(),
+                MetricType::Gauge,
+            ),
+            metric_account_versions_stored: metrics_sender.register_u64(
+                "fills_feed_chaindata_account_versions_stored".into(),
+                MetricType::Gauge,
+            ),
+            metric_account_bytes_stored: metrics_sender.register_u64(
+                "fills_feed_chaindata_account_bytes_stored".into(),
+                MetricType::Gauge,
+            ),
         }
     }
 
@@ -128,15 +147,21 @@
                 writes
                     .retain(|w| w.slot == newest_rooted_write || w.slot > self.newest_rooted_slot);
                 self.account_versions_stored += writes.len();
-                self.account_bytes_stored += writes.iter().map(|w| w.account.data().len()).fold(0, |acc, l| acc + l)
+                self.account_bytes_stored += writes
+                    .iter()
+                    .map(|w| w.account.data().len())
+                    .fold(0, |acc, l| acc + l)
             }
 
             // now it's fine to drop any slots before the new rooted head
             // as account writes for non-rooted slots before it have been dropped
            self.slots.retain(|s, _| *s >= self.newest_rooted_slot);
         }
 
-        // TODO: move this to prom
-        println!("[chain_data] account_versions_stored = {} account_bytes_stored = {}", self.account_versions_stored, self.account_bytes_stored);
+        self.metric_accounts_stored.set(self.accounts.len() as u64);
+        self.metric_account_versions_stored
+            .set(self.account_versions_stored as u64);
+        self.metric_account_bytes_stored
+            .set(self.account_bytes_stored as u64);
     }
 }
diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs
index 9508fdc..b108166 100644
--- a/lib/src/fill_event_filter.rs
+++ b/lib/src/fill_event_filter.rs
@@ -355,77 +355,77 @@ fn publish_changes_serum(
                     continue;
                 }
 
-                // match old_event_view {
-                //     EventView::Fill { .. } => {
-                //         // every already published event is recorded in checkpoint
-                //         checkpoint.push(events[idx]);
-                //     }
-                //     EventView::Out { .. } => {
-                //         debug!(
-                //             "found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
-                //             mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
-                //         );
+                match old_event_view {
+                    EventView::Fill { .. } => {
+                        // every already published event is recorded in checkpoint
+                        checkpoint.push(events[idx]);
+                    }
+                    EventView::Out { .. } => {
+                        debug!(
+                            "found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
+                            mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
+                        );
 
-                //         metric_events_change.increment();
+                        metric_events_change.increment();
 
-                //         // first revoke old event
-                //         fill_update_sender
-                //             .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
-                //                 slot,
-                //                 write_version,
-                //                 event: old_events[idx],
-                //                 status: FillUpdateStatus::Revoke,
-                //                 market: mkt_pk_string.clone(),
-                //                 queue: evq_pk_string.clone(),
-                //             }))
-                //             .unwrap(); // TODO: use anyhow to bubble up error
+                        // first revoke old event
+                        fill_update_sender
+                            .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
+                                slot,
+                                write_version,
+                                event: old_events[idx],
+                                status: FillUpdateStatus::Revoke,
+                                market: mkt_pk_string.clone(),
+                                queue: evq_pk_string.clone(),
+                            }))
+                            .unwrap(); // TODO: use anyhow to bubble up error
 
-                //         // then publish new if its a fill and record in checkpoint
-                //         fill_update_sender
-                //             .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
-                //                 slot,
-                //                 write_version,
-                //                 event: events[idx],
-                //                 status: FillUpdateStatus::New,
-                //                 market: mkt_pk_string.clone(),
-                //                 queue: evq_pk_string.clone(),
-                //             }))
-                //             .unwrap(); // TODO: use anyhow to bubble up error
-                //         checkpoint.push(events[idx]);
-                //     }
-                // }
+                        // then publish new if it's a fill and record in checkpoint
+                        fill_update_sender
+                            .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
+                                slot,
+                                write_version,
+                                event: events[idx],
+                                status: FillUpdateStatus::New,
+                                market: mkt_pk_string.clone(),
+                                queue: evq_pk_string.clone(),
+                            }))
+                            .unwrap(); // TODO: use anyhow to bubble up error
+                        checkpoint.push(events[idx]);
+                    }
+                }
             }
             _ => continue,
         }
     }
 
-    // // in case queue size shrunk due to a fork we need revoke all previous fills
-    // for seq_num in header_seq_num..old_seq_num {
-    //     let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
-    //     let old_event_view = old_events[idx].as_view().unwrap();
-    //     debug!(
-    //         "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
-    //         mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
-    //     );
+    // in case queue size shrunk due to a fork we need to revoke all previous fills
+    for seq_num in header_seq_num..old_seq_num {
+        let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
+        let old_event_view = old_events[idx].as_view().unwrap();
+        debug!(
+            "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
+            mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
+        );
 
-    //     metric_events_drop.increment();
+        metric_events_drop.increment();
 
-    //     match old_event_view {
-    //         EventView::Fill { .. } => {
-    //             fill_update_sender
-    //                 .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
-    //                     slot,
-    //                     event: old_events[idx],
-    //                     write_version,
-    //                     status: FillUpdateStatus::Revoke,
-    //                     market: mkt_pk_string.clone(),
-    //                     queue: evq_pk_string.clone(),
-    //                 }))
-    //                 .unwrap(); // TODO: use anyhow to bubble up error
-    //         }
-    //         EventView::Out { .. } => { continue }
-    //     }
-    // }
+        match old_event_view {
+            EventView::Fill { .. } => {
+                fill_update_sender
+                    .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
+                        slot,
+                        event: old_events[idx],
+                        write_version,
+                        status: FillUpdateStatus::Revoke,
+                        market: mkt_pk_string.clone(),
+                        queue: evq_pk_string.clone(),
+                    }))
+                    .unwrap(); // TODO: use anyhow to bubble up error
+            }
+            EventView::Out { .. } => continue,
+        }
+    }
 
     fill_update_sender
         .try_send(FillEventFilterMessage::SerumCheckpoint(
@@ -474,7 +474,7 @@ pub async fn init(
 
     let account_write_queue_receiver_c = account_write_queue_receiver.clone();
 
-    let mut chain_cache = ChainData::new();
+    let mut chain_cache = ChainData::new(metrics_sender);
     let mut perp_events_cache: HashMap<String, [AnyEvent; MAX_NUM_EVENTS]> = HashMap::new();
     let mut serum_events_cache: HashMap<String, Vec<serum_dex::state::Event>> = HashMap::new();
     let mut seq_num_cache = HashMap::new();
diff --git a/lib/src/grpc_plugin_source.rs b/lib/src/grpc_plugin_source.rs
index 5aa432f..864981d 100644
--- a/lib/src/grpc_plugin_source.rs
+++ b/lib/src/grpc_plugin_source.rs
@@ -3,16 +3,17 @@
 use geyser::geyser_client::GeyserClient;
 use jsonrpc_core::futures::StreamExt;
 use jsonrpc_core_client::transports::http;
-use solana_account_decoder::UiAccountEncoding;
+use solana_account_decoder::{UiAccount, UiAccountEncoding};
 use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
-use solana_client::rpc_response::{OptionalContext, Response, RpcKeyedAccount};
-use solana_rpc::{rpc::rpc_accounts::AccountsDataClient};
+use solana_client::rpc_response::{OptionalContext, RpcKeyedAccount};
+use solana_rpc::rpc::rpc_accounts::AccountsDataClient;
 use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
 
 use futures::{future, future::FutureExt};
 use tonic::{
-    metadata::MetadataValue, Request,
-    transport::{Channel, Certificate, Identity, ClientTlsConfig},
+    metadata::MetadataValue,
+    transport::{Certificate, Channel, ClientTlsConfig, Identity},
+    Request,
 };
 
 use log::*;
@@ -42,16 +43,18 @@ use crate::{
 //use solana_geyser_connector_plugin_grpc::compression::zstd_decompress;
 
-type SnapshotData = Response<Vec<RpcKeyedAccount>>;
-
+struct SnapshotData {
+    slot: u64,
+    accounts: Vec<(String, Option<UiAccount>)>,
+}
 enum Message {
     GrpcUpdate(geyser::SubscribeUpdate),
     Snapshot(SnapshotData),
 }
 
-async fn get_snapshot(
+async fn get_snapshot_gpa(
     rpc_http_url: String,
-    program_id: Pubkey,
+    program_id: String,
 ) -> anyhow::Result<OptionalContext<Vec<RpcKeyedAccount>>> {
     let rpc_client = http::connect_with_options::<AccountsDataClient>(&rpc_http_url, true)
         .await
         .map_err_anyhow()?;
@@ -71,16 +74,37 @@
     info!("requesting snapshot {}", program_id);
     let account_snapshot = rpc_client
-        .get_program_accounts(
-            program_id.to_string(),
-            Some(program_accounts_config.clone()),
-        )
+        .get_program_accounts(program_id.clone(), Some(program_accounts_config.clone()))
         .await
         .map_err_anyhow()?;
     info!("snapshot received {}", program_id);
     Ok(account_snapshot)
 }
 
+async fn get_snapshot_gma(
+    rpc_http_url: String,
+    ids: Vec<String>,
+) -> anyhow::Result<Response<Vec<Option<UiAccount>>>> {
+    let rpc_client = http::connect_with_options::<AccountsDataClient>(&rpc_http_url, true)
+        .await
+        .map_err_anyhow()?;
+
+    let account_info_config = RpcAccountInfoConfig {
+        encoding: Some(UiAccountEncoding::Base64),
+        commitment: Some(CommitmentConfig::finalized()),
+        data_slice: None,
+        min_context_slot: None,
+    };
+
+    info!("requesting snapshot {:?}", ids);
+    let account_snapshot = rpc_client
+        .get_multiple_accounts(ids.clone(), Some(account_info_config))
+        .await
+        .map_err_anyhow()?;
+    info!("snapshot received {:?}", ids);
+    Ok(account_snapshot)
+}
+
 async fn feed_data_geyser(
     grpc_config: &GrpcSourceConfig,
     tls_config: Option<ClientTlsConfig>,
@@ -107,25 +131,27 @@
     }
     .connect()
     .await?;
-    let token: MetadataValue<_> = "dbbf36253d0b2e6a85618a4ef2fa".parse()?;
+    let token: MetadataValue<_> = "eed31807f710e4bb098779fb9f67".parse()?;
     let mut client = GeyserClient::with_interceptor(channel, move |mut req: Request<()>| {
         req.metadata_mut().insert("x-token", token.clone());
         Ok(req)
     });
 
+    // If account_ids are provided, the snapshot is fetched via gMA. If only program_ids are given, only the first id is snapshotted via gPA.
+    // TODO: handle this better
+    if filter_config.program_ids.len() > 1 {
+        warn!("only one program id is supported for gPA snapshots")
+    }
     let mut accounts = HashMap::new();
     accounts.insert(
         "client".to_owned(),
         SubscribeRequestFilterAccounts {
-            account: Vec::new(),
+            account: filter_config.account_ids.clone(),
             owner: filter_config.program_ids.clone(),
         },
     );
     let mut slots = HashMap::new();
-    slots.insert(
-        "client".to_owned(),
-        SubscribeRequestFilterSlots {},
-    );
+    slots.insert("client".to_owned(), SubscribeRequestFilterSlots {});
     let blocks = HashMap::new();
     let transactions = HashMap::new();
 
@@ -171,7 +197,8 @@
     // which will have "finalized" commitment.
     let mut rooted_to_finalized_slots = 30;
 
-    let mut snapshot_future = future::Fuse::terminated();
+    let mut snapshot_gma = future::Fuse::terminated();
+    let mut snapshot_gpa = future::Fuse::terminated();
 
     // The plugin sends a ping every 5s or so
     let fatal_idle_timeout = Duration::from_secs(60);
@@ -214,10 +241,13 @@
                     // drop data for slots that are well beyond rooted
                     slot_pubkey_writes.retain(|&k, _| k >= max_rooted_slot - max_out_of_order_slots);
                 }
+
                 if snapshot_needed && max_rooted_slot - rooted_to_finalized_slots > first_full_slot {
                     snapshot_needed = false;
-                    for program_id in filter_config.program_ids.clone() {
-                        snapshot_future = tokio::spawn(get_snapshot(rpc_http_url.clone(), Pubkey::from_str(&program_id).unwrap())).fuse();
+                    if filter_config.account_ids.len() > 0 {
+                        snapshot_gma = tokio::spawn(get_snapshot_gma(rpc_http_url.clone(), filter_config.account_ids.clone())).fuse();
+                    } else if filter_config.program_ids.len() > 0 {
+                        snapshot_gpa = tokio::spawn(get_snapshot_gpa(rpc_http_url.clone(), filter_config.program_ids[0].clone())).fuse();
                     }
                 }
             }
@@ -243,7 +273,7 @@
                             continue;
                         },
                     };
-                    
+
                     let pubkey_bytes = Pubkey::new(&write.pubkey).to_bytes();
                     let write_version_mapping = pubkey_writes.entry(pubkey_bytes).or_insert(WriteVersion {
                         global: write.write_version,
@@ -266,13 +296,43 @@
                 }
                 sender.send(Message::GrpcUpdate(update)).await.expect("send success");
             },
-            snapshot = &mut snapshot_future => {
+            snapshot = &mut snapshot_gma => {
+                let snapshot = snapshot??;
+                info!("snapshot is for slot {}, first full slot was {}", snapshot.context.slot, first_full_slot);
+                if snapshot.context.slot >= first_full_slot {
+                    let accounts: Vec<(String, Option<UiAccount>)> = filter_config.account_ids.iter().zip(snapshot.value).map(|x| (x.0.clone(), x.1)).collect();
+                    sender
+                        .send(Message::Snapshot(SnapshotData {
+                            accounts,
+                            slot: snapshot.context.slot,
+                        }))
+                        .await
+ .expect("send success"); + } else { + info!( + "snapshot is too old: has slot {}, expected {} minimum", + snapshot.context.slot, + first_full_slot + ); + // try again in another 10 slots + snapshot_needed = true; + rooted_to_finalized_slots += 10; + } + }, + snapshot = &mut snapshot_gpa => { let snapshot = snapshot??; if let OptionalContext::Context(snapshot_data) = snapshot { info!("snapshot is for slot {}, first full slot was {}", snapshot_data.context.slot, first_full_slot); if snapshot_data.context.slot >= first_full_slot { + let accounts: Vec<(String, Option)> = snapshot_data.value.iter().map(|x| { + let deref = x.clone(); + (deref.pubkey, Some(deref.account)) + }).collect(); sender - .send(Message::Snapshot(snapshot_data)) + .send(Message::Snapshot(SnapshotData { + accounts, + slot: snapshot_data.context.slot, + })) .await .expect("send success"); } else { @@ -411,7 +471,7 @@ pub async fn process_events( loop { metric_dedup_queue.set(msg_receiver.len() as u64); let msg = msg_receiver.recv().await.expect("sender must not close"); - use geyser::{subscribe_update::UpdateOneof}; + use geyser::subscribe_update::UpdateOneof; match msg { Message::GrpcUpdate(update) => { match update.update_oneof.expect("invalid grpc") { @@ -421,7 +481,7 @@ pub async fn process_events( None => { // TODO: handle error continue; - }, + } }; assert!(update.pubkey.len() == 32); assert!(update.owner.len() == 32); @@ -460,11 +520,12 @@ pub async fn process_events( metric_slot_updates.increment(); metric_slot_queue.set(slot_queue_sender.len() as u64); - let status = SubscribeUpdateSlotStatus::from_i32(update.status).map(|v| match v { - SubscribeUpdateSlotStatus::Processed => SlotStatus::Processed, - SubscribeUpdateSlotStatus::Confirmed => SlotStatus::Confirmed, - SubscribeUpdateSlotStatus::Finalized => SlotStatus::Rooted, - }); + let status = + SubscribeUpdateSlotStatus::from_i32(update.status).map(|v| match v { + SubscribeUpdateSlotStatus::Processed => SlotStatus::Processed, + SubscribeUpdateSlotStatus::Confirmed => SlotStatus::Confirmed, + SubscribeUpdateSlotStatus::Finalized => SlotStatus::Rooted, + }); if status.is_none() { error!("unexpected slot status: {}", update.status); continue; @@ -480,24 +541,29 @@ pub async fn process_events( .await .expect("send success"); } - UpdateOneof::Block(_) => {}, - UpdateOneof::Transaction(_) => {}, + UpdateOneof::Block(_) => {} + UpdateOneof::Transaction(_) => {} } } Message::Snapshot(update) => { metric_snapshots.increment(); info!("processing snapshot..."); - for keyed_account in update.value { + for account in update.accounts.iter() { metric_snapshot_account_writes.increment(); metric_account_queue.set(account_write_queue_sender.len() as u64); - // TODO: Resnapshot on invalid data? - let account: Account = keyed_account.account.decode().unwrap(); - let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap(); - account_write_queue_sender - .send(AccountWrite::from(pubkey, update.context.slot, 0, account)) - .await - .expect("send success"); + match account { + (key, Some(ui_account)) => { + // TODO: Resnapshot on invalid data? 
+                            let pubkey = Pubkey::from_str(key).unwrap();
+                            let account: Account = ui_account.decode().unwrap();
+                            account_write_queue_sender
+                                .send(AccountWrite::from(pubkey, update.slot, 0, account))
+                                .await
+                                .expect("send success");
+                        }
+                        (key, None) => warn!("account not found {}", key),
+                    }
                 }
                 info!("processing snapshot done");
             }
diff --git a/lib/src/lib.rs b/lib/src/lib.rs
index ed77518..df30ca3 100644
--- a/lib/src/lib.rs
+++ b/lib/src/lib.rs
@@ -122,6 +122,7 @@ pub struct SourceConfig {
 #[derive(Clone, Debug, Deserialize)]
 pub struct FilterConfig {
     pub program_ids: Vec<String>,
+    pub account_ids: Vec<String>,
 }
 
 #[derive(Clone, Debug)]
diff --git a/lib/src/orderbook_filter.rs b/lib/src/orderbook_filter.rs
index 887d248..70174c7 100644
--- a/lib/src/orderbook_filter.rs
+++ b/lib/src/orderbook_filter.rs
@@ -126,18 +126,12 @@ pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, base_lot_size: i64)
     res
 }
 
-pub fn price_lots_to_ui(
-    native: i64,
-    base_decimals: u8,
-    quote_decimals: u8,
-) -> f64 {
+pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
     let decimals = base_decimals - quote_decimals;
     // let res = native as f64
     //     * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
     //         as f64;
-    let res = native as f64
-        / (10u64.pow(decimals.into()))
-        as f64;
+    let res = native as f64 / (10u64.pow(decimals.into())) as f64;
     res
 }
 
@@ -158,7 +152,6 @@ pub fn price_lots_to_ui_perp(
     res
 }
 
-
 fn publish_changes(
     slot: u64,
     write_version: u64,
@@ -172,6 +165,13 @@
 ) {
     let mut update: Vec<OrderbookLevel> = vec![];
     // push diff for levels that are no longer present
+    if current_bookside.len() != previous_bookside.len() {
+        info!(
+            "L {}",
+            current_bookside.len() as i64 - previous_bookside.len() as i64
+        )
+    }
+
     for previous_order in previous_bookside.iter() {
         let peer = current_bookside
             .iter()
@@ -179,7 +179,7 @@
 
         match peer {
             None => {
-                info!("removed level {}", previous_order[0]);
+                info!("R {} {}", previous_order[0], previous_order[1]);
                 update.push([previous_order[0], 0f64]);
             }
             _ => continue,
@@ -197,11 +197,14 @@
                 if previous_order[1] == current_order[1] {
                     continue;
                 }
-                info!("size changed {} -> {}", previous_order[1], current_order[1]);
+                info!(
+                    "C {} {} -> {}",
+                    current_order[0], previous_order[1], current_order[1]
+                );
                 update.push(current_order.clone());
             }
             None => {
-                info!("new level {},{}", current_order[0], current_order[1]);
+                info!("A {} {}", current_order[0], current_order[1]);
                 update.push(current_order.clone())
             }
         }
@@ -242,88 +245,6 @@
         metric_updates.increment();
     }
 }
 
-fn publish_changes_serum(
-    slot: u64,
-    write_version: u64,
-    mkt: &(Pubkey, MarketConfig),
-    side: OrderbookSide,
-    current_bookside: &Vec<OrderbookLevel>,
-    previous_bookside: &Vec<OrderbookLevel>,
-    maybe_other_bookside: Option<&Vec<OrderbookLevel>>,
-    orderbook_update_sender: &async_channel::Sender<OrderbookFilterMessage>,
-    metric_updates: &mut MetricU64,
-) {
-    let mut update: Vec<OrderbookLevel> = vec![];
-
-    // push diff for levels that are no longer present
-    for previous_order in previous_bookside.iter() {
-        let peer = current_bookside
-            .iter()
-            .find(|level| previous_order[0] == level[0]);
-
-        match peer {
-            None => {
-                info!("removed level s {}", previous_order[0]);
-                update.push([previous_order[0], 0f64]);
-            }
-            _ => continue,
-        }
-    }
-
-    // push diff where there's a new level or size has changed
-    for current_order in current_bookside {
-        let peer = previous_bookside
-            .iter()
-            .find(|item| item[0] == current_order[0]);
-
-        match peer {
-            Some(previous_order) => {
-                if previous_order[1] == current_order[1] {
-                    continue;
-                }
-                info!("size changed {} -> {}", previous_order[1], current_order[1]);
-                update.push(current_order.clone());
-            }
-            None => {
-                info!("new level {},{}", current_order[0], current_order[1]);
-                update.push(current_order.clone())
-            }
-        }
-    }
-
-    match maybe_other_bookside {
-        Some(other_bookside) => {
-            let (bids, asks) = match side {
-                OrderbookSide::Bid => (current_bookside, other_bookside),
-                OrderbookSide::Ask => (other_bookside, current_bookside),
-            };
-            orderbook_update_sender
-                .try_send(OrderbookFilterMessage::Checkpoint(OrderbookCheckpoint {
-                    slot,
-                    write_version,
-                    bids: bids.clone(),
-                    asks: asks.clone(),
-                    market: mkt.0.to_string(),
-                }))
-                .unwrap()
-        }
-        None => info!("other bookside not in cache"),
-    }
-
-    if update.len() > 0 {
-        orderbook_update_sender
-            .try_send(OrderbookFilterMessage::Update(OrderbookUpdate {
-                market: mkt.0.to_string(),
-                side: side.clone(),
-                update,
-                slot,
-                write_version,
-            }))
-            .unwrap(); // TODO: use anyhow to bubble up error
-        metric_updates.increment();
-    }
-}
 
 pub async fn init(
     market_configs: Vec<(Pubkey, MarketConfig)>,
     serum_market_configs: Vec<(Pubkey, MarketConfig)>,
@@ -352,7 +273,7 @@
 
     let account_write_queue_receiver_c = account_write_queue_receiver.clone();
 
-    let mut chain_cache = ChainData::new();
+    let mut chain_cache = ChainData::new(metrics_sender);
     let mut bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
     let mut serum_bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
     let mut last_write_versions = HashMap::<String, (u64, u64)>::new();
@@ -412,7 +333,7 @@
 
                     let write_version = (account_info.slot, account_info.write_version);
                     // todo: should this be <= so we don't overwrite with old data received late?
-                    if write_version == *last_write_version {
+                    if write_version <= *last_write_version {
                         continue;
                     }
                     last_write_versions.insert(side_pk_string.clone(), write_version);
@@ -492,11 +413,11 @@
 
                     let write_version = (account_info.slot, account_info.write_version);
                     // todo: should this be <= so we don't overwrite with old data received late?
-                    if write_version == *last_write_version {
+                    if write_version <= *last_write_version {
                         continue;
                     }
                     last_write_versions.insert(side_pk_string.clone(), write_version);
-
+                    info!("W {}", mkt.1.name);
                     let account = &mut account_info.account.clone();
                     let data = account.data_as_mut_slice();
                     let len = data.len();
@@ -532,7 +453,7 @@
                         serum_bookside_cache.get(&other_side_pk.to_string());
 
                     match serum_bookside_cache.get(&side_pk_string) {
-                        Some(old_bookside) => publish_changes_serum(
+                        Some(old_bookside) => publish_changes(
                             account_info.slot,
                             account_info.write_version,
                             mkt,
diff --git a/lib/src/websocket_source.rs b/lib/src/websocket_source.rs
index beb8d5b..765f21f 100644
--- a/lib/src/websocket_source.rs
+++ b/lib/src/websocket_source.rs
@@ -6,9 +6,7 @@ use solana_client::{
     rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
     rpc_response::{OptionalContext, Response, RpcKeyedAccount},
 };
-use solana_rpc::{
-    rpc::rpc_accounts::AccountsDataClient, rpc_pubsub::RpcSolPubSubClient,
-};
+use solana_rpc::{rpc::rpc_accounts::AccountsDataClient, rpc_pubsub::RpcSolPubSubClient};
 use solana_sdk::{account::Account, commitment_config::CommitmentConfig, pubkey::Pubkey};
 
 use log::*;
diff --git a/service-mango-fills/Cargo.toml b/service-mango-fills/Cargo.toml
index 2e94ff9..24fc4a0 100644
--- a/service-mango-fills/Cargo.toml
+++ b/service-mango-fills/Cargo.toml
@@ -23,6 +23,7 @@ async-trait = "0.1"
 tokio = { version = "1", features = ["full"] }
 tokio-tungstenite = "0.17"
 bytemuck = "1.7.2"
+jemallocator = "0.3.2"
 
 mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
 client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs
index a41d288..f4c7f08 100644
--- a/service-mango-fills/src/main.rs
+++ b/service-mango-fills/src/main.rs
@@ -43,6 +43,11 @@ type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
 type SerumCheckpointMap = Arc<Mutex<HashMap<String, SerumFillCheckpoint>>>;
 type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;
 
+// jemalloc seems to be better at keeping the memory footprint reasonable over
+// longer periods of time
+#[global_allocator]
+static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
+
 #[derive(Clone, Debug, Deserialize)]
 #[serde(tag = "command")]
 pub enum Command {
@@ -127,27 +132,25 @@
         );
     }
 
-    let receive_commands = ws_rx.try_for_each(|msg| {
-        match msg {
-            Message::Text(_) => {
-                handle_commands(
-                    addr,
-                    msg,
-                    peer_map.clone(),
-                    checkpoint_map.clone(),
-                    serum_checkpoint_map.clone(),
-                    market_ids.clone(),
-                )
-            },
-            Message::Ping(_) => {
-                let peers = peer_map.clone();
-                let mut peers_lock = peers.lock().unwrap();
-                let peer = peers_lock.get_mut(&addr).expect("peer should be in map");
-                peer.sender.unbounded_send(Message::Pong(Vec::new())).unwrap();
-                future::ready(Ok(()))
-            }
-            _ => future::ready(Ok(())),
+    let receive_commands = ws_rx.try_for_each(|msg| match msg {
+        Message::Text(_) => handle_commands(
+            addr,
+            msg,
+            peer_map.clone(),
+            checkpoint_map.clone(),
+            serum_checkpoint_map.clone(),
+            market_ids.clone(),
+        ),
+        Message::Ping(_) => {
+            let peers = peer_map.clone();
+            let mut peers_lock = peers.lock().unwrap();
+            let peer = peers_lock.get_mut(&addr).expect("peer should be in map");
+            peer.sender
+                .unbounded_send(Message::Pong(Vec::new()))
+                .unwrap();
+            future::ready(Ok(()))
         }
+        _ => future::ready(Ok(())),
     });
 
     let forward_updates = chan_rx.map(Ok).forward(ws_tx);
@@ -454,40 +457,40 @@ async fn main() -> anyhow::Result<()> {
     let try_socket = TcpListener::bind(&config.bind_ws_addr).await;
     let listener = try_socket.expect("Failed to bind");
     {
-            tokio::spawn(async move {
-                // Let's spawn the handling of each connection in a separate task.
-                while let Ok((stream, addr)) = listener.accept().await {
-                    tokio::spawn(handle_connection_error(
-                        checkpoints.clone(),
-                        serum_checkpoints.clone(),
-                        peers.clone(),
-                        market_pubkey_strings.clone(),
-                        stream,
-                        addr,
-                        metrics_opened_connections.clone(),
-                        metrics_closed_connections.clone(),
-                    ));
-                }
-            });
+        tokio::spawn(async move {
+            // Let's spawn the handling of each connection in a separate task.
+            while let Ok((stream, addr)) = listener.accept().await {
+                tokio::spawn(handle_connection_error(
+                    checkpoints.clone(),
+                    serum_checkpoints.clone(),
+                    peers.clone(),
+                    market_pubkey_strings.clone(),
+                    stream,
+                    addr,
+                    metrics_opened_connections.clone(),
+                    metrics_closed_connections.clone(),
+                ));
+            }
+        });
     }
 
     // keepalive
     {
-            tokio::spawn(async move {
-                let mut write_interval = time::interval(time::Duration::from_secs(30));
+        tokio::spawn(async move {
+            let mut write_interval = time::interval(time::Duration::from_secs(30));
 
-                loop {
-                    write_interval.tick().await;
-                    let peers_copy = peers_ref_thread1.lock().unwrap().clone();
-                    for (addr, peer) in peers_copy.iter() {
-                        let pl = Vec::new();
-                        let result = peer.clone().sender.send(Message::Ping(pl)).await;
-                        if result.is_err() {
-                            error!("ws ping could not reach {}", addr);
+            loop {
+                write_interval.tick().await;
+                let peers_copy = peers_ref_thread1.lock().unwrap().clone();
+                for (addr, peer) in peers_copy.iter() {
+                    let pl = Vec::new();
+                    let result = peer.clone().sender.send(Message::Ping(pl)).await;
+                    if result.is_err() {
+                        error!("ws ping could not reach {}", addr);
+                    }
                 }
             }
-            }
-        });
+        });
     }
     info!(
         "rpc connect: {}",
@@ -499,11 +502,11 @@
             .collect::<String>()
     );
     let use_geyser = true;
+    let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
+    let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect();
     let filter_config = FilterConfig {
-        program_ids: vec![
-            "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
-            "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
-        ],
+        program_ids: vec![],
+        account_ids: relevant_pubkeys,
     };
     if use_geyser {
         grpc_plugin_source::process_events(
diff --git a/service-mango-orderbook/src/main.rs b/service-mango-orderbook/src/main.rs
index 838da03..ee5b280 100644
--- a/service-mango-orderbook/src/main.rs
+++ b/service-mango-orderbook/src/main.rs
@@ -285,19 +285,23 @@
         &Keypair::new(),
         Some(rpc_timeout),
     );
-    let group_context = Arc::new(MangoGroupContext::new_from_rpc(
-        &client.rpc_async(),
-        Pubkey::from_str(&config.mango_group).unwrap(),
-    ).await?);
+    let group_context = Arc::new(
+        MangoGroupContext::new_from_rpc(
+            &client.rpc_async(),
+            Pubkey::from_str(&config.mango_group).unwrap(),
+        )
+        .await?,
+    );
 
     // todo: reload markets at intervals
     let market_configs: Vec<(Pubkey, MarketConfig)> = group_context
         .perp_markets
         .iter()
         .map(|(_, context)| {
-            let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index) {
+            let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index)
+            {
                 Some(token) => token.decimals,
-                None => panic!("token not found for market") // todo: default to 6 for usdc?
+                None => panic!("token not found for market"), // todo: default to 6 for usdc?
             };
             (
                 context.address,
@@ -320,11 +324,11 @@
         .map(|(_, context)| {
             let base_decimals = match group_context.tokens.get(&context.market.base_token_index) {
                 Some(token) => token.decimals,
-                None => panic!("token not found for market") // todo: default?
+                None => panic!("token not found for market"), // todo: default?
             };
             let quote_decimals = match group_context.tokens.get(&context.market.quote_token_index) {
                 Some(token) => token.decimals,
-                None => panic!("token not found for market") // todo: default to 6 for usdc?
+                None => panic!("token not found for market"), // todo: default to 6 for usdc?
             };
             (
                 context.market.serum_market_external,
@@ -341,13 +345,14 @@
         })
         .collect();
 
-    let market_pubkey_strings: HashMap<String, String> = [market_configs.clone(), serum_market_configs.clone()]
-        .concat()
-        .iter()
-        .map(|market| (market.0.to_string(), market.1.name.clone()))
-        .collect::<Vec<(String, String)>>()
-        .into_iter()
-        .collect();
+    let market_pubkey_strings: HashMap<String, String> =
+        [market_configs.clone(), serum_market_configs.clone()]
+            .concat()
+            .iter()
+            .map(|market| (market.0.to_string(), market.1.name.clone()))
+            .collect::<Vec<(String, String)>>()
+            .into_iter()
+            .collect();
 
     let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) =
         orderbook_filter::init(market_configs, serum_market_configs, metrics_tx.clone()).await?;
@@ -422,6 +427,7 @@
             "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
             "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
         ],
+        account_ids: vec![],
     };
     grpc_plugin_source::process_events(
         &config.source,
diff --git a/service-mango-pnl/src/main.rs b/service-mango-pnl/src/main.rs
index 1e36db6..b6a68e4 100644
--- a/service-mango-pnl/src/main.rs
+++ b/service-mango-pnl/src/main.rs
@@ -119,7 +119,9 @@ fn start_pnl_updater(
             }
 
             let pnl_vals =
-                compute_pnl(context.clone(), account_fetcher.clone(), &mango_account).await.unwrap();
+                compute_pnl(context.clone(), account_fetcher.clone(), &mango_account)
+                    .await
+                    .unwrap();
 
             // Alternatively, we could prepare the sorted and limited lists for each
             // market here. That would be faster and cause less contention on the pnl_data
@@ -247,10 +249,13 @@
         &Keypair::new(),
         Some(rpc_timeout),
     );
-    let group_context = Arc::new(MangoGroupContext::new_from_rpc(
-        &client.rpc_async(),
-        Pubkey::from_str(&config.pnl.mango_group).unwrap(),
-    ).await?);
+    let group_context = Arc::new(
+        MangoGroupContext::new_from_rpc(
+            &client.rpc_async(),
+            Pubkey::from_str(&config.pnl.mango_group).unwrap(),
+        )
+        .await?,
+    );
     let chain_data = Arc::new(RwLock::new(chain_data::ChainData::new()));
     let account_fetcher = Arc::new(chain_data::AccountFetcher {
         chain_data: chain_data.clone(),
@@ -265,7 +270,7 @@
         metrics_tx.register_u64("pnl_jsonrpc_reqs_invalid_total".into(), MetricType::Counter);
     let metrics_pnls_tracked =
         metrics_tx.register_u64("pnl_num_tracked".into(), MetricType::Gauge);
 
-    let chain_data = Arc::new(RwLock::new(ChainData::new()));
+    let chain_data = Arc::new(RwLock::new(ChainData::new(metrics_tx.clone())));
     let pnl_data = Arc::new(RwLock::new(PnlData::new()));
 
     start_pnl_updater(
@@ -288,9 +293,8 @@
     // start filling chain_data from the grpc plugin source
     let (account_write_queue_sender, slot_queue_sender) = memory_target::init(chain_data).await?;
     let filter_config = FilterConfig {
-        program_ids: vec![
-            "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
-        ],
+        program_ids: vec!["4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into()],
+        account_ids: vec![],
     };
     grpc_plugin_source::process_events(
         &config.source,