Leader scheduler plumbing (#1440)

* Added LeaderScheduler module and tests

* Plumbing for LeaderScheduler in Fullnode + tests. Add vote processing for the active set to ReplicateStage and WriteStage

* Add LeaderScheduler plumbing for Tvu, window, and tests

* Fix bank and switch tests to use new LeaderScheduler

* Move leader rotation check from window service to replicate stage

* Add replicate_stage leader rotation exit test

* Removed leader scheduler from the window service and associated modules/tests

* Corrected is_leader calculation in repair() function in window.rs

* Integrate LeaderScheduler with write_stage for leader to validator transitions

* Integrated LeaderScheduler with BroadcastStage

* Removed gossip leader rotation from crdt

* Add multi validator, leader test

* Comments and cleanup

* Remove unneeded checks from broadcast stage

* Fix case where a validator/leader needs to immediately transition on startup after reading the ledger and seeing it is not in the correct role

* Set new leader in validator -> validator transitions

* Clean up for PR comments, refactor LeaderScheduler from process_entry/process_ledger_tail

* Cleaned out LeaderScheduler options; implemented a LeaderScheduler strategy that only picks the bootstrap leader, to support existing tests and drone/airdrops

* Ignore the test_full_leader_validator_network test due to a bug where the next leader in line fails to get the last entry before rotation (because it hasn't started up yet). Added a test, test_dropped_handoff_recovery, to track this bug
carllin 2018-10-10 16:49:41 -07:00 committed by GitHub
parent 2ba2bc72ca
commit 9931ac9780
22 changed files with 1743 additions and 898 deletions
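
A quick sketch of the call-site pattern this change set introduces, pieced together from the fullnode binary and test diffs below: Fullnode::new now takes a LeaderScheduler instead of an Option<u64> rotation interval, and handle_role_transition reports which direction a rotation went. The run_node helper and its parameters are illustrative only, not part of the patch.

use solana::cluster_info::{Node, NodeInfo};
use solana::fullnode::{Fullnode, FullnodeReturnType};
use solana::leader_scheduler::LeaderScheduler;
use solana::signature::Keypair;
use std::net::SocketAddr;

// Illustrative helper (not in the patch): start a fullnode with the new
// LeaderScheduler argument and keep servicing role transitions until a
// service exits for an unexpected reason.
fn run_node(
    node: Node,
    ledger_path: &str,
    keypair: Keypair,
    network: Option<SocketAddr>,
    bootstrap_leader: &NodeInfo,
) {
    let mut fullnode = Fullnode::new(
        node,
        ledger_path,
        keypair,
        network,
        false, // sigverify_disabled
        LeaderScheduler::from_bootstrap_leader(bootstrap_leader.id),
    );
    loop {
        match fullnode.handle_role_transition() {
            // Keep running across leader <-> validator transitions
            Ok(Some(FullnodeReturnType::LeaderToValidatorRotation)) => (),
            Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)) => (),
            // A tpu/tvu service exited for some unexpected reason
            _ => break,
        }
    }
    fullnode.close().unwrap();
}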


@ -6,12 +6,14 @@
use bincode::deserialize;
use bincode::serialize;
use budget_program::BudgetState;
use budget_transaction::BudgetTransaction;
use counter::Counter;
use dynamic_program::DynamicProgram;
use entry::Entry;
use hash::{hash, Hash};
use itertools::Itertools;
use jsonrpc_macros::pubsub::Sink;
use leader_scheduler::LeaderScheduler;
use ledger::Block;
use log::Level;
use mint::Mint;
@ -781,6 +783,22 @@ impl Bank {
results
}
pub fn process_entry_votes(
bank: &Bank,
entry: &Entry,
entry_height: u64,
leader_scheduler: &mut LeaderScheduler,
) {
for tx in &entry.transactions {
if tx.vote().is_some() {
// Update the active set in the leader scheduler
leader_scheduler.push_vote(*tx.from(), entry_height);
}
}
leader_scheduler.update_height(entry_height, bank);
}
pub fn process_entry(&self, entry: &Entry) -> Result<()> {
if !entry.transactions.is_empty() {
for result in self.process_transactions(&entry.transactions) {
@ -795,7 +813,7 @@ impl Bank {
/// as we go.
fn process_entries_tail(
&self,
entries: Vec<Entry>,
entries: &[Entry],
tail: &mut Vec<Entry>,
tail_idx: &mut usize,
) -> Result<u64> {
@ -810,7 +828,7 @@ impl Bank {
*tail_idx = (*tail_idx + 1) % WINDOW_SIZE as usize;
entry_count += 1;
self.process_entry(&entry)?;
self.process_entry(entry)?;
}
Ok(entry_count)
@ -831,6 +849,7 @@ impl Bank {
entries: I,
tail: &mut Vec<Entry>,
tail_idx: &mut usize,
leader_scheduler: &mut LeaderScheduler,
) -> Result<u64>
where
I: IntoIterator<Item = Entry>,
@ -846,13 +865,30 @@ impl Bank {
return Err(BankError::LedgerVerificationFailed);
}
id = block.last().unwrap().id;
entry_count += self.process_entries_tail(block, tail, tail_idx)?;
let tail_count = self.process_entries_tail(&block, tail, tail_idx)?;
if !leader_scheduler.use_only_bootstrap_leader {
for (i, entry) in block.iter().enumerate() {
Self::process_entry_votes(
self,
&entry,
entry_count + i as u64 + 1,
leader_scheduler,
);
}
}
entry_count += tail_count;
}
Ok(entry_count)
}
/// Process a full ledger.
pub fn process_ledger<I>(&self, entries: I) -> Result<(u64, Vec<Entry>)>
pub fn process_ledger<I>(
&self,
entries: I,
leader_scheduler: &mut LeaderScheduler,
) -> Result<(u64, Vec<Entry>)>
where
I: IntoIterator<Item = Entry>,
{
@ -894,9 +930,15 @@ impl Bank {
tail.push(entry0);
tail.push(entry1);
let mut tail_idx = 2;
let entry_count = self.process_blocks(entry1_id, entries, &mut tail, &mut tail_idx)?;
let entry_count = self.process_blocks(
entry1_id,
entries,
&mut tail,
&mut tail_idx,
leader_scheduler,
)?;
// check f we need to rotate tail
// check if we need to rotate tail
if tail.len() == WINDOW_SIZE as usize {
tail.rotate_left(tail_idx)
}
@ -1069,6 +1111,13 @@ impl Bank {
}
subscriptions.remove(&signature);
}
#[cfg(test)]
// Used to access accounts for things like controlling stake to control
// the eligible set of nodes for leader selection
pub fn accounts(&self) -> &RwLock<HashMap<Pubkey, Account>> {
&self.accounts
}
}
#[cfg(test)]
@ -1081,6 +1130,7 @@ mod tests {
use entry_writer::{self, EntryWriter};
use hash::hash;
use jsonrpc_macros::pubsub::{Subscriber, SubscriptionId};
use leader_scheduler::LeaderScheduler;
use ledger;
use logger;
use signature::Keypair;
@ -1429,7 +1479,8 @@ mod tests {
let mint = Mint::new(1);
let genesis = mint.create_entries();
let bank = Bank::default();
bank.process_ledger(genesis).unwrap();
bank.process_ledger(genesis, &mut LeaderScheduler::default())
.unwrap();
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}
@ -1487,7 +1538,9 @@ mod tests {
let (ledger, pubkey) = create_sample_ledger(1);
let (ledger, dup) = ledger.tee();
let bank = Bank::default();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
let (ledger_height, tail) = bank
.process_ledger(ledger, &mut LeaderScheduler::default())
.unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, 3);
assert_eq!(tail.len(), 3);
@ -1509,7 +1562,9 @@ mod tests {
for entry_count in window_size - 3..window_size + 2 {
let (ledger, pubkey) = create_sample_ledger(entry_count);
let bank = Bank::default();
let (ledger_height, tail) = bank.process_ledger(ledger).unwrap();
let (ledger_height, tail) = bank
.process_ledger(ledger, &mut LeaderScheduler::default())
.unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, entry_count as u64 + 2);
assert!(tail.len() <= window_size);
@ -1534,7 +1589,8 @@ mod tests {
let ledger = to_file_iter(ledger);
let bank = Bank::default();
bank.process_ledger(ledger).unwrap();
bank.process_ledger(ledger, &mut LeaderScheduler::default())
.unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
}
@ -1545,7 +1601,8 @@ mod tests {
let block = to_file_iter(create_sample_block(&mint, 1));
let bank = Bank::default();
bank.process_ledger(genesis.chain(block)).unwrap();
bank.process_ledger(genesis.chain(block), &mut LeaderScheduler::default())
.unwrap();
assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}
@ -1568,9 +1625,13 @@ mod tests {
let ledger1 = create_sample_ledger_with_mint_and_keypairs(&mint, &keypairs);
let bank0 = Bank::default();
bank0.process_ledger(ledger0).unwrap();
bank0
.process_ledger(ledger0, &mut LeaderScheduler::default())
.unwrap();
let bank1 = Bank::default();
bank1.process_ledger(ledger1).unwrap();
bank1
.process_ledger(ledger1, &mut LeaderScheduler::default())
.unwrap();
let initial_state = bank0.hash_internal_state();
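
For reference, the new process_ledger signature is exercised the same way the updated tests above use it. A minimal sketch, assuming the in-tree Mint genesis helper:

use solana::bank::Bank;
use solana::leader_scheduler::LeaderScheduler;
use solana::mint::Mint;

// Replaying a ledger now threads a LeaderScheduler through, so any votes found
// in the entries can update the active validator set while the bank rebuilds
// its state (see Bank::process_entry_votes above).
fn replay_genesis() {
    let mint = Mint::new(1);
    let genesis = mint.create_entries();
    let bank = Bank::default();
    // LeaderScheduler::default() is the bootstrap-leader-only strategy, so
    // vote tracking is effectively a no-op in this sketch.
    let mut leader_scheduler = LeaderScheduler::default();
    let (_entry_height, _tail) = bank
        .process_ledger(genesis, &mut leader_scheduler)
        .unwrap();
    assert_eq!(bank.get_balance(&mint.pubkey()), 1);
}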


@ -84,7 +84,6 @@ impl BankingStage {
// Many banks that process transactions in parallel.
let mut thread_hdls: Vec<JoinHandle<()>> = (0..NUM_THREADS)
.into_iter()
.map(|_| {
let thread_bank = bank.clone();
let thread_verified_receiver = shared_verified_receiver.clone();


@ -12,6 +12,7 @@ use solana::client::mk_client;
use solana::cluster_info::Node;
use solana::drone::DRONE_PORT;
use solana::fullnode::{Config, Fullnode, FullnodeReturnType};
use solana::leader_scheduler::LeaderScheduler;
use solana::logger;
use solana::metrics::set_panic_hook;
use solana::signature::{Keypair, KeypairUtil};
@ -83,9 +84,6 @@ fn main() -> () {
let node_info = node.info.clone();
let pubkey = keypair.pubkey();
let mut fullnode = Fullnode::new(node, ledger_path, keypair, network, false, None);
// airdrop stuff, probably goes away at some point
let leader = match network {
Some(network) => {
poll_gossip_for_leader(network, None).expect("can't find leader on network")
@ -93,6 +91,14 @@ fn main() -> () {
None => node_info,
};
let mut fullnode = Fullnode::new(
node,
ledger_path,
keypair,
network,
false,
LeaderScheduler::from_bootstrap_leader(leader.id),
);
let mut client = mk_client(&leader);
// TODO: maybe have the drone put itself in gossip somewhere instead of hardcoding?
@ -126,7 +132,8 @@ fn main() -> () {
loop {
let status = fullnode.handle_role_transition();
match status {
Ok(Some(FullnodeReturnType::LeaderRotation)) => (),
Ok(Some(FullnodeReturnType::LeaderToValidatorRotation)) => (),
Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation)) => (),
_ => {
// Fullnode tpu/tvu exited for some unexpected
// reason, so exit


@ -5,6 +5,7 @@ extern crate solana;
use clap::{App, Arg, SubCommand};
use solana::bank::Bank;
use solana::leader_scheduler::LeaderScheduler;
use solana::ledger::{read_ledger, verify_ledger};
use solana::logger;
use std::io::{stdout, Write};
@ -116,8 +117,7 @@ fn main() {
};
let genesis = genesis.take(2).map(|e| e.unwrap());
if let Err(e) = bank.process_ledger(genesis) {
if let Err(e) = bank.process_ledger(genesis, &mut LeaderScheduler::default()) {
eprintln!("verify failed at genesis err: {:?}", e);
if !matches.is_present("continue") {
exit(1);


@ -5,6 +5,7 @@ use counter::Counter;
use entry::Entry;
#[cfg(feature = "erasure")]
use erasure;
use leader_scheduler::LeaderScheduler;
use ledger::Block;
use log::Level;
use packet::SharedBlobs;
@ -26,9 +27,9 @@ pub enum BroadcastStageReturnType {
ChannelDisconnected,
}
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
fn broadcast(
cluster_info: &Arc<RwLock<ClusterInfo>>,
leader_rotation_interval: u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
@ -130,8 +131,7 @@ fn broadcast(
// Send blobs out from the window
ClusterInfo::broadcast(
cluster_info,
leader_rotation_interval,
&leader_scheduler,
&node_info,
&broadcast_table,
&window,
@ -183,38 +183,18 @@ impl BroadcastStage {
window: &SharedWindow,
entry_height: u64,
receiver: &Receiver<Vec<Entry>>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> BroadcastStageReturnType {
let mut transmit_index = WindowIndex {
data: entry_height,
coding: entry_height,
};
let mut receive_index = entry_height;
let me;
let leader_rotation_interval;
{
let rcluster_info = cluster_info.read().unwrap();
me = rcluster_info.my_data().clone();
leader_rotation_interval = rcluster_info.get_leader_rotation_interval();
}
let me = cluster_info.read().unwrap().my_data().clone();
loop {
if transmit_index.data % (leader_rotation_interval as u64) == 0 {
let rcluster_info = cluster_info.read().unwrap();
let my_id = rcluster_info.my_data().id;
match rcluster_info.get_scheduled_leader(transmit_index.data) {
Some(id) if id == my_id => (),
// If the leader stays in power for the next
// round as well, then we don't exit. Otherwise, exit.
_ => {
return BroadcastStageReturnType::LeaderRotation;
}
}
}
let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast(
cluster_info,
leader_rotation_interval,
leader_scheduler,
&me,
&broadcast_table,
&window,
@ -259,13 +239,21 @@ impl BroadcastStage {
window: SharedWindow,
entry_height: u64,
receiver: Receiver<Vec<Entry>>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
exit_sender: Arc<AtomicBool>,
) -> Self {
let thread_hdl = Builder::new()
.name("solana-broadcaster".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_sender);
Self::run(&sock, &cluster_info, &window, entry_height, &receiver)
Self::run(
&sock,
&cluster_info,
&window,
entry_height,
&receiver,
&leader_scheduler,
)
}).unwrap();
BroadcastStage { thread_hdl }
@ -279,151 +267,3 @@ impl Service for BroadcastStage {
self.thread_hdl.join()
}
}
#[cfg(test)]
mod tests {
use broadcast_stage::{BroadcastStage, BroadcastStageReturnType};
use cluster_info::{ClusterInfo, Node};
use entry::Entry;
use ledger::next_entries_mut;
use mint::Mint;
use service::Service;
use signature::{Keypair, KeypairUtil};
use solana_program_interface::pubkey::Pubkey;
use std::cmp;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, RwLock};
use window::{new_window_from_entries, SharedWindow};
struct DummyBroadcastStage {
my_id: Pubkey,
buddy_id: Pubkey,
broadcast_stage: BroadcastStage,
shared_window: SharedWindow,
entry_sender: Sender<Vec<Entry>>,
cluster_info: Arc<RwLock<ClusterInfo>>,
entries: Vec<Entry>,
}
fn setup_dummy_broadcast_stage(leader_rotation_interval: u64) -> DummyBroadcastStage {
// Setup dummy leader info
let leader_keypair = Keypair::new();
let my_id = leader_keypair.pubkey();
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
// Give the leader somebody to broadcast to so he isn't lonely
let buddy_keypair = Keypair::new();
let buddy_id = buddy_keypair.pubkey();
let broadcast_buddy = Node::new_localhost_with_pubkey(buddy_keypair.pubkey());
// Fill the cluster_info with the buddy's info
let mut cluster_info =
ClusterInfo::new(leader_info.info.clone()).expect("ClusterInfo::new");
cluster_info.insert(&broadcast_buddy.info);
cluster_info.set_leader_rotation_interval(leader_rotation_interval);
let cluster_info = Arc::new(RwLock::new(cluster_info));
// Make dummy initial entries
let mint = Mint::new(10000);
let entries = mint.create_entries();
let entry_height = entries.len() as u64;
// Setup a window
let window = new_window_from_entries(&entries, entry_height, &leader_info.info);
let shared_window = Arc::new(RwLock::new(window));
let (entry_sender, entry_receiver) = channel();
let exit_sender = Arc::new(AtomicBool::new(false));
// Start up the broadcast stage
let broadcast_stage = BroadcastStage::new(
leader_info.sockets.broadcast,
cluster_info.clone(),
shared_window.clone(),
entry_height,
entry_receiver,
exit_sender,
);
DummyBroadcastStage {
my_id,
buddy_id,
broadcast_stage,
shared_window,
entry_sender,
cluster_info,
entries,
}
}
fn find_highest_window_index(shared_window: &SharedWindow) -> u64 {
let window = shared_window.read().unwrap();
window.iter().fold(0, |m, w_slot| {
if let Some(ref blob) = w_slot.data {
cmp::max(m, blob.read().unwrap().get_index().unwrap())
} else {
m
}
})
}
#[test]
fn test_broadcast_stage_leader_rotation_exit() {
let leader_rotation_interval = 10;
let broadcast_info = setup_dummy_broadcast_stage(leader_rotation_interval);
{
let mut wcluster_info = broadcast_info.cluster_info.write().unwrap();
// Set the leader for the next rotation to be myself
wcluster_info.set_scheduled_leader(leader_rotation_interval, broadcast_info.my_id);
}
let genesis_len = broadcast_info.entries.len() as u64;
let mut last_id = broadcast_info
.entries
.last()
.expect("Ledger should not be empty")
.id;
let mut num_hashes = 0;
// Input enough entries to make exactly leader_rotation_interval entries, which will
// trigger a check for leader rotation. Because the next scheduled leader
// is ourselves, we won't exit
for _ in genesis_len..leader_rotation_interval {
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
broadcast_info.entry_sender.send(new_entry).unwrap();
}
// Set the scheduled next leader in the cluster_info to the other buddy on the network
broadcast_info
.cluster_info
.write()
.unwrap()
.set_scheduled_leader(2 * leader_rotation_interval, broadcast_info.buddy_id);
// Input another leader_rotation_interval dummy entries, which will take us
// past the point of the leader rotation. The write_stage will see that
// it's no longer the leader after checking the cluster_info, and exit
for _ in 0..leader_rotation_interval {
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
match broadcast_info.entry_sender.send(new_entry) {
// We disconnected, break out of loop and check the results
Err(_) => break,
_ => (),
};
}
// Make sure the threads closed cleanly
assert_eq!(
broadcast_info.broadcast_stage.join().unwrap(),
BroadcastStageReturnType::LeaderRotation
);
let highest_index = find_highest_window_index(&broadcast_info.shared_window);
// The blob index is zero indexed, so it will always be one behind the entry height
// which starts at one.
assert_eq!(highest_index, 2 * leader_rotation_interval - 1);
}
}


@ -17,6 +17,7 @@ use budget_instruction::Vote;
use choose_gossip_peer_strategy::{ChooseGossipPeerStrategy, ChooseWeightedPeerStrategy};
use counter::Counter;
use hash::Hash;
use leader_scheduler::LeaderScheduler;
use ledger::LedgerWindow;
use log::Level;
use netutil::{bind_in_range, bind_to, multi_bind_in_range};
@ -223,12 +224,6 @@ pub struct ClusterInfo {
/// last time we heard from anyone getting a message from this public key
/// these are rumors and shouldn't be trusted directly
external_liveness: HashMap<Pubkey, HashMap<Pubkey, u64>>,
/// TODO: Clearly not the correct implementation of this, but a temporary abstraction
/// for testing
pub scheduled_leaders: HashMap<u64, Pubkey>,
// TODO: Is there a better way to do this? We didn't make this a constant because
// we want to be able to set it in integration tests so that the tests don't time out.
pub leader_rotation_interval: u64,
}
// TODO These messages should be signed, and go through the gpu pipeline for spam filtering
@ -260,8 +255,6 @@ impl ClusterInfo {
external_liveness: HashMap::new(),
id: node_info.id,
update_index: 1,
scheduled_leaders: HashMap::new(),
leader_rotation_interval: 100,
};
me.local.insert(node_info.id, me.update_index);
me.table.insert(node_info.id, node_info);
@ -324,32 +317,10 @@ impl ClusterInfo {
self.insert(&me);
}
// TODO: Dummy leader scheduler, need to implement actual leader scheduling.
pub fn get_scheduled_leader(&self, entry_height: u64) -> Option<Pubkey> {
match self.scheduled_leaders.get(&entry_height) {
Some(x) => Some(*x),
None => Some(self.my_data().leader_id),
}
}
pub fn set_leader_rotation_interval(&mut self, leader_rotation_interval: u64) {
self.leader_rotation_interval = leader_rotation_interval;
}
pub fn get_leader_rotation_interval(&self) -> u64 {
self.leader_rotation_interval
}
// TODO: Dummy leader schedule setter, need to implement actual leader scheduling.
pub fn set_scheduled_leader(&mut self, entry_height: u64, new_leader_id: Pubkey) -> () {
self.scheduled_leaders.insert(entry_height, new_leader_id);
}
pub fn get_valid_peers(&self) -> Vec<NodeInfo> {
let me = self.my_data().id;
self.table
.values()
.into_iter()
.filter(|x| x.id != me)
.filter(|x| ClusterInfo::is_valid_address(&x.contact_info.rpu))
.cloned()
@ -515,9 +486,9 @@ impl ClusterInfo {
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
/// We need to avoid having obj locked while doing any io, such as the `send_to`
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn broadcast(
cluster_info: &Arc<RwLock<ClusterInfo>>,
leader_rotation_interval: u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
me: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
@ -562,11 +533,21 @@ impl ClusterInfo {
// Make sure the next leader in line knows about the last entry before rotation
// so he can initiate repairs if necessary
let entry_height = idx + 1;
if entry_height % leader_rotation_interval == 0 {
let next_leader_id = cluster_info
.read()
.unwrap()
.get_scheduled_leader(entry_height);
{
let ls_lock = leader_scheduler.read().unwrap();
let next_leader_id = ls_lock.get_scheduled_leader(entry_height);
// In the case the next scheduled leader is None, then the write_stage moved
// the schedule too far ahead and we no longer are in the known window
// (will happen during calculation of the next set of slots every epoch or
// seed_rotation_interval heights when we move the window forward in the
// LeaderScheduler). For correctness, this is fine; write_stage will never send
// blobs past the point of when this node should stop being leader, so we just
// continue broadcasting until we catch up to write_stage. The downside is we
// can't guarantee the current leader will broadcast the last entry to the next
// scheduled leader, so the next leader will have to rely on avalanche/repairs
// to get this last blob, which could cause slowdowns during leader handoffs.
// See corresponding issue for repairs in repair() function in window.rs.
if next_leader_id.is_some() && next_leader_id != Some(me.id) {
let info_result = broadcast_table
.iter()
@ -842,8 +823,8 @@ impl ClusterInfo {
blob_sender.send(vec![blob])?;
Ok(())
}
/// TODO: This is obviously the wrong way to do this. Need to implement leader selection
fn top_leader(&self) -> Option<Pubkey> {
pub fn get_gossip_top_leader(&self) -> Option<&NodeInfo> {
let mut table = HashMap::new();
let def = Pubkey::default();
let cur = self.table.values().filter(|x| x.leader_id != def);
@ -857,16 +838,12 @@ impl ClusterInfo {
trace!("{}: sorted leaders {} votes: {}", self.id, x.0, x.1);
}
sorted.sort_by_key(|a| a.1);
sorted.last().map(|a| *a.0)
}
let top_leader = sorted.last().map(|a| *a.0);
/// TODO: This is obviously the wrong way to do this. Need to implement leader selection
/// A t-shirt for the first person to actually use this bad behavior to attack the alpha testnet
fn update_leader(&mut self) {
if let Some(leader_id) = self.top_leader() {
if self.my_data().leader_id != leader_id && self.table.get(&leader_id).is_some() {
self.set_leader(leader_id);
}
if let Some(l) = top_leader {
self.table.get(&l)
} else {
None
}
}
@ -936,7 +913,6 @@ impl ClusterInfo {
obj.write().unwrap().purge(timestamp());
//TODO: possibly tune this parameter
//we saw a deadlock passing an obj.read().unwrap().timeout into sleep
obj.write().unwrap().update_leader();
let elapsed = timestamp() - start;
if GOSSIP_SLEEP_MILLIS > elapsed {
let time_left = GOSSIP_SLEEP_MILLIS - elapsed;
@ -1842,32 +1818,6 @@ mod tests {
assert_eq!(blob.get_id().unwrap(), id);
}
}
/// TODO: This is obviously the wrong way to do this. Need to implement leader selection,
/// delete this test after leader selection is correctly implemented
#[test]
fn test_update_leader() {
logger::setup();
let me = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let leader0 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let leader1 = NodeInfo::new_with_socketaddr(&socketaddr!("127.0.0.1:1234"));
let mut cluster_info = ClusterInfo::new(me.clone()).expect("ClusterInfo::new");
assert_eq!(cluster_info.top_leader(), None);
cluster_info.set_leader(leader0.id);
assert_eq!(cluster_info.top_leader().unwrap(), leader0.id);
//add a bunch of nodes with a new leader
for _ in 0..10 {
let mut dum = NodeInfo::new_entry_point(&socketaddr!("127.0.0.1:1234"));
dum.id = Keypair::new().pubkey();
dum.leader_id = leader1.id;
cluster_info.insert(&dum);
}
assert_eq!(cluster_info.top_leader().unwrap(), leader1.id);
cluster_info.update_leader();
assert_eq!(cluster_info.my_data().leader_id, leader0.id);
cluster_info.insert(&leader1);
cluster_info.update_leader();
assert_eq!(cluster_info.my_data().leader_id, leader1.id);
}
#[test]
fn test_valid_last_ids() {


@ -227,6 +227,7 @@ mod tests {
use cluster_info::Node;
use drone::{Drone, DroneRequest, REQUEST_CAP, TIME_SLICE};
use fullnode::Fullnode;
use leader_scheduler::LeaderScheduler;
use logger;
use mint::Mint;
use netutil::get_ip_addr;
@ -338,7 +339,7 @@ mod tests {
None,
&ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
Some(0),
);
@ -376,7 +377,14 @@ mod tests {
let leader_keypair = Keypair::new();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.info.clone();
let server = Fullnode::new(leader, &ledger_path, leader_keypair, None, false, None);
let server = Fullnode::new(
leader,
&ledger_path,
leader_keypair,
None,
false,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
);
let requests_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket");
let transactions_socket =


@ -5,13 +5,13 @@ use broadcast_stage::BroadcastStage;
use cluster_info::{ClusterInfo, Node, NodeInfo};
use drone::DRONE_PORT;
use entry::Entry;
use leader_scheduler::LeaderScheduler;
use ledger::read_ledger;
use ncp::Ncp;
use rpc::{JsonRpcService, RPC_PORT};
use rpu::Rpu;
use service::Service;
use signature::{Keypair, KeypairUtil};
use solana_program_interface::pubkey::Pubkey;
use std::net::UdpSocket;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
@ -45,6 +45,10 @@ impl LeaderServices {
self.tpu.join()
}
pub fn is_exited(&self) -> bool {
self.tpu.is_exited()
}
pub fn exit(&self) -> () {
self.tpu.exit();
}
@ -63,17 +67,23 @@ impl ValidatorServices {
self.tvu.join()
}
pub fn is_exited(&self) -> bool {
self.tvu.is_exited()
}
pub fn exit(&self) -> () {
self.tvu.exit()
}
}
pub enum FullnodeReturnType {
LeaderRotation,
LeaderToValidatorRotation,
ValidatorToLeaderRotation,
}
pub struct Fullnode {
pub node_role: Option<NodeRole>,
pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
keypair: Arc<Keypair>,
exit: Arc<AtomicBool>,
rpu: Option<Rpu>,
@ -122,10 +132,11 @@ impl Fullnode {
keypair: Keypair,
leader_addr: Option<SocketAddr>,
sigverify_disabled: bool,
leader_rotation_interval: Option<u64>,
mut leader_scheduler: LeaderScheduler,
) -> Self {
info!("creating bank...");
let (bank, entry_height, ledger_tail) = Self::new_bank_from_ledger(ledger_path);
let (bank, entry_height, ledger_tail) =
Self::new_bank_from_ledger(ledger_path, &mut leader_scheduler);
info!("creating networking stack...");
let local_gossip_addr = node.sockets.gossip.local_addr().unwrap();
@ -147,7 +158,7 @@ impl Fullnode {
leader_info.as_ref(),
ledger_path,
sigverify_disabled,
leader_rotation_interval,
leader_scheduler,
None,
);
@ -225,16 +236,13 @@ impl Fullnode {
bank: Bank,
entry_height: u64,
ledger_tail: &[Entry],
mut node: Node,
leader_info: Option<&NodeInfo>,
node: Node,
bootstrap_leader_info_option: Option<&NodeInfo>,
ledger_path: &str,
sigverify_disabled: bool,
leader_rotation_interval: Option<u64>,
leader_scheduler: LeaderScheduler,
rpc_port: Option<u16>,
) -> Self {
if leader_info.is_none() {
node.info.leader_id = node.info.id;
}
let exit = Arc::new(AtomicBool::new(false));
let bank = Arc::new(bank);
@ -269,12 +277,9 @@ impl Fullnode {
let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info);
let shared_window = Arc::new(RwLock::new(window));
let mut cluster_info = ClusterInfo::new(node.info).expect("ClusterInfo::new");
if let Some(interval) = leader_rotation_interval {
cluster_info.set_leader_rotation_interval(interval);
}
let cluster_info = Arc::new(RwLock::new(cluster_info));
let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(node.info).expect("ClusterInfo::new"),
));
let ncp = Ncp::new(
&cluster_info,
@ -284,70 +289,83 @@ impl Fullnode {
exit.clone(),
);
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
let keypair = Arc::new(keypair);
let node_role;
match leader_info {
Some(leader_info) => {
// Start in validator mode.
// TODO: let ClusterInfo get that data from the network?
cluster_info.write().unwrap().insert(leader_info);
let tvu = Tvu::new(
keypair.clone(),
&bank,
entry_height,
cluster_info.clone(),
shared_window.clone(),
node.sockets
.replicate
.iter()
.map(|s| s.try_clone().expect("Failed to clone replicate sockets"))
.collect(),
node.sockets
.repair
.try_clone()
.expect("Failed to clone repair socket"),
node.sockets
.retransmit
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(ledger_path),
);
let validator_state = ValidatorServices::new(tvu);
node_role = Some(NodeRole::Validator(validator_state));
}
None => {
// Start in leader mode.
let (tpu, entry_receiver, tpu_exit) = Tpu::new(
keypair.clone(),
&bank,
&cluster_info,
Default::default(),
node.sockets
.transaction
.iter()
.map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
.collect(),
ledger_path,
sigverify_disabled,
entry_height,
);
let broadcast_stage = BroadcastStage::new(
node.sockets
.broadcast
.try_clone()
.expect("Failed to clone broadcast socket"),
cluster_info.clone(),
shared_window.clone(),
entry_height,
entry_receiver,
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
node_role = Some(NodeRole::Leader(leader_state));
}
// Insert the bootstrap leader info, should only be None if this node
// is the bootstrap leader
if let Some(bootstrap_leader_info) = bootstrap_leader_info_option {
cluster_info.write().unwrap().insert(bootstrap_leader_info);
}
// Get the scheduled leader
let scheduled_leader = leader_scheduler
.read()
.unwrap()
.get_scheduled_leader(entry_height)
.expect("Leader not known after processing bank");
cluster_info.write().unwrap().set_leader(scheduled_leader);
let node_role = if scheduled_leader != keypair.pubkey() {
// Start in validator mode.
let tvu = Tvu::new(
keypair.clone(),
&bank,
entry_height,
cluster_info.clone(),
shared_window.clone(),
node.sockets
.replicate
.iter()
.map(|s| s.try_clone().expect("Failed to clone replicate sockets"))
.collect(),
node.sockets
.repair
.try_clone()
.expect("Failed to clone repair socket"),
node.sockets
.retransmit
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(ledger_path),
leader_scheduler.clone(),
);
let validator_state = ValidatorServices::new(tvu);
Some(NodeRole::Validator(validator_state))
} else {
// Start in leader mode.
let (tpu, entry_receiver, tpu_exit) = Tpu::new(
keypair.clone(),
&bank,
&cluster_info,
Default::default(),
node.sockets
.transaction
.iter()
.map(|s| s.try_clone().expect("Failed to clone transaction sockets"))
.collect(),
ledger_path,
sigverify_disabled,
entry_height,
leader_scheduler.clone(),
);
let broadcast_stage = BroadcastStage::new(
node.sockets
.broadcast
.try_clone()
.expect("Failed to clone broadcast socket"),
cluster_info.clone(),
shared_window.clone(),
entry_height,
entry_receiver,
leader_scheduler.clone(),
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
Some(NodeRole::Leader(leader_state))
};
Fullnode {
keypair,
cluster_info,
@ -367,25 +385,35 @@ impl Fullnode {
broadcast_socket: node.sockets.broadcast,
requests_socket: node.sockets.requests,
respond_socket: node.sockets.respond,
leader_scheduler,
}
}
fn leader_to_validator(&mut self) -> Result<()> {
// TODO: We can avoid building the bank again once RecordStage is
// integrated with BankingStage
let (bank, entry_height, _) = Self::new_bank_from_ledger(&self.ledger_path);
self.bank = Arc::new(bank);
let (scheduled_leader, entry_height) = {
let mut ls_lock = self.leader_scheduler.write().unwrap();
// Clear the leader scheduler
ls_lock.reset();
{
let mut wcluster_info = self.cluster_info.write().unwrap();
let scheduled_leader = wcluster_info.get_scheduled_leader(entry_height);
match scheduled_leader {
//TODO: Handle the case where we don't know who the next
//scheduled leader is
None => (),
Some(leader_id) => wcluster_info.set_leader(leader_id),
}
}
// TODO: We can avoid building the bank again once RecordStage is
// integrated with BankingStage
let (bank, entry_height, _) =
Self::new_bank_from_ledger(&self.ledger_path, &mut *ls_lock);
self.bank = Arc::new(bank);
(
ls_lock
.get_scheduled_leader(entry_height)
.expect("Scheduled leader should exist after rebuilding bank"),
entry_height,
)
};
self.cluster_info
.write()
.unwrap()
.set_leader(scheduled_leader);
// Make a new RPU to serve requests out of the new bank we've created
// instead of the old one
@ -420,6 +448,7 @@ impl Fullnode {
.try_clone()
.expect("Failed to clone retransmit socket"),
Some(&self.ledger_path),
self.leader_scheduler.clone(),
);
let validator_state = ValidatorServices::new(tvu);
self.node_role = Some(NodeRole::Validator(validator_state));
@ -443,6 +472,7 @@ impl Fullnode {
&self.ledger_path,
self.sigverify_disabled,
entry_height,
self.leader_scheduler.clone(),
);
let broadcast_stage = BroadcastStage::new(
@ -453,26 +483,35 @@ impl Fullnode {
self.shared_window.clone(),
entry_height,
blob_receiver,
self.leader_scheduler.clone(),
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
self.node_role = Some(NodeRole::Leader(leader_state));
}
pub fn check_role_exited(&self) -> bool {
match self.node_role {
Some(NodeRole::Leader(ref leader_services)) => leader_services.is_exited(),
Some(NodeRole::Validator(ref validator_services)) => validator_services.is_exited(),
None => false,
}
}
pub fn handle_role_transition(&mut self) -> Result<Option<FullnodeReturnType>> {
let node_role = self.node_role.take();
match node_role {
Some(NodeRole::Leader(leader_services)) => match leader_services.join()? {
Some(TpuReturnType::LeaderRotation) => {
self.leader_to_validator()?;
Ok(Some(FullnodeReturnType::LeaderRotation))
Ok(Some(FullnodeReturnType::LeaderToValidatorRotation))
}
_ => Ok(None),
},
Some(NodeRole::Validator(validator_services)) => match validator_services.join()? {
Some(TvuReturnType::LeaderRotation(entry_height)) => {
self.validator_to_leader(entry_height);
Ok(Some(FullnodeReturnType::LeaderRotation))
Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation))
}
_ => Ok(None),
},
@ -498,22 +537,18 @@ impl Fullnode {
self.join()
}
// TODO: only used for testing, get rid of this once we have actual
// leader scheduling
pub fn set_scheduled_leader(&self, leader_id: Pubkey, entry_height: u64) {
self.cluster_info
.write()
.unwrap()
.set_scheduled_leader(entry_height, leader_id);
}
fn new_bank_from_ledger(ledger_path: &str) -> (Bank, u64, Vec<Entry>) {
pub fn new_bank_from_ledger(
ledger_path: &str,
leader_scheduler: &mut LeaderScheduler,
) -> (Bank, u64, Vec<Entry>) {
let bank = Bank::new_default(false);
let entries = read_ledger(ledger_path, true).expect("opening ledger");
let entries = entries
.map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err)));
info!("processing ledger...");
let (entry_height, ledger_tail) = bank.process_ledger(entries).expect("process_ledger");
let (entry_height, ledger_tail) = bank
.process_ledger(entries, leader_scheduler)
.expect("process_ledger");
// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
info!("processed {} ledger...", entry_height);
@ -534,12 +569,12 @@ impl Service for Fullnode {
match self.node_role {
Some(NodeRole::Validator(validator_service)) => {
if let Some(TvuReturnType::LeaderRotation(_)) = validator_service.join()? {
return Ok(Some(FullnodeReturnType::LeaderRotation));
return Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation));
}
}
Some(NodeRole::Leader(leader_service)) => {
if let Some(TpuReturnType::LeaderRotation) = leader_service.join()? {
return Ok(Some(FullnodeReturnType::LeaderRotation));
return Ok(Some(FullnodeReturnType::LeaderToValidatorRotation));
}
}
_ => (),
@ -554,7 +589,8 @@ mod tests {
use bank::Bank;
use cluster_info::Node;
use fullnode::{Fullnode, NodeRole, TvuReturnType};
use ledger::genesis;
use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use ledger::{genesis, LedgerWriter};
use packet::make_consecutive_blobs;
use service::Service;
use signature::{Keypair, KeypairUtil};
@ -581,7 +617,7 @@ mod tests {
Some(&entry),
&validator_ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(entry.id),
Some(0),
);
v.close().unwrap();
@ -609,7 +645,7 @@ mod tests {
Some(&entry),
&validator_ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(entry.id),
Some(0),
)
}).collect();
@ -627,6 +663,87 @@ mod tests {
}
}
#[test]
fn test_wrong_role_transition() {
// Create the leader node information
let bootstrap_leader_keypair = Keypair::new();
let bootstrap_leader_node =
Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey());
let bootstrap_leader_info = bootstrap_leader_node.info.clone();
// Create the validator node information
let validator_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
// Make a common mint and a genesis entry for both leader + validator's ledgers
let (mint, bootstrap_leader_ledger_path) = genesis("test_wrong_role_transition", 10_000);
let genesis_entries = mint.create_entries();
let last_id = genesis_entries
.last()
.expect("expected at least one genesis entry")
.id;
// Write the entries to the ledger that will cause leader rotation
// after the bootstrap height
let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap();
let first_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id);
let ledger_initial_len = (genesis_entries.len() + first_entries.len()) as u64;
ledger_writer.write_entries(first_entries).unwrap();
// Create the common leader scheduling configuration
let num_slots_per_epoch = 3;
let leader_rotation_interval = 5;
let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval;
// Set the bootstrap height exactly the current ledger length, so that we can
// test if the bootstrap leader knows to immediately transition to a validator
// after parsing the ledger during startup
let bootstrap_height = ledger_initial_len;
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_leader_info.id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(ledger_initial_len),
);
// Test that a node knows to transition to a validator based on parsing the ledger
let bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
bootstrap_leader_keypair,
Some(bootstrap_leader_info.contact_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
);
match bootstrap_leader.node_role {
Some(NodeRole::Validator(_)) => (),
_ => {
panic!("Expected bootstrap leader to be a validator");
}
}
// Test that a node knows to transition to a leader based on parsing the ledger
let validator = Fullnode::new(
validator_node,
&bootstrap_leader_ledger_path,
validator_keypair,
Some(bootstrap_leader_info.contact_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
);
match validator.node_role {
Some(NodeRole::Leader(_)) => (),
_ => {
panic!("Expected node to be the leader");
}
}
}
#[test]
fn test_validator_to_leader_transition() {
// Make a leader identity
@ -635,29 +752,54 @@ mod tests {
let leader_id = leader_node.info.id;
let leader_ncp = leader_node.info.contact_info.ncp;
// Start the validator node
let leader_rotation_interval = 10;
// Create validator identity
let (mint, validator_ledger_path) = genesis("test_validator_to_leader_transition", 10_000);
let validator_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
let validator_info = validator_node.info.clone();
let genesis_entries = mint.create_entries();
let mut last_id = genesis_entries
.last()
.expect("expected at least one genesis entry")
.id;
// Write two entries so that the validator is in the active set:
//
// 1) Give the validator a nonzero number of tokens
// Write the bootstrap entries to the ledger that will cause leader rotation
// after the bootstrap height
//
// 2) A vote from the validator
let mut ledger_writer = LedgerWriter::open(&validator_ledger_path, false).unwrap();
let bootstrap_entries =
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id);
let bootstrap_entries_len = bootstrap_entries.len();
last_id = bootstrap_entries.last().unwrap().id;
ledger_writer.write_entries(bootstrap_entries).unwrap();
let ledger_initial_len = (genesis_entries.len() + bootstrap_entries_len) as u64;
// Set the leader scheduler for the validator
let leader_rotation_interval = 10;
let num_bootstrap_slots = 2;
let bootstrap_height = num_bootstrap_slots * leader_rotation_interval;
let leader_scheduler_config = LeaderSchedulerConfig::new(
leader_id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(leader_rotation_interval * 2),
Some(bootstrap_height),
);
// Start the validator
let mut validator = Fullnode::new(
validator_node,
&validator_ledger_path,
validator_keypair,
Some(leader_ncp),
false,
Some(leader_rotation_interval),
);
// Set the leader schedule for the validator
let my_leader_begin_epoch = 2;
for i in 0..my_leader_begin_epoch {
validator.set_scheduled_leader(leader_id, leader_rotation_interval * i);
}
validator.set_scheduled_leader(
validator_info.id,
my_leader_begin_epoch * leader_rotation_interval,
LeaderScheduler::new(&leader_scheduler_config),
);
// Send blobs to the validator from our mock leader
@ -679,19 +821,17 @@ mod tests {
// Send the blobs out of order, in reverse. Also send an extra
// "extra_blobs" number of blobs to make sure the window stops in the right place.
let extra_blobs = cmp::max(leader_rotation_interval / 3, 1);
let total_blobs_to_send =
my_leader_begin_epoch * leader_rotation_interval + extra_blobs;
let genesis_entries = mint.create_entries();
let last_id = genesis_entries
.last()
.expect("expected at least one genesis entry")
.id;
let total_blobs_to_send = bootstrap_height + extra_blobs;
let tvu_address = &validator_info.contact_info.tvu;
let msgs =
make_consecutive_blobs(leader_id, total_blobs_to_send, last_id, &tvu_address)
.into_iter()
.rev()
.collect();
let msgs = make_consecutive_blobs(
leader_id,
total_blobs_to_send,
ledger_initial_len,
last_id,
&tvu_address,
).into_iter()
.rev()
.collect();
s_responder.send(msgs).expect("send");
t_responder
};
@ -705,22 +845,21 @@ mod tests {
.expect("Expected successful validator join");
assert_eq!(
join_result,
Some(TvuReturnType::LeaderRotation(
my_leader_begin_epoch * leader_rotation_interval
))
Some(TvuReturnType::LeaderRotation(bootstrap_height))
);
}
_ => panic!("Role should not be leader"),
}
// Check the validator ledger to make sure it's the right height
let (_, entry_height, _) = Fullnode::new_bank_from_ledger(&validator_ledger_path);
assert_eq!(
entry_height,
my_leader_begin_epoch * leader_rotation_interval
// Check the validator ledger to make sure it's the right height, we should've
// transitioned after the bootstrap_height entry
let (_, entry_height, _) = Fullnode::new_bank_from_ledger(
&validator_ledger_path,
&mut LeaderScheduler::new(&leader_scheduler_config),
);
assert_eq!(entry_height, bootstrap_height);
// Shut down
t_responder.join().expect("responder thread join");
validator.close().unwrap();
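
The fullnode tests above prepare their ledgers so that the node under test is already in the scheduler's active set at startup. A condensed sketch of that setup, assuming the test helpers used above (genesis, LedgerWriter, make_active_set_entries); write_bootstrap_ledger and its name are illustrative only:

use solana::leader_scheduler::make_active_set_entries;
use solana::ledger::{genesis, LedgerWriter};
use solana::signature::Keypair;

// Illustrative helper (not in the patch): create a genesis ledger, then append
// the two entries that make `validator_keypair` eligible for leader selection,
// i.e. a token transfer to it followed by a vote from it.
fn write_bootstrap_ledger(validator_keypair: &Keypair) -> (String, u64) {
    let (mint, ledger_path) = genesis("bootstrap_ledger_sketch", 10_000);
    let genesis_entries = mint.create_entries();
    let last_id = genesis_entries
        .last()
        .expect("expected at least one genesis entry")
        .id;

    let active_set_entries = make_active_set_entries(validator_keypair, &mint.keypair(), &last_id);
    let ledger_len = (genesis_entries.len() + active_set_entries.len()) as u64;

    let mut writer = LedgerWriter::open(&ledger_path, false).unwrap();
    writer.write_entries(active_set_entries).unwrap();

    // Setting bootstrap_height equal to ledger_len (as test_wrong_role_transition
    // does) forces a role decision immediately after the ledger is replayed.
    (ledger_path, ledger_len)
}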


@ -2,18 +2,28 @@
//! managing the schedule for leader rotation
use bank::Bank;
use bincode::serialize;
use budget_instruction::Vote;
use budget_transaction::BudgetTransaction;
use byteorder::{LittleEndian, ReadBytesExt};
use hash::hash;
use entry::Entry;
use hash::{hash, Hash};
use signature::{Keypair, KeypairUtil};
#[cfg(test)]
use solana_program_interface::account::Account;
use solana_program_interface::pubkey::Pubkey;
use std::collections::HashMap;
use std::io::Cursor;
use system_transaction::SystemTransaction;
use transaction::Transaction;
pub const DEFAULT_BOOTSTRAP_HEIGHT: u64 = 1000;
pub const DEFAULT_LEADER_ROTATION_INTERVAL: u64 = 100;
pub const DEFAULT_SEED_ROTATION_INTERVAL: u64 = 1000;
pub const DEFAULT_ACTIVE_WINDOW_LENGTH: u64 = 1000;
#[derive(Debug)]
pub struct ActiveValidators {
// Map from validator id to the last PoH height at which they voted,
pub active_validators: HashMap<Pubkey, u64>,
@ -43,13 +53,17 @@ impl ActiveValidators {
// Note: height == 0 will only be included for all
// height < self.active_window_length
let upper_bound = height;
if height >= self.active_window_length {
let lower_bound = height - self.active_window_length;
self.active_validators
.retain(|_, height| *height > lower_bound);
}
self.active_validators.keys().cloned().collect()
self.active_validators
.iter()
.filter_map(|(k, v)| if *v <= upper_bound { Some(*k) } else { None })
.collect()
}
// Push a vote for a validator with id == "id" who voted at PoH height == "height"
@ -59,6 +73,10 @@ impl ActiveValidators {
*old_height = height;
}
}
pub fn reset(&mut self) -> () {
self.active_validators.clear();
}
}
pub struct LeaderSchedulerConfig {
@ -83,7 +101,6 @@ pub struct LeaderSchedulerConfig {
// Used to toggle leader rotation in fullnode so that tests that don't
// need leader rotation don't break
impl LeaderSchedulerConfig {
#[allow(dead_code)]
pub fn new(
bootstrap_leader: Pubkey,
bootstrap_height_option: Option<u64>,
@ -101,7 +118,12 @@ impl LeaderSchedulerConfig {
}
}
#[derive(Debug)]
pub struct LeaderScheduler {
// Set to true if we want the default implementation of the LeaderScheduler,
// where only the bootstrap leader is used
pub use_only_bootstrap_leader: bool,
// The interval at which to rotate the leader, should be much less than
// seed_rotation_interval
pub leader_rotation_interval: u64,
@ -119,12 +141,12 @@ pub struct LeaderScheduler {
// Maintain the set of active validators
pub active_validators: ActiveValidators,
// The last height at which the seed + schedule was generated
pub last_seed_height: Option<u64>,
// Round-robin ordering for the validators
leader_schedule: Vec<Pubkey>,
// The last height at which the seed + schedule was generated
last_seed_height: Option<u64>,
// The seed used to determine the round robin order of leaders
seed: u64,
}
@ -147,6 +169,13 @@ pub struct LeaderScheduler {
// 3) When we hit the next seed rotation PoH height, step 2) is executed again to
// calculate the leader schedule for the upcoming seed_rotation_interval PoH counts.
impl LeaderScheduler {
pub fn from_bootstrap_leader(bootstrap_leader: Pubkey) -> Self {
let config = LeaderSchedulerConfig::new(bootstrap_leader, None, None, None, None);
let mut leader_scheduler = LeaderScheduler::new(&config);
leader_scheduler.use_only_bootstrap_leader = true;
leader_scheduler
}
pub fn new(config: &LeaderSchedulerConfig) -> Self {
let mut bootstrap_height = DEFAULT_BOOTSTRAP_HEIGHT;
if let Some(input) = config.bootstrap_height_option {
@ -169,6 +198,7 @@ impl LeaderScheduler {
assert!(seed_rotation_interval % leader_rotation_interval == 0);
LeaderScheduler {
use_only_bootstrap_leader: false,
active_validators: ActiveValidators::new(config.active_window_length_option),
leader_rotation_interval,
seed_rotation_interval,
@ -180,15 +210,51 @@ impl LeaderScheduler {
}
}
pub fn is_leader_rotation_height(&self, height: u64) -> bool {
if self.use_only_bootstrap_leader {
return false;
}
if height < self.bootstrap_height {
return false;
}
(height - self.bootstrap_height) % self.leader_rotation_interval == 0
}
pub fn entries_until_next_leader_rotation(&self, height: u64) -> Option<u64> {
if self.use_only_bootstrap_leader {
return None;
}
if height < self.bootstrap_height {
Some(self.bootstrap_height - height)
} else {
Some(
self.leader_rotation_interval
- ((height - self.bootstrap_height) % self.leader_rotation_interval),
)
}
}
pub fn reset(&mut self) {
self.last_seed_height = None;
self.active_validators.reset();
}
pub fn push_vote(&mut self, id: Pubkey, height: u64) {
if self.use_only_bootstrap_leader {
return;
}
self.active_validators.push_vote(id, height);
}
fn get_active_set(&mut self, height: u64) -> Vec<Pubkey> {
self.active_validators.get_active_set(height)
}
pub fn update_height(&mut self, height: u64, bank: &Bank) {
if self.use_only_bootstrap_leader {
return;
}
if height < self.bootstrap_height {
return;
}
@ -201,6 +267,10 @@ impl LeaderScheduler {
// Uses the schedule generated by the last call to generate_schedule() to return the
// leader for a given PoH height in round-robin fashion
pub fn get_scheduled_leader(&self, height: u64) -> Option<Pubkey> {
if self.use_only_bootstrap_leader {
return Some(self.bootstrap_leader);
}
// This covers cases where the schedule isn't yet generated.
if self.last_seed_height == None {
if height < self.bootstrap_height {
@ -226,6 +296,10 @@ impl LeaderScheduler {
Some(self.leader_schedule[validator_index])
}
fn get_active_set(&mut self, height: u64) -> Vec<Pubkey> {
self.active_validators.get_active_set(height)
}
// Called every seed_rotation_interval entries, generates the leader schedule
// for the range of entries: [height, height + seed_rotation_interval)
fn generate_schedule(&mut self, height: u64, bank: &Bank) {
@ -238,6 +312,7 @@ impl LeaderScheduler {
// non-zero stake. In this case, use the bootstrap leader for
// the upcoming rounds
if ranked_active_set.is_empty() {
self.last_seed_height = Some(height);
self.leader_schedule = vec![self.bootstrap_leader];
self.last_seed_height = Some(height);
return;
@ -322,6 +397,57 @@ impl LeaderScheduler {
}
}
impl Default for LeaderScheduler {
// Create a dummy leader scheduler
fn default() -> Self {
let id = Keypair::new().pubkey();
Self::from_bootstrap_leader(id)
}
}
// Remove all candidates for leader selection from the active set by clearing the bank,
// and then set a single new candidate who will be eligible starting at height = vote_height
// by adding one new account to the bank
#[cfg(test)]
pub fn set_new_leader(bank: &Bank, leader_scheduler: &mut LeaderScheduler, vote_height: u64) {
// Set the scheduled next leader to some other node
let new_leader_keypair = Keypair::new();
let new_leader_id = new_leader_keypair.pubkey();
leader_scheduler.push_vote(new_leader_id, vote_height);
let dummy_id = Keypair::new().pubkey();
let new_account = Account::new(1, 10, dummy_id.clone());
// Remove the previous accounts from the active set
let mut accounts = bank.accounts().write().unwrap();
accounts.clear();
accounts.insert(new_leader_id, new_account);
}
// Create two entries so that the node with keypair == active_keypair
// is in the active set for leader selection:
// 1) Give the node a nonzero number of tokens,
// 2) A vote from the validator
pub fn make_active_set_entries(
active_keypair: &Keypair,
token_source: &Keypair,
last_id: &Hash,
) -> Vec<Entry> {
// 1) Create transfer token entry
let transfer_tx = Transaction::system_new(&token_source, active_keypair.pubkey(), 1, *last_id);
let transfer_entry = Entry::new(last_id, 0, vec![transfer_tx]);
let last_id = transfer_entry.id;
// 2) Create vote entry
let vote = Vote {
version: 0,
contact_info_version: 0,
};
let vote_tx = Transaction::budget_new_vote(&active_keypair, vote, last_id, 0);
let vote_entry = Entry::new(&last_id, 0, vec![vote_tx]);
vec![transfer_entry, vote_entry]
}
#[cfg(test)]
mod tests {
use bank::Bank;
@ -336,13 +462,6 @@ mod tests {
use std::hash::Hash;
use std::iter::FromIterator;
fn to_hashset<T>(slice: &[T]) -> HashSet<&T>
where
T: Eq + Hash,
{
HashSet::from_iter(slice.iter())
}
fn to_hashset_owned<T>(slice: &[T]) -> HashSet<T>
where
T: Eq + Hash + Clone,
@ -495,13 +614,11 @@ mod tests {
leader_scheduler.push_vote(pk, start_height + active_window_length);
}
let all_ids = old_ids.union(&new_ids).collect();
// Queries for the active set
let result = leader_scheduler.get_active_set(active_window_length + start_height - 1);
assert_eq!(result.len(), num_old_ids + num_new_ids);
let result_set = to_hashset(&result);
assert_eq!(result_set, all_ids);
assert_eq!(result.len(), num_old_ids);
let result_set = to_hashset_owned(&result);
assert_eq!(result_set, old_ids);
let result = leader_scheduler.get_active_set(active_window_length + start_height);
assert_eq!(result.len(), num_new_ids);
@ -769,18 +886,13 @@ mod tests {
for i in 0..=num_validators {
leader_scheduler.generate_schedule(i * active_window_length, &bank);
let result = &leader_scheduler.leader_schedule;
let mut expected_set;
if i == num_validators {
// When there are no active validators remaining, should default back to the
// bootstrap leader
expected_set = HashSet::new();
expected_set.insert(&bootstrap_leader_id);
let expected = if i == num_validators {
bootstrap_leader_id
} else {
assert_eq!(num_validators - i, result.len() as u64);
expected_set = to_hashset(&validators[i as usize..]);
}
let result_set = to_hashset(&result[..]);
assert_eq!(expected_set, result_set);
validators[i as usize]
};
assert_eq!(vec![expected], *result);
}
}
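
A worked example of the rotation arithmetic defined above (a sketch, not part of this patch, using made-up heights); the asserts simply restate what is_leader_rotation_height and entries_until_next_leader_rotation compute for these parameters.

use solana::leader_scheduler::{LeaderScheduler, LeaderSchedulerConfig};
use solana::signature::{Keypair, KeypairUtil};

fn main() {
    let bootstrap_leader = Keypair::new().pubkey();

    // Strategy kept for existing tests and the drone: only the bootstrap
    // leader is ever scheduled, and rotation checks are disabled.
    let bootstrap_only = LeaderScheduler::from_bootstrap_leader(bootstrap_leader);
    assert_eq!(
        bootstrap_only.get_scheduled_leader(12_345),
        Some(bootstrap_leader)
    );
    assert!(!bootstrap_only.is_leader_rotation_height(12_345));

    // Full strategy: the bootstrap leader holds the slot until height 1000,
    // then the leader rotates every 100 entries, and the round-robin order is
    // reseeded every 1000 entries from validators that voted within the last
    // 1000 entries.
    let config = LeaderSchedulerConfig::new(
        bootstrap_leader,
        Some(1_000), // bootstrap_height
        Some(100),   // leader_rotation_interval
        Some(1_000), // seed_rotation_interval
        Some(1_000), // active_window_length
    );
    let scheduler = LeaderScheduler::new(&config);

    assert!(!scheduler.is_leader_rotation_height(999));
    assert!(scheduler.is_leader_rotation_height(1_000));
    assert!(scheduler.is_leader_rotation_height(1_100));
    // 60 entries remain in the current leader slot at height 1040:
    // 100 - ((1040 - 1000) % 100) == 60.
    assert_eq!(scheduler.entries_until_next_leader_rotation(1_040), Some(60));
}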


@ -406,6 +406,7 @@ impl Blob {
pub fn make_consecutive_blobs(
me_id: Pubkey,
num_blobs_to_make: u64,
start_height: u64,
start_hash: Hash,
addr: &SocketAddr,
) -> SharedBlobs {
@ -415,7 +416,7 @@ pub fn make_consecutive_blobs(
for _ in 0..num_blobs_to_make {
all_entries.extend(next_entries_mut(&mut last_hash, &mut num_hashes, vec![]));
}
let mut new_blobs = all_entries.to_blobs_with_id(me_id, 0, addr);
let mut new_blobs = all_entries.to_blobs_with_id(me_id, start_height, addr);
new_blobs.truncate(num_blobs_to_make as usize);
new_blobs
}


@ -4,11 +4,12 @@ use bank::Bank;
use cluster_info::ClusterInfo;
use counter::Counter;
use entry::EntryReceiver;
use leader_scheduler::LeaderScheduler;
use ledger::{Block, LedgerWriter};
use log::Level;
use result::{Error, Result};
use service::Service;
use signature::Keypair;
use signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::channel;
@ -20,6 +21,11 @@ use std::time::Instant;
use streamer::{responder, BlobSender};
use vote_stage::send_validator_vote;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ReplicateStageReturnType {
LeaderRotation(u64),
}
// Implement a destructor for the ReplicateStage thread to signal it exited
// even on panics
struct Finalizer {
@ -39,7 +45,8 @@ impl Drop for Finalizer {
}
pub struct ReplicateStage {
thread_hdls: Vec<JoinHandle<()>>,
t_responder: JoinHandle<()>,
t_replicate: JoinHandle<Option<ReplicateStageReturnType>>,
}
impl ReplicateStage {
@ -51,6 +58,8 @@ impl ReplicateStage {
ledger_writer: Option<&mut LedgerWriter>,
keypair: &Arc<Keypair>,
vote_blob_sender: Option<&BlobSender>,
entry_height: &mut u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> {
let timer = Duration::new(1, 0);
//coalesce all the available entries into a single vote
@ -59,27 +68,72 @@ impl ReplicateStage {
entries.append(&mut more);
}
let res = bank.process_entries(&entries);
let mut res = Ok(());
{
let mut num_entries_to_write = entries.len();
for (i, entry) in entries.iter().enumerate() {
res = bank.process_entry(&entry);
Bank::process_entry_votes(
&bank,
&entry,
*entry_height + i as u64 + 1,
&mut *leader_scheduler.write().unwrap(),
);
{
let ls_lock = leader_scheduler.read().unwrap();
if ls_lock.is_leader_rotation_height(
// i is zero indexed, so current entry height for this entry is actually the
// old entry height + i + 1
*entry_height + i as u64 + 1,
) {
let my_id = keypair.pubkey();
let scheduled_leader =
ls_lock.get_scheduled_leader(*entry_height + i as u64 + 1).expect("Scheduled leader id should never be unknown while processing entries");
cluster_info.write().unwrap().set_leader(scheduled_leader);
if my_id == scheduled_leader {
num_entries_to_write = i + 1;
break;
}
}
}
if res.is_err() {
// TODO: This will return early from the first entry that has an erroneous
// transaction, instead of processing the rest of the entries in the vector
// of received entries. This is in line with previous behavior when
// bank.process_entries() was used to process the entries, but doesn't solve the
// issue that the bank state was still changed, leading to inconsistencies with the
// leader as the leader currently should not be publishing erroneous transactions
break;
}
}
// If leader rotation happened, only write the entries up to leader rotation.
entries.truncate(num_entries_to_write);
}
if let Some(sender) = vote_blob_sender {
send_validator_vote(bank, keypair, cluster_info, sender)?;
}
{
let mut wcluster_info = cluster_info.write().unwrap();
wcluster_info.insert_votes(&entries.votes());
}
cluster_info.write().unwrap().insert_votes(&entries.votes());
inc_new_counter_info!(
"replicate-transactions",
entries.iter().map(|x| x.transactions.len()).sum()
);
let entries_len = entries.len() as u64;
// TODO: move this to another stage?
// TODO: In line with previous behavior, this will write all the entries even if
// an error occurred processing one of the entries (causing the rest of the entries to
// not be processed).
if let Some(ledger_writer) = ledger_writer {
ledger_writer.write_entries(entries)?;
}
*entry_height += entries_len;
res?;
Ok(())
}
@ -91,6 +145,8 @@ impl ReplicateStage {
window_receiver: EntryReceiver,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
entry_height: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> Self {
let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
@ -105,7 +161,17 @@ impl ReplicateStage {
let _exit = Finalizer::new(exit);
let now = Instant::now();
let mut next_vote_secs = 1;
let mut entry_height_ = entry_height;
loop {
let leader_id = leader_scheduler
.read()
.unwrap()
.get_scheduled_leader(entry_height_)
.expect("Scheduled leader id should never be unknown at this point");
if leader_id == keypair.pubkey() {
return Some(ReplicateStageReturnType::LeaderRotation(entry_height_));
}
// Only vote once a second.
let vote_sender = if now.elapsed().as_secs() > next_vote_secs {
next_vote_secs += 1;
@ -121,6 +187,8 @@ impl ReplicateStage {
ledger_writer.as_mut(),
&keypair,
vote_sender,
&mut entry_height_,
&leader_scheduler,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
@ -129,21 +197,134 @@ impl ReplicateStage {
}
}
}
None
}).unwrap();
let thread_hdls = vec![t_responder, t_replicate];
ReplicateStage { thread_hdls }
ReplicateStage {
t_responder,
t_replicate,
}
}
}
impl Service for ReplicateStage {
type JoinReturnType = ();
type JoinReturnType = Option<ReplicateStageReturnType>;
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
fn join(self) -> thread::Result<Option<ReplicateStageReturnType>> {
self.t_responder.join()?;
self.t_replicate.join()
}
}
#[cfg(test)]
mod test {
use cluster_info::{ClusterInfo, Node};
use fullnode::Fullnode;
use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use ledger::{genesis, next_entries_mut, LedgerWriter};
use logger;
use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
use service::Service;
use signature::{Keypair, KeypairUtil};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
#[test]
pub fn test_replicate_stage_leader_rotation_exit() {
logger::setup();
// Set up dummy node to host a ReplicateStage
let my_keypair = Keypair::new();
let my_id = my_keypair.pubkey();
let my_node = Node::new_localhost_with_pubkey(my_id);
let cluster_info_me = ClusterInfo::new(my_node.info.clone()).expect("ClusterInfo::new");
// Create a ledger
let (mint, my_ledger_path) = genesis("test_replicate_stage_leader_rotation_exit", 10_000);
let genesis_entries = mint.create_entries();
let mut last_id = genesis_entries
.last()
.expect("expected at least one genesis entry")
.id;
// Write two entries to the ledger so that the validator is in the active set:
// 1) Give the validator a nonzero number of tokens, 2) a vote from the validator.
// This will cause leader rotation after the bootstrap height
let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap();
let bootstrap_entries = make_active_set_entries(&my_keypair, &mint.keypair(), &last_id);
last_id = bootstrap_entries.last().unwrap().id;
let ledger_initial_len = (genesis_entries.len() + bootstrap_entries.len()) as u64;
ledger_writer.write_entries(bootstrap_entries).unwrap();
// Set up the LeaderScheduler so that this node becomes the leader at
// bootstrap_height = num_bootstrap_slots * leader_rotation_interval
let old_leader_id = Keypair::new().pubkey();
let leader_rotation_interval = 10;
let num_bootstrap_slots = 2;
let bootstrap_height = num_bootstrap_slots * leader_rotation_interval;
let leader_scheduler_config = LeaderSchedulerConfig::new(
old_leader_id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(leader_rotation_interval * 2),
Some(bootstrap_height),
);
let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config);
// Set up the bank
let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler);
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
// Set up the replicate stage
let (entry_sender, entry_receiver) = channel();
let exit = Arc::new(AtomicBool::new(false));
let replicate_stage = ReplicateStage::new(
Arc::new(my_keypair),
Arc::new(bank),
Arc::new(RwLock::new(cluster_info_me)),
entry_receiver,
Some(&my_ledger_path),
exit.clone(),
ledger_initial_len,
leader_scheduler.clone(),
);
// Send enough entries to trigger leader rotation
let extra_entries = leader_rotation_interval;
let total_entries_to_send = (bootstrap_height + extra_entries) as usize;
let mut num_hashes = 0;
let mut entries_to_send = vec![];
while entries_to_send.len() < total_entries_to_send {
let entries = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
last_id = entries.last().expect("expected at least one entry").id;
entries_to_send.extend(entries);
}
entries_to_send.truncate(total_entries_to_send);
entry_sender.send(entries_to_send).unwrap();
// Wait for replicate_stage to exit and check return value is correct
assert_eq!(
Some(ReplicateStageReturnType::LeaderRotation(bootstrap_height)),
replicate_stage.join().expect("replicate stage join")
);
assert_eq!(exit.load(Ordering::Relaxed), true);
// Check that the ledger height is correct
let mut leader_scheduler = Arc::try_unwrap(leader_scheduler)
.expect("Multiple references to this RwLock still exist")
.into_inner()
.expect("RwLock for LeaderScheduler is still locked");
let (_, entry_height, _) =
Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler);
assert_eq!(entry_height, bootstrap_height);
}
}
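For reference, the rotation height this test drives toward follows directly from the configuration above:
bootstrap_height = num_bootstrap_slots * leader_rotation_interval = 2 * 10 = 20
total_entries_to_send = bootstrap_height + extra_entries = 20 + 10 = 30
so the stage is expected to hand off with LeaderRotation(20) even though 30 entries are sent, and the reloaded ledger height is asserted to be exactly 20.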

View File

@ -1,6 +1,7 @@
use blob_fetch_stage::BlobFetchStage;
use cluster_info::{ClusterInfo, Node, NodeInfo};
use hash::{Hash, Hasher};
use leader_scheduler::LeaderScheduler;
use ncp::Ncp;
use service::Service;
use std::fs::File;
@ -22,13 +23,13 @@ use std::time::Duration;
use store_ledger_stage::StoreLedgerStage;
use streamer::BlobReceiver;
use window;
use window_service::{window_service, WindowServiceReturnType};
use window_service::window_service;
pub struct Replicator {
ncp: Ncp,
fetch_stage: BlobFetchStage,
store_ledger_stage: StoreLedgerStage,
t_window: JoinHandle<Option<WindowServiceReturnType>>,
t_window: JoinHandle<()>,
pub retransmit_receiver: BlobReceiver,
}
@ -82,8 +83,9 @@ impl Replicator {
));
let leader_info = network_addr.map(|i| NodeInfo::new_entry_point(&i));
let leader_pubkey;
if let Some(leader_info) = leader_info.as_ref() {
leader_pubkey = leader_info.id;
cluster_info.write().unwrap().insert(leader_info);
} else {
panic!("No leader info!");
@ -108,6 +110,9 @@ impl Replicator {
entry_window_sender,
retransmit_sender,
repair_socket,
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_pubkey,
))),
done,
);
@ -152,6 +157,7 @@ mod tests {
use cluster_info::Node;
use fullnode::Fullnode;
use hash::Hash;
use leader_scheduler::LeaderScheduler;
use ledger::{genesis, read_ledger, tmp_ledger_path};
use logger;
use replicator::sample_file;
@ -185,14 +191,13 @@ mod tests {
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let network_addr = leader_node.sockets.gossip.local_addr().unwrap();
let leader_info = leader_node.info.clone();
let leader_rotation_interval = 20;
let leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
None,
false,
Some(leader_rotation_interval),
LeaderScheduler::from_bootstrap_leader(leader_info.id),
);
let mut leader_client = mk_client(&leader_info);

View File

@ -3,6 +3,7 @@
use cluster_info::ClusterInfo;
use counter::Counter;
use entry::Entry;
use leader_scheduler::LeaderScheduler;
use log::Level;
use result::{Error, Result};
use service::Service;
@ -15,12 +16,7 @@ use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use streamer::BlobReceiver;
use window::SharedWindow;
use window_service::{window_service, WindowServiceReturnType};
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum RetransmitStageReturnType {
LeaderRotation(u64),
}
use window_service::window_service;
fn retransmit(
cluster_info: &Arc<RwLock<ClusterInfo>>,
@ -71,8 +67,7 @@ fn retransmitter(
}
pub struct RetransmitStage {
t_retransmit: JoinHandle<()>,
t_window: JoinHandle<Option<WindowServiceReturnType>>,
thread_hdls: Vec<JoinHandle<()>>,
}
impl RetransmitStage {
@ -83,6 +78,7 @@ impl RetransmitStage {
retransmit_socket: Arc<UdpSocket>,
repair_socket: Arc<UdpSocket>,
fetch_stage_receiver: BlobReceiver,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> (Self, Receiver<Vec<Entry>>) {
let (retransmit_sender, retransmit_receiver) = channel();
@ -99,29 +95,22 @@ impl RetransmitStage {
entry_sender,
retransmit_sender,
repair_socket,
leader_scheduler,
done,
);
(
RetransmitStage {
t_window,
t_retransmit,
},
entry_receiver,
)
let thread_hdls = vec![t_retransmit, t_window];
(RetransmitStage { thread_hdls }, entry_receiver)
}
}
impl Service for RetransmitStage {
type JoinReturnType = Option<RetransmitStageReturnType>;
type JoinReturnType = ();
fn join(self) -> thread::Result<Option<RetransmitStageReturnType>> {
self.t_retransmit.join()?;
match self.t_window.join()? {
Some(WindowServiceReturnType::LeaderRotation(entry_height)) => Ok(Some(
RetransmitStageReturnType::LeaderRotation(entry_height),
)),
_ => Ok(None),
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
}
}

View File

@ -403,7 +403,7 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option<u64>) -> R
loop {
trace!("polling {:?} for leader from {:?}", leader_ncp, my_addr);
if let Some(l) = cluster_info.read().unwrap().leader_data() {
if let Some(l) = cluster_info.read().unwrap().get_gossip_top_leader() {
leader = Some(l.clone());
break;
}
@ -434,6 +434,7 @@ mod tests {
use bank::Bank;
use cluster_info::Node;
use fullnode::Fullnode;
use leader_scheduler::LeaderScheduler;
use ledger::LedgerWriter;
use logger;
use mint::Mint;
@ -476,7 +477,7 @@ mod tests {
None,
&ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
Some(0),
);
sleep(Duration::from_millis(900));
@ -523,7 +524,7 @@ mod tests {
None,
&ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
Some(0),
);
//TODO: remove this sleep, or add a retry so CI is stable
@ -583,7 +584,7 @@ mod tests {
None,
&ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
Some(0),
);
sleep(Duration::from_millis(300));
@ -644,7 +645,7 @@ mod tests {
None,
&ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
Some(0),
);
sleep(Duration::from_millis(900));

View File

@ -30,6 +30,7 @@ use banking_stage::{BankingStage, Config};
use cluster_info::ClusterInfo;
use entry::Entry;
use fetch_stage::FetchStage;
use leader_scheduler::LeaderScheduler;
use service::Service;
use signature::Keypair;
use sigverify_stage::SigVerifyStage;
@ -62,6 +63,7 @@ impl Tpu {
ledger_path: &str,
sigverify_disabled: bool,
entry_height: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> (Self, Receiver<Vec<Entry>>, Arc<AtomicBool>) {
let exit = Arc::new(AtomicBool::new(false));
@ -80,6 +82,7 @@ impl Tpu {
ledger_path,
entry_receiver,
entry_height,
leader_scheduler,
);
let tpu = Tpu {
@ -96,6 +99,10 @@ impl Tpu {
self.exit.store(true, Ordering::Relaxed);
}
pub fn is_exited(&self) -> bool {
self.exit.load(Ordering::Relaxed)
}
pub fn close(self) -> thread::Result<Option<TpuReturnType>> {
self.fetch_stage.close();
self.join()

View File

@ -62,9 +62,7 @@ impl Transaction {
let instructions = vec![Instruction {
program_ids_index: 0,
userdata,
accounts: (0..(transaction_keys.len() as u8 + 1))
.into_iter()
.collect(),
accounts: (0..=transaction_keys.len() as u8).collect(),
}];
Self::new_with_instructions(
from_keypair,
@ -164,7 +162,7 @@ impl Transaction {
true
}
fn from(&self) -> &Pubkey {
pub fn from(&self) -> &Pubkey {
&self.account_keys[0]
}
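The switch to an inclusive account-index range above is behavior-preserving; a quick standalone check of the equivalence (illustrative only, with len = 2 as an arbitrary example and a hypothetical test name):
#[test]
fn inclusive_range_matches_old_form() {
    let len: u8 = 2; // e.g. two additional transaction keys
    let old: Vec<u8> = (0..(len + 1)).collect();
    let new: Vec<u8> = (0..=len).collect();
    assert_eq!(old, new); // both enumerate account indexes [0, 1, 2]
}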

View File

@ -39,8 +39,9 @@
use bank::Bank;
use blob_fetch_stage::BlobFetchStage;
use cluster_info::ClusterInfo;
use replicate_stage::ReplicateStage;
use retransmit_stage::{RetransmitStage, RetransmitStageReturnType};
use leader_scheduler::LeaderScheduler;
use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
use retransmit_stage::RetransmitStage;
use service::Service;
use signature::Keypair;
use std::net::UdpSocket;
@ -84,6 +85,7 @@ impl Tvu {
repair_socket: UdpSocket,
retransmit_socket: UdpSocket,
ledger_path: Option<&str>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> Self {
let exit = Arc::new(AtomicBool::new(false));
@ -103,6 +105,7 @@ impl Tvu {
Arc::new(retransmit_socket),
repair_socket,
blob_fetch_receiver,
leader_scheduler.clone(),
);
let replicate_stage = ReplicateStage::new(
@ -112,6 +115,8 @@ impl Tvu {
blob_window_receiver,
ledger_path,
exit.clone(),
entry_height,
leader_scheduler,
);
Tvu {
@ -122,6 +127,10 @@ impl Tvu {
}
}
pub fn is_exited(&self) -> bool {
self.exit.load(Ordering::Relaxed)
}
pub fn exit(&self) -> () {
self.exit.store(true, Ordering::Relaxed);
}
@ -136,10 +145,10 @@ impl Service for Tvu {
type JoinReturnType = Option<TvuReturnType>;
fn join(self) -> thread::Result<Option<TvuReturnType>> {
self.replicate_stage.join()?;
self.retransmit_stage.join()?;
self.fetch_stage.join()?;
match self.retransmit_stage.join()? {
Some(RetransmitStageReturnType::LeaderRotation(entry_height)) => {
match self.replicate_stage.join()? {
Some(ReplicateStageReturnType::LeaderRotation(entry_height)) => {
Ok(Some(TvuReturnType::LeaderRotation(entry_height)))
}
_ => Ok(None),
@ -154,6 +163,7 @@ pub mod tests {
use cluster_info::{ClusterInfo, Node};
use entry::Entry;
use hash::{hash, Hash};
use leader_scheduler::LeaderScheduler;
use logger;
use mint::Mint;
use ncp::Ncp;
@ -249,6 +259,9 @@ pub mod tests {
target1.sockets.repair,
target1.sockets.retransmit,
None,
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(
leader_id,
))),
);
let mut alice_ref_balance = starting_balance;

View File

@ -616,6 +616,7 @@ mod tests {
use cluster_info::Node;
use drone::run_local_drone;
use fullnode::Fullnode;
use leader_scheduler::LeaderScheduler;
use ledger::LedgerWriter;
use mint::Mint;
use serde_json::Value;
@ -950,7 +951,7 @@ mod tests {
None,
&ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
Some(rpc_port),
);
sleep(Duration::from_millis(900));
@ -1025,7 +1026,7 @@ mod tests {
None,
&ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
Some(rpc_port),
);
sleep(Duration::from_millis(900));
@ -1101,7 +1102,7 @@ mod tests {
None,
&ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
Some(rpc_port),
);
sleep(Duration::from_millis(900));
@ -1222,7 +1223,7 @@ mod tests {
None,
&ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
Some(rpc_port),
);
sleep(Duration::from_millis(900));
@ -1341,7 +1342,7 @@ mod tests {
None,
&ledger_path,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
Some(rpc_port),
);
sleep(Duration::from_millis(900));

View File

@ -5,6 +5,7 @@ use counter::Counter;
use entry::Entry;
#[cfg(feature = "erasure")]
use erasure;
use leader_scheduler::LeaderScheduler;
use ledger::{reconstruct_entries_from_blobs, Block};
use log::Level;
use packet::SharedBlob;
@ -51,6 +52,7 @@ pub trait WindowUtil {
/// Finds available slots, clears them, and returns their indices.
fn clear_slots(&mut self, consumed: u64, received: u64) -> Vec<u64>;
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
fn repair(
&mut self,
cluster_info: &Arc<RwLock<ClusterInfo>>,
@ -59,6 +61,7 @@ pub trait WindowUtil {
consumed: u64,
received: u64,
max_entry_height: u64,
leader_scheduler_option: &Arc<RwLock<LeaderScheduler>>,
) -> Vec<(SocketAddr, Vec<u8>)>;
fn print(&self, id: &Pubkey, consumed: u64) -> String;
@ -67,14 +70,12 @@ pub trait WindowUtil {
fn process_blob(
&mut self,
id: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
blob: SharedBlob,
pix: u64,
consume_queue: &mut Vec<Entry>,
consumed: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
leader_rotation_interval: u64,
);
}
@ -101,13 +102,40 @@ impl WindowUtil for Window {
consumed: u64,
received: u64,
max_entry_height: u64,
leader_scheduler_option: &Arc<RwLock<LeaderScheduler>>,
) -> Vec<(SocketAddr, Vec<u8>)> {
let rcluster_info = cluster_info.read().unwrap();
let leader_rotation_interval = rcluster_info.get_leader_rotation_interval();
// Calculate the next leader rotation height and check if we are the leader
let next_leader_rotation =
consumed + leader_rotation_interval - (consumed % leader_rotation_interval);
let is_next_leader = rcluster_info.get_scheduled_leader(next_leader_rotation) == Some(*id);
let mut is_next_leader = false;
{
let ls_lock = leader_scheduler_option.read().unwrap();
if !ls_lock.use_only_bootstrap_leader {
// Calculate the next leader rotation height and check if we are the leader
let next_leader_rotation_height = consumed + ls_lock.entries_until_next_leader_rotation(consumed).expect("Leader rotation should exist when not using default implementation of LeaderScheduler");
match ls_lock.get_scheduled_leader(next_leader_rotation_height) {
Some(leader_id) if leader_id == *id => is_next_leader = true,
// In the case that we are not in the current scope of the leader schedule
// window then either:
//
// 1) The replicate stage hasn't caught up to the "consumed" entries we sent,
// in which case it will eventually catch up
//
// 2) We are on the border between seed_rotation_intervals, so the
// schedule won't be known until the entry on that cusp is received
// by the replicate stage (which comes after this stage). Hence, the next
// leader at the beginning of that next epoch will not know he is the
// leader until he receives that last "cusp" entry. He also won't ask for repairs
// for that entry because "is_next_leader" won't be set here. In this case,
// everybody will be blocking waiting for that "cusp" entry instead of repairing,
// until the leader hits "times" >= the max times in calculate_max_repair().
// The impact of this, along with the similar problem from broadcast for the transitioning
// leader, can be observed in the multinode test, test_full_leader_validator_network().
None => (),
_ => (),
}
}
}
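// Worked illustration (assumed numbers): once past bootstrap_height, with a
// leader_rotation_interval of 10 and consumed = 25, the removed inline formula
// consumed + interval - (consumed % interval) gave 30 as the next rotation
// height; entries_until_next_leader_rotation(25) plays the same role here,
// returning 5 so that next_leader_rotation_height = 25 + 5 = 30.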
let num_peers = rcluster_info.table.len() as u64;
let max_repair = if max_entry_height == 0 {
@ -196,14 +224,12 @@ impl WindowUtil for Window {
fn process_blob(
&mut self,
id: &Pubkey,
cluster_info: &Arc<RwLock<ClusterInfo>>,
blob: SharedBlob,
pix: u64,
consume_queue: &mut Vec<Entry>,
consumed: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
leader_rotation_interval: u64,
) {
let w = (pix % WINDOW_SIZE) as usize;
@ -258,18 +284,6 @@ impl WindowUtil for Window {
// push all contiguous blobs into consumed queue, increment consumed
loop {
if *consumed != 0 && *consumed % (leader_rotation_interval as u64) == 0 {
let rcluster_info = cluster_info.read().unwrap();
let my_id = rcluster_info.my_data().id;
match rcluster_info.get_scheduled_leader(*consumed) {
// If we are the next leader, exit
Some(id) if id == my_id => {
break;
}
_ => (),
}
}
let k = (*consumed % WINDOW_SIZE) as usize;
trace!("{}: k: {} consumed: {}", id, k, *consumed,);

View File

@ -3,6 +3,7 @@
use cluster_info::{ClusterInfo, NodeInfo};
use counter::Counter;
use entry::EntrySender;
use leader_scheduler::LeaderScheduler;
use log::Level;
use packet::SharedBlob;
use rand::{thread_rng, Rng};
@ -144,7 +145,6 @@ fn recv_window(
s: &EntrySender,
retransmit: &BlobSender,
pending_retransmits: &mut bool,
leader_rotation_interval: u64,
done: &Arc<AtomicBool>,
) -> Result<()> {
let timer = Duration::from_millis(200);
@ -204,14 +204,12 @@ fn recv_window(
window.write().unwrap().process_blob(
id,
cluster_info,
b,
pix,
&mut consume_queue,
consumed,
leader_unknown,
pending_retransmits,
leader_rotation_interval,
);
// Send a signal when we hit the max entry_height
@ -238,6 +236,7 @@ fn recv_window(
Ok(())
}
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn window_service(
cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow,
@ -247,8 +246,9 @@ pub fn window_service(
s: EntrySender,
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
done: Arc<AtomicBool>,
) -> JoinHandle<Option<WindowServiceReturnType>> {
) -> JoinHandle<()> {
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
@ -256,30 +256,11 @@ pub fn window_service(
let mut received = entry_height;
let mut last = entry_height;
let mut times = 0;
let id;
let leader_rotation_interval;
{
let rcluster_info = cluster_info.read().unwrap();
id = rcluster_info.id;
leader_rotation_interval = rcluster_info.get_leader_rotation_interval();
}
let id = cluster_info.read().unwrap().id;
let mut pending_retransmits = false;
trace!("{}: RECV_WINDOW started", id);
loop {
if consumed != 0 && consumed % (leader_rotation_interval as u64) == 0 {
match cluster_info.read().unwrap().get_scheduled_leader(consumed) {
// If we are the next leader, exit
Some(next_leader_id) if id == next_leader_id => {
return Some(WindowServiceReturnType::LeaderRotation(consumed));
}
// TODO: Figure out where to set the new leader in the cluster_info for
// validator -> validator transition (once we have real leader scheduling,
// this decision will be clearer). Also make sure new blobs to window actually
// originate from new leader
_ => (),
}
}
// Check if leader rotation was configured
if let Err(e) = recv_window(
&window,
&id,
@ -291,7 +272,6 @@ pub fn window_service(
&s,
&retransmit,
&mut pending_retransmits,
leader_rotation_interval,
&done,
) {
match e {
@ -322,7 +302,15 @@ pub fn window_service(
trace!("{} let's repair! times = {}", id, times);
let mut window = window.write().unwrap();
let reqs = window.repair(&cluster_info, &id, times, consumed, received, max_entry_height);
let reqs = window.repair(
&cluster_info,
&id,
times,
consumed,
received,
max_entry_height,
&leader_scheduler,
);
for (to, req) in reqs {
repair_socket.send_to(&req, to).unwrap_or_else(|e| {
info!("{} repair req send_to({}) error {:?}", id, to, e);
@ -330,7 +318,6 @@ pub fn window_service(
});
}
}
None
}).unwrap()
}
@ -339,9 +326,9 @@ mod test {
use cluster_info::{ClusterInfo, Node};
use entry::Entry;
use hash::Hash;
use leader_scheduler::LeaderScheduler;
use logger;
use packet::{make_consecutive_blobs, SharedBlob, PACKET_DATA_SIZE};
use signature::{Keypair, KeypairUtil};
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
@ -349,7 +336,7 @@ mod test {
use std::time::Duration;
use streamer::{blob_receiver, responder};
use window::default_window;
use window_service::{repair_backoff, window_service, WindowServiceReturnType};
use window_service::{repair_backoff, window_service};
fn get_entries(r: Receiver<Vec<Entry>>, num: &mut usize) {
for _t in 0..5 {
@ -391,6 +378,7 @@ mod test {
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
Arc::new(RwLock::new(LeaderScheduler::from_bootstrap_leader(me_id))),
done,
);
let t_responder = {
@ -401,11 +389,15 @@ mod test {
let t_responder = responder("window_send_test", blob_sockets[0].clone(), r_responder);
let mut num_blobs_to_make = 10;
let gossip_address = &tn.info.contact_info.ncp;
let msgs =
make_consecutive_blobs(me_id, num_blobs_to_make, Hash::default(), &gossip_address)
.into_iter()
.rev()
.collect();;
let msgs = make_consecutive_blobs(
me_id,
num_blobs_to_make,
0,
Hash::default(),
&gossip_address,
).into_iter()
.rev()
.collect();
s_responder.send(msgs).expect("send");
t_responder
};
@ -448,6 +440,13 @@ mod test {
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
// TODO: For now, the window still checks the ClusterInfo for the current leader
// to determine whether to retransmit a block. In the future when we rely on
// the LeaderScheduler for retransmits, this test will need to be rewritten
// because a leader should only be unknown in the window when the write stage
// hasn't yet calculated the leaders for slots in the next epoch (on entries
// at heights that are multiples of seed_rotation_interval in LeaderScheduler)
Arc::new(RwLock::new(LeaderScheduler::default())),
done,
);
let t_responder = {
@ -504,6 +503,13 @@ mod test {
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
// TODO: For now, the window still checks the ClusterInfo for the current leader
// to determine whether to retransmit a block. In the future when we rely on
// the LeaderScheduler for retransmits, this test will need to be rewritten
// because a leader should only be unknown in the window when the write stage
// hasn't yet calculated the leaders for slots in the next epoch (on entries
// at heights that are multiples of seed_rotation_interval in LeaderScheduler)
Arc::new(RwLock::new(LeaderScheduler::default())),
done,
);
let t_responder = {
@ -585,85 +591,4 @@ mod test {
assert!(avg >= 3);
assert!(avg <= 5);
}
#[test]
pub fn test_window_leader_rotation_exit() {
logger::setup();
let leader_rotation_interval = 10;
// Height at which this node becomes the leader =
// my_leader_begin_epoch * leader_rotation_interval
let my_leader_begin_epoch = 2;
let tn = Node::new_localhost();
let exit = Arc::new(AtomicBool::new(false));
let mut cluster_info_me = ClusterInfo::new(tn.info.clone()).expect("ClusterInfo::new");
let me_id = cluster_info_me.my_data().id;
// Set myself in an upcoming epoch, but set the old_leader_id as the
// leader for all epochs before that
let old_leader_id = Keypair::new().pubkey();
cluster_info_me.set_leader(me_id);
cluster_info_me.set_leader_rotation_interval(leader_rotation_interval);
for i in 0..my_leader_begin_epoch {
cluster_info_me.set_scheduled_leader(leader_rotation_interval * i, old_leader_id);
}
cluster_info_me
.set_scheduled_leader(my_leader_begin_epoch * leader_rotation_interval, me_id);
let subs = Arc::new(RwLock::new(cluster_info_me));
let (s_reader, r_reader) = channel();
let t_receiver = blob_receiver(Arc::new(tn.sockets.gossip), exit.clone(), s_reader);
let (s_window, _r_window) = channel();
let (s_retransmit, _r_retransmit) = channel();
let win = Arc::new(RwLock::new(default_window()));
let done = Arc::new(AtomicBool::new(false));
let t_window = window_service(
subs,
win,
0,
0,
r_reader,
s_window,
s_retransmit,
Arc::new(tn.sockets.repair),
done,
);
let t_responder = {
let (s_responder, r_responder) = channel();
let blob_sockets: Vec<Arc<UdpSocket>> =
tn.sockets.replicate.into_iter().map(Arc::new).collect();
let t_responder = responder(
"test_window_leader_rotation_exit",
blob_sockets[0].clone(),
r_responder,
);
let ncp_address = &tn.info.contact_info.ncp;
// Send the blobs out of order, in reverse. Also send an extra leader_rotation_interval
// number of blobs to make sure the window stops in the right place.
let extra_blobs = leader_rotation_interval;
let total_blobs_to_send =
my_leader_begin_epoch * leader_rotation_interval + extra_blobs;
let msgs =
make_consecutive_blobs(me_id, total_blobs_to_send, Hash::default(), &ncp_address)
.into_iter()
.rev()
.collect();;
s_responder.send(msgs).expect("send");
t_responder
};
assert_eq!(
Some(WindowServiceReturnType::LeaderRotation(
my_leader_begin_epoch * leader_rotation_interval
)),
t_window.join().expect("window service join")
);
t_responder.join().expect("responder thread join");
exit.store(true, Ordering::Relaxed);
t_receiver.join().expect("receiver thread join");
}
}

View File

@ -3,14 +3,17 @@
//! stdout, and then sends the Entry to its output channel.
use bank::Bank;
use budget_transaction::BudgetTransaction;
use cluster_info::ClusterInfo;
use counter::Counter;
use entry::Entry;
use leader_scheduler::LeaderScheduler;
use ledger::{Block, LedgerWriter};
use log::Level;
use result::{Error, Result};
use service::Service;
use signature::Keypair;
use solana_program_interface::pubkey::Pubkey;
use std::cmp;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
@ -38,11 +41,19 @@ impl WriteStage {
// fit before we hit the entry height for leader rotation. Also return a boolean
// reflecting whether we actually hit an entry height for leader rotation.
fn find_leader_rotation_index(
cluster_info: &Arc<RwLock<ClusterInfo>>,
leader_rotation_interval: u64,
bank: &Arc<Bank>,
my_id: &Pubkey,
leader_scheduler: &mut LeaderScheduler,
entry_height: u64,
mut new_entries: Vec<Entry>,
) -> (Vec<Entry>, bool) {
// In the case that we're using the default implementation of the leader scheduler,
// there won't ever be leader rotations at particular entry heights, so just
// return the entire input vector of entries
if leader_scheduler.use_only_bootstrap_leader {
return (new_entries, false);
}
let new_entries_length = new_entries.len();
// i is the number of entries to take
@ -50,14 +61,11 @@ impl WriteStage {
let mut is_leader_rotation = false;
loop {
if (entry_height + i as u64) % leader_rotation_interval == 0 {
let rcluster_info = cluster_info.read().unwrap();
let my_id = rcluster_info.my_data().id;
let next_leader = rcluster_info.get_scheduled_leader(entry_height + i as u64);
if next_leader != Some(my_id) {
is_leader_rotation = true;
break;
}
let next_leader = leader_scheduler.get_scheduled_leader(entry_height + i as u64);
if next_leader != Some(*my_id) {
is_leader_rotation = true;
break;
}
if i == new_entries_length {
@ -66,16 +74,41 @@ impl WriteStage {
// Find out how many more entries we can squeeze in until the next leader
// rotation
let entries_until_leader_rotation =
leader_rotation_interval - (entry_height % leader_rotation_interval);
let entries_until_leader_rotation = leader_scheduler.entries_until_next_leader_rotation(
entry_height + (i as u64)
).expect("Leader rotation should exist when not using default implementation of LeaderScheduler");
// Check the next leader rotation height entries in new_entries, or
// if the new_entries doesn't have that many entries remaining,
// just check the rest of the new_entries vector
i += cmp::min(
let step = cmp::min(
entries_until_leader_rotation as usize,
new_entries_length - i,
);
// 1) Since "i" is the current count of items from the new_entries vector that we
// have already checked, then "i" is also the INDEX into the new_entries vector of the
// next unprocessed entry. Hence this loop checks all entries between indexes:
// [entry_height + i, entry_height + i + step - 1], which is equivalent to the
// entry heights: [entry_height + i + 1, entry_height + i + step]
for (entry, new_entries_index) in new_entries[i..(i + step)].iter().zip(i..(i + step)) {
let votes = entry
.transactions
.iter()
.filter_map(BudgetTransaction::vote);
for (voter_id, _, _) in votes {
leader_scheduler
.push_vote(voter_id, entry_height + new_entries_index as u64 + 1);
}
// TODO: There's an issue here that the bank state may have updated
// while this entry was in flight from the BankingStage, which could cause
// the leader schedule, which is based on stake (tied to the bank account balances)
// right now, to be inconsistent with the rest of the network. Fix once
// we can pin PoH height to bank state
leader_scheduler.update_height(entry_height + new_entries_index as u64 + 1, bank);
}
i += step
}
new_entries.truncate(i as usize);
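The height/index mapping in this loop is easy to misread: i counts batch entries already accounted for, the ledger height after taking i entries is entry_height + i, and new_entries[j] therefore sits at height entry_height + j + 1. A minimal standalone sketch of the resulting cut rule follows; rotation_cut and its two closures are hypothetical stand-ins for the LeaderScheduler queries, and the vote-pushing and update_height side effects are omitted.
// Sketch: where to cut a batch of new entries when a different leader takes over.
fn rotation_cut(
    entry_height: u64,
    batch_len: usize,
    is_rotation_height: impl Fn(u64) -> bool,
    scheduled_for_me: impl Fn(u64) -> bool,
) -> (usize, bool) {
    // After taking i entries, the ledger sits at height entry_height + i.
    for i in 0..=batch_len {
        let height = entry_height + i as u64;
        if is_rotation_height(height) && !scheduled_for_me(height) {
            // Keep everything up to the rotation height, then signal a handoff.
            return (i, true);
        }
    }
    (batch_len, false)
}
For example, when the batch runs exactly up to a rotation height that belongs to another leader, the cut keeps the whole batch and the boolean comes back true, matching Case 4 of test_different_leader_index_calculation below.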
@ -86,12 +119,13 @@ impl WriteStage {
/// Process any Entry items that have been published by the RecordStage.
/// continuously send entries out
pub fn write_and_send_entries(
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
ledger_writer: &mut LedgerWriter,
entry_sender: &Sender<Vec<Entry>>,
entry_receiver: &Receiver<Vec<Entry>>,
entry_height: &mut u64,
leader_rotation_interval: u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> {
let mut ventries = Vec::new();
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
@ -103,8 +137,9 @@ impl WriteStage {
// Find out how many more entries we can squeeze in until the next leader
// rotation
let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index(
cluster_info,
leader_rotation_interval,
bank,
&cluster_info.read().unwrap().my_data().id,
&mut *leader_scheduler.write().unwrap(),
*entry_height + num_new_entries as u64,
received_entries,
);
@ -182,6 +217,7 @@ impl WriteStage {
ledger_path: &str,
entry_receiver: Receiver<Vec<Entry>>,
entry_height: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> (Self, Receiver<Vec<Entry>>) {
let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
@ -198,44 +234,39 @@ impl WriteStage {
.spawn(move || {
let mut last_vote = 0;
let mut last_valid_validator_timestamp = 0;
let id;
let leader_rotation_interval;
{
let rcluster_info = cluster_info.read().unwrap();
id = rcluster_info.id;
leader_rotation_interval = rcluster_info.get_leader_rotation_interval();
}
let mut entry_height = entry_height;
let id = cluster_info.read().unwrap().id;
let mut entry_height_ = entry_height;
loop {
// Note that entry height is not zero-indexed; it starts at 1, so the
// old leader is in power up to and including entry height
// n * leader_rotation_interval for some "n". Once we've forwarded
// that last block, check for the next scheduled leader.
if entry_height % (leader_rotation_interval as u64) == 0 {
let rcluster_info = cluster_info.read().unwrap();
let my_id = rcluster_info.my_data().id;
let scheduled_leader = rcluster_info.get_scheduled_leader(entry_height);
drop(rcluster_info);
match scheduled_leader {
Some(id) if id == my_id => (),
// If the leader stays in power for the next
// round as well, then we don't exit. Otherwise, exit.
_ => {
// When the broadcast stage has received the last blob, it
// will signal to close the fetch stage, which will in turn
// close down this write stage
return WriteStageReturnType::LeaderRotation;
}
match leader_scheduler
.read()
.unwrap()
.get_scheduled_leader(entry_height_)
{
Some(leader_id) if leader_id == id => (),
None => panic!(
"Scheduled leader id should never be unknown while processing entries"
),
_ => {
// If the leader is no longer in power, exit.
// When the broadcast stage has received the last blob, it
// will signal to close the fetch stage, which will in turn
// close down this write stage
return WriteStageReturnType::LeaderRotation;
}
}
if let Err(e) = Self::write_and_send_entries(
&bank,
&cluster_info,
&mut ledger_writer,
&entry_sender,
&entry_receiver,
&mut entry_height,
leader_rotation_interval,
&mut entry_height_,
&leader_scheduler,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
@ -295,9 +326,11 @@ mod tests {
use cluster_info::{ClusterInfo, Node};
use entry::Entry;
use hash::Hash;
use leader_scheduler::{set_new_leader, LeaderScheduler, LeaderSchedulerConfig};
use ledger::{genesis, next_entries_mut, read_ledger};
use service::Service;
use signature::{Keypair, KeypairUtil};
use solana_program_interface::account::Account;
use solana_program_interface::pubkey::Pubkey;
use std::fs::remove_dir_all;
use std::sync::mpsc::{channel, Receiver, Sender};
@ -305,14 +338,13 @@ mod tests {
use write_stage::{WriteStage, WriteStageReturnType};
struct DummyWriteStage {
my_id: Pubkey,
write_stage: WriteStage,
entry_sender: Sender<Vec<Entry>>,
_write_stage_entry_receiver: Receiver<Vec<Entry>>,
cluster_info: Arc<RwLock<ClusterInfo>>,
bank: Arc<Bank>,
leader_ledger_path: String,
ledger_tail: Vec<Entry>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
}
fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec<Entry>) {
@ -322,29 +354,34 @@ mod tests {
.map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err)));
info!("processing ledger...");
bank.process_ledger(entries).expect("process_ledger")
bank.process_ledger(entries, &mut LeaderScheduler::default())
.expect("process_ledger")
}
fn setup_dummy_write_stage(leader_rotation_interval: u64) -> DummyWriteStage {
fn setup_dummy_write_stage(
leader_keypair: Arc<Keypair>,
leader_scheduler_config: &LeaderSchedulerConfig,
test_name: &str,
) -> DummyWriteStage {
// Setup leader info
let leader_keypair = Arc::new(Keypair::new());
let my_id = leader_keypair.pubkey();
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let mut cluster_info = ClusterInfo::new(leader_info.info).expect("ClusterInfo::new");
cluster_info.set_leader_rotation_interval(leader_rotation_interval);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let bank = Bank::new_default(true);
let bank = Arc::new(bank);
let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(leader_info.info).expect("ClusterInfo::new"),
));
let bank = Arc::new(Bank::new_default(true));
// Make a ledger
let (_, leader_ledger_path) = genesis("test_leader_rotation_exit", 10_000);
let (_, leader_ledger_path) = genesis(test_name, 10_000);
let (entry_height, ledger_tail) = process_ledger(&leader_ledger_path, &bank);
// Make a dummy pipe
let (entry_sender, entry_receiver) = channel();
// Make a dummy leader scheduler we can manipulate
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new(leader_scheduler_config)));
// Start up the write stage
let (write_stage, _write_stage_entry_receiver) = WriteStage::new(
leader_keypair,
@ -353,31 +390,45 @@ mod tests {
&leader_ledger_path,
entry_receiver,
entry_height,
leader_scheduler.clone(),
);
DummyWriteStage {
my_id,
write_stage,
entry_sender,
// Need to keep this alive, otherwise the write_stage will detect ChannelClosed
// and shut down
_write_stage_entry_receiver,
cluster_info,
bank,
leader_ledger_path,
ledger_tail,
leader_scheduler,
}
}
#[test]
fn test_write_stage_leader_rotation_exit() {
let leader_rotation_interval = 10;
let write_stage_info = setup_dummy_write_stage(leader_rotation_interval);
let leader_keypair = Keypair::new();
let leader_id = leader_keypair.pubkey();
{
let mut wcluster_info = write_stage_info.cluster_info.write().unwrap();
wcluster_info.set_scheduled_leader(leader_rotation_interval, write_stage_info.my_id);
}
// Make a dummy leader scheduler we can manipulate
let bootstrap_height = 20;
let leader_rotation_interval = 10;
let seed_rotation_interval = 2 * leader_rotation_interval;
let active_window = bootstrap_height + 3 * seed_rotation_interval;
let leader_scheduler_config = LeaderSchedulerConfig::new(
leader_keypair.pubkey(),
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(active_window),
);
let write_stage_info = setup_dummy_write_stage(
Arc::new(leader_keypair),
&leader_scheduler_config,
"test_write_stage_leader_rotation_exit",
);
let mut last_id = write_stage_info
.ledger_tail
@ -388,30 +439,39 @@ mod tests {
let genesis_entry_height = write_stage_info.ledger_tail.len() as u64;
// Insert a nonzero balance and vote into the bank to make this node eligible
// for leader selection
write_stage_info
.leader_scheduler
.write()
.unwrap()
.push_vote(leader_id, 1);
let dummy_id = Keypair::new().pubkey();
let accounts = write_stage_info.bank.accounts();
let new_account = Account::new(1, 10, dummy_id.clone());
accounts
.write()
.unwrap()
.insert(leader_id, new_account.clone());
// Input enough entries to reach bootstrap_height, which will
// trigger a check for leader rotation. Because the next scheduled leader
// is ourselves, we won't exit
for _ in genesis_entry_height..leader_rotation_interval {
for _ in genesis_entry_height..bootstrap_height {
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
write_stage_info.entry_sender.send(new_entry).unwrap();
}
// Set the scheduled next leader in the cluster_info to some other node
let leader2_keypair = Keypair::new();
let leader2_info = Node::new_localhost_with_pubkey(leader2_keypair.pubkey());
// Set the next scheduled next leader to some other node
{
let mut wcluster_info = write_stage_info.cluster_info.write().unwrap();
wcluster_info.insert(&leader2_info.info);
wcluster_info
.set_scheduled_leader(2 * leader_rotation_interval, leader2_keypair.pubkey());
let mut leader_scheduler = write_stage_info.leader_scheduler.write().unwrap();
set_new_leader(&write_stage_info.bank, &mut (*leader_scheduler), 1);
}
// Input another leader_rotation_interval dummy entries one at a time,
// which will take us past the point of the leader rotation.
// Input enough dummy entries to reach the next seed_rotation_interval.
// The write_stage will see that it's no longer the leader after
// checking the schedule, and exit
for _ in 0..leader_rotation_interval {
for _ in 0..seed_rotation_interval {
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
write_stage_info.entry_sender.send(new_entry).unwrap();
}
@ -428,120 +488,240 @@ mod tests {
assert_eq!(entry_height, 2 * leader_rotation_interval);
}
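// Note on the arithmetic of the assertion above: in this test
// bootstrap_height = 2 * leader_rotation_interval = 20, so a reloaded ledger
// height of 2 * leader_rotation_interval is the same as stopping exactly at
// the bootstrap height, where the handoff to the other leader occurs.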
fn make_leader_scheduler(
my_id: Pubkey,
bootstrap_height: u64,
leader_rotation_interval: u64,
seed_rotation_interval: u64,
active_window: u64,
) -> LeaderScheduler {
let leader_scheduler_config = LeaderSchedulerConfig::new(
my_id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(active_window),
);
let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config);
leader_scheduler.push_vote(my_id, 1);
leader_scheduler
}
#[test]
fn test_leader_index_calculation() {
// Tests for when the leader across slots and epochs is the same
fn test_same_leader_index_calculation() {
// Set up a dummy node
let leader_keypair = Arc::new(Keypair::new());
let my_id = leader_keypair.pubkey();
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let my_keypair = Arc::new(Keypair::new());
let my_id = my_keypair.pubkey();
let leader_rotation_interval = 10;
// Set up a dummy bank
let bank = Arc::new(Bank::new_default(true));
let accounts = bank.accounts();
let dummy_id = Keypair::new().pubkey();
let new_account = Account::new(1, 10, dummy_id.clone());
accounts.write().unwrap().insert(my_id, new_account.clone());
// An epoch is the period of leader_rotation_interval entries
// during which a leader is in power
let num_epochs = 3;
let mut cluster_info = ClusterInfo::new(leader_info.info).expect("ClusterInfo::new");
cluster_info.set_leader_rotation_interval(leader_rotation_interval as u64);
for i in 0..num_epochs {
cluster_info.set_scheduled_leader(i * leader_rotation_interval, my_id)
}
let cluster_info = Arc::new(RwLock::new(cluster_info));
let entry = Entry::new(&Hash::default(), 0, vec![]);
// A vector that is completely within a certain epoch should return that
// Note: A slot is the period of leader_rotation_interval entries
// during which a leader is in power
let leader_rotation_interval = 10;
let bootstrap_height = 17;
let seed_rotation_interval = 3 * leader_rotation_interval;
let active_window = bootstrap_height + 3 * seed_rotation_interval;
let mut leader_scheduler = make_leader_scheduler(
my_id,
bootstrap_height,
leader_rotation_interval,
seed_rotation_interval,
active_window,
);
// Case 1: A vector that is completely within a certain slot should return that
// entire vector
let mut len = leader_rotation_interval as usize - 1;
let mut len = (leader_scheduler.bootstrap_height - 1) as usize;
let mut input = vec![entry.clone(); len];
let mut result = WriteStage::find_leader_rotation_index(
&cluster_info,
leader_rotation_interval,
(num_epochs - 1) * leader_rotation_interval,
&bank,
&my_id,
&mut leader_scheduler,
0,
input.clone(),
);
assert_eq!(result, (input, false));
// A vector that spans two different epochs for different leaders
// should get truncated
len = leader_rotation_interval as usize - 1;
// Case 2: A vector of new entries that spans multiple slots should return the
// entire vector, assuming that the same leader is in power for all the slots.
len = 2 * seed_rotation_interval as usize;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&cluster_info,
leader_rotation_interval,
(num_epochs * leader_rotation_interval) - 1,
&bank,
&my_id,
&mut leader_scheduler,
bootstrap_height - 1,
input.clone(),
);
input.truncate(1);
assert_eq!(result, (input, true));
assert_eq!(result, (input, false));
// A vector that triggers a check for leader rotation should return
// Case 3: A vector that triggers a check for leader rotation should return
// the entire vector and signal leader_rotation == false, if the
// same leader is in power for the next epoch as well.
// same leader is in power for the next slot as well.
let mut leader_scheduler = make_leader_scheduler(
my_id,
bootstrap_height,
leader_rotation_interval,
seed_rotation_interval,
active_window,
);
len = 1;
let mut input = vec![entry.clone(); len];
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&cluster_info,
leader_rotation_interval,
leader_rotation_interval - 1,
&bank,
&my_id,
&mut leader_scheduler,
bootstrap_height - 1,
input.clone(),
);
assert_eq!(result, (input.clone(), false));
result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
bootstrap_height + seed_rotation_interval - 1,
input.clone(),
);
assert_eq!(result, (input, false));
}
// A vector of new entries that spans two epochs should return the
// entire vector, assuming that the same leader is in power for both epochs.
len = leader_rotation_interval as usize;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&cluster_info,
// Tests for when the leaders across slots / epochs are different
#[test]
fn test_different_leader_index_calculation() {
// Set up a dummy node
let my_keypair = Arc::new(Keypair::new());
let my_id = my_keypair.pubkey();
// Set up a dummy bank
let bank = Arc::new(Bank::new_default(true));
let entry = Entry::new(&Hash::default(), 0, vec![]);
// Note: A slot is the period of leader_rotation_interval entries
// during which a leader is in power
let leader_rotation_interval = 10;
let bootstrap_height = 17;
let seed_rotation_interval = 3 * leader_rotation_interval;
let active_window = 1;
let swap_entry_height = bootstrap_height + 2 * seed_rotation_interval;
// Case 1: A vector that spans different epochs for different leaders
// should get truncated
// Set the leader scheduler
let mut leader_scheduler = make_leader_scheduler(
my_id,
bootstrap_height,
leader_rotation_interval,
leader_rotation_interval - 1,
input.clone(),
seed_rotation_interval,
active_window,
);
assert_eq!(result, (input, false));
set_new_leader(&bank, &mut leader_scheduler, swap_entry_height);
// A vector of new entries that spans multiple epochs should return the
// entire vector, assuming that the same leader is in power for both dynasties.
len = (num_epochs - 1) as usize * leader_rotation_interval as usize;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&cluster_info,
leader_rotation_interval,
leader_rotation_interval - 1,
// Start test
let mut start_height = bootstrap_height - 1;
let extra_len = leader_rotation_interval;
let expected_len = swap_entry_height - start_height;
let mut len = expected_len + extra_len;
let mut input = vec![entry.clone(); len as usize];
let mut result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
start_height,
input.clone(),
);
assert_eq!(result, (input, false));
// A vector of new entries that spans multiple leader epochs and has a length
// exactly equal to the remaining number of entries before the next, different
// leader should return the entire vector and signal that leader_rotation == true.
len = (num_epochs - 1) as usize * leader_rotation_interval as usize + 1;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&cluster_info,
leader_rotation_interval,
leader_rotation_interval - 1,
input.clone(),
);
input.truncate(expected_len as usize);
assert_eq!(result, (input, true));
// Start at entry height == the height for leader rotation, should return
// no entries.
len = leader_rotation_interval as usize;
input = vec![entry.clone(); len];
// Case 2: Start at entry height == the height where the next leader is elected, should
// return no entries
len = 1;
input = vec![entry.clone(); len as usize];
result = WriteStage::find_leader_rotation_index(
&cluster_info,
leader_rotation_interval,
num_epochs * leader_rotation_interval,
&bank,
&my_id,
&mut leader_scheduler,
swap_entry_height,
input.clone(),
);
assert_eq!(result, (vec![], true));
// Case 3: A vector that ends one entry before leader rotation happens should not be
// truncated, and should signal leader rotation == false
// Reset the leader scheduler
leader_scheduler = make_leader_scheduler(
my_id,
bootstrap_height,
leader_rotation_interval,
seed_rotation_interval,
active_window,
);
set_new_leader(&bank, &mut leader_scheduler, swap_entry_height);
// Start test
start_height = bootstrap_height - 1;
let len_remaining = swap_entry_height - start_height;
len = len_remaining - 1;
input = vec![entry.clone(); len as usize];
result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
start_height,
input.clone(),
);
assert_eq!(result, (input, false));
// Case 4: A vector that lands exactly where leader rotation happens should not be
// truncated, but should return leader rotation == true
// Reset the leader scheduler
leader_scheduler = make_leader_scheduler(
my_id,
bootstrap_height,
leader_rotation_interval,
seed_rotation_interval,
active_window,
);
set_new_leader(&bank, &mut leader_scheduler, swap_entry_height);
// Generate the schedule
leader_scheduler.update_height(bootstrap_height, &bank);
// Start test
start_height = bootstrap_height + leader_rotation_interval - 1;
len = swap_entry_height - start_height;
input = vec![entry.clone(); len as usize];
result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
start_height,
input.clone(),
);
assert_eq!(result, (input, true));
}
}

View File

@ -10,6 +10,7 @@ use solana::cluster_info::{ClusterInfo, Node, NodeInfo};
use solana::entry::Entry;
use solana::fullnode::{Fullnode, FullnodeReturnType};
use solana::hash::Hash;
use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use solana::ledger::{read_ledger, LedgerWriter};
use solana::logger;
use solana::mint::Mint;
@ -21,21 +22,22 @@ use solana::thin_client::ThinClient;
use solana::timing::{duration_as_ms, duration_as_s};
use solana::window::{default_window, WINDOW_SIZE};
use solana_program_interface::pubkey::Pubkey;
use std::collections::{HashSet, VecDeque};
use std::env;
use std::fs::{copy, create_dir_all, remove_dir_all};
use std::net::UdpSocket;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::Builder;
use std::thread::{sleep, Builder, JoinHandle};
use std::time::{Duration, Instant};
fn make_spy_node(leader: &NodeInfo) -> (Ncp, Arc<RwLock<ClusterInfo>>, Pubkey) {
let exit = Arc::new(AtomicBool::new(false));
let mut spy = Node::new_localhost();
let me = spy.info.id.clone();
spy.info.contact_info.tvu = spy.sockets.replicate[0].local_addr().unwrap();
let daddr = "0.0.0.0:0".parse().unwrap();
spy.info.contact_info.tvu = daddr;
spy.info.contact_info.rpu = spy.sockets.transaction[0].local_addr().unwrap();
let mut spy_cluster_info = ClusterInfo::new(spy.info).expect("ClusterInfo::new");
spy_cluster_info.insert(&leader);
@ -150,7 +152,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
leader_keypair,
None,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
// Send leader some tokens to vote
@ -170,7 +172,7 @@ fn test_multi_node_ledger_window() -> result::Result<()> {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
// Send validator some tokens to vote
@ -240,7 +242,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
leader_keypair,
None,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
// Send leader some tokens to vote
@ -271,7 +273,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
nodes.push(val);
}
@ -307,7 +309,7 @@ fn test_multi_node_validator_catchup_from_zero() -> result::Result<()> {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
nodes.push(val);
//contains the leader and new node
@ -373,7 +375,7 @@ fn test_multi_node_basic() {
leader_keypair,
None,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
// Send leader some tokens to vote
@ -401,7 +403,7 @@ fn test_multi_node_basic() {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
nodes.push(val);
}
@ -437,6 +439,7 @@ fn test_multi_node_basic() {
fn test_boot_validator_from_file() -> result::Result<()> {
logger::setup();
let leader_keypair = Keypair::new();
let leader_pubkey = leader_keypair.pubkey();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let bob_pubkey = Keypair::new().pubkey();
let (alice, leader_ledger_path, _) = genesis("boot_validator_from_file", 100_000);
@ -450,7 +453,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
leader_keypair,
None,
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
let leader_balance =
send_tx_and_retry_get_balance(&leader_data, &alice, &bob_pubkey, 500, Some(500)).unwrap();
@ -470,7 +473,7 @@ fn test_boot_validator_from_file() -> result::Result<()> {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
let mut client = mk_client(&validator_data);
let getbal = retry_get_balance(&mut client, &bob_pubkey, Some(leader_balance));
@ -489,7 +492,14 @@ fn create_leader(ledger_path: &str) -> (NodeInfo, Fullnode) {
let leader_keypair = Keypair::new();
let leader = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_data = leader.info.clone();
let leader_fullnode = Fullnode::new(leader, &ledger_path, leader_keypair, None, false, None);
let leader_fullnode = Fullnode::new(
leader,
&ledger_path,
leader_keypair,
None,
false,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
);
(leader_data, leader_fullnode)
}
@ -543,7 +553,7 @@ fn test_leader_restart_validator_start_from_old_ledger() -> result::Result<()> {
keypair,
Some(leader_data.contact_info.ncp),
false,
None,
LeaderScheduler::from_bootstrap_leader(leader_data.id),
);
// trigger broadcast, validator should catch up from leader, whose window contains
@ -607,7 +617,7 @@ fn test_multi_node_dynamic_network() {
leader_keypair,
None,
true,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
// Send leader some tokens to vote
@ -681,7 +691,7 @@ fn test_multi_node_dynamic_network() {
keypair,
Some(leader_data.contact_info.ncp),
true,
None,
LeaderScheduler::from_bootstrap_leader(leader_pubkey),
);
(rd, val)
}).unwrap()
@ -775,39 +785,57 @@ fn test_multi_node_dynamic_network() {
}
#[test]
#[ignore]
fn test_leader_to_validator_transition() {
logger::setup();
let leader_rotation_interval = 20;
// Make a dummy address to be the sink for this test's mock transactions
let bob_pubkey = Keypair::new().pubkey();
// Make a dummy validator id to be the next leader and
// sink for this test's mock transactions
let validator_keypair = Keypair::new();
let validator_id = validator_keypair.pubkey();
// Initialize the leader ledger. Make a mint and a genesis entry
// in the leader ledger
let (mint, leader_ledger_path, entries) = genesis(
"test_leader_to_validator_transition",
(3 * leader_rotation_interval) as i64,
);
let genesis_height = entries.len() as u64;
// Start the leader node
// Create the leader node information
let leader_keypair = Keypair::new();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_info = leader_node.info.clone();
// Initialize the leader ledger. Make a mint and a genesis entry
// in the leader ledger
let (mint, leader_ledger_path, genesis_entries) =
genesis("test_leader_to_validator_transition", 10_000);
let last_id = genesis_entries
.last()
.expect("expected at least one genesis entry")
.id;
// Write the bootstrap entries to the ledger that will cause leader rotation
// after the bootstrap height
let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
let bootstrap_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id);
let bootstrap_entries_len = bootstrap_entries.len();
ledger_writer.write_entries(bootstrap_entries).unwrap();
let ledger_initial_len = (genesis_entries.len() + bootstrap_entries_len) as u64;
// Start the leader node
let bootstrap_height = leader_rotation_interval;
let leader_scheduler_config = LeaderSchedulerConfig::new(
leader_info.id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(leader_rotation_interval * 2),
Some(bootstrap_height),
);
let mut leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
None,
Some(leader_info.contact_info.ncp),
false,
Some(leader_rotation_interval),
LeaderScheduler::new(&leader_scheduler_config),
);
// Set the next leader to be Bob
leader.set_scheduled_leader(bob_pubkey, leader_rotation_interval);
// Make an extra node for our leader to broadcast to,
// who won't vote and mess with our leader's entry count
let (ncp, spy_node, _) = make_spy_node(&leader_info);
@ -828,55 +856,66 @@ fn test_leader_to_validator_transition() {
assert!(converged);
let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1);
let extra_transactions = std::cmp::max(bootstrap_height / 3, 1);
// Push leader "extra_transactions" past leader_rotation_interval entry height,
// Push leader "extra_transactions" past bootstrap_height,
// make sure the leader stops.
assert!(genesis_height < leader_rotation_interval);
for i in genesis_height..(leader_rotation_interval + extra_transactions) {
if i < leader_rotation_interval {
assert!(ledger_initial_len < bootstrap_height);
for i in ledger_initial_len..(bootstrap_height + extra_transactions) {
if i < bootstrap_height {
// Poll to see that the bank state is updated after every transaction
// to ensure that each transaction is packaged as a single entry,
// so that we can be sure leader rotation is triggered
let expected_balance = std::cmp::min(
leader_rotation_interval - genesis_height,
i - genesis_height + 1,
bootstrap_height - ledger_initial_len,
i - ledger_initial_len + 1,
);
let result = send_tx_and_retry_get_balance(
&leader_info,
&mint,
&bob_pubkey,
&validator_id,
1,
Some(expected_balance as i64),
);
assert_eq!(result, Some(expected_balance as i64))
// If the transaction wasn't reflected in the node, then we assume
// the node has transitioned already
if result != Some(expected_balance as i64) {
break;
}
} else {
// After leader_rotation_interval entries, we don't care about the
// After bootstrap_height entries, we don't care about the
// number of entries generated by these transactions. These are just
// here for testing to make sure the leader stopped at the correct point.
send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None);
send_tx_and_retry_get_balance(&leader_info, &mint, &validator_id, 1, None);
}
}
// Wait for leader to shut down tpu and restart tvu
match leader.handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderRotation) => (),
Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"),
}
// Query now validator to make sure that he has the proper balances in his bank
// after the transition, even though we submitted "extra_transactions"
// after the transitions, even though we submitted "extra_transactions"
// transactions earlier
let mut leader_client = mk_client(&leader_info);
let expected_bal = leader_rotation_interval - genesis_height;
let maximum_bal = bootstrap_height - ledger_initial_len;
let bal = leader_client
.poll_get_balance(&bob_pubkey)
.poll_get_balance(&validator_id)
.expect("Expected success when polling newly transitioned validator for balance")
as u64;
assert_eq!(bal, expected_bal);
assert!(bal <= maximum_bal);
// Check the ledger to make sure it's the right height; we should've
// transitioned after the bootstrap_height entry
let (_, entry_height, _) =
Fullnode::new_bank_from_ledger(&leader_ledger_path, &mut LeaderScheduler::default());
assert_eq!(entry_height, bootstrap_height);
// Shut down
ncp.close().unwrap();
@ -885,7 +924,6 @@ fn test_leader_to_validator_transition() {
}
#[test]
#[ignore]
fn test_leader_validator_basic() {
logger::setup();
let leader_rotation_interval = 10;
@ -893,61 +931,67 @@ fn test_leader_validator_basic() {
// Account that will be the sink for all the test's transactions
let bob_pubkey = Keypair::new().pubkey();
// Make a mint and a genesis entry in the leader ledger
let (mint, leader_ledger_path, genesis_entries) =
genesis("test_leader_validator_basic", 10_000);
let genesis_height = genesis_entries.len();
// Initialize the leader ledger
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
// Create the leader fullnode
// Create the leader node information
let leader_keypair = Keypair::new();
let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let leader_info = leader_node.info.clone();
// Create the validator node information
let validator_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
// Make a common mint and a genesis entry for both leader + validator ledgers
let (mint, leader_ledger_path, genesis_entries) =
genesis("test_leader_validator_basic", 10_000);
let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, "test_leader_validator_basic");
let last_id = genesis_entries
.last()
.expect("expected at least one genesis entry")
.id;
let genesis_height = genesis_entries.len();
// Initialize both leader + validator ledgers
let mut ledger_paths = Vec::new();
ledger_paths.push(leader_ledger_path.clone());
ledger_paths.push(validator_ledger_path.clone());
// Write the bootstrap entries to the ledger that will cause leader rotation
// after the bootstrap height
let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
let bootstrap_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id);
ledger_writer.write_entries(bootstrap_entries).unwrap();
// Create the leader scheduler config
let num_bootstrap_slots = 2;
let bootstrap_height = num_bootstrap_slots * leader_rotation_interval;
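// With the validator as the only other staked node in the active set, the leader
// is expected to hand off to it once the ledger reaches bootstrap_height.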
let leader_scheduler_config = LeaderSchedulerConfig::new(
leader_info.id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(leader_rotation_interval * 2),
Some(bootstrap_height),
);
// Start the leader fullnode
let mut leader = Fullnode::new(
leader_node,
&leader_ledger_path,
leader_keypair,
None,
Some(leader_info.contact_info.ncp),
false,
Some(leader_rotation_interval),
LeaderScheduler::new(&leader_scheduler_config),
);
// Send leader some tokens to vote
send_tx_and_retry_get_balance(&leader_info, &mint, &leader_info.id, 500, None).unwrap();
// Start the validator node
let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, "test_leader_validator_basic");
let validator_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
let validator_info = validator_node.info.clone();
let mut validator = Fullnode::new(
validator_node,
&validator_ledger_path,
validator_keypair,
Some(leader_info.contact_info.ncp),
false,
Some(leader_rotation_interval),
);
ledger_paths.push(validator_ledger_path.clone());
// Set the leader schedule for the validator and leader
let my_leader_begin_epoch = 2;
for i in 0..my_leader_begin_epoch {
validator.set_scheduled_leader(leader_info.id, leader_rotation_interval * i);
leader.set_scheduled_leader(leader_info.id, leader_rotation_interval * i);
}
validator.set_scheduled_leader(
validator_info.id,
my_leader_begin_epoch * leader_rotation_interval,
);
leader.set_scheduled_leader(
validator_info.id,
my_leader_begin_epoch * leader_rotation_interval,
LeaderScheduler::new(&leader_scheduler_config),
);
// Wait for convergence
@ -956,8 +1000,7 @@ fn test_leader_validator_basic() {
// Send transactions to the leader
let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1);
let total_transactions_to_send =
my_leader_begin_epoch * leader_rotation_interval + extra_transactions;
let total_transactions_to_send = bootstrap_height + extra_transactions;
// Push "extra_transactions" past leader_rotation_interval entry height,
// make sure the validator stops.
@ -967,20 +1010,13 @@ fn test_leader_validator_basic() {
// Wait for validator to shut down tvu and restart tpu
match validator.handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderRotation) => (),
Some(FullnodeReturnType::ValidatorToLeaderRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"),
}
// TODO: We ignore this test for now b/c there's a chance here that the cluster_info
// in the new leader calls the dummy sequence of update_leader -> top_leader()
// (see the TODOs in those functions) during gossip and sets the leader back
// to the old leader, which causes a panic from an assertion failure in cluster_info broadcast(),
// specifically: assert!(me.leader_id != v.id). We can enable this test once we have real
// leader scheduling
// Wait for the leader to shut down tpu and restart tvu
match leader.handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderRotation) => (),
Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"),
}
@ -994,15 +1030,393 @@ fn test_leader_validator_basic() {
let validator_entries =
read_ledger(&validator_ledger_path, true).expect("Expected parsing of validator ledger");
let leader_entries =
read_ledger(&validator_ledger_path, true).expect("Expected parsing of leader ledger");
read_ledger(&leader_ledger_path, true).expect("Expected parsing of leader ledger");
let mut min_len = 0;
for (v, l) in validator_entries.zip(leader_entries) {
min_len += 1;
assert_eq!(
v.expect("expected valid validator entry"),
l.expect("expected valid leader entry")
);
}
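// The shared prefix of the two ledgers must extend at least to the rotation point.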
assert!(min_len >= bootstrap_height);
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
}
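// Drives a fullnode from a background thread: whenever the node reports that its
// current role has exited, perform the leader <-> validator transition, and keep
// doing so until the shared exit flag is set.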
fn run_node(
id: Pubkey,
fullnode: Arc<RwLock<Fullnode>>,
should_exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name(format!("run_node-{:?}", id).to_string())
.spawn(move || loop {
if should_exit.load(Ordering::Relaxed) {
return;
}
if fullnode.read().unwrap().check_role_exited() {
match fullnode.write().unwrap().handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
Some(FullnodeReturnType::ValidatorToLeaderRotation) => (),
_ => {
panic!("Expected reason for exit to be leader rotation");
}
};
}
sleep(Duration::new(1, 0));
}).unwrap()
}
#[test]
#[ignore]
fn test_dropped_handoff_recovery() {
logger::setup();
// The number of validators
const N: usize = 3;
assert!(N > 1);
logger::setup();
// Create the bootstrap leader node information
let bootstrap_leader_keypair = Keypair::new();
let bootstrap_leader_node = Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey());
let bootstrap_leader_info = bootstrap_leader_node.info.clone();
// Make a common mint and a genesis entry for both leader + validator ledgers
let (mint, bootstrap_leader_ledger_path, genesis_entries) =
genesis("test_dropped_handoff_recovery", 10_000);
let last_id = genesis_entries
.last()
.expect("expected at least one genesis entry")
.id;
// Create the validator keypair that will be the next leader in line
let next_leader_keypair = Keypair::new();
// Create a common ledger with entries in the beginning that will add only
// the "next_leader" validator to the active set for leader election, guaranteeing
// he is the next leader after bootstrap_height
let mut ledger_paths = Vec::new();
ledger_paths.push(bootstrap_leader_ledger_path.clone());
// Make the entries to give the next_leader validator some stake so that he will be in
// the leader election active set
let first_entries = make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id);
let first_entries_len = first_entries.len();
// Write the entries
let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap();
ledger_writer.write_entries(first_entries).unwrap();
let ledger_initial_len = (genesis_entries.len() + first_entries_len) as u64;
let next_leader_ledger_path = tmp_copy_ledger(
&bootstrap_leader_ledger_path,
"test_dropped_handoff_recovery",
);
ledger_paths.push(next_leader_ledger_path.clone());
// Create the common leader scheduling configuration
let num_slots_per_epoch = (N + 1) as u64;
let leader_rotation_interval = 5;
let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval;
let bootstrap_height = ledger_initial_len + 1;
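// bootstrap_height is just one entry past the pre-seeded ledger, so rotation to
// next_leader is scheduled before that node has even been started; this is the
// dropped handoff the test is meant to recover from.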
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_leader_info.id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(leader_rotation_interval),
);
// Start up the bootstrap leader fullnode
let bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
bootstrap_leader_keypair,
Some(bootstrap_leader_info.contact_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
);
let mut nodes = vec![bootstrap_leader];
// Start up the validators other than the "next_leader" validator
for _ in 0..(N - 1) {
let kp = Keypair::new();
let validator_ledger_path = tmp_copy_ledger(
&bootstrap_leader_ledger_path,
"test_dropped_handoff_recovery",
);
ledger_paths.push(validator_ledger_path.clone());
let validator_id = kp.pubkey();
let validator_node = Node::new_localhost_with_pubkey(validator_id);
let validator = Fullnode::new(
validator_node,
&validator_ledger_path,
kp,
Some(bootstrap_leader_info.contact_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
);
nodes.push(validator);
}
// Wait for convergence
let num_converged = converge(&bootstrap_leader_info, N).len();
assert_eq!(num_converged, N);
// Wait for leader transition
match nodes[0].handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"),
}
// Now start up the "next leader" node
let next_leader_node = Node::new_localhost_with_pubkey(next_leader_keypair.pubkey());
let mut next_leader = Fullnode::new(
next_leader_node,
&next_leader_ledger_path,
next_leader_keypair,
Some(bootstrap_leader_info.contact_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
);
// Wait for catchup
match next_leader.handle_role_transition().unwrap() {
Some(FullnodeReturnType::ValidatorToLeaderRotation) => (),
_ => panic!("Expected reason for exit to be leader rotation"),
}
nodes.push(next_leader);
for node in nodes {
node.close().unwrap();
}
for path in ledger_paths {
remove_dir_all(path).unwrap();
}
}
#[test]
// TODO: Ignored for now due to a bug exposed by the test "test_dropped_handoff_recovery"
#[ignore]
fn test_full_leader_validator_network() {
logger::setup();
// The number of validators
const N: usize = 5;
logger::setup();
// Create the bootstrap leader node information
let bootstrap_leader_keypair = Keypair::new();
let bootstrap_leader_node = Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey());
let bootstrap_leader_info = bootstrap_leader_node.info.clone();
let mut node_keypairs = VecDeque::new();
node_keypairs.push_back(bootstrap_leader_keypair);
// Create the validator keypairs
for _ in 0..N {
let validator_keypair = Keypair::new();
node_keypairs.push_back(validator_keypair);
}
// Make a common mint and a genesis entry for both leader + validator ledgers
let (mint, bootstrap_leader_ledger_path, genesis_entries) =
genesis("test_full_leader_validator_network", 10_000);
let mut last_id = genesis_entries
.last()
.expect("expected at least one genesis entry")
.id;
// Create a common ledger with entries in the beginning that will add all the validators
// to the active set for leader election. TODO: Leader rotation does not yet support dynamic
// stakes for safe leader -> validator transitions, because bank state is unpredictable
// while transactions are still in flight during leader seed calculation in the write stage.
let mut ledger_paths = Vec::new();
ledger_paths.push(bootstrap_leader_ledger_path.clone());
for node_keypair in node_keypairs.iter() {
// Make entries to give the validator some stake so that he will be in
// the leader election active set
let bootstrap_entries = make_active_set_entries(node_keypair, &mint.keypair(), &last_id);
// Write the entries
let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap();
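// Chain each node's entries off the previous node's last entry id so the shared
// ledger stays contiguous.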
last_id = bootstrap_entries
.last()
.expect("expected at least one genesis entry")
.id;
ledger_writer.write_entries(bootstrap_entries).unwrap();
}
// Create the common leader scheduling configuration
let num_slots_per_epoch = (N + 1) as u64;
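// One leader slot per node: N validators plus the bootstrap leader.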
let num_bootstrap_slots = 2;
let leader_rotation_interval = 5;
let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval;
let bootstrap_height = num_bootstrap_slots * leader_rotation_interval;
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_leader_info.id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(leader_rotation_interval),
);
let exit = Arc::new(AtomicBool::new(false));
// Start the bootstrap leader fullnode
let bootstrap_leader = Arc::new(RwLock::new(Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
node_keypairs.pop_front().unwrap(),
Some(bootstrap_leader_info.contact_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
)));
let mut nodes: Vec<Arc<RwLock<Fullnode>>> = vec![bootstrap_leader.clone()];
let mut t_nodes = vec![run_node(
bootstrap_leader_info.id,
bootstrap_leader,
exit.clone(),
)];
// Start up the validators
for kp in node_keypairs.into_iter() {
let validator_ledger_path = tmp_copy_ledger(
&bootstrap_leader_ledger_path,
"test_full_leader_validator_network",
);
ledger_paths.push(validator_ledger_path.clone());
let validator_id = kp.pubkey();
let validator_node = Node::new_localhost_with_pubkey(validator_id);
let validator = Arc::new(RwLock::new(Fullnode::new(
validator_node,
&validator_ledger_path,
kp,
Some(bootstrap_leader_info.contact_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
)));
nodes.push(validator.clone());
t_nodes.push(run_node(validator_id, validator, exit.clone()));
}
// Wait for convergence
let num_converged = converge(&bootstrap_leader_info, N + 1).len();
assert_eq!(num_converged, N + 1);
// Wait for each node to hit a specific target height in the leader schedule.
// Once all the nodes hit that height, exit them all together. They must
// only quit once they've all confirmed they reached that specific target height.
// Otherwise, some nodes may never reach the target height if a critical
// next leader node exits first and stops generating entries. (We don't
// have a timeout mechanism).
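// The target is one full seed_rotation_interval past the bootstrap period.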
let target_height = bootstrap_height + seed_rotation_interval;
let mut num_reached_target_height = 0;
while num_reached_target_height != N + 1 {
num_reached_target_height = 0;
for n in nodes.iter() {
let node_lock = n.read().unwrap();
let ls_lock = &node_lock.leader_scheduler;
if let Some(sh) = ls_lock.read().unwrap().last_seed_height {
if sh >= target_height {
num_reached_target_height += 1;
}
}
drop(ls_lock);
}
sleep(Duration::new(1, 0));
}
exit.store(true, Ordering::Relaxed);
// Wait for threads running the nodes to exit
for t in t_nodes {
t.join().unwrap();
}
// Exit all fullnodes
for n in nodes {
let result = Arc::try_unwrap(n);
match result {
Ok(lock) => {
let f = lock
.into_inner()
.expect("RwLock for fullnode is still locked");
f.close().unwrap();
}
Err(_) => panic!("Multiple references to RwLock<Fullnode> still exist"),
}
}
let mut node_entries = vec![];
// Check that all the ledgers match
for ledger_path in ledger_paths.iter() {
let entries = read_ledger(ledger_path, true).expect("Expected parsing of node ledger");
node_entries.push(entries);
}
let mut shortest = None;
let mut length = 0;
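// Walk all node ledgers in lockstep: every entry at a given height must match
// across nodes, and the shortest ledger must reach at least target_height.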
loop {
let mut expected_entry_option = None;
let mut empty_iterators = HashSet::new();
for (i, entries_for_specific_node) in node_entries.iter_mut().enumerate() {
if let Some(next_entry_option) = entries_for_specific_node.next() {
// If this ledger iterator has another entry, make sure that the
// ledger reader parsed it correctly
let next_entry = next_entry_option.expect("expected valid ledger entry");
// Check if another earlier ledger iterator had another entry. If so, make
// sure they match
if let Some(ref expected_entry) = expected_entry_option {
assert_eq!(*expected_entry, next_entry);
} else {
expected_entry_option = Some(next_entry);
}
} else {
// The shortest iterator is the first one to return a None when
// calling next()
if shortest.is_none() {
shortest = Some(length);
}
empty_iterators.insert(i);
}
}
// Remove the empty iterators
node_entries = node_entries
.into_iter()
.enumerate()
.filter_map(|(i, x)| match empty_iterators.get(&i) {
None => Some(x),
_ => None,
}).collect();
if node_entries.len() == 0 {
break;
}
length += 1;
}
assert!(shortest.unwrap() >= target_height);
for path in ledger_paths {
remove_dir_all(path).unwrap();
}