batch notify

This commit is contained in:
steve-gg 2024-04-12 22:06:26 +02:00
parent 3479ec55f8
commit 5e5a4d5c2c
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 38 additions and 11 deletions

View File

@ -48,9 +48,11 @@ impl AccountsDb {
/// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated /// in the reverse order of the slots so that an account is only streamed once. At a slot, if the accounts is updated
/// multiple times only the last write (with highest write_version) is notified. /// multiple times only the last write (with highest write_version) is notified.
pub fn notify_account_restore_from_snapshot(&self) { pub fn notify_account_restore_from_snapshot(&self) {
println!("notify_account_restore_from_snapshot...");
if self.accounts_update_notifier.is_none() { if self.accounts_update_notifier.is_none() {
return; return;
} }
println!("notify_account_restore_from_snapshot...cont");
let mut slots = self.storage.all_slots(); let mut slots = self.storage.all_slots();
let mut notified_accounts: HashSet<Pubkey> = HashSet::default(); let mut notified_accounts: HashSet<Pubkey> = HashSet::default();
@ -77,6 +79,7 @@ impl AccountsDb {
) where ) where
P: Iterator<Item = u64>, P: Iterator<Item = u64>,
{ {
println!("notify_account_at_accounts_update");
if let Some(accounts_update_notifier) = &self.accounts_update_notifier { if let Some(accounts_update_notifier) = &self.accounts_update_notifier {
let notifier = &accounts_update_notifier.read().unwrap(); let notifier = &accounts_update_notifier.read().unwrap();
notifier.notify_account_update( notifier.notify_account_update(
@ -127,6 +130,7 @@ impl AccountsDb {
mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta>, mut accounts_to_stream: HashMap<Pubkey, StoredAccountMeta>,
notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats, notify_stats: &mut GeyserPluginNotifyAtSnapshotRestoreStats,
) { ) {
println!("notify_filtered_accounts: {}", accounts_to_stream.len());
let notifier = self let notifier = self
.accounts_update_notifier .accounts_update_notifier
.as_ref() .as_ref()
@ -138,36 +142,46 @@ impl AccountsDb {
let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts"); let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
let local_write_version = 0; let local_write_version = 0;
let mut buffer = Vec::with_capacity(1000); let mut buffer = Vec::with_capacity(1000);
let mut remaining = accounts_to_stream.len();
'drain: for acc in accounts_to_stream.drain() { 'drain: for acc in accounts_to_stream.drain() {
remaining -= 1;
if buffer.len() < 1000 { if buffer.len() < 1000 {
buffer.push(acc); buffer.push(acc);
continue 'drain; if remaining > 0 {
continue 'drain;
}
} else { } else {
let mut mapped = Vec::new(); let mut mapped: Vec<StoredAccountMeta> = Vec::new();
for (pubkey, mut account) in buffer.drain(..) { // We do not need to rely on the specific write_version read from the append vec. for (_pubkey, mut account) in buffer.drain(..) { // We do not need to rely on the specific write_version read from the append vec.
// So, overwrite the write_version with something that works. // So, overwrite the write_version with something that works.
// 'accounts_to_stream' is already a hashmap, so there is already only entry per pubkey. // 'accounts_to_stream' is already a hashmap, so there is already only entry per pubkey.
// write_version is only used to order multiple entries with the same pubkey, so it doesn't matter what value it gets here. // write_version is only used to order multiple entries with the same pubkey, so it doesn't matter what value it gets here.
// Passing 0 for everyone's write_version is sufficiently correct. // Passing 0 for everyone's write_version is sufficiently correct.
let meta = StoredMeta { // let meta: StoredMeta = StoredMeta {
write_version_obsolete: local_write_version, // write_version_obsolete: local_write_version,
..*account.meta() // ..*account.meta()
}; // };
account.set_meta(&meta); // account.set_meta(&meta);
let pubkey = *account.pubkey();
let account = Self::mapmeta(account);
let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts"); let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
// notifier.notify_account_restore_from_snapshot(slot, &[&account]); // notifier.notify_account_restore_from_snapshot(slot, &[&account]);
mapped.push(&account); mapped.push(account);
measure_pure_notify.stop(); measure_pure_notify.stop();
notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize; notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping"); let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
notified_accounts.insert(*account.pubkey()); // notified_accounts.insert(*account.pubkey());
notified_accounts.insert(pubkey);
measure_bookkeep.stop(); measure_bookkeep.stop();
notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize; notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
println!("ADD ITEM");
} // -- END batch items } // -- END batch items
notifier.notify_account_restore_from_snapshot(slot, &mapped);
let mapped2: Vec<&StoredAccountMeta> = mapped.iter().map(|x| x).collect_vec();
notifier.notify_account_restore_from_snapshot(slot, &mapped2); // TODO check if this allocates
} }
} }
notify_stats.notified_accounts += accounts_to_stream.len(); notify_stats.notified_accounts += accounts_to_stream.len();
@ -175,6 +189,17 @@ impl AccountsDb {
notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize; notify_stats.elapsed_notifying_us += measure_notify.as_us() as usize;
} }
fn mapmeta(mut account: StoredAccountMeta) -> StoredAccountMeta {
let meta: StoredMeta = StoredMeta {
write_version_obsolete: 12121212,
..*account.meta()
};
let boxxed = Box::new(meta);
// account.set_meta(boxxed.as_ref());
account
}
} }

View File

@ -34,6 +34,7 @@ impl AccountsUpdateNotifierInterface for AccountsUpdateNotifierImpl {
pubkey: &Pubkey, pubkey: &Pubkey,
write_version: u64, write_version: u64,
) { ) {
print!("notify_account_update");
if let Some(account_info) = if let Some(account_info) =
self.accountinfo_from_shared_account_data(account, txn, pubkey, write_version) self.accountinfo_from_shared_account_data(account, txn, pubkey, write_version)
{ {
@ -164,6 +165,7 @@ impl AccountsUpdateNotifierImpl {
for plugin in plugin_manager.plugins.iter() { for plugin in plugin_manager.plugins.iter() {
let mut measure = Measure::start("geyser-plugin-update-account"); let mut measure = Measure::start("geyser-plugin-update-account");
info!("Proccess account update batch ({}) for plugin {}", accounts_batch.len(), plugin.name());
for account in accounts_batch { for account in accounts_batch {
match plugin.update_account( match plugin.update_account(