Remove kvstore (#6075)

automerge
Michael Vines 2019-09-24 19:59:32 -07:00 committed by Grimes
parent 94f0c081a6
commit 5f079137e5
21 changed files with 0 additions and 3866 deletions

Cargo.lock generated

@@ -3278,7 +3278,6 @@ dependencies = [
"solana-client 0.20.0",
"solana-drone 0.20.0",
"solana-ed25519-dalek 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-kvstore 0.20.0",
"solana-logger 0.20.0",
"solana-measure 0.20.0",
"solana-merkle-tree 0.20.0",
@@ -3499,21 +3498,6 @@ dependencies = [
"tiny-bip39 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "solana-kvstore"
version = "0.20.0"
dependencies = [
"bincode 1.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)",
"crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.101 (registry+https://github.com/rust-lang/crates.io-index)",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "solana-ledger-tool"
version = "0.20.0"


@@ -15,7 +15,6 @@ default-members = [
"gossip",
"install",
"keygen",
"kvstore",
"ledger-tool",
"local_cluster",
"logger",
@@ -76,7 +75,6 @@ members = [
"gossip",
"install",
"keygen",
"kvstore",
"ledger-tool",
"local_cluster",
"logger",


@@ -15,7 +15,6 @@ codecov = { repository = "solana-labs/solana", branch = "master", service = "git
[features]
cuda = []
kvstore = ["solana-kvstore"]
pin_gpu_memory = []
[dependencies]
@@ -54,7 +53,6 @@ solana-chacha-sys = { path = "../chacha-sys", version = "0.20.0" }
solana-client = { path = "../client", version = "0.20.0" }
solana-drone = { path = "../drone", version = "0.20.0" }
solana-ed25519-dalek = "0.2.0"
solana-kvstore = { path = "../kvstore", version = "0.20.0", optional = true }
solana-logger = { path = "../logger", version = "0.20.0" }
solana-merkle-tree = { path = "../merkle-tree", version = "0.20.0" }
solana-metrics = { path = "../metrics", version = "0.20.0" }


@@ -6,14 +6,10 @@ use crate::erasure::ErasureConfig;
use crate::result::{Error, Result};
use crate::shred::{Shred, Shredder};
#[cfg(feature = "kvstore")]
use solana_kvstore as kvstore;
use bincode::deserialize;
use std::collections::HashMap;
#[cfg(not(feature = "kvstore"))]
use rocksdb;
use solana_metrics::{datapoint_error, datapoint_info};
@@ -61,10 +57,7 @@ macro_rules! db_imports {
};
}
#[cfg(not(feature = "kvstore"))]
db_imports! {rocks, Rocks, "rocksdb"}
#[cfg(feature = "kvstore")]
db_imports! {kvs, Kvs, "kvstore"}
pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000;
@@ -76,8 +69,6 @@ pub enum BlocktreeError {
ShredForIndexExists,
InvalidShredData(Box<bincode::ErrorKind>),
RocksDb(rocksdb::Error),
#[cfg(feature = "kvstore")]
KvsDb(kvstore::Error),
SlotNotRooted,
}


@@ -1,283 +0,0 @@
use crate::blocktree::db::columns as cf;
use crate::blocktree::db::{Backend, Column, DbCursor, IWriteBatch, TypedColumn};
use crate::blocktree::BlocktreeError;
use crate::result::{Error, Result};
use byteorder::{BigEndian, ByteOrder};
use solana_kvstore::{self as kvstore, Key, KvStore};
use std::path::Path;
type ColumnFamily = u64;
#[derive(Debug)]
pub struct Kvs(KvStore);
/// Dummy struct for now
#[derive(Debug, Clone, Copy)]
pub struct Dummy;
impl Backend for Kvs {
type Key = Key;
type OwnedKey = Key;
type ColumnFamily = ColumnFamily;
type Cursor = Dummy;
type Iter = Dummy;
type WriteBatch = Dummy;
type Error = kvstore::Error;
fn open(_path: &Path) -> Result<Kvs> {
unimplemented!()
}
fn columns(&self) -> Vec<&'static str> {
unimplemented!()
}
fn destroy(_path: &Path) -> Result<()> {
unimplemented!()
}
fn cf_handle(&self, _cf: &str) -> ColumnFamily {
unimplemented!()
}
fn get_cf(&self, _cf: ColumnFamily, _key: &Key) -> Result<Option<Vec<u8>>> {
unimplemented!()
}
fn put_cf(&self, _cf: ColumnFamily, _key: &Key, _value: &[u8]) -> Result<()> {
unimplemented!()
}
fn delete_cf(&self, _cf: ColumnFamily, _key: &Key) -> Result<()> {
unimplemented!()
}
fn iterator_cf(&self, _cf: ColumnFamily) -> Result<Dummy> {
unimplemented!()
}
fn raw_iterator_cf(&self, _cf: ColumnFamily) -> Result<Dummy> {
unimplemented!()
}
fn batch(&self) -> Result<Dummy> {
unimplemented!()
}
fn write(&self, _batch: Dummy) -> Result<()> {
unimplemented!()
}
}
impl Column<Kvs> for cf::Coding {
const NAME: &'static str = super::ERASURE_CF;
type Index = (u64, u64);
fn key(index: (u64, u64)) -> Key {
cf::Data::key(index)
}
fn index(key: &Key) -> (u64, u64) {
cf::Data::index(key)
}
}
impl Column<Kvs> for cf::Data {
const NAME: &'static str = super::DATA_CF;
type Index = (u64, u64);
fn key((slot, index): (u64, u64)) -> Key {
let mut key = Key::default();
BigEndian::write_u64(&mut key.0[8..16], slot);
BigEndian::write_u64(&mut key.0[16..], index);
key
}
fn index(key: &Key) -> (u64, u64) {
let slot = BigEndian::read_u64(&key.0[8..16]);
let index = BigEndian::read_u64(&key.0[16..]);
(slot, index)
}
}
impl Column<Kvs> for cf::Index {
const NAME: &'static str = super::INDEX_CF;
type Index = u64;
fn key(slot: u64) -> Key {
let mut key = Key::default();
BigEndian::write_u64(&mut key.0[8..16], slot);
key
}
fn index(key: &Key) -> u64 {
BigEndian::read_u64(&key.0[8..16])
}
}
impl TypedColumn<Kvs> for cf::Index {
type Type = crate::blocktree::meta::Index;
}
impl Column<Kvs> for cf::DeadSlots {
const NAME: &'static str = super::DEAD_SLOTS;
type Index = u64;
fn key(slot: u64) -> Key {
let mut key = Key::default();
BigEndian::write_u64(&mut key.0[8..16], slot);
key
}
fn index(key: &Key) -> u64 {
BigEndian::read_u64(&key.0[8..16])
}
}
impl TypedColumn<Kvs> for cf::DeadSlots {
type Type = bool;
}
impl Column<Kvs> for cf::Orphans {
const NAME: &'static str = super::ORPHANS_CF;
type Index = u64;
fn key(slot: u64) -> Key {
let mut key = Key::default();
BigEndian::write_u64(&mut key.0[8..16], slot);
key
}
fn index(key: &Key) -> u64 {
BigEndian::read_u64(&key.0[8..16])
}
}
impl TypedColumn<Kvs> for cf::Orphans {
type Type = bool;
}
impl Column<Kvs> for cf::Root {
const NAME: &'static str = super::ROOT_CF;
type Index = u64;
fn key(slot: u64) -> Key {
let mut key = Key::default();
BigEndian::write_u64(&mut key.0[8..16], slot);
key
}
fn index(key: &Key) -> u64 {
BigEndian::read_u64(&key.0[8..16])
}
}
impl TypedColumn<Kvs> for cf::Root {
type Type = bool;
}
impl Column<Kvs> for cf::SlotMeta {
const NAME: &'static str = super::META_CF;
type Index = u64;
fn key(slot: u64) -> Key {
let mut key = Key::default();
BigEndian::write_u64(&mut key.0[8..16], slot);
key
}
fn index(key: &Key) -> u64 {
BigEndian::read_u64(&key.0[8..16])
}
}
impl TypedColumn<Kvs> for cf::SlotMeta {
type Type = super::SlotMeta;
}
impl Column<Kvs> for cf::ErasureMeta {
const NAME: &'static str = super::ERASURE_META_CF;
type Index = (u64, u64);
fn key((slot, set_index): (u64, u64)) -> Key {
let mut key = Key::default();
BigEndian::write_u64(&mut key.0[8..16], slot);
BigEndian::write_u64(&mut key.0[16..], set_index);
key
}
fn index(key: &Key) -> (u64, u64) {
let slot = BigEndian::read_u64(&key.0[8..16]);
let set_index = BigEndian::read_u64(&key.0[16..]);
(slot, set_index)
}
}
impl TypedColumn<Kvs> for cf::ErasureMeta {
type Type = super::ErasureMeta;
}
impl DbCursor<Kvs> for Dummy {
fn valid(&self) -> bool {
unimplemented!()
}
fn seek(&mut self, _key: &Key) {
unimplemented!()
}
fn seek_to_first(&mut self) {
unimplemented!()
}
fn next(&mut self) {
unimplemented!()
}
fn key(&self) -> Option<Key> {
unimplemented!()
}
fn value(&self) -> Option<Vec<u8>> {
unimplemented!()
}
}
impl IWriteBatch<Kvs> for Dummy {
fn put_cf(&mut self, _cf: ColumnFamily, _key: &Key, _value: &[u8]) -> Result<()> {
unimplemented!()
}
fn delete_cf(&mut self, _cf: ColumnFamily, _key: &Key) -> Result<()> {
unimplemented!()
}
}
impl Iterator for Dummy {
type Item = (Box<Key>, Box<[u8]>);
fn next(&mut self) -> Option<Self::Item> {
unimplemented!()
}
}
impl std::convert::From<kvstore::Error> for Error {
fn from(e: kvstore::Error) -> Error {
Error::BlocktreeError(BlocktreeError::KvsDb(e))
}
}

kvstore/.gitignore vendored

@@ -1,2 +0,0 @@
/target/
/farf/


@@ -1,22 +0,0 @@
[package]
name = "solana-kvstore"
description = "Embedded Key-Value store for solana"
version = "0.20.0"
homepage = "https://solana.com/"
repository = "https://github.com/solana-labs/solana"
authors = ["Solana Maintainers <maintainers@solana.com>"]
license = "Apache-2.0"
edition = "2018"
[dependencies]
bincode = "1.1.4"
byteorder = "1.3.2"
chrono = "0.4.9"
crc = "1.8.1"
memmap = "0.7.0"
rand = "0.6.5"
serde = "1.0.101"
serde_derive = "1.0.101"
[dev-dependencies]
tempfile = "3.1.0"


@@ -1,170 +0,0 @@
#![feature(test)]
extern crate test;
use std::fs;
use std::path::{Path, PathBuf};
use rand::{self, Rng};
use test::Bencher;
use solana_kvstore::{test::gen, Config, Key, KvStore};
const SMALL_SIZE: usize = 512;
const LARGE_SIZE: usize = 32 * 1024;
const HUGE_SIZE: usize = 64 * 1024;
fn bench_write(bench: &mut Bencher, rows: &[(Key, Vec<u8>)], ledger_path: &str) {
let store = KvStore::open_default(&ledger_path).unwrap();
bench.iter(move || {
store.put_many(rows.iter()).expect("Failed to insert rows");
});
teardown(&ledger_path);
}
fn bench_write_partitioned(bench: &mut Bencher, rows: &[(Key, Vec<u8>)], ledger_path: &str) {
let path = Path::new(ledger_path);
let storage_dirs = (0..4)
.map(|i| path.join(format!("parition-{}", i)))
.collect::<Vec<_>>();
let store = KvStore::partitioned(&ledger_path, &storage_dirs, Config::default()).unwrap();
bench.iter(move || {
store.put_many(rows.iter()).expect("Failed to insert rows");
});
teardown(&ledger_path);
}
#[bench]
#[ignore]
fn bench_write_small(bench: &mut Bencher) {
let ledger_path = setup("bench_write_small");
let num_entries = 32 * 1024;
let rows = gen::pairs(SMALL_SIZE).take(num_entries).collect::<Vec<_>>();
bench_write(bench, &rows, &ledger_path.to_string_lossy());
}
#[bench]
#[ignore]
fn bench_write_small_partitioned(bench: &mut Bencher) {
let ledger_path = setup("bench_write_small_partitioned");
let num_entries = 32 * 1024;
let rows = gen::pairs(SMALL_SIZE).take(num_entries).collect::<Vec<_>>();
bench_write_partitioned(bench, &rows, &ledger_path.to_string_lossy());
}
#[bench]
#[ignore]
fn bench_write_large(bench: &mut Bencher) {
let ledger_path = setup("bench_write_large");
let num_entries = 32 * 1024;
let rows = gen::pairs(LARGE_SIZE).take(num_entries).collect::<Vec<_>>();
bench_write(bench, &rows, &ledger_path.to_string_lossy());
}
#[bench]
#[ignore]
fn bench_write_huge(bench: &mut Bencher) {
let ledger_path = setup("bench_write_huge");
let num_entries = 32 * 1024;
let rows = gen::pairs(HUGE_SIZE).take(num_entries).collect::<Vec<_>>();
bench_write(bench, &rows, &ledger_path.to_string_lossy());
}
#[bench]
#[ignore]
fn bench_read_sequential(bench: &mut Bencher) {
let ledger_path = setup("bench_read_sequential");
let store = KvStore::open_default(&ledger_path).unwrap();
// Insert some big and small blobs into the ledger
let num_small_blobs = 32 * 1024;
let num_large_blobs = 32 * 1024;
let total_blobs = num_small_blobs + num_large_blobs;
let small = gen::data(SMALL_SIZE).take(num_small_blobs);
let large = gen::data(LARGE_SIZE).take(num_large_blobs);
let rows = gen_seq_keys().zip(small.chain(large));
let _ = store.put_many(rows);
let num_reads = total_blobs / 15;
let mut rng = rand::thread_rng();
bench.iter(move || {
// Generate random starting point in the range [0, total_blobs - 1], read num_reads blobs sequentially
let start_index = rng.gen_range(0, num_small_blobs + num_large_blobs);
for i in start_index..start_index + num_reads {
let i = i as u64;
let k = Key::from((i, i, i));
let _ = store.get(&k);
}
});
teardown(&ledger_path);
}
#[bench]
#[ignore]
fn bench_read_random(bench: &mut Bencher) {
let ledger_path = setup("bench_read_sequential");
let store = KvStore::open_default(&ledger_path).unwrap();
// Insert some big and small blobs into the ledger
let num_small_blobs = 32 * 1024;
let num_large_blobs = 32 * 1024;
let total_blobs = num_small_blobs + num_large_blobs;
let small = gen::data(SMALL_SIZE).take(num_small_blobs);
let large = gen::data(LARGE_SIZE).take(num_large_blobs);
let rows = gen_seq_keys().zip(small.chain(large));
let _ = store.put_many(rows);
let num_reads = total_blobs / 15;
let mut rng = rand::thread_rng();
// Generate a num_reads sized random sample of indexes in range [0, total_blobs - 1],
// simulating random reads
let indexes: Vec<u64> = (0..num_reads)
.map(|_| rng.gen_range(0, total_blobs as u64))
.collect();
bench.iter(move || {
for &i in indexes.iter() {
let i = i as u64;
let k = Key::from((i, i, i));
let _ = store.get(&k);
}
});
teardown(&ledger_path);
}
fn setup(test_name: &str) -> PathBuf {
let dir = Path::new("kvstore-bench").join(test_name);
let _ig = fs::remove_dir_all(&dir);
fs::create_dir_all(&dir).unwrap();
dir
}
fn gen_seq_keys() -> impl Iterator<Item = Key> {
let mut n = 0;
std::iter::repeat_with(move || {
let key = Key::from((n, n, n));
n += 1;
key
})
}
fn teardown<P: AsRef<Path>>(p: P) {
KvStore::destroy(p).expect("Expect successful store destruction");
}


@@ -1,223 +0,0 @@
use crate::error::{Error, Result};
use crate::mapper::{Kind, Mapper};
use crate::sstable::{Key, Merged, SSTable};
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
type TableVec = Vec<BTreeMap<Key, SSTable>>;
type TableSlice<'a> = &'a [BTreeMap<Key, SSTable>];
#[derive(Debug, Copy, Clone)]
pub struct Config {
pub max_pages: usize,
pub page_size: usize,
}
#[derive(Debug)]
pub enum Req {
Start(PathBuf),
Gc,
}
#[derive(Debug)]
pub enum Resp {
Done(TableVec),
Failed(Error),
}
pub fn spawn_compactor(
mapper: Arc<dyn Mapper>,
config: Config,
) -> Result<(Sender<Req>, Receiver<Resp>, JoinHandle<()>)> {
let (req_tx, req_rx) = channel();
let (resp_tx, resp_rx) = channel();
let handle = thread::spawn(move || {
let _ignored = run_loop(mapper, config, req_rx, resp_tx);
});
Ok((req_tx, resp_rx, handle))
}
fn run_loop(
mapper: Arc<dyn Mapper>,
config: Config,
req_rx: Receiver<Req>,
resp_tx: Sender<Resp>,
) -> Result<()> {
while let Ok(msg) = req_rx.recv() {
match msg {
Req::Start(_) => {
let new_tables_res = run_compaction(&*mapper, &config);
match new_tables_res {
Ok(new_tables) => {
resp_tx.send(Resp::Done(new_tables))?;
}
Err(e) => {
resp_tx.send(Resp::Failed(e))?;
}
}
}
Req::Gc => {
let _ = mapper.empty_trash();
}
}
}
Ok(())
}
fn run_compaction(mapper: &dyn Mapper, config: &Config) -> Result<TableVec> {
let mut tables = load_tables(mapper)?;
compact_level_0(mapper, &mut tables, config)?;
for level in 1..tables.len() {
while level_needs_compact(level as u8, config, &tables) {
compact_upper_level(mapper, &mut tables, config, level as u8)?;
}
}
// move old tables to garbage
mapper.rotate_tables()?;
Ok(tables)
}
fn compact_level_0(mapper: &dyn Mapper, tables: &mut TableVec, config: &Config) -> Result<()> {
assert!(!tables.is_empty());
if tables.len() == 1 {
tables.push(BTreeMap::new());
}
let mut new_tables = BTreeMap::new();
{
let sources = tables
.iter()
.take(2)
.map(BTreeMap::values)
.flatten()
.map(|sst| sst.range(&(Key::ALL_INCLUSIVE)))
.collect::<Result<Vec<_>>>()?;
let mut iter = Merged::new(sources).peekable();
while iter.peek().is_some() {
let sst = mapper.make_table(Kind::Compaction, &mut |mut data_wtr, mut index_wtr| {
SSTable::create_capped(
&mut iter,
1,
config.page_size as u64,
&mut data_wtr,
&mut index_wtr,
);
})?;
new_tables.insert(sst.meta().start, sst);
}
}
tables[0].clear();
tables[1].clear();
tables[1].append(&mut new_tables);
Ok(())
}
fn compact_upper_level(
mapper: &dyn Mapper,
pages: &mut TableVec,
config: &Config,
level: u8,
) -> Result<()> {
assert!(1 <= level && (level as usize) < pages.len());
assert!(!pages[level as usize].is_empty());
let next_level = level + 1;
let level = level as usize;
if next_level as usize == pages.len() {
pages.push(BTreeMap::new());
}
let (&key, chosen_sst) = pages[level].iter().next_back().unwrap();
let (start, end) = {
let meta = chosen_sst.meta();
(meta.start, meta.end)
};
let mut page_keys = Vec::new();
let mut merge_with = Vec::new();
for (key, sst) in pages[next_level as usize].iter() {
if sst.is_overlap(&(start..=end)) {
page_keys.push(*key);
merge_with.push(sst);
}
}
let mut new_tables = BTreeMap::new();
{
let sources = merge_with
.into_iter()
.chain(std::iter::once(chosen_sst))
.map(|sst| sst.range(&(Key::ALL_INCLUSIVE)))
.collect::<Result<Vec<_>>>()?;
let mut iter = Merged::new(sources).peekable();
while iter.peek().is_some() {
let sst = mapper.make_table(Kind::Compaction, &mut |mut data_wtr, mut index_wtr| {
SSTable::create_capped(
&mut iter,
next_level,
config.page_size as u64,
&mut data_wtr,
&mut index_wtr,
);
})?;
new_tables.insert(sst.meta().start, sst);
}
}
// delete merged page and merged pages in next level
pages[level].remove(&key).unwrap();
for start_key in page_keys {
pages[next_level as usize].remove(&start_key).unwrap();
}
pages[next_level as usize].append(&mut new_tables);
Ok(())
}
fn load_tables(mapper: &dyn Mapper) -> Result<TableVec> {
Ok(SSTable::sorted_tables(&mapper.active_set()?))
}
#[inline]
fn level_max(level: u8, config: &Config) -> usize {
match level {
0 => config.max_pages,
x => 10usize.pow(u32::from(x)),
}
}
#[inline]
fn level_needs_compact(level: u8, config: &Config, tables: TableSlice) -> bool {
if level as usize >= tables.len() {
return false;
}
let max = level_max(level, config);
tables[level as usize].len() > max
}


@@ -1,79 +0,0 @@
use std::error::Error as StdErr;
use std::fmt;
use std::io;
use std::result::Result as StdRes;
use std::sync::mpsc::{RecvError, SendError, TryRecvError};
pub type Result<T> = StdRes<T, Error>;
#[derive(Debug)]
pub enum Error {
Io(io::Error),
Corrupted(bincode::Error),
Channel(Box<dyn StdErr + Sync + Send>),
Missing,
WriteBatchFull(usize),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Corrupted(_) => write!(f, "Serialization error: Store may be corrupted"),
Error::Channel(e) => write!(f, "Internal communication error: {}", e),
Error::Io(e) => write!(f, "I/O error: {}", e),
Error::Missing => write!(f, "Item not present in ledger"),
Error::WriteBatchFull(capacity) => write!(f, "WriteBatch capacity {} full", capacity),
}
}
}
impl StdErr for Error {
fn source(&self) -> Option<&(dyn StdErr + 'static)> {
match self {
Error::Io(e) => Some(e),
Error::Corrupted(ref e) => Some(e),
Error::Channel(e) => Some(e.as_ref()),
Error::Missing => None,
Error::WriteBatchFull(_) => None,
}
}
}
impl From<io::Error> for Error {
fn from(e: io::Error) -> Self {
Error::Io(e)
}
}
impl<W> From<io::IntoInnerError<W>> for Error {
fn from(e: io::IntoInnerError<W>) -> Self {
Error::Io(e.into())
}
}
impl From<bincode::Error> for Error {
fn from(e: bincode::Error) -> Self {
Error::Corrupted(e)
}
}
impl<T> From<SendError<T>> for Error
where
T: Send + Sync + 'static,
{
fn from(e: SendError<T>) -> Self {
Error::Channel(Box::new(e))
}
}
impl From<RecvError> for Error {
fn from(e: RecvError) -> Self {
Error::Channel(Box::new(e))
}
}
impl From<TryRecvError> for Error {
fn from(e: TryRecvError) -> Self {
Error::Channel(Box::new(e))
}
}


@@ -1,437 +0,0 @@
use byteorder::{BigEndian, ByteOrder};
use crc::crc32;
use memmap::Mmap;
use std::cmp;
use std::fs::File;
use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write};
use std::ops::Deref;
use std::sync::{Arc, RwLock};
const BACKING_ERR: &str = "In-memory table lock poisoned; concurrency error";
#[derive(Debug)]
pub enum MemMap {
Disk(Mmap),
Mem(Arc<RwLock<Vec<u8>>>),
}
#[derive(Debug)]
pub enum Writer {
Disk(BufWriter<File>),
Mem(SharedWriter),
}
#[derive(Debug)]
pub struct SharedWriter {
buf: Arc<RwLock<Vec<u8>>>,
pos: u64,
}
#[derive(Debug)]
pub struct CRCWriter<W: Write> {
writer: W,
buffer: Vec<u8>,
position: usize,
capacity: usize,
}
#[derive(Debug)]
pub struct CRCReader<R: Read> {
reader: R,
buffer: Vec<u8>,
position: usize,
chunk_size: usize,
}
/// Helper trait to make zeroing buffers easier
pub trait Fill<T> {
fn fill(&mut self, v: T);
}
impl SharedWriter {
pub fn new(buf: Arc<RwLock<Vec<u8>>>) -> SharedWriter {
SharedWriter { buf, pos: 0 }
}
}
impl<W: Write> CRCWriter<W> {
#[allow(dead_code)]
pub fn new(inner: W, chunk_size: usize) -> CRCWriter<W> {
if chunk_size <= 8 {
panic!("chunk_size must be > 8");
}
CRCWriter {
writer: inner,
buffer: vec![0; chunk_size],
position: 0,
capacity: chunk_size - 8,
}
}
#[allow(dead_code)]
pub fn into_inner(mut self) -> io::Result<W> {
self.flush()?;
Ok(self.writer)
}
#[allow(dead_code)]
pub fn get_ref(&self) -> &W {
&self.writer
}
#[allow(dead_code)]
pub fn get_mut(&mut self) -> &mut W {
&mut self.writer
}
}
impl<R: Read> CRCReader<R> {
#[allow(dead_code)]
pub fn new(inner: R, chunk_size: usize) -> CRCReader<R> {
if chunk_size <= 8 {
panic!("chunk_size must be > 8");
}
CRCReader {
reader: inner,
buffer: vec![0; chunk_size - 8],
position: chunk_size,
chunk_size,
}
}
#[allow(dead_code)]
pub fn into_inner(self) -> R {
self.reader
}
fn load_block(&mut self) -> io::Result<()> {
self.buffer.clear();
self.position = 0;
let mut block_buffer = vec![0; self.chunk_size];
let mut block_position = 0;
while block_position < self.chunk_size {
let bytes_read = self.reader.read(&mut block_buffer[block_position..])?;
if bytes_read == 0 {
break;
}
block_position += bytes_read
}
if block_position < self.chunk_size {
return Err(io::ErrorKind::UnexpectedEof.into());
}
assert_eq!(block_position, self.chunk_size);
let stored_digest = BigEndian::read_u32(&block_buffer[0..4]);
let payload_len = BigEndian::read_u32(&block_buffer[4..8]) as usize;
if payload_len + 8 > block_buffer.len() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"CRCReader: invalid block size",
));
}
let payload = &block_buffer[8..8 + payload_len];
let computed_digest = crc32::checksum_ieee(&block_buffer[4..8 + payload_len]);
if computed_digest != stored_digest {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"CRCReader: CRC validation failed",
));
}
self.buffer.extend_from_slice(payload);
Ok(())
}
}
impl<T> Fill<T> for [T]
where
T: Clone,
{
fn fill(&mut self, v: T) {
for i in self {
*i = v.clone()
}
}
}
impl Deref for MemMap {
type Target = [u8];
fn deref(&self) -> &[u8] {
match self {
MemMap::Disk(mmap) => mmap.deref(),
MemMap::Mem(vec) => {
let buf = vec.read().expect(BACKING_ERR);
let slice = buf.as_slice();
// transmute lifetime. Relying on the RwLock + immutability for safety
unsafe { std::mem::transmute(slice) }
}
}
}
}
impl<W> Write for CRCWriter<W>
where
W: Write,
{
fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
let mut written = 0;
while written < buffer.len() {
let batch_len = (&mut self.buffer[8 + self.position..]).write(&buffer[written..])?;
self.position += batch_len;
written += batch_len;
if self.position >= self.capacity {
self.flush()?;
}
}
Ok(written)
}
fn flush(&mut self) -> io::Result<()> {
BigEndian::write_u32(&mut self.buffer[4..8], self.position as u32);
let total_len = self.position + 8;
// crc over length + payload
let digest = crc32::checksum_ieee(&self.buffer[4..total_len]);
BigEndian::write_u32(&mut self.buffer[0..4], digest);
self.writer.write_all(&self.buffer)?;
self.position = 0;
Ok(())
}
}
impl<R> Read for CRCReader<R>
where
R: Read,
{
fn read(&mut self, buffer: &mut [u8]) -> io::Result<usize> {
let mut write_position = 0;
while write_position < buffer.len() {
if self.position >= self.buffer.len() {
self.load_block()?;
}
let bytes_available = self.buffer.len() - self.position;
let space_remaining = buffer.len() - write_position;
let copy_len = cmp::min(bytes_available, space_remaining);
(&mut buffer[write_position..write_position + copy_len])
.copy_from_slice(&self.buffer[self.position..self.position + copy_len]);
write_position += copy_len;
self.position += copy_len;
}
Ok(write_position)
}
}
impl Write for SharedWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut vec = self.buf.write().expect(BACKING_ERR);
// Calc ranges
let space_remaining = vec.len() - self.pos as usize;
let copy_len = cmp::min(buf.len(), space_remaining);
let copy_src_range = 0..copy_len;
let append_src_range = copy_len..buf.len();
let copy_dest_range = self.pos as usize..(self.pos as usize + copy_len);
// Copy then append
(&mut vec[copy_dest_range]).copy_from_slice(&buf[copy_src_range]);
vec.extend_from_slice(&buf[append_src_range]);
let written = buf.len();
self.pos += written as u64;
Ok(written)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
let _written = self.write(buf)?;
Ok(())
}
}
impl Seek for SharedWriter {
fn seek(&mut self, to: SeekFrom) -> io::Result<u64> {
self.pos = match to {
SeekFrom::Start(new_pos) => new_pos,
SeekFrom::Current(diff) => (self.pos as i64 + diff) as u64,
SeekFrom::End(rpos) => (self.buf.read().expect(BACKING_ERR).len() as i64 + rpos) as u64,
};
Ok(self.pos)
}
}
impl Write for Writer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self {
Writer::Disk(ref mut wtr) => wtr.write(buf),
Writer::Mem(ref mut wtr) => wtr.write(buf),
}
}
fn flush(&mut self) -> io::Result<()> {
match self {
Writer::Disk(ref mut wtr) => {
wtr.flush()?;
wtr.get_mut().sync_data()?;
Ok(())
}
Writer::Mem(ref mut wtr) => wtr.flush(),
}
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
match self {
Writer::Disk(ref mut wtr) => wtr.write_all(buf),
Writer::Mem(ref mut wtr) => wtr.write_all(buf),
}
}
}
impl Seek for Writer {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
match self {
Writer::Disk(ref mut wtr) => wtr.seek(pos),
Writer::Mem(ref mut wtr) => wtr.seek(pos),
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_crc_write() {
let block_sizes = &[256, 512, 1024, 2048];
let byte_counts = &[8, 128, 1024, 1024 * 8];
for &block_size in block_sizes {
for &n_bytes in byte_counts {
let bytes: Vec<_> = (0..n_bytes).map(|x| (x % 255) as u8).collect();
let buffer = Vec::new();
let mut writer = CRCWriter::new(buffer, block_size);
writer.write_all(&bytes).unwrap();
let buffer = writer.into_inner().unwrap();
let space_per_block = block_size - 8;
let n_full_blocks = n_bytes / space_per_block;
let blocks_expected = n_full_blocks + (n_bytes % space_per_block != 0) as usize;
let expected_len = blocks_expected * block_size;
assert_eq!(buffer.len(), expected_len);
assert_eq!(&buffer[8..16], &[0, 1, 2, 3, 4, 5, 6, 7]);
}
}
}
#[test]
fn test_crc_io() {
const BLK_SIZE: usize = 1024;
let bytes: Vec<_> = (0..512 * 1024).map(|x| (x % 255) as u8).collect();
let buffer = Vec::new();
let mut writer = CRCWriter::new(buffer, BLK_SIZE);
writer.write_all(&bytes).unwrap();
let buffer = writer.into_inner().unwrap();
assert_eq!(&buffer[8..16], &[0, 1, 2, 3, 4, 5, 6, 7]);
let mut reader = CRCReader::new(&buffer[..], BLK_SIZE);
let mut retrieved = Vec::with_capacity(512 * 1024);
let read_buffer = &mut [0; 1024];
while let Ok(amt) = reader.read(read_buffer) {
if amt == 0 {
break;
}
retrieved.extend_from_slice(&read_buffer[..amt]);
}
assert_eq!(&retrieved[..8], &[0, 1, 2, 3, 4, 5, 6, 7]);
assert_eq!(bytes.len(), retrieved.len());
assert_eq!(bytes, retrieved);
}
#[test]
fn test_crc_validation() {
const BLK_SIZE: usize = 1024;
let n_bytes = 512 * 1024;
let bytes: Vec<_> = (0..n_bytes).map(|x| (x % 255) as u8).collect();
let buffer = Vec::new();
let mut writer = CRCWriter::new(buffer, BLK_SIZE);
writer.write_all(&bytes).unwrap();
let mut buffer = writer.into_inner().unwrap();
buffer[BLK_SIZE / 2] += 1;
let mut reader = CRCReader::new(&buffer[..], BLK_SIZE);
let mut retrieved = vec![];
let res = reader.read_to_end(&mut retrieved);
assert_eq!(res.unwrap_err().kind(), io::ErrorKind::InvalidData);
}
#[test]
fn test_crc_size_mismatch() {
const BLK_SIZE: usize = 1024;
let n_bytes = 512 * 1024;
let bytes: Vec<_> = (0..n_bytes).map(|x| (x % 255) as u8).collect();
let buffer = Vec::new();
let mut writer = CRCWriter::new(buffer, BLK_SIZE);
writer.write_all(&bytes).unwrap();
let mut buffer = writer.into_inner().unwrap();
buffer.drain((n_bytes - 512)..n_bytes);
for &size_diff in &[100, 1, 25, BLK_SIZE - 9] {
let mut reader = CRCReader::new(&buffer[..], BLK_SIZE - size_diff);
let mut retrieved = vec![];
let res = reader.read_to_end(&mut retrieved);
assert_eq!(res.unwrap_err().kind(), io::ErrorKind::InvalidData);
}
}
#[should_panic]
#[test]
fn test_crc_writer_invalid_chunk_size() {
let _ = CRCWriter::new(Vec::new(), 8);
}
#[should_panic]
#[test]
fn test_crc_reader_invalid_chunk_size() {
let _ = CRCReader::new(io::empty(), 8);
}
}


@@ -1,407 +0,0 @@
use crate::mapper::{Disk, Mapper, Memory};
use crate::sstable::SSTable;
use crate::storage::MemTable;
use crate::writelog::WriteLog;
use std::collections::BTreeMap;
use std::fs;
use std::io;
use std::ops::RangeInclusive;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::JoinHandle;
mod compactor;
mod error;
mod io_utils;
mod mapper;
mod readtx;
mod sstable;
mod storage;
mod writebatch;
mod writelog;
#[macro_use]
extern crate serde_derive;
pub use self::error::{Error, Result};
pub use self::readtx::ReadTx as Snapshot;
pub use self::sstable::Key;
pub use self::writebatch::{Config as WriteBatchConfig, WriteBatch};
pub use self::writelog::Config as LogConfig;
const TABLES_FILE: &str = "tables.meta";
const LOG_FILE: &str = "mem-log";
const DEFAULT_TABLE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_MEM_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_MAX_PAGES: usize = 10;
const COMMIT_ORDERING: Ordering = Ordering::Relaxed;
#[derive(Debug, PartialEq, Copy, Clone)]
pub struct Config {
pub max_mem: usize,
pub max_tables: usize,
pub page_size: usize,
pub in_memory: bool,
pub log_config: LogConfig,
}
#[derive(Debug)]
pub struct KvStore {
config: Config,
root: PathBuf,
commit: AtomicUsize,
mem: RwLock<MemTable>,
log: Arc<RwLock<WriteLog>>,
tables: RwLock<Vec<BTreeMap<Key, SSTable>>>,
mapper: Arc<dyn Mapper>,
sender: Mutex<Sender<compactor::Req>>,
receiver: Mutex<Receiver<compactor::Resp>>,
compactor_handle: JoinHandle<()>,
}
impl KvStore {
pub fn open_default<P>(root: P) -> Result<Self>
where
P: AsRef<Path>,
{
let mapper = Disk::single(root.as_ref());
open(root.as_ref(), Arc::new(mapper), Config::default())
}
pub fn open<P>(root: P, config: Config) -> Result<Self>
where
P: AsRef<Path>,
{
let mapper: Arc<dyn Mapper> = if config.in_memory {
Arc::new(Memory::new())
} else {
Arc::new(Disk::single(root.as_ref()))
};
open(root.as_ref(), mapper, config)
}
pub fn partitioned<P, P2>(root: P, storage_dirs: &[P2], config: Config) -> Result<Self>
where
P: AsRef<Path>,
P2: AsRef<Path>,
{
let mapper = Disk::new(storage_dirs);
open(root.as_ref(), Arc::new(mapper), config)
}
pub fn config(&self) -> &Config {
&self.config
}
pub fn put(&self, key: &Key, data: &[u8]) -> Result<()> {
let mut memtable = self.mem.write().unwrap();
let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
log.log_put(key, commit, data).unwrap();
memtable.put(key, commit, data);
self.ensure_memtable(&mut *memtable, &mut *log)?;
Ok(())
}
pub fn put_many<Iter, Tup, K, V>(&self, rows: Iter) -> Result<()>
where
Iter: Iterator<Item = Tup>,
Tup: std::borrow::Borrow<(K, V)>,
K: std::borrow::Borrow<Key>,
V: std::borrow::Borrow<[u8]>,
{
let mut memtable = self.mem.write().unwrap();
let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
for pair in rows {
let (ref k, ref d) = pair.borrow();
let (key, data) = (k.borrow(), d.borrow());
log.log_put(key, commit, data).unwrap();
memtable.put(key, commit, data);
}
self.ensure_memtable(&mut *memtable, &mut *log)?;
Ok(())
}
pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
self.query_compactor()?;
let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap());
storage::get(&memtable.values, &*tables, key)
}
pub fn delete(&self, key: &Key) -> Result<()> {
let mut memtable = self.mem.write().unwrap();
let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
log.log_delete(key, commit).unwrap();
memtable.delete(key, commit);
self.ensure_memtable(&mut *memtable, &mut *log)?;
Ok(())
}
pub fn delete_many<Iter, K>(&self, rows: Iter) -> Result<()>
where
Iter: Iterator<Item = K>,
K: std::borrow::Borrow<Key>,
{
let mut memtable = self.mem.write().unwrap();
let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
for k in rows {
let key = k.borrow();
log.log_delete(key, commit).unwrap();
memtable.delete(key, commit);
}
self.ensure_memtable(&mut *memtable, &mut *log)?;
Ok(())
}
pub fn batch(&self, config: WriteBatchConfig) -> WriteBatch {
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
WriteBatch {
config,
commit,
memtable: MemTable::new(BTreeMap::new()),
log: Arc::clone(&self.log),
}
}
pub fn commit(&self, mut batch: WriteBatch) -> Result<()> {
let mut memtable = self.mem.write().unwrap();
let mut log = self.log.write().unwrap();
memtable.values.append(&mut batch.memtable.values);
self.ensure_memtable(&mut *memtable, &mut *log)?;
Ok(())
}
pub fn snapshot(&self) -> Snapshot {
let (memtable, tables) = (
self.mem.read().unwrap().values.clone(),
self.tables.read().unwrap().clone(),
);
Snapshot::new(memtable, tables)
}
pub fn range(
&self,
range: RangeInclusive<Key>,
) -> Result<impl Iterator<Item = (Key, Vec<u8>)>> {
self.query_compactor()?;
let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap());
storage::range(&memtable.values, &*tables, range)
}
pub fn destroy<P>(path: P) -> Result<()>
where
P: AsRef<Path>,
{
let path = path.as_ref();
if !path.exists() {
return Ok(());
}
fs::remove_dir_all(path)?;
Ok(())
}
fn query_compactor(&self) -> Result<()> {
if let (Ok(mut sender), Ok(mut receiver), Ok(mut tables)) = (
self.sender.try_lock(),
self.receiver.try_lock(),
self.tables.try_write(),
) {
query_compactor(
&self.root,
&*self.mapper,
&mut *tables,
&mut *receiver,
&mut *sender,
)?;
}
Ok(())
}
fn ensure_memtable(&self, mem: &mut MemTable, log: &mut WriteLog) -> Result<()> {
if mem.mem_size < self.config.max_mem {
return Ok(());
}
let mut tables = self.tables.write().unwrap();
storage::flush_table(&mem.values, &*self.mapper, &mut *tables)?;
mem.values.clear();
mem.mem_size = 0;
log.reset().expect("Write-log rotation failed");
if is_lvl0_full(&tables, &self.config) {
let sender = self.sender.lock().unwrap();
sender.send(compactor::Req::Start(PathBuf::new()))?;
}
Ok(())
}
}
impl Default for Config {
fn default() -> Config {
Config {
max_mem: DEFAULT_MEM_SIZE,
max_tables: DEFAULT_MAX_PAGES,
page_size: DEFAULT_TABLE_SIZE,
in_memory: false,
log_config: LogConfig::default(),
}
}
}
fn open(root: &Path, mapper: Arc<dyn Mapper>, config: Config) -> Result<KvStore> {
let root = root.to_path_buf();
let log_path = root.join(LOG_FILE);
let restore_log = log_path.exists();
if !root.exists() {
fs::create_dir(&root)?;
}
let commit = chrono::Utc::now().timestamp();
let mut log = WriteLog::open(&log_path, config.log_config)?;
let values = if restore_log && !config.in_memory {
log.materialize()?
} else {
BTreeMap::new()
};
let mem = MemTable::new(values);
let tables = load_tables(&root, &*mapper)?;
let cfg = compactor::Config {
max_pages: config.max_tables,
page_size: config.page_size,
};
let (sender, receiver, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Ok(KvStore {
config,
root,
commit: AtomicUsize::new(commit as usize),
mem: RwLock::new(mem),
log: Arc::new(RwLock::new(log)),
tables: RwLock::new(tables),
mapper,
sender: Mutex::new(sender),
receiver: Mutex::new(receiver),
compactor_handle,
})
}
fn load_tables(root: &Path, mapper: &dyn Mapper) -> Result<Vec<BTreeMap<Key, SSTable>>> {
let mut tables = Vec::new();
let meta_path = root.join(TABLES_FILE);
if meta_path.exists() {
mapper.load_state_from(&meta_path)?;
tables = SSTable::sorted_tables(&mapper.active_set()?);
}
Ok(tables)
}
fn dump_tables(root: &Path, mapper: &dyn Mapper) -> Result<()> {
mapper.serialize_state_to(&root.join(TABLES_FILE))?;
Ok(())
}
fn query_compactor(
root: &Path,
mapper: &dyn Mapper,
tables: &mut Vec<BTreeMap<Key, SSTable>>,
receiver: &mut Receiver<compactor::Resp>,
sender: &mut Sender<compactor::Req>,
) -> Result<()> {
match receiver.try_recv() {
Ok(compactor::Resp::Done(new_tables)) => {
std::mem::replace(tables, new_tables);
dump_tables(root, mapper)?;
sender.send(compactor::Req::Gc).unwrap();
}
Ok(compactor::Resp::Failed(e)) => {
return Err(e);
}
// Nothing available, do nothing
_ => {}
}
Ok(())
}
#[inline]
fn is_lvl0_full(tables: &[BTreeMap<Key, SSTable>], config: &Config) -> bool {
if tables.is_empty() {
false
} else {
tables[0].len() > config.max_tables
}
}
pub mod test {
pub mod gen {
use crate::Key;
use rand::distributions::Uniform;
use rand::{rngs::SmallRng, FromEntropy, Rng};
use std::iter;
use std::ops::Range;
pub fn keys() -> impl Iterator<Item = Key> {
let mut rng = SmallRng::from_entropy();
iter::repeat_with(move || Key(rng.gen()))
}
pub fn data(size: usize) -> impl Iterator<Item = Vec<u8>> {
iter::repeat(vec![0; size])
}
pub fn data_vary(range: Range<u64>) -> impl Iterator<Item = Vec<u8>> {
let dist = Uniform::from(range);
let mut rng = SmallRng::from_entropy();
iter::repeat_with(move || {
let size: u64 = rng.sample(dist);
vec![0; size as usize]
})
}
pub fn pairs(size: usize) -> impl Iterator<Item = (Key, Vec<u8>)> {
keys().zip(data(size))
}
pub fn pairs_vary(range: Range<u64>) -> impl Iterator<Item = (Key, Vec<u8>)> {
keys().zip(data_vary(range))
}
}
}


@@ -1,50 +0,0 @@
use crate::io_utils::Writer;
use crate::sstable::SSTable;
use crate::Result;
use std::path::Path;
use std::sync::RwLock;
mod disk;
mod memory;
pub use self::disk::Disk;
pub use self::memory::Memory;
pub trait Mapper: std::fmt::Debug + Send + Sync {
fn make_table(&self, kind: Kind, func: &mut dyn FnMut(Writer, Writer)) -> Result<SSTable>;
fn rotate_tables(&self) -> Result<()>;
fn empty_trash(&self) -> Result<()>;
fn active_set(&self) -> Result<Vec<SSTable>>;
fn serialize_state_to(&self, path: &Path) -> Result<()>;
fn load_state_from(&self, path: &Path) -> Result<()>;
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)]
pub enum Kind {
Active,
Compaction,
Garbage,
}
pub trait RwLockExt<T> {
fn read_as<U, F: FnOnce(&T) -> U>(&self, f: F) -> U;
fn write_as<U, F: FnOnce(&mut T) -> U>(&self, f: F) -> U;
fn try_read_as<U, F: FnOnce(&T) -> U>(&self, f: F) -> U;
fn try_write_as<U, F: FnOnce(&mut T) -> U>(&self, f: F) -> U;
}
impl<T> RwLockExt<T> for RwLock<T> {
fn read_as<U, F: FnOnce(&T) -> U>(&self, f: F) -> U {
f(&*self.read().unwrap())
}
fn write_as<U, F: FnOnce(&mut T) -> U>(&self, f: F) -> U {
f(&mut *self.write().unwrap())
}
fn try_read_as<U, F: FnOnce(&T) -> U>(&self, f: F) -> U {
f(&*self.try_read().unwrap())
}
fn try_write_as<U, F: FnOnce(&mut T) -> U>(&self, f: F) -> U {
f(&mut *self.try_write().unwrap())
}
}


@@ -1,336 +0,0 @@
use crate::io_utils::{MemMap, Writer};
use crate::mapper::{Kind, Mapper, RwLockExt};
use crate::sstable::SSTable;
use crate::Result;
use memmap::Mmap;
use rand::{rngs::SmallRng, seq::SliceRandom, FromEntropy, Rng};
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::{self, BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
struct Id {
id: u32,
kind: Kind,
}
#[derive(Debug)]
pub struct Disk {
rng: RwLock<SmallRng>,
mappings: RwLock<HashMap<Id, PathInfo>>,
storage_dirs: RwLock<Vec<PathBuf>>,
}
impl Disk {
pub fn single(dir: &Path) -> Self {
Disk::new(&[dir])
}
pub fn new<P: AsRef<Path>>(storage_dirs: &[P]) -> Self {
if storage_dirs.is_empty() {
panic!("Disk Mapper requires at least one storage director");
}
let storage_dirs = storage_dirs
.iter()
.map(AsRef::as_ref)
.map(Path::to_path_buf)
.collect();
Disk {
storage_dirs: RwLock::new(storage_dirs),
mappings: RwLock::new(HashMap::new()),
rng: RwLock::new(SmallRng::from_entropy()),
}
}
}
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct PathInfo {
pub data: PathBuf,
pub index: PathBuf,
}
impl Disk {
#[inline]
fn choose_storage(&self) -> PathBuf {
let mut rng = rand::thread_rng();
let path = self
.storage_dirs
.read_as(|storage| storage.choose(&mut rng).unwrap().to_path_buf());
if !path.exists() {
fs::create_dir_all(&path).expect("couldn't create table storage directory");
}
path
}
#[inline]
fn add_mapping(&self, tref: Id, paths: PathInfo) {
let mut map = self.mappings.write().unwrap();
map.insert(tref, paths);
}
}
impl Mapper for Disk {
fn make_table(&self, kind: Kind, func: &mut dyn FnMut(Writer, Writer)) -> Result<SSTable> {
let storage = self.choose_storage();
let id = next_id(kind);
let paths = mk_paths(id, &storage);
let (data, index) = mk_writers(&paths)?;
func(data, index);
self.add_mapping(id, paths.clone());
let (data, index) = mk_maps(&paths)?;
let sst = SSTable::from_parts(Arc::new(data), Arc::new(index))?;
Ok(sst)
}
fn rotate_tables(&self) -> Result<()> {
let mut map = self.mappings.write().unwrap();
let mut new_map = HashMap::new();
for (tref, paths) in map.drain() {
let new_kind = match tref.kind {
Kind::Active => Kind::Garbage,
Kind::Compaction => Kind::Active,
k => k,
};
let new_ref = next_id(new_kind);
new_map.insert(new_ref, paths);
}
*map = new_map;
Ok(())
}
fn empty_trash(&self) -> Result<()> {
self.mappings.write_as(|map| {
let to_rm = map
.keys()
.filter(|tref| tref.kind == Kind::Garbage)
.cloned()
.collect::<Vec<_>>();
for tref in to_rm {
let paths = map.remove(&tref).unwrap();
fs::remove_file(&paths.index)?;
fs::remove_file(&paths.data)?;
}
Ok(())
})
}
fn active_set(&self) -> Result<Vec<SSTable>> {
let map = self.mappings.read().unwrap();
let active = map.iter().filter(|(tref, _)| tref.kind == Kind::Active);
let mut vec = Vec::new();
for (_, paths) in active {
let (data, index): (MemMap, MemMap) = mk_maps(paths)?;
let sst = SSTable::from_parts(Arc::new(data), Arc::new(index))?;
vec.push(sst);
}
Ok(vec)
}
fn serialize_state_to(&self, path: &Path) -> Result<()> {
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)?;
let wtr = BufWriter::new(file);
self.mappings.read_as(|mappings| {
self.storage_dirs
.read_as(|storage| bincode::serialize_into(wtr, &(storage, mappings)))
})?;
Ok(())
}
fn load_state_from(&self, path: &Path) -> Result<()> {
let rdr = BufReader::new(File::open(path)?);
let (new_storage, new_mappings) = bincode::deserialize_from(rdr)?;
self.storage_dirs.write_as(|storage| {
self.mappings.write_as(|mappings| {
*storage = new_storage;
*mappings = new_mappings;
})
});
Ok(())
}
}
fn mk_writers(paths: &PathInfo) -> io::Result<(Writer, Writer)> {
let mut opts = OpenOptions::new();
opts.create(true).append(true);
let data = BufWriter::new(opts.open(&paths.data)?);
let index = BufWriter::new(opts.open(&paths.index)?);
Ok((Writer::Disk(data), Writer::Disk(index)))
}
fn mk_maps(paths: &PathInfo) -> io::Result<(MemMap, MemMap)> {
let (data_file, index_file) = (File::open(&paths.data)?, File::open(&paths.index)?);
let (data, index) = unsafe { (Mmap::map(&data_file)?, Mmap::map(&index_file)?) };
Ok((MemMap::Disk(data), MemMap::Disk(index)))
}
fn mk_paths(tref: Id, dir: &Path) -> PathInfo {
let (data_name, index_name) = mk_filenames(tref.id);
PathInfo {
data: dir.join(data_name),
index: dir.join(index_name),
}
}
#[inline]
fn mk_filenames(n: u32) -> (String, String) {
let data = format!("{}.sstable", n,);
let index = format!("{}.index", n,);
(data, index)
}
#[inline]
fn next_id(kind: Kind) -> Id {
Id {
id: rand::thread_rng().gen(),
kind,
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::mapper::Kind;
use crate::sstable::{Key, Value};
use crate::test::gen;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::thread;
use tempfile::tempdir;
const DATA_SIZE: usize = 128;
#[test]
fn test_table_management() {
let tempdir = tempdir().unwrap();
let mapper = Arc::new(Disk::single(tempdir.path()));
let records: BTreeMap<_, _> = gen_records().take(1024).collect();
let mut threads = vec![];
let mut number_of_tables = 4;
for kind in [Kind::Active, Kind::Garbage, Kind::Compaction].iter() {
let records = records.clone();
let mapper = Arc::clone(&mapper);
let child = thread::spawn(move || {
for _ in 0..number_of_tables {
mapper
.make_table(*kind, &mut |mut data_writer, mut index_writer| {
SSTable::create(
&mut records.iter(),
0,
&mut data_writer,
&mut index_writer,
);
})
.unwrap();
}
});
number_of_tables *= 2;
threads.push(child);
}
threads.into_iter().for_each(|child| child.join().unwrap());
let count_kind = |kind, mapper: &Disk| {
mapper
.mappings
.read()
.unwrap()
.keys()
.filter(|id| id.kind == kind)
.count()
};
assert_eq!(count_kind(Kind::Active, &mapper), 4);
assert_eq!(count_kind(Kind::Garbage, &mapper), 8);
assert_eq!(count_kind(Kind::Compaction, &mapper), 16);
mapper.empty_trash().unwrap();
assert_eq!(count_kind(Kind::Garbage, &mapper), 0);
mapper.rotate_tables().unwrap();
assert_eq!(count_kind(Kind::Active, &mapper), 16);
assert_eq!(count_kind(Kind::Garbage, &mapper), 4);
assert_eq!(count_kind(Kind::Compaction, &mapper), 0);
let active_set = mapper.active_set().unwrap();
assert_eq!(active_set.len(), 16);
}
#[test]
fn test_state() {
let tempdir = tempdir().unwrap();
let dirs_1: Vec<_> = (0..4).map(|i| tempdir.path().join(i.to_string())).collect();
let dirs_2: Vec<_> = (4..8).map(|i| tempdir.path().join(i.to_string())).collect();
let mapper_1 = Arc::new(Disk::new(&dirs_1));
let records: BTreeMap<_, _> = gen_records().take(1024).collect();
for (i, &kind) in [Kind::Active, Kind::Compaction, Kind::Garbage]
.iter()
.enumerate()
{
for _ in 0..(i * 3) {
mapper_1
.make_table(kind, &mut |mut data_writer, mut index_writer| {
SSTable::create(
&mut records.iter(),
0,
&mut data_writer,
&mut index_writer,
);
})
.unwrap();
}
}
let state_path = tempdir.path().join("state");
mapper_1.serialize_state_to(&state_path).unwrap();
assert!(state_path.exists());
let mapper_2 = Arc::new(Disk::new(&dirs_2));
mapper_2.load_state_from(&state_path).unwrap();
assert_eq!(
&*mapper_1.mappings.read().unwrap(),
&*mapper_2.mappings.read().unwrap()
);
assert_eq!(
&*mapper_1.storage_dirs.read().unwrap(),
&*mapper_2.storage_dirs.read().unwrap()
);
}
fn gen_records() -> impl Iterator<Item = (Key, Value)> {
gen::pairs(DATA_SIZE).map(|(key, data)| (key, Value::new(0, Some(data))))
}
}


@@ -1,226 +0,0 @@
use crate::io_utils::{MemMap, SharedWriter, Writer};
use crate::mapper::{Kind, Mapper, RwLockExt};
use crate::sstable::SSTable;
use crate::Result;
use rand::{rngs::SmallRng, FromEntropy, Rng};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, RwLock};
type Id = u32;
type TableMap = HashMap<Id, (Arc<RwLock<Vec<u8>>>, Arc<RwLock<Vec<u8>>>)>;
type Backing = Arc<RwLock<TableMap>>;
const BACKING_ERR_MSG: &str = "In-memory table lock poisoned; concurrency error";
#[derive(Debug)]
pub struct Memory {
tables: Backing,
compaction: Backing,
garbage: Backing,
meta: Arc<RwLock<Vec<u8>>>,
rng: RwLock<SmallRng>,
}
impl Memory {
pub fn new() -> Self {
fn init_backing() -> Backing {
Arc::new(RwLock::new(HashMap::new()))
}
Memory {
tables: init_backing(),
compaction: init_backing(),
garbage: init_backing(),
meta: Arc::new(RwLock::new(vec![])),
rng: RwLock::new(SmallRng::from_entropy()),
}
}
}
impl Memory {
#[inline]
fn get_backing(&self, kind: Kind) -> &Backing {
match kind {
Kind::Active => &self.tables,
Kind::Compaction => &self.compaction,
Kind::Garbage => &self.garbage,
}
}
}
impl Mapper for Memory {
fn make_table(&self, kind: Kind, func: &mut dyn FnMut(Writer, Writer)) -> Result<SSTable> {
let backing = self.get_backing(kind);
let id = next_id();
let (data, index) = backing.write_as(|tables| get_memory_writers_for(id, tables))?;
func(data, index);
backing.read_as(|map| get_table(id, map))
}
fn rotate_tables(&self) -> Result<()> {
let (mut active, mut compaction, mut garbage) = (
self.tables.write().expect(BACKING_ERR_MSG),
self.compaction.write().expect(BACKING_ERR_MSG),
self.garbage.write().expect(BACKING_ERR_MSG),
);
// old active set => garbage
garbage.extend(active.drain());
// compacted tables => new active set
active.extend(compaction.drain());
Ok(())
}
fn empty_trash(&self) -> Result<()> {
self.garbage.write().expect(BACKING_ERR_MSG).clear();
Ok(())
}
fn active_set(&self) -> Result<Vec<SSTable>> {
let active = self.tables.read().expect(BACKING_ERR_MSG);
let mut tables = Vec::with_capacity(active.len());
for tref in active.keys() {
let sst = get_table(*tref, &*active)?;
tables.push(sst);
}
Ok(tables)
}
fn serialize_state_to(&self, _: &Path) -> Result<()> {
Ok(())
}
fn load_state_from(&self, _: &Path) -> Result<()> {
Ok(())
}
}
fn get_memory_writers_for(id: Id, backing: &mut TableMap) -> Result<(Writer, Writer)> {
let data_buf = Arc::new(RwLock::new(vec![]));
let index_buf = Arc::new(RwLock::new(vec![]));
backing.insert(id, (Arc::clone(&data_buf), Arc::clone(&index_buf)));
let data_wtr = SharedWriter::new(data_buf);
let index_wtr = SharedWriter::new(index_buf);
let data = Writer::Mem(data_wtr);
let index = Writer::Mem(index_wtr);
Ok((data, index))
}
fn get_memmaps(id: Id, map: &TableMap) -> Result<(MemMap, MemMap)> {
let entry = map
.get(&id)
.expect("Map should always be present, given a Id that's not destroyed");
let data = MemMap::Mem(Arc::clone(&entry.0));
let index = MemMap::Mem(Arc::clone(&entry.1));
Ok((data, index))
}
fn get_table(id: Id, map: &TableMap) -> Result<SSTable> {
let (data, index) = get_memmaps(id, map)?;
let sst = SSTable::from_parts(Arc::new(data), Arc::new(index))?;
Ok(sst)
}
#[inline]
fn next_id() -> Id {
rand::thread_rng().gen()
}
#[cfg(test)]
mod test {
use super::*;
use crate::mapper::Kind;
use crate::sstable::{Key, Value};
use crate::test::gen;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::thread;
const DATA_SIZE: usize = 128;
#[test]
fn test_table_management() {
let mapper = Arc::new(Memory::new());
let records: BTreeMap<_, _> = gen_records().take(1024).collect();
let mut threads = vec![];
let mut number_of_tables = 4;
for kind in [Kind::Active, Kind::Garbage, Kind::Compaction].iter() {
let records = records.clone();
let mapper = Arc::clone(&mapper);
let child = thread::spawn(move || {
for _ in 0..number_of_tables {
mapper
.make_table(*kind, &mut |mut data_writer, mut index_writer| {
SSTable::create(
&mut records.iter(),
0,
&mut data_writer,
&mut index_writer,
);
})
.unwrap();
}
});
number_of_tables *= 2;
threads.push(child);
}
threads.into_iter().for_each(|child| child.join().unwrap());
assert_eq!(mapper.tables.read().unwrap().len(), 4);
assert_eq!(mapper.garbage.read().unwrap().len(), 8);
assert_eq!(mapper.compaction.read().unwrap().len(), 16);
mapper.empty_trash().unwrap();
assert_eq!(mapper.garbage.read().unwrap().len(), 0);
mapper.rotate_tables().unwrap();
assert_eq!(mapper.tables.read().unwrap().len(), 16);
assert_eq!(mapper.garbage.read().unwrap().len(), 4);
assert!(mapper.compaction.read().unwrap().is_empty());
let active_set = mapper.active_set().unwrap();
assert_eq!(active_set.len(), 16);
}
#[test]
fn test_no_state() {
let tempdir = tempfile::tempdir().unwrap();
let mapper = Arc::new(Memory::new());
let records: BTreeMap<_, _> = gen_records().take(1024).collect();
mapper
.make_table(Kind::Active, &mut |mut data_writer, mut index_writer| {
SSTable::create(&mut records.iter(), 0, &mut data_writer, &mut index_writer);
})
.unwrap();
let state_path = tempdir.path().join("state");
mapper.serialize_state_to(&state_path).unwrap();
mapper.load_state_from(&state_path).unwrap();
assert!(!state_path.exists());
}
fn gen_records() -> impl Iterator<Item = (Key, Value)> {
gen::pairs(DATA_SIZE).map(|(key, data)| (key, Value::new(0, Some(data))))
}
}


@@ -1,33 +0,0 @@
use crate::error::Result;
use crate::sstable::{Key, SSTable, Value};
use crate::storage;
use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::sync::Arc;
#[derive(Debug)]
pub struct ReadTx {
mem: Arc<BTreeMap<Key, Value>>,
tables: Arc<[BTreeMap<Key, SSTable>]>,
}
impl ReadTx {
pub fn new(mem: BTreeMap<Key, Value>, tables: Vec<BTreeMap<Key, SSTable>>) -> ReadTx {
ReadTx {
mem: Arc::new(mem),
tables: Arc::from(tables.into_boxed_slice()),
}
}
pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
storage::get(&self.mem, &*self.tables, key)
}
pub fn range(
&self,
range: RangeInclusive<Key>,
) -> Result<impl Iterator<Item = (Key, Vec<u8>)>> {
storage::range(&self.mem, &*self.tables, range)
}
}


@@ -1,564 +0,0 @@
use crate::error::Result;
use crate::io_utils::{Fill, MemMap};
use byteorder::{BigEndian, ByteOrder};
use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
use std::io::prelude::*;
use std::mem;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::u64;
const INDEX_META_SIZE: usize = mem::size_of::<IndexMeta>();
const KEY_LEN: usize = mem::size_of::<Key>();
const INDEX_ENTRY_SIZE: usize = mem::size_of::<IndexEntry>();
const INDEX_RECORD_SIZE: usize = KEY_LEN + INDEX_ENTRY_SIZE;
#[derive(Clone, Debug)]
pub struct SSTable {
data: Arc<MemMap>,
index: Arc<MemMap>,
meta: IndexMeta,
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct IndexMeta {
pub level: u8,
pub data_size: u64,
pub start: Key,
pub end: Key,
}
#[derive(
Debug, Default, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Hash, Serialize, Deserialize,
)]
pub struct Key(pub [u8; 24]);
#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)]
pub struct IndexEntry {
pub timestamp: i64,
pub offset: u64,
pub size: u64,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Value {
pub ts: i64,
pub val: Option<Vec<u8>>,
}
/// An iterator that produces logical view over a set of SSTables.
/// It implements [direct k-way merge](https://en.wikipedia.org/wiki/K-way_merge_algorithm#Heap)
/// and reconciles out-of-date/deleted values in a lazy fashion. Inputs *MUST* be sorted
pub struct Merged<I> {
sources: Vec<I>,
heads: BTreeMap<(Key, usize), Value>,
}
impl SSTable {
pub fn meta(&self) -> &IndexMeta {
&self.meta
}
#[allow(dead_code)]
pub fn num_keys(&self) -> u64 {
((self.index.len() - INDEX_META_SIZE) / INDEX_ENTRY_SIZE) as u64
}
pub fn get(&self, key: &Key) -> Result<Option<Value>> {
let range = *key..=*key;
let found_opt = self.range(&range)?.find(|(k, _)| k == key).map(|(_, v)| v);
Ok(found_opt)
}
pub fn range(&self, range: &RangeInclusive<Key>) -> Result<impl Iterator<Item = (Key, Value)>> {
Ok(Scan::new(
range.clone(),
Arc::clone(&self.data),
Arc::clone(&self.index),
))
}
pub fn create_capped<I, K, V>(
rows: &mut I,
level: u8,
max_table_size: u64,
data_wtr: &mut dyn Write,
index_wtr: &mut dyn Write,
) where
I: Iterator<Item = (K, V)>,
K: Borrow<Key>,
V: Borrow<Value>,
{
const DATA_ERR: &str = "Error writing table data";
const INDEX_ERR: &str = "Error writing index data";
let (data_size, index) =
flush_mem_table_capped(rows, data_wtr, max_table_size).expect(DATA_ERR);
data_wtr.flush().expect(DATA_ERR);
let (&start, &end) = (
index.keys().next().unwrap(),
index.keys().next_back().unwrap(),
);
let meta = IndexMeta {
start,
end,
level,
data_size,
};
flush_index(&index, &meta, index_wtr).expect(INDEX_ERR);
index_wtr.flush().expect(INDEX_ERR);
}
pub fn create<I, K, V>(
rows: &mut I,
level: u8,
data_wtr: &mut dyn Write,
index_wtr: &mut dyn Write,
) where
I: Iterator<Item = (K, V)>,
K: Borrow<Key>,
V: Borrow<Value>,
{
SSTable::create_capped(rows, level, u64::MAX, data_wtr, index_wtr);
}
pub fn from_parts(data: Arc<MemMap>, index: Arc<MemMap>) -> Result<Self> {
let len = index.len() as usize;
assert!(len > INDEX_META_SIZE);
assert_eq!((len - INDEX_META_SIZE) % INDEX_RECORD_SIZE, 0);
let meta = bincode::deserialize_from(&index[..INDEX_META_SIZE])?;
Ok(SSTable { data, index, meta })
}
pub fn could_contain(&self, key: &Key) -> bool {
self.meta.start <= *key && *key <= self.meta.end
}
pub fn is_overlap(&self, range: &RangeInclusive<Key>) -> bool {
let r = self.meta.start..=self.meta.end;
overlapping(&r, range)
}
pub fn sorted_tables(tables: &[SSTable]) -> Vec<BTreeMap<Key, SSTable>> {
let mut sorted = Vec::new();
for sst in tables {
let (key, level) = {
let meta = sst.meta();
(meta.start, meta.level)
};
while level as usize >= sorted.len() {
sorted.push(BTreeMap::new());
}
sorted[level as usize].insert(key, sst.clone());
}
sorted
}
}
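// A minimal round-trip sketch (illustrative only): build a table with `SSTable::create`
// into in-memory buffers, reopen it with `from_parts` using the in-memory `MemMap`
// variant (as the tests below do), and read a key back.
#[allow(dead_code)]
fn sstable_roundtrip_sketch() {
    use std::sync::RwLock;

    let (key, value) = (Key::from((0, 0, 7)), Value::new(1, Some(vec![1, 2, 3])));
    let mut rows = BTreeMap::new();
    rows.insert(key, value.clone());

    let mut data_buf: Vec<u8> = vec![];
    let mut index_buf: Vec<u8> = vec![];
    SSTable::create(&mut rows.iter(), 0, &mut data_buf, &mut index_buf);

    let data = Arc::new(MemMap::Mem(Arc::new(RwLock::new(data_buf))));
    let index = Arc::new(MemMap::Mem(Arc::new(RwLock::new(index_buf))));
    let sst = SSTable::from_parts(data, index).expect("valid table");

    assert!(sst.could_contain(&key));
    assert_eq!(sst.get(&key).expect("read"), Some(value));
}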
impl Key {
pub const MIN: Key = Key([0u8; KEY_LEN]);
pub const MAX: Key = Key([255u8; KEY_LEN]);
pub const ALL_INCLUSIVE: RangeInclusive<Key> = RangeInclusive::new(Key::MIN, Key::MAX);
pub fn write<W: Write>(&self, wtr: &mut W) -> Result<()> {
wtr.write_all(&self.0)?;
Ok(())
}
pub fn read(bytes: &[u8]) -> Key {
let mut key = Key::default();
key.0.copy_from_slice(bytes);
key
}
}
impl Value {
pub fn new(commit: i64, data: Option<Vec<u8>>) -> Value {
Value {
ts: commit,
val: data,
}
}
}
struct Scan {
bounds: RangeInclusive<Key>,
data: Arc<MemMap>,
index: Arc<MemMap>,
index_pos: usize,
}
impl Scan {
fn new(bounds: RangeInclusive<Key>, data: Arc<MemMap>, index: Arc<MemMap>) -> Self {
Scan {
bounds,
data,
index,
index_pos: INDEX_META_SIZE as usize,
}
}
fn step(&mut self) -> Result<Option<(Key, Value)>> {
while self.index_pos < self.index.len() {
let pos = self.index_pos as usize;
let end = pos + INDEX_RECORD_SIZE;
let (key, entry): (Key, IndexEntry) = bincode::deserialize_from(&self.index[pos..end])?;
self.index_pos = end;
// Skip index records below the start of the requested range
if key < *self.bounds.start() {
continue;
}
// Index records are key-sorted, so the first key past the end of the range ends the scan
if *self.bounds.end() < key {
self.index_pos = std::usize::MAX;
return Ok(None);
}
// Read the full record from the data file at the offset/size recorded in the index
let record_range = entry.offset as usize..(entry.offset + entry.size) as usize;
let (data_key, value) = bincode::deserialize_from(&self.data[record_range])?;
assert_eq!(data_key, key);
return Ok(Some((data_key, value)));
}
Ok(None)
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let k0 = BigEndian::read_u64(&self.0[..8]);
let k1 = BigEndian::read_u64(&self.0[8..16]);
let k2 = BigEndian::read_u64(&self.0[16..]);
write!(f, "Key({}, {}, {})", k0, k1, k2)
}
}
impl From<(u64, u64, u64)> for Key {
fn from((k0, k1, k2): (u64, u64, u64)) -> Self {
let mut buf = [0u8; KEY_LEN];
BigEndian::write_u64(&mut buf[..8], k0);
BigEndian::write_u64(&mut buf[8..16], k1);
BigEndian::write_u64(&mut buf[16..], k2);
Key(buf)
}
}
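// A small sketch (illustrative only): because the three components are written
// big-endian, byte-wise `Key` ordering agrees with the numeric ordering of the
// (u64, u64, u64) triples the keys were built from.
#[allow(dead_code)]
fn key_ordering_sketch() {
    let a = Key::from((1, 0, 0));
    let b = Key::from((1, 0, 1));
    let c = Key::from((2, 0, 0));
    assert!(a < b && b < c);
    assert_eq!(c.to_string(), "Key(2, 0, 0)");
}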
impl<I> Merged<I>
where
I: Iterator<Item = (Key, Value)>,
{
pub fn new(mut sources: Vec<I>) -> Self {
let mut heads = BTreeMap::new();
for (source_idx, source) in sources.iter_mut().enumerate() {
if let Some((k, v)) = source.next() {
heads.insert((k, source_idx), v);
}
}
Merged { sources, heads }
}
}
impl<I> Iterator for Merged<I>
where
I: Iterator<Item = (Key, Value)>,
{
type Item = (Key, Value);
fn next(&mut self) -> Option<Self::Item> {
while !self.heads.is_empty() {
// get new key
let (key, source_idx) = *self.heads.keys().next().unwrap();
let mut val = self.heads.remove(&(key, source_idx)).unwrap();
// replace
if let Some((k, v)) = self.sources[source_idx].next() {
self.heads.insert((k, source_idx), v);
}
// check for other versions of this record
while !self.heads.is_empty() {
let (next_key, source_idx) = *self.heads.keys().next().unwrap();
// Found a different version of the record
if key == next_key {
// pop this version, check if it's newer
let other_version = self.heads.remove(&(next_key, source_idx)).unwrap();
if other_version.ts > val.ts {
val = other_version;
}
// replace
if let Some((k, v)) = self.sources[source_idx].next() {
self.heads.insert((k, source_idx), v);
}
} else {
break;
}
}
// Don't produce deleted records
if val.val.is_some() {
return Some((key, val));
}
}
None
}
}
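// A minimal usage sketch of `Merged` (illustrative only): two key-sorted sources
// containing the same key are reconciled by timestamp, and a winning tombstone
// (val == None) is dropped from the output.
#[allow(dead_code)]
fn merged_usage_sketch() {
    let key = Key::from((1, 2, 3));
    // An older live record and a newer deletion for the same key
    let older = vec![(key, Value::new(1, Some(vec![42])))];
    let newer = vec![(key, Value::new(2, None))];
    let merged: Vec<(Key, Value)> =
        Merged::new(vec![older.into_iter(), newer.into_iter()]).collect();
    // The newer deletion shadows the older value, so nothing is produced
    assert!(merged.is_empty());
}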
impl Iterator for Scan {
type Item = (Key, Value);
fn next(&mut self) -> Option<Self::Item> {
if self.index_pos as usize >= self.index.len() {
return None;
}
match self.step() {
Ok(opt) => opt,
Err(_) => {
self.index_pos = std::usize::MAX;
None
}
}
}
}
fn flush_index(
index: &BTreeMap<Key, IndexEntry>,
meta: &IndexMeta,
writer: &mut dyn Write,
) -> Result<()> {
let mut entry_buffer = [0u8; INDEX_RECORD_SIZE];
let mut meta_buffer = [0u8; INDEX_META_SIZE];
bincode::serialize_into(&mut meta_buffer[..], meta)?;
writer.write_all(&meta_buffer)?;
for (key, entry) in index.iter() {
let rec = (key, entry);
entry_buffer.fill(0);
bincode::serialize_into(&mut entry_buffer[..], &rec)?;
writer.write_all(&entry_buffer)?;
}
Ok(())
}
fn flush_mem_table_capped<I, K, V>(
rows: &mut I,
mut wtr: &mut dyn Write,
max_table_size: u64,
) -> Result<(u64, BTreeMap<Key, IndexEntry>)>
where
I: Iterator<Item = (K, V)>,
K: Borrow<Key>,
V: Borrow<Value>,
{
let mut index = BTreeMap::new();
let mut size = 0;
let bincode_config = bincode::config();
for (key, val) in rows {
let record = (key.borrow(), val.borrow());
let serialized_size = bincode_config.serialized_size(&record)?;
bincode::serialize_into(&mut wtr, &record)?;
let entry = IndexEntry {
timestamp: record.1.ts,
offset: size,
size: serialized_size,
};
size += serialized_size;
index.insert(*record.0, entry);
if size >= max_table_size {
break;
}
}
Ok((size, index))
}
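// A capping sketch (illustrative only): `flush_mem_table_capped` stops once the running
// record size reaches the cap, leaving the rest of the iterator untouched, which is how
// one memtable can be split across several tables.
#[allow(dead_code)]
fn capped_flush_sketch() {
    let mut rows = BTreeMap::new();
    for i in 0..100u64 {
        rows.insert(Key::from((0, 0, i)), Value::new(0, Some(vec![0u8; 64])));
    }
    let mut data: Vec<u8> = vec![];
    let mut iter = rows.iter();
    // Cap the table at 1 KiB of record data; unwritten rows remain in `iter`
    let (size, index) = flush_mem_table_capped(&mut iter, &mut data, 1024).expect("flush");
    assert!(size >= 1024);
    assert!(index.len() < rows.len());
}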
#[inline]
fn overlapping<T: Ord + Eq>(r1: &RangeInclusive<T>, r2: &RangeInclusive<T>) -> bool {
r1.start() <= r2.end() && r2.start() <= r1.end()
}
#[cfg(test)]
pub mod test {
use super::*;
use crate::test::gen;
use std::sync::{Arc, RwLock};
#[test]
fn test_dump_data() {
let mut data_buffer = vec![];
let records: BTreeMap<_, _> = gen_records().take(512).collect();
let (_, index) =
flush_mem_table_capped(&mut records.iter(), &mut data_buffer, u64::MAX).unwrap();
assert_eq!(index.len(), records.len());
assert!(index.keys().eq(records.keys()));
let mut retrieved = BTreeMap::new();
for (key, entry) in index.iter() {
let range = entry.offset as usize..(entry.offset + entry.size) as usize;
let (data_key, value) = bincode::deserialize_from(&data_buffer[range]).unwrap();
assert_eq!(&data_key, key);
retrieved.insert(data_key, value);
}
assert_eq!(records, retrieved);
}
#[test]
fn test_dump_indexes() {
let mut data_buffer = vec![];
let mut index_buffer = vec![];
let records: BTreeMap<_, _> = gen_records().take(512).collect();
let (data_size, index) =
flush_mem_table_capped(&mut records.iter(), &mut data_buffer, u64::MAX).unwrap();
let (&start, &end) = (
index.keys().next().unwrap(),
index.keys().next_back().unwrap(),
);
let meta = IndexMeta {
start,
end,
data_size,
level: 0,
};
flush_index(&index, &meta, &mut index_buffer).unwrap();
let retrieved_meta = bincode::deserialize_from(&index_buffer[..INDEX_META_SIZE]).unwrap();
assert_eq!(meta, retrieved_meta);
// By iterating over the BTreeMap we also check the order of index entries as written
for (i, (key, entry)) in index.iter().enumerate() {
let start = i * INDEX_RECORD_SIZE + INDEX_META_SIZE;
let end = start + INDEX_RECORD_SIZE;
let (retrieved_key, retrieved_entry) =
bincode::deserialize_from(&index_buffer[start..end]).unwrap();
assert_eq!(key, &retrieved_key);
assert_eq!(entry, &retrieved_entry);
}
}
#[test]
fn test_sstable_scan() {
let mut data_buffer = vec![];
let mut index_buffer = vec![];
let records: BTreeMap<_, _> = gen_records().take(512).collect();
SSTable::create(&mut records.iter(), 0, &mut data_buffer, &mut index_buffer);
let data = MemMap::Mem(Arc::new(RwLock::new(data_buffer)));
let index = MemMap::Mem(Arc::new(RwLock::new(index_buffer)));
let sst = SSTable::from_parts(Arc::new(data), Arc::new(index)).unwrap();
let output_iter = Scan::new(
Key::ALL_INCLUSIVE,
Arc::clone(&sst.data),
Arc::clone(&sst.index),
);
assert!(output_iter.eq(records.into_iter()));
}
#[test]
fn test_merge_2way() {
let records: BTreeMap<_, _> = gen_records().take(512).collect();
let updates: BTreeMap<_, _> = records
.iter()
.map(|(k, v)| (*k, Value::new(v.ts + 1, Some(vec![]))))
.collect();
let deletes: BTreeMap<_, _> = records
.iter()
.map(|(k, v)| (*k, Value::new(v.ts + 1, None)))
.collect();
let owned = |(k, v): (&Key, &Value)| (*k, v.clone());
let sources = vec![records.iter().map(owned), updates.iter().map(owned)];
let merged: Vec<_> = Merged::new(sources).collect();
assert!(merged.into_iter().eq(updates.into_iter()));
let sources = vec![records.into_iter(), deletes.into_iter()];
let merged: Vec<_> = Merged::new(sources).collect();
assert_eq!(merged.len(), 0);
}
#[test]
fn test_merge_4way() {
// delete last half, then update first half, then delete last half of first half
let start: BTreeMap<_, _> = gen_records().take(512).collect();
let deletes: BTreeMap<_, _> = start
.iter()
.skip(256)
.map(|(k, v)| (*k, Value::new(v.ts + 1, None)))
.collect();
let updates: BTreeMap<_, _> = start
.iter()
.take(256)
.map(|(k, v)| (*k, Value::new(v.ts + 2, Some(vec![]))))
.collect();
let more_deletes: BTreeMap<_, _> = updates
.iter()
.skip(128)
.map(|(k, v)| (*k, Value::new(v.ts + 3, None)))
.collect();
let sources = vec![
more_deletes.into_iter(),
updates.clone().into_iter(),
start.into_iter(),
deletes.into_iter(),
];
let merged: Vec<_> = Merged::new(sources).collect();
let expected: Vec<_> = updates.into_iter().take(128).collect();
assert_eq!(merged.len(), expected.len());
assert_eq!(merged, expected);
}
fn gen_records() -> impl Iterator<Item = (Key, Value)> {
gen::pairs_vary(0..255)
.map(|(key, bytes)| (key, Value::new(bytes.len() as i64, Some(bytes))))
}
}

View File

@ -1,280 +0,0 @@
use crate::error::Result;
use crate::mapper::{Kind, Mapper};
use crate::sstable::{Key, Merged, SSTable, Value};
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::mem;
/// Wrapper over a BTreeMap<`Key`, `Value`> that does basic accounting of memory usage.
/// (The BTreeMap's own overhead is not included; it can't be accounted for reliably
/// without special data structures or depending on unstable implementation details of `std`.)
#[derive(Debug)]
pub struct MemTable {
pub mem_size: usize,
pub values: BTreeMap<Key, Value>,
}
impl MemTable {
/// Memory overhead per record: the size of the key plus the size of the commit ID.
pub const OVERHEAD_PER_RECORD: usize = mem::size_of::<Key>() + mem::size_of::<i64>();
pub fn new(values: BTreeMap<Key, Value>) -> MemTable {
let mem_size = values.values().fold(0, |acc, elem| {
acc + Self::OVERHEAD_PER_RECORD + opt_bytes_memory(&elem.val)
});
MemTable { mem_size, values }
}
pub fn put(&mut self, key: &Key, commit: i64, data: &[u8]) {
let value = Value {
ts: commit,
val: Some(data.to_vec()),
};
self.mem_size += data.len();
match self.values.entry(*key) {
Entry::Vacant(entry) => {
entry.insert(value);
self.mem_size += Self::OVERHEAD_PER_RECORD;
}
Entry::Occupied(mut entry) => {
let old = entry.insert(value);
self.mem_size -= opt_bytes_memory(&old.val);
}
}
}
pub fn delete(&mut self, key: &Key, commit: i64) {
let value = Value {
ts: commit,
val: None,
};
match self.values.entry(*key) {
Entry::Vacant(entry) => {
entry.insert(value);
self.mem_size += Self::OVERHEAD_PER_RECORD;
}
Entry::Occupied(mut entry) => {
let old = entry.insert(value);
self.mem_size -= opt_bytes_memory(&old.val);
}
}
}
}
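// A small accounting sketch (illustrative only): a put charges the payload plus
// OVERHEAD_PER_RECORD; overwriting the same key with a delete keeps the per-record
// overhead but releases the payload bytes.
#[allow(dead_code)]
fn memtable_accounting_sketch() {
    let mut table = MemTable::new(BTreeMap::new());
    let key = Key::from((0, 0, 1));

    table.put(&key, 1, &[0u8; 100]);
    assert_eq!(table.mem_size, 100 + MemTable::OVERHEAD_PER_RECORD);

    table.delete(&key, 2);
    assert_eq!(table.mem_size, MemTable::OVERHEAD_PER_RECORD);
}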
/// Writes the memtable contents into a new active `SSTable` (created via the mapper)
/// and registers it at level 0. Does nothing if the memtable is empty.
pub fn flush_table(
mem: &BTreeMap<Key, Value>,
mapper: &dyn Mapper,
pages: &mut Vec<BTreeMap<Key, SSTable>>,
) -> Result<()> {
if mem.is_empty() {
return Ok(());
};
if pages.is_empty() {
pages.push(BTreeMap::new());
}
let mut iter = mem.iter();
let sst = mapper.make_table(Kind::Active, &mut |mut data_wtr, mut index_wtr| {
SSTable::create(&mut iter, 0, &mut data_wtr, &mut index_wtr);
})?;
let first = sst.meta().start;
pages[0].insert(first, sst);
Ok(())
}
/// Reads a single key. The memtable takes priority; otherwise every candidate table is
/// consulted and the newest version wins, with tombstones yielding `None`.
pub fn get(
mem: &BTreeMap<Key, Value>,
pages: &[BTreeMap<Key, SSTable>],
key: &Key,
) -> Result<Option<Vec<u8>>> {
if let Some(idx) = mem.get(key) {
return Ok(idx.val.clone());
}
let mut candidates = Vec::new();
for level in pages.iter() {
for (_, sst) in level.iter().rev() {
if sst.could_contain(key) {
if let Some(val) = sst.get(&key)? {
candidates.push((*key, val));
}
}
}
}
let merged = Merged::new(vec![candidates.into_iter()])
.next()
.map(|(_, v)| v.val.unwrap());
Ok(merged)
}
/// Returns a key-sorted iterator over `range`, merging the memtable with every table and
/// skipping deleted entries.
pub fn range(
mem: &BTreeMap<Key, Value>,
tables: &[BTreeMap<Key, SSTable>],
range: std::ops::RangeInclusive<Key>,
) -> Result<impl Iterator<Item = (Key, Vec<u8>)>> {
let mut sources: Vec<Box<dyn Iterator<Item = (Key, Value)>>> = Vec::new();
let mem = mem
.range(range.clone())
.map(|(k, v)| (*k, v.clone()))
.collect::<Vec<_>>();
sources.push(Box::new(mem.into_iter()));
for level in tables.iter() {
for sst in level.values() {
let iter = sst.range(&range)?;
let iter = Box::new(iter) as Box<dyn Iterator<Item = (Key, Value)>>;
sources.push(iter);
}
}
let rows = Merged::new(sources).map(|(k, v)| (k, v.val.unwrap()));
Ok(rows)
}
impl Default for MemTable {
fn default() -> MemTable {
MemTable {
values: BTreeMap::new(),
mem_size: 0,
}
}
}
#[inline]
fn opt_bytes_memory(bytes: &Option<Vec<u8>>) -> usize {
bytes.as_ref().map(Vec::len).unwrap_or(0)
}
#[cfg(test)]
mod test {
use super::*;
use crate::test::gen;
const COMMIT: i64 = -1;
#[test]
fn test_put_calc() {
const DATA_SIZE: usize = 16;
let mut table = MemTable::default();
for (key, data) in gen::pairs(DATA_SIZE).take(1024) {
table.put(&key, COMMIT, &data);
}
let expected_size = 1024 * (DATA_SIZE + MemTable::OVERHEAD_PER_RECORD);
assert_eq!(table.mem_size, expected_size);
}
#[test]
fn test_delete_calc() {
const DATA_SIZE: usize = 32;
let mut table = MemTable::default();
let input = gen::pairs(DATA_SIZE).take(1024).collect::<Vec<_>>();
for (key, data) in &input {
table.put(key, COMMIT, data);
}
for (key, _) in input.iter().rev().take(512) {
table.delete(key, COMMIT);
}
let expected_size =
512 * (DATA_SIZE + MemTable::OVERHEAD_PER_RECORD) + 512 * MemTable::OVERHEAD_PER_RECORD;
assert_eq!(table.mem_size, expected_size);
// Deletes of things not in the memory table must be recorded
for key in gen::keys().take(512) {
table.delete(&key, COMMIT);
}
let expected_size = expected_size + 512 * MemTable::OVERHEAD_PER_RECORD;
assert_eq!(table.mem_size, expected_size);
}
#[test]
fn test_put_order_irrelevant() {
let (mut table_1, mut table_2) = (MemTable::default(), MemTable::default());
let big_input: Vec<_> = gen::pairs(1024).take(128).collect();
let small_input: Vec<_> = gen::pairs(16).take(128).collect();
for (key, data) in big_input.iter().chain(small_input.iter()) {
table_1.put(key, COMMIT, data);
}
let iter = big_input
.iter()
.rev()
.zip(small_input.iter().rev())
.enumerate();
for (i, ((big_key, big_data), (small_key, small_data))) in iter {
if i % 2 == 0 {
table_2.put(big_key, COMMIT, big_data);
table_2.put(small_key, COMMIT, small_data);
} else {
table_2.put(small_key, COMMIT, small_data);
table_2.put(big_key, COMMIT, big_data);
}
}
assert_eq!(table_1.mem_size, table_2.mem_size);
assert_eq!(table_1.values, table_2.values);
}
#[test]
fn test_delete_order_irrelevant() {
let (mut table_1, mut table_2) = (MemTable::default(), MemTable::default());
let big_input: Vec<_> = gen::pairs(1024).take(128).collect();
let small_input: Vec<_> = gen::pairs(16).take(128).collect();
for (key, data) in big_input.iter().chain(small_input.iter()) {
table_1.put(key, COMMIT, data);
table_2.put(key, COMMIT, data);
}
let iter = big_input
.iter()
.rev()
.take(64)
.chain(small_input.iter().rev().take(64))
.map(|(key, _)| key);
for key in iter {
table_1.delete(key, COMMIT);
}
let iter = big_input
.iter()
.rev()
.take(64)
.zip(small_input.iter().rev().take(64))
.map(|((key, _), (key2, _))| (key, key2))
.enumerate();
for (i, (big_key, small_key)) in iter {
if i % 2 == 0 {
table_2.delete(big_key, COMMIT);
table_2.delete(small_key, COMMIT);
} else {
table_2.delete(small_key, COMMIT);
table_2.delete(big_key, COMMIT);
}
}
assert_eq!(table_1.mem_size, table_2.mem_size);
assert_eq!(table_1.values, table_2.values);
}
}

View File

@ -1,209 +0,0 @@
use crate::error::{Error, Result};
use crate::sstable::Key;
use crate::storage::MemTable;
use crate::writelog::WriteLog;
use crate::DEFAULT_MEM_SIZE;
use std::sync::{Arc, RwLock};
/// Configuration for `WriteBatch`
#[derive(Debug)]
pub struct Config {
/// Determines whether writes using this batch are written to the write-ahead log
/// immediately, or only all at once when the batch is committed.
pub log_writes: bool,
/// Size cap for the write batch. Inserts after it is full return an `Err`.
pub max_size: usize,
}
#[derive(Debug)]
pub struct WriteBatch {
pub(crate) log: Arc<RwLock<WriteLog>>,
pub(crate) memtable: MemTable,
pub(crate) commit: i64,
pub(crate) config: Config,
}
impl WriteBatch {
pub fn put(&mut self, key: &Key, data: &[u8]) -> Result<()> {
self.check_capacity()?;
if self.config.log_writes {
let mut log = self.log.write().unwrap();
log.log_put(key, self.commit, data).unwrap();
}
self.memtable.put(key, self.commit, data);
Ok(())
}
pub fn put_many<Iter, Tup, K, V>(&mut self, rows: Iter) -> Result<()>
where
Iter: Iterator<Item = Tup>,
Tup: std::borrow::Borrow<(K, V)>,
K: std::borrow::Borrow<Key>,
V: std::borrow::Borrow<[u8]>,
{
self.check_capacity()?;
if self.config.log_writes {
let mut log = self.log.write().unwrap();
for pair in rows {
let (ref key, ref data) = pair.borrow();
let (key, data) = (key.borrow(), data.borrow());
log.log_put(key, self.commit, data).unwrap();
self.memtable.put(key, self.commit, data);
}
} else {
for pair in rows {
let (ref key, ref data) = pair.borrow();
self.memtable.put(key.borrow(), self.commit, data.borrow());
}
}
Ok(())
}
pub fn delete(&mut self, key: &Key) {
if self.config.log_writes {
let mut log = self.log.write().unwrap();
log.log_delete(key, self.commit).unwrap();
}
self.memtable.delete(key, self.commit);
}
pub fn delete_many<Iter, K>(&mut self, rows: Iter)
where
Iter: Iterator<Item = K>,
K: std::borrow::Borrow<Key>,
{
if self.config.log_writes {
let mut log = self.log.write().unwrap();
for key in rows {
let key = key.borrow();
log.log_delete(key, self.commit).unwrap();
self.memtable.delete(key, self.commit);
}
} else {
for key in rows {
self.memtable.delete(key.borrow(), self.commit);
}
}
}
#[inline]
fn check_capacity(&self) -> Result<()> {
if self.memtable.mem_size >= self.config.max_size {
return Err(Error::WriteBatchFull(self.config.max_size));
}
Ok(())
}
}
impl Default for Config {
fn default() -> Config {
Config {
log_writes: true,
max_size: DEFAULT_MEM_SIZE,
}
}
}
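// A small capacity sketch (illustrative only, mirroring the tests below): a batch backed
// by an in-memory write-ahead log logs every put immediately when `log_writes` is true,
// and rejects puts once `max_size` is reached.
#[allow(dead_code)]
fn writebatch_capacity_sketch() {
    use crate::writelog::Config as WalConfig;

    let mut batch = WriteBatch {
        config: Config {
            log_writes: true,
            max_size: 64,
        },
        commit: 0,
        memtable: MemTable::default(),
        log: Arc::new(RwLock::new(WriteLog::memory(WalConfig::default()))),
    };

    let key = Key::from((0, 0, 1));
    batch.put(&key, &[0u8; 64]).expect("first put fits");
    // The memtable now holds 64 bytes plus per-record overhead, exceeding max_size
    assert!(batch.put(&key, &[0u8; 1]).is_err());
}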
#[cfg(test)]
mod test {
use super::*;
use crate::test::gen;
use crate::writelog::Config as WalConfig;
const CAPACITY: usize = 10 * 1024;
#[test]
fn test_put_associative() {
let mut writebatch = setup();
let input: Vec<_> = gen::pairs(32).take(100).collect();
writebatch.put_many(input.iter()).unwrap();
let mut writebatch2 = setup();
for (key, data) in &input {
writebatch2.put(key, data).unwrap();
}
let (materialized_1, materialized_2) = (
writebatch.log.write().unwrap().materialize().unwrap(),
writebatch2.log.write().unwrap().materialize().unwrap(),
);
assert_eq!(materialized_1, materialized_2);
}
#[test]
fn test_delete_associative() {
let (mut writebatch, mut writebatch2) = (setup(), setup());
let input: Vec<_> = gen::pairs(32).take(100).collect();
writebatch.put_many(input.iter()).unwrap();
writebatch2.put_many(input.iter()).unwrap();
writebatch.delete_many(input.iter().map(|(k, _)| k));
for (key, _) in &input {
writebatch2.delete(key);
}
let (materialized_1, materialized_2) = (
writebatch.log.write().unwrap().materialize().unwrap(),
writebatch2.log.write().unwrap().materialize().unwrap(),
);
assert_eq!(materialized_1, materialized_2);
}
#[test]
fn test_no_put_when_full() {
const AMT_RECORDS: usize = 64;
let mut writebatch = setup();
let space_per_record = CAPACITY / AMT_RECORDS - MemTable::OVERHEAD_PER_RECORD;
let input: Vec<_> = gen::pairs(space_per_record).take(AMT_RECORDS).collect();
writebatch.put_many(input.iter()).unwrap();
match writebatch.check_capacity() {
Err(Error::WriteBatchFull(CAPACITY)) => {}
_ => panic!("Writebatch should be exactly at capacity"),
}
let (key, data) = gen::pairs(space_per_record).next().unwrap();
let result = writebatch.put(&key, &data);
assert!(result.is_err());
// Free up space
writebatch.delete(&input[0].0);
let result = writebatch.put(&key, &data);
assert!(result.is_ok());
}
fn setup() -> WriteBatch {
let config = Config {
log_writes: true,
max_size: CAPACITY,
};
let log = WriteLog::memory(WalConfig::default());
WriteBatch {
config,
commit: -1,
memtable: MemTable::default(),
log: Arc::new(RwLock::new(log)),
}
}
}

View File

@ -1,276 +0,0 @@
use crate::error::Result;
use crate::io_utils::{CRCReader, CRCWriter};
use crate::sstable::Value;
use crate::Key;
use memmap::Mmap;
use std::collections::BTreeMap;
use std::fs::{self, File};
use std::io::Write;
use std::path::{Path, PathBuf};
// RocksDB's log uses this block size.
// It may be worth making this configurable and experimenting with other sizes
const BLOCK_SIZE: usize = 32 * 1024;
#[derive(Debug)]
pub struct WriteLog {
log_path: PathBuf,
logger: Logger,
config: Config,
in_memory: bool,
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Config {
pub use_fsync: bool,
pub sync_every_write: bool,
}
impl WriteLog {
pub fn open(path: &Path, config: Config) -> Result<Self> {
let file = file_opts().open(path)?;
Ok(WriteLog {
config,
log_path: path.to_path_buf(),
logger: Logger::disk(file),
in_memory: false,
})
}
#[allow(dead_code)]
pub fn memory(config: Config) -> WriteLog {
WriteLog {
config,
logger: Logger::memory(),
log_path: Path::new("").to_path_buf(),
in_memory: true,
}
}
pub fn reset(&mut self) -> Result<()> {
let new_logger = if self.in_memory {
Logger::memory()
} else {
let file = file_opts().truncate(true).open(&self.log_path)?;
Logger::disk(file)
};
self.logger = new_logger;
Ok(())
}
pub fn log_put(&mut self, key: &Key, ts: i64, val: &[u8]) -> Result<()> {
log(&mut self.logger, key, ts, Some(val))?;
if self.config.sync_every_write {
sync(&mut self.logger, self.config.use_fsync)?;
}
Ok(())
}
pub fn log_delete(&mut self, key: &Key, ts: i64) -> Result<()> {
log(&mut self.logger, key, ts, None)?;
if self.config.sync_every_write {
sync(&mut self.logger, self.config.use_fsync)?;
}
Ok(())
}
#[allow(dead_code)]
pub fn sync(&mut self) -> Result<()> {
sync(&mut self.logger, self.config.use_fsync)
}
pub fn materialize(&mut self) -> Result<BTreeMap<Key, Value>> {
let mmap = self.logger.writer.mmap()?;
read_log(&mmap)
}
}
impl Default for Config {
fn default() -> Config {
Config {
use_fsync: false,
sync_every_write: true,
}
}
}
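// A small round-trip sketch (illustrative only, mirroring the tests below): puts and
// deletes written to an in-memory log are replayed by `materialize`, with the latest
// commit winning per key and deletes kept as tombstones (val == None).
#[allow(dead_code)]
fn writelog_sketch() {
    let mut wal = WriteLog::memory(Config::default());
    let key = Key::from((1, 1, 1));

    wal.log_put(&key, 1, b"hello").expect("log put");
    wal.log_delete(&key, 2).expect("log delete");

    let replayed = wal.materialize().expect("materialize");
    assert_eq!(replayed.get(&key), Some(&Value::new(2, None)));
}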
trait LogWriter: std::fmt::Debug + Write + Send + Sync {
fn sync(&mut self, fsync: bool) -> Result<()>;
fn mmap(&self) -> Result<Mmap>;
}
/// Holds the actual logging-related state
#[derive(Debug)]
struct Logger {
writer: Box<dyn LogWriter>,
}
impl Logger {
fn memory() -> Self {
Logger {
writer: Box::new(CRCWriter::new(vec![], BLOCK_SIZE)),
}
}
fn disk(file: File) -> Self {
Logger {
writer: Box::new(CRCWriter::new(file, BLOCK_SIZE)),
}
}
}
impl LogWriter for CRCWriter<Vec<u8>> {
fn sync(&mut self, _: bool) -> Result<()> {
self.flush()?;
Ok(())
}
fn mmap(&self) -> Result<Mmap> {
let mut map = memmap::MmapMut::map_anon(self.get_ref().len())?;
(&mut map[..]).copy_from_slice(self.get_ref());
Ok(map.make_read_only()?)
}
}
impl LogWriter for CRCWriter<File> {
fn sync(&mut self, fsync: bool) -> Result<()> {
self.flush()?;
let file = self.get_mut();
if fsync {
file.sync_all()?;
} else {
file.sync_data()?;
}
Ok(())
}
fn mmap(&self) -> Result<Mmap> {
let map = unsafe { Mmap::map(self.get_ref())? };
Ok(map)
}
}
fn log(logger: &mut Logger, key: &Key, commit: i64, data: Option<&[u8]>) -> Result<()> {
let writer = &mut logger.writer;
bincode::serialize_into(writer, &(key, commit, data))?;
Ok(())
}
fn sync(logger: &mut Logger, sync_all: bool) -> Result<()> {
let writer = &mut logger.writer;
writer.sync(sync_all)?;
Ok(())
}
#[inline]
fn file_opts() -> fs::OpenOptions {
let mut opts = fs::OpenOptions::new();
opts.read(true).write(true).create(true);
opts
}
fn read_log(log_buf: &[u8]) -> Result<BTreeMap<Key, Value>> {
let mut map = BTreeMap::new();
let mut reader = CRCReader::new(log_buf, BLOCK_SIZE);
// Replay records in order; stop at the first truncated or corrupt record
while let Ok((key, commit, opt_bytes)) = bincode::deserialize_from(&mut reader) {
map.insert(key, Value::new(commit, opt_bytes));
}
Ok(map)
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_log_serialization() {
let (key, commit, data) = (Key::from((1, 2, 3)), 4, Some(vec![0; 1024]));
let mut buf = vec![];
bincode::serialize_into(&mut buf, &(&key, commit, &data)).unwrap();
buf.extend(std::iter::repeat(0).take(buf.len()));
let log_record: (Key, i64, Option<Vec<u8>>) = bincode::deserialize_from(&buf[..]).unwrap();
assert_eq!(log_record.0, key);
assert_eq!(log_record.1, commit);
assert_eq!(log_record.2, data);
}
#[test]
fn test_log_round_trip() {
let mut wal = WriteLog::memory(Config::default());
let values: BTreeMap<Key, Value> = (0u64..100)
.map(|n| {
let val = if n % 2 == 0 {
Some(vec![0; 1024])
} else {
None
};
(Key::from((n, n, n)), Value { ts: n as i64, val })
})
.collect();
for (k, v) in values.iter() {
if v.val.is_some() {
wal.log_put(k, v.ts, v.val.as_ref().unwrap())
.expect("Wal::put");
} else {
wal.log_delete(k, v.ts).expect("Wal::delete");
}
}
let reloaded = wal.materialize().expect("Wal::materialize");
assert_eq!(values.len(), reloaded.len());
assert_eq!(values, reloaded);
}
#[test]
fn test_reset() {
use crate::error::Error;
let mut wal = WriteLog::memory(Config::default());
let values: BTreeMap<Key, Value> = (0u64..100)
.map(|n| {
let val = Some(vec![0; 64]);
(Key::from((n, n, n)), Value { ts: n as i64, val })
})
.collect();
for (k, v) in values.iter() {
wal.log_put(k, v.ts, v.val.as_ref().unwrap())
.expect("Wal::put");
}
wal.reset().expect("Wal::reset");
// Should result in an error due to attempting to make a memory map of length 0
let result = wal.materialize();
assert!(result.is_err());
if let Err(Error::Io(e)) = result {
assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
} else {
panic!("should fail to create 0-length memory-map with an empty log");
}
}
}

View File

@ -1,240 +0,0 @@
use std::fs;
use std::path::{Path, PathBuf};
use solana_kvstore::test::gen;
use solana_kvstore::{Config, Key, KvStore};
const KB: usize = 1024;
const HALF_KB: usize = 512;
#[test]
fn test_put_get() {
let path = setup("test_put_get");
let cfg = Config {
max_mem: 64 * KB,
max_tables: 5,
page_size: 64 * KB,
..Config::default()
};
let lsm = KvStore::open(&path, cfg).unwrap();
let (key, bytes) = gen::pairs(HALF_KB).take(1).next().unwrap();
lsm.put(&key, &bytes).expect("put fail");
let out_bytes = lsm.get(&key).expect("get fail").expect("missing");
assert_eq!(bytes, out_bytes);
teardown(&path);
}
#[test]
fn test_put_get_many() {
let path = setup("test_put_get_many");
let cfg = Config {
max_mem: 64 * KB,
max_tables: 5,
page_size: 64 * KB,
..Config::default()
};
let lsm = KvStore::open(&path, cfg).unwrap();
let mut pairs: Vec<_> = gen::pairs(HALF_KB).take(1024).collect();
pairs.sort_unstable_by_key(|(k, _)| *k);
lsm.put_many(pairs.clone().drain(..))
.expect("put_many fail");
let retrieved: Vec<(Key, Vec<u8>)> =
lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect();
assert!(!retrieved.is_empty());
assert_eq!(pairs.len(), retrieved.len());
assert_eq!(pairs, retrieved);
teardown(&path);
}
#[test]
fn test_delete() {
let path = setup("test_delete");
let cfg = Config {
max_mem: 64 * KB,
max_tables: 5,
page_size: 64 * KB,
..Config::default()
};
let lsm = KvStore::open(&path, cfg).unwrap();
let mut pairs: Vec<_> = gen::pairs(HALF_KB).take(64 * 6).collect();
pairs.sort_unstable_by_key(|(k, _)| *k);
for (k, i) in pairs.iter() {
lsm.put(k, i).expect("put fail");
}
// drain iterator deletes from `pairs`
for (k, _) in pairs.drain(64..128) {
lsm.delete(&k).expect("delete fail");
}
let retrieved: Vec<(Key, Vec<u8>)> =
lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect();
assert!(!retrieved.is_empty());
assert_eq!(pairs.len(), retrieved.len());
assert_eq!(pairs, retrieved);
teardown(&path);
}
#[test]
fn test_delete_many() {
let path = setup("test_delete_many");
let cfg = Config {
max_mem: 64 * KB,
max_tables: 5,
page_size: 64 * KB,
..Config::default()
};
let lsm = KvStore::open(&path, cfg).unwrap();
let mut pairs: Vec<_> = gen::pairs(HALF_KB).take(64 * 6).collect();
pairs.sort_unstable_by_key(|(k, _)| *k);
for (k, i) in pairs.iter() {
lsm.put(k, i).expect("put fail");
}
// drain iterator deletes from `pairs`
let keys_to_delete = pairs.drain(320..384).map(|(k, _)| k);
lsm.delete_many(keys_to_delete).expect("delete_many fail");
let retrieved: Vec<(Key, Vec<u8>)> =
lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect();
assert!(!retrieved.is_empty());
assert_eq!(pairs.len(), retrieved.len());
assert_eq!(pairs, retrieved);
teardown(&path);
}
#[test]
fn test_close_reopen() {
let path = setup("test_close_reopen");
let cfg = Config::default();
let lsm = KvStore::open(&path, cfg).unwrap();
let mut pairs: Vec<_> = gen::pairs(KB).take(1024).collect();
pairs.sort_unstable_by_key(|(k, _)| *k);
for (k, i) in pairs.iter() {
lsm.put(k, i).expect("put fail");
}
for (k, _) in pairs.drain(64..128) {
lsm.delete(&k).expect("delete fail");
}
// Drop and re-open
drop(lsm);
let lsm = KvStore::open(&path, cfg).unwrap();
let retrieved: Vec<(Key, Vec<u8>)> =
lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect();
assert!(!retrieved.is_empty());
assert_eq!(pairs.len(), retrieved.len());
assert_eq!(pairs, retrieved);
teardown(&path);
}
#[test]
fn test_partitioned() {
let path = setup("test_partitioned");
let cfg = Config {
max_mem: 64 * KB,
max_tables: 5,
page_size: 64 * KB,
..Config::default()
};
let storage_dirs = (0..4)
.map(|i| path.join(format!("partition-{}", i)))
.collect::<Vec<_>>();
let lsm = KvStore::partitioned(&path, &storage_dirs, cfg).unwrap();
let mut pairs: Vec<_> = gen::pairs(HALF_KB).take(64 * 12).collect();
pairs.sort_unstable_by_key(|(k, _)| *k);
lsm.put_many(pairs.iter()).expect("put_many fail");
// drain iterator deletes from `pairs`
let keys_to_delete = pairs.drain(320..384).map(|(k, _)| k);
lsm.delete_many(keys_to_delete).expect("delete_many fail");
let retrieved: Vec<(Key, Vec<u8>)> =
lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect();
assert!(!retrieved.is_empty());
assert_eq!(pairs.len(), retrieved.len());
assert_eq!(pairs, retrieved);
teardown(&path);
}
#[test]
fn test_in_memory() {
let path = setup("test_in_memory");
let cfg = Config {
max_mem: 64 * KB,
max_tables: 5,
page_size: 64 * KB,
in_memory: true,
..Config::default()
};
let lsm = KvStore::open(&path, cfg).unwrap();
let mut pairs: Vec<_> = gen::pairs(HALF_KB).take(64 * 12).collect();
pairs.sort_unstable_by_key(|(k, _)| *k);
lsm.put_many(pairs.iter()).expect("put_many fail");
// drain iterator deletes from `pairs`
let keys_to_delete = pairs.drain(320..384).map(|(k, _)| k);
lsm.delete_many(keys_to_delete).expect("delete_many fail");
let retrieved: Vec<(Key, Vec<u8>)> =
lsm.range(Key::ALL_INCLUSIVE).expect("range fail").collect();
assert!(!retrieved.is_empty());
assert_eq!(pairs.len(), retrieved.len());
assert_eq!(pairs, retrieved);
teardown(&path);
}
fn setup(test_name: &str) -> PathBuf {
let dir = Path::new("kvstore-test").join(test_name);
let _ig = fs::remove_dir_all(&dir);
fs::create_dir_all(&dir).unwrap();
dir
}
fn teardown(p: &Path) {
KvStore::destroy(p).expect("Expect successful store destruction");
}