parent
f0f75aff59
commit
1b9c9a313c
|
@ -1637,7 +1637,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
|
||||||
let is_zero_lamport = account_info.is_zero_lamport();
|
let is_zero_lamport = account_info.is_zero_lamport();
|
||||||
let result = if is_zero_lamport { Some(pubkey) } else { None };
|
let result = if is_zero_lamport { Some(pubkey) } else { None };
|
||||||
|
|
||||||
binned[binned_index].1.push((pubkey, account_info));
|
binned[binned_index].1.push((pubkey, (slot, account_info)));
|
||||||
result
|
result
|
||||||
})
|
})
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
@ -1649,25 +1649,29 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> AccountsIndex<T, U> {
|
||||||
let r_account_maps = &self.account_maps[pubkey_bin];
|
let r_account_maps = &self.account_maps[pubkey_bin];
|
||||||
let mut insert_time = Measure::start("insert_into_primary_index");
|
let mut insert_time = Measure::start("insert_into_primary_index");
|
||||||
if use_disk {
|
if use_disk {
|
||||||
r_account_maps.startup_insert_only(slot, items.into_iter());
|
r_account_maps.startup_insert_only(items.into_iter());
|
||||||
} else {
|
} else {
|
||||||
// not using disk buckets, so just write to in-mem
|
// not using disk buckets, so just write to in-mem
|
||||||
// this is no longer the default case
|
// this is no longer the default case
|
||||||
items.into_iter().for_each(|(pubkey, account_info)| {
|
items
|
||||||
let new_entry = PreAllocatedAccountMapEntry::new(
|
.into_iter()
|
||||||
slot,
|
.for_each(|(pubkey, (slot, account_info))| {
|
||||||
account_info,
|
let new_entry = PreAllocatedAccountMapEntry::new(
|
||||||
&self.storage.storage,
|
slot,
|
||||||
use_disk,
|
account_info,
|
||||||
);
|
&self.storage.storage,
|
||||||
match r_account_maps.insert_new_entry_if_missing_with_lock(pubkey, new_entry) {
|
use_disk,
|
||||||
InsertNewEntryResults::DidNotExist => {}
|
);
|
||||||
InsertNewEntryResults::ExistedNewEntryZeroLamports => {}
|
match r_account_maps
|
||||||
InsertNewEntryResults::ExistedNewEntryNonZeroLamports => {
|
.insert_new_entry_if_missing_with_lock(pubkey, new_entry)
|
||||||
dirty_pubkeys.push(pubkey);
|
{
|
||||||
|
InsertNewEntryResults::DidNotExist => {}
|
||||||
|
InsertNewEntryResults::ExistedNewEntryZeroLamports => {}
|
||||||
|
InsertNewEntryResults::ExistedNewEntryNonZeroLamports => {
|
||||||
|
dirty_pubkeys.push(pubkey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
});
|
|
||||||
}
|
}
|
||||||
insert_time.stop();
|
insert_time.stop();
|
||||||
insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed);
|
insertion_time.fetch_add(insert_time.as_us(), Ordering::Relaxed);
|
||||||
|
|
|
@ -142,7 +142,7 @@ struct StartupInfoDuplicates<T: IndexValue> {
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
struct StartupInfo<T: IndexValue> {
|
struct StartupInfo<T: IndexValue> {
|
||||||
/// entries to add next time we are flushing to disk
|
/// entries to add next time we are flushing to disk
|
||||||
insert: Mutex<Vec<(Slot, Pubkey, T)>>,
|
insert: Mutex<Vec<(Pubkey, (Slot, T))>>,
|
||||||
/// pubkeys with more than 1 entry
|
/// pubkeys with more than 1 entry
|
||||||
duplicates: Mutex<StartupInfoDuplicates<T>>,
|
duplicates: Mutex<StartupInfoDuplicates<T>>,
|
||||||
}
|
}
|
||||||
|
@ -672,14 +672,16 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
||||||
|
|
||||||
/// Queue up these insertions for when the flush thread is dealing with this bin.
|
/// Queue up these insertions for when the flush thread is dealing with this bin.
|
||||||
/// This is very fast and requires no lookups or disk access.
|
/// This is very fast and requires no lookups or disk access.
|
||||||
pub fn startup_insert_only(&self, slot: Slot, items: impl Iterator<Item = (Pubkey, T)>) {
|
pub fn startup_insert_only(&self, items: impl Iterator<Item = (Pubkey, (Slot, T))>) {
|
||||||
assert!(self.storage.get_startup());
|
assert!(self.storage.get_startup());
|
||||||
assert!(self.bucket.is_some());
|
assert!(self.bucket.is_some());
|
||||||
|
|
||||||
let mut insert = self.startup_info.insert.lock().unwrap();
|
let mut insert = self.startup_info.insert.lock().unwrap();
|
||||||
|
// todo: memcpy the new slice into our vector already
|
||||||
|
// todo: avoid reallocs and just allocate another vec instead of likely resizing this one over and over
|
||||||
items
|
items
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.for_each(|(k, v)| insert.push((slot, k, v)));
|
.for_each(|(k, (slot, v))| insert.push((k, (slot, v))));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert_new_entry_if_missing_with_lock(
|
pub fn insert_new_entry_if_missing_with_lock(
|
||||||
|
@ -1069,7 +1071,9 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
|
||||||
let disk = self.bucket.as_ref().unwrap();
|
let disk = self.bucket.as_ref().unwrap();
|
||||||
let mut count = insert.len() as u64;
|
let mut count = insert.len() as u64;
|
||||||
for (k, entry, duplicate_entry) in disk.batch_insert_non_duplicates(
|
for (k, entry, duplicate_entry) in disk.batch_insert_non_duplicates(
|
||||||
insert.into_iter().map(|(slot, k, v)| (k, (slot, v.into()))),
|
insert
|
||||||
|
.into_iter()
|
||||||
|
.map(|(k, (slot, v))| (k, (slot, v.into()))),
|
||||||
count as usize,
|
count as usize,
|
||||||
) {
|
) {
|
||||||
duplicates.duplicates.push((entry.0, k, entry.1.into()));
|
duplicates.duplicates.push((entry.0, k, entry.1.into()));
|
||||||
|
|
Loading…
Reference in New Issue