Auto-discover markets in fills feed

Riordan Panayides 2022-12-16 11:03:21 +00:00
parent 49bcdeee77
commit df562a3e42
4 changed files with 120 additions and 104 deletions

View File

@@ -4,7 +4,7 @@ use crate::{
AccountWrite, SlotUpdate,
};
use log::*;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
use serde::{ser::SerializeStruct, Serialize, Serializer};
use solana_sdk::{
account::{ReadableAccount, WritableAccount},
clock::Epoch,
@@ -14,7 +14,6 @@ use std::{
borrow::BorrowMut,
cmp::max,
collections::{HashMap, HashSet},
str::FromStr,
};
use crate::metrics::MetricU64;
@@ -23,12 +22,6 @@ use mango_v4::state::{
AnyEvent, EventQueue, EventQueueHeader, EventType, FillEvent, MAX_NUM_EVENTS,
};
#[derive(Clone, Debug, Deserialize)]
pub struct MarketConfig {
pub name: String,
pub event_queue: String,
}
#[derive(Clone, Copy, Debug, Serialize)]
pub enum FillUpdateStatus {
New,
@@ -51,7 +44,7 @@ impl Serialize for FillUpdate {
where
S: Serializer,
{
let event = base64::encode_config(bytemuck::bytes_of(&self.event), base64::STANDARD);
let event = base64::encode(bytemuck::bytes_of(&self.event));
let mut state = serializer.serialize_struct("FillUpdate", 4)?;
state.serialize_field("event", &event)?;
state.serialize_field("market", &self.market)?;
@@ -81,7 +74,7 @@ impl Serialize for FillCheckpoint {
let events: Vec<String> = self
.events
.iter()
.map(|e| base64::encode_config(bytemuck::bytes_of(e), base64::STANDARD))
.map(|e| base64::encode(bytemuck::bytes_of(e)))
.collect();
let mut state = serializer.serialize_struct("FillUpdate", 3)?;
state.serialize_field("events", &events)?;
@@ -105,7 +98,7 @@ type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
fn publish_changes(
slot: u64,
write_version: u64,
mkt: &MarketConfig,
mkt: &(Pubkey, Pubkey),
header: &EventQueueHeader,
events: &EventQueueEvents,
old_seq_num: u64,
@@ -120,6 +113,8 @@ fn publish_changes(
.checked_sub(MAX_NUM_EVENTS as u64)
.unwrap_or(0);
let mut checkpoint = Vec::new();
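// markets and their event queues are now identified by pubkey instead of a configured name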
let mkt_pk_string = mkt.0.to_string();
let evq_pk_string = mkt.1.to_string();
for seq_num in start_seq_num..header.seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
@@ -131,7 +126,7 @@
if seq_num >= old_seq_num {
debug!(
"found new event {} idx {} type {}",
mkt.name, idx, events[idx].event_type as u32
mkt_pk_string, idx, events[idx].event_type as u32
);
metric_events_new.increment();
@@ -145,8 +140,8 @@
write_version,
event: fill,
status: FillUpdateStatus::New,
market: mkt.name.clone(),
queue: mkt.event_queue.clone(),
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
@@ -156,7 +151,7 @@
{
debug!(
"found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt.name, idx, seq_num, header.seq_num, old_seq_num
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num
);
metric_events_change.increment();
@@ -170,8 +165,8 @@
write_version,
event: fill,
status: FillUpdateStatus::Revoke,
market: mkt.name.clone(),
queue: mkt.event_queue.clone(),
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
@@ -185,8 +180,8 @@ fn publish_changes(
write_version,
event: fill,
status: FillUpdateStatus::New,
market: mkt.name.clone(),
queue: mkt.event_queue.clone(),
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
@@ -206,7 +201,7 @@ fn publish_changes(
debug!(
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt.name, idx, seq_num, header.seq_num, old_seq_num
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num
);
metric_events_drop.increment();
@@ -219,8 +214,8 @@ fn publish_changes(
event: fill,
write_version,
status: FillUpdateStatus::Revoke,
market: mkt.name.clone(),
queue: mkt.event_queue.clone(),
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
@@ -231,14 +226,15 @@ fn publish_changes(
slot,
write_version,
events: checkpoint,
market: mkt.name.clone(),
queue: mkt.event_queue.clone(),
market: mkt_pk_string,
queue: evq_pk_string,
}))
.unwrap()
}
pub async fn init(
markets: Vec<MarketConfig>,
perp_queue_pks: Vec<(Pubkey, Pubkey)>,
serum_queue_pks: Vec<(Pubkey, Pubkey)>,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
@@ -271,11 +267,12 @@ pub async fn init(
let mut chain_cache = ChainData::new();
let mut events_cache: HashMap<String, EventQueueEvents> = HashMap::new();
let mut seq_num_cache = HashMap::new();
let mut last_ev_q_versions = HashMap::<String, (u64, u64)>::new();
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
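// restrict processing to the discovered perp and serum event queues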
let relevant_pubkeys = markets
let relevant_pubkeys = [perp_queue_pks.clone(), serum_queue_pks.clone()]
.concat()
.iter()
.map(|m| Pubkey::from_str(&m.event_queue).unwrap())
.map(|m| m.1)
.collect::<HashSet<Pubkey>>();
// update handling thread, reads both slots and account updates
@@ -313,28 +310,29 @@ pub async fn init(
}
}
for mkt in markets.iter() {
let last_ev_q_version = last_ev_q_versions.get(&mkt.event_queue).unwrap_or(&(0, 0));
let mkt_pk = mkt.event_queue.parse::<Pubkey>().unwrap();
for mkt in perp_queue_pks.iter() {
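// each entry is a (market pubkey, event queue pubkey) pair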
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
let mkt_pk = mkt.1;
match chain_cache.account(&mkt_pk) {
Ok(account_info) => {
// only process if the account state changed
let ev_q_version = (account_info.slot, account_info.write_version);
trace!("evq {} write_version {:?}", mkt.name, ev_q_version);
if ev_q_version == *last_ev_q_version {
let evq_version = (account_info.slot, account_info.write_version);
let evq_pk_string = mkt.1.to_string();
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
if evq_version == *last_evq_version {
continue;
}
last_ev_q_versions.insert(mkt.event_queue.clone(), ev_q_version);
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
let account = &account_info.account;
let event_queue =
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
trace!("evq {} seq_num {}", mkt.name, event_queue.header.seq_num);
trace!("evq {} seq_num {}", evq_pk_string, event_queue.header.seq_num);
match seq_num_cache.get(&mkt.event_queue) {
Some(old_seq_num) => match events_cache.get(&mkt.event_queue) {
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match events_cache.get(&evq_pk_string) {
Some(old_events) => publish_changes(
account_info.slot,
account_info.write_version,
@@ -348,16 +346,16 @@ pub async fn init(
&mut metric_events_change,
&mut metrics_events_drop,
),
_ => info!("events_cache could not find {}", mkt.name),
_ => info!("events_cache could not find {}", evq_pk_string),
},
_ => info!("seq_num_cache could not find {}", mkt.name),
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache
.insert(mkt.event_queue.clone(), event_queue.header.seq_num.clone());
events_cache.insert(mkt.event_queue.clone(), event_queue.buf.clone());
.insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
events_cache.insert(evq_pk_string.clone(), event_queue.buf.clone());
}
Err(_) => info!("chain_cache could not find {}", mkt.name),
Err(_) => info!("chain_cache could not find {}", mkt.1),
}
}
}
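For reference, a minimal sketch of the new call shape (the stand-in pubkeys and surrounding async context are assumptions; real values come from the on-chain group discovery shown in the service changes below):

use solana_sdk::pubkey::Pubkey;

// stand-in values; the service derives these from MangoGroupContext
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = vec![(Pubkey::new_unique(), Pubkey::new_unique())];
let serum_queue_pks: Vec<(Pubkey, Pubkey)> = vec![];
let (account_write_queue_sender, slot_queue_sender, fill_receiver) =
    fill_event_filter::init(perp_queue_pks, serum_queue_pks, metrics_tx.clone()).await?;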

View File

@@ -22,3 +22,10 @@ async-channel = "1.6"
async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.17"
bytemuck = "1.7.2"
mango-v4 = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
serum_dex = { git = "ssh://git@github.com/openbook-dex/program", branch = "master" }
anchor-lang = "0.25.0"
anchor-client = "0.25.0"

View File

@@ -1,4 +1,6 @@
bind_ws_addr = "127.0.0.1:8080"
bind_ws_addr = "0.0.0.0:8080"
rpc_http_url = "http://mango.rpcpool.com/<token>"
mango_group = "78b8f4cGCwmZ9ysPFMWLaLTkkaYnUjwMJYStWe5RTSSX"
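# markets are discovered automatically from this group; a hardcoded [[markets]] list is no longer needed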
[metrics]
output_stdout = true
@@ -6,63 +8,13 @@ output_http = true
[source]
dedup_queue_size = 50000
rpc_ws_url = ""
rpc_ws_url = "wss://mango.rpcpool.com/<token>"
[[source.grpc_sources]]
name = "server"
connection_string = "http://[::1]:10000"
name = "accountsdb-client"
connection_string = "http://tyo64.rpcpool.com/"
retry_connection_sleep_secs = 30
#[source.grpc_sources.tls]
#ca_cert_path = "ca.pem"
#client_cert_path = "client.pem"
#client_key_path = "client.pem"
#domain_name = "example.com"
[source.snapshot]
rpc_http_url = ""
program_id = "mv3ekLzLbnVPNxjSKvqBpU3ZeZXPQdEC3bp5MDEBG68"
[[markets]]
name = "BTC-PERP"
event_queue = "7t5Me8RieYKsFpfLEV8jnpqcqswNpyWD95ZqgUXuLV8Z"
[[markets]]
name = "ETH-PERP"
event_queue = "9vDfKNPJkCvQv9bzR4JNTGciQC2RVHPVNMMHiVDgT1mw"
[[markets]]
name = "SOL-PERP"
event_queue = "31cKs646dt1YkA3zPyxZ7rUAkxTBz279w4XEobFXcAKP"
[[markets]]
name = "MNGO-PERP"
event_queue = "7orixrhZpjvofZGWZyyLFxSEt2tfFiost5kHEzd7jdet"
[[markets]]
name = "SRM-PERP"
event_queue = "BXSPmdHWP6fMqsCsT6kG8UN9uugAJxdDkQWy87njUQnL"
[[markets]]
name = "RAY-PERP"
event_queue = "Css2MQhEvXMTKjp9REVZR9ZyUAYAZAPrnDvRoPxrQkeN"
[[markets]]
name = "FTT-PERP"
event_queue = "5pHAhyEphQRVvLqvYF7dziofR52yZWuq8DThQFJvJ7r5"
[[markets]]
name = "ADA-PERP"
event_queue = "G6Dsw9KnP4G38hePtedTH6gDfDQmPJGJw8zipBJvKc12"
[[markets]]
name = "BNB-PERP"
event_queue = "GmX4qXMpXvs1DuUXNB4eqL1rfF8LeYEjkKgpFeYsm55n"
[[markets]]
name = "AVAX-PERP"
event_queue = "5Grgo9kLu692SUcJ6S7jtbi1WkdwiyRWgThAfN1PcvbL"
[[markets]]
name = "LUNA-PERP"
event_queue = "HDJ43o9Dxxu6yWRWPEce44gtCHauRGLXJwwtvD7GwEBx"
rpc_http_url = "http://mango.rpcpool.com/<token>"
program_id = "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg"

View File

@@ -1,7 +1,11 @@
use anchor_client::{Cluster, solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair, account::Account}};
use anchor_lang::prelude::Pubkey;
use bytemuck::cast_slice;
use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{pin_mut, SinkExt, StreamExt};
use log::*;
use std::{collections::HashMap, fs::File, io::Read, net::SocketAddr, sync::Arc, sync::Mutex};
use std::{collections::HashMap, fs::File, io::Read, net::SocketAddr, sync::Arc, sync::Mutex, time::Duration, convert::identity, str::FromStr};
use tokio::{
net::{TcpListener, TcpStream},
pin,
@@ -9,9 +13,9 @@ use tokio::{
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::Deserialize;
use solana_geyser_connector_lib::metrics::{MetricType, MetricU64};
use solana_geyser_connector_lib::{metrics::{MetricType, MetricU64}, FilterConfig};
use solana_geyser_connector_lib::{
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage, MarketConfig},
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
};
@@ -80,8 +84,9 @@
pub struct Config {
pub source: SourceConfig,
pub metrics: MetricsConfig,
pub markets: Vec<MarketConfig>,
pub bind_ws_addr: String,
pub rpc_http_url: String,
pub mango_group: String,
}
#[tokio::main]
@@ -111,8 +116,58 @@
let metrics_closed_connections =
metrics_tx.register_u64("fills_feed_closed_connections".into(), MetricType::Counter);
let rpc_url = config.rpc_http_url;
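// note: assumes an https rpc endpoint; the websocket url is derived by swapping the scheme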
let ws_url = rpc_url.replace("https", "wss");
let rpc_timeout = Duration::from_secs(10);
let cluster = Cluster::Custom(rpc_url.clone(), ws_url.clone());
let client = Client::new(
cluster.clone(),
CommitmentConfig::processed(),
&Keypair::new(),
Some(rpc_timeout),
);
let group_context = Arc::new(MangoGroupContext::new_from_rpc(
Pubkey::from_str(&config.mango_group).unwrap(),
client.cluster.clone(),
client.commitment,
)?);
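// discover the (market address, event queue address) pair for every perp market in the group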
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
.perp_markets
.iter()
.map(|(_, context)| (context.address, context.market.event_queue))
.collect();
let serum_market_pks: Vec<Pubkey> = group_context
.serum3_markets
.iter()
.map(|(_, context)| context.market.serum_market_external)
.collect();
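// serum event queues are not stored in the group, so fetch each external serum market account to read them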
let serum_market_ais = client
.rpc()
.get_multiple_accounts(serum_market_pks.as_slice())?;
let serum_market_ais: Vec<&Account> = serum_market_ais
.iter()
.filter_map(|maybe_ai| match maybe_ai {
Some(ai) => Some(ai),
None => None,
})
.collect();
let serum_queue_pks: Vec<(Pubkey, Pubkey)> = serum_market_ais
.iter()
.enumerate()
.map(|pair| {
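// serum market account data starts with 5 bytes of head padding (b"serum"); skip it before casting to MarketState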
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
);
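// MarketState is packed, so identity() copies event_q out before a reference is taken;
// its [u64; 4] words are then reinterpreted as the event queue pubkey bytes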
(serum_market_pks[pair.0], Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])))
})
.collect();
let (account_write_queue_sender, slot_queue_sender, fill_receiver) =
fill_event_filter::init(config.markets.clone(), metrics_tx.clone()).await?;
fill_event_filter::init(perp_queue_pks.clone(), serum_queue_pks.clone(), metrics_tx.clone()).await?;
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
let peers = PeerMap::new(Mutex::new(HashMap::new()));
@@ -178,9 +233,13 @@
.collect::<String>()
);
let use_geyser = true;
let filter_config = FilterConfig {
program_ids: vec!["abc123".into()]
};
if use_geyser {
grpc_plugin_source::process_events(
&config.source,
&filter_config,
account_write_queue_sender,
slot_queue_sender,
metrics_tx.clone(),