diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index 11c7f759e..e363cdfdc 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -317,10 +317,10 @@ impl ClusterInfo {
         )
     }
 
-    pub fn push_epoch_slots(&mut self, id: Pubkey, root: u64, slots: BTreeSet<u64>) {
+    pub fn push_epoch_slots(&mut self, id: Pubkey, root: Slot, min: Slot, slots: BTreeSet<Slot>) {
         let now = timestamp();
         let entry = CrdsValue::new_signed(
-            CrdsData::EpochSlots(EpochSlots::new(id, root, slots, now)),
+            CrdsData::EpochSlots(EpochSlots::new(id, root, min, slots, now)),
             &self.keypair,
         );
         self.gossip
@@ -489,13 +489,18 @@ impl ClusterInfo {
             .collect()
     }
 
-    /// all tvu peers with valid gossip addrs
-    fn repair_peers(&self) -> Vec<ContactInfo> {
+    /// all tvu peers with valid gossip addrs that likely have the slot being requested
+    fn repair_peers(&self, slot: Slot) -> Vec<ContactInfo> {
         let me = self.my_data().id;
         ClusterInfo::tvu_peers(self)
             .into_iter()
             .filter(|x| x.id != me)
             .filter(|x| ContactInfo::is_valid_address(&x.gossip))
+            .filter(|x| {
+                self.get_epoch_state_for_node(&x.id, None)
+                    .map(|(epoch_slots, _)| epoch_slots.lowest <= slot)
+                    .unwrap_or_else(|| /* fallback to legacy behavior */ true)
+            })
             .collect()
     }
 
@@ -840,9 +845,9 @@ impl ClusterInfo {
     }
 
     pub fn repair_request(&self, repair_request: &RepairType) -> Result<(SocketAddr, Vec<u8>)> {
-        // find a peer that appears to be accepting replication, as indicated
-        // by a valid tvu port location
-        let valid: Vec<_> = self.repair_peers();
+        // find a peer that appears to be accepting replication and has the desired slot, as indicated
+        // by a valid tvu port location
+        let valid: Vec<_> = self.repair_peers(repair_request.slot());
         if valid.is_empty() {
             return Err(ClusterInfoError::NoPeers.into());
         }
@@ -2555,6 +2560,7 @@ mod tests {
         let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots {
             from: Pubkey::default(),
             root: 0,
+            lowest: 0,
             slots: btree_slots,
             wallclock: 0,
         }));
@@ -2571,6 +2577,7 @@
         let mut value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots {
             from: Pubkey::default(),
             root: 0,
+            lowest: 0,
             slots: BTreeSet::new(),
             wallclock: 0,
         }));
@@ -2588,6 +2595,7 @@
         value.data = CrdsData::EpochSlots(EpochSlots {
             from: Pubkey::default(),
             root: 0,
+            lowest: 0,
             slots,
             wallclock: 0,
         });
@@ -2700,6 +2708,37 @@
         assert_eq!(pulls.get(0).unwrap().0, other_node.gossip);
     }
 
+    #[test]
+    fn test_repair_peers() {
+        let node_keypair = Arc::new(Keypair::new());
+        let mut cluster_info = ClusterInfo::new(
+            ContactInfo::new_localhost(&node_keypair.pubkey(), timestamp()),
+            node_keypair,
+        );
+        for i in 0..10 {
+            let mut peer_root = 5;
+            let mut peer_lowest = 0;
+            if i >= 5 {
+                // make these invalid for the upcoming repair request
+                peer_root = 15;
+                peer_lowest = 10;
+            }
+            let other_node_pubkey = Pubkey::new_rand();
+            let other_node = ContactInfo::new_localhost(&other_node_pubkey, timestamp());
+            cluster_info.insert_info(other_node.clone());
+            let value = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
+                other_node_pubkey,
+                peer_root,
+                peer_lowest,
+                BTreeSet::new(),
+                timestamp(),
+            )));
+            let _ = cluster_info.gossip.crds.insert(value, timestamp());
+        }
+        // only half the visible peers should be eligible to serve this repair
+        assert_eq!(cluster_info.repair_peers(5).len(), 5);
+    }
+
     #[test]
     fn test_max_bloom_size() {
         assert_eq!(MAX_BLOOM_SIZE, max_bloom_size());
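Note (reviewer sketch, not part of the patch): the new filter in `repair_peers` keeps only peers whose gossiped `EpochSlots.lowest` is at or below the slot being repaired, and keeps peers that have not gossiped any `EpochSlots` at all. Below is a minimal standalone model of that rule; `PeerEpochSlots` and `filter_repair_peers` are hypothetical stand-ins for the real gossip types.

```rust
use std::collections::HashMap;

type Slot = u64;
type Pubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey

/// Hypothetical, trimmed-down view of a peer's gossiped EpochSlots.
struct PeerEpochSlots {
    lowest: Slot,
}

/// Keep only peers that likely still store `slot`; peers with no gossiped
/// EpochSlots fall back to "eligible", matching the patch's
/// `unwrap_or_else(|| true)` legacy behavior.
fn filter_repair_peers(
    peers: Vec<Pubkey>,
    epoch_slots: &HashMap<Pubkey, PeerEpochSlots>,
    slot: Slot,
) -> Vec<Pubkey> {
    peers
        .into_iter()
        .filter(|id| {
            epoch_slots
                .get(id)
                .map(|es| es.lowest <= slot)
                .unwrap_or(true)
        })
        .collect()
}

fn main() {
    let a = [1u8; 32];
    let b = [2u8; 32];
    let mut view = HashMap::new();
    view.insert(a, PeerEpochSlots { lowest: 0 });
    view.insert(b, PeerEpochSlots { lowest: 15 }); // b purged everything below slot 15
    // Peer `b` cannot serve slot 5, so only `a` remains eligible.
    assert_eq!(filter_repair_peers(vec![a, b], &view, 5), vec![a]);
}
```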
diff --git a/core/src/cluster_info_repair_listener.rs b/core/src/cluster_info_repair_listener.rs
index 1e6f823ca..58be24515 100644
--- a/core/src/cluster_info_repair_listener.rs
+++ b/core/src/cluster_info_repair_listener.rs
@@ -27,6 +27,7 @@ pub const GOSSIP_DELAY_SLOTS: usize = 2;
 pub const NUM_SLOTS_PER_UPDATE: usize = 2;
 // Time between allowing repair for same slot for same validator
 pub const REPAIR_SAME_SLOT_THRESHOLD: u64 = 5000;
+use solana_sdk::clock::Slot;
 use solana_sdk::timing::timestamp;
 
 // Represents the shreds that a repairman is responsible for repairing in specific slot. More
@@ -76,9 +77,10 @@ impl Iterator for ShredIndexesToRepairIterator {
 
 #[derive(Default)]
 struct RepaireeInfo {
-    last_root: u64,
+    last_root: Slot,
     last_ts: u64,
-    last_repaired_slot_and_ts: (u64, u64),
+    last_repaired_slot_and_ts: (Slot, u64),
+    lowest_slot: Slot,
 }
 
 pub struct ClusterInfoRepairListener {
@@ -132,6 +134,7 @@
             return Ok(());
         }
 
+        let lowest_slot = blocktree.lowest_slot();
         let peers = cluster_info.read().unwrap().gossip_peers();
         let mut peers_needing_repairs: HashMap<Pubkey, EpochSlots> = HashMap::new();
 
@@ -144,6 +147,7 @@
                 cluster_info,
                 peer_infos,
                 &mut my_gossiped_root,
+                lowest_slot,
             ) {
                 peers_needing_repairs.insert(peer.id, repairee_epoch_slots);
             }
@@ -170,7 +174,8 @@
         peer_pubkey: &Pubkey,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         peer_infos: &mut HashMap<Pubkey, RepaireeInfo>,
-        my_gossiped_root: &mut u64,
+        my_gossiped_root: &mut Slot,
+        my_lowest_slot: Slot,
     ) -> Option<EpochSlots> {
         let last_cached_repair_ts = Self::get_last_ts(peer_pubkey, peer_infos);
         let my_root = Self::read_my_gossiped_root(&my_pubkey, cluster_info, my_gossiped_root);
@@ -187,7 +192,12 @@
             let last_repair_ts = {
                 // Following logic needs to be fast because it holds the lock
                 // preventing updates on gossip
-                if Self::should_repair_peer(my_root, peer_epoch_slots.root, NUM_BUFFER_SLOTS) {
+                if Self::should_repair_peer(
+                    my_root,
+                    my_lowest_slot,
+                    peer_epoch_slots.root,
+                    NUM_BUFFER_SLOTS,
+                ) {
                     // Clone out EpochSlots structure to avoid holding lock on gossip
                     result = Some(peer_epoch_slots.clone());
                     updated_ts
@@ -199,6 +209,7 @@
 
             peer_info.last_ts = last_repair_ts;
             peer_info.last_root = peer_root;
+            peer_info.lowest_slot = peer_epoch_slots.lowest;
             result
         } else {
             None
@@ -213,7 +224,7 @@
         repairees: &HashMap<Pubkey, EpochSlots>,
         socket: &UdpSocket,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        my_gossiped_root: &mut u64,
+        my_gossiped_root: &mut Slot,
         epoch_schedule: &EpochSchedule,
     ) -> Result<()> {
         for (repairee_pubkey, repairee_epoch_slots) in repairees {
@@ -274,7 +285,7 @@
     fn serve_repairs_to_repairee(
         my_pubkey: &Pubkey,
         repairee_pubkey: &Pubkey,
-        my_root: u64,
+        my_root: Slot,
         blocktree: &Blocktree,
         repairee_epoch_slots: &EpochSlots,
         eligible_repairmen: &[&Pubkey],
@@ -283,7 +294,7 @@
         num_slots_to_repair: usize,
         epoch_schedule: &EpochSchedule,
         last_repaired_slot_and_ts: (u64, u64),
-    ) -> Result<Option<u64>> {
+    ) -> Result<Option<Slot>> {
         let slot_iter = RootedSlotIterator::new(repairee_epoch_slots.root, &blocktree);
         if slot_iter.is_err() {
             info!(
@@ -386,13 +397,13 @@
     }
 
     fn report_repair_metrics(
-        slot: u64,
+        slot: Slot,
         repairee_id: &Pubkey,
         total_data_shreds_sent: u64,
         total_coding_shreds_sent: u64,
     ) {
         if total_data_shreds_sent > 0 || total_coding_shreds_sent > 0 {
-            datapoint!(
+            datapoint_info!(
                 "repairman_activity",
                 ("slot", slot, i64),
                 ("repairee_id", repairee_id.to_string(), String),
@@ -405,7 +416,7 @@
     fn shuffle_repairmen(
         eligible_repairmen: &mut Vec<&Pubkey>,
         repairee_pubkey: &Pubkey,
-        repairee_root: u64,
+        repairee_root: Slot,
     ) {
         // Make a seed from pubkey + repairee root
         let mut seed = [0u8; mem::size_of::<Pubkey>()];
@@ -456,7 +467,7 @@
 
     fn find_eligible_repairmen<'a>(
         my_pubkey: &'a Pubkey,
-        repairee_root: u64,
+        repairee_root: Slot,
         repairman_roots: &'a HashMap<Pubkey, RepaireeInfo>,
         num_buffer_slots: usize,
     ) -> Vec<&'a Pubkey> {
@@ -465,6 +476,7 @@
             .filter_map(|(repairman_pubkey, repairman_info)| {
                 if Self::should_repair_peer(
                     repairman_info.last_root,
+                    repairman_info.lowest_slot,
                     repairee_root,
                     num_buffer_slots - GOSSIP_DELAY_SLOTS,
                 ) {
@@ -484,7 +496,7 @@
     fn read_my_gossiped_root(
         my_pubkey: &Pubkey,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
-        old_root: &mut u64,
+        old_root: &mut Slot,
     ) -> u64 {
         let new_root = cluster_info
             .read()
@@ -502,13 +514,16 @@
     // Decide if a repairman with root == `repairman_root` should send repairs to a
     // potential repairee with root == `repairee_root`
     fn should_repair_peer(
-        repairman_root: u64,
-        repairee_root: u64,
+        repairman_root: Slot,
+        repairman_lowest_slot: Slot,
+        repairee_root: Slot,
         num_buffer_slots: usize,
     ) -> bool {
-        // Check if this potential repairman's root is greater than the repairee root +
-        // num_buffer_slots
-        repairman_root > repairee_root + num_buffer_slots as u64
+        // Check if this potential repairman is likely to have slots needed to repair this repairee
+        // and that its root is greater than the repairee root + num_buffer_slots
+        // Only need to be able to repair slots that are 1 higher than root
+        repairman_lowest_slot <= repairee_root + 1
+            && repairman_root > repairee_root + num_buffer_slots as u64
     }
 
     fn get_last_ts(pubkey: &Pubkey, peer_infos: &mut HashMap<Pubkey, RepaireeInfo>) -> Option<u64> {
@@ -612,6 +627,7 @@ mod tests {
         cluster_info.write().unwrap().push_epoch_slots(
             peer_pubkey,
             repairee_root,
+            0,
             repairee_slots.clone(),
         );
 
@@ -626,6 +642,7 @@
            &cluster_info,
             &mut peer_info,
             &mut my_gossiped_root,
+            0,
         )
         .is_none());
 
@@ -638,6 +655,7 @@
             &cluster_info,
             &mut peer_info,
             &mut my_gossiped_root,
+            0,
         )
         .is_some());
 
@@ -650,22 +668,26 @@
             &cluster_info,
             &mut peer_info,
             &mut my_gossiped_root,
+            0,
         )
         .is_none());
 
         // Sleep to make sure the timestamp is updated in gossip. Update the gossiped EpochSlots.
         // Now a repair should be sent again
         sleep(Duration::from_millis(10));
-        cluster_info
-            .write()
-            .unwrap()
-            .push_epoch_slots(peer_pubkey, repairee_root, repairee_slots);
+        cluster_info.write().unwrap().push_epoch_slots(
+            peer_pubkey,
+            repairee_root,
+            0,
+            repairee_slots,
+        );
         assert!(ClusterInfoRepairListener::process_potential_repairee(
             &my_pubkey,
             &peer_pubkey,
             &cluster_info,
             &mut peer_info,
             &mut my_gossiped_root,
+            0,
         )
         .is_some());
     }
@@ -695,7 +717,7 @@
         let repairee_root = 0;
         let repairee_slots: BTreeSet<_> = (0..=num_slots).step_by(2).collect();
         let repairee_epoch_slots =
-            EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots, 1);
+            EpochSlots::new(mock_repairee.id, repairee_root, 0, repairee_slots, 1);
         let eligible_repairmen = vec![&my_pubkey];
         let epoch_schedule = EpochSchedule::custom(32, 16, false);
         assert!(ClusterInfoRepairListener::serve_repairs_to_repairee(
@@ -765,7 +787,7 @@
         let repairee_root = 0;
         let repairee_slots: BTreeSet<_> = (0..=num_slots).step_by(2).collect();
         let repairee_epoch_slots =
-            EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots, 1);
+            EpochSlots::new(mock_repairee.id, repairee_root, 0, repairee_slots, 1);
 
         // Mock out some other repairmen such that each repairman is responsible for 1 shred in a slot
         let num_repairmen = entries_per_slot - 1;
@@ -857,8 +879,13 @@
         // already has all the slots for which they have a confirmed leader schedule
         let repairee_root = 0;
         let repairee_slots: BTreeSet<_> = (0..=slots_per_epoch).collect();
-        let repairee_epoch_slots =
-            EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots.clone(), 1);
+        let repairee_epoch_slots = EpochSlots::new(
+            mock_repairee.id,
+            repairee_root,
+            0,
+            repairee_slots.clone(),
+            1,
+        );
 
         ClusterInfoRepairListener::serve_repairs_to_repairee(
             &my_pubkey,
@@ -882,7 +909,7 @@
         // Set the root to stakers_slot_offset, now epoch 2 should be confirmed, so the repairee
         // is now eligible to get slots from epoch 2:
         let repairee_epoch_slots =
-            EpochSlots::new(mock_repairee.id, stakers_slot_offset, repairee_slots, 1);
+            EpochSlots::new(mock_repairee.id, stakers_slot_offset, 0, repairee_slots, 1);
         ClusterInfoRepairListener::serve_repairs_to_repairee(
             &my_pubkey,
             &mock_repairee.id,
@@ -1016,6 +1043,7 @@
         let repairee_root = 5;
         assert!(!ClusterInfoRepairListener::should_repair_peer(
             repairman_root,
+            0,
             repairee_root,
             0,
         ));
@@ -1025,6 +1053,7 @@
         let repairee_root = 5;
         assert!(!ClusterInfoRepairListener::should_repair_peer(
             repairman_root,
+            0,
             repairee_root,
             0,
         ));
@@ -1034,6 +1063,7 @@
         let repairee_root = 5;
         assert!(ClusterInfoRepairListener::should_repair_peer(
             repairman_root,
+            0,
             repairee_root,
             0,
         ));
@@ -1043,6 +1073,7 @@
         let repairee_root = 5;
         assert!(!ClusterInfoRepairListener::should_repair_peer(
             repairman_root,
+            0,
             repairee_root,
             11,
         ));
@@ -1052,6 +1083,17 @@
         let repairee_root = 5;
         assert!(ClusterInfoRepairListener::should_repair_peer(
             repairman_root,
+            0,
             repairee_root,
             10,
         ));
+
+        // If repairee is behind and outside the buffer but behind our lowest slot, we don't repair
+        let repairman_root = 16;
+        let repairee_root = 5;
+        assert!(!ClusterInfoRepairListener::should_repair_peer(
+            repairman_root,
+            repairee_root + 2,
+            repairee_root,
+            10,
+        ));
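Note (reviewer sketch, not part of the patch): `should_repair_peer` now has two conditions — the repairman must still hold the slots just above the repairee's root, and must be far enough ahead of it. The same predicate, reduced to a self-contained function and exercised with the two boundary cases from the tests above:

```rust
type Slot = u64;

/// Mirror of the patch's eligibility rule: serve a repairee only if our
/// lowest stored slot is no higher than the first slot it needs
/// (repairee_root + 1) and our root is beyond the buffer.
fn should_repair_peer(
    repairman_root: Slot,
    repairman_lowest_slot: Slot,
    repairee_root: Slot,
    num_buffer_slots: usize,
) -> bool {
    repairman_lowest_slot <= repairee_root + 1
        && repairman_root > repairee_root + num_buffer_slots as u64
}

fn main() {
    // Far enough ahead and still holding slot 6 and above: eligible.
    assert!(should_repair_peer(16, 0, 5, 10));
    // Far enough ahead, but slots below 7 were purged, so slot 6 is gone: not eligible.
    assert!(!should_repair_peer(16, 7, 5, 10));
}
```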
diff --git a/core/src/crds_value.rs b/core/src/crds_value.rs
index e622e019f..06d44e35e 100644
--- a/core/src/crds_value.rs
+++ b/core/src/crds_value.rs
@@ -1,5 +1,6 @@
 use crate::contact_info::ContactInfo;
 use bincode::{serialize, serialized_size};
+use solana_sdk::clock::Slot;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::signature::{Keypair, Signable, Signature};
 use solana_sdk::transaction::Transaction;
@@ -63,16 +64,24 @@
 #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
 pub struct EpochSlots {
     pub from: Pubkey,
-    pub root: u64,
-    pub slots: BTreeSet<u64>,
+    pub root: Slot,
+    pub lowest: Slot,
+    pub slots: BTreeSet<Slot>,
     pub wallclock: u64,
 }
 
 impl EpochSlots {
-    pub fn new(from: Pubkey, root: u64, slots: BTreeSet<u64>, wallclock: u64) -> Self {
+    pub fn new(
+        from: Pubkey,
+        root: Slot,
+        lowest: Slot,
+        slots: BTreeSet<Slot>,
+        wallclock: u64,
+    ) -> Self {
         Self {
             from,
             root,
+            lowest,
             slots,
             wallclock,
         }
@@ -271,6 +280,7 @@ mod test {
         let v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
             Pubkey::default(),
             0,
+            0,
             BTreeSet::new(),
             0,
         )));
@@ -293,10 +303,11 @@
             Vote::new(&keypair.pubkey(), test_tx(), timestamp()),
         ));
         verify_signatures(&mut v, &keypair, &wrong_keypair);
-        let btreeset: BTreeSet<u64> = vec![1, 2, 3, 6, 8].into_iter().collect();
+        let btreeset: BTreeSet<Slot> = vec![1, 2, 3, 6, 8].into_iter().collect();
         v = CrdsValue::new_unsigned(CrdsData::EpochSlots(EpochSlots::new(
             keypair.pubkey(),
             0,
+            0,
             btreeset,
             timestamp(),
         )));
diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs
index def7d7ddd..5fdfa8ebf 100644
--- a/core/src/repair_service.rs
+++ b/core/src/repair_service.rs
@@ -35,14 +35,24 @@ pub enum RepairStrategy {
 
 #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq)]
 pub enum RepairType {
-    Orphan(u64),
-    HighestShred(u64, u64),
-    Shred(u64, u64),
+    Orphan(Slot),
+    HighestShred(Slot, u64),
+    Shred(Slot, u64),
+}
+
+impl RepairType {
+    pub fn slot(&self) -> Slot {
+        match self {
+            RepairType::Orphan(slot) => *slot,
+            RepairType::HighestShred(slot, _) => *slot,
+            RepairType::Shred(slot, _) => *slot,
+        }
+    }
 }
 
 pub struct RepairSlotRange {
-    pub start: u64,
-    pub end: u64,
+    pub start: Slot,
+    pub end: Slot,
 }
 
 impl Default for RepairSlotRange {
@@ -106,7 +116,7 @@
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         repair_strategy: RepairStrategy,
     ) {
-        let mut epoch_slots: BTreeSet<u64> = BTreeSet::new();
+        let mut epoch_slots: BTreeSet<Slot> = BTreeSet::new();
         let id = cluster_info.read().unwrap().id();
         let mut current_root = 0;
         if let RepairStrategy::RepairAll {
@@ -144,9 +154,11 @@
                     ..
                 } => {
                     let new_root = blocktree.last_root();
+                    let lowest_slot = blocktree.lowest_slot();
                     Self::update_epoch_slots(
                         id,
                         new_root,
+                        lowest_slot,
                         &mut current_root,
                         &mut epoch_slots,
                         &cluster_info,
@@ -216,7 +228,7 @@
 
     fn generate_repairs(
         blocktree: &Blocktree,
-        root: u64,
+        root: Slot,
         max_repairs: usize,
     ) -> Result<Vec<RepairType>> {
         // Slot height and shred indexes for shreds we want to repair
@@ -289,8 +301,8 @@
 
     fn get_completed_slots_past_root(
         blocktree: &Blocktree,
-        slots_in_gossip: &mut BTreeSet<u64>,
-        root: u64,
+        slots_in_gossip: &mut BTreeSet<Slot>,
+        root: Slot,
         epoch_schedule: &EpochSchedule,
     ) {
         let last_confirmed_epoch = epoch_schedule.get_leader_schedule_epoch(root);
@@ -313,8 +325,8 @@
     fn initialize_epoch_slots(
         id: Pubkey,
         blocktree: &Blocktree,
-        slots_in_gossip: &mut BTreeSet<u64>,
-        root: u64,
+        slots_in_gossip: &mut BTreeSet<Slot>,
+        root: Slot,
         epoch_schedule: &EpochSchedule,
         cluster_info: &RwLock<ClusterInfo>,
     ) {
@@ -324,19 +336,22 @@
         // also be updated with the latest root (done in blocktree_processor) and thus
         // will provide a schedule to window_service for any incoming shreds up to the
         // last_confirmed_epoch.
-        cluster_info
-            .write()
-            .unwrap()
-            .push_epoch_slots(id, root, slots_in_gossip.clone());
+        cluster_info.write().unwrap().push_epoch_slots(
+            id,
+            root,
+            blocktree.lowest_slot(),
+            slots_in_gossip.clone(),
+        );
     }
 
     // Update the gossiped structure used for the "Repairmen" repair protocol. See book
     // for details.
     fn update_epoch_slots(
         id: Pubkey,
-        latest_known_root: u64,
-        prev_root: &mut u64,
-        slots_in_gossip: &mut BTreeSet<u64>,
+        latest_known_root: Slot,
+        lowest_slot: Slot,
+        prev_root: &mut Slot,
+        slots_in_gossip: &mut BTreeSet<Slot>,
         cluster_info: &RwLock<ClusterInfo>,
         completed_slots_receiver: &CompletedSlotsReceiver,
     ) {
@@ -362,12 +377,13 @@
             cluster_info.write().unwrap().push_epoch_slots(
                 id,
                 latest_known_root,
+                lowest_slot,
                 slots_in_gossip.clone(),
             );
         }
     }
 
-    fn retain_slots_greater_than_root(slot_set: &mut BTreeSet<u64>, root: u64) {
+    fn retain_slots_greater_than_root(slot_set: &mut BTreeSet<Slot>, root: Slot) {
         *slot_set = slot_set
             .range((Excluded(&root), Unbounded))
             .cloned()
@@ -732,6 +748,7 @@
             RepairService::update_epoch_slots(
                 Pubkey::default(),
                 root,
+                blocktree.lowest_slot(),
                 &mut root.clone(),
                 &mut completed_slots,
                 &cluster_info,
@@ -749,6 +766,7 @@
             RepairService::update_epoch_slots(
                 Pubkey::default(),
                 root,
+                0,
                 &mut 0,
                 &mut completed_slots,
                 &cluster_info,
@@ -782,6 +800,7 @@
             RepairService::update_epoch_slots(
                 my_pubkey.clone(),
                 current_root,
+                0,
                 &mut current_root.clone(),
                 &mut completed_slots,
                 &cluster_info,
@@ -812,6 +831,7 @@
             RepairService::update_epoch_slots(
                 my_pubkey.clone(),
                 current_root,
+                0,
                 &mut current_root,
                 &mut completed_slots,
                 &cluster_info,
@@ -830,6 +850,7 @@
             RepairService::update_epoch_slots(
                 my_pubkey.clone(),
                 current_root + 1,
+                0,
                 &mut current_root,
                 &mut completed_slots,
                 &cluster_info,
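Note (reviewer sketch, not part of the patch): the new `RepairType::slot()` accessor lets callers such as `ClusterInfo::repair_request` route a request by the slot it targets without matching on the variant themselves. A small usage sketch with the same enum shape; everything around it is illustrative only:

```rust
type Slot = u64;

#[derive(Debug, Clone, Copy)]
enum RepairType {
    Orphan(Slot),
    HighestShred(Slot, u64),
    Shred(Slot, u64),
}

impl RepairType {
    /// Same logic as the patch: every variant carries its slot first.
    fn slot(&self) -> Slot {
        match self {
            RepairType::Orphan(slot) => *slot,
            RepairType::HighestShred(slot, _) => *slot,
            RepairType::Shred(slot, _) => *slot,
        }
    }
}

fn main() {
    let requests = [
        RepairType::Orphan(9),
        RepairType::HighestShred(7, 3),
        RepairType::Shred(7, 11),
    ];
    // Each request can now be matched against peers that still store its slot.
    let slots: Vec<Slot> = requests.iter().map(|r| r.slot()).collect();
    assert_eq!(slots, vec![9, 7, 7]);
}
```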
diff --git a/ledger/src/blocktree.rs b/ledger/src/blocktree.rs
index c9ca6093a..7c9b0dd68 100644
--- a/ledger/src/blocktree.rs
+++ b/ledger/src/blocktree.rs
@@ -72,10 +72,10 @@ pub struct Blocktree {
     data_shred_cf: LedgerColumn<cf::ShredData>,
     code_shred_cf: LedgerColumn<cf::ShredCode>,
     transaction_status_cf: LedgerColumn<cf::TransactionStatus>,
-    last_root: Arc<RwLock<u64>>,
+    last_root: Arc<RwLock<Slot>>,
     insert_shreds_lock: Arc<Mutex<()>>,
     pub new_shreds_signals: Vec<SyncSender<bool>>,
-    pub completed_slots_senders: Vec<SyncSender<Vec<u64>>>,
+    pub completed_slots_senders: Vec<SyncSender<Vec<Slot>>>,
 }
 
 pub struct IndexMetaWorkingSetEntry {
@@ -1452,9 +1452,23 @@
         }
     }
 
-    pub fn last_root(&self) -> u64 {
+    pub fn last_root(&self) -> Slot {
         *self.last_root.read().unwrap()
     }
+
+    // find the first available slot in blocktree that has some data in it
+    pub fn lowest_slot(&self) -> Slot {
+        for (slot, meta) in self
+            .slot_meta_iterator(0)
+            .expect("unable to iterate over meta")
+        {
+            if slot > 0 && meta.received > 0 {
+                return slot;
+            }
+        }
+        // This means blocktree is empty, should never get here aside from right at boot.
+        self.last_root()
+    }
 }
 
 fn update_slot_meta(
@@ -1952,7 +1966,7 @@ macro_rules! create_new_tmp_ledger {
     };
 }
 
-pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: u64) -> bool {
+pub fn verify_shred_slots(slot: Slot, parent_slot: Slot, last_root: Slot) -> bool {
     if !is_valid_write_to_slot_0(slot, parent_slot, last_root) {
         // Check that the parent_slot < slot
         if parent_slot >= slot {
@@ -4341,4 +4355,21 @@
         }
         Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
     }
+
+    #[test]
+    fn test_lowest_slot() {
+        let blocktree_path = get_tmp_ledger_path!();
+        {
+            let blocktree = Blocktree::open(&blocktree_path).unwrap();
+            for i in 0..10 {
+                let slot = i;
+                let (shreds, _) = make_slot_entries(slot, 0, 1);
+                blocktree.insert_shreds(shreds, None, false).unwrap();
+            }
+            assert_eq!(blocktree.lowest_slot(), 1);
+            blocktree.run_purge_batch(0, 5).unwrap();
+            assert_eq!(blocktree.lowest_slot(), 6);
+        }
+        Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
+    }
 }
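Note (reviewer sketch, not part of the patch): `Blocktree::lowest_slot()` scans slot metadata from slot 0 upward and returns the first slot above 0 that has received any shreds, falling back to the last root when the ledger is empty. A simplified model over plain `(slot, meta)` pairs; `SlotMeta` here is a hypothetical stand-in carrying only the `received` field:

```rust
type Slot = u64;

/// Hypothetical stand-in for the per-slot metadata the real method consults.
struct SlotMeta {
    received: u64,
}

/// First slot above 0 that holds any data; otherwise fall back to `last_root`,
/// as the patch does for an empty ledger. `metas` is assumed sorted by slot.
fn lowest_slot(metas: &[(Slot, SlotMeta)], last_root: Slot) -> Slot {
    for (slot, meta) in metas {
        if *slot > 0 && meta.received > 0 {
            return *slot;
        }
    }
    last_root
}

fn main() {
    // Slots 0..=5 were purged; slot 6 is the first one still holding data.
    let metas = vec![(6, SlotMeta { received: 12 }), (7, SlotMeta { received: 9 })];
    assert_eq!(lowest_slot(&metas, 5), 6);
    assert_eq!(lowest_slot(&[], 5), 5); // empty ledger: fall back to the last root
}
```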