Fix orderbook exit handling, tidy up scope

This commit is contained in:
Riordan Panayides 2023-05-07 12:50:33 +01:00
parent e5f91c282f
commit f2688876b0
2 changed files with 102 additions and 68 deletions

View File

@ -19,8 +19,11 @@ use std::{
io::Read,
net::SocketAddr,
str::FromStr,
sync::Arc,
sync::{atomic::AtomicBool, Mutex},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Mutex
},
time::Duration,
};
use tokio::{
@ -258,15 +261,15 @@ fn handle_commands(
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args: Vec<String> = std::env::args().collect();
solana_logger::setup_with_default("info");
let exit: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
// load config
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
eprintln!("Please enter a config file path argument");
return Ok(());
}
let config: Config = {
let mut file = File::open(&args[1])?;
let mut contents = String::new();
@ -274,19 +277,14 @@ async fn main() -> anyhow::Result<()> {
toml::from_str(&contents).unwrap()
};
solana_logger::setup_with_default("info");
// setup metrics
let metrics_tx = metrics::start(config.metrics, "orderbook".into());
let metrics_opened_connections =
metrics_tx.register_u64("orderbook_opened_connections".into(), MetricType::Counter);
let metrics_closed_connections =
metrics_tx.register_u64("orderbook_closed_connections".into(), MetricType::Counter);
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
let peers = PeerMap::new(Mutex::new(HashMap::new()));
// load mango group and markets from rpc
let rpc_url = config.rpc_http_url;
let ws_url = rpc_url.replace("https", "wss");
let rpc_timeout = Duration::from_secs(10);
@ -378,72 +376,99 @@ async fn main() -> anyhow::Result<()> {
market_configs.clone(),
serum_market_configs.clone(),
metrics_tx.clone(),
exit.clone()
)
.await?;
let checkpoints_ref_thread = checkpoints.clone();
let peers_ref_thread = peers.clone();
let peers_ref_thread1 = peers.clone();
let checkpoints = CheckpointMap::new(Mutex::new(HashMap::new()));
let peers = PeerMap::new(Mutex::new(HashMap::new()));
tokio::spawn(async move {
pin!(orderbook_receiver);
loop {
let message: OrderbookFilterMessage = orderbook_receiver.recv().await.unwrap();
match message {
OrderbookFilterMessage::Update(update) => {
debug!("ws update {} {:?}", update.market, update.side);
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
for (addr, peer) in peer_copy.iter_mut() {
let json = serde_json::to_string(&update).unwrap();
// orderbook receiver
{
let checkpoints = checkpoints.clone();
let peers = peers.clone();
let exit = exit.clone();
tokio::spawn(async move {
pin!(orderbook_receiver);
loop {
if exit.load(Ordering::Relaxed) {
warn!("shutting down orderbook receiver...");
break;
}
// only send updates if the peer is subscribed
if peer.subscriptions.contains(&update.market) {
let result = peer.sender.send(Message::Text(json)).await;
if result.is_err() {
error!(
"ws update {} {:?} fill could not reach {}",
update.market, update.side, addr
);
let message: OrderbookFilterMessage = orderbook_receiver.recv().await.unwrap();
match message {
OrderbookFilterMessage::Update(update) => {
debug!("ws update {} {:?}", update.market, update.side);
let mut peer_copy = peers.lock().unwrap().clone();
for (addr, peer) in peer_copy.iter_mut() {
let json = serde_json::to_string(&update).unwrap();
// only send updates if the peer is subscribed
if peer.subscriptions.contains(&update.market) {
let result = peer.sender.send(Message::Text(json)).await;
if result.is_err() {
error!(
"ws update {} {:?} could not reach {}",
update.market, update.side, addr
);
}
}
}
}
}
OrderbookFilterMessage::Checkpoint(checkpoint) => {
checkpoints_ref_thread
.lock()
.unwrap()
.insert(checkpoint.market.clone(), checkpoint);
OrderbookFilterMessage::Checkpoint(checkpoint) => {
checkpoints
.lock()
.unwrap()
.insert(checkpoint.market.clone(), checkpoint);
}
}
}
}
});
});
}
info!("ws listen: {}", config.bind_ws_addr);
let try_socket = TcpListener::bind(&config.bind_ws_addr).await;
let listener = try_socket.expect("Failed to bind");
tokio::spawn(async move {
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection_error(
checkpoints.clone(),
peers.clone(),
market_pubkey_strings.clone(),
stream,
addr,
metrics_opened_connections.clone(),
metrics_closed_connections.clone(),
));
}
});
// websocket server
{
info!("ws listen: {}", config.bind_ws_addr);
let try_socket = TcpListener::bind(&config.bind_ws_addr).await;
let listener = try_socket.expect("Failed to bind");
let exit = exit.clone();
let peers = peers.clone();
tokio::spawn(async move {
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
if exit.load(Ordering::Relaxed) {
warn!("shutting down websocket server...");
break;
}
tokio::spawn(handle_connection_error(
checkpoints.clone(),
peers.clone(),
market_pubkey_strings.clone(),
stream,
addr,
metrics_opened_connections.clone(),
metrics_closed_connections.clone(),
));
}
});
}
// keepalive
{
let exit = exit.clone();
let peers = peers.clone();
tokio::spawn(async move {
let mut write_interval = time::interval(time::Duration::from_secs(30));
loop {
if exit.load(Ordering::Relaxed) {
warn!("shutting down keepalive...");
break;
}
write_interval.tick().await;
let peers_copy = peers_ref_thread1.lock().unwrap().clone();
let peers_copy = peers.lock().unwrap().clone();
for (addr, peer) in peers_copy.iter() {
let pl = Vec::new();
let result = peer.clone().sender.send(Message::Ping(pl)).await;
@ -455,15 +480,15 @@ async fn main() -> anyhow::Result<()> {
});
}
// // handle sigint
// {
// let exit = exit.clone();
// tokio::spawn(async move {
// tokio::signal::ctrl_c().await.unwrap();
// info!("Received SIGINT, shutting down...");
// exit.store(true, Ordering::Relaxed);
// });
// }
// handle sigint
{
let exit = exit.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.unwrap();
info!("Received SIGINT, shutting down...");
exit.store(true, Ordering::Relaxed);
});
}
info!(
"rpc connect: {}",

View File

@ -33,6 +33,10 @@ use std::{
collections::{HashMap, HashSet},
mem::size_of,
time::{SystemTime, UNIX_EPOCH},
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
struct KeyedSharedDataAccountReader {
@ -154,6 +158,7 @@ pub async fn init(
market_configs: Vec<(Pubkey, MarketConfig)>,
serum_market_configs: Vec<(Pubkey, MarketConfig)>,
metrics_sender: Metrics,
exit: Arc<AtomicBool>
) -> anyhow::Result<(
async_channel::Sender<AccountWrite>,
async_channel::Sender<SlotUpdate>,
@ -192,6 +197,10 @@ pub async fn init(
// update handling thread, reads both slots and account updates
tokio::spawn(async move {
loop {
if exit.load(Ordering::Relaxed) {
warn!("shutting down orderbook_filter...");
break;
}
tokio::select! {
Ok(account_write) = account_write_queue_receiver.recv() => {
if !relevant_pubkeys.contains(&account_write.pubkey) {