Kvstore: use bincode serialization (#3385)
* use bincode for SSTable serialization; add tests
* Fix bug uncovered in merge algorithm by unit tests
* use bincode in write-ahead-log serialization
* Add helper `Fill` trait for zeroing buffers
commit 13c9d3d4e1 (parent 0dc364c17a)
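Note: the change below replaces hand-rolled byteorder framing with bincode for the SSTable
index/data records and the write-ahead-log entries. A minimal sketch of the round-trip
pattern the diff adopts (illustrative only, not part of the commit; assumes bincode 1.x and
serde with the derive feature, with a local stand-in for the crate's IndexEntry):

// Illustrative sketch, not part of the diff.
use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct IndexEntry {
    timestamp: i64,
    offset: u64,
    size: u64,
}

fn main() -> Result<(), bincode::Error> {
    let entry = IndexEntry { timestamp: 42, offset: 0, size: 128 };

    // serialize_into/deserialize_from accept any Write/Read, which is what lets
    // the writers in the diff take `&mut dyn Write` instead of a concrete Writer type.
    let mut buf = Vec::new();
    bincode::serialize_into(&mut buf, &entry)?;
    let decoded: IndexEntry = bincode::deserialize_from(&buf[..])?;
    assert_eq!(entry, decoded);
    Ok(())
}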
@@ -43,6 +43,11 @@ pub struct CRCReader<R: Read> {
    chunk_size: usize,
}

/// Helper trait to make zeroing buffers easier
pub trait Fill<T> {
    fn fill(&mut self, v: T);
}

impl SharedWriter {
    pub fn new(buf: Arc<RwLock<Vec<u8>>>) -> SharedWriter {
        SharedWriter { buf, pos: 0 }
@@ -146,6 +151,17 @@ impl<R: Read> CRCReader<R> {
    }
}

impl<T> Fill<T> for [T]
where
    T: Clone,
{
    fn fill(&mut self, v: T) {
        for i in self {
            *i = v.clone()
        }
    }
}

impl Deref for MemMap {
    type Target = [u8];
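Note: the Fill helper above is used later (in flush_index) to zero a fixed-size record
buffer before each bincode write, so every index record occupies the same number of bytes.
A minimal sketch of that usage (illustrative, not from the diff; std's slice::fill,
stabilized later in Rust 1.50, now covers the same need):

// Illustrative sketch, not part of the diff.
pub trait Fill<T> {
    fn fill(&mut self, v: T);
}

impl<T: Clone> Fill<T> for [T] {
    fn fill(&mut self, v: T) {
        for i in self {
            *i = v.clone();
        }
    }
}

fn main() {
    let mut record_buf = [0xAAu8; 16];
    // Zero out leftovers from the previous record before reusing the buffer.
    Fill::fill(&mut record_buf[..], 0u8);
    assert!(record_buf.iter().all(|&b| b == 0));
}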
@@ -1,31 +1,21 @@
use crate::error::Result;
use crate::io_utils::{MemMap, Writer};
use crate::io_utils::{Fill, MemMap};

use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use byteorder::{BigEndian, ByteOrder};

use std::borrow::Borrow;
use std::collections::{BTreeMap, HashMap};
use std::io::{prelude::*, Cursor, Seek, SeekFrom};
use std::collections::BTreeMap;
use std::fmt;
use std::io::prelude::*;
use std::mem;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::u64;

// ___________________________________________
// | start_key | end_key | level | data_size |
// -------------------------------------------
const IDX_META_SIZE: usize = KEY_LEN + KEY_LEN + 1 + 8;

const KEY_LEN: usize = 3 * 8;
// _________________
// | offset | size |
// -----------------
const PTR_SIZE: usize = 2 * 8;
// __________________________________________
// | key | timestamp | pointer OR tombstone |
// ------------------------------------------
const INDEX_ENTRY_SIZE: usize = KEY_LEN + 8 + PTR_SIZE;
// Represented by zero offset and size
const TOMBSTONE: [u8; PTR_SIZE] = [0u8; PTR_SIZE];
const INDEX_META_SIZE: usize = mem::size_of::<IndexMeta>();
const KEY_LEN: usize = mem::size_of::<Key>();
const INDEX_ENTRY_SIZE: usize = mem::size_of::<IndexEntry>();
const INDEX_RECORD_SIZE: usize = KEY_LEN + INDEX_ENTRY_SIZE;

#[derive(Clone, Debug)]
pub struct SSTable {
@@ -34,7 +24,7 @@ pub struct SSTable {
    meta: IndexMeta,
}

#[derive(Debug, PartialEq, Clone)]
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct IndexMeta {
    pub level: u8,
    pub data_size: u64,
@@ -42,26 +32,30 @@ pub struct IndexMeta {
    pub end: Key,
}

#[derive(Debug, Default, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Hash)]
#[derive(
    Debug, Default, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, Hash, Serialize, Deserialize,
)]
pub struct Key(pub [u8; 24]);

#[derive(Debug, PartialEq, PartialOrd, Eq, Ord, Copy, Clone)]
pub struct Index {
#[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize)]
pub struct IndexEntry {
    pub timestamp: i64,
    pub offset: u64,
    pub size: u64,
}

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct Value {
    pub ts: i64,
    pub val: Option<Vec<u8>>,
}

/// An iterator that produces logical view over a set of SSTables
/// An iterator that produces logical view over a set of SSTables.
/// It implements [direct k-way merge](https://en.wikipedia.org/wiki/K-way_merge_algorithm#Heap)
/// and reconciles out-of-date/deleted values in a lazy fashion. Inputs *MUST* be sorted
pub struct Merged<I> {
    sources: Vec<I>,
    heads: BTreeMap<(Key, usize), Value>,
    seen: HashMap<Key, i64>,
}

impl SSTable {
@@ -71,7 +65,7 @@ impl SSTable {

    #[allow(dead_code)]
    pub fn num_keys(&self) -> u64 {
        ((self.index.len() - IDX_META_SIZE) / INDEX_ENTRY_SIZE) as u64
        ((self.index.len() - INDEX_META_SIZE) / INDEX_ENTRY_SIZE) as u64
    }

    pub fn get(&self, key: &Key) -> Result<Option<Value>> {
@@ -92,8 +86,8 @@ impl SSTable {
        rows: &mut I,
        level: u8,
        max_table_size: u64,
        data_wtr: &mut Writer,
        index_wtr: &mut Writer,
        data_wtr: &mut dyn Write,
        index_wtr: &mut dyn Write,
    ) where
        I: Iterator<Item = (K, V)>,
        K: Borrow<Key>,
@@ -123,8 +117,12 @@ impl SSTable {
        index_wtr.flush().expect(INDEX_ERR);
    }

    pub fn create<I, K, V>(rows: &mut I, level: u8, data_wtr: &mut Writer, index_wtr: &mut Writer)
    where
    pub fn create<I, K, V>(
        rows: &mut I,
        level: u8,
        data_wtr: &mut dyn Write,
        index_wtr: &mut dyn Write,
    ) where
        I: Iterator<Item = (K, V)>,
        K: Borrow<Key>,
        V: Borrow<Value>,
@@ -133,7 +131,14 @@ impl SSTable {
    }

    pub fn from_parts(data: Arc<MemMap>, index: Arc<MemMap>) -> Result<Self> {
        sst_from_parts(data, index)
        let len = index.len() as usize;

        assert!(len > INDEX_META_SIZE);
        assert_eq!((len - INDEX_META_SIZE) % INDEX_RECORD_SIZE, 0);

        let meta = bincode::deserialize_from(&index[..INDEX_META_SIZE])?;

        Ok(SSTable { data, index, meta })
    }

    pub fn could_contain(&self, key: &Key) -> bool {
@@ -165,8 +170,8 @@ impl SSTable {
}

impl Key {
    pub const MIN: Key = Key([0u8; KEY_LEN as usize]);
    pub const MAX: Key = Key([255u8; KEY_LEN as usize]);
    pub const MIN: Key = Key([0u8; KEY_LEN]);
    pub const MAX: Key = Key([255u8; KEY_LEN]);
    pub const ALL_INCLUSIVE: RangeInclusive<Key> = RangeInclusive::new(Key::MIN, Key::MAX);

    pub fn write<W: Write>(&self, wtr: &mut W) -> Result<()> {
@@ -181,6 +186,15 @@ impl Key {
    }
}

impl Value {
    pub fn new(commit: i64, data: Option<Vec<u8>>) -> Value {
        Value {
            ts: commit,
            val: data,
        }
    }
}

struct Scan {
    bounds: RangeInclusive<Key>,
    data: Arc<MemMap>,
@@ -194,18 +208,19 @@ impl Scan {
            bounds,
            data,
            index,
            index_pos: IDX_META_SIZE as usize,
            index_pos: INDEX_META_SIZE as usize,
        }
    }

    fn step(&mut self) -> Result<Option<(Key, Value)>> {
        while self.index_pos < self.index.len() {
            let pos = self.index_pos as usize;
            let end = pos + INDEX_ENTRY_SIZE;
            let (key, ts, idx) = read_index_rec(&self.index[pos..end]);
            let end = pos + INDEX_RECORD_SIZE;

            let (key, entry): (Key, IndexEntry) = bincode::deserialize_from(&self.index[pos..end])?;
            self.index_pos = end;

            if key < *self.bounds.start() {
                self.index_pos = end;
                continue;
            }

@@ -214,22 +229,29 @@ impl Scan {
                return Ok(None);
            }

            let bytes_opt = idx.map(|ptr| get_val(&self.data, ptr).to_vec());
            let record_range = entry.offset as usize..(entry.offset + entry.size) as usize;
            let (data_key, value) = bincode::deserialize_from(&self.data[record_range])?;
            assert_eq!(data_key, key);

            let val = Value { ts, val: bytes_opt };

            self.index_pos = end;

            return Ok(Some((key, val)));
            return Ok(Some((data_key, value)));
        }

        Ok(None)
    }
}

impl fmt::Display for Key {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let k0 = BigEndian::read_u64(&self.0[..8]);
        let k1 = BigEndian::read_u64(&self.0[8..16]);
        let k2 = BigEndian::read_u64(&self.0[16..]);
        write!(f, "Key({}, {}, {})", k0, k1, k2)
    }
}

impl From<(u64, u64, u64)> for Key {
    fn from((k0, k1, k2): (u64, u64, u64)) -> Self {
        let mut buf = [0u8; KEY_LEN as usize];
        let mut buf = [0u8; KEY_LEN];

        BigEndian::write_u64(&mut buf[..8], k0);
        BigEndian::write_u64(&mut buf[8..16], k1);
@@ -239,46 +261,6 @@ impl From<(u64, u64, u64)> for Key {
    }
}

impl Index {
    fn write<W: Write>(&self, wtr: &mut W) -> Result<()> {
        wtr.write_u64::<BigEndian>(self.offset)?;
        wtr.write_u64::<BigEndian>(self.size)?;
        Ok(())
    }

    #[inline]
    fn read(bytes: &[u8]) -> Index {
        let offset = BigEndian::read_u64(&bytes[..8]);
        let size = BigEndian::read_u64(&bytes[8..16]);

        Index { offset, size }
    }
}

impl IndexMeta {
    fn write<W: Write>(&self, wtr: &mut W) -> Result<()> {
        self.start.write(wtr)?;
        self.end.write(wtr)?;
        wtr.write_u8(self.level)?;
        wtr.write_u64::<BigEndian>(self.data_size)?;
        Ok(())
    }

    fn read(data: &[u8]) -> Self {
        let start = Key::read(&data[..24]);
        let end = Key::read(&data[24..48]);
        let level = data[48];
        let data_size = BigEndian::read_u64(&data[49..57]);

        IndexMeta {
            start,
            end,
            level,
            data_size,
        }
    }
}

impl<I> Merged<I>
where
    I: Iterator<Item = (Key, Value)>,
@@ -292,11 +274,7 @@ where
            }
        }

        Merged {
            sources,
            heads,
            seen: HashMap::new(),
        }
        Merged { sources, heads }
    }
}

@@ -308,30 +286,38 @@ where

    fn next(&mut self) -> Option<Self::Item> {
        while !self.heads.is_empty() {
            // get new key
            let (key, source_idx) = *self.heads.keys().next().unwrap();
            let val = self.heads.remove(&(key, source_idx)).unwrap();
            let mut val = self.heads.remove(&(key, source_idx)).unwrap();

            // replace
            if let Some((k, v)) = self.sources[source_idx].next() {
                self.heads.insert((k, source_idx), v);
            }

            // merge logic
            // if deleted, remember
            let (deleted, stale) = match self.seen.get(&key) {
                Some(&seen_ts) if seen_ts < val.ts => {
                    // fresh val
                    self.seen.insert(key, val.ts);
                    (val.val.is_none(), false)
                }
                Some(_) => (val.val.is_none(), true),
                None => {
                    self.seen.insert(key, val.ts);
                    (val.val.is_none(), false)
                }
            };
            // check for other versions of this record
            while !self.heads.is_empty() {
                let (next_key, source_idx) = *self.heads.keys().next().unwrap();

                if !(stale || deleted) {
                // Found a different version of the record
                if key == next_key {
                    // pop this version, check if it's newer
                    let other_version = self.heads.remove(&(next_key, source_idx)).unwrap();
                    if other_version.ts > val.ts {
                        val = other_version;
                    }

                    // replace
                    if let Some((k, v)) = self.sources[source_idx].next() {
                        self.heads.insert((k, source_idx), v);
                    }
                } else {
                    break;
                }
            }

            // Don't produce deleted records
            if val.val.is_some() {
                return Some((key, val));
            }
        }
@@ -358,66 +344,64 @@ impl Iterator for Scan {
    }
}

fn sst_from_parts(data: Arc<MemMap>, index: Arc<MemMap>) -> Result<SSTable> {
    let len = index.len() as usize;

    assert!(len > IDX_META_SIZE);
    assert_eq!((len - IDX_META_SIZE) % INDEX_ENTRY_SIZE, 0);

    let mut rdr = Cursor::new(&**index);
    let mut idx_buf = [0; IDX_META_SIZE];
    rdr.read_exact(&mut idx_buf)?;

    let meta = IndexMeta::read(&idx_buf);

    Ok(SSTable { data, index, meta })
}

fn flush_index(
    index: &BTreeMap<Key, (i64, Option<Index>)>,
    index: &BTreeMap<Key, IndexEntry>,
    meta: &IndexMeta,
    wtr: &mut Writer,
    writer: &mut dyn Write,
) -> Result<()> {
    meta.write(wtr)?;
    let mut entry_buffer = [0u8; INDEX_RECORD_SIZE];
    let mut meta_buffer = [0u8; INDEX_META_SIZE];

    for (&key, &(ts, idx)) in index.iter() {
        write_index_rec(wtr, (key, ts, idx))?;
    bincode::serialize_into(&mut meta_buffer[..], meta)?;
    writer.write_all(&meta_buffer)?;

    for (key, entry) in index.iter() {
        let rec = (key, entry);
        entry_buffer.fill(0);

        bincode::serialize_into(&mut entry_buffer[..], &rec)?;
        writer.write_all(&entry_buffer)?;
    }

    Ok(())
}
#[allow(clippy::type_complexity)]

fn flush_mem_table_capped<I, K, V>(
    rows: &mut I,
    wtr: &mut Writer,
    mut wtr: &mut dyn Write,
    max_table_size: u64,
) -> Result<(u64, BTreeMap<Key, (i64, Option<Index>)>)>
) -> Result<(u64, BTreeMap<Key, IndexEntry>)>
where
    I: Iterator<Item = (K, V)>,
    K: Borrow<Key>,
    V: Borrow<Value>,
{
    let mut ssi = BTreeMap::new();
    let mut index = BTreeMap::new();
    let mut size = 0;
    let bincode_config = bincode::config();

    for (key, val) in rows {
        let (key, val) = (key.borrow(), val.borrow());
        let ts = val.ts;
        let record = (key.borrow(), val.borrow());

        let (index, item_size) = match val.val {
            Some(ref bytes) => (Some(write_val(wtr, bytes)?), bytes.len()),
            None => (None, 0),
        let serialized_size = bincode_config.serialized_size(&record)?;
        bincode::serialize_into(&mut wtr, &record)?;

        let entry = IndexEntry {
            timestamp: record.1.ts,
            offset: size,
            size: serialized_size,
        };

        size += item_size as u64;
        ssi.insert(*key, (ts, index));
        size += serialized_size;

        index.insert(*record.0, entry);

        if size >= max_table_size {
            break;
        }
    }

    Ok((size, ssi))
    Ok((size, index))
}

#[inline]
@@ -425,52 +409,165 @@ fn overlapping<T: Ord + Eq>(r1: &RangeInclusive<T>, r2: &RangeInclusive<T>) -> b
    r1.start() <= r2.end() && r2.start() <= r1.end()
}

#[inline]
fn write_val<W: Write + Seek>(wtr: &mut W, val: &[u8]) -> Result<Index> {
    let offset = wtr.seek(SeekFrom::Current(0))?;
    let size = val.len() as u64;
#[cfg(test)]
mod test {
    use super::*;
    use rand::{thread_rng, Rng};
    use std::sync::{Arc, RwLock};

    #[test]
    fn test_dump_data() {
        let mut data_buffer = vec![];
        let records: BTreeMap<_, _> = gen_records().take(512).collect();

        let (_, index) =
            flush_mem_table_capped(&mut records.iter(), &mut data_buffer, u64::MAX).unwrap();

        assert_eq!(index.len(), records.len());
        assert!(index.keys().eq(records.keys()));

        let mut retrieved = BTreeMap::new();

        for (key, entry) in index.iter() {
            let range = entry.offset as usize..(entry.offset + entry.size) as usize;
            let (data_key, value) = bincode::deserialize_from(&data_buffer[range]).unwrap();
            assert_eq!(&data_key, key);
            retrieved.insert(data_key, value);
        }

        assert_eq!(records, retrieved);
    }

    #[test]
    fn test_dump_indexes() {
        let mut data_buffer = vec![];
        let mut index_buffer = vec![];
        let records: BTreeMap<_, _> = gen_records().take(512).collect();

        let (data_size, index) =
            flush_mem_table_capped(&mut records.iter(), &mut data_buffer, u64::MAX).unwrap();

        let (&start, &end) = (
            index.keys().next().unwrap(),
            index.keys().next_back().unwrap(),
        );

        let meta = IndexMeta {
            start,
            end,
            data_size,
            level: 0,
        };

        flush_index(&index, &meta, &mut index_buffer).unwrap();

        let retrieved_meta = bincode::deserialize_from(&index_buffer[..INDEX_META_SIZE]).unwrap();
        assert_eq!(meta, retrieved_meta);

        // By iterating over the BTreeMap we also check the order of index entries as written
        for (i, (key, entry)) in index.iter().enumerate() {
            let start = i * INDEX_RECORD_SIZE + INDEX_META_SIZE;
            let end = start + INDEX_RECORD_SIZE;

            let (retrieved_key, retrieved_entry) =
                bincode::deserialize_from(&index_buffer[start..end]).unwrap();

            assert_eq!(key, &retrieved_key);
            assert_eq!(entry, &retrieved_entry);
        }
    }

    #[test]
    fn test_sstable_scan() {
        let mut data_buffer = vec![];
        let mut index_buffer = vec![];
        let records: BTreeMap<_, _> = gen_records().take(512).collect();

        SSTable::create(&mut records.iter(), 0, &mut data_buffer, &mut index_buffer);

        let data = MemMap::Mem(Arc::new(RwLock::new(data_buffer)));
        let index = MemMap::Mem(Arc::new(RwLock::new(index_buffer)));

        let sst = SSTable::from_parts(Arc::new(data), Arc::new(index)).unwrap();

        let output_iter = Scan::new(
            Key::ALL_INCLUSIVE,
            Arc::clone(&sst.data),
            Arc::clone(&sst.index),
        );

        assert!(output_iter.eq(records.into_iter()));
    }

    #[test]
    fn test_merge_2way() {
        let records: BTreeMap<_, _> = gen_records().take(512).collect();
        let updates: BTreeMap<_, _> = records
            .iter()
            .map(|(k, v)| (*k, Value::new(v.ts + 1, Some(vec![]))))
            .collect();
        let deletes: BTreeMap<_, _> = records
            .iter()
            .map(|(k, v)| (*k, Value::new(v.ts + 1, None)))
            .collect();

        let owned = |(k, v): (&Key, &Value)| (*k, v.clone());

        let sources = vec![records.iter().map(owned), updates.iter().map(owned)];
        let merged: Vec<_> = Merged::new(sources).collect();
        assert!(merged.into_iter().eq(updates.into_iter()));

        let sources = vec![records.into_iter(), deletes.into_iter()];
        let merged: Vec<_> = Merged::new(sources).collect();
        assert_eq!(merged.len(), 0);
    }

    #[test]
    fn test_merge_4way() {
        // delete last half, then update first half, then delete last half of first half
        let start: BTreeMap<_, _> = gen_records().take(512).collect();
        let deletes: BTreeMap<_, _> = start
            .iter()
            .skip(256)
            .map(|(k, v)| (*k, Value::new(v.ts + 1, None)))
            .collect();
        let updates: BTreeMap<_, _> = start
            .iter()
            .take(256)
            .map(|(k, v)| (*k, Value::new(v.ts + 2, Some(vec![]))))
            .collect();
        let more_deletes: BTreeMap<_, _> = updates
            .iter()
            .skip(128)
            .map(|(k, v)| (*k, Value::new(v.ts + 3, None)))
            .collect();

        let sources = vec![
            more_deletes.into_iter(),
            updates.clone().into_iter(),
            start.into_iter(),
            deletes.into_iter(),
        ];

        let merged: Vec<_> = Merged::new(sources).collect();
        let expected: Vec<_> = updates.into_iter().take(128).collect();

        assert_eq!(merged.len(), expected.len());
        assert_eq!(merged, expected);
    }

    fn gen_records() -> impl Iterator<Item = (Key, Value)> {
        let mut rng = thread_rng();
        let commit = rng.gen();

        std::iter::repeat_with(move || {
            let buf: [u8; KEY_LEN] = rng.gen();
            let data_size: u8 = buf[0];

            let val = Some(vec![0; data_size as usize]);

            (Key(buf), Value::new(commit, val))
        })
    }

    wtr.write_all(val)?;
    Ok(Index { offset, size })
}

#[inline]
fn get_val(mmap: &MemMap, idx: Index) -> &[u8] {
    let row = &mmap[idx.offset as usize..(idx.offset + idx.size) as usize];
    assert_eq!(row.len(), idx.size as usize);
    row
}

#[inline]
fn write_index_rec<W: Write>(wtr: &mut W, (key, ts, ptr): (Key, i64, Option<Index>)) -> Result<()> {
    key.write(wtr)?;

    wtr.write_i64::<BigEndian>(ts)?;

    match ptr {
        Some(idx) => idx.write(wtr)?,
        None => wtr.write_all(&TOMBSTONE)?,
    };

    Ok(())
}

#[inline]
fn read_index_rec(bytes: &[u8]) -> (Key, i64, Option<Index>) {
    assert_eq!(bytes.len(), INDEX_ENTRY_SIZE);
    const TS_END: usize = KEY_LEN + 8;

    let mut key_buf = [0; KEY_LEN as usize];
    key_buf.copy_from_slice(&bytes[..KEY_LEN as usize]);
    let key = Key(key_buf);
    let ts = BigEndian::read_i64(&bytes[KEY_LEN..TS_END]);

    let idx_slice = &bytes[TS_END..INDEX_ENTRY_SIZE];
    let idx = if idx_slice == TOMBSTONE {
        None
    } else {
        Some(Index::read(idx_slice))
    };

    (key, ts, idx)
}
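Note: because flush_index above pads every (key, IndexEntry) record to a fixed
INDEX_RECORD_SIZE, the byte range of the i-th index record can be computed directly;
Scan::step and test_dump_indexes rely on exactly this arithmetic. A minimal sketch
(illustrative, not part of the commit; the constants are placeholders for the values
the real code derives with mem::size_of):

// Illustrative sketch, not part of the diff.
const INDEX_META_SIZE: usize = 64; // placeholder for mem::size_of::<IndexMeta>()
const INDEX_RECORD_SIZE: usize = 48; // placeholder for KEY_LEN + INDEX_ENTRY_SIZE

/// Byte range of the i-th fixed-width (key, IndexEntry) record in the index buffer.
fn record_range(i: usize) -> std::ops::Range<usize> {
    let start = INDEX_META_SIZE + i * INDEX_RECORD_SIZE;
    start..start + INDEX_RECORD_SIZE
}

fn main() {
    // Records start right after the metadata block and follow back to back.
    assert_eq!(record_range(0), 64..112);
    assert_eq!(record_range(1), 112..160);
}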
@@ -2,11 +2,10 @@ use crate::error::Result;
use crate::io_utils::{CRCReader, CRCWriter};
use crate::sstable::Value;
use crate::Key;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use memmap::Mmap;
use std::collections::BTreeMap;
use std::fs::{self, File};
use std::io::{Read, Write};
use std::io::Write;
use std::path::{Path, PathBuf};

// RocksDb's log uses this size.
@@ -162,7 +161,8 @@ impl LogWriter for CRCWriter<File> {

fn log(logger: &mut Logger, key: &Key, commit: i64, data: Option<&[u8]>) -> Result<()> {
    let writer = &mut logger.writer;
    write_value(writer, key, commit, data)?;

    bincode::serialize_into(writer, &(key, commit, data))?;

    Ok(())
}
@@ -184,90 +184,33 @@ fn file_opts() -> fs::OpenOptions {

fn read_log(log_buf: &[u8]) -> Result<BTreeMap<Key, Value>> {
    let mut map = BTreeMap::new();
    if log_buf.len() <= 8 + 24 + 8 + 1 {
        return Ok(map);
    }

    let mut reader = CRCReader::new(log_buf, BLOCK_SIZE);

    while let Ok((key, val)) = read_value(&mut reader) {
        map.insert(key, val);
    while let Ok((key, commit, opt_bytes)) = bincode::deserialize_from(&mut reader) {
        map.insert(key, Value::new(commit, opt_bytes));
    }

    Ok(map)
}

#[inline]
fn write_value<W: Write>(
    writer: &mut W,
    key: &Key,
    commit: i64,
    data: Option<&[u8]>,
) -> Result<()> {
    let len = 24 + 8 + 1 + data.map(<[u8]>::len).unwrap_or(0);

    writer.write_u64::<BigEndian>(len as u64)?;
    writer.write_all(&key.0)?;
    writer.write_i64::<BigEndian>(commit)?;

    match data {
        Some(data) => {
            writer.write_u8(1)?;
            writer.write_all(data)?;
        }
        None => {
            writer.write_u8(0)?;
        }
    }

    Ok(())
}

#[inline]
fn read_value<R: Read>(reader: &mut R) -> Result<(Key, Value)> {
    let len = reader.read_u64::<BigEndian>()?;
    let data_len = len as usize - (24 + 8 + 1);

    let mut reader = reader.by_ref().take(len);

    let mut key_buf = [0; 24];
    reader.read_exact(&mut key_buf)?;
    let key = Key(key_buf);

    let commit = reader.read_i64::<BigEndian>()?;
    let exists = reader.read_u8()? != 0;

    let data = if exists {
        let mut buf = Vec::with_capacity(data_len);
        reader.read_to_end(&mut buf)?;
        Some(buf)
    } else {
        None
    };

    let val = Value {
        ts: commit,
        val: data,
    };
    Ok((key, val))
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_log_serialization() {
        let (key, commit, data) = (&Key::from((1, 2, 3)), 4, vec![0; 1024]);
        let (key, commit, data) = (Key::from((1, 2, 3)), 4, Some(vec![0; 1024]));

        let mut buf = vec![];

        write_value(&mut buf, key, commit, Some(&data)).unwrap();
        bincode::serialize_into(&mut buf, &(&key, commit, &data)).unwrap();
        buf.extend(std::iter::repeat(0).take(buf.len()));

        let (stored_key, stored_val) = read_value(&mut &buf[..]).unwrap();
        assert_eq!(&stored_key, key);
        assert_eq!(stored_val.val.as_ref().unwrap(), &data);
        assert_eq!(stored_val.ts, commit);
        let log_record: (Key, i64, Option<Vec<u8>>) = bincode::deserialize_from(&buf[..]).unwrap();
        assert_eq!(log_record.0, key);
        assert_eq!(log_record.1, commit);
        assert_eq!(log_record.2, data);
    }

    #[test]
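Note on the write-ahead-log change: log() now serializes each record as a bincode-encoded
(key, commit, data) tuple with data as Option<&[u8]>, and read_log() deserializes the same
bytes into an owned Option<Vec<u8>>; serde handles the borrowed-to-owned switch. A minimal
sketch of that round trip (illustrative, not part of the commit; assumes bincode 1.x, with
a plain [u8; 24] standing in for the crate's Key type):

// Illustrative sketch, not part of the diff.
fn main() -> Result<(), bincode::Error> {
    let key = [7u8; 24];
    let commit: i64 = 4;
    let data: Option<&[u8]> = Some(&[1, 2, 3]);

    // Write side: serialize a borrowed view of the record.
    let mut buf = Vec::new();
    bincode::serialize_into(&mut buf, &(&key, commit, data))?;

    // Read side: the same bytes deserialize into owned data.
    let (read_key, read_commit, read_data): ([u8; 24], i64, Option<Vec<u8>>) =
        bincode::deserialize_from(&buf[..])?;

    assert_eq!(read_key, key);
    assert_eq!(read_commit, commit);
    assert_eq!(read_data.as_deref(), data);
    Ok(())
}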