Orderbook Feed Fixes (#3)

- Use GHCR for publishing the Docker image
- Orderbook Feed Fixes:
  - Fix serum prices
  - Read rpc url from env
  - Add keepalives
  - Add exit signal
  - Enable serum markets
  - Reduce info logging
-Refactor TS client library and add Orderbook feed
This commit is contained in:
riordanp 2023-04-25 16:16:20 +01:00 committed by GitHub
parent bc78b86cec
commit bbf6927159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 378 additions and 155 deletions

View File

@ -1 +1,3 @@
target
target
node_modules
dist

View File

@ -1,18 +1,13 @@
name: Publish Docker Image to GCR
name: Publish Docker Image
on:
push:
branches: [main, master, production]
workflow_call:
secrets:
GCR_PROJECT:
required: false
GCR_SA_KEY:
required: false
branches: [main]
env:
PROJECT_ID: ${{ secrets.GCR_PROJECT }}
IMAGE: mango-geyser-services
IMAGE: mango-feeds
ORG: blockworks-foundation
REGISTRY: ghcr.io
jobs:
build:
@ -23,25 +18,21 @@ jobs:
with:
submodules: recursive
- name: Set up Docker Buildx
# Use docker buildx
- name: Use docker buildx
uses: docker/setup-buildx-action@v2
id: buildx
uses: docker/setup-buildx-action@master
# Login to Google Cloud
- name: 'Login to Google Cloud'
uses: 'google-github-actions/auth@v0'
id: auth
with:
token_format: 'access_token'
credentials_json: '${{ secrets.GCR_SA_KEY }}'
install: true
buildkitd-flags: --debug
# Login to GCR
- name: Login to GCR
# Login to Registry
- name: Login to Registry
uses: docker/login-action@v2
with:
registry: us-docker.pkg.dev
username: oauth2accesstoken
password: ${{ steps.auth.outputs.access_token }}
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# Build and push the image
- name: Build and Push Image
@ -50,7 +41,7 @@ jobs:
context: .
push: true
tags: |
us-docker.pkg.dev/${{ env.PROJECT_ID }}/gcr.io/${{ env.IMAGE }}:${{ github.sha }}
us-docker.pkg.dev/${{ env.PROJECT_ID }}/gcr.io/${{ env.IMAGE }}:latest
${{ env.REGISTRY }}/${{ env.ORG }}/${{ env.IMAGE }}:${{ github.sha }}
${{ env.REGISTRY }}/${{ env.ORG }}/${{ env.IMAGE }}:latest
cache-from: type=gha
cache-to: type=gha,mode=max

View File

@ -2,7 +2,7 @@ name: Deploy to Fly
on:
workflow_run:
workflows: ["Publish Docker Image to GCR"]
workflows: ["Publish Docker Image"]
branches: [production]
types:
- completed

View File

@ -9,7 +9,7 @@ kill_timeout = 5
cmd = ["service-mango-orderbook", "orderbook-config.toml"]
[[services]]
internal_port = 8082
internal_port = 8080
processes = ["app"]
protocol = "tcp"

View File

@ -94,7 +94,13 @@ pub struct MarketConfig {
pub quote_lot_size: i64,
}
pub fn base_lots_to_ui(native: i64, base_decimals: u8, base_lot_size: i64) -> f64 {
pub fn base_lots_to_ui(
native: i64,
base_decimals: u8,
_quote_decimals: u8,
base_lot_size: i64,
_quote_lot_size: i64,
) -> f64 {
(native * base_lot_size) as f64 / 10i64.pow(base_decimals.into()) as f64
}
@ -102,9 +108,20 @@ pub fn base_lots_to_ui_perp(native: i64, decimals: u8, base_lot_size: i64) -> f6
native as f64 * (base_lot_size as f64 / (10i64.pow(decimals.into()) as f64))
}
pub fn price_lots_to_ui(native: i64, base_decimals: u8, quote_decimals: u8) -> f64 {
let decimals = base_decimals - quote_decimals;
native as f64 / (10u64.pow(decimals.into())) as f64
pub fn price_lots_to_ui(
native: i64,
base_decimals: u8,
quote_decimals: u8,
base_lot_size: i64,
quote_lot_size: i64,
) -> f64 {
let base_multiplier = 10i64.pow(base_decimals.into());
let quote_multiplier = 10i64.pow(quote_decimals.into());
let left: u128 = native as u128 * quote_lot_size as u128 * base_multiplier as u128;
let right: u128 = base_lot_size as u128 * quote_multiplier as u128;
left as f64 / right as f64
}
pub fn spot_price_to_ui(

View File

@ -22,6 +22,7 @@ use mango_v4_client::{Client, MangoGroupContext, TransactionBuilderConfig};
use service_mango_fills::{Command, FillCheckpoint, FillEventFilterMessage, FillEventType};
use std::{
collections::{HashMap, HashSet},
env,
fs::File,
io::Read,
net::SocketAddr,
@ -353,7 +354,10 @@ async fn main() -> anyhow::Result<()> {
let metrics_closed_connections =
metrics_tx.register_u64("fills_feed_closed_connections".into(), MetricType::Counter);
let rpc_url = config.rpc_http_url;
let rpc_url = match &config.rpc_http_url.chars().next().unwrap() {
'$' => env::var(&config.rpc_http_url[1..]).expect("reading rpc http url from env"),
_ => config.rpc_http_url.clone(),
};
let ws_url = rpc_url.replace("https", "wss");
let rpc_timeout = Duration::from_secs(10);
let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone());

View File

@ -24,7 +24,7 @@ use std::{
};
use tokio::{
net::{TcpListener, TcpStream},
pin,
pin, time,
};
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
@ -131,14 +131,24 @@ async fn handle_connection(
);
}
let receive_commands = ws_rx.try_for_each(|msg| {
handle_commands(
let receive_commands = ws_rx.try_for_each(|msg| match msg {
Message::Text(_) => handle_commands(
addr,
msg,
peer_map.clone(),
checkpoint_map.clone(),
market_ids.clone(),
)
),
Message::Ping(_) => {
let peers = peer_map.clone();
let mut peers_lock = peers.lock().unwrap();
let peer = peers_lock.get_mut(&addr).expect("peer should be in map");
peer.sender
.unbounded_send(Message::Pong(Vec::new()))
.unwrap();
future::ready(Ok(()))
}
_ => future::ready(Ok(())),
});
let forward_updates = chan_rx.map(Ok).forward(ws_tx);
@ -344,8 +354,8 @@ async fn main() -> anyhow::Result<()> {
event_queue: context.event_q,
base_decimals,
quote_decimals,
base_lot_size: context.pc_lot_size as i64,
quote_lot_size: context.coin_lot_size as i64,
base_lot_size: context.coin_lot_size as i64,
quote_lot_size: context.pc_lot_size as i64,
},
)
})
@ -370,6 +380,7 @@ async fn main() -> anyhow::Result<()> {
let checkpoints_ref_thread = checkpoints.clone();
let peers_ref_thread = peers.clone();
let peers_ref_thread1 = peers.clone();
tokio::spawn(async move {
pin!(orderbook_receiver);
@ -422,6 +433,35 @@ async fn main() -> anyhow::Result<()> {
}
});
// keepalive
{
tokio::spawn(async move {
let mut write_interval = time::interval(time::Duration::from_secs(30));
loop {
write_interval.tick().await;
let peers_copy = peers_ref_thread1.lock().unwrap().clone();
for (addr, peer) in peers_copy.iter() {
let pl = Vec::new();
let result = peer.clone().sender.send(Message::Ping(pl)).await;
if result.is_err() {
error!("ws ping could not reach {}", addr);
}
}
}
});
}
// // handle sigint
// {
// let exit = exit.clone();
// tokio::spawn(async move {
// tokio::signal::ctrl_c().await.unwrap();
// info!("Received SIGINT, shutting down...");
// exit.store(true, Ordering::Relaxed);
// });
// }
info!(
"rpc connect: {}",
config
@ -432,7 +472,7 @@ async fn main() -> anyhow::Result<()> {
.collect::<String>()
);
let relevant_pubkeys = [market_configs.clone()]
let relevant_pubkeys = [market_configs.clone(), serum_market_configs.clone()]
.concat()
.iter()
.flat_map(|m| [m.1.bids.to_string(), m.1.asks.to_string()])

View File

@ -46,7 +46,7 @@ fn publish_changes(
let mut update: Vec<OrderbookLevel> = vec![];
// push diff for levels that are no longer present
if current_bookside.len() != previous_bookside.len() {
info!(
debug!(
"L {}",
current_bookside.len() as i64 - previous_bookside.len() as i64
)
@ -59,7 +59,7 @@ fn publish_changes(
match peer {
None => {
info!("R {} {}", previous_order[0], previous_order[1]);
debug!("R {} {}", previous_order[0], previous_order[1]);
update.push([previous_order[0], 0f64]);
}
_ => continue,
@ -77,14 +77,14 @@ fn publish_changes(
if previous_order[1] == current_order[1] {
continue;
}
info!(
debug!(
"C {} {} -> {}",
current_order[0], previous_order[1], current_order[1]
);
update.push(*current_order);
}
None => {
info!("A {} {}", current_order[0], current_order[1]);
debug!("A {} {}", current_order[0], current_order[1]);
update.push(*current_order)
}
}
@ -323,11 +323,15 @@ pub async fn init(
price,
mkt.1.base_decimals,
mkt.1.quote_decimals,
mkt.1.base_lot_size,
mkt.1.quote_lot_size,
),
base_lots_to_ui(
group.map(|(_, quantity)| quantity).sum(),
mkt.1.base_decimals,
mkt.1.quote_decimals,
mkt.1.base_lot_size,
mkt.1.quote_lot_size,
),
]
})

View File

@ -4,7 +4,7 @@ const RECONNECT_INTERVAL_MS = 1000;
const RECONNECT_ATTEMPTS_MAX = -1;
// Subscribe on connection
const fillsFeed = new FillsFeed('ws://localhost:8080', {
const fillsFeed = new FillsFeed('wss://api.mngo.cloud/fills/v1/', {
reconnectionIntervalMs: RECONNECT_INTERVAL_MS,
reconnectionMaxAttempts: RECONNECT_ATTEMPTS_MAX,
subscriptions: {

View File

@ -0,0 +1,35 @@
import { OrderbookFeed } from '../src';
const RECONNECT_INTERVAL_MS = 1000;
const RECONNECT_ATTEMPTS_MAX = -1;
// Subscribe on connection
const orderbookFeed = new OrderbookFeed('wss://api.mngo.cloud/orderbook/v1/', {
reconnectionIntervalMs: RECONNECT_INTERVAL_MS,
reconnectionMaxAttempts: RECONNECT_ATTEMPTS_MAX,
subscriptions: {
marketId: '9XJt2tvSZghsMAhWto1VuPBrwXsiimPtsTR8XwGgDxK2',
},
});
// Subscribe after connection
orderbookFeed.onConnect(() => {
console.log('connected');
orderbookFeed.subscribe({
marketId: 'ESdnpnNLgTkBCZRuTJkZLi5wKEZ2z47SG3PJrhundSQ2',
});
});
orderbookFeed.onDisconnect(() => {
console.log(`disconnected, reconnecting in ${RECONNECT_INTERVAL_MS}...`);
});
orderbookFeed.onL2Update((update) => {
console.log('update', update)
})
orderbookFeed.onL2Checkpoint((checkpoint) => {
console.log('checkpoint', checkpoint)
})
orderbookFeed.onStatus((update) => {
console.log('status', update)
})

View File

@ -1,6 +1,4 @@
import ws from 'ws';
const WebSocket = global.WebSocket || ws;
import { ReconnectingWebsocketFeed } from './util';
interface FillsFeedOptions {
subscriptions?: FillsFeedSubscribeParams;
@ -57,99 +55,35 @@ function isHeadUpdate(obj: any): obj is HeadUpdate {
return obj.head !== undefined;
}
interface StatusMessage {
success: boolean;
message: string;
}
function isStatusMessage(obj: any): obj is StatusMessage {
return obj.success !== undefined;
}
export class FillsFeed {
private _url: string;
private _socket: WebSocket;
export class FillsFeed extends ReconnectingWebsocketFeed {
private _subscriptions?: FillsFeedSubscribeParams;
private _connected: boolean;
private _reconnectionIntervalMs;
private _reconnectionAttempts;
private _reconnectionMaxAttempts;
private _onConnect: (() => void) | null = null;
private _onDisconnect:
| ((reconnectionAttemptsExhausted: boolean) => void)
| null = null;
private _onFill: ((update: FillEventUpdate) => void) | null = null;
private _onHead: ((update: HeadUpdate) => void) | null = null;
private _onStatus: ((update: StatusMessage) => void) | null = null;
constructor(url: string, options?: FillsFeedOptions) {
this._url = url;
this._subscriptions = options?.subscriptions;
this._reconnectionIntervalMs = options?.reconnectionIntervalMs ?? 5000;
this._reconnectionAttempts = 0;
this._reconnectionMaxAttempts = options?.reconnectionMaxAttempts ?? -1;
this._connect();
}
private _reconnectionAttemptsExhausted(): boolean {
return (
this._reconnectionMaxAttempts != -1 &&
this._reconnectionAttempts >= this._reconnectionMaxAttempts
super(
url,
options?.reconnectionIntervalMs,
options?.reconnectionMaxAttempts,
);
}
this._subscriptions = options?.subscriptions;
private _connect() {
this._socket = new WebSocket(this._url);
this._socket.addEventListener('error', (err: any) => {
console.warn(`[FillsFeed] connection error: ${err.message}`);
if (this._reconnectionAttemptsExhausted()) {
console.error('[FillsFeed] fatal connection error');
throw err.error;
this.onMessage((data: any) => {
if (isFillEventUpdate(data) && this._onFill) {
this._onFill(data);
} else if (isHeadUpdate(data) && this._onHead) {
this._onHead(data);
}
});
this._socket.addEventListener('open', () => {
if (this._subscriptions !== undefined) {
this.subscribe(this._subscriptions);
}
this._connected = true;
this._reconnectionAttempts = 0;
if (this._onConnect) this._onConnect();
});
this._socket.addEventListener('close', () => {
this._connected = false;
setTimeout(() => {
if (!this._reconnectionAttemptsExhausted()) {
this._reconnectionAttempts++;
this._connect();
}
}, this._reconnectionIntervalMs);
if (this._onDisconnect)
this._onDisconnect(this._reconnectionAttemptsExhausted());
});
this._socket.addEventListener('message', (msg: any) => {
try {
const data = JSON.parse(msg.data);
if (isFillEventUpdate(data) && this._onFill) {
this._onFill(data);
} else if (isHeadUpdate(data) && this._onHead) {
this._onHead(data);
} else if (isStatusMessage(data) && this._onStatus) {
this._onStatus(data);
}
} catch (err) {
console.warn('[FillsFeed] error deserializing message', err);
}
});
if (this._subscriptions !== undefined) {
this.subscribe(this._subscriptions);
}
}
public subscribe(subscriptions: FillsFeedSubscribeParams) {
if (this._connected) {
if (this.connected()) {
this._socket.send(
JSON.stringify({
command: 'subscribe',
@ -162,7 +96,7 @@ export class FillsFeed {
}
public unsubscribe(marketId: string) {
if (this._connected) {
if (this.connected()) {
this._socket.send(
JSON.stringify({
command: 'unsubscribe',
@ -174,27 +108,6 @@ export class FillsFeed {
}
}
public disconnect() {
if (this._connected) {
this._socket.close();
this._connected = false;
} else {
console.warn('[FillsFeed] attempt to disconnect when not connected');
}
}
public connected(): boolean {
return this._connected;
}
public onConnect(callback: () => void) {
this._onConnect = callback;
}
public onDisconnect(callback: () => void) {
this._onDisconnect = callback;
}
public onFill(callback: (update: FillEventUpdate) => void) {
this._onFill = callback;
}
@ -202,8 +115,4 @@ export class FillsFeed {
public onHead(callback: (update: HeadUpdate) => void) {
this._onHead = callback;
}
public onStatus(callback: (update: StatusMessage) => void) {
this._onStatus = callback;
}
}

View File

@ -1 +1,4 @@
export * from './fills';
import { FillsFeed } from './fills';
import { OrderbookFeed } from './orderbook';
export { FillsFeed, OrderbookFeed };

View File

@ -0,0 +1,100 @@
import { ReconnectingWebsocketFeed } from './util';
interface OrderbookFeedOptions {
subscriptions?: OrderbookFeedSubscribeParams;
reconnectionIntervalMs?: number;
reconnectionMaxAttempts?: number;
}
interface OrderbookFeedSubscribeParams {
marketId?: string;
marketIds?: string[];
}
interface OrderbookL2Update {
market: string,
side: 'bid' | 'ask',
update: [number, number][],
slot: number,
writeVersion: number,
}
function isOrderbookL2Update(obj: any): obj is OrderbookL2Update {
return obj.update !== undefined;
}
interface OrderbookL2Checkpoint {
market: string,
side: 'bid' | 'ask',
bids: [number, number][],
asks: [number, number][],
slot: number,
writeVersion: number,
}
function isOrderbookL2Checkpoint(obj: any): obj is OrderbookL2Checkpoint {
return obj.bids !== undefined && obj.asks !== undefined;
}
export class OrderbookFeed extends ReconnectingWebsocketFeed {
private _subscriptions?: OrderbookFeedSubscribeParams;
private _onL2Update: ((update: OrderbookL2Update) => void) | null = null;
private _onL2Checkpoint: ((update: OrderbookL2Checkpoint) => void) | null =
null;
constructor(url: string, options?: OrderbookFeedOptions) {
super(
url,
options?.reconnectionIntervalMs,
options?.reconnectionMaxAttempts,
);
this._subscriptions = options?.subscriptions;
this.onMessage((data) => {
if (isOrderbookL2Update(data) && this._onL2Update) {
this._onL2Update(data);
} else if (isOrderbookL2Checkpoint(data) && this._onL2Checkpoint) {
this._onL2Checkpoint(data);
}
});
if (this._subscriptions !== undefined) {
this.subscribe(this._subscriptions);
}
}
public subscribe(subscriptions: OrderbookFeedSubscribeParams) {
if (this.connected()) {
this._socket.send(
JSON.stringify({
command: 'subscribe',
...subscriptions,
}),
);
} else {
console.warn('[OrderbookFeed] attempt to subscribe when not connected');
}
}
public unsubscribe(marketId: string) {
if (this.connected()) {
this._socket.send(
JSON.stringify({
command: 'unsubscribe',
marketId,
}),
);
} else {
console.warn('[OrderbookFeed] attempt to unsubscribe when not connected');
}
}
public onL2Update(callback: (update: OrderbookL2Update) => void) {
this._onL2Update = callback;
}
public onL2Checkpoint(callback: (checkpoint: OrderbookL2Checkpoint) => void) {
this._onL2Checkpoint = callback;
}
}

118
ts/client/src/util.ts Normal file
View File

@ -0,0 +1,118 @@
import ws from 'ws';
const WebSocket = global.WebSocket || ws;
interface StatusMessage {
success: boolean;
message: string;
}
function isStatusMessage(obj: any): obj is StatusMessage {
return obj.success !== undefined;
}
export class ReconnectingWebsocketFeed {
private _url: string;
protected _socket: WebSocket;
private _connected: boolean;
private _reconnectionIntervalMs: number;
private _reconnectionMaxAttempts: number;
private _reconnectionAttempts: number;
private _onConnect: (() => void) | null = null;
private _onDisconnect:
| ((reconnectionAttemptsExhausted: boolean) => void)
| null = null;
private _onStatus: ((update: StatusMessage) => void) | null = null;
private _onMessage: ((data: any) => void) | null = null;
constructor(
url: string,
reconnectionIntervalMs?: number,
reconnectionMaxAttempts?: number,
) {
this._url = url;
this._reconnectionIntervalMs = reconnectionIntervalMs ?? 5000;
this._reconnectionMaxAttempts = reconnectionMaxAttempts ?? -1;
this._reconnectionAttempts = 0;
this._connect();
}
public disconnect() {
if (this._connected) {
this._socket.close();
this._connected = false;
}
}
public connected(): boolean {
return this._connected;
}
public onConnect(callback: () => void) {
this._onConnect = callback;
}
public onDisconnect(callback: (reconnectionAttemptsExhausted: boolean) => void) {
this._onDisconnect = callback;
}
public onStatus(callback: (update: StatusMessage) => void) {
this._onStatus = callback;
}
protected onMessage(callback: (data: any) => void) {
this._onMessage = callback;
}
private _connect() {
this._socket = new WebSocket(this._url);
this._socket.addEventListener('error', (err: any) => {
console.warn(`[MangoFeed] connection error: ${err.message}`);
if (this._reconnectionAttemptsExhausted()) {
console.error('[MangoFeed] fatal connection error');
throw err.error;
}
});
this._socket.addEventListener('open', () => {
this._connected = true;
this._reconnectionAttempts = 0;
if (this._onConnect) this._onConnect();
});
this._socket.addEventListener('close', () => {
this._connected = false;
setTimeout(() => {
if (!this._reconnectionAttemptsExhausted()) {
this._reconnectionAttempts++;
this._connect();
}
}, this._reconnectionIntervalMs);
if (this._onDisconnect)
this._onDisconnect(this._reconnectionAttemptsExhausted());
});
this._socket.addEventListener('message', (msg: any) => {
try {
const data = JSON.parse(msg.data);
if (isStatusMessage(data) && this._onStatus) {
this._onStatus(data);
} else if (this._onMessage) {
this._onMessage(data);
}
} catch (err) {
console.warn('[MangoFeed] error deserializing message', err);
}
});
}
private _reconnectionAttemptsExhausted(): boolean {
return (
this._reconnectionMaxAttempts != -1 &&
this._reconnectionAttempts >= this._reconnectionMaxAttempts
);
}
}