Rename id to pubkey in cluster_info_repair_listener (#4421)
parent 57f8a15b96
commit cf4bb70d80
@@ -95,7 +95,7 @@ impl ClusterInfoRepairListener {
                 let _ = Self::recv_loop(
                     &blocktree,
                     &mut peer_roots,
-                    exit,
+                    &exit,
                     &cluster_info,
                     &epoch_schedule,
                 );
@@ -109,7 +109,7 @@ impl ClusterInfoRepairListener {
     fn recv_loop(
         blocktree: &Blocktree,
         peer_roots: &mut HashMap<Pubkey, (u64, u64)>,
-        exit: Arc<AtomicBool>,
+        exit: &Arc<AtomicBool>,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         epoch_schedule: &EpochSchedule,
     ) -> Result<()> {
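
Note: the `exit` flag changes from being moved into `recv_loop` to being borrowed as `&Arc<AtomicBool>`, so the spawning code keeps its own handle without an extra clone at the call site. A minimal sketch of the pattern, with hypothetical names (`worker_loop`, `flag`) that are not from this file:

    use std::sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    };

    // Borrowing the Arc suffices when the callee only reads the flag for the
    // duration of the call; it can clone explicitly if it needs to keep it.
    fn worker_loop(exit: &Arc<AtomicBool>) {
        while !exit.load(Ordering::Relaxed) {
            // ... one unit of work per iteration ...
            break; // keep this sketch terminating
        }
    }

    fn main() {
        let flag = Arc::new(AtomicBool::new(false));
        worker_loop(&flag); // caller still owns `flag` afterwards
        flag.store(true, Ordering::Relaxed);
    }
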
@@ -156,22 +156,22 @@ impl ClusterInfoRepairListener {
     }
 
     fn process_potential_repairee(
-        my_id: &Pubkey,
-        peer_id: &Pubkey,
+        my_pubkey: &Pubkey,
+        peer_pubkey: &Pubkey,
         cluster_info: &Arc<RwLock<ClusterInfo>>,
         peer_roots: &mut HashMap<Pubkey, (u64, u64)>,
         my_gossiped_root: &mut u64,
     ) -> Option<EpochSlots> {
-        let last_cached_repair_ts = Self::get_last_ts(peer_id, peer_roots);
-        let my_root = Self::read_my_gossiped_root(&my_id, cluster_info, my_gossiped_root);
+        let last_cached_repair_ts = Self::get_last_ts(peer_pubkey, peer_roots);
+        let my_root = Self::read_my_gossiped_root(&my_pubkey, cluster_info, my_gossiped_root);
         {
             let r_cluster_info = cluster_info.read().unwrap();
 
             // Update our local map with the updated peers' information
             if let Some((peer_epoch_slots, updated_ts)) =
-                r_cluster_info.get_epoch_state_for_node(&peer_id, last_cached_repair_ts)
+                r_cluster_info.get_epoch_state_for_node(&peer_pubkey, last_cached_repair_ts)
             {
-                let peer_entry = peer_roots.entry(*peer_id).or_default();
+                let peer_entry = peer_roots.entry(*peer_pubkey).or_default();
                 let peer_root = cmp::max(peer_epoch_slots.root, peer_entry.1);
                 let mut result = None;
                 let last_repair_ts = {
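
Note on the surrounding logic: `peer_roots` caches a `(timestamp, root)` pair per peer pubkey. `entry(*peer_pubkey).or_default()` inserts `(0, 0)` the first time a peer is seen, and `cmp::max` keeps the cached root from moving backwards on stale gossip. A standalone sketch of that caching pattern, with illustrative names (`update_peer`, a stand-in `Pubkey`):

    use std::cmp;
    use std::collections::HashMap;

    type Pubkey = [u8; 32]; // stand-in for the real Pubkey type

    // Record a freshly gossiped root for a peer: remember when we saw it,
    // and never let the cached root move backwards.
    fn update_peer(
        peer_roots: &mut HashMap<Pubkey, (u64, u64)>,
        peer: Pubkey,
        gossiped_root: u64,
        now_ts: u64,
    ) {
        let entry = peer_roots.entry(peer).or_default(); // (0, 0) on first sight
        entry.0 = now_ts;
        entry.1 = cmp::max(gossiped_root, entry.1);
    }

    fn main() {
        let mut peer_roots = HashMap::new();
        update_peer(&mut peer_roots, [1; 32], 10, 100);
        update_peer(&mut peer_roots, [1; 32], 7, 200); // stale root is ignored
        assert_eq!(peer_roots[&[1; 32]], (200, 10));
    }
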
@@ -401,13 +401,13 @@ impl ClusterInfoRepairListener {
     ) -> Vec<&'a Pubkey> {
         let mut repairmen: Vec<_> = repairman_roots
             .iter()
-            .filter_map(|(repairman_id, (_, repairman_root))| {
+            .filter_map(|(repairman_pubkey, (_, repairman_root))| {
                 if Self::should_repair_peer(
                     *repairman_root,
                     repairee_root,
                     num_buffer_slots - GOSSIP_DELAY_SLOTS,
                 ) {
-                    Some(repairman_id)
+                    Some(repairman_pubkey)
                 } else {
                     None
                 }
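
Note: `should_repair_peer` itself is outside this hunk; from the call shape it takes the repairman's root, the repairee's root, and a slot buffer reduced by `GOSSIP_DELAY_SLOTS`. A plausible reconstruction of that check, assuming it is a simple "far enough ahead" comparison (a guess at the semantics, not the file's code):

    // A repairman is only eligible if its root is sufficiently far past the
    // repairee's root, leaving a buffer for slots still in flight via gossip.
    fn should_repair_peer(repairman_root: u64, repairee_root: u64, num_buffer_slots: u64) -> bool {
        repairman_root > repairee_root + num_buffer_slots
    }

    fn main() {
        const GOSSIP_DELAY_SLOTS: u64 = 5; // illustrative value
        let num_buffer_slots: u64 = 10;
        assert!(should_repair_peer(50, 0, num_buffer_slots - GOSSIP_DELAY_SLOTS));
        assert!(!should_repair_peer(3, 0, num_buffer_slots - GOSSIP_DELAY_SLOTS));
    }
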
@@ -535,8 +535,8 @@ mod tests {
     #[test]
     fn test_process_potential_repairee() {
         // Set up node ids
-        let my_id = Pubkey::new_rand();
-        let peer_id = Pubkey::new_rand();
+        let my_pubkey = Pubkey::new_rand();
+        let peer_pubkey = Pubkey::new_rand();
 
         // Set up cluster_info
         let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
@@ -547,7 +547,7 @@ mod tests {
         let repairee_root = 0;
         let repairee_slots = BTreeSet::new();
         cluster_info.write().unwrap().push_epoch_slots(
-            peer_id,
+            peer_pubkey,
             repairee_root,
             repairee_slots.clone(),
         );
@@ -558,8 +558,8 @@ mod tests {
 
         // Root is not sufficiently far ahead, we shouldn't repair
         assert!(ClusterInfoRepairListener::process_potential_repairee(
-            &my_id,
-            &peer_id,
+            &my_pubkey,
+            &peer_pubkey,
             &cluster_info,
             &mut peer_roots,
             &mut my_gossiped_root,
@@ -570,8 +570,8 @@ mod tests {
         // object in gossip is not updated
         my_gossiped_root = repairee_root + NUM_BUFFER_SLOTS as u64 + 1;
         assert!(ClusterInfoRepairListener::process_potential_repairee(
-            &my_id,
-            &peer_id,
+            &my_pubkey,
+            &peer_pubkey,
             &cluster_info,
             &mut peer_roots,
             &mut my_gossiped_root,
@ -582,22 +582,24 @@ mod tests {
|
||||||
// even if our root moves forward
|
// even if our root moves forward
|
||||||
my_gossiped_root += 4;
|
my_gossiped_root += 4;
|
||||||
assert!(ClusterInfoRepairListener::process_potential_repairee(
|
assert!(ClusterInfoRepairListener::process_potential_repairee(
|
||||||
&my_id,
|
&my_pubkey,
|
||||||
&peer_id,
|
&peer_pubkey,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut peer_roots,
|
&mut peer_roots,
|
||||||
&mut my_gossiped_root,
|
&mut my_gossiped_root,
|
||||||
)
|
)
|
||||||
.is_none());
|
.is_none());
|
||||||
|
|
||||||
// Update the gossiped EpochSlots. Now a repair should be sent again
|
// Sleep to make sure the timestamp is updated in gossip. Update the gossiped EpochSlots.
|
||||||
|
// Now a repair should be sent again
|
||||||
|
sleep(Duration::from_millis(10));
|
||||||
cluster_info
|
cluster_info
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.push_epoch_slots(peer_id, repairee_root, repairee_slots);
|
.push_epoch_slots(peer_pubkey, repairee_root, repairee_slots);
|
||||||
assert!(ClusterInfoRepairListener::process_potential_repairee(
|
assert!(ClusterInfoRepairListener::process_potential_repairee(
|
||||||
&my_id,
|
&my_pubkey,
|
||||||
&peer_id,
|
&peer_pubkey,
|
||||||
&cluster_info,
|
&cluster_info,
|
||||||
&mut peer_roots,
|
&mut peer_roots,
|
||||||
&mut my_gossiped_root,
|
&mut my_gossiped_root,
|
||||||
|
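
Note: the added `sleep` matters because gossip updates carry a wallclock timestamp and `get_epoch_state_for_node` only returns entries newer than the cached `last_cached_repair_ts`; re-pushing the `EpochSlots` within the same millisecond would look like old news, and no repair would be re-sent. A self-contained sketch of that freshness check, with an illustrative name (`fresher_than`):

    // Report an update only if it is strictly newer than the last one the
    // caller acted on; equal timestamps count as already seen.
    fn fresher_than(entry_ts: u64, last_seen_ts: Option<u64>) -> Option<u64> {
        match last_seen_ts {
            Some(seen) if entry_ts <= seen => None,
            _ => Some(entry_ts),
        }
    }

    fn main() {
        assert_eq!(fresher_than(100, None), Some(100));
        assert_eq!(fresher_than(100, Some(100)), None); // same ts: looks stale
        assert_eq!(fresher_than(110, Some(100)), Some(110)); // sleep lets ts advance
    }
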
@@ -645,9 +647,9 @@ mod tests {
         // Have all the repairman send the repairs
         let epoch_schedule = EpochSchedule::new(32, 16, false);
         let num_missing_slots = num_slots / 2;
-        for repairman_id in &eligible_repairmen {
+        for repairman_pubkey in &eligible_repairmen {
             ClusterInfoRepairListener::serve_repairs_to_repairee(
-                &repairman_id,
+                &repairman_pubkey,
                 num_slots - 1,
                 &blocktree,
                 &repairee_epoch_slots,
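
Note: this test drives every eligible repairman against the same repairee. The intent is that each repairman deterministically serves its own share of the repairee's missing slots, so together they cover everything without duplicating sends. A hedged sketch of one such partitioning scheme, not the crate's actual assignment logic:

    // Repairman `my_index` of `num_repairmen` takes the missing slots whose
    // position modulo num_repairmen matches its index.
    fn my_share(missing_slots: &[u64], my_index: usize, num_repairmen: usize) -> Vec<u64> {
        missing_slots
            .iter()
            .enumerate()
            .filter(|(i, _)| i % num_repairmen == my_index)
            .map(|(_, slot)| *slot)
            .collect()
    }

    fn main() {
        let missing: Vec<u64> = (0..6).collect();
        assert_eq!(my_share(&missing, 0, 3), vec![0, 3]);
        assert_eq!(my_share(&missing, 1, 3), vec![1, 4]);
    }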