add some flushing to ledger

This commit is contained in:
Rob Walker 2018-08-09 18:15:05 -07:00
parent 846ad61941
commit bbf9ea89c5
2 changed files with 26 additions and 6 deletions

View File

@ -119,8 +119,7 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> {
recover_ledger(ledger_path)?;
}
let mut index = File::open(ledger_path.join("index"))?;
let mut data = File::open(ledger_path.join("data"))?;
let index = File::open(ledger_path.join("index"))?;
let index_len = index.metadata()?.len();
@ -130,6 +129,10 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> {
"expected back-to-back entries",
))?;
}
let mut index = BufReader::with_capacity((WINDOW_SIZE * SIZEOF_U64) as usize, index);
let data = File::open(ledger_path.join("data"))?;
let mut data = BufReader::with_capacity(WINDOW_SIZE as usize * BLOB_DATA_SIZE, data);
let mut last_data_offset = 0;
let mut index_offset = 0;
@ -142,7 +145,8 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> {
if last_data_offset + last_len != data_offset {
Err(io::Error::new(
io::ErrorKind::Other,
"expected back-to-back entries",
format!("expected back-to-back entries... entry[{}] has a gap of {} bytes to the previous entry",
index_offset/SIZEOF_U64, data_offset as i64 - (last_data_offset as i64 + last_len as i64))
))?;
}
@ -153,6 +157,7 @@ pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> {
data_read += last_len;
index_offset += SIZEOF_U64;
}
let data = data.into_inner();
if data_read != data.metadata()?.len() {
Err(io::Error::new(
io::ErrorKind::Other,
@ -291,7 +296,7 @@ impl LedgerWriter {
Ok(LedgerWriter { index, data })
}
pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> {
let len = serialized_size(&entry).map_err(err_bincode_to_io)?;
serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?;
@ -318,13 +323,22 @@ impl LedgerWriter {
Ok(())
}
pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
self.write_entry_noflush(&entry)?;
self.index.flush()?;
self.data.flush()?;
Ok(())
}
pub fn write_entries<I>(&mut self, entries: I) -> io::Result<()>
where
I: IntoIterator<Item = Entry>,
{
for entry in entries {
self.write_entry(&entry)?;
self.write_entry_noflush(&entry)?;
}
self.index.flush()?;
self.data.flush()?;
Ok(())
}
}
@ -672,6 +686,7 @@ mod tests {
writer.write_entries(entries.clone()).unwrap();
// drops writer, flushes buffers
}
verify_ledger(&ledger_path, false).unwrap();
let mut read_entries = vec![];
for x in read_ledger(&ledger_path).unwrap() {
@ -703,6 +718,7 @@ mod tests {
writer.write_entries(entries).unwrap();
writer.data.seek(SeekFrom::Current(0)).unwrap()
};
verify_ledger(&ledger_path, false).unwrap();
let data = OpenOptions::new()
.write(true)
@ -758,6 +774,8 @@ mod tests {
let mut writer = LedgerWriter::new(&ledger_path, false).unwrap();
writer.write_entry(&entries[entries.len() - 1]).unwrap();
}
verify_ledger(&ledger_path, false).unwrap();
read_ledger_check(&ledger_path, entries.clone(), entries.len());
ledger_window_check(&ledger_path, entries.clone(), entries.len());
@ -775,6 +793,7 @@ mod tests {
let mut writer = LedgerWriter::new(&ledger_path, false).unwrap();
writer.write_entry(&entries[entries.len() - 1]).unwrap();
}
verify_ledger(&ledger_path, false).unwrap();
read_ledger_check(&ledger_path, entries.clone(), entries.len());
ledger_window_check(&ledger_path, entries.clone(), entries.len());
let _ignored = remove_dir_all(&ledger_path);

View File

@ -43,8 +43,9 @@ impl WriteStage {
let votes = entries_to_votes(&entries);
crdt.write().unwrap().insert_votes(&votes);
ledger_writer.write_entries(entries.clone())?;
for entry in entries.clone() {
ledger_writer.write_entry(&entry)?;
if !entry.has_more {
bank.register_entry_id(&entry.id);
}