diff --git a/hermes/src/api.rs b/hermes/src/api.rs index 70a2f2c9..d2081b8a 100644 --- a/hermes/src/api.rs +++ b/hermes/src/api.rs @@ -1,5 +1,5 @@ use { - self::ws::dispatch_updates, + self::ws::notify_updates, crate::store::Store, anyhow::Result, axum::{ @@ -65,7 +65,7 @@ pub async fn run(store: Arc, mut update_rx: Receiver<()>, rpc_addr: Strin .await .expect("state update channel is closed"); - dispatch_updates(state.clone()).await; + notify_updates(state.ws.clone()).await; } }); diff --git a/hermes/src/api/ws.rs b/hermes/src/api/ws.rs index 19c940c7..8b1e3cf1 100644 --- a/hermes/src/api/ws.rs +++ b/hermes/src/api/ws.rs @@ -53,6 +53,7 @@ use { }; pub const PING_INTERVAL_DURATION: Duration = Duration::from_secs(30); +pub const NOTIFICATIONS_CHAN_LEN: usize = 1000; pub async fn ws_route_handler( ws: WebSocketUpgrade, @@ -64,19 +65,14 @@ pub async fn ws_route_handler( async fn websocket_handler(stream: WebSocket, state: super::State) { let ws_state = state.ws.clone(); let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst); - - let (sender, receiver) = stream.split(); - - // TODO: Use a configured value for the buffer size or make it const static - // TODO: Use redis stream to source the updates instead of a channel - let (tx, rx) = mpsc::channel::<()>(1000); - - ws_state.subscribers.insert(id, tx); - log::debug!("New websocket connection, assigning id: {}", id); - let mut subscriber = Subscriber::new(id, state.store.clone(), rx, receiver, sender); + let (notify_sender, notify_receiver) = mpsc::channel::<()>(NOTIFICATIONS_CHAN_LEN); + let (sender, receiver) = stream.split(); + let mut subscriber = + Subscriber::new(id, state.store.clone(), notify_receiver, receiver, sender); + ws_state.subscribers.insert(id, notify_sender); subscriber.run().await; } @@ -88,7 +84,7 @@ pub struct Subscriber { id: SubscriberId, closed: bool, store: Arc, - update_rx: mpsc::Receiver<()>, + notify_receiver: mpsc::Receiver<()>, receiver: SplitStream, sender: SplitSink, price_feeds_with_config: HashMap, @@ -100,7 +96,7 @@ impl Subscriber { pub fn new( id: SubscriberId, store: Arc, - update_rx: mpsc::Receiver<()>, + notify_receiver: mpsc::Receiver<()>, receiver: SplitStream, sender: SplitSink, ) -> Self { @@ -108,7 +104,7 @@ impl Subscriber { id, closed: false, store, - update_rx, + notify_receiver, receiver, sender, price_feeds_with_config: HashMap::new(), @@ -128,7 +124,7 @@ impl Subscriber { async fn handle_next(&mut self) -> Result<()> { tokio::select! { - maybe_update_feeds = self.update_rx.recv() => { + maybe_update_feeds = self.notify_receiver.recv() => { if maybe_update_feeds.is_none() { return Err(anyhow!("Update channel closed. This should never happen. Closing connection.")); }; @@ -257,9 +253,7 @@ impl Subscriber { } } -pub async fn dispatch_updates(state: super::State) { - let ws_state = state.ws.clone(); - +pub async fn notify_updates(ws_state: Arc) { let closed_subscribers: Vec> = join_all( ws_state .subscribers