Compare commits
2 Commits
df562a3e42 ... 7f0ddd3ac5

Author | SHA1 | Date
---|---|---
Riordan Panayides | 7f0ddd3ac5 |
Riordan Panayides | fad9f7bae3 |
@@ -5076,6 +5076,30 @@ dependencies = [
  "without-alloc",
 ]
 
+[[package]]
+name = "serum_dex"
+version = "0.5.10"
+source = "git+ssh://git@github.com/jup-ag/openbook-program?branch=feat/expose-things#d287391bd7ed8111078fd1afb8f7b6d55cdd1d1f"
+dependencies = [
+ "anchor-lang",
+ "arrayref",
+ "bincode",
+ "bytemuck",
+ "byteorder",
+ "enumflags2",
+ "field-offset",
+ "itertools 0.9.0",
+ "num-traits",
+ "num_enum",
+ "safe-transmute",
+ "serde",
+ "solana-program",
+ "spl-token",
+ "static_assertions",
+ "thiserror",
+ "without-alloc",
+]
+
 [[package]]
 name = "serum_dex"
 version = "0.5.10"

@@ -5121,7 +5145,7 @@ dependencies = [
  "serde",
  "serde_derive",
  "serde_json",
- "serum_dex 0.5.10",
+ "serum_dex 0.5.10 (git+ssh://git@github.com/jup-ag/openbook-program?branch=feat/expose-things)",
  "solana-geyser-connector-lib",
  "solana-logger",
  "tokio",

@@ -5149,7 +5173,7 @@ dependencies = [
  "serde",
  "serde_derive",
  "serde_json",
- "serum_dex 0.5.10",
+ "serum_dex 0.5.10 (git+ssh://git@github.com/openbook-dex/program?branch=master)",
  "solana-geyser-connector-lib",
  "solana-logger",
  "tokio",

@@ -5728,7 +5752,7 @@ dependencies = [
  "serde",
  "serde_derive",
  "serde_json",
- "serum_dex 0.5.10",
+ "serum_dex 0.5.10 (git+ssh://git@github.com/jup-ag/openbook-program?branch=feat/expose-things)",
  "solana-account-decoder",
  "solana-client",
  "solana-rpc",
@@ -58,7 +58,7 @@ warp = "0.3"
 
 anchor-lang = "0.25.0"
 
-serum_dex = { git = "ssh://git@github.com/openbook-dex/program", branch = "master" }
+serum_dex = { git = "ssh://git@github.com/jup-ag/openbook-program", branch = "feat/expose-things" }
 
 [build-dependencies]
 tonic-build = { version = "0.6", features = ["compression"] }
@@ -3,8 +3,10 @@ use crate::{
     metrics::{MetricType, Metrics},
     AccountWrite, SlotUpdate,
 };
+use bytemuck::{Pod, Zeroable};
 use log::*;
 use serde::{ser::SerializeStruct, Serialize, Serializer};
+use serum_dex::state::EventView;
 use solana_sdk::{
     account::{ReadableAccount, WritableAccount},
     clock::Epoch,
@@ -39,6 +41,27 @@ pub struct FillUpdate {
     pub write_version: u64,
 }
 
+#[derive(Clone, Debug)]
+pub struct SerumFillUpdate {
+    pub event: serum_dex::state::Event,
+    pub status: FillUpdateStatus,
+    pub market: String,
+    pub queue: String,
+    pub slot: u64,
+    pub write_version: u64,
+}
+
+#[derive(Copy, Clone, Debug)]
+#[repr(packed)]
+pub struct SerumEventQueueHeader {
+    _account_flags: u64, // Initialized, EventQueue
+    _head: u64,
+    count: u64,
+    seq_num: u64,
+}
+unsafe impl Zeroable for SerumEventQueueHeader {}
+unsafe impl Pod for SerumEventQueueHeader {}
+
 impl Serialize for FillUpdate {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
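Reviewer note on the hunk above: `SerumEventQueueHeader` is declared `#[repr(packed)]` with manual `Pod`/`Zeroable` impls so the filter can reinterpret raw queue-account bytes as a header in one cast. A minimal standalone sketch of how that cast behaves (the 32-byte size is implied by the four `u64` fields, not stated elsewhere in this diff):

```rust
use bytemuck::{Pod, Zeroable};

#[derive(Copy, Clone, Debug)]
#[repr(packed)]
struct SerumEventQueueHeader {
    _account_flags: u64,
    _head: u64,
    count: u64,
    seq_num: u64,
}
unsafe impl Zeroable for SerumEventQueueHeader {}
unsafe impl Pod for SerumEventQueueHeader {}

fn main() {
    // 32 little-endian bytes: flags=1, head=0, count=2, seq_num=7
    let mut buf = [0u8; 32];
    buf[0] = 1;
    buf[16] = 2;
    buf[24] = 7;
    let header: SerumEventQueueHeader = *bytemuck::from_bytes(&buf);
    // copy packed fields into locals; referencing them in place can be unaligned
    let (count, seq_num) = (header.count, header.seq_num);
    assert_eq!((count, seq_num), (2, 7));
}
```

`from_bytes` panics unless the slice length matches `size_of::<SerumEventQueueHeader>()`, which is why the init loop later slices the header span off the account data first.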
@@ -57,6 +80,24 @@ impl Serialize for FillUpdate {
     }
 }
 
+impl Serialize for SerumFillUpdate {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let event = base64::encode(bytemuck::bytes_of(&self.event));
+        let mut state = serializer.serialize_struct("SerumFillUpdate", 4)?;
+        state.serialize_field("event", &event)?;
+        state.serialize_field("market", &self.market)?;
+        state.serialize_field("queue", &self.queue)?;
+        state.serialize_field("status", &self.status)?;
+        state.serialize_field("slot", &self.slot)?;
+        state.serialize_field("write_version", &self.write_version)?;
+
+        state.end()
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct FillCheckpoint {
     pub market: String,
@@ -66,6 +107,15 @@ pub struct FillCheckpoint {
     pub write_version: u64,
 }
 
+#[derive(Clone, Debug)]
+pub struct SerumFillCheckpoint {
+    pub market: String,
+    pub queue: String,
+    pub events: Vec<serum_dex::state::Event>,
+    pub slot: u64,
+    pub write_version: u64,
+}
+
 impl Serialize for FillCheckpoint {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
@@ -76,7 +126,28 @@ impl Serialize for FillCheckpoint {
             .iter()
             .map(|e| base64::encode(bytemuck::bytes_of(e)))
             .collect();
-        let mut state = serializer.serialize_struct("FillUpdate", 3)?;
+        let mut state = serializer.serialize_struct("FillCheckpoint", 3)?;
+        state.serialize_field("events", &events)?;
+        state.serialize_field("market", &self.market)?;
+        state.serialize_field("queue", &self.queue)?;
+        state.serialize_field("slot", &self.slot)?;
+        state.serialize_field("write_version", &self.write_version)?;
+
+        state.end()
+    }
+}
+
+impl Serialize for SerumFillCheckpoint {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let events: Vec<String> = self
+            .events
+            .iter()
+            .map(|e| base64::encode(bytemuck::bytes_of(e)))
+            .collect();
+        let mut state = serializer.serialize_struct("SerumFillCheckpoint", 3)?;
         state.serialize_field("events", &events)?;
         state.serialize_field("market", &self.market)?;
         state.serialize_field("queue", &self.queue)?;
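All three `Serialize` impls above ship events as base64 of their raw in-memory bytes (`bytemuck::bytes_of`) rather than decoding fills server-side, leaving the layout to the consumer. A sketch of the round trip a client would perform; `WireEvent` is a hypothetical stand-in, since the real `serum_dex::state::Event` layout lives in that crate:

```rust
use bytemuck::{bytes_of, from_bytes, Pod, Zeroable};

// hypothetical stand-in for serum_dex::state::Event
#[repr(C)]
#[derive(Copy, Clone, PartialEq, Debug)]
struct WireEvent {
    tag: u64,
    quantity: u64,
}
unsafe impl Zeroable for WireEvent {}
unsafe impl Pod for WireEvent {}

fn main() {
    let ev = WireEvent { tag: 1, quantity: 250 };
    // what the Serialize impls do before the JSON layer sees the event
    let encoded = base64::encode(bytes_of(&ev));
    // what a websocket consumer has to do to get a typed event back
    let bytes = base64::decode(&encoded).unwrap();
    let decoded: WireEvent = *from_bytes(&bytes);
    assert_eq!(ev, decoded);
}
```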
@@ -89,13 +160,15 @@ impl Serialize for FillCheckpoint {
 
 pub enum FillEventFilterMessage {
     Update(FillUpdate),
+    SerumUpdate(SerumFillUpdate),
     Checkpoint(FillCheckpoint),
+    SerumCheckpoint(SerumFillCheckpoint),
 }
 
 // couldn't compile the correct struct size / math on m1, fixed sizes resolve this issue
 type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
 
-fn publish_changes(
+fn publish_changes_perp(
     slot: u64,
     write_version: u64,
     mkt: &(Pubkey, Pubkey),
@@ -232,6 +305,144 @@ fn publish_changes(
         .unwrap()
 }
 
+fn publish_changes_serum(
+    slot: u64,
+    write_version: u64,
+    mkt: &(Pubkey, Pubkey),
+    header: &SerumEventQueueHeader,
+    events: &[serum_dex::state::Event],
+    old_seq_num: u64,
+    old_events: &[serum_dex::state::Event],
+    fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
+    metric_events_new: &mut MetricU64,
+    metric_events_change: &mut MetricU64,
+    metric_events_drop: &mut MetricU64,
+) {
+    // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
+    let start_seq_num = max(old_seq_num, header.seq_num)
+        .checked_sub(MAX_NUM_EVENTS as u64)
+        .unwrap_or(0);
+    let mut checkpoint = Vec::new();
+    let mkt_pk_string = mkt.0.to_string();
+    let evq_pk_string = mkt.1.to_string();
+    let header_seq_num = header.seq_num;
+    info!("start seq {} header seq {}", start_seq_num, header_seq_num);
+    for seq_num in start_seq_num..header_seq_num {
+        let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
+        let event_view = events[idx].as_view().unwrap();
+        let old_event_view = old_events[idx].as_view().unwrap();
+
+        match event_view {
+            EventView::Fill { .. } => {
+                // there are three possible cases:
+                // 1) the event is past the old seq num, hence guaranteed new event
+                // 2) the event is not matching the old event queue
+                // 3) all other events are matching the old event queue
+                // the order of these checks is important so they are exhaustive
+                if seq_num >= old_seq_num {
+                    info!("found new serum fill {} idx {}", mkt_pk_string, idx);
+
+                    metric_events_new.increment();
+                    fill_update_sender
+                        .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
+                            slot,
+                            write_version,
+                            event: events[idx],
+                            status: FillUpdateStatus::New,
+                            market: mkt_pk_string.clone(),
+                            queue: evq_pk_string.clone(),
+                        }))
+                        .unwrap(); // TODO: use anyhow to bubble up error
+                    checkpoint.push(events[idx]);
+                    continue;
+                }
+
+                match old_event_view {
+                    EventView::Fill { .. } => {
+                        info!("already got all fills???");
+                        // every already published event is recorded in checkpoint
+                        checkpoint.push(events[idx]);
+                    }
+                    EventView::Out { .. } => {
+                        info!(
+                            "found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
+                            mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
+                        );
+
+                        metric_events_change.increment();
+
+                        // first revoke old event
+                        fill_update_sender
+                            .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
+                                slot,
+                                write_version,
+                                event: old_events[idx],
+                                status: FillUpdateStatus::Revoke,
+                                market: mkt_pk_string.clone(),
+                                queue: evq_pk_string.clone(),
+                            }))
+                            .unwrap(); // TODO: use anyhow to bubble up error
+
+                        // then publish new if its a fill and record in checkpoint
+                        fill_update_sender
+                            .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
+                                slot,
+                                write_version,
+                                event: events[idx],
+                                status: FillUpdateStatus::New,
+                                market: mkt_pk_string.clone(),
+                                queue: evq_pk_string.clone(),
+                            }))
+                            .unwrap(); // TODO: use anyhow to bubble up error
+                        checkpoint.push(events[idx]);
+                    }
+                }
+            }
+            _ => continue,
+        }
+    }
+
+    // in case queue size shrunk due to a fork we need revoke all previous fills
+    for seq_num in header_seq_num..old_seq_num {
+        let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
+        let old_event_view = old_events[idx].as_view().unwrap();
+        debug!(
+            "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
+            mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
+        );
+
+        metric_events_drop.increment();
+
+        match old_event_view {
+            EventView::Fill { .. } => {
+                fill_update_sender
+                    .try_send(FillEventFilterMessage::SerumUpdate(SerumFillUpdate {
+                        slot,
+                        event: old_events[idx],
+                        write_version,
+                        status: FillUpdateStatus::Revoke,
+                        market: mkt_pk_string.clone(),
+                        queue: evq_pk_string.clone(),
+                    }))
+                    .unwrap(); // TODO: use anyhow to bubble up error
+            }
+            EventView::Out { .. } => continue,
+        }
+    }
+
+    fill_update_sender
+        .try_send(FillEventFilterMessage::SerumCheckpoint(
+            SerumFillCheckpoint {
+                slot,
+                write_version,
+                events: checkpoint,
+                market: mkt_pk_string,
+                queue: evq_pk_string,
+            },
+        ))
+        .unwrap()
+}
+
 pub async fn init(
     perp_queue_pks: Vec<(Pubkey, Pubkey)>,
     serum_queue_pks: Vec<(Pubkey, Pubkey)>,
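The replay window at the top of `publish_changes_serum` deserves a worked example. `seq_num` increases monotonically while the queue itself is a fixed-size ring of `MAX_NUM_EVENTS` slots, so the loop re-checks every sequence number that could still be in the ring and treats anything at or past `old_seq_num` as a guaranteed-new fill. A toy version with a hypothetical queue length of 8 (the real constant is defined elsewhere in this crate):

```rust
use std::cmp::max;

// seq_num = N means events (N - queue_len)..N-1 are still available in the ring
fn start_of_window(old_seq_num: u64, header_seq_num: u64, queue_len: u64) -> u64 {
    max(old_seq_num, header_seq_num)
        .checked_sub(queue_len)
        .unwrap_or(0)
}

fn main() {
    let queue_len = 8u64;
    // previously seen up to seq 10, the header now reports 13:
    // seqs 5..10 get re-checked for in-place changes, 10..13 are new fills
    assert_eq!(start_of_window(10, 13, queue_len), 5);
    // a sequence number maps to its ring slot by modulo
    assert_eq!(12 % queue_len, 4);
}
```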
@@ -265,12 +476,13 @@ pub async fn init(
     let account_write_queue_receiver_c = account_write_queue_receiver.clone();
 
     let mut chain_cache = ChainData::new();
-    let mut events_cache: HashMap<String, EventQueueEvents> = HashMap::new();
+    let mut perp_events_cache: HashMap<String, EventQueueEvents> = HashMap::new();
+    let mut serum_events_cache: HashMap<String, Vec<serum_dex::state::Event>> = HashMap::new();
     let mut seq_num_cache = HashMap::new();
     let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
 
-    let relevant_pubkeys = [perp_queue_pks.clone(), serum_queue_pks.clone()]
-        .concat()
+    let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
+    let relevant_pubkeys = all_queue_pks
         .iter()
         .map(|m| m.1)
         .collect::<HashSet<Pubkey>>();
@@ -310,7 +522,7 @@ pub async fn init(
             }
         }
 
-        for mkt in perp_queue_pks.iter() {
+        for mkt in all_queue_pks.iter() {
             let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
             let mkt_pk = mkt.1;
 
@@ -326,34 +538,81 @@ pub async fn init(
                     last_evq_versions.insert(evq_pk_string.clone(), evq_version);
 
                     let account = &account_info.account;
-                    let event_queue =
-                        EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
-                    trace!("evq {} seq_num {}", evq_pk_string, event_queue.header.seq_num);
-
-                    match seq_num_cache.get(&evq_pk_string) {
-                        Some(old_seq_num) => match events_cache.get(&evq_pk_string) {
-                            Some(old_events) => publish_changes(
-                                account_info.slot,
-                                account_info.write_version,
-                                mkt,
-                                &event_queue.header,
-                                &event_queue.buf,
-                                *old_seq_num,
-                                old_events,
-                                &fill_update_sender,
-                                &mut metric_events_new,
-                                &mut metric_events_change,
-                                &mut metrics_events_drop,
-                            ),
-                            _ => info!("events_cache could not find {}", evq_pk_string),
-                        },
-                        _ => info!("seq_num_cache could not find {}", evq_pk_string),
-                    }
-
-                    seq_num_cache
-                        .insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
-                    events_cache.insert(evq_pk_string.clone(), event_queue.buf.clone());
+                    let is_perp = mango_v4::check_id(account.owner());
+                    if is_perp {
+                        let event_queue =
+                            EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
+                        trace!(
+                            "evq {} seq_num {}",
+                            evq_pk_string,
+                            event_queue.header.seq_num
+                        );
+                        match seq_num_cache.get(&evq_pk_string) {
+                            Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
+                                Some(old_events) => publish_changes_perp(
+                                    account_info.slot,
+                                    account_info.write_version,
+                                    mkt,
+                                    &event_queue.header,
+                                    &event_queue.buf,
+                                    *old_seq_num,
+                                    old_events,
+                                    &fill_update_sender,
+                                    &mut metric_events_new,
+                                    &mut metric_events_change,
+                                    &mut metrics_events_drop,
+                                ),
+                                _ => {
+                                    info!("perp_events_cache could not find {}", evq_pk_string)
+                                }
+                            },
+                            _ => info!("seq_num_cache could not find {}", evq_pk_string),
+                        }
+
+                        seq_num_cache
+                            .insert(evq_pk_string.clone(), event_queue.header.seq_num.clone());
+                        perp_events_cache
+                            .insert(evq_pk_string.clone(), event_queue.buf.clone());
+                    } else {
+                        let inner_data = &account.data()[5..&account.data().len() - 7];
+                        let header_span = std::mem::size_of::<SerumEventQueueHeader>();
+                        let header: SerumEventQueueHeader =
+                            *bytemuck::from_bytes(&inner_data[..header_span]);
+                        let seq_num = header.seq_num;
+                        let count = header.count;
+                        let rest = &inner_data[header_span..];
+                        let slop = rest.len() % std::mem::size_of::<serum_dex::state::Event>();
+                        let new_len = rest.len() - slop;
+                        let events = &rest[..new_len];
+                        debug!("evq {} header_span {} header_seq_num {} header_count {} inner_len {} events_len {} sizeof Event {}", evq_pk_string, header_span, seq_num, count, inner_data.len(), events.len(), std::mem::size_of::<serum_dex::state::Event>());
+                        let events: &[serum_dex::state::Event] = bytemuck::cast_slice(&events);
+
+                        match seq_num_cache.get(&evq_pk_string) {
+                            Some(old_seq_num) => match serum_events_cache.get(&evq_pk_string) {
+                                Some(old_events) => publish_changes_serum(
+                                    account_info.slot,
+                                    account_info.write_version,
+                                    mkt,
+                                    &header,
+                                    &events,
+                                    *old_seq_num,
+                                    old_events,
+                                    &fill_update_sender,
+                                    &mut metric_events_new,
+                                    &mut metric_events_change,
+                                    &mut metrics_events_drop,
+                                ),
+                                _ => {
+                                    info!("serum_events_cache could not find {}", evq_pk_string)
+                                }
+                            },
+                            _ => info!("seq_num_cache could not find {}", evq_pk_string),
+                        }
+
+                        seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
+                        serum_events_cache
+                            .insert(evq_pk_string.clone(), events.clone().to_vec());
+                    }
                 }
                 Err(_) => info!("chain_cache could not find {}", mkt.1),
             }
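The `else` branch in the hunk above hard-codes the serum event-queue account layout: a 5-byte account-flags prefix, the header, a whole number of events, then a 7-byte padding suffix. The 5/7 offsets are taken on faith from serum's blob framing; a sketch of just the slicing, under that layout assumption:

```rust
// split a raw serum event-queue account into header bytes and event bytes,
// assuming [5-byte prefix][header][events][7-byte suffix]
fn split_event_queue(data: &[u8], header_span: usize, event_size: usize) -> (&[u8], &[u8]) {
    let inner = &data[5..data.len() - 7];
    let header = &inner[..header_span];
    let rest = &inner[header_span..];
    // drop trailing bytes that don't fill a whole event (the "slop")
    let events = &rest[..rest.len() - rest.len() % event_size];
    (header, events)
}

fn main() {
    // dummy sizes: 32-byte header, 16-byte events, 3 whole events + 4 slop bytes
    let data = vec![0u8; 5 + 32 + 3 * 16 + 4 + 7];
    let (header, events) = split_event_queue(&data, 32, 16);
    assert_eq!(header.len(), 32);
    assert_eq!(events.len(), 48);
}
```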
@@ -3,8 +3,10 @@ use crate::{
     metrics::{MetricType, Metrics},
     AccountWrite, SlotUpdate,
 };
+use itertools::Itertools;
 use log::*;
 use serde::{ser::SerializeStruct, Serialize, Serializer};
+use serum_dex::critbit::Slab;
 use solana_sdk::{
     account::{ReadableAccount, WritableAccount},
     clock::Epoch,
@@ -12,13 +14,13 @@ use solana_sdk::{
 };
 use std::{
     borrow::BorrowMut,
-    collections::{HashMap, HashSet}, time::{UNIX_EPOCH, SystemTime},
+    collections::{HashMap, HashSet},
+    time::{SystemTime, UNIX_EPOCH}, mem::size_of,
 };
-use itertools::Itertools;
 
 use crate::metrics::MetricU64;
 use anchor_lang::AccountDeserialize;
-use mango_v4::state::{BookSide, OrderTreeType};
+use mango_v4::{state::{BookSide, OrderTreeType}, serum3_cpi::OrderBookStateHeader};
 
 #[derive(Clone, Debug)]
 pub enum OrderbookSide {
@@ -29,19 +31,19 @@ pub enum OrderbookSide {
 impl Serialize for OrderbookSide {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
-        S: Serializer
+        S: Serializer,
     {
         match *self {
             OrderbookSide::Bid => serializer.serialize_unit_variant("Side", 0, "bid"),
             OrderbookSide::Ask => serializer.serialize_unit_variant("Side", 1, "ask"),
         }
     }
 }
 
 #[derive(Clone, Debug)]
 pub struct OrderbookLevel {
-    pub price: i64,
-    pub size: i64,
+    pub price: f64,
+    pub size: f64,
 }
 
 impl Serialize for OrderbookLevel {
@@ -112,48 +114,34 @@ pub enum OrderbookFilterMessage {
     Checkpoint(OrderbookCheckpoint),
 }
 
+#[derive(Clone, Debug)]
 pub struct MarketConfig {
     pub name: String,
     pub bids: Pubkey,
     pub asks: Pubkey,
+    pub base_decimals: u8,
+    pub quote_decimals: u8,
+}
+
+pub fn native_to_ui(native: i64, decimals: u8) -> f64 {
+    native as f64 / (10u64.pow(decimals.into())) as f64
 }
 
 fn publish_changes(
     slot: u64,
     write_version: u64,
     mkt: &(Pubkey, MarketConfig),
-    bookside: &BookSide,
-    old_bookside: &BookSide,
-    other_bookside: Option<&BookSide>,
+    side: OrderbookSide,
+    current_bookside: &Vec<OrderbookLevel>,
+    previous_bookside: &Vec<OrderbookLevel>,
+    maybe_other_bookside: Option<&Vec<OrderbookLevel>>,
     orderbook_update_sender: &async_channel::Sender<OrderbookFilterMessage>,
     metric_updates: &mut MetricU64,
 ) {
-    let time_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
-    let oracle_price_lots = 0; // todo: does this matter? where to find it?
-    let side = match bookside.nodes.order_tree_type() {
-        OrderTreeType::Bids => OrderbookSide::Bid,
-        OrderTreeType::Asks => OrderbookSide::Ask,
-    };
-
-    let current_l2_snapshot: Vec<OrderbookLevel> = bookside
-        .iter_valid(time_now, oracle_price_lots)
-        .map(|item| (item.node.price_data() as i64, item.node.quantity))
-        .group_by(|(price, _)| *price)
-        .into_iter()
-        .map(|(price, group)| OrderbookLevel { price, size: group.map(|(_, quantity)| quantity).fold(0, |acc, x| acc + x)})
-        .collect();
-    let previous_l2_snapshot: Vec<OrderbookLevel> = old_bookside
-        .iter_valid(time_now, oracle_price_lots)
-        .map(|item| (item.node.price_data() as i64, item.node.quantity))
-        .group_by(|(price, _)| *price)
-        .into_iter()
-        .map(|(price, group)| OrderbookLevel { price, size: group.map(|(_, quantity)| quantity).fold(0, |acc, x| acc + x)})
-        .collect();
-
-    let mut update: Vec<OrderbookLevel> = vec!();
+    let mut update: Vec<OrderbookLevel> = vec![];
+
     // push diff for levels that are no longer present
-    for previous_order in previous_l2_snapshot.iter() {
-        let peer = current_l2_snapshot
+    for previous_order in previous_bookside.iter() {
+        let peer = current_bookside
             .iter()
             .find(|level| previous_order.price == level.price);
 
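The new `native_to_ui` helper is what turns the `i64 -> f64` field change above into human-readable levels: native integer amounts are divided by 10^decimals. A worked example (the decimal values here are chosen arbitrarily):

```rust
pub fn native_to_ui(native: i64, decimals: u8) -> f64 {
    native as f64 / (10u64.pow(decimals.into())) as f64
}

fn main() {
    // a native price of 12_345 with 6 quote decimals is 0.012345 in UI terms
    assert_eq!(native_to_ui(12_345, 6), 0.012345);
    // 1_500_000_000 native with 9 base decimals is 1.5 UI
    assert_eq!(native_to_ui(1_500_000_000, 9), 1.5);
}
```

Since the result is an `f64`, amounts beyond 2^53 native units would lose precision — fine for display, not for accounting.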
@@ -161,17 +149,17 @@ fn publish_changes(
             None => {
                 info!("level removed {}", previous_order.price);
                 update.push(OrderbookLevel {
                     price: previous_order.price,
-                    size: 0,
+                    size: 0f64,
                 });
-            },
-            _ => continue
+            }
+            _ => continue,
         }
     }
 
     // push diff where there's a new level or size has changed
-    for current_order in &current_l2_snapshot {
-        let peer = previous_l2_snapshot
+    for current_order in current_bookside {
+        let peer = previous_bookside
             .iter()
             .find(|item| item.price == current_order.price);
 
@@ -180,9 +168,12 @@
                 if previous_order.size == current_order.size {
                     continue;
                 }
-                debug!("size changed {} -> {}", previous_order.size, current_order.size);
+                debug!(
+                    "size changed {} -> {}",
+                    previous_order.size, current_order.size
+                );
                 update.push(current_order.clone());
-            },
+            }
             None => {
                 debug!("new level {},{}", current_order.price, current_order.size);
                 update.push(current_order.clone())
@@ -190,29 +181,22 @@
         }
     }
 
-    match other_bookside {
+    match maybe_other_bookside {
         Some(other_bookside) => {
-            let other_l2_snapshot = other_bookside
-                .iter_valid(time_now, oracle_price_lots)
-                .map(|item| (item.node.price_data() as i64, item.node.quantity))
-                .group_by(|(price, _)| *price)
-                .into_iter()
-                .map(|(price, group)| OrderbookLevel { price, size: group.map(|(_, quantity)| quantity).fold(0, |acc, x| acc + x)})
-                .collect();
             let (bids, asks) = match side {
-                OrderbookSide::Bid => (current_l2_snapshot, other_l2_snapshot),
-                OrderbookSide::Ask => (other_l2_snapshot, current_l2_snapshot)
+                OrderbookSide::Bid => (current_bookside, other_bookside),
+                OrderbookSide::Ask => (other_bookside, current_bookside),
             };
             orderbook_update_sender
                 .try_send(OrderbookFilterMessage::Checkpoint(OrderbookCheckpoint {
                     slot,
                     write_version,
-                    bids,
-                    asks,
+                    bids: bids.clone(),
+                    asks: asks.clone(),
                     market: mkt.0.to_string(),
                 }))
                 .unwrap()
-        },
+        }
         None => info!("other bookside not in cache"),
     }
@@ -230,11 +214,100 @@
             }))
             .unwrap(); // TODO: use anyhow to bubble up error
         metric_updates.increment();
     }
 }
 
+fn publish_changes_serum(
+    slot: u64,
+    write_version: u64,
+    mkt: &(Pubkey, MarketConfig),
+    side: OrderbookSide,
+    current_bookside: &Vec<OrderbookLevel>,
+    previous_bookside: &Vec<OrderbookLevel>,
+    maybe_other_bookside: Option<&Vec<OrderbookLevel>>,
+    orderbook_update_sender: &async_channel::Sender<OrderbookFilterMessage>,
+    metric_updates: &mut MetricU64,
+) {
+    let mut update: Vec<OrderbookLevel> = vec![];
+
+    // push diff for levels that are no longer present
+    for previous_order in previous_bookside.iter() {
+        let peer = current_bookside
+            .iter()
+            .find(|level| previous_order.price == level.price);
+
+        match peer {
+            None => {
+                info!("level removed {}", previous_order.price);
+                update.push(OrderbookLevel {
+                    price: previous_order.price,
+                    size: 0f64,
+                });
+            }
+            _ => continue,
+        }
+    }
+
+    // push diff where there's a new level or size has changed
+    for current_order in current_bookside {
+        let peer = previous_bookside
+            .iter()
+            .find(|item| item.price == current_order.price);
+
+        match peer {
+            Some(previous_order) => {
+                if previous_order.size == current_order.size {
+                    continue;
+                }
+                debug!(
+                    "size changed {} -> {}",
+                    previous_order.size, current_order.size
+                );
+                update.push(current_order.clone());
+            }
+            None => {
+                debug!("new level {},{}", current_order.price, current_order.size);
+                update.push(current_order.clone())
+            }
+        }
+    }
+
+    match maybe_other_bookside {
+        Some(other_bookside) => {
+            let (bids, asks) = match side {
+                OrderbookSide::Bid => (current_bookside, other_bookside),
+                OrderbookSide::Ask => (other_bookside, current_bookside),
+            };
+            orderbook_update_sender
+                .try_send(OrderbookFilterMessage::Checkpoint(OrderbookCheckpoint {
+                    slot,
+                    write_version,
+                    bids: bids.clone(),
+                    asks: asks.clone(),
+                    market: mkt.0.to_string(),
+                }))
+                .unwrap()
+        }
+        None => info!("other bookside not in cache"),
+    }
+
+    info!("diff {} {:?}", mkt.1.name, update);
+    if update.len() > 0 {
+        orderbook_update_sender
+            .try_send(OrderbookFilterMessage::Update(OrderbookUpdate {
+                market: mkt.0.to_string(),
+                side: side.clone(),
+                update,
+                slot,
+                write_version,
+            }))
+            .unwrap(); // TODO: use anyhow to bubble up error
+        metric_updates.increment();
+    }
+}
+
 pub async fn init(
     market_configs: Vec<(Pubkey, MarketConfig)>,
+    serum_market_configs: Vec<(Pubkey, MarketConfig)>,
     metrics_sender: Metrics,
 ) -> anyhow::Result<(
     async_channel::Sender<AccountWrite>,
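Both `publish_changes` variants now share the same delta rule: a level present before but missing now is re-sent with size 0, and any new or resized level is sent as-is, so a client can patch its book from updates alone. An isolated sketch of that diffing rule:

```rust
#[derive(Clone, Debug, PartialEq)]
struct Level {
    price: f64,
    size: f64,
}

fn l2_diff(previous: &[Level], current: &[Level]) -> Vec<Level> {
    let mut update = vec![];
    // vanished levels are tombstoned with size 0
    for prev in previous {
        if !current.iter().any(|c| c.price == prev.price) {
            update.push(Level { price: prev.price, size: 0f64 });
        }
    }
    // new levels and size changes are sent as-is
    for cur in current {
        match previous.iter().find(|p| p.price == cur.price) {
            Some(p) if p.size == cur.size => continue,
            _ => update.push(cur.clone()),
        }
    }
    update
}

fn main() {
    let prev = vec![
        Level { price: 1.0, size: 2.0 },
        Level { price: 1.1, size: 3.0 },
    ];
    let cur = vec![
        Level { price: 1.1, size: 4.0 },
        Level { price: 1.2, size: 1.0 },
    ];
    // 1.0 disappears, 1.1 resizes, 1.2 is new
    assert_eq!(
        l2_diff(&prev, &cur),
        vec![
            Level { price: 1.0, size: 0.0 },
            Level { price: 1.1, size: 4.0 },
            Level { price: 1.2, size: 1.0 },
        ]
    );
}
```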
@@ -261,12 +334,17 @@ pub async fn init(
     let account_write_queue_receiver_c = account_write_queue_receiver.clone();
 
     let mut chain_cache = ChainData::new();
-    let mut bookside_cache: HashMap<String, BookSide> = HashMap::new();
+    let mut bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
+    let mut serum_bookside_cache: HashMap<String, Vec<OrderbookLevel>> = HashMap::new();
     let mut last_write_versions = HashMap::<String, (u64, u64)>::new();
 
-    let relevant_pubkeys = market_configs.iter().flat_map(|m| [m.1.bids, m.1.asks]).collect::<HashSet<Pubkey>>();
+    let relevant_pubkeys = [market_configs.clone(), serum_market_configs.clone()]
+        .concat()
+        .iter()
+        .flat_map(|m| [m.1.bids, m.1.asks])
+        .collect::<HashSet<Pubkey>>();
     info!("relevant_pubkeys {:?}", relevant_pubkeys);
-    // update handling thread, reads both sloths and account updates
+    // update handling thread, reads both slots and account updates
     tokio::spawn(async move {
         loop {
             tokio::select! {
@@ -304,16 +382,8 @@ pub async fn init(
                 for mkt in market_configs.iter() {
                     for side in 0..2 {
                         let mkt_pk = mkt.0;
-                        let side_pk = if side == 0 {
-                            mkt.1.bids
-                        } else {
-                            mkt.1.asks
-                        };
-                        let other_side_pk = if side == 0 {
-                            mkt.1.asks
-                        } else {
-                            mkt.1.bids
-                        };
+                        let side_pk = if side == 0 { mkt.1.bids } else { mkt.1.asks };
+                        let other_side_pk = if side == 0 { mkt.1.asks } else { mkt.1.bids };
                         let last_write_version = last_write_versions
                             .get(&side_pk.to_string())
                             .unwrap_or(&(0, 0));
@@ -332,6 +402,28 @@ pub async fn init(
                         let account = &account_info.account;
                         let bookside =
                             BookSide::try_deserialize(account.data().borrow_mut()).unwrap();
+                        let side = match bookside.nodes.order_tree_type() {
+                            OrderTreeType::Bids => OrderbookSide::Bid,
+                            OrderTreeType::Asks => OrderbookSide::Ask,
+                        };
+                        let time_now = SystemTime::now()
+                            .duration_since(UNIX_EPOCH)
+                            .unwrap()
+                            .as_secs();
+                        let oracle_price_lots = 0; // todo: does this matter? where to find it?
+                        let bookside = bookside
+                            .iter_valid(time_now, oracle_price_lots)
+                            .map(|item| (item.node.price_data() as i64, item.node.quantity))
+                            .group_by(|(price, _)| *price)
+                            .into_iter()
+                            .map(|(price, group)| OrderbookLevel {
+                                price: native_to_ui(price, mkt.1.quote_decimals),
+                                size: native_to_ui(group
+                                    .map(|(_, quantity)| quantity)
+                                    .fold(0, |acc, x| acc + x), mkt.1.base_decimals),
+                            })
+                            .collect();
+
                         let other_bookside = bookside_cache.get(&other_side_pk.to_string());
 
                         match bookside_cache.get(&side_pk_string) {
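One subtlety in the L2 aggregation above: `Itertools::group_by` only merges *consecutive* elements with equal keys, so collapsing orders into price levels is correct only because `iter_valid` walks the book side in price order. A standalone sketch of that behavior:

```rust
use itertools::Itertools;

fn main() {
    // (price, quantity) pairs as they come off a price-ordered iterator
    let orders = vec![(100, 3), (100, 2), (101, 7)];
    let levels: Vec<(i64, i64)> = orders
        .into_iter()
        .group_by(|(price, _)| *price)
        .into_iter()
        .map(|(price, group)| (price, group.map(|(_, q)| q).sum()))
        .collect();
    // orders at the same price are folded into one level
    assert_eq!(levels, vec![(100, 5), (101, 7)]);
}
```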
@@ -339,6 +431,7 @@ pub async fn init(
                                 account_info.slot,
                                 account_info.write_version,
                                 mkt,
+                                side,
                                 &bookside,
                                 &old_bookside,
                                 other_bookside,
@@ -349,11 +442,82 @@ pub async fn init(
                         }
 
                         bookside_cache.insert(side_pk_string.clone(), bookside.clone());
                     }
                     Err(_) => info!("chain_cache could not find {}", mkt_pk),
                 }
             }
         }
 
+        for mkt in serum_market_configs.iter() {
+            for side in 0..2 {
+                let side_pk = if side == 0 { mkt.1.bids } else { mkt.1.asks };
+                let other_side_pk = if side == 0 { mkt.1.asks } else { mkt.1.bids };
+                let last_write_version = last_write_versions
+                    .get(&side_pk.to_string())
+                    .unwrap_or(&(0, 0));
+
+                match chain_cache.account(&side_pk) {
+                    Ok(account_info) => {
+                        let side_pk_string = side_pk.to_string();
+
+                        let write_version = (account_info.slot, account_info.write_version);
+                        // todo: should this be <= so we don't overwrite with old data received late?
+                        if write_version == *last_write_version {
+                            continue;
+                        }
+                        last_write_versions.insert(side_pk_string.clone(), write_version);
+
+                        let account = &mut account_info.account.clone();
+                        let data = account.data_as_mut_slice();
+                        let len = data.len();
+                        info!("side pk {} side {}", side_pk, side);
+                        let inner = &mut data[5..len - 7];
+                        let slab = Slab::new(&mut inner[size_of::<OrderBookStateHeader>()..]);
+
+                        let bookside: Vec<OrderbookLevel> = slab
+                            .iter(side == 0)
+                            .map(|item| (u64::from(item.price()) as i64, item.quantity() as i64))
+                            .group_by(|(price, _)| *price)
+                            .into_iter()
+                            .map(|(price, group)| OrderbookLevel {
+                                price: native_to_ui(price, mkt.1.quote_decimals),
+                                size: native_to_ui(group
+                                    .map(|(_, quantity)| quantity)
+                                    .fold(0, |acc, x| acc + x), mkt.1.base_decimals),
+                            })
+                            .collect();
+
+                        let other_bookside =
+                            serum_bookside_cache.get(&other_side_pk.to_string());
+
+                        match serum_bookside_cache.get(&side_pk_string) {
+                            Some(old_bookside) => publish_changes_serum(
+                                account_info.slot,
+                                account_info.write_version,
+                                mkt,
+                                if side == 0 {
+                                    OrderbookSide::Bid
+                                } else {
+                                    OrderbookSide::Ask
+                                },
+                                &bookside,
+                                old_bookside,
+                                other_bookside,
+                                &fill_update_sender,
+                                &mut metric_events_new,
+                            ),
+                            _ => info!("bookside_cache could not find {}", side_pk_string),
+                        }
+
+                        serum_bookside_cache.insert(
+                            side_pk_string.clone(),
+                            bookside,
+                        );
+                    }
+                    Err(_) => info!("chain_cache could not find {}", side_pk),
+                }
+            }
+        }
     }
 });
@@ -26,6 +26,6 @@ bytemuck = "1.7.2"
 
 mango-v4 = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
 client = { git = "ssh://git@github.com/blockworks-foundation/mango-v4", branch = "dev" }
-serum_dex = { git = "ssh://git@github.com/openbook-dex/program", branch = "master" }
+serum_dex = { git = "ssh://git@github.com/jup-ag/openbook-program", branch = "feat/expose-things" }
 anchor-lang = "0.25.0"
 anchor-client = "0.25.0"
@@ -13,17 +13,19 @@ use tokio::{
 use tokio_tungstenite::tungstenite::{protocol::Message, Error};
 
 use serde::Deserialize;
-use solana_geyser_connector_lib::{metrics::{MetricType, MetricU64}, FilterConfig};
+use solana_geyser_connector_lib::{metrics::{MetricType, MetricU64}, FilterConfig, fill_event_filter::SerumFillCheckpoint};
 use solana_geyser_connector_lib::{
     fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
     grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
 };
 
 type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
+type SerumCheckpointMap = Arc<Mutex<HashMap<String, SerumFillCheckpoint>>>;
 type PeerMap = Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>;
 
 async fn handle_connection_error(
     checkpoint_map: CheckpointMap,
+    serum_checkpoint_map: SerumCheckpointMap,
     peer_map: PeerMap,
     raw_stream: TcpStream,
     addr: SocketAddr,
@@ -32,7 +34,7 @@ async fn handle_connection_error(
 ) {
     metrics_opened_connections.clone().increment();
 
-    let result = handle_connection(checkpoint_map, peer_map.clone(), raw_stream, addr).await;
+    let result = handle_connection(checkpoint_map, serum_checkpoint_map, peer_map.clone(), raw_stream, addr).await;
     if result.is_err() {
         error!("connection {} error {}", addr, result.unwrap_err());
     };
@@ -44,6 +46,7 @@ async fn handle_connection_error(
 
 async fn handle_connection(
     checkpoint_map: CheckpointMap,
+    serum_checkpoint_map: SerumCheckpointMap,
     peer_map: PeerMap,
     raw_stream: TcpStream,
     addr: SocketAddr,
@@ -59,6 +62,7 @@ async fn handle_connection(
         info!("ws published: {}", addr);
     }
 
+    // todo: add subscribe logic
     // 2: send initial checkpoint
     {
         let checkpoint_map_copy = checkpoint_map.lock().unwrap().clone();
@@ -170,9 +174,11 @@ async fn main() -> anyhow::Result<()> {
         fill_event_filter::init(perp_queue_pks.clone(), serum_queue_pks.clone(), metrics_tx.clone()).await?;
 
     let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
+    let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new()));
     let peers = PeerMap::new(Mutex::new(HashMap::new()));
 
     let checkpoints_ref_thread = checkpoints.clone();
+    let serum_checkpoints_ref_thread = serum_checkpoints.clone();
     let peers_ref_thread = peers.clone();
 
     // filleventfilter websocket sink
@@ -202,6 +208,27 @@ async fn main() -> anyhow::Result<()> {
                         .unwrap()
                         .insert(checkpoint.queue.clone(), checkpoint);
                 }
+                FillEventFilterMessage::SerumUpdate(update) => {
+                    debug!("ws update {} {:?} serum fill", update.market, update.status);
+                    let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
+                    for (k, v) in peer_copy.iter_mut() {
+                        trace!(" > {}", k);
+
+                        let json = serde_json::to_string(&update);
+                        let result = v.send(Message::Text(json.unwrap())).await;
+                        if result.is_err() {
+                            error!(
+                                "ws update {} {:?} serum fill could not reach {}",
+                                update.market, update.status, k
+                            );
+                        }
+                    }
+                }
+                FillEventFilterMessage::SerumCheckpoint(checkpoint) => {
+                    serum_checkpoints_ref_thread
+                        .lock()
+                        .unwrap()
+                        .insert(checkpoint.queue.clone(), checkpoint);
+                }
             }
         }
     });
@@ -214,6 +241,7 @@ async fn main() -> anyhow::Result<()> {
     while let Ok((stream, addr)) = listener.accept().await {
         tokio::spawn(handle_connection_error(
             checkpoints.clone(),
+            serum_checkpoints.clone(),
            peers.clone(),
             stream,
             addr,
@@ -234,7 +262,10 @@ async fn main() -> anyhow::Result<()> {
     );
     let use_geyser = true;
     let filter_config = FilterConfig {
-        program_ids: vec!["abc123".into()]
+        program_ids: vec![
+            "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
+            "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
+        ],
     };
     if use_geyser {
         grpc_plugin_source::process_events(
@@ -7,8 +7,7 @@ use client::{Client, MangoGroupContext};
 use futures_channel::mpsc::{unbounded, UnboundedSender};
 use futures_util::{
     future::{self, Ready},
-    pin_mut,
-    SinkExt, StreamExt, TryStreamExt,
+    pin_mut, SinkExt, StreamExt, TryStreamExt,
 };
 use log::*;
 use std::{
@@ -27,15 +26,15 @@ use tokio::{
 };
 use tokio_tungstenite::tungstenite::{protocol::Message, Error};
 
-use serde::{Deserialize};
+use serde::Deserialize;
+use solana_geyser_connector_lib::{
+    grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
+};
 use solana_geyser_connector_lib::{
     metrics::{MetricType, MetricU64},
     orderbook_filter::{self, MarketConfig, OrderbookCheckpoint, OrderbookFilterMessage},
     FilterConfig, StatusResponse,
 };
-use solana_geyser_connector_lib::{
-    grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig,
-};
 
 type CheckpointMap = Arc<Mutex<HashMap<String, OrderbookCheckpoint>>>;
 type PeerMap = Arc<Mutex<HashMap<SocketAddr, Peer>>>;
@@ -81,7 +80,7 @@ pub struct Config {
 async fn handle_connection_error(
     checkpoint_map: CheckpointMap,
     peer_map: PeerMap,
-    market_ids: Vec<String>,
+    market_ids: HashMap<String, String>,
     raw_stream: TcpStream,
     addr: SocketAddr,
     metrics_opened_connections: MetricU64,
@@ -109,7 +108,7 @@
 async fn handle_connection(
     checkpoint_map: CheckpointMap,
     peer_map: PeerMap,
-    market_ids: Vec<String>,
+    market_ids: HashMap<String, String>,
     raw_stream: TcpStream,
     addr: SocketAddr,
 ) -> Result<(), Error> {
@@ -153,7 +152,7 @@ fn handle_commands(
     msg: Message,
     peer_map: PeerMap,
     checkpoint_map: CheckpointMap,
-    market_ids: Vec<String>,
+    market_ids: HashMap<String, String>,
 ) -> Ready<Result<(), Error>> {
     let msg_str = msg.clone().into_text().unwrap();
     let command: Result<Command, serde_json::Error> = serde_json::from_str(&msg_str);
@@ -162,17 +161,20 @@
     match command {
         Ok(Command::Subscribe(cmd)) => {
             let market_id = cmd.clone().market_id;
-            let subscribed = peer.subscriptions.insert(market_id.clone());
-            if !market_ids.contains(&market_id) {
-                let res = StatusResponse {
-                    success: false,
-                    message: "market not found",
-                };
-                peer.sender
-                    .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
-                    .unwrap();
-                return future::ok(());
+            match market_ids.get(&market_id) {
+                None => {
+                    let res = StatusResponse {
+                        success: false,
+                        message: "market not found",
+                    };
+                    peer.sender
+                        .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
+                        .unwrap();
+                    return future::ok(());
+                }
+                _ => {}
             }
+            let subscribed = peer.subscriptions.insert(market_id.clone());
 
             let res = if subscribed {
                 StatusResponse {
@@ -221,13 +223,13 @@
             peer.sender
                 .unbounded_send(Message::Text(serde_json::to_string(&res).unwrap()))
                 .unwrap();
-        },
+        }
         Ok(Command::GetMarkets) => {
             info!("getMarkets");
             peer.sender
                 .unbounded_send(Message::Text(serde_json::to_string(&market_ids).unwrap()))
                 .unwrap();
-        },
+        }
         Err(err) => {
             info!("error deserializing user input {:?}", err);
             let res = StatusResponse {
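The `market_ids` change from `Vec<String>` to `HashMap<String, String>` threads through the hunks above: subscribe validation becomes a map lookup, and `getMarkets` now returns a `{pubkey: name}` object instead of a bare list. A sketch with hypothetical entries:

```rust
use std::collections::HashMap;

fn main() {
    // hypothetical entries; the real keys are market account pubkeys
    let market_ids: HashMap<String, String> =
        HashMap::from([("MarketPubkey111".to_string(), "MNGO-PERP".to_string())]);

    // subscribe validation is now a lookup instead of Vec::contains
    assert!(market_ids.get("MarketPubkey111").is_some());

    // getMarkets serializes the whole map as a {pubkey: name} JSON object
    println!("{}", serde_json::to_string(&market_ids).unwrap());
}
```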
@@ -287,31 +289,65 @@ async fn main() -> anyhow::Result<()> {
         Pubkey::from_str(&config.mango_group).unwrap(),
         client.cluster.clone(),
         client.commitment,
     )?);
 
     // todo: reload markets at intervals
-    let market_pubkey_strings: Vec<String> = group_context
-        .perp_markets
-        .iter()
-        .map(|(_, context)| context.address.to_string())
-        .collect();
     let market_configs: Vec<(Pubkey, MarketConfig)> = group_context
         .perp_markets
         .iter()
         .map(|(_, context)| {
+            let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index) {
+                Some(token) => token.decimals,
+                None => panic!("token not found for market") // todo: default to 6 for usdc?
+            };
             (
                 context.address,
                 MarketConfig {
                     name: context.market.name().to_owned(),
                     bids: context.market.bids,
                     asks: context.market.asks,
+                    base_decimals: context.market.base_decimals,
+                    quote_decimals,
                 },
             )
         })
         .collect();
 
+    let serum_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
+        .serum3_markets
+        .iter()
+        .map(|(_, context)| {
+            let base_decimals = match group_context.tokens.get(&context.market.base_token_index) {
+                Some(token) => token.decimals,
+                None => panic!("token not found for market") // todo: default?
+            };
+            let quote_decimals = match group_context.tokens.get(&context.market.quote_token_index) {
+                Some(token) => token.decimals,
+                None => panic!("token not found for market") // todo: default to 6 for usdc?
+            };
+            (
+                context.address,
+                MarketConfig {
+                    name: context.market.name().to_owned(),
+                    bids: context.bids,
+                    asks: context.asks,
+                    base_decimals,
+                    quote_decimals,
+                },
+            )
+        })
+        .collect();
+
+    let market_pubkey_strings: HashMap<String, String> = [market_configs.clone(), serum_market_configs.clone()]
+        .concat()
+        .iter()
+        .map(|market| (market.0.to_string(), market.1.name.clone()))
+        .collect::<Vec<(String, String)>>()
+        .into_iter()
+        .collect();
+
     let (account_write_queue_sender, slot_queue_sender, orderbook_receiver) =
-        orderbook_filter::init(market_configs, metrics_tx.clone()).await?;
+        orderbook_filter::init(market_configs, serum_market_configs, metrics_tx.clone()).await?;
 
     let checkpoints_ref_thread = checkpoints.clone();
     let peers_ref_thread = peers.clone();
@@ -381,7 +417,7 @@ async fn main() -> anyhow::Result<()> {
     let filter_config = FilterConfig {
         program_ids: vec![
             "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
-            //"srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
+            "srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX".into(),
         ],
     };
     grpc_plugin_source::process_events(
@@ -288,11 +288,17 @@ async fn main() -> anyhow::Result<()> {
 
     // start filling chain_data from the grpc plugin source
     let (account_write_queue_sender, slot_queue_sender) = memory_target::init(chain_data).await?;
+    let filter_config = FilterConfig {
+        program_ids: vec![
+            "4MangoMjqJ2firMokCjjGgoK8d4MXcrgL7XJaL3w6fVg".into(),
+        ],
+    };
     grpc_plugin_source::process_events(
         &config.source,
+        &filter_config,
         account_write_queue_sender,
         slot_queue_sender,
-        metrics_tx,
+        metrics_tx.clone(),
     )
     .await;