diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 8670202f6e..3ab86deaf1 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -38,6 +38,20 @@ impl RepairInfo { } } +pub struct RepairSlotRange { + pub start: u64, + pub end: u64, +} + +impl Default for RepairSlotRange { + fn default() -> Self { + RepairSlotRange { + start: 0, + end: std::u64::MAX, + } + } +} + pub struct RepairService { t_repair: JoinHandle<()>, } @@ -48,6 +62,7 @@ impl RepairService { exit: Arc, repair_socket: &Arc, cluster_info: &Arc>, + repair_slot_range: RepairSlotRange, ) { let mut repair_info = RepairInfo::new(); let id = cluster_info.read().unwrap().id(); @@ -56,7 +71,12 @@ impl RepairService { break; } - let repairs = Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH, &mut repair_info); + let repairs = Self::generate_repairs( + blocktree, + MAX_REPAIR_LENGTH, + &mut repair_info, + &repair_slot_range, + ); if let Ok(repairs) = repairs { let reqs: Vec<_> = repairs @@ -112,11 +132,20 @@ impl RepairService { exit: &Arc, repair_socket: Arc, cluster_info: Arc>, + repair_slot_range: RepairSlotRange, ) -> Self { let exit = exit.clone(); let t_repair = Builder::new() .name("solana-repair-service".to_string()) - .spawn(move || Self::run(&blocktree, exit, &repair_socket, &cluster_info)) + .spawn(move || { + Self::run( + &blocktree, + exit, + &repair_socket, + &cluster_info, + repair_slot_range, + ) + }) .unwrap(); RepairService { t_repair } @@ -150,19 +179,22 @@ impl RepairService { blocktree: &Blocktree, max_repairs: usize, repair_info: &mut RepairInfo, + repair_range: &RepairSlotRange, ) -> Result<(Vec)> { // Slot height and blob indexes for blobs we want to repair let mut repairs: Vec = vec![]; - let mut current_slot = Some(0); + let mut current_slot = Some(repair_range.start); while repairs.len() < max_repairs && current_slot.is_some() { + if current_slot.unwrap() > repair_range.end { + break; + } + if current_slot.unwrap() > repair_info.max_slot { repair_info.repair_tries = 0; repair_info.max_slot = current_slot.unwrap(); } - let slot = blocktree.meta(current_slot.unwrap())?; - if slot.is_some() { - let slot = slot.unwrap(); + if let Some(slot) = blocktree.meta(current_slot.unwrap())? { let new_repairs = Self::process_slot( blocktree, current_slot.unwrap(), @@ -220,6 +252,7 @@ mod test { blocktree.write_blobs(&blobs).unwrap(); let mut repair_info = RepairInfo::new(); + let repair_slot_range = RepairSlotRange::default(); // We have all the blobs for all the slots in the ledger, wait for optimistic // future repair after MAX_REPAIR_TRIES for i in 0..MAX_REPAIR_TRIES { @@ -232,7 +265,13 @@ mod test { vec![] }; assert_eq!( - RepairService::generate_repairs(&blocktree, 2, &mut repair_info).unwrap(), + RepairService::generate_repairs( + &blocktree, + 2, + &mut repair_info, + &repair_slot_range + ) + .unwrap(), expected ); } @@ -245,7 +284,13 @@ mod test { blocktree.write_blobs(&blobs).unwrap(); assert_eq!( - RepairService::generate_repairs(&blocktree, 2, &mut repair_info).unwrap(), + RepairService::generate_repairs( + &blocktree, + 2, + &mut repair_info, + &repair_slot_range + ) + .unwrap(), vec![] ); assert_eq!(repair_info.repair_tries, 1); @@ -272,7 +317,13 @@ mod test { blocktree.write_blobs(&blobs).unwrap(); // Check that repair tries to patch the empty slot assert_eq!( - RepairService::generate_repairs(&blocktree, 2, &mut repair_info).unwrap(), + RepairService::generate_repairs( + &blocktree, + 2, + &mut repair_info, + &RepairSlotRange::default() + ) + .unwrap(), vec![RepairType::HighestBlob(0, 0), RepairType::Blob(2, 0)] ); } @@ -313,15 +364,27 @@ mod test { .collect(); // Across all slots, find all missing indexes in the range [0, num_entries_per_slot] + let repair_slot_range = RepairSlotRange::default(); + assert_eq!( - RepairService::generate_repairs(&blocktree, std::usize::MAX, &mut repair_info) - .unwrap(), + RepairService::generate_repairs( + &blocktree, + std::usize::MAX, + &mut repair_info, + &repair_slot_range + ) + .unwrap(), expected ); assert_eq!( - RepairService::generate_repairs(&blocktree, expected.len() - 2, &mut repair_info) - .unwrap()[..], + RepairService::generate_repairs( + &blocktree, + expected.len() - 2, + &mut repair_info, + &repair_slot_range + ) + .unwrap()[..], expected[0..expected.len() - 2] ); } @@ -349,12 +412,61 @@ mod test { // We didn't get the last blob for this slot, so ask for the highest blob for that slot let expected: Vec = vec![RepairType::HighestBlob(0, num_entries_per_slot)]; + let repair_slot_range = RepairSlotRange::default(); + assert_eq!( - RepairService::generate_repairs(&blocktree, std::usize::MAX, &mut repair_info) - .unwrap(), + RepairService::generate_repairs( + &blocktree, + std::usize::MAX, + &mut repair_info, + &repair_slot_range + ) + .unwrap(), expected ); } Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } + + #[test] + pub fn test_repair_range() { + let blocktree_path = get_tmp_ledger_path!(); + { + let blocktree = Blocktree::open(&blocktree_path).unwrap(); + + let num_entries_per_slot = 10; + + let mut repair_info = RepairInfo::new(); + + let num_slots = 1; + let start = 5; + // Create some blobs in slots 0..num_slots + for i in start..start + num_slots { + let parent = if i > 0 { i - 1 } else { 0 }; + let (blobs, _) = make_slot_entries(i, parent, num_entries_per_slot as u64); + + blocktree.write_blobs(&blobs).unwrap(); + } + + let end = 4; + let expected: Vec = vec![RepairType::HighestBlob(end, 0)]; + + let mut repair_slot_range = RepairSlotRange::default(); + repair_slot_range.start = 2; + repair_slot_range.end = end; + + assert_eq!( + RepairService::generate_repairs( + &blocktree, + std::usize::MAX, + &mut repair_info, + &repair_slot_range + ) + .unwrap(), + expected + ); + } + Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); + } + } diff --git a/core/src/replicator.rs b/core/src/replicator.rs index f2bfa5dcd1..3bc3e73090 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -6,6 +6,7 @@ use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE}; use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE}; use crate::contact_info::ContactInfo; use crate::gossip_service::GossipService; +use crate::repair_service::RepairSlotRange; use crate::result::Result; use crate::service::Service; use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT}; @@ -156,6 +157,9 @@ impl Replicator { let signature = keypair.sign(storage_blockhash.as_ref()); let entry_height = get_entry_heights_from_blockhash(&signature, storage_entry_height); + let mut repair_slot_range = RepairSlotRange::default(); + repair_slot_range.end = entry_height; + repair_slot_range.start = entry_height - ENTRIES_PER_SEGMENT; info!("replicating entry_height: {}", entry_height); @@ -176,6 +180,7 @@ impl Replicator { retransmit_sender, repair_socket, &exit, + repair_slot_range, ); info!("window created, waiting for ledger download done"); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 4832daffa3..ccbf98e15f 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -7,6 +7,7 @@ use crate::cluster_info::{ NEIGHBORHOOD_SIZE, }; use crate::packet::SharedBlob; +use crate::repair_service::RepairSlotRange; use crate::result::{Error, Result}; use crate::service::Service; use crate::staking_utils; @@ -130,6 +131,7 @@ impl RetransmitStage { retransmit_sender, repair_socket, exit, + RepairSlotRange::default(), ); let thread_hdls = vec![t_retransmit]; diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 37c8ac1bbd..f9bd1212f1 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -3,7 +3,7 @@ use crate::blocktree::Blocktree; use crate::cluster_info::ClusterInfo; use crate::db_window::*; -use crate::repair_service::RepairService; +use crate::repair_service::{RepairService, RepairSlotRange}; use crate::result::{Error, Result}; use crate::service::Service; use crate::streamer::{BlobReceiver, BlobSender}; @@ -103,9 +103,15 @@ impl WindowService { retransmit: BlobSender, repair_socket: Arc, exit: &Arc, + repair_slot_range: RepairSlotRange, ) -> WindowService { - let repair_service = - RepairService::new(blocktree.clone(), exit, repair_socket, cluster_info.clone()); + let repair_service = RepairService::new( + blocktree.clone(), + exit, + repair_socket, + cluster_info.clone(), + repair_slot_range, + ); let exit = exit.clone(); let t_window = Builder::new() .name("solana-window".to_string()) @@ -153,6 +159,7 @@ mod test { use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, Node}; use crate::entry::make_consecutive_blobs; + use crate::repair_service::RepairSlotRange; use crate::service::Service; use crate::streamer::{blob_receiver, responder}; use crate::window_service::WindowService; @@ -190,6 +197,7 @@ mod test { s_retransmit, Arc::new(leader_node.sockets.repair), &exit, + RepairSlotRange::default(), ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -261,6 +269,7 @@ mod test { s_retransmit, Arc::new(leader_node.sockets.repair), &exit, + RepairSlotRange::default(), ); let t_responder = { let (s_responder, r_responder) = channel();