avoid panics in websocket handler

This commit is contained in:
Maximilian Schneider 2022-09-10 22:29:17 +02:00
parent 1a2fcd2002
commit 2b50d1bc3d
1 changed files with 27 additions and 18 deletions

View File

@ -1,12 +1,15 @@
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{pin_mut, SinkExt, StreamExt};
use log::*;
use std::{collections::HashMap, fs::File, io::Read, net::SocketAddr, sync::Arc, sync::Mutex};
use std::{
collections::HashMap, fs::File, io::Read, net::SocketAddr, os::macos::raw, sync::Arc,
sync::Mutex,
};
use tokio::{
net::{TcpListener, TcpStream},
pin,
};
use tokio_tungstenite::tungstenite::protocol::Message;
use tokio_tungstenite::tungstenite::{protocol::Message, Error};
use serde::Deserialize;
use solana_geyser_connector_lib::{
@ -17,16 +20,27 @@ use solana_geyser_connector_lib::{
type CheckpointMap = Arc<Mutex<HashMap<String, FillCheckpoint>>>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>;
async fn handle_connection(
async fn handle_connection_error(
checkpoint_map: CheckpointMap,
peer_map: PeerMap,
raw_stream: TcpStream,
addr: SocketAddr,
) {
let result = handle_connection(checkpoint_map, peer_map.clone(), raw_stream, addr).await;
if result.is_err() {
error!("connection {} error {}", addr, result.unwrap_err());
};
peer_map.lock().unwrap().remove(&addr);
}
async fn handle_connection(
checkpoint_map: CheckpointMap,
peer_map: PeerMap,
raw_stream: TcpStream,
addr: SocketAddr,
) -> Result<(), Error> {
info!("ws connected: {}", addr);
let ws_stream = tokio_tungstenite::accept_async(raw_stream)
.await
.expect("Error during the ws handshake occurred");
let ws_stream = tokio_tungstenite::accept_async(raw_stream).await?;
let (mut ws_tx, _ws_rx) = ws_stream.split();
// 1: publish channel in peer map
@ -42,24 +56,19 @@ async fn handle_connection(
for (_, ckpt) in checkpoint_map_copy.iter() {
ws_tx
.feed(Message::Text(serde_json::to_string(ckpt).unwrap()))
.await
.unwrap();
.await?;
}
}
let result_ckpt = ws_tx.flush().await;
info!("ws ckpt sent: {} err: {:?}", addr, result_ckpt);
info!("ws ckpt sent: {}", addr);
ws_tx.flush().await?;
// 3: forward all events from channel to peer socket
let forward_updates = chan_rx.map(Ok).forward(ws_tx);
pin_mut!(forward_updates);
let result_forward = forward_updates.await;
forward_updates.await?;
info!("ws disconnected: {} err: {:?}", &addr, result_forward);
peer_map.lock().unwrap().remove(&addr);
result_ckpt.unwrap();
// disconnecting the socket can result in an error in result_forward.
// unwrapping it can cause a panic in tokyo, ignore error for now
// result_forward.unwrap();
info!("ws disconnected: {}", &addr);
Ok(())
}
#[derive(Clone, Debug, Deserialize)]
@ -132,7 +141,7 @@ async fn main() -> anyhow::Result<()> {
tokio::spawn(async move {
// Let's spawn the handling of each connection in a separate task.
while let Ok((stream, addr)) = listener.accept().await {
tokio::spawn(handle_connection(
tokio::spawn(handle_connection_error(
checkpoints.clone(),
peers.clone(),
stream,