Compare commits

...

3 Commits

Author SHA1 Message Date
Riordan Panayides b89faa1ed0 Add fills service ping handling 2023-01-11 15:43:07 +00:00
Riordan Panayides c838c58ca6 Fix orderbook decimals 2023-01-11 15:41:19 +00:00
Riordan Panayides f45e614765 Update CD, dependencies, READMEs 2023-01-11 15:40:36 +00:00
9 changed files with 323 additions and 301 deletions

README.md

@@ -1,184 +1,22 @@
# Overview
# mango-geyser-services
This project streams Solana account updates for a specific program into other
databases or event queues.
Having an up-to-date version of all account data in a database is
particularly useful for queries that need access to all accounts. For example,
retrieving the addresses of Mango Markets accounts with the largest unrealized
PnL goes from "getProgramAccounts from a Solana node for 50MB of data and
compute locally (3-10s total)" to "run a SQL query (150ms total)".
The database could also be used as a backend for serving `getMultipleAccounts`
and `getProgramAccounts` queries generally. That would reduce load on Solana
RPC nodes while decreasing response times.
Supported Solana sources:
- Geyser plugin (preferred) plus JSONRPC HTTP API (for initial snapshots)
Unfinished Solana sources:
- JSONRPC websocket subscriptions plus JSONRPC HTTP API (for initial snapshots)
Supported targets:
- PostgreSQL
Mango v4 Geyser Services
# Components
- [`geyser-plugin-grpc/`](geyser-plugin-grpc/)
The Solana Geyser plugin. It opens a gRPC server (see [`proto/`](proto/)) and
broadcasts account and slot updates to all clients that connect.
- [`lib/`](lib/)
The connector abstractions that the connector service is built from.
Projects may want to use it to build their own connector service and decode
their specific account data before sending it into target systems.
- [`connector-raw/`](connector-raw/)
A connector binary built on lib/ that stores raw binary account data in
PostgreSQL.
- [`connector-mango/`](connector-mango/)
A connector binary built on lib/ that decodes Mango account types before
storing them in PostgreSQL.
Tools for building services
- [`service-mango-fills/`](service-mango-fills/)
A service providing lowest-latency, bandwidth-conserving access to fill events
A service providing lowest-latency, bandwidth-conserving access to fill events for Mango V4 Perp and Openbook markets
as they are processed by the RPC node.
# Setup Tutorial
- [`service-mango-pnl/`](service-mango-pnl/)
1. Compile the project.
A service providing pre-computed account lists ordered by unsettled PnL per market
Make sure that you are using _exactly_ the same Rust version for compiling
the Geyser plugin that was used for compiling your `solana-validator`!
Otherwise the plugin will crash the validator during startup!
- [`service-mango-orderbook/`](service-mango-orderbook/)
2. Prepare the plugin configuration file.
[Here is an example](geyser-plugin-grpc/example-config.json). This file
points the validator to your plugin shared library, controls which accounts
will be exported, which address the gRPC server will bind to and internal
queue sizes.
3. Run `solana-validator` with `--geyser-plugin-config myconfig.json`.
Check the logs to ensure the plugin was loaded.
4. Prepare the connector configuration file.
[Here is an example](connector-raw/example-config.toml).
- `rpc_ws_url` is unused and can stay empty.
- `connection_string` for your `grpc_sources` must point to the gRPC server
address configured for the plugin.
- `rpc_http_url` must point to the JSON-RPC URL.
- `connection_string` for your `postgres_target` uses
[the tokio-postgres syntax](https://docs.rs/tokio-postgres/0.7.5/tokio_postgres/config/struct.Config.html)
- `program_id` must match what is configured for the gRPC plugin
5. Prepare the PostgreSQL schema.
Use [this example script](connector-raw/scripts/create_schema.sql).
6. Start the connector service binary.
Pass the path to the config file as the first argument. It logs to stdout. It
should be restarted on exit. (It intentionally terminates when PostgreSQL is
unreachable for too long, for example.)
7. Monitor the logs
`WARN` messages can be recovered from. `ERROR` messages need attention.
Check the metrics for `account_write_queue` and `slot_update_queue`: They
should be around 0. If they keep growing, the service can't keep up and you'll
need to investigate.
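Putting the step 4 bullet points together, a connector config might look roughly like this. This is an illustrative sketch only: the section layout, addresses, and defaults here are assumptions, and the linked example config is authoritative.

```
# Hypothetical layout; only the key names from step 4 come from this README.
rpc_ws_url = ""                          # unused, may stay empty
rpc_http_url = "http://127.0.0.1:8899"   # JSON-RPC URL for initial snapshots
program_id = "..."                       # must match the gRPC plugin config

[[grpc_sources]]
connection_string = "http://127.0.0.1:10000"   # the plugin's gRPC bind address

[postgres_target]
connection_string = "host=localhost user=connector dbname=connector"   # tokio-postgres syntax
```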
# Design and Reliability
```
Solana  --------------->  Connector  ----------->  PostgreSQL
nodes     jsonrpc/gRPC      nodes
```
For reliability it is recommended to feed data from multiple Solana nodes into
each Connector node.
It is also allowed to run multiple Connector nodes that target the same
PostgreSQL target database.
The Connector service is stateless (except for some caches). Restarting it is
always safe.
If the Solana node is down, the Connector service attempts to reconnect and then
requests a new data snapshot if necessary.
If PostgreSQL is down temporarily, the Connector service caches updates and
applies them when the database is back up.
If PostgreSQL is down for a longer time, the Connector service exits with an
error. On restart, it pauses until PostgreSQL is back up, and then starts
pulling data from the Solana nodes again.
# PostgreSQL data layout
See `scripts/` for SQL that creates the target schema.
The Connector streams data into the `account_write` and `slot` tables. When
slots become "rooted", older `account_write` data for rooted slots is deleted. That
way the current account data for the latest rooted, confirmed or processed slot
can be queried, but older data is forgotten.
When new slots arrive, the `uncle` column is updated for "processed" and
"confirmed" slots to allow easy filtering of slots that are no longer part of
the chain.
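Based on the columns used elsewhere in this section, the three tables can be sketched roughly like this. The types and extra columns are assumptions; the authoritative DDL is the `create_schema.sql` script.

```
-- Rough sketch only; see scripts/create_schema.sql for the real schema.
CREATE TABLE pubkey (
    pubkey_id  BIGSERIAL PRIMARY KEY,
    pubkey     VARCHAR(44) NOT NULL UNIQUE      -- base58 account address
);

CREATE TABLE slot (
    slot    BIGINT PRIMARY KEY,
    status  TEXT NOT NULL,                      -- 'Processed' | 'Confirmed' | 'Rooted'
    uncle   BOOL NOT NULL                       -- no longer part of the chain
);

CREATE TABLE account_write (
    pubkey_id      BIGINT NOT NULL REFERENCES pubkey,
    slot           BIGINT NOT NULL,             -- no FK: a write can arrive before its slot status
    write_version  BIGINT NOT NULL,
    data           BYTEA,                       -- raw account data
    PRIMARY KEY (pubkey_id, slot, write_version)
);
```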
Example for querying confirmed data:
```
SELECT DISTINCT ON(pubkey_id)
pubkey, account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey USING(pubkey_id)
WHERE status = 'Rooted' OR status IS NULL OR (uncle = FALSE AND status = 'Confirmed')
ORDER BY pubkey_id, slot DESC, write_version DESC;
```
For each pubkey, this gets the latest (most recent slot, most recent
write_version) account data; limited to slots that are either rooted or
(confirmed and not an uncle).
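The same filter also works for a point lookup on a single account; for example (the address here is just a placeholder):

```
SELECT pubkey, account_write.*
FROM account_write
LEFT JOIN slot USING(slot)
INNER JOIN pubkey USING(pubkey_id)
WHERE pubkey = 'So11111111111111111111111111111111111111112'
  AND (status = 'Rooted' OR status IS NULL OR (uncle = FALSE AND status = 'Confirmed'))
ORDER BY slot DESC, write_version DESC
LIMIT 1;
```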
# Fill Service Setup
1. Prepare the connector configuration file.
[Here is an example](service-mango-fills/example-config.toml).
- `bind_ws_addr` is the listen port for the websocket clients
- `rpc_ws_url` is unused and can stay empty.
- `connection_string` for your `grpc_sources` must point to the gRPC server
address configured for the plugin.
- `rpc_http_url` must point to the JSON-RPC URL.
- `program_id` must match what is configured for the gRPC plugin
- `markets` needs to contain all observed perp markets
2. Start the service binary.
Pass the path to the config file as the first argument. It logs to stdout. It
should be restarted on exit.
3. Monitor the logs
`WARN` messages can be recovered from. `ERROR` messages need attention. The
logs are very spammy; changing the default log level is recommended unless you
want to analyze the service's performance.
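For reference, the step 1 options might be collected into a config like this. This is a hypothetical sketch; the field layout, especially for `markets`, is an assumption, and the linked example config is authoritative.

```
# Hypothetical layout; only the key names from step 1 come from this README.
bind_ws_addr = "0.0.0.0:8080"            # websocket listen address
rpc_ws_url = ""                          # unused, may stay empty
rpc_http_url = "http://127.0.0.1:8899"
program_id = "..."                       # must match the gRPC plugin config

[[grpc_sources]]
connection_string = "http://127.0.0.1:10000"

# one entry per observed perp market; field names here are assumed
[[markets]]
name = "MNGO-PERP"
event_queue = "..."
```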
A service providing Orderbook L2 state and delta updates for Mango V4 Perp and Openbook Spot markets

cd/pnl.toml (new file)

@@ -0,0 +1,23 @@
app = "mango-pnl"
kill_signal = "SIGINT"
kill_timeout = 5
[build]
dockerfile = "../Dockerfile"
[experimental]
cmd = ["service-mango-pnl", "pnl-config.toml"]
[[services]]
internal_port = 8081
processes = ["app"]
protocol = "tcp"
[services.concurrency]
hard_limit = 1024
soft_limit = 1024
type = "connections"
[metrics]
path = "/metrics"
port = 9091

View File

@@ -3,12 +3,8 @@ app = "mango-geyser-services"
kill_signal = "SIGINT"
kill_timeout = 5
# [build]
# image = "us-docker.pkg.dev/mango-markets/gcr.io/mango-geyser-services:latest"
[processes]
fills = "service-mango-fills fills-config.toml"
pnl = "service-mango-pnl pnl-config.toml"
orderbook = "service-mango-orderbook orderbook-config.toml"
[[services]]
@@ -25,20 +21,6 @@ kill_timeout = 5
hard_limit = 1024
soft_limit = 1024
[[services]]
processes = ["pnl"]
internal_port = 8081
protocol = "tcp"
[[services.ports]]
handlers = ["tls", "http"]
port = 8081
[services.concurrency]
type = "connections"
hard_limit = 1024
soft_limit = 1024
[[services]]
processes = ["orderbook"]
internal_port = 8082

View File

@@ -15,12 +15,16 @@ use solana_sdk::{
use std::{
borrow::BorrowMut,
collections::{HashMap, HashSet},
time::{SystemTime, UNIX_EPOCH}, mem::size_of,
mem::size_of,
time::{SystemTime, UNIX_EPOCH},
};
use crate::metrics::MetricU64;
use anchor_lang::AccountDeserialize;
use mango_v4::{state::{BookSide, OrderTreeType}, serum3_cpi::OrderBookStateHeader};
use mango_v4::{
serum3_cpi::OrderBookStateHeader,
state::{BookSide, OrderTreeType},
};
#[derive(Clone, Debug)]
pub enum OrderbookSide {
@@ -40,24 +44,7 @@ impl Serialize for OrderbookSide {
}
}
#[derive(Clone, Debug)]
pub struct OrderbookLevel {
pub price: f64,
pub size: f64,
}
impl Serialize for OrderbookLevel {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("OrderbookLevel", 2)?;
state.serialize_field("price", &self.price)?;
state.serialize_field("size", &self.size)?;
state.end()
}
}
pub type OrderbookLevel = [f64; 2];
#[derive(Clone, Debug)]
pub struct OrderbookUpdate {
@@ -121,12 +108,57 @@ pub struct MarketConfig {
pub asks: Pubkey,
pub base_decimals: u8,
pub quote_decimals: u8,
pub base_lot_size: i64,
pub quote_lot_size: i64,
}
pub fn native_to_ui(native: i64, decimals: u8) -> f64 {
native as f64 / (10u64.pow(decimals.into())) as f64
pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
let decimals: u32 = 3;
let res = native as f64 / (10i64.pow(decimals.into()) as f64);
//info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
res
}
pub fn base_lots_to_ui_perp(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
let decimals: u32 = 4;
let res = native as f64 / (10i64.pow(decimals.into()) as f64);
//info!("res {} native {} base_d {} base ls {}", res, native, base_decimals, base_lot_size);
res
}
pub fn price_lots_to_ui(
native: i64,
base_decimals: u8,
quote_decimals: u8,
) -> f64 {
let decimals = base_decimals - quote_decimals;
// let res = native as f64
// * ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
// as f64;
let res = native as f64
/ (10u64.pow(decimals.into()))
as f64;
res
}
pub fn price_lots_to_ui_perp(
native: i64,
base_decimals: u8,
quote_decimals: u8,
base_lot_size: i64,
quote_lot_size: i64,
) -> f64 {
let decimals = base_decimals - quote_decimals;
let res = native as f64
* ((10u64.pow(decimals.into()) * quote_lot_size as u64) as f64 / base_lot_size as f64)
as f64;
// let res = native as f64
// / (10u64.pow(decimals.into()))
// as f64;
res
}
fn publish_changes(
slot: u64,
write_version: u64,
@@ -143,14 +175,12 @@ fn publish_changes(
for previous_order in previous_bookside.iter() {
let peer = current_bookside
.iter()
.find(|level| previous_order.price == level.price);
.find(|level| previous_order[0] == level[0]);
match peer {
None => {
update.push(OrderbookLevel {
price: previous_order.price,
size: 0f64,
});
info!("removed level {}", previous_order[0]);
update.push([previous_order[0], 0f64]);
}
_ => continue,
}
@@ -160,21 +190,18 @@ fn publish_changes(
for current_order in current_bookside {
let peer = previous_bookside
.iter()
.find(|item| item.price == current_order.price);
.find(|item| item[0] == current_order[0]);
match peer {
Some(previous_order) => {
if previous_order.size == current_order.size {
if previous_order[1] == current_order[1] {
continue;
}
debug!(
"size changed {} -> {}",
previous_order.size, current_order.size
);
info!("size changed {} -> {}", previous_order[1], current_order[1]);
update.push(current_order.clone());
}
None => {
debug!("new level {},{}", current_order.price, current_order.size);
info!("new level {},{}", current_order[0], current_order[1]);
update.push(current_order.clone())
}
}
@@ -232,14 +259,12 @@ fn publish_changes_serum(
for previous_order in previous_bookside.iter() {
let peer = current_bookside
.iter()
.find(|level| previous_order.price == level.price);
.find(|level| previous_order[0] == level[0]);
match peer {
None => {
update.push(OrderbookLevel {
price: previous_order.price,
size: 0f64,
});
info!("removed level s {}", previous_order[0]);
update.push([previous_order[0], 0f64]);
}
_ => continue,
}
@@ -249,21 +274,18 @@ fn publish_changes_serum(
for current_order in current_bookside {
let peer = previous_bookside
.iter()
.find(|item| item.price == current_order.price);
.find(|item| item[0] == current_order[0]);
match peer {
Some(previous_order) => {
if previous_order.size == current_order.size {
if previous_order[1] == current_order[1] {
continue;
}
debug!(
"size changed {} -> {}",
previous_order.size, current_order.size
);
info!("size changed {} -> {}", previous_order[1], current_order[1]);
update.push(current_order.clone());
}
None => {
debug!("new level {},{}", current_order.price, current_order.size);
info!("new level {},{}", current_order[0], current_order[1]);
update.push(current_order.clone())
}
}
@@ -412,14 +434,26 @@ pub async fn init(
.map(|item| (item.node.price_data() as i64, item.node.quantity))
.group_by(|(price, _)| *price)
.into_iter()
.map(|(price, group)| OrderbookLevel {
price: native_to_ui(price, mkt.1.quote_decimals),
size: native_to_ui(group
.map(|(_, quantity)| quantity)
.fold(0, |acc, x| acc + x), mkt.1.base_decimals),
.map(|(price, group)| {
[
price_lots_to_ui_perp(
price,
mkt.1.base_decimals,
mkt.1.quote_decimals,
mkt.1.base_lot_size,
mkt.1.quote_lot_size,
),
base_lots_to_ui_perp(
group
.map(|(_, quantity)| quantity)
.fold(0, |acc, x| acc + x),
mkt.1.base_decimals,
mkt.1.base_lot_size,
),
]
})
.collect();
let other_bookside = bookside_cache.get(&other_side_pk.to_string());
match bookside_cache.get(&side_pk_string) {
@@ -471,14 +505,26 @@ pub async fn init(
let bookside: Vec<OrderbookLevel> = slab
.iter(side == 0)
.map(|item| (u64::from(item.price()) as i64, item.quantity() as i64))
.map(|item| {
(u64::from(item.price()) as i64, item.quantity() as i64)
})
.group_by(|(price, _)| *price)
.into_iter()
.map(|(price, group)| OrderbookLevel {
price: native_to_ui(price, mkt.1.quote_decimals),
size: native_to_ui(group
.map(|(_, quantity)| quantity)
.fold(0, |acc, x| acc + x), mkt.1.base_decimals),
.map(|(price, group)| {
[
price_lots_to_ui(
price,
mkt.1.base_decimals,
mkt.1.quote_decimals,
),
base_lots_to_ui(
group
.map(|(_, quantity)| quantity)
.fold(0, |acc, x| acc + x),
mkt.1.base_decimals,
mkt.1.base_lot_size,
),
]
})
.collect();
@@ -504,10 +550,7 @@ pub async fn init(
_ => info!("bookside_cache could not find {}", side_pk_string),
}
serum_bookside_cache.insert(
side_pk_string.clone(),
bookside,
);
serum_bookside_cache.insert(side_pk_string.clone(), bookside);
}
Err(_) => info!("chain_cache could not find {}", side_pk),
}

View File

@@ -24,8 +24,8 @@ tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
bytemuck = "1.7.2"
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "ckamm/accountfetcher-send" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "ckamm/accountfetcher-send" }
mango-v4 = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "https://github.com/blockworks-foundation/mango-v4", branch = "dev" }
serum_dex = { git = "https://github.com/jup-ag/openbook-program", branch = "feat/expose-things" }
anchor-lang = "0.25.0"
anchor-client = "0.25.0"

View File

@@ -1,13 +1,35 @@
connector-fills
# service-mango-fills
This module parses event queues and exposes individual fills on a websocket.
TODO:
## Setup
- [ ] early filter out all account writes we don't care about
- [ ] startup logic, don't accept websockets before first snapshot
1. Prepare the connector configuration file.
[Here is an example](service-mango-fills/example-config.toml).
- `bind_ws_addr` is the listen port for the websocket clients
- `rpc_ws_url` is unused and can stay empty.
- `connection_string` for your `grpc_sources` must point to the gRPC server
address configured for the plugin.
- `rpc_http_url` must point to the JSON-RPC URL.
- `program_id` must match what is configured for the gRPC plugin
- `markets` needs to contain all observed perp markets
2. Start the service binary.
Pass the path to the config file as the first argument. It logs to stdout. It
should be restarted on exit.
3. Monitor the logs
`WARN` messages can be recovered from. `ERROR` messages need attention. The
logs are very spammy; changing the default log level is recommended unless you
want to analyze the service's performance.
## TODO
- [ ] startup logic, don't accept market subscriptions before first snapshot
- [ ] failover logic, kill all websockets when we receive a later snapshot; more
frequent when running on home connections
- [ ] track websocket connect / disconnect
- [ ] track latency accountwrite -> websocket
- [ ] track queue length
- [ ] create new model for fills so snapshot maps can be combined per market
- [] create new model for fills so snapshot maps can be combined per market

View File

@@ -1,19 +1,39 @@
use anchor_client::{Cluster, solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair, account::Account}};
use anchor_client::{
solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair},
Cluster,
};
use anchor_lang::prelude::Pubkey;
use bytemuck::cast_slice;
use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{pin_mut, SinkExt, StreamExt, future::{self, Ready}, TryStreamExt};
use futures_util::{
future::{self, Ready},
pin_mut, SinkExt, StreamExt, TryStreamExt,
};
use log::*;
use std::{collections::{HashMap, HashSet}, fs::File, io::Read, net::SocketAddr, sync::Arc, sync::Mutex, time::Duration, convert::identity, str::FromStr};
use std::{
collections::{HashMap, HashSet},
convert::identity,
fs::File,
io::Read,
net::SocketAddr,
str::FromStr,
sync::Arc,
sync::Mutex,
time::Duration,
};
use tokio::{
net::{TcpListener, TcpStream},
pin,
pin, time,
};
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::Deserialize;
use solana_geyser_connector_lib::{metrics::{MetricType, MetricU64}, FilterConfig, fill_event_filter::SerumFillCheckpoint, StatusResponse};
use solana_geyser_connector_lib::{
fill_event_filter::SerumFillCheckpoint,
metrics::{MetricType, MetricU64},
FilterConfig, StatusResponse,
};
use solana_geyser_connector_lib::{
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
@@ -64,7 +84,15 @@ async fn handle_connection_error(
) {
metrics_opened_connections.clone().increment();
let result = handle_connection(checkpoint_map, serum_checkpoint_map, peer_map.clone(), market_ids, raw_stream, addr).await;
let result = handle_connection(
checkpoint_map,
serum_checkpoint_map,
peer_map.clone(),
market_ids,
raw_stream,
addr,
)
.await;
if result.is_err() {
error!("connection {} error {}", addr, result.unwrap_err());
};
@@ -84,6 +112,7 @@ async fn handle_connection(
) -> Result<(), Error> {
info!("ws connected: {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
let (ws_tx, ws_rx) = ws_stream.split();
// 1: publish channel in peer map
@@ -99,14 +128,26 @@ async fn handle_connection(
}
let receive_commands = ws_rx.try_for_each(|msg| {
handle_commands(
addr,
msg,
peer_map.clone(),
checkpoint_map.clone(),
serum_checkpoint_map.clone(),
market_ids.clone(),
)
match msg {
Message::Text(_) => {
handle_commands(
addr,
msg,
peer_map.clone(),
checkpoint_map.clone(),
serum_checkpoint_map.clone(),
market_ids.clone(),
)
},
Message::Ping(_) => {
let peers = peer_map.clone();
let mut peers_lock = peers.lock().unwrap();
let peer = peers_lock.get_mut(&addr).expect("peer should be in map");
peer.sender.unbounded_send(Message::Pong(Vec::new())).unwrap();
future::ready(Ok(()))
}
_ => future::ready(Ok(())),
}
});
let forward_updates = chan_rx.map(Ok).forward(ws_tx);
@@ -164,7 +205,6 @@ fn handle_commands(
.unwrap();
if subscribed {
// todo: this is janky af
let checkpoint_map = checkpoint_map.lock().unwrap();
let serum_checkpoint_map = serum_checkpoint_map.lock().unwrap();
let checkpoint = checkpoint_map.get(&market_id);
@@ -184,8 +224,8 @@ fn handle_commands(
))
.unwrap();
}
None => info!("no checkpoint available on client subscription"), // todo: what to do here?
}
None => info!("no checkpoint available on client subscription"),
},
}
}
}
@@ -274,10 +314,13 @@ async fn main() -> anyhow::Result<()> {
&Keypair::new(),
Some(rpc_timeout),
);
let group_context = Arc::new(MangoGroupContext::new_from_rpc(
&client.rpc_async(),
Pubkey::from_str(&config.mango_group).unwrap(),
).await?);
let group_context = Arc::new(
MangoGroupContext::new_from_rpc(
&client.rpc_async(),
Pubkey::from_str(&config.mango_group).unwrap(),
)
.await?,
);
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
.perp_markets
@@ -293,7 +336,8 @@ async fn main() -> anyhow::Result<()> {
let serum_market_ais = client
.rpc_async()
.get_multiple_accounts(serum_market_pks.as_slice()).await?;
.get_multiple_accounts(serum_market_pks.as_slice())
.await?;
let serum_market_ais: Vec<&Account> = serum_market_ais
.iter()
.filter_map(|maybe_ai| match maybe_ai {
@@ -309,25 +353,41 @@ async fn main() -> anyhow::Result<()> {
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
);
(serum_market_pks[pair.0], Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])))
(
serum_market_pks[pair.0],
Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])),
)
})
.collect();
let a: Vec<(String, String)> = group_context
.serum3_markets
.iter()
.map(|(_, context)| (context.market.serum_market_external.to_string(), context.market.name().to_owned())).collect();
.map(|(_, context)| {
(
context.market.serum_market_external.to_string(),
context.market.name().to_owned(),
)
})
.collect();
let b: Vec<(String, String)> = group_context
.perp_markets
.iter()
.map(|(_, context)| (context.address.to_string(), context.market.name().to_owned())).collect();
let market_pubkey_strings: HashMap<String, String> = [a, b]
.concat()
.into_iter()
.map(|(_, context)| {
(
context.address.to_string(),
context.market.name().to_owned(),
)
})
.collect();
let market_pubkey_strings: HashMap<String, String> = [a, b].concat().into_iter().collect();
let (account_write_queue_sender, slot_queue_sender, fill_receiver) =
fill_event_filter::init(perp_queue_pks.clone(), serum_queue_pks.clone(), metrics_tx.clone()).await?;
let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
perp_queue_pks.clone(),
serum_queue_pks.clone(),
metrics_tx.clone(),
)
.await?;
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new()));
@@ -336,6 +396,7 @@ async fn main() -> anyhow::Result<()> {
let checkpoints_ref_thread = checkpoints.clone();
let serum_checkpoints_ref_thread = serum_checkpoints.clone();
let peers_ref_thread = peers.clone();
let peers_ref_thread1 = peers.clone();
// filleventfilter websocket sink
tokio::spawn(async move {
@@ -348,15 +409,12 @@ async fn main() -> anyhow::Result<()> {
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
for (addr, peer) in peer_copy.iter_mut() {
let json = serde_json::to_string(&update).unwrap();
// only send updates if the peer is subscribed
if peer.subscriptions.contains(&update.market) {
let result = peer.sender.send(Message::Text(json)).await;
if result.is_err() {
error!(
"ws update {} fill could not reach {}",
update.market, addr
);
error!("ws update {} fill could not reach {}", update.market, addr);
}
}
}
@@ -369,18 +427,15 @@ async fn main() -> anyhow::Result<()> {
}
FillEventFilterMessage::SerumUpdate(update) => {
debug!("ws update {} {:?} serum fill", update.market, update.status);
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
for (addr, peer) in peer_copy.iter_mut() {
let mut peers_copy = peers_ref_thread.lock().unwrap().clone();
for (addr, peer) in peers_copy.iter_mut() {
let json = serde_json::to_string(&update).unwrap();
// only send updates if the peer is subscribed
if peer.subscriptions.contains(&update.market) {
let result = peer.sender.send(Message::Text(json)).await;
if result.is_err() {
error!(
"ws update {} fill could not reach {}",
update.market, addr
);
error!("ws update {} fill could not reach {}", update.market, addr);
}
}
}
@@ -398,6 +453,7 @@ async fn main() -> anyhow::Result<()> {
info!("ws listen: {}", config.bind_ws_addr);
let try_socket = TcpListener::bind(&config.bind_ws_addr).await;
let listener = try_socket.expect("Failed to bind");
{
tokio::spawn(async move {
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
@@ -413,7 +469,26 @@ async fn main() -> anyhow::Result<()> {
));
}
});
}
// keepalive
{
tokio::spawn(async move {
let mut write_interval = time::interval(time::Duration::from_secs(30));
loop {
write_interval.tick().await;
let peers_copy = peers_ref_thread1.lock().unwrap().clone();
for (addr, peer) in peers_copy.iter() {
let pl = Vec::new();
let result = peer.clone().sender.send(Message::Ping(pl)).await;
if result.is_err() {
error!("ws ping could not reach {}", addr);
}
}
}
});
}
info!(
"rpc connect: {}",
config

View File

@@ -0,0 +1,35 @@
# service-mango-orderbook
This module parses bookside accounts and exposes L2 orderbook state and delta updates on a websocket.
## Setup
1. Prepare the connector configuration file.
[Here is an example](service-mango-fills/example-config.toml).
- `bind_ws_addr` is the listen port for the websocket clients
- `rpc_ws_url` is unused and can stay empty.
- `connection_string` for your `grpc_sources` must point to the gRPC server
address configured for the plugin.
- `rpc_http_url` must point to the JSON-RPC URL.
- `program_id` must match what is configured for the gRPC plugin
- `markets` needs to contain all observed perp markets
2. Start the service binary.
Pass the path to the config file as the first argument. It logs to stdout. It
should be restarted on exit.
3. Monitor the logs
`WARN` messages can be recovered from. `ERROR` messages need attention. The
logs are very spammy; changing the default log level is recommended unless you
want to analyze the service's performance.
## TODO
- [ ] startup logic, don't accept market subscriptions before first snapshot
- [ ] failover logic, kill all websockets when we receive a later snapshot; more
frequent when running on home connections
- [ ] track latency accountwrite -> websocket
- [ ] create new model for fills so snapshot maps can be combined per market

View File

@@ -307,6 +307,8 @@ async fn main() -> anyhow::Result<()> {
asks: context.market.asks,
base_decimals: context.market.base_decimals,
quote_decimals,
base_lot_size: context.market.base_lot_size,
quote_lot_size: context.market.quote_lot_size,
},
)
})
@@ -325,13 +327,15 @@ async fn main() -> anyhow::Result<()> {
None => panic!("token not found for market") // todo: default to 6 for usdc?
};
(
context.address,
context.market.serum_market_external,
MarketConfig {
name: context.market.name().to_owned(),
bids: context.bids,
asks: context.asks,
base_decimals,
quote_decimals,
base_lot_size: context.pc_lot_size as i64,
quote_lot_size: context.coin_lot_size as i64,
},
)
})