diff --git a/src/bank.rs b/src/bank.rs index 2d1ec5cf47..9be12cf325 100644 --- a/src/bank.rs +++ b/src/bank.rs @@ -6,12 +6,14 @@ use bincode::deserialize; use bincode::serialize; use budget_program::BudgetState; +use budget_transaction::BudgetTransaction; use counter::Counter; use dynamic_program::DynamicProgram; use entry::Entry; use hash::{hash, Hash}; use itertools::Itertools; use jsonrpc_macros::pubsub::Sink; +use leader_scheduler::LeaderScheduler; use ledger::Block; use log::Level; use mint::Mint; @@ -781,6 +783,22 @@ impl Bank { results } + pub fn process_entry_votes( + bank: &Bank, + entry: &Entry, + entry_height: u64, + leader_scheduler: &mut LeaderScheduler, + ) { + for tx in &entry.transactions { + if tx.vote().is_some() { + // Update the active set in the leader scheduler + leader_scheduler.push_vote(*tx.from(), entry_height); + } + } + + leader_scheduler.update_height(entry_height, bank); + } + pub fn process_entry(&self, entry: &Entry) -> Result<()> { if !entry.transactions.is_empty() { for result in self.process_transactions(&entry.transactions) { @@ -795,7 +813,7 @@ impl Bank { /// as we go. fn process_entries_tail( &self, - entries: Vec, + entries: &[Entry], tail: &mut Vec, tail_idx: &mut usize, ) -> Result { @@ -810,7 +828,7 @@ impl Bank { *tail_idx = (*tail_idx + 1) % WINDOW_SIZE as usize; entry_count += 1; - self.process_entry(&entry)?; + self.process_entry(entry)?; } Ok(entry_count) @@ -831,6 +849,7 @@ impl Bank { entries: I, tail: &mut Vec, tail_idx: &mut usize, + leader_scheduler: &mut LeaderScheduler, ) -> Result where I: IntoIterator, @@ -846,13 +865,30 @@ impl Bank { return Err(BankError::LedgerVerificationFailed); } id = block.last().unwrap().id; - entry_count += self.process_entries_tail(block, tail, tail_idx)?; + let tail_count = self.process_entries_tail(&block, tail, tail_idx)?; + + if !leader_scheduler.use_only_bootstrap_leader { + for (i, entry) in block.iter().enumerate() { + Self::process_entry_votes( + self, + &entry, + entry_count + i as u64 + 1, + leader_scheduler, + ); + } + } + + entry_count += tail_count; } Ok(entry_count) } /// Process a full ledger. 
- pub fn process_ledger(&self, entries: I) -> Result<(u64, Vec)> + pub fn process_ledger( + &self, + entries: I, + leader_scheduler: &mut LeaderScheduler, + ) -> Result<(u64, Vec)> where I: IntoIterator, { @@ -894,9 +930,15 @@ impl Bank { tail.push(entry0); tail.push(entry1); let mut tail_idx = 2; - let entry_count = self.process_blocks(entry1_id, entries, &mut tail, &mut tail_idx)?; + let entry_count = self.process_blocks( + entry1_id, + entries, + &mut tail, + &mut tail_idx, + leader_scheduler, + )?; - // check f we need to rotate tail + // check if we need to rotate tail if tail.len() == WINDOW_SIZE as usize { tail.rotate_left(tail_idx) } @@ -1069,6 +1111,13 @@ impl Bank { } subscriptions.remove(&signature); } + + #[cfg(test)] + // Used to access accounts for things like controlling stake to control + // the eligible set of nodes for leader selection + pub fn accounts(&self) -> &RwLock> { + &self.accounts + } } #[cfg(test)] @@ -1081,6 +1130,7 @@ mod tests { use entry_writer::{self, EntryWriter}; use hash::hash; use jsonrpc_macros::pubsub::{Subscriber, SubscriptionId}; + use leader_scheduler::LeaderScheduler; use ledger; use logger; use signature::Keypair; @@ -1429,7 +1479,8 @@ mod tests { let mint = Mint::new(1); let genesis = mint.create_entries(); let bank = Bank::default(); - bank.process_ledger(genesis).unwrap(); + bank.process_ledger(genesis, &mut LeaderScheduler::default()) + .unwrap(); assert_eq!(bank.get_balance(&mint.pubkey()), 1); } @@ -1487,7 +1538,9 @@ mod tests { let (ledger, pubkey) = create_sample_ledger(1); let (ledger, dup) = ledger.tee(); let bank = Bank::default(); - let (ledger_height, tail) = bank.process_ledger(ledger).unwrap(); + let (ledger_height, tail) = bank + .process_ledger(ledger, &mut LeaderScheduler::default()) + .unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(ledger_height, 3); assert_eq!(tail.len(), 3); @@ -1509,7 +1562,9 @@ mod tests { for entry_count in window_size - 3..window_size + 2 { let (ledger, pubkey) = create_sample_ledger(entry_count); let bank = Bank::default(); - let (ledger_height, tail) = bank.process_ledger(ledger).unwrap(); + let (ledger_height, tail) = bank + .process_ledger(ledger, &mut LeaderScheduler::default()) + .unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); assert_eq!(ledger_height, entry_count as u64 + 2); assert!(tail.len() <= window_size); @@ -1534,7 +1589,8 @@ mod tests { let ledger = to_file_iter(ledger); let bank = Bank::default(); - bank.process_ledger(ledger).unwrap(); + bank.process_ledger(ledger, &mut LeaderScheduler::default()) + .unwrap(); assert_eq!(bank.get_balance(&pubkey), 1); } @@ -1545,7 +1601,8 @@ mod tests { let block = to_file_iter(create_sample_block(&mint, 1)); let bank = Bank::default(); - bank.process_ledger(genesis.chain(block)).unwrap(); + bank.process_ledger(genesis.chain(block), &mut LeaderScheduler::default()) + .unwrap(); assert_eq!(bank.get_balance(&mint.pubkey()), 1); } @@ -1568,9 +1625,13 @@ mod tests { let ledger1 = create_sample_ledger_with_mint_and_keypairs(&mint, &keypairs); let bank0 = Bank::default(); - bank0.process_ledger(ledger0).unwrap(); + bank0 + .process_ledger(ledger0, &mut LeaderScheduler::default()) + .unwrap(); let bank1 = Bank::default(); - bank1.process_ledger(ledger1).unwrap(); + bank1 + .process_ledger(ledger1, &mut LeaderScheduler::default()) + .unwrap(); let initial_state = bank0.hash_internal_state(); diff --git a/src/banking_stage.rs b/src/banking_stage.rs index db6585738d..eb8ad66135 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ 
-84,7 +84,6 @@ impl BankingStage { // Many banks that process transactions in parallel. let mut thread_hdls: Vec> = (0..NUM_THREADS) - .into_iter() .map(|_| { let thread_bank = bank.clone(); let thread_verified_receiver = shared_verified_receiver.clone(); diff --git a/src/bin/fullnode.rs b/src/bin/fullnode.rs index 9703e1e98f..b9427ef7c2 100644 --- a/src/bin/fullnode.rs +++ b/src/bin/fullnode.rs @@ -12,6 +12,7 @@ use solana::client::mk_client; use solana::cluster_info::Node; use solana::drone::DRONE_PORT; use solana::fullnode::{Config, Fullnode, FullnodeReturnType}; +use solana::leader_scheduler::LeaderScheduler; use solana::logger; use solana::metrics::set_panic_hook; use solana::signature::{Keypair, KeypairUtil}; @@ -83,9 +84,6 @@ fn main() -> () { let node_info = node.info.clone(); let pubkey = keypair.pubkey(); - let mut fullnode = Fullnode::new(node, ledger_path, keypair, network, false, None); - - // airdrop stuff, probably goes away at some point let leader = match network { Some(network) => { poll_gossip_for_leader(network, None).expect("can't find leader on network") @@ -93,6 +91,14 @@ fn main() -> () { None => node_info, }; + let mut fullnode = Fullnode::new( + node, + ledger_path, + keypair, + network, + false, + LeaderScheduler::from_bootstrap_leader(leader.id), + ); let mut client = mk_client(&leader); // TODO: maybe have the drone put itself in gossip somewhere instead of hardcoding? @@ -126,7 +132,8 @@ fn main() -> () { loop { let status = fullnode.handle_role_transition(); match status { - Ok(Some(FullnodeReturnType::LeaderRotation)) => (), + Ok(Some(FullnodeReturnType::LeaderToValidatorRotation)) => (), + Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)) => (), _ => { // Fullnode tpu/tvu exited for some unexpected // reason, so exit diff --git a/src/bin/ledger-tool.rs b/src/bin/ledger-tool.rs index 1af804ce47..a6e98a2828 100644 --- a/src/bin/ledger-tool.rs +++ b/src/bin/ledger-tool.rs @@ -5,6 +5,7 @@ extern crate solana; use clap::{App, Arg, SubCommand}; use solana::bank::Bank; +use solana::leader_scheduler::LeaderScheduler; use solana::ledger::{read_ledger, verify_ledger}; use solana::logger; use std::io::{stdout, Write}; @@ -116,8 +117,7 @@ fn main() { }; let genesis = genesis.take(2).map(|e| e.unwrap()); - - if let Err(e) = bank.process_ledger(genesis) { + if let Err(e) = bank.process_ledger(genesis, &mut LeaderScheduler::default()) { eprintln!("verify failed at genesis err: {:?}", e); if !matches.is_present("continue") { exit(1); diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 5bc9500d3d..31226cfc9c 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -5,6 +5,7 @@ use counter::Counter; use entry::Entry; #[cfg(feature = "erasure")] use erasure; +use leader_scheduler::LeaderScheduler; use ledger::Block; use log::Level; use packet::SharedBlobs; @@ -26,9 +27,9 @@ pub enum BroadcastStageReturnType { ChannelDisconnected, } +#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] fn broadcast( - cluster_info: &Arc>, - leader_rotation_interval: u64, + leader_scheduler: &Arc>, node_info: &NodeInfo, broadcast_table: &[NodeInfo], window: &SharedWindow, @@ -130,8 +131,7 @@ fn broadcast( // Send blobs out from the window ClusterInfo::broadcast( - cluster_info, - leader_rotation_interval, + &leader_scheduler, &node_info, &broadcast_table, &window, @@ -183,38 +183,18 @@ impl BroadcastStage { window: &SharedWindow, entry_height: u64, receiver: &Receiver>, + leader_scheduler: &Arc>, ) -> BroadcastStageReturnType { let mut transmit_index 
= WindowIndex { data: entry_height, coding: entry_height, }; let mut receive_index = entry_height; - let me; - let leader_rotation_interval; - { - let rcluster_info = cluster_info.read().unwrap(); - me = rcluster_info.my_data().clone(); - leader_rotation_interval = rcluster_info.get_leader_rotation_interval(); - } - + let me = cluster_info.read().unwrap().my_data().clone(); loop { - if transmit_index.data % (leader_rotation_interval as u64) == 0 { - let rcluster_info = cluster_info.read().unwrap(); - let my_id = rcluster_info.my_data().id; - match rcluster_info.get_scheduled_leader(transmit_index.data) { - Some(id) if id == my_id => (), - // If the leader stays in power for the next - // round as well, then we don't exit. Otherwise, exit. - _ => { - return BroadcastStageReturnType::LeaderRotation; - } - } - } - let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table(); if let Err(e) = broadcast( - cluster_info, - leader_rotation_interval, + leader_scheduler, &me, &broadcast_table, &window, @@ -259,13 +239,21 @@ impl BroadcastStage { window: SharedWindow, entry_height: u64, receiver: Receiver>, + leader_scheduler: Arc>, exit_sender: Arc, ) -> Self { let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { let _exit = Finalizer::new(exit_sender); - Self::run(&sock, &cluster_info, &window, entry_height, &receiver) + Self::run( + &sock, + &cluster_info, + &window, + entry_height, + &receiver, + &leader_scheduler, + ) }).unwrap(); BroadcastStage { thread_hdl } @@ -279,151 +267,3 @@ impl Service for BroadcastStage { self.thread_hdl.join() } } - -#[cfg(test)] -mod tests { - use broadcast_stage::{BroadcastStage, BroadcastStageReturnType}; - use cluster_info::{ClusterInfo, Node}; - use entry::Entry; - use ledger::next_entries_mut; - use mint::Mint; - use service::Service; - use signature::{Keypair, KeypairUtil}; - use solana_program_interface::pubkey::Pubkey; - use std::cmp; - use std::sync::atomic::AtomicBool; - use std::sync::mpsc::{channel, Sender}; - use std::sync::{Arc, RwLock}; - use window::{new_window_from_entries, SharedWindow}; - - struct DummyBroadcastStage { - my_id: Pubkey, - buddy_id: Pubkey, - broadcast_stage: BroadcastStage, - shared_window: SharedWindow, - entry_sender: Sender>, - cluster_info: Arc>, - entries: Vec, - } - - fn setup_dummy_broadcast_stage(leader_rotation_interval: u64) -> DummyBroadcastStage { - // Setup dummy leader info - let leader_keypair = Keypair::new(); - let my_id = leader_keypair.pubkey(); - let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - - // Give the leader somebody to broadcast to so he isn't lonely - let buddy_keypair = Keypair::new(); - let buddy_id = buddy_keypair.pubkey(); - let broadcast_buddy = Node::new_localhost_with_pubkey(buddy_keypair.pubkey()); - - // Fill the cluster_info with the buddy's info - let mut cluster_info = - ClusterInfo::new(leader_info.info.clone()).expect("ClusterInfo::new"); - cluster_info.insert(&broadcast_buddy.info); - cluster_info.set_leader_rotation_interval(leader_rotation_interval); - let cluster_info = Arc::new(RwLock::new(cluster_info)); - - // Make dummy initial entries - let mint = Mint::new(10000); - let entries = mint.create_entries(); - let entry_height = entries.len() as u64; - - // Setup a window - let window = new_window_from_entries(&entries, entry_height, &leader_info.info); - - let shared_window = Arc::new(RwLock::new(window)); - - let (entry_sender, entry_receiver) = channel(); - let exit_sender = 
Arc::new(AtomicBool::new(false)); - // Start up the broadcast stage - let broadcast_stage = BroadcastStage::new( - leader_info.sockets.broadcast, - cluster_info.clone(), - shared_window.clone(), - entry_height, - entry_receiver, - exit_sender, - ); - - DummyBroadcastStage { - my_id, - buddy_id, - broadcast_stage, - shared_window, - entry_sender, - cluster_info, - entries, - } - } - - fn find_highest_window_index(shared_window: &SharedWindow) -> u64 { - let window = shared_window.read().unwrap(); - window.iter().fold(0, |m, w_slot| { - if let Some(ref blob) = w_slot.data { - cmp::max(m, blob.read().unwrap().get_index().unwrap()) - } else { - m - } - }) - } - - #[test] - fn test_broadcast_stage_leader_rotation_exit() { - let leader_rotation_interval = 10; - let broadcast_info = setup_dummy_broadcast_stage(leader_rotation_interval); - { - let mut wcluster_info = broadcast_info.cluster_info.write().unwrap(); - // Set the leader for the next rotation to be myself - wcluster_info.set_scheduled_leader(leader_rotation_interval, broadcast_info.my_id); - } - - let genesis_len = broadcast_info.entries.len() as u64; - let mut last_id = broadcast_info - .entries - .last() - .expect("Ledger should not be empty") - .id; - let mut num_hashes = 0; - - // Input enough entries to make exactly leader_rotation_interval entries, which will - // trigger a check for leader rotation. Because the next scheduled leader - // is ourselves, we won't exit - for _ in genesis_len..leader_rotation_interval { - let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); - - broadcast_info.entry_sender.send(new_entry).unwrap(); - } - - // Set the scheduled next leader in the cluster_info to the other buddy on the network - broadcast_info - .cluster_info - .write() - .unwrap() - .set_scheduled_leader(2 * leader_rotation_interval, broadcast_info.buddy_id); - - // Input another leader_rotation_interval dummy entries, which will take us - // past the point of the leader rotation. The write_stage will see that - // it's no longer the leader after checking the cluster_info, and exit - for _ in 0..leader_rotation_interval { - let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); - - match broadcast_info.entry_sender.send(new_entry) { - // We disconnected, break out of loop and check the results - Err(_) => break, - _ => (), - }; - } - - // Make sure the threads closed cleanly - assert_eq!( - broadcast_info.broadcast_stage.join().unwrap(), - BroadcastStageReturnType::LeaderRotation - ); - - let highest_index = find_highest_window_index(&broadcast_info.shared_window); - // The blob index is zero indexed, so it will always be one behind the entry height - // which starts at one. 
- assert_eq!(highest_index, 2 * leader_rotation_interval - 1); - } -} diff --git a/src/cluster_info.rs b/src/cluster_info.rs index 79ccc52756..05fd4b5d4a 100644 --- a/src/cluster_info.rs +++ b/src/cluster_info.rs @@ -17,6 +17,7 @@ use budget_instruction::Vote; use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy}; use counter::Counter; use hash::Hash; +use leader_scheduler::LeaderScheduler; use ledger::LedgerWindow; use log::Level; use netutil::{bind_in_range, bind_to, multi_bind_in_range}; @@ -223,12 +224,6 @@ pub struct ClusterInfo { /// last time we heard from anyone getting a message fro this public key /// these are rumers and shouldn't be trusted directly external_liveness: HashMap>, - /// TODO: Clearly not the correct implementation of this, but a temporary abstraction - /// for testing - pub scheduled_leaders: HashMap, - // TODO: Is there a better way to do this? We didn't make this a constant because - // we want to be able to set it in integration tests so that the tests don't time out. - pub leader_rotation_interval: u64, } // TODO These messages should be signed, and go through the gpu pipeline for spam filtering @@ -260,8 +255,6 @@ impl ClusterInfo { external_liveness: HashMap::new(), id: node_info.id, update_index: 1, - scheduled_leaders: HashMap::new(), - leader_rotation_interval: 100, }; me.local.insert(node_info.id, me.update_index); me.table.insert(node_info.id, node_info); @@ -324,32 +317,10 @@ impl ClusterInfo { self.insert(&me); } - // TODO: Dummy leader scheduler, need to implement actual leader scheduling. - pub fn get_scheduled_leader(&self, entry_height: u64) -> Option { - match self.scheduled_leaders.get(&entry_height) { - Some(x) => Some(*x), - None => Some(self.my_data().leader_id), - } - } - - pub fn set_leader_rotation_interval(&mut self, leader_rotation_interval: u64) { - self.leader_rotation_interval = leader_rotation_interval; - } - - pub fn get_leader_rotation_interval(&self) -> u64 { - self.leader_rotation_interval - } - - // TODO: Dummy leader schedule setter, need to implement actual leader scheduling. 
- pub fn set_scheduled_leader(&mut self, entry_height: u64, new_leader_id: Pubkey) -> () { - self.scheduled_leaders.insert(entry_height, new_leader_id); - } - pub fn get_valid_peers(&self) -> Vec { let me = self.my_data().id; self.table .values() - .into_iter() .filter(|x| x.id != me) .filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu)) .cloned() @@ -515,9 +486,9 @@ impl ClusterInfo { /// broadcast messages from the leader to layer 1 nodes /// # Remarks /// We need to avoid having obj locked while doing any io, such as the `send_to` + #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn broadcast( - cluster_info: &Arc>, - leader_rotation_interval: u64, + leader_scheduler: &Arc>, me: &NodeInfo, broadcast_table: &[NodeInfo], window: &SharedWindow, @@ -562,11 +533,21 @@ impl ClusterInfo { // Make sure the next leader in line knows about the last entry before rotation // so he can initiate repairs if necessary let entry_height = idx + 1; - if entry_height % leader_rotation_interval == 0 { - let next_leader_id = cluster_info - .read() - .unwrap() - .get_scheduled_leader(entry_height); + + { + let ls_lock = leader_scheduler.read().unwrap(); + let next_leader_id = ls_lock.get_scheduled_leader(entry_height); + // In the case the next scheduled leader is None, then the write_stage moved + // the schedule too far ahead and we no longer are in the known window + // (will happen during calculation of the next set of slots every epoch or + // seed_rotation_interval heights when we move the window forward in the + // LeaderScheduler). For correctness, this is fine write_stage will never send + // blobs past the point of when this node should stop being leader, so we just + // continue broadcasting until we catch up to write_stage. The downside is we + // can't guarantee the current leader will broadcast the last entry to the next + // scheduled leader, so the next leader will have to rely on avalanche/repairs + // to get this last blob, which could cause slowdowns during leader handoffs. + // See corresponding issue for repairs in repair() function in window.rs. if next_leader_id.is_some() && next_leader_id != Some(me.id) { let info_result = broadcast_table .iter() @@ -842,8 +823,8 @@ impl ClusterInfo { blob_sender.send(vec![blob])?; Ok(()) } - /// TODO: This is obviously the wrong way to do this. Need to implement leader selection - fn top_leader(&self) -> Option { + + pub fn get_gossip_top_leader(&self) -> Option<&NodeInfo> { let mut table = HashMap::new(); let def = Pubkey::default(); let cur = self.table.values().filter(|x| x.leader_id != def); @@ -857,16 +838,12 @@ impl ClusterInfo { trace!("{}: sorted leaders {} votes: {}", self.id, x.0, x.1); } sorted.sort_by_key(|a| a.1); - sorted.last().map(|a| *a.0) - } + let top_leader = sorted.last().map(|a| *a.0); - /// TODO: This is obviously the wrong way to do this. 
Need to implement leader selection - /// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet - fn update_leader(&mut self) { - if let Some(leader_id) = self.top_leader() { - if self.my_data().leader_id != leader_id && self.table.get(&leader_id).is_some() { - self.set_leader(leader_id); - } + if let Some(l) = top_leader { + self.table.get(&l) + } else { + None } } @@ -936,7 +913,6 @@ impl ClusterInfo { obj.write().unwrap().purge(timestamp()); //TODO: possibly tune this parameter //we saw a deadlock passing an obj.read().unwrap().timeout into sleep - obj.write().unwrap().update_leader(); let elapsed = timestamp() - start; if GOSSIP_SLEEP_MILLIS > elapsed { let time_left = GOSSIP_SLEEP_MILLIS - elapsed; @@ -1842,32 +1818,6 @@ mod tests { assert_eq!(blob.get_id().unwrap(), id); } } - /// TODO: This is obviously the wrong way to do this. Need to implement leader selection, - /// delete this test after leader selection is correctly implemented - #[test] - fn test_update_leader() { - logger::setup(); - let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - let leader0 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - let leader1 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234")); - let mut cluster_info = ClusterInfo::new(me.clone()).expect("ClusterInfo::new"); - assert_eq!(cluster_info.top_leader(), None); - cluster_info.set_leader(leader0.id); - assert_eq!(cluster_info.top_leader().unwrap(), leader0.id); - //add a bunch of nodes with a new leader - for _ in 0..10 { - let mut dum = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1234")); - dum.id = Keypair::new().pubkey(); - dum.leader_id = leader1.id; - cluster_info.insert(&dum); - } - assert_eq!(cluster_info.top_leader().unwrap(), leader1.id); - cluster_info.update_leader(); - assert_eq!(cluster_info.my_data().leader_id, leader0.id); - cluster_info.insert(&leader1); - cluster_info.update_leader(); - assert_eq!(cluster_info.my_data().leader_id, leader1.id); - } #[test] fn test_valid_last_ids() { diff --git a/src/drone.rs b/src/drone.rs index c57a761e47..ad67454107 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -227,6 +227,7 @@ mod tests { use cluster_info::Node; use drone::{Drone, DroneRequest, REQUEST_CAP, TIME_SLICE}; use fullnode::Fullnode; + use leader_scheduler::LeaderScheduler; use logger; use mint::Mint; use netutil::get_ip_addr; @@ -338,7 +339,7 @@ mod tests { None, &ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); @@ -376,7 +377,14 @@ mod tests { let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); - let server = Fullnode::new(leader, &ledger_path, leader_keypair, None, false, None); + let server = Fullnode::new( + leader, + &ledger_path, + leader_keypair, + None, + false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), + ); let requests_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket"); let transactions_socket = diff --git a/src/fullnode.rs b/src/fullnode.rs index 66eecf4506..91b2de6140 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -5,13 +5,13 @@ use broadcast_stage::BroadcastStage; use cluster_info::{ClusterInfo, Node, NodeInfo}; use drone::DRONE_PORT; use entry::Entry; +use leader_scheduler::LeaderScheduler; use ledger::read_ledger; use ncp::Ncp; use rpc::{JsonRpcService, RPC_PORT}; use rpu::Rpu; use service::Service; use signature::{Keypair, KeypairUtil}; 
-use solana_program_interface::pubkey::Pubkey; use std::net::UdpSocket; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::atomic::{AtomicBool, Ordering}; @@ -45,6 +45,10 @@ impl LeaderServices { self.tpu.join() } + pub fn is_exited(&self) -> bool { + self.tpu.is_exited() + } + pub fn exit(&self) -> () { self.tpu.exit(); } @@ -63,17 +67,23 @@ impl ValidatorServices { self.tvu.join() } + pub fn is_exited(&self) -> bool { + self.tvu.is_exited() + } + pub fn exit(&self) -> () { self.tvu.exit() } } pub enum FullnodeReturnType { - LeaderRotation, + LeaderToValidatorRotation, + ValidatorToLeaderRotation, } pub struct Fullnode { pub node_role: Option, + pub leader_scheduler: Arc>, keypair: Arc, exit: Arc, rpu: Option, @@ -122,10 +132,11 @@ impl Fullnode { keypair: Keypair, leader_addr: Option, sigverify_disabled: bool, - leader_rotation_interval: Option, + mut leader_scheduler: LeaderScheduler, ) -> Self { info!("creating bank..."); - let (bank, entry_height, ledger_tail) = Self::new_bank_from_ledger(ledger_path); + let (bank, entry_height, ledger_tail) = + Self::new_bank_from_ledger(ledger_path, &mut leader_scheduler); info!("creating networking stack..."); let local_gossip_addr = node.sockets.gossip.local_addr().unwrap(); @@ -147,7 +158,7 @@ impl Fullnode { leader_info.as_ref(), ledger_path, sigverify_disabled, - leader_rotation_interval, + leader_scheduler, None, ); @@ -225,16 +236,13 @@ impl Fullnode { bank: Bank, entry_height: u64, ledger_tail: &[Entry], - mut node: Node, - leader_info: Option<&NodeInfo>, + node: Node, + bootstrap_leader_info_option: Option<&NodeInfo>, ledger_path: &str, sigverify_disabled: bool, - leader_rotation_interval: Option, + leader_scheduler: LeaderScheduler, rpc_port: Option, ) -> Self { - if leader_info.is_none() { - node.info.leader_id = node.info.id; - } let exit = Arc::new(AtomicBool::new(false)); let bank = Arc::new(bank); @@ -269,12 +277,9 @@ impl Fullnode { let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info); let shared_window = Arc::new(RwLock::new(window)); - - let mut cluster_info = ClusterInfo::new(node.info).expect("ClusterInfo::new"); - if let Some(interval) = leader_rotation_interval { - cluster_info.set_leader_rotation_interval(interval); - } - let cluster_info = Arc::new(RwLock::new(cluster_info)); + let cluster_info = Arc::new(RwLock::new( + ClusterInfo::new(node.info).expect("ClusterInfo::new"), + )); let ncp = Ncp::new( &cluster_info, @@ -284,70 +289,83 @@ impl Fullnode { exit.clone(), ); + let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); let keypair = Arc::new(keypair); - let node_role; - match leader_info { - Some(leader_info) => { - // Start in validator mode. - // TODO: let ClusterInfo get that data from the network? - cluster_info.write().unwrap().insert(leader_info); - let tvu = Tvu::new( - keypair.clone(), - &bank, - entry_height, - cluster_info.clone(), - shared_window.clone(), - node.sockets - .replicate - .iter() - .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) - .collect(), - node.sockets - .repair - .try_clone() - .expect("Failed to clone repair socket"), - node.sockets - .retransmit - .try_clone() - .expect("Failed to clone retransmit socket"), - Some(ledger_path), - ); - let validator_state = ValidatorServices::new(tvu); - node_role = Some(NodeRole::Validator(validator_state)); - } - None => { - // Start in leader mode. 
- let (tpu, entry_receiver, tpu_exit) = Tpu::new( - keypair.clone(), - &bank, - &cluster_info, - Default::default(), - node.sockets - .transaction - .iter() - .map(|s| s.try_clone().expect("Failed to clone transaction sockets")) - .collect(), - ledger_path, - sigverify_disabled, - entry_height, - ); - let broadcast_stage = BroadcastStage::new( - node.sockets - .broadcast - .try_clone() - .expect("Failed to clone broadcast socket"), - cluster_info.clone(), - shared_window.clone(), - entry_height, - entry_receiver, - tpu_exit, - ); - let leader_state = LeaderServices::new(tpu, broadcast_stage); - node_role = Some(NodeRole::Leader(leader_state)); - } + // Insert the bootstrap leader info, should only be None if this node + // is the bootstrap leader + if let Some(bootstrap_leader_info) = bootstrap_leader_info_option { + cluster_info.write().unwrap().insert(bootstrap_leader_info); } + // Get the scheduled leader + let scheduled_leader = leader_scheduler + .read() + .unwrap() + .get_scheduled_leader(entry_height) + .expect("Leader not known after processing bank"); + + cluster_info.write().unwrap().set_leader(scheduled_leader); + let node_role = if scheduled_leader != keypair.pubkey() { + // Start in validator mode. + let tvu = Tvu::new( + keypair.clone(), + &bank, + entry_height, + cluster_info.clone(), + shared_window.clone(), + node.sockets + .replicate + .iter() + .map(|s| s.try_clone().expect("Failed to clone replicate sockets")) + .collect(), + node.sockets + .repair + .try_clone() + .expect("Failed to clone repair socket"), + node.sockets + .retransmit + .try_clone() + .expect("Failed to clone retransmit socket"), + Some(ledger_path), + leader_scheduler.clone(), + ); + let validator_state = ValidatorServices::new(tvu); + Some(NodeRole::Validator(validator_state)) + } else { + // Start in leader mode. 
+ let (tpu, entry_receiver, tpu_exit) = Tpu::new( + keypair.clone(), + &bank, + &cluster_info, + Default::default(), + node.sockets + .transaction + .iter() + .map(|s| s.try_clone().expect("Failed to clone transaction sockets")) + .collect(), + ledger_path, + sigverify_disabled, + entry_height, + leader_scheduler.clone(), + ); + + let broadcast_stage = BroadcastStage::new( + node.sockets + .broadcast + .try_clone() + .expect("Failed to clone broadcast socket"), + cluster_info.clone(), + shared_window.clone(), + entry_height, + entry_receiver, + leader_scheduler.clone(), + tpu_exit, + ); + let leader_state = LeaderServices::new(tpu, broadcast_stage); + Some(NodeRole::Leader(leader_state)) + }; + Fullnode { keypair, cluster_info, @@ -367,25 +385,35 @@ impl Fullnode { broadcast_socket: node.sockets.broadcast, requests_socket: node.sockets.requests, respond_socket: node.sockets.respond, + leader_scheduler, } } fn leader_to_validator(&mut self) -> Result<()> { - // TODO: We can avoid building the bank again once RecordStage is - // integrated with BankingStage - let (bank, entry_height, _) = Self::new_bank_from_ledger(&self.ledger_path); - self.bank = Arc::new(bank); + let (scheduled_leader, entry_height) = { + let mut ls_lock = self.leader_scheduler.write().unwrap(); + // Clear the leader scheduler + ls_lock.reset(); - { - let mut wcluster_info = self.cluster_info.write().unwrap(); - let scheduled_leader = wcluster_info.get_scheduled_leader(entry_height); - match scheduled_leader { - //TODO: Handle the case where we don't know who the next - //scheduled leader is - None => (), - Some(leader_id) => wcluster_info.set_leader(leader_id), - } - } + // TODO: We can avoid building the bank again once RecordStage is + // integrated with BankingStage + let (bank, entry_height, _) = + Self::new_bank_from_ledger(&self.ledger_path, &mut *ls_lock); + + self.bank = Arc::new(bank); + + ( + ls_lock + .get_scheduled_leader(entry_height) + .expect("Scheduled leader should exist after rebuilding bank"), + entry_height, + ) + }; + + self.cluster_info + .write() + .unwrap() + .set_leader(scheduled_leader); // Make a new RPU to serve requests out of the new bank we've created // instead of the old one @@ -420,6 +448,7 @@ impl Fullnode { .try_clone() .expect("Failed to clone retransmit socket"), Some(&self.ledger_path), + self.leader_scheduler.clone(), ); let validator_state = ValidatorServices::new(tvu); self.node_role = Some(NodeRole::Validator(validator_state)); @@ -443,6 +472,7 @@ impl Fullnode { &self.ledger_path, self.sigverify_disabled, entry_height, + self.leader_scheduler.clone(), ); let broadcast_stage = BroadcastStage::new( @@ -453,26 +483,35 @@ impl Fullnode { self.shared_window.clone(), entry_height, blob_receiver, + self.leader_scheduler.clone(), tpu_exit, ); let leader_state = LeaderServices::new(tpu, broadcast_stage); self.node_role = Some(NodeRole::Leader(leader_state)); } + pub fn check_role_exited(&self) -> bool { + match self.node_role { + Some(NodeRole::Leader(ref leader_services)) => leader_services.is_exited(), + Some(NodeRole::Validator(ref validator_services)) => validator_services.is_exited(), + None => false, + } + } + pub fn handle_role_transition(&mut self) -> Result> { let node_role = self.node_role.take(); match node_role { Some(NodeRole::Leader(leader_services)) => match leader_services.join()? 
{ Some(TpuReturnType::LeaderRotation) => { self.leader_to_validator()?; - Ok(Some(FullnodeReturnType::LeaderRotation)) + Ok(Some(FullnodeReturnType::LeaderToValidatorRotation)) } _ => Ok(None), }, Some(NodeRole::Validator(validator_services)) => match validator_services.join()? { Some(TvuReturnType::LeaderRotation(entry_height)) => { self.validator_to_leader(entry_height); - Ok(Some(FullnodeReturnType::LeaderRotation)) + Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)) } _ => Ok(None), }, @@ -498,22 +537,18 @@ impl Fullnode { self.join() } - // TODO: only used for testing, get rid of this once we have actual - // leader scheduling - pub fn set_scheduled_leader(&self, leader_id: Pubkey, entry_height: u64) { - self.cluster_info - .write() - .unwrap() - .set_scheduled_leader(entry_height, leader_id); - } - - fn new_bank_from_ledger(ledger_path: &str) -> (Bank, u64, Vec) { + pub fn new_bank_from_ledger( + ledger_path: &str, + leader_scheduler: &mut LeaderScheduler, + ) -> (Bank, u64, Vec) { let bank = Bank::new_default(false); let entries = read_ledger(ledger_path, true).expect("opening ledger"); let entries = entries .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err))); info!("processing ledger..."); - let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger"); + let (entry_height, ledger_tail) = bank + .process_ledger(entries, leader_scheduler) + .expect("process_ledger"); // entry_height is the network-wide agreed height of the ledger. // initialize it from the input ledger info!("processed {} ledger...", entry_height); @@ -534,12 +569,12 @@ impl Service for Fullnode { match self.node_role { Some(NodeRole::Validator(validator_service)) => { if let Some(TvuReturnType::LeaderRotation(_)) = validator_service.join()? { - return Ok(Some(FullnodeReturnType::LeaderRotation)); + return Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)); } } Some(NodeRole::Leader(leader_service)) => { if let Some(TpuReturnType::LeaderRotation) = leader_service.join()? 
{ - return Ok(Some(FullnodeReturnType::LeaderRotation)); + return Ok(Some(FullnodeReturnType::LeaderToValidatorRotation)); } } _ => (), @@ -554,7 +589,8 @@ mod tests { use bank::Bank; use cluster_info::Node; use fullnode::{Fullnode, NodeRole, TvuReturnType}; - use ledger::genesis; + use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; + use ledger::{genesis, LedgerWriter}; use packet::make_consecutive_blobs; use service::Service; use signature::{Keypair, KeypairUtil}; @@ -581,7 +617,7 @@ mod tests { Some(&entry), &validator_ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(entry.id), Some(0), ); v.close().unwrap(); @@ -609,7 +645,7 @@ mod tests { Some(&entry), &validator_ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(entry.id), Some(0), ) }).collect(); @@ -627,6 +663,87 @@ mod tests { } } + #[test] + fn test_wrong_role_transition() { + // Create the leader node information + let bootstrap_leader_keypair = Keypair::new(); + let bootstrap_leader_node = + Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey()); + let bootstrap_leader_info = bootstrap_leader_node.info.clone(); + + // Create the validator node information + let validator_keypair = Keypair::new(); + let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); + + // Make a common mint and a genesis entry for both leader + validator's ledgers + let (mint, bootstrap_leader_ledger_path) = genesis("test_wrong_role_transition", 10_000); + + let genesis_entries = mint.create_entries(); + let last_id = genesis_entries + .last() + .expect("expected at least one genesis entry") + .id; + + // Write the entries to the ledger that will cause leader rotation + // after the bootstrap height + let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); + let first_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id); + + let ledger_initial_len = (genesis_entries.len() + first_entries.len()) as u64; + ledger_writer.write_entries(first_entries).unwrap(); + + // Create the common leader scheduling configuration + let num_slots_per_epoch = 3; + let leader_rotation_interval = 5; + let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval; + + // Set the bootstrap height exactly the current ledger length, so that we can + // test if the bootstrap leader knows to immediately transition to a validator + // after parsing the ledger during startup + let bootstrap_height = ledger_initial_len; + let leader_scheduler_config = LeaderSchedulerConfig::new( + bootstrap_leader_info.id, + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(seed_rotation_interval), + Some(ledger_initial_len), + ); + + // Test that a node knows to transition to a validator based on parsing the ledger + let bootstrap_leader = Fullnode::new( + bootstrap_leader_node, + &bootstrap_leader_ledger_path, + bootstrap_leader_keypair, + Some(bootstrap_leader_info.contact_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + ); + + match bootstrap_leader.node_role { + Some(NodeRole::Validator(_)) => (), + _ => { + panic!("Expected bootstrap leader to be a validator"); + } + } + + // Test that a node knows to transition to a leader based on parsing the ledger + let validator = Fullnode::new( + validator_node, + &bootstrap_leader_ledger_path, + validator_keypair, + Some(bootstrap_leader_info.contact_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + ); + + 
match validator.node_role { + Some(NodeRole::Leader(_)) => (), + _ => { + panic!("Expected node to be the leader"); + } + } + } + #[test] fn test_validator_to_leader_transition() { // Make a leader identity @@ -635,29 +752,54 @@ mod tests { let leader_id = leader_node.info.id; let leader_ncp = leader_node.info.contact_info.ncp; - // Start the validator node - let leader_rotation_interval = 10; + // Create validator identity let (mint, validator_ledger_path) = genesis("test_validator_to_leader_transition", 10_000); let validator_keypair = Keypair::new(); let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); let validator_info = validator_node.info.clone(); + + let genesis_entries = mint.create_entries(); + let mut last_id = genesis_entries + .last() + .expect("expected at least one genesis entry") + .id; + + // Write two entries so that the validator is in the active set: + // + // 1) Give the validator a nonzero number of tokens + // Write the bootstrap entries to the ledger that will cause leader rotation + // after the bootstrap height + // + // 2) A vote from the validator + let mut ledger_writer = LedgerWriter::open(&validator_ledger_path, false).unwrap(); + let bootstrap_entries = + make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id); + let bootstrap_entries_len = bootstrap_entries.len(); + last_id = bootstrap_entries.last().unwrap().id; + ledger_writer.write_entries(bootstrap_entries).unwrap(); + let ledger_initial_len = (genesis_entries.len() + bootstrap_entries_len) as u64; + + // Set the leader scheduler for the validator + let leader_rotation_interval = 10; + let num_bootstrap_slots = 2; + let bootstrap_height = num_bootstrap_slots * leader_rotation_interval; + + let leader_scheduler_config = LeaderSchedulerConfig::new( + leader_id, + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(leader_rotation_interval * 2), + Some(bootstrap_height), + ); + + // Start the validator let mut validator = Fullnode::new( validator_node, &validator_ledger_path, validator_keypair, Some(leader_ncp), false, - Some(leader_rotation_interval), - ); - - // Set the leader schedule for the validator - let my_leader_begin_epoch = 2; - for i in 0..my_leader_begin_epoch { - validator.set_scheduled_leader(leader_id, leader_rotation_interval * i); - } - validator.set_scheduled_leader( - validator_info.id, - my_leader_begin_epoch * leader_rotation_interval, + LeaderScheduler::new(&leader_scheduler_config), ); // Send blobs to the validator from our mock leader @@ -679,19 +821,17 @@ mod tests { // Send the blobs out of order, in reverse. Also send an extra // "extra_blobs" number of blobs to make sure the window stops in the right place. 
let extra_blobs = cmp::max(leader_rotation_interval / 3, 1); - let total_blobs_to_send = - my_leader_begin_epoch * leader_rotation_interval + extra_blobs; - let genesis_entries = mint.create_entries(); - let last_id = genesis_entries - .last() - .expect("expected at least one genesis entry") - .id; + let total_blobs_to_send = bootstrap_height + extra_blobs; let tvu_address = &validator_info.contact_info.tvu; - let msgs = - make_consecutive_blobs(leader_id, total_blobs_to_send, last_id, &tvu_address) - .into_iter() - .rev() - .collect(); + let msgs = make_consecutive_blobs( + leader_id, + total_blobs_to_send, + ledger_initial_len, + last_id, + &tvu_address, + ).into_iter() + .rev() + .collect(); s_responder.send(msgs).expect("send"); t_responder }; @@ -705,22 +845,21 @@ mod tests { .expect("Expected successful validator join"); assert_eq!( join_result, - Some(TvuReturnType::LeaderRotation( - my_leader_begin_epoch * leader_rotation_interval - )) + Some(TvuReturnType::LeaderRotation(bootstrap_height)) ); } _ => panic!("Role should not be leader"), } - // Check the validator ledger to make sure it's the right height - let (_, entry_height, _) = Fullnode::new_bank_from_ledger(&validator_ledger_path); - - assert_eq!( - entry_height, - my_leader_begin_epoch * leader_rotation_interval + // Check the validator ledger to make sure it's the right height, we should've + // transitioned after the bootstrap_height entry + let (_, entry_height, _) = Fullnode::new_bank_from_ledger( + &validator_ledger_path, + &mut LeaderScheduler::new(&leader_scheduler_config), ); + assert_eq!(entry_height, bootstrap_height); + // Shut down t_responder.join().expect("responder thread join"); validator.close().unwrap(); diff --git a/src/leader_scheduler.rs b/src/leader_scheduler.rs index 87ed61fe49..be889dcb59 100644 --- a/src/leader_scheduler.rs +++ b/src/leader_scheduler.rs @@ -2,18 +2,28 @@ //! 
managing the schedule for leader rotation
 
 use bank::Bank;
+
 use bincode::serialize;
+use budget_instruction::Vote;
+use budget_transaction::BudgetTransaction;
 use byteorder::{LittleEndian, ReadBytesExt};
-use hash::hash;
+use entry::Entry;
+use hash::{hash, Hash};
+use signature::{Keypair, KeypairUtil};
+#[cfg(test)]
+use solana_program_interface::account::Account;
 use solana_program_interface::pubkey::Pubkey;
 use std::collections::HashMap;
 use std::io::Cursor;
+use system_transaction::SystemTransaction;
+use transaction::Transaction;
 
 pub const DEFAULT_BOOTSTRAP_HEIGHT: u64 = 1000;
 pub const DEFAULT_LEADER_ROTATION_INTERVAL: u64 = 100;
 pub const DEFAULT_SEED_ROTATION_INTERVAL: u64 = 1000;
 pub const DEFAULT_ACTIVE_WINDOW_LENGTH: u64 = 1000;
 
+#[derive(Debug)]
 pub struct ActiveValidators {
     // Map from validator id to the last PoH height at which they voted,
     pub active_validators: HashMap<Pubkey, u64>,
@@ -43,13 +53,17 @@ impl ActiveValidators {
         // Note: height == 0 will only be included for all
         // height < self.active_window_length
+        let upper_bound = height;
         if height >= self.active_window_length {
             let lower_bound = height - self.active_window_length;
             self.active_validators
                 .retain(|_, height| *height > lower_bound);
         }
 
-        self.active_validators.keys().cloned().collect()
+        self.active_validators
+            .iter()
+            .filter_map(|(k, v)| if *v <= upper_bound { Some(*k) } else { None })
+            .collect()
     }
 
     // Push a vote for a validator with id == "id" who voted at PoH height == "height"
@@ -59,6 +73,10 @@ impl ActiveValidators {
             *old_height = height;
         }
     }
+
+    pub fn reset(&mut self) -> () {
+        self.active_validators.clear();
+    }
 }
 
 pub struct LeaderSchedulerConfig {
@@ -83,7 +101,6 @@ pub struct LeaderSchedulerConfig {
 // Used to toggle leader rotation in fullnode so that tests that don't
 // need leader rotation don't break
 impl LeaderSchedulerConfig {
-    #[allow(dead_code)]
     pub fn new(
         bootstrap_leader: Pubkey,
         bootstrap_height_option: Option<u64>,
@@ -101,7 +118,12 @@ impl LeaderSchedulerConfig {
     }
 }
 
+#[derive(Debug)]
 pub struct LeaderScheduler {
+    // Set to true if we want the default implementation of the LeaderScheduler,
+    // where only the bootstrap leader is used
+    pub use_only_bootstrap_leader: bool,
+
     // The interval at which to rotate the leader, should be much less than
     // seed_rotation_interval
     pub leader_rotation_interval: u64,
@@ -119,12 +141,12 @@ pub struct LeaderScheduler {
     // Maintain the set of active validators
     pub active_validators: ActiveValidators,
 
+    // The last height at which the seed + schedule was generated
+    pub last_seed_height: Option<u64>,
+
     // Round-robin ordering for the validators
     leader_schedule: Vec<Pubkey>,
 
-    // The last height at which the seed + schedule was generated
-    last_seed_height: Option<u64>,
-
     // The seed used to determine the round robin order of leaders
     seed: u64,
 }
@@ -147,6 +169,13 @@ pub struct LeaderScheduler {
 // 3) When we hit the next seed rotation PoH height, step 2) is executed again to
 //    calculate the leader schedule for the upcoming seed_rotation_interval PoH counts.
 
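[Editor's aside, not part of the patch: the comment block above describes a two-phase schedule in which the bootstrap leader produces every entry up to bootstrap_height, after which leader slots of leader_rotation_interval entries are assigned round-robin from the schedule regenerated every seed_rotation_interval entries. Below is a minimal, self-contained sketch of that height-to-leader arithmetic only; plain u64 ids stand in for Pubkey, every name is hypothetical, and the exact boundary conditions are those of LeaderScheduler::get_scheduled_leader in the patch, not this sketch.]

    // Illustrative sketch only: u64 stands in for Pubkey, names are hypothetical.
    fn scheduled_leader_sketch(
        height: u64,
        bootstrap_leader: u64,
        bootstrap_height: u64,
        leader_rotation_interval: u64,
        last_seed_height: u64, // height at which `schedule` was generated
        schedule: &[u64],      // round-robin order from the last seed rotation
    ) -> Option<u64> {
        if height < bootstrap_height {
            // Every entry up to the bootstrap height belongs to the bootstrap leader.
            return Some(bootstrap_leader);
        }
        if height < last_seed_height || schedule.is_empty() {
            // The schedule in hand does not cover this height.
            return None;
        }
        // Each leader slot spans leader_rotation_interval entries, assigned
        // round-robin over the schedule from the last seed rotation.
        let slot = (height - last_seed_height) / leader_rotation_interval;
        Some(schedule[slot as usize % schedule.len()])
    }

    fn main() {
        let schedule = [7u64, 8, 9];
        // bootstrap_height = 10, leader_rotation_interval = 5, schedule generated at height 10
        assert_eq!(scheduled_leader_sketch(3, 1, 10, 5, 10, &schedule), Some(1));
        assert_eq!(scheduled_leader_sketch(12, 1, 10, 5, 10, &schedule), Some(7));
        assert_eq!(scheduled_leader_sketch(17, 1, 10, 5, 10, &schedule), Some(8));
        assert_eq!(scheduled_leader_sketch(22, 1, 10, 5, 10, &schedule), Some(9));
    }

[End of aside; the patch continues below.]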
impl LeaderScheduler { + pub fn from_bootstrap_leader(bootstrap_leader: Pubkey) -> Self { + let config = LeaderSchedulerConfig::new(bootstrap_leader, None, None, None, None); + let mut leader_scheduler = LeaderScheduler::new(&config); + leader_scheduler.use_only_bootstrap_leader = true; + leader_scheduler + } + pub fn new(config: &LeaderSchedulerConfig) -> Self { let mut bootstrap_height = DEFAULT_BOOTSTRAP_HEIGHT; if let Some(input) = config.bootstrap_height_option { @@ -169,6 +198,7 @@ impl LeaderScheduler { assert!(seed_rotation_interval % leader_rotation_interval == 0); LeaderScheduler { + use_only_bootstrap_leader: false, active_validators: ActiveValidators::new(config.active_window_length_option), leader_rotation_interval, seed_rotation_interval, @@ -180,15 +210,51 @@ impl LeaderScheduler { } } + pub fn is_leader_rotation_height(&self, height: u64) -> bool { + if self.use_only_bootstrap_leader { + return false; + } + + if height < self.bootstrap_height { + return false; + } + + (height - self.bootstrap_height) % self.leader_rotation_interval == 0 + } + + pub fn entries_until_next_leader_rotation(&self, height: u64) -> Option { + if self.use_only_bootstrap_leader { + return None; + } + + if height < self.bootstrap_height { + Some(self.bootstrap_height - height) + } else { + Some( + self.leader_rotation_interval + - ((height - self.bootstrap_height) % self.leader_rotation_interval), + ) + } + } + + pub fn reset(&mut self) { + self.last_seed_height = None; + self.active_validators.reset(); + } + pub fn push_vote(&mut self, id: Pubkey, height: u64) { + if self.use_only_bootstrap_leader { + return; + } + self.active_validators.push_vote(id, height); } - fn get_active_set(&mut self, height: u64) -> Vec { - self.active_validators.get_active_set(height) - } - pub fn update_height(&mut self, height: u64, bank: &Bank) { + if self.use_only_bootstrap_leader { + return; + } + if height < self.bootstrap_height { return; } @@ -201,6 +267,10 @@ impl LeaderScheduler { // Uses the schedule generated by the last call to generate_schedule() to return the // leader for a given PoH height in round-robin fashion pub fn get_scheduled_leader(&self, height: u64) -> Option { + if self.use_only_bootstrap_leader { + return Some(self.bootstrap_leader); + } + // This covers cases where the schedule isn't yet generated. if self.last_seed_height == None { if height < self.bootstrap_height { @@ -226,6 +296,10 @@ impl LeaderScheduler { Some(self.leader_schedule[validator_index]) } + fn get_active_set(&mut self, height: u64) -> Vec { + self.active_validators.get_active_set(height) + } + // Called every seed_rotation_interval entries, generates the leader schedule // for the range of entries: [height, height + seed_rotation_interval) fn generate_schedule(&mut self, height: u64, bank: &Bank) { @@ -238,6 +312,7 @@ impl LeaderScheduler { // non-zero stake. 
In this case, use the bootstrap leader for
         // the upcoming rounds
         if ranked_active_set.is_empty() {
+            self.last_seed_height = Some(height);
             self.leader_schedule = vec![self.bootstrap_leader];
             self.last_seed_height = Some(height);
             return;
@@ -322,6 +397,57 @@ impl LeaderScheduler {
     }
 }
 
+impl Default for LeaderScheduler {
+    // Create a dummy leader scheduler
+    fn default() -> Self {
+        let id = Keypair::new().pubkey();
+        Self::from_bootstrap_leader(id)
+    }
+}
+
+// Remove all candidates for leader selection from the active set by clearing the bank,
+// and then set a single new candidate who will be eligible starting at height = vote_height
+// by adding one new account to the bank
+#[cfg(test)]
+pub fn set_new_leader(bank: &Bank, leader_scheduler: &mut LeaderScheduler, vote_height: u64) {
+    // Set the scheduled next leader to some other node
+    let new_leader_keypair = Keypair::new();
+    let new_leader_id = new_leader_keypair.pubkey();
+    leader_scheduler.push_vote(new_leader_id, vote_height);
+    let dummy_id = Keypair::new().pubkey();
+    let new_account = Account::new(1, 10, dummy_id.clone());
+
+    // Remove the previous accounts from the active set
+    let mut accounts = bank.accounts().write().unwrap();
+    accounts.clear();
+    accounts.insert(new_leader_id, new_account);
+}
+
+// Create two entries so that the node with keypair == active_keypair
+// is in the active set for leader selection:
+// 1) Give the node a nonzero number of tokens,
+// 2) A vote from the validator
+pub fn make_active_set_entries(
+    active_keypair: &Keypair,
+    token_source: &Keypair,
+    last_id: &Hash,
+) -> Vec<Entry> {
+    // 1) Create transfer token entry
+    let transfer_tx = Transaction::system_new(&token_source, active_keypair.pubkey(), 1, *last_id);
+    let transfer_entry = Entry::new(last_id, 0, vec![transfer_tx]);
+    let last_id = transfer_entry.id;
+
+    // 2) Create vote entry
+    let vote = Vote {
+        version: 0,
+        contact_info_version: 0,
+    };
+    let vote_tx = Transaction::budget_new_vote(&active_keypair, vote, last_id, 0);
+    let vote_entry = Entry::new(&last_id, 0, vec![vote_tx]);
+
+    vec![transfer_entry, vote_entry]
+}
+
 #[cfg(test)]
 mod tests {
     use bank::Bank;
@@ -336,13 +462,6 @@ mod tests {
     use std::hash::Hash;
     use std::iter::FromIterator;
 
-    fn to_hashset<T>(slice: &[T]) -> HashSet<&T>
-    where
-        T: Eq + Hash,
-    {
-        HashSet::from_iter(slice.iter())
-    }
-
     fn to_hashset_owned<T>(slice: &[T]) -> HashSet<T>
     where
         T: Eq + Hash + Clone,
@@ -495,13 +614,11 @@ mod tests {
             leader_scheduler.push_vote(pk, start_height + active_window_length);
         }
 
-        let all_ids = old_ids.union(&new_ids).collect();
-
         // Queries for the active set
         let result = leader_scheduler.get_active_set(active_window_length + start_height - 1);
-        assert_eq!(result.len(), num_old_ids + num_new_ids);
-        let result_set = to_hashset(&result);
-        assert_eq!(result_set, all_ids);
+        assert_eq!(result.len(), num_old_ids);
+        let result_set = to_hashset_owned(&result);
+        assert_eq!(result_set, old_ids);
 
         let result = leader_scheduler.get_active_set(active_window_length + start_height);
         assert_eq!(result.len(), num_new_ids);
@@ -769,18 +886,13 @@ mod tests {
         for i in 0..=num_validators {
             leader_scheduler.generate_schedule(i * active_window_length, &bank);
             let result = &leader_scheduler.leader_schedule;
-            let mut expected_set;
-            if i == num_validators {
-                // When there are no active validators remaining, should default back to the
-                // bootstrap leader
-                expected_set = HashSet::new();
-                expected_set.insert(&bootstrap_leader_id);
+            let expected = if i == num_validators {
+                bootstrap_leader_id
             } else
{ - assert_eq!(num_validators - i, result.len() as u64); - expected_set = to_hashset(&validators[i as usize..]); - } - let result_set = to_hashset(&result[..]); - assert_eq!(expected_set, result_set); + validators[i as usize] + }; + + assert_eq!(vec![expected], *result); } } diff --git a/src/packet.rs b/src/packet.rs index 83dd1111a0..b212f1050c 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -406,6 +406,7 @@ impl Blob { pub fn make_consecutive_blobs( me_id: Pubkey, num_blobs_to_make: u64, + start_height: u64, start_hash: Hash, addr: &SocketAddr, ) -> SharedBlobs { @@ -415,7 +416,7 @@ pub fn make_consecutive_blobs( for _ in 0..num_blobs_to_make { all_entries.extend(next_entries_mut(&mut last_hash, &mut num_hashes, vec![])); } - let mut new_blobs = all_entries.to_blobs_with_id(me_id, 0, addr); + let mut new_blobs = all_entries.to_blobs_with_id(me_id, start_height, addr); new_blobs.truncate(num_blobs_to_make as usize); new_blobs } diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 0b0dc7efe2..39f70b23bb 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -4,11 +4,12 @@ use bank::Bank; use cluster_info::ClusterInfo; use counter::Counter; use entry::EntryReceiver; +use leader_scheduler::LeaderScheduler; use ledger::{Block, LedgerWriter}; use log::Level; use result::{Error, Result}; use service::Service; -use signature::Keypair; +use signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; @@ -20,6 +21,11 @@ use std::time::Instant; use streamer::{responder, BlobSender}; use vote_stage::send_validator_vote; +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum ReplicateStageReturnType { + LeaderRotation(u64), +} + // Implement a destructor for the ReplicateStage thread to signal it exited // even on panics struct Finalizer { @@ -39,7 +45,8 @@ impl Drop for Finalizer { } pub struct ReplicateStage { - thread_hdls: Vec>, + t_responder: JoinHandle<()>, + t_replicate: JoinHandle>, } impl ReplicateStage { @@ -51,6 +58,8 @@ impl ReplicateStage { ledger_writer: Option<&mut LedgerWriter>, keypair: &Arc, vote_blob_sender: Option<&BlobSender>, + entry_height: &mut u64, + leader_scheduler: &Arc>, ) -> Result<()> { let timer = Duration::new(1, 0); //coalesce all the available entries into a single vote @@ -59,27 +68,72 @@ impl ReplicateStage { entries.append(&mut more); } - let res = bank.process_entries(&entries); + let mut res = Ok(()); + { + let mut num_entries_to_write = entries.len(); + for (i, entry) in entries.iter().enumerate() { + res = bank.process_entry(&entry); + Bank::process_entry_votes( + &bank, + &entry, + *entry_height + i as u64 + 1, + &mut *leader_scheduler.write().unwrap(), + ); + + { + let ls_lock = leader_scheduler.read().unwrap(); + if ls_lock.is_leader_rotation_height( + // i is zero indexed, so current entry height for this entry is actually the + // old entry height + i + 1 + *entry_height + i as u64 + 1, + ) { + let my_id = keypair.pubkey(); + let scheduled_leader = + ls_lock.get_scheduled_leader(*entry_height + i as u64 + 1).expect("Scheduled leader id should never be unknown while processing entries"); + cluster_info.write().unwrap().set_leader(scheduled_leader); + if my_id == scheduled_leader { + num_entries_to_write = i + 1; + break; + } + } + } + + if res.is_err() { + // TODO: This will return early from the first entry that has an erroneous + // transaction, instad of processing the rest of the entries in the vector + // of received entries. 
This is in line with previous behavior when + // bank.process_entries() was used to process the entries, but doesn't solve the + // issue that the bank state was still changed, leading to inconsistencies with the + // leader as the leader currently should not be publishing erroneous transactions + break; + } + } + + // If leader rotation happened, only write the entries up to leader rotation. + entries.truncate(num_entries_to_write); + } if let Some(sender) = vote_blob_sender { send_validator_vote(bank, keypair, cluster_info, sender)?; } - { - let mut wcluster_info = cluster_info.write().unwrap(); - wcluster_info.insert_votes(&entries.votes()); - } + cluster_info.write().unwrap().insert_votes(&entries.votes()); inc_new_counter_info!( "replicate-transactions", entries.iter().map(|x| x.transactions.len()).sum() ); + let entries_len = entries.len() as u64; // TODO: move this to another stage? + // TODO: In line with previous behavior, this will write all the entries even if + // an error occurred processing one of the entries (causing the rest of the entries to + // not be processed). if let Some(ledger_writer) = ledger_writer { ledger_writer.write_entries(entries)?; } + *entry_height += entries_len; res?; Ok(()) } @@ -91,6 +145,8 @@ impl ReplicateStage { window_receiver: EntryReceiver, ledger_path: Option<&str>, exit: Arc, + entry_height: u64, + leader_scheduler: Arc>, ) -> Self { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); @@ -105,7 +161,17 @@ impl ReplicateStage { let _exit = Finalizer::new(exit); let now = Instant::now(); let mut next_vote_secs = 1; + let mut entry_height_ = entry_height; loop { + let leader_id = leader_scheduler + .read() + .unwrap() + .get_scheduled_leader(entry_height_) + .expect("Scheduled leader id should never be unknown at this point"); + if leader_id == keypair.pubkey() { + return Some(ReplicateStageReturnType::LeaderRotation(entry_height_)); + } + // Only vote once a second. 
let vote_sender = if now.elapsed().as_secs() > next_vote_secs { next_vote_secs += 1; @@ -121,6 +187,8 @@ impl ReplicateStage { ledger_writer.as_mut(), &keypair, vote_sender, + &mut entry_height_, + &leader_scheduler, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break, @@ -129,21 +197,134 @@ impl ReplicateStage { } } } + + None }).unwrap(); - let thread_hdls = vec![t_responder, t_replicate]; - - ReplicateStage { thread_hdls } + ReplicateStage { + t_responder, + t_replicate, + } } } impl Service for ReplicateStage { - type JoinReturnType = (); + type JoinReturnType = Option; - fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - Ok(()) + fn join(self) -> thread::Result> { + self.t_responder.join()?; + self.t_replicate.join() + } +} + +#[cfg(test)] +mod test { + use cluster_info::{ClusterInfo, Node}; + use fullnode::Fullnode; + use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; + use ledger::{genesis, next_entries_mut, LedgerWriter}; + use logger; + use replicate_stage::{ReplicateStage, ReplicateStageReturnType}; + use service::Service; + use signature::{Keypair, KeypairUtil}; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::mpsc::channel; + use std::sync::{Arc, RwLock}; + + #[test] + pub fn test_replicate_stage_leader_rotation_exit() { + logger::setup(); + + // Set up dummy node to host a ReplicateStage + let my_keypair = Keypair::new(); + let my_id = my_keypair.pubkey(); + let my_node = Node::new_localhost_with_pubkey(my_id); + let cluster_info_me = ClusterInfo::new(my_node.info.clone()).expect("ClusterInfo::new"); + + // Create a ledger + let (mint, my_ledger_path) = genesis("test_replicate_stage_leader_rotation_exit", 10_000); + let genesis_entries = mint.create_entries(); + let mut last_id = genesis_entries + .last() + .expect("expected at least one genesis entry") + .id; + + // Write two entries to the ledger so that the validator is in the active set: + // 1) Give the validator a nonzero number of tokens 2) A vote from the validator . 
+ // This will cause leader rotation after the bootstrap height + let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap(); + let bootstrap_entries = make_active_set_entries(&my_keypair, &mint.keypair(), &last_id); + last_id = bootstrap_entries.last().unwrap().id; + let ledger_initial_len = (genesis_entries.len() + bootstrap_entries.len()) as u64; + ledger_writer.write_entries(bootstrap_entries).unwrap(); + + // Set up the LeaderScheduler so that this node becomes the leader at + // bootstrap_height = num_bootstrap_slots * leader_rotation_interval + let old_leader_id = Keypair::new().pubkey(); + let leader_rotation_interval = 10; + let num_bootstrap_slots = 2; + let bootstrap_height = num_bootstrap_slots * leader_rotation_interval; + let leader_scheduler_config = LeaderSchedulerConfig::new( + old_leader_id, + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(leader_rotation_interval * 2), + Some(bootstrap_height), + ); + + let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); + + // Set up the bank + let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler); + + let leader_scheduler = Arc::new(RwLock::new(leader_scheduler)); + + // Set up the replicate stage + let (entry_sender, entry_receiver) = channel(); + let exit = Arc::new(AtomicBool::new(false)); + let replicate_stage = ReplicateStage::new( + Arc::new(my_keypair), + Arc::new(bank), + Arc::new(RwLock::new(cluster_info_me)), + entry_receiver, + Some(&my_ledger_path), + exit.clone(), + ledger_initial_len, + leader_scheduler.clone(), + ); + + // Send enough entries to trigger leader rotation + let extra_entries = leader_rotation_interval; + let total_entries_to_send = (bootstrap_height + extra_entries) as usize; + let mut num_hashes = 0; + let mut entries_to_send = vec![]; + + while entries_to_send.len() < total_entries_to_send { + let entries = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); + last_id = entries.last().expect("expected at least one entry").id; + entries_to_send.extend(entries); + } + + entries_to_send.truncate(total_entries_to_send); + entry_sender.send(entries_to_send).unwrap(); + + // Wait for replicate_stage to exit and check return value is correct + assert_eq!( + Some(ReplicateStageReturnType::LeaderRotation(bootstrap_height)), + replicate_stage.join().expect("replicate stage join") + ); + + assert_eq!(exit.load(Ordering::Relaxed), true); + + // Check ledger height is correct + let mut leader_scheduler = Arc::try_unwrap(leader_scheduler) + .expect("Multiple references to this RwLock still exist") + .into_inner() + .expect("RwLock for LeaderScheduler is still locked"); + + let (_, entry_height, _) = + Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler); + + assert_eq!(entry_height, bootstrap_height); } } diff --git a/src/replicator.rs b/src/replicator.rs index 2a8f629388..46b41e3d00 100644 --- a/src/replicator.rs +++ b/src/replicator.rs @@ -1,6 +1,7 @@ use blob_fetch_stage::BlobFetchStage; use cluster_info::{ClusterInfo, Node, NodeInfo}; use hash::{Hash, Hasher}; +use leader_scheduler::LeaderScheduler; use ncp::Ncp; use service::Service; use std::fs::File; @@ -22,13 +23,13 @@ use std::time::Duration; use store_ledger_stage::StoreLedgerStage; use streamer::BlobReceiver; use window; -use window_service::{window_service, WindowServiceReturnType}; +use window_service::window_service; pub struct Replicator { ncp: Ncp, fetch_stage: BlobFetchStage, store_ledger_stage: StoreLedgerStage, - t_window:
JoinHandle>, + t_window: JoinHandle<()>, pub retransmit_receiver: BlobReceiver, } @@ -82,8 +83,9 @@ impl Replicator { )); let leader_info = network_addr.map(|i| NodeInfo::new_entry_point(&i)); - + let leader_pubkey; if let Some(leader_info) = leader_info.as_ref() { + leader_pubkey = leader_info.id; cluster_info.write().unwrap().insert(leader_info); } else { panic!("No leader info!"); @@ -108,6 +110,9 @@ impl Replicator { entry_window_sender, retransmit_sender, repair_socket, + Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_pubkey, + ))), done, ); @@ -152,6 +157,7 @@ mod tests { use cluster_info::Node; use fullnode::Fullnode; use hash::Hash; + use leader_scheduler::LeaderScheduler; use ledger::{genesis, read_ledger, tmp_ledger_path}; use logger; use replicator::sample_file; @@ -185,14 +191,13 @@ mod tests { let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let network_addr = leader_node.sockets.gossip.local_addr().unwrap(); let leader_info = leader_node.info.clone(); - let leader_rotation_interval = 20; let leader = Fullnode::new( leader_node, &leader_ledger_path, leader_keypair, None, false, - Some(leader_rotation_interval), + LeaderScheduler::from_bootstrap_leader(leader_info.id), ); let mut leader_client = mk_client(&leader_info); diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index 5cad16e785..ec3dca1530 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -3,6 +3,7 @@ use cluster_info::ClusterInfo; use counter::Counter; use entry::Entry; +use leader_scheduler::LeaderScheduler; use log::Level; use result::{Error, Result}; use service::Service; @@ -15,12 +16,7 @@ use std::thread::{self, Builder, JoinHandle}; use std::time::Duration; use streamer::BlobReceiver; use window::SharedWindow; -use window_service::{window_service, WindowServiceReturnType}; - -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum RetransmitStageReturnType { - LeaderRotation(u64), -} +use window_service::window_service; fn retransmit( cluster_info: &Arc>, @@ -71,8 +67,7 @@ fn retransmitter( } pub struct RetransmitStage { - t_retransmit: JoinHandle<()>, - t_window: JoinHandle>, + thread_hdls: Vec>, } impl RetransmitStage { @@ -83,6 +78,7 @@ impl RetransmitStage { retransmit_socket: Arc, repair_socket: Arc, fetch_stage_receiver: BlobReceiver, + leader_scheduler: Arc>, ) -> (Self, Receiver>) { let (retransmit_sender, retransmit_receiver) = channel(); @@ -99,29 +95,22 @@ impl RetransmitStage { entry_sender, retransmit_sender, repair_socket, + leader_scheduler, done, ); - ( - RetransmitStage { - t_window, - t_retransmit, - }, - entry_receiver, - ) + let thread_hdls = vec![t_retransmit, t_window]; + (RetransmitStage { thread_hdls }, entry_receiver) } } impl Service for RetransmitStage { - type JoinReturnType = Option; + type JoinReturnType = (); - fn join(self) -> thread::Result> { - self.t_retransmit.join()?; - match self.t_window.join()? 
{ - Some(WindowServiceReturnType::LeaderRotation(entry_height)) => Ok(Some( - RetransmitStageReturnType::LeaderRotation(entry_height), - )), - _ => Ok(None), + fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls { + thread_hdl.join()?; } + Ok(()) } } diff --git a/src/thin_client.rs b/src/thin_client.rs index a46a0eab28..e41c543eab 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -403,7 +403,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R loop { trace!("polling {:?} for leader from {:?}", leader_ncp, my_addr); - if let Some(l) = cluster_info.read().unwrap().leader_data() { + if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() { leader = Some(l.clone()); break; } @@ -434,6 +434,7 @@ mod tests { use bank::Bank; use cluster_info::Node; use fullnode::Fullnode; + use leader_scheduler::LeaderScheduler; use ledger::LedgerWriter; use logger; use mint::Mint; @@ -476,7 +477,7 @@ mod tests { None, &ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); sleep(Duration::from_millis(900)); @@ -523,7 +524,7 @@ mod tests { None, &ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); //TODO: remove this sleep, or add a retry so CI is stable @@ -583,7 +584,7 @@ mod tests { None, &ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); sleep(Duration::from_millis(300)); @@ -644,7 +645,7 @@ mod tests { None, &ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(0), ); sleep(Duration::from_millis(900)); diff --git a/src/tpu.rs b/src/tpu.rs index 1ce81b35f5..f4ca8fb072 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -30,6 +30,7 @@ use banking_stage::{BankingStage, Config}; use cluster_info::ClusterInfo; use entry::Entry; use fetch_stage::FetchStage; +use leader_scheduler::LeaderScheduler; use service::Service; use signature::Keypair; use sigverify_stage::SigVerifyStage; @@ -62,6 +63,7 @@ impl Tpu { ledger_path: &str, sigverify_disabled: bool, entry_height: u64, + leader_scheduler: Arc>, ) -> (Self, Receiver>, Arc) { let exit = Arc::new(AtomicBool::new(false)); @@ -80,6 +82,7 @@ impl Tpu { ledger_path, entry_receiver, entry_height, + leader_scheduler, ); let tpu = Tpu { @@ -96,6 +99,10 @@ impl Tpu { self.exit.store(true, Ordering::Relaxed); } + pub fn is_exited(&self) -> bool { + self.exit.load(Ordering::Relaxed) + } + pub fn close(self) -> thread::Result> { self.fetch_stage.close(); self.join() diff --git a/src/transaction.rs b/src/transaction.rs index 08a45af8ca..9e31a5a9c2 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -62,9 +62,7 @@ impl Transaction { let instructions = vec![Instruction { program_ids_index: 0, userdata, - accounts: (0..(transaction_keys.len() as u8 + 1)) - .into_iter() - .collect(), + accounts: (0..=transaction_keys.len() as u8).collect(), }]; Self::new_with_instructions( from_keypair, @@ -164,7 +162,7 @@ impl Transaction { true } - fn from(&self) -> &Pubkey { + pub fn from(&self) -> &Pubkey { &self.account_keys[0] } diff --git a/src/tvu.rs b/src/tvu.rs index 522e7f50fd..f9459d601f 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -39,8 +39,9 @@ use bank::Bank; use blob_fetch_stage::BlobFetchStage; use cluster_info::ClusterInfo; -use replicate_stage::ReplicateStage; -use retransmit_stage::{RetransmitStage, RetransmitStageReturnType}; +use leader_scheduler::LeaderScheduler; +use replicate_stage::{ReplicateStage, ReplicateStageReturnType}; +use 
retransmit_stage::RetransmitStage; use service::Service; use signature::Keypair; use std::net::UdpSocket; @@ -84,6 +85,7 @@ impl Tvu { repair_socket: UdpSocket, retransmit_socket: UdpSocket, ledger_path: Option<&str>, + leader_scheduler: Arc>, ) -> Self { let exit = Arc::new(AtomicBool::new(false)); @@ -103,6 +105,7 @@ impl Tvu { Arc::new(retransmit_socket), repair_socket, blob_fetch_receiver, + leader_scheduler.clone(), ); let replicate_stage = ReplicateStage::new( @@ -112,6 +115,8 @@ impl Tvu { blob_window_receiver, ledger_path, exit.clone(), + entry_height, + leader_scheduler, ); Tvu { @@ -122,6 +127,10 @@ impl Tvu { } } + pub fn is_exited(&self) -> bool { + self.exit.load(Ordering::Relaxed) + } + pub fn exit(&self) -> () { self.exit.store(true, Ordering::Relaxed); } @@ -136,10 +145,10 @@ impl Service for Tvu { type JoinReturnType = Option; fn join(self) -> thread::Result> { - self.replicate_stage.join()?; + self.retransmit_stage.join()?; self.fetch_stage.join()?; - match self.retransmit_stage.join()? { - Some(RetransmitStageReturnType::LeaderRotation(entry_height)) => { + match self.replicate_stage.join()? { + Some(ReplicateStageReturnType::LeaderRotation(entry_height)) => { Ok(Some(TvuReturnType::LeaderRotation(entry_height))) } _ => Ok(None), @@ -154,6 +163,7 @@ pub mod tests { use cluster_info::{ClusterInfo, Node}; use entry::Entry; use hash::{hash, Hash}; + use leader_scheduler::LeaderScheduler; use logger; use mint::Mint; use ncp::Ncp; @@ -249,6 +259,9 @@ pub mod tests { target1.sockets.repair, target1.sockets.retransmit, None, + Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader( + leader_id, + ))), ); let mut alice_ref_balance = starting_balance; diff --git a/src/wallet.rs b/src/wallet.rs index 8f5ec07d4b..824f04d095 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -616,6 +616,7 @@ mod tests { use cluster_info::Node; use drone::run_local_drone; use fullnode::Fullnode; + use leader_scheduler::LeaderScheduler; use ledger::LedgerWriter; use mint::Mint; use serde_json::Value; @@ -950,7 +951,7 @@ mod tests { None, &ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1025,7 +1026,7 @@ mod tests { None, &ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1101,7 +1102,7 @@ mod tests { None, &ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1222,7 +1223,7 @@ mod tests { None, &ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); @@ -1341,7 +1342,7 @@ mod tests { None, &ledger_path, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), Some(rpc_port), ); sleep(Duration::from_millis(900)); diff --git a/src/window.rs b/src/window.rs index cceed96c96..8c9648f750 100644 --- a/src/window.rs +++ b/src/window.rs @@ -5,6 +5,7 @@ use counter::Counter; use entry::Entry; #[cfg(feature = "erasure")] use erasure; +use leader_scheduler::LeaderScheduler; use ledger::{reconstruct_entries_from_blobs, Block}; use log::Level; use packet::SharedBlob; @@ -51,6 +52,7 @@ pub trait WindowUtil { /// Finds available slots, clears them, and returns their indices. 
fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec; + #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] fn repair( &mut self, cluster_info: &Arc>, @@ -59,6 +61,7 @@ pub trait WindowUtil { consumed: u64, received: u64, max_entry_height: u64, + leader_scheduler_option: &Arc>, ) -> Vec<(SocketAddr, Vec)>; fn print(&self, id: &Pubkey, consumed: u64) -> String; @@ -67,14 +70,12 @@ pub trait WindowUtil { fn process_blob( &mut self, id: &Pubkey, - cluster_info: &Arc>, blob: SharedBlob, pix: u64, consume_queue: &mut Vec, consumed: &mut u64, leader_unknown: bool, pending_retransmits: &mut bool, - leader_rotation_interval: u64, ); } @@ -101,13 +102,40 @@ impl WindowUtil for Window { consumed: u64, received: u64, max_entry_height: u64, + leader_scheduler_option: &Arc>, ) -> Vec<(SocketAddr, Vec)> { let rcluster_info = cluster_info.read().unwrap(); - let leader_rotation_interval = rcluster_info.get_leader_rotation_interval(); - // Calculate the next leader rotation height and check if we are the leader - let next_leader_rotation = - consumed + leader_rotation_interval - (consumed % leader_rotation_interval); - let is_next_leader = rcluster_info.get_scheduled_leader(next_leader_rotation) == Some(*id); + let mut is_next_leader = false; + { + let ls_lock = leader_scheduler_option.read().unwrap(); + if !ls_lock.use_only_bootstrap_leader { + // Calculate the next leader rotation height and check if we are the leader + let next_leader_rotation_height = consumed + ls_lock.entries_until_next_leader_rotation(consumed).expect("Leader rotation should exist when not using default implementation of LeaderScheduler"); + + match ls_lock.get_scheduled_leader(next_leader_rotation_height) { + Some(leader_id) if leader_id == *id => is_next_leader = true, + // In the case that we are not in the current scope of the leader schedule + // window then either: + // + // 1) The replicate stage hasn't caught up to the "consumed" entries we sent, + // in which case it will eventually catch up + // + // 2) We are on the border between seed_rotation_intervals, so the + // schedule won't be known until the entry on that cusp is received + // by the replicate stage (which comes after this stage). Hence, the next + // leader at the beginning of that next epoch will not know he is the + // leader until he receives that last "cusp" entry. He also won't ask for repairs + // for that entry because "is_next_leader" won't be set here. In this case, + // everybody will be blocking waiting for that "cusp" entry instead of repairing, + // until the leader hits "times" >= the max times in calculate_max_repair(). 
+ // The impact of this, along with the similar problem from broadcast for the transitioning + // leader, can be observed in the multinode test, test_full_leader_validator_network(), + None => (), + _ => (), + } + } + } + let num_peers = rcluster_info.table.len() as u64; let max_repair = if max_entry_height == 0 { @@ -196,14 +224,12 @@ impl WindowUtil for Window { fn process_blob( &mut self, id: &Pubkey, - cluster_info: &Arc>, blob: SharedBlob, pix: u64, consume_queue: &mut Vec, consumed: &mut u64, leader_unknown: bool, pending_retransmits: &mut bool, - leader_rotation_interval: u64, ) { let w = (pix % WINDOW_SIZE) as usize; @@ -258,18 +284,6 @@ impl WindowUtil for Window { // push all contiguous blobs into consumed queue, increment consumed loop { - if *consumed != 0 && *consumed % (leader_rotation_interval as u64) == 0 { - let rcluster_info = cluster_info.read().unwrap(); - let my_id = rcluster_info.my_data().id; - match rcluster_info.get_scheduled_leader(*consumed) { - // If we are the next leader, exit - Some(id) if id == my_id => { - break; - } - _ => (), - } - } - let k = (*consumed % WINDOW_SIZE) as usize; trace!("{}: k: {} consumed: {}", id, k, *consumed,); diff --git a/src/window_service.rs b/src/window_service.rs index 83afac0c48..3796a032dd 100644 --- a/src/window_service.rs +++ b/src/window_service.rs @@ -3,6 +3,7 @@ use cluster_info::{ClusterInfo, NodeInfo}; use counter::Counter; use entry::EntrySender; +use leader_scheduler::LeaderScheduler; use log::Level; use packet::SharedBlob; use rand::{thread_rng, Rng}; @@ -144,7 +145,6 @@ fn recv_window( s: &EntrySender, retransmit: &BlobSender, pending_retransmits: &mut bool, - leader_rotation_interval: u64, done: &Arc, ) -> Result<()> { let timer = Duration::from_millis(200); @@ -204,14 +204,12 @@ fn recv_window( window.write().unwrap().process_blob( id, - cluster_info, b, pix, &mut consume_queue, consumed, leader_unknown, pending_retransmits, - leader_rotation_interval, ); // Send a signal when we hit the max entry_height @@ -238,6 +236,7 @@ fn recv_window( Ok(()) } +#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn window_service( cluster_info: Arc>, window: SharedWindow, @@ -247,8 +246,9 @@ pub fn window_service( s: EntrySender, retransmit: BlobSender, repair_socket: Arc, + leader_scheduler: Arc>, done: Arc, -) -> JoinHandle> { +) -> JoinHandle<()> { Builder::new() .name("solana-window".to_string()) .spawn(move || { @@ -256,30 +256,11 @@ pub fn window_service( let mut received = entry_height; let mut last = entry_height; let mut times = 0; - let id; - let leader_rotation_interval; - { - let rcluster_info = cluster_info.read().unwrap(); - id = rcluster_info.id; - leader_rotation_interval = rcluster_info.get_leader_rotation_interval(); - } + let id = cluster_info.read().unwrap().id; let mut pending_retransmits = false; trace!("{}: RECV_WINDOW started", id); loop { - if consumed != 0 && consumed % (leader_rotation_interval as u64) == 0 { - match cluster_info.read().unwrap().get_scheduled_leader(consumed) { - // If we are the next leader, exit - Some(next_leader_id) if id == next_leader_id => { - return Some(WindowServiceReturnType::LeaderRotation(consumed)); - } - // TODO: Figure out where to set the new leader in the cluster_info for - // validator -> validator transition (once we have real leader scheduling, - // this decision will be clearer). 
Also make sure new blobs to window actually - // originate from new leader - _ => (), - } - } - + // Check if leader rotation was configured if let Err(e) = recv_window( &window, &id, @@ -291,7 +272,6 @@ pub fn window_service( &s, &retransmit, &mut pending_retransmits, - leader_rotation_interval, &done, ) { match e { @@ -322,7 +302,15 @@ pub fn window_service( trace!("{} let's repair! times = {}", id, times); let mut window = window.write().unwrap(); - let reqs = window.repair(&cluster_info, &id, times, consumed, received, max_entry_height); + let reqs = window.repair( + &cluster_info, + &id, + times, + consumed, + received, + max_entry_height, + &leader_scheduler, + ); for (to, req) in reqs { repair_socket.send_to(&req, to).unwrap_or_else(|e| { info!("{} repair req send_to({}) error {:?}", id, to, e); @@ -330,7 +318,6 @@ pub fn window_service( }); } } - None }).unwrap() } @@ -339,9 +326,9 @@ mod test { use cluster_info::{ClusterInfo, Node}; use entry::Entry; use hash::Hash; + use leader_scheduler::LeaderScheduler; use logger; use packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE}; - use signature::{Keypair, KeypairUtil}; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver}; @@ -349,7 +336,7 @@ mod test { use std::time::Duration; use streamer::{blob_receiver, responder}; use window::default_window; - use window_service::{repair_backoff, window_service, WindowServiceReturnType}; + use window_service::{repair_backoff, window_service}; fn get_entries(r: Receiver>, num: &mut usize) { for _t in 0..5 { @@ -391,6 +378,7 @@ mod test { s_window, s_retransmit, Arc::new(tn.sockets.repair), + Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))), done, ); let t_responder = { @@ -401,11 +389,15 @@ mod test { let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder); let mut num_blobs_to_make = 10; let gossip_address = &tn.info.contact_info.ncp; - let msgs = - make_consecutive_blobs(me_id, num_blobs_to_make, Hash::default(), &gossip_address) - .into_iter() - .rev() - .collect();; + let msgs = make_consecutive_blobs( + me_id, + num_blobs_to_make, + 0, + Hash::default(), + &gossip_address, + ).into_iter() + .rev() + .collect();; s_responder.send(msgs).expect("send"); t_responder }; @@ -448,6 +440,13 @@ mod test { s_window, s_retransmit, Arc::new(tn.sockets.repair), + // TODO: For now, the window still checks the ClusterInfo for the current leader + // to determine whether to retransmit a block. In the future when we rely on + // the LeaderScheduler for retransmits, this test will need to be rewritten + // because a leader should only be unknown in the window when the write stage + // hasn't yet calculated the leaders for slots in the next epoch (on entries + // at heights that are multiples of seed_rotation_interval in LeaderScheduler) + Arc::new(RwLock::new(LeaderScheduler::default())), done, ); let t_responder = { @@ -504,6 +503,13 @@ mod test { s_window, s_retransmit, Arc::new(tn.sockets.repair), + // TODO: For now, the window still checks the ClusterInfo for the current leader + // to determine whether to retransmit a block. 
In the future when we rely on + // the LeaderScheduler for retransmits, this test will need to be rewritten + // becasue a leader should only be unknown in the window when the write stage + // hasn't yet calculated the leaders for slots in the next epoch (on entries + // at heights that are multiples of seed_rotation_interval in LeaderScheduler) + Arc::new(RwLock::new(LeaderScheduler::default())), done, ); let t_responder = { @@ -585,85 +591,4 @@ mod test { assert!(avg >= 3); assert!(avg <= 5); } - - #[test] - pub fn test_window_leader_rotation_exit() { - logger::setup(); - let leader_rotation_interval = 10; - // Height at which this node becomes the leader = - // my_leader_begin_epoch * leader_rotation_interval - let my_leader_begin_epoch = 2; - let tn = Node::new_localhost(); - let exit = Arc::new(AtomicBool::new(false)); - let mut cluster_info_me = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new"); - let me_id = cluster_info_me.my_data().id; - - // Set myself in an upcoming epoch, but set the old_leader_id as the - // leader for all epochs before that - let old_leader_id = Keypair::new().pubkey(); - cluster_info_me.set_leader(me_id); - cluster_info_me.set_leader_rotation_interval(leader_rotation_interval); - for i in 0..my_leader_begin_epoch { - cluster_info_me.set_scheduled_leader(leader_rotation_interval * i, old_leader_id); - } - cluster_info_me - .set_scheduled_leader(my_leader_begin_epoch * leader_rotation_interval, me_id); - - let subs = Arc::new(RwLock::new(cluster_info_me)); - - let (s_reader, r_reader) = channel(); - let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader); - let (s_window, _r_window) = channel(); - let (s_retransmit, _r_retransmit) = channel(); - let win = Arc::new(RwLock::new(default_window())); - let done = Arc::new(AtomicBool::new(false)); - let t_window = window_service( - subs, - win, - 0, - 0, - r_reader, - s_window, - s_retransmit, - Arc::new(tn.sockets.repair), - done, - ); - - let t_responder = { - let (s_responder, r_responder) = channel(); - let blob_sockets: Vec> = - tn.sockets.replicate.into_iter().map(Arc::new).collect(); - - let t_responder = responder( - "test_window_leader_rotation_exit", - blob_sockets[0].clone(), - r_responder, - ); - - let ncp_address = &tn.info.contact_info.ncp; - // Send the blobs out of order, in reverse. Also send an extra leader_rotation_interval - // number of blobs to make sure the window stops in the right place. - let extra_blobs = leader_rotation_interval; - let total_blobs_to_send = - my_leader_begin_epoch * leader_rotation_interval + extra_blobs; - let msgs = - make_consecutive_blobs(me_id, total_blobs_to_send, Hash::default(), &ncp_address) - .into_iter() - .rev() - .collect();; - s_responder.send(msgs).expect("send"); - t_responder - }; - - assert_eq!( - Some(WindowServiceReturnType::LeaderRotation( - my_leader_begin_epoch * leader_rotation_interval - )), - t_window.join().expect("window service join") - ); - - t_responder.join().expect("responder thread join"); - exit.store(true, Ordering::Relaxed); - t_receiver.join().expect("receiver thread join"); - } } diff --git a/src/write_stage.rs b/src/write_stage.rs index 4e03c29af7..417b759fe6 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -3,14 +3,17 @@ //! stdout, and then sends the Entry to its output channel. 
use bank::Bank; +use budget_transaction::BudgetTransaction; use cluster_info::ClusterInfo; use counter::Counter; use entry::Entry; +use leader_scheduler::LeaderScheduler; use ledger::{Block, LedgerWriter}; use log::Level; use result::{Error, Result}; use service::Service; use signature::Keypair; +use solana_program_interface::pubkey::Pubkey; use std::cmp; use std::net::UdpSocket; use std::sync::atomic::AtomicUsize; @@ -38,11 +41,19 @@ impl WriteStage { // fit before we hit the entry height for leader rotation. Also return a boolean // reflecting whether we actually hit an entry height for leader rotation. fn find_leader_rotation_index( - cluster_info: &Arc>, - leader_rotation_interval: u64, + bank: &Arc, + my_id: &Pubkey, + leader_scheduler: &mut LeaderScheduler, entry_height: u64, mut new_entries: Vec, ) -> (Vec, bool) { + // In the case we're using the default implementation of the leader scheduler, + // there won't ever be leader rotations at particular entry heights, so just + // return the entire input vector of entries + if leader_scheduler.use_only_bootstrap_leader { + return (new_entries, false); + } + let new_entries_length = new_entries.len(); // i is the number of entries to take @@ -50,14 +61,11 @@ impl WriteStage { let mut is_leader_rotation = false; loop { - if (entry_height + i as u64) % leader_rotation_interval == 0 { - let rcluster_info = cluster_info.read().unwrap(); - let my_id = rcluster_info.my_data().id; - let next_leader = rcluster_info.get_scheduled_leader(entry_height + i as u64); - if next_leader != Some(my_id) { - is_leader_rotation = true; - break; - } + let next_leader = leader_scheduler.get_scheduled_leader(entry_height + i as u64); + + if next_leader != Some(*my_id) { + is_leader_rotation = true; + break; } if i == new_entries_length { @@ -66,16 +74,41 @@ impl WriteStage { // Find out how many more entries we can squeeze in until the next leader // rotation - let entries_until_leader_rotation = - leader_rotation_interval - (entry_height % leader_rotation_interval); + let entries_until_leader_rotation = leader_scheduler.entries_until_next_leader_rotation( + entry_height + (i as u64) + ).expect("Leader rotation should exist when not using default implementation of LeaderScheduler"); // Check the next leader rotation height entries in new_entries, or // if the new_entries doesnt have that many entries remaining, // just check the rest of the new_entries_vector - i += cmp::min( + let step = cmp::min( entries_until_leader_rotation as usize, new_entries_length - i, ); + + // 1) Since "i" is the current number/count of items from the new_entries vector we + // have already checked, then "i" is also the INDEX into the new_entries vector of the + // next unprocessed entry. Hence this loop checks all entries between indexes: + // [entry_height + i, entry_height + i + step - 1], which is equivalent to the + // entry heights: [entry_height + i + 1, entry_height + i + step] + for (entry, new_entries_index) in new_entries[i..(i + step)].iter().zip(i..(i + step)) { + let votes = entry + .transactions + .iter() + .filter_map(BudgetTransaction::vote); + for (voter_id, _, _) in votes { + leader_scheduler + .push_vote(voter_id, entry_height + new_entries_index as u64 + 1); + } + // TODO: There's an issue here that the bank state may have updated + // while this entry was in flight from the BankingStage, which could cause + // the leader schedule, which is based on stake (tied to the bank account balances) + // right now, to be inconsistent with the rest of the network.
Fix once + // we can pin PoH height to bank state + leader_scheduler.update_height(entry_height + new_entries_index as u64 + 1, bank); + } + + i += step } new_entries.truncate(i as usize); @@ -86,12 +119,13 @@ impl WriteStage { /// Process any Entry items that have been published by the RecordStage. /// continuosly send entries out pub fn write_and_send_entries( + bank: &Arc, cluster_info: &Arc>, ledger_writer: &mut LedgerWriter, entry_sender: &Sender>, entry_receiver: &Receiver>, entry_height: &mut u64, - leader_rotation_interval: u64, + leader_scheduler: &Arc>, ) -> Result<()> { let mut ventries = Vec::new(); let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; @@ -103,8 +137,9 @@ impl WriteStage { // Find out how many more entries we can squeeze in until the next leader // rotation let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index( - cluster_info, - leader_rotation_interval, + bank, + &cluster_info.read().unwrap().my_data().id, + &mut *leader_scheduler.write().unwrap(), *entry_height + num_new_entries as u64, received_entries, ); @@ -182,6 +217,7 @@ impl WriteStage { ledger_path: &str, entry_receiver: Receiver>, entry_height: u64, + leader_scheduler: Arc>, ) -> (Self, Receiver>) { let (vote_blob_sender, vote_blob_receiver) = channel(); let send = UdpSocket::bind("0.0.0.0:0").expect("bind"); @@ -198,44 +234,39 @@ impl WriteStage { .spawn(move || { let mut last_vote = 0; let mut last_valid_validator_timestamp = 0; - let id; - let leader_rotation_interval; - { - let rcluster_info = cluster_info.read().unwrap(); - id = rcluster_info.id; - leader_rotation_interval = rcluster_info.get_leader_rotation_interval(); - } - let mut entry_height = entry_height; + let id = cluster_info.read().unwrap().id; + let mut entry_height_ = entry_height; loop { // Note that entry height is not zero indexed, it starts at 1, so the // old leader is in power up to and including entry height // n * leader_rotation_interval for some "n". Once we've forwarded // that last block, check for the next scheduled leader. - if entry_height % (leader_rotation_interval as u64) == 0 { - let rcluster_info = cluster_info.read().unwrap(); - let my_id = rcluster_info.my_data().id; - let scheduled_leader = rcluster_info.get_scheduled_leader(entry_height); - drop(rcluster_info); - match scheduled_leader { - Some(id) if id == my_id => (), - // If the leader stays in power for the next - // round as well, then we don't exit. Otherwise, exit. - _ => { - // When the broadcast stage has received the last blob, it - // will signal to close the fetch stage, which will in turn - // close down this write stage - return WriteStageReturnType::LeaderRotation; - } + match leader_scheduler + .read() + .unwrap() + .get_scheduled_leader(entry_height_) + { + Some(leader_id) if leader_id == id => (), + None => panic!( + "Scheduled leader id should never be unknown while processing entries" + ), + _ => { + // If the leader is no longer in power, exit. 
+ // When the broadcast stage has received the last blob, it + // will signal to close the fetch stage, which will in turn + // close down this write stage + return WriteStageReturnType::LeaderRotation; } } if let Err(e) = Self::write_and_send_entries( + &bank, &cluster_info, &mut ledger_writer, &entry_sender, &entry_receiver, - &mut entry_height, - leader_rotation_interval, + &mut entry_height_, + &leader_scheduler, ) { match e { Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => { @@ -295,9 +326,11 @@ mod tests { use cluster_info::{ClusterInfo, Node}; use entry::Entry; use hash::Hash; + use leader_scheduler::{set_new_leader, LeaderScheduler, LeaderSchedulerConfig}; use ledger::{genesis, next_entries_mut, read_ledger}; use service::Service; use signature::{Keypair, KeypairUtil}; + use solana_program_interface::account::Account; use solana_program_interface::pubkey::Pubkey; use std::fs::remove_dir_all; use std::sync::mpsc::{channel, Receiver, Sender}; @@ -305,14 +338,13 @@ mod tests { use write_stage::{WriteStage, WriteStageReturnType}; struct DummyWriteStage { - my_id: Pubkey, write_stage: WriteStage, entry_sender: Sender>, _write_stage_entry_receiver: Receiver>, - cluster_info: Arc>, bank: Arc, leader_ledger_path: String, ledger_tail: Vec, + leader_scheduler: Arc>, } fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec) { @@ -322,29 +354,34 @@ mod tests { .map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err))); info!("processing ledger..."); - bank.process_ledger(entries).expect("process_ledger") + bank.process_ledger(entries, &mut LeaderScheduler::default()) + .expect("process_ledger") } - fn setup_dummy_write_stage(leader_rotation_interval: u64) -> DummyWriteStage { + fn setup_dummy_write_stage( + leader_keypair: Arc, + leader_scheduler_config: &LeaderSchedulerConfig, + test_name: &str, + ) -> DummyWriteStage { // Setup leader info - let leader_keypair = Arc::new(Keypair::new()); - let my_id = leader_keypair.pubkey(); let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); - let mut cluster_info = ClusterInfo::new(leader_info.info).expect("ClusterInfo::new"); - cluster_info.set_leader_rotation_interval(leader_rotation_interval); - let cluster_info = Arc::new(RwLock::new(cluster_info)); - let bank = Bank::new_default(true); - let bank = Arc::new(bank); + let cluster_info = Arc::new(RwLock::new( + ClusterInfo::new(leader_info.info).expect("ClusterInfo::new"), + )); + let bank = Arc::new(Bank::new_default(true)); // Make a ledger - let (_, leader_ledger_path) = genesis("test_leader_rotation_exit", 10_000); + let (_, leader_ledger_path) = genesis(test_name, 10_000); let (entry_height, ledger_tail) = process_ledger(&leader_ledger_path, &bank); // Make a dummy pipe let (entry_sender, entry_receiver) = channel(); + // Make a dummy leader scheduler we can manipulate + let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new(leader_scheduler_config))); + // Start up the write stage let (write_stage, _write_stage_entry_receiver) = WriteStage::new( leader_keypair, @@ -353,31 +390,45 @@ mod tests { &leader_ledger_path, entry_receiver, entry_height, + leader_scheduler.clone(), ); DummyWriteStage { - my_id, write_stage, entry_sender, // Need to keep this alive, otherwise the write_stage will detect ChannelClosed // and shut down _write_stage_entry_receiver, - cluster_info, bank, leader_ledger_path, ledger_tail, + leader_scheduler, } } #[test] fn test_write_stage_leader_rotation_exit() { - let leader_rotation_interval = 10; - let 
write_stage_info = setup_dummy_write_stage(leader_rotation_interval); + let leader_keypair = Keypair::new(); + let leader_id = leader_keypair.pubkey(); - { - let mut wcluster_info = write_stage_info.cluster_info.write().unwrap(); - wcluster_info.set_scheduled_leader(leader_rotation_interval, write_stage_info.my_id); - } + // Make a dummy leader scheduler we can manipulate + let bootstrap_height = 20; + let leader_rotation_interval = 10; + let seed_rotation_interval = 2 * leader_rotation_interval; + let active_window = bootstrap_height + 3 * seed_rotation_interval; + let leader_scheduler_config = LeaderSchedulerConfig::new( + leader_keypair.pubkey(), + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(seed_rotation_interval), + Some(active_window), + ); + + let write_stage_info = setup_dummy_write_stage( + Arc::new(leader_keypair), + &leader_scheduler_config, + "test_write_stage_leader_rotation_exit", + ); let mut last_id = write_stage_info .ledger_tail @@ -388,30 +439,39 @@ mod tests { let genesis_entry_height = write_stage_info.ledger_tail.len() as u64; + // Insert a nonzero balance and vote into the bank to make this node eligible + // for leader selection + write_stage_info + .leader_scheduler + .write() + .unwrap() + .push_vote(leader_id, 1); + let dummy_id = Keypair::new().pubkey(); + let accounts = write_stage_info.bank.accounts(); + let new_account = Account::new(1, 10, dummy_id.clone()); + accounts + .write() + .unwrap() + .insert(leader_id, new_account.clone()); + // Input enough entries to make exactly leader_rotation_interval entries, which will // trigger a check for leader rotation. Because the next scheduled leader // is ourselves, we won't exit - for _ in genesis_entry_height..leader_rotation_interval { + for _ in genesis_entry_height..bootstrap_height { let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); write_stage_info.entry_sender.send(new_entry).unwrap(); } - // Set the scheduled next leader in the cluster_info to some other node - let leader2_keypair = Keypair::new(); - let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey()); - + // Set the next scheduled next leader to some other node { - let mut wcluster_info = write_stage_info.cluster_info.write().unwrap(); - wcluster_info.insert(&leader2_info.info); - wcluster_info - .set_scheduled_leader(2 * leader_rotation_interval, leader2_keypair.pubkey()); + let mut leader_scheduler = write_stage_info.leader_scheduler.write().unwrap(); + set_new_leader(&write_stage_info.bank, &mut (*leader_scheduler), 1); } - // Input another leader_rotation_interval dummy entries one at a time, - // which will take us past the point of the leader rotation. 
+ // Input enough dummy entries until the next seed rotation_interval, // The write_stage will see that it's no longer the leader after // checking the schedule, and exit - for _ in 0..leader_rotation_interval { + for _ in 0..seed_rotation_interval { let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]); write_stage_info.entry_sender.send(new_entry).unwrap(); } @@ -428,120 +488,240 @@ mod tests { assert_eq!(entry_height, 2 * leader_rotation_interval); } + fn make_leader_scheduler( + my_id: Pubkey, + bootstrap_height: u64, + leader_rotation_interval: u64, + seed_rotation_interval: u64, + active_window: u64, + ) -> LeaderScheduler { + let leader_scheduler_config = LeaderSchedulerConfig::new( + my_id, + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(seed_rotation_interval), + Some(active_window), + ); + + let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config); + leader_scheduler.push_vote(my_id, 1); + leader_scheduler + } + #[test] - fn test_leader_index_calculation() { + // Tests for when the leader across slots and epochs are the same + fn test_same_leader_index_calculation() { // Set up a dummy node - let leader_keypair = Arc::new(Keypair::new()); - let my_id = leader_keypair.pubkey(); - let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); + let my_keypair = Arc::new(Keypair::new()); + let my_id = my_keypair.pubkey(); - let leader_rotation_interval = 10; + // Set up a dummy bank + let bank = Arc::new(Bank::new_default(true)); + let accounts = bank.accounts(); + let dummy_id = Keypair::new().pubkey(); + let new_account = Account::new(1, 10, dummy_id.clone()); + accounts.write().unwrap().insert(my_id, new_account.clone()); - // An epoch is the period of leader_rotation_interval entries - // time during which a leader is in power - let num_epochs = 3; - - let mut cluster_info = ClusterInfo::new(leader_info.info).expect("ClusterInfo::new"); - cluster_info.set_leader_rotation_interval(leader_rotation_interval as u64); - for i in 0..num_epochs { - cluster_info.set_scheduled_leader(i * leader_rotation_interval, my_id) - } - - let cluster_info = Arc::new(RwLock::new(cluster_info)); let entry = Entry::new(&Hash::default(), 0, vec![]); - // A vector that is completely within a certain epoch should return that + // Note: An slot is the period of leader_rotation_interval entries + // time during which a leader is in power + + let leader_rotation_interval = 10; + let bootstrap_height = 17; + let seed_rotation_interval = 3 * leader_rotation_interval; + let active_window = bootstrap_height + 3 * seed_rotation_interval; + + let mut leader_scheduler = make_leader_scheduler( + my_id, + bootstrap_height, + leader_rotation_interval, + seed_rotation_interval, + active_window, + ); + + // Case 1: A vector that is completely within a certain slot should return that // entire vector - let mut len = leader_rotation_interval as usize - 1; + let mut len = (leader_scheduler.bootstrap_height - 1) as usize; let mut input = vec![entry.clone(); len]; let mut result = WriteStage::find_leader_rotation_index( - &cluster_info, - leader_rotation_interval, - (num_epochs - 1) * leader_rotation_interval, + &bank, + &my_id, + &mut leader_scheduler, + 0, input.clone(), ); assert_eq!(result, (input, false)); - // A vector that spans two different epochs for different leaders - // should get truncated - len = leader_rotation_interval as usize - 1; + // Case 2: A vector of new entries that spans multiple slots should return the + // entire vector, assuming 
that the same leader is in power for all the slots. + len = 2 * seed_rotation_interval as usize; input = vec![entry.clone(); len]; result = WriteStage::find_leader_rotation_index( - &cluster_info, - leader_rotation_interval, - (num_epochs * leader_rotation_interval) - 1, + &bank, + &my_id, + &mut leader_scheduler, + bootstrap_height - 1, input.clone(), ); - input.truncate(1); - assert_eq!(result, (input, true)); + assert_eq!(result, (input, false)); - // A vector that triggers a check for leader rotation should return + // Case 3: A vector that triggers a check for leader rotation should return // the entire vector and signal leader_rotation == false, if the - // same leader is in power for the next epoch as well. + // same leader is in power for the next slot as well. + let mut leader_scheduler = make_leader_scheduler( + my_id, + bootstrap_height, + leader_rotation_interval, + seed_rotation_interval, + active_window, + ); + len = 1; - let mut input = vec![entry.clone(); len]; + input = vec![entry.clone(); len]; result = WriteStage::find_leader_rotation_index( - &cluster_info, - leader_rotation_interval, - leader_rotation_interval - 1, + &bank, + &my_id, + &mut leader_scheduler, + bootstrap_height - 1, + input.clone(), + ); + + assert_eq!(result, (input.clone(), false)); + + result = WriteStage::find_leader_rotation_index( + &bank, + &my_id, + &mut leader_scheduler, + bootstrap_height + seed_rotation_interval - 1, input.clone(), ); assert_eq!(result, (input, false)); + } - // A vector of new entries that spans two epochs should return the - // entire vector, assuming that the same leader is in power for both epochs. - len = leader_rotation_interval as usize; - input = vec![entry.clone(); len]; - result = WriteStage::find_leader_rotation_index( - &cluster_info, + // Tests for when the leader across slots / epochs are different + #[test] + fn test_different_leader_index_calculation() { + // Set up a dummy node + let my_keypair = Arc::new(Keypair::new()); + let my_id = my_keypair.pubkey(); + + // Set up a dummy bank + let bank = Arc::new(Bank::new_default(true)); + let entry = Entry::new(&Hash::default(), 0, vec![]); + + // Note: An slot is the period of leader_rotation_interval entries + // time during which a leader is in power + + let leader_rotation_interval = 10; + let bootstrap_height = 17; + let seed_rotation_interval = 3 * leader_rotation_interval; + let active_window = 1; + let swap_entry_height = bootstrap_height + 2 * seed_rotation_interval; + + // Case 1: A vector that spans different epochs for different leaders + // should get truncated + + // Set the leader scheduler + let mut leader_scheduler = make_leader_scheduler( + my_id, + bootstrap_height, leader_rotation_interval, - leader_rotation_interval - 1, - input.clone(), + seed_rotation_interval, + active_window, ); - assert_eq!(result, (input, false)); + set_new_leader(&bank, &mut leader_scheduler, swap_entry_height); - // A vector of new entries that spans multiple epochs should return the - // entire vector, assuming that the same leader is in power for both dynasties. 
- len = (num_epochs - 1) as usize * leader_rotation_interval as usize; - input = vec![entry.clone(); len]; - result = WriteStage::find_leader_rotation_index( - &cluster_info, - leader_rotation_interval, - leader_rotation_interval - 1, + // Start test + let mut start_height = bootstrap_height - 1; + let extra_len = leader_rotation_interval; + let expected_len = swap_entry_height - start_height; + let mut len = expected_len + extra_len; + let mut input = vec![entry.clone(); len as usize]; + let mut result = WriteStage::find_leader_rotation_index( + &bank, + &my_id, + &mut leader_scheduler, + start_height, input.clone(), ); - - assert_eq!(result, (input, false)); - - // A vector of new entries that spans multiple leader epochs and has a length - // exactly equal to the remainining number of entries before the next, different - // leader should return the entire vector and signal that leader_rotation == true. - len = (num_epochs - 1) as usize * leader_rotation_interval as usize + 1; - input = vec![entry.clone(); len]; - result = WriteStage::find_leader_rotation_index( - &cluster_info, - leader_rotation_interval, - leader_rotation_interval - 1, - input.clone(), - ); - + input.truncate(expected_len as usize); assert_eq!(result, (input, true)); - // Start at entry height == the height for leader rotation, should return - // no entries. - len = leader_rotation_interval as usize; - input = vec![entry.clone(); len]; + // Case 2: Start at entry height == the height where the next leader is elected, should + // return no entries + len = 1; + input = vec![entry.clone(); len as usize]; result = WriteStage::find_leader_rotation_index( - &cluster_info, - leader_rotation_interval, - num_epochs * leader_rotation_interval, + &bank, + &my_id, + &mut leader_scheduler, + swap_entry_height, input.clone(), ); assert_eq!(result, (vec![], true)); + + // Case 3: A vector that lands one before leader rotation happens should not be + // truncated, and should signal leader rotation == false + + // Reset the leader scheduler + leader_scheduler = make_leader_scheduler( + my_id, + bootstrap_height, + leader_rotation_interval, + seed_rotation_interval, + active_window, + ); + + set_new_leader(&bank, &mut leader_scheduler, swap_entry_height); + + // Start test + start_height = bootstrap_height - 1; + let len_remaining = swap_entry_height - start_height; + len = len_remaining - 1; + input = vec![entry.clone(); len as usize]; + result = WriteStage::find_leader_rotation_index( + &bank, + &my_id, + &mut leader_scheduler, + start_height, + input.clone(), + ); + assert_eq!(result, (input, false)); + + // Case 4: A vector that lands exactly where leader rotation happens should not be + // truncated, but should return leader rotation == true + + // Reset the leader scheduler + leader_scheduler = make_leader_scheduler( + my_id, + bootstrap_height, + leader_rotation_interval, + seed_rotation_interval, + active_window, + ); + set_new_leader(&bank, &mut leader_scheduler, swap_entry_height); + + // Generate the schedule + leader_scheduler.update_height(bootstrap_height, &bank); + + // Start test + start_height = bootstrap_height + leader_rotation_interval - 1; + len = swap_entry_height - start_height; + input = vec![entry.clone(); len as usize]; + result = WriteStage::find_leader_rotation_index( + &bank, + &my_id, + &mut leader_scheduler, + start_height, + input.clone(), + ); + + assert_eq!(result, (input, true)); } } diff --git a/tests/multinode.rs b/tests/multinode.rs index ce9ff85f81..e2cf15530e 100644 --- a/tests/multinode.rs +++ 
b/tests/multinode.rs @@ -10,6 +10,7 @@ use solana::cluster_info::{ClusterInfo, Node, NodeInfo}; use solana::entry::Entry; use solana::fullnode::{Fullnode, FullnodeReturnType}; use solana::hash::Hash; +use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig}; use solana::ledger::{read_ledger, LedgerWriter}; use solana::logger; use solana::mint::Mint; @@ -21,21 +22,22 @@ use solana::thin_client::ThinClient; use solana::timing::{duration_as_ms, duration_as_s}; use solana::window::{default_window, WINDOW_SIZE}; use solana_program_interface::pubkey::Pubkey; +use std::collections::{HashSet, VecDeque}; use std::env; use std::fs::{copy, create_dir_all, remove_dir_all}; use std::net::UdpSocket; use std::path::Path; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; -use std::thread::sleep; -use std::thread::Builder; +use std::thread::{sleep, Builder, JoinHandle}; use std::time::{Duration, Instant}; fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc>, Pubkey) { let exit = Arc::new(AtomicBool::new(false)); let mut spy = Node::new_localhost(); let me = spy.info.id.clone(); - spy.info.contact_info.tvu = spy.sockets.replicate[0].local_addr().unwrap(); + let daddr = "0.0.0.0:0".parse().unwrap(); + spy.info.contact_info.tvu = daddr; spy.info.contact_info.rpu = spy.sockets.transaction[0].local_addr().unwrap(); let mut spy_cluster_info = ClusterInfo::new(spy.info).expect("ClusterInfo::new"); spy_cluster_info.insert(&leader); @@ -150,7 +152,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { leader_keypair, None, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); // Send leader some tokens to vote @@ -170,7 +172,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> { keypair, Some(leader_data.contact_info.ncp), false, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); // Send validator some tokens to vote @@ -240,7 +242,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { leader_keypair, None, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); // Send leader some tokens to vote @@ -271,7 +273,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { keypair, Some(leader_data.contact_info.ncp), false, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); nodes.push(val); } @@ -307,7 +309,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> { keypair, Some(leader_data.contact_info.ncp), false, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); nodes.push(val); //contains the leader and new node @@ -373,7 +375,7 @@ fn test_multi_node_basic() { leader_keypair, None, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); // Send leader some tokens to vote @@ -401,7 +403,7 @@ fn test_multi_node_basic() { keypair, Some(leader_data.contact_info.ncp), false, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); nodes.push(val); } @@ -437,6 +439,7 @@ fn test_multi_node_basic() { fn test_boot_validator_from_file() -> result::Result<()> { logger::setup(); let leader_keypair = Keypair::new(); + let leader_pubkey = leader_keypair.pubkey(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let bob_pubkey = Keypair::new().pubkey(); let (alice, leader_ledger_path, _) = genesis("boot_validator_from_file", 100_000); @@ -450,7 +453,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { 
leader_keypair, None, false, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); let leader_balance = send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap(); @@ -470,7 +473,7 @@ fn test_boot_validator_from_file() -> result::Result<()> { keypair, Some(leader_data.contact_info.ncp), false, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); let mut client = mk_client(&validator_data); let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance)); @@ -489,7 +492,14 @@ fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) { let leader_keypair = Keypair::new(); let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_data = leader.info.clone(); - let leader_fullnode = Fullnode::new(leader, &ledger_path, leader_keypair, None, false, None); + let leader_fullnode = Fullnode::new( + leader, + &ledger_path, + leader_keypair, + None, + false, + LeaderScheduler::from_bootstrap_leader(leader_data.id), + ); (leader_data, leader_fullnode) } @@ -543,7 +553,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> { keypair, Some(leader_data.contact_info.ncp), false, - None, + LeaderScheduler::from_bootstrap_leader(leader_data.id), ); // trigger broadcast, validator should catch up from leader, whose window contains @@ -607,7 +617,7 @@ fn test_multi_node_dynamic_network() { leader_keypair, None, true, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); // Send leader some tokens to vote @@ -681,7 +691,7 @@ fn test_multi_node_dynamic_network() { keypair, Some(leader_data.contact_info.ncp), true, - None, + LeaderScheduler::from_bootstrap_leader(leader_pubkey), ); (rd, val) }).unwrap() @@ -775,39 +785,57 @@ fn test_multi_node_dynamic_network() { } #[test] -#[ignore] fn test_leader_to_validator_transition() { logger::setup(); let leader_rotation_interval = 20; - // Make a dummy address to be the sink for this test's mock transactions - let bob_pubkey = Keypair::new().pubkey(); + // Make a dummy validator id to be the next leader and + // sink for this test's mock transactions + let validator_keypair = Keypair::new(); + let validator_id = validator_keypair.pubkey(); - // Initialize the leader ledger. Make a mint and a genesis entry - // in the leader ledger - let (mint, leader_ledger_path, entries) = genesis( - "test_leader_to_validator_transition", - (3 * leader_rotation_interval) as i64, - ); - - let genesis_height = entries.len() as u64; - - // Start the leader node + // Create the leader node information let leader_keypair = Keypair::new(); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = leader_node.info.clone(); + + // Initialize the leader ledger. 
Make a mint and a genesis entry + // in the leader ledger + let (mint, leader_ledger_path, genesis_entries) = + genesis("test_leader_to_validator_transition", 10_000); + + let last_id = genesis_entries + .last() + .expect("expected at least one genesis entry") + .id; + + // Write the bootstrap entries to the ledger that will cause leader rotation + // after the bootstrap height + let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); + let bootstrap_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id); + let bootstrap_entries_len = bootstrap_entries.len(); + ledger_writer.write_entries(bootstrap_entries).unwrap(); + let ledger_initial_len = (genesis_entries.len() + bootstrap_entries_len) as u64; + + // Start the leader node + let bootstrap_height = leader_rotation_interval; + let leader_scheduler_config = LeaderSchedulerConfig::new( + leader_info.id, + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(leader_rotation_interval * 2), + Some(bootstrap_height), + ); + let mut leader = Fullnode::new( leader_node, &leader_ledger_path, leader_keypair, - None, + Some(leader_info.contact_info.ncp), false, - Some(leader_rotation_interval), + LeaderScheduler::new(&leader_scheduler_config), ); - // Set the next leader to be Bob - leader.set_scheduled_leader(bob_pubkey, leader_rotation_interval); - // Make an extra node for our leader to broadcast to, // who won't vote and mess with our leader's entry count let (ncp, spy_node, _) = make_spy_node(&leader_info); @@ -828,55 +856,66 @@ fn test_leader_to_validator_transition() { assert!(converged); - let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1); + let extra_transactions = std::cmp::max(bootstrap_height / 3, 1); - // Push leader "extra_transactions" past leader_rotation_interval entry height, + // Push leader "extra_transactions" past bootstrap_height, // make sure the leader stops. - assert!(genesis_height < leader_rotation_interval); - for i in genesis_height..(leader_rotation_interval + extra_transactions) { - if i < leader_rotation_interval { + assert!(ledger_initial_len < bootstrap_height); + for i in ledger_initial_len..(bootstrap_height + extra_transactions) { + if i < bootstrap_height { // Poll to see that the bank state is updated after every transaction // to ensure that each transaction is packaged as a single entry, // so that we can be sure leader rotation is triggered let expected_balance = std::cmp::min( - leader_rotation_interval - genesis_height, - i - genesis_height + 1, + bootstrap_height - ledger_initial_len, + i - ledger_initial_len + 1, ); let result = send_tx_and_retry_get_balance( &leader_info, &mint, - &bob_pubkey, + &validator_id, 1, Some(expected_balance as i64), ); - assert_eq!(result, Some(expected_balance as i64)) + // If the transaction wasn't reflected in the node, then we assume + // the node has transitioned already + if result != Some(expected_balance as i64) { + break; + } } else { - // After leader_rotation_interval entries, we don't care about the + // After bootstrap entries, we don't care about the // number of entries generated by these transactions. These are just // here for testing to make sure the leader stopped at the correct point. 
- send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None); + send_tx_and_retry_get_balance(&leader_info, &mint, &validator_id, 1, None); } } // Wait for leader to shut down tpu and restart tvu match leader.handle_role_transition().unwrap() { - Some(FullnodeReturnType::LeaderRotation) => (), + Some(FullnodeReturnType::LeaderToValidatorRotation) => (), _ => panic!("Expected reason for exit to be leader rotation"), } // Query now validator to make sure that he has the proper balances in his bank - // after the transition, even though we submitted "extra_transactions" + // after the transitions, even though we submitted "extra_transactions" // transactions earlier let mut leader_client = mk_client(&leader_info); - let expected_bal = leader_rotation_interval - genesis_height; + let maximum_bal = bootstrap_height - ledger_initial_len; let bal = leader_client - .poll_get_balance(&bob_pubkey) + .poll_get_balance(&validator_id) .expect("Expected success when polling newly transitioned validator for balance") as u64; - assert_eq!(bal, expected_bal); + assert!(bal <= maximum_bal); + + // Check the ledger to make sure it's the right height, we should've + // transitioned after the bootstrap_height entry + let (_, entry_height, _) = + Fullnode::new_bank_from_ledger(&leader_ledger_path, &mut LeaderScheduler::default()); + + assert_eq!(entry_height, bootstrap_height); // Shut down ncp.close().unwrap(); @@ -885,7 +924,6 @@ fn test_leader_to_validator_transition() { } #[test] -#[ignore] fn test_leader_validator_basic() { logger::setup(); let leader_rotation_interval = 10; @@ -893,61 +931,67 @@ fn test_leader_validator_basic() { // Account that will be the sink for all the test's transactions let bob_pubkey = Keypair::new().pubkey(); - // Make a mint and a genesis entry in the leader ledger - let (mint, leader_ledger_path, genesis_entries) = - genesis("test_leader_validator_basic", 10_000); - let genesis_height = genesis_entries.len(); - - // Initialize the leader ledger - let mut ledger_paths = Vec::new(); - ledger_paths.push(leader_ledger_path.clone()); - - // Create the leader fullnode + // Create the leader node information let leader_keypair = Keypair::new(); let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey()); let leader_info = leader_node.info.clone(); + // Create the validator node information + let validator_keypair = Keypair::new(); + let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); + + // Make a common mint and a genesis entry for both leader + validator ledgers + let (mint, leader_ledger_path, genesis_entries) = + genesis("test_leader_validator_basic", 10_000); + + let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, "test_leader_validator_basic"); + + let last_id = genesis_entries + .last() + .expect("expected at least one genesis entry") + .id; + let genesis_height = genesis_entries.len(); + + // Initialize both leader + validator ledger + let mut ledger_paths = Vec::new(); + ledger_paths.push(leader_ledger_path.clone()); + ledger_paths.push(validator_ledger_path.clone()); + + // Write the bootstrap entries to the ledger that will cause leader rotation + // after the bootstrap height + let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap(); + let bootstrap_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id); + ledger_writer.write_entries(bootstrap_entries).unwrap(); + + // Create the leader scheduler config + let num_bootstrap_slots = 2; + let 
bootstrap_height = num_bootstrap_slots * leader_rotation_interval; + let leader_scheduler_config = LeaderSchedulerConfig::new( + leader_info.id, + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(leader_rotation_interval * 2), + Some(bootstrap_height), + ); + + // Start the leader fullnode let mut leader = Fullnode::new( leader_node, &leader_ledger_path, leader_keypair, - None, + Some(leader_info.contact_info.ncp), false, - Some(leader_rotation_interval), + LeaderScheduler::new(&leader_scheduler_config), ); - // Send leader some tokens to vote - send_tx_and_retry_get_balance(&leader_info, &mint, &leader_info.id, 500, None).unwrap(); - // Start the validator node - let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, "test_leader_validator_basic"); - let validator_keypair = Keypair::new(); - let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey()); - let validator_info = validator_node.info.clone(); let mut validator = Fullnode::new( validator_node, &validator_ledger_path, validator_keypair, Some(leader_info.contact_info.ncp), false, - Some(leader_rotation_interval), - ); - - ledger_paths.push(validator_ledger_path.clone()); - - // Set the leader schedule for the validator and leader - let my_leader_begin_epoch = 2; - for i in 0..my_leader_begin_epoch { - validator.set_scheduled_leader(leader_info.id, leader_rotation_interval * i); - leader.set_scheduled_leader(leader_info.id, leader_rotation_interval * i); - } - validator.set_scheduled_leader( - validator_info.id, - my_leader_begin_epoch * leader_rotation_interval, - ); - leader.set_scheduled_leader( - validator_info.id, - my_leader_begin_epoch * leader_rotation_interval, + LeaderScheduler::new(&leader_scheduler_config), ); // Wait for convergence @@ -956,8 +1000,7 @@ fn test_leader_validator_basic() { // Send transactions to the leader let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1); - let total_transactions_to_send = - my_leader_begin_epoch * leader_rotation_interval + extra_transactions; + let total_transactions_to_send = bootstrap_height + extra_transactions; // Push "extra_transactions" past leader_rotation_interval entry height, // make sure the validator stops. @@ -967,20 +1010,13 @@ fn test_leader_validator_basic() { // Wait for validator to shut down tvu and restart tpu match validator.handle_role_transition().unwrap() { - Some(FullnodeReturnType::LeaderRotation) => (), + Some(FullnodeReturnType::ValidatorToLeaderRotation) => (), _ => panic!("Expected reason for exit to be leader rotation"), } - // TODO: We ignore this test for now b/c there's a chance here that the cluster_info - // in the new leader calls the dummy sequence of update_leader -> top_leader() - // (see the TODOs in those functions) during gossip and sets the leader back - // to the old leader, which causes a panic from an assertion failure in cluster_info broadcast(), - // specifically: assert!(me.leader_id != v.id). 
We can enable this test once we have real - leader scheduling - // Wait for the leader to shut down tpu and restart tvu match leader.handle_role_transition().unwrap() { - Some(FullnodeReturnType::LeaderRotation) => (), + Some(FullnodeReturnType::LeaderToValidatorRotation) => (), _ => panic!("Expected reason for exit to be leader rotation"), } @@ -994,15 +1030,393 @@ fn test_leader_validator_basic() { let validator_entries = read_ledger(&validator_ledger_path, true).expect("Expected parsing of validator ledger"); let leader_entries = - read_ledger(&validator_ledger_path, true).expect("Expected parsing of leader ledger"); + read_ledger(&leader_ledger_path, true).expect("Expected parsing of leader ledger"); + let mut min_len = 0; for (v, l) in validator_entries.zip(leader_entries) { + min_len += 1; assert_eq!( v.expect("expected valid validator entry"), l.expect("expected valid leader entry") ); } + assert!(min_len >= bootstrap_height); + + for path in ledger_paths { + remove_dir_all(path).unwrap(); + } +} + +fn run_node( + id: Pubkey, + fullnode: Arc<RwLock<Fullnode>>, + should_exit: Arc<AtomicBool>, +) -> JoinHandle<()> { + Builder::new() + .name(format!("run_node-{:?}", id).to_string()) + .spawn(move || loop { + if should_exit.load(Ordering::Relaxed) { + return; + } + if fullnode.read().unwrap().check_role_exited() { + match fullnode.write().unwrap().handle_role_transition().unwrap() { + Some(FullnodeReturnType::LeaderToValidatorRotation) => (), + Some(FullnodeReturnType::ValidatorToLeaderRotation) => (), + _ => { + panic!("Expected reason for exit to be leader rotation"); + } + }; + } + sleep(Duration::new(1, 0)); + }).unwrap() +} + +#[test] +#[ignore] +fn test_dropped_handoff_recovery() { + logger::setup(); + // The number of validators + const N: usize = 3; + assert!(N > 1); + logger::setup(); + + // Create the bootstrap leader node information + let bootstrap_leader_keypair = Keypair::new(); + let bootstrap_leader_node = Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey()); + let bootstrap_leader_info = bootstrap_leader_node.info.clone(); + + // Make a common mint and a genesis entry for both leader + validator's ledgers + let (mint, bootstrap_leader_ledger_path, genesis_entries) = + genesis("test_dropped_handoff_recovery", 10_000); + + let last_id = genesis_entries + .last() + .expect("expected at least one genesis entry") + .id; + + // Create the validator keypair that will be the next leader in line + let next_leader_keypair = Keypair::new(); + + // Create a common ledger with entries in the beginning that will add only + the "next_leader" validator to the active set for leader election, guaranteeing + he is the next leader after bootstrap_height + let mut ledger_paths = Vec::new(); + ledger_paths.push(bootstrap_leader_ledger_path.clone()); + + // Make the entries to give the next_leader validator some stake so that he will be in + leader election active set + let first_entries = make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id); + let first_entries_len = first_entries.len(); + + // Write the entries + let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); + ledger_writer.write_entries(first_entries).unwrap(); + + let ledger_initial_len = (genesis_entries.len() + first_entries_len) as u64; + + let next_leader_ledger_path = tmp_copy_ledger( + &bootstrap_leader_ledger_path, + "test_dropped_handoff_recovery", + ); + + ledger_paths.push(next_leader_ledger_path.clone()); + + // Create the common leader scheduling configuration + let 
num_slots_per_epoch = (N + 1) as u64; + let leader_rotation_interval = 5; + let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval; + let bootstrap_height = ledger_initial_len + 1; + let leader_scheduler_config = LeaderSchedulerConfig::new( + bootstrap_leader_info.id, + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(seed_rotation_interval), + Some(leader_rotation_interval), + ); + + // Start up the bootstrap leader fullnode + let bootstrap_leader = Fullnode::new( + bootstrap_leader_node, + &bootstrap_leader_ledger_path, + bootstrap_leader_keypair, + Some(bootstrap_leader_info.contact_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + ); + + let mut nodes = vec![bootstrap_leader]; + + // Start up the validators other than the "next_leader" validator + for _ in 0..(N - 1) { + let kp = Keypair::new(); + let validator_ledger_path = tmp_copy_ledger( + &bootstrap_leader_ledger_path, + "test_dropped_handoff_recovery", + ); + ledger_paths.push(validator_ledger_path.clone()); + let validator_id = kp.pubkey(); + let validator_node = Node::new_localhost_with_pubkey(validator_id); + let validator = Fullnode::new( + validator_node, + &validator_ledger_path, + kp, + Some(bootstrap_leader_info.contact_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + ); + + nodes.push(validator); + } + + // Wait for convergence + let num_converged = converge(&bootstrap_leader_info, N).len(); + assert_eq!(num_converged, N); + + // Wait for leader transition + match nodes[0].handle_role_transition().unwrap() { + Some(FullnodeReturnType::LeaderToValidatorRotation) => (), + _ => panic!("Expected reason for exit to be leader rotation"), + } + + // Now start up the "next leader" node + let next_leader_node = Node::new_localhost_with_pubkey(next_leader_keypair.pubkey()); + let mut next_leader = Fullnode::new( + next_leader_node, + &next_leader_ledger_path, + next_leader_keypair, + Some(bootstrap_leader_info.contact_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + ); + + // Wait for catchup + match next_leader.handle_role_transition().unwrap() { + Some(FullnodeReturnType::ValidatorToLeaderRotation) => (), + _ => panic!("Expected reason for exit to be leader rotation"), + } + + nodes.push(next_leader); + + for node in nodes { + node.close().unwrap(); + } + + for path in ledger_paths { + remove_dir_all(path).unwrap(); + } +} + +#[test] +//TODO: Ignore for now due to bug exposed by the test "test_dropped_handoff_recovery" +#[ignore] +fn test_full_leader_validator_network() { + logger::setup(); + // The number of validators + const N: usize = 5; + logger::setup(); + + // Create the bootstrap leader node information + let bootstrap_leader_keypair = Keypair::new(); + let bootstrap_leader_node = Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey()); + let bootstrap_leader_info = bootstrap_leader_node.info.clone(); + + let mut node_keypairs = VecDeque::new(); + node_keypairs.push_back(bootstrap_leader_keypair); + + // Create the validator keypairs + for _ in 0..N { + let validator_keypair = Keypair::new(); + node_keypairs.push_back(validator_keypair); + } + + // Make a common mint and a genesis entry for both leader + validator's ledgers + let (mint, bootstrap_leader_ledger_path, genesis_entries) = + genesis("test_full_leader_validator_network", 10_000); + + let mut last_id = genesis_entries + .last() + .expect("expected at least one genesis entry") + .id; + + // Create a common ledger with entries in the beginning that 
will add all the validators + to the active set for leader election. TODO: Leader rotation does not support dynamic + stakes for safe leader -> validator transitions due to the unpredictability of + bank state due to transactions being in-flight during leader seed calculation in + write stage. + let mut ledger_paths = Vec::new(); + ledger_paths.push(bootstrap_leader_ledger_path.clone()); + + for node_keypair in node_keypairs.iter() { + // Make entries to give the validator some stake so that he will be in + leader election active set + let bootstrap_entries = make_active_set_entries(node_keypair, &mint.keypair(), &last_id); + + // Write the entries + let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap(); + last_id = bootstrap_entries + .last() + .expect("expected at least one genesis entry") + .id; + ledger_writer.write_entries(bootstrap_entries).unwrap(); + } + + // Create the common leader scheduling configuration + let num_slots_per_epoch = (N + 1) as u64; + let num_bootstrap_slots = 2; + let leader_rotation_interval = 5; + let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval; + let bootstrap_height = num_bootstrap_slots * leader_rotation_interval; + let leader_scheduler_config = LeaderSchedulerConfig::new( + bootstrap_leader_info.id, + Some(bootstrap_height), + Some(leader_rotation_interval), + Some(seed_rotation_interval), + Some(leader_rotation_interval), + ); + + let exit = Arc::new(AtomicBool::new(false)); + // Start the bootstrap leader fullnode + let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new( + bootstrap_leader_node, + &bootstrap_leader_ledger_path, + node_keypairs.pop_front().unwrap(), + Some(bootstrap_leader_info.contact_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + ))); + + let mut nodes: Vec<Arc<RwLock<Fullnode>>> = vec![bootstrap_leader.clone()]; + let mut t_nodes = vec![run_node( + bootstrap_leader_info.id, + bootstrap_leader, + exit.clone(), + )]; + + // Start up the validators + for kp in node_keypairs.into_iter() { + let validator_ledger_path = tmp_copy_ledger( + &bootstrap_leader_ledger_path, + "test_full_leader_validator_network", + ); + ledger_paths.push(validator_ledger_path.clone()); + let validator_id = kp.pubkey(); + let validator_node = Node::new_localhost_with_pubkey(validator_id); + let validator = Arc::new(RwLock::new(Fullnode::new( + validator_node, + &validator_ledger_path, + kp, + Some(bootstrap_leader_info.contact_info.ncp), + false, + LeaderScheduler::new(&leader_scheduler_config), + ))); + + nodes.push(validator.clone()); + t_nodes.push(run_node(validator_id, validator, exit.clone())); + } + + // Wait for convergence + let num_converged = converge(&bootstrap_leader_info, N + 1).len(); + assert_eq!(num_converged, N + 1); + + // Wait for each node to hit a specific target height in the leader schedule. + // Once all the nodes hit that height, exit them all together. They must + // only quit once they've all confirmed to reach that specific target height. + // Otherwise, some nodes may never reach the target height if a critical + // next leader node exits first, and stops generating entries. (We don't + // have a timeout mechanism). 
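// With the configuration above, num_slots_per_epoch = N + 1 = 6 and leader_rotation_interval = 5,
// so seed_rotation_interval = 6 * 5 = 30 and bootstrap_height = 2 * 5 = 10; the target height
// computed below therefore works out to 10 + 30 = 40 entries, i.e. the bootstrap period plus one
// full seed rotation of the leader schedule.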
+ let target_height = bootstrap_height + seed_rotation_interval; + let mut num_reached_target_height = 0; + + while num_reached_target_height != N + 1 { + num_reached_target_height = 0; + for n in nodes.iter() { + let node_lock = n.read().unwrap(); + let ls_lock = &node_lock.leader_scheduler; + if let Some(sh) = ls_lock.read().unwrap().last_seed_height { + if sh >= target_height { + num_reached_target_height += 1; + } + } + drop(ls_lock); + } + + sleep(Duration::new(1, 0)); + } + + exit.store(true, Ordering::Relaxed); + + // Wait for threads running the nodes to exit + for t in t_nodes { + t.join().unwrap(); + } + + // Exit all fullnodes + for n in nodes { + let result = Arc::try_unwrap(n); + match result { + Ok(lock) => { + let f = lock + .into_inner() + .expect("RwLock for fullnode is still locked"); + f.close().unwrap(); + } + Err(_) => panic!("Multiple references to RwLock still exist"), + } + } + + let mut node_entries = vec![]; + // Check that all the ledgers match + for ledger_path in ledger_paths.iter() { + let entries = read_ledger(ledger_path, true).expect("Expected parsing of node ledger"); + node_entries.push(entries); + } + + let mut shortest = None; + let mut length = 0; + loop { + let mut expected_entry_option = None; + let mut empty_iterators = HashSet::new(); + for (i, entries_for_specific_node) in node_entries.iter_mut().enumerate() { + if let Some(next_entry_option) = entries_for_specific_node.next() { + // If this ledger iterator has another entry, make sure that the + // ledger reader parsed it correctly + let next_entry = next_entry_option.expect("expected valid ledger entry"); + + // Check if another earlier ledger iterator had another entry. If so, make + // sure they match + if let Some(ref expected_entry) = expected_entry_option { + assert_eq!(*expected_entry, next_entry); + } else { + expected_entry_option = Some(next_entry); + } + } else { + // The shortest iterator is the first one to return a None when + // calling next() + if shortest.is_none() { + shortest = Some(length); + } + empty_iterators.insert(i); + } + } + + // Remove the empty iterators + node_entries = node_entries + .into_iter() + .enumerate() + .filter_map(|(i, x)| match empty_iterators.get(&i) { + None => Some(x), + _ => None, + }).collect(); + + if node_entries.len() == 0 { + break; + } + + length += 1; + } + + assert!(shortest.unwrap() >= target_height); for path in ledger_paths { remove_dir_all(path).unwrap(); }
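// A minimal standalone sketch of the invariant the comparison loop above enforces (illustrative
// only, not part of this patch; it assumes each node's entries have already been collected into a
// Vec rather than streamed from read_ledger, and the helper name is hypothetical): every ledger
// must agree entry-for-entry up to the length of the shortest ledger, and that shortest ledger
// must itself reach the target height.
fn assert_ledgers_match<T: PartialEq + std::fmt::Debug>(ledgers: &[Vec<T>], target_height: usize) {
    // The shortest ledger bounds how far the entry-by-entry comparison can go.
    let shortest = ledgers
        .iter()
        .map(|ledger| ledger.len())
        .min()
        .expect("expected at least one ledger");

    // No node may have stopped recording entries before the target height.
    assert!(shortest >= target_height);

    // All ledgers must be identical over the common prefix.
    for i in 0..shortest {
        for pair in ledgers.windows(2) {
            assert_eq!(pair[0][i], pair[1][i], "ledgers diverge at entry {}", i);
        }
    }
}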