Re-enqueues unhandled ABS requests (#28362)

This commit is contained in:
Brooks Prumo 2022-10-13 16:25:39 -04:00 committed by GitHub
parent 5df10173dd
commit dd7fee8f32
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 248 additions and 182 deletions

View File

@ -627,9 +627,11 @@ impl Validator {
);
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender);
let accounts_background_request_sender =
AbsRequestSender::new(snapshot_request_sender.clone());
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config,
snapshot_request_sender,
snapshot_request_receiver,
accounts_package_sender,
};

View File

@ -200,9 +200,11 @@ impl BackgroundServices {
);
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender);
let accounts_background_request_sender =
AbsRequestSender::new(snapshot_request_sender.clone());
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_config.clone(),
snapshot_request_sender,
snapshot_request_receiver,
accounts_package_sender,
};

View File

@ -232,9 +232,10 @@ fn run_bank_forks_snapshot_n<F>(
);
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
let request_sender = AbsRequestSender::new(snapshot_request_sender);
let request_sender = AbsRequestSender::new(snapshot_request_sender.clone());
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_sender,
snapshot_request_receiver,
accounts_package_sender,
};
@ -729,9 +730,10 @@ fn test_bank_forks_incremental_snapshot(
);
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
let request_sender = AbsRequestSender::new(snapshot_request_sender);
let request_sender = AbsRequestSender::new(snapshot_request_sender.clone());
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_sender,
snapshot_request_receiver,
accounts_package_sender,
};
@ -977,9 +979,10 @@ fn test_snapshots_with_background_services(
bank.set_callback(Some(Box::new(callback.clone())));
}
let abs_request_sender = AbsRequestSender::new(snapshot_request_sender);
let abs_request_sender = AbsRequestSender::new(snapshot_request_sender.clone());
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_sender,
snapshot_request_receiver,
accounts_package_sender: accounts_package_sender.clone(),
};

View File

@ -1135,9 +1135,10 @@ fn load_bank_forks(
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender);
let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender.clone());
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: SnapshotConfig::new_load_only(),
snapshot_request_sender,
snapshot_request_receiver,
accounts_package_sender,
};

View File

@ -9,7 +9,7 @@ use {
bank::{Bank, BankSlotDelta, DropCallback},
bank_forks::BankForks,
snapshot_config::SnapshotConfig,
snapshot_package::{AccountsPackage, AccountsPackageType, SnapshotType},
snapshot_package::{self, AccountsPackage, AccountsPackageType, SnapshotType},
snapshot_utils::{self, SnapshotError},
},
crossbeam_channel::{Receiver, SendError, Sender},
@ -137,6 +137,7 @@ pub enum SnapshotRequestType {
pub struct SnapshotRequestHandler {
pub snapshot_config: SnapshotConfig,
pub snapshot_request_sender: SnapshotRequestSender,
pub snapshot_request_receiver: SnapshotRequestReceiver,
pub accounts_package_sender: Sender<AccountsPackage>,
}
@ -150,34 +151,109 @@ impl SnapshotRequestHandler {
non_snapshot_time_us: u128,
last_full_snapshot_slot: &mut Option<Slot>,
) -> Option<Result<u64, SnapshotError>> {
self.snapshot_request_receiver
let (
snapshot_request,
accounts_package_type,
num_outstanding_requests,
num_re_enqueued_requests,
) = self.get_next_snapshot_request(*last_full_snapshot_slot)?;
datapoint_info!(
"handle_snapshot_requests",
(
"num-outstanding-requests",
num_outstanding_requests as i64,
i64
),
(
"num-re-enqueued-requests",
num_re_enqueued_requests as i64,
i64
)
);
Some(self.handle_snapshot_request(
accounts_db_caching_enabled,
test_hash_calculation,
non_snapshot_time_us,
last_full_snapshot_slot,
snapshot_request,
accounts_package_type,
))
}
/// Get the next snapshot request to handle
///
/// Look through the snapshot request channel to find the highest priority one to handle next.
/// If there are no snapshot requests in the channel, return None. Otherwise return the
/// highest priority one. Unhandled snapshot requests with slots GREATER-THAN the handled one
/// will be re-enqueued. The remaining will be dropped.
///
/// Also return the number of snapshot requests initially in the channel, and the number of
/// ones re-enqueued.
fn get_next_snapshot_request(
&self,
last_full_snapshot_slot: Option<Slot>,
) -> Option<(
SnapshotRequest,
AccountsPackageType,
/*num outstanding snapshot requests*/ usize,
/*num re-enqueued snapshot requests*/ usize,
)> {
let mut requests: Vec<_> = self
.snapshot_request_receiver
.try_iter()
.map(|request| {
let accounts_package_type = new_accounts_package_type(
&request,
&self.snapshot_config,
*last_full_snapshot_slot,
last_full_snapshot_slot,
);
(request, accounts_package_type)
})
.inspect(|(request, package_type)| {
trace!(
"outstanding snapshot request: {:?}, {:?}",
request,
package_type
)
.collect();
// `select_nth()` panics if the slice is empty, so return early if that's the case
if requests.is_empty() {
return None;
}
let requests_len = requests.len();
debug!("outstanding snapshot requests ({requests_len}): {requests:?}");
let num_eah_requests = requests
.iter()
.filter(|(_, account_package_type)| {
*account_package_type == AccountsPackageType::EpochAccountsHash
})
.max_by(cmp_snapshot_requests)
.map(|(snapshot_request, accounts_package_type)| {
self.handle_snapshot_request(
accounts_db_caching_enabled,
test_hash_calculation,
non_snapshot_time_us,
last_full_snapshot_slot,
snapshot_request,
accounts_package_type,
)
.count();
assert!(
num_eah_requests <= 1,
"Only a single EAH request is allowed at a time! count: {num_eah_requests}"
);
// Get the highest priority request, and put it at the end, because we're going to pop it
requests.select_nth_unstable_by(requests_len - 1, cmp_requests_by_priority);
// SAFETY: We know `requests` is not empty, so its len is >= 1, therefore there is always
// an element to pop.
let (snapshot_request, accounts_package_type) = requests.pop().unwrap();
let handled_request_slot = snapshot_request.snapshot_root_bank.slot();
// re-enqueue any remaining requests for slots GREATER-THAN the one that will be handled
let num_re_enqueued_requests = requests
.into_iter()
.filter(|(snapshot_request, _)| {
snapshot_request.snapshot_root_bank.slot() > handled_request_slot
})
.map(|(snapshot_request, _)| {
self.snapshot_request_sender
.try_send(snapshot_request)
.expect("re-enqueue snapshot request")
})
.count();
Some((
snapshot_request,
accounts_package_type,
requests_len,
num_re_enqueued_requests,
))
}
fn handle_snapshot_request(
@ -189,10 +265,9 @@ impl SnapshotRequestHandler {
snapshot_request: SnapshotRequest,
accounts_package_type: AccountsPackageType,
) -> Result<u64, SnapshotError> {
trace!(
debug!(
"handling snapshot request: {:?}, {:?}",
snapshot_request,
accounts_package_type
snapshot_request, accounts_package_type
);
let mut total_time = Measure::start("snapshot_request_receiver_total_time");
let SnapshotRequest {
@ -676,9 +751,9 @@ fn new_accounts_package_type(
/// - Incremental Snapshot
/// - Accounts Hash Verifier
///
/// If two snapshots of the same type are being compared, their bank slots are tiebreakers.
/// If two requests of the same type are being compared, their bank slots are the tiebreaker.
#[must_use]
fn cmp_snapshot_requests(
fn cmp_requests_by_priority(
a: &(SnapshotRequest, AccountsPackageType),
b: &(SnapshotRequest, AccountsPackageType),
) -> std::cmp::Ordering {
@ -686,42 +761,20 @@ fn cmp_snapshot_requests(
let (snapshot_request_b, accounts_package_type_b) = b;
let slot_a = snapshot_request_a.snapshot_root_bank.slot();
let slot_b = snapshot_request_b.snapshot_root_bank.slot();
use {AccountsPackageType::*, SnapshotType::*};
match (accounts_package_type_a, accounts_package_type_b) {
// Epoch Accounts Hash packages
(EpochAccountsHash, EpochAccountsHash) => {
panic!("Only a single EAH snapshot request is allowed at a time")
}
(EpochAccountsHash, _) => std::cmp::Ordering::Greater,
(_, EpochAccountsHash) => std::cmp::Ordering::Less,
// Snapshot packages
(Snapshot(snapshot_type_a), Snapshot(snapshot_type_b)) => {
match (snapshot_type_a, snapshot_type_b) {
(FullSnapshot, FullSnapshot) => slot_a.cmp(&slot_b),
(FullSnapshot, IncrementalSnapshot(_)) => std::cmp::Ordering::Greater,
(IncrementalSnapshot(_), FullSnapshot) => std::cmp::Ordering::Less,
(IncrementalSnapshot(base_slot_a), IncrementalSnapshot(base_slot_b)) => {
slot_a.cmp(&slot_b).then(base_slot_a.cmp(base_slot_b))
}
}
}
(Snapshot(_), _) => std::cmp::Ordering::Greater,
(_, Snapshot(_)) => std::cmp::Ordering::Less,
// Accounts Hash Verifier packages
(AccountsHashVerifier, AccountsHashVerifier) => slot_a.cmp(&slot_b),
}
snapshot_package::cmp_accounts_package_types_by_priority(
accounts_package_type_a,
accounts_package_type_b,
)
.then(slot_a.cmp(&slot_b))
}
#[cfg(test)]
mod test {
use {
super::*,
crate::genesis_utils::create_genesis_config,
crate::{epoch_accounts_hash, genesis_utils::create_genesis_config},
crossbeam_channel::unbounded,
solana_sdk::{account::AccountSharedData, pubkey::Pubkey},
solana_sdk::{account::AccountSharedData, epoch_schedule::EpochSchedule, pubkey::Pubkey},
};
#[test]
@ -749,135 +802,140 @@ mod test {
assert!(bank0.rc.accounts.scan_slot(0, |_| Some(())).is_empty());
}
/// Ensure that unhandled snapshot requests are properly re-enqueued or dropped
///
/// The snapshot request handler should be flexible and handle re-queueing unhandled snapshot
/// requests, if those unhandled requests are for slots GREATER-THAN the last request handled.
/// This is needed if, for example, an Epoch Accounts Hash for slot X and a Full Snapshot for
/// slot X+1 are both in the request channel. The EAH needs to be handled first, but the full
/// snapshot should also be handled afterwards, since future incremental snapshots will depend
/// on it.
#[test]
fn test_cmp_snapshot_requests() {
let genesis_config_info = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
fn test_get_next_snapshot_request() {
// These constants were picked to ensure the desired snapshot requests were sent to the
// channel. With 100 slots per Epoch, the EAH start will be at slot 25. Ensure there are
// other requests before this slot, and then 2+ requests of each type afterwards (to
// further test the prioritization logic).
const SLOTS_PER_EPOCH: Slot = 100;
const FULL_SNAPSHOT_INTERVAL: Slot = 20;
const INCREMENTAL_SNAPSHOT_INTERVAL: Slot = 6;
for (accounts_package_type_a, accounts_package_type_b, expected_result) in [
(
AccountsPackageType::EpochAccountsHash,
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
std::cmp::Ordering::Greater,
),
(
AccountsPackageType::EpochAccountsHash,
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
std::cmp::Ordering::Greater,
),
(
AccountsPackageType::EpochAccountsHash,
AccountsPackageType::AccountsHashVerifier,
std::cmp::Ordering::Greater,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::EpochAccountsHash,
std::cmp::Ordering::Less,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
std::cmp::Ordering::Equal,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
std::cmp::Ordering::Greater,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::AccountsHashVerifier,
std::cmp::Ordering::Greater,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::EpochAccountsHash,
std::cmp::Ordering::Less,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
std::cmp::Ordering::Less,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(6)),
std::cmp::Ordering::Less,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
std::cmp::Ordering::Equal,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(4)),
std::cmp::Ordering::Greater,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::AccountsHashVerifier,
std::cmp::Ordering::Greater,
),
(
AccountsPackageType::AccountsHashVerifier,
AccountsPackageType::AccountsHashVerifier,
std::cmp::Ordering::Equal,
),
] {
let snapshot_request_a = SnapshotRequest {
snapshot_root_bank: Arc::clone(&bank),
status_cache_slot_deltas: Vec::default(),
request_type: new_snapshot_request_type(&accounts_package_type_a),
};
let snapshot_request_b = SnapshotRequest {
snapshot_root_bank: Arc::clone(&bank),
status_cache_slot_deltas: Vec::default(),
request_type: new_snapshot_request_type(&accounts_package_type_b),
};
let request_a = &(snapshot_request_a, accounts_package_type_a);
let request_b = &(snapshot_request_b, accounts_package_type_b);
let actual_result = cmp_snapshot_requests(request_a, request_b);
assert_eq!(expected_result, actual_result);
}
}
#[test]
#[should_panic]
fn test_cmp_snapshot_requests_both_eah() {
let genesis_config_info = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
let accounts_package_type_a = AccountsPackageType::EpochAccountsHash;
let accounts_package_type_b = AccountsPackageType::EpochAccountsHash;
let snapshot_request_a = SnapshotRequest {
snapshot_root_bank: Arc::clone(&bank),
status_cache_slot_deltas: Vec::default(),
request_type: new_snapshot_request_type(&accounts_package_type_a),
};
let snapshot_request_b = SnapshotRequest {
snapshot_root_bank: Arc::clone(&bank),
status_cache_slot_deltas: Vec::default(),
request_type: new_snapshot_request_type(&accounts_package_type_b),
let snapshot_config = SnapshotConfig {
full_snapshot_archive_interval_slots: FULL_SNAPSHOT_INTERVAL,
incremental_snapshot_archive_interval_slots: INCREMENTAL_SNAPSHOT_INTERVAL,
..SnapshotConfig::default()
};
let request_a = &(snapshot_request_a, accounts_package_type_a);
let request_b = &(snapshot_request_b, accounts_package_type_b);
let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config,
snapshot_request_sender: snapshot_request_sender.clone(),
snapshot_request_receiver,
accounts_package_sender,
};
let _ = cmp_snapshot_requests(request_a, request_b);
}
let send_snapshot_request = |snapshot_root_bank, request_type| {
let snapshot_request = SnapshotRequest {
snapshot_root_bank,
status_cache_slot_deltas: Vec::default(),
request_type,
};
snapshot_request_sender.send(snapshot_request).unwrap();
};
fn new_snapshot_request_type(
accounts_package_type: &AccountsPackageType,
) -> SnapshotRequestType {
match accounts_package_type {
AccountsPackageType::AccountsHashVerifier => SnapshotRequestType::Snapshot,
AccountsPackageType::Snapshot(_) => SnapshotRequestType::Snapshot,
AccountsPackageType::EpochAccountsHash => SnapshotRequestType::EpochAccountsHash,
let mut genesis_config_info = create_genesis_config(10);
genesis_config_info.genesis_config.epoch_schedule =
EpochSchedule::custom(SLOTS_PER_EPOCH, SLOTS_PER_EPOCH, false);
let bank = Arc::new(Bank::new_for_tests(&genesis_config_info.genesis_config));
bank.set_startup_verification_complete();
// Create new banks and send snapshot requests so that the following requests will be in
// the channel before handling the requests:
//
// fss 20
// iss 24
// eah 25 <-- handled 1st
// iss 30
// iss 36
// fss 40
// iss 42
// iss 48
// iss 54
// fss 60 <-- handled 2nd
// iss 66
// iss 72 <-- handled 3rd
// ahv 73
// ahv 74
// ahv 75 <-- handled 4th
//
// (slots not called out will all be AHV)
// Also, incremental snapshots before slot 60 (the first full snapshot handled), will
// actually be AHV since the last full snapshot slot will be `None`. This is expected and
// fine; but maybe unexpected for a reader/debugger without this additional context.
let mut parent = Arc::clone(&bank);
for _ in 0..75 {
let bank = Arc::new(Bank::new_from_parent(
&parent,
&Pubkey::new_unique(),
parent.slot() + 1,
));
if bank.slot() == epoch_accounts_hash::calculation_start(&bank) {
send_snapshot_request(Arc::clone(&bank), SnapshotRequestType::EpochAccountsHash);
} else {
send_snapshot_request(Arc::clone(&bank), SnapshotRequestType::Snapshot);
}
parent = bank;
}
// Ensure the EAH is handled 1st
let (snapshot_request, accounts_package_type, ..) = snapshot_request_handler
.get_next_snapshot_request(None)
.unwrap();
assert_eq!(
accounts_package_type,
AccountsPackageType::EpochAccountsHash
);
assert_eq!(snapshot_request.snapshot_root_bank.slot(), 25);
// Ensure the full snapshot from slot 60 is handled 2nd
// (the older full snapshots are skipped and dropped)
let (snapshot_request, accounts_package_type, ..) = snapshot_request_handler
.get_next_snapshot_request(None)
.unwrap();
assert_eq!(
accounts_package_type,
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot)
);
assert_eq!(snapshot_request.snapshot_root_bank.slot(), 60);
// Ensure the incremental snapshot from slot 72 is handled 3rd
// (the older incremental snapshots are skipped and dropped)
let (snapshot_request, accounts_package_type, ..) = snapshot_request_handler
.get_next_snapshot_request(Some(60))
.unwrap();
assert_eq!(
accounts_package_type,
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(60))
);
assert_eq!(snapshot_request.snapshot_root_bank.slot(), 72);
// Ensure the accounts hash verifier from slot 75 is handled 4th
// (the older accounts hash verifiers are skipped and dropped)
let (snapshot_request, accounts_package_type, ..) = snapshot_request_handler
.get_next_snapshot_request(Some(60))
.unwrap();
assert_eq!(
accounts_package_type,
AccountsPackageType::AccountsHashVerifier
);
assert_eq!(snapshot_request.snapshot_root_bank.slot(), 75);
// And now ensure the snapshot request channel is empty!
assert!(snapshot_request_handler
.get_next_snapshot_request(Some(60))
.is_none());
}
}