make KvStore Send+Sync (#3317)

This commit is contained in:
Mark 2019-03-15 13:01:34 -05:00 committed by GitHub
parent 4ba4ad9878
commit a15927f8d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 21 deletions

View File

@ -1,5 +1,5 @@
use crate::entry::Entry;
use crate::kvstore::{self, Key};
use crate::kvstore::{self, Key, KvStore};
use crate::packet::Blob;
use crate::result::{Error, Result};
@ -12,7 +12,7 @@ use super::db::{
use super::{Blocktree, BlocktreeError};
#[derive(Debug)]
pub struct Kvs(());
pub struct Kvs(KvStore);
/// The metadata column family
#[derive(Debug)]

View File

@ -2,14 +2,13 @@ use crate::kvstore::mapper::{Disk, Mapper, Memory};
use crate::kvstore::sstable::SSTable;
use crate::kvstore::storage::WriteState;
use crate::kvstore::writelog::WriteLog;
use std::collections::BTreeMap;
use std::fs;
use std::io;
use std::ops::RangeInclusive;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::JoinHandle;
mod compactor;
@ -48,8 +47,8 @@ pub struct KvStore {
config: Config,
root: PathBuf,
mapper: Arc<dyn Mapper>,
req_tx: RwLock<Sender<compactor::Req>>,
resp_rx: RwLock<Receiver<compactor::Resp>>,
sender: Mutex<Sender<compactor::Req>>,
receiver: Mutex<Receiver<compactor::Resp>>,
compactor_handle: JoinHandle<()>,
}
@ -201,17 +200,17 @@ impl KvStore {
}
fn query_compactor(&self) -> Result<()> {
if let (Ok(mut req_tx), Ok(mut resp_rx), Ok(mut tables)) = (
self.req_tx.try_write(),
self.resp_rx.try_write(),
if let (Ok(mut sender), Ok(mut receiver), Ok(mut tables)) = (
self.sender.try_lock(),
self.receiver.try_lock(),
self.tables.try_write(),
) {
query_compactor(
&self.root,
&*self.mapper,
&mut *tables,
&mut *resp_rx,
&mut *req_tx,
&mut *receiver,
&mut *sender,
)?;
}
@ -238,8 +237,8 @@ impl KvStore {
dump_tables(&self.root, &*self.mapper).unwrap();
if trigger_compact {
let tables_path = self.root.join(TABLES_FILE);
self.req_tx
.write()
self.sender
.lock()
.unwrap()
.send(compactor::Req::Start(tables_path))
.expect("compactor thread dead");
@ -279,9 +278,9 @@ fn open(root: &Path, mapper: Arc<dyn Mapper>, config: Config) -> Result<KvStore>
max_pages: config.max_tables,
page_size: config.page_size,
};
let (req_tx, resp_rx, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg)
let (sender, receiver, compactor_handle) = compactor::spawn_compactor(Arc::clone(&mapper), cfg)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let (req_tx, resp_rx) = (RwLock::new(req_tx), RwLock::new(resp_rx));
let (sender, receiver) = (Mutex::new(sender), Mutex::new(receiver));
Ok(KvStore {
write,
@ -289,8 +288,8 @@ fn open(root: &Path, mapper: Arc<dyn Mapper>, config: Config) -> Result<KvStore>
config,
mapper,
root,
req_tx,
resp_rx,
sender,
receiver,
compactor_handle,
})
}
@ -316,14 +315,14 @@ fn query_compactor(
root: &Path,
mapper: &dyn Mapper,
tables: &mut Vec<BTreeMap<Key, SSTable>>,
resp_rx: &mut Receiver<compactor::Resp>,
req_tx: &mut Sender<compactor::Req>,
receiver: &mut Receiver<compactor::Resp>,
sender: &mut Sender<compactor::Req>,
) -> Result<()> {
match resp_rx.try_recv() {
match receiver.try_recv() {
Ok(compactor::Resp::Done(new_tables)) => {
std::mem::replace(tables, new_tables);
dump_tables(root, mapper)?;
req_tx.send(compactor::Req::Gc).unwrap();
sender.send(compactor::Req::Gc).unwrap();
}
Ok(compactor::Resp::Failed(e)) => {
return Err(e);