1) Switch broken tests to generate an empty tick in their ledgers to use as last_id, 2) Fix bug where PoH generator in BankingStage did not referenced the last tick instead of the last entry on startup, causing ledger verification to fail on the new tick added by the PoH generator (#1479)
This commit is contained in:
parent
9dd4dc2088
commit
47f69f2d24
|
@ -97,7 +97,12 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
|
|||
let len = x.read().unwrap().packets.len();
|
||||
(x, iter::repeat(1).take(len).collect())
|
||||
}).collect();
|
||||
let (_stage, signal_receiver) = BankingStage::new(&bank, verified_receiver, Default::default());
|
||||
let (_stage, signal_receiver) = BankingStage::new(
|
||||
&bank,
|
||||
verified_receiver,
|
||||
Default::default(),
|
||||
&mint.last_id(),
|
||||
);
|
||||
bencher.iter(move || {
|
||||
for v in verified.chunks(verified.len() / NUM_THREADS) {
|
||||
verified_sender.send(v.to_vec()).unwrap();
|
||||
|
@ -182,7 +187,12 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
|
|||
let len = x.read().unwrap().packets.len();
|
||||
(x, iter::repeat(1).take(len).collect())
|
||||
}).collect();
|
||||
let (_stage, signal_receiver) = BankingStage::new(&bank, verified_receiver, Default::default());
|
||||
let (_stage, signal_receiver) = BankingStage::new(
|
||||
&bank,
|
||||
verified_receiver,
|
||||
Default::default(),
|
||||
&mint.last_id(),
|
||||
);
|
||||
bencher.iter(move || {
|
||||
for v in verified.chunks(verified.len() / NUM_THREADS) {
|
||||
verified_sender.send(v.to_vec()).unwrap();
|
||||
|
|
|
@ -6,6 +6,7 @@ use bank::{Bank, NUM_TICKS_PER_SECOND};
|
|||
use bincode::deserialize;
|
||||
use counter::Counter;
|
||||
use entry::Entry;
|
||||
use hash::Hash;
|
||||
use log::Level;
|
||||
use packet::Packets;
|
||||
use poh_recorder::PohRecorder;
|
||||
|
@ -53,10 +54,11 @@ impl BankingStage {
|
|||
bank: &Arc<Bank>,
|
||||
verified_receiver: Receiver<VerifiedPackets>,
|
||||
config: Config,
|
||||
last_entry_id: &Hash,
|
||||
) -> (Self, Receiver<Vec<Entry>>) {
|
||||
let (entry_sender, entry_receiver) = channel();
|
||||
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
|
||||
let poh = PohRecorder::new(bank.clone(), entry_sender);
|
||||
let poh = PohRecorder::new(bank.clone(), entry_sender, *last_entry_id);
|
||||
let tick_poh = poh.clone();
|
||||
// Tick producer is a headless producer, so when it exits it should notify the banking stage.
|
||||
// Since channel are not used to talk between these threads an AtomicBool is used as a
|
||||
|
@ -264,20 +266,28 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_banking_stage_shutdown1() {
|
||||
let bank = Bank::new(&Mint::new(2));
|
||||
let bank = Arc::new(Bank::new(&Mint::new(2)));
|
||||
let (verified_sender, verified_receiver) = channel();
|
||||
let (banking_stage, _entry_receiver) =
|
||||
BankingStage::new(&Arc::new(bank), verified_receiver, Default::default());
|
||||
let (banking_stage, _entry_receiver) = BankingStage::new(
|
||||
&bank,
|
||||
verified_receiver,
|
||||
Default::default(),
|
||||
&bank.last_id(),
|
||||
);
|
||||
drop(verified_sender);
|
||||
assert_eq!(banking_stage.join().unwrap(), ());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_banking_stage_shutdown2() {
|
||||
let bank = Bank::new(&Mint::new(2));
|
||||
let bank = Arc::new(Bank::new(&Mint::new(2)));
|
||||
let (_verified_sender, verified_receiver) = channel();
|
||||
let (banking_stage, entry_receiver) =
|
||||
BankingStage::new(&Arc::new(bank), verified_receiver, Default::default());
|
||||
let (banking_stage, entry_receiver) = BankingStage::new(
|
||||
&bank,
|
||||
verified_receiver,
|
||||
Default::default(),
|
||||
&bank.last_id(),
|
||||
);
|
||||
drop(entry_receiver);
|
||||
assert_eq!(banking_stage.join().unwrap(), ());
|
||||
}
|
||||
|
@ -291,6 +301,7 @@ mod tests {
|
|||
&bank,
|
||||
verified_receiver,
|
||||
Config::Sleep(Duration::from_millis(1)),
|
||||
&bank.last_id(),
|
||||
);
|
||||
sleep(Duration::from_millis(500));
|
||||
drop(verified_sender);
|
||||
|
@ -308,8 +319,12 @@ mod tests {
|
|||
let bank = Arc::new(Bank::new(&mint));
|
||||
let start_hash = bank.last_id();
|
||||
let (verified_sender, verified_receiver) = channel();
|
||||
let (banking_stage, entry_receiver) =
|
||||
BankingStage::new(&bank, verified_receiver, Default::default());
|
||||
let (banking_stage, entry_receiver) = BankingStage::new(
|
||||
&bank,
|
||||
verified_receiver,
|
||||
Default::default(),
|
||||
&bank.last_id(),
|
||||
);
|
||||
|
||||
// good tx
|
||||
let keypair = mint.keypair();
|
||||
|
@ -354,8 +369,12 @@ mod tests {
|
|||
let mint = Mint::new(2);
|
||||
let bank = Arc::new(Bank::new(&mint));
|
||||
let (verified_sender, verified_receiver) = channel();
|
||||
let (banking_stage, entry_receiver) =
|
||||
BankingStage::new(&bank, verified_receiver, Default::default());
|
||||
let (banking_stage, entry_receiver) = BankingStage::new(
|
||||
&bank,
|
||||
verified_receiver,
|
||||
Default::default(),
|
||||
&bank.last_id(),
|
||||
);
|
||||
|
||||
// Process a batch that includes a transaction that receives two tokens.
|
||||
let alice = Keypair::new();
|
||||
|
|
|
@ -5,6 +5,7 @@ use broadcast_stage::BroadcastStage;
|
|||
use cluster_info::{ClusterInfo, Node, NodeInfo};
|
||||
use drone::DRONE_PORT;
|
||||
use entry::Entry;
|
||||
use hash::Hash;
|
||||
use leader_scheduler::LeaderScheduler;
|
||||
use ledger::read_ledger;
|
||||
use ncp::Ncp;
|
||||
|
@ -275,6 +276,11 @@ impl Fullnode {
|
|||
exit.clone(),
|
||||
);
|
||||
|
||||
let last_entry_id = &ledger_tail
|
||||
.last()
|
||||
.expect("Expected at least one entry in the ledger")
|
||||
.id;
|
||||
|
||||
let window = window::new_window_from_entries(ledger_tail, entry_height, &node.info);
|
||||
let shared_window = Arc::new(RwLock::new(window));
|
||||
let cluster_info = Arc::new(RwLock::new(
|
||||
|
@ -347,6 +353,7 @@ impl Fullnode {
|
|||
ledger_path,
|
||||
sigverify_disabled,
|
||||
entry_height,
|
||||
last_entry_id,
|
||||
leader_scheduler.clone(),
|
||||
);
|
||||
|
||||
|
@ -455,7 +462,7 @@ impl Fullnode {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn validator_to_leader(&mut self, entry_height: u64) {
|
||||
fn validator_to_leader(&mut self, entry_height: u64, last_entry_id: Hash) {
|
||||
self.cluster_info
|
||||
.write()
|
||||
.unwrap()
|
||||
|
@ -472,6 +479,10 @@ impl Fullnode {
|
|||
&self.ledger_path,
|
||||
self.sigverify_disabled,
|
||||
entry_height,
|
||||
// We pass the last_entry_id from the replicate stage because we can't trust that
|
||||
// the window didn't overwrite the slot at for the last entry that the replicate stage
|
||||
// processed. We also want to avoid reading processing the ledger for the last id.
|
||||
&last_entry_id,
|
||||
self.leader_scheduler.clone(),
|
||||
);
|
||||
|
||||
|
@ -509,8 +520,8 @@ impl Fullnode {
|
|||
_ => Ok(None),
|
||||
},
|
||||
Some(NodeRole::Validator(validator_services)) => match validator_services.join()? {
|
||||
Some(TvuReturnType::LeaderRotation(entry_height)) => {
|
||||
self.validator_to_leader(entry_height);
|
||||
Some(TvuReturnType::LeaderRotation(entry_height, last_entry_id)) => {
|
||||
self.validator_to_leader(entry_height, last_entry_id);
|
||||
Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation))
|
||||
}
|
||||
_ => Ok(None),
|
||||
|
@ -568,7 +579,7 @@ impl Service for Fullnode {
|
|||
|
||||
match self.node_role {
|
||||
Some(NodeRole::Validator(validator_service)) => {
|
||||
if let Some(TvuReturnType::LeaderRotation(_)) = validator_service.join()? {
|
||||
if let Some(TvuReturnType::LeaderRotation(_, _)) = validator_service.join()? {
|
||||
return Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation));
|
||||
}
|
||||
}
|
||||
|
@ -590,7 +601,7 @@ mod tests {
|
|||
use cluster_info::Node;
|
||||
use fullnode::{Fullnode, NodeRole, TvuReturnType};
|
||||
use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
|
||||
use ledger::{genesis, LedgerWriter};
|
||||
use ledger::{create_sample_ledger, genesis, LedgerWriter};
|
||||
use packet::make_consecutive_blobs;
|
||||
use service::Service;
|
||||
use signature::{Keypair, KeypairUtil};
|
||||
|
@ -605,14 +616,17 @@ mod tests {
|
|||
fn validator_exit() {
|
||||
let keypair = Keypair::new();
|
||||
let tn = Node::new_localhost_with_pubkey(keypair.pubkey());
|
||||
let (alice, validator_ledger_path) = genesis("validator_exit", 10_000);
|
||||
let bank = Bank::new(&alice);
|
||||
let (mint, validator_ledger_path) = genesis("validator_exit", 10_000);
|
||||
let bank = Bank::new(&mint);
|
||||
let entry = tn.info.clone();
|
||||
let genesis_entries = &mint.create_entries();
|
||||
let entry_height = genesis_entries.len() as u64;
|
||||
|
||||
let v = Fullnode::new_with_bank(
|
||||
keypair,
|
||||
bank,
|
||||
0,
|
||||
&[],
|
||||
entry_height,
|
||||
&genesis_entries,
|
||||
tn,
|
||||
Some(&entry),
|
||||
&validator_ledger_path,
|
||||
|
@ -631,16 +645,19 @@ mod tests {
|
|||
.map(|i| {
|
||||
let keypair = Keypair::new();
|
||||
let tn = Node::new_localhost_with_pubkey(keypair.pubkey());
|
||||
let (alice, validator_ledger_path) =
|
||||
let (mint, validator_ledger_path) =
|
||||
genesis(&format!("validator_parallel_exit_{}", i), 10_000);
|
||||
ledger_paths.push(validator_ledger_path.clone());
|
||||
let bank = Bank::new(&alice);
|
||||
let bank = Bank::new(&mint);
|
||||
let entry = tn.info.clone();
|
||||
|
||||
let genesis_entries = &mint.create_entries();
|
||||
let entry_height = genesis_entries.len() as u64;
|
||||
Fullnode::new_with_bank(
|
||||
keypair,
|
||||
bank,
|
||||
0,
|
||||
&[],
|
||||
entry_height,
|
||||
&genesis_entries,
|
||||
tn,
|
||||
Some(&entry),
|
||||
&validator_ledger_path,
|
||||
|
@ -664,7 +681,6 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_wrong_role_transition() {
|
||||
// Create the leader node information
|
||||
let bootstrap_leader_keypair = Keypair::new();
|
||||
|
@ -677,9 +693,9 @@ mod tests {
|
|||
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
|
||||
|
||||
// Make a common mint and a genesis entry for both leader + validator's ledgers
|
||||
let (mint, bootstrap_leader_ledger_path) = genesis("test_wrong_role_transition", 10_000);
|
||||
let (mint, bootstrap_leader_ledger_path, genesis_entries) =
|
||||
create_sample_ledger("test_wrong_role_transition", 10_000);
|
||||
|
||||
let genesis_entries = mint.create_entries();
|
||||
let last_id = genesis_entries
|
||||
.last()
|
||||
.expect("expected at least one genesis entry")
|
||||
|
@ -688,7 +704,8 @@ mod tests {
|
|||
// Write the entries to the ledger that will cause leader rotation
|
||||
// after the bootstrap height
|
||||
let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap();
|
||||
let first_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id);
|
||||
let first_entries =
|
||||
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id);
|
||||
|
||||
let ledger_initial_len = (genesis_entries.len() + first_entries.len()) as u64;
|
||||
ledger_writer.write_entries(first_entries).unwrap();
|
||||
|
@ -746,7 +763,6 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_validator_to_leader_transition() {
|
||||
// Make a leader identity
|
||||
let leader_keypair = Keypair::new();
|
||||
|
@ -755,12 +771,12 @@ mod tests {
|
|||
let leader_ncp = leader_node.info.contact_info.ncp;
|
||||
|
||||
// Create validator identity
|
||||
let (mint, validator_ledger_path) = genesis("test_validator_to_leader_transition", 10_000);
|
||||
let (mint, validator_ledger_path, genesis_entries) =
|
||||
create_sample_ledger("test_validator_to_leader_transition", 10_000);
|
||||
let validator_keypair = Keypair::new();
|
||||
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
|
||||
let validator_info = validator_node.info.clone();
|
||||
|
||||
let genesis_entries = mint.create_entries();
|
||||
let mut last_id = genesis_entries
|
||||
.last()
|
||||
.expect("expected at least one genesis entry")
|
||||
|
@ -775,7 +791,7 @@ mod tests {
|
|||
// 2) A vote from the validator
|
||||
let mut ledger_writer = LedgerWriter::open(&validator_ledger_path, false).unwrap();
|
||||
let bootstrap_entries =
|
||||
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id);
|
||||
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id);
|
||||
let bootstrap_entries_len = bootstrap_entries.len();
|
||||
last_id = bootstrap_entries.last().unwrap().id;
|
||||
ledger_writer.write_entries(bootstrap_entries).unwrap();
|
||||
|
@ -845,10 +861,11 @@ mod tests {
|
|||
let join_result = validator_services
|
||||
.join()
|
||||
.expect("Expected successful validator join");
|
||||
assert_eq!(
|
||||
join_result,
|
||||
Some(TvuReturnType::LeaderRotation(bootstrap_height))
|
||||
);
|
||||
if let Some(TvuReturnType::LeaderRotation(result_bh, _)) = join_result {
|
||||
assert_eq!(result_bh, bootstrap_height);
|
||||
} else {
|
||||
panic!("Expected validator to have exited due to leader rotation");
|
||||
}
|
||||
}
|
||||
_ => panic!("Role should not be leader"),
|
||||
}
|
||||
|
|
|
@ -430,20 +430,22 @@ pub fn set_new_leader(bank: &Bank, leader_scheduler: &mut LeaderScheduler, vote_
|
|||
pub fn make_active_set_entries(
|
||||
active_keypair: &Keypair,
|
||||
token_source: &Keypair,
|
||||
last_id: &Hash,
|
||||
last_entry_id: &Hash,
|
||||
last_tick_id: &Hash,
|
||||
) -> Vec<Entry> {
|
||||
// 1) Create transfer token entry
|
||||
let transfer_tx = Transaction::system_new(&token_source, active_keypair.pubkey(), 1, *last_id);
|
||||
let transfer_entry = Entry::new(last_id, 0, vec![transfer_tx]);
|
||||
let last_id = transfer_entry.id;
|
||||
let transfer_tx =
|
||||
Transaction::system_new(&token_source, active_keypair.pubkey(), 1, *last_tick_id);
|
||||
let transfer_entry = Entry::new(last_entry_id, 0, vec![transfer_tx]);
|
||||
let last_entry_id = transfer_entry.id;
|
||||
|
||||
// 2) Create vote entry
|
||||
let vote = Vote {
|
||||
version: 0,
|
||||
contact_info_version: 0,
|
||||
};
|
||||
let vote_tx = Transaction::budget_new_vote(&active_keypair, vote, last_id, 0);
|
||||
let vote_entry = Entry::new(&last_id, 0, vec![vote_tx]);
|
||||
let vote_tx = Transaction::budget_new_vote(&active_keypair, vote, *last_tick_id, 0);
|
||||
let vote_entry = Entry::new(&last_entry_id, 0, vec![vote_tx]);
|
||||
|
||||
vec![transfer_entry, vote_entry]
|
||||
}
|
||||
|
|
|
@ -8,12 +8,10 @@ use budget_transaction::BudgetTransaction;
|
|||
use entry::Entry;
|
||||
use hash::Hash;
|
||||
use log::Level::Trace;
|
||||
#[cfg(test)]
|
||||
use mint::Mint;
|
||||
use packet::{SharedBlob, BLOB_DATA_SIZE};
|
||||
use rayon::prelude::*;
|
||||
use result::{Error, Result};
|
||||
#[cfg(test)]
|
||||
use signature::{Keypair, KeypairUtil};
|
||||
use solana_program_interface::pubkey::Pubkey;
|
||||
use std::fs::{create_dir_all, remove_dir_all, File, OpenOptions};
|
||||
|
@ -466,7 +464,8 @@ impl Block for [Entry] {
|
|||
let r = x1.verify(&x0.id);
|
||||
if !r {
|
||||
warn!(
|
||||
"entry invalid!: {:?} num txs: {}",
|
||||
"entry invalid!: x0: {:?}, x1: {:?} num txs: {}",
|
||||
x0.id,
|
||||
x1.id,
|
||||
x1.transactions.len()
|
||||
);
|
||||
|
@ -592,7 +591,6 @@ pub fn next_entries(
|
|||
next_entries_mut(&mut id, &mut num_hashes, transactions)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn tmp_ledger_path(name: &str) -> String {
|
||||
use std::env;
|
||||
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
|
||||
|
@ -611,6 +609,31 @@ pub fn genesis(name: &str, num: i64) -> (Mint, String) {
|
|||
(mint, path)
|
||||
}
|
||||
|
||||
fn create_ticks(num_ticks: usize, hash: &mut Hash) -> Vec<Entry> {
|
||||
let mut ticks = Vec::with_capacity(num_ticks);
|
||||
let mut num_hashes = 0;
|
||||
for _ in 0..num_ticks {
|
||||
ticks.push(Entry::new_mut(hash, &mut num_hashes, vec![]));
|
||||
}
|
||||
|
||||
ticks
|
||||
}
|
||||
|
||||
pub fn create_sample_ledger(name: &str, num: i64) -> (Mint, String, Vec<Entry>) {
|
||||
let mint = Mint::new(num);
|
||||
let path = tmp_ledger_path(name);
|
||||
|
||||
// Create the entries
|
||||
let mut genesis = mint.create_entries();
|
||||
let ticks = create_ticks(1, &mut mint.last_id());
|
||||
genesis.extend(ticks);
|
||||
|
||||
let mut writer = LedgerWriter::open(&path, true).unwrap();
|
||||
writer.write_entries(genesis.clone()).unwrap();
|
||||
|
||||
(mint, path, genesis)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
@ -21,8 +21,8 @@ impl PohRecorder {
|
|||
/// A recorder to synchronize PoH with the following data structures
|
||||
/// * bank - the LastId's queue is updated on `tick` and `record` events
|
||||
/// * sender - the Entry channel that outputs to the ledger
|
||||
pub fn new(bank: Arc<Bank>, sender: Sender<Vec<Entry>>) -> Self {
|
||||
let poh = Arc::new(Mutex::new(Poh::new(bank.last_id())));
|
||||
pub fn new(bank: Arc<Bank>, sender: Sender<Vec<Entry>>, last_entry_id: Hash) -> Self {
|
||||
let poh = Arc::new(Mutex::new(Poh::new(last_entry_id)));
|
||||
PohRecorder { poh, bank, sender }
|
||||
}
|
||||
|
||||
|
@ -77,8 +77,9 @@ mod tests {
|
|||
fn test_poh() {
|
||||
let mint = Mint::new(1);
|
||||
let bank = Arc::new(Bank::new(&mint));
|
||||
let last_id = bank.last_id();
|
||||
let (entry_sender, entry_receiver) = channel();
|
||||
let poh_recorder = PohRecorder::new(bank, entry_sender);
|
||||
let poh_recorder = PohRecorder::new(bank, entry_sender, last_id);
|
||||
|
||||
//send some data
|
||||
let h1 = hash(b"hello world!");
|
||||
|
|
|
@ -4,6 +4,7 @@ use bank::Bank;
|
|||
use cluster_info::ClusterInfo;
|
||||
use counter::Counter;
|
||||
use entry::EntryReceiver;
|
||||
use hash::Hash;
|
||||
use leader_scheduler::LeaderScheduler;
|
||||
use ledger::{Block, LedgerWriter};
|
||||
use log::Level;
|
||||
|
@ -23,7 +24,7 @@ use vote_stage::send_validator_vote;
|
|||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub enum ReplicateStageReturnType {
|
||||
LeaderRotation(u64),
|
||||
LeaderRotation(u64, Hash),
|
||||
}
|
||||
|
||||
// Implement a destructor for the ReplicateStage thread to signal it exited
|
||||
|
@ -60,7 +61,7 @@ impl ReplicateStage {
|
|||
vote_blob_sender: Option<&BlobSender>,
|
||||
entry_height: &mut u64,
|
||||
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
|
||||
) -> Result<()> {
|
||||
) -> Result<Hash> {
|
||||
let timer = Duration::new(1, 0);
|
||||
//coalesce all the available entries into a single vote
|
||||
let mut entries = window_receiver.recv_timeout(timer)?;
|
||||
|
@ -69,7 +70,7 @@ impl ReplicateStage {
|
|||
}
|
||||
|
||||
let mut res = Ok(());
|
||||
{
|
||||
let last_entry_id = {
|
||||
let mut num_entries_to_write = entries.len();
|
||||
for (i, entry) in entries.iter().enumerate() {
|
||||
res = bank.process_entry(&entry);
|
||||
|
@ -111,7 +112,11 @@ impl ReplicateStage {
|
|||
|
||||
// If leader rotation happened, only write the entries up to leader rotation.
|
||||
entries.truncate(num_entries_to_write);
|
||||
}
|
||||
entries
|
||||
.last()
|
||||
.expect("Entries cannot be empty at this point")
|
||||
.id
|
||||
};
|
||||
|
||||
if let Some(sender) = vote_blob_sender {
|
||||
send_validator_vote(bank, keypair, cluster_info, sender)?;
|
||||
|
@ -135,7 +140,7 @@ impl ReplicateStage {
|
|||
|
||||
*entry_height += entries_len;
|
||||
res?;
|
||||
Ok(())
|
||||
Ok(last_entry_id)
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
|
@ -162,6 +167,7 @@ impl ReplicateStage {
|
|||
let now = Instant::now();
|
||||
let mut next_vote_secs = 1;
|
||||
let mut entry_height_ = entry_height;
|
||||
let mut last_entry_id = None;
|
||||
loop {
|
||||
let leader_id = leader_scheduler
|
||||
.read()
|
||||
|
@ -169,7 +175,14 @@ impl ReplicateStage {
|
|||
.get_scheduled_leader(entry_height_)
|
||||
.expect("Scheduled leader id should never be unknown at this point");
|
||||
if leader_id == keypair.pubkey() {
|
||||
return Some(ReplicateStageReturnType::LeaderRotation(entry_height_));
|
||||
return Some(ReplicateStageReturnType::LeaderRotation(
|
||||
entry_height_,
|
||||
// We should never start the TPU / this stage on an exact entry that causes leader
|
||||
// rotation (Fullnode should automatically transition on startup if it detects
|
||||
// are no longer a validator. Hence we can assume that some entry must have
|
||||
// triggered leader rotation
|
||||
last_entry_id.expect("Must exist an entry that triggered rotation"),
|
||||
));
|
||||
}
|
||||
|
||||
// Only vote once a second.
|
||||
|
@ -180,7 +193,7 @@ impl ReplicateStage {
|
|||
None
|
||||
};
|
||||
|
||||
if let Err(e) = Self::replicate_requests(
|
||||
match Self::replicate_requests(
|
||||
&bank,
|
||||
&cluster_info,
|
||||
&window_receiver,
|
||||
|
@ -190,10 +203,11 @@ impl ReplicateStage {
|
|||
&mut entry_height_,
|
||||
&leader_scheduler,
|
||||
) {
|
||||
match e {
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => break,
|
||||
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
|
||||
_ => error!("{:?}", e),
|
||||
Err(Error::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
|
||||
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
|
||||
Err(e) => error!("{:?}", e),
|
||||
Ok(last_entry_id_) => {
|
||||
last_entry_id = Some(last_entry_id_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -222,7 +236,7 @@ mod test {
|
|||
use cluster_info::{ClusterInfo, Node};
|
||||
use fullnode::Fullnode;
|
||||
use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
|
||||
use ledger::{genesis, next_entries_mut, LedgerWriter};
|
||||
use ledger::{create_sample_ledger, next_entries_mut, LedgerWriter};
|
||||
use logger;
|
||||
use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
|
||||
use service::Service;
|
||||
|
@ -232,7 +246,6 @@ mod test {
|
|||
use std::sync::{Arc, RwLock};
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
pub fn test_replicate_stage_leader_rotation_exit() {
|
||||
logger::setup();
|
||||
|
||||
|
@ -243,8 +256,8 @@ mod test {
|
|||
let cluster_info_me = ClusterInfo::new(my_node.info.clone()).expect("ClusterInfo::new");
|
||||
|
||||
// Create a ledger
|
||||
let (mint, my_ledger_path) = genesis("test_replicate_stage_leader_rotation_exit", 10_000);
|
||||
let genesis_entries = mint.create_entries();
|
||||
let (mint, my_ledger_path, genesis_entries) =
|
||||
create_sample_ledger("test_replicate_stage_leader_rotation_exit", 10_000);
|
||||
let mut last_id = genesis_entries
|
||||
.last()
|
||||
.expect("expected at least one genesis entry")
|
||||
|
@ -254,7 +267,8 @@ mod test {
|
|||
// 1) Give the validator a nonzero number of tokens 2) A vote from the validator .
|
||||
// This will cause leader rotation after the bootstrap height
|
||||
let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap();
|
||||
let bootstrap_entries = make_active_set_entries(&my_keypair, &mint.keypair(), &last_id);
|
||||
let bootstrap_entries =
|
||||
make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id);
|
||||
last_id = bootstrap_entries.last().unwrap().id;
|
||||
let ledger_initial_len = (genesis_entries.len() + bootstrap_entries.len()) as u64;
|
||||
ledger_writer.write_entries(bootstrap_entries).unwrap();
|
||||
|
@ -307,11 +321,15 @@ mod test {
|
|||
}
|
||||
|
||||
entries_to_send.truncate(total_entries_to_send);
|
||||
let last_id = entries_to_send[(bootstrap_height - 1) as usize].id;
|
||||
entry_sender.send(entries_to_send).unwrap();
|
||||
|
||||
// Wait for replicate_stage to exit and check return value is correct
|
||||
assert_eq!(
|
||||
Some(ReplicateStageReturnType::LeaderRotation(bootstrap_height)),
|
||||
Some(ReplicateStageReturnType::LeaderRotation(
|
||||
bootstrap_height,
|
||||
last_id
|
||||
)),
|
||||
replicate_stage.join().expect("replicate stage join")
|
||||
);
|
||||
|
||||
|
|
|
@ -575,11 +575,13 @@ mod tests {
|
|||
let leader_data = leader.info.clone();
|
||||
let ledger_path = tmp_ledger("client_check_signature", &alice);
|
||||
|
||||
let genesis_entries = &alice.create_entries();
|
||||
let entry_height = genesis_entries.len() as u64;
|
||||
let server = Fullnode::new_with_bank(
|
||||
leader_keypair,
|
||||
bank,
|
||||
0,
|
||||
&[],
|
||||
entry_height,
|
||||
&genesis_entries,
|
||||
leader,
|
||||
None,
|
||||
&ledger_path,
|
||||
|
@ -636,11 +638,13 @@ mod tests {
|
|||
let leader_data = leader.info.clone();
|
||||
let ledger_path = tmp_ledger("zero_balance_check", &alice);
|
||||
|
||||
let genesis_entries = &alice.create_entries();
|
||||
let entry_height = genesis_entries.len() as u64;
|
||||
let server = Fullnode::new_with_bank(
|
||||
leader_keypair,
|
||||
bank,
|
||||
0,
|
||||
&[],
|
||||
entry_height,
|
||||
&genesis_entries,
|
||||
leader,
|
||||
None,
|
||||
&ledger_path,
|
||||
|
|
|
@ -30,6 +30,7 @@ use banking_stage::{BankingStage, Config};
|
|||
use cluster_info::ClusterInfo;
|
||||
use entry::Entry;
|
||||
use fetch_stage::FetchStage;
|
||||
use hash::Hash;
|
||||
use leader_scheduler::LeaderScheduler;
|
||||
use service::Service;
|
||||
use signature::Keypair;
|
||||
|
@ -54,6 +55,7 @@ pub struct Tpu {
|
|||
}
|
||||
|
||||
impl Tpu {
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
|
||||
pub fn new(
|
||||
keypair: Arc<Keypair>,
|
||||
bank: &Arc<Bank>,
|
||||
|
@ -63,6 +65,7 @@ impl Tpu {
|
|||
ledger_path: &str,
|
||||
sigverify_disabled: bool,
|
||||
entry_height: u64,
|
||||
last_entry_id: &Hash,
|
||||
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
|
||||
) -> (Self, Receiver<Vec<Entry>>, Arc<AtomicBool>) {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
|
@ -73,7 +76,7 @@ impl Tpu {
|
|||
SigVerifyStage::new(packet_receiver, sigverify_disabled);
|
||||
|
||||
let (banking_stage, entry_receiver) =
|
||||
BankingStage::new(&bank, verified_receiver, tick_duration);
|
||||
BankingStage::new(&bank, verified_receiver, tick_duration, last_entry_id);
|
||||
|
||||
let (write_stage, entry_forwarder) = WriteStage::new(
|
||||
keypair,
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
use bank::Bank;
|
||||
use blob_fetch_stage::BlobFetchStage;
|
||||
use cluster_info::ClusterInfo;
|
||||
use hash::Hash;
|
||||
use leader_scheduler::LeaderScheduler;
|
||||
use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
|
||||
use retransmit_stage::RetransmitStage;
|
||||
|
@ -52,7 +53,7 @@ use window::SharedWindow;
|
|||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub enum TvuReturnType {
|
||||
LeaderRotation(u64),
|
||||
LeaderRotation(u64, Hash),
|
||||
}
|
||||
|
||||
pub struct Tvu {
|
||||
|
@ -148,9 +149,9 @@ impl Service for Tvu {
|
|||
self.retransmit_stage.join()?;
|
||||
self.fetch_stage.join()?;
|
||||
match self.replicate_stage.join()? {
|
||||
Some(ReplicateStageReturnType::LeaderRotation(entry_height)) => {
|
||||
Ok(Some(TvuReturnType::LeaderRotation(entry_height)))
|
||||
}
|
||||
Some(ReplicateStageReturnType::LeaderRotation(entry_height, last_entry_id)) => Ok(
|
||||
Some(TvuReturnType::LeaderRotation(entry_height, last_entry_id)),
|
||||
),
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1017,11 +1017,13 @@ mod tests {
|
|||
|
||||
let rpc_port = 11111; // Needs to be distinct known number to not conflict with other tests
|
||||
|
||||
let genesis_entries = &alice.create_entries();
|
||||
let entry_height = genesis_entries.len() as u64;
|
||||
let server = Fullnode::new_with_bank(
|
||||
leader_keypair,
|
||||
bank,
|
||||
0,
|
||||
&[],
|
||||
entry_height,
|
||||
&genesis_entries,
|
||||
leader,
|
||||
None,
|
||||
&ledger_path,
|
||||
|
|
|
@ -11,7 +11,7 @@ use solana::entry::Entry;
|
|||
use solana::fullnode::{Fullnode, FullnodeReturnType};
|
||||
use solana::hash::Hash;
|
||||
use solana::leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
|
||||
use solana::ledger::{read_ledger, LedgerWriter};
|
||||
use solana::ledger::{create_sample_ledger, read_ledger, LedgerWriter};
|
||||
use solana::logger;
|
||||
use solana::mint::Mint;
|
||||
use solana::ncp::Ncp;
|
||||
|
@ -787,7 +787,6 @@ fn test_multi_node_dynamic_network() {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_leader_to_validator_transition() {
|
||||
logger::setup();
|
||||
let leader_rotation_interval = 20;
|
||||
|
@ -805,7 +804,7 @@ fn test_leader_to_validator_transition() {
|
|||
// Initialize the leader ledger. Make a mint and a genesis entry
|
||||
// in the leader ledger
|
||||
let (mint, leader_ledger_path, genesis_entries) =
|
||||
genesis("test_leader_to_validator_transition", 10_000);
|
||||
create_sample_ledger("test_leader_to_validator_transition", 10_000);
|
||||
|
||||
let last_id = genesis_entries
|
||||
.last()
|
||||
|
@ -815,7 +814,8 @@ fn test_leader_to_validator_transition() {
|
|||
// Write the bootstrap entries to the ledger that will cause leader rotation
|
||||
// after the bootstrap height
|
||||
let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
|
||||
let bootstrap_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id);
|
||||
let bootstrap_entries =
|
||||
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id);
|
||||
let bootstrap_entries_len = bootstrap_entries.len();
|
||||
ledger_writer.write_entries(bootstrap_entries).unwrap();
|
||||
let ledger_initial_len = (genesis_entries.len() + bootstrap_entries_len) as u64;
|
||||
|
@ -927,7 +927,6 @@ fn test_leader_to_validator_transition() {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_leader_validator_basic() {
|
||||
logger::setup();
|
||||
let leader_rotation_interval = 10;
|
||||
|
@ -946,7 +945,7 @@ fn test_leader_validator_basic() {
|
|||
|
||||
// Make a common mint and a genesis entry for both leader + validator ledgers
|
||||
let (mint, leader_ledger_path, genesis_entries) =
|
||||
genesis("test_leader_validator_basic", 10_000);
|
||||
create_sample_ledger("test_leader_to_validator_transition", 10_000);
|
||||
|
||||
let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, "test_leader_validator_basic");
|
||||
|
||||
|
@ -954,7 +953,6 @@ fn test_leader_validator_basic() {
|
|||
.last()
|
||||
.expect("expected at least one genesis entry")
|
||||
.id;
|
||||
let genesis_height = genesis_entries.len();
|
||||
|
||||
// Initialize both leader + validator ledger
|
||||
let mut ledger_paths = Vec::new();
|
||||
|
@ -964,7 +962,9 @@ fn test_leader_validator_basic() {
|
|||
// Write the bootstrap entries to the ledger that will cause leader rotation
|
||||
// after the bootstrap height
|
||||
let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
|
||||
let bootstrap_entries = make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id);
|
||||
let bootstrap_entries =
|
||||
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id);
|
||||
let ledger_initial_len = (genesis_entries.len() + bootstrap_entries.len()) as u64;
|
||||
ledger_writer.write_entries(bootstrap_entries).unwrap();
|
||||
|
||||
// Create the leader scheduler config
|
||||
|
@ -1004,12 +1004,20 @@ fn test_leader_validator_basic() {
|
|||
|
||||
// Send transactions to the leader
|
||||
let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1);
|
||||
let total_transactions_to_send = bootstrap_height + extra_transactions;
|
||||
|
||||
// Push "extra_transactions" past leader_rotation_interval entry height,
|
||||
// make sure the validator stops.
|
||||
for _ in genesis_height as u64..total_transactions_to_send {
|
||||
send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None);
|
||||
for i in ledger_initial_len..(bootstrap_height + extra_transactions) {
|
||||
let expected_balance = std::cmp::min(
|
||||
bootstrap_height - ledger_initial_len,
|
||||
i - ledger_initial_len + 1,
|
||||
);
|
||||
let result = send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None);
|
||||
// If the transaction wasn't reflected in the node, then we assume
|
||||
// the node has transitioned already
|
||||
if result != Some(expected_balance as i64) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for validator to shut down tvu and restart tpu
|
||||
|
@ -1092,7 +1100,7 @@ fn test_dropped_handoff_recovery() {
|
|||
|
||||
// Make a common mint and a genesis entry for both leader + validator's ledgers
|
||||
let (mint, bootstrap_leader_ledger_path, genesis_entries) =
|
||||
genesis("test_dropped_handoff_recovery", 10_000);
|
||||
create_sample_ledger("test_dropped_handoff_recovery", 10_000);
|
||||
|
||||
let last_id = genesis_entries
|
||||
.last()
|
||||
|
@ -1110,7 +1118,8 @@ fn test_dropped_handoff_recovery() {
|
|||
|
||||
// Make the entries to give the next_leader validator some stake so that he will be in
|
||||
// leader election active set
|
||||
let first_entries = make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id);
|
||||
let first_entries =
|
||||
make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id, &last_id);
|
||||
let first_entries_len = first_entries.len();
|
||||
|
||||
// Write the entries
|
||||
|
@ -1212,8 +1221,8 @@ fn test_dropped_handoff_recovery() {
|
|||
}
|
||||
|
||||
#[test]
|
||||
//TODO: Ignore for now due to bug exposed by the test "test_dropped_handoff_recovery"
|
||||
#[ignore]
|
||||
//TODO: Ignore for now due to bug exposed by the test "test_dropped_handoff_recovery"
|
||||
fn test_full_leader_validator_network() {
|
||||
logger::setup();
|
||||
// The number of validators
|
||||
|
@ -1236,9 +1245,14 @@ fn test_full_leader_validator_network() {
|
|||
|
||||
// Make a common mint and a genesis entry for both leader + validator's ledgers
|
||||
let (mint, bootstrap_leader_ledger_path, genesis_entries) =
|
||||
genesis("test_full_leader_validator_network", 10_000);
|
||||
create_sample_ledger("test_full_leader_validator_network", 10_000);
|
||||
|
||||
let mut last_id = genesis_entries
|
||||
let last_tick_id = genesis_entries
|
||||
.last()
|
||||
.expect("expected at least one genesis entry")
|
||||
.id;
|
||||
|
||||
let mut last_entry_id = genesis_entries
|
||||
.last()
|
||||
.expect("expected at least one genesis entry")
|
||||
.id;
|
||||
|
@ -1254,11 +1268,12 @@ fn test_full_leader_validator_network() {
|
|||
for node_keypair in node_keypairs.iter() {
|
||||
// Make entries to give the validator some stake so that he will be in
|
||||
// leader election active set
|
||||
let bootstrap_entries = make_active_set_entries(node_keypair, &mint.keypair(), &last_id);
|
||||
let bootstrap_entries =
|
||||
make_active_set_entries(node_keypair, &mint.keypair(), &last_entry_id, &last_tick_id);
|
||||
|
||||
// Write the entries
|
||||
let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap();
|
||||
last_id = bootstrap_entries
|
||||
last_entry_id = bootstrap_entries
|
||||
.last()
|
||||
.expect("expected at least one genesis entry")
|
||||
.id;
|
||||
|
|
Loading…
Reference in New Issue