use crate::result::{Error, Result}; use bincode::{deserialize, serialize}; use serde::de::DeserializeOwned; use serde::Serialize; use std::borrow::Borrow; use std::collections::HashMap; use std::marker::PhantomData; use std::path::Path; use std::sync::Arc; pub mod columns { #[derive(Debug)] /// SlotMeta Column pub struct SlotMeta; #[derive(Debug)] /// Orphans Column pub struct Orphans; #[derive(Debug)] /// Erasure Column pub struct Coding; #[derive(Debug)] /// Data Column pub struct Data; #[derive(Debug)] /// Data Column pub struct DeadSlots; #[derive(Debug)] /// The erasure meta column pub struct ErasureMeta; #[derive(Debug)] /// The root column pub struct Root; #[derive(Debug)] /// The index column pub struct Index; } pub trait Backend: Sized + Send + Sync { type Key: ?Sized + ToOwned; type OwnedKey: Borrow; type ColumnFamily: Clone; type Cursor: DbCursor; type Iter: Iterator, Box<[u8]>)>; type WriteBatch: IWriteBatch; type Error: Into; fn open(path: &Path) -> Result; fn columns(&self) -> Vec<&'static str>; fn destroy(path: &Path) -> Result<()>; fn cf_handle(&self, cf: &str) -> Self::ColumnFamily; fn get_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result>>; fn put_cf(&self, cf: Self::ColumnFamily, key: &Self::Key, value: &[u8]) -> Result<()>; fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::Key) -> Result<()>; fn iterator_cf(&self, cf: Self::ColumnFamily, from: Option<&Self::Key>) -> Result; fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result; fn write(&self, batch: Self::WriteBatch) -> Result<()>; fn batch(&self) -> Result; } pub trait Column where B: Backend, { const NAME: &'static str; type Index; fn key(index: Self::Index) -> B::OwnedKey; fn index(key: &B::Key) -> Self::Index; } pub trait DbCursor where B: Backend, { fn valid(&self) -> bool; fn seek(&mut self, key: &B::Key); fn seek_to_first(&mut self); fn next(&mut self); fn key(&self) -> Option; fn value(&self) -> Option>; } pub trait IWriteBatch where B: Backend, { fn put_cf(&mut self, cf: B::ColumnFamily, key: &B::Key, value: &[u8]) -> Result<()>; fn delete_cf(&mut self, cf: B::ColumnFamily, key: &B::Key) -> Result<()>; } pub trait TypedColumn: Column where B: Backend, { type Type: Serialize + DeserializeOwned; } #[derive(Debug, Clone)] pub struct Database where B: Backend, { backend: Arc, } #[derive(Debug, Clone)] pub struct BatchProcessor where B: Backend, { backend: Arc, } #[derive(Debug, Clone)] pub struct Cursor where B: Backend, C: Column, { db_cursor: B::Cursor, column: PhantomData, backend: PhantomData, } #[derive(Debug, Clone)] pub struct LedgerColumn where B: Backend, C: Column, { backend: Arc, column: PhantomData, } #[derive(Debug)] pub struct WriteBatch where B: Backend, { write_batch: B::WriteBatch, backend: PhantomData, map: HashMap<&'static str, B::ColumnFamily>, } impl Database where B: Backend, { pub fn open(path: &Path) -> Result { let backend = Arc::new(B::open(path)?); Ok(Database { backend }) } pub fn destroy(path: &Path) -> Result<()> { B::destroy(path)?; Ok(()) } pub fn get_bytes(&self, key: C::Index) -> Result>> where C: Column, { self.backend .get_cf(self.cf_handle::(), C::key(key).borrow()) } pub fn put_bytes(&self, key: C::Index, data: &[u8]) -> Result<()> where C: Column, { self.backend .put_cf(self.cf_handle::(), C::key(key).borrow(), data) } pub fn delete(&self, key: C::Index) -> Result<()> where C: Column, { self.backend .delete_cf(self.cf_handle::(), C::key(key).borrow()) } pub fn get(&self, key: C::Index) -> Result> where C: TypedColumn, { if let Some(serialized_value) = self .backend .get_cf(self.cf_handle::(), C::key(key).borrow())? { let value = deserialize(&serialized_value)?; Ok(Some(value)) } else { Ok(None) } } pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> where C: TypedColumn, { let serialized_value = serialize(value)?; self.backend.put_cf( self.cf_handle::(), C::key(key).borrow(), &serialized_value, ) } pub fn cursor(&self) -> Result> where C: Column, { let db_cursor = self.backend.raw_iterator_cf(self.cf_handle::())?; Ok(Cursor { db_cursor, column: PhantomData, backend: PhantomData, }) } pub fn iter( &self, start_from: Option, ) -> Result)>> where C: Column, { let iter = { if let Some(index) = start_from { let key = C::key(index); self.backend .iterator_cf(self.cf_handle::(), Some(key.borrow()))? } else { self.backend.iterator_cf(self.cf_handle::(), None)? } }; Ok(iter.map(|(key, value)| (C::index(&key), value))) } #[inline] pub fn cf_handle(&self) -> B::ColumnFamily where C: Column, { self.backend.cf_handle(C::NAME).clone() } pub fn column(&self) -> LedgerColumn where C: Column, { LedgerColumn { backend: Arc::clone(&self.backend), column: PhantomData, } } // Note this returns an object that can be used to directly write to multiple column families. // This circumvents the synchronization around APIs that in Blocktree that use // blocktree.batch_processor, so this API should only be used if the caller is sure they // are writing to data in columns that will not be corrupted by any simultaneous blocktree // operations. pub unsafe fn batch_processor(&self) -> BatchProcessor { BatchProcessor { backend: Arc::clone(&self.backend), } } } impl BatchProcessor where B: Backend, { pub fn batch(&mut self) -> Result> { let db_write_batch = self.backend.batch()?; let map = self .backend .columns() .into_iter() .map(|desc| (desc, self.backend.cf_handle(desc))) .collect(); Ok(WriteBatch { write_batch: db_write_batch, backend: PhantomData, map, }) } pub fn write(&mut self, batch: WriteBatch) -> Result<()> { self.backend.write(batch.write_batch) } } impl Cursor where B: Backend, C: Column, { pub fn valid(&self) -> bool { self.db_cursor.valid() } pub fn seek(&mut self, key: C::Index) { self.db_cursor.seek(C::key(key).borrow()); } pub fn seek_to_first(&mut self) { self.db_cursor.seek_to_first(); } pub fn next(&mut self) { self.db_cursor.next(); } pub fn key(&self) -> Option { if let Some(key) = self.db_cursor.key() { Some(C::index(key.borrow())) } else { None } } pub fn value_bytes(&self) -> Option> { self.db_cursor.value() } } impl Cursor where B: Backend, C: TypedColumn, { pub fn value(&self) -> Option { if let Some(bytes) = self.db_cursor.value() { let value = deserialize(&bytes).ok()?; Some(value) } else { None } } } impl LedgerColumn where B: Backend, C: Column, { pub fn get_bytes(&self, key: C::Index) -> Result>> { self.backend.get_cf(self.handle(), C::key(key).borrow()) } pub fn cursor(&self) -> Result> { let db_cursor = self.backend.raw_iterator_cf(self.handle())?; Ok(Cursor { db_cursor, column: PhantomData, backend: PhantomData, }) } pub fn iter( &self, start_from: Option, ) -> Result)>> { let iter = { if let Some(index) = start_from { let key = C::key(index); self.backend .iterator_cf(self.handle(), Some(key.borrow()))? } else { self.backend.iterator_cf(self.handle(), None)? } }; Ok(iter.map(|(key, value)| (C::index(&key), value))) } pub fn force_delete(&self, from: Option, to: Option) -> Result<()> where C::Index: PartialOrd, { let iter = self.iter(from)?; for (index, _) in iter { if let Some(ref to) = to { if &index > to { break; } } if let Err(e) = self.delete(index) { error!("Error: {:?} while deleting {:?}", e, C::NAME) } } Ok(()) } #[inline] pub fn handle(&self) -> B::ColumnFamily { self.backend.cf_handle(C::NAME).clone() } pub fn is_empty(&self) -> Result { let mut cursor = self.cursor()?; cursor.seek_to_first(); Ok(!cursor.valid()) } pub fn put_bytes(&self, key: C::Index, value: &[u8]) -> Result<()> { self.backend .put_cf(self.handle(), C::key(key).borrow(), value) } pub fn delete(&self, key: C::Index) -> Result<()> { self.backend.delete_cf(self.handle(), C::key(key).borrow()) } } impl LedgerColumn where B: Backend, C: TypedColumn, { pub fn get(&self, key: C::Index) -> Result> { if let Some(serialized_value) = self.backend.get_cf(self.handle(), C::key(key).borrow())? { let value = deserialize(&serialized_value)?; Ok(Some(value)) } else { Ok(None) } } pub fn put(&self, key: C::Index, value: &C::Type) -> Result<()> { let serialized_value = serialize(value)?; self.backend .put_cf(self.handle(), C::key(key).borrow(), &serialized_value) } } impl WriteBatch where B: Backend, { pub fn put_bytes>(&mut self, key: C::Index, bytes: &[u8]) -> Result<()> { self.write_batch .put_cf(self.get_cf::(), C::key(key).borrow(), bytes) } pub fn delete>(&mut self, key: C::Index) -> Result<()> { self.write_batch .delete_cf(self.get_cf::(), C::key(key).borrow()) } pub fn put>(&mut self, key: C::Index, value: &C::Type) -> Result<()> { let serialized_value = serialize(&value)?; self.write_batch .put_cf(self.get_cf::(), C::key(key).borrow(), &serialized_value) } #[inline] fn get_cf>(&self) -> B::ColumnFamily { self.map[C::NAME].clone() } }