diff --git a/.gitignore b/.gitignore index 5e24c10d2..94a99c056 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ Cargo.lock /target/ + **/*.rs.bk .cargo @@ -11,5 +12,5 @@ Cargo.lock /config-client/ /multinode-demo/test/config-client/ -# Generated log files -/log-*.txt +# test temp files, ledgers, etc. +/farf/ diff --git a/multinode-demo/leader.sh b/multinode-demo/leader.sh index f58ac643d..b0b031e1f 100755 --- a/multinode-demo/leader.sh +++ b/multinode-demo/leader.sh @@ -28,7 +28,7 @@ tune_networking trap 'kill "$pid" && wait "$pid"' INT TERM $program \ --identity "$SOLANA_CONFIG_DIR"/leader.json \ - --ledger "$SOLANA_CONFIG_DIR"/ledger.log \ + --ledger "$SOLANA_CONFIG_DIR"/ledger \ > >($leader_logger) 2>&1 & pid=$! wait "$pid" diff --git a/multinode-demo/setup.sh b/multinode-demo/setup.sh index e0a2d6b05..094d413ae 100755 --- a/multinode-demo/setup.sh +++ b/multinode-demo/setup.sh @@ -91,8 +91,8 @@ if $node_type_leader; then echo "Creating $mint_path with $num_tokens tokens" $solana_keygen -o "$mint_path" - echo "Creating $SOLANA_CONFIG_DIR/ledger.log" - $solana_genesis --tokens="$num_tokens" < "$mint_path" > "$SOLANA_CONFIG_DIR"/ledger.log + echo "Creating $SOLANA_CONFIG_DIR/ledger" + $solana_genesis --tokens="$num_tokens" --ledger "$SOLANA_CONFIG_DIR"/ledger < "$mint_path" echo "Creating $SOLANA_CONFIG_DIR/leader.json" $solana_fullnode_config --keypair="$leader_id_path" "${leader_address_args[@]}" > "$SOLANA_CONFIG_DIR"/leader.json diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 7f0c51af6..35ac041bf 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -71,8 +71,8 @@ SOLANA_LEADER_CONFIG_DIR="$SOLANA_CONFIG_DIR"/leader-config rm -rf "$SOLANA_LEADER_CONFIG_DIR" set -ex $rsync -vPrz --max-size=100M "$rsync_leader_url"/config/ "$SOLANA_LEADER_CONFIG_DIR" -[[ -r "$SOLANA_LEADER_CONFIG_DIR"/ledger.log ]] || { - echo "Unable to retrieve ledger.log from $rsync_leader_url" +[[ -d "$SOLANA_LEADER_CONFIG_DIR"/ledger ]] || { + echo "Unable to retrieve ledger from $rsync_leader_url" exit 1 } @@ -80,7 +80,7 @@ trap 'kill "$pid" && wait "$pid"' INT TERM $program \ --identity "$SOLANA_CONFIG_DIR"/validator.json \ --testnet "$leader_address:$leader_port" \ - --ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger.log \ + --ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger \ > >($validator_logger) 2>&1 & pid=$! wait "$pid" diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 076b21161..f07e8c91c 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -8,7 +8,7 @@ use clap::{App, Arg}; use solana::client::mk_client; use solana::crdt::{NodeInfo, TestNode}; use solana::drone::DRONE_PORT; -use solana::fullnode::{Config, FullNode, LedgerFile}; +use solana::fullnode::{Config, FullNode}; use solana::logger; use solana::metrics::set_panic_hook; use solana::service::Service; @@ -41,11 +41,12 @@ fn main() -> () { ) .arg( Arg::with_name("ledger") - .short("L") + .short("l") .long("ledger") - .value_name("FILE") + .value_name("DIR") .takes_value(true) - .help("use FILE as persistent ledger (defaults to stdin/stdout)"), + .required(true) + .help("use DIR as persistent ledger location"), ) .get_matches(); @@ -72,11 +73,7 @@ fn main() -> () { let leader_pubkey = keypair.pubkey(); let repl_clone = repl_data.clone(); - let ledger = if let Some(l) = matches.value_of("ledger") { - LedgerFile::Path(l.to_string()) - } else { - LedgerFile::StdInOut - }; + let ledger_path = matches.value_of("ledger").unwrap(); let mut node = TestNode::new_with_bind_addr(repl_data, bind_addr); let mut drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DRONE_PORT); @@ -85,11 +82,11 @@ fn main() -> () { let testnet_addr: SocketAddr = testnet_address_string.parse().unwrap(); drone_addr.set_ip(testnet_addr.ip()); - FullNode::new(node, false, ledger, keypair, Some(testnet_addr)) + FullNode::new(node, false, ledger_path, keypair, Some(testnet_addr)) } else { node.data.leader_id = node.data.id; - FullNode::new(node, true, ledger, keypair, None) + FullNode::new(node, true, ledger_path, keypair, None) }; let mut client = mk_client(&repl_clone); diff --git a/src/bin/genesis.rs b/src/bin/genesis.rs index 8db6fe81a..b1a7e1133 100644 --- a/src/bin/genesis.rs +++ b/src/bin/genesis.rs @@ -8,10 +8,10 @@ extern crate solana; use atty::{is, Stream}; use clap::{App, Arg}; -use solana::entry_writer::EntryWriter; +use solana::ledger::LedgerWriter; use solana::mint::Mint; use std::error; -use std::io::{stdin, stdout, Read}; +use std::io::{stdin, Read}; use std::process::exit; fn main() -> Result<(), Box> { @@ -25,9 +25,19 @@ fn main() -> Result<(), Box> { .required(true) .help("Number of tokens with which to initialize mint"), ) + .arg( + Arg::with_name("ledger") + .short("l") + .long("ledger") + .value_name("DIR") + .takes_value(true) + .required(true) + .help("use DIR as persistent ledger location"), + ) .get_matches(); let tokens = value_t_or_exit!(matches, "tokens", i64); + let ledger_path = matches.value_of("ledger").unwrap(); if is(Stream::Stdin) { eprintln!("nothing found on stdin, expected a json file"); @@ -44,7 +54,8 @@ fn main() -> Result<(), Box> { let pkcs8: Vec = serde_json::from_str(&buffer)?; let mint = Mint::new_with_pkcs8(tokens, pkcs8); - let mut writer = stdout(); - EntryWriter::write_entries(&mut writer, mint.create_entries())?; + let mut ledger_writer = LedgerWriter::new(&ledger_path)?; + ledger_writer.write_entries(mint.create_entries())?; + Ok(()) } diff --git a/src/drone.rs b/src/drone.rs index c0ec1acf1..ce5a69cb6 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -164,7 +164,7 @@ mod tests { use mint::Mint; use service::Service; use signature::{KeyPair, KeyPairUtil}; - use std::io::sink; + use std::fs::remove_dir_all; use std::net::{SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -259,6 +259,21 @@ mod tests { assert_eq!(drone.request_cap, REQUEST_CAP); } + fn tmp_ledger_path(name: &str) -> String { + let keypair = KeyPair::new(); + + let id = { + let ids: Vec<_> = keypair + .pubkey() + .iter() + .map(|id| format!("{}", id)) + .collect(); + ids.join("") + }; + + format!("farf/{}-{}", name, id) + } + #[test] #[ignore] fn test_send_airdrop() { @@ -275,6 +290,7 @@ mod tests { let carlos_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_data = leader.data.clone(); + let ledger_path = tmp_ledger_path("send_airdrop"); let server = FullNode::new_leader( leader_keypair, @@ -284,7 +300,7 @@ mod tests { Some(Duration::from_millis(30)), leader, exit.clone(), - sink(), + &ledger_path, false, ); //TODO: this seems unstable @@ -336,5 +352,6 @@ mod tests { exit.store(true, Ordering::Relaxed); server.join().unwrap(); + remove_dir_all(ledger_path).unwrap(); } } diff --git a/src/fullnode.rs b/src/fullnode.rs index a87207f94..fe45f6814 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -3,17 +3,13 @@ use bank::Bank; use crdt::{Crdt, NodeInfo, TestNode}; use entry::Entry; -use entry_writer; -use ledger::Block; +use ledger::{read_ledger, Block}; use ncp::Ncp; use packet::BlobRecycler; use rpu::Rpu; use service::Service; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; -use std::fs::{File, OpenOptions}; -use std::io::{stdin, stdout, BufReader}; -use std::io::{Read, Write}; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; @@ -30,11 +26,6 @@ pub struct FullNode { thread_hdls: Vec>, } -pub enum LedgerFile { - StdInOut, - Path(String), -} - #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] /// Fullnode configuration to be stored in file pub struct Config { @@ -61,28 +52,17 @@ impl FullNode { fn new_internal( mut node: TestNode, leader: bool, - ledger: LedgerFile, + ledger_path: &str, keypair: KeyPair, network_entry_for_validator: Option, sigverify_disabled: bool, ) -> FullNode { info!("creating bank..."); let bank = Bank::default(); - let (infile, outfile): (Box, Box) = match ledger { - LedgerFile::Path(path) => ( - Box::new(File::open(path.clone()).expect("opening ledger file")), - Box::new( - OpenOptions::new() - .create(true) - .append(true) - .open(path) - .expect("opening ledger file"), - ), - ), - LedgerFile::StdInOut => (Box::new(stdin()), Box::new(stdout())), - }; - let reader = BufReader::new(infile); - let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry")); + + let entries = read_ledger(ledger_path).expect("opening ledger"); + + let entries = entries.map(|e| e.expect("failed to parse entry")); info!("processing ledger..."); let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger"); @@ -112,6 +92,7 @@ impl FullNode { node, &network_entry_point, exit.clone(), + ledger_path, sigverify_disabled, ); info!( @@ -131,7 +112,7 @@ impl FullNode { None, node, exit.clone(), - outfile, + ledger_path, sigverify_disabled, ); info!( @@ -145,7 +126,7 @@ impl FullNode { pub fn new( node: TestNode, leader: bool, - ledger: LedgerFile, + ledger: &str, keypair: KeyPair, network_entry_for_validator: Option, ) -> FullNode { @@ -162,7 +143,7 @@ impl FullNode { pub fn new_without_sigverify( node: TestNode, leader: bool, - ledger: LedgerFile, + ledger: &str, keypair: KeyPair, network_entry_for_validator: Option, ) -> FullNode { @@ -220,7 +201,7 @@ impl FullNode { /// | | `------------` /// `---------------------` /// ``` - pub fn new_leader( + pub fn new_leader( keypair: KeyPair, bank: Bank, entry_height: u64, @@ -228,7 +209,7 @@ impl FullNode { tick_duration: Option, node: TestNode, exit: Arc, - writer: W, + ledger_path: &str, sigverify_disabled: bool, ) -> Self { let bank = Arc::new(bank); @@ -245,6 +226,9 @@ impl FullNode { let window = FullNode::new_window(ledger_tail, entry_height, &node.data, &blob_recycler); let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); + + // let mut ledger_writer = LedgerWriter::new(ledger_path); + let (tpu, blob_receiver) = Tpu::new( keypair, &bank, @@ -253,7 +237,7 @@ impl FullNode { node.sockets.transaction, &blob_recycler, exit.clone(), - writer, + ledger_path, sigverify_disabled, ); thread_hdls.extend(tpu.thread_hdls()); @@ -316,6 +300,7 @@ impl FullNode { node: TestNode, entry_point: &NodeInfo, exit: Arc, + ledger_path: &str, _sigverify_disabled: bool, ) -> Self { let bank = Arc::new(bank); @@ -353,6 +338,7 @@ impl FullNode { node.sockets.replicate, node.sockets.repair, node.sockets.retransmit, + ledger_path, exit.clone(), ); thread_hdls.extend(tvu.thread_hdls()); @@ -391,8 +377,25 @@ mod tests { use mint::Mint; use service::Service; use signature::{KeyPair, KeyPairUtil}; + use std::fs::remove_dir_all; use std::sync::atomic::AtomicBool; use std::sync::Arc; + + fn tmp_ledger_path(name: &str) -> String { + let keypair = KeyPair::new(); + + let id = { + let ids: Vec<_> = keypair + .pubkey() + .iter() + .map(|id| format!("{}", id)) + .collect(); + ids.join("") + }; + + format!("farf/{}-{}", name, id) + } + #[test] fn validator_exit() { let kp = KeyPair::new(); @@ -401,13 +404,15 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false); + let lp = tmp_ledger_path("validator_exit"); + let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false); v.exit(); v.join().unwrap(); + remove_dir_all(lp).unwrap(); } #[test] fn validator_parallel_exit() { - let vals: Vec = (0..2) + let vals: Vec<(FullNode, String)> = (0..2) .map(|_| { let kp = KeyPair::new(); let tn = TestNode::new_localhost_with_pubkey(kp.pubkey()); @@ -415,13 +420,20 @@ mod tests { let bank = Bank::new(&alice); let exit = Arc::new(AtomicBool::new(false)); let entry = tn.data.clone(); - FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false) + let lp = tmp_ledger_path("validator_parallel_exit"); + ( + FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false), + lp, + ) }) .collect(); //each validator can exit in parallel to speed many sequential calls to `join` - vals.iter().for_each(|v| v.exit()); + vals.iter().for_each(|v| v.0.exit()); //while join is called sequentially, the above exit call notified all the //validators to exit from all their threads - vals.into_iter().for_each(|v| v.join().unwrap()); + vals.into_iter().for_each(|v| { + v.0.join().unwrap(); + remove_dir_all(v.1).unwrap() + }); } } diff --git a/src/ledger.rs b/src/ledger.rs index 9a66f5762..10ad25d12 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -39,17 +39,15 @@ fn entry_at(file: &mut File, at: u64) -> io::Result { deserialize_from(file.take(len)).map_err(err_bincode_to_io) } -fn next_offset(file: &mut File) -> io::Result { - deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io) -} - -// unused, but would work for the iterator if we only have the data file... -// -//fn next_entry(file: &mut File) -> io::Result { -// let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?; -// deserialize_from(file.take(len)).map_err(err_bincode_to_io) +//fn next_offset(file: &mut File) -> io::Result { +// deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io) //} +fn next_entry(file: &mut File) -> io::Result { + let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?; + deserialize_from(file.take(len)).map_err(err_bincode_to_io) +} + fn u64_at(file: &mut File, at: u64) -> io::Result { file.seek(SeekFrom::Start(at))?; deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io) @@ -112,7 +110,7 @@ impl LedgerWriter { Ok(LedgerWriter { index, data }) } - fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { + pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> { let offset = self.data.seek(SeekFrom::Current(0))?; let len = serialized_size(&entry).map_err(err_bincode_to_io)?; @@ -146,8 +144,8 @@ impl Iterator for LedgerReader { type Item = io::Result; fn next(&mut self) -> Option> { - match next_offset(&mut self.index) { - Ok(offset) => Some(entry_at(&mut self.data, offset)), + match next_entry(&mut self.data) { + Ok(entry) => Some(Ok(entry)), Err(_) => None, } } @@ -163,7 +161,7 @@ pub fn read_ledger(directory: &str) -> io::Result io::Result<()> { +pub fn copy_ledger(from: &str, to: &str) -> io::Result<()> { let mut to = LedgerWriter::new(to)?; for entry in read_ledger(from)? { @@ -321,6 +319,12 @@ mod tests { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use transaction::{Transaction, Vote}; + fn tmp_ledger_path(name: &str) -> String { + let keypair = KeyPair::new(); + + format!("farf/{}-{}", name, keypair.pubkey()) + } + #[test] fn test_verify_slice() { let zero = Hash::default(); @@ -437,18 +441,11 @@ mod tests { assert!(entries0.verify(&id)); } - fn tmp_ledger_path() -> String { - let keypair = KeyPair::new(); - - format!( - "target/test_ledger_reader_writer_window-{}", - keypair.pubkey() - ) - } + fn tmp_ledger_path() -> String {} #[test] fn test_ledger_reader_writer() { - let ledger_path = tmp_ledger_path(); + let ledger_path = tmp_ledger_path("test_ledger_reader_writer"); let entries = make_test_entries(); let mut writer = LedgerWriter::new(&ledger_path).unwrap(); @@ -478,16 +475,16 @@ mod tests { std::fs::remove_dir_all(ledger_path).unwrap(); } #[test] - fn test_ledger_copy() { - let from = tmp_ledger_path(); + fn test_copy_ledger() { + let from = tmp_ledger_path("test_ledger_copy_from"); let entries = make_test_entries(); let mut writer = LedgerWriter::new(&from).unwrap(); writer.write_entries(entries.clone()).unwrap(); - let to = tmp_ledger_path(); + let to = tmp_ledger_path("test_ledger_copy_to"); - copy(&from, &to).unwrap(); + copy_ledger(&from, &to).unwrap(); let mut read_entries = vec![]; for x in read_ledger(&to).unwrap() { diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index dbe1051c0..e25f5ee0a 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -3,7 +3,7 @@ use bank::Bank; use counter::Counter; use crdt::Crdt; -use ledger; +use ledger::{reconstruct_entries_from_blobs, LedgerWriter}; use packet::BlobRecycler; use result::{Error, Result}; use service::Service; @@ -31,6 +31,7 @@ impl ReplicateStage { crdt: &Arc>, blob_recycler: &BlobRecycler, window_receiver: &BlobReceiver, + ledger_writer: &mut LedgerWriter, ) -> Result<()> { let timer = Duration::new(1, 0); //coalesce all the available blobs into a single vote @@ -39,7 +40,7 @@ impl ReplicateStage { blobs.append(&mut more); } let blobs_len = blobs.len(); - let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?; + let entries = reconstruct_entries_from_blobs(blobs.clone())?; { let votes = entries_to_votes(&entries); let mut wcrdt = crdt.write().unwrap(); @@ -49,7 +50,11 @@ impl ReplicateStage { "replicate-transactions", entries.iter().map(|x| x.transactions.len()).sum() ); + + ledger_writer.write_entries(entries.clone())?; + let res = bank.process_entries(entries); + if res.is_err() { error!("process_entries {} {:?}", blobs_len, res); } @@ -65,6 +70,7 @@ impl ReplicateStage { crdt: Arc>, blob_recycler: BlobRecycler, window_receiver: BlobReceiver, + ledger_path: &str, exit: Arc, ) -> Self { let (vote_blob_sender, vote_blob_receiver) = channel(); @@ -84,13 +90,18 @@ impl ReplicateStage { vote_blob_sender, exit, ); + let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap(); let t_replicate = Builder::new() .name("solana-replicate-stage".to_string()) .spawn(move || loop { - if let Err(e) = - Self::replicate_requests(&bank, &crdt, &blob_recycler, &window_receiver) - { + if let Err(e) = Self::replicate_requests( + &bank, + &crdt, + &blob_recycler, + &window_receiver, + &mut ledger_writer, + ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), diff --git a/src/thin_client.rs b/src/thin_client.rs index d09980dea..df2bf4749 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -289,11 +289,26 @@ mod tests { use mint::Mint; use service::Service; use signature::{KeyPair, KeyPairUtil}; - use std::io::sink; + use std::fs::remove_dir_all; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use transaction::{Instruction, Plan}; + fn tmp_ledger_path(name: &str) -> String { + let keypair = KeyPair::new(); + + let id = { + let ids: Vec<_> = keypair + .pubkey() + .iter() + .map(|id| format!("{}", id)) + .collect(); + ids.join("") + }; + + format!("farf/{}-{}", name, id) + } + #[test] fn test_thin_client() { logger::setup(); @@ -305,6 +320,7 @@ mod tests { let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); + let ledger_path = tmp_ledger_path("thin_client"); let server = FullNode::new_leader( leader_keypair, @@ -314,7 +330,7 @@ mod tests { Some(Duration::from_millis(30)), leader, exit.clone(), - sink(), + &ledger_path, false, ); sleep(Duration::from_millis(900)); @@ -351,6 +367,7 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_data = leader.data.clone(); + let ledger_path = tmp_ledger_path("bad_sig"); let server = FullNode::new_leader( leader_keypair, @@ -360,7 +377,7 @@ mod tests { Some(Duration::from_millis(30)), leader, exit.clone(), - sink(), + &ledger_path, false, ); //TODO: remove this sleep, or add a retry so CI is stable @@ -397,6 +414,7 @@ mod tests { assert_eq!(balance.unwrap(), 500); exit.store(true, Ordering::Relaxed); server.join().unwrap(); + remove_dir_all(ledger_path).unwrap(); } #[test] @@ -409,6 +427,8 @@ mod tests { let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_data = leader.data.clone(); + let ledger_path = tmp_ledger_path("client_check_signature"); + let server = FullNode::new_leader( leader_keypair, bank, @@ -417,7 +437,7 @@ mod tests { Some(Duration::from_millis(30)), leader, exit.clone(), - sink(), + &ledger_path, false, ); sleep(Duration::from_millis(300)); @@ -443,5 +463,6 @@ mod tests { exit.store(true, Ordering::Relaxed); server.join().unwrap(); + remove_dir_all(ledger_path).unwrap(); } } diff --git a/src/tpu.rs b/src/tpu.rs index d02f161cb..90a4d2845 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -34,7 +34,6 @@ use record_stage::RecordStage; use service::Service; use signature::KeyPair; use sigverify_stage::SigVerifyStage; -use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -52,7 +51,7 @@ pub struct Tpu { } impl Tpu { - pub fn new( + pub fn new( keypair: KeyPair, bank: &Arc, crdt: &Arc>, @@ -60,7 +59,7 @@ impl Tpu { transactions_socket: UdpSocket, blob_recycler: &BlobRecycler, exit: Arc, - writer: W, + ledger_path: &str, sigverify_disabled: bool, ) -> (Self, BlobReceiver) { let packet_recycler = PacketRecycler::default(); @@ -86,7 +85,7 @@ impl Tpu { bank.clone(), crdt.clone(), blob_recycler.clone(), - writer, + ledger_path, entry_receiver, ); diff --git a/src/tvu.rs b/src/tvu.rs index eb421f432..a3e96d631 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -77,6 +77,7 @@ impl Tvu { replicate_socket: UdpSocket, repair_socket: UdpSocket, retransmit_socket: UdpSocket, + ledger_path: &str, exit: Arc, ) -> Self { let blob_recycler = BlobRecycler::default(); @@ -103,6 +104,7 @@ impl Tvu { crdt, blob_recycler, blob_window_receiver, + ledger_path, exit, ); @@ -151,6 +153,7 @@ pub mod tests { use service::Service; use signature::{KeyPair, KeyPairUtil}; use std::collections::VecDeque; + use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; @@ -170,6 +173,22 @@ pub mod tests { let ncp = Ncp::new(&crdt, window.clone(), listen, send_sock, exit)?; Ok((ncp, window)) } + + fn tmp_ledger_path(name: &str) -> String { + let keypair = KeyPair::new(); + + let id = { + let ids: Vec<_> = keypair + .pubkey() + .iter() + .map(|id| format!("{}", id)) + .collect(); + ids.join("") + }; + + format!("farf/{}-{}", name, id) + } + /// Test that message sent from leader to target1 and replicated to target2 #[test] fn test_replicate() { @@ -229,6 +248,7 @@ pub mod tests { let cref1 = Arc::new(RwLock::new(crdt1)); let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap(); + let ledger_path = tmp_ledger_path("replicate"); let tvu = Tvu::new( target1_keypair, &bank, @@ -238,6 +258,7 @@ pub mod tests { target1.sockets.replicate, target1.sockets.repair, target1.sockets.retransmit, + &ledger_path, exit.clone(), ); @@ -307,5 +328,6 @@ pub mod tests { dr_1.0.join().expect("join"); t_receiver.join().expect("join"); t_responder.join().expect("join"); + remove_dir_all(ledger_path).unwrap(); } } diff --git a/src/write_stage.rs b/src/write_stage.rs index 34de2328c..3436a2138 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -6,14 +6,12 @@ use bank::Bank; use counter::Counter; use crdt::Crdt; use entry::Entry; -use entry_writer::EntryWriter; -use ledger::Block; +use ledger::{Block, LedgerWriter}; use packet::BlobRecycler; use result::{Error, Result}; use service::Service; use signature::KeyPair; use std::collections::VecDeque; -use std::io::Write; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; @@ -31,21 +29,30 @@ pub struct WriteStage { impl WriteStage { /// Process any Entry items that have been published by the RecordStage. /// continuosly broadcast blobs of entries out - pub fn write_and_send_entries( + pub fn write_and_send_entries( crdt: &Arc>, - entry_writer: &mut EntryWriter, + bank: &Arc, + ledger_writer: &mut LedgerWriter, blob_sender: &BlobSender, blob_recycler: &BlobRecycler, entry_receiver: &Receiver>, ) -> Result<()> { let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; + let votes = entries_to_votes(&entries); crdt.write().unwrap().insert_votes(&votes); + for entry in entries.clone() { + ledger_writer.write_entry(&entry)?; + if !entry.has_more { + bank.register_entry_id(&entry.id); + } + } + //TODO(anatoly): real stake based voting needs to change this //leader simply votes if the current set of validators have voted //on a valid last id - entry_writer.write_and_register_entries(&entries)?; + trace!("New blobs? {}", entries.len()); let mut blobs = VecDeque::new(); entries.to_blobs(blob_recycler, &mut blobs); @@ -60,12 +67,12 @@ impl WriteStage { } /// Create a new WriteStage for writing and broadcasting entries. - pub fn new( + pub fn new( keypair: KeyPair, bank: Arc, crdt: Arc>, blob_recycler: BlobRecycler, - writer: W, + ledger_path: &str, entry_receiver: Receiver>, ) -> (Self, BlobReceiver) { let (vote_blob_sender, vote_blob_receiver) = channel(); @@ -77,16 +84,18 @@ impl WriteStage { vote_blob_receiver, ); let (blob_sender, blob_receiver) = channel(); + let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap(); + let thread_hdl = Builder::new() .name("solana-writer".to_string()) .spawn(move || { - let mut entry_writer = EntryWriter::new(&bank, writer); let mut last_vote = 0; let debug_id = crdt.read().unwrap().debug_id(); loop { if let Err(e) = Self::write_and_send_entries( &crdt, - &mut entry_writer, + &bank, + &mut ledger_writer, &blob_sender, &blob_recycler, &entry_receiver, diff --git a/tests/multinode.rs b/tests/multinode.rs index ea5d792ff..1559eb22a 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -4,10 +4,9 @@ extern crate bincode; extern crate serde_json; extern crate solana; -use solana::crdt::TestNode; -use solana::crdt::{Crdt, NodeInfo}; -use solana::entry_writer::EntryWriter; -use solana::fullnode::{FullNode, LedgerFile}; +use solana::crdt::{Crdt, NodeInfo, TestNode}; +use solana::fullnode::FullNode; +use solana::ledger::{copy_ledger, LedgerWriter}; use solana::logger; use solana::mint::Mint; use solana::ncp::Ncp; @@ -18,7 +17,7 @@ use solana::thin_client::ThinClient; use solana::timing::duration_as_s; use std::cmp::max; use std::env; -use std::fs::File; +use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; @@ -73,16 +72,27 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec { rv } -fn genesis(num: i64) -> (Mint, String) { - let mint = Mint::new(num); - let path = format!( - "target/test_multi_node_dynamic_network-{}.log", - mint.pubkey() - ); - let mut writer = File::create(path.clone()).unwrap(); +fn tmp_ledger_path(name: &str) -> String { + let keypair = KeyPair::new(); - EntryWriter::write_entries(&mut writer, mint.create_entries()).unwrap(); - (mint, path.to_string()) + format!("farf/{}-{}", name, keypair.pubkey()) +} + +fn genesis(name: &str, num: i64) -> (Mint, String) { + let mint = Mint::new(num); + + let path = tmp_ledger_path(name); + let mut writer = LedgerWriter::new(&path).unwrap(); + + writer.write_entries(mint.create_entries()).unwrap(); + + (mint, path) +} + +fn tmp_copy_ledger(from: &str, name: &str) -> String { + let to = tmp_ledger_path(name); + copy_ledger(from, &to).unwrap(); + to } #[test] @@ -95,15 +105,12 @@ fn test_multi_node_validator_catchup_from_zero() { let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); + let mut ledger_paths = Vec::new(); - let (alice, ledger_path) = genesis(10_000); - let server = FullNode::new( - leader, - true, - LedgerFile::Path(ledger_path.clone()), - leader_keypair, - None, - ); + let (alice, leader_ledger_path) = genesis("multi_node_validator_catchup_from_zero", 10_000); + ledger_paths.push(leader_ledger_path.clone()); + + let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None); // Send leader some tokens to vote let leader_balance = @@ -114,10 +121,16 @@ fn test_multi_node_validator_catchup_from_zero() { for _ in 0..N { let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let ledger_path = tmp_copy_ledger( + &leader_ledger_path, + "multi_node_validator_catchup_from_zero_validator", + ); + ledger_paths.push(ledger_path.clone()); + let mut val = FullNode::new( validator, false, - LedgerFile::Path(ledger_path.clone()), + &ledger_path, keypair, Some(leader_data.contact_info.ncp), ); @@ -148,10 +161,15 @@ fn test_multi_node_validator_catchup_from_zero() { // start up another validator, converge and then check everyone's balances let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let ledger_path = tmp_copy_ledger( + &leader_ledger_path, + "multi_node_validator_catchup_from_zero", + ); + ledger_paths.push(ledger_path.clone()); let val = FullNode::new( validator, false, - LedgerFile::Path(ledger_path.clone()), + &ledger_path, keypair, Some(leader_data.contact_info.ncp), ); @@ -191,6 +209,9 @@ fn test_multi_node_validator_catchup_from_zero() { for node in nodes { node.close().unwrap(); } + for path in ledger_paths { + remove_dir_all(path).unwrap(); + } } #[test] @@ -204,14 +225,11 @@ fn test_multi_node_basic() { let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); - let (alice, ledger_path) = genesis(10_000); - let server = FullNode::new( - leader, - true, - LedgerFile::Path(ledger_path.clone()), - leader_keypair, - None, - ); + let mut ledger_paths = Vec::new(); + + let (alice, leader_ledger_path) = genesis("multi_node_basic", 10_000); + ledger_paths.push(leader_ledger_path.clone()); + let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None); // Send leader some tokens to vote let leader_balance = @@ -222,10 +240,12 @@ fn test_multi_node_basic() { for _ in 0..N { let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); + let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic"); + ledger_paths.push(ledger_path.clone()); let val = FullNode::new( validator, false, - LedgerFile::Path(ledger_path.clone()), + &ledger_path, keypair, Some(leader_data.contact_info.ncp), ); @@ -254,7 +274,9 @@ fn test_multi_node_basic() { for node in nodes { node.close().unwrap(); } - std::fs::remove_file(ledger_path).unwrap(); + for path in ledger_paths { + remove_dir_all(path).unwrap(); + } } #[test] @@ -263,15 +285,12 @@ fn test_boot_validator_from_file() { let leader_keypair = KeyPair::new(); let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = KeyPair::new().pubkey(); - let (alice, ledger_path) = genesis(100_000); + let (alice, leader_ledger_path) = genesis("boot_validator_from_file", 100_000); + let mut ledger_paths = Vec::new(); + ledger_paths.push(leader_ledger_path.clone()); + let leader_data = leader.data.clone(); - let leader_fullnode = FullNode::new( - leader, - true, - LedgerFile::Path(ledger_path.clone()), - leader_keypair, - None, - ); + let leader_fullnode = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap(); assert_eq!(leader_balance, 500); @@ -282,10 +301,12 @@ fn test_boot_validator_from_file() { let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.data.clone(); + let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file"); + ledger_paths.push(ledger_path.clone()); let val_fullnode = FullNode::new( validator, false, - LedgerFile::Path(ledger_path.clone()), + &ledger_path, keypair, Some(leader_data.contact_info.ncp), ); @@ -295,20 +316,16 @@ fn test_boot_validator_from_file() { val_fullnode.close().unwrap(); leader_fullnode.close().unwrap(); - std::fs::remove_file(ledger_path).unwrap(); + for path in ledger_paths { + remove_dir_all(path).unwrap(); + } } fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) { let leader_keypair = KeyPair::new(); let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); - let leader_fullnode = FullNode::new( - leader, - true, - LedgerFile::Path(ledger_path.to_string()), - leader_keypair, - None, - ); + let leader_fullnode = FullNode::new(leader, true, &ledger_path, leader_keypair, None); (leader_data, leader_fullnode) } @@ -319,7 +336,7 @@ fn test_leader_restart_validator_start_from_old_ledger() { // ledger (currently up to WINDOW_SIZE entries) logger::setup(); - let (alice, ledger_path) = genesis(100_000); + let (alice, ledger_path) = genesis("leader_restart_validator_start_from_old_ledger", 100_000); let bob_pubkey = KeyPair::new().pubkey(); let (leader_data, leader_fullnode) = create_leader(&ledger_path); @@ -330,11 +347,10 @@ fn test_leader_restart_validator_start_from_old_ledger() { assert_eq!(leader_balance, 500); // create a "stale" ledger by copying current ledger - let mut stale_ledger_path = ledger_path.clone(); - stale_ledger_path.insert_str(ledger_path.rfind("/").unwrap() + 1, "stale_"); - - std::fs::copy(&ledger_path, &stale_ledger_path) - .expect(format!("copy {} to {}", &ledger_path, &stale_ledger_path,).as_str()); + let stale_ledger_path = tmp_copy_ledger( + &ledger_path, + "leader_restart_validator_start_from_old_ledger", + ); // restart the leader leader_fullnode.close().unwrap(); @@ -353,10 +369,11 @@ fn test_leader_restart_validator_start_from_old_ledger() { let keypair = KeyPair::new(); let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.data.clone(); + let val_fullnode = FullNode::new( validator, false, - LedgerFile::Path(stale_ledger_path.clone()), + &stale_ledger_path, keypair, Some(leader_data.contact_info.ncp), ); @@ -382,8 +399,8 @@ fn test_leader_restart_validator_start_from_old_ledger() { val_fullnode.close().unwrap(); leader_fullnode.close().unwrap(); - std::fs::remove_file(ledger_path).unwrap(); - std::fs::remove_file(stale_ledger_path).unwrap(); + remove_dir_all(ledger_path).unwrap(); + remove_dir_all(stale_ledger_path).unwrap(); } //TODO: this test will run a long time so it's disabled for CI @@ -409,16 +426,16 @@ fn test_multi_node_dynamic_network() { let leader_pubkey = leader_keypair.pubkey().clone(); let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = KeyPair::new().pubkey(); - let (alice, ledger_path) = genesis(10_000_000); + let (alice, leader_ledger_path) = genesis("multi_node_dynamic_network", 10_000_000); + + let mut ledger_paths = Vec::new(); + ledger_paths.push(leader_ledger_path.clone()); + let alice_arc = Arc::new(RwLock::new(alice)); let leader_data = leader.data.clone(); - let server = FullNode::new_without_sigverify( - leader, - true, - LedgerFile::Path(ledger_path.clone()), - leader_keypair, - None, - ); + + let server = + FullNode::new_without_sigverify(leader, true, &leader_ledger_path, leader_keypair, None); // Send leader some tokens to vote let leader_balance = send_tx_and_retry_get_balance( @@ -478,7 +495,8 @@ fn test_multi_node_dynamic_network() { .into_iter() .map(|keypair| { let leader_data = leader_data.clone(); - let ledger_path = ledger_path.clone(); + let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_dynamic_network"); + ledger_paths.push(ledger_path.clone()); Builder::new() .name("validator-launch-thread".to_string()) .spawn(move || { @@ -488,7 +506,7 @@ fn test_multi_node_dynamic_network() { let val = FullNode::new_without_sigverify( validator, false, - LedgerFile::Path(ledger_path.clone()), + &ledger_path, keypair, Some(leader_data.contact_info.ncp), ); @@ -597,7 +615,9 @@ fn test_multi_node_dynamic_network() { } server.join().unwrap(); - std::fs::remove_file(ledger_path).unwrap(); + for path in ledger_paths { + remove_dir_all(path).unwrap(); + } } fn mk_client(leader: &NodeInfo) -> ThinClient {