diff --git a/src/ledger.rs b/src/ledger.rs index af3a616873..8333f36575 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -5,16 +5,17 @@ use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size}; use entry::Entry; use hash::Hash; -//use log::Level::Trace; +use log::Level::Trace; use packet::{self, SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; use result::{Error, Result}; use std::collections::VecDeque; use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; use std::io::prelude::*; -use std::io::{self, Cursor, Seek, SeekFrom}; +use std::io::{self, BufReader, BufWriter, Cursor, Seek, SeekFrom}; use std::mem::size_of; use std::path::Path; +use streamer::WINDOW_SIZE; use transaction::Transaction; // @@ -59,8 +60,8 @@ use transaction::Transaction; // ledger window #[derive(Debug)] pub struct LedgerWindow { - index: File, - data: File, + index: BufReader, + data: BufReader, } // use a CONST because there's a cast, and we don't want "sizeof:: as u64"... @@ -71,21 +72,21 @@ fn err_bincode_to_io(e: Box) -> io::Error { io::Error::new(io::ErrorKind::Other, e.to_string()) } -fn entry_at(file: &mut File, at: u64) -> io::Result { +fn entry_at(file: &mut A, at: u64) -> io::Result { file.seek(SeekFrom::Start(at))?; let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?; - //trace!("entry_at({}) len: {}", at, len); + trace!("entry_at({}) len: {}", at, len); deserialize_from(file.take(len)).map_err(err_bincode_to_io) } -fn next_entry(file: &mut File) -> io::Result { +fn next_entry(file: &mut A) -> io::Result { let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?; deserialize_from(file.take(len)).map_err(err_bincode_to_io) } -fn u64_at(file: &mut File, at: u64) -> io::Result { +fn u64_at(file: &mut A, at: u64) -> io::Result { file.seek(SeekFrom::Start(at))?; deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io) } @@ -98,7 +99,9 @@ impl LedgerWindow { recover_ledger(ledger_path)?; let index = File::open(ledger_path.join("index"))?; + let index = BufReader::with_capacity((WINDOW_SIZE * SIZEOF_U64) as usize, index); let data = File::open(ledger_path.join("data"))?; + let data = BufReader::with_capacity(WINDOW_SIZE as usize * BLOB_DATA_SIZE, data); Ok(LedgerWindow { index, data }) } @@ -174,7 +177,7 @@ fn recover_ledger(ledger_path: &Path) -> io::Result<()> { let len = index.metadata()?.len(); if len % SIZEOF_U64 != 0 { - //trace!("recover: trimming index len to {}", len - len % SIZEOF_U64); + trace!("recover: trimming index len to {}", len - len % SIZEOF_U64); index.set_len(len - (len % SIZEOF_U64))?; } @@ -182,50 +185,50 @@ fn recover_ledger(ledger_path: &Path) -> io::Result<()> { // to a valid entry deserialization offset... loop { let len = index.metadata()?.len(); - //trace!("recover: index len:{}", len); + trace!("recover: index len:{}", len); // should never happen if len < SIZEOF_U64 { - //trace!("recover: error index len {} too small", len); + trace!("recover: error index len {} too small", len); Err(io::Error::new(io::ErrorKind::Other, "empty ledger index"))?; } let offset = u64_at(&mut index, len - SIZEOF_U64)?; - //trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset); + trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset); match entry_at(&mut data, offset) { Ok(entry) => { - //trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry); + trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry); let entry_len = serialized_size(&entry).map_err(err_bincode_to_io)?; - //trace!("recover: entry_len: {}", entry_len); + trace!("recover: entry_len: {}", entry_len); // now trim data file to size... data.set_len(offset + SIZEOF_U64 + entry_len)?; - //trace!( - // "recover: trimmed data file to {}", - // offset + SIZEOF_U64 + entry_len - //); + trace!( + "recover: trimmed data file to {}", + offset + SIZEOF_U64 + entry_len + ); break; // all good } Err(_err) => { - //trace!( - // "recover: no entry recovered at {} {}", - // offset, - // _err.to_string() - //); + trace!( + "recover: no entry recovered at {} {}", + offset, + _err.to_string() + ); index.set_len(len - SIZEOF_U64)?; } } } - //if log_enabled!(Trace) { - // let num_entries = index.metadata()?.len() / SIZEOF_U64; - // trace!("recover: done. {} entries", num_entries); - //} + if log_enabled!(Trace) { + let num_entries = index.metadata()?.len() / SIZEOF_U64; + trace!("recover: done. {} entries", num_entries); + } // flush everything to disk... index.sync_all()?; @@ -248,8 +251,8 @@ fn recover_ledger(ledger_path: &Path) -> io::Result<()> { #[derive(Debug)] pub struct LedgerWriter { - index: File, - data: File, + index: BufWriter, + data: BufWriter, } impl LedgerWriter { @@ -268,20 +271,22 @@ impl LedgerWriter { .append(true) .open(ledger_path.join("index"))?; - //if log_enabled!(Trace) { - // let len = index.metadata()?.len(); - // trace!("LedgerWriter::new: index fp:{}", len); - //} + if log_enabled!(Trace) { + let len = index.metadata()?.len(); + trace!("LedgerWriter::new: index fp:{}", len); + } + let index = BufWriter::new(index); let data = OpenOptions::new() .create(create) .append(true) .open(ledger_path.join("data"))?; - //if log_enabled!(Trace) { - // let len = data.metadata()?.len(); - // trace!("LedgerWriter::new: data fp:{}", len); - //} + if log_enabled!(Trace) { + let len = data.metadata()?.len(); + trace!("LedgerWriter::new: data fp:{}", len); + } + let data = BufWriter::new(data); Ok(LedgerWriter { index, data }) } @@ -290,30 +295,27 @@ impl LedgerWriter { let len = serialized_size(&entry).map_err(err_bincode_to_io)?; serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?; - //if log_enabled!(Trace) { - // let offset = self.data.seek(SeekFrom::Current(0))?; - // trace!("write_entry: after len data fp:{}", offset); - //} + if log_enabled!(Trace) { + let offset = self.data.seek(SeekFrom::Current(0))?; + trace!("write_entry: after len data fp:{}", offset); + } serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?; - //if log_enabled!(Trace) { - // let offset = self.data.seek(SeekFrom::Current(0))?; - // trace!("write_entry: after entry data fp:{}", offset); - //} - - //self.data.sync_data()?; + if log_enabled!(Trace) { + let offset = self.data.seek(SeekFrom::Current(0))?; + trace!("write_entry: after entry data fp:{}", offset); + } let offset = self.data.seek(SeekFrom::Current(0))? - len - SIZEOF_U64; - //trace!("write_entry: offset:{} len:{}", offset, len); + trace!("write_entry: offset:{} len:{}", offset, len); - serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io) + serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io)?; - //if log_enabled!(Trace) { - // let offset = self.index.seek(SeekFrom::Current(0))?; - // trace!("write_entry: end index fp:{}", offset); - //} - - //self.index.sync_data() + if log_enabled!(Trace) { + let offset = self.index.seek(SeekFrom::Current(0))?; + trace!("write_entry: end index fp:{}", offset); + } + Ok(()) } pub fn write_entries(&mut self, entries: I) -> io::Result<()> @@ -329,7 +331,7 @@ impl LedgerWriter { #[derive(Debug)] pub struct LedgerReader { - data: File, + data: BufReader, } impl Iterator for LedgerReader { @@ -350,6 +352,7 @@ pub fn read_ledger(ledger_path: &str) -> io::Result) { - let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); - writer.write_entries(entries).unwrap(); - let len = writer.data.seek(SeekFrom::Current(0)).unwrap(); - writer.data.set_len(len - 4).unwrap(); + let len = { + let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); + writer.write_entries(entries).unwrap(); + writer.data.seek(SeekFrom::Current(0)).unwrap() + }; + + let data = OpenOptions::new() + .write(true) + .open(Path::new(&ledger_path).join("data")) + .unwrap(); + data.set_len(len - 4).unwrap(); } fn garbage_on_data(ledger_path: &str, entries: Vec) { @@ -749,8 +762,10 @@ mod tests { // restore last entry, tests recover_ledger() inside LedgerWriter::new() truncated_last_entry(&ledger_path, entries.clone()); - let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); - writer.write_entry(&entries[entries.len() - 1]).unwrap(); + { + let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); + writer.write_entry(&entries[entries.len() - 1]).unwrap(); + } read_ledger_check(&ledger_path, entries.clone(), entries.len()); ledger_window_check(&ledger_path, entries.clone(), entries.len()); @@ -764,8 +779,10 @@ mod tests { // make it look like data is newer in time, check writer... garbage_on_data(&ledger_path, entries[..entries.len() - 1].to_vec()); - let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); - writer.write_entry(&entries[entries.len() - 1]).unwrap(); + { + let mut writer = LedgerWriter::new(&ledger_path, false).unwrap(); + writer.write_entry(&entries[entries.len() - 1]).unwrap(); + } read_ledger_check(&ledger_path, entries.clone(), entries.len()); ledger_window_check(&ledger_path, entries.clone(), entries.len()); let _ignored = remove_dir_all(&ledger_path); @@ -778,8 +795,10 @@ mod tests { let entries = make_tiny_test_entries(10); let ledger_path = tmp_ledger_path("test_verify_ledger"); - let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); - writer.write_entries(entries.clone()).unwrap(); + { + let mut writer = LedgerWriter::new(&ledger_path, true).unwrap(); + writer.write_entries(entries.clone()).unwrap(); + } assert!(verify_ledger(&ledger_path, false).is_ok()); let _ignored = remove_dir_all(&ledger_path);