solana/core/src/replay_stage.rs

659 lines
26 KiB
Rust
Raw Normal View History

//! The `replay_stage` replays transactions broadcast by the leader.
2018-05-22 14:26:28 -07:00
2019-02-21 11:37:48 -08:00
use crate::bank_forks::BankForks;
2019-02-07 20:52:39 -08:00
use crate::blocktree::Blocktree;
2019-02-26 21:57:45 -08:00
use crate::blocktree_processor;
2018-12-07 19:16:27 -08:00
use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntryReceiver, EntrySender, EntrySlice};
use crate::leader_schedule_utils;
use crate::locktower::Locktower;
2018-12-07 19:16:27 -08:00
use crate::packet::BlobError;
use crate::poh_recorder::PohRecorder;
2019-02-26 21:57:45 -08:00
use crate::result;
2019-02-18 18:08:54 -08:00
use crate::rpc_subscriptions::RpcSubscriptions;
2018-12-07 19:16:27 -08:00
use crate::service::Service;
use solana_metrics::counter::Counter;
2019-03-15 16:36:12 -07:00
use solana_metrics::influxdb;
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
2019-01-30 19:28:48 -08:00
use solana_sdk::pubkey::Pubkey;
2019-02-21 21:43:35 -08:00
use solana_sdk::signature::KeypairUtil;
use solana_sdk::timing::{self, duration_as_ms};
2019-03-02 13:51:26 -08:00
use solana_vote_api::vote_transaction::VoteTransaction;
2019-02-26 21:57:45 -08:00
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;
2018-09-26 20:58:06 -07:00
use std::time::Instant;
2018-05-22 14:26:28 -07:00
/// Presumably an upper bound on the number of entries pulled per loop iteration.
/// NOTE(review): nothing in the visible part of this file reads this constant --
/// confirm which module consumes it.
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
/// Drop guard owned by the ReplayStage thread.
///
/// When the guard goes out of scope -- on a normal return or while the thread
/// unwinds from a panic -- it stores `true` into the shared flag, so every other
/// holder of that flag can observe that the thread has exited.
struct Finalizer {
    /// Flag raised by `drop`.
    exit_sender: Arc<AtomicBool>,
}

impl Finalizer {
    /// Wraps the shared flag that will be set once this guard is dropped.
    fn new(exit_sender: Arc<AtomicBool>) -> Self {
        Finalizer { exit_sender }
    }
}

// Implement a destructor for Finalizer.
impl Drop for Finalizer {
    /// Raises the flag through the `Arc` this guard already owns; cloning the
    /// `Arc` first would only bump and drop the reference count.
    fn drop(&mut self) {
        self.exit_sender.store(true, Ordering::Relaxed);
    }
}
pub struct ReplayStage {
2019-02-26 21:57:45 -08:00
t_replay: JoinHandle<result::Result<()>>,
2018-05-22 14:26:28 -07:00
}
impl ReplayStage {
#[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
2019-02-21 21:43:35 -08:00
pub fn new<T>(
my_id: &Pubkey,
vote_account: &Pubkey,
2019-02-21 21:43:35 -08:00
voting_keypair: Option<Arc<T>>,
2019-02-07 20:52:39 -08:00
blocktree: Arc<Blocktree>,
2019-02-21 11:37:48 -08:00
bank_forks: &Arc<RwLock<BankForks>>,
2018-10-08 19:55:54 -07:00
cluster_info: Arc<RwLock<ClusterInfo>>,
2019-03-04 20:50:02 -08:00
exit: &Arc<AtomicBool>,
ledger_signal_receiver: Receiver<bool>,
2019-02-18 18:08:54 -08:00
subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
) -> (Self, Receiver<(u64, Pubkey)>, EntryReceiver)
2019-02-21 21:43:35 -08:00
where
T: 'static + KeypairUtil + Send + Sync,
{
2019-02-27 10:58:01 -08:00
let (forward_entry_sender, forward_entry_receiver) = channel();
let (slot_full_sender, slot_full_receiver) = channel();
2019-02-26 21:57:45 -08:00
trace!("replay stage");
let exit_ = exit.clone();
2019-02-26 21:57:45 -08:00
let subscriptions = subscriptions.clone();
let bank_forks = bank_forks.clone();
let poh_recorder = poh_recorder.clone();
let my_id = *my_id;
let vote_account = *vote_account;
let mut ticks_per_slot = 0;
let mut locktower = Locktower::new_from_forks(&bank_forks.read().unwrap());
// Start the replay stage loop
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
.spawn(move || {
let _exit = Finalizer::new(exit_.clone());
let mut progress = HashMap::new();
loop {
2019-02-26 21:57:45 -08:00
let now = Instant::now();
// Stop getting entries if we get exit signal
if exit_.load(Ordering::Relaxed) {
break;
Leader scheduler plumbing (#1440) * Added LeaderScheduler module and tests * plumbing for LeaderScheduler in Fullnode + tests. Add vote processing for active set to ReplicateStage and WriteStage * Add LeaderScheduler plumbing for Tvu, window, and tests * Fix bank and switch tests to use new LeaderScheduler * move leader rotation check from window service to replicate stage * Add replicate_stage leader rotation exit test * removed leader scheduler from the window service and associated modules/tests * Corrected is_leader calculation in repair() function in window.rs * Integrate LeaderScheduler with write_stage for leader to validator transitions * Integrated LeaderScheduler with BroadcastStage * Removed gossip leader rotation from crdt * Add multi validator, leader test * Comments and cleanup * Remove unneeded checks from broadcast stage * Fix case where a validator/leader need to immediately transition on startup after reading ledger and seeing they are not in the correct role * Set new leader in validator -> validator transitions * Clean up for PR comments, refactor LeaderScheduler from process_entry/process_ledger_tail * Cleaned out LeaderScheduler options, implemented LeaderScheduler strategy that only picks the bootstrap leader to support existing tests, drone/airdrops * Ignore test_full_leader_validator_network test due to bug where the next leader in line fails to get the last entry before rotation (b/c it hasn't started up yet). Added a test test_dropped_handoff_recovery go track this bug
2018-10-10 16:49:41 -07:00
}
2019-02-26 21:57:45 -08:00
Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap());
let active_banks = bank_forks.read().unwrap().active_banks();
trace!("active banks {:?}", active_banks);
let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some();
2019-03-04 16:40:28 -08:00
for bank_slot in &active_banks {
let bank = bank_forks.read().unwrap().get(*bank_slot).unwrap().clone();
ticks_per_slot = bank.ticks_per_slot();
if bank.collector_id() != my_id {
2019-02-26 21:57:45 -08:00
Self::replay_blocktree_into_bank(
&bank,
2019-02-26 21:57:45 -08:00
&blocktree,
&mut progress,
&forward_entry_sender,
)?;
}
2019-03-04 16:40:28 -08:00
let max_tick_height = (*bank_slot + 1) * bank.ticks_per_slot() - 1;
2019-02-26 21:57:45 -08:00
if bank.tick_height() == max_tick_height {
Self::process_completed_bank(
&my_id,
bank,
&mut progress,
&slot_full_sender,
);
}
}
if ticks_per_slot == 0 {
let frozen_banks = bank_forks.read().unwrap().frozen_banks();
let bank = frozen_banks.values().next().unwrap();
ticks_per_slot = bank.ticks_per_slot();
}
let locktower_start = Instant::now();
// Locktower voting
2019-03-18 15:20:04 -07:00
let descendants = bank_forks.read().unwrap().descendants();
let ancestors = bank_forks.read().unwrap().ancestors();
let frozen_banks = bank_forks.read().unwrap().frozen_banks();
2019-03-19 16:00:52 -07:00
trace!("frozen_banks {}", frozen_banks.len());
let mut votable: Vec<(u128, Arc<Bank>)> = frozen_banks
.values()
2019-03-19 16:00:52 -07:00
.filter(|b| {
let is_votable = b.is_votable();
trace!("bank is votable: {} {}", b.slot(), is_votable);
is_votable
})
.filter(|b| {
let has_voted = locktower.has_voted(b.slot());
trace!("bank is has_voted: {} {}", b.slot(), has_voted);
!has_voted
})
.filter(|b| {
let is_locked_out = locktower.is_locked_out(b.slot(), &descendants);
trace!("bank is is_locked_out: {} {}", b.slot(), is_locked_out);
!is_locked_out
})
.map(|bank| {
(
bank,
locktower.collect_vote_lockouts(
bank.slot(),
bank.vote_accounts(),
&ancestors,
),
)
})
.filter(|(b, stake_lockouts)| {
2019-03-19 16:00:52 -07:00
let vote_threshold =
locktower.check_vote_stake_threshold(b.slot(), &stake_lockouts);
debug!("bank vote_threshold: {} {}", b.slot(), vote_threshold);
2019-03-19 16:00:52 -07:00
vote_threshold
})
.map(|(b, stake_lockouts)| {
(locktower.calculate_weight(&stake_lockouts), b.clone())
})
.collect();
votable.sort_by_key(|b| b.0);
2019-03-19 16:00:52 -07:00
trace!("votable_banks {}", votable.len());
let ms = timing::duration_as_ms(&locktower_start.elapsed());
2019-03-19 16:00:52 -07:00
if !votable.is_empty() {
let weights: Vec<u128> = votable.iter().map(|x| x.0).collect();
info!(
"@{:?} locktower duration: {:?} len: {} weights: {:?}",
timing::timestamp(),
ms,
votable.len(),
weights
);
}
inc_new_counter_info!("replay_stage-locktower_duration", ms as usize);
if let Some((_, bank)) = votable.last() {
subscriptions.notify_subscribers(&bank);
2019-02-26 21:57:45 -08:00
if let Some(ref voting_keypair) = voting_keypair {
let keypair = voting_keypair.as_ref();
let vote = VoteTransaction::new_vote(
&vote_account,
2019-02-26 21:57:45 -08:00
keypair,
bank.slot(),
bank.last_blockhash(),
2019-02-26 21:57:45 -08:00
0,
);
if let Some(new_root) = locktower.record_vote(bank.slot()) {
bank_forks.write().unwrap().set_root(new_root);
2019-03-19 17:30:36 -07:00
Self::handle_new_root(&bank_forks, &mut progress);
}
locktower.update_epoch(&bank);
2019-02-26 21:57:45 -08:00
cluster_info.write().unwrap().push_vote(vote);
}
let next_leader_slot =
leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank);
poh_recorder.lock().unwrap().reset(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
next_leader_slot,
ticks_per_slot,
);
debug!(
"{:?} voted and reset poh at {}. next leader slot {:?}",
my_id,
bank.tick_height(),
next_leader_slot
);
is_tpu_bank_active = false;
}
let (reached_leader_tick, grace_ticks) = if !is_tpu_bank_active {
let poh = poh_recorder.lock().unwrap();
poh.reached_leader_tick()
} else {
(false, 0)
};
if !is_tpu_bank_active {
assert!(ticks_per_slot > 0);
let poh_tick_height = poh_recorder.lock().unwrap().tick_height();
let poh_slot = leader_schedule_utils::tick_height_to_slot(
ticks_per_slot,
poh_tick_height + 1,
);
Self::start_leader(
&my_id,
&bank_forks,
&poh_recorder,
&cluster_info,
&blocktree,
poh_slot,
reached_leader_tick,
grace_ticks,
);
}
2019-02-26 21:57:45 -08:00
inc_new_counter_info!(
"replicate_stage-duration",
duration_as_ms(&now.elapsed()) as usize
);
let timer = Duration::from_millis(100);
let result = ledger_signal_receiver.recv_timeout(timer);
match result {
Err(RecvTimeoutError::Timeout) => continue,
Err(_) => break,
Ok(_) => trace!("blocktree signal"),
2019-02-26 21:57:45 -08:00
};
2018-05-30 13:38:15 -07:00
}
2019-02-26 21:57:45 -08:00
Ok(())
})
.unwrap();
(
2019-03-04 20:50:02 -08:00
Self { t_replay },
slot_full_receiver,
forward_entry_receiver,
)
}
pub fn start_leader(
my_id: &Pubkey,
bank_forks: &Arc<RwLock<BankForks>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
blocktree: &Blocktree,
poh_slot: u64,
reached_leader_tick: bool,
grace_ticks: u64,
) {
trace!("{} checking poh slot {}", my_id, poh_slot);
if blocktree.meta(poh_slot).unwrap().is_some() {
// We've already broadcasted entries for this slot, skip it
return;
}
if bank_forks.read().unwrap().get(poh_slot).is_none() {
let frozen = bank_forks.read().unwrap().frozen_banks();
let parent_slot = poh_recorder.lock().unwrap().start_slot();
assert!(frozen.contains_key(&parent_slot));
let parent = &frozen[&parent_slot];
leader_schedule_utils::slot_leader_at(poh_slot, parent)
.map(|next_leader| {
debug!(
"me: {} leader {} at poh slot {}",
my_id, next_leader, poh_slot
);
cluster_info.write().unwrap().set_leader(&next_leader);
if next_leader == *my_id && reached_leader_tick {
debug!("{} starting tpu for slot {}", my_id, poh_slot);
2019-03-15 16:36:12 -07:00
solana_metrics::submit(
influxdb::Point::new("counter-replay_stage-new_leader")
.add_field(
"count",
influxdb::Value::Integer(poh_slot as i64),
)
.add_field(
"grace",
influxdb::Value::Integer(grace_ticks as i64),
)
2019-03-15 16:36:12 -07:00
.to_owned(),);
let tpu_bank = Bank::new_from_parent(parent, my_id, poh_slot);
2019-03-18 20:23:34 -07:00
bank_forks.write().unwrap().insert(tpu_bank);
if let Some(tpu_bank) = bank_forks.read().unwrap().get(poh_slot).cloned() {
assert_eq!(
bank_forks.read().unwrap().working_bank().slot(),
tpu_bank.slot()
);
debug!(
"poh_recorder new working bank: me: {} next_slot: {} next_leader: {}",
my_id,
tpu_bank.slot(),
next_leader
);
poh_recorder.lock().unwrap().set_bank(&tpu_bank);
}
}
})
.or_else(|| {
warn!("{} No next leader found", my_id);
None
});
}
}
2019-02-26 21:57:45 -08:00
pub fn replay_blocktree_into_bank(
bank: &Bank,
blocktree: &Blocktree,
progress: &mut HashMap<u64, (Hash, usize)>,
forward_entry_sender: &EntrySender,
) -> result::Result<()> {
let (entries, num) = Self::load_blocktree_entries(bank, blocktree, progress)?;
let len = entries.len();
let result =
Self::replay_entries_into_bank(bank, entries, progress, forward_entry_sender, num);
if result.is_ok() {
trace!("verified entries {}", len);
inc_new_counter_info!("replicate-stage_process_entries", len);
} else {
info!("debug to verify entries {}", len);
//TODO: mark this fork as failed
inc_new_counter_info!("replicate-stage_failed_process_entries", len);
}
Ok(())
}
/// Fetches the entries of `bank`'s slot starting at the blob index recorded in
/// `progress`, together with the blob count reported by the blocktree.
///
/// A slot without a `progress` record gets one initialised to
/// `(bank.last_blockhash(), 0)`, i.e. reading starts at blob index 0.
pub fn load_blocktree_entries(
    bank: &Bank,
    blocktree: &Blocktree,
    progress: &mut HashMap<u64, (Hash, usize)>,
) -> result::Result<(Vec<Entry>, usize)> {
    let slot = bank.slot();
    let start_index = progress
        .entry(slot)
        .or_insert((bank.last_blockhash(), 0))
        .1 as u64;
    blocktree.get_slot_entries_with_blob_count(slot, start_index, None)
}
pub fn replay_entries_into_bank(
bank: &Bank,
entries: Vec<Entry>,
progress: &mut HashMap<u64, (Hash, usize)>,
forward_entry_sender: &EntrySender,
num: usize,
) -> result::Result<()> {
2019-03-02 10:20:10 -08:00
let bank_progress = &mut progress
.entry(bank.slot())
2019-03-02 10:25:16 -08:00
.or_insert((bank.last_blockhash(), 0));
2019-02-26 21:57:45 -08:00
let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.0);
bank_progress.1 += num;
if let Some(last_entry) = entries.last() {
bank_progress.0 = last_entry.hash;
}
if result.is_ok() {
forward_entry_sender.send(entries)?;
}
result
}
pub fn verify_and_process_entries(
bank: &Bank,
entries: &[Entry],
last_entry: &Hash,
) -> result::Result<()> {
if !entries.verify(last_entry) {
trace!(
"entry verification failed {} {} {} {}",
entries.len(),
bank.tick_height(),
last_entry,
2019-03-02 10:25:16 -08:00
bank.last_blockhash()
2019-02-26 21:57:45 -08:00
);
return Err(result::Error::BlobError(BlobError::VerificationFailed));
}
blocktree_processor::process_entries(bank, entries)?;
2019-02-26 21:57:45 -08:00
Ok(())
}
2019-03-19 17:30:36 -07:00
/// Drops every `progress` record whose slot no longer has a bank in `bank_forks`.
fn handle_new_root(
    bank_forks: &Arc<RwLock<BankForks>>,
    progress: &mut HashMap<u64, (Hash, usize)>,
) {
    let forks = bank_forks.read().unwrap();
    progress.retain(|&slot, _| forks.get(slot).is_some());
}
/// Freezes `bank`, forgets its replay progress, and announces
/// `(slot, collector_id)` on `slot_full_sender`.
///
/// A failed send is only logged; `my_id` is used for that log line alone.
fn process_completed_bank(
    my_id: &Pubkey,
    bank: Arc<Bank>,
    progress: &mut HashMap<u64, (Hash, usize)>,
    slot_full_sender: &Sender<(u64, Pubkey)>,
) {
    bank.freeze();
    let slot = bank.slot();
    info!("bank frozen {}", slot);
    progress.remove(&slot);

    let notice = (slot, bank.collector_id());
    if let Err(e) = slot_full_sender.send(notice) {
        info!("{} slot_full alert failed: {:?}", my_id, e);
    }
}
2019-02-26 21:57:45 -08:00
fn generate_new_bank_forks(blocktree: &Blocktree, forks: &mut BankForks) {
// Find the next slot that chains to the old slot
2019-02-26 21:57:45 -08:00
let frozen_banks = forks.frozen_banks();
2019-03-04 16:40:28 -08:00
let frozen_bank_slots: Vec<u64> = frozen_banks.keys().cloned().collect();
trace!("frozen_banks {:?}", frozen_bank_slots);
2019-02-26 21:57:45 -08:00
let next_slots = blocktree
2019-03-04 16:40:28 -08:00
.get_slots_since(&frozen_bank_slots)
2019-02-26 21:57:45 -08:00
.expect("Db error");
// Filter out what we've already seen
trace!("generate new forks {:?}", next_slots);
2019-02-26 21:57:45 -08:00
for (parent_id, children) in next_slots {
let parent_bank = frozen_banks
.get(&parent_id)
.expect("missing parent in bank forks")
.clone();
for child_id in children {
if forks.get(child_id).is_some() {
trace!("child already active or frozen {}", child_id);
continue;
2019-02-28 19:49:22 -08:00
}
let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank).unwrap();
info!("new fork:{} parent:{}", child_id, parent_id);
2019-03-18 20:23:34 -07:00
forks.insert(Bank::new_from_parent(&parent_bank, &leader, child_id));
2019-02-26 21:57:45 -08:00
}
}
}
2018-05-22 14:26:28 -07:00
}
impl Service for ReplayStage {
type JoinReturnType = ();
Leader scheduler plumbing (#1440) * Added LeaderScheduler module and tests * plumbing for LeaderScheduler in Fullnode + tests. Add vote processing for active set to ReplicateStage and WriteStage * Add LeaderScheduler plumbing for Tvu, window, and tests * Fix bank and switch tests to use new LeaderScheduler * move leader rotation check from window service to replicate stage * Add replicate_stage leader rotation exit test * removed leader scheduler from the window service and associated modules/tests * Corrected is_leader calculation in repair() function in window.rs * Integrate LeaderScheduler with write_stage for leader to validator transitions * Integrated LeaderScheduler with BroadcastStage * Removed gossip leader rotation from crdt * Add multi validator, leader test * Comments and cleanup * Remove unneeded checks from broadcast stage * Fix case where a validator/leader need to immediately transition on startup after reading ledger and seeing they are not in the correct role * Set new leader in validator -> validator transitions * Clean up for PR comments, refactor LeaderScheduler from process_entry/process_ledger_tail * Cleaned out LeaderScheduler options, implemented LeaderScheduler strategy that only picks the bootstrap leader to support existing tests, drone/airdrops * Ignore test_full_leader_validator_network test due to bug where the next leader in line fails to get the last entry before rotation (b/c it hasn't started up yet). Added a test test_dropped_handoff_recovery go track this bug
2018-10-10 16:49:41 -07:00
fn join(self) -> thread::Result<()> {
2019-02-26 21:57:45 -08:00
self.t_replay.join().map(|_| ())
Leader scheduler plumbing (#1440) * Added LeaderScheduler module and tests * plumbing for LeaderScheduler in Fullnode + tests. Add vote processing for active set to ReplicateStage and WriteStage * Add LeaderScheduler plumbing for Tvu, window, and tests * Fix bank and switch tests to use new LeaderScheduler * move leader rotation check from window service to replicate stage * Add replicate_stage leader rotation exit test * removed leader scheduler from the window service and associated modules/tests * Corrected is_leader calculation in repair() function in window.rs * Integrate LeaderScheduler with write_stage for leader to validator transitions * Integrated LeaderScheduler with BroadcastStage * Removed gossip leader rotation from crdt * Add multi validator, leader test * Comments and cleanup * Remove unneeded checks from broadcast stage * Fix case where a validator/leader need to immediately transition on startup after reading ledger and seeing they are not in the correct role * Set new leader in validator -> validator transitions * Clean up for PR comments, refactor LeaderScheduler from process_entry/process_ledger_tail * Cleaned out LeaderScheduler options, implemented LeaderScheduler strategy that only picks the bootstrap leader to support existing tests, drone/airdrops * Ignore test_full_leader_validator_network test due to bug where the next leader in line fails to get the last entry before rotation (b/c it hasn't started up yet). Added a test test_dropped_handoff_recovery go track this bug
2018-10-10 16:49:41 -07:00
}
}
Leader scheduler plumbing (#1440) * Added LeaderScheduler module and tests * plumbing for LeaderScheduler in Fullnode + tests. Add vote processing for active set to ReplicateStage and WriteStage * Add LeaderScheduler plumbing for Tvu, window, and tests * Fix bank and switch tests to use new LeaderScheduler * move leader rotation check from window service to replicate stage * Add replicate_stage leader rotation exit test * removed leader scheduler from the window service and associated modules/tests * Corrected is_leader calculation in repair() function in window.rs * Integrate LeaderScheduler with write_stage for leader to validator transitions * Integrated LeaderScheduler with BroadcastStage * Removed gossip leader rotation from crdt * Add multi validator, leader test * Comments and cleanup * Remove unneeded checks from broadcast stage * Fix case where a validator/leader need to immediately transition on startup after reading ledger and seeing they are not in the correct role * Set new leader in validator -> validator transitions * Clean up for PR comments, refactor LeaderScheduler from process_entry/process_ledger_tail * Cleaned out LeaderScheduler options, implemented LeaderScheduler strategy that only picks the bootstrap leader to support existing tests, drone/airdrops * Ignore test_full_leader_validator_network test due to bug where the next leader in line fails to get the last entry before rotation (b/c it hasn't started up yet). Added a test test_dropped_handoff_recovery go track this bug
2018-10-10 16:49:41 -07:00
#[cfg(test)]
mod test {
use super::*;
use crate::banking_stage::create_test_recorder;
use crate::blocktree::{create_new_tmp_ledger, get_tmp_ledger_path};
2019-02-07 20:52:39 -08:00
use crate::cluster_info::{ClusterInfo, Node};
use crate::entry::create_ticks;
use crate::entry::{next_entry_mut, Entry};
2019-02-21 11:37:48 -08:00
use crate::fullnode::new_banks_from_blocktree;
use crate::packet::Blob;
use crate::replay_stage::ReplayStage;
2019-02-26 21:57:45 -08:00
use crate::result::Error;
use solana_sdk::genesis_block::GenesisBlock;
2018-11-16 08:04:46 -08:00
use solana_sdk::hash::Hash;
2018-12-03 10:26:28 -08:00
use solana_sdk::signature::{Keypair, KeypairUtil};
2018-10-17 13:42:54 -07:00
use std::fs::remove_dir_all;
Leader scheduler plumbing (#1440) * Added LeaderScheduler module and tests * plumbing for LeaderScheduler in Fullnode + tests. Add vote processing for active set to ReplicateStage and WriteStage * Add LeaderScheduler plumbing for Tvu, window, and tests * Fix bank and switch tests to use new LeaderScheduler * move leader rotation check from window service to replicate stage * Add replicate_stage leader rotation exit test * removed leader scheduler from the window service and associated modules/tests * Corrected is_leader calculation in repair() function in window.rs * Integrate LeaderScheduler with write_stage for leader to validator transitions * Integrated LeaderScheduler with BroadcastStage * Removed gossip leader rotation from crdt * Add multi validator, leader test * Comments and cleanup * Remove unneeded checks from broadcast stage * Fix case where a validator/leader need to immediately transition on startup after reading ledger and seeing they are not in the correct role * Set new leader in validator -> validator transitions * Clean up for PR comments, refactor LeaderScheduler from process_entry/process_ledger_tail * Cleaned out LeaderScheduler options, implemented LeaderScheduler strategy that only picks the bootstrap leader to support existing tests, drone/airdrops * Ignore test_full_leader_validator_network test due to bug where the next leader in line fails to get the last entry before rotation (b/c it hasn't started up yet). Added a test test_dropped_handoff_recovery go track this bug
2018-10-10 16:49:41 -07:00
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
/// Starts a ReplayStage on a fresh ledger, pushes a vote into the node's cluster
/// info, writes one tick for slot 1 to the blocktree and expects that tick to show
/// up on the stage's forwarded-entry channel.
#[test]
fn test_vote_error_replay_stage_correctness() {
    solana_logger::setup();

    // Set up dummy node to host a ReplayStage
    let my_keypair = Keypair::new();
    let my_id = my_keypair.pubkey();
    let my_node = Node::new_localhost_with_pubkey(&my_id);

    // Create keypair for the leader
    let leader_id = Keypair::new().pubkey();
    let (genesis_block, _mint_keypair) = GenesisBlock::new_with_leader(10_000, &leader_id, 500);
    let (my_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);

    // Set up the cluster info
    let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
        my_node.info.clone(),
    )));

    // Set up the replay stage; the scope ends before the ledger directory is removed.
    {
        let voting_keypair = Arc::new(Keypair::new());
        let (bank_forks, _bank_forks_info, blocktree, l_receiver) =
            new_banks_from_blocktree(&my_ledger_path, None);
        let bank = bank_forks.working_bank();
        let blocktree = Arc::new(blocktree);
        let bank_forks = Arc::new(RwLock::new(bank_forks));
        let subscriptions = Arc::new(RpcSubscriptions::default());

        let (exit, poh_recorder, poh_service, _entry_receiver) = create_test_recorder(&bank);
        let (replay_stage, _slot_full_receiver, ledger_writer_recv) = ReplayStage::new(
            &my_id,
            &voting_keypair.pubkey(),
            Some(voting_keypair.clone()),
            blocktree.clone(),
            &bank_forks,
            cluster_info_me.clone(),
            &exit,
            l_receiver,
            &subscriptions,
            &poh_recorder,
        );

        let keypair = voting_keypair.as_ref();
        let vote =
            VoteTransaction::new_vote(&keypair.pubkey(), keypair, 0, bank.last_blockhash(), 0);
        cluster_info_me.write().unwrap().push_vote(vote);

        info!("Send ReplayStage an entry, should see it on the ledger writer receiver");
        let next_tick = create_ticks(1, bank.last_blockhash());
        blocktree
            .write_entries(1, 0, 0, genesis_block.ticks_per_slot, next_tick.clone())
            .unwrap();

        let received_tick = ledger_writer_recv
            .recv()
            .expect("Expected to receive an entry on the ledger writer receiver");
        assert_eq!(next_tick[0], received_tick[0]);

        // Shut everything down.
        exit.store(true, Ordering::Relaxed);
        replay_stage.join().unwrap();
        poh_service.join().unwrap();
    }
    let _ignored = remove_dir_all(&my_ledger_path);
}
/// Five tick entries chained from the bank's last blockhash must replay
/// successfully and be forwarded on the entry channel.
#[test]
fn test_replay_stage_poh_ok_entry_receiver() {
    let (forward_entry_sender, forward_entry_receiver) = channel();
    let genesis_block = GenesisBlock::new(10_000).0;
    let bank = Arc::new(Bank::new(&genesis_block));

    let mut blockhash = bank.last_blockhash();
    let mut entries = Vec::new();
    for _ in 0..5 {
        let entry = next_entry_mut(&mut blockhash, 1, vec![]); //just ticks
        entries.push(entry);
    }

    let mut progress = HashMap::new();
    // `entries` is not used afterwards, so it is moved rather than cloned.
    let res = ReplayStage::replay_entries_into_bank(
        &bank,
        entries,
        &mut progress,
        &forward_entry_sender,
        0,
    );
    assert!(res.is_ok(), "replay failed {:?}", res);

    if let Err(e) = forward_entry_receiver.try_recv() {
        panic!("Entries were not sent correctly {:?}", e);
    }
}
2019-02-26 21:57:45 -08:00
/// Entries that do not chain from the bank's last blockhash must fail with
/// `BlobError::VerificationFailed`, and nothing may be forwarded.
#[test]
fn test_replay_stage_poh_error_entry_receiver() {
    let (forward_entry_sender, forward_entry_receiver) = channel();

    let mut entries = Vec::new();
    for _ in 0..5 {
        let entry = Entry::new(&mut Hash::default(), 1, vec![]); //just broken entries
        entries.push(entry);
    }

    let genesis_block = GenesisBlock::new(10_000).0;
    let bank = Arc::new(Bank::new(&genesis_block));
    let mut progress = HashMap::new();
    // `entries` is not used afterwards, so it is moved rather than cloned.
    let res = ReplayStage::replay_entries_into_bank(
        &bank,
        entries,
        &mut progress,
        &forward_entry_sender,
        0,
    );

    match res {
        Err(Error::BlobError(BlobError::VerificationFailed)) => (),
        Ok(_) => panic!("Should have failed because entries are broken"),
        Err(e) => panic!(
            "Should have failed because with blob error, instead, got {:?}",
            e
        ),
    }
    assert!(forward_entry_receiver.try_recv().is_err());
}
/// Two slots (1 and 2) that both name slot 0 as parent must each get a bank from
/// `generate_new_bank_forks`, and the bank for slot 1 must survive the second call.
#[test]
fn test_child_slots_of_same_parent() {
    let ledger_path = get_tmp_ledger_path!();
    {
        let blocktree = Arc::new(
            Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
        );

        let genesis_block = GenesisBlock::new(10_000).0;
        let bank0 = Bank::new(&genesis_block);
        let mut bank_forks = BankForks::new(0, bank0);
        bank_forks.working_bank().freeze();

        // Writes a single default blob for `slot` whose parent is slot 0.
        let insert_child_of_slot_0 = |slot| {
            let mut blob = Blob::default();
            blob.set_slot(slot);
            blob.set_parent(0);
            blocktree.insert_data_blobs(&vec![blob]).unwrap();
        };

        // Insert blob for slot 1, generate new forks, check result
        insert_child_of_slot_0(1);
        assert!(bank_forks.get(1).is_none());
        ReplayStage::generate_new_bank_forks(&blocktree, &mut bank_forks);
        assert!(bank_forks.get(1).is_some());

        // Insert blob for slot 2, generate new forks, check result
        insert_child_of_slot_0(2);
        assert!(bank_forks.get(2).is_none());
        ReplayStage::generate_new_bank_forks(&blocktree, &mut bank_forks);
        assert!(bank_forks.get(1).is_some());
        assert!(bank_forks.get(2).is_some());
    }
    let _ignored = remove_dir_all(&ledger_path);
}
2019-03-19 17:30:36 -07:00
/// A progress record for a slot that `bank_forks` has no bank for (slot 5 here;
/// the forks are built from a single bank registered as slot 0) must be pruned.
#[test]
fn test_handle_new_root() {
    let bank_forks = Arc::new(RwLock::new(BankForks::new(0, Bank::default())));

    let mut progress = HashMap::new();
    progress.insert(5, (Hash::default(), 0));

    ReplayStage::handle_new_root(&bank_forks, &mut progress);
    assert!(progress.is_empty());
}
}