2018-12-07 14:09:29 -08:00
|
|
|
//! The `replay_stage` replays transactions broadcast by the leader.
|
2018-05-22 14:26:28 -07:00
|
|
|
|
2019-02-21 11:37:48 -08:00
|
|
|
use crate::bank_forks::BankForks;
|
2019-02-07 20:52:39 -08:00
|
|
|
use crate::blocktree::Blocktree;
|
2019-02-21 11:37:48 -08:00
|
|
|
use crate::blocktree_processor::{self, BankForksInfo};
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::cluster_info::ClusterInfo;
|
2019-02-25 14:33:42 -08:00
|
|
|
use crate::entry::{Entry, EntryMeta, EntryReceiver, EntrySender, EntrySlice};
|
2019-02-16 08:23:29 -08:00
|
|
|
use crate::leader_scheduler::LeaderScheduler;
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::packet::BlobError;
|
|
|
|
use crate::result::{Error, Result};
|
2019-02-18 18:08:54 -08:00
|
|
|
use crate::rpc_subscriptions::RpcSubscriptions;
|
2018-12-07 19:16:27 -08:00
|
|
|
use crate::service::Service;
|
2019-02-21 11:37:48 -08:00
|
|
|
use crate::tvu::{TvuRotationInfo, TvuRotationSender};
|
2019-02-18 22:26:22 -08:00
|
|
|
use solana_metrics::counter::Counter;
|
2018-11-16 08:45:59 -08:00
|
|
|
use solana_metrics::{influxdb, submit};
|
2019-02-18 22:26:22 -08:00
|
|
|
use solana_runtime::bank::Bank;
|
2019-01-26 00:28:08 -08:00
|
|
|
use solana_sdk::hash::Hash;
|
2019-01-30 19:28:48 -08:00
|
|
|
use solana_sdk::pubkey::Pubkey;
|
2019-02-21 21:43:35 -08:00
|
|
|
use solana_sdk::signature::KeypairUtil;
|
2018-11-16 08:45:59 -08:00
|
|
|
use solana_sdk::timing::duration_as_ms;
|
2019-01-31 19:53:48 -08:00
|
|
|
use solana_sdk::vote_transaction::VoteTransaction;
|
2019-02-13 20:04:20 -08:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2019-02-17 13:09:28 -08:00
|
|
|
use std::sync::mpsc::RecvTimeoutError;
|
2019-02-17 13:05:47 -08:00
|
|
|
use std::sync::mpsc::{channel, Receiver};
|
2018-07-05 12:01:40 -07:00
|
|
|
use std::sync::{Arc, RwLock};
|
2018-07-03 21:14:08 -07:00
|
|
|
use std::thread::{self, Builder, JoinHandle};
|
2019-02-10 16:28:52 -08:00
|
|
|
use std::time::Duration;
|
2018-09-26 20:58:06 -07:00
|
|
|
use std::time::Instant;
|
2018-05-22 14:26:28 -07:00
|
|
|
|
2018-12-13 18:43:10 -08:00
|
|
|
/// Upper bound on the number of entries pulled from the blocktree in a single
/// iteration of the replay loop (see `blocktree.get_slot_entries` call in `new`).
pub const MAX_ENTRY_RECV_PER_ITER: usize = 512;
|
|
|
|
|
2018-12-07 14:09:29 -08:00
|
|
|
// Implement a destructor for the ReplayStage thread to signal it exited
// even on panics
struct Finalizer {
    // Shared shutdown flag; flipped to `true` when this Finalizer is dropped,
    // including during an unwind after a panic in the replay thread.
    exit_sender: Arc<AtomicBool>,
}

impl Finalizer {
    /// Wrap the shared exit flag. The flag is set when the returned value drops.
    fn new(exit_sender: Arc<AtomicBool>) -> Self {
        Finalizer { exit_sender }
    }
}

// Implement a destructor for Finalizer.
impl Drop for Finalizer {
    fn drop(&mut self) {
        // Fix: store through the shared Arc directly. The previous
        // `self.exit_sender.clone().store(...)` cloned the Arc (an atomic
        // refcount bump plus immediate drop) purely to call `store`, which
        // `&self.exit_sender` already permits.
        self.exit_sender.store(true, Ordering::Relaxed);
    }
}
|
|
|
|
|
2018-12-07 14:09:29 -08:00
|
|
|
pub struct ReplayStage {
    // Handle to the spawned "solana-replay-stage" thread; joined via the
    // `Service` impl below.
    t_replay: JoinHandle<()>,
    // Shared shutdown flag: set by `exit()` and polled at the top of the
    // replay loop. Also set by `Finalizer` if the thread panics.
    exit: Arc<AtomicBool>,
}
|
|
|
|
|
2018-12-07 14:09:29 -08:00
|
|
|
impl ReplayStage {
|
2018-06-15 14:27:06 -07:00
|
|
|
    /// Process entry blobs, already in order.
    ///
    /// Verifies the entry chain against `last_entry_id`, applies the entries to
    /// `bank` in vote-boundary batches, pushes a vote to the cluster when a slot
    /// boundary is reached, and forwards the processed entries (wrapped in
    /// `EntryMeta`) on `ledger_entry_sender`.
    ///
    /// On success, `last_entry_id` and `current_blob_index` are advanced past the
    /// entries that were written. Returns an error if signature/PoH verification
    /// fails, if entry processing fails, or if the send fails.
    #[allow(clippy::too_many_arguments)]
    fn process_entries<T: KeypairUtil>(
        mut entries: Vec<Entry>,
        bank: &Arc<Bank>,
        cluster_info: &Arc<RwLock<ClusterInfo>>,
        voting_keypair: &Option<Arc<T>>,
        ledger_entry_sender: &EntrySender,
        current_blob_index: &mut u64,
        last_entry_id: &mut Hash,
        subscriptions: &Arc<RpcSubscriptions>,
        slot: u64,
        parent_slot: Option<u64>,
    ) -> Result<()> {
        // Coalesce all the available entries into a single vote
        submit(
            influxdb::Point::new("replicate-stage")
                .add_field("count", influxdb::Value::Integer(entries.len() as i64))
                .to_owned(),
        );

        let mut res = Ok(());
        let mut num_entries_to_write = entries.len();
        let now = Instant::now();

        // Verify the whole batch chains correctly off the previous entry id
        // before mutating any state.
        if !entries.as_slice().verify(last_entry_id) {
            inc_new_counter_info!("replicate_stage-verify-fail", entries.len());
            return Err(Error::BlobError(BlobError::VerificationFailed));
        }
        inc_new_counter_info!(
            "replicate_stage-verify-duration",
            duration_as_ms(&now.elapsed()) as usize
        );

        let num_ticks = bank.tick_height();
        let slot_height = bank.slot_height();
        let leader_id = LeaderScheduler::default().slot_leader(bank);
        let mut num_ticks_to_next_vote = LeaderScheduler::num_ticks_left_in_slot(bank, num_ticks);

        // Track the tick height each entry lands on so EntryMeta carries it.
        let mut entry_tick_height = num_ticks;
        let mut entries_with_meta = Vec::new();
        for (i, entry) in entries.iter().enumerate() {
            inc_new_counter_info!("replicate-stage_bank-tick", bank.tick_height() as usize);
            if entry.is_tick() {
                entry_tick_height += 1;
                // Wrap the countdown at a slot boundary before decrementing.
                if num_ticks_to_next_vote == 0 {
                    num_ticks_to_next_vote = bank.ticks_per_slot();
                }
                num_ticks_to_next_vote -= 1;
            }
            inc_new_counter_info!(
                "replicate-stage_tick-to-vote",
                num_ticks_to_next_vote as usize
            );
            entries_with_meta.push(EntryMeta {
                tick_height: entry_tick_height,
                slot,
                slot_leader: leader_id,
                num_ticks_left_in_slot: num_ticks_to_next_vote,
                parent_slot,
                entry: entry.clone(),
            });
            // If it's the last entry in the vector, i will be vec len - 1.
            // If we don't process the entry now, the for loop will exit and the entry
            // will be dropped.
            if 0 == num_ticks_to_next_vote || (i + 1) == entries.len() {
                res = blocktree_processor::process_entries(bank, &entries[0..=i]);

                if res.is_err() {
                    // TODO: This will return early from the first entry that has an erroneous
                    // transaction, instead of processing the rest of the entries in the vector
                    // of received entries. This is in line with previous behavior when
                    // bank.process_entries() was used to process the entries, but doesn't solve the
                    // issue that the bank state was still changed, leading to inconsistencies with the
                    // leader as the leader currently should not be publishing erroneous transactions
                    inc_new_counter_info!("replicate-stage_failed_process_entries", i);
                    break;
                }

                // Slot boundary reached: notify RPC subscribers and cast a vote
                // if we hold a voting keypair.
                if 0 == num_ticks_to_next_vote {
                    subscriptions.notify_subscribers(&bank);
                    if let Some(voting_keypair) = voting_keypair {
                        let keypair = voting_keypair.as_ref();
                        let vote =
                            VoteTransaction::new_vote(keypair, slot_height, bank.last_id(), 0);
                        cluster_info.write().unwrap().push_vote(vote);
                    }
                }
                num_entries_to_write = i + 1;
                break;
            }
        }

        // If leader rotation happened, only write the entries up to leader rotation.
        entries.truncate(num_entries_to_write);
        entries_with_meta.truncate(num_entries_to_write);
        // NOTE(review): this expects a non-empty `entries`; the caller in `new`
        // guards with `!entries.is_empty()` before calling — confirm no other
        // caller passes an empty vector.
        *last_entry_id = entries
            .last()
            .expect("Entries cannot be empty at this point")
            .id;

        inc_new_counter_info!(
            "replicate-transactions",
            entries.iter().map(|x| x.transactions.len()).sum()
        );

        let entries_len = entries.len() as u64;
        // TODO: In line with previous behavior, this will write all the entries even if
        // an error occurred processing one of the entries (causing the rest of the entries to
        // not be processed).
        if entries_len != 0 {
            ledger_entry_sender.send(entries_with_meta)?;
        }

        *current_blob_index += entries_len;
        // Surface any processing error only after the entries were forwarded
        // and the blob index advanced (preserves pre-existing semantics).
        res?;
        inc_new_counter_info!(
            "replicate_stage-duration",
            duration_as_ms(&now.elapsed()) as usize
        );
        Ok(())
    }
|
2018-09-25 15:41:29 -07:00
|
|
|
|
2019-01-29 00:21:27 -08:00
|
|
|
    /// Create the replay stage.
    ///
    /// Derives the initial slot/leader state from `bank_forks_info`, sends an
    /// initial rotation notification on `to_leader_sender`, then spawns the
    /// "solana-replay-stage" thread which waits on `ledger_signal_receiver`,
    /// pulls entries from `blocktree`, and feeds them through
    /// `process_entries` until `exit` is set.
    ///
    /// Returns the stage handle and the receiver side of the channel on which
    /// processed entries (with metadata) are forwarded.
    #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
    pub fn new<T>(
        my_id: Pubkey,
        voting_keypair: Option<Arc<T>>,
        blocktree: Arc<Blocktree>,
        bank_forks: &Arc<RwLock<BankForks>>,
        bank_forks_info: &[BankForksInfo],
        cluster_info: Arc<RwLock<ClusterInfo>>,
        exit: Arc<AtomicBool>,
        to_leader_sender: &TvuRotationSender,
        ledger_signal_receiver: Receiver<bool>,
        subscriptions: &Arc<RpcSubscriptions>,
    ) -> (Self, EntryReceiver)
    where
        T: 'static + KeypairUtil + Send + Sync,
    {
        let (ledger_entry_sender, ledger_entry_receiver) = channel();
        let exit_ = exit.clone();
        let to_leader_sender = to_leader_sender.clone();
        let subscriptions_ = subscriptions.clone();

        // Gather up all the metadata about the current state of the ledger
        // NOTE(review): only bank_forks_info[0] is consulted — panics if the
        // slice is empty; confirm callers always pass at least one entry.
        let (mut bank, tick_height, mut last_entry_id, mut current_blob_index) = {
            let mut bank_forks = bank_forks.write().unwrap();
            bank_forks.set_working_bank_id(bank_forks_info[0].bank_id);
            let bank = bank_forks.working_bank();
            let tick_height = bank.tick_height();
            (
                bank,
                tick_height,
                bank_forks_info[0].last_entry_id,
                bank_forks_info[0].next_blob_index,
            )
        };

        // Update Tpu and other fullnode components with the current bank
        let (mut current_slot, mut current_leader_id, mut max_tick_height_for_slot) = {
            let slot = (tick_height + 1) / bank.ticks_per_slot();
            let first_tick_in_slot = slot * bank.ticks_per_slot();

            let leader_id = LeaderScheduler::default().slot_leader_at(slot, &bank);
            trace!("node {:?} scheduled as leader for slot {}", leader_id, slot,);

            let old_bank = bank.clone();
            // If the next slot is going to be a new slot and we're the leader for that slot,
            // make a new working bank, set it as the working bank.
            if tick_height + 1 == first_tick_in_slot {
                if leader_id == my_id {
                    bank = Self::create_and_set_working_bank(slot, &bank_forks, &old_bank);
                }
                current_blob_index = 0;
            }

            // Send a rotation notification back to Fullnode to initialize the TPU to the right
            // state. After this point, the bank.tick_height() is live, which it means it can
            // be updated by the TPU
            to_leader_sender
                .send(TvuRotationInfo {
                    bank: old_bank,
                    last_entry_id,
                    slot,
                    leader_id,
                })
                .unwrap();

            let max_tick_height_for_slot = first_tick_in_slot
                + LeaderScheduler::num_ticks_left_in_slot(&bank, first_tick_in_slot);

            (Some(slot), leader_id, max_tick_height_for_slot)
        };

        // Start the replay stage loop
        let bank_forks = bank_forks.clone();
        let t_replay = Builder::new()
            .name("solana-replay-stage".to_string())
            .spawn(move || {
                // Ensure `exit` is set even if this closure panics.
                let _exit = Finalizer::new(exit_.clone());
                let mut prev_slot = None;

                // Loop through blocktree MAX_ENTRY_RECV_PER_ITER entries at a time for each
                // relevant slot to see if there are any available updates
                loop {
                    // Stop getting entries if we get exit signal
                    if exit_.load(Ordering::Relaxed) {
                        break;
                    }
                    // Wait for the ledger to signal that new data may be available.
                    let timer = Duration::from_millis(100);
                    let e = ledger_signal_receiver.recv_timeout(timer);
                    match e {
                        Err(RecvTimeoutError::Timeout) => continue,
                        // Sender dropped: shut the loop down.
                        Err(_) => break,
                        Ok(_) => (),
                    };

                    // No active slot: look for a slot that chains off the one
                    // we last finished.
                    if current_slot.is_none() {
                        // NOTE(review): `prev_slot` is only None before the first
                        // rotation, and `current_slot` starts as Some(_) — this
                        // expect relies on that invariant; confirm.
                        let new_slot = Self::get_next_slot(
                            &blocktree,
                            prev_slot.expect("prev_slot must exist"),
                        );
                        if new_slot.is_some() {
                            trace!("{} replay_stage: new_slot found: {:?}", my_id, new_slot);
                            // Reset the state
                            bank = Self::create_and_set_working_bank(
                                new_slot.unwrap(),
                                &bank_forks,
                                &bank,
                            );
                            current_slot = new_slot;
                            Self::reset_state(
                                bank.ticks_per_slot(),
                                current_slot.unwrap(),
                                &mut max_tick_height_for_slot,
                                &mut current_blob_index,
                            );
                        } else {
                            continue;
                        }
                    }

                    // current_slot must be Some(x) by this point
                    let slot = current_slot.unwrap();

                    // Fetch the next entries from the database
                    // (we never replay entries for a slot we are leading).
                    let entries = {
                        if current_leader_id != my_id {
                            info!(
                                "{} replay_stage: asking for entries from slot: {}, bi: {}",
                                my_id, slot, current_blob_index
                            );
                            if let Ok(entries) = blocktree.get_slot_entries(
                                slot,
                                current_blob_index,
                                Some(MAX_ENTRY_RECV_PER_ITER as u64),
                            ) {
                                entries
                            } else {
                                vec![]
                            }
                        } else {
                            vec![]
                        }
                    };
                    let parent_slot = blocktree.meta(slot).unwrap().map(|meta| meta.parent_slot);

                    if !entries.is_empty() {
                        if let Err(e) = Self::process_entries(
                            entries,
                            &bank,
                            &cluster_info,
                            &voting_keypair,
                            &ledger_entry_sender,
                            &mut current_blob_index,
                            &mut last_entry_id,
                            &subscriptions_,
                            slot,
                            parent_slot,
                        ) {
                            // Log and keep replaying; process_entries advanced
                            // state only for the entries it accepted.
                            error!("{} process_entries failed: {:?}", my_id, e);
                        }
                    }

                    let current_tick_height = bank.tick_height();

                    // We've reached the end of a slot, reset our state and check
                    // for leader rotation
                    if max_tick_height_for_slot == current_tick_height {
                        // Check for leader rotation
                        let (leader_id, next_slot) = {
                            let slot = (current_tick_height + 1) / bank.ticks_per_slot();

                            (LeaderScheduler::default().slot_leader_at(slot, &bank), slot)
                        };

                        // If we were the leader for the last slot update the last id b/c we
                        // haven't processed any of the entries for the slot for which we were
                        // the leader
                        if current_leader_id == my_id {
                            let meta = blocktree.meta(slot).unwrap().expect("meta has to exist");
                            if meta.last_index == std::u64::MAX {
                                // Ledger hasn't gotten last blob yet, break and wait
                                // for a signal
                                continue;
                            }
                            let last_entry = blocktree
                                .get_slot_entries(slot, meta.last_index, Some(1))
                                .unwrap();
                            last_entry_id = last_entry[0].id;
                        }

                        let old_bank = bank.clone();
                        prev_slot = current_slot;
                        if my_id == leader_id {
                            // Create new bank for next slot if we are the leader for that slot
                            bank = Self::create_and_set_working_bank(
                                next_slot,
                                &bank_forks,
                                &old_bank,
                            );
                            current_slot = Some(next_slot);
                            Self::reset_state(
                                bank.ticks_per_slot(),
                                next_slot,
                                &mut max_tick_height_for_slot,
                                &mut current_blob_index,
                            );
                        } else {
                            // Not the leader: wait for a chained slot to appear
                            // (handled at the top of the loop via get_next_slot).
                            current_slot = None;
                        }

                        if leader_id != current_leader_id {
                            // TODO: Remove this soon once we boot the leader from ClusterInfo
                            cluster_info.write().unwrap().set_leader(leader_id);
                        }

                        // Always send rotation signal so that other services like
                        // RPC can be made aware of last slot's bank
                        to_leader_sender
                            .send(TvuRotationInfo {
                                bank: old_bank,
                                last_entry_id,
                                slot: next_slot,
                                leader_id,
                            })
                            .unwrap();

                        // Check for any slots that chain to this one
                        current_leader_id = leader_id;
                        continue;
                    }
                }
            })
            .unwrap();

        (Self { t_replay, exit }, ledger_entry_receiver)
    }
|
|
|
|
|
2019-02-04 15:33:43 -08:00
|
|
|
pub fn close(self) -> thread::Result<()> {
|
|
|
|
self.exit();
|
|
|
|
self.join()
|
|
|
|
}
|
|
|
|
|
|
|
|
    /// Asynchronously signal the replay thread to stop; does not wait for it
    /// to finish (use `close` or `join` for that).
    pub fn exit(&self) {
        self.exit.store(true, Ordering::Relaxed);
    }
|
|
|
|
|
2019-02-24 01:06:46 -08:00
|
|
|
fn create_and_set_working_bank(
|
|
|
|
slot: u64,
|
|
|
|
bank_forks: &Arc<RwLock<BankForks>>,
|
|
|
|
parent: &Arc<Bank>,
|
|
|
|
) -> Arc<Bank> {
|
|
|
|
let new_bank = Bank::new_from_parent(&parent);
|
2019-02-26 09:47:28 -08:00
|
|
|
new_bank.squash();
|
2019-02-24 01:06:46 -08:00
|
|
|
let mut bank_forks = bank_forks.write().unwrap();
|
|
|
|
bank_forks.insert(slot, new_bank);
|
|
|
|
bank_forks.set_working_bank_id(slot);
|
|
|
|
bank_forks.working_bank()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn reset_state(
|
|
|
|
ticks_per_slot: u64,
|
|
|
|
slot: u64,
|
|
|
|
max_tick_height_for_slot: &mut u64,
|
|
|
|
current_blob_index: &mut u64,
|
|
|
|
) {
|
|
|
|
*current_blob_index = 0;
|
|
|
|
*max_tick_height_for_slot = (slot + 1) * ticks_per_slot - 1;
|
|
|
|
}
|
|
|
|
|
2019-02-07 20:52:39 -08:00
|
|
|
fn get_next_slot(blocktree: &Blocktree, slot_index: u64) -> Option<u64> {
|
2019-02-07 15:10:54 -08:00
|
|
|
// Find the next slot that chains to the old slot
|
2019-02-07 20:52:39 -08:00
|
|
|
let next_slots = blocktree.get_slots_since(&[slot_index]).expect("Db error");
|
2019-02-07 15:10:54 -08:00
|
|
|
next_slots.first().cloned()
|
|
|
|
}
|
2018-05-22 14:26:28 -07:00
|
|
|
}
|
2018-07-03 21:14:08 -07:00
|
|
|
|
2018-12-07 14:09:29 -08:00
|
|
|
impl Service for ReplayStage {
    type JoinReturnType = ();

    /// Block until the replay thread exits, propagating a panic as `Err`.
    fn join(self) -> thread::Result<()> {
        self.t_replay.join()
    }
}
|
2018-09-13 14:00:17 -07:00
|
|
|
|
2018-10-10 16:49:41 -07:00
|
|
|
#[cfg(test)]
mod test {
    use super::*;
    use crate::blocktree::create_new_tmp_ledger;
    use crate::cluster_info::{ClusterInfo, Node};
    use crate::entry::create_ticks;
    use crate::entry::{next_entry_mut, Entry};
    use crate::fullnode::make_active_set_entries;
    use crate::fullnode::new_banks_from_blocktree;
    use crate::replay_stage::ReplayStage;
    use solana_sdk::genesis_block::GenesisBlock;
    use solana_sdk::hash::Hash;
    use solana_sdk::signature::{Keypair, KeypairUtil};
    use std::fs::remove_dir_all;
    use std::sync::atomic::AtomicBool;
    use std::sync::mpsc::channel;
    use std::sync::{Arc, RwLock};

    // End-to-end: a tick written to the blocktree should come back out on the
    // ledger-entry receiver that ReplayStage::new returns.
    #[test]
    fn test_vote_error_replay_stage_correctness() {
        // Set up dummy node to host a ReplayStage
        let my_keypair = Keypair::new();
        let my_id = my_keypair.pubkey();
        let my_node = Node::new_localhost_with_pubkey(my_id);

        // Create keypair for the leader
        let leader_id = Keypair::new().pubkey();

        let (genesis_block, _mint_keypair) = GenesisBlock::new_with_leader(10_000, leader_id, 500);

        let (my_ledger_path, _last_id) = create_new_tmp_ledger!(&genesis_block).unwrap();

        // Set up the cluster info
        let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));

        // Set up the replay stage
        let exit = Arc::new(AtomicBool::new(false));
        let voting_keypair = Arc::new(Keypair::new());
        let (to_leader_sender, _to_leader_receiver) = channel();
        // Inner scope so the stage and blocktree are dropped before the
        // ledger directory is removed below.
        {
            let (bank_forks, bank_forks_info, blocktree, l_receiver) =
                new_banks_from_blocktree(&my_ledger_path);
            let bank = bank_forks.working_bank();
            let last_entry_id = bank_forks_info[0].last_entry_id;

            let blocktree = Arc::new(blocktree);
            let (replay_stage, ledger_writer_recv) = ReplayStage::new(
                my_keypair.pubkey(),
                Some(voting_keypair.clone()),
                blocktree.clone(),
                &Arc::new(RwLock::new(bank_forks)),
                &bank_forks_info,
                cluster_info_me.clone(),
                exit.clone(),
                &to_leader_sender,
                l_receiver,
                &Arc::new(RpcSubscriptions::default()),
            );

            // Seed a vote so the stage has cluster vote state to work with.
            let keypair = voting_keypair.as_ref();
            let vote = VoteTransaction::new_vote(keypair, 0, bank.last_id(), 0);
            cluster_info_me.write().unwrap().push_vote(vote);

            info!("Send ReplayStage an entry, should see it on the ledger writer receiver");
            let next_tick = create_ticks(1, last_entry_id);
            blocktree.write_entries(1, 0, 0, next_tick.clone()).unwrap();

            let received_tick = ledger_writer_recv
                .recv()
                .expect("Expected to receive an entry on the ledger writer receiver");

            // The replayed entry (stripped of its EntryMeta wrapper) must match
            // what was written to the blocktree.
            assert_eq!(next_tick[0], received_tick[0].entry);

            replay_stage
                .close()
                .expect("Expect successful ReplayStage exit");
        }
        let _ignored = remove_dir_all(&my_ledger_path);
    }

    // process_entries should accept a correctly-chained tick sequence and
    // reject entries whose PoH chain is broken with VerificationFailed.
    #[test]
    fn test_replay_stage_poh_error_entry_receiver() {
        // Set up dummy node to host a ReplayStage
        let my_keypair = Keypair::new();
        let my_id = my_keypair.pubkey();
        let my_node = Node::new_localhost_with_pubkey(my_id);
        // Set up the cluster info
        let cluster_info_me = Arc::new(RwLock::new(ClusterInfo::new(my_node.info.clone())));
        let (ledger_entry_sender, _ledger_entry_receiver) = channel();
        let mut last_entry_id = Hash::default();
        let mut current_blob_index = 0;
        let mut last_id = Hash::default();
        let mut entries = Vec::new();
        // Build a valid chain of tick entries starting from the default hash.
        for _ in 0..5 {
            let entry = next_entry_mut(&mut last_id, 1, vec![]); //just ticks
            entries.push(entry);
        }

        let genesis_block = GenesisBlock::new(10_000).0;
        let bank = Arc::new(Bank::new(&genesis_block));
        let voting_keypair = Some(Arc::new(Keypair::new()));
        let res = ReplayStage::process_entries(
            entries.clone(),
            &bank,
            &cluster_info_me,
            &voting_keypair,
            &ledger_entry_sender,
            &mut current_blob_index,
            &mut last_entry_id,
            &Arc::new(RpcSubscriptions::default()),
            0,
            None,
        );

        // Valid chain: must be accepted.
        match res {
            Ok(_) => (),
            Err(e) => assert!(false, "Entries were not sent correctly {:?}", e),
        }

        entries.clear();
        // Each entry restarts from Hash::default(), so the chain does not link.
        for _ in 0..5 {
            let entry = Entry::new(&mut Hash::default(), 1, vec![]); //just broken entries
            entries.push(entry);
        }

        // Fresh bank so the earlier successful batch doesn't affect this run.
        let bank = Arc::new(Bank::new(&genesis_block));
        let res = ReplayStage::process_entries(
            entries.clone(),
            &bank,
            &cluster_info_me,
            &voting_keypair,
            &ledger_entry_sender,
            &mut current_blob_index,
            &mut last_entry_id,
            &Arc::new(RpcSubscriptions::default()),
            0,
            None,
        );

        // Broken chain: must fail verification specifically.
        match res {
            Ok(_) => assert!(false, "Should have failed because entries are broken"),
            Err(Error::BlobError(BlobError::VerificationFailed)) => (),
            Err(e) => assert!(
                false,
                "Should have failed because with blob error, instead, got {:?}",
                e
            ),
        }
    }
}
|