From b3cdf58e4bd0a9bcac6618a2b1661441205cbd56 Mon Sep 17 00:00:00 2001
From: Mark
Date: Wed, 20 Mar 2019 06:55:39 -0500
Subject: [PATCH] Add WriteBatch to KvStore (#3364)

* implement write-batch in kvstore

* Add tests to writebatch, and in-memory table
---
 kvstore/src/error.rs      |   3 +
 kvstore/src/lib.rs        |  49 +++++---
 kvstore/src/storage.rs    | 248 +++++++++++++++++++++++++++++---------
 kvstore/src/writebatch.rs | 227 ++++++++++++++++++++++++++++++++++
 kvstore/src/writetx.rs    |  17 ---
 5 files changed, 453 insertions(+), 91 deletions(-)
 create mode 100644 kvstore/src/writebatch.rs
 delete mode 100644 kvstore/src/writetx.rs

diff --git a/kvstore/src/error.rs b/kvstore/src/error.rs
index 5a29317330..702eec2afc 100644
--- a/kvstore/src/error.rs
+++ b/kvstore/src/error.rs
@@ -12,6 +12,7 @@ pub enum Error {
     Corrupted(bincode::Error),
     Channel(Box<dyn StdErr + Sync + Send>),
     Missing,
+    WriteBatchFull(usize),
 }
 
 impl fmt::Display for Error {
@@ -21,6 +22,7 @@ impl fmt::Display for Error {
             Error::Channel(e) => write!(f, "Internal communication error: {}", e),
             Error::Io(e) => write!(f, "I/O error: {}", e),
             Error::Missing => write!(f, "Item not present in ledger"),
+            Error::WriteBatchFull(capacity) => write!(f, "WriteBatch capacity {} full", capacity),
         }
     }
 }
@@ -32,6 +34,7 @@ impl StdErr for Error {
             Error::Corrupted(ref e) => Some(e),
             Error::Channel(e) => Some(e.as_ref()),
             Error::Missing => None,
+            Error::WriteBatchFull(_) => None,
         }
     }
 }
diff --git a/kvstore/src/lib.rs b/kvstore/src/lib.rs
index ebf057e71f..f3ee853f69 100644
--- a/kvstore/src/lib.rs
+++ b/kvstore/src/lib.rs
@@ -19,8 +19,8 @@ mod mapper;
 mod readtx;
 mod sstable;
 mod storage;
+mod writebatch;
 mod writelog;
-mod writetx;
 
 #[macro_use]
 extern crate serde_derive;
@@ -28,8 +28,8 @@ extern crate serde_derive;
 pub use self::error::{Error, Result};
 pub use self::readtx::ReadTx as Snapshot;
 pub use self::sstable::Key;
+pub use self::writebatch::{Config as WriteBatchConfig, WriteBatch};
 pub use self::writelog::Config as LogConfig;
-pub use self::writetx::WriteTx;
 
 const TABLES_FILE: &str = "tables.meta";
 const LOG_FILE: &str = "mem-log";
@@ -100,7 +100,8 @@ impl KvStore {
         let mut log = self.log.write().unwrap();
         let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
 
-        storage::put(&mut *memtable, &mut *log, key, commit as i64, data)?;
+        log.log_put(key, commit, data).unwrap();
+        memtable.put(key, commit, data);
 
         self.ensure_memtable(&mut *memtable, &mut *log)?;
 
@@ -119,15 +120,11 @@ impl KvStore {
         let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
 
         for pair in rows {
-            let (ref key, ref data) = pair.borrow();
+            let (ref k, ref d) = pair.borrow();
+            let (key, data) = (k.borrow(), d.borrow());
 
-            storage::put(
-                &mut *memtable,
-                &mut *log,
-                key.borrow(),
-                commit,
-                data.borrow(),
-            )?;
+            log.log_put(key, commit, data).unwrap();
+            memtable.put(key, commit, data);
         }
 
         self.ensure_memtable(&mut *memtable, &mut *log)?;
@@ -148,7 +145,8 @@ impl KvStore {
         let mut log = self.log.write().unwrap();
         let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
 
-        storage::delete(&mut *memtable, &mut *log, key, commit)?;
+        log.log_delete(key, commit).unwrap();
+        memtable.delete(key, commit);
 
         self.ensure_memtable(&mut *memtable, &mut *log)?;
 
@@ -164,8 +162,10 @@ impl KvStore {
         let mut log = self.log.write().unwrap();
         let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
 
-        for key in rows {
-            storage::delete(&mut *memtable, &mut *log, key.borrow(), commit)?;
+        for k in rows {
+            let key = k.borrow();
+            log.log_delete(key, commit).unwrap();
+            memtable.delete(key, commit);
         }
 
         self.ensure_memtable(&mut *memtable, &mut *log)?;
 
@@ -173,12 +173,25 @@ impl KvStore {
         Ok(())
     }
 
-    pub fn transaction(&self) -> Result<WriteTx> {
-        unimplemented!()
+    pub fn batch(&self, config: WriteBatchConfig) -> WriteBatch {
+        let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
+
+        WriteBatch {
+            config,
+            commit,
+            memtable: MemTable::new(BTreeMap::new()),
+            log: Arc::clone(&self.log),
+        }
     }
 
-    pub fn commit(&self, _txn: WriteTx) -> Result<()> {
-        unimplemented!()
+    pub fn commit(&self, mut batch: WriteBatch) -> Result<()> {
+        let mut memtable = self.mem.write().unwrap();
+        let mut log = self.log.write().unwrap();
+
+        memtable.values.append(&mut batch.memtable.values);
+        self.ensure_memtable(&mut *memtable, &mut *log)?;
+
+        Ok(())
     }
 
     pub fn snapshot(&self) -> Snapshot {
diff --git a/kvstore/src/storage.rs b/kvstore/src/storage.rs
index f95a0e355a..186fa1d79d 100644
--- a/kvstore/src/storage.rs
+++ b/kvstore/src/storage.rs
@@ -1,14 +1,13 @@
 use crate::error::Result;
 use crate::mapper::{Kind, Mapper};
 use crate::sstable::{Key, Merged, SSTable, Value};
-use crate::writelog::WriteLog;
 use std::collections::btree_map::Entry;
 use std::collections::BTreeMap;
+use std::mem;
 
-// Size of timestamp + size of key
-const OVERHEAD: usize = 8 + 3 * 8;
-const LOG_ERR: &str = "Write to log failed! Halting.";
-
+/// Wrapper over a BTreeMap<`Key`, `Value`> that does basic accounting of memory usage.
+/// (Doesn't include the BTreeMap's internal overhead; that can't be reliably accounted for
+/// without special data structures or depending on unstable implementation details of `std`.)
 #[derive(Debug)]
 pub struct MemTable {
     pub mem_size: usize,
@@ -16,61 +15,52 @@ pub struct MemTable {
 }
 
 impl MemTable {
+    /// Memory overhead per record: size of the key + size of the commit ID.
+    pub const OVERHEAD_PER_RECORD: usize = mem::size_of::<Key>() + mem::size_of::<i64>();
+
     pub fn new(values: BTreeMap<Key, Value>) -> MemTable {
-        let mem_size = values.values().fold(0, |acc, elem| acc + val_mem_use(elem));
+        let mem_size = values.values().fold(0, |acc, elem| {
+            acc + Self::OVERHEAD_PER_RECORD + opt_bytes_memory(&elem.val)
+        });
 
         MemTable { mem_size, values }
     }
-}
 
-pub fn put(
-    mem: &mut MemTable,
-    log: &mut WriteLog,
-    key: &Key,
-    commit: i64,
-    data: &[u8],
-) -> Result<()> {
-    log.log_put(key, commit, data).expect(LOG_ERR);
+    pub fn put(&mut self, key: &Key, commit: i64, data: &[u8]) {
+        let value = Value {
+            ts: commit,
+            val: Some(data.to_vec()),
+        };
 
-    let value = Value {
-        ts: commit,
-        val: Some(data.to_vec()),
-    };
-
-    mem.mem_size += val_mem_use(&value);
-
-    match mem.values.entry(*key) {
-        Entry::Vacant(entry) => {
-            entry.insert(value);
-        }
-        Entry::Occupied(mut entry) => {
-            let old = entry.insert(value);
-            mem.mem_size -= val_mem_use(&old);
+        self.mem_size += data.len();
+        match self.values.entry(*key) {
+            Entry::Vacant(entry) => {
+                entry.insert(value);
+                self.mem_size += Self::OVERHEAD_PER_RECORD;
+            }
+            Entry::Occupied(mut entry) => {
+                let old = entry.insert(value);
+                self.mem_size -= opt_bytes_memory(&old.val);
+            }
         }
     }
 
-    Ok(())
-}
+    pub fn delete(&mut self, key: &Key, commit: i64) {
+        let value = Value {
+            ts: commit,
+            val: None,
+        };
 
-pub fn delete(mem: &mut MemTable, log: &mut WriteLog, key: &Key, commit: i64) -> Result<()> {
-    log.log_delete(key, commit).expect(LOG_ERR);
-    let value = Value {
-        ts: commit,
-        val: None,
-    };
-
-    mem.mem_size += val_mem_use(&value);
-
-    match mem.values.entry(*key) {
-        Entry::Vacant(entry) => {
-            entry.insert(value);
-        }
-        Entry::Occupied(mut entry) => {
-            let old = entry.insert(value);
-            mem.mem_size -= val_mem_use(&old);
+        match self.values.entry(*key) {
+            Entry::Vacant(entry) => {
+                entry.insert(value);
+                self.mem_size += Self::OVERHEAD_PER_RECORD;
+            }
+            Entry::Occupied(mut entry) => {
+                let old = entry.insert(value);
+                self.mem_size -= opt_bytes_memory(&old.val);
+            }
         }
     }
-
-    Ok(())
 }
 
 pub fn flush_table(
@@ -151,12 +141,158 @@ pub fn range(
     Ok(rows)
 }
 
-#[inline]
-fn val_mem_use(val: &Value) -> usize {
-    OVERHEAD + val.val.as_ref().map(Vec::len).unwrap_or(0)
+impl Default for MemTable {
+    fn default() -> MemTable {
+        MemTable {
+            values: BTreeMap::new(),
+            mem_size: 0,
+        }
+    }
 }
 
-// TODO: Write basic tests using mem-table
-// 1. test put + delete works right
-// 2. test delete of unknown key recorded
-// 3. check memory usage calcs
+#[inline]
+fn opt_bytes_memory(bytes: &Option<Vec<u8>>) -> usize {
+    bytes.as_ref().map(Vec::len).unwrap_or(0)
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use rand::{self, thread_rng, Rng};
+
+    const COMMIT: i64 = -1;
+
+    #[test]
+    fn test_put_calc() {
+        const DATA_SIZE: usize = 16;
+
+        let mut table = MemTable::default();
+
+        for (key, data) in gen_pairs(DATA_SIZE).take(1024) {
+            table.put(&key, COMMIT, &data);
+        }
+
+        let expected_size = 1024 * (DATA_SIZE + MemTable::OVERHEAD_PER_RECORD);
+        assert_eq!(table.mem_size, expected_size);
+    }
+
+    #[test]
+    fn test_delete_calc() {
+        const DATA_SIZE: usize = 32;
+
+        let mut table = MemTable::default();
+        let input = gen_pairs(DATA_SIZE).take(1024).collect::<Vec<_>>();
+
+        for (key, data) in &input {
+            table.put(key, COMMIT, data);
+        }
+
+        for (key, _) in input.iter().rev().take(512) {
+            table.delete(key, COMMIT);
+        }
+
+        let expected_size =
+            512 * (DATA_SIZE + MemTable::OVERHEAD_PER_RECORD) + 512 * MemTable::OVERHEAD_PER_RECORD;
+        assert_eq!(table.mem_size, expected_size);
+
+        // Deletes of things not in the memory table must be recorded
+        for key in gen_keys().take(512) {
+            table.delete(&key, COMMIT);
+        }
+
+        let expected_size = expected_size + 512 * MemTable::OVERHEAD_PER_RECORD;
+        assert_eq!(table.mem_size, expected_size);
+    }
+
+    #[test]
+    fn test_put_order_irrelevant() {
+        let (mut table_1, mut table_2) = (MemTable::default(), MemTable::default());
+        let big_input: Vec<_> = gen_pairs(1024).take(128).collect();
+        let small_input: Vec<_> = gen_pairs(16).take(128).collect();
+
+        for (key, data) in big_input.iter().chain(small_input.iter()) {
+            table_1.put(key, COMMIT, data);
+        }
+
+        let iter = big_input
+            .iter()
+            .rev()
+            .zip(small_input.iter().rev())
+            .enumerate();
+
+        for (i, ((big_key, big_data), (small_key, small_data))) in iter {
+            if i % 2 == 0 {
+                table_2.put(big_key, COMMIT, big_data);
+                table_2.put(small_key, COMMIT, small_data);
+            } else {
+                table_2.put(small_key, COMMIT, small_data);
+                table_2.put(big_key, COMMIT, big_data);
+            }
+        }
+
+        assert_eq!(table_1.mem_size, table_2.mem_size);
+        assert_eq!(table_1.values, table_2.values);
+    }
+
+    #[test]
+    fn test_delete_order_irrelevant() {
+        let (mut table_1, mut table_2) = (MemTable::default(), MemTable::default());
+        let big_input: Vec<_> = gen_pairs(1024).take(128).collect();
+        let small_input: Vec<_> = gen_pairs(16).take(128).collect();
+
+        for (key, data) in big_input.iter().chain(small_input.iter()) {
+            table_1.put(key, COMMIT, data);
+            table_2.put(key, COMMIT, data);
+        }
+
+        let iter = big_input
+            .iter()
+            .rev()
+            .take(64)
+            .chain(small_input.iter().rev().take(64))
+            .map(|(key, _)| key);
+
+        for key in iter {
+            table_1.delete(key, COMMIT);
+        }
+
+        let iter = big_input
+            .iter()
+            .rev()
+            .take(64)
+            .zip(small_input.iter().rev().take(64))
+            .map(|((key, _), (key2, _))| (key, key2))
+            .enumerate();
+
+        for (i, (big_key, small_key)) in iter {
+            if i % 2 == 0 {
+                table_2.delete(big_key, COMMIT);
+                table_2.delete(small_key, COMMIT);
+            } else {
+                table_2.delete(small_key, COMMIT);
+                table_2.delete(big_key, COMMIT);
+            }
+        }
+
+        assert_eq!(table_1.mem_size, table_2.mem_size);
+        assert_eq!(table_1.values, table_2.values);
+    }
+
+    fn gen_keys() -> impl Iterator<Item = Key> {
+        let mut rng = thread_rng();
+
+        std::iter::repeat_with(move || {
+            let buf = rng.gen();
+
+            Key(buf)
+        })
+    }
+
+    fn gen_data(size: usize) -> impl Iterator<Item = Vec<u8>> {
+        std::iter::repeat(vec![1u8; size])
+    }
+
+    fn gen_pairs(data_size: usize) -> impl Iterator<Item = (Key, Vec<u8>)> {
+        gen_keys().zip(gen_data(data_size))
+    }
+}
diff --git a/kvstore/src/writebatch.rs b/kvstore/src/writebatch.rs
new file mode 100644
index 0000000000..7e710f09ca
--- /dev/null
+++ b/kvstore/src/writebatch.rs
@@ -0,0 +1,227 @@
+use crate::error::{Error, Result};
+use crate::sstable::Key;
+use crate::storage::MemTable;
+use crate::writelog::WriteLog;
+use crate::DEFAULT_MEM_SIZE;
+use std::sync::{Arc, RwLock};
+
+/// Configuration for `WriteBatch`
+#[derive(Debug)]
+pub struct Config {
+    /// Determines whether writes using this batch are written to the write-ahead log
+    /// immediately, or only all at once when the batch is committed.
+    pub log_writes: bool,
+    /// Size cap for the write batch. Inserts after it is full will return an `Err`.
+    pub max_size: usize,
+}
+
+#[derive(Debug)]
+pub struct WriteBatch {
+    pub(crate) log: Arc<RwLock<WriteLog>>,
+    pub(crate) memtable: MemTable,
+    pub(crate) commit: i64,
+    pub(crate) config: Config,
+}
+
+impl WriteBatch {
+    pub fn put(&mut self, key: &Key, data: &[u8]) -> Result<()> {
+        self.check_capacity()?;
+
+        if self.config.log_writes {
+            let mut log = self.log.write().unwrap();
+            log.log_put(key, self.commit, data).unwrap();
+        }
+
+        self.memtable.put(key, self.commit, data);
+
+        Ok(())
+    }
+
+    pub fn put_many<Iter, Tup, K, V>(&mut self, rows: Iter) -> Result<()>
+    where
+        Iter: Iterator<Item = Tup>,
+        Tup: std::borrow::Borrow<(K, V)>,
+        K: std::borrow::Borrow<Key>,
+        V: std::borrow::Borrow<[u8]>,
+    {
+        self.check_capacity()?;
+
+        if self.config.log_writes {
+            let mut log = self.log.write().unwrap();
+
+            for pair in rows {
+                let (ref key, ref data) = pair.borrow();
+                let (key, data) = (key.borrow(), data.borrow());
+                log.log_put(key, self.commit, data).unwrap();
+
+                self.memtable.put(key, self.commit, data);
+            }
+        } else {
+            for pair in rows {
+                let (ref key, ref data) = pair.borrow();
+                self.memtable.put(key.borrow(), self.commit, data.borrow());
+            }
+        }
+
+        Ok(())
+    }
+
+    pub fn delete(&mut self, key: &Key) {
+        if self.config.log_writes {
+            let mut log = self.log.write().unwrap();
+            log.log_delete(key, self.commit).unwrap();
+        }
+
+        self.memtable.delete(key, self.commit);
+    }
+
+    pub fn delete_many<Iter, K>(&mut self, rows: Iter)
+    where
+        Iter: Iterator<Item = K>,
+        K: std::borrow::Borrow<Key>,
+    {
+        if self.config.log_writes {
+            let mut log = self.log.write().unwrap();
+
+            for key in rows {
+                let key = key.borrow();
+                log.log_delete(key, self.commit).unwrap();
+
+                self.memtable.delete(key, self.commit);
+            }
+        } else {
+            for key in rows {
+                self.memtable.delete(key.borrow(), self.commit);
+            }
+        }
+    }
+
+    #[inline]
+    fn check_capacity(&self) -> Result<()> {
+        if self.memtable.mem_size >= self.config.max_size {
+            return Err(Error::WriteBatchFull(self.config.max_size));
+        }
+
+        Ok(())
+    }
+}
+
+impl Default for Config {
+    fn default() -> Config {
+        Config {
+            log_writes: true,
+            max_size: DEFAULT_MEM_SIZE,
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::writelog::Config as WalConfig;
+    use rand::{self, thread_rng, Rng};
+
+    const CAPACITY: usize = 10 * 1024;
+
+    #[test]
+    fn test_put_associative() {
+        let mut writebatch = setup();
+        let input: Vec<_> = gen_pairs(32).take(100).collect();
+
+        writebatch.put_many(input.iter()).unwrap();
+
+        let mut writebatch2 = setup();
+        for (key, data) in &input {
+            writebatch2.put(key, data).unwrap();
+        }
+
+        let (materialized_1, materialized_2) = (
+            writebatch.log.write().unwrap().materialize().unwrap(),
+            writebatch2.log.write().unwrap().materialize().unwrap(),
+        );
+
+        assert_eq!(materialized_1, materialized_2);
+    }
+
+    #[test]
+    fn test_delete_associative() {
+        let (mut writebatch, mut writebatch2) = (setup(), setup());
+        let input: Vec<_> = gen_pairs(32).take(100).collect();
+
+        writebatch.put_many(input.iter()).unwrap();
+        writebatch2.put_many(input.iter()).unwrap();
+
+        writebatch.delete_many(input.iter().map(|(k, _)| k));
+
+        for (key, _) in &input {
+            writebatch2.delete(key);
+        }
+
+        let (materialized_1, materialized_2) = (
+            writebatch.log.write().unwrap().materialize().unwrap(),
+            writebatch2.log.write().unwrap().materialize().unwrap(),
+        );
+
+        assert_eq!(materialized_1, materialized_2);
+    }
+
+    #[test]
+    fn test_no_put_when_full() {
+        const AMT_RECORDS: usize = 64;
+
+        let mut writebatch = setup();
+
+        let space_per_record = CAPACITY / AMT_RECORDS - MemTable::OVERHEAD_PER_RECORD;
+        let input: Vec<_> = gen_pairs(space_per_record).take(AMT_RECORDS).collect();
+
+        writebatch.put_many(input.iter()).unwrap();
+
+        match writebatch.check_capacity() {
+            Err(Error::WriteBatchFull(CAPACITY)) => {}
+            _ => panic!("Writebatch should be exactly at capacity"),
+        }
+
+        let (key, data) = gen_pairs(space_per_record).next().unwrap();
+        let result = writebatch.put(&key, &data);
+        assert!(result.is_err());
+
+        // Free up space
+        writebatch.delete(&input[0].0);
+        let result = writebatch.put(&key, &data);
+        assert!(result.is_ok());
+    }
+
+    fn setup() -> WriteBatch {
+        let config = Config {
+            log_writes: true,
+            max_size: CAPACITY,
+        };
+
+        let log = WriteLog::memory(WalConfig::default());
+
+        WriteBatch {
+            config,
+            commit: -1,
+            memtable: MemTable::default(),
+            log: Arc::new(RwLock::new(log)),
+        }
+    }
+
+    fn gen_keys() -> impl Iterator<Item = Key> {
+        let mut rng = thread_rng();
+
+        std::iter::repeat_with(move || {
+            let buf = rng.gen();
+
+            Key(buf)
+        })
+    }
+
+    fn gen_data(size: usize) -> impl Iterator<Item = Vec<u8>> {
+        std::iter::repeat(vec![1u8; size])
+    }
+
+    fn gen_pairs(data_size: usize) -> impl Iterator<Item = (Key, Vec<u8>)> {
+        gen_keys().zip(gen_data(data_size))
+    }
+}
diff --git a/kvstore/src/writetx.rs b/kvstore/src/writetx.rs
deleted file mode 100644
index 44863e23c7..0000000000
--- a/kvstore/src/writetx.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-use crate::error::Result;
-use crate::sstable::Key;
-
-#[derive(Debug)]
-pub struct WriteTx<'a> {
-    _dummy: &'a mut (),
-}
-
-impl<'a> WriteTx<'a> {
-    pub fn put(&mut self, _key: &Key, _data: &[u8]) -> Result<()> {
-        unimplemented!()
-    }
-
-    pub fn delete(&mut self, _key: &Key) -> Result<()> {
-        unimplemented!()
-    }
-}
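
Usage note (sketch, not part of the patch): the snippet below shows how the batch API added in this change is meant to be driven from calling code, using only the surface visible in the diff (`KvStore::batch`, `WriteBatch::put`/`delete`, `KvStore::commit`, `WriteBatchConfig::default`). The crate path `kvstore` and the `Key([u8; 16])` constructor are assumptions borrowed from this diff's own tests, not guaranteed public API.

    // Sketch only. Assumes the crate is imported as `kvstore` and that `Key` is a
    // public newtype over a 16-byte array, as the tests in this patch construct it.
    use kvstore::{Key, KvStore, Result, WriteBatchConfig};

    fn batched_writes(store: &KvStore) -> Result<()> {
        // Stage writes in a batch; with the default config every put/delete is also
        // appended to the write-ahead log as it is staged (log_writes: true).
        let mut batch = store.batch(WriteBatchConfig::default());

        // Writes are buffered in the batch's private MemTable and count against
        // `max_size`; once the cap is reached, `put` returns Error::WriteBatchFull.
        for i in 0u8..16 {
            batch.put(&Key([i; 16]), b"some value")?;
        }
        batch.delete(&Key([0u8; 16]));

        // Fold the staged writes into the store's memtable in one step.
        store.commit(batch)?;
        Ok(())
    }

Because the batch owns its own `MemTable`, capacity accounting and the ordering tests above run without touching the store at all; `commit` then merges the staged values into the store with a single `BTreeMap::append`.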