From ed48d8323c35c85afc3f3f990cdfa5e795c94190 Mon Sep 17 00:00:00 2001
From: "Mark E. Sinclair" <48664490+mark-solana@users.noreply.github.com>
Date: Fri, 3 May 2019 16:46:02 -0500
Subject: [PATCH] Reduce locking in Blocktree (#4075)

* Reduce lock contention in blocktree

* Store root slot in separate column
---
 core/src/blocktree.rs       | 187 +++++++++++++++---------------------
 core/src/blocktree/db.rs    | 145 +++++++++++++++++-----------
 core/src/blocktree/kvs.rs   |  30 ++++++
 core/src/blocktree/meta.rs  |   3 -
 core/src/blocktree/rocks.rs |  22 ++++-
 core/src/repair_service.rs  |   2 +-
 6 files changed, 217 insertions(+), 172 deletions(-)

diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs
index fe0bc4e28c..dbd6a53f4d 100644
--- a/core/src/blocktree.rs
+++ b/core/src/blocktree.rs
@@ -49,6 +49,7 @@ macro_rules! db_imports {
         pub type Cursor<C> = db::Cursor<$db, C>;
         pub type LedgerColumn<C> = db::LedgerColumn<$db, C>;
         pub type WriteBatch = db::WriteBatch<$db>;
+        type BatchProcessor = db::BatchProcessor<$db>;
 
         pub trait Column: db::Column<$db> {}
         impl<C: db::Column<$db>> Column for C {}
@@ -73,15 +74,15 @@ pub enum BlocktreeError {
 // ledger window
 pub struct Blocktree {
-    db: Arc<RwLock<Database>>,
+    db: Arc<Database>,
     meta_cf: LedgerColumn<cf::SlotMeta>,
     data_cf: LedgerColumn<cf::Data>,
     erasure_cf: LedgerColumn<cf::Coding>,
     erasure_meta_cf: LedgerColumn<cf::ErasureMeta>,
     orphans_cf: LedgerColumn<cf::Orphans>,
+    batch_processor: Arc<RwLock<BatchProcessor>>,
     session: Arc<erasure::Session>,
     pub new_blobs_signals: Vec<SyncSender<bool>>,
-    pub root_slot: RwLock<u64>,
 }
 
 // Column family for metadata about a leader slot
@@ -93,6 +94,8 @@ pub const ERASURE_CF: &str = "erasure";
 pub const ERASURE_META_CF: &str = "erasure_meta";
 // Column family for orphans data
 pub const ORPHANS_CF: &str = "orphans";
+// Column family for root data
+pub const ROOT_CF: &str = "root";
 
 impl Blocktree {
     /// Opens a Ledger in directory, provides "infinite" window of blobs
@@ -105,6 +108,8 @@ impl Blocktree {
         // Open the database
         let db = Database::open(&ledger_path)?;
 
+        let batch_processor = Arc::new(RwLock::new(db.batch_processor()));
+
         // Create the metadata column family
         let meta_cf = db.column();
 
@@ -124,7 +129,7 @@ impl Blocktree {
         // setup erasure
         let session = Arc::new(erasure::Session::default());
 
-        let db = Arc::new(RwLock::new(db));
+        let db = Arc::new(db);
 
         Ok(Blocktree {
             db,
@@ -135,7 +140,7 @@ impl Blocktree {
             orphans_cf,
             session,
             new_blobs_signals: vec![],
-            root_slot: RwLock::new(0),
+            batch_processor,
         })
     }
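The heart of the change: readers now share the database through a plain `Arc`, and only the batched write path serializes through an `RwLock`. A minimal standalone model of that layout, using simplified stand-in types rather than the patch's real `Database`/`BatchProcessor`:

```rust
use std::sync::{Arc, RwLock};

// Stand-in for the RocksDB-backed Database.
struct Db;
impl Db {
    // Concurrent point reads need no lock at all.
    fn get(&self, _key: u64) -> Option<Vec<u8>> {
        None
    }
}

// Owns the write path; one of these is shared behind an RwLock.
struct BatchProcessor {
    _backend: Arc<Db>,
}

struct Blocktree {
    db: Arc<Db>,                                  // shared, lock-free reads
    batch_processor: Arc<RwLock<BatchProcessor>>, // writers serialize here
}

impl Blocktree {
    fn read(&self, key: u64) -> Option<Vec<u8>> {
        // No more .read().unwrap() around the whole database.
        self.db.get(key)
    }

    fn write(&self) {
        // Only the write side contends on the lock.
        let _processor = self.batch_processor.write().unwrap();
        // ...build a WriteBatch and commit it while holding the guard...
    }
}
```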
@@ -155,21 +160,19 @@ impl Blocktree {
     }
 
     pub fn meta(&self, slot: u64) -> Result<Option<SlotMeta>> {
-        self.meta_cf.get(&*self.db.read().unwrap(), slot)
+        self.meta_cf.get(slot)
     }
 
     pub fn erasure_meta(&self, slot: u64, set_index: u64) -> Result<Option<ErasureMeta>> {
-        self.erasure_meta_cf
-            .get(&*self.db.read().unwrap(), (slot, set_index))
+        self.erasure_meta_cf.get((slot, set_index))
     }
 
     pub fn orphan(&self, slot: u64) -> Result<Option<bool>> {
-        self.orphans_cf.get(&*self.db.read().unwrap(), slot)
+        self.orphans_cf.get(slot)
     }
 
     pub fn get_next_slot(&self, slot: u64) -> Result<Option<u64>> {
-        let db = self.db.read().unwrap();
-        let mut db_iterator = db.cursor::<cf::SlotMeta>()?;
+        let mut db_iterator = self.db.cursor::<cf::SlotMeta>()?;
 
         db_iterator.seek(slot + 1);
         if !db_iterator.valid() {
@@ -264,8 +267,9 @@ impl Blocktree {
         I: IntoIterator,
         I::Item: Borrow<Blob>,
     {
-        let mut db = self.db.write().unwrap();
-        let mut write_batch = db.batch()?;
+        let db = &*self.db;
+        let mut batch_processor = self.batch_processor.write().unwrap();
+        let mut write_batch = batch_processor.batch()?;
 
         let new_blobs: Vec<_> = new_blobs.into_iter().collect();
         let mut recovered_data = vec![];
@@ -285,7 +289,7 @@ impl Blocktree {
                 .entry((blob_slot, set_index))
                 .or_insert_with(|| {
                     self.erasure_meta_cf
-                        .get(&db, (blob_slot, set_index))
+                        .get((blob_slot, set_index))
                         .expect("Expect database get to succeed")
                         .unwrap_or_else(|| ErasureMeta::new(set_index))
                 });
@@ -358,7 +362,7 @@ impl Blocktree {
             }
         }
 
-        db.write(write_batch)?;
+        batch_processor.write(write_batch)?;
 
         Ok(())
     }
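`insert_data_blobs` now takes the write lock only around batch construction and commit, while reads of existing metadata go straight through `self.db`. The same pattern in isolation, as a hypothetical method (the actual staging logic and error plumbing are elided):

```rust
impl Blocktree {
    // Hypothetical illustration, not part of the patch.
    fn commit_example(&self) -> Result<()> {
        // Readers on self.db keep going; only writers queue on this lock.
        let mut batch_processor = self.batch_processor.write().unwrap();

        let mut write_batch = batch_processor.batch()?;
        // ...stage puts/deletes across column families on write_batch...
        batch_processor.write(write_batch)?; // one atomic commit
        Ok(())
    }
}
```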
@@ -374,8 +378,7 @@ impl Blocktree {
         buf: &mut [u8],
         slot: u64,
     ) -> Result<(u64, u64)> {
-        let db = self.db.read().unwrap();
-        let mut db_iterator = db.cursor::<cf::Data>()?;
+        let mut db_iterator = self.db.cursor::<cf::Data>()?;
 
         db_iterator.seek((slot, start_index));
         let mut total_blobs = 0;
@@ -427,69 +430,65 @@ impl Blocktree {
     }
 
     pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
-        let db = self.db.read().unwrap();
-        self.erasure_cf.get_bytes(&db, (slot, index))
+        self.erasure_cf.get_bytes((slot, index))
     }
 
     pub fn delete_coding_blob(&self, slot: u64, index: u64) -> Result<()> {
         let set_index = ErasureMeta::set_index_for(index);
-        let mut db = self.db.write().unwrap();
+        let mut batch_processor = self.batch_processor.write().unwrap();
 
         let mut erasure_meta = self
             .erasure_meta_cf
-            .get(&db, (slot, set_index))?
+            .get((slot, set_index))?
             .unwrap_or_else(|| ErasureMeta::new(set_index));
 
         erasure_meta.set_coding_present(index, false);
 
-        let mut batch = db.batch()?;
+        let mut batch = batch_processor.batch()?;
 
         batch.delete::<cf::Coding>((slot, index))?;
         batch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
 
-        db.write(batch)?;
+        batch_processor.write(batch)?;
         Ok(())
     }
 
     pub fn get_data_blob_bytes(&self, slot: u64, index: u64) -> Result<Option<Vec<u8>>> {
-        let db = self.db.read().unwrap();
-        self.data_cf.get_bytes(&db, (slot, index))
+        self.data_cf.get_bytes((slot, index))
     }
 
     /// For benchmarks, testing, and setup.
     /// Does no metadata tracking. Use with care.
     pub fn put_data_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
-        let mut db = self.db.write().unwrap();
-        self.data_cf.put_bytes(&mut db, (slot, index), bytes)
+        self.data_cf.put_bytes((slot, index), bytes)
     }
 
     /// For benchmarks, testing, and setup.
     /// Does no metadata tracking. Use with care.
     pub fn put_coding_blob_bytes_raw(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
-        let mut db = self.db.write().unwrap();
-        self.erasure_cf.put_bytes(&mut db, (slot, index), bytes)
+        self.erasure_cf.put_bytes((slot, index), bytes)
     }
 
     /// this function will insert coding blobs and also automatically track erasure-related
    /// metadata. If recovery is available it will be done
     pub fn put_coding_blob_bytes(&self, slot: u64, index: u64, bytes: &[u8]) -> Result<()> {
         let set_index = ErasureMeta::set_index_for(index);
-        let mut db = self.db.write().unwrap();
+        let mut batch_processor = self.batch_processor.write().unwrap();
 
         let mut erasure_meta = self
             .erasure_meta_cf
-            .get(&db, (slot, set_index))?
+            .get((slot, set_index))?
             .unwrap_or_else(|| ErasureMeta::new(set_index));
 
         erasure_meta.set_coding_present(index, true);
         erasure_meta.set_size(bytes.len() - BLOB_HEADER_SIZE);
 
-        let mut writebatch = db.batch()?;
+        let mut writebatch = batch_processor.batch()?;
 
         writebatch.put_bytes::<cf::Coding>((slot, index), bytes)?;
 
         if let Some((data, coding)) = try_erasure_recover(
-            &db,
+            &self.db,
             &self.session,
             &erasure_meta,
             slot,
@@ -501,7 +500,7 @@ impl Blocktree {
                 insert_data_blob_batch(
                     &data[..],
-                    &db,
+                    &self.db,
                     &mut HashMap::new(),
                     &mut erasure_meta_working_set,
                     &mut HashMap::new(),
@@ -522,7 +521,7 @@ impl Blocktree {
 
         writebatch.put::<cf::ErasureMeta>((slot, set_index), &erasure_meta)?;
 
-        db.write(writebatch)?;
+        batch_processor.write(writebatch)?;
 
         Ok(())
     }
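A hedged usage sketch of the coding-blob path above; `ledger_path`, `slot`, `index`, and `coding_blob_bytes` are placeholders:

```rust
// Inside a function returning this crate's Result (setup elided):
let blocktree = Blocktree::open(&ledger_path)?;
blocktree.put_coding_blob_bytes(slot, index, &coding_blob_bytes)?;
// The call takes the batch_processor write lock once: it records the blob,
// updates the set's ErasureMeta, and, if the erasure set has become
// recoverable, writes any reconstructed data blobs in the same WriteBatch.
```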
@@ -619,8 +618,7 @@ impl Blocktree {
         end_index: u64,
         max_missing: usize,
     ) -> Vec<u64> {
-        let db = self.db.read().unwrap();
-        if let Ok(mut db_iterator) = db.cursor::<cf::Data>() {
+        if let Ok(mut db_iterator) = self.db.cursor::<cf::Data>() {
             Self::find_missing_indexes(&mut db_iterator, slot, start_index, end_index, max_missing)
         } else {
             vec![]
@@ -639,9 +637,7 @@ impl Blocktree {
     }
 
     pub fn read_ledger_blobs(&self) -> impl Iterator<Item = Blob> + '_ {
-        let db = self.db.read().unwrap();
-
-        let iter = db.iter::<cf::Data>().unwrap();
+        let iter = self.db.iter::<cf::Data>().unwrap();
 
         iter.map(|(_, blob_data)| Blob::new(&blob_data))
     }
@@ -699,7 +695,7 @@ impl Blocktree {
                 None
             }
         }
-        let mut db_iterator = self.db.read().unwrap().cursor::<cf::Data>()?;
+        let mut db_iterator = self.db.cursor::<cf::Data>()?;
 
         db_iterator.seek_to_first();
         Ok(EntryIterator {
@@ -718,7 +714,7 @@ impl Blocktree {
         // Find the next consecutive block of blobs.
         let consecutive_blobs = get_slot_consecutive_blobs(
             slot,
-            &self.db.read().unwrap(),
+            &self.db,
             &HashMap::new(),
             blob_start_index,
             max_entries,
@@ -750,30 +746,28 @@ impl Blocktree {
     }
 
     pub fn is_root(&self, slot: u64) -> bool {
-        if let Ok(Some(meta)) = self.meta(slot) {
-            meta.is_root
+        if let Ok(Some(root_slot)) = self.db.get::<cf::Root>(()) {
+            root_slot == slot
         } else {
             false
         }
     }
 
     pub fn set_root(&self, slot: u64) -> Result<()> {
-        let mut root_slot = self.root_slot.write().unwrap();
-        let mut db = self.db.write().unwrap();
-
-        *root_slot = slot;
-        if let Some(mut meta) = self.meta_cf.get(&db, slot)? {
-            meta.is_root = true;
-            self.meta_cf.put(&mut db, slot, &meta)?;
-        }
+        self.db.put::<cf::Root>((), &slot)?;
 
         Ok(())
     }
 
+    pub fn get_root(&self) -> Result<u64> {
+        let root_opt = self.db.get::<cf::Root>(())?;
+
+        Ok(root_opt.unwrap_or(0))
+    }
+
     pub fn get_orphans(&self, max: Option<usize>) -> Vec<u64> {
         let mut results = vec![];
 
-        let db = self.db.read().unwrap();
-        let mut iter = db.cursor::<cf::Orphans>().unwrap();
+        let mut iter = self.db.cursor::<cf::Orphans>().unwrap();
         iter.seek_to_first();
         while iter.valid() {
             if let Some(max) = max {
@@ -794,19 +788,19 @@ impl Blocktree {
         let mut bootstrap_meta = SlotMeta::new(0, 1);
         let last = blobs.last().unwrap();
 
-        let mut db = self.db.write().unwrap();
+        let mut batch_processor = self.batch_processor.write().unwrap();
 
         bootstrap_meta.consumed = last.index() + 1;
         bootstrap_meta.received = last.index() + 1;
         bootstrap_meta.is_connected = true;
 
-        let mut batch = db.batch()?;
+        let mut batch = batch_processor.batch()?;
         batch.put::<cf::SlotMeta>(0, &bootstrap_meta)?;
         for blob in blobs {
             let serialized_blob_datas = &blob.data[..BLOB_HEADER_SIZE + blob.size()];
             batch.put_bytes::<cf::Data>((blob.slot(), blob.index()), serialized_blob_datas)?;
         }
-        db.write(batch)?;
+        batch_processor.write(batch)?;
         Ok(())
     }
 }
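The root is now a single `u64` stored under the unit key `()` in its own column family, replacing the per-slot `is_root` flag on `SlotMeta`. Illustrative behavior of the three methods above, with placeholder slot numbers:

```rust
// Inside any function with access to a Blocktree (error handling elided):
blocktree.set_root(42)?;               // overwrites one fixed key; no SlotMeta rewrite
assert_eq!(blocktree.get_root()?, 42); // get_root defaults to 0 if never set
assert!(blocktree.is_root(42));
assert!(!blocktree.is_root(41));       // only the latest root matches
```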
.expect("erasure_meta must have no false positives"), }; @@ -1631,14 +1625,13 @@ pub mod tests { fn test_put_get_simple() { let ledger_path = get_tmp_ledger_path("test_put_get_simple"); let ledger = Blocktree::open(&ledger_path).unwrap(); - let mut db = ledger.db.write().unwrap(); // Test meta column family let meta = SlotMeta::new(0, 1); - ledger.meta_cf.put(&mut db, 0, &meta).unwrap(); + ledger.meta_cf.put(0, &meta).unwrap(); let result = ledger .meta_cf - .get(&db, 0) + .get(0) .unwrap() .expect("Expected meta object to exist"); @@ -1647,14 +1640,11 @@ pub mod tests { // Test erasure column family let erasure = vec![1u8; 16]; let erasure_key = (0, 0); - ledger - .erasure_cf - .put_bytes(&mut db, erasure_key, &erasure) - .unwrap(); + ledger.erasure_cf.put_bytes(erasure_key, &erasure).unwrap(); let result = ledger .erasure_cf - .get_bytes(&db, erasure_key) + .get_bytes(erasure_key) .unwrap() .expect("Expected erasure object to exist"); @@ -1663,18 +1653,17 @@ pub mod tests { // Test data column family let data = vec![2u8; 16]; let data_key = (0, 0); - ledger.data_cf.put_bytes(&mut db, data_key, &data).unwrap(); + ledger.data_cf.put_bytes(data_key, &data).unwrap(); let result = ledger .data_cf - .get_bytes(&db, data_key) + .get_bytes(data_key) .unwrap() .expect("Expected data object to exist"); assert_eq!(result, data); // Destroying database without closing it first is undefined behavior - drop(db); drop(ledger); Blocktree::destroy(&ledger_path).expect("Expected successful database destruction"); } @@ -1853,8 +1842,6 @@ pub mod tests { let mut db_iterator = blocktree .db - .read() - .unwrap() .cursor::() .expect("Expected to be able to open database iterator"); @@ -2504,10 +2491,7 @@ pub mod tests { } // No orphan slots should exist - assert!(blocktree - .orphans_cf - .is_empty(&blocktree.db.read().unwrap()) - .unwrap()) + assert!(blocktree.orphans_cf.is_empty().unwrap()) } Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); @@ -2524,20 +2508,14 @@ pub mod tests { assert!(blocktree.get_slots_since(&vec![0]).unwrap().is_empty()); let mut meta0 = SlotMeta::new(0, 0); - blocktree - .meta_cf - .put(&mut blocktree.db.write().unwrap(), 0, &meta0) - .unwrap(); + blocktree.meta_cf.put(0, &meta0).unwrap(); // Slot exists, chains to nothing let expected: HashMap> = HashMap::from_iter(vec![(0, vec![])].into_iter()); assert_eq!(blocktree.get_slots_since(&vec![0]).unwrap(), expected); meta0.next_slots = vec![1, 2]; - blocktree - .meta_cf - .put(&mut blocktree.db.write().unwrap(), 0, &meta0) - .unwrap(); + blocktree.meta_cf.put(0, &meta0).unwrap(); // Slot exists, chains to some other slots let expected: HashMap> = @@ -2547,10 +2525,7 @@ pub mod tests { let mut meta3 = SlotMeta::new(3, 1); meta3.next_slots = vec![10, 5]; - blocktree - .meta_cf - .put(&mut blocktree.db.write().unwrap(), 3, &meta3) - .unwrap(); + blocktree.meta_cf.put(3, &meta3).unwrap(); let expected: HashMap> = HashMap::from_iter(vec![(0, vec![1, 2]), (3, vec![10, 5])].into_iter()); assert_eq!(blocktree.get_slots_since(&vec![0, 1, 3]).unwrap(), expected); @@ -2611,10 +2586,7 @@ pub mod tests { assert!(!is_orphan(&meta)); } // Orphans cf is empty - assert!(blocktree - .orphans_cf - .is_empty(&blocktree.db.read().unwrap()) - .unwrap()) + assert!(blocktree.orphans_cf.is_empty().unwrap()) } Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } @@ -2847,7 +2819,7 @@ pub mod tests { assert_eq!(slot_meta.consumed, 5); assert!(!should_insert_blob( &slot_meta, - 
&blocktree.db.read().unwrap(), + &blocktree.db, &HashMap::new(), &blobs[4].clone() )); @@ -2857,7 +2829,7 @@ pub mod tests { let slot_meta = blocktree.meta(0).unwrap().unwrap(); assert!(!should_insert_blob( &slot_meta, - &blocktree.db.read().unwrap(), + &blocktree.db, &HashMap::new(), &blobs[7].clone() )); @@ -2870,7 +2842,7 @@ pub mod tests { blobs[8].set_is_last_in_slot(); assert!(!should_insert_blob( &slot_meta, - &blocktree.db.read().unwrap(), + &blocktree.db, &HashMap::new(), &blobs[8].clone() )); @@ -2883,7 +2855,7 @@ pub mod tests { // Trying to insert a blob with index > the "is_last" blob should fail assert!(!should_insert_blob( &slot_meta, - &blocktree.db.read().unwrap(), + &blocktree.db, &HashMap::new(), &blobs[10].clone() )); @@ -3092,11 +3064,9 @@ pub mod tests { (slot, blob.index()); } - let db = blocktree.db.read().unwrap(); - let erasure_meta = blocktree .erasure_meta_cf - .get(&db, (slot, set_index as u64)) + .get((slot, set_index as u64)) .expect("Erasure Meta should be present") .unwrap(); @@ -3104,7 +3074,7 @@ pub mod tests { let retrieved_data = blocktree .data_cf - .get_bytes(&db, (slot, focused_index as u64)) + .get_bytes((slot, focused_index as u64)) .unwrap(); assert!(retrieved_data.is_some()); @@ -3151,7 +3121,7 @@ pub mod tests { // try recovery even though there aren't enough blobs let erasure_meta = blocktree .erasure_meta_cf - .get(&blocktree.db.read().unwrap(), (SLOT, SET_INDEX)) + .get((SLOT, SET_INDEX)) .unwrap() .unwrap(); @@ -3160,7 +3130,7 @@ pub mod tests { let prev_inserted_blob_datas = HashMap::new(); let attempt_result = try_erasure_recover( - &blocktree.db.read().unwrap(), + &blocktree.db, &blocktree.session, &erasure_meta, SLOT, @@ -3303,10 +3273,7 @@ pub mod tests { // triggering recovery. let erasure_meta = blocktree .erasure_meta_cf - .get( - &blocktree.db.read().unwrap(), - (slot, erasure_set.set_index), - ) + .get((slot, erasure_set.set_index)) .unwrap() .unwrap(); @@ -3340,7 +3307,7 @@ pub mod tests { let erasure_meta = blocktree .erasure_meta_cf - .get(&blocktree.db.read().unwrap(), (slot, set_index)) + .get((slot, set_index)) .expect("DB get must succeed") .expect("ErasureMeta must be present for each erasure set"); diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs index 3cd4fff9c1..38e2d24e4d 100644 --- a/core/src/blocktree/db.rs +++ b/core/src/blocktree/db.rs @@ -9,6 +9,7 @@ use std::borrow::Borrow; use std::collections::HashMap; use std::marker::PhantomData; use std::path::Path; +use std::sync::Arc; pub mod columns { #[derive(Debug)] @@ -30,6 +31,10 @@ pub mod columns { #[derive(Debug)] /// The erasure meta column pub struct ErasureMeta; + + #[derive(Debug)] + /// The root column + pub struct Root; } pub trait Backend: Sized + Send + Sync { @@ -112,7 +117,15 @@ pub struct Database where B: Backend, { - backend: B, + backend: Arc, +} + +#[derive(Debug, Clone)] +pub struct BatchProcessor +where + B: Backend, +{ + backend: Arc, } #[derive(Debug, Clone)] @@ -132,7 +145,7 @@ where B: Backend, C: Column, { - backend: PhantomData, + backend: Arc, column: PhantomData, } @@ -151,7 +164,7 @@ where B: Backend, { pub fn open(path: &Path) -> Result { - let backend = B::open(path)?; + let backend = Arc::new(B::open(path)?); Ok(Database { backend }) } @@ -170,7 +183,7 @@ where .get_cf(self.cf_handle::(), C::key(key).borrow()) } - pub fn put_bytes(&mut self, key: C::Index, data: &[u8]) -> Result<()> + pub fn put_bytes(&self, key: C::Index, data: &[u8]) -> Result<()> where C: Column, { @@ -178,7 +191,7 @@ where .put_cf(self.cf_handle::(), 
diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs
index 3cd4fff9c1..38e2d24e4d 100644
--- a/core/src/blocktree/db.rs
+++ b/core/src/blocktree/db.rs
@@ -9,6 +9,7 @@ use std::borrow::Borrow;
 use std::collections::HashMap;
 use std::marker::PhantomData;
 use std::path::Path;
+use std::sync::Arc;
 
 pub mod columns {
     #[derive(Debug)]
@@ -30,6 +31,10 @@ pub mod columns {
     #[derive(Debug)]
     /// The erasure meta column
     pub struct ErasureMeta;
+
+    #[derive(Debug)]
+    /// The root column
+    pub struct Root;
 }
 
 pub trait Backend: Sized + Send + Sync {
@@ -112,7 +117,15 @@ pub struct Database<B>
 where
     B: Backend,
 {
-    backend: B,
+    backend: Arc<B>,
+}
+
+#[derive(Debug, Clone)]
+pub struct BatchProcessor<B>
+where
+    B: Backend,
+{
+    backend: Arc<B>,
 }
 
 #[derive(Debug, Clone)]
@@ -132,7 +145,7 @@ where
     B: Backend,
     C: Column<B>,
 {
-    backend: PhantomData<B>,
+    backend: Arc<B>,
     column: PhantomData<C>,
 }
@@ -151,7 +164,7 @@ where
     B: Backend,
 {
     pub fn open(path: &Path) -> Result<Self> {
-        let backend = B::open(path)?;
+        let backend = Arc::new(B::open(path)?);
 
         Ok(Database { backend })
     }
@@ -170,7 +183,7 @@ where
             .get_cf(self.cf_handle::<C>(), C::key(key).borrow())
     }
 
-    pub fn put_bytes<C>(&mut self, key: C::Index, data: &[u8]) -> Result<()>
+    pub fn put_bytes<C>(&self, key: C::Index, data: &[u8]) -> Result<()>
     where
         C: Column<B>,
     {
         self.backend
             .put_cf(self.cf_handle::<C>(), C::key(key).borrow(), data)
     }
 
-    pub fn delete<C>(&mut self, key: C::Index) -> Result<()>
+    pub fn delete<C>(&self, key: C::Index) -> Result<()>
     where
         C: Column<B>,
     {
@@ -202,7 +215,7 @@ where
         }
     }
 
-    pub fn put<C>(&mut self, key: C::Index, value: &C::Type) -> Result<()>
+    pub fn put<C>(&self, key: C::Index, value: &C::Type) -> Result<()>
     where
         C: TypedColumn<B>,
     {
@@ -240,6 +253,35 @@ where
         Ok(iter)
     }
 
+    #[inline]
+    pub fn cf_handle<C>(&self) -> B::ColumnFamily
+    where
+        C: Column<B>,
+    {
+        self.backend.cf_handle(C::NAME).clone()
+    }
+
+    pub fn column<C>(&self) -> LedgerColumn<B, C>
+    where
+        C: Column<B>,
+    {
+        LedgerColumn {
+            backend: Arc::clone(&self.backend),
+            column: PhantomData,
+        }
+    }
+
+    pub fn batch_processor(&self) -> BatchProcessor<B> {
+        BatchProcessor {
+            backend: Arc::clone(&self.backend),
+        }
+    }
+}
+
+impl<B> BatchProcessor<B>
+where
+    B: Backend,
+{
     pub fn batch(&mut self) -> Result<WriteBatch<B>> {
         let db_write_batch = self.backend.batch()?;
         let map = self
@@ -259,24 +301,6 @@ where
     pub fn write(&mut self, batch: WriteBatch<B>) -> Result<()> {
         self.backend.write(batch.write_batch)
     }
-
-    #[inline]
-    pub fn cf_handle<C>(&self) -> B::ColumnFamily
-    where
-        C: Column<B>,
-    {
-        self.backend.cf_handle(C::NAME).clone()
-    }
-
-    pub fn column<C>(&self) -> LedgerColumn<B, C>
-    where
-        C: Column<B>,
-    {
-        LedgerColumn {
-            backend: PhantomData,
-            column: PhantomData,
-        }
-    }
 }
 
 impl<B, C> Cursor<B, C>
@@ -333,41 +357,47 @@ where
     B: Backend,
     C: Column<B>,
 {
-    pub fn get_bytes(&self, db: &Database<B>, key: C::Index) -> Result<Option<Vec<u8>>> {
-        db.backend.get_cf(self.handle(db), C::key(key).borrow())
+    pub fn get_bytes(&self, key: C::Index) -> Result<Option<Vec<u8>>> {
+        self.backend.get_cf(self.handle(), C::key(key).borrow())
     }
 
-    pub fn cursor(&self, db: &Database<B>) -> Result<Cursor<B, C>> {
-        db.cursor()
+    pub fn cursor(&self) -> Result<Cursor<B, C>> {
+        let db_cursor = self.backend.raw_iterator_cf(self.handle())?;
+
+        Ok(Cursor {
+            db_cursor,
+            column: PhantomData,
+            backend: PhantomData,
+        })
     }
 
-    pub fn iter(&self, db: &Database<B>) -> Result<impl Iterator<Item = (C::Index, Vec<u8>)>> {
-        db.iter::<C>()
+    pub fn iter(&self) -> Result<impl Iterator<Item = (C::Index, Vec<u8>)>> {
+        let iter = self
+            .backend
+            .iterator_cf(self.handle())?
+            .map(|(key, value)| (C::index(&key), value.into()));
+
+        Ok(iter)
     }
 
-    pub fn handle(&self, db: &Database<B>) -> B::ColumnFamily {
-        db.cf_handle::<C>()
+    #[inline]
+    pub fn handle(&self) -> B::ColumnFamily {
+        self.backend.cf_handle(C::NAME).clone()
     }
 
-    pub fn is_empty(&self, db: &Database<B>) -> Result<bool> {
-        let mut cursor = self.cursor(db)?;
+    pub fn is_empty(&self) -> Result<bool> {
+        let mut cursor = self.cursor()?;
         cursor.seek_to_first();
         Ok(!cursor.valid())
     }
-}
 
-impl<B, C> LedgerColumn<B, C>
-where
-    B: Backend,
-    C: Column<B>,
-{
-    pub fn put_bytes(&self, db: &mut Database<B>, key: C::Index, value: &[u8]) -> Result<()> {
-        db.backend
-            .put_cf(self.handle(db), C::key(key).borrow(), value)
+    pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> {
+        self.backend
+            .put_cf(self.handle(), C::key(key).borrow(), value)
     }
 
-    pub fn delete(&self, db: &mut Database<B>, key: C::Index) -> Result<()> {
-        db.backend.delete_cf(self.handle(db), C::key(key).borrow())
+    pub fn delete(&self, key: C::Index) -> Result<()> {
+        self.backend.delete_cf(self.handle(), C::key(key).borrow())
     }
 }
@@ -376,18 +406,21 @@ where
     B: Backend,
     C: TypedColumn<B>,
 {
-    pub fn get(&self, db: &Database<B>, key: C::Index) -> Result<Option<C::Type>> {
-        db.get::<C>(key)
-    }
-}
+    pub fn get(&self, key: C::Index) -> Result<Option<C::Type>> {
+        if let Some(serialized_value) = self.backend.get_cf(self.handle(), C::key(key).borrow())? {
+            let value = deserialize(&serialized_value)?;
 
-impl<B, C> LedgerColumn<B, C>
-where
-    B: Backend,
-    C: TypedColumn<B>,
-{
-    pub fn put(&self, db: &mut Database<B>, key: C::Index, value: &C::Type) -> Result<()> {
-        db.put::<C>(key, value)
+            Ok(Some(value))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> {
+        let serialized_value = serialize(value)?;
+
+        self.backend
+            .put_cf(self.handle(), C::key(key).borrow(), &serialized_value)
     }
 }
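`LedgerColumn`'s typed `get`/`put` now serialize inline instead of delegating to `Database`. A minimal standalone model of that behavior, assuming the `serialize`/`deserialize` in scope are bincode's (as elsewhere in this codebase) and simulating the key-value backend with a `HashMap`:

```rust
use std::collections::HashMap;
use std::marker::PhantomData;

use bincode::{deserialize, serialize};
use serde::{de::DeserializeOwned, Serialize};

// One typed column over an in-memory "backend".
struct TypedCf<T> {
    store: HashMap<Vec<u8>, Vec<u8>>,
    _column: PhantomData<T>,
}

impl<T: Serialize + DeserializeOwned> TypedCf<T> {
    fn put(&mut self, key: &[u8], value: &T) -> bincode::Result<()> {
        let bytes = serialize(value)?; // C::Type -> bytes, as in put() above
        self.store.insert(key.to_vec(), bytes);
        Ok(())
    }

    fn get(&self, key: &[u8]) -> bincode::Result<Option<T>> {
        match self.store.get(key) {
            Some(bytes) => Ok(Some(deserialize(bytes)?)), // bytes -> C::Type
            None => Ok(None),
        }
    }
}
```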
diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs
index a9eee4137e..90c3d28bdb 100644
--- a/core/src/blocktree/kvs.rs
+++ b/core/src/blocktree/kvs.rs
@@ -119,6 +119,36 @@ impl TypedColumn<Kvs> for cf::Orphans {
     type Type = bool;
 }
 
+impl Column<Kvs> for cf::Root {
+    const NAME: &'static str = super::ROOT_CF;
+    type Index = ();
+
+    fn key(_: ()) -> Key {
+        Key::default()
+    }
+
+    fn index(_: &Key) {}
+}
+
+impl TypedColumn<Kvs> for cf::Root {
+    type Type = u64;
+}
+
+impl Column<Kvs> for cf::SlotMeta {
+    const NAME: &'static str = super::META_CF;
+    type Index = u64;
+
+    fn key(slot: u64) -> Key {
+        let mut key = Key::default();
+        BigEndian::write_u64(&mut key.0[8..16], slot);
+        key
+    }
+
+    fn index(key: &Key) -> u64 {
+        BigEndian::read_u64(&key.0[8..16])
+    }
+}
+
 impl Column<Kvs> for cf::SlotMeta {
     const NAME: &'static str = super::META_CF;
     type Index = u64;
diff --git a/core/src/blocktree/meta.rs b/core/src/blocktree/meta.rs
index 21e3ccf8a8..a1fa6d064b 100644
--- a/core/src/blocktree/meta.rs
+++ b/core/src/blocktree/meta.rs
@@ -24,8 +24,6 @@ pub struct SlotMeta {
     // True if this slot is full (consumed == last_index + 1) and if every
     // slot that is a parent of this slot is also connected.
     pub is_connected: bool,
-    // True if this slot is a root
-    pub is_root: bool,
 }
 
 impl SlotMeta {
@@ -68,7 +66,6 @@ impl SlotMeta {
             parent_slot,
             next_slots: vec![],
             is_connected: slot == 0,
-            is_root: false,
             last_index: std::u64::MAX,
         }
     }
diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs
index 4d881d935c..c68392d993 100644
--- a/core/src/blocktree/rocks.rs
+++ b/core/src/blocktree/rocks.rs
@@ -30,7 +30,7 @@ impl Backend for Rocks {
     type Error = rocksdb::Error;
 
     fn open(path: &Path) -> Result<Rocks> {
-        use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta};
+        use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, Root, SlotMeta};
 
         fs::create_dir_all(&path)?;
 
@@ -44,6 +44,7 @@ impl Backend for Rocks {
         let erasure_meta_cf_descriptor =
             ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
         let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
+        let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
 
         let cfs = vec![
             meta_cf_descriptor,
@@ -51,6 +52,7 @@ impl Backend for Rocks {
             erasure_cf_descriptor,
             erasure_meta_cf_descriptor,
             orphans_cf_descriptor,
+            root_cf_descriptor,
         ];
 
         // Open the database
@@ -60,13 +62,14 @@ impl Backend for Rocks {
     }
 
     fn columns(&self) -> Vec<&'static str> {
-        use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, SlotMeta};
+        use crate::blocktree::db::columns::{Coding, Data, ErasureMeta, Orphans, Root, SlotMeta};
 
         vec![
             Coding::NAME,
             ErasureMeta::NAME,
             Data::NAME,
             Orphans::NAME,
+            Root::NAME,
             SlotMeta::NAME,
         ]
     }
@@ -170,6 +173,21 @@ impl TypedColumn<Rocks> for cf::Orphans {
     type Type = bool;
 }
 
+impl Column<Rocks> for cf::Root {
+    const NAME: &'static str = super::ROOT_CF;
+    type Index = ();
+
+    fn key(_: ()) -> Vec<u8> {
+        vec![0; 8]
+    }
+
+    fn index(_: &[u8]) {}
+}
+
+impl TypedColumn<Rocks> for cf::Root {
+    type Type = u64;
+}
+
 impl Column<Rocks> for cf::SlotMeta {
     const NAME: &'static str = super::META_CF;
     type Index = u64;
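Two details of the key codecs above are worth spelling out. The `SlotMeta` codec writes slots big-endian, so lexicographic byte order matches numeric slot order, which the cursor `seek` calls depend on; the `Root` column's unit index always maps to one fixed key, so that column holds at most a single value. A standalone check of the big-endian property, using the same `byteorder` calls (the real `Key` is wider; an 8-byte array keeps the sketch small):

```rust
use byteorder::{BigEndian, ByteOrder};

fn slot_key(slot: u64) -> [u8; 8] {
    let mut key = [0u8; 8];
    BigEndian::write_u64(&mut key, slot);
    key
}

fn main() {
    // Byte-wise order equals numeric order, so iteration visits slots in order.
    assert!(slot_key(2) < slot_key(10));
    // And the codec round-trips.
    assert_eq!(BigEndian::read_u64(&slot_key(7)), 7);
}
```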
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index 896c8e2ec1..5057e74cc2 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -201,7 +201,7 @@ impl RepairService {
     fn generate_repairs(blocktree: &Blocktree, max_repairs: usize) -> Result<(Vec<RepairType>)> {
         // Slot height and blob indexes for blobs we want to repair
         let mut repairs: Vec<RepairType> = vec![];
-        let slot = *blocktree.root_slot.read().unwrap();
+        let slot = blocktree.get_root()?;
         Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, slot);
 
         // TODO: Incorporate gossip to determine priorities for repair?
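A test-style sketch of what the separate root column buys the repair path above; the helpers are the ones this crate's tests already use, and the slot number is a placeholder:

```rust
let ledger_path = get_tmp_ledger_path("root_survives_reopen");
{
    let blocktree = Blocktree::open(&ledger_path).unwrap();
    blocktree.set_root(7).unwrap();
}
// The root lives in its own column family on disk, so a fresh handle,
// e.g. after a validator restart, still sees it; there is no in-memory
// RwLock<u64> to rebuild before repairs can start from the root.
let blocktree = Blocktree::open(&ledger_path).unwrap();
assert_eq!(blocktree.get_root().unwrap(), 7);
assert!(blocktree.is_root(7));
Blocktree::destroy(&ledger_path).unwrap();
```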