Support old repair strategy for reparing slots in a range for supporting replicators (#3665)

This commit is contained in:
carllin 2019-04-08 12:46:23 -07:00 committed by GitHub
parent e551f6b552
commit 483cc2fa4e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 105 additions and 29 deletions

View File

@ -26,6 +26,21 @@ pub enum RepairType {
Blob(u64, u64),
}
#[derive(Default)]
struct RepairInfo {
max_slot: u64,
repair_tries: u64,
}
impl RepairInfo {
fn new() -> Self {
RepairInfo {
max_slot: 0,
repair_tries: 0,
}
}
}
pub struct RepairSlotRange {
pub start: u64,
pub end: u64,
@ -50,15 +65,28 @@ impl RepairService {
exit: Arc<AtomicBool>,
repair_socket: &Arc<UdpSocket>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
_repair_slot_range: RepairSlotRange,
repair_slot_range: Option<RepairSlotRange>,
) {
let mut repair_info = RepairInfo::new();
let id = cluster_info.read().unwrap().id();
loop {
if exit.load(Ordering::Relaxed) {
break;
}
let repairs = Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH);
let repairs = {
if let Some(ref repair_slot_range) = repair_slot_range {
// Strategy used by replicators
Self::generate_repairs_in_range(
blocktree,
MAX_REPAIR_LENGTH,
&mut repair_info,
repair_slot_range,
)
} else {
Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH)
}
};
if let Ok(repairs) = repairs {
let reqs: Vec<_> = repairs
@ -106,7 +134,7 @@ impl RepairService {
exit: &Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
repair_slot_range: RepairSlotRange,
repair_slot_range: Option<RepairSlotRange>,
) -> Self {
let exit = exit.clone();
let t_repair = Builder::new()
@ -125,6 +153,66 @@ impl RepairService {
RepairService { t_repair }
}
fn generate_repairs_in_range(
blocktree: &Blocktree,
max_repairs: usize,
repair_info: &mut RepairInfo,
repair_range: &RepairSlotRange,
) -> Result<(Vec<RepairType>)> {
// Slot height and blob indexes for blobs we want to repair
let mut repairs: Vec<RepairType> = vec![];
let mut current_slot = Some(repair_range.start);
while repairs.len() < max_repairs && current_slot.is_some() {
if current_slot.unwrap() > repair_range.end {
break;
}
if current_slot.unwrap() > repair_info.max_slot {
repair_info.repair_tries = 0;
repair_info.max_slot = current_slot.unwrap();
}
if let Some(slot) = blocktree.meta(current_slot.unwrap())? {
let new_repairs = Self::generate_repairs_for_slot(
blocktree,
current_slot.unwrap(),
&slot,
max_repairs - repairs.len(),
);
repairs.extend(new_repairs);
}
current_slot = blocktree.get_next_slot(current_slot.unwrap())?;
}
// Only increment repair_tries if the ledger contains every blob for every slot
if repairs.is_empty() {
repair_info.repair_tries += 1;
}
// Optimistically try the next slot if we haven't gotten any repairs
// for a while
if repair_info.repair_tries >= MAX_REPAIR_TRIES {
repairs.push(RepairType::HighestBlob(repair_info.max_slot + 1, 0))
}
Ok(repairs)
}
fn generate_repairs(blocktree: &Blocktree, max_repairs: usize) -> Result<(Vec<RepairType>)> {
// Slot height and blob indexes for blobs we want to repair
let mut repairs: Vec<RepairType> = vec![];
let slot = *blocktree.root_slot.read().unwrap();
Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, slot);
// TODO: Incorporate gossip to determine priorities for repair?
// Try to resolve orphans in blocktree
let orphans = blocktree.get_orphans(Some(MAX_ORPHANS));
Self::generate_repairs_for_orphans(&orphans[..], &mut repairs);
Ok(repairs)
}
fn generate_repairs_for_slot(
blocktree: &Blocktree,
slot: u64,
@ -149,21 +237,6 @@ impl RepairService {
}
}
fn generate_repairs(blocktree: &Blocktree, max_repairs: usize) -> Result<(Vec<RepairType>)> {
// Slot height and blob indexes for blobs we want to repair
let mut repairs: Vec<RepairType> = vec![];
let slot = *blocktree.root_slot.read().unwrap();
Self::generate_repairs_for_fork(blocktree, &mut repairs, max_repairs, slot);
// TODO: Incorporate gossip to determine priorities for repair?
// Try to resolve orphans in blocktree
let orphans = blocktree.get_orphans(Some(MAX_ORPHANS));
Self::generate_repairs_for_orphans(&orphans[..], &mut repairs);
Ok(repairs)
}
fn generate_repairs_for_orphans(orphans: &[u64], repairs: &mut Vec<RepairType>) {
repairs.extend(orphans.iter().map(|h| RepairType::Orphan(*h)));
}
@ -325,7 +398,7 @@ mod test {
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
/*#[test]
#[test]
pub fn test_repair_range() {
let blocktree_path = get_tmp_ledger_path!();
{
@ -353,11 +426,16 @@ mod test {
repair_slot_range.end = end;
assert_eq!(
RepairService::generate_repairs(&blocktree, std::usize::MAX, &mut repair_info,)
.unwrap(),
RepairService::generate_repairs_in_range(
&blocktree,
std::usize::MAX,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
expected
);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}*/
}
}

View File

@ -244,7 +244,7 @@ impl Replicator {
retransmit_sender,
repair_socket,
&exit,
repair_slot_range,
Some(repair_slot_range),
);
let client = create_client(cluster_entrypoint.client_facing_addr(), FULLNODE_PORT_RANGE);

View File

@ -7,7 +7,6 @@ use crate::cluster_info::{
NEIGHBORHOOD_SIZE,
};
use crate::packet::SharedBlob;
use crate::repair_service::RepairSlotRange;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::staking_utils;
@ -131,7 +130,7 @@ impl RetransmitStage {
retransmit_sender,
repair_socket,
exit,
RepairSlotRange::default(),
None,
);
let thread_hdls = vec![t_retransmit];

View File

@ -103,7 +103,7 @@ impl WindowService {
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
exit: &Arc<AtomicBool>,
repair_slot_range: RepairSlotRange,
repair_slot_range: Option<RepairSlotRange>,
) -> WindowService {
let repair_service = RepairService::new(
blocktree.clone(),
@ -159,7 +159,6 @@ mod test {
use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, Node};
use crate::entry::make_consecutive_blobs;
use crate::repair_service::RepairSlotRange;
use crate::service::Service;
use crate::streamer::{blob_receiver, responder};
use crate::window_service::WindowService;
@ -197,7 +196,7 @@ mod test {
s_retransmit,
Arc::new(leader_node.sockets.repair),
&exit,
RepairSlotRange::default(),
None,
);
let t_responder = {
let (s_responder, r_responder) = channel();
@ -269,7 +268,7 @@ mod test {
s_retransmit,
Arc::new(leader_node.sockets.repair),
&exit,
RepairSlotRange::default(),
None,
);
let t_responder = {
let (s_responder, r_responder) = channel();