tweak random access ledger

* add recover_ledger() to deal with expected common ledger corruptions
* add verify_ledger() for future use cases (ledger-tool)
* increase ledger testing
* allow replicate stage to run without a ledger
* ledger-tool to output valid json
Rob Walker 2018-08-05 22:04:27 -07:00
parent ee6c15d2db
commit c3db2df7eb
10 changed files with 410 additions and 155 deletions

View File

@@ -54,7 +54,7 @@ fn main() -> Result<(), Box<error::Error>> {
let pkcs8: Vec<u8> = serde_json::from_str(&buffer)?;
let mint = Mint::new_with_pkcs8(tokens, pkcs8);
let mut ledger_writer = LedgerWriter::new(&ledger_path)?;
let mut ledger_writer = LedgerWriter::new(&ledger_path, true)?;
ledger_writer.write_entries(mint.create_entries())?;
Ok(())

View File

@@ -23,9 +23,11 @@ fn main() {
let entries = read_ledger(ledger_path).expect("opening ledger");
stdout().write_all(b"{\"ledger\":[\n").expect("open array");
for entry in entries {
let entry = entry.unwrap();
serde_json::to_writer(stdout(), &entry).expect("serialize");
stdout().write_all(b"\n").expect("newline");
stdout().write_all(b",\n").expect("newline");
}
stdout().write_all(b"\n]}\n").expect("close array");
}

View File

@@ -262,7 +262,7 @@ mod tests {
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
format!("farf/{}-{}", name, keypair.pubkey())
format!("/tmp/farf/{}-{}", name, keypair.pubkey())
}
#[test]

View File

@@ -92,7 +92,7 @@ impl FullNode {
node,
&network_entry_point,
exit.clone(),
ledger_path,
Some(ledger_path),
sigverify_disabled,
);
info!(
@@ -300,7 +300,7 @@ impl FullNode {
node: TestNode,
entry_point: &NodeInfo,
exit: Arc<AtomicBool>,
ledger_path: &str,
ledger_path: Option<&str>,
_sigverify_disabled: bool,
) -> Self {
let bank = Arc::new(bank);
@@ -377,16 +377,9 @@ mod tests {
use mint::Mint;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::fs::remove_dir_all;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
format!("farf/{}-{}", name, keypair.pubkey())
}
#[test]
fn validator_exit() {
let kp = KeyPair::new();
@@ -395,15 +388,13 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let lp = tmp_ledger_path("validator_exit");
let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false);
let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, None, false);
v.exit();
v.join().unwrap();
remove_dir_all(lp).unwrap();
}
#[test]
fn validator_parallel_exit() {
let vals: Vec<(FullNode, String)> = (0..2)
let vals: Vec<FullNode> = (0..2)
.map(|_| {
let kp = KeyPair::new();
let tn = TestNode::new_localhost_with_pubkey(kp.pubkey());
@@ -411,20 +402,15 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let lp = tmp_ledger_path("validator_parallel_exit");
(
FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false),
lp,
)
FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, None, false)
})
.collect();
//each validator can exit in parallel to speed many sequential calls to `join`
vals.iter().for_each(|v| v.0.exit());
vals.iter().for_each(|v| v.exit());
//while join is called sequentially, the above exit call notified all the
//validators to exit from all their threads
vals.into_iter().for_each(|v| {
v.0.join().unwrap();
remove_dir_all(v.1).unwrap()
v.join().unwrap();
});
}
}

View File

@@ -5,54 +5,56 @@
use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size};
use entry::Entry;
use hash::Hash;
use log::Level::Trace;
use packet::{self, SharedBlob, BLOB_DATA_SIZE};
use rayon::prelude::*;
use result::{Error, Result};
use std::collections::VecDeque;
use std::fs::{create_dir_all, File, OpenOptions};
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
use std::io::prelude::*;
use std::io::{self, Cursor, Seek, SeekFrom};
use std::mem::size_of;
use std::path::Path;
use transaction::Transaction;
///
/// A persistent ledger is 2 files:
/// ledger_path/ --+
/// +-- index <== an array of u64 offsets into data,
/// | each offset points to the first bytes
/// | of a usize that contains the length of
/// | the entry
/// +-- data <== concatenated usize length + entry data
///
/// When opening a ledger, we have the ability to "audit" it, which means
/// we need to pick which file to use as "truth", and correct the other
/// file as necessary, if possible.
///
/// The protocol for writing the ledger is to append to the data file first,
/// the index file 2nd. If interrupted while writing, there are 2
/// possibilities we need to cover:
///
/// 1. a partial write of data, which might be a partial write of length
/// or a partial write entry data.
/// 2. a partial or missing write to index for that entry
///
/// There is also the possibility of "unsynchronized" reading of the ledger
/// during transfer across nodes via rsync (or whatever). In this case,
/// if the transfer of the data file is done before the transfer of the
/// index file, it's possible (likely?) that the index file will be far
/// ahead of the data file in time.
///
/// The quickest and most reliable strategy for recovery is therefore to
/// treat the data file as nearest to the "truth".
///
/// The logic for "recovery/audit" is to open index, reading backwards
/// from the last u64-aligned entry to get to where index and data
/// agree (i.e. where a successful deserialization of an entry can
/// be performed). If index is ahead in time, truncate the index file
/// to match data. If index is behind in time, truncate data to the
/// last entry listed in index.
///
//
// A persistent ledger is 2 files:
// ledger_path/ --+
// +-- index <== an array of u64 offsets into data,
// | each offset points to the first bytes
// | of a u64 that contains the length of
// | the entry. To make the code smaller,
// | index[0] is set to 0, TODO: this field
// | could later be used for other stuff...
// +-- data <== concatenated instances of
// u64 length
// entry data
//
// When opening a ledger, we have the ability to "audit" it, which means we need
// to pick which file to use as "truth", and correct the other file as
// necessary, if possible.
//
// The protocol for writing the ledger is to append to the data file first, the
// index file 2nd. If the writing node is interrupted while appending to the
// ledger, there are some possibilities we need to cover:
//
// 1. a partial write of data, which might be a partial write of length
// or a partial write entry data
// 2. a partial or missing write to index for that entry
//
// There is also the possibility of "unsynchronized" reading of the ledger
// during transfer across nodes via rsync (or whatever). In this case, if the
// transfer of the data file is done before the transfer of the index file,
// it's likely that the index file will be far ahead of the data file in time.
//
// The quickest and most reliable strategy for recovery is therefore to treat
// the data file as nearest to the "truth".
//
// The logic for "recovery/audit" is to open index and read backwards from the
// last u64-aligned entry to get to where index and data agree (i.e. where a
// successful deserialization of an entry can be performed), then truncate
// both files to this synchronization point.
//
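To make the layout above concrete, here is a small standalone sketch (an editorial illustration, not code from this commit; the entry sizes are made up):

fn main() {
    const SIZEOF_U64: u64 = std::mem::size_of::<u64>() as u64; // 8 bytes

    // two entries whose bincode-serialized sizes are 100 and 50 bytes
    let entry_sizes: [u64; 2] = [100, 50];
    let mut offsets = Vec::new(); // the u64s that land in the index file
    let mut next = 0u64;
    for len in &entry_sizes {
        offsets.push(next);
        next += SIZEOF_U64 + len; // length prefix + entry bytes in data
    }
    assert_eq!(offsets, vec![0, 108]); // index[0] is always 0
    assert_eq!(next, 166); // expected total length of the data file
}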
// ledger window
#[derive(Debug)]
@@ -63,7 +65,6 @@ pub struct LedgerWindow {
// use a CONST because there's a cast, and we don't want "sizeof::<u64> as u64"...
const SIZEOF_U64: u64 = size_of::<u64>() as u64;
const SIZEOF_USIZE: u64 = size_of::<usize>() as u64;
#[cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]
fn err_bincode_to_io(e: Box<bincode::ErrorKind>) -> io::Error {
@@ -73,13 +74,14 @@
fn entry_at(file: &mut File, at: u64) -> io::Result<Entry> {
file.seek(SeekFrom::Start(at))?;
let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?;
let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?;
trace!("entry_at({}) len: {}", at, len);
deserialize_from(file.take(len)).map_err(err_bincode_to_io)
}
fn next_entry(file: &mut File) -> io::Result<Entry> {
let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?;
let len = deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)?;
deserialize_from(file.take(len)).map_err(err_bincode_to_io)
}
@@ -90,11 +92,13 @@ fn u64_at(file: &mut File, at: u64) -> io::Result<u64> {
impl LedgerWindow {
// opens a Ledger in directory, provides "infinite" window
pub fn new(directory: &str) -> io::Result<Self> {
let directory = Path::new(&directory);
pub fn new(ledger_path: &str) -> io::Result<Self> {
let ledger_path = Path::new(&ledger_path);
let index = File::open(directory.join("index"))?;
let data = File::open(directory.join("data"))?;
recover_ledger(ledger_path)?;
let index = File::open(ledger_path.join("index"))?;
let data = File::open(ledger_path.join("data"))?;
Ok(LedgerWindow { index, data })
}
@@ -105,6 +109,124 @@ impl LedgerWindow {
}
}
pub fn verify_ledger(ledger_path: &str, recover: bool) -> io::Result<()> {
let ledger_path = Path::new(&ledger_path);
if recover {
recover_ledger(ledger_path)?;
}
let mut index = File::open(ledger_path.join("index"))?;
let mut data = File::open(ledger_path.join("data"))?;
let index_len = index.metadata()?.len();
if index_len % SIZEOF_U64 != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
"expected back-to-back entries",
))?;
}
let mut last_data_offset = 0;
let mut index_offset = 0;
let mut data_read = 0;
let mut last_len = 0;
while index_offset < index_len {
let data_offset = u64_at(&mut index, index_offset)?;
if last_data_offset + last_len != data_offset {
Err(io::Error::new(
io::ErrorKind::Other,
"expected back-to-back entries",
))?;
}
let entry = entry_at(&mut data, data_offset)?;
last_len = serialized_size(&entry).map_err(err_bincode_to_io)? + SIZEOF_U64;
last_data_offset = data_offset;
data_read += last_len;
index_offset += SIZEOF_U64;
}
if data_read != data.metadata()?.len() {
Err(io::Error::new(
io::ErrorKind::Other,
"garbage on end of data file",
))?;
}
Ok(())
}
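A sketch of a possible caller (hypothetical ledger-tool wiring, not part of this diff), showing the recover flag deciding whether to repair before checking:

fn check(ledger_path: &str) {
    // pass recover = true to run recover_ledger() before the checks
    match verify_ledger(ledger_path, false) {
        Ok(()) => println!("ledger at {} is self-consistent", ledger_path),
        Err(e) => eprintln!("ledger at {} is damaged: {}", ledger_path, e),
    }
}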
fn recover_ledger(ledger_path: &Path) -> io::Result<()> {
let mut index = OpenOptions::new()
.write(true)
.read(true)
.open(ledger_path.join("index"))?;
let mut data = OpenOptions::new()
.write(true)
.read(true)
.open(ledger_path.join("data"))?;
// first, truncate to a multiple of SIZEOF_U64
let len = index.metadata()?.len();
if len % SIZEOF_U64 != 0 {
trace!("recover: trimming index len to {}", len - len % SIZEOF_U64);
index.set_len(len - (len % SIZEOF_U64))?;
}
// next, pull index offsets off one at a time until the last one points
// to a valid entry deserialization offset...
loop {
let len = index.metadata()?.len();
trace!("recover: index len:{}", len);
// should never happen
if len < SIZEOF_U64 {
trace!("recover: error index len {} too small", len);
Err(io::Error::new(io::ErrorKind::Other, "empty ledger index"))?;
}
let offset = u64_at(&mut index, len - SIZEOF_U64)?;
trace!("recover: offset[{}]: {}", (len / SIZEOF_U64) - 1, offset);
match entry_at(&mut data, offset) {
Ok(entry) => {
trace!("recover: entry[{}]: {:?}", (len / SIZEOF_U64) - 1, entry);
let entry_len = serialized_size(&entry).map_err(err_bincode_to_io)?;
trace!("recover: entry_len: {}", entry_len);
// now trim data file to size...
data.set_len(offset + SIZEOF_U64 + entry_len)?;
trace!(
"recover: trimmed data file to {}",
offset + SIZEOF_U64 + entry_len
);
break; // all good
}
Err(err) => {
trace!(
"recover: no entry recovered at {} {}",
offset,
err.to_string()
);
index.set_len(len - SIZEOF_U64)?;
}
}
}
// flush everything to disk...
index.sync_all()?;
data.sync_all()
}
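Worked through on the hypothetical layout sketched earlier: if a crash leaves index = [0, 108, 166] but only 170 bytes of data (entry 2's length prefix landed, most of its body did not), the loop fails to deserialize at offset 166 and shrinks the index by one u64, then deserializes successfully at offset 108 and trims the data file back to 108 + 8 + 50 = 166 bytes, leaving both files agreeing on exactly two entries.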
// TODO?? ... we could open the files on demand to support [], but today
// LedgerWindow needs "&mut self"
//
@@ -126,36 +248,67 @@ pub struct LedgerWriter {
}
impl LedgerWriter {
// opens or creates a LedgerWriter in directory
pub fn new(directory: &str) -> io::Result<Self> {
let directory = Path::new(&directory);
// opens or creates a LedgerWriter in ledger_path directory
pub fn new(ledger_path: &str, create: bool) -> io::Result<Self> {
let ledger_path = Path::new(&ledger_path);
create_dir_all(directory)?;
let index = OpenOptions::new()
.create(true)
if create {
let _ignored = remove_dir_all(ledger_path);
create_dir_all(ledger_path)?;
} else {
recover_ledger(ledger_path)?;
}
let mut index = OpenOptions::new()
.create(create)
.append(true)
.open(directory.join("index"))?;
.open(ledger_path.join("index"))?;
let data = OpenOptions::new()
.create(true)
if log_enabled!(Trace) {
let offset = index.seek(SeekFrom::Current(0))?;
trace!("LedgerWriter::new: index fp:{}", offset);
}
let mut data = OpenOptions::new()
.create(create)
.append(true)
.open(directory.join("data"))?;
.open(ledger_path.join("data"))?;
if log_enabled!(Trace) {
let offset = data.seek(SeekFrom::Current(0))?;
trace!("LedgerWriter::new: data fp:{}", offset);
}
Ok(LedgerWriter { index, data })
}
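Both modes of the create flag appear at call sites elsewhere in this commit; a sketch (paths and function names invented for illustration):

use std::io;

// genesis / ledger-tool style: clobber any stale ledger, start fresh
fn open_for_genesis(path: &str) -> io::Result<LedgerWriter> {
    LedgerWriter::new(path, true)
}

// write stage / replicate stage style: recover any torn tail, then append
fn open_for_append(path: &str) -> io::Result<LedgerWriter> {
    LedgerWriter::new(path, false)
}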
pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
let offset = self.data.seek(SeekFrom::Current(0))?;
let len = serialized_size(&entry).map_err(err_bincode_to_io)?;
serialize_into(&mut self.data, &len).map_err(err_bincode_to_io)?;
if log_enabled!(Trace) {
let offset = self.data.seek(SeekFrom::Current(0))?;
trace!("write_entry: after len data fp:{}", offset);
}
serialize_into(&mut self.data, &entry).map_err(err_bincode_to_io)?;
self.data.flush()?;
if log_enabled!(Trace) {
let offset = self.data.seek(SeekFrom::Current(0))?;
trace!("write_entry: after entry data fp:{}", offset);
}
self.data.sync_data()?;
let offset = self.data.seek(SeekFrom::Current(0))? - len - SIZEOF_U64;
trace!("write_entry: offset:{} len:{}", offset, len);
serialize_into(&mut self.index, &offset).map_err(err_bincode_to_io)?;
self.index.flush()
if log_enabled!(Trace) {
let offset = self.index.seek(SeekFrom::Current(0))?;
trace!("write_entry: end index fp:{}", offset);
}
self.index.sync_data()
}
pub fn write_entries<I>(&mut self, entries: I) -> io::Result<()>
@@ -171,7 +324,6 @@ impl LedgerWriter {
#[derive(Debug)]
pub struct LedgerReader {
// index: File,
data: File,
}
@@ -187,27 +339,16 @@ impl Iterator for LedgerReader {
}
/// Return an iterator for all the entries in the given file.
pub fn read_ledger(directory: &str) -> io::Result<impl Iterator<Item = io::Result<Entry>>> {
let directory = Path::new(&directory);
pub fn read_ledger(ledger_path: &str) -> io::Result<impl Iterator<Item = io::Result<Entry>>> {
let ledger_path = Path::new(&ledger_path);
// let index = OpenOptions::new().write(true).open(directory.join("index"));
let data = File::open(directory.join("data"))?;
recover_ledger(ledger_path)?;
// audit_ledger(index, data)?;
let data = File::open(ledger_path.join("data"))?;
Ok(LedgerReader { data })
}
pub fn copy_ledger(from: &str, to: &str) -> io::Result<()> {
let mut to = LedgerWriter::new(to)?;
for entry in read_ledger(from)? {
let entry = entry?;
to.write_entry(&entry)?;
}
Ok(())
}
// a Block is a slice of Entries
pub trait Block {
/// Verifies the hashes and counts of a slice of transactions are all consistent.
@@ -359,11 +500,13 @@ mod tests {
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
format!("farf/{}-{}", name, keypair.pubkey())
format!("/tmp/farf/{}-{}", name, keypair.pubkey())
}
#[test]
fn test_verify_slice() {
use logger;
logger::setup();
let zero = Hash::default();
let one = hash(&zero.as_ref());
assert!(vec![][..].verify(&zero)); // base case
@@ -376,6 +519,25 @@ mod tests {
assert!(!bad_ticks.verify(&zero)); // inductive step, bad
}
fn make_tiny_test_entries(num: usize) -> Vec<Entry> {
let zero = Hash::default();
let one = hash(&zero.as_ref());
let keypair = KeyPair::new();
let mut id = one;
let mut num_hashes = 0;
(0..num)
.map(|_| {
Entry::new_mut(
&mut id,
&mut num_hashes,
vec![Transaction::new_timestamp(&keypair, Utc::now(), one)],
false,
)
})
.collect()
}
fn make_test_entries() -> Vec<Entry> {
let zero = Hash::default();
let one = hash(&zero.as_ref());
@@ -407,6 +569,8 @@ mod tests {
#[test]
fn test_entries_to_blobs() {
use logger;
logger::setup();
let entries = make_test_entries();
let blob_recycler = BlobRecycler::default();
@@ -418,6 +582,8 @@ mod tests {
#[test]
fn test_bad_blobs_attack() {
use logger;
logger::setup();
let blob_recycler = BlobRecycler::default();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8000);
let blobs_q = packet::to_blobs(vec![(0, addr)], &blob_recycler).unwrap(); // <-- attack!
@@ -480,10 +646,12 @@ mod tests {
#[test]
fn test_ledger_reader_writer() {
use logger;
logger::setup();
let ledger_path = tmp_ledger_path("test_ledger_reader_writer");
let entries = make_test_entries();
let entries = make_tiny_test_entries(10);
let mut writer = LedgerWriter::new(&ledger_path).unwrap();
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
writer.write_entries(entries.clone()).unwrap();
let mut read_entries = vec![];
@@ -509,28 +677,91 @@ mod tests {
std::fs::remove_dir_all(ledger_path).unwrap();
}
#[test]
fn test_copy_ledger() {
let from = tmp_ledger_path("test_ledger_copy_from");
let entries = make_test_entries();
let mut writer = LedgerWriter::new(&from).unwrap();
fn truncated_last_entry(ledger_path: &str, entries: Vec<Entry>) {
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
writer.write_entries(entries).unwrap();
let len = writer.data.seek(SeekFrom::Current(0)).unwrap();
writer.data.set_len(len - 4).unwrap();
}
fn garbage_on_data(ledger_path: &str, entries: Vec<Entry>) {
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
writer.write_entries(entries).unwrap();
writer.data.write_all(b"hi there!").unwrap();
}
fn read_ledger_check(ledger_path: &str, entries: Vec<Entry>, len: usize) {
let read_entries = read_ledger(&ledger_path).unwrap();
let mut i = 0;
for entry in read_entries {
assert_eq!(entry.unwrap(), entries[i]);
i += 1;
}
assert_eq!(i, len);
}
fn ledger_window_check(ledger_path: &str, entries: Vec<Entry>, len: usize) {
let mut window = LedgerWindow::new(&ledger_path).unwrap();
for i in 0..len {
let entry = window.get_entry(i as u64);
assert_eq!(entry.unwrap(), entries[i]);
}
}
#[test]
fn test_recover_ledger() {
use logger;
logger::setup();
let entries = make_tiny_test_entries(10);
let ledger_path = tmp_ledger_path("test_recover_ledger");
// truncate data file, tests recover inside read_ledger_check()
truncated_last_entry(&ledger_path, entries.clone());
read_ledger_check(&ledger_path, entries.clone(), entries.len() - 1);
// truncate data file, tests recover inside LedgerWindow::new()
truncated_last_entry(&ledger_path, entries.clone());
ledger_window_check(&ledger_path, entries.clone(), entries.len() - 1);
// restore last entry, tests recover_ledger() inside LedgerWriter::new()
truncated_last_entry(&ledger_path, entries.clone());
let mut writer = LedgerWriter::new(&ledger_path, false).unwrap();
writer.write_entry(&entries[entries.len() - 1]).unwrap();
read_ledger_check(&ledger_path, entries.clone(), entries.len());
ledger_window_check(&ledger_path, entries.clone(), entries.len());
// make it look like data is newer in time, check reader...
garbage_on_data(&ledger_path, entries.clone());
read_ledger_check(&ledger_path, entries.clone(), entries.len());
// make it look like data is newer in time, check window...
garbage_on_data(&ledger_path, entries.clone());
ledger_window_check(&ledger_path, entries.clone(), entries.len());
// make it look like data is newer in time, check writer...
garbage_on_data(&ledger_path, entries[..entries.len() - 1].to_vec());
let mut writer = LedgerWriter::new(&ledger_path, false).unwrap();
writer.write_entry(&entries[entries.len() - 1]).unwrap();
read_ledger_check(&ledger_path, entries.clone(), entries.len());
ledger_window_check(&ledger_path, entries.clone(), entries.len());
let _ignored = remove_dir_all(&ledger_path);
}
#[test]
fn test_verify_ledger() {
use logger;
logger::setup();
let entries = make_tiny_test_entries(10);
let ledger_path = tmp_ledger_path("test_verify_ledger");
let mut writer = LedgerWriter::new(&ledger_path, true).unwrap();
writer.write_entries(entries.clone()).unwrap();
let to = tmp_ledger_path("test_ledger_copy_to");
copy_ledger(&from, &to).unwrap();
let mut read_entries = vec![];
for x in read_ledger(&to).unwrap() {
let entry = x.unwrap();
trace!("entry... {:?}", entry);
read_entries.push(entry);
}
assert_eq!(read_entries, entries);
std::fs::remove_dir_all(from).unwrap();
std::fs::remove_dir_all(to).unwrap();
assert!(verify_ledger(&ledger_path, false).is_ok());
let _ignored = remove_dir_all(&ledger_path);
}
}

View File

@@ -31,7 +31,7 @@ impl ReplicateStage {
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
window_receiver: &BlobReceiver,
ledger_writer: &mut LedgerWriter,
ledger_writer: Option<&mut LedgerWriter>,
) -> Result<()> {
let timer = Duration::new(1, 0);
//coalesce all the available blobs into a single vote
@@ -50,8 +50,9 @@
"replicate-transactions",
entries.iter().map(|x| x.transactions.len()).sum()
);
ledger_writer.write_entries(entries.clone())?;
if let Some(ledger_writer) = ledger_writer {
ledger_writer.write_entries(entries.clone())?;
}
let res = bank.process_entries(entries);
@@ -70,7 +71,7 @@
crdt: Arc<RwLock<Crdt>>,
blob_recycler: BlobRecycler,
window_receiver: BlobReceiver,
ledger_path: &str,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
) -> Self {
let (vote_blob_sender, vote_blob_receiver) = channel();
@@ -90,7 +91,8 @@
vote_blob_sender,
exit,
);
let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap();
let mut ledger_writer = ledger_path.map(|p| LedgerWriter::new(p, false).unwrap());
let t_replicate = Builder::new()
.name("solana-replicate-stage".to_string())
@@ -100,7 +102,7 @@
&crdt,
&blob_recycler,
&window_receiver,
&mut ledger_writer,
ledger_writer.as_mut(),
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
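The ledger_writer.as_mut() above re-borrows the Option on every pass of the loop instead of moving the writer into replicate_requests(); a minimal standalone sketch of that pattern (names invented):

fn record(log: Option<&mut Vec<u32>>, value: u32) {
    // as in replicate_requests(): skip the write when no ledger is attached
    if let Some(log) = log {
        log.push(value);
    }
}

fn main() {
    let mut maybe_log: Option<Vec<u32>> = Some(Vec::new());
    for i in 0..3 {
        record(maybe_log.as_mut(), i); // fresh Option<&mut _> each iteration
    }
    record(None, 99); // a replicate stage running without a ledger
    assert_eq!(maybe_log, Some(vec![0, 1, 2]));
}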

View File

@@ -285,6 +285,7 @@ mod tests {
use budget::Budget;
use crdt::TestNode;
use fullnode::FullNode;
use ledger::LedgerWriter;
use logger;
use mint::Mint;
use service::Service;
@@ -294,10 +295,15 @@
use std::sync::Arc;
use transaction::{Instruction, Plan};
fn tmp_ledger_path(name: &str) -> String {
fn tmp_ledger(name: &str, mint: &Mint) -> String {
let keypair = KeyPair::new();
format!("farf/{}-{}", name, keypair.pubkey())
let path = format!("/tmp/farf/{}-{}", name, keypair.pubkey());
let mut writer = LedgerWriter::new(&path, true).unwrap();
writer.write_entries(mint.create_entries()).unwrap();
path
}
#[test]
@@ -311,7 +317,7 @@
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let ledger_path = tmp_ledger_path("thin_client");
let ledger_path = tmp_ledger("thin_client", &alice);
let server = FullNode::new_leader(
leader_keypair,
@@ -358,7 +364,7 @@
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger_path("bad_sig");
let ledger_path = tmp_ledger("bad_sig", &alice);
let server = FullNode::new_leader(
leader_keypair,
@@ -418,7 +424,7 @@
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger_path("client_check_signature");
let ledger_path = tmp_ledger("client_check_signature", &alice);
let server = FullNode::new_leader(
leader_keypair,

View File

@@ -78,7 +78,7 @@ impl Tvu {
replicate_socket: UdpSocket,
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
ledger_path: &str,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
) -> Self {
let blob_recycler = BlobRecycler::default();
@@ -154,7 +154,6 @@ pub mod tests {
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
@@ -175,12 +174,6 @@ pub mod tests {
Ok((ncp, window))
}
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
format!("farf/{}-{}", name, keypair.pubkey())
}
/// Test that a message sent from the leader to target1 is replicated to target2
#[test]
fn test_replicate() {
@@ -240,7 +233,6 @@
let cref1 = Arc::new(RwLock::new(crdt1));
let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap();
let ledger_path = tmp_ledger_path("replicate");
let tvu = Tvu::new(
target1_keypair,
&bank,
@@ -250,7 +242,7 @@
target1.sockets.replicate,
target1.sockets.repair,
target1.sockets.retransmit,
&ledger_path,
None,
exit.clone(),
);
@@ -320,6 +312,5 @@ pub mod tests {
dr_1.0.join().expect("join");
t_receiver.join().expect("join");
t_responder.join().expect("join");
remove_dir_all(ledger_path).unwrap();
}
}

View File

@@ -84,7 +84,7 @@ impl WriteStage {
vote_blob_receiver,
);
let (blob_sender, blob_receiver) = channel();
let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap();
let mut ledger_writer = LedgerWriter::new(ledger_path, false).unwrap();
let thread_hdl = Builder::new()
.name("solana-writer".to_string())

View File

@@ -6,7 +6,7 @@ extern crate solana;
use solana::crdt::{Crdt, NodeInfo, TestNode};
use solana::fullnode::FullNode;
use solana::ledger::{copy_ledger, LedgerWriter};
use solana::ledger::{read_ledger, LedgerWriter};
use solana::logger;
use solana::mint::Mint;
use solana::ncp::Ncp;
@@ -18,6 +18,7 @@ use solana::timing::duration_as_s;
use std::cmp::max;
use std::env;
use std::fs::remove_dir_all;
use std::io;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
@@ -75,20 +76,54 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
format!("farf/{}-{}", name, keypair.pubkey())
format!("/tmp/farf/{}-{}", name, keypair.pubkey())
}
fn genesis(name: &str, num: i64) -> (Mint, String) {
let mint = Mint::new(num);
let path = tmp_ledger_path(name);
let mut writer = LedgerWriter::new(&path).unwrap();
let mut writer = LedgerWriter::new(&path, true).unwrap();
writer.write_entries(mint.create_entries()).unwrap();
(mint, path)
}
//#[test]
//fn test_copy_ledger() {
// let from = tmp_ledger_path("test_ledger_copy_from");
// let entries = make_tiny_test_entries(10);
//
// let mut writer = LedgerWriter::new(&from, true).unwrap();
// writer.write_entries(entries.clone()).unwrap();
//
// let to = tmp_ledger_path("test_ledger_copy_to");
//
// copy_ledger(&from, &to).unwrap();
//
// let mut read_entries = vec![];
// for x in read_ledger(&to).unwrap() {
// let entry = x.unwrap();
// trace!("entry... {:?}", entry);
// read_entries.push(entry);
// }
// assert_eq!(read_entries, entries);
//
// std::fs::remove_dir_all(from).unwrap();
// std::fs::remove_dir_all(to).unwrap();
//}
//
fn copy_ledger(from: &str, to: &str) -> io::Result<()> {
let mut to = LedgerWriter::new(to, true)?;
for entry in read_ledger(from)? {
let entry = entry?;
to.write_entry(&entry)?;
}
Ok(())
}
fn tmp_copy_ledger(from: &str, name: &str) -> String {
let to = tmp_ledger_path(name);
copy_ledger(from, &to).unwrap();
@@ -110,6 +145,12 @@ fn test_multi_node_validator_catchup_from_zero() {
let (alice, leader_ledger_path) = genesis("multi_node_validator_catchup_from_zero", 10_000);
ledger_paths.push(leader_ledger_path.clone());
let zero_ledger_path = tmp_copy_ledger(
&leader_ledger_path,
"multi_node_validator_catchup_from_zero",
);
ledger_paths.push(zero_ledger_path.clone());
let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
// Send leader some tokens to vote
@@ -158,18 +199,14 @@
assert_eq!(success, servers.len());
success = 0;
// start up another validator, converge and then check everyone's balances
// start up another validator from zero, converge and then check everyone's
// balances
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(
&leader_ledger_path,
"multi_node_validator_catchup_from_zero",
);
ledger_paths.push(ledger_path.clone());
let val = FullNode::new(
validator,
false,
&ledger_path,
&zero_ledger_path,
keypair,
Some(leader_data.contact_info.ncp),
);