diff --git a/lib/src/chain_data.rs b/lib/src/chain_data.rs index f4a3888..901b28b 100644 --- a/lib/src/chain_data.rs +++ b/lib/src/chain_data.rs @@ -55,18 +55,14 @@ impl ChainData { newest_processed_slot: 0, account_versions_stored: 0, account_bytes_stored: 0, - metric_accounts_stored: metrics_sender.register_u64( - "chaindata_accounts_stored".into(), - MetricType::Gauge, - ), + metric_accounts_stored: metrics_sender + .register_u64("chaindata_accounts_stored".into(), MetricType::Gauge), metric_account_versions_stored: metrics_sender.register_u64( "chaindata_account_versions_stored".into(), MetricType::Gauge, ), - metric_account_bytes_stored: metrics_sender.register_u64( - "chaindata_account_bytes_stored".into(), - MetricType::Gauge, - ), + metric_account_bytes_stored: metrics_sender + .register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge), } } diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs index 6d5c9bb..696fc4b 100644 --- a/lib/src/fill_event_filter.rs +++ b/lib/src/fill_event_filter.rs @@ -5,7 +5,7 @@ use crate::{ AccountWrite, SlotUpdate, }; use bytemuck::{cast_slice, Pod, Zeroable}; -use chrono::{Utc, TimeZone}; +use chrono::{TimeZone, Utc}; use log::*; use serde::{ser::SerializeStruct, Serialize, Serializer}; use serum_dex::state::EventView as SpotEvent; @@ -93,7 +93,12 @@ impl Serialize for FillEvent { state.serialize_field("eventType", &self.event_type)?; state.serialize_field("maker", &self.maker)?; state.serialize_field("side", &self.side)?; - state.serialize_field("timestamp", &Utc.timestamp_opt(self.timestamp as i64, 0).unwrap().to_rfc3339())?; + state.serialize_field( + "timestamp", + &Utc.timestamp_opt(self.timestamp as i64, 0) + .unwrap() + .to_rfc3339(), + )?; state.serialize_field("seqNum", &self.seq_num)?; state.serialize_field("owner", &self.owner)?; state.serialize_field("orderId", &self.order_id)?; @@ -191,7 +196,7 @@ impl FillEvent { } else { native_qty_paid - native_fee_or_rebate }; - + let top = price_before_fees * base_multiplier; let bottom = quote_multiplier * native_qty_received; let price = top as f64 / bottom as f64; @@ -213,8 +218,7 @@ impl FillEvent { } }; - let fee = - native_fee_or_rebate as f32 / quote_multiplier as f32; + let fee = native_fee_or_rebate as f32 / quote_multiplier as f32; FillEvent { event_type: FillEventType::Spot, diff --git a/lib/src/fill_event_postgres_target.rs b/lib/src/fill_event_postgres_target.rs index 3af9163..ca351df 100644 --- a/lib/src/fill_event_postgres_target.rs +++ b/lib/src/fill_event_postgres_target.rs @@ -18,7 +18,7 @@ async fn postgres_connection( // openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks // base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64 // fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills - // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills + // fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills info!("making tls config"); let tls = match &config.tls { Some(tls) => { diff --git a/lib/src/lib.rs b/lib/src/lib.rs index f978ac0..ae9b197 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -5,7 +5,6 @@ pub mod grpc_plugin_source; pub mod memory_target; pub mod metrics; pub mod orderbook_filter; -pub mod postgres_target; pub mod postgres_types_numeric; pub mod websocket_source; diff --git a/lib/src/orderbook_filter.rs b/lib/src/orderbook_filter.rs index e2856fd..f9d5a86 100644 --- a/lib/src/orderbook_filter.rs +++ b/lib/src/orderbook_filter.rs @@ -1,10 +1,16 @@ +use crate::metrics::MetricU64; use crate::{ chain_data::{AccountData, ChainData, SlotData}, metrics::{MetricType, Metrics}, AccountWrite, SlotUpdate, }; +use anchor_lang::AccountDeserialize; use itertools::Itertools; use log::*; +use mango_v4::{ + serum3_cpi::OrderBookStateHeader, + state::{BookSide, OrderTreeType}, +}; use serde::{ser::SerializeStruct, Serialize, Serializer}; use serum_dex::critbit::Slab; use solana_sdk::{ @@ -18,12 +24,6 @@ use std::{ mem::size_of, time::{SystemTime, UNIX_EPOCH}, }; -use crate::metrics::MetricU64; -use anchor_lang::AccountDeserialize; -use mango_v4::{ - serum3_cpi::OrderBookStateHeader, - state::{BookSide, OrderTreeType}, -}; #[derive(Clone, Debug)] pub enum OrderbookSide { @@ -126,9 +126,15 @@ pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f native as f64 / (10u64.pow(decimals.into())) as f64 } -pub fn spot_price_to_ui(native: i64, native_size: i64, base_decimals: u8, quote_decimals: u8) -> f64 { +pub fn spot_price_to_ui( + native: i64, + native_size: i64, + base_decimals: u8, + quote_decimals: u8, +) -> f64 { // TODO: account for fees - ((native * 10i64.pow(base_decimals.into())) / (10i64.pow(quote_decimals.into()) * native_size)) as f64 + ((native * 10i64.pow(base_decimals.into())) / (10i64.pow(quote_decimals.into()) * native_size)) + as f64 } pub fn price_lots_to_ui_perp( @@ -140,8 +146,7 @@ pub fn price_lots_to_ui_perp( ) -> f64 { let decimals = base_decimals - quote_decimals; let multiplier = 10u64.pow(decimals.into()) as f64; - native as f64 - * ((multiplier * quote_lot_size as f64) / base_lot_size as f64) + native as f64 * ((multiplier * quote_lot_size as f64) / base_lot_size as f64) } fn publish_changes( @@ -331,7 +336,8 @@ pub async fn init( debug!("evq version slot was old"); continue; } - if write_version.0 == last_write_version.0 && write_version.1 < last_write_version.1 + if write_version.0 == last_write_version.0 + && write_version.1 < last_write_version.1 { debug!("evq version slot was same and write version was old"); continue; diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs index 049b566..4b41053 100644 --- a/service-mango-fills/src/main.rs +++ b/service-mango-fills/src/main.rs @@ -28,8 +28,11 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error}; use serde::Deserialize; use solana_geyser_connector_lib::{ + fill_event_filter::FillEventType, + fill_event_postgres_target, metrics::{MetricType, MetricU64}, - FilterConfig, StatusResponse, fill_event_postgres_target, PostgresConfig, fill_event_filter::FillEventType, orderbook_filter::MarketConfig, PostgresTlsConfig, + orderbook_filter::MarketConfig, + FilterConfig, PostgresConfig, PostgresTlsConfig, StatusResponse, }; use solana_geyser_connector_lib::{ fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage}, @@ -366,7 +369,10 @@ async fn main() -> anyhow::Result<()> { .map(|(_, context)| (context.address, context.market.event_queue)) .collect(); - let spot_queue_pks: Vec<(Pubkey, Pubkey)> = spot_market_configs.iter().map(|x| (x.0, x.1.event_queue)).collect(); + let spot_queue_pks: Vec<(Pubkey, Pubkey)> = spot_market_configs + .iter() + .map(|x| (x.0, x.1.event_queue)) + .collect(); let a: Vec<(String, String)> = group_context .serum3_markets .iter() @@ -403,9 +409,10 @@ async fn main() -> anyhow::Result<()> { tls: Some(PostgresTlsConfig { ca_cert_path: "$PG_CA_CERT".to_owned(), client_key_path: "$PG_CLIENT_KEY".to_owned(), - }) + }), }; - let postgres_update_sender = fill_event_postgres_target::init(&pgconf, metrics_tx.clone()).await?; + let postgres_update_sender = + fill_event_postgres_target::init(&pgconf, metrics_tx.clone()).await?; let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init( perp_market_configs.clone(), @@ -428,7 +435,10 @@ async fn main() -> anyhow::Result<()> { let message = fill_receiver.recv().await.unwrap(); match message { FillEventFilterMessage::Update(update) => { - debug!("ws update {} {:?} {:?} fill", update.market_name, update.status, update.event.event_type); + debug!( + "ws update {} {:?} {:?} fill", + update.market_name, update.status, update.event.event_type + ); let mut peer_copy = peers_ref_thread.lock().unwrap().clone(); for (addr, peer) in peer_copy.iter_mut() { let json = serde_json::to_string(&update.clone()).unwrap(); @@ -437,7 +447,10 @@ async fn main() -> anyhow::Result<()> { if peer.subscriptions.contains(&update.market_key) { let result = peer.sender.send(Message::Text(json)).await; if result.is_err() { - error!("ws update {} fill could not reach {}", update.market_name, addr); + error!( + "ws update {} fill could not reach {}", + update.market_name, addr + ); } } } @@ -450,7 +463,7 @@ async fn main() -> anyhow::Result<()> { postgres_update_sender.send(update_c).await.unwrap(); } } - _ => warn!("failed to write spot event to db") + _ => warn!("failed to write spot event to db"), } } FillEventFilterMessage::Checkpoint(checkpoint) => { diff --git a/service-mango-orderbook/src/main.rs b/service-mango-orderbook/src/main.rs index 4182a31..18f8aff 100644 --- a/service-mango-orderbook/src/main.rs +++ b/service-mango-orderbook/src/main.rs @@ -357,7 +357,12 @@ async fn main() -> anyhow::Result<()> { .collect(); let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) = - orderbook_filter::init(market_configs.clone(), serum_market_configs.clone(), metrics_tx.clone()).await?; + orderbook_filter::init( + market_configs.clone(), + serum_market_configs.clone(), + metrics_tx.clone(), + ) + .await?; let checkpoints_ref_thread = checkpoints.clone(); let peers_ref_thread = peers.clone();