cargo fmt

This commit is contained in:
Riordan Panayides 2022-10-05 23:53:20 +01:00
parent dbf28b74be
commit d14b4081aa
2 changed files with 11 additions and 13 deletions

View File

@ -1,6 +1,6 @@
use crate::{
chain_data::{AccountData, ChainData, SlotData},
AccountWrite, SlotUpdate, metrics
metrics, AccountWrite, SlotUpdate,
};
use log::*;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
@ -16,9 +16,9 @@ use std::{
str::FromStr,
};
use crate::metrics::MetricU64;
use arrayref::array_ref;
use mango::queue::{AnyEvent, EventQueueHeader, EventType, FillEvent};
use crate::metrics::MetricU64;
#[derive(Clone, Debug, Deserialize)]
pub struct MarketConfig {
@ -112,7 +112,7 @@ fn publish_changes(
fill_update_sender: &async_channel::Sender<FillEventFilterMessage>,
metric_events_new: &mut MetricU64,
metric_events_change: &mut MetricU64,
metric_events_drop: &mut MetricU64
metric_events_drop: &mut MetricU64,
) {
// seq_num = N means that events (N-QUEUE_LEN) until N-1 are available
let start_seq_num = max(old_seq_num, header.seq_num) - QUEUE_LEN;
@ -134,7 +134,6 @@ fn publish_changes(
metric_events_new.increment();
// new fills are published and recorded in checkpoint
if events[idx].event_type == EventType::Fill as u8 {
let fill: FillEvent = bytemuck::cast(events[idx]);
@ -347,7 +346,7 @@ pub async fn init(
&fill_update_sender,
&mut metric_events_new,
&mut metric_events_change,
&mut metrics_events_drop
&mut metrics_events_drop,
),
_ => info!("events_cache could not find {}", mkt.name),
},

View File

@ -9,11 +9,11 @@ use tokio::{
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::Deserialize;
use solana_geyser_connector_lib::metrics::MetricU64;
use solana_geyser_connector_lib::{
fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage, MarketConfig},
grpc_plugin_source, metrics, websocket_source, SourceConfig,
};
use solana_geyser_connector_lib::metrics::MetricU64;
use crate::metrics::Metrics;
use warp::{Filter, Rejection, Reply};
@ -26,7 +26,7 @@ async fn handle_connection_error(
raw_stream: TcpStream,
addr: SocketAddr,
metrics_opened_connections: MetricU64,
metrics_closed_connections: MetricU64
metrics_closed_connections: MetricU64,
) {
metrics_opened_connections.clone().increment();
@ -134,15 +134,14 @@ async fn main() -> anyhow::Result<()> {
warp::serve(metrics_route).run(([0, 0, 0, 0], 9091)).await;
});
let metrics_opened_connections = metrics_tx.register_u64("fills_feed_opened_connections".into());
let metrics_opened_connections =
metrics_tx.register_u64("fills_feed_opened_connections".into());
let metrics_closed_connections = metrics_tx.register_u64("fills_feed_closed_connections".into());
let metrics_closed_connections =
metrics_tx.register_u64("fills_feed_closed_connections".into());
let (account_write_queue_sender, slot_queue_sender, fill_receiver) =
fill_event_filter::init(
config.markets.clone(),
metrics_tx.clone()
).await?;
fill_event_filter::init(config.markets.clone(), metrics_tx.clone()).await?;
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
let peers = PeerMap::new(Mutex::new(HashMap::new()));