Process blockstore after starting the TVU

Michael Vines 2022-03-23 10:04:58 -07:00
parent 83e041299a
commit 84e3342612
4 changed files with 236 additions and 134 deletions
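The change in a nutshell: instead of replaying the ledger (and restoring the tower) before any services start, Validator::new now builds a lazy ProcessBlockStore and threads it through maybe_warp_slot, wait_for_supermajority, and finally Tvu::new, whose new `T: Into<Tower>` bound triggers the replay only when a Tower is actually needed. A free-standing sketch of that lazy-conversion pattern (all names are simplified stand-ins for the real validator types, not the code in this diff):

    #[derive(Debug)]
    struct Tower; // stand-in for the real consensus Tower

    struct ProcessBlockStore {
        tower: Option<Tower>, // populated by the first call to process()
    }

    impl ProcessBlockStore {
        fn process(&mut self) {
            if self.tower.is_none() {
                // The real process() replays the blockstore from the last root
                // and restores/reconciles the persisted tower; faked here.
                self.tower = Some(Tower);
            }
        }
    }

    impl From<ProcessBlockStore> for Tower {
        fn from(mut pbs: ProcessBlockStore) -> Self {
            pbs.process(); // replay runs here unless something forced it earlier
            pbs.tower.expect("valid tower")
        }
    }

    // Mirrors the new ReplayStage::new / Tvu::new bound: callers may pass
    // either a ready-made Tower or a still-pending ProcessBlockStore.
    fn new_replay_stage<T: Into<Tower>>(tower: T) {
        let tower = tower.into();
        println!("Tower state: {:?}", tower);
    }

    fn main() {
        new_replay_stage(Tower); // eager path
        new_replay_stage(ProcessBlockStore { tower: None }); // lazy path
    }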


@@ -346,7 +346,7 @@ pub struct ReplayStage {
 
 impl ReplayStage {
     #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
-    pub fn new(
+    pub fn new<T: Into<Tower> + Sized>(
         config: ReplayStageConfig,
         blockstore: Arc<Blockstore>,
         bank_forks: Arc<RwLock<BankForks>>,
@@ -354,7 +354,7 @@ impl ReplayStage {
         ledger_signal_receiver: Receiver<bool>,
         duplicate_slots_receiver: DuplicateSlotReceiver,
         poh_recorder: Arc<Mutex<PohRecorder>>,
-        mut tower: Tower,
+        tower: T,
         vote_tracker: Arc<VoteTracker>,
         cluster_slots: Arc<ClusterSlots>,
         retransmit_slots_sender: RetransmitSlotsSender,
@@ -369,6 +369,9 @@ impl ReplayStage {
         block_metadata_notifier: Option<BlockMetadataNotifierLock>,
         transaction_cost_metrics_sender: Option<TransactionCostMetricsSender>,
     ) -> Self {
+        let mut tower = tower.into();
+        info!("Tower state: {:?}", tower);
+
         let ReplayStageConfig {
             vote_account,
             authorized_voter_keypairs,
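
Note where the conversion lands: `tower.into()` runs at the very top of ReplayStage::new, and the `info!("Tower state: {:?}", tower)` log moves here from Validator::new (see the validator hunks below). With the lazy path, this is the point where the deferred ledger replay actually executes.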


@@ -99,7 +99,7 @@ impl Tvu {
     /// * `sockets` - fetch, repair, and retransmit sockets
     /// * `blockstore` - the ledger itself
     #[allow(clippy::new_ret_no_self, clippy::too_many_arguments)]
-    pub fn new(
+    pub fn new<T: Into<Tower> + Sized>(
        vote_account: &Pubkey,
        authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
        bank_forks: &Arc<RwLock<BankForks>>,
@@ -109,7 +109,7 @@ impl Tvu {
        ledger_signal_receiver: Receiver<bool>,
        rpc_subscriptions: &Arc<RpcSubscriptions>,
        poh_recorder: &Arc<Mutex<PohRecorder>>,
-       tower: Tower,
+       tower: T,
        tower_storage: Arc<dyn TowerStorage>,
        leader_schedule_cache: &Arc<LeaderScheduleCache>,
        exit: &Arc<AtomicBool>,
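
Tvu::new adopts the same `T: Into<Tower> + Sized` bound. Its body is outside this hunk, but the matching bounds let the still-unconverted argument be handed straight through to ReplayStage::new, so the TVU itself never forces the replay early.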


@@ -655,43 +655,32 @@ impl Validator {
             )
         };
 
-        process_blockstore(
+        let leader_schedule_cache = Arc::new(leader_schedule_cache);
+        let mut process_blockstore = ProcessBlockStore::new(
+            &id,
+            vote_account,
+            &start_progress,
             &blockstore,
             &bank_forks,
             &leader_schedule_cache,
             &blockstore_process_options,
             transaction_status_sender.as_ref(),
-            cache_block_meta_sender.as_ref(),
+            cache_block_meta_sender.clone(),
             blockstore_root_scan,
-            &accounts_background_request_sender,
-            &start_progress,
+            accounts_background_request_sender.clone(),
+            config,
         );
-        maybe_warp_slot(config, ledger_path, &bank_forks, &leader_schedule_cache);
-
-        let tower = {
-            let restored_tower = Tower::restore(config.tower_storage.as_ref(), &id);
-            if let Ok(tower) = &restored_tower {
-                reconcile_blockstore_roots_with_tower(tower, &blockstore).unwrap_or_else(|err| {
-                    error!("Failed to reconcile blockstore with tower: {:?}", err);
-                    abort()
-                });
-            }
-            post_process_restored_tower(
-                restored_tower,
-                &id,
-                vote_account,
-                config,
-                &bank_forks.read().unwrap(),
-            )
-        };
-        info!("Tower state: {:?}", tower);
+        maybe_warp_slot(
+            config,
+            &mut process_blockstore,
+            ledger_path,
+            &bank_forks,
+            &leader_schedule_cache,
+        );
 
         *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices;
-        let leader_schedule_cache = Arc::new(leader_schedule_cache);
 
         let sample_performance_service =
             if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history {
                 Some(SamplePerformanceService::new(
@@ -703,7 +692,7 @@ impl Validator {
                 None
             };
 
-        let bank = bank_forks.read().unwrap().working_bank();
+        let bank = Arc::clone(&bank_forks.read().unwrap().working_bank());
         info!("Starting validator with working bank slot {}", bank.slot());
 
         let mut block_commitment_cache = BlockCommitmentCache::default();
@@ -888,7 +877,8 @@ impl Validator {
         let waited_for_supermajority = if let Ok(waited) = wait_for_supermajority(
             config,
-            &bank,
+            Some(&mut process_blockstore),
+            &bank_forks,
             &cluster_info,
             rpc_override_health_check,
             &start_progress,
@@ -953,7 +943,7 @@ impl Validator {
             ledger_signal_receiver,
             &rpc_subscriptions,
             &poh_recorder,
-            tower,
+            process_blockstore,
             config.tower_storage.clone(),
             &leader_schedule_cache,
             &exit,
@@ -1380,10 +1370,11 @@ fn load_blockstore(
     let blockstore = Arc::new(blockstore);
     let blockstore_root_scan = BlockstoreRootScan::new(config, &blockstore, exit);
 
+    let halt_at_slot = config.halt_at_slot.or_else(|| highest_slot(&blockstore));
     let process_options = blockstore_processor::ProcessOptions {
         poh_verify: config.poh_verify,
-        halt_at_slot: config.halt_at_slot,
+        halt_at_slot,
         new_hard_forks: config.new_hard_forks.clone(),
         debug_keys: config.debug_keys.clone(),
         account_indexes: config.account_indexes.clone(),
@@ -1514,54 +1505,144 @@ fn highest_slot(blockstore: &Blockstore) -> Option<Slot> {
     highest_slot
 }
 
-#[allow(clippy::too_many_arguments)]
-fn process_blockstore(
-    blockstore: &Blockstore,
-    bank_forks: &Arc<RwLock<BankForks>>,
-    leader_schedule_cache: &LeaderScheduleCache,
-    process_options: &blockstore_processor::ProcessOptions,
-    transaction_status_sender: Option<&TransactionStatusSender>,
-    cache_block_meta_sender: Option<&CacheBlockMetaSender>,
-    blockstore_root_scan: BlockstoreRootScan,
-    accounts_background_request_sender: &AbsRequestSender,
-    start_progress: &Arc<RwLock<ValidatorStartProgress>>,
-) {
-    let exit = Arc::new(AtomicBool::new(false));
-    if let Some(max_slot) = highest_slot(blockstore) {
-        let bank_forks = bank_forks.clone();
-        let exit = exit.clone();
-        let start_progress = start_progress.clone();
-        let _ = std::thread::spawn(move || {
-            while !exit.load(Ordering::Relaxed) {
-                let slot = bank_forks.read().unwrap().working_bank().slot();
-                *start_progress.write().unwrap() =
-                    ValidatorStartProgress::ProcessingLedger { slot, max_slot };
-                sleep(Duration::from_secs(2));
-            }
-        });
-    }
-
-    blockstore_processor::process_blockstore_from_root(
-        blockstore,
-        bank_forks,
-        leader_schedule_cache,
-        process_options,
-        transaction_status_sender,
-        cache_block_meta_sender,
-        accounts_background_request_sender,
-    )
-    .unwrap_or_else(|err| {
-        error!("Failed to load ledger: {:?}", err);
-        abort()
-    });
-    exit.store(true, Ordering::Relaxed);
-
-    blockstore_root_scan.join();
-}
+struct ProcessBlockStore<'a> {
+    id: &'a Pubkey,
+    vote_account: &'a Pubkey,
+    start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
+    blockstore: &'a Blockstore,
+    bank_forks: &'a Arc<RwLock<BankForks>>,
+    leader_schedule_cache: &'a LeaderScheduleCache,
+    process_options: &'a blockstore_processor::ProcessOptions,
+    transaction_status_sender: Option<&'a TransactionStatusSender>,
+    cache_block_meta_sender: Option<CacheBlockMetaSender>,
+    blockstore_root_scan: Option<BlockstoreRootScan>,
+    accounts_background_request_sender: AbsRequestSender,
+    config: &'a ValidatorConfig,
+    tower: Option<Tower>,
+}
+
+impl<'a> ProcessBlockStore<'a> {
+    #[allow(clippy::too_many_arguments)]
+    fn new(
+        id: &'a Pubkey,
+        vote_account: &'a Pubkey,
+        start_progress: &'a Arc<RwLock<ValidatorStartProgress>>,
+        blockstore: &'a Blockstore,
+        bank_forks: &'a Arc<RwLock<BankForks>>,
+        leader_schedule_cache: &'a LeaderScheduleCache,
+        process_options: &'a blockstore_processor::ProcessOptions,
+        transaction_status_sender: Option<&'a TransactionStatusSender>,
+        cache_block_meta_sender: Option<CacheBlockMetaSender>,
+        blockstore_root_scan: BlockstoreRootScan,
+        accounts_background_request_sender: AbsRequestSender,
+        config: &'a ValidatorConfig,
+    ) -> Self {
+        Self {
+            id,
+            vote_account,
+            start_progress,
+            blockstore,
+            bank_forks,
+            leader_schedule_cache,
+            process_options,
+            transaction_status_sender,
+            cache_block_meta_sender,
+            blockstore_root_scan: Some(blockstore_root_scan),
+            accounts_background_request_sender,
+            config,
+            tower: None,
+        }
+    }
+
+    fn process(&mut self) {
+        if self.tower.is_none() {
+            let previous_start_process = *self.start_progress.read().unwrap();
+            *self.start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
+
+            /*
+            #[allow(clippy::too_many_arguments)]
+            fn process_blockstore(
+                blockstore: &Blockstore,
+                bank_forks: &Arc<RwLock<BankForks>>,
+                leader_schedule_cache: &LeaderScheduleCache,
+                process_options: &blockstore_processor::ProcessOptions,
+                transaction_status_sender: Option<&TransactionStatusSender>,
+                cache_block_meta_sender: Option<&CacheBlockMetaSender>,
+                blockstore_root_scan: BlockstoreRootScan,
+                accounts_background_request_sender: &AbsRequestSender,
+                start_progress: &Arc<RwLock<ValidatorStartProgress>>,
+            ) {
+            */
+            let exit = Arc::new(AtomicBool::new(false));
+            if let Some(max_slot) = highest_slot(self.blockstore) {
+                let bank_forks = self.bank_forks.clone();
+                let exit = exit.clone();
+                let start_progress = self.start_progress.clone();
+                let _ = std::thread::spawn(move || {
+                    while !exit.load(Ordering::Relaxed) {
+                        let slot = bank_forks.read().unwrap().working_bank().slot();
+                        *start_progress.write().unwrap() =
+                            ValidatorStartProgress::ProcessingLedger { slot, max_slot };
+                        sleep(Duration::from_secs(2));
+                    }
+                });
+            }
+
+            blockstore_processor::process_blockstore_from_root(
+                self.blockstore,
+                self.bank_forks,
+                self.leader_schedule_cache,
+                self.process_options,
+                self.transaction_status_sender,
+                self.cache_block_meta_sender.as_ref(),
+                &self.accounts_background_request_sender,
+            )
+            .unwrap_or_else(|err| {
+                error!("Failed to load ledger: {:?}", err);
+                abort()
+            });
+            exit.store(true, Ordering::Relaxed);
+
+            if let Some(blockstore_root_scan) = self.blockstore_root_scan.take() {
+                blockstore_root_scan.join();
+            }
+
+            self.tower = Some({
+                let restored_tower = Tower::restore(self.config.tower_storage.as_ref(), self.id);
+                if let Ok(tower) = &restored_tower {
+                    reconcile_blockstore_roots_with_tower(tower, self.blockstore).unwrap_or_else(
+                        |err| {
+                            error!("Failed to reconcile blockstore with tower: {:?}", err);
+                            abort()
+                        },
+                    );
+                }
+                post_process_restored_tower(
+                    restored_tower,
+                    self.id,
+                    self.vote_account,
+                    self.config,
+                    &self.bank_forks.read().unwrap(),
+                )
+            });
+
+            *self.start_progress.write().unwrap() = previous_start_process;
+        }
+    }
+}
+
+impl<'a> From<ProcessBlockStore<'a>> for Tower {
+    fn from(mut process_blockstore: ProcessBlockStore<'a>) -> Self {
+        process_blockstore.process();
+        process_blockstore.tower.expect("valid tower")
+    }
+}
 
 fn maybe_warp_slot(
     config: &ValidatorConfig,
+    process_blockstore: &mut ProcessBlockStore,
     ledger_path: &Path,
     bank_forks: &RwLock<BankForks>,
     leader_schedule_cache: &LeaderScheduleCache,
@@ -1572,6 +1653,8 @@ fn maybe_warp_slot(
         abort();
     });
 
+    process_blockstore.process();
+
     let mut bank_forks = bank_forks.write().unwrap();
     let working_bank = bank_forks.working_bank();
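
Two things worth noting in ProcessBlockStore: process() does all of its work under the `if self.tower.is_none()` guard and caches the restored tower, so however many of maybe_warp_slot, wait_for_supermajority, and the final From conversion end up calling it, the ledger is replayed at most once; and the tower restore/reconcile logic deleted from Validator::new above now lives at the end of process(), keeping tower state coupled to the freshly replayed blockstore.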
@@ -1753,67 +1836,75 @@ enum ValidatorError {
 // that is unrecoverable and the validator should exit.
 fn wait_for_supermajority(
     config: &ValidatorConfig,
-    bank: &Bank,
+    process_blockstore: Option<&mut ProcessBlockStore>,
+    bank_forks: &RwLock<BankForks>,
     cluster_info: &ClusterInfo,
     rpc_override_health_check: Arc<AtomicBool>,
     start_progress: &Arc<RwLock<ValidatorStartProgress>>,
 ) -> Result<bool, ValidatorError> {
-    if let Some(wait_for_supermajority) = config.wait_for_supermajority {
-        match wait_for_supermajority.cmp(&bank.slot()) {
-            std::cmp::Ordering::Less => return Ok(false),
-            std::cmp::Ordering::Greater => {
-                error!(
-                    "Ledger does not have enough data to wait for supermajority, \
-                    please enable snapshot fetch. Has {} needs {}",
-                    bank.slot(),
-                    wait_for_supermajority
-                );
-                return Err(ValidatorError::NotEnoughLedgerData);
-            }
-            _ => {}
-        }
-    } else {
-        return Ok(false);
-    }
-
-    if let Some(expected_bank_hash) = config.expected_bank_hash {
-        if bank.hash() != expected_bank_hash {
-            error!(
-                "Bank hash({}) does not match expected value: {}",
-                bank.hash(),
-                expected_bank_hash
-            );
-            return Err(ValidatorError::BadExpectedBankHash);
-        }
-    }
-
-    *start_progress.write().unwrap() = ValidatorStartProgress::WaitingForSupermajority;
-    for i in 1.. {
-        if i % 10 == 1 {
-            info!(
-                "Waiting for {}% of activated stake at slot {} to be in gossip...",
-                WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
-                bank.slot()
-            );
-        }
-        let gossip_stake_percent = get_stake_percent_in_gossip(bank, cluster_info, i % 10 == 0);
-        if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
-            info!(
-                "Supermajority reached, {}% active stake detected, starting up now.",
-                gossip_stake_percent,
-            );
-            break;
-        }
-        // The normal RPC health checks don't apply as the node is waiting, so feign health to
-        // prevent load balancers from removing the node from their list of candidates during a
-        // manual restart.
-        rpc_override_health_check.store(true, Ordering::Relaxed);
-        sleep(Duration::new(1, 0));
-    }
-    rpc_override_health_check.store(false, Ordering::Relaxed);
-    Ok(true)
+    match config.wait_for_supermajority {
+        None => Ok(false),
+        Some(wait_for_supermajority) => {
+            if let Some(process_blockstore) = process_blockstore {
+                process_blockstore.process();
+            }
+
+            let bank = bank_forks.read().unwrap().working_bank();
+            match wait_for_supermajority.cmp(&bank.slot()) {
+                std::cmp::Ordering::Less => return Ok(false),
+                std::cmp::Ordering::Greater => {
+                    error!(
+                        "Ledger does not have enough data to wait for supermajority, \
+                        please enable snapshot fetch. Has {} needs {}",
+                        bank.slot(),
+                        wait_for_supermajority
+                    );
+                    return Err(ValidatorError::NotEnoughLedgerData);
+                }
+                _ => {}
+            }
+
+            if let Some(expected_bank_hash) = config.expected_bank_hash {
+                if bank.hash() != expected_bank_hash {
+                    error!(
+                        "Bank hash({}) does not match expected value: {}",
+                        bank.hash(),
+                        expected_bank_hash
+                    );
+                    return Err(ValidatorError::BadExpectedBankHash);
+                }
+            }
+
+            *start_progress.write().unwrap() = ValidatorStartProgress::WaitingForSupermajority;
+            for i in 1.. {
+                if i % 10 == 1 {
+                    info!(
+                        "Waiting for {}% of activated stake at slot {} to be in gossip...",
+                        WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT,
+                        bank.slot()
+                    );
+                }
+                let gossip_stake_percent =
+                    get_stake_percent_in_gossip(&bank, cluster_info, i % 10 == 0);
+                if gossip_stake_percent >= WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT {
+                    info!(
+                        "Supermajority reached, {}% active stake detected, starting up now.",
+                        gossip_stake_percent,
+                    );
+                    break;
+                }
+                // The normal RPC health checks don't apply as the node is waiting, so feign health to
+                // prevent load balancers from removing the node from their list of candidates during a
+                // manual restart.
+                rpc_override_health_check.store(true, Ordering::Relaxed);
+                sleep(Duration::new(1, 0));
+            }
+            rpc_override_health_check.store(false, Ordering::Relaxed);
+            Ok(true)
+        }
+    }
 }
 
 // Get the activated stake percentage (based on the provided bank) that is visible in gossip
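
The restructuring inverts the old early-return flow: with `wait_for_supermajority` unset, the function now returns `Ok(false)` without ever touching the ledger; only a configured wait slot forces `process_blockstore.process()`, which is needed to obtain a working bank to compare against. Combined with the Tvu change, this is what lets Validator::new skip the up-front replay, deferring it to TVU construction as the commit title describes.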
@@ -2120,14 +2211,15 @@ mod tests {
         );
 
         let (genesis_config, _mint_keypair) = create_genesis_config(1);
-        let bank = Arc::new(Bank::new_for_tests(&genesis_config));
+        let bank_forks = RwLock::new(BankForks::new(Bank::new_for_tests(&genesis_config)));
         let mut config = ValidatorConfig::default_for_test();
         let rpc_override_health_check = Arc::new(AtomicBool::new(false));
         let start_progress = Arc::new(RwLock::new(ValidatorStartProgress::default()));
 
         assert!(!wait_for_supermajority(
             &config,
-            &bank,
+            None,
+            &bank_forks,
             &cluster_info,
             rpc_override_health_check.clone(),
             &start_progress,
@@ -2139,7 +2231,8 @@ mod tests {
         assert_eq!(
             wait_for_supermajority(
                 &config,
-                &bank,
+                None,
+                &bank_forks,
                 &cluster_info,
                 rpc_override_health_check.clone(),
                 &start_progress,
@@ -2148,11 +2241,16 @@ mod tests {
         );
 
         // bank=1, wait=0, should pass, bank is past the wait slot
-        let bank = Bank::new_from_parent(&bank, &Pubkey::default(), 1);
+        let bank_forks = RwLock::new(BankForks::new(Bank::new_from_parent(
+            &bank_forks.read().unwrap().root_bank(),
+            &Pubkey::default(),
+            1,
+        )));
         config.wait_for_supermajority = Some(0);
         assert!(!wait_for_supermajority(
             &config,
-            &bank,
+            None,
+            &bank_forks,
             &cluster_info,
             rpc_override_health_check.clone(),
             &start_progress,
@@ -2165,7 +2263,8 @@ mod tests {
         assert_eq!(
             wait_for_supermajority(
                 &config,
-                &bank,
+                None,
+                &bank_forks,
                 &cluster_info,
                 rpc_override_health_check,
                 &start_progress,


@@ -362,7 +362,7 @@ impl SnapshotRequestHandler {
     }
 }
 
-#[derive(Default)]
+#[derive(Default, Clone)]
 pub struct AbsRequestSender {
     snapshot_request_sender: Option<SnapshotRequestSender>,
 }
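
AbsRequestSender needs `Clone` because ProcessBlockStore now owns its sender (Validator::new passes `accounts_background_request_sender.clone()` rather than a reference). The derive is cheap: cloning only duplicates the channel handle. A minimal sketch of the shape, using `std::sync::mpsc` as a stand-in for whatever channel type `SnapshotRequestSender` actually wraps (an assumption, not shown in this diff):

    use std::sync::mpsc::{channel, Sender};

    // Stand-in for AbsRequestSender: Option<Sender<T>> is Clone because Sender<T> is.
    #[derive(Default, Clone)]
    struct RequestSenderSketch {
        snapshot_request_sender: Option<Sender<u64>>, // u64 stands in for SnapshotRequest
    }

    fn main() {
        let (sender, _receiver) = channel::<u64>();
        let a = RequestSenderSketch {
            snapshot_request_sender: Some(sender),
        };
        let b = a.clone(); // clones the channel handle, not any queued data
        assert!(b.snapshot_request_sender.is_some());
    }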