Add WriteBatch to KvStore (#3364)

* implement write-batch in kvstore

* Add tests for writebatch and the in-memory table
This commit is contained in:
Mark 2019-03-20 06:55:39 -05:00 committed by Mark E. Sinclair
parent 61f950a60c
commit b3cdf58e4b
5 changed files with 453 additions and 91 deletions

View File

@ -12,6 +12,7 @@ pub enum Error {
Corrupted(bincode::Error),
Channel(Box<dyn StdErr + Sync + Send>),
Missing,
WriteBatchFull(usize),
}
impl fmt::Display for Error {
@ -21,6 +22,7 @@ impl fmt::Display for Error {
Error::Channel(e) => write!(f, "Internal communication error: {}", e),
Error::Io(e) => write!(f, "I/O error: {}", e),
Error::Missing => write!(f, "Item not present in ledger"),
Error::WriteBatchFull(capacity) => write!(f, "WriteBatch capacity {} full", capacity),
}
}
}
@ -32,6 +34,7 @@ impl StdErr for Error {
Error::Corrupted(ref e) => Some(e),
Error::Channel(e) => Some(e.as_ref()),
Error::Missing => None,
Error::WriteBatchFull(_) => None,
}
}
}

View File

@ -19,8 +19,8 @@ mod mapper;
mod readtx;
mod sstable;
mod storage;
mod writebatch;
mod writelog;
mod writetx;
#[macro_use]
extern crate serde_derive;
@ -28,8 +28,8 @@ extern crate serde_derive;
pub use self::error::{Error, Result};
pub use self::readtx::ReadTx as Snapshot;
pub use self::sstable::Key;
pub use self::writebatch::{Config as WriteBatchConfig, WriteBatch};
pub use self::writelog::Config as LogConfig;
pub use self::writetx::WriteTx;
const TABLES_FILE: &str = "tables.meta";
const LOG_FILE: &str = "mem-log";
@ -100,7 +100,8 @@ impl KvStore {
let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
storage::put(&mut *memtable, &mut *log, key, commit as i64, data)?;
log.log_put(key, commit, data).unwrap();
memtable.put(key, commit, data);
self.ensure_memtable(&mut *memtable, &mut *log)?;
@ -119,15 +120,11 @@ impl KvStore {
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
for pair in rows {
let (ref key, ref data) = pair.borrow();
let (ref k, ref d) = pair.borrow();
let (key, data) = (k.borrow(), d.borrow());
storage::put(
&mut *memtable,
&mut *log,
key.borrow(),
commit,
data.borrow(),
)?;
log.log_put(key, commit, data).unwrap();
memtable.put(key, commit, data);
}
self.ensure_memtable(&mut *memtable, &mut *log)?;
@ -148,7 +145,8 @@ impl KvStore {
let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
storage::delete(&mut *memtable, &mut *log, key, commit)?;
log.log_delete(key, commit).unwrap();
memtable.delete(key, commit);
self.ensure_memtable(&mut *memtable, &mut *log)?;
@ -164,8 +162,10 @@ impl KvStore {
let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
for key in rows {
storage::delete(&mut *memtable, &mut *log, key.borrow(), commit)?;
for k in rows {
let key = k.borrow();
log.log_delete(key, commit).unwrap();
memtable.delete(key, commit);
}
self.ensure_memtable(&mut *memtable, &mut *log)?;
@ -173,12 +173,25 @@ impl KvStore {
Ok(())
}
pub fn transaction(&self) -> Result<WriteTx> {
unimplemented!()
/// Open a new `WriteBatch` against this store.
///
/// A fresh commit ID is reserved up front (via the store's atomic commit
/// counter), and every record later staged in the batch is tagged with it.
/// The batch shares the store's write-ahead-log handle; its staged records
/// only land in this store's memtable when passed to `KvStore::commit`.
pub fn batch(&self, config: WriteBatchConfig) -> WriteBatch {
    let commit_id = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
    WriteBatch {
        memtable: MemTable::default(),
        log: Arc::clone(&self.log),
        commit: commit_id,
        config,
    }
}
pub fn commit(&self, _txn: WriteTx) -> Result<()> {
unimplemented!()
/// Apply a `WriteBatch` to the store's memtable.
///
/// Records are replayed through `MemTable::put`/`MemTable::delete` rather
/// than `BTreeMap::append` so that the destination's `mem_size` accounting
/// stays correct: a raw `append` would move the entries but leave the
/// size counter stale (and would miscount when keys overlap), defeating
/// the flush decision made by `ensure_memtable`.
///
/// NOTE(review): if the batch was created with `log_writes: false`, its
/// records were never written to the write-ahead-log and this path does
/// not log them either — confirm whether that durability gap is intended.
pub fn commit(&self, batch: WriteBatch) -> Result<()> {
    let mut memtable = self.mem.write().unwrap();
    let mut log = self.log.write().unwrap();

    for (key, value) in batch.memtable.values {
        match value.val {
            // Each staged record keeps the commit timestamp it was
            // tagged with inside the batch.
            Some(data) => memtable.put(&key, value.ts, &data),
            None => memtable.delete(&key, value.ts),
        }
    }

    self.ensure_memtable(&mut *memtable, &mut *log)?;
    Ok(())
}
pub fn snapshot(&self) -> Snapshot {

View File

@ -1,14 +1,13 @@
use crate::error::Result;
use crate::mapper::{Kind, Mapper};
use crate::sstable::{Key, Merged, SSTable, Value};
use crate::writelog::WriteLog;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::mem;
// Size of timestamp + size of key
const OVERHEAD: usize = 8 + 3 * 8;
const LOG_ERR: &str = "Write to log failed! Halting.";
/// Wrapper over a BTreeMap<`Key`, `Value`> that does basic accounting of memory usage
/// (Doesn't include BTreeMap internal stuff, can't reliably account for that without
/// using special data-structures or depending on unstable implementation details of `std`)
#[derive(Debug)]
pub struct MemTable {
pub mem_size: usize,
@ -16,61 +15,52 @@ pub struct MemTable {
}
impl MemTable {
/// Memory over-head per record. Size of the key + size of commit ID.
pub const OVERHEAD_PER_RECORD: usize = mem::size_of::<Key>() + mem::size_of::<i64>();
pub fn new(values: BTreeMap<Key, Value>) -> MemTable {
let mem_size = values.values().fold(0, |acc, elem| acc + val_mem_use(elem));
let mem_size = values.values().fold(0, |acc, elem| {
acc + Self::OVERHEAD_PER_RECORD + opt_bytes_memory(&elem.val)
});
MemTable { mem_size, values }
}
}
pub fn put(
mem: &mut MemTable,
log: &mut WriteLog,
key: &Key,
commit: i64,
data: &[u8],
) -> Result<()> {
log.log_put(key, commit, data).expect(LOG_ERR);
pub fn put(&mut self, key: &Key, commit: i64, data: &[u8]) {
let value = Value {
ts: commit,
val: Some(data.to_vec()),
};
let value = Value {
ts: commit,
val: Some(data.to_vec()),
};
mem.mem_size += val_mem_use(&value);
match mem.values.entry(*key) {
Entry::Vacant(entry) => {
entry.insert(value);
}
Entry::Occupied(mut entry) => {
let old = entry.insert(value);
mem.mem_size -= val_mem_use(&old);
self.mem_size += data.len();
match self.values.entry(*key) {
Entry::Vacant(entry) => {
entry.insert(value);
self.mem_size += Self::OVERHEAD_PER_RECORD;
}
Entry::Occupied(mut entry) => {
let old = entry.insert(value);
self.mem_size -= opt_bytes_memory(&old.val);
}
}
}
Ok(())
}
pub fn delete(&mut self, key: &Key, commit: i64) {
let value = Value {
ts: commit,
val: None,
};
pub fn delete(mem: &mut MemTable, log: &mut WriteLog, key: &Key, commit: i64) -> Result<()> {
log.log_delete(key, commit).expect(LOG_ERR);
let value = Value {
ts: commit,
val: None,
};
mem.mem_size += val_mem_use(&value);
match mem.values.entry(*key) {
Entry::Vacant(entry) => {
entry.insert(value);
}
Entry::Occupied(mut entry) => {
let old = entry.insert(value);
mem.mem_size -= val_mem_use(&old);
match self.values.entry(*key) {
Entry::Vacant(entry) => {
entry.insert(value);
self.mem_size += Self::OVERHEAD_PER_RECORD;
}
Entry::Occupied(mut entry) => {
let old = entry.insert(value);
self.mem_size -= opt_bytes_memory(&old.val);
}
}
}
Ok(())
}
pub fn flush_table(
@ -151,12 +141,158 @@ pub fn range(
Ok(rows)
}
#[inline]
fn val_mem_use(val: &Value) -> usize {
OVERHEAD + val.val.as_ref().map(Vec::len).unwrap_or(0)
impl Default for MemTable {
    /// An empty table with zero accounted memory usage.
    fn default() -> MemTable {
        MemTable {
            mem_size: 0,
            values: BTreeMap::new(),
        }
    }
}
// TODO: Write basic tests using mem-table
// 1. test put + delete works right
// 2. test delete of unknown key recorded
// 3. check memory usage calcs
/// Number of payload bytes held by an optional value; a tombstone
/// (`None`) accounts for zero bytes.
#[inline]
fn opt_bytes_memory(bytes: &Option<Vec<u8>>) -> usize {
    match bytes {
        Some(data) => data.len(),
        None => 0,
    }
}
#[cfg(test)]
mod test {
use super::*;
use rand::{self, thread_rng, Rng};
// Commit timestamp is irrelevant to size accounting; use a sentinel.
const COMMIT: i64 = -1;
// mem_size after N puts of fixed-size payloads must be exactly
// N * (payload + per-record overhead).
#[test]
fn test_put_calc() {
const DATA_SIZE: usize = 16;
let mut table = MemTable::default();
for (key, data) in gen_pairs(DATA_SIZE).take(1024) {
table.put(&key, COMMIT, &data);
}
let expected_size = 1024 * (DATA_SIZE + MemTable::OVERHEAD_PER_RECORD);
assert_eq!(table.mem_size, expected_size);
}
// Deleting an existing key frees its payload bytes but keeps the
// per-record overhead (the tombstone still occupies a record).
#[test]
fn test_delete_calc() {
const DATA_SIZE: usize = 32;
let mut table = MemTable::default();
let input = gen_pairs(DATA_SIZE).take(1024).collect::<Vec<_>>();
for (key, data) in &input {
table.put(key, COMMIT, data);
}
for (key, _) in input.iter().rev().take(512) {
table.delete(key, COMMIT);
}
let expected_size =
512 * (DATA_SIZE + MemTable::OVERHEAD_PER_RECORD) + 512 * MemTable::OVERHEAD_PER_RECORD;
assert_eq!(table.mem_size, expected_size);
// Deletes of things not in the memory table must be recorded
for key in gen_keys().take(512) {
table.delete(&key, COMMIT);
}
let expected_size = expected_size + 512 * MemTable::OVERHEAD_PER_RECORD;
assert_eq!(table.mem_size, expected_size);
}
// Final contents and accounted size must not depend on put ordering.
#[test]
fn test_put_order_irrelevant() {
let (mut table_1, mut table_2) = (MemTable::default(), MemTable::default());
let big_input: Vec<_> = gen_pairs(1024).take(128).collect();
let small_input: Vec<_> = gen_pairs(16).take(128).collect();
for (key, data) in big_input.iter().chain(small_input.iter()) {
table_1.put(key, COMMIT, data);
}
// Feed table_2 the same records in reverse and alternating order.
let iter = big_input
.iter()
.rev()
.zip(small_input.iter().rev())
.enumerate();
for (i, ((big_key, big_data), (small_key, small_data))) in iter {
if i % 2 == 0 {
table_2.put(big_key, COMMIT, big_data);
table_2.put(small_key, COMMIT, small_data);
} else {
table_2.put(small_key, COMMIT, small_data);
table_2.put(big_key, COMMIT, big_data);
}
}
assert_eq!(table_1.mem_size, table_2.mem_size);
assert_eq!(table_1.values, table_2.values);
}
// Final contents and accounted size must not depend on delete ordering.
#[test]
fn test_delete_order_irrelevant() {
let (mut table_1, mut table_2) = (MemTable::default(), MemTable::default());
let big_input: Vec<_> = gen_pairs(1024).take(128).collect();
let small_input: Vec<_> = gen_pairs(16).take(128).collect();
for (key, data) in big_input.iter().chain(small_input.iter()) {
table_1.put(key, COMMIT, data);
table_2.put(key, COMMIT, data);
}
let iter = big_input
.iter()
.rev()
.take(64)
.chain(small_input.iter().rev().take(64))
.map(|(key, _)| key);
for key in iter {
table_1.delete(key, COMMIT);
}
// Same keys, deleted in an interleaved order.
let iter = big_input
.iter()
.rev()
.take(64)
.zip(small_input.iter().rev().take(64))
.map(|((key, _), (key2, _))| (key, key2))
.enumerate();
for (i, (big_key, small_key)) in iter {
if i % 2 == 0 {
table_2.delete(big_key, COMMIT);
table_2.delete(small_key, COMMIT);
} else {
table_2.delete(small_key, COMMIT);
table_2.delete(big_key, COMMIT);
}
}
assert_eq!(table_1.mem_size, table_2.mem_size);
assert_eq!(table_1.values, table_2.values);
}
// Infinite iterator of random keys.
fn gen_keys() -> impl Iterator<Item = Key> {
let mut rng = thread_rng();
std::iter::repeat_with(move || {
let buf = rng.gen();
Key(buf)
})
}
// Infinite iterator of identical `size`-byte payloads.
fn gen_data(size: usize) -> impl Iterator<Item = Vec<u8>> {
std::iter::repeat(vec![1u8; size])
}
// Infinite iterator of (random key, fixed-size payload) pairs.
fn gen_pairs(data_size: usize) -> impl Iterator<Item = (Key, Vec<u8>)> {
gen_keys().zip(gen_data(data_size))
}
}

227
kvstore/src/writebatch.rs Normal file
View File

@ -0,0 +1,227 @@
use crate::error::{Error, Result};
use crate::sstable::Key;
use crate::storage::MemTable;
use crate::writelog::WriteLog;
use crate::DEFAULT_MEM_SIZE;
use std::sync::{Arc, RwLock};
/// Configuration for `WriteBatch`.
#[derive(Debug)]
pub struct Config {
/// Determines whether writes using this batch will be written to the write-ahead-log
/// immediately (inside each `put`/`delete` call), or not at all until the batch is
/// committed.
pub log_writes: bool,
/// Size cap (in accounted bytes, see `MemTable::mem_size`) for the write-batch.
/// Inserts attempted after it is full return `Error::WriteBatchFull`.
pub max_size: usize,
}
/// A set of writes staged in a private memtable, to be applied to the
/// `KvStore` all at once via `KvStore::commit`.
#[derive(Debug)]
pub struct WriteBatch {
// Shared handle to the store's write-ahead-log; written eagerly by this
// batch's methods when `config.log_writes` is set.
pub(crate) log: Arc<RwLock<WriteLog>>,
// Staging area for this batch's puts/deletes; its `mem_size` drives the
// `max_size` capacity check.
pub(crate) memtable: MemTable,
// Commit ID reserved for this batch; every record it stages carries it.
pub(crate) commit: i64,
pub(crate) config: Config,
}
impl WriteBatch {
/// Stage a single key/value write.
///
/// Returns `Error::WriteBatchFull` if the batch is already at capacity.
/// When `config.log_writes` is set, the record is also written to the
/// shared write-ahead-log immediately.
pub fn put(&mut self, key: &Key, data: &[u8]) -> Result<()> {
self.check_capacity()?;
if self.config.log_writes {
// Lock is scoped to this branch so the log is released before the
// (lock-free) memtable update.
let mut log = self.log.write().unwrap();
log.log_put(key, self.commit, data).unwrap();
}
self.memtable.put(key, self.commit, data);
Ok(())
}
/// Stage many key/value writes from any iterator of borrowable pairs.
///
/// NOTE(review): capacity is checked once, before the loop — a batch that
/// starts below `max_size` can therefore end up well above it after this
/// call. Confirm whether per-record checking is wanted; current tests
/// rely on the single up-front check.
pub fn put_many<Iter, Tup, K, V>(&mut self, rows: Iter) -> Result<()>
where
Iter: Iterator<Item = Tup>,
Tup: std::borrow::Borrow<(K, V)>,
K: std::borrow::Borrow<Key>,
V: std::borrow::Borrow<[u8]>,
{
self.check_capacity()?;
if self.config.log_writes {
// Acquire the log lock once for the whole loop, not per record.
let mut log = self.log.write().unwrap();
for pair in rows {
// Two-step borrow: first `&(K, V)` out of the iterator item,
// then `&Key` / `&[u8]` out of the pair's components.
let (ref key, ref data) = pair.borrow();
let (key, data) = (key.borrow(), data.borrow());
log.log_put(key, self.commit, data).unwrap();
self.memtable.put(key, self.commit, data);
}
} else {
for pair in rows {
let (ref key, ref data) = pair.borrow();
self.memtable.put(key.borrow(), self.commit, data.borrow());
}
}
Ok(())
}
/// Stage a delete. Deletes are always accepted (no capacity check), even
/// for keys the batch has never seen — the tombstone itself is recorded.
pub fn delete(&mut self, key: &Key) {
if self.config.log_writes {
let mut log = self.log.write().unwrap();
log.log_delete(key, self.commit).unwrap();
}
self.memtable.delete(key, self.commit);
}
/// Stage many deletes from any iterator of borrowable keys.
pub fn delete_many<Iter, K>(&mut self, rows: Iter)
where
Iter: Iterator<Item = K>,
K: std::borrow::Borrow<Key>,
{
if self.config.log_writes {
// Same single-lock-acquisition pattern as `put_many`.
let mut log = self.log.write().unwrap();
for key in rows {
let key = key.borrow();
log.log_delete(key, self.commit).unwrap();
self.memtable.delete(key, self.commit);
}
} else {
for key in rows {
self.memtable.delete(key.borrow(), self.commit);
}
}
}
// Err(WriteBatchFull) once the staged memtable reaches `max_size`.
#[inline]
fn check_capacity(&self) -> Result<()> {
if self.memtable.mem_size >= self.config.max_size {
return Err(Error::WriteBatchFull(self.config.max_size));
}
Ok(())
}
}
impl Default for Config {
    /// Log every write immediately and cap the batch at the store's
    /// default memtable size.
    fn default() -> Config {
        let max_size = DEFAULT_MEM_SIZE;
        Config {
            max_size,
            log_writes: true,
        }
    }
}
#[cfg(test)]
mod test {
use super::*;
use crate::writelog::Config as WalConfig;
use rand::{self, thread_rng, Rng};
// Batch capacity used by every test in this module.
const CAPACITY: usize = 10 * 1024;
// `put_many` and a loop of single `put`s must materialize to the same log.
#[test]
fn test_put_associative() {
let mut writebatch = setup();
let input: Vec<_> = gen_pairs(32).take(100).collect();
writebatch.put_many(input.iter()).unwrap();
let mut writebatch2 = setup();
for (key, data) in &input {
writebatch2.put(key, data).unwrap();
}
let (materialized_1, materialized_2) = (
writebatch.log.write().unwrap().materialize().unwrap(),
writebatch2.log.write().unwrap().materialize().unwrap(),
);
assert_eq!(materialized_1, materialized_2);
}
// `delete_many` and a loop of single `delete`s must materialize to the same log.
#[test]
fn test_delete_associative() {
let (mut writebatch, mut writebatch2) = (setup(), setup());
let input: Vec<_> = gen_pairs(32).take(100).collect();
writebatch.put_many(input.iter()).unwrap();
writebatch2.put_many(input.iter()).unwrap();
writebatch.delete_many(input.iter().map(|(k, _)| k));
for (key, _) in &input {
writebatch2.delete(key);
}
let (materialized_1, materialized_2) = (
writebatch.log.write().unwrap().materialize().unwrap(),
writebatch2.log.write().unwrap().materialize().unwrap(),
);
assert_eq!(materialized_1, materialized_2);
}
// Fill the batch exactly to CAPACITY, check `put` is rejected, then verify
// that deleting a record frees enough accounted space for a new `put`.
#[test]
fn test_no_put_when_full() {
const AMT_RECORDS: usize = 64;
let mut writebatch = setup();
// Payload size chosen so AMT_RECORDS records land exactly on CAPACITY.
let space_per_record = CAPACITY / AMT_RECORDS - MemTable::OVERHEAD_PER_RECORD;
let input: Vec<_> = gen_pairs(space_per_record).take(AMT_RECORDS).collect();
writebatch.put_many(input.iter()).unwrap();
match writebatch.check_capacity() {
Err(Error::WriteBatchFull(CAPACITY)) => {}
_ => panic!("Writebatch should be exactly at capacity"),
}
let (key, data) = gen_pairs(space_per_record).next().unwrap();
let result = writebatch.put(&key, &data);
assert!(result.is_err());
// Free up space
writebatch.delete(&input[0].0);
let result = writebatch.put(&key, &data);
assert!(result.is_ok());
}
// Build a batch over an in-memory write-ahead-log (no disk I/O).
fn setup() -> WriteBatch {
let config = Config {
log_writes: true,
max_size: CAPACITY,
};
let log = WriteLog::memory(WalConfig::default());
WriteBatch {
config,
commit: -1,
memtable: MemTable::default(),
log: Arc::new(RwLock::new(log)),
}
}
// Infinite iterator of random keys.
fn gen_keys() -> impl Iterator<Item = Key> {
let mut rng = thread_rng();
std::iter::repeat_with(move || {
let buf = rng.gen();
Key(buf)
})
}
// Infinite iterator of identical `size`-byte payloads.
fn gen_data(size: usize) -> impl Iterator<Item = Vec<u8>> {
std::iter::repeat(vec![1u8; size])
}
// Infinite iterator of (random key, fixed-size payload) pairs.
fn gen_pairs(data_size: usize) -> impl Iterator<Item = (Key, Vec<u8>)> {
gen_keys().zip(gen_data(data_size))
}
}

View File

@ -1,17 +0,0 @@
use crate::error::Result;
use crate::sstable::Key;
// Placeholder write-transaction handle (removed by this commit in favor of
// `WriteBatch`); it was never constructible in practice since no code here
// produces one and its methods are unimplemented.
#[derive(Debug)]
pub struct WriteTx<'a> {
_dummy: &'a mut (),
}
impl<'a> WriteTx<'a> {
// Stub: panics via `unimplemented!` if ever called.
pub fn put(&mut self, _key: &Key, _data: &[u8]) -> Result<()> {
unimplemented!()
}
// Stub: panics via `unimplemented!` if ever called.
pub fn delete(&mut self, _key: &Key) -> Result<()> {
unimplemented!()
}
}