Compare commits

...

5 Commits

Author SHA1 Message Date
Riordan Panayides 4b75b88a98 cargo fmt 2023-02-03 11:49:22 +00:00
Riordan Panayides ba4aa29f7f Update deps 2023-02-03 11:45:29 +00:00
Riordan Panayides 746174ce8e Orderbook updates 2023-02-03 11:44:46 +00:00
* Subscribe to accounts rather than program
* Process out of order writes correctly
* Update native to ui helpers
Riordan Panayides 5c8ebc53b1 Fills updates 2023-02-03 11:39:53 +00:00
* Unify fill event schema
* Change fill updates json
* Convert all native values to ui
* Add fills postgres target
Riordan Panayides 6dfe88ac15 Add deployment configs 2023-02-03 11:31:18 +00:00
12 changed files with 760 additions and 947 deletions

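Of the commits above, "process out of order writes correctly" is the subtle one: both filters now compare account updates as (slot, write_version) pairs instead of using a single <= check, so a write for an older slot, or an older write within the same slot, is skipped without discarding genuinely newer data. A minimal sketch of that ordering rule (the function name is illustrative, not from the diff):

fn is_stale(incoming: (u64, u64), last_seen: (u64, u64)) -> bool {
    // (slot, write_version) pairs: a write is stale if its slot is older,
    // or if it is for the same slot but not a newer write_version
    incoming.0 < last_seen.0 || (incoming.0 == last_seen.0 && incoming.1 <= last_seen.1)
}
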
Cargo.lock (generated)

@ -604,9 +604,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.20.0"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]]
name = "base64ct"
@ -2393,7 +2393,7 @@ checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
dependencies = [
"http",
"hyper 0.14.23",
"rustls 0.20.7",
"rustls 0.20.8",
"tokio",
"tokio-rustls 0.23.4",
]
@ -3033,7 +3033,7 @@ dependencies = [
[[package]]
name = "mango-v4"
version = "0.1.0"
source = "git+https://github.com/blockworks-foundation/mango-v4?branch=dev#5019864b844ca6633ee7d1356c763d78fc52ad64"
source = "git+https://github.com/blockworks-foundation/mango-v4?branch=dev#0ba7ecd5061e666addd90e6d3a4293ec1381f9c0"
dependencies = [
"anchor-lang",
"anchor-spl",
@ -3885,6 +3885,7 @@ checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1"
dependencies = [
"array-init",
"bytes 1.3.0",
"chrono",
"fallible-iterator",
"postgres-derive",
"postgres-protocol",
@ -4215,7 +4216,7 @@ dependencies = [
"fxhash",
"quinn-proto",
"quinn-udp",
"rustls 0.20.7",
"rustls 0.20.8",
"thiserror",
"tokio",
"tracing",
@ -4232,7 +4233,7 @@ dependencies = [
"fxhash",
"rand 0.8.5",
"ring",
"rustls 0.20.7",
"rustls 0.20.8",
"rustls-native-certs",
"rustls-pemfile 0.2.1",
"slab",
@ -4605,8 +4606,8 @@ dependencies = [
"once_cell",
"percent-encoding 2.2.0",
"pin-project-lite",
"rustls 0.20.7",
"rustls-pemfile 1.0.1",
"rustls 0.20.8",
"rustls-pemfile 1.0.2",
"serde",
"serde_json",
"serde_urlencoded",
@ -4776,9 +4777,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.20.7"
version = "0.20.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
dependencies = [
"log 0.4.17",
"ring",
@ -4793,7 +4794,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
dependencies = [
"openssl-probe",
"rustls-pemfile 1.0.1",
"rustls-pemfile 1.0.2",
"schannel",
"security-framework",
]
@ -4809,11 +4810,11 @@ dependencies = [
[[package]]
name = "rustls-pemfile"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
dependencies = [
"base64 0.13.1",
"base64 0.21.0",
]
[[package]]
@ -5610,7 +5611,7 @@ dependencies = [
"rand_chacha 0.2.2",
"rayon",
"reqwest",
"rustls 0.20.7",
"rustls 0.20.8",
"semver 1.0.14",
"serde",
"serde_derive",
@ -5762,10 +5763,11 @@ dependencies = [
"async-channel",
"async-stream 0.2.1",
"async-trait",
"base64 0.20.0",
"base64 0.21.0",
"bs58 0.4.0",
"bytemuck",
"bytes 1.3.0",
"chrono",
"fixed",
"futures 0.3.25",
"futures-core",
@ -5781,6 +5783,8 @@ dependencies = [
"postgres_query",
"prost 0.9.0",
"rand 0.7.3",
"rustls 0.20.8",
"rustls-pemfile 1.0.2",
"serde",
"serde_derive",
"serde_json",
@ -5791,6 +5795,7 @@ dependencies = [
"solana-sdk",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-stream",
"tonic 0.6.2",
"tonic-build 0.6.2",
@ -6413,7 +6418,7 @@ dependencies = [
"quinn",
"rand 0.7.3",
"rcgen",
"rustls 0.20.7",
"rustls 0.20.8",
"solana-metrics",
"solana-perf",
"solana-sdk",
@ -7098,6 +7103,20 @@ dependencies = [
"tokio-util 0.7.2",
]
[[package]]
name = "tokio-postgres-rustls"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "606f2b73660439474394432239c82249c0d45eb5f23d91f401be1e33590444a7"
dependencies = [
"futures 0.3.25",
"ring",
"rustls 0.20.8",
"tokio",
"tokio-postgres",
"tokio-rustls 0.23.4",
]
[[package]]
name = "tokio-reactor"
version = "0.1.12"
@ -7134,7 +7153,7 @@ version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls 0.20.7",
"rustls 0.20.8",
"tokio",
"webpki 0.22.0",
]
@ -7193,7 +7212,7 @@ checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
dependencies = [
"futures-util",
"log 0.4.17",
"rustls 0.20.7",
"rustls 0.20.8",
"tokio",
"tokio-rustls 0.23.4",
"tungstenite",
@ -7294,7 +7313,7 @@ dependencies = [
"pin-project",
"prost 0.11.3",
"prost-derive 0.11.2",
"rustls-pemfile 1.0.1",
"rustls-pemfile 1.0.2",
"tokio",
"tokio-rustls 0.23.4",
"tokio-stream",
@ -7456,7 +7475,7 @@ dependencies = [
"httparse",
"log 0.4.17",
"rand 0.8.5",
"rustls 0.20.7",
"rustls 0.20.8",
"sha-1 0.10.1",
"thiserror",
"url 2.3.1",

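Among the lockfile bumps, base64 0.20.0 → 0.21.0 is the one that forces code changes: 0.21 removed the deprecated free functions, so decoding goes through an Engine, which is exactly what the new postgres target below does. A minimal sketch of the API change:

use base64::{engine::general_purpose, Engine as _};

fn decode_b64(raw: &str) -> Vec<u8> {
    // pre-0.21 this was base64::decode(raw).unwrap();
    general_purpose::STANDARD.decode(raw).expect("valid base64")
}

The rustls 0.20.7 → 0.20.8 and rustls-pemfile 1.0.1 → 1.0.2 bumps are routine patch releases, pulled in together with the new tokio-postgres-rustls and chrono dependencies.
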
cd/fills.toml (new file)

@ -0,0 +1,23 @@
app = "mango-fills"
kill_signal = "SIGINT"
kill_timeout = 5
[build]
dockerfile = "../Dockerfile"
[experimental]
cmd = ["service-mango-fills", "fills-config.toml"]
[[services]]
internal_port = 8080
processes = ["app"]
protocol = "tcp"
[services.concurrency]
hard_limit = 1024
soft_limit = 1024
type = "connections"
[metrics]
path = "/metrics"
port = 9091

cd/orderbook.toml (new file)

@ -0,0 +1,23 @@
app = "mango-orderbook"
kill_signal = "SIGINT"
kill_timeout = 5
[build]
dockerfile = "../Dockerfile"
[experimental]
cmd = ["service-mango-orderbook", "orderbook-config.toml"]
[[services]]
internal_port = 8082
processes = ["app"]
protocol = "tcp"
[services.concurrency]
hard_limit = 1024
soft_limit = 1024
type = "connections"
[metrics]
path = "/metrics"
port = 9091

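Both Fly configs launch the service binary with a config file as its first argument (cmd = ["service-mango-fills", "fills-config.toml"] and the orderbook equivalent). A plausible sketch of the receiving end, assuming the binaries load their Config via serde and the toml crate (the actual loader is not part of this diff):

use std::{env, fs};

fn load_config<T: serde::de::DeserializeOwned>() -> anyhow::Result<T> {
    // first CLI argument is the path passed by the Fly cmd
    let path = env::args().nth(1).expect("config path as first argument");
    Ok(toml::from_str(&fs::read_to_string(path)?)?)
}
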

@ -23,10 +23,13 @@ fixed = { version = "*", features = ["serde"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
tokio-postgres = "0.7"
postgres-types = { version = "0.2", features = ["array-impls", "derive"] }
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
tokio-postgres-rustls = "0.9.0"
postgres-types = { version = "0.2", features = ["array-impls", "derive", "with-chrono-0_4"] }
postgres-native-tls = "0.5"
native-tls = "0.2"
rustls = "0.20.8"
rustls-pemfile = "1.0.2"
# postgres_query hasn't updated its crate in a while
postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
@ -45,6 +48,7 @@ rand = "0.7"
anyhow = "1.0"
bytes = "1.0"
itertools = "0.10.5"
chrono = "0.4.23"
futures = "0.3.17"
futures-core = "0.3"

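The with-chrono-0_4 features on tokio-postgres and postgres-types are what let the fills target bind the fill timestamp directly as a TIMESTAMPTZ parameter. A minimal sketch of what the feature enables (table and column names are illustrative):

use chrono::{DateTime, TimeZone, Utc};
use tokio_postgres::Client;

async fn insert_fill_ts(client: &Client, secs: i64) -> Result<(), tokio_postgres::Error> {
    // with-chrono-0_4 gives DateTime<Utc> a ToSql impl for TIMESTAMPTZ
    let ts: DateTime<Utc> = Utc.timestamp_opt(secs, 0).unwrap();
    client
        .execute("INSERT INTO fills (fill_timestamp) VALUES ($1)", &[&ts])
        .await?;
    Ok(())
}
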

@ -55,18 +55,14 @@ impl ChainData {
newest_processed_slot: 0,
account_versions_stored: 0,
account_bytes_stored: 0,
metric_accounts_stored: metrics_sender.register_u64(
"chaindata_accounts_stored".into(),
MetricType::Gauge,
),
metric_accounts_stored: metrics_sender
.register_u64("chaindata_accounts_stored".into(), MetricType::Gauge),
metric_account_versions_stored: metrics_sender.register_u64(
"chaindata_account_versions_stored".into(),
MetricType::Gauge,
),
metric_account_bytes_stored: metrics_sender.register_u64(
"chaindata_account_bytes_stored".into(),
MetricType::Gauge,
),
metric_account_bytes_stored: metrics_sender
.register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge),
}
}

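The chain_data hunk is a pure rustfmt reflow, but the handles it registers belong to the same metrics API the rest of this diff mutates. Pieced together from the call sites visible here (a surface sketch, not the crate's actual definitions):

use crate::metrics::{MetricType, Metrics};

fn register_examples(metrics_sender: &Metrics) {
    // gauges are driven with set / set_max / increment / decrement
    let mut gauge =
        metrics_sender.register_u64("chaindata_accounts_stored".into(), MetricType::Gauge);
    gauge.set(42);
    // counters are bumped with increment / add
    let mut counter =
        metrics_sender.register_u64("fills_feed_events_new".into(), MetricType::Counter);
    counter.increment();
    counter.add(3);
}
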

@ -1,9 +1,11 @@
use crate::{
chain_data::{AccountData, ChainData, SlotData},
metrics::{MetricType, Metrics},
AccountWrite, SlotUpdate, orderbook_filter::OrderbookSide,
orderbook_filter::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide},
AccountWrite, SlotUpdate,
};
use bytemuck::{Pod, Zeroable, cast_slice};
use bytemuck::{cast_slice, Pod, Zeroable};
use chrono::{TimeZone, Utc};
use log::*;
use serde::{ser::SerializeStruct, Serialize, Serializer};
use serum_dex::state::EventView as SpotEvent;
@ -15,7 +17,9 @@ use solana_sdk::{
use std::{
borrow::BorrowMut,
cmp::max,
collections::{HashMap, HashSet}, convert::identity, time::SystemTime,
collections::{HashMap, HashSet},
convert::identity,
time::SystemTime,
};
use crate::metrics::MetricU64;
@ -25,18 +29,46 @@ use mango_v4::state::{
MAX_NUM_EVENTS,
};
#[derive(Clone, Copy, Debug, Serialize)]
#[derive(Clone, Copy, Debug)]
pub enum FillUpdateStatus {
New,
Revoke,
}
#[derive(Clone, Copy, Debug, Serialize)]
impl Serialize for FillUpdateStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *self {
FillUpdateStatus::New => {
serializer.serialize_unit_variant("FillUpdateStatus", 0, "new")
}
FillUpdateStatus::Revoke => {
serializer.serialize_unit_variant("FillUpdateStatus", 1, "revoke")
}
}
}
}
#[derive(Clone, Copy, Debug)]
pub enum FillEventType {
Spot,
Perp,
}
impl Serialize for FillEventType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *self {
FillEventType::Spot => serializer.serialize_unit_variant("FillEventType", 0, "spot"),
FillEventType::Perp => serializer.serialize_unit_variant("FillEventType", 1, "perp"),
}
}
}
#[derive(Clone, Debug)]
pub struct FillEvent {
pub event_type: FillEventType,
@ -48,8 +80,8 @@ pub struct FillEvent {
pub order_id: u128,
pub client_order_id: u64,
pub fee: f32,
pub price: i64,
pub quantity: i64,
pub price: f64,
pub quantity: f64,
}
impl Serialize for FillEvent {
@ -61,7 +93,12 @@ impl Serialize for FillEvent {
state.serialize_field("eventType", &self.event_type)?;
state.serialize_field("maker", &self.maker)?;
state.serialize_field("side", &self.side)?;
state.serialize_field("timestamp", &self.timestamp)?;
state.serialize_field(
"timestamp",
&Utc.timestamp_opt(self.timestamp as i64, 0)
.unwrap()
.to_rfc3339(),
)?;
state.serialize_field("seqNum", &self.seq_num)?;
state.serialize_field("owner", &self.owner)?;
state.serialize_field("orderId", &self.order_id)?;
@ -74,7 +111,7 @@ impl Serialize for FillEvent {
}
impl FillEvent {
pub fn new_from_perp(event: PerpFillEvent) -> [Self; 2] {
pub fn new_from_perp(event: PerpFillEvent, config: &MarketConfig) -> [Self; 2] {
let taker_side = match event.taker_side() {
Side::Ask => OrderbookSide::Ask,
Side::Bid => OrderbookSide::Bid,
@ -83,37 +120,62 @@ impl FillEvent {
Side::Ask => OrderbookSide::Bid,
Side::Bid => OrderbookSide::Ask,
};
[FillEvent {
event_type: FillEventType::Perp,
maker: true,
side: maker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.maker.to_string(),
order_id: event.maker_order_id,
client_order_id: 0u64,
fee: event.maker_fee.to_num(),
price: event.price,
quantity: event.quantity,
},
FillEvent {
event_type: FillEventType::Perp,
maker: false,
side: taker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.taker.to_string(),
order_id: event.taker_order_id,
client_order_id: event.taker_client_order_id,
fee: event.taker_fee.to_num(),
price: event.price,
quantity: event.quantity,
}]
let price = price_lots_to_ui_perp(
event.price,
config.base_decimals,
config.quote_decimals,
config.base_lot_size,
config.quote_lot_size,
);
let quantity =
base_lots_to_ui_perp(event.quantity, config.base_decimals, config.quote_decimals);
[
FillEvent {
event_type: FillEventType::Perp,
maker: true,
side: maker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.maker.to_string(),
order_id: event.maker_order_id,
client_order_id: 0u64,
fee: event.maker_fee.to_num(),
price: price,
quantity: quantity,
},
FillEvent {
event_type: FillEventType::Perp,
maker: false,
side: taker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.taker.to_string(),
order_id: event.taker_order_id,
client_order_id: event.taker_client_order_id,
fee: event.taker_fee.to_num(),
price: price,
quantity: quantity,
},
]
}
pub fn new_from_spot(event: SpotEvent, timestamp: u64, seq_num: u64) -> Self {
pub fn new_from_spot(
event: SpotEvent,
timestamp: u64,
seq_num: u64,
config: &MarketConfig,
) -> Self {
match event {
SpotEvent::Fill { side, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, order_id, owner, client_order_id, .. } => {
SpotEvent::Fill {
side,
maker,
native_qty_paid,
native_qty_received,
native_fee_or_rebate,
order_id,
owner,
client_order_id,
..
} => {
let side = match side as u8 {
0 => OrderbookSide::Bid,
1 => OrderbookSide::Ask,
@ -123,8 +185,41 @@ impl FillEvent {
Some(id) => id.into(),
None => 0u64,
};
// TODO: native to ui
let price = (native_qty_paid / native_qty_received) as i64;
let base_multiplier = 10u64.pow(config.base_decimals.into()) as u64;
let quote_multiplier = 10u64.pow(config.quote_decimals.into()) as u64;
let (price, quantity) = match side {
OrderbookSide::Bid => {
let price_before_fees = if maker {
native_qty_paid + native_fee_or_rebate
} else {
native_qty_paid - native_fee_or_rebate
};
let top = price_before_fees * base_multiplier;
let bottom = quote_multiplier * native_qty_received;
let price = top as f64 / bottom as f64;
let quantity = native_qty_received as f64 / base_multiplier as f64;
(price, quantity)
}
OrderbookSide::Ask => {
let price_before_fees = if maker {
native_qty_received - native_fee_or_rebate
} else {
native_qty_received + native_fee_or_rebate
};
let top = price_before_fees * base_multiplier;
let bottom = quote_multiplier * native_qty_paid;
let price = top as f64 / bottom as f64;
let quantity = native_qty_paid as f64 / base_multiplier as f64;
(price, quantity)
}
};
let fee = native_fee_or_rebate as f32 / quote_multiplier as f32;
FillEvent {
event_type: FillEventType::Spot,
maker: maker,
@ -134,12 +229,14 @@ impl FillEvent {
owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(),
order_id: order_id,
client_order_id: client_order_id,
fee: native_fee_or_rebate as f32,
fee,
price,
quantity: native_qty_received as i64,
quantity,
}
}
SpotEvent::Out { .. } => { panic!("Can't build FillEvent from SpotEvent::Out")}
SpotEvent::Out { .. } => {
panic!("Can't build FillEvent from SpotEvent::Out")
}
}
}
}
@ -148,8 +245,8 @@ impl FillEvent {
pub struct FillUpdate {
pub event: FillEvent,
pub status: FillUpdateStatus,
pub market: String,
pub queue: String,
pub market_key: String,
pub market_name: String,
pub slot: u64,
pub write_version: u64,
}
@ -170,13 +267,13 @@ impl Serialize for FillUpdate {
where
S: Serializer,
{
let mut state = serializer.serialize_struct("FillUpdate", 4)?;
let mut state = serializer.serialize_struct("FillUpdate", 6)?;
state.serialize_field("event", &self.event)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("marketKey", &self.market_key)?;
state.serialize_field("marketName", &self.market_name)?;
state.serialize_field("status", &self.status)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.serialize_field("writeVersion", &self.write_version)?;
state.end()
}
@ -218,7 +315,7 @@ type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
fn publish_changes_perp(
slot: u64,
write_version: u64,
mkt: &(Pubkey, Pubkey),
mkt: &(Pubkey, MarketConfig),
header: &EventQueueHeader,
events: &EventQueueEvents,
old_seq_num: u64,
@ -234,7 +331,7 @@ fn publish_changes_perp(
.unwrap_or(0);
let mut checkpoint = Vec::new();
let mkt_pk_string = mkt.0.to_string();
let evq_pk_string = mkt.1.to_string();
let evq_pk_string = mkt.1.event_queue.to_string();
for seq_num in start_seq_num..header.seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
@ -245,8 +342,8 @@ fn publish_changes_perp(
// the order of these checks is important so they are exhaustive
if seq_num >= old_seq_num {
debug!(
"found new event {} idx {} type {}",
mkt_pk_string, idx, events[idx].event_type as u32
"found new event {} idx {} type {} slot {} write_version {}",
mkt_pk_string, idx, events[idx].event_type as u32, slot, write_version
);
metric_events_new.increment();
@ -254,7 +351,7 @@ fn publish_changes_perp(
// new fills are published and recorded in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
// send event for both maker and taker
for fill in fills {
fill_update_sender
@ -263,8 +360,8 @@ fn publish_changes_perp(
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
@ -283,7 +380,7 @@ fn publish_changes_perp(
// first revoke old event if a fill
if old_events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
let fills = FillEvent::new_from_perp(fill);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
@ -291,8 +388,8 @@ fn publish_changes_perp(
write_version,
event: fill,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
@ -301,7 +398,7 @@ fn publish_changes_perp(
// then publish new if its a fill and record in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
@ -309,8 +406,8 @@ fn publish_changes_perp(
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
@ -320,7 +417,7 @@ fn publish_changes_perp(
// every already published event is recorded in checkpoint if a fill
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
checkpoint.push(fill);
}
@ -332,15 +429,15 @@ fn publish_changes_perp(
for seq_num in header.seq_num..old_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
debug!(
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {} slot {} write_version {}",
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num, slot, write_version
);
metric_events_drop.increment();
if old_events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
let fills = FillEvent::new_from_perp(fill);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
@ -348,8 +445,8 @@ fn publish_changes_perp(
event: fill,
write_version,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
@ -370,7 +467,7 @@ fn publish_changes_perp(
fn publish_changes_serum(
slot: u64,
write_version: u64,
mkt: &(Pubkey, Pubkey),
mkt: &(Pubkey, MarketConfig),
header: &SerumEventQueueHeader,
events: &[serum_dex::state::Event],
old_seq_num: u64,
@ -386,12 +483,15 @@ fn publish_changes_serum(
.unwrap_or(0);
let mut checkpoint = Vec::new();
let mkt_pk_string = mkt.0.to_string();
let evq_pk_string = mkt.1.to_string();
let evq_pk_string = mkt.1.event_queue.to_string();
let header_seq_num = header.seq_num;
debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
// Timestamp for spot events is the time at which they were scraped
let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
for seq_num in start_seq_num..header_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
let event_view = events[idx].as_view().unwrap();
@ -404,7 +504,7 @@ fn publish_changes_serum(
// 2) the event is not matching the old event queue
// 3) all other events are matching the old event queue
// the order of these checks is important so they are exhaustive
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num);
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num, &mkt.1);
if seq_num >= old_seq_num {
debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);
@ -415,8 +515,8 @@ fn publish_changes_serum(
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
@ -430,11 +530,15 @@ fn publish_changes_serum(
"found changed id event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_change.increment();
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
let old_fill = FillEvent::new_from_spot(
old_event_view,
timestamp,
seq_num,
&mkt.1,
);
// first revoke old event
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
@ -442,11 +546,11 @@ fn publish_changes_serum(
write_version,
event: old_fill,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
// then publish new
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
@ -454,8 +558,8 @@ fn publish_changes_serum(
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
@ -478,8 +582,8 @@ fn publish_changes_serum(
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
@ -503,15 +607,15 @@ fn publish_changes_serum(
match old_event_view {
SpotEvent::Fill { .. } => {
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num, &mkt.1);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
event: old_fill,
write_version,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
@ -520,21 +624,19 @@ fn publish_changes_serum(
}
fill_update_sender
.try_send(FillEventFilterMessage::Checkpoint(
FillCheckpoint {
slot,
write_version,
events: checkpoint,
market: mkt_pk_string,
queue: evq_pk_string,
},
))
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
slot,
write_version,
events: checkpoint,
market: mkt_pk_string,
queue: evq_pk_string,
}))
.unwrap()
}
pub async fn init(
perp_queue_pks: Vec<(Pubkey, Pubkey)>,
serum_queue_pks: Vec<(Pubkey, Pubkey)>,
perp_market_configs: Vec<(Pubkey, MarketConfig)>,
spot_market_configs: Vec<(Pubkey, MarketConfig)>,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
@ -549,8 +651,12 @@ pub async fn init(
metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter);
let mut metric_events_change =
metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter);
let mut metric_events_change_serum =
metrics_sender.register_u64("fills_feed_events_change_serum".into(), MetricType::Counter);
let mut metrics_events_drop =
metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter);
let mut metrics_events_drop_serum =
metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter);
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
@ -572,18 +678,24 @@ pub async fn init(
let mut seq_num_cache = HashMap::new();
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
let relevant_pubkeys = all_queue_pks
let all_market_configs = [perp_market_configs.clone(), spot_market_configs.clone()].concat();
let perp_queue_pks: Vec<Pubkey> = perp_market_configs
.iter()
.map(|m| m.1)
.collect::<HashSet<Pubkey>>();
.map(|x| x.1.event_queue)
.collect();
let spot_queue_pks: Vec<Pubkey> = spot_market_configs
.iter()
.map(|x| x.1.event_queue)
.collect();
let all_queue_pks: HashSet<Pubkey> =
HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat());
// update handling thread, reads both slot and account updates
tokio::spawn(async move {
loop {
tokio::select! {
Ok(account_write) = account_write_queue_receiver_c.recv() => {
if !relevant_pubkeys.contains(&account_write.pubkey) {
if !all_queue_pks.contains(&account_write.pubkey) {
continue;
}
@ -611,21 +723,38 @@ pub async fn init(
});
}
Err(e) = slot_queue_receiver.recv() => {
warn!("slot update channel err {:?}", e);
}
Err(e) = account_write_queue_receiver_c.recv() => {
warn!("write update channel err {:?}", e);
}
}
for mkt in all_queue_pks.iter() {
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
let mkt_pk = mkt.1;
for mkt in all_market_configs.iter() {
let evq_pk = mkt.1.event_queue;
let evq_pk_string = evq_pk.to_string();
let last_evq_version = last_evq_versions
.get(&mkt.1.event_queue.to_string())
.unwrap_or(&(0, 0));
match chain_cache.account(&mkt_pk) {
match chain_cache.account(&evq_pk) {
Ok(account_info) => {
// only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version);
let evq_pk_string = mkt.1.to_string();
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
if evq_version == *last_evq_version {
continue;
}
if evq_version.0 < last_evq_version.0 {
debug!("evq version slot was old");
continue;
}
if evq_version.0 == last_evq_version.0 && evq_version.1 < last_evq_version.1
{
info!("evq version slot was same and write version was old");
continue;
}
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
let account = &account_info.account;
@ -633,17 +762,16 @@ pub async fn init(
if is_perp {
let event_queue =
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
trace!(
"evq {} seq_num {}",
evq_pk_string,
event_queue.header.seq_num
info!(
"evq {} seq_num {} version {:?}",
evq_pk_string, event_queue.header.seq_num, evq_version,
);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
Some(old_events) => publish_changes_perp(
account_info.slot,
account_info.write_version,
mkt,
&mkt,
&event_queue.header,
&event_queue.buf,
*old_seq_num,
@ -690,14 +818,17 @@ pub async fn init(
old_events,
&fill_update_sender,
&mut metric_events_new_serum,
&mut metric_events_change,
&mut metrics_events_drop,
&mut metric_events_change_serum,
&mut metrics_events_drop_serum,
),
_ => {
info!("serum_events_cache could not find {}", evq_pk_string)
debug!(
"serum_events_cache could not find {}",
evq_pk_string
)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
_ => debug!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
@ -705,7 +836,7 @@ pub async fn init(
.insert(evq_pk_string.clone(), events.clone().to_vec());
}
}
Err(_) => info!("chain_cache could not find {}", mkt.1),
Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue),
}
}
}

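Two things are worth pulling out of the fill filter changes. First, the hand-rolled Serialize impls exist only so the JSON carries lowercase variant names ("new"/"revoke", "spot"/"perp") and camelCase field names. Second, the spot fill conversion replaces the old integer-division placeholder with exact f64 math over native quantities. A worked example of the bid branch, assuming an illustrative market with 9 base decimals and 6 quote decimals:

fn spot_bid_example() {
    // taker buys 1.0 base for 20.5 quote, paying a 0.01 quote fee
    let native_qty_paid: u64 = 20_500_000; // quote, native units (6 decimals)
    let native_qty_received: u64 = 1_000_000_000; // base, native units (9 decimals)
    let native_fee_or_rebate: u64 = 10_000; // quote, native units
    let base_multiplier = 10u64.pow(9);
    let quote_multiplier = 10u64.pow(6);
    // taker bid: the fee was paid on top, so strip it to get the price
    let price_before_fees = native_qty_paid - native_fee_or_rebate; // 20_490_000
    let price = (price_before_fees * base_multiplier) as f64
        / (quote_multiplier * native_qty_received) as f64;
    let quantity = native_qty_received as f64 / base_multiplier as f64;
    assert!((price - 20.49).abs() < 1e-9);
    assert_eq!(quantity, 1.0);
}
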

@ -0,0 +1,237 @@
use chrono::{TimeZone, Utc};
use log::*;
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use postgres_query::Caching;
use std::{env, fs, time::Duration};
use tokio_postgres::Client;
use crate::{fill_event_filter::FillUpdate, metrics::*, PostgresConfig};
async fn postgres_connection(
config: &PostgresConfig,
metric_retries: MetricU64,
metric_live: MetricU64,
) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
let (tx, rx) = async_channel::unbounded();
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
info!("making tls config");
let tls = match &config.tls {
Some(tls) => {
use base64::{engine::general_purpose, Engine as _};
let ca_cert = match &tls.ca_cert_path.chars().next().unwrap() {
'$' => general_purpose::STANDARD
.decode(
                    env::var(&tls.ca_cert_path[1..])
                        .expect("reading CA cert from env")
                        .into_bytes(),
                )
                .expect("decoding CA cert"),
            _ => fs::read(&tls.ca_cert_path).expect("reading CA cert from file"),
};
let client_key = match &tls.client_key_path.chars().next().unwrap() {
'$' => general_purpose::STANDARD
.decode(
env::var(&tls.client_key_path[1..])
.expect("reading client key from env")
.into_bytes(),
)
.expect("decoding client key"),
_ => fs::read(&tls.client_key_path).expect("reading client key from file"),
};
MakeTlsConnector::new(
TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
.identity(Identity::from_pkcs12(&client_key, "pass")?)
.danger_accept_invalid_certs(config.allow_invalid_certs)
.build()?,
)
}
None => MakeTlsConnector::new(
TlsConnector::builder()
.danger_accept_invalid_certs(config.allow_invalid_certs)
.build()?,
),
};
let config = config.clone();
let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?);
let mut metric_retries = metric_retries;
let mut metric_live = metric_live;
tokio::spawn(async move {
loop {
let (client, connection) = match initial.take() {
Some(v) => v,
None => {
let result =
tokio_postgres::connect(&config.connection_string, tls.clone()).await;
match result {
Ok(v) => v,
Err(err) => {
warn!("could not connect to postgres: {:?}", err);
tokio::time::sleep(Duration::from_secs(
config.retry_connection_sleep_secs,
))
.await;
continue;
}
}
}
};
tx.send(Some(client)).await.expect("send success");
metric_live.increment();
let result = connection.await;
metric_retries.increment();
metric_live.decrement();
tx.send(None).await.expect("send success");
warn!("postgres connection error: {:?}", result);
tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
}
});
Ok(rx)
}
async fn update_postgres_client<'a>(
client: &'a mut Option<postgres_query::Caching<tokio_postgres::Client>>,
rx: &async_channel::Receiver<Option<tokio_postgres::Client>>,
config: &PostgresConfig,
) -> &'a postgres_query::Caching<tokio_postgres::Client> {
// get the most recent client, waiting if there's a disconnect
while !rx.is_empty() || client.is_none() {
tokio::select! {
client_raw_opt = rx.recv() => {
*client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new);
},
_ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
error!("waited too long for new postgres client");
std::process::exit(1);
},
}
}
client.as_ref().expect("must contain value")
}
async fn process_update(client: &Caching<Client>, update: &FillUpdate) -> anyhow::Result<()> {
let market = &update.market_key;
let seq_num = update.event.seq_num as i64;
let fill_timestamp = Utc.timestamp_opt(update.event.timestamp as i64, 0).unwrap();
let price = update.event.price as f64;
let quantity = update.event.quantity as f64;
let slot = update.slot as i64;
let write_version = update.write_version as i64;
let query = postgres_query::query!(
"INSERT INTO transactions_v4.perp_fills_feed_events
(market, seq_num, fill_timestamp, price,
quantity, slot, write_version)
VALUES
($market, $seq_num, $fill_timestamp, $price,
$quantity, $slot, $write_version)
ON CONFLICT (market, seq_num) DO NOTHING",
market,
seq_num,
fill_timestamp,
price,
quantity,
slot,
write_version,
);
let _ = query.execute(&client).await?;
Ok(())
}
pub async fn init(
config: &PostgresConfig,
metrics_sender: Metrics,
) -> anyhow::Result<async_channel::Sender<FillUpdate>> {
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (fill_update_queue_sender, fill_update_queue_receiver) =
async_channel::bounded::<FillUpdate>(config.max_queue_size);
let metric_con_retries = metrics_sender.register_u64(
"fills_postgres_connection_retries".into(),
MetricType::Counter,
);
let metric_con_live =
metrics_sender.register_u64("fills_postgres_connections_alive".into(), MetricType::Gauge);
// postgres fill update sending worker threads
for _ in 0..config.connection_count {
let postgres_account_writes =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?;
let fill_update_queue_receiver_c = fill_update_queue_receiver.clone();
let config = config.clone();
let mut metric_retries =
metrics_sender.register_u64("fills_postgres_retries".into(), MetricType::Counter);
tokio::spawn(async move {
let mut client_opt = None;
loop {
// Retrieve up to batch_size updates
let mut batch = Vec::new();
batch.push(
fill_update_queue_receiver_c
.recv()
.await
.expect("sender must stay alive"),
);
while batch.len() < config.max_batch_size {
match fill_update_queue_receiver_c.try_recv() {
Ok(update) => batch.push(update),
Err(async_channel::TryRecvError::Empty) => break,
Err(async_channel::TryRecvError::Closed) => {
panic!("sender must stay alive")
}
};
}
info!(
"updates, batch {}, channel size {}",
batch.len(),
fill_update_queue_receiver_c.len(),
);
let mut error_count = 0;
loop {
let client =
update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
.await;
let mut results = futures::future::join_all(
batch.iter().map(|update| process_update(client, update)),
)
.await;
let mut iter = results.iter();
batch.retain(|_| iter.next().unwrap().is_err());
if batch.len() > 0 {
metric_retries.add(batch.len() as u64);
error_count += 1;
if error_count - 1 < config.retry_query_max_count {
results.retain(|r| r.is_err());
warn!("failed to process fill update, retrying: {:?}", results);
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
.await;
continue;
} else {
error!("failed to process fill update, exiting");
std::process::exit(1);
}
};
break;
}
}
});
}
Ok(fill_update_queue_sender)
}

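The TLS setup above relies on a small convention, matching the openssl/fly commands quoted in the comment: if ca_cert_path or client_key_path begins with '$', the remainder names an environment variable holding a base64-encoded blob; otherwise the string is an ordinary file path. Condensed into one helper (a sketch of the behavior, not code from the diff):

use base64::{engine::general_purpose, Engine as _};
use std::{env, fs};

fn resolve_secret(path_or_var: &str) -> Vec<u8> {
    match path_or_var.strip_prefix('$') {
        // "$PG_CA_CERT" -> read the env var, then base64-decode it
        Some(var) => general_purpose::STANDARD
            .decode(env::var(var).expect("env var set"))
            .expect("valid base64"),
        // otherwise treat the string as a file path
        None => fs::read(path_or_var).expect("readable file"),
    }
}
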

@ -1,21 +1,19 @@
pub mod chain_data;
pub mod fill_event_filter;
pub mod orderbook_filter;
pub mod fill_event_postgres_target;
pub mod grpc_plugin_source;
pub mod memory_target;
pub mod metrics;
pub mod postgres_target;
pub mod orderbook_filter;
pub mod postgres_types_numeric;
pub mod websocket_source;
pub use chain_data::SlotStatus;
use serde::{Serialize, Serializer, ser::SerializeStruct};
use serde::{ser::SerializeStruct, Serialize, Serializer};
use {
async_trait::async_trait,
serde_derive::Deserialize,
solana_sdk::{account::Account, pubkey::Pubkey},
std::sync::Arc,
};
trait AnyhowWrap {
@ -69,14 +67,12 @@ pub struct SlotUpdate {
#[derive(Clone, Debug, Deserialize)]
pub struct PostgresConfig {
pub connection_string: String,
/// Number of parallel postgres connections used for account write insertions
pub account_write_connection_count: u64,
/// Maximum batch size for account write inserts over one connection
pub account_write_max_batch_size: usize,
/// Max size of account write queues
pub account_write_max_queue_size: usize,
/// Number of parallel postgres connections used for slot insertions
pub slot_update_connection_count: u64,
/// Number of parallel postgres connections used for insertions
pub connection_count: u64,
/// Maximum batch size for inserts over one connection
pub max_batch_size: usize,
/// Max size of queues
pub max_queue_size: usize,
/// Number of queries retries before fatal error
pub retry_query_max_count: u64,
/// Seconds to sleep between query retries
@ -87,12 +83,15 @@ pub struct PostgresConfig {
pub fatal_connection_timeout_secs: u64,
/// Allow invalid TLS certificates, passed to native_tls danger_accept_invalid_certs
pub allow_invalid_certs: bool,
/// Name key to use in the monitoring table
pub monitoring_name: String,
/// Time between updates to the monitoring table
pub monitoring_update_interval_secs: u64,
/// Time between cleanup jobs (0 to disable)
pub cleanup_interval_secs: u64,
pub tls: Option<PostgresTlsConfig>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct PostgresTlsConfig {
/// CA Cert file or env var
pub ca_cert_path: String,
/// PKCS12 client cert path
pub client_key_path: String,
}
#[derive(Clone, Debug, Deserialize)]
@ -164,63 +163,3 @@ pub struct Config {
pub source: SourceConfig,
pub metrics: MetricsConfig,
}
#[async_trait]
pub trait AccountTable: Sync + Send {
fn table_name(&self) -> &str;
async fn insert_account_write(
&self,
client: &postgres_query::Caching<tokio_postgres::Client>,
account_write: &AccountWrite,
) -> anyhow::Result<()>;
}
pub type AccountTables = Vec<Arc<dyn AccountTable>>;
pub struct RawAccountTable {}
pub fn encode_address(addr: &Pubkey) -> String {
bs58::encode(&addr.to_bytes()).into_string()
}
#[async_trait]
impl AccountTable for RawAccountTable {
fn table_name(&self) -> &str {
"account_write"
}
async fn insert_account_write(
&self,
client: &postgres_query::Caching<tokio_postgres::Client>,
account_write: &AccountWrite,
) -> anyhow::Result<()> {
let pubkey = encode_address(&account_write.pubkey);
let owner = encode_address(&account_write.owner);
let slot = account_write.slot as i64;
let write_version = account_write.write_version as i64;
let lamports = account_write.lamports as i64;
let rent_epoch = account_write.rent_epoch as i64;
// TODO: should update for same write_version to work with websocket input
let query = postgres_query::query!(
"INSERT INTO account_write
(pubkey_id, slot, write_version, is_selected,
owner_id, lamports, executable, rent_epoch, data)
VALUES
(map_pubkey($pubkey), $slot, $write_version, $is_selected,
map_pubkey($owner), $lamports, $executable, $rent_epoch, $data)
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
pubkey,
slot,
write_version,
is_selected = account_write.is_selected,
owner,
lamports,
executable = account_write.executable,
rent_epoch,
data = account_write.data,
);
let _ = query.execute(client).await?;
Ok(())
}
}

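With the old account-write/slot split and the monitoring fields gone, a [postgres] section for the fills target reduces to one connection pool plus an optional TLS block. An illustrative fragment under the new field names (values invented; this assumes no fields beyond those visible in this diff):

fn example_postgres_config() -> crate::PostgresConfig {
    toml::from_str(
        r#"
connection_string = "host=localhost user=mango dbname=mango"
connection_count = 2
max_batch_size = 64
max_queue_size = 50000
retry_query_max_count = 10
retry_query_sleep_secs = 2
retry_connection_sleep_secs = 5
fatal_connection_timeout_secs = 60
allow_invalid_certs = false

[tls]
ca_cert_path = "$PG_CA_CERT"
client_key_path = "$PG_CLIENT_KEY"
"#,
    )
    .expect("valid postgres config")
}
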

@ -1,10 +1,16 @@
use crate::metrics::MetricU64;
use crate::{
chain_data::{AccountData, ChainData, SlotData},
metrics::{MetricType, Metrics},
AccountWrite, SlotUpdate,
};
use anchor_lang::AccountDeserialize;
use itertools::Itertools;
use log::*;
use mango_v4::{
serum3_cpi::OrderBookStateHeader,
state::{BookSide, OrderTreeType},
};
use serde::{ser::SerializeStruct, Serialize, Serializer};
use serum_dex::critbit::Slab;
use solana_sdk::{
@ -19,13 +25,6 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
use crate::metrics::MetricU64;
use anchor_lang::AccountDeserialize;
use mango_v4::{
serum3_cpi::OrderBookStateHeader,
state::{BookSide, OrderTreeType},
};
#[derive(Clone, Debug)]
pub enum OrderbookSide {
Bid = 0,
@ -106,6 +105,7 @@ pub struct MarketConfig {
pub name: String,
pub bids: Pubkey,
pub asks: Pubkey,
pub event_queue: Pubkey,
pub base_decimals: u8,
pub quote_decimals: u8,
pub base_lot_size: i64,
@ -113,26 +113,28 @@ pub struct MarketConfig {
}
pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
let decimals: u32 = 3;
let res = native as f64 / (10i64.pow(decimals.into()) as f64);
//info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
res
(native * base_lot_size) as f64 / 10i64.pow(base_decimals.into()) as f64
}
pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
let decimals: u32 = 4;
let res = native as f64 / (10i64.pow(decimals.into()) as f64);
//info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
res
pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
let decimals = base_decimals - quote_decimals;
native as f64 / (10i64.pow(decimals.into()) as f64)
}
pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
let decimals = base_decimals - quote_decimals;
// let res = native as f64
// * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
// as f64;
let res = native as f64 / (10u64.pow(decimals.into())) as f64;
res
native as f64 / (10u64.pow(decimals.into())) as f64
}
pub fn spot_price_to_ui(
native: i64,
native_size: i64,
base_decimals: u8,
quote_decimals: u8,
) -> f64 {
// TODO: account for fees
((native * 10i64.pow(base_decimals.into())) / (10i64.pow(quote_decimals.into()) * native_size))
as f64
}
pub fn price_lots_to_ui_perp(
@ -143,13 +145,8 @@ pub fn price_lots_to_ui_perp(
quote_lot_size: i64,
) -> f64 {
let decimals = base_decimals - quote_decimals;
let res = native as f64
* ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
as f64;
// let res = native as f64
// / (10u64.pow(decimals.into()))
// as f64;
res
let multiplier = 10u64.pow(decimals.into()) as f64;
native as f64 * ((multiplier * quote_lot_size as f64) / base_lot_size as f64)
}
fn publish_changes(
@ -332,8 +329,17 @@ pub async fn init(
let side_pk_string = side_pk.to_string();
let write_version = (account_info.slot, account_info.write_version);
// todo: should this be <= so we don't overwrite with old data received late?
if write_version <= *last_write_version {
if write_version == *last_write_version {
continue;
}
if write_version.0 < last_write_version.0 {
debug!("evq version slot was old");
continue;
}
if write_version.0 == last_write_version.0
&& write_version.1 < last_write_version.1
{
debug!("evq version slot was same and write version was old");
continue;
}
last_write_versions.insert(side_pk_string.clone(), write_version);
@ -369,7 +375,7 @@ pub async fn init(
.map(|(_, quantity)| quantity)
.fold(0, |acc, x| acc + x),
mkt.1.base_decimals,
mkt.1.base_lot_size,
mkt.1.quote_decimals,
),
]
})
@ -394,7 +400,7 @@ pub async fn init(
bookside_cache.insert(side_pk_string.clone(), bookside.clone());
}
Err(_) => info!("chain_cache could not find {}", mkt_pk),
Err(_) => debug!("chain_cache could not find {}", mkt_pk),
}
}
}
@ -417,7 +423,7 @@ pub async fn init(
continue;
}
last_write_versions.insert(side_pk_string.clone(), write_version);
info!("W {}", mkt.1.name);
debug!("W {}", mkt.1.name);
let account = &mut account_info.account.clone();
let data = account.data_as_mut_slice();
let len = data.len();
@ -473,7 +479,7 @@ pub async fn init(
serum_bookside_cache.insert(side_pk_string.clone(), bookside);
}
Err(_) => info!("chain_cache could not find {}", side_pk),
Err(_) => debug!("chain_cache could not find {}", side_pk),
}
}
}

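The perp conversion helpers now derive everything from market parameters instead of hardcoded exponents. A worked example with illustrative SOL-PERP-like values (base_decimals 9, quote_decimals 6, base_lot_size 10_000_000, quote_lot_size 100), mirroring price_lots_to_ui_perp and base_lots_to_ui_perp:

fn perp_conversion_example() {
    let (base_decimals, quote_decimals) = (9u8, 6u8);
    let (base_lot_size, quote_lot_size) = (10_000_000i64, 100i64);
    let multiplier = 10u64.pow((base_decimals - quote_decimals).into()) as f64; // 1_000
    // price_lots_to_ui_perp: 2_049 lots * (1_000 * 100 / 10_000_000) = 20.49
    let price_ui =
        2_049i64 as f64 * ((multiplier * quote_lot_size as f64) / base_lot_size as f64);
    assert!((price_ui - 20.49).abs() < 1e-9);
    // base_lots_to_ui_perp: 150 lots / 10^(9 - 6) = 0.15
    let size_ui = 150i64 as f64 / multiplier;
    assert!((size_ui - 0.15).abs() < 1e-9);
}
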

@ -1,637 +0,0 @@
use anyhow::Context;
use log::*;
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use postgres_query::{query, query_dyn};
use std::{collections::HashMap, convert::TryFrom, time::Duration};
use crate::{metrics::*, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate};
mod pg {
#[derive(Clone, Copy, Debug, PartialEq, postgres_types::ToSql)]
pub enum SlotStatus {
Rooted,
Confirmed,
Processed,
}
impl From<super::SlotStatus> for SlotStatus {
fn from(status: super::SlotStatus) -> SlotStatus {
match status {
super::SlotStatus::Rooted => SlotStatus::Rooted,
super::SlotStatus::Confirmed => SlotStatus::Confirmed,
super::SlotStatus::Processed => SlotStatus::Processed,
}
}
}
}
async fn postgres_connection(
config: &PostgresConfig,
metric_retries: MetricU64,
metric_live: MetricU64,
) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
let (tx, rx) = async_channel::unbounded();
let tls = MakeTlsConnector::new(
TlsConnector::builder()
.danger_accept_invalid_certs(config.allow_invalid_certs)
.build()?,
);
let config = config.clone();
let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?);
let mut metric_retries = metric_retries;
let mut metric_live = metric_live;
tokio::spawn(async move {
loop {
let (client, connection) = match initial.take() {
Some(v) => v,
None => {
let result =
tokio_postgres::connect(&config.connection_string, tls.clone()).await;
match result {
Ok(v) => v,
Err(err) => {
warn!("could not connect to postgres: {:?}", err);
tokio::time::sleep(Duration::from_secs(
config.retry_connection_sleep_secs,
))
.await;
continue;
}
}
}
};
tx.send(Some(client)).await.expect("send success");
metric_live.increment();
let result = connection.await;
metric_retries.increment();
metric_live.decrement();
tx.send(None).await.expect("send success");
warn!("postgres connection error: {:?}", result);
tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
}
});
Ok(rx)
}
async fn update_postgres_client<'a>(
client: &'a mut Option<postgres_query::Caching<tokio_postgres::Client>>,
rx: &async_channel::Receiver<Option<tokio_postgres::Client>>,
config: &PostgresConfig,
) -> &'a postgres_query::Caching<tokio_postgres::Client> {
// get the most recent client, waiting if there's a disconnect
while !rx.is_empty() || client.is_none() {
tokio::select! {
client_raw_opt = rx.recv() => {
*client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new);
},
_ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
error!("waited too long for new postgres client");
std::process::exit(1);
},
}
}
client.as_ref().expect("must contain value")
}
async fn process_account_write(
client: &postgres_query::Caching<tokio_postgres::Client>,
write: &AccountWrite,
account_tables: &AccountTables,
) -> anyhow::Result<()> {
futures::future::try_join_all(
account_tables
.iter()
.map(|table| table.insert_account_write(client, write)),
)
.await?;
Ok(())
}
struct Slots {
// non-rooted only
slots: HashMap<u64, SlotUpdate>,
newest_processed_slot: Option<u64>,
newest_rooted_slot: Option<u64>,
}
#[derive(Default)]
struct SlotPreprocessing {
discard_duplicate: bool,
discard_old: bool,
new_processed_head: bool,
new_rooted_head: bool,
parent_update: bool,
}
impl Slots {
fn new() -> Self {
Self {
slots: HashMap::new(),
newest_processed_slot: None,
newest_rooted_slot: None,
}
}
fn add(&mut self, update: &SlotUpdate) -> SlotPreprocessing {
let mut result = SlotPreprocessing::default();
if let Some(previous) = self.slots.get_mut(&update.slot) {
if previous.status == update.status && previous.parent == update.parent {
result.discard_duplicate = true;
}
previous.status = update.status;
if update.parent.is_some() && previous.parent != update.parent {
previous.parent = update.parent;
result.parent_update = true;
}
} else if self.newest_rooted_slot.is_none()
|| update.slot > self.newest_rooted_slot.unwrap()
{
self.slots.insert(update.slot, update.clone());
} else {
result.discard_old = true;
}
if update.status == SlotStatus::Rooted {
let old_slots: Vec<u64> = self
.slots
.keys()
.filter(|s| **s <= update.slot)
.copied()
.collect();
for old_slot in old_slots {
self.slots.remove(&old_slot);
}
if self.newest_rooted_slot.is_none() || self.newest_rooted_slot.unwrap() < update.slot {
self.newest_rooted_slot = Some(update.slot);
result.new_rooted_head = true;
}
}
if self.newest_processed_slot.is_none() || self.newest_processed_slot.unwrap() < update.slot
{
self.newest_processed_slot = Some(update.slot);
result.new_processed_head = true;
}
result
}
}
fn make_cleanup_steps(tables: &Vec<String>) -> HashMap<String, String> {
let mut steps = HashMap::<String, String>::new();
// Delete all account writes that came before the newest rooted slot except
// for the newest rooted write for each pubkey.
// This could be older rooted writes or writes in uncled slots that came
// before the newest rooted slot.
//
// Also delete _all_ writes from before the newest snapshot, because these may
// be for deleted accounts where the deletion event was missed. Snapshots
// provide a new state for all live accounts, but don't tell us about deleted
// accounts.
//
// The way this is done, by taking the newest snapshot that's at least
// min_snapshot_age behind the newest rooted slot is a workaround: we don't know
// how long it'll take to insert snapshot data, but assume it'll be done by that
// time.
let min_snapshot_age = 300;
steps.extend(
tables
.iter()
.map(|table_name| {
let sql = format!(
"WITH
newest_rooted AS (
SELECT max(slot) AS newest_rooted_slot FROM slot WHERE status = 'Rooted'),
newest_snapshot AS (
SELECT max(slot) AS newest_snapshot_slot FROM account_write, newest_rooted
WHERE write_version = 0 AND slot + {min_snapshot_age} < newest_rooted_slot)
DELETE FROM {table} AS data
USING
newest_rooted,
newest_snapshot,
(SELECT DISTINCT ON(pubkey_id) pubkey_id, slot, write_version
FROM {table}
LEFT JOIN slot USING(slot)
CROSS JOIN newest_rooted
WHERE slot <= newest_rooted_slot AND (status = 'Rooted' OR status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC
) newest_rooted_write
WHERE
data.pubkey_id = newest_rooted_write.pubkey_id AND (
data.slot < newest_snapshot_slot OR (
data.slot <= newest_rooted_slot
AND (data.slot != newest_rooted_write.slot OR data.write_version != newest_rooted_write.write_version)
)
)",
table = table_name,
min_snapshot_age = min_snapshot_age,
);
(format!("delete old writes in {}", table_name), sql)
})
.collect::<HashMap<String, String>>(),
);
// Delete information about older slots
steps.insert(
"delete old slots".into(),
"DELETE FROM slot
USING (SELECT max(slot) as newest_rooted_slot FROM slot WHERE status = 'Rooted') s
WHERE slot + 1000 < newest_rooted_slot"
.into(),
);
steps
}
#[derive(Clone)]
struct SlotsProcessing {}
impl SlotsProcessing {
fn new() -> Self {
Self {}
}
async fn process(
&self,
client: &postgres_query::Caching<tokio_postgres::Client>,
update: &SlotUpdate,
meta: &SlotPreprocessing,
) -> anyhow::Result<()> {
let slot = update.slot as i64;
let status: pg::SlotStatus = update.status.into();
if let Some(parent) = update.parent {
let parent = parent as i64;
let query = query!(
"INSERT INTO slot
(slot, parent, status, uncle)
VALUES
($slot, $parent, $status, FALSE)
ON CONFLICT (slot) DO UPDATE SET
parent=$parent, status=$status",
slot,
parent,
status,
);
let _ = query.execute(client).await.context("updating slot row")?;
} else {
let query = query!(
"INSERT INTO slot
(slot, parent, status, uncle)
VALUES
($slot, NULL, $status, FALSE)
ON CONFLICT (slot) DO UPDATE SET
status=$status",
slot,
status,
);
let _ = query.execute(client).await.context("updating slot row")?;
}
if meta.new_rooted_head {
let slot = update.slot as i64;
// Mark preceding non-uncle slots as rooted
let query = query!(
"UPDATE slot SET status = 'Rooted'
WHERE slot < $newest_final_slot
AND (NOT uncle)
AND status != 'Rooted'",
newest_final_slot = slot
);
let _ = query
.execute(client)
.await
.context("updating preceding non-rooted slots")?;
}
if meta.new_processed_head || meta.parent_update {
// update the uncle column for the chain of slots from the
// newest down to the first rooted slot
let query = query!(
"WITH RECURSIVE
liveslots AS (
SELECT slot.*, 0 AS depth FROM slot
WHERE slot = (SELECT max(slot) FROM slot)
UNION ALL
SELECT s.*, depth + 1 FROM slot s
INNER JOIN liveslots l ON s.slot = l.parent
WHERE l.status != 'Rooted' AND depth < 1000
),
min_slot AS (SELECT min(slot) AS min_slot FROM liveslots)
UPDATE slot SET
uncle = NOT EXISTS (SELECT 1 FROM liveslots WHERE liveslots.slot = slot.slot)
FROM min_slot
WHERE slot >= min_slot;"
);
let _ = query
.execute(client)
.await
.context("recomputing slot uncle status")?;
}
trace!("slot update done {}", update.slot);
Ok(())
}
}
fn secs_since_epoch() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs()
}
fn epoch_secs_to_time(secs: u64) -> std::time::SystemTime {
std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs)
}
pub async fn init(
config: &PostgresConfig,
account_tables: AccountTables,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
)> {
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::bounded::<AccountWrite>(config.account_write_max_queue_size);
// Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the postgres sending thread.
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
let (slot_inserter_sender, slot_inserter_receiver) =
async_channel::unbounded::<(SlotUpdate, SlotPreprocessing)>();
let metric_con_retries =
metrics_sender.register_u64("postgres_connection_retries".into(), MetricType::Counter);
let metric_con_live =
metrics_sender.register_u64("postgres_connections_alive".into(), MetricType::Gauge);
// postgres account write sending worker threads
for _ in 0..config.account_write_connection_count {
let postgres_account_writes =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?;
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let account_tables_c = account_tables.clone();
let config = config.clone();
let mut metric_retries = metrics_sender
.register_u64("postgres_account_write_retries".into(), MetricType::Counter);
let mut metric_last_write = metrics_sender.register_u64(
"postgres_account_write_last_write_timestamp".into(),
MetricType::Gauge,
);
tokio::spawn(async move {
let mut client_opt = None;
loop {
// Retrieve up to batch_size account writes
let mut write_batch = Vec::new();
write_batch.push(
account_write_queue_receiver_c
.recv()
.await
.expect("sender must stay alive"),
);
while write_batch.len() < config.account_write_max_batch_size {
match account_write_queue_receiver_c.try_recv() {
Ok(write) => write_batch.push(write),
Err(async_channel::TryRecvError::Empty) => break,
Err(async_channel::TryRecvError::Closed) => {
panic!("sender must stay alive")
}
};
}
trace!(
"account write, batch {}, channel size {}",
write_batch.len(),
account_write_queue_receiver_c.len(),
);
let mut error_count = 0;
loop {
let client =
update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
.await;
let mut results = futures::future::join_all(
write_batch
.iter()
.map(|write| process_account_write(client, &write, &account_tables_c)),
)
.await;
let mut iter = results.iter();
write_batch.retain(|_| iter.next().unwrap().is_err());
if write_batch.len() > 0 {
metric_retries.add(write_batch.len() as u64);
error_count += 1;
if error_count - 1 < config.retry_query_max_count {
results.retain(|r| r.is_err());
warn!("failed to process account write, retrying: {:?}", results);
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
.await;
continue;
} else {
error!("failed to process account write, exiting");
std::process::exit(1);
}
};
break;
}
metric_last_write.set_max(secs_since_epoch());
}
});
}
// slot update handling thread
let mut metric_slot_queue =
metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge);
tokio::spawn(async move {
let mut slots = Slots::new();
loop {
let update = slot_queue_receiver
.recv()
.await
.expect("sender must stay alive");
trace!(
"slot update {}, channel size {}",
update.slot,
slot_queue_receiver.len()
);
// Check if we already know about the slot, or it is outdated
let slot_preprocessing = slots.add(&update);
if slot_preprocessing.discard_duplicate || slot_preprocessing.discard_old {
continue;
}
slot_inserter_sender
.send((update, slot_preprocessing))
.await
.expect("sending must succeed");
metric_slot_queue.set(slot_inserter_sender.len() as u64);
}
});
// postgres slot update worker threads
let slots_processing = SlotsProcessing::new();
for _ in 0..config.slot_update_connection_count {
let postgres_slot =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?;
let receiver_c = slot_inserter_receiver.clone();
let config = config.clone();
let mut metric_retries =
metrics_sender.register_u64("postgres_slot_update_retries".into(), MetricType::Counter);
let mut metric_last_write = metrics_sender.register_u64(
"postgres_slot_last_write_timestamp".into(),
MetricType::Gauge,
);
let slots_processing = slots_processing.clone();
tokio::spawn(async move {
let mut client_opt = None;
loop {
let (update, preprocessing) =
receiver_c.recv().await.expect("sender must stay alive");
trace!("slot insertion, slot {}", update.slot);
let mut error_count = 0;
loop {
let client =
update_postgres_client(&mut client_opt, &postgres_slot, &config).await;
if let Err(err) = slots_processing
.process(client, &update, &preprocessing)
.await
{
metric_retries.increment();
error_count += 1;
if error_count <= config.retry_query_max_count {
warn!("failed to process slot update, retrying: {:?}", err);
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
.await;
continue;
} else {
error!("failed to process slot update, exiting");
std::process::exit(1);
}
};
break;
}
metric_last_write.set_max(secs_since_epoch());
}
});
}
// postgres cleanup thread
if config.cleanup_interval_secs > 0 {
let table_names: Vec<String> = account_tables
.iter()
.map(|table| table.table_name().to_string())
.collect();
let cleanup_steps = make_cleanup_steps(&table_names);
let postgres_con =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?;
let mut metric_last_cleanup = metrics_sender.register_u64(
"postgres_cleanup_last_success_timestamp".into(),
MetricType::Gauge,
);
let mut metric_cleanup_errors =
metrics_sender.register_u64("postgres_cleanup_errors".into(), MetricType::Counter);
let config = config.clone();
tokio::spawn(async move {
let mut client_opt = None;
loop {
tokio::time::sleep(Duration::from_secs(config.cleanup_interval_secs)).await;
let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await;
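// Every cleanup step runs each interval; a failing step is logged and
// counted but does not abort the pass, and the success gauge below only
// advances when all steps succeeded.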
let mut all_successful = true;
for (name, cleanup_sql) in &cleanup_steps {
let query = query_dyn!(&cleanup_sql).unwrap();
if let Err(err) = query.execute(client).await {
warn!("failed to process cleanup step {}: {:?}", name, err);
metric_cleanup_errors.increment();
all_successful = false;
}
}
if all_successful {
metric_last_cleanup.set_max(secs_since_epoch());
}
}
});
}
// postgres metrics/monitoring thread
{
let postgres_con =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?;
let metric_slot_last_write = metrics_sender.register_u64(
"postgres_slot_last_write_timestamp".into(),
MetricType::Gauge,
);
let metric_account_write_last_write = metrics_sender.register_u64(
"postgres_account_write_last_write_timestamp".into(),
MetricType::Gauge,
);
let metric_account_queue =
metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge);
let metric_slot_queue =
metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge);
let config = config.clone();
tokio::spawn(async move {
let mut client_opt = None;
loop {
tokio::time::sleep(Duration::from_secs(config.monitoring_update_interval_secs))
.await;
let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await;
let last_update = std::time::SystemTime::now();
let last_slot_write = epoch_secs_to_time(metric_slot_last_write.value());
let last_account_write_write =
epoch_secs_to_time(metric_account_write_last_write.value());
let slot_queue = i64::try_from(metric_slot_queue.value()).unwrap();
let account_write_queue = i64::try_from(metric_account_queue.value()).unwrap();
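// Upsert keyed on name: each connector instance owns one row in the
// monitoring table, identified by its configured monitoring_name.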
let query = query!(
"INSERT INTO monitoring
(name, last_update, last_slot_write, last_account_write_write, slot_queue, account_write_queue)
VALUES
($name, $last_update, $last_slot_write, $last_account_write_write, $slot_queue, $account_write_queue)
ON CONFLICT (name) DO UPDATE SET
last_update=$last_update,
last_slot_write=$last_slot_write,
last_account_write_write=$last_account_write_write,
slot_queue=$slot_queue,
account_write_queue=$account_write_queue
",
name = config.monitoring_name,
last_update,
last_slot_write,
last_account_write_write,
slot_queue,
account_write_queue,
);
if let Err(err) = query
.execute(client)
.await
.context("updating monitoring table")
{
warn!("failed to process monitoring update: {:?}", err);
};
}
});
}
Ok((account_write_queue_sender, slot_queue_sender))
}
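For orientation, a minimal sketch of how a caller might wire this up. It assumes an `init` entry point with the signature shown above, plus prepared `config`, `account_tables`, and `metrics` values; the names are illustrative, not taken from the repository:

let (account_write_sender, slot_sender) =
    init(&config, account_tables.clone(), metrics.clone()).await?;
// Forward plugin events into the worker pools; a send on the bounded
// account-write channel waits whenever the queue is full.
account_write_sender.send(account_write).await?;
slot_sender.send(slot_update).await?;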

View File

@ -1,9 +1,8 @@
use anchor_client::{
solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair},
solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair},
Cluster,
};
use anchor_lang::prelude::Pubkey;
use bytemuck::cast_slice;
use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{
@ -13,7 +12,6 @@ use futures_util::{
use log::*;
use std::{
collections::{HashMap, HashSet},
convert::identity,
fs::File,
io::Read,
net::SocketAddr,
@ -30,8 +28,11 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::Deserialize;
use solana_geyser_connector_lib::{
fill_event_filter::FillEventType,
fill_event_postgres_target,
metrics::{MetricType, MetricU64},
FilterConfig, StatusResponse,
orderbook_filter::MarketConfig,
FilterConfig, PostgresConfig, PostgresTlsConfig, StatusResponse,
};
use solana_geyser_connector_lib::{
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
@ -308,44 +309,70 @@ async fn main() -> anyhow::Result<()> {
.await?,
);
// todo: reload markets at intervals
let perp_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.perp_markets
.iter()
.map(|(_, context)| {
let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index)
{
Some(token) => token.decimals,
None => panic!("token not found for market"), // todo: default to 6 for usdc?
};
(
context.address,
MarketConfig {
name: context.market.name().to_owned(),
bids: context.market.bids,
asks: context.market.asks,
event_queue: context.market.event_queue,
base_decimals: context.market.base_decimals,
quote_decimals,
base_lot_size: context.market.base_lot_size,
quote_lot_size: context.market.quote_lot_size,
},
)
})
.collect();
let spot_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.serum3_markets
.iter()
.map(|(_, context)| {
let base_decimals = match group_context.tokens.get(&context.market.base_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market"), // todo: default?
};
let quote_decimals = match group_context.tokens.get(&context.market.quote_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market"), // todo: default to 6 for usdc?
};
(
context.market.serum_market_external,
MarketConfig {
name: context.market.name().to_owned(),
bids: context.bids,
asks: context.asks,
event_queue: context.event_q,
base_decimals,
quote_decimals,
base_lot_size: context.coin_lot_size as i64,
quote_lot_size: context.pc_lot_size as i64,
},
)
})
.collect();
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
.perp_markets
.iter()
.map(|(_, context)| (context.address, context.market.event_queue))
.collect();
let serum_market_pks: Vec<Pubkey> = group_context
.serum3_markets
let spot_queue_pks: Vec<(Pubkey, Pubkey)> = spot_market_configs
.iter()
.map(|(_, context)| context.market.serum_market_external)
.map(|x| (x.0, x.1.event_queue))
.collect();
let serum_market_ais = client
.rpc_async()
.get_multiple_accounts(serum_market_pks.as_slice())
.await?;
let serum_market_ais: Vec<&Account> = serum_market_ais
.iter()
.filter_map(|maybe_ai| match maybe_ai {
Some(ai) => Some(ai),
None => None,
})
.collect();
let serum_queue_pks: Vec<(Pubkey, Pubkey)> = serum_market_ais
.iter()
.enumerate()
.map(|pair| {
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
);
(
serum_market_pks[pair.0],
Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])),
)
})
.collect();
let a: Vec<(String, String)> = group_context
.serum3_markets
.iter()
@ -368,9 +395,28 @@ async fn main() -> anyhow::Result<()> {
.collect();
let market_pubkey_strings: HashMap<String, String> = [a, b].concat().into_iter().collect();
// TODO: read all this from config
let pgconf = PostgresConfig {
connection_string: "$PG_CONNECTION_STRING".to_owned(),
connection_count: 1,
max_batch_size: 1,
max_queue_size: 50_000,
retry_query_max_count: 10,
retry_query_sleep_secs: 2,
retry_connection_sleep_secs: 10,
fatal_connection_timeout_secs: 120,
allow_invalid_certs: true,
tls: Some(PostgresTlsConfig {
ca_cert_path: "$PG_CA_CERT".to_owned(),
client_key_path: "$PG_CLIENT_KEY".to_owned(),
}),
};
let postgres_update_sender =
fill_event_postgres_target::init(&pgconf, metrics_tx.clone()).await?;
let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
perp_queue_pks.clone(),
serum_queue_pks.clone(),
perp_market_configs.clone(),
spot_market_configs.clone(),
metrics_tx.clone(),
)
.await?;
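The "$PG_CONNECTION_STRING", "$PG_CA_CERT" and "$PG_CLIENT_KEY" values above are placeholders (note the TODO about reading all this from config). A hypothetical helper, not part of this commit, that resolves such placeholders from the environment at startup might look like:

// Illustrative only: "$VAR" is replaced by the value of VAR, anything
// else passes through unchanged.
fn resolve_env(template: &str) -> String {
    match template.strip_prefix('$') {
        Some(var) => std::env::var(var).expect("placeholder env var must be set"),
        None => template.to_owned(),
    }
}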
@ -389,19 +435,36 @@ async fn main() -> anyhow::Result<()> {
let message = fill_receiver.recv().await.unwrap();
match message {
FillEventFilterMessage::Update(update) => {
debug!("ws update {} {:?} {:?} fill", update.market, update.status, update.event.event_type);
debug!(
"ws update {} {:?} {:?} fill",
update.market_name, update.status, update.event.event_type
);
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
for (addr, peer) in peer_copy.iter_mut() {
let json = serde_json::to_string(&update).unwrap();
let json = serde_json::to_string(&update.clone()).unwrap();
// only send updates if the peer is subscribed
if peer.subscriptions.contains(&update.market) {
if peer.subscriptions.contains(&update.market_key) {
let result = peer.sender.send(Message::Text(json)).await;
if result.is_err() {
error!("ws update {} fill could not reach {}", update.market, addr);
error!(
"ws update {} fill could not reach {}",
update.market_name, addr
);
}
}
}
// send taker fills to db
let update_c = update.clone();
match update_c.event.event_type {
FillEventType::Perp => {
if !update_c.event.maker {
debug!("{:?}", update_c);
postgres_update_sender.send(update_c).await.unwrap();
}
}
_ => warn!("failed to write spot event to db"),
}
}
FillEventFilterMessage::Checkpoint(checkpoint) => {
checkpoints_ref_thread
@ -461,7 +524,7 @@ async fn main() -> anyhow::Result<()> {
.collect::<String>()
);
let use_geyser = true;
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
let all_queue_pks = [perp_queue_pks.clone()].concat();
let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect();
let filter_config = FilterConfig {
program_ids: vec![],

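With this change the fills service uses the same account-scoped subscription as the orderbook service below. The filter presumably ends up shaped like this sketch, with the truncated account_ids line completed for illustration only:

let filter_config = FilterConfig {
    program_ids: vec![],
    account_ids: relevant_pubkeys,
};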
View File

@ -309,6 +309,7 @@ async fn main() -> anyhow::Result<()> {
name: context.market.name().to_owned(),
bids: context.market.bids,
asks: context.market.asks,
event_queue: context.market.event_queue,
base_decimals: context.market.base_decimals,
quote_decimals,
base_lot_size: context.market.base_lot_size,
@ -336,6 +337,7 @@ async fn main() -> anyhow::Result<()> {
name: context.market.name().to_owned(),
bids: context.bids,
asks: context.asks,
event_queue: context.event_q,
base_decimals,
quote_decimals,
base_lot_size: context.coin_lot_size as i64,
@ -355,7 +357,12 @@ async fn main() -> anyhow::Result<()> {
.collect();
let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) =
orderbook_filter::init(market_configs, serum_market_configs, metrics_tx.clone()).await?;
orderbook_filter::init(
market_configs.clone(),
serum_market_configs.clone(),
metrics_tx.clone(),
)
.await?;
let checkpoints_ref_thread = checkpoints.clone();
let peers_ref_thread = peers.clone();
@ -422,12 +429,14 @@ async fn main() -> anyhow::Result<()> {
);
let use_geyser = true;
if use_geyser {
let relevant_pubkeys = [market_configs.clone()]
.concat()
.iter()
.flat_map(|m| [m.1.bids.to_string(), m.1.asks.to_string()])
.collect();
let filter_config = FilterConfig {
program_ids: vec![
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
],
account_ids: vec![],
program_ids: vec![],
account_ids: relevant_pubkeys,
};
grpc_plugin_source::process_events(
&config.source,