Aggregate event metrics (#3)

* Aggregate event metrics
* Misc
* Misc
parent cb95574b8a
commit 1bebc2f6bc
@@ -1,3 +1,6 @@
 /target
 **/*.pem
 **/config.toml
+.DS_Store
+.idea/
+*.pem
@@ -1,6 +1,6 @@
 use crate::{
     chain_data::{AccountData, ChainData, SlotData},
-    AccountWrite, SlotUpdate,
+    AccountWrite, SlotUpdate, metrics
 };
 use log::*;
 use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
@@ -18,6 +18,7 @@ use std::{
 
 use arrayref::array_ref;
 use mango::queue::{AnyEvent, EventQueueHeader, EventType, FillEvent};
+use crate::metrics::MetricU64;
 
 #[derive(Clone, Debug, Deserialize)]
 pub struct MarketConfig {
@@ -109,6 +110,9 @@ fn publish_changes(
     old_seq_num: usize,
     old_events: &EventQueueEvents,
     fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
+    metric_events_new: &mut MetricU64,
+    metric_events_change: &mut MetricU64,
+    metric_events_drop: &mut MetricU64
 ) {
     // seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
     let start_seq_num = max(old_seq_num, header.seq_num) - QUEUE_LEN;
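The three new parameters are mutable counter handles, one per outcome (new, changed, dropped events), so aggregation happens right where publish_changes detects each case. metrics.rs is not part of this diff, so the following is only a sketch of the counter surface the code above relies on; CounterU64 and value() are illustrative names, not the project's API:

// Illustrative stand-in for the MetricU64 handle used above (assumption: the
// real type lives in metrics.rs, which is not shown in this diff).
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

#[derive(Clone, Default)]
pub struct CounterU64(Arc<AtomicU64>);

impl CounterU64 {
    /// Bump the counter by one; publish_changes only needs this method.
    pub fn increment(&mut self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }

    /// Read the aggregated total, e.g. from a reporting task.
    pub fn value(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}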
@@ -123,11 +127,14 @@ fn publish_changes(
         // 3) all other events are matching the old event queue
         // the order of these checks is important so they are exhaustive
         if seq_num >= old_seq_num {
-            info!(
+            debug!(
                 "found new event {} idx {} type {}",
                 mkt.name, idx, events[idx].event_type as u32
             );
 
+            metric_events_new.increment();
+
+
             // new fills are published and recorded in checkpoint
             if events[idx].event_type == EventType::Fill as u8 {
                 let fill: FillEvent = bytemuck::cast(events[idx]);
@@ -146,11 +153,13 @@ fn publish_changes(
         } else if old_events[idx].event_type != events[idx].event_type
             || old_events[idx].padding != events[idx].padding
         {
-            info!(
+            debug!(
                 "found changed event {} idx {} seq_num {} header seq num {} old seq num {}",
                 mkt.name, idx, seq_num, header.seq_num, old_seq_num
             );
 
+            metric_events_change.increment();
+
             // first revoke old event if a fill
             if old_events[idx].event_type == EventType::Fill as u8 {
                 let fill: FillEvent = bytemuck::cast(old_events[idx]);
@@ -193,11 +202,14 @@ fn publish_changes(
     // in case queue size shrunk due to a fork we need revoke all previous fills
     for seq_num in header.seq_num..old_seq_num {
         let idx = seq_num % QUEUE_LEN;
-        info!(
+
+        debug!(
             "found dropped event {} idx {} seq_num {} header seq num {} old seq num {}",
             mkt.name, idx, seq_num, header.seq_num, old_seq_num
         );
 
+        metric_events_drop.increment();
+
         if old_events[idx].event_type == EventType::Fill as u8 {
             let fill: FillEvent = bytemuck::cast(old_events[idx]);
             fill_update_sender
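For context, QUEUE_LEN is the ring-buffer size of the event queue; its value is not visible in this diff. A self-contained sketch of the revoke loop's index arithmetic, with an assumed QUEUE_LEN of 256:

// Assumed queue size for illustration only; the real constant is defined elsewhere.
const QUEUE_LEN: usize = 256;

// If a fork shrank the queue (header.seq_num < old_seq_num), every event in
// header.seq_num..old_seq_num was already published from the old checkpoint and
// has to be revoked; this returns the ring indices the loop above would touch.
fn dropped_indices(header_seq_num: usize, old_seq_num: usize) -> Vec<usize> {
    (header_seq_num..old_seq_num).map(|s| s % QUEUE_LEN).collect()
}

fn main() {
    // Old checkpoint had seen up to seq_num 515, the forked queue only up to 512.
    assert_eq!(dropped_indices(512, 515), vec![0, 1, 2]);
}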
@@ -226,11 +238,18 @@
 
 pub async fn init(
     markets: Vec<MarketConfig>,
+    metrics_sender: metrics::Metrics,
 ) -> anyhow::Result<(
     async_channel::Sender<AccountWrite>,
     async_channel::Sender<SlotUpdate>,
     async_channel::Receiver<FillEventFilterMessage>,
 )> {
+    let metrics_sender = metrics_sender.clone();
+
+    let mut metric_events_new = metrics_sender.register_u64("fills_feed_events_new".into());
+    let mut metric_events_change = metrics_sender.register_u64("fills_feed_events_change".into());
+    let mut metrics_events_drop = metrics_sender.register_u64("fills_feed_events_drop".into());
+
     // The actual message may want to also contain a retry count, if it self-reinserts on failure?
     let (account_write_queue_sender, account_write_queue_receiver) =
         async_channel::unbounded::<AccountWrite>();
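register_u64 hands out a named counter from the shared metrics::Metrics handle passed into init. Since metrics.rs is not included in this diff, the following is only a minimal sketch of what such a registry could look like; Registry, U64Handle, and snapshot are hypothetical names and the real implementation may work differently (e.g. by sending updates over a channel):

// Hypothetical registry sketch with a register_u64(name) entry point similar to
// the one used above; not the project's actual Metrics type.
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
pub struct Registry {
    counters: Arc<Mutex<HashMap<String, Arc<AtomicU64>>>>,
}

pub struct U64Handle(Arc<AtomicU64>);

impl U64Handle {
    pub fn increment(&mut self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
}

impl Registry {
    /// Return a handle to the counter with this name, creating it on first use.
    pub fn register_u64(&self, name: String) -> U64Handle {
        let mut counters = self.counters.lock().unwrap();
        let cell = counters.entry(name).or_default();
        U64Handle(cell.clone())
    }

    /// Snapshot all counters, e.g. for a periodic log line or scrape endpoint.
    pub fn snapshot(&self) -> Vec<(String, u64)> {
        self.counters
            .lock()
            .unwrap()
            .iter()
            .map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
            .collect()
    }
}

With a shape like this, each task can clone the registry cheaply and bump its own handles without taking a lock on the hot path, which matches how the counters are registered once here and then incremented inside publish_changes.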
@@ -326,6 +345,9 @@ pub async fn init(
                         *old_seq_num,
                         old_events,
                         &fill_update_sender,
+                        &mut metric_events_new,
+                        &mut metric_events_change,
+                        &mut metrics_events_drop
                     ),
                     _ => info!("events_cache could not find {}", mkt.name),
                 },

@@ -95,7 +95,10 @@ async fn main() -> anyhow::Result<()> {
     let metrics_tx = metrics::start();
 
     let (account_write_queue_sender, slot_queue_sender, fill_receiver) =
-        fill_event_filter::init(config.markets.clone()).await?;
+        fill_event_filter::init(
+            config.markets.clone(),
+            metrics_tx.clone()
+        ).await?;
 
     let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
     let peers = PeerMap::new(Mutex::new(HashMap::new()));
@@ -164,7 +167,7 @@ async fn main() -> anyhow::Result<()> {
             &config.source,
             account_write_queue_sender,
             slot_queue_sender,
-            metrics_tx,
+            metrics_tx.clone(),
         )
         .await;
     } else {