diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index dc7d594a0a..0fc032dd85 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -29,22 +29,39 @@ use std::{
 pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
 pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;

-#[derive(Default)]
+#[derive(Default, Debug)]
+pub struct SlotRepairs {
+    highest_shred_index: u64,
+    // map from pubkey to total number of requests
+    pubkey_repairs: HashMap<Pubkey, u64>,
+}
+
+#[derive(Default, Debug)]
 pub struct RepairStatsGroup {
     pub count: u64,
     pub min: u64,
     pub max: u64,
+    pub slot_pubkeys: HashMap<Slot, SlotRepairs>,
 }

 impl RepairStatsGroup {
-    pub fn update(&mut self, slot: u64) {
+    pub fn update(&mut self, repair_peer_id: &Pubkey, slot: Slot, shred_index: u64) {
         self.count += 1;
+        let slot_repairs = self.slot_pubkeys.entry(slot).or_default();
+        // Increment total number of repairs of this type for this pubkey by 1
+        *slot_repairs
+            .pubkey_repairs
+            .entry(*repair_peer_id)
+            .or_default() += 1;
+        // Update the max requested shred index for this slot
+        slot_repairs.highest_shred_index =
+            std::cmp::max(slot_repairs.highest_shred_index, shred_index);
         self.min = std::cmp::min(self.min, slot);
         self.max = std::cmp::max(self.max, slot);
     }
 }

-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct RepairStats {
     pub shred: RepairStatsGroup,
     pub highest_shred: RepairStatsGroup,
@@ -81,7 +98,7 @@ impl Default for RepairSlotRange {
 #[derive(Default, Clone)]
 pub struct DuplicateSlotRepairStatus {
     start: u64,
-    repair_addr: Option<SocketAddr>,
+    repair_pubkey_and_addr: Option<(Pubkey, SocketAddr)>,
 }

 pub struct RepairService {
@@ -197,6 +214,7 @@ impl RepairService {
                 let repair_total = repair_stats.shred.count
                     + repair_stats.highest_shred.count
                     + repair_stats.orphan.count;
+                info!("repair_stats: {:#?}", repair_stats);
                 if repair_total > 0 {
                     datapoint_info!(
                         "serve_repair-repair",
@@ -307,7 +325,7 @@ impl RepairService {
     ) {
         duplicate_slot_repair_statuses.retain(|slot, status| {
             Self::update_duplicate_slot_repair_addr(*slot, status, cluster_slots, serve_repair);
-            if let Some(repair_addr) = status.repair_addr {
+            if let Some((repair_pubkey, repair_addr)) = status.repair_pubkey_and_addr {
                 let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot);

                 if let Some(repairs) = repairs {
@@ -315,12 +333,16 @@ impl RepairService {
                         if let Err(e) = Self::serialize_and_send_request(
                             &repair_type,
                             repair_socket,
+                            &repair_pubkey,
                             &repair_addr,
                             serve_repair,
                             repair_stats,
                             DEFAULT_NONCE,
                         ) {
-                            info!("repair req send_to({}) error {:?}", repair_addr, e);
+                            info!(
+                                "repair req send_to {} ({}) error {:?}",
+                                repair_pubkey, repair_addr, e
+                            );
                         }
                     }
                     true
@@ -336,12 +358,14 @@ impl RepairService {
     fn serialize_and_send_request(
         repair_type: &RepairType,
         repair_socket: &UdpSocket,
+        repair_pubkey: &Pubkey,
         to: &SocketAddr,
         serve_repair: &ServeRepair,
         repair_stats: &mut RepairStats,
         nonce: Nonce,
     ) -> Result<()> {
-        let req = serve_repair.map_repair_request(&repair_type, repair_stats, nonce)?;
+        let req =
+            serve_repair.map_repair_request(&repair_type, repair_pubkey, repair_stats, nonce)?;
         repair_socket.send_to(&req, to)?;
         Ok(())
     }
@@ -353,12 +377,12 @@ impl RepairService {
         serve_repair: &ServeRepair,
     ) {
         let now = timestamp();
-        if status.repair_addr.is_none()
+        if status.repair_pubkey_and_addr.is_none()
             || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64
         {
-            let repair_addr =
+            let repair_pubkey_and_addr =
                 serve_repair.repair_request_duplicate_compute_best_peer(slot, cluster_slots);
-            status.repair_addr = repair_addr.ok();
+            status.repair_pubkey_and_addr = repair_pubkey_and_addr.ok();
             status.start = timestamp();
         }
     }
@@ -395,12 +419,12 @@ impl RepairService {

             // Mark this slot as special repair, try to download from single
             // validator to avoid corruption
-            let repair_addr = serve_repair
+            let repair_pubkey_and_addr = serve_repair
                 .repair_request_duplicate_compute_best_peer(*slot, cluster_slots)
                 .ok();
             let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
                 start: timestamp(),
-                repair_addr,
+                repair_pubkey_and_addr,
             };
             duplicate_slot_repair_statuses.insert(*slot, new_duplicate_slot_repair_status);
         }
@@ -423,7 +447,7 @@ impl RepairService {
             warn!(
                 "Repaired version of slot {} most recently (but maybe not entirely) from {:?} has failed again",
-                dead_slot, status.repair_addr
+                dead_slot, status.repair_pubkey_and_addr
             );
         }
         cluster_slots
@@ -873,7 +897,7 @@ mod test {
         let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
         let duplicate_status = DuplicateSlotRepairStatus {
             start: std::u64::MAX,
-            repair_addr: None,
+            repair_pubkey_and_addr: None,
         };

         // Insert some shreds to create a SlotMeta,
@@ -898,7 +922,7 @@ mod test {
         assert!(duplicate_slot_repair_statuses
             .get(&dead_slot)
             .unwrap()
-            .repair_addr
+            .repair_pubkey_and_addr
             .is_none());
         assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());

@@ -906,7 +930,8 @@ mod test {
         duplicate_slot_repair_statuses
             .get_mut(&dead_slot)
             .unwrap()
-            .repair_addr = Some(receive_socket.local_addr().unwrap());
+            .repair_pubkey_and_addr =
+            Some((Pubkey::default(), receive_socket.local_addr().unwrap()));

         // Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses`
         RepairService::generate_and_send_duplicate_repairs(
@@ -938,7 +963,10 @@ mod test {

     #[test]
     pub fn test_update_duplicate_slot_repair_addr() {
-        let dummy_addr = Some(UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap());
+        let dummy_addr = Some((
+            Pubkey::default(),
+            UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(),
+        ));
         let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
             Node::new_localhost().info,
         ));
@@ -956,7 +984,7 @@ mod test {
         // address
         let mut duplicate_status = DuplicateSlotRepairStatus {
             start: std::u64::MAX,
-            repair_addr: dummy_addr,
+            repair_pubkey_and_addr: dummy_addr,
         };
         RepairService::update_duplicate_slot_repair_addr(
             dead_slot,
@@ -964,12 +992,12 @@ mod test {
             &cluster_slots,
             &serve_repair,
         );
-        assert_eq!(duplicate_status.repair_addr, dummy_addr);
+        assert_eq!(duplicate_status.repair_pubkey_and_addr, dummy_addr);

         // If the repair address is None, should try to update
         let mut duplicate_status = DuplicateSlotRepairStatus {
             start: std::u64::MAX,
-            repair_addr: None,
+            repair_pubkey_and_addr: None,
         };
         RepairService::update_duplicate_slot_repair_addr(
             dead_slot,
@@ -977,12 +1005,12 @@ mod test {
             &cluster_slots,
             &serve_repair,
         );
-        assert!(duplicate_status.repair_addr.is_some());
+        assert!(duplicate_status.repair_pubkey_and_addr.is_some());

-        // If sufficient time has passssed, should try to update
+        // If sufficient time has passed, should try to update
         let mut duplicate_status = DuplicateSlotRepairStatus {
             start: timestamp() - MAX_DUPLICATE_WAIT_MS as u64,
-            repair_addr: dummy_addr,
+            repair_pubkey_and_addr: dummy_addr,
         };
         RepairService::update_duplicate_slot_repair_addr(
             dead_slot,
@@ -990,7 +1018,7 @@ mod test {
             &cluster_slots,
             &serve_repair,
         );
-        assert_ne!(duplicate_status.repair_addr, dummy_addr);
+        assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
     }

     #[test]
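The heart of the repair_service.rs change is the per-peer bookkeeping: `RepairStatsGroup::update` now records, per slot, how many requests of each repair type were sent to each peer, plus the highest shred index requested. Below is a minimal self-contained sketch of that aggregation, assuming plain-std stand-ins for `Pubkey` and `Slot` (the real types come from solana-sdk):

```rust
// Standalone sketch of the aggregation added above; `Pubkey` and `Slot`
// are stand-ins for the solana-sdk types, everything else is plain std.
use std::collections::HashMap;

type Pubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey
type Slot = u64;

#[derive(Default, Debug)]
struct SlotRepairs {
    highest_shred_index: u64,
    pubkey_repairs: HashMap<Pubkey, u64>, // peer -> number of requests sent
}

#[derive(Default, Debug)]
struct RepairStatsGroup {
    count: u64,
    min: u64,
    max: u64,
    slot_pubkeys: HashMap<Slot, SlotRepairs>,
}

impl RepairStatsGroup {
    fn update(&mut self, repair_peer_id: &Pubkey, slot: Slot, shred_index: u64) {
        self.count += 1;
        let slot_repairs = self.slot_pubkeys.entry(slot).or_default();
        // One more request of this type sent to this peer for this slot
        *slot_repairs.pubkey_repairs.entry(*repair_peer_id).or_default() += 1;
        // Track the highest shred index requested from anyone for this slot
        slot_repairs.highest_shred_index =
            slot_repairs.highest_shred_index.max(shred_index);
        self.min = self.min.min(slot);
        self.max = self.max.max(slot);
    }
}

fn main() {
    let mut shred_stats = RepairStatsGroup::default();
    let peer = [1u8; 32];
    shred_stats.update(&peer, 42, 7);
    shred_stats.update(&peer, 42, 9);
    // Two requests attributed to `peer` for slot 42, highest shred index 9
    println!("{:#?}", shred_stats);
}
```

Deriving `Debug` on `SlotRepairs`, `RepairStatsGroup`, and `RepairStats` is what makes the new pretty-printed `info!("repair_stats: {:#?}", repair_stats)` log line possible.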
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index 0610333e2e..3678238922 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -397,7 +397,13 @@ impl ServeRepair {
         let (repair_peers, weights) = cache.get(&slot).unwrap();
         let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
         let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
-        let out = self.map_repair_request(&repair_request, repair_stats, DEFAULT_NONCE)?;
+        let repair_peer_id = repair_peers[n].id;
+        let out = self.map_repair_request(
+            &repair_request,
+            &repair_peer_id,
+            repair_stats,
+            DEFAULT_NONCE,
+        )?;
         Ok((addr, out))
     }

@@ -405,33 +411,38 @@ impl ServeRepair {
         &self,
         slot: Slot,
         cluster_slots: &ClusterSlots,
-    ) -> Result<SocketAddr> {
+    ) -> Result<(Pubkey, SocketAddr)> {
         let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot);
         if repair_peers.is_empty() {
             return Err(ClusterInfoError::NoPeers.into());
         }
         let weights = cluster_slots.compute_weights_exclude_noncomplete(slot, &repair_peers);
         let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
-        Ok(repair_peers[n].serve_repair)
+        Ok((repair_peers[n].id, repair_peers[n].serve_repair))
     }

     pub fn map_repair_request(
         &self,
         repair_request: &RepairType,
+        repair_peer_id: &Pubkey,
         repair_stats: &mut RepairStats,
         nonce: Nonce,
     ) -> Result<Vec<u8>> {
         match repair_request {
             RepairType::Shred(slot, shred_index) => {
-                repair_stats.shred.update(*slot);
+                repair_stats
+                    .shred
+                    .update(repair_peer_id, *slot, *shred_index);
                 Ok(self.window_index_request_bytes(*slot, *shred_index, nonce)?)
             }
             RepairType::HighestShred(slot, shred_index) => {
-                repair_stats.highest_shred.update(*slot);
+                repair_stats
+                    .highest_shred
+                    .update(repair_peer_id, *slot, *shred_index);
                 Ok(self.window_highest_index_request_bytes(*slot, *shred_index, nonce)?)
             }
             RepairType::Orphan(slot) => {
-                repair_stats.orphan.update(*slot);
+                repair_stats.orphan.update(repair_peer_id, *slot, 0);
                 Ok(self.orphan_bytes(*slot, nonce)?)
             }
         }
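With `repair_request_duplicate_compute_best_peer` now returning the winning peer's identity alongside its serve-repair address, callers can thread that `Pubkey` into `map_repair_request` and into failure logs. The sketch below shows the shape of the new contract under simplified assumptions: `weighted_best` here is a deterministic stand-in (the real solana-core helper makes a stake-weighted randomized pick), and `RepairPeer` is a hypothetical struct, not the actual `ContactInfo`:

```rust
// Minimal sketch of the new caller contract: best-peer selection hands back
// (identity, serve_repair_addr) so repair traffic can be attributed per peer.
use std::net::SocketAddr;

type Pubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey

struct RepairPeer {
    id: Pubkey,               // node identity
    serve_repair: SocketAddr, // peer's serve-repair UDP port
}

// Deterministic stand-in for `weighted_best`: pick the heaviest index.
fn weighted_best(weights: &[u64]) -> usize {
    weights
        .iter()
        .enumerate()
        .max_by_key(|(_, w)| **w)
        .map(|(i, _)| i)
        .unwrap_or(0)
}

fn compute_best_peer(
    repair_peers: &[RepairPeer],
    weights: &[u64],
) -> Result<(Pubkey, SocketAddr), &'static str> {
    if repair_peers.is_empty() {
        return Err("no peers"); // mirrors ClusterInfoError::NoPeers
    }
    let n = weighted_best(weights);
    // Before this change only the address was returned; the identity now
    // rides along in the same tuple.
    Ok((repair_peers[n].id, repair_peers[n].serve_repair))
}

fn main() {
    let peers = vec![RepairPeer {
        id: [7u8; 32],
        serve_repair: "127.0.0.1:8011".parse().unwrap(),
    }];
    let (id, addr) = compute_best_peer(&peers, &[1]).unwrap();
    println!("best peer id byte {:?} at {}", id[0], addr);
}
```

Returning the pair from one lookup also keeps the identity and address consistent, rather than re-resolving the pubkey from the address after the fact.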
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 84ce077da8..d703a4efb5 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -1125,6 +1125,12 @@ impl Blockstore {
             new_consumed,
             shred.reference_tick(),
         );
+        if slot_meta.is_full() {
+            info!(
+                "slot {} is full, last: {}",
+                slot_meta.slot, slot_meta.last_index
+            );
+        }
         data_index.set_present(index, true);
         trace!("inserted shred into slot {:?} and index {:?}", slot, index);
         Ok(())
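The blockstore side is a single diagnostic: log the moment an inserted data shred completes a slot. As a rough sketch of the condition being tested, assuming a simplified `SlotMeta` (the real struct carries more fields; the `u64::MAX` sentinel for an unknown `last_index` follows the blockstore convention):

```rust
// Hedged stand-in for blockstore's SlotMeta, trimmed to the fields the new
// log line touches; not the real implementation.
struct SlotMeta {
    slot: u64,
    consumed: u64,   // count of consecutive shreds received, starting at index 0
    last_index: u64, // index of the last shred in the slot; u64::MAX if unknown
}

impl SlotMeta {
    fn is_full(&self) -> bool {
        // The slot is full once the final shred index is known and every
        // shred up to and including it has been received.
        self.last_index != u64::MAX && self.consumed == self.last_index + 1
    }
}

fn main() {
    let meta = SlotMeta { slot: 42, consumed: 64, last_index: 63 };
    if meta.is_full() {
        // Mirrors the info! added in the diff above
        println!("slot {} is full, last: {}", meta.slot, meta.last_index);
    }
}
```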