diff --git a/README.md b/README.md index fe034f086b..a34e6f85f1 100644 --- a/README.md +++ b/README.md @@ -71,6 +71,20 @@ These files can be generated by running the following script. $ ./multinode-demo/setup.sh ``` +Drone +--- + +In order for the leader, client and validators to work, we'll need to +spin up a drone to give out some test tokens. The drone delivers Milton +Friedman-style "air drops" (free tokens to requesting clients) to be used in +test transactions. + +Start the drone on the leader node with: + +```bash +$ ./multinode-demo/drone.sh +``` + Singlenode Testnet --- @@ -85,22 +99,8 @@ $ ./multinode-demo/leader.sh ``` Wait a few seconds for the server to initialize. It will print "Ready." when it's ready to -receive transactions. - -Drone ---- - -In order for the below test client and validators to work, we'll also need to -spin up a drone to give out some test tokens. The drone delivers Milton -Friedman-style "air drops" (free tokens to requesting clients) to be used in -test transactions. - -Start the drone on the leader node with: - -```bash -$ ./multinode-demo/drone.sh -``` - +receive transactions. The leader will request some tokens from the drone if it doesn't have any. +The drone does not need to be running for subsequent leader starts. Multinode Testnet --- diff --git a/multinode-demo/remote_leader.sh b/multinode-demo/remote_leader.sh index 59c58c46d2..977dc5b9e3 100755 --- a/multinode-demo/remote_leader.sh +++ b/multinode-demo/remote_leader.sh @@ -10,5 +10,5 @@ PATH="$HOME"/.cargo/bin:"$PATH" # Run setup USE_INSTALL=1 ./multinode-demo/setup.sh -p -USE_INSTALL=1 SOLANA_CUDA=1 ./multinode-demo/leader.sh >leader.log 2>&1 & USE_INSTALL=1 ./multinode-demo/drone.sh >drone.log 2>&1 & +USE_INSTALL=1 SOLANA_CUDA=1 ./multinode-demo/leader.sh >leader.log 2>&1 & diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index cb6810f0e6..7f0c51af65 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -76,14 +76,6 @@ $rsync -vPrz --max-size=100M "$rsync_leader_url"/config/ "$SOLANA_LEADER_CONFIG_ exit 1 } -# Ensure the validator has at least 1 token before connecting to the network -# TODO: Remove this workaround -while ! $solana_wallet \ - -l "$SOLANA_LEADER_CONFIG_DIR"/leader.json \ - -k "$SOLANA_CONFIG_PRIVATE_DIR"/validator-id.json airdrop --tokens 1; do - sleep 1 -done - trap 'kill "$pid" && wait "$pid"' INT TERM $program \ --identity "$SOLANA_CONFIG_DIR"/validator.json \ diff --git a/snap/hooks/configure b/snap/hooks/configure index 46f39885b7..7dfa35c91e 100755 --- a/snap/hooks/configure +++ b/snap/hooks/configure @@ -16,8 +16,8 @@ num_tokens="$(snapctl get num-tokens)" case $mode in leader+drone) $SNAP/bin/setup.sh ${num_tokens:+-n $num_tokens} ${ip_address_arg} -t leader - snapctl start --enable solana.daemon-leader snapctl start --enable solana.daemon-drone + snapctl start --enable solana.daemon-leader ;; leader) $SNAP/bin/setup.sh ${num_tokens:+-n $num_tokens} ${ip_address_arg} -t leader diff --git a/src/bank.rs b/src/bank.rs index 3b9994c4d4..ca9e3296f8 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -176,6 +176,14 @@ impl Bank { } Err(BankError::LastIdNotFound(*last_id)) } + /// Look through the last_ids and find all the valid ids + /// This is batched to avoid holding the lock for a significant amount of time + pub fn count_valid_ids(&self, ids: &[Hash]) -> usize { + let last_ids = self.last_ids_sigs.read().unwrap(); + ids.iter() + .map(|id| last_ids.get(id).is_some() as usize) + .sum() + } /// Tell the bank which Entry IDs exist on the ledger. This function /// assumes subsequent calls correspond to later entries, and will boot @@ -737,7 +745,21 @@ mod tests { Err(BankError::LastIdNotFound(mint.last_id())) ); } - + #[test] + fn test_count_valid_ids() { + let mint = Mint::new(1); + let bank = Bank::new(&mint); + let ids: Vec<_> = (0..MAX_ENTRY_IDS) + .map(|i| { + let last_id = hash(&serialize(&i).unwrap()); // Unique hash + bank.register_entry_id(&last_id); + last_id + }) + .collect(); + assert_eq!(bank.count_valid_ids(&[]), 0); + assert_eq!(bank.count_valid_ids(&[mint.last_id()]), 0); + assert_eq!(bank.count_valid_ids(&ids), ids.len()); + } #[test] fn test_debits_before_credits() { let mint = Mint::new(2); diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index 7d9f1a49bb..c33106ff23 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -5,12 +5,12 @@ extern crate rayon; extern crate serde_json; extern crate solana; -use bincode::serialize; use clap::{App, Arg}; use influx_db_client as influxdb; use rayon::prelude::*; +use solana::client::mk_client; use solana::crdt::{Crdt, NodeInfo}; -use solana::drone::{DroneRequest, DRONE_PORT}; +use solana::drone::DRONE_PORT; use solana::fullnode::Config; use solana::hash::Hash; use solana::logger; @@ -23,11 +23,10 @@ use solana::streamer::default_window; use solana::thin_client::ThinClient; use solana::timing::{duration_as_ms, duration_as_s}; use solana::transaction::Transaction; +use solana::wallet::request_airdrop; use std::collections::VecDeque; -use std::error; use std::fs::File; -use std::io::Write; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream, UdpSocket}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; use std::process::exit; use std::sync::atomic::{AtomicBool, AtomicIsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -244,7 +243,7 @@ fn airdrop_tokens(client: &mut ThinClient, leader: &NodeInfo, id: &KeyPair, tx_c ); let previous_balance = starting_balance; - request_airdrop(&drone_addr, &id, airdrop_amount as u64).unwrap(); + request_airdrop(&drone_addr, &id.pubkey(), airdrop_amount as u64).unwrap(); // TODO: return airdrop Result from Drone instead of polling the // network @@ -590,22 +589,6 @@ fn main() { } } -fn mk_client(r: &NodeInfo) -> ThinClient { - let requests_socket = udp_random_bind(8000, 10000, 5).unwrap(); - let transactions_socket = udp_random_bind(8000, 10000, 5).unwrap(); - - requests_socket - .set_read_timeout(Some(Duration::new(1, 0))) - .unwrap(); - - ThinClient::new( - r.contact_info.rpu, - requests_socket, - r.contact_info.tpu, - transactions_socket, - ) -} - fn spy_node(addr: Option) -> (NodeInfo, UdpSocket) { let gossip_socket_pair; if let Some(a) = addr { @@ -693,19 +676,3 @@ fn read_leader(path: &str) -> Config { let file = File::open(path).unwrap_or_else(|_| panic!("file not found: {}", path)); serde_json::from_reader(file).unwrap_or_else(|_| panic!("failed to parse {}", path)) } - -fn request_airdrop( - drone_addr: &SocketAddr, - id: &KeyPair, - tokens: u64, -) -> Result<(), Box> { - let mut stream = TcpStream::connect(drone_addr)?; - let req = DroneRequest::GetAirdrop { - airdrop_request_amount: tokens, - client_public_key: id.pubkey(), - }; - let tx = serialize(&req).expect("serialize drone request"); - stream.write_all(&tx).unwrap(); - // TODO: add timeout to this function, in case of unresponsive drone - Ok(()) -} diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 5b1463b56f..6d2192bb8c 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -5,12 +5,15 @@ extern crate serde_json; extern crate solana; use clap::{App, Arg}; +use solana::client::mk_client; use solana::crdt::{NodeInfo, TestNode}; +use solana::drone::DRONE_PORT; use solana::fullnode::{Config, FullNode, LedgerFile}; use solana::logger; use solana::metrics::set_panic_hook; use solana::service::Service; use solana::signature::{KeyPair, KeyPairUtil}; +use solana::wallet::request_airdrop; use std::fs::File; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::process::exit; @@ -65,6 +68,10 @@ fn main() -> () { exit(1); } } + + let leader_pubkey = keypair.pubkey(); + let repl_clone = repl_data.clone(); + let ledger = if let Some(l) = matches.value_of("ledger") { LedgerFile::Path(l.to_string()) } else { @@ -76,11 +83,32 @@ fn main() -> () { let testnet_address_string = t.to_string(); let testnet_addr = testnet_address_string.parse().unwrap(); - FullNode::new(node, false, ledger, Some(keypair), Some(testnet_addr)) + FullNode::new(node, false, ledger, keypair, Some(testnet_addr)) } else { node.data.leader_id = node.data.id; - FullNode::new(node, true, ledger, None, None) + FullNode::new(node, true, ledger, keypair, None) }; + + let mut client = mk_client(&repl_clone); + let previous_balance = client.poll_get_balance(&leader_pubkey).unwrap(); + eprintln!("balance is {}", previous_balance); + + if previous_balance == 0 { + let mut drone_addr = repl_clone.contact_info.tpu; + drone_addr.set_port(DRONE_PORT); + + request_airdrop(&drone_addr, &leader_pubkey, 50).unwrap_or_else(|_| { + panic!( + "Airdrop failed, is the drone address correct {:?} drone running?", + drone_addr + ) + }); + + let balance = client.poll_get_balance(&leader_pubkey).unwrap(); + eprintln!("new balance is {}", balance); + assert!(balance > 0); + } + fullnode.join().expect("join"); } diff --git a/src/bin/wallet.rs b/src/bin/wallet.rs index 6d03454595..a804cfd192 100644 --- a/src/bin/wallet.rs +++ b/src/bin/wallet.rs @@ -6,20 +6,19 @@ extern crate dirs; extern crate serde_json; extern crate solana; -use bincode::serialize; use clap::{App, Arg, SubCommand}; +use solana::client::mk_client; use solana::crdt::NodeInfo; -use solana::drone::{DroneRequest, DRONE_PORT}; +use solana::drone::DRONE_PORT; use solana::fullnode::Config; use solana::logger; use solana::signature::{read_keypair, KeyPair, KeyPairUtil, PublicKey, Signature}; use solana::thin_client::ThinClient; +use solana::wallet::request_airdrop; use std::error; use std::fmt; use std::fs::File; -use std::io; -use std::io::prelude::*; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream, UdpSocket}; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::thread::sleep; use std::time::Duration; @@ -255,7 +254,7 @@ fn process_command( tokens, config.drone_addr ); let previous_balance = client.poll_get_balance(&config.id.pubkey())?; - request_airdrop(&config.drone_addr, &config.id, tokens as u64)?; + request_airdrop(&config.drone_addr, &config.id.pubkey(), tokens as u64)?; // TODO: return airdrop Result from Drone instead of polling the // network @@ -318,40 +317,9 @@ fn read_leader(path: &str) -> Result { }) } -fn mk_client(r: &NodeInfo) -> io::Result { - let requests_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - requests_socket - .set_read_timeout(Some(Duration::new(1, 0))) - .unwrap(); - - Ok(ThinClient::new( - r.contact_info.rpu, - requests_socket, - r.contact_info.tpu, - transactions_socket, - )) -} - -fn request_airdrop( - drone_addr: &SocketAddr, - id: &KeyPair, - tokens: u64, -) -> Result<(), Box> { - let mut stream = TcpStream::connect(drone_addr)?; - let req = DroneRequest::GetAirdrop { - airdrop_request_amount: tokens, - client_public_key: id.pubkey(), - }; - let tx = serialize(&req).expect("serialize drone request"); - stream.write_all(&tx).unwrap(); - // TODO: add timeout to this function, in case of unresponsive drone - Ok(()) -} - fn main() -> Result<(), Box> { logger::setup(); let config = parse_args()?; - let mut client = mk_client(&config.leader)?; + let mut client = mk_client(&config.leader); process_command(&config, &mut client) } diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000000..15859d8ae8 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,20 @@ +use crdt::NodeInfo; +use nat::udp_random_bind; +use std::time::Duration; +use thin_client::ThinClient; + +pub fn mk_client(r: &NodeInfo) -> ThinClient { + let requests_socket = udp_random_bind(8000, 10000, 5).unwrap(); + let transactions_socket = udp_random_bind(8000, 10000, 5).unwrap(); + + requests_socket + .set_read_timeout(Some(Duration::new(1, 0))) + .unwrap(); + + ThinClient::new( + r.contact_info.rpu, + requests_socket, + r.contact_info.tpu, + transactions_socket, + ) +} diff --git a/src/crdt.rs b/src/crdt.rs index ad332f4152..55e818f18d 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -132,7 +132,7 @@ pub struct NodeInfo { /// current leader identity pub leader_id: PublicKey, /// information about the state of the ledger - ledger_state: LedgerState, + pub ledger_state: LedgerState, } fn make_debug_id(buf: &[u8]) -> u64 { @@ -360,7 +360,7 @@ impl Crdt { warn!( "{:x}: VOTE for unknown id: {:x}", self.debug_id(), - make_debug_id(&pubkey) + make_debug_id(pubkey) ); return; } @@ -374,7 +374,15 @@ impl Crdt { ); return; } - self.update_leader_liveness(); + if *pubkey == self.my_data().leader_id { + info!( + "{:x}: LEADER_VOTED! {:x}", + self.debug_id(), + make_debug_id(&pubkey) + ); + inc_new_counter!("crdt-insert_vote-leader_voted", 1); + } + if v.version <= self.table[pubkey].version { debug!( "{:x}: VOTE for old version: {:x}", @@ -387,23 +395,16 @@ impl Crdt { let mut data = self.table[pubkey].clone(); data.version = v.version; data.ledger_state.last_id = last_id; + debug!( "{:x}: INSERTING VOTE! for {:x}", self.debug_id(), data.debug_id() ); + self.update_liveness(data.id); self.insert(&data); } } - fn update_leader_liveness(&mut self) { - //TODO: (leaders should vote) - //until then we pet their liveness every time we see some votes from anyone - let ld = self.leader_data().map(|x| x.id); - trace!("leader_id {:?}", ld); - if let Some(leader_id) = ld { - self.update_liveness(leader_id); - } - } pub fn insert_votes(&mut self, votes: &[(PublicKey, Vote, Hash)]) { inc_new_counter!("crdt-vote-count", votes.len()); if !votes.is_empty() { @@ -1165,7 +1166,7 @@ impl Crdt { return; } if e.is_err() { - info!( + debug!( "{:x}: run_listen timeout, table size: {}", debug_id, obj.read().unwrap().table.len() @@ -1465,7 +1466,7 @@ mod tests { crdt.set_leader(leader.id); assert_eq!(crdt.table[&d.id].version, 1); let v = Vote { - version: 2, //version shoud increase when we vote + version: 2, //version should increase when we vote contact_info_version: 0, }; let expected = (v, crdt.table[&leader.id].contact_info.tpu); @@ -1501,34 +1502,6 @@ mod tests { //should be accepted, since the update is for the same address field as the one we know assert_eq!(crdt.table[&d.id].version, 1); } - - #[test] - fn test_insert_vote_leader_liveness() { - logger::setup(); - // TODO: remove this test once leaders vote - let d = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); - assert_eq!(d.version, 0); - let mut crdt = Crdt::new(d.clone()).unwrap(); - let leader = NodeInfo::new_leader(&"127.0.0.2:1235".parse().unwrap()); - assert_ne!(d.id, leader.id); - crdt.insert(&leader); - crdt.set_leader(leader.id); - let live: u64 = crdt.alive[&leader.id]; - trace!("{:x} live {}", leader.debug_id(), live); - let vote_new_version_old_addrs = Vote { - version: d.version + 1, - contact_info_version: 0, - }; - sleep(Duration::from_millis(100)); - let votes = vec![(d.id.clone(), vote_new_version_old_addrs, Hash::default())]; - crdt.insert_votes(&votes); - let updated = crdt.alive[&leader.id]; - //should be accepted, since the update is for the same address field as the one we know - assert_eq!(crdt.table[&d.id].version, 1); - trace!("{:x} {} {}", leader.debug_id(), updated, live); - assert!(updated > live); - } - fn sorted(ls: &Vec) -> Vec { let mut copy: Vec<_> = ls.iter().cloned().collect(); copy.sort_by(|x, y| x.id.cmp(&y.id)); diff --git a/src/drone.rs b/src/drone.rs index b3f8d345c6..c0ec1acf19 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -266,7 +266,8 @@ mod tests { const TPS_BATCH: i64 = 5_000_000; logger::setup(); - let leader = TestNode::new_localhost(); + let leader_keypair = KeyPair::new(); + let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000_000); let bank = Bank::new(&alice); @@ -276,6 +277,7 @@ mod tests { let leader_data = leader.data.clone(); let server = FullNode::new_leader( + leader_keypair, bank, 0, None, diff --git a/src/fullnode.rs b/src/fullnode.rs index 0ed72b9561..d27336c3fc 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -62,7 +62,7 @@ impl FullNode { mut node: TestNode, leader: bool, ledger: LedgerFile, - keypair_for_validator: Option, + keypair: KeyPair, network_entry_for_validator: Option, sigverify_disabled: bool, ) -> FullNode { @@ -104,7 +104,6 @@ impl FullNode { let testnet_addr = network_entry_for_validator.expect("validator requires entry"); let network_entry_point = NodeInfo::new_entry_point(testnet_addr); - let keypair = keypair_for_validator.expect("validator requires keypair"); let server = FullNode::new_validator( keypair, bank, @@ -124,6 +123,7 @@ impl FullNode { node.data.leader_id = node.data.id; let server = FullNode::new_leader( + keypair, bank, entry_height, Some(ledger_tail), @@ -221,6 +221,7 @@ impl FullNode { /// `---------------------` /// ``` pub fn new_leader( + keypair: KeyPair, bank: Bank, entry_height: u64, ledger_tail: Option>, @@ -245,6 +246,7 @@ impl FullNode { let crdt = Arc::new(RwLock::new(Crdt::new(node.data).expect("Crdt::new"))); let (tpu, blob_receiver) = Tpu::new( + keypair, &bank, &crdt, tick_duration, diff --git a/src/lib.rs b/src/lib.rs index 58526729ae..0ec9cf6301 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod banking_stage; pub mod blob_fetch_stage; pub mod budget; pub mod choose_gossip_peer_strategy; +pub mod client; pub mod crdt; pub mod drone; pub mod entry; @@ -51,6 +52,7 @@ pub mod transaction; pub mod tvu; pub mod vote_stage; pub mod voting; +pub mod wallet; pub mod window_stage; pub mod write_stage; extern crate bincode; diff --git a/src/thin_client.rs b/src/thin_client.rs index abc7764737..d09980dea4 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -297,7 +297,8 @@ mod tests { #[test] fn test_thin_client() { logger::setup(); - let leader = TestNode::new_localhost(); + let leader_keypair = KeyPair::new(); + let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let alice = Mint::new(10_000); @@ -306,6 +307,7 @@ mod tests { let exit = Arc::new(AtomicBool::new(false)); let server = FullNode::new_leader( + leader_keypair, bank, 0, None, @@ -342,7 +344,8 @@ mod tests { #[ignore] fn test_bad_sig() { logger::setup(); - let leader = TestNode::new_localhost(); + let leader_keypair = KeyPair::new(); + let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); @@ -350,6 +353,7 @@ mod tests { let leader_data = leader.data.clone(); let server = FullNode::new_leader( + leader_keypair, bank, 0, None, @@ -398,13 +402,15 @@ mod tests { #[test] fn test_client_check_signature() { logger::setup(); - let leader = TestNode::new_localhost(); + let leader_keypair = KeyPair::new(); + let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); let bank = Bank::new(&alice); let bob_pubkey = KeyPair::new().pubkey(); let exit = Arc::new(AtomicBool::new(false)); let leader_data = leader.data.clone(); let server = FullNode::new_leader( + leader_keypair, bank, 0, None, diff --git a/src/tpu.rs b/src/tpu.rs index dd79e97b55..d02f161cb4 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -32,6 +32,7 @@ use fetch_stage::FetchStage; use packet::{BlobRecycler, PacketRecycler}; use record_stage::RecordStage; use service::Service; +use signature::KeyPair; use sigverify_stage::SigVerifyStage; use std::io::Write; use std::net::UdpSocket; @@ -52,6 +53,7 @@ pub struct Tpu { impl Tpu { pub fn new( + keypair: KeyPair, bank: &Arc, crdt: &Arc>, tick_duration: Option, @@ -80,6 +82,7 @@ impl Tpu { }; let (write_stage, blob_receiver) = WriteStage::new( + keypair, bank.clone(), crdt.clone(), blob_recycler.clone(), diff --git a/src/tvu.rs b/src/tvu.rs index 7d2e134eec..1b54e8f648 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -175,8 +175,8 @@ pub mod tests { fn test_replicate() { logger::setup(); let leader = TestNode::new_localhost(); - let target1_kp = KeyPair::new(); - let target1 = TestNode::new_localhost_with_pubkey(target1_kp.pubkey()); + let target1_keypair = KeyPair::new(); + let target1 = TestNode::new_localhost_with_pubkey(target1_keypair.pubkey()); let target2 = TestNode::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); @@ -230,7 +230,7 @@ pub mod tests { let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()).unwrap(); let tvu = Tvu::new( - target1_kp, + target1_keypair, &bank, 0, cref1, diff --git a/src/vote_stage.rs b/src/vote_stage.rs index 97b60f7026..6a4277ef77 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -4,8 +4,7 @@ use bank::Bank; use bincode::serialize; use counter::Counter; use crdt::Crdt; -use hash::Hash; -use packet::BlobRecycler; +use packet::{BlobRecycler, SharedBlob}; use result::Result; use service::Service; use signature::KeyPair; @@ -15,14 +14,99 @@ use std::sync::{Arc, RwLock}; use std::thread::{self, sleep, spawn, JoinHandle}; use std::time::Duration; use streamer::BlobSender; +use timing; use transaction::Transaction; -const VOTE_TIMEOUT_MS: u64 = 1000; +pub const VOTE_TIMEOUT_MS: u64 = 1000; pub struct VoteStage { thread_hdl: JoinHandle<()>, } +pub fn create_vote_tx_and_blob( + bank: &Arc, + keypair: &KeyPair, + crdt: &Arc>, + blob_recycler: &BlobRecycler, +) -> Result<(Transaction, SharedBlob)> { + let last_id = bank.last_id(); + let shared_blob = blob_recycler.allocate(); + let (vote, addr) = { + let mut wcrdt = crdt.write().unwrap(); + //TODO: doesn't seem like there is a synchronous call to get height and id + info!("voting on {:?}", &last_id[..8]); + wcrdt.new_vote(last_id) + }?; + let tx = Transaction::new_vote(&keypair, vote, last_id, 0); + { + let mut blob = shared_blob.write().unwrap(); + let bytes = serialize(&tx)?; + let len = bytes.len(); + blob.data[..len].copy_from_slice(&bytes); + blob.meta.set_addr(&addr); + blob.meta.size = len; + } + Ok((tx, shared_blob)) +} + +pub fn send_leader_vote( + debug_id: u64, + keypair: &KeyPair, + bank: &Arc, + crdt: &Arc>, + blob_recycler: &BlobRecycler, + vote_blob_sender: &BlobSender, + last_vote: &mut u64, +) -> Result<()> { + let now = timing::timestamp(); + if now - *last_vote > VOTE_TIMEOUT_MS { + //TODO(anatoly): vote if the last id set is mostly valid + let ids: Vec<_> = crdt.read() + .unwrap() + .table + .values() + .map(|x| x.ledger_state.last_id) + .collect(); + let total = bank.count_valid_ids(&ids); + //TODO(anatoly): this isn't stake based voting + info!( + "{:x}: valid_ids {}/{} {}", + debug_id, + total, + ids.len(), + (2 * ids.len()) / 3 + ); + if total > (2 * ids.len()) / 3 { + *last_vote = now; + + if let Ok((tx, shared_blob)) = + create_vote_tx_and_blob(bank, keypair, crdt, blob_recycler) + { + bank.process_transaction(&tx)?; + vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; + info!("{:x} leader_sent_vote", debug_id); + inc_new_counter!("write_stage-leader_sent_vote", 1); + } + } + } + Ok(()) +} + +fn send_validator_vote( + bank: &Arc, + keypair: &Arc, + crdt: &Arc>, + blob_recycler: &BlobRecycler, + vote_blob_sender: &BlobSender, +) -> Result<()> { + if let Ok((_, shared_blob)) = create_vote_tx_and_blob(bank, keypair, crdt, blob_recycler) { + inc_new_counter!("replicate-vote_sent", 1); + + vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; + } + Ok(()) +} + impl VoteStage { pub fn new( keypair: Arc, @@ -54,43 +138,14 @@ impl VoteStage { exit: &Arc, ) { while !exit.load(Ordering::Relaxed) { - let last_id = bank.last_id(); - - if let Err(err) = Self::vote(&last_id, keypair, crdt, blob_recycler, vote_blob_sender) { + if let Err(err) = + send_validator_vote(bank, keypair, crdt, blob_recycler, vote_blob_sender) + { info!("Vote failed: {:?}", err); } sleep(Duration::from_millis(VOTE_TIMEOUT_MS)); } } - - fn vote( - last_id: &Hash, - keypair: &Arc, - crdt: &Arc>, - blob_recycler: &BlobRecycler, - vote_blob_sender: &BlobSender, - ) -> Result<()> { - let shared_blob = blob_recycler.allocate(); - let (vote, addr) = { - let mut wcrdt = crdt.write().unwrap(); - //TODO: doesn't seem like there is a synchronous call to get height and id - info!("voting on {:?}", &last_id[..8]); - wcrdt.new_vote(*last_id) - }?; - { - let mut blob = shared_blob.write().unwrap(); - let tx = Transaction::new_vote(&keypair, vote, *last_id, 0); - let bytes = serialize(&tx)?; - let len = bytes.len(); - blob.data[..len].copy_from_slice(&bytes); - blob.meta.set_addr(&addr); - blob.meta.size = len; - } - inc_new_counter!("replicate-vote_sent", 1); - - vote_blob_sender.send(VecDeque::from(vec![shared_blob]))?; - Ok(()) - } } impl Service for VoteStage { @@ -108,7 +163,10 @@ impl Service for VoteStage { pub mod tests { use super::*; use bank::Bank; - use crdt::{Crdt, TestNode}; + use crdt::{Crdt, NodeInfo, TestNode}; + use entry::next_entry; + use hash::Hash; + use logger; use mint::Mint; use packet::BlobRecycler; use service::Service; @@ -116,6 +174,7 @@ pub mod tests { use std::sync::atomic::AtomicBool; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; + use transaction::{Transaction, Vote}; /// Ensure the VoteStage issues votes at the expected cadence #[test] @@ -150,4 +209,111 @@ pub mod tests { exit.store(true, Ordering::Relaxed); vote_stage.join().expect("join"); } + + #[test] + fn test_send_leader_vote() { + logger::setup(); + + // create a mint/bank + let mint = Mint::new(1000); + let bank = Arc::new(Bank::new(&mint)); + let hash0 = Hash::default(); + + // get a non-default hash last_id + let entry = next_entry(&hash0, 1, vec![]); + bank.register_entry_id(&entry.id); + + // Create a leader + let leader_data = NodeInfo::new_leader(&"127.0.0.1:1234".parse().unwrap()); + let leader_pubkey = leader_data.id.clone(); + let mut leader_crdt = Crdt::new(leader_data).unwrap(); + + // give the leader some tokens + let give_leader_tokens_tx = + Transaction::new(&mint.keypair(), leader_pubkey.clone(), 100, entry.id); + bank.process_transaction(&give_leader_tokens_tx).unwrap(); + + leader_crdt.set_leader(leader_pubkey); + + // Insert 7 agreeing validators / 3 disagreeing + // and votes for new last_id + for i in 0..10 { + let mut validator = + NodeInfo::new_leader(&format!("127.0.0.1:234{}", i).parse().unwrap()); + + let vote = Vote { + version: validator.version + 1, + contact_info_version: 1, + }; + + if i < 7 { + validator.ledger_state.last_id = entry.id; + } + + leader_crdt.insert(&validator); + trace!("validator id: {:?}", validator.id); + + leader_crdt.insert_vote(&validator.id, &vote, entry.id); + } + let leader = Arc::new(RwLock::new(leader_crdt)); + let blob_recycler = BlobRecycler::default(); + let (vote_blob_sender, vote_blob_receiver) = channel(); + let mut last_vote: u64 = timing::timestamp() - VOTE_TIMEOUT_MS - 1; + let res = send_leader_vote( + 1234, + &mint.keypair(), + &bank, + &leader, + &blob_recycler, + &vote_blob_sender, + &mut last_vote, + ); + trace!("vote result: {:?}", res); + assert!(res.is_ok()); + let vote_blob = vote_blob_receiver.recv_timeout(Duration::from_millis(500)); + trace!("vote_blob: {:?}", vote_blob); + + // leader shouldn't vote yet, not enough votes + assert!(vote_blob.is_err()); + + // add two more nodes and see that it succeeds + for i in 0..2 { + let mut validator = + NodeInfo::new_leader(&format!("127.0.0.1:234{}", i).parse().unwrap()); + + let vote = Vote { + version: validator.version + 1, + contact_info_version: 1, + }; + + validator.ledger_state.last_id = entry.id; + + leader.write().unwrap().insert(&validator); + trace!("validator id: {:?}", validator.id); + + leader + .write() + .unwrap() + .insert_vote(&validator.id, &vote, entry.id); + } + + last_vote = timing::timestamp() - VOTE_TIMEOUT_MS - 1; + let res = send_leader_vote( + 2345, + &mint.keypair(), + &bank, + &leader, + &blob_recycler, + &vote_blob_sender, + &mut last_vote, + ); + trace!("vote result: {:?}", res); + assert!(res.is_ok()); + let vote_blob = vote_blob_receiver.recv_timeout(Duration::from_millis(500)); + trace!("vote_blob: {:?}", vote_blob); + + // leader should vote now + assert!(vote_blob.is_ok()); + } + } diff --git a/src/voting.rs b/src/voting.rs index 56bb10a5cd..a16b46def7 100644 --- a/src/voting.rs +++ b/src/voting.rs @@ -1,21 +1,18 @@ use entry::Entry; use hash::Hash; use signature::PublicKey; -use transaction::{Instruction, Vote}; +use transaction::{Instruction, Transaction, Vote}; pub fn entries_to_votes(entries: &[Entry]) -> Vec<(PublicKey, Vote, Hash)> { entries .iter() - .flat_map(|entry| { - let vs: Vec<(PublicKey, Vote, Hash)> = entry - .transactions - .iter() - .filter_map(|tx| match tx.instruction { - Instruction::NewVote(ref vote) => Some((tx.from, vote.clone(), tx.last_id)), - _ => None, - }) - .collect(); - vs - }) + .flat_map(|entry| entry.transactions.iter().filter_map(transaction_to_vote)) .collect() } + +pub fn transaction_to_vote(tx: &Transaction) -> Option<(PublicKey, Vote, Hash)> { + match tx.instruction { + Instruction::NewVote(ref vote) => Some((tx.from, vote.clone(), tx.last_id)), + _ => None, + } +} diff --git a/src/wallet.rs b/src/wallet.rs new file mode 100644 index 0000000000..bb3219a975 --- /dev/null +++ b/src/wallet.rs @@ -0,0 +1,22 @@ +use bincode::serialize; +use drone::DroneRequest; +use signature::PublicKey; +use std::error; +use std::io::Write; +use std::net::{SocketAddr, TcpStream}; + +pub fn request_airdrop( + drone_addr: &SocketAddr, + id: &PublicKey, + tokens: u64, +) -> Result<(), Box> { + let mut stream = TcpStream::connect(drone_addr)?; + let req = DroneRequest::GetAirdrop { + airdrop_request_amount: tokens, + client_public_key: *id, + }; + let tx = serialize(&req).expect("serialize drone request"); + stream.write_all(&tx).unwrap(); + // TODO: add timeout to this function, in case of unresponsive drone + Ok(()) +} diff --git a/src/write_stage.rs b/src/write_stage.rs index 866fe9a547..34de2328ce 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -11,18 +11,21 @@ use ledger::Block; use packet::BlobRecycler; use result::{Error, Result}; use service::Service; +use signature::KeyPair; use std::collections::VecDeque; use std::io::Write; +use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::{Arc, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; -use streamer::{BlobReceiver, BlobSender}; +use streamer::{responder, BlobReceiver, BlobSender}; +use vote_stage::send_leader_vote; use voting::entries_to_votes; pub struct WriteStage { - thread_hdl: JoinHandle<()>, + thread_hdls: Vec>, } impl WriteStage { @@ -38,13 +41,18 @@ impl WriteStage { let entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; let votes = entries_to_votes(&entries); crdt.write().unwrap().insert_votes(&votes); + + //TODO(anatoly): real stake based voting needs to change this + //leader simply votes if the current set of validators have voted + //on a valid last id entry_writer.write_and_register_entries(&entries)?; trace!("New blobs? {}", entries.len()); let mut blobs = VecDeque::new(); entries.to_blobs(blob_recycler, &mut blobs); + if !blobs.is_empty() { - inc_new_counter!("write_stage-broadcast_vote-count", votes.len()); - inc_new_counter!("write_stage-broadcast_blobs-count", blobs.len()); + inc_new_counter!("write_stage-recv_vote", votes.len()); + inc_new_counter!("write_stage-broadcast_blobs", blobs.len()); trace!("broadcasting {}", blobs.len()); blob_sender.send(blobs)?; } @@ -53,17 +61,28 @@ impl WriteStage { /// Create a new WriteStage for writing and broadcasting entries. pub fn new( + keypair: KeyPair, bank: Arc, crdt: Arc>, blob_recycler: BlobRecycler, writer: W, entry_receiver: Receiver>, ) -> (Self, BlobReceiver) { + let (vote_blob_sender, vote_blob_receiver) = channel(); + let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); + let t_responder = responder( + "write_stage_vote_sender", + send, + blob_recycler.clone(), + vote_blob_receiver, + ); let (blob_sender, blob_receiver) = channel(); let thread_hdl = Builder::new() .name("solana-writer".to_string()) .spawn(move || { let mut entry_writer = EntryWriter::new(&bank, writer); + let mut last_vote = 0; + let debug_id = crdt.read().unwrap().debug_id(); loop { if let Err(e) = Self::write_and_send_entries( &crdt, @@ -76,25 +95,40 @@ impl WriteStage { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), _ => { - inc_new_counter!("write_stage-error", 1); + inc_new_counter!("write_stage-write_and_send_entries-error", 1); error!("{:?}", e); } } }; + if let Err(e) = send_leader_vote( + debug_id, + &keypair, + &bank, + &crdt, + &blob_recycler, + &vote_blob_sender, + &mut last_vote, + ) { + inc_new_counter!("write_stage-leader_vote-error", 1); + error!("{:?}", e); + } } }) .unwrap(); - (WriteStage { thread_hdl }, blob_receiver) + let thread_hdls = vec![t_responder, thread_hdl]; + (WriteStage { thread_hdls }, blob_receiver) } } impl Service for WriteStage { fn thread_hdls(self) -> Vec> { - vec![self.thread_hdl] + self.thread_hdls } - fn join(self) -> thread::Result<()> { - self.thread_hdl.join() + for thread_hdl in self.thread_hdls() { + thread_hdl.join()?; + } + Ok(()) } } diff --git a/tests/multinode.rs b/tests/multinode.rs index 598d39a8ea..163e325a45 100755 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -91,7 +91,9 @@ fn test_multi_node_validator_catchup_from_zero() { logger::setup(); const N: usize = 5; trace!("test_multi_node_validator_catchup_from_zero"); - let leader = TestNode::new_localhost(); + let leader_keypair = KeyPair::new(); + let leader_pubkey = leader_keypair.pubkey().clone(); + let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); @@ -100,9 +102,15 @@ fn test_multi_node_validator_catchup_from_zero() { leader, true, LedgerFile::Path(ledger_path.clone()), - None, + leader_keypair, None, ); + + // Send leader some tokens to vote + let leader_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap(); + info!("leader balance {}", leader_balance); + let mut nodes = vec![server]; for _ in 0..N { let keypair = KeyPair::new(); @@ -111,7 +119,7 @@ fn test_multi_node_validator_catchup_from_zero() { validator, false, LedgerFile::Path(ledger_path.clone()), - Some(keypair), + keypair, Some(leader_data.contact_info.ncp), ); nodes.push(val); @@ -145,7 +153,7 @@ fn test_multi_node_validator_catchup_from_zero() { validator, false, LedgerFile::Path(ledger_path.clone()), - Some(keypair), + keypair, Some(leader_data.contact_info.ncp), ); nodes.push(val); @@ -191,7 +199,9 @@ fn test_multi_node_basic() { logger::setup(); const N: usize = 5; trace!("test_multi_node_basic"); - let leader = TestNode::new_localhost(); + let leader_keypair = KeyPair::new(); + let leader_pubkey = leader_keypair.pubkey().clone(); + let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let bob_pubkey = KeyPair::new().pubkey(); let (alice, ledger_path) = genesis(10_000); @@ -199,9 +209,15 @@ fn test_multi_node_basic() { leader, true, LedgerFile::Path(ledger_path.clone()), - None, + leader_keypair, None, ); + + // Send leader some tokens to vote + let leader_balance = + send_tx_and_retry_get_balance(&leader_data, &alice, &leader_pubkey, None).unwrap(); + info!("leader balance {}", leader_balance); + let mut nodes = vec![server]; for _ in 0..N { let keypair = KeyPair::new(); @@ -210,7 +226,7 @@ fn test_multi_node_basic() { validator, false, LedgerFile::Path(ledger_path.clone()), - Some(keypair), + keypair, Some(leader_data.contact_info.ncp), ); nodes.push(val); @@ -244,7 +260,8 @@ fn test_multi_node_basic() { #[test] fn test_boot_validator_from_file() { logger::setup(); - let leader = TestNode::new_localhost(); + let leader_keypair = KeyPair::new(); + let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = KeyPair::new().pubkey(); let (alice, ledger_path) = genesis(100_000); let leader_data = leader.data.clone(); @@ -252,7 +269,7 @@ fn test_boot_validator_from_file() { leader, true, LedgerFile::Path(ledger_path.clone()), - None, + leader_keypair, None, ); let leader_balance = @@ -269,7 +286,7 @@ fn test_boot_validator_from_file() { validator, false, LedgerFile::Path(ledger_path.clone()), - Some(keypair), + keypair, Some(leader_data.contact_info.ncp), ); let mut client = mk_client(&validator_data); @@ -282,13 +299,14 @@ fn test_boot_validator_from_file() { } fn create_leader(ledger_path: &str) -> (NodeInfo, FullNode) { - let leader = TestNode::new_localhost(); + let leader_keypair = KeyPair::new(); + let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.data.clone(); let leader_fullnode = FullNode::new( leader, true, LedgerFile::Path(ledger_path.to_string()), - None, + leader_keypair, None, ); (leader_data, leader_fullnode) @@ -339,7 +357,7 @@ fn test_leader_restart_validator_start_from_old_ledger() { validator, false, LedgerFile::Path(stale_ledger_path.clone()), - Some(keypair), + keypair, Some(leader_data.contact_info.ncp), ); @@ -387,7 +405,9 @@ fn test_multi_node_dynamic_network() { Err(_) => std::usize::MAX, }; - let leader = TestNode::new_localhost(); + let leader_keypair = KeyPair::new(); + let leader_pubkey = leader_keypair.pubkey().clone(); + let leader = TestNode::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = KeyPair::new().pubkey(); let (alice, ledger_path) = genesis(10_000_000); let alice_arc = Arc::new(RwLock::new(alice)); @@ -396,9 +416,19 @@ fn test_multi_node_dynamic_network() { leader, true, LedgerFile::Path(ledger_path.clone()), - None, + leader_keypair, None, ); + + // Send leader some tokens to vote + let leader_balance = send_tx_and_retry_get_balance( + &leader_data, + &alice_arc.read().unwrap(), + &leader_pubkey, + None, + ).unwrap(); + info!("leader balance {}", leader_balance); + info!("{:x} LEADER", leader_data.debug_id()); let leader_balance = retry_send_tx_and_retry_get_balance( &leader_data, @@ -464,7 +494,7 @@ fn test_multi_node_dynamic_network() { validator, false, LedgerFile::Path(ledger_path.clone()), - Some(keypair), + keypair, Some(leader_data.contact_info.ncp), ); (rd, val)