From 034c5d042212167a5ae3596f4b996f635d69f111 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 20 Dec 2018 11:16:07 -0800 Subject: [PATCH] db_ledger now fully encapsulates rocksdb --- benches/db_ledger.rs | 13 +-- src/broadcast_service.rs | 2 +- src/cluster_info.rs | 6 +- src/db_ledger.rs | 213 ++++++++++++++++++++------------------- src/db_window.rs | 40 +++----- src/erasure.rs | 14 +-- src/window_service.rs | 4 +- 7 files changed, 137 insertions(+), 155 deletions(-) diff --git a/benches/db_ledger.rs b/benches/db_ledger.rs index 085de81361..728d0f05e3 100644 --- a/benches/db_ledger.rs +++ b/benches/db_ledger.rs @@ -23,7 +23,7 @@ fn bench_write_blobs(bench: &mut Bencher, blobs: &mut [&mut Blob], ledger_path: let size = blob.size().unwrap(); db_ledger .data_cf - .put(&db_ledger.db, &key, &blob.data[..BLOB_HEADER_SIZE + size]) + .put(&key, &blob.data[..BLOB_HEADER_SIZE + size]) .unwrap(); blob.set_index(index + num_blobs as u64).unwrap(); } @@ -99,10 +99,9 @@ fn bench_read_sequential(bench: &mut Bencher) { // Generate random starting point in the range [0, total_blobs - 1], read num_reads blobs sequentially let start_index = rng.gen_range(0, num_small_blobs + num_large_blobs); for i in start_index..start_index + num_reads { - let _ = - db_ledger - .data_cf - .get_by_slot_index(&db_ledger.db, slot, i as u64 % total_blobs); + let _ = db_ledger + .data_cf + .get_by_slot_index(slot, i as u64 % total_blobs); } }); @@ -133,9 +132,7 @@ fn bench_read_random(bench: &mut Bencher) { .collect(); bench.iter(move || { for i in indexes.iter() { - let _ = db_ledger - .data_cf - .get_by_slot_index(&db_ledger.db, slot, *i as u64); + let _ = db_ledger.data_cf.get_by_slot_index(slot, *i as u64); } }); diff --git a/src/broadcast_service.rs b/src/broadcast_service.rs index 1fe27e0984..4272e18954 100644 --- a/src/broadcast_service.rs +++ b/src/broadcast_service.rs @@ -472,7 +472,7 @@ mod test { .expect("Leader should exist"); let result = db_ledger .data_cf - .get_by_slot_index(&db_ledger.db, slot, entry_height + i) + .get_by_slot_index(slot, entry_height + i) .unwrap(); assert!(result.is_some()); diff --git a/src/cluster_info.rs b/src/cluster_info.rs index f5449b1504..a93c89909f 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -678,15 +678,13 @@ impl ClusterInfo { ix: u64, ) -> Vec { if let Some(db_ledger) = db_ledger { - let meta = db_ledger - .meta_cf - .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)); + let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)); if let Ok(Some(meta)) = meta { let max_slot = meta.received_slot; // Try to find the requested index in one of the slots for i in 0..=max_slot { - let get_result = db_ledger.data_cf.get_by_slot_index(&db_ledger.db, i, ix); + let get_result = db_ledger.data_cf.get_by_slot_index(i, ix); if let Ok(Some(blob_data)) = get_result { inc_new_counter_info!("cluster_info-window-request-ledger", 1); diff --git a/src/db_ledger.rs b/src/db_ledger.rs index 53001657dd..6ec9e50c93 100644 --- a/src/db_ledger.rs +++ b/src/db_ledger.rs @@ -7,7 +7,7 @@ use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::result::{Error, Result}; use bincode::{deserialize, serialize}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, Options, WriteBatch, DB}; +use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, Options, WriteBatch, DB}; use serde::de::DeserializeOwned; use serde::Serialize; use solana_sdk::signature::{Keypair, KeypairUtil}; @@ -15,9 +15,9 @@ use std::borrow::Borrow; use std::cmp::max; use std::io; use std::path::Path; +use std::sync::Arc; -// Re-export rocksdb::DBRawIterator until it can be encapsulated -pub use rocksdb::DBRawIterator; +pub type DbLedgerRawIterator = rocksdb::DBRawIterator; pub const DB_LEDGER_DIRECTORY: &str = "rocksdb"; // A good value for this is the number of cores on the machine @@ -40,8 +40,9 @@ impl std::convert::From for Error { pub trait LedgerColumnFamily { type ValueType: DeserializeOwned + Serialize; - fn get(&self, db: &DB, key: &[u8]) -> Result> { - let data_bytes = db.get_cf(self.handle(db), key)?; + fn get(&self, key: &[u8]) -> Result> { + let db = self.db(); + let data_bytes = db.get_cf(self.handle(), key)?; if let Some(raw) = data_bytes { let result: Self::ValueType = deserialize(&raw)?; @@ -51,47 +52,62 @@ pub trait LedgerColumnFamily { } } - fn get_bytes(&self, db: &DB, key: &[u8]) -> Result>> { - let data_bytes = db.get_cf(self.handle(db), key)?; + fn get_bytes(&self, key: &[u8]) -> Result>> { + let db = self.db(); + let data_bytes = db.get_cf(self.handle(), key)?; Ok(data_bytes.map(|x| x.to_vec())) } - fn put_bytes(&self, db: &DB, key: &[u8], serialized_value: &[u8]) -> Result<()> { - db.put_cf(self.handle(db), &key, &serialized_value)?; + fn put_bytes(&self, key: &[u8], serialized_value: &[u8]) -> Result<()> { + let db = self.db(); + db.put_cf(self.handle(), &key, &serialized_value)?; Ok(()) } - fn put(&self, db: &DB, key: &[u8], value: &Self::ValueType) -> Result<()> { + fn put(&self, key: &[u8], value: &Self::ValueType) -> Result<()> { + let db = self.db(); let serialized = serialize(value)?; - db.put_cf(self.handle(db), &key, &serialized)?; + db.put_cf(self.handle(), &key, &serialized)?; Ok(()) } - fn delete(&self, db: &DB, key: &[u8]) -> Result<()> { - db.delete_cf(self.handle(db), &key)?; + fn delete(&self, key: &[u8]) -> Result<()> { + let db = self.db(); + db.delete_cf(self.handle(), &key)?; Ok(()) } - fn handle(&self, db: &DB) -> ColumnFamily; + fn db(&self) -> &Arc; + fn handle(&self) -> ColumnFamily; } pub trait LedgerColumnFamilyRaw { - fn get(&self, db: &DB, key: &[u8]) -> Result>> { - let data_bytes = db.get_cf(self.handle(db), key)?; + fn get(&self, key: &[u8]) -> Result>> { + let db = self.db(); + let data_bytes = db.get_cf(self.handle(), key)?; Ok(data_bytes.map(|x| x.to_vec())) } - fn put(&self, db: &DB, key: &[u8], serialized_value: &[u8]) -> Result<()> { - db.put_cf(self.handle(db), &key, &serialized_value)?; + fn put(&self, key: &[u8], serialized_value: &[u8]) -> Result<()> { + let db = self.db(); + db.put_cf(self.handle(), &key, &serialized_value)?; Ok(()) } - fn delete(&self, db: &DB, key: &[u8]) -> Result<()> { - db.delete_cf(self.handle(db), &key)?; + fn delete(&self, key: &[u8]) -> Result<()> { + let db = self.db(); + db.delete_cf(self.handle(), &key)?; Ok(()) } - fn handle(&self, db: &DB) -> ColumnFamily; + fn raw_iterator(&self) -> DbLedgerRawIterator { + let db = self.db(); + db.raw_iterator_cf(self.handle()) + .expect("Expected to be able to open database iterator") + } + + fn handle(&self) -> ColumnFamily; + fn db(&self) -> &Arc; } #[derive(Debug, Default, Deserialize, Serialize, Eq, PartialEq)] @@ -119,10 +135,15 @@ impl SlotMeta { } } -#[derive(Default)] -pub struct MetaCf {} +pub struct MetaCf { + db: Arc, +} impl MetaCf { + pub fn new(db: Arc) -> Self { + MetaCf { db } + } + pub fn key(slot_height: u64) -> Vec { let mut key = vec![0u8; 8]; BigEndian::write_u64(&mut key[0..8], slot_height); @@ -133,35 +154,38 @@ impl MetaCf { impl LedgerColumnFamily for MetaCf { type ValueType = SlotMeta; - fn handle(&self, db: &DB) -> ColumnFamily { - db.cf_handle(META_CF).unwrap() + fn db(&self) -> &Arc { + &self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(META_CF).unwrap() } } // The data column family -#[derive(Default)] -pub struct DataCf {} +pub struct DataCf { + db: Arc, +} impl DataCf { - pub fn get_by_slot_index( - &self, - db: &DB, - slot_height: u64, - index: u64, - ) -> Result>> { + pub fn new(db: Arc) -> Self { + DataCf { db } + } + + pub fn get_by_slot_index(&self, slot_height: u64, index: u64) -> Result>> { let key = Self::key(slot_height, index); - self.get(db, &key) + self.get(&key) } pub fn put_by_slot_index( &self, - db: &DB, slot_height: u64, index: u64, serialized_value: &[u8], ) -> Result<()> { let key = Self::key(slot_height, index); - self.put(db, &key, serialized_value) + self.put(&key, serialized_value) } pub fn key(slot_height: u64, index: u64) -> Vec { @@ -185,40 +209,42 @@ impl DataCf { } impl LedgerColumnFamilyRaw for DataCf { - fn handle(&self, db: &DB) -> ColumnFamily { - db.cf_handle(DATA_CF).unwrap() + fn db(&self) -> &Arc { + &self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(DATA_CF).unwrap() } } // The erasure column family -#[derive(Default)] -pub struct ErasureCf {} +pub struct ErasureCf { + db: Arc, +} impl ErasureCf { - pub fn delete_by_slot_index(&self, db: &DB, slot_height: u64, index: u64) -> Result<()> { + pub fn new(db: Arc) -> Self { + ErasureCf { db } + } + pub fn delete_by_slot_index(&self, slot_height: u64, index: u64) -> Result<()> { let key = Self::key(slot_height, index); - self.delete(db, &key) + self.delete(&key) } - pub fn get_by_slot_index( - &self, - db: &DB, - slot_height: u64, - index: u64, - ) -> Result>> { + pub fn get_by_slot_index(&self, slot_height: u64, index: u64) -> Result>> { let key = Self::key(slot_height, index); - self.get(db, &key) + self.get(&key) } pub fn put_by_slot_index( &self, - db: &DB, slot_height: u64, index: u64, serialized_value: &[u8], ) -> Result<()> { let key = Self::key(slot_height, index); - self.put(db, &key, serialized_value) + self.put(&key, serialized_value) } pub fn key(slot_height: u64, index: u64) -> Vec { @@ -235,15 +261,19 @@ impl ErasureCf { } impl LedgerColumnFamilyRaw for ErasureCf { - fn handle(&self, db: &DB) -> ColumnFamily { - db.cf_handle(ERASURE_CF).unwrap() + fn db(&self) -> &Arc { + &self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(ERASURE_CF).unwrap() } } // ledger window pub struct DbLedger { // Underlying database is automatically closed in the Drop implementation of DB - pub db: DB, + db: Arc, pub meta_cf: MetaCf, pub data_cf: DataCf, pub erasure_cf: ErasureCf, @@ -279,16 +309,16 @@ impl DbLedger { ]; // Open the database - let db = DB::open_cf_descriptors(&db_options, ledger_path, cfs)?; + let db = Arc::new(DB::open_cf_descriptors(&db_options, ledger_path, cfs)?); // Create the metadata column family - let meta_cf = MetaCf::default(); + let meta_cf = MetaCf::new(db.clone()); // Create the data column family - let data_cf = DataCf::default(); + let data_cf = DataCf::new(db.clone()); // Create the erasure column family - let erasure_cf = ErasureCf::default(); + let erasure_cf = ErasureCf::new(db.clone()); Ok(DbLedger { db, @@ -372,7 +402,7 @@ impl DbLedger { let mut should_write_meta = false; let mut meta = { - if let Some(meta) = self.db.get_cf(self.meta_cf.handle(&self.db), &meta_key)? { + if let Some(meta) = self.db.get_cf(self.meta_cf.handle(), &meta_key)? { deserialize(&meta)? } else { should_write_meta = true; @@ -444,11 +474,11 @@ impl DbLedger { } else { let key = DataCf::key(current_slot, current_index); let blob_data = { - if let Some(blob_data) = self.data_cf.get(&self.db, &key)? { + if let Some(blob_data) = self.data_cf.get(&key)? { blob_data } else if meta.consumed < meta.received { let key = DataCf::key(current_slot + 1, current_index); - if let Some(blob_data) = self.data_cf.get(&self.db, &key)? { + if let Some(blob_data) = self.data_cf.get(&key)? { current_slot += 1; blob_data } else { @@ -473,14 +503,14 @@ impl DbLedger { // Commit Step: Atomic write both the metadata and the data let mut batch = WriteBatch::default(); if should_write_meta { - batch.put_cf(self.meta_cf.handle(&self.db), &meta_key, &serialize(&meta)?)?; + batch.put_cf(self.meta_cf.handle(), &meta_key, &serialize(&meta)?)?; } for blob in new_blobs { let blob = blob.borrow(); let key = DataCf::key(blob.slot()?, blob.index()?); let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?]; - batch.put_cf(self.data_cf.handle(&self.db), &key, serialized_blob_datas)?; + batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?; } self.db.write(batch)?; @@ -494,7 +524,7 @@ impl DbLedger { let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); let mut meta = { - if let Some(meta) = self.meta_cf.get(&self.db, &meta_key)? { + if let Some(meta) = self.meta_cf.get(&meta_key)? { let first = blobs[0].read().unwrap(); assert_eq!(meta.consumed, first.index()?); meta @@ -512,12 +542,12 @@ impl DbLedger { } let mut batch = WriteBatch::default(); - batch.put_cf(self.meta_cf.handle(&self.db), &meta_key, &serialize(&meta)?)?; + batch.put_cf(self.meta_cf.handle(), &meta_key, &serialize(&meta)?)?; for blob in blobs { let blob = blob.read().unwrap(); let key = DataCf::key(blob.slot()?, blob.index()?); let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()?]; - batch.put_cf(self.data_cf.handle(&self.db), &key, serialized_blob_datas)?; + batch.put_cf(self.data_cf.handle(), &key, serialized_blob_datas)?; } self.db.write(batch)?; Ok(()) @@ -535,7 +565,7 @@ impl DbLedger { slot_height: u64, ) -> Result<(u64, u64)> { let start_key = DataCf::key(slot_height, start_index); - let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle(&self.db))?; + let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?; db_iterator.seek(&start_key); let mut total_blobs = 0; let mut total_current_size = 0; @@ -588,7 +618,7 @@ impl DbLedger { /// Return an iterator for all the entries in the given file. pub fn read_ledger(&self) -> Result> { - let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle(&self.db))?; + let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?; db_iterator.seek_to_first(); Ok(EntryIterator { db_iterator }) @@ -696,10 +726,10 @@ mod tests { // Test meta column family let meta = SlotMeta::new(); let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); - ledger.meta_cf.put(&ledger.db, &meta_key, &meta).unwrap(); + ledger.meta_cf.put(&meta_key, &meta).unwrap(); let result = ledger .meta_cf - .get(&ledger.db, &meta_key) + .get(&meta_key) .unwrap() .expect("Expected meta object to exist"); @@ -708,14 +738,11 @@ mod tests { // Test erasure column family let erasure = vec![1u8; 16]; let erasure_key = ErasureCf::key(DEFAULT_SLOT_HEIGHT, 0); - ledger - .erasure_cf - .put(&ledger.db, &erasure_key, &erasure) - .unwrap(); + ledger.erasure_cf.put(&erasure_key, &erasure).unwrap(); let result = ledger .erasure_cf - .get(&ledger.db, &erasure_key) + .get(&erasure_key) .unwrap() .expect("Expected erasure object to exist"); @@ -724,11 +751,11 @@ mod tests { // Test data column family let data = vec![2u8; 16]; let data_key = DataCf::key(DEFAULT_SLOT_HEIGHT, 0); - ledger.data_cf.put(&ledger.db, &data_key, &data).unwrap(); + ledger.data_cf.put(&data_key, &data).unwrap(); let result = ledger .data_cf - .get(&ledger.db, &data_key) + .get(&data_key) .unwrap() .expect("Expected data object to exist"); @@ -829,7 +856,7 @@ mod tests { assert!(result.len() == 0); let meta = ledger .meta_cf - .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) + .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)) .unwrap() .expect("Expected new metadata object to be created"); assert!(meta.consumed == 0 && meta.received == 2); @@ -841,7 +868,7 @@ mod tests { let meta = ledger .meta_cf - .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) + .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)) .unwrap() .expect("Expected new metadata object to exist"); assert!(meta.consumed == 2 && meta.received == 2); @@ -871,7 +898,7 @@ mod tests { let meta = ledger .meta_cf - .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) + .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)) .unwrap() .expect("Expected metadata object to exist"); if i != 0 { @@ -913,7 +940,7 @@ mod tests { let result = ledger.insert_data_blobs(vec![blobs[i]]).unwrap(); let meta = ledger .meta_cf - .get(&ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)) + .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)) .unwrap() .expect("Expected metadata object to exist"); if i != 0 { @@ -933,7 +960,6 @@ mod tests { #[test] pub fn test_iteration_order() { let slot = 0; - // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_iteration_order"); { let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); @@ -956,7 +982,7 @@ mod tests { ); let mut db_iterator = db_ledger .db - .raw_iterator_cf(db_ledger.data_cf.handle(&db_ledger.db)) + .raw_iterator_cf(db_ledger.data_cf.handle()) .expect("Expected to be able to open database iterator"); db_iterator.seek(&DataCf::key(slot, 1)); @@ -976,7 +1002,6 @@ mod tests { #[test] pub fn test_insert_data_blobs_bulk() { - // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_insert_data_blobs_bulk"); { let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); @@ -1006,11 +1031,7 @@ mod tests { ); let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); - let meta = db_ledger - .meta_cf - .get(&db_ledger.db, &meta_key) - .unwrap() - .unwrap(); + let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap(); assert_eq!(meta.consumed, num_entries); assert_eq!(meta.received, num_entries); assert_eq!(meta.consumed_slot, num_entries - 1); @@ -1082,7 +1103,6 @@ mod tests { #[test] pub fn test_write_consecutive_blobs() { - // Create RocksDb ledger let db_ledger_path = get_tmp_ledger_path("test_write_consecutive_blobs"); { let db_ledger = DbLedger::open(&db_ledger_path).unwrap(); @@ -1102,11 +1122,7 @@ mod tests { .expect("Expect successful blob writes"); let meta_key = MetaCf::key(DEFAULT_SLOT_HEIGHT); - let meta = db_ledger - .meta_cf - .get(&db_ledger.db, &meta_key) - .unwrap() - .unwrap(); + let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap(); assert_eq!(meta.consumed, num_entries); assert_eq!(meta.received, num_entries); assert_eq!(meta.consumed_slot, num_entries - 1); @@ -1122,11 +1138,7 @@ mod tests { .write_consecutive_blobs(&shared_blobs) .expect("Expect successful blob writes"); - let meta = db_ledger - .meta_cf - .get(&db_ledger.db, &meta_key) - .unwrap() - .unwrap(); + let meta = db_ledger.meta_cf.get(&meta_key).unwrap().unwrap(); assert_eq!(meta.consumed, 2 * num_entries); assert_eq!(meta.received, 2 * num_entries); assert_eq!(meta.consumed_slot, 2 * num_entries - 1); @@ -1137,7 +1149,6 @@ mod tests { #[test] pub fn test_genesis_and_entry_iterator() { - // Create RocksDb ledger let entries = make_tiny_test_entries(100); let ledger_path = get_tmp_ledger_path("test_genesis_and_entry_iterator"); { diff --git a/src/db_window.rs b/src/db_window.rs index 4c026ad0f1..4c2e7e784d 100644 --- a/src/db_window.rs +++ b/src/db_window.rs @@ -31,9 +31,7 @@ pub fn repair( ) -> Result)>> { let rcluster_info = cluster_info.read().unwrap(); let mut is_next_leader = false; - let meta = db_ledger - .meta_cf - .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?; + let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))?; if meta.is_none() { return Ok(vec![]); } @@ -121,7 +119,7 @@ pub fn repair( // Given a start and end entry index, find all the missing // indexes in the ledger in the range [start_index, end_index) pub fn find_missing_indexes( - db_iterator: &mut DBRawIterator, + db_iterator: &mut DbLedgerRawIterator, slot: u64, start_index: u64, end_index: u64, @@ -178,10 +176,7 @@ pub fn find_missing_data_indexes( end_index: u64, max_missing: usize, ) -> Vec { - let mut db_iterator = db_ledger - .db - .raw_iterator_cf(db_ledger.data_cf.handle(&db_ledger.db)) - .expect("Expected to be able to open database iterator"); + let mut db_iterator = db_ledger.data_cf.raw_iterator(); find_missing_indexes( &mut db_iterator, @@ -201,10 +196,7 @@ pub fn find_missing_coding_indexes( end_index: u64, max_missing: usize, ) -> Vec { - let mut db_iterator = db_ledger - .db - .raw_iterator_cf(db_ledger.erasure_cf.handle(&db_ledger.db)) - .expect("Expected to be able to open database iterator"); + let mut db_iterator = db_ledger.erasure_cf.raw_iterator(); find_missing_indexes( &mut db_iterator, @@ -302,11 +294,9 @@ pub fn process_blob( let erasure_key = ErasureCf::key(slot, pix); let rblob = &blob.read().unwrap(); let size = rblob.size()?; - db_ledger.erasure_cf.put( - &db_ledger.db, - &erasure_key, - &rblob.data[..BLOB_HEADER_SIZE + size], - )?; + db_ledger + .erasure_cf + .put(&erasure_key, &rblob.data[..BLOB_HEADER_SIZE + size])?; vec![] } else { db_ledger.insert_data_blobs(vec![&*blob.read().unwrap()])? @@ -335,7 +325,7 @@ pub fn process_blob( if max_ix != 0 && !consumed_entries.is_empty() { let meta = db_ledger .meta_cf - .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))? + .get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))? .expect("Expect metadata to exist if consumed entries is nonzero"); let consumed = meta.consumed; @@ -376,9 +366,7 @@ pub fn calculate_max_repair_entry_height( #[cfg(feature = "erasure")] fn try_erasure(db_ledger: &Arc, consume_queue: &mut Vec) -> Result<()> { - let meta = db_ledger - .meta_cf - .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT))?; + let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT))?; if let Some(meta) = meta { let (data, coding) = erasure::recover(db_ledger, meta.consumed_slot, meta.consumed)?; @@ -389,11 +377,9 @@ fn try_erasure(db_ledger: &Arc, consume_queue: &mut Vec) -> Res cl.index().expect("Recovered blob must set index"), ); let size = cl.size().expect("Recovered blob must set size"); - db_ledger.erasure_cf.put( - &db_ledger.db, - &erasure_key, - &cl.data[..BLOB_HEADER_SIZE + size], - )?; + db_ledger + .erasure_cf + .put(&erasure_key, &cl.data[..BLOB_HEADER_SIZE + size])?; } let entries = db_ledger.write_shared_blobs(data)?; @@ -741,7 +727,7 @@ mod test { assert_eq!( &db_ledger .erasure_cf - .get_by_slot_index(&db_ledger.db, slot_height, erase_offset as u64) + .get_by_slot_index(slot_height, erase_offset as u64) .unwrap() .unwrap()[BLOB_HEADER_SIZE..], &erased_coding_l.data()[..erased_coding_l.size().unwrap() as usize], diff --git a/src/erasure.rs b/src/erasure.rs index 4fb17d9aec..f493df02c4 100644 --- a/src/erasure.rs +++ b/src/erasure.rs @@ -410,9 +410,7 @@ pub fn recover( // Add the data blobs we have into the recovery vector, mark the missing ones for i in block_start_idx..block_end_idx { - let result = db_ledger - .data_cf - .get_by_slot_index(&db_ledger.db, slot, i)?; + let result = db_ledger.data_cf.get_by_slot_index(slot, i)?; categorize_blob( &result, @@ -425,9 +423,7 @@ pub fn recover( // Add the coding blobs we have into the recovery vector, mark the missing ones for i in coding_start_idx..block_end_idx { - let result = db_ledger - .erasure_cf - .get_by_slot_index(&db_ledger.db, slot, i)?; + let result = db_ledger.erasure_cf.get_by_slot_index(slot, i)?; categorize_blob( &result, @@ -520,9 +516,7 @@ pub fn recover( // Remove the corrupted coding blobs so there's no effort wasted in trying to reconstruct // the blobs again for i in coding_start_idx..block_end_idx { - db_ledger - .erasure_cf - .delete_by_slot_index(&db_ledger.db, slot, i)?; + db_ledger.erasure_cf.delete_by_slot_index(slot, i)?; } return Ok((vec![], vec![])); } @@ -638,7 +632,6 @@ pub mod test { db_ledger .data_cf .put_by_slot_index( - &db_ledger.db, data_l.slot().unwrap(), data_l.index().unwrap(), &data_l.data[..data_l.data_size().unwrap() as usize], @@ -665,7 +658,6 @@ pub mod test { db_ledger .erasure_cf .put_by_slot_index( - &db_ledger.db, coding_lock.slot().unwrap(), index, &coding_lock.data[..data_size as usize + BLOB_HEADER_SIZE], diff --git a/src/window_service.rs b/src/window_service.rs index 828e920285..10cf4287a0 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -164,9 +164,7 @@ pub fn window_service( } } - let meta = db_ledger - .meta_cf - .get(&db_ledger.db, &MetaCf::key(DEFAULT_SLOT_HEIGHT)); + let meta = db_ledger.meta_cf.get(&MetaCf::key(DEFAULT_SLOT_HEIGHT)); if let Ok(Some(meta)) = meta { let received = meta.received;