diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs
index cd250c2123..8c0c0e9b28 100644
--- a/core/src/cluster_info_repair_listener.rs
+++ b/core/src/cluster_info_repair_listener.rs
@@ -8,7 +8,6 @@ use rand::SeedableRng;
 use rand_chacha::ChaChaRng;
 use solana_ledger::blocktree::Blocktree;
 use solana_ledger::rooted_slot_iterator::RootedSlotIterator;
-use solana_metrics::datapoint;
 use solana_sdk::{epoch_schedule::EpochSchedule, pubkey::Pubkey};
 use std::{
     cmp,
@@ -27,6 +26,9 @@ pub const REPAIR_REDUNDANCY: usize = 1;
 pub const NUM_BUFFER_SLOTS: usize = 50;
 pub const GOSSIP_DELAY_SLOTS: usize = 2;
 pub const NUM_SLOTS_PER_UPDATE: usize = 2;
+// Minimum time (in milliseconds) between repairs of the same slot for the same validator
+pub const REPAIR_SAME_SLOT_THRESHOLD: u64 = 5000;
+use solana_sdk::timing::timestamp;
 
 // Represents the blobs that a repairman is responsible for repairing in specific slot. More
 // specifically, a repairman is responsible for every blob in this slot with index
@@ -73,6 +75,13 @@ impl Iterator for BlobIndexesToRepairIterator {
     }
 }
 
+#[derive(Default)]
+struct RepaireeInfo {
+    last_root: u64,
+    last_ts: u64,
+    last_repaired_slot_and_ts: (u64, u64),
+}
+
 pub struct ClusterInfoRepairListener {
     thread_hdls: Vec<JoinHandle<()>>,
 }
@@ -93,10 +102,10 @@ impl ClusterInfoRepairListener {
             // 1) The latest timestamp of the EpochSlots gossip message at which a repair was
             // sent to this peer
            // 2) The latest root the peer gossiped
-            let mut peer_roots: HashMap<Pubkey, (u64, u64)> = HashMap::new();
+            let mut peer_infos: HashMap<Pubkey, RepaireeInfo> = HashMap::new();
             let _ = Self::recv_loop(
                 &blocktree,
-                &mut peer_roots,
+                &mut peer_infos,
                 &exit,
                 &cluster_info,
                 &epoch_schedule,
@@ -110,7 +119,7 @@ impl ClusterInfoRepairListener {
 
     fn recv_loop(
         blocktree: &Blocktree,
-        peer_roots: &mut HashMap<Pubkey, (u64, u64)>,
+        peer_infos: &mut HashMap<Pubkey, RepaireeInfo>,
         exit: &Arc<AtomicBool>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         epoch_schedule: &EpochSchedule,
@@ -134,7 +143,7 @@ impl ClusterInfoRepairListener {
                     &my_pubkey,
                     &peer.id,
                     cluster_info,
-                    peer_roots,
+                    peer_infos,
                     &mut my_gossiped_root,
                 ) {
                     peers_needing_repairs.insert(peer.id, repairee_epoch_slots);
@@ -145,7 +154,7 @@ impl ClusterInfoRepairListener {
             let _ = Self::serve_repairs(
                 &my_pubkey,
                 blocktree,
-                peer_roots,
+                peer_infos,
                 &peers_needing_repairs,
                 &socket,
                 cluster_info,
@@ -161,10 +170,10 @@ impl ClusterInfoRepairListener {
         my_pubkey: &Pubkey,
         peer_pubkey: &Pubkey,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        peer_roots: &mut HashMap<Pubkey, (u64, u64)>,
+        peer_infos: &mut HashMap<Pubkey, RepaireeInfo>,
         my_gossiped_root: &mut u64,
     ) -> Option<EpochSlots> {
-        let last_cached_repair_ts = Self::get_last_ts(peer_pubkey, peer_roots);
+        let last_cached_repair_ts = Self::get_last_ts(peer_pubkey, peer_infos);
         let my_root = Self::read_my_gossiped_root(&my_pubkey, cluster_info, my_gossiped_root);
         {
             let r_cluster_info = cluster_info.read().unwrap();
@@ -173,8 +182,8 @@ impl ClusterInfoRepairListener {
             if let Some((peer_epoch_slots, updated_ts)) =
                 r_cluster_info.get_epoch_state_for_node(&peer_pubkey, last_cached_repair_ts)
             {
-                let peer_entry = peer_roots.entry(*peer_pubkey).or_default();
-                let peer_root = cmp::max(peer_epoch_slots.root, peer_entry.1);
+                let peer_info = peer_infos.entry(*peer_pubkey).or_default();
+                let peer_root = cmp::max(peer_epoch_slots.root, peer_info.last_root);
                 let mut result = None;
                 let last_repair_ts = {
                     // Following logic needs to be fast because it holds the lock
@@ -185,11 +194,12 @@ impl ClusterInfoRepairListener {
                         updated_ts
                     } else {
                         // No repairs were sent, don't need to update the timestamp
-                        peer_entry.0
+                        peer_info.last_ts
                     }
                 };
 
-                *peer_entry = (last_repair_ts, peer_root);
+                peer_info.last_ts = last_repair_ts;
+                peer_info.last_root = peer_root;
                 result
             } else {
                 None
@@ -200,7 +210,7 @@ impl ClusterInfoRepairListener {
     fn serve_repairs(
         my_pubkey: &Pubkey,
         blocktree: &Blocktree,
-        peer_roots: &HashMap<Pubkey, (u64, u64)>,
+        peer_infos: &mut HashMap<Pubkey, RepaireeInfo>,
         repairees: &HashMap<Pubkey, EpochSlots>,
         socket: &UdpSocket,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
@@ -221,7 +231,7 @@ impl ClusterInfoRepairListener {
             let mut eligible_repairmen = Self::find_eligible_repairmen(
                 my_pubkey,
                 repairee_root,
-                peer_roots,
+                peer_infos,
                 NUM_BUFFER_SLOTS,
             );
 
@@ -234,7 +244,7 @@ impl ClusterInfoRepairListener {
                 let my_root =
                     Self::read_my_gossiped_root(my_pubkey, cluster_info, my_gossiped_root);
 
-                let _ = Self::serve_repairs_to_repairee(
+                let repair_results = Self::serve_repairs_to_repairee(
                     my_pubkey,
                     repairee_pubkey,
                     my_root,
@@ -245,7 +255,16 @@ impl ClusterInfoRepairListener {
                     &repairee_addr,
                     NUM_SLOTS_PER_UPDATE,
                     epoch_schedule,
+                    peer_infos
+                        .get(repairee_pubkey)
+                        .unwrap()
+                        .last_repaired_slot_and_ts,
                 );
+
+                if let Ok(Some(new_last_repaired_slot)) = repair_results {
+                    let peer_info = peer_infos.get_mut(repairee_pubkey).unwrap();
+                    peer_info.last_repaired_slot_and_ts = (new_last_repaired_slot, timestamp());
+                }
             }
         }
 
@@ -264,14 +283,15 @@ impl ClusterInfoRepairListener {
         repairee_addr: &SocketAddr,
         num_slots_to_repair: usize,
         epoch_schedule: &EpochSchedule,
-    ) -> Result<()> {
+        last_repaired_slot_and_ts: (u64, u64),
+    ) -> Result<Option<u64>> {
         let slot_iter = RootedSlotIterator::new(repairee_epoch_slots.root, &blocktree);
         if slot_iter.is_err() {
             info!(
                 "Root for repairee is on different fork. My root: {}, repairee_root: {} repairee_pubkey: {:?}",
                 my_root, repairee_epoch_slots.root, repairee_pubkey,
             );
-            return Ok(());
+            return Ok(None);
         }
 
         let mut slot_iter = slot_iter?;
@@ -284,9 +304,13 @@ impl ClusterInfoRepairListener {
         let max_confirmed_repairee_slot =
             epoch_schedule.get_last_slot_in_epoch(max_confirmed_repairee_epoch);
 
+        let last_repaired_slot = last_repaired_slot_and_ts.0;
+        let last_repaired_ts = last_repaired_slot_and_ts.1;
+
         // Skip the first slot in the iterator because we know it's the root slot which the repairee
         // already has
         slot_iter.next();
+        let mut new_repaired_slot: Option<u64> = None;
         for (slot, slot_meta) in slot_iter {
             if slot > my_root
                 || num_slots_repaired >= num_slots_to_repair
@@ -303,49 +327,76 @@ impl ClusterInfoRepairListener {
                 // calculate_my_repairman_index_for_slot() will divide responsibility evenly across
                 // the cluster
                 let num_blobs_in_slot = slot_meta.received as usize;
+
+                // Check if I'm responsible for repairing this slot
                 if let Some(my_repair_indexes) = Self::calculate_my_repairman_index_for_slot(
                     my_pubkey,
                     &eligible_repairmen,
                     num_blobs_in_slot,
                     REPAIR_REDUNDANCY,
                 ) {
-                    // Repairee is missing this slot, send them the blobs for this slot
-                    for blob_index in my_repair_indexes {
-                        // Loop over the sblob indexes and query the database for these blob that
-                        // this node is reponsible for repairing. This should be faster than using
-                        // a database iterator over the slots because by the time this node is
-                        // sending the blobs in this slot for repair, we expect these slots
-                        // to be full.
-                        if let Some(blob_data) = blocktree
-                            .get_data_shred(slot, blob_index as u64)
-                            .expect("Failed to read data blob from blocktree")
-                        {
-                            socket.send_to(&blob_data[..], repairee_addr)?;
-                            total_data_blobs_sent += 1;
+                    // If I've already sent blobs >= this slot before, then don't send them again
+                    // until the timeout has expired
+                    if slot > last_repaired_slot
+                        || timestamp() - last_repaired_ts > REPAIR_SAME_SLOT_THRESHOLD
+                    {
+                        error!(
+                            "Serving repair for slot {} to {}. Repairee slots: {:?}",
+                            slot, repairee_pubkey, repairee_epoch_slots.slots
+                        );
+                        // Repairee is missing this slot, send them the blobs for this slot
+                        for blob_index in my_repair_indexes {
+                            // Loop over the blob indexes and query the database for these blobs that
+                            // this node is responsible for repairing. This should be faster than using
+                            // a database iterator over the slots because by the time this node is
+                            // sending the blobs in this slot for repair, we expect these slots
+                            // to be full.
+                            if let Some(blob_data) = blocktree
+                                .get_data_shred(slot, blob_index as u64)
+                                .expect("Failed to read data blob from blocktree")
+                            {
+                                socket.send_to(&blob_data[..], repairee_addr)?;
+                                total_data_blobs_sent += 1;
+                            }
+
+                            if let Some(coding_bytes) = blocktree
+                                .get_coding_shred(slot, blob_index as u64)
+                                .expect("Failed to read coding blob from blocktree")
+                            {
+                                socket.send_to(&coding_bytes[..], repairee_addr)?;
+                                total_coding_blobs_sent += 1;
+                            }
                         }
-
-                        if let Some(coding_bytes) = blocktree
-                            .get_coding_shred(slot, blob_index as u64)
-                            .expect("Failed to read coding blob from blocktree")
-                        {
-                            socket.send_to(&coding_bytes[..], repairee_addr)?;
-                            total_coding_blobs_sent += 1;
-                        }
+                        new_repaired_slot = Some(slot);
+                        Self::report_repair_metrics(
+                            slot,
+                            repairee_pubkey,
+                            total_data_blobs_sent,
+                            total_coding_blobs_sent,
+                        );
+                        total_data_blobs_sent = 0;
+                        total_coding_blobs_sent = 0;
                     }
-                    num_slots_repaired += 1;
                 }
             }
         }
 
-        Self::report_repair_metrics(total_data_blobs_sent, total_coding_blobs_sent);
-        Ok(())
+        Ok(new_repaired_slot)
     }
 
-    fn report_repair_metrics(total_data_blobs_sent: u64, total_coding_blobs_sent: u64) {
+    fn report_repair_metrics(
+        slot: u64,
+        repairee_id: &Pubkey,
+        total_data_blobs_sent: u64,
+        total_coding_blobs_sent: u64,
+    ) {
         if total_data_blobs_sent > 0 || total_coding_blobs_sent > 0 {
             datapoint!(
                 "repairman_activity",
+                ("slot", slot, i64),
+                ("repairee_id", repairee_id.to_string(), String),
                 ("data_sent", total_data_blobs_sent, i64),
                 ("coding_sent", total_coding_blobs_sent, i64)
             );
@@ -407,14 +458,14 @@ impl ClusterInfoRepairListener {
     fn find_eligible_repairmen<'a>(
         my_pubkey: &'a Pubkey,
         repairee_root: u64,
-        repairman_roots: &'a HashMap<Pubkey, (u64, u64)>,
+        repairman_roots: &'a HashMap<Pubkey, RepaireeInfo>,
         num_buffer_slots: usize,
     ) -> Vec<&'a Pubkey> {
         let mut repairmen: Vec<_> = repairman_roots
             .iter()
-            .filter_map(|(repairman_pubkey, (_, repairman_root))| {
+            .filter_map(|(repairman_pubkey, repairman_info)| {
                 if Self::should_repair_peer(
-                    *repairman_root,
+                    repairman_info.last_root,
                     repairee_root,
                     num_buffer_slots - GOSSIP_DELAY_SLOTS,
                 ) {
@@ -461,8 +512,8 @@ impl ClusterInfoRepairListener {
         repairman_root > repairee_root + num_buffer_slots as u64
     }
 
-    fn get_last_ts(pubkey: &Pubkey, peer_roots: &mut HashMap<Pubkey, (u64, u64)>) -> Option<u64> {
-        peer_roots.get(pubkey).map(|(last_ts, _)| *last_ts)
+    fn get_last_ts(pubkey: &Pubkey, peer_infos: &mut HashMap<Pubkey, RepaireeInfo>) -> Option<u64> {
+        peer_infos.get(pubkey).map(|p| p.last_ts)
     }
 }
 
@@ -564,7 +615,7 @@ mod tests {
         );
 
         // Set up locally cached information
-        let mut peer_roots = HashMap::new();
+        let mut peer_info = HashMap::new();
         let mut my_gossiped_root = repairee_root;
 
         // Root is not sufficiently far ahead, we shouldn't repair
@@ -572,7 +623,7 @@ mod tests {
             &my_pubkey,
             &peer_pubkey,
             &cluster_info,
-            &mut peer_roots,
+            &mut peer_info,
             &mut my_gossiped_root,
         )
         .is_none());
@@ -584,7 +635,7 @@ mod tests {
             &my_pubkey,
             &peer_pubkey,
             &cluster_info,
-            &mut peer_roots,
+            &mut peer_info,
             &mut my_gossiped_root,
         )
         .is_some());
@@ -596,7 +647,7 @@ mod tests {
            &my_pubkey,
             &peer_pubkey,
             &cluster_info,
-            &mut peer_roots,
+            &mut peer_info,
             &mut my_gossiped_root,
         )
         .is_none());
@@ -612,12 +663,78 @@ mod tests {
             &my_pubkey,
             &peer_pubkey,
             &cluster_info,
-            &mut peer_roots,
+            &mut peer_info,
             &mut my_gossiped_root,
         )
         .is_some());
     }
 
+    #[test]
+    fn test_serve_same_repairs_to_repairee() {
+        let blocktree_path = get_tmp_ledger_path!();
+        let blocktree = Blocktree::open(&blocktree_path).unwrap();
+        let num_slots = 2;
+        let (shreds, _) = make_many_slot_entries(0, num_slots, 1);
+        blocktree.insert_shreds(shreds, None).unwrap();
+
+        // Write roots so that these slots will qualify to be sent by the repairman
+        let last_root = num_slots - 1;
+        let roots: Vec<_> = (0..=last_root).collect();
+        blocktree.set_roots(&roots).unwrap();
+
+        // Set up my information
+        let my_pubkey = Pubkey::new_rand();
+        let my_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
+
+        // Set up a mock repairee with a socket listening for incoming repairs
+        let mock_repairee = MockRepairee::make_mock_repairee();
+
+        // Set up the repairee's EpochSlots, such that they are missing every odd indexed slot
+        // in the range (repairee_root, num_slots]
+        let repairee_root = 0;
+        let repairee_slots: BTreeSet<_> = (0..=num_slots).step_by(2).collect();
+        let repairee_epoch_slots =
+            EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots, 1);
+        let eligible_repairmen = vec![&my_pubkey];
+        let epoch_schedule = EpochSchedule::custom(32, 16, false);
+        assert!(ClusterInfoRepairListener::serve_repairs_to_repairee(
+            &my_pubkey,
+            &mock_repairee.id,
+            num_slots - 1,
+            &blocktree,
+            &repairee_epoch_slots,
+            &eligible_repairmen,
+            &my_socket,
+            &mock_repairee.tvu_address,
+            1,
+            &epoch_schedule,
+            // Simulate having already sent a slot very recently
+            (last_root, timestamp()),
+        )
+        .unwrap()
+        .is_none());
+
+        // Simulate the threshold having elapsed, allowing the repairman
+        // to send the slot again
+        assert_eq!(
+            ClusterInfoRepairListener::serve_repairs_to_repairee(
+                &my_pubkey,
+                &mock_repairee.id,
+                num_slots - 1,
+                &blocktree,
+                &repairee_epoch_slots,
+                &eligible_repairmen,
+                &my_socket,
+                &mock_repairee.tvu_address,
+                1,
+                &epoch_schedule,
+                (last_root, timestamp() - REPAIR_SAME_SLOT_THRESHOLD * 2),
+            )
+            .unwrap(),
+            Some(1)
+        );
+    }
+
     #[test]
     fn test_serve_repairs_to_repairee() {
         let blocktree_path = get_tmp_ledger_path!();
@@ -671,6 +788,7 @@ mod tests {
             &mock_repairee.tvu_address,
             num_missing_slots as usize,
             &epoch_schedule,
+            (0, 0),
         )
         .unwrap();
     }
@@ -741,6 +859,7 @@ mod tests {
             &mock_repairee.tvu_address,
             1 as usize,
             &epoch_schedule,
+            (0, 0),
         )
         .unwrap();
 
@@ -763,6 +882,7 @@ mod tests {
             &mock_repairee.tvu_address,
             1 as usize,
             &epoch_schedule,
+            (0, 0),
         )
         .unwrap();