From 33f4e79589ff1b5d531aed1cff2f019b2fd57ead Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Mon, 18 Oct 2021 14:01:52 -0500 Subject: [PATCH] Refactor out get_rpc_peers() (#20744) --- validator/src/bootstrap.rs | 149 +++++++++++++++++++++---------------- 1 file changed, 84 insertions(+), 65 deletions(-) diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index fc37737724..8b9a4667f3 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -101,6 +101,80 @@ fn start_gossip_node( (cluster_info, gossip_exit_flag, gossip_service) } +fn get_rpc_peers( + cluster_info: &ClusterInfo, + cluster_entrypoints: &[ContactInfo], + validator_config: &ValidatorConfig, + blacklisted_rpc_nodes: &mut HashSet, + blacklist_timeout: &Instant, + retry_reason: &mut Option, +) -> Option> { + let shred_version = validator_config + .expected_shred_version + .unwrap_or_else(|| cluster_info.my_shred_version()); + if shred_version == 0 { + let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| { + cluster_info + .lookup_contact_info_by_gossip_addr(&cluster_entrypoint.gossip) + .map_or(false, |entrypoint| entrypoint.shred_version == 0) + }); + + if all_zero_shred_versions { + eprintln!("Entrypoint shred version is zero. Restart with --expected-shred-version"); + exit(1); + } + info!("Waiting to adopt entrypoint shred version..."); + return None; + } + + info!( + "Searching for an RPC service with shred version {}{}...", + shred_version, + retry_reason + .as_ref() + .map(|s| format!(" (Retrying: {})", s)) + .unwrap_or_default() + ); + + let rpc_peers = cluster_info + .all_rpc_peers() + .into_iter() + .filter(|contact_info| contact_info.shred_version == shred_version) + .collect::>(); + let rpc_peers_total = rpc_peers.len(); + + // Filter out blacklisted nodes + let rpc_peers: Vec<_> = rpc_peers + .into_iter() + .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(&rpc_peer.id)) + .collect(); + let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len(); + let rpc_peers_trusted = rpc_peers + .iter() + .filter(|rpc_peer| is_trusted_validator(&rpc_peer.id, &validator_config.trusted_validators)) + .count(); + + info!( + "Total {} RPC nodes found. {} known, {} blacklisted ", + rpc_peers_total, rpc_peers_trusted, rpc_peers_blacklisted + ); + + if rpc_peers_blacklisted == rpc_peers_total { + *retry_reason = + if !blacklisted_rpc_nodes.is_empty() && blacklist_timeout.elapsed().as_secs() > 60 { + // If all nodes are blacklisted and no additional nodes are discovered after 60 seconds, + // remove the blacklist and try them all again + blacklisted_rpc_nodes.clear(); + Some("Blacklist timeout expired".to_owned()) + } else { + Some("Wait for known rpc peers".to_owned()) + }; + return None; + } + + Some(rpc_peers) +} + fn get_rpc_node( cluster_info: &ClusterInfo, cluster_entrypoints: &[ContactInfo], @@ -117,73 +191,18 @@ fn get_rpc_node( sleep(Duration::from_secs(1)); info!("\n{}", cluster_info.rpc_info_trace()); - let shred_version = validator_config - .expected_shred_version - .unwrap_or_else(|| cluster_info.my_shred_version()); - if shred_version == 0 { - let all_zero_shred_versions = cluster_entrypoints.iter().all(|cluster_entrypoint| { - cluster_info - .lookup_contact_info_by_gossip_addr(&cluster_entrypoint.gossip) - .map_or(false, |entrypoint| entrypoint.shred_version == 0) - }); - - if all_zero_shred_versions { - eprintln!( - "Entrypoint shred version is zero. Restart with --expected-shred-version" - ); - exit(1); - } - info!("Waiting to adopt entrypoint shred version..."); - continue; - } - - info!( - "Searching for an RPC service with shred version {}{}...", - shred_version, - retry_reason - .as_ref() - .map(|s| format!(" (Retrying: {})", s)) - .unwrap_or_default() - ); - - let rpc_peers = cluster_info - .all_rpc_peers() - .into_iter() - .filter(|contact_info| contact_info.shred_version == shred_version) - .collect::>(); - let rpc_peers_total = rpc_peers.len(); - - // Filter out blacklisted nodes - let rpc_peers: Vec<_> = rpc_peers - .into_iter() - .filter(|rpc_peer| !blacklisted_rpc_nodes.contains(&rpc_peer.id)) - .collect(); - let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len(); - let rpc_peers_trusted = rpc_peers - .iter() - .filter(|rpc_peer| { - is_trusted_validator(&rpc_peer.id, &validator_config.trusted_validators) - }) - .count(); - - info!( - "Total {} RPC nodes found. {} known, {} blacklisted ", - rpc_peers_total, rpc_peers_trusted, rpc_peers_blacklisted - ); - - if rpc_peers_blacklisted == rpc_peers_total { - retry_reason = if !blacklisted_rpc_nodes.is_empty() - && blacklist_timeout.elapsed().as_secs() > 60 - { - // If all nodes are blacklisted and no additional nodes are discovered after 60 seconds, - // remove the blacklist and try them all again - blacklisted_rpc_nodes.clear(); - Some("Blacklist timeout expired".to_owned()) - } else { - Some("Wait for known rpc peers".to_owned()) - }; + let rpc_peers = get_rpc_peers( + cluster_info, + cluster_entrypoints, + validator_config, + blacklisted_rpc_nodes, + &blacklist_timeout, + &mut retry_reason, + ); + if rpc_peers.is_none() { continue; } + let rpc_peers = rpc_peers.unwrap(); blacklist_timeout = Instant::now(); let mut highest_snapshot_hash = get_highest_local_snapshot_hash(snapshot_archives_dir);