diff --git a/lib/src/chain_data.rs b/lib/src/chain_data.rs index ccdd543..f4a3888 100644 --- a/lib/src/chain_data.rs +++ b/lib/src/chain_data.rs @@ -56,15 +56,15 @@ impl ChainData { account_versions_stored: 0, account_bytes_stored: 0, metric_accounts_stored: metrics_sender.register_u64( - "fills_feed_chaindata_accounts_stored".into(), + "chaindata_accounts_stored".into(), MetricType::Gauge, ), metric_account_versions_stored: metrics_sender.register_u64( - "fills_feed_chaindata_account_versions_stored".into(), + "chaindata_account_versions_stored".into(), MetricType::Gauge, ), metric_account_bytes_stored: metrics_sender.register_u64( - "fills_feed_chaindata_account_bytes_stored".into(), + "chaindata_account_bytes_stored".into(), MetricType::Gauge, ), } diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs index f4c7f08..158c618 100644 --- a/service-mango-fills/src/main.rs +++ b/service-mango-fills/src/main.rs @@ -30,7 +30,6 @@ use tokio_tungstenite::tungstenite::{protocol::Message, Error}; use serde::Deserialize; use solana_geyser_connector_lib::{ - fill_event_filter::SerumFillCheckpoint, metrics::{MetricType, MetricU64}, FilterConfig, StatusResponse, }; @@ -40,7 +39,6 @@ use solana_geyser_connector_lib::{ }; type CheckpointMap = Arc>>; -type SerumCheckpointMap = Arc>>; type PeerMap = Arc>>; // jemalloc seems to be better at keeping the memory footprint reasonable over @@ -79,7 +77,6 @@ pub struct Peer { async fn handle_connection_error( checkpoint_map: CheckpointMap, - serum_checkpoint_map: SerumCheckpointMap, peer_map: PeerMap, market_ids: HashMap, raw_stream: TcpStream, @@ -91,7 +88,6 @@ async fn handle_connection_error( let result = handle_connection( checkpoint_map, - serum_checkpoint_map, peer_map.clone(), market_ids, raw_stream, @@ -109,7 +105,6 @@ async fn handle_connection_error( async fn handle_connection( checkpoint_map: CheckpointMap, - serum_checkpoint_map: SerumCheckpointMap, peer_map: PeerMap, market_ids: HashMap, raw_stream: TcpStream, @@ -138,7 +133,6 @@ async fn handle_connection( msg, peer_map.clone(), checkpoint_map.clone(), - serum_checkpoint_map.clone(), market_ids.clone(), ), Message::Ping(_) => { @@ -167,7 +161,6 @@ fn handle_commands( msg: Message, peer_map: PeerMap, checkpoint_map: CheckpointMap, - serum_checkpoint_map: SerumCheckpointMap, market_ids: HashMap, ) -> Ready> { let msg_str = msg.clone().into_text().unwrap(); @@ -209,7 +202,6 @@ fn handle_commands( if subscribed { let checkpoint_map = checkpoint_map.lock().unwrap(); - let serum_checkpoint_map = serum_checkpoint_map.lock().unwrap(); let checkpoint = checkpoint_map.get(&market_id); match checkpoint { Some(checkpoint) => { @@ -219,17 +211,8 @@ fn handle_commands( )) .unwrap(); } - None => match serum_checkpoint_map.get(&market_id) { - Some(checkpoint) => { - peer.sender - .unbounded_send(Message::Text( - serde_json::to_string(&checkpoint).unwrap(), - )) - .unwrap(); - } - None => info!("no checkpoint available on client subscription"), - }, - } + None => info!("no checkpoint available on client subscription"), + }; } } Ok(Command::Unsubscribe(cmd)) => { @@ -393,11 +376,9 @@ async fn main() -> anyhow::Result<()> { .await?; let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new())); - let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new())); let peers = PeerMap::new(Mutex::new(HashMap::new())); let checkpoints_ref_thread = checkpoints.clone(); - let serum_checkpoints_ref_thread = serum_checkpoints.clone(); let peers_ref_thread = peers.clone(); let peers_ref_thread1 = peers.clone(); @@ -408,7 +389,7 @@ async fn main() -> anyhow::Result<()> { let message = fill_receiver.recv().await.unwrap(); match message { FillEventFilterMessage::Update(update) => { - debug!("ws update {} {:?} fill", update.market, update.status); + debug!("ws update {} {:?} {:?} fill", update.market, update.status, update.event.event_type); let mut peer_copy = peers_ref_thread.lock().unwrap().clone(); for (addr, peer) in peer_copy.iter_mut() { let json = serde_json::to_string(&update).unwrap(); @@ -428,27 +409,6 @@ async fn main() -> anyhow::Result<()> { .unwrap() .insert(checkpoint.queue.clone(), checkpoint); } - FillEventFilterMessage::SerumUpdate(update) => { - debug!("ws update {} {:?} serum fill", update.market, update.status); - let mut peers_copy = peers_ref_thread.lock().unwrap().clone(); - for (addr, peer) in peers_copy.iter_mut() { - let json = serde_json::to_string(&update).unwrap(); - - // only send updates if the peer is subscribed - if peer.subscriptions.contains(&update.market) { - let result = peer.sender.send(Message::Text(json)).await; - if result.is_err() { - error!("ws update {} fill could not reach {}", update.market, addr); - } - } - } - } - FillEventFilterMessage::SerumCheckpoint(checkpoint) => { - serum_checkpoints_ref_thread - .lock() - .unwrap() - .insert(checkpoint.queue.clone(), checkpoint); - } } } }); @@ -462,7 +422,6 @@ async fn main() -> anyhow::Result<()> { while let Ok((stream, addr)) = listener.accept().await { tokio::spawn(handle_connection_error( checkpoints.clone(), - serum_checkpoints.clone(), peers.clone(), market_pubkey_strings.clone(), stream,