Fix replay_stage to 1) skip leader slots, 2) create/set working banks properly

Carl 2019-02-24 01:06:46 -08:00 committed by Grimes
parent 1e15e6375a
commit c13ae10d31
3 changed files with 153 additions and 67 deletions
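For orientation before the diffs: the core of the fix is that ReplayStage now derives a child bank from the current one, registers it in BankForks under the upcoming slot, and marks it as the node's working bank, which Fullnode then reads back when switching to leader. The sketch below illustrates that flow; Bank and BankForks are deliberately simplified stand-ins (only the method names new_from_parent, insert, set_working_bank_id, and working_bank are taken from the diff; their internals here are assumptions for illustration).

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Simplified stand-in for the runtime Bank: no ledger state, identity only.
struct Bank;

impl Bank {
    // Mirrors how the diff calls it: the child is derived from its parent.
    fn new_from_parent(_parent: &Arc<Bank>) -> Bank {
        Bank
    }
}

// Simplified stand-in for BankForks: banks keyed by slot, plus the id of the
// bank the rest of the node should currently be working against.
struct BankForks {
    banks: HashMap<u64, Arc<Bank>>,
    working_bank_id: u64,
}

impl BankForks {
    fn insert(&mut self, slot: u64, bank: Bank) {
        self.banks.insert(slot, Arc::new(bank));
    }
    fn set_working_bank_id(&mut self, slot: u64) {
        self.working_bank_id = slot;
    }
    fn working_bank(&self) -> Arc<Bank> {
        self.banks[&self.working_bank_id].clone()
    }
}

// Same shape as the helper this commit adds to ReplayStage: create the child
// bank, register it under the new slot, mark it as the working bank, and hand
// back the shared handle.
fn create_and_set_working_bank(
    slot: u64,
    bank_forks: &Arc<RwLock<BankForks>>,
    parent: &Arc<Bank>,
) -> Arc<Bank> {
    let new_bank = Bank::new_from_parent(parent);
    let mut bank_forks = bank_forks.write().unwrap();
    bank_forks.insert(slot, new_bank);
    bank_forks.set_working_bank_id(slot);
    bank_forks.working_bank()
}

fn main() {
    let mut banks = HashMap::new();
    banks.insert(0, Arc::new(Bank));
    let bank_forks = Arc::new(RwLock::new(BankForks {
        banks,
        working_bank_id: 0,
    }));

    // On rotation into slot 1 the replay stage runs the helper...
    let parent = bank_forks.read().unwrap().working_bank();
    let working = create_and_set_working_bank(1, &bank_forks, &parent);

    // ...and Fullnode::switch_to_leader later reads the very same bank back
    // out of BankForks instead of wrapping a fresh one in Arc::new.
    assert!(Arc::ptr_eq(
        &working,
        &bank_forks.read().unwrap().working_bank()
    ));
}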


@ -102,6 +102,7 @@ pub struct Fullnode {
rotation_receiver: TvuRotationReceiver,
blocktree: Arc<Blocktree>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
bank_forks: Arc<RwLock<BankForks>>,
}
impl Fullnode {
@ -252,6 +253,7 @@ impl Fullnode {
rotation_receiver,
blocktree,
leader_scheduler,
bank_forks,
}
}
@ -278,7 +280,7 @@ impl Fullnode {
None => FullnodeReturnType::LeaderToLeaderRotation, // value doesn't matter here...
};
self.node_services.tpu.switch_to_leader(
Arc::new(rotation_info.bank),
self.bank_forks.read().unwrap().working_bank(),
PohServiceConfig::default(),
self.tpu_sockets
.iter()


@ -191,59 +191,65 @@ impl ReplayStage {
let to_leader_sender = to_leader_sender.clone();
let subscriptions_ = subscriptions.clone();
let (bank, last_entry_id, mut current_blob_index) = {
// Gather up all the metadata about the current state of the ledger
let (mut bank, tick_height, last_entry_id, mut current_blob_index) = {
let mut bank_forks = bank_forks.write().unwrap();
bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
let bank = bank_forks.working_bank();
let tick_height = bank.tick_height();
(
bank_forks.working_bank(),
bank,
tick_height,
bank_forks_info[0].last_entry_id,
bank_forks_info[0].entry_height,
)
};
let last_entry_id = Arc::new(RwLock::new(last_entry_id));
{
// Update Tpu and other fullnode components with the current bank
let (mut current_slot, mut current_leader_id, mut max_tick_height_for_slot) = {
let leader_scheduler = leader_scheduler.read().unwrap();
let slot = leader_scheduler.tick_height_to_slot(bank.tick_height() + 1);
let slot = leader_scheduler.tick_height_to_slot(tick_height + 1);
let first_tick_in_slot = slot * bank.ticks_per_slot();
let leader_id = leader_scheduler
.get_leader_for_slot(slot)
.expect("Leader not known after processing bank");
trace!("node {:?} scheduled as leader for slot {}", leader_id, slot,);
let old_bank = bank.clone();
// If the next tick starts a new slot and we're the leader for that slot,
// create a new bank and set it as the working bank.
if tick_height + 1 == first_tick_in_slot && leader_id == my_id {
bank = Self::create_and_set_working_bank(slot, &bank_forks, &old_bank);
}
// Send a rotation notification back to Fullnode to initialize the TPU to the right
// state
// state. After this point, the bank.tick_height() is live, which means it can
// be updated by the TPU
to_leader_sender
.send(TvuRotationInfo {
bank: Bank::new_from_parent(&bank),
bank: old_bank,
last_entry_id: *last_entry_id.read().unwrap(),
slot,
leader_id,
})
.unwrap();
}
let max_tick_height_for_slot =
first_tick_in_slot + leader_scheduler.num_ticks_left_in_slot(first_tick_in_slot);
(Some(slot), leader_id, max_tick_height_for_slot)
};
// Start the replay stage loop
let bank_forks = bank_forks.clone();
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_.clone());
let mut last_leader_id = leader_scheduler_
.read()
.unwrap()
.get_leader_for_tick(bank.tick_height() + 1)
.unwrap();
let mut prev_slot = None;
let (mut current_slot, mut max_tick_height_for_slot) = {
let tick_height = bank.tick_height();
let leader_scheduler = leader_scheduler_.read().unwrap();
let current_slot = leader_scheduler.tick_height_to_slot(tick_height + 1);
let first_tick_in_current_slot = current_slot * bank.ticks_per_slot();
(
Some(current_slot),
first_tick_in_current_slot
+ leader_scheduler.num_ticks_left_in_slot(first_tick_in_current_slot),
)
};
// Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each
// relevant slot to see if there are any available updates
@ -266,20 +272,35 @@ impl ReplayStage {
prev_slot.expect("prev_slot must exist"),
);
if new_slot.is_some() {
trace!("{} replay_stage: new_slot found: {:?}", my_id, new_slot);
// Reset the state
bank = Self::create_and_set_working_bank(
new_slot.unwrap(),
&bank_forks,
&bank,
);
current_slot = new_slot;
current_blob_index = 0;
let leader_scheduler = leader_scheduler_.read().unwrap();
let first_tick_in_current_slot =
current_slot.unwrap() * bank.ticks_per_slot();
max_tick_height_for_slot = first_tick_in_current_slot
+ leader_scheduler
.num_ticks_left_in_slot(first_tick_in_current_slot);
Self::reset_state(
bank.ticks_per_slot(),
current_slot.unwrap(),
&mut max_tick_height_for_slot,
&mut current_blob_index,
);
} else {
continue;
}
}
// current_slot must be Some(x) by this point
let slot = current_slot.unwrap();
// Fetch the next entries from the database
let entries = {
if let Some(slot) = current_slot {
if current_leader_id != my_id {
info!(
"{} replay_stage: asking for entries from slot: {}, bi: {}",
my_id, slot, current_blob_index
);
if let Ok(entries) = blocktree.get_slot_entries(
slot,
current_blob_index,
@ -294,7 +315,6 @@ impl ReplayStage {
}
};
// Fetch the next entries from the database
if !entries.is_empty() {
if let Err(e) = Self::process_entries(
entries,
@ -307,45 +327,87 @@ impl ReplayStage {
&leader_scheduler_,
&subscriptions_,
) {
error!("process_entries failed: {:?}", e);
error!("{} process_entries failed: {:?}", my_id, e);
}
}
let current_tick_height = bank.tick_height();
let current_tick_height = bank.tick_height();
// We've reached the end of a slot, reset our state and check
// for leader rotation
if max_tick_height_for_slot == current_tick_height {
// Check for leader rotation
let (leader_id, next_slot) = {
let leader_scheduler = leader_scheduler_.read().unwrap();
(
leader_scheduler
.get_leader_for_tick(current_tick_height + 1)
.unwrap(),
leader_scheduler.tick_height_to_slot(current_tick_height + 1),
)
};
// We've reached the end of a slot, reset our state and check
// for leader rotation
if max_tick_height_for_slot == current_tick_height {
// TODO: replace this with generating an actual leader schedule
// from the bank
leader_scheduler_
.write()
.unwrap()
.update_tick_height(current_tick_height, &bank);
// Check for leader rotation
let (leader_id, next_slot) = {
let leader_scheduler = leader_scheduler_.read().unwrap();
(
leader_scheduler
.get_leader_for_tick(current_tick_height + 1)
.unwrap(),
leader_scheduler.tick_height_to_slot(current_tick_height + 1),
)
};
if my_id == leader_id || my_id == last_leader_id {
to_leader_sender
.send(TvuRotationInfo {
bank: Bank::new_from_parent(&bank),
last_entry_id: *last_entry_id.read().unwrap(),
slot: next_slot,
leader_id,
})
.unwrap();
} else if leader_id != last_leader_id {
// TODO: Remove this soon once we boot the leader from ClusterInfo
cluster_info.write().unwrap().set_leader(leader_id);
// If we were the leader for the last slot, update the last id b/c we
// haven't processed any of the entries for the slot for which we were
// the leader
if current_leader_id == my_id {
let meta = blocktree.meta(slot).unwrap().expect("meta has to exist");
if meta.last_index == std::u64::MAX {
// Ledger hasn't gotten last blob yet, break and wait
// for a signal
continue;
}
// Check for any slots that chain to this one
prev_slot = current_slot;
current_slot = None;
last_leader_id = leader_id;
continue;
let last_entry = blocktree
.get_slot_entries(slot, meta.last_index, Some(1))
.unwrap();
*(last_entry_id.write().unwrap()) = last_entry[0].id;
}
let old_bank = bank.clone();
prev_slot = current_slot;
if my_id == leader_id {
// Create new bank for next slot if we are the leader for that slot
bank = Self::create_and_set_working_bank(
next_slot,
&bank_forks,
&old_bank,
);
current_slot = Some(next_slot);
Self::reset_state(
bank.ticks_per_slot(),
next_slot,
&mut max_tick_height_for_slot,
&mut current_blob_index,
);
} else {
current_slot = None;
}
if leader_id != current_leader_id {
// TODO: Remove this soon once we boot the leader from ClusterInfo
cluster_info.write().unwrap().set_leader(leader_id);
}
// Always send rotation signal so that other services like
// RPC can be made aware of last slot's bank
to_leader_sender
.send(TvuRotationInfo {
bank: old_bank,
last_entry_id: *last_entry_id.read().unwrap(),
slot: next_slot,
leader_id,
})
.unwrap();
// Check for any slots that chain to this one
current_leader_id = leader_id;
continue;
}
}
})
@ -363,6 +425,28 @@ impl ReplayStage {
self.exit.store(true, Ordering::Relaxed);
}
fn create_and_set_working_bank(
slot: u64,
bank_forks: &Arc<RwLock<BankForks>>,
parent: &Arc<Bank>,
) -> Arc<Bank> {
let new_bank = Bank::new_from_parent(&parent);
let mut bank_forks = bank_forks.write().unwrap();
bank_forks.insert(slot, new_bank);
bank_forks.set_working_bank_id(slot);
bank_forks.working_bank()
}
fn reset_state(
ticks_per_slot: u64,
slot: u64,
max_tick_height_for_slot: &mut u64,
current_blob_index: &mut u64,
) {
*current_blob_index = 0;
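// Ticks count from 0 and slot `slot` covers ticks
// [slot * ticks_per_slot, (slot + 1) * ticks_per_slot - 1], so the slot's last
// tick sits one short of the next slot boundary (e.g. with ticks_per_slot = 8,
// slot 3 ends at tick 31).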
*max_tick_height_for_slot = (slot + 1) * ticks_per_slot - 1;
}
fn get_next_slot(blocktree: &Blocktree, slot_index: u64) -> Option<u64> {
// Find the next slot that chains to the old slot
let next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error");


@ -35,7 +35,7 @@ use std::sync::{Arc, RwLock};
use std::thread;
pub struct TvuRotationInfo {
pub bank: Bank, // Bank to use
pub bank: Arc<Bank>, // Bank to use
pub last_entry_id: Hash, // last_entry_id of that bank
pub slot: u64, // slot height to initiate a rotation
pub leader_id: Pubkey, // leader upon rotation
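A short sketch of why the field becomes Arc<Bank>: the rotation notification can now hand Fullnode the same bank that stays registered in BankForks, so the receiver shares it instead of re-wrapping a Bank in Arc::new (as the fullnode.rs hunk above stops doing). Bank and TvuRotationInfo below are trimmed stand-ins with only the fields needed for the illustration.

use std::sync::{mpsc, Arc};

// Trimmed stand-ins; only the shared-ownership behavior is shown.
struct Bank;

struct TvuRotationInfo {
    bank: Arc<Bank>, // same Arc the replay stage keeps in BankForks
    slot: u64,
}

fn main() {
    let (sender, receiver) = mpsc::channel();

    // Stands in for bank_forks.working_bank() on the replay-stage side.
    let working_bank = Arc::new(Bank);
    sender
        .send(TvuRotationInfo {
            bank: working_bank.clone(),
            slot: 1,
        })
        .unwrap();

    // The Fullnode side receives a handle to the very same bank: two strong
    // references, no Arc::new around a copied Bank.
    let info = receiver.recv().unwrap();
    println!("rotating to slot {}", info.slot);
    assert!(Arc::ptr_eq(&working_bank, &info.bank));
    assert_eq!(Arc::strong_count(&info.bank), 2);
}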