From 483cc2fa4e90c22a55752ffb4cade81c728cec53 Mon Sep 17 00:00:00 2001 From: carllin Date: Mon, 8 Apr 2019 12:46:23 -0700 Subject: [PATCH] Support old repair strategy for reparing slots in a range for supporting replicators (#3665) --- core/src/repair_service.rs | 122 ++++++++++++++++++++++++++++------- core/src/replicator.rs | 2 +- core/src/retransmit_stage.rs | 3 +- core/src/window_service.rs | 7 +- 4 files changed, 105 insertions(+), 29 deletions(-) diff --git a/core/src/repair_service.rs b/core/src/repair_service.rs index 9cea0304cb..896c8e2ec1 100644 --- a/core/src/repair_service.rs +++ b/core/src/repair_service.rs @@ -26,6 +26,21 @@ pub enum RepairType { Blob(u64, u64), } +#[derive(Default)] +struct RepairInfo { + max_slot: u64, + repair_tries: u64, +} + +impl RepairInfo { + fn new() -> Self { + RepairInfo { + max_slot: 0, + repair_tries: 0, + } + } +} + pub struct RepairSlotRange { pub start: u64, pub end: u64, @@ -50,15 +65,28 @@ impl RepairService { exit: Arc, repair_socket: &Arc, cluster_info: &Arc>, - _repair_slot_range: RepairSlotRange, + repair_slot_range: Option, ) { + let mut repair_info = RepairInfo::new(); let id = cluster_info.read().unwrap().id(); loop { if exit.load(Ordering::Relaxed) { break; } - let repairs = Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH); + let repairs = { + if let Some(ref repair_slot_range) = repair_slot_range { + // Strategy used by replicators + Self::generate_repairs_in_range( + blocktree, + MAX_REPAIR_LENGTH, + &mut repair_info, + repair_slot_range, + ) + } else { + Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH) + } + }; if let Ok(repairs) = repairs { let reqs: Vec<_> = repairs @@ -106,7 +134,7 @@ impl RepairService { exit: &Arc, repair_socket: Arc, cluster_info: Arc>, - repair_slot_range: RepairSlotRange, + repair_slot_range: Option, ) -> Self { let exit = exit.clone(); let t_repair = Builder::new() @@ -125,6 +153,66 @@ impl RepairService { RepairService { t_repair } } + fn generate_repairs_in_range( + blocktree: &Blocktree, + max_repairs: usize, + repair_info: &mut RepairInfo, + repair_range: &RepairSlotRange, + ) -> Result<(Vec)> { + // Slot height and blob indexes for blobs we want to repair + let mut repairs: Vec = vec![]; + let mut current_slot = Some(repair_range.start); + while repairs.len() < max_repairs && current_slot.is_some() { + if current_slot.unwrap() > repair_range.end { + break; + } + + if current_slot.unwrap() > repair_info.max_slot { + repair_info.repair_tries = 0; + repair_info.max_slot = current_slot.unwrap(); + } + + if let Some(slot) = blocktree.meta(current_slot.unwrap())? { + let new_repairs = Self::generate_repairs_for_slot( + blocktree, + current_slot.unwrap(), + &slot, + max_repairs - repairs.len(), + ); + repairs.extend(new_repairs); + } + current_slot = blocktree.get_next_slot(current_slot.unwrap())?; + } + + // Only increment repair_tries if the ledger contains every blob for every slot + if repairs.is_empty() { + repair_info.repair_tries += 1; + } + + // Optimistically try the next slot if we haven't gotten any repairs + // for a while + if repair_info.repair_tries >= MAX_REPAIR_TRIES { + repairs.push(RepairType::HighestBlob(repair_info.max_slot + 1, 0)) + } + + Ok(repairs) + } + + fn generate_repairs(blocktree: &Blocktree, max_repairs: usize) -> Result<(Vec)> { + // Slot height and blob indexes for blobs we want to repair + let mut repairs: Vec = vec![]; + let slot = *blocktree.root_slot.read().unwrap(); + Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, slot); + + // TODO: Incorporate gossip to determine priorities for repair? + + // Try to resolve orphans in blocktree + let orphans = blocktree.get_orphans(Some(MAX_ORPHANS)); + + Self::generate_repairs_for_orphans(&orphans[..], &mut repairs); + Ok(repairs) + } + fn generate_repairs_for_slot( blocktree: &Blocktree, slot: u64, @@ -149,21 +237,6 @@ impl RepairService { } } - fn generate_repairs(blocktree: &Blocktree, max_repairs: usize) -> Result<(Vec)> { - // Slot height and blob indexes for blobs we want to repair - let mut repairs: Vec = vec![]; - let slot = *blocktree.root_slot.read().unwrap(); - Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, slot); - - // TODO: Incorporate gossip to determine priorities for repair? - - // Try to resolve orphans in blocktree - let orphans = blocktree.get_orphans(Some(MAX_ORPHANS)); - - Self::generate_repairs_for_orphans(&orphans[..], &mut repairs); - Ok(repairs) - } - fn generate_repairs_for_orphans(orphans: &[u64], repairs: &mut Vec) { repairs.extend(orphans.iter().map(|h| RepairType::Orphan(*h))); } @@ -325,7 +398,7 @@ mod test { Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); } - /*#[test] + #[test] pub fn test_repair_range() { let blocktree_path = get_tmp_ledger_path!(); { @@ -353,11 +426,16 @@ mod test { repair_slot_range.end = end; assert_eq!( - RepairService::generate_repairs(&blocktree, std::usize::MAX, &mut repair_info,) - .unwrap(), + RepairService::generate_repairs_in_range( + &blocktree, + std::usize::MAX, + &mut repair_info, + &repair_slot_range + ) + .unwrap(), expected ); } Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction"); - }*/ + } } diff --git a/core/src/replicator.rs b/core/src/replicator.rs index 444faf9a10..c8c22f15ab 100644 --- a/core/src/replicator.rs +++ b/core/src/replicator.rs @@ -244,7 +244,7 @@ impl Replicator { retransmit_sender, repair_socket, &exit, - repair_slot_range, + Some(repair_slot_range), ); let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index ccbf98e15f..f26d3078ae 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -7,7 +7,6 @@ use crate::cluster_info::{ NEIGHBORHOOD_SIZE, }; use crate::packet::SharedBlob; -use crate::repair_service::RepairSlotRange; use crate::result::{Error, Result}; use crate::service::Service; use crate::staking_utils; @@ -131,7 +130,7 @@ impl RetransmitStage { retransmit_sender, repair_socket, exit, - RepairSlotRange::default(), + None, ); let thread_hdls = vec![t_retransmit]; diff --git a/core/src/window_service.rs b/core/src/window_service.rs index f9bd1212f1..c705743eca 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -103,7 +103,7 @@ impl WindowService { retransmit: BlobSender, repair_socket: Arc, exit: &Arc, - repair_slot_range: RepairSlotRange, + repair_slot_range: Option, ) -> WindowService { let repair_service = RepairService::new( blocktree.clone(), @@ -159,7 +159,6 @@ mod test { use crate::blocktree::Blocktree; use crate::cluster_info::{ClusterInfo, Node}; use crate::entry::make_consecutive_blobs; - use crate::repair_service::RepairSlotRange; use crate::service::Service; use crate::streamer::{blob_receiver, responder}; use crate::window_service::WindowService; @@ -197,7 +196,7 @@ mod test { s_retransmit, Arc::new(leader_node.sockets.repair), &exit, - RepairSlotRange::default(), + None, ); let t_responder = { let (s_responder, r_responder) = channel(); @@ -269,7 +268,7 @@ mod test { s_retransmit, Arc::new(leader_node.sockets.repair), &exit, - RepairSlotRange::default(), + None, ); let t_responder = { let (s_responder, r_responder) = channel();