From b41c094aba0d36df189da7594da9191d05d6b00c Mon Sep 17 00:00:00 2001 From: Maximilian Schneider Date: Mon, 28 Mar 2022 19:58:14 +0200 Subject: [PATCH] merge fill service --- .gitignore | 2 + Cargo.lock | 193 +++++++++-- Cargo.toml | 3 +- lib/Cargo.toml | 9 +- lib/src/fill_event_filter.rs | 326 ++++++++++++++++++ lib/src/lib.rs | 1 + service-mango-fills/Cargo.toml | 24 ++ service-mango-fills/README.md | 13 + service-mango-fills/example-config.toml | 64 ++++ service-mango-fills/src/main.rs | 170 +++++++++ .../Cargo.toml | 2 +- .../example-config.toml | 0 .../src/main.rs | 0 13 files changed, 779 insertions(+), 28 deletions(-) create mode 100644 lib/src/fill_event_filter.rs create mode 100644 service-mango-fills/Cargo.toml create mode 100644 service-mango-fills/README.md create mode 100644 service-mango-fills/example-config.toml create mode 100644 service-mango-fills/src/main.rs rename {mango-pnl-service => service-mango-pnl}/Cargo.toml (96%) rename {mango-pnl-service => service-mango-pnl}/example-config.toml (100%) rename {mango-pnl-service => service-mango-pnl}/src/main.rs (100%) diff --git a/.gitignore b/.gitignore index ea8c4bf..585079d 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ /target +**/*.pem +**/config.toml diff --git a/Cargo.lock b/Cargo.lock index 273556b..4531a5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2543,9 +2543,37 @@ dependencies = [ "enumflags2", "fixed", "fixed-macro", - "mango-common", - "mango-logs", - "mango-macro", + "mango-common 3.0.0 (git+https://github.com/blockworks-foundation/mango-v3?branch=main)", + "mango-logs 0.1.0 (git+https://github.com/blockworks-foundation/mango-v3?branch=main)", + "mango-macro 3.0.0 (git+https://github.com/blockworks-foundation/mango-v3?branch=main)", + "num_enum", + "safe-transmute", + "serde", + "serum_dex", + "solana-program", + "spl-token", + "static_assertions", + "switchboard-program", + "switchboard-utils", + "thiserror", +] + +[[package]] +name = "mango" +version = "3.4.2" +source = "git+https://github.com/blockworks-foundation/mango-v3#527844e85be7970f15e56887cb1ebe3dbf39f3d4" +dependencies = [ + "anchor-lang", + "arrayref", + "bincode", + "bs58 0.4.0", + "bytemuck", + "enumflags2", + "fixed", + "fixed-macro", + "mango-common 3.0.0 (git+https://github.com/blockworks-foundation/mango-v3)", + "mango-logs 0.1.0 (git+https://github.com/blockworks-foundation/mango-v3)", + "mango-macro 3.0.0 (git+https://github.com/blockworks-foundation/mango-v3)", "num_enum", "safe-transmute", "serde", @@ -2567,6 +2595,15 @@ dependencies = [ "solana-program", ] +[[package]] +name = "mango-common" +version = "3.0.0" +source = "git+https://github.com/blockworks-foundation/mango-v3#527844e85be7970f15e56887cb1ebe3dbf39f3d4" +dependencies = [ + "bytemuck", + "solana-program", +] + [[package]] name = "mango-logs" version = "0.1.0" @@ -2576,13 +2613,22 @@ dependencies = [ "base64 0.13.0", ] +[[package]] +name = "mango-logs" +version = "0.1.0" +source = "git+https://github.com/blockworks-foundation/mango-v3#527844e85be7970f15e56887cb1ebe3dbf39f3d4" +dependencies = [ + "anchor-lang", + "base64 0.13.0", +] + [[package]] name = "mango-macro" version = "3.0.0" source = "git+https://github.com/blockworks-foundation/mango-v3?branch=main#527844e85be7970f15e56887cb1ebe3dbf39f3d4" dependencies = [ "bytemuck", - "mango-common", + "mango-common 3.0.0 (git+https://github.com/blockworks-foundation/mango-v3?branch=main)", "quote 1.0.16", "safe-transmute", "solana-program", @@ -2590,25 +2636,16 @@ dependencies = [ ] [[package]] -name = "mango-pnl-service" -version = "0.1.0" +name = "mango-macro" +version = "3.0.0" +source = "git+https://github.com/blockworks-foundation/mango-v3#527844e85be7970f15e56887cb1ebe3dbf39f3d4" dependencies = [ - "anyhow", - "async-trait", - "bs58 0.3.1", "bytemuck", - "fixed", - "jsonrpsee", - "log 0.4.16", - "mango", - "mango-common", - "serde", - "serde_derive", - "solana-geyser-connector-lib", - "solana-logger", - "solana-sdk", - "tokio", - "toml", + "mango-common 3.0.0 (git+https://github.com/blockworks-foundation/mango-v3)", + "quote 1.0.16", + "safe-transmute", + "solana-program", + "syn 1.0.89", ] [[package]] @@ -4291,6 +4328,50 @@ dependencies = [ "without-alloc", ] +[[package]] +name = "service-mango-fills" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-channel", + "async-trait", + "bs58 0.3.1", + "futures-channel", + "futures-util", + "log 0.3.9", + "serde", + "serde_derive", + "serde_json", + "solana-geyser-connector-lib", + "solana-logger", + "tokio", + "tokio-tungstenite", + "toml", + "ws", +] + +[[package]] +name = "service-mango-pnl" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bs58 0.3.1", + "bytemuck", + "fixed", + "jsonrpsee", + "log 0.4.16", + "mango 3.4.2 (git+https://github.com/blockworks-foundation/mango-v3?branch=main)", + "mango-common 3.0.0 (git+https://github.com/blockworks-foundation/mango-v3?branch=main)", + "serde", + "serde_derive", + "solana-geyser-connector-lib", + "solana-logger", + "solana-sdk", + "tokio", + "toml", +] + [[package]] name = "sha-1" version = "0.8.2" @@ -4316,6 +4397,17 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "sha-1" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest 0.10.3", +] + [[package]] name = "sha1" version = "0.6.1" @@ -4621,7 +4713,7 @@ dependencies = [ "solana-vote-program", "thiserror", "tokio", - "tungstenite", + "tungstenite 0.16.0", "url 2.2.2", ] @@ -4730,10 +4822,13 @@ name = "solana-geyser-connector-lib" version = "0.1.0" dependencies = [ "anyhow", + "arrayref", "async-channel", "async-stream 0.2.1", "async-trait", + "base64 0.9.3", "bs58 0.3.1", + "bytemuck", "bytes 1.1.0", "fixed", "futures 0.3.21", @@ -4742,6 +4837,7 @@ dependencies = [ "jsonrpc-core 18.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core-client", "log 0.4.16", + "mango 3.4.2 (git+https://github.com/blockworks-foundation/mango-v3)", "native-tls", "postgres-native-tls", "postgres-types", @@ -4771,8 +4867,8 @@ dependencies = [ "bs58 0.3.1", "fixed", "log 0.4.16", - "mango", - "mango-common", + "mango 3.4.2 (git+https://github.com/blockworks-foundation/mango-v3?branch=main)", + "mango-common 3.0.0 (git+https://github.com/blockworks-foundation/mango-v3?branch=main)", "postgres-types", "postgres_query", "solana-geyser-connector-lib", @@ -6122,6 +6218,18 @@ dependencies = [ "tokio-io", ] +[[package]] +name = "tokio-tungstenite" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae" +dependencies = [ + "futures-util", + "log 0.4.16", + "tokio", + "tungstenite 0.17.2", +] + [[package]] name = "tokio-util" version = "0.6.9" @@ -6319,6 +6427,25 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "tungstenite" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes 1.1.0", + "http", + "httparse", + "log 0.4.16", + "rand 0.8.5", + "sha-1 0.10.0", + "thiserror", + "url 2.2.2", + "utf-8", +] + [[package]] name = "typeable" version = "0.1.2" @@ -6797,6 +6924,24 @@ dependencies = [ "alloc-traits", ] +[[package]] +name = "ws" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25fe90c75f236a0a00247d5900226aea4f2d7b05ccc34da9e7a8880ff59b5848" +dependencies = [ + "byteorder", + "bytes 0.4.12", + "httparse", + "log 0.4.16", + "mio 0.6.23", + "mio-extras", + "rand 0.7.3", + "sha-1 0.8.2", + "slab", + "url 2.2.2", +] + [[package]] name = "ws2_32-sys" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 0f318fa..7404794 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,8 @@ members = [ "lib", "connector-raw", "connector-mango", - "mango-pnl-service", + "service-mango-fills", + "service-mango-pnl", ] diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 9821c7e..0d9c923 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -16,6 +16,11 @@ solana-client = "=1.9.13" solana-account-decoder = "=1.9.13" solana-sdk = "=1.9.13" +mango = { git = "https://github.com/blockworks-foundation/mango-v3" } +arrayref = "*" +bytemuck = "*" +fixed = { version = "*", features = ["serde"] } + tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" tokio-postgres = "0.7" @@ -33,11 +38,11 @@ serde_json = "1.0.68" tonic = { version = "0.6", features = ["tls"] } prost = "0.9" -bs58 = "0.3.1" +bs58 = "*" +base64 = "*" log = "0.4" rand = "0.7" anyhow = "1.0" -fixed = { version = "1.9.0", features = ["serde"] } bytes = "1.0" futures = "0.3.17" diff --git a/lib/src/fill_event_filter.rs b/lib/src/fill_event_filter.rs new file mode 100644 index 0000000..6fd5b3c --- /dev/null +++ b/lib/src/fill_event_filter.rs @@ -0,0 +1,326 @@ +use crate::{ + chain_data::{AccountData, ChainData, SlotData}, + AccountWrite, SlotUpdate, +}; +use log::*; +use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; +use solana_sdk::{ + account::{ReadableAccount, WritableAccount}, + clock::Epoch, + pubkey::Pubkey, +}; +use std::{ + cmp::max, + collections::{HashMap, HashSet}, + mem::size_of, + str::FromStr, +}; + +use arrayref::array_ref; +use mango::queue::{AnyEvent, EventQueueHeader, EventType, FillEvent}; + +#[derive(Clone, Debug, Deserialize)] +pub struct MarketConfig { + pub name: String, + pub event_queue: String, +} + +#[derive(Clone, Copy, Debug, Serialize)] +pub enum FillUpdateStatus { + New, + Revoke, +} + +#[derive(Clone, Debug)] + +pub struct FillUpdate { + pub event: FillEvent, + pub status: FillUpdateStatus, + pub market: String, + pub queue: String, +} + +impl Serialize for FillUpdate { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let event = base64::encode_config(bytemuck::bytes_of(&self.event), base64::STANDARD); + let mut state = serializer.serialize_struct("FillUpdate", 4)?; + state.serialize_field("market", &self.market)?; + state.serialize_field("queue", &self.queue)?; + state.serialize_field("status", &self.status)?; + state.serialize_field("event", &event)?; + + state.end() + } +} + +#[derive(Clone, Debug)] +pub struct FillCheckpoint { + pub market: String, + pub queue: String, + pub events: Vec, +} + +impl Serialize for FillCheckpoint { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let events: Vec = self + .events + .iter() + .map(|e| base64::encode_config(bytemuck::bytes_of(e), base64::STANDARD)) + .collect(); + let mut state = serializer.serialize_struct("FillUpdate", 3)?; + state.serialize_field("market", &self.market)?; + state.serialize_field("queue", &self.queue)?; + state.serialize_field("events", &events)?; + state.end() + } +} + +pub enum FillEventFilterMessage { + Update(FillUpdate), + Checkpoint(FillCheckpoint), +} + +// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue +const EVENT_SIZE: usize = 200; //size_of::(); +const QUEUE_LEN: usize = 256; +type EventQueueEvents = [AnyEvent; QUEUE_LEN]; + +fn publish_changes( + mkt: &MarketConfig, + header: &EventQueueHeader, + events: &EventQueueEvents, + old_seq_num: usize, + old_events: &EventQueueEvents, + fill_update_sender: &async_channel::Sender, +) { + // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available + let start_seq_num = max(old_seq_num, header.seq_num) - QUEUE_LEN; + let mut checkpoint = Vec::new(); + + for seq_num in start_seq_num..header.seq_num { + let idx = seq_num % QUEUE_LEN; + + // there are three possible cases: + // 1) the event is past the old seq num, hence guaranteed new event + // 2) the event is not matching the old event queue + // 3) all other events are matching the old event queue + // the order of these checks is important so they are exhaustive + if seq_num >= old_seq_num { + info!( + "found new event {} idx {} type {}", + mkt.name, idx, events[idx].event_type as u32 + ); + + // new fills are published and recorded in checkpoint + if events[idx].event_type == EventType::Fill as u8 { + let fill: FillEvent = bytemuck::cast(events[idx]); + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + event: fill, + status: FillUpdateStatus::New, + market: mkt.name.clone(), + queue: mkt.event_queue.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(fill); + } + } else if old_events[idx].event_type != events[idx].event_type + || old_events[idx].padding != events[idx].padding + { + info!( + "found changed event {} idx {} seq_num {} header seq num {} old seq num {}", + mkt.name, idx, seq_num, header.seq_num, old_seq_num + ); + + // first revoke old event if a fill + if old_events[idx].event_type == EventType::Fill as u8 { + let fill: FillEvent = bytemuck::cast(old_events[idx]); + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + event: fill, + status: FillUpdateStatus::Revoke, + market: mkt.name.clone(), + queue: mkt.event_queue.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } + + // then publish new if its a fill and record in checkpoint + if events[idx].event_type == EventType::Fill as u8 { + let fill: FillEvent = bytemuck::cast(events[idx]); + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + event: fill, + status: FillUpdateStatus::New, + market: mkt.name.clone(), + queue: mkt.event_queue.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + checkpoint.push(fill); + } + } else { + // every already published event is recorded in checkpoint if a fill + if events[idx].event_type == EventType::Fill as u8 { + let fill: FillEvent = bytemuck::cast(events[idx]); + checkpoint.push(fill); + } + } + } + + // in case queue size shrunk due to a fork we need revoke all previous fills + for seq_num in header.seq_num..old_seq_num { + let idx = seq_num % QUEUE_LEN; + info!( + "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}", + mkt.name, idx, seq_num, header.seq_num, old_seq_num + ); + + if old_events[idx].event_type == EventType::Fill as u8 { + let fill: FillEvent = bytemuck::cast(old_events[idx]); + fill_update_sender + .try_send(FillEventFilterMessage::Update(FillUpdate { + event: fill, + status: FillUpdateStatus::Revoke, + market: mkt.name.clone(), + queue: mkt.event_queue.clone(), + })) + .unwrap(); // TODO: use anyhow to bubble up error + } + } + + fill_update_sender + .try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint { + events: checkpoint, + market: mkt.name.clone(), + queue: mkt.event_queue.clone(), + })) + .unwrap() +} + +pub async fn init( + markets: Vec, +) -> anyhow::Result<( + async_channel::Sender, + async_channel::Sender, + async_channel::Receiver, +)> { + // The actual message may want to also contain a retry count, if it self-reinserts on failure? + let (account_write_queue_sender, account_write_queue_receiver) = + async_channel::unbounded::(); + + // Slot updates flowing from the outside into the single processing thread. From + // there they'll flow into the postgres sending thread. + let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); + + // Fill updates can be consumed by client connections, they contain all fills for all markets + let (fill_update_sender, fill_update_receiver) = + async_channel::unbounded::(); + + let account_write_queue_receiver_c = account_write_queue_receiver.clone(); + + let mut chain_cache = ChainData::new(); + let mut events_cache: HashMap = HashMap::new(); + let mut seq_num_cache = HashMap::new(); + let mut last_ev_q_versions = HashMap::::new(); + + let relevant_pubkeys = markets + .iter() + .map(|m| Pubkey::from_str(&m.event_queue).unwrap()) + .collect::>(); + + // update handling thread, reads both sloths and account updates + tokio::spawn(async move { + loop { + tokio::select! { + Ok(account_write) = account_write_queue_receiver_c.recv() => { + if !relevant_pubkeys.contains(&account_write.pubkey) { + continue; + } + + chain_cache.update_account( + account_write.pubkey, + AccountData { + slot: account_write.slot, + write_version: account_write.write_version, + account: WritableAccount::create( + account_write.lamports, + account_write.data.clone(), + account_write.owner, + account_write.executable, + account_write.rent_epoch as Epoch, + ), + }, + ); + } + Ok(slot_update) = slot_queue_receiver.recv() => { + chain_cache.update_slot(SlotData { + slot: slot_update.slot, + parent: slot_update.parent, + status: slot_update.status, + chain: 0, + }); + + } + } + + for mkt in markets.iter() { + let last_ev_q_version = last_ev_q_versions.get(&mkt.event_queue); + let mkt_pk = mkt.event_queue.parse::().unwrap(); + + match chain_cache.account(&mkt_pk) { + Ok(account_info) => { + // only process if the account state changed + let ev_q_version = (account_info.slot, account_info.write_version); + trace!("evq {} write_version {:?}", mkt.name, ev_q_version); + if ev_q_version == *last_ev_q_version.unwrap_or(&(0, 0)) { + continue; + } + last_ev_q_versions.insert(mkt.event_queue.clone(), ev_q_version); + + let account = &account_info.account; + + const HEADER_SIZE: usize = size_of::(); + let header_data = array_ref![account.data(), 0, HEADER_SIZE]; + let header: &EventQueueHeader = bytemuck::from_bytes(header_data); + trace!("evq {} seq_num {}", mkt.name, header.seq_num); + + const QUEUE_SIZE: usize = EVENT_SIZE * QUEUE_LEN; + let events_data = array_ref![account.data(), HEADER_SIZE, QUEUE_SIZE]; + let events: &EventQueueEvents = bytemuck::from_bytes(events_data); + + match seq_num_cache.get(&mkt.event_queue) { + Some(old_seq_num) => match events_cache.get(&mkt.event_queue) { + Some(old_events) => publish_changes( + mkt, + header, + events, + *old_seq_num, + old_events, + &fill_update_sender, + ), + _ => info!("events_cache could not find {}", mkt.name), + }, + _ => info!("seq_num_cache could not find {}", mkt.name), + } + + seq_num_cache.insert(mkt.event_queue.clone(), header.seq_num.clone()); + events_cache.insert(mkt.event_queue.clone(), events.clone()); + } + Err(_) => info!("chain_cache could not find {}", mkt.name), + } + } + } + }); + + Ok(( + account_write_queue_sender, + slot_queue_sender, + fill_update_receiver, + )) +} diff --git a/lib/src/lib.rs b/lib/src/lib.rs index ac52713..a643de4 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -1,4 +1,5 @@ pub mod chain_data; +pub mod fill_event_filter; pub mod grpc_plugin_source; pub mod memory_target; pub mod metrics; diff --git a/service-mango-fills/Cargo.toml b/service-mango-fills/Cargo.toml new file mode 100644 index 0000000..118287a --- /dev/null +++ b/service-mango-fills/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "service-mango-fills" +version = "0.1.0" +authors = ["Christian Kamm ", "Maximilian Schneider "] +edition = "2018" + +[dependencies] +solana-geyser-connector-lib = { path = "../lib" } +solana-logger = "*" +bs58 = "*" +log = "*" +anyhow = "*" +toml = "*" + +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" +futures-channel = "0.3" +futures-util = "0.3" +ws = "^0.9.2" +async-channel = "1.6" +async-trait = "0.1" +tokio = { version = "1", features = ["full"] } +tokio-tungstenite = "0.17" diff --git a/service-mango-fills/README.md b/service-mango-fills/README.md new file mode 100644 index 0000000..32d9d47 --- /dev/null +++ b/service-mango-fills/README.md @@ -0,0 +1,13 @@ +connector-fills + +This module parses event queues and exposes individual fills on a websocket. + +TODO: + +- [] early filter out all account writes we dont care about +- [] startup logic, dont accept websockets before first snapshot +- [] failover logic, kill all websockets when we receive a later snapshot, more + frequent when running on home connections +- [] track websocket connect / disconnect +- [] track latency accountwrite -> websocket +- [] track queue length diff --git a/service-mango-fills/example-config.toml b/service-mango-fills/example-config.toml new file mode 100644 index 0000000..45fafd7 --- /dev/null +++ b/service-mango-fills/example-config.toml @@ -0,0 +1,64 @@ +bind_ws_addr = "127.0.0.1:8080" + +[source] +dedup_queue_size = 50000 +rpc_ws_url = "" + +[[source.grpc_sources]] +name = "server" +connection_string = "http://[::1]:10000" +retry_connection_sleep_secs = 30 + +#[source.grpc_sources.tls] +#ca_cert_path = "ca.pem" +#client_cert_path = "client.pem" +#client_key_path = "client.pem" +#domain_name = "example.com" + +[source.snapshot] +rpc_http_url = "" +program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68" + +[[markets]] +name = "BTC-PERP" +event_queue = "7t5Me8RieYKsFpfLEV8jnpqcqswNpyWD95ZqgUXuLV8Z" + +[[markets]] +name = "ETH-PERP" +event_queue = "9vDfKNPJkCvQv9bzR4JNTGciQC2RVHPVNMMHiVDgT1mw" + +[[markets]] +name = "SOL-PERP" +event_queue = "31cKs646dt1YkA3zPyxZ7rUAkxTBz279w4XEobFXcAKP" + +[[markets]] +name = "MNGO-PERP" +event_queue = "7orixrhZpjvofZGWZyyLFxSEt2tfFiost5kHEzd7jdet" + +[[markets]] +name = "SRM-PERP" +event_queue = "BXSPmdHWP6fMqsCsT6kG8UN9uugAJxdDkQWy87njUQnL" + +[[markets]] +name = "RAY-PERP" +event_queue = "Css2MQhEvXMTKjp9REVZR9ZyUAYAZAPrnDvRoPxrQkeN" + +[[markets]] +name = "FTT-PERP" +event_queue = "5pHAhyEphQRVvLqvYF7dziofR52yZWuq8DThQFJvJ7r5" + +[[markets]] +name = "ADA-PERP" +event_queue = "G6Dsw9KnP4G38hePtedTH6gDfDQmPJGJw8zipBJvKc12" + +[[markets]] +name = "BNB-PERP" +event_queue = "GmX4qXMpXvs1DuUXNB4eqL1rfF8LeYEjkKgpFeYsm55n" + +[[markets]] +name = "AVAX-PERP" +event_queue = "5Grgo9kLu692SUcJ6S7jtbi1WkdwiyRWgThAfN1PcvbL" + +[[markets]] +name = "LUNA-PERP" +event_queue = "HDJ43o9Dxxu6yWRWPEce44gtCHauRGLXJwwtvD7GwEBx" \ No newline at end of file diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs new file mode 100644 index 0000000..c80d993 --- /dev/null +++ b/service-mango-fills/src/main.rs @@ -0,0 +1,170 @@ +use futures_channel::mpsc::{unbounded, UnboundedSender}; +use futures_util::{pin_mut, SinkExt, StreamExt}; +use log::*; +use std::{collections::HashMap, fs::File, io::Read, net::SocketAddr, sync::Arc, sync::Mutex}; +use tokio::{ + net::{TcpListener, TcpStream}, + pin, +}; +use tokio_tungstenite::tungstenite::protocol::Message; + +use serde::Deserialize; +use solana_geyser_connector_lib::{ + fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage, MarketConfig}, + grpc_plugin_source, metrics, websocket_source, SourceConfig, +}; + +type CheckpointMap = Arc>>; +type PeerMap = Arc>>>; + +async fn handle_connection( + checkpoint_map: CheckpointMap, + peer_map: PeerMap, + raw_stream: TcpStream, + addr: SocketAddr, +) { + info!("ws connected: {}", addr); + let ws_stream = tokio_tungstenite::accept_async(raw_stream) + .await + .expect("Error during the ws handshake occurred"); + let (mut ws_tx, _ws_rx) = ws_stream.split(); + + // 1: publish channel in peer map + let (chan_tx, chan_rx) = unbounded(); + { + peer_map.lock().unwrap().insert(addr, chan_tx); + info!("ws published: {}", addr); + } + + // 2: send initial checkpoint + { + let checkpoint_map_copy = checkpoint_map.lock().unwrap().clone(); + for (_, ckpt) in checkpoint_map_copy.iter() { + ws_tx + .feed(Message::Text(serde_json::to_string(ckpt).unwrap())) + .await + .unwrap(); + } + } + let result_ckpt = ws_tx.flush().await; + info!("ws ckpt sent: {} err: {:?}", addr, result_ckpt); + + // 3: forward all events from channel to peer socket + let forward_updates = chan_rx.map(Ok).forward(ws_tx); + pin_mut!(forward_updates); + let result_forward = forward_updates.await; + + info!("ws disconnected: {} err: {:?}", &addr, result_forward); + peer_map.lock().unwrap().remove(&addr); + result_ckpt.unwrap(); + result_forward.unwrap(); +} + +#[derive(Clone, Debug, Deserialize)] +pub struct Config { + pub source: SourceConfig, + pub markets: Vec, + pub bind_ws_addr: String, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args: Vec = std::env::args().collect(); + if args.len() < 2 { + error!("requires a config file argument"); + return Ok(()); + } + + let config: Config = { + let mut file = File::open(&args[1])?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + toml::from_str(&contents).unwrap() + }; + + solana_logger::setup_with_default("info"); + + let metrics_tx = metrics::start(); + + let (account_write_queue_sender, slot_queue_sender, fill_receiver) = + fill_event_filter::init(config.markets.clone()).await?; + + let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new())); + let peers = PeerMap::new(Mutex::new(HashMap::new())); + + let checkpoints_ref_thread = checkpoints.clone(); + let peers_ref_thread = peers.clone(); + + // filleventfilter websocket sink + tokio::spawn(async move { + pin!(fill_receiver); + loop { + let message = fill_receiver.recv().await.unwrap(); + match message { + FillEventFilterMessage::Update(update) => { + info!("ws update {} {:?} fill", update.market, update.status); + + let mut peer_copy = peers_ref_thread.lock().unwrap().clone(); + + for (k, v) in peer_copy.iter_mut() { + debug!(" > {}", k); + + let json = serde_json::to_string(&update); + + v.send(Message::Text(json.unwrap())).await.unwrap() + } + } + FillEventFilterMessage::Checkpoint(checkpoint) => { + checkpoints_ref_thread + .lock() + .unwrap() + .insert(checkpoint.queue.clone(), checkpoint); + } + } + } + }); + + info!("ws listen: {}", config.bind_ws_addr); + let try_socket = TcpListener::bind(&config.bind_ws_addr).await; + let listener = try_socket.expect("Failed to bind"); + tokio::spawn(async move { + // Let's spawn the handling of each connection in a separate task. + while let Ok((stream, addr)) = listener.accept().await { + tokio::spawn(handle_connection( + checkpoints.clone(), + peers.clone(), + stream, + addr, + )); + } + }); + + info!( + "rpc connect: {}", + config + .source + .grpc_sources + .iter() + .map(|c| c.connection_string.clone()) + .collect::() + ); + let use_geyser = true; + if use_geyser { + grpc_plugin_source::process_events( + &config.source, + account_write_queue_sender, + slot_queue_sender, + metrics_tx, + ) + .await; + } else { + websocket_source::process_events( + &config.source, + account_write_queue_sender, + slot_queue_sender, + ) + .await; + } + + Ok(()) +} diff --git a/mango-pnl-service/Cargo.toml b/service-mango-pnl/Cargo.toml similarity index 96% rename from mango-pnl-service/Cargo.toml rename to service-mango-pnl/Cargo.toml index 5cbf192..4fa6dd4 100644 --- a/mango-pnl-service/Cargo.toml +++ b/service-mango-pnl/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "mango-pnl-service" +name = "service-mango-pnl" version = "0.1.0" authors = ["Christian Kamm "] edition = "2021" diff --git a/mango-pnl-service/example-config.toml b/service-mango-pnl/example-config.toml similarity index 100% rename from mango-pnl-service/example-config.toml rename to service-mango-pnl/example-config.toml diff --git a/mango-pnl-service/src/main.rs b/service-mango-pnl/src/main.rs similarity index 100% rename from mango-pnl-service/src/main.rs rename to service-mango-pnl/src/main.rs