fix(hermes): send close message on WS closure

Some WS clients were receiving "Abnormal Connection Closure" errors.
This commit fixes the issue by sending specific close message upon
receiving close message from client. This commit also refactors the
ws.rs code by making it simpler and using tokio::time::Interval
instead of a manually implemented interval. Lastly, it updates the
axum package to include newer patches.
This commit is contained in:
Ali Behjati 2023-08-08 16:02:15 +02:00
parent f1eeb94210
commit 04b31f17c5
3 changed files with 44 additions and 43 deletions

18
hermes/Cargo.lock generated
View File

@ -404,9 +404,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.19"
version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6a1de45611fdb535bfde7b7de4fd54f4fd2b17b1737c0a59b69bf9b92074b8c"
checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
dependencies = [
"async-trait",
"axum-core",
@ -432,7 +432,7 @@ dependencies = [
"sha1",
"sync_wrapper",
"tokio",
"tokio-tungstenite 0.19.0",
"tokio-tungstenite 0.20.0",
"tower",
"tower-layer",
"tower-service",
@ -1764,7 +1764,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
[[package]]
name = "hermes"
version = "0.1.7"
version = "0.1.8"
dependencies = [
"anyhow",
"axum",
@ -5900,14 +5900,14 @@ dependencies = [
[[package]]
name = "tokio-tungstenite"
version = "0.19.0"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec509ac96e9a0c43427c74f003127d953a265737636129424288d27cb5c4b12c"
checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.19.0",
"tungstenite 0.20.0",
]
[[package]]
@ -6102,9 +6102,9 @@ dependencies = [
[[package]]
name = "tungstenite"
version = "0.19.0"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15fba1a6d6bb030745759a9a2a588bfe8490fc8b4751a277db3a0be1c9ebbf67"
checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649"
dependencies = [
"byteorder",
"bytes",

View File

@ -1,12 +1,12 @@
[package]
name = "hermes"
version = "0.1.7"
version = "0.1.8"
edition = "2021"
[dependencies]
anyhow = { version = "1.0.69" }
axum = { version = "0.6.9", features = ["json", "ws", "macros"] }
axum-macros = { version = "0.3.4" }
axum = { version = "0.6.20", features = ["json", "ws", "macros"] }
axum-macros = { version = "0.3.8" }
base64 = { version = "0.21.0" }
borsh = { version = "0.10.3" }
byteorder = { version = "1.4.3" }

View File

@ -39,7 +39,6 @@ use {
},
std::{
collections::HashMap,
pin::Pin,
sync::{
atomic::{
AtomicUsize,
@ -88,7 +87,7 @@ pub struct Subscriber {
receiver: SplitStream<WebSocket>,
sender: SplitSink<WebSocket, Message>,
price_feeds_with_config: HashMap<PriceIdentifier, PriceFeedClientConfig>,
ping_interval_future: Pin<Box<tokio::time::Sleep>>,
ping_interval: tokio::time::Interval,
responded_to_ping: bool,
}
@ -108,7 +107,7 @@ impl Subscriber {
receiver,
sender,
price_feeds_with_config: HashMap::new(),
ping_interval_future: Box::pin(tokio::time::sleep(PING_INTERVAL_DURATION)),
ping_interval: tokio::time::interval(PING_INTERVAL_DURATION),
responded_to_ping: true, // We start with true so we don't close the connection immediately
}
}
@ -116,7 +115,7 @@ impl Subscriber {
pub async fn run(&mut self) {
while !self.closed {
if let Err(e) = self.handle_next().await {
log::warn!("Subscriber {}: Error handling next message: {}", self.id, e);
log::debug!("Subscriber {}: Error handling next message: {}", self.id, e);
break;
}
}
@ -128,31 +127,22 @@ impl Subscriber {
if maybe_update_feeds.is_none() {
return Err(anyhow!("Update channel closed. This should never happen. Closing connection."));
};
self.handle_price_feeds_update().await?;
self.handle_price_feeds_update().await
},
maybe_message_or_err = self.receiver.next() => {
match maybe_message_or_err {
None => {
log::debug!("Subscriber {} closed connection unexpectedly.", self.id);
self.closed = true;
return Ok(());
},
Some(message_or_err) => self.handle_client_message(message_or_err?).await?
}
self.handle_client_message(
maybe_message_or_err.ok_or(anyhow!("Client channel is closed"))??
).await
},
_ = &mut self.ping_interval_future => {
_ = self.ping_interval.tick() => {
if !self.responded_to_ping {
log::debug!("Subscriber {} did not respond to ping. Closing connection.", self.id);
self.closed = true;
return Ok(());
return Err(anyhow!("Subscriber did not respond to ping. Closing connection."));
}
self.responded_to_ping = false;
self.sender.send(Message::Ping(vec![])).await?;
self.ping_interval_future = Box::pin(tokio::time::sleep(PING_INTERVAL_DURATION));
Ok(())
}
}
Ok(())
}
async fn handle_price_feeds_update(&mut self) -> Result<()> {
@ -170,6 +160,8 @@ impl Subscriber {
"Config missing, price feed list was poisoned during iteration."
))?;
// `sender.feed` buffers a message to the client but does not flush it, so we can send
// multiple messages and flush them all at once.
self.sender
.feed(Message::Text(serde_json::to_string(
&ServerMessage::PriceUpdate {
@ -188,13 +180,22 @@ impl Subscriber {
}
async fn handle_client_message(&mut self, message: Message) -> Result<()> {
if let Message::Close(_) = message {
log::debug!("Subscriber {} closed connection", self.id);
self.closed = true;
return Ok(());
}
let maybe_client_message = match message {
Message::Close(_) => {
// Closing the connection. We don't remove it from the subscribers
// list, instead when the Subscriber struct is dropped the channel
// to subscribers list will be closed and it will eventually get
// removed.
log::trace!("Subscriber {} closed connection", self.id);
// Send the close message to gracefully shut down the connection
// Otherwise the client might get an abnormal Websocket closure
// error.
self.sender.close().await?;
self.closed = true;
return Ok(());
}
Message::Text(text) => serde_json::from_str::<ClientMessage>(&text),
Message::Binary(data) => serde_json::from_slice::<ClientMessage>(&data),
Message::Ping(_) => {
@ -205,9 +206,6 @@ impl Subscriber {
self.responded_to_ping = true;
return Ok(());
}
_ => {
return Ok(());
}
};
match maybe_client_message {
@ -261,8 +259,11 @@ pub async fn notify_updates(ws_state: Arc<WsState>) {
.map(|subscriber| async move {
match subscriber.send(()).await {
Ok(_) => None,
Err(e) => {
log::debug!("Error sending update to subscriber: {}", e);
Err(_) => {
// An error here indicates the channel is closed (which may happen either when the
// client has sent Message::Close or some other abrupt disconnection). We remove
// subscribers only when send fails so we can handle closure only once when we are
// able to see send() fail.
Some(*subscriber.key())
}
}