plug in new ledger

This commit is contained in:
Rob Walker 2018-08-02 10:18:59 -07:00
parent a9240a42bf
commit 46d9ba5ca0
15 changed files with 299 additions and 182 deletions

5
.gitignore vendored
View File

@ -1,5 +1,6 @@
Cargo.lock
/target/
**/*.rs.bk
.cargo
@ -11,5 +12,5 @@ Cargo.lock
/config-client/
/multinode-demo/test/config-client/
# Generated log files
/log-*.txt
# test temp files, ledgers, etc.
/farf/

View File

@ -28,7 +28,7 @@ tune_networking
trap 'kill "$pid" && wait "$pid"' INT TERM
$program \
--identity "$SOLANA_CONFIG_DIR"/leader.json \
--ledger "$SOLANA_CONFIG_DIR"/ledger.log \
--ledger "$SOLANA_CONFIG_DIR"/ledger \
> >($leader_logger) 2>&1 &
pid=$!
wait "$pid"

View File

@ -91,8 +91,8 @@ if $node_type_leader; then
echo "Creating $mint_path with $num_tokens tokens"
$solana_keygen -o "$mint_path"
echo "Creating $SOLANA_CONFIG_DIR/ledger.log"
$solana_genesis --tokens="$num_tokens" < "$mint_path" > "$SOLANA_CONFIG_DIR"/ledger.log
echo "Creating $SOLANA_CONFIG_DIR/ledger"
$solana_genesis --tokens="$num_tokens" --ledger "$SOLANA_CONFIG_DIR"/ledger < "$mint_path"
echo "Creating $SOLANA_CONFIG_DIR/leader.json"
$solana_fullnode_config --keypair="$leader_id_path" "${leader_address_args[@]}" > "$SOLANA_CONFIG_DIR"/leader.json

View File

@ -71,8 +71,8 @@ SOLANA_LEADER_CONFIG_DIR="$SOLANA_CONFIG_DIR"/leader-config
rm -rf "$SOLANA_LEADER_CONFIG_DIR"
set -ex
$rsync -vPrz --max-size=100M "$rsync_leader_url"/config/ "$SOLANA_LEADER_CONFIG_DIR"
[[ -r "$SOLANA_LEADER_CONFIG_DIR"/ledger.log ]] || {
echo "Unable to retrieve ledger.log from $rsync_leader_url"
[[ -d "$SOLANA_LEADER_CONFIG_DIR"/ledger ]] || {
echo "Unable to retrieve ledger from $rsync_leader_url"
exit 1
}
@ -80,7 +80,7 @@ trap 'kill "$pid" && wait "$pid"' INT TERM
$program \
--identity "$SOLANA_CONFIG_DIR"/validator.json \
--testnet "$leader_address:$leader_port" \
--ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger.log \
--ledger "$SOLANA_LEADER_CONFIG_DIR"/ledger \
> >($validator_logger) 2>&1 &
pid=$!
wait "$pid"

View File

@ -8,7 +8,7 @@ use clap::{App, Arg};
use solana::client::mk_client;
use solana::crdt::{NodeInfo, TestNode};
use solana::drone::DRONE_PORT;
use solana::fullnode::{Config, FullNode, LedgerFile};
use solana::fullnode::{Config, FullNode};
use solana::logger;
use solana::metrics::set_panic_hook;
use solana::service::Service;
@ -41,11 +41,12 @@ fn main() -> () {
)
.arg(
Arg::with_name("ledger")
.short("L")
.short("l")
.long("ledger")
.value_name("FILE")
.value_name("DIR")
.takes_value(true)
.help("use FILE as persistent ledger (defaults to stdin/stdout)"),
.required(true)
.help("use DIR as persistent ledger location"),
)
.get_matches();
@ -72,11 +73,7 @@ fn main() -> () {
let leader_pubkey = keypair.pubkey();
let repl_clone = repl_data.clone();
let ledger = if let Some(l) = matches.value_of("ledger") {
LedgerFile::Path(l.to_string())
} else {
LedgerFile::StdInOut
};
let ledger_path = matches.value_of("ledger").unwrap();
let mut node = TestNode::new_with_bind_addr(repl_data, bind_addr);
let mut drone_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DRONE_PORT);
@ -85,11 +82,11 @@ fn main() -> () {
let testnet_addr: SocketAddr = testnet_address_string.parse().unwrap();
drone_addr.set_ip(testnet_addr.ip());
FullNode::new(node, false, ledger, keypair, Some(testnet_addr))
FullNode::new(node, false, ledger_path, keypair, Some(testnet_addr))
} else {
node.data.leader_id = node.data.id;
FullNode::new(node, true, ledger, keypair, None)
FullNode::new(node, true, ledger_path, keypair, None)
};
let mut client = mk_client(&repl_clone);

View File

@ -8,10 +8,10 @@ extern crate solana;
use atty::{is, Stream};
use clap::{App, Arg};
use solana::entry_writer::EntryWriter;
use solana::ledger::LedgerWriter;
use solana::mint::Mint;
use std::error;
use std::io::{stdin, stdout, Read};
use std::io::{stdin, Read};
use std::process::exit;
fn main() -> Result<(), Box<error::Error>> {
@ -25,9 +25,19 @@ fn main() -> Result<(), Box<error::Error>> {
.required(true)
.help("Number of tokens with which to initialize mint"),
)
.arg(
Arg::with_name("ledger")
.short("l")
.long("ledger")
.value_name("DIR")
.takes_value(true)
.required(true)
.help("use DIR as persistent ledger location"),
)
.get_matches();
let tokens = value_t_or_exit!(matches, "tokens", i64);
let ledger_path = matches.value_of("ledger").unwrap();
if is(Stream::Stdin) {
eprintln!("nothing found on stdin, expected a json file");
@ -44,7 +54,8 @@ fn main() -> Result<(), Box<error::Error>> {
let pkcs8: Vec<u8> = serde_json::from_str(&buffer)?;
let mint = Mint::new_with_pkcs8(tokens, pkcs8);
let mut writer = stdout();
EntryWriter::write_entries(&mut writer, mint.create_entries())?;
let mut ledger_writer = LedgerWriter::new(&ledger_path)?;
ledger_writer.write_entries(mint.create_entries())?;
Ok(())
}

View File

@ -164,7 +164,7 @@ mod tests {
use mint::Mint;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::io::sink;
use std::fs::remove_dir_all;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@ -259,6 +259,21 @@ mod tests {
assert_eq!(drone.request_cap, REQUEST_CAP);
}
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
let id = {
let ids: Vec<_> = keypair
.pubkey()
.iter()
.map(|id| format!("{}", id))
.collect();
ids.join("")
};
format!("farf/{}-{}", name, id)
}
#[test]
#[ignore]
fn test_send_airdrop() {
@ -275,6 +290,7 @@ mod tests {
let carlos_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger_path("send_airdrop");
let server = FullNode::new_leader(
leader_keypair,
@ -284,7 +300,7 @@ mod tests {
Some(Duration::from_millis(30)),
leader,
exit.clone(),
sink(),
&ledger_path,
false,
);
//TODO: this seems unstable
@ -336,5 +352,6 @@ mod tests {
exit.store(true, Ordering::Relaxed);
server.join().unwrap();
remove_dir_all(ledger_path).unwrap();
}
}

View File

@ -3,17 +3,13 @@
use bank::Bank;
use crdt::{Crdt, NodeInfo, TestNode};
use entry::Entry;
use entry_writer;
use ledger::Block;
use ledger::{read_ledger, Block};
use ncp::Ncp;
use packet::BlobRecycler;
use rpu::Rpu;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque;
use std::fs::{File, OpenOptions};
use std::io::{stdin, stdout, BufReader};
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
@ -30,11 +26,6 @@ pub struct FullNode {
thread_hdls: Vec<JoinHandle<()>>,
}
pub enum LedgerFile {
StdInOut,
Path(String),
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
/// Fullnode configuration to be stored in file
pub struct Config {
@ -61,28 +52,17 @@ impl FullNode {
fn new_internal(
mut node: TestNode,
leader: bool,
ledger: LedgerFile,
ledger_path: &str,
keypair: KeyPair,
network_entry_for_validator: Option<SocketAddr>,
sigverify_disabled: bool,
) -> FullNode {
info!("creating bank...");
let bank = Bank::default();
let (infile, outfile): (Box<Read>, Box<Write + Send>) = match ledger {
LedgerFile::Path(path) => (
Box::new(File::open(path.clone()).expect("opening ledger file")),
Box::new(
OpenOptions::new()
.create(true)
.append(true)
.open(path)
.expect("opening ledger file"),
),
),
LedgerFile::StdInOut => (Box::new(stdin()), Box::new(stdout())),
};
let reader = BufReader::new(infile);
let entries = entry_writer::read_entries(reader).map(|e| e.expect("failed to parse entry"));
let entries = read_ledger(ledger_path).expect("opening ledger");
let entries = entries.map(|e| e.expect("failed to parse entry"));
info!("processing ledger...");
let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger");
@ -112,6 +92,7 @@ impl FullNode {
node,
&network_entry_point,
exit.clone(),
ledger_path,
sigverify_disabled,
);
info!(
@ -131,7 +112,7 @@ impl FullNode {
None,
node,
exit.clone(),
outfile,
ledger_path,
sigverify_disabled,
);
info!(
@ -145,7 +126,7 @@ impl FullNode {
pub fn new(
node: TestNode,
leader: bool,
ledger: LedgerFile,
ledger: &str,
keypair: KeyPair,
network_entry_for_validator: Option<SocketAddr>,
) -> FullNode {
@ -162,7 +143,7 @@ impl FullNode {
pub fn new_without_sigverify(
node: TestNode,
leader: bool,
ledger: LedgerFile,
ledger: &str,
keypair: KeyPair,
network_entry_for_validator: Option<SocketAddr>,
) -> FullNode {
@ -220,7 +201,7 @@ impl FullNode {
/// | | `------------`
/// `---------------------`
/// ```
pub fn new_leader<W: Write + Send + 'static>(
pub fn new_leader(
keypair: KeyPair,
bank: Bank,
entry_height: u64,
@ -228,7 +209,7 @@ impl FullNode {
tick_duration: Option<Duration>,
node: TestNode,
exit: Arc<AtomicBool>,
writer: W,
ledger_path: &str,
sigverify_disabled: bool,
) -> Self {
let bank = Arc::new(bank);
@ -245,6 +226,9 @@ impl FullNode {
let window = FullNode::new_window(ledger_tail, entry_height, &node.data, &blob_recycler);
let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new")));
// let mut ledger_writer = LedgerWriter::new(ledger_path);
let (tpu, blob_receiver) = Tpu::new(
keypair,
&bank,
@ -253,7 +237,7 @@ impl FullNode {
node.sockets.transaction,
&blob_recycler,
exit.clone(),
writer,
ledger_path,
sigverify_disabled,
);
thread_hdls.extend(tpu.thread_hdls());
@ -316,6 +300,7 @@ impl FullNode {
node: TestNode,
entry_point: &NodeInfo,
exit: Arc<AtomicBool>,
ledger_path: &str,
_sigverify_disabled: bool,
) -> Self {
let bank = Arc::new(bank);
@ -353,6 +338,7 @@ impl FullNode {
node.sockets.replicate,
node.sockets.repair,
node.sockets.retransmit,
ledger_path,
exit.clone(),
);
thread_hdls.extend(tvu.thread_hdls());
@ -391,8 +377,25 @@ mod tests {
use mint::Mint;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::fs::remove_dir_all;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
let id = {
let ids: Vec<_> = keypair
.pubkey()
.iter()
.map(|id| format!("{}", id))
.collect();
ids.join("")
};
format!("farf/{}-{}", name, id)
}
#[test]
fn validator_exit() {
let kp = KeyPair::new();
@ -401,13 +404,15 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false);
let lp = tmp_ledger_path("validator_exit");
let v = FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false);
v.exit();
v.join().unwrap();
remove_dir_all(lp).unwrap();
}
#[test]
fn validator_parallel_exit() {
let vals: Vec<FullNode> = (0..2)
let vals: Vec<(FullNode, String)> = (0..2)
.map(|_| {
let kp = KeyPair::new();
let tn = TestNode::new_localhost_with_pubkey(kp.pubkey());
@ -415,13 +420,20 @@ mod tests {
let bank = Bank::new(&alice);
let exit = Arc::new(AtomicBool::new(false));
let entry = tn.data.clone();
FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, false)
let lp = tmp_ledger_path("validator_parallel_exit");
(
FullNode::new_validator(kp, bank, 0, None, tn, &entry, exit, &lp, false),
lp,
)
})
.collect();
//each validator can exit in parallel to speed many sequential calls to `join`
vals.iter().for_each(|v| v.exit());
vals.iter().for_each(|v| v.0.exit());
//while join is called sequentially, the above exit call notified all the
//validators to exit from all their threads
vals.into_iter().for_each(|v| v.join().unwrap());
vals.into_iter().for_each(|v| {
v.0.join().unwrap();
remove_dir_all(v.1).unwrap()
});
}
}

View File

@ -39,17 +39,15 @@ fn entry_at(file: &mut File, at: u64) -> io::Result<Entry> {
deserialize_from(file.take(len)).map_err(err_bincode_to_io)
}
fn next_offset(file: &mut File) -> io::Result<u64> {
deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)
}
// unused, but would work for the iterator if we only have the data file...
//
//fn next_entry(file: &mut File) -> io::Result<Entry> {
// let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?;
// deserialize_from(file.take(len)).map_err(err_bincode_to_io)
//fn next_offset(file: &mut File) -> io::Result<u64> {
// deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)
//}
fn next_entry(file: &mut File) -> io::Result<Entry> {
let len = deserialize_from(file.take(SIZEOF_USIZE)).map_err(err_bincode_to_io)?;
deserialize_from(file.take(len)).map_err(err_bincode_to_io)
}
fn u64_at(file: &mut File, at: u64) -> io::Result<u64> {
file.seek(SeekFrom::Start(at))?;
deserialize_from(file.take(SIZEOF_U64)).map_err(err_bincode_to_io)
@ -112,7 +110,7 @@ impl LedgerWriter {
Ok(LedgerWriter { index, data })
}
fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
pub fn write_entry(&mut self, entry: &Entry) -> io::Result<()> {
let offset = self.data.seek(SeekFrom::Current(0))?;
let len = serialized_size(&entry).map_err(err_bincode_to_io)?;
@ -146,8 +144,8 @@ impl Iterator for LedgerReader {
type Item = io::Result<Entry>;
fn next(&mut self) -> Option<io::Result<Entry>> {
match next_offset(&mut self.index) {
Ok(offset) => Some(entry_at(&mut self.data, offset)),
match next_entry(&mut self.data) {
Ok(entry) => Some(Ok(entry)),
Err(_) => None,
}
}
@ -163,7 +161,7 @@ pub fn read_ledger(directory: &str) -> io::Result<impl Iterator<Item = io::Resul
Ok(LedgerReader { index, data })
}
pub fn copy(from: &str, to: &str) -> io::Result<()> {
pub fn copy_ledger(from: &str, to: &str) -> io::Result<()> {
let mut to = LedgerWriter::new(to)?;
for entry in read_ledger(from)? {
@ -321,6 +319,12 @@ mod tests {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use transaction::{Transaction, Vote};
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
format!("farf/{}-{}", name, keypair.pubkey())
}
#[test]
fn test_verify_slice() {
let zero = Hash::default();
@ -437,18 +441,11 @@ mod tests {
assert!(entries0.verify(&id));
}
fn tmp_ledger_path() -> String {
let keypair = KeyPair::new();
format!(
"target/test_ledger_reader_writer_window-{}",
keypair.pubkey()
)
}
fn tmp_ledger_path() -> String {}
#[test]
fn test_ledger_reader_writer() {
let ledger_path = tmp_ledger_path();
let ledger_path = tmp_ledger_path("test_ledger_reader_writer");
let entries = make_test_entries();
let mut writer = LedgerWriter::new(&ledger_path).unwrap();
@ -478,16 +475,16 @@ mod tests {
std::fs::remove_dir_all(ledger_path).unwrap();
}
#[test]
fn test_ledger_copy() {
let from = tmp_ledger_path();
fn test_copy_ledger() {
let from = tmp_ledger_path("test_ledger_copy_from");
let entries = make_test_entries();
let mut writer = LedgerWriter::new(&from).unwrap();
writer.write_entries(entries.clone()).unwrap();
let to = tmp_ledger_path();
let to = tmp_ledger_path("test_ledger_copy_to");
copy(&from, &to).unwrap();
copy_ledger(&from, &to).unwrap();
let mut read_entries = vec![];
for x in read_ledger(&to).unwrap() {

View File

@ -3,7 +3,7 @@
use bank::Bank;
use counter::Counter;
use crdt::Crdt;
use ledger;
use ledger::{reconstruct_entries_from_blobs, LedgerWriter};
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
@ -31,6 +31,7 @@ impl ReplicateStage {
crdt: &Arc<RwLock<Crdt>>,
blob_recycler: &BlobRecycler,
window_receiver: &BlobReceiver,
ledger_writer: &mut LedgerWriter,
) -> Result<()> {
let timer = Duration::new(1, 0);
//coalesce all the available blobs into a single vote
@ -39,7 +40,7 @@ impl ReplicateStage {
blobs.append(&mut more);
}
let blobs_len = blobs.len();
let entries = ledger::reconstruct_entries_from_blobs(blobs.clone())?;
let entries = reconstruct_entries_from_blobs(blobs.clone())?;
{
let votes = entries_to_votes(&entries);
let mut wcrdt = crdt.write().unwrap();
@ -49,7 +50,11 @@ impl ReplicateStage {
"replicate-transactions",
entries.iter().map(|x| x.transactions.len()).sum()
);
ledger_writer.write_entries(entries.clone())?;
let res = bank.process_entries(entries);
if res.is_err() {
error!("process_entries {} {:?}", blobs_len, res);
}
@ -65,6 +70,7 @@ impl ReplicateStage {
crdt: Arc<RwLock<Crdt>>,
blob_recycler: BlobRecycler,
window_receiver: BlobReceiver,
ledger_path: &str,
exit: Arc<AtomicBool>,
) -> Self {
let (vote_blob_sender, vote_blob_receiver) = channel();
@ -84,13 +90,18 @@ impl ReplicateStage {
vote_blob_sender,
exit,
);
let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap();
let t_replicate = Builder::new()
.name("solana-replicate-stage".to_string())
.spawn(move || loop {
if let Err(e) =
Self::replicate_requests(&bank, &crdt, &blob_recycler, &window_receiver)
{
if let Err(e) = Self::replicate_requests(
&bank,
&crdt,
&blob_recycler,
&window_receiver,
&mut ledger_writer,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),

View File

@ -289,11 +289,26 @@ mod tests {
use mint::Mint;
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::io::sink;
use std::fs::remove_dir_all;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use transaction::{Instruction, Plan};
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
let id = {
let ids: Vec<_> = keypair
.pubkey()
.iter()
.map(|id| format!("{}", id))
.collect();
ids.join("")
};
format!("farf/{}-{}", name, id)
}
#[test]
fn test_thin_client() {
logger::setup();
@ -305,6 +320,7 @@ mod tests {
let bank = Bank::new(&alice);
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let ledger_path = tmp_ledger_path("thin_client");
let server = FullNode::new_leader(
leader_keypair,
@ -314,7 +330,7 @@ mod tests {
Some(Duration::from_millis(30)),
leader,
exit.clone(),
sink(),
&ledger_path,
false,
);
sleep(Duration::from_millis(900));
@ -351,6 +367,7 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger_path("bad_sig");
let server = FullNode::new_leader(
leader_keypair,
@ -360,7 +377,7 @@ mod tests {
Some(Duration::from_millis(30)),
leader,
exit.clone(),
sink(),
&ledger_path,
false,
);
//TODO: remove this sleep, or add a retry so CI is stable
@ -397,6 +414,7 @@ mod tests {
assert_eq!(balance.unwrap(), 500);
exit.store(true, Ordering::Relaxed);
server.join().unwrap();
remove_dir_all(ledger_path).unwrap();
}
#[test]
@ -409,6 +427,8 @@ mod tests {
let bob_pubkey = KeyPair::new().pubkey();
let exit = Arc::new(AtomicBool::new(false));
let leader_data = leader.data.clone();
let ledger_path = tmp_ledger_path("client_check_signature");
let server = FullNode::new_leader(
leader_keypair,
bank,
@ -417,7 +437,7 @@ mod tests {
Some(Duration::from_millis(30)),
leader,
exit.clone(),
sink(),
&ledger_path,
false,
);
sleep(Duration::from_millis(300));
@ -443,5 +463,6 @@ mod tests {
exit.store(true, Ordering::Relaxed);
server.join().unwrap();
remove_dir_all(ledger_path).unwrap();
}
}

View File

@ -34,7 +34,6 @@ use record_stage::RecordStage;
use service::Service;
use signature::KeyPair;
use sigverify_stage::SigVerifyStage;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
@ -52,7 +51,7 @@ pub struct Tpu {
}
impl Tpu {
pub fn new<W: Write + Send + 'static>(
pub fn new(
keypair: KeyPair,
bank: &Arc<Bank>,
crdt: &Arc<RwLock<Crdt>>,
@ -60,7 +59,7 @@ impl Tpu {
transactions_socket: UdpSocket,
blob_recycler: &BlobRecycler,
exit: Arc<AtomicBool>,
writer: W,
ledger_path: &str,
sigverify_disabled: bool,
) -> (Self, BlobReceiver) {
let packet_recycler = PacketRecycler::default();
@ -86,7 +85,7 @@ impl Tpu {
bank.clone(),
crdt.clone(),
blob_recycler.clone(),
writer,
ledger_path,
entry_receiver,
);

View File

@ -77,6 +77,7 @@ impl Tvu {
replicate_socket: UdpSocket,
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
ledger_path: &str,
exit: Arc<AtomicBool>,
) -> Self {
let blob_recycler = BlobRecycler::default();
@ -103,6 +104,7 @@ impl Tvu {
crdt,
blob_recycler,
blob_window_receiver,
ledger_path,
exit,
);
@ -151,6 +153,7 @@ pub mod tests {
use service::Service;
use signature::{KeyPair, KeyPairUtil};
use std::collections::VecDeque;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
@ -170,6 +173,22 @@ pub mod tests {
let ncp = Ncp::new(&crdt, window.clone(), listen, send_sock, exit)?;
Ok((ncp, window))
}
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
let id = {
let ids: Vec<_> = keypair
.pubkey()
.iter()
.map(|id| format!("{}", id))
.collect();
ids.join("")
};
format!("farf/{}-{}", name, id)
}
/// Test that message sent from leader to target1 and replicated to target2
#[test]
fn test_replicate() {
@ -229,6 +248,7 @@ pub mod tests {
let cref1 = Arc::new(RwLock::new(crdt1));
let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap();
let ledger_path = tmp_ledger_path("replicate");
let tvu = Tvu::new(
target1_keypair,
&bank,
@ -238,6 +258,7 @@ pub mod tests {
target1.sockets.replicate,
target1.sockets.repair,
target1.sockets.retransmit,
&ledger_path,
exit.clone(),
);
@ -307,5 +328,6 @@ pub mod tests {
dr_1.0.join().expect("join");
t_receiver.join().expect("join");
t_responder.join().expect("join");
remove_dir_all(ledger_path).unwrap();
}
}

View File

@ -6,14 +6,12 @@ use bank::Bank;
use counter::Counter;
use crdt::Crdt;
use entry::Entry;
use entry_writer::EntryWriter;
use ledger::Block;
use ledger::{Block, LedgerWriter};
use packet::BlobRecycler;
use result::{Error, Result};
use service::Service;
use signature::KeyPair;
use std::collections::VecDeque;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
@ -31,21 +29,30 @@ pub struct WriteStage {
impl WriteStage {
/// Process any Entry items that have been published by the RecordStage.
/// continuosly broadcast blobs of entries out
pub fn write_and_send_entries<W: Write>(
pub fn write_and_send_entries(
crdt: &Arc<RwLock<Crdt>>,
entry_writer: &mut EntryWriter<W>,
bank: &Arc<Bank>,
ledger_writer: &mut LedgerWriter,
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
entry_receiver: &Receiver<Vec<Entry>>,
) -> Result<()> {
let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let votes = entries_to_votes(&entries);
crdt.write().unwrap().insert_votes(&votes);
for entry in entries.clone() {
ledger_writer.write_entry(&entry)?;
if !entry.has_more {
bank.register_entry_id(&entry.id);
}
}
//TODO(anatoly): real stake based voting needs to change this
//leader simply votes if the current set of validators have voted
//on a valid last id
entry_writer.write_and_register_entries(&entries)?;
trace!("New blobs? {}", entries.len());
let mut blobs = VecDeque::new();
entries.to_blobs(blob_recycler, &mut blobs);
@ -60,12 +67,12 @@ impl WriteStage {
}
/// Create a new WriteStage for writing and broadcasting entries.
pub fn new<W: Write + Send + 'static>(
pub fn new(
keypair: KeyPair,
bank: Arc<Bank>,
crdt: Arc<RwLock<Crdt>>,
blob_recycler: BlobRecycler,
writer: W,
ledger_path: &str,
entry_receiver: Receiver<Vec<Entry>>,
) -> (Self, BlobReceiver) {
let (vote_blob_sender, vote_blob_receiver) = channel();
@ -77,16 +84,18 @@ impl WriteStage {
vote_blob_receiver,
);
let (blob_sender, blob_receiver) = channel();
let mut ledger_writer = LedgerWriter::new(ledger_path).unwrap();
let thread_hdl = Builder::new()
.name("solana-writer".to_string())
.spawn(move || {
let mut entry_writer = EntryWriter::new(&bank, writer);
let mut last_vote = 0;
let debug_id = crdt.read().unwrap().debug_id();
loop {
if let Err(e) = Self::write_and_send_entries(
&crdt,
&mut entry_writer,
&bank,
&mut ledger_writer,
&blob_sender,
&blob_recycler,
&entry_receiver,

View File

@ -4,10 +4,9 @@ extern crate bincode;
extern crate serde_json;
extern crate solana;
use solana::crdt::TestNode;
use solana::crdt::{Crdt, NodeInfo};
use solana::entry_writer::EntryWriter;
use solana::fullnode::{FullNode, LedgerFile};
use solana::crdt::{Crdt, NodeInfo, TestNode};
use solana::fullnode::FullNode;
use solana::ledger::{copy_ledger, LedgerWriter};
use solana::logger;
use solana::mint::Mint;
use solana::ncp::Ncp;
@ -18,7 +17,7 @@ use solana::thin_client::ThinClient;
use solana::timing::duration_as_s;
use std::cmp::max;
use std::env;
use std::fs::File;
use std::fs::remove_dir_all;
use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
@ -73,16 +72,27 @@ fn converge(leader: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
rv
}
fn genesis(num: i64) -> (Mint, String) {
let mint = Mint::new(num);
let path = format!(
"target/test_multi_node_dynamic_network-{}.log",
mint.pubkey()
);
let mut writer = File::create(path.clone()).unwrap();
fn tmp_ledger_path(name: &str) -> String {
let keypair = KeyPair::new();
EntryWriter::write_entries(&mut writer, mint.create_entries()).unwrap();
(mint, path.to_string())
format!("farf/{}-{}", name, keypair.pubkey())
}
fn genesis(name: &str, num: i64) -> (Mint, String) {
let mint = Mint::new(num);
let path = tmp_ledger_path(name);
let mut writer = LedgerWriter::new(&path).unwrap();
writer.write_entries(mint.create_entries()).unwrap();
(mint, path)
}
fn tmp_copy_ledger(from: &str, name: &str) -> String {
let to = tmp_ledger_path(name);
copy_ledger(from, &to).unwrap();
to
}
#[test]
@ -95,15 +105,12 @@ fn test_multi_node_validator_catchup_from_zero() {
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let bob_pubkey = KeyPair::new().pubkey();
let mut ledger_paths = Vec::new();
let (alice, ledger_path) = genesis(10_000);
let server = FullNode::new(
leader,
true,
LedgerFile::Path(ledger_path.clone()),
leader_keypair,
None,
);
let (alice, leader_ledger_path) = genesis("multi_node_validator_catchup_from_zero", 10_000);
ledger_paths.push(leader_ledger_path.clone());
let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
// Send leader some tokens to vote
let leader_balance =
@ -114,10 +121,16 @@ fn test_multi_node_validator_catchup_from_zero() {
for _ in 0..N {
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(
&leader_ledger_path,
"multi_node_validator_catchup_from_zero_validator",
);
ledger_paths.push(ledger_path.clone());
let mut val = FullNode::new(
validator,
false,
LedgerFile::Path(ledger_path.clone()),
&ledger_path,
keypair,
Some(leader_data.contact_info.ncp),
);
@ -148,10 +161,15 @@ fn test_multi_node_validator_catchup_from_zero() {
// start up another validator, converge and then check everyone's balances
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(
&leader_ledger_path,
"multi_node_validator_catchup_from_zero",
);
ledger_paths.push(ledger_path.clone());
let val = FullNode::new(
validator,
false,
LedgerFile::Path(ledger_path.clone()),
&ledger_path,
keypair,
Some(leader_data.contact_info.ncp),
);
@ -191,6 +209,9 @@ fn test_multi_node_validator_catchup_from_zero() {
for node in nodes {
node.close().unwrap();
}
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
}
#[test]
@ -204,14 +225,11 @@ fn test_multi_node_basic() {
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let bob_pubkey = KeyPair::new().pubkey();
let (alice, ledger_path) = genesis(10_000);
let server = FullNode::new(
leader,
true,
LedgerFile::Path(ledger_path.clone()),
leader_keypair,
None,
);
let mut ledger_paths = Vec::new();
let (alice, leader_ledger_path) = genesis("multi_node_basic", 10_000);
ledger_paths.push(leader_ledger_path.clone());
let server = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
// Send leader some tokens to vote
let leader_balance =
@ -222,10 +240,12 @@ fn test_multi_node_basic() {
for _ in 0..N {
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic");
ledger_paths.push(ledger_path.clone());
let val = FullNode::new(
validator,
false,
LedgerFile::Path(ledger_path.clone()),
&ledger_path,
keypair,
Some(leader_data.contact_info.ncp),
);
@ -254,7 +274,9 @@ fn test_multi_node_basic() {
for node in nodes {
node.close().unwrap();
}
std::fs::remove_file(ledger_path).unwrap();
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
}
#[test]
@ -263,15 +285,12 @@ fn test_boot_validator_from_file() {
let leader_keypair = KeyPair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = KeyPair::new().pubkey();
let (alice, ledger_path) = genesis(100_000);
let (alice, leader_ledger_path) = genesis("boot_validator_from_file", 100_000);
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
let leader_data = leader.data.clone();
let leader_fullnode = FullNode::new(
leader,
true,
LedgerFile::Path(ledger_path.clone()),
leader_keypair,
None,
);
let leader_fullnode = FullNode::new(leader, true, &leader_ledger_path, leader_keypair, None);
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, Some(500)).unwrap();
assert_eq!(leader_balance, 500);
@ -282,10 +301,12 @@ fn test_boot_validator_from_file() {
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file");
ledger_paths.push(ledger_path.clone());
let val_fullnode = FullNode::new(
validator,
false,
LedgerFile::Path(ledger_path.clone()),
&ledger_path,
keypair,
Some(leader_data.contact_info.ncp),
);
@ -295,20 +316,16 @@ fn test_boot_validator_from_file() {
val_fullnode.close().unwrap();
leader_fullnode.close().unwrap();
std::fs::remove_file(ledger_path).unwrap();
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
}
fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) {
let leader_keypair = KeyPair::new();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.data.clone();
let leader_fullnode = FullNode::new(
leader,
true,
LedgerFile::Path(ledger_path.to_string()),
leader_keypair,
None,
);
let leader_fullnode = FullNode::new(leader, true, &ledger_path, leader_keypair, None);
(leader_data, leader_fullnode)
}
@ -319,7 +336,7 @@ fn test_leader_restart_validator_start_from_old_ledger() {
// ledger (currently up to WINDOW_SIZE entries)
logger::setup();
let (alice, ledger_path) = genesis(100_000);
let (alice, ledger_path) = genesis("leader_restart_validator_start_from_old_ledger", 100_000);
let bob_pubkey = KeyPair::new().pubkey();
let (leader_data, leader_fullnode) = create_leader(&ledger_path);
@ -330,11 +347,10 @@ fn test_leader_restart_validator_start_from_old_ledger() {
assert_eq!(leader_balance, 500);
// create a "stale" ledger by copying current ledger
let mut stale_ledger_path = ledger_path.clone();
stale_ledger_path.insert_str(ledger_path.rfind("/").unwrap() + 1, "stale_");
std::fs::copy(&ledger_path, &stale_ledger_path)
.expect(format!("copy {} to {}", &ledger_path, &stale_ledger_path,).as_str());
let stale_ledger_path = tmp_copy_ledger(
&ledger_path,
"leader_restart_validator_start_from_old_ledger",
);
// restart the leader
leader_fullnode.close().unwrap();
@ -353,10 +369,11 @@ fn test_leader_restart_validator_start_from_old_ledger() {
let keypair = KeyPair::new();
let validator = TestNode::new_localhost_with_pubkey(keypair.pubkey());
let validator_data = validator.data.clone();
let val_fullnode = FullNode::new(
validator,
false,
LedgerFile::Path(stale_ledger_path.clone()),
&stale_ledger_path,
keypair,
Some(leader_data.contact_info.ncp),
);
@ -382,8 +399,8 @@ fn test_leader_restart_validator_start_from_old_ledger() {
val_fullnode.close().unwrap();
leader_fullnode.close().unwrap();
std::fs::remove_file(ledger_path).unwrap();
std::fs::remove_file(stale_ledger_path).unwrap();
remove_dir_all(ledger_path).unwrap();
remove_dir_all(stale_ledger_path).unwrap();
}
//TODO: this test will run a long time so it's disabled for CI
@ -409,16 +426,16 @@ fn test_multi_node_dynamic_network() {
let leader_pubkey = leader_keypair.pubkey().clone();
let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = KeyPair::new().pubkey();
let (alice, ledger_path) = genesis(10_000_000);
let (alice, leader_ledger_path) = genesis("multi_node_dynamic_network", 10_000_000);
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
let alice_arc = Arc::new(RwLock::new(alice));
let leader_data = leader.data.clone();
let server = FullNode::new_without_sigverify(
leader,
true,
LedgerFile::Path(ledger_path.clone()),
leader_keypair,
None,
);
let server =
FullNode::new_without_sigverify(leader, true, &leader_ledger_path, leader_keypair, None);
// Send leader some tokens to vote
let leader_balance = send_tx_and_retry_get_balance(
@ -478,7 +495,8 @@ fn test_multi_node_dynamic_network() {
.into_iter()
.map(|keypair| {
let leader_data = leader_data.clone();
let ledger_path = ledger_path.clone();
let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_dynamic_network");
ledger_paths.push(ledger_path.clone());
Builder::new()
.name("validator-launch-thread".to_string())
.spawn(move || {
@ -488,7 +506,7 @@ fn test_multi_node_dynamic_network() {
let val = FullNode::new_without_sigverify(
validator,
false,
LedgerFile::Path(ledger_path.clone()),
&ledger_path,
keypair,
Some(leader_data.contact_info.ncp),
);
@ -597,7 +615,9 @@ fn test_multi_node_dynamic_network() {
}
server.join().unwrap();
std::fs::remove_file(ledger_path).unwrap();
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
}
fn mk_client(leader: &NodeInfo) -> ThinClient {