diff --git a/Cargo.lock b/Cargo.lock index 39ecc05392..9147548f7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4556,6 +4556,7 @@ dependencies = [ "byteorder", "chrono", "crossbeam-channel", + "dashmap", "ed25519-dalek", "flate2", "fs_extra", diff --git a/core/Cargo.toml b/core/Cargo.toml index c752d0e253..1f617fb340 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -23,6 +23,7 @@ bs58 = "0.4.0" byteorder = "1.4.3" chrono = { version = "0.4.11", features = ["serde"] } crossbeam-channel = "0.5" +dashmap = { version = "4.0.2", features = ["rayon", "raw-api"] } ed25519-dalek = "=1.0.1" fs_extra = "1.2.0" flate2 = "1.0" diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs new file mode 100644 index 0000000000..058d6b085f --- /dev/null +++ b/core/src/ancestor_hashes_service.rs @@ -0,0 +1,364 @@ +use crate::{ + duplicate_repair_status::DeadSlotAncestorRequestStatus, + outstanding_requests::OutstandingRequests, + repair_response::{self}, + repair_service::{DuplicateSlotsResetSender, RepairInfo, RepairStatsGroup}, + result::{Error, Result}, + serve_repair::AncestorHashesRepairType, +}; +use dashmap::{mapref::entry::Entry::Occupied, DashMap}; +use solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE}; +use solana_measure::measure::Measure; +use solana_perf::{packet::limited_deserialize, recycler::Recycler}; +use solana_sdk::{ + clock::{Slot, SLOT_MS}, + timing::timestamp, +}; +use solana_streamer::{ + packet::Packets, + streamer::{self, PacketReceiver}, +}; +use std::{ + net::UdpSocket, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::channel, + {Arc, RwLock}, + }, + thread::{self, sleep, Builder, JoinHandle}, + time::{Duration, Instant}, +}; + +pub const MAX_ANCESTOR_HASHES_SLOT_REQUESTS_PER_SECOND: usize = 2; +type OutstandingAncestorHashesRepairs = OutstandingRequests; + +#[derive(Default)] +pub struct AncestorHashesResponsesStats { + pub total_packets: usize, + pub dropped_packets: usize, + pub invalid_packets: usize, + pub processed: usize, +} + +impl AncestorHashesResponsesStats { + fn report(&mut self) { + inc_new_counter_info!( + "ancestor_hashes_responses-total_packets", + self.total_packets + ); + inc_new_counter_info!("ancestor_hashes_responses-processed", self.processed); + inc_new_counter_info!( + "ancestor_hashes_responses-dropped_packets", + self.dropped_packets + ); + inc_new_counter_info!( + "ancestor_hashes_responses-invalid_packets", + self.invalid_packets + ); + *self = AncestorHashesResponsesStats::default(); + } +} + +pub struct AncestorRepairRequestsStats { + pub ancestor_requests: RepairStatsGroup, + last_report: Instant, +} + +impl Default for AncestorRepairRequestsStats { + fn default() -> Self { + AncestorRepairRequestsStats { + ancestor_requests: RepairStatsGroup::default(), + last_report: Instant::now(), + } + } +} + +impl AncestorRepairRequestsStats { + fn report(&mut self) { + let slot_to_count: Vec<_> = self + .ancestor_requests + .slot_pubkeys + .iter() + .map(|(slot, slot_repairs)| { + ( + slot, + slot_repairs + .pubkey_repairs() + .iter() + .map(|(_key, count)| count) + .sum::(), + ) + }) + .collect(); + + let repair_total = self.ancestor_requests.count; + if self.last_report.elapsed().as_secs() > 2 && repair_total > 0 { + info!("ancestor_repair_requests_stats: {:?}", slot_to_count); + datapoint_info!( + "ancestor-repair", + ("ancestor-repair-count", self.ancestor_requests.count, i64) + ); + + *self = AncestorRepairRequestsStats::default(); + } + } +} + +pub struct AncestorHashesService { + thread_hdls: Vec>, +} + +impl AncestorHashesService { + pub fn new( + exit: Arc, + blockstore: Arc, + ancestor_hashes_request_socket: Arc, + repair_info: RepairInfo, + ) -> Self { + let outstanding_requests: Arc> = + Arc::new(RwLock::new(OutstandingAncestorHashesRepairs::default())); + let (response_sender, response_receiver) = channel(); + let t_receiver = streamer::receiver( + ancestor_hashes_request_socket.clone(), + &exit, + response_sender, + Recycler::default(), + "ancestor_hashes_response_receiver", + 1, + false, + ); + + // Listen for responses to our ancestor requests + let ancestor_hashes_request_statuses: Arc> = + Arc::new(DashMap::new()); + let t_ancestor_hashes_responses = Self::run_responses_listener( + ancestor_hashes_request_statuses.clone(), + response_receiver, + blockstore, + outstanding_requests.clone(), + exit.clone(), + repair_info.duplicate_slots_reset_sender.clone(), + ); + + // Generate ancestor requests for dead slots that are repairable + let t_ancestor_requests = Self::run_find_repairable_dead_slots( + ancestor_hashes_request_statuses, + ancestor_hashes_request_socket, + repair_info, + outstanding_requests, + exit, + ); + let thread_hdls = vec![t_receiver, t_ancestor_hashes_responses, t_ancestor_requests]; + Self { thread_hdls } + } + + pub fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls { + thread_hdl.join()?; + } + Ok(()) + } + + /// Listen for responses to our ancestors hashes repair requests + fn run_responses_listener( + ancestor_hashes_request_statuses: Arc>, + response_receiver: PacketReceiver, + blockstore: Arc, + outstanding_requests: Arc>, + exit: Arc, + duplicate_slots_reset_sender: DuplicateSlotsResetSender, + ) -> JoinHandle<()> { + Builder::new() + .name("solana-ancestor-hashes-responses-service".to_string()) + .spawn(move || { + let mut last_stats_report = Instant::now(); + let mut stats = AncestorHashesResponsesStats::default(); + let mut max_packets = 1024; + loop { + let result = Self::process_new_responses( + &ancestor_hashes_request_statuses, + &response_receiver, + &blockstore, + &outstanding_requests, + &mut stats, + &mut max_packets, + &duplicate_slots_reset_sender, + ); + match result { + Err(Error::RecvTimeout(_)) | Ok(_) => {} + Err(err) => info!("ancestors hashes reponses listener error: {:?}", err), + }; + if exit.load(Ordering::Relaxed) { + return; + } + if last_stats_report.elapsed().as_secs() > 2 { + stats.report(); + last_stats_report = Instant::now(); + } + } + }) + .unwrap() + } + + /// Process messages from the network + fn process_new_responses( + ancestor_hashes_request_statuses: &DashMap, + response_receiver: &PacketReceiver, + blockstore: &Blockstore, + outstanding_requests: &RwLock, + stats: &mut AncestorHashesResponsesStats, + max_packets: &mut usize, + duplicate_slots_reset_sender: &DuplicateSlotsResetSender, + ) -> Result<()> { + let timeout = Duration::new(1, 0); + let mut responses = vec![response_receiver.recv_timeout(timeout)?]; + let mut total_packets = responses[0].packets.len(); + + let mut dropped_packets = 0; + while let Ok(more) = response_receiver.try_recv() { + total_packets += more.packets.len(); + if total_packets < *max_packets { + // Drop the rest in the channel in case of DOS + responses.push(more); + } else { + dropped_packets += more.packets.len(); + } + } + + stats.dropped_packets += dropped_packets; + stats.total_packets += total_packets; + + let mut time = Measure::start("ancestor_hashes::handle_packets"); + for response in responses { + Self::handle_packets( + ancestor_hashes_request_statuses, + response, + stats, + outstanding_requests, + blockstore, + duplicate_slots_reset_sender, + ); + } + time.stop(); + if total_packets >= *max_packets { + if time.as_ms() > 1000 { + *max_packets = (*max_packets * 9) / 10; + } else { + *max_packets = (*max_packets * 10) / 9; + } + } + Ok(()) + } + + fn handle_packets( + ancestor_hashes_request_statuses: &DashMap, + packets: Packets, + stats: &mut AncestorHashesResponsesStats, + outstanding_requests: &RwLock, + blockstore: &Blockstore, + duplicate_slots_reset_sender: &DuplicateSlotsResetSender, + ) { + // iter over the packets + packets.packets.iter().for_each(|packet| { + let from_addr = packet.meta.addr(); + if let Ok(ancestor_hashes_response) = + limited_deserialize(&packet.data[..packet.meta.size - SIZE_OF_NONCE]) + { + // Verify the response + let request_slot = repair_response::nonce(&packet.data[..packet.meta.size]) + .and_then(|nonce| { + outstanding_requests.write().unwrap().register_response( + nonce, + &ancestor_hashes_response, + timestamp(), + // If the response is valid, return the slot the request + // was for + |ancestor_hashes_request| ancestor_hashes_request.0, + ) + }); + + if request_slot.is_none() { + stats.invalid_packets += 1; + return; + } + + // If was a valid response, there must be a valid `request_slot` + let request_slot = request_slot.unwrap(); + stats.processed += 1; + + // Check if we can make any decisions. + if let Occupied(mut ancestor_hashes_status_ref) = + ancestor_hashes_request_statuses.entry(request_slot) + { + if let Some(decision) = ancestor_hashes_status_ref.get_mut().add_response( + &from_addr, + ancestor_hashes_response.into_slot_hashes(), + blockstore, + ) { + let potential_slots_to_dump = { + // TODO: In the case of DuplicateAncestorDecision::ContinueSearch + // This means all the ancestors were mismatched, which + // means the earliest mismatched ancestor has yet to be found. + // + // In the best case scenario, this means after ReplayStage dumps + // the earliest known ancestor `A` here, and then repairs `A`, + // because we may still have the incorrect version of some ancestor + // of `A`, we will mark `A` as dead and then continue the search + // protocol through another round of ancestor repairs. + // + // However this process is a bit slow, so in an ideal world, the + // protocol could be extended to keep searching by making + // another ancestor repair request from the earliest returned + // ancestor from this search. + decision + .repair_status() + .map(|status| status.correct_ancestors_to_repair.clone()) + }; + + let mut did_send_replay_correct_ancestors = false; + + if let Some(potential_slots_to_dump) = potential_slots_to_dump { + // Signal ReplayStage to dump the fork that is descended from + // `earliest_mismatched_slot_to_dump`. + if !potential_slots_to_dump.is_empty() { + did_send_replay_correct_ancestors = true; + let _ = duplicate_slots_reset_sender.send(potential_slots_to_dump); + } + } + + if !did_send_replay_correct_ancestors { + // If nothing is going to be dumped + repaired, then we can remove + // this slot from `ancestor_hashes_request_statuses` since the + // dead flag won't be cleared from blockstore, so the + // `ancestor_hashes_request_statuses.retain()` in + // `Self::run_find_repairable_dead_slots()` won't clear + // this slot + ancestor_hashes_status_ref.remove(); + } + } + } + } + }); + } + + fn run_find_repairable_dead_slots( + _ancestor_hashes_request_statuses: Arc>, + _ancestor_hashes_request_socket: Arc, + _repair_info: RepairInfo, + _outstanding_requests: Arc>, + exit: Arc, + ) -> JoinHandle<()> { + let mut repair_stats = AncestorRepairRequestsStats::default(); + + Builder::new() + .name("solana-find-repairable-dead-slots".to_string()) + .spawn(move || loop { + if exit.load(Ordering::Relaxed) { + return; + } + repair_stats.report(); + sleep(Duration::from_millis(SLOT_MS)); + }) + .unwrap() + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index ce7e156d9e..db336c73a1 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -8,6 +8,7 @@ //! pub mod accounts_hash_verifier; +pub mod ancestor_hashes_service; pub mod banking_stage; pub mod broadcast_stage; pub mod cache_block_meta_service; diff --git a/core/src/outstanding_requests.rs b/core/src/outstanding_requests.rs index 27d6ba5d05..7f8a8c109d 100644 --- a/core/src/outstanding_requests.rs +++ b/core/src/outstanding_requests.rs @@ -29,8 +29,15 @@ where nonce } - pub fn register_response(&mut self, nonce: u32, response: &S, now: u64) -> bool { - let (is_valid, should_delete) = self + pub fn register_response( + &mut self, + nonce: u32, + response: &S, + now: u64, + // runs if the response was valid + success_fn: impl Fn(&T) -> R, + ) -> Option { + let (response, should_delete) = self .requests .get_mut(&nonce) .map(|status| { @@ -39,12 +46,15 @@ where && status.request.verify_response(response) { status.num_expected_responses -= 1; - (true, status.num_expected_responses == 0) + ( + Some(success_fn(&status.request)), + status.num_expected_responses == 0, + ) } else { - (false, true) + (None, true) } }) - .unwrap_or((false, false)); + .unwrap_or((None, false)); if should_delete { self.requests @@ -52,7 +62,7 @@ where .expect("Delete must delete existing object"); } - is_valid + response } } @@ -103,7 +113,9 @@ pub(crate) mod tests { .unwrap() .expire_timestamp; - assert!(!outstanding_requests.register_response(nonce, &shred, expire_timestamp + 1)); + assert!(outstanding_requests + .register_response(nonce, &shred, expire_timestamp + 1, |_| ()) + .is_none()); assert!(outstanding_requests.requests.get(&nonce).is_none()); } @@ -127,7 +139,9 @@ pub(crate) mod tests { assert!(num_expected_responses > 1); // Response that passes all checks should decrease num_expected_responses - assert!(outstanding_requests.register_response(nonce, &shred, expire_timestamp - 1)); + assert!(outstanding_requests + .register_response(nonce, &shred, expire_timestamp - 1, |_| ()) + .is_some()); num_expected_responses -= 1; assert_eq!( outstanding_requests @@ -139,8 +153,12 @@ pub(crate) mod tests { ); // Response with incorrect nonce is ignored - assert!(!outstanding_requests.register_response(nonce + 1, &shred, expire_timestamp - 1)); - assert!(!outstanding_requests.register_response(nonce + 1, &shred, expire_timestamp)); + assert!(outstanding_requests + .register_response(nonce + 1, &shred, expire_timestamp - 1, |_| ()) + .is_none()); + assert!(outstanding_requests + .register_response(nonce + 1, &shred, expire_timestamp, |_| ()) + .is_none()); assert_eq!( outstanding_requests .requests @@ -152,7 +170,9 @@ pub(crate) mod tests { // Response with timestamp over limit should remove status, preventing late // responses from being accepted - assert!(!outstanding_requests.register_response(nonce, &shred, expire_timestamp)); + assert!(outstanding_requests + .register_response(nonce, &shred, expire_timestamp, |_| ()) + .is_none()); assert!(outstanding_requests.requests.get(&nonce).is_none()); // If number of outstanding requests hits zero, should also remove the entry @@ -170,7 +190,9 @@ pub(crate) mod tests { assert!(num_expected_responses > 1); for _ in 0..num_expected_responses { assert!(outstanding_requests.requests.get(&nonce).is_some()); - assert!(outstanding_requests.register_response(nonce, &shred, expire_timestamp - 1)); + assert!(outstanding_requests + .register_response(nonce, &shred, expire_timestamp - 1, |_| ()) + .is_some()); } assert!(outstanding_requests.requests.get(&nonce).is_none()); } diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index c9cc4c93c7..2b7b71f405 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -1,6 +1,7 @@ //! The `repair_service` module implements the tools necessary to generate a thread which //! regularly finds missing shreds in the ledger and sends repair requests for those shreds use crate::{ + ancestor_hashes_service::AncestorHashesService, cluster_info_vote_listener::VerifiedVoteReceiver, cluster_slots::ClusterSlots, duplicate_repair_status::DuplicateSlotRepairStatus, @@ -33,8 +34,8 @@ use std::{ time::{Duration, Instant}, }; -pub type DuplicateSlotsResetSender = CrossbeamSender; -pub type DuplicateSlotsResetReceiver = CrossbeamReceiver; +pub type DuplicateSlotsResetSender = CrossbeamSender>; +pub type DuplicateSlotsResetReceiver = CrossbeamReceiver>; pub type ConfirmedSlotsSender = CrossbeamSender>; pub type ConfirmedSlotsReceiver = CrossbeamReceiver>; pub type OutstandingShredRepairs = OutstandingRequests; @@ -46,6 +47,12 @@ pub struct SlotRepairs { pubkey_repairs: HashMap, } +impl SlotRepairs { + pub fn pubkey_repairs(&self) -> &HashMap { + &self.pubkey_repairs + } +} + #[derive(Default, Debug)] pub struct RepairStatsGroup { pub count: u64, @@ -111,8 +118,11 @@ pub const MAX_DUPLICATE_WAIT_MS: usize = 10_000; pub const REPAIR_MS: u64 = 100; pub const MAX_ORPHANS: usize = 5; +#[derive(Clone)] pub struct RepairInfo { pub bank_forks: Arc>, + pub cluster_info: Arc, + pub cluster_slots: Arc, pub epoch_schedule: EpochSchedule, pub duplicate_slots_reset_sender: DuplicateSlotsResetSender, pub repair_validators: Option>, @@ -134,6 +144,7 @@ impl Default for RepairSlotRange { pub struct RepairService { t_repair: JoinHandle<()>, + ancestor_hashes_service: AncestorHashesService, } impl RepairService { @@ -141,44 +152,54 @@ impl RepairService { blockstore: Arc, exit: Arc, repair_socket: Arc, - cluster_info: Arc, repair_info: RepairInfo, - cluster_slots: Arc, verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: Arc>, ) -> Self { - let t_repair = Builder::new() - .name("solana-repair-service".to_string()) - .spawn(move || { - Self::run( - &blockstore, - &exit, - &repair_socket, - cluster_info, - repair_info, - &cluster_slots, - verified_vote_receiver, - &outstanding_requests, - ) - }) - .unwrap(); + let t_repair = { + let blockstore = blockstore.clone(); + let exit = exit.clone(); + let repair_info = repair_info.clone(); + Builder::new() + .name("solana-repair-service".to_string()) + .spawn(move || { + Self::run( + &blockstore, + &exit, + &repair_socket, + repair_info, + verified_vote_receiver, + &outstanding_requests, + ) + }) + .unwrap() + }; - RepairService { t_repair } + let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap()); + let ancestor_hashes_service = AncestorHashesService::new( + exit, + blockstore, + ancestor_hashes_request_socket, + repair_info, + ); + + RepairService { + t_repair, + ancestor_hashes_service, + } } fn run( blockstore: &Blockstore, exit: &AtomicBool, repair_socket: &UdpSocket, - cluster_info: Arc, repair_info: RepairInfo, - cluster_slots: &ClusterSlots, verified_vote_receiver: VerifiedVoteReceiver, outstanding_requests: &RwLock, ) { let mut repair_weight = RepairWeight::new(repair_info.bank_forks.read().unwrap().root()); - let serve_repair = ServeRepair::new(cluster_info.clone()); - let id = cluster_info.id(); + let serve_repair = ServeRepair::new(repair_info.cluster_info.clone()); + let id = repair_info.cluster_info.id(); let mut repair_stats = RepairStats::default(); let mut repair_timing = RepairTiming::default(); let mut last_stats = Instant::now(); @@ -243,7 +264,7 @@ impl RepairService { let mut outstanding_requests = outstanding_requests.write().unwrap(); repairs.into_iter().for_each(|repair_request| { if let Ok((to, req)) = serve_repair.repair_request( - cluster_slots, + &repair_info.cluster_slots, repair_request, &mut peers_cache, &mut repair_stats, @@ -389,12 +410,12 @@ impl RepairService { repairs: &mut Vec, max_repairs: usize, slot: Slot, - ancestor_hashes_request_statuses: &impl Contains<'a, Slot>, + duplicate_slot_repair_statuses: &impl Contains<'a, Slot>, ) { let mut pending_slots = vec![slot]; while repairs.len() < max_repairs && !pending_slots.is_empty() { let slot = pending_slots.pop().unwrap(); - if ancestor_hashes_request_statuses.contains(&slot) { + if duplicate_slot_repair_statuses.contains(&slot) { // These are repaired through a different path continue; } @@ -554,7 +575,8 @@ impl RepairService { } pub fn join(self) -> thread::Result<()> { - self.t_repair.join() + self.t_repair.join()?; + self.ancestor_hashes_service.join() } } @@ -875,7 +897,7 @@ mod test { let cluster_slots = ClusterSlots::default(); let serve_repair = ServeRepair::new(Arc::new(new_test_cluster_info(Node::new_localhost().info))); - let mut ancestor_hashes_request_statuses = HashMap::new(); + let mut duplicate_slot_repair_statuses = HashMap::new(); let dead_slot = 9; let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap(); let duplicate_status = DuplicateSlotRepairStatus { @@ -891,12 +913,12 @@ mod test { .insert_shreds(shreds[..shreds.len() - 1].to_vec(), None, false) .unwrap(); - ancestor_hashes_request_statuses.insert(dead_slot, duplicate_status); + duplicate_slot_repair_statuses.insert(dead_slot, duplicate_status); // There is no repair_addr, so should not get filtered because the timeout // `std::u64::MAX` has not expired RepairService::generate_and_send_duplicate_repairs( - &mut ancestor_hashes_request_statuses, + &mut duplicate_slot_repair_statuses, &cluster_slots, &blockstore, &serve_repair, @@ -905,23 +927,23 @@ mod test { &None, &RwLock::new(OutstandingRequests::default()), ); - assert!(ancestor_hashes_request_statuses + assert!(duplicate_slot_repair_statuses .get(&dead_slot) .unwrap() .repair_pubkey_and_addr .is_none()); - assert!(ancestor_hashes_request_statuses.get(&dead_slot).is_some()); + assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some()); // Give the slot a repair address - ancestor_hashes_request_statuses + duplicate_slot_repair_statuses .get_mut(&dead_slot) .unwrap() .repair_pubkey_and_addr = Some((Pubkey::default(), receive_socket.local_addr().unwrap())); - // Slot is not yet full, should not get filtered from `ancestor_hashes_request_statuses` + // Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses` RepairService::generate_and_send_duplicate_repairs( - &mut ancestor_hashes_request_statuses, + &mut duplicate_slot_repair_statuses, &cluster_slots, &blockstore, &serve_repair, @@ -930,16 +952,16 @@ mod test { &None, &RwLock::new(OutstandingRequests::default()), ); - assert_eq!(ancestor_hashes_request_statuses.len(), 1); - assert!(ancestor_hashes_request_statuses.get(&dead_slot).is_some()); + assert_eq!(duplicate_slot_repair_statuses.len(), 1); + assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some()); // Insert rest of shreds. Slot is full, should get filtered from - // `ancestor_hashes_request_statuses` + // `duplicate_slot_repair_statuses` blockstore .insert_shreds(vec![shreds.pop().unwrap()], None, false) .unwrap(); RepairService::generate_and_send_duplicate_repairs( - &mut ancestor_hashes_request_statuses, + &mut duplicate_slot_repair_statuses, &cluster_slots, &blockstore, &serve_repair, @@ -948,7 +970,7 @@ mod test { &None, &RwLock::new(OutstandingRequests::default()), ); - assert!(ancestor_hashes_request_statuses.is_empty()); + assert!(duplicate_slot_repair_statuses.is_empty()); } #[test] diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 93ddc49c40..ee8bd78b61 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -574,10 +574,11 @@ impl RetransmitStage { epoch_schedule, duplicate_slots_reset_sender, repair_validators, + cluster_info: cluster_info.clone(), + cluster_slots, }; let window_service = WindowService::new( blockstore, - cluster_info.clone(), verified_receiver, retransmit_sender, repair_socket, @@ -599,7 +600,6 @@ impl RetransmitStage { ); rv && is_connected }, - cluster_slots, verified_vote_receiver, completed_data_sets_sender, duplicate_slots_sender, diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs index 0d7b8a3d8a..36b23993bd 100644 --- a/core/src/serve_repair.rs +++ b/core/src/serve_repair.rs @@ -95,20 +95,25 @@ impl RequestResponse for ShredRepairType { } } -pub struct AncestorHashesRepair(Slot); +pub struct AncestorHashesRepairType(pub Slot); +impl AncestorHashesRepairType { + pub fn slot(&self) -> Slot { + self.0 + } +} + #[derive(Serialize, Deserialize)] pub enum AncestorHashesResponseVersion { Current(Vec), } impl AncestorHashesResponseVersion { - #[cfg(test)] - fn into_slot_hashes(self) -> Vec { + pub fn into_slot_hashes(self) -> Vec { match self { AncestorHashesResponseVersion::Current(slot_hashes) => slot_hashes, } } - fn slot_hashes(&self) -> &[SlotHash] { + pub fn slot_hashes(&self) -> &[SlotHash] { match self { AncestorHashesResponseVersion::Current(slot_hashes) => slot_hashes, } @@ -121,7 +126,7 @@ impl AncestorHashesResponseVersion { } } -impl RequestResponse for AncestorHashesRepair { +impl RequestResponse for AncestorHashesRepairType { type Response = AncestorHashesResponseVersion; fn num_expected_responses(&self) -> u32 { 1 @@ -474,6 +479,16 @@ impl ServeRepair { Ok(out) } + pub fn ancestor_repair_request_bytes( + &self, + request_slot: Slot, + nonce: Nonce, + ) -> Result> { + let repair_request = RepairProtocol::AncestorHashes(self.my_info(), request_slot, nonce); + let out = serialize(&repair_request)?; + Ok(out) + } + pub(crate) fn repair_request( &self, cluster_slots: &ClusterSlots, @@ -1346,7 +1361,7 @@ mod tests { #[test] fn test_verify_ancestor_response() { let request_slot = MAX_ANCESTOR_RESPONSES as Slot; - let repair = AncestorHashesRepair(request_slot); + let repair = AncestorHashesRepairType(request_slot); let mut response: Vec = (0..request_slot) .into_iter() .map(|slot| (slot, Hash::new_unique())) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 21bfa17431..227748b1ca 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -3,7 +3,6 @@ //! use crate::{ cluster_info_vote_listener::VerifiedVoteReceiver, - cluster_slots::ClusterSlots, completed_data_sets_service::CompletedDataSetsSender, outstanding_requests::OutstandingRequests, repair_response, @@ -131,11 +130,14 @@ fn verify_repair( repair_meta .as_ref() .map(|repair_meta| { - outstanding_requests.register_response( - repair_meta.nonce, - shred, - solana_sdk::timing::timestamp(), - ) + outstanding_requests + .register_response( + repair_meta.nonce, + shred, + solana_sdk::timing::timestamp(), + |_| (), + ) + .is_some() }) .unwrap_or(true) } @@ -329,7 +331,6 @@ impl WindowService { #[allow(clippy::too_many_arguments)] pub fn new( blockstore: Arc, - cluster_info: Arc, verified_receiver: CrossbeamReceiver>, retransmit: PacketSender, repair_socket: Arc, @@ -337,7 +338,6 @@ impl WindowService { repair_info: RepairInfo, leader_schedule_cache: &Arc, shred_filter: F, - cluster_slots: Arc, verified_vote_receiver: VerifiedVoteReceiver, completed_data_sets_sender: CompletedDataSetsSender, duplicate_slots_sender: DuplicateSlotSender, @@ -352,14 +352,14 @@ impl WindowService { Arc::new(RwLock::new(OutstandingRequests::default())); let bank_forks = repair_info.bank_forks.clone(); + let cluster_info = repair_info.cluster_info.clone(); + let id = cluster_info.id(); let repair_service = RepairService::new( blockstore.clone(), exit.clone(), repair_socket, - cluster_info.clone(), repair_info, - cluster_slots, verified_vote_receiver, outstanding_requests.clone(), ); @@ -368,7 +368,7 @@ impl WindowService { let (duplicate_sender, duplicate_receiver) = unbounded(); let t_check_duplicate = Self::start_check_duplicate_thread( - cluster_info.clone(), + cluster_info, exit.clone(), blockstore.clone(), duplicate_receiver, @@ -386,7 +386,7 @@ impl WindowService { ); let t_window = Self::start_recv_window_thread( - cluster_info.id(), + id, exit, &blockstore, insert_sender,