diff --git a/connector/src/websocket_source.rs b/connector/src/websocket_source.rs index 07f49f3..31ed977 100644 --- a/connector/src/websocket_source.rs +++ b/connector/src/websocket_source.rs @@ -19,6 +19,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use async_channel::SendError; use tokio::time::timeout; use crate::snapshot::{ @@ -328,9 +329,10 @@ pub async fn process_events( let metadata_sender = |msg| { if let Some(sender) = &metdata_write_queue_sender { - sender.send_blocking(msg) - } else { - Ok(()) + let res = sender.send_blocking(msg); + if let Err(SendError(_)) = res { + warn!("failed to send feed matadata event to closed channel - ignore"); + } } }; // consume websocket updates from rust channels @@ -350,9 +352,7 @@ pub async fn process_events( } WebsocketMessage::SnapshotUpdate((slot, accounts)) => { trace!("snapshot update {slot}"); - if let Err(e) = metadata_sender(FeedMetadata::SnapshotStart) { - warn!("failed to send feed matadata event: {}", e); - } + metadata_sender(FeedMetadata::SnapshotStart); for (pubkey, account) in accounts { let pubkey = Pubkey::from_str(&pubkey).unwrap(); @@ -366,14 +366,12 @@ pub async fn process_events( )) .await .expect("send success"); - } else if let Err(e) = metadata_sender(FeedMetadata::InvalidAccount(pubkey)) { - warn!("failed to send feed matadata event: {}", e); + } else { + metadata_sender(FeedMetadata::InvalidAccount(pubkey)); } } - if let Err(e) = metadata_sender(FeedMetadata::SnapshotEnd) { - warn!("failed to send feed matadata event: {}", e); - } + metadata_sender(FeedMetadata::SnapshotEnd); } WebsocketMessage::SlotUpdate(update) => { trace!("slot update");