diff --git a/accounts-db/src/accounts_db/geyser_plugin_utils.rs b/accounts-db/src/accounts_db/geyser_plugin_utils.rs index 180fcb73c1..65f7247326 100644 --- a/accounts-db/src/accounts_db/geyser_plugin_utils.rs +++ b/accounts-db/src/accounts_db/geyser_plugin_utils.rs @@ -48,9 +48,11 @@ impl AccountsDb { /// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated /// multiple times only the last write (with highest write_version) is notified. pub fn notify_account_restore_from_snapshot(&self) { + println!("notify_account_restore_from_snapshot..."); if self.accounts_update_notifier.is_none() { return; } + println!("notify_account_restore_from_snapshot...cont"); let mut slots = self.storage.all_slots(); let mut notified_accounts: HashSet = HashSet::default(); @@ -77,6 +79,7 @@ impl AccountsDb { ) where P: Iterator, { + println!("notify_account_at_accounts_update"); if let Some(accounts_update_notifier) = &self.accounts_update_notifier { let notifier = &accounts_update_notifier.read().unwrap(); notifier.notify_account_update( @@ -127,6 +130,7 @@ impl AccountsDb { mut accounts_to_stream: HashMap, notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats, ) { + println!("notify_filtered_accounts: {}", accounts_to_stream.len()); let notifier = self .accounts_update_notifier .as_ref() @@ -138,36 +142,46 @@ impl AccountsDb { let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts"); let local_write_version = 0; let mut buffer = Vec::with_capacity(1000); + let mut remaining = accounts_to_stream.len(); 'drain: for acc in accounts_to_stream.drain() { + remaining -= 1; if buffer.len() < 1000 { buffer.push(acc); - continue 'drain; + if remaining > 0 { + continue 'drain; + } } else { - let mut mapped = Vec::new(); - for (pubkey, mut account) in buffer.drain(..) { // We do not need to rely on the specific write_version read from the append vec. + let mut mapped: Vec = Vec::new(); + for (_pubkey, mut account) in buffer.drain(..) { // We do not need to rely on the specific write_version read from the append vec. // So, overwrite the write_version with something that works. // 'accounts_to_stream' is already a hashmap, so there is already only entry per pubkey. // write_version is only used to order multiple entries with the same pubkey, so it doesn't matter what value it gets here. // Passing 0 for everyone's write_version is sufficiently correct. - let meta = StoredMeta { - write_version_obsolete: local_write_version, - ..*account.meta() - }; - account.set_meta(&meta); + // let meta: StoredMeta = StoredMeta { + // write_version_obsolete: local_write_version, + // ..*account.meta() + // }; + // account.set_meta(&meta); + let pubkey = *account.pubkey(); + let account = Self::mapmeta(account); let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts"); // notifier.notify_account_restore_from_snapshot(slot, &[&account]); - mapped.push(&account); + mapped.push(account); measure_pure_notify.stop(); notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize; let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping"); - notified_accounts.insert(*account.pubkey()); + // notified_accounts.insert(*account.pubkey()); + notified_accounts.insert(pubkey); measure_bookkeep.stop(); notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize; + println!("ADD ITEM"); } // -- END batch items - notifier.notify_account_restore_from_snapshot(slot, &mapped); + + let mapped2: Vec<&StoredAccountMeta> = mapped.iter().map(|x| x).collect_vec(); + notifier.notify_account_restore_from_snapshot(slot, &mapped2); // TODO check if this allocates } } notify_stats.notified_accounts += accounts_to_stream.len(); @@ -175,6 +189,17 @@ impl AccountsDb { notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize; } + + fn mapmeta(mut account: StoredAccountMeta) -> StoredAccountMeta { + let meta: StoredMeta = StoredMeta { + write_version_obsolete: 12121212, + ..*account.meta() + }; + let boxxed = Box::new(meta); + // account.set_meta(boxxed.as_ref()); + account + } + } diff --git a/geyser-plugin-manager/src/accounts_update_notifier.rs b/geyser-plugin-manager/src/accounts_update_notifier.rs index 94bd802685..1a971277ca 100644 --- a/geyser-plugin-manager/src/accounts_update_notifier.rs +++ b/geyser-plugin-manager/src/accounts_update_notifier.rs @@ -34,6 +34,7 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl { pubkey: &Pubkey, write_version: u64, ) { + print!("notify_account_update"); if let Some(account_info) = self.accountinfo_from_shared_account_data(account, txn, pubkey, write_version) { @@ -164,6 +165,7 @@ impl AccountsUpdateNotifierImpl { for plugin in plugin_manager.plugins.iter() { let mut measure = Measure::start("geyser-plugin-update-account"); + info!("Proccess account update batch ({}) for plugin {}", accounts_batch.len(), plugin.name()); for account in accounts_batch { match plugin.update_account(