Handle JsonRpcService startup failure (#27075)

Jeff Biseda 2022-08-11 23:25:20 -07:00 committed by GitHub
parent 6c58acf73e
commit e50013acdf
7 changed files with 222 additions and 201 deletions
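
The changes below all follow one pattern: startup helpers and service constructors that used to call a process-wide abort() now return Result<_, String>, the error is propagated upward with ?, and only the validator binary's main() converts a startup failure into exit(1). A minimal sketch of that pattern follows; the names (Service, Validator) are illustrative only, not the repository's actual types.

// Illustrative sketch of the error-propagation pattern adopted in this commit.
// Constructors return Result instead of aborting; only main() exits the process.
struct Service;

impl Service {
    fn new(port: u16) -> Result<Self, String> {
        if port == 0 {
            // Report the failure to the caller instead of aborting the process.
            return Err(format!("invalid port: {}", port));
        }
        Ok(Service)
    }
}

struct Validator {
    _service: Service,
}

impl Validator {
    fn new(port: u16) -> Result<Self, String> {
        // `?` propagates the String error up the constructor chain.
        let service = Service::new(port)?;
        Ok(Validator { _service: service })
    }
}

fn main() {
    // Only the outermost caller decides to terminate the process.
    let _validator = Validator::new(8899).unwrap_or_else(|e| {
        eprintln!("Failed to start validator: {:?}", e);
        std::process::exit(1);
    });
}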


@@ -397,9 +397,9 @@ impl ReplayStage
         drop_bank_sender: Sender<Vec<Arc<Bank>>>,
         block_metadata_notifier: Option<BlockMetadataNotifierLock>,
         log_messages_bytes_limit: Option<usize>,
-    ) -> Self {
+    ) -> Result<Self, String> {
         let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
-            let tower = process_blockstore.process_to_create_tower();
+            let tower = process_blockstore.process_to_create_tower()?;
             info!("Tower state: {:?}", tower);
             tower
         } else {
@@ -940,10 +940,10 @@ impl ReplayStage
             })
             .unwrap();
-        Self {
+        Ok(Self {
             t_replay,
             commitment_service,
-        }
+        })
     }
     fn check_for_vote_only_mode(


@@ -129,7 +129,7 @@ impl Tvu {
         accounts_background_request_sender: AbsRequestSender,
         log_messages_bytes_limit: Option<usize>,
         connection_cache: &Arc<ConnectionCache>,
-    ) -> Self {
+    ) -> Result<Self, String> {
         let TvuSockets {
             repair: repair_socket,
             fetch: fetch_sockets,
@@ -288,7 +288,7 @@ impl Tvu {
             drop_bank_sender,
             block_metadata_notifier,
             log_messages_bytes_limit,
-        );
+        )?;
         let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
             LedgerCleanupService::new(
@@ -301,7 +301,7 @@ impl Tvu {
             )
         });
-        Tvu {
+        Ok(Tvu {
             fetch_stage,
             shred_sigverify,
             retransmit_stage,
@@ -313,7 +313,7 @@ impl Tvu {
             voting_service,
             warm_quic_cache_service,
             drop_bank_service,
-        }
+        })
     }
     pub fn join(self) -> thread::Result<()> {
@@ -450,7 +450,8 @@ pub mod tests {
             AbsRequestSender::default(),
             None,
             &Arc::new(ConnectionCache::default()),
-        );
+        )
+        .expect("assume success");
         exit.store(true, Ordering::Relaxed);
         tvu.join().unwrap();
         poh_service.join().unwrap();


@@ -365,20 +365,6 @@ pub struct Validator {
     accounts_hash_verifier: AccountsHashVerifier,
 }
-// in the distant future, get rid of ::new()/exit() and use Result properly...
-pub fn abort() -> ! {
-    #[cfg(not(test))]
-    {
-        // standard error is usually redirected to a log file, cry for help on standard output as
-        // well
-        println!("Validator process aborted. The validator log may contain further details");
-        std::process::exit(1);
-    }
-    #[cfg(test)]
-    panic!("process::exit(1) is intercepted for friendly test failure...");
-}
 impl Validator {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
@@ -394,7 +380,7 @@ impl Validator {
         socket_addr_space: SocketAddrSpace,
         use_quic: bool,
         tpu_connection_pool_size: usize,
-    ) -> Self {
+    ) -> Result<Self, String> {
         let id = identity_keypair.pubkey();
         assert_eq!(id, node.info.id);
@@ -402,10 +388,12 @@ impl Validator {
         warn!("vote account: {}", vote_account);
         if !config.no_os_network_stats_reporting {
-            verify_net_stats_access().unwrap_or_else(|err| {
-                error!("Failed to access Network stats: {}. Bypass check with --no-os-network-stats-reporting.", err);
-                abort();
-            });
+            if let Err(e) = verify_net_stats_access() {
+                return Err(format!(
+                    "Failed to access Network stats: {}. Bypass check with --no-os-network-stats-reporting.",
+                    e,
+                ));
+            }
         }
         let mut bank_notification_senders = Vec::new();
@@ -419,8 +407,7 @@ impl Validator {
             match result {
                 Ok(geyser_plugin_service) => Some(geyser_plugin_service),
                 Err(err) => {
-                    error!("Failed to load the Geyser plugin: {:?}", err);
-                    abort();
+                    return Err(format!("Failed to load the Geyser plugin: {:?}", err));
                 }
             }
         } else {
@@ -449,11 +436,10 @@ impl Validator {
         info!("Done.");
         if !ledger_path.is_dir() {
-            error!(
+            return Err(format!(
                 "ledger directory does not exist or is not accessible: {:?}",
                 ledger_path
-            );
-            abort();
+            ));
         }
         if let Some(shred_version) = config.expected_shred_version {
@@ -550,7 +536,7 @@ impl Validator {
             accounts_update_notifier,
             transaction_notifier,
             Some(poh_timing_point_sender.clone()),
-        );
+        )?;
         node.info.wallclock = timestamp();
         node.info.shred_version = compute_shred_version(
@@ -570,11 +556,10 @@ impl Validator {
         if let Some(expected_shred_version) = config.expected_shred_version {
             if expected_shred_version != node.info.shred_version {
-                error!(
+                return Err(format!(
                     "shred version mismatch: expected {} found: {}",
                     expected_shred_version, node.info.shred_version,
-                );
-                abort();
+                ));
             }
         }
@@ -698,7 +683,7 @@ impl Validator {
             ledger_path,
             &bank_forks,
             &leader_schedule_cache,
-        );
+        )?;
         *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
@@ -807,29 +792,32 @@ impl Validator {
             } else {
                 None
             };
+            let json_rpc_service = JsonRpcService::new(
+                rpc_addr,
+                config.rpc_config.clone(),
+                config.snapshot_config.clone(),
+                bank_forks.clone(),
+                block_commitment_cache.clone(),
+                blockstore.clone(),
+                cluster_info.clone(),
+                Some(poh_recorder.clone()),
+                genesis_config.hash(),
+                ledger_path,
+                config.validator_exit.clone(),
+                config.known_validators.clone(),
+                rpc_override_health_check.clone(),
+                startup_verification_complete,
+                optimistically_confirmed_bank.clone(),
+                config.send_transaction_service_config.clone(),
+                max_slots.clone(),
+                leader_schedule_cache.clone(),
+                connection_cache.clone(),
+                max_complete_transaction_status_slot,
+            )?;
             (
-                Some(JsonRpcService::new(
-                    rpc_addr,
-                    config.rpc_config.clone(),
-                    config.snapshot_config.clone(),
-                    bank_forks.clone(),
-                    block_commitment_cache.clone(),
-                    blockstore.clone(),
-                    cluster_info.clone(),
-                    Some(poh_recorder.clone()),
-                    genesis_config.hash(),
-                    ledger_path,
-                    config.validator_exit.clone(),
-                    config.known_validators.clone(),
-                    rpc_override_health_check.clone(),
-                    startup_verification_complete,
-                    optimistically_confirmed_bank.clone(),
-                    config.send_transaction_service_config.clone(),
-                    max_slots.clone(),
-                    leader_schedule_cache.clone(),
-                    connection_cache.clone(),
-                    max_complete_transaction_status_slot,
-                )),
+                Some(json_rpc_service),
                 if !config.rpc_config.full_api {
                     None
                 } else {
@@ -904,7 +892,7 @@ impl Validator {
             exit.clone(),
         );
-        let waited_for_supermajority = if let Ok(waited) = wait_for_supermajority(
+        let waited_for_supermajority = match wait_for_supermajority(
             config,
             Some(&mut process_blockstore),
             &bank_forks,
@@ -912,9 +900,8 @@ impl Validator {
             rpc_override_health_check,
             &start_progress,
         ) {
-            waited
-        } else {
-            abort();
+            Ok(waited) => waited,
+            Err(e) => return Err(format!("wait_for_supermajority failed: {:?}", e)),
         };
         let ledger_metric_report_service =
@@ -1004,7 +991,7 @@ impl Validator {
             accounts_background_request_sender,
             config.runtime_config.log_messages_bytes_limit,
             &connection_cache,
-        );
+        )?;
         let tpu = Tpu::new(
             &cluster_info,
@@ -1050,7 +1037,7 @@ impl Validator {
         );
         *start_progress.write().unwrap() = ValidatorStartProgress::Running;
-        Self {
+        Ok(Self {
             stats_reporter_service,
             gossip_service,
             serve_repair_service,
@@ -1079,7 +1066,7 @@ impl Validator {
             ledger_metric_report_service,
             accounts_background_service,
             accounts_hash_verifier,
-        }
+        })
     }
     // Used for notifying many nodes in parallel to exit
@@ -1225,7 +1212,10 @@ fn active_vote_account_exists_in_bank(bank: &Arc<Bank>, vote_account: &Pubkey) -
     false
 }
-fn check_poh_speed(genesis_config: &GenesisConfig, maybe_hash_samples: Option<u64>) {
+fn check_poh_speed(
+    genesis_config: &GenesisConfig,
+    maybe_hash_samples: Option<u64>,
+) -> Result<(), String> {
     if let Some(hashes_per_tick) = genesis_config.hashes_per_tick() {
         let ticks_per_slot = genesis_config.ticks_per_slot();
         let hashes_per_slot = hashes_per_tick * ticks_per_slot;
@@ -1245,13 +1235,14 @@ fn check_poh_speed(genesis_config: &GenesisConfig, maybe_hash_samples: Option<u6
             let extra_ns = target_ns_per_slot - my_ns_per_slot;
             info!("PoH speed check: Will sleep {}ns per slot.", extra_ns);
         } else {
-            error!(
-                "PoH is slower than cluster target tick rate! mine: {} cluster: {}. If you wish to continue, try --no-poh-speed-test",
+            return Err(format!(
+                "PoH is slower than cluster target tick rate! mine: {} cluster: {}. \
+                If you wish to continue, try --no-poh-speed-test",
                 my_ns_per_slot, target_ns_per_slot,
-            );
-            abort();
+            ));
         }
     }
+    Ok(())
 }
 fn maybe_cluster_restart_with_hard_fork(config: &ValidatorConfig, root_slot: Slot) -> Option<Slot> {
@@ -1271,66 +1262,64 @@ fn post_process_restored_tower(
     vote_account: &Pubkey,
     config: &ValidatorConfig,
     bank_forks: &BankForks,
-) -> Tower {
+) -> Result<Tower, String> {
     let mut should_require_tower = config.require_tower;
-    restored_tower
-        .and_then(|tower| {
-            let root_bank = bank_forks.root_bank();
-            let slot_history = root_bank.get_slot_history();
-            // make sure tower isn't corrupted first before the following hard fork check
-            let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
-            if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(config, root_bank.slot()) {
-                // intentionally fail to restore tower; we're supposedly in a new hard fork; past
-                // out-of-chain vote state doesn't make sense at all
-                // what if --wait-for-supermajority again if the validator restarted?
-                let message = format!("Hard fork is detected; discarding tower restoration result: {:?}", tower);
-                datapoint_error!(
-                    "tower_error",
-                    (
-                        "error",
-                        message,
-                        String
-                    ),
-                );
-                error!("{}", message);
+    let restored_tower = restored_tower.and_then(|tower| {
+        let root_bank = bank_forks.root_bank();
+        let slot_history = root_bank.get_slot_history();
+        // make sure tower isn't corrupted first before the following hard fork check
+        let tower = tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history);
+        if let Some(hard_fork_restart_slot) =
+            maybe_cluster_restart_with_hard_fork(config, root_bank.slot())
+        {
+            // intentionally fail to restore tower; we're supposedly in a new hard fork; past
+            // out-of-chain vote state doesn't make sense at all
+            // what if --wait-for-supermajority again if the validator restarted?
+            let message = format!(
+                "Hard fork is detected; discarding tower restoration result: {:?}",
+                tower
+            );
+            datapoint_error!("tower_error", ("error", message, String),);
+            error!("{}", message);
             // unconditionally relax tower requirement so that we can always restore tower
             // from root bank.
             should_require_tower = false;
-            return Err(crate::consensus::TowerError::HardFork(hard_fork_restart_slot));
-        }
+            return Err(crate::consensus::TowerError::HardFork(
+                hard_fork_restart_slot,
+            ));
+        }
         if let Some(warp_slot) = config.warp_slot {
             // unconditionally relax tower requirement so that we can always restore tower
             // from root bank after the warp
             should_require_tower = false;
             return Err(crate::consensus::TowerError::HardFork(warp_slot));
         }
         tower
-    })
-    .unwrap_or_else(|err| {
+    });
+
+    let restored_tower = match restored_tower {
+        Ok(tower) => tower,
+        Err(err) => {
             let voting_has_been_active =
                 active_vote_account_exists_in_bank(&bank_forks.working_bank(), vote_account);
             if !err.is_file_missing() {
                 datapoint_error!(
                     "tower_error",
-                    (
-                        "error",
-                        format!("Unable to restore tower: {}", err),
-                        String
-                    ),
+                    ("error", format!("Unable to restore tower: {}", err), String),
                 );
             }
             if should_require_tower && voting_has_been_active {
-                error!("Requested mandatory tower restore failed: {}", err);
-                error!(
-                    "And there is an existing vote_account containing actual votes. \
-                    Aborting due to possible conflicting duplicate votes",
-                );
-                abort();
+                return Err(format!(
+                    "Requested mandatory tower restore failed: {}. \
+                    And there is an existing vote_account containing actual votes. \
+                    Aborting due to possible conflicting duplicate votes",
+                    err
+                ));
             }
             if err.is_file_missing() && !voting_has_been_active {
                 // Currently, don't protect against spoofed snapshots with no tower at all
@@ -1345,12 +1334,11 @@ fn post_process_restored_tower(
                 );
             }
-            Tower::new_from_bankforks(
-                bank_forks,
-                validator_identity,
-                vote_account,
-            )
-        })
+            Tower::new_from_bankforks(bank_forks, validator_identity, vote_account)
+        }
+    };
+
+    Ok(restored_tower)
 }
 #[allow(clippy::type_complexity)]
@@ -1362,20 +1350,23 @@ fn load_blockstore(
     accounts_update_notifier: Option<AccountsUpdateNotifier>,
     transaction_notifier: Option<TransactionNotifierLock>,
     poh_timing_point_sender: Option<PohTimingSender>,
-) -> (
-    GenesisConfig,
-    Arc<RwLock<BankForks>>,
-    Arc<Blockstore>,
-    Slot,
-    Receiver<bool>,
-    CompletedSlotsReceiver,
-    LeaderScheduleCache,
-    Option<StartingSnapshotHashes>,
-    TransactionHistoryServices,
-    blockstore_processor::ProcessOptions,
-    BlockstoreRootScan,
-    DroppedSlotsReceiver,
-) {
+) -> Result<
+    (
+        GenesisConfig,
+        Arc<RwLock<BankForks>>,
+        Arc<Blockstore>,
+        Slot,
+        Receiver<bool>,
+        CompletedSlotsReceiver,
+        LeaderScheduleCache,
+        Option<StartingSnapshotHashes>,
+        TransactionHistoryServices,
+        blockstore_processor::ProcessOptions,
+        BlockstoreRootScan,
+        DroppedSlotsReceiver,
+    ),
+    String,
+> {
     info!("loading ledger from {:?}...", ledger_path);
     *start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
     let genesis_config = open_genesis_config(ledger_path, config.max_genesis_archive_unpacked_size);
@@ -1392,14 +1383,17 @@ fn load_blockstore(
     if let Some(expected_genesis_hash) = config.expected_genesis_hash {
         if genesis_hash != expected_genesis_hash {
-            error!("genesis hash mismatch: expected {}", expected_genesis_hash);
-            error!("Delete the ledger directory to continue: {:?}", ledger_path);
-            abort();
+            return Err(format!(
+                "genesis hash mismatch: hash={} expected={}. Delete the ledger directory to continue: {:?}",
+                genesis_hash,
+                expected_genesis_hash,
+                ledger_path,
+            ));
         }
     }
     if !config.no_poh_speed_test {
-        check_poh_speed(&genesis_config, None);
+        check_poh_speed(&genesis_config, None)?;
     }
     let BlockstoreSignals {
@@ -1508,7 +1502,7 @@ fn load_blockstore(
         }
     }
-    (
+    Ok((
         genesis_config,
         bank_forks,
         blockstore,
@@ -1521,7 +1515,7 @@ fn load_blockstore(
         process_options,
         blockstore_root_scan,
         pruned_banks_receiver,
-    )
+    ))
 }
 fn highest_slot(blockstore: &Blockstore) -> Option<Slot> {
@@ -1599,7 +1593,7 @@ impl<'a> ProcessBlockStore<'a> {
         }
     }
-    pub(crate) fn process(&mut self) {
+    pub(crate) fn process(&mut self) -> Result<(), String> {
         if self.tower.is_none() {
             let previous_start_process = *self.start_progress.read().unwrap();
             *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
@@ -1633,7 +1627,7 @@ impl<'a> ProcessBlockStore<'a> {
                     }
                 });
             }
-            blockstore_processor::process_blockstore_from_root(
+            if let Err(e) = blockstore_processor::process_blockstore_from_root(
                 self.blockstore,
                 self.bank_forks,
                 self.leader_schedule_cache,
@@ -1641,11 +1635,9 @@ impl<'a> ProcessBlockStore<'a> {
                 self.transaction_status_sender,
                 self.cache_block_meta_sender.as_ref(),
                 &self.accounts_background_request_sender,
-            )
-            .unwrap_or_else(|err| {
-                error!("Failed to load ledger: {:?}", err);
-                abort()
-            });
+            ) {
+                return Err(format!("Failed to load ledger: {:?}", e));
+            }
             exit.store(true, Ordering::Relaxed);
@@ -1657,15 +1649,16 @@ impl<'a> ProcessBlockStore<'a> {
             let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
             if let Ok(tower) = &restored_tower {
                 // reconciliation attempt 1 of 2 with tower
-                reconcile_blockstore_roots_with_external_source(
+                if let Err(e) = reconcile_blockstore_roots_with_external_source(
                     ExternalRootSource::Tower(tower.root()),
                     self.blockstore,
                     &mut self.original_blockstore_root,
-                )
-                .unwrap_or_else(|err| {
-                    error!("Failed to reconcile blockstore with tower: {:?}", err);
-                    abort()
-                });
+                ) {
+                    return Err(format!(
+                        "Failed to reconcile blockstore with tower: {:?}",
+                        e
+                    ));
+                }
             }
             post_process_restored_tower(
@@ -1674,7 +1667,7 @@ impl<'a> ProcessBlockStore<'a> {
                 self.vote_account,
                 self.config,
                 &self.bank_forks.read().unwrap(),
-            )
+            )?
             });
             if let Some(hard_fork_restart_slot) = maybe_cluster_restart_with_hard_fork(
@@ -1683,24 +1676,26 @@ impl<'a> ProcessBlockStore<'a> {
             ) {
                 // reconciliation attempt 2 of 2 with hard fork
                 // this should be #2 because hard fork root > tower root in almost all cases
-                reconcile_blockstore_roots_with_external_source(
+                if let Err(e) = reconcile_blockstore_roots_with_external_source(
                     ExternalRootSource::HardFork(hard_fork_restart_slot),
                     self.blockstore,
                     &mut self.original_blockstore_root,
-                )
-                .unwrap_or_else(|err| {
-                    error!("Failed to reconcile blockstore with hard fork: {:?}", err);
-                    abort()
-                });
+                ) {
+                    return Err(format!(
+                        "Failed to reconcile blockstore with hard fork: {:?}",
+                        e
+                    ));
+                }
             }
             *self.start_progress.write().unwrap() = previous_start_process;
         }
+        Ok(())
     }
-    pub(crate) fn process_to_create_tower(mut self) -> Tower {
-        self.process();
-        self.tower.unwrap()
+    pub(crate) fn process_to_create_tower(mut self) -> Result<Tower, String> {
+        self.process()?;
+        Ok(self.tower.unwrap())
     }
 }
@@ -1710,26 +1705,25 @@ fn maybe_warp_slot(
     ledger_path: &Path,
     bank_forks: &RwLock<BankForks>,
     leader_schedule_cache: &LeaderScheduleCache,
-) {
+) -> Result<(), String> {
     if let Some(warp_slot) = config.warp_slot {
-        let snapshot_config = config.snapshot_config.as_ref().unwrap_or_else(|| {
-            error!("warp slot requires a snapshot config");
-            abort();
-        });
-        process_blockstore.process();
+        let snapshot_config = match config.snapshot_config.as_ref() {
+            Some(config) => config,
+            None => return Err("warp slot requires a snapshot config".to_owned()),
+        };
+        process_blockstore.process()?;
         let mut bank_forks = bank_forks.write().unwrap();
         let working_bank = bank_forks.working_bank();
         if warp_slot <= working_bank.slot() {
-            error!(
+            return Err(format!(
                 "warp slot ({}) cannot be less than the working bank slot ({})",
                 warp_slot,
                 working_bank.slot()
-            );
-            abort();
+            ));
         }
         info!("warping to slot {}", warp_slot);
@@ -1746,7 +1740,7 @@ fn maybe_warp_slot(
         );
         leader_schedule_cache.set_root(&bank_forks.root_bank());
-        let full_snapshot_archive_info = snapshot_utils::bank_to_full_snapshot_archive(
+        let full_snapshot_archive_info = match snapshot_utils::bank_to_full_snapshot_archive(
             ledger_path,
             &bank_forks.root_bank(),
             None,
@@ -1755,16 +1749,16 @@ fn maybe_warp_slot(
             snapshot_config.archive_format,
             snapshot_config.maximum_full_snapshot_archives_to_retain,
             snapshot_config.maximum_incremental_snapshot_archives_to_retain,
-        )
-        .unwrap_or_else(|err| {
-            error!("Unable to create snapshot: {}", err);
-            abort();
-        });
+        ) {
+            Ok(archive_info) => archive_info,
+            Err(e) => return Err(format!("Unable to create snapshot: {}", e)),
+        };
         info!(
             "created snapshot: {}",
             full_snapshot_archive_info.path().display()
         );
     }
+    Ok(())
 }
 fn blockstore_contains_bad_shred_version(
@@ -1884,6 +1878,7 @@ fn initialize_rpc_transaction_history_services(
 enum ValidatorError {
     BadExpectedBankHash,
     NotEnoughLedgerData,
+    Error(String),
 }
 // Return if the validator waited on other nodes to start. In this case
@@ -1904,7 +1899,9 @@ fn wait_for_supermajority(
         None => Ok(false),
         Some(wait_for_supermajority_slot) => {
             if let Some(process_blockstore) = process_blockstore {
-                process_blockstore.process();
+                process_blockstore
+                    .process()
+                    .map_err(ValidatorError::Error)?;
             }
             let bank = bank_forks.read().unwrap().working_bank();
@@ -2140,7 +2137,8 @@ mod tests {
             SocketAddrSpace::Unspecified,
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
-        );
+        )
+        .expect("assume successful validator start");
         assert_eq!(
             *start_progress.read().unwrap(),
             ValidatorStartProgress::Running
@@ -2224,6 +2222,7 @@ mod tests {
                     DEFAULT_TPU_USE_QUIC,
                     DEFAULT_TPU_CONNECTION_POOL_SIZE,
                 )
+                .expect("assume successful validator start")
             })
             .collect();
@@ -2363,7 +2362,6 @@ mod tests {
     }
     #[test]
-    #[should_panic]
     fn test_poh_speed() {
         solana_logger::setup();
         let poh_config = PohConfig {
@@ -2376,7 +2374,7 @@ mod tests {
             poh_config,
             ..GenesisConfig::default()
         };
-        check_poh_speed(&genesis_config, Some(10_000));
+        assert!(check_poh_speed(&genesis_config, Some(10_000)).is_err());
     }
     #[test]
@@ -2390,6 +2388,6 @@ mod tests {
             poh_config,
             ..GenesisConfig::default()
         };
-        check_poh_speed(&genesis_config, Some(10_000));
+        check_poh_speed(&genesis_config, Some(10_000)).unwrap();
     }
 }


@@ -280,7 +280,8 @@ impl LocalCluster {
             socket_addr_space,
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
-        );
+        )
+        .expect("assume successful validator start");
         let mut validators = HashMap::new();
         let leader_info = ValidatorInfo {
@@ -478,7 +479,8 @@ impl LocalCluster {
             socket_addr_space,
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
-        );
+        )
+        .expect("assume successful validator start");
         let validator_pubkey = validator_keypair.pubkey();
         let validator_info = ClusterValidatorInfo::new(
@@ -839,7 +841,8 @@ impl Cluster for LocalCluster {
             socket_addr_space,
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
-        );
+        )
+        .expect("assume successful validator start");
         cluster_validator_info.validator = Some(restarted_node);
         cluster_validator_info
     }


@@ -355,7 +355,7 @@ impl JsonRpcService {
         leader_schedule_cache: Arc<LeaderScheduleCache>,
         connection_cache: Arc<ConnectionCache>,
         current_transaction_status_slot: Arc<AtomicU64>,
-    ) -> Self {
+    ) -> Result<Self, String> {
         info!("rpc bound to {:?}", rpc_addr);
         info!("rpc configuration: {:?}", config);
         let rpc_threads = 1.max(config.rpc_threads);
@@ -528,28 +528,29 @@ impl JsonRpcService {
                         e,
                         rpc_addr.port()
                     );
+                    close_handle_sender.send(Err(e.to_string())).unwrap();
                     return;
                 }
                 let server = server.unwrap();
-                close_handle_sender.send(server.close_handle()).unwrap();
+                close_handle_sender.send(Ok(server.close_handle())).unwrap();
                 server.wait();
                 exit_bigtable_ledger_upload_service.store(true, Ordering::Relaxed);
             })
             .unwrap();
-        let close_handle = close_handle_receiver.recv().unwrap();
+        let close_handle = close_handle_receiver.recv().unwrap()?;
         let close_handle_ = close_handle.clone();
         validator_exit
             .write()
             .unwrap()
             .register_exit(Box::new(move || close_handle_.close()));
-        Self {
+        Ok(Self {
             thread_hdl,
             #[cfg(test)]
             request_processor: test_request_processor,
             close_handle: Some(close_handle),
-        }
+        })
     }
     pub fn exit(&mut self) {
@@ -644,7 +645,8 @@ mod tests {
             Arc::new(LeaderScheduleCache::default()),
             connection_cache,
             Arc::new(AtomicU64::default()),
-        );
+        )
+        .expect("assume successful JsonRpcService start");
         let thread = rpc_service.thread_hdl.thread();
         assert_eq!(thread.name().unwrap(), "solana-jsonrpc");

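The JsonRpcService change above hinges on the server thread reporting its startup result back over a channel: on a bind failure the thread now sends an Err to the constructor, which returns it to the caller instead of leaving the validator without a close handle. Below is a simplified sketch of that channel handshake, using a plain TcpListener bind as a stand-in rather than the actual jsonrpc-http-server API.

// Simplified, illustrative sketch of a constructor that surfaces a spawned
// thread's startup failure through a channel; not the repository's code.
use std::{net::SocketAddr, net::TcpListener, sync::mpsc::channel, thread};

struct RpcService {
    thread_hdl: thread::JoinHandle<()>,
}

impl RpcService {
    fn new(addr: SocketAddr) -> Result<Self, String> {
        let (sender, receiver) = channel();
        let thread_hdl = thread::spawn(move || {
            match TcpListener::bind(addr) {
                Err(e) => {
                    // Report the startup error to the constructor and exit the thread.
                    sender.send(Err(e.to_string())).unwrap();
                }
                Ok(listener) => {
                    // Startup succeeded; hand the caller what it needs (here, the bound address).
                    sender.send(Ok(listener.local_addr().unwrap())).unwrap();
                    // ... serve requests ...
                }
            }
        });
        // Propagate a startup failure to the caller instead of hanging or aborting.
        let _bound_addr = receiver.recv().unwrap()?;
        Ok(Self { thread_hdl })
    }
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
    let service = RpcService::new(addr).expect("rpc service should start");
    service.thread_hdl.join().unwrap();
}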

@@ -295,7 +295,7 @@ impl TestValidatorGenesis {
                 warn!("Could not find {}, skipping.", address);
             } else {
                 error!("Failed to fetch {}: {}", address, res.unwrap_err());
-                solana_core::validator::abort();
+                Self::abort();
             }
         }
         self
@@ -306,7 +306,7 @@ impl TestValidatorGenesis {
             let account_path =
                 solana_program_test::find_file(account.filename).unwrap_or_else(|| {
                     error!("Unable to locate {}", account.filename);
-                    solana_core::validator::abort();
+                    Self::abort();
                 });
             let mut file = File::open(&account_path).unwrap();
             let mut account_info_raw = String::new();
@@ -320,7 +320,7 @@ impl TestValidatorGenesis {
                         account_path.to_str().unwrap(),
                         err
                     );
-                    solana_core::validator::abort();
+                    Self::abort();
                 }
                 Ok(deserialized) => deserialized,
             };
@@ -349,7 +349,7 @@ impl TestValidatorGenesis {
             let matched_files = fs::read_dir(&dir)
                 .unwrap_or_else(|err| {
                     error!("Cannot read directory {}: {}", dir, err);
-                    solana_core::validator::abort();
+                    Self::abort();
                 })
                 .flatten()
                 .map(|entry| entry.path())
@@ -510,6 +510,19 @@ impl TestValidatorGenesis {
             Err(err) => panic!("Test validator failed to start: {}", err),
         }
     }
+
+    fn abort() -> ! {
+        #[cfg(not(test))]
+        {
+            // standard error is usually redirected to a log file, cry for help on standard output as
+            // well
+            println!("Validator process aborted. The validator log may contain further details");
+            std::process::exit(1);
+        }
+
+        #[cfg(test)]
+        panic!("process::exit(1) is intercepted for friendly test failure...");
+    }
 }
 pub struct TestValidator {
@@ -810,7 +823,7 @@ impl TestValidator {
             socket_addr_space,
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
-        ));
+        )?);
         // Needed to avoid panics in `solana-responder-gossip` in tests that create a number of
         // test validators concurrently...


@@ -3176,7 +3176,11 @@ pub fn main() {
         socket_addr_space,
         tpu_use_quic,
         tpu_connection_pool_size,
-    );
+    )
+    .unwrap_or_else(|e| {
+        error!("Failed to start validator: {:?}", e);
+        exit(1);
+    });
     *admin_service_post_init.write().unwrap() =
         Some(admin_rpc_service::AdminRpcRequestMetadataPostInit {
             bank_forks: validator.bank_forks.clone(),