Don't use the Bank's LeaderScheduler

This commit is contained in:
Greg Fitzgerald 2019-02-16 09:23:29 -07:00
parent 643384e1ec
commit 97c93629a5
8 changed files with 110 additions and 49 deletions

View File

@ -103,7 +103,7 @@ pub struct Bank {
/// Tracks and updates the leader schedule based on the votes and account stakes
/// processed by the bank
pub leader_scheduler: Arc<RwLock<LeaderScheduler>>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
subscriptions: RwLock<Option<Arc<RpcSubscriptions>>>,
}

View File

@ -166,11 +166,13 @@ mod test {
// Set up bank and leader_scheduler
let leader_scheduler_config = LeaderSchedulerConfig::new(5, 2, 10);
let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
let bank = Bank::new_with_leader_scheduler(&genesis_block, leader_scheduler.clone());
// Set up entry stream
let entry_stream =
MockEntryStream::new("test_stream".to_string(), bank.leader_scheduler.clone());
let ticks_per_slot = bank.leader_scheduler.read().unwrap().ticks_per_slot;
MockEntryStream::new("test_stream".to_string(), leader_scheduler.clone());
let ticks_per_slot = leader_scheduler.read().unwrap().ticks_per_slot;
let mut last_id = Hash::default();
let mut entries = Vec::new();
@ -178,13 +180,11 @@ mod test {
let tick_height_initial = 0;
let tick_height_final = tick_height_initial + ticks_per_slot + 2;
let mut previous_slot = bank
.leader_scheduler
let mut previous_slot = leader_scheduler
.read()
.unwrap()
.tick_height_to_slot(tick_height_initial);
let leader_id = bank
.leader_scheduler
let leader_id = leader_scheduler
.read()
.unwrap()
.get_leader_for_slot(previous_slot)
@ -192,12 +192,11 @@ mod test {
.unwrap_or_else(|| "None".to_string());
for tick_height in tick_height_initial..=tick_height_final {
bank.leader_scheduler
leader_scheduler
.write()
.unwrap()
.update_tick_height(tick_height, &bank);
let curr_slot = bank
.leader_scheduler
let curr_slot = leader_scheduler
.read()
.unwrap()
.tick_height_to_slot(tick_height);

View File

@ -122,14 +122,19 @@ mod test {
#[test]
fn test_entry_stream_stage_process_entries() {
// Set up bank and leader_scheduler
// Set up the bank and leader_scheduler
let ticks_per_slot = 5;
let leader_scheduler_config = LeaderSchedulerConfig::new(ticks_per_slot, 2, 10);
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
// Side-effect: Register a bank with the leader_scheduler
let (genesis_block, _mint_keypair) = GenesisBlock::new(1_000_000);
let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
Bank::new_with_leader_scheduler(&genesis_block, leader_scheduler.clone());
// Set up entry stream
let mut entry_stream =
EntryStream::new("test_stream".to_string(), bank.leader_scheduler.clone());
EntryStream::new("test_stream".to_string(), leader_scheduler.clone());
// Set up dummy channels to host an EntryStreamStage
let (ledger_entry_sender, ledger_entry_receiver) = channel();

View File

@ -106,6 +106,7 @@ pub struct Fullnode {
rotation_sender: TpuRotationSender,
rotation_receiver: TpuRotationReceiver,
blocktree: Arc<Blocktree>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
}
impl Fullnode {
@ -202,7 +203,7 @@ impl Fullnode {
let (scheduled_leader, max_tick_height, blob_index) = {
let next_tick = bank.tick_height() + 1;
let leader_scheduler = bank.leader_scheduler.read().unwrap();
let leader_scheduler = leader_scheduler.read().unwrap();
let slot_at_next_tick = leader_scheduler.tick_height_to_slot(next_tick);
let scheduled_leader = leader_scheduler
@ -270,6 +271,7 @@ impl Fullnode {
config.entry_stream.as_ref(),
ledger_signal_sender,
ledger_signal_receiver,
leader_scheduler.clone(),
);
let tpu = Tpu::new(id, &cluster_info);
@ -287,6 +289,7 @@ impl Fullnode {
rotation_sender,
rotation_receiver,
blocktree,
leader_scheduler,
};
fullnode.rotate(
@ -315,7 +318,7 @@ impl Fullnode {
}
let (scheduled_leader, max_tick_height) = {
let mut leader_scheduler = self.bank.leader_scheduler.write().unwrap();
let mut leader_scheduler = self.leader_scheduler.write().unwrap();
// A transition is only permitted on the final tick of a slot
assert_eq!(leader_scheduler.num_ticks_left_in_slot(tick_height), 0);
@ -377,6 +380,7 @@ impl Fullnode {
last_entry_id,
&self.rotation_sender,
&self.blocktree,
&self.leader_scheduler,
);
transition
} else {
@ -820,13 +824,14 @@ mod tests {
// Close the validator so that rocksdb has locks available
validator_exit();
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
let (bank, entry_height, _, _, _, _) = new_bank_from_ledger(
&validator_ledger_path,
&BlocktreeConfig::default(),
&Arc::new(RwLock::new(LeaderScheduler::default())),
&leader_scheduler,
);
assert!(bank.tick_height() >= bank.leader_scheduler.read().unwrap().ticks_per_epoch);
assert!(bank.tick_height() >= leader_scheduler.read().unwrap().ticks_per_epoch);
assert!(entry_height >= ledger_initial_len);

View File

@ -436,6 +436,7 @@ pub mod tests {
use hashbrown::HashSet;
use std::hash::Hash as StdHash;
use std::iter::FromIterator;
use std::sync::RwLock;
fn to_hashset_owned<T>(slice: &[T]) -> HashSet<T>
where
@ -480,12 +481,14 @@ pub mod tests {
ticks_per_epoch / ticks_per_slot,
active_window_tick_length,
);
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
// Create the bank and validators, which are inserted in order of account balance
let num_vote_account_tokens = 1;
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
info!("bootstrap_leader_id: {}", genesis_block.bootstrap_leader_id);
let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
let bank = Bank::new_with_leader_scheduler(&genesis_block, leader_scheduler.clone());
let mut validators = vec![];
let last_id = genesis_block.last_id();
for i in 0..num_validators {
@ -625,8 +628,10 @@ pub mod tests {
let leader_id = Keypair::new().pubkey();
let active_window_tick_length = 1000;
let leader_scheduler_config = LeaderSchedulerConfig::new(100, 1, active_window_tick_length);
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
let (genesis_block, mint_keypair) = GenesisBlock::new_with_leader(10000, leader_id, 500);
let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
let bank = Bank::new_with_leader_scheduler(&genesis_block, leader_scheduler.clone());
let bootstrap_ids = to_hashset_owned(&vec![genesis_block.bootstrap_leader_id]);
@ -692,7 +697,7 @@ pub mod tests {
}
// Query for the active set at various heights
let mut leader_scheduler = bank.leader_scheduler.write().unwrap();
let mut leader_scheduler = leader_scheduler.write().unwrap();
let result = leader_scheduler.get_active_set(0, &bank);
assert_eq!(result, bootstrap_ids);
@ -917,13 +922,15 @@ pub mod tests {
let leader_scheduler_config =
LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, active_window_tick_length);
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
// Create the bank and validators
let (genesis_block, mint_keypair) = GenesisBlock::new(
((((num_validators + 1) / 2) * (num_validators + 1))
+ (num_vote_account_tokens * num_validators)) as u64,
);
let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
let bank = Bank::new_with_leader_scheduler(&genesis_block, leader_scheduler.clone());
let mut validators = vec![];
let last_id = genesis_block.last_id();
for i in 0..num_validators {
@ -960,7 +967,7 @@ pub mod tests {
// Generate schedule every active_window_tick_length entries and check that
// validators are falling out of the rotation as they fall out of the
// active set
let mut leader_scheduler = bank.leader_scheduler.write().unwrap();
let mut leader_scheduler = leader_scheduler.write().unwrap();
trace!("bootstrap_leader_id: {}", genesis_block.bootstrap_leader_id);
for i in 0..num_validators {
trace!("validators[{}]: {}", i, validators[i as usize]);
@ -993,11 +1000,13 @@ pub mod tests {
let active_window_tick_length = 1000;
let (genesis_block, _mint_keypair) = GenesisBlock::new_with_leader(10000, leader_id, 500);
let leader_scheduler_config = LeaderSchedulerConfig::new(100, 1, active_window_tick_length);
let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
let bank = Bank::new_with_leader_scheduler(&genesis_block, leader_scheduler.clone());
// Bootstrap leader should be in the active set even without explicit votes
{
let mut leader_scheduler = bank.leader_scheduler.write().unwrap();
let mut leader_scheduler = leader_scheduler.write().unwrap();
let result = leader_scheduler.get_active_set(0, &bank);
assert_eq!(result, to_hashset_owned(&vec![leader_id]));
@ -1025,7 +1034,7 @@ pub mod tests {
push_vote(&voting_keypair, &bank, 1, genesis_block.last_id());
{
let mut leader_scheduler = bank.leader_scheduler.write().unwrap();
let mut leader_scheduler = leader_scheduler.write().unwrap();
let result = leader_scheduler.get_active_set(active_window_tick_length + 1, &bank);
assert_eq!(result, to_hashset_owned(&vec![leader_id]));
@ -1037,7 +1046,7 @@ pub mod tests {
push_vote(&voting_keypair, &bank, 2, genesis_block.last_id());
{
let mut leader_scheduler = bank.leader_scheduler.write().unwrap();
let mut leader_scheduler = leader_scheduler.write().unwrap();
let result = leader_scheduler.get_active_set(active_window_tick_length + 2, &bank);
assert_eq!(result, to_hashset_owned(&vec![leader_id]));
@ -1057,12 +1066,14 @@ pub mod tests {
let leader_scheduler_config =
LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, active_window_tick_length);
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
// Check that the generate_schedule() function is being called by the
// update_tick_height() function at the correct entry heights.
let (genesis_block, _) = GenesisBlock::new(10_000);
let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
let mut leader_scheduler = bank.leader_scheduler.write().unwrap();
let bank = Bank::new_with_leader_scheduler(&genesis_block, leader_scheduler.clone());
let mut leader_scheduler = leader_scheduler.write().unwrap();
info!(
"bootstrap_leader_id: {:?}",
genesis_block.bootstrap_leader_id
@ -1198,11 +1209,13 @@ pub mod tests {
let leader_scheduler_config =
LeaderSchedulerConfig::new(ticks_per_slot, slots_per_epoch, active_window_tick_length);
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
// Create mint and bank
let (genesis_block, mint_keypair) =
GenesisBlock::new_with_leader(10_000, bootstrap_leader_id, BOOTSTRAP_LEADER_TOKENS);
let bank = Bank::new_with_leader_scheduler_config(&genesis_block, &leader_scheduler_config);
let bank = Bank::new_with_leader_scheduler(&genesis_block, leader_scheduler.clone());
let last_id = genesis_block.last_id();
let initial_vote_height = 1;

View File

@ -5,6 +5,7 @@ use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::counter::Counter;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use crate::leader_scheduler::LeaderScheduler;
use crate::packet::BlobError;
use crate::result::{Error, Result};
use crate::service::Service;
@ -65,6 +66,7 @@ impl ReplayStage {
ledger_entry_sender: &EntrySender,
current_blob_index: &mut u64,
last_entry_id: &Arc<RwLock<Hash>>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> {
// Coalesce all the available entries into a single vote
submit(
@ -87,8 +89,7 @@ impl ReplayStage {
);
let num_ticks = bank.tick_height();
let mut num_ticks_to_next_vote = bank
.leader_scheduler
let mut num_ticks_to_next_vote = leader_scheduler
.read()
.unwrap()
.num_ticks_left_in_slot(num_ticks);
@ -97,7 +98,7 @@ impl ReplayStage {
inc_new_counter_info!("replicate-stage_bank-tick", bank.tick_height() as usize);
if entry.is_tick() {
if num_ticks_to_next_vote == 0 {
num_ticks_to_next_vote = bank.leader_scheduler.read().unwrap().ticks_per_slot;
num_ticks_to_next_vote = leader_scheduler.read().unwrap().ticks_per_slot;
}
num_ticks_to_next_vote -= 1;
}
@ -181,6 +182,7 @@ impl ReplayStage {
to_leader_sender: &TvuRotationSender,
ledger_signal_sender: SyncSender<bool>,
ledger_signal_receiver: Receiver<bool>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> (Self, EntryReceiver) {
let (ledger_entry_sender, ledger_entry_receiver) = channel();
#[cfg(test)]
@ -190,16 +192,18 @@ impl ReplayStage {
(pause, pause_)
};
let exit_ = exit.clone();
let leader_scheduler_ = leader_scheduler.clone();
let to_leader_sender = to_leader_sender.clone();
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_.clone());
let mut last_leader_id = Self::get_leader_for_next_tick(&bank);
let mut last_leader_id =
Self::get_leader_for_next_tick(bank.tick_height(), &leader_scheduler_);
let mut prev_slot = None;
let (mut current_slot, mut max_tick_height_for_slot) = {
let tick_height = bank.tick_height();
let leader_scheduler = bank.leader_scheduler.read().unwrap();
let leader_scheduler = leader_scheduler_.read().unwrap();
let current_slot = leader_scheduler.tick_height_to_slot(tick_height + 1);
let first_tick_in_current_slot = current_slot * leader_scheduler.ticks_per_slot;
(
@ -230,7 +234,7 @@ impl ReplayStage {
// Reset the state
current_slot = new_slot;
current_blob_index = 0;
let leader_scheduler = bank.leader_scheduler.read().unwrap();
let leader_scheduler = leader_scheduler_.read().unwrap();
let first_tick_in_current_slot =
current_slot.unwrap() * leader_scheduler.ticks_per_slot;
max_tick_height_for_slot = first_tick_in_current_slot
@ -266,6 +270,7 @@ impl ReplayStage {
&ledger_entry_sender,
&mut current_blob_index,
&last_entry_id,
&leader_scheduler_,
) {
error!("process_entries failed: {:?}", e);
}
@ -276,7 +281,10 @@ impl ReplayStage {
// for leader rotation
if max_tick_height_for_slot == current_tick_height {
// Check for leader rotation
let leader_id = Self::get_leader_for_next_tick(&bank);
let leader_id = Self::get_leader_for_next_tick(
bank.tick_height(),
&leader_scheduler_,
);
if leader_id != last_leader_id {
if my_id == leader_id {
@ -332,9 +340,11 @@ impl ReplayStage {
let _ = self.ledger_signal_sender.send(true);
}
fn get_leader_for_next_tick(bank: &Bank) -> Pubkey {
let tick_height = bank.tick_height();
let leader_scheduler = bank.leader_scheduler.read().unwrap();
fn get_leader_for_next_tick(
tick_height: u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Pubkey {
let leader_scheduler = leader_scheduler.read().unwrap();
let slot = leader_scheduler.tick_height_to_slot(tick_height + 1);
leader_scheduler
.get_leader_for_slot(slot)
@ -470,6 +480,7 @@ mod test {
&rotation_sender,
l_sender,
l_receiver,
&leader_scheduler,
);
let total_entries_to_send = 2 * ticks_per_slot as usize - 2;
@ -553,11 +564,12 @@ mod test {
let voting_keypair = Arc::new(VotingKeypair::new_local(&my_keypair));
let (to_leader_sender, _) = channel();
{
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
let (bank, entry_height, last_entry_id, blocktree, l_sender, l_receiver) =
new_bank_from_ledger(
&my_ledger_path,
&BlocktreeConfig::default(),
&Arc::new(RwLock::new(LeaderScheduler::default())),
&leader_scheduler,
);
let bank = Arc::new(bank);
@ -574,6 +586,7 @@ mod test {
&to_leader_sender,
l_sender,
l_receiver,
&leader_scheduler,
);
let keypair = voting_keypair.as_ref();
@ -700,6 +713,7 @@ mod test {
&rotation_tx,
l_sender,
l_receiver,
&leader_scheduler,
);
let keypair = voting_keypair.as_ref();
@ -768,16 +782,23 @@ mod test {
entries.push(entry);
}
let genesis_block = GenesisBlock::new(10_000).0;
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
let my_keypair = Arc::new(my_keypair);
let voting_keypair = Arc::new(VotingKeypair::new_local(&my_keypair));
let bank = Arc::new(Bank::new_with_leader_scheduler(
&genesis_block,
leader_scheduler.clone(),
));
let res = ReplayStage::process_entries(
entries.clone(),
&Arc::new(Bank::new(&GenesisBlock::new(10_000).0)),
&bank,
&cluster_info_me,
Some(&voting_keypair),
&ledger_entry_sender,
&mut current_blob_index,
&Arc::new(RwLock::new(last_entry_id)),
&leader_scheduler,
);
match res {
@ -791,14 +812,19 @@ mod test {
entries.push(entry);
}
let bank = Arc::new(Bank::new_with_leader_scheduler(
&genesis_block,
leader_scheduler.clone(),
));
let res = ReplayStage::process_entries(
entries.clone(),
&Arc::new(Bank::default()),
&bank,
&cluster_info_me,
Some(&voting_keypair),
&ledger_entry_sender,
&mut current_blob_index,
&Arc::new(RwLock::new(last_entry_id)),
&leader_scheduler,
);
match res {

View File

@ -8,6 +8,7 @@ use crate::broadcast_service::BroadcastService;
use crate::cluster_info::ClusterInfo;
use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
use crate::fetch_stage::FetchStage;
use crate::leader_scheduler::LeaderScheduler;
use crate::poh_service::PohServiceConfig;
use crate::service::Service;
use crate::sigverify_stage::SigVerifyStage;
@ -152,6 +153,7 @@ impl Tpu {
last_entry_id: &Hash,
to_validator_sender: &TpuRotationSender,
blocktree: &Arc<Blocktree>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) {
self.close_and_forward_unprocessed_packets();
@ -188,7 +190,7 @@ impl Tpu {
broadcast_socket,
self.cluster_info.clone(),
blob_index,
bank.leader_scheduler.clone(),
leader_scheduler.clone(),
entry_receiver,
max_tick_height,
self.exit.clone(),

View File

@ -17,6 +17,7 @@ use crate::blob_fetch_stage::BlobFetchStage;
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::entry_stream_stage::EntryStreamStage;
use crate::leader_scheduler::LeaderScheduler;
use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
use crate::service::Service;
@ -78,6 +79,7 @@ impl Tvu {
entry_stream: Option<&String>,
ledger_signal_sender: SyncSender<bool>,
ledger_signal_receiver: Receiver<bool>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> Self {
let exit = Arc::new(AtomicBool::new(false));
let keypair: Arc<Keypair> = cluster_info
@ -111,7 +113,7 @@ impl Tvu {
Arc::new(retransmit_socket),
repair_socket,
blob_fetch_receiver,
bank.leader_scheduler.clone(),
leader_scheduler.clone(),
exit.clone(),
);
@ -129,13 +131,14 @@ impl Tvu {
to_leader_sender,
ledger_signal_sender,
ledger_signal_receiver,
&leader_scheduler,
);
let entry_stream_stage = if entry_stream.is_some() {
let (entry_stream_stage, entry_stream_receiver) = EntryStreamStage::new(
previous_receiver,
entry_stream.unwrap().to_string(),
bank.leader_scheduler.clone(),
leader_scheduler,
exit.clone(),
);
previous_receiver = entry_stream_receiver;
@ -242,7 +245,11 @@ pub mod tests {
let starting_balance = 10_000;
let (genesis_block, _mint_keypair) = GenesisBlock::new(starting_balance);
let bank = Arc::new(Bank::new(&genesis_block));
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::default()));
let bank = Arc::new(Bank::new_with_leader_scheduler(
&genesis_block,
leader_scheduler.clone(),
));
//start cluster_info1
let mut cluster_info1 = ClusterInfo::new(target1.info.clone());
@ -278,6 +285,7 @@ pub mod tests {
None,
l_sender,
l_receiver,
leader_scheduler,
);
tvu.close().expect("close");
}
@ -332,9 +340,11 @@ pub mod tests {
let starting_balance = 10_000;
let (genesis_block, mint_keypair) = GenesisBlock::new(starting_balance);
let tvu_addr = target1.info.tvu;
let bank = Arc::new(Bank::new_with_leader_scheduler_config(
let leader_scheduler =
Arc::new(RwLock::new(LeaderScheduler::new(&leader_scheduler_config)));
let bank = Arc::new(Bank::new_with_leader_scheduler(
&genesis_block,
&leader_scheduler_config,
leader_scheduler.clone(),
));
assert_eq!(bank.get_balance(&mint_keypair.pubkey()), starting_balance);
@ -375,6 +385,7 @@ pub mod tests {
None,
l_sender,
l_receiver,
leader_scheduler,
);
let mut alice_ref_balance = starting_balance;