diff --git a/src/ledger.rs b/src/ledger.rs index a141c1c40..a3c291f26 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -1,64 +1,154 @@ //! The `ledger` module provides functions for parallel verification of the //! Proof of History ledger. -use bincode::{deserialize, serialize_into}; +use bincode::{deserialize, deserialize_from, serialize_into}; use entry::Entry; use hash::Hash; use packet::{self, SharedBlob, BLOB_DATA_SIZE}; use rayon::prelude::*; use result::{Error, Result}; use std::collections::VecDeque; -use std::io::Cursor; +use std::fs::{create_dir_all, File, OpenOptions}; +use std::io; +use std::io::prelude::*; +use std::io::{Cursor, ErrorKind, Seek, SeekFrom}; +use std::mem::size_of; +use std::path::Path; use transaction::Transaction; -use std::fs::{File, OpenOptions}; -// ledger -pub struct Ledger{ - entry_len_len: usize; - - entry_height: u64; // current index +// ledger window +pub struct LedgerWindow { + index: File, + data: File, +} - pub struct Files { - index: File; // an array of usize elements - data: File; // concatenated entries +impl LedgerWindow { + // opens a Ledger in directory, provides "infinite" window + pub fn new(directory: String) -> io::Result { + let directory = Path::new(&directory); + + let index = File::open(directory.join("index"))?; + let data = File::open(directory.join("data"))?; + + Ok(LedgerWindow { index, data }) } - reader: Files; - writer: Files; + pub fn get_entry(&mut self, index: u64) -> io::Result { + fn u64_at(file: &mut File, at: u64) -> io::Result { + file.seek(SeekFrom::Start(at))?; + deserialize_from(file.take(size_of::() as u64)) + .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string())) + } - pub fn new(directory: String) -> Self { - - } + let end_offset = u64_at(&mut self.index, index * size_of::() as u64)?; - pub fn get_entry_height(&self) -> u64 { - entry_height - } - - pub fn entry_at(&self, index: u64) -> Result { - - } - pub fn append_entry(&self, entry: &Entry) -> io::Result { - Ok(0) - } - pub fn append_entries(&self, entries: &[Entry]) -> io::Result { - Ok(0) + let start_offset = if index != 0 { + u64_at(&mut self.index, (index - 1) * size_of::() as u64)? + } else { + 0u64 + }; + + fn entry_at(file: &mut File, at: u64, len: u64) -> io::Result { + file.seek(SeekFrom::Start(at))?; + + deserialize_from(file.take(len)) + .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string())) + } + + entry_at(&mut self.data, start_offset, end_offset - start_offset) } } -impl Iterator for Ledger { +pub struct LedgerWriter { + index: File, + data: File, +} + +impl LedgerWriter { + // opens or creates a LedgerWriter in directory + pub fn new(directory: String) -> io::Result { + let directory = Path::new(&directory); + + create_dir_all(directory)?; + + let index = OpenOptions::new() + .create(true) + .append(true) + .open(directory.join("index"))?; + + let data = OpenOptions::new() + .create(true) + .append(true) + .open(directory.join("data"))?; + + Ok(LedgerWriter { index, data }) + } + + fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { + serialize_into(&mut self.data, &entry) + .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; + self.data.flush()?; + + let offset = self.data.seek(SeekFrom::Current(0))?; + serialize_into(&mut self.index, &offset) + .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; + self.index.flush() + } + + pub fn write_entries(&mut self, entries: I) -> io::Result<()> + where + I: IntoIterator, + { + for entry in entries { + self.write_entry(&entry)?; + } + Ok(()) + } +} + +pub struct LedgerReader { + offset: u64, // next start_offset + index: File, + data: File, +} + +impl Iterator for LedgerReader { type Item = io::Result; fn next(&mut self) -> Option> { - let mut entry_len_bytes = [0u8; sizeof(::()]; // TODO: sizeof()? - - let mut entry_len = - if self.reader.index - .read_exact(&mut entry_len_bytes[..self.entry_len_len]) - .is_ok() + fn next_offset(file: &mut File) -> io::Result { + deserialize_from(file.take(size_of::() as u64)) + .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string())) + } + fn next_entry(file: &mut File, len: u64) -> io::Result { + deserialize_from(file.take(len)) + .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string())) + } + match next_offset(&mut self.index) { + Ok(end_offset) => { + let len = end_offset - self.offset; + self.offset = end_offset; + Some(next_entry(&mut self.data, len)) + } + Err(_) => None, + } } } +/// Return an iterator for all the entries in the given file. +pub fn read_ledger(directory: String) -> io::Result>> { + let directory = Path::new(&directory); + + let index = File::open(directory.join("index"))?; + let data = File::open(directory.join("data"))?; + + Ok(LedgerReader { + offset: 0, + index, + data, + }) +} // a Block is a slice of Entries pub trait Block { @@ -204,6 +294,7 @@ mod tests { use hash::hash; use packet::{BlobRecycler, BLOB_DATA_SIZE, PACKET_DATA_SIZE}; use signature::{KeyPair, KeyPairUtil}; + use std; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use transaction::{Transaction, Vote}; @@ -221,8 +312,7 @@ mod tests { assert!(!bad_ticks.verify(&zero)); // inductive step, bad } - #[test] - fn test_entries_to_blobs() { + fn make_test_entries() -> Vec { let zero = Hash::default(); let one = hash(&zero); let keypair = KeyPair::new(); @@ -248,7 +338,13 @@ mod tests { // V let mut transactions = vec![tx0; 362]; transactions.extend(vec![tx1; 100]); - let entries = next_entries(&zero, 0, transactions); + next_entries(&zero, 0, transactions); + } + + #[test] + fn test_entries_to_blobs() { + let entries = make_test_entries(); + let blob_recycler = BlobRecycler::default(); let mut blob_q = VecDeque::new(); entries.to_blobs(&blob_recycler, &mut blob_q); @@ -316,17 +412,46 @@ mod tests { assert!(entries0[0].has_more); assert!(!entries0[entries0.len() - 1].has_more); assert!(entries0.verify(&id)); - // test hand-construction... brittle, changes if split method changes... ? - // let mut entries1 = vec![]; - // entries1.push(Entry::new(&id, 1, transactions[..threshold].to_vec(), true)); - // id = entries1[0].id; - // entries1.push(Entry::new( - // &id, - // 1, - // transactions[threshold..].to_vec(), - // false, - // )); - // - // assert_eq!(entries0, entries1); } + + #[test] + fn test_ledger_reader_writer() { + let keypair = KeyPair::new(); + + let ledger_path = { + let id = { + let ids: Vec<_> = keypair + .pubkey() + .iter() + .map(|id| format!("{}", id)) + .collect(); + ids.join("") + }; + + format!("target/test_ledger_reader_writer_window-{}", id) + }; + + let entries = make_test_entries(); + + let mut writer = LedgerWriter::new(ledger_path.clone()).unwrap(); + writer.write_entries(entries.clone()).unwrap(); + + let mut read_entries = vec![]; + for x in read_ledger(ledger_path.clone()).unwrap() { + let entry = x.unwrap(); + trace!("entry... {:?}", entry); + read_entries.push(entry); + } + assert_eq!(read_entries, entries); + + let mut window = LedgerWindow::new(ledger_path.clone()).unwrap(); + + for (i, entry) in entries.iter().enumerate() { + let read_entry = window.get_entry(i as u64).unwrap(); + assert_eq!(*entry, read_entry); + } + + std::fs::remove_dir_all(ledger_path).unwrap(); + } + }