Remove most of the old ledger code

Removes LedgerWriter, read_ledger, LedgerWindow
Stephen Akridge 2019-01-09 13:17:58 -08:00 committed by sakridge
parent 45b4cf2887
commit 045c5e8556
2 changed files with 4 additions and 650 deletions
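The caller-side migration is visible in the test hunks below: writes that previously went through the file-backed LedgerWriter now go through the database-backed DbLedger, whose write_entries takes a slot height and a starting index. A minimal before/after sketch drawn from the test code in this diff (error handling via unwrap, as in the tests):

    // before: file-backed writer
    let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
    writer.write_entries(&entries).unwrap();

    // after: database-backed ledger
    let db_ledger = Arc::new(DbLedger::open(&ledger_path).unwrap());
    db_ledger
        .write_entries(DEFAULT_SLOT_HEIGHT, 0, &entries)
        .unwrap();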

View File

@@ -109,7 +109,7 @@ mod tests {
use crate::chacha::chacha_cbc_encrypt_ledger;
use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys;
use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT};
use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries, LedgerWriter};
use crate::ledger::{get_tmp_ledger_path, make_tiny_test_entries};
use crate::replicator::sample_file;
use solana_sdk::hash::Hash;
use std::fs::{remove_dir_all, remove_file};
@@ -123,10 +123,6 @@ mod tests {
let entries = make_tiny_test_entries(32);
let ledger_dir = "test_encrypt_file_many_keys_single";
let ledger_path = get_tmp_ledger_path(ledger_dir);
{
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(&entries).unwrap();
}
let db_ledger = Arc::new(DbLedger::open(&ledger_path).unwrap());
db_ledger
.write_entries(DEFAULT_SLOT_HEIGHT, 0, &entries)
@@ -161,10 +157,6 @@ mod tests {
let entries = make_tiny_test_entries(32);
let ledger_dir = "test_encrypt_file_many_keys_multiple";
let ledger_path = get_tmp_ledger_path(ledger_dir);
{
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(&entries).unwrap();
}
let db_ledger = Arc::new(DbLedger::open(&ledger_path).unwrap());
db_ledger
.write_entries(DEFAULT_SLOT_HEIGHT, 0, &entries)

View File

@@ -6,9 +6,8 @@ use crate::db_ledger::{DbLedger, DEFAULT_SLOT_HEIGHT};
use crate::entry::Entry;
use crate::mint::Mint;
use crate::packet::{Blob, SharedBlob, BLOB_DATA_SIZE};
use bincode::{self, deserialize_from, serialize_into, serialized_size};
use bincode::{self, serialized_size};
use chrono::prelude::Utc;
use log::Level::Trace;
use rayon::prelude::*;
use solana_sdk::budget_transaction::BudgetTransaction;
use solana_sdk::hash::{hash, Hash};
@@ -17,436 +16,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::transaction::Transaction;
use solana_sdk::vote_program::Vote;
use solana_sdk::vote_transaction::VoteTransaction;
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
use std::io::prelude::*;
use std::io::{self, BufReader, BufWriter, Seek, SeekFrom};
use std::mem::size_of;
use std::path::Path;
//
// A persistent ledger is 2 files:
//   ledger_path/ --+
//                  +-- index <== an array of u64 offsets into data,
//                  |             each offset points to the first byte
//                  |             of a u64 that contains the length of
//                  |             the entry. To make the code smaller,
//                  |             index[0] is set to 0; TODO: this field
//                  |             could later be used for other stuff...
//                  +-- data  <== concatenated instances of
//                                    u64 length
//                                    entry data
//
// When opening a ledger, we have the ability to "audit" it, which means we need
// to pick which file to use as "truth", and correct the other file as
// necessary, if possible.
//
// The protocol for writing the ledger is to append to the data file first, the
// index file second. If the writing node is interrupted while appending to the
// ledger, there are some possibilities we need to cover:
//
//  1. a partial write of data, which might be a partial write of the length
//     or a partial write of the entry data
//  2. a partial or missing write to the index for that entry
//
// There is also the possibility of "unsynchronized" reading of the ledger
// during transfer across nodes via rsync (or whatever). In this case, if the
// transfer of the data file is done before the transfer of the index file,
// it's likely that the index file will be far ahead of the data file in time.
//
// The quickest and most reliable strategy for recovery is therefore to treat
// the data file as nearest to the "truth".
//
// The logic for "recovery/audit" is to open the index and read backwards from the
// last u64-aligned entry to get to where index and data agree (i.e. where a
// successful deserialization of an entry can be performed), then truncate
// both files to this synchronization point.
//
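// Worked example (illustrative sizes, not from this commit): three entries
// whose serialized lengths are 100, 250, and 90 bytes produce:
//
//   index: [0, 108, 366]               three u64 offsets, one per entry
//   data:  [len=100][entry 0 bytes]    bytes   0..108
//          [len=250][entry 1 bytes]    bytes 108..366
//          [len=90] [entry 2 bytes]    bytes 366..464
//
// In general index[i+1] = index[i] + SIZEOF_U64 + len(entry i); entry i is
// read by seeking data to index[i], reading a u64 length, then deserializing
// that many bytes.
//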
// ledger window
#[derive(Debug)]
struct LedgerWindow {
index: BufReader<File>,
data: BufReader<File>,
}
const LEDGER_DATA_FILE: &str = "data";
const LEDGER_INDEX_FILE: &str = "index";
// use a CONST because there's a cast, and we don't want "size_of::<u64>() as u64"...
const SIZEOF_U64: u64 = size_of::<u64>() as u64;
#[allow(clippy::needless_pass_by_value)]
fn err_bincode_to_io(e: Box<bincode::ErrorKind>) -> io::Error {
io::Error::new(io::ErrorKind::Other, e.to_string())
}
fn read_entry<A: Read>(file: &mut A, len: u64) -> io::Result<Entry> {
deserialize_from(file.take(len)).map_err(err_bincode_to_io)
}
fn entry_at<A: Read + Seek>(file: &mut A, at: u64) -> io::Result<Entry> {
let len = u64_at(file, at)?;
read_entry(file, len)
}
fn next_entry<A: Read>(file: &mut A) -> io::Result<Entry> {
let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?;
read_entry(file, len)
}
fn u64_at<A: Read + Seek>(file: &mut A, at: u64) -> io::Result<u64> {
file.seek(SeekFrom::Start(at))?;
deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)
}
#[allow(dead_code)]
impl LedgerWindow {
// opens a Ledger in a directory, provides an "infinite" window
//
fn open(ledger_path: &str) -> io::Result<Self> {
let ledger_path = Path::new(&ledger_path);
let index = File::open(ledger_path.join(LEDGER_INDEX_FILE))?;
let index = BufReader::new(index);
let data = File::open(ledger_path.join(LEDGER_DATA_FILE))?;
let data = BufReader::with_capacity(BLOB_DATA_SIZE, data);
Ok(LedgerWindow { index, data })
}
fn get_entry(&mut self, index: u64) -> io::Result<Entry> {
let offset = self.get_entry_offset(index)?;
entry_at(&mut self.data, offset)
}
// Fill 'buf' with num_entries, or with the most whole entries that fit in buf.len()
//
// Return a tuple of (number of entries read, total size of entries read)
fn get_entries_bytes(
&mut self,
start_index: u64,
num_entries: u64,
buf: &mut [u8],
) -> io::Result<(u64, u64)> {
let start_offset = self.get_entry_offset(start_index)?;
let mut total_entries = 0;
let mut end_offset = 0;
for i in 0..num_entries {
let offset = self.get_entry_offset(start_index + i)?;
let len = u64_at(&mut self.data, offset)?;
let cur_end_offset = offset + len + SIZEOF_U64;
if (cur_end_offset - start_offset) > buf.len() as u64 {
break;
}
end_offset = cur_end_offset;
total_entries += 1;
}
if total_entries == 0 {
return Ok((0, 0));
}
let read_len = end_offset - start_offset;
self.data.seek(SeekFrom::Start(start_offset))?;
let fread_len = self.data.read(&mut buf[..read_len as usize])? as u64;
if fread_len != read_len {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"entry read_len({}) doesn't match expected ({})",
fread_len, read_len
),
));
}
Ok((total_entries, read_len))
}
fn get_entry_offset(&mut self, index: u64) -> io::Result<u64> {
u64_at(&mut self.index, index * SIZEOF_U64)
}
}
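// Worked example for get_entries_bytes (illustrative sizes, continuing the
// layout sketch above): with entries of serialized length 100 and 250 and a
// 300-byte buf, get_entries_bytes(0, 2, &mut buf) finds that entry 0 ends at
// byte 108 and entry 1 would end at byte 366, so only entry 0 fits; the call
// returns Ok((1, 108)), and buf[8..108] deserializes to entry 0.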
pub fn verify_ledger(ledger_path: &str) -> io::Result<()> {
let ledger_path = Path::new(&ledger_path);
let index = File::open(ledger_path.join(LEDGER_INDEX_FILE))?;
let index_len = index.metadata()?.len();
if index_len % SIZEOF_U64 != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("index is not a multiple of {} bytes long", SIZEOF_U64),
))?;
}
let mut index = BufReader::new(index);
let data = File::open(ledger_path.join(LEDGER_DATA_FILE))?;
let mut data = BufReader::with_capacity(BLOB_DATA_SIZE, data);
let mut last_data_offset = 0;
let mut index_offset = 0;
let mut data_read = 0;
let mut last_len = 0;
let mut i = 0;
while index_offset < index_len {
let data_offset = u64_at(&mut index, index_offset)?;
if last_data_offset + last_len != data_offset {
Err(io::Error::new(
io::ErrorKind::Other,
format!(
"at entry[{}], a gap or an overlap last_offset {} offset {} last_len {}",
i, last_data_offset, data_offset, last_len
),
))?;
}
match entry_at(&mut data, data_offset) {
Err(e) => Err(io::Error::new(
io::ErrorKind::Other,
format!(
"entry[{}] deserialize() failed at offset {}, err: {}",
index_offset / SIZEOF_U64,
data_offset,
e.to_string(),
),
))?,
Ok(entry) => {
last_len = serialized_size(&entry).map_err(err_bincode_to_io)? + SIZEOF_U64
}
}
last_data_offset = data_offset;
data_read += last_len;
index_offset += SIZEOF_U64;
i += 1;
}
let data = data.into_inner();
if data_read != data.metadata()?.len() {
Err(io::Error::new(
io::ErrorKind::Other,
"garbage on end of data file",
))?;
}
Ok(())
}
fn recover_ledger(ledger_path: &str) -> io::Result<()> {
let ledger_path = Path::new(ledger_path);
let mut index = OpenOptions::new()
.write(true)
.read(true)
.open(ledger_path.join(LEDGER_INDEX_FILE))?;
let mut data = OpenOptions::new()
.write(true)
.read(true)
.open(ledger_path.join(LEDGER_DATA_FILE))?;
// first, truncate to a multiple of SIZEOF_U64
let len = index.metadata()?.len();
if len % SIZEOF_U64 != 0 {
trace!("recover: trimming index len to {}", len - len % SIZEOF_U64);
index.set_len(len - (len % SIZEOF_U64))?;
}
// next, pull index offsets off one at a time until the last one points
// to a valid entry deserialization offset...
loop {
let len = index.metadata()?.len();
trace!("recover: index len:{}", len);
// should never happen
if len < SIZEOF_U64 {
trace!("recover: error index len {} too small", len);
Err(io::Error::new(io::ErrorKind::Other, "empty ledger index"))?;
}
let offset = u64_at(&mut index, len - SIZEOF_U64)?;
trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset);
match entry_at(&mut data, offset) {
Ok(entry) => {
trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry);
let entry_len = serialized_size(&entry).map_err(err_bincode_to_io)?;
trace!("recover: entry_len: {}", entry_len);
// now trim data file to size...
data.set_len(offset + SIZEOF_U64 + entry_len)?;
trace!(
"recover: trimmed data file to {}",
offset + SIZEOF_U64 + entry_len
);
break; // all good
}
Err(_err) => {
trace!(
"recover: no entry recovered at {} {}",
offset,
_err.to_string()
);
index.set_len(len - SIZEOF_U64)?;
}
}
}
if log_enabled!(Trace) {
let num_entries = index.metadata()?.len() / SIZEOF_U64;
trace!("recover: done. {} entries", num_entries);
}
// flush everything to disk...
index.sync_all()?;
data.sync_all()
}
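// Recovery example (illustrative, continuing the sizes above): if the index
// holds offsets [0, 108, 366] but the data file was truncated to 400 bytes,
// entry_at() fails at offset 366, so the index is trimmed to two offsets
// (16 bytes); entry_at() then succeeds at offset 108, and the data file is
// trimmed to 108 + 8 + 250 = 366 bytes. Both files again agree on exactly
// two entries.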
// TODO?? ... we could open the files on demand to support [], but today
// LedgerWindow needs "&mut self"
//
//impl Index<u64> for LedgerWindow {
// type Output = io::Result<Entry>;
//
// fn index(&mut self, index: u64) -> &io::Result<Entry> {
// match u64_at(&mut self.index, index * SIZEOF_U64) {
// Ok(offset) => &entry_at(&mut self.data, offset),
// Err(e) => &Err(e),
// }
// }
//}
#[derive(Debug)]
pub struct LedgerWriter {
index: BufWriter<File>,
data: BufWriter<File>,
}
impl LedgerWriter {
// recover and open the ledger for writing
pub fn recover(ledger_path: &str) -> io::Result<Self> {
recover_ledger(ledger_path)?;
LedgerWriter::open(ledger_path, false)
}
// opens or creates a LedgerWriter in ledger_path directory
pub fn open(ledger_path: &str, create: bool) -> io::Result<Self> {
let ledger_path = Path::new(&ledger_path);
if create {
let _ignored = remove_dir_all(ledger_path);
create_dir_all(ledger_path)?;
}
let index = OpenOptions::new()
.create(create)
.append(true)
.open(ledger_path.join(LEDGER_INDEX_FILE))?;
if log_enabled!(Trace) {
let len = index.metadata()?.len();
trace!("LedgerWriter::new: index fp:{}", len);
}
let index = BufWriter::new(index);
let data = OpenOptions::new()
.create(create)
.append(true)
.open(ledger_path.join(LEDGER_DATA_FILE))?;
if log_enabled!(Trace) {
let len = data.metadata()?.len();
trace!("LedgerWriter::new: data fp:{}", len);
}
let data = BufWriter::new(data);
Ok(LedgerWriter { index, data })
}
fn write_entry_noflush(&mut self, entry: &Entry) -> io::Result<()> {
let len = serialized_size(&entry).map_err(err_bincode_to_io)?;
serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?;
if log_enabled!(Trace) {
let offset = self.data.seek(SeekFrom::Current(0))?;
trace!("write_entry: after len data fp:{}", offset);
}
serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?;
if log_enabled!(Trace) {
let offset = self.data.seek(SeekFrom::Current(0))?;
trace!("write_entry: after entry data fp:{}", offset);
}
let offset = self.data.seek(SeekFrom::Current(0))? - len - SIZEOF_U64;
trace!("write_entry: offset:{} len:{}", offset, len);
serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io)?;
if log_enabled!(Trace) {
let offset = self.index.seek(SeekFrom::Current(0))?;
trace!("write_entry: end index fp:{}", offset);
}
Ok(())
}
pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
self.write_entry_noflush(&entry)?;
self.index.flush()?;
self.data.flush()?;
Ok(())
}
pub fn write_entries<'a, I>(&mut self, entries: I) -> io::Result<()>
where
I: IntoIterator<Item = &'a Entry>,
{
for entry in entries {
self.write_entry_noflush(entry)?;
}
self.index.flush()?;
self.data.flush()?;
Ok(())
}
}
#[derive(Debug)]
pub struct LedgerReader {
data: BufReader<File>,
}
impl Iterator for LedgerReader {
type Item = io::Result<Entry>;
fn next(&mut self) -> Option<io::Result<Entry>> {
match next_entry(&mut self.data) {
Ok(entry) => Some(Ok(entry)),
Err(_) => None,
}
}
}
/// Return an iterator for all the entries in the given file.
pub fn read_ledger(
ledger_path: &str,
recover: bool,
) -> io::Result<impl Iterator<Item = io::Result<Entry>>> {
if recover {
recover_ledger(ledger_path)?;
}
let ledger_path = Path::new(&ledger_path);
let data = File::open(ledger_path.join(LEDGER_DATA_FILE))?;
let data = BufReader::new(data);
Ok(LedgerReader { data })
}
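// Illustrative usage (a sketch; mirrors test_ledger_reader_writer below):
//
//   for entry in read_ledger(&ledger_path, true)? {
//       let entry = entry?;
//       // ... process the entry ...
//   }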
use std::fs::remove_dir_all;
// a Block is a slice of Entries
pub trait Block {
@@ -732,7 +302,7 @@ mod tests {
use super::*;
use crate::entry::{next_entry, reconstruct_entries_from_blobs, Entry};
use crate::packet::{to_blobs, BLOB_DATA_SIZE, PACKET_DATA_SIZE};
use bincode::{deserialize, serialized_size};
use bincode::serialized_size;
use solana_sdk::hash::hash;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::transaction::Transaction;
@@ -869,212 +439,4 @@ mod tests {
assert!(entries0.verify(&id));
}
#[test]
fn test_ledger_reader_writer() {
solana_logger::setup();
let ledger_path = get_tmp_ledger_path("test_ledger_reader_writer");
let entries = make_tiny_test_entries(10);
{
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(&entries.clone()).unwrap();
// drops writer, flushes buffers
}
verify_ledger(&ledger_path).unwrap();
let mut read_entries = vec![];
for x in read_ledger(&ledger_path, true).unwrap() {
let entry = x.unwrap();
trace!("entry... {:?}", entry);
read_entries.push(entry);
}
assert_eq!(read_entries, entries);
let mut window = LedgerWindow::open(&ledger_path).unwrap();
for (i, entry) in entries.iter().enumerate() {
let read_entry = window.get_entry(i as u64).unwrap();
assert_eq!(*entry, read_entry);
}
assert!(window.get_entry(100).is_err());
std::fs::remove_file(Path::new(&ledger_path).join(LEDGER_DATA_FILE)).unwrap();
// empty data file should fall over
assert!(LedgerWindow::open(&ledger_path).is_err());
assert!(read_ledger(&ledger_path, false).is_err());
std::fs::remove_dir_all(ledger_path).unwrap();
}
fn truncated_last_entry(ledger_path: &str, entries: Vec<Entry>) {
let len = {
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(&entries).unwrap();
writer.data.seek(SeekFrom::Current(0)).unwrap()
};
verify_ledger(&ledger_path).unwrap();
let data = OpenOptions::new()
.write(true)
.open(Path::new(&ledger_path).join(LEDGER_DATA_FILE))
.unwrap();
data.set_len(len - 4).unwrap();
}
fn garbage_on_data(ledger_path: &str, entries: Vec<Entry>) {
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(&entries).unwrap();
writer.data.write_all(b"hi there!").unwrap();
}
fn read_ledger_check(ledger_path: &str, entries: Vec<Entry>, len: usize) {
let read_entries = read_ledger(&ledger_path, true).unwrap();
let mut i = 0;
for entry in read_entries {
assert_eq!(entry.unwrap(), entries[i]);
i += 1;
}
assert_eq!(i, len);
}
fn ledger_window_check(ledger_path: &str, entries: Vec<Entry>, len: usize) {
let mut window = LedgerWindow::open(&ledger_path).unwrap();
for i in 0..len {
let entry = window.get_entry(i as u64);
assert_eq!(entry.unwrap(), entries[i]);
}
}
#[test]
fn test_recover_ledger() {
solana_logger::setup();
let entries = make_tiny_test_entries(10);
let ledger_path = get_tmp_ledger_path("test_recover_ledger");
// truncate data file, tests recover inside read_ledger_check()
truncated_last_entry(&ledger_path, entries.clone());
read_ledger_check(&ledger_path, entries.clone(), entries.len() - 1);
// truncate data file, tests recover inside LedgerWindow::new()
truncated_last_entry(&ledger_path, entries.clone());
ledger_window_check(&ledger_path, entries.clone(), entries.len() - 1);
// restore last entry, tests recover_ledger() inside LedgerWriter::recover()
truncated_last_entry(&ledger_path, entries.clone());
// verify should fail at first
assert!(verify_ledger(&ledger_path).is_err());
{
let mut writer = LedgerWriter::recover(&ledger_path).unwrap();
writer.write_entry(&entries[entries.len() - 1]).unwrap();
}
// and be fine after recover()
verify_ledger(&ledger_path).unwrap();
read_ledger_check(&ledger_path, entries.clone(), entries.len());
ledger_window_check(&ledger_path, entries.clone(), entries.len());
// make it look like data is newer in time, check reader...
garbage_on_data(&ledger_path, entries.clone());
read_ledger_check(&ledger_path, entries.clone(), entries.len());
// make it look like data is newer in time, check window...
garbage_on_data(&ledger_path, entries.clone());
ledger_window_check(&ledger_path, entries.clone(), entries.len());
// make it look like data is newer in time, check writer...
garbage_on_data(&ledger_path, entries[..entries.len() - 1].to_vec());
assert!(verify_ledger(&ledger_path).is_err());
{
let mut writer = LedgerWriter::recover(&ledger_path).unwrap();
writer.write_entry(&entries[entries.len() - 1]).unwrap();
}
verify_ledger(&ledger_path).unwrap();
read_ledger_check(&ledger_path, entries.clone(), entries.len());
ledger_window_check(&ledger_path, entries.clone(), entries.len());
let _ignored = remove_dir_all(&ledger_path);
}
#[test]
fn test_verify_ledger() {
solana_logger::setup();
let entries = make_tiny_test_entries(10);
let ledger_path = get_tmp_ledger_path("test_verify_ledger");
{
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(&entries).unwrap();
}
// TODO: more cases that make verify_ledger() fail
// assert!(verify_ledger(&ledger_path).is_err());
assert!(verify_ledger(&ledger_path).is_ok());
let _ignored = remove_dir_all(&ledger_path);
}
#[test]
fn test_get_entries_bytes() {
solana_logger::setup();
let entries = make_tiny_test_entries(10);
let ledger_path = get_tmp_ledger_path("test_raw_entries");
{
let mut writer = LedgerWriter::open(&ledger_path, true).unwrap();
writer.write_entries(&entries).unwrap();
}
let mut window = LedgerWindow::open(&ledger_path).unwrap();
let mut buf = [0; 1024];
let (num_entries, bytes) = window.get_entries_bytes(0, 1, &mut buf).unwrap();
let bytes = bytes as usize;
assert_eq!(num_entries, 1);
let entry: Entry = deserialize(&buf[size_of::<u64>()..bytes]).unwrap();
assert_eq!(entry, entries[0]);
let (num_entries, bytes2) = window.get_entries_bytes(0, 2, &mut buf).unwrap();
let bytes2 = bytes2 as usize;
assert_eq!(num_entries, 2);
assert!(bytes2 > bytes);
for (i, ref entry) in entries.iter().enumerate() {
info!("entry[{}] = {:?}", i, entry.id);
}
let entry: Entry = deserialize(&buf[size_of::<u64>()..bytes]).unwrap();
assert_eq!(entry, entries[0]);
let entry: Entry = deserialize(&buf[bytes + size_of::<u64>()..bytes2]).unwrap();
assert_eq!(entry, entries[1]);
// buf size part-way into entry[1], should just return entry[0]
let mut buf = vec![0; bytes + size_of::<u64>() + 1];
let (num_entries, bytes3) = window.get_entries_bytes(0, 2, &mut buf).unwrap();
assert_eq!(num_entries, 1);
let bytes3 = bytes3 as usize;
assert_eq!(bytes3, bytes);
let mut buf = vec![0; bytes2 - 1];
let (num_entries, bytes4) = window.get_entries_bytes(0, 2, &mut buf).unwrap();
assert_eq!(num_entries, 1);
let bytes4 = bytes4 as usize;
assert_eq!(bytes4, bytes);
let mut buf = vec![0; bytes + size_of::<u64>() - 1];
let (num_entries, bytes5) = window.get_entries_bytes(0, 2, &mut buf).unwrap();
assert_eq!(num_entries, 1);
let bytes5 = bytes5 as usize;
assert_eq!(bytes5, bytes);
let mut buf = vec![0; bytes * 2];
let (num_entries, bytes6) = window.get_entries_bytes(9, 1, &mut buf).unwrap();
assert_eq!(num_entries, 1);
let bytes6 = bytes6 as usize;
let entry: Entry = deserialize(&buf[size_of::<u64>()..bytes6]).unwrap();
assert_eq!(entry, entries[9]);
// Read out of range
assert!(window.get_entries_bytes(20, 2, &mut buf).is_err());
let _ignored = remove_dir_all(&ledger_path);
}
}