Compare commits
5 Commits
1233cc58f3
...
4b75b88a98
Author | SHA1 | Date |
---|---|---|
Riordan Panayides | 4b75b88a98 | |
Riordan Panayides | ba4aa29f7f | |
Riordan Panayides | 746174ce8e | |
Riordan Panayides | 5c8ebc53b1 | |
Riordan Panayides | 6dfe88ac15 |
|
@ -604,9 +604,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "base64"
|
name = "base64"
|
||||||
version = "0.20.0"
|
version = "0.21.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
|
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "base64ct"
|
name = "base64ct"
|
||||||
|
@ -2393,7 +2393,7 @@ checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"http",
|
"http",
|
||||||
"hyper 0.14.23",
|
"hyper 0.14.23",
|
||||||
"rustls 0.20.7",
|
"rustls 0.20.8",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls 0.23.4",
|
"tokio-rustls 0.23.4",
|
||||||
]
|
]
|
||||||
|
@ -3033,7 +3033,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mango-v4"
|
name = "mango-v4"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/blockworks-foundation/mango-v4?branch=dev#5019864b844ca6633ee7d1356c763d78fc52ad64"
|
source = "git+https://github.com/blockworks-foundation/mango-v4?branch=dev#0ba7ecd5061e666addd90e6d3a4293ec1381f9c0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anchor-lang",
|
"anchor-lang",
|
||||||
"anchor-spl",
|
"anchor-spl",
|
||||||
|
@ -3885,6 +3885,7 @@ checksum = "73d946ec7d256b04dfadc4e6a3292324e6f417124750fc5c0950f981b703a0f1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"array-init",
|
"array-init",
|
||||||
"bytes 1.3.0",
|
"bytes 1.3.0",
|
||||||
|
"chrono",
|
||||||
"fallible-iterator",
|
"fallible-iterator",
|
||||||
"postgres-derive",
|
"postgres-derive",
|
||||||
"postgres-protocol",
|
"postgres-protocol",
|
||||||
|
@ -4215,7 +4216,7 @@ dependencies = [
|
||||||
"fxhash",
|
"fxhash",
|
||||||
"quinn-proto",
|
"quinn-proto",
|
||||||
"quinn-udp",
|
"quinn-udp",
|
||||||
"rustls 0.20.7",
|
"rustls 0.20.8",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
@ -4232,7 +4233,7 @@ dependencies = [
|
||||||
"fxhash",
|
"fxhash",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"ring",
|
"ring",
|
||||||
"rustls 0.20.7",
|
"rustls 0.20.8",
|
||||||
"rustls-native-certs",
|
"rustls-native-certs",
|
||||||
"rustls-pemfile 0.2.1",
|
"rustls-pemfile 0.2.1",
|
||||||
"slab",
|
"slab",
|
||||||
|
@ -4605,8 +4606,8 @@ dependencies = [
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"percent-encoding 2.2.0",
|
"percent-encoding 2.2.0",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"rustls 0.20.7",
|
"rustls 0.20.8",
|
||||||
"rustls-pemfile 1.0.1",
|
"rustls-pemfile 1.0.2",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_urlencoded",
|
"serde_urlencoded",
|
||||||
|
@ -4776,9 +4777,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustls"
|
name = "rustls"
|
||||||
version = "0.20.7"
|
version = "0.20.8"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
|
checksum = "fff78fc74d175294f4e83b28343315ffcfb114b156f0185e9741cb5570f50e2f"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
"ring",
|
"ring",
|
||||||
|
@ -4793,7 +4794,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
|
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"openssl-probe",
|
"openssl-probe",
|
||||||
"rustls-pemfile 1.0.1",
|
"rustls-pemfile 1.0.2",
|
||||||
"schannel",
|
"schannel",
|
||||||
"security-framework",
|
"security-framework",
|
||||||
]
|
]
|
||||||
|
@ -4809,11 +4810,11 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustls-pemfile"
|
name = "rustls-pemfile"
|
||||||
version = "1.0.1"
|
version = "1.0.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
|
checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"base64 0.13.1",
|
"base64 0.21.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -5610,7 +5611,7 @@ dependencies = [
|
||||||
"rand_chacha 0.2.2",
|
"rand_chacha 0.2.2",
|
||||||
"rayon",
|
"rayon",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"rustls 0.20.7",
|
"rustls 0.20.8",
|
||||||
"semver 1.0.14",
|
"semver 1.0.14",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
|
@ -5762,10 +5763,11 @@ dependencies = [
|
||||||
"async-channel",
|
"async-channel",
|
||||||
"async-stream 0.2.1",
|
"async-stream 0.2.1",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"base64 0.20.0",
|
"base64 0.21.0",
|
||||||
"bs58 0.4.0",
|
"bs58 0.4.0",
|
||||||
"bytemuck",
|
"bytemuck",
|
||||||
"bytes 1.3.0",
|
"bytes 1.3.0",
|
||||||
|
"chrono",
|
||||||
"fixed",
|
"fixed",
|
||||||
"futures 0.3.25",
|
"futures 0.3.25",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
|
@ -5781,6 +5783,8 @@ dependencies = [
|
||||||
"postgres_query",
|
"postgres_query",
|
||||||
"prost 0.9.0",
|
"prost 0.9.0",
|
||||||
"rand 0.7.3",
|
"rand 0.7.3",
|
||||||
|
"rustls 0.20.8",
|
||||||
|
"rustls-pemfile 1.0.2",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
@ -5791,6 +5795,7 @@ dependencies = [
|
||||||
"solana-sdk",
|
"solana-sdk",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-postgres",
|
"tokio-postgres",
|
||||||
|
"tokio-postgres-rustls",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tonic 0.6.2",
|
"tonic 0.6.2",
|
||||||
"tonic-build 0.6.2",
|
"tonic-build 0.6.2",
|
||||||
|
@ -6413,7 +6418,7 @@ dependencies = [
|
||||||
"quinn",
|
"quinn",
|
||||||
"rand 0.7.3",
|
"rand 0.7.3",
|
||||||
"rcgen",
|
"rcgen",
|
||||||
"rustls 0.20.7",
|
"rustls 0.20.8",
|
||||||
"solana-metrics",
|
"solana-metrics",
|
||||||
"solana-perf",
|
"solana-perf",
|
||||||
"solana-sdk",
|
"solana-sdk",
|
||||||
|
@ -7098,6 +7103,20 @@ dependencies = [
|
||||||
"tokio-util 0.7.2",
|
"tokio-util 0.7.2",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-postgres-rustls"
|
||||||
|
version = "0.9.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "606f2b73660439474394432239c82249c0d45eb5f23d91f401be1e33590444a7"
|
||||||
|
dependencies = [
|
||||||
|
"futures 0.3.25",
|
||||||
|
"ring",
|
||||||
|
"rustls 0.20.8",
|
||||||
|
"tokio",
|
||||||
|
"tokio-postgres",
|
||||||
|
"tokio-rustls 0.23.4",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-reactor"
|
name = "tokio-reactor"
|
||||||
version = "0.1.12"
|
version = "0.1.12"
|
||||||
|
@ -7134,7 +7153,7 @@ version = "0.23.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
|
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"rustls 0.20.7",
|
"rustls 0.20.8",
|
||||||
"tokio",
|
"tokio",
|
||||||
"webpki 0.22.0",
|
"webpki 0.22.0",
|
||||||
]
|
]
|
||||||
|
@ -7193,7 +7212,7 @@ checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
"rustls 0.20.7",
|
"rustls 0.20.8",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls 0.23.4",
|
"tokio-rustls 0.23.4",
|
||||||
"tungstenite",
|
"tungstenite",
|
||||||
|
@ -7294,7 +7313,7 @@ dependencies = [
|
||||||
"pin-project",
|
"pin-project",
|
||||||
"prost 0.11.3",
|
"prost 0.11.3",
|
||||||
"prost-derive 0.11.2",
|
"prost-derive 0.11.2",
|
||||||
"rustls-pemfile 1.0.1",
|
"rustls-pemfile 1.0.2",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-rustls 0.23.4",
|
"tokio-rustls 0.23.4",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
|
@ -7456,7 +7475,7 @@ dependencies = [
|
||||||
"httparse",
|
"httparse",
|
||||||
"log 0.4.17",
|
"log 0.4.17",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"rustls 0.20.7",
|
"rustls 0.20.8",
|
||||||
"sha-1 0.10.1",
|
"sha-1 0.10.1",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"url 2.3.1",
|
"url 2.3.1",
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
app = "mango-fills"
|
||||||
|
kill_signal = "SIGINT"
|
||||||
|
kill_timeout = 5
|
||||||
|
|
||||||
|
[build]
|
||||||
|
dockerfile = "../Dockerfile"
|
||||||
|
|
||||||
|
[experimental]
|
||||||
|
cmd = ["service-mango-fills", "fills-config.toml"]
|
||||||
|
|
||||||
|
[[services]]
|
||||||
|
internal_port = 8080
|
||||||
|
processes = ["app"]
|
||||||
|
protocol = "tcp"
|
||||||
|
|
||||||
|
[services.concurrency]
|
||||||
|
hard_limit = 1024
|
||||||
|
soft_limit = 1024
|
||||||
|
type = "connections"
|
||||||
|
|
||||||
|
[metrics]
|
||||||
|
path = "/metrics"
|
||||||
|
port = 9091
|
|
@ -0,0 +1,23 @@
|
||||||
|
app = "mango-orderbook"
|
||||||
|
kill_signal = "SIGINT"
|
||||||
|
kill_timeout = 5
|
||||||
|
|
||||||
|
[build]
|
||||||
|
dockerfile = "../Dockerfile"
|
||||||
|
|
||||||
|
[experimental]
|
||||||
|
cmd = ["service-mango-orderbook", "orderbook-config.toml"]
|
||||||
|
|
||||||
|
[[services]]
|
||||||
|
internal_port = 8082
|
||||||
|
processes = ["app"]
|
||||||
|
protocol = "tcp"
|
||||||
|
|
||||||
|
[services.concurrency]
|
||||||
|
hard_limit = 1024
|
||||||
|
soft_limit = 1024
|
||||||
|
type = "connections"
|
||||||
|
|
||||||
|
[metrics]
|
||||||
|
path = "/metrics"
|
||||||
|
port = 9091
|
|
@ -23,10 +23,13 @@ fixed = { version = "*", features = ["serde"] }
|
||||||
|
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
tokio-stream = "0.1"
|
tokio-stream = "0.1"
|
||||||
tokio-postgres = "0.7"
|
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
|
||||||
postgres-types = { version = "0.2", features = ["array-impls", "derive"] }
|
tokio-postgres-rustls = "0.9.0"
|
||||||
|
postgres-types = { version = "0.2", features = ["array-impls", "derive", "with-chrono-0_4"] }
|
||||||
postgres-native-tls = "0.5"
|
postgres-native-tls = "0.5"
|
||||||
native-tls = "0.2"
|
native-tls = "0.2"
|
||||||
|
rustls = "0.20.8"
|
||||||
|
rustls-pemfile = "1.0.2"
|
||||||
|
|
||||||
# postgres_query hasn't updated its crate in a while
|
# postgres_query hasn't updated its crate in a while
|
||||||
postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
|
postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
|
||||||
|
@ -45,6 +48,7 @@ rand = "0.7"
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
bytes = "1.0"
|
bytes = "1.0"
|
||||||
itertools = "0.10.5"
|
itertools = "0.10.5"
|
||||||
|
chrono = "0.4.23"
|
||||||
|
|
||||||
futures = "0.3.17"
|
futures = "0.3.17"
|
||||||
futures-core = "0.3"
|
futures-core = "0.3"
|
||||||
|
|
|
@ -55,18 +55,14 @@ impl ChainData {
|
||||||
newest_processed_slot: 0,
|
newest_processed_slot: 0,
|
||||||
account_versions_stored: 0,
|
account_versions_stored: 0,
|
||||||
account_bytes_stored: 0,
|
account_bytes_stored: 0,
|
||||||
metric_accounts_stored: metrics_sender.register_u64(
|
metric_accounts_stored: metrics_sender
|
||||||
"chaindata_accounts_stored".into(),
|
.register_u64("chaindata_accounts_stored".into(), MetricType::Gauge),
|
||||||
MetricType::Gauge,
|
|
||||||
),
|
|
||||||
metric_account_versions_stored: metrics_sender.register_u64(
|
metric_account_versions_stored: metrics_sender.register_u64(
|
||||||
"chaindata_account_versions_stored".into(),
|
"chaindata_account_versions_stored".into(),
|
||||||
MetricType::Gauge,
|
MetricType::Gauge,
|
||||||
),
|
),
|
||||||
metric_account_bytes_stored: metrics_sender.register_u64(
|
metric_account_bytes_stored: metrics_sender
|
||||||
"chaindata_account_bytes_stored".into(),
|
.register_u64("chaindata_account_bytes_stored".into(), MetricType::Gauge),
|
||||||
MetricType::Gauge,
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
chain_data::{AccountData, ChainData, SlotData},
|
chain_data::{AccountData, ChainData, SlotData},
|
||||||
metrics::{MetricType, Metrics},
|
metrics::{MetricType, Metrics},
|
||||||
AccountWrite, SlotUpdate, orderbook_filter::OrderbookSide,
|
orderbook_filter::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide},
|
||||||
|
AccountWrite, SlotUpdate,
|
||||||
};
|
};
|
||||||
use bytemuck::{Pod, Zeroable, cast_slice};
|
use bytemuck::{cast_slice, Pod, Zeroable};
|
||||||
|
use chrono::{TimeZone, Utc};
|
||||||
use log::*;
|
use log::*;
|
||||||
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
||||||
use serum_dex::state::EventView as SpotEvent;
|
use serum_dex::state::EventView as SpotEvent;
|
||||||
|
@ -15,7 +17,9 @@ use solana_sdk::{
|
||||||
use std::{
|
use std::{
|
||||||
borrow::BorrowMut,
|
borrow::BorrowMut,
|
||||||
cmp::max,
|
cmp::max,
|
||||||
collections::{HashMap, HashSet}, convert::identity, time::SystemTime,
|
collections::{HashMap, HashSet},
|
||||||
|
convert::identity,
|
||||||
|
time::SystemTime,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::metrics::MetricU64;
|
use crate::metrics::MetricU64;
|
||||||
|
@ -25,18 +29,46 @@ use mango_v4::state::{
|
||||||
MAX_NUM_EVENTS,
|
MAX_NUM_EVENTS,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Serialize)]
|
#[derive(Clone, Copy, Debug)]
|
||||||
pub enum FillUpdateStatus {
|
pub enum FillUpdateStatus {
|
||||||
New,
|
New,
|
||||||
Revoke,
|
Revoke,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Copy, Debug, Serialize)]
|
impl Serialize for FillUpdateStatus {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
match *self {
|
||||||
|
FillUpdateStatus::New => {
|
||||||
|
serializer.serialize_unit_variant("FillUpdateStatus", 0, "new")
|
||||||
|
}
|
||||||
|
FillUpdateStatus::Revoke => {
|
||||||
|
serializer.serialize_unit_variant("FillUpdateStatus", 1, "revoke")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug)]
|
||||||
pub enum FillEventType {
|
pub enum FillEventType {
|
||||||
Spot,
|
Spot,
|
||||||
Perp,
|
Perp,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Serialize for FillEventType {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
match *self {
|
||||||
|
FillEventType::Spot => serializer.serialize_unit_variant("FillEventType", 0, "spot"),
|
||||||
|
FillEventType::Perp => serializer.serialize_unit_variant("FillEventType", 1, "perp"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct FillEvent {
|
pub struct FillEvent {
|
||||||
pub event_type: FillEventType,
|
pub event_type: FillEventType,
|
||||||
|
@ -48,8 +80,8 @@ pub struct FillEvent {
|
||||||
pub order_id: u128,
|
pub order_id: u128,
|
||||||
pub client_order_id: u64,
|
pub client_order_id: u64,
|
||||||
pub fee: f32,
|
pub fee: f32,
|
||||||
pub price: i64,
|
pub price: f64,
|
||||||
pub quantity: i64,
|
pub quantity: f64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Serialize for FillEvent {
|
impl Serialize for FillEvent {
|
||||||
|
@ -61,7 +93,12 @@ impl Serialize for FillEvent {
|
||||||
state.serialize_field("eventType", &self.event_type)?;
|
state.serialize_field("eventType", &self.event_type)?;
|
||||||
state.serialize_field("maker", &self.maker)?;
|
state.serialize_field("maker", &self.maker)?;
|
||||||
state.serialize_field("side", &self.side)?;
|
state.serialize_field("side", &self.side)?;
|
||||||
state.serialize_field("timestamp", &self.timestamp)?;
|
state.serialize_field(
|
||||||
|
"timestamp",
|
||||||
|
&Utc.timestamp_opt(self.timestamp as i64, 0)
|
||||||
|
.unwrap()
|
||||||
|
.to_rfc3339(),
|
||||||
|
)?;
|
||||||
state.serialize_field("seqNum", &self.seq_num)?;
|
state.serialize_field("seqNum", &self.seq_num)?;
|
||||||
state.serialize_field("owner", &self.owner)?;
|
state.serialize_field("owner", &self.owner)?;
|
||||||
state.serialize_field("orderId", &self.order_id)?;
|
state.serialize_field("orderId", &self.order_id)?;
|
||||||
|
@ -74,7 +111,7 @@ impl Serialize for FillEvent {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FillEvent {
|
impl FillEvent {
|
||||||
pub fn new_from_perp(event: PerpFillEvent) -> [Self; 2] {
|
pub fn new_from_perp(event: PerpFillEvent, config: &MarketConfig) -> [Self; 2] {
|
||||||
let taker_side = match event.taker_side() {
|
let taker_side = match event.taker_side() {
|
||||||
Side::Ask => OrderbookSide::Ask,
|
Side::Ask => OrderbookSide::Ask,
|
||||||
Side::Bid => OrderbookSide::Bid,
|
Side::Bid => OrderbookSide::Bid,
|
||||||
|
@ -83,7 +120,17 @@ impl FillEvent {
|
||||||
Side::Ask => OrderbookSide::Bid,
|
Side::Ask => OrderbookSide::Bid,
|
||||||
Side::Bid => OrderbookSide::Ask,
|
Side::Bid => OrderbookSide::Ask,
|
||||||
};
|
};
|
||||||
[FillEvent {
|
let price = price_lots_to_ui_perp(
|
||||||
|
event.price,
|
||||||
|
config.base_decimals,
|
||||||
|
config.quote_decimals,
|
||||||
|
config.base_lot_size,
|
||||||
|
config.quote_lot_size,
|
||||||
|
);
|
||||||
|
let quantity =
|
||||||
|
base_lots_to_ui_perp(event.quantity, config.base_decimals, config.quote_decimals);
|
||||||
|
[
|
||||||
|
FillEvent {
|
||||||
event_type: FillEventType::Perp,
|
event_type: FillEventType::Perp,
|
||||||
maker: true,
|
maker: true,
|
||||||
side: maker_side,
|
side: maker_side,
|
||||||
|
@ -93,8 +140,8 @@ impl FillEvent {
|
||||||
order_id: event.maker_order_id,
|
order_id: event.maker_order_id,
|
||||||
client_order_id: 0u64,
|
client_order_id: 0u64,
|
||||||
fee: event.maker_fee.to_num(),
|
fee: event.maker_fee.to_num(),
|
||||||
price: event.price,
|
price: price,
|
||||||
quantity: event.quantity,
|
quantity: quantity,
|
||||||
},
|
},
|
||||||
FillEvent {
|
FillEvent {
|
||||||
event_type: FillEventType::Perp,
|
event_type: FillEventType::Perp,
|
||||||
|
@ -106,14 +153,29 @@ impl FillEvent {
|
||||||
order_id: event.taker_order_id,
|
order_id: event.taker_order_id,
|
||||||
client_order_id: event.taker_client_order_id,
|
client_order_id: event.taker_client_order_id,
|
||||||
fee: event.taker_fee.to_num(),
|
fee: event.taker_fee.to_num(),
|
||||||
price: event.price,
|
price: price,
|
||||||
quantity: event.quantity,
|
quantity: quantity,
|
||||||
|
},
|
||||||
}]
|
]
|
||||||
}
|
}
|
||||||
pub fn new_from_spot(event: SpotEvent, timestamp: u64, seq_num: u64) -> Self {
|
pub fn new_from_spot(
|
||||||
|
event: SpotEvent,
|
||||||
|
timestamp: u64,
|
||||||
|
seq_num: u64,
|
||||||
|
config: &MarketConfig,
|
||||||
|
) -> Self {
|
||||||
match event {
|
match event {
|
||||||
SpotEvent::Fill { side, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, order_id, owner, client_order_id, .. } => {
|
SpotEvent::Fill {
|
||||||
|
side,
|
||||||
|
maker,
|
||||||
|
native_qty_paid,
|
||||||
|
native_qty_received,
|
||||||
|
native_fee_or_rebate,
|
||||||
|
order_id,
|
||||||
|
owner,
|
||||||
|
client_order_id,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
let side = match side as u8 {
|
let side = match side as u8 {
|
||||||
0 => OrderbookSide::Bid,
|
0 => OrderbookSide::Bid,
|
||||||
1 => OrderbookSide::Ask,
|
1 => OrderbookSide::Ask,
|
||||||
|
@ -123,8 +185,41 @@ impl FillEvent {
|
||||||
Some(id) => id.into(),
|
Some(id) => id.into(),
|
||||||
None => 0u64,
|
None => 0u64,
|
||||||
};
|
};
|
||||||
// TODO: native to ui
|
|
||||||
let price = (native_qty_paid / native_qty_received) as i64;
|
let base_multiplier = 10u64.pow(config.base_decimals.into()) as u64;
|
||||||
|
let quote_multiplier = 10u64.pow(config.quote_decimals.into()) as u64;
|
||||||
|
|
||||||
|
let (price, quantity) = match side {
|
||||||
|
OrderbookSide::Bid => {
|
||||||
|
let price_before_fees = if maker {
|
||||||
|
native_qty_paid + native_fee_or_rebate
|
||||||
|
} else {
|
||||||
|
native_qty_paid - native_fee_or_rebate
|
||||||
|
};
|
||||||
|
|
||||||
|
let top = price_before_fees * base_multiplier;
|
||||||
|
let bottom = quote_multiplier * native_qty_received;
|
||||||
|
let price = top as f64 / bottom as f64;
|
||||||
|
let quantity = native_qty_received as f64 / base_multiplier as f64;
|
||||||
|
(price, quantity)
|
||||||
|
}
|
||||||
|
OrderbookSide::Ask => {
|
||||||
|
let price_before_fees = if maker {
|
||||||
|
native_qty_received - native_fee_or_rebate
|
||||||
|
} else {
|
||||||
|
native_qty_received + native_fee_or_rebate
|
||||||
|
};
|
||||||
|
|
||||||
|
let top = price_before_fees * base_multiplier;
|
||||||
|
let bottom = quote_multiplier * native_qty_paid;
|
||||||
|
let price = top as f64 / bottom as f64;
|
||||||
|
let quantity = native_qty_paid as f64 / base_multiplier as f64;
|
||||||
|
(price, quantity)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let fee = native_fee_or_rebate as f32 / quote_multiplier as f32;
|
||||||
|
|
||||||
FillEvent {
|
FillEvent {
|
||||||
event_type: FillEventType::Spot,
|
event_type: FillEventType::Spot,
|
||||||
maker: maker,
|
maker: maker,
|
||||||
|
@ -134,12 +229,14 @@ impl FillEvent {
|
||||||
owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(),
|
owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(),
|
||||||
order_id: order_id,
|
order_id: order_id,
|
||||||
client_order_id: client_order_id,
|
client_order_id: client_order_id,
|
||||||
fee: native_fee_or_rebate as f32,
|
fee,
|
||||||
price,
|
price,
|
||||||
quantity: native_qty_received as i64,
|
quantity,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
SpotEvent::Out { .. } => { panic!("Can't build FillEvent from SpotEvent::Out")}
|
SpotEvent::Out { .. } => {
|
||||||
|
panic!("Can't build FillEvent from SpotEvent::Out")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -148,8 +245,8 @@ impl FillEvent {
|
||||||
pub struct FillUpdate {
|
pub struct FillUpdate {
|
||||||
pub event: FillEvent,
|
pub event: FillEvent,
|
||||||
pub status: FillUpdateStatus,
|
pub status: FillUpdateStatus,
|
||||||
pub market: String,
|
pub market_key: String,
|
||||||
pub queue: String,
|
pub market_name: String,
|
||||||
pub slot: u64,
|
pub slot: u64,
|
||||||
pub write_version: u64,
|
pub write_version: u64,
|
||||||
}
|
}
|
||||||
|
@ -170,13 +267,13 @@ impl Serialize for FillUpdate {
|
||||||
where
|
where
|
||||||
S: Serializer,
|
S: Serializer,
|
||||||
{
|
{
|
||||||
let mut state = serializer.serialize_struct("FillUpdate", 4)?;
|
let mut state = serializer.serialize_struct("FillUpdate", 6)?;
|
||||||
state.serialize_field("event", &self.event)?;
|
state.serialize_field("event", &self.event)?;
|
||||||
state.serialize_field("market", &self.market)?;
|
state.serialize_field("marketKey", &self.market_key)?;
|
||||||
state.serialize_field("queue", &self.queue)?;
|
state.serialize_field("marketName", &self.market_name)?;
|
||||||
state.serialize_field("status", &self.status)?;
|
state.serialize_field("status", &self.status)?;
|
||||||
state.serialize_field("slot", &self.slot)?;
|
state.serialize_field("slot", &self.slot)?;
|
||||||
state.serialize_field("write_version", &self.write_version)?;
|
state.serialize_field("writeVersion", &self.write_version)?;
|
||||||
|
|
||||||
state.end()
|
state.end()
|
||||||
}
|
}
|
||||||
|
@ -218,7 +315,7 @@ type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
|
||||||
fn publish_changes_perp(
|
fn publish_changes_perp(
|
||||||
slot: u64,
|
slot: u64,
|
||||||
write_version: u64,
|
write_version: u64,
|
||||||
mkt: &(Pubkey, Pubkey),
|
mkt: &(Pubkey, MarketConfig),
|
||||||
header: &EventQueueHeader,
|
header: &EventQueueHeader,
|
||||||
events: &EventQueueEvents,
|
events: &EventQueueEvents,
|
||||||
old_seq_num: u64,
|
old_seq_num: u64,
|
||||||
|
@ -234,7 +331,7 @@ fn publish_changes_perp(
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
let mut checkpoint = Vec::new();
|
let mut checkpoint = Vec::new();
|
||||||
let mkt_pk_string = mkt.0.to_string();
|
let mkt_pk_string = mkt.0.to_string();
|
||||||
let evq_pk_string = mkt.1.to_string();
|
let evq_pk_string = mkt.1.event_queue.to_string();
|
||||||
for seq_num in start_seq_num..header.seq_num {
|
for seq_num in start_seq_num..header.seq_num {
|
||||||
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
||||||
|
|
||||||
|
@ -245,8 +342,8 @@ fn publish_changes_perp(
|
||||||
// the order of these checks is important so they are exhaustive
|
// the order of these checks is important so they are exhaustive
|
||||||
if seq_num >= old_seq_num {
|
if seq_num >= old_seq_num {
|
||||||
debug!(
|
debug!(
|
||||||
"found new event {} idx {} type {}",
|
"found new event {} idx {} type {} slot {} write_version {}",
|
||||||
mkt_pk_string, idx, events[idx].event_type as u32
|
mkt_pk_string, idx, events[idx].event_type as u32, slot, write_version
|
||||||
);
|
);
|
||||||
|
|
||||||
metric_events_new.increment();
|
metric_events_new.increment();
|
||||||
|
@ -254,7 +351,7 @@ fn publish_changes_perp(
|
||||||
// new fills are published and recorded in checkpoint
|
// new fills are published and recorded in checkpoint
|
||||||
if events[idx].event_type == EventType::Fill as u8 {
|
if events[idx].event_type == EventType::Fill as u8 {
|
||||||
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
||||||
let fills = FillEvent::new_from_perp(fill);
|
let fills = FillEvent::new_from_perp(fill, &mkt.1);
|
||||||
// send event for both maker and taker
|
// send event for both maker and taker
|
||||||
for fill in fills {
|
for fill in fills {
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
|
@ -263,8 +360,8 @@ fn publish_changes_perp(
|
||||||
write_version,
|
write_version,
|
||||||
event: fill.clone(),
|
event: fill.clone(),
|
||||||
status: FillUpdateStatus::New,
|
status: FillUpdateStatus::New,
|
||||||
market: mkt_pk_string.clone(),
|
market_key: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
market_name: mkt.1.name.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
checkpoint.push(fill);
|
checkpoint.push(fill);
|
||||||
|
@ -283,7 +380,7 @@ fn publish_changes_perp(
|
||||||
// first revoke old event if a fill
|
// first revoke old event if a fill
|
||||||
if old_events[idx].event_type == EventType::Fill as u8 {
|
if old_events[idx].event_type == EventType::Fill as u8 {
|
||||||
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
|
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
|
||||||
let fills = FillEvent::new_from_perp(fill);
|
let fills = FillEvent::new_from_perp(fill, &mkt.1);
|
||||||
for fill in fills {
|
for fill in fills {
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
|
@ -291,8 +388,8 @@ fn publish_changes_perp(
|
||||||
write_version,
|
write_version,
|
||||||
event: fill,
|
event: fill,
|
||||||
status: FillUpdateStatus::Revoke,
|
status: FillUpdateStatus::Revoke,
|
||||||
market: mkt_pk_string.clone(),
|
market_key: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
market_name: mkt.1.name.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
}
|
}
|
||||||
|
@ -301,7 +398,7 @@ fn publish_changes_perp(
|
||||||
// then publish new if its a fill and record in checkpoint
|
// then publish new if its a fill and record in checkpoint
|
||||||
if events[idx].event_type == EventType::Fill as u8 {
|
if events[idx].event_type == EventType::Fill as u8 {
|
||||||
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
||||||
let fills = FillEvent::new_from_perp(fill);
|
let fills = FillEvent::new_from_perp(fill, &mkt.1);
|
||||||
for fill in fills {
|
for fill in fills {
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
|
@ -309,8 +406,8 @@ fn publish_changes_perp(
|
||||||
write_version,
|
write_version,
|
||||||
event: fill.clone(),
|
event: fill.clone(),
|
||||||
status: FillUpdateStatus::New,
|
status: FillUpdateStatus::New,
|
||||||
market: mkt_pk_string.clone(),
|
market_key: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
market_name: mkt.1.name.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
checkpoint.push(fill);
|
checkpoint.push(fill);
|
||||||
|
@ -320,7 +417,7 @@ fn publish_changes_perp(
|
||||||
// every already published event is recorded in checkpoint if a fill
|
// every already published event is recorded in checkpoint if a fill
|
||||||
if events[idx].event_type == EventType::Fill as u8 {
|
if events[idx].event_type == EventType::Fill as u8 {
|
||||||
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
|
||||||
let fills = FillEvent::new_from_perp(fill);
|
let fills = FillEvent::new_from_perp(fill, &mkt.1);
|
||||||
for fill in fills {
|
for fill in fills {
|
||||||
checkpoint.push(fill);
|
checkpoint.push(fill);
|
||||||
}
|
}
|
||||||
|
@ -332,15 +429,15 @@ fn publish_changes_perp(
|
||||||
for seq_num in header.seq_num..old_seq_num {
|
for seq_num in header.seq_num..old_seq_num {
|
||||||
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
||||||
debug!(
|
debug!(
|
||||||
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
|
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {} slot {} write_version {}",
|
||||||
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num
|
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num, slot, write_version
|
||||||
);
|
);
|
||||||
|
|
||||||
metric_events_drop.increment();
|
metric_events_drop.increment();
|
||||||
|
|
||||||
if old_events[idx].event_type == EventType::Fill as u8 {
|
if old_events[idx].event_type == EventType::Fill as u8 {
|
||||||
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
|
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
|
||||||
let fills = FillEvent::new_from_perp(fill);
|
let fills = FillEvent::new_from_perp(fill, &mkt.1);
|
||||||
for fill in fills {
|
for fill in fills {
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
|
@ -348,8 +445,8 @@ fn publish_changes_perp(
|
||||||
event: fill,
|
event: fill,
|
||||||
write_version,
|
write_version,
|
||||||
status: FillUpdateStatus::Revoke,
|
status: FillUpdateStatus::Revoke,
|
||||||
market: mkt_pk_string.clone(),
|
market_key: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
market_name: mkt.1.name.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
}
|
}
|
||||||
|
@ -370,7 +467,7 @@ fn publish_changes_perp(
|
||||||
fn publish_changes_serum(
|
fn publish_changes_serum(
|
||||||
slot: u64,
|
slot: u64,
|
||||||
write_version: u64,
|
write_version: u64,
|
||||||
mkt: &(Pubkey, Pubkey),
|
mkt: &(Pubkey, MarketConfig),
|
||||||
header: &SerumEventQueueHeader,
|
header: &SerumEventQueueHeader,
|
||||||
events: &[serum_dex::state::Event],
|
events: &[serum_dex::state::Event],
|
||||||
old_seq_num: u64,
|
old_seq_num: u64,
|
||||||
|
@ -386,12 +483,15 @@ fn publish_changes_serum(
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
let mut checkpoint = Vec::new();
|
let mut checkpoint = Vec::new();
|
||||||
let mkt_pk_string = mkt.0.to_string();
|
let mkt_pk_string = mkt.0.to_string();
|
||||||
let evq_pk_string = mkt.1.to_string();
|
let evq_pk_string = mkt.1.event_queue.to_string();
|
||||||
let header_seq_num = header.seq_num;
|
let header_seq_num = header.seq_num;
|
||||||
debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
|
debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
|
||||||
|
|
||||||
// Timestamp for spot events is time scraped
|
// Timestamp for spot events is time scraped
|
||||||
let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
|
let timestamp = SystemTime::now()
|
||||||
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
|
.unwrap()
|
||||||
|
.as_secs();
|
||||||
for seq_num in start_seq_num..header_seq_num {
|
for seq_num in start_seq_num..header_seq_num {
|
||||||
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
|
||||||
let event_view = events[idx].as_view().unwrap();
|
let event_view = events[idx].as_view().unwrap();
|
||||||
|
@ -404,7 +504,7 @@ fn publish_changes_serum(
|
||||||
// 2) the event is not matching the old event queue
|
// 2) the event is not matching the old event queue
|
||||||
// 3) all other events are matching the old event queue
|
// 3) all other events are matching the old event queue
|
||||||
// the order of these checks is important so they are exhaustive
|
// the order of these checks is important so they are exhaustive
|
||||||
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num);
|
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num, &mkt.1);
|
||||||
if seq_num >= old_seq_num {
|
if seq_num >= old_seq_num {
|
||||||
debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);
|
debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);
|
||||||
|
|
||||||
|
@ -415,8 +515,8 @@ fn publish_changes_serum(
|
||||||
write_version,
|
write_version,
|
||||||
event: fill.clone(),
|
event: fill.clone(),
|
||||||
status: FillUpdateStatus::New,
|
status: FillUpdateStatus::New,
|
||||||
market: mkt_pk_string.clone(),
|
market_key: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
market_name: mkt.1.name.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
checkpoint.push(fill);
|
checkpoint.push(fill);
|
||||||
|
@ -433,8 +533,12 @@ fn publish_changes_serum(
|
||||||
|
|
||||||
metric_events_change.increment();
|
metric_events_change.increment();
|
||||||
|
|
||||||
|
let old_fill = FillEvent::new_from_spot(
|
||||||
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
|
old_event_view,
|
||||||
|
timestamp,
|
||||||
|
seq_num,
|
||||||
|
&mkt.1,
|
||||||
|
);
|
||||||
// first revoke old event
|
// first revoke old event
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
|
@ -442,8 +546,8 @@ fn publish_changes_serum(
|
||||||
write_version,
|
write_version,
|
||||||
event: old_fill,
|
event: old_fill,
|
||||||
status: FillUpdateStatus::Revoke,
|
status: FillUpdateStatus::Revoke,
|
||||||
market: mkt_pk_string.clone(),
|
market_key: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
market_name: mkt.1.name.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
|
|
||||||
|
@ -454,8 +558,8 @@ fn publish_changes_serum(
|
||||||
write_version,
|
write_version,
|
||||||
event: fill.clone(),
|
event: fill.clone(),
|
||||||
status: FillUpdateStatus::New,
|
status: FillUpdateStatus::New,
|
||||||
market: mkt_pk_string.clone(),
|
market_key: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
market_name: mkt.1.name.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
}
|
}
|
||||||
|
@ -478,8 +582,8 @@ fn publish_changes_serum(
|
||||||
write_version,
|
write_version,
|
||||||
event: fill.clone(),
|
event: fill.clone(),
|
||||||
status: FillUpdateStatus::New,
|
status: FillUpdateStatus::New,
|
||||||
market: mkt_pk_string.clone(),
|
market_key: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
market_name: mkt.1.name.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
checkpoint.push(fill);
|
checkpoint.push(fill);
|
||||||
|
@ -503,15 +607,15 @@ fn publish_changes_serum(
|
||||||
|
|
||||||
match old_event_view {
|
match old_event_view {
|
||||||
SpotEvent::Fill { .. } => {
|
SpotEvent::Fill { .. } => {
|
||||||
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
|
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num, &mkt.1);
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
.try_send(FillEventFilterMessage::Update(FillUpdate {
|
||||||
slot,
|
slot,
|
||||||
event: old_fill,
|
event: old_fill,
|
||||||
write_version,
|
write_version,
|
||||||
status: FillUpdateStatus::Revoke,
|
status: FillUpdateStatus::Revoke,
|
||||||
market: mkt_pk_string.clone(),
|
market_key: mkt_pk_string.clone(),
|
||||||
queue: evq_pk_string.clone(),
|
market_name: mkt.1.name.clone(),
|
||||||
}))
|
}))
|
||||||
.unwrap(); // TODO: use anyhow to bubble up error
|
.unwrap(); // TODO: use anyhow to bubble up error
|
||||||
}
|
}
|
||||||
|
@ -520,21 +624,19 @@ fn publish_changes_serum(
|
||||||
}
|
}
|
||||||
|
|
||||||
fill_update_sender
|
fill_update_sender
|
||||||
.try_send(FillEventFilterMessage::Checkpoint(
|
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
|
||||||
FillCheckpoint {
|
|
||||||
slot,
|
slot,
|
||||||
write_version,
|
write_version,
|
||||||
events: checkpoint,
|
events: checkpoint,
|
||||||
market: mkt_pk_string,
|
market: mkt_pk_string,
|
||||||
queue: evq_pk_string,
|
queue: evq_pk_string,
|
||||||
},
|
}))
|
||||||
))
|
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn init(
|
pub async fn init(
|
||||||
perp_queue_pks: Vec<(Pubkey, Pubkey)>,
|
perp_market_configs: Vec<(Pubkey, MarketConfig)>,
|
||||||
serum_queue_pks: Vec<(Pubkey, Pubkey)>,
|
spot_market_configs: Vec<(Pubkey, MarketConfig)>,
|
||||||
metrics_sender: Metrics,
|
metrics_sender: Metrics,
|
||||||
) -> anyhow::Result<(
|
) -> anyhow::Result<(
|
||||||
async_channel::Sender<AccountWrite>,
|
async_channel::Sender<AccountWrite>,
|
||||||
|
@ -549,8 +651,12 @@ pub async fn init(
|
||||||
metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter);
|
metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter);
|
||||||
let mut metric_events_change =
|
let mut metric_events_change =
|
||||||
metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter);
|
metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter);
|
||||||
|
let mut metric_events_change_serum =
|
||||||
|
metrics_sender.register_u64("fills_feed_events_change_serum".into(), MetricType::Counter);
|
||||||
let mut metrics_events_drop =
|
let mut metrics_events_drop =
|
||||||
metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter);
|
metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter);
|
||||||
|
let mut metrics_events_drop_serum =
|
||||||
|
metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter);
|
||||||
|
|
||||||
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
|
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
|
||||||
let (account_write_queue_sender, account_write_queue_receiver) =
|
let (account_write_queue_sender, account_write_queue_receiver) =
|
||||||
|
@ -572,18 +678,24 @@ pub async fn init(
|
||||||
let mut seq_num_cache = HashMap::new();
|
let mut seq_num_cache = HashMap::new();
|
||||||
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
|
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
|
||||||
|
|
||||||
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
|
let all_market_configs = [perp_market_configs.clone(), spot_market_configs.clone()].concat();
|
||||||
let relevant_pubkeys = all_queue_pks
|
let perp_queue_pks: Vec<Pubkey> = perp_market_configs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|m| m.1)
|
.map(|x| x.1.event_queue)
|
||||||
.collect::<HashSet<Pubkey>>();
|
.collect();
|
||||||
|
let spot_queue_pks: Vec<Pubkey> = spot_market_configs
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.1.event_queue)
|
||||||
|
.collect();
|
||||||
|
let all_queue_pks: HashSet<Pubkey> =
|
||||||
|
HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat());
|
||||||
|
|
||||||
// update handling thread, reads both sloths and account updates
|
// update handling thread, reads both sloths and account updates
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Ok(account_write) = account_write_queue_receiver_c.recv() => {
|
Ok(account_write) = account_write_queue_receiver_c.recv() => {
|
||||||
if !relevant_pubkeys.contains(&account_write.pubkey) {
|
if !all_queue_pks.contains(&account_write.pubkey) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -611,21 +723,38 @@ pub async fn init(
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Err(e) = slot_queue_receiver.recv() => {
|
||||||
|
warn!("slot update channel err {:?}", e);
|
||||||
|
}
|
||||||
|
Err(e) = account_write_queue_receiver_c.recv() => {
|
||||||
|
warn!("write update channel err {:?}", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for mkt in all_queue_pks.iter() {
|
for mkt in all_market_configs.iter() {
|
||||||
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
|
let evq_pk = mkt.1.event_queue;
|
||||||
let mkt_pk = mkt.1;
|
let evq_pk_string = evq_pk.to_string();
|
||||||
|
let last_evq_version = last_evq_versions
|
||||||
|
.get(&mkt.1.event_queue.to_string())
|
||||||
|
.unwrap_or(&(0, 0));
|
||||||
|
|
||||||
match chain_cache.account(&mkt_pk) {
|
match chain_cache.account(&evq_pk) {
|
||||||
Ok(account_info) => {
|
Ok(account_info) => {
|
||||||
// only process if the account state changed
|
// only process if the account state changed
|
||||||
let evq_version = (account_info.slot, account_info.write_version);
|
let evq_version = (account_info.slot, account_info.write_version);
|
||||||
let evq_pk_string = mkt.1.to_string();
|
|
||||||
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
|
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
|
||||||
if evq_version == *last_evq_version {
|
if evq_version == *last_evq_version {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
if evq_version.0 < last_evq_version.0 {
|
||||||
|
debug!("evq version slot was old");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if evq_version.0 == last_evq_version.0 && evq_version.1 < last_evq_version.1
|
||||||
|
{
|
||||||
|
info!("evq version slot was same and write version was old");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
|
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
|
||||||
|
|
||||||
let account = &account_info.account;
|
let account = &account_info.account;
|
||||||
|
@ -633,17 +762,16 @@ pub async fn init(
|
||||||
if is_perp {
|
if is_perp {
|
||||||
let event_queue =
|
let event_queue =
|
||||||
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
|
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
|
||||||
trace!(
|
info!(
|
||||||
"evq {} seq_num {}",
|
"evq {} seq_num {} version {:?}",
|
||||||
evq_pk_string,
|
evq_pk_string, event_queue.header.seq_num, evq_version,
|
||||||
event_queue.header.seq_num
|
|
||||||
);
|
);
|
||||||
match seq_num_cache.get(&evq_pk_string) {
|
match seq_num_cache.get(&evq_pk_string) {
|
||||||
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
|
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
|
||||||
Some(old_events) => publish_changes_perp(
|
Some(old_events) => publish_changes_perp(
|
||||||
account_info.slot,
|
account_info.slot,
|
||||||
account_info.write_version,
|
account_info.write_version,
|
||||||
mkt,
|
&mkt,
|
||||||
&event_queue.header,
|
&event_queue.header,
|
||||||
&event_queue.buf,
|
&event_queue.buf,
|
||||||
*old_seq_num,
|
*old_seq_num,
|
||||||
|
@ -690,14 +818,17 @@ pub async fn init(
|
||||||
old_events,
|
old_events,
|
||||||
&fill_update_sender,
|
&fill_update_sender,
|
||||||
&mut metric_events_new_serum,
|
&mut metric_events_new_serum,
|
||||||
&mut metric_events_change,
|
&mut metric_events_change_serum,
|
||||||
&mut metrics_events_drop,
|
&mut metrics_events_drop_serum,
|
||||||
),
|
),
|
||||||
_ => {
|
_ => {
|
||||||
info!("serum_events_cache could not find {}", evq_pk_string)
|
debug!(
|
||||||
|
"serum_events_cache could not find {}",
|
||||||
|
evq_pk_string
|
||||||
|
)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ => info!("seq_num_cache could not find {}", evq_pk_string),
|
_ => debug!("seq_num_cache could not find {}", evq_pk_string),
|
||||||
}
|
}
|
||||||
|
|
||||||
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
|
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
|
||||||
|
@ -705,7 +836,7 @@ pub async fn init(
|
||||||
.insert(evq_pk_string.clone(), events.clone().to_vec());
|
.insert(evq_pk_string.clone(), events.clone().to_vec());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => info!("chain_cache could not find {}", mkt.1),
|
Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,237 @@
|
||||||
|
use chrono::{TimeZone, Utc};
|
||||||
|
use log::*;
|
||||||
|
use native_tls::{Certificate, Identity, TlsConnector};
|
||||||
|
use postgres_native_tls::MakeTlsConnector;
|
||||||
|
use postgres_query::Caching;
|
||||||
|
use std::{env, fs, time::Duration};
|
||||||
|
use tokio_postgres::Client;
|
||||||
|
|
||||||
|
use crate::{fill_event_filter::FillUpdate, metrics::*, PostgresConfig};
|
||||||
|
|
||||||
|
async fn postgres_connection(
|
||||||
|
config: &PostgresConfig,
|
||||||
|
metric_retries: MetricU64,
|
||||||
|
metric_live: MetricU64,
|
||||||
|
) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
|
||||||
|
let (tx, rx) = async_channel::unbounded();
|
||||||
|
|
||||||
|
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
|
||||||
|
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
|
||||||
|
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
|
||||||
|
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
|
||||||
|
info!("making tls config");
|
||||||
|
let tls = match &config.tls {
|
||||||
|
Some(tls) => {
|
||||||
|
use base64::{engine::general_purpose, Engine as _};
|
||||||
|
let ca_cert = match &tls.ca_cert_path.chars().next().unwrap() {
|
||||||
|
'$' => general_purpose::STANDARD
|
||||||
|
.decode(
|
||||||
|
env::var(&tls.ca_cert_path[1..])
|
||||||
|
.expect("reading client cert from env")
|
||||||
|
.into_bytes(),
|
||||||
|
)
|
||||||
|
.expect("decoding client cert"),
|
||||||
|
_ => fs::read(&tls.client_key_path).expect("reading client key from file"),
|
||||||
|
};
|
||||||
|
let client_key = match &tls.client_key_path.chars().next().unwrap() {
|
||||||
|
'$' => general_purpose::STANDARD
|
||||||
|
.decode(
|
||||||
|
env::var(&tls.client_key_path[1..])
|
||||||
|
.expect("reading client key from env")
|
||||||
|
.into_bytes(),
|
||||||
|
)
|
||||||
|
.expect("decoding client key"),
|
||||||
|
_ => fs::read(&tls.client_key_path).expect("reading client key from file"),
|
||||||
|
};
|
||||||
|
MakeTlsConnector::new(
|
||||||
|
TlsConnector::builder()
|
||||||
|
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
|
||||||
|
.identity(Identity::from_pkcs12(&client_key, "pass")?)
|
||||||
|
.danger_accept_invalid_certs(config.allow_invalid_certs)
|
||||||
|
.build()?,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
None => MakeTlsConnector::new(
|
||||||
|
TlsConnector::builder()
|
||||||
|
.danger_accept_invalid_certs(config.allow_invalid_certs)
|
||||||
|
.build()?,
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
let config = config.clone();
|
||||||
|
let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?);
|
||||||
|
let mut metric_retries = metric_retries;
|
||||||
|
let mut metric_live = metric_live;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
let (client, connection) = match initial.take() {
|
||||||
|
Some(v) => v,
|
||||||
|
None => {
|
||||||
|
let result =
|
||||||
|
tokio_postgres::connect(&config.connection_string, tls.clone()).await;
|
||||||
|
match result {
|
||||||
|
Ok(v) => v,
|
||||||
|
Err(err) => {
|
||||||
|
warn!("could not connect to postgres: {:?}", err);
|
||||||
|
tokio::time::sleep(Duration::from_secs(
|
||||||
|
config.retry_connection_sleep_secs,
|
||||||
|
))
|
||||||
|
.await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tx.send(Some(client)).await.expect("send success");
|
||||||
|
metric_live.increment();
|
||||||
|
|
||||||
|
let result = connection.await;
|
||||||
|
|
||||||
|
metric_retries.increment();
|
||||||
|
metric_live.decrement();
|
||||||
|
|
||||||
|
tx.send(None).await.expect("send success");
|
||||||
|
warn!("postgres connection error: {:?}", result);
|
||||||
|
tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_postgres_client<'a>(
|
||||||
|
client: &'a mut Option<postgres_query::Caching<tokio_postgres::Client>>,
|
||||||
|
rx: &async_channel::Receiver<Option<tokio_postgres::Client>>,
|
||||||
|
config: &PostgresConfig,
|
||||||
|
) -> &'a postgres_query::Caching<tokio_postgres::Client> {
|
||||||
|
// get the most recent client, waiting if there's a disconnect
|
||||||
|
while !rx.is_empty() || client.is_none() {
|
||||||
|
tokio::select! {
|
||||||
|
client_raw_opt = rx.recv() => {
|
||||||
|
*client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new);
|
||||||
|
},
|
||||||
|
_ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
|
||||||
|
error!("waited too long for new postgres client");
|
||||||
|
std::process::exit(1);
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
client.as_ref().expect("must contain value")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process_update(client: &Caching<Client>, update: &FillUpdate) -> anyhow::Result<()> {
|
||||||
|
let market = &update.market_key;
|
||||||
|
let seq_num = update.event.seq_num as i64;
|
||||||
|
let fill_timestamp = Utc.timestamp_opt(update.event.timestamp as i64, 0).unwrap();
|
||||||
|
let price = update.event.price as f64;
|
||||||
|
let quantity = update.event.quantity as f64;
|
||||||
|
let slot = update.slot as i64;
|
||||||
|
let write_version = update.write_version as i64;
|
||||||
|
|
||||||
|
let query = postgres_query::query!(
|
||||||
|
"INSERT INTO transactions_v4.perp_fills_feed_events
|
||||||
|
(market, seq_num, fill_timestamp, price,
|
||||||
|
quantity, slot, write_version)
|
||||||
|
VALUES
|
||||||
|
($market, $seq_num, $fill_timestamp, $price,
|
||||||
|
$quantity, $slot, $write_version)
|
||||||
|
ON CONFLICT (market, seq_num) DO NOTHING",
|
||||||
|
market,
|
||||||
|
seq_num,
|
||||||
|
fill_timestamp,
|
||||||
|
price,
|
||||||
|
quantity,
|
||||||
|
slot,
|
||||||
|
write_version,
|
||||||
|
);
|
||||||
|
let _ = query.execute(&client).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn init(
|
||||||
|
config: &PostgresConfig,
|
||||||
|
metrics_sender: Metrics,
|
||||||
|
) -> anyhow::Result<async_channel::Sender<FillUpdate>> {
|
||||||
|
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
|
||||||
|
let (fill_update_queue_sender, fill_update_queue_receiver) =
|
||||||
|
async_channel::bounded::<FillUpdate>(config.max_queue_size);
|
||||||
|
|
||||||
|
let metric_con_retries = metrics_sender.register_u64(
|
||||||
|
"fills_postgres_connection_retries".into(),
|
||||||
|
MetricType::Counter,
|
||||||
|
);
|
||||||
|
let metric_con_live =
|
||||||
|
metrics_sender.register_u64("fills_postgres_connections_alive".into(), MetricType::Gauge);
|
||||||
|
|
||||||
|
// postgres fill update sending worker threads
|
||||||
|
for _ in 0..config.connection_count {
|
||||||
|
let postgres_account_writes =
|
||||||
|
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
||||||
|
.await?;
|
||||||
|
let fill_update_queue_receiver_c = fill_update_queue_receiver.clone();
|
||||||
|
let config = config.clone();
|
||||||
|
let mut metric_retries =
|
||||||
|
metrics_sender.register_u64("fills_postgres_retries".into(), MetricType::Counter);
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut client_opt = None;
|
||||||
|
loop {
|
||||||
|
// Retrieve up to batch_size updates
|
||||||
|
let mut batch = Vec::new();
|
||||||
|
batch.push(
|
||||||
|
fill_update_queue_receiver_c
|
||||||
|
.recv()
|
||||||
|
.await
|
||||||
|
.expect("sender must stay alive"),
|
||||||
|
);
|
||||||
|
while batch.len() < config.max_batch_size {
|
||||||
|
match fill_update_queue_receiver_c.try_recv() {
|
||||||
|
Ok(update) => batch.push(update),
|
||||||
|
Err(async_channel::TryRecvError::Empty) => break,
|
||||||
|
Err(async_channel::TryRecvError::Closed) => {
|
||||||
|
panic!("sender must stay alive")
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"updates, batch {}, channel size {}",
|
||||||
|
batch.len(),
|
||||||
|
fill_update_queue_receiver_c.len(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut error_count = 0;
|
||||||
|
loop {
|
||||||
|
let client =
|
||||||
|
update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
|
||||||
|
.await;
|
||||||
|
let mut results = futures::future::join_all(
|
||||||
|
batch.iter().map(|update| process_update(client, update)),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
let mut iter = results.iter();
|
||||||
|
batch.retain(|_| iter.next().unwrap().is_err());
|
||||||
|
if batch.len() > 0 {
|
||||||
|
metric_retries.add(batch.len() as u64);
|
||||||
|
error_count += 1;
|
||||||
|
if error_count - 1 < config.retry_query_max_count {
|
||||||
|
results.retain(|r| r.is_err());
|
||||||
|
warn!("failed to process fill update, retrying: {:?}", results);
|
||||||
|
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
|
||||||
|
.await;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
error!("failed to process account write, exiting");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(fill_update_queue_sender)
|
||||||
|
}
|
|
@ -1,21 +1,19 @@
|
||||||
pub mod chain_data;
|
pub mod chain_data;
|
||||||
pub mod fill_event_filter;
|
pub mod fill_event_filter;
|
||||||
pub mod orderbook_filter;
|
pub mod fill_event_postgres_target;
|
||||||
pub mod grpc_plugin_source;
|
pub mod grpc_plugin_source;
|
||||||
pub mod memory_target;
|
pub mod memory_target;
|
||||||
pub mod metrics;
|
pub mod metrics;
|
||||||
pub mod postgres_target;
|
pub mod orderbook_filter;
|
||||||
pub mod postgres_types_numeric;
|
pub mod postgres_types_numeric;
|
||||||
pub mod websocket_source;
|
pub mod websocket_source;
|
||||||
|
|
||||||
pub use chain_data::SlotStatus;
|
pub use chain_data::SlotStatus;
|
||||||
use serde::{Serialize, Serializer, ser::SerializeStruct};
|
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
||||||
|
|
||||||
use {
|
use {
|
||||||
async_trait::async_trait,
|
|
||||||
serde_derive::Deserialize,
|
serde_derive::Deserialize,
|
||||||
solana_sdk::{account::Account, pubkey::Pubkey},
|
solana_sdk::{account::Account, pubkey::Pubkey},
|
||||||
std::sync::Arc,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
trait AnyhowWrap {
|
trait AnyhowWrap {
|
||||||
|
@ -69,14 +67,12 @@ pub struct SlotUpdate {
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
pub struct PostgresConfig {
|
pub struct PostgresConfig {
|
||||||
pub connection_string: String,
|
pub connection_string: String,
|
||||||
/// Number of parallel postgres connections used for account write insertions
|
/// Number of parallel postgres connections used for insertions
|
||||||
pub account_write_connection_count: u64,
|
pub connection_count: u64,
|
||||||
/// Maximum batch size for account write inserts over one connection
|
/// Maximum batch size for inserts over one connection
|
||||||
pub account_write_max_batch_size: usize,
|
pub max_batch_size: usize,
|
||||||
/// Max size of account write queues
|
/// Max size of queues
|
||||||
pub account_write_max_queue_size: usize,
|
pub max_queue_size: usize,
|
||||||
/// Number of parallel postgres connections used for slot insertions
|
|
||||||
pub slot_update_connection_count: u64,
|
|
||||||
/// Number of queries retries before fatal error
|
/// Number of queries retries before fatal error
|
||||||
pub retry_query_max_count: u64,
|
pub retry_query_max_count: u64,
|
||||||
/// Seconds to sleep between query retries
|
/// Seconds to sleep between query retries
|
||||||
|
@ -87,12 +83,15 @@ pub struct PostgresConfig {
|
||||||
pub fatal_connection_timeout_secs: u64,
|
pub fatal_connection_timeout_secs: u64,
|
||||||
/// Allow invalid TLS certificates, passed to native_tls danger_accept_invalid_certs
|
/// Allow invalid TLS certificates, passed to native_tls danger_accept_invalid_certs
|
||||||
pub allow_invalid_certs: bool,
|
pub allow_invalid_certs: bool,
|
||||||
/// Name key to use in the monitoring table
|
pub tls: Option<PostgresTlsConfig>,
|
||||||
pub monitoring_name: String,
|
}
|
||||||
/// Time between updates to the monitoring table
|
|
||||||
pub monitoring_update_interval_secs: u64,
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
/// Time between cleanup jobs (0 to disable)
|
pub struct PostgresTlsConfig {
|
||||||
pub cleanup_interval_secs: u64,
|
/// CA Cert file or env var
|
||||||
|
pub ca_cert_path: String,
|
||||||
|
/// PKCS12 client cert path
|
||||||
|
pub client_key_path: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, Deserialize)]
|
#[derive(Clone, Debug, Deserialize)]
|
||||||
|
@ -164,63 +163,3 @@ pub struct Config {
|
||||||
pub source: SourceConfig,
|
pub source: SourceConfig,
|
||||||
pub metrics: MetricsConfig,
|
pub metrics: MetricsConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait AccountTable: Sync + Send {
|
|
||||||
fn table_name(&self) -> &str;
|
|
||||||
async fn insert_account_write(
|
|
||||||
&self,
|
|
||||||
client: &postgres_query::Caching<tokio_postgres::Client>,
|
|
||||||
account_write: &AccountWrite,
|
|
||||||
) -> anyhow::Result<()>;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type AccountTables = Vec<Arc<dyn AccountTable>>;
|
|
||||||
|
|
||||||
pub struct RawAccountTable {}
|
|
||||||
|
|
||||||
pub fn encode_address(addr: &Pubkey) -> String {
|
|
||||||
bs58::encode(&addr.to_bytes()).into_string()
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl AccountTable for RawAccountTable {
|
|
||||||
fn table_name(&self) -> &str {
|
|
||||||
"account_write"
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn insert_account_write(
|
|
||||||
&self,
|
|
||||||
client: &postgres_query::Caching<tokio_postgres::Client>,
|
|
||||||
account_write: &AccountWrite,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let pubkey = encode_address(&account_write.pubkey);
|
|
||||||
let owner = encode_address(&account_write.owner);
|
|
||||||
let slot = account_write.slot as i64;
|
|
||||||
let write_version = account_write.write_version as i64;
|
|
||||||
let lamports = account_write.lamports as i64;
|
|
||||||
let rent_epoch = account_write.rent_epoch as i64;
|
|
||||||
|
|
||||||
// TODO: should update for same write_version to work with websocket input
|
|
||||||
let query = postgres_query::query!(
|
|
||||||
"INSERT INTO account_write
|
|
||||||
(pubkey_id, slot, write_version, is_selected,
|
|
||||||
owner_id, lamports, executable, rent_epoch, data)
|
|
||||||
VALUES
|
|
||||||
(map_pubkey($pubkey), $slot, $write_version, $is_selected,
|
|
||||||
map_pubkey($owner), $lamports, $executable, $rent_epoch, $data)
|
|
||||||
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
|
|
||||||
pubkey,
|
|
||||||
slot,
|
|
||||||
write_version,
|
|
||||||
is_selected = account_write.is_selected,
|
|
||||||
owner,
|
|
||||||
lamports,
|
|
||||||
executable = account_write.executable,
|
|
||||||
rent_epoch,
|
|
||||||
data = account_write.data,
|
|
||||||
);
|
|
||||||
let _ = query.execute(client).await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,10 +1,16 @@
|
||||||
|
use crate::metrics::MetricU64;
|
||||||
use crate::{
|
use crate::{
|
||||||
chain_data::{AccountData, ChainData, SlotData},
|
chain_data::{AccountData, ChainData, SlotData},
|
||||||
metrics::{MetricType, Metrics},
|
metrics::{MetricType, Metrics},
|
||||||
AccountWrite, SlotUpdate,
|
AccountWrite, SlotUpdate,
|
||||||
};
|
};
|
||||||
|
use anchor_lang::AccountDeserialize;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use log::*;
|
use log::*;
|
||||||
|
use mango_v4::{
|
||||||
|
serum3_cpi::OrderBookStateHeader,
|
||||||
|
state::{BookSide, OrderTreeType},
|
||||||
|
};
|
||||||
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
use serde::{ser::SerializeStruct, Serialize, Serializer};
|
||||||
use serum_dex::critbit::Slab;
|
use serum_dex::critbit::Slab;
|
||||||
use solana_sdk::{
|
use solana_sdk::{
|
||||||
|
@ -19,13 +25,6 @@ use std::{
|
||||||
time::{SystemTime, UNIX_EPOCH},
|
time::{SystemTime, UNIX_EPOCH},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::metrics::MetricU64;
|
|
||||||
use anchor_lang::AccountDeserialize;
|
|
||||||
use mango_v4::{
|
|
||||||
serum3_cpi::OrderBookStateHeader,
|
|
||||||
state::{BookSide, OrderTreeType},
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum OrderbookSide {
|
pub enum OrderbookSide {
|
||||||
Bid = 0,
|
Bid = 0,
|
||||||
|
@ -106,6 +105,7 @@ pub struct MarketConfig {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub bids: Pubkey,
|
pub bids: Pubkey,
|
||||||
pub asks: Pubkey,
|
pub asks: Pubkey,
|
||||||
|
pub event_queue: Pubkey,
|
||||||
pub base_decimals: u8,
|
pub base_decimals: u8,
|
||||||
pub quote_decimals: u8,
|
pub quote_decimals: u8,
|
||||||
pub base_lot_size: i64,
|
pub base_lot_size: i64,
|
||||||
|
@ -113,26 +113,28 @@ pub struct MarketConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
|
pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
|
||||||
let decimals: u32 = 3;
|
(native * base_lot_size) as f64 / 10i64.pow(base_decimals.into()) as f64
|
||||||
let res = native as f64 / (10i64.pow(decimals.into()) as f64);
|
|
||||||
//info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
|
|
||||||
res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
|
pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
|
||||||
let decimals: u32 = 4;
|
let decimals = base_decimals - quote_decimals;
|
||||||
let res = native as f64 / (10i64.pow(decimals.into()) as f64);
|
native as f64 / (10i64.pow(decimals.into()) as f64)
|
||||||
//info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
|
|
||||||
res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
|
pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
|
||||||
let decimals = base_decimals - quote_decimals;
|
let decimals = base_decimals - quote_decimals;
|
||||||
// let res = native as f64
|
native as f64 / (10u64.pow(decimals.into())) as f64
|
||||||
// * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
|
}
|
||||||
// as f64;
|
|
||||||
let res = native as f64 / (10u64.pow(decimals.into())) as f64;
|
pub fn spot_price_to_ui(
|
||||||
res
|
native: i64,
|
||||||
|
native_size: i64,
|
||||||
|
base_decimals: u8,
|
||||||
|
quote_decimals: u8,
|
||||||
|
) -> f64 {
|
||||||
|
// TODO: account for fees
|
||||||
|
((native * 10i64.pow(base_decimals.into())) / (10i64.pow(quote_decimals.into()) * native_size))
|
||||||
|
as f64
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn price_lots_to_ui_perp(
|
pub fn price_lots_to_ui_perp(
|
||||||
|
@ -143,13 +145,8 @@ pub fn price_lots_to_ui_perp(
|
||||||
quote_lot_size: i64,
|
quote_lot_size: i64,
|
||||||
) -> f64 {
|
) -> f64 {
|
||||||
let decimals = base_decimals - quote_decimals;
|
let decimals = base_decimals - quote_decimals;
|
||||||
let res = native as f64
|
let multiplier = 10u64.pow(decimals.into()) as f64;
|
||||||
* ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
|
native as f64 * ((multiplier * quote_lot_size as f64) / base_lot_size as f64)
|
||||||
as f64;
|
|
||||||
// let res = native as f64
|
|
||||||
// / (10u64.pow(decimals.into()))
|
|
||||||
// as f64;
|
|
||||||
res
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn publish_changes(
|
fn publish_changes(
|
||||||
|
@ -332,8 +329,17 @@ pub async fn init(
|
||||||
let side_pk_string = side_pk.to_string();
|
let side_pk_string = side_pk.to_string();
|
||||||
|
|
||||||
let write_version = (account_info.slot, account_info.write_version);
|
let write_version = (account_info.slot, account_info.write_version);
|
||||||
// todo: should this be <= so we don't overwrite with old data received late?
|
if write_version == *last_write_version {
|
||||||
if write_version <= *last_write_version {
|
continue;
|
||||||
|
}
|
||||||
|
if write_version.0 < last_write_version.0 {
|
||||||
|
debug!("evq version slot was old");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if write_version.0 == last_write_version.0
|
||||||
|
&& write_version.1 < last_write_version.1
|
||||||
|
{
|
||||||
|
debug!("evq version slot was same and write version was old");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
last_write_versions.insert(side_pk_string.clone(), write_version);
|
last_write_versions.insert(side_pk_string.clone(), write_version);
|
||||||
|
@ -369,7 +375,7 @@ pub async fn init(
|
||||||
.map(|(_, quantity)| quantity)
|
.map(|(_, quantity)| quantity)
|
||||||
.fold(0, |acc, x| acc + x),
|
.fold(0, |acc, x| acc + x),
|
||||||
mkt.1.base_decimals,
|
mkt.1.base_decimals,
|
||||||
mkt.1.base_lot_size,
|
mkt.1.quote_decimals,
|
||||||
),
|
),
|
||||||
]
|
]
|
||||||
})
|
})
|
||||||
|
@ -394,7 +400,7 @@ pub async fn init(
|
||||||
|
|
||||||
bookside_cache.insert(side_pk_string.clone(), bookside.clone());
|
bookside_cache.insert(side_pk_string.clone(), bookside.clone());
|
||||||
}
|
}
|
||||||
Err(_) => info!("chain_cache could not find {}", mkt_pk),
|
Err(_) => debug!("chain_cache could not find {}", mkt_pk),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -417,7 +423,7 @@ pub async fn init(
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
last_write_versions.insert(side_pk_string.clone(), write_version);
|
last_write_versions.insert(side_pk_string.clone(), write_version);
|
||||||
info!("W {}", mkt.1.name);
|
debug!("W {}", mkt.1.name);
|
||||||
let account = &mut account_info.account.clone();
|
let account = &mut account_info.account.clone();
|
||||||
let data = account.data_as_mut_slice();
|
let data = account.data_as_mut_slice();
|
||||||
let len = data.len();
|
let len = data.len();
|
||||||
|
@ -473,7 +479,7 @@ pub async fn init(
|
||||||
|
|
||||||
serum_bookside_cache.insert(side_pk_string.clone(), bookside);
|
serum_bookside_cache.insert(side_pk_string.clone(), bookside);
|
||||||
}
|
}
|
||||||
Err(_) => info!("chain_cache could not find {}", side_pk),
|
Err(_) => debug!("chain_cache could not find {}", side_pk),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,637 +0,0 @@
|
||||||
use anyhow::Context;
|
|
||||||
use log::*;
|
|
||||||
use native_tls::TlsConnector;
|
|
||||||
use postgres_native_tls::MakeTlsConnector;
|
|
||||||
use postgres_query::{query, query_dyn};
|
|
||||||
use std::{collections::HashMap, convert::TryFrom, time::Duration};
|
|
||||||
|
|
||||||
use crate::{metrics::*, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate};
|
|
||||||
|
|
||||||
mod pg {
|
|
||||||
#[derive(Clone, Copy, Debug, PartialEq, postgres_types::ToSql)]
|
|
||||||
pub enum SlotStatus {
|
|
||||||
Rooted,
|
|
||||||
Confirmed,
|
|
||||||
Processed,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<super::SlotStatus> for SlotStatus {
|
|
||||||
fn from(status: super::SlotStatus) -> SlotStatus {
|
|
||||||
match status {
|
|
||||||
super::SlotStatus::Rooted => SlotStatus::Rooted,
|
|
||||||
super::SlotStatus::Confirmed => SlotStatus::Confirmed,
|
|
||||||
super::SlotStatus::Processed => SlotStatus::Processed,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn postgres_connection(
|
|
||||||
config: &PostgresConfig,
|
|
||||||
metric_retries: MetricU64,
|
|
||||||
metric_live: MetricU64,
|
|
||||||
) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
|
|
||||||
let (tx, rx) = async_channel::unbounded();
|
|
||||||
|
|
||||||
let tls = MakeTlsConnector::new(
|
|
||||||
TlsConnector::builder()
|
|
||||||
.danger_accept_invalid_certs(config.allow_invalid_certs)
|
|
||||||
.build()?,
|
|
||||||
);
|
|
||||||
|
|
||||||
let config = config.clone();
|
|
||||||
let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?);
|
|
||||||
let mut metric_retries = metric_retries;
|
|
||||||
let mut metric_live = metric_live;
|
|
||||||
tokio::spawn(async move {
|
|
||||||
loop {
|
|
||||||
let (client, connection) = match initial.take() {
|
|
||||||
Some(v) => v,
|
|
||||||
None => {
|
|
||||||
let result =
|
|
||||||
tokio_postgres::connect(&config.connection_string, tls.clone()).await;
|
|
||||||
match result {
|
|
||||||
Ok(v) => v,
|
|
||||||
Err(err) => {
|
|
||||||
warn!("could not connect to postgres: {:?}", err);
|
|
||||||
tokio::time::sleep(Duration::from_secs(
|
|
||||||
config.retry_connection_sleep_secs,
|
|
||||||
))
|
|
||||||
.await;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
tx.send(Some(client)).await.expect("send success");
|
|
||||||
metric_live.increment();
|
|
||||||
|
|
||||||
let result = connection.await;
|
|
||||||
|
|
||||||
metric_retries.increment();
|
|
||||||
metric_live.decrement();
|
|
||||||
|
|
||||||
tx.send(None).await.expect("send success");
|
|
||||||
warn!("postgres connection error: {:?}", result);
|
|
||||||
tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(rx)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn update_postgres_client<'a>(
|
|
||||||
client: &'a mut Option<postgres_query::Caching<tokio_postgres::Client>>,
|
|
||||||
rx: &async_channel::Receiver<Option<tokio_postgres::Client>>,
|
|
||||||
config: &PostgresConfig,
|
|
||||||
) -> &'a postgres_query::Caching<tokio_postgres::Client> {
|
|
||||||
// get the most recent client, waiting if there's a disconnect
|
|
||||||
while !rx.is_empty() || client.is_none() {
|
|
||||||
tokio::select! {
|
|
||||||
client_raw_opt = rx.recv() => {
|
|
||||||
*client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new);
|
|
||||||
},
|
|
||||||
_ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
|
|
||||||
error!("waited too long for new postgres client");
|
|
||||||
std::process::exit(1);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
client.as_ref().expect("must contain value")
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn process_account_write(
|
|
||||||
client: &postgres_query::Caching<tokio_postgres::Client>,
|
|
||||||
write: &AccountWrite,
|
|
||||||
account_tables: &AccountTables,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
futures::future::try_join_all(
|
|
||||||
account_tables
|
|
||||||
.iter()
|
|
||||||
.map(|table| table.insert_account_write(client, write)),
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Slots {
|
|
||||||
// non-rooted only
|
|
||||||
slots: HashMap<u64, SlotUpdate>,
|
|
||||||
newest_processed_slot: Option<u64>,
|
|
||||||
newest_rooted_slot: Option<u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default)]
|
|
||||||
struct SlotPreprocessing {
|
|
||||||
discard_duplicate: bool,
|
|
||||||
discard_old: bool,
|
|
||||||
new_processed_head: bool,
|
|
||||||
new_rooted_head: bool,
|
|
||||||
parent_update: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Slots {
|
|
||||||
fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
slots: HashMap::new(),
|
|
||||||
newest_processed_slot: None,
|
|
||||||
newest_rooted_slot: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn add(&mut self, update: &SlotUpdate) -> SlotPreprocessing {
|
|
||||||
let mut result = SlotPreprocessing::default();
|
|
||||||
|
|
||||||
if let Some(previous) = self.slots.get_mut(&update.slot) {
|
|
||||||
if previous.status == update.status && previous.parent == update.parent {
|
|
||||||
result.discard_duplicate = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
previous.status = update.status;
|
|
||||||
if update.parent.is_some() && previous.parent != update.parent {
|
|
||||||
previous.parent = update.parent;
|
|
||||||
result.parent_update = true;
|
|
||||||
}
|
|
||||||
} else if self.newest_rooted_slot.is_none()
|
|
||||||
|| update.slot > self.newest_rooted_slot.unwrap()
|
|
||||||
{
|
|
||||||
self.slots.insert(update.slot, update.clone());
|
|
||||||
} else {
|
|
||||||
result.discard_old = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if update.status == SlotStatus::Rooted {
|
|
||||||
let old_slots: Vec<u64> = self
|
|
||||||
.slots
|
|
||||||
.keys()
|
|
||||||
.filter(|s| **s <= update.slot)
|
|
||||||
.copied()
|
|
||||||
.collect();
|
|
||||||
for old_slot in old_slots {
|
|
||||||
self.slots.remove(&old_slot);
|
|
||||||
}
|
|
||||||
if self.newest_rooted_slot.is_none() || self.newest_rooted_slot.unwrap() < update.slot {
|
|
||||||
self.newest_rooted_slot = Some(update.slot);
|
|
||||||
result.new_rooted_head = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.newest_processed_slot.is_none() || self.newest_processed_slot.unwrap() < update.slot
|
|
||||||
{
|
|
||||||
self.newest_processed_slot = Some(update.slot);
|
|
||||||
result.new_processed_head = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
result
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_cleanup_steps(tables: &Vec<String>) -> HashMap<String, String> {
|
|
||||||
let mut steps = HashMap::<String, String>::new();
|
|
||||||
|
|
||||||
// Delete all account writes that came before the newest rooted slot except
|
|
||||||
// for the newest rooted write for each pubkey.
|
|
||||||
// This could be older rooted writes or writes in uncled slots that came
|
|
||||||
// before the newest rooted slot.
|
|
||||||
//
|
|
||||||
// Also delete _all_ writes from before the newest snapshot, because these may
|
|
||||||
// be for deleted accounts where the deletion event was missed. Snapshots
|
|
||||||
// provide a new state for all live accounts, but don't tell us about deleted
|
|
||||||
// accounts.
|
|
||||||
//
|
|
||||||
// The way this is done, by taking the newest snapshot that's at least
|
|
||||||
// min_snapshot_age behind the newest rooted slot is a workaround: we don't know
|
|
||||||
// how long it'll take to insert snapshot data, but assume it'll be done by that
|
|
||||||
// time.
|
|
||||||
let min_snapshot_age = 300;
|
|
||||||
steps.extend(
|
|
||||||
tables
|
|
||||||
.iter()
|
|
||||||
.map(|table_name| {
|
|
||||||
let sql = format!(
|
|
||||||
"WITH
|
|
||||||
newest_rooted AS (
|
|
||||||
SELECT max(slot) AS newest_rooted_slot FROM slot WHERE status = 'Rooted'),
|
|
||||||
newest_snapshot AS (
|
|
||||||
SELECT max(slot) AS newest_snapshot_slot FROM account_write, newest_rooted
|
|
||||||
WHERE write_version = 0 AND slot + {min_snapshot_age} < newest_rooted_slot)
|
|
||||||
DELETE FROM {table} AS data
|
|
||||||
USING
|
|
||||||
newest_rooted,
|
|
||||||
newest_snapshot,
|
|
||||||
(SELECT DISTINCT ON(pubkey_id) pubkey_id, slot, write_version
|
|
||||||
FROM {table}
|
|
||||||
LEFT JOIN slot USING(slot)
|
|
||||||
CROSS JOIN newest_rooted
|
|
||||||
WHERE slot <= newest_rooted_slot AND (status = 'Rooted' OR status is NULL)
|
|
||||||
ORDER BY pubkey_id, slot DESC, write_version DESC
|
|
||||||
) newest_rooted_write
|
|
||||||
WHERE
|
|
||||||
data.pubkey_id = newest_rooted_write.pubkey_id AND (
|
|
||||||
data.slot < newest_snapshot_slot OR (
|
|
||||||
data.slot <= newest_rooted_slot
|
|
||||||
AND (data.slot != newest_rooted_write.slot OR data.write_version != newest_rooted_write.write_version)
|
|
||||||
)
|
|
||||||
)",
|
|
||||||
table = table_name,
|
|
||||||
min_snapshot_age = min_snapshot_age,
|
|
||||||
);
|
|
||||||
(format!("delete old writes in {}", table_name), sql)
|
|
||||||
})
|
|
||||||
.collect::<HashMap<String, String>>(),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Delete information about older slots
|
|
||||||
steps.insert(
|
|
||||||
"delete old slots".into(),
|
|
||||||
"DELETE FROM slot
|
|
||||||
USING (SELECT max(slot) as newest_rooted_slot FROM slot WHERE status = 'Rooted') s
|
|
||||||
WHERE slot + 1000 < newest_rooted_slot"
|
|
||||||
.into(),
|
|
||||||
);
|
|
||||||
|
|
||||||
steps
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct SlotsProcessing {}
|
|
||||||
|
|
||||||
impl SlotsProcessing {
|
|
||||||
fn new() -> Self {
|
|
||||||
Self {}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn process(
|
|
||||||
&self,
|
|
||||||
client: &postgres_query::Caching<tokio_postgres::Client>,
|
|
||||||
update: &SlotUpdate,
|
|
||||||
meta: &SlotPreprocessing,
|
|
||||||
) -> anyhow::Result<()> {
|
|
||||||
let slot = update.slot as i64;
|
|
||||||
let status: pg::SlotStatus = update.status.into();
|
|
||||||
if let Some(parent) = update.parent {
|
|
||||||
let parent = parent as i64;
|
|
||||||
let query = query!(
|
|
||||||
"INSERT INTO slot
|
|
||||||
(slot, parent, status, uncle)
|
|
||||||
VALUES
|
|
||||||
($slot, $parent, $status, FALSE)
|
|
||||||
ON CONFLICT (slot) DO UPDATE SET
|
|
||||||
parent=$parent, status=$status",
|
|
||||||
slot,
|
|
||||||
parent,
|
|
||||||
status,
|
|
||||||
);
|
|
||||||
let _ = query.execute(client).await.context("updating slot row")?;
|
|
||||||
} else {
|
|
||||||
let query = query!(
|
|
||||||
"INSERT INTO slot
|
|
||||||
(slot, parent, status, uncle)
|
|
||||||
VALUES
|
|
||||||
($slot, NULL, $status, FALSE)
|
|
||||||
ON CONFLICT (slot) DO UPDATE SET
|
|
||||||
status=$status",
|
|
||||||
slot,
|
|
||||||
status,
|
|
||||||
);
|
|
||||||
let _ = query.execute(client).await.context("updating slot row")?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if meta.new_rooted_head {
|
|
||||||
let slot = update.slot as i64;
|
|
||||||
// Mark preceeding non-uncle slots as rooted
|
|
||||||
let query = query!(
|
|
||||||
"UPDATE slot SET status = 'Rooted'
|
|
||||||
WHERE slot < $newest_final_slot
|
|
||||||
AND (NOT uncle)
|
|
||||||
AND status != 'Rooted'",
|
|
||||||
newest_final_slot = slot
|
|
||||||
);
|
|
||||||
let _ = query
|
|
||||||
.execute(client)
|
|
||||||
.await
|
|
||||||
.context("updating preceding non-rooted slots")?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if meta.new_processed_head || meta.parent_update {
|
|
||||||
// update the uncle column for the chain of slots from the
|
|
||||||
// newest down the the first rooted slot
|
|
||||||
let query = query!(
|
|
||||||
"WITH RECURSIVE
|
|
||||||
liveslots AS (
|
|
||||||
SELECT slot.*, 0 AS depth FROM slot
|
|
||||||
WHERE slot = (SELECT max(slot) FROM slot)
|
|
||||||
UNION ALL
|
|
||||||
SELECT s.*, depth + 1 FROM slot s
|
|
||||||
INNER JOIN liveslots l ON s.slot = l.parent
|
|
||||||
WHERE l.status != 'Rooted' AND depth < 1000
|
|
||||||
),
|
|
||||||
min_slot AS (SELECT min(slot) AS min_slot FROM liveslots)
|
|
||||||
UPDATE slot SET
|
|
||||||
uncle = NOT EXISTS (SELECT 1 FROM liveslots WHERE liveslots.slot = slot.slot)
|
|
||||||
FROM min_slot
|
|
||||||
WHERE slot >= min_slot;"
|
|
||||||
);
|
|
||||||
let _ = query
|
|
||||||
.execute(client)
|
|
||||||
.await
|
|
||||||
.context("recomputing slot uncle status")?;
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!("slot update done {}", update.slot);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn secs_since_epoch() -> u64 {
|
|
||||||
std::time::SystemTime::now()
|
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
|
||||||
.expect("Time went backwards")
|
|
||||||
.as_secs()
|
|
||||||
}
|
|
||||||
fn epoch_secs_to_time(secs: u64) -> std::time::SystemTime {
|
|
||||||
std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn init(
|
|
||||||
config: &PostgresConfig,
|
|
||||||
account_tables: AccountTables,
|
|
||||||
metrics_sender: Metrics,
|
|
||||||
) -> anyhow::Result<(
|
|
||||||
async_channel::Sender<AccountWrite>,
|
|
||||||
async_channel::Sender<SlotUpdate>,
|
|
||||||
)> {
|
|
||||||
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
|
|
||||||
let (account_write_queue_sender, account_write_queue_receiver) =
|
|
||||||
async_channel::bounded::<AccountWrite>(config.account_write_max_queue_size);
|
|
||||||
|
|
||||||
// Slot updates flowing from the outside into the single processing thread. From
|
|
||||||
// there they'll flow into the postgres sending thread.
|
|
||||||
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
|
|
||||||
let (slot_inserter_sender, slot_inserter_receiver) =
|
|
||||||
async_channel::unbounded::<(SlotUpdate, SlotPreprocessing)>();
|
|
||||||
|
|
||||||
let metric_con_retries =
|
|
||||||
metrics_sender.register_u64("postgres_connection_retries".into(), MetricType::Counter);
|
|
||||||
let metric_con_live =
|
|
||||||
metrics_sender.register_u64("postgres_connections_alive".into(), MetricType::Gauge);
|
|
||||||
|
|
||||||
// postgres account write sending worker threads
|
|
||||||
for _ in 0..config.account_write_connection_count {
|
|
||||||
let postgres_account_writes =
|
|
||||||
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
|
||||||
.await?;
|
|
||||||
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
|
|
||||||
let account_tables_c = account_tables.clone();
|
|
||||||
let config = config.clone();
|
|
||||||
let mut metric_retries = metrics_sender
|
|
||||||
.register_u64("postgres_account_write_retries".into(), MetricType::Counter);
|
|
||||||
let mut metric_last_write = metrics_sender.register_u64(
|
|
||||||
"postgres_account_write_last_write_timestamp".into(),
|
|
||||||
MetricType::Gauge,
|
|
||||||
);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut client_opt = None;
|
|
||||||
loop {
|
|
||||||
// Retrieve up to batch_size account writes
|
|
||||||
let mut write_batch = Vec::new();
|
|
||||||
write_batch.push(
|
|
||||||
account_write_queue_receiver_c
|
|
||||||
.recv()
|
|
||||||
.await
|
|
||||||
.expect("sender must stay alive"),
|
|
||||||
);
|
|
||||||
while write_batch.len() < config.account_write_max_batch_size {
|
|
||||||
match account_write_queue_receiver_c.try_recv() {
|
|
||||||
Ok(write) => write_batch.push(write),
|
|
||||||
Err(async_channel::TryRecvError::Empty) => break,
|
|
||||||
Err(async_channel::TryRecvError::Closed) => {
|
|
||||||
panic!("sender must stay alive")
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!(
|
|
||||||
"account write, batch {}, channel size {}",
|
|
||||||
write_batch.len(),
|
|
||||||
account_write_queue_receiver_c.len(),
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut error_count = 0;
|
|
||||||
loop {
|
|
||||||
let client =
|
|
||||||
update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
|
|
||||||
.await;
|
|
||||||
let mut results = futures::future::join_all(
|
|
||||||
write_batch
|
|
||||||
.iter()
|
|
||||||
.map(|write| process_account_write(client, &write, &account_tables_c)),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
let mut iter = results.iter();
|
|
||||||
write_batch.retain(|_| iter.next().unwrap().is_err());
|
|
||||||
if write_batch.len() > 0 {
|
|
||||||
metric_retries.add(write_batch.len() as u64);
|
|
||||||
error_count += 1;
|
|
||||||
if error_count - 1 < config.retry_query_max_count {
|
|
||||||
results.retain(|r| r.is_err());
|
|
||||||
warn!("failed to process account write, retrying: {:?}", results);
|
|
||||||
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
|
|
||||||
.await;
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
error!("failed to process account write, exiting");
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
metric_last_write.set_max(secs_since_epoch());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// slot update handling thread
|
|
||||||
let mut metric_slot_queue =
|
|
||||||
metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut slots = Slots::new();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let update = slot_queue_receiver
|
|
||||||
.recv()
|
|
||||||
.await
|
|
||||||
.expect("sender must stay alive");
|
|
||||||
trace!(
|
|
||||||
"slot update {}, channel size {}",
|
|
||||||
update.slot,
|
|
||||||
slot_queue_receiver.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
// Check if we already know about the slot, or it is outdated
|
|
||||||
let slot_preprocessing = slots.add(&update);
|
|
||||||
if slot_preprocessing.discard_duplicate || slot_preprocessing.discard_old {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
slot_inserter_sender
|
|
||||||
.send((update, slot_preprocessing))
|
|
||||||
.await
|
|
||||||
.expect("sending must succeed");
|
|
||||||
metric_slot_queue.set(slot_inserter_sender.len() as u64);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// postgres slot update worker threads
|
|
||||||
let slots_processing = SlotsProcessing::new();
|
|
||||||
for _ in 0..config.slot_update_connection_count {
|
|
||||||
let postgres_slot =
|
|
||||||
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
|
||||||
.await?;
|
|
||||||
let receiver_c = slot_inserter_receiver.clone();
|
|
||||||
let config = config.clone();
|
|
||||||
let mut metric_retries =
|
|
||||||
metrics_sender.register_u64("postgres_slot_update_retries".into(), MetricType::Counter);
|
|
||||||
let mut metric_last_write = metrics_sender.register_u64(
|
|
||||||
"postgres_slot_last_write_timestamp".into(),
|
|
||||||
MetricType::Gauge,
|
|
||||||
);
|
|
||||||
let slots_processing = slots_processing.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut client_opt = None;
|
|
||||||
loop {
|
|
||||||
let (update, preprocessing) =
|
|
||||||
receiver_c.recv().await.expect("sender must stay alive");
|
|
||||||
trace!("slot insertion, slot {}", update.slot);
|
|
||||||
|
|
||||||
let mut error_count = 0;
|
|
||||||
loop {
|
|
||||||
let client =
|
|
||||||
update_postgres_client(&mut client_opt, &postgres_slot, &config).await;
|
|
||||||
if let Err(err) = slots_processing
|
|
||||||
.process(client, &update, &preprocessing)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
metric_retries.increment();
|
|
||||||
error_count += 1;
|
|
||||||
if error_count - 1 < config.retry_query_max_count {
|
|
||||||
warn!("failed to process slot update, retrying: {:?}", err);
|
|
||||||
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
|
|
||||||
.await;
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
error!("failed to process slot update, exiting");
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
metric_last_write.set_max(secs_since_epoch());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// postgres cleanup thread
|
|
||||||
if config.cleanup_interval_secs > 0 {
|
|
||||||
let table_names: Vec<String> = account_tables
|
|
||||||
.iter()
|
|
||||||
.map(|table| table.table_name().to_string())
|
|
||||||
.collect();
|
|
||||||
let cleanup_steps = make_cleanup_steps(&table_names);
|
|
||||||
|
|
||||||
let postgres_con =
|
|
||||||
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
|
||||||
.await?;
|
|
||||||
let mut metric_last_cleanup = metrics_sender.register_u64(
|
|
||||||
"postgres_cleanup_last_success_timestamp".into(),
|
|
||||||
MetricType::Gauge,
|
|
||||||
);
|
|
||||||
let mut metric_cleanup_errors =
|
|
||||||
metrics_sender.register_u64("postgres_cleanup_errors".into(), MetricType::Counter);
|
|
||||||
let config = config.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut client_opt = None;
|
|
||||||
loop {
|
|
||||||
tokio::time::sleep(Duration::from_secs(config.cleanup_interval_secs)).await;
|
|
||||||
let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await;
|
|
||||||
|
|
||||||
let mut all_successful = true;
|
|
||||||
for (name, cleanup_sql) in &cleanup_steps {
|
|
||||||
let query = query_dyn!(&cleanup_sql).unwrap();
|
|
||||||
if let Err(err) = query.execute(client).await {
|
|
||||||
warn!("failed to process cleanup step {}: {:?}", name, err);
|
|
||||||
metric_cleanup_errors.increment();
|
|
||||||
all_successful = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if all_successful {
|
|
||||||
metric_last_cleanup.set_max(secs_since_epoch());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// postgres metrics/monitoring thread
|
|
||||||
{
|
|
||||||
let postgres_con =
|
|
||||||
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
|
|
||||||
.await?;
|
|
||||||
let metric_slot_last_write = metrics_sender.register_u64(
|
|
||||||
"postgres_slot_last_write_timestamp".into(),
|
|
||||||
MetricType::Gauge,
|
|
||||||
);
|
|
||||||
let metric_account_write_last_write = metrics_sender.register_u64(
|
|
||||||
"postgres_account_write_last_write_timestamp".into(),
|
|
||||||
MetricType::Gauge,
|
|
||||||
);
|
|
||||||
let metric_account_queue =
|
|
||||||
metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge);
|
|
||||||
let metric_slot_queue =
|
|
||||||
metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge);
|
|
||||||
let config = config.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut client_opt = None;
|
|
||||||
loop {
|
|
||||||
tokio::time::sleep(Duration::from_secs(config.monitoring_update_interval_secs))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await;
|
|
||||||
let last_update = std::time::SystemTime::now();
|
|
||||||
let last_slot_write = epoch_secs_to_time(metric_slot_last_write.value());
|
|
||||||
let last_account_write_write =
|
|
||||||
epoch_secs_to_time(metric_account_write_last_write.value());
|
|
||||||
let slot_queue = i64::try_from(metric_slot_queue.value()).unwrap();
|
|
||||||
let account_write_queue = i64::try_from(metric_account_queue.value()).unwrap();
|
|
||||||
let query = query!(
|
|
||||||
"INSERT INTO monitoring
|
|
||||||
(name, last_update, last_slot_write, last_account_write_write, slot_queue, account_write_queue)
|
|
||||||
VALUES
|
|
||||||
($name, $last_update, $last_slot_write, $last_account_write_write, $slot_queue, $account_write_queue)
|
|
||||||
ON CONFLICT (name) DO UPDATE SET
|
|
||||||
last_update=$last_update,
|
|
||||||
last_slot_write=$last_slot_write,
|
|
||||||
last_account_write_write=$last_account_write_write,
|
|
||||||
slot_queue=$slot_queue,
|
|
||||||
account_write_queue=$account_write_queue
|
|
||||||
",
|
|
||||||
name = config.monitoring_name,
|
|
||||||
last_update,
|
|
||||||
last_slot_write,
|
|
||||||
last_account_write_write,
|
|
||||||
slot_queue,
|
|
||||||
account_write_queue,
|
|
||||||
);
|
|
||||||
if let Err(err) = query
|
|
||||||
.execute(client)
|
|
||||||
.await
|
|
||||||
.context("updating monitoring table")
|
|
||||||
{
|
|
||||||
warn!("failed to process monitoring update: {:?}", err);
|
|
||||||
};
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((account_write_queue_sender, slot_queue_sender))
|
|
||||||
}
|
|
|
@ -1,9 +1,8 @@
|
||||||
use anchor_client::{
|
use anchor_client::{
|
||||||
solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair},
|
solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair},
|
||||||
Cluster,
|
Cluster,
|
||||||
};
|
};
|
||||||
use anchor_lang::prelude::Pubkey;
|
use anchor_lang::prelude::Pubkey;
|
||||||
use bytemuck::cast_slice;
|
|
||||||
use client::{Client, MangoGroupContext};
|
use client::{Client, MangoGroupContext};
|
||||||
use futures_channel::mpsc::{unbounded, UnboundedSender};
|
use futures_channel::mpsc::{unbounded, UnboundedSender};
|
||||||
use futures_util::{
|
use futures_util::{
|
||||||
|
@ -13,7 +12,6 @@ use futures_util::{
|
||||||
use log::*;
|
use log::*;
|
||||||
use std::{
|
use std::{
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
convert::identity,
|
|
||||||
fs::File,
|
fs::File,
|
||||||
io::Read,
|
io::Read,
|
||||||
net::SocketAddr,
|
net::SocketAddr,
|
||||||
|
@ -30,8 +28,11 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error};
|
||||||
|
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use solana_geyser_connector_lib::{
|
use solana_geyser_connector_lib::{
|
||||||
|
fill_event_filter::FillEventType,
|
||||||
|
fill_event_postgres_target,
|
||||||
metrics::{MetricType, MetricU64},
|
metrics::{MetricType, MetricU64},
|
||||||
FilterConfig, StatusResponse,
|
orderbook_filter::MarketConfig,
|
||||||
|
FilterConfig, PostgresConfig, PostgresTlsConfig, StatusResponse,
|
||||||
};
|
};
|
||||||
use solana_geyser_connector_lib::{
|
use solana_geyser_connector_lib::{
|
||||||
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
|
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
|
||||||
|
@ -308,44 +309,70 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.await?,
|
.await?,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// todo: reload markets at intervals
|
||||||
|
let perp_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
|
||||||
|
.perp_markets
|
||||||
|
.iter()
|
||||||
|
.map(|(_, context)| {
|
||||||
|
let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index)
|
||||||
|
{
|
||||||
|
Some(token) => token.decimals,
|
||||||
|
None => panic!("token not found for market"), // todo: default to 6 for usdc?
|
||||||
|
};
|
||||||
|
(
|
||||||
|
context.address,
|
||||||
|
MarketConfig {
|
||||||
|
name: context.market.name().to_owned(),
|
||||||
|
bids: context.market.bids,
|
||||||
|
asks: context.market.asks,
|
||||||
|
event_queue: context.market.event_queue,
|
||||||
|
base_decimals: context.market.base_decimals,
|
||||||
|
quote_decimals,
|
||||||
|
base_lot_size: context.market.base_lot_size,
|
||||||
|
quote_lot_size: context.market.quote_lot_size,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let spot_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
|
||||||
|
.serum3_markets
|
||||||
|
.iter()
|
||||||
|
.map(|(_, context)| {
|
||||||
|
let base_decimals = match group_context.tokens.get(&context.market.base_token_index) {
|
||||||
|
Some(token) => token.decimals,
|
||||||
|
None => panic!("token not found for market"), // todo: default?
|
||||||
|
};
|
||||||
|
let quote_decimals = match group_context.tokens.get(&context.market.quote_token_index) {
|
||||||
|
Some(token) => token.decimals,
|
||||||
|
None => panic!("token not found for market"), // todo: default to 6 for usdc?
|
||||||
|
};
|
||||||
|
(
|
||||||
|
context.market.serum_market_external,
|
||||||
|
MarketConfig {
|
||||||
|
name: context.market.name().to_owned(),
|
||||||
|
bids: context.bids,
|
||||||
|
asks: context.asks,
|
||||||
|
event_queue: context.event_q,
|
||||||
|
base_decimals,
|
||||||
|
quote_decimals,
|
||||||
|
base_lot_size: context.pc_lot_size as i64,
|
||||||
|
quote_lot_size: context.coin_lot_size as i64,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
|
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
|
||||||
.perp_markets
|
.perp_markets
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, context)| (context.address, context.market.event_queue))
|
.map(|(_, context)| (context.address, context.market.event_queue))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let serum_market_pks: Vec<Pubkey> = group_context
|
let spot_queue_pks: Vec<(Pubkey, Pubkey)> = spot_market_configs
|
||||||
.serum3_markets
|
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, context)| context.market.serum_market_external)
|
.map(|x| (x.0, x.1.event_queue))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let serum_market_ais = client
|
|
||||||
.rpc_async()
|
|
||||||
.get_multiple_accounts(serum_market_pks.as_slice())
|
|
||||||
.await?;
|
|
||||||
let serum_market_ais: Vec<&Account> = serum_market_ais
|
|
||||||
.iter()
|
|
||||||
.filter_map(|maybe_ai| match maybe_ai {
|
|
||||||
Some(ai) => Some(ai),
|
|
||||||
None => None,
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let serum_queue_pks: Vec<(Pubkey, Pubkey)> = serum_market_ais
|
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.map(|pair| {
|
|
||||||
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
|
|
||||||
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
|
|
||||||
);
|
|
||||||
(
|
|
||||||
serum_market_pks[pair.0],
|
|
||||||
Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])),
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let a: Vec<(String, String)> = group_context
|
let a: Vec<(String, String)> = group_context
|
||||||
.serum3_markets
|
.serum3_markets
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -368,9 +395,28 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.collect();
|
.collect();
|
||||||
let market_pubkey_strings: HashMap<String, String> = [a, b].concat().into_iter().collect();
|
let market_pubkey_strings: HashMap<String, String> = [a, b].concat().into_iter().collect();
|
||||||
|
|
||||||
|
// TODO: read all this from config
|
||||||
|
let pgconf = PostgresConfig {
|
||||||
|
connection_string: "$PG_CONNECTION_STRING".to_owned(),
|
||||||
|
connection_count: 1,
|
||||||
|
max_batch_size: 1,
|
||||||
|
max_queue_size: 50_000,
|
||||||
|
retry_query_max_count: 10,
|
||||||
|
retry_query_sleep_secs: 2,
|
||||||
|
retry_connection_sleep_secs: 10,
|
||||||
|
fatal_connection_timeout_secs: 120,
|
||||||
|
allow_invalid_certs: true,
|
||||||
|
tls: Some(PostgresTlsConfig {
|
||||||
|
ca_cert_path: "$PG_CA_CERT".to_owned(),
|
||||||
|
client_key_path: "$PG_CLIENT_KEY".to_owned(),
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
let postgres_update_sender =
|
||||||
|
fill_event_postgres_target::init(&pgconf, metrics_tx.clone()).await?;
|
||||||
|
|
||||||
let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
|
let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
|
||||||
perp_queue_pks.clone(),
|
perp_market_configs.clone(),
|
||||||
serum_queue_pks.clone(),
|
spot_market_configs.clone(),
|
||||||
metrics_tx.clone(),
|
metrics_tx.clone(),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -389,19 +435,36 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let message = fill_receiver.recv().await.unwrap();
|
let message = fill_receiver.recv().await.unwrap();
|
||||||
match message {
|
match message {
|
||||||
FillEventFilterMessage::Update(update) => {
|
FillEventFilterMessage::Update(update) => {
|
||||||
debug!("ws update {} {:?} {:?} fill", update.market, update.status, update.event.event_type);
|
debug!(
|
||||||
|
"ws update {} {:?} {:?} fill",
|
||||||
|
update.market_name, update.status, update.event.event_type
|
||||||
|
);
|
||||||
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
||||||
for (addr, peer) in peer_copy.iter_mut() {
|
for (addr, peer) in peer_copy.iter_mut() {
|
||||||
let json = serde_json::to_string(&update).unwrap();
|
let json = serde_json::to_string(&update.clone()).unwrap();
|
||||||
|
|
||||||
// only send updates if the peer is subscribed
|
// only send updates if the peer is subscribed
|
||||||
if peer.subscriptions.contains(&update.market) {
|
if peer.subscriptions.contains(&update.market_key) {
|
||||||
let result = peer.sender.send(Message::Text(json)).await;
|
let result = peer.sender.send(Message::Text(json)).await;
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
error!("ws update {} fill could not reach {}", update.market, addr);
|
error!(
|
||||||
|
"ws update {} fill could not reach {}",
|
||||||
|
update.market_name, addr
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// send taker fills to db
|
||||||
|
let update_c = update.clone();
|
||||||
|
match update_c.event.event_type {
|
||||||
|
FillEventType::Perp => {
|
||||||
|
if !update_c.event.maker {
|
||||||
|
debug!("{:?}", update_c);
|
||||||
|
postgres_update_sender.send(update_c).await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => warn!("failed to write spot event to db"),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
FillEventFilterMessage::Checkpoint(checkpoint) => {
|
FillEventFilterMessage::Checkpoint(checkpoint) => {
|
||||||
checkpoints_ref_thread
|
checkpoints_ref_thread
|
||||||
|
@ -461,7 +524,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.collect::<String>()
|
.collect::<String>()
|
||||||
);
|
);
|
||||||
let use_geyser = true;
|
let use_geyser = true;
|
||||||
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
|
let all_queue_pks = [perp_queue_pks.clone()].concat();
|
||||||
let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect();
|
let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect();
|
||||||
let filter_config = FilterConfig {
|
let filter_config = FilterConfig {
|
||||||
program_ids: vec![],
|
program_ids: vec![],
|
||||||
|
|
|
@ -309,6 +309,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
name: context.market.name().to_owned(),
|
name: context.market.name().to_owned(),
|
||||||
bids: context.market.bids,
|
bids: context.market.bids,
|
||||||
asks: context.market.asks,
|
asks: context.market.asks,
|
||||||
|
event_queue: context.market.event_queue,
|
||||||
base_decimals: context.market.base_decimals,
|
base_decimals: context.market.base_decimals,
|
||||||
quote_decimals,
|
quote_decimals,
|
||||||
base_lot_size: context.market.base_lot_size,
|
base_lot_size: context.market.base_lot_size,
|
||||||
|
@ -336,6 +337,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
name: context.market.name().to_owned(),
|
name: context.market.name().to_owned(),
|
||||||
bids: context.bids,
|
bids: context.bids,
|
||||||
asks: context.asks,
|
asks: context.asks,
|
||||||
|
event_queue: context.event_q,
|
||||||
base_decimals,
|
base_decimals,
|
||||||
quote_decimals,
|
quote_decimals,
|
||||||
base_lot_size: context.pc_lot_size as i64,
|
base_lot_size: context.pc_lot_size as i64,
|
||||||
|
@ -355,7 +357,12 @@ async fn main() -> anyhow::Result<()> {
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) =
|
let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) =
|
||||||
orderbook_filter::init(market_configs, serum_market_configs, metrics_tx.clone()).await?;
|
orderbook_filter::init(
|
||||||
|
market_configs.clone(),
|
||||||
|
serum_market_configs.clone(),
|
||||||
|
metrics_tx.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let checkpoints_ref_thread = checkpoints.clone();
|
let checkpoints_ref_thread = checkpoints.clone();
|
||||||
let peers_ref_thread = peers.clone();
|
let peers_ref_thread = peers.clone();
|
||||||
|
@ -422,12 +429,14 @@ async fn main() -> anyhow::Result<()> {
|
||||||
);
|
);
|
||||||
let use_geyser = true;
|
let use_geyser = true;
|
||||||
if use_geyser {
|
if use_geyser {
|
||||||
|
let relevant_pubkeys = [market_configs.clone()]
|
||||||
|
.concat()
|
||||||
|
.iter()
|
||||||
|
.flat_map(|m| [m.1.bids.to_string(), m.1.asks.to_string()])
|
||||||
|
.collect();
|
||||||
let filter_config = FilterConfig {
|
let filter_config = FilterConfig {
|
||||||
program_ids: vec![
|
program_ids: vec![],
|
||||||
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
|
account_ids: relevant_pubkeys,
|
||||||
"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
|
|
||||||
],
|
|
||||||
account_ids: vec![],
|
|
||||||
};
|
};
|
||||||
grpc_plugin_source::process_events(
|
grpc_plugin_source::process_events(
|
||||||
&config.source,
|
&config.source,
|
||||||
|
|
Loading…
Reference in New Issue