refactor metadata_sender

This commit is contained in:
GroovieGermanikus 2024-07-29 11:19:17 +02:00
parent 50a9f126a6
commit d398c07954
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
1 changed files with 9 additions and 11 deletions

View File

@ -19,6 +19,7 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use async_channel::SendError;
use tokio::time::timeout;
use crate::snapshot::{
@ -328,9 +329,10 @@ pub async fn process_events(
let metadata_sender = |msg| {
if let Some(sender) = &metdata_write_queue_sender {
sender.send_blocking(msg)
} else {
Ok(())
let res = sender.send_blocking(msg);
if let Err(SendError(_)) = res {
warn!("failed to send feed matadata event to closed channel - ignore");
}
}
};
// consume websocket updates from rust channels
@ -350,9 +352,7 @@ pub async fn process_events(
}
WebsocketMessage::SnapshotUpdate((slot, accounts)) => {
trace!("snapshot update {slot}");
if let Err(e) = metadata_sender(FeedMetadata::SnapshotStart) {
warn!("failed to send feed matadata event: {}", e);
}
metadata_sender(FeedMetadata::SnapshotStart);
for (pubkey, account) in accounts {
let pubkey = Pubkey::from_str(&pubkey).unwrap();
@ -366,14 +366,12 @@ pub async fn process_events(
))
.await
.expect("send success");
} else if let Err(e) = metadata_sender(FeedMetadata::InvalidAccount(pubkey)) {
warn!("failed to send feed matadata event: {}", e);
} else {
metadata_sender(FeedMetadata::InvalidAccount(pubkey));
}
}
if let Err(e) = metadata_sender(FeedMetadata::SnapshotEnd) {
warn!("failed to send feed matadata event: {}", e);
}
metadata_sender(FeedMetadata::SnapshotEnd);
}
WebsocketMessage::SlotUpdate(update) => {
trace!("slot update");