Merge pull request #1 from blockworks-foundation/max/debug
improved error handling for websocket
This commit is contained in:
commit
43b5cf5780
|
@ -6,7 +6,7 @@ use tokio::{
|
|||
net::{TcpListener, TcpStream},
|
||||
pin,
|
||||
};
|
||||
use tokio_tungstenite::tungstenite::protocol::Message;
|
||||
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
|
||||
|
||||
use serde::Deserialize;
|
||||
use solana_geyser_connector_lib::{
|
||||
|
@ -17,16 +17,27 @@ use solana_geyser_connector_lib::{
|
|||
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
|
||||
type PeerMap = Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>;
|
||||
|
||||
async fn handle_connection(
|
||||
async fn handle_connection_error(
|
||||
checkpoint_map: CheckpointMap,
|
||||
peer_map: PeerMap,
|
||||
raw_stream: TcpStream,
|
||||
addr: SocketAddr,
|
||||
) {
|
||||
let result = handle_connection(checkpoint_map, peer_map.clone(), raw_stream, addr).await;
|
||||
if result.is_err() {
|
||||
error!("connection {} error {}", addr, result.unwrap_err());
|
||||
};
|
||||
peer_map.lock().unwrap().remove(&addr);
|
||||
}
|
||||
|
||||
async fn handle_connection(
|
||||
checkpoint_map: CheckpointMap,
|
||||
peer_map: PeerMap,
|
||||
raw_stream: TcpStream,
|
||||
addr: SocketAddr,
|
||||
) -> Result<(), Error> {
|
||||
info!("ws connected: {}", addr);
|
||||
let ws_stream = tokio_tungstenite::accept_async(raw_stream)
|
||||
.await
|
||||
.expect("Error during the ws handshake occurred");
|
||||
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
|
||||
let (mut ws_tx, _ws_rx) = ws_stream.split();
|
||||
|
||||
// 1: publish channel in peer map
|
||||
|
@ -42,22 +53,19 @@ async fn handle_connection(
|
|||
for (_, ckpt) in checkpoint_map_copy.iter() {
|
||||
ws_tx
|
||||
.feed(Message::Text(serde_json::to_string(ckpt).unwrap()))
|
||||
.await
|
||||
.unwrap();
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
let result_ckpt = ws_tx.flush().await;
|
||||
info!("ws ckpt sent: {} err: {:?}", addr, result_ckpt);
|
||||
info!("ws ckpt sent: {}", addr);
|
||||
ws_tx.flush().await?;
|
||||
|
||||
// 3: forward all events from channel to peer socket
|
||||
let forward_updates = chan_rx.map(Ok).forward(ws_tx);
|
||||
pin_mut!(forward_updates);
|
||||
let result_forward = forward_updates.await;
|
||||
forward_updates.await?;
|
||||
|
||||
info!("ws disconnected: {} err: {:?}", &addr, result_forward);
|
||||
peer_map.lock().unwrap().remove(&addr);
|
||||
result_ckpt.unwrap();
|
||||
result_forward.unwrap();
|
||||
info!("ws disconnected: {}", &addr);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize)]
|
||||
|
@ -102,16 +110,18 @@ async fn main() -> anyhow::Result<()> {
|
|||
let message = fill_receiver.recv().await.unwrap();
|
||||
match message {
|
||||
FillEventFilterMessage::Update(update) => {
|
||||
info!("ws update {} {:?} fill", update.market, update.status);
|
||||
|
||||
debug!("ws update {} {:?} fill", update.market, update.status);
|
||||
let mut peer_copy = peers_ref_thread.lock().unwrap().clone();
|
||||
|
||||
for (k, v) in peer_copy.iter_mut() {
|
||||
debug!(" > {}", k);
|
||||
|
||||
trace!(" > {}", k);
|
||||
let json = serde_json::to_string(&update);
|
||||
|
||||
v.send(Message::Text(json.unwrap())).await.unwrap()
|
||||
let result = v.send(Message::Text(json.unwrap())).await;
|
||||
if result.is_err() {
|
||||
error!(
|
||||
"ws update {} {:?} fill could not reach {}",
|
||||
update.market, update.status, k
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
FillEventFilterMessage::Checkpoint(checkpoint) => {
|
||||
|
@ -130,7 +140,7 @@ async fn main() -> anyhow::Result<()> {
|
|||
tokio::spawn(async move {
|
||||
// Let's spawn the handling of each connection in a separate task.
|
||||
while let Ok((stream, addr)) = listener.accept().await {
|
||||
tokio::spawn(handle_connection(
|
||||
tokio::spawn(handle_connection_error(
|
||||
checkpoints.clone(),
|
||||
peers.clone(),
|
||||
stream,
|
||||
|
|
Loading…
Reference in New Issue