diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs index bf7003bea..d3ced37f5 100644 --- a/core/src/ancestor_hashes_service.rs +++ b/core/src/ancestor_hashes_service.rs @@ -1,7 +1,7 @@ use { crate::{ cluster_slots::ClusterSlots, - duplicate_repair_status::{DeadSlotAncestorRequestStatus, DuplicateAncestorDecision}, + duplicate_repair_status::{AncestorRequestStatus, DuplicateAncestorDecision}, outstanding_requests::OutstandingRequests, packet_threshold::DynamicPacketToProcessThreshold, repair_service::{DuplicateSlotsResetSender, RepairInfo, RepairStatsGroup}, @@ -164,7 +164,7 @@ impl AncestorHashesService { None, ); - let ancestor_hashes_request_statuses: Arc> = + let ancestor_hashes_request_statuses: Arc> = Arc::new(DashMap::new()); let (retryable_slots_sender, retryable_slots_receiver) = unbounded(); @@ -204,7 +204,7 @@ impl AncestorHashesService { /// Listen for responses to our ancestors hashes repair requests fn run_responses_listener( - ancestor_hashes_request_statuses: Arc>, + ancestor_hashes_request_statuses: Arc>, response_receiver: PacketBatchReceiver, blockstore: Arc, outstanding_requests: Arc>, @@ -253,7 +253,7 @@ impl AncestorHashesService { /// Process messages from the network #[allow(clippy::too_many_arguments)] fn process_new_packets_from_channel( - ancestor_hashes_request_statuses: &DashMap, + ancestor_hashes_request_statuses: &DashMap, response_receiver: &PacketBatchReceiver, blockstore: &Blockstore, outstanding_requests: &RwLock, @@ -300,7 +300,7 @@ impl AncestorHashesService { } fn process_packet_batch( - ancestor_hashes_request_statuses: &DashMap, + ancestor_hashes_request_statuses: &DashMap, packet_batch: PacketBatch, stats: &mut AncestorHashesResponsesStats, outstanding_requests: &RwLock, @@ -336,7 +336,7 @@ impl AncestorHashesService { /// `request_slot` fn verify_and_process_ancestor_response( packet: &Packet, - ancestor_hashes_request_statuses: &DashMap, + ancestor_hashes_request_statuses: &DashMap, stats: &mut AncestorHashesResponsesStats, outstanding_requests: &RwLock, blockstore: &Blockstore, @@ -485,7 +485,7 @@ impl AncestorHashesService { fn process_replay_updates( ancestor_hashes_replay_update_receiver: &AncestorHashesReplayUpdateReceiver, - ancestor_hashes_request_statuses: &DashMap, + ancestor_hashes_request_statuses: &DashMap, dead_slot_pool: &mut HashSet, repairable_dead_slot_pool: &mut HashSet, root_slot: Slot, @@ -512,7 +512,7 @@ impl AncestorHashesService { } fn run_manage_ancestor_requests( - ancestor_hashes_request_statuses: Arc>, + ancestor_hashes_request_statuses: Arc>, ancestor_hashes_request_socket: Arc, repair_info: RepairInfo, outstanding_requests: Arc>, @@ -561,7 +561,7 @@ impl AncestorHashesService { #[allow(clippy::too_many_arguments)] fn manage_ancestor_requests( - ancestor_hashes_request_statuses: &DashMap, + ancestor_hashes_request_statuses: &DashMap, ancestor_hashes_request_socket: &UdpSocket, repair_info: &RepairInfo, outstanding_requests: &RwLock, @@ -595,7 +595,6 @@ impl AncestorHashesService { ); dead_slot_pool.retain(|slot| *slot > root_bank.slot()); - repairable_dead_slot_pool.retain(|slot| *slot > root_bank.slot()); ancestor_hashes_request_statuses.retain(|slot, status| { @@ -705,7 +704,7 @@ impl AncestorHashesService { /// added to `ancestor_hashes_request_statuses` #[allow(clippy::too_many_arguments)] fn initiate_ancestor_hashes_requests_for_duplicate_slot( - ancestor_hashes_request_statuses: &DashMap, + ancestor_hashes_request_statuses: &DashMap, ancestor_hashes_request_socket: &UdpSocket, cluster_slots: &ClusterSlots, serve_repair: &ServeRepair, @@ -741,7 +740,7 @@ impl AncestorHashesService { } } - let ancestor_request_status = DeadSlotAncestorRequestStatus::new( + let ancestor_request_status = AncestorRequestStatus::new( sampled_validators .into_iter() .map(|(_pk, socket_addr)| socket_addr), @@ -840,7 +839,7 @@ mod test { // 4) If an outstanding request for a slot already exists, should // ignore any signals from replay stage - ancestor_hashes_request_statuses.insert(slot, DeadSlotAncestorRequestStatus::default()); + ancestor_hashes_request_statuses.insert(slot, AncestorRequestStatus::default()); dead_slot_pool.clear(); repairable_dead_slot_pool.clear(); ancestor_hashes_replay_update_sender @@ -1031,7 +1030,7 @@ mod test { } struct ManageAncestorHashesState { - ancestor_hashes_request_statuses: Arc>, + ancestor_hashes_request_statuses: Arc>, ancestor_hashes_request_socket: Arc, requester_serve_repair: ServeRepair, repair_info: RepairInfo, diff --git a/core/src/duplicate_repair_status.rs b/core/src/duplicate_repair_status.rs index 88213107e..9690b6178 100644 --- a/core/src/duplicate_repair_status.rs +++ b/core/src/duplicate_repair_status.rs @@ -95,7 +95,7 @@ impl DuplicateSlotRepairStatus { } #[derive(Default, Clone)] -pub struct DeadSlotAncestorRequestStatus { +pub struct AncestorRequestStatus { // The mismatched slot that was the subject of the AncestorHashes(requested_mismatched_slot) // repair request. All responses to this request should be for ancestors of this slot. requested_mismatched_slot: Slot, @@ -115,16 +115,16 @@ pub struct DeadSlotAncestorRequestStatus { ancestor_request_responses: HashMap, Vec>, } -impl DeadSlotAncestorRequestStatus { +impl AncestorRequestStatus { pub fn new( sampled_validators: impl Iterator, requested_mismatched_slot: Slot, ) -> Self { - DeadSlotAncestorRequestStatus { + AncestorRequestStatus { requested_mismatched_slot, start_ts: timestamp(), sampled_validators: sampled_validators.map(|p| (p, false)).collect(), - ..DeadSlotAncestorRequestStatus::default() + ..AncestorRequestStatus::default() } } @@ -365,7 +365,7 @@ pub mod tests { correct_ancestors_response: Vec<(Slot, Hash)>, _blockstore_temp_dir: TempDir, blockstore: Blockstore, - status: DeadSlotAncestorRequestStatus, + status: AncestorRequestStatus, } fn create_rand_socket_addr() -> SocketAddr { @@ -380,8 +380,7 @@ pub mod tests { .take(ANCESTOR_HASH_REPAIR_SAMPLE_SIZE) .collect(); - let status = - DeadSlotAncestorRequestStatus::new(sampled_addresses.iter().cloned(), request_slot); + let status = AncestorRequestStatus::new(sampled_addresses.iter().cloned(), request_slot); let blockstore_temp_dir = get_tmp_ledger_path_auto_delete!(); let blockstore = Blockstore::open(blockstore_temp_dir.path()).unwrap();