diff --git a/service-mango-fills/src/main.rs b/service-mango-fills/src/main.rs index f6e6d4c..a41d288 100644 --- a/service-mango-fills/src/main.rs +++ b/service-mango-fills/src/main.rs @@ -1,19 +1,39 @@ -use anchor_client::{Cluster, solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair, account::Account}}; +use anchor_client::{ + solana_sdk::{account::Account, commitment_config::CommitmentConfig, signature::Keypair}, + Cluster, +}; use anchor_lang::prelude::Pubkey; use bytemuck::cast_slice; use client::{Client, MangoGroupContext}; use futures_channel::mpsc::{unbounded, UnboundedSender}; -use futures_util::{pin_mut, SinkExt, StreamExt, future::{self, Ready}, TryStreamExt}; +use futures_util::{ + future::{self, Ready}, + pin_mut, SinkExt, StreamExt, TryStreamExt, +}; use log::*; -use std::{collections::{HashMap, HashSet}, fs::File, io::Read, net::SocketAddr, sync::Arc, sync::Mutex, time::Duration, convert::identity, str::FromStr}; +use std::{ + collections::{HashMap, HashSet}, + convert::identity, + fs::File, + io::Read, + net::SocketAddr, + str::FromStr, + sync::Arc, + sync::Mutex, + time::Duration, +}; use tokio::{ net::{TcpListener, TcpStream}, - pin, + pin, time, }; use tokio_tungstenite::tungstenite::{protocol::Message, Error}; use serde::Deserialize; -use solana_geyser_connector_lib::{metrics::{MetricType, MetricU64}, FilterConfig, fill_event_filter::SerumFillCheckpoint, StatusResponse}; +use solana_geyser_connector_lib::{ + fill_event_filter::SerumFillCheckpoint, + metrics::{MetricType, MetricU64}, + FilterConfig, StatusResponse, +}; use solana_geyser_connector_lib::{ fill_event_filter::{self, FillCheckpoint, FillEventFilterMessage}, grpc_plugin_source, metrics, websocket_source, MetricsConfig, SourceConfig, @@ -64,7 +84,15 @@ async fn handle_connection_error( ) { metrics_opened_connections.clone().increment(); - let result = handle_connection(checkpoint_map, serum_checkpoint_map, peer_map.clone(), market_ids, raw_stream, addr).await; + let result = handle_connection( + checkpoint_map, + serum_checkpoint_map, + peer_map.clone(), + market_ids, + raw_stream, + addr, + ) + .await; if result.is_err() { error!("connection {} error {}", addr, result.unwrap_err()); }; @@ -84,6 +112,7 @@ async fn handle_connection( ) -> Result<(), Error> { info!("ws connected: {}", addr); let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?; + let (ws_tx, ws_rx) = ws_stream.split(); // 1: publish channel in peer map @@ -99,14 +128,26 @@ async fn handle_connection( } let receive_commands = ws_rx.try_for_each(|msg| { - handle_commands( - addr, - msg, - peer_map.clone(), - checkpoint_map.clone(), - serum_checkpoint_map.clone(), - market_ids.clone(), - ) + match msg { + Message::Text(_) => { + handle_commands( + addr, + msg, + peer_map.clone(), + checkpoint_map.clone(), + serum_checkpoint_map.clone(), + market_ids.clone(), + ) + }, + Message::Ping(_) => { + let peers = peer_map.clone(); + let mut peers_lock = peers.lock().unwrap(); + let peer = peers_lock.get_mut(&addr).expect("peer should be in map"); + peer.sender.unbounded_send(Message::Pong(Vec::new())).unwrap(); + future::ready(Ok(())) + } + _ => future::ready(Ok(())), + } }); let forward_updates = chan_rx.map(Ok).forward(ws_tx); @@ -164,7 +205,6 @@ fn handle_commands( .unwrap(); if subscribed { - // todo: this is janky af let checkpoint_map = checkpoint_map.lock().unwrap(); let serum_checkpoint_map = serum_checkpoint_map.lock().unwrap(); let checkpoint = checkpoint_map.get(&market_id); @@ -184,8 +224,8 @@ fn handle_commands( )) .unwrap(); } - None => info!("no checkpoint available on client subscription"), // todo: what to do here? - } + None => info!("no checkpoint available on client subscription"), + }, } } } @@ -274,10 +314,13 @@ async fn main() -> anyhow::Result<()> { &Keypair::new(), Some(rpc_timeout), ); - let group_context = Arc::new(MangoGroupContext::new_from_rpc( - &client.rpc_async(), - Pubkey::from_str(&config.mango_group).unwrap(), - ).await?); + let group_context = Arc::new( + MangoGroupContext::new_from_rpc( + &client.rpc_async(), + Pubkey::from_str(&config.mango_group).unwrap(), + ) + .await?, + ); let perp_queue_pks: Vec<(Pubkey, Pubkey)> = group_context .perp_markets @@ -293,7 +336,8 @@ async fn main() -> anyhow::Result<()> { let serum_market_ais = client .rpc_async() - .get_multiple_accounts(serum_market_pks.as_slice()).await?; + .get_multiple_accounts(serum_market_pks.as_slice()) + .await?; let serum_market_ais: Vec<&Account> = serum_market_ais .iter() .filter_map(|maybe_ai| match maybe_ai { @@ -309,25 +353,41 @@ async fn main() -> anyhow::Result<()> { let market_state: serum_dex::state::MarketState = *bytemuck::from_bytes( &pair.1.data[5..5 + std::mem::size_of::()], ); - (serum_market_pks[pair.0], Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_]))) + ( + serum_market_pks[pair.0], + Pubkey::new(cast_slice(&identity(market_state.event_q) as &[_])), + ) }) .collect(); let a: Vec<(String, String)> = group_context .serum3_markets .iter() - .map(|(_, context)| (context.market.serum_market_external.to_string(), context.market.name().to_owned())).collect(); + .map(|(_, context)| { + ( + context.market.serum_market_external.to_string(), + context.market.name().to_owned(), + ) + }) + .collect(); let b: Vec<(String, String)> = group_context .perp_markets .iter() - .map(|(_, context)| (context.address.to_string(), context.market.name().to_owned())).collect(); - let market_pubkey_strings: HashMap = [a, b] - .concat() - .into_iter() + .map(|(_, context)| { + ( + context.address.to_string(), + context.market.name().to_owned(), + ) + }) .collect(); + let market_pubkey_strings: HashMap = [a, b].concat().into_iter().collect(); - let (account_write_queue_sender, slot_queue_sender, fill_receiver) = - fill_event_filter::init(perp_queue_pks.clone(), serum_queue_pks.clone(), metrics_tx.clone()).await?; + let (account_write_queue_sender, slot_queue_sender, fill_receiver) = fill_event_filter::init( + perp_queue_pks.clone(), + serum_queue_pks.clone(), + metrics_tx.clone(), + ) + .await?; let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new())); let serum_checkpoints = SerumCheckpointMap::new(Mutex::new(HashMap::new())); @@ -336,6 +396,7 @@ async fn main() -> anyhow::Result<()> { let checkpoints_ref_thread = checkpoints.clone(); let serum_checkpoints_ref_thread = serum_checkpoints.clone(); let peers_ref_thread = peers.clone(); + let peers_ref_thread1 = peers.clone(); // filleventfilter websocket sink tokio::spawn(async move { @@ -348,15 +409,12 @@ async fn main() -> anyhow::Result<()> { let mut peer_copy = peers_ref_thread.lock().unwrap().clone(); for (addr, peer) in peer_copy.iter_mut() { let json = serde_json::to_string(&update).unwrap(); - + // only send updates if the peer is subscribed if peer.subscriptions.contains(&update.market) { let result = peer.sender.send(Message::Text(json)).await; if result.is_err() { - error!( - "ws update {} fill could not reach {}", - update.market, addr - ); + error!("ws update {} fill could not reach {}", update.market, addr); } } } @@ -369,18 +427,15 @@ async fn main() -> anyhow::Result<()> { } FillEventFilterMessage::SerumUpdate(update) => { debug!("ws update {} {:?} serum fill", update.market, update.status); - let mut peer_copy = peers_ref_thread.lock().unwrap().clone(); - for (addr, peer) in peer_copy.iter_mut() { + let mut peers_copy = peers_ref_thread.lock().unwrap().clone(); + for (addr, peer) in peers_copy.iter_mut() { let json = serde_json::to_string(&update).unwrap(); - + // only send updates if the peer is subscribed if peer.subscriptions.contains(&update.market) { let result = peer.sender.send(Message::Text(json)).await; if result.is_err() { - error!( - "ws update {} fill could not reach {}", - update.market, addr - ); + error!("ws update {} fill could not reach {}", update.market, addr); } } } @@ -398,6 +453,7 @@ async fn main() -> anyhow::Result<()> { info!("ws listen: {}", config.bind_ws_addr); let try_socket = TcpListener::bind(&config.bind_ws_addr).await; let listener = try_socket.expect("Failed to bind"); + { tokio::spawn(async move { // Let's spawn the handling of each connection in a separate task. while let Ok((stream, addr)) = listener.accept().await { @@ -413,7 +469,26 @@ async fn main() -> anyhow::Result<()> { )); } }); + } + // keepalive + { + tokio::spawn(async move { + let mut write_interval = time::interval(time::Duration::from_secs(30)); + + loop { + write_interval.tick().await; + let peers_copy = peers_ref_thread1.lock().unwrap().clone(); + for (addr, peer) in peers_copy.iter() { + let pl = Vec::new(); + let result = peer.clone().sender.send(Message::Ping(pl)).await; + if result.is_err() { + error!("ws ping could not reach {}", addr); + } + } + } + }); + } info!( "rpc connect: {}", config