Refactor new_banks_from_ledger() into load and process steps

This commit is contained in:
Michael Vines 2022-03-15 20:53:46 -07:00
parent 390c5667f7
commit ab373bb1a9
1 changed files with 93 additions and 54 deletions

View File

@ -35,7 +35,9 @@ use {
},
solana_ledger::{
bank_forks_utils,
blockstore::{Blockstore, BlockstoreSignals, CompletedSlotsReceiver, PurgeType},
blockstore::{
Blockstore, BlockstoreError, BlockstoreSignals, CompletedSlotsReceiver, PurgeType,
},
blockstore_db::{BlockstoreAdvancedOptions, BlockstoreOptions, BlockstoreRecoveryMode},
blockstore_processor::{self, TransactionStatusSender},
leader_schedule::FixedSchedule,
@ -268,6 +270,39 @@ impl Default for ValidatorStartProgress {
}
}
struct BlockstoreRootScan {
thread: Option<JoinHandle<Result<(), BlockstoreError>>>,
}
impl BlockstoreRootScan {
fn new(config: &ValidatorConfig, blockstore: &Arc<Blockstore>, exit: &Arc<AtomicBool>) -> Self {
let thread = if config.rpc_addrs.is_some()
&& config.rpc_config.enable_rpc_transaction_history
&& config.rpc_config.rpc_scan_and_fix_roots
{
let blockstore = blockstore.clone();
let exit = exit.clone();
Some(
Builder::new()
.name("blockstore-root-scan".to_string())
.spawn(move || blockstore.scan_and_fix_roots(&exit))
.unwrap(),
)
} else {
None
};
Self { thread }
}
fn join(self) {
if let Some(blockstore_root_scan) = self.thread {
if let Err(err) = blockstore_root_scan.join() {
warn!("blockstore_root_scan failed to join {:?}", err);
}
}
}
}
#[derive(Default)]
struct TransactionHistoryServices {
transaction_status_sender: Option<TransactionStatusSender>,
@ -456,7 +491,6 @@ impl Validator {
ledger_signal_receiver,
completed_slots_receiver,
leader_schedule_cache,
last_full_snapshot_slot,
starting_snapshot_hashes,
TransactionHistoryServices {
transaction_status_sender,
@ -467,16 +501,31 @@ impl Validator {
cache_block_meta_sender,
cache_block_meta_service,
},
) = new_banks_from_ledger(
blockstore_process_options,
blockstore_root_scan,
) = load_blockstore(
config,
ledger_path,
&exit,
&start_progress,
accounts_package_channel.0.clone(),
accounts_update_notifier,
transaction_notifier,
);
let last_full_snapshot_slot = process_blockstore(
&blockstore,
&mut bank_forks,
&leader_schedule_cache,
&blockstore_process_options,
transaction_status_sender.as_ref(),
cache_block_meta_sender.as_ref(),
config.snapshot_config.as_ref(),
accounts_package_channel.0.clone(),
blockstore_root_scan,
);
let last_full_snapshot_slot =
last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));
maybe_warp_slot(config, ledger_path, &mut bank_forks, &leader_schedule_cache);
let tower = {
@ -1197,12 +1246,11 @@ fn post_process_restored_tower(
}
#[allow(clippy::type_complexity)]
fn new_banks_from_ledger(
fn load_blockstore(
config: &ValidatorConfig,
ledger_path: &Path,
exit: &Arc<AtomicBool>,
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
accounts_package_sender: AccountsPackageSender,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
transaction_notifier: Option<TransactionNotifierLock>,
) -> (
@ -1212,9 +1260,10 @@ fn new_banks_from_ledger(
Receiver<bool>,
CompletedSlotsReceiver,
LeaderScheduleCache,
Option<Slot>,
Option<StartingSnapshotHashes>,
TransactionHistoryServices,
blockstore_processor::ProcessOptions,
BlockstoreRootScan,
) {
info!("loading ledger from {:?}...", ledger_path);
*start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
@ -1259,21 +1308,7 @@ fn new_banks_from_ledger(
blockstore.set_no_compaction(config.no_rocksdb_compaction);
let blockstore = Arc::new(blockstore);
let blockstore_root_scan = if config.rpc_addrs.is_some()
&& config.rpc_config.enable_rpc_transaction_history
&& config.rpc_config.rpc_scan_and_fix_roots
{
let blockstore = blockstore.clone();
let exit = exit.clone();
Some(
Builder::new()
.name("blockstore-root-scan".to_string())
.spawn(move || blockstore.scan_and_fix_roots(&exit))
.unwrap(),
)
} else {
None
};
let blockstore_root_scan = BlockstoreRootScan::new(config, &blockstore, exit);
let process_options = blockstore_processor::ProcessOptions {
bpf_jit: config.bpf_jit,
@ -1306,10 +1341,6 @@ fn new_banks_from_ledger(
TransactionHistoryServices::default()
};
let cache_block_meta_sender = transaction_history_services
.cache_block_meta_sender
.as_ref();
let (mut bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
bank_forks_utils::load_bank_forks(
&genesis_config,
@ -1318,7 +1349,9 @@ fn new_banks_from_ledger(
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
&process_options,
cache_block_meta_sender,
transaction_history_services
.cache_block_meta_sender
.as_ref(),
accounts_update_notifier,
);
@ -1326,32 +1359,6 @@ fn new_banks_from_ledger(
bank_forks.set_snapshot_config(config.snapshot_config.clone());
bank_forks.set_accounts_hash_interval_slots(config.accounts_hash_interval_slots);
let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root(
&blockstore,
&mut bank_forks,
&leader_schedule_cache,
&process_options,
transaction_history_services
.transaction_status_sender
.as_ref(),
cache_block_meta_sender,
config.snapshot_config.as_ref(),
accounts_package_sender,
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
abort()
});
let last_full_snapshot_slot =
last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));
if let Some(blockstore_root_scan) = blockstore_root_scan {
if let Err(err) = blockstore_root_scan.join() {
warn!("blockstore_root_scan failed to join {:?}", err);
}
}
(
genesis_config,
bank_forks,
@ -1359,12 +1366,44 @@ fn new_banks_from_ledger(
ledger_signal_receiver,
completed_slots_receiver,
leader_schedule_cache,
last_full_snapshot_slot,
starting_snapshot_hashes,
transaction_history_services,
process_options,
blockstore_root_scan,
)
}
fn process_blockstore(
blockstore: &Blockstore,
bank_forks: &mut BankForks,
leader_schedule_cache: &LeaderScheduleCache,
process_options: &blockstore_processor::ProcessOptions,
transaction_status_sender: Option<&TransactionStatusSender>,
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
blockstore_root_scan: BlockstoreRootScan,
) -> Option<Slot> {
let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root(
blockstore,
bank_forks,
leader_schedule_cache,
process_options,
transaction_status_sender,
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
abort()
});
blockstore_root_scan.join();
last_full_snapshot_slot
}
fn maybe_warp_slot(
config: &ValidatorConfig,
ledger_path: &Path,