Fills updates

* Unify fill event schema
* Change fill update JSON (see the example below)
* Convert all native values to UI units
* Add fills Postgres target
Riordan Panayides 2023-02-03 11:39:53 +00:00
parent 6dfe88ac15
commit 5c8ebc53b1
5 changed files with 594 additions and 877 deletions
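For reference, a serialized FillUpdate under the new schema looks roughly like this (illustrative values only; field names and order follow the Serialize impls in this diff, and the exact side encoding comes from OrderbookSide, whose serializer is not shown here):

{
    "event": {
        "eventType": "perp",
        "maker": true,
        "side": "bid",
        "timestamp": "2023-02-03T11:39:53+00:00",
        "seqNum": 132420,
        "owner": "<mango account pubkey>",
        "orderId": 0,
        "clientOrderId": 0,
        "fee": -0.0001,
        "price": 22.45,
        "quantity": 0.5
    },
    "marketKey": "<perp market pubkey>",
    "marketName": "SOL-PERP",
    "status": "new",
    "slot": 175000000,
    "writeVersion": 1
}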

View File

@@ -1,9 +1,11 @@
use crate::{
chain_data::{AccountData, ChainData, SlotData},
metrics::{MetricType, Metrics},
AccountWrite, SlotUpdate, orderbook_filter::OrderbookSide,
orderbook_filter::{base_lots_to_ui_perp, price_lots_to_ui_perp, MarketConfig, OrderbookSide},
AccountWrite, SlotUpdate,
};
use bytemuck::{Pod, Zeroable, cast_slice};
use bytemuck::{cast_slice, Pod, Zeroable};
use chrono::{Utc, TimeZone};
use log::*;
use serde::{ser::SerializeStruct, Serialize, Serializer};
use serum_dex::state::EventView as SpotEvent;
@@ -15,7 +17,9 @@ use solana_sdk::{
use std::{
borrow::BorrowMut,
cmp::max,
collections::{HashMap, HashSet}, convert::identity, time::SystemTime,
collections::{HashMap, HashSet},
convert::identity,
time::SystemTime,
};
use crate::metrics::MetricU64;
@@ -25,18 +29,46 @@ use mango_v4::state::{
MAX_NUM_EVENTS,
};
#[derive(Clone, Copy, Debug, Serialize)]
#[derive(Clone, Copy, Debug)]
pub enum FillUpdateStatus {
New,
Revoke,
}
#[derive(Clone, Copy, Debug, Serialize)]
impl Serialize for FillUpdateStatus {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *self {
FillUpdateStatus::New => {
serializer.serialize_unit_variant("FillUpdateStatus", 0, "new")
}
FillUpdateStatus::Revoke => {
serializer.serialize_unit_variant("FillUpdateStatus", 1, "revoke")
}
}
}
}
#[derive(Clone, Copy, Debug)]
pub enum FillEventType {
Spot,
Perp,
}
impl Serialize for FillEventType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *self {
FillEventType::Spot => serializer.serialize_unit_variant("FillEventType", 0, "spot"),
FillEventType::Perp => serializer.serialize_unit_variant("FillEventType", 1, "perp"),
}
}
}
#[derive(Clone, Debug)]
pub struct FillEvent {
pub event_type: FillEventType,
@@ -48,8 +80,8 @@ pub struct FillEvent {
pub order_id: u128,
pub client_order_id: u64,
pub fee: f32,
pub price: i64,
pub quantity: i64,
pub price: f64,
pub quantity: f64,
}
impl Serialize for FillEvent {
@@ -61,7 +93,7 @@ impl Serialize for FillEvent {
state.serialize_field("eventType", &self.event_type)?;
state.serialize_field("maker", &self.maker)?;
state.serialize_field("side", &self.side)?;
state.serialize_field("timestamp", &self.timestamp)?;
state.serialize_field("timestamp", &Utc.timestamp_opt(self.timestamp as i64, 0).unwrap().to_rfc3339())?;
state.serialize_field("seqNum", &self.seq_num)?;
state.serialize_field("owner", &self.owner)?;
state.serialize_field("orderId", &self.order_id)?;
@@ -74,7 +106,7 @@ impl Serialize for FillEvent {
}
impl FillEvent {
pub fn new_from_perp(event: PerpFillEvent) -> [Self; 2] {
pub fn new_from_perp(event: PerpFillEvent, config: &MarketConfig) -> [Self; 2] {
let taker_side = match event.taker_side() {
Side::Ask => OrderbookSide::Ask,
Side::Bid => OrderbookSide::Bid,
@@ -83,37 +115,62 @@ impl FillEvent {
Side::Ask => OrderbookSide::Bid,
Side::Bid => OrderbookSide::Ask,
};
[FillEvent {
event_type: FillEventType::Perp,
maker: true,
side: maker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.maker.to_string(),
order_id: event.maker_order_id,
client_order_id: 0u64,
fee: event.maker_fee.to_num(),
price: event.price,
quantity: event.quantity,
},
FillEvent {
event_type: FillEventType::Perp,
maker: false,
side: taker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.taker.to_string(),
order_id: event.taker_order_id,
client_order_id: event.taker_client_order_id,
fee: event.taker_fee.to_num(),
price: event.price,
quantity: event.quantity,
}]
let price = price_lots_to_ui_perp(
event.price,
config.base_decimals,
config.quote_decimals,
config.base_lot_size,
config.quote_lot_size,
);
let quantity =
base_lots_to_ui_perp(event.quantity, config.base_decimals, config.quote_decimals);
[
FillEvent {
event_type: FillEventType::Perp,
maker: true,
side: maker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.maker.to_string(),
order_id: event.maker_order_id,
client_order_id: 0u64,
fee: event.maker_fee.to_num(),
price: price,
quantity: quantity,
},
FillEvent {
event_type: FillEventType::Perp,
maker: false,
side: taker_side,
timestamp: event.timestamp,
seq_num: event.seq_num,
owner: event.taker.to_string(),
order_id: event.taker_order_id,
client_order_id: event.taker_client_order_id,
fee: event.taker_fee.to_num(),
price: price,
quantity: quantity,
},
]
}
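For context, here is a minimal standalone sketch of the lots-to-UI price conversion that new_from_perp now delegates to. The helper below is hypothetical: the commit imports the real price_lots_to_ui_perp from orderbook_filter, whose body is not part of this diff, and the sketch only assumes the standard lot-size semantics (a price in quote lots per base lot, scaled to native units, then shifted by the decimal difference):

// Hypothetical helper mirroring the argument list used at the call site above;
// not necessarily the real implementation in orderbook_filter.
fn price_lots_to_ui(
    price_lots: i64,
    base_decimals: u8,
    quote_decimals: u8,
    base_lot_size: i64,
    quote_lot_size: i64,
) -> f64 {
    // quote native per base native...
    let native = price_lots as f64 * quote_lot_size as f64 / base_lot_size as f64;
    // ...shifted into UI units by the decimal difference
    native * 10f64.powi(base_decimals as i32 - quote_decimals as i32)
}

fn main() {
    // Illustrative parameters (assumptions, not taken from this diff):
    // 9 base decimals, 6 quote decimals, base lot 10_000_000, quote lot 100.
    let ui = price_lots_to_ui(2_200, 9, 6, 10_000_000, 100);
    println!("{ui}"); // ~22.0, up to float rounding
}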
pub fn new_from_spot(event: SpotEvent, timestamp: u64, seq_num: u64) -> Self {
pub fn new_from_spot(
event: SpotEvent,
timestamp: u64,
seq_num: u64,
config: &MarketConfig,
) -> Self {
match event {
SpotEvent::Fill { side, maker, native_qty_paid, native_qty_received, native_fee_or_rebate, order_id, owner, client_order_id, .. } => {
SpotEvent::Fill {
side,
maker,
native_qty_paid,
native_qty_received,
native_fee_or_rebate,
order_id,
owner,
client_order_id,
..
} => {
let side = match side as u8 {
0 => OrderbookSide::Bid,
1 => OrderbookSide::Ask,
@@ -123,8 +180,42 @@ impl FillEvent {
Some(id) => id.into(),
None => 0u64,
};
// TODO: native to ui
let price = (native_qty_paid / native_qty_received) as i64;
let base_multiplier = 10u64.pow(config.base_decimals.into()) as u64;
let quote_multiplier = 10u64.pow(config.quote_decimals.into()) as u64;
let (price, quantity) = match side {
OrderbookSide::Bid => {
let price_before_fees = if maker {
native_qty_paid + native_fee_or_rebate
} else {
native_qty_paid - native_fee_or_rebate
};
let top = price_before_fees * base_multiplier;
let bottom = quote_multiplier * native_qty_received;
let price = top as f64 / bottom as f64;
let quantity = native_qty_received as f64 / base_multiplier as f64;
(price, quantity)
}
OrderbookSide::Ask => {
let price_before_fees = if maker {
native_qty_received - native_fee_or_rebate
} else {
native_qty_received + native_fee_or_rebate
};
let top = price_before_fees * base_multiplier;
let bottom = quote_multiplier * native_qty_paid;
let price = top as f64 / bottom as f64;
let quantity = native_qty_paid as f64 / base_multiplier as f64;
(price, quantity)
}
};
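// Worked example for the bid arm above (illustrative numbers; 9 base and
// 6 quote decimals are assumptions): a maker bid that paid 22_000_000
// quote native and received 1_000_000_000 base native with an 8_800
// rebate gives price_before_fees = 22_008_800, so
// price = 22_008_800 * 10^9 / (10^6 * 1_000_000_000) = 22.0088 and
// quantity = 1_000_000_000 / 10^9 = 1.0.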
let fee =
native_fee_or_rebate as f32 / quote_multiplier as f32;
FillEvent {
event_type: FillEventType::Spot,
maker: maker,
@@ -134,12 +225,14 @@
owner: Pubkey::new(cast_slice(&identity(owner) as &[_])).to_string(),
order_id: order_id,
client_order_id: client_order_id,
fee: native_fee_or_rebate as f32,
fee,
price,
quantity: native_qty_received as i64,
quantity,
}
}
SpotEvent::Out { .. } => { panic!("Can't build FillEvent from SpotEvent::Out")}
SpotEvent::Out { .. } => {
panic!("Can't build FillEvent from SpotEvent::Out")
}
}
}
}
@@ -148,8 +241,8 @@ impl FillEvent {
pub struct FillUpdate {
pub event: FillEvent,
pub status: FillUpdateStatus,
pub market: String,
pub queue: String,
pub market_key: String,
pub market_name: String,
pub slot: u64,
pub write_version: u64,
}
@@ -170,13 +263,13 @@ impl Serialize for FillUpdate {
where
S: Serializer,
{
let mut state = serializer.serialize_struct("FillUpdate", 4)?;
let mut state = serializer.serialize_struct("FillUpdate", 6)?;
state.serialize_field("event", &self.event)?;
state.serialize_field("market", &self.market)?;
state.serialize_field("queue", &self.queue)?;
state.serialize_field("marketKey", &self.market_key)?;
state.serialize_field("marketName", &self.market_name)?;
state.serialize_field("status", &self.status)?;
state.serialize_field("slot", &self.slot)?;
state.serialize_field("write_version", &self.write_version)?;
state.serialize_field("writeVersion", &self.write_version)?;
state.end()
}
@@ -218,7 +311,7 @@ type EventQueueEvents = [AnyEvent; MAX_NUM_EVENTS as usize];
fn publish_changes_perp(
slot: u64,
write_version: u64,
mkt: &(Pubkey, Pubkey),
mkt: &(Pubkey, MarketConfig),
header: &EventQueueHeader,
events: &EventQueueEvents,
old_seq_num: u64,
@@ -234,7 +327,7 @@ fn publish_changes_perp(
.unwrap_or(0);
let mut checkpoint = Vec::new();
let mkt_pk_string = mkt.0.to_string();
let evq_pk_string = mkt.1.to_string();
let evq_pk_string = mkt.1.event_queue.to_string();
for seq_num in start_seq_num..header.seq_num {
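// the event queue is a fixed-size ring buffer; map the sequence number to a buffer index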
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
@@ -245,8 +338,8 @@
// the order of these checks is important so they are exhaustive
if seq_num >= old_seq_num {
debug!(
"found new event {} idx {} type {}",
mkt_pk_string, idx, events[idx].event_type as u32
"found new event {} idx {} type {} slot {} write_version {}",
mkt_pk_string, idx, events[idx].event_type as u32, slot, write_version
);
metric_events_new.increment();
@@ -254,7 +347,7 @@
// new fills are published and recorded in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
// send event for both maker and taker
for fill in fills {
fill_update_sender
@@ -263,8 +356,8 @@
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
@@ -283,7 +376,7 @@
// first revoke old event if a fill
if old_events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
let fills = FillEvent::new_from_perp(fill);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
@@ -291,8 +384,8 @@
write_version,
event: fill,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
@@ -301,7 +394,7 @@
// then publish new if its a fill and record in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
@@ -309,8 +402,8 @@
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
@@ -320,7 +413,7 @@
// every already published event is recorded in checkpoint if a fill
if events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(events[idx]);
let fills = FillEvent::new_from_perp(fill);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
checkpoint.push(fill);
}
@@ -332,15 +425,15 @@
for seq_num in header.seq_num..old_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
debug!(
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num
"found dropped event {} idx {} seq_num {} header seq num {} old seq num {} slot {} write_version {}",
mkt_pk_string, idx, seq_num, header.seq_num, old_seq_num, slot, write_version
);
metric_events_drop.increment();
if old_events[idx].event_type == EventType::Fill as u8 {
let fill: PerpFillEvent = bytemuck::cast(old_events[idx]);
let fills = FillEvent::new_from_perp(fill);
let fills = FillEvent::new_from_perp(fill, &mkt.1);
for fill in fills {
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
@@ -348,8 +441,8 @@
event: fill,
write_version,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
@@ -370,7 +463,7 @@
fn publish_changes_serum(
slot: u64,
write_version: u64,
mkt: &(Pubkey, Pubkey),
mkt: &(Pubkey, MarketConfig),
header: &SerumEventQueueHeader,
events: &[serum_dex::state::Event],
old_seq_num: u64,
@@ -386,12 +479,15 @@
.unwrap_or(0);
let mut checkpoint = Vec::new();
let mkt_pk_string = mkt.0.to_string();
let evq_pk_string = mkt.1.to_string();
let evq_pk_string = mkt.1.event_queue.to_string();
let header_seq_num = header.seq_num;
debug!("start seq {} header seq {}", start_seq_num, header_seq_num);
// Timestamp for spot events is the time they were scraped
let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
for seq_num in start_seq_num..header_seq_num {
let idx = (seq_num % MAX_NUM_EVENTS as u64) as usize;
let event_view = events[idx].as_view().unwrap();
@@ -404,7 +500,7 @@
// 2) the event is not matching the old event queue
// 3) all other events are matching the old event queue
// the order of these checks is important so they are exhaustive
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num);
let fill = FillEvent::new_from_spot(event_view, timestamp, seq_num, &mkt.1);
if seq_num >= old_seq_num {
debug!("found new serum fill {} idx {}", mkt_pk_string, idx,);
@@ -415,8 +511,8 @@
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
@@ -430,11 +526,15 @@
"found changed id event {} idx {} seq_num {} header seq num {} old seq num {}",
mkt_pk_string, idx, seq_num, header_seq_num, old_seq_num
);
metric_events_change.increment();
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
let old_fill = FillEvent::new_from_spot(
old_event_view,
timestamp,
seq_num,
&mkt.1,
);
// first revoke old event
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
@@ -442,11 +542,11 @@
write_version,
event: old_fill,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
// then publish new
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
@@ -454,8 +554,8 @@
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
@@ -478,8 +578,8 @@
write_version,
event: fill.clone(),
status: FillUpdateStatus::New,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
checkpoint.push(fill);
@@ -503,15 +603,15 @@
match old_event_view {
SpotEvent::Fill { .. } => {
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num);
let old_fill = FillEvent::new_from_spot(old_event_view, timestamp, seq_num, &mkt.1);
fill_update_sender
.try_send(FillEventFilterMessage::Update(FillUpdate {
slot,
event: old_fill,
write_version,
status: FillUpdateStatus::Revoke,
market: mkt_pk_string.clone(),
queue: evq_pk_string.clone(),
market_key: mkt_pk_string.clone(),
market_name: mkt.1.name.clone(),
}))
.unwrap(); // TODO: use anyhow to bubble up error
}
@@ -520,21 +620,19 @@
}
fill_update_sender
.try_send(FillEventFilterMessage::Checkpoint(
FillCheckpoint {
slot,
write_version,
events: checkpoint,
market: mkt_pk_string,
queue: evq_pk_string,
},
))
.try_send(FillEventFilterMessage::Checkpoint(FillCheckpoint {
slot,
write_version,
events: checkpoint,
market: mkt_pk_string,
queue: evq_pk_string,
}))
.unwrap()
}
pub async fn init(
perp_queue_pks: Vec<(Pubkey, Pubkey)>,
serum_queue_pks: Vec<(Pubkey, Pubkey)>,
perp_market_configs: Vec<(Pubkey, MarketConfig)>,
spot_market_configs: Vec<(Pubkey, MarketConfig)>,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
@@ -549,8 +647,12 @@ pub async fn init(
metrics_sender.register_u64("fills_feed_events_new_serum".into(), MetricType::Counter);
let mut metric_events_change =
metrics_sender.register_u64("fills_feed_events_change".into(), MetricType::Counter);
let mut metric_events_change_serum =
metrics_sender.register_u64("fills_feed_events_change_serum".into(), MetricType::Counter);
let mut metrics_events_drop =
metrics_sender.register_u64("fills_feed_events_drop".into(), MetricType::Counter);
let mut metrics_events_drop_serum =
metrics_sender.register_u64("fills_feed_events_drop_serum".into(), MetricType::Counter);
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
@@ -572,18 +674,24 @@ pub async fn init(
let mut seq_num_cache = HashMap::new();
let mut last_evq_versions = HashMap::<String, (u64, u64)>::new();
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
let relevant_pubkeys = all_queue_pks
let all_market_configs = [perp_market_configs.clone(), spot_market_configs.clone()].concat();
let perp_queue_pks: Vec<Pubkey> = perp_market_configs
.iter()
.map(|m| m.1)
.collect::<HashSet<Pubkey>>();
.map(|x| x.1.event_queue)
.collect();
let spot_queue_pks: Vec<Pubkey> = spot_market_configs
.iter()
.map(|x| x.1.event_queue)
.collect();
let all_queue_pks: HashSet<Pubkey> =
HashSet::from_iter([perp_queue_pks.clone(), spot_queue_pks.clone()].concat());
// update handling thread, reads both slot and account updates
tokio::spawn(async move {
loop {
tokio::select! {
Ok(account_write) = account_write_queue_receiver_c.recv() => {
if !relevant_pubkeys.contains(&account_write.pubkey) {
if !all_queue_pks.contains(&account_write.pubkey) {
continue;
}
@@ -611,21 +719,38 @@
});
}
Err(e) = slot_queue_receiver.recv() => {
warn!("slot update channel err {:?}", e);
}
Err(e) = account_write_queue_receiver_c.recv() => {
warn!("write update channel err {:?}", e);
}
}
for mkt in all_queue_pks.iter() {
let last_evq_version = last_evq_versions.get(&mkt.1.to_string()).unwrap_or(&(0, 0));
let mkt_pk = mkt.1;
for mkt in all_market_configs.iter() {
let evq_pk = mkt.1.event_queue;
let evq_pk_string = evq_pk.to_string();
let last_evq_version = last_evq_versions
.get(&mkt.1.event_queue.to_string())
.unwrap_or(&(0, 0));
match chain_cache.account(&mkt_pk) {
match chain_cache.account(&evq_pk) {
Ok(account_info) => {
// only process if the account state changed
let evq_version = (account_info.slot, account_info.write_version);
let evq_pk_string = mkt.1.to_string();
trace!("evq {} write_version {:?}", evq_pk_string, evq_version);
if evq_version == *last_evq_version {
continue;
}
if evq_version.0 < last_evq_version.0 {
debug!("evq version slot was old");
continue;
}
if evq_version.0 == last_evq_version.0 && evq_version.1 < last_evq_version.1
{
info!("evq version slot was same and write version was old");
continue;
}
last_evq_versions.insert(evq_pk_string.clone(), evq_version);
let account = &account_info.account;
@@ -633,17 +758,16 @@ pub async fn init(
if is_perp {
let event_queue =
EventQueue::try_deserialize(account.data().borrow_mut()).unwrap();
trace!(
"evq {} seq_num {}",
evq_pk_string,
event_queue.header.seq_num
info!(
"evq {} seq_num {} version {:?}",
evq_pk_string, event_queue.header.seq_num, evq_version,
);
match seq_num_cache.get(&evq_pk_string) {
Some(old_seq_num) => match perp_events_cache.get(&evq_pk_string) {
Some(old_events) => publish_changes_perp(
account_info.slot,
account_info.write_version,
mkt,
&mkt,
&event_queue.header,
&event_queue.buf,
*old_seq_num,
@@ -690,14 +814,17 @@ pub async fn init(
old_events,
&fill_update_sender,
&mut metric_events_new_serum,
&mut metric_events_change,
&mut metrics_events_drop,
&mut metric_events_change_serum,
&mut metrics_events_drop_serum,
),
_ => {
info!("serum_events_cache could not find {}", evq_pk_string)
debug!(
"serum_events_cache could not find {}",
evq_pk_string
)
}
},
_ => info!("seq_num_cache could not find {}", evq_pk_string),
_ => debug!("seq_num_cache could not find {}", evq_pk_string),
}
seq_num_cache.insert(evq_pk_string.clone(), seq_num.clone());
@@ -705,7 +832,7 @@ pub async fn init(
.insert(evq_pk_string.clone(), events.clone().to_vec());
}
}
Err(_) => info!("chain_cache could not find {}", mkt.1),
Err(_) => debug!("chain_cache could not find {}", mkt.1.event_queue),
}
}
}

View File

@@ -0,0 +1,237 @@
use chrono::{TimeZone, Utc};
use log::*;
use native_tls::{Certificate, Identity, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use postgres_query::Caching;
use std::{env, fs, time::Duration};
use tokio_postgres::Client;
use crate::{fill_event_filter::FillUpdate, metrics::*, PostgresConfig};
async fn postgres_connection(
config: &PostgresConfig,
metric_retries: MetricU64,
metric_live: MetricU64,
) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
let (tx, rx) = async_channel::unbounded();
// openssl pkcs12 -export -in client.cer -inkey client-key.cer -out client.pks
// base64 -i ca.cer -o ca.cer.b64 && base64 -i client.pks -o client.pks.b64
// fly secrets set PG_CA_CERT=- < ./ca.cer.b64 -a mango-fills
// fly secrets set PG_CLIENT_KEY=- < ./client.pks.b64 -a mango-fills
info!("making tls config");
let tls = match &config.tls {
Some(tls) => {
use base64::{engine::general_purpose, Engine as _};
let ca_cert = match &tls.ca_cert_path.chars().next().unwrap() {
'$' => general_purpose::STANDARD
.decode(
env::var(&tls.ca_cert_path[1..])
.expect("reading ca cert from env")
.into_bytes(),
)
.expect("decoding ca cert"),
_ => fs::read(&tls.ca_cert_path).expect("reading ca cert from file"),
};
let client_key = match &tls.client_key_path.chars().next().unwrap() {
'$' => general_purpose::STANDARD
.decode(
env::var(&tls.client_key_path[1..])
.expect("reading client key from env")
.into_bytes(),
)
.expect("decoding client key"),
_ => fs::read(&tls.client_key_path).expect("reading client key from file"),
};
MakeTlsConnector::new(
TlsConnector::builder()
.add_root_certificate(Certificate::from_pem(&ca_cert)?)
.identity(Identity::from_pkcs12(&client_key, "pass")?)
.danger_accept_invalid_certs(config.allow_invalid_certs)
.build()?,
)
}
None => MakeTlsConnector::new(
TlsConnector::builder()
.danger_accept_invalid_certs(config.allow_invalid_certs)
.build()?,
),
};
let config = config.clone();
let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?);
let mut metric_retries = metric_retries;
let mut metric_live = metric_live;
tokio::spawn(async move {
loop {
let (client, connection) = match initial.take() {
Some(v) => v,
None => {
let result =
tokio_postgres::connect(&config.connection_string, tls.clone()).await;
match result {
Ok(v) => v,
Err(err) => {
warn!("could not connect to postgres: {:?}", err);
tokio::time::sleep(Duration::from_secs(
config.retry_connection_sleep_secs,
))
.await;
continue;
}
}
}
};
tx.send(Some(client)).await.expect("send success");
metric_live.increment();
let result = connection.await;
metric_retries.increment();
metric_live.decrement();
tx.send(None).await.expect("send success");
warn!("postgres connection error: {:?}", result);
tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
}
});
Ok(rx)
}
async fn update_postgres_client<'a>(
client: &'a mut Option<postgres_query::Caching<tokio_postgres::Client>>,
rx: &async_channel::Receiver<Option<tokio_postgres::Client>>,
config: &PostgresConfig,
) -> &'a postgres_query::Caching<tokio_postgres::Client> {
// get the most recent client, waiting if there's a disconnect
while !rx.is_empty() || client.is_none() {
tokio::select! {
client_raw_opt = rx.recv() => {
*client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new);
},
_ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
error!("waited too long for new postgres client");
std::process::exit(1);
},
}
}
client.as_ref().expect("must contain value")
}
async fn process_update(client: &Caching<Client>, update: &FillUpdate) -> anyhow::Result<()> {
let market = &update.market_key;
let seq_num = update.event.seq_num as i64;
let fill_timestamp = Utc.timestamp_opt(update.event.timestamp as i64, 0).unwrap();
let price = update.event.price as f64;
let quantity = update.event.quantity as f64;
let slot = update.slot as i64;
let write_version = update.write_version as i64;
let query = postgres_query::query!(
"INSERT INTO transactions_v4.perp_fills_feed_events
(market, seq_num, fill_timestamp, price,
quantity, slot, write_version)
VALUES
($market, $seq_num, $fill_timestamp, $price,
$quantity, $slot, $write_version)
ON CONFLICT (market, seq_num) DO NOTHING",
market,
seq_num,
fill_timestamp,
price,
quantity,
slot,
write_version,
);
let _ = query.execute(&client).await?;
Ok(())
}
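The insert above implies roughly the following target table. This DDL is a sketch inferred from the query and the Rust-side casts; the actual migration is not part of this commit, and the ON CONFLICT clause only requires some unique constraint on (market, seq_num):

-- Inferred sketch, not the actual migration
CREATE TABLE IF NOT EXISTS transactions_v4.perp_fills_feed_events (
    market         text NOT NULL,
    seq_num        bigint NOT NULL,
    fill_timestamp timestamptz NOT NULL,
    price          double precision NOT NULL,
    quantity       double precision NOT NULL,
    slot           bigint NOT NULL,
    write_version  bigint NOT NULL,
    PRIMARY KEY (market, seq_num)
);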
pub async fn init(
config: &PostgresConfig,
metrics_sender: Metrics,
) -> anyhow::Result<async_channel::Sender<FillUpdate>> {
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (fill_update_queue_sender, fill_update_queue_receiver) =
async_channel::bounded::<FillUpdate>(config.max_queue_size);
let metric_con_retries = metrics_sender.register_u64(
"fills_postgres_connection_retries".into(),
MetricType::Counter,
);
let metric_con_live =
metrics_sender.register_u64("fills_postgres_connections_alive".into(), MetricType::Gauge);
// postgres fill update sending worker threads
for _ in 0..config.connection_count {
let postgres_account_writes =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?;
let fill_update_queue_receiver_c = fill_update_queue_receiver.clone();
let config = config.clone();
let mut metric_retries =
metrics_sender.register_u64("fills_postgres_retries".into(), MetricType::Counter);
tokio::spawn(async move {
let mut client_opt = None;
loop {
// Retrieve up to batch_size updates
let mut batch = Vec::new();
batch.push(
fill_update_queue_receiver_c
.recv()
.await
.expect("sender must stay alive"),
);
while batch.len() < config.max_batch_size {
match fill_update_queue_receiver_c.try_recv() {
Ok(update) => batch.push(update),
Err(async_channel::TryRecvError::Empty) => break,
Err(async_channel::TryRecvError::Closed) => {
panic!("sender must stay alive")
}
};
}
info!(
"updates, batch {}, channel size {}",
batch.len(),
fill_update_queue_receiver_c.len(),
);
let mut error_count = 0;
loop {
let client =
update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
.await;
let mut results = futures::future::join_all(
batch.iter().map(|update| process_update(client, update)),
)
.await;
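// retain only the failed updates in the batch so the retry below reprocesses just those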
let mut iter = results.iter();
batch.retain(|_| iter.next().unwrap().is_err());
if batch.len() > 0 {
metric_retries.add(batch.len() as u64);
error_count += 1;
if error_count - 1 < config.retry_query_max_count {
results.retain(|r| r.is_err());
warn!("failed to process fill update, retrying: {:?}", results);
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
.await;
continue;
} else {
error!("failed to process account write, exiting");
std::process::exit(1);
}
};
break;
}
}
});
}
Ok(fill_update_queue_sender)
}

View File

@@ -1,21 +1,20 @@
pub mod chain_data;
pub mod fill_event_filter;
pub mod orderbook_filter;
pub mod fill_event_postgres_target;
pub mod grpc_plugin_source;
pub mod memory_target;
pub mod metrics;
pub mod orderbook_filter;
pub mod postgres_target;
pub mod postgres_types_numeric;
pub mod websocket_source;
pub use chain_data::SlotStatus;
use serde::{Serialize, Serializer, ser::SerializeStruct};
use serde::{ser::SerializeStruct, Serialize, Serializer};
use {
async_trait::async_trait,
serde_derive::Deserialize,
solana_sdk::{account::Account, pubkey::Pubkey},
std::sync::Arc,
};
trait AnyhowWrap {
@@ -69,14 +68,12 @@ pub struct SlotUpdate {
#[derive(Clone, Debug, Deserialize)]
pub struct PostgresConfig {
pub connection_string: String,
/// Number of parallel postgres connections used for account write insertions
pub account_write_connection_count: u64,
/// Maximum batch size for account write inserts over one connection
pub account_write_max_batch_size: usize,
/// Max size of account write queues
pub account_write_max_queue_size: usize,
/// Number of parallel postgres connections used for slot insertions
pub slot_update_connection_count: u64,
/// Number of parallel postgres connections used for insertions
pub connection_count: u64,
/// Maximum batch size for inserts over one connection
pub max_batch_size: usize,
/// Max size of queues
pub max_queue_size: usize,
/// Number of queries retries before fatal error
pub retry_query_max_count: u64,
/// Seconds to sleep between query retries
@@ -87,12 +84,15 @@ pub struct PostgresConfig {
pub fatal_connection_timeout_secs: u64,
/// Allow invalid TLS certificates, passed to native_tls danger_accept_invalid_certs
pub allow_invalid_certs: bool,
/// Name key to use in the monitoring table
pub monitoring_name: String,
/// Time between updates to the monitoring table
pub monitoring_update_interval_secs: u64,
/// Time between cleanup jobs (0 to disable)
pub cleanup_interval_secs: u64,
pub tls: Option<PostgresTlsConfig>,
}
#[derive(Clone, Debug, Deserialize)]
pub struct PostgresTlsConfig {
/// CA Cert file or env var
pub ca_cert_path: String,
/// PKCS12 client cert path
pub client_key_path: String,
}
#[derive(Clone, Debug, Deserialize)]
@@ -164,63 +164,3 @@ pub struct Config {
pub source: SourceConfig,
pub metrics: MetricsConfig,
}
#[async_trait]
pub trait AccountTable: Sync + Send {
fn table_name(&self) -> &str;
async fn insert_account_write(
&self,
client: &postgres_query::Caching<tokio_postgres::Client>,
account_write: &AccountWrite,
) -> anyhow::Result<()>;
}
pub type AccountTables = Vec<Arc<dyn AccountTable>>;
pub struct RawAccountTable {}
pub fn encode_address(addr: &Pubkey) -> String {
bs58::encode(&addr.to_bytes()).into_string()
}
#[async_trait]
impl AccountTable for RawAccountTable {
fn table_name(&self) -> &str {
"account_write"
}
async fn insert_account_write(
&self,
client: &postgres_query::Caching<tokio_postgres::Client>,
account_write: &AccountWrite,
) -> anyhow::Result<()> {
let pubkey = encode_address(&account_write.pubkey);
let owner = encode_address(&account_write.owner);
let slot = account_write.slot as i64;
let write_version = account_write.write_version as i64;
let lamports = account_write.lamports as i64;
let rent_epoch = account_write.rent_epoch as i64;
// TODO: should update for same write_version to work with websocket input
let query = postgres_query::query!(
"INSERT INTO account_write
(pubkey_id, slot, write_version, is_selected,
owner_id, lamports, executable, rent_epoch, data)
VALUES
(map_pubkey($pubkey), $slot, $write_version, $is_selected,
map_pubkey($owner), $lamports, $executable, $rent_epoch, $data)
ON CONFLICT (pubkey_id, slot, write_version) DO NOTHING",
pubkey,
slot,
write_version,
is_selected = account_write.is_selected,
owner,
lamports,
executable = account_write.executable,
rent_epoch,
data = account_write.data,
);
let _ = query.execute(client).await?;
Ok(())
}
}

View File

@@ -1,637 +0,0 @@
use anyhow::Context;
use log::*;
use native_tls::TlsConnector;
use postgres_native_tls::MakeTlsConnector;
use postgres_query::{query, query_dyn};
use std::{collections::HashMap, convert::TryFrom, time::Duration};
use crate::{metrics::*, AccountTables, AccountWrite, PostgresConfig, SlotStatus, SlotUpdate};
mod pg {
#[derive(Clone, Copy, Debug, PartialEq, postgres_types::ToSql)]
pub enum SlotStatus {
Rooted,
Confirmed,
Processed,
}
impl From<super::SlotStatus> for SlotStatus {
fn from(status: super::SlotStatus) -> SlotStatus {
match status {
super::SlotStatus::Rooted => SlotStatus::Rooted,
super::SlotStatus::Confirmed => SlotStatus::Confirmed,
super::SlotStatus::Processed => SlotStatus::Processed,
}
}
}
}
async fn postgres_connection(
config: &PostgresConfig,
metric_retries: MetricU64,
metric_live: MetricU64,
) -> anyhow::Result<async_channel::Receiver<Option<tokio_postgres::Client>>> {
let (tx, rx) = async_channel::unbounded();
let tls = MakeTlsConnector::new(
TlsConnector::builder()
.danger_accept_invalid_certs(config.allow_invalid_certs)
.build()?,
);
let config = config.clone();
let mut initial = Some(tokio_postgres::connect(&config.connection_string, tls.clone()).await?);
let mut metric_retries = metric_retries;
let mut metric_live = metric_live;
tokio::spawn(async move {
loop {
let (client, connection) = match initial.take() {
Some(v) => v,
None => {
let result =
tokio_postgres::connect(&config.connection_string, tls.clone()).await;
match result {
Ok(v) => v,
Err(err) => {
warn!("could not connect to postgres: {:?}", err);
tokio::time::sleep(Duration::from_secs(
config.retry_connection_sleep_secs,
))
.await;
continue;
}
}
}
};
tx.send(Some(client)).await.expect("send success");
metric_live.increment();
let result = connection.await;
metric_retries.increment();
metric_live.decrement();
tx.send(None).await.expect("send success");
warn!("postgres connection error: {:?}", result);
tokio::time::sleep(Duration::from_secs(config.retry_connection_sleep_secs)).await;
}
});
Ok(rx)
}
async fn update_postgres_client<'a>(
client: &'a mut Option<postgres_query::Caching<tokio_postgres::Client>>,
rx: &async_channel::Receiver<Option<tokio_postgres::Client>>,
config: &PostgresConfig,
) -> &'a postgres_query::Caching<tokio_postgres::Client> {
// get the most recent client, waiting if there's a disconnect
while !rx.is_empty() || client.is_none() {
tokio::select! {
client_raw_opt = rx.recv() => {
*client = client_raw_opt.expect("not closed").map(postgres_query::Caching::new);
},
_ = tokio::time::sleep(Duration::from_secs(config.fatal_connection_timeout_secs)) => {
error!("waited too long for new postgres client");
std::process::exit(1);
},
}
}
client.as_ref().expect("must contain value")
}
async fn process_account_write(
client: &postgres_query::Caching<tokio_postgres::Client>,
write: &AccountWrite,
account_tables: &AccountTables,
) -> anyhow::Result<()> {
futures::future::try_join_all(
account_tables
.iter()
.map(|table| table.insert_account_write(client, write)),
)
.await?;
Ok(())
}
struct Slots {
// non-rooted only
slots: HashMap<u64, SlotUpdate>,
newest_processed_slot: Option<u64>,
newest_rooted_slot: Option<u64>,
}
#[derive(Default)]
struct SlotPreprocessing {
discard_duplicate: bool,
discard_old: bool,
new_processed_head: bool,
new_rooted_head: bool,
parent_update: bool,
}
impl Slots {
fn new() -> Self {
Self {
slots: HashMap::new(),
newest_processed_slot: None,
newest_rooted_slot: None,
}
}
fn add(&mut self, update: &SlotUpdate) -> SlotPreprocessing {
let mut result = SlotPreprocessing::default();
if let Some(previous) = self.slots.get_mut(&update.slot) {
if previous.status == update.status && previous.parent == update.parent {
result.discard_duplicate = true;
}
previous.status = update.status;
if update.parent.is_some() && previous.parent != update.parent {
previous.parent = update.parent;
result.parent_update = true;
}
} else if self.newest_rooted_slot.is_none()
|| update.slot > self.newest_rooted_slot.unwrap()
{
self.slots.insert(update.slot, update.clone());
} else {
result.discard_old = true;
}
if update.status == SlotStatus::Rooted {
let old_slots: Vec<u64> = self
.slots
.keys()
.filter(|s| **s <= update.slot)
.copied()
.collect();
for old_slot in old_slots {
self.slots.remove(&old_slot);
}
if self.newest_rooted_slot.is_none() || self.newest_rooted_slot.unwrap() < update.slot {
self.newest_rooted_slot = Some(update.slot);
result.new_rooted_head = true;
}
}
if self.newest_processed_slot.is_none() || self.newest_processed_slot.unwrap() < update.slot
{
self.newest_processed_slot = Some(update.slot);
result.new_processed_head = true;
}
result
}
}
fn make_cleanup_steps(tables: &Vec<String>) -> HashMap<String, String> {
let mut steps = HashMap::<String, String>::new();
// Delete all account writes that came before the newest rooted slot except
// for the newest rooted write for each pubkey.
// This could be older rooted writes or writes in uncled slots that came
// before the newest rooted slot.
//
// Also delete _all_ writes from before the newest snapshot, because these may
// be for deleted accounts where the deletion event was missed. Snapshots
// provide a new state for all live accounts, but don't tell us about deleted
// accounts.
//
// The way this is done, by taking the newest snapshot that's at least
// min_snapshot_age behind the newest rooted slot is a workaround: we don't know
// how long it'll take to insert snapshot data, but assume it'll be done by that
// time.
let min_snapshot_age = 300;
steps.extend(
tables
.iter()
.map(|table_name| {
let sql = format!(
"WITH
newest_rooted AS (
SELECT max(slot) AS newest_rooted_slot FROM slot WHERE status = 'Rooted'),
newest_snapshot AS (
SELECT max(slot) AS newest_snapshot_slot FROM account_write, newest_rooted
WHERE write_version = 0 AND slot + {min_snapshot_age} < newest_rooted_slot)
DELETE FROM {table} AS data
USING
newest_rooted,
newest_snapshot,
(SELECT DISTINCT ON(pubkey_id) pubkey_id, slot, write_version
FROM {table}
LEFT JOIN slot USING(slot)
CROSS JOIN newest_rooted
WHERE slot <= newest_rooted_slot AND (status = 'Rooted' OR status is NULL)
ORDER BY pubkey_id, slot DESC, write_version DESC
) newest_rooted_write
WHERE
data.pubkey_id = newest_rooted_write.pubkey_id AND (
data.slot < newest_snapshot_slot OR (
data.slot <= newest_rooted_slot
AND (data.slot != newest_rooted_write.slot OR data.write_version != newest_rooted_write.write_version)
)
)",
table = table_name,
min_snapshot_age = min_snapshot_age,
);
(format!("delete old writes in {}", table_name), sql)
})
.collect::<HashMap<String, String>>(),
);
// Delete information about older slots
steps.insert(
"delete old slots".into(),
"DELETE FROM slot
USING (SELECT max(slot) as newest_rooted_slot FROM slot WHERE status = 'Rooted') s
WHERE slot + 1000 < newest_rooted_slot"
.into(),
);
steps
}
#[derive(Clone)]
struct SlotsProcessing {}
impl SlotsProcessing {
fn new() -> Self {
Self {}
}
async fn process(
&self,
client: &postgres_query::Caching<tokio_postgres::Client>,
update: &SlotUpdate,
meta: &SlotPreprocessing,
) -> anyhow::Result<()> {
let slot = update.slot as i64;
let status: pg::SlotStatus = update.status.into();
if let Some(parent) = update.parent {
let parent = parent as i64;
let query = query!(
"INSERT INTO slot
(slot, parent, status, uncle)
VALUES
($slot, $parent, $status, FALSE)
ON CONFLICT (slot) DO UPDATE SET
parent=$parent, status=$status",
slot,
parent,
status,
);
let _ = query.execute(client).await.context("updating slot row")?;
} else {
let query = query!(
"INSERT INTO slot
(slot, parent, status, uncle)
VALUES
($slot, NULL, $status, FALSE)
ON CONFLICT (slot) DO UPDATE SET
status=$status",
slot,
status,
);
let _ = query.execute(client).await.context("updating slot row")?;
}
if meta.new_rooted_head {
let slot = update.slot as i64;
// Mark preceding non-uncle slots as rooted
let query = query!(
"UPDATE slot SET status = 'Rooted'
WHERE slot < $newest_final_slot
AND (NOT uncle)
AND status != 'Rooted'",
newest_final_slot = slot
);
let _ = query
.execute(client)
.await
.context("updating preceding non-rooted slots")?;
}
if meta.new_processed_head || meta.parent_update {
// update the uncle column for the chain of slots from the
// newest down to the first rooted slot
let query = query!(
"WITH RECURSIVE
liveslots AS (
SELECT slot.*, 0 AS depth FROM slot
WHERE slot = (SELECT max(slot) FROM slot)
UNION ALL
SELECT s.*, depth + 1 FROM slot s
INNER JOIN liveslots l ON s.slot = l.parent
WHERE l.status != 'Rooted' AND depth < 1000
),
min_slot AS (SELECT min(slot) AS min_slot FROM liveslots)
UPDATE slot SET
uncle = NOT EXISTS (SELECT 1 FROM liveslots WHERE liveslots.slot = slot.slot)
FROM min_slot
WHERE slot >= min_slot;"
);
let _ = query
.execute(client)
.await
.context("recomputing slot uncle status")?;
}
trace!("slot update done {}", update.slot);
Ok(())
}
}
fn secs_since_epoch() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs()
}
fn epoch_secs_to_time(secs: u64) -> std::time::SystemTime {
std::time::UNIX_EPOCH + std::time::Duration::from_secs(secs)
}
pub async fn init(
config: &PostgresConfig,
account_tables: AccountTables,
metrics_sender: Metrics,
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
)> {
// The actual message may want to also contain a retry count, if it self-reinserts on failure?
let (account_write_queue_sender, account_write_queue_receiver) =
async_channel::bounded::<AccountWrite>(config.account_write_max_queue_size);
// Slot updates flowing from the outside into the single processing thread. From
// there they'll flow into the postgres sending thread.
let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::<SlotUpdate>();
let (slot_inserter_sender, slot_inserter_receiver) =
async_channel::unbounded::<(SlotUpdate, SlotPreprocessing)>();
let metric_con_retries =
metrics_sender.register_u64("postgres_connection_retries".into(), MetricType::Counter);
let metric_con_live =
metrics_sender.register_u64("postgres_connections_alive".into(), MetricType::Gauge);
// postgres account write sending worker threads
for _ in 0..config.account_write_connection_count {
let postgres_account_writes =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?;
let account_write_queue_receiver_c = account_write_queue_receiver.clone();
let account_tables_c = account_tables.clone();
let config = config.clone();
let mut metric_retries = metrics_sender
.register_u64("postgres_account_write_retries".into(), MetricType::Counter);
let mut metric_last_write = metrics_sender.register_u64(
"postgres_account_write_last_write_timestamp".into(),
MetricType::Gauge,
);
tokio::spawn(async move {
let mut client_opt = None;
loop {
// Retrieve up to batch_size account writes
let mut write_batch = Vec::new();
write_batch.push(
account_write_queue_receiver_c
.recv()
.await
.expect("sender must stay alive"),
);
while write_batch.len() < config.account_write_max_batch_size {
match account_write_queue_receiver_c.try_recv() {
Ok(write) => write_batch.push(write),
Err(async_channel::TryRecvError::Empty) => break,
Err(async_channel::TryRecvError::Closed) => {
panic!("sender must stay alive")
}
};
}
trace!(
"account write, batch {}, channel size {}",
write_batch.len(),
account_write_queue_receiver_c.len(),
);
let mut error_count = 0;
loop {
let client =
update_postgres_client(&mut client_opt, &postgres_account_writes, &config)
.await;
let mut results = futures::future::join_all(
write_batch
.iter()
.map(|write| process_account_write(client, &write, &account_tables_c)),
)
.await;
let mut iter = results.iter();
write_batch.retain(|_| iter.next().unwrap().is_err());
if write_batch.len() > 0 {
metric_retries.add(write_batch.len() as u64);
error_count += 1;
if error_count - 1 < config.retry_query_max_count {
results.retain(|r| r.is_err());
warn!("failed to process account write, retrying: {:?}", results);
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
.await;
continue;
} else {
error!("failed to process account write, exiting");
std::process::exit(1);
}
};
break;
}
metric_last_write.set_max(secs_since_epoch());
}
});
}
// slot update handling thread
let mut metric_slot_queue =
metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge);
tokio::spawn(async move {
let mut slots = Slots::new();
loop {
let update = slot_queue_receiver
.recv()
.await
.expect("sender must stay alive");
trace!(
"slot update {}, channel size {}",
update.slot,
slot_queue_receiver.len()
);
// Check if we already know about the slot, or it is outdated
let slot_preprocessing = slots.add(&update);
if slot_preprocessing.discard_duplicate || slot_preprocessing.discard_old {
continue;
}
slot_inserter_sender
.send((update, slot_preprocessing))
.await
.expect("sending must succeed");
metric_slot_queue.set(slot_inserter_sender.len() as u64);
}
});
// postgres slot update worker threads
let slots_processing = SlotsProcessing::new();
for _ in 0..config.slot_update_connection_count {
let postgres_slot =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?;
let receiver_c = slot_inserter_receiver.clone();
let config = config.clone();
let mut metric_retries =
metrics_sender.register_u64("postgres_slot_update_retries".into(), MetricType::Counter);
let mut metric_last_write = metrics_sender.register_u64(
"postgres_slot_last_write_timestamp".into(),
MetricType::Gauge,
);
let slots_processing = slots_processing.clone();
tokio::spawn(async move {
let mut client_opt = None;
loop {
let (update, preprocessing) =
receiver_c.recv().await.expect("sender must stay alive");
trace!("slot insertion, slot {}", update.slot);
let mut error_count = 0;
loop {
let client =
update_postgres_client(&mut client_opt, &postgres_slot, &config).await;
if let Err(err) = slots_processing
.process(client, &update, &preprocessing)
.await
{
metric_retries.increment();
error_count += 1;
if error_count - 1 < config.retry_query_max_count {
warn!("failed to process slot update, retrying: {:?}", err);
tokio::time::sleep(Duration::from_secs(config.retry_query_sleep_secs))
.await;
continue;
} else {
error!("failed to process slot update, exiting");
std::process::exit(1);
}
};
break;
}
metric_last_write.set_max(secs_since_epoch());
}
});
}
// postgres cleanup thread
if config.cleanup_interval_secs > 0 {
let table_names: Vec<String> = account_tables
.iter()
.map(|table| table.table_name().to_string())
.collect();
let cleanup_steps = make_cleanup_steps(&table_names);
let postgres_con =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?;
let mut metric_last_cleanup = metrics_sender.register_u64(
"postgres_cleanup_last_success_timestamp".into(),
MetricType::Gauge,
);
let mut metric_cleanup_errors =
metrics_sender.register_u64("postgres_cleanup_errors".into(), MetricType::Counter);
let config = config.clone();
tokio::spawn(async move {
let mut client_opt = None;
loop {
tokio::time::sleep(Duration::from_secs(config.cleanup_interval_secs)).await;
let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await;
let mut all_successful = true;
for (name, cleanup_sql) in &cleanup_steps {
let query = query_dyn!(&cleanup_sql).unwrap();
if let Err(err) = query.execute(client).await {
warn!("failed to process cleanup step {}: {:?}", name, err);
metric_cleanup_errors.increment();
all_successful = false;
}
}
if all_successful {
metric_last_cleanup.set_max(secs_since_epoch());
}
}
});
}
// postgres metrics/monitoring thread
{
let postgres_con =
postgres_connection(config, metric_con_retries.clone(), metric_con_live.clone())
.await?;
let metric_slot_last_write = metrics_sender.register_u64(
"postgres_slot_last_write_timestamp".into(),
MetricType::Gauge,
);
let metric_account_write_last_write = metrics_sender.register_u64(
"postgres_account_write_last_write_timestamp".into(),
MetricType::Gauge,
);
let metric_account_queue =
metrics_sender.register_u64("account_write_queue".into(), MetricType::Gauge);
let metric_slot_queue =
metrics_sender.register_u64("slot_insert_queue".into(), MetricType::Gauge);
let config = config.clone();
tokio::spawn(async move {
let mut client_opt = None;
loop {
tokio::time::sleep(Duration::from_secs(config.monitoring_update_interval_secs))
.await;
let client = update_postgres_client(&mut client_opt, &postgres_con, &config).await;
let last_update = std::time::SystemTime::now();
let last_slot_write = epoch_secs_to_time(metric_slot_last_write.value());
let last_account_write_write =
epoch_secs_to_time(metric_account_write_last_write.value());
let slot_queue = i64::try_from(metric_slot_queue.value()).unwrap();
let account_write_queue = i64::try_from(metric_account_queue.value()).unwrap();
let query = query!(
"INSERT INTO monitoring
(name, last_update, last_slot_write, last_account_write_write, slot_queue, account_write_queue)
VALUES
($name, $last_update, $last_slot_write, $last_account_write_write, $slot_queue, $account_write_queue)
ON CONFLICT (name) DO UPDATE SET
last_update=$last_update,
last_slot_write=$last_slot_write,
last_account_write_write=$last_account_write_write,
slot_queue=$slot_queue,
account_write_queue=$account_write_queue
",
name = config.monitoring_name,
last_update,
last_slot_write,
last_account_write_write,
slot_queue,
account_write_queue,
);
if let Err(err) = query
.execute(client)
.await
.context("updating monitoring table")
{
warn!("failed to process monitoring update: {:?}", err);
};
}
});
}
Ok((account_write_queue_sender, slot_queue_sender))
}

View File

@@ -1,9 +1,8 @@
use anchor_client::{
solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair},
solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair},
Cluster,
};
use anchor_lang::prelude::Pubkey;
use bytemuck::cast_slice;
use client::{Client, MangoGroupContext};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{
@@ -13,7 +12,6 @@ use futures_util::{
use log::*;
use std::{
collections::{HashMap, HashSet},
convert::identity,
fs::File,
io::Read,
net::SocketAddr,
@@ -31,7 +29,7 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::Deserialize;
use solana_geyser_connector_lib::{
metrics::{MetricType, MetricU64},
FilterConfig, StatusResponse,
FilterConfig, StatusResponse, fill_event_postgres_target, PostgresConfig,
fill_event_filter::FillEventType, orderbook_filter::MarketConfig, PostgresTlsConfig,
};
use solana_geyser_connector_lib::{
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage},
@@ -308,44 +306,67 @@ async fn main() -> anyhow::Result<()> {
.await?,
);
// todo: reload markets at intervals
let perp_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.perp_markets
.iter()
.map(|(_, context)| {
let quote_decimals = match group_context.tokens.get(&context.market.settle_token_index)
{
Some(token) => token.decimals,
None => panic!("token not found for market"), // todo: default to 6 for usdc?
};
(
context.address,
MarketConfig {
name: context.market.name().to_owned(),
bids: context.market.bids,
asks: context.market.asks,
event_queue: context.market.event_queue,
base_decimals: context.market.base_decimals,
quote_decimals,
base_lot_size: context.market.base_lot_size,
quote_lot_size: context.market.quote_lot_size,
},
)
})
.collect();
let spot_market_configs: Vec<(Pubkey, MarketConfig)> = group_context
.serum3_markets
.iter()
.map(|(_, context)| {
let base_decimals = match group_context.tokens.get(&context.market.base_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market"), // todo: default?
};
let quote_decimals = match group_context.tokens.get(&context.market.quote_token_index) {
Some(token) => token.decimals,
None => panic!("token not found for market"), // todo: default to 6 for usdc?
};
(
context.market.serum_market_external,
MarketConfig {
name: context.market.name().to_owned(),
bids: context.bids,
asks: context.asks,
event_queue: context.event_q,
base_decimals,
quote_decimals,
base_lot_size: context.pc_lot_size as i64,
quote_lot_size: context.coin_lot_size as i64,
},
)
})
.collect();
let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context
.perp_markets
.iter()
.map(|(_, context)| (context.address, context.market.event_queue))
.collect();
let serum_market_pks: Vec<Pubkey> = group_context
.serum3_markets
.iter()
.map(|(_, context)| context.market.serum_market_external)
.collect();
let serum_market_ais = client
.rpc_async()
.get_multiple_accounts(serum_market_pks.as_slice())
.await?;
let serum_market_ais: Vec<&Account> = serum_market_ais
.iter()
.filter_map(|maybe_ai| match maybe_ai {
Some(ai) => Some(ai),
None => None,
})
.collect();
let serum_queue_pks: Vec<(Pubkey, Pubkey)> = serum_market_ais
.iter()
.enumerate()
.map(|pair| {
let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes(
&pair.1.data[5..5 + std::mem::size_of::<serum_dex::state::MarketState>()],
);
(
serum_market_pks[pair.0],
Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])),
)
})
.collect();
let spot_queue_pks: Vec<(Pubkey, Pubkey)> = spot_market_configs
.iter()
.map(|x| (x.0, x.1.event_queue))
.collect();
let a: Vec<(String, String)> = group_context
.serum3_markets
.iter()
@@ -368,9 +389,27 @@ async fn main() -> anyhow::Result<()> {
.collect();
let market_pubkey_strings: HashMap<String, String> = [a, b].concat().into_iter().collect();
// TODO: read all this from config
let pgconf = PostgresConfig {
connection_string: "$PG_CONNECTION_STRING".to_owned(),
connection_count: 1,
max_batch_size: 1,
max_queue_size: 50_000,
retry_query_max_count: 10,
retry_query_sleep_secs: 2,
retry_connection_sleep_secs: 10,
fatal_connection_timeout_secs: 120,
allow_invalid_certs: true,
tls: Some(PostgresTlsConfig {
ca_cert_path: "$PG_CA_CERT".to_owned(),
client_key_path: "$PG_CLIENT_KEY".to_owned(),
})
};
let postgres_update_sender = fill_event_postgres_target::init(&pgconf, metrics_tx.clone()).await?;
let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init(
perp_queue_pks.clone(),
serum_queue_pks.clone(),
perp_market_configs.clone(),
spot_market_configs.clone(),
metrics_tx.clone(),
)
.await?;
@@ -389,19 +428,30 @@ async fn main() -> anyhow::Result<()> {
let message = fill_receiver.recv().await.unwrap();
match message {
FillEventFilterMessage::Update(update) => {
debug!("ws update {} {:?} {:?} fill", update.market, update.status, update.event.event_type);
debug!("ws update {} {:?} {:?} fill", update.market_name, update.status, update.event.event_type);
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
for (addr, peer) in peer_copy.iter_mut() {
let json = serde_json::to_string(&update).unwrap();
let json = serde_json::to_string(&update.clone()).unwrap();
// only send updates if the peer is subscribed
if peer.subscriptions.contains(&update.market) {
if peer.subscriptions.contains(&update.market_key) {
let result = peer.sender.send(Message::Text(json)).await;
if result.is_err() {
error!("ws update {} fill could not reach {}", update.market, addr);
error!("ws update {} fill could not reach {}", update.market_name, addr);
}
}
}
// send taker fills to db
let update_c = update.clone();
match update_c.event.event_type {
FillEventType::Perp => {
if !update_c.event.maker {
debug!("{:?}", update_c);
postgres_update_sender.send(update_c).await.unwrap();
}
}
_ => warn!("failed to write spot event to db")
}
}
FillEventFilterMessage::Checkpoint(checkpoint) => {
checkpoints_ref_thread
@@ -461,7 +511,7 @@ async fn main() -> anyhow::Result<()> {
.collect::<String>()
);
let use_geyser = true;
let all_queue_pks = [perp_queue_pks.clone(), serum_queue_pks.clone()].concat();
let all_queue_pks = [perp_queue_pks.clone()].concat();
let relevant_pubkeys = all_queue_pks.iter().map(|m| m.1.to_string()).collect();
let filter_config = FilterConfig {
program_ids: vec![],