diff --git a/Cargo.lock b/Cargo.lock index 854bfce72d..dfab259d40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1098,6 +1098,15 @@ dependencies = [ "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "memmap" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "libc 0.2.50 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "memoffset" version = "0.2.1" @@ -1976,6 +1985,7 @@ dependencies = [ "libc 0.2.50 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "nix 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3071,6 +3081,7 @@ dependencies = [ "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum memchr 2.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "db4c41318937f6e76648f42826b1d9ade5c09cafb5aef7e351240a70f39206e9" "checksum memmap 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e2ffa2c986de11a9df78620c01eeaaf27d94d3ff02bf81bfcca953102dd0c6ff" +"checksum memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" "checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3" "checksum mime 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ba626b8a6de5da682e1caa06bdb42a335aee5a84db8e5046a3e8ab17ba0a3ae0" "checksum mime 0.3.13 (registry+https://github.com/rust-lang/crates.io-index)" = "3e27ca21f40a310bd06d9031785f4801710d566c184a6e15bad4f1d9b65f9425" diff --git a/core/Cargo.toml b/core/Cargo.toml index 1f7ef84842..756d14cf9d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -17,6 +17,7 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git chacha = [] cuda = [] erasure = [] +kvstore = ["memmap"] [dependencies] bincode = "1.1.2" @@ -33,6 +34,7 @@ jsonrpc-pubsub = "10.1.0" jsonrpc-ws-server = "10.1.0" libc = "0.2.50" log = "0.4.2" +memmap = { version = "0.7.0", optional = true } nix = "0.13.0" rand = "0.6.5" rand_chacha = "0.1.1" diff --git a/core/benches/kvstore.rs b/core/benches/kvstore.rs new file mode 100644 index 0000000000..516c305a0d --- /dev/null +++ b/core/benches/kvstore.rs @@ -0,0 +1,189 @@ +#![cfg(feature = "kvstore")] +#![feature(test)] +extern crate test; + +use std::fs; +use std::path::{Path, PathBuf}; + +use rand::{self, thread_rng, Rng}; + +use test::Bencher; + +use solana::kvstore::{Config, Key, KvStore}; + +const SMALL_SIZE: usize = 512; +const LARGE_SIZE: usize = 32 * 1024; +const HUGE_SIZE: usize = 64 * 1024; + +fn bench_write(bench: &mut Bencher, rows: &[(Key, Vec)], ledger_path: &str) { + let store = KvStore::open_default(&ledger_path).unwrap(); + + bench.iter(move || { + store.put_many(rows.iter()).expect("Failed to insert rows"); + }); + + teardown(&ledger_path); +} + +fn bench_write_partitioned(bench: &mut Bencher, rows: &[(Key, Vec)], 
ledger_path: &str) { + let path = Path::new(ledger_path); + let storage_dirs = (0..4) + .map(|i| path.join(format!("parition-{}", i))) + .collect::>(); + + let store = KvStore::partitioned(&ledger_path, &storage_dirs, Config::default()).unwrap(); + + bench.iter(move || { + store.put_many(rows.iter()).expect("Failed to insert rows"); + }); + + teardown(&ledger_path); +} + +#[bench] +#[ignore] +fn bench_write_small(bench: &mut Bencher) { + let ledger_path = setup("bench_write_small"); + let num_entries = 32 * 1024; + let rows = gen_pairs(SMALL_SIZE).take(num_entries).collect::>(); + bench_write(bench, &rows, &ledger_path.to_string_lossy()); +} + +#[bench] +#[ignore] +fn bench_write_small_partitioned(bench: &mut Bencher) { + let ledger_path = setup("bench_write_small_partitioned"); + let num_entries = 32 * 1024; + let rows = gen_pairs(SMALL_SIZE).take(num_entries).collect::>(); + bench_write_partitioned(bench, &rows, &ledger_path.to_string_lossy()); +} + +#[bench] +#[ignore] +fn bench_write_large(bench: &mut Bencher) { + let ledger_path = setup("bench_write_large"); + let num_entries = 32 * 1024; + let rows = gen_pairs(LARGE_SIZE).take(num_entries).collect::>(); + bench_write(bench, &rows, &ledger_path.to_string_lossy()); +} + +#[bench] +#[ignore] +fn bench_write_huge(bench: &mut Bencher) { + let ledger_path = setup("bench_write_huge"); + let num_entries = 32 * 1024; + let rows = gen_pairs(HUGE_SIZE).take(num_entries).collect::>(); + bench_write(bench, &rows, &ledger_path.to_string_lossy()); +} + +#[bench] +#[ignore] +fn bench_read_sequential(bench: &mut Bencher) { + let ledger_path = setup("bench_read_sequential"); + let store = KvStore::open_default(&ledger_path).unwrap(); + + // Insert some big and small blobs into the ledger + let num_small_blobs = 32 * 1024; + let num_large_blobs = 32 * 1024; + let total_blobs = num_small_blobs + num_large_blobs; + + let small = gen_data(SMALL_SIZE).take(num_small_blobs); + let large = gen_data(LARGE_SIZE).take(num_large_blobs); + let rows = gen_seq_keys().zip(small.chain(large)); + + let _ = store.put_many(rows); + + let num_reads = total_blobs / 15; + let mut rng = rand::thread_rng(); + + bench.iter(move || { + // Generate random starting point in the range [0, total_blobs - 1], read num_reads blobs sequentially + let start_index = rng.gen_range(0, num_small_blobs + num_large_blobs); + for i in start_index..start_index + num_reads { + let i = i as u64; + let k = Key::from((i, i, i)); + let _ = store.get(&k); + } + }); + + teardown(&ledger_path); +} + +#[bench] +#[ignore] +fn bench_read_random(bench: &mut Bencher) { + let ledger_path = setup("bench_read_sequential"); + let store = KvStore::open_default(&ledger_path).unwrap(); + + // Insert some big and small blobs into the ledger + let num_small_blobs = 32 * 1024; + let num_large_blobs = 32 * 1024; + let total_blobs = num_small_blobs + num_large_blobs; + + let small = gen_data(SMALL_SIZE).take(num_small_blobs); + let large = gen_data(LARGE_SIZE).take(num_large_blobs); + let rows = gen_seq_keys().zip(small.chain(large)); + + let _ = store.put_many(rows); + + let num_reads = total_blobs / 15; + let mut rng = rand::thread_rng(); + + // Generate a num_reads sized random sample of indexes in range [0, total_blobs - 1], + // simulating random reads + let indexes: Vec = (0..num_reads) + .map(|_| rng.gen_range(0, total_blobs as u64)) + .collect(); + + bench.iter(move || { + for &i in indexes.iter() { + let i = i as u64; + let k = Key::from((i, i, i)); + let _ = store.get(&k); + } + }); + + 
teardown(&ledger_path); +} + +fn setup(test_name: &str) -> PathBuf { + let dir = Path::new("kvstore-bench").join(test_name);; + + let _ig = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + + dir +} + +fn gen_seq_keys() -> impl Iterator { + let mut n = 0; + + std::iter::repeat_with(move || { + let key = Key::from((n, n, n)); + n += 1; + + key + }) +} + +fn gen_keys() -> impl Iterator { + let mut rng = thread_rng(); + + std::iter::repeat_with(move || { + let buf = rng.gen(); + + Key(buf) + }) +} + +fn gen_data(size: usize) -> impl Iterator> { + std::iter::repeat(vec![1u8; size]) +} + +fn gen_pairs(data_size: usize) -> impl Iterator)> { + gen_keys().zip(gen_data(data_size)) +} + +fn teardown>(p: P) { + KvStore::destroy(p).expect("Expect successful store destruction"); +} diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index d8425e87d7..9bed08df07 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -3,121 +3,81 @@ //! access read to a persistent file-based ledger. use crate::entry::Entry; +#[cfg(feature = "kvstore")] +use crate::kvstore; use crate::packet::{Blob, SharedBlob, BLOB_HEADER_SIZE}; use crate::result::{Error, Result}; + use bincode::{deserialize, serialize}; -use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; + use hashbrown::HashMap; -use rocksdb::{ - ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, IteratorMode, Options, WriteBatch, DB, -}; -use serde::de::DeserializeOwned; + +#[cfg(not(feature = "kvstore"))] +use rocksdb; + use serde::Serialize; + use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, KeypairUtil}; -use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; + use std::borrow::{Borrow, Cow}; use std::cell::RefCell; use std::cmp; use std::fs; use std::io; -use std::path::Path; use std::rc::Rc; use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::Arc; -pub type BlocktreeRawIterator = rocksdb::DBRawIterator; +mod db; +#[cfg(feature = "kvstore")] +mod kvs; +#[cfg(not(feature = "kvstore"))] +mod rocks; +#[cfg(feature = "kvstore")] +use self::kvs::{DataCf, ErasureCf, Kvs, MetaCf}; +#[cfg(not(feature = "kvstore"))] +use self::rocks::{DataCf, ErasureCf, MetaCf, Rocks}; + +pub use db::{ + Cursor, Database, IDataCf, IErasureCf, IMetaCf, IWriteBatch, LedgerColumnFamily, + LedgerColumnFamilyRaw, +}; + +#[cfg(not(feature = "kvstore"))] +pub type BlocktreeRawIterator = ::Cursor; +#[cfg(feature = "kvstore")] +pub type BlocktreeRawIterator = ::Cursor; + +#[cfg(not(feature = "kvstore"))] +pub type WriteBatch = ::WriteBatch; +#[cfg(feature = "kvstore")] +pub type WriteBatch = ::WriteBatch; + +#[cfg(not(feature = "kvstore"))] +type KeyRef = ::KeyRef; +#[cfg(feature = "kvstore")] +type KeyRef = ::KeyRef; + +#[cfg(not(feature = "kvstore"))] +pub type Key = ::Key; +#[cfg(feature = "kvstore")] +pub type Key = ::Key; + +#[cfg(not(feature = "kvstore"))] pub const BLOCKTREE_DIRECTORY: &str = "rocksdb"; -// A good value for this is the number of cores on the machine -const TOTAL_THREADS: i32 = 8; -const MAX_WRITE_BUFFER_SIZE: usize = 512 * 1024 * 1024; +#[cfg(feature = "kvstore")] +pub const BLOCKTREE_DIRECTORY: &str = "kvstore"; #[derive(Debug)] pub enum BlocktreeError { BlobForIndexExists, InvalidBlobData, RocksDb(rocksdb::Error), -} - -impl std::convert::From for Error { - fn from(e: rocksdb::Error) -> Error { - Error::BlocktreeError(BlocktreeError::RocksDb(e)) - } -} - -pub trait LedgerColumnFamily { - type ValueType: DeserializeOwned + Serialize; - - fn get(&self, key: &[u8]) 
-> Result> { - let db = self.db(); - let data_bytes = db.get_cf(self.handle(), key)?; - - if let Some(raw) = data_bytes { - let result: Self::ValueType = deserialize(&raw)?; - Ok(Some(result)) - } else { - Ok(None) - } - } - - fn get_bytes(&self, key: &[u8]) -> Result>> { - let db = self.db(); - let data_bytes = db.get_cf(self.handle(), key)?; - Ok(data_bytes.map(|x| x.to_vec())) - } - - fn put_bytes(&self, key: &[u8], serialized_value: &[u8]) -> Result<()> { - let db = self.db(); - db.put_cf(self.handle(), &key, &serialized_value)?; - Ok(()) - } - - fn put(&self, key: &[u8], value: &Self::ValueType) -> Result<()> { - let db = self.db(); - let serialized = serialize(value)?; - db.put_cf(self.handle(), &key, &serialized)?; - Ok(()) - } - - fn delete(&self, key: &[u8]) -> Result<()> { - let db = self.db(); - db.delete_cf(self.handle(), &key)?; - Ok(()) - } - - fn db(&self) -> &Arc; - fn handle(&self) -> ColumnFamily; -} - -pub trait LedgerColumnFamilyRaw { - fn get(&self, key: &[u8]) -> Result>> { - let db = self.db(); - let data_bytes = db.get_cf(self.handle(), key)?; - Ok(data_bytes.map(|x| x.to_vec())) - } - - fn put(&self, key: &[u8], serialized_value: &[u8]) -> Result<()> { - let db = self.db(); - db.put_cf(self.handle(), &key, &serialized_value)?; - Ok(()) - } - - fn delete(&self, key: &[u8]) -> Result<()> { - let db = self.db(); - db.delete_cf(self.handle(), &key)?; - Ok(()) - } - - fn raw_iterator(&self) -> BlocktreeRawIterator { - let db = self.db(); - db.raw_iterator_cf(self.handle()) - .expect("Expected to be able to open database iterator") - } - - fn handle(&self) -> ColumnFamily; - fn db(&self) -> &Arc; + #[cfg(feature = "kvstore")] + KvsDb(kvstore::Error), } #[derive(Clone, Debug, Default, Deserialize, Serialize, Eq, PartialEq)] @@ -171,156 +131,13 @@ impl SlotMeta { } } -pub struct MetaCf { - db: Arc, -} - -impl MetaCf { - pub fn new(db: Arc) -> Self { - MetaCf { db } - } - - pub fn key(slot: u64) -> Vec { - let mut key = vec![0u8; 8]; - BigEndian::write_u64(&mut key[0..8], slot); - key - } - - pub fn get_slot_meta(&self, slot: u64) -> Result> { - let key = Self::key(slot); - self.get(&key) - } - - pub fn put_slot_meta(&self, slot: u64, slot_meta: &SlotMeta) -> Result<()> { - let key = Self::key(slot); - self.put(&key, slot_meta) - } - - pub fn index_from_key(key: &[u8]) -> Result { - let mut rdr = io::Cursor::new(&key[..]); - let index = rdr.read_u64::()?; - Ok(index) - } -} - -impl LedgerColumnFamily for MetaCf { - type ValueType = SlotMeta; - - fn db(&self) -> &Arc { - &self.db - } - - fn handle(&self) -> ColumnFamily { - self.db.cf_handle(META_CF).unwrap() - } -} - -// The data column family -pub struct DataCf { - db: Arc, -} - -impl DataCf { - pub fn new(db: Arc) -> Self { - DataCf { db } - } - - pub fn get_by_slot_index(&self, slot: u64, index: u64) -> Result>> { - let key = Self::key(slot, index); - self.get(&key) - } - - pub fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> { - let key = Self::key(slot, index); - self.delete(&key) - } - - pub fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> { - let key = Self::key(slot, index); - self.put(&key, serialized_value) - } - - pub fn key(slot: u64, index: u64) -> Vec { - let mut key = vec![0u8; 16]; - BigEndian::write_u64(&mut key[0..8], slot); - BigEndian::write_u64(&mut key[8..16], index); - key - } - - pub fn slot_from_key(key: &[u8]) -> Result { - let mut rdr = io::Cursor::new(&key[0..8]); - let height = rdr.read_u64::()?; - Ok(height) - } - - pub fn 
index_from_key(key: &[u8]) -> Result { - let mut rdr = io::Cursor::new(&key[8..16]); - let index = rdr.read_u64::()?; - Ok(index) - } -} - -impl LedgerColumnFamilyRaw for DataCf { - fn db(&self) -> &Arc { - &self.db - } - - fn handle(&self) -> ColumnFamily { - self.db.cf_handle(DATA_CF).unwrap() - } -} - -// The erasure column family -pub struct ErasureCf { - db: Arc, -} - -impl ErasureCf { - pub fn new(db: Arc) -> Self { - ErasureCf { db } - } - pub fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> { - let key = Self::key(slot, index); - self.delete(&key) - } - - pub fn get_by_slot_index(&self, slot: u64, index: u64) -> Result>> { - let key = Self::key(slot, index); - self.get(&key) - } - - pub fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> { - let key = Self::key(slot, index); - self.put(&key, serialized_value) - } - - pub fn key(slot: u64, index: u64) -> Vec { - DataCf::key(slot, index) - } - - pub fn slot_from_key(key: &[u8]) -> Result { - DataCf::slot_from_key(key) - } - - pub fn index_from_key(key: &[u8]) -> Result { - DataCf::index_from_key(key) - } -} - -impl LedgerColumnFamilyRaw for ErasureCf { - fn db(&self) -> &Arc { - &self.db - } - - fn handle(&self) -> ColumnFamily { - self.db.cf_handle(ERASURE_CF).unwrap() - } -} - // ledger window pub struct Blocktree { // Underlying database is automatically closed in the Drop implementation of DB - db: Arc, + #[cfg(not(feature = "kvstore"))] + db: Arc, + #[cfg(feature = "kvstore")] + db: Arc, meta_cf: MetaCf, data_cf: DataCf, erasure_cf: ErasureCf, @@ -336,47 +153,6 @@ pub const DATA_CF: &str = "data"; pub const ERASURE_CF: &str = "erasure"; impl Blocktree { - // Opens a Ledger in directory, provides "infinite" window of blobs - pub fn open(ledger_path: &str) -> Result { - fs::create_dir_all(&ledger_path)?; - let ledger_path = Path::new(ledger_path).join(BLOCKTREE_DIRECTORY); - - // Use default database options - let db_options = Self::get_db_options(); - - // Column family names - let meta_cf_descriptor = ColumnFamilyDescriptor::new(META_CF, Self::get_cf_options()); - let data_cf_descriptor = ColumnFamilyDescriptor::new(DATA_CF, Self::get_cf_options()); - let erasure_cf_descriptor = ColumnFamilyDescriptor::new(ERASURE_CF, Self::get_cf_options()); - let cfs = vec![ - meta_cf_descriptor, - data_cf_descriptor, - erasure_cf_descriptor, - ]; - - // Open the database - let db = Arc::new(DB::open_cf_descriptors(&db_options, ledger_path, cfs)?); - - // Create the metadata column family - let meta_cf = MetaCf::new(db.clone()); - - // Create the data column family - let data_cf = DataCf::new(db.clone()); - - // Create the erasure column family - let erasure_cf = ErasureCf::new(db.clone()); - - let ticks_per_slot = DEFAULT_TICKS_PER_SLOT; - Ok(Blocktree { - db, - meta_cf, - data_cf, - erasure_cf, - new_blobs_signals: vec![], - ticks_per_slot, - }) - } - pub fn open_with_signal(ledger_path: &str) -> Result<(Self, Receiver)> { let mut blocktree = Self::open(ledger_path)?; let (signal_sender, signal_receiver) = sync_channel(1); @@ -422,14 +198,6 @@ impl Blocktree { Ok(()) } - pub fn destroy(ledger_path: &str) -> Result<()> { - // DB::destroy() fails if `ledger_path` doesn't exist - fs::create_dir_all(&ledger_path)?; - let ledger_path = Path::new(ledger_path).join(BLOCKTREE_DIRECTORY); - DB::destroy(&Options::default(), &ledger_path)?; - Ok(()) - } - pub fn get_next_slot(&self, slot: u64) -> Result> { let mut db_iterator = self.db.raw_iterator_cf(self.meta_cf.handle())?; 
db_iterator.seek(&MetaCf::key(slot + 1)); @@ -526,7 +294,7 @@ impl Blocktree { I: IntoIterator, I::Item: Borrow, { - let mut write_batch = WriteBatch::default(); + let mut write_batch = self.db.batch()?; // A map from slot to a 2-tuple of metadata: (working copy, backup copy), // so we can detect changes to the slot metadata later let mut slot_meta_working_set = HashMap::new(); @@ -672,24 +440,6 @@ impl Blocktree { Ok((total_blobs, total_current_size as u64)) } - /// Return an iterator for all the entries in the given file. - pub fn read_ledger(&self) -> Result> { - let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?; - - db_iterator.seek_to_first(); - Ok(EntryIterator { - db_iterator, - blockhash: None, - }) - } - - pub fn read_ledger_blobs(&self) -> impl Iterator { - self.db - .iterator_cf(self.data_cf.handle(), IteratorMode::Start) - .unwrap() - .map(|(_, blob_data)| Blob::new(&blob_data)) - } - pub fn get_coding_blob_bytes(&self, slot: u64, index: u64) -> Result>> { self.erasure_cf.get_by_slot_index(slot, index) } @@ -703,7 +453,7 @@ impl Blocktree { self.erasure_cf.put_by_slot_index(slot, index, bytes) } - pub fn put_data_raw(&self, key: &[u8], value: &[u8]) -> Result<()> { + pub fn put_data_raw(&self, key: &KeyRef, value: &[u8]) -> Result<()> { self.data_cf.put(key, value) } @@ -738,9 +488,9 @@ impl Blocktree { slot: u64, start_index: u64, end_index: u64, - key: &dyn Fn(u64, u64) -> Vec, - slot_from_key: &dyn Fn(&[u8]) -> Result, - index_from_key: &dyn Fn(&[u8]) -> Result, + key: &dyn Fn(u64, u64) -> Key, + slot_from_key: &dyn Fn(&KeyRef) -> Result, + index_from_key: &dyn Fn(&KeyRef) -> Result, max_missing: usize, ) -> Vec { if start_index >= end_index || max_missing == 0 { @@ -897,27 +647,6 @@ impl Blocktree { .collect() } - fn get_cf_options() -> Options { - let mut options = Options::default(); - options.set_max_write_buffer_number(32); - options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); - options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); - options - } - - fn get_db_options() -> Options { - let mut options = Options::default(); - options.create_if_missing(true); - options.create_missing_column_families(true); - options.increase_parallelism(TOTAL_THREADS); - options.set_max_background_flushes(4); - options.set_max_background_compactions(4); - options.set_max_write_buffer_number(32); - options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); - options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); - options - } - fn slot_has_updates(slot_meta: &SlotMeta, slot_meta_backup: &Option) -> bool { // We should signal that there are updates if we extended the chain of consecutive blocks starting // from block 0, which is true iff: @@ -1204,7 +933,7 @@ impl Blocktree { bootstrap_meta.received = last.index() + 1; bootstrap_meta.is_rooted = true; - let mut batch = WriteBatch::default(); + let mut batch = self.db.batch()?; batch.put_cf( self.meta_cf.handle(), &meta_key, @@ -1220,45 +949,6 @@ impl Blocktree { } } -// TODO: all this goes away with Blocktree -struct EntryIterator { - db_iterator: DBRawIterator, - - // TODO: remove me when replay_stage is iterating by block (Blocktree) - // this verification is duplicating that of replay_stage, which - // can do this in parallel - blockhash: Option, - // https://github.com/rust-rocksdb/rust-rocksdb/issues/234 - // rocksdb issue: the _blocktree member must be lower in the struct to prevent a crash - // when the db_iterator member above is dropped. 
- // _blocktree is unused, but dropping _blocktree results in a broken db_iterator - // you have to hold the database open in order to iterate over it, and in order - // for db_iterator to be able to run Drop - // _blocktree: Blocktree, -} - -impl Iterator for EntryIterator { - type Item = Entry; - - fn next(&mut self) -> Option { - if self.db_iterator.valid() { - if let Some(value) = self.db_iterator.value() { - if let Ok(entry) = deserialize::(&value[BLOB_HEADER_SIZE..]) { - if let Some(blockhash) = self.blockhash { - if !entry.verify(&blockhash) { - return None; - } - } - self.db_iterator.next(); - self.blockhash = Some(entry.hash); - return Some(entry); - } - } - } - None - } -} - // Creates a new ledger with slot 0 full of ticks (and only ticks). // // Returns the blockhash that can be used to append entries with. diff --git a/core/src/blocktree/db.rs b/core/src/blocktree/db.rs new file mode 100644 index 0000000000..2cf315ac0e --- /dev/null +++ b/core/src/blocktree/db.rs @@ -0,0 +1,195 @@ +use crate::entry::Entry; +use crate::result::{Error, Result}; + +use bincode::{deserialize, serialize}; + +use serde::de::DeserializeOwned; +use serde::Serialize; + +use std::borrow::Borrow; +use std::sync::Arc; + +pub trait Database: Sized + Send + Sync { + type Error: Into; + type Key: Borrow; + type KeyRef: ?Sized; + type ColumnFamily; + type Cursor: Cursor; + type EntryIter: Iterator; + type WriteBatch: IWriteBatch; + + fn cf_handle(&self, cf: &str) -> Option; + + fn get_cf(&self, cf: Self::ColumnFamily, key: &Self::KeyRef) -> Result>>; + + fn put_cf(&self, cf: Self::ColumnFamily, key: &Self::KeyRef, data: &[u8]) -> Result<()>; + + fn delete_cf(&self, cf: Self::ColumnFamily, key: &Self::KeyRef) -> Result<()>; + + fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result; + + fn write(&self, batch: Self::WriteBatch) -> Result<()>; + + fn batch(&self) -> Result; +} + +pub trait Cursor { + fn valid(&self) -> bool; + + fn seek(&mut self, key: &D::KeyRef); + + fn seek_to_first(&mut self); + + fn next(&mut self); + + fn key(&self) -> Option; + + fn value(&self) -> Option>; +} + +pub trait IWriteBatch { + fn put_cf(&mut self, cf: D::ColumnFamily, key: &D::KeyRef, data: &[u8]) -> Result<()>; +} + +pub trait IDataCf: LedgerColumnFamilyRaw { + fn new(db: Arc) -> Self; + + fn get_by_slot_index(&self, slot: u64, index: u64) -> Result>> { + let key = Self::key(slot, index); + self.get(key.borrow()) + } + + fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> { + let key = Self::key(slot, index); + self.delete(&key.borrow()) + } + + fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> { + let key = Self::key(slot, index); + self.put(key.borrow(), serialized_value) + } + + fn key(slot: u64, index: u64) -> D::Key; + + fn slot_from_key(key: &D::KeyRef) -> Result; + + fn index_from_key(key: &D::KeyRef) -> Result; +} + +pub trait IErasureCf: LedgerColumnFamilyRaw { + fn new(db: Arc) -> Self; + + fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> { + let key = Self::key(slot, index); + self.delete(key.borrow()) + } + + fn get_by_slot_index(&self, slot: u64, index: u64) -> Result>> { + let key = Self::key(slot, index); + self.get(key.borrow()) + } + + fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> { + let key = Self::key(slot, index); + self.put(key.borrow(), serialized_value) + } + + fn key(slot: u64, index: u64) -> D::Key; + + fn slot_from_key(key: &D::KeyRef) -> Result; + + fn index_from_key(key: 
&D::KeyRef) -> Result; +} + +pub trait IMetaCf: LedgerColumnFamily { + fn new(db: Arc) -> Self; + + fn key(slot: u64) -> D::Key; + + fn get_slot_meta(&self, slot: u64) -> Result> { + let key = Self::key(slot); + self.get(key.borrow()) + } + + fn put_slot_meta(&self, slot: u64, slot_meta: &super::SlotMeta) -> Result<()> { + let key = Self::key(slot); + self.put(key.borrow(), slot_meta) + } + + fn index_from_key(key: &D::KeyRef) -> Result; +} + +pub trait LedgerColumnFamily { + type ValueType: DeserializeOwned + Serialize; + + fn get(&self, key: &D::KeyRef) -> Result> { + let db = self.db(); + let data_bytes = db.get_cf(self.handle(), key)?; + + if let Some(raw) = data_bytes { + let result: Self::ValueType = deserialize(&raw)?; + Ok(Some(result)) + } else { + Ok(None) + } + } + + fn get_bytes(&self, key: &D::KeyRef) -> Result>> { + let db = self.db(); + let data_bytes = db.get_cf(self.handle(), key)?; + Ok(data_bytes.map(|x| x.to_vec())) + } + + fn put_bytes(&self, key: &D::KeyRef, serialized_value: &[u8]) -> Result<()> { + let db = self.db(); + db.put_cf(self.handle(), key, &serialized_value)?; + Ok(()) + } + + fn put(&self, key: &D::KeyRef, value: &Self::ValueType) -> Result<()> { + let db = self.db(); + let serialized = serialize(value)?; + db.put_cf(self.handle(), key, &serialized)?; + Ok(()) + } + + fn delete(&self, key: &D::KeyRef) -> Result<()> { + let db = self.db(); + db.delete_cf(self.handle(), key)?; + Ok(()) + } + + fn db(&self) -> &Arc; + + fn handle(&self) -> D::ColumnFamily; +} + +pub trait LedgerColumnFamilyRaw { + fn get(&self, key: &D::KeyRef) -> Result>> { + let db = self.db(); + let data_bytes = db.get_cf(self.handle(), key)?; + Ok(data_bytes.map(|x| x.to_vec())) + } + + fn put(&self, key: &D::KeyRef, serialized_value: &[u8]) -> Result<()> { + let db = self.db(); + db.put_cf(self.handle(), &key, &serialized_value)?; + Ok(()) + } + + fn delete(&self, key: &D::KeyRef) -> Result<()> { + let db = self.db(); + db.delete_cf(self.handle(), &key)?; + Ok(()) + } + + fn raw_iterator(&self) -> D::Cursor { + let db = self.db(); + db.raw_iterator_cf(self.handle()) + .expect("Expected to be able to open database iterator") + } + + fn handle(&self) -> D::ColumnFamily; + + fn db(&self) -> &Arc; +} diff --git a/core/src/blocktree/kvs.rs b/core/src/blocktree/kvs.rs new file mode 100644 index 0000000000..d59235263c --- /dev/null +++ b/core/src/blocktree/kvs.rs @@ -0,0 +1,265 @@ +use crate::entry::Entry; +use crate::kvstore::{self, Key}; +use crate::packet::Blob; +use crate::result::{Error, Result}; + +use std::sync::Arc; + +use super::db::{ + Cursor, Database, IDataCf, IErasureCf, IMetaCf, IWriteBatch, LedgerColumnFamily, + LedgerColumnFamilyRaw, +}; +use super::{Blocktree, BlocktreeError}; + +#[derive(Debug)] +pub struct Kvs(()); + +/// The metadata column family +#[derive(Debug)] +pub struct MetaCf { + db: Arc, +} + +/// The data column family +#[derive(Debug)] +pub struct DataCf { + db: Arc, +} + +/// The erasure column family +#[derive(Debug)] +pub struct ErasureCf { + db: Arc, +} + +/// Dummy struct to get things compiling +/// TODO: all this goes away with Blocktree +pub struct EntryIterator(i32); +/// Dummy struct to get things compiling +pub struct KvsCursor; +/// Dummy struct to get things compiling +pub struct ColumnFamily; +/// Dummy struct to get things compiling +pub struct KvsWriteBatch; + +impl Blocktree { + /// Opens a Ledger in directory, provides "infinite" window of blobs + pub fn open(_ledger_path: &str) -> Result { + unimplemented!() + } + + #[allow(unreachable_code)] + 
pub fn read_ledger_blobs(&self) -> impl Iterator { + unimplemented!(); + self.read_ledger().unwrap().map(|_| Blob::new(&[])) + } + + /// Return an iterator for all the entries in the given file. + #[allow(unreachable_code)] + pub fn read_ledger(&self) -> Result> { + Ok(EntryIterator(unimplemented!())) + } + + pub fn destroy(_ledger_path: &str) -> Result<()> { + unimplemented!() + } +} + +impl Database for Kvs { + type Error = kvstore::Error; + type Key = Key; + type KeyRef = Key; + type ColumnFamily = ColumnFamily; + type Cursor = KvsCursor; + type EntryIter = EntryIterator; + type WriteBatch = KvsWriteBatch; + + fn cf_handle(&self, _cf: &str) -> Option { + unimplemented!() + } + + fn get_cf(&self, _cf: ColumnFamily, _key: &Key) -> Result>> { + unimplemented!() + } + + fn put_cf(&self, _cf: ColumnFamily, _key: &Key, _data: &[u8]) -> Result<()> { + unimplemented!() + } + + fn delete_cf(&self, _cf: Self::ColumnFamily, _key: &Key) -> Result<()> { + unimplemented!() + } + + fn raw_iterator_cf(&self, _cf: Self::ColumnFamily) -> Result { + unimplemented!() + } + + fn write(&self, _batch: Self::WriteBatch) -> Result<()> { + unimplemented!() + } + + fn batch(&self) -> Result { + unimplemented!() + } +} + +impl Cursor for KvsCursor { + fn valid(&self) -> bool { + unimplemented!() + } + + fn seek(&mut self, _key: &Key) { + unimplemented!() + } + + fn seek_to_first(&mut self) { + unimplemented!() + } + + fn next(&mut self) { + unimplemented!() + } + + fn key(&self) -> Option { + unimplemented!() + } + + fn value(&self) -> Option> { + unimplemented!() + } +} + +impl IWriteBatch for KvsWriteBatch { + fn put_cf(&mut self, _cf: ColumnFamily, _key: &Key, _data: &[u8]) -> Result<()> { + unimplemented!() + } +} + +impl IDataCf for DataCf { + fn new(db: Arc) -> Self { + DataCf { db } + } + + fn get_by_slot_index(&self, _slot: u64, _index: u64) -> Result>> { + unimplemented!() + } + + fn delete_by_slot_index(&self, _slot: u64, _index: u64) -> Result<()> { + unimplemented!() + } + + fn put_by_slot_index(&self, _slot: u64, _index: u64, _serialized_value: &[u8]) -> Result<()> { + unimplemented!() + } + + fn key(_slot: u64, _index: u64) -> Key { + unimplemented!() + } + + fn slot_from_key(_key: &Key) -> Result { + unimplemented!() + } + + fn index_from_key(_key: &Key) -> Result { + unimplemented!() + } +} + +impl IErasureCf for ErasureCf { + fn new(db: Arc) -> Self { + ErasureCf { db } + } + + fn delete_by_slot_index(&self, _slot: u64, _index: u64) -> Result<()> { + unimplemented!() + } + + fn get_by_slot_index(&self, _slot: u64, _index: u64) -> Result>> { + unimplemented!() + } + + fn put_by_slot_index(&self, _slot: u64, _index: u64, _serialized_value: &[u8]) -> Result<()> { + unimplemented!() + } + + fn key(slot: u64, index: u64) -> Key { + DataCf::key(slot, index) + } + + fn slot_from_key(key: &Key) -> Result { + DataCf::slot_from_key(key) + } + + fn index_from_key(key: &Key) -> Result { + DataCf::index_from_key(key) + } +} + +impl IMetaCf for MetaCf { + fn new(db: Arc) -> Self { + MetaCf { db } + } + + fn key(_slot: u64) -> Key { + unimplemented!() + } + + fn get_slot_meta(&self, _slot: u64) -> Result> { + unimplemented!() + } + + fn put_slot_meta(&self, _slot: u64, _slot_meta: &super::SlotMeta) -> Result<()> { + unimplemented!() + } + + fn index_from_key(_key: &Key) -> Result { + unimplemented!() + } +} + +impl LedgerColumnFamilyRaw for DataCf { + fn db(&self) -> &Arc { + &self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(super::DATA_CF).unwrap() + } +} + +impl LedgerColumnFamilyRaw for 
ErasureCf { + fn db(&self) -> &Arc { + &self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(super::ERASURE_CF).unwrap() + } +} + +impl LedgerColumnFamily for MetaCf { + type ValueType = super::SlotMeta; + + fn db(&self) -> &Arc { + &self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(super::META_CF).unwrap() + } +} + +impl std::convert::From for Error { + fn from(e: kvstore::Error) -> Error { + Error::BlocktreeError(BlocktreeError::KvsDb(e)) + } +} + +/// TODO: all this goes away with Blocktree +impl Iterator for EntryIterator { + type Item = Entry; + + fn next(&mut self) -> Option { + unimplemented!() + } +} diff --git a/core/src/blocktree/kvstore.rs b/core/src/blocktree/kvstore.rs new file mode 100644 index 0000000000..e69de29bb2 diff --git a/core/src/blocktree/rocks.rs b/core/src/blocktree/rocks.rs new file mode 100644 index 0000000000..c7f26f092b --- /dev/null +++ b/core/src/blocktree/rocks.rs @@ -0,0 +1,400 @@ +use crate::entry::Entry; +use crate::packet::{Blob, BLOB_HEADER_SIZE}; +use crate::result::{Error, Result}; + +use bincode::deserialize; + +use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; + +use rocksdb::{ + self, ColumnFamily, ColumnFamilyDescriptor, DBRawIterator, IteratorMode, Options, + WriteBatch as RWriteBatch, DB, +}; + +use solana_sdk::hash::Hash; +use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT; + +use std::fs; +use std::io; +use std::path::Path; +use std::sync::Arc; + +use super::db::{ + Cursor, Database, IDataCf, IErasureCf, IMetaCf, IWriteBatch, LedgerColumnFamily, + LedgerColumnFamilyRaw, +}; +use super::{Blocktree, BlocktreeError}; + +// A good value for this is the number of cores on the machine +const TOTAL_THREADS: i32 = 8; +const MAX_WRITE_BUFFER_SIZE: usize = 512 * 1024 * 1024; + +#[derive(Debug)] +pub struct Rocks(rocksdb::DB); + +/// The metadata column family +#[derive(Debug)] +pub struct MetaCf { + db: Arc, +} + +/// The data column family +#[derive(Debug)] +pub struct DataCf { + db: Arc, +} + +/// The erasure column family +#[derive(Debug)] +pub struct ErasureCf { + db: Arc, +} + +/// TODO: all this goes away with Blocktree +pub struct EntryIterator { + db_iterator: DBRawIterator, + + // TODO: remove me when replay_stage is iterating by block (Blocktree) + // this verification is duplicating that of replay_stage, which + // can do this in parallel + blockhash: Option, + // https://github.com/rust-rocksdb/rust-rocksdb/issues/234 + // rocksdb issue: the _blocktree member must be lower in the struct to prevent a crash + // when the db_iterator member above is dropped. 
+ // _blocktree is unused, but dropping _blocktree results in a broken db_iterator + // you have to hold the database open in order to iterate over it, and in order + // for db_iterator to be able to run Drop + // _blocktree: Blocktree, +} + +impl Blocktree { + /// Opens a Ledger in directory, provides "infinite" window of blobs + pub fn open(ledger_path: &str) -> Result { + fs::create_dir_all(&ledger_path)?; + let ledger_path = Path::new(ledger_path).join(super::BLOCKTREE_DIRECTORY); + + // Use default database options + let db_options = Blocktree::get_db_options(); + + // Column family names + let meta_cf_descriptor = + ColumnFamilyDescriptor::new(super::META_CF, Blocktree::get_cf_options()); + let data_cf_descriptor = + ColumnFamilyDescriptor::new(super::DATA_CF, Blocktree::get_cf_options()); + let erasure_cf_descriptor = + ColumnFamilyDescriptor::new(super::ERASURE_CF, Blocktree::get_cf_options()); + let cfs = vec![ + meta_cf_descriptor, + data_cf_descriptor, + erasure_cf_descriptor, + ]; + + // Open the database + let db = Arc::new(Rocks(DB::open_cf_descriptors( + &db_options, + ledger_path, + cfs, + )?)); + + // Create the metadata column family + let meta_cf = MetaCf::new(db.clone()); + + // Create the data column family + let data_cf = DataCf::new(db.clone()); + + // Create the erasure column family + let erasure_cf = ErasureCf::new(db.clone()); + + let ticks_per_slot = DEFAULT_TICKS_PER_SLOT; + Ok(Blocktree { + db, + meta_cf, + data_cf, + erasure_cf, + new_blobs_signals: vec![], + ticks_per_slot, + }) + } + + pub fn read_ledger_blobs(&self) -> impl Iterator { + self.db + .0 + .iterator_cf(self.data_cf.handle(), IteratorMode::Start) + .unwrap() + .map(|(_, blob_data)| Blob::new(&blob_data)) + } + + /// Return an iterator for all the entries in the given file. 
+ pub fn read_ledger(&self) -> Result> { + let mut db_iterator = self.db.raw_iterator_cf(self.data_cf.handle())?; + + db_iterator.seek_to_first(); + Ok(EntryIterator { + db_iterator, + blockhash: None, + }) + } + + pub fn destroy(ledger_path: &str) -> Result<()> { + // DB::destroy() fails if `ledger_path` doesn't exist + fs::create_dir_all(&ledger_path)?; + let ledger_path = Path::new(ledger_path).join(super::BLOCKTREE_DIRECTORY); + DB::destroy(&Options::default(), &ledger_path)?; + Ok(()) + } + + fn get_cf_options() -> Options { + let mut options = Options::default(); + options.set_max_write_buffer_number(32); + options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); + options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); + options + } + + fn get_db_options() -> Options { + let mut options = Options::default(); + options.create_if_missing(true); + options.create_missing_column_families(true); + options.increase_parallelism(TOTAL_THREADS); + options.set_max_background_flushes(4); + options.set_max_background_compactions(4); + options.set_max_write_buffer_number(32); + options.set_write_buffer_size(MAX_WRITE_BUFFER_SIZE); + options.set_max_bytes_for_level_base(MAX_WRITE_BUFFER_SIZE as u64); + options + } +} + +impl Database for Rocks { + type Error = rocksdb::Error; + type Key = Vec; + type KeyRef = [u8]; + type ColumnFamily = ColumnFamily; + type Cursor = DBRawIterator; + type EntryIter = EntryIterator; + type WriteBatch = RWriteBatch; + + fn cf_handle(&self, cf: &str) -> Option { + self.0.cf_handle(cf) + } + + fn get_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result>> { + let opt = self.0.get_cf(cf, key)?; + Ok(opt.map(|dbvec| dbvec.to_vec())) + } + + fn put_cf(&self, cf: ColumnFamily, key: &[u8], data: &[u8]) -> Result<()> { + self.0.put_cf(cf, key, data)?; + Ok(()) + } + + fn delete_cf(&self, cf: Self::ColumnFamily, key: &[u8]) -> Result<()> { + self.0.delete_cf(cf, key).map_err(From::from) + } + + fn raw_iterator_cf(&self, cf: Self::ColumnFamily) -> Result { + Ok(self.0.raw_iterator_cf(cf)?) 
+ } + + fn write(&self, batch: Self::WriteBatch) -> Result<()> { + self.0.write(batch).map_err(From::from) + } + + fn batch(&self) -> Result { + Ok(RWriteBatch::default()) + } +} + +impl Cursor for DBRawIterator { + fn valid(&self) -> bool { + DBRawIterator::valid(self) + } + + fn seek(&mut self, key: &[u8]) { + DBRawIterator::seek(self, key) + } + + fn seek_to_first(&mut self) { + DBRawIterator::seek_to_first(self) + } + + fn next(&mut self) { + DBRawIterator::next(self) + } + + fn key(&self) -> Option> { + DBRawIterator::key(self) + } + + fn value(&self) -> Option> { + DBRawIterator::value(self) + } +} + +impl IWriteBatch for RWriteBatch { + fn put_cf(&mut self, cf: ColumnFamily, key: &[u8], data: &[u8]) -> Result<()> { + RWriteBatch::put_cf(self, cf, key, data)?; + Ok(()) + } +} + +impl IDataCf for DataCf { + fn new(db: Arc) -> Self { + DataCf { db } + } + + fn get_by_slot_index(&self, slot: u64, index: u64) -> Result>> { + let key = Self::key(slot, index); + self.get(&key) + } + + fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> { + let key = Self::key(slot, index); + self.delete(&key) + } + + fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> { + let key = Self::key(slot, index); + self.put(&key, serialized_value) + } + + fn key(slot: u64, index: u64) -> Vec { + let mut key = vec![0u8; 16]; + BigEndian::write_u64(&mut key[0..8], slot); + BigEndian::write_u64(&mut key[8..16], index); + key + } + + fn slot_from_key(key: &[u8]) -> Result { + let mut rdr = io::Cursor::new(&key[0..8]); + let height = rdr.read_u64::()?; + Ok(height) + } + + fn index_from_key(key: &[u8]) -> Result { + let mut rdr = io::Cursor::new(&key[8..16]); + let index = rdr.read_u64::()?; + Ok(index) + } +} + +impl IErasureCf for ErasureCf { + fn new(db: Arc) -> Self { + ErasureCf { db } + } + fn delete_by_slot_index(&self, slot: u64, index: u64) -> Result<()> { + let key = Self::key(slot, index); + self.delete(&key) + } + + fn get_by_slot_index(&self, slot: u64, index: u64) -> Result>> { + let key = Self::key(slot, index); + self.get(&key) + } + + fn put_by_slot_index(&self, slot: u64, index: u64, serialized_value: &[u8]) -> Result<()> { + let key = Self::key(slot, index); + self.put(&key, serialized_value) + } + + fn key(slot: u64, index: u64) -> Vec { + DataCf::key(slot, index) + } + + fn slot_from_key(key: &[u8]) -> Result { + DataCf::slot_from_key(key) + } + + fn index_from_key(key: &[u8]) -> Result { + DataCf::index_from_key(key) + } +} + +impl IMetaCf for MetaCf { + fn new(db: Arc) -> Self { + MetaCf { db } + } + + fn key(slot: u64) -> Vec { + let mut key = vec![0u8; 8]; + BigEndian::write_u64(&mut key[0..8], slot); + key + } + + fn get_slot_meta(&self, slot: u64) -> Result> { + let key = Self::key(slot); + self.get(&key) + } + + fn put_slot_meta(&self, slot: u64, slot_meta: &super::SlotMeta) -> Result<()> { + let key = Self::key(slot); + self.put(&key, slot_meta) + } + + fn index_from_key(key: &[u8]) -> Result { + let mut rdr = io::Cursor::new(&key[..]); + let index = rdr.read_u64::()?; + Ok(index) + } +} + +impl LedgerColumnFamilyRaw for DataCf { + fn db(&self) -> &Arc { + &self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(super::DATA_CF).unwrap() + } +} + +impl LedgerColumnFamilyRaw for ErasureCf { + fn db(&self) -> &Arc { + &self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(super::ERASURE_CF).unwrap() + } +} + +impl LedgerColumnFamily for MetaCf { + type ValueType = super::SlotMeta; + + fn db(&self) -> &Arc { + 
&self.db + } + + fn handle(&self) -> ColumnFamily { + self.db.cf_handle(super::META_CF).unwrap() + } +} + +impl std::convert::From for Error { + fn from(e: rocksdb::Error) -> Error { + Error::BlocktreeError(BlocktreeError::RocksDb(e)) + } +} + +/// TODO: all this goes away with Blocktree +impl Iterator for EntryIterator { + type Item = Entry; + + fn next(&mut self) -> Option { + if self.db_iterator.valid() { + if let Some(value) = self.db_iterator.value() { + if let Ok(entry) = deserialize::(&value[BLOB_HEADER_SIZE..]) { + if let Some(blockhash) = self.blockhash { + if !entry.verify(&blockhash) { + return None; + } + } + self.db_iterator.next(); + self.blockhash = Some(entry.hash); + return Some(entry); + } + } + } + None + } +} diff --git a/core/src/kvstore.rs b/core/src/kvstore.rs new file mode 100644 index 0000000000..d27696d66d --- /dev/null +++ b/core/src/kvstore.rs @@ -0,0 +1,345 @@ +use crate::kvstore::mapper::{Disk, Mapper, Memory}; +use crate::kvstore::sstable::SSTable; +use crate::kvstore::storage::WriteState; +use crate::kvstore::writelog::WriteLog; + +use std::collections::BTreeMap; +use std::fs; +use std::io; +use std::ops::RangeInclusive; +use std::path::{Path, PathBuf}; +use std::sync::mpsc::{Receiver, Sender}; +use std::sync::{Arc, RwLock}; +use std::thread::JoinHandle; + +mod compactor; +mod error; +mod io_utils; +mod mapper; +mod readtx; +mod sstable; +mod storage; +mod writelog; +mod writetx; + +pub use self::error::{Error, Result}; +pub use self::readtx::ReadTx as Snapshot; +pub use self::sstable::Key; +pub use self::writetx::WriteTx; + +const TABLES_FILE: &str = "tables.meta"; +const LOG_FILE: &str = "mem-log"; +const DEFAULT_TABLE_SIZE: usize = 64 * 1024 * 1024; +const DEFAULT_MEM_SIZE: usize = 64 * 1024 * 1024; +const DEFAULT_MAX_PAGES: usize = 10; + +#[derive(Debug, PartialEq, Copy, Clone)] +pub struct Config { + pub max_mem: usize, + pub max_tables: usize, + pub page_size: usize, + pub in_memory: bool, +} + +#[derive(Debug)] +pub struct KvStore { + write: RwLock, + tables: RwLock>>, + config: Config, + root: PathBuf, + mapper: Arc, + req_tx: RwLock>, + resp_rx: RwLock>, + compactor_handle: JoinHandle<()>, +} + +impl KvStore { + pub fn open_default

<P>(root: P) -> Result<Self> + where + P: AsRef<Path>, + { + let mapper = Disk::single(root.as_ref()); + open(root.as_ref(), Arc::new(mapper), Config::default()) + } + + pub fn open

<P>(root: P, config: Config) -> Result<Self> + where + P: AsRef<Path>, + { + let mapper: Arc<dyn Mapper> = if config.in_memory { + Arc::new(Memory::new()) + } else { + Arc::new(Disk::single(root.as_ref())) + }; + open(root.as_ref(), mapper, config) + } + + pub fn partitioned<P, P2>(root: P, storage_dirs: &[P2], config: Config) -> Result<Self> + where + P: AsRef<Path>, + P2: AsRef<Path>, + { + let mapper = Disk::new(storage_dirs); + open(root.as_ref(), Arc::new(mapper), config) + } + + pub fn config(&self) -> &Config { + &self.config + } + + pub fn put(&self, key: &Key, data: &[u8]) -> Result<()> { + self.ensure_mem()?; + + let mut write = self.write.write().unwrap(); + + write.put(key, data)?; + write.commit += 1; + + Ok(()) + } + + pub fn put_many<Iter, Tup, K, V>(&self, rows: Iter) -> Result<()> + where + Iter: Iterator<Item = Tup>, + Tup: std::borrow::Borrow<(K, V)>, + K: std::borrow::Borrow<Key>, + V: std::borrow::Borrow<[u8]>, + { + { + let mut write = self.write.write().unwrap(); + + for pair in rows { + let tup = pair.borrow(); + let (key, data) = (tup.0.borrow(), tup.1.borrow()); + write.put(key, data)?; + } + write.commit += 1; + } + + self.ensure_mem()?; + + Ok(()) + } + + pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> { + self.query_compactor()?; + + let (write_state, tables) = (self.write.read().unwrap(), self.tables.read().unwrap()); + + storage::get(&write_state.values, &*tables, key) + } + + pub fn delete(&self, key: &Key) -> Result<()> { + self.query_compactor()?; + + { + let mut write = self.write.write().unwrap(); + + write.delete(key)?; + write.commit += 1; + } + + self.ensure_mem()?; + Ok(()) + } + + pub fn delete_many<Iter, K>(&self, rows: Iter) -> Result<()> + where + Iter: Iterator<Item = K>, + K: std::borrow::Borrow<Key>, + { + self.query_compactor()?; + + { + let mut write = self.write.write().unwrap(); + for k in rows { + let key = k.borrow(); + write.delete(key)?; + } + write.commit += 1; + } + + self.ensure_mem()?; + Ok(()) + } + + pub fn transaction(&self) -> Result<WriteTx> { + unimplemented!() + } + + pub fn commit(&self, _txn: WriteTx) -> Result<()> { + unimplemented!() + } + + pub fn snapshot(&self) -> Snapshot { + let (state, tables) = (self.write.read().unwrap(), self.tables.read().unwrap()); + + Snapshot::new(state.values.clone(), tables.clone()) + } + + pub fn range( + &self, + range: RangeInclusive<Key>, + ) -> Result<impl Iterator<Item = (Key, Vec<u8>)>> { + self.query_compactor()?; + + let (write_state, tables) = (self.write.read().unwrap(), self.tables.read().unwrap()); + storage::range(&write_state.values, &*tables, range) + } + + pub fn destroy

(path: P) -> Result<()> + where + P: AsRef, + { + let path = path.as_ref(); + if !path.exists() { + return Ok(()); + } + + fs::remove_dir_all(path)?; + Ok(()) + } + + fn query_compactor(&self) -> Result<()> { + if let (Ok(mut req_tx), Ok(mut resp_rx), Ok(mut tables)) = ( + self.req_tx.try_write(), + self.resp_rx.try_write(), + self.tables.try_write(), + ) { + query_compactor( + &self.root, + &*self.mapper, + &mut *tables, + &mut *resp_rx, + &mut *req_tx, + )?; + } + + Ok(()) + } + + fn ensure_mem(&self) -> Result<()> { + let trigger_compact = { + let mut write_rw = self.write.write().unwrap(); + + if write_rw.mem_size < self.config.max_mem { + return Ok(()); + } + + let mut tables = self.tables.write().unwrap(); + storage::flush_table(&write_rw.values, &*self.mapper, &mut *tables)?; + + write_rw.reset()?; + write_rw.commit += 1; + + is_lvl0_full(&tables, &self.config) + }; + + dump_tables(&self.root, &*self.mapper).unwrap(); + if trigger_compact { + let tables_path = self.root.join(TABLES_FILE); + self.req_tx + .write() + .unwrap() + .send(compactor::Req::Start(tables_path)) + .expect("compactor thread dead"); + } + + Ok(()) + } +} + +impl Default for Config { + fn default() -> Config { + Config { + max_mem: DEFAULT_MEM_SIZE, + max_tables: DEFAULT_MAX_PAGES, + page_size: DEFAULT_TABLE_SIZE, + in_memory: false, + } + } +} + +fn open(root: &Path, mapper: Arc, config: Config) -> Result { + let root = root.to_path_buf(); + let log_path = root.join(LOG_FILE); + if !root.exists() { + fs::create_dir(&root)?; + } + + let write_log = WriteLog::open(&log_path, config.max_mem)?; + let mem = write_log.materialize()?; + + let write = RwLock::new(WriteState::new(write_log, mem)); + + let tables = load_tables(&root, &*mapper)?; + let tables = RwLock::new(tables); + + let cfg = compactor::Config { + max_pages: config.max_tables, + page_size: config.page_size, + }; + let (req_tx, resp_rx, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let (req_tx, resp_rx) = (RwLock::new(req_tx), RwLock::new(resp_rx)); + + Ok(KvStore { + write, + tables, + config, + mapper, + root, + req_tx, + resp_rx, + compactor_handle, + }) +} + +fn load_tables(root: &Path, mapper: &dyn Mapper) -> Result>> { + let mut tables = Vec::new(); + let meta_path = root.join(TABLES_FILE); + + if meta_path.exists() { + mapper.load_state_from(&meta_path)?; + tables = SSTable::sorted_tables(&mapper.active_set()?); + } + + Ok(tables) +} + +fn dump_tables(root: &Path, mapper: &Mapper) -> Result<()> { + mapper.serialize_state_to(&root.join(TABLES_FILE))?; + Ok(()) +} + +fn query_compactor( + root: &Path, + mapper: &dyn Mapper, + tables: &mut Vec>, + resp_rx: &mut Receiver, + req_tx: &mut Sender, +) -> Result<()> { + match resp_rx.try_recv() { + Ok(compactor::Resp::Done(new_tables)) => { + std::mem::replace(tables, new_tables); + dump_tables(root, mapper)?; + req_tx.send(compactor::Req::Gc).unwrap(); + } + Ok(compactor::Resp::Failed(e)) => { + return Err(e); + } + // Nothing available, do nothing + _ => {} + } + + Ok(()) +} + +#[inline] +fn is_lvl0_full(tables: &[BTreeMap], config: &Config) -> bool { + if tables.is_empty() { + false + } else { + tables[0].len() > config.max_tables + } +} diff --git a/core/src/kvstore/compactor.rs b/core/src/kvstore/compactor.rs new file mode 100644 index 0000000000..0e1c444847 --- /dev/null +++ b/core/src/kvstore/compactor.rs @@ -0,0 +1,223 @@ +use crate::kvstore::error::{Error, Result}; +use crate::kvstore::mapper::{Kind, Mapper}; 
+use crate::kvstore::sstable::{Key, Merged, SSTable}; + +use std::collections::BTreeMap; +use std::path::PathBuf; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; + +type TableVec = Vec>; +type TableSlice<'a> = &'a [BTreeMap]; + +#[derive(Debug, Copy, Clone)] +pub struct Config { + pub max_pages: usize, + pub page_size: usize, +} + +#[derive(Debug)] +pub enum Req { + Start(PathBuf), + Gc, +} + +#[derive(Debug)] +pub enum Resp { + Done(TableVec), + Failed(Error), +} + +pub fn spawn_compactor( + mapper: Arc, + config: Config, +) -> Result<(Sender, Receiver, JoinHandle<()>)> { + let (req_tx, req_rx) = channel(); + let (resp_tx, resp_rx) = channel(); + + let handle = thread::spawn(move || { + let _ignored = run_loop(mapper, config, req_rx, resp_tx); + }); + + Ok((req_tx, resp_rx, handle)) +} + +fn run_loop( + mapper: Arc, + config: Config, + req_rx: Receiver, + resp_tx: Sender, +) -> Result<()> { + while let Ok(msg) = req_rx.recv() { + match msg { + Req::Start(_) => { + let new_tables_res = run_compaction(&*mapper, &config); + + match new_tables_res { + Ok(new_tables) => { + resp_tx.send(Resp::Done(new_tables))?; + } + Err(e) => { + resp_tx.send(Resp::Failed(e))?; + } + } + } + Req::Gc => { + let _ = mapper.empty_trash(); + } + } + } + + Ok(()) +} + +fn run_compaction(mapper: &dyn Mapper, config: &Config) -> Result { + let mut tables = load_tables(mapper)?; + + compact_level_0(mapper, &mut tables, config)?; + + for level in 1..tables.len() { + while level_needs_compact(level as u8, config, &tables) { + compact_upper_level(mapper, &mut tables, config, level as u8)?; + } + } + + // move old tables to garbage + mapper.rotate_tables()?; + + Ok(tables) +} + +fn compact_level_0(mapper: &dyn Mapper, tables: &mut TableVec, config: &Config) -> Result<()> { + assert!(!tables.is_empty()); + + if tables.len() == 1 { + tables.push(BTreeMap::new()); + } + + let mut new_tables = BTreeMap::new(); + { + let sources = tables + .iter() + .take(2) + .map(BTreeMap::values) + .flatten() + .map(|sst| sst.range(&(Key::ALL_INCLUSIVE))) + .collect::>>()?; + + let mut iter = Merged::new(sources).peekable(); + while iter.peek().is_some() { + let sst = mapper.make_table(Kind::Compaction, &mut |mut data_wtr, mut index_wtr| { + SSTable::create_capped( + &mut iter, + 1, + config.page_size as u64, + &mut data_wtr, + &mut index_wtr, + ); + })?; + + new_tables.insert(sst.meta().start, sst); + } + } + + tables[0].clear(); + tables[1].clear(); + + tables[1].append(&mut new_tables); + + Ok(()) +} + +fn compact_upper_level( + mapper: &dyn Mapper, + pages: &mut TableVec, + config: &Config, + level: u8, +) -> Result<()> { + assert!(1 <= level && (level as usize) < pages.len()); + assert!(!pages[level as usize].is_empty()); + + let next_level = level + 1; + let level = level as usize; + + if next_level as usize == pages.len() { + pages.push(BTreeMap::new()); + } + + let (&key, chosen_sst) = pages[level].iter().next_back().unwrap(); + let (start, end) = { + let meta = chosen_sst.meta(); + (meta.start, meta.end) + }; + + let mut page_keys = Vec::new(); + let mut merge_with = Vec::new(); + + for (key, sst) in pages[next_level as usize].iter() { + if sst.is_overlap(&(start..=end)) { + page_keys.push(*key); + merge_with.push(sst); + } + } + + let mut new_tables = BTreeMap::new(); + { + let sources = merge_with + .into_iter() + .chain(std::iter::once(chosen_sst)) + .map(|sst| sst.range(&(Key::ALL_INCLUSIVE))) + .collect::>>()?; + + let mut iter = Merged::new(sources).peekable(); + 
+ while iter.peek().is_some() { + let sst = mapper.make_table(Kind::Compaction, &mut |mut data_wtr, mut index_wtr| { + SSTable::create_capped( + &mut iter, + next_level, + config.page_size as u64, + &mut data_wtr, + &mut index_wtr, + ); + })?; + + new_tables.insert(sst.meta().start, sst); + } + } + + // delete merged page and merged pages in next level + pages[level].remove(&key).unwrap(); + + for start_key in page_keys { + pages[next_level as usize].remove(&start_key).unwrap(); + } + + pages[next_level as usize].append(&mut new_tables); + + Ok(()) +} + +fn load_tables(mapper: &dyn Mapper) -> Result { + Ok(SSTable::sorted_tables(&mapper.active_set()?)) +} + +#[inline] +fn level_max(level: u8, config: &Config) -> usize { + match level { + 0 => config.max_pages, + x => 10usize.pow(u32::from(x)), + } +} + +#[inline] +fn level_needs_compact(level: u8, config: &Config, tables: TableSlice) -> bool { + if level as usize >= tables.len() { + return false; + } + + let max = level_max(level, config); + + tables[level as usize].len() > max +} diff --git a/core/src/kvstore/error.rs b/core/src/kvstore/error.rs new file mode 100644 index 0000000000..5a29317330 --- /dev/null +++ b/core/src/kvstore/error.rs @@ -0,0 +1,76 @@ +use std::error::Error as StdErr; +use std::fmt; +use std::io; +use std::result::Result as StdRes; +use std::sync::mpsc::{RecvError, SendError, TryRecvError}; + +pub type Result = StdRes; + +#[derive(Debug)] +pub enum Error { + Io(io::Error), + Corrupted(bincode::Error), + Channel(Box), + Missing, +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::Corrupted(_) => write!(f, "Serialization error: Store may be corrupted"), + Error::Channel(e) => write!(f, "Internal communication error: {}", e), + Error::Io(e) => write!(f, "I/O error: {}", e), + Error::Missing => write!(f, "Item not present in ledger"), + } + } +} + +impl StdErr for Error { + fn source(&self) -> Option<&(dyn StdErr + 'static)> { + match self { + Error::Io(e) => Some(e), + Error::Corrupted(ref e) => Some(e), + Error::Channel(e) => Some(e.as_ref()), + Error::Missing => None, + } + } +} + +impl From for Error { + fn from(e: io::Error) -> Self { + Error::Io(e) + } +} + +impl From> for Error { + fn from(e: io::IntoInnerError) -> Self { + Error::Io(e.into()) + } +} + +impl From for Error { + fn from(e: bincode::Error) -> Self { + Error::Corrupted(e) + } +} + +impl From> for Error +where + T: Send + Sync + 'static, +{ + fn from(e: SendError) -> Self { + Error::Channel(Box::new(e)) + } +} + +impl From for Error { + fn from(e: RecvError) -> Self { + Error::Channel(Box::new(e)) + } +} + +impl From for Error { + fn from(e: TryRecvError) -> Self { + Error::Channel(Box::new(e)) + } +} diff --git a/core/src/kvstore/io_utils.rs b/core/src/kvstore/io_utils.rs new file mode 100644 index 0000000000..a2b8b3af95 --- /dev/null +++ b/core/src/kvstore/io_utils.rs @@ -0,0 +1,131 @@ +use memmap::Mmap; + +use std::fs::File; +use std::io::{self, BufWriter, Seek, SeekFrom, Write}; +use std::ops::Deref; +use std::sync::{Arc, RwLock}; + +const BACKING_ERR: &str = "In-memory table lock poisoned; concurrency error"; + +#[derive(Debug)] +pub enum MemMap { + Disk(Mmap), + Mem(Arc>>), +} + +#[derive(Debug)] +pub enum Writer { + Disk(BufWriter), + Mem(SharedWriter), +} + +#[derive(Debug)] +pub struct SharedWriter { + buf: Arc>>, + pos: u64, +} + +impl SharedWriter { + pub fn new(buf: Arc>>) -> SharedWriter { + SharedWriter { buf, pos: 0 } + } +} + +impl Deref for MemMap { + type Target = [u8]; 
+ + fn deref(&self) -> &[u8] { + match self { + MemMap::Disk(mmap) => mmap.deref(), + MemMap::Mem(vec) => { + let buf = vec.read().expect(BACKING_ERR); + let slice = buf.as_slice(); + + // transmute lifetime. Relying on the RwLock + immutability for safety + unsafe { std::mem::transmute(slice) } + } + } + } +} + +impl Write for SharedWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + use std::cmp; + + let mut vec = self.buf.write().expect(BACKING_ERR); + + // Calc ranges + let space_remaining = vec.len() - self.pos as usize; + let copy_len = cmp::min(buf.len(), space_remaining); + let copy_src_range = 0..copy_len; + let append_src_range = copy_len..buf.len(); + let copy_dest_range = self.pos as usize..(self.pos as usize + copy_len); + + // Copy then append + (&mut vec[copy_dest_range]).copy_from_slice(&buf[copy_src_range]); + vec.extend_from_slice(&buf[append_src_range]); + + let written = buf.len(); + + self.pos += written as u64; + + Ok(written) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + let _written = self.write(buf)?; + Ok(()) + } +} + +impl Seek for SharedWriter { + fn seek(&mut self, to: SeekFrom) -> io::Result { + self.pos = match to { + SeekFrom::Start(new_pos) => new_pos, + SeekFrom::Current(diff) => (self.pos as i64 + diff) as u64, + SeekFrom::End(rpos) => (self.buf.read().expect(BACKING_ERR).len() as i64 + rpos) as u64, + }; + + Ok(self.pos) + } +} + +impl Write for Writer { + fn write(&mut self, buf: &[u8]) -> io::Result { + match self { + Writer::Disk(ref mut wtr) => wtr.write(buf), + Writer::Mem(ref mut wtr) => wtr.write(buf), + } + } + + fn flush(&mut self) -> io::Result<()> { + match self { + Writer::Disk(ref mut wtr) => { + wtr.flush()?; + wtr.get_mut().sync_data()?; + Ok(()) + } + Writer::Mem(ref mut wtr) => wtr.flush(), + } + } + + fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { + match self { + Writer::Disk(ref mut wtr) => wtr.write_all(buf), + Writer::Mem(ref mut wtr) => wtr.write_all(buf), + } + } +} + +impl Seek for Writer { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + match self { + Writer::Disk(ref mut wtr) => wtr.seek(pos), + Writer::Mem(ref mut wtr) => wtr.seek(pos), + } + } +} diff --git a/core/src/kvstore/mapper.rs b/core/src/kvstore/mapper.rs new file mode 100644 index 0000000000..fab8f4250b --- /dev/null +++ b/core/src/kvstore/mapper.rs @@ -0,0 +1,50 @@ +use crate::kvstore::io_utils::Writer; +use crate::kvstore::sstable::SSTable; +use crate::kvstore::Result; + +use std::path::Path; +use std::sync::RwLock; + +mod disk; +mod memory; + +pub use self::disk::Disk; +pub use self::memory::Memory; + +pub trait Mapper: std::fmt::Debug + Send + Sync { + fn make_table(&self, kind: Kind, func: &mut FnMut(Writer, Writer)) -> Result; + fn rotate_tables(&self) -> Result<()>; + fn empty_trash(&self) -> Result<()>; + fn active_set(&self) -> Result>; + fn serialize_state_to(&self, path: &Path) -> Result<()>; + fn load_state_from(&self, path: &Path) -> Result<()>; +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)] +pub enum Kind { + Active, + Compaction, + Garbage, +} + +pub trait RwLockExt { + fn read_as U>(&self, f: F) -> U; + fn write_as U>(&self, f: F) -> U; + fn try_read_as U>(&self, f: F) -> U; + fn try_write_as U>(&self, f: F) -> U; +} + +impl RwLockExt for RwLock { + fn read_as U>(&self, f: F) -> U { + f(&*self.read().unwrap()) + } + fn write_as U>(&self, f: F) -> U { + f(&mut *self.write().unwrap()) + } + fn try_read_as U>(&self, f: 
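+    // (These helpers run a closure against the guarded value and unwrap any
+    // lock-poisoning error; the try_* variants likewise unwrap a failed
+    // try_read/try_write instead of blocking.)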
F) -> U { + f(&*self.try_read().unwrap()) + } + fn try_write_as U>(&self, f: F) -> U { + f(&mut *self.try_write().unwrap()) + } +} diff --git a/core/src/kvstore/mapper/disk.rs b/core/src/kvstore/mapper/disk.rs new file mode 100644 index 0000000000..37d3125f48 --- /dev/null +++ b/core/src/kvstore/mapper/disk.rs @@ -0,0 +1,215 @@ +use crate::kvstore::io_utils::{MemMap, Writer}; +use crate::kvstore::mapper::{Kind, Mapper, RwLockExt}; +use crate::kvstore::sstable::SSTable; +use crate::kvstore::Result; + +use memmap::Mmap; + +use rand::{rngs::SmallRng, seq::SliceRandom, FromEntropy, Rng}; + +use std::collections::HashMap; +use std::fs::{self, File, OpenOptions}; +use std::io::{self, BufReader, BufWriter}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +struct Id { + id: u32, + kind: Kind, +} + +#[derive(Debug)] +pub struct Disk { + rng: RwLock, + mappings: RwLock>, + storage_dirs: RwLock>, +} + +impl Disk { + pub fn single(dir: &Path) -> Self { + Disk::new(&[dir]) + } + + pub fn new>(storage_dirs: &[P]) -> Self { + if storage_dirs.is_empty() { + panic!("Disk Mapper requires at least one storage director"); + } + + let storage_dirs = storage_dirs + .iter() + .map(AsRef::as_ref) + .map(Path::to_path_buf) + .collect(); + + Disk { + storage_dirs: RwLock::new(storage_dirs), + mappings: RwLock::new(HashMap::new()), + rng: RwLock::new(SmallRng::from_entropy()), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PathInfo { + pub data: PathBuf, + pub index: PathBuf, +} + +impl Disk { + #[inline] + fn choose_storage(&self) -> PathBuf { + let mut rng = rand::thread_rng(); + let path = self + .storage_dirs + .read_as(|storage| storage.choose(&mut rng).unwrap().to_path_buf()); + if !path.exists() { + fs::create_dir_all(&path).expect("couldn't create table storage directory"); + } + + path + } + + #[inline] + fn add_mapping(&self, tref: Id, paths: PathInfo) { + let mut map = self.mappings.write().unwrap(); + map.insert(tref, paths); + } +} + +impl Mapper for Disk { + fn make_table(&self, kind: Kind, func: &mut FnMut(Writer, Writer)) -> Result { + let storage = self.choose_storage(); + + let id = next_id(kind); + let paths = mk_paths(id, &storage); + let (data, index) = mk_writers(&paths)?; + + func(data, index); + + self.add_mapping(id, paths.clone()); + + let (data, index) = mk_maps(&paths)?; + let sst = SSTable::from_parts(Arc::new(data), Arc::new(index))?; + Ok(sst) + } + + fn rotate_tables(&self) -> Result<()> { + let mut map = self.mappings.write().unwrap(); + let mut new_map = HashMap::new(); + + for (tref, paths) in map.drain() { + let new_kind = match tref.kind { + Kind::Active => Kind::Garbage, + Kind::Compaction => Kind::Active, + k => k, + }; + let new_ref = next_id(new_kind); + new_map.insert(new_ref, paths); + } + *map = new_map; + + Ok(()) + } + + fn empty_trash(&self) -> Result<()> { + self.mappings.write_as(|map| { + let to_rm = map + .keys() + .filter(|tref| tref.kind == Kind::Garbage) + .cloned() + .collect::>(); + + for tref in to_rm { + let paths = map.remove(&tref).unwrap(); + fs::remove_file(&paths.index)?; + fs::remove_file(&paths.data)?; + } + + Ok(()) + }) + } + + fn active_set(&self) -> Result> { + let map = self.mappings.read().unwrap(); + let active = map.iter().filter(|(tref, _)| tref.kind == Kind::Active); + let mut vec = Vec::new(); + + for (_, paths) in active { + let (data, index): (MemMap, MemMap) = mk_maps(paths)?; + let sst = 
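+            // Each Active mapping is re-opened by memory-mapping its data and
+            // index files (mk_maps) and rebuilding an SSTable handle from the
+            // two maps.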
SSTable::from_parts(Arc::new(data), Arc::new(index))?; + + vec.push(sst); + } + Ok(vec) + } + + fn serialize_state_to(&self, path: &Path) -> Result<()> { + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(path)?; + let wtr = BufWriter::new(file); + + self.mappings.read_as(|mappings| { + self.storage_dirs + .read_as(|storage| bincode::serialize_into(wtr, &(storage, mappings))) + })?; + + Ok(()) + } + + fn load_state_from(&self, path: &Path) -> Result<()> { + let rdr = BufReader::new(File::open(path)?); + let (new_storage, new_mappings) = bincode::deserialize_from(rdr)?; + + self.storage_dirs.write_as(|storage| { + self.mappings.write_as(|mappings| { + *storage = new_storage; + *mappings = new_mappings; + }) + }); + + Ok(()) + } +} + +fn mk_writers(paths: &PathInfo) -> io::Result<(Writer, Writer)> { + let mut opts = OpenOptions::new(); + opts.create(true).append(true); + + let data = BufWriter::new(opts.open(&paths.data)?); + let index = BufWriter::new(opts.open(&paths.index)?); + + Ok((Writer::Disk(data), Writer::Disk(index))) +} + +fn mk_maps(paths: &PathInfo) -> io::Result<(MemMap, MemMap)> { + let (data_file, index_file) = (File::open(&paths.data)?, File::open(&paths.index)?); + let (data, index) = unsafe { (Mmap::map(&data_file)?, Mmap::map(&index_file)?) }; + Ok((MemMap::Disk(data), MemMap::Disk(index))) +} + +fn mk_paths(tref: Id, dir: &Path) -> PathInfo { + let (data_name, index_name) = mk_filenames(tref.id); + PathInfo { + data: dir.join(data_name), + index: dir.join(index_name), + } +} + +#[inline] +fn mk_filenames(n: u32) -> (String, String) { + let data = format!("{}.sstable", n,); + let index = format!("{}.index", n,); + (data, index) +} + +#[inline] +fn next_id(kind: Kind) -> Id { + Id { + id: rand::thread_rng().gen(), + kind, + } +} diff --git a/core/src/kvstore/mapper/memory.rs b/core/src/kvstore/mapper/memory.rs new file mode 100644 index 0000000000..3db5c57b7e --- /dev/null +++ b/core/src/kvstore/mapper/memory.rs @@ -0,0 +1,144 @@ +use crate::kvstore::io_utils::{MemMap, SharedWriter, Writer}; +use crate::kvstore::mapper::{Kind, Mapper, RwLockExt}; +use crate::kvstore::sstable::SSTable; +use crate::kvstore::Result; + +use rand::{rngs::SmallRng, FromEntropy, Rng}; + +use std::collections::HashMap; +use std::path::Path; +use std::sync::{Arc, RwLock}; + +type Id = u32; +type TableMap = HashMap>>, Arc>>)>; +type Backing = Arc>; + +const BACKING_ERR_MSG: &str = "In-memory table lock poisoned; concurrency error"; + +#[derive(Debug)] +pub struct Memory { + tables: Backing, + compaction: Backing, + garbage: Backing, + meta: Arc>>, + rng: RwLock, +} + +impl Memory { + pub fn new() -> Self { + fn init_backing() -> Backing { + Arc::new(RwLock::new(HashMap::new())) + } + Memory { + tables: init_backing(), + compaction: init_backing(), + garbage: init_backing(), + meta: Arc::new(RwLock::new(vec![])), + rng: RwLock::new(SmallRng::from_entropy()), + } + } +} + +impl Memory { + #[inline] + fn get_backing(&self, kind: Kind) -> &Backing { + match kind { + Kind::Active => &self.tables, + Kind::Compaction => &self.compaction, + Kind::Garbage => &self.garbage, + } + } +} + +impl Mapper for Memory { + fn make_table(&self, kind: Kind, func: &mut FnMut(Writer, Writer)) -> Result { + let backing = self.get_backing(kind); + let id = next_id(); + + let (data, index) = backing.write_as(|tables| get_memory_writers_for(id, tables))?; + func(data, index); + + backing.read_as(|map| get_table(id, map)) + } + + fn rotate_tables(&self) -> Result<()> { + use 
std::mem::swap; + + let (mut active, mut compact, mut garbage) = ( + self.tables.write().expect(BACKING_ERR_MSG), + self.compaction.write().expect(BACKING_ERR_MSG), + self.garbage.write().expect(BACKING_ERR_MSG), + ); + + // compacted tables => active set + swap(&mut active, &mut compact); + // old active set => garbage + garbage.extend(compact.drain()); + + Ok(()) + } + + fn empty_trash(&self) -> Result<()> { + self.garbage.write().expect(BACKING_ERR_MSG).clear(); + + Ok(()) + } + + fn active_set(&self) -> Result> { + let active = self.tables.read().expect(BACKING_ERR_MSG); + + let mut tables = Vec::with_capacity(active.len()); + for tref in active.keys() { + let sst = get_table(*tref, &*active)?; + tables.push(sst); + } + + Ok(tables) + } + + fn serialize_state_to(&self, _: &Path) -> Result<()> { + Ok(()) + } + + fn load_state_from(&self, _: &Path) -> Result<()> { + Ok(()) + } +} + +fn get_memory_writers_for(id: Id, backing: &mut TableMap) -> Result<(Writer, Writer)> { + let data_buf = Arc::new(RwLock::new(vec![])); + let index_buf = Arc::new(RwLock::new(vec![])); + + backing.insert(id, (Arc::clone(&data_buf), Arc::clone(&index_buf))); + + let data_wtr = SharedWriter::new(data_buf); + let index_wtr = SharedWriter::new(index_buf); + + let data = Writer::Mem(data_wtr); + let index = Writer::Mem(index_wtr); + + Ok((data, index)) +} + +fn get_memmaps(id: Id, map: &TableMap) -> Result<(MemMap, MemMap)> { + let entry = map + .get(&id) + .expect("Map should always be present, given a Id that's not destroyed"); + + let data = MemMap::Mem(Arc::clone(&entry.0)); + let index = MemMap::Mem(Arc::clone(&entry.1)); + + Ok((data, index)) +} + +fn get_table(id: Id, map: &TableMap) -> Result { + let (data, index) = get_memmaps(id, map)?; + let sst = SSTable::from_parts(Arc::new(data), Arc::new(index))?; + + Ok(sst) +} + +#[inline] +fn next_id() -> Id { + rand::thread_rng().gen() +} diff --git a/core/src/kvstore/readtx.rs b/core/src/kvstore/readtx.rs new file mode 100644 index 0000000000..9d95e69458 --- /dev/null +++ b/core/src/kvstore/readtx.rs @@ -0,0 +1,33 @@ +use crate::kvstore::error::Result; +use crate::kvstore::sstable::{Key, SSTable, Value}; +use crate::kvstore::storage; + +use std::collections::BTreeMap; +use std::ops::RangeInclusive; +use std::sync::Arc; + +#[derive(Debug)] +pub struct ReadTx { + mem: Arc>, + tables: Arc<[BTreeMap]>, +} + +impl ReadTx { + pub fn new(mem: BTreeMap, tables: Vec>) -> ReadTx { + ReadTx { + mem: Arc::new(mem), + tables: Arc::from(tables.into_boxed_slice()), + } + } + + pub fn get(&self, key: &Key) -> Result>> { + storage::get(&self.mem, &*self.tables, key) + } + + pub fn range( + &self, + range: RangeInclusive, + ) -> Result)>> { + storage::range(&self.mem, &*self.tables, range) + } +} diff --git a/core/src/kvstore/sstable.rs b/core/src/kvstore/sstable.rs new file mode 100644 index 0000000000..0bcb404b56 --- /dev/null +++ b/core/src/kvstore/sstable.rs @@ -0,0 +1,476 @@ +use crate::kvstore::error::Result; +use crate::kvstore::io_utils::{MemMap, Writer}; + +use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; + +use std::borrow::Borrow; +use std::collections::{BTreeMap, HashMap}; +use std::io::{prelude::*, Cursor, Seek, SeekFrom}; +use std::ops::RangeInclusive; +use std::sync::Arc; +use std::u64; + +// ___________________________________________ +// | start_key | end_key | level | data_size | +// ------------------------------------------- +const IDX_META_SIZE: usize = KEY_LEN + KEY_LEN + 1 + 8; + +const KEY_LEN: usize = 3 * 8; +// _________________ +// | offset | size 
| +// ----------------- +const PTR_SIZE: usize = 2 * 8; +// __________________________________________ +// | key | timestamp | pointer OR tombstone | +// ------------------------------------------ +const INDEX_ENTRY_SIZE: usize = KEY_LEN + 8 + PTR_SIZE; +// Represented by zero offset and size +const TOMBSTONE: [u8; PTR_SIZE] = [0u8; PTR_SIZE]; + +#[derive(Clone, Debug)] +pub struct SSTable { + data: Arc, + index: Arc, + meta: IndexMeta, +} + +#[derive(Debug, PartialEq, Clone)] +pub struct IndexMeta { + pub level: u8, + pub data_size: u64, + pub start: Key, + pub end: Key, +} + +#[derive(Debug, Default, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Hash)] +pub struct Key(pub [u8; 24]); + +#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Copy, Clone)] +pub struct Index { + pub offset: u64, + pub size: u64, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Value { + pub ts: i64, + pub val: Option>, +} + +/// An iterator that produces logical view over a set of SSTables +pub struct Merged { + sources: Vec, + heads: BTreeMap<(Key, usize), Value>, + seen: HashMap, +} + +impl SSTable { + pub fn meta(&self) -> &IndexMeta { + &self.meta + } + + #[allow(dead_code)] + pub fn num_keys(&self) -> u64 { + ((self.index.len() - IDX_META_SIZE) / INDEX_ENTRY_SIZE) as u64 + } + + pub fn get(&self, key: &Key) -> Result> { + let range = *key..=*key; + let found_opt = self.range(&range)?.find(|(k, _)| k == key).map(|(_, v)| v); + Ok(found_opt) + } + + pub fn range(&self, range: &RangeInclusive) -> Result> { + Ok(Scan::new( + range.clone(), + Arc::clone(&self.data), + Arc::clone(&self.index), + )) + } + + pub fn create_capped( + rows: &mut I, + level: u8, + max_table_size: u64, + data_wtr: &mut Writer, + index_wtr: &mut Writer, + ) where + I: Iterator, + K: Borrow, + V: Borrow, + { + const DATA_ERR: &str = "Error writing table data"; + const INDEX_ERR: &str = "Error writing index data"; + + let (data_size, index) = + flush_mem_table_capped(rows, data_wtr, max_table_size).expect(DATA_ERR); + + data_wtr.flush().expect(DATA_ERR); + + let (&start, &end) = ( + index.keys().next().unwrap(), + index.keys().next_back().unwrap(), + ); + + let meta = IndexMeta { + start, + end, + level, + data_size, + }; + + flush_index(&index, &meta, index_wtr).expect(INDEX_ERR); + index_wtr.flush().expect(INDEX_ERR); + } + + pub fn create(rows: &mut I, level: u8, data_wtr: &mut Writer, index_wtr: &mut Writer) + where + I: Iterator, + K: Borrow, + V: Borrow, + { + SSTable::create_capped(rows, level, u64::MAX, data_wtr, index_wtr); + } + + pub fn from_parts(data: Arc, index: Arc) -> Result { + sst_from_parts(data, index) + } + + pub fn could_contain(&self, key: &Key) -> bool { + self.meta.start <= *key && *key <= self.meta.end + } + + pub fn is_overlap(&self, range: &RangeInclusive) -> bool { + let r = self.meta.start..=self.meta.end; + overlapping(&r, range) + } + + pub fn sorted_tables(tables: &[SSTable]) -> Vec> { + let mut sorted = Vec::new(); + + for sst in tables { + let (key, level) = { + let meta = sst.meta(); + (meta.start, meta.level) + }; + + while level as usize >= tables.len() { + sorted.push(BTreeMap::new()); + } + sorted[level as usize].insert(key, sst.clone()); + } + + sorted + } +} + +impl Key { + pub const MIN: Key = Key([0u8; KEY_LEN as usize]); + pub const MAX: Key = Key([255u8; KEY_LEN as usize]); + pub const ALL_INCLUSIVE: RangeInclusive = RangeInclusive::new(Key::MIN, Key::MAX); + + pub fn write(&self, wtr: &mut W) -> Result<()> { + wtr.write_all(&self.0)?; + Ok(()) + } + + pub fn read(bytes: &[u8]) -> Key { + 
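+        // A Key is exactly KEY_LEN (24) bytes: three big-endian u64s, as
+        // packed by From<(u64, u64, u64)> below, so byte-wise ordering agrees
+        // with numeric ordering of the triple. copy_from_slice panics unless
+        // `bytes` is exactly 24 bytes long.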
let mut key = Key::default(); + key.0.copy_from_slice(bytes); + key + } +} + +struct Scan { + bounds: RangeInclusive, + data: Arc, + index: Arc, + index_pos: usize, +} + +impl Scan { + fn new(bounds: RangeInclusive, data: Arc, index: Arc) -> Self { + Scan { + bounds, + data, + index, + index_pos: IDX_META_SIZE as usize, + } + } + + fn step(&mut self) -> Result> { + while self.index_pos < self.index.len() { + let pos = self.index_pos as usize; + let end = pos + INDEX_ENTRY_SIZE; + let (key, ts, idx) = read_index_rec(&self.index[pos..end]); + + if key < *self.bounds.start() { + self.index_pos = end; + continue; + } + + if *self.bounds.end() < key { + self.index_pos = std::usize::MAX; + return Ok(None); + } + + let bytes_opt = idx.map(|ptr| get_val(&self.data, ptr).to_vec()); + + let val = Value { ts, val: bytes_opt }; + + self.index_pos = end; + + return Ok(Some((key, val))); + } + + Ok(None) + } +} + +impl From<(u64, u64, u64)> for Key { + fn from((k0, k1, k2): (u64, u64, u64)) -> Self { + let mut buf = [0u8; KEY_LEN as usize]; + + BigEndian::write_u64(&mut buf[..8], k0); + BigEndian::write_u64(&mut buf[8..16], k1); + BigEndian::write_u64(&mut buf[16..], k2); + + Key(buf) + } +} + +impl Index { + fn write(&self, wtr: &mut W) -> Result<()> { + wtr.write_u64::(self.offset)?; + wtr.write_u64::(self.size)?; + Ok(()) + } + + #[inline] + fn read(bytes: &[u8]) -> Index { + let offset = BigEndian::read_u64(&bytes[..8]); + let size = BigEndian::read_u64(&bytes[8..16]); + + Index { offset, size } + } +} + +impl IndexMeta { + fn write(&self, wtr: &mut W) -> Result<()> { + self.start.write(wtr)?; + self.end.write(wtr)?; + wtr.write_u8(self.level)?; + wtr.write_u64::(self.data_size)?; + Ok(()) + } + + fn read(data: &[u8]) -> Self { + let start = Key::read(&data[..24]); + let end = Key::read(&data[24..48]); + let level = data[48]; + let data_size = BigEndian::read_u64(&data[49..57]); + + IndexMeta { + start, + end, + level, + data_size, + } + } +} + +impl Merged +where + I: Iterator, +{ + pub fn new(mut sources: Vec) -> Self { + let mut heads = BTreeMap::new(); + + for (source_idx, source) in sources.iter_mut().enumerate() { + if let Some((k, v)) = source.next() { + heads.insert((k, source_idx), v); + } + } + + Merged { + sources, + heads, + seen: HashMap::new(), + } + } +} + +impl Iterator for Merged +where + I: Iterator, +{ + type Item = (Key, Value); + + fn next(&mut self) -> Option { + while !self.heads.is_empty() { + let (key, source_idx) = *self.heads.keys().next().unwrap(); + let val = self.heads.remove(&(key, source_idx)).unwrap(); + + // replace + if let Some((k, v)) = self.sources[source_idx].next() { + self.heads.insert((k, source_idx), v); + } + + // merge logic + // if deleted, remember + let (deleted, stale) = match self.seen.get(&key) { + Some(&seen_ts) if seen_ts < val.ts => { + // fresh val + self.seen.insert(key, val.ts); + (val.val.is_none(), false) + } + Some(_) => (val.val.is_none(), true), + None => { + self.seen.insert(key, val.ts); + (val.val.is_none(), false) + } + }; + + if !(stale || deleted) { + return Some((key, val)); + } + } + + None + } +} + +impl Iterator for Scan { + type Item = (Key, Value); + + fn next(&mut self) -> Option { + if self.index_pos as usize >= self.index.len() { + return None; + } + + match self.step() { + Ok(opt) => opt, + Err(_) => { + self.index_pos = std::usize::MAX; + None + } + } + } +} + +fn sst_from_parts(data: Arc, index: Arc) -> Result { + let len = index.len() as usize; + + assert!(len > IDX_META_SIZE); + assert_eq!((len - IDX_META_SIZE) % 
INDEX_ENTRY_SIZE, 0); + + let mut rdr = Cursor::new(&**index); + let mut idx_buf = [0; IDX_META_SIZE]; + rdr.read_exact(&mut idx_buf)?; + + let meta = IndexMeta::read(&idx_buf); + + Ok(SSTable { data, index, meta }) +} + +fn flush_index( + index: &BTreeMap)>, + meta: &IndexMeta, + wtr: &mut Writer, +) -> Result<()> { + meta.write(wtr)?; + + for (&key, &(ts, idx)) in index.iter() { + write_index_rec(wtr, (key, ts, idx))?; + } + + Ok(()) +} +#[allow(clippy::type_complexity)] +fn flush_mem_table_capped( + rows: &mut I, + wtr: &mut Writer, + max_table_size: u64, +) -> Result<(u64, BTreeMap)>)> +where + I: Iterator, + K: Borrow, + V: Borrow, +{ + let mut ssi = BTreeMap::new(); + let mut size = 0; + + for (key, val) in rows { + let (key, val) = (key.borrow(), val.borrow()); + let ts = val.ts; + + let (index, item_size) = match val.val { + Some(ref bytes) => (Some(write_val(wtr, bytes)?), bytes.len()), + None => (None, 0), + }; + + size += item_size as u64; + ssi.insert(*key, (ts, index)); + + if size >= max_table_size { + break; + } + } + + Ok((size, ssi)) +} + +#[inline] +fn overlapping(r1: &RangeInclusive, r2: &RangeInclusive) -> bool { + r1.start() <= r2.end() && r2.start() <= r1.end() +} + +#[inline] +fn write_val(wtr: &mut W, val: &[u8]) -> Result { + let offset = wtr.seek(SeekFrom::Current(0))?; + let size = val.len() as u64; + + wtr.write_all(val)?; + Ok(Index { offset, size }) +} + +#[inline] +fn get_val(mmap: &MemMap, idx: Index) -> &[u8] { + let row = &mmap[idx.offset as usize..(idx.offset + idx.size) as usize]; + assert_eq!(row.len(), idx.size as usize); + row +} + +#[inline] +fn write_index_rec(wtr: &mut W, (key, ts, ptr): (Key, i64, Option)) -> Result<()> { + key.write(wtr)?; + + wtr.write_i64::(ts)?; + + match ptr { + Some(idx) => idx.write(wtr)?, + None => wtr.write_all(&TOMBSTONE)?, + }; + + Ok(()) +} + +#[inline] +fn read_index_rec(bytes: &[u8]) -> (Key, i64, Option) { + assert_eq!(bytes.len(), INDEX_ENTRY_SIZE); + const TS_END: usize = KEY_LEN + 8; + + let mut key_buf = [0; KEY_LEN as usize]; + key_buf.copy_from_slice(&bytes[..KEY_LEN as usize]); + let key = Key(key_buf); + let ts = BigEndian::read_i64(&bytes[KEY_LEN..TS_END]); + + let idx_slice = &bytes[TS_END..INDEX_ENTRY_SIZE]; + let idx = if idx_slice == TOMBSTONE { + None + } else { + Some(Index::read(idx_slice)) + }; + + (key, ts, idx) +} diff --git a/core/src/kvstore/storage.rs b/core/src/kvstore/storage.rs new file mode 100644 index 0000000000..4c738f2b7d --- /dev/null +++ b/core/src/kvstore/storage.rs @@ -0,0 +1,175 @@ +use crate::kvstore::error::Result; +use crate::kvstore::mapper::{Kind, Mapper}; +use crate::kvstore::sstable::{Key, Merged, SSTable, Value}; +use crate::kvstore::writelog::WriteLog; + +use chrono::Utc; + +use std::collections::BTreeMap; + +type MemTable = BTreeMap; + +// Size of timestamp + size of key +const OVERHEAD: usize = 8 + 3 * 8; +const LOG_ERR: &str = "Write to log failed! 
Halting."; + +#[derive(Debug)] +pub struct WriteState { + pub commit: i64, + pub log: WriteLog, + pub values: MemTable, + pub mem_size: usize, +} + +impl WriteState { + pub fn new(log: WriteLog, values: BTreeMap) -> WriteState { + let mem_size = values.values().fold(0, |acc, elem| acc + val_mem_use(elem)); + WriteState { + commit: Utc::now().timestamp(), + log, + mem_size, + values, + } + } + + pub fn put(&mut self, key: &Key, data: &[u8]) -> Result<()> { + use std::collections::btree_map::Entry; + let ts = self.commit; + let value = Value { + ts, + val: Some(data.to_vec()), + }; + self.log.log_put(key, ts, data).expect(LOG_ERR); + + self.mem_size += val_mem_use(&value); + + match self.values.entry(*key) { + Entry::Vacant(entry) => { + entry.insert(value); + } + Entry::Occupied(mut entry) => { + let old = entry.insert(value); + self.mem_size -= val_mem_use(&old); + } + } + + Ok(()) + } + + pub fn delete(&mut self, key: &Key) -> Result<()> { + use std::collections::btree_map::Entry; + let ts = self.commit; + let value = Value { ts, val: None }; + + self.log.log_delete(key, ts).expect(LOG_ERR); + + self.mem_size += val_mem_use(&value); + + match self.values.entry(*key) { + Entry::Vacant(entry) => { + entry.insert(value); + } + Entry::Occupied(mut entry) => { + let old = entry.insert(value); + self.mem_size -= val_mem_use(&old); + } + } + + Ok(()) + } + + pub fn reset(&mut self) -> Result<()> { + self.values.clear(); + self.log.reset()?; + self.mem_size = 0; + Ok(()) + } +} + +pub fn flush_table( + mem: &MemTable, + mapper: &dyn Mapper, + pages: &mut Vec>, +) -> Result<()> { + if mem.is_empty() { + return Ok(()); + }; + + if pages.is_empty() { + pages.push(BTreeMap::new()); + } + + let mut iter = mem.iter(); + let sst = mapper.make_table(Kind::Active, &mut |mut data_wtr, mut index_wtr| { + SSTable::create(&mut iter, 0, &mut data_wtr, &mut index_wtr); + })?; + + let first = sst.meta().start; + + pages[0].insert(first, sst); + Ok(()) +} + +pub fn get(mem: &MemTable, pages: &[BTreeMap], key: &Key) -> Result>> { + if let Some(idx) = mem.get(key) { + return Ok(idx.val.clone()); + } + + let mut candidates = Vec::new(); + + for level in pages.iter() { + for (_, sst) in level.iter().rev() { + if sst.could_contain(key) { + if let Some(val) = sst.get(&key)? { + candidates.push((*key, val)); + } + } + } + } + + let merged = Merged::new(vec![candidates.into_iter()]) + .next() + .map(|(_, v)| v.val.unwrap()); + Ok(merged) +} + +pub fn range( + mem: &MemTable, + tables: &[BTreeMap], + range: std::ops::RangeInclusive, +) -> Result)>> { + let mut sources: Vec>> = Vec::new(); + + let mem = mem + .range(range.clone()) + .map(|(k, v)| (*k, v.clone())) + .collect::>(); + + let mut disk = Vec::new(); + + for level in tables.iter() { + for sst in level.values() { + let iter = sst.range(&range)?; + let iter = Box::new(iter) as Box>; + + disk.push(iter); + } + } + + sources.push(Box::new(mem.into_iter())); + sources.extend(disk); + + let rows = Merged::new(sources).map(|(k, v)| (k, v.val.unwrap())); + + Ok(rows) +} + +#[inline] +fn val_mem_use(val: &Value) -> usize { + OVERHEAD + val.val.as_ref().map(Vec::len).unwrap_or(0) +} + +// TODO: Write basic tests using mem-table +// 1. test put + delete works right +// 2. test delete of unknown key recorded +// 3. 
check memory usage calcs diff --git a/core/src/kvstore/writelog.rs b/core/src/kvstore/writelog.rs new file mode 100644 index 0000000000..14d1b7c7db --- /dev/null +++ b/core/src/kvstore/writelog.rs @@ -0,0 +1,105 @@ +use crate::kvstore::error::Result; +use crate::kvstore::sstable::Value; +use crate::kvstore::Key; + +use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; + +use std::collections::BTreeMap; +use std::fs::{self, File}; +use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}; +use std::path::{Path, PathBuf}; + +#[derive(Debug)] +pub struct WriteLog { + log_path: PathBuf, + log_writer: BufWriter, + max_batch_size: usize, +} + +impl WriteLog { + pub fn open(path: &Path, max_batch_size: usize) -> Result { + let log_writer = BufWriter::new( + fs::OpenOptions::new() + .create(true) + .append(true) + .open(path)?, + ); + let log_path = path.to_path_buf(); + + Ok(WriteLog { + log_writer, + log_path, + max_batch_size, + }) + } + + pub fn reset(&mut self) -> Result<()> { + self.log_writer.flush()?; + let file = self.log_writer.get_mut(); + file.set_len(0)?; + file.seek(SeekFrom::Start(0))?; + + Ok(()) + } + + pub fn log_put(&mut self, key: &Key, ts: i64, val: &[u8]) -> Result<()> { + let rec_len = 24 + 8 + 1 + val.len() as u64; + let mut buf = vec![0u8; rec_len as usize + 8]; + + log_to_buffer(&mut buf, rec_len, key, ts, val); + + self.log_writer.write_all(&buf)?; + Ok(()) + } + + pub fn log_delete(&mut self, key: &Key, ts: i64) -> Result<()> { + self.log_put(key, ts, &[]) + } + + // TODO: decide how to configure/schedule calling this + #[allow(dead_code)] + pub fn sync(&mut self) -> Result<()> { + self.log_writer.flush()?; + self.log_writer.get_mut().sync_all()?; + Ok(()) + } + + pub fn materialize(&self) -> Result> { + let mut table = BTreeMap::new(); + if !self.log_path.exists() { + return Ok(table); + } + + let mut rdr = BufReader::new(File::open(&self.log_path)?); + let mut buf = vec![]; + + while let Ok(rec_len) = rdr.read_u64::() { + buf.resize(rec_len as usize, 0); + rdr.read_exact(&mut buf)?; + + let key = Key::read(&buf[0..24]); + let ts = BigEndian::read_i64(&buf[24..32]); + let exists = buf[32] != 0; + + let val = if exists { + Some(buf[33..].to_vec()) + } else { + None + }; + let value = Value { ts, val }; + + table.insert(key, value); + } + + Ok(table) + } +} + +#[inline] +fn log_to_buffer(buf: &mut [u8], rec_len: u64, key: &Key, ts: i64, val: &[u8]) { + BigEndian::write_u64(&mut buf[..8], rec_len); + (&mut buf[8..32]).copy_from_slice(&key.0); + BigEndian::write_i64(&mut buf[32..40], ts); + buf[40] = (!val.is_empty()) as u8; + (&mut buf[41..]).copy_from_slice(val); +} diff --git a/core/src/kvstore/writetx.rs b/core/src/kvstore/writetx.rs new file mode 100644 index 0000000000..8bd33739a1 --- /dev/null +++ b/core/src/kvstore/writetx.rs @@ -0,0 +1,17 @@ +use crate::kvstore::error::Result; +use crate::kvstore::sstable::Key; + +#[derive(Debug)] +pub struct WriteTx<'a> { + _dummy: &'a mut (), +} + +impl<'a> WriteTx<'a> { + pub fn put(&mut self, _key: &Key, _data: &[u8]) -> Result<()> { + unimplemented!() + } + + pub fn delete(&mut self, _key: &Key) -> Result<()> { + unimplemented!() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index acc9fa72ec..f55981b381 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -39,6 +39,8 @@ pub mod fetch_stage; pub mod fullnode; pub mod gen_keys; pub mod gossip_service; +#[cfg(feature = "kvstore")] +pub mod kvstore; pub mod leader_confirmation_service; pub mod leader_schedule; pub mod leader_schedule_utils; diff --git 
a/core/tests/kvstore.rs b/core/tests/kvstore.rs new file mode 100644 index 0000000000..a4e1f5f293 --- /dev/null +++ b/core/tests/kvstore.rs @@ -0,0 +1,252 @@ +#![cfg(feature = "kvstore")] +use rand::{thread_rng, Rng}; + +use std::fs; +use std::path::{Path, PathBuf}; + +use solana::kvstore::{Config, Key, KvStore}; + +const KB: usize = 1024; +const HALF_KB: usize = 512; + +#[test] +fn test_put_get() { + let path = setup("test_put_get"); + + let cfg = Config { + max_mem: 64 * KB, + max_tables: 5, + page_size: 64 * KB, + ..Config::default() + }; + + let lsm = KvStore::open(&path, cfg).unwrap(); + let (key, bytes) = gen_pairs(HALF_KB).take(1).next().unwrap(); + + lsm.put(&key, &bytes).expect("put fail"); + let out_bytes = lsm.get(&key).expect("get fail").expect("missing"); + + assert_eq!(bytes, out_bytes); + + teardown(&path); +} + +#[test] +fn test_put_get_many() { + let path = setup("test_put_get_many"); + + let cfg = Config { + max_mem: 64 * KB, + max_tables: 5, + page_size: 64 * KB, + ..Config::default() + }; + let lsm = KvStore::open(&path, cfg).unwrap(); + + let mut pairs: Vec<_> = gen_pairs(HALF_KB).take(1024).collect(); + pairs.sort_unstable_by_key(|(k, _)| *k); + + lsm.put_many(pairs.clone().drain(..)) + .expect("put_many fail"); + + let retrieved: Vec<(Key, Vec)> = + lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); + + assert!(!retrieved.is_empty()); + assert_eq!(pairs.len(), retrieved.len()); + assert_eq!(pairs, retrieved); + + teardown(&path); +} + +#[test] +fn test_delete() { + let path = setup("test_delete"); + + let cfg = Config { + max_mem: 64 * KB, + max_tables: 5, + page_size: 64 * KB, + ..Config::default() + }; + let lsm = KvStore::open(&path, cfg).unwrap(); + + let mut pairs: Vec<_> = gen_pairs(HALF_KB).take(64 * 6).collect(); + pairs.sort_unstable_by_key(|(k, _)| *k); + + for (k, i) in pairs.iter() { + lsm.put(k, i).expect("put fail"); + } + + // drain iterator deletes from `pairs` + for (k, _) in pairs.drain(64..128) { + lsm.delete(&k).expect("delete fail"); + } + + let retrieved: Vec<(Key, Vec)> = + lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); + + assert!(!retrieved.is_empty()); + assert_eq!(pairs.len(), retrieved.len()); + assert_eq!(pairs, retrieved); + + teardown(&path); +} + +#[test] +fn test_delete_many() { + let path = setup("test_delete_many"); + + let cfg = Config { + max_mem: 64 * KB, + max_tables: 5, + page_size: 64 * KB, + ..Config::default() + }; + let lsm = KvStore::open(&path, cfg).unwrap(); + + let mut pairs: Vec<_> = gen_pairs(HALF_KB).take(64 * 6).collect(); + pairs.sort_unstable_by_key(|(k, _)| *k); + + for (k, i) in pairs.iter() { + lsm.put(k, i).expect("put fail"); + } + + // drain iterator deletes from `pairs` + let keys_to_delete = pairs.drain(320..384).map(|(k, _)| k); + + lsm.delete_many(keys_to_delete).expect("delete_many fail"); + + let retrieved: Vec<(Key, Vec)> = + lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); + + assert!(!retrieved.is_empty()); + assert_eq!(pairs.len(), retrieved.len()); + assert_eq!(pairs, retrieved); + + teardown(&path); +} + +#[test] +fn test_close_reopen() { + let path = setup("test_close_reopen"); + let cfg = Config::default(); + let lsm = KvStore::open(&path, cfg).unwrap(); + + let mut pairs: Vec<_> = gen_pairs(KB).take(1024).collect(); + pairs.sort_unstable_by_key(|(k, _)| *k); + + for (k, i) in pairs.iter() { + lsm.put(k, i).expect("put fail"); + } + + for (k, _) in pairs.drain(64..128) { + lsm.delete(&k).expect("delete fail"); + } + + // Drop and re-open + drop(lsm); + let 
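+    // Reopening from the same path must surface every pair written before the
+    // drop (presumably restored from the on-disk write log and persisted
+    // mapper state); the assertions below check exactly that.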
lsm = KvStore::open(&path, cfg).unwrap(); + + let retrieved: Vec<(Key, Vec)> = + lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); + + assert!(!retrieved.is_empty()); + assert_eq!(pairs.len(), retrieved.len()); + assert_eq!(pairs, retrieved); + + teardown(&path); +} + +#[test] +fn test_partitioned() { + let path = setup("test_partitioned"); + + let cfg = Config { + max_mem: 64 * KB, + max_tables: 5, + page_size: 64 * KB, + ..Config::default() + }; + + let storage_dirs = (0..4) + .map(|i| path.join(format!("parition-{}", i))) + .collect::>(); + + let lsm = KvStore::partitioned(&path, &storage_dirs, cfg).unwrap(); + + let mut pairs: Vec<_> = gen_pairs(HALF_KB).take(64 * 12).collect(); + pairs.sort_unstable_by_key(|(k, _)| *k); + + lsm.put_many(pairs.iter()).expect("put_many fail"); + + // drain iterator deletes from `pairs` + let keys_to_delete = pairs.drain(320..384).map(|(k, _)| k); + + lsm.delete_many(keys_to_delete).expect("delete_many fail"); + + let retrieved: Vec<(Key, Vec)> = + lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); + + assert!(!retrieved.is_empty()); + assert_eq!(pairs.len(), retrieved.len()); + assert_eq!(pairs, retrieved); + + teardown(&path); +} + +#[test] +fn test_in_memory() { + let path = setup("test_in_memory"); + + let cfg = Config { + max_mem: 64 * KB, + max_tables: 5, + page_size: 64 * KB, + in_memory: true, + }; + let lsm = KvStore::open(&path, cfg).unwrap(); + + let mut pairs: Vec<_> = gen_pairs(HALF_KB).take(64 * 12).collect(); + pairs.sort_unstable_by_key(|(k, _)| *k); + + lsm.put_many(pairs.iter()).expect("put_many fail"); + + // drain iterator deletes from `pairs` + let keys_to_delete = pairs.drain(320..384).map(|(k, _)| k); + + lsm.delete_many(keys_to_delete).expect("delete_many fail"); + + let retrieved: Vec<(Key, Vec)> = + lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect(); + + assert!(!retrieved.is_empty()); + assert_eq!(pairs.len(), retrieved.len()); + assert_eq!(pairs, retrieved); + + teardown(&path); +} + +fn setup(test_name: &str) -> PathBuf { + let dir = Path::new("kvstore-test").join(test_name);; + + let _ig = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + + dir +} + +fn teardown(p: &Path) { + KvStore::destroy(p).expect("Expect successful store destruction"); +} + +fn gen_pairs(data_size: usize) -> impl Iterator)> { + let mut rng = thread_rng(); + + std::iter::repeat_with(move || { + let data = vec![0u8; data_size]; + let buf = rng.gen(); + + (Key(buf), data) + }) +}
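For orientation, here is a minimal usage sketch of the store as the integration tests above exercise it. It is illustrative only: the path, the sizes, and the kvstore_demo function name are made up, it assumes the crate is built with the kvstore feature enabled, and the get-after-delete behavior shown follows from storage::get returning the in-memory tombstone.

use std::path::PathBuf;

use solana::kvstore::{Config, Key, KvStore};

fn kvstore_demo() {
    let path = PathBuf::from("kvstore-demo");

    // Same knobs the tests use; values are illustrative, not recommendations.
    let cfg = Config {
        max_mem: 64 * 1024,
        max_tables: 5,
        page_size: 64 * 1024,
        ..Config::default()
    };

    let store = KvStore::open(&path, cfg).expect("open fail");

    // Keys are 24 bytes; From<(u64, u64, u64)> packs three big-endian u64s.
    let key = Key::from((1u64, 2u64, 3u64));
    let data = vec![42u8; 512];

    store.put(&key, &data).expect("put fail");
    assert_eq!(store.get(&key).expect("get fail"), Some(data));

    // Range scans yield (Key, Vec<u8>) pairs in ascending key order.
    for (k, v) in store.range(Key::ALL_INCLUSIVE).expect("range fail") {
        println!("{:?} => {} bytes", k, v.len());
    }

    store.delete(&key).expect("delete fail");
    assert_eq!(store.get(&key).expect("get fail"), None);

    KvStore::destroy(&path).expect("destroy fail");
}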