Add repair breakdown by slot and index (#10717)

* Slot full logging

* Repair stats logging

Co-authored-by: Carl <carl@solana.com>
carllin 2020-06-19 18:28:15 -07:00 committed by GitHub
parent cae22efd0e
commit a33fef9af2
3 changed files with 75 additions and 30 deletions
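The repair-stats half of the change keeps a per-slot, per-peer breakdown inside each RepairStatsGroup and dumps it with the new info!("repair_stats: {:#?}", ...) line in the service loop. The following is a minimal, self-contained sketch of that bookkeeping, not the crate's actual code: Slot and Pubkey are replaced with plain stand-in types, and main only mimics the calls that map_repair_request makes for outgoing requests.

use std::collections::HashMap;

// Stand-ins for the solana-sdk types used in the diff below.
type Slot = u64;
type Pubkey = [u8; 32];

#[derive(Default, Debug)]
struct SlotRepairs {
    // Highest shred index requested for this slot so far.
    highest_shred_index: u64,
    // Total number of repair requests sent to each peer for this slot.
    pubkey_repairs: HashMap<Pubkey, u64>,
}

#[derive(Default, Debug)]
struct RepairStatsGroup {
    count: u64,
    min: u64,
    max: u64,
    // Per-slot breakdown, keyed by slot.
    slot_pubkeys: HashMap<Slot, SlotRepairs>,
}

impl RepairStatsGroup {
    fn update(&mut self, repair_peer_id: &Pubkey, slot: Slot, shred_index: u64) {
        self.count += 1;
        let slot_repairs = self.slot_pubkeys.entry(slot).or_default();
        // Count one more request of this type sent to this peer.
        *slot_repairs.pubkey_repairs.entry(*repair_peer_id).or_default() += 1;
        // Track the highest shred index requested for this slot.
        slot_repairs.highest_shred_index =
            std::cmp::max(slot_repairs.highest_shred_index, shred_index);
        self.min = std::cmp::min(self.min, slot);
        self.max = std::cmp::max(self.max, slot);
    }
}

fn main() {
    let mut shred_stats = RepairStatsGroup::default();
    let peer: Pubkey = [7u8; 32];
    shred_stats.update(&peer, 42, 10);
    shred_stats.update(&peer, 42, 25);
    // The service dumps this periodically via info!("repair_stats: {:#?}", ...).
    println!("{:#?}", shred_stats);
}

Each repair type (shred, highest_shred, orphan) gets its own group, so the periodic log shows, per slot, how many requests of each kind were sent to each peer and the highest shred index requested.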

core/src/repair_service.rs

@@ -29,22 +29,39 @@ use std::{
 pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
 pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
 
-#[derive(Default)]
+#[derive(Default, Debug)]
+pub struct SlotRepairs {
+    highest_shred_index: u64,
+    // map from pubkey to total number of requests
+    pubkey_repairs: HashMap<Pubkey, u64>,
+}
+
+#[derive(Default, Debug)]
 pub struct RepairStatsGroup {
     pub count: u64,
     pub min: u64,
     pub max: u64,
+    pub slot_pubkeys: HashMap<Slot, SlotRepairs>,
 }
 
 impl RepairStatsGroup {
-    pub fn update(&mut self, slot: u64) {
+    pub fn update(&mut self, repair_peer_id: &Pubkey, slot: Slot, shred_index: u64) {
         self.count += 1;
+        let slot_repairs = self.slot_pubkeys.entry(slot).or_default();
+        // Increment total number of repairs of this type for this pubkey by 1
+        *slot_repairs
+            .pubkey_repairs
+            .entry(*repair_peer_id)
+            .or_default() += 1;
+        // Update the max requested shred index for this slot
+        slot_repairs.highest_shred_index =
+            std::cmp::max(slot_repairs.highest_shred_index, shred_index);
         self.min = std::cmp::min(self.min, slot);
         self.max = std::cmp::max(self.max, slot);
     }
 }
 
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct RepairStats {
     pub shred: RepairStatsGroup,
     pub highest_shred: RepairStatsGroup,
@@ -81,7 +98,7 @@ impl Default for RepairSlotRange
 #[derive(Default, Clone)]
 pub struct DuplicateSlotRepairStatus {
     start: u64,
-    repair_addr: Option<SocketAddr>,
+    repair_pubkey_and_addr: Option<(Pubkey, SocketAddr)>,
 }
 
 pub struct RepairService {
@@ -197,6 +214,7 @@ impl RepairService {
                 let repair_total = repair_stats.shred.count
                     + repair_stats.highest_shred.count
                     + repair_stats.orphan.count;
+                info!("repair_stats: {:#?}", repair_stats);
                 if repair_total > 0 {
                     datapoint_info!(
                         "serve_repair-repair",
@@ -307,7 +325,7 @@ impl RepairService {
     ) {
         duplicate_slot_repair_statuses.retain(|slot, status| {
             Self::update_duplicate_slot_repair_addr(*slot, status, cluster_slots, serve_repair);
-            if let Some(repair_addr) = status.repair_addr {
+            if let Some((repair_pubkey, repair_addr)) = status.repair_pubkey_and_addr {
                 let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot);
 
                 if let Some(repairs) = repairs {
@@ -315,12 +333,16 @@ impl RepairService {
                         if let Err(e) = Self::serialize_and_send_request(
                             &repair_type,
                             repair_socket,
+                            &repair_pubkey,
                             &repair_addr,
                             serve_repair,
                             repair_stats,
                             DEFAULT_NONCE,
                         ) {
-                            info!("repair req send_to({}) error {:?}", repair_addr, e);
+                            info!(
+                                "repair req send_to {} ({}) error {:?}",
+                                repair_pubkey, repair_addr, e
+                            );
                         }
                     }
                     true
@@ -336,12 +358,14 @@ impl RepairService {
     fn serialize_and_send_request(
         repair_type: &RepairType,
         repair_socket: &UdpSocket,
+        repair_pubkey: &Pubkey,
         to: &SocketAddr,
         serve_repair: &ServeRepair,
         repair_stats: &mut RepairStats,
         nonce: Nonce,
     ) -> Result<()> {
-        let req = serve_repair.map_repair_request(&repair_type, repair_stats, nonce)?;
+        let req =
+            serve_repair.map_repair_request(&repair_type, repair_pubkey, repair_stats, nonce)?;
         repair_socket.send_to(&req, to)?;
         Ok(())
     }
@@ -353,12 +377,12 @@ impl RepairService {
         serve_repair: &ServeRepair,
     ) {
         let now = timestamp();
-        if status.repair_addr.is_none()
+        if status.repair_pubkey_and_addr.is_none()
             || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64
         {
-            let repair_addr =
+            let repair_pubkey_and_addr =
                 serve_repair.repair_request_duplicate_compute_best_peer(slot, cluster_slots);
-            status.repair_addr = repair_addr.ok();
+            status.repair_pubkey_and_addr = repair_pubkey_and_addr.ok();
             status.start = timestamp();
         }
     }
@@ -395,12 +419,12 @@ impl RepairService {
             // Mark this slot as special repair, try to download from single
             // validator to avoid corruption
-            let repair_addr = serve_repair
+            let repair_pubkey_and_addr = serve_repair
                 .repair_request_duplicate_compute_best_peer(*slot, cluster_slots)
                 .ok();
             let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
                 start: timestamp(),
-                repair_addr,
+                repair_pubkey_and_addr,
             };
             duplicate_slot_repair_statuses.insert(*slot, new_duplicate_slot_repair_status);
         }
@@ -423,7 +447,7 @@ impl RepairService {
             warn!(
                 "Repaired version of slot {} most recently (but maybe not entirely)
                 from {:?} has failed again",
-                dead_slot, status.repair_addr
+                dead_slot, status.repair_pubkey_and_addr
             );
         }
         cluster_slots
@@ -873,7 +897,7 @@ mod test {
         let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
         let duplicate_status = DuplicateSlotRepairStatus {
             start: std::u64::MAX,
-            repair_addr: None,
+            repair_pubkey_and_addr: None,
         };
 
         // Insert some shreds to create a SlotMeta,
@@ -898,7 +922,7 @@ mod test {
         assert!(duplicate_slot_repair_statuses
             .get(&dead_slot)
             .unwrap()
-            .repair_addr
+            .repair_pubkey_and_addr
             .is_none());
 
         assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
@@ -906,7 +930,8 @@ mod test {
         duplicate_slot_repair_statuses
             .get_mut(&dead_slot)
             .unwrap()
-            .repair_addr = Some(receive_socket.local_addr().unwrap());
+            .repair_pubkey_and_addr =
+            Some((Pubkey::default(), receive_socket.local_addr().unwrap()));
 
         // Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses`
         RepairService::generate_and_send_duplicate_repairs(
@@ -938,7 +963,10 @@ mod test {
 
     #[test]
     pub fn test_update_duplicate_slot_repair_addr() {
-        let dummy_addr = Some(UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap());
+        let dummy_addr = Some((
+            Pubkey::default(),
+            UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(),
+        ));
         let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
             Node::new_localhost().info,
         ));
@@ -956,7 +984,7 @@ mod test {
         // address
         let mut duplicate_status = DuplicateSlotRepairStatus {
             start: std::u64::MAX,
-            repair_addr: dummy_addr,
+            repair_pubkey_and_addr: dummy_addr,
         };
         RepairService::update_duplicate_slot_repair_addr(
             dead_slot,
@@ -964,12 +992,12 @@ mod test {
             &cluster_slots,
             &serve_repair,
         );
-        assert_eq!(duplicate_status.repair_addr, dummy_addr);
+        assert_eq!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
 
         // If the repair address is None, should try to update
         let mut duplicate_status = DuplicateSlotRepairStatus {
             start: std::u64::MAX,
-            repair_addr: None,
+            repair_pubkey_and_addr: None,
         };
         RepairService::update_duplicate_slot_repair_addr(
             dead_slot,
@@ -977,12 +1005,12 @@ mod test {
             &cluster_slots,
             &serve_repair,
         );
-        assert!(duplicate_status.repair_addr.is_some());
+        assert!(duplicate_status.repair_pubkey_and_addr.is_some());
 
-        // If sufficient time has passssed, should try to update
+        // If sufficient time has passed, should try to update
         let mut duplicate_status = DuplicateSlotRepairStatus {
             start: timestamp() - MAX_DUPLICATE_WAIT_MS as u64,
-            repair_addr: dummy_addr,
+            repair_pubkey_and_addr: dummy_addr,
         };
         RepairService::update_duplicate_slot_repair_addr(
             dead_slot,
@@ -990,7 +1018,7 @@ mod test {
             &cluster_slots,
             &serve_repair,
         );
-        assert_ne!(duplicate_status.repair_addr, dummy_addr);
+        assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
     }
 
     #[test]

core/src/serve_repair.rs

@@ -397,7 +397,13 @@ impl ServeRepair {
         let (repair_peers, weights) = cache.get(&slot).unwrap();
         let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
         let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
-        let out = self.map_repair_request(&repair_request, repair_stats, DEFAULT_NONCE)?;
+        let repair_peer_id = repair_peers[n].id;
+        let out = self.map_repair_request(
+            &repair_request,
+            &repair_peer_id,
+            repair_stats,
+            DEFAULT_NONCE,
+        )?;
         Ok((addr, out))
     }
@@ -405,33 +411,38 @@ impl ServeRepair {
         &self,
         slot: Slot,
         cluster_slots: &ClusterSlots,
-    ) -> Result<SocketAddr> {
+    ) -> Result<(Pubkey, SocketAddr)> {
         let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot);
         if repair_peers.is_empty() {
             return Err(ClusterInfoError::NoPeers.into());
         }
         let weights = cluster_slots.compute_weights_exclude_noncomplete(slot, &repair_peers);
         let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
-        Ok(repair_peers[n].serve_repair)
+        Ok((repair_peers[n].id, repair_peers[n].serve_repair))
     }
 
     pub fn map_repair_request(
         &self,
         repair_request: &RepairType,
+        repair_peer_id: &Pubkey,
         repair_stats: &mut RepairStats,
         nonce: Nonce,
     ) -> Result<Vec<u8>> {
         match repair_request {
             RepairType::Shred(slot, shred_index) => {
-                repair_stats.shred.update(*slot);
+                repair_stats
+                    .shred
+                    .update(repair_peer_id, *slot, *shred_index);
                 Ok(self.window_index_request_bytes(*slot, *shred_index, nonce)?)
             }
             RepairType::HighestShred(slot, shred_index) => {
-                repair_stats.highest_shred.update(*slot);
+                repair_stats
+                    .highest_shred
+                    .update(repair_peer_id, *slot, *shred_index);
                 Ok(self.window_highest_index_request_bytes(*slot, *shred_index, nonce)?)
             }
             RepairType::Orphan(slot) => {
-                repair_stats.orphan.update(*slot);
+                repair_stats.orphan.update(repair_peer_id, *slot, 0);
                 Ok(self.orphan_bytes(*slot, nonce)?)
             }
         }

ledger/src/blockstore.rs

@@ -1125,6 +1125,12 @@ impl Blockstore {
             new_consumed,
             shred.reference_tick(),
         );
+        if slot_meta.is_full() {
+            info!(
+                "slot {} is full, last: {}",
+                slot_meta.slot, slot_meta.last_index
+            );
+        }
         data_index.set_present(index, true);
         trace!("inserted shred into slot {:?} and index {:?}", slot, index);
         Ok(())