Minor --expected-shred fix, clean up shred-related gossip log messages (#8041)
automerge
This commit is contained in:
parent
dd276138c2
commit
775fa0c968
|
@ -703,7 +703,7 @@ impl Archiver {
|
||||||
) -> Result<u64> {
|
) -> Result<u64> {
|
||||||
let rpc_peers = {
|
let rpc_peers = {
|
||||||
let cluster_info = cluster_info.read().unwrap();
|
let cluster_info = cluster_info.read().unwrap();
|
||||||
cluster_info.rpc_peers()
|
cluster_info.all_rpc_peers()
|
||||||
};
|
};
|
||||||
debug!("rpc peers: {:?}", rpc_peers);
|
debug!("rpc peers: {:?}", rpc_peers);
|
||||||
if !rpc_peers.is_empty() {
|
if !rpc_peers.is_empty() {
|
||||||
|
@ -759,7 +759,7 @@ impl Archiver {
|
||||||
loop {
|
loop {
|
||||||
let rpc_peers = {
|
let rpc_peers = {
|
||||||
let cluster_info = cluster_info.read().unwrap();
|
let cluster_info = cluster_info.read().unwrap();
|
||||||
cluster_info.rpc_peers()
|
cluster_info.all_rpc_peers()
|
||||||
};
|
};
|
||||||
debug!("rpc peers: {:?}", rpc_peers);
|
debug!("rpc peers: {:?}", rpc_peers);
|
||||||
if !rpc_peers.is_empty() {
|
if !rpc_peers.is_empty() {
|
||||||
|
|
|
@ -405,7 +405,8 @@ impl ClusterInfo {
|
||||||
.map(|x| x.value.contact_info().unwrap())
|
.map(|x| x.value.contact_info().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rpc_peers(&self) -> Vec<ContactInfo> {
|
/// all validators that have a valid rpc port regardless of `shred_version`.
|
||||||
|
pub fn all_rpc_peers(&self) -> Vec<ContactInfo> {
|
||||||
let me = self.my_data();
|
let me = self.my_data();
|
||||||
self.gossip
|
self.gossip
|
||||||
.crds
|
.crds
|
||||||
|
@ -413,8 +414,6 @@ impl ClusterInfo {
|
||||||
.values()
|
.values()
|
||||||
.filter_map(|x| x.value.contact_info())
|
.filter_map(|x| x.value.contact_info())
|
||||||
.filter(|x| x.id != me.id)
|
.filter(|x| x.id != me.id)
|
||||||
/* shred_version not considered for rpc peers (ie, caller must select version
|
|
||||||
if desired) */
|
|
||||||
.filter(|x| ContactInfo::is_valid_address(&x.rpc))
|
.filter(|x| ContactInfo::is_valid_address(&x.rpc))
|
||||||
.cloned()
|
.cloned()
|
||||||
.collect()
|
.collect()
|
||||||
|
@ -1132,23 +1131,25 @@ impl ClusterInfo {
|
||||||
let table_size = obj.read().unwrap().gossip.crds.table.len();
|
let table_size = obj.read().unwrap().gossip.crds.table.len();
|
||||||
datapoint_debug!(
|
datapoint_debug!(
|
||||||
"cluster_info-purge",
|
"cluster_info-purge",
|
||||||
("tabel_size", table_size as i64, i64),
|
("table_size", table_size as i64, i64),
|
||||||
("purge_stake_timeout", timeout as i64, i64)
|
("purge_stake_timeout", timeout as i64, i64)
|
||||||
);
|
);
|
||||||
// Adopt the entrypoint's `shred_version` if ours is unset
|
// Adopt the entrypoint's `shred_version` if ours is unset
|
||||||
if adopt_shred_version {
|
if adopt_shred_version {
|
||||||
// If gossip was given an entrypoint, lookup its id
|
// If gossip was given an entrypoint, lookup its id
|
||||||
let entrypoint_id = obj.read().unwrap().entrypoint.as_ref().map(|e| e.id);
|
let entrypoint_id = obj.read().unwrap().entrypoint.as_ref().map(|e| e.id);
|
||||||
if let Some(entrypoint_id) = entrypoint_id {
|
if let Some(entrypoint_id) = entrypoint_id {
|
||||||
info!("Shred version unknown, looking for the entrypoint:{:?} Shred version", entrypoint_id);
|
|
||||||
// If a pull from the entrypoint was successful, it should exist in the crds table
|
// If a pull from the entrypoint was successful, it should exist in the crds table
|
||||||
let entrypoint = obj.read().unwrap().lookup(&entrypoint_id).cloned();
|
let entrypoint = obj.read().unwrap().lookup(&entrypoint_id).cloned();
|
||||||
if let Some(entrypoint) = entrypoint {
|
if let Some(entrypoint) = entrypoint {
|
||||||
let mut self_info = obj.read().unwrap().my_data();
|
let mut self_info = obj.read().unwrap().my_data();
|
||||||
if entrypoint.shred_version == 0 {
|
if entrypoint.shred_version == 0 {
|
||||||
warn!("entrypoint is running an invalid shred_version: 0");
|
info!("Unable to adopt entrypoint's shred version");
|
||||||
} else {
|
} else {
|
||||||
info!("Setting Shred version to {:?} from entrypoint {:?}", entrypoint.shred_version, entrypoint.id);
|
info!(
|
||||||
|
"Setting shred version to {:?} from entrypoint {:?}",
|
||||||
|
entrypoint.shred_version, entrypoint.id
|
||||||
|
);
|
||||||
self_info.shred_version = entrypoint.shred_version;
|
self_info.shred_version = entrypoint.shred_version;
|
||||||
obj.write().unwrap().insert_self(self_info);
|
obj.write().unwrap().insert_self(self_info);
|
||||||
adopt_shred_version = false;
|
adopt_shred_version = false;
|
||||||
|
|
|
@ -225,7 +225,7 @@ fn get_rpc_addr(
|
||||||
cluster_info.read().unwrap().contact_info_trace()
|
cluster_info.read().unwrap().contact_info_trace()
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut rpc_peers = cluster_info.read().unwrap().rpc_peers();
|
let mut rpc_peers = cluster_info.read().unwrap().all_rpc_peers();
|
||||||
|
|
||||||
let shred_version_required = !rpc_peers
|
let shred_version_required = !rpc_peers
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -246,10 +246,10 @@ fn get_rpc_addr(
|
||||||
.find(|contact_info| contact_info.gossip == *entrypoint_gossip)
|
.find(|contact_info| contact_info.gossip == *entrypoint_gossip)
|
||||||
{
|
{
|
||||||
Some(contact_info.clone())
|
Some(contact_info.clone())
|
||||||
} else if shred_version_required {
|
} else if shred_version_required && expected_shred_version.is_none() {
|
||||||
// Require the user supply a shred version if there are conflicting shred version in
|
// Require the user supply a shred version if there are conflicting shred version in
|
||||||
// gossip to reduce the chance of human error
|
// gossip to reduce the chance of human error
|
||||||
warn!("Multiple shred versions detected, unable to select an RPC service. Restart with --expected-shred-version");
|
warn!("Multiple shred versions in gossip. Restart with --expected-shred-version");
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
// Pick a node at random
|
// Pick a node at random
|
||||||
|
|
Loading…
Reference in New Issue