Revert "plug in new ledger"

This reverts commit 46d9ba5ca0.
This commit is contained in:
Rob Walker 2018-08-03 10:13:00 -07:00
parent e2c68d8775
commit 57e928d1d0
15 changed files with 181 additions and 298 deletions

5
.gitignore vendored
View File

@ -1,6 +1,5 @@
Cargo.lock
/target/
**/*.rs.bk
.cargo
@ -12,5 +11,5 @@ Cargo.lock
/config-client/
/multinode-demo/test/config-client/
# test temp files, ledgers, etc.
/farf/
# Generated log files
/log-*.txt

View File

@ -28,7 +28,7 @@ tune_networking
trap 'kill "$pid" && wait "$pid"' INT TERM
$program \
--identity "$SOLANA_CONFIG_DIR"/leader.json \
--ledger "$SOLANA_CONFIG_DIR"/ledger \
--ledger "$SOLANA_CONFIG_DIR"/ledger.log \
> >($leader_logger) 2>&1 &
pid=$!
wait "$pid"

View File

@ -91,8 +91,8 @@ if $node_type_leader; then
echo "Creating $mint_path with $num_tokens tokens"
$solana_keygen -o "$mint_path"
echo "Creating $SOLANA_CONFIG_DIR/ledger"
$solana_genesis --tokens="$num_tokens" --ledger "$SOLANA_CONFIG_DIR"/ledger < "$mint_path"
echo "Creating $SOLANA_CONFIG_DIR/ledger.log"
$solana_genesis --tokens="$num_tokens" < "$mint_path" > "$SOLANA_CONFIG_DIR"/ledger.log
echo "Creating $SOLANA_CONFIG_DIR/leader.json"
$solana_fullnode_config --keypair="$leader_id_path" "${leader_address_args[@]}" > "$SOLANA_CONFIG_DIR"/leader.json

View File

@ -71,8 +71,8 @@ SOLANA_LEADER_CONFIG_DIR="$SOLANA_CONFIG_DIR"/leader-config
rm -rf "$SOLANA_LEADER_CONFIG_DIR"
set -ex
$rsync -vPrz --max-size=100M "$rsync_leader_url"/config/ "$SOLANA_LEADER_CONFIG_DIR"
[[ -d "$SOLANA_LEADER_CONFIG_DIR"/ledger ]] || {
echo "Unable to retrieve ledger from $rsync_leader_url"
[[ -r "$SOLANA_LEADER_CONFIG_DIR"/ledger.log ]] || {
echo "Unable to retrieve ledger.log from $rsync_leader_url"
exit 1
}
@ -80,7 +80,7 @@ trap 'kill "$pid" && wait "$pid"' INT TERM
$program \
--identity "$SOLANA_CONFIG_DIR"/validator.json \
--testnet "$leader_address:$leader_port" \
--ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger \
--ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger.log \
> >($validator_logger) 2>&1 &
pid=$!
wait "$pid"

View File

@ -8,7 +8,7 @@ use clap::{App, Arg};
use solana::client::mk_client;
use solana::crdt::{NodeInfo, TestNode};
use solana::drone::DRONE_PORT;
use solana::fullnode::{Config, FullNode};
use solana::fullnode::{Config, FullNode, LedgerFile};
use solana::logger;
use solana::metrics::set_panic_hook;
use solana::service::Service;
@ -41,12 +41,11 @@ fn main() -> () {
)
.arg(
Arg::with_name("ledger")
.short("l")
.short("L")
.long("ledger")
.value_name("DIR")
.value_name("FILE")
.takes_value(true)
.required(true)
.help("use DIR as persistent ledger location"),
.help("use FILE as persistent ledger (defaults to stdin/stdout)"),
)
.get_matches();
@ -73,7 +72,11 @@ fn main() -> () {
let leader_pubkey = keypair.pubkey();
let repl_clone = repl_data.clone();
let ledger_path = matches.value_of("ledger").unwrap();
let ledger = if let Some(l) = matches.value_of("ledger") {
LedgerFile::Path(l.to_string())
} else {
LedgerFile::StdInOut
};
let mut node = TestNode::new_with_bind_addr(repl_data, bind_addr);
let mut drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DRONE_PORT);
@ -82,11 +85,11 @@ fn main() -> () {
let testnet_addr: SocketAddr = testnet_address_string.parse().unwrap();
drone_addr.set_ip(testnet_addr.ip());
FullNode::new(node, false, ledger_path, keypair, Some(testnet_addr))
FullNode::new(node, false, ledger, keypair, Some(testnet_addr))
} else {
node.data.leader_id = node.data.id;
FullNode::new(node, true, ledger_path, keypair, None)
FullNode::new(node, true, ledger, keypair, None)
};
let mut client = mk_client(&repl_clone);

View File

@ -8,10 +8,10 @@ extern crate solana;
use atty::{is, Stream};
use clap::{App, Arg};
use solana::ledger::LedgerWriter;
use solana::entry_writer::EntryWriter;
use solana::mint::Mint;
use std::error;
use std::io::{stdin, Read};
use std::io::{stdin, stdout, Read};
use std::process::exit;
fn main() -> Result<(), Box<error::Error>> {
@ -25,19 +25,9 @@ fn main() -> Result<(), Box<error::Error>> {
.required(true)
.help("Number of tokens with which to initialize mint"),
)
.arg(
Arg::with_name("ledger")
.short("l")
.long("ledger")
.value_name("DIR")
.takes_value(true)
.required(true)
.help("use DIR as persistent ledger location"),
)
.get_matches();
let tokens = value_t_or_exit!(matches, "tokens", i64);
let ledger_path = matches.value_of("ledger").unwrap();
if is(Stream::Stdin) {
eprintln!("nothing found on stdin, expected a json file");
@ -54,8 +44,7 @@ fn main() -> Result<(), Box<error::Error>> {
let pkcs8: Vec<u8> = serde_json::from_str(&buffer)?;
let mint = Mint::new_with_pkcs8(tokens, pkcs8);
let mut ledger_writer = LedgerWriter::new(&ledger_path)?;
ledger_writer.write_entries(mint.create_entries())?;
let mut writer = stdout();
EntryWriter::write_entries(&mut writer, mint.create_entries())?;
Ok(())
}

View File

@ -164,7 +164,7 @@ mod tests {
use mint::Mint;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::fs::remove_dir_all;
use std::io::sink;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@ -259,21 +259,6 @@ mod tests {
assert_eq!(drone.request_cap, REQUEST_CAP);
}
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
let id = {
let ids: Vec<_> = keypair
.pubkey()
.iter()
.map(|id| format!("{}", id))
.collect();
ids.join("")
};
format!("farf/{}-{}", name, id)
}
#[test]
#[ignore]
fn test_send_airdrop() {
@ -290,7 +275,6 @@ mod tests {
let carlos_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger_path("send_airdrop");
let server = FullNode::new_leader(
leader_keypair,
@ -300,7 +284,7 @@ mod tests {
Some(Duration::from_millis(30)),
leader,
exit.clone(),
&ledger_path,
sink(),
false,
);
//TODO: this seems unstable
@ -352,6 +336,5 @@ mod tests {
exit.store(true, Ordering::Relaxed);
server.join().unwrap();
remove_dir_all(ledger_path).unwrap();
}
}

View File

@ -3,13 +3,17 @@
use bank::Bank;
use crdt::{Crdt, NodeInfo, TestNode};
use entry::Entry;
use ledger::{read_ledger, Block};
use entry_writer;
use ledger::Block;
use ncp::Ncp;
use packet::BlobRecycler;
use rpu::Rpu;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{stdin, stdout, BufReader};
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
@ -26,6 +30,11 @@ pub struct FullNode {
thread_hdls: Vec<JoinHandle<()>>,
}
pub enum LedgerFile {
StdInOut,
Path(String),
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
/// Fullnode configuration to be stored in file
pub struct Config {
@ -52,17 +61,28 @@ impl FullNode {
fn new_internal(
mut node: TestNode,
leader: bool,
ledger_path: &str,
ledger: LedgerFile,
keypair: KeyPair,
network_entry_for_validator: Option<SocketAddr>,
sigverify_disabled: bool,
) -> FullNode {
info!("creating bank...");
let bank = Bank::default();
let entries = read_ledger(ledger_path).expect("opening ledger");
let entries = entries.map(|e| e.expect("failed to parse entry"));
let (infile, outfile): (Box<Read>, Box<Write + Send>) = match ledger {
LedgerFile::Path(path) => (
Box::new(File::open(path.clone()).expect("opening ledger file")),
Box::new(
OpenOptions::new()
.create(true)
.append(true)
.open(path)
.expect("opening ledger file"),
),
),
LedgerFile::StdInOut => (Box::new(stdin()), Box::new(stdout())),
};
let reader = BufReader::new(infile);
let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry"));
info!("processing ledger...");
let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger");
@ -92,7 +112,6 @@ impl FullNode {
node,
&network_entry_point,
exit.clone(),
ledger_path,
sigverify_disabled,
);
info!(
@ -112,7 +131,7 @@ impl FullNode {
None,
node,
exit.clone(),
ledger_path,
outfile,
sigverify_disabled,
);
info!(
@ -126,7 +145,7 @@ impl FullNode {
pub fn new(
node: TestNode,
leader: bool,
ledger: &str,
ledger: LedgerFile,
keypair: KeyPair,
network_entry_for_validator: Option<SocketAddr>,
) -> FullNode {
@ -143,7 +162,7 @@ impl FullNode {
pub fn new_without_sigverify(
node: TestNode,
leader: bool,
ledger: &str,
ledger: LedgerFile,
keypair: KeyPair,
network_entry_for_validator: Option<SocketAddr>,
) -> FullNode {
@ -201,7 +220,7 @@ impl FullNode {
/// | | `------------`
/// `---------------------`
/// ```
pub fn new_leader(
pub fn new_leader<W: Write + Send + 'static>(
keypair: KeyPair,
bank: Bank,
entry_height: u64,
@ -209,7 +228,7 @@ impl FullNode {
tick_duration: Option<Duration>,
node: TestNode,
exit: Arc<AtomicBool>,
ledger_path: &str,
writer: W,
sigverify_disabled: bool,
) -> Self {
let bank = Arc::new(bank);
@ -226,9 +245,6 @@ impl FullNode {
let window = FullNode::new_window(ledger_tail, entry_height, &node.data, &blob_recycler);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
// let mut ledger_writer = LedgerWriter::new(ledger_path);
let (tpu, blob_receiver) = Tpu::new(
keypair,
&bank,
@ -237,7 +253,7 @@ impl FullNode {
node.sockets.transaction,
&blob_recycler,
exit.clone(),
ledger_path,
writer,
sigverify_disabled,
);
thread_hdls.extend(tpu.thread_hdls());
@ -300,7 +316,6 @@ impl FullNode {
node: TestNode,
entry_point: &NodeInfo,
exit: Arc<AtomicBool>,
ledger_path: &str,
_sigverify_disabled: bool,
) -> Self {
let bank = Arc::new(bank);
@ -338,7 +353,6 @@ impl FullNode {
node.sockets.replicate,
node.sockets.repair,
node.sockets.retransmit,
ledger_path,
exit.clone(),
);
thread_hdls.extend(tvu.thread_hdls());
@ -377,25 +391,8 @@ mod tests {
use mint::Mint;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::fs::remove_dir_all;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
let id = {
let ids: Vec<_> = keypair
.pubkey()
.iter()
.map(|id| format!("{}", id))
.collect();
ids.join("")
};
format!("farf/{}-{}", name, id)
}
#[test]
fn validator_exit() {
let kp = KeyPair::new();
@ -404,15 +401,13 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let lp = tmp_ledger_path("validator_exit");
let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false);
let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false);
v.exit();
v.join().unwrap();
remove_dir_all(lp).unwrap();
}
#[test]
fn validator_parallel_exit() {
let vals: Vec<(FullNode, String)> = (0..2)
let vals: Vec<FullNode> = (0..2)
.map(|_| {
let kp = KeyPair::new();
let tn = TestNode::new_localhost_with_pubkey(kp.pubkey());
@ -420,20 +415,13 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let lp = tmp_ledger_path("validator_parallel_exit");
(
FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false),
lp,
)
FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false)
})
.collect();
//each validator can exit in parallel to speed many sequential calls to `join`
vals.iter().for_each(|v| v.0.exit());
vals.iter().for_each(|v| v.exit());
//while join is called sequentially, the above exit call notified all the
//validators to exit from all their threads
vals.into_iter().for_each(|v| {
v.0.join().unwrap();
remove_dir_all(v.1).unwrap()
});
vals.into_iter().for_each(|v| v.join().unwrap());
}
}

View File

@ -39,15 +39,17 @@ fn entry_at(file: &mut File, at: u64) -> io::Result<Entry> {
deserialize_from(file.take(len)).map_err(err_bincode_to_io)
}
//fn next_offset(file: &mut File) -> io::Result<u64> {
// deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)
//}
fn next_entry(file: &mut File) -> io::Result<Entry> {
let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?;
deserialize_from(file.take(len)).map_err(err_bincode_to_io)
fn next_offset(file: &mut File) -> io::Result<u64> {
deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)
}
// unused, but would work for the iterator if we only have the data file...
//
//fn next_entry(file: &mut File) -> io::Result<Entry> {
// let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?;
// deserialize_from(file.take(len)).map_err(err_bincode_to_io)
//}
fn u64_at(file: &mut File, at: u64) -> io::Result<u64> {
file.seek(SeekFrom::Start(at))?;
deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)
@ -110,7 +112,7 @@ impl LedgerWriter {
Ok(LedgerWriter { index, data })
}
pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
let offset = self.data.seek(SeekFrom::Current(0))?;
let len = serialized_size(&entry).map_err(err_bincode_to_io)?;
@ -144,8 +146,8 @@ impl Iterator for LedgerReader {
type Item = io::Result<Entry>;
fn next(&mut self) -> Option<io::Result<Entry>> {
match next_entry(&mut self.data) {
Ok(entry) => Some(Ok(entry)),
match next_offset(&mut self.index) {
Ok(offset) => Some(entry_at(&mut self.data, offset)),
Err(_) => None,
}
}
@ -161,7 +163,7 @@ pub fn read_ledger(directory: &str) -> io::Result<impl Iterator<Item = io::Resul
Ok(LedgerReader { index, data })
}
pub fn copy_ledger(from: &str, to: &str) -> io::Result<()> {
pub fn copy(from: &str, to: &str) -> io::Result<()> {
let mut to = LedgerWriter::new(to)?;
for entry in read_ledger(from)? {
@ -319,12 +321,6 @@ mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use transaction::{Transaction, Vote};
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
format!("farf/{}-{}", name, keypair.pubkey())
}
#[test]
fn test_verify_slice() {
let zero = Hash::default();
@ -441,11 +437,18 @@ mod tests {
assert!(entries0.verify(&id));
}
fn tmp_ledger_path() -> String {}
fn tmp_ledger_path() -> String {
let keypair = KeyPair::new();
format!(
"target/test_ledger_reader_writer_window-{}",
keypair.pubkey()
)
}
#[test]
fn test_ledger_reader_writer() {
let ledger_path = tmp_ledger_path("test_ledger_reader_writer");
let ledger_path = tmp_ledger_path();
let entries = make_test_entries();
let mut writer = LedgerWriter::new(&ledger_path).unwrap();
@ -475,16 +478,16 @@ mod tests {
std::fs::remove_dir_all(ledger_path).unwrap();
}
#[test]
fn test_copy_ledger() {
let from = tmp_ledger_path("test_ledger_copy_from");
fn test_ledger_copy() {
let from = tmp_ledger_path();
let entries = make_test_entries();
let mut writer = LedgerWriter::new(&from).unwrap();
writer.write_entries(entries.clone()).unwrap();
let to = tmp_ledger_path("test_ledger_copy_to");
let to = tmp_ledger_path();
copy_ledger(&from, &to).unwrap();
copy(&from, &to).unwrap();
let mut read_entries = vec![];
for x in read_ledger(&to).unwrap() {

View File

@ -3,7 +3,7 @@
use bank::Bank;
use counter::Counter;
use crdt::Crdt;
use ledger::{reconstruct_entries_from_blobs, LedgerWriter};
use ledger;
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
@ -31,7 +31,6 @@ impl ReplicateStage {
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
window_receiver: &BlobReceiver,
ledger_writer: &mut LedgerWriter,
) -> Result<()> {
let timer = Duration::new(1, 0);
//coalesce all the available blobs into a single vote
@ -40,7 +39,7 @@ impl ReplicateStage {
blobs.append(&mut more);
}
let blobs_len = blobs.len();
let entries = reconstruct_entries_from_blobs(blobs.clone())?;
let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?;
{
let votes = entries_to_votes(&entries);
let mut wcrdt = crdt.write().unwrap();
@ -50,11 +49,7 @@ impl ReplicateStage {
"replicate-transactions",
entries.iter().map(|x| x.transactions.len()).sum()
);
ledger_writer.write_entries(entries.clone())?;
let res = bank.process_entries(entries);
if res.is_err() {
error!("process_entries {} {:?}", blobs_len, res);
}
@ -70,7 +65,6 @@ impl ReplicateStage {
crdt: Arc<RwLock<Crdt>>,
blob_recycler: BlobRecycler,
window_receiver: BlobReceiver,
ledger_path: &str,
exit: Arc<AtomicBool>,
) -> Self {
let (vote_blob_sender, vote_blob_receiver) = channel();
@ -90,18 +84,13 @@ impl ReplicateStage {
vote_blob_sender,
exit,
);
let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap();
let t_replicate = Builder::new()
.name("solana-replicate-stage".to_string())
.spawn(move || loop {
if let Err(e) = Self::replicate_requests(
&bank,
&crdt,
&blob_recycler,
&window_receiver,
&mut ledger_writer,
) {
if let Err(e) =
Self::replicate_requests(&bank, &crdt, &blob_recycler, &window_receiver)
{
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),

View File

@ -289,26 +289,11 @@ mod tests {
use mint::Mint;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::fs::remove_dir_all;
use std::io::sink;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use transaction::{Instruction, Plan};
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
let id = {
let ids: Vec<_> = keypair
.pubkey()
.iter()
.map(|id| format!("{}", id))
.collect();
ids.join("")
};
format!("farf/{}-{}", name, id)
}
#[test]
fn test_thin_client() {
logger::setup();
@ -320,7 +305,6 @@ mod tests {
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let ledger_path = tmp_ledger_path("thin_client");
let server = FullNode::new_leader(
leader_keypair,
@ -330,7 +314,7 @@ mod tests {
Some(Duration::from_millis(30)),
leader,
exit.clone(),
&ledger_path,
sink(),
false,
);
sleep(Duration::from_millis(900));
@ -367,7 +351,6 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger_path("bad_sig");
let server = FullNode::new_leader(
leader_keypair,
@ -377,7 +360,7 @@ mod tests {
Some(Duration::from_millis(30)),
leader,
exit.clone(),
&ledger_path,
sink(),
false,
);
//TODO: remove this sleep, or add a retry so CI is stable
@ -414,7 +397,6 @@ mod tests {
assert_eq!(balance.unwrap(), 500);
exit.store(true, Ordering::Relaxed);
server.join().unwrap();
remove_dir_all(ledger_path).unwrap();
}
#[test]
@ -427,8 +409,6 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger_path("client_check_signature");
let server = FullNode::new_leader(
leader_keypair,
bank,
@ -437,7 +417,7 @@ mod tests {
Some(Duration::from_millis(30)),
leader,
exit.clone(),
&ledger_path,
sink(),
false,
);
sleep(Duration::from_millis(300));
@ -463,6 +443,5 @@ mod tests {
exit.store(true, Ordering::Relaxed);
server.join().unwrap();
remove_dir_all(ledger_path).unwrap();
}
}

View File

@ -34,6 +34,7 @@ use record_stage::RecordStage;
use service::Service;
use signature::KeyPair;
use sigverify_stage::SigVerifyStage;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
@ -51,7 +52,7 @@ pub struct Tpu {
}
impl Tpu {
pub fn new(
pub fn new<W: Write + Send + 'static>(
keypair: KeyPair,
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
@ -59,7 +60,7 @@ impl Tpu {
transactions_socket: UdpSocket,
blob_recycler: &BlobRecycler,
exit: Arc<AtomicBool>,
ledger_path: &str,
writer: W,
sigverify_disabled: bool,
) -> (Self, BlobReceiver) {
let packet_recycler = PacketRecycler::default();
@ -85,7 +86,7 @@ impl Tpu {
bank.clone(),
crdt.clone(),
blob_recycler.clone(),
ledger_path,
writer,
entry_receiver,
);

View File

@ -77,7 +77,6 @@ impl Tvu {
replicate_socket: UdpSocket,
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
ledger_path: &str,
exit: Arc<AtomicBool>,
) -> Self {
let blob_recycler = BlobRecycler::default();
@ -104,7 +103,6 @@ impl Tvu {
crdt,
blob_recycler,
blob_window_receiver,
ledger_path,
exit,
);
@ -153,7 +151,6 @@ pub mod tests {
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
@ -173,22 +170,6 @@ pub mod tests {
let ncp = Ncp::new(&crdt, window.clone(), listen, send_sock, exit)?;
Ok((ncp, window))
}
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
let id = {
let ids: Vec<_> = keypair
.pubkey()
.iter()
.map(|id| format!("{}", id))
.collect();
ids.join("")
};
format!("farf/{}-{}", name, id)
}
/// Test that message sent from leader to target1 and replicated to target2
#[test]
fn test_replicate() {
@ -248,7 +229,6 @@ pub mod tests {
let cref1 = Arc::new(RwLock::new(crdt1));
let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap();
let ledger_path = tmp_ledger_path("replicate");
let tvu = Tvu::new(
target1_keypair,
&bank,
@ -258,7 +238,6 @@ pub mod tests {
target1.sockets.replicate,
target1.sockets.repair,
target1.sockets.retransmit,
&ledger_path,
exit.clone(),
);
@ -328,6 +307,5 @@ pub mod tests {
dr_1.0.join().expect("join");
t_receiver.join().expect("join");
t_responder.join().expect("join");
remove_dir_all(ledger_path).unwrap();
}
}

View File

@ -6,12 +6,14 @@ use bank::Bank;
use counter::Counter;
use crdt::Crdt;
use entry::Entry;
use ledger::{Block, LedgerWriter};
use entry_writer::EntryWriter;
use ledger::Block;
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use signature::KeyPair;
use std::collections::VecDeque;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
@ -29,30 +31,21 @@ pub struct WriteStage {
impl WriteStage {
/// Process any Entry items that have been published by the RecordStage.
/// continuosly broadcast blobs of entries out
pub fn write_and_send_entries(
pub fn write_and_send_entries<W: Write>(
crdt: &Arc<RwLock<Crdt>>,
bank: &Arc<Bank>,
ledger_writer: &mut LedgerWriter,
entry_writer: &mut EntryWriter<W>,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> {
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let votes = entries_to_votes(&entries);
crdt.write().unwrap().insert_votes(&votes);
for entry in entries.clone() {
ledger_writer.write_entry(&entry)?;
if !entry.has_more {
bank.register_entry_id(&entry.id);
}
}
//TODO(anatoly): real stake based voting needs to change this
//leader simply votes if the current set of validators have voted
//on a valid last id
entry_writer.write_and_register_entries(&entries)?;
trace!("New blobs? {}", entries.len());
let mut blobs = VecDeque::new();
entries.to_blobs(blob_recycler, &mut blobs);
@ -67,12 +60,12 @@ impl WriteStage {
}
/// Create a new WriteStage for writing and broadcasting entries.
pub fn new(
pub fn new<W: Write + Send + 'static>(
keypair: KeyPair,
bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>,
blob_recycler: BlobRecycler,
ledger_path: &str,
writer: W,
entry_receiver: Receiver<Vec<Entry>>,
) -> (Self, BlobReceiver) {
let (vote_blob_sender, vote_blob_receiver) = channel();
@ -84,18 +77,16 @@ impl WriteStage {
vote_blob_receiver,
);
let (blob_sender, blob_receiver) = channel();
let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap();
let thread_hdl = Builder::new()
.name("solana-writer".to_string())
.spawn(move || {
let mut entry_writer = EntryWriter::new(&bank, writer);
let mut last_vote = 0;
let debug_id = crdt.read().unwrap().debug_id();
loop {
if let Err(e) = Self::write_and_send_entries(
&crdt,
&bank,
&mut ledger_writer,
&mut entry_writer,
&blob_sender,
&blob_recycler,
&entry_receiver,

View File

@ -4,9 +4,10 @@ extern crate bincode;
extern crate serde_json;
extern crate solana;
use solana::crdt::{Crdt, NodeInfo, TestNode};
use solana::fullnode::FullNode;
use solana::ledger::{copy_ledger, LedgerWriter};
use solana::crdt::TestNode;
use solana::crdt::{Crdt, NodeInfo};
use solana::entry_writer::EntryWriter;
use solana::fullnode::{FullNode, LedgerFile};
use solana::logger;
use solana::mint::Mint;
use solana::ncp::Ncp;
@ -17,7 +18,7 @@ use solana::thin_client::ThinClient;
use solana::timing::duration_as_s;
use std::cmp::max;
use std::env;
use std::fs::remove_dir_all;
use std::fs::File;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
@ -72,27 +73,16 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
rv
}
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
format!("farf/{}-{}", name, keypair.pubkey())
}
fn genesis(name: &str, num: i64) -> (Mint, String) {
fn genesis(num: i64) -> (Mint, String) {
let mint = Mint::new(num);
let path = format!(
"target/test_multi_node_dynamic_network-{}.log",
mint.pubkey()
);
let mut writer = File::create(path.clone()).unwrap();
let path = tmp_ledger_path(name);
let mut writer = LedgerWriter::new(&path).unwrap();
writer.write_entries(mint.create_entries()).unwrap();
(mint, path)
}
fn tmp_copy_ledger(from: &str, name: &str) -> String {
let to = tmp_ledger_path(name);
copy_ledger(from, &to).unwrap();
to
EntryWriter::write_entries(&mut writer, mint.create_entries()).unwrap();
(mint, path.to_string())
}
#[test]
@ -105,12 +95,15 @@ fn test_multi_node_validator_catchup_from_zero() {
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let bob_pubkey = KeyPair::new().pubkey();
let mut ledger_paths = Vec::new();
let (alice, leader_ledger_path) = genesis("multi_node_validator_catchup_from_zero", 10_000);
ledger_paths.push(leader_ledger_path.clone());
let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
let (alice, ledger_path) = genesis(10_000);
let server = FullNode::new(
leader,
true,
LedgerFile::Path(ledger_path.clone()),
leader_keypair,
None,
);
// Send leader some tokens to vote
let leader_balance =
@ -121,16 +114,10 @@ fn test_multi_node_validator_catchup_from_zero() {
for _ in 0..N {
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(
&leader_ledger_path,
"multi_node_validator_catchup_from_zero_validator",
);
ledger_paths.push(ledger_path.clone());
let mut val = FullNode::new(
validator,
false,
&ledger_path,
LedgerFile::Path(ledger_path.clone()),
keypair,
Some(leader_data.contact_info.ncp),
);
@ -161,15 +148,10 @@ fn test_multi_node_validator_catchup_from_zero() {
// start up another validator, converge and then check everyone's balances
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(
&leader_ledger_path,
"multi_node_validator_catchup_from_zero",
);
ledger_paths.push(ledger_path.clone());
let val = FullNode::new(
validator,
false,
&ledger_path,
LedgerFile::Path(ledger_path.clone()),
keypair,
Some(leader_data.contact_info.ncp),
);
@ -209,9 +191,6 @@ fn test_multi_node_validator_catchup_from_zero() {
for node in nodes {
node.close().unwrap();
}
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
}
#[test]
@ -225,11 +204,14 @@ fn test_multi_node_basic() {
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let bob_pubkey = KeyPair::new().pubkey();
let mut ledger_paths = Vec::new();
let (alice, leader_ledger_path) = genesis("multi_node_basic", 10_000);
ledger_paths.push(leader_ledger_path.clone());
let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
let (alice, ledger_path) = genesis(10_000);
let server = FullNode::new(
leader,
true,
LedgerFile::Path(ledger_path.clone()),
leader_keypair,
None,
);
// Send leader some tokens to vote
let leader_balance =
@ -240,12 +222,10 @@ fn test_multi_node_basic() {
for _ in 0..N {
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic");
ledger_paths.push(ledger_path.clone());
let val = FullNode::new(
validator,
false,
&ledger_path,
LedgerFile::Path(ledger_path.clone()),
keypair,
Some(leader_data.contact_info.ncp),
);
@ -274,9 +254,7 @@ fn test_multi_node_basic() {
for node in nodes {
node.close().unwrap();
}
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
std::fs::remove_file(ledger_path).unwrap();
}
#[test]
@ -285,12 +263,15 @@ fn test_boot_validator_from_file() {
let leader_keypair = KeyPair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = KeyPair::new().pubkey();
let (alice, leader_ledger_path) = genesis("boot_validator_from_file", 100_000);
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
let (alice, ledger_path) = genesis(100_000);
let leader_data = leader.data.clone();
let leader_fullnode = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
let leader_fullnode = FullNode::new(
leader,
true,
LedgerFile::Path(ledger_path.clone()),
leader_keypair,
None,
);
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
assert_eq!(leader_balance, 500);
@ -301,12 +282,10 @@ fn test_boot_validator_from_file() {
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file");
ledger_paths.push(ledger_path.clone());
let val_fullnode = FullNode::new(
validator,
false,
&ledger_path,
LedgerFile::Path(ledger_path.clone()),
keypair,
Some(leader_data.contact_info.ncp),
);
@ -316,16 +295,20 @@ fn test_boot_validator_from_file() {
val_fullnode.close().unwrap();
leader_fullnode.close().unwrap();
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
std::fs::remove_file(ledger_path).unwrap();
}
fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) {
let leader_keypair = KeyPair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let leader_fullnode = FullNode::new(leader, true, &ledger_path, leader_keypair, None);
let leader_fullnode = FullNode::new(
leader,
true,
LedgerFile::Path(ledger_path.to_string()),
leader_keypair,
None,
);
(leader_data, leader_fullnode)
}
@ -336,7 +319,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
// ledger (currently up to WINDOW_SIZE entries)
logger::setup();
let (alice, ledger_path) = genesis("leader_restart_validator_start_from_old_ledger", 100_000);
let (alice, ledger_path) = genesis(100_000);
let bob_pubkey = KeyPair::new().pubkey();
let (leader_data, leader_fullnode) = create_leader(&ledger_path);
@ -347,10 +330,11 @@ fn test_leader_restart_validator_start_from_old_ledger() {
assert_eq!(leader_balance, 500);
// create a "stale" ledger by copying current ledger
let stale_ledger_path = tmp_copy_ledger(
&ledger_path,
"leader_restart_validator_start_from_old_ledger",
);
let mut stale_ledger_path = ledger_path.clone();
stale_ledger_path.insert_str(ledger_path.rfind("/").unwrap() + 1, "stale_");
std::fs::copy(&ledger_path, &stale_ledger_path)
.expect(format!("copy {} to {}", &ledger_path, &stale_ledger_path,).as_str());
// restart the leader
leader_fullnode.close().unwrap();
@ -369,11 +353,10 @@ fn test_leader_restart_validator_start_from_old_ledger() {
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let val_fullnode = FullNode::new(
validator,
false,
&stale_ledger_path,
LedgerFile::Path(stale_ledger_path.clone()),
keypair,
Some(leader_data.contact_info.ncp),
);
@ -399,8 +382,8 @@ fn test_leader_restart_validator_start_from_old_ledger() {
val_fullnode.close().unwrap();
leader_fullnode.close().unwrap();
remove_dir_all(ledger_path).unwrap();
remove_dir_all(stale_ledger_path).unwrap();
std::fs::remove_file(ledger_path).unwrap();
std::fs::remove_file(stale_ledger_path).unwrap();
}
//TODO: this test will run a long time so it's disabled for CI
@ -426,16 +409,16 @@ fn test_multi_node_dynamic_network() {
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = KeyPair::new().pubkey();
let (alice, leader_ledger_path) = genesis("multi_node_dynamic_network", 10_000_000);
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
let (alice, ledger_path) = genesis(10_000_000);
let alice_arc = Arc::new(RwLock::new(alice));
let leader_data = leader.data.clone();
let server =
FullNode::new_without_sigverify(leader, true, &leader_ledger_path, leader_keypair, None);
let server = FullNode::new_without_sigverify(
leader,
true,
LedgerFile::Path(ledger_path.clone()),
leader_keypair,
None,
);
// Send leader some tokens to vote
let leader_balance = send_tx_and_retry_get_balance(
@ -495,8 +478,7 @@ fn test_multi_node_dynamic_network() {
.into_iter()
.map(|keypair| {
let leader_data = leader_data.clone();
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_dynamic_network");
ledger_paths.push(ledger_path.clone());
let ledger_path = ledger_path.clone();
Builder::new()
.name("validator-launch-thread".to_string())
.spawn(move || {
@ -506,7 +488,7 @@ fn test_multi_node_dynamic_network() {
let val = FullNode::new_without_sigverify(
validator,
false,
&ledger_path,
LedgerFile::Path(ledger_path.clone()),
keypair,
Some(leader_data.contact_info.ncp),
);
@ -615,9 +597,7 @@ fn test_multi_node_dynamic_network() {
}
server.join().unwrap();
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
std::fs::remove_file(ledger_path).unwrap();
}
fn mk_client(leader: &NodeInfo) -> ThinClient {