Switch leader scheduler to use PoH ticks instead of Entry height (#1519)

* Add PoH height to process_ledger()

* Moved broadcast_stage leader scheduling logic to use PoH height instead of entry_height

* Moved LeaderScheduler logic to PoH in ReplicateStage

* Fix leader scheduling tests to use PoH height instead of entry height

* Change is_leader detection in repair() to use PoH instead of entry height

* Add tests to LeaderScheduler for new functionality

* Fix Entry::new and genesis block PoH counts

* Moved LeaderScheduler to PoH ticks

* Cleanup to resolve PR comments
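
The bullets above all follow from one idea: leader slots are now measured in PoH ticks (empty entries) rather than in entry counts. A minimal, hypothetical sketch of that keying, using the Entry::is_tick and LeaderScheduler::get_scheduled_leader APIs touched by this change (the surrounding function is illustrative only):

// Illustrative only: count the ticks seen so far and ask the scheduler who
// leads at that tick height, instead of keying on the number of entries.
fn leader_at(entries: &[Entry], start_tick_height: u64, scheduler: &LeaderScheduler) -> Option<Pubkey> {
    let tick_height = start_tick_height
        + entries.iter().filter(|e| e.is_tick()).count() as u64;
    scheduler.get_scheduled_leader(tick_height)
}
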
carllin 2018-10-18 22:57:48 -07:00 committed by GitHub
parent 0339642e77
commit 0bd1412562
28 changed files with 1063 additions and 874 deletions


@ -103,6 +103,8 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
verified_receiver,
Default::default(),
&mint.last_id(),
0,
None,
);
let mut id = mint.last_id();
@ -202,6 +204,8 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
verified_receiver,
Default::default(),
&mint.last_id(),
0,
None,
);
let mut id = mint.last_id();


@ -871,40 +871,50 @@ impl Bank {
results
}
pub fn process_entry_votes(
bank: &Bank,
pub fn process_entry(
&self,
entry: &Entry,
entry_height: u64,
tick_height: &mut u64,
leader_scheduler: &mut LeaderScheduler,
) -> Result<()> {
if !entry.is_tick() {
for result in self.process_transactions(&entry.transactions) {
result?;
}
} else {
*tick_height += 1;
self.register_entry_id(&entry.id);
}
self.process_entry_votes(entry, *tick_height, leader_scheduler);
Ok(())
}
fn process_entry_votes(
&self,
entry: &Entry,
tick_height: u64,
leader_scheduler: &mut LeaderScheduler,
) {
for tx in &entry.transactions {
if tx.vote().is_some() {
// Update the active set in the leader scheduler
leader_scheduler.push_vote(*tx.from(), entry_height);
leader_scheduler.push_vote(*tx.from(), tick_height);
}
}
leader_scheduler.update_height(entry_height, bank);
}
pub fn process_entry(&self, entry: &Entry) -> Result<()> {
if !entry.transactions.is_empty() {
for result in self.process_transactions(&entry.transactions) {
result?;
}
} else {
self.register_entry_id(&entry.id);
}
Ok(())
leader_scheduler.update_height(tick_height, self);
}
/// Process an ordered list of entries, populating a circular buffer "tail"
/// as we go.
fn process_entries_tail(
&self,
entries: &[Entry],
tail: &mut Vec<Entry>,
tail_idx: &mut usize,
tick_height: &mut u64,
leader_scheduler: &mut LeaderScheduler,
) -> Result<u64> {
let mut entry_count = 0;
@ -917,7 +927,12 @@ impl Bank {
*tail_idx = (*tail_idx + 1) % WINDOW_SIZE as usize;
entry_count += 1;
self.process_entry(entry)?;
// TODO: We prepare for implementing voting contract by making the associated
// process_entries functions aware of the vote-tracking structure inside
// the leader scheduler. Next we will extract the vote tracking structure
// out of the leader scheduler, and into the bank, and remove the leader
// scheduler from these banking functions.
self.process_entry(entry, tick_height, leader_scheduler)?;
}
Ok(entry_count)
@ -958,7 +973,7 @@ impl Bank {
// accumulator for entries that can be processed in parallel
let mut mt_group = vec![];
for entry in entries {
if entry.transactions.is_empty() {
if entry.is_tick() {
// if it's a tick, execute the group and register the tick
self.par_execute_entries(&mt_group)?;
self.register_entry_id(&entry.id);
@ -992,37 +1007,37 @@ impl Bank {
tail: &mut Vec<Entry>,
tail_idx: &mut usize,
leader_scheduler: &mut LeaderScheduler,
) -> Result<u64>
) -> Result<(u64, u64)>
where
I: IntoIterator<Item = Entry>,
{
// Ledger verification needs to be parallelized, but we can't pull the whole
// thing into memory. We therefore chunk it.
let mut entry_count = *tail_idx as u64;
let mut entry_height = *tail_idx as u64;
let mut tick_height = 0;
for entry in &tail[0..*tail_idx] {
tick_height += entry.is_tick() as u64
}
let mut id = start_hash;
for block in &entries.into_iter().chunks(VERIFY_BLOCK_SIZE) {
let block: Vec<_> = block.collect();
if !block.verify(&id) {
warn!("Ledger proof of history failed at entry: {}", entry_count);
warn!("Ledger proof of history failed at entry: {}", entry_height);
return Err(BankError::LedgerVerificationFailed);
}
id = block.last().unwrap().id;
let tail_count = self.process_entries_tail(&block, tail, tail_idx)?;
let entry_count = self.process_entries_tail(
&block,
tail,
tail_idx,
&mut tick_height,
leader_scheduler,
)?;
if !leader_scheduler.use_only_bootstrap_leader {
for (i, entry) in block.iter().enumerate() {
Self::process_entry_votes(
self,
&entry,
entry_count + i as u64 + 1,
leader_scheduler,
);
}
}
entry_count += tail_count;
entry_height += entry_count;
}
Ok(entry_count)
Ok((tick_height, entry_height))
}
/// Process a full ledger.
@ -1030,7 +1045,7 @@ impl Bank {
&self,
entries: I,
leader_scheduler: &mut LeaderScheduler,
) -> Result<(u64, Vec<Entry>)>
) -> Result<(u64, u64, Vec<Entry>)>
where
I: IntoIterator<Item = Entry>,
{
@ -1072,7 +1087,7 @@ impl Bank {
tail.push(entry0);
tail.push(entry1);
let mut tail_idx = 2;
let entry_count = self.process_blocks(
let (tick_height, entry_height) = self.process_blocks(
entry1_id,
entries,
&mut tail,
@ -1085,7 +1100,7 @@ impl Bank {
tail.rotate_left(tail_idx)
}
Ok((entry_count, tail))
Ok((tick_height, entry_height, tail))
}
/// Create, sign, and process a Transaction from `keypair` to `to` of
@ -1618,7 +1633,7 @@ mod tests {
let mut last_id = mint.last_id();
let mut hash = mint.last_id();
let mut entries: Vec<Entry> = vec![];
let mut num_hashes = 0;
let num_hashes = 1;
for k in keypairs {
let txs = vec![Transaction::system_new(
&mint.keypair(),
@ -1629,7 +1644,8 @@ mod tests {
let mut e = ledger::next_entries(&hash, 0, txs);
entries.append(&mut e);
hash = entries.last().unwrap().id;
let tick = Entry::new_mut(&mut hash, &mut num_hashes, vec![]);
let tick = Entry::new(&hash, num_hashes, vec![]);
hash = tick.id;
last_id = hash;
entries.push(tick);
}
@ -1645,14 +1661,16 @@ mod tests {
let mut entries = Vec::with_capacity(length);
let mut hash = mint.last_id();
let mut last_id = mint.last_id();
let mut num_hashes = 0;
let num_hashes = 1;
for i in 0..length {
let keypair = Keypair::new();
let tx = Transaction::system_new(&mint.keypair(), keypair.pubkey(), 1, last_id);
let entry = Entry::new_mut(&mut hash, &mut num_hashes, vec![tx]);
let entry = Entry::new(&hash, num_hashes, vec![tx]);
hash = entry.id;
entries.push(entry);
if (i + 1) % ticks == 0 {
let tick = Entry::new_mut(&mut hash, &mut num_hashes, vec![]);
let tick = Entry::new(&hash, num_hashes, vec![]);
hash = tick.id;
last_id = hash;
entries.push(tick);
}
@ -1681,11 +1699,12 @@ mod tests {
let (ledger, pubkey) = create_sample_ledger(1);
let (ledger, dup) = ledger.tee();
let bank = Bank::default();
let (ledger_height, tail) = bank
let (tick_height, ledger_height, tail) = bank
.process_ledger(ledger, &mut LeaderScheduler::default())
.unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, 4);
assert_eq!(tick_height, 2);
assert_eq!(tail.len(), 4);
assert_eq!(tail, dup.collect_vec());
let last_entry = &tail[tail.len() - 1];
@ -1708,11 +1727,12 @@ mod tests {
for entry_count in window_size - 3..window_size + 2 {
let (ledger, pubkey) = create_sample_ledger(entry_count);
let bank = Bank::default();
let (ledger_height, tail) = bank
let (tick_height, ledger_height, tail) = bank
.process_ledger(ledger, &mut LeaderScheduler::default())
.unwrap();
assert_eq!(bank.get_balance(&pubkey), 1);
assert_eq!(ledger_height, entry_count as u64 + 3);
assert_eq!(tick_height, 2);
assert!(tail.len() <= window_size);
let last_entry = &tail[tail.len() - 1];
assert_eq!(bank.last_id(), last_entry.id);
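
As a usage note for the reworked Bank::process_entry above, a minimal sketch (not part of the diff) of a caller threading the running tick height and the leader scheduler through each entry, mirroring how ledger-tool and ReplicateStage use it; `entries` and `bank` are assumed to exist:

// Hypothetical caller: apply entries while tracking the PoH tick height.
let mut tick_height = 0;
let mut leader_scheduler = LeaderScheduler::default();
for entry in &entries {
    bank.process_entry(entry, &mut tick_height, &mut leader_scheduler)?;
}
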


@ -9,7 +9,7 @@ use entry::Entry;
use hash::Hash;
use log::Level;
use packet::Packets;
use poh_recorder::PohRecorder;
use poh_recorder::{PohRecorder, PohRecorderError};
use rayon::prelude::*;
use result::{Error, Result};
use service::Service;
@ -25,13 +25,20 @@ use std::time::Instant;
use timing;
use transaction::Transaction;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BankingStageReturnType {
LeaderRotation,
ChannelDisconnected,
}
// number of threads is 1 until mt bank is ready
pub const NUM_THREADS: usize = 10;
/// Stores the stage's thread handle and output receiver.
pub struct BankingStage {
/// Handle to the stage's thread.
thread_hdls: Vec<JoinHandle<()>>,
bank_thread_hdls: Vec<JoinHandle<Option<BankingStageReturnType>>>,
tick_producer: JoinHandle<Option<BankingStageReturnType>>,
}
pub enum Config {
@ -55,10 +62,18 @@ impl BankingStage {
verified_receiver: Receiver<VerifiedPackets>,
config: Config,
last_entry_id: &Hash,
tick_height: u64,
max_tick_height: Option<u64>,
) -> (Self, Receiver<Vec<Entry>>) {
let (entry_sender, entry_receiver) = channel();
let shared_verified_receiver = Arc::new(Mutex::new(verified_receiver));
let poh = PohRecorder::new(bank.clone(), entry_sender, *last_entry_id);
let poh = PohRecorder::new(
bank.clone(),
entry_sender,
*last_entry_id,
tick_height,
max_tick_height,
);
let tick_poh = poh.clone();
// Tick producer is a headless producer, so when it exits it should notify the banking stage.
// Since channels are not used to talk between these threads, an AtomicBool is used as a
@ -71,21 +86,25 @@ impl BankingStage {
let tick_producer = Builder::new()
.name("solana-banking-stage-tick_producer".to_string())
.spawn(move || {
if let Err(e) = Self::tick_producer(&tick_poh, &config, &poh_exit) {
match e {
Error::SendError => (),
_ => error!(
let mut tick_poh_ = tick_poh;
let return_value = match Self::tick_producer(&mut tick_poh_, &config, &poh_exit) {
Err(Error::SendError) => Some(BankingStageReturnType::ChannelDisconnected),
Err(e) => {
error!(
"solana-banking-stage-tick_producer unexpected error {:?}",
e
),
);
None
}
}
Ok(x) => x,
};
debug!("tick producer exiting");
poh_exit.store(true, Ordering::Relaxed);
return_value
}).unwrap();
// Many banks that process transactions in parallel.
let mut thread_hdls: Vec<JoinHandle<()>> = (0..NUM_THREADS)
let bank_thread_hdls: Vec<JoinHandle<Option<BankingStageReturnType>>> = (0..NUM_THREADS)
.map(|_| {
let thread_bank = bank.clone();
let thread_verified_receiver = shared_verified_receiver.clone();
@ -94,7 +113,7 @@ impl BankingStage {
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
loop {
let return_result = loop {
if let Err(e) = Self::process_packets(
&thread_bank,
&thread_verified_receiver,
@ -104,23 +123,37 @@ impl BankingStage {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
break
break Some(BankingStageReturnType::ChannelDisconnected);
}
Error::RecvError(_) => {
break Some(BankingStageReturnType::ChannelDisconnected);
}
Error::SendError => {
break Some(BankingStageReturnType::ChannelDisconnected);
}
Error::PohRecorderError(PohRecorderError::MaxHeightReached) => {
break Some(BankingStageReturnType::LeaderRotation);
}
Error::RecvError(_) => break,
Error::SendError => break,
_ => error!("solana-banking-stage-tx {:?}", e),
}
}
if thread_banking_exit.load(Ordering::Relaxed) {
debug!("tick service exited");
break;
break None;
}
}
};
thread_banking_exit.store(true, Ordering::Relaxed);
return_result
}).unwrap()
}).collect();
thread_hdls.push(tick_producer);
(BankingStage { thread_hdls }, entry_receiver)
(
BankingStage {
bank_thread_hdls,
tick_producer,
},
entry_receiver,
)
}
/// Convert the transactions from a blob of binary data to a vector of transactions and
@ -135,22 +168,43 @@ impl BankingStage {
}).collect()
}
fn tick_producer(poh: &PohRecorder, config: &Config, poh_exit: &AtomicBool) -> Result<()> {
fn tick_producer(
poh: &mut PohRecorder,
config: &Config,
poh_exit: &AtomicBool,
) -> Result<Option<BankingStageReturnType>> {
loop {
match *config {
Config::Tick(num) => {
for _ in 0..num {
poh.hash();
match poh.hash() {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached)) => {
return Ok(Some(BankingStageReturnType::LeaderRotation));
}
Err(e) => {
return Err(e);
}
_ => (),
}
}
}
Config::Sleep(duration) => {
sleep(duration);
}
}
poh.tick()?;
match poh.tick() {
Ok(height) if Some(height) == poh.max_tick_height => {
// CASE 1: We were successful in recording the last tick, so exit
return Ok(Some(BankingStageReturnType::LeaderRotation));
}
Ok(_) => (),
Err(e) => {
return Err(e);
}
};
if poh_exit.load(Ordering::Relaxed) {
debug!("tick service exited");
return Ok(());
return Ok(None);
}
}
}
@ -242,13 +296,24 @@ impl BankingStage {
}
impl Service for BankingStage {
type JoinReturnType = ();
type JoinReturnType = Option<BankingStageReturnType>;
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
fn join(self) -> thread::Result<Option<BankingStageReturnType>> {
let mut return_value = None;
for bank_thread_hdl in self.bank_thread_hdls {
let thread_return_value = bank_thread_hdl.join()?;
if thread_return_value.is_some() {
return_value = thread_return_value;
}
}
Ok(())
let tick_return_value = self.tick_producer.join()?;
if tick_return_value.is_some() {
return_value = tick_return_value;
}
Ok(return_value)
}
}
@ -256,6 +321,7 @@ impl Service for BankingStage {
mod tests {
use super::*;
use bank::Bank;
use banking_stage::BankingStageReturnType;
use ledger::Block;
use mint::Mint;
use packet::to_packets;
@ -273,9 +339,14 @@ mod tests {
verified_receiver,
Default::default(),
&bank.last_id(),
0,
None,
);
drop(verified_sender);
assert_eq!(banking_stage.join().unwrap(), ());
assert_eq!(
banking_stage.join().unwrap(),
Some(BankingStageReturnType::ChannelDisconnected)
);
}
#[test]
@ -287,9 +358,14 @@ mod tests {
verified_receiver,
Default::default(),
&bank.last_id(),
0,
None,
);
drop(entry_receiver);
assert_eq!(banking_stage.join().unwrap(), ());
assert_eq!(
banking_stage.join().unwrap(),
Some(BankingStageReturnType::ChannelDisconnected)
);
}
#[test]
@ -302,6 +378,8 @@ mod tests {
verified_receiver,
Config::Sleep(Duration::from_millis(1)),
&bank.last_id(),
0,
None,
);
sleep(Duration::from_millis(500));
drop(verified_sender);
@ -310,7 +388,10 @@ mod tests {
assert!(entries.len() != 0);
assert!(entries.verify(&start_hash));
assert_eq!(entries[entries.len() - 1].id, bank.last_id());
assert_eq!(banking_stage.join().unwrap(), ());
assert_eq!(
banking_stage.join().unwrap(),
Some(BankingStageReturnType::ChannelDisconnected)
);
}
#[test]
@ -324,6 +405,8 @@ mod tests {
verified_receiver,
Default::default(),
&bank.last_id(),
0,
None,
);
// good tx
@ -359,7 +442,10 @@ mod tests {
last_id = entries.last().unwrap().id;
});
drop(entry_receiver);
assert_eq!(banking_stage.join().unwrap(), ());
assert_eq!(
banking_stage.join().unwrap(),
Some(BankingStageReturnType::ChannelDisconnected)
);
}
#[test]
fn test_banking_stage_entryfication() {
@ -374,6 +460,8 @@ mod tests {
verified_receiver,
Default::default(),
&bank.last_id(),
0,
None,
);
// Process a batch that includes a transaction that receives two tokens.
@ -392,7 +480,10 @@ mod tests {
.send(vec![(packets[0].clone(), vec![1u8])])
.unwrap();
drop(verified_sender);
assert_eq!(banking_stage.join().unwrap(), ());
assert_eq!(
banking_stage.join().unwrap(),
Some(BankingStageReturnType::ChannelDisconnected)
);
// Collect the ledger and feed it to a new bank.
let entries: Vec<_> = entry_receiver.iter().flat_map(|x| x).collect();
@ -410,4 +501,25 @@ mod tests {
}
assert_eq!(bank.get_balance(&alice.pubkey()), 1);
}
// Test that when the max_tick_height is reached, the banking stage exits
// with reason BankingStageReturnType::LeaderRotation
#[test]
fn test_max_tick_height_shutdown() {
let bank = Arc::new(Bank::new(&Mint::new(2)));
let (_verified_sender_, verified_receiver) = channel();
let max_tick_height = 10;
let (banking_stage, _entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
Default::default(),
&bank.last_id(),
0,
Some(max_tick_height),
);
assert_eq!(
banking_stage.join().unwrap(),
Some(BankingStageReturnType::LeaderRotation)
);
}
}
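
For context, a rough sketch (not part of the diff) of how a caller might act on the new join value; the surrounding plumbing is assumed:

// Hypothetical caller of BankingStage::join().
match banking_stage.join().unwrap() {
    Some(BankingStageReturnType::LeaderRotation) => {
        // max_tick_height was reached: stop producing and hand off the leader role.
    }
    Some(BankingStageReturnType::ChannelDisconnected) | None => {
        // Normal shutdown: an upstream or downstream channel was dropped.
    }
}
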


@ -142,7 +142,10 @@ fn main() {
}
last_id = entry.id;
if let Err(e) = bank.process_entry(&entry) {
let mut tick_height = 0;
let mut leader_scheduler = LeaderScheduler::default();
if let Err(e) = bank.process_entry(&entry, &mut tick_height, &mut leader_scheduler)
{
eprintln!("verify failed at entry[{}], err: {:?}", i + 2, e);
if !matches.is_present("continue") {
exit(1);


@ -32,6 +32,7 @@ pub enum BroadcastStageReturnType {
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
fn broadcast(
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
mut tick_height: u64,
node_info: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
@ -49,6 +50,9 @@ fn broadcast(
ventries.push(entries);
while let Ok(entries) = receiver.try_recv() {
num_entries += entries.len();
tick_height += entries
.iter()
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
ventries.push(entries);
}
inc_new_counter_info!("broadcast_stage-entries_received", num_entries);
@ -134,6 +138,7 @@ fn broadcast(
// Send blobs out from the window
ClusterInfo::broadcast(
&leader_scheduler,
tick_height,
&node_info,
&broadcast_table,
&window,
@ -194,6 +199,7 @@ impl BroadcastStage {
entry_height: u64,
receiver: &Receiver<Vec<Entry>>,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
tick_height: u64,
) -> BroadcastStageReturnType {
let mut transmit_index = WindowIndex {
data: entry_height,
@ -205,6 +211,7 @@ impl BroadcastStage {
let broadcast_table = cluster_info.read().unwrap().compute_broadcast_table();
if let Err(e) = broadcast(
leader_scheduler,
tick_height,
&me,
&broadcast_table,
&window,
@ -250,6 +257,7 @@ impl BroadcastStage {
entry_height: u64,
receiver: Receiver<Vec<Entry>>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
tick_height: u64,
exit_sender: Arc<AtomicBool>,
) -> Self {
let thread_hdl = Builder::new()
@ -263,6 +271,7 @@ impl BroadcastStage {
entry_height,
&receiver,
&leader_scheduler,
tick_height,
)
}).unwrap();


@ -496,6 +496,7 @@ impl ClusterInfo {
#[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))]
pub fn broadcast(
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
tick_height: u64,
me: &NodeInfo,
broadcast_table: &[NodeInfo],
window: &SharedWindow,
@ -537,13 +538,13 @@ impl ClusterInfo {
br_idx
);
// Make sure the next leader in line knows about the last entry before rotation
// so he can initiate repairs if necessary
let entry_height = idx + 1;
// Make sure the next leader in line knows about the entries before his slot in the leader
// rotation so he can initiate repairs if necessary
{
let ls_lock = leader_scheduler.read().unwrap();
let next_leader_id = ls_lock.get_scheduled_leader(entry_height);
let next_leader_height = ls_lock.max_height_for_leader(tick_height);
let next_leader_id =
next_leader_height.map(|nlh| ls_lock.get_scheduled_leader(nlh));
// In the case the next scheduled leader is None, then the write_stage moved
// the schedule too far ahead and we no longer are in the known window
// (will happen during calculation of the next set of slots every epoch or
@ -555,10 +556,11 @@ impl ClusterInfo {
// scheduled leader, so the next leader will have to rely on avalanche/repairs
// to get this last blob, which could cause slowdowns during leader handoffs.
// See corresponding issue for repairs in repair() function in window.rs.
if next_leader_id.is_some() && next_leader_id != Some(me.id) {
let info_result = broadcast_table
.iter()
.position(|n| n.id == next_leader_id.unwrap());
if let Some(Some(next_leader_id)) = next_leader_id {
if next_leader_id == me.id {
break;
}
let info_result = broadcast_table.iter().position(|n| n.id == next_leader_id);
if let Some(index) = info_result {
orders.push((window_l[w_idx].data.clone(), &broadcast_table[index]));
}


@ -327,6 +327,7 @@ mod tests {
leader_keypair,
bank,
0,
0,
&[],
leader,
None,


@ -48,12 +48,33 @@ pub struct Entry {
impl Entry {
/// Creates the next Entry `num_hashes` after `start_hash`.
pub fn new(start_hash: &Hash, num_hashes: u64, transactions: Vec<Transaction>) -> Self {
let num_hashes = num_hashes + if transactions.is_empty() { 0 } else { 1 };
let id = next_hash(start_hash, 0, &transactions);
let entry = Entry {
num_hashes,
id,
transactions,
let entry = {
if num_hashes == 0 && transactions.is_empty() {
Entry {
num_hashes: 0,
id: *start_hash,
transactions,
}
} else if num_hashes == 0 {
// If you passed in transactions, but passed in num_hashes == 0, then
// next_hash will generate the next hash and set num_hashes == 1
let id = next_hash(start_hash, 1, &transactions);
Entry {
num_hashes: 1,
id,
transactions,
}
} else {
// Otherwise, the next Entry `num_hashes` after `start_hash`.
// If you wanted a tick for instance, then pass in num_hashes = 1
// and transactions = empty
let id = next_hash(start_hash, num_hashes, &transactions);
Entry {
num_hashes,
id,
transactions,
}
}
};
let size = serialized_size(&entry).unwrap();
@ -175,6 +196,10 @@ impl Entry {
}
true
}
pub fn is_tick(&self) -> bool {
self.transactions.is_empty()
}
}
/// Creates the hash `num_hashes` after `start_hash`. If the transaction contains
@ -186,7 +211,7 @@ fn next_hash(start_hash: &Hash, num_hashes: u64, transactions: &[Transaction]) -
return *start_hash;
}
let mut poh = Poh::new(*start_hash);
let mut poh = Poh::new(*start_hash, 0);
for _ in 1..num_hashes {
poh.hash();
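
The new num_hashes conventions in Entry::new above can be summarized with a short, illustrative sketch (last_hash and some_tx are placeholders, not from the diff):

let tick = Entry::new(&last_hash, 1, vec![]);     // a tick: one hash, no transactions
assert!(tick.is_tick());
let e = Entry::new(&last_hash, 0, vec![some_tx]); // num_hashes == 0 with transactions
assert_eq!(e.num_hashes, 1);                      // bumped to 1 by next_hash
let noop = Entry::new(&last_hash, 0, vec![]);     // num_hashes == 0 and no transactions
assert_eq!(noop.id, last_hash);                   // id is just the start hash
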


@ -77,6 +77,7 @@ impl ValidatorServices {
}
}
#[derive(Debug)]
pub enum FullnodeReturnType {
LeaderToValidatorRotation,
ValidatorToLeaderRotation,
@ -137,7 +138,7 @@ impl Fullnode {
mut leader_scheduler: LeaderScheduler,
) -> Self {
info!("creating bank...");
let (bank, entry_height, ledger_tail) =
let (bank, tick_height, entry_height, ledger_tail) =
Self::new_bank_from_ledger(ledger_path, &mut leader_scheduler);
info!("creating networking stack...");
@ -154,6 +155,7 @@ impl Fullnode {
let server = Self::new_with_bank(
keypair,
bank,
tick_height,
entry_height,
&ledger_tail,
node,
@ -236,6 +238,7 @@ impl Fullnode {
pub fn new_with_bank(
keypair: Keypair,
bank: Bank,
tick_height: u64,
entry_height: u64,
ledger_tail: &[Entry],
node: Node,
@ -308,7 +311,7 @@ impl Fullnode {
let scheduled_leader = leader_scheduler
.read()
.unwrap()
.get_scheduled_leader(entry_height)
.get_scheduled_leader(tick_height)
.expect("Leader not known after processing bank");
cluster_info.write().unwrap().set_leader(scheduled_leader);
@ -317,6 +320,7 @@ impl Fullnode {
let tvu = Tvu::new(
keypair.clone(),
&bank,
tick_height,
entry_height,
cluster_info.clone(),
shared_window.clone(),
@ -339,6 +343,10 @@ impl Fullnode {
let validator_state = ValidatorServices::new(tvu);
Some(NodeRole::Validator(validator_state))
} else {
let max_tick_height = {
let ls_lock = leader_scheduler.read().unwrap();
ls_lock.max_height_for_leader(tick_height)
};
// Start in leader mode.
let (tpu, entry_receiver, tpu_exit) = Tpu::new(
keypair.clone(),
@ -352,9 +360,9 @@ impl Fullnode {
.collect(),
ledger_path,
sigverify_disabled,
entry_height,
tick_height,
max_tick_height,
last_entry_id,
leader_scheduler.clone(),
);
let broadcast_stage = BroadcastStage::new(
@ -367,6 +375,7 @@ impl Fullnode {
entry_height,
entry_receiver,
leader_scheduler.clone(),
tick_height,
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
@ -398,14 +407,14 @@ impl Fullnode {
}
fn leader_to_validator(&mut self) -> Result<()> {
let (scheduled_leader, entry_height) = {
let (scheduled_leader, tick_height, entry_height, last_entry_id) = {
let mut ls_lock = self.leader_scheduler.write().unwrap();
// Clear the leader scheduler
ls_lock.reset();
// TODO: We can avoid building the bank again once RecordStage is
// integrated with BankingStage
let (bank, entry_height, _) =
let (bank, tick_height, entry_height, ledger_tail) =
Self::new_bank_from_ledger(&self.ledger_path, &mut *ls_lock);
self.bank = Arc::new(bank);
@ -414,7 +423,12 @@ impl Fullnode {
ls_lock
.get_scheduled_leader(entry_height)
.expect("Scheduled leader should exist after rebuilding bank"),
tick_height,
entry_height,
ledger_tail
.last()
.expect("Expected at least one entry in the ledger")
.id,
)
};
@ -439,9 +453,19 @@ impl Fullnode {
));
}
// In the rare case that the leader exited on a multiple of seed_rotation_interval
// when the new leader schedule was being generated, and there are no other validators
// in the active set, then the leader scheduler will pick the same leader again, so
// check for that
if scheduled_leader == self.keypair.pubkey() {
self.validator_to_leader(tick_height, entry_height, last_entry_id);
return Ok(());
}
let tvu = Tvu::new(
self.keypair.clone(),
&self.bank,
tick_height,
entry_height,
self.cluster_info.clone(),
self.shared_window.clone(),
@ -463,11 +487,17 @@ impl Fullnode {
Ok(())
}
fn validator_to_leader(&mut self, entry_height: u64, last_entry_id: Hash) {
fn validator_to_leader(&mut self, tick_height: u64, entry_height: u64, last_entry_id: Hash) {
self.cluster_info
.write()
.unwrap()
.set_leader(self.keypair.pubkey());
let max_tick_height = {
let ls_lock = self.leader_scheduler.read().unwrap();
ls_lock.max_height_for_leader(tick_height)
};
let (tpu, blob_receiver, tpu_exit) = Tpu::new(
self.keypair.clone(),
&self.bank,
@ -479,12 +509,12 @@ impl Fullnode {
.collect(),
&self.ledger_path,
self.sigverify_disabled,
entry_height,
tick_height,
max_tick_height,
// We pass the last_entry_id from the replicate stage because we can't trust that
// the window didn't overwrite the slot for the last entry that the replicate stage
// processed. We also want to avoid re-reading and processing the ledger for the last id.
&last_entry_id,
self.leader_scheduler.clone(),
);
let broadcast_stage = BroadcastStage::new(
@ -496,6 +526,7 @@ impl Fullnode {
entry_height,
blob_receiver,
self.leader_scheduler.clone(),
tick_height,
tpu_exit,
);
let leader_state = LeaderServices::new(tpu, broadcast_stage);
@ -521,8 +552,9 @@ impl Fullnode {
_ => Ok(None),
},
Some(NodeRole::Validator(validator_services)) => match validator_services.join()? {
Some(TvuReturnType::LeaderRotation(entry_height, last_entry_id)) => {
self.validator_to_leader(entry_height, last_entry_id);
Some(TvuReturnType::LeaderRotation(tick_height, entry_height, last_entry_id)) => {
// TODO: Fix this to return the actual PoH height.
self.validator_to_leader(tick_height, entry_height, last_entry_id);
Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation))
}
_ => Ok(None),
@ -552,19 +584,19 @@ impl Fullnode {
pub fn new_bank_from_ledger(
ledger_path: &str,
leader_scheduler: &mut LeaderScheduler,
) -> (Bank, u64, Vec<Entry>) {
) -> (Bank, u64, u64, Vec<Entry>) {
let bank = Bank::default();
let entries = read_ledger(ledger_path, true).expect("opening ledger");
let entries = entries
.map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err)));
info!("processing ledger...");
let (entry_height, ledger_tail) = bank
let (tick_height, entry_height, ledger_tail) = bank
.process_ledger(entries, leader_scheduler)
.expect("process_ledger");
// entry_height is the network-wide agreed height of the ledger.
// initialize it from the input ledger
info!("processed {} ledger...", entry_height);
(bank, entry_height, ledger_tail)
(bank, tick_height, entry_height, ledger_tail)
}
}
@ -581,7 +613,7 @@ impl Service for Fullnode {
match self.node_role {
Some(NodeRole::Validator(validator_service)) => {
if let Some(TvuReturnType::LeaderRotation(_, _)) = validator_service.join()? {
if let Some(TvuReturnType::LeaderRotation(_, _, _)) = validator_service.join()? {
return Ok(Some(FullnodeReturnType::ValidatorToLeaderRotation));
}
}
@ -601,7 +633,7 @@ impl Service for Fullnode {
mod tests {
use bank::Bank;
use cluster_info::Node;
use fullnode::{Fullnode, NodeRole, TvuReturnType};
use fullnode::{Fullnode, FullnodeReturnType, NodeRole, TvuReturnType};
use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use ledger::{create_tmp_genesis, create_tmp_sample_ledger, LedgerWriter};
use packet::make_consecutive_blobs;
@ -627,6 +659,7 @@ mod tests {
let v = Fullnode::new_with_bank(
keypair,
bank,
0,
entry_height,
&genesis_entries,
tn,
@ -658,6 +691,7 @@ mod tests {
Fullnode::new_with_bank(
keypair,
bank,
0,
entry_height,
&genesis_entries,
tn,
@ -682,6 +716,70 @@ mod tests {
}
}
#[test]
fn test_leader_to_leader_transition() {
// Create the leader node information
let bootstrap_leader_keypair = Keypair::new();
let bootstrap_leader_node =
Node::new_localhost_with_pubkey(bootstrap_leader_keypair.pubkey());
let bootstrap_leader_info = bootstrap_leader_node.info.clone();
// Make a mint and a genesis entries for leader ledger
let num_ending_ticks = 1;
let (_, bootstrap_leader_ledger_path, genesis_entries) =
create_tmp_sample_ledger("test_leader_to_leader_transition", 10_000, num_ending_ticks);
let initial_tick_height = genesis_entries
.iter()
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
// Create the common leader scheduling configuration
let num_slots_per_epoch = 3;
let leader_rotation_interval = 5;
let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval;
let active_window_length = 5;
// Set the bootstrap height to be bigger than the initial tick height.
// Once the leader hits the bootstrap height ticks, because there are no other
// choices in the active set, this leader will remain the leader in the next
// epoch. In the next epoch, check that the same leader knows to shut down and
// restart as a leader again.
let bootstrap_height = initial_tick_height + 1;
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_leader_info.id,
Some(bootstrap_height as u64),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(active_window_length),
);
// Start up the leader
let mut bootstrap_leader = Fullnode::new(
bootstrap_leader_node,
&bootstrap_leader_ledger_path,
bootstrap_leader_keypair,
Some(bootstrap_leader_info.contact_info.ncp),
false,
LeaderScheduler::new(&leader_scheduler_config),
);
// Wait for the leader to transition, ticks should cause the leader to
// reach the height for leader rotation
match bootstrap_leader.handle_role_transition().unwrap() {
Some(FullnodeReturnType::LeaderToValidatorRotation) => (),
_ => {
panic!("Expected a leader transition");
}
}
match bootstrap_leader.node_role {
Some(NodeRole::Leader(_)) => (),
_ => {
panic!("Expected bootstrap leader to be a leader");
}
}
}
#[test]
fn test_wrong_role_transition() {
// Create the leader node information
@ -695,8 +793,9 @@ mod tests {
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
// Make a common mint and a genesis entry for both leader + validator's ledgers
let num_ending_ticks = 1;
let (mint, bootstrap_leader_ledger_path, genesis_entries) =
create_tmp_sample_ledger("test_wrong_role_transition", 10_000);
create_tmp_sample_ledger("test_wrong_role_transition", 10_000, num_ending_ticks);
let last_id = genesis_entries
.last()
@ -706,27 +805,35 @@ mod tests {
// Write the entries to the ledger that will cause leader rotation
// after the bootstrap height
let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap();
let first_entries =
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id);
let active_set_entries = make_active_set_entries(
&validator_keypair,
&mint.keypair(),
&last_id,
&last_id,
num_ending_ticks,
);
let ledger_initial_len = (genesis_entries.len() + first_entries.len()) as u64;
ledger_writer.write_entries(first_entries).unwrap();
let genesis_tick_height = genesis_entries
.iter()
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64)
+ num_ending_ticks as u64;
ledger_writer.write_entries(active_set_entries).unwrap();
// Create the common leader scheduling configuration
let num_slots_per_epoch = 3;
let leader_rotation_interval = 5;
let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval;
// Set the bootstrap height exactly the current ledger length, so that we can
// Set the bootstrap height exactly the current tick height, so that we can
// test if the bootstrap leader knows to immediately transition to a validator
// after parsing the ledger during startup
let bootstrap_height = ledger_initial_len;
let bootstrap_height = genesis_tick_height;
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_leader_info.id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(ledger_initial_len),
Some(genesis_tick_height),
);
// Test that a node knows to transition to a validator based on parsing the ledger
@ -774,8 +881,13 @@ mod tests {
let leader_ncp = leader_node.info.contact_info.ncp;
// Create validator identity
let (mint, validator_ledger_path, genesis_entries) =
create_tmp_sample_ledger("test_validator_to_leader_transition", 10_000);
let num_ending_ticks = 1;
let (mint, validator_ledger_path, genesis_entries) = create_tmp_sample_ledger(
"test_validator_to_leader_transition",
10_000,
num_ending_ticks,
);
let validator_keypair = Keypair::new();
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
let validator_info = validator_node.info.clone();
@ -793,12 +905,16 @@ mod tests {
//
// 2) A vote from the validator
let mut ledger_writer = LedgerWriter::open(&validator_ledger_path, false).unwrap();
let bootstrap_entries =
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id);
let bootstrap_entries_len = bootstrap_entries.len();
last_id = bootstrap_entries.last().unwrap().id;
ledger_writer.write_entries(bootstrap_entries).unwrap();
let ledger_initial_len = (genesis_entries.len() + bootstrap_entries_len) as u64;
let active_set_entries =
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0);
let initial_tick_height = genesis_entries
.iter()
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height;
let active_set_entries_len = active_set_entries.len() as u64;
last_id = active_set_entries.last().unwrap().id;
ledger_writer.write_entries(active_set_entries).unwrap();
let ledger_initial_len = genesis_entries.len() as u64 + active_set_entries_len;
// Set the leader scheduler for the validator
let leader_rotation_interval = 10;
@ -864,8 +980,8 @@ mod tests {
let join_result = validator_services
.join()
.expect("Expected successful validator join");
if let Some(TvuReturnType::LeaderRotation(result_bh, _)) = join_result {
assert_eq!(result_bh, bootstrap_height);
if let Some(TvuReturnType::LeaderRotation(tick_height, _, _)) = join_result {
assert_eq!(tick_height, bootstrap_height);
} else {
panic!("Expected validator to have exited due to leader rotation");
}
@ -873,14 +989,20 @@ mod tests {
_ => panic!("Role should not be leader"),
}
// Check the validator ledger to make sure it's the right height, we should've
// transitioned after the bootstrap_height entry
let (_, entry_height, _) = Fullnode::new_bank_from_ledger(
// Check the validator ledger for the correct entry + tick heights, we should've
// transitioned after tick_height = bootstrap_height.
let (_, tick_height, entry_height, _) = Fullnode::new_bank_from_ledger(
&validator_ledger_path,
&mut LeaderScheduler::new(&leader_scheduler_config),
);
assert_eq!(entry_height, bootstrap_height);
assert_eq!(tick_height, bootstrap_height);
assert_eq!(
entry_height,
// Only the first genesis entry has num_hashes = 0; every other entry
// has num_hashes = 1
bootstrap_height + active_set_entries_len + initial_non_tick_height,
);
// Shut down
t_responder.join().expect("responder thread join");


@ -9,6 +9,7 @@ use budget_transaction::BudgetTransaction;
use byteorder::{LittleEndian, ReadBytesExt};
use entry::Entry;
use hash::{hash, Hash};
use ledger::create_ticks;
use signature::{Keypair, KeypairUtil};
#[cfg(test)]
use solana_program_interface::account::Account;
@ -222,7 +223,7 @@ impl LeaderScheduler {
(height - self.bootstrap_height) % self.leader_rotation_interval == 0
}
pub fn entries_until_next_leader_rotation(&self, height: u64) -> Option<u64> {
pub fn count_until_next_leader_rotation(&self, height: u64) -> Option<u64> {
if self.use_only_bootstrap_leader {
return None;
}
@ -237,6 +238,46 @@ impl LeaderScheduler {
}
}
// Let Leader X be the leader at the input tick height. This function returns
// the PoH height at which Leader X's slot ends.
pub fn max_height_for_leader(&self, height: u64) -> Option<u64> {
if self.use_only_bootstrap_leader || self.get_scheduled_leader(height).is_none() {
return None;
}
let result = {
if height < self.bootstrap_height || self.leader_schedule.len() > 1 {
// Two cases to consider:
//
// 1) If height is less than the bootstrap height, then the current leader's
// slot ends when PoH height = bootstrap_height
//
// 2) Otherwise, if height >= bootstrap height, then we have generated a schedule.
// If this leader is not the only one in the schedule, then they will
// only be leader until the end of this slot (someone else is then guaranteed
// to take over)
//
// Both above cases are calculated by the function:
// count_until_next_leader_rotation() + height
self.count_until_next_leader_rotation(height).expect(
"Should return some value when not using default implementation
of LeaderScheduler",
) + height
} else {
// If the height is greater than bootstrap_height and this leader is
// the only leader in the schedule, then that leader will be in power
// for every slot until the next epoch, which is seed_rotation_interval
// PoH counts from the beginning of the last epoch.
self.last_seed_height.expect(
"If height >= bootstrap height, then we expect
a seed has been generated",
) + self.seed_rotation_interval
}
};
Some(result)
}
pub fn reset(&mut self) {
self.last_seed_height = None;
self.active_validators.reset();
@ -259,6 +300,12 @@ impl LeaderScheduler {
return;
}
if let Some(last_seed_height) = self.last_seed_height {
if height <= last_seed_height {
return;
}
}
if (height - self.bootstrap_height) % self.seed_rotation_interval == 0 {
self.generate_schedule(height, bank);
}
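
A worked illustration of max_height_for_leader, assuming bootstrap_height = 500, leader_rotation_interval = 100 and seed_rotation_interval = 200 (the same parameters the tests below use):

// Before any schedule exists, the bootstrap leader's slot ends at the bootstrap height:
//   max_height_for_leader(499) -> Some(500)
// After generate_schedule(500, &bank) with more than one node in the active set,
// the leader at tick 500 holds the slot for one leader_rotation_interval:
//   max_height_for_leader(500) -> Some(600)
// If the bootstrap leader is the only node in the schedule, it keeps the slot
// until the next epoch boundary, last_seed_height + seed_rotation_interval:
//   max_height_for_leader(500) -> Some(700)
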
@ -303,6 +350,8 @@ impl LeaderScheduler {
// Called every seed_rotation_interval entries, generates the leader schedule
// for the range of entries: [height, height + seed_rotation_interval)
fn generate_schedule(&mut self, height: u64, bank: &Bank) {
assert!(height >= self.bootstrap_height);
assert!((height - self.bootstrap_height) % self.seed_rotation_interval == 0);
let seed = Self::calculate_seed(height);
self.seed = seed;
let active_set = self.get_active_set(height);
@ -330,12 +379,36 @@ impl LeaderScheduler {
// schedule
let ordered_account_stake = ranked_active_set.into_iter().map(|(_, stake)| stake);
let start_index = Self::choose_account(ordered_account_stake, self.seed, total_stake);
validator_rankings.rotate_left(start_index + 1);
validator_rankings.rotate_left(start_index);
// There are only seed_rotation_interval / self.leader_rotation_interval slots, so
// we only need to keep at most that many validators in the schedule
validator_rankings
.truncate((self.seed_rotation_interval / self.leader_rotation_interval) as usize);
let slots_per_epoch = self.seed_rotation_interval / self.leader_rotation_interval;
// If possible, try to avoid having the same leader twice in a row, but
// if there's only one leader to choose from, then we have no other choice
if validator_rankings.len() > 1 {
let old_epoch_last_leader = self
.get_scheduled_leader(height - 1)
.expect("Previous leader schedule should still exist");
let new_epoch_start_leader = validator_rankings[0];
if old_epoch_last_leader == new_epoch_start_leader {
if slots_per_epoch == 1 {
// If there is only one slot per epoch, and the same leader as the last slot
// of the previous epoch was chosen, then pick the next leader in the
// rankings instead
validator_rankings[0] = validator_rankings[1];
} else {
// If there is more than one leader in the schedule, truncate and set the most
// recent leader to the back of the line. This way that node will still remain
// in the rotation, just at a later slot.
validator_rankings.truncate(slots_per_epoch as usize);
validator_rankings.rotate_left(1);
}
}
}
self.leader_schedule = validator_rankings;
self.last_seed_height = Some(height);
}
@ -432,12 +505,13 @@ pub fn make_active_set_entries(
token_source: &Keypair,
last_entry_id: &Hash,
last_tick_id: &Hash,
num_ending_ticks: usize,
) -> Vec<Entry> {
// 1) Create transfer token entry
let transfer_tx =
Transaction::system_new(&token_source, active_keypair.pubkey(), 1, *last_tick_id);
let transfer_entry = Entry::new(last_entry_id, 0, vec![transfer_tx]);
let last_entry_id = transfer_entry.id;
let transfer_entry = Entry::new(last_entry_id, 1, vec![transfer_tx]);
let mut last_entry_id = transfer_entry.id;
// 2) Create vote entry
let vote = Vote {
@ -445,9 +519,14 @@ pub fn make_active_set_entries(
contact_info_version: 0,
};
let vote_tx = Transaction::budget_new_vote(&active_keypair, vote, *last_tick_id, 0);
let vote_entry = Entry::new(&last_entry_id, 0, vec![vote_tx]);
let vote_entry = Entry::new(&last_entry_id, 1, vec![vote_tx]);
last_entry_id = vote_entry.id;
vec![transfer_entry, vote_entry]
// 3) Create the ending empty ticks
let mut txs = vec![transfer_entry, vote_entry];
let empty_ticks = create_ticks(num_ending_ticks, last_entry_id);
txs.extend(empty_ticks);
txs
}
#[cfg(test)]
@ -851,7 +930,7 @@ mod tests {
// validators as part of the schedule each time (we need to check the active window
// is the cause of validators being truncated later)
let seed_rotation_interval = leader_rotation_interval * num_validators;
let active_window_length = 1;
let active_window_length = seed_rotation_interval;
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_leader_id,
@ -877,7 +956,7 @@ mod tests {
let new_pubkey = new_validator.pubkey();
validators.push(new_pubkey);
// Vote at height i * active_window_length for validator i
leader_scheduler.push_vote(new_pubkey, i * active_window_length);
leader_scheduler.push_vote(new_pubkey, i * active_window_length + bootstrap_height);
bank.transfer((i + 1) as i64, &mint.keypair(), new_pubkey, last_id)
.unwrap();
}
@ -886,7 +965,7 @@ mod tests {
// validators are falling out of the rotation as they fall out of the
// active set
for i in 0..=num_validators {
leader_scheduler.generate_schedule(i * active_window_length, &bank);
leader_scheduler.generate_schedule(i * active_window_length + bootstrap_height, &bank);
let result = &leader_scheduler.leader_schedule;
let expected = if i == num_validators {
bootstrap_leader_id
@ -1022,4 +1101,202 @@ mod tests {
let active_validators = ActiveValidators::new(Some(active_window_length));
assert_eq!(active_validators.active_window_length, active_window_length);
}
fn run_consecutive_leader_test(num_slots_per_epoch: u64, add_validator: bool) {
let bootstrap_leader_id = Keypair::new().pubkey();
let bootstrap_height = 500;
let leader_rotation_interval = 100;
let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval;
let active_window_length = bootstrap_height + seed_rotation_interval;
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_leader_id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(active_window_length),
);
let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config);
// Create mint and bank
let mint = Mint::new(10000);
let bank = Bank::new(&mint);
let last_id = mint
.create_entries()
.last()
.expect("Mint should not create empty genesis entries")
.id;
let initial_vote_height = 1;
// Create and add validator to the active set
let validator_id = Keypair::new().pubkey();
if add_validator {
leader_scheduler.push_vote(validator_id, initial_vote_height);
bank.transfer(1, &mint.keypair(), validator_id, last_id)
.unwrap();
}
// Make sure the bootstrap leader, not the validator, is picked again on next slot
// Depending on the seed, we make the leader stake either 2, or 3. Because the
// validator stake is always 1, then the rankings will always be
// [(validator, 1), (leader, leader_stake)]. Thus we just need to make sure that
// seed % (leader_stake + 1) > 0 to make sure that the leader is picked again.
let seed = LeaderScheduler::calculate_seed(bootstrap_height);
let leader_stake = {
if seed % 3 == 0 {
3
} else {
2
}
};
// Add leader to the active set
leader_scheduler.push_vote(bootstrap_leader_id, initial_vote_height);
bank.transfer(leader_stake, &mint.keypair(), bootstrap_leader_id, last_id)
.unwrap();
leader_scheduler.generate_schedule(bootstrap_height, &bank);
// Make sure the validator, not the leader is selected on the first slot of the
// next epoch
if add_validator {
assert!(leader_scheduler.leader_schedule[0] == validator_id);
} else {
assert!(leader_scheduler.leader_schedule[0] == bootstrap_leader_id);
}
}
#[test]
fn test_avoid_consecutive_leaders() {
// Test when there is both a leader + validator in the active set
run_consecutive_leader_test(1, true);
run_consecutive_leader_test(2, true);
run_consecutive_leader_test(10, true);
// Test when there is only one node in the active set
run_consecutive_leader_test(1, false);
run_consecutive_leader_test(2, false);
run_consecutive_leader_test(10, false);
}
#[test]
fn test_max_height_for_leader() {
let bootstrap_leader_id = Keypair::new().pubkey();
let bootstrap_height = 500;
let leader_rotation_interval = 100;
let seed_rotation_interval = 2 * leader_rotation_interval;
let active_window_length = bootstrap_height + seed_rotation_interval;
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_leader_id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(active_window_length),
);
let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config);
// Create mint and bank
let mint = Mint::new(10000);
let bank = Bank::new(&mint);
let last_id = mint
.create_entries()
.last()
.expect("Mint should not create empty genesis entries")
.id;
let initial_vote_height = 1;
// No schedule generated yet, so for all heights < bootstrap height, the
// max height will be bootstrap leader
assert_eq!(
leader_scheduler.max_height_for_leader(0),
Some(bootstrap_height)
);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height - 1),
Some(bootstrap_height)
);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height),
None
);
// Test when the active set == 1 node
// Generate schedule where the bootstrap leader will be the only
// choice because the active set is empty. Thus if the schedule
// was generated on PoH height bootstrap_height + n * seed_rotation_interval,
// then the same leader will be in power until PoH height
// bootstrap_height + (n + 1) * seed_rotation_interval
leader_scheduler.generate_schedule(bootstrap_height, &bank);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height),
Some(bootstrap_height + seed_rotation_interval)
);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height - 1),
None
);
leader_scheduler.generate_schedule(bootstrap_height + seed_rotation_interval, &bank);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height + seed_rotation_interval),
Some(bootstrap_height + 2 * seed_rotation_interval)
);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height + seed_rotation_interval - 1),
None
);
leader_scheduler.reset();
// Now test when the active set > 1 node
// Create and add validator to the active set
let validator_id = Keypair::new().pubkey();
leader_scheduler.push_vote(validator_id, initial_vote_height);
bank.transfer(1, &mint.keypair(), validator_id, last_id)
.unwrap();
// Add leader to the active set
leader_scheduler.push_vote(bootstrap_leader_id, initial_vote_height);
bank.transfer(1, &mint.keypair(), bootstrap_leader_id, last_id)
.unwrap();
// Generate the schedule
leader_scheduler.generate_schedule(bootstrap_height, &bank);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height),
Some(bootstrap_height + leader_rotation_interval)
);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height - 1),
None
);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height + leader_rotation_interval),
Some(bootstrap_height + 2 * leader_rotation_interval)
);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height + seed_rotation_interval),
None,
);
leader_scheduler.generate_schedule(bootstrap_height + seed_rotation_interval, &bank);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height + seed_rotation_interval),
Some(bootstrap_height + seed_rotation_interval + leader_rotation_interval)
);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height + seed_rotation_interval - 1),
None
);
assert_eq!(
leader_scheduler.max_height_for_leader(bootstrap_height + 2 * seed_rotation_interval),
None
);
}
}


@ -628,13 +628,29 @@ pub fn create_tmp_genesis(name: &str, num: i64) -> (Mint, String) {
(mint, path)
}
pub fn create_tmp_sample_ledger(name: &str, num: i64) -> (Mint, String, Vec<Entry>) {
let mint = Mint::new(num);
pub fn create_ticks(num_ticks: usize, mut hash: Hash) -> Vec<Entry> {
let mut ticks = Vec::with_capacity(num_ticks as usize);
for _ in 0..num_ticks {
let new_tick = Entry::new(&hash, 1, vec![]);
hash = new_tick.id;
ticks.push(new_tick);
}
ticks
}
pub fn create_tmp_sample_ledger(
name: &str,
num_tokens: i64,
num_ending_ticks: usize,
) -> (Mint, String, Vec<Entry>) {
let mint = Mint::new(num_tokens);
let path = get_tmp_ledger_path(name);
// Create the entries
let mut genesis = mint.create_entries();
genesis.extend(vec![Entry::new(&mint.last_id(), 0, vec![])]);
let ticks = create_ticks(num_ending_ticks, mint.last_id());
genesis.extend(ticks);
let mut writer = LedgerWriter::open(&path, true).unwrap();
writer.write_entries(genesis.clone()).unwrap();
@ -1017,5 +1033,4 @@ mod tests {
let _ignored = remove_dir_all(&ledger_path);
}
}


@ -60,7 +60,7 @@ impl Mint {
pub fn create_entries(&self) -> Vec<Entry> {
let e0 = Entry::new(&self.seed(), 0, vec![]);
let e1 = Entry::new(&e0.id, 0, self.create_transactions());
let e1 = Entry::new(&e0.id, 1, self.create_transactions());
vec![e0, e1]
}
}


@ -3,9 +3,11 @@ use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use counter::Counter;
#[cfg(test)]
use entry::Entry;
#[cfg(test)]
use hash::Hash;
#[cfg(test)]
use ledger::{next_entries_mut, Block};
use ledger::Block;
use log::Level;
use recvmmsg::{recv_mmsg, NUM_RCVMMSGS};
use result::{Error, Result};
@ -411,10 +413,12 @@ pub fn make_consecutive_blobs(
addr: &SocketAddr,
) -> SharedBlobs {
let mut last_hash = start_hash;
let mut num_hashes = 0;
let num_hashes = 1;
let mut all_entries = Vec::with_capacity(num_blobs_to_make as usize);
for _ in 0..num_blobs_to_make {
all_entries.extend(next_entries_mut(&mut last_hash, &mut num_hashes, vec![]));
let entry = Entry::new(&last_hash, num_hashes, vec![]);
last_hash = entry.id;
all_entries.push(entry);
}
let mut new_blobs = all_entries.to_blobs_with_id(me_id, start_height, addr);
new_blobs.truncate(num_blobs_to_make as usize);


@ -5,6 +5,7 @@ use hash::{hash, hashv, Hash};
pub struct Poh {
last_hash: Hash,
num_hashes: u64,
pub tick_height: u64,
}
#[derive(Debug)]
@ -15,10 +16,11 @@ pub struct PohEntry {
}
impl Poh {
pub fn new(last_hash: Hash) -> Self {
pub fn new(last_hash: Hash, tick_height: u64) -> Self {
Poh {
last_hash,
num_hashes: 0,
tick_height,
}
}
@ -32,7 +34,6 @@ impl Poh {
self.last_hash = hashv(&[&self.last_hash.as_ref(), &mixin.as_ref()]);
self.num_hashes = 0;
PohEntry {
num_hashes,
id: self.last_hash,
@ -47,6 +48,7 @@ impl Poh {
let num_hashes = self.num_hashes;
self.num_hashes = 0;
self.tick_height += 1;
PohEntry {
num_hashes,


@ -5,53 +5,93 @@ use bank::Bank;
use entry::Entry;
use hash::Hash;
use poh::Poh;
use result::Result;
use result::{Error, Result};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use transaction::Transaction;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PohRecorderError {
MaxHeightReached,
}
#[derive(Clone)]
pub struct PohRecorder {
poh: Arc<Mutex<Poh>>,
bank: Arc<Bank>,
sender: Sender<Vec<Entry>>,
// TODO: when extracting the PoH generator into a separate standalone service,
// use this field for checking timeouts when running as a validator, and as
// a transmission guard when running as the leader.
pub max_tick_height: Option<u64>,
}
impl PohRecorder {
/// A recorder to synchronize PoH with the following data structures
/// * bank - the LastId's queue is updated on `tick` and `record` events
/// * sender - the Entry channel that outputs to the ledger
pub fn new(bank: Arc<Bank>, sender: Sender<Vec<Entry>>, last_entry_id: Hash) -> Self {
let poh = Arc::new(Mutex::new(Poh::new(last_entry_id)));
PohRecorder { poh, bank, sender }
pub fn new(
bank: Arc<Bank>,
sender: Sender<Vec<Entry>>,
last_entry_id: Hash,
tick_height: u64,
max_tick_height: Option<u64>,
) -> Self {
let poh = Arc::new(Mutex::new(Poh::new(last_entry_id, tick_height)));
PohRecorder {
poh,
bank,
sender,
max_tick_height,
}
}
pub fn hash(&self) {
pub fn hash(&self) -> Result<()> {
// TODO: amortize the cost of this lock by doing the loop in here for
// some min amount of hashes
let mut poh = self.poh.lock().unwrap();
poh.hash()
if self.check_max_tick_height_reached(&*poh) {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
} else {
poh.hash();
Ok(())
}
}
pub fn tick(&self) -> Result<()> {
// Register and send the entry out while holding the lock.
pub fn tick(&mut self) -> Result<u64> {
// Register and send the entry out while holding the lock if the max PoH height
// hasn't been reached.
// This guarantees PoH order and Entry production and the bank's LastId queue is the same
let mut poh = self.poh.lock().unwrap();
let tick = poh.tick();
self.bank.register_entry_id(&tick.id);
let entry = Entry {
num_hashes: tick.num_hashes,
id: tick.id,
transactions: vec![],
};
self.sender.send(vec![entry])?;
Ok(())
if self.check_max_tick_height_reached(&*poh) {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
} else {
self.register_and_send_tick(&mut *poh)?;
Ok(poh.tick_height)
}
}
pub fn record(&self, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
// Register and send the entry out while holding the lock.
// This guarantees PoH order and Entry production and the bank's LastId queue is the same.
let mut poh = self.poh.lock().unwrap();
if self.check_max_tick_height_reached(&*poh) {
Err(Error::PohRecorderError(PohRecorderError::MaxHeightReached))
} else {
self.record_and_send_txs(&mut *poh, mixin, txs)?;
Ok(())
}
}
fn check_max_tick_height_reached(&self, poh: &Poh) -> bool {
if let Some(max_tick_height) = self.max_tick_height {
poh.tick_height >= max_tick_height
} else {
false
}
}
fn record_and_send_txs(&self, poh: &mut Poh, mixin: Hash, txs: Vec<Transaction>) -> Result<()> {
let tick = poh.record(mixin);
assert!(!txs.is_empty(), "Entries without transactions are used to track real-time passing in the ledger and can only be generated with PohRecorder::tick function");
let entry = Entry {
@ -62,6 +102,18 @@ impl PohRecorder {
self.sender.send(vec![entry])?;
Ok(())
}
fn register_and_send_tick(&self, poh: &mut Poh) -> Result<()> {
let tick = poh.tick();
self.bank.register_entry_id(&tick.id);
let entry = Entry {
num_hashes: tick.num_hashes,
id: tick.id,
transactions: vec![],
};
self.sender.send(vec![entry])?;
Ok(())
}
}
#[cfg(test)]
@ -79,7 +131,7 @@ mod tests {
let bank = Arc::new(Bank::new(&mint));
let last_id = bank.last_id();
let (entry_sender, entry_receiver) = channel();
let poh_recorder = PohRecorder::new(bank, entry_sender, last_id);
let mut poh_recorder = PohRecorder::new(bank, entry_sender, last_id, 0, None);
//send some data
let h1 = hash(b"hello world!");
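
A minimal usage sketch of the new max_tick_height guard (assuming the bank, entry_sender and last_id from the test setup above, and that the receiver stays alive):

// Hypothetical: a recorder that refuses to go past tick height 2.
let mut recorder = PohRecorder::new(bank, entry_sender, last_id, 0, Some(2));
assert_eq!(recorder.tick().unwrap(), 1);
assert_eq!(recorder.tick().unwrap(), 2);
// A third tick fails with PohRecorderError::MaxHeightReached.
assert!(recorder.tick().is_err());
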


@ -27,7 +27,7 @@ use vote_stage::send_validator_vote;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ReplicateStageReturnType {
LeaderRotation(u64, Hash),
LeaderRotation(u64, u64, Hash),
}
// Implement a destructor for the ReplicateStage thread to signal it exited
@ -62,6 +62,7 @@ impl ReplicateStage {
ledger_writer: Option<&mut LedgerWriter>,
keypair: &Arc<Keypair>,
vote_blob_sender: Option<&BlobSender>,
tick_height: &mut u64,
entry_height: &mut u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<Hash> {
@ -85,24 +86,30 @@ impl ReplicateStage {
let last_entry_id = {
let mut num_entries_to_write = entries.len();
for (i, entry) in entries.iter().enumerate() {
res = bank.process_entry(&entry);
Bank::process_entry_votes(
&bank,
// max_tick_height is the PoH height at which the next leader rotation will
// happen. The leader should send an entry such that the total PoH is equal
// to max_tick_height - guard.
// TODO: Introduce a "guard" for the end of transmission periods, the guard
// is assumed to be zero for now.
let max_tick_height = {
let ls_lock = leader_scheduler.read().unwrap();
ls_lock.max_height_for_leader(*tick_height)
};
res = bank.process_entry(
&entry,
*entry_height + i as u64 + 1,
tick_height,
&mut *leader_scheduler.write().unwrap(),
);
{
// Will run only if leader_scheduler.use_only_bootstrap_leader is false
if let Some(max_tick_height) = max_tick_height {
let ls_lock = leader_scheduler.read().unwrap();
if ls_lock.is_leader_rotation_height(
// i is zero indexed, so current entry height for this entry is actually the
// old entry height + i + 1
*entry_height + i as u64 + 1,
) {
if *tick_height == max_tick_height {
let my_id = keypair.pubkey();
let scheduled_leader =
ls_lock.get_scheduled_leader(*entry_height + i as u64 + 1).expect("Scheduled leader id should never be unknown while processing entries");
let scheduled_leader = ls_lock.get_scheduled_leader(*tick_height).expect(
"Scheduled leader id should never be unknown while processing entries",
);
cluster_info.write().unwrap().set_leader(scheduled_leader);
if my_id == scheduled_leader {
num_entries_to_write = i + 1;
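Read together, the interleaved removed/added lines above amount to the following per-entry check in ReplicateStage. The condensation below is for readability only: the function boundary, parameter list, and `?`-based error plumbing are assumed, and vote sending plus ledger writing are elided.

use bank::Bank;
use cluster_info::ClusterInfo;
use entry::Entry;
use leader_scheduler::LeaderScheduler;
use result::Result;
use signature::{Keypair, KeypairUtil};
use std::sync::{Arc, RwLock};

// Illustrative condensation of the rotation check above, keyed off PoH ticks.
fn process_entries_until_rotation(
    bank: &Bank,
    entries: &[Entry],
    tick_height: &mut u64,
    keypair: &Keypair,
    cluster_info: &Arc<RwLock<ClusterInfo>>,
    leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<usize> {
    let mut num_entries_to_write = entries.len();
    for (i, entry) in entries.iter().enumerate() {
        // The next rotation is now expressed as a PoH tick height, not an
        // entry height. None means the default bootstrap-only scheduler.
        let max_tick_height = leader_scheduler
            .read()
            .unwrap()
            .max_height_for_leader(*tick_height);

        // process_entry bumps *tick_height on tick entries and forwards
        // votes to the leader scheduler.
        bank.process_entry(entry, tick_height, &mut *leader_scheduler.write().unwrap())?;

        if let Some(max_tick_height) = max_tick_height {
            if *tick_height == max_tick_height {
                let scheduled_leader = leader_scheduler
                    .read()
                    .unwrap()
                    .get_scheduled_leader(*tick_height)
                    .expect("Scheduled leader id should never be unknown while processing entries");
                cluster_info.write().unwrap().set_leader(scheduled_leader);
                if keypair.pubkey() == scheduled_leader {
                    // This node is the next leader: stop consuming entries so
                    // the TVU can exit and the TPU can take over.
                    num_entries_to_write = i + 1;
                    break;
                }
            }
        }
    }
    Ok(num_entries_to_write)
}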
@ -162,6 +169,7 @@ impl ReplicateStage {
window_receiver: EntryReceiver,
ledger_path: Option<&str>,
exit: Arc<AtomicBool>,
tick_height: u64,
entry_height: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> Self {
@ -179,15 +187,17 @@ impl ReplicateStage {
let now = Instant::now();
let mut next_vote_secs = 1;
let mut entry_height_ = entry_height;
let mut tick_height_ = tick_height;
let mut last_entry_id = None;
loop {
let leader_id = leader_scheduler
.read()
.unwrap()
.get_scheduled_leader(entry_height_)
.get_scheduled_leader(tick_height_)
.expect("Scheduled leader id should never be unknown at this point");
if leader_id == keypair.pubkey() {
return Some(ReplicateStageReturnType::LeaderRotation(
tick_height_,
entry_height_,
// We should never start the TPU / this stage on an exact entry that causes leader
// rotation (Fullnode should automatically transition on startup if it detects
@ -212,6 +222,7 @@ impl ReplicateStage {
ledger_writer.as_mut(),
&keypair,
vote_sender,
&mut tick_height_,
&mut entry_height_,
&leader_scheduler,
) {
@ -246,9 +257,10 @@ impl Service for ReplicateStage {
#[cfg(test)]
mod test {
use cluster_info::{ClusterInfo, Node};
use entry::Entry;
use fullnode::Fullnode;
use leader_scheduler::{make_active_set_entries, LeaderScheduler, LeaderSchedulerConfig};
use ledger::{create_tmp_sample_ledger, next_entries_mut, LedgerWriter};
use ledger::{create_tmp_sample_ledger, LedgerWriter};
use logger;
use replicate_stage::{ReplicateStage, ReplicateStageReturnType};
use service::Service;
@ -269,8 +281,12 @@ mod test {
let cluster_info_me = ClusterInfo::new(my_node.info.clone()).expect("ClusterInfo::new");
// Create a ledger
let (mint, my_ledger_path, genesis_entries) =
create_tmp_sample_ledger("test_replicate_stage_leader_rotation_exit", 10_000);
let num_ending_ticks = 1;
let (mint, my_ledger_path, genesis_entries) = create_tmp_sample_ledger(
"test_replicate_stage_leader_rotation_exit",
10_000,
num_ending_ticks,
);
let mut last_id = genesis_entries
.last()
.expect("expected at least one genesis entry")
@ -280,11 +296,16 @@ mod test {
// 1) Give the validator a nonzero number of tokens 2) A vote from the validator .
// This will cause leader rotation after the bootstrap height
let mut ledger_writer = LedgerWriter::open(&my_ledger_path, false).unwrap();
let bootstrap_entries =
make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id);
last_id = bootstrap_entries.last().unwrap().id;
let ledger_initial_len = (genesis_entries.len() + bootstrap_entries.len()) as u64;
ledger_writer.write_entries(bootstrap_entries).unwrap();
let active_set_entries =
make_active_set_entries(&my_keypair, &mint.keypair(), &last_id, &last_id, 0);
last_id = active_set_entries.last().unwrap().id;
let initial_tick_height = genesis_entries
.iter()
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
let active_set_entries_len = active_set_entries.len() as u64;
let initial_non_tick_height = genesis_entries.len() as u64 - initial_tick_height;
let initial_entry_len = genesis_entries.len() as u64 + active_set_entries_len;
ledger_writer.write_entries(active_set_entries).unwrap();
// Set up the LeaderScheduler so that this node becomes the leader at
// bootstrap_height = num_bootstrap_slots * leader_rotation_interval
@ -303,7 +324,8 @@ mod test {
let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config);
// Set up the bank
let (bank, _, _) = Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler);
let (bank, _, _, _) =
Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler);
let leader_scheduler = Arc::new(RwLock::new(leader_scheduler));
@ -317,47 +339,57 @@ mod test {
entry_receiver,
Some(&my_ledger_path),
exit.clone(),
ledger_initial_len,
initial_tick_height,
initial_entry_len,
leader_scheduler.clone(),
);
// Send enough entries to trigger leader rotation
// Send enough ticks to trigger leader rotation
let extra_entries = leader_rotation_interval;
let total_entries_to_send = (bootstrap_height + extra_entries) as usize;
let mut num_hashes = 0;
let num_hashes = 1;
let mut entries_to_send = vec![];
while entries_to_send.len() < total_entries_to_send {
let entries = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
last_id = entries.last().expect("expected at least one entry").id;
entries_to_send.extend(entries);
let entry = Entry::new(&mut last_id, num_hashes, vec![]);
last_id = entry.id;
entries_to_send.push(entry);
}
entries_to_send.truncate(total_entries_to_send);
let last_id = entries_to_send[(bootstrap_height - 1) as usize].id;
assert!((num_ending_ticks as u64) < bootstrap_height);
// Add the entries that weren't ticks to the bootstrap height to get the
// total expected entry height
let expected_entry_height =
bootstrap_height + initial_non_tick_height + active_set_entries_len;
let expected_last_id =
entries_to_send[(bootstrap_height - initial_tick_height - 1) as usize].id;
entry_sender.send(entries_to_send).unwrap();
// Wait for replicate_stage to exit and check return value is correct
assert_eq!(
Some(ReplicateStageReturnType::LeaderRotation(
bootstrap_height,
last_id
expected_entry_height,
expected_last_id,
)),
replicate_stage.join().expect("replicate stage join")
);
assert_eq!(exit.load(Ordering::Relaxed), true);
//Check ledger height is correct
// Check ledger height is correct
let mut leader_scheduler = Arc::try_unwrap(leader_scheduler)
.expect("Multiple references to this RwLock still exist")
.into_inner()
.expect("RwLock for LeaderScheduler is still locked");
let (_, entry_height, _) =
leader_scheduler.reset();
let (_, tick_height, entry_height, _) =
Fullnode::new_bank_from_ledger(&my_ledger_path, &mut leader_scheduler);
assert_eq!(entry_height, bootstrap_height);
assert_eq!(tick_height, bootstrap_height);
assert_eq!(entry_height, expected_entry_height);
let _ignored = remove_dir_all(&my_ledger_path);
}
}
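The height bookkeeping in this test is easier to follow with concrete numbers. The values below are made up purely for illustration (the real ones come from create_tmp_sample_ledger and the scheduler config above); the formulas are the ones the test uses.

// Hypothetical numbers, for illustration only.
let bootstrap_height = 20u64;        // rotation happens at this PoH tick height
let initial_tick_height = 1u64;      // ticks already present in the genesis ledger
let initial_non_tick_height = 3u64;  // genesis entries that are not ticks
let active_set_entries_len = 3u64;   // vote/stake entries written before starting

// Only ticks advance the PoH height, so rotation triggers after consuming
// bootstrap_height - initial_tick_height of the ticks we send, but the ledger
// also holds every non-tick entry seen along the way.
let expected_entry_height =
    bootstrap_height + initial_non_tick_height + active_set_entries_len;
assert_eq!(expected_entry_height, 26);

// The last entry consumed is the (bootstrap_height - initial_tick_height)-th
// tick we sent, i.e. zero-based index bootstrap_height - initial_tick_height - 1.
assert_eq!(bootstrap_height - initial_tick_height - 1, 18);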

View File

@ -108,6 +108,7 @@ impl Replicator {
let t_window = window_service(
cluster_info.clone(),
shared_window.clone(),
0,
entry_height,
max_entry_height,
blob_fetch_receiver,

View File

@ -6,6 +6,7 @@ use cluster_info;
#[cfg(feature = "erasure")]
use erasure;
use packet;
use poh_recorder;
use serde_json;
use std;
use std::any::Any;
@ -25,6 +26,7 @@ pub enum Error {
#[cfg(feature = "erasure")]
ErasureError(erasure::ErasureError),
SendError,
PohRecorderError(poh_recorder::PohRecorderError),
}
pub type Result<T> = std::result::Result<T, Error>;
@ -73,7 +75,6 @@ impl std::convert::From<Box<Any + Send + 'static>> for Error {
Error::JoinError(e)
}
}
impl std::convert::From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Error {
Error::IO(e)
@ -94,6 +95,11 @@ impl std::convert::From<std::boxed::Box<bincode::ErrorKind>> for Error {
Error::Serialize(e)
}
}
impl std::convert::From<poh_recorder::PohRecorderError> for Error {
fn from(e: poh_recorder::PohRecorderError) -> Error {
Error::PohRecorderError(e)
}
}
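A small usage note: with the conversion above in place, code that returns the crate-wide Result can lift a bare PohRecorderError via Into/From instead of spelling out the wrapping variant. The helper below is illustrative only (imagined alongside result.rs, which already imports poh_recorder).

// Illustrative only: the From impl converts a PohRecorderError into the
// crate-wide Error without naming Error::PohRecorderError explicitly.
fn fail_if_past_max(tick_height: u64, max_tick_height: u64) -> Result<()> {
    if tick_height >= max_tick_height {
        return Err(poh_recorder::PohRecorderError::MaxHeightReached.into());
    }
    Ok(())
}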
#[cfg(test)]
mod tests {

View File

@ -87,6 +87,7 @@ impl RetransmitStage {
pub fn new(
cluster_info: &Arc<RwLock<ClusterInfo>>,
window: SharedWindow,
tick_height: u64,
entry_height: u64,
retransmit_socket: Arc<UdpSocket>,
repair_socket: Arc<UdpSocket>,
@ -102,6 +103,7 @@ impl RetransmitStage {
let t_window = window_service(
cluster_info.clone(),
window,
tick_height,
entry_height,
0,
fetch_stage_receiver,

View File

@ -559,6 +559,7 @@ mod tests {
let server = Fullnode::new_with_bank(
leader_keypair,
bank,
0,
entry_height,
&genesis_entries,
leader,

View File

@ -459,6 +459,7 @@ mod tests {
leader_keypair,
bank,
0,
0,
&[],
leader,
None,
@ -506,6 +507,7 @@ mod tests {
leader_keypair,
bank,
0,
0,
&[],
leader,
None,
@ -567,6 +569,7 @@ mod tests {
let server = Fullnode::new_with_bank(
leader_keypair,
bank,
0,
entry_height,
&genesis_entries,
leader,
@ -630,6 +633,7 @@ mod tests {
let server = Fullnode::new_with_bank(
leader_keypair,
bank,
0,
entry_height,
&genesis_entries,
leader,

View File

@ -26,12 +26,11 @@
//! ```
use bank::Bank;
use banking_stage::{BankingStage, Config};
use banking_stage::{BankingStage, BankingStageReturnType, Config};
use cluster_info::ClusterInfo;
use entry::Entry;
use fetch_stage::FetchStage;
use hash::Hash;
use leader_scheduler::LeaderScheduler;
use service::Service;
use signature::Keypair;
use sigverify_stage::SigVerifyStage;
@ -40,7 +39,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver;
use std::sync::{Arc, RwLock};
use std::thread;
use write_stage::{WriteStage, WriteStageReturnType};
use write_stage::WriteStage;
pub enum TpuReturnType {
LeaderRotation,
@ -64,9 +63,9 @@ impl Tpu {
transactions_sockets: Vec<UdpSocket>,
ledger_path: &str,
sigverify_disabled: bool,
entry_height: u64,
tick_height: u64,
max_tick_height: Option<u64>,
last_entry_id: &Hash,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> (Self, Receiver<Vec<Entry>>, Arc<AtomicBool>) {
let exit = Arc::new(AtomicBool::new(false));
@ -75,8 +74,14 @@ impl Tpu {
let (sigverify_stage, verified_receiver) =
SigVerifyStage::new(packet_receiver, sigverify_disabled);
let (banking_stage, entry_receiver) =
BankingStage::new(&bank, verified_receiver, tick_duration, last_entry_id);
let (banking_stage, entry_receiver) = BankingStage::new(
&bank,
verified_receiver,
tick_duration,
last_entry_id,
tick_height,
max_tick_height,
);
let (write_stage, entry_forwarder) = WriteStage::new(
keypair,
@ -84,8 +89,6 @@ impl Tpu {
cluster_info.clone(),
ledger_path,
entry_receiver,
entry_height,
leader_scheduler,
);
let tpu = Tpu {
@ -118,9 +121,9 @@ impl Service for Tpu {
fn join(self) -> thread::Result<(Option<TpuReturnType>)> {
self.fetch_stage.join()?;
self.sigverify_stage.join()?;
self.banking_stage.join()?;
match self.write_stage.join()? {
WriteStageReturnType::LeaderRotation => Ok(Some(TpuReturnType::LeaderRotation)),
self.write_stage.join()?;
match self.banking_stage.join()? {
Some(BankingStageReturnType::LeaderRotation) => Ok(Some(TpuReturnType::LeaderRotation)),
_ => Ok(None),
}
}

View File

@ -53,7 +53,7 @@ use window::SharedWindow;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum TvuReturnType {
LeaderRotation(u64, Hash),
LeaderRotation(u64, u64, Hash),
}
pub struct Tvu {
@ -79,6 +79,7 @@ impl Tvu {
pub fn new(
keypair: Arc<Keypair>,
bank: &Arc<Bank>,
tick_height: u64,
entry_height: u64,
cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow,
@ -102,6 +103,7 @@ impl Tvu {
let (retransmit_stage, blob_window_receiver) = RetransmitStage::new(
&cluster_info,
window,
tick_height,
entry_height,
Arc::new(retransmit_socket),
repair_socket,
@ -116,6 +118,7 @@ impl Tvu {
blob_window_receiver,
ledger_path,
exit.clone(),
tick_height,
entry_height,
leader_scheduler,
);
@ -149,9 +152,15 @@ impl Service for Tvu {
self.retransmit_stage.join()?;
self.fetch_stage.join()?;
match self.replicate_stage.join()? {
Some(ReplicateStageReturnType::LeaderRotation(entry_height, last_entry_id)) => Ok(
Some(TvuReturnType::LeaderRotation(entry_height, last_entry_id)),
),
Some(ReplicateStageReturnType::LeaderRotation(
tick_height,
entry_height,
last_entry_id,
)) => Ok(Some(TvuReturnType::LeaderRotation(
tick_height,
entry_height,
last_entry_id,
))),
_ => Ok(None),
}
}
@ -254,6 +263,7 @@ pub mod tests {
Arc::new(target1_keypair),
&bank,
0,
0,
cref1,
dr_1.1,
target1.sockets.replicate,

View File

@ -932,6 +932,7 @@ mod tests {
leader_keypair,
bank,
0,
0,
&[],
leader,
None,
@ -1007,6 +1008,7 @@ mod tests {
let server = Fullnode::new_with_bank(
leader_keypair,
bank,
0,
entry_height,
&genesis_entries,
leader,
@ -1085,6 +1087,7 @@ mod tests {
leader_keypair,
bank,
0,
0,
&[],
leader,
None,
@ -1204,6 +1207,7 @@ mod tests {
leader_keypair,
bank,
0,
0,
&[],
leader,
None,
@ -1321,6 +1325,7 @@ mod tests {
leader_keypair,
bank,
0,
0,
&[],
leader,
None,

View File

@ -60,6 +60,7 @@ pub trait WindowUtil {
times: usize,
consumed: u64,
received: u64,
tick_height: u64,
max_entry_height: u64,
leader_scheduler_option: &Arc<RwLock<LeaderScheduler>>,
) -> Vec<(SocketAddr, Vec<u8>)>;
@ -74,6 +75,7 @@ pub trait WindowUtil {
pix: u64,
consume_queue: &mut Vec<Entry>,
consumed: &mut u64,
tick_height: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
);
@ -101,6 +103,7 @@ impl WindowUtil for Window {
times: usize,
consumed: u64,
received: u64,
tick_height: u64,
max_entry_height: u64,
leader_scheduler_option: &Arc<RwLock<LeaderScheduler>>,
) -> Vec<(SocketAddr, Vec<u8>)> {
@ -110,28 +113,30 @@ impl WindowUtil for Window {
let ls_lock = leader_scheduler_option.read().unwrap();
if !ls_lock.use_only_bootstrap_leader {
// Calculate the next leader rotation height and check if we are the leader
let next_leader_rotation_height = consumed + ls_lock.entries_until_next_leader_rotation(consumed).expect("Leader rotation should exist when not using default implementation of LeaderScheduler");
match ls_lock.get_scheduled_leader(next_leader_rotation_height) {
Some(leader_id) if leader_id == *id => is_next_leader = true,
// In the case that we are not in the current scope of the leader schedule
// window then either:
//
// 1) The replicate stage hasn't caught up to the "consumed" entries we sent,
// in which case it will eventually catch up
//
// 2) We are on the border between seed_rotation_intervals, so the
// schedule won't be known until the entry on that cusp is received
// by the replicate stage (which comes after this stage). Hence, the next
// leader at the beginning of that next epoch will not know he is the
// leader until he receives that last "cusp" entry. He also won't ask for repairs
// for that entry because "is_next_leader" won't be set here. In this case,
// everybody will be blocking waiting for that "cusp" entry instead of repairing,
// until the leader hits "times" >= the max times in calculate_max_repair().
// The impact of this, along with the similar problem from broadcast for the transitioning
// leader, can be observed in the multinode test, test_full_leader_validator_network(),
None => (),
_ => (),
if let Some(next_leader_rotation_height) =
ls_lock.max_height_for_leader(tick_height)
{
match ls_lock.get_scheduled_leader(next_leader_rotation_height) {
Some(leader_id) if leader_id == *id => is_next_leader = true,
// In the case that we are not in the current scope of the leader schedule
// window then either:
//
// 1) The replicate stage hasn't caught up to the "consumed" entries we sent,
// in which case it will eventually catch up
//
// 2) We are on the border between seed_rotation_intervals, so the
// schedule won't be known until the entry on that cusp is received
// by the replicate stage (which comes after this stage). Hence, the next
// leader at the beginning of that next epoch will not know he is the
// leader until he receives that last "cusp" entry. He also won't ask for repairs
// for that entry because "is_next_leader" won't be set here. In this case,
// everybody will be blocking waiting for that "cusp" entry instead of repairing,
// until the leader hits "times" >= the max times in calculate_max_repair().
// The impact of this, along with the similar problem from broadcast for the transitioning
// leader, can be observed in the multinode test, test_full_leader_validator_network(),
None => (),
_ => (),
}
}
}
}
@ -228,6 +233,7 @@ impl WindowUtil for Window {
pix: u64,
consume_queue: &mut Vec<Entry>,
consumed: &mut u64,
tick_height: &mut u64,
leader_unknown: bool,
pending_retransmits: &mut bool,
) {
@ -303,6 +309,9 @@ impl WindowUtil for Window {
// Check that we can get the entries from this blob
match reconstruct_entries_from_blobs(vec![k_data_blob]) {
Ok(entries) => {
for entry in &entries {
*tick_height += entry.is_tick() as u64;
}
consume_queue.extend(entries);
}
Err(_) => {

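The tick counting above leans on Entry::is_tick(), whose definition is not part of this diff. Based on how tick entries are constructed in poh_recorder.rs (an Entry with an empty transactions vector), its assumed behavior is roughly the sketch below; the real method lives in entry.rs.

// Assumed semantics, sketched for illustration only.
impl Entry {
    pub fn is_tick(&self) -> bool {
        // A tick carries no transactions; it only advances the PoH count.
        self.transactions.is_empty()
    }
}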
View File

@ -153,6 +153,7 @@ fn recv_window(
cluster_info: &Arc<RwLock<ClusterInfo>>,
consumed: &mut u64,
received: &mut u64,
tick_height: &mut u64,
max_ix: u64,
r: &BlobReceiver,
s: &EntrySender,
@ -231,6 +232,7 @@ fn recv_window(
pix,
&mut consume_queue,
consumed,
tick_height,
leader_unknown,
pending_retransmits,
);
@ -263,6 +265,7 @@ fn recv_window(
pub fn window_service(
cluster_info: Arc<RwLock<ClusterInfo>>,
window: SharedWindow,
tick_height: u64,
entry_height: u64,
max_entry_height: u64,
r: BlobReceiver,
@ -275,6 +278,7 @@ pub fn window_service(
Builder::new()
.name("solana-window".to_string())
.spawn(move || {
let mut tick_height_ = tick_height;
let mut consumed = entry_height;
let mut received = entry_height;
let mut last = entry_height;
@ -290,6 +294,7 @@ pub fn window_service(
&cluster_info,
&mut consumed,
&mut received,
&mut tick_height_,
max_entry_height,
&r,
&s,
@ -340,6 +345,7 @@ pub fn window_service(
times,
consumed,
received,
tick_height_,
max_entry_height,
&leader_scheduler,
);
@ -406,6 +412,7 @@ mod test {
win,
0,
0,
0,
r_reader,
s_window,
s_retransmit,
@ -468,6 +475,7 @@ mod test {
win,
0,
0,
0,
r_reader,
s_window,
s_retransmit,
@ -531,6 +539,7 @@ mod test {
win,
0,
0,
0,
r_reader,
s_window,
s_retransmit,

View File

@ -3,18 +3,14 @@
//! stdout, and then sends the Entry to its output channel.
use bank::Bank;
use budget_transaction::BudgetTransaction;
use cluster_info::ClusterInfo;
use counter::Counter;
use entry::Entry;
use leader_scheduler::LeaderScheduler;
use ledger::{Block, LedgerWriter};
use log::Level;
use result::{Error, Result};
use service::Service;
use signature::Keypair;
use solana_program_interface::pubkey::Pubkey;
use std::cmp;
use std::net::UdpSocket;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
@ -25,107 +21,19 @@ use streamer::responder;
use timing::{duration_as_ms, duration_as_s};
use vote_stage::send_leader_vote;
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum WriteStageReturnType {
LeaderRotation,
ChannelDisconnected,
}
pub struct WriteStage {
thread_hdls: Vec<JoinHandle<()>>,
write_thread: JoinHandle<WriteStageReturnType>,
write_thread: JoinHandle<()>,
}
impl WriteStage {
// Given a vector of potential new entries to write, return as many as we can
// fit before we hit the entry height for leader rotation. Also return a boolean
// reflecting whether we actually hit an entry height for leader rotation.
fn find_leader_rotation_index(
bank: &Arc<Bank>,
my_id: &Pubkey,
leader_scheduler: &mut LeaderScheduler,
entry_height: u64,
mut new_entries: Vec<Entry>,
) -> (Vec<Entry>, bool) {
// In the case we're using the default implementation of the leader scheduler,
// there won't ever be leader rotations at particular entry heights, so just
// return the entire input vector of entries
if leader_scheduler.use_only_bootstrap_leader {
return (new_entries, false);
}
let new_entries_length = new_entries.len();
// i is the number of entries to take
let mut i = 0;
let mut is_leader_rotation = false;
loop {
let next_leader = leader_scheduler.get_scheduled_leader(entry_height + i as u64);
if next_leader != Some(*my_id) {
is_leader_rotation = true;
break;
}
if i == new_entries_length {
break;
}
// Find out how many more entries we can squeeze in until the next leader
// rotation
let entries_until_leader_rotation = leader_scheduler.entries_until_next_leader_rotation(
entry_height + (i as u64)
).expect("Leader rotation should exist when not using default implementation of LeaderScheduler");
// Check the next leader rotation height entries in new_entries, or
// if new_entries doesn't have that many entries remaining,
// just check the rest of the new_entries_vector
let step = cmp::min(
entries_until_leader_rotation as usize,
new_entries_length - i,
);
// 1) Since "i" is the current number/count of items from the new_entries vector we
// have already checked, then "i" is also the INDEX into the new_entries vector of the
// next unprocessed entry. Hence this loop checks all entries between indexes:
// [entry_height + i, entry_height + i + step - 1], which is equivalent to the
// entry heights: [entry_height + i + 1, entry_height + i + step]
for (entry, new_entries_index) in new_entries[i..(i + step)].iter().zip(i..(i + step)) {
let votes = entry
.transactions
.iter()
.filter_map(BudgetTransaction::vote);
for (voter_id, _, _) in votes {
leader_scheduler
.push_vote(voter_id, entry_height + new_entries_index as u64 + 1);
}
// TODO: There's an issue here that the bank state may have updated
// while this entry was in flight from the BankingStage, which could cause
// the leader schedule, which is based on stake (tied to the bank account balances)
// right now, to be inconsistent with the rest of the network. Fix once
// we can pin PoH height to bank state
leader_scheduler.update_height(entry_height + new_entries_index as u64 + 1, bank);
}
i += step
}
new_entries.truncate(i as usize);
(new_entries, is_leader_rotation)
}
/// Process any Entry items that have been published by the RecordStage.
/// continuously send entries out
pub fn write_and_send_entries(
bank: &Arc<Bank>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
ledger_writer: &mut LedgerWriter,
entry_sender: &Sender<Vec<Entry>>,
entry_receiver: &Receiver<Vec<Entry>>,
entry_height: &mut u64,
leader_scheduler: &Arc<RwLock<LeaderScheduler>>,
) -> Result<()> {
let mut ventries = Vec::new();
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
@ -134,22 +42,8 @@ impl WriteStage {
let mut num_txs = 0;
loop {
// Find out how many more entries we can squeeze in until the next leader
// rotation
let (new_entries, is_leader_rotation) = Self::find_leader_rotation_index(
bank,
&cluster_info.read().unwrap().my_data().id,
&mut *leader_scheduler.write().unwrap(),
*entry_height + num_new_entries as u64,
received_entries,
);
num_new_entries += new_entries.len();
ventries.push(new_entries);
if is_leader_rotation {
break;
}
num_new_entries += received_entries.len();
ventries.push(received_entries);
if let Ok(n) = entry_receiver.try_recv() {
received_entries = n;
@ -175,9 +69,6 @@ impl WriteStage {
num_txs += e.transactions.len();
ledger_writer.write_entry_noflush(&e)?;
}
// Once the entries have been written to the ledger, then we can
// safely increment the entry height
*entry_height += entries.len() as u64;
inc_new_counter_info!("write_stage-write_entries", entries.len());
@ -217,8 +108,6 @@ impl WriteStage {
cluster_info: Arc<RwLock<ClusterInfo>>,
ledger_path: &str,
entry_receiver: Receiver<Vec<Entry>>,
entry_height: u64,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
) -> (Self, Receiver<Vec<Entry>>) {
let (vote_blob_sender, vote_blob_receiver) = channel();
let send = UdpSocket::bind("0.0.0.0:0").expect("bind");
@ -236,42 +125,16 @@ impl WriteStage {
let mut last_vote = 0;
let mut last_valid_validator_timestamp = 0;
let id = cluster_info.read().unwrap().id;
let mut entry_height_ = entry_height;
loop {
// Note that entry height is not zero indexed, it starts at 1, so the
// old leader is in power up to and including entry height
// n * leader_rotation_interval for some "n". Once we've forwarded
// that last block, check for the next scheduled leader.
match leader_scheduler
.read()
.unwrap()
.get_scheduled_leader(entry_height_)
{
Some(leader_id) if leader_id == id => (),
None => panic!(
"Scheduled leader id should never be unknown while processing entries"
),
_ => {
// If the leader is no longer in power, exit.
// When the broadcast stage has received the last blob, it
// will signal to close the fetch stage, which will in turn
// close down this write stage
return WriteStageReturnType::LeaderRotation;
}
}
if let Err(e) = Self::write_and_send_entries(
&bank,
&cluster_info,
&mut ledger_writer,
&entry_sender,
&entry_receiver,
&mut entry_height_,
&leader_scheduler,
) {
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) => {
return WriteStageReturnType::ChannelDisconnected
break;
}
Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (),
_ => {
@ -310,9 +173,9 @@ impl WriteStage {
}
impl Service for WriteStage {
type JoinReturnType = WriteStageReturnType;
type JoinReturnType = ();
fn join(self) -> thread::Result<WriteStageReturnType> {
fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
@ -320,409 +183,3 @@ impl Service for WriteStage {
self.write_thread.join()
}
}
#[cfg(test)]
mod tests {
use bank::Bank;
use cluster_info::{ClusterInfo, Node};
use entry::Entry;
use hash::Hash;
use leader_scheduler::{set_new_leader, LeaderScheduler, LeaderSchedulerConfig};
use ledger::{create_tmp_genesis, next_entries_mut, read_ledger};
use service::Service;
use signature::{Keypair, KeypairUtil};
use solana_program_interface::account::Account;
use solana_program_interface::pubkey::Pubkey;
use std::fs::remove_dir_all;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, RwLock};
use write_stage::{WriteStage, WriteStageReturnType};
struct DummyWriteStage {
write_stage: WriteStage,
entry_sender: Sender<Vec<Entry>>,
_write_stage_entry_receiver: Receiver<Vec<Entry>>,
bank: Arc<Bank>,
leader_ledger_path: String,
ledger_tail: Vec<Entry>,
leader_scheduler: Arc<RwLock<LeaderScheduler>>,
}
fn process_ledger(ledger_path: &str, bank: &Bank) -> (u64, Vec<Entry>) {
let entries = read_ledger(ledger_path, true).expect("opening ledger");
let entries = entries
.map(|e| e.unwrap_or_else(|err| panic!("failed to parse entry. error: {}", err)));
info!("processing ledger...");
bank.process_ledger(entries, &mut LeaderScheduler::default())
.expect("process_ledger")
}
fn setup_dummy_write_stage(
leader_keypair: Arc<Keypair>,
leader_scheduler_config: &LeaderSchedulerConfig,
test_name: &str,
) -> DummyWriteStage {
// Setup leader info
let leader_info = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
let cluster_info = Arc::new(RwLock::new(
ClusterInfo::new(leader_info.info).expect("ClusterInfo::new"),
));
let bank = Arc::new(Bank::default());
// Make a ledger
let (_, leader_ledger_path) = create_tmp_genesis(test_name, 10_000);
let (entry_height, ledger_tail) = process_ledger(&leader_ledger_path, &bank);
// Make a dummy pipe
let (entry_sender, entry_receiver) = channel();
// Make a dummy leader scheduler we can manipulate
let leader_scheduler = Arc::new(RwLock::new(LeaderScheduler::new(leader_scheduler_config)));
// Start up the write stage
let (write_stage, _write_stage_entry_receiver) = WriteStage::new(
leader_keypair,
bank.clone(),
cluster_info.clone(),
&leader_ledger_path,
entry_receiver,
entry_height,
leader_scheduler.clone(),
);
DummyWriteStage {
write_stage,
entry_sender,
// Need to keep this alive, otherwise the write_stage will detect ChannelClosed
// and shut down
_write_stage_entry_receiver,
bank,
leader_ledger_path,
ledger_tail,
leader_scheduler,
}
}
#[test]
fn test_write_stage_leader_rotation_exit() {
let leader_keypair = Keypair::new();
let leader_id = leader_keypair.pubkey();
// Make a dummy leader scheduler we can manipulate
let bootstrap_height = 20;
let leader_rotation_interval = 10;
let seed_rotation_interval = 2 * leader_rotation_interval;
let active_window = bootstrap_height + 3 * seed_rotation_interval;
let leader_scheduler_config = LeaderSchedulerConfig::new(
leader_keypair.pubkey(),
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(active_window),
);
let write_stage_info = setup_dummy_write_stage(
Arc::new(leader_keypair),
&leader_scheduler_config,
"test_write_stage_leader_rotation_exit",
);
let mut last_id = write_stage_info
.ledger_tail
.last()
.expect("Ledger should not be empty")
.id;
let mut num_hashes = 0;
let genesis_entry_height = write_stage_info.ledger_tail.len() as u64;
// Insert a nonzero balance and vote into the bank to make this node eligible
// for leader selection
write_stage_info
.leader_scheduler
.write()
.unwrap()
.push_vote(leader_id, 1);
let dummy_id = Keypair::new().pubkey();
let accounts = write_stage_info.bank.accounts();
let new_account = Account::new(1, 10, dummy_id.clone());
accounts
.write()
.unwrap()
.insert(leader_id, new_account.clone());
// Input enough entries to make exactly leader_rotation_interval entries, which will
// trigger a check for leader rotation. Because the next scheduled leader
// is ourselves, we won't exit
for _ in genesis_entry_height..bootstrap_height {
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
write_stage_info.entry_sender.send(new_entry).unwrap();
}
// Set the next scheduled next leader to some other node
{
let mut leader_scheduler = write_stage_info.leader_scheduler.write().unwrap();
set_new_leader(&write_stage_info.bank, &mut (*leader_scheduler), 1);
}
// Input enough dummy entries until the next seed_rotation_interval,
// The write_stage will see that it's no longer the leader after
// checking the schedule, and exit
for _ in 0..seed_rotation_interval {
let new_entry = next_entries_mut(&mut last_id, &mut num_hashes, vec![]);
write_stage_info.entry_sender.send(new_entry).unwrap();
}
assert_eq!(
write_stage_info.write_stage.join().unwrap(),
WriteStageReturnType::LeaderRotation
);
// Make sure the ledger contains exactly 2 * leader_rotation_interval entries
let (entry_height, _) =
process_ledger(&write_stage_info.leader_ledger_path, &write_stage_info.bank);
remove_dir_all(write_stage_info.leader_ledger_path).unwrap();
assert_eq!(entry_height, 2 * leader_rotation_interval);
}
fn make_leader_scheduler(
my_id: Pubkey,
bootstrap_height: u64,
leader_rotation_interval: u64,
seed_rotation_interval: u64,
active_window: u64,
) -> LeaderScheduler {
let leader_scheduler_config = LeaderSchedulerConfig::new(
my_id,
Some(bootstrap_height),
Some(leader_rotation_interval),
Some(seed_rotation_interval),
Some(active_window),
);
let mut leader_scheduler = LeaderScheduler::new(&leader_scheduler_config);
leader_scheduler.push_vote(my_id, 1);
leader_scheduler
}
#[test]
// Tests for when the leader across slots and epochs are the same
fn test_same_leader_index_calculation() {
// Set up a dummy node
let my_keypair = Arc::new(Keypair::new());
let my_id = my_keypair.pubkey();
// Set up a dummy bank
let bank = Arc::new(Bank::default());
let accounts = bank.accounts();
let dummy_id = Keypair::new().pubkey();
let new_account = Account::new(1, 10, dummy_id.clone());
accounts.write().unwrap().insert(my_id, new_account.clone());
let entry = Entry::new(&Hash::default(), 0, vec![]);
// Note: A slot is the period of leader_rotation_interval entries
// during which a leader is in power
let leader_rotation_interval = 10;
let bootstrap_height = 17;
let seed_rotation_interval = 3 * leader_rotation_interval;
let active_window = bootstrap_height + 3 * seed_rotation_interval;
let mut leader_scheduler = make_leader_scheduler(
my_id,
bootstrap_height,
leader_rotation_interval,
seed_rotation_interval,
active_window,
);
// Case 1: A vector that is completely within a certain slot should return that
// entire vector
let mut len = (leader_scheduler.bootstrap_height - 1) as usize;
let mut input = vec![entry.clone(); len];
let mut result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
0,
input.clone(),
);
assert_eq!(result, (input, false));
// Case 2: A vector of new entries that spans multiple slots should return the
// entire vector, assuming that the same leader is in power for all the slots.
len = 2 * seed_rotation_interval as usize;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
bootstrap_height - 1,
input.clone(),
);
assert_eq!(result, (input, false));
// Case 3: A vector that triggers a check for leader rotation should return
// the entire vector and signal leader_rotation == false, if the
// same leader is in power for the next slot as well.
let mut leader_scheduler = make_leader_scheduler(
my_id,
bootstrap_height,
leader_rotation_interval,
seed_rotation_interval,
active_window,
);
len = 1;
input = vec![entry.clone(); len];
result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
bootstrap_height - 1,
input.clone(),
);
assert_eq!(result, (input.clone(), false));
result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
bootstrap_height + seed_rotation_interval - 1,
input.clone(),
);
assert_eq!(result, (input, false));
}
// Tests for when the leader across slots / epochs are different
#[test]
fn test_different_leader_index_calculation() {
// Set up a dummy node
let my_keypair = Arc::new(Keypair::new());
let my_id = my_keypair.pubkey();
// Set up a dummy bank
let bank = Arc::new(Bank::default());
let entry = Entry::new(&Hash::default(), 0, vec![]);
// Note: A slot is the period of leader_rotation_interval entries
// during which a leader is in power
let leader_rotation_interval = 10;
let bootstrap_height = 17;
let seed_rotation_interval = 3 * leader_rotation_interval;
let active_window = 1;
let swap_entry_height = bootstrap_height + 2 * seed_rotation_interval;
// Case 1: A vector that spans different epochs for different leaders
// should get truncated
// Set the leader scheduler
let mut leader_scheduler = make_leader_scheduler(
my_id,
bootstrap_height,
leader_rotation_interval,
seed_rotation_interval,
active_window,
);
set_new_leader(&bank, &mut leader_scheduler, swap_entry_height);
// Start test
let mut start_height = bootstrap_height - 1;
let extra_len = leader_rotation_interval;
let expected_len = swap_entry_height - start_height;
let mut len = expected_len + extra_len;
let mut input = vec![entry.clone(); len as usize];
let mut result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
start_height,
input.clone(),
);
input.truncate(expected_len as usize);
assert_eq!(result, (input, true));
// Case 2: Start at entry height == the height where the next leader is elected, should
// return no entries
len = 1;
input = vec![entry.clone(); len as usize];
result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
swap_entry_height,
input.clone(),
);
assert_eq!(result, (vec![], true));
// Case 3: A vector that lands one before leader rotation happens should not be
// truncated, and should signal leader rotation == false
// Reset the leader scheduler
leader_scheduler = make_leader_scheduler(
my_id,
bootstrap_height,
leader_rotation_interval,
seed_rotation_interval,
active_window,
);
set_new_leader(&bank, &mut leader_scheduler, swap_entry_height);
// Start test
start_height = bootstrap_height - 1;
let len_remaining = swap_entry_height - start_height;
len = len_remaining - 1;
input = vec![entry.clone(); len as usize];
result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
start_height,
input.clone(),
);
assert_eq!(result, (input, false));
// Case 4: A vector that lands exactly where leader rotation happens should not be
// truncated, but should return leader rotation == true
// Reset the leader scheduler
leader_scheduler = make_leader_scheduler(
my_id,
bootstrap_height,
leader_rotation_interval,
seed_rotation_interval,
active_window,
);
set_new_leader(&bank, &mut leader_scheduler, swap_entry_height);
// Generate the schedule
leader_scheduler.update_height(bootstrap_height, &bank);
// Start test
start_height = bootstrap_height + leader_rotation_interval - 1;
len = swap_entry_height - start_height;
input = vec![entry.clone(); len as usize];
result = WriteStage::find_leader_rotation_index(
&bank,
&my_id,
&mut leader_scheduler,
start_height,
input.clone(),
);
assert_eq!(result, (input, true));
}
}

View File

@ -774,10 +774,8 @@ fn test_leader_to_validator_transition() {
logger::setup();
let leader_rotation_interval = 20;
// Make a dummy validator id to be the next leader and
// sink for this test's mock transactions
// Make a dummy validator id to be the next leader
let validator_keypair = Keypair::new();
let validator_id = validator_keypair.pubkey();
// Create the leader node information
let leader_keypair = Keypair::new();
@ -786,8 +784,12 @@ fn test_leader_to_validator_transition() {
// Initialize the leader ledger. Make a mint and a genesis entry
// in the leader ledger
let (mint, leader_ledger_path, genesis_entries) =
create_tmp_sample_ledger("test_leader_to_validator_transition", 10_000);
let num_ending_ticks = 1;
let (mint, leader_ledger_path, genesis_entries) = create_tmp_sample_ledger(
"test_leader_to_validator_transition",
10_000,
num_ending_ticks,
);
let last_id = genesis_entries
.last()
@ -798,10 +800,8 @@ fn test_leader_to_validator_transition() {
// after the bootstrap height
let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
let bootstrap_entries =
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id);
let bootstrap_entries_len = bootstrap_entries.len();
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0);
ledger_writer.write_entries(bootstrap_entries).unwrap();
let ledger_initial_len = (genesis_entries.len() + bootstrap_entries_len) as u64;
// Start the leader node
let bootstrap_height = leader_rotation_interval;
@ -842,39 +842,25 @@ fn test_leader_to_validator_transition() {
assert!(converged);
let extra_transactions = std::cmp::max(bootstrap_height / 3, 1);
// Account that will be the sink for all the test's transactions
let bob_pubkey = Keypair::new().pubkey();
// Push leader "extra_transactions" past bootstrap_height,
// make sure the leader stops.
assert!(ledger_initial_len < bootstrap_height);
for i in ledger_initial_len..(bootstrap_height + extra_transactions) {
if i < bootstrap_height {
// Poll to see that the bank state is updated after every transaction
// to ensure that each transaction is packaged as a single entry,
// so that we can be sure leader rotation is triggered
let expected_balance = std::cmp::min(
bootstrap_height - ledger_initial_len,
i - ledger_initial_len + 1,
);
let result = send_tx_and_retry_get_balance(
&leader_info,
&mint,
&validator_id,
1,
Some(expected_balance as i64),
);
// Push transactions until we detect an exit
let mut i = 1;
loop {
// Poll to see that the bank state is updated after every transaction
// to ensure that each transaction is packaged as a single entry,
// so that we can be sure leader rotation is triggered
let result =
send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, Some(i as i64));
// If the transaction wasn't reflected in the node, then we assume
// the node has transitioned already
if result != Some(expected_balance as i64) {
break;
}
} else {
// After bootstrap entries, we don't care about the
// number of entries generated by these transactions. These are just
// here for testing to make sure the leader stopped at the correct point.
send_tx_and_retry_get_balance(&leader_info, &mint, &validator_id, 1, None);
// If the transaction wasn't reflected in the node, then we assume
// the node has transitioned already
if result != Some(i as i64) {
break;
}
i += 1;
}
// Wait for leader to shut down tpu and restart tvu
@ -883,25 +869,22 @@ fn test_leader_to_validator_transition() {
_ => panic!("Expected reason for exit to be leader rotation"),
}
// Query now validator to make sure that he has the proper balances in his bank
// after the transitions, even though we submitted "extra_transactions"
// transactions earlier
// Query newly transitioned validator to make sure that he has the proper balances in
// the bank after the transitions
let mut leader_client = mk_client(&leader_info);
let maximum_bal = bootstrap_height - ledger_initial_len;
let bal = leader_client
.poll_get_balance(&validator_id)
.expect("Expected success when polling newly transitioned validator for balance")
as u64;
assert!(bal <= maximum_bal);
// Leader could have executed transactions in bank but not recorded them, so
// we only have an upper bound on the balance
if let Ok(bal) = leader_client.poll_get_balance(&bob_pubkey) {
assert!(bal <= i - 1);
}
// Check the ledger to make sure it's the right height, we should've
// transitioned after the bootstrap_height entry
let (_, entry_height, _) =
// transitioned after tick_height == bootstrap_height
let (_, tick_height, _, _) =
Fullnode::new_bank_from_ledger(&leader_ledger_path, &mut LeaderScheduler::default());
assert_eq!(entry_height, bootstrap_height);
assert_eq!(tick_height, bootstrap_height);
// Shut down
ncp.close().unwrap();
@ -927,8 +910,9 @@ fn test_leader_validator_basic() {
let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
// Make a common mint and a genesis entry for both leader + validator ledgers
let num_ending_ticks = 1;
let (mint, leader_ledger_path, genesis_entries) =
create_tmp_sample_ledger("test_leader_to_validator_transition", 10_000);
create_tmp_sample_ledger("test_leader_validator_basic", 10_000, num_ending_ticks);
let validator_ledger_path = tmp_copy_ledger(&leader_ledger_path, "test_leader_validator_basic");
@ -945,10 +929,9 @@ fn test_leader_validator_basic() {
// Write the bootstrap entries to the ledger that will cause leader rotation
// after the bootstrap height
let mut ledger_writer = LedgerWriter::open(&leader_ledger_path, false).unwrap();
let bootstrap_entries =
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id);
let ledger_initial_len = (genesis_entries.len() + bootstrap_entries.len()) as u64;
ledger_writer.write_entries(bootstrap_entries).unwrap();
let active_set_entries =
make_active_set_entries(&validator_keypair, &mint.keypair(), &last_id, &last_id, 0);
ledger_writer.write_entries(active_set_entries).unwrap();
// Create the leader scheduler config
let num_bootstrap_slots = 2;
@ -985,22 +968,21 @@ fn test_leader_validator_basic() {
let servers = converge(&leader_info, 2);
assert_eq!(servers.len(), 2);
// Send transactions to the leader
let extra_transactions = std::cmp::max(leader_rotation_interval / 3, 1);
// Push "extra_transactions" past leader_rotation_interval entry height,
// make sure the validator stops.
for i in ledger_initial_len..(bootstrap_height + extra_transactions) {
let expected_balance = std::cmp::min(
bootstrap_height - ledger_initial_len,
i - ledger_initial_len + 1,
);
// Push transactions until we detect the nodes exit
let mut i = 1;
loop {
// Poll to see that the bank state is updated after every transaction
// to ensure that each transaction is packaged as a single entry,
// so that we can be sure leader rotation is triggered
let result = send_tx_and_retry_get_balance(&leader_info, &mint, &bob_pubkey, 1, None);
// If the transaction wasn't reflected in the node, then we assume
// the node has transitioned already
if result != Some(expected_balance as i64) {
if result != Some(i as i64) {
break;
}
i += 1;
}
// Wait for validator to shut down tvu and restart tpu
@ -1015,6 +997,16 @@ fn test_leader_validator_basic() {
_ => panic!("Expected reason for exit to be leader rotation"),
}
// Query newly transitioned validator to make sure that he has the proper balances
// in the bank after the transitions
let mut leader_client = mk_client(&leader_info);
// Leader could have executed transactions in bank but not recorded them, so
// we only have an upper bound on the balance
if let Ok(bal) = leader_client.poll_get_balance(&bob_pubkey) {
assert!(bal <= i - 1);
}
// Shut down
validator.close().unwrap();
leader.close().unwrap();
@ -1082,8 +1074,9 @@ fn test_dropped_handoff_recovery() {
let bootstrap_leader_info = bootstrap_leader_node.info.clone();
// Make a common mint and a genesis entry for both leader + validator's ledgers
let num_ending_ticks = 1;
let (mint, bootstrap_leader_ledger_path, genesis_entries) =
create_tmp_sample_ledger("test_dropped_handoff_recovery", 10_000);
create_tmp_sample_ledger("test_dropped_handoff_recovery", 10_000, num_ending_ticks);
let last_id = genesis_entries
.last()
@ -1101,15 +1094,12 @@ fn test_dropped_handoff_recovery() {
// Make the entries to give the next_leader validator some stake so that he will be in
// leader election active set
let first_entries =
make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id, &last_id);
let first_entries_len = first_entries.len();
let active_set_entries =
make_active_set_entries(&next_leader_keypair, &mint.keypair(), &last_id, &last_id, 0);
// Write the entries
let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap();
ledger_writer.write_entries(first_entries).unwrap();
let ledger_initial_len = (genesis_entries.len() + first_entries_len) as u64;
ledger_writer.write_entries(active_set_entries).unwrap();
let next_leader_ledger_path = tmp_copy_ledger(
&bootstrap_leader_ledger_path,
@ -1119,10 +1109,13 @@ fn test_dropped_handoff_recovery() {
ledger_paths.push(next_leader_ledger_path.clone());
// Create the common leader scheduling configuration
let initial_tick_height = genesis_entries
.iter()
.fold(0, |tick_count, entry| tick_count + entry.is_tick() as u64);
let num_slots_per_epoch = (N + 1) as u64;
let leader_rotation_interval = 5;
let seed_rotation_interval = num_slots_per_epoch * leader_rotation_interval;
let bootstrap_height = ledger_initial_len + 1;
let bootstrap_height = initial_tick_height + 1;
let leader_scheduler_config = LeaderSchedulerConfig::new(
bootstrap_leader_info.id,
Some(bootstrap_height),
@ -1227,8 +1220,12 @@ fn test_full_leader_validator_network() {
}
// Make a common mint and a genesis entry for both leader + validator's ledgers
let (mint, bootstrap_leader_ledger_path, genesis_entries) =
create_tmp_sample_ledger("test_full_leader_validator_network", 10_000);
let num_ending_ticks = 1;
let (mint, bootstrap_leader_ledger_path, genesis_entries) = create_tmp_sample_ledger(
"test_full_leader_validator_network",
10_000,
num_ending_ticks,
);
let last_tick_id = genesis_entries
.last()
@ -1251,8 +1248,13 @@ fn test_full_leader_validator_network() {
for node_keypair in node_keypairs.iter() {
// Make entries to give the validator some stake so that he will be in
// leader election active set
let bootstrap_entries =
make_active_set_entries(node_keypair, &mint.keypair(), &last_entry_id, &last_tick_id);
let bootstrap_entries = make_active_set_entries(
node_keypair,
&mint.keypair(),
&last_entry_id,
&last_tick_id,
0,
);
// Write the entries
let mut ledger_writer = LedgerWriter::open(&bootstrap_leader_ledger_path, false).unwrap();