Compare commits
No commits in common. "b89faa1ed088a2346753203e7a008b7cace71d50" and "464946dc1648903eb3dbae5a8e7a8c0dafd44d1b" have entirely different histories.
b89faa1ed0
...
464946dc16
README.md
@@ -1,22 +1,184 @@
# mango-geyser-services

# Overview

Mango v4 Geyser Services

This project is about streaming Solana account updates for a specific program
into other databases or event queues.

Having an up-to-date version of all account data in a database is
particularly useful for queries that need access to all accounts. For example,
retrieving the addresses of Mango Markets accounts with the largest unrealized
PnL goes from "getProgramAccounts from a Solana node for 50MB of data and
compute locally (3-10s total)" to "run a SQL query (150ms total)".

The database could also be used as a backend for serving `getMultipleAccounts`
and `getProgramAccounts` queries generally. That would reduce load on Solana
RPC nodes while decreasing response times.

Supported Solana sources:

- Geyser plugin (preferred) plus JSONRPC HTTP API (for initial snapshots)

Unfinished Solana sources:

- JSONRPC websocket subscriptions plus JSONRPC HTTP API (for initial snapshots)

Supported targets:

- PostgreSQL

# Components

- [`geyser-plugin-grpc/`](geyser-plugin-grpc/)

  The Solana Geyser plugin. It opens a gRPC server (see [`proto/`](proto/)) and
  broadcasts account and slot updates to all clients that connect.

- [`lib/`](lib/)

  Tools for building services: the connector abstractions that the connector
  services are built from.

  Projects may want to use it to build their own connector service and decode
  their specific account data before sending it into target systems.

- [`connector-raw/`](connector-raw/)

  A connector binary built on lib/ that stores raw binary account data in
  PostgreSQL.

- [`connector-mango/`](connector-mango/)

  A connector binary built on lib/ that decodes Mango account types before
  storing them in PostgreSQL.

- [`service-mango-fills/`](service-mango-fills/)

  A service providing lowest-latency, bandwidth-conserving access to fill events
  for Mango V4 Perp and Openbook markets, as they are processed by the RPC node.

- [`service-mango-pnl/`](service-mango-pnl/)

  A service providing pre-computed account lists ordered by unsettled PnL per market

- [`service-mango-orderbook/`](service-mango-orderbook/)

  A service providing Orderbook L2 state and delta updates for Mango V4 Perp and Openbook Spot markets

# Setup Tutorial

1. Compile the project.

   Make sure that you are using _exactly_ the same Rust version for compiling
   the Geyser plugin that was used for compiling your `solana-validator`!
   Otherwise the plugin will crash the validator during startup!

2. Prepare the plugin configuration file.

   [Here is an example](geyser-plugin-grpc/example-config.json). This file
   points the validator to your plugin shared library, and controls which
   accounts will be exported, which address the gRPC server will bind to,
   and internal queue sizes.
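   As a rough sketch, such a file could look like the following. The key names
   and the program id filter shown here are illustrative assumptions, not taken
   from the shipped example; defer to
   [geyser-plugin-grpc/example-config.json](geyser-plugin-grpc/example-config.json)
   for the real schema.

   ```
   {
     "libpath": "target/release/libsolana_geyser_connector_plugin_grpc.so",
     "bind_address": "0.0.0.0:10000",
     "program_ids": ["4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg"],
     "account_write_queue_size": 10000,
     "slot_update_queue_size": 10000
   }
   ```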

3. Run `solana-validator` with `--geyser-plugin-config myconfig.json`.

   Check the logs to ensure the plugin was loaded.

4. Prepare the connector configuration file.

   [Here is an example](connector-raw/example-config.toml).

   - `rpc_ws_url` is unused and can stay empty.
   - `connection_string` for your `grpc_sources` must point to the gRPC server
     address configured for the plugin.
   - `rpc_http_url` must point to the JSON-RPC URL.
   - `connection_string` for your `postgres_target` uses
     [the tokio-postgres syntax](https://docs.rs/tokio-postgres/0.7.5/tokio_postgres/config/struct.Config.html)
   - `program_id` must match what is configured for the gRPC plugin
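   Putting those notes together, a minimal config could look like this sketch
   (values are placeholders and the table layout is an assumption; the actual
   key names and structure are in
   [connector-raw/example-config.toml](connector-raw/example-config.toml)):

   ```
   rpc_ws_url = ""
   rpc_http_url = "http://localhost:8899/"
   program_id = "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg"

   [[grpc_sources]]
   connection_string = "http://localhost:10000"

   [postgres_target]
   # tokio-postgres connection string syntax
   connection_string = "host=localhost user=postgres dbname=geyser"
   ```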

5. Prepare the PostgreSQL schema.

   Use [this example script](connector-raw/scripts/create_schema.sql).
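   Conceptually, the schema described under "PostgreSQL data layout" below
   boils down to something like this simplified sketch (column types and
   constraints are assumptions for illustration; use the shipped
   `create_schema.sql` for the real definitions):

   ```
   CREATE TABLE pubkey (
       pubkey_id BIGSERIAL PRIMARY KEY,
       pubkey VARCHAR(44) NOT NULL UNIQUE
   );

   CREATE TABLE slot (
       slot BIGINT PRIMARY KEY,
       status VARCHAR NOT NULL,  -- 'Processed', 'Confirmed' or 'Rooted'
       uncle BOOL NOT NULL       -- true when the slot left the main chain
   );

   CREATE TABLE account_write (
       pubkey_id BIGINT NOT NULL REFERENCES pubkey,
       slot BIGINT NOT NULL,
       write_version BIGINT NOT NULL,
       data BYTEA,
       PRIMARY KEY (pubkey_id, slot, write_version)
   );
   ```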

6. Start the connector service binary.

   Pass the path to the config file as the first argument. It logs to stdout. It
   should be restarted on exit. (It intentionally terminates when PostgreSQL is
   unreachable for too long, for example.)

7. Monitor the logs

   `WARN` messages can be recovered from. `ERROR` messages need attention.

   Check the metrics for `account_write_queue` and `slot_update_queue`: they
   should be around 0. If they keep growing, the service can't keep up and
   you'll need to investigate.

# Design and Reliability

```
Solana ---------------> Connector -----------> PostgreSQL
 nodes   jsonrpc/gRPC     nodes
```

For reliability it is recommended to feed data from multiple Solana nodes into
each Connector node.

It is also allowed to run multiple Connector nodes that target the same
PostgreSQL target database.

The Connector service is stateless (except for some caches). Restarting it is
always safe.

If the Solana node is down, the Connector service attempts to reconnect and then
requests a new data snapshot if necessary.

If PostgreSQL is down temporarily, the Connector service caches updates and
applies them when the database is back up.

If PostgreSQL is down for a longer time, the Connector service exits with an
error. On restart, it pauses until PostgreSQL is back up, and then starts
pulling data from the Solana nodes again.

# PostgreSQL data layout

See `scripts/` for SQL that creates the target schema.

The Connector streams data into the `account_write` and `slot` tables. When
slots become "rooted", older `account_write` data for rooted slots is deleted.
That way the current account data for the latest rooted, confirmed or processed
slot can be queried, but older data is forgotten.

When new slots arrive, the `uncle` column is updated for "processed" and
"confirmed" slots to allow easy filtering of slots that are no longer part of
the chain.

Example for querying confirmed data:

```
SELECT DISTINCT ON(pubkey_id)
    pubkey, account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey USING(pubkey_id)
WHERE status = 'Rooted' OR status IS NULL OR (uncle = FALSE AND status = 'Confirmed')
ORDER BY pubkey_id, slot DESC, write_version DESC;
```

For each pubkey, this gets the latest (most recent slot, most recent
write_version) account data, limited to slots that are either rooted or
(confirmed and not an uncle).

# Fill Service Setup

1. Prepare the connector configuration file.

   [Here is an example](service-mango-fills/example-config.toml).

   - `bind_ws_addr` is the listen address (host:port) for the websocket clients.
   - `rpc_ws_url` is unused and can stay empty.
   - `connection_string` for your `grpc_sources` must point to the gRPC server
     address configured for the plugin.
   - `rpc_http_url` must point to the JSON-RPC URL.
   - `program_id` must match what is configured for the gRPC plugin.
   - `markets` needs to contain all observed perp markets.
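   A minimal sketch following these notes (values and the `[[markets]]` field
   names are placeholders assumed for illustration; the real structure is in
   [service-mango-fills/example-config.toml](service-mango-fills/example-config.toml)):

   ```
   bind_ws_addr = "0.0.0.0:2082"
   rpc_ws_url = ""
   rpc_http_url = "http://localhost:8899/"
   program_id = "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg"

   [[grpc_sources]]
   connection_string = "http://localhost:10000"

   [[markets]]
   name = "SOL-PERP"
   ```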

2. Start the service binary.

   Pass the path to the config file as the first argument. It logs to stdout. It
   should be restarted on exit.

3. Monitor the logs

   `WARN` messages can be recovered from. `ERROR` messages need attention. The
   logs are very spammy; changing the default log level is recommended when you
   don't want to analyze performance of the service.

cd/pnl.toml
@@ -1,23 +0,0 @@
app = "mango-pnl"
kill_signal = "SIGINT"
kill_timeout = 5

[build]
dockerfile = "../Dockerfile"

[experimental]
cmd = ["service-mango-pnl", "pnl-config.toml"]

[[services]]
internal_port = 8081
processes = ["app"]
protocol = "tcp"

[services.concurrency]
hard_limit = 1024
soft_limit = 1024
type = "connections"

[metrics]
path = "/metrics"
port = 9091
@@ -3,8 +3,12 @@ app = "mango-geyser-services"
kill_signal = "SIGINT"
kill_timeout = 5

# [build]
# image = "us-docker.pkg.dev/mango-markets/gcr.io/mango-geyser-services:latest"

[processes]
fills = "service-mango-fills fills-config.toml"
pnl = "service-mango-pnl pnl-config.toml"
orderbook = "service-mango-orderbook orderbook-config.toml"

[[services]]
@@ -21,6 +25,20 @@ kill_timeout = 5
hard_limit = 1024
soft_limit = 1024

[[services]]
processes = ["pnl"]
internal_port = 8081
protocol = "tcp"

[[services.ports]]
handlers = ["tls", "http"]
port = 8081

[services.concurrency]
type = "connections"
hard_limit = 1024
soft_limit = 1024

[[services]]
processes = ["orderbook"]
internal_port = 8082
@@ -15,16 +15,12 @@ use solana_sdk::{
use std::{
    borrow::BorrowMut,
    collections::{HashMap, HashSet},
    mem::size_of,
    time::{SystemTime, UNIX_EPOCH},
};

use crate::metrics::MetricU64;
use anchor_lang::AccountDeserialize;
use mango_v4::{
    serum3_cpi::OrderBookStateHeader,
    state::{BookSide, OrderTreeType},
};
#[derive(Clone, Debug)]
pub enum OrderbookSide {
@@ -44,7 +40,24 @@ impl Serialize for OrderbookSide {
}
}

pub type OrderbookLevel = [f64; 2];
#[derive(Clone, Debug)]
pub struct OrderbookLevel {
    pub price: f64,
    pub size: f64,
}

impl Serialize for OrderbookLevel {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut state = serializer.serialize_struct("OrderbookLevel", 2)?;
        state.serialize_field("price", &self.price)?;
        state.serialize_field("size", &self.size)?;

        state.end()
    }
}

#[derive(Clone, Debug)]
pub struct OrderbookUpdate {
@@ -108,57 +121,12 @@ pub struct MarketConfig {
    pub asks: Pubkey,
    pub base_decimals: u8,
    pub quote_decimals: u8,
    pub base_lot_size: i64,
    pub quote_lot_size: i64,
}

pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
    let decimals: u32 = 3;
    let res = native as f64 / (10i64.pow(decimals.into()) as f64);
    //info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
    res
}

pub fn native_to_ui(native: i64, decimals: u8) -> f64 {
    native as f64 / (10u64.pow(decimals.into())) as f64
}
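As a quick sanity check on the conversion above, `native_to_ui` simply divides a native integer amount by 10^decimals; a standalone sketch:

```rust
// Standalone restatement of the `native_to_ui` helper above:
// native integer amounts are scaled down by the token's decimal count.
fn native_to_ui(native: i64, decimals: u8) -> f64 {
    native as f64 / (10u64.pow(decimals.into())) as f64
}

fn main() {
    // 1_000_000 native units of a 6-decimal token is 1.0 in UI units.
    assert_eq!(native_to_ui(1_000_000, 6), 1.0);
    assert_eq!(native_to_ui(1_500_000, 6), 1.5);
}
```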

pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
    let decimals: u32 = 4;
    let res = native as f64 / (10i64.pow(decimals.into()) as f64);
    //info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
    res
}

pub fn price_lots_to_ui(
    native: i64,
    base_decimals: u8,
    quote_decimals: u8,
) -> f64 {
    let decimals = base_decimals - quote_decimals;
    // let res = native as f64
    //     * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
    //     as f64;
    let res = native as f64
        / (10u64.pow(decimals.into()))
        as f64;
    res
}

pub fn price_lots_to_ui_perp(
    native: i64,
    base_decimals: u8,
    quote_decimals: u8,
    base_lot_size: i64,
    quote_lot_size: i64,
) -> f64 {
    let decimals = base_decimals - quote_decimals;
    let res = native as f64
        * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
        as f64;
    // let res = native as f64
    //     / (10u64.pow(decimals.into()))
    //     as f64;
    res
}
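Restating the perp price conversion above as a worked example: a price in lots is scaled by 10^(base_decimals - quote_decimals) * quote_lot_size / base_lot_size. The lot sizes and decimals below are illustrative assumptions, not taken from a real market config.

```rust
// Standalone restatement of `price_lots_to_ui_perp` above.
fn price_lots_to_ui_perp(
    native: i64,
    base_decimals: u8,
    quote_decimals: u8,
    base_lot_size: i64,
    quote_lot_size: i64,
) -> f64 {
    let decimals = base_decimals - quote_decimals;
    native as f64
        * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
}

fn main() {
    // With base_decimals 9, quote_decimals 6, base_lot_size 10000 and
    // quote_lot_size 10, one price lot is 10^3 * 10 / 10000 = 1.0 UI units,
    // so 100 price lots convert to 100.0.
    assert_eq!(price_lots_to_ui_perp(100, 9, 6, 10000, 10), 100.0);
}
```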

fn publish_changes(
    slot: u64,
    write_version: u64,
@@ -175,12 +143,14 @@ fn publish_changes(
    for previous_order in previous_bookside.iter() {
        let peer = current_bookside
            .iter()
            .find(|level| previous_order[0] == level[0]);
            .find(|level| previous_order.price == level.price);

        match peer {
            None => {
                info!("removed level {}", previous_order[0]);
                update.push([previous_order[0], 0f64]);
                update.push(OrderbookLevel {
                    price: previous_order.price,
                    size: 0f64,
                });
            }
            _ => continue,
        }
@@ -190,18 +160,21 @@ fn publish_changes(
    for current_order in current_bookside {
        let peer = previous_bookside
            .iter()
            .find(|item| item[0] == current_order[0]);
            .find(|item| item.price == current_order.price);

        match peer {
            Some(previous_order) => {
                if previous_order[1] == current_order[1] {
                if previous_order.size == current_order.size {
                    continue;
                }
                info!("size changed {} -> {}", previous_order[1], current_order[1]);
                debug!(
                    "size changed {} -> {}",
                    previous_order.size, current_order.size
                );
                update.push(current_order.clone());
            }
            None => {
                info!("new level {},{}", current_order[0], current_order[1]);
                debug!("new level {},{}", current_order.price, current_order.size);
                update.push(current_order.clone())
            }
        }
@@ -259,12 +232,14 @@ fn publish_changes_serum(
    for previous_order in previous_bookside.iter() {
        let peer = current_bookside
            .iter()
            .find(|level| previous_order[0] == level[0]);
            .find(|level| previous_order.price == level.price);

        match peer {
            None => {
                info!("removed level s {}", previous_order[0]);
                update.push([previous_order[0], 0f64]);
                update.push(OrderbookLevel {
                    price: previous_order.price,
                    size: 0f64,
                });
            }
            _ => continue,
        }
@@ -274,18 +249,21 @@ fn publish_changes_serum(
    for current_order in current_bookside {
        let peer = previous_bookside
            .iter()
            .find(|item| item[0] == current_order[0]);
            .find(|item| item.price == current_order.price);

        match peer {
            Some(previous_order) => {
                if previous_order[1] == current_order[1] {
                if previous_order.size == current_order.size {
                    continue;
                }
                info!("size changed {} -> {}", previous_order[1], current_order[1]);
                debug!(
                    "size changed {} -> {}",
                    previous_order.size, current_order.size
                );
                update.push(current_order.clone());
            }
            None => {
                info!("new level {},{}", current_order[0], current_order[1]);
                debug!("new level {},{}", current_order.price, current_order.size);
                update.push(current_order.clone())
            }
        }
@@ -434,23 +412,11 @@ pub async fn init(
    .map(|item| (item.node.price_data() as i64, item.node.quantity))
    .group_by(|(price, _)| *price)
    .into_iter()
    .map(|(price, group)| {
        [
            price_lots_to_ui_perp(
                price,
                mkt.1.base_decimals,
                mkt.1.quote_decimals,
                mkt.1.base_lot_size,
                mkt.1.quote_lot_size,
            ),
            base_lots_to_ui_perp(
                group
                    .map(|(_, quantity)| quantity)
                    .fold(0, |acc, x| acc + x),
                mkt.1.base_decimals,
                mkt.1.base_lot_size,
            ),
        ]
    })
    .map(|(price, group)| OrderbookLevel {
        price: native_to_ui(price, mkt.1.quote_decimals),
        size: native_to_ui(group
            .map(|(_, quantity)| quantity)
            .fold(0, |acc, x| acc + x), mkt.1.base_decimals),
    })
    .collect();

@@ -505,26 +471,14 @@ pub async fn init(
let bookside: Vec<OrderbookLevel> = slab
    .iter(side == 0)
    .map(|item| (u64::from(item.price()) as i64, item.quantity() as i64))
    .group_by(|(price, _)| *price)
    .into_iter()
    .map(|(price, group)| {
        [
            price_lots_to_ui(
                price,
                mkt.1.base_decimals,
                mkt.1.quote_decimals,
            ),
            base_lots_to_ui(
                group
                    .map(|(_, quantity)| quantity)
                    .fold(0, |acc, x| acc + x),
                mkt.1.base_decimals,
                mkt.1.base_lot_size,
            ),
        ]
    })
    .map(|(price, group)| OrderbookLevel {
        price: native_to_ui(price, mkt.1.quote_decimals),
        size: native_to_ui(group
            .map(|(_, quantity)| quantity)
            .fold(0, |acc, x| acc + x), mkt.1.base_decimals),
    })
    .collect();

@@ -550,7 +504,10 @@ pub async fn init(
        _ => info!("bookside_cache could not find {}", side_pk_string),
    }

    serum_bookside_cache.insert(side_pk_string.clone(), bookside);
    serum_bookside_cache.insert(
        side_pk_string.clone(),
        bookside,
    );
}
Err(_) => info!("chain_cache could not find {}", side_pk),
}

@@ -24,8 +24,8 @@ tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
bytemuck = "1.7.2"

mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "ckamm/accountfetcher-send" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "ckamm/accountfetcher-send" }
serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things" }
anchor-lang = "0.25.0"
anchor-client = "0.25.0"

@@ -1,35 +1,13 @@
# service-mango-fills
connector-fills

This module parses event queues and exposes individual fills on a websocket.

## Setup
TODO:

1. Prepare the connector configuration file.

   [Here is an example](service-mango-fills/example-config.toml).

   - `bind_ws_addr` is the listen address (host:port) for the websocket clients.
   - `rpc_ws_url` is unused and can stay empty.
   - `connection_string` for your `grpc_sources` must point to the gRPC server
     address configured for the plugin.
   - `rpc_http_url` must point to the JSON-RPC URL.
   - `program_id` must match what is configured for the gRPC plugin.
   - `markets` needs to contain all observed perp markets.

2. Start the service binary.

   Pass the path to the config file as the first argument. It logs to stdout. It
   should be restarted on exit.

3. Monitor the logs

   `WARN` messages can be recovered from. `ERROR` messages need attention. The
   logs are very spammy; changing the default log level is recommended when you
   don't want to analyze performance of the service.

## TODO
- [ ] startup logic, don't accept market subscriptions before first snapshot
- [ ] early filter out all account writes we don't care about
- [ ] startup logic, don't accept websockets before first snapshot
- [ ] failover logic, kill all websockets when we receive a later snapshot, more
  frequent when running on home connections
- [ ] track websocket connect / disconnect
- [ ] track latency accountwrite -> websocket
- [ ] create new model for fills so snapshot maps can be combined per market
- [ ] track queue length

@@ -1,39 +1,19 @@
use anchor_client::{
    solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair},
    Cluster,
};
use anchor_lang::prelude::Pubkey;
use bytemuck::cast_slice;
use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{
    future::{self, Ready},
    pin_mut, SinkExt, StreamExt, TryStreamExt,
};
use log::*;
use std::{
    collections::{HashMap, HashSet},
    convert::identity,
    fs::File,
    io::Read,
    net::SocketAddr,
    str::FromStr,
    sync::Arc,
    sync::Mutex,
    time::Duration,
};
use tokio::{
    net::{TcpListener, TcpStream},
    pin, time,
};
use tokio_tungstenite::tungstenite::{protocol::Message, Error};

use serde::Deserialize;
use solana_geyser_connector_lib::{
    fill_event_filter::SerumFillCheckpoint,
    metrics::{MetricType, MetricU64},
    FilterConfig, StatusResponse,
};
use solana_geyser_connector_lib::{
    fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
    grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
@@ -84,15 +64,7 @@ async fn handle_connection_error(
) {
    metrics_opened_connections.clone().increment();

    let result = handle_connection(
        checkpoint_map,
        serum_checkpoint_map,
        peer_map.clone(),
        market_ids,
        raw_stream,
        addr,
    )
    .await;
    if result.is_err() {
        error!("connection {} error {}", addr, result.unwrap_err());
    };
@@ -112,7 +84,6 @@ async fn handle_connection(
) -> Result<(), Error> {
    info!("ws connected: {}", addr);
    let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;

    let (ws_tx, ws_rx) = ws_stream.split();

    // 1: publish channel in peer map
@@ -128,8 +99,6 @@ async fn handle_connection(
    }

    let receive_commands = ws_rx.try_for_each(|msg| {
        match msg {
            Message::Text(_) => {
                handle_commands(
                    addr,
                    msg,
@@ -138,16 +107,6 @@ async fn handle_connection(
                    serum_checkpoint_map.clone(),
                    market_ids.clone(),
                )
            },
            Message::Ping(_) => {
                let peers = peer_map.clone();
                let mut peers_lock = peers.lock().unwrap();
                let peer = peers_lock.get_mut(&addr).expect("peer should be in map");
                peer.sender.unbounded_send(Message::Pong(Vec::new())).unwrap();
                future::ready(Ok(()))
            }
            _ => future::ready(Ok(())),
        }
    });
    let forward_updates = chan_rx.map(Ok).forward(ws_tx);

@@ -205,6 +164,7 @@ fn handle_commands(
        .unwrap();

    if subscribed {
        // todo: this is janky af
        let checkpoint_map = checkpoint_map.lock().unwrap();
        let serum_checkpoint_map = serum_checkpoint_map.lock().unwrap();
        let checkpoint = checkpoint_map.get(&market_id);
@@ -224,8 +184,8 @@ fn handle_commands(
                ))
                .unwrap();
            }
            None => info!("no checkpoint available on client subscription"),
        },
        None => info!("no checkpoint available on client subscription"), // todo: what to do here?
        }
    }
}
}
@@ -314,13 +274,10 @@ async fn main() -> anyhow::Result<()> {
        &Keypair::new(),
        Some(rpc_timeout),
    );
    let group_context = Arc::new(
        MangoGroupContext::new_from_rpc(
            &client.rpc_async(),
            Pubkey::from_str(&config.mango_group).unwrap(),
        )
        .await?,
    );

    let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
        .perp_markets
@@ -336,8 +293,7 @@ async fn main() -> anyhow::Result<()> {
    let serum_market_ais = client
        .rpc_async()
        .get_multiple_accounts(serum_market_pks.as_slice())
        .await?;
    let serum_market_ais: Vec<&Account> = serum_market_ais
        .iter()
        .filter_map(|maybe_ai| match maybe_ai {
@@ -353,41 +309,25 @@ async fn main() -> anyhow::Result<()> {
            let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
                &pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
            );
            (
                serum_market_pks[pair.0],
                Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])),
            )
        })
        .collect();

    let a: Vec<(String, String)> = group_context
        .serum3_markets
        .iter()
        .map(|(_, context)| {
            (
                context.market.serum_market_external.to_string(),
                context.market.name().to_owned(),
            )
        })
        .collect();
    let b: Vec<(String, String)> = group_context
        .perp_markets
        .iter()
        .map(|(_, context)| {
            (
                context.address.to_string(),
                context.market.name().to_owned(),
            )
        })
        .collect();
    let market_pubkey_strings: HashMap<String, String> = [a, b]
        .concat()
        .into_iter()
        .collect();

    let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
        perp_queue_pks.clone(),
        serum_queue_pks.clone(),
        metrics_tx.clone(),
    )
    .await?;

    let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
    let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new()));
@@ -396,7 +336,6 @@ async fn main() -> anyhow::Result<()> {
    let checkpoints_ref_thread = checkpoints.clone();
    let serum_checkpoints_ref_thread = serum_checkpoints.clone();
    let peers_ref_thread = peers.clone();
    let peers_ref_thread1 = peers.clone();

    // filleventfilter websocket sink
    tokio::spawn(async move {
@@ -414,7 +353,10 @@ async fn main() -> anyhow::Result<()> {
                    if peer.subscriptions.contains(&update.market) {
                        let result = peer.sender.send(Message::Text(json)).await;
                        if result.is_err() {
                            error!(
                                "ws update {} fill could not reach {}",
                                update.market, addr
                            );
                        }
                    }
                }
@@ -427,15 +369,18 @@ async fn main() -> anyhow::Result<()> {
                }
                FillEventFilterMessage::SerumUpdate(update) => {
                    debug!("ws update {} {:?} serum fill", update.market, update.status);
                    let mut peers_copy = peers_ref_thread.lock().unwrap().clone();
                    for (addr, peer) in peers_copy.iter_mut() {
                    let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
                    for (addr, peer) in peer_copy.iter_mut() {
                        let json = serde_json::to_string(&update).unwrap();

                        // only send updates if the peer is subscribed
                        if peer.subscriptions.contains(&update.market) {
                            let result = peer.sender.send(Message::Text(json)).await;
                            if result.is_err() {
                                error!(
                                    "ws update {} fill could not reach {}",
                                    update.market, addr
                                );
                            }
                        }
                    }
@@ -453,7 +398,6 @@ async fn main() -> anyhow::Result<()> {
    info!("ws listen: {}", config.bind_ws_addr);
    let try_socket = TcpListener::bind(&config.bind_ws_addr).await;
    let listener = try_socket.expect("Failed to bind");
    {
        tokio::spawn(async move {
            // Let's spawn the handling of each connection in a separate task.
            while let Ok((stream, addr)) = listener.accept().await {
@@ -469,26 +413,7 @@ async fn main() -> anyhow::Result<()> {
                ));
            }
        });
    }

    // keepalive
    {
        tokio::spawn(async move {
            let mut write_interval = time::interval(time::Duration::from_secs(30));

            loop {
                write_interval.tick().await;
                let peers_copy = peers_ref_thread1.lock().unwrap().clone();
                for (addr, peer) in peers_copy.iter() {
                    let pl = Vec::new();
                    let result = peer.clone().sender.send(Message::Ping(pl)).await;
                    if result.is_err() {
                        error!("ws ping could not reach {}", addr);
                    }
                }
            }
        });
    }
    info!(
        "rpc connect: {}",
        config
@@ -1,35 +0,0 @@
# service-mango-orderbook

This module parses bookside accounts and exposes L2 data and updates on a websocket

## Setup

1. Prepare the connector configuration file.

   [Here is an example](service-mango-fills/example-config.toml).

   - `bind_ws_addr` is the listen address (host:port) for the websocket clients.
   - `rpc_ws_url` is unused and can stay empty.
   - `connection_string` for your `grpc_sources` must point to the gRPC server
     address configured for the plugin.
   - `rpc_http_url` must point to the JSON-RPC URL.
   - `program_id` must match what is configured for the gRPC plugin.
   - `markets` needs to contain all observed perp markets.

2. Start the service binary.

   Pass the path to the config file as the first argument. It logs to stdout. It
   should be restarted on exit.

3. Monitor the logs

   `WARN` messages can be recovered from. `ERROR` messages need attention. The
   logs are very spammy; changing the default log level is recommended when you
   don't want to analyze performance of the service.

## TODO
- [ ] startup logic, don't accept market subscriptions before first snapshot
- [ ] failover logic, kill all websockets when we receive a later snapshot, more
  frequent when running on home connections
- [ ] track latency accountwrite -> websocket
- [ ] create new model for fills so snapshot maps can be combined per market
@@ -307,8 +307,6 @@ async fn main() -> anyhow::Result<()> {
                    asks: context.market.asks,
                    base_decimals: context.market.base_decimals,
                    quote_decimals,
                    base_lot_size: context.market.base_lot_size,
                    quote_lot_size: context.market.quote_lot_size,
                },
            )
        })
@@ -327,15 +325,13 @@ async fn main() -> anyhow::Result<()> {
                None => panic!("token not found for market"), // todo: default to 6 for usdc?
            };
            (
                context.market.serum_market_external,
                context.address,
                MarketConfig {
                    name: context.market.name().to_owned(),
                    bids: context.bids,
                    asks: context.asks,
                    base_decimals,
                    quote_decimals,
                    base_lot_size: context.pc_lot_size as i64,
                    quote_lot_size: context.coin_lot_size as i64,
                },
            )
        })