From f6c8e1a4bf86c9bf0e981a1b7bbf5fbb2676c80c Mon Sep 17 00:00:00 2001 From: carllin Date: Thu, 25 Oct 2018 16:58:40 -0700 Subject: [PATCH] Vote contract (#1552) * Add Vote Contract * Move ownership of LeaderScheduler from Fullnode to the bank * Modified ReplicateStage to consume leader information from bank * Restart RPC Services in Leader To Validator Transition * Make VoteContract Context Free * Remove voting from ClusterInfo and Tpu * Remove dependency on ActiveValidators in LeaderScheduler * Switch VoteContract to have two steps 1) Register 2) Vote. Change thin client to create + register a voting account on fullnode startup * Remove check in leader_to_validator transition for unique references to bank, b/c jsonrpc service and rpcpubsub hold references through jsonhttpserver --- src/bank.rs | 135 +++++------ src/bin/fullnode.rs | 56 ++++- src/bin/ledger-tool.rs | 8 +- src/budget_instruction.rs | 16 +- src/budget_program.rs | 5 - src/budget_transaction.rs | 22 +- src/cluster_info.rs | 117 +-------- src/drone.rs | 15 +- src/fullnode.rs | 292 ++++++++++++++--------- src/leader_scheduler.rs | 489 ++++++++++++++++++++++++-------------- src/leader_vote_stage.rs | 159 ------------- src/ledger.rs | 30 +-- src/lib.rs | 3 +- src/replicate_stage.rs | 91 +++---- src/replicator.rs | 4 +- src/result.rs | 7 + src/rpc.rs | 33 ++- src/rpc_pubsub.rs | 21 +- src/thin_client.rs | 88 ++++++- src/tpu.rs | 22 +- src/tvu.rs | 26 +- src/vote_program.rs | 151 ++++++++++++ src/vote_stage.rs | 337 ++++---------------------- src/vote_transaction.rs | 85 +++++++ src/wallet.rs | 57 +++-- src/window.rs | 4 +- tests/multinode.rs | 112 +++++---- 27 files changed, 1190 insertions(+), 1195 deletions(-) delete mode 100644 src/leader_vote_stage.rs create mode 100644 src/vote_program.rs create mode 100644 src/vote_transaction.rs diff --git a/src/bank.rs b/src/bank.rs index db2383a00f..6661912c97 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -7,7 +7,6 @@ use bincode::deserialize; use bincode::serialize; use bpf_loader; use budget_program::BudgetState; -use budget_transaction::BudgetTransaction; use counter::Counter; use entry::Entry; use hash::{hash, Hash}; @@ -31,7 +30,7 @@ use std; use std::collections::{BTreeMap, HashMap, HashSet}; use std::result; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Mutex, RwLock}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::Instant; use storage_program::StorageProgram; use system_program::SystemProgram; @@ -42,6 +41,7 @@ use timing::{duration_as_us, timestamp}; use token_program::TokenProgram; use tokio::prelude::Future; use transaction::Transaction; +use vote_program::VoteProgram; use window::WINDOW_SIZE; /// The number of most recent `last_id` values that the bank will track the signatures @@ -151,7 +151,7 @@ impl Default for LastIds { /// The state of all accounts and contracts after processing its entries. pub struct Bank { /// A map of account public keys to the balance in that account. - accounts: RwLock>, + pub accounts: RwLock>, /// set of accounts which are currently in the pipeline account_locks: Mutex>, @@ -171,6 +171,13 @@ pub struct Bank { // Mapping of signatures to Subscriber ids and sinks to notify on confirmation signature_subscriptions: RwLock>>>, + + /// Tracks and updates the leader schedule based on the votes and account stakes + /// processed by the bank + pub leader_scheduler: Arc>, + + // The number of ticks that have elapsed since genesis + tick_height: Mutex, } impl Default for Bank { @@ -183,6 +190,8 @@ impl Default for Bank { finality_time: AtomicUsize::new(std::usize::MAX), account_subscriptions: RwLock::new(HashMap::new()), signature_subscriptions: RwLock::new(HashMap::new()), + leader_scheduler: Arc::new(RwLock::new(LeaderScheduler::default())), + tick_height: Mutex::new(0), } } } @@ -613,6 +622,8 @@ impl Bank { { return Err(BankError::ProgramRuntimeError(instruction_index as u8)); } + } else if VoteProgram::check_id(&tx_program_id) { + VoteProgram::process_transaction(&tx, instruction_index, program_accounts).is_err(); } else { let mut depth = 0; let mut keys = Vec::new(); @@ -890,41 +901,28 @@ impl Bank { results } - pub fn process_entry( - &self, - entry: &Entry, - tick_height: &mut u64, - leader_scheduler: &mut LeaderScheduler, - ) -> Result<()> { + pub fn process_entry(&self, entry: &Entry) -> Result<()> { if !entry.is_tick() { for result in self.process_transactions(&entry.transactions) { result?; } } else { - *tick_height += 1; + let tick_height = { + let mut tick_height_lock = self.tick_height.lock().unwrap(); + *tick_height_lock += 1; + *tick_height_lock + }; + + self.leader_scheduler + .write() + .unwrap() + .update_height(tick_height, self); self.register_entry_id(&entry.id); } - self.process_entry_votes(entry, *tick_height, leader_scheduler); Ok(()) } - fn process_entry_votes( - &self, - entry: &Entry, - tick_height: u64, - leader_scheduler: &mut LeaderScheduler, - ) { - for tx in &entry.transactions { - if tx.vote().is_some() { - // Update the active set in the leader scheduler - leader_scheduler.push_vote(*tx.from(), tick_height); - } - } - - leader_scheduler.update_height(tick_height, self); - } - /// Process an ordered list of entries, populating a circular buffer "tail" /// as we go. fn process_entries_tail( @@ -932,8 +930,6 @@ impl Bank { entries: &[Entry], tail: &mut Vec, tail_idx: &mut usize, - tick_height: &mut u64, - leader_scheduler: &mut LeaderScheduler, ) -> Result { let mut entry_count = 0; @@ -951,7 +947,7 @@ impl Bank { // the leader scheduler. Next we will extract the vote tracking structure // out of the leader scheduler, and into the bank, and remove the leader // scheduler from these banking functions. - self.process_entry(entry, tick_height, leader_scheduler)?; + self.process_entry(entry)?; } Ok(entry_count) @@ -996,6 +992,7 @@ impl Bank { // if its a tick, execute the group and register the tick self.par_execute_entries(&mt_group)?; self.register_entry_id(&entry.id); + *self.tick_height.lock().unwrap() += 1; mt_group = vec![]; continue; } @@ -1025,17 +1022,18 @@ impl Bank { entries: I, tail: &mut Vec, tail_idx: &mut usize, - leader_scheduler: &mut LeaderScheduler, - ) -> Result<(u64, u64)> + ) -> Result where I: IntoIterator, { // Ledger verification needs to be parallelized, but we can't pull the whole // thing into memory. We therefore chunk it. let mut entry_height = *tail_idx as u64; - let mut tick_height = 0; + for entry in &tail[0..*tail_idx] { - tick_height += entry.is_tick() as u64 + if entry.is_tick() { + *self.tick_height.lock().unwrap() += 1; + } } let mut id = start_hash; @@ -1046,25 +1044,15 @@ impl Bank { return Err(BankError::LedgerVerificationFailed); } id = block.last().unwrap().id; - let entry_count = self.process_entries_tail( - &block, - tail, - tail_idx, - &mut tick_height, - leader_scheduler, - )?; + let entry_count = self.process_entries_tail(&block, tail, tail_idx)?; entry_height += entry_count; } - Ok((tick_height, entry_height)) + Ok(entry_height) } /// Process a full ledger. - pub fn process_ledger( - &self, - entries: I, - leader_scheduler: &mut LeaderScheduler, - ) -> Result<(u64, u64, Vec)> + pub fn process_ledger(&self, entries: I) -> Result<(u64, u64, Vec)> where I: IntoIterator, { @@ -1106,20 +1094,14 @@ impl Bank { tail.push(entry0); tail.push(entry1); let mut tail_idx = 2; - let (tick_height, entry_height) = self.process_blocks( - entry1_id, - entries, - &mut tail, - &mut tail_idx, - leader_scheduler, - )?; + let entry_height = self.process_blocks(entry1_id, entries, &mut tail, &mut tail_idx)?; // check if we need to rotate tail if tail.len() == WINDOW_SIZE as usize { tail.rotate_left(tail_idx) } - Ok((tick_height, entry_height, tail)) + Ok((*self.tick_height.lock().unwrap(), entry_height, tail)) } /// Create, sign, and process a Transaction from `keypair` to `to` of @@ -1236,6 +1218,16 @@ impl Bank { subscriptions.remove(pubkey).is_some() } + pub fn get_current_leader(&self) -> Option { + let ls_lock = self.leader_scheduler.read().unwrap(); + let tick_height = self.tick_height.lock().unwrap(); + ls_lock.get_scheduled_leader(*tick_height) + } + + pub fn get_tick_height(&self) -> u64 { + *self.tick_height.lock().unwrap() + } + fn check_account_subscriptions(&self, pubkey: &Pubkey, account: &Account) { let subscriptions = self.account_subscriptions.read().unwrap(); if let Some(hashmap) = subscriptions.get(pubkey) { @@ -1288,13 +1280,6 @@ impl Bank { } subscriptions.remove(&signature); } - - #[cfg(test)] - // Used to access accounts for things like controlling stake to control - // the eligible set of nodes for leader selection - pub fn accounts(&self) -> &RwLock> { - &self.accounts - } } #[cfg(test)] @@ -1307,7 +1292,6 @@ mod tests { use entry_writer::{self, EntryWriter}; use hash::hash; use jsonrpc_macros::pubsub::{Subscriber, SubscriptionId}; - use leader_scheduler::LeaderScheduler; use ledger; use logger; use signature::Keypair; @@ -1640,8 +1624,7 @@ mod tests { let mint = Mint::new(1); let genesis = mint.create_entries(); let bank = Bank::default(); - bank.process_ledger(genesis, &mut LeaderScheduler::default()) - .unwrap(); + bank.process_ledger(genesis).unwrap(); assert_eq!(bank.get_balance(&mint.pubkey()), 1); } @@ -1718,9 +1701,7 @@ mod tests { let (ledger, pubkey) = create_sample_ledger(1); let (ledger, dup) = ledger.tee(); let bank = Bank::default(); - let (tick_height, ledger_height, tail) = bank - .process_ledger(ledger, &mut LeaderScheduler::default()) - .unwrap(); + let (tick_height, ledger_height, tail) = bank.process_ledger(ledger).unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(ledger_height, 4); assert_eq!(tick_height, 2); @@ -1746,9 +1727,7 @@ mod tests { for entry_count in window_size - 3..window_size + 2 { let (ledger, pubkey) = create_sample_ledger(entry_count); let bank = Bank::default(); - let (tick_height, ledger_height, tail) = bank - .process_ledger(ledger, &mut LeaderScheduler::default()) - .unwrap(); + let (tick_height, ledger_height, tail) = bank.process_ledger(ledger).unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(ledger_height, entry_count as u64 + 3); assert_eq!(tick_height, 2); @@ -1774,8 +1753,7 @@ mod tests { let ledger = to_file_iter(ledger); let bank = Bank::default(); - bank.process_ledger(ledger, &mut LeaderScheduler::default()) - .unwrap(); + bank.process_ledger(ledger).unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); } @@ -1786,8 +1764,7 @@ mod tests { let block = to_file_iter(create_sample_block_with_ticks(&mint, 1, 1)); let bank = Bank::default(); - bank.process_ledger(genesis.chain(block), &mut LeaderScheduler::default()) - .unwrap(); + bank.process_ledger(genesis.chain(block)).unwrap(); assert_eq!(bank.get_balance(&mint.pubkey()), 1); } @@ -1801,13 +1778,9 @@ mod tests { let ledger1 = create_sample_ledger_with_mint_and_keypairs(&mint, &keypairs); let bank0 = Bank::default(); - bank0 - .process_ledger(ledger0, &mut LeaderScheduler::default()) - .unwrap(); + bank0.process_ledger(ledger0).unwrap(); let bank1 = Bank::default(); - bank1 - .process_ledger(ledger1, &mut LeaderScheduler::default()) - .unwrap(); + bank1.process_ledger(ledger1).unwrap(); let initial_state = bank0.hash_internal_state(); diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index b9427ef7c2..9e69a186d3 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -17,14 +17,16 @@ use solana::logger; use solana::metrics::set_panic_hook; use solana::signature::{Keypair, KeypairUtil}; use solana::thin_client::poll_gossip_for_leader; +use solana::vote_program::VoteProgram; use solana::wallet::request_airdrop; use std::fs::File; use std::net::{Ipv4Addr, SocketAddr}; use std::process::exit; +use std::sync::Arc; use std::thread::sleep; use std::time::Duration; -fn main() -> () { +fn main() { logger::setup(); set_panic_hook("fullnode"); let matches = App::new("fullnode") @@ -82,7 +84,6 @@ fn main() -> () { // save off some stuff for airdrop let node_info = node.info.clone(); - let pubkey = keypair.pubkey(); let leader = match network { Some(network) => { @@ -91,10 +92,16 @@ fn main() -> () { None => node_info, }; + let vote_account_keypair = Arc::new(Keypair::new()); + let vote_account_id = vote_account_keypair.pubkey(); + let keypair = Arc::new(keypair); + let pubkey = keypair.pubkey(); + let mut fullnode = Fullnode::new( node, ledger_path, - keypair, + keypair.clone(), + vote_account_keypair, network, false, LeaderScheduler::from_bootstrap_leader(leader.id), @@ -129,6 +136,49 @@ fn main() -> () { } } + // Create the vote account + loop { + let last_id = client.get_last_id(); + if client + .create_vote_account(&keypair, vote_account_id, &last_id, 1) + .is_err() + { + sleep(Duration::from_secs(2)); + continue; + } + + let balance = client.poll_get_balance(&vote_account_id).unwrap_or(0); + + if balance > 0 { + break; + } + + sleep(Duration::from_secs(2)); + } + + // Register the vote account to this node + loop { + let last_id = client.get_last_id(); + if client + .register_vote_account(&keypair, vote_account_id, &last_id) + .is_err() + { + sleep(Duration::from_secs(2)); + continue; + } + + let account_user_data = client.get_account_userdata(&vote_account_id); + if let Ok(Some(account_user_data)) = account_user_data { + if let Ok(vote_state) = VoteProgram::deserialize(&account_user_data) { + if vote_state.node_id == pubkey { + break; + } + } + } + + sleep(Duration::from_secs(2)); + } + loop { let status = fullnode.handle_role_transition(); match status { diff --git a/src/bin/ledger-tool.rs b/src/bin/ledger-tool.rs index c8fbcacebb..dfe9c20222 100644 --- a/src/bin/ledger-tool.rs +++ b/src/bin/ledger-tool.rs @@ -5,7 +5,6 @@ extern crate solana; use clap::{App, Arg, SubCommand}; use solana::bank::Bank; -use solana::leader_scheduler::LeaderScheduler; use solana::ledger::{read_ledger, verify_ledger}; use solana::logger; use std::io::{stdout, Write}; @@ -116,7 +115,7 @@ fn main() { }; let genesis = genesis.take(2).map(|e| e.unwrap()); - if let Err(e) = bank.process_ledger(genesis, &mut LeaderScheduler::default()) { + if let Err(e) = bank.process_ledger(genesis) { eprintln!("verify failed at genesis err: {:?}", e); if !matches.is_present("continue") { exit(1); @@ -142,10 +141,7 @@ fn main() { } last_id = entry.id; - let mut tick_height = 0; - let mut leader_scheduler = LeaderScheduler::default(); - if let Err(e) = bank.process_entry(&entry, &mut tick_height, &mut leader_scheduler) - { + if let Err(e) = bank.process_entry(&entry) { eprintln!("verify failed at entry[{}], err: {:?}", i + 2, e); if !matches.is_present("continue") { exit(1); diff --git a/src/budget_instruction.rs b/src/budget_instruction.rs index bf8b575359..c7e83a2cff 100644 --- a/src/budget_instruction.rs +++ b/src/budget_instruction.rs @@ -1,15 +1,12 @@ use budget::Budget; use chrono::prelude::{DateTime, Utc}; +/// A smart contract. #[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] -pub struct Vote { - /// We send some gossip specific membership information through the vote to shortcut - /// liveness voting - /// The version of the ClusterInfo struct that the last_id of this network voted with - pub version: u64, - /// The version of the ClusterInfo struct that has the same network configuration as this one - pub contact_info_version: u64, - // TODO: add signature of the state here as well +pub struct Contract { + /// The number of tokens allocated to the `Budget` and any transaction fees. + pub tokens: i64, + pub budget: Budget, } /// An instruction to progress the smart contract. @@ -24,7 +21,4 @@ pub enum Instruction { /// Tell the budget that the `NewBudget` with `Signature` has been /// signed by the containing transaction's `Pubkey`. ApplySignature, - - /// Vote for a PoH that is equal to the lastid of this transaction - NewVote(Vote), } diff --git a/src/budget_program.rs b/src/budget_program.rs index d6a2827bc2..51c89ff4dd 100644 --- a/src/budget_program.rs +++ b/src/budget_program.rs @@ -172,11 +172,6 @@ impl BudgetState { Err(BudgetError::UninitializedContract) } } - Instruction::NewVote(_vote) => { - // TODO: move vote instruction into a different contract - trace!("GOT VOTE! last_id={}", tx.last_id); - Ok(()) - } } } fn serialize(&self, output: &mut [u8]) -> Result<(), BudgetError> { diff --git a/src/budget_transaction.rs b/src/budget_transaction.rs index 87e2a293d6..06c8d017ef 100644 --- a/src/budget_transaction.rs +++ b/src/budget_transaction.rs @@ -2,7 +2,7 @@ use bincode::{deserialize, serialize}; use budget::{Budget, Condition}; -use budget_instruction::{Instruction, Vote}; +use budget_instruction::Instruction; use budget_program::BudgetState; use chrono::prelude::*; use hash::Hash; @@ -38,8 +38,6 @@ pub trait BudgetTransaction { last_id: Hash, ) -> Self; - fn budget_new_vote(from_keypair: &Keypair, vote: Vote, last_id: Hash, fee: i64) -> Self; - fn budget_new_on_date( from_keypair: &Keypair, to: Pubkey, @@ -61,8 +59,6 @@ pub trait BudgetTransaction { last_id: Hash, ) -> Self; - fn vote(&self) -> Option<(Pubkey, Vote, Hash)>; - fn instruction(&self, program_index: usize) -> Option; fn system_instruction(&self, program_index: usize) -> Option; @@ -153,12 +149,6 @@ impl BudgetTransaction for Transaction { ) } - fn budget_new_vote(from_keypair: &Keypair, vote: Vote, last_id: Hash, fee: i64) -> Self { - let instruction = Instruction::NewVote(vote); - let userdata = serialize(&instruction).expect("serialize instruction"); - Self::new(from_keypair, &[], BudgetState::id(), userdata, last_id, fee) - } - /// Create and sign a postdated Transaction. Used for unit-testing. fn budget_new_on_date( from_keypair: &Keypair, @@ -219,16 +209,6 @@ impl BudgetTransaction for Transaction { ) } - fn vote(&self) -> Option<(Pubkey, Vote, Hash)> { - if self.instructions.len() > 1 { - None - } else if let Some(Instruction::NewVote(vote)) = self.instruction(0) { - Some((self.account_keys[0], vote, self.last_id)) - } else { - None - } - } - fn instruction(&self, instruction_index: usize) -> Option { deserialize(&self.userdata(instruction_index)).ok() } diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 633066d6d8..b3734beee8 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -13,7 +13,6 @@ //! //! Bank needs to provide an interface for us to query the stake weight use bincode::{deserialize, serialize, serialized_size}; -use budget_instruction::Vote; use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy}; use counter::Counter; use hash::Hash; @@ -338,47 +337,6 @@ impl ClusterInfo { self.external_liveness.get(key) } - pub fn insert_vote(&mut self, pubkey: &Pubkey, v: &Vote, last_id: Hash) { - if self.table.get(pubkey).is_none() { - warn!("{}: VOTE for unknown id: {}", self.id, pubkey); - return; - } - if v.contact_info_version > self.table[pubkey].contact_info.version { - warn!( - "{}: VOTE for new address version from: {} ours: {} vote: {:?}", - self.id, pubkey, self.table[pubkey].contact_info.version, v, - ); - return; - } - if *pubkey == self.my_data().leader_id { - info!("{}: LEADER_VOTED! {}", self.id, pubkey); - inc_new_counter_info!("cluster_info-insert_vote-leader_voted", 1); - } - - if v.version <= self.table[pubkey].version { - debug!("{}: VOTE for old version: {}", self.id, pubkey); - self.update_liveness(*pubkey); - return; - } else { - let mut data = self.table[pubkey].clone(); - data.version = v.version; - data.ledger_state.last_id = last_id; - - debug!("{}: INSERTING VOTE! for {}", self.id, data.id); - self.update_liveness(data.id); - self.insert(&data); - } - } - pub fn insert_votes(&mut self, votes: &[(Pubkey, Vote, Hash)]) { - inc_new_counter_info!("cluster_info-vote-count", votes.len()); - if !votes.is_empty() { - info!("{}: INSERTING VOTES {}", self.id, votes.len()); - } - for v in votes { - self.insert_vote(&v.0, &v.1, v.2); - } - } - pub fn insert(&mut self, v: &NodeInfo) -> usize { // TODO check that last_verified types are always increasing // update the peer table @@ -539,7 +497,7 @@ impl ClusterInfo { ); // Make sure the next leader in line knows about the entries before his slot in the leader - // rotation so he can initiate repairs if necessary + // rotation so they can initiate repairs if necessary { let ls_lock = leader_scheduler.read().unwrap(); let next_leader_height = ls_lock.max_height_for_leader(tick_height); @@ -824,22 +782,6 @@ impl ClusterInfo { Ok((v.contact_info.ncp, req)) } - pub fn new_vote(&mut self, last_id: Hash) -> Result<(Vote, SocketAddr)> { - let mut me = self.my_data().clone(); - let leader = self - .leader_data() - .ok_or(ClusterInfoError::NoLeader)? - .clone(); - me.version += 1; - me.ledger_state.last_id = last_id; - let vote = Vote { - version: me.version, - contact_info_version: me.contact_info.version, - }; - self.insert(&me); - Ok((vote, leader.contact_info.tpu)) - } - /// At random pick a node and try to get updated changes from them fn run_gossip(obj: &Arc>, blob_sender: &BlobSender) -> Result<()> { //TODO we need to keep track of stakes and weight the selection by stake size @@ -1388,7 +1330,6 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) { #[cfg(test)] mod tests { use bincode::serialize; - use budget_instruction::Vote; use cluster_info::{ ClusterInfo, ClusterInfoError, Node, NodeInfo, Protocol, FULLNODE_PORT_RANGE, GOSSIP_PURGE_MILLIS, GOSSIP_SLEEP_MILLIS, MIN_TABLE_SIZE, @@ -1436,62 +1377,6 @@ mod tests { assert_eq!(cluster_info.table[&d.id].version, 3); assert!(liveness < cluster_info.alive[&d.id]); } - #[test] - fn test_new_vote() { - let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - assert_eq!(d.version, 0); - let mut cluster_info = ClusterInfo::new(d.clone()).unwrap(); - assert_eq!(cluster_info.table[&d.id].version, 0); - let leader = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.2:1235")); - assert_ne!(d.id, leader.id); - assert_matches!( - cluster_info.new_vote(Hash::default()).err(), - Some(Error::ClusterInfoError(ClusterInfoError::NoLeader)) - ); - cluster_info.insert(&leader); - assert_matches!( - cluster_info.new_vote(Hash::default()).err(), - Some(Error::ClusterInfoError(ClusterInfoError::NoLeader)) - ); - cluster_info.set_leader(leader.id); - assert_eq!(cluster_info.table[&d.id].version, 1); - let v = Vote { - version: 2, //version should increase when we vote - contact_info_version: 0, - }; - let expected = (v, cluster_info.table[&leader.id].contact_info.tpu); - assert_eq!(cluster_info.new_vote(Hash::default()).unwrap(), expected); - } - - #[test] - fn test_insert_vote() { - let d = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - assert_eq!(d.version, 0); - let mut cluster_info = ClusterInfo::new(d.clone()).unwrap(); - assert_eq!(cluster_info.table[&d.id].version, 0); - let vote_same_version = Vote { - version: d.version, - contact_info_version: 0, - }; - cluster_info.insert_vote(&d.id, &vote_same_version, Hash::default()); - assert_eq!(cluster_info.table[&d.id].version, 0); - - let vote_new_version_new_addrs = Vote { - version: d.version + 1, - contact_info_version: 1, - }; - cluster_info.insert_vote(&d.id, &vote_new_version_new_addrs, Hash::default()); - //should be dropped since the address is newer then we know - assert_eq!(cluster_info.table[&d.id].version, 0); - - let vote_new_version_old_addrs = Vote { - version: d.version + 1, - contact_info_version: 0, - }; - cluster_info.insert_vote(&d.id, &vote_new_version_old_addrs, Hash::default()); - //should be accepted, since the update is for the same address field as the one we know - assert_eq!(cluster_info.table[&d.id].version, 1); - } fn sorted(ls: &Vec<(NodeInfo, u64)>) -> Vec<(NodeInfo, u64)> { let mut copy: Vec<_> = ls.iter().cloned().collect(); copy.sort_by(|x, y| x.0.id.cmp(&y.0.id)); diff --git a/src/drone.rs b/src/drone.rs index 311298508d..3cd46b7901 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -235,6 +235,7 @@ mod tests { use signature::{Keypair, KeypairUtil}; use std::fs::remove_dir_all; use std::net::{SocketAddr, UdpSocket}; + use std::sync::{Arc, RwLock}; use std::time::Duration; use thin_client::ThinClient; @@ -313,18 +314,24 @@ mod tests { const TPS_BATCH: i64 = 5_000_000; logger::setup(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader.info.id, + ))); + bank.leader_scheduler = leader_scheduler; let bob_pubkey = Keypair::new().pubkey(); let carlos_pubkey = Keypair::new().pubkey(); let leader_data = leader.info.clone(); let ledger_path = get_tmp_ledger_path("send_airdrop"); + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, 0, @@ -333,7 +340,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); @@ -368,13 +374,14 @@ mod tests { // restart the leader, drone should find the new one at the same gossip port server.close().unwrap(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let server = Fullnode::new( leader, &ledger_path, leader_keypair, + Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_data.id), diff --git a/src/fullnode.rs b/src/fullnode.rs index 3b15b3a4ba..142ff80de6 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -85,12 +85,12 @@ pub enum FullnodeReturnType { pub struct Fullnode { pub node_role: Option, - pub leader_scheduler: Arc>, keypair: Arc, + vote_account_keypair: Arc, exit: Arc, rpu: Option, - rpc_service: JsonRpcService, - rpc_pubsub_service: PubSubService, + rpc_service: Option, + rpc_pubsub_service: Option, ncp: Ncp, bank: Arc, cluster_info: Arc>, @@ -104,6 +104,7 @@ pub struct Fullnode { broadcast_socket: UdpSocket, requests_socket: UdpSocket, respond_socket: UdpSocket, + rpc_port: Option, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -132,14 +133,17 @@ impl Fullnode { pub fn new( node: Node, ledger_path: &str, - keypair: Keypair, + keypair: Arc, + vote_account_keypair: Arc, leader_addr: Option, sigverify_disabled: bool, - mut leader_scheduler: LeaderScheduler, + leader_scheduler: LeaderScheduler, ) -> Self { + let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); + info!("creating bank..."); let (bank, tick_height, entry_height, ledger_tail) = - Self::new_bank_from_ledger(ledger_path, &mut leader_scheduler); + Self::new_bank_from_ledger(ledger_path, leader_scheduler); info!("creating networking stack..."); let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); @@ -154,6 +158,7 @@ impl Fullnode { let leader_info = leader_addr.map(|i| NodeInfo::new_entry_point(&i)); let server = Self::new_with_bank( keypair, + vote_account_keypair, bank, tick_height, entry_height, @@ -162,7 +167,6 @@ impl Fullnode { leader_info.as_ref(), ledger_path, sigverify_disabled, - leader_scheduler, None, ); @@ -236,7 +240,8 @@ impl Fullnode { /// ``` #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new_with_bank( - keypair: Keypair, + keypair: Arc, + vote_account_keypair: Arc, bank: Bank, tick_height: u64, entry_height: u64, @@ -245,7 +250,6 @@ impl Fullnode { bootstrap_leader_info_option: Option<&NodeInfo>, ledger_path: &str, sigverify_disabled: bool, - leader_scheduler: LeaderScheduler, rpc_port: Option, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); @@ -274,21 +278,8 @@ impl Fullnode { ClusterInfo::new(node.info).expect("ClusterInfo::new"), )); - // Use custom RPC port, if provided (`Some(port)`) - // RPC port may be any open port on the node - // If rpc_port == `None`, node will listen on the default RPC_PORT from Rpc module - // If rpc_port == `Some(0)`, node will dynamically choose any open port for both - // Rpc and RpcPubsub serivces. Useful for tests. - let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), rpc_port.unwrap_or(RPC_PORT)); - // TODO: The RPC service assumes that there is a drone running on the leader - // Drone location/id will need to be handled a different way as soon as leader rotation begins - let rpc_service = JsonRpcService::new(&bank, &cluster_info, rpc_addr, exit.clone()); - - let rpc_pubsub_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::from(0)), - rpc_port.map_or(RPC_PORT + 1, |port| if port == 0 { port } else { port + 1 }), - ); - let rpc_pubsub_service = PubSubService::new(&bank, rpc_pubsub_addr, exit.clone()); + let (rpc_service, rpc_pubsub_service) = + Self::startup_rpc_services(rpc_port, &bank, &cluster_info); let ncp = Ncp::new( &cluster_info, @@ -298,9 +289,6 @@ impl Fullnode { exit.clone(), ); - let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); - let keypair = Arc::new(keypair); - // Insert the bootstrap leader info, should only be None if this node // is the bootstrap leader if let Some(bootstrap_leader_info) = bootstrap_leader_info_option { @@ -308,10 +296,8 @@ impl Fullnode { } // Get the scheduled leader - let scheduled_leader = leader_scheduler - .read() - .unwrap() - .get_scheduled_leader(tick_height) + let scheduled_leader = bank + .get_current_leader() .expect("Leader not known after processing bank"); cluster_info.write().unwrap().set_leader(scheduled_leader); @@ -319,8 +305,8 @@ impl Fullnode { // Start in validator mode. let tvu = Tvu::new( keypair.clone(), + vote_account_keypair.clone(), &bank, - tick_height, entry_height, cluster_info.clone(), shared_window.clone(), @@ -338,20 +324,17 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(ledger_path), - leader_scheduler.clone(), ); let validator_state = ValidatorServices::new(tvu); Some(NodeRole::Validator(validator_state)) } else { let max_tick_height = { - let ls_lock = leader_scheduler.read().unwrap(); + let ls_lock = bank.leader_scheduler.read().unwrap(); ls_lock.max_height_for_leader(tick_height) }; // Start in leader mode. let (tpu, entry_receiver, tpu_exit) = Tpu::new( - keypair.clone(), &bank, - &cluster_info, Default::default(), node.sockets .transaction @@ -374,7 +357,7 @@ impl Fullnode { shared_window.clone(), entry_height, entry_receiver, - leader_scheduler.clone(), + bank.leader_scheduler.clone(), tick_height, tpu_exit, ); @@ -384,14 +367,15 @@ impl Fullnode { Fullnode { keypair, + vote_account_keypair, cluster_info, shared_window, bank, sigverify_disabled, rpu, ncp, - rpc_service, - rpc_pubsub_service, + rpc_service: Some(rpc_service), + rpc_pubsub_service: Some(rpc_pubsub_service), node_role, ledger_path: ledger_path.to_owned(), exit, @@ -402,27 +386,50 @@ impl Fullnode { broadcast_socket: node.sockets.broadcast, requests_socket: node.sockets.requests, respond_socket: node.sockets.respond, - leader_scheduler, + rpc_port, } } fn leader_to_validator(&mut self) -> Result<()> { - let (scheduled_leader, tick_height, entry_height, last_entry_id) = { - let mut ls_lock = self.leader_scheduler.write().unwrap(); - // Clear the leader scheduler - ls_lock.reset(); + // Close down any services that could have a reference to the bank + if self.rpu.is_some() { + let old_rpu = self.rpu.take().unwrap(); + old_rpu.close()?; + } + if self.rpc_service.is_some() { + let old_rpc_service = self.rpc_service.take().unwrap(); + old_rpc_service.close()?; + } + + if self.rpc_pubsub_service.is_some() { + let old_rpc_pubsub_service = self.rpc_pubsub_service.take().unwrap(); + old_rpc_pubsub_service.close()?; + } + + // Correctness check: Ensure that references to the bank and leader scheduler are no + // longer held by any running thread + let mut new_leader_scheduler = self.bank.leader_scheduler.read().unwrap().clone(); + + // Clear the leader scheduler + new_leader_scheduler.reset(); + + let (new_bank, scheduled_leader, tick_height, entry_height, last_entry_id) = { // TODO: We can avoid building the bank again once RecordStage is // integrated with BankingStage - let (bank, tick_height, entry_height, ledger_tail) = - Self::new_bank_from_ledger(&self.ledger_path, &mut *ls_lock); + let (new_bank, tick_height, entry_height, ledger_tail) = Self::new_bank_from_ledger( + &self.ledger_path, + Arc::new(RwLock::new(new_leader_scheduler)), + ); - self.bank = Arc::new(bank); + let new_bank = Arc::new(new_bank); + let scheduled_leader = new_bank + .get_current_leader() + .expect("Scheduled leader should exist after rebuilding bank"); ( - ls_lock - .get_scheduled_leader(entry_height) - .expect("Scheduled leader should exist after rebuilding bank"), + new_bank, + scheduled_leader, tick_height, entry_height, ledger_tail @@ -437,21 +444,23 @@ impl Fullnode { .unwrap() .set_leader(scheduled_leader); - // Make a new RPU to serve requests out of the new bank we've created - // instead of the old one - if self.rpu.is_some() { - let old_rpu = self.rpu.take().unwrap(); - old_rpu.close()?; - self.rpu = Some(Rpu::new( - &self.bank, - self.requests_socket - .try_clone() - .expect("Failed to clone requests socket"), - self.respond_socket - .try_clone() - .expect("Failed to clone respond socket"), - )); - } + // Spin up new versions of all the services that relied on the bank, passing in the + // new bank + self.rpu = Some(Rpu::new( + &new_bank, + self.requests_socket + .try_clone() + .expect("Failed to clone requests socket"), + self.respond_socket + .try_clone() + .expect("Failed to clone respond socket"), + )); + + let (rpc_service, rpc_pubsub_service) = + Self::startup_rpc_services(self.rpc_port, &new_bank, &self.cluster_info); + self.rpc_service = Some(rpc_service); + self.rpc_pubsub_service = Some(rpc_pubsub_service); + self.bank = new_bank; // In the rare case that the leader exited on a multiple of seed_rotation_interval // when the new leader schedule was being generated, and there are no other validators @@ -459,32 +468,31 @@ impl Fullnode { // check for that if scheduled_leader == self.keypair.pubkey() { self.validator_to_leader(tick_height, entry_height, last_entry_id); - return Ok(()); + Ok(()) + } else { + let tvu = Tvu::new( + self.keypair.clone(), + self.vote_account_keypair.clone(), + &self.bank, + entry_height, + self.cluster_info.clone(), + self.shared_window.clone(), + self.replicate_socket + .iter() + .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) + .collect(), + self.repair_socket + .try_clone() + .expect("Failed to clone repair socket"), + self.retransmit_socket + .try_clone() + .expect("Failed to clone retransmit socket"), + Some(&self.ledger_path), + ); + let validator_state = ValidatorServices::new(tvu); + self.node_role = Some(NodeRole::Validator(validator_state)); + Ok(()) } - - let tvu = Tvu::new( - self.keypair.clone(), - &self.bank, - tick_height, - entry_height, - self.cluster_info.clone(), - self.shared_window.clone(), - self.replicate_socket - .iter() - .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) - .collect(), - self.repair_socket - .try_clone() - .expect("Failed to clone repair socket"), - self.retransmit_socket - .try_clone() - .expect("Failed to clone retransmit socket"), - Some(&self.ledger_path), - self.leader_scheduler.clone(), - ); - let validator_state = ValidatorServices::new(tvu); - self.node_role = Some(NodeRole::Validator(validator_state)); - Ok(()) } fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_entry_id: Hash) { @@ -494,14 +502,12 @@ impl Fullnode { .set_leader(self.keypair.pubkey()); let max_tick_height = { - let ls_lock = self.leader_scheduler.read().unwrap(); + let ls_lock = self.bank.leader_scheduler.read().unwrap(); ls_lock.max_height_for_leader(tick_height) }; let (tpu, blob_receiver, tpu_exit) = Tpu::new( - self.keypair.clone(), &self.bank, - &self.cluster_info, Default::default(), self.transaction_sockets .iter() @@ -525,7 +531,7 @@ impl Fullnode { self.shared_window.clone(), entry_height, blob_receiver, - self.leader_scheduler.clone(), + self.bank.leader_scheduler.clone(), tick_height, tpu_exit, ); @@ -569,6 +575,12 @@ impl Fullnode { if let Some(ref rpu) = self.rpu { rpu.exit(); } + if let Some(ref rpc_service) = self.rpc_service { + rpc_service.exit(); + } + if let Some(ref rpc_pubsub_service) = self.rpc_pubsub_service { + rpc_pubsub_service.exit(); + } match self.node_role { Some(NodeRole::Leader(ref leader_services)) => leader_services.exit(), Some(NodeRole::Validator(ref validator_services)) => validator_services.exit(), @@ -583,21 +595,50 @@ impl Fullnode { pub fn new_bank_from_ledger( ledger_path: &str, - leader_scheduler: &mut LeaderScheduler, + leader_scheduler: Arc>, ) -> (Bank, u64, u64, Vec) { - let bank = Bank::new_with_builtin_programs(); + let mut bank = Bank::new_with_builtin_programs(); + bank.leader_scheduler = leader_scheduler; let entries = read_ledger(ledger_path, true).expect("opening ledger"); let entries = entries .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err))); info!("processing ledger..."); - let (tick_height, entry_height, ledger_tail) = bank - .process_ledger(entries, leader_scheduler) - .expect("process_ledger"); + let (tick_height, entry_height, ledger_tail) = + bank.process_ledger(entries).expect("process_ledger"); // entry_height is the network-wide agreed height of the ledger. // initialize it from the input ledger info!("processed {} ledger...", entry_height); (bank, tick_height, entry_height, ledger_tail) } + + pub fn get_leader_scheduler(&self) -> &Arc> { + &self.bank.leader_scheduler + } + + fn startup_rpc_services( + rpc_port: Option, + bank: &Arc, + cluster_info: &Arc>, + ) -> (JsonRpcService, PubSubService) { + // Use custom RPC port, if provided (`Some(port)`) + // RPC port may be any open port on the node + // If rpc_port == `None`, node will listen on the default RPC_PORT from Rpc module + // If rpc_port == `Some(0)`, node will dynamically choose any open port for both + // Rpc and RpcPubsub serivces. Useful for tests. + + let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::from(0)), rpc_port.unwrap_or(RPC_PORT)); + let rpc_pubsub_addr = SocketAddr::new( + IpAddr::V4(Ipv4Addr::from(0)), + rpc_port.map_or(RPC_PORT + 1, |port| if port == 0 { port } else { port + 1 }), + ); + + // TODO: The RPC service assumes that there is a drone running on the leader + // Drone location/id will need to be handled a different way as soon as leader rotation begins + ( + JsonRpcService::new(bank, cluster_info, rpc_addr), + PubSubService::new(bank, rpc_pubsub_addr), + ) + } } impl Service for Fullnode { @@ -607,9 +648,14 @@ impl Service for Fullnode { if let Some(rpu) = self.rpu { rpu.join()?; } + if let Some(rpc_service) = self.rpc_service { + rpc_service.join()?; + } + if let Some(rpc_pubsub_service) = self.rpc_pubsub_service { + rpc_pubsub_service.join()?; + } + self.ncp.join()?; - self.rpc_service.join()?; - self.rpc_pubsub_service.join()?; match self.node_role { Some(NodeRole::Validator(validator_service)) => { @@ -643,7 +689,7 @@ mod tests { use std::fs::remove_dir_all; use std::net::UdpSocket; use std::sync::mpsc::channel; - use std::sync::Arc; + use std::sync::{Arc, RwLock}; use streamer::responder; #[test] @@ -651,13 +697,19 @@ mod tests { let keypair = Keypair::new(); let tn = Node::new_localhost_with_pubkey(keypair.pubkey()); let (mint, validator_ledger_path) = create_tmp_genesis("validator_exit", 10_000); - let bank = Bank::new(&mint); + let mut bank = Bank::new(&mint); let entry = tn.info.clone(); let genesis_entries = &mint.create_entries(); let entry_height = genesis_entries.len() as u64; + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + entry.id, + ))); + bank.leader_scheduler = leader_scheduler; + let v = Fullnode::new_with_bank( - keypair, + Arc::new(keypair), + Arc::new(Keypair::new()), bank, 0, entry_height, @@ -666,7 +718,6 @@ mod tests { Some(&entry), &validator_ledger_path, false, - LeaderScheduler::from_bootstrap_leader(entry.id), Some(0), ); v.close().unwrap(); @@ -683,13 +734,20 @@ mod tests { let (mint, validator_ledger_path) = create_tmp_genesis(&format!("validator_parallel_exit_{}", i), 10_000); ledger_paths.push(validator_ledger_path.clone()); - let bank = Bank::new(&mint); + let mut bank = Bank::new(&mint); let entry = tn.info.clone(); let genesis_entries = &mint.create_entries(); let entry_height = genesis_entries.len() as u64; + + let leader_scheduler = Arc::new(RwLock::new( + LeaderScheduler::from_bootstrap_leader(entry.id), + )); + bank.leader_scheduler = leader_scheduler; + Fullnode::new_with_bank( - keypair, + Arc::new(keypair), + Arc::new(Keypair::new()), bank, 0, entry_height, @@ -698,7 +756,6 @@ mod tests { Some(&entry), &validator_ledger_path, false, - LeaderScheduler::from_bootstrap_leader(entry.id), Some(0), ) }).collect(); @@ -757,7 +814,8 @@ mod tests { let mut bootstrap_leader = Fullnode::new( bootstrap_leader_node, &bootstrap_leader_ledger_path, - bootstrap_leader_keypair, + Arc::new(bootstrap_leader_keypair), + Arc::new(Keypair::new()), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -783,7 +841,7 @@ mod tests { #[test] fn test_wrong_role_transition() { // Create the leader node information - let bootstrap_leader_keypair = Keypair::new(); + let bootstrap_leader_keypair = Arc::new(Keypair::new()); let bootstrap_leader_node = Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey()); let bootstrap_leader_info = bootstrap_leader_node.info.clone(); @@ -805,7 +863,7 @@ mod tests { // Write the entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); - let active_set_entries = make_active_set_entries( + let (active_set_entries, validator_vote_account_keypair) = make_active_set_entries( &validator_keypair, &mint.keypair(), &last_id, @@ -837,10 +895,12 @@ mod tests { ); // Test that a node knows to transition to a validator based on parsing the ledger + let leader_vote_account_keypair = Arc::new(Keypair::new()); let bootstrap_leader = Fullnode::new( bootstrap_leader_node, &bootstrap_leader_ledger_path, bootstrap_leader_keypair, + leader_vote_account_keypair, Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -857,7 +917,8 @@ mod tests { let validator = Fullnode::new( validator_node, &bootstrap_leader_ledger_path, - validator_keypair, + Arc::new(validator_keypair), + Arc::new(validator_vote_account_keypair), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -905,7 +966,7 @@ mod tests { // // 2) A vote from the validator let mut ledger_writer = LedgerWriter::open(&validator_ledger_path, false).unwrap(); - let active_set_entries = + let (active_set_entries, validator_vote_account_keypair) = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); let initial_tick_height = genesis_entries .iter() @@ -933,7 +994,8 @@ mod tests { let mut validator = Fullnode::new( validator_node, &validator_ledger_path, - validator_keypair, + Arc::new(validator_keypair), + Arc::new(validator_vote_account_keypair), Some(leader_ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -993,7 +1055,7 @@ mod tests { // transitioned after tick_height = bootstrap_height. let (_, tick_height, entry_height, _) = Fullnode::new_bank_from_ledger( &validator_ledger_path, - &mut LeaderScheduler::new(&leader_scheduler_config), + Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))), ); assert_eq!(tick_height, bootstrap_height); diff --git a/src/leader_scheduler.rs b/src/leader_scheduler.rs index 58f93a5f24..19b02b01d6 100644 --- a/src/leader_scheduler.rs +++ b/src/leader_scheduler.rs @@ -4,82 +4,24 @@ use bank::Bank; use bincode::serialize; -use budget_instruction::Vote; -use budget_transaction::BudgetTransaction; use byteorder::{LittleEndian, ReadBytesExt}; use entry::Entry; use hash::{hash, Hash}; use ledger::create_ticks; use signature::{Keypair, KeypairUtil}; -#[cfg(test)] -use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; -use std::collections::HashMap; +use std::collections::HashSet; use std::io::Cursor; use system_transaction::SystemTransaction; use transaction::Transaction; +use vote_program::{Vote, VoteProgram}; +use vote_transaction::VoteTransaction; pub const DEFAULT_BOOTSTRAP_HEIGHT: u64 = 1000; pub const DEFAULT_LEADER_ROTATION_INTERVAL: u64 = 100; pub const DEFAULT_SEED_ROTATION_INTERVAL: u64 = 1000; pub const DEFAULT_ACTIVE_WINDOW_LENGTH: u64 = 1000; -#[derive(Debug)] -pub struct ActiveValidators { - // Map from validator id to the last PoH height at which they voted, - pub active_validators: HashMap, - pub active_window_length: u64, -} - -impl ActiveValidators { - pub fn new(active_window_length_option: Option) -> Self { - let mut active_window_length = DEFAULT_ACTIVE_WINDOW_LENGTH; - if let Some(input) = active_window_length_option { - active_window_length = input; - } - - ActiveValidators { - active_validators: HashMap::new(), - active_window_length, - } - } - - // Finds all the active voters who have voted in the range - // (height - active_window_length, height], and removes - // anybody who hasn't voted in that range from the map - pub fn get_active_set(&mut self, height: u64) -> Vec { - // Don't filter anything if height is less than the - // size of the active window. Otherwise, calculate the acceptable - // window and filter the active_validators - - // Note: height == 0 will only be included for all - // height < self.active_window_length - let upper_bound = height; - if height >= self.active_window_length { - let lower_bound = height - self.active_window_length; - self.active_validators - .retain(|_, height| *height > lower_bound); - } - - self.active_validators - .iter() - .filter_map(|(k, v)| if *v <= upper_bound { Some(*k) } else { None }) - .collect() - } - - // Push a vote for a validator with id == "id" who voted at PoH height == "height" - pub fn push_vote(&mut self, id: Pubkey, height: u64) -> () { - let old_height = self.active_validators.entry(id).or_insert(height); - if height > *old_height { - *old_height = height; - } - } - - pub fn reset(&mut self) -> () { - self.active_validators.clear(); - } -} - pub struct LeaderSchedulerConfig { // The first leader who will bootstrap the network pub bootstrap_leader: Pubkey, @@ -119,7 +61,7 @@ impl LeaderSchedulerConfig { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct LeaderScheduler { // Set to true if we want the default implementation of the LeaderScheduler, // where ony the bootstrap leader is used @@ -139,12 +81,13 @@ pub struct LeaderScheduler { // the leader rotation process begins to pick future leaders pub bootstrap_height: u64, - // Maintain the set of active validators - pub active_validators: ActiveValidators, - // The last height at which the seed + schedule was generated pub last_seed_height: Option, + // The length of time in ticks for which a vote qualifies a candidate for leader + // selection + pub active_window_length: u64, + // Round-robin ordering for the validators leader_schedule: Vec, @@ -193,6 +136,11 @@ impl LeaderScheduler { seed_rotation_interval = input; } + let mut active_window_length = DEFAULT_ACTIVE_WINDOW_LENGTH; + if let Some(input) = config.active_window_length_option { + active_window_length = input; + } + // Enforced invariants assert!(seed_rotation_interval >= leader_rotation_interval); assert!(bootstrap_height > 0); @@ -200,13 +148,13 @@ impl LeaderScheduler { LeaderScheduler { use_only_bootstrap_leader: false, - active_validators: ActiveValidators::new(config.active_window_length_option), leader_rotation_interval, seed_rotation_interval, leader_schedule: Vec::new(), last_seed_height: None, bootstrap_leader: config.bootstrap_leader, bootstrap_height, + active_window_length, seed: 0, } } @@ -280,15 +228,6 @@ impl LeaderScheduler { pub fn reset(&mut self) { self.last_seed_height = None; - self.active_validators.reset(); - } - - pub fn push_vote(&mut self, id: Pubkey, height: u64) { - if self.use_only_bootstrap_leader { - return; - } - - self.active_validators.push_vote(id, height); } pub fn update_height(&mut self, height: u64, bank: &Bank) { @@ -343,8 +282,34 @@ impl LeaderScheduler { Some(self.leader_schedule[validator_index]) } - fn get_active_set(&mut self, height: u64) -> Vec { - self.active_validators.get_active_set(height) + // TODO: We use a HashSet for now because a single validator could potentially register + // multiple vote account. Once that is no longer possible (see the TODO in vote_program.rs, + // process_transaction(), case VoteInstruction::RegisterAccount), we can use a vector. + fn get_active_set(&mut self, height: u64, bank: &Bank) -> HashSet { + let upper_bound = height; + let lower_bound = height.saturating_sub(self.active_window_length); + + { + let bank_accounts = &*bank.accounts.read().unwrap(); + + bank_accounts + .values() + .filter_map(|account| { + if VoteProgram::check_id(&account.program_id) { + if let Ok(vote_state) = VoteProgram::deserialize(&account.userdata) { + return vote_state + .votes + .back() + .filter(|vote| { + vote.tick_height > lower_bound + && vote.tick_height <= upper_bound + }).map(|_| vote_state.node_id); + } + } + + None + }).collect() + } } // Called every seed_rotation_interval entries, generates the leader schedule @@ -354,8 +319,8 @@ impl LeaderScheduler { assert!((height - self.bootstrap_height) % self.seed_rotation_interval == 0); let seed = Self::calculate_seed(height); self.seed = seed; - let active_set = self.get_active_set(height); - let ranked_active_set = Self::rank_active_set(bank, &active_set[..]); + let active_set = self.get_active_set(height, &bank); + let ranked_active_set = Self::rank_active_set(bank, active_set.iter()); // Handle case where there are no active validators with // non-zero stake. In this case, use the bootstrap leader for @@ -417,9 +382,11 @@ impl LeaderScheduler { bank.get_balance(id) } - fn rank_active_set<'a>(bank: &Bank, active: &'a [Pubkey]) -> Vec<(&'a Pubkey, u64)> { + fn rank_active_set<'a, I>(bank: &Bank, active: I) -> Vec<(&'a Pubkey, u64)> + where + I: Iterator, + { let mut active_accounts: Vec<(&'a Pubkey, u64)> = active - .iter() .filter_map(|pk| { let stake = Self::get_stake(pk, bank); if stake > 0 { @@ -478,24 +445,6 @@ impl Default for LeaderScheduler { } } -// Remove all candiates for leader selection from the active set by clearing the bank, -// and then set a single new candidate who will be eligible starting at height = vote_height -// by adding one new account to the bank -#[cfg(test)] -pub fn set_new_leader(bank: &Bank, leader_scheduler: &mut LeaderScheduler, vote_height: u64) { - // Set the scheduled next leader to some other node - let new_leader_keypair = Keypair::new(); - let new_leader_id = new_leader_keypair.pubkey(); - leader_scheduler.push_vote(new_leader_id, vote_height); - let dummy_id = Keypair::new().pubkey(); - let new_account = Account::new(1, 10, dummy_id.clone()); - - // Remove the previous acounts from the active set - let mut accounts = bank.accounts().write().unwrap(); - accounts.clear(); - accounts.insert(new_leader_id, new_account); -} - // Create two entries so that the node with keypair == active_keypair // is in the active set for leader selection: // 1) Give the node a nonzero number of tokens, @@ -506,50 +455,107 @@ pub fn make_active_set_entries( last_entry_id: &Hash, last_tick_id: &Hash, num_ending_ticks: usize, -) -> Vec { +) -> (Vec, Keypair) { // 1) Create transfer token entry let transfer_tx = - Transaction::system_new(&token_source, active_keypair.pubkey(), 1, *last_tick_id); + Transaction::system_new(&token_source, active_keypair.pubkey(), 2, *last_tick_id); let transfer_entry = Entry::new(last_entry_id, 1, vec![transfer_tx]); let mut last_entry_id = transfer_entry.id; - // 2) Create vote entry - let vote = Vote { - version: 0, - contact_info_version: 0, - }; - let vote_tx = Transaction::budget_new_vote(&active_keypair, vote, *last_tick_id, 0); + // 2) Create the vote account + let vote_account = Keypair::new(); + let create_vote_account_tx = + Transaction::vote_account_new(active_keypair, vote_account.pubkey(), *last_tick_id, 1); + + let create_vote_account_entry = Entry::new(&last_entry_id, 1, vec![create_vote_account_tx]); + last_entry_id = create_vote_account_entry.id; + + // 3) Register the vote account + let register_vote_account_tx = + Transaction::vote_account_register(active_keypair, vote_account.pubkey(), *last_tick_id, 0); + + let register_vote_account_entry = Entry::new(&last_entry_id, 1, vec![register_vote_account_tx]); + last_entry_id = register_vote_account_entry.id; + + // 4) Create vote entry + let vote = Vote { tick_height: 1 }; + let vote_tx = Transaction::vote_new(&vote_account, vote, *last_tick_id, 0); let vote_entry = Entry::new(&last_entry_id, 1, vec![vote_tx]); last_entry_id = vote_entry.id; - // 3) Create the ending empty ticks - let mut txs = vec![transfer_entry, vote_entry]; + // 5) Create the ending empty ticks + let mut txs = vec![ + transfer_entry, + create_vote_account_entry, + register_vote_account_entry, + vote_entry, + ]; let empty_ticks = create_ticks(num_ending_ticks, last_entry_id); txs.extend(empty_ticks); - txs + (txs, vote_account) } #[cfg(test)] mod tests { use bank::Bank; + use hash::Hash; use leader_scheduler::{ - ActiveValidators, LeaderScheduler, LeaderSchedulerConfig, DEFAULT_ACTIVE_WINDOW_LENGTH, - DEFAULT_BOOTSTRAP_HEIGHT, DEFAULT_LEADER_ROTATION_INTERVAL, DEFAULT_SEED_ROTATION_INTERVAL, + LeaderScheduler, LeaderSchedulerConfig, DEFAULT_BOOTSTRAP_HEIGHT, + DEFAULT_LEADER_ROTATION_INTERVAL, DEFAULT_SEED_ROTATION_INTERVAL, }; use mint::Mint; + use result::Result; use signature::{Keypair, KeypairUtil}; use solana_sdk::pubkey::Pubkey; use std::collections::HashSet; - use std::hash::Hash; + use std::hash::Hash as StdHash; use std::iter::FromIterator; + use transaction::Transaction; + use vote_program::Vote; + use vote_transaction::VoteTransaction; fn to_hashset_owned(slice: &[T]) -> HashSet where - T: Eq + Hash + Clone, + T: Eq + StdHash + Clone, { HashSet::from_iter(slice.iter().cloned()) } + fn push_vote(vote_account: &Keypair, bank: &Bank, height: u64, last_id: Hash) { + let vote = Vote { + tick_height: height, + }; + + let new_vote_tx = Transaction::vote_new(vote_account, vote, last_id, 0); + + bank.process_transaction(&new_vote_tx).unwrap(); + } + + fn create_vote_account( + node_keypair: &Keypair, + bank: &Bank, + num_tokens: i64, + last_id: Hash, + ) -> Result { + let new_vote_account = Keypair::new(); + + // Create the new vote account + let tx = Transaction::vote_account_new( + node_keypair, + new_vote_account.pubkey(), + last_id, + num_tokens, + ); + bank.process_transaction(&tx)?; + + // Register the vote account to the validator + let tx = + Transaction::vote_account_register(node_keypair, new_vote_account.pubkey(), last_id, 0); + bank.process_transaction(&tx)?; + + Ok(new_vote_account) + } + fn run_scheduler_test( num_validators: usize, bootstrap_height: u64, @@ -572,7 +578,11 @@ mod tests { let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); // Create the bank and validators, which are inserted in order of account balance - let mint = Mint::new((((num_validators + 1) / 2) * (num_validators + 1)) as i64); + let num_vote_account_tokens = 1; + let mint = Mint::new( + (((num_validators + 1) / 2) * (num_validators + 1) + + num_vote_account_tokens * num_validators) as i64, + ); let bank = Bank::new(&mint); let mut validators = vec![]; let last_id = mint @@ -584,11 +594,24 @@ mod tests { let new_validator = Keypair::new(); let new_pubkey = new_validator.pubkey(); validators.push(new_pubkey); + // Give the validator some tokens + bank.transfer( + (i + 1 + num_vote_account_tokens) as i64, + &mint.keypair(), + new_pubkey, + last_id, + ).unwrap(); + + // Create a vote account + let new_vote_account = create_vote_account( + &new_validator, + &bank, + num_vote_account_tokens as i64, + mint.last_id(), + ).unwrap(); // Vote to make the validator part of the active set for the entire test // (we made the active_window_length large enough at the beginning of the test) - leader_scheduler.push_vote(new_pubkey, 1); - bank.transfer((i + 1) as i64, &mint.keypair(), new_pubkey, last_id) - .unwrap(); + push_vote(&new_vote_account, &bank, 1, mint.last_id()); } // The scheduled leader during the bootstrapping period (assuming a seed + schedule @@ -666,6 +689,9 @@ mod tests { fn test_active_set() { let leader_id = Keypair::new().pubkey(); let active_window_length = 1000; + let mint = Mint::new(10000); + let bank = Bank::new(&mint); + let leader_scheduler_config = LeaderSchedulerConfig::new( leader_id, Some(100), @@ -681,40 +707,60 @@ mod tests { let num_old_ids = 20; let mut old_ids = HashSet::new(); for _ in 0..num_old_ids { - let pk = Keypair::new().pubkey(); - old_ids.insert(pk); - leader_scheduler.push_vote(pk, start_height); + let new_keypair = Keypair::new(); + let pk = new_keypair.pubkey(); + old_ids.insert(pk.clone()); + + // Give the account some stake + bank.transfer(5, &mint.keypair(), pk, mint.last_id()) + .unwrap(); + + // Create a vote account + let new_vote_account = + create_vote_account(&new_keypair, &bank, 1, mint.last_id()).unwrap(); + + // Push a vote for the account + push_vote(&new_vote_account, &bank, start_height, mint.last_id()); } // Insert a bunch of votes at height "start_height + active_window_length" let num_new_ids = 10; let mut new_ids = HashSet::new(); for _ in 0..num_new_ids { - let pk = Keypair::new().pubkey(); + let new_keypair = Keypair::new(); + let pk = new_keypair.pubkey(); new_ids.insert(pk); - leader_scheduler.push_vote(pk, start_height + active_window_length); + // Give the account some stake + bank.transfer(5, &mint.keypair(), pk, mint.last_id()) + .unwrap(); + + // Create a vote account + let new_vote_account = + create_vote_account(&new_keypair, &bank, 1, mint.last_id()).unwrap(); + + push_vote( + &new_vote_account, + &bank, + start_height + active_window_length, + mint.last_id(), + ); } // Queries for the active set - let result = leader_scheduler.get_active_set(active_window_length + start_height - 1); - assert_eq!(result.len(), num_old_ids); - let result_set = to_hashset_owned(&result); - assert_eq!(result_set, old_ids); + let result = + leader_scheduler.get_active_set(active_window_length + start_height - 1, &bank); + assert_eq!(result, old_ids); - let result = leader_scheduler.get_active_set(active_window_length + start_height); - assert_eq!(result.len(), num_new_ids); - let result_set = to_hashset_owned(&result); - assert_eq!(result_set, new_ids); + let result = leader_scheduler.get_active_set(active_window_length + start_height, &bank); + assert_eq!(result, new_ids); - let result = leader_scheduler.get_active_set(2 * active_window_length + start_height - 1); - assert_eq!(result.len(), num_new_ids); - let result_set = to_hashset_owned(&result); - assert_eq!(result_set, new_ids); + let result = + leader_scheduler.get_active_set(2 * active_window_length + start_height - 1, &bank); + assert_eq!(result, new_ids); - let result = leader_scheduler.get_active_set(2 * active_window_length + start_height); - assert_eq!(result.len(), 0); - let result_set = to_hashset_owned(&result); - assert!(result_set.is_empty()); + let result = + leader_scheduler.get_active_set(2 * active_window_length + start_height, &bank); + assert!(result.is_empty()); } #[test] @@ -754,7 +800,7 @@ mod tests { } let validators_pk: Vec = validators.iter().map(Keypair::pubkey).collect(); - let result = LeaderScheduler::rank_active_set(&bank, &validators_pk[..]); + let result = LeaderScheduler::rank_active_set(&bank, validators_pk.iter()); assert_eq!(result.len(), validators.len()); @@ -784,7 +830,7 @@ mod tests { .chain(new_validators.iter()) .map(Keypair::pubkey) .collect(); - let result = LeaderScheduler::rank_active_set(&bank, &all_validators[..]); + let result = LeaderScheduler::rank_active_set(&bank, all_validators.iter()); assert_eq!(result.len(), new_validators.len()); for (i, (pk, balance)) in result.into_iter().enumerate() { @@ -810,7 +856,7 @@ mod tests { .unwrap(); } - let result = LeaderScheduler::rank_active_set(&bank, &tied_validators_pk[..]); + let result = LeaderScheduler::rank_active_set(&bank, tied_validators_pk.iter()); let mut sorted: Vec<&Pubkey> = tied_validators_pk.iter().map(|x| x).collect(); sorted.sort_by(|pk1, pk2| pk1.cmp(pk2)); assert_eq!(result.len(), tied_validators_pk.len()); @@ -922,6 +968,7 @@ mod tests { #[test] fn test_scheduler_active_window() { let num_validators = 10; + let num_vote_account_tokens = 1; // Set up the LeaderScheduler struct let bootstrap_leader_id = Keypair::new().pubkey(); let bootstrap_height = 500; @@ -943,7 +990,10 @@ mod tests { let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); // Create the bank and validators - let mint = Mint::new((((num_validators + 1) / 2) * (num_validators + 1)) as i64); + let mint = Mint::new( + ((((num_validators + 1) / 2) * (num_validators + 1)) + + (num_vote_account_tokens * num_validators)) as i64, + ); let bank = Bank::new(&mint); let mut validators = vec![]; let last_id = mint @@ -955,10 +1005,29 @@ mod tests { let new_validator = Keypair::new(); let new_pubkey = new_validator.pubkey(); validators.push(new_pubkey); + // Give the validator some tokens + bank.transfer( + (i + 1 + num_vote_account_tokens) as i64, + &mint.keypair(), + new_pubkey, + last_id, + ).unwrap(); + + // Create a vote account + let new_vote_account = create_vote_account( + &new_validator, + &bank, + num_vote_account_tokens as i64, + mint.last_id(), + ).unwrap(); + // Vote at height i * active_window_length for validator i - leader_scheduler.push_vote(new_pubkey, i * active_window_length + bootstrap_height); - bank.transfer((i + 1) as i64, &mint.keypair(), new_pubkey, last_id) - .unwrap(); + push_vote( + &new_vote_account, + &bank, + i * active_window_length + bootstrap_height, + mint.last_id(), + ); } // Generate schedule every active_window_length entries and check that @@ -979,8 +1048,12 @@ mod tests { #[test] fn test_multiple_vote() { - let leader_id = Keypair::new().pubkey(); + let leader_keypair = Keypair::new(); + let leader_id = leader_keypair.pubkey(); let active_window_length = 1000; + let mint = Mint::new(10000); + let bank = Bank::new(&mint); + let leader_scheduler_config = LeaderSchedulerConfig::new( leader_id, Some(100), @@ -991,18 +1064,38 @@ mod tests { let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); - // Check that a validator that votes twice in a row will get included in the active + // Give the node some tokens + bank.transfer(5, &mint.keypair(), leader_id, bank.last_id()) + .unwrap(); + + // Check that a node that votes twice in a row will get included in the active // window let initial_vote_height = 1; + // Create a vote account + let new_vote_account = + create_vote_account(&leader_keypair, &bank, 1, mint.last_id()).unwrap(); + // Vote twice - leader_scheduler.push_vote(leader_id, initial_vote_height); - leader_scheduler.push_vote(leader_id, initial_vote_height + 1); - let result = leader_scheduler.get_active_set(initial_vote_height + active_window_length); - assert_eq!(result, vec![leader_id]); + push_vote( + &new_vote_account, + &bank, + initial_vote_height, + mint.last_id(), + ); + push_vote( + &new_vote_account, + &bank, + initial_vote_height + 1, + mint.last_id(), + ); + let result = - leader_scheduler.get_active_set(initial_vote_height + active_window_length + 1); - assert_eq!(result, vec![]); + leader_scheduler.get_active_set(initial_vote_height + active_window_length, &bank); + assert_eq!(result, to_hashset_owned(&vec![leader_id])); + let result = + leader_scheduler.get_active_set(initial_vote_height + active_window_length + 1, &bank); + assert!(result.is_empty()); } #[test] @@ -1063,13 +1156,6 @@ mod tests { DEFAULT_SEED_ROTATION_INTERVAL ); - // Check defaults for ActiveValidators - let active_validators = ActiveValidators::new(None); - assert_eq!( - active_validators.active_window_length, - DEFAULT_ACTIVE_WINDOW_LENGTH - ); - // Check actual arguments for LeaderScheduler let bootstrap_height = 500; let leader_rotation_interval = 100; @@ -1096,14 +1182,11 @@ mod tests { leader_scheduler.seed_rotation_interval, seed_rotation_interval ); - - // Check actual arguments for ActiveValidators - let active_validators = ActiveValidators::new(Some(active_window_length)); - assert_eq!(active_validators.active_window_length, active_window_length); } fn run_consecutive_leader_test(num_slots_per_epoch: u64, add_validator: bool) { - let bootstrap_leader_id = Keypair::new().pubkey(); + let bootstrap_leader_keypair = Keypair::new(); + let bootstrap_leader_id = bootstrap_leader_keypair.pubkey(); let bootstrap_height = 500; let leader_rotation_interval = 100; let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval; @@ -1130,11 +1213,20 @@ mod tests { let initial_vote_height = 1; // Create and add validator to the active set - let validator_id = Keypair::new().pubkey(); + let validator_keypair = Keypair::new(); + let validator_id = validator_keypair.pubkey(); if add_validator { - leader_scheduler.push_vote(validator_id, initial_vote_height); - bank.transfer(1, &mint.keypair(), validator_id, last_id) + bank.transfer(5, &mint.keypair(), validator_id, last_id) .unwrap(); + // Create a vote account + let new_vote_account = + create_vote_account(&validator_keypair, &bank, 1, mint.last_id()).unwrap(); + push_vote( + &new_vote_account, + &bank, + initial_vote_height, + mint.last_id(), + ); } // Make sure the bootstrap leader, not the validator, is picked again on next slot @@ -1151,10 +1243,29 @@ mod tests { } }; + let vote_account_tokens = 1; + bank.transfer( + leader_stake + vote_account_tokens, + &mint.keypair(), + bootstrap_leader_id, + last_id, + ).unwrap(); + + // Create a vote account + let new_vote_account = create_vote_account( + &bootstrap_leader_keypair, + &bank, + vote_account_tokens, + mint.last_id(), + ).unwrap(); + // Add leader to the active set - leader_scheduler.push_vote(bootstrap_leader_id, initial_vote_height); - bank.transfer(leader_stake, &mint.keypair(), bootstrap_leader_id, last_id) - .unwrap(); + push_vote( + &new_vote_account, + &bank, + initial_vote_height, + mint.last_id(), + ); leader_scheduler.generate_schedule(bootstrap_height, &bank); @@ -1182,7 +1293,8 @@ mod tests { #[test] fn test_max_height_for_leader() { - let bootstrap_leader_id = Keypair::new().pubkey(); + let bootstrap_leader_keypair = Keypair::new(); + let bootstrap_leader_id = bootstrap_leader_keypair.pubkey(); let bootstrap_height = 500; let leader_rotation_interval = 100; let seed_rotation_interval = 2 * leader_rotation_interval; @@ -1254,15 +1366,34 @@ mod tests { // Now test when the active set > 1 node // Create and add validator to the active set - let validator_id = Keypair::new().pubkey(); - leader_scheduler.push_vote(validator_id, initial_vote_height); - bank.transfer(1, &mint.keypair(), validator_id, last_id) + let validator_keypair = Keypair::new(); + let validator_id = validator_keypair.pubkey(); + + // Create a vote account for the validator + bank.transfer(5, &mint.keypair(), validator_id, last_id) .unwrap(); + let new_validator_vote_account = + create_vote_account(&validator_keypair, &bank, 1, mint.last_id()).unwrap(); + push_vote( + &new_validator_vote_account, + &bank, + initial_vote_height, + mint.last_id(), + ); + + // Create a vote account for the leader + bank.transfer(5, &mint.keypair(), bootstrap_leader_id, last_id) + .unwrap(); + let new_leader_vote_account = + create_vote_account(&bootstrap_leader_keypair, &bank, 1, mint.last_id()).unwrap(); // Add leader to the active set - leader_scheduler.push_vote(bootstrap_leader_id, initial_vote_height); - bank.transfer(1, &mint.keypair(), bootstrap_leader_id, last_id) - .unwrap(); + push_vote( + &new_leader_vote_account, + &bank, + initial_vote_height, + mint.last_id(), + ); // Generate the schedule leader_scheduler.generate_schedule(bootstrap_height, &bank); diff --git a/src/leader_vote_stage.rs b/src/leader_vote_stage.rs deleted file mode 100644 index 7eae6b273c..0000000000 --- a/src/leader_vote_stage.rs +++ /dev/null @@ -1,159 +0,0 @@ -//! The `leader_vote_stage` module implements the TPU's vote stage. It -//! computes and notes the votes for the entries, and then sends the -//! Entry to its output channel. - -use bank::Bank; -use cluster_info::ClusterInfo; -use counter::Counter; -use entry::Entry; -use ledger::Block; -use log::Level; -use result::{Error, Result}; -use service::Service; -use signature::Keypair; -use std::net::UdpSocket; -use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; -use std::sync::{Arc, RwLock}; -use std::thread::{self, Builder, JoinHandle}; -use std::time::{Duration, Instant}; -use streamer::responder; -use timing::duration_as_ms; -use vote_stage::send_leader_vote; - -pub struct LeaderVoteStage { - thread_hdls: Vec>, - vote_thread: JoinHandle<()>, -} - -impl LeaderVoteStage { - /// Process any Entry items that have been published by the RecordStage. - /// continuosly send entries out - pub fn compute_vote_and_send_entries( - cluster_info: &Arc>, - entry_sender: &Sender>, - entry_receiver: &Receiver>, - ) -> Result<()> { - let mut ventries = Vec::new(); - let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; - let now = Instant::now(); - let mut num_new_entries = 0; - - loop { - num_new_entries += received_entries.len(); - ventries.push(received_entries); - - if let Ok(n) = entry_receiver.try_recv() { - received_entries = n; - } else { - break; - } - } - inc_new_counter_info!("leader_vote_stage-entries_received", num_new_entries); - debug!("leader_vote_stage entries: {}", num_new_entries); - - for entries in ventries { - let votes = &entries.votes(); - cluster_info.write().unwrap().insert_votes(&votes); - - inc_new_counter_info!("leader_vote_stage-write_entries", entries.len()); - - //TODO(anatoly): real stake based voting needs to change this - //leader simply votes if the current set of validators have voted - //on a valid last id - - trace!("New entries? {}", entries.len()); - if !entries.is_empty() { - inc_new_counter_info!("leader_vote_stage-recv_vote", votes.len()); - inc_new_counter_info!("leader_vote_stage-entries_sent", entries.len()); - trace!("broadcasting {}", entries.len()); - entry_sender.send(entries)?; - } - } - inc_new_counter_info!( - "leader_vote_stage-time_ms", - duration_as_ms(&now.elapsed()) as usize - ); - - Ok(()) - } - - /// Create a new LeaderVoteStage for voting and broadcasting entries. - pub fn new( - keypair: Arc, - bank: Arc, - cluster_info: Arc>, - entry_receiver: Receiver>, - ) -> (Self, Receiver>) { - let (vote_blob_sender, vote_blob_receiver) = channel(); - let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); - let t_responder = responder( - "leader_vote_stage_vote_sender", - Arc::new(send), - vote_blob_receiver, - ); - let (entry_sender, entry_receiver_forward) = channel(); - - let vote_thread = Builder::new() - .name("solana-writer".to_string()) - .spawn(move || { - let mut last_vote = 0; - let mut last_valid_validator_timestamp = 0; - let id = cluster_info.read().unwrap().id; - loop { - if let Err(e) = Self::compute_vote_and_send_entries( - &cluster_info, - &entry_sender, - &entry_receiver, - ) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { - break; - } - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - _ => { - inc_new_counter_info!( - "leader_vote_stage-compute_vote_and_send_entries-error", - 1 - ); - error!("{:?}", e); - } - } - }; - if let Err(e) = send_leader_vote( - &id, - &keypair, - &bank, - &cluster_info, - &vote_blob_sender, - &mut last_vote, - &mut last_valid_validator_timestamp, - ) { - inc_new_counter_info!("leader_vote_stage-leader_vote-error", 1); - error!("{:?}", e); - } - } - }).unwrap(); - - let thread_hdls = vec![t_responder]; - ( - LeaderVoteStage { - vote_thread, - thread_hdls, - }, - entry_receiver_forward, - ) - } -} - -impl Service for LeaderVoteStage { - type JoinReturnType = (); - - fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - - self.vote_thread.join() - } -} diff --git a/src/ledger.rs b/src/ledger.rs index 0e11dd7dc2..f8f0aeee5c 100644 --- a/src/ledger.rs +++ b/src/ledger.rs @@ -3,7 +3,7 @@ //! access read to a persistent file-based ledger. use bincode::{self, deserialize, deserialize_from, serialize_into, serialized_size}; -use budget_instruction::Vote; +#[cfg(test)] use budget_transaction::BudgetTransaction; #[cfg(test)] use chrono::prelude::Utc; @@ -25,6 +25,8 @@ use std::mem::size_of; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::path::Path; use transaction::Transaction; +use vote_program::Vote; +use vote_transaction::VoteTransaction; use window::WINDOW_SIZE; // @@ -496,7 +498,7 @@ impl Block for [Entry] { entry .transactions .iter() - .filter_map(BudgetTransaction::vote) + .flat_map(VoteTransaction::get_votes) }).collect() } } @@ -684,7 +686,6 @@ pub fn make_tiny_test_entries(num: usize) -> Vec { mod tests { use super::*; use bincode::serialized_size; - use budget_instruction::Vote; use budget_transaction::BudgetTransaction; use entry::{next_entry, Entry}; use hash::hash; @@ -693,6 +694,7 @@ mod tests { use std; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use transaction::Transaction; + use vote_program::Vote; #[test] fn test_verify_slice() { @@ -714,15 +716,8 @@ mod tests { let zero = Hash::default(); let one = hash(&zero.as_ref()); let keypair = Keypair::new(); - let tx0 = Transaction::budget_new_vote( - &keypair, - Vote { - version: 0, - contact_info_version: 1, - }, - one, - 1, - ); + let vote_account = Keypair::new(); + let tx0 = Transaction::vote_new(&vote_account, Vote { tick_height: 1 }, one, 1); let tx1 = Transaction::budget_new_timestamp( &keypair, keypair.pubkey(), @@ -772,15 +767,8 @@ mod tests { let id = Hash::default(); let next_id = hash(&id.as_ref()); let keypair = Keypair::new(); - let tx_small = Transaction::budget_new_vote( - &keypair, - Vote { - version: 0, - contact_info_version: 2, - }, - next_id, - 2, - ); + let vote_account = Keypair::new(); + let tx_small = Transaction::vote_new(&vote_account, Vote { tick_height: 1 }, next_id, 2); let tx_large = Transaction::budget_new(&keypair, keypair.pubkey(), 1, next_id); let tx_small_size = serialized_size(&tx_small).unwrap() as usize; diff --git a/src/lib.rs b/src/lib.rs index 5a4b20c238..1f9e9024bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,7 +35,6 @@ pub mod fetch_stage; pub mod fullnode; pub mod hash; pub mod leader_scheduler; -pub mod leader_vote_stage; pub mod ledger; pub mod ledger_write_stage; pub mod loader_transaction; @@ -80,7 +79,9 @@ pub mod token_program; pub mod tpu; pub mod transaction; pub mod tvu; +pub mod vote_program; pub mod vote_stage; +pub mod vote_transaction; pub mod wallet; pub mod window; pub mod window_service; diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 2a758043c9..791e6289cc 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -6,8 +6,6 @@ use counter::Counter; use entry::{EntryReceiver, EntrySender}; use hash::Hash; use influx_db_client as influxdb; -use leader_scheduler::LeaderScheduler; -use ledger::Block; use log::Level; use metrics; use result::{Error, Result}; @@ -59,11 +57,10 @@ impl ReplicateStage { cluster_info: &Arc>, window_receiver: &EntryReceiver, keypair: &Arc, + vote_account_keypair: &Arc, vote_blob_sender: Option<&BlobSender>, ledger_entry_sender: &EntrySender, - tick_height: &mut u64, entry_height: &mut u64, - leader_scheduler: &Arc>, ) -> Result { let timer = Duration::new(1, 0); //coalesce all the available entries into a single vote @@ -81,37 +78,23 @@ impl ReplicateStage { let mut res = Ok(()); let last_entry_id = { let mut num_entries_to_write = entries.len(); + let current_leader = bank + .get_current_leader() + .expect("Scheduled leader id should never be unknown while processing entries"); for (i, entry) in entries.iter().enumerate() { - // max_tick_height is the PoH height at which the next leader rotation will - // happen. The leader should send an entry such that the total PoH is equal - // to max_tick_height - guard. - // TODO: Introduce a "guard" for the end of transmission periods, the guard - // is assumed to be zero for now. - let max_tick_height = { - let ls_lock = leader_scheduler.read().unwrap(); - ls_lock.max_height_for_leader(*tick_height) - }; + res = bank.process_entry(&entry); + let my_id = keypair.pubkey(); + let scheduled_leader = bank + .get_current_leader() + .expect("Scheduled leader id should never be unknown while processing entries"); - res = bank.process_entry( - &entry, - tick_height, - &mut *leader_scheduler.write().unwrap(), - ); - - // Will run only if leader_scheduler.use_only_bootstrap_leader is false - if let Some(max_tick_height) = max_tick_height { - let ls_lock = leader_scheduler.read().unwrap(); - if *tick_height == max_tick_height { - let my_id = keypair.pubkey(); - let scheduled_leader = ls_lock.get_scheduled_leader(*tick_height).expect( - "Scheduled leader id should never be unknown while processing entries", - ); - cluster_info.write().unwrap().set_leader(scheduled_leader); - if my_id == scheduled_leader { - num_entries_to_write = i + 1; - break; - } - } + // TODO: Remove this soon once we boot the leader from ClusterInfo + if scheduled_leader != current_leader { + cluster_info.write().unwrap().set_leader(scheduled_leader); + } + if my_id == scheduled_leader { + num_entries_to_write = i + 1; + break; } if res.is_err() { @@ -134,11 +117,9 @@ impl ReplicateStage { }; if let Some(sender) = vote_blob_sender { - send_validator_vote(bank, keypair, cluster_info, sender)?; + send_validator_vote(bank, vote_account_keypair, &cluster_info, sender)?; } - cluster_info.write().unwrap().insert_votes(&entries.votes()); - inc_new_counter_info!( "replicate-transactions", entries.iter().map(|x| x.transactions.len()).sum() @@ -160,13 +141,12 @@ impl ReplicateStage { pub fn new( keypair: Arc, + vote_account_keypair: Arc, bank: Arc, cluster_info: Arc>, window_receiver: EntryReceiver, exit: Arc, - tick_height: u64, entry_height: u64, - leader_scheduler: Arc>, ) -> (Self, EntryReceiver) { let (vote_blob_sender, vote_blob_receiver) = channel(); let (ledger_entry_sender, ledger_entry_receiver) = channel(); @@ -182,17 +162,15 @@ impl ReplicateStage { let now = Instant::now(); let mut next_vote_secs = 1; let mut entry_height_ = entry_height; - let mut tick_height_ = tick_height; let mut last_entry_id = None; loop { - let leader_id = leader_scheduler - .read() - .unwrap() - .get_scheduled_leader(tick_height_) - .expect("Scheduled leader id should never be unknown at this point"); + let leader_id = + bank.get_current_leader() + .expect("Scheduled leader id should never be unknown at this point"); + if leader_id == keypair.pubkey() { return Some(ReplicateStageReturnType::LeaderRotation( - tick_height_, + bank.get_tick_height(), entry_height_, // We should never start the TPU / this stage on an exact entry that causes leader // rotation (Fullnode should automatically transition on startup if it detects @@ -215,11 +193,10 @@ impl ReplicateStage { &cluster_info, &window_receiver, &keypair, + &vote_account_keypair, vote_sender, &ledger_entry_sender, - &mut tick_height_, &mut entry_height_, - &leader_scheduler, ) { Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break, Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (), @@ -294,7 +271,7 @@ mod test { // 1) Give the validator a nonzero number of tokens 2) A vote from the validator . // This will cause leader rotation after the bootstrap height let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap(); - let active_set_entries = + let (active_set_entries, vote_account_keypair) = make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id, 0); last_id = active_set_entries.last().unwrap().id; let initial_tick_height = genesis_entries @@ -319,26 +296,23 @@ mod test { Some(bootstrap_height), ); - let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); + let leader_scheduler = + Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config))); // Set up the bank - let (bank, _, _, _) = - Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler); - - let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); + let (bank, _, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, leader_scheduler); // Set up the replicate stage let (entry_sender, entry_receiver) = channel(); let exit = Arc::new(AtomicBool::new(false)); let (replicate_stage, _ledger_writer_recv) = ReplicateStage::new( Arc::new(my_keypair), + Arc::new(vote_account_keypair), Arc::new(bank), Arc::new(RwLock::new(cluster_info_me)), entry_receiver, exit.clone(), - initial_tick_height, initial_entry_len, - leader_scheduler.clone(), ); // Send enough ticks to trigger leader rotation @@ -375,13 +349,6 @@ mod test { assert_eq!(exit.load(Ordering::Relaxed), true); - // Check ledger height is correct - let mut leader_scheduler = Arc::try_unwrap(leader_scheduler) - .expect("Multiple references to this RwLock still exist") - .into_inner() - .expect("RwLock for LeaderScheduler is still locked"); - - leader_scheduler.reset(); let _ignored = remove_dir_all(&my_ledger_path); } } diff --git a/src/replicator.rs b/src/replicator.rs index fa51dd3a15..ac53ce737c 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -198,14 +198,16 @@ mod tests { let (mint, leader_ledger_path) = create_tmp_genesis(leader_ledger_path, 100); info!("starting leader node"); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let network_addr = leader_node.sockets.gossip.local_addr().unwrap(); let leader_info = leader_node.info.clone(); + let vote_account_keypair = Arc::new(Keypair::new()); let leader = Fullnode::new( leader_node, &leader_ledger_path, leader_keypair, + vote_account_keypair, None, false, LeaderScheduler::from_bootstrap_leader(leader_info.id), diff --git a/src/result.rs b/src/result.rs index c66d236596..bb63cae72a 100644 --- a/src/result.rs +++ b/src/result.rs @@ -10,6 +10,7 @@ use poh_recorder; use serde_json; use std; use std::any::Any; +use vote_stage; #[derive(Debug)] pub enum Error { @@ -27,6 +28,7 @@ pub enum Error { ErasureError(erasure::ErasureError), SendError, PohRecorderError(poh_recorder::PohRecorderError), + VoteError(vote_stage::VoteError), } pub type Result = std::result::Result; @@ -100,6 +102,11 @@ impl std::convert::From for Error { Error::PohRecorderError(e) } } +impl std::convert::From for Error { + fn from(e: vote_stage::VoteError) -> Error { + Error::VoteError(e) + } +} #[cfg(test)] mod tests { diff --git a/src/rpc.rs b/src/rpc.rs index c898bbec4b..7ba02da438 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -28,6 +28,7 @@ pub const RPC_PORT: u16 = 8899; pub struct JsonRpcService { thread_hdl: JoinHandle<()>, + exit: Arc, } impl JsonRpcService { @@ -35,11 +36,12 @@ impl JsonRpcService { bank: &Arc, cluster_info: &Arc>, rpc_addr: SocketAddr, - exit: Arc, ) -> Self { + let exit = Arc::new(AtomicBool::new(false)); let request_processor = JsonRpcRequestProcessor::new(bank.clone()); let info = cluster_info.clone(); let exit_pubsub = exit.clone(); + let exit_ = exit.clone(); let thread_hdl = Builder::new() .name("solana-jsonrpc".to_string()) .spawn(move || { @@ -62,14 +64,23 @@ impl JsonRpcService { warn!("JSON RPC service unavailable: unable to bind to RPC port {}. \nMake sure this port is not already in use by another application", rpc_addr.port()); return; } - while !exit.load(Ordering::Relaxed) { + while !exit_.load(Ordering::Relaxed) { sleep(Duration::from_millis(100)); } server.unwrap().close(); () }) .unwrap(); - JsonRpcService { thread_hdl } + JsonRpcService { thread_hdl, exit } + } + + pub fn exit(&self) { + self.exit.store(true, Ordering::Relaxed); + } + + pub fn close(self) -> thread::Result<()> { + self.exit(); + self.join() } } @@ -379,8 +390,7 @@ mod tests { ClusterInfo::new(NodeInfo::new_unspecified()).unwrap(), )); let rpc_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 24680); - let exit = Arc::new(AtomicBool::new(false)); - let rpc_service = JsonRpcService::new(&Arc::new(bank), &cluster_info, rpc_addr, exit); + let rpc_service = JsonRpcService::new(&Arc::new(bank), &cluster_info, rpc_addr); let thread = rpc_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-jsonrpc"); @@ -586,11 +596,11 @@ mod tests { #[test] fn test_rpc_send_tx() { - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let leader_data = leader.info.clone(); let ledger_path = create_tmp_ledger_with_mint("rpc_send_tx", &alice); @@ -602,8 +612,16 @@ mod tests { let genesis_entries = &alice.create_entries(); let entry_height = genesis_entries.len() as u64; + + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_data.id, + ))); + bank.leader_scheduler = leader_scheduler; + + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, entry_height, @@ -612,7 +630,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); diff --git a/src/rpc_pubsub.rs b/src/rpc_pubsub.rs index f5de442e9e..4a920e1b1e 100644 --- a/src/rpc_pubsub.rs +++ b/src/rpc_pubsub.rs @@ -27,6 +27,7 @@ pub enum ClientState { pub struct PubSubService { thread_hdl: JoinHandle<()>, + exit: Arc, } impl Service for PubSubService { @@ -38,8 +39,10 @@ impl Service for PubSubService { } impl PubSubService { - pub fn new(bank: &Arc, pubsub_addr: SocketAddr, exit: Arc) -> Self { + pub fn new(bank: &Arc, pubsub_addr: SocketAddr) -> Self { let rpc = RpcSolPubSubImpl::new(JsonRpcRequestProcessor::new(bank.clone()), bank.clone()); + let exit = Arc::new(AtomicBool::new(false)); + let exit_ = exit.clone(); let thread_hdl = Builder::new() .name("solana-pubsub".to_string()) .spawn(move || { @@ -60,14 +63,23 @@ impl PubSubService { warn!("Pubsub service unavailable: unable to bind to port {}. \nMake sure this port is not already in use by another application", pubsub_addr.port()); return; } - while !exit.load(Ordering::Relaxed) { + while !exit_.load(Ordering::Relaxed) { sleep(Duration::from_millis(100)); } server.unwrap().close(); () }) .unwrap(); - PubSubService { thread_hdl } + PubSubService { thread_hdl, exit } + } + + pub fn exit(&self) { + self.exit.store(true, Ordering::Relaxed); + } + + pub fn close(self) -> thread::Result<()> { + self.exit(); + self.join() } } @@ -249,8 +261,7 @@ mod tests { let alice = Mint::new(10_000); let bank = Bank::new(&alice); let pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0); - let exit = Arc::new(AtomicBool::new(false)); - let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr, exit); + let pubsub_service = PubSubService::new(&Arc::new(bank), pubsub_addr); let thread = pubsub_service.thread_hdl.thread(); assert_eq!(thread.name().unwrap(), "solana-pubsub"); } diff --git a/src/thin_client.rs b/src/thin_client.rs index fa8f1b8294..4feb155de5 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -26,6 +26,7 @@ use std::time::Instant; use system_transaction::SystemTransaction; use timing; use transaction::Transaction; +use vote_transaction::VoteTransaction; use influx_db_client as influxdb; use metrics; @@ -148,6 +149,29 @@ impl ThinClient { )) } + pub fn create_vote_account( + &self, + node_keypair: &Keypair, + vote_account_id: Pubkey, + last_id: &Hash, + num_tokens: i64, + ) -> io::Result { + let tx = + Transaction::vote_account_new(&node_keypair, vote_account_id, *last_id, num_tokens); + self.transfer_signed(&tx) + } + + /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. + pub fn register_vote_account( + &self, + node_keypair: &Keypair, + vote_account_id: Pubkey, + last_id: &Hash, + ) -> io::Result { + let tx = Transaction::vote_account_register(node_keypair, vote_account_id, *last_id, 0); + self.transfer_signed(&tx) + } + /// Creates, signs, and processes a Transaction. Useful for writing unit-tests. pub fn transfer( &self, @@ -170,6 +194,24 @@ impl ThinClient { result } + pub fn get_account_userdata(&mut self, pubkey: &Pubkey) -> io::Result>> { + let req = Request::GetAccount { key: *pubkey }; + let data = serialize(&req).expect("serialize GetAccount in pub fn get_account_userdata"); + self.requests_socket + .send_to(&data, &self.requests_addr) + .expect("buffer error in pub fn get_account_userdata"); + + loop { + let resp = self.recv_response()?; + trace!("recv_response {:?}", resp); + if let Response::Account { key, account } = resp { + if key == *pubkey { + return Ok(account.map(|account| account.userdata)); + } + } + } + } + /// Request the balance of the user holding `pubkey`. This method blocks /// until the server sends a response. If the response packet is dropped /// by the network, this method will hang indefinitely. @@ -446,17 +488,23 @@ mod tests { #[ignore] fn test_thin_client() { logger::setup(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let alice = Mint::new(10_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let ledger_path = create_tmp_ledger_with_mint("thin_client", &alice); + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_data.id, + ))); + bank.leader_scheduler = leader_scheduler; + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, 0, @@ -465,7 +513,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); sleep(Duration::from_millis(900)); @@ -495,16 +542,22 @@ mod tests { #[ignore] fn test_bad_sig() { logger::setup(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let leader_data = leader.info.clone(); let ledger_path = create_tmp_ledger_with_mint("bad_sig", &alice); + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_data.id, + ))); + bank.leader_scheduler = leader_scheduler; + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, 0, @@ -513,7 +566,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); //TODO: remove this sleep, or add a retry so CI is stable @@ -556,18 +608,25 @@ mod tests { #[test] fn test_client_check_signature() { logger::setup(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); let leader_data = leader.info.clone(); let ledger_path = create_tmp_ledger_with_mint("client_check_signature", &alice); let genesis_entries = &alice.create_entries(); let entry_height = genesis_entries.len() as u64; + + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_data.id, + ))); + bank.leader_scheduler = leader_scheduler; + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, entry_height, @@ -576,7 +635,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); sleep(Duration::from_millis(300)); @@ -620,18 +678,25 @@ mod tests { #[test] fn test_zero_balance_after_nonzero() { logger::setup(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let alice = Mint::new(10_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); let bob_keypair = Keypair::new(); let leader_data = leader.info.clone(); let ledger_path = create_tmp_ledger_with_mint("zero_balance_check", &alice); let genesis_entries = &alice.create_entries(); let entry_height = genesis_entries.len() as u64; + + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_data.id, + ))); + bank.leader_scheduler = leader_scheduler; + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, entry_height, @@ -640,7 +705,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); sleep(Duration::from_millis(900)); diff --git a/src/tpu.rs b/src/tpu.rs index 66fe9a902d..531be39591 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -27,21 +27,18 @@ use bank::Bank; use banking_stage::{BankingStage, BankingStageReturnType}; -use cluster_info::ClusterInfo; use entry::Entry; use fetch_stage::FetchStage; use hash::Hash; -use leader_vote_stage::LeaderVoteStage; use ledger_write_stage::LedgerWriteStage; use poh_service::Config; use service::Service; -use signature::Keypair; use sigverify_stage::SigVerifyStage; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::mpsc::Receiver; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; use std::thread; pub enum TpuReturnType { @@ -52,7 +49,6 @@ pub struct Tpu { fetch_stage: FetchStage, sigverify_stage: SigVerifyStage, banking_stage: BankingStage, - leader_vote_stage: LeaderVoteStage, ledger_write_stage: LedgerWriteStage, exit: Arc, } @@ -60,9 +56,7 @@ pub struct Tpu { impl Tpu { #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new( - keypair: Arc, bank: &Arc, - cluster_info: &Arc>, tick_duration: Config, transactions_sockets: Vec, ledger_path: &str, @@ -87,28 +81,21 @@ impl Tpu { max_tick_height, ); - let (leader_vote_stage, ledger_entry_receiver) = - LeaderVoteStage::new(keypair, bank.clone(), cluster_info.clone(), entry_receiver); - let (ledger_entry_sender, entry_forwarder) = channel(); - let ledger_write_stage = LedgerWriteStage::new( - Some(ledger_path), - ledger_entry_receiver, - Some(ledger_entry_sender), - ); + let ledger_write_stage = + LedgerWriteStage::new(Some(ledger_path), entry_receiver, Some(ledger_entry_sender)); let tpu = Tpu { fetch_stage, sigverify_stage, banking_stage, - leader_vote_stage, ledger_write_stage, exit: exit.clone(), }; (tpu, entry_forwarder, exit) } - pub fn exit(&self) -> () { + pub fn exit(&self) { self.exit.store(true, Ordering::Relaxed); } @@ -128,7 +115,6 @@ impl Service for Tpu { fn join(self) -> thread::Result<(Option)> { self.fetch_stage.join()?; self.sigverify_stage.join()?; - self.leader_vote_stage.join()?; self.ledger_write_stage.join()?; match self.banking_stage.join()? { Some(BankingStageReturnType::LeaderRotation) => Ok(Some(TpuReturnType::LeaderRotation)), diff --git a/src/tvu.rs b/src/tvu.rs index 950ccca04e..d13b2ed70a 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -40,7 +40,6 @@ use bank::Bank; use blob_fetch_stage::BlobFetchStage; use cluster_info::ClusterInfo; use hash::Hash; -use leader_scheduler::LeaderScheduler; use ledger_write_stage::LedgerWriteStage; use replicate_stage::{ReplicateStage, ReplicateStageReturnType}; use retransmit_stage::RetransmitStage; @@ -80,8 +79,8 @@ impl Tvu { #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new( keypair: Arc, + vote_account_keypair: Arc, bank: &Arc, - tick_height: u64, entry_height: u64, cluster_info: Arc>, window: SharedWindow, @@ -89,7 +88,6 @@ impl Tvu { repair_socket: UdpSocket, retransmit_socket: UdpSocket, ledger_path: Option<&str>, - leader_scheduler: Arc>, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); @@ -105,23 +103,22 @@ impl Tvu { let (retransmit_stage, blob_window_receiver) = RetransmitStage::new( &cluster_info, window, - tick_height, + bank.get_tick_height(), entry_height, Arc::new(retransmit_socket), repair_socket, blob_fetch_receiver, - leader_scheduler.clone(), + bank.leader_scheduler.clone(), ); let (replicate_stage, ledger_entry_receiver) = ReplicateStage::new( keypair, + vote_account_keypair, bank.clone(), cluster_info, blob_window_receiver, exit.clone(), - tick_height, entry_height, - leader_scheduler, ); let ledger_write_stage = LedgerWriteStage::new(ledger_path, ledger_entry_receiver, None); @@ -139,7 +136,7 @@ impl Tvu { self.exit.load(Ordering::Relaxed) } - pub fn exit(&self) -> () { + pub fn exit(&self) { self.exit.store(true, Ordering::Relaxed); } @@ -255,7 +252,12 @@ pub mod tests { let starting_balance = 10_000; let mint = Mint::new(starting_balance); let replicate_addr = target1.info.contact_info.tvu; - let bank = Arc::new(Bank::new(&mint)); + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_id, + ))); + let mut bank = Bank::new(&mint); + bank.leader_scheduler = leader_scheduler; + let bank = Arc::new(bank); //start cluster_info1 let mut cluster_info1 = ClusterInfo::new(target1.info.clone()).expect("ClusterInfo::new"); @@ -264,20 +266,18 @@ pub mod tests { let cref1 = Arc::new(RwLock::new(cluster_info1)); let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()); + let vote_account_keypair = Arc::new(Keypair::new()); let tvu = Tvu::new( Arc::new(target1_keypair), + vote_account_keypair, &bank, 0, - 0, cref1, dr_1.1, target1.sockets.replicate, target1.sockets.repair, target1.sockets.retransmit, None, - Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( - leader_id, - ))), ); let mut alice_ref_balance = starting_balance; diff --git a/src/vote_program.rs b/src/vote_program.rs new file mode 100644 index 0000000000..40e0881688 --- /dev/null +++ b/src/vote_program.rs @@ -0,0 +1,151 @@ +//! Vote program +//! Receive and processes votes from validators + +use bincode::{deserialize, serialize}; +use byteorder::{ByteOrder, LittleEndian}; +use solana_sdk::account::Account; +use solana_sdk::pubkey::Pubkey; +use std; +use std::collections::VecDeque; +use transaction::Transaction; + +// Upper limit on the size of the Vote State +pub const MAX_STATE_SIZE: usize = 1024; + +// Maximum number of votes to keep around +const MAX_VOTE_HISTORY: usize = 32; + +#[derive(Debug, PartialEq)] +pub enum Error { + UserdataDeserializeFailure, + InvalidArguments, + InvalidUserdata, + UserdataTooSmall, +} +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "error") + } +} +pub type Result = std::result::Result; + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub struct Vote { + // TODO: add signature of the state here as well + /// A vote for height tick_height + pub tick_height: u64, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub enum VoteInstruction { + /// Register a new "vote account" to represent a particular validator in the Vote Contract, + /// and initialize the VoteState for this "vote account" + /// * Transaction::keys[0] - the validator id + /// * Transaction::keys[1] - the new "vote account" to be associated with the validator + /// identified by keys[0] for voting + RegisterAccount, + NewVote(Vote), +} + +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct VoteProgram { + pub votes: VecDeque, + pub node_id: Pubkey, +} + +pub const VOTE_PROGRAM_ID: [u8; 32] = [ + 6, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, +]; + +impl VoteProgram { + pub fn check_id(program_id: &Pubkey) -> bool { + program_id.as_ref() == VOTE_PROGRAM_ID + } + + pub fn id() -> Pubkey { + Pubkey::new(&VOTE_PROGRAM_ID) + } + + pub fn deserialize(input: &[u8]) -> Result { + let len = LittleEndian::read_u16(&input[0..2]) as usize; + + if len == 0 || input.len() < len + 1 { + Err(Error::InvalidUserdata) + } else { + deserialize(&input[2..=len + 1]).map_err(|err| { + error!("Unable to deserialize vote state: {:?}", err); + Error::InvalidUserdata + }) + } + } + + pub fn serialize(self: &VoteProgram, output: &mut [u8]) -> Result<()> { + let self_serialized = serialize(self).unwrap(); + + if output.len() + 2 < self_serialized.len() { + warn!( + "{} bytes required to serialize but only have {} bytes", + self_serialized.len(), + output.len() + 2, + ); + return Err(Error::UserdataTooSmall); + } + + let serialized_len = self_serialized.len() as u16; + LittleEndian::write_u16(&mut output[0..2], serialized_len); + output[2..=serialized_len as usize + 1].clone_from_slice(&self_serialized); + Ok(()) + } + + pub fn process_transaction( + tx: &Transaction, + instruction_index: usize, + accounts: &mut [&mut Account], + ) -> Result<()> { + match deserialize(tx.userdata(instruction_index)) { + Ok(VoteInstruction::RegisterAccount) => { + // TODO: a single validator could register multiple "vote accounts" + // which would clutter the "accounts" structure. + accounts[1].program_id = Self::id(); + + let mut vote_state = VoteProgram { + votes: VecDeque::new(), + node_id: *tx.from(), + }; + + vote_state.serialize(&mut accounts[1].userdata)?; + + Ok(()) + } + Ok(VoteInstruction::NewVote(vote)) => { + if !Self::check_id(&accounts[0].program_id) { + error!("accounts[0] is not assigned to the VOTE_PROGRAM"); + Err(Error::InvalidArguments)?; + } + + let mut vote_state = Self::deserialize(&accounts[0].userdata)?; + + // TODO: Integrity checks + // a) Verify the vote's bank hash matches what is expected + // b) Verify vote is older than previous votes + + // Only keep around the most recent MAX_VOTE_HISTORY votes + if vote_state.votes.len() == MAX_VOTE_HISTORY { + vote_state.votes.pop_front(); + } + + vote_state.votes.push_back(vote); + vote_state.serialize(&mut accounts[0].userdata)?; + + Ok(()) + } + Err(_) => { + info!( + "Invalid vote transaction userdata: {:?}", + tx.userdata(instruction_index) + ); + Err(Error::UserdataDeserializeFailure) + } + } + } +} diff --git a/src/vote_stage.rs b/src/vote_stage.rs index ce07fb11e5..eee722b980 100644 --- a/src/vote_stage.rs +++ b/src/vote_stage.rs @@ -2,341 +2,90 @@ use bank::Bank; use bincode::serialize; -use budget_transaction::BudgetTransaction; use cluster_info::ClusterInfo; use counter::Counter; use hash::Hash; -use influx_db_client as influxdb; use log::Level; -use metrics; use packet::SharedBlob; -use result::Result; +use result::{Error, Result}; use signature::Keypair; -use solana_sdk::pubkey::Pubkey; -use std::result; +use std::net::SocketAddr; use std::sync::atomic::AtomicUsize; use std::sync::{Arc, RwLock}; use streamer::BlobSender; -use timing; use transaction::Transaction; +use vote_program::Vote; +use vote_transaction::VoteTransaction; pub const VOTE_TIMEOUT_MS: u64 = 1000; #[derive(Debug, PartialEq, Eq)] -enum VoteError { +pub enum VoteError { NoValidLastIdsToVoteOn, + NoLeader, + LeaderInfoNotFound, } +// TODO: Change voting to be on fixed tick intervals based on bank state pub fn create_new_signed_vote_blob( last_id: &Hash, - keypair: &Keypair, + vote_account: &Keypair, + bank: &Arc, cluster_info: &Arc>, ) -> Result { let shared_blob = SharedBlob::default(); - let (vote, addr) = { - let mut wcluster_info = cluster_info.write().unwrap(); - //TODO: doesn't seem like there is a synchronous call to get height and id - debug!("voting on {:?}", &last_id.as_ref()[..8]); - wcluster_info.new_vote(*last_id) - }?; - let tx = Transaction::budget_new_vote(&keypair, vote, *last_id, 0); + let tick_height = bank.get_tick_height(); + + let leader_tpu = get_leader_tpu(bank, cluster_info)?; + //TODO: doesn't seem like there is a synchronous call to get height and id + debug!("voting on {:?}", &last_id.as_ref()[..8]); + let vote = Vote { tick_height }; + let tx = Transaction::vote_new(&vote_account, vote, *last_id, 0); { let mut blob = shared_blob.write().unwrap(); let bytes = serialize(&tx)?; let len = bytes.len(); blob.data[..len].copy_from_slice(&bytes); - blob.meta.set_addr(&addr); + blob.meta.set_addr(&leader_tpu); blob.meta.size = len; - } + }; + Ok(shared_blob) } -fn get_last_id_to_vote_on( - id: &Pubkey, - ids: &[Hash], - bank: &Arc, - now: u64, - last_vote: &mut u64, - last_valid_validator_timestamp: &mut u64, -) -> result::Result<(Hash, u64), VoteError> { - let mut valid_ids = bank.count_valid_ids(&ids); - let super_majority_index = (2 * ids.len()) / 3; - - //TODO(anatoly): this isn't stake based voting - debug!( - "{}: valid_ids {}/{} {}", - id, - valid_ids.len(), - ids.len(), - super_majority_index, - ); - - metrics::submit( - influxdb::Point::new("vote_stage-peer_count") - .add_field("total_peers", influxdb::Value::Integer(ids.len() as i64)) - .add_field( - "valid_peers", - influxdb::Value::Integer(valid_ids.len() as i64), - ).to_owned(), - ); - - if valid_ids.len() > super_majority_index { - *last_vote = now; - - // Sort by timestamp - valid_ids.sort_by(|a, b| a.1.cmp(&b.1)); - - let last_id = ids[valid_ids[super_majority_index].0]; - return Ok((last_id, valid_ids[super_majority_index].1)); - } - - if *last_valid_validator_timestamp != 0 { - metrics::submit( - influxdb::Point::new(&"leader-finality") - .add_field( - "duration_ms", - influxdb::Value::Integer((now - *last_valid_validator_timestamp) as i64), - ).to_owned(), - ); - } - - Err(VoteError::NoValidLastIdsToVoteOn) -} - -pub fn send_leader_vote( - id: &Pubkey, - keypair: &Keypair, - bank: &Arc, - cluster_info: &Arc>, - vote_blob_sender: &BlobSender, - last_vote: &mut u64, - last_valid_validator_timestamp: &mut u64, -) -> Result<()> { - let now = timing::timestamp(); - if now - *last_vote > VOTE_TIMEOUT_MS { - let ids: Vec<_> = cluster_info.read().unwrap().valid_last_ids(); - if let Ok((last_id, super_majority_timestamp)) = get_last_id_to_vote_on( - id, - &ids, - bank, - now, - last_vote, - last_valid_validator_timestamp, - ) { - if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, cluster_info) { - vote_blob_sender.send(vec![shared_blob])?; - let finality_ms = now - super_majority_timestamp; - - *last_valid_validator_timestamp = super_majority_timestamp; - debug!("{} leader_sent_vote finality: {} ms", id, finality_ms); - inc_new_counter_info!("vote_stage-leader_sent_vote", 1); - - bank.set_finality((now - *last_valid_validator_timestamp) as usize); - - metrics::submit( - influxdb::Point::new(&"leader-finality") - .add_field("duration_ms", influxdb::Value::Integer(finality_ms as i64)) - .to_owned(), - ); - } +fn get_leader_tpu(bank: &Bank, cluster_info: &Arc>) -> Result { + let leader_id = { + if let Some(leader_id) = bank.get_current_leader() { + leader_id + } else { + return Err(Error::VoteError(VoteError::NoLeader)); } + }; + + let rcluster_info = cluster_info.read().unwrap(); + let leader_tpu = rcluster_info + .table + .get(&leader_id) + .map(|leader| leader.contact_info.tpu); + + if let Some(leader_tpu) = leader_tpu { + Ok(leader_tpu) + } else { + Err(Error::VoteError(VoteError::LeaderInfoNotFound)) } - Ok(()) } pub fn send_validator_vote( bank: &Arc, - keypair: &Arc, + vote_account: &Keypair, cluster_info: &Arc>, vote_blob_sender: &BlobSender, ) -> Result<()> { let last_id = bank.last_id(); - if let Ok(shared_blob) = create_new_signed_vote_blob(&last_id, keypair, cluster_info) { - inc_new_counter_info!("replicate-vote_sent", 1); - vote_blob_sender.send(vec![shared_blob])?; - } + let shared_blob = create_new_signed_vote_blob(&last_id, vote_account, bank, cluster_info)?; + inc_new_counter_info!("replicate-vote_sent", 1); + vote_blob_sender.send(vec![shared_blob])?; + Ok(()) } - -#[cfg(test)] -pub mod tests { - use super::*; - use bank::Bank; - use bincode::deserialize; - use budget_instruction::Vote; - use cluster_info::{ClusterInfo, NodeInfo}; - use entry::next_entry; - use hash::{hash, Hash}; - use logger; - use mint::Mint; - use std::sync::mpsc::channel; - use std::sync::{Arc, RwLock}; - use std::thread::sleep; - use std::time::Duration; - use system_transaction::SystemTransaction; - use transaction::Transaction; - - #[test] - fn test_send_leader_vote() { - logger::setup(); - - // create a mint/bank - let mint = Mint::new(1000); - let bank = Arc::new(Bank::new(&mint)); - let hash0 = Hash::default(); - - // get a non-default hash last_id - let entry = next_entry(&hash0, 1, vec![]); - bank.register_entry_id(&entry.id); - - // Create a leader - let leader_data = NodeInfo::new_with_socketaddr(&"127.0.0.1:1234".parse().unwrap()); - let leader_pubkey = leader_data.id.clone(); - let mut leader_cluster_info = ClusterInfo::new(leader_data).unwrap(); - - // give the leader some tokens - let give_leader_tokens_tx = - Transaction::system_new(&mint.keypair(), leader_pubkey.clone(), 100, entry.id); - bank.process_transaction(&give_leader_tokens_tx).unwrap(); - - leader_cluster_info.set_leader(leader_pubkey); - - // Insert 7 agreeing validators / 3 disagreeing - // and votes for new last_id - for i in 0..10 { - let mut validator = - NodeInfo::new_with_socketaddr(&format!("127.0.0.1:234{}", i).parse().unwrap()); - - let vote = Vote { - version: validator.version + 1, - contact_info_version: 1, - }; - - if i < 7 { - validator.ledger_state.last_id = entry.id; - } - - leader_cluster_info.insert(&validator); - trace!("validator id: {:?}", validator.id); - - leader_cluster_info.insert_vote(&validator.id, &vote, entry.id); - } - let leader = Arc::new(RwLock::new(leader_cluster_info)); - let (vote_blob_sender, vote_blob_receiver) = channel(); - let mut last_vote: u64 = timing::timestamp() - VOTE_TIMEOUT_MS - 1; - let mut last_valid_validator_timestamp = 0; - let res = send_leader_vote( - &mint.pubkey(), - &mint.keypair(), - &bank, - &leader, - &vote_blob_sender, - &mut last_vote, - &mut last_valid_validator_timestamp, - ); - trace!("vote result: {:?}", res); - assert!(res.is_ok()); - let vote_blob = vote_blob_receiver.recv_timeout(Duration::from_millis(500)); - trace!("vote_blob: {:?}", vote_blob); - - // leader shouldn't vote yet, not enough votes - assert!(vote_blob.is_err()); - - // add two more nodes and see that it succeeds - for i in 0..2 { - let mut validator = - NodeInfo::new_with_socketaddr(&format!("127.0.0.1:234{}", i).parse().unwrap()); - - let vote = Vote { - version: validator.version + 1, - contact_info_version: 1, - }; - - validator.ledger_state.last_id = entry.id; - - leader.write().unwrap().insert(&validator); - trace!("validator id: {:?}", validator.id); - - leader - .write() - .unwrap() - .insert_vote(&validator.id, &vote, entry.id); - } - - last_vote = timing::timestamp() - VOTE_TIMEOUT_MS - 1; - let res = send_leader_vote( - &Pubkey::default(), - &mint.keypair(), - &bank, - &leader, - &vote_blob_sender, - &mut last_vote, - &mut last_valid_validator_timestamp, - ); - trace!("vote result: {:?}", res); - assert!(res.is_ok()); - let vote_blob = vote_blob_receiver.recv_timeout(Duration::from_millis(500)); - trace!("vote_blob: {:?}", vote_blob); - - // leader should vote now - assert!(vote_blob.is_ok()); - - // vote should be valid - let blob = &vote_blob.unwrap()[0]; - let tx = deserialize(&(blob.read().unwrap().data)).unwrap(); - assert_eq!(bank.process_transaction(&tx), Ok(())); - } - - #[test] - fn test_get_last_id_to_vote_on() { - logger::setup(); - - let mint = Mint::new(1234); - let bank = Arc::new(Bank::new(&mint)); - let mut last_vote = 0; - let mut last_valid_validator_timestamp = 0; - - // generate 10 last_ids, register 6 with the bank - let ids: Vec<_> = (0..10) - .map(|i| { - let last_id = hash(&serialize(&i).unwrap()); // Unique hash - if i < 6 { - bank.register_entry_id(&last_id); - } - // sleep to get a different timestamp in the bank - sleep(Duration::from_millis(1)); - last_id - }).collect(); - - // see that we fail to have 2/3rds consensus - assert!( - get_last_id_to_vote_on( - &Pubkey::default(), - &ids, - &bank, - 0, - &mut last_vote, - &mut last_valid_validator_timestamp - ).is_err() - ); - - // register another, see passing - bank.register_entry_id(&ids[6]); - - let res = get_last_id_to_vote_on( - &Pubkey::default(), - &ids, - &bank, - 0, - &mut last_vote, - &mut last_valid_validator_timestamp, - ); - if let Ok((hash, timestamp)) = res { - assert!(hash == ids[6]); - assert!(timestamp != 0); - } else { - assert!(false, "get_last_id returned error!: {:?}", res); - } - } -} diff --git a/src/vote_transaction.rs b/src/vote_transaction.rs new file mode 100644 index 0000000000..fe5edb835a --- /dev/null +++ b/src/vote_transaction.rs @@ -0,0 +1,85 @@ +//! The `vote_transaction` module provides functionality for creating vote transactions. + +use bincode::{deserialize, serialize}; +use hash::Hash; +use signature::Keypair; +use solana_sdk::pubkey::Pubkey; +use system_transaction::SystemTransaction; +use transaction::Transaction; +use vote_program::{Vote, VoteInstruction, VoteProgram, MAX_STATE_SIZE}; + +pub trait VoteTransaction { + fn vote_new(vote_account: &Keypair, vote: Vote, last_id: Hash, fee: i64) -> Self; + fn vote_account_new( + validator_id: &Keypair, + new_vote_account_id: Pubkey, + last_id: Hash, + num_tokens: i64, + ) -> Self; + fn vote_account_register( + validator_id: &Keypair, + vote_account_id: Pubkey, + last_id: Hash, + fee: i64, + ) -> Self; + fn get_votes(&self) -> Vec<(Pubkey, Vote, Hash)>; +} + +impl VoteTransaction for Transaction { + fn vote_new(vote_account: &Keypair, vote: Vote, last_id: Hash, fee: i64) -> Self { + let instruction = VoteInstruction::NewVote(vote); + let userdata = serialize(&instruction).expect("serialize instruction"); + Transaction::new(vote_account, &[], VoteProgram::id(), userdata, last_id, fee) + } + + fn vote_account_new( + validator_id: &Keypair, + new_vote_account_id: Pubkey, + last_id: Hash, + num_tokens: i64, + ) -> Self { + Transaction::system_create( + validator_id, + new_vote_account_id, + last_id, + num_tokens, + MAX_STATE_SIZE as u64, + VoteProgram::id(), + 0, + ) + } + + fn vote_account_register( + validator_id: &Keypair, + vote_account_id: Pubkey, + last_id: Hash, + fee: i64, + ) -> Self { + let register_tx = VoteInstruction::RegisterAccount; + let userdata = serialize(®ister_tx).unwrap(); + Transaction::new( + validator_id, + &[vote_account_id], + VoteProgram::id(), + userdata, + last_id, + fee, + ) + } + + fn get_votes(&self) -> Vec<(Pubkey, Vote, Hash)> { + let mut votes = vec![]; + for i in 0..self.instructions.len() { + let tx_program_id = self.program_id(i); + if VoteProgram::check_id(&tx_program_id) { + if let Ok(Some(VoteInstruction::NewVote(vote))) = deserialize(&self.userdata(i)) { + votes.push((self.account_keys[0], vote, self.last_id)) + } + } + } + votes + } +} + +#[cfg(test)] +mod tests {} diff --git a/src/wallet.rs b/src/wallet.rs index af7abb6aeb..c82525d923 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -780,6 +780,7 @@ mod tests { use signature::{read_keypair, read_pkcs8, Keypair, KeypairUtil}; use std::fs::remove_dir_all; use std::sync::mpsc::channel; + use std::sync::{Arc, RwLock}; #[test] fn test_wallet_parse_command() { @@ -1074,11 +1075,11 @@ mod tests { #[ignore] fn test_wallet_process_command() { let (alice, ledger_path) = create_tmp_genesis("wallet_process_command", 10_000_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let leader_data1 = leader.info.clone(); @@ -1086,8 +1087,14 @@ mod tests { let mut config = WalletConfig::default(); let rpc_port = 12345; // Needs to be distinct known number to not conflict with other tests + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_data.id, + ))); + bank.leader_scheduler = leader_scheduler; + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, 0, @@ -1096,7 +1103,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1152,10 +1158,10 @@ mod tests { #[test] fn test_wallet_request_airdrop() { let (alice, ledger_path) = create_tmp_genesis("wallet_request_airdrop", 10_000_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); @@ -1163,8 +1169,15 @@ mod tests { let genesis_entries = &alice.create_entries(); let entry_height = genesis_entries.len() as u64; + + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_data.id, + ))); + bank.leader_scheduler = leader_scheduler; + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, entry_height, @@ -1173,7 +1186,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1227,11 +1239,11 @@ mod tests { #[ignore] fn test_wallet_timestamp_tx() { let (alice, ledger_path) = create_tmp_genesis("wallet_timestamp_tx", 10_000_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let leader_data1 = leader.info.clone(); @@ -1241,8 +1253,14 @@ mod tests { let mut config_witness = WalletConfig::default(); let rpc_port = 13579; // Needs to be distinct known number to not conflict with other tests + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_data.id, + ))); + bank.leader_scheduler = leader_scheduler; + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, 0, @@ -1251,7 +1269,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1349,9 +1366,9 @@ mod tests { #[ignore] fn test_wallet_witness_tx() { let (alice, ledger_path) = create_tmp_genesis("wallet_witness_tx", 10_000_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let leader_data1 = leader.info.clone(); @@ -1361,8 +1378,14 @@ mod tests { let mut config_witness = WalletConfig::default(); let rpc_port = 11223; // Needs to be distinct known number to not conflict with other tests + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_data.id, + ))); + bank.leader_scheduler = leader_scheduler; + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, 0, @@ -1371,7 +1394,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1467,9 +1489,9 @@ mod tests { #[ignore] fn test_wallet_cancel_tx() { let (alice, ledger_path) = create_tmp_genesis("wallet_cancel_tx", 10_000_000); - let bank = Bank::new(&alice); + let mut bank = Bank::new(&alice); let bob_pubkey = Keypair::new().pubkey(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let leader_data1 = leader.info.clone(); @@ -1479,8 +1501,14 @@ mod tests { let mut config_witness = WalletConfig::default(); let rpc_port = 13456; // Needs to be distinct known number to not conflict with other tests + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_data.id, + ))); + bank.leader_scheduler = leader_scheduler; + let vote_account_keypair = Arc::new(Keypair::new()); let server = Fullnode::new_with_bank( leader_keypair, + vote_account_keypair, bank, 0, 0, @@ -1489,7 +1517,6 @@ mod tests { None, &ledger_path, false, - LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); diff --git a/src/window.rs b/src/window.rs index d5f3c97c1e..970354afa9 100644 --- a/src/window.rs +++ b/src/window.rs @@ -127,8 +127,8 @@ impl WindowUtil for Window { // 2) We are on the border between seed_rotation_intervals, so the // schedule won't be known until the entry on that cusp is received // by the replicate stage (which comes after this stage). Hence, the next - // leader at the beginning of that next epoch will not know he is the - // leader until he receives that last "cusp" entry. He also won't ask for repairs + // leader at the beginning of that next epoch will not know they are the + // leader until they receive that last "cusp" entry. The leader also won't ask for repairs // for that entry because "is_next_leader" won't be set here. In this case, // everybody will be blocking waiting for that "cusp" entry instead of repairing, // until the leader hits "times" >= the max times in calculate_max_repair(). diff --git a/tests/multinode.rs b/tests/multinode.rs index 03606ff18c..a68d35ca05 100644 --- a/tests/multinode.rs +++ b/tests/multinode.rs @@ -109,7 +109,7 @@ fn make_tiny_test_entries(start_hash: Hash, num: usize) -> Vec { fn test_multi_node_ledger_window() -> result::Result<()> { logger::setup(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); @@ -136,6 +136,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { leader, &leader_ledger_path, leader_keypair, + Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -148,7 +149,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { // start up another validator from zero, converge and then check // balances - let keypair = Keypair::new(); + let keypair = Arc::new(Keypair::new()); let validator_pubkey = keypair.pubkey().clone(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.info.clone(); @@ -156,6 +157,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { validator, &zero_ledger_path, keypair, + Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -206,7 +208,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { logger::setup(); const N: usize = 5; trace!("test_multi_node_validator_catchup_from_zero"); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); @@ -227,6 +229,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { leader, &leader_ledger_path, leader_keypair, + Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -239,7 +242,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { let mut nodes = vec![server]; for _ in 0..N { - let keypair = Keypair::new(); + let keypair = Arc::new(Keypair::new()); let validator_pubkey = keypair.pubkey().clone(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let ledger_path = tmp_copy_ledger( @@ -258,6 +261,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { validator, &ledger_path, keypair, + Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -288,12 +292,13 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { success = 0; // start up another validator from zero, converge and then check everyone's // balances - let keypair = Keypair::new(); + let keypair = Arc::new(Keypair::new()); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let val = Fullnode::new( validator, &zero_ledger_path, keypair, + Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -347,7 +352,7 @@ fn test_multi_node_basic() { logger::setup(); const N: usize = 5; trace!("test_multi_node_basic"); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); @@ -360,6 +365,7 @@ fn test_multi_node_basic() { leader, &leader_ledger_path, leader_keypair, + Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -372,7 +378,7 @@ fn test_multi_node_basic() { let mut nodes = vec![server]; for _ in 0..N { - let keypair = Keypair::new(); + let keypair = Arc::new(Keypair::new()); let validator_pubkey = keypair.pubkey().clone(); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let ledger_path = tmp_copy_ledger(&leader_ledger_path, "multi_node_basic"); @@ -388,6 +394,7 @@ fn test_multi_node_basic() { validator, &ledger_path, keypair, + Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -426,7 +433,7 @@ fn test_multi_node_basic() { #[ignore] fn test_boot_validator_from_file() -> result::Result<()> { logger::setup(); - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = Keypair::new().pubkey(); @@ -439,6 +446,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { leader, &leader_ledger_path, leader_keypair, + Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -450,7 +458,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(1000)).unwrap(); assert_eq!(leader_balance, 1000); - let keypair = Keypair::new(); + let keypair = Arc::new(Keypair::new()); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.info.clone(); let ledger_path = tmp_copy_ledger(&leader_ledger_path, "boot_validator_from_file"); @@ -459,6 +467,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { validator, &ledger_path, keypair, + Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -477,13 +486,14 @@ fn test_boot_validator_from_file() -> result::Result<()> { } fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) { - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); let leader_fullnode = Fullnode::new( leader, &ledger_path, leader_keypair, + Arc::new(Keypair::new()), None, false, LeaderScheduler::from_bootstrap_leader(leader_data.id), @@ -532,7 +542,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { let (leader_data, leader_fullnode) = create_leader(&ledger_path); // start validator from old ledger - let keypair = Keypair::new(); + let keypair = Arc::new(Keypair::new()); let validator = Node::new_localhost_with_pubkey(keypair.pubkey()); let validator_data = validator.info.clone(); @@ -540,6 +550,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { validator, &stale_ledger_path, keypair, + Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), false, LeaderScheduler::from_bootstrap_leader(leader_data.id), @@ -588,7 +599,7 @@ fn test_multi_node_dynamic_network() { Err(_) => 120, }; - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader_pubkey = leader_keypair.pubkey().clone(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = Keypair::new().pubkey(); @@ -604,6 +615,7 @@ fn test_multi_node_dynamic_network() { leader, &leader_ledger_path, leader_keypair, + Arc::new(Keypair::new()), None, true, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -675,7 +687,8 @@ fn test_multi_node_dynamic_network() { let val = Fullnode::new( validator, &ledger_path, - keypair, + Arc::new(keypair), + Arc::new(Keypair::new()), Some(leader_data.contact_info.ncp), true, LeaderScheduler::from_bootstrap_leader(leader_pubkey), @@ -780,7 +793,7 @@ fn test_leader_to_validator_transition() { let validator_keypair = Keypair::new(); // Create the leader node information - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = leader_node.info.clone(); @@ -801,7 +814,7 @@ fn test_leader_to_validator_transition() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); - let bootstrap_entries = + let (bootstrap_entries, _) = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); ledger_writer.write_entries(&bootstrap_entries).unwrap(); @@ -819,6 +832,7 @@ fn test_leader_to_validator_transition() { leader_node, &leader_ledger_path, leader_keypair, + Arc::new(Keypair::new()), Some(leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -871,7 +885,7 @@ fn test_leader_to_validator_transition() { _ => panic!("Expected reason for exit to be leader rotation"), } - // Query newly transitioned validator to make sure that he has the proper balances in + // Query newly transitioned validator to make sure that they have the proper balances in // the after the transitions let mut leader_client = mk_client(&leader_info); @@ -883,8 +897,10 @@ fn test_leader_to_validator_transition() { // Check the ledger to make sure it's the right height, we should've // transitioned after tick_height == bootstrap_height - let (_, tick_height, _, _) = - Fullnode::new_bank_from_ledger(&leader_ledger_path, &mut LeaderScheduler::default()); + let (_, tick_height, _, _) = Fullnode::new_bank_from_ledger( + &leader_ledger_path, + Arc::new(RwLock::new(LeaderScheduler::default())), + ); assert_eq!(tick_height, bootstrap_height); @@ -903,12 +919,12 @@ fn test_leader_validator_basic() { let bob_pubkey = Keypair::new().pubkey(); // Create the leader node information - let leader_keypair = Keypair::new(); + let leader_keypair = Arc::new(Keypair::new()); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = leader_node.info.clone(); // Create the validator node information - let validator_keypair = Keypair::new(); + let validator_keypair = Arc::new(Keypair::new()); let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); // Make a common mint and a genesis entry for both leader + validator ledgers @@ -931,7 +947,7 @@ fn test_leader_validator_basic() { // Write the bootstrap entries to the ledger that will cause leader rotation // after the bootstrap height let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); - let active_set_entries = + let (active_set_entries, vote_account_keypair) = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0); ledger_writer.write_entries(&active_set_entries).unwrap(); @@ -946,21 +962,23 @@ fn test_leader_validator_basic() { Some(bootstrap_height), ); - // Start the leader fullnode - let mut leader = Fullnode::new( - leader_node, - &leader_ledger_path, - leader_keypair, - Some(leader_info.contact_info.ncp), - false, - LeaderScheduler::new(&leader_scheduler_config), - ); - // Start the validator node let mut validator = Fullnode::new( validator_node, &validator_ledger_path, validator_keypair, + Arc::new(vote_account_keypair), + Some(leader_info.contact_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + ); + + // Start the leader fullnode + let mut leader = Fullnode::new( + leader_node, + &leader_ledger_path, + leader_keypair, + Arc::new(Keypair::new()), Some(leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -999,7 +1017,7 @@ fn test_leader_validator_basic() { _ => panic!("Expected reason for exit to be leader rotation"), } - // Query newly transitioned validator to make sure that he has the proper balances + // Query newly transitioned validator to make sure they have the proper balances // in the bank after the transitions let mut leader_client = mk_client(&leader_info); @@ -1071,7 +1089,7 @@ fn test_dropped_handoff_recovery() { logger::setup(); // Create the bootstrap leader node information - let bootstrap_leader_keypair = Keypair::new(); + let bootstrap_leader_keypair = Arc::new(Keypair::new()); let bootstrap_leader_node = Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey()); let bootstrap_leader_info = bootstrap_leader_node.info.clone(); @@ -1086,17 +1104,17 @@ fn test_dropped_handoff_recovery() { .id; // Create the validator keypair that will be the next leader in line - let next_leader_keypair = Keypair::new(); + let next_leader_keypair = Arc::new(Keypair::new()); // Create a common ledger with entries in the beginning that will add only // the "next_leader" validator to the active set for leader election, guaranteeing - // he is the next leader after bootstrap_height + // they are the next leader after bootstrap_height let mut ledger_paths = Vec::new(); ledger_paths.push(bootstrap_leader_ledger_path.clone()); - // Make the entries to give the next_leader validator some stake so that he will be in + // Make the entries to give the next_leader validator some stake so that they will be in // leader election active set - let active_set_entries = + let (active_set_entries, vote_account_keypair) = make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id, &last_id, 0); // Write the entries @@ -1131,6 +1149,7 @@ fn test_dropped_handoff_recovery() { bootstrap_leader_node, &bootstrap_leader_ledger_path, bootstrap_leader_keypair, + Arc::new(Keypair::new()), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1140,7 +1159,7 @@ fn test_dropped_handoff_recovery() { // Start up the validators other than the "next_leader" validator for _ in 0..(N - 1) { - let kp = Keypair::new(); + let kp = Arc::new(Keypair::new()); let validator_ledger_path = tmp_copy_ledger( &bootstrap_leader_ledger_path, "test_dropped_handoff_recovery", @@ -1152,6 +1171,7 @@ fn test_dropped_handoff_recovery() { validator_node, &validator_ledger_path, kp, + Arc::new(Keypair::new()), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1176,6 +1196,7 @@ fn test_dropped_handoff_recovery() { next_leader_node, &next_leader_ledger_path, next_leader_keypair, + Arc::new(vote_account_keypair), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1247,10 +1268,11 @@ fn test_full_leader_validator_network() { let mut ledger_paths = Vec::new(); ledger_paths.push(bootstrap_leader_ledger_path.clone()); + let mut vote_account_keypairs = VecDeque::new(); for node_keypair in node_keypairs.iter() { - // Make entries to give the validator some stake so that he will be in + // Make entries to give each node some stake so that they will be in the // leader election active set - let bootstrap_entries = make_active_set_entries( + let (bootstrap_entries, vote_account_keypair) = make_active_set_entries( node_keypair, &mint.keypair(), &last_entry_id, @@ -1258,6 +1280,8 @@ fn test_full_leader_validator_network() { 0, ); + vote_account_keypairs.push_back(vote_account_keypair); + // Write the entries let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); last_entry_id = bootstrap_entries @@ -1286,7 +1310,8 @@ fn test_full_leader_validator_network() { let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new( bootstrap_leader_node, &bootstrap_leader_ledger_path, - node_keypairs.pop_front().unwrap(), + Arc::new(node_keypairs.pop_front().unwrap()), + Arc::new(vote_account_keypairs.pop_front().unwrap()), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1311,7 +1336,8 @@ fn test_full_leader_validator_network() { let validator = Arc::new(RwLock::new(Fullnode::new( validator_node, &validator_ledger_path, - kp, + Arc::new(kp), + Arc::new(vote_account_keypairs.pop_front().unwrap()), Some(bootstrap_leader_info.contact_info.ncp), false, LeaderScheduler::new(&leader_scheduler_config), @@ -1338,7 +1364,7 @@ fn test_full_leader_validator_network() { num_reached_target_height = 0; for n in nodes.iter() { let node_lock = n.read().unwrap(); - let ls_lock = &node_lock.leader_scheduler; + let ls_lock = node_lock.get_leader_scheduler(); if let Some(sh) = ls_lock.read().unwrap().last_seed_height { if sh >= target_height { num_reached_target_height += 1;