diff --git a/src/chacha_cuda.rs b/src/chacha_cuda.rs index a7cc03ac91..974521deb4 100644 --- a/src/chacha_cuda.rs +++ b/src/chacha_cuda.rs @@ -109,7 +109,7 @@ mod tests { use crate::chacha::chacha_cbc_encrypt_ledger; use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys; use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; - use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries, LedgerWriter}; + use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries}; use crate::replicator::sample_file; use solana_sdk::hash::Hash; use std::fs::{remove_dir_all, remove_file}; @@ -123,10 +123,6 @@ mod tests { let entries = make_tiny_test_entries(32); let ledger_dir = "test_encrypt_file_many_keys_single"; let ledger_path = get_tmp_ledger_path(ledger_dir); - { - let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(&entries).unwrap(); - } let db_ledger = Arc::new(DbLedger::open(&ledger_path).unwrap()); db_ledger .write_entries(DEFAULT_SLOT_HEIGHT, 0, &entries) @@ -161,10 +157,6 @@ mod tests { let entries = make_tiny_test_entries(32); let ledger_dir = "test_encrypt_file_many_keys_multiple"; let ledger_path = get_tmp_ledger_path(ledger_dir); - { - let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(&entries).unwrap(); - } let db_ledger = Arc::new(DbLedger::open(&ledger_path).unwrap()); db_ledger .write_entries(DEFAULT_SLOT_HEIGHT, 0, &entries) diff --git a/src/ledger.rs b/src/ledger.rs index 2d76c8b080..852f77a8f7 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -6,9 +6,8 @@ use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT}; use crate::entry::Entry; use crate::mint::Mint; use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE}; -use bincode::{self, deserialize_from, serialize_into, serialized_size}; +use bincode::{self, serialized_size}; use chrono::prelude::Utc; -use log::Level::Trace; use rayon::prelude::*; use solana_sdk::budget_transaction::BudgetTransaction; use solana_sdk::hash::{hash, Hash}; @@ -17,436 +16,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Transaction; use solana_sdk::vote_program::Vote; use solana_sdk::vote_transaction::VoteTransaction; -use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions}; -use std::io::prelude::*; -use std::io::{self, BufReader, BufWriter, Seek, SeekFrom}; -use std::mem::size_of; -use std::path::Path; - -// -// A persistent ledger is 2 files: -// ledger_path/ --+ -// +-- index <== an array of u64 offsets into data, -// | each offset points to the first bytes -// | of a u64 that contains the length of -// | the entry. To make the code smaller, -// | index[0] is set to 0, TODO: this field -// | could later be used for other stuff... -// +-- data <== concatenated instances of -// u64 length -// entry data -// -// When opening a ledger, we have the ability to "audit" it, which means we need -// to pick which file to use as "truth", and correct the other file as -// necessary, if possible. -// -// The protocol for writing the ledger is to append to the data file first, the -// index file 2nd. If the writing node is interupted while appending to the -// ledger, there are some possibilities we need to cover: -// -// 1. a partial write of data, which might be a partial write of length -// or a partial write entry data -// 2. a partial or missing write to index for that entry -// -// There is also the possibility of "unsynchronized" reading of the ledger -// during transfer across nodes via rsync (or whatever). In this case, if the -// transfer of the data file is done before the transfer of the index file, -// it's likely that the index file will be far ahead of the data file in time. -// -// The quickest and most reliable strategy for recovery is therefore to treat -// the data file as nearest to the "truth". -// -// The logic for "recovery/audit" is to open index and read backwards from the -// last u64-aligned entry to get to where index and data agree (i.e. where a -// successful deserialization of an entry can be performed), then truncate -// both files to this syncrhonization point. -// - -// ledger window -#[derive(Debug)] -struct LedgerWindow { - index: BufReader, - data: BufReader, -} - -const LEDGER_DATA_FILE: &str = "data"; -const LEDGER_INDEX_FILE: &str = "index"; - -// use a CONST because there's a cast, and we don't want "sizeof:: as u64"... -const SIZEOF_U64: u64 = size_of::() as u64; - -#[allow(clippy::needless_pass_by_value)] -fn err_bincode_to_io(e: Box) -> io::Error { - io::Error::new(io::ErrorKind::Other, e.to_string()) -} - -fn read_entry(file: &mut A, len: u64) -> io::Result { - deserialize_from(file.take(len)).map_err(err_bincode_to_io) -} - -fn entry_at(file: &mut A, at: u64) -> io::Result { - let len = u64_at(file, at)?; - - read_entry(file, len) -} - -fn next_entry(file: &mut A) -> io::Result { - let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?; - read_entry(file, len) -} - -fn u64_at(file: &mut A, at: u64) -> io::Result { - file.seek(SeekFrom::Start(at))?; - deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io) -} - -#[allow(dead_code)] -impl LedgerWindow { - // opens a Ledger in directory, provides "infinite" window - // - fn open(ledger_path: &str) -> io::Result { - let ledger_path = Path::new(&ledger_path); - - let index = File::open(ledger_path.join(LEDGER_INDEX_FILE))?; - let index = BufReader::new(index); - let data = File::open(ledger_path.join(LEDGER_DATA_FILE))?; - let data = BufReader::with_capacity(BLOB_DATA_SIZE, data); - - Ok(LedgerWindow { index, data }) - } - - fn get_entry(&mut self, index: u64) -> io::Result { - let offset = self.get_entry_offset(index)?; - entry_at(&mut self.data, offset) - } - - // Fill 'buf' with num_entries or most number of whole entries that fit into buf.len() - // - // Return tuple of (number of entries read, total size of entries read) - fn get_entries_bytes( - &mut self, - start_index: u64, - num_entries: u64, - buf: &mut [u8], - ) -> io::Result<(u64, u64)> { - let start_offset = self.get_entry_offset(start_index)?; - let mut total_entries = 0; - let mut end_offset = 0; - for i in 0..num_entries { - let offset = self.get_entry_offset(start_index + i)?; - let len = u64_at(&mut self.data, offset)?; - let cur_end_offset = offset + len + SIZEOF_U64; - if (cur_end_offset - start_offset) > buf.len() as u64 { - break; - } - end_offset = cur_end_offset; - total_entries += 1; - } - - if total_entries == 0 { - return Ok((0, 0)); - } - - let read_len = end_offset - start_offset; - self.data.seek(SeekFrom::Start(start_offset))?; - let fread_len = self.data.read(&mut buf[..read_len as usize])? as u64; - if fread_len != read_len { - return Err(io::Error::new( - io::ErrorKind::Other, - format!( - "entry read_len({}) doesn't match expected ({})", - fread_len, read_len - ), - )); - } - Ok((total_entries, read_len)) - } - - fn get_entry_offset(&mut self, index: u64) -> io::Result { - u64_at(&mut self.index, index * SIZEOF_U64) - } -} - -pub fn verify_ledger(ledger_path: &str) -> io::Result<()> { - let ledger_path = Path::new(&ledger_path); - - let index = File::open(ledger_path.join(LEDGER_INDEX_FILE))?; - - let index_len = index.metadata()?.len(); - - if index_len % SIZEOF_U64 != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("index is not a multiple of {} bytes long", SIZEOF_U64), - ))?; - } - let mut index = BufReader::new(index); - - let data = File::open(ledger_path.join(LEDGER_DATA_FILE))?; - let mut data = BufReader::with_capacity(BLOB_DATA_SIZE, data); - - let mut last_data_offset = 0; - let mut index_offset = 0; - let mut data_read = 0; - let mut last_len = 0; - let mut i = 0; - - while index_offset < index_len { - let data_offset = u64_at(&mut index, index_offset)?; - - if last_data_offset + last_len != data_offset { - Err(io::Error::new( - io::ErrorKind::Other, - format!( - "at entry[{}], a gap or an overlap last_offset {} offset {} last_len {}", - i, last_data_offset, data_offset, last_len - ), - ))?; - } - - match entry_at(&mut data, data_offset) { - Err(e) => Err(io::Error::new( - io::ErrorKind::Other, - format!( - "entry[{}] deserialize() failed at offset {}, err: {}", - index_offset / SIZEOF_U64, - data_offset, - e.to_string(), - ), - ))?, - Ok(entry) => { - last_len = serialized_size(&entry).map_err(err_bincode_to_io)? + SIZEOF_U64 - } - } - - last_data_offset = data_offset; - data_read += last_len; - index_offset += SIZEOF_U64; - i += 1; - } - let data = data.into_inner(); - if data_read != data.metadata()?.len() { - Err(io::Error::new( - io::ErrorKind::Other, - "garbage on end of data file", - ))?; - } - Ok(()) -} - -fn recover_ledger(ledger_path: &str) -> io::Result<()> { - let ledger_path = Path::new(ledger_path); - let mut index = OpenOptions::new() - .write(true) - .read(true) - .open(ledger_path.join(LEDGER_INDEX_FILE))?; - - let mut data = OpenOptions::new() - .write(true) - .read(true) - .open(ledger_path.join(LEDGER_DATA_FILE))?; - - // first, truncate to a multiple of SIZEOF_U64 - let len = index.metadata()?.len(); - - if len % SIZEOF_U64 != 0 { - trace!("recover: trimming index len to {}", len - len % SIZEOF_U64); - index.set_len(len - (len % SIZEOF_U64))?; - } - - // next, pull index offsets off one at a time until the last one points - // to a valid entry deserialization offset... - loop { - let len = index.metadata()?.len(); - trace!("recover: index len:{}", len); - - // should never happen - if len < SIZEOF_U64 { - trace!("recover: error index len {} too small", len); - - Err(io::Error::new(io::ErrorKind::Other, "empty ledger index"))?; - } - - let offset = u64_at(&mut index, len - SIZEOF_U64)?; - trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset); - - match entry_at(&mut data, offset) { - Ok(entry) => { - trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry); - - let entry_len = serialized_size(&entry).map_err(err_bincode_to_io)?; - - trace!("recover: entry_len: {}", entry_len); - - // now trim data file to size... - data.set_len(offset + SIZEOF_U64 + entry_len)?; - - trace!( - "recover: trimmed data file to {}", - offset + SIZEOF_U64 + entry_len - ); - - break; // all good - } - Err(_err) => { - trace!( - "recover: no entry recovered at {} {}", - offset, - _err.to_string() - ); - index.set_len(len - SIZEOF_U64)?; - } - } - } - if log_enabled!(Trace) { - let num_entries = index.metadata()?.len() / SIZEOF_U64; - trace!("recover: done. {} entries", num_entries); - } - - // flush everything to disk... - index.sync_all()?; - data.sync_all() -} - -// TODO?? ... we could open the files on demand to support [], but today -// LedgerWindow needs "&mut self" -// -//impl Index for LedgerWindow { -// type Output = io::Result; -// -// fn index(&mut self, index: u64) -> &io::Result { -// match u64_at(&mut self.index, index * SIZEOF_U64) { -// Ok(offset) => &entry_at(&mut self.data, offset), -// Err(e) => &Err(e), -// } -// } -//} - -#[derive(Debug)] -pub struct LedgerWriter { - index: BufWriter, - data: BufWriter, -} - -impl LedgerWriter { - // recover and open the ledger for writing - pub fn recover(ledger_path: &str) -> io::Result { - recover_ledger(ledger_path)?; - LedgerWriter::open(ledger_path, false) - } - - // opens or creates a LedgerWriter in ledger_path directory - pub fn open(ledger_path: &str, create: bool) -> io::Result { - let ledger_path = Path::new(&ledger_path); - - if create { - let _ignored = remove_dir_all(ledger_path); - create_dir_all(ledger_path)?; - } - - let index = OpenOptions::new() - .create(create) - .append(true) - .open(ledger_path.join(LEDGER_INDEX_FILE))?; - - if log_enabled!(Trace) { - let len = index.metadata()?.len(); - trace!("LedgerWriter::new: index fp:{}", len); - } - let index = BufWriter::new(index); - - let data = OpenOptions::new() - .create(create) - .append(true) - .open(ledger_path.join(LEDGER_DATA_FILE))?; - - if log_enabled!(Trace) { - let len = data.metadata()?.len(); - trace!("LedgerWriter::new: data fp:{}", len); - } - let data = BufWriter::new(data); - - Ok(LedgerWriter { index, data }) - } - - fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> { - let len = serialized_size(&entry).map_err(err_bincode_to_io)?; - - serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?; - if log_enabled!(Trace) { - let offset = self.data.seek(SeekFrom::Current(0))?; - trace!("write_entry: after len data fp:{}", offset); - } - - serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?; - if log_enabled!(Trace) { - let offset = self.data.seek(SeekFrom::Current(0))?; - trace!("write_entry: after entry data fp:{}", offset); - } - - let offset = self.data.seek(SeekFrom::Current(0))? - len - SIZEOF_U64; - trace!("write_entry: offset:{} len:{}", offset, len); - - serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io)?; - - if log_enabled!(Trace) { - let offset = self.index.seek(SeekFrom::Current(0))?; - trace!("write_entry: end index fp:{}", offset); - } - Ok(()) - } - - pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { - self.write_entry_noflush(&entry)?; - self.index.flush()?; - self.data.flush()?; - Ok(()) - } - - pub fn write_entries<'a, I>(&mut self, entries: I) -> io::Result<()> - where - I: IntoIterator, - { - for entry in entries { - self.write_entry_noflush(entry)?; - } - self.index.flush()?; - self.data.flush()?; - Ok(()) - } -} - -#[derive(Debug)] -pub struct LedgerReader { - data: BufReader, -} - -impl Iterator for LedgerReader { - type Item = io::Result; - - fn next(&mut self) -> Option> { - match next_entry(&mut self.data) { - Ok(entry) => Some(Ok(entry)), - Err(_) => None, - } - } -} - -/// Return an iterator for all the entries in the given file. -pub fn read_ledger( - ledger_path: &str, - recover: bool, -) -> io::Result>> { - if recover { - recover_ledger(ledger_path)?; - } - - let ledger_path = Path::new(&ledger_path); - let data = File::open(ledger_path.join(LEDGER_DATA_FILE))?; - let data = BufReader::new(data); - - Ok(LedgerReader { data }) -} +use std::fs::remove_dir_all; // a Block is a slice of Entries pub trait Block { @@ -732,7 +302,7 @@ mod tests { use super::*; use crate::entry::{next_entry, reconstruct_entries_from_blobs, Entry}; use crate::packet::{to_blobs, BLOB_DATA_SIZE, PACKET_DATA_SIZE}; - use bincode::{deserialize, serialized_size}; + use bincode::serialized_size; use solana_sdk::hash::hash; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::transaction::Transaction; @@ -869,212 +439,4 @@ mod tests { assert!(entries0.verify(&id)); } - #[test] - fn test_ledger_reader_writer() { - solana_logger::setup(); - let ledger_path = get_tmp_ledger_path("test_ledger_reader_writer"); - let entries = make_tiny_test_entries(10); - - { - let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(&entries.clone()).unwrap(); - // drops writer, flushes buffers - } - verify_ledger(&ledger_path).unwrap(); - - let mut read_entries = vec![]; - for x in read_ledger(&ledger_path, true).unwrap() { - let entry = x.unwrap(); - trace!("entry... {:?}", entry); - read_entries.push(entry); - } - assert_eq!(read_entries, entries); - - let mut window = LedgerWindow::open(&ledger_path).unwrap(); - - for (i, entry) in entries.iter().enumerate() { - let read_entry = window.get_entry(i as u64).unwrap(); - assert_eq!(*entry, read_entry); - } - assert!(window.get_entry(100).is_err()); - - std::fs::remove_file(Path::new(&ledger_path).join(LEDGER_DATA_FILE)).unwrap(); - // empty data file should fall over - assert!(LedgerWindow::open(&ledger_path).is_err()); - assert!(read_ledger(&ledger_path, false).is_err()); - - std::fs::remove_dir_all(ledger_path).unwrap(); - } - - fn truncated_last_entry(ledger_path: &str, entries: Vec) { - let len = { - let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(&entries).unwrap(); - writer.data.seek(SeekFrom::Current(0)).unwrap() - }; - verify_ledger(&ledger_path).unwrap(); - - let data = OpenOptions::new() - .write(true) - .open(Path::new(&ledger_path).join(LEDGER_DATA_FILE)) - .unwrap(); - data.set_len(len - 4).unwrap(); - } - - fn garbage_on_data(ledger_path: &str, entries: Vec) { - let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(&entries).unwrap(); - writer.data.write_all(b"hi there!").unwrap(); - } - - fn read_ledger_check(ledger_path: &str, entries: Vec, len: usize) { - let read_entries = read_ledger(&ledger_path, true).unwrap(); - let mut i = 0; - - for entry in read_entries { - assert_eq!(entry.unwrap(), entries[i]); - i += 1; - } - assert_eq!(i, len); - } - - fn ledger_window_check(ledger_path: &str, entries: Vec, len: usize) { - let mut window = LedgerWindow::open(&ledger_path).unwrap(); - for i in 0..len { - let entry = window.get_entry(i as u64); - assert_eq!(entry.unwrap(), entries[i]); - } - } - - #[test] - fn test_recover_ledger() { - solana_logger::setup(); - - let entries = make_tiny_test_entries(10); - let ledger_path = get_tmp_ledger_path("test_recover_ledger"); - - // truncate data file, tests recover inside read_ledger_check() - truncated_last_entry(&ledger_path, entries.clone()); - read_ledger_check(&ledger_path, entries.clone(), entries.len() - 1); - - // truncate data file, tests recover inside LedgerWindow::new() - truncated_last_entry(&ledger_path, entries.clone()); - ledger_window_check(&ledger_path, entries.clone(), entries.len() - 1); - - // restore last entry, tests recover_ledger() inside LedgerWriter::new() - truncated_last_entry(&ledger_path, entries.clone()); - // verify should fail at first - assert!(verify_ledger(&ledger_path).is_err()); - { - let mut writer = LedgerWriter::recover(&ledger_path).unwrap(); - writer.write_entry(&entries[entries.len() - 1]).unwrap(); - } - // and be fine after recover() - verify_ledger(&ledger_path).unwrap(); - - read_ledger_check(&ledger_path, entries.clone(), entries.len()); - ledger_window_check(&ledger_path, entries.clone(), entries.len()); - - // make it look like data is newer in time, check reader... - garbage_on_data(&ledger_path, entries.clone()); - read_ledger_check(&ledger_path, entries.clone(), entries.len()); - - // make it look like data is newer in time, check window... - garbage_on_data(&ledger_path, entries.clone()); - ledger_window_check(&ledger_path, entries.clone(), entries.len()); - - // make it look like data is newer in time, check writer... - garbage_on_data(&ledger_path, entries[..entries.len() - 1].to_vec()); - assert!(verify_ledger(&ledger_path).is_err()); - { - let mut writer = LedgerWriter::recover(&ledger_path).unwrap(); - writer.write_entry(&entries[entries.len() - 1]).unwrap(); - } - verify_ledger(&ledger_path).unwrap(); - read_ledger_check(&ledger_path, entries.clone(), entries.len()); - ledger_window_check(&ledger_path, entries.clone(), entries.len()); - let _ignored = remove_dir_all(&ledger_path); - } - - #[test] - fn test_verify_ledger() { - solana_logger::setup(); - - let entries = make_tiny_test_entries(10); - let ledger_path = get_tmp_ledger_path("test_verify_ledger"); - { - let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(&entries).unwrap(); - } - // TODO more cases that make ledger_verify() fail - // assert!(verify_ledger(&ledger_path).is_err()); - - assert!(verify_ledger(&ledger_path).is_ok()); - let _ignored = remove_dir_all(&ledger_path); - } - - #[test] - fn test_get_entries_bytes() { - solana_logger::setup(); - let entries = make_tiny_test_entries(10); - let ledger_path = get_tmp_ledger_path("test_raw_entries"); - { - let mut writer = LedgerWriter::open(&ledger_path, true).unwrap(); - writer.write_entries(&entries).unwrap(); - } - - let mut window = LedgerWindow::open(&ledger_path).unwrap(); - let mut buf = [0; 1024]; - let (num_entries, bytes) = window.get_entries_bytes(0, 1, &mut buf).unwrap(); - let bytes = bytes as usize; - assert_eq!(num_entries, 1); - let entry: Entry = deserialize(&buf[size_of::()..bytes]).unwrap(); - assert_eq!(entry, entries[0]); - - let (num_entries, bytes2) = window.get_entries_bytes(0, 2, &mut buf).unwrap(); - let bytes2 = bytes2 as usize; - assert_eq!(num_entries, 2); - assert!(bytes2 > bytes); - for (i, ref entry) in entries.iter().enumerate() { - info!("entry[{}] = {:?}", i, entry.id); - } - - let entry: Entry = deserialize(&buf[size_of::()..bytes]).unwrap(); - assert_eq!(entry, entries[0]); - - let entry: Entry = deserialize(&buf[bytes + size_of::()..bytes2]).unwrap(); - assert_eq!(entry, entries[1]); - - // buf size part-way into entry[1], should just return entry[0] - let mut buf = vec![0; bytes + size_of::() + 1]; - let (num_entries, bytes3) = window.get_entries_bytes(0, 2, &mut buf).unwrap(); - assert_eq!(num_entries, 1); - let bytes3 = bytes3 as usize; - assert_eq!(bytes3, bytes); - - let mut buf = vec![0; bytes2 - 1]; - let (num_entries, bytes4) = window.get_entries_bytes(0, 2, &mut buf).unwrap(); - assert_eq!(num_entries, 1); - let bytes4 = bytes4 as usize; - assert_eq!(bytes4, bytes); - - let mut buf = vec![0; bytes + size_of::() - 1]; - let (num_entries, bytes5) = window.get_entries_bytes(0, 2, &mut buf).unwrap(); - assert_eq!(num_entries, 1); - let bytes5 = bytes5 as usize; - assert_eq!(bytes5, bytes); - - let mut buf = vec![0; bytes * 2]; - let (num_entries, bytes6) = window.get_entries_bytes(9, 1, &mut buf).unwrap(); - assert_eq!(num_entries, 1); - let bytes6 = bytes6 as usize; - - let entry: Entry = deserialize(&buf[size_of::()..bytes6]).unwrap(); - assert_eq!(entry, entries[9]); - - // Read out of range - assert!(window.get_entries_bytes(20, 2, &mut buf).is_err()); - - let _ignored = remove_dir_all(&ledger_path); - } }