websocket: add a metdata feed to indicate missing account, snapshot start and end

This commit is contained in:
Serge FARNY 2024-05-07 09:19:41 +02:00
parent 9e7ebd1d16
commit f9295b0d9f
2 changed files with 22 additions and 3 deletions

View File

@ -80,6 +80,7 @@ async fn main() -> anyhow::Result<()> {
&config, &config,
&filter_config, &filter_config,
account_write_queue_sender, account_write_queue_sender,
None,
slot_queue_sender, slot_queue_sender,
) )
.await; .await;

View File

@ -25,8 +25,8 @@ use crate::snapshot::{
get_snapshot_gma, get_snapshot_gpa, SnapshotMultipleAccounts, SnapshotProgramAccounts, get_snapshot_gma, get_snapshot_gpa, SnapshotMultipleAccounts, SnapshotProgramAccounts,
}; };
use crate::{ use crate::{
chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FilterConfig, SlotUpdate, chain_data::SlotStatus, AccountWrite, AnyhowWrap, EntityFilter, FeedMetadata, FilterConfig,
SourceConfig, SlotUpdate, SourceConfig,
}; };
const SNAPSHOT_REFRESH_INTERVAL: Duration = Duration::from_secs(300); const SNAPSHOT_REFRESH_INTERVAL: Duration = Duration::from_secs(300);
@ -294,6 +294,7 @@ pub async fn process_events(
config: &SourceConfig, config: &SourceConfig,
filter_config: &FilterConfig, filter_config: &FilterConfig,
account_write_queue_sender: async_channel::Sender<AccountWrite>, account_write_queue_sender: async_channel::Sender<AccountWrite>,
metdata_write_queue_sender: Option<async_channel::Sender<FeedMetadata>>,
slot_queue_sender: async_channel::Sender<SlotUpdate>, slot_queue_sender: async_channel::Sender<SlotUpdate>,
) { ) {
// Subscribe to program account updates websocket // Subscribe to program account updates websocket
@ -325,6 +326,13 @@ pub async fn process_events(
// The thread that pulls updates and forwards them to postgres // The thread that pulls updates and forwards them to postgres
// //
let metadata_sender = |msg| {
if let Some(sender) = &metdata_write_queue_sender {
sender.send_blocking(msg)
} else {
Ok(())
}
};
// consume websocket updates from rust channels // consume websocket updates from rust channels
loop { loop {
let update = update_receiver.recv().await.unwrap(); let update = update_receiver.recv().await.unwrap();
@ -342,9 +350,13 @@ pub async fn process_events(
} }
WebsocketMessage::SnapshotUpdate((slot, accounts)) => { WebsocketMessage::SnapshotUpdate((slot, accounts)) => {
trace!("snapshot update {slot}"); trace!("snapshot update {slot}");
if let Err(e) = metadata_sender(FeedMetadata::SnapshotStart) {
warn!("failed to send feed matadata event: {}", e);
}
for (pubkey, account) in accounts { for (pubkey, account) in accounts {
let pubkey = Pubkey::from_str(&pubkey).unwrap();
if let Some(account) = account { if let Some(account) = account {
let pubkey = Pubkey::from_str(&pubkey).unwrap();
account_write_queue_sender account_write_queue_sender
.send(AccountWrite::from( .send(AccountWrite::from(
pubkey, pubkey,
@ -354,8 +366,14 @@ pub async fn process_events(
)) ))
.await .await
.expect("send success"); .expect("send success");
} else if let Err(e) = metadata_sender(FeedMetadata::InvalidAccount(pubkey)) {
warn!("failed to send feed matadata event: {}", e);
} }
} }
if let Err(e) = metadata_sender(FeedMetadata::SnapshotEnd) {
warn!("failed to send feed matadata event: {}", e);
}
} }
WebsocketMessage::SlotUpdate(update) => { WebsocketMessage::SlotUpdate(update) => {
trace!("slot update"); trace!("slot update");