Correctly describe repair and retransmit peers (#2110)
This commit is contained in:
parent
9243bc58db
commit
4788a4f775
|
@ -263,6 +263,20 @@ impl ClusterInfo {
|
||||||
|
|
||||||
/// compute broadcast table
|
/// compute broadcast table
|
||||||
pub fn tvu_peers(&self) -> Vec<NodeInfo> {
|
pub fn tvu_peers(&self) -> Vec<NodeInfo> {
|
||||||
|
let me = self.my_data().id;
|
||||||
|
self.gossip
|
||||||
|
.crds
|
||||||
|
.table
|
||||||
|
.values()
|
||||||
|
.filter_map(|x| x.value.contact_info())
|
||||||
|
.filter(|x| x.id != me)
|
||||||
|
.filter(|x| ContactInfo::is_valid_address(&x.tvu))
|
||||||
|
.cloned()
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// all peers that have a valid tvu except the leader
|
||||||
|
pub fn retransmit_peers(&self) -> Vec<NodeInfo> {
|
||||||
let me = self.my_data().id;
|
let me = self.my_data().id;
|
||||||
self.gossip
|
self.gossip
|
||||||
.crds
|
.crds
|
||||||
|
@ -275,6 +289,14 @@ impl ClusterInfo {
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// all tvu peers with valid gossip addrs
|
||||||
|
pub fn repair_peers(&self) -> Vec<NodeInfo> {
|
||||||
|
ClusterInfo::tvu_peers(self)
|
||||||
|
.into_iter()
|
||||||
|
.filter(|x| ContactInfo::is_valid_address(&x.gossip))
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
/// compute broadcast table
|
/// compute broadcast table
|
||||||
pub fn tpu_peers(&self) -> Vec<NodeInfo> {
|
pub fn tpu_peers(&self) -> Vec<NodeInfo> {
|
||||||
let me = self.my_data().id;
|
let me = self.my_data().id;
|
||||||
|
@ -354,7 +376,7 @@ impl ClusterInfo {
|
||||||
let (me, orders): (NodeInfo, Vec<NodeInfo>) = {
|
let (me, orders): (NodeInfo, Vec<NodeInfo>) = {
|
||||||
// copy to avoid locking during IO
|
// copy to avoid locking during IO
|
||||||
let s = obj.read().expect("'obj' read lock in pub fn retransmit");
|
let s = obj.read().expect("'obj' read lock in pub fn retransmit");
|
||||||
(s.my_data().clone(), s.tvu_peers())
|
(s.my_data().clone(), s.retransmit_peers())
|
||||||
};
|
};
|
||||||
blob.write()
|
blob.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -513,7 +535,7 @@ impl ClusterInfo {
|
||||||
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
|
pub fn window_index_request(&self, ix: u64) -> Result<(SocketAddr, Vec<u8>)> {
|
||||||
// find a peer that appears to be accepting replication, as indicated
|
// find a peer that appears to be accepting replication, as indicated
|
||||||
// by a valid tvu port location
|
// by a valid tvu port location
|
||||||
let valid: Vec<_> = self.gossip_peers();
|
let valid: Vec<_> = self.repair_peers();
|
||||||
if valid.is_empty() {
|
if valid.is_empty() {
|
||||||
Err(ClusterInfoError::NoPeers)?;
|
Err(ClusterInfoError::NoPeers)?;
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ pub fn repair(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_peers = rcluster_info.tvu_peers().len() as u64;
|
let num_peers = rcluster_info.repair_peers().len() as u64;
|
||||||
|
|
||||||
// Check if there's a max_entry_height limitation
|
// Check if there's a max_entry_height limitation
|
||||||
let max_repair_entry_height = if max_entry_height == 0 {
|
let max_repair_entry_height = if max_entry_height == 0 {
|
||||||
|
|
|
@ -176,7 +176,7 @@ impl WindowUtil for Window {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let num_peers = rcluster_info.tvu_peers().len() as u64;
|
let num_peers = rcluster_info.repair_peers().len() as u64;
|
||||||
let max_repair = if max_entry_height == 0 {
|
let max_repair = if max_entry_height == 0 {
|
||||||
calculate_max_repair(
|
calculate_max_repair(
|
||||||
num_peers,
|
num_peers,
|
||||||
|
|
Loading…
Reference in New Issue