Compare commits
5 Commits
1233cc58f3
...
4b75b88a98
Author | SHA1 | Date |
---|---|---|
Riordan Panayides | 4b75b88a98 | |
Riordan Panayides | ba4aa29f7f | |
Riordan Panayides | 746174ce8e | |
Riordan Panayides | 5c8ebc53b1 | |
Riordan Panayides | 6dfe88ac15 |
|
@ -604,9 +604,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
|
|||
|
||||
[[package]]
|
||||
name = "base64"
|
||||
version = "0.20.0"
|
||||
version = "0.21.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
|
||||
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
|
||||
|
||||
[[package]]
|
||||
name = "base64ct"
|
||||
|
@ -2393,7 +2393,7 @@ checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
|
|||
dependencies = [
|
||||
"http",
|
||||
"hyper 0.14.23",
|
||||
"rustls 0.20.7",
|
||||
"rustls 0.20.8",
|
||||
"tokio",
|
||||
"tokio-rustls 0.23.4",
|
||||
]
|
||||
|
@ -3033,7 +3033,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "mango-v4"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/blockworks-foundation/mango-v4?branch=dev#5019864b844ca6633ee7d1356c763d78fc52ad64"
|
||||
source = "git+https://github.com/blockworks-foundation/mango-v4?branch=dev#0ba7ecd5061e666addd90e6d3a4293ec1381f9c0"
|
||||
dependencies = [
|
||||
"anchor-lang",
|
||||
"anchor-spl",
|
||||
|
@ -3885,6 +3885,7 @@ checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1"
|
|||
dependencies = [
|
||||
"array-init",
|
||||
"bytes 1.3.0",
|
||||
"chrono",
|
||||
"fallible-iterator",
|
||||
"postgres-derive",
|
||||
"postgres-protocol",
|
||||
|
@ -4215,7 +4216,7 @@ dependencies = [
|
|||
"fxhash",
|
||||
"quinn-proto",
|
||||
"quinn-udp",
|
||||
"rustls 0.20.7",
|
||||
"rustls 0.20.8",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
|
@ -4232,7 +4233,7 @@ dependencies = [
|
|||
"fxhash",
|
||||
"rand 0.8.5",
|
||||
"ring",
|
||||
"rustls 0.20.7",
|
||||
"rustls 0.20.8",
|
||||
"rustls-native-certs",
|
||||
"rustls-pemfile 0.2.1",
|
||||
"slab",
|
||||
|
@ -4605,8 +4606,8 @@ dependencies = [
|
|||
"once_cell",
|
||||
"percent-encoding 2.2.0",
|
||||
"pin-project-lite",
|
||||
"rustls 0.20.7",
|
||||
"rustls-pemfile 1.0.1",
|
||||
"rustls 0.20.8",
|
||||
"rustls-pemfile 1.0.2",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_urlencoded",
|
||||
|
@ -4776,9 +4777,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.20.7"
|
||||
version = "0.20.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
|
||||
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
|
||||
dependencies = [
|
||||
"log 0.4.17",
|
||||
"ring",
|
||||
|
@ -4793,7 +4794,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls-pemfile 1.0.1",
|
||||
"rustls-pemfile 1.0.2",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
]
|
||||
|
@ -4809,11 +4810,11 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rustls-pemfile"
|
||||
version = "1.0.1"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
|
||||
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
|
||||
dependencies = [
|
||||
"base64 0.13.1",
|
||||
"base64 0.21.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -5610,7 +5611,7 @@ dependencies = [
|
|||
"rand_chacha 0.2.2",
|
||||
"rayon",
|
||||
"reqwest",
|
||||
"rustls 0.20.7",
|
||||
"rustls 0.20.8",
|
||||
"semver 1.0.14",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
|
@ -5762,10 +5763,11 @@ dependencies = [
|
|||
"async-channel",
|
||||
"async-stream 0.2.1",
|
||||
"async-trait",
|
||||
"base64 0.20.0",
|
||||
"base64 0.21.0",
|
||||
"bs58 0.4.0",
|
||||
"bytemuck",
|
||||
"bytes 1.3.0",
|
||||
"chrono",
|
||||
"fixed",
|
||||
"futures 0.3.25",
|
||||
"futures-core",
|
||||
|
@ -5781,6 +5783,8 @@ dependencies = [
|
|||
"postgres_query",
|
||||
"prost 0.9.0",
|
||||
"rand 0.7.3",
|
||||
"rustls 0.20.8",
|
||||
"rustls-pemfile 1.0.2",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"serde_json",
|
||||
|
@ -5791,6 +5795,7 @@ dependencies = [
|
|||
"solana-sdk",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-postgres-rustls",
|
||||
"tokio-stream",
|
||||
"tonic 0.6.2",
|
||||
"tonic-build 0.6.2",
|
||||
|
@ -6413,7 +6418,7 @@ dependencies = [
|
|||
"quinn",
|
||||
"rand 0.7.3",
|
||||
"rcgen",
|
||||
"rustls 0.20.7",
|
||||
"rustls 0.20.8",
|
||||
"solana-metrics",
|
||||
"solana-perf",
|
||||
"solana-sdk",
|
||||
|
@ -7098,6 +7103,20 @@ dependencies = [
|
|||
"tokio-util 0.7.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-postgres-rustls"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "606f2b73660439474394432239c82249c0d45eb5f23d91f401be1e33590444a7"
|
||||
dependencies = [
|
||||
"futures 0.3.25",
|
||||
"ring",
|
||||
"rustls 0.20.8",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-rustls 0.23.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-reactor"
|
||||
version = "0.1.12"
|
||||
|
@ -7134,7 +7153,7 @@ version = "0.23.4"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
|
||||
dependencies = [
|
||||
"rustls 0.20.7",
|
||||
"rustls 0.20.8",
|
||||
"tokio",
|
||||
"webpki 0.22.0",
|
||||
]
|
||||
|
@ -7193,7 +7212,7 @@ checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
|
|||
dependencies = [
|
||||
"futures-util",
|
||||
"log 0.4.17",
|
||||
"rustls 0.20.7",
|
||||
"rustls 0.20.8",
|
||||
"tokio",
|
||||
"tokio-rustls 0.23.4",
|
||||
"tungstenite",
|
||||
|
@ -7294,7 +7313,7 @@ dependencies = [
|
|||
"pin-project",
|
||||
"prost 0.11.3",
|
||||
"prost-derive 0.11.2",
|
||||
"rustls-pemfile 1.0.1",
|
||||
"rustls-pemfile 1.0.2",
|
||||
"tokio",
|
||||
"tokio-rustls 0.23.4",
|
||||
"tokio-stream",
|
||||
|
@ -7456,7 +7475,7 @@ dependencies = [
|
|||
"httparse",
|
||||
"log 0.4.17",
|
||||
"rand 0.8.5",
|
||||
"rustls 0.20.7",
|
||||
"rustls 0.20.8",
|
||||
"sha-1 0.10.1",
|
||||
"thiserror",
|
||||
"url 2.3.1",
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
app = "mango-fills"
|
||||
kill_signal = "SIGINT"
|
||||
kill_timeout = 5
|
||||
|
||||
[build]
|
||||
dockerfile = "../Dockerfile"
|
||||
|
||||
[experimental]
|
||||
cmd = ["service-mango-fills", "fills-config.toml"]
|
||||
|
||||
[[services]]
|
||||
internal_port = 8080
|
||||
processes = ["app"]
|
||||
protocol = "tcp"
|
||||
|
||||
[services.concurrency]
|
||||
hard_limit = 1024
|
||||
soft_limit = 1024
|
||||
type = "connections"
|
||||
|
||||
[metrics]
|
||||
path = "/metrics"
|
||||
port = 9091
|
|
@ -0,0 +1,23 @@
|
|||
app = "mango-orderbook"
|
||||
kill_signal = "SIGINT"
|
||||
kill_timeout = 5
|
||||
|
||||
[build]
|
||||
dockerfile = "../Dockerfile"
|
||||
|
||||
[experimental]
|
||||
cmd = ["service-mango-orderbook", "orderbook-config.toml"]
|
||||
|
||||
[[services]]
|
||||
internal_port = 8082
|
||||
processes = ["app"]
|
||||
protocol = "tcp"
|
||||
|
||||
[services.concurrency]
|
||||
hard_limit = 1024
|
||||
soft_limit = 1024
|
||||
type = "connections"
|
||||
|
||||
[metrics]
|
||||
path = "/metrics"
|
||||
port = 9091
|
|
@ -23,10 +23,13 @@ fixed = { version = "*", features = ["serde"] }
|
|||
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-stream = "0.1"
|
||||
tokio-postgres = "0.7"
|
||||
postgres-types = { version = "0.2", features = ["array-impls", "derive"] }
|
||||
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
|
||||
tokio-postgres-rustls = "0.9.0"
|
||||
postgres-types = { version = "0.2", features = ["array-impls", "derive", "with-chrono-0_4"] }
|
||||
postgres-native-tls = "0.5"
|
||||
native-tls = "0.2"
|
||||
rustls = "0.20.8"
|
||||
rustls-pemfile = "1.0.2"
|
||||
|
||||
# postgres_query hasn't updated its crate in a while
|
||||
postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
|
||||
|
@ -45,6 +48,7 @@ rand = "0.7"
|
|||
anyhow = "1.0"
|
||||
bytes = "1.0"
|
||||
itertools = "0.10.5"
|
||||
chrono = "0.4.23"
|
||||
|
||||
futures = "0.3.17"
|
||||
futures-core = "0.3"
|
||||
|
|
|
@ -55,18 +55,14 @@ impl ChainData {
|
|||
newest_processed_slot: 0,
|
||||
account_versions_stored: 0,
|
||||
account_bytes_stored: 0,
|
||||
metric_accounts_stored: metrics_sender.register_u64(
|
||||
"chaindata_accounts_stored".into(),
|
||||
MetricType::Gauge,
|
||||
),
|
||||
metric_accounts_stored: metrics_sender
|
||||
.register_u64("chaindata_accounts_stored".into(), MetricType::Gauge),
|
||||
metric_account_versions_stored: metrics_sender.register_u64(
|
||||
"chaindata_account_versions_stored".into(),
|
||||
MetricType::Gauge,
|
||||
),
|
||||
metric_account_bytes_stored: metrics_sender.register_u64(
|
||||
"chaindata_account_bytes_stored".into(),
|
||||
MetricType::Gauge,
|
||||
),
|
||||
metric_account_bytes_stored: metrics_sender
|
||||
.register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
use crate::{
|
||||
chain_data::{AccountData, ChainData, SlotData},
|
||||
metrics::{MetricType, Metrics},
|
||||
AccountWrite, SlotUpdate, orderbook_filter::OrderbookSide,
|
||||
orderbook_filter::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide},
|
||||
AccountWrite, SlotUpdate,
|
||||
};
|
||||
use bytemuck::{Pod, Zeroable, cast_slice};
|
||||
use bytemuck::{cast_slice, Pod, Zeroable};
|
||||
use chrono::{TimeZone, Utc};
|
||||
use log::*;
|
||||
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
||||
use serum_dex::state::EventView as SpotEvent;
|
||||
|
@ -15,7 +17,9 @@ use solana_sdk::{
|
|||
use std::{
|
||||
borrow::BorrowMut,
|
||||
cmp::max,
|
||||
collections::{HashMap, HashSet}, convert::identity, time::SystemTime,
|
||||
collections::{HashMap, HashSet},
|
||||
convert::identity,
|
||||
time::SystemTime,
|
||||
};
|
||||
|
||||
use crate::metrics::MetricU64;
|
||||
|
@ -25,18 +29,46 @@ use mango_v4::state::{
|
|||
MAX_NUM_EVENTS,
|
||||
};
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum FillUpdateStatus {
|
||||
New,
|
||||
Revoke,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize)]
|
||||
impl Serialize for FillUpdateStatus {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
match *self {
|
||||
FillUpdateStatus::New => {
|
||||
serializer.serialize_unit_variant("FillUpdateStatus", 0, "new")
|
||||
}
|
||||
FillUpdateStatus::Revoke => {
|
||||
serializer.serialize_unit_variant("FillUpdateStatus", 1, "revoke")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub enum FillEventType {
|
||||
Spot,
|
||||
Perp,
|
||||
}
|
||||
|
||||
impl Serialize for FillEventType {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
match *self {
|
||||
FillEventType::Spot => serializer.serialize_unit_variant("FillEventType", 0, "spot"),
|
||||
FillEventType::Perp => serializer.serialize_unit_variant("FillEventType", 1, "perp"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct FillEvent {
|
||||
pub event_type: FillEventType,
|
||||
|
@ -48,8 +80,8 @@ pub struct FillEvent {
|
|||
pub order_id: u128,
|
||||
pub client_order_id: u64,
|
||||
pub fee: f32,
|
||||
pub price: i64,
|
||||
pub quantity: i64,
|
||||
pub price: f64,
|
||||
pub quantity: f64,
|
||||
}
|
||||
|
||||
impl Serialize for FillEvent {
|
||||
|
@ -61,7 +93,12 @@ impl Serialize for FillEvent {
|
|||
state.serialize_field("eventType", &self.event_type)?;
|
||||
state.serialize_field("maker", &self.maker)?;
|
||||
state.serialize_field("side", &self.side)?;
|
||||
state.serialize_field("timestamp", &self.timestamp)?;
|
||||
state.serialize_field(
|
||||
"timestamp",
|
||||
&Utc.timestamp_opt(self.timestamp as i64, 0)
|
||||
.unwrap()
|
||||
.to_rfc3339(),
|
||||
)?;
|
||||
state.serialize_field("seqNum", &self.seq_num)?;
|
||||
state.serialize_field("owner", &self.owner)?;
|
||||
state.serialize_field("orderId", &self.order_id)?;
|
||||
|
@ -74,7 +111,7 @@ impl Serialize for FillEvent {
|
|||
}
|
||||
|
||||
impl FillEvent {
|
||||
pub fn new_from_perp(event: PerpFillEvent) -> [Self; 2] {
|
||||
pub fn new_from_perp(event: PerpFillEvent, config: &MarketConfig) -> [Self; 2] {
|
||||
let taker_side = match event.taker_side() {
|
||||
Side::Ask => OrderbookSide::Ask,
|
||||
Side::Bid => OrderbookSide::Bid,
|
||||
|
@ -83,37 +120,62 @@ impl FillEvent {
|
|||
Side::Ask => OrderbookSide::Bid,
|
||||
Side::Bid => OrderbookSide::Ask,
|
||||
};
|
||||
[FillEvent {
|
||||
event_type: FillEventType::Perp,
|
||||
maker: true,
|
||||
side: maker_side,
|
||||
timestamp: event.timestamp,
|
||||
seq_num: event.seq_num,
|
||||
owner: event.maker.to_string(),
|
||||
order_id: event.maker_order_id,
|
||||
client_order_id: 0u64,
|
||||
fee: event.maker_fee.to_num(),
|
||||
price: event.price,
|
||||
quantity: event.quantity,
|
||||
},
|
||||
FillEvent {
|
||||
event_type: FillEventType::Perp,
|
||||
maker: false,
|
||||
side: taker_side,
|
||||
timestamp: event.timestamp,
|
||||
seq_num: event.seq_num,
|
||||
owner: event.taker.to_string(),
|
||||
order_id: event.taker_order_id,
|
||||
client_order_id: event.taker_client_order_id,
|
||||
fee: event.taker_fee.to_num(),
|
||||
price: event.price,
|
||||
quantity: event.quantity,
|
||||
|
||||
}]
|
||||
let price = price_lots_to_ui_perp(
|
||||
event.price,
|
||||
config.base_decimals,
|
||||
config.quote_decimals,
|
||||
config.base_lot_size,
|
||||
config.quote_lot_size,
|
||||
);
|
||||
let quantity =
|
||||
base_lots_to_ui_perp(event.quantity, config.base_decimals, config.quote_decimals);
|
||||
[
|
||||
FillEvent {
|
||||
event_type: FillEventType::Perp,
|
||||
maker: true,
|
||||
side: maker_side,
|
||||
timestamp: event.timestamp,
|
||||
seq_num: event.seq_num,
|
||||
owner: event.maker.to_string(),
|
||||
order_id: event.maker_order_id,
|
||||
client_order_id: 0u64,
|
||||
fee: event.maker_fee.to_num(),
|
||||
price: price,
|
||||
quantity: quantity,
|
||||
},
|
||||
FillEvent {
|
||||
event_type: FillEventType::Perp,
|
||||
maker: false,
|
||||
side: taker_side,
|
||||
timestamp: event.timestamp,
|
||||
seq_num: event.seq_num,
|
||||
owner: event.taker.to_string(),
|
||||
order_id: event.taker_order_id,
|
||||
client_order_id: event.taker_client_order_id,
|
||||
fee: event.taker_fee.to_num(),
|
||||
price: price,
|
||||
quantity: quantity,
|
||||
},
|
||||
]
|
||||
}
|
||||
pub fn new_from_spot(event: SpotEvent, timestamp: u64, seq_num: u64) -> Self {
|
||||
pub fn new_from_spot(
|
||||
event: SpotEvent,
|
||||
timestamp: u64,
|
||||
seq_num: u64,
|
||||
config: &MarketConfig,
|
||||
) -> Self {
|
||||
match event {
|
||||
SpotEvent::Fill { side, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, order_id, owner, client_order_id, .. } => {
|
||||
SpotEvent::Fill {
|
||||
side,
|
||||
maker,
|
||||
native_qty_paid,
|
||||
native_qty_received,
|
||||
native_fee_or_rebate,
|
||||
order_id,
|
||||
owner,
|
||||
client_order_id,
|
||||
..
|
||||
} => {
|
||||
let side = match side as u8 {
|
||||
0 => OrderbookSide::Bid,
|
||||
1 => OrderbookSide::Ask,
|
||||
|
@ -123,8 +185,41 @@ impl FillEvent {
|
|||
Some(id) => id.into(),
|
||||
None => 0u64,
|
||||
};
|
||||
// TODO: native to ui
|
||||
let price = (native_qty_paid / native_qty_received) as i64;
|
||||
|
||||
let base_multiplier = 10u64.pow(config.base_decimals.into()) as u64;
|
||||
let quote_multiplier = 10u64.pow(config.quote_decimals.into()) as u64;
|
||||
|
||||
let (price, quantity) = match side {
|
||||
OrderbookSide::Bid => {
|
||||
let price_before_fees = if maker {
|
||||
native_qty_paid + native_fee_or_rebate
|
||||
} else {
|
||||
native_qty_paid - native_fee_or_rebate
|
||||
};
|
||||
|
||||
let top = price_before_fees * base_multiplier;
|
||||
let bottom = quote_multiplier * native_qty_received;
|
||||
let price = top as f64 / bottom as f64;
|
||||
let quantity = native_qty_received as f64 / base_multiplier as f64;
|
||||
(price, quantity)
|
||||
}
|
||||
OrderbookSide::Ask => {
|
||||
let price_before_fees = if maker {
|
||||
native_qty_received - native_fee_or_rebate
|
||||
} else {
|
||||
native_qty_received + native_fee_or_rebate
|
||||
};
|
||||
|
||||
let top = price_before_fees * base_multiplier;
|
||||
let bottom = quote_multiplier * native_qty_paid;
|
||||
let price = top as f64 / bottom as f64;
|
||||
let quantity = native_qty_paid as f64 / base_multiplier as f64;
|
||||
(price, quantity)
|
||||
}
|
||||
};
|
||||
|
||||
let fee = native_fee_or_rebate as f32 / quote_multiplier as f32;
|
||||
|
||||
FillEvent {
|
||||
event_type: FillEventType::Spot,
|
||||
maker: maker,
|
||||
|
@ -134,12 +229,14 @@ impl FillEvent {
|
|||
owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(),
|
||||
order_id: order_id,
|
||||
client_order_id: client_order_id,
|
||||
fee: native_fee_or_rebate as f32,
|
||||
fee,
|
||||
price,
|
||||
quantity: native_qty_received as i64,
|
||||
quantity,
|
||||
}
|
||||
}
|
||||
SpotEvent::Out { .. } => { panic!("Can't build FillEvent from SpotEvent::Out")}
|
||||
SpotEvent::Out { .. } => {
|
||||
panic!("Can't build FillEvent from SpotEvent::Out")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -148,8 +245,8 @@ impl FillEvent {
|
|||
pub struct FillUpdate {
|
||||
pub event: FillEvent,
|
||||
pub status: FillUpdateStatus,
|
||||
pub market: String,
|
||||
pub queue: String,
|
||||
pub market_key: String,
|
||||
pub market_name: String,
|
||||
pub slot: u64,
|
||||
pub write_version: u64,
|
||||
}
|
||||
|
@ -170,13 +267,13 @@ impl Serialize for FillUpdate {
|
|||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let mut state = serializer.serialize_struct("FillUpdate", 4)?;
|
||||
let mut state = serializer.serialize_struct("FillUpdate", 6)?;
|
||||
state.serialize_field("event", &self.event)?;
|
||||
state.serialize_field("market", &self.market)?;
|
||||
state.serialize_field("queue", &self.queue)?;
|
||||
state.serialize_field("marketKey", &self.market_key)?;
|
||||
state.serialize_field("marketName", &self.market_name)?;
|
||||
state.serialize_field("status", &self.status)?;
|
||||
state.serialize_field("slot", &self.slot)?;
|
||||
state.serialize_field("write_version", &self.write_version)?;
|
||||
state.serialize_field("writeVersion", &self.write_version)?;
|
||||
|
||||
state.end()
|
||||
}
|
||||
|
@ -218,7 +315,7 @@ type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
|
|||
fn publish_changes_perp(
|
||||
slot: u64,
|
||||
write_version: u64,
|
||||
mkt: &(Pubkey, Pubkey),
|
||||
mkt: &(Pubkey, MarketConfig),
|
||||
header: &EventQueueHeader,
|
||||
events: &EventQueueEvents,
|
||||
old_seq_num: u64,
|
||||
|
@ -234,7 +331,7 @@ fn publish_changes_perp(
|
|||
.unwrap_or(0);
|
||||
let mut checkpoint = Vec::new();
|
||||
let mkt_pk_string = mkt.0.to_string();
|
||||
let evq_pk_string = mkt.1.to_string();
|
||||
let evq_pk_string = mkt.1.event_queue.to_string();
|
||||
for seq_num in start_seq_num..header.seq_num {
|
||||
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
||||
|
||||
|
@ -245,8 +342,8 @@ fn publish_changes_perp(
|
|||
// the order of these checks is important so they are exhaustive
|
||||
if seq_num >= old_seq_num {
|
||||
debug!(
|
||||
"found new event {} idx {} type {}",
|
||||
mkt_pk_string, idx, events[idx].event_type as u32
|
||||
"found new event {} idx {} type {} slot {} write_version {}",
|
||||
mkt_pk_string, idx, events[idx].event_type as u32, slot, write_version
|
||||
);
|
||||
|
||||
metric_events_new.increment();
|
||||
|
@ -254,7 +351,7 @@ fn publish_changes_perp(
|
|||
// new fills are published and recorded in checkpoint
|
||||
if events[idx].event_type == EventType::Fill as u8 {
|
||||
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
||||
let fills = FillEvent::new_from_perp(fill);
|
||||
let fills = FillEvent::new_from_perp(fill, &mkt.1);
|
||||
// send event for both maker and taker
|
||||
for fill in fills {
|
||||
fill_update_sender
|
||||
|
@ -263,8 +360,8 @@ fn publish_changes_perp(
|
|||
write_version,
|
||||
event: fill.clone(),
|
||||
status: FillUpdateStatus::New,
|
||||
market: mkt_pk_string.clone(),
|
||||
queue: evq_pk_string.clone(),
|
||||
market_key: mkt_pk_string.clone(),
|
||||
market_name: mkt.1.name.clone(),
|
||||
}))
|
||||
.unwrap(); // TODO: use anyhow to bubble up error
|
||||
checkpoint.push(fill);
|
||||
|
@ -283,7 +380,7 @@ fn publish_changes_perp(
|
|||
// first revoke old event if a fill
|
||||
if old_events[idx].event_type == EventType::Fill as u8 {
|
||||
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
|
||||
let fills = FillEvent::new_from_perp(fill);
|
||||
let fills = FillEvent::new_from_perp(fill, &mkt.1);
|
||||
for fill in fills {
|
||||
fill_update_sender
|
||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||
|
@ -291,8 +388,8 @@ fn publish_changes_perp(
|
|||
write_version,
|
||||
event: fill,
|
||||
status: FillUpdateStatus::Revoke,
|
||||
market: mkt_pk_string.clone(),
|
||||
queue: evq_pk_string.clone(),
|
||||
market_key: mkt_pk_string.clone(),
|
||||
market_name: mkt.1.name.clone(),
|
||||
}))
|
||||
.unwrap(); // TODO: use anyhow to bubble up error
|
||||
}
|
||||
|
@ -301,7 +398,7 @@ fn publish_changes_perp(
|
|||
// then publish new if its a fill and record in checkpoint
|
||||
if events[idx].event_type == EventType::Fill as u8 {
|
||||
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
||||
let fills = FillEvent::new_from_perp(fill);
|
||||
let fills = FillEvent::new_from_perp(fill, &mkt.1);
|
||||
for fill in fills {
|
||||
fill_update_sender
|
||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||
|
@ -309,8 +406,8 @@ fn publish_changes_perp(
|
|||
write_version,
|
||||
event: fill.clone(),
|
||||
status: FillUpdateStatus::New,
|
||||
market: mkt_pk_string.clone(),
|
||||
queue: evq_pk_string.clone(),
|
||||
market_key: mkt_pk_string.clone(),
|
||||
market_name: mkt.1.name.clone(),
|
||||
}))
|
||||
.unwrap(); // TODO: use anyhow to bubble up error
|
||||
checkpoint.push(fill);
|
||||
|
@ -320,7 +417,7 @@ fn publish_changes_perp(
|
|||
// every already published event is recorded in checkpoint if a fill
|
||||
if events[idx].event_type == EventType::Fill as u8 {
|
||||
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
||||
let fills = FillEvent::new_from_perp(fill);
|
||||
let fills = FillEvent::new_from_perp(fill, &mkt.1);
|
||||
for fill in fills {
|
||||
checkpoint.push(fill);
|
||||
}
|
||||
|
@ -332,15 +429,15 @@ fn publish_changes_perp(
|
|||
for seq_num in header.seq_num..old_seq_num {
|
||||
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
||||
debug!(
|
||||
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
|
||||
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num
|
||||
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {} slot {} write_version {}",
|
||||
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num, slot, write_version
|
||||
);
|
||||
|
||||
metric_events_drop.increment();
|
||||
|
||||
if old_events[idx].event_type == EventType::Fill as u8 {
|
||||
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
|
||||
let fills = FillEvent::new_from_perp(fill);
|
||||
let fills = FillEvent::new_from_perp(fill, &mkt.1);
|
||||
for fill in fills {
|
||||
fill_update_sender
|
||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||
|
@ -348,8 +445,8 @@ fn publish_changes_perp(
|
|||
event: fill,
|
||||
write_version,
|
||||
status: FillUpdateStatus::Revoke,
|
||||
market: mkt_pk_string.clone(),
|
||||
queue: evq_pk_string.clone(),
|
||||
market_key: mkt_pk_string.clone(),
|
||||
market_name: mkt.1.name.clone(),
|
||||
}))
|
||||
.unwrap(); // TODO: use anyhow to bubble up error
|
||||
}
|
||||
|
@ -370,7 +467,7 @@ fn publish_changes_perp(
|
|||
fn publish_changes_serum(
|
||||
slot: u64,
|
||||
write_version: u64,
|
||||
mkt: &(Pubkey, Pubkey),
|
||||
mkt: &(Pubkey, MarketConfig),
|
||||
header: &SerumEventQueueHeader,
|
||||
events: &[serum_dex::state::Event],
|
||||
old_seq_num: u64,
|
||||
|
@ -386,12 +483,15 @@ fn publish_changes_serum(
|
|||
.unwrap_or(0);
|
||||
let mut checkpoint = Vec::new();
|
||||
let mkt_pk_string = mkt.0.to_string();
|
||||
let evq_pk_string = mkt.1.to_string();
|
||||
let evq_pk_string = mkt.1.event_queue.to_string();
|
||||
let header_seq_num = header.seq_num;
|
||||
debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
|
||||
|
||||
// Timestamp for spot events is time scraped
|
||||
let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
||||
let timestamp = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs();
|
||||
for seq_num in start_seq_num..header_seq_num {
|
||||
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
||||
let event_view = events[idx].as_view().unwrap();
|
||||
|
@ -404,7 +504,7 @@ fn publish_changes_serum(
|
|||
// 2) the event is not matching the old event queue
|
||||
// 3) all other events are matching the old event queue
|
||||
// the order of these checks is important so they are exhaustive
|
||||
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num);
|
||||
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num, &mkt.1);
|
||||
if seq_num >= old_seq_num {
|
||||
debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);
|
||||
|
||||
|
@ -415,8 +515,8 @@ fn publish_changes_serum(
|
|||
write_version,
|
||||
event: fill.clone(),
|
||||
status: FillUpdateStatus::New,
|
||||
market: mkt_pk_string.clone(),
|
||||
queue: evq_pk_string.clone(),
|
||||
market_key: mkt_pk_string.clone(),
|
||||
market_name: mkt.1.name.clone(),
|
||||
}))
|
||||
.unwrap(); // TODO: use anyhow to bubble up error
|
||||
checkpoint.push(fill);
|
||||
|
@ -430,11 +530,15 @@ fn publish_changes_serum(
|
|||
"found changed id event {} idx {} seq_num {} header seq num {} old seq num {}",
|
||||
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
|
||||
);
|
||||
|
||||
|
||||
metric_events_change.increment();
|
||||
|
||||
|
||||
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
|
||||
|
||||
let old_fill = FillEvent::new_from_spot(
|
||||
old_event_view,
|
||||
timestamp,
|
||||
seq_num,
|
||||
&mkt.1,
|
||||
);
|
||||
// first revoke old event
|
||||
fill_update_sender
|
||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||
|
@ -442,11 +546,11 @@ fn publish_changes_serum(
|
|||
write_version,
|
||||
event: old_fill,
|
||||
status: FillUpdateStatus::Revoke,
|
||||
market: mkt_pk_string.clone(),
|
||||
queue: evq_pk_string.clone(),
|
||||
market_key: mkt_pk_string.clone(),
|
||||
market_name: mkt.1.name.clone(),
|
||||
}))
|
||||
.unwrap(); // TODO: use anyhow to bubble up error
|
||||
|
||||
|
||||
// then publish new
|
||||
fill_update_sender
|
||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||
|
@ -454,8 +558,8 @@ fn publish_changes_serum(
|
|||
write_version,
|
||||
event: fill.clone(),
|
||||
status: FillUpdateStatus::New,
|
||||
market: mkt_pk_string.clone(),
|
||||
queue: evq_pk_string.clone(),
|
||||
market_key: mkt_pk_string.clone(),
|
||||
market_name: mkt.1.name.clone(),
|
||||
}))
|
||||
.unwrap(); // TODO: use anyhow to bubble up error
|
||||
}
|
||||
|
@ -478,8 +582,8 @@ fn publish_changes_serum(
|
|||
write_version,
|
||||
event: fill.clone(),
|
||||
status: FillUpdateStatus::New,
|
||||
market: mkt_pk_string.clone(),
|
||||
queue: evq_pk_string.clone(),
|
||||
market_key: mkt_pk_string.clone(),
|
||||
market_name: mkt.1.name.clone(),
|
||||
}))
|
||||
.unwrap(); // TODO: use anyhow to bubble up error
|
||||
checkpoint.push(fill);
|
||||
|
@ -503,15 +607,15 @@ fn publish_changes_serum(
|
|||
|
||||
match old_event_view {
|
||||
SpotEvent::Fill { .. } => {
|
||||
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
|
||||
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num, &mkt.1);
|
||||
fill_update_sender
|
||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||
slot,
|
||||
event: old_fill,
|
||||
write_version,
|
||||
status: FillUpdateStatus::Revoke,
|
||||
market: mkt_pk_string.clone(),
|
||||
queue: evq_pk_string.clone(),
|
||||
market_key: mkt_pk_string.clone(),
|
||||
market_name: mkt.1.name.clone(),
|
||||
}))
|
||||
.unwrap(); // TODO: use anyhow to bubble up error
|
||||
}
|
||||
|
@ -520,21 +624,19 @@ fn publish_changes_serum(
|
|||
}
|
||||
|
||||
fill_update_sender
|
||||
.try_send(FillEventFilterMessage::Checkpoint(
|
||||
FillCheckpoint {
|
||||
slot,
|
||||
write_version,
|
||||
events: checkpoint,
|
||||
market: mkt_pk_string,
|
||||
queue: evq_pk_string,
|
||||
},
|
||||
))
|
||||
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
|
||||
slot,
|
||||
write_version,
|
||||
events: checkpoint,
|
||||
market: mkt_pk_string,
|
||||
queue: evq_pk_string,
|
||||
}))
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub async fn init(
|
||||
perp_queue_pks: Vec<(Pubkey, Pubkey)>,
|
||||
serum_queue_pks: Vec<(Pubkey, Pubkey)>,
|
||||
perp_market_configs: Vec<(Pubkey, MarketConfig)>,
|
||||
spot_market_configs: Vec<(Pubkey, MarketConfig)>,
|
||||
metrics_sender: Metrics,
|
||||
) -> anyhow::Result<(
|
||||
async_channel::Sender<AccountWrite>,
|
||||
|
@ -549,8 +651,12 @@ pub async fn init(
|
|||
metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter);
|
||||
let mut metric_events_change =
|
||||
metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter);
|
||||
let mut metric_events_change_serum =
|
||||
metrics_sender.register_u64("fills_feed_events_change_serum".into(), MetricType::Counter);
|
||||
let mut metrics_events_drop =
|
||||
metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter);
|
||||
let mut metrics_events_drop_serum =
|
||||
metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter);
|
||||
|
||||
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
|
||||
let (account_write_queue_sender, account_write_queue_receiver) =
|
||||
|
@ -572,18 +678,24 @@ pub async fn init(
|
|||
let mut seq_num_cache = HashMap::new();
|
||||
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
|
||||
|
||||
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
|
||||
let relevant_pubkeys = all_queue_pks
|
||||
let all_market_configs = [perp_market_configs.clone(), spot_market_configs.clone()].concat();
|
||||
let perp_queue_pks: Vec<Pubkey> = perp_market_configs
|
||||
.iter()
|
||||
.map(|m| m.1)
|
||||
.collect::<HashSet<Pubkey>>();
|
||||
.map(|x| x.1.event_queue)
|
||||
.collect();
|
||||
let spot_queue_pks: Vec<Pubkey> = spot_market_configs
|
||||
.iter()
|
||||
.map(|x| x.1.event_queue)
|
||||
.collect();
|
||||
let all_queue_pks: HashSet<Pubkey> =
|
||||
HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat());
|
||||
|
||||
// update handling thread, reads both sloths and account updates
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
Ok(account_write) = account_write_queue_receiver_c.recv() => {
|
||||
if !relevant_pubkeys.contains(&account_write.pubkey) {
|
||||
if !all_queue_pks.contains(&account_write.pubkey) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -611,21 +723,38 @@ pub async fn init(
|
|||
});
|
||||
|
||||
}
|
||||
Err(e) = slot_queue_receiver.recv() => {
|
||||
warn!("slot update channel err {:?}", e);
|
||||
}
|
||||
Err(e) = account_write_queue_receiver_c.recv() => {
|
||||
warn!("write update channel err {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
for mkt in all_queue_pks.iter() {
|
||||
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
|
||||
let mkt_pk = mkt.1;
|
||||
for mkt in all_market_configs.iter() {
|
||||
let evq_pk = mkt.1.event_queue;
|
||||
let evq_pk_string = evq_pk.to_string();
|
||||
let last_evq_version = last_evq_versions
|
||||
.get(&mkt.1.event_queue.to_string())
|
||||
.unwrap_or(&(0, 0));
|
||||
|
||||
match chain_cache.account(&mkt_pk) {
|
||||
match chain_cache.account(&evq_pk) {
|
||||
Ok(account_info) => {
|
||||
// only process if the account state changed
|
||||
let evq_version = (account_info.slot, account_info.write_version);
|
||||
let evq_pk_string = mkt.1.to_string();
|
||||
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
|
||||
if evq_version == *last_evq_version {
|
||||
continue;
|
||||
}
|
||||
if evq_version.0 < last_evq_version.0 {
|
||||
debug!("evq version slot was old");
|
||||
continue;
|
||||
}
|
||||
if evq_version.0 == last_evq_version.0 && evq_version.1 < last_evq_version.1
|
||||
{
|
||||
info!("evq version slot was same and write version was old");
|
||||
continue;
|
||||
}
|
||||
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
|
||||
|
||||
let account = &account_info.account;
|
||||
|
@ -633,17 +762,16 @@ pub async fn init(
|
|||
if is_perp {
|
||||
let event_queue =
|
||||
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
|
||||
trace!(
|
||||
"evq {} seq_num {}",
|
||||
evq_pk_string,
|
||||
event_queue.header.seq_num
|
||||
info!(
|
||||
"evq {} seq_num {} version {:?}",
|
||||
evq_pk_string, event_queue.header.seq_num, evq_version,
|
||||
);
|
||||
match seq_num_cache.get(&evq_pk_string) {
|
||||
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
|
||||
Some(old_events) => publish_changes_perp(
|
||||
account_info.slot,
|
||||
account_info.write_version,
|
||||
mkt,
|
||||
&mkt,
|
||||
&event_queue.header,
|
||||
&event_queue.buf,
|
||||
*old_seq_num,
|
||||
|
@ -690,14 +818,17 @@ pub async fn init(
|
|||
old_events,
|
||||
&fill_update_sender,
|
||||
&mut metric_events_new_serum,
|
||||
&mut metric_events_change,
|
||||
&mut metrics_events_drop,
|
||||
&mut metric_events_change_serum,
|
||||
&mut metrics_events_drop_serum,
|
||||
),
|
||||
_ => {
|
||||
info!("serum_events_cache could not find {}", evq_pk_string)
|
||||
debug!(
|
||||
"serum_events_cache could not find {}",
|
||||
evq_pk_string
|
||||
)
|
||||
}
|
||||
},
|
||||
_ => info!("seq_num_cache could not find {}", evq_pk_string),
|
||||
_ => debug!("seq_num_cache could not find {}", evq_pk_string),
|
||||
}
|
||||
|
||||
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
|
||||
|
@ -705,7 +836,7 @@ pub async fn init(
|
|||
.insert(evq_pk_string.clone(), events.clone().to_vec());
|
||||
}
|
||||
}
|
||||
Err(_) => info!("chain_cache could not find {}", mkt.1),
|
||||
Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,237 @@
|
|||
use chrono::{TimeZone, Utc};
|
||||
use log::*;
|
||||
use native_tls::{Certificate, Identity, TlsConnector};
|
||||
use postgres_native_tls::MakeTlsConnector;
|
||||
use postgres_query::Caching;
|
||||
use std::{env, fs, time::Duration};
|
||||
use tokio_postgres::Client;
|
||||
|
||||
use crate::{fill_event_filter::FillUpdate, metrics::*, PostgresConfig};
|
||||
|
||||
async fn postgres_connection(
|
||||
config: &PostgresConfig,
|
||||
metric_retries: MetricU64,
|
||||
metric_live: MetricU64,
|
||||
) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
|
||||
let (tx, rx) = async_channel::unbounded();
|
||||
|
||||
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
|
||||
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
|
||||
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
|
||||
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
|
||||
info!("making tls config");
|
||||
let tls = match &config.tls {
|
||||
Some(tls) => {
|
||||
use base64::{engine::general_purpose, Engine as _};
|
||||
let ca_cert = match &tls.ca_cert_path.chars().next().unwrap() {
|
||||
'$' => general_purpose::STANDARD
|
||||
.decode(
|
||||
env::var(&tls.ca_cert_path[1..])
|
||||
.expect("reading client cert from env")
|
||||
.into_bytes(),
|
||||
)
|
||||
.expect("decoding client cert"),
|
||||
_ => fs::read(&tls.client_key_path).expect("reading client key from file"),
|
||||
};
|
||||
let client_key = match &tls.client_key_path.chars().next().unwrap() {
|
||||
'$' => general_purpose::STANDARD
|
||||
.decode(
|
||||
env::var(&tls.client_key_path[1..])
|
||||
.expect("reading client key from env")
|
||||
.into_bytes(),
|
||||
)
|
||||
.expect("decoding client key"),
|
||||
_ => fs::read(&tls.client_key_path).expect("reading client key from file"),
|
||||
};
|
||||
MakeTlsConnector::new(
|
||||
TlsConnector::builder()
|
||||
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
|
||||
.identity(Identity::from_pkcs12(&client_key, "pass")?)
|
||||
.danger_accept_invalid_certs(config.allow_invalid_certs)
|
||||
.build()?,
|
||||
)
|
||||
}
|
||||
None => MakeTlsConnector::new(
|
||||
TlsConnector::builder()
|
||||
.danger_accept_invalid_certs(config.allow_invalid_certs)
|
||||
.build()?,
|
||||
),
|
||||
};
|
||||
|
||||
let config = config.clone();
|
||||
let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?);
|
||||
let mut metric_retries = metric_retries;
|
||||
let mut metric_live = metric_live;
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let (client, connection) = match initial.take() {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
let result =
|
||||
tokio_postgres::connect(&config.connection_string, tls.clone()).await;
|
||||
match result {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
warn!("could not connect to postgres: {:?}", err);
|
||||
tokio::time::sleep(Duration::from_secs(
|
||||
config.retry_connection_sleep_secs,
|
||||
))
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tx.send(Some(client)).await.expect("send success");
|
||||
metric_live.increment();
|
||||
|
||||
let result = connection.await;
|
||||
|
||||
metric_retries.increment();
|
||||
metric_live.decrement();
|
||||
|
||||
tx.send(None).await.expect("send success");
|
||||
warn!("postgres connection error: {:?}", result);
|
||||
tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
async fn update_postgres_client<'a>(
|
||||
client: &'a mut Option<postgres_query::Caching<tokio_postgres::Client>>,
|
||||
rx: &async_channel::Receiver<Option<tokio_postgres::Client>>,
|
||||
config: &PostgresConfig,
|
||||
) -> &'a postgres_query::Caching<tokio_postgres::Client> {
|
||||
// get the most recent client, waiting if there's a disconnect
|
||||
while !rx.is_empty() || client.is_none() {
|
||||
tokio::select! {
|
||||
client_raw_opt = rx.recv() => {
|
||||
*client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new);
|
||||
},
|
||||
_ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
|
||||
error!("waited too long for new postgres client");
|
||||
std::process::exit(1);
|
||||
},
|
||||
}
|
||||
}
|
||||
client.as_ref().expect("must contain value")
|
||||
}
|
||||
|
||||
async fn process_update(client: &Caching<Client>, update: &FillUpdate) -> anyhow::Result<()> {
|
||||
let market = &update.market_key;
|
||||
let seq_num = update.event.seq_num as i64;
|
||||
let fill_timestamp = Utc.timestamp_opt(update.event.timestamp as i64, 0).unwrap();
|
||||
let price = update.event.price as f64;
|
||||
let quantity = update.event.quantity as f64;
|
||||
let slot = update.slot as i64;
|
||||
let write_version = update.write_version as i64;
|
||||
|
||||
let query = postgres_query::query!(
|
||||
"INSERT INTO transactions_v4.perp_fills_feed_events
|
||||
(market, seq_num, fill_timestamp, price,
|
||||
quantity, slot, write_version)
|
||||
VALUES
|
||||
($market, $seq_num, $fill_timestamp, $price,
|
||||
$quantity, $slot, $write_version)
|
||||
ON CONFLICT (market, seq_num) DO NOTHING",
|
||||
market,
|
||||
seq_num,
|
||||
fill_timestamp,
|
||||
price,
|
||||
quantity,
|
||||
slot,
|
||||
write_version,
|
||||
);
|
||||
let _ = query.execute(&client).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn init(
|
||||
config: &PostgresConfig,
|
||||
metrics_sender: Metrics,
|
||||
) -> anyhow::Result<async_channel::Sender<FillUpdate>> {
|
||||
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
|
||||
let (fill_update_queue_sender, fill_update_queue_receiver) =
|
||||
async_channel::bounded::<FillUpdate>(config.max_queue_size);
|
||||
|
||||
let metric_con_retries = metrics_sender.register_u64(
|
||||
"fills_postgres_connection_retries".into(),
|
||||
MetricType::Counter,
|
||||
);
|
||||
let metric_con_live =
|
||||
metrics_sender.register_u64("fills_postgres_connections_alive".into(), MetricType::Gauge);
|
||||
|
||||
// postgres fill update sending worker threads
|
||||
for _ in 0..config.connection_count {
|
||||
let postgres_account_writes =
|
||||
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
||||
.await?;
|
||||
let fill_update_queue_receiver_c = fill_update_queue_receiver.clone();
|
||||
let config = config.clone();
|
||||
let mut metric_retries =
|
||||
metrics_sender.register_u64("fills_postgres_retries".into(), MetricType::Counter);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut client_opt = None;
|
||||
loop {
|
||||
// Retrieve up to batch_size updates
|
||||
let mut batch = Vec::new();
|
||||
batch.push(
|
||||
fill_update_queue_receiver_c
|
||||
.recv()
|
||||
.await
|
||||
.expect("sender must stay alive"),
|
||||
);
|
||||
while batch.len() < config.max_batch_size {
|
||||
match fill_update_queue_receiver_c.try_recv() {
|
||||
Ok(update) => batch.push(update),
|
||||
Err(async_channel::TryRecvError::Empty) => break,
|
||||
Err(async_channel::TryRecvError::Closed) => {
|
||||
panic!("sender must stay alive")
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
info!(
|
||||
"updates, batch {}, channel size {}",
|
||||
batch.len(),
|
||||
fill_update_queue_receiver_c.len(),
|
||||
);
|
||||
|
||||
let mut error_count = 0;
|
||||
loop {
|
||||
let client =
|
||||
update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
|
||||
.await;
|
||||
let mut results = futures::future::join_all(
|
||||
batch.iter().map(|update| process_update(client, update)),
|
||||
)
|
||||
.await;
|
||||
let mut iter = results.iter();
|
||||
batch.retain(|_| iter.next().unwrap().is_err());
|
||||
if batch.len() > 0 {
|
||||
metric_retries.add(batch.len() as u64);
|
||||
error_count += 1;
|
||||
if error_count - 1 < config.retry_query_max_count {
|
||||
results.retain(|r| r.is_err());
|
||||
warn!("failed to process fill update, retrying: {:?}", results);
|
||||
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
|
||||
.await;
|
||||
continue;
|
||||
} else {
|
||||
error!("failed to process account write, exiting");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(fill_update_queue_sender)
|
||||
}
|
|
@ -1,21 +1,19 @@
|
|||
pub mod chain_data;
|
||||
pub mod fill_event_filter;
|
||||
pub mod orderbook_filter;
|
||||
pub mod fill_event_postgres_target;
|
||||
pub mod grpc_plugin_source;
|
||||
pub mod memory_target;
|
||||
pub mod metrics;
|
||||
pub mod postgres_target;
|
||||
pub mod orderbook_filter;
|
||||
pub mod postgres_types_numeric;
|
||||
pub mod websocket_source;
|
||||
|
||||
pub use chain_data::SlotStatus;
|
||||
use serde::{Serialize, Serializer, ser::SerializeStruct};
|
||||
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
||||
|
||||
use {
|
||||
async_trait::async_trait,
|
||||
serde_derive::Deserialize,
|
||||
solana_sdk::{account::Account, pubkey::Pubkey},
|
||||
std::sync::Arc,
|
||||
};
|
||||
|
||||
trait AnyhowWrap {
|
||||
|
@ -69,14 +67,12 @@ pub struct SlotUpdate {
|
|||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct PostgresConfig {
|
||||
pub connection_string: String,
|
||||
/// Number of parallel postgres connections used for account write insertions
|
||||
pub account_write_connection_count: u64,
|
||||
/// Maximum batch size for account write inserts over one connection
|
||||
pub account_write_max_batch_size: usize,
|
||||
/// Max size of account write queues
|
||||
pub account_write_max_queue_size: usize,
|
||||
/// Number of parallel postgres connections used for slot insertions
|
||||
pub slot_update_connection_count: u64,
|
||||
/// Number of parallel postgres connections used for insertions
|
||||
pub connection_count: u64,
|
||||
/// Maximum batch size for inserts over one connection
|
||||
pub max_batch_size: usize,
|
||||
/// Max size of queues
|
||||
pub max_queue_size: usize,
|
||||
/// Number of queries retries before fatal error
|
||||
pub retry_query_max_count: u64,
|
||||
/// Seconds to sleep between query retries
|
||||
|
@ -87,12 +83,15 @@ pub struct PostgresConfig {
|
|||
pub fatal_connection_timeout_secs: u64,
|
||||
/// Allow invalid TLS certificates, passed to native_tls danger_accept_invalid_certs
|
||||
pub allow_invalid_certs: bool,
|
||||
/// Name key to use in the monitoring table
|
||||
pub monitoring_name: String,
|
||||
/// Time between updates to the monitoring table
|
||||
pub monitoring_update_interval_secs: u64,
|
||||
/// Time between cleanup jobs (0 to disable)
|
||||
pub cleanup_interval_secs: u64,
|
||||
pub tls: Option<PostgresTlsConfig>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
pub struct PostgresTlsConfig {
|
||||
/// CA Cert file or env var
|
||||
pub ca_cert_path: String,
|
||||
/// PKCS12 client cert path
|
||||
pub client_key_path: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
|
@ -164,63 +163,3 @@ pub struct Config {
|
|||
pub source: SourceConfig,
|
||||
pub metrics: MetricsConfig,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait AccountTable: Sync + Send {
|
||||
fn table_name(&self) -> &str;
|
||||
async fn insert_account_write(
|
||||
&self,
|
||||
client: &postgres_query::Caching<tokio_postgres::Client>,
|
||||
account_write: &AccountWrite,
|
||||
) -> anyhow::Result<()>;
|
||||
}
|
||||
|
||||
pub type AccountTables = Vec<Arc<dyn AccountTable>>;
|
||||
|
||||
pub struct RawAccountTable {}
|
||||
|
||||
pub fn encode_address(addr: &Pubkey) -> String {
|
||||
bs58::encode(&addr.to_bytes()).into_string()
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AccountTable for RawAccountTable {
|
||||
fn table_name(&self) -> &str {
|
||||
"account_write"
|
||||
}
|
||||
|
||||
async fn insert_account_write(
|
||||
&self,
|
||||
client: &postgres_query::Caching<tokio_postgres::Client>,
|
||||
account_write: &AccountWrite,
|
||||
) -> anyhow::Result<()> {
|
||||
let pubkey = encode_address(&account_write.pubkey);
|
||||
let owner = encode_address(&account_write.owner);
|
||||
let slot = account_write.slot as i64;
|
||||
let write_version = account_write.write_version as i64;
|
||||
let lamports = account_write.lamports as i64;
|
||||
let rent_epoch = account_write.rent_epoch as i64;
|
||||
|
||||
// TODO: should update for same write_version to work with websocket input
|
||||
let query = postgres_query::query!(
|
||||
"INSERT INTO account_write
|
||||
(pubkey_id, slot, write_version, is_selected,
|
||||
owner_id, lamports, executable, rent_epoch, data)
|
||||
VALUES
|
||||
(map_pubkey($pubkey), $slot, $write_version, $is_selected,
|
||||
map_pubkey($owner), $lamports, $executable, $rent_epoch, $data)
|
||||
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
|
||||
pubkey,
|
||||
slot,
|
||||
write_version,
|
||||
is_selected = account_write.is_selected,
|
||||
owner,
|
||||
lamports,
|
||||
executable = account_write.executable,
|
||||
rent_epoch,
|
||||
data = account_write.data,
|
||||
);
|
||||
let _ = query.execute(client).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,16 @@
|
|||
use crate::metrics::MetricU64;
|
||||
use crate::{
|
||||
chain_data::{AccountData, ChainData, SlotData},
|
||||
metrics::{MetricType, Metrics},
|
||||
AccountWrite, SlotUpdate,
|
||||
};
|
||||
use anchor_lang::AccountDeserialize;
|
||||
use itertools::Itertools;
|
||||
use log::*;
|
||||
use mango_v4::{
|
||||
serum3_cpi::OrderBookStateHeader,
|
||||
state::{BookSide, OrderTreeType},
|
||||
};
|
||||
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
||||
use serum_dex::critbit::Slab;
|
||||
use solana_sdk::{
|
||||
|
@ -19,13 +25,6 @@ use std::{
|
|||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use crate::metrics::MetricU64;
|
||||
use anchor_lang::AccountDeserialize;
|
||||
use mango_v4::{
|
||||
serum3_cpi::OrderBookStateHeader,
|
||||
state::{BookSide, OrderTreeType},
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum OrderbookSide {
|
||||
Bid = 0,
|
||||
|
@ -106,6 +105,7 @@ pub struct MarketConfig {
|
|||
pub name: String,
|
||||
pub bids: Pubkey,
|
||||
pub asks: Pubkey,
|
||||
pub event_queue: Pubkey,
|
||||
pub base_decimals: u8,
|
||||
pub quote_decimals: u8,
|
||||
pub base_lot_size: i64,
|
||||
|
@ -113,26 +113,28 @@ pub struct MarketConfig {
|
|||
}
|
||||
|
||||
pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
|
||||
let decimals: u32 = 3;
|
||||
let res = native as f64 / (10i64.pow(decimals.into()) as f64);
|
||||
//info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
|
||||
res
|
||||
(native * base_lot_size) as f64 / 10i64.pow(base_decimals.into()) as f64
|
||||
}
|
||||
|
||||
pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
|
||||
let decimals: u32 = 4;
|
||||
let res = native as f64 / (10i64.pow(decimals.into()) as f64);
|
||||
//info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
|
||||
res
|
||||
pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
|
||||
let decimals = base_decimals - quote_decimals;
|
||||
native as f64 / (10i64.pow(decimals.into()) as f64)
|
||||
}
|
||||
|
||||
pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
|
||||
let decimals = base_decimals - quote_decimals;
|
||||
// let res = native as f64
|
||||
// * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
|
||||
// as f64;
|
||||
let res = native as f64 / (10u64.pow(decimals.into())) as f64;
|
||||
res
|
||||
native as f64 / (10u64.pow(decimals.into())) as f64
|
||||
}
|
||||
|
||||
pub fn spot_price_to_ui(
|
||||
native: i64,
|
||||
native_size: i64,
|
||||
base_decimals: u8,
|
||||
quote_decimals: u8,
|
||||
) -> f64 {
|
||||
// TODO: account for fees
|
||||
((native * 10i64.pow(base_decimals.into())) / (10i64.pow(quote_decimals.into()) * native_size))
|
||||
as f64
|
||||
}
|
||||
|
||||
pub fn price_lots_to_ui_perp(
|
||||
|
@ -143,13 +145,8 @@ pub fn price_lots_to_ui_perp(
|
|||
quote_lot_size: i64,
|
||||
) -> f64 {
|
||||
let decimals = base_decimals - quote_decimals;
|
||||
let res = native as f64
|
||||
* ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
|
||||
as f64;
|
||||
// let res = native as f64
|
||||
// / (10u64.pow(decimals.into()))
|
||||
// as f64;
|
||||
res
|
||||
let multiplier = 10u64.pow(decimals.into()) as f64;
|
||||
native as f64 * ((multiplier * quote_lot_size as f64) / base_lot_size as f64)
|
||||
}
|
||||
|
||||
fn publish_changes(
|
||||
|
@ -332,8 +329,17 @@ pub async fn init(
|
|||
let side_pk_string = side_pk.to_string();
|
||||
|
||||
let write_version = (account_info.slot, account_info.write_version);
|
||||
// todo: should this be <= so we don't overwrite with old data received late?
|
||||
if write_version <= *last_write_version {
|
||||
if write_version == *last_write_version {
|
||||
continue;
|
||||
}
|
||||
if write_version.0 < last_write_version.0 {
|
||||
debug!("evq version slot was old");
|
||||
continue;
|
||||
}
|
||||
if write_version.0 == last_write_version.0
|
||||
&& write_version.1 < last_write_version.1
|
||||
{
|
||||
debug!("evq version slot was same and write version was old");
|
||||
continue;
|
||||
}
|
||||
last_write_versions.insert(side_pk_string.clone(), write_version);
|
||||
|
@ -369,7 +375,7 @@ pub async fn init(
|
|||
.map(|(_, quantity)| quantity)
|
||||
.fold(0, |acc, x| acc + x),
|
||||
mkt.1.base_decimals,
|
||||
mkt.1.base_lot_size,
|
||||
mkt.1.quote_decimals,
|
||||
),
|
||||
]
|
||||
})
|
||||
|
@ -394,7 +400,7 @@ pub async fn init(
|
|||
|
||||
bookside_cache.insert(side_pk_string.clone(), bookside.clone());
|
||||
}
|
||||
Err(_) => info!("chain_cache could not find {}", mkt_pk),
|
||||
Err(_) => debug!("chain_cache could not find {}", mkt_pk),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -417,7 +423,7 @@ pub async fn init(
|
|||
continue;
|
||||
}
|
||||
last_write_versions.insert(side_pk_string.clone(), write_version);
|
||||
info!("W {}", mkt.1.name);
|
||||
debug!("W {}", mkt.1.name);
|
||||
let account = &mut account_info.account.clone();
|
||||
let data = account.data_as_mut_slice();
|
||||
let len = data.len();
|
||||
|
@ -473,7 +479,7 @@ pub async fn init(
|
|||
|
||||
serum_bookside_cache.insert(side_pk_string.clone(), bookside);
|
||||
}
|
||||
Err(_) => info!("chain_cache could not find {}", side_pk),
|
||||
Err(_) => debug!("chain_cache could not find {}", side_pk),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,637 +0,0 @@
|
|||
use anyhow::Context;
|
||||
use log::*;
|
||||
use native_tls::TlsConnector;
|
||||
use postgres_native_tls::MakeTlsConnector;
|
||||
use postgres_query::{query, query_dyn};
|
||||
use std::{collections::HashMap, convert::TryFrom, time::Duration};
|
||||
|
||||
use crate::{metrics::*, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate};
|
||||
|
||||
mod pg {
|
||||
#[derive(Clone, Copy, Debug, PartialEq, postgres_types::ToSql)]
|
||||
pub enum SlotStatus {
|
||||
Rooted,
|
||||
Confirmed,
|
||||
Processed,
|
||||
}
|
||||
|
||||
impl From<super::SlotStatus> for SlotStatus {
|
||||
fn from(status: super::SlotStatus) -> SlotStatus {
|
||||
match status {
|
||||
super::SlotStatus::Rooted => SlotStatus::Rooted,
|
||||
super::SlotStatus::Confirmed => SlotStatus::Confirmed,
|
||||
super::SlotStatus::Processed => SlotStatus::Processed,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn postgres_connection(
|
||||
config: &PostgresConfig,
|
||||
metric_retries: MetricU64,
|
||||
metric_live: MetricU64,
|
||||
) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
|
||||
let (tx, rx) = async_channel::unbounded();
|
||||
|
||||
let tls = MakeTlsConnector::new(
|
||||
TlsConnector::builder()
|
||||
.danger_accept_invalid_certs(config.allow_invalid_certs)
|
||||
.build()?,
|
||||
);
|
||||
|
||||
let config = config.clone();
|
||||
let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?);
|
||||
let mut metric_retries = metric_retries;
|
||||
let mut metric_live = metric_live;
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let (client, connection) = match initial.take() {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
let result =
|
||||
tokio_postgres::connect(&config.connection_string, tls.clone()).await;
|
||||
match result {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
warn!("could not connect to postgres: {:?}", err);
|
||||
tokio::time::sleep(Duration::from_secs(
|
||||
config.retry_connection_sleep_secs,
|
||||
))
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tx.send(Some(client)).await.expect("send success");
|
||||
metric_live.increment();
|
||||
|
||||
let result = connection.await;
|
||||
|
||||
metric_retries.increment();
|
||||
metric_live.decrement();
|
||||
|
||||
tx.send(None).await.expect("send success");
|
||||
warn!("postgres connection error: {:?}", result);
|
||||
tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
|
||||
}
|
||||
});
|
||||
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
async fn update_postgres_client<'a>(
|
||||
client: &'a mut Option<postgres_query::Caching<tokio_postgres::Client>>,
|
||||
rx: &async_channel::Receiver<Option<tokio_postgres::Client>>,
|
||||
config: &PostgresConfig,
|
||||
) -> &'a postgres_query::Caching<tokio_postgres::Client> {
|
||||
// get the most recent client, waiting if there's a disconnect
|
||||
while !rx.is_empty() || client.is_none() {
|
||||
tokio::select! {
|
||||
client_raw_opt = rx.recv() => {
|
||||
*client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new);
|
||||
},
|
||||
_ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
|
||||
error!("waited too long for new postgres client");
|
||||
std::process::exit(1);
|
||||
},
|
||||
}
|
||||
}
|
||||
client.as_ref().expect("must contain value")
|
||||
}
|
||||
|
||||
async fn process_account_write(
|
||||
client: &postgres_query::Caching<tokio_postgres::Client>,
|
||||
write: &AccountWrite,
|
||||
account_tables: &AccountTables,
|
||||
) -> anyhow::Result<()> {
|
||||
futures::future::try_join_all(
|
||||
account_tables
|
||||
.iter()
|
||||
.map(|table| table.insert_account_write(client, write)),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct Slots {
|
||||
// non-rooted only
|
||||
slots: HashMap<u64, SlotUpdate>,
|
||||
newest_processed_slot: Option<u64>,
|
||||
newest_rooted_slot: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct SlotPreprocessing {
|
||||
discard_duplicate: bool,
|
||||
discard_old: bool,
|
||||
new_processed_head: bool,
|
||||
new_rooted_head: bool,
|
||||
parent_update: bool,
|
||||
}
|
||||
|
||||
impl Slots {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
slots: HashMap::new(),
|
||||
newest_processed_slot: None,
|
||||
newest_rooted_slot: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn add(&mut self, update: &SlotUpdate) -> SlotPreprocessing {
|
||||
let mut result = SlotPreprocessing::default();
|
||||
|
||||
if let Some(previous) = self.slots.get_mut(&update.slot) {
|
||||
if previous.status == update.status && previous.parent == update.parent {
|
||||
result.discard_duplicate = true;
|
||||
}
|
||||
|
||||
previous.status = update.status;
|
||||
if update.parent.is_some() && previous.parent != update.parent {
|
||||
previous.parent = update.parent;
|
||||
result.parent_update = true;
|
||||
}
|
||||
} else if self.newest_rooted_slot.is_none()
|
||||
|| update.slot > self.newest_rooted_slot.unwrap()
|
||||
{
|
||||
self.slots.insert(update.slot, update.clone());
|
||||
} else {
|
||||
result.discard_old = true;
|
||||
}
|
||||
|
||||
if update.status == SlotStatus::Rooted {
|
||||
let old_slots: Vec<u64> = self
|
||||
.slots
|
||||
.keys()
|
||||
.filter(|s| **s <= update.slot)
|
||||
.copied()
|
||||
.collect();
|
||||
for old_slot in old_slots {
|
||||
self.slots.remove(&old_slot);
|
||||
}
|
||||
if self.newest_rooted_slot.is_none() || self.newest_rooted_slot.unwrap() < update.slot {
|
||||
self.newest_rooted_slot = Some(update.slot);
|
||||
result.new_rooted_head = true;
|
||||
}
|
||||
}
|
||||
|
||||
if self.newest_processed_slot.is_none() || self.newest_processed_slot.unwrap() < update.slot
|
||||
{
|
||||
self.newest_processed_slot = Some(update.slot);
|
||||
result.new_processed_head = true;
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
fn make_cleanup_steps(tables: &Vec<String>) -> HashMap<String, String> {
|
||||
let mut steps = HashMap::<String, String>::new();
|
||||
|
||||
// Delete all account writes that came before the newest rooted slot except
|
||||
// for the newest rooted write for each pubkey.
|
||||
// This could be older rooted writes or writes in uncled slots that came
|
||||
// before the newest rooted slot.
|
||||
//
|
||||
// Also delete _all_ writes from before the newest snapshot, because these may
|
||||
// be for deleted accounts where the deletion event was missed. Snapshots
|
||||
// provide a new state for all live accounts, but don't tell us about deleted
|
||||
// accounts.
|
||||
//
|
||||
// The way this is done, by taking the newest snapshot that's at least
|
||||
// min_snapshot_age behind the newest rooted slot is a workaround: we don't know
|
||||
// how long it'll take to insert snapshot data, but assume it'll be done by that
|
||||
// time.
|
||||
let min_snapshot_age = 300;
|
||||
steps.extend(
|
||||
tables
|
||||
.iter()
|
||||
.map(|table_name| {
|
||||
let sql = format!(
|
||||
"WITH
|
||||
newest_rooted AS (
|
||||
SELECT max(slot) AS newest_rooted_slot FROM slot WHERE status = 'Rooted'),
|
||||
newest_snapshot AS (
|
||||
SELECT max(slot) AS newest_snapshot_slot FROM account_write, newest_rooted
|
||||
WHERE write_version = 0 AND slot + {min_snapshot_age} < newest_rooted_slot)
|
||||
DELETE FROM {table} AS data
|
||||
USING
|
||||
newest_rooted,
|
||||
newest_snapshot,
|
||||
(SELECT DISTINCT ON(pubkey_id) pubkey_id, slot, write_version
|
||||
FROM {table}
|
||||
LEFT JOIN slot USING(slot)
|
||||
CROSS JOIN newest_rooted
|
||||
WHERE slot <= newest_rooted_slot AND (status = 'Rooted' OR status is NULL)
|
||||
ORDER BY pubkey_id, slot DESC, write_version DESC
|
||||
) newest_rooted_write
|
||||
WHERE
|
||||
data.pubkey_id = newest_rooted_write.pubkey_id AND (
|
||||
data.slot < newest_snapshot_slot OR (
|
||||
data.slot <= newest_rooted_slot
|
||||
AND (data.slot != newest_rooted_write.slot OR data.write_version != newest_rooted_write.write_version)
|
||||
)
|
||||
)",
|
||||
table = table_name,
|
||||
min_snapshot_age = min_snapshot_age,
|
||||
);
|
||||
(format!("delete old writes in {}", table_name), sql)
|
||||
})
|
||||
.collect::<HashMap<String, String>>(),
|
||||
);
|
||||
|
||||
// Delete information about older slots
|
||||
steps.insert(
|
||||
"delete old slots".into(),
|
||||
"DELETE FROM slot
|
||||
USING (SELECT max(slot) as newest_rooted_slot FROM slot WHERE status = 'Rooted') s
|
||||
WHERE slot + 1000 < newest_rooted_slot"
|
||||
.into(),
|
||||
);
|
||||
|
||||
steps
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct SlotsProcessing {}
|
||||
|
||||
impl SlotsProcessing {
|
||||
fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
|
||||
async fn process(
|
||||
&self,
|
||||
client: &postgres_query::Caching<tokio_postgres::Client>,
|
||||
update: &SlotUpdate,
|
||||
meta: &SlotPreprocessing,
|
||||
) -> anyhow::Result<()> {
|
||||
let slot = update.slot as i64;
|
||||
let status: pg::SlotStatus = update.status.into();
|
||||
if let Some(parent) = update.parent {
|
||||
let parent = parent as i64;
|
||||
let query = query!(
|
||||
"INSERT INTO slot
|
||||
(slot, parent, status, uncle)
|
||||
VALUES
|
||||
($slot, $parent, $status, FALSE)
|
||||
ON CONFLICT (slot) DO UPDATE SET
|
||||
parent=$parent, status=$status",
|
||||
slot,
|
||||
parent,
|
||||
status,
|
||||
);
|
||||
let _ = query.execute(client).await.context("updating slot row")?;
|
||||
} else {
|
||||
let query = query!(
|
||||
"INSERT INTO slot
|
||||
(slot, parent, status, uncle)
|
||||
VALUES
|
||||
($slot, NULL, $status, FALSE)
|
||||
ON CONFLICT (slot) DO UPDATE SET
|
||||
status=$status",
|
||||
slot,
|
||||
status,
|
||||
);
|
||||
let _ = query.execute(client).await.context("updating slot row")?;
|
||||
}
|
||||
|
||||
if meta.new_rooted_head {
|
||||
let slot = update.slot as i64;
|
||||
// Mark preceeding non-uncle slots as rooted
|
||||
let query = query!(
|
||||
"UPDATE slot SET status = 'Rooted'
|
||||
WHERE slot < $newest_final_slot
|
||||
AND (NOT uncle)
|
||||
AND status != 'Rooted'",
|
||||
newest_final_slot = slot
|
||||
);
|
||||
let _ = query
|
||||
.execute(client)
|
||||
.await
|
||||
.context("updating preceding non-rooted slots")?;
|
||||
}
|
||||
|
||||
if meta.new_processed_head || meta.parent_update {
|
||||
// update the uncle column for the chain of slots from the
|
||||
// newest down the the first rooted slot
|
||||
let query = query!(
|
||||
"WITH RECURSIVE
|
||||
liveslots AS (
|
||||
SELECT slot.*, 0 AS depth FROM slot
|
||||
WHERE slot = (SELECT max(slot) FROM slot)
|
||||
UNION ALL
|
||||
SELECT s.*, depth + 1 FROM slot s
|
||||
INNER JOIN liveslots l ON s.slot = l.parent
|
||||
WHERE l.status != 'Rooted' AND depth < 1000
|
||||
),
|
||||
min_slot AS (SELECT min(slot) AS min_slot FROM liveslots)
|
||||
UPDATE slot SET
|
||||
uncle = NOT EXISTS (SELECT 1 FROM liveslots WHERE liveslots.slot = slot.slot)
|
||||
FROM min_slot
|
||||
WHERE slot >= min_slot;"
|
||||
);
|
||||
let _ = query
|
||||
.execute(client)
|
||||
.await
|
||||
.context("recomputing slot uncle status")?;
|
||||
}
|
||||
|
||||
trace!("slot update done {}", update.slot);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn secs_since_epoch() -> u64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_secs()
|
||||
}
|
||||
fn epoch_secs_to_time(secs: u64) -> std::time::SystemTime {
|
||||
std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs)
|
||||
}
|
||||
|
||||
pub async fn init(
|
||||
config: &PostgresConfig,
|
||||
account_tables: AccountTables,
|
||||
metrics_sender: Metrics,
|
||||
) -> anyhow::Result<(
|
||||
async_channel::Sender<AccountWrite>,
|
||||
async_channel::Sender<SlotUpdate>,
|
||||
)> {
|
||||
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
|
||||
let (account_write_queue_sender, account_write_queue_receiver) =
|
||||
async_channel::bounded::<AccountWrite>(config.account_write_max_queue_size);
|
||||
|
||||
// Slot updates flowing from the outside into the single processing thread. From
|
||||
// there they'll flow into the postgres sending thread.
|
||||
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
|
||||
let (slot_inserter_sender, slot_inserter_receiver) =
|
||||
async_channel::unbounded::<(SlotUpdate, SlotPreprocessing)>();
|
||||
|
||||
let metric_con_retries =
|
||||
metrics_sender.register_u64("postgres_connection_retries".into(), MetricType::Counter);
|
||||
let metric_con_live =
|
||||
metrics_sender.register_u64("postgres_connections_alive".into(), MetricType::Gauge);
|
||||
|
||||
// postgres account write sending worker threads
|
||||
for _ in 0..config.account_write_connection_count {
|
||||
let postgres_account_writes =
|
||||
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
||||
.await?;
|
||||
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
|
||||
let account_tables_c = account_tables.clone();
|
||||
let config = config.clone();
|
||||
let mut metric_retries = metrics_sender
|
||||
.register_u64("postgres_account_write_retries".into(), MetricType::Counter);
|
||||
let mut metric_last_write = metrics_sender.register_u64(
|
||||
"postgres_account_write_last_write_timestamp".into(),
|
||||
MetricType::Gauge,
|
||||
);
|
||||
tokio::spawn(async move {
|
||||
let mut client_opt = None;
|
||||
loop {
|
||||
// Retrieve up to batch_size account writes
|
||||
let mut write_batch = Vec::new();
|
||||
write_batch.push(
|
||||
account_write_queue_receiver_c
|
||||
.recv()
|
||||
.await
|
||||
.expect("sender must stay alive"),
|
||||
);
|
||||
while write_batch.len() < config.account_write_max_batch_size {
|
||||
match account_write_queue_receiver_c.try_recv() {
|
||||
Ok(write) => write_batch.push(write),
|
||||
Err(async_channel::TryRecvError::Empty) => break,
|
||||
Err(async_channel::TryRecvError::Closed) => {
|
||||
panic!("sender must stay alive")
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
trace!(
|
||||
"account write, batch {}, channel size {}",
|
||||
write_batch.len(),
|
||||
account_write_queue_receiver_c.len(),
|
||||
);
|
||||
|
||||
let mut error_count = 0;
|
||||
loop {
|
||||
let client =
|
||||
update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
|
||||
.await;
|
||||
let mut results = futures::future::join_all(
|
||||
write_batch
|
||||
.iter()
|
||||
.map(|write| process_account_write(client, &write, &account_tables_c)),
|
||||
)
|
||||
.await;
|
||||
let mut iter = results.iter();
|
||||
write_batch.retain(|_| iter.next().unwrap().is_err());
|
||||
if write_batch.len() > 0 {
|
||||
metric_retries.add(write_batch.len() as u64);
|
||||
error_count += 1;
|
||||
if error_count - 1 < config.retry_query_max_count {
|
||||
results.retain(|r| r.is_err());
|
||||
warn!("failed to process account write, retrying: {:?}", results);
|
||||
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
|
||||
.await;
|
||||
continue;
|
||||
} else {
|
||||
error!("failed to process account write, exiting");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
break;
|
||||
}
|
||||
metric_last_write.set_max(secs_since_epoch());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// slot update handling thread
|
||||
let mut metric_slot_queue =
|
||||
metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge);
|
||||
tokio::spawn(async move {
|
||||
let mut slots = Slots::new();
|
||||
|
||||
loop {
|
||||
let update = slot_queue_receiver
|
||||
.recv()
|
||||
.await
|
||||
.expect("sender must stay alive");
|
||||
trace!(
|
||||
"slot update {}, channel size {}",
|
||||
update.slot,
|
||||
slot_queue_receiver.len()
|
||||
);
|
||||
|
||||
// Check if we already know about the slot, or it is outdated
|
||||
let slot_preprocessing = slots.add(&update);
|
||||
if slot_preprocessing.discard_duplicate || slot_preprocessing.discard_old {
|
||||
continue;
|
||||
}
|
||||
|
||||
slot_inserter_sender
|
||||
.send((update, slot_preprocessing))
|
||||
.await
|
||||
.expect("sending must succeed");
|
||||
metric_slot_queue.set(slot_inserter_sender.len() as u64);
|
||||
}
|
||||
});
|
||||
|
||||
// postgres slot update worker threads
|
||||
let slots_processing = SlotsProcessing::new();
|
||||
for _ in 0..config.slot_update_connection_count {
|
||||
let postgres_slot =
|
||||
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
||||
.await?;
|
||||
let receiver_c = slot_inserter_receiver.clone();
|
||||
let config = config.clone();
|
||||
let mut metric_retries =
|
||||
metrics_sender.register_u64("postgres_slot_update_retries".into(), MetricType::Counter);
|
||||
let mut metric_last_write = metrics_sender.register_u64(
|
||||
"postgres_slot_last_write_timestamp".into(),
|
||||
MetricType::Gauge,
|
||||
);
|
||||
let slots_processing = slots_processing.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut client_opt = None;
|
||||
loop {
|
||||
let (update, preprocessing) =
|
||||
receiver_c.recv().await.expect("sender must stay alive");
|
||||
trace!("slot insertion, slot {}", update.slot);
|
||||
|
||||
let mut error_count = 0;
|
||||
loop {
|
||||
let client =
|
||||
update_postgres_client(&mut client_opt, &postgres_slot, &config).await;
|
||||
if let Err(err) = slots_processing
|
||||
.process(client, &update, &preprocessing)
|
||||
.await
|
||||
{
|
||||
metric_retries.increment();
|
||||
error_count += 1;
|
||||
if error_count - 1 < config.retry_query_max_count {
|
||||
warn!("failed to process slot update, retrying: {:?}", err);
|
||||
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
|
||||
.await;
|
||||
continue;
|
||||
} else {
|
||||
error!("failed to process slot update, exiting");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
break;
|
||||
}
|
||||
metric_last_write.set_max(secs_since_epoch());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// postgres cleanup thread
|
||||
if config.cleanup_interval_secs > 0 {
|
||||
let table_names: Vec<String> = account_tables
|
||||
.iter()
|
||||
.map(|table| table.table_name().to_string())
|
||||
.collect();
|
||||
let cleanup_steps = make_cleanup_steps(&table_names);
|
||||
|
||||
let postgres_con =
|
||||
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
||||
.await?;
|
||||
let mut metric_last_cleanup = metrics_sender.register_u64(
|
||||
"postgres_cleanup_last_success_timestamp".into(),
|
||||
MetricType::Gauge,
|
||||
);
|
||||
let mut metric_cleanup_errors =
|
||||
metrics_sender.register_u64("postgres_cleanup_errors".into(), MetricType::Counter);
|
||||
let config = config.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut client_opt = None;
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(config.cleanup_interval_secs)).await;
|
||||
let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await;
|
||||
|
||||
let mut all_successful = true;
|
||||
for (name, cleanup_sql) in &cleanup_steps {
|
||||
let query = query_dyn!(&cleanup_sql).unwrap();
|
||||
if let Err(err) = query.execute(client).await {
|
||||
warn!("failed to process cleanup step {}: {:?}", name, err);
|
||||
metric_cleanup_errors.increment();
|
||||
all_successful = false;
|
||||
}
|
||||
}
|
||||
if all_successful {
|
||||
metric_last_cleanup.set_max(secs_since_epoch());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// postgres metrics/monitoring thread
|
||||
{
|
||||
let postgres_con =
|
||||
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
||||
.await?;
|
||||
let metric_slot_last_write = metrics_sender.register_u64(
|
||||
"postgres_slot_last_write_timestamp".into(),
|
||||
MetricType::Gauge,
|
||||
);
|
||||
let metric_account_write_last_write = metrics_sender.register_u64(
|
||||
"postgres_account_write_last_write_timestamp".into(),
|
||||
MetricType::Gauge,
|
||||
);
|
||||
let metric_account_queue =
|
||||
metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge);
|
||||
let metric_slot_queue =
|
||||
metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge);
|
||||
let config = config.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut client_opt = None;
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_secs(config.monitoring_update_interval_secs))
|
||||
.await;
|
||||
|
||||
let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await;
|
||||
let last_update = std::time::SystemTime::now();
|
||||
let last_slot_write = epoch_secs_to_time(metric_slot_last_write.value());
|
||||
let last_account_write_write =
|
||||
epoch_secs_to_time(metric_account_write_last_write.value());
|
||||
let slot_queue = i64::try_from(metric_slot_queue.value()).unwrap();
|
||||
let account_write_queue = i64::try_from(metric_account_queue.value()).unwrap();
|
||||
let query = query!(
|
||||
"INSERT INTO monitoring
|
||||
(name, last_update, last_slot_write, last_account_write_write, slot_queue, account_write_queue)
|
||||
VALUES
|
||||
($name, $last_update, $last_slot_write, $last_account_write_write, $slot_queue, $account_write_queue)
|
||||
ON CONFLICT (name) DO UPDATE SET
|
||||
last_update=$last_update,
|
||||
last_slot_write=$last_slot_write,
|
||||
last_account_write_write=$last_account_write_write,
|
||||
slot_queue=$slot_queue,
|
||||
account_write_queue=$account_write_queue
|
||||
",
|
||||
name = config.monitoring_name,
|
||||
last_update,
|
||||
last_slot_write,
|
||||
last_account_write_write,
|
||||
slot_queue,
|
||||
account_write_queue,
|
||||
);
|
||||
if let Err(err) = query
|
||||
.execute(client)
|
||||
.await
|
||||
.context("updating monitoring table")
|
||||
{
|
||||
warn!("failed to process monitoring update: {:?}", err);
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok((account_write_queue_sender, slot_queue_sender))
|
||||
}
|
|
@ -1,9 +1,8 @@
|
|||
use anchor_client::{
|
||||
solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair},
|
||||
solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair},
|
||||
Cluster,
|
||||
};
|
||||
use anchor_lang::prelude::Pubkey;
|
||||
use bytemuck::cast_slice;
|
||||
use client::{Client, MangoGroupContext};
|
||||
use futures_channel::mpsc::{unbounded, UnboundedSender};
|
||||
use futures_util::{
|
||||
|
@ -13,7 +12,6 @@ use futures_util::{
|
|||
use log::*;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
convert::identity,
|
||||
fs::File,
|
||||
io::Read,
|
||||
net::SocketAddr,
|
||||
|
@ -30,8 +28,11 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error};
|
|||
|
||||
use serde::Deserialize;
|
||||
use solana_geyser_connector_lib::{
|
||||
fill_event_filter::FillEventType,
|
||||
fill_event_postgres_target,
|
||||
metrics::{MetricType, MetricU64},
|
||||
FilterConfig, StatusResponse,
|
||||
orderbook_filter::MarketConfig,
|
||||
FilterConfig, PostgresConfig, PostgresTlsConfig, StatusResponse,
|
||||
};
|
||||
use solana_geyser_connector_lib::{
|
||||
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
|
||||
|
@ -308,44 +309,70 @@ async fn main() -> anyhow::Result<()> {
|
|||
.await?,
|
||||
);
|
||||
|
||||
// todo: reload markets at intervals
|
||||
let perp_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
|
||||
.perp_markets
|
||||
.iter()
|
||||
.map(|(_, context)| {
|
||||
let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index)
|
||||
{
|
||||
Some(token) => token.decimals,
|
||||
None => panic!("token not found for market"), // todo: default to 6 for usdc?
|
||||
};
|
||||
(
|
||||
context.address,
|
||||
MarketConfig {
|
||||
name: context.market.name().to_owned(),
|
||||
bids: context.market.bids,
|
||||
asks: context.market.asks,
|
||||
event_queue: context.market.event_queue,
|
||||
base_decimals: context.market.base_decimals,
|
||||
quote_decimals,
|
||||
base_lot_size: context.market.base_lot_size,
|
||||
quote_lot_size: context.market.quote_lot_size,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let spot_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
|
||||
.serum3_markets
|
||||
.iter()
|
||||
.map(|(_, context)| {
|
||||
let base_decimals = match group_context.tokens.get(&context.market.base_token_index) {
|
||||
Some(token) => token.decimals,
|
||||
None => panic!("token not found for market"), // todo: default?
|
||||
};
|
||||
let quote_decimals = match group_context.tokens.get(&context.market.quote_token_index) {
|
||||
Some(token) => token.decimals,
|
||||
None => panic!("token not found for market"), // todo: default to 6 for usdc?
|
||||
};
|
||||
(
|
||||
context.market.serum_market_external,
|
||||
MarketConfig {
|
||||
name: context.market.name().to_owned(),
|
||||
bids: context.bids,
|
||||
asks: context.asks,
|
||||
event_queue: context.event_q,
|
||||
base_decimals,
|
||||
quote_decimals,
|
||||
base_lot_size: context.pc_lot_size as i64,
|
||||
quote_lot_size: context.coin_lot_size as i64,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
|
||||
.perp_markets
|
||||
.iter()
|
||||
.map(|(_, context)| (context.address, context.market.event_queue))
|
||||
.collect();
|
||||
|
||||
let serum_market_pks: Vec<Pubkey> = group_context
|
||||
.serum3_markets
|
||||
let spot_queue_pks: Vec<(Pubkey, Pubkey)> = spot_market_configs
|
||||
.iter()
|
||||
.map(|(_, context)| context.market.serum_market_external)
|
||||
.map(|x| (x.0, x.1.event_queue))
|
||||
.collect();
|
||||
|
||||
let serum_market_ais = client
|
||||
.rpc_async()
|
||||
.get_multiple_accounts(serum_market_pks.as_slice())
|
||||
.await?;
|
||||
let serum_market_ais: Vec<&Account> = serum_market_ais
|
||||
.iter()
|
||||
.filter_map(|maybe_ai| match maybe_ai {
|
||||
Some(ai) => Some(ai),
|
||||
None => None,
|
||||
})
|
||||
.collect();
|
||||
|
||||
let serum_queue_pks: Vec<(Pubkey, Pubkey)> = serum_market_ais
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|pair| {
|
||||
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
|
||||
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
|
||||
);
|
||||
(
|
||||
serum_market_pks[pair.0],
|
||||
Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let a: Vec<(String, String)> = group_context
|
||||
.serum3_markets
|
||||
.iter()
|
||||
|
@ -368,9 +395,28 @@ async fn main() -> anyhow::Result<()> {
|
|||
.collect();
|
||||
let market_pubkey_strings: HashMap<String, String> = [a, b].concat().into_iter().collect();
|
||||
|
||||
// TODO: read all this from config
|
||||
let pgconf = PostgresConfig {
|
||||
connection_string: "$PG_CONNECTION_STRING".to_owned(),
|
||||
connection_count: 1,
|
||||
max_batch_size: 1,
|
||||
max_queue_size: 50_000,
|
||||
retry_query_max_count: 10,
|
||||
retry_query_sleep_secs: 2,
|
||||
retry_connection_sleep_secs: 10,
|
||||
fatal_connection_timeout_secs: 120,
|
||||
allow_invalid_certs: true,
|
||||
tls: Some(PostgresTlsConfig {
|
||||
ca_cert_path: "$PG_CA_CERT".to_owned(),
|
||||
client_key_path: "$PG_CLIENT_KEY".to_owned(),
|
||||
}),
|
||||
};
|
||||
let postgres_update_sender =
|
||||
fill_event_postgres_target::init(&pgconf, metrics_tx.clone()).await?;
|
||||
|
||||
let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
|
||||
perp_queue_pks.clone(),
|
||||
serum_queue_pks.clone(),
|
||||
perp_market_configs.clone(),
|
||||
spot_market_configs.clone(),
|
||||
metrics_tx.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
@ -389,19 +435,36 @@ async fn main() -> anyhow::Result<()> {
|
|||
let message = fill_receiver.recv().await.unwrap();
|
||||
match message {
|
||||
FillEventFilterMessage::Update(update) => {
|
||||
debug!("ws update {} {:?} {:?} fill", update.market, update.status, update.event.event_type);
|
||||
debug!(
|
||||
"ws update {} {:?} {:?} fill",
|
||||
update.market_name, update.status, update.event.event_type
|
||||
);
|
||||
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
||||
for (addr, peer) in peer_copy.iter_mut() {
|
||||
let json = serde_json::to_string(&update).unwrap();
|
||||
let json = serde_json::to_string(&update.clone()).unwrap();
|
||||
|
||||
// only send updates if the peer is subscribed
|
||||
if peer.subscriptions.contains(&update.market) {
|
||||
if peer.subscriptions.contains(&update.market_key) {
|
||||
let result = peer.sender.send(Message::Text(json)).await;
|
||||
if result.is_err() {
|
||||
error!("ws update {} fill could not reach {}", update.market, addr);
|
||||
error!(
|
||||
"ws update {} fill could not reach {}",
|
||||
update.market_name, addr
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
// send taker fills to db
|
||||
let update_c = update.clone();
|
||||
match update_c.event.event_type {
|
||||
FillEventType::Perp => {
|
||||
if !update_c.event.maker {
|
||||
debug!("{:?}", update_c);
|
||||
postgres_update_sender.send(update_c).await.unwrap();
|
||||
}
|
||||
}
|
||||
_ => warn!("failed to write spot event to db"),
|
||||
}
|
||||
}
|
||||
FillEventFilterMessage::Checkpoint(checkpoint) => {
|
||||
checkpoints_ref_thread
|
||||
|
@ -461,7 +524,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
.collect::<String>()
|
||||
);
|
||||
let use_geyser = true;
|
||||
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
|
||||
let all_queue_pks = [perp_queue_pks.clone()].concat();
|
||||
let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect();
|
||||
let filter_config = FilterConfig {
|
||||
program_ids: vec![],
|
||||
|
|
|
@ -309,6 +309,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
name: context.market.name().to_owned(),
|
||||
bids: context.market.bids,
|
||||
asks: context.market.asks,
|
||||
event_queue: context.market.event_queue,
|
||||
base_decimals: context.market.base_decimals,
|
||||
quote_decimals,
|
||||
base_lot_size: context.market.base_lot_size,
|
||||
|
@ -336,6 +337,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
name: context.market.name().to_owned(),
|
||||
bids: context.bids,
|
||||
asks: context.asks,
|
||||
event_queue: context.event_q,
|
||||
base_decimals,
|
||||
quote_decimals,
|
||||
base_lot_size: context.pc_lot_size as i64,
|
||||
|
@ -355,7 +357,12 @@ async fn main() -> anyhow::Result<()> {
|
|||
.collect();
|
||||
|
||||
let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) =
|
||||
orderbook_filter::init(market_configs, serum_market_configs, metrics_tx.clone()).await?;
|
||||
orderbook_filter::init(
|
||||
market_configs.clone(),
|
||||
serum_market_configs.clone(),
|
||||
metrics_tx.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let checkpoints_ref_thread = checkpoints.clone();
|
||||
let peers_ref_thread = peers.clone();
|
||||
|
@ -422,12 +429,14 @@ async fn main() -> anyhow::Result<()> {
|
|||
);
|
||||
let use_geyser = true;
|
||||
if use_geyser {
|
||||
let relevant_pubkeys = [market_configs.clone()]
|
||||
.concat()
|
||||
.iter()
|
||||
.flat_map(|m| [m.1.bids.to_string(), m.1.asks.to_string()])
|
||||
.collect();
|
||||
let filter_config = FilterConfig {
|
||||
program_ids: vec![
|
||||
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
|
||||
"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
|
||||
],
|
||||
account_ids: vec![],
|
||||
program_ids: vec![],
|
||||
account_ids: relevant_pubkeys,
|
||||
};
|
||||
grpc_plugin_source::process_events(
|
||||
&config.source,
|
||||
|
|
Loading…
Reference in New Issue