diff --git a/data-streams/src/account_write_filter.rs b/data-streams/src/account_write_filter.rs index 79aad21..9654804 100644 --- a/data-streams/src/account_write_filter.rs +++ b/data-streams/src/account_write_filter.rs @@ -25,7 +25,7 @@ pub struct AccountWriteRoute { } #[derive(Clone, Debug)] -struct AcountWriteRecord { +struct AccountWriteRecord { slot: u64, write_version: u64, timestamp: Instant, @@ -38,18 +38,17 @@ pub fn init( async_channel::Sender, async_channel::Sender, )> { - // The actual message may want to also contain a retry count, if it self-reinserts on failure? let (account_write_queue_sender, account_write_queue_receiver) = async_channel::unbounded::(); - // Slot updates flowing from the outside into the single processing thread. From - // there they'll flow into the postgres sending thread. + // Slot updates flowing from the outside into this processing thread. From + // there the AccountWriteRoute::sink() callback is triggered. let (slot_queue_sender, slot_queue_receiver) = async_channel::unbounded::(); let mut chain_data = ChainData::new(); let mut chain_data_metrics = ChainDataMetrics::new(&metrics_sender); - let mut last_updated = HashMap::::new(); + let mut last_updated = HashMap::::new(); let all_queue_pks: BTreeSet = routes .iter() @@ -57,7 +56,7 @@ pub fn init( .map(|pk| pk.clone()) .collect(); - // update handling thread, reads both sloths and account updates + // update handling thread, reads both slots and account updates tokio::spawn(async move { loop { tokio::select! { @@ -114,7 +113,7 @@ pub fn init( // todo: metrics last_updated.insert( pk_b58.clone(), - AcountWriteRecord { + AccountWriteRecord { slot: account_info.slot, write_version: account_info.write_version, timestamp: Instant::now(),