Clean up locks in KvStore (#3358)

* Lift all shared mutable state into KvStore

commit is now an AtomicUsize

In-memory table and write-log are now struct members behind individual RwLocks
This commit is contained in:
Mark E. Sinclair 2019-03-18 19:04:31 -05:00 committed by GitHub
parent ef111dcbe1
commit 5d73ab299b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 148 additions and 169 deletions

View File

@ -1,12 +1,13 @@
use crate::mapper::{Disk, Mapper, Memory}; use crate::mapper::{Disk, Mapper, Memory};
use crate::sstable::SSTable; use crate::sstable::SSTable;
use crate::storage::WriteState; use crate::storage::MemTable;
use crate::writelog::WriteLog; use crate::writelog::WriteLog;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fs; use std::fs;
use std::io; use std::io;
use std::ops::RangeInclusive; use std::ops::RangeInclusive;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender}; use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use std::thread::JoinHandle; use std::thread::JoinHandle;
@ -35,6 +36,7 @@ const LOG_FILE: &str = "mem-log";
const DEFAULT_TABLE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_TABLE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_MEM_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_MEM_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_MAX_PAGES: usize = 10; const DEFAULT_MAX_PAGES: usize = 10;
const COMMIT_ORDERING: Ordering = Ordering::Relaxed;
#[derive(Debug, PartialEq, Copy, Clone)] #[derive(Debug, PartialEq, Copy, Clone)]
pub struct Config { pub struct Config {
@ -47,10 +49,12 @@ pub struct Config {
#[derive(Debug)] #[derive(Debug)]
pub struct KvStore { pub struct KvStore {
write: RwLock<WriteState>,
tables: RwLock<Vec<BTreeMap<Key, SSTable>>>,
config: Config, config: Config,
root: PathBuf, root: PathBuf,
commit: AtomicUsize,
mem: RwLock<MemTable>,
log: Arc<RwLock<WriteLog>>,
tables: RwLock<Vec<BTreeMap<Key, SSTable>>>,
mapper: Arc<dyn Mapper>, mapper: Arc<dyn Mapper>,
sender: Mutex<Sender<compactor::Req>>, sender: Mutex<Sender<compactor::Req>>,
receiver: Mutex<Receiver<compactor::Resp>>, receiver: Mutex<Receiver<compactor::Resp>>,
@ -92,12 +96,13 @@ impl KvStore {
} }
pub fn put(&self, key: &Key, data: &[u8]) -> Result<()> { pub fn put(&self, key: &Key, data: &[u8]) -> Result<()> {
self.ensure_mem()?; let mut memtable = self.mem.write().unwrap();
let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
let mut write = self.write.write().unwrap(); storage::put(&mut *memtable, &mut *log, key, commit as i64, data)?;
write.put(key, data)?; self.ensure_memtable(&mut *memtable, &mut *log)?;
write.commit += 1;
Ok(()) Ok(())
} }
@ -109,18 +114,23 @@ impl KvStore {
K: std::borrow::Borrow<Key>, K: std::borrow::Borrow<Key>,
V: std::borrow::Borrow<[u8]>, V: std::borrow::Borrow<[u8]>,
{ {
{ let mut memtable = self.mem.write().unwrap();
let mut write = self.write.write().unwrap(); let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
for pair in rows { for pair in rows {
let tup = pair.borrow(); let (ref key, ref data) = pair.borrow();
let (key, data) = (tup.0.borrow(), tup.1.borrow());
write.put(key, data)?; storage::put(
} &mut *memtable,
write.commit += 1; &mut *log,
key.borrow(),
commit,
data.borrow(),
)?;
} }
self.ensure_mem()?; self.ensure_memtable(&mut *memtable, &mut *log)?;
Ok(()) Ok(())
} }
@ -128,22 +138,20 @@ impl KvStore {
pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> { pub fn get(&self, key: &Key) -> Result<Option<Vec<u8>>> {
self.query_compactor()?; self.query_compactor()?;
let (write_state, tables) = (self.write.read().unwrap(), self.tables.read().unwrap()); let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap());
storage::get(&write_state.values, &*tables, key) storage::get(&memtable.values, &*tables, key)
} }
pub fn delete(&self, key: &Key) -> Result<()> { pub fn delete(&self, key: &Key) -> Result<()> {
self.query_compactor()?; let mut memtable = self.mem.write().unwrap();
let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
{ storage::delete(&mut *memtable, &mut *log, key, commit)?;
let mut write = self.write.write().unwrap();
write.delete(key)?; self.ensure_memtable(&mut *memtable, &mut *log)?;
write.commit += 1;
}
self.ensure_mem()?;
Ok(()) Ok(())
} }
@ -152,18 +160,16 @@ impl KvStore {
Iter: Iterator<Item = K>, Iter: Iterator<Item = K>,
K: std::borrow::Borrow<Key>, K: std::borrow::Borrow<Key>,
{ {
self.query_compactor()?; let mut memtable = self.mem.write().unwrap();
let mut log = self.log.write().unwrap();
let commit = self.commit.fetch_add(1, COMMIT_ORDERING) as i64;
{ for key in rows {
let mut write = self.write.write().unwrap(); storage::delete(&mut *memtable, &mut *log, key.borrow(), commit)?;
for k in rows {
let key = k.borrow();
write.delete(key)?;
}
write.commit += 1;
} }
self.ensure_mem()?; self.ensure_memtable(&mut *memtable, &mut *log)?;
Ok(()) Ok(())
} }
@ -176,9 +182,12 @@ impl KvStore {
} }
pub fn snapshot(&self) -> Snapshot { pub fn snapshot(&self) -> Snapshot {
let (state, tables) = (self.write.read().unwrap(), self.tables.read().unwrap()); let (memtable, tables) = (
self.mem.read().unwrap().values.clone(),
self.tables.read().unwrap().clone(),
);
Snapshot::new(state.values.clone(), tables.clone()) Snapshot::new(memtable, tables)
} }
pub fn range( pub fn range(
@ -187,8 +196,9 @@ impl KvStore {
) -> Result<impl Iterator<Item = (Key, Vec<u8>)>> { ) -> Result<impl Iterator<Item = (Key, Vec<u8>)>> {
self.query_compactor()?; self.query_compactor()?;
let (write_state, tables) = (self.write.read().unwrap(), self.tables.read().unwrap()); let (memtable, tables) = (self.mem.read().unwrap(), self.tables.read().unwrap());
storage::range(&write_state.values, &*tables, range)
storage::range(&memtable.values, &*tables, range)
} }
pub fn destroy<P>(path: P) -> Result<()> pub fn destroy<P>(path: P) -> Result<()>
@ -222,31 +232,22 @@ impl KvStore {
Ok(()) Ok(())
} }
fn ensure_mem(&self) -> Result<()> { fn ensure_memtable(&self, mem: &mut MemTable, log: &mut WriteLog) -> Result<()> {
let trigger_compact = { if mem.mem_size < self.config.max_mem {
let mut write_rw = self.write.write().unwrap(); return Ok(());
}
if write_rw.mem_size < self.config.max_mem { let mut tables = self.tables.write().unwrap();
return Ok(());
}
let mut tables = self.tables.write().unwrap(); storage::flush_table(&mem.values, &*self.mapper, &mut *tables)?;
storage::flush_table(&write_rw.values, &*self.mapper, &mut *tables)?; mem.values.clear();
mem.mem_size = 0;
log.reset().expect("Write-log rotation failed");
write_rw.reset()?; if is_lvl0_full(&tables, &self.config) {
write_rw.commit += 1; let sender = self.sender.lock().unwrap();
is_lvl0_full(&tables, &self.config) sender.send(compactor::Req::Start(PathBuf::new()))?;
};
dump_tables(&self.root, &*self.mapper).unwrap();
if trigger_compact {
let tables_path = self.root.join(TABLES_FILE);
self.sender
.lock()
.unwrap()
.send(compactor::Req::Start(tables_path))
.expect("compactor thread dead");
} }
Ok(()) Ok(())
@ -274,17 +275,16 @@ fn open(root: &Path, mapper: Arc<dyn Mapper>, config: Config) -> Result<KvStore>
fs::create_dir(&root)?; fs::create_dir(&root)?;
} }
let write_log = WriteLog::open(&log_path, config.log_config)?; let commit = chrono::Utc::now().timestamp();
let mem = if restore_log && !config.in_memory { let mut log = WriteLog::open(&log_path, config.log_config)?;
write_log.materialize()? let values = if restore_log && !config.in_memory {
log.materialize()?
} else { } else {
BTreeMap::new() BTreeMap::new()
}; };
let mem = MemTable::new(values);
let write = RwLock::new(WriteState::new(write_log, mem));
let tables = load_tables(&root, &*mapper)?; let tables = load_tables(&root, &*mapper)?;
let tables = RwLock::new(tables);
let cfg = compactor::Config { let cfg = compactor::Config {
max_pages: config.max_tables, max_pages: config.max_tables,
@ -292,16 +292,17 @@ fn open(root: &Path, mapper: Arc<dyn Mapper>, config: Config) -> Result<KvStore>
}; };
let (sender, receiver, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg) let (sender, receiver, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let (sender, receiver) = (Mutex::new(sender), Mutex::new(receiver));
Ok(KvStore { Ok(KvStore {
write,
tables,
config, config,
mapper,
root, root,
sender, commit: AtomicUsize::new(commit as usize),
receiver, mem: RwLock::new(mem),
log: Arc::new(RwLock::new(log)),
tables: RwLock::new(tables),
mapper,
sender: Mutex::new(sender),
receiver: Mutex::new(receiver),
compactor_handle, compactor_handle,
}) })
} }

View File

@ -154,7 +154,7 @@ impl SSTable {
(meta.start, meta.level) (meta.start, meta.level)
}; };
while level as usize >= tables.len() { while level as usize >= sorted.len() {
sorted.push(BTreeMap::new()); sorted.push(BTreeMap::new());
} }
sorted[level as usize].insert(key, sst.clone()); sorted[level as usize].insert(key, sst.clone());

View File

@ -2,92 +2,79 @@ use crate::error::Result;
use crate::mapper::{Kind, Mapper}; use crate::mapper::{Kind, Mapper};
use crate::sstable::{Key, Merged, SSTable, Value}; use crate::sstable::{Key, Merged, SSTable, Value};
use crate::writelog::WriteLog; use crate::writelog::WriteLog;
use std::collections::btree_map::Entry;
use chrono::Utc;
use std::collections::BTreeMap; use std::collections::BTreeMap;
type MemTable = BTreeMap<Key, Value>;
// Size of timestamp + size of key // Size of timestamp + size of key
const OVERHEAD: usize = 8 + 3 * 8; const OVERHEAD: usize = 8 + 3 * 8;
const LOG_ERR: &str = "Write to log failed! Halting."; const LOG_ERR: &str = "Write to log failed! Halting.";
#[derive(Debug)] #[derive(Debug)]
pub struct WriteState { pub struct MemTable {
pub commit: i64,
pub log: WriteLog,
pub values: MemTable,
pub mem_size: usize, pub mem_size: usize,
pub values: BTreeMap<Key, Value>,
} }
impl WriteState { impl MemTable {
pub fn new(log: WriteLog, values: BTreeMap<Key, Value>) -> WriteState { pub fn new(values: BTreeMap<Key, Value>) -> MemTable {
let mem_size = values.values().fold(0, |acc, elem| acc + val_mem_use(elem)); let mem_size = values.values().fold(0, |acc, elem| acc + val_mem_use(elem));
WriteState { MemTable { mem_size, values }
commit: Utc::now().timestamp(), }
log, }
mem_size,
values, pub fn put(
mem: &mut MemTable,
log: &mut WriteLog,
key: &Key,
commit: i64,
data: &[u8],
) -> Result<()> {
log.log_put(key, commit, data).expect(LOG_ERR);
let value = Value {
ts: commit,
val: Some(data.to_vec()),
};
mem.mem_size += val_mem_use(&value);
match mem.values.entry(*key) {
Entry::Vacant(entry) => {
entry.insert(value);
}
Entry::Occupied(mut entry) => {
let old = entry.insert(value);
mem.mem_size -= val_mem_use(&old);
} }
} }
pub fn put(&mut self, key: &Key, data: &[u8]) -> Result<()> { Ok(())
use std::collections::btree_map::Entry; }
let ts = self.commit;
let value = Value {
ts,
val: Some(data.to_vec()),
};
self.log.log_put(key, ts, data).expect(LOG_ERR);
self.mem_size += val_mem_use(&value); pub fn delete(mem: &mut MemTable, log: &mut WriteLog, key: &Key, commit: i64) -> Result<()> {
log.log_delete(key, commit).expect(LOG_ERR);
let value = Value {
ts: commit,
val: None,
};
match self.values.entry(*key) { mem.mem_size += val_mem_use(&value);
Entry::Vacant(entry) => {
entry.insert(value); match mem.values.entry(*key) {
} Entry::Vacant(entry) => {
Entry::Occupied(mut entry) => { entry.insert(value);
let old = entry.insert(value);
self.mem_size -= val_mem_use(&old);
}
} }
Entry::Occupied(mut entry) => {
Ok(()) let old = entry.insert(value);
} mem.mem_size -= val_mem_use(&old);
pub fn delete(&mut self, key: &Key) -> Result<()> {
use std::collections::btree_map::Entry;
let ts = self.commit;
let value = Value { ts, val: None };
self.log.log_delete(key, ts).expect(LOG_ERR);
self.mem_size += val_mem_use(&value);
match self.values.entry(*key) {
Entry::Vacant(entry) => {
entry.insert(value);
}
Entry::Occupied(mut entry) => {
let old = entry.insert(value);
self.mem_size -= val_mem_use(&old);
}
} }
Ok(())
} }
pub fn reset(&mut self) -> Result<()> { Ok(())
self.values.clear();
self.log.reset()?;
self.mem_size = 0;
Ok(())
}
} }
pub fn flush_table( pub fn flush_table(
mem: &MemTable, mem: &BTreeMap<Key, Value>,
mapper: &dyn Mapper, mapper: &dyn Mapper,
pages: &mut Vec<BTreeMap<Key, SSTable>>, pages: &mut Vec<BTreeMap<Key, SSTable>>,
) -> Result<()> { ) -> Result<()> {
@ -110,7 +97,11 @@ pub fn flush_table(
Ok(()) Ok(())
} }
pub fn get(mem: &MemTable, pages: &[BTreeMap<Key, SSTable>], key: &Key) -> Result<Option<Vec<u8>>> { pub fn get(
mem: &BTreeMap<Key, Value>,
pages: &[BTreeMap<Key, SSTable>],
key: &Key,
) -> Result<Option<Vec<u8>>> {
if let Some(idx) = mem.get(key) { if let Some(idx) = mem.get(key) {
return Ok(idx.val.clone()); return Ok(idx.val.clone());
} }
@ -134,7 +125,7 @@ pub fn get(mem: &MemTable, pages: &[BTreeMap<Key, SSTable>], key: &Key) -> Resul
} }
pub fn range( pub fn range(
mem: &MemTable, mem: &BTreeMap<Key, Value>,
tables: &[BTreeMap<Key, SSTable>], tables: &[BTreeMap<Key, SSTable>],
range: std::ops::RangeInclusive<Key>, range: std::ops::RangeInclusive<Key>,
) -> Result<impl Iterator<Item = (Key, Vec<u8>)>> { ) -> Result<impl Iterator<Item = (Key, Vec<u8>)>> {
@ -144,21 +135,17 @@ pub fn range(
.range(range.clone()) .range(range.clone())
.map(|(k, v)| (*k, v.clone())) .map(|(k, v)| (*k, v.clone()))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
sources.push(Box::new(mem.into_iter()));
let mut disk = Vec::new();
for level in tables.iter() { for level in tables.iter() {
for sst in level.values() { for sst in level.values() {
let iter = sst.range(&range)?; let iter = sst.range(&range)?;
let iter = Box::new(iter) as Box<dyn Iterator<Item = (Key, Value)>>; let iter = Box::new(iter) as Box<dyn Iterator<Item = (Key, Value)>>;
disk.push(iter); sources.push(iter);
} }
} }
sources.push(Box::new(mem.into_iter()));
sources.extend(disk);
let rows = Merged::new(sources).map(|(k, v)| (k, v.val.unwrap())); let rows = Merged::new(sources).map(|(k, v)| (k, v.val.unwrap()));
Ok(rows) Ok(rows)

View File

@ -8,7 +8,6 @@ use std::collections::BTreeMap;
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::RwLock;
// RocksDb's log uses this size. // RocksDb's log uses this size.
// May be worth making configurable and experimenting // May be worth making configurable and experimenting
@ -17,7 +16,7 @@ const BLOCK_SIZE: usize = 32 * 1024;
#[derive(Debug)] #[derive(Debug)]
pub struct WriteLog { pub struct WriteLog {
log_path: PathBuf, log_path: PathBuf,
logger: RwLock<Logger>, logger: Logger,
config: Config, config: Config,
in_memory: bool, in_memory: bool,
} }
@ -35,7 +34,7 @@ impl WriteLog {
Ok(WriteLog { Ok(WriteLog {
config, config,
log_path: path.to_path_buf(), log_path: path.to_path_buf(),
logger: RwLock::new(Logger::disk(file)), logger: Logger::disk(file),
in_memory: false, in_memory: false,
}) })
} }
@ -44,15 +43,13 @@ impl WriteLog {
pub fn memory(config: Config) -> WriteLog { pub fn memory(config: Config) -> WriteLog {
WriteLog { WriteLog {
config, config,
logger: RwLock::new(Logger::memory()), logger: Logger::memory(),
log_path: Path::new("").to_path_buf(), log_path: Path::new("").to_path_buf(),
in_memory: true, in_memory: true,
} }
} }
pub fn reset(&self) -> Result<()> { pub fn reset(&mut self) -> Result<()> {
let mut logger = self.logger.write().unwrap();
let new_logger = if self.in_memory { let new_logger = if self.in_memory {
Logger::memory() Logger::memory()
} else { } else {
@ -60,44 +57,38 @@ impl WriteLog {
Logger::disk(file) Logger::disk(file)
}; };
*logger = new_logger; self.logger = new_logger;
Ok(()) Ok(())
} }
pub fn log_put(&self, key: &Key, ts: i64, val: &[u8]) -> Result<()> { pub fn log_put(&mut self, key: &Key, ts: i64, val: &[u8]) -> Result<()> {
let mut logger = self.logger.write().unwrap(); log(&mut self.logger, key, ts, Some(val))?;
log(&mut logger, key, ts, Some(val))?;
if self.config.sync_every_write { if self.config.sync_every_write {
sync(&mut logger, self.config.use_fsync)?; sync(&mut self.logger, self.config.use_fsync)?;
} }
Ok(()) Ok(())
} }
pub fn log_delete(&self, key: &Key, ts: i64) -> Result<()> { pub fn log_delete(&mut self, key: &Key, ts: i64) -> Result<()> {
let mut logger = self.logger.write().unwrap(); log(&mut self.logger, key, ts, None)?;
log(&mut logger, key, ts, None)?;
if self.config.sync_every_write { if self.config.sync_every_write {
sync(&mut logger, self.config.use_fsync)?; sync(&mut self.logger, self.config.use_fsync)?;
} }
Ok(()) Ok(())
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn sync(&self) -> Result<()> { pub fn sync(&mut self) -> Result<()> {
let mut logger = self.logger.write().unwrap(); sync(&mut self.logger, self.config.use_fsync)
sync(&mut logger, self.config.use_fsync)
} }
pub fn materialize(&self) -> Result<BTreeMap<Key, Value>> { pub fn materialize(&mut self) -> Result<BTreeMap<Key, Value>> {
let mmap = self.logger.write().unwrap().writer.mmap()?; let mmap = self.logger.writer.mmap()?;
read_log(&mmap) read_log(&mmap)
} }
} }
@ -281,7 +272,7 @@ mod test {
#[test] #[test]
fn test_log_round_trip() { fn test_log_round_trip() {
let wal = WriteLog::memory(Config::default()); let mut wal = WriteLog::memory(Config::default());
let values: BTreeMap<Key, Value> = (0u64..100) let values: BTreeMap<Key, Value> = (0u64..100)
.map(|n| { .map(|n| {
@ -313,7 +304,7 @@ mod test {
fn test_reset() { fn test_reset() {
use crate::error::Error; use crate::error::Error;
let wal = WriteLog::memory(Config::default()); let mut wal = WriteLog::memory(Config::default());
let values: BTreeMap<Key, Value> = (0u64..100) let values: BTreeMap<Key, Value> = (0u64..100)
.map(|n| { .map(|n| {