From 04b31f17c58c387d753fb1c7c50d842a568bb5dd Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Tue, 8 Aug 2023 16:02:15 +0200 Subject: [PATCH] fix(hermes): send close message on WS closure Some WS clients were receiving "Abnormal Connection Closure" errors. This commit fixes the issue by sending specific close message upon receiving close message from client. This commit also refactors the ws.rs code by making it simpler and using tokio::time::Interval instead of a manually implemented interval. Lastly, it updates the axum package to include newer patches. --- hermes/Cargo.lock | 18 ++++++------- hermes/Cargo.toml | 6 ++--- hermes/src/api/ws.rs | 63 ++++++++++++++++++++++---------------------- 3 files changed, 44 insertions(+), 43 deletions(-) diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock index 1e8883fe..e2d56055 100644 --- a/hermes/Cargo.lock +++ b/hermes/Cargo.lock @@ -404,9 +404,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.19" +version = "0.6.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6a1de45611fdb535bfde7b7de4fd54f4fd2b17b1737c0a59b69bf9b92074b8c" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", @@ -432,7 +432,7 @@ dependencies = [ "sha1", "sync_wrapper", "tokio", - "tokio-tungstenite 0.19.0", + "tokio-tungstenite 0.20.0", "tower", "tower-layer", "tower-service", @@ -1764,7 +1764,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermes" -version = "0.1.7" +version = "0.1.8" dependencies = [ "anyhow", "axum", @@ -5900,14 +5900,14 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec509ac96e9a0c43427c74f003127d953a265737636129424288d27cb5c4b12c" +checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2" dependencies = [ "futures-util", "log", "tokio", - "tungstenite 0.19.0", + "tungstenite 0.20.0", ] [[package]] @@ -6102,9 +6102,9 @@ dependencies = [ [[package]] name = "tungstenite" -version = "0.19.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15fba1a6d6bb030745759a9a2a588bfe8490fc8b4751a277db3a0be1c9ebbf67" +checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649" dependencies = [ "byteorder", "bytes", diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml index fc76ba86..42aa5590 100644 --- a/hermes/Cargo.toml +++ b/hermes/Cargo.toml @@ -1,12 +1,12 @@ [package] name = "hermes" -version = "0.1.7" +version = "0.1.8" edition = "2021" [dependencies] anyhow = { version = "1.0.69" } -axum = { version = "0.6.9", features = ["json", "ws", "macros"] } -axum-macros = { version = "0.3.4" } +axum = { version = "0.6.20", features = ["json", "ws", "macros"] } +axum-macros = { version = "0.3.8" } base64 = { version = "0.21.0" } borsh = { version = "0.10.3" } byteorder = { version = "1.4.3" } diff --git a/hermes/src/api/ws.rs b/hermes/src/api/ws.rs index 934460e5..01f47f1e 100644 --- a/hermes/src/api/ws.rs +++ b/hermes/src/api/ws.rs @@ -39,7 +39,6 @@ use { }, std::{ collections::HashMap, - pin::Pin, sync::{ atomic::{ AtomicUsize, @@ -88,7 +87,7 @@ pub struct Subscriber { receiver: SplitStream, sender: SplitSink, price_feeds_with_config: HashMap, - ping_interval_future: Pin>, + ping_interval: tokio::time::Interval, responded_to_ping: bool, } @@ -108,7 +107,7 @@ impl Subscriber { receiver, sender, price_feeds_with_config: HashMap::new(), - ping_interval_future: Box::pin(tokio::time::sleep(PING_INTERVAL_DURATION)), + ping_interval: tokio::time::interval(PING_INTERVAL_DURATION), responded_to_ping: true, // We start with true so we don't close the connection immediately } } @@ -116,7 +115,7 @@ impl Subscriber { pub async fn run(&mut self) { while !self.closed { if let Err(e) = self.handle_next().await { - log::warn!("Subscriber {}: Error handling next message: {}", self.id, e); + log::debug!("Subscriber {}: Error handling next message: {}", self.id, e); break; } } @@ -128,31 +127,22 @@ impl Subscriber { if maybe_update_feeds.is_none() { return Err(anyhow!("Update channel closed. This should never happen. Closing connection.")); }; - self.handle_price_feeds_update().await?; + self.handle_price_feeds_update().await }, maybe_message_or_err = self.receiver.next() => { - match maybe_message_or_err { - None => { - log::debug!("Subscriber {} closed connection unexpectedly.", self.id); - self.closed = true; - return Ok(()); - }, - Some(message_or_err) => self.handle_client_message(message_or_err?).await? - } + self.handle_client_message( + maybe_message_or_err.ok_or(anyhow!("Client channel is closed"))?? + ).await }, - _ = &mut self.ping_interval_future => { + _ = self.ping_interval.tick() => { if !self.responded_to_ping { - log::debug!("Subscriber {} did not respond to ping. Closing connection.", self.id); - self.closed = true; - return Ok(()); + return Err(anyhow!("Subscriber did not respond to ping. Closing connection.")); } self.responded_to_ping = false; self.sender.send(Message::Ping(vec![])).await?; - self.ping_interval_future = Box::pin(tokio::time::sleep(PING_INTERVAL_DURATION)); + Ok(()) } } - - Ok(()) } async fn handle_price_feeds_update(&mut self) -> Result<()> { @@ -170,6 +160,8 @@ impl Subscriber { "Config missing, price feed list was poisoned during iteration." ))?; + // `sender.feed` buffers a message to the client but does not flush it, so we can send + // multiple messages and flush them all at once. self.sender .feed(Message::Text(serde_json::to_string( &ServerMessage::PriceUpdate { @@ -188,13 +180,22 @@ impl Subscriber { } async fn handle_client_message(&mut self, message: Message) -> Result<()> { - if let Message::Close(_) = message { - log::debug!("Subscriber {} closed connection", self.id); - self.closed = true; - return Ok(()); - } - let maybe_client_message = match message { + Message::Close(_) => { + // Closing the connection. We don't remove it from the subscribers + // list, instead when the Subscriber struct is dropped the channel + // to subscribers list will be closed and it will eventually get + // removed. + log::trace!("Subscriber {} closed connection", self.id); + + // Send the close message to gracefully shut down the connection + // Otherwise the client might get an abnormal Websocket closure + // error. + self.sender.close().await?; + + self.closed = true; + return Ok(()); + } Message::Text(text) => serde_json::from_str::(&text), Message::Binary(data) => serde_json::from_slice::(&data), Message::Ping(_) => { @@ -205,9 +206,6 @@ impl Subscriber { self.responded_to_ping = true; return Ok(()); } - _ => { - return Ok(()); - } }; match maybe_client_message { @@ -261,8 +259,11 @@ pub async fn notify_updates(ws_state: Arc) { .map(|subscriber| async move { match subscriber.send(()).await { Ok(_) => None, - Err(e) => { - log::debug!("Error sending update to subscriber: {}", e); + Err(_) => { + // An error here indicates the channel is closed (which may happen either when the + // client has sent Message::Close or some other abrupt disconnection). We remove + // subscribers only when send fails so we can handle closure only once when we are + // able to see send() fail. Some(*subscriber.key()) } }