From ba3d9cd3250d26a28cc58521a3661671c55da7fb Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang <93241502+yhchiang-sol@users.noreply.github.com> Date: Mon, 12 Sep 2022 15:01:22 -0700 Subject: [PATCH] Add LedgerColumn::multi_get() (#26354) #### Problem Blockstore operations such as get_slots_since() issues multiple rocksdb::get() at once which is not optimal for performance. #### Summary of Changes This PR adds LedgerColumn::multi_get() based on rocksdb::batched_multi_get(), the optimized version of multi_get() where get requests are processed in batch to minimize read I/O. --- ledger/src/blockstore.rs | 32 ++++++++++++++++++ ledger/src/blockstore_db.rs | 56 ++++++++++++++++++++++++++++++-- ledger/src/blockstore_metrics.rs | 2 +- 3 files changed, 87 insertions(+), 3 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 3c44d60b51..78b87ee257 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -4628,6 +4628,38 @@ pub mod tests { assert_eq!(result, data); } + #[test] + fn test_multi_get() { + const TEST_PUT_ENTRY_COUNT: usize = 100; + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()).unwrap(); + + // Test meta column family + for i in 0..TEST_PUT_ENTRY_COUNT { + let k = u64::try_from(i).unwrap(); + let meta = SlotMeta::new(k, Some(k + 1)); + blockstore.meta_cf.put(k, &meta).unwrap(); + let result = blockstore + .meta_cf + .get(k) + .unwrap() + .expect("Expected meta object to exist"); + assert_eq!(result, meta); + } + let mut keys: Vec = vec![0; TEST_PUT_ENTRY_COUNT]; + for (i, key) in keys.iter_mut().enumerate().take(TEST_PUT_ENTRY_COUNT) { + *key = u64::try_from(i).unwrap(); + } + let values = blockstore.meta_cf.multi_get(keys); + for (i, value) in values.iter().enumerate().take(TEST_PUT_ENTRY_COUNT) { + let k = u64::try_from(i).unwrap(); + assert_eq!( + value.as_ref().unwrap().as_ref().unwrap(), + &SlotMeta::new(k, Some(k + 1)) + ); + } + } + #[test] fn test_read_shred_bytes() { let slot = 0; diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 32d6d7752a..f7003112c7 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -5,7 +5,8 @@ use { blockstore_metrics::{ maybe_enable_rocksdb_perf, report_rocksdb_read_perf, report_rocksdb_write_perf, BlockstoreRocksDbColumnFamilyMetrics, PerfSamplingStatus, PERF_METRIC_OP_NAME_GET, - PERF_METRIC_OP_NAME_PUT, PERF_METRIC_OP_NAME_WRITE_BATCH, + PERF_METRIC_OP_NAME_MULTI_GET, PERF_METRIC_OP_NAME_PUT, + PERF_METRIC_OP_NAME_WRITE_BATCH, }, blockstore_options::{ AccessType, BlockstoreOptions, LedgerColumnOptions, ShredStorageType, @@ -20,7 +21,7 @@ use { compaction_filter::CompactionFilter, compaction_filter_factory::{CompactionFilterContext, CompactionFilterFactory}, properties as RocksProperties, ColumnFamily, ColumnFamilyDescriptor, CompactionDecision, - DBCompactionStyle, DBIterator, DBRawIterator, FifoCompactOptions, + DBCompactionStyle, DBIterator, DBPinnableSlice, DBRawIterator, FifoCompactOptions, IteratorMode as RocksIteratorMode, LiveFile, Options, WriteBatch as RWriteBatch, DB, }, serde::{de::DeserializeOwned, Serialize}, @@ -450,6 +451,23 @@ impl Rocks { Ok(()) } + fn multi_get_cf( + &self, + cf: &ColumnFamily, + keys: Vec<&[u8]>, + ) -> Vec>> { + let values = self + .db + .batched_multi_get_cf(cf, keys, false) + .into_iter() + .map(|result| match result { + Ok(opt) => Ok(opt), + Err(e) => Err(BlockstoreError::RocksDb(e)), + }) + .collect::>(); + values + } + fn delete_cf(&self, cf: &ColumnFamily, key: &[u8]) -> Result<()> { self.db.delete_cf(cf, key)?; Ok(()) @@ -1289,6 +1307,40 @@ impl LedgerColumn where C: TypedColumn + ColumnName, { + pub fn multi_get(&self, keys: Vec) -> Vec>> { + let rocks_keys: Vec<_> = keys.into_iter().map(|key| C::key(key)).collect(); + { + let ref_rocks_keys: Vec<_> = rocks_keys.iter().map(|k| &k[..]).collect(); + let is_perf_enabled = maybe_enable_rocksdb_perf( + self.column_options.rocks_perf_sample_interval, + &self.read_perf_status, + ); + let result = self + .backend + .multi_get_cf(self.handle(), ref_rocks_keys) + .into_iter() + .map(|r| match r { + Ok(opt) => match opt { + Some(pinnable_slice) => Ok(Some(deserialize(pinnable_slice.as_ref())?)), + None => Ok(None), + }, + Err(e) => Err(e), + }) + .collect::>>>(); + if let Some(op_start_instant) = is_perf_enabled { + // use multi-get instead + report_rocksdb_read_perf( + C::NAME, + PERF_METRIC_OP_NAME_MULTI_GET, + &op_start_instant.elapsed(), + &self.column_options, + ); + } + + result + } + } + pub fn get(&self, key: C::Index) -> Result> { let mut result = Ok(None); let is_perf_enabled = maybe_enable_rocksdb_perf( diff --git a/ledger/src/blockstore_metrics.rs b/ledger/src/blockstore_metrics.rs index be291ee416..4a6f48c95d 100644 --- a/ledger/src/blockstore_metrics.rs +++ b/ledger/src/blockstore_metrics.rs @@ -310,7 +310,7 @@ thread_local! {static PER_THREAD_ROCKS_PERF_CONTEXT: RefCell = RefC // The minimum time duration between two RocksDB perf samples of the same operation. const PERF_SAMPLING_MIN_DURATION: Duration = Duration::from_secs(1); pub(crate) const PERF_METRIC_OP_NAME_GET: &str = "get"; - +pub(crate) const PERF_METRIC_OP_NAME_MULTI_GET: &str = "multi_get"; pub(crate) const PERF_METRIC_OP_NAME_PUT: &str = "put"; pub(crate) const PERF_METRIC_OP_NAME_WRITE_BATCH: &str = "write_batch";