Add LedgerColumn::multi_get() (#26354)

#### Problem
Blockstore operations such as get_slots_since() issue multiple rocksdb::get() calls, one per key, which is not optimal for performance.
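
As a simplified sketch of that access pattern (illustrative only; the helper name and signature below are not the literal get_slots_since() body), each requested slot currently turns into its own point lookup:

```rust
// Illustrative sketch: each key issues an independent rocksdb::get(), so
// looking up N slot metas costs N separate read round trips into RocksDB.
fn meta_for_each_slot(blockstore: &Blockstore, slots: &[u64]) -> Vec<Result<Option<SlotMeta>>> {
    slots
        .iter()
        .map(|slot| blockstore.meta_cf.get(*slot)) // one point lookup per key
        .collect()
}
```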

#### Summary of Changes
This PR adds LedgerColumn::multi_get(), built on rocksdb::batched_multi_get(), an optimized variant of multi_get() that processes the get requests as a batch to minimize read I/O.
3 changed files with 87 additions and 3 deletions
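
For callers, the new API looks roughly like the sketch below, modeled on the test added in this PR (the blockstore handle and key range are placeholders): one multi_get() call replaces N individual get() calls, and each entry of the returned vector is a per-key Result<Option<_>>.

```rust
// Sketch of the new call pattern: batch all keys into a single multi_get().
let keys: Vec<u64> = (0..100u64).collect();
let values = blockstore.meta_cf.multi_get(keys);
for value in values {
    match value {
        Ok(Some(_meta)) => { /* SlotMeta found for this key */ }
        Ok(None) => { /* key is not present in the column */ }
        Err(_err) => { /* per-key RocksDB or deserialization error */ }
    }
}
```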


@@ -4628,6 +4628,38 @@ pub mod tests {
         assert_eq!(result, data);
     }
 
+    #[test]
+    fn test_multi_get() {
+        const TEST_PUT_ENTRY_COUNT: usize = 100;
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        // Test meta column family
+        for i in 0..TEST_PUT_ENTRY_COUNT {
+            let k = u64::try_from(i).unwrap();
+            let meta = SlotMeta::new(k, Some(k + 1));
+            blockstore.meta_cf.put(k, &meta).unwrap();
+            let result = blockstore
+                .meta_cf
+                .get(k)
+                .unwrap()
+                .expect("Expected meta object to exist");
+            assert_eq!(result, meta);
+        }
+
+        let mut keys: Vec<u64> = vec![0; TEST_PUT_ENTRY_COUNT];
+        for (i, key) in keys.iter_mut().enumerate().take(TEST_PUT_ENTRY_COUNT) {
+            *key = u64::try_from(i).unwrap();
+        }
+        let values = blockstore.meta_cf.multi_get(keys);
+        for (i, value) in values.iter().enumerate().take(TEST_PUT_ENTRY_COUNT) {
+            let k = u64::try_from(i).unwrap();
+            assert_eq!(
+                value.as_ref().unwrap().as_ref().unwrap(),
+                &SlotMeta::new(k, Some(k + 1))
+            );
+        }
+    }
+
     #[test]
     fn test_read_shred_bytes() {
         let slot = 0;
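The double unwrap in the assertion above reflects the return shape: the outer Result carries a per-key RocksDB or deserialization error, while the inner Option is None when the key is absent. A caller that prefers to stop at the first error can collapse the vector into a single Result, for example (a sketch using the test's `blockstore` and `keys`, not part of this diff):

```rust
let metas: Result<Vec<Option<SlotMeta>>> =
    blockstore.meta_cf.multi_get(keys).into_iter().collect();
```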


@@ -5,7 +5,8 @@ use {
     blockstore_metrics::{
         maybe_enable_rocksdb_perf, report_rocksdb_read_perf, report_rocksdb_write_perf,
         BlockstoreRocksDbColumnFamilyMetrics, PerfSamplingStatus, PERF_METRIC_OP_NAME_GET,
-        PERF_METRIC_OP_NAME_PUT, PERF_METRIC_OP_NAME_WRITE_BATCH,
+        PERF_METRIC_OP_NAME_MULTI_GET, PERF_METRIC_OP_NAME_PUT,
+        PERF_METRIC_OP_NAME_WRITE_BATCH,
     },
     blockstore_options::{
         AccessType, BlockstoreOptions, LedgerColumnOptions, ShredStorageType,
@@ -20,7 +21,7 @@ use {
         compaction_filter::CompactionFilter,
         compaction_filter_factory::{CompactionFilterContext, CompactionFilterFactory},
         properties as RocksProperties, ColumnFamily, ColumnFamilyDescriptor, CompactionDecision,
-        DBCompactionStyle, DBIterator, DBRawIterator, FifoCompactOptions,
+        DBCompactionStyle, DBIterator, DBPinnableSlice, DBRawIterator, FifoCompactOptions,
         IteratorMode as RocksIteratorMode, LiveFile, Options, WriteBatch as RWriteBatch, DB,
     },
     serde::{de::DeserializeOwned, Serialize},
@@ -450,6 +451,23 @@ impl Rocks {
         Ok(())
     }
 
+    fn multi_get_cf(
+        &self,
+        cf: &ColumnFamily,
+        keys: Vec<&[u8]>,
+    ) -> Vec<Result<Option<DBPinnableSlice>>> {
+        let values = self
+            .db
+            .batched_multi_get_cf(cf, keys, false)
+            .into_iter()
+            .map(|result| match result {
+                Ok(opt) => Ok(opt),
+                Err(e) => Err(BlockstoreError::RocksDb(e)),
+            })
+            .collect::<Vec<_>>();
+        values
+    }
+
     fn delete_cf(&self, cf: &ColumnFamily, key: &[u8]) -> Result<()> {
         self.db.delete_cf(cf, key)?;
         Ok(())
@@ -1289,6 +1307,40 @@ impl<C> LedgerColumn<C>
 where
     C: TypedColumn + ColumnName,
 {
+    pub fn multi_get(&self, keys: Vec<C::Index>) -> Vec<Result<Option<C::Type>>> {
+        let rocks_keys: Vec<_> = keys.into_iter().map(|key| C::key(key)).collect();
+        {
+            let ref_rocks_keys: Vec<_> = rocks_keys.iter().map(|k| &k[..]).collect();
+            let is_perf_enabled = maybe_enable_rocksdb_perf(
+                self.column_options.rocks_perf_sample_interval,
+                &self.read_perf_status,
+            );
+            let result = self
+                .backend
+                .multi_get_cf(self.handle(), ref_rocks_keys)
+                .into_iter()
+                .map(|r| match r {
+                    Ok(opt) => match opt {
+                        Some(pinnable_slice) => Ok(Some(deserialize(pinnable_slice.as_ref())?)),
+                        None => Ok(None),
+                    },
+                    Err(e) => Err(e),
+                })
+                .collect::<Vec<Result<Option<_>>>>();
+            if let Some(op_start_instant) = is_perf_enabled {
+                // use multi-get instead
+                report_rocksdb_read_perf(
+                    C::NAME,
+                    PERF_METRIC_OP_NAME_MULTI_GET,
+                    &op_start_instant.elapsed(),
+                    &self.column_options,
+                );
+            }
+            result
+        }
+    }
+
     pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> {
         let mut result = Ok(None);
         let is_perf_enabled = maybe_enable_rocksdb_perf(


@@ -310,7 +310,7 @@ thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell<PerfContext> = RefC
 // The minimum time duration between two RocksDB perf samples of the same operation.
 const PERF_SAMPLING_MIN_DURATION: Duration = Duration::from_secs(1);
 pub(crate) const PERF_METRIC_OP_NAME_GET: &str = "get";
+pub(crate) const PERF_METRIC_OP_NAME_MULTI_GET: &str = "multi_get";
 pub(crate) const PERF_METRIC_OP_NAME_PUT: &str = "put";
 pub(crate) const PERF_METRIC_OP_NAME_WRITE_BATCH: &str = "write_batch";