Run real snapshot packager while processing blockstore at validator startup
This commit is contained in:
parent
b101e00ffa
commit
83e041299a
|
@ -115,6 +115,9 @@ use {
|
|||
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
|
||||
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
|
||||
|
||||
/// maximum drop bank signal queue length
|
||||
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
|
||||
|
||||
pub struct ValidatorConfig {
|
||||
pub halt_at_slot: Option<Slot>,
|
||||
pub expected_genesis_hash: Option<Hash>,
|
||||
|
@ -529,66 +532,18 @@ impl Validator {
|
|||
Some(poh_timing_point_sender.clone()),
|
||||
);
|
||||
|
||||
let pending_accounts_package = PendingAccountsPackage::default();
|
||||
let last_full_snapshot_slot = process_blockstore(
|
||||
&blockstore,
|
||||
&bank_forks,
|
||||
&leader_schedule_cache,
|
||||
&blockstore_process_options,
|
||||
transaction_status_sender.as_ref(),
|
||||
cache_block_meta_sender.as_ref(),
|
||||
config.snapshot_config.as_ref(),
|
||||
Arc::clone(&pending_accounts_package),
|
||||
blockstore_root_scan,
|
||||
pruned_banks_receiver,
|
||||
&start_progress,
|
||||
);
|
||||
let last_full_snapshot_slot =
|
||||
last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));
|
||||
|
||||
maybe_warp_slot(config, ledger_path, &bank_forks, &leader_schedule_cache);
|
||||
|
||||
let tower = {
|
||||
let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id);
|
||||
if let Ok(tower) = &restored_tower {
|
||||
reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| {
|
||||
error!("Failed to reconcile blockstore with tower: {:?}", err);
|
||||
abort()
|
||||
});
|
||||
}
|
||||
|
||||
post_process_restored_tower(
|
||||
restored_tower,
|
||||
&id,
|
||||
vote_account,
|
||||
config,
|
||||
&bank_forks.read().unwrap(),
|
||||
)
|
||||
};
|
||||
info!("Tower state: {:?}", tower);
|
||||
|
||||
*start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
|
||||
|
||||
let leader_schedule_cache = Arc::new(leader_schedule_cache);
|
||||
|
||||
let sample_performance_service =
|
||||
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
|
||||
Some(SamplePerformanceService::new(
|
||||
&bank_forks,
|
||||
&blockstore,
|
||||
&exit,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let bank = bank_forks.read().unwrap().working_bank();
|
||||
info!("Starting validator with working bank slot {}", bank.slot());
|
||||
|
||||
node.info.wallclock = timestamp();
|
||||
node.info.shred_version = compute_shred_version(
|
||||
&genesis_config.hash(),
|
||||
Some(&bank.hard_forks().read().unwrap()),
|
||||
Some(
|
||||
&bank_forks
|
||||
.read()
|
||||
.unwrap()
|
||||
.working_bank()
|
||||
.hard_forks()
|
||||
.read()
|
||||
.unwrap(),
|
||||
),
|
||||
);
|
||||
|
||||
Self::print_node_info(&node);
|
||||
|
@ -613,28 +568,13 @@ impl Validator {
|
|||
cluster_info.restore_contact_info(ledger_path, config.contact_save_interval);
|
||||
let cluster_info = Arc::new(cluster_info);
|
||||
|
||||
// Before replay starts, set the callbacks in each of the banks in BankForks
|
||||
// Note after this callback is created, only the AccountsBackgroundService should be calling
|
||||
// AccountsDb::purge_slot() to clean up dropped banks.
|
||||
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
|
||||
let callback = bank_forks
|
||||
.read()
|
||||
.unwrap()
|
||||
.root_bank()
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.create_drop_bank_callback(pruned_banks_sender);
|
||||
for bank in bank_forks.read().unwrap().banks().values() {
|
||||
bank.set_callback(Some(Box::new(callback.clone())));
|
||||
}
|
||||
|
||||
let (
|
||||
accounts_background_service,
|
||||
accounts_hash_verifier,
|
||||
snapshot_packager_service,
|
||||
accounts_background_request_sender,
|
||||
) = {
|
||||
let pending_accounts_package = PendingAccountsPackage::default();
|
||||
let (
|
||||
accounts_background_request_sender,
|
||||
snapshot_request_handler,
|
||||
|
@ -694,6 +634,7 @@ impl Validator {
|
|||
config.snapshot_config.clone(),
|
||||
);
|
||||
|
||||
let last_full_snapshot_slot = starting_snapshot_hashes.map(|x| x.full.hash.0);
|
||||
let accounts_background_service = AccountsBackgroundService::new(
|
||||
bank_forks.clone(),
|
||||
&exit,
|
||||
|
@ -714,6 +655,57 @@ impl Validator {
|
|||
)
|
||||
};
|
||||
|
||||
process_blockstore(
|
||||
&blockstore,
|
||||
&bank_forks,
|
||||
&leader_schedule_cache,
|
||||
&blockstore_process_options,
|
||||
transaction_status_sender.as_ref(),
|
||||
cache_block_meta_sender.as_ref(),
|
||||
blockstore_root_scan,
|
||||
&accounts_background_request_sender,
|
||||
&start_progress,
|
||||
);
|
||||
|
||||
maybe_warp_slot(config, ledger_path, &bank_forks, &leader_schedule_cache);
|
||||
|
||||
let tower = {
|
||||
let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id);
|
||||
if let Ok(tower) = &restored_tower {
|
||||
reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| {
|
||||
error!("Failed to reconcile blockstore with tower: {:?}", err);
|
||||
abort()
|
||||
});
|
||||
}
|
||||
|
||||
post_process_restored_tower(
|
||||
restored_tower,
|
||||
&id,
|
||||
vote_account,
|
||||
config,
|
||||
&bank_forks.read().unwrap(),
|
||||
)
|
||||
};
|
||||
info!("Tower state: {:?}", tower);
|
||||
|
||||
*start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
|
||||
|
||||
let leader_schedule_cache = Arc::new(leader_schedule_cache);
|
||||
|
||||
let sample_performance_service =
|
||||
if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
|
||||
Some(SamplePerformanceService::new(
|
||||
&bank_forks,
|
||||
&blockstore,
|
||||
&exit,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let bank = bank_forks.read().unwrap().working_bank();
|
||||
info!("Starting validator with working bank slot {}", bank.slot());
|
||||
|
||||
let mut block_commitment_cache = BlockCommitmentCache::default();
|
||||
block_commitment_cache.initialize_slots(bank.slot(), bank_forks.read().unwrap().root());
|
||||
let block_commitment_cache = Arc::new(RwLock::new(block_commitment_cache));
|
||||
|
@ -1420,7 +1412,7 @@ fn load_blockstore(
|
|||
TransactionHistoryServices::default()
|
||||
};
|
||||
|
||||
let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes, pruned_banks_receiver) =
|
||||
let (bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
|
||||
bank_forks_utils::load_bank_forks(
|
||||
&genesis_config,
|
||||
&blockstore,
|
||||
|
@ -1434,6 +1426,28 @@ fn load_blockstore(
|
|||
accounts_update_notifier,
|
||||
);
|
||||
|
||||
// Before replay starts, set the callbacks in each of the banks in BankForks so that
|
||||
// all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
|
||||
// drop behavior can be safely synchronized with any other ongoing accounts activity like
|
||||
// cache flush, clean, shrink, as long as the same thread performing those activities also
|
||||
// is processing the dropped banks from the `pruned_banks_receiver` channel.
|
||||
|
||||
// There should only be one bank, the root bank in BankForks. Thus all banks added to
|
||||
// BankForks from now on will be descended from the root bank and thus will inherit
|
||||
// the bank drop callback.
|
||||
assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
|
||||
let (pruned_banks_sender, pruned_banks_receiver) = bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
|
||||
{
|
||||
let root_bank = bank_forks.read().unwrap().root_bank();
|
||||
root_bank.set_callback(Some(Box::new(
|
||||
root_bank
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.create_drop_bank_callback(pruned_banks_sender),
|
||||
)));
|
||||
}
|
||||
|
||||
{
|
||||
let hard_forks: Vec<_> = bank_forks
|
||||
.read()
|
||||
|
@ -1508,12 +1522,10 @@ fn process_blockstore(
|
|||
process_options: &blockstore_processor::ProcessOptions,
|
||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
|
||||
snapshot_config: Option<&SnapshotConfig>,
|
||||
pending_accounts_package: PendingAccountsPackage,
|
||||
blockstore_root_scan: BlockstoreRootScan,
|
||||
pruned_banks_receiver: DroppedSlotsReceiver,
|
||||
accounts_background_request_sender: &AbsRequestSender,
|
||||
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
|
||||
) -> Option<Slot> {
|
||||
) {
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
if let Some(max_slot) = highest_slot(blockstore) {
|
||||
let bank_forks = bank_forks.clone();
|
||||
|
@ -1529,17 +1541,14 @@ fn process_blockstore(
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root(
|
||||
blockstore_processor::process_blockstore_from_root(
|
||||
blockstore,
|
||||
bank_forks,
|
||||
leader_schedule_cache,
|
||||
process_options,
|
||||
transaction_status_sender,
|
||||
cache_block_meta_sender,
|
||||
snapshot_config,
|
||||
pending_accounts_package,
|
||||
pruned_banks_receiver,
|
||||
accounts_background_request_sender,
|
||||
)
|
||||
.unwrap_or_else(|err| {
|
||||
error!("Failed to load ledger: {:?}", err);
|
||||
|
@ -1549,8 +1558,6 @@ fn process_blockstore(
|
|||
exit.store(true, Ordering::Relaxed);
|
||||
|
||||
blockstore_root_scan.join();
|
||||
|
||||
last_full_snapshot_slot
|
||||
}
|
||||
|
||||
fn maybe_warp_slot(
|
||||
|
|
|
@ -43,7 +43,6 @@ use {
|
|||
snapshot_archive_info::SnapshotArchiveInfoGetter,
|
||||
snapshot_config::SnapshotConfig,
|
||||
snapshot_hash::StartingSnapshotHashes,
|
||||
snapshot_package::PendingAccountsPackage,
|
||||
snapshot_utils::{
|
||||
self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
|
||||
DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
|
||||
|
@ -778,7 +777,6 @@ fn load_bank_forks(
|
|||
process_options,
|
||||
None,
|
||||
None,
|
||||
PendingAccountsPackage::default(),
|
||||
None,
|
||||
)
|
||||
.map(|(bank_forks, .., starting_snapshot_hashes)| (bank_forks, starting_snapshot_hashes))
|
||||
|
|
|
@ -7,16 +7,13 @@ use {
|
|||
},
|
||||
leader_schedule_cache::LeaderScheduleCache,
|
||||
},
|
||||
crossbeam_channel::bounded,
|
||||
log::*,
|
||||
solana_runtime::{
|
||||
accounts_background_service::DroppedSlotsReceiver,
|
||||
accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||
bank_forks::BankForks,
|
||||
snapshot_archive_info::SnapshotArchiveInfoGetter,
|
||||
snapshot_config::SnapshotConfig,
|
||||
snapshot_hash::{FullSnapshotHash, IncrementalSnapshotHash, StartingSnapshotHashes},
|
||||
snapshot_package::PendingAccountsPackage,
|
||||
snapshot_utils,
|
||||
},
|
||||
solana_sdk::genesis_config::GenesisConfig,
|
||||
|
@ -37,9 +34,6 @@ pub type LoadResult = result::Result<
|
|||
BlockstoreProcessorError,
|
||||
>;
|
||||
|
||||
/// maximum drop bank signal queue length
|
||||
const MAX_DROP_BANK_SIGNAL_QUEUE_SIZE: usize = 10_000;
|
||||
|
||||
/// Load the banks via genesis or a snapshot then processes all full blocks in blockstore
|
||||
///
|
||||
/// If a snapshot config is given, and a snapshot is found, it will be loaded. Otherwise, load
|
||||
|
@ -54,20 +48,18 @@ pub fn load(
|
|||
process_options: ProcessOptions,
|
||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
|
||||
pending_accounts_package: PendingAccountsPackage,
|
||||
accounts_update_notifier: Option<AccountsUpdateNotifier>,
|
||||
) -> LoadResult {
|
||||
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, pruned_banks_receiver) =
|
||||
load_bank_forks(
|
||||
genesis_config,
|
||||
blockstore,
|
||||
account_paths,
|
||||
shrink_paths,
|
||||
snapshot_config,
|
||||
&process_options,
|
||||
cache_block_meta_sender,
|
||||
accounts_update_notifier,
|
||||
);
|
||||
let (bank_forks, leader_schedule_cache, starting_snapshot_hashes, ..) = load_bank_forks(
|
||||
genesis_config,
|
||||
blockstore,
|
||||
account_paths,
|
||||
shrink_paths,
|
||||
snapshot_config,
|
||||
&process_options,
|
||||
cache_block_meta_sender,
|
||||
accounts_update_notifier,
|
||||
);
|
||||
|
||||
blockstore_processor::process_blockstore_from_root(
|
||||
blockstore,
|
||||
|
@ -76,9 +68,7 @@ pub fn load(
|
|||
&process_options,
|
||||
transaction_status_sender,
|
||||
cache_block_meta_sender,
|
||||
snapshot_config,
|
||||
pending_accounts_package,
|
||||
pruned_banks_receiver,
|
||||
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
|
||||
)
|
||||
.map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes))
|
||||
}
|
||||
|
@ -97,7 +87,6 @@ pub fn load_bank_forks(
|
|||
Arc<RwLock<BankForks>>,
|
||||
LeaderScheduleCache,
|
||||
Option<StartingSnapshotHashes>,
|
||||
DroppedSlotsReceiver,
|
||||
) {
|
||||
let snapshot_present = if let Some(snapshot_config) = snapshot_config {
|
||||
info!(
|
||||
|
@ -156,33 +145,11 @@ pub fn load_bank_forks(
|
|||
)
|
||||
};
|
||||
|
||||
// Before replay starts, set the callbacks in each of the banks in BankForks so that
|
||||
// all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
|
||||
// drop behavior can be safely synchronized with any other ongoing accounts activity like
|
||||
// cache flush, clean, shrink, as long as the same thread performing those activities also
|
||||
// is processing the dropped banks from the `pruned_banks_receiver` channel.
|
||||
|
||||
// There should only be one bank, the root bank in BankForks. Thus all banks added to
|
||||
// BankForks from now on will be descended from the root bank and thus will inherit
|
||||
// the bank drop callback.
|
||||
assert_eq!(bank_forks.read().unwrap().banks().len(), 1);
|
||||
let (pruned_banks_sender, pruned_banks_receiver) = bounded(MAX_DROP_BANK_SIGNAL_QUEUE_SIZE);
|
||||
|
||||
let leader_schedule_cache = {
|
||||
let root_bank = bank_forks.read().unwrap().root_bank();
|
||||
let callback = root_bank
|
||||
.rc
|
||||
.accounts
|
||||
.accounts_db
|
||||
.create_drop_bank_callback(pruned_banks_sender);
|
||||
root_bank.set_callback(Some(Box::new(callback)));
|
||||
|
||||
let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&root_bank);
|
||||
if process_options.full_leader_cache {
|
||||
leader_schedule_cache.set_max_schedules(std::usize::MAX);
|
||||
}
|
||||
leader_schedule_cache
|
||||
};
|
||||
let mut leader_schedule_cache =
|
||||
LeaderScheduleCache::new_from_bank(&bank_forks.read().unwrap().root_bank());
|
||||
if process_options.full_leader_cache {
|
||||
leader_schedule_cache.set_max_schedules(std::usize::MAX);
|
||||
}
|
||||
|
||||
if let Some(ref new_hard_forks) = process_options.new_hard_forks {
|
||||
let root_bank = bank_forks.read().unwrap().root_bank();
|
||||
|
@ -200,12 +167,7 @@ pub fn load_bank_forks(
|
|||
}
|
||||
}
|
||||
|
||||
(
|
||||
bank_forks,
|
||||
leader_schedule_cache,
|
||||
starting_snapshot_hashes,
|
||||
pruned_banks_receiver,
|
||||
)
|
||||
(bank_forks, leader_schedule_cache, starting_snapshot_hashes)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
|
|
|
@ -17,7 +17,7 @@ use {
|
|||
solana_program_runtime::timings::{ExecuteTimingType, ExecuteTimings},
|
||||
solana_rayon_threadlimit::get_thread_count,
|
||||
solana_runtime::{
|
||||
accounts_background_service::DroppedSlotsReceiver,
|
||||
accounts_background_service::AbsRequestSender,
|
||||
accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
|
||||
accounts_index::AccountSecondaryIndexes,
|
||||
accounts_update_notifier_interface::AccountsUpdateNotifier,
|
||||
|
@ -31,9 +31,6 @@ use {
|
|||
commitment::VOTE_THRESHOLD_SIZE,
|
||||
cost_model::CostModel,
|
||||
runtime_config::RuntimeConfig,
|
||||
snapshot_config::SnapshotConfig,
|
||||
snapshot_package::{PendingAccountsPackage, SnapshotType},
|
||||
snapshot_utils,
|
||||
transaction_batch::TransactionBatch,
|
||||
transaction_cost_metrics_sender::TransactionCostMetricsSender,
|
||||
vote_account::VoteAccountsHashMap,
|
||||
|
@ -569,17 +566,16 @@ pub fn test_process_blockstore(
|
|||
blockstore: &Blockstore,
|
||||
opts: ProcessOptions,
|
||||
) -> (Arc<RwLock<BankForks>>, LeaderScheduleCache) {
|
||||
let (bank_forks, leader_schedule_cache, .., pruned_banks_receiver) =
|
||||
crate::bank_forks_utils::load_bank_forks(
|
||||
genesis_config,
|
||||
blockstore,
|
||||
Vec::new(),
|
||||
None,
|
||||
None,
|
||||
&opts,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let (bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks(
|
||||
genesis_config,
|
||||
blockstore,
|
||||
Vec::new(),
|
||||
None,
|
||||
None,
|
||||
&opts,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
process_blockstore_from_root(
|
||||
blockstore,
|
||||
&bank_forks,
|
||||
|
@ -587,9 +583,7 @@ pub fn test_process_blockstore(
|
|||
&opts,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
PendingAccountsPackage::default(),
|
||||
pruned_banks_receiver,
|
||||
&AbsRequestSender::default(),
|
||||
)
|
||||
.unwrap();
|
||||
(bank_forks, leader_schedule_cache)
|
||||
|
@ -639,10 +633,8 @@ pub fn process_blockstore_from_root(
|
|||
opts: &ProcessOptions,
|
||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
|
||||
snapshot_config: Option<&SnapshotConfig>,
|
||||
pending_accounts_package: PendingAccountsPackage,
|
||||
pruned_banks_receiver: DroppedSlotsReceiver,
|
||||
) -> result::Result<Option<Slot>, BlockstoreProcessorError> {
|
||||
accounts_background_request_sender: &AbsRequestSender,
|
||||
) -> result::Result<(), BlockstoreProcessorError> {
|
||||
if let Some(num_threads) = opts.override_num_threads {
|
||||
PAR_THREAD_POOL.with(|pool| {
|
||||
*pool.borrow_mut() = rayon::ThreadPoolBuilder::new()
|
||||
|
@ -682,8 +674,6 @@ pub fn process_blockstore_from_root(
|
|||
let mut timing = ExecuteTimings::default();
|
||||
// Iterate and replay slots from blockstore starting from `start_slot`
|
||||
|
||||
let mut last_full_snapshot_slot = None;
|
||||
|
||||
if let Some(start_slot_meta) = blockstore
|
||||
.meta(start_slot)
|
||||
.unwrap_or_else(|_| panic!("Failed to get meta for slot {}", start_slot))
|
||||
|
@ -697,11 +687,8 @@ pub fn process_blockstore_from_root(
|
|||
opts,
|
||||
transaction_status_sender,
|
||||
cache_block_meta_sender,
|
||||
snapshot_config,
|
||||
pending_accounts_package,
|
||||
&mut timing,
|
||||
&mut last_full_snapshot_slot,
|
||||
pruned_banks_receiver,
|
||||
accounts_background_request_sender,
|
||||
)?;
|
||||
} else {
|
||||
// If there's no meta for the input `start_slot`, then we started from a snapshot
|
||||
|
@ -761,7 +748,7 @@ pub fn process_blockstore_from_root(
|
|||
assert!(bank_forks.active_banks().is_empty());
|
||||
}
|
||||
|
||||
Ok(last_full_snapshot_slot)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Verify that a segment of entries has the correct number of ticks and hashes
|
||||
|
@ -1159,16 +1146,12 @@ fn load_frozen_forks(
|
|||
opts: &ProcessOptions,
|
||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
|
||||
snapshot_config: Option<&SnapshotConfig>,
|
||||
pending_accounts_package: PendingAccountsPackage,
|
||||
timing: &mut ExecuteTimings,
|
||||
last_full_snapshot_slot: &mut Option<Slot>,
|
||||
pruned_banks_receiver: DroppedSlotsReceiver,
|
||||
accounts_background_request_sender: &AbsRequestSender,
|
||||
) -> result::Result<(), BlockstoreProcessorError> {
|
||||
let recyclers = VerifyRecyclers::default();
|
||||
let mut all_banks = HashMap::new();
|
||||
let mut last_status_report = Instant::now();
|
||||
let mut last_free = Instant::now();
|
||||
let mut pending_slots = vec![];
|
||||
let mut slots_elapsed = 0;
|
||||
let mut txs = 0;
|
||||
|
@ -1292,63 +1275,10 @@ fn load_frozen_forks(
|
|||
leader_schedule_cache.set_root(new_root_bank);
|
||||
let _ = bank_forks.write().unwrap().set_root(
|
||||
root,
|
||||
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
|
||||
accounts_background_request_sender,
|
||||
None,
|
||||
);
|
||||
|
||||
if let Some(snapshot_config) = snapshot_config {
|
||||
let block_height = new_root_bank.block_height();
|
||||
if snapshot_utils::should_take_full_snapshot(
|
||||
block_height,
|
||||
snapshot_config.full_snapshot_archive_interval_slots,
|
||||
) {
|
||||
info!("Taking snapshot of new root bank that has crossed the full snapshot interval! slot: {}", root);
|
||||
*last_full_snapshot_slot = Some(root);
|
||||
new_root_bank.exhaustively_free_unused_resource(*last_full_snapshot_slot);
|
||||
last_free = Instant::now();
|
||||
new_root_bank.update_accounts_hash_with_index_option(
|
||||
false,
|
||||
snapshot_config.accounts_hash_debug_verify,
|
||||
false,
|
||||
);
|
||||
snapshot_utils::snapshot_bank(
|
||||
new_root_bank,
|
||||
new_root_bank.src.slot_deltas(&new_root_bank.src.roots()),
|
||||
&pending_accounts_package,
|
||||
&snapshot_config.bank_snapshots_dir,
|
||||
&snapshot_config.snapshot_archives_dir,
|
||||
snapshot_config.snapshot_version,
|
||||
snapshot_config.archive_format,
|
||||
None,
|
||||
Some(SnapshotType::FullSnapshot),
|
||||
)
|
||||
.expect("Failed to snapshot bank while loading frozen banks");
|
||||
trace!(
|
||||
"took bank snapshot for new root bank, block height: {}, slot: {}",
|
||||
block_height,
|
||||
root
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if last_free.elapsed() > Duration::from_secs(10) {
|
||||
// Purge account state for all dropped banks
|
||||
for (pruned_slot, pruned_bank_id) in pruned_banks_receiver.try_iter() {
|
||||
// Simulate this purge is from AccountBackgroundService
|
||||
new_root_bank.rc.accounts.accounts_db.purge_slot(
|
||||
pruned_slot,
|
||||
pruned_bank_id,
|
||||
true,
|
||||
);
|
||||
}
|
||||
|
||||
// Must be called after `squash()`, so that AccountsDb knows what
|
||||
// the roots are for the cache flushing in exhaustively_free_unused_resource().
|
||||
// This could take few secs; so update last_free later
|
||||
new_root_bank.exhaustively_free_unused_resource(*last_full_snapshot_slot);
|
||||
last_free = Instant::now();
|
||||
}
|
||||
|
||||
// Filter out all non descendants of the new root
|
||||
pending_slots
|
||||
.retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(&root));
|
||||
|
@ -3211,7 +3141,6 @@ pub mod tests {
|
|||
let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank1);
|
||||
|
||||
// Test process_blockstore_from_root() from slot 1 onwards
|
||||
let (_pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
|
||||
let bank_forks = RwLock::new(bank_forks);
|
||||
process_blockstore_from_root(
|
||||
&blockstore,
|
||||
|
@ -3220,9 +3149,7 @@ pub mod tests {
|
|||
&opts,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
PendingAccountsPackage::default(),
|
||||
pruned_banks_receiver,
|
||||
&AbsRequestSender::default(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -3246,128 +3173,6 @@ pub mod tests {
|
|||
verify_fork_infos(&bank_forks);
|
||||
}
|
||||
|
||||
/// Test that processing the blockstore is aware of incremental snapshots. When processing the
|
||||
/// blockstore from a root, like what happens when loading from a snapshot, there may be new
|
||||
/// roots that cross a full snapshot interval. In these cases, a bank snapshot must be taken,
|
||||
/// so that a full snapshot archive is created and available by the time the background
|
||||
/// services spin up.
|
||||
///
|
||||
/// For this test, process enough roots to cross the full snapshot interval multiple times.
|
||||
/// Ensure afterwards that the snapshots were created.
|
||||
#[test]
|
||||
fn test_process_blockstore_from_root_with_snapshots() {
|
||||
solana_logger::setup();
|
||||
let GenesisConfigInfo {
|
||||
mut genesis_config, ..
|
||||
} = create_genesis_config(123);
|
||||
|
||||
let ticks_per_slot = 1;
|
||||
genesis_config.ticks_per_slot = ticks_per_slot;
|
||||
let (ledger_path, blockhash) = create_new_tmp_ledger_auto_delete!(&genesis_config);
|
||||
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
|
||||
|
||||
const ROOT_INTERVAL_SLOTS: Slot = 2;
|
||||
const FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = ROOT_INTERVAL_SLOTS * 5;
|
||||
const LAST_SLOT: Slot = FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS * 4;
|
||||
|
||||
let mut last_hash = blockhash;
|
||||
for i in 1..=LAST_SLOT {
|
||||
last_hash =
|
||||
fill_blockstore_slot_with_ticks(&blockstore, ticks_per_slot, i, i - 1, last_hash);
|
||||
}
|
||||
|
||||
let roots_to_set = (0..=LAST_SLOT)
|
||||
.step_by(ROOT_INTERVAL_SLOTS as usize)
|
||||
.collect_vec();
|
||||
blockstore.set_roots(roots_to_set.iter()).unwrap();
|
||||
|
||||
// Set up bank1
|
||||
let mut bank_forks = BankForks::new(Bank::new_for_tests(&genesis_config));
|
||||
let bank0 = bank_forks.get(0).unwrap().clone();
|
||||
let opts = ProcessOptions {
|
||||
poh_verify: true,
|
||||
accounts_db_test_hash_calculation: true,
|
||||
..ProcessOptions::default()
|
||||
};
|
||||
let recyclers = VerifyRecyclers::default();
|
||||
process_bank_0(&bank0, &blockstore, &opts, &recyclers, None);
|
||||
|
||||
let slot_start_processing = 1;
|
||||
let bank = bank_forks.insert(Bank::new_from_parent(
|
||||
&bank0,
|
||||
&Pubkey::default(),
|
||||
slot_start_processing,
|
||||
));
|
||||
confirm_full_slot(
|
||||
&blockstore,
|
||||
&bank,
|
||||
&opts,
|
||||
&recyclers,
|
||||
&mut ConfirmationProgress::new(bank0.last_blockhash()),
|
||||
None,
|
||||
None,
|
||||
&mut ExecuteTimings::default(),
|
||||
)
|
||||
.unwrap();
|
||||
bank_forks.set_root(
|
||||
1,
|
||||
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
|
||||
None,
|
||||
);
|
||||
|
||||
let bank_snapshots_tempdir = tempfile::TempDir::new().unwrap();
|
||||
let snapshot_config = SnapshotConfig {
|
||||
full_snapshot_archive_interval_slots: FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
|
||||
bank_snapshots_dir: bank_snapshots_tempdir.path().to_path_buf(),
|
||||
..SnapshotConfig::default()
|
||||
};
|
||||
|
||||
let pending_accounts_package = PendingAccountsPackage::default();
|
||||
let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);
|
||||
|
||||
let (_pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded();
|
||||
process_blockstore_from_root(
|
||||
&blockstore,
|
||||
&RwLock::new(bank_forks),
|
||||
&leader_schedule_cache,
|
||||
&opts,
|
||||
None,
|
||||
None,
|
||||
Some(&snapshot_config),
|
||||
Arc::clone(&pending_accounts_package),
|
||||
pruned_banks_receiver,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Ensure the last AccountsPackage was created and is pending
|
||||
let received_accounts_package_slot = pending_accounts_package
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.unwrap()
|
||||
.slot;
|
||||
let expected_accounts_package_slot = (slot_start_processing..=LAST_SLOT)
|
||||
.filter(|slot| slot % FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS == 0)
|
||||
.max()
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
received_accounts_package_slot,
|
||||
expected_accounts_package_slot
|
||||
);
|
||||
|
||||
// Ensure all the bank snapshots were created
|
||||
let bank_snapshots = snapshot_utils::get_bank_snapshots(&bank_snapshots_tempdir);
|
||||
let mut bank_snapshot_slots = bank_snapshots
|
||||
.into_iter()
|
||||
.map(|bank_snapshot| bank_snapshot.slot)
|
||||
.collect::<Vec<_>>();
|
||||
bank_snapshot_slots.sort_unstable();
|
||||
let expected_slots = (slot_start_processing..=LAST_SLOT)
|
||||
.filter(|slot| slot % FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS == 0)
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(bank_snapshot_slots, expected_slots);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_process_entries_stress() {
|
||||
|
|
|
@ -3261,34 +3261,6 @@ impl Bank {
|
|||
}
|
||||
}
|
||||
|
||||
// Should not be called outside of startup, will race with
|
||||
// concurrent cleaning logic in AccountsBackgroundService
|
||||
pub fn exhaustively_free_unused_resource(&self, last_full_snapshot_slot: Option<Slot>) {
|
||||
const IS_STARTUP: bool = true; // this is only called at startup, and we want to use more threads
|
||||
let mut flush = Measure::start("flush");
|
||||
// Flush all the rooted accounts. Must be called after `squash()`,
|
||||
// so that AccountsDb knows what the roots are.
|
||||
self.force_flush_accounts_cache();
|
||||
flush.stop();
|
||||
|
||||
let mut clean = Measure::start("clean");
|
||||
// Don't clean the slot we're snapshotting because it may have zero-lamport
|
||||
// accounts that were included in the bank delta hash when the bank was frozen,
|
||||
// and if we clean them here, any newly created snapshot's hash for this bank
|
||||
// may not match the frozen hash.
|
||||
self.clean_accounts(true, IS_STARTUP, last_full_snapshot_slot);
|
||||
clean.stop();
|
||||
|
||||
let mut shrink = Measure::start("shrink");
|
||||
self.shrink_all_slots(IS_STARTUP, last_full_snapshot_slot);
|
||||
shrink.stop();
|
||||
|
||||
info!(
|
||||
"exhaustively_free_unused_resource() {} {} {}",
|
||||
flush, clean, shrink,
|
||||
);
|
||||
}
|
||||
|
||||
pub fn epoch_schedule(&self) -> &EpochSchedule {
|
||||
&self.epoch_schedule
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue