Add threshold to repairman for same slot (#6728)
This commit is contained in:
parent
46391397b8
commit
8f91b5aab3
|
@ -8,7 +8,6 @@ use rand::SeedableRng;
|
||||||
use rand_chacha::ChaChaRng;
|
use rand_chacha::ChaChaRng;
|
||||||
use solana_ledger::blocktree::Blocktree;
|
use solana_ledger::blocktree::Blocktree;
|
||||||
use solana_ledger::rooted_slot_iterator::RootedSlotIterator;
|
use solana_ledger::rooted_slot_iterator::RootedSlotIterator;
|
||||||
use solana_metrics::datapoint;
|
|
||||||
use solana_sdk::{epoch_schedule::EpochSchedule, pubkey::Pubkey};
|
use solana_sdk::{epoch_schedule::EpochSchedule, pubkey::Pubkey};
|
||||||
use std::{
|
use std::{
|
||||||
cmp,
|
cmp,
|
||||||
|
@ -27,6 +26,9 @@ pub const REPAIR_REDUNDANCY: usize = 1;
|
||||||
pub const NUM_BUFFER_SLOTS: usize = 50;
|
pub const NUM_BUFFER_SLOTS: usize = 50;
|
||||||
pub const GOSSIP_DELAY_SLOTS: usize = 2;
|
pub const GOSSIP_DELAY_SLOTS: usize = 2;
|
||||||
pub const NUM_SLOTS_PER_UPDATE: usize = 2;
|
pub const NUM_SLOTS_PER_UPDATE: usize = 2;
|
||||||
|
// Minimum time (in the units returned by `timestamp()`) that must elapse before the same slot is repaired again for the same validator
|
||||||
|
pub const REPAIR_SAME_SLOT_THRESHOLD: u64 = 5000;
|
||||||
|
use solana_sdk::timing::timestamp;
|
||||||
|
|
||||||
// Represents the blobs that a repairman is responsible for repairing in a specific slot. More
|
// Represents the blobs that a repairman is responsible for repairing in a specific slot. More
|
||||||
// specifically, a repairman is responsible for every blob in this slot with index
|
// specifically, a repairman is responsible for every blob in this slot with index
|
||||||
|
@ -73,6 +75,13 @@ impl Iterator for BlobIndexesToRepairIterator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct RepaireeInfo {
|
||||||
|
last_root: u64,
|
||||||
|
last_ts: u64,
|
||||||
|
last_repaired_slot_and_ts: (u64, u64),
|
||||||
|
}
|
||||||
|
|
||||||
pub struct ClusterInfoRepairListener {
|
pub struct ClusterInfoRepairListener {
|
||||||
thread_hdls: Vec<JoinHandle<()>>,
|
thread_hdls: Vec<JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
@ -93,10 +102,10 @@ impl ClusterInfoRepairListener {
|
||||||
// 1) The latest timestamp of the EpochSlots gossip message at which a repair was
|
// 1) The latest timestamp of the EpochSlots gossip message at which a repair was
|
||||||
// sent to this peer
|
// sent to this peer
|
||||||
// 2) The latest root the peer gossiped
|
// 2) The latest root the peer gossiped
|
||||||
let mut peer_roots: HashMap<Pubkey, (u64, u64)> = HashMap::new();
|
let mut peer_infos: HashMap<Pubkey, RepaireeInfo> = HashMap::new();
|
||||||
let _ = Self::recv_loop(
|
let _ = Self::recv_loop(
|
||||||
&blocktree,
|
&blocktree,
|
||||||
&mut peer_roots,
|
&mut peer_infos,
|
||||||
&exit,
|
&exit,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&epoch_schedule,
|
&epoch_schedule,
|
||||||
|
@ -110,7 +119,7 @@ impl ClusterInfoRepairListener {
|
||||||
|
|
||||||
fn recv_loop(
|
fn recv_loop(
|
||||||
blocktree: &Blocktree,
|
blocktree: &Blocktree,
|
||||||
peer_roots: &mut HashMap<Pubkey, (u64, u64)>,
|
peer_infos: &mut HashMap<Pubkey, RepaireeInfo>,
|
||||||
exit: &Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
epoch_schedule: &EpochSchedule,
|
epoch_schedule: &EpochSchedule,
|
||||||
|
@ -134,7 +143,7 @@ impl ClusterInfoRepairListener {
|
||||||
&my_pubkey,
|
&my_pubkey,
|
||||||
&peer.id,
|
&peer.id,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
peer_roots,
|
peer_infos,
|
||||||
&mut my_gossiped_root,
|
&mut my_gossiped_root,
|
||||||
) {
|
) {
|
||||||
peers_needing_repairs.insert(peer.id, repairee_epoch_slots);
|
peers_needing_repairs.insert(peer.id, repairee_epoch_slots);
|
||||||
|
@ -145,7 +154,7 @@ impl ClusterInfoRepairListener {
|
||||||
let _ = Self::serve_repairs(
|
let _ = Self::serve_repairs(
|
||||||
&my_pubkey,
|
&my_pubkey,
|
||||||
blocktree,
|
blocktree,
|
||||||
peer_roots,
|
peer_infos,
|
||||||
&peers_needing_repairs,
|
&peers_needing_repairs,
|
||||||
&socket,
|
&socket,
|
||||||
cluster_info,
|
cluster_info,
|
||||||
|
@ -161,10 +170,10 @@ impl ClusterInfoRepairListener {
|
||||||
my_pubkey: &Pubkey,
|
my_pubkey: &Pubkey,
|
||||||
peer_pubkey: &Pubkey,
|
peer_pubkey: &Pubkey,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
peer_roots: &mut HashMap<Pubkey, (u64, u64)>,
|
peer_infos: &mut HashMap<Pubkey, RepaireeInfo>,
|
||||||
my_gossiped_root: &mut u64,
|
my_gossiped_root: &mut u64,
|
||||||
) -> Option<EpochSlots> {
|
) -> Option<EpochSlots> {
|
||||||
let last_cached_repair_ts = Self::get_last_ts(peer_pubkey, peer_roots);
|
let last_cached_repair_ts = Self::get_last_ts(peer_pubkey, peer_infos);
|
||||||
let my_root = Self::read_my_gossiped_root(&my_pubkey, cluster_info, my_gossiped_root);
|
let my_root = Self::read_my_gossiped_root(&my_pubkey, cluster_info, my_gossiped_root);
|
||||||
{
|
{
|
||||||
let r_cluster_info = cluster_info.read().unwrap();
|
let r_cluster_info = cluster_info.read().unwrap();
|
||||||
|
@ -173,8 +182,8 @@ impl ClusterInfoRepairListener {
|
||||||
if let Some((peer_epoch_slots, updated_ts)) =
|
if let Some((peer_epoch_slots, updated_ts)) =
|
||||||
r_cluster_info.get_epoch_state_for_node(&peer_pubkey, last_cached_repair_ts)
|
r_cluster_info.get_epoch_state_for_node(&peer_pubkey, last_cached_repair_ts)
|
||||||
{
|
{
|
||||||
let peer_entry = peer_roots.entry(*peer_pubkey).or_default();
|
let peer_info = peer_infos.entry(*peer_pubkey).or_default();
|
||||||
let peer_root = cmp::max(peer_epoch_slots.root, peer_entry.1);
|
let peer_root = cmp::max(peer_epoch_slots.root, peer_info.last_root);
|
||||||
let mut result = None;
|
let mut result = None;
|
||||||
let last_repair_ts = {
|
let last_repair_ts = {
|
||||||
// Following logic needs to be fast because it holds the lock
|
// Following logic needs to be fast because it holds the lock
|
||||||
|
@ -185,11 +194,12 @@ impl ClusterInfoRepairListener {
|
||||||
updated_ts
|
updated_ts
|
||||||
} else {
|
} else {
|
||||||
// No repairs were sent, don't need to update the timestamp
|
// No repairs were sent, don't need to update the timestamp
|
||||||
peer_entry.0
|
peer_info.last_ts
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
*peer_entry = (last_repair_ts, peer_root);
|
peer_info.last_ts = last_repair_ts;
|
||||||
|
peer_info.last_root = peer_root;
|
||||||
result
|
result
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
@ -200,7 +210,7 @@ impl ClusterInfoRepairListener {
|
||||||
fn serve_repairs(
|
fn serve_repairs(
|
||||||
my_pubkey: &Pubkey,
|
my_pubkey: &Pubkey,
|
||||||
blocktree: &Blocktree,
|
blocktree: &Blocktree,
|
||||||
peer_roots: &HashMap<Pubkey, (u64, u64)>,
|
peer_infos: &mut HashMap<Pubkey, RepaireeInfo>,
|
||||||
repairees: &HashMap<Pubkey, EpochSlots>,
|
repairees: &HashMap<Pubkey, EpochSlots>,
|
||||||
socket: &UdpSocket,
|
socket: &UdpSocket,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
|
@ -221,7 +231,7 @@ impl ClusterInfoRepairListener {
|
||||||
let mut eligible_repairmen = Self::find_eligible_repairmen(
|
let mut eligible_repairmen = Self::find_eligible_repairmen(
|
||||||
my_pubkey,
|
my_pubkey,
|
||||||
repairee_root,
|
repairee_root,
|
||||||
peer_roots,
|
peer_infos,
|
||||||
NUM_BUFFER_SLOTS,
|
NUM_BUFFER_SLOTS,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -234,7 +244,7 @@ impl ClusterInfoRepairListener {
|
||||||
let my_root =
|
let my_root =
|
||||||
Self::read_my_gossiped_root(my_pubkey, cluster_info, my_gossiped_root);
|
Self::read_my_gossiped_root(my_pubkey, cluster_info, my_gossiped_root);
|
||||||
|
|
||||||
let _ = Self::serve_repairs_to_repairee(
|
let repair_results = Self::serve_repairs_to_repairee(
|
||||||
my_pubkey,
|
my_pubkey,
|
||||||
repairee_pubkey,
|
repairee_pubkey,
|
||||||
my_root,
|
my_root,
|
||||||
|
@ -245,7 +255,16 @@ impl ClusterInfoRepairListener {
|
||||||
&repairee_addr,
|
&repairee_addr,
|
||||||
NUM_SLOTS_PER_UPDATE,
|
NUM_SLOTS_PER_UPDATE,
|
||||||
epoch_schedule,
|
epoch_schedule,
|
||||||
|
peer_infos
|
||||||
|
.get(repairee_pubkey)
|
||||||
|
.unwrap()
|
||||||
|
.last_repaired_slot_and_ts,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if let Ok(Some(new_last_repaired_slot)) = repair_results {
|
||||||
|
let peer_info = peer_infos.get_mut(repairee_pubkey).unwrap();
|
||||||
|
peer_info.last_repaired_slot_and_ts = (new_last_repaired_slot, timestamp());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,14 +283,15 @@ impl ClusterInfoRepairListener {
|
||||||
repairee_addr: &SocketAddr,
|
repairee_addr: &SocketAddr,
|
||||||
num_slots_to_repair: usize,
|
num_slots_to_repair: usize,
|
||||||
epoch_schedule: &EpochSchedule,
|
epoch_schedule: &EpochSchedule,
|
||||||
) -> Result<()> {
|
last_repaired_slot_and_ts: (u64, u64),
|
||||||
|
) -> Result<Option<u64>> {
|
||||||
let slot_iter = RootedSlotIterator::new(repairee_epoch_slots.root, &blocktree);
|
let slot_iter = RootedSlotIterator::new(repairee_epoch_slots.root, &blocktree);
|
||||||
if slot_iter.is_err() {
|
if slot_iter.is_err() {
|
||||||
info!(
|
info!(
|
||||||
"Root for repairee is on different fork. My root: {}, repairee_root: {} repairee_pubkey: {:?}",
|
"Root for repairee is on different fork. My root: {}, repairee_root: {} repairee_pubkey: {:?}",
|
||||||
my_root, repairee_epoch_slots.root, repairee_pubkey,
|
my_root, repairee_epoch_slots.root, repairee_pubkey,
|
||||||
);
|
);
|
||||||
return Ok(());
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut slot_iter = slot_iter?;
|
let mut slot_iter = slot_iter?;
|
||||||
|
@ -284,9 +304,13 @@ impl ClusterInfoRepairListener {
|
||||||
let max_confirmed_repairee_slot =
|
let max_confirmed_repairee_slot =
|
||||||
epoch_schedule.get_last_slot_in_epoch(max_confirmed_repairee_epoch);
|
epoch_schedule.get_last_slot_in_epoch(max_confirmed_repairee_epoch);
|
||||||
|
|
||||||
|
let last_repaired_slot = last_repaired_slot_and_ts.0;
|
||||||
|
let last_repaired_ts = last_repaired_slot_and_ts.1;
|
||||||
|
|
||||||
// Skip the first slot in the iterator because we know it's the root slot which the repairee
|
// Skip the first slot in the iterator because we know it's the root slot which the repairee
|
||||||
// already has
|
// already has
|
||||||
slot_iter.next();
|
slot_iter.next();
|
||||||
|
let mut new_repaired_slot: Option<u64> = None;
|
||||||
for (slot, slot_meta) in slot_iter {
|
for (slot, slot_meta) in slot_iter {
|
||||||
if slot > my_root
|
if slot > my_root
|
||||||
|| num_slots_repaired >= num_slots_to_repair
|
|| num_slots_repaired >= num_slots_to_repair
|
||||||
|
@ -303,15 +327,26 @@ impl ClusterInfoRepairListener {
|
||||||
// calculate_my_repairman_index_for_slot() will divide responsibility evenly across
|
// calculate_my_repairman_index_for_slot() will divide responsibility evenly across
|
||||||
// the cluster
|
// the cluster
|
||||||
let num_blobs_in_slot = slot_meta.received as usize;
|
let num_blobs_in_slot = slot_meta.received as usize;
|
||||||
|
|
||||||
|
// Check if I'm responsible for repairing this slot
|
||||||
if let Some(my_repair_indexes) = Self::calculate_my_repairman_index_for_slot(
|
if let Some(my_repair_indexes) = Self::calculate_my_repairman_index_for_slot(
|
||||||
my_pubkey,
|
my_pubkey,
|
||||||
&eligible_repairmen,
|
&eligible_repairmen,
|
||||||
num_blobs_in_slot,
|
num_blobs_in_slot,
|
||||||
REPAIR_REDUNDANCY,
|
REPAIR_REDUNDANCY,
|
||||||
) {
|
) {
|
||||||
|
// If I've already sent blobs for a slot >= this one, then don't send them again
|
||||||
|
// until the timeout has expired
|
||||||
|
if slot > last_repaired_slot
|
||||||
|
|| timestamp() - last_repaired_ts > REPAIR_SAME_SLOT_THRESHOLD
|
||||||
|
{
|
||||||
|
error!(
|
||||||
|
"Serving repair for slot {} to {}. Repairee slots: {:?}",
|
||||||
|
slot, repairee_pubkey, repairee_epoch_slots.slots
|
||||||
|
);
|
||||||
// Repairee is missing this slot, send them the blobs for this slot
|
// Repairee is missing this slot, send them the blobs for this slot
|
||||||
for blob_index in my_repair_indexes {
|
for blob_index in my_repair_indexes {
|
||||||
// Loop over the sblob indexes and query the database for these blob that
|
// Loop over the blob indexes and query the database for the blobs that
|
||||||
// this node is responsible for repairing. This should be faster than using
|
// this node is responsible for repairing. This should be faster than using
|
||||||
// a database iterator over the slots because by the time this node is
|
// a database iterator over the slots because by the time this node is
|
||||||
// sending the blobs in this slot for repair, we expect these slots
|
// sending the blobs in this slot for repair, we expect these slots
|
||||||
|
@ -333,19 +368,35 @@ impl ClusterInfoRepairListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
new_repaired_slot = Some(slot);
|
||||||
|
Self::report_repair_metrics(
|
||||||
|
slot,
|
||||||
|
repairee_pubkey,
|
||||||
|
total_data_blobs_sent,
|
||||||
|
total_coding_blobs_sent,
|
||||||
|
);
|
||||||
|
total_data_blobs_sent = 0;
|
||||||
|
total_coding_blobs_sent = 0;
|
||||||
|
}
|
||||||
num_slots_repaired += 1;
|
num_slots_repaired += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Self::report_repair_metrics(total_data_blobs_sent, total_coding_blobs_sent);
|
Ok(new_repaired_slot)
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn report_repair_metrics(total_data_blobs_sent: u64, total_coding_blobs_sent: u64) {
|
fn report_repair_metrics(
|
||||||
|
slot: u64,
|
||||||
|
repairee_id: &Pubkey,
|
||||||
|
total_data_blobs_sent: u64,
|
||||||
|
total_coding_blobs_sent: u64,
|
||||||
|
) {
|
||||||
if total_data_blobs_sent > 0 || total_coding_blobs_sent > 0 {
|
if total_data_blobs_sent > 0 || total_coding_blobs_sent > 0 {
|
||||||
datapoint!(
|
datapoint!(
|
||||||
"repairman_activity",
|
"repairman_activity",
|
||||||
|
("slot", slot, i64),
|
||||||
|
("repairee_id", repairee_id.to_string(), String),
|
||||||
("data_sent", total_data_blobs_sent, i64),
|
("data_sent", total_data_blobs_sent, i64),
|
||||||
("coding_sent", total_coding_blobs_sent, i64)
|
("coding_sent", total_coding_blobs_sent, i64)
|
||||||
);
|
);
|
||||||
|
@ -407,14 +458,14 @@ impl ClusterInfoRepairListener {
|
||||||
fn find_eligible_repairmen<'a>(
|
fn find_eligible_repairmen<'a>(
|
||||||
my_pubkey: &'a Pubkey,
|
my_pubkey: &'a Pubkey,
|
||||||
repairee_root: u64,
|
repairee_root: u64,
|
||||||
repairman_roots: &'a HashMap<Pubkey, (u64, u64)>,
|
repairman_roots: &'a HashMap<Pubkey, RepaireeInfo>,
|
||||||
num_buffer_slots: usize,
|
num_buffer_slots: usize,
|
||||||
) -> Vec<&'a Pubkey> {
|
) -> Vec<&'a Pubkey> {
|
||||||
let mut repairmen: Vec<_> = repairman_roots
|
let mut repairmen: Vec<_> = repairman_roots
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|(repairman_pubkey, (_, repairman_root))| {
|
.filter_map(|(repairman_pubkey, repairman_info)| {
|
||||||
if Self::should_repair_peer(
|
if Self::should_repair_peer(
|
||||||
*repairman_root,
|
repairman_info.last_root,
|
||||||
repairee_root,
|
repairee_root,
|
||||||
num_buffer_slots - GOSSIP_DELAY_SLOTS,
|
num_buffer_slots - GOSSIP_DELAY_SLOTS,
|
||||||
) {
|
) {
|
||||||
|
@ -461,8 +512,8 @@ impl ClusterInfoRepairListener {
|
||||||
repairman_root > repairee_root + num_buffer_slots as u64
|
repairman_root > repairee_root + num_buffer_slots as u64
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_last_ts(pubkey: &Pubkey, peer_roots: &mut HashMap<Pubkey, (u64, u64)>) -> Option<u64> {
|
fn get_last_ts(pubkey: &Pubkey, peer_infos: &mut HashMap<Pubkey, RepaireeInfo>) -> Option<u64> {
|
||||||
peer_roots.get(pubkey).map(|(last_ts, _)| *last_ts)
|
peer_infos.get(pubkey).map(|p| p.last_ts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -564,7 +615,7 @@ mod tests {
|
||||||
);
|
);
|
||||||
|
|
||||||
// Set up locally cached information
|
// Set up locally cached information
|
||||||
let mut peer_roots = HashMap::new();
|
let mut peer_info = HashMap::new();
|
||||||
let mut my_gossiped_root = repairee_root;
|
let mut my_gossiped_root = repairee_root;
|
||||||
|
|
||||||
// Root is not sufficiently far ahead, we shouldn't repair
|
// Root is not sufficiently far ahead, we shouldn't repair
|
||||||
|
@ -572,7 +623,7 @@ mod tests {
|
||||||
&my_pubkey,
|
&my_pubkey,
|
||||||
&peer_pubkey,
|
&peer_pubkey,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut peer_roots,
|
&mut peer_info,
|
||||||
&mut my_gossiped_root,
|
&mut my_gossiped_root,
|
||||||
)
|
)
|
||||||
.is_none());
|
.is_none());
|
||||||
|
@ -584,7 +635,7 @@ mod tests {
|
||||||
&my_pubkey,
|
&my_pubkey,
|
||||||
&peer_pubkey,
|
&peer_pubkey,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut peer_roots,
|
&mut peer_info,
|
||||||
&mut my_gossiped_root,
|
&mut my_gossiped_root,
|
||||||
)
|
)
|
||||||
.is_some());
|
.is_some());
|
||||||
|
@ -596,7 +647,7 @@ mod tests {
|
||||||
&my_pubkey,
|
&my_pubkey,
|
||||||
&peer_pubkey,
|
&peer_pubkey,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut peer_roots,
|
&mut peer_info,
|
||||||
&mut my_gossiped_root,
|
&mut my_gossiped_root,
|
||||||
)
|
)
|
||||||
.is_none());
|
.is_none());
|
||||||
|
@ -612,12 +663,78 @@ mod tests {
|
||||||
&my_pubkey,
|
&my_pubkey,
|
||||||
&peer_pubkey,
|
&peer_pubkey,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut peer_roots,
|
&mut peer_info,
|
||||||
&mut my_gossiped_root,
|
&mut my_gossiped_root,
|
||||||
)
|
)
|
||||||
.is_some());
|
.is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_serve_same_repairs_to_repairee() {
|
||||||
|
let blocktree_path = get_tmp_ledger_path!();
|
||||||
|
let blocktree = Blocktree::open(&blocktree_path).unwrap();
|
||||||
|
let num_slots = 2;
|
||||||
|
let (shreds, _) = make_many_slot_entries(0, num_slots, 1);
|
||||||
|
blocktree.insert_shreds(shreds, None).unwrap();
|
||||||
|
|
||||||
|
// Write roots so that these slots will qualify to be sent by the repairman
|
||||||
|
let last_root = num_slots - 1;
|
||||||
|
let roots: Vec<_> = (0..=last_root).collect();
|
||||||
|
blocktree.set_roots(&roots).unwrap();
|
||||||
|
|
||||||
|
// Set up my information
|
||||||
|
let my_pubkey = Pubkey::new_rand();
|
||||||
|
let my_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
|
||||||
|
// Set up a mock repairee with a socket listening for incoming repairs
|
||||||
|
let mock_repairee = MockRepairee::make_mock_repairee();
|
||||||
|
|
||||||
|
// Set up the repairee's EpochSlots, such that they are missing every odd indexed slot
|
||||||
|
// in the range (repairee_root, num_slots]
|
||||||
|
let repairee_root = 0;
|
||||||
|
let repairee_slots: BTreeSet<_> = (0..=num_slots).step_by(2).collect();
|
||||||
|
let repairee_epoch_slots =
|
||||||
|
EpochSlots::new(mock_repairee.id, repairee_root, repairee_slots, 1);
|
||||||
|
let eligible_repairmen = vec![&my_pubkey];
|
||||||
|
let epoch_schedule = EpochSchedule::custom(32, 16, false);
|
||||||
|
assert!(ClusterInfoRepairListener::serve_repairs_to_repairee(
|
||||||
|
&my_pubkey,
|
||||||
|
&mock_repairee.id,
|
||||||
|
num_slots - 1,
|
||||||
|
&blocktree,
|
||||||
|
&repairee_epoch_slots,
|
||||||
|
&eligible_repairmen,
|
||||||
|
&my_socket,
|
||||||
|
&mock_repairee.tvu_address,
|
||||||
|
1,
|
||||||
|
&epoch_schedule,
|
||||||
|
// Simulate having already sent a slot very recently
|
||||||
|
(last_root, timestamp()),
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
.is_none());
|
||||||
|
|
||||||
|
// Simulate the threshold having elapsed, allowing the repairman
|
||||||
|
// to send the slot again
|
||||||
|
assert_eq!(
|
||||||
|
ClusterInfoRepairListener::serve_repairs_to_repairee(
|
||||||
|
&my_pubkey,
|
||||||
|
&mock_repairee.id,
|
||||||
|
num_slots - 1,
|
||||||
|
&blocktree,
|
||||||
|
&repairee_epoch_slots,
|
||||||
|
&eligible_repairmen,
|
||||||
|
&my_socket,
|
||||||
|
&mock_repairee.tvu_address,
|
||||||
|
1,
|
||||||
|
&epoch_schedule,
|
||||||
|
(last_root, timestamp() - REPAIR_SAME_SLOT_THRESHOLD * 2),
|
||||||
|
)
|
||||||
|
.unwrap(),
|
||||||
|
Some(1)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_serve_repairs_to_repairee() {
|
fn test_serve_repairs_to_repairee() {
|
||||||
let blocktree_path = get_tmp_ledger_path!();
|
let blocktree_path = get_tmp_ledger_path!();
|
||||||
|
@ -671,6 +788,7 @@ mod tests {
|
||||||
&mock_repairee.tvu_address,
|
&mock_repairee.tvu_address,
|
||||||
num_missing_slots as usize,
|
num_missing_slots as usize,
|
||||||
&epoch_schedule,
|
&epoch_schedule,
|
||||||
|
(0, 0),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
@ -741,6 +859,7 @@ mod tests {
|
||||||
&mock_repairee.tvu_address,
|
&mock_repairee.tvu_address,
|
||||||
1 as usize,
|
1 as usize,
|
||||||
&epoch_schedule,
|
&epoch_schedule,
|
||||||
|
(0, 0),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
@ -763,6 +882,7 @@ mod tests {
|
||||||
&mock_repairee.tvu_address,
|
&mock_repairee.tvu_address,
|
||||||
1 as usize,
|
1 as usize,
|
||||||
&epoch_schedule,
|
&epoch_schedule,
|
||||||
|
(0, 0),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue