Remove send snapshot hard unwrap (#326)

This commit is contained in:
carllin 2024-04-11 12:18:52 -04:00 committed by GitHub
parent f6cac1e20f
commit d5c291a934
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 219 additions and 124 deletions

View File

@ -555,7 +555,8 @@ mod tests {
bank_forks
.write()
.unwrap()
.set_root(x, &AbsRequestSender::default(), None);
.set_root(x, &AbsRequestSender::default(), None)
.unwrap();
}
// Add an additional bank/vote that will root slot 2
@ -596,11 +597,15 @@ mod tests {
.read()
.unwrap()
.highest_super_majority_root();
bank_forks.write().unwrap().set_root(
root,
&AbsRequestSender::default(),
Some(highest_super_majority_root),
);
bank_forks
.write()
.unwrap()
.set_root(
root,
&AbsRequestSender::default(),
Some(highest_super_majority_root),
)
.unwrap();
let highest_super_majority_root_bank =
bank_forks.read().unwrap().get(highest_super_majority_root);
assert!(highest_super_majority_root_bank.is_some());
@ -675,11 +680,15 @@ mod tests {
.read()
.unwrap()
.highest_super_majority_root();
bank_forks.write().unwrap().set_root(
root,
&AbsRequestSender::default(),
Some(highest_super_majority_root),
);
bank_forks
.write()
.unwrap()
.set_root(
root,
&AbsRequestSender::default(),
Some(highest_super_majority_root),
)
.unwrap();
let highest_super_majority_root_bank =
bank_forks.read().unwrap().get(highest_super_majority_root);
assert!(highest_super_majority_root_bank.is_some());

View File

@ -1908,7 +1908,9 @@ mod test {
{
let mut w_bank_forks = bank_forks.write().unwrap();
w_bank_forks.insert(new_root_bank);
w_bank_forks.set_root(new_root_slot, &AbsRequestSender::default(), None);
w_bank_forks
.set_root(new_root_slot, &AbsRequestSender::default(), None)
.unwrap();
}
popular_pruned_slot_pool.insert(dead_duplicate_confirmed_slot);
assert!(!dead_slot_pool.is_empty());

View File

@ -59,7 +59,7 @@ use {
solana_runtime::{
accounts_background_service::AbsRequestSender,
bank::{bank_hash_details, Bank, NewBankOptions},
bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
bank_forks::{BankForks, SetRootError, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
commitment::BlockCommitmentCache,
installed_scheduler_pool::BankWithScheduler,
prioritization_fee_cache::PrioritizationFeeCache,
@ -980,7 +980,7 @@ impl ReplayStage {
);
}
Self::handle_votable_bank(
if let Err(e) = Self::handle_votable_bank(
vote_bank,
switch_fork_decision,
&bank_forks,
@ -1007,7 +1007,10 @@ impl ReplayStage {
&mut epoch_slots_frozen_slots,
&drop_bank_sender,
wait_to_vote_slot,
);
) {
error!("Unable to set root: {e}");
return;
}
}
voting_time.stop();
@ -2306,7 +2309,7 @@ impl ReplayStage {
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
drop_bank_sender: &Sender<Vec<Arc<Bank>>>,
wait_to_vote_slot: Option<Slot>,
) {
) -> Result<(), SetRootError> {
if bank.is_empty() {
datapoint_info!("replay_stage-voted_empty_bank", ("slot", bank.slot(), i64));
}
@ -2362,7 +2365,7 @@ impl ReplayStage {
vote_signatures,
epoch_slots_frozen_slots,
drop_bank_sender,
);
)?;
blockstore.slots_stats.mark_rooted(new_root);
@ -2406,6 +2409,7 @@ impl ReplayStage {
voting_sender,
wait_to_vote_slot,
);
Ok(())
}
fn generate_vote_tx(
@ -4136,13 +4140,13 @@ impl ReplayStage {
voted_signatures: &mut Vec<Signature>,
epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots,
drop_bank_sender: &Sender<Vec<Arc<Bank>>>,
) {
) -> Result<(), SetRootError> {
bank_forks.read().unwrap().prune_program_cache(new_root);
let removed_banks = bank_forks.write().unwrap().set_root(
new_root,
accounts_background_request_sender,
highest_super_majority_root,
);
)?;
drop_bank_sender
.send(removed_banks)
@ -4173,6 +4177,7 @@ impl ReplayStage {
unfrozen_gossip_verified_vote_hashes.set_root(new_root);
*epoch_slots_frozen_slots = epoch_slots_frozen_slots.split_off(&new_root);
Ok(())
// epoch_slots_frozen_slots now only contains entries >= `new_root`
}
@ -4694,7 +4699,8 @@ pub(crate) mod tests {
&mut Vec::new(),
&mut epoch_slots_frozen_slots,
&drop_bank_sender,
);
)
.unwrap();
assert_eq!(bank_forks.read().unwrap().root(), root);
assert_eq!(progress.len(), 1);
assert!(progress.get(&root).is_some());
@ -4772,7 +4778,8 @@ pub(crate) mod tests {
&mut Vec::new(),
&mut EpochSlotsFrozenSlots::default(),
&drop_bank_sender,
);
)
.unwrap();
assert_eq!(bank_forks.read().unwrap().root(), root);
assert!(bank_forks.read().unwrap().get(confirmed_root).is_some());
assert!(bank_forks.read().unwrap().get(fork).is_none());
@ -5797,7 +5804,9 @@ pub(crate) mod tests {
bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
let bank9 = bank_forks.get(9).unwrap();
bank_forks.insert(Bank::new_from_parent(bank9, &Pubkey::default(), 10));
bank_forks.set_root(9, &AbsRequestSender::default(), None);
bank_forks
.set_root(9, &AbsRequestSender::default(), None)
.unwrap();
let total_epoch_stake = bank0.total_epoch_stake();
// Insert new ForkProgress for slot 10 and its
@ -5891,7 +5900,9 @@ pub(crate) mod tests {
.get_propagated_stats_mut(0)
.unwrap()
.is_leader_slot = true;
bank_forks.set_root(0, &AbsRequestSender::default(), None);
bank_forks
.set_root(0, &AbsRequestSender::default(), None)
.unwrap();
let total_epoch_stake = bank_forks.root_bank().total_epoch_stake();
// Insert new ForkProgress representing a slot for all slots 1..=num_banks. Only
@ -5974,7 +5985,9 @@ pub(crate) mod tests {
.get_propagated_stats_mut(0)
.unwrap()
.is_leader_slot = true;
bank_forks.set_root(0, &AbsRequestSender::default(), None);
bank_forks
.set_root(0, &AbsRequestSender::default(), None)
.unwrap();
let total_epoch_stake = num_validators as u64 * stake_per_validator;
@ -6596,7 +6609,8 @@ pub(crate) mod tests {
bank_forks
.write()
.unwrap()
.set_root(3, &AbsRequestSender::default(), None);
.set_root(3, &AbsRequestSender::default(), None)
.unwrap();
let mut descendants = bank_forks.read().unwrap().descendants();
let mut ancestors = bank_forks.read().unwrap().ancestors();
let slot_3_descendants = descendants.get(&3).unwrap().clone();

View File

@ -2097,11 +2097,13 @@ fn maybe_warp_slot(
warp_slot,
solana_accounts_db::accounts_db::CalcAccountsHashDataSource::Storages,
));
bank_forks.set_root(
warp_slot,
accounts_background_request_sender,
Some(warp_slot),
);
bank_forks
.set_root(
warp_slot,
accounts_background_request_sender,
Some(warp_slot),
)
.map_err(|err| err.to_string())?;
leader_schedule_cache.set_root(&bank_forks.root_bank());
let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(

View File

@ -227,6 +227,7 @@ impl VoteSimulator {
&mut EpochSlotsFrozenSlots::default(),
&drop_bank_sender,
)
.unwrap()
}
pub fn create_and_vote_new_branch(

View File

@ -295,13 +295,17 @@ fn test_epoch_accounts_hash_basic(test_environment: TestEnvironment) {
if bank.slot().checked_rem(SET_ROOT_INTERVAL).unwrap() == 0 {
trace!("rooting bank {}", bank.slot());
bank_forks.read().unwrap().prune_program_cache(bank.slot());
bank_forks.write().unwrap().set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
);
bank_forks
.write()
.unwrap()
.set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
)
.unwrap();
}
// To ensure EAH calculations are correct, calculate the accounts hash here, in-band.
@ -407,13 +411,17 @@ fn test_snapshots_have_expected_epoch_accounts_hash() {
// Root every bank. This is what a normal validator does as well.
// `set_root()` is also what requests snapshots and EAH calculations.
bank_forks.read().unwrap().prune_program_cache(bank.slot());
bank_forks.write().unwrap().set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
);
bank_forks
.write()
.unwrap()
.set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
)
.unwrap();
// After submitting an EAH calculation request, wait until it gets handled by ABS so that
// subsequent snapshot requests are not swallowed.
@ -531,13 +539,17 @@ fn test_background_services_request_handling_for_epoch_accounts_hash() {
if bank.block_height() == set_root_slot {
info!("Calling set_root() on bank {}...", bank.slot());
bank_forks.read().unwrap().prune_program_cache(bank.slot());
bank_forks.write().unwrap().set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
);
bank_forks
.write()
.unwrap()
.set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
)
.unwrap();
info!("Calling set_root() on bank {}... DONE", bank.slot());
// wait until eah is valid
@ -588,13 +600,17 @@ fn test_epoch_accounts_hash_and_warping() {
epoch_schedule.get_first_slot_in_epoch(bank.epoch() + 1) + eah_stop_offset;
// have to set root here so that we can flush the write cache
bank_forks.read().unwrap().prune_program_cache(bank.slot());
bank_forks.write().unwrap().set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
);
bank_forks
.write()
.unwrap()
.set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
)
.unwrap();
// flush the write cache so warping can calculate the accounts hash from storages
bank.force_flush_accounts_cache();
let bank = bank_forks
@ -614,13 +630,17 @@ fn test_epoch_accounts_hash_and_warping() {
.insert(Bank::new_from_parent(bank, &Pubkey::default(), slot))
.clone_without_scheduler();
bank_forks.read().unwrap().prune_program_cache(bank.slot());
bank_forks.write().unwrap().set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
);
bank_forks
.write()
.unwrap()
.set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
)
.unwrap();
info!("Waiting for epoch accounts hash...");
_ = bank
.rc
@ -654,13 +674,17 @@ fn test_epoch_accounts_hash_and_warping() {
.insert(Bank::new_from_parent(bank, &Pubkey::default(), slot))
.clone_without_scheduler();
bank_forks.read().unwrap().prune_program_cache(bank.slot());
bank_forks.write().unwrap().set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
);
bank_forks
.write()
.unwrap()
.set_root(
bank.slot(),
&test_environment
.background_services
.accounts_background_request_sender,
None,
)
.unwrap();
info!("Waiting for epoch accounts hash...");
_ = bank
.rc

View File

@ -237,7 +237,8 @@ fn run_bank_forks_snapshot_n<F>(
bank_forks
.write()
.unwrap()
.set_root(bank.slot(), &request_sender, None);
.set_root(bank.slot(), &request_sender, None)
.unwrap();
snapshot_request_handler.handle_snapshot_requests(
false,
0,
@ -633,7 +634,8 @@ fn test_slots_to_snapshot(snapshot_version: SnapshotVersion, cluster_type: Clust
bank_forks
.write()
.unwrap()
.set_root(current_bank.slot(), &request_sender, None);
.set_root(current_bank.slot(), &request_sender, None)
.unwrap();
// Since the accounts background services are not running, EpochAccountsHash
// calculation requests will not be handled. To prevent banks from hanging during
@ -789,7 +791,8 @@ fn test_bank_forks_incremental_snapshot(
bank_forks
.write()
.unwrap()
.set_root(bank.slot(), &request_sender, None);
.set_root(bank.slot(), &request_sender, None)
.unwrap();
snapshot_request_handler.handle_snapshot_requests(
false,
0,
@ -1090,7 +1093,8 @@ fn test_snapshots_with_background_services(
bank_forks
.write()
.unwrap()
.set_root(slot, &abs_request_sender, None);
.set_root(slot, &abs_request_sender, None)
.unwrap();
}
// If a snapshot should be taken this slot, wait for it to complete

View File

@ -305,7 +305,9 @@ mod tests {
let mut bank_forks = bank_forks_arc.write().unwrap();
let bank0 = bank_forks.get(0).unwrap();
bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
bank_forks.set_root(9, &AbsRequestSender::default(), None);
bank_forks
.set_root(9, &AbsRequestSender::default(), None)
.unwrap();
}
blockstore.set_roots([0, 9].iter()).unwrap();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(
@ -394,7 +396,9 @@ mod tests {
let mut bank_forks = bank_forks_arc.write().unwrap();
let bank0 = bank_forks.get(0).unwrap();
bank_forks.insert(Bank::new_from_parent(bank0.clone(), &Pubkey::default(), 9));
bank_forks.set_root(9, &AbsRequestSender::default(), None);
bank_forks
.set_root(9, &AbsRequestSender::default(), None)
.unwrap();
}
blockstore.set_roots([0, 9].iter()).unwrap();
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(

View File

@ -35,7 +35,7 @@ use {
solana_runtime::{
accounts_background_service::{AbsRequestSender, SnapshotRequestKind},
bank::{Bank, TransactionBalancesSet},
bank_forks::BankForks,
bank_forks::{BankForks, SetRootError},
bank_utils,
commitment::VOTE_THRESHOLD_SIZE,
installed_scheduler_pool::BankWithScheduler,
@ -689,6 +689,9 @@ pub enum BlockstoreProcessorError {
#[error("root bank with mismatched capitalization at {0}")]
RootBankWithMismatchedCapitalization(Slot),
#[error("set root error {0}")]
SetRootError(#[from] SetRootError),
}
/// Callback for accessing bank state after each slot is confirmed while
@ -1835,7 +1838,7 @@ fn load_frozen_forks(
root,
accounts_background_request_sender,
None,
);
)?;
m.stop();
set_root_us += m.as_us();
@ -3899,11 +3902,15 @@ pub mod tests {
&mut ExecuteTimings::default(),
)
.unwrap();
bank_forks.write().unwrap().set_root(
1,
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
None,
);
bank_forks
.write()
.unwrap()
.set_root(
1,
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
None,
)
.unwrap();
let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank1);

View File

@ -1155,7 +1155,9 @@ impl ProgramTestContext {
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
let abs_request_sender = AbsRequestSender::new(snapshot_request_sender);
bank_forks.set_root(pre_warp_slot, &abs_request_sender, Some(pre_warp_slot));
bank_forks
.set_root(pre_warp_slot, &abs_request_sender, Some(pre_warp_slot))
.unwrap();
// The call to `set_root()` above will send an EAH request. Need to intercept and handle
// all EpochAccountsHash requests so future rooted banks do not hang in Bank::freeze()
@ -1217,11 +1219,13 @@ impl ProgramTestContext {
bank.fill_bank_with_ticks_for_tests();
let pre_warp_slot = bank.slot();
bank_forks.set_root(
pre_warp_slot,
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
Some(pre_warp_slot),
);
bank_forks
.set_root(
pre_warp_slot,
&solana_runtime::accounts_background_service::AbsRequestSender::default(),
Some(pre_warp_slot),
)
.unwrap();
// warp_bank is frozen so go forward to get unfrozen bank at warp_slot
let warp_slot = pre_warp_slot + 1;

View File

@ -607,7 +607,8 @@ mod tests {
bank_forks
.write()
.unwrap()
.set_root(7, &AbsRequestSender::default(), None);
.set_root(7, &AbsRequestSender::default(), None)
.unwrap();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(6),
&bank_forks,

View File

@ -5087,11 +5087,11 @@ pub mod tests {
self.bank_forks.write().unwrap().insert(new_bank);
for root in roots.iter() {
self.bank_forks.write().unwrap().set_root(
*root,
&AbsRequestSender::default(),
Some(0),
);
self.bank_forks
.write()
.unwrap()
.set_root(*root, &AbsRequestSender::default(), Some(0))
.unwrap();
let block_time = self
.bank_forks
.read()

View File

@ -9,6 +9,7 @@ use {
},
snapshot_config::SnapshotConfig,
},
crossbeam_channel::SendError,
log::*,
solana_measure::measure::Measure,
solana_program_runtime::loaded_programs::{BlockRelation, ForkGraph},
@ -26,6 +27,7 @@ use {
},
time::Instant,
},
thiserror::Error,
};
pub const MAX_ROOT_DISTANCE_FOR_VOTE_ONLY: Slot = 400;
@ -43,6 +45,12 @@ impl ReadOnlyAtomicSlot {
}
}
#[derive(Error, Debug)]
pub enum SetRootError {
#[error("failed to send epoch accounts hash request for bank {0}: {1}")]
SendEpochAccountHashError(Slot, SendError<SnapshotRequest>),
}
#[derive(Debug, Default, Copy, Clone)]
struct SetRootMetrics {
timings: SetRootTimings,
@ -280,7 +288,7 @@ impl BankForks {
root: Slot,
accounts_background_request_sender: &AbsRequestSender,
highest_super_majority_root: Option<Slot>,
) -> (Vec<Arc<Bank>>, SetRootMetrics) {
) -> Result<(Vec<Arc<Bank>>, SetRootMetrics), SetRootError> {
let old_epoch = self.root_bank().epoch();
// To support `RootBankCache` (via `ReadOnlyAtomicSlot`) accessing `root` *without* locking
// BankForks first *and* from a different thread, this store *must* be at least Release to
@ -353,14 +361,16 @@ impl BankForks {
.accounts_db
.epoch_accounts_hash_manager
.set_in_flight(eah_bank.slot());
accounts_background_request_sender
.send_snapshot_request(SnapshotRequest {
if let Err(e) =
accounts_background_request_sender.send_snapshot_request(SnapshotRequest {
snapshot_root_bank: Arc::clone(eah_bank),
status_cache_slot_deltas: Vec::default(),
request_kind: SnapshotRequestKind::EpochAccountsHash,
enqueued: Instant::now(),
})
.expect("send epoch accounts hash request");
{
return Err(SetRootError::SendEpochAccountHashError(eah_bank.slot(), e));
};
}
drop(eah_banks);
@ -425,7 +435,7 @@ impl BankForks {
drop(parents);
drop_parent_banks_time.stop();
(
Ok((
removed_banks,
SetRootMetrics {
timings: SetRootTimings {
@ -441,7 +451,7 @@ impl BankForks {
dropped_banks_len: dropped_banks_len as i64,
accounts_data_len,
},
)
))
}
pub fn prune_program_cache(&self, root: Slot) {
@ -455,14 +465,14 @@ impl BankForks {
root: Slot,
accounts_background_request_sender: &AbsRequestSender,
highest_super_majority_root: Option<Slot>,
) -> Vec<Arc<Bank>> {
) -> Result<Vec<Arc<Bank>>, SetRootError> {
let program_cache_prune_start = Instant::now();
let set_root_start = Instant::now();
let (removed_banks, set_root_metrics) = self.do_set_root_return_metrics(
root,
accounts_background_request_sender,
highest_super_majority_root,
);
)?;
datapoint_info!(
"bank-forks_set_root",
(
@ -548,7 +558,7 @@ impl BankForks {
("dropped_banks_len", set_root_metrics.dropped_banks_len, i64),
("accounts_data_len", set_root_metrics.accounts_data_len, i64),
);
removed_banks
Ok(removed_banks)
}
pub fn root(&self) -> Slot {
@ -848,7 +858,7 @@ mod tests {
let bank0 = Bank::new_for_tests(&genesis_config);
let bank_forks0 = BankForks::new_rw_arc(bank0);
let mut bank_forks0 = bank_forks0.write().unwrap();
bank_forks0.set_root(0, &abs_request_sender, None);
bank_forks0.set_root(0, &abs_request_sender, None).unwrap();
let bank1 = Bank::new_for_tests(&genesis_config);
let bank_forks1 = BankForks::new_rw_arc(bank1);
@ -883,7 +893,9 @@ mod tests {
// Set root in bank_forks0 to truncate the ancestor history
bank_forks0.insert(child1);
bank_forks0.set_root(slot, &abs_request_sender, None);
bank_forks0
.set_root(slot, &abs_request_sender, None)
.unwrap();
// Don't set root in bank_forks1 to keep the ancestor history
bank_forks1.insert(child2);
@ -948,11 +960,15 @@ mod tests {
(4, vec![]),
])
);
bank_forks.write().unwrap().set_root(
2,
&AbsRequestSender::default(),
None, // highest confirmed root
);
bank_forks
.write()
.unwrap()
.set_root(
2,
&AbsRequestSender::default(),
None, // highest confirmed root
)
.unwrap();
bank_forks.read().unwrap().get(2).unwrap().squash();
assert_eq!(
bank_forks.read().unwrap().ancestors(),
@ -1011,11 +1027,15 @@ mod tests {
(4, vec![]),
])
);
bank_forks.write().unwrap().set_root(
2,
&AbsRequestSender::default(),
Some(1), // highest confirmed root
);
bank_forks
.write()
.unwrap()
.set_root(
2,
&AbsRequestSender::default(),
Some(1), // highest confirmed root
)
.unwrap();
bank_forks.read().unwrap().get(2).unwrap().squash();
assert_eq!(
bank_forks.read().unwrap().ancestors(),
@ -1101,11 +1121,13 @@ mod tests {
assert_matches!(bank_forks.relationship(1, 13), BlockRelation::Unknown);
assert_matches!(bank_forks.relationship(13, 2), BlockRelation::Unknown);
bank_forks.set_root(
2,
&AbsRequestSender::default(),
Some(1), // highest confirmed root
);
bank_forks
.set_root(
2,
&AbsRequestSender::default(),
Some(1), // highest confirmed root
)
.unwrap();
assert_matches!(bank_forks.relationship(1, 2), BlockRelation::Unknown);
assert_matches!(bank_forks.relationship(2, 0), BlockRelation::Unknown);
}

View File

@ -79,7 +79,8 @@ mod tests {
bank_forks
.write()
.unwrap()
.set_root(1, &AbsRequestSender::default(), None);
.set_root(1, &AbsRequestSender::default(), None)
.unwrap();
let bank = bank_forks.read().unwrap().root_bank();
// cached slot and bank are not updated until we call `root_bank()`