Compare commits

...

2 Commits

Author SHA1 Message Date
Riordan Panayides 7f0ddd3ac5 Fix serum fills 2022-12-24 14:43:43 +00:00
Riordan Panayides fad9f7bae3 Add Serum support for orderbook, fix pnl main 2022-12-24 12:49:17 +00:00
8 changed files with 665 additions and 145 deletions

30
Cargo.lock generated
View File

@ -5076,6 +5076,30 @@ dependencies = [
"without-alloc", "without-alloc",
] ]
[[package]]
name = "serum_dex"
version = "0.5.10"
source = "git+ssh://git@github.com/jup-ag/openbook-program?branch=feat/expose-things#d287391bd7ed8111078fd1afb8f7b6d55cdd1d1f"
dependencies = [
"anchor-lang",
"arrayref",
"bincode",
"bytemuck",
"byteorder",
"enumflags2",
"field-offset",
"itertools 0.9.0",
"num-traits",
"num_enum",
"safe-transmute",
"serde",
"solana-program",
"spl-token",
"static_assertions",
"thiserror",
"without-alloc",
]
[[package]] [[package]]
name = "serum_dex" name = "serum_dex"
version = "0.5.10" version = "0.5.10"
@ -5121,7 +5145,7 @@ dependencies = [
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
"serum_dex 0.5.10", "serum_dex 0.5.10 (git+ssh://git@github.com/jup-ag/openbook-program?branch=feat/expose-things)",
"solana-geyser-connector-lib", "solana-geyser-connector-lib",
"solana-logger", "solana-logger",
"tokio", "tokio",
@ -5149,7 +5173,7 @@ dependencies = [
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
"serum_dex 0.5.10", "serum_dex 0.5.10 (git+ssh://git@github.com/openbook-dex/program?branch=master)",
"solana-geyser-connector-lib", "solana-geyser-connector-lib",
"solana-logger", "solana-logger",
"tokio", "tokio",
@ -5728,7 +5752,7 @@ dependencies = [
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
"serum_dex 0.5.10", "serum_dex 0.5.10 (git+ssh://git@github.com/jup-ag/openbook-program?branch=feat/expose-things)",
"solana-account-decoder", "solana-account-decoder",
"solana-client", "solana-client",
"solana-rpc", "solana-rpc",

View File

@ -58,7 +58,7 @@ warp = "0.3"
anchor-lang = "0.25.0" anchor-lang = "0.25.0"
serum_dex = { git = "ssh://git@github.com/openbook-dex/program", branch = "master" } serum_dex = { git = "ssh://git@github.com/jup-ag/openbook-program", branch = "feat/expose-things" }
[build-dependencies] [build-dependencies]
tonic-build = { version = "0.6", features = ["compression"] } tonic-build = { version = "0.6", features = ["compression"] }

View File

@ -3,8 +3,10 @@ use crate::{
metrics::{MetricType, Metrics}, metrics::{MetricType, Metrics},
AccountWrite, SlotUpdate, AccountWrite, SlotUpdate,
}; };
use bytemuck::{Pod, Zeroable};
use log::*; use log::*;
use serde::{ser::SerializeStruct, Serialize, Serializer}; use serde::{ser::SerializeStruct, Serialize, Serializer};
use serum_dex::state::EventView;
use solana_sdk::{ use solana_sdk::{
account::{ReadableAccount, WritableAccount}, account::{ReadableAccount, WritableAccount},
clock::Epoch, clock::Epoch,
@ -39,6 +41,27 @@ pub struct FillUpdate {
pub write_version: u64, pub write_version: u64,
} }
#[derive(Clone, Debug)]
pub struct SerumFillUpdate {
pub event: serum_dex::state::Event,
pub status: FillUpdateStatus,
pub market: String,
pub queue: String,
pub slot: u64,
pub write_version: u64,
}
#[derive(Copy, Clone, Debug)]
#[repr(packed)]
pub struct SerumEventQueueHeader {
_account_flags: u64, // Initialized, EventQueue
_head: u64,
count: u64,
seq_num: u64,
}
unsafe impl Zeroable for SerumEventQueueHeader {}
unsafe impl Pod for SerumEventQueueHeader {}
impl Serialize for FillUpdate { impl Serialize for FillUpdate {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where where
@ -57,6 +80,24 @@ impl Serialize for FillUpdate {
} }
} }
impl Serialize for SerumFillUpdate {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let event = base64::encode(bytemuck::bytes_of(&self.event));
let mut state = serializer.serialize_struct("SerumFillUpdate", 4)?;
state.serialize_field("event", &event)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("status", &self.status)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.end()
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct FillCheckpoint { pub struct FillCheckpoint {
pub market: String, pub market: String,
@ -66,6 +107,15 @@ pub struct FillCheckpoint {
pub write_version: u64, pub write_version: u64,
} }
#[derive(Clone, Debug)]
pub struct SerumFillCheckpoint {
pub market: String,
pub queue: String,
pub events: Vec<serum_dex::state::Event>,
pub slot: u64,
pub write_version: u64,
}
impl Serialize for FillCheckpoint { impl Serialize for FillCheckpoint {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where where
@ -76,7 +126,28 @@ impl Serialize for FillCheckpoint {
.iter() .iter()
.map(|e| base64::encode(bytemuck::bytes_of(e))) .map(|e| base64::encode(bytemuck::bytes_of(e)))
.collect(); .collect();
let mut state = serializer.serialize_struct("FillUpdate", 3)?; let mut state = serializer.serialize_struct("FillCheckpoint", 3)?;
state.serialize_field("events", &events)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.end()
}
}
impl Serialize for SerumFillCheckpoint {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let events: Vec<String> = self
.events
.iter()
.map(|e| base64::encode(bytemuck::bytes_of(e)))
.collect();
let mut state = serializer.serialize_struct("SerumFillCheckpoint", 3)?;
state.serialize_field("events", &events)?; state.serialize_field("events", &events)?;
state.serialize_field("market", &self.market)?; state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?; state.serialize_field("queue", &self.queue)?;
@ -89,13 +160,15 @@ impl Serialize for FillCheckpoint {
pub enum FillEventFilterMessage { pub enum FillEventFilterMessage {
Update(FillUpdate), Update(FillUpdate),
SerumUpdate(SerumFillUpdate),
Checkpoint(FillCheckpoint), Checkpoint(FillCheckpoint),
SerumCheckpoint(SerumFillCheckpoint),
} }
// couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue // couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize]; type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
fn publish_changes( fn publish_changes_perp(
slot: u64, slot: u64,
write_version: u64, write_version: u64,
mkt: &(Pubkey, Pubkey), mkt: &(Pubkey, Pubkey),
@ -232,6 +305,144 @@ fn publish_changes(
.unwrap() .unwrap()
} }
fn publish_changes_serum(
slot: u64,
write_version: u64,
mkt: &(Pubkey, Pubkey),
header: &SerumEventQueueHeader,
events: &[serum_dex::state::Event],
old_seq_num: u64,
old_events: &[serum_dex::state::Event],
fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
metric_events_new: &mut MetricU64,
metric_events_change: &mut MetricU64,
metric_events_drop: &mut MetricU64,
) {
// seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
let start_seq_num = max(old_seq_num, header.seq_num)
.checked_sub(MAX_NUM_EVENTS as u64)
.unwrap_or(0);
let mut checkpoint = Vec::new();
let mkt_pk_string = mkt.0.to_string();
let evq_pk_string = mkt.1.to_string();
let header_seq_num = header.seq_num;
info!("start seq {} header seq {}", start_seq_num, header_seq_num);
for seq_num in start_seq_num..header_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
let event_view = events[idx].as_view().unwrap();
let old_event_view = old_events[idx].as_view().unwrap();
match event_view {
EventView::Fill { .. } => {
// there are three possible cases:
// 1) the event is past the old seq num, hence guaranteed new event
// 2) the event is not matching the old event queue
// 3) all other events are matching the old event queue
// the order of these checks is important so they are exhaustive
if seq_num >= old_seq_num {
info!("found new serum fill {} idx {}", mkt_pk_string, idx,);
metric_events_new.increment();
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
slot,
write_version,
event: events[idx],
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(events[idx]);
continue;
}
match old_event_view {
EventView::Fill { .. } => {
info!("already got all fills???");
// every already published event is recorded in checkpoint
checkpoint.push(events[idx]);
}
EventView::Out { .. } => {
info!(
"found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_change.increment();
// first revoke old event
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
slot,
write_version,
event: old_events[idx],
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
// then publish new if its a fill and record in checkpoint
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
slot,
write_version,
event: events[idx],
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(events[idx]);
}
}
}
_ => continue,
}
}
// in case queue size shrunk due to a fork we need revoke all previous fills
for seq_num in header_seq_num..old_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
let old_event_view = old_events[idx].as_view().unwrap();
debug!(
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_drop.increment();
match old_event_view {
EventView::Fill { .. } => {
fill_update_sender
.try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
slot,
event: old_events[idx],
write_version,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
EventView::Out { .. } => { continue }
}
}
fill_update_sender
.try_send(FillEventFilterMessage::SerumCheckpoint(
SerumFillCheckpoint {
slot,
write_version,
events: checkpoint,
market: mkt_pk_string,
queue: evq_pk_string,
},
))
.unwrap()
}
pub async fn init( pub async fn init(
perp_queue_pks: Vec<(Pubkey, Pubkey)>, perp_queue_pks: Vec<(Pubkey, Pubkey)>,
serum_queue_pks: Vec<(Pubkey, Pubkey)>, serum_queue_pks: Vec<(Pubkey, Pubkey)>,
@ -265,12 +476,13 @@ pub async fn init(
let account_write_queue_receiver_c = account_write_queue_receiver.clone(); let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let mut chain_cache = ChainData::new(); let mut chain_cache = ChainData::new();
let mut events_cache: HashMap<String, EventQueueEvents> = HashMap::new(); let mut perp_events_cache: HashMap<String, EventQueueEvents> = HashMap::new();
let mut serum_events_cache: HashMap<String, Vec<serum_dex::state::Event>> = HashMap::new();
let mut seq_num_cache = HashMap::new(); let mut seq_num_cache = HashMap::new();
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new(); let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
let relevant_pubkeys = [perp_queue_pks.clone(), serum_queue_pks.clone()] let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
.concat() let relevant_pubkeys = all_queue_pks
.iter() .iter()
.map(|m| m.1) .map(|m| m.1)
.collect::<HashSet<Pubkey>>(); .collect::<HashSet<Pubkey>>();
@ -310,7 +522,7 @@ pub async fn init(
} }
} }
for mkt in perp_queue_pks.iter() { for mkt in all_queue_pks.iter() {
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0)); let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
let mkt_pk = mkt.1; let mkt_pk = mkt.1;
@ -326,34 +538,81 @@ pub async fn init(
last_evq_versions.insert(evq_pk_string.clone(), evq_version); last_evq_versions.insert(evq_pk_string.clone(), evq_version);
let account = &account_info.account; let account = &account_info.account;
let is_perp = mango_v4::check_id(account.owner());
if is_perp {
let event_queue =
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
trace!(
"evq {} seq_num {}",
evq_pk_string,
event_queue.header.seq_num
);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
Some(old_events) => publish_changes_perp(
account_info.slot,
account_info.write_version,
mkt,
&event_queue.header,
&event_queue.buf,
*old_seq_num,
old_events,
&fill_update_sender,
&mut metric_events_new,
&mut metric_events_change,
&mut metrics_events_drop,
),
_ => {
info!("perp_events_cache could not find {}", evq_pk_string)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
let event_queue = seq_num_cache
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap(); .insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
trace!("evq {} seq_num {}", evq_pk_string, event_queue.header.seq_num); perp_events_cache
.insert(evq_pk_string.clone(), event_queue.buf.clone());
} else {
let inner_data = &account.data()[5..&account.data().len() - 7];
let header_span = std::mem::size_of::<SerumEventQueueHeader>();
let header: SerumEventQueueHeader =
*bytemuck::from_bytes(&inner_data[..header_span]);
let seq_num = header.seq_num;
let count = header.count;
let rest = &inner_data[header_span..];
let slop = rest.len() % std::mem::size_of::<serum_dex::state::Event>();
let new_len = rest.len() - slop;
let events = &rest[..new_len];
debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
match seq_num_cache.get(&evq_pk_string) { match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match events_cache.get(&evq_pk_string) { Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) {
Some(old_events) => publish_changes( Some(old_events) => publish_changes_serum(
account_info.slot, account_info.slot,
account_info.write_version, account_info.write_version,
mkt, mkt,
&event_queue.header, &header,
&event_queue.buf, &events,
*old_seq_num, *old_seq_num,
old_events, old_events,
&fill_update_sender, &fill_update_sender,
&mut metric_events_new, &mut metric_events_new,
&mut metric_events_change, &mut metric_events_change,
&mut metrics_events_drop, &mut metrics_events_drop,
), ),
_ => info!("events_cache could not find {}", evq_pk_string), _ => {
}, info!("serum_events_cache could not find {}", evq_pk_string)
_ => info!("seq_num_cache could not find {}", evq_pk_string), }
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
serum_events_cache
.insert(evq_pk_string.clone(), events.clone().to_vec());
} }
seq_num_cache
.insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
events_cache.insert(evq_pk_string.clone(), event_queue.buf.clone());
} }
Err(_) => info!("chain_cache could not find {}", mkt.1), Err(_) => info!("chain_cache could not find {}", mkt.1),
} }

View File

@ -3,8 +3,10 @@ use crate::{
metrics::{MetricType, Metrics}, metrics::{MetricType, Metrics},
AccountWrite, SlotUpdate, AccountWrite, SlotUpdate,
}; };
use itertools::Itertools;
use log::*; use log::*;
use serde::{ser::SerializeStruct, Serialize, Serializer}; use serde::{ser::SerializeStruct, Serialize, Serializer};
use serum_dex::critbit::Slab;
use solana_sdk::{ use solana_sdk::{
account::{ReadableAccount, WritableAccount}, account::{ReadableAccount, WritableAccount},
clock::Epoch, clock::Epoch,
@ -12,13 +14,13 @@ use solana_sdk::{
}; };
use std::{ use std::{
borrow::BorrowMut, borrow::BorrowMut,
collections::{HashMap, HashSet}, time::{UNIX_EPOCH, SystemTime}, collections::{HashMap, HashSet},
time::{SystemTime, UNIX_EPOCH}, mem::size_of,
}; };
use itertools::Itertools;
use crate::metrics::MetricU64; use crate::metrics::MetricU64;
use anchor_lang::AccountDeserialize; use anchor_lang::AccountDeserialize;
use mango_v4::state::{BookSide, OrderTreeType}; use mango_v4::{state::{BookSide, OrderTreeType}, serum3_cpi::OrderBookStateHeader};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum OrderbookSide { pub enum OrderbookSide {
@ -29,19 +31,19 @@ pub enum OrderbookSide {
impl Serialize for OrderbookSide { impl Serialize for OrderbookSide {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where where
S: Serializer S: Serializer,
{ {
match *self { match *self {
OrderbookSide::Bid => serializer.serialize_unit_variant("Side", 0, "bid"), OrderbookSide::Bid => serializer.serialize_unit_variant("Side", 0, "bid"),
OrderbookSide::Ask => serializer.serialize_unit_variant("Side", 1, "ask"), OrderbookSide::Ask => serializer.serialize_unit_variant("Side", 1, "ask"),
} }
} }
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct OrderbookLevel { pub struct OrderbookLevel {
pub price: i64, pub price: f64,
pub size: i64, pub size: f64,
} }
impl Serialize for OrderbookLevel { impl Serialize for OrderbookLevel {
@ -112,48 +114,34 @@ pub enum OrderbookFilterMessage {
Checkpoint(OrderbookCheckpoint), Checkpoint(OrderbookCheckpoint),
} }
#[derive(Clone, Debug)]
pub struct MarketConfig { pub struct MarketConfig {
pub name: String, pub name: String,
pub bids: Pubkey, pub bids: Pubkey,
pub asks: Pubkey, pub asks: Pubkey,
pub base_decimals: u8,
pub quote_decimals: u8,
}
pub fn native_to_ui(native: i64, decimals: u8) -> f64 {
native as f64 / (10u64.pow(decimals.into())) as f64
} }
fn publish_changes( fn publish_changes(
slot: u64, slot: u64,
write_version: u64, write_version: u64,
mkt: &(Pubkey, MarketConfig), mkt: &(Pubkey, MarketConfig),
bookside: &BookSide, side: OrderbookSide,
old_bookside: &BookSide, current_bookside: &Vec<OrderbookLevel>,
other_bookside: Option<&BookSide>, previous_bookside: &Vec<OrderbookLevel>,
maybe_other_bookside: Option<&Vec<OrderbookLevel>>,
orderbook_update_sender: &async_channel::Sender<OrderbookFilterMessage>, orderbook_update_sender: &async_channel::Sender<OrderbookFilterMessage>,
metric_updates: &mut MetricU64, metric_updates: &mut MetricU64,
) { ) {
let time_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); let mut update: Vec<OrderbookLevel> = vec![];
let oracle_price_lots = 0; // todo: does this matter? where to find it?
let side = match bookside.nodes.order_tree_type() {
OrderTreeType::Bids => OrderbookSide::Bid,
OrderTreeType::Asks => OrderbookSide::Ask,
};
let current_l2_snapshot: Vec<OrderbookLevel> = bookside
.iter_valid(time_now, oracle_price_lots)
.map(|item| (item.node.price_data() as i64, item.node.quantity))
.group_by(|(price, _)| *price)
.into_iter()
.map(|(price, group)| OrderbookLevel { price, size: group.map(|(_, quantity)| quantity).fold(0, |acc, x| acc + x)})
.collect();
let previous_l2_snapshot: Vec<OrderbookLevel> = old_bookside
.iter_valid(time_now, oracle_price_lots)
.map(|item| (item.node.price_data() as i64, item.node.quantity))
.group_by(|(price, _)| *price)
.into_iter()
.map(|(price, group)| OrderbookLevel { price, size: group.map(|(_, quantity)| quantity).fold(0, |acc, x| acc + x)})
.collect();
let mut update: Vec<OrderbookLevel> = vec!();
// push diff for levels that are no longer present // push diff for levels that are no longer present
for previous_order in previous_l2_snapshot.iter() { for previous_order in previous_bookside.iter() {
let peer = current_l2_snapshot let peer = current_bookside
.iter() .iter()
.find(|level| previous_order.price == level.price); .find(|level| previous_order.price == level.price);
@ -161,17 +149,17 @@ fn publish_changes(
None => { None => {
info!("level removed {}", previous_order.price); info!("level removed {}", previous_order.price);
update.push(OrderbookLevel { update.push(OrderbookLevel {
price: previous_order.price, price: previous_order.price,
size: 0, size: 0f64,
}); });
}, }
_ => continue _ => continue,
} }
} }
// push diff where there's a new level or size has changed // push diff where there's a new level or size has changed
for current_order in &current_l2_snapshot { for current_order in current_bookside {
let peer = previous_l2_snapshot let peer = previous_bookside
.iter() .iter()
.find(|item| item.price == current_order.price); .find(|item| item.price == current_order.price);
@ -180,9 +168,12 @@ fn publish_changes(
if previous_order.size == current_order.size { if previous_order.size == current_order.size {
continue; continue;
} }
debug!("size changed {} -> {}", previous_order.size, current_order.size); debug!(
"size changed {} -> {}",
previous_order.size, current_order.size
);
update.push(current_order.clone()); update.push(current_order.clone());
}, }
None => { None => {
debug!("new level {},{}", current_order.price, current_order.size); debug!("new level {},{}", current_order.price, current_order.size);
update.push(current_order.clone()) update.push(current_order.clone())
@ -190,29 +181,22 @@ fn publish_changes(
} }
} }
match other_bookside { match maybe_other_bookside {
Some(other_bookside) => { Some(other_bookside) => {
let other_l2_snapshot = other_bookside
.iter_valid(time_now, oracle_price_lots)
.map(|item| (item.node.price_data() as i64, item.node.quantity))
.group_by(|(price, _)| *price)
.into_iter()
.map(|(price, group)| OrderbookLevel { price, size: group.map(|(_, quantity)| quantity).fold(0, |acc, x| acc + x)})
.collect();
let (bids, asks) = match side { let (bids, asks) = match side {
OrderbookSide::Bid => (current_l2_snapshot, other_l2_snapshot), OrderbookSide::Bid => (current_bookside, other_bookside),
OrderbookSide::Ask => (other_l2_snapshot, current_l2_snapshot) OrderbookSide::Ask => (other_bookside, current_bookside),
}; };
orderbook_update_sender orderbook_update_sender
.try_send(OrderbookFilterMessage::Checkpoint(OrderbookCheckpoint { .try_send(OrderbookFilterMessage::Checkpoint(OrderbookCheckpoint {
slot, slot,
write_version, write_version,
bids, bids: bids.clone(),
asks, asks: asks.clone(),
market: mkt.0.to_string(), market: mkt.0.to_string(),
})) }))
.unwrap() .unwrap()
}, }
None => info!("other bookside not in cache"), None => info!("other bookside not in cache"),
} }
@ -230,11 +214,100 @@ fn publish_changes(
})) }))
.unwrap(); // TODO: use anyhow to bubble up error .unwrap(); // TODO: use anyhow to bubble up error
metric_updates.increment(); metric_updates.increment();
}
fn publish_changes_serum(
slot: u64,
write_version: u64,
mkt: &(Pubkey, MarketConfig),
side: OrderbookSide,
current_bookside: &Vec<OrderbookLevel>,
previous_bookside: &Vec<OrderbookLevel>,
maybe_other_bookside: Option<&Vec<OrderbookLevel>>,
orderbook_update_sender: &async_channel::Sender<OrderbookFilterMessage>,
metric_updates: &mut MetricU64,
) {
let mut update: Vec<OrderbookLevel> = vec![];
// push diff for levels that are no longer present
for previous_order in previous_bookside.iter() {
let peer = current_bookside
.iter()
.find(|level| previous_order.price == level.price);
match peer {
None => {
info!("level removed {}", previous_order.price);
update.push(OrderbookLevel {
price: previous_order.price,
size: 0f64,
});
}
_ => continue,
}
}
// push diff where there's a new level or size has changed
for current_order in current_bookside {
let peer = previous_bookside
.iter()
.find(|item| item.price == current_order.price);
match peer {
Some(previous_order) => {
if previous_order.size == current_order.size {
continue;
}
debug!(
"size changed {} -> {}",
previous_order.size, current_order.size
);
update.push(current_order.clone());
}
None => {
debug!("new level {},{}", current_order.price, current_order.size);
update.push(current_order.clone())
}
}
}
match maybe_other_bookside {
Some(other_bookside) => {
let (bids, asks) = match side {
OrderbookSide::Bid => (current_bookside, other_bookside),
OrderbookSide::Ask => (other_bookside, current_bookside),
};
orderbook_update_sender
.try_send(OrderbookFilterMessage::Checkpoint(OrderbookCheckpoint {
slot,
write_version,
bids: bids.clone(),
asks: asks.clone(),
market: mkt.0.to_string(),
}))
.unwrap()
}
None => info!("other bookside not in cache"),
}
info!("diff {} {:?}", mkt.1.name, update);
if update.len() > 0 {
orderbook_update_sender
.try_send(OrderbookFilterMessage::Update(OrderbookUpdate {
market: mkt.0.to_string(),
side: side.clone(),
update,
slot,
write_version,
}))
.unwrap(); // TODO: use anyhow to bubble up error
metric_updates.increment();
}
} }
pub async fn init( pub async fn init(
market_configs: Vec<(Pubkey, MarketConfig)>, market_configs: Vec<(Pubkey, MarketConfig)>,
serum_market_configs: Vec<(Pubkey, MarketConfig)>,
metrics_sender: Metrics, metrics_sender: Metrics,
) -> anyhow::Result<( ) -> anyhow::Result<(
async_channel::Sender<AccountWrite>, async_channel::Sender<AccountWrite>,
@ -261,12 +334,17 @@ pub async fn init(
let account_write_queue_receiver_c = account_write_queue_receiver.clone(); let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let mut chain_cache = ChainData::new(); let mut chain_cache = ChainData::new();
let mut bookside_cache: HashMap<String, BookSide> = HashMap::new(); let mut bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
let mut serum_bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
let mut last_write_versions = HashMap::<String, (u64, u64)>::new(); let mut last_write_versions = HashMap::<String, (u64, u64)>::new();
let relevant_pubkeys = market_configs.iter().flat_map(|m| [m.1.bids, m.1.asks]).collect::<HashSet<Pubkey>>(); let relevant_pubkeys = [market_configs.clone(), serum_market_configs.clone()]
.concat()
.iter()
.flat_map(|m| [m.1.bids, m.1.asks])
.collect::<HashSet<Pubkey>>();
info!("relevant_pubkeys {:?}", relevant_pubkeys); info!("relevant_pubkeys {:?}", relevant_pubkeys);
// update handling thread, reads both sloths and account updates // update handling thread, reads both slots and account updates
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
tokio::select! { tokio::select! {
@ -304,16 +382,8 @@ pub async fn init(
for mkt in market_configs.iter() { for mkt in market_configs.iter() {
for side in 0..2 { for side in 0..2 {
let mkt_pk = mkt.0; let mkt_pk = mkt.0;
let side_pk = if side == 0 { let side_pk = if side == 0 { mkt.1.bids } else { mkt.1.asks };
mkt.1.bids let other_side_pk = if side == 0 { mkt.1.asks } else { mkt.1.bids };
} else {
mkt.1.asks
};
let other_side_pk = if side == 0 {
mkt.1.asks
} else {
mkt.1.bids
};
let last_write_version = last_write_versions let last_write_version = last_write_versions
.get(&side_pk.to_string()) .get(&side_pk.to_string())
.unwrap_or(&(0, 0)); .unwrap_or(&(0, 0));
@ -332,6 +402,28 @@ pub async fn init(
let account = &account_info.account; let account = &account_info.account;
let bookside = let bookside =
BookSide::try_deserialize(account.data().borrow_mut()).unwrap(); BookSide::try_deserialize(account.data().borrow_mut()).unwrap();
let side = match bookside.nodes.order_tree_type() {
OrderTreeType::Bids => OrderbookSide::Bid,
OrderTreeType::Asks => OrderbookSide::Ask,
};
let time_now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let oracle_price_lots = 0; // todo: does this matter? where to find it?
let bookside = bookside
.iter_valid(time_now, oracle_price_lots)
.map(|item| (item.node.price_data() as i64, item.node.quantity))
.group_by(|(price, _)| *price)
.into_iter()
.map(|(price, group)| OrderbookLevel {
price: native_to_ui(price, mkt.1.quote_decimals),
size: native_to_ui(group
.map(|(_, quantity)| quantity)
.fold(0, |acc, x| acc + x), mkt.1.base_decimals),
})
.collect();
let other_bookside = bookside_cache.get(&other_side_pk.to_string()); let other_bookside = bookside_cache.get(&other_side_pk.to_string());
match bookside_cache.get(&side_pk_string) { match bookside_cache.get(&side_pk_string) {
@ -339,6 +431,7 @@ pub async fn init(
account_info.slot, account_info.slot,
account_info.write_version, account_info.write_version,
mkt, mkt,
side,
&bookside, &bookside,
&old_bookside, &old_bookside,
other_bookside, other_bookside,
@ -349,11 +442,82 @@ pub async fn init(
} }
bookside_cache.insert(side_pk_string.clone(), bookside.clone()); bookside_cache.insert(side_pk_string.clone(), bookside.clone());
} }
Err(_) => info!("chain_cache could not find {}", mkt_pk), Err(_) => info!("chain_cache could not find {}", mkt_pk),
} }
} }
} }
for mkt in serum_market_configs.iter() {
for side in 0..2 {
let side_pk = if side == 0 { mkt.1.bids } else { mkt.1.asks };
let other_side_pk = if side == 0 { mkt.1.asks } else { mkt.1.bids };
let last_write_version = last_write_versions
.get(&side_pk.to_string())
.unwrap_or(&(0, 0));
match chain_cache.account(&side_pk) {
Ok(account_info) => {
let side_pk_string = side_pk.to_string();
let write_version = (account_info.slot, account_info.write_version);
// todo: should this be <= so we don't overwrite with old data received late?
if write_version == *last_write_version {
continue;
}
last_write_versions.insert(side_pk_string.clone(), write_version);
let account = &mut account_info.account.clone();
let data = account.data_as_mut_slice();
let len = data.len();
info!("side pk {} side {}", side_pk, side);
let inner = &mut data[5..len - 7];
let slab = Slab::new(&mut inner[size_of::<OrderBookStateHeader>()..]);
let bookside: Vec<OrderbookLevel> = slab
.iter(side == 0)
.map(|item| (u64::from(item.price()) as i64, item.quantity() as i64))
.group_by(|(price, _)| *price)
.into_iter()
.map(|(price, group)| OrderbookLevel {
price: native_to_ui(price, mkt.1.quote_decimals),
size: native_to_ui(group
.map(|(_, quantity)| quantity)
.fold(0, |acc, x| acc + x), mkt.1.base_decimals),
})
.collect();
let other_bookside =
serum_bookside_cache.get(&other_side_pk.to_string());
match serum_bookside_cache.get(&side_pk_string) {
Some(old_bookside) => publish_changes_serum(
account_info.slot,
account_info.write_version,
mkt,
if side == 0 {
OrderbookSide::Bid
} else {
OrderbookSide::Ask
},
&bookside,
old_bookside,
other_bookside,
&fill_update_sender,
&mut metric_events_new,
),
_ => info!("bookside_cache could not find {}", side_pk_string),
}
serum_bookside_cache.insert(
side_pk_string.clone(),
bookside,
);
}
Err(_) => info!("chain_cache could not find {}", side_pk),
}
}
}
} }
}); });

View File

@ -26,6 +26,6 @@ bytemuck = "1.7.2"
mango-v4 = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" } mango-v4 = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
client = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" } client = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
serum_dex = { git = "ssh://git@github.com/openbook-dex/program", branch = "master" } serum_dex = { git = "ssh://git@github.com/jup-ag/openbook-program", branch = "feat/expose-things" }
anchor-lang = "0.25.0" anchor-lang = "0.25.0"
anchor-client = "0.25.0" anchor-client = "0.25.0"

View File

@ -13,17 +13,19 @@ use tokio::{
use tokio_tungstenite::tungstenite::{protocol::Message, Error}; use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::Deserialize; use serde::Deserialize;
use solana_geyser_connector_lib::{metrics::{MetricType, MetricU64}, FilterConfig}; use solana_geyser_connector_lib::{metrics::{MetricType, MetricU64}, FilterConfig, fill_event_filter::SerumFillCheckpoint};
use solana_geyser_connector_lib::{ use solana_geyser_connector_lib::{
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage}, fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig, grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
}; };
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>; type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
type SerumCheckpointMap = Arc<Mutex<HashMap<String, SerumFillCheckpoint>>>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>; type PeerMap = Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>;
async fn handle_connection_error( async fn handle_connection_error(
checkpoint_map: CheckpointMap, checkpoint_map: CheckpointMap,
serum_checkpoint_map: SerumCheckpointMap,
peer_map: PeerMap, peer_map: PeerMap,
raw_stream: TcpStream, raw_stream: TcpStream,
addr: SocketAddr, addr: SocketAddr,
@ -32,7 +34,7 @@ async fn handle_connection_error(
) { ) {
metrics_opened_connections.clone().increment(); metrics_opened_connections.clone().increment();
let result = handle_connection(checkpoint_map, peer_map.clone(), raw_stream, addr).await; let result = handle_connection(checkpoint_map, serum_checkpoint_map, peer_map.clone(), raw_stream, addr).await;
if result.is_err() { if result.is_err() {
error!("connection {} error {}", addr, result.unwrap_err()); error!("connection {} error {}", addr, result.unwrap_err());
}; };
@ -44,6 +46,7 @@ async fn handle_connection_error(
async fn handle_connection( async fn handle_connection(
checkpoint_map: CheckpointMap, checkpoint_map: CheckpointMap,
serum_checkpoint_map: SerumCheckpointMap,
peer_map: PeerMap, peer_map: PeerMap,
raw_stream: TcpStream, raw_stream: TcpStream,
addr: SocketAddr, addr: SocketAddr,
@ -59,6 +62,7 @@ async fn handle_connection(
info!("ws published: {}", addr); info!("ws published: {}", addr);
} }
// todo: add subscribe logic
// 2: send initial checkpoint // 2: send initial checkpoint
{ {
let checkpoint_map_copy = checkpoint_map.lock().unwrap().clone(); let checkpoint_map_copy = checkpoint_map.lock().unwrap().clone();
@ -170,9 +174,11 @@ async fn main() -> anyhow::Result<()> {
fill_event_filter::init(perp_queue_pks.clone(), serum_queue_pks.clone(), metrics_tx.clone()).await?; fill_event_filter::init(perp_queue_pks.clone(), serum_queue_pks.clone(), metrics_tx.clone()).await?;
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new())); let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new()));
let peers = PeerMap::new(Mutex::new(HashMap::new())); let peers = PeerMap::new(Mutex::new(HashMap::new()));
let checkpoints_ref_thread = checkpoints.clone(); let checkpoints_ref_thread = checkpoints.clone();
let serum_checkpoints_ref_thread = serum_checkpoints.clone();
let peers_ref_thread = peers.clone(); let peers_ref_thread = peers.clone();
// filleventfilter websocket sink // filleventfilter websocket sink
@ -202,6 +208,27 @@ async fn main() -> anyhow::Result<()> {
.unwrap() .unwrap()
.insert(checkpoint.queue.clone(), checkpoint); .insert(checkpoint.queue.clone(), checkpoint);
} }
FillEventFilterMessage::SerumUpdate(update) => {
debug!("ws update {} {:?} serum fill", update.market, update.status);
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
for (k, v) in peer_copy.iter_mut() {
trace!(" > {}", k);
let json = serde_json::to_string(&update);
let result = v.send(Message::Text(json.unwrap())).await;
if result.is_err() {
error!(
"ws update {} {:?} serum fill could not reach {}",
update.market, update.status, k
);
}
}
}
FillEventFilterMessage::SerumCheckpoint(checkpoint) => {
serum_checkpoints_ref_thread
.lock()
.unwrap()
.insert(checkpoint.queue.clone(), checkpoint);
}
} }
} }
}); });
@ -214,6 +241,7 @@ async fn main() -> anyhow::Result<()> {
while let Ok((stream, addr)) = listener.accept().await { while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection_error( tokio::spawn(handle_connection_error(
checkpoints.clone(), checkpoints.clone(),
serum_checkpoints.clone(),
peers.clone(), peers.clone(),
stream, stream,
addr, addr,
@ -234,7 +262,10 @@ async fn main() -> anyhow::Result<()> {
); );
let use_geyser = true; let use_geyser = true;
let filter_config = FilterConfig { let filter_config = FilterConfig {
program_ids: vec!["abc123".into()] program_ids: vec![
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
],
}; };
if use_geyser { if use_geyser {
grpc_plugin_source::process_events( grpc_plugin_source::process_events(

View File

@ -7,8 +7,7 @@ use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender}; use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{ use futures_util::{
future::{self, Ready}, future::{self, Ready},
pin_mut, pin_mut, SinkExt, StreamExt, TryStreamExt,
SinkExt, StreamExt, TryStreamExt,
}; };
use log::*; use log::*;
use std::{ use std::{
@ -27,15 +26,15 @@ use tokio::{
}; };
use tokio_tungstenite::tungstenite::{protocol::Message, Error}; use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::{Deserialize}; use serde::Deserialize;
use solana_geyser_connector_lib::{
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
};
use solana_geyser_connector_lib::{ use solana_geyser_connector_lib::{
metrics::{MetricType, MetricU64}, metrics::{MetricType, MetricU64},
orderbook_filter::{self, MarketConfig, OrderbookCheckpoint, OrderbookFilterMessage}, orderbook_filter::{self, MarketConfig, OrderbookCheckpoint, OrderbookFilterMessage},
FilterConfig, StatusResponse, FilterConfig, StatusResponse,
}; };
use solana_geyser_connector_lib::{
grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
};
type CheckpointMap = Arc<Mutex<HashMap<String, OrderbookCheckpoint>>>; type CheckpointMap = Arc<Mutex<HashMap<String, OrderbookCheckpoint>>>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>; type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;
@ -81,7 +80,7 @@ pub struct Config {
async fn handle_connection_error( async fn handle_connection_error(
checkpoint_map: CheckpointMap, checkpoint_map: CheckpointMap,
peer_map: PeerMap, peer_map: PeerMap,
market_ids: Vec<String>, market_ids: HashMap<String, String>,
raw_stream: TcpStream, raw_stream: TcpStream,
addr: SocketAddr, addr: SocketAddr,
metrics_opened_connections: MetricU64, metrics_opened_connections: MetricU64,
@ -109,7 +108,7 @@ async fn handle_connection_error(
async fn handle_connection( async fn handle_connection(
checkpoint_map: CheckpointMap, checkpoint_map: CheckpointMap,
peer_map: PeerMap, peer_map: PeerMap,
market_ids: Vec<String>, market_ids: HashMap<String, String>,
raw_stream: TcpStream, raw_stream: TcpStream,
addr: SocketAddr, addr: SocketAddr,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -153,7 +152,7 @@ fn handle_commands(
msg: Message, msg: Message,
peer_map: PeerMap, peer_map: PeerMap,
checkpoint_map: CheckpointMap, checkpoint_map: CheckpointMap,
market_ids: Vec<String>, market_ids: HashMap<String, String>,
) -> Ready<Result<(), Error>> { ) -> Ready<Result<(), Error>> {
let msg_str = msg.clone().into_text().unwrap(); let msg_str = msg.clone().into_text().unwrap();
let command: Result<Command, serde_json::Error> = serde_json::from_str(&msg_str); let command: Result<Command, serde_json::Error> = serde_json::from_str(&msg_str);
@ -162,17 +161,20 @@ fn handle_commands(
match command { match command {
Ok(Command::Subscribe(cmd)) => { Ok(Command::Subscribe(cmd)) => {
let market_id = cmd.clone().market_id; let market_id = cmd.clone().market_id;
let subscribed = peer.subscriptions.insert(market_id.clone()); match market_ids.get(&market_id) {
if !market_ids.contains(&market_id) { None => {
let res = StatusResponse { let res = StatusResponse {
success: false, success: false,
message: "market not found", message: "market not found",
}; };
peer.sender peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap())) .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap(); .unwrap();
return future::ok(()); return future::ok(());
}
_ => {}
} }
let subscribed = peer.subscriptions.insert(market_id.clone());
let res = if subscribed { let res = if subscribed {
StatusResponse { StatusResponse {
@ -221,13 +223,13 @@ fn handle_commands(
peer.sender peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&res).unwrap())) .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
.unwrap(); .unwrap();
}, }
Ok(Command::GetMarkets) => { Ok(Command::GetMarkets) => {
info!("getMarkets"); info!("getMarkets");
peer.sender peer.sender
.unbounded_send(Message::Text(serde_json::to_string(&market_ids).unwrap())) .unbounded_send(Message::Text(serde_json::to_string(&market_ids).unwrap()))
.unwrap(); .unwrap();
}, }
Err(err) => { Err(err) => {
info!("error deserializing user input {:?}", err); info!("error deserializing user input {:?}", err);
let res = StatusResponse { let res = StatusResponse {
@ -287,31 +289,65 @@ async fn main() -> anyhow::Result<()> {
Pubkey::from_str(&config.mango_group).unwrap(), Pubkey::from_str(&config.mango_group).unwrap(),
client.cluster.clone(), client.cluster.clone(),
client.commitment, client.commitment,
)?); )?);
// todo: reload markets at intervals // todo: reload markets at intervals
let market_pubkey_strings: Vec<String> = group_context
.perp_markets
.iter()
.map(|(_, context)| context.address.to_string())
.collect();
let market_configs: Vec<(Pubkey, MarketConfig)> = group_context let market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.perp_markets .perp_markets
.iter() .iter()
.map(|(_, context)| { .map(|(_, context)| {
let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market") // todo: default to 6 for usdc?
};
( (
context.address, context.address,
MarketConfig { MarketConfig {
name: context.market.name().to_owned(), name: context.market.name().to_owned(),
bids: context.market.bids, bids: context.market.bids,
asks: context.market.asks, asks: context.market.asks,
base_decimals: context.market.base_decimals,
quote_decimals,
}, },
) )
}) })
.collect(); .collect();
let serum_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.serum3_markets
.iter()
.map(|(_, context)| {
let base_decimals = match group_context.tokens.get(&context.market.base_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market") // todo: default?
};
let quote_decimals = match group_context.tokens.get(&context.market.quote_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market") // todo: default to 6 for usdc?
};
(
context.address,
MarketConfig {
name: context.market.name().to_owned(),
bids: context.bids,
asks: context.asks,
base_decimals,
quote_decimals,
},
)
})
.collect();
let market_pubkey_strings: HashMap<String, String> = [market_configs.clone(), serum_market_configs.clone()]
.concat()
.iter()
.map(|market| (market.0.to_string(), market.1.name.clone()))
.collect::<Vec<(String, String)>>()
.into_iter()
.collect();
let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) = let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) =
orderbook_filter::init(market_configs, metrics_tx.clone()).await?; orderbook_filter::init(market_configs, serum_market_configs, metrics_tx.clone()).await?;
let checkpoints_ref_thread = checkpoints.clone(); let checkpoints_ref_thread = checkpoints.clone();
let peers_ref_thread = peers.clone(); let peers_ref_thread = peers.clone();
@ -381,7 +417,7 @@ async fn main() -> anyhow::Result<()> {
let filter_config = FilterConfig { let filter_config = FilterConfig {
program_ids: vec![ program_ids: vec![
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(), "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
//"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(), "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
], ],
}; };
grpc_plugin_source::process_events( grpc_plugin_source::process_events(

View File

@ -288,11 +288,17 @@ async fn main() -> anyhow::Result<()> {
// start filling chain_data from the grpc plugin source // start filling chain_data from the grpc plugin source
let (account_write_queue_sender, slot_queue_sender) = memory_target::init(chain_data).await?; let (account_write_queue_sender, slot_queue_sender) = memory_target::init(chain_data).await?;
let filter_config = FilterConfig {
program_ids: vec![
"4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
],
};
grpc_plugin_source::process_events( grpc_plugin_source::process_events(
&config.source, &config.source,
&filter_config,
account_write_queue_sender, account_write_queue_sender,
slot_queue_sender, slot_queue_sender,
metrics_tx, metrics_tx.clone(),
) )
.await; .await;