get buffered IO back for ledger

This commit is contained in:
Rob Walker 2018-08-07 15:12:20 -07:00
parent 64d6d3015a
commit 2a0025bb57
1 changed file with 87 additions and 68 deletions

View File

@ -5,16 +5,17 @@
use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size}; use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size};
use entry::Entry; use entry::Entry;
use hash::Hash; use hash::Hash;
//use log::Level::Trace; use log::Level::Trace;
use packet::{self, SharedBlob, BLOB_DATA_SIZE}; use packet::{self, SharedBlob, BLOB_DATA_SIZE};
use rayon::prelude::*; use rayon::prelude::*;
use result::{Error, Result}; use result::{Error, Result};
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
use std::io::prelude::*; use std::io::prelude::*;
use std::io::{self, Cursor, Seek, SeekFrom}; use std::io::{self, BufReader, BufWriter, Cursor, Seek, SeekFrom};
use std::mem::size_of; use std::mem::size_of;
use std::path::Path; use std::path::Path;
use streamer::WINDOW_SIZE;
use transaction::Transaction; use transaction::Transaction;
// //
@ -59,8 +60,8 @@ use transaction::Transaction;
// ledger window // ledger window
#[derive(Debug)] #[derive(Debug)]
pub struct LedgerWindow { pub struct LedgerWindow {
index: File, index: BufReader<File>,
data: File, data: BufReader<File>,
} }
// use a CONST because there's a cast, and we don't want "sizeof::<u64> as u64"... // use a CONST because there's a cast, and we don't want "sizeof::<u64> as u64"...
@ -71,21 +72,21 @@ fn err_bincode_to_io(e: Box<bincode::ErrorKind>) -> io::Error {
io::Error::new(io::ErrorKind::Other, e.to_string()) io::Error::new(io::ErrorKind::Other, e.to_string())
} }
fn entry_at(file: &mut File, at: u64) -> io::Result<Entry> { fn entry_at<A: Read + Seek>(file: &mut A, at: u64) -> io::Result<Entry> {
file.seek(SeekFrom::Start(at))?; file.seek(SeekFrom::Start(at))?;
let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?; let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?;
//trace!("entry_at({}) len: {}", at, len); trace!("entry_at({}) len: {}", at, len);
deserialize_from(file.take(len)).map_err(err_bincode_to_io) deserialize_from(file.take(len)).map_err(err_bincode_to_io)
} }
fn next_entry(file: &mut File) -> io::Result<Entry> { fn next_entry<A: Read>(file: &mut A) -> io::Result<Entry> {
let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?; let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?;
deserialize_from(file.take(len)).map_err(err_bincode_to_io) deserialize_from(file.take(len)).map_err(err_bincode_to_io)
} }
fn u64_at(file: &mut File, at: u64) -> io::Result<u64> { fn u64_at<A: Read + Seek>(file: &mut A, at: u64) -> io::Result<u64> {
file.seek(SeekFrom::Start(at))?; file.seek(SeekFrom::Start(at))?;
deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io) deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)
} }
@ -98,7 +99,9 @@ impl LedgerWindow {
recover_ledger(ledger_path)?; recover_ledger(ledger_path)?;
let index = File::open(ledger_path.join("index"))?; let index = File::open(ledger_path.join("index"))?;
let index = BufReader::with_capacity((WINDOW_SIZE * SIZEOF_U64) as usize, index);
let data = File::open(ledger_path.join("data"))?; let data = File::open(ledger_path.join("data"))?;
let data = BufReader::with_capacity(WINDOW_SIZE as usize * BLOB_DATA_SIZE, data);
Ok(LedgerWindow { index, data }) Ok(LedgerWindow { index, data })
} }
@ -174,7 +177,7 @@ fn recover_ledger(ledger_path: &Path) -> io::Result<()> {
let len = index.metadata()?.len(); let len = index.metadata()?.len();
if len % SIZEOF_U64 != 0 { if len % SIZEOF_U64 != 0 {
//trace!("recover: trimming index len to {}", len - len % SIZEOF_U64); trace!("recover: trimming index len to {}", len - len % SIZEOF_U64);
index.set_len(len - (len % SIZEOF_U64))?; index.set_len(len - (len % SIZEOF_U64))?;
} }
@ -182,50 +185,50 @@ fn recover_ledger(ledger_path: &Path) -> io::Result<()> {
// to a valid entry deserialization offset... // to a valid entry deserialization offset...
loop { loop {
let len = index.metadata()?.len(); let len = index.metadata()?.len();
//trace!("recover: index len:{}", len); trace!("recover: index len:{}", len);
// should never happen // should never happen
if len < SIZEOF_U64 { if len < SIZEOF_U64 {
//trace!("recover: error index len {} too small", len); trace!("recover: error index len {} too small", len);
Err(io::Error::new(io::ErrorKind::Other, "empty ledger index"))?; Err(io::Error::new(io::ErrorKind::Other, "empty ledger index"))?;
} }
let offset = u64_at(&mut index, len - SIZEOF_U64)?; let offset = u64_at(&mut index, len - SIZEOF_U64)?;
//trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset); trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset);
match entry_at(&mut data, offset) { match entry_at(&mut data, offset) {
Ok(entry) => { Ok(entry) => {
//trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry); trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry);
let entry_len = serialized_size(&entry).map_err(err_bincode_to_io)?; let entry_len = serialized_size(&entry).map_err(err_bincode_to_io)?;
//trace!("recover: entry_len: {}", entry_len); trace!("recover: entry_len: {}", entry_len);
// now trim data file to size... // now trim data file to size...
data.set_len(offset + SIZEOF_U64 + entry_len)?; data.set_len(offset + SIZEOF_U64 + entry_len)?;
//trace!( trace!(
// "recover: trimmed data file to {}", "recover: trimmed data file to {}",
// offset + SIZEOF_U64 + entry_len offset + SIZEOF_U64 + entry_len
//); );
break; // all good break; // all good
} }
Err(_err) => { Err(_err) => {
//trace!( trace!(
// "recover: no entry recovered at {} {}", "recover: no entry recovered at {} {}",
// offset, offset,
// _err.to_string() _err.to_string()
//); );
index.set_len(len - SIZEOF_U64)?; index.set_len(len - SIZEOF_U64)?;
} }
} }
} }
//if log_enabled!(Trace) { if log_enabled!(Trace) {
// let num_entries = index.metadata()?.len() / SIZEOF_U64; let num_entries = index.metadata()?.len() / SIZEOF_U64;
// trace!("recover: done. {} entries", num_entries); trace!("recover: done. {} entries", num_entries);
//} }
// flush everything to disk... // flush everything to disk...
index.sync_all()?; index.sync_all()?;
@ -248,8 +251,8 @@ fn recover_ledger(ledger_path: &Path) -> io::Result<()> {
#[derive(Debug)] #[derive(Debug)]
pub struct LedgerWriter { pub struct LedgerWriter {
index: File, index: BufWriter<File>,
data: File, data: BufWriter<File>,
} }
impl LedgerWriter { impl LedgerWriter {
@ -268,20 +271,22 @@ impl LedgerWriter {
.append(true) .append(true)
.open(ledger_path.join("index"))?; .open(ledger_path.join("index"))?;
//if log_enabled!(Trace) { if log_enabled!(Trace) {
// let len = index.metadata()?.len(); let len = index.metadata()?.len();
// trace!("LedgerWriter::new: index fp:{}", len); trace!("LedgerWriter::new: index fp:{}", len);
//} }
let index = BufWriter::new(index);
let data = OpenOptions::new() let data = OpenOptions::new()
.create(create) .create(create)
.append(true) .append(true)
.open(ledger_path.join("data"))?; .open(ledger_path.join("data"))?;
//if log_enabled!(Trace) { if log_enabled!(Trace) {
// let len = data.metadata()?.len(); let len = data.metadata()?.len();
// trace!("LedgerWriter::new: data fp:{}", len); trace!("LedgerWriter::new: data fp:{}", len);
//} }
let data = BufWriter::new(data);
Ok(LedgerWriter { index, data }) Ok(LedgerWriter { index, data })
} }
@ -290,30 +295,27 @@ impl LedgerWriter {
let len = serialized_size(&entry).map_err(err_bincode_to_io)?; let len = serialized_size(&entry).map_err(err_bincode_to_io)?;
serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?;
//if log_enabled!(Trace) { if log_enabled!(Trace) {
// let offset = self.data.seek(SeekFrom::Current(0))?; let offset = self.data.seek(SeekFrom::Current(0))?;
// trace!("write_entry: after len data fp:{}", offset); trace!("write_entry: after len data fp:{}", offset);
//} }
serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?;
//if log_enabled!(Trace) { if log_enabled!(Trace) {
// let offset = self.data.seek(SeekFrom::Current(0))?; let offset = self.data.seek(SeekFrom::Current(0))?;
// trace!("write_entry: after entry data fp:{}", offset); trace!("write_entry: after entry data fp:{}", offset);
//} }
//self.data.sync_data()?;
let offset = self.data.seek(SeekFrom::Current(0))? - len - SIZEOF_U64; let offset = self.data.seek(SeekFrom::Current(0))? - len - SIZEOF_U64;
//trace!("write_entry: offset:{} len:{}", offset, len); trace!("write_entry: offset:{} len:{}", offset, len);
serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io) serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io)?;
//if log_enabled!(Trace) { if log_enabled!(Trace) {
// let offset = self.index.seek(SeekFrom::Current(0))?; let offset = self.index.seek(SeekFrom::Current(0))?;
// trace!("write_entry: end index fp:{}", offset); trace!("write_entry: end index fp:{}", offset);
//} }
Ok(())
//self.index.sync_data()
} }
pub fn write_entries<I>(&mut self, entries: I) -> io::Result<()> pub fn write_entries<I>(&mut self, entries: I) -> io::Result<()>
@ -329,7 +331,7 @@ impl LedgerWriter {
#[derive(Debug)] #[derive(Debug)]
pub struct LedgerReader { pub struct LedgerReader {
data: File, data: BufReader<File>,
} }
impl Iterator for LedgerReader { impl Iterator for LedgerReader {
@ -350,6 +352,7 @@ pub fn read_ledger(ledger_path: &str) -> io::Result<impl Iterator<Item = io::Res
recover_ledger(ledger_path)?; recover_ledger(ledger_path)?;
let data = File::open(ledger_path.join("data"))?; let data = File::open(ledger_path.join("data"))?;
let data = BufReader::new(data);
Ok(LedgerReader { data }) Ok(LedgerReader { data })
} }
@ -672,8 +675,11 @@ mod tests {
let ledger_path = tmp_ledger_path("test_ledger_reader_writer"); let ledger_path = tmp_ledger_path("test_ledger_reader_writer");
let entries = make_tiny_test_entries(10); let entries = make_tiny_test_entries(10);
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); {
writer.write_entries(entries.clone()).unwrap(); let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
writer.write_entries(entries.clone()).unwrap();
// drops writer, flushes buffers
}
let mut read_entries = vec![]; let mut read_entries = vec![];
for x in read_ledger(&ledger_path).unwrap() { for x in read_ledger(&ledger_path).unwrap() {
@ -700,10 +706,17 @@ mod tests {
} }
fn truncated_last_entry(ledger_path: &str, entries: Vec<Entry>) { fn truncated_last_entry(ledger_path: &str, entries: Vec<Entry>) {
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); let len = {
writer.write_entries(entries).unwrap(); let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
let len = writer.data.seek(SeekFrom::Current(0)).unwrap(); writer.write_entries(entries).unwrap();
writer.data.set_len(len - 4).unwrap(); writer.data.seek(SeekFrom::Current(0)).unwrap()
};
let data = OpenOptions::new()
.write(true)
.open(Path::new(&ledger_path).join("data"))
.unwrap();
data.set_len(len - 4).unwrap();
} }
fn garbage_on_data(ledger_path: &str, entries: Vec<Entry>) { fn garbage_on_data(ledger_path: &str, entries: Vec<Entry>) {
@ -749,8 +762,10 @@ mod tests {
// restore last entry, tests recover_ledger() inside LedgerWriter::new() // restore last entry, tests recover_ledger() inside LedgerWriter::new()
truncated_last_entry(&ledger_path, entries.clone()); truncated_last_entry(&ledger_path, entries.clone());
let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); {
writer.write_entry(&entries[entries.len() - 1]).unwrap(); let mut writer = LedgerWriter::new(&ledger_path, false).unwrap();
writer.write_entry(&entries[entries.len() - 1]).unwrap();
}
read_ledger_check(&ledger_path, entries.clone(), entries.len()); read_ledger_check(&ledger_path, entries.clone(), entries.len());
ledger_window_check(&ledger_path, entries.clone(), entries.len()); ledger_window_check(&ledger_path, entries.clone(), entries.len());
@ -764,8 +779,10 @@ mod tests {
// make it look like data is newer in time, check writer... // make it look like data is newer in time, check writer...
garbage_on_data(&ledger_path, entries[..entries.len() - 1].to_vec()); garbage_on_data(&ledger_path, entries[..entries.len() - 1].to_vec());
let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); {
writer.write_entry(&entries[entries.len() - 1]).unwrap(); let mut writer = LedgerWriter::new(&ledger_path, false).unwrap();
writer.write_entry(&entries[entries.len() - 1]).unwrap();
}
read_ledger_check(&ledger_path, entries.clone(), entries.len()); read_ledger_check(&ledger_path, entries.clone(), entries.len());
ledger_window_check(&ledger_path, entries.clone(), entries.len()); ledger_window_check(&ledger_path, entries.clone(), entries.len());
let _ignored = remove_dir_all(&ledger_path); let _ignored = remove_dir_all(&ledger_path);
@ -778,8 +795,10 @@ mod tests {
let entries = make_tiny_test_entries(10); let entries = make_tiny_test_entries(10);
let ledger_path = tmp_ledger_path("test_verify_ledger"); let ledger_path = tmp_ledger_path("test_verify_ledger");
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); {
writer.write_entries(entries.clone()).unwrap(); let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
writer.write_entries(entries.clone()).unwrap();
}
assert!(verify_ledger(&ledger_path, false).is_ok()); assert!(verify_ledger(&ledger_path, false).is_ok());
let _ignored = remove_dir_all(&ledger_path); let _ignored = remove_dir_all(&ledger_path);