Add a cache for leader schedules (#3841)

* Add a cache for leader schedules
This commit is contained in:
carllin 2019-04-19 02:39:44 -07:00 committed by GitHub
parent 0f88872650
commit 512bfc93cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 568 additions and 254 deletions

View File

@ -10,6 +10,7 @@ use solana::banking_stage::{create_test_recorder, BankingStage};
use solana::blocktree::{get_tmp_ledger_path, Blocktree};
use solana::cluster_info::ClusterInfo;
use solana::cluster_info::Node;
use solana::leader_schedule_cache::LeaderScheduleCache;
use solana::packet::to_packets_chunked;
use solana::poh_recorder::WorkingBankEntries;
use solana::service::Service;
@ -57,6 +58,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let bank = Arc::new(Bank::new(&genesis_block));
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let dummy = system_transaction::transfer(
&mint_keypair,
&mint_keypair.pubkey(),
@ -122,6 +124,7 @@ fn bench_banking_stage_multi_accounts(bencher: &mut Bencher) {
&poh_recorder,
verified_receiver,
vote_receiver,
&leader_schedule_cache,
);
poh_recorder.lock().unwrap().set_bank(&bank);
@ -164,6 +167,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let bank = Arc::new(Bank::new(&genesis_block));
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let dummy = system_transaction::transfer(
&mint_keypair,
&mint_keypair.pubkey(),
@ -245,6 +249,7 @@ fn bench_banking_stage_multi_programs(bencher: &mut Bencher) {
&poh_recorder,
verified_receiver,
vote_receiver,
&leader_schedule_cache,
);
poh_recorder.lock().unwrap().set_bank(&bank);

View File

@ -6,7 +6,7 @@ use crate::cluster_info::ClusterInfo;
use crate::contact_info::ContactInfo;
use crate::entry;
use crate::entry::{hash_transactions, Entry};
use crate::leader_schedule_utils;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::packet;
use crate::packet::{Packet, Packets};
use crate::poh_recorder::{PohRecorder, PohRecorderError, WorkingBankEntries};
@ -56,6 +56,7 @@ impl BankingStage {
poh_recorder: &Arc<Mutex<PohRecorder>>,
verified_receiver: Receiver<VerifiedPackets>,
verified_vote_receiver: Receiver<VerifiedPackets>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> Self {
Self::new_num_threads(
cluster_info,
@ -63,6 +64,7 @@ impl BankingStage {
verified_receiver,
verified_vote_receiver,
cmp::min(2, Self::num_threads()),
leader_schedule_cache,
)
}
@ -72,6 +74,7 @@ impl BankingStage {
verified_receiver: Receiver<VerifiedPackets>,
verified_vote_receiver: Receiver<VerifiedPackets>,
num_threads: u32,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> Self {
let verified_receiver = Arc::new(Mutex::new(verified_receiver));
let verified_vote_receiver = Arc::new(Mutex::new(verified_vote_receiver));
@ -95,6 +98,7 @@ impl BankingStage {
let cluster_info = cluster_info.clone();
let exit = exit.clone();
let mut recv_start = Instant::now();
let leader_schedule_cache = leader_schedule_cache.clone();
Builder::new()
.name("solana-banking-stage-tx".to_string())
.spawn(move || {
@ -104,6 +108,7 @@ impl BankingStage {
&cluster_info,
&mut recv_start,
enable_forwarding,
leader_schedule_cache,
);
exit.store(true, Ordering::Relaxed);
})
@ -238,6 +243,7 @@ impl BankingStage {
fn should_buffer_packets(
poh_recorder: &Arc<Mutex<PohRecorder>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> bool {
let rcluster_info = cluster_info.read().unwrap();
@ -245,9 +251,9 @@ impl BankingStage {
// or, if it was getting sent to me
// or, the next leader is unknown
let leader_id = match poh_recorder.lock().unwrap().bank() {
Some(bank) => {
leader_schedule_utils::slot_leader_at(bank.slot() + 1, &bank).unwrap_or_default()
}
Some(bank) => leader_schedule_cache
.slot_leader_at_else_compute(bank.slot() + 1, &bank)
.unwrap_or_default(),
None => rcluster_info
.leader_data()
.map_or(rcluster_info.id(), |x| x.id),
@ -262,6 +268,7 @@ impl BankingStage {
cluster_info: &Arc<RwLock<ClusterInfo>>,
recv_start: &mut Instant,
enable_forwarding: bool,
leader_schedule_cache: Arc<LeaderScheduleCache>,
) {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let mut buffered_packets = vec![];
@ -292,7 +299,11 @@ impl BankingStage {
{
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
Ok(unprocessed_packets) => {
if Self::should_buffer_packets(poh_recorder, cluster_info) {
if Self::should_buffer_packets(
poh_recorder,
cluster_info,
&leader_schedule_cache,
) {
let num = unprocessed_packets
.iter()
.map(|(x, start, _)| x.packets.len().saturating_sub(*start))
@ -618,6 +629,7 @@ pub fn create_test_recorder(
bank.ticks_per_slot(),
&Pubkey::default(),
blocktree,
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
poh_recorder.set_bank(&bank);
@ -647,6 +659,7 @@ mod tests {
fn test_banking_stage_shutdown1() {
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
let ledger_path = get_tmp_ledger_path!();
@ -663,6 +676,7 @@ mod tests {
&poh_recorder,
verified_receiver,
vote_receiver,
&leader_schedule_cache,
);
drop(verified_sender);
drop(vote_sender);
@ -679,6 +693,7 @@ mod tests {
let (mut genesis_block, _mint_keypair) = GenesisBlock::new(2);
genesis_block.ticks_per_slot = 4;
let bank = Arc::new(Bank::new(&genesis_block));
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
@ -696,6 +711,7 @@ mod tests {
&poh_recorder,
verified_receiver,
vote_receiver,
&leader_schedule_cache,
);
trace!("sending bank");
sleep(Duration::from_millis(600));
@ -724,6 +740,7 @@ mod tests {
solana_logger::setup();
let (genesis_block, mint_keypair) = GenesisBlock::new(10);
let bank = Arc::new(Bank::new(&genesis_block));
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let start_hash = bank.last_blockhash();
let (verified_sender, verified_receiver) = channel();
let (vote_sender, vote_receiver) = channel();
@ -741,6 +758,7 @@ mod tests {
&poh_recorder,
verified_receiver,
vote_receiver,
&leader_schedule_cache,
);
// fund another account so we can send 2 good transactions in a single batch.
@ -862,6 +880,7 @@ mod tests {
let entry_receiver = {
// start a banking_stage to eat verified receiver
let bank = Arc::new(Bank::new(&genesis_block));
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let blocktree = Arc::new(
Blocktree::open(&ledger_path)
.expect("Expected to be able to open database ledger"),
@ -877,6 +896,7 @@ mod tests {
verified_receiver,
vote_receiver,
2,
&leader_schedule_cache,
);
// wait for banking_stage to eat the packets
@ -933,6 +953,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
@ -1040,6 +1061,7 @@ mod tests {
bank.ticks_per_slot(),
&pubkey,
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));

View File

@ -1,7 +1,7 @@
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::entry::{Entry, EntrySlice};
use crate::leader_schedule_utils;
use crate::leader_schedule_cache::LeaderScheduleCache;
use rayon::prelude::*;
use solana_metrics::counter::Counter;
use solana_runtime::bank::Bank;
@ -116,10 +116,9 @@ pub fn process_blocktree(
genesis_block: &GenesisBlock,
blocktree: &Blocktree,
account_paths: Option<String>,
) -> result::Result<(BankForks, Vec<BankForksInfo>), BlocktreeProcessorError> {
) -> result::Result<(BankForks, Vec<BankForksInfo>, LeaderScheduleCache), BlocktreeProcessorError> {
let now = Instant::now();
info!("processing ledger...");
// Setup bank for slot 0
let mut pending_slots = {
let slot = 0;
@ -139,6 +138,8 @@ pub fn process_blocktree(
vec![(slot, meta, bank, entry_height, last_entry_hash)]
};
let leader_schedule_cache = LeaderScheduleCache::new(*pending_slots[0].2.epoch_schedule());
let mut fork_info = vec![];
while !pending_slots.is_empty() {
let (slot, meta, bank, mut entry_height, mut last_entry_hash) =
@ -216,7 +217,9 @@ pub fn process_blocktree(
if next_meta.is_full() {
let next_bank = Arc::new(Bank::new_from_parent(
&bank,
&leader_schedule_utils::slot_leader_at(next_slot, &bank).unwrap(),
&leader_schedule_cache
.slot_leader_at_else_compute(next_slot, &bank)
.unwrap(),
next_slot,
));
trace!("Add child bank for slot={}", next_slot);
@ -250,7 +253,7 @@ pub fn process_blocktree(
bank_forks_info.len(),
);
Ok((bank_forks, bank_forks_info))
Ok((bank_forks, bank_forks_info, leader_schedule_cache))
}
#[cfg(test)]
@ -327,7 +330,7 @@ mod tests {
// slot 2, points at slot 1
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 2, 1, blockhash);
let (mut _bank_forks, bank_forks_info) =
let (mut _bank_forks, bank_forks_info, _) =
process_blocktree(&genesis_block, &blocktree, None).unwrap();
assert_eq!(bank_forks_info.len(), 1);
@ -387,7 +390,7 @@ mod tests {
blocktree.set_root(0).unwrap();
blocktree.set_root(1).unwrap();
let (bank_forks, bank_forks_info) =
let (bank_forks, bank_forks_info, _) =
process_blocktree(&genesis_block, &blocktree, None).unwrap();
assert_eq!(bank_forks_info.len(), 2); // There are two forks
@ -537,7 +540,7 @@ mod tests {
.write_entries(1, 0, 0, genesis_block.ticks_per_slot, &entries)
.unwrap();
let entry_height = genesis_block.ticks_per_slot + entries.len() as u64;
let (bank_forks, bank_forks_info) =
let (bank_forks, bank_forks_info, _) =
process_blocktree(&genesis_block, &blocktree, None).unwrap();
assert_eq!(bank_forks_info.len(), 1);
@ -562,7 +565,7 @@ mod tests {
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
let blocktree = Blocktree::open(&ledger_path).unwrap();
let (bank_forks, bank_forks_info) =
let (bank_forks, bank_forks_info, _) =
process_blocktree(&genesis_block, &blocktree, None).unwrap();
assert_eq!(bank_forks_info.len(), 1);

View File

@ -9,8 +9,7 @@ use crate::entry::create_ticks;
use crate::entry::next_entry_mut;
use crate::entry::Entry;
use crate::gossip_service::{discover_nodes, GossipService};
use crate::leader_schedule_utils;
use crate::poh_recorder::PohRecorder;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::poh_service::{PohService, PohServiceConfig};
use crate::rpc::JsonRpcConfig;
use crate::rpc_pubsub_service::PubSubService;
@ -20,6 +19,8 @@ use crate::service::Service;
use crate::storage_stage::StorageState;
use crate::tpu::Tpu;
use crate::tvu::{Sockets, Tvu};
use crate::poh_recorder::PohRecorder;
use solana_metrics::counter::Counter;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
@ -94,9 +95,10 @@ impl Fullnode {
let id = keypair.pubkey();
assert_eq!(id, node.info.id);
let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) =
new_banks_from_blocktree(ledger_path, config.account_paths.clone());
let leader_schedule_cache = Arc::new(leader_schedule_cache);
let exit = Arc::new(AtomicBool::new(false));
let bank_info = &bank_forks_info[0];
let bank = bank_forks[bank_info.bank_slot].clone();
@ -107,15 +109,17 @@ impl Fullnode {
bank.last_blockhash(),
);
let blocktree = Arc::new(blocktree);
let (poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
leader_schedule_utils::next_leader_slot(&id, bank.slot(), &bank, Some(&blocktree)),
leader_schedule_cache.next_leader_slot(&id, bank.slot(), &bank, Some(&blocktree)),
bank.ticks_per_slot(),
&id,
&blocktree,
blocktree.new_blobs_signals.first().cloned(),
&leader_schedule_cache,
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit);
@ -231,6 +235,7 @@ impl Fullnode {
&poh_recorder,
sender.clone(),
receiver,
&leader_schedule_cache,
&exit,
);
let tpu = Tpu::new(
@ -244,6 +249,7 @@ impl Fullnode {
config.sigverify_disabled,
&blocktree,
sender,
&leader_schedule_cache,
&exit,
);
@ -275,14 +281,20 @@ impl Fullnode {
pub fn new_banks_from_blocktree(
blocktree_path: &str,
account_paths: Option<String>,
) -> (BankForks, Vec<BankForksInfo>, Blocktree, Receiver<bool>) {
) -> (
BankForks,
Vec<BankForksInfo>,
Blocktree,
Receiver<bool>,
LeaderScheduleCache,
) {
let genesis_block =
GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block");
let (blocktree, ledger_signal_receiver) = Blocktree::open_with_signal(blocktree_path)
.expect("Expected to successfully open database ledger");
let (bank_forks, bank_forks_info) =
let (bank_forks, bank_forks_info, leader_schedule_cache) =
blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths)
.expect("process_blocktree failed");
@ -291,6 +303,7 @@ pub fn new_banks_from_blocktree(
bank_forks_info,
blocktree,
ledger_signal_receiver,
leader_schedule_cache,
)
}

View File

@ -5,7 +5,7 @@ use solana_sdk::pubkey::Pubkey;
use std::ops::Index;
/// Stake-weighted leader schedule for one epoch.
#[derive(Debug, PartialEq)]
#[derive(Debug, Default, PartialEq)]
pub struct LeaderSchedule {
slot_leaders: Vec<Pubkey>,
}

View File

@ -0,0 +1,405 @@
use crate::blocktree::Blocktree;
use crate::leader_schedule::LeaderSchedule;
use crate::leader_schedule_utils;
use solana_runtime::bank::{Bank, EpochSchedule};
use solana_sdk::pubkey::Pubkey;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock};
type CachedSchedules = (HashMap<u64, Arc<LeaderSchedule>>, VecDeque<u64>);
const MAX_SCHEDULES: usize = 10;
#[derive(Default)]
pub struct LeaderScheduleCache {
// Map from an epoch to a leader schedule for that epoch
pub cached_schedules: RwLock<CachedSchedules>,
epoch_schedule: EpochSchedule,
}
impl LeaderScheduleCache {
pub fn new_from_bank(bank: &Bank) -> Self {
Self::new(*bank.epoch_schedule())
}
pub fn new(epoch_schedule: EpochSchedule) -> Self {
Self {
cached_schedules: RwLock::new((HashMap::new(), VecDeque::new())),
epoch_schedule,
}
}
pub fn slot_leader_at(&self, slot: u64) -> Option<Pubkey> {
let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot);
self.cached_schedules
.read()
.unwrap()
.0
.get(&epoch)
.map(|schedule| schedule[slot_index])
}
pub fn slot_leader_at_else_compute(&self, slot: u64, bank: &Bank) -> Option<Pubkey> {
let cache_result = self.slot_leader_at(slot);
if cache_result.is_some() {
cache_result
} else {
let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot);
if let Some(epoch_schedule) = self.compute_epoch_schedule(epoch, bank) {
Some(epoch_schedule[slot_index])
} else {
None
}
}
}
/// Return the next slot after the given current_slot that the given node will be leader
pub fn next_leader_slot(
&self,
pubkey: &Pubkey,
mut current_slot: u64,
bank: &Bank,
blocktree: Option<&Blocktree>,
) -> Option<u64> {
let (mut epoch, mut start_index) = bank.get_epoch_and_slot_index(current_slot + 1);
while let Some(leader_schedule) = self.get_epoch_schedule_else_compute(epoch, bank) {
// clippy thinks I should do this:
// for (i, <item>) in leader_schedule
// .iter()
// .enumerate()
// .take(bank.get_slots_in_epoch(epoch))
// .skip(from_slot_index + 1) {
//
// but leader_schedule doesn't implement Iter...
#[allow(clippy::needless_range_loop)]
for i in start_index..bank.get_slots_in_epoch(epoch) {
current_slot += 1;
if *pubkey == leader_schedule[i] {
if let Some(blocktree) = blocktree {
if let Some(meta) = blocktree.meta(current_slot).unwrap() {
// We have already sent a blob for this slot, so skip it
if meta.received > 0 {
continue;
}
}
}
return Some(current_slot);
}
}
epoch += 1;
start_index = 0;
}
None
}
fn get_epoch_schedule_else_compute(
&self,
epoch: u64,
bank: &Bank,
) -> Option<Arc<LeaderSchedule>> {
let epoch_schedule = self.cached_schedules.read().unwrap().0.get(&epoch).cloned();
if epoch_schedule.is_some() {
epoch_schedule
} else if let Some(epoch_schedule) = self.compute_epoch_schedule(epoch, bank) {
Some(epoch_schedule)
} else {
None
}
}
fn compute_epoch_schedule(&self, epoch: u64, bank: &Bank) -> Option<Arc<LeaderSchedule>> {
let leader_schedule = leader_schedule_utils::leader_schedule(epoch, bank);
leader_schedule.map(|leader_schedule| {
let leader_schedule = Arc::new(leader_schedule);
let (ref mut cached_schedules, ref mut order) = *self.cached_schedules.write().unwrap();
// Check to see if schedule exists in case somebody already inserted in the time we were
// waiting for the lock
let entry = cached_schedules.entry(epoch);
if let Entry::Vacant(v) = entry {
v.insert(leader_schedule.clone());
order.push_back(epoch);
Self::retain_latest(cached_schedules, order);
}
leader_schedule
})
}
fn retain_latest(schedules: &mut HashMap<u64, Arc<LeaderSchedule>>, order: &mut VecDeque<u64>) {
if schedules.len() > MAX_SCHEDULES {
let first = order.pop_front().unwrap();
schedules.remove(&first);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::blocktree::tests::make_slot_entries;
use crate::voting_keypair::tests::new_vote_account;
use solana_runtime::bank::{Bank, EpochSchedule};
use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS};
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread::Builder;
use crate::blocktree::get_tmp_ledger_path;
#[test]
fn test_slot_leader_at_else_compute() {
let slots_per_epoch = 10;
let epoch_schedule = EpochSchedule::new(slots_per_epoch, slots_per_epoch / 2, true);
let cache = LeaderScheduleCache::new(epoch_schedule);
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Bank::new(&genesis_block);
// Nothing in the cache, should return None
assert!(cache.slot_leader_at(bank.slot()).is_none());
// Add something to the cache
assert!(cache
.slot_leader_at_else_compute(bank.slot(), &bank)
.is_some());
assert!(cache.slot_leader_at(bank.slot()).is_some());
assert_eq!(cache.cached_schedules.read().unwrap().0.len(), 1);
}
#[test]
fn test_retain_latest() {
let mut cached_schedules = HashMap::new();
let mut order = VecDeque::new();
for i in 0..=MAX_SCHEDULES {
cached_schedules.insert(i as u64, Arc::new(LeaderSchedule::default()));
order.push_back(i as u64);
}
LeaderScheduleCache::retain_latest(&mut cached_schedules, &mut order);
assert_eq!(cached_schedules.len(), MAX_SCHEDULES);
let mut keys: Vec<_> = cached_schedules.keys().cloned().collect();
keys.sort();
let expected: Vec<_> = (1..=MAX_SCHEDULES as u64).collect();
let expected_order: VecDeque<_> = (1..=MAX_SCHEDULES as u64).collect();
assert_eq!(expected, keys);
assert_eq!(expected_order, order);
}
#[test]
fn test_thread_race_leader_schedule_cache() {
let num_runs = 10;
for _ in 0..num_runs {
run_thread_race()
}
}
fn run_thread_race() {
let slots_per_epoch = 10;
let epoch_schedule = EpochSchedule::new(slots_per_epoch, slots_per_epoch / 2, true);
let cache = Arc::new(LeaderScheduleCache::new(epoch_schedule));
let (genesis_block, _mint_keypair) = GenesisBlock::new(2);
let bank = Arc::new(Bank::new(&genesis_block));
let num_threads = 10;
let (threads, senders): (Vec<_>, Vec<_>) = (0..num_threads)
.map(|_| {
let cache = cache.clone();
let bank = bank.clone();
let (sender, receiver) = channel();
(
Builder::new()
.name("test_thread_race_leader_schedule_cache".to_string())
.spawn(move || {
let _ = receiver.recv();
cache.slot_leader_at_else_compute(bank.slot(), &bank);
})
.unwrap(),
sender,
)
})
.unzip();
for sender in &senders {
sender.send(true).unwrap();
}
for t in threads.into_iter() {
t.join().unwrap();
}
let (ref cached_schedules, ref order) = *cache.cached_schedules.read().unwrap();
assert_eq!(cached_schedules.len(), 1);
assert_eq!(order.len(), 1);
}
#[test]
fn test_next_leader_slot() {
let pubkey = Pubkey::new_rand();
let mut genesis_block = GenesisBlock::new_with_leader(
BOOTSTRAP_LEADER_LAMPORTS,
&pubkey,
BOOTSTRAP_LEADER_LAMPORTS,
)
.0;
genesis_block.epoch_warmup = false;
let bank = Bank::new(&genesis_block);
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
assert_eq!(
cache
.slot_leader_at_else_compute(bank.slot(), &bank)
.unwrap(),
pubkey
);
assert_eq!(cache.next_leader_slot(&pubkey, 0, &bank, None), Some(1));
assert_eq!(cache.next_leader_slot(&pubkey, 1, &bank, None), Some(2));
assert_eq!(
cache.next_leader_slot(
&pubkey,
2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2
&bank,
None
),
None
);
assert_eq!(
cache.next_leader_slot(
&Pubkey::new_rand(), // not in leader_schedule
0,
&bank,
None
),
None
);
}
#[test]
fn test_next_leader_slot_blocktree() {
let pubkey = Pubkey::new_rand();
let mut genesis_block = GenesisBlock::new_with_leader(
BOOTSTRAP_LEADER_LAMPORTS,
&pubkey,
BOOTSTRAP_LEADER_LAMPORTS,
)
.0;
genesis_block.epoch_warmup = false;
let bank = Bank::new(&genesis_block);
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let ledger_path = get_tmp_ledger_path!();
{
let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
assert_eq!(
cache
.slot_leader_at_else_compute(bank.slot(), &bank)
.unwrap(),
pubkey
);
// Check that the next leader slot after 0 is slot 1
assert_eq!(
cache.next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)),
Some(1)
);
// Write a blob into slot 2 that chains to slot 1,
// but slot 1 is empty so should not be skipped
let (blobs, _) = make_slot_entries(2, 1, 1);
blocktree.write_blobs(&blobs[..]).unwrap();
assert_eq!(
cache.next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)),
Some(1)
);
// Write a blob into slot 1
let (blobs, _) = make_slot_entries(1, 0, 1);
// Check that slot 1 and 2 are skipped
blocktree.write_blobs(&blobs[..]).unwrap();
assert_eq!(
cache.next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)),
Some(3)
);
// Integrity checks
assert_eq!(
cache.next_leader_slot(
&pubkey,
2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2
&bank,
Some(&blocktree)
),
None
);
assert_eq!(
cache.next_leader_slot(
&Pubkey::new_rand(), // not in leader_schedule
0,
&bank,
Some(&blocktree)
),
None
);
}
Blocktree::destroy(&ledger_path).unwrap();
}
#[test]
fn test_next_leader_slot_next_epoch() {
let pubkey = Pubkey::new_rand();
let (mut genesis_block, mint_keypair) = GenesisBlock::new_with_leader(
2 * BOOTSTRAP_LEADER_LAMPORTS,
&pubkey,
BOOTSTRAP_LEADER_LAMPORTS,
);
genesis_block.epoch_warmup = false;
let bank = Bank::new(&genesis_block);
let cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let delegate_id = Pubkey::new_rand();
// Create new vote account
let new_voting_keypair = Keypair::new();
new_vote_account(
&mint_keypair,
&new_voting_keypair,
&delegate_id,
&bank,
BOOTSTRAP_LEADER_LAMPORTS,
);
// Have to wait until the epoch at after the epoch stakes generated at genesis
// for the new votes to take effect.
let mut target_slot = 1;
let epoch = bank.get_stakers_epoch(0);
while bank.get_stakers_epoch(target_slot) == epoch {
target_slot += 1;
}
let bank = Bank::new_from_parent(&Arc::new(bank), &Pubkey::default(), target_slot);
let mut expected_slot = 0;
let epoch = bank.get_stakers_epoch(target_slot);
for i in 0..epoch {
expected_slot += bank.get_slots_in_epoch(i);
}
let schedule = cache.compute_epoch_schedule(epoch, &bank).unwrap();
let mut index = 0;
while schedule[index] != delegate_id {
index += 1
}
expected_slot += index;
assert_eq!(
cache.next_leader_slot(&delegate_id, 0, &bank, None),
Some(expected_slot),
);
}
}

View File

@ -1,4 +1,3 @@
use crate::blocktree::Blocktree;
use crate::leader_schedule::LeaderSchedule;
use crate::staking_utils;
use solana_runtime::bank::Bank;
@ -6,7 +5,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::NUM_CONSECUTIVE_LEADER_SLOTS;
/// Return the leader schedule for the given epoch.
fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option<LeaderSchedule> {
pub fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option<LeaderSchedule> {
staking_utils::delegated_stakes_at_epoch(bank, epoch_height).map(|stakes| {
let mut seed = [0u8; 32];
seed[0..8].copy_from_slice(&epoch_height.to_le_bytes());
@ -21,6 +20,23 @@ fn leader_schedule(epoch_height: u64, bank: &Bank) -> Option<LeaderSchedule> {
})
}
/// Return the leader for the given slot.
pub fn slot_leader_at(slot: u64, bank: &Bank) -> Option<Pubkey> {
let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot);
leader_schedule(epoch, bank).map(|leader_schedule| leader_schedule[slot_index])
}
// Returns the number of ticks remaining from the specified tick_height to the end of the
// slot implied by the tick_height
pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 {
bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1
}
pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 {
tick_height / ticks_per_slot
}
fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
// Sort first by stake. If stakes are the same, sort by pubkey to ensure a
// deterministic result.
@ -37,229 +53,11 @@ fn sort_stakes(stakes: &mut Vec<(Pubkey, u64)>) {
stakes.dedup();
}
/// Return the leader for the given slot.
pub fn slot_leader_at(slot: u64, bank: &Bank) -> Option<Pubkey> {
let (epoch, slot_index) = bank.get_epoch_and_slot_index(slot);
leader_schedule(epoch, bank).map(|leader_schedule| leader_schedule[slot_index])
}
/// Return the next slot after the given current_slot that the given node will be leader
pub fn next_leader_slot(
pubkey: &Pubkey,
mut current_slot: u64,
bank: &Bank,
blocktree: Option<&Blocktree>,
) -> Option<u64> {
let (mut epoch, mut start_index) = bank.get_epoch_and_slot_index(current_slot + 1);
while let Some(leader_schedule) = leader_schedule(epoch, bank) {
// clippy thinks I should do this:
// for (i, <item>) in leader_schedule
// .iter()
// .enumerate()
// .take(bank.get_slots_in_epoch(epoch))
// .skip(from_slot_index + 1) {
//
// but leader_schedule doesn't implement Iter...
#[allow(clippy::needless_range_loop)]
for i in start_index..bank.get_slots_in_epoch(epoch) {
current_slot += 1;
if *pubkey == leader_schedule[i] {
if let Some(blocktree) = blocktree {
if let Some(meta) = blocktree.meta(current_slot).unwrap() {
// We have already sent a blob for this slot, so skip it
if meta.received > 0 {
continue;
}
}
}
return Some(current_slot);
}
}
epoch += 1;
start_index = 0;
}
None
}
// Returns the number of ticks remaining from the specified tick_height to the end of the
// slot implied by the tick_height
pub fn num_ticks_left_in_slot(bank: &Bank, tick_height: u64) -> u64 {
bank.ticks_per_slot() - tick_height % bank.ticks_per_slot() - 1
}
pub fn tick_height_to_slot(ticks_per_slot: u64, tick_height: u64) -> u64 {
tick_height / ticks_per_slot
}
#[cfg(test)]
mod tests {
use super::*;
use crate::blocktree::get_tmp_ledger_path;
use crate::blocktree::tests::make_slot_entries;
use crate::staking_utils;
use crate::voting_keypair::tests::new_vote_account;
use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS};
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::Arc;
#[test]
fn test_next_leader_slot() {
let pubkey = Pubkey::new_rand();
let mut genesis_block = GenesisBlock::new_with_leader(
BOOTSTRAP_LEADER_LAMPORTS,
&pubkey,
BOOTSTRAP_LEADER_LAMPORTS,
)
.0;
genesis_block.epoch_warmup = false;
let bank = Bank::new(&genesis_block);
assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey);
assert_eq!(next_leader_slot(&pubkey, 0, &bank, None), Some(1));
assert_eq!(next_leader_slot(&pubkey, 1, &bank, None), Some(2));
assert_eq!(
next_leader_slot(
&pubkey,
2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2
&bank,
None
),
None
);
assert_eq!(
next_leader_slot(
&Pubkey::new_rand(), // not in leader_schedule
0,
&bank,
None
),
None
);
}
#[test]
fn test_next_leader_slot_blocktree() {
let pubkey = Pubkey::new_rand();
let mut genesis_block = GenesisBlock::new_with_leader(
BOOTSTRAP_LEADER_LAMPORTS,
&pubkey,
BOOTSTRAP_LEADER_LAMPORTS,
)
.0;
genesis_block.epoch_warmup = false;
let bank = Bank::new(&genesis_block);
let ledger_path = get_tmp_ledger_path!();
{
let blocktree = Arc::new(
Blocktree::open(&ledger_path).expect("Expected to be able to open database ledger"),
);
assert_eq!(slot_leader_at(bank.slot(), &bank).unwrap(), pubkey);
// Check that the next leader slot after 0 is slot 1
assert_eq!(
next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)),
Some(1)
);
// Write a blob into slot 2 that chains to slot 1,
// but slot 1 is empty so should not be skipped
let (blobs, _) = make_slot_entries(2, 1, 1);
blocktree.write_blobs(&blobs[..]).unwrap();
assert_eq!(
next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)),
Some(1)
);
// Write a blob into slot 1
let (blobs, _) = make_slot_entries(1, 0, 1);
// Check that slot 1 and 2 are skipped
blocktree.write_blobs(&blobs[..]).unwrap();
assert_eq!(
next_leader_slot(&pubkey, 0, &bank, Some(&blocktree)),
Some(3)
);
// Integrity checks
assert_eq!(
next_leader_slot(
&pubkey,
2 * genesis_block.slots_per_epoch - 1, // no schedule generated for epoch 2
&bank,
Some(&blocktree)
),
None
);
assert_eq!(
next_leader_slot(
&Pubkey::new_rand(), // not in leader_schedule
0,
&bank,
Some(&blocktree)
),
None
);
}
Blocktree::destroy(&ledger_path).unwrap();
}
#[test]
fn test_next_leader_slot_next_epoch() {
let pubkey = Pubkey::new_rand();
let (mut genesis_block, mint_keypair) = GenesisBlock::new_with_leader(
2 * BOOTSTRAP_LEADER_LAMPORTS,
&pubkey,
BOOTSTRAP_LEADER_LAMPORTS,
);
genesis_block.epoch_warmup = false;
let bank = Bank::new(&genesis_block);
let delegate_id = Pubkey::new_rand();
// Create new vote account
let new_voting_keypair = Keypair::new();
new_vote_account(
&mint_keypair,
&new_voting_keypair,
&delegate_id,
&bank,
BOOTSTRAP_LEADER_LAMPORTS,
);
// Have to wait until the epoch at after the epoch stakes generated at genesis
// for the new votes to take effect.
let mut target_slot = 1;
let epoch = bank.get_stakers_epoch(0);
while bank.get_stakers_epoch(target_slot) == epoch {
target_slot += 1;
}
let bank = Bank::new_from_parent(&Arc::new(bank), &Pubkey::default(), target_slot);
let mut expected_slot = 0;
let epoch = bank.get_stakers_epoch(target_slot);
for i in 0..epoch {
expected_slot += bank.get_slots_in_epoch(i);
}
let schedule = leader_schedule(epoch, &bank).unwrap();
let mut index = 0;
while schedule[index] != delegate_id {
index += 1
}
expected_slot += index;
assert_eq!(
next_leader_slot(&delegate_id, 0, &bank, None),
Some(expected_slot),
);
}
#[test]
fn test_leader_schedule_via_bank() {

View File

@ -37,6 +37,7 @@ pub mod fullnode;
pub mod gen_keys;
pub mod gossip_service;
pub mod leader_schedule;
pub mod leader_schedule_cache;
pub mod leader_schedule_utils;
pub mod local_cluster;
pub mod local_vote_signer_service;

View File

@ -12,7 +12,7 @@
//!
use crate::blocktree::Blocktree;
use crate::entry::Entry;
use crate::leader_schedule_utils;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::poh::Poh;
use crate::result::{Error, Result};
use solana_runtime::bank::Bank;
@ -53,13 +53,14 @@ pub struct PohRecorder {
max_last_leader_grace_ticks: u64,
id: Pubkey,
blocktree: Arc<Blocktree>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
}
impl PohRecorder {
fn clear_bank(&mut self) {
if let Some(working_bank) = self.working_bank.take() {
let bank = working_bank.bank;
let next_leader_slot = leader_schedule_utils::next_leader_slot(
let next_leader_slot = self.leader_schedule_cache.next_leader_slot(
&self.id,
bank.slot(),
&bank,
@ -278,6 +279,7 @@ impl PohRecorder {
id: &Pubkey,
blocktree: &Arc<Blocktree>,
clear_bank_signal: Option<SyncSender<bool>>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> (Self, Receiver<WorkingBankEntries>) {
let poh = Poh::new(last_entry_hash, tick_height);
let (sender, receiver) = channel();
@ -301,6 +303,7 @@ impl PohRecorder {
max_last_leader_grace_ticks,
id: *id,
blocktree: blocktree.clone(),
leader_schedule_cache: leader_schedule_cache.clone(),
},
receiver,
)
@ -317,6 +320,7 @@ impl PohRecorder {
ticks_per_slot: u64,
id: &Pubkey,
blocktree: &Arc<Blocktree>,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> (Self, Receiver<WorkingBankEntries>) {
Self::new_with_clear_signal(
tick_height,
@ -327,6 +331,7 @@ impl PohRecorder {
id,
blocktree,
None,
leader_schedule_cache,
)
}
@ -376,6 +381,7 @@ impl PohRecorder {
mod tests {
use super::*;
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::test_tx::test_tx;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::hash;
@ -399,6 +405,7 @@ mod tests {
DEFAULT_TICKS_PER_SLOT,
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
);
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 1);
@ -424,6 +431,7 @@ mod tests {
DEFAULT_TICKS_PER_SLOT,
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
);
poh_recorder.tick();
poh_recorder.tick();
@ -448,6 +456,7 @@ mod tests {
DEFAULT_TICKS_PER_SLOT,
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
);
poh_recorder.tick();
assert_eq!(poh_recorder.tick_cache.len(), 1);
@ -474,6 +483,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let working_bank = WorkingBank {
@ -506,6 +516,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let working_bank = WorkingBank {
@ -550,6 +561,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
poh_recorder.tick();
@ -592,6 +604,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let working_bank = WorkingBank {
@ -628,6 +641,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let working_bank = WorkingBank {
@ -666,6 +680,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let working_bank = WorkingBank {
@ -711,6 +726,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let working_bank = WorkingBank {
@ -753,6 +769,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let working_bank = WorkingBank {
@ -786,6 +803,7 @@ mod tests {
DEFAULT_TICKS_PER_SLOT,
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
);
poh_recorder.tick();
poh_recorder.tick();
@ -816,6 +834,7 @@ mod tests {
DEFAULT_TICKS_PER_SLOT,
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
);
poh_recorder.tick();
poh_recorder.tick();
@ -846,6 +865,7 @@ mod tests {
DEFAULT_TICKS_PER_SLOT,
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::default()),
);
poh_recorder.tick();
poh_recorder.tick();
@ -876,6 +896,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let ticks_per_slot = bank.ticks_per_slot();
let working_bank = WorkingBank {
@ -908,6 +929,7 @@ mod tests {
&Pubkey::default(),
&Arc::new(blocktree),
Some(sender),
&Arc::new(LeaderScheduleCache::default()),
);
poh_recorder.set_bank(&bank);
poh_recorder.clear_bank();
@ -936,6 +958,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let end_slot = 3;
@ -980,6 +1003,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
// Test that with no leader slot, we don't reach the leader tick

View File

@ -98,6 +98,7 @@ impl Service for PohService {
mod tests {
use super::*;
use crate::blocktree::{get_tmp_ledger_path, Blocktree};
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::poh_recorder::WorkingBank;
use crate::result::Result;
use crate::test_tx::test_tx;
@ -123,6 +124,7 @@ mod tests {
bank.ticks_per_slot(),
&Pubkey::default(),
&Arc::new(blocktree),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
);
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let exit = Arc::new(AtomicBool::new(false));

View File

@ -5,6 +5,7 @@ use crate::blocktree::Blocktree;
use crate::blocktree_processor;
use crate::cluster_info::ClusterInfo;
use crate::entry::{Entry, EntrySender, EntrySlice};
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::leader_schedule_utils;
use crate::locktower::{Locktower, StakeLockout};
use crate::packet::BlobError;
@ -83,6 +84,7 @@ impl ReplayStage {
subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
storage_entry_sender: EntrySender,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) -> (Self, Receiver<(u64, Pubkey)>)
where
T: 'static + KeypairUtil + Send + Sync,
@ -103,6 +105,7 @@ impl ReplayStage {
.expect("blocktree.set_root() failed at replay_stage startup");
}
// Start the replay stage loop
let leader_schedule_cache = leader_schedule_cache.clone();
let t_replay = Builder::new()
.name("solana-replay-stage".to_string())
.spawn(move || {
@ -115,7 +118,11 @@ impl ReplayStage {
break;
}
Self::generate_new_bank_forks(&blocktree, &mut bank_forks.write().unwrap());
Self::generate_new_bank_forks(
&blocktree,
&mut bank_forks.write().unwrap(),
&leader_schedule_cache,
);
let mut is_tpu_bank_active = poh_recorder.lock().unwrap().bank().is_some();
@ -158,6 +165,7 @@ impl ReplayStage {
&bank,
&poh_recorder,
ticks_per_slot,
&leader_schedule_cache,
);
is_tpu_bank_active = false;
@ -185,6 +193,7 @@ impl ReplayStage {
poh_slot,
reached_leader_tick,
grace_ticks,
&leader_schedule_cache,
);
}
@ -213,6 +222,7 @@ impl ReplayStage {
poh_slot: u64,
reached_leader_tick: bool,
grace_ticks: u64,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) {
trace!("{} checking poh slot {}", my_id, poh_slot);
if bank_forks.read().unwrap().get(poh_slot).is_none() {
@ -225,7 +235,7 @@ impl ReplayStage {
};
assert!(parent.is_frozen());
leader_schedule_utils::slot_leader_at(poh_slot, &parent)
leader_schedule_cache.slot_leader_at_else_compute(poh_slot, &parent)
.map(|next_leader| {
debug!(
"me: {} leader {} at poh slot {}",
@ -327,9 +337,10 @@ impl ReplayStage {
bank: &Arc<Bank>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
ticks_per_slot: u64,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) {
let next_leader_slot =
leader_schedule_utils::next_leader_slot(&my_id, bank.slot(), &bank, Some(blocktree));
leader_schedule_cache.next_leader_slot(&my_id, bank.slot(), &bank, Some(blocktree));
poh_recorder.lock().unwrap().reset(
bank.tick_height(),
bank.last_blockhash(),
@ -557,7 +568,11 @@ impl ReplayStage {
}
}
fn generate_new_bank_forks(blocktree: &Blocktree, forks: &mut BankForks) {
fn generate_new_bank_forks(
blocktree: &Blocktree,
forks: &mut BankForks,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
) {
// Find the next slot that chains to the old slot
let frozen_banks = forks.frozen_banks();
let frozen_bank_slots: Vec<u64> = frozen_banks.keys().cloned().collect();
@ -577,7 +592,9 @@ impl ReplayStage {
trace!("child already active or frozen {}", child_id);
continue;
}
let leader = leader_schedule_utils::slot_leader_at(child_id, &parent_bank).unwrap();
let leader = leader_schedule_cache
.slot_leader_at_else_compute(child_id, &parent_bank)
.unwrap();
info!("new fork:{} parent:{}", child_id, parent_id);
forks.insert(Bank::new_from_parent(&parent_bank, &leader, child_id));
}
@ -636,10 +653,10 @@ mod test {
// Set up the replay stage
{
let voting_keypair = Arc::new(Keypair::new());
let (bank_forks, _bank_forks_info, blocktree, l_receiver) =
let (bank_forks, _bank_forks_info, blocktree, l_receiver, leader_schedule_cache) =
new_banks_from_blocktree(&my_ledger_path, None);
let bank = bank_forks.working_bank();
let leader_schedule_cache = Arc::new(leader_schedule_cache);
let blocktree = Arc::new(blocktree);
let (exit, poh_recorder, poh_service, _entry_receiver) =
create_test_recorder(&bank, &blocktree);
@ -656,6 +673,7 @@ mod test {
&Arc::new(RpcSubscriptions::default()),
&poh_recorder,
ledger_writer_sender,
&leader_schedule_cache,
);
let vote_ix = vote_instruction::vote(&voting_keypair.pubkey(), vec![Vote::new(0)]);
let vote_tx = Transaction::new_signed_instructions(
@ -754,6 +772,7 @@ mod test {
let genesis_block = GenesisBlock::new(10_000).0;
let bank0 = Bank::new(&genesis_block);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let mut bank_forks = BankForks::new(0, bank0);
bank_forks.working_bank().freeze();
@ -763,7 +782,11 @@ mod test {
blob_slot_1.set_parent(0);
blocktree.insert_data_blobs(&vec![blob_slot_1]).unwrap();
assert!(bank_forks.get(1).is_none());
ReplayStage::generate_new_bank_forks(&blocktree, &mut bank_forks);
ReplayStage::generate_new_bank_forks(
&blocktree,
&mut bank_forks,
&leader_schedule_cache,
);
assert!(bank_forks.get(1).is_some());
// Insert blob for slot 3, generate new forks, check result
@ -772,7 +795,11 @@ mod test {
blob_slot_2.set_parent(0);
blocktree.insert_data_blobs(&vec![blob_slot_2]).unwrap();
assert!(bank_forks.get(2).is_none());
ReplayStage::generate_new_bank_forks(&blocktree, &mut bank_forks);
ReplayStage::generate_new_bank_forks(
&blocktree,
&mut bank_forks,
&leader_schedule_cache,
);
assert!(bank_forks.get(1).is_some());
assert!(bank_forks.get(2).is_some());
}

View File

@ -8,6 +8,7 @@ use crate::cluster_info::ClusterInfo;
use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
use crate::entry::EntrySender;
use crate::fetch_stage::FetchStage;
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::poh_recorder::{PohRecorder, WorkingBankEntries};
use crate::service::Service;
use crate::sigverify_stage::SigVerifyStage;
@ -39,6 +40,7 @@ impl Tpu {
sigverify_disabled: bool,
blocktree: &Arc<Blocktree>,
storage_entry_sender: EntrySender,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
) -> Self {
cluster_info.write().unwrap().set_leader(id);
@ -69,6 +71,7 @@ impl Tpu {
poh_recorder,
verified_receiver,
verified_vote_receiver,
leader_schedule_cache,
);
let broadcast_stage = BroadcastStage::new(

View File

@ -19,6 +19,7 @@ use crate::blocktree::Blocktree;
use crate::blocktree_processor::BankForksInfo;
use crate::cluster_info::ClusterInfo;
use crate::entry::{EntryReceiver, EntrySender};
use crate::leader_schedule_cache::LeaderScheduleCache;
use crate::poh_recorder::PohRecorder;
use crate::replay_stage::ReplayStage;
use crate::retransmit_stage::RetransmitStage;
@ -71,6 +72,7 @@ impl Tvu {
poh_recorder: &Arc<Mutex<PohRecorder>>,
storage_entry_sender: EntrySender,
storage_entry_receiver: EntryReceiver,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
) -> Self
where
@ -121,6 +123,7 @@ impl Tvu {
subscriptions,
poh_recorder,
storage_entry_sender,
leader_schedule_cache,
);
let blockstream_service = if blockstream.is_some() {
@ -214,6 +217,7 @@ pub mod tests {
create_test_recorder(&bank, &blocktree);
let voting_keypair = Keypair::new();
let (storage_entry_sender, storage_entry_receiver) = channel();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let tvu = Tvu::new(
&voting_keypair.pubkey(),
Some(Arc::new(voting_keypair)),
@ -236,6 +240,7 @@ pub mod tests {
&poh_recorder,
storage_entry_sender,
storage_entry_receiver,
&leader_schedule_cache,
&exit,
);
exit.store(true, Ordering::Relaxed);

View File

@ -83,7 +83,7 @@ fn test_replay() {
let tvu_addr = target1.info.tvu;
let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) =
fullnode::new_banks_from_blocktree(&blocktree_path, None);
let bank = bank_forks.working_bank();
assert_eq!(
@ -91,6 +91,7 @@ fn test_replay() {
starting_mint_balance
);
let leader_schedule_cache = Arc::new(leader_schedule_cache);
// start cluster_info1
let bank_forks = Arc::new(RwLock::new(bank_forks));
let mut cluster_info1 = ClusterInfo::new_with_invalid_keypair(target1.info.clone());
@ -126,6 +127,7 @@ fn test_replay() {
&poh_recorder,
storage_sender,
storage_receiver,
&leader_schedule_cache,
&exit,
);

View File

@ -112,7 +112,7 @@ fn main() {
stdout().write_all(b"\n]}\n").expect("close array");
}
("verify", _) => match process_blocktree(&genesis_block, &blocktree, None) {
Ok((_bank_forks, bank_forks_info)) => {
Ok((_bank_forks, bank_forks_info, _)) => {
println!("{:?}", bank_forks_info);
}
Err(err) => {

View File

@ -279,6 +279,10 @@ impl Bank {
}
}
pub fn epoch_schedule(&self) -> &EpochSchedule {
&self.epoch_schedule
}
/// squash the parent's state up into this Bank,
/// this Bank becomes a root
pub fn squash(&self) {