Update to mango-v4, update fills feed

This commit is contained in:
Riordan Panayides 2022-11-16 14:57:50 +00:00
parent 2a15d96424
commit cf26a885f2
12 changed files with 871 additions and 631 deletions

View File

@ -1,3 +0,0 @@
llvm-dev
libclang-dev
clang

1406
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -12,4 +12,9 @@ members = [
[patch.crates-io] [patch.crates-io]
# for gzip encoded responses # for gzip encoded responses
jsonrpc-core-client = { git = "https://github.com/ckamm/jsonrpc.git", branch = "ckamm/http-with-gzip" } jsonrpc-core-client = { git = "https://github.com/ckamm/jsonrpc.git", branch = "ckamm/http-with-gzip" }
anchor-spl = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "solana-1.10.35" }
anchor-lang = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "solana-1.10.35" }
[patch.'https://github.com/blockworks-foundation/serum-dex.git']
serum_dex = { git = 'ssh://git@github.com/blockworks-foundation/serum-dex.git', branch = "v4-compat" }

View File

@ -8,6 +8,7 @@ WORKDIR /app
FROM base AS plan FROM base AS plan
COPY . . COPY . .
WORKDIR /app
RUN cargo chef prepare --recipe-path recipe.json RUN cargo chef prepare --recipe-path recipe.json
FROM base as build FROM base as build

View File

@ -1,2 +0,0 @@
connector-mango-1: ./heroku/run.sh CONFIG_FILE_1
connector-mango-2: ./heroku/run.sh CONFIG_FILE_2

View File

@ -19,5 +19,4 @@ tokio-postgres = "0.7.4"
postgres-types = { version = "0.2", features = ["array-impls", "derive"] } postgres-types = { version = "0.2", features = ["array-impls", "derive"] }
postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" } postgres_query = { git = "https://github.com/nolanderc/rust-postgres-query", rev = "b4422051c8a31fbba4a35f88004c1cefb1878dd5" }
mango = { git = "https://github.com/blockworks-foundation/mango-v3", branch = "pan/solana-1.10" } mango-v4 = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3", branch = "pan/solana-1.10" }

View File

@ -16,7 +16,7 @@ solana-client = "=1.10.35"
solana-account-decoder = "=1.10.35" solana-account-decoder = "=1.10.35"
solana-sdk = "=1.10.35" solana-sdk = "=1.10.35"
mango = { git = "https://github.com/blockworks-foundation/mango-v3", branch = "pan/solana-1.10" } mango-v4 = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
arrayref = "*" arrayref = "*"
bytemuck = "*" bytemuck = "*"
fixed = { version = "*", features = ["serde"] } fixed = { version = "*", features = ["serde"] }
@ -55,6 +55,8 @@ async-trait = "0.1"
warp = "0.3" warp = "0.3"
anchor-lang = "0.25.0"
solana-geyser-connector-plugin-grpc = { path = "../geyser-plugin-grpc" } solana-geyser-connector-plugin-grpc = { path = "../geyser-plugin-grpc" }
[build-dependencies] [build-dependencies]

View File

@ -11,6 +11,7 @@ use solana_sdk::{
pubkey::Pubkey, pubkey::Pubkey,
}; };
use std::{ use std::{
borrow::BorrowMut,
cmp::max, cmp::max,
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
mem::size_of, mem::size_of,
@ -18,8 +19,11 @@ use std::{
}; };
use crate::metrics::MetricU64; use crate::metrics::MetricU64;
use anchor_lang::AccountDeserialize;
use arrayref::array_ref; use arrayref::array_ref;
use mango::queue::{AnyEvent, EventQueueHeader, EventType, FillEvent}; use mango_v4::state::{
AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent, MAX_NUM_EVENTS,
};
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
pub struct MarketConfig { pub struct MarketConfig {
@ -98,9 +102,7 @@ pub enum FillEventFilterMessage {
} }
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue // couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
const EVENT_SIZE: usize = 200; //size_of::<AnyEvent>(); type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
const QUEUE_LEN: usize = 256;
type EventQueueEvents = [AnyEvent; QUEUE_LEN];
fn publish_changes( fn publish_changes(
slot: u64, slot: u64,
@ -108,7 +110,7 @@ fn publish_changes(
mkt: &MarketConfig, mkt: &MarketConfig,
header: &EventQueueHeader, header: &EventQueueHeader,
events: &EventQueueEvents, events: &EventQueueEvents,
old_seq_num: usize, old_seq_num: u64,
old_events: &EventQueueEvents, old_events: &EventQueueEvents,
fill_update_sender: &async_channel::Sender<FillEventFilterMessage>, fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
metric_events_new: &mut MetricU64, metric_events_new: &mut MetricU64,
@ -116,11 +118,12 @@ fn publish_changes(
metric_events_drop: &mut MetricU64, metric_events_drop: &mut MetricU64,
) { ) {
// seq_num = N means that events (N-QUEUE_LEN) until N-1 are available // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
let start_seq_num = max(old_seq_num, header.seq_num) - QUEUE_LEN; let start_seq_num = max(old_seq_num, header.seq_num)
.checked_sub(MAX_NUM_EVENTS as u64)
.unwrap_or(0);
let mut checkpoint = Vec::new(); let mut checkpoint = Vec::new();
for seq_num in start_seq_num..header.seq_num { for seq_num in start_seq_num..header.seq_num {
let idx = seq_num % QUEUE_LEN; let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
// there are three possible cases: // there are three possible cases:
// 1) the event is past the old seq num, hence guaranteed new event // 1) the event is past the old seq num, hence guaranteed new event
@ -201,7 +204,7 @@ fn publish_changes(
// in case queue size shrunk due to a fork we need revoke all previous fills // in case queue size shrunk due to a fork we need revoke all previous fills
for seq_num in header.seq_num..old_seq_num { for seq_num in header.seq_num..old_seq_num {
let idx = seq_num % QUEUE_LEN; let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
debug!( debug!(
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
@ -328,14 +331,9 @@ pub async fn init(
let account = &account_info.account; let account = &account_info.account;
const HEADER_SIZE: usize = size_of::<EventQueueHeader>(); let event_queue =
let header_data = array_ref![account.data(), 0, HEADER_SIZE]; EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
let header: &EventQueueHeader = bytemuck::from_bytes(header_data); trace!("evq {} seq_num {}", mkt.name, event_queue.header.seq_num);
trace!("evq {} seq_num {}", mkt.name, header.seq_num);
const QUEUE_SIZE: usize = EVENT_SIZE * QUEUE_LEN;
let events_data = array_ref![account.data(), HEADER_SIZE, QUEUE_SIZE];
let events: &EventQueueEvents = bytemuck::from_bytes(events_data);
match seq_num_cache.get(&mkt.event_queue) { match seq_num_cache.get(&mkt.event_queue) {
Some(old_seq_num) => match events_cache.get(&mkt.event_queue) { Some(old_seq_num) => match events_cache.get(&mkt.event_queue) {
@ -343,8 +341,8 @@ pub async fn init(
account_info.slot, account_info.slot,
account_info.write_version, account_info.write_version,
mkt, mkt,
header, &event_queue.header,
events, &event_queue.buf,
*old_seq_num, *old_seq_num,
old_events, old_events,
&fill_update_sender, &fill_update_sender,
@ -357,8 +355,9 @@ pub async fn init(
_ => info!("seq_num_cache could not find {}", mkt.name), _ => info!("seq_num_cache could not find {}", mkt.name),
} }
seq_num_cache.insert(mkt.event_queue.clone(), header.seq_num.clone()); seq_num_cache
events_cache.insert(mkt.event_queue.clone(), events.clone()); .insert(mkt.event_queue.clone(), event_queue.header.seq_num.clone());
events_cache.insert(mkt.event_queue.clone(), event_queue.buf.clone());
} }
Err(_) => info!("chain_cache could not find {}", mkt.name), Err(_) => info!("chain_cache could not find {}", mkt.name),
} }

View File

@ -140,11 +140,11 @@ pub async fn process_events(
// copy websocket updates into the postgres account write queue // copy websocket updates into the postgres account write queue
loop { loop {
let update = update_receiver.recv().await.unwrap(); let update = update_receiver.recv().await.unwrap();
info!("got update message"); trace!("got update message");
match update { match update {
WebsocketMessage::SingleUpdate(update) => { WebsocketMessage::SingleUpdate(update) => {
info!("single update"); trace!("single update");
let account: Account = update.value.account.decode().unwrap(); let account: Account = update.value.account.decode().unwrap();
let pubkey = Pubkey::from_str(&update.value.pubkey).unwrap(); let pubkey = Pubkey::from_str(&update.value.pubkey).unwrap();
account_write_queue_sender account_write_queue_sender
@ -153,7 +153,7 @@ pub async fn process_events(
.expect("send success"); .expect("send success");
} }
WebsocketMessage::SnapshotUpdate(update) => { WebsocketMessage::SnapshotUpdate(update) => {
info!("snapshot update"); trace!("snapshot update");
for keyed_account in update.value { for keyed_account in update.value {
let account: Account = keyed_account.account.decode().unwrap(); let account: Account = keyed_account.account.decode().unwrap();
let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap(); let pubkey = Pubkey::from_str(&keyed_account.pubkey).unwrap();
@ -164,7 +164,7 @@ pub async fn process_events(
} }
} }
WebsocketMessage::SlotUpdate(update) => { WebsocketMessage::SlotUpdate(update) => {
info!("slot update"); trace!("slot update");
let message = match *update { let message = match *update {
solana_client::rpc_response::SlotUpdate::CreatedBank { solana_client::rpc_response::SlotUpdate::CreatedBank {
slot, parent, .. slot, parent, ..

View File

@ -177,7 +177,7 @@ async fn main() -> anyhow::Result<()> {
.map(|c| c.connection_string.clone()) .map(|c| c.connection_string.clone())
.collect::<String>() .collect::<String>()
); );
let use_geyser = true; let use_geyser = false;
if use_geyser { if use_geyser {
grpc_plugin_source::process_events( grpc_plugin_source::process_events(
&config.source, &config.source,

View File

@ -21,6 +21,4 @@ tokio = { version = "1", features = ["full"] }
serde = "1.0.130" serde = "1.0.130"
serde_derive = "1.0.130" serde_derive = "1.0.130"
mango-v4 = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
mango = { git = "https://github.com/blockworks-foundation/mango-v3", branch = "pan/solana-1.10" }
mango-common = { git = "https://github.com/blockworks-foundation/mango-v3", branch = "pan/solana-1.10" }

View File

@ -114,7 +114,9 @@ fn start_pnl_updater(
} }
*pnl_data.write().unwrap() = pnls; *pnl_data.write().unwrap() = pnls;
metrics_pnls_tracked.clone().set(pnl_data.read().unwrap().len() as u64) metrics_pnls_tracked
.clone()
.set(pnl_data.read().unwrap().len() as u64)
} }
}); });
} }
@ -211,17 +213,26 @@ async fn main() -> anyhow::Result<()> {
let metrics_reqs = let metrics_reqs =
metrics_tx.register_u64("pnl_jsonrpc_reqs_total".into(), MetricType::Counter); metrics_tx.register_u64("pnl_jsonrpc_reqs_total".into(), MetricType::Counter);
let metrics_invalid_reqs = let metrics_invalid_reqs =
metrics_tx.register_u64("pnl_jsonrpc_reqs_invalid_total".into(), MetricType::Counter); metrics_tx.register_u64("pnl_jsonrpc_reqs_invalid_total".into(), MetricType::Counter);
let metrics_pnls_tracked = let metrics_pnls_tracked = metrics_tx.register_u64("pnl_num_tracked".into(), MetricType::Gauge);
metrics_tx.register_u64("pnl_num_tracked".into(), MetricType::Gauge);
let chain_data = Arc::new(RwLock::new(ChainData::new())); let chain_data = Arc::new(RwLock::new(ChainData::new()));
let pnl_data = Arc::new(RwLock::new(PnlData::new())); let pnl_data = Arc::new(RwLock::new(PnlData::new()));
start_pnl_updater(config.pnl.clone(), chain_data.clone(), pnl_data.clone(), metrics_pnls_tracked); start_pnl_updater(
config.pnl.clone(),
chain_data.clone(),
pnl_data.clone(),
metrics_pnls_tracked,
);
// dropping the handle would exit the server // dropping the handle would exit the server
let _http_server_handle = start_jsonrpc_server(config.jsonrpc_server.clone(), pnl_data, metrics_reqs, metrics_invalid_reqs)?; let _http_server_handle = start_jsonrpc_server(
config.jsonrpc_server.clone(),
pnl_data,
metrics_reqs,
metrics_invalid_reqs,
)?;
// start filling chain_data from the grpc plugin source // start filling chain_data from the grpc plugin source
let (account_write_queue_sender, slot_queue_sender) = memory_target::init(chain_data).await?; let (account_write_queue_sender, slot_queue_sender) = memory_target::init(chain_data).await?;