Expand blocktree processor options (#6248)
* Refactor blocktree processor args and support full leader cache * Add entry callback option * Rename num_threads to override_num_threads * Add test for entry callback * Refactor cached leader schedule changes * Add tests for blocktree process options * Refactor test * @mvines feedback
This commit is contained in:
parent
723f9a9b81
commit
5e31565574
|
@ -275,7 +275,7 @@ fn simulate_process_entries(
|
|||
initial_lamports: u64,
|
||||
num_accounts: usize,
|
||||
) {
|
||||
let bank = Bank::new(genesis_block);
|
||||
let bank = Arc::new(Bank::new(genesis_block));
|
||||
|
||||
for i in 0..(num_accounts / 2) {
|
||||
bank.transfer(initial_lamports, mint_keypair, &keypairs[i * 2].pubkey())
|
||||
|
|
|
@ -65,13 +65,23 @@ fn execute_batch(batch: &TransactionBatch) -> Result<()> {
|
|||
first_err.unwrap_or(Ok(()))
|
||||
}
|
||||
|
||||
fn execute_batches(batches: &[TransactionBatch]) -> Result<()> {
|
||||
fn execute_batches(
|
||||
bank: &Arc<Bank>,
|
||||
batches: &[TransactionBatch],
|
||||
entry_callback: Option<&ProcessCallback>,
|
||||
) -> Result<()> {
|
||||
inc_new_counter_debug!("bank-par_execute_entries-count", batches.len());
|
||||
let results: Vec<Result<()>> = PAR_THREAD_POOL.with(|thread_pool| {
|
||||
thread_pool.borrow().install(|| {
|
||||
batches
|
||||
.into_par_iter()
|
||||
.map(|batch| execute_batch(batch))
|
||||
.map(|batch| {
|
||||
let result = execute_batch(batch);
|
||||
if let Some(entry_callback) = entry_callback {
|
||||
entry_callback(bank);
|
||||
}
|
||||
result
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
});
|
||||
|
@ -84,13 +94,22 @@ fn execute_batches(batches: &[TransactionBatch]) -> Result<()> {
|
|||
/// 2. Process the locked group in parallel
|
||||
/// 3. Register the `Tick` if it's available
|
||||
/// 4. Update the leader scheduler, goto 1
|
||||
pub fn process_entries(bank: &Bank, entries: &[Entry], randomize: bool) -> Result<()> {
|
||||
pub fn process_entries(bank: &Arc<Bank>, entries: &[Entry], randomize: bool) -> Result<()> {
|
||||
process_entries_with_callback(bank, entries, randomize, None)
|
||||
}
|
||||
|
||||
fn process_entries_with_callback(
|
||||
bank: &Arc<Bank>,
|
||||
entries: &[Entry],
|
||||
randomize: bool,
|
||||
entry_callback: Option<&ProcessCallback>,
|
||||
) -> Result<()> {
|
||||
// accumulator for entries that can be processed in parallel
|
||||
let mut batches = vec![];
|
||||
for entry in entries {
|
||||
if entry.is_tick() {
|
||||
// if its a tick, execute the group and register the tick
|
||||
execute_batches(&batches)?;
|
||||
execute_batches(bank, &batches, entry_callback)?;
|
||||
batches.clear();
|
||||
bank.register_tick(&entry.hash);
|
||||
continue;
|
||||
|
@ -136,12 +155,12 @@ pub fn process_entries(bank: &Bank, entries: &[Entry], randomize: bool) -> Resul
|
|||
} else {
|
||||
// else we have an entry that conflicts with a prior entry
|
||||
// execute the current queue and try to process this entry again
|
||||
execute_batches(&batches)?;
|
||||
execute_batches(bank, &batches, entry_callback)?;
|
||||
batches.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
execute_batches(&batches)?;
|
||||
execute_batches(bank, &batches, entry_callback)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -155,27 +174,45 @@ pub enum BlocktreeProcessorError {
|
|||
LedgerVerificationFailed,
|
||||
}
|
||||
|
||||
/// Callback for accessing bank state while processing the blocktree
|
||||
pub type ProcessCallback = Arc<dyn Fn(&Bank) -> () + Sync + Send>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ProcessOptions {
|
||||
pub verify_ledger: bool,
|
||||
pub full_leader_cache: bool,
|
||||
pub dev_halt_at_slot: Option<Slot>,
|
||||
pub entry_callback: Option<ProcessCallback>,
|
||||
pub override_num_threads: Option<usize>,
|
||||
}
|
||||
|
||||
pub fn process_blocktree(
|
||||
genesis_block: &GenesisBlock,
|
||||
blocktree: &Blocktree,
|
||||
account_paths: Option<String>,
|
||||
verify_ledger: bool,
|
||||
dev_halt_at_slot: Option<Slot>,
|
||||
opts: ProcessOptions,
|
||||
) -> result::Result<(BankForks, Vec<BankForksInfo>, LeaderScheduleCache), BlocktreeProcessorError> {
|
||||
info!("processing ledger from bank 0...");
|
||||
if let Some(num_threads) = opts.override_num_threads {
|
||||
PAR_THREAD_POOL.with(|pool| {
|
||||
*pool.borrow_mut() = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(num_threads)
|
||||
.build()
|
||||
.unwrap()
|
||||
});
|
||||
}
|
||||
|
||||
// Setup bank for slot 0
|
||||
let bank0 = Arc::new(Bank::new_with_paths(&genesis_block, account_paths));
|
||||
process_bank_0(&bank0, blocktree, verify_ledger)?;
|
||||
process_blocktree_from_root(blocktree, bank0, verify_ledger, dev_halt_at_slot)
|
||||
info!("processing ledger from bank 0...");
|
||||
process_bank_0(&bank0, blocktree, &opts)?;
|
||||
process_blocktree_from_root(blocktree, bank0, &opts)
|
||||
}
|
||||
|
||||
// Process blocktree from a known root bank
|
||||
pub fn process_blocktree_from_root(
|
||||
blocktree: &Blocktree,
|
||||
bank: Arc<Bank>,
|
||||
verify_ledger: bool,
|
||||
dev_halt_at_slot: Option<Slot>,
|
||||
opts: &ProcessOptions,
|
||||
) -> result::Result<(BankForks, Vec<BankForksInfo>, LeaderScheduleCache), BlocktreeProcessorError> {
|
||||
info!("processing ledger from root: {}...", bank.slot());
|
||||
// Starting slot must be a root, and thus has no parents
|
||||
|
@ -183,7 +220,6 @@ pub fn process_blocktree_from_root(
|
|||
let start_slot = bank.slot();
|
||||
let now = Instant::now();
|
||||
let mut rooted_path = vec![start_slot];
|
||||
let dev_halt_at_slot = dev_halt_at_slot.unwrap_or(std::u64::MAX);
|
||||
|
||||
blocktree
|
||||
.set_roots(&[start_slot])
|
||||
|
@ -196,14 +232,16 @@ pub fn process_blocktree_from_root(
|
|||
if let Some(meta) = meta {
|
||||
let epoch_schedule = bank.epoch_schedule();
|
||||
let mut leader_schedule_cache = LeaderScheduleCache::new(*epoch_schedule, &bank);
|
||||
if opts.full_leader_cache {
|
||||
leader_schedule_cache.set_max_schedules(std::usize::MAX);
|
||||
}
|
||||
let fork_info = process_pending_slots(
|
||||
&bank,
|
||||
&meta,
|
||||
blocktree,
|
||||
&mut leader_schedule_cache,
|
||||
&mut rooted_path,
|
||||
verify_ledger,
|
||||
dev_halt_at_slot,
|
||||
opts,
|
||||
)?;
|
||||
let (banks, bank_forks_info): (Vec<_>, Vec<_>) = fork_info.into_iter().unzip();
|
||||
let bank_forks = BankForks::new_from_banks(&banks, rooted_path);
|
||||
|
@ -231,35 +269,37 @@ pub fn process_blocktree_from_root(
|
|||
}
|
||||
|
||||
fn verify_and_process_entries(
|
||||
bank: &Bank,
|
||||
bank: &Arc<Bank>,
|
||||
entries: &[Entry],
|
||||
verify_ledger: bool,
|
||||
last_entry_hash: Hash,
|
||||
opts: &ProcessOptions,
|
||||
) -> result::Result<Hash, BlocktreeProcessorError> {
|
||||
assert!(!entries.is_empty());
|
||||
|
||||
if verify_ledger && !entries.verify(&last_entry_hash) {
|
||||
if opts.verify_ledger && !entries.verify(&last_entry_hash) {
|
||||
warn!("Ledger proof of history failed at slot: {}", bank.slot());
|
||||
return Err(BlocktreeProcessorError::LedgerVerificationFailed);
|
||||
}
|
||||
|
||||
process_entries(&bank, &entries, true).map_err(|err| {
|
||||
process_entries_with_callback(bank, &entries, true, opts.entry_callback.as_ref()).map_err(
|
||||
|err| {
|
||||
warn!(
|
||||
"Failed to process entries for slot {}: {:?}",
|
||||
bank.slot(),
|
||||
err
|
||||
);
|
||||
BlocktreeProcessorError::LedgerVerificationFailed
|
||||
})?;
|
||||
},
|
||||
)?;
|
||||
|
||||
Ok(entries.last().unwrap().hash)
|
||||
}
|
||||
|
||||
// Special handling required for processing the entries in slot 0
|
||||
fn process_bank_0(
|
||||
bank0: &Bank,
|
||||
bank0: &Arc<Bank>,
|
||||
blocktree: &Blocktree,
|
||||
verify_ledger: bool,
|
||||
opts: &ProcessOptions,
|
||||
) -> result::Result<(), BlocktreeProcessorError> {
|
||||
assert_eq!(bank0.slot(), 0);
|
||||
|
||||
|
@ -283,7 +323,7 @@ fn process_bank_0(
|
|||
}
|
||||
|
||||
if !entries.is_empty() {
|
||||
verify_and_process_entries(bank0, &entries, verify_ledger, entry0.hash)?;
|
||||
verify_and_process_entries(bank0, &entries, entry0.hash, opts)?;
|
||||
} else {
|
||||
bank0.register_tick(&entry0.hash);
|
||||
}
|
||||
|
@ -356,8 +396,7 @@ fn process_pending_slots(
|
|||
blocktree: &Blocktree,
|
||||
leader_schedule_cache: &mut LeaderScheduleCache,
|
||||
rooted_path: &mut Vec<u64>,
|
||||
verify_ledger: bool,
|
||||
dev_halt_at_slot: Slot,
|
||||
opts: &ProcessOptions,
|
||||
) -> result::Result<Vec<(Arc<Bank>, BankForksInfo)>, BlocktreeProcessorError> {
|
||||
let mut fork_info = vec![];
|
||||
let mut last_status_report = Instant::now();
|
||||
|
@ -371,6 +410,7 @@ fn process_pending_slots(
|
|||
&mut fork_info,
|
||||
)?;
|
||||
|
||||
let dev_halt_at_slot = opts.dev_halt_at_slot.unwrap_or(std::u64::MAX);
|
||||
while !pending_slots.is_empty() {
|
||||
let (slot, meta, bank, last_entry_hash) = pending_slots.pop().unwrap();
|
||||
|
||||
|
@ -385,7 +425,7 @@ fn process_pending_slots(
|
|||
BlocktreeProcessorError::LedgerVerificationFailed
|
||||
})?;
|
||||
|
||||
verify_and_process_entries(&bank, &entries, verify_ledger, last_entry_hash)?;
|
||||
verify_and_process_entries(&bank, &entries, last_entry_hash, opts)?;
|
||||
|
||||
bank.freeze(); // all banks handled by this routine are created from complete slots
|
||||
|
||||
|
@ -436,6 +476,7 @@ pub mod tests {
|
|||
use solana_sdk::system_transaction;
|
||||
use solana_sdk::transaction::Transaction;
|
||||
use solana_sdk::transaction::TransactionError;
|
||||
use std::sync::RwLock;
|
||||
|
||||
pub fn fill_blocktree_slot_with_ticks(
|
||||
blocktree: &Blocktree,
|
||||
|
@ -517,8 +558,12 @@ pub mod tests {
|
|||
// slot 2, points at slot 1
|
||||
fill_blocktree_slot_with_ticks(&blocktree, ticks_per_slot, 2, 1, blockhash);
|
||||
|
||||
let opts = ProcessOptions {
|
||||
verify_ledger: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
let (mut _bank_forks, bank_forks_info, _) =
|
||||
process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap();
|
||||
process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
|
||||
|
||||
assert_eq!(bank_forks_info.len(), 1);
|
||||
assert_eq!(
|
||||
|
@ -575,8 +620,12 @@ pub mod tests {
|
|||
|
||||
blocktree.set_roots(&[0, 1, 4]).unwrap();
|
||||
|
||||
let opts = ProcessOptions {
|
||||
verify_ledger: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
let (bank_forks, bank_forks_info, _) =
|
||||
process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap();
|
||||
process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
|
||||
|
||||
assert_eq!(bank_forks_info.len(), 1); // One fork, other one is ignored b/c not a descendant of the root
|
||||
|
||||
|
@ -645,8 +694,12 @@ pub mod tests {
|
|||
|
||||
blocktree.set_roots(&[0, 1]).unwrap();
|
||||
|
||||
let opts = ProcessOptions {
|
||||
verify_ledger: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
let (bank_forks, bank_forks_info, _) =
|
||||
process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap();
|
||||
process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
|
||||
|
||||
assert_eq!(bank_forks_info.len(), 2); // There are two forks
|
||||
assert_eq!(
|
||||
|
@ -721,8 +774,12 @@ pub mod tests {
|
|||
blocktree.set_roots(&[last_slot + 1]).unwrap();
|
||||
|
||||
// Check that we can properly restart the ledger / leader scheduler doesn't fail
|
||||
let opts = ProcessOptions {
|
||||
verify_ledger: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
let (bank_forks, bank_forks_info, _) =
|
||||
process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap();
|
||||
process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
|
||||
|
||||
assert_eq!(bank_forks_info.len(), 1); // There is one fork
|
||||
assert_eq!(
|
||||
|
@ -783,7 +840,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(2);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let keypair = Keypair::new();
|
||||
let slot_entries = create_ticks(genesis_block.ticks_per_slot - 1, genesis_block.hash());
|
||||
let tx = system_transaction::create_user_account(
|
||||
|
@ -864,8 +921,12 @@ pub mod tests {
|
|||
entries,
|
||||
)
|
||||
.unwrap();
|
||||
let opts = ProcessOptions {
|
||||
verify_ledger: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
let (bank_forks, bank_forks_info, _) =
|
||||
process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap();
|
||||
process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
|
||||
|
||||
assert_eq!(bank_forks_info.len(), 1);
|
||||
assert_eq!(bank_forks.root(), 0);
|
||||
|
@ -889,8 +950,12 @@ pub mod tests {
|
|||
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
|
||||
|
||||
let blocktree = Blocktree::open(&ledger_path).unwrap();
|
||||
let opts = ProcessOptions {
|
||||
verify_ledger: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
let (bank_forks, bank_forks_info, _) =
|
||||
process_blocktree(&genesis_block, &blocktree, None, true, None).unwrap();
|
||||
process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
|
||||
|
||||
assert_eq!(bank_forks_info.len(), 1);
|
||||
assert_eq!(bank_forks_info[0], BankForksInfo { bank_slot: 0 });
|
||||
|
@ -898,10 +963,106 @@ pub mod tests {
|
|||
assert_eq!(bank.tick_height(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_process_ledger_options_override_threads() {
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(123);
|
||||
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
|
||||
|
||||
let blocktree = Blocktree::open(&ledger_path).unwrap();
|
||||
let opts = ProcessOptions {
|
||||
override_num_threads: Some(1),
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
|
||||
PAR_THREAD_POOL.with(|pool| {
|
||||
assert_eq!(pool.borrow().current_num_threads(), 1);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_process_ledger_options_full_leader_cache() {
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(123);
|
||||
let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
|
||||
|
||||
let blocktree = Blocktree::open(&ledger_path).unwrap();
|
||||
let opts = ProcessOptions {
|
||||
full_leader_cache: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
let (_bank_forks, _bank_forks_info, cached_leader_schedule) =
|
||||
process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
|
||||
assert_eq!(cached_leader_schedule.max_schedules(), std::usize::MAX);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_process_ledger_options_entry_callback() {
|
||||
let GenesisBlockInfo {
|
||||
genesis_block,
|
||||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(100);
|
||||
let (ledger_path, last_entry_hash) = create_new_tmp_ledger!(&genesis_block);
|
||||
let blocktree =
|
||||
Blocktree::open(&ledger_path).expect("Expected to successfully open database ledger");
|
||||
let blockhash = genesis_block.hash();
|
||||
let keypairs = [Keypair::new(), Keypair::new(), Keypair::new()];
|
||||
|
||||
let tx = system_transaction::create_user_account(
|
||||
&mint_keypair,
|
||||
&keypairs[0].pubkey(),
|
||||
1,
|
||||
blockhash,
|
||||
);
|
||||
let entry_1 = next_entry(&last_entry_hash, 1, vec![tx]);
|
||||
|
||||
let tx = system_transaction::create_user_account(
|
||||
&mint_keypair,
|
||||
&keypairs[1].pubkey(),
|
||||
1,
|
||||
blockhash,
|
||||
);
|
||||
let entry_2 = next_entry(&entry_1.hash, 1, vec![tx]);
|
||||
|
||||
let mut entries = vec![entry_1, entry_2];
|
||||
entries.extend(create_ticks(genesis_block.ticks_per_slot, last_entry_hash));
|
||||
blocktree
|
||||
.write_entries(
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
genesis_block.ticks_per_slot,
|
||||
None,
|
||||
true,
|
||||
&Arc::new(Keypair::new()),
|
||||
&entries,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let callback_counter: Arc<RwLock<usize>> = Arc::default();
|
||||
let entry_callback = {
|
||||
let counter = callback_counter.clone();
|
||||
let pubkeys: Vec<Pubkey> = keypairs.iter().map(|k| k.pubkey()).collect();
|
||||
Arc::new(move |bank: &Bank| {
|
||||
let mut counter = counter.write().unwrap();
|
||||
assert_eq!(bank.get_balance(&pubkeys[*counter]), 1);
|
||||
assert_eq!(bank.get_balance(&pubkeys[*counter + 1]), 0);
|
||||
*counter += 1;
|
||||
})
|
||||
};
|
||||
|
||||
let opts = ProcessOptions {
|
||||
override_num_threads: Some(1),
|
||||
entry_callback: Some(entry_callback),
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
process_blocktree(&genesis_block, &blocktree, None, opts).unwrap();
|
||||
assert_eq!(*callback_counter.write().unwrap(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_process_entries_tick() {
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
|
||||
// ensure bank can process a tick
|
||||
assert_eq!(bank.tick_height(), 0);
|
||||
|
@ -917,7 +1078,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(1000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let keypair1 = Keypair::new();
|
||||
let keypair2 = Keypair::new();
|
||||
|
||||
|
@ -951,7 +1112,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(1000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let keypair1 = Keypair::new();
|
||||
let keypair2 = Keypair::new();
|
||||
let keypair3 = Keypair::new();
|
||||
|
@ -1008,7 +1169,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(1000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let keypair1 = Keypair::new();
|
||||
let keypair2 = Keypair::new();
|
||||
let keypair3 = Keypair::new();
|
||||
|
@ -1061,7 +1222,7 @@ pub mod tests {
|
|||
assert!(process_entries(
|
||||
&bank,
|
||||
&[entry_1_to_mint.clone(), entry_2_to_3_mint_to_1.clone()],
|
||||
false
|
||||
false,
|
||||
)
|
||||
.is_err());
|
||||
|
||||
|
@ -1093,7 +1254,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(1000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let keypair1 = Keypair::new();
|
||||
let keypair2 = Keypair::new();
|
||||
let keypair3 = Keypair::new();
|
||||
|
@ -1171,7 +1332,7 @@ pub mod tests {
|
|||
entry_2_to_3_and_1_to_mint.clone(),
|
||||
entry_conflict_itself.clone()
|
||||
],
|
||||
false
|
||||
false,
|
||||
)
|
||||
.is_err());
|
||||
|
||||
|
@ -1188,7 +1349,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(1000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let keypair1 = Keypair::new();
|
||||
let keypair2 = Keypair::new();
|
||||
let keypair3 = Keypair::new();
|
||||
|
@ -1239,7 +1400,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(1_000_000_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
|
||||
const NUM_TRANSFERS_PER_ENTRY: usize = 8;
|
||||
const NUM_TRANSFERS: usize = NUM_TRANSFERS_PER_ENTRY * 32;
|
||||
|
@ -1299,7 +1460,7 @@ pub mod tests {
|
|||
..
|
||||
} = create_genesis_block((num_accounts + 1) as u64 * initial_lamports);
|
||||
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
|
||||
let mut keypairs: Vec<Keypair> = vec![];
|
||||
|
||||
|
@ -1366,7 +1527,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(1000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let keypair1 = Keypair::new();
|
||||
let keypair2 = Keypair::new();
|
||||
let keypair3 = Keypair::new();
|
||||
|
@ -1438,7 +1599,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(11_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let pubkey = Pubkey::new_rand();
|
||||
bank.transfer(1_000, &mint_keypair, &pubkey).unwrap();
|
||||
assert_eq!(bank.transaction_count(), 1);
|
||||
|
@ -1480,7 +1641,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(11_000);
|
||||
let bank = Bank::new(&genesis_block);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let keypair1 = Keypair::new();
|
||||
let keypair2 = Keypair::new();
|
||||
let success_tx = system_transaction::create_user_account(
|
||||
|
@ -1552,15 +1713,19 @@ pub mod tests {
|
|||
|
||||
// Set up bank1
|
||||
let bank0 = Arc::new(Bank::new(&genesis_block));
|
||||
process_bank_0(&bank0, &blocktree, true).unwrap();
|
||||
let opts = ProcessOptions {
|
||||
verify_ledger: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
process_bank_0(&bank0, &blocktree, &opts).unwrap();
|
||||
let bank1 = Arc::new(Bank::new_from_parent(&bank0, &Pubkey::default(), 1));
|
||||
bank1.squash();
|
||||
let slot1_entries = blocktree.get_slot_entries(1, 0, None).unwrap();
|
||||
verify_and_process_entries(&bank1, &slot1_entries, true, bank0.last_blockhash()).unwrap();
|
||||
verify_and_process_entries(&bank1, &slot1_entries, bank0.last_blockhash(), &opts).unwrap();
|
||||
|
||||
// Test process_blocktree_from_root() from slot 1 onwards
|
||||
let (bank_forks, bank_forks_info, _) =
|
||||
process_blocktree_from_root(&blocktree, bank1, true, None).unwrap();
|
||||
process_blocktree_from_root(&blocktree, bank1, &opts).unwrap();
|
||||
|
||||
assert_eq!(bank_forks_info.len(), 1); // One fork
|
||||
assert_eq!(
|
||||
|
@ -1596,7 +1761,7 @@ pub mod tests {
|
|||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_block(1_000_000_000);
|
||||
let mut bank = Bank::new(&genesis_block);
|
||||
let mut bank = Arc::new(Bank::new(&genesis_block));
|
||||
|
||||
const NUM_TRANSFERS_PER_ENTRY: usize = 8;
|
||||
const NUM_TRANSFERS: usize = NUM_TRANSFERS_PER_ENTRY * 32;
|
||||
|
@ -1676,19 +1841,17 @@ pub mod tests {
|
|||
)
|
||||
.expect("process ticks failed");
|
||||
|
||||
let parent = Arc::new(bank);
|
||||
|
||||
if i % 16 == 0 {
|
||||
root.map(|old_root| old_root.squash());
|
||||
root = Some(parent.clone());
|
||||
root = Some(bank.clone());
|
||||
}
|
||||
i += 1;
|
||||
|
||||
bank = Bank::new_from_parent(
|
||||
&parent,
|
||||
bank = Arc::new(Bank::new_from_parent(
|
||||
&bank,
|
||||
&Pubkey::default(),
|
||||
parent.slot() + thread_rng().gen_range(1, 3),
|
||||
);
|
||||
bank.slot() + thread_rng().gen_range(1, 3),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,12 +11,20 @@ use std::sync::{Arc, RwLock};
|
|||
type CachedSchedules = (HashMap<u64, Arc<LeaderSchedule>>, VecDeque<u64>);
|
||||
const MAX_SCHEDULES: usize = 10;
|
||||
|
||||
struct CacheCapacity(usize);
|
||||
impl Default for CacheCapacity {
|
||||
fn default() -> Self {
|
||||
CacheCapacity(MAX_SCHEDULES)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct LeaderScheduleCache {
|
||||
// Map from an epoch to a leader schedule for that epoch
|
||||
pub cached_schedules: RwLock<CachedSchedules>,
|
||||
epoch_schedule: EpochSchedule,
|
||||
max_epoch: RwLock<u64>,
|
||||
max_schedules: CacheCapacity,
|
||||
}
|
||||
|
||||
impl LeaderScheduleCache {
|
||||
|
@ -29,6 +37,7 @@ impl LeaderScheduleCache {
|
|||
cached_schedules: RwLock::new((HashMap::new(), VecDeque::new())),
|
||||
epoch_schedule,
|
||||
max_epoch: RwLock::new(0),
|
||||
max_schedules: CacheCapacity::default(),
|
||||
};
|
||||
|
||||
// This sets the root and calculates the schedule at stakers_epoch(root)
|
||||
|
@ -43,6 +52,16 @@ impl LeaderScheduleCache {
|
|||
cache
|
||||
}
|
||||
|
||||
pub fn set_max_schedules(&mut self, max_schedules: usize) {
|
||||
if max_schedules > 0 {
|
||||
self.max_schedules = CacheCapacity(max_schedules);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn max_schedules(&self) -> usize {
|
||||
self.max_schedules.0
|
||||
}
|
||||
|
||||
pub fn set_root(&self, root_bank: &Bank) {
|
||||
let new_max_epoch = self.epoch_schedule.get_stakers_epoch(root_bank.slot());
|
||||
let old_max_epoch = {
|
||||
|
@ -189,14 +208,18 @@ impl LeaderScheduleCache {
|
|||
if let Entry::Vacant(v) = entry {
|
||||
v.insert(leader_schedule.clone());
|
||||
order.push_back(epoch);
|
||||
Self::retain_latest(cached_schedules, order);
|
||||
Self::retain_latest(cached_schedules, order, self.max_schedules());
|
||||
}
|
||||
leader_schedule
|
||||
})
|
||||
}
|
||||
|
||||
fn retain_latest(schedules: &mut HashMap<u64, Arc<LeaderSchedule>>, order: &mut VecDeque<u64>) {
|
||||
if schedules.len() > MAX_SCHEDULES {
|
||||
fn retain_latest(
|
||||
schedules: &mut HashMap<u64, Arc<LeaderSchedule>>,
|
||||
order: &mut VecDeque<u64>,
|
||||
max_schedules: usize,
|
||||
) {
|
||||
while schedules.len() > max_schedules {
|
||||
let first = order.pop_front().unwrap();
|
||||
schedules.remove(&first);
|
||||
}
|
||||
|
@ -226,6 +249,7 @@ mod tests {
|
|||
let bank = Bank::new(&genesis_block);
|
||||
let cache = LeaderScheduleCache::new_from_bank(&bank);
|
||||
assert_eq!(bank.slot(), 0);
|
||||
assert_eq!(cache.max_schedules(), MAX_SCHEDULES);
|
||||
|
||||
// Epoch schedule for all epochs in the range:
|
||||
// [0, stakers_epoch(bank.slot())] should
|
||||
|
@ -263,7 +287,7 @@ mod tests {
|
|||
cached_schedules.insert(i as u64, Arc::new(LeaderSchedule::default()));
|
||||
order.push_back(i as u64);
|
||||
}
|
||||
LeaderScheduleCache::retain_latest(&mut cached_schedules, &mut order);
|
||||
LeaderScheduleCache::retain_latest(&mut cached_schedules, &mut order, MAX_SCHEDULES);
|
||||
assert_eq!(cached_schedules.len(), MAX_SCHEDULES);
|
||||
let mut keys: Vec<_> = cached_schedules.keys().cloned().collect();
|
||||
keys.sort();
|
||||
|
@ -539,4 +563,18 @@ mod tests {
|
|||
assert_eq!(bank2.get_epoch_and_slot_index(224).0, 3);
|
||||
assert!(cache.slot_leader_at(224, Some(&bank2)).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_max_schedules() {
|
||||
let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(2);
|
||||
let bank = Arc::new(Bank::new(&genesis_block));
|
||||
let mut cache = LeaderScheduleCache::new_from_bank(&bank);
|
||||
|
||||
// Max schedules must be greater than 0
|
||||
cache.set_max_schedules(0);
|
||||
assert_eq!(cache.max_schedules(), MAX_SCHEDULES);
|
||||
|
||||
cache.set_max_schedules(std::usize::MAX);
|
||||
assert_eq!(cache.max_schedules(), std::usize::MAX);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -364,7 +364,7 @@ impl ReplayStage {
|
|||
|
||||
// Returns the replay result and the number of replayed transactions
|
||||
fn replay_blocktree_into_bank(
|
||||
bank: &Bank,
|
||||
bank: &Arc<Bank>,
|
||||
blocktree: &Blocktree,
|
||||
progress: &mut HashMap<u64, ForkProgress>,
|
||||
) -> (Result<()>, usize) {
|
||||
|
@ -675,7 +675,7 @@ impl ReplayStage {
|
|||
}
|
||||
|
||||
fn replay_entries_into_bank(
|
||||
bank: &Bank,
|
||||
bank: &Arc<Bank>,
|
||||
entries: Vec<Entry>,
|
||||
progress: &mut HashMap<u64, ForkProgress>,
|
||||
num: usize,
|
||||
|
@ -698,7 +698,7 @@ impl ReplayStage {
|
|||
}
|
||||
|
||||
pub fn verify_and_process_entries(
|
||||
bank: &Bank,
|
||||
bank: &Arc<Bank>,
|
||||
entries: &[Entry],
|
||||
last_entry: &Hash,
|
||||
shred_index: usize,
|
||||
|
|
|
@ -737,14 +737,14 @@ mod tests {
|
|||
let mut last_bank = bank;
|
||||
let rooted_banks = (slot..slot + last_bank.slots_per_segment() + 1)
|
||||
.map(|i| {
|
||||
let bank = Bank::new_from_parent(&last_bank, &keypair.pubkey(), i);
|
||||
let bank = Arc::new(Bank::new_from_parent(&last_bank, &keypair.pubkey(), i));
|
||||
blocktree_processor::process_entries(
|
||||
&bank,
|
||||
&entry::create_ticks(64, bank.last_blockhash()),
|
||||
true,
|
||||
)
|
||||
.expect("failed process entries");
|
||||
last_bank = Arc::new(bank);
|
||||
last_bank = bank;
|
||||
last_bank.clone()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
|
|
@ -417,8 +417,11 @@ fn get_bank_forks(
|
|||
return blocktree_processor::process_blocktree_from_root(
|
||||
blocktree,
|
||||
Arc::new(deserialized_bank),
|
||||
&blocktree_processor::ProcessOptions {
|
||||
verify_ledger,
|
||||
dev_halt_at_slot,
|
||||
..blocktree_processor::ProcessOptions::default()
|
||||
},
|
||||
)
|
||||
.expect("processing blocktree after loading snapshot failed");
|
||||
} else {
|
||||
|
@ -433,8 +436,11 @@ fn get_bank_forks(
|
|||
&genesis_block,
|
||||
&blocktree,
|
||||
account_paths,
|
||||
blocktree_processor::ProcessOptions {
|
||||
verify_ledger,
|
||||
dev_halt_at_slot,
|
||||
..blocktree_processor::ProcessOptions::default()
|
||||
},
|
||||
)
|
||||
.expect("process_blocktree failed")
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use clap::{crate_description, crate_name, crate_version, value_t_or_exit, App, Arg, SubCommand};
|
||||
use solana_core::blocktree::Blocktree;
|
||||
use solana_core::blocktree_processor::process_blocktree;
|
||||
use solana_core::blocktree_processor::{process_blocktree, ProcessOptions};
|
||||
use solana_sdk::clock::Slot;
|
||||
use solana_sdk::genesis_block::GenesisBlock;
|
||||
use std::collections::BTreeMap;
|
||||
|
@ -168,7 +168,11 @@ fn main() {
|
|||
}
|
||||
("verify", _) => {
|
||||
println!("Verifying ledger...");
|
||||
match process_blocktree(&genesis_block, &blocktree, None, true, None) {
|
||||
let options = ProcessOptions {
|
||||
verify_ledger: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
match process_blocktree(&genesis_block, &blocktree, None, options) {
|
||||
Ok((_bank_forks, bank_forks_info, _)) => {
|
||||
println!("{:?}", bank_forks_info);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue