// solana/core/src/replay_stage.rs
//! The `replay_stage` replays transactions broadcast by the leader.
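//!
//! Each pass through the replay loop builds banks for any new slots found in the
//! blocktree, replays entries into the active banks, picks votable banks via the
//! locktower, votes and resets the PoH recorder, and starts the TPU when this
//! node becomes the slot leader.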
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::blocktree_processor;
use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntrySlice};
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::leader_schedule_utils;
use crate::locktower::{Locktower, StakeLockout};
use crate::packet::BlobError;
use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use solana_metrics::{datapoint_warn, inc_new_counter_error, inc_new_counter_info};
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::KeypairUtil;
use solana_sdk::timing::{self, duration_as_ms};
use solana_sdk::transaction::Transaction;
use solana_vote_api::vote_instruction;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
use std::time::Instant;
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
// Implement a destructor for the ReplayStage thread to signal it exited
// even on panics
struct Finalizer {
exit_sender: Arc<AtomicBool>,
}
impl Finalizer {
fn new(exit_sender: Arc<AtomicBool>) -> Self {
Finalizer { exit_sender }
}
}
// Implement a destructor for Finalizer.
impl Drop for Finalizer {
fn drop(&mut self) {
self.exit_sender.store(true, Ordering::Relaxed);
}
}
pub struct ReplayStage {
t_replay: JoinHandle<Result<()>>,
}
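// Per-fork replay progress: the hash of the last entry replayed, the number of
// blobs consumed from the blocktree so far, and the wall-clock time (ms) when
// replay of this fork started.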
#[derive(Default)]
struct ForkProgress {
last_entry: Hash,
num_blobs: usize,
started_ms: u64,
}
impl ForkProgress {
pub fn new(last_entry: Hash) -> Self {
Self {
last_entry,
num_blobs: 0,
started_ms: timing::timestamp(),
}
}
}
impl ReplayStage {
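// Spawns the replay thread and returns the stage along with receivers for
// completed slots (slot, collector id) and for newly rooted slot lists.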
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
pub fn new<T>(
my_pubkey: &Pubkey,
vote_account: &Pubkey,
voting_keypair: Option<&Arc<T>>,
blocktree: Arc<Blocktree>,
bank_forks: &Arc<RwLock<BankForks>>,
cluster_info: Arc<RwLock<ClusterInfo>>,
exit: &Arc<AtomicBool>,
ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> (Self, Receiver<(u64, Pubkey)>, Receiver<Vec<u64>>)
where
T: 'static + KeypairUtil + Send + Sync,
{
let (root_slot_sender, root_slot_receiver) = channel();
let (slot_full_sender, slot_full_receiver) = channel();
trace!("replay stage");
let exit_ = exit.clone();
let subscriptions = subscriptions.clone();
let bank_forks = bank_forks.clone();
let poh_recorder = poh_recorder.clone();
let my_pubkey = *my_pubkey;
let mut ticks_per_slot = 0;
let mut locktower = Locktower::new_from_forks(&bank_forks.read().unwrap(), &my_pubkey);
// Start the replay stage loop
let leader_schedule_cache = leader_schedule_cache.clone();
let vote_account = *vote_account;
let voting_keypair = voting_keypair.cloned();
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_.clone());
let mut progress = HashMap::new();
loop {
let now = Instant::now();
// Stop getting entries if the exit signal is set
if exit_.load(Ordering::Relaxed) {
break;
}
Self::generate_new_bank_forks(
&blocktree,
&mut bank_forks.write().unwrap(),
&leader_schedule_cache,
);
let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some();
Self::replay_active_banks(
&blocktree,
&bank_forks,
&my_pubkey,
&mut ticks_per_slot,
&mut progress,
&slot_full_sender,
)?;
if ticks_per_slot == 0 {
let frozen_banks = bank_forks.read().unwrap().frozen_banks();
let bank = frozen_banks.values().next().unwrap();
ticks_per_slot = bank.ticks_per_slot();
}
let votable =
Self::generate_votable_banks(&bank_forks, &locktower, &mut progress);
if let Some((_, bank)) = votable.last() {
subscriptions.notify_subscribers(bank.slot(), &bank_forks);
Self::handle_votable_bank(
&bank,
&bank_forks,
&mut locktower,
&mut progress,
&vote_account,
&voting_keypair,
&cluster_info,
&blocktree,
&leader_schedule_cache,
&root_slot_sender,
)?;
Self::reset_poh_recorder(
&my_pubkey,
&blocktree,
&bank,
&poh_recorder,
ticks_per_slot,
&leader_schedule_cache,
);
is_tpu_bank_active = false;
}
let (reached_leader_tick, grace_ticks) = if !is_tpu_bank_active {
let poh = poh_recorder.lock().unwrap();
poh.reached_leader_tick()
} else {
(false, 0)
};
if !is_tpu_bank_active {
assert!(ticks_per_slot > 0);
let poh_tick_height = poh_recorder.lock().unwrap().tick_height();
let poh_slot = leader_schedule_utils::tick_height_to_slot(
ticks_per_slot,
poh_tick_height + 1,
);
Self::start_leader(
&my_pubkey,
&bank_forks,
&poh_recorder,
&cluster_info,
poh_slot,
reached_leader_tick,
grace_ticks,
&leader_schedule_cache,
);
}
inc_new_counter_info!(
"replicate_stage-duration",
duration_as_ms(&now.elapsed()) as usize
);
let timer = Duration::from_millis(100);
let result = ledger_signal_receiver.recv_timeout(timer);
match result {
Err(RecvTimeoutError::Timeout) => continue,
Err(_) => break,
Ok(_) => trace!("blocktree signal"),
};
}
Ok(())
})
.unwrap();
(Self { t_replay }, slot_full_receiver, root_slot_receiver)
}
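// If no bank exists yet for `poh_slot`, look up the slot leader and, when it is
// this node and the leader tick has been reached, create a TPU bank from the
// frozen parent and hand it to the PoH recorder.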
pub fn start_leader(
my_pubkey: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
poh_slot: u64,
reached_leader_tick: bool,
grace_ticks: u64,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) {
trace!("{} checking poh slot {}", my_pubkey, poh_slot);
if bank_forks.read().unwrap().get(poh_slot).is_none() {
let parent_slot = poh_recorder.lock().unwrap().start_slot();
let parent = {
let r_bf = bank_forks.read().unwrap();
r_bf.get(parent_slot)
.expect("start slot doesn't exist in bank forks")
.clone()
};
assert!(parent.is_frozen());
leader_schedule_cache.slot_leader_at(poh_slot, Some(&parent))
.map(|next_leader| {
debug!(
"me: {} leader {} at poh slot {}",
my_pubkey, next_leader, poh_slot
);
cluster_info.write().unwrap().set_leader(&next_leader);
if next_leader == *my_pubkey && reached_leader_tick {
debug!("{} starting tpu for slot {}", my_pubkey, poh_slot);
datapoint_warn!(
"replay_stage-new_leader",
("count", poh_slot, i64),
("grace", grace_ticks, i64));
let tpu_bank = Bank::new_from_parent(&parent, my_pubkey, poh_slot);
bank_forks.write().unwrap().insert(tpu_bank);
if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {
assert_eq!(
bank_forks.read().unwrap().working_bank().slot(),
tpu_bank.slot()
);
debug!(
"poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
my_pubkey,
tpu_bank.slot(),
next_leader
);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
}
}
})
.or_else(|| {
warn!("{} No next leader found", my_pubkey);
None
});
}
}
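// Load any new entries for this bank's slot from the blocktree and replay them
// into the bank, tracking per-fork progress.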
fn replay_blocktree_into_bank(
bank: &Bank,
blocktree: &Blocktree,
progress: &mut HashMap<u64, ForkProgress>,
) -> Result<()> {
let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?;
let len = entries.len();
let result = Self::replay_entries_into_bank(bank, entries, progress, num);
if result.is_ok() {
trace!("verified entries {}", len);
inc_new_counter_info!("replicate-stage_process_entries", len);
} else {
info!("debug to verify entries {}", len);
//TODO: mark this fork as failed
inc_new_counter_error!("replicate-stage_failed_process_entries", len);
}
Ok(())
}
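// Record the vote in the locktower; if that produces a new root, propagate it to
// the blocktree, leader schedule cache, and bank forks, then sign a vote
// transaction with the node and voting keypairs and push it out over gossip.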
#[allow(clippy::too_many_arguments)]
fn handle_votable_bank<T>(
bank: &Arc<Bank>,
bank_forks: &Arc<RwLock<BankForks>>,
locktower: &mut Locktower,
progress: &mut HashMap<u64, ForkProgress>,
vote_account: &Pubkey,
voting_keypair: &Option<Arc<T>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
blocktree: &Arc<Blocktree>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
root_slot_sender: &Sender<Vec<u64>>,
) -> Result<()>
where
T: 'static + KeypairUtil + Send + Sync,
{
if let Some(new_root) = locktower.record_vote(bank.slot(), bank.hash()) {
// get the root bank before squash
let root_bank = bank_forks
.read()
.unwrap()
.get(new_root)
.expect("Root bank doesn't exist")
.clone();
let mut rooted_slots = root_bank
.parents()
.into_iter()
.map(|bank| bank.slot())
.collect::<Vec<_>>();
rooted_slots.push(root_bank.slot());
blocktree
.set_roots(&rooted_slots)
.expect("Ledger set roots failed");
// Set root first in leader_schedule_cache before bank_forks, because bank_forks.root
// is consumed by repair_service to update gossip, so we don't want to get blobs for
// repair on gossip before we update the leader schedule, otherwise they may get dropped.
leader_schedule_cache.set_root(new_root);
bank_forks.write().unwrap().set_root(new_root);
Self::handle_new_root(&bank_forks, progress);
root_slot_sender.send(rooted_slots)?;
}
locktower.update_epoch(&bank);
if let Some(ref voting_keypair) = voting_keypair {
let node_keypair = cluster_info.read().unwrap().keypair.clone();
// Send our last few votes along with the new one
let vote_ix = vote_instruction::vote(
&node_keypair.pubkey(),
&vote_account,
&voting_keypair.pubkey(),
locktower.recent_votes(),
);
let mut vote_tx = Transaction::new_unsigned_instructions(vec![vote_ix]);
let blockhash = bank.last_blockhash();
vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
vote_tx.partial_sign(&[voting_keypair.as_ref()], blockhash);
cluster_info.write().unwrap().push_vote(vote_tx);
}
Ok(())
}
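// Reset the PoH recorder to the tip of the voted bank and tell it when this node
// is next scheduled to be leader.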
fn reset_poh_recorder(
my_pubkey: &Pubkey,
blocktree: &Blocktree,
bank: &Arc<Bank>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
ticks_per_slot: u64,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) {
let next_leader_slot =
leader_schedule_cache.next_leader_slot(&my_pubkey, bank.slot(), &bank, Some(blocktree));
poh_recorder.lock().unwrap().reset(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
next_leader_slot,
ticks_per_slot,
);
debug!(
"{:?} voted and reset poh at {}. next leader slot {:?}",
my_pubkey,
bank.tick_height(),
next_leader_slot
);
}
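// Replay blocktree entries into every active bank, skipping banks this node
// produced itself, and signal any bank that has reached its last tick.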
fn replay_active_banks(
blocktree: &Arc<Blocktree>,
bank_forks: &Arc<RwLock<BankForks>>,
my_pubkey: &Pubkey,
ticks_per_slot: &mut u64,
progress: &mut HashMap<u64, ForkProgress>,
slot_full_sender: &Sender<(u64, Pubkey)>,
) -> Result<()> {
let active_banks = bank_forks.read().unwrap().active_banks();
trace!("active banks {:?}", active_banks);
for bank_slot in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
*ticks_per_slot = bank.ticks_per_slot();
if bank.collector_id() != *my_pubkey {
Self::replay_blocktree_into_bank(&bank, &blocktree, progress)?;
}
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
if bank.tick_height() == max_tick_height {
Self::process_completed_bank(my_pubkey, bank, slot_full_sender);
}
}
Ok(())
}
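// Walk the frozen banks and return those that are votable under the locktower
// rules (recent epoch, not yet voted on, not locked out, above the vote stake
// threshold), paired with and sorted by their lockout weight.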
fn generate_votable_banks(
bank_forks: &Arc<RwLock<BankForks>>,
locktower: &Locktower,
progress: &mut HashMap<u64, ForkProgress>,
) -> Vec<(u128, Arc<Bank>)> {
let locktower_start = Instant::now();
// Locktower voting
let descendants = bank_forks.read().unwrap().descendants();
let ancestors = bank_forks.read().unwrap().ancestors();
let frozen_banks = bank_forks.read().unwrap().frozen_banks();
trace!("frozen_banks {}", frozen_banks.len());
let mut votable: Vec<(u128, Arc<Bank>)> = frozen_banks
.values()
.filter(|b| {
let is_votable = b.is_votable();
trace!("bank is votable: {} {}", b.slot(), is_votable);
is_votable
})
.filter(|b| {
let is_recent_epoch = locktower.is_recent_epoch(b);
trace!("bank is is_recent_epoch: {} {}", b.slot(), is_recent_epoch);
is_recent_epoch
})
.filter(|b| {
let has_voted = locktower.has_voted(b.slot());
trace!("bank is has_voted: {} {}", b.slot(), has_voted);
!has_voted
})
.filter(|b| {
let is_locked_out = locktower.is_locked_out(b.slot(), &descendants);
trace!("bank is is_locked_out: {} {}", b.slot(), is_locked_out);
!is_locked_out
})
.map(|bank| {
(
bank,
locktower.collect_vote_lockouts(
bank.slot(),
bank.vote_accounts().into_iter(),
&ancestors,
),
)
})
.filter(|(b, stake_lockouts)| {
let vote_threshold =
locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts);
Self::confirm_forks(locktower, stake_lockouts, progress, bank_forks);
debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold);
vote_threshold
})
.map(|(b, stake_lockouts)| (locktower.calculate_weight(&stake_lockouts), b.clone()))
.collect();
votable.sort_by_key(|b| b.0);
let ms = timing::duration_as_ms(&locktower_start.elapsed());
trace!("votable_banks {}", votable.len());
if !votable.is_empty() {
let weights: Vec<u128> = votable.iter().map(|x| x.0).collect();
info!(
"@{:?} locktower duration: {:?} len: {} weights: {:?}",
timing::timestamp(),
ms,
votable.len(),
weights
);
}
inc_new_counter_info!("replay_stage-locktower_duration", ms as usize);
votable
}
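// Drop progress entries for forks the locktower now considers confirmed,
// emitting a confirmation datapoint for each.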
fn confirm_forks(
locktower: &Locktower,
stake_lockouts: &HashMap<u64, StakeLockout>,
progress: &mut HashMap<u64, ForkProgress>,
bank_forks: &Arc<RwLock<BankForks>>,
) {
progress.retain(|slot, prog| {
let duration = timing::timestamp() - prog.started_ms;
if locktower.is_slot_confirmed(*slot, stake_lockouts)
&& bank_forks
.read()
.unwrap()
.get(*slot)
.map(|s| s.is_frozen())
.unwrap_or(true)
{
info!("validator fork confirmed {} {}", *slot, duration);
datapoint_warn!("validator-confirmation", ("duration_ms", duration, i64));
false
} else {
debug!(
"validator fork not confirmed {} {} {:?}",
*slot,
duration,
stake_lockouts.get(slot)
);
true
}
});
}
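// Fetch entries for the bank's slot from the blocktree, starting at the blob
// index recorded in this fork's progress.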
fn load_blocktree_entries(
bank: &Bank,
blocktree: &Blocktree,
progress: &mut HashMap<u64, ForkProgress>,
) -> Result<(Vec<Entry>, usize)> {
let bank_slot = bank.slot();
let bank_progress = &mut progress
.entry(bank_slot)
.or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
blocktree.get_slot_entries_with_blob_count(bank_slot, bank_progress.num_blobs as u64, None)
}
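// Verify and process the fetched entries against the fork's last entry hash,
// then advance the per-fork blob count and last-entry hash.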
fn replay_entries_into_bank(
bank: &Bank,
entries: Vec<Entry>,
progress: &mut HashMap<u64, ForkProgress>,
num: usize,
) -> Result<()> {
let bank_progress = &mut progress
.entry(bank.slot())
.or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.last_entry);
bank_progress.num_blobs += num;
if let Some(last_entry) = entries.last() {
bank_progress.last_entry = last_entry.hash;
}
result
}
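// Check the entries' hash chain against `last_entry` before handing them to the
// blocktree processor.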
pub fn verify_and_process_entries(
bank: &Bank,
entries: &[Entry],
last_entry: &Hash,
) -> Result<()> {
if !entries.verify(last_entry) {
trace!(
"entry verification failed {} {} {} {}",
entries.len(),
bank.tick_height(),
last_entry,
bank.last_blockhash()
);
return Err(Error::BlobError(BlobError::VerificationFailed));
}
blocktree_processor::process_entries(bank, entries)?;
Ok(())
}
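// Prune progress entries for forks that were discarded when the new root was set.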
fn handle_new_root(
bank_forks: &Arc<RwLock<BankForks>>,
progress: &mut HashMap<u64, ForkProgress>,
) {
let r_bank_forks = bank_forks.read().unwrap();
progress.retain(|k, _| r_bank_forks.get(*k).is_some());
}
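// Freeze a bank that has played its last tick and notify listeners that the slot
// is full.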
fn process_completed_bank(
my_pubkey: &Pubkey,
bank: Arc<Bank>,
slot_full_sender: &Sender<(u64, Pubkey)>,
) {
bank.freeze();
info!("bank frozen {}", bank.slot());
if let Err(e) = slot_full_sender.send((bank.slot(), bank.collector_id())) {
trace!("{} slot_full alert failed: {:?}", my_pubkey, e);
}
}
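// Create a bank for every new child slot the blocktree reports under the current
// frozen banks.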
fn generate_new_bank_forks(
blocktree: &Blocktree,
forks: &mut BankForks,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) {
// Find the next slots that chain to the existing frozen banks
let frozen_banks = forks.frozen_banks();
let frozen_bank_slots: Vec<u64> = frozen_banks.keys().cloned().collect();
trace!("frozen_banks {:?}", frozen_bank_slots);
let next_slots = blocktree
.get_slots_since(&frozen_bank_slots)
.expect("Db error");
// Filter out what we've already seen
trace!("generate new forks {:?}", next_slots);
for (parent_id, children) in next_slots {
let parent_bank = frozen_banks
.get(&parent_id)
.expect("missing parent in bank forks")
.clone();
for child_id in children {
if forks.get(child_id).is_some() {
trace!("child already active or frozen {}", child_id);
continue;
}
let leader = leader_schedule_cache
.slot_leader_at(child_id, Some(&parent_bank))
.unwrap();
info!("new fork:{} parent:{}", child_id, parent_id);
forks.insert(Bank::new_from_parent(&parent_bank, &leader, child_id));
}
}
}
}
impl Service for ReplayStage {
type JoinReturnType = ();
fn join(self) -> thread::Result<()> {
self.t_replay.join().map(|_| ())
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::genesis_utils::create_genesis_block;
use crate::packet::Blob;
use crate::replay_stage::ReplayStage;
use solana_sdk::hash::Hash;
use std::fs::remove_dir_all;
use std::sync::{Arc, RwLock};
#[test]
fn test_child_slots_of_same_parent() {
let ledger_path = get_tmp_ledger_path!();
{
let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
let genesis_block = create_genesis_block(10_000).genesis_block;
let bank0 = Bank::new(&genesis_block);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let mut bank_forks = BankForks::new(0, bank0);
bank_forks.working_bank().freeze();
// Insert blob for slot 1, generate new forks, check result
let mut blob_slot_1 = Blob::default();
blob_slot_1.set_slot(1);
blob_slot_1.set_parent(0);
blocktree.insert_data_blobs(&vec![blob_slot_1]).unwrap();
assert!(bank_forks.get(1).is_none());
ReplayStage::generate_new_bank_forks(
&blocktree,
&mut bank_forks,
&leader_schedule_cache,
);
assert!(bank_forks.get(1).is_some());
// Insert blob for slot 2, generate new forks, check result
let mut blob_slot_2 = Blob::default();
blob_slot_2.set_slot(2);
blob_slot_2.set_parent(0);
blocktree.insert_data_blobs(&vec![blob_slot_2]).unwrap();
assert!(bank_forks.get(2).is_none());
ReplayStage::generate_new_bank_forks(
&blocktree,
&mut bank_forks,
&leader_schedule_cache,
);
assert!(bank_forks.get(1).is_some());
assert!(bank_forks.get(2).is_some());
}
let _ignored = remove_dir_all(&ledger_path);
}
#[test]
fn test_handle_new_root() {
let genesis_block = create_genesis_block(10_000).genesis_block;
let bank0 = Bank::new(&genesis_block);
let bank_forks = Arc::new(RwLock::new(BankForks::new(0, bank0)));
let mut progress = HashMap::new();
progress.insert(5, ForkProgress::new(Hash::default()));
ReplayStage::handle_new_root(&bank_forks, &mut progress);
assert!(progress.is_empty());
}
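// Minimal sanity check (illustrative): a fresh ForkProgress starts at the given
// entry hash with no blobs consumed.
#[test]
fn test_fork_progress_new() {
let hash = Hash::default();
let progress = ForkProgress::new(hash);
assert_eq!(progress.last_entry, hash);
assert_eq!(progress.num_blobs, 0);
}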
}