solana/src/ledger.rs

//! The `ledger` module provides functions for parallel verification of the
//! Proof of History ledger.

use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size};
use entry::Entry;
use hash::Hash;
use packet::{self, SharedBlob, BLOB_DATA_SIZE};
use rayon::prelude::*;
use result::{Error, Result};
use std::collections::VecDeque;
use std::fs::{create_dir_all, File, OpenOptions};
use std::io::prelude::*;
use std::io::{self, Cursor, Seek, SeekFrom};
use std::mem::size_of;
use std::path::Path;
use transaction::Transaction;
// ledger window
#[derive(Debug)]
pub struct LedgerWindow {
    index: File,
    data: File,
}

// use a CONST because there's a cast, and we don't want "size_of::<u64>() as u64"...
const SIZEOF_U64: u64 = size_of::<u64>() as u64;
const SIZEOF_USIZE: u64 = size_of::<usize>() as u64;

#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
fn err_bincode_to_io(e: Box<bincode::ErrorKind>) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e.to_string())
}

fn entry_at(file: &mut File, at: u64) -> io::Result<Entry> {
    file.seek(SeekFrom::Start(at))?;

    let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?;
    deserialize_from(file.take(len)).map_err(err_bincode_to_io)
}
fn next_offset(file: &mut File) -> io::Result<u64> {
    deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)
}

// unused, but would work for the iterator if we only have the data file...
//
//fn next_entry(file: &mut File) -> io::Result<Entry> {
//    let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?;
//    deserialize_from(file.take(len)).map_err(err_bincode_to_io)
//}

fn u64_at(file: &mut File, at: u64) -> io::Result<u64> {
    file.seek(SeekFrom::Start(at))?;
    deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)
}
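
// On-disk layout, as implied by the helpers above and LedgerWriter below:
// the "index" file is a flat sequence of bincode-serialized u64 offsets,
// one per entry, and the "data" file is a sequence of records, each a
// bincode-serialized length followed by the bincode-serialized Entry
// found at that offset.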
impl LedgerWindow {
    // opens a Ledger in directory, provides "infinite" window
    pub fn new(directory: &str) -> io::Result<Self> {
        let directory = Path::new(&directory);

        let index = File::open(directory.join("index"))?;
        let data = File::open(directory.join("data"))?;

        Ok(LedgerWindow { index, data })
    }

    pub fn get_entry(&mut self, index: u64) -> io::Result<Entry> {
        let offset = u64_at(&mut self.index, index * SIZEOF_U64)?;
        entry_at(&mut self.data, offset)
    }
}
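
// Example (a sketch, assuming a ledger was already written to `ledger_path`
// with a LedgerWriter):
//
//    let mut window = LedgerWindow::new(&ledger_path)?;
//    let entry = window.get_entry(0)?;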
// TODO?? ... we could open the files on demand to support [], but today
// LedgerWindow needs "&mut self"
//
//impl Index<u64> for LedgerWindow {
//    type Output = io::Result<Entry>;
//
//    fn index(&mut self, index: u64) -> &io::Result<Entry> {
//        match u64_at(&mut self.index, index * SIZEOF_U64) {
//            Ok(offset) => &entry_at(&mut self.data, offset),
//            Err(e) => &Err(e),
//        }
//    }
//}
#[derive(Debug)]
pub struct LedgerWriter {
    index: File,
    data: File,
}

impl LedgerWriter {
    // opens or creates a LedgerWriter in directory
    pub fn new(directory: &str) -> io::Result<Self> {
        let directory = Path::new(&directory);
        create_dir_all(directory)?;

        let index = OpenOptions::new()
            .create(true)
            .append(true)
            .open(directory.join("index"))?;

        let data = OpenOptions::new()
            .create(true)
            .append(true)
            .open(directory.join("data"))?;

        Ok(LedgerWriter { index, data })
    }

    fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
        let offset = self.data.seek(SeekFrom::Current(0))?;

        let len = serialized_size(&entry).map_err(err_bincode_to_io)?;

        serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?;
        serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?;
        self.data.flush()?;

        serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io)?;
        self.index.flush()
    }

    pub fn write_entries<I>(&mut self, entries: I) -> io::Result<()>
    where
        I: IntoIterator<Item = Entry>,
    {
        for entry in entries {
            self.write_entry(&entry)?;
        }
        Ok(())
    }
}
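
// Example (a sketch, assuming `entries: Vec<Entry>`, e.g. from next_entries() below):
//
//    let mut writer = LedgerWriter::new(&ledger_path)?;
//    writer.write_entries(entries)?;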
#[derive(Debug)]
pub struct LedgerReader {
    index: File,
    data: File,
}

impl Iterator for LedgerReader {
    type Item = io::Result<Entry>;

    fn next(&mut self) -> Option<io::Result<Entry>> {
        match next_offset(&mut self.index) {
            Ok(offset) => Some(entry_at(&mut self.data, offset)),
            Err(_) => None,
        }
    }
}

/// Returns an iterator over all the entries in the ledger at the given directory.
pub fn read_ledger(directory: &str) -> io::Result<impl Iterator<Item = io::Result<Entry>>> {
    let directory = Path::new(&directory);

    let index = File::open(directory.join("index"))?;
    let data = File::open(directory.join("data"))?;

    Ok(LedgerReader { index, data })
}
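
// Example (a sketch): stream a ledger back out of `ledger_path`.
//
//    for entry in read_ledger(&ledger_path)? {
//        let entry = entry?;
//        // process entry...
//    }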
pub fn copy(from: &str, to: &str) -> io::Result<()> {
    let mut to = LedgerWriter::new(to)?;

    for entry in read_ledger(from)? {
        let entry = entry?;
        to.write_entry(&entry)?;
    }
    Ok(())
}
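
// Note: copy() re-serializes each entry through a fresh LedgerWriter rather
// than copying the raw files, so the destination gets a freshly rebuilt index.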
// a Block is a slice of Entries
pub trait Block {
    /// Verifies the hashes and counts of a slice of entries are all consistent.
    fn verify(&self, start_hash: &Hash) -> bool;
    fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>);
}

impl Block for [Entry] {
    fn verify(&self, start_hash: &Hash) -> bool {
        let genesis = [Entry::new_tick(0, start_hash)];
        let entry_pairs = genesis.par_iter().chain(self).zip(self);
        entry_pairs.all(|(x0, x1)| {
            let r = x1.verify(&x0.id);
            if !r {
                warn!(
                    "entry invalid!: {:?} num txs: {}",
                    x1.id,
                    x1.transactions.len()
                );
            }
            r
        })
    }

    fn to_blobs(&self, blob_recycler: &packet::BlobRecycler, q: &mut VecDeque<SharedBlob>) {
        for entry in self {
            let blob = blob_recycler.allocate();
            let pos = {
                let mut bd = blob.write().unwrap();
                let mut out = Cursor::new(bd.data_mut());
                serialize_into(&mut out, &entry).expect("failed to serialize output");
                out.position() as usize
            };
            assert!(pos <= BLOB_DATA_SIZE, "pos: {}", pos);

            blob.write().unwrap().set_size(pos);
            q.push_back(blob);
        }
    }
}
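
// A sketch of what verify() checks (next_entry here is entry::next_entry, as
// used in the tests below): each entry's id must chain from the previous
// entry's id, starting from start_hash.
//
//    let zero = Hash::default();
//    let entries = vec![next_entry(&zero, 0, vec![]); 2];
//    assert!(entries.verify(&zero));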
pub fn reconstruct_entries_from_blobs(blobs: VecDeque<SharedBlob>) -> Result<Vec<Entry>> {
    let mut entries: Vec<Entry> = Vec::with_capacity(blobs.len());

    for blob in blobs {
        let entry = {
            let msg = blob.read().unwrap();
            let msg_size = msg.get_size()?;
            deserialize(&msg.data()[..msg_size])
        };

        match entry {
            Ok(entry) => entries.push(entry),
            Err(err) => {
                trace!("reconstruct_entries_from_blobs: {:?}", err);
                return Err(Error::Serialize(err));
            }
        }
    }
    Ok(entries)
}
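
// Example (a sketch of the round trip exercised in the tests below):
//
//    let blob_recycler = packet::BlobRecycler::default();
//    let mut blob_q = VecDeque::new();
//    entries.to_blobs(&blob_recycler, &mut blob_q);
//    assert_eq!(reconstruct_entries_from_blobs(blob_q)?, entries);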
/// Creates the next entries for the given transactions; updates `start_hash`
/// to the id of the last Entry and sets `cur_hashes` to 0.
pub fn next_entries_mut(
    start_hash: &mut Hash,
    cur_hashes: &mut u64,
    transactions: Vec<Transaction>,
) -> Vec<Entry> {
    // TODO: find a magic number that works better than | ?
    //                                                   V
    if transactions.is_empty() || transactions.len() == 1 {
        vec![Entry::new_mut(start_hash, cur_hashes, transactions, false)]
    } else {
        let mut start = 0;
        let mut entries = Vec::new();

        while start < transactions.len() {
            let mut chunk_end = transactions.len();
            let mut upper = chunk_end;
            let mut lower = start;
            let mut next = chunk_end; // be optimistic that all will fit

            // binary search for how many transactions will fit in an Entry (i.e. a BLOB)
            loop {
                debug!(
                    "chunk_end {}, upper {} lower {} next {} transactions.len() {}",
                    chunk_end,
                    upper,
                    lower,
                    next,
                    transactions.len()
                );
                if Entry::will_fit(transactions[start..chunk_end].to_vec()) {
                    next = (upper + chunk_end) / 2;
                    lower = chunk_end;
                    debug!(
                        "chunk_end {} fits, maybe too well? trying {}",
                        chunk_end, next
                    );
                } else {
                    next = (lower + chunk_end) / 2;
                    upper = chunk_end;
                    debug!("chunk_end {} doesn't fit! trying {}", chunk_end, next);
                }
                // same as last time
                if next == chunk_end {
                    debug!("converged on chunk_end {}", chunk_end);
                    break;
                }
                chunk_end = next;
            }
            entries.push(Entry::new_mut(
                start_hash,
                cur_hashes,
                transactions[start..chunk_end].to_vec(),
                transactions.len() - chunk_end > 0,
            ));
            start = chunk_end;
        }

        entries
    }
}
/// Creates the next Entries for given transactions
pub fn next_entries(
    start_hash: &Hash,
    cur_hashes: u64,
    transactions: Vec<Transaction>,
) -> Vec<Entry> {
    let mut id = *start_hash;
    let mut num_hashes = cur_hashes;
    next_entries_mut(&mut id, &mut num_hashes, transactions)
}
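
// Example (a sketch): split a batch of transactions into blob-sized entries
// chained from `start_hash`. The result should verify against `start_hash`,
// and when a split occurs every entry but the last carries has_more == true.
//
//    let entries0 = next_entries(&start_hash, 0, transactions);
//    assert!(entries0.verify(&start_hash));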
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::serialized_size;
    use chrono::prelude::*;
    use entry::{next_entry, Entry};
    use hash::hash;
    use packet::{BlobRecycler, BLOB_DATA_SIZE, PACKET_DATA_SIZE};
    use signature::{KeyPair, KeyPairUtil};
    use std;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
    use transaction::{Transaction, Vote};

    #[test]
    fn test_verify_slice() {
        let zero = Hash::default();
        let one = hash(&zero.as_ref());

        assert!(vec![][..].verify(&zero)); // base case
        assert!(vec![Entry::new_tick(0, &zero)][..].verify(&zero)); // singleton case 1
        assert!(!vec![Entry::new_tick(0, &zero)][..].verify(&one)); // singleton case 2, bad
        assert!(vec![next_entry(&zero, 0, vec![]); 2][..].verify(&zero)); // inductive step

        let mut bad_ticks = vec![next_entry(&zero, 0, vec![]); 2];
        bad_ticks[1].id = one;
        assert!(!bad_ticks.verify(&zero)); // inductive step, bad
    }
    fn make_test_entries() -> Vec<Entry> {
        let zero = Hash::default();
        let one = hash(&zero.as_ref());
        let keypair = KeyPair::new();
        let tx0 = Transaction::new_vote(
            &keypair,
            Vote {
                version: 0,
                contact_info_version: 1,
            },
            one,
            1,
        );
        let tx1 = Transaction::new_timestamp(&keypair, Utc::now(), one);
        //
        // TODO: this magic number and the mix of transaction types
        //       is designed to fill up a Blob more or less exactly,
        //       to get near enough to the threshold that
        //       deserialization falls over if it uses the wrong size()
        //       parameter to index into blob.data()
        //
        // magic numbers ----------------+
        //                               |
        //                               V
        let mut transactions = vec![tx0; 362];
        transactions.extend(vec![tx1; 100]);
        next_entries(&zero, 0, transactions)
    }
    #[test]
    fn test_entries_to_blobs() {
        let entries = make_test_entries();
        let blob_recycler = BlobRecycler::default();
        let mut blob_q = VecDeque::new();
        entries.to_blobs(&blob_recycler, &mut blob_q);
        assert_eq!(reconstruct_entries_from_blobs(blob_q).unwrap(), entries);
    }

    #[test]
    fn test_bad_blobs_attack() {
        let blob_recycler = BlobRecycler::default();
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
        let blobs_q = packet::to_blobs(vec![(0, addr)], &blob_recycler).unwrap(); // <-- attack!
        assert!(reconstruct_entries_from_blobs(blobs_q).is_err());
    }
    #[test]
    fn test_next_entries() {
        use logger;
        logger::setup();

        let id = Hash::default();
        let next_id = hash(&id.as_ref());
        let keypair = KeyPair::new();
        let tx_small = Transaction::new_vote(
            &keypair,
            Vote {
                version: 0,
                contact_info_version: 2,
            },
            next_id,
            2,
        );
        let tx_large = Transaction::new(&keypair, keypair.pubkey(), 1, next_id);

        let tx_small_size = serialized_size(&tx_small).unwrap();
        let tx_large_size = serialized_size(&tx_large).unwrap();
        assert!(tx_small_size < tx_large_size);
        assert!(tx_large_size < PACKET_DATA_SIZE as u64);

        // NOTE: if Entry grows to larger than a transaction, the code below falls over
        let threshold = (BLOB_DATA_SIZE / PACKET_DATA_SIZE) - 1;

        // verify no split
        let transactions = vec![tx_small.clone(); threshold];
        let entries0 = next_entries(&id, 0, transactions.clone());
        assert_eq!(entries0.len(), 1);
        assert!(entries0.verify(&id));

        // verify the split with uniform transactions
        let transactions = vec![tx_small.clone(); threshold * 2];
        let entries0 = next_entries(&id, 0, transactions.clone());
        assert_eq!(entries0.len(), 2);
        assert!(entries0[0].has_more);
        assert!(!entries0[entries0.len() - 1].has_more);
        assert!(entries0.verify(&id));

        // verify the split with small transactions followed by large
        // transactions
        let mut transactions = vec![tx_small.clone(); BLOB_DATA_SIZE / (tx_small_size as usize)];
        let large_transactions = vec![tx_large.clone(); BLOB_DATA_SIZE / (tx_large_size as usize)];

        transactions.extend(large_transactions);

        let entries0 = next_entries(&id, 0, transactions.clone());
        assert!(entries0.len() > 2);
        assert!(entries0[0].has_more);
        assert!(!entries0[entries0.len() - 1].has_more);
        assert!(entries0.verify(&id));
    }
    fn tmp_ledger_path() -> String {
        let keypair = KeyPair::new();

        format!(
            "target/test_ledger_reader_writer_window-{}",
            keypair.pubkey()
        )
    }

    #[test]
    fn test_ledger_reader_writer() {
        let ledger_path = tmp_ledger_path();
        let entries = make_test_entries();

        let mut writer = LedgerWriter::new(&ledger_path).unwrap();
        writer.write_entries(entries.clone()).unwrap();

        let mut read_entries = vec![];
        for x in read_ledger(&ledger_path).unwrap() {
            let entry = x.unwrap();
            trace!("entry... {:?}", entry);
            read_entries.push(entry);
        }
        assert_eq!(read_entries, entries);

        let mut window = LedgerWindow::new(&ledger_path).unwrap();

        for (i, entry) in entries.iter().enumerate() {
            let read_entry = window.get_entry(i as u64).unwrap();
            assert_eq!(*entry, read_entry);
        }
        assert!(window.get_entry(100).is_err());

        std::fs::remove_file(Path::new(&ledger_path).join("data")).unwrap();
        // missing data file should fall over
        assert!(LedgerWindow::new(&ledger_path).is_err());
        assert!(read_ledger(&ledger_path).is_err());

        std::fs::remove_dir_all(ledger_path).unwrap();
    }
    #[test]
    fn test_ledger_copy() {
        let from = tmp_ledger_path();
        let entries = make_test_entries();

        let mut writer = LedgerWriter::new(&from).unwrap();
        writer.write_entries(entries.clone()).unwrap();

        let to = tmp_ledger_path();

        copy(&from, &to).unwrap();

        let mut read_entries = vec![];
        for x in read_ledger(&to).unwrap() {
            let entry = x.unwrap();
            trace!("entry... {:?}", entry);
            read_entries.push(entry);
        }
        assert_eq!(read_entries, entries);

        std::fs::remove_dir_all(from).unwrap();
        std::fs::remove_dir_all(to).unwrap();
    }
}