Add repair slot range

Use default impl RepairSlotRange
This commit is contained in:
Stephen Akridge 2019-02-12 17:43:45 -08:00 committed by sakridge
parent 5d27f221f7
commit ec9e13d1f4
4 changed files with 146 additions and 18 deletions

View File

@ -38,6 +38,20 @@ impl RepairInfo {
}
}
pub struct RepairSlotRange {
pub start: u64,
pub end: u64,
}
impl Default for RepairSlotRange {
fn default() -> Self {
RepairSlotRange {
start: 0,
end: std::u64::MAX,
}
}
}
pub struct RepairService {
t_repair: JoinHandle<()>,
}
@ -48,6 +62,7 @@ impl RepairService {
exit: Arc<AtomicBool>,
repair_socket: &Arc<UdpSocket>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
repair_slot_range: RepairSlotRange,
) {
let mut repair_info = RepairInfo::new();
let id = cluster_info.read().unwrap().id();
@ -56,7 +71,12 @@ impl RepairService {
break;
}
let repairs = Self::generate_repairs(blocktree, MAX_REPAIR_LENGTH, &mut repair_info);
let repairs = Self::generate_repairs(
blocktree,
MAX_REPAIR_LENGTH,
&mut repair_info,
&repair_slot_range,
);
if let Ok(repairs) = repairs {
let reqs: Vec<_> = repairs
@ -112,11 +132,20 @@ impl RepairService {
exit: &Arc<AtomicBool>,
repair_socket: Arc<UdpSocket>,
cluster_info: Arc<RwLock<ClusterInfo>>,
repair_slot_range: RepairSlotRange,
) -> Self {
let exit = exit.clone();
let t_repair = Builder::new()
.name("solana-repair-service".to_string())
.spawn(move || Self::run(&blocktree, exit, &repair_socket, &cluster_info))
.spawn(move || {
Self::run(
&blocktree,
exit,
&repair_socket,
&cluster_info,
repair_slot_range,
)
})
.unwrap();
RepairService { t_repair }
@ -150,19 +179,22 @@ impl RepairService {
blocktree: &Blocktree,
max_repairs: usize,
repair_info: &mut RepairInfo,
repair_range: &RepairSlotRange,
) -> Result<(Vec<RepairType>)> {
// Slot height and blob indexes for blobs we want to repair
let mut repairs: Vec<RepairType> = vec![];
let mut current_slot = Some(0);
let mut current_slot = Some(repair_range.start);
while repairs.len() < max_repairs && current_slot.is_some() {
if current_slot.unwrap() > repair_range.end {
break;
}
if current_slot.unwrap() > repair_info.max_slot {
repair_info.repair_tries = 0;
repair_info.max_slot = current_slot.unwrap();
}
let slot = blocktree.meta(current_slot.unwrap())?;
if slot.is_some() {
let slot = slot.unwrap();
if let Some(slot) = blocktree.meta(current_slot.unwrap())? {
let new_repairs = Self::process_slot(
blocktree,
current_slot.unwrap(),
@ -220,6 +252,7 @@ mod test {
blocktree.write_blobs(&blobs).unwrap();
let mut repair_info = RepairInfo::new();
let repair_slot_range = RepairSlotRange::default();
// We have all the blobs for all the slots in the ledger, wait for optimistic
// future repair after MAX_REPAIR_TRIES
for i in 0..MAX_REPAIR_TRIES {
@ -232,7 +265,13 @@ mod test {
vec![]
};
assert_eq!(
RepairService::generate_repairs(&blocktree, 2, &mut repair_info).unwrap(),
RepairService::generate_repairs(
&blocktree,
2,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
expected
);
}
@ -245,7 +284,13 @@ mod test {
blocktree.write_blobs(&blobs).unwrap();
assert_eq!(
RepairService::generate_repairs(&blocktree, 2, &mut repair_info).unwrap(),
RepairService::generate_repairs(
&blocktree,
2,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
vec![]
);
assert_eq!(repair_info.repair_tries, 1);
@ -272,7 +317,13 @@ mod test {
blocktree.write_blobs(&blobs).unwrap();
// Check that repair tries to patch the empty slot
assert_eq!(
RepairService::generate_repairs(&blocktree, 2, &mut repair_info).unwrap(),
RepairService::generate_repairs(
&blocktree,
2,
&mut repair_info,
&RepairSlotRange::default()
)
.unwrap(),
vec![RepairType::HighestBlob(0, 0), RepairType::Blob(2, 0)]
);
}
@ -313,15 +364,27 @@ mod test {
.collect();
// Across all slots, find all missing indexes in the range [0, num_entries_per_slot]
let repair_slot_range = RepairSlotRange::default();
assert_eq!(
RepairService::generate_repairs(&blocktree, std::usize::MAX, &mut repair_info)
.unwrap(),
RepairService::generate_repairs(
&blocktree,
std::usize::MAX,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
expected
);
assert_eq!(
RepairService::generate_repairs(&blocktree, expected.len() - 2, &mut repair_info)
.unwrap()[..],
RepairService::generate_repairs(
&blocktree,
expected.len() - 2,
&mut repair_info,
&repair_slot_range
)
.unwrap()[..],
expected[0..expected.len() - 2]
);
}
@ -349,12 +412,61 @@ mod test {
// We didn't get the last blob for this slot, so ask for the highest blob for that slot
let expected: Vec<RepairType> = vec![RepairType::HighestBlob(0, num_entries_per_slot)];
let repair_slot_range = RepairSlotRange::default();
assert_eq!(
RepairService::generate_repairs(&blocktree, std::usize::MAX, &mut repair_info)
.unwrap(),
RepairService::generate_repairs(
&blocktree,
std::usize::MAX,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
expected
);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
#[test]
pub fn test_repair_range() {
let blocktree_path = get_tmp_ledger_path!();
{
let blocktree = Blocktree::open(&blocktree_path).unwrap();
let num_entries_per_slot = 10;
let mut repair_info = RepairInfo::new();
let num_slots = 1;
let start = 5;
// Create some blobs in slots 0..num_slots
for i in start..start + num_slots {
let parent = if i > 0 { i - 1 } else { 0 };
let (blobs, _) = make_slot_entries(i, parent, num_entries_per_slot as u64);
blocktree.write_blobs(&blobs).unwrap();
}
let end = 4;
let expected: Vec<RepairType> = vec![RepairType::HighestBlob(end, 0)];
let mut repair_slot_range = RepairSlotRange::default();
repair_slot_range.start = 2;
repair_slot_range.end = end;
assert_eq!(
RepairService::generate_repairs(
&blocktree,
std::usize::MAX,
&mut repair_info,
&repair_slot_range
)
.unwrap(),
expected
);
}
Blocktree::destroy(&blocktree_path).expect("Expected successful database destruction");
}
}

View File

@ -6,6 +6,7 @@ use crate::chacha::{chacha_cbc_encrypt_ledger, CHACHA_BLOCK_SIZE};
use crate::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
use crate::contact_info::ContactInfo;
use crate::gossip_service::GossipService;
use crate::repair_service::RepairSlotRange;
use crate::result::Result;
use crate::service::Service;
use crate::storage_stage::{get_segment_from_entry, ENTRIES_PER_SEGMENT};
@ -156,6 +157,9 @@ impl Replicator {
let signature = keypair.sign(storage_blockhash.as_ref());
let entry_height = get_entry_heights_from_blockhash(&signature, storage_entry_height);
let mut repair_slot_range = RepairSlotRange::default();
repair_slot_range.end = entry_height;
repair_slot_range.start = entry_height - ENTRIES_PER_SEGMENT;
info!("replicating entry_height: {}", entry_height);
@ -176,6 +180,7 @@ impl Replicator {
retransmit_sender,
repair_socket,
&exit,
repair_slot_range,
);
info!("window created, waiting for ledger download done");

View File

@ -7,6 +7,7 @@ use crate::cluster_info::{
NEIGHBORHOOD_SIZE,
};
use crate::packet::SharedBlob;
use crate::repair_service::RepairSlotRange;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::staking_utils;
@ -130,6 +131,7 @@ impl RetransmitStage {
retransmit_sender,
repair_socket,
exit,
RepairSlotRange::default(),
);
let thread_hdls = vec![t_retransmit];

View File

@ -3,7 +3,7 @@
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::db_window::*;
use crate::repair_service::RepairService;
use crate::repair_service::{RepairService, RepairSlotRange};
use crate::result::{Error, Result};
use crate::service::Service;
use crate::streamer::{BlobReceiver, BlobSender};
@ -103,9 +103,15 @@ impl WindowService {
retransmit: BlobSender,
repair_socket: Arc<UdpSocket>,
exit: &Arc<AtomicBool>,
repair_slot_range: RepairSlotRange,
) -> WindowService {
let repair_service =
RepairService::new(blocktree.clone(), exit, repair_socket, cluster_info.clone());
let repair_service = RepairService::new(
blocktree.clone(),
exit,
repair_socket,
cluster_info.clone(),
repair_slot_range,
);
let exit = exit.clone();
let t_window = Builder::new()
.name("solana-window".to_string())
@ -153,6 +159,7 @@ mod test {
use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, Node};
use crate::entry::make_consecutive_blobs;
use crate::repair_service::RepairSlotRange;
use crate::service::Service;
use crate::streamer::{blob_receiver, responder};
use crate::window_service::WindowService;
@ -190,6 +197,7 @@ mod test {
s_retransmit,
Arc::new(leader_node.sockets.repair),
&exit,
RepairSlotRange::default(),
);
let t_responder = {
let (s_responder, r_responder) = channel();
@ -261,6 +269,7 @@ mod test {
s_retransmit,
Arc::new(leader_node.sockets.repair),
&exit,
RepairSlotRange::default(),
);
let t_responder = {
let (s_responder, r_responder) = channel();