Compare commits
3 Commits
464946dc16
...
b89faa1ed0
Author | SHA1 | Date |
---|---|---|
Riordan Panayides | b89faa1ed0 | |
Riordan Panayides | c838c58ca6 | |
Riordan Panayides | f45e614765 |
178
README.md
178
README.md
|
@ -1,184 +1,22 @@
|
||||||
# Overview
|
# mango-geyser-services
|
||||||
|
|
||||||
This project is about streaming Solana account updates for a specific program
|
Mango v4 Geyser Services
|
||||||
into other databases or event queues.
|
|
||||||
|
|
||||||
Having an up to date version of all account data data in a database is
|
|
||||||
particularly useful for queries that need access to all accounts. For example,
|
|
||||||
retrieving the addresses of Mango Markets accounts with the largest unrealized
|
|
||||||
PnL goes from "getProgramAccounts from a Solana node for 50MB of data and
|
|
||||||
compute locally (3-10s total)" to "run a SQL query (150ms total)".
|
|
||||||
|
|
||||||
The database could also be used as a backend for serving `getMultipleAccounts`
|
|
||||||
and `getProgramAccounts` queries generally. That would reduce load on Solana
|
|
||||||
RPCt nodes while decreasing response times.
|
|
||||||
|
|
||||||
Supported Solana sources:
|
|
||||||
|
|
||||||
- Geyser plugin (preferred) plus JSONRPC HTTP API (for initial snapshots)
|
|
||||||
|
|
||||||
Unfinished Solana sources:
|
|
||||||
|
|
||||||
- JSONRPC websocket subscriptions plus JSONRPC HTTP API (for initial snapshots)
|
|
||||||
|
|
||||||
Supported targets:
|
|
||||||
|
|
||||||
- PostgreSQL
|
|
||||||
|
|
||||||
# Components
|
# Components
|
||||||
|
|
||||||
- [`geyser-plugin-grpc/`](geyser-plugin-grpc/)
|
|
||||||
|
|
||||||
The Solana Geyser plugin. It opens a gRPC server (see [`proto/`](proto/)) and
|
|
||||||
broadcasts account and slot updates to all clients that connect.
|
|
||||||
|
|
||||||
- [`lib/`](lib/)
|
- [`lib/`](lib/)
|
||||||
|
|
||||||
The connector abstractions that the connector service is built from.
|
Tools for building services
|
||||||
|
|
||||||
Projects may want to use it to build their own connector service and decode
|
|
||||||
their specific account data before sending it into target systems.
|
|
||||||
|
|
||||||
- [`connector-raw/`](connector-raw/)
|
|
||||||
|
|
||||||
A connector binary built on lib/ that stores raw binary account data in
|
|
||||||
PostgreSQL.
|
|
||||||
|
|
||||||
- [`connector-mango/`](connector-mango/)
|
|
||||||
|
|
||||||
A connector binary built on lib/ that decodes Mango account types before
|
|
||||||
storing them in PostgeSQL.
|
|
||||||
|
|
||||||
- [`service-mango-fills/`](service-mango-fills/)
|
- [`service-mango-fills/`](service-mango-fills/)
|
||||||
|
|
||||||
A service providing lowest-latency, bandwidth conserving access to fill events
|
A service providing lowest-latency, bandwidth conserving access to fill events for Mango V4 Perp and Openbook markets
|
||||||
as they are processed by the rpc node.
|
as they are processed by the rpc node.
|
||||||
|
|
||||||
# Setup Tutorial
|
- [`service-mango-pnl/`](service-mango-pnl/)
|
||||||
|
|
||||||
1. Compile the project.
|
A service providing pre-computed account lists ordered by unsettled PnL per market
|
||||||
|
|
||||||
Make sure that you are using _exactly_ the same Rust version for compiling
|
- [`service-mango-orderbook/`](service-mango-pnl/)
|
||||||
the Geyser plugin that was used for compiling your `solana-validator`!
|
|
||||||
Otherwise the plugin will crash the validator during startup!
|
|
||||||
|
|
||||||
2. Prepare the plugin configuration file.
|
A service providing Orderbook L2 state and delta updates for Mango V4 Perp and Openbook Spot markets
|
||||||
|
|
||||||
[Here is an example](geyser-plugin-grpc/example-config.json). This file
|
|
||||||
points the validator to your plugin shared library, controls which accounts
|
|
||||||
will be exported, which address the gRPC server will bind to and internal
|
|
||||||
queue sizes.
|
|
||||||
|
|
||||||
3. Run `solana-validator` with `--geyser-plugin-config myconfig.json`.
|
|
||||||
|
|
||||||
Check the logs to ensure the plugin was loaded.
|
|
||||||
|
|
||||||
4. Prepare the connector configuration file.
|
|
||||||
|
|
||||||
[Here is an example](connector-raw/example-config.toml).
|
|
||||||
|
|
||||||
- `rpc_ws_url` is unused and can stay empty.
|
|
||||||
- `connection_string` for your `grpc_sources` must point to the gRPC server
|
|
||||||
address configured for the plugin.
|
|
||||||
- `rpc_http_url` must point to the JSON-RPC URL.
|
|
||||||
- `connection_string` for your `posgres_target` uses
|
|
||||||
[the tokio-postgres syntax](https://docs.rs/tokio-postgres/0.7.5/tokio_postgres/config/struct.Config.html)
|
|
||||||
- `program_id` must match what is configured for the gRPC plugin
|
|
||||||
|
|
||||||
5. Prepare the PostgreSQL schema.
|
|
||||||
|
|
||||||
Use [this example script](connector-raw/scripts/create_schema.sql).
|
|
||||||
|
|
||||||
6. Start the connector service binary.
|
|
||||||
|
|
||||||
Pass the path to the config file as the first argument. It logs to stdout. It
|
|
||||||
should be restarted on exit. (it intentionally terminates when postgres is
|
|
||||||
unreachable for too long, for example)
|
|
||||||
|
|
||||||
7. Monitor the logs
|
|
||||||
|
|
||||||
`WARN` messages can be recovered from. `ERROR` messages need attention.
|
|
||||||
|
|
||||||
Check the metrics for `account_write_queue` and `slot_update_queue`: They
|
|
||||||
should be around 0. If they keep growing the service can't keep up and you'll
|
|
||||||
need to figure out what's up.
|
|
||||||
|
|
||||||
# Design and Reliability
|
|
||||||
|
|
||||||
```
|
|
||||||
Solana ---------------> Connector -----------> PostgreSQL
|
|
||||||
nodes jsonrpc/gRPC nodes
|
|
||||||
```
|
|
||||||
|
|
||||||
For reliability it is recommended to feed data from multiple Solana nodes into
|
|
||||||
each Connector node.
|
|
||||||
|
|
||||||
It is also allowed to run multiple Connector nodes that target the same
|
|
||||||
PostgeSQL target database.
|
|
||||||
|
|
||||||
The Connector service is stateless (except for some caches). Restarting it is
|
|
||||||
always safe.
|
|
||||||
|
|
||||||
If the Solana node is down, the Connector service attempts to reconnect and then
|
|
||||||
requests a new data snapshot if necessary.
|
|
||||||
|
|
||||||
If PostgeSQL is down temporarily, the Connector service caches updates and
|
|
||||||
applies them when the database is back up.
|
|
||||||
|
|
||||||
If PostgreSQL is down for a longer time, the Connector service exits with an
|
|
||||||
error. On restart, it pauses until PostgreSQL is back up, and then starts
|
|
||||||
pulling data from the Solana nodes again.
|
|
||||||
|
|
||||||
# PostgreSQL data layout
|
|
||||||
|
|
||||||
See `scripts/` for SQL that creates the target schema.
|
|
||||||
|
|
||||||
The Connector streams data into the `account_write` and `slot` tables. When
|
|
||||||
slots become "rooted", older `account_write` data rooted slots is deleted. That
|
|
||||||
way the current account data for the latest rooted, confirmed or processed slot
|
|
||||||
can be queried, but older data is forgotten.
|
|
||||||
|
|
||||||
When new slots arrive, the `uncle` column is updated for "processed" and
|
|
||||||
"confirmed" slots to allow easy filtering of slots that are no longer part of
|
|
||||||
the chain.
|
|
||||||
|
|
||||||
Example for querying confirmed data:
|
|
||||||
|
|
||||||
```
|
|
||||||
SELECT DISTINCT ON(pubkey_id)
|
|
||||||
pubkey, account_write.*
|
|
||||||
FROM account_write
|
|
||||||
LEFT JOIN slot USING(slot)
|
|
||||||
INNER JOIN pubkey USING(pubkey_id)
|
|
||||||
WHERE status = 'Rooted' OR status IS NULL OR (uncle = FALSE AND status = 'Confirmed')
|
|
||||||
ORDER BY pubkey_id, slot DESC, write_version DESC;
|
|
||||||
```
|
|
||||||
|
|
||||||
For each pubkey, this gets the latest (most recent slot, most recent
|
|
||||||
write_version) account data; limited to slots that are either rooted or
|
|
||||||
(confirmed and not an uncle).
|
|
||||||
|
|
||||||
# Fill Service Setup
|
|
||||||
|
|
||||||
1. Prepare the connector configuration file.
|
|
||||||
|
|
||||||
[Here is an example](service-mango-fills/example-config.toml).
|
|
||||||
|
|
||||||
- `bind_ws_addr` is the listen port for the websocket clients
|
|
||||||
- `rpc_ws_url` is unused and can stay empty.
|
|
||||||
- `connection_string` for your `grpc_sources` must point to the gRPC server
|
|
||||||
address configured for the plugin.
|
|
||||||
- `rpc_http_url` must point to the JSON-RPC URL.
|
|
||||||
- `program_id` must match what is configured for the gRPC plugin
|
|
||||||
- `markets` need to contain all observed perp markets
|
|
||||||
|
|
||||||
2. Start the service binary.
|
|
||||||
|
|
||||||
Pass the path to the config file as the first argument. It logs to stdout. It
|
|
||||||
should be restarted on exit.
|
|
||||||
|
|
||||||
3. Monitor the logs
|
|
||||||
|
|
||||||
`WARN` messages can be recovered from. `ERROR` messages need attention. The
|
|
||||||
logs are very spammy changing the default log level is recommended when you
|
|
||||||
dont want to analyze performance of the service.
|
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
app = "mango-pnl"
|
||||||
|
kill_signal = "SIGINT"
|
||||||
|
kill_timeout = 5
|
||||||
|
|
||||||
|
[build]
|
||||||
|
dockerfile = "../Dockerfile"
|
||||||
|
|
||||||
|
[experimental]
|
||||||
|
cmd = ["service-mango-pnl", "pnl-config.toml"]
|
||||||
|
|
||||||
|
[[services]]
|
||||||
|
internal_port = 8081
|
||||||
|
processes = ["app"]
|
||||||
|
protocol = "tcp"
|
||||||
|
|
||||||
|
[services.concurrency]
|
||||||
|
hard_limit = 1024
|
||||||
|
soft_limit = 1024
|
||||||
|
type = "connections"
|
||||||
|
|
||||||
|
[metrics]
|
||||||
|
path = "/metrics"
|
||||||
|
port = 9091
|
|
@ -3,12 +3,8 @@ app = "mango-geyser-services"
|
||||||
kill_signal = "SIGINT"
|
kill_signal = "SIGINT"
|
||||||
kill_timeout = 5
|
kill_timeout = 5
|
||||||
|
|
||||||
# [build]
|
|
||||||
# image = "us-docker.pkg.dev/mango-markets/gcr.io/mango-geyser-services:latest"
|
|
||||||
|
|
||||||
[processes]
|
[processes]
|
||||||
fills = "service-mango-fills fills-config.toml"
|
fills = "service-mango-fills fills-config.toml"
|
||||||
pnl = "service-mango-pnl pnl-config.toml"
|
|
||||||
orderbook = "service-mango-orderbook orderbook-config.toml"
|
orderbook = "service-mango-orderbook orderbook-config.toml"
|
||||||
|
|
||||||
[[services]]
|
[[services]]
|
||||||
|
@ -25,20 +21,6 @@ kill_timeout = 5
|
||||||
hard_limit = 1024
|
hard_limit = 1024
|
||||||
soft_limit = 1024
|
soft_limit = 1024
|
||||||
|
|
||||||
[[services]]
|
|
||||||
processes = ["pnl"]
|
|
||||||
internal_port = 8081
|
|
||||||
protocol = "tcp"
|
|
||||||
|
|
||||||
[[services.ports]]
|
|
||||||
handlers = ["tls", "http"]
|
|
||||||
port = 8081
|
|
||||||
|
|
||||||
[services.concurrency]
|
|
||||||
type = "connections"
|
|
||||||
hard_limit = 1024
|
|
||||||
soft_limit = 1024
|
|
||||||
|
|
||||||
[[services]]
|
[[services]]
|
||||||
processes = ["orderbook"]
|
processes = ["orderbook"]
|
||||||
internal_port = 8082
|
internal_port = 8082
|
|
@ -15,12 +15,16 @@ use solana_sdk::{
|
||||||
use std::{
|
use std::{
|
||||||
borrow::BorrowMut,
|
borrow::BorrowMut,
|
||||||
collections::{HashMap, HashSet},
|
collections::{HashMap, HashSet},
|
||||||
time::{SystemTime, UNIX_EPOCH}, mem::size_of,
|
mem::size_of,
|
||||||
|
time::{SystemTime, UNIX_EPOCH},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::metrics::MetricU64;
|
use crate::metrics::MetricU64;
|
||||||
use anchor_lang::AccountDeserialize;
|
use anchor_lang::AccountDeserialize;
|
||||||
use mango_v4::{state::{BookSide, OrderTreeType}, serum3_cpi::OrderBookStateHeader};
|
use mango_v4::{
|
||||||
|
serum3_cpi::OrderBookStateHeader,
|
||||||
|
state::{BookSide, OrderTreeType},
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum OrderbookSide {
|
pub enum OrderbookSide {
|
||||||
|
@ -40,24 +44,7 @@ impl Serialize for OrderbookSide {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
pub type OrderbookLevel = [f64; 2];
|
||||||
pub struct OrderbookLevel {
|
|
||||||
pub price: f64,
|
|
||||||
pub size: f64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Serialize for OrderbookLevel {
|
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: Serializer,
|
|
||||||
{
|
|
||||||
let mut state = serializer.serialize_struct("OrderbookLevel", 2)?;
|
|
||||||
state.serialize_field("price", &self.price)?;
|
|
||||||
state.serialize_field("size", &self.size)?;
|
|
||||||
|
|
||||||
state.end()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct OrderbookUpdate {
|
pub struct OrderbookUpdate {
|
||||||
|
@ -121,12 +108,57 @@ pub struct MarketConfig {
|
||||||
pub asks: Pubkey,
|
pub asks: Pubkey,
|
||||||
pub base_decimals: u8,
|
pub base_decimals: u8,
|
||||||
pub quote_decimals: u8,
|
pub quote_decimals: u8,
|
||||||
|
pub base_lot_size: i64,
|
||||||
|
pub quote_lot_size: i64,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn native_to_ui(native: i64, decimals: u8) -> f64 {
|
pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
|
||||||
native as f64 / (10u64.pow(decimals.into())) as f64
|
let decimals: u32 = 3;
|
||||||
|
let res = native as f64 / (10i64.pow(decimals.into()) as f64);
|
||||||
|
//info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
|
||||||
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
|
||||||
|
let decimals: u32 = 4;
|
||||||
|
let res = native as f64 / (10i64.pow(decimals.into()) as f64);
|
||||||
|
//info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn price_lots_to_ui(
|
||||||
|
native: i64,
|
||||||
|
base_decimals: u8,
|
||||||
|
quote_decimals: u8,
|
||||||
|
) -> f64 {
|
||||||
|
let decimals = base_decimals - quote_decimals;
|
||||||
|
// let res = native as f64
|
||||||
|
// * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
|
||||||
|
// as f64;
|
||||||
|
let res = native as f64
|
||||||
|
/ (10u64.pow(decimals.into()))
|
||||||
|
as f64;
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn price_lots_to_ui_perp(
|
||||||
|
native: i64,
|
||||||
|
base_decimals: u8,
|
||||||
|
quote_decimals: u8,
|
||||||
|
base_lot_size: i64,
|
||||||
|
quote_lot_size: i64,
|
||||||
|
) -> f64 {
|
||||||
|
let decimals = base_decimals - quote_decimals;
|
||||||
|
let res = native as f64
|
||||||
|
* ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
|
||||||
|
as f64;
|
||||||
|
// let res = native as f64
|
||||||
|
// / (10u64.pow(decimals.into()))
|
||||||
|
// as f64;
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
fn publish_changes(
|
fn publish_changes(
|
||||||
slot: u64,
|
slot: u64,
|
||||||
write_version: u64,
|
write_version: u64,
|
||||||
|
@ -143,14 +175,12 @@ fn publish_changes(
|
||||||
for previous_order in previous_bookside.iter() {
|
for previous_order in previous_bookside.iter() {
|
||||||
let peer = current_bookside
|
let peer = current_bookside
|
||||||
.iter()
|
.iter()
|
||||||
.find(|level| previous_order.price == level.price);
|
.find(|level| previous_order[0] == level[0]);
|
||||||
|
|
||||||
match peer {
|
match peer {
|
||||||
None => {
|
None => {
|
||||||
update.push(OrderbookLevel {
|
info!("removed level {}", previous_order[0]);
|
||||||
price: previous_order.price,
|
update.push([previous_order[0], 0f64]);
|
||||||
size: 0f64,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
_ => continue,
|
_ => continue,
|
||||||
}
|
}
|
||||||
|
@ -160,21 +190,18 @@ fn publish_changes(
|
||||||
for current_order in current_bookside {
|
for current_order in current_bookside {
|
||||||
let peer = previous_bookside
|
let peer = previous_bookside
|
||||||
.iter()
|
.iter()
|
||||||
.find(|item| item.price == current_order.price);
|
.find(|item| item[0] == current_order[0]);
|
||||||
|
|
||||||
match peer {
|
match peer {
|
||||||
Some(previous_order) => {
|
Some(previous_order) => {
|
||||||
if previous_order.size == current_order.size {
|
if previous_order[1] == current_order[1] {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
debug!(
|
info!("size changed {} -> {}", previous_order[1], current_order[1]);
|
||||||
"size changed {} -> {}",
|
|
||||||
previous_order.size, current_order.size
|
|
||||||
);
|
|
||||||
update.push(current_order.clone());
|
update.push(current_order.clone());
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
debug!("new level {},{}", current_order.price, current_order.size);
|
info!("new level {},{}", current_order[0], current_order[1]);
|
||||||
update.push(current_order.clone())
|
update.push(current_order.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -232,14 +259,12 @@ fn publish_changes_serum(
|
||||||
for previous_order in previous_bookside.iter() {
|
for previous_order in previous_bookside.iter() {
|
||||||
let peer = current_bookside
|
let peer = current_bookside
|
||||||
.iter()
|
.iter()
|
||||||
.find(|level| previous_order.price == level.price);
|
.find(|level| previous_order[0] == level[0]);
|
||||||
|
|
||||||
match peer {
|
match peer {
|
||||||
None => {
|
None => {
|
||||||
update.push(OrderbookLevel {
|
info!("removed level s {}", previous_order[0]);
|
||||||
price: previous_order.price,
|
update.push([previous_order[0], 0f64]);
|
||||||
size: 0f64,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
_ => continue,
|
_ => continue,
|
||||||
}
|
}
|
||||||
|
@ -249,21 +274,18 @@ fn publish_changes_serum(
|
||||||
for current_order in current_bookside {
|
for current_order in current_bookside {
|
||||||
let peer = previous_bookside
|
let peer = previous_bookside
|
||||||
.iter()
|
.iter()
|
||||||
.find(|item| item.price == current_order.price);
|
.find(|item| item[0] == current_order[0]);
|
||||||
|
|
||||||
match peer {
|
match peer {
|
||||||
Some(previous_order) => {
|
Some(previous_order) => {
|
||||||
if previous_order.size == current_order.size {
|
if previous_order[1] == current_order[1] {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
debug!(
|
info!("size changed {} -> {}", previous_order[1], current_order[1]);
|
||||||
"size changed {} -> {}",
|
|
||||||
previous_order.size, current_order.size
|
|
||||||
);
|
|
||||||
update.push(current_order.clone());
|
update.push(current_order.clone());
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
debug!("new level {},{}", current_order.price, current_order.size);
|
info!("new level {},{}", current_order[0], current_order[1]);
|
||||||
update.push(current_order.clone())
|
update.push(current_order.clone())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -412,14 +434,26 @@ pub async fn init(
|
||||||
.map(|item| (item.node.price_data() as i64, item.node.quantity))
|
.map(|item| (item.node.price_data() as i64, item.node.quantity))
|
||||||
.group_by(|(price, _)| *price)
|
.group_by(|(price, _)| *price)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(price, group)| OrderbookLevel {
|
.map(|(price, group)| {
|
||||||
price: native_to_ui(price, mkt.1.quote_decimals),
|
[
|
||||||
size: native_to_ui(group
|
price_lots_to_ui_perp(
|
||||||
.map(|(_, quantity)| quantity)
|
price,
|
||||||
.fold(0, |acc, x| acc + x), mkt.1.base_decimals),
|
mkt.1.base_decimals,
|
||||||
|
mkt.1.quote_decimals,
|
||||||
|
mkt.1.base_lot_size,
|
||||||
|
mkt.1.quote_lot_size,
|
||||||
|
),
|
||||||
|
base_lots_to_ui_perp(
|
||||||
|
group
|
||||||
|
.map(|(_, quantity)| quantity)
|
||||||
|
.fold(0, |acc, x| acc + x),
|
||||||
|
mkt.1.base_decimals,
|
||||||
|
mkt.1.base_lot_size,
|
||||||
|
),
|
||||||
|
]
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let other_bookside = bookside_cache.get(&other_side_pk.to_string());
|
let other_bookside = bookside_cache.get(&other_side_pk.to_string());
|
||||||
|
|
||||||
match bookside_cache.get(&side_pk_string) {
|
match bookside_cache.get(&side_pk_string) {
|
||||||
|
@ -471,14 +505,26 @@ pub async fn init(
|
||||||
|
|
||||||
let bookside: Vec<OrderbookLevel> = slab
|
let bookside: Vec<OrderbookLevel> = slab
|
||||||
.iter(side == 0)
|
.iter(side == 0)
|
||||||
.map(|item| (u64::from(item.price()) as i64, item.quantity() as i64))
|
.map(|item| {
|
||||||
|
(u64::from(item.price()) as i64, item.quantity() as i64)
|
||||||
|
})
|
||||||
.group_by(|(price, _)| *price)
|
.group_by(|(price, _)| *price)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(price, group)| OrderbookLevel {
|
.map(|(price, group)| {
|
||||||
price: native_to_ui(price, mkt.1.quote_decimals),
|
[
|
||||||
size: native_to_ui(group
|
price_lots_to_ui(
|
||||||
.map(|(_, quantity)| quantity)
|
price,
|
||||||
.fold(0, |acc, x| acc + x), mkt.1.base_decimals),
|
mkt.1.base_decimals,
|
||||||
|
mkt.1.quote_decimals,
|
||||||
|
),
|
||||||
|
base_lots_to_ui(
|
||||||
|
group
|
||||||
|
.map(|(_, quantity)| quantity)
|
||||||
|
.fold(0, |acc, x| acc + x),
|
||||||
|
mkt.1.base_decimals,
|
||||||
|
mkt.1.base_lot_size,
|
||||||
|
),
|
||||||
|
]
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
@ -504,10 +550,7 @@ pub async fn init(
|
||||||
_ => info!("bookside_cache could not find {}", side_pk_string),
|
_ => info!("bookside_cache could not find {}", side_pk_string),
|
||||||
}
|
}
|
||||||
|
|
||||||
serum_bookside_cache.insert(
|
serum_bookside_cache.insert(side_pk_string.clone(), bookside);
|
||||||
side_pk_string.clone(),
|
|
||||||
bookside,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
Err(_) => info!("chain_cache could not find {}", side_pk),
|
Err(_) => info!("chain_cache could not find {}", side_pk),
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,8 +24,8 @@ tokio = { version = "1", features = ["full"] }
|
||||||
tokio-tungstenite = "0.17"
|
tokio-tungstenite = "0.17"
|
||||||
bytemuck = "1.7.2"
|
bytemuck = "1.7.2"
|
||||||
|
|
||||||
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "ckamm/accountfetcher-send" }
|
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
|
||||||
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "ckamm/accountfetcher-send" }
|
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
|
||||||
serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things" }
|
serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things" }
|
||||||
anchor-lang = "0.25.0"
|
anchor-lang = "0.25.0"
|
||||||
anchor-client = "0.25.0"
|
anchor-client = "0.25.0"
|
||||||
|
|
|
@ -1,13 +1,35 @@
|
||||||
connector-fills
|
# service-mango-fills
|
||||||
|
|
||||||
This module parses event queues and exposes individual fills on a websocket.
|
This module parses event queues and exposes individual fills on a websocket.
|
||||||
|
|
||||||
TODO:
|
## Setup
|
||||||
|
|
||||||
- [] early filter out all account writes we dont care about
|
1. Prepare the connector configuration file.
|
||||||
- [] startup logic, dont accept websockets before first snapshot
|
|
||||||
|
[Here is an example](service-mango-fills/example-config.toml).
|
||||||
|
|
||||||
|
- `bind_ws_addr` is the listen port for the websocket clients
|
||||||
|
- `rpc_ws_url` is unused and can stay empty.
|
||||||
|
- `connection_string` for your `grpc_sources` must point to the gRPC server
|
||||||
|
address configured for the plugin.
|
||||||
|
- `rpc_http_url` must point to the JSON-RPC URL.
|
||||||
|
- `program_id` must match what is configured for the gRPC plugin
|
||||||
|
- `markets` need to contain all observed perp markets
|
||||||
|
|
||||||
|
2. Start the service binary.
|
||||||
|
|
||||||
|
Pass the path to the config file as the first argument. It logs to stdout. It
|
||||||
|
should be restarted on exit.
|
||||||
|
|
||||||
|
3. Monitor the logs
|
||||||
|
|
||||||
|
`WARN` messages can be recovered from. `ERROR` messages need attention. The
|
||||||
|
logs are very spammy changing the default log level is recommended when you
|
||||||
|
dont want to analyze performance of the service.
|
||||||
|
|
||||||
|
## TODO
|
||||||
|
- [] startup logic, dont accept market subscriptions before first snapshot
|
||||||
- [] failover logic, kill all websockets when we receive a later snapshot, more
|
- [] failover logic, kill all websockets when we receive a later snapshot, more
|
||||||
frequent when running on home connections
|
frequent when running on home connections
|
||||||
- [] track websocket connect / disconnect
|
|
||||||
- [] track latency accountwrite -> websocket
|
- [] track latency accountwrite -> websocket
|
||||||
- [] track queue length
|
- [] create new model for fills so snapshot maps can be combined per market
|
||||||
|
|
|
@ -1,19 +1,39 @@
|
||||||
use anchor_client::{Cluster, solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair, account::Account}};
|
use anchor_client::{
|
||||||
|
solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair},
|
||||||
|
Cluster,
|
||||||
|
};
|
||||||
use anchor_lang::prelude::Pubkey;
|
use anchor_lang::prelude::Pubkey;
|
||||||
use bytemuck::cast_slice;
|
use bytemuck::cast_slice;
|
||||||
use client::{Client, MangoGroupContext};
|
use client::{Client, MangoGroupContext};
|
||||||
use futures_channel::mpsc::{unbounded, UnboundedSender};
|
use futures_channel::mpsc::{unbounded, UnboundedSender};
|
||||||
use futures_util::{pin_mut, SinkExt, StreamExt, future::{self, Ready}, TryStreamExt};
|
use futures_util::{
|
||||||
|
future::{self, Ready},
|
||||||
|
pin_mut, SinkExt, StreamExt, TryStreamExt,
|
||||||
|
};
|
||||||
use log::*;
|
use log::*;
|
||||||
use std::{collections::{HashMap, HashSet}, fs::File, io::Read, net::SocketAddr, sync::Arc, sync::Mutex, time::Duration, convert::identity, str::FromStr};
|
use std::{
|
||||||
|
collections::{HashMap, HashSet},
|
||||||
|
convert::identity,
|
||||||
|
fs::File,
|
||||||
|
io::Read,
|
||||||
|
net::SocketAddr,
|
||||||
|
str::FromStr,
|
||||||
|
sync::Arc,
|
||||||
|
sync::Mutex,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
net::{TcpListener, TcpStream},
|
net::{TcpListener, TcpStream},
|
||||||
pin,
|
pin, time,
|
||||||
};
|
};
|
||||||
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
|
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
|
||||||
|
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use solana_geyser_connector_lib::{metrics::{MetricType, MetricU64}, FilterConfig, fill_event_filter::SerumFillCheckpoint, StatusResponse};
|
use solana_geyser_connector_lib::{
|
||||||
|
fill_event_filter::SerumFillCheckpoint,
|
||||||
|
metrics::{MetricType, MetricU64},
|
||||||
|
FilterConfig, StatusResponse,
|
||||||
|
};
|
||||||
use solana_geyser_connector_lib::{
|
use solana_geyser_connector_lib::{
|
||||||
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
|
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
|
||||||
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
|
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
|
||||||
|
@ -64,7 +84,15 @@ async fn handle_connection_error(
|
||||||
) {
|
) {
|
||||||
metrics_opened_connections.clone().increment();
|
metrics_opened_connections.clone().increment();
|
||||||
|
|
||||||
let result = handle_connection(checkpoint_map, serum_checkpoint_map, peer_map.clone(), market_ids, raw_stream, addr).await;
|
let result = handle_connection(
|
||||||
|
checkpoint_map,
|
||||||
|
serum_checkpoint_map,
|
||||||
|
peer_map.clone(),
|
||||||
|
market_ids,
|
||||||
|
raw_stream,
|
||||||
|
addr,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
error!("connection {} error {}", addr, result.unwrap_err());
|
error!("connection {} error {}", addr, result.unwrap_err());
|
||||||
};
|
};
|
||||||
|
@ -84,6 +112,7 @@ async fn handle_connection(
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
info!("ws connected: {}", addr);
|
info!("ws connected: {}", addr);
|
||||||
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
||||||
|
|
||||||
let (ws_tx, ws_rx) = ws_stream.split();
|
let (ws_tx, ws_rx) = ws_stream.split();
|
||||||
|
|
||||||
// 1: publish channel in peer map
|
// 1: publish channel in peer map
|
||||||
|
@ -99,14 +128,26 @@ async fn handle_connection(
|
||||||
}
|
}
|
||||||
|
|
||||||
let receive_commands = ws_rx.try_for_each(|msg| {
|
let receive_commands = ws_rx.try_for_each(|msg| {
|
||||||
handle_commands(
|
match msg {
|
||||||
addr,
|
Message::Text(_) => {
|
||||||
msg,
|
handle_commands(
|
||||||
peer_map.clone(),
|
addr,
|
||||||
checkpoint_map.clone(),
|
msg,
|
||||||
serum_checkpoint_map.clone(),
|
peer_map.clone(),
|
||||||
market_ids.clone(),
|
checkpoint_map.clone(),
|
||||||
)
|
serum_checkpoint_map.clone(),
|
||||||
|
market_ids.clone(),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
Message::Ping(_) => {
|
||||||
|
let peers = peer_map.clone();
|
||||||
|
let mut peers_lock = peers.lock().unwrap();
|
||||||
|
let peer = peers_lock.get_mut(&addr).expect("peer should be in map");
|
||||||
|
peer.sender.unbounded_send(Message::Pong(Vec::new())).unwrap();
|
||||||
|
future::ready(Ok(()))
|
||||||
|
}
|
||||||
|
_ => future::ready(Ok(())),
|
||||||
|
}
|
||||||
});
|
});
|
||||||
let forward_updates = chan_rx.map(Ok).forward(ws_tx);
|
let forward_updates = chan_rx.map(Ok).forward(ws_tx);
|
||||||
|
|
||||||
|
@ -164,7 +205,6 @@ fn handle_commands(
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
if subscribed {
|
if subscribed {
|
||||||
// todo: this is janky af
|
|
||||||
let checkpoint_map = checkpoint_map.lock().unwrap();
|
let checkpoint_map = checkpoint_map.lock().unwrap();
|
||||||
let serum_checkpoint_map = serum_checkpoint_map.lock().unwrap();
|
let serum_checkpoint_map = serum_checkpoint_map.lock().unwrap();
|
||||||
let checkpoint = checkpoint_map.get(&market_id);
|
let checkpoint = checkpoint_map.get(&market_id);
|
||||||
|
@ -184,8 +224,8 @@ fn handle_commands(
|
||||||
))
|
))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
None => info!("no checkpoint available on client subscription"), // todo: what to do here?
|
None => info!("no checkpoint available on client subscription"),
|
||||||
}
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -274,10 +314,13 @@ async fn main() -> anyhow::Result<()> {
|
||||||
&Keypair::new(),
|
&Keypair::new(),
|
||||||
Some(rpc_timeout),
|
Some(rpc_timeout),
|
||||||
);
|
);
|
||||||
let group_context = Arc::new(MangoGroupContext::new_from_rpc(
|
let group_context = Arc::new(
|
||||||
&client.rpc_async(),
|
MangoGroupContext::new_from_rpc(
|
||||||
Pubkey::from_str(&config.mango_group).unwrap(),
|
&client.rpc_async(),
|
||||||
).await?);
|
Pubkey::from_str(&config.mango_group).unwrap(),
|
||||||
|
)
|
||||||
|
.await?,
|
||||||
|
);
|
||||||
|
|
||||||
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
|
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
|
||||||
.perp_markets
|
.perp_markets
|
||||||
|
@ -293,7 +336,8 @@ async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let serum_market_ais = client
|
let serum_market_ais = client
|
||||||
.rpc_async()
|
.rpc_async()
|
||||||
.get_multiple_accounts(serum_market_pks.as_slice()).await?;
|
.get_multiple_accounts(serum_market_pks.as_slice())
|
||||||
|
.await?;
|
||||||
let serum_market_ais: Vec<&Account> = serum_market_ais
|
let serum_market_ais: Vec<&Account> = serum_market_ais
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|maybe_ai| match maybe_ai {
|
.filter_map(|maybe_ai| match maybe_ai {
|
||||||
|
@ -309,25 +353,41 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
|
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
|
||||||
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
|
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
|
||||||
);
|
);
|
||||||
(serum_market_pks[pair.0], Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])))
|
(
|
||||||
|
serum_market_pks[pair.0],
|
||||||
|
Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])),
|
||||||
|
)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let a: Vec<(String, String)> = group_context
|
let a: Vec<(String, String)> = group_context
|
||||||
.serum3_markets
|
.serum3_markets
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, context)| (context.market.serum_market_external.to_string(), context.market.name().to_owned())).collect();
|
.map(|(_, context)| {
|
||||||
|
(
|
||||||
|
context.market.serum_market_external.to_string(),
|
||||||
|
context.market.name().to_owned(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
let b: Vec<(String, String)> = group_context
|
let b: Vec<(String, String)> = group_context
|
||||||
.perp_markets
|
.perp_markets
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, context)| (context.address.to_string(), context.market.name().to_owned())).collect();
|
.map(|(_, context)| {
|
||||||
let market_pubkey_strings: HashMap<String, String> = [a, b]
|
(
|
||||||
.concat()
|
context.address.to_string(),
|
||||||
.into_iter()
|
context.market.name().to_owned(),
|
||||||
|
)
|
||||||
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
let market_pubkey_strings: HashMap<String, String> = [a, b].concat().into_iter().collect();
|
||||||
|
|
||||||
let (account_write_queue_sender, slot_queue_sender, fill_receiver) =
|
let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
|
||||||
fill_event_filter::init(perp_queue_pks.clone(), serum_queue_pks.clone(), metrics_tx.clone()).await?;
|
perp_queue_pks.clone(),
|
||||||
|
serum_queue_pks.clone(),
|
||||||
|
metrics_tx.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
|
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
|
||||||
let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new()));
|
let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new()));
|
||||||
|
@ -336,6 +396,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let checkpoints_ref_thread = checkpoints.clone();
|
let checkpoints_ref_thread = checkpoints.clone();
|
||||||
let serum_checkpoints_ref_thread = serum_checkpoints.clone();
|
let serum_checkpoints_ref_thread = serum_checkpoints.clone();
|
||||||
let peers_ref_thread = peers.clone();
|
let peers_ref_thread = peers.clone();
|
||||||
|
let peers_ref_thread1 = peers.clone();
|
||||||
|
|
||||||
// filleventfilter websocket sink
|
// filleventfilter websocket sink
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
@ -348,15 +409,12 @@ async fn main() -> anyhow::Result<()> {
|
||||||
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
||||||
for (addr, peer) in peer_copy.iter_mut() {
|
for (addr, peer) in peer_copy.iter_mut() {
|
||||||
let json = serde_json::to_string(&update).unwrap();
|
let json = serde_json::to_string(&update).unwrap();
|
||||||
|
|
||||||
// only send updates if the peer is subscribed
|
// only send updates if the peer is subscribed
|
||||||
if peer.subscriptions.contains(&update.market) {
|
if peer.subscriptions.contains(&update.market) {
|
||||||
let result = peer.sender.send(Message::Text(json)).await;
|
let result = peer.sender.send(Message::Text(json)).await;
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
error!(
|
error!("ws update {} fill could not reach {}", update.market, addr);
|
||||||
"ws update {} fill could not reach {}",
|
|
||||||
update.market, addr
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -369,18 +427,15 @@ async fn main() -> anyhow::Result<()> {
|
||||||
}
|
}
|
||||||
FillEventFilterMessage::SerumUpdate(update) => {
|
FillEventFilterMessage::SerumUpdate(update) => {
|
||||||
debug!("ws update {} {:?} serum fill", update.market, update.status);
|
debug!("ws update {} {:?} serum fill", update.market, update.status);
|
||||||
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
let mut peers_copy = peers_ref_thread.lock().unwrap().clone();
|
||||||
for (addr, peer) in peer_copy.iter_mut() {
|
for (addr, peer) in peers_copy.iter_mut() {
|
||||||
let json = serde_json::to_string(&update).unwrap();
|
let json = serde_json::to_string(&update).unwrap();
|
||||||
|
|
||||||
// only send updates if the peer is subscribed
|
// only send updates if the peer is subscribed
|
||||||
if peer.subscriptions.contains(&update.market) {
|
if peer.subscriptions.contains(&update.market) {
|
||||||
let result = peer.sender.send(Message::Text(json)).await;
|
let result = peer.sender.send(Message::Text(json)).await;
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
error!(
|
error!("ws update {} fill could not reach {}", update.market, addr);
|
||||||
"ws update {} fill could not reach {}",
|
|
||||||
update.market, addr
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -398,6 +453,7 @@ async fn main() -> anyhow::Result<()> {
|
||||||
info!("ws listen: {}", config.bind_ws_addr);
|
info!("ws listen: {}", config.bind_ws_addr);
|
||||||
let try_socket = TcpListener::bind(&config.bind_ws_addr).await;
|
let try_socket = TcpListener::bind(&config.bind_ws_addr).await;
|
||||||
let listener = try_socket.expect("Failed to bind");
|
let listener = try_socket.expect("Failed to bind");
|
||||||
|
{
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// Let's spawn the handling of each connection in a separate task.
|
// Let's spawn the handling of each connection in a separate task.
|
||||||
while let Ok((stream, addr)) = listener.accept().await {
|
while let Ok((stream, addr)) = listener.accept().await {
|
||||||
|
@ -413,7 +469,26 @@ async fn main() -> anyhow::Result<()> {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// keepalive
|
||||||
|
{
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut write_interval = time::interval(time::Duration::from_secs(30));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
write_interval.tick().await;
|
||||||
|
let peers_copy = peers_ref_thread1.lock().unwrap().clone();
|
||||||
|
for (addr, peer) in peers_copy.iter() {
|
||||||
|
let pl = Vec::new();
|
||||||
|
let result = peer.clone().sender.send(Message::Ping(pl)).await;
|
||||||
|
if result.is_err() {
|
||||||
|
error!("ws ping could not reach {}", addr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
info!(
|
info!(
|
||||||
"rpc connect: {}",
|
"rpc connect: {}",
|
||||||
config
|
config
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
# service-mango-orderbook
|
||||||
|
|
||||||
|
This module parses bookside accounts and exposes L2 data and updates on a websocket
|
||||||
|
|
||||||
|
## Setup
|
||||||
|
|
||||||
|
1. Prepare the connector configuration file.
|
||||||
|
|
||||||
|
[Here is an example](service-mango-fills/example-config.toml).
|
||||||
|
|
||||||
|
- `bind_ws_addr` is the listen port for the websocket clients
|
||||||
|
- `rpc_ws_url` is unused and can stay empty.
|
||||||
|
- `connection_string` for your `grpc_sources` must point to the gRPC server
|
||||||
|
address configured for the plugin.
|
||||||
|
- `rpc_http_url` must point to the JSON-RPC URL.
|
||||||
|
- `program_id` must match what is configured for the gRPC plugin
|
||||||
|
- `markets` need to contain all observed perp markets
|
||||||
|
|
||||||
|
2. Start the service binary.
|
||||||
|
|
||||||
|
Pass the path to the config file as the first argument. It logs to stdout. It
|
||||||
|
should be restarted on exit.
|
||||||
|
|
||||||
|
3. Monitor the logs
|
||||||
|
|
||||||
|
`WARN` messages can be recovered from. `ERROR` messages need attention. The
|
||||||
|
logs are very spammy changing the default log level is recommended when you
|
||||||
|
dont want to analyze performance of the service.
|
||||||
|
|
||||||
|
## TODO
|
||||||
|
- [] startup logic, dont accept market subscriptions before first snapshot
|
||||||
|
- [] failover logic, kill all websockets when we receive a later snapshot, more
|
||||||
|
frequent when running on home connections
|
||||||
|
- [] track latency accountwrite -> websocket
|
||||||
|
- [] create new model for fills so snapshot maps can be combined per market
|
|
@ -307,6 +307,8 @@ async fn main() -> anyhow::Result<()> {
|
||||||
asks: context.market.asks,
|
asks: context.market.asks,
|
||||||
base_decimals: context.market.base_decimals,
|
base_decimals: context.market.base_decimals,
|
||||||
quote_decimals,
|
quote_decimals,
|
||||||
|
base_lot_size: context.market.base_lot_size,
|
||||||
|
quote_lot_size: context.market.quote_lot_size,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
@ -325,13 +327,15 @@ async fn main() -> anyhow::Result<()> {
|
||||||
None => panic!("token not found for market") // todo: default to 6 for usdc?
|
None => panic!("token not found for market") // todo: default to 6 for usdc?
|
||||||
};
|
};
|
||||||
(
|
(
|
||||||
context.address,
|
context.market.serum_market_external,
|
||||||
MarketConfig {
|
MarketConfig {
|
||||||
name: context.market.name().to_owned(),
|
name: context.market.name().to_owned(),
|
||||||
bids: context.bids,
|
bids: context.bids,
|
||||||
asks: context.asks,
|
asks: context.asks,
|
||||||
base_decimals,
|
base_decimals,
|
||||||
quote_decimals,
|
quote_decimals,
|
||||||
|
base_lot_size: context.pc_lot_size as i64,
|
||||||
|
quote_lot_size: context.coin_lot_size as i64,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue