simplify batch

This commit is contained in:
steve-gg 2024-04-15 07:59:36 +02:00
parent e85e5a22bc
commit 1fea1c36a1
No known key found for this signature in database
GPG Key ID: 5B6EB831A5CD2015
2 changed files with 36 additions and 39 deletions

View File

@ -142,49 +142,52 @@ impl AccountsDb {
let mut batcher = BatchAccountNotfier {};
let mut measure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
let local_write_version = 0;
let local_write_version = 0; // TODO restore
let mut buffer = Vec::with_capacity(1000);
// let mut mapped: Vec<StoredAccountMeta> = Vec::with_capacity(1000);
let mut remaining = accounts_to_stream.len();
'drain: for acc in accounts_to_stream.drain() {
remaining -= 1;
println!("drain... remaining: {}", remaining);
if buffer.len() < 1000 {
buffer.push(acc);
if remaining > 0 {
continue 'drain;
}
} else {
let mut mapped: Vec<StoredAccountMeta> = Vec::with_capacity(1000);
for (_pubkey, mut account) in buffer.drain(..) { // We do not need to rely on the specific write_version read from the append vec.
// So, overwrite the write_version with something that works.
// 'accounts_to_stream' is already a hashmap, so there is already only entry per pubkey.
// write_version is only used to order multiple entries with the same pubkey, so it doesn't matter what value it gets here.
// Passing 0 for everyone's write_version is sufficiently correct.
// let meta: StoredMeta = StoredMeta {
// write_version_obsolete: local_write_version,
// ..*account.meta()
// };
// account.set_meta(&meta);
let pubkey = *account.pubkey();
let account = Self::mapmeta(account);
let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
// batcher.insert();
// notifier.notify_account_restore_from_snapshot(slot, &[&account]);
mapped.push(account);
measure_pure_notify.stop();
notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
// notified_accounts.insert(*account.pubkey());
notified_accounts.insert(pubkey);
measure_bookkeep.stop();
notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
} // -- END batch items
notifier.notify_account_restore_from_snapshot(slot, &mapped); // TODO check if this allocates
}
// mapped.clear();
// for (_pubkey, mut account) in buffer.drain(..) { // We do not need to rely on the specific write_version read from the append vec.
// So, overwrite the write_version with something that works.
// 'accounts_to_stream' is already a hashmap, so there is already only entry per pubkey.
// write_version is only used to order multiple entries with the same pubkey, so it doesn't matter what value it gets here.
// Passing 0 for everyone's write_version is sufficiently correct.
// let meta: StoredMeta = StoredMeta {
// write_version_obsolete: local_write_version,
// ..*account.meta()
// };
// account.set_meta(&meta);
// let pubkey = *account.pubkey();
// let account = Self::mapmeta(account);
// let mut measure_pure_notify = Measure::start("accountsdb-plugin-notifying-accounts");
// batcher.insert();
// notifier.notify_account_restore_from_snapshot(slot, &[&account]);
// mapped.push(account);
// measure_pure_notify.stop();
// notify_stats.total_pure_notify += measure_pure_notify.as_us() as usize;
// let mut measure_bookkeep = Measure::start("accountsdb-plugin-notifying-bookeeeping");
// notified_accounts.insert(*account.pubkey());
// notified_accounts.insert(pubkey); -> see below
// measure_bookkeep.stop();
// notify_stats.total_pure_bookeeping += measure_bookkeep.as_us() as usize;
// } // -- END batch items
notified_accounts.extend(buffer.iter().map(|(pubkey, _meta)| pubkey));
let mapped = buffer.drain(..).map(|(_pubkey, meta)| Self::mapmeta(meta)).collect_vec();
notifier.notify_account_restore_from_snapshot(slot, &mapped); // TODO check if this allocates
}
batcher.flush();

View File

@ -154,7 +154,6 @@ impl AccountsUpdateNotifierImpl {
slot: Slot,
is_startup: bool,
) {
let started_at = Instant::now();
let mut measure2 = Measure::start("geyser-plugin-notify_plugins_of_account_update");
let plugin_manager = self.plugin_manager.read().unwrap();
@ -164,7 +163,6 @@ impl AccountsUpdateNotifierImpl {
for plugin in plugin_manager.plugins.iter() {
let mut measure = Measure::start("geyser-plugin-update-account");
debug!("Proccess account update batch ({}) for plugin {}", accounts_batch.len(), plugin.name());
for account in accounts_batch {
match plugin.update_account(
@ -208,9 +206,5 @@ impl AccountsUpdateNotifierImpl {
100000,
100000
);
debug!(
"Processed account update batch ({}) for all plugins in {}us",
accounts_batch.len(),
started_at.elapsed().as_micros());
}
}