From 250ec3f2a4d876de26f23b2b18ff5049fa161397 Mon Sep 17 00:00:00 2001 From: "Jeff Washington (jwash)" Date: Tue, 13 Dec 2022 16:40:12 -0600 Subject: [PATCH] improve write_version usage in geyser scan from snapshot (#29223) --- .../src/accounts_db/geyser_plugin_utils.rs | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/runtime/src/accounts_db/geyser_plugin_utils.rs b/runtime/src/accounts_db/geyser_plugin_utils.rs index 4f669030d..ab301f13c 100644 --- a/runtime/src/accounts_db/geyser_plugin_utils.rs +++ b/runtime/src/accounts_db/geyser_plugin_utils.rs @@ -89,29 +89,33 @@ impl AccountsDb { let slot_stores = slot_stores.read().unwrap(); let mut accounts_to_stream: HashMap = HashMap::default(); let mut measure_filter = Measure::start("accountsdb-plugin-filtering-accounts"); + let mut previous_write_version = None; for (_, storage_entry) in slot_stores.iter() { - let mut accounts = storage_entry.all_accounts(); - let account_len = accounts.len(); - notify_stats.total_accounts += account_len; - accounts.drain(..).into_iter().for_each(|account| { + let accounts = storage_entry.accounts.account_iter(); + let mut account_len = 0; + accounts.for_each(|account| { + account_len += 1; + if let Some(previous_write_version) = previous_write_version { + assert!(previous_write_version < account.meta.write_version); + } + previous_write_version = Some(account.meta.write_version); if notified_accounts.contains(&account.meta.pubkey) { notify_stats.skipped_accounts += 1; return; } match accounts_to_stream.entry(account.meta.pubkey) { Entry::Occupied(mut entry) => { - let existing_account = entry.get(); - if account.meta.write_version > existing_account.meta.write_version { - entry.insert(account); - } else { - notify_stats.skipped_accounts += 1; - } + // later entries in the same slot are more recent and override earlier accounts for the same pubkey + // We can pass an incrementing number here for write_version in the future, if the storage does not have a write_version. + // As long as all accounts for this slot are in 1 append vec that can be itereated olest to newest. + entry.insert(account); } Entry::Vacant(entry) => { entry.insert(account); } } }); + notify_stats.total_accounts += account_len; } measure_filter.stop(); notify_stats.elapsed_filtering_us += measure_filter.as_us() as usize;