use Bank's notion of leader_id where possible (#3119)

This commit is contained in:
Rob Walker 2019-03-04 18:40:47 -08:00 committed by GitHub
parent a481822321
commit 794e961328
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 23 deletions

View File

@ -3,7 +3,6 @@
use crate::bank_forks::BankForks; use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree; use crate::blocktree::Blocktree;
use crate::blocktree_processor; use crate::blocktree_processor;
use crate::blocktree_processor::BankForksInfo;
use crate::cluster_info::ClusterInfo; use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice}; use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use crate::leader_schedule_utils; use crate::leader_schedule_utils;
@ -59,7 +58,6 @@ impl ReplayStage {
voting_keypair: Option<Arc<T>>, voting_keypair: Option<Arc<T>>,
blocktree: Arc<Blocktree>, blocktree: Arc<Blocktree>,
bank_forks: &Arc<RwLock<BankForks>>, bank_forks: &Arc<RwLock<BankForks>>,
_bank_forks_info: &[BankForksInfo],
cluster_info: Arc<RwLock<ClusterInfo>>, cluster_info: Arc<RwLock<ClusterInfo>>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
ledger_signal_receiver: Receiver<bool>, ledger_signal_receiver: Receiver<bool>,
@ -77,14 +75,14 @@ impl ReplayStage {
let bank_forks = bank_forks.clone(); let bank_forks = bank_forks.clone();
let poh_recorder = poh_recorder.clone(); let poh_recorder = poh_recorder.clone();
let mut progress = HashMap::new();
// Start the replay stage loop // Start the replay stage loop
let t_replay = Builder::new() let t_replay = Builder::new()
.name("solana-replay-stage".to_string()) .name("solana-replay-stage".to_string())
.spawn(move || { .spawn(move || {
let _exit = Finalizer::new(exit_.clone()); let _exit = Finalizer::new(exit_.clone());
let mut first_block = false; let mut first_block = false;
let mut progress = HashMap::new();
loop { loop {
let now = Instant::now(); let now = Instant::now();
// Stop getting entries if we get exit signal // Stop getting entries if we get exit signal
@ -97,7 +95,7 @@ impl ReplayStage {
let mut votable: Vec<u64> = vec![]; let mut votable: Vec<u64> = vec![];
for bank_slot in &active_banks { for bank_slot in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone(); let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
if !Self::is_tpu(&bank, my_id) { if bank.collector_id() != my_id {
Self::replay_blocktree_into_bank( Self::replay_blocktree_into_bank(
&bank, &bank,
&blocktree, &blocktree,
@ -111,8 +109,9 @@ impl ReplayStage {
info!("bank frozen {}", bank.slot()); info!("bank frozen {}", bank.slot());
votable.push(*bank_slot); votable.push(*bank_slot);
progress.remove(bank_slot); progress.remove(bank_slot);
let id = leader_schedule_utils::slot_leader_at(bank.slot(), &bank); if let Err(e) =
if let Err(e) = slot_full_sender.send((bank.slot(), id)) { slot_full_sender.send((bank.slot(), bank.collector_id()))
{
info!("{} slot_full alert failed: {:?}", my_id, e); info!("{} slot_full alert failed: {:?}", my_id, e);
} }
} }
@ -139,8 +138,9 @@ impl ReplayStage {
.unwrap() .unwrap()
.clone(); .clone();
let next_slot = *latest_slot_vote + 1; let next_slot = *latest_slot_vote + 1;
let next_leader = leader_schedule_utils::slot_leader_at(next_slot, &parent); let next_leader_id =
cluster_info.write().unwrap().set_leader(next_leader); leader_schedule_utils::slot_leader_at(next_slot, &parent);
cluster_info.write().unwrap().set_leader(next_leader_id);
subscriptions.notify_subscribers(&parent); subscriptions.notify_subscribers(&parent);
@ -159,7 +159,7 @@ impl ReplayStage {
.unwrap() .unwrap()
.reset(parent.tick_height(), parent.last_blockhash()); .reset(parent.tick_height(), parent.last_blockhash());
if next_leader == my_id { if next_leader_id == my_id {
let frozen = bank_forks.read().unwrap().frozen_banks(); let frozen = bank_forks.read().unwrap().frozen_banks();
assert!(frozen.get(&next_slot).is_none()); assert!(frozen.get(&next_slot).is_none());
assert!(bank_forks.read().unwrap().get(next_slot).is_none()); assert!(bank_forks.read().unwrap().get(next_slot).is_none());
@ -169,12 +169,15 @@ impl ReplayStage {
if let Some(tpu_bank) = if let Some(tpu_bank) =
bank_forks.read().unwrap().get(next_slot).cloned() bank_forks.read().unwrap().get(next_slot).cloned()
{ {
assert_eq!(bank_forks.read().unwrap().working_bank().slot(), tpu_bank.slot()); assert_eq!(
bank_forks.read().unwrap().working_bank().slot(),
tpu_bank.slot()
);
debug!( debug!(
"poh_recorder new working bank: me: {} next_slot: {} next_leader: {}", "new working bank: me: {} next_slot: {} next_leader: {}",
my_id, my_id,
tpu_bank.slot(), tpu_bank.slot(),
next_leader next_leader_id
); );
poh_recorder.lock().unwrap().set_bank(&tpu_bank); poh_recorder.lock().unwrap().set_bank(&tpu_bank);
} }
@ -256,10 +259,6 @@ impl ReplayStage {
result result
} }
pub fn is_tpu(bank: &Bank, my_id: Pubkey) -> bool {
my_id == leader_schedule_utils::slot_leader(&bank)
}
pub fn close(self) -> thread::Result<()> { pub fn close(self) -> thread::Result<()> {
self.exit(); self.exit();
self.join() self.join()
@ -372,7 +371,7 @@ mod test {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let voting_keypair = Arc::new(Keypair::new()); let voting_keypair = Arc::new(Keypair::new());
{ {
let (bank_forks, bank_forks_info, blocktree, l_receiver) = let (bank_forks, _bank_forks_info, blocktree, l_receiver) =
new_banks_from_blocktree(&my_ledger_path, None); new_banks_from_blocktree(&my_ledger_path, None);
let bank = bank_forks.working_bank(); let bank = bank_forks.working_bank();
@ -383,7 +382,6 @@ mod test {
Some(voting_keypair.clone()), Some(voting_keypair.clone()),
blocktree.clone(), blocktree.clone(),
&Arc::new(RwLock::new(bank_forks)), &Arc::new(RwLock::new(bank_forks)),
&bank_forks_info,
cluster_info_me.clone(), cluster_info_me.clone(),
exit.clone(), exit.clone(),
l_receiver, l_receiver,

View File

@ -111,7 +111,6 @@ impl Tvu {
voting_keypair, voting_keypair,
blocktree.clone(), blocktree.clone(),
&bank_forks, &bank_forks,
&bank_forks_info,
cluster_info.clone(), cluster_info.clone(),
exit.clone(), exit.clone(),
ledger_signal_receiver, ledger_signal_receiver,

View File

@ -191,6 +191,10 @@ impl Bank {
bank bank
} }
pub fn collector_id(&self) -> Pubkey {
self.collector_id
}
pub fn slot(&self) -> u64 { pub fn slot(&self) -> u64 {
self.slot self.slot
} }