diff --git a/kvstore/src/lib.rs b/kvstore/src/lib.rs index 3dd3ce86ab..ebf057e71f 100644 --- a/kvstore/src/lib.rs +++ b/kvstore/src/lib.rs @@ -1,12 +1,13 @@ use crate::mapper::{Disk, Mapper, Memory}; use crate::sstable::SSTable; -use crate::storage::WriteState; +use crate::storage::MemTable; use crate::writelog::WriteLog; use std::collections::BTreeMap; use std::fs; use std::io; use std::ops::RangeInclusive; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::{Receiver, Sender}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::JoinHandle; @@ -35,6 +36,7 @@ const LOG_FILE: &str = "mem-log"; const DEFAULT_TABLE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_MEM_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_MAX_PAGES: usize = 10; +const COMMIT_ORDERING: Ordering = Ordering::Relaxed; #[derive(Debug, PartialEq, Copy, Clone)] pub struct Config { @@ -47,10 +49,12 @@ pub struct Config { #[derive(Debug)] pub struct KvStore { - write: RwLock, - tables: RwLock>>, config: Config, root: PathBuf, + commit: AtomicUsize, + mem: RwLock, + log: Arc>, + tables: RwLock>>, mapper: Arc, sender: Mutex>, receiver: Mutex>, @@ -92,12 +96,13 @@ impl KvStore { } pub fn put(&self, key: &Key, data: &[u8]) -> Result<()> { - self.ensure_mem()?; + let mut memtable = self.mem.write().unwrap(); + let mut log = self.log.write().unwrap(); + let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64; - let mut write = self.write.write().unwrap(); + storage::put(&mut *memtable, &mut *log, key, commit as i64, data)?; - write.put(key, data)?; - write.commit += 1; + self.ensure_memtable(&mut *memtable, &mut *log)?; Ok(()) } @@ -109,18 +114,23 @@ impl KvStore { K: std::borrow::Borrow, V: std::borrow::Borrow<[u8]>, { - { - let mut write = self.write.write().unwrap(); + let mut memtable = self.mem.write().unwrap(); + let mut log = self.log.write().unwrap(); + let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64; - for pair in rows { - let tup = pair.borrow(); - let (key, data) = (tup.0.borrow(), tup.1.borrow()); - write.put(key, data)?; - } - write.commit += 1; + for pair in rows { + let (ref key, ref data) = pair.borrow(); + + storage::put( + &mut *memtable, + &mut *log, + key.borrow(), + commit, + data.borrow(), + )?; } - self.ensure_mem()?; + self.ensure_memtable(&mut *memtable, &mut *log)?; Ok(()) } @@ -128,22 +138,20 @@ impl KvStore { pub fn get(&self, key: &Key) -> Result>> { self.query_compactor()?; - let (write_state, tables) = (self.write.read().unwrap(), self.tables.read().unwrap()); + let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap()); - storage::get(&write_state.values, &*tables, key) + storage::get(&memtable.values, &*tables, key) } pub fn delete(&self, key: &Key) -> Result<()> { - self.query_compactor()?; + let mut memtable = self.mem.write().unwrap(); + let mut log = self.log.write().unwrap(); + let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64; - { - let mut write = self.write.write().unwrap(); + storage::delete(&mut *memtable, &mut *log, key, commit)?; - write.delete(key)?; - write.commit += 1; - } + self.ensure_memtable(&mut *memtable, &mut *log)?; - self.ensure_mem()?; Ok(()) } @@ -152,18 +160,16 @@ impl KvStore { Iter: Iterator, K: std::borrow::Borrow, { - self.query_compactor()?; + let mut memtable = self.mem.write().unwrap(); + let mut log = self.log.write().unwrap(); + let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64; - { - let mut write = self.write.write().unwrap(); - for k in rows { - let key = k.borrow(); - write.delete(key)?; - } - write.commit += 1; + for key in rows { + storage::delete(&mut *memtable, &mut *log, key.borrow(), commit)?; } - self.ensure_mem()?; + self.ensure_memtable(&mut *memtable, &mut *log)?; + Ok(()) } @@ -176,9 +182,12 @@ impl KvStore { } pub fn snapshot(&self) -> Snapshot { - let (state, tables) = (self.write.read().unwrap(), self.tables.read().unwrap()); + let (memtable, tables) = ( + self.mem.read().unwrap().values.clone(), + self.tables.read().unwrap().clone(), + ); - Snapshot::new(state.values.clone(), tables.clone()) + Snapshot::new(memtable, tables) } pub fn range( @@ -187,8 +196,9 @@ impl KvStore { ) -> Result)>> { self.query_compactor()?; - let (write_state, tables) = (self.write.read().unwrap(), self.tables.read().unwrap()); - storage::range(&write_state.values, &*tables, range) + let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap()); + + storage::range(&memtable.values, &*tables, range) } pub fn destroy

(path: P) -> Result<()> @@ -222,31 +232,22 @@ impl KvStore { Ok(()) } - fn ensure_mem(&self) -> Result<()> { - let trigger_compact = { - let mut write_rw = self.write.write().unwrap(); + fn ensure_memtable(&self, mem: &mut MemTable, log: &mut WriteLog) -> Result<()> { + if mem.mem_size < self.config.max_mem { + return Ok(()); + } - if write_rw.mem_size < self.config.max_mem { - return Ok(()); - } + let mut tables = self.tables.write().unwrap(); - let mut tables = self.tables.write().unwrap(); - storage::flush_table(&write_rw.values, &*self.mapper, &mut *tables)?; + storage::flush_table(&mem.values, &*self.mapper, &mut *tables)?; + mem.values.clear(); + mem.mem_size = 0; + log.reset().expect("Write-log rotation failed"); - write_rw.reset()?; - write_rw.commit += 1; + if is_lvl0_full(&tables, &self.config) { + let sender = self.sender.lock().unwrap(); - is_lvl0_full(&tables, &self.config) - }; - - dump_tables(&self.root, &*self.mapper).unwrap(); - if trigger_compact { - let tables_path = self.root.join(TABLES_FILE); - self.sender - .lock() - .unwrap() - .send(compactor::Req::Start(tables_path)) - .expect("compactor thread dead"); + sender.send(compactor::Req::Start(PathBuf::new()))?; } Ok(()) @@ -274,17 +275,16 @@ fn open(root: &Path, mapper: Arc, config: Config) -> Result fs::create_dir(&root)?; } - let write_log = WriteLog::open(&log_path, config.log_config)?; - let mem = if restore_log && !config.in_memory { - write_log.materialize()? + let commit = chrono::Utc::now().timestamp(); + let mut log = WriteLog::open(&log_path, config.log_config)?; + let values = if restore_log && !config.in_memory { + log.materialize()? } else { BTreeMap::new() }; - - let write = RwLock::new(WriteState::new(write_log, mem)); + let mem = MemTable::new(values); let tables = load_tables(&root, &*mapper)?; - let tables = RwLock::new(tables); let cfg = compactor::Config { max_pages: config.max_tables, @@ -292,16 +292,17 @@ fn open(root: &Path, mapper: Arc, config: Config) -> Result }; let (sender, receiver, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let (sender, receiver) = (Mutex::new(sender), Mutex::new(receiver)); Ok(KvStore { - write, - tables, config, - mapper, root, - sender, - receiver, + commit: AtomicUsize::new(commit as usize), + mem: RwLock::new(mem), + log: Arc::new(RwLock::new(log)), + tables: RwLock::new(tables), + mapper, + sender: Mutex::new(sender), + receiver: Mutex::new(receiver), compactor_handle, }) } diff --git a/kvstore/src/sstable.rs b/kvstore/src/sstable.rs index 55d8e50af3..3d9e4c700a 100644 --- a/kvstore/src/sstable.rs +++ b/kvstore/src/sstable.rs @@ -154,7 +154,7 @@ impl SSTable { (meta.start, meta.level) }; - while level as usize >= tables.len() { + while level as usize >= sorted.len() { sorted.push(BTreeMap::new()); } sorted[level as usize].insert(key, sst.clone()); diff --git a/kvstore/src/storage.rs b/kvstore/src/storage.rs index 43f0d9504b..f95a0e355a 100644 --- a/kvstore/src/storage.rs +++ b/kvstore/src/storage.rs @@ -2,92 +2,79 @@ use crate::error::Result; use crate::mapper::{Kind, Mapper}; use crate::sstable::{Key, Merged, SSTable, Value}; use crate::writelog::WriteLog; - -use chrono::Utc; - +use std::collections::btree_map::Entry; use std::collections::BTreeMap; -type MemTable = BTreeMap; - // Size of timestamp + size of key const OVERHEAD: usize = 8 + 3 * 8; const LOG_ERR: &str = "Write to log failed! Halting."; #[derive(Debug)] -pub struct WriteState { - pub commit: i64, - pub log: WriteLog, - pub values: MemTable, +pub struct MemTable { pub mem_size: usize, + pub values: BTreeMap, } -impl WriteState { - pub fn new(log: WriteLog, values: BTreeMap) -> WriteState { +impl MemTable { + pub fn new(values: BTreeMap) -> MemTable { let mem_size = values.values().fold(0, |acc, elem| acc + val_mem_use(elem)); - WriteState { - commit: Utc::now().timestamp(), - log, - mem_size, - values, + MemTable { mem_size, values } + } +} + +pub fn put( + mem: &mut MemTable, + log: &mut WriteLog, + key: &Key, + commit: i64, + data: &[u8], +) -> Result<()> { + log.log_put(key, commit, data).expect(LOG_ERR); + + let value = Value { + ts: commit, + val: Some(data.to_vec()), + }; + + mem.mem_size += val_mem_use(&value); + + match mem.values.entry(*key) { + Entry::Vacant(entry) => { + entry.insert(value); + } + Entry::Occupied(mut entry) => { + let old = entry.insert(value); + mem.mem_size -= val_mem_use(&old); } } - pub fn put(&mut self, key: &Key, data: &[u8]) -> Result<()> { - use std::collections::btree_map::Entry; - let ts = self.commit; - let value = Value { - ts, - val: Some(data.to_vec()), - }; - self.log.log_put(key, ts, data).expect(LOG_ERR); + Ok(()) +} - self.mem_size += val_mem_use(&value); +pub fn delete(mem: &mut MemTable, log: &mut WriteLog, key: &Key, commit: i64) -> Result<()> { + log.log_delete(key, commit).expect(LOG_ERR); + let value = Value { + ts: commit, + val: None, + }; - match self.values.entry(*key) { - Entry::Vacant(entry) => { - entry.insert(value); - } - Entry::Occupied(mut entry) => { - let old = entry.insert(value); - self.mem_size -= val_mem_use(&old); - } + mem.mem_size += val_mem_use(&value); + + match mem.values.entry(*key) { + Entry::Vacant(entry) => { + entry.insert(value); } - - Ok(()) - } - - pub fn delete(&mut self, key: &Key) -> Result<()> { - use std::collections::btree_map::Entry; - let ts = self.commit; - let value = Value { ts, val: None }; - - self.log.log_delete(key, ts).expect(LOG_ERR); - - self.mem_size += val_mem_use(&value); - - match self.values.entry(*key) { - Entry::Vacant(entry) => { - entry.insert(value); - } - Entry::Occupied(mut entry) => { - let old = entry.insert(value); - self.mem_size -= val_mem_use(&old); - } + Entry::Occupied(mut entry) => { + let old = entry.insert(value); + mem.mem_size -= val_mem_use(&old); } - - Ok(()) } - pub fn reset(&mut self) -> Result<()> { - self.values.clear(); - self.log.reset()?; - self.mem_size = 0; - Ok(()) - } + Ok(()) } pub fn flush_table( - mem: &MemTable, + mem: &BTreeMap, mapper: &dyn Mapper, pages: &mut Vec>, ) -> Result<()> { @@ -110,7 +97,11 @@ pub fn flush_table( Ok(()) } -pub fn get(mem: &MemTable, pages: &[BTreeMap], key: &Key) -> Result>> { +pub fn get( + mem: &BTreeMap, + pages: &[BTreeMap], + key: &Key, +) -> Result>> { if let Some(idx) = mem.get(key) { return Ok(idx.val.clone()); } @@ -134,7 +125,7 @@ pub fn get(mem: &MemTable, pages: &[BTreeMap], key: &Key) -> Resul } pub fn range( - mem: &MemTable, + mem: &BTreeMap, tables: &[BTreeMap], range: std::ops::RangeInclusive, ) -> Result)>> { @@ -144,21 +135,17 @@ pub fn range( .range(range.clone()) .map(|(k, v)| (*k, v.clone())) .collect::>(); - - let mut disk = Vec::new(); + sources.push(Box::new(mem.into_iter())); for level in tables.iter() { for sst in level.values() { let iter = sst.range(&range)?; let iter = Box::new(iter) as Box>; - disk.push(iter); + sources.push(iter); } } - sources.push(Box::new(mem.into_iter())); - sources.extend(disk); - let rows = Merged::new(sources).map(|(k, v)| (k, v.val.unwrap())); Ok(rows) diff --git a/kvstore/src/writelog.rs b/kvstore/src/writelog.rs index 0310312626..1c5987408c 100644 --- a/kvstore/src/writelog.rs +++ b/kvstore/src/writelog.rs @@ -8,7 +8,6 @@ use std::collections::BTreeMap; use std::fs::{self, File}; use std::io::{Read, Write}; use std::path::{Path, PathBuf}; -use std::sync::RwLock; // RocksDb's log uses this size. // May be worth making configurable and experimenting @@ -17,7 +16,7 @@ const BLOCK_SIZE: usize = 32 * 1024; #[derive(Debug)] pub struct WriteLog { log_path: PathBuf, - logger: RwLock, + logger: Logger, config: Config, in_memory: bool, } @@ -35,7 +34,7 @@ impl WriteLog { Ok(WriteLog { config, log_path: path.to_path_buf(), - logger: RwLock::new(Logger::disk(file)), + logger: Logger::disk(file), in_memory: false, }) } @@ -44,15 +43,13 @@ impl WriteLog { pub fn memory(config: Config) -> WriteLog { WriteLog { config, - logger: RwLock::new(Logger::memory()), + logger: Logger::memory(), log_path: Path::new("").to_path_buf(), in_memory: true, } } - pub fn reset(&self) -> Result<()> { - let mut logger = self.logger.write().unwrap(); - + pub fn reset(&mut self) -> Result<()> { let new_logger = if self.in_memory { Logger::memory() } else { @@ -60,44 +57,38 @@ impl WriteLog { Logger::disk(file) }; - *logger = new_logger; + self.logger = new_logger; Ok(()) } - pub fn log_put(&self, key: &Key, ts: i64, val: &[u8]) -> Result<()> { - let mut logger = self.logger.write().unwrap(); - - log(&mut logger, key, ts, Some(val))?; + pub fn log_put(&mut self, key: &Key, ts: i64, val: &[u8]) -> Result<()> { + log(&mut self.logger, key, ts, Some(val))?; if self.config.sync_every_write { - sync(&mut logger, self.config.use_fsync)?; + sync(&mut self.logger, self.config.use_fsync)?; } Ok(()) } - pub fn log_delete(&self, key: &Key, ts: i64) -> Result<()> { - let mut logger = self.logger.write().unwrap(); - - log(&mut logger, key, ts, None)?; + pub fn log_delete(&mut self, key: &Key, ts: i64) -> Result<()> { + log(&mut self.logger, key, ts, None)?; if self.config.sync_every_write { - sync(&mut logger, self.config.use_fsync)?; + sync(&mut self.logger, self.config.use_fsync)?; } Ok(()) } #[allow(dead_code)] - pub fn sync(&self) -> Result<()> { - let mut logger = self.logger.write().unwrap(); - - sync(&mut logger, self.config.use_fsync) + pub fn sync(&mut self) -> Result<()> { + sync(&mut self.logger, self.config.use_fsync) } - pub fn materialize(&self) -> Result> { - let mmap = self.logger.write().unwrap().writer.mmap()?; + pub fn materialize(&mut self) -> Result> { + let mmap = self.logger.writer.mmap()?; read_log(&mmap) } } @@ -281,7 +272,7 @@ mod test { #[test] fn test_log_round_trip() { - let wal = WriteLog::memory(Config::default()); + let mut wal = WriteLog::memory(Config::default()); let values: BTreeMap = (0u64..100) .map(|n| { @@ -313,7 +304,7 @@ mod test { fn test_reset() { use crate::error::Error; - let wal = WriteLog::memory(Config::default()); + let mut wal = WriteLog::memory(Config::default()); let values: BTreeMap = (0u64..100) .map(|n| {