Move bootstrap code into inner module (#20730)
This commit is contained in:
parent
5794bba65c
commit
3f559cc2c9
|
@ -11,7 +11,6 @@ use {
|
||||||
gossip_service::GossipService,
|
gossip_service::GossipService,
|
||||||
},
|
},
|
||||||
solana_runtime::{
|
solana_runtime::{
|
||||||
hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
|
|
||||||
snapshot_archive_info::SnapshotArchiveInfoGetter,
|
snapshot_archive_info::SnapshotArchiveInfoGetter,
|
||||||
snapshot_package::SnapshotType,
|
snapshot_package::SnapshotType,
|
||||||
snapshot_utils::{
|
snapshot_utils::{
|
||||||
|
@ -41,6 +40,118 @@ use {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct RpcBootstrapConfig {
|
||||||
|
pub no_genesis_fetch: bool,
|
||||||
|
pub no_snapshot_fetch: bool,
|
||||||
|
pub no_untrusted_rpc: bool,
|
||||||
|
pub max_genesis_archive_unpacked_size: u64,
|
||||||
|
pub no_check_vote_account: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
|
pub fn rpc_bootstrap(
|
||||||
|
node: &Node,
|
||||||
|
identity_keypair: &Arc<Keypair>,
|
||||||
|
ledger_path: &Path,
|
||||||
|
snapshot_archives_dir: &Path,
|
||||||
|
vote_account: &Pubkey,
|
||||||
|
authorized_voter_keypairs: Arc<RwLock<Vec<Arc<Keypair>>>>,
|
||||||
|
cluster_entrypoints: &[ContactInfo],
|
||||||
|
validator_config: &mut ValidatorConfig,
|
||||||
|
bootstrap_config: RpcBootstrapConfig,
|
||||||
|
no_port_check: bool,
|
||||||
|
use_progress_bar: bool,
|
||||||
|
maximum_local_snapshot_age: Slot,
|
||||||
|
should_check_duplicate_instance: bool,
|
||||||
|
start_progress: &Arc<RwLock<ValidatorStartProgress>>,
|
||||||
|
minimal_snapshot_download_speed: f32,
|
||||||
|
maximum_snapshot_download_abort: u64,
|
||||||
|
socket_addr_space: SocketAddrSpace,
|
||||||
|
) {
|
||||||
|
without_incremental_snapshots::rpc_bootstrap(
|
||||||
|
node,
|
||||||
|
identity_keypair,
|
||||||
|
ledger_path,
|
||||||
|
snapshot_archives_dir,
|
||||||
|
vote_account,
|
||||||
|
authorized_voter_keypairs,
|
||||||
|
cluster_entrypoints,
|
||||||
|
validator_config,
|
||||||
|
bootstrap_config,
|
||||||
|
no_port_check,
|
||||||
|
use_progress_bar,
|
||||||
|
maximum_local_snapshot_age,
|
||||||
|
should_check_duplicate_instance,
|
||||||
|
start_progress,
|
||||||
|
minimal_snapshot_download_speed,
|
||||||
|
maximum_snapshot_download_abort,
|
||||||
|
socket_addr_space,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn verify_reachable_ports(
|
||||||
|
node: &Node,
|
||||||
|
cluster_entrypoint: &ContactInfo,
|
||||||
|
validator_config: &ValidatorConfig,
|
||||||
|
socket_addr_space: &SocketAddrSpace,
|
||||||
|
) -> bool {
|
||||||
|
let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];
|
||||||
|
|
||||||
|
if ContactInfo::is_valid_address(&node.info.serve_repair, socket_addr_space) {
|
||||||
|
udp_sockets.push(&node.sockets.serve_repair);
|
||||||
|
}
|
||||||
|
if ContactInfo::is_valid_address(&node.info.tpu, socket_addr_space) {
|
||||||
|
udp_sockets.extend(node.sockets.tpu.iter());
|
||||||
|
}
|
||||||
|
if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) {
|
||||||
|
udp_sockets.extend(node.sockets.tpu_forwards.iter());
|
||||||
|
}
|
||||||
|
if ContactInfo::is_valid_address(&node.info.tpu_vote, socket_addr_space) {
|
||||||
|
udp_sockets.extend(node.sockets.tpu_vote.iter());
|
||||||
|
}
|
||||||
|
if ContactInfo::is_valid_address(&node.info.tvu, socket_addr_space) {
|
||||||
|
udp_sockets.extend(node.sockets.tvu.iter());
|
||||||
|
udp_sockets.extend(node.sockets.broadcast.iter());
|
||||||
|
udp_sockets.extend(node.sockets.retransmit_sockets.iter());
|
||||||
|
}
|
||||||
|
if ContactInfo::is_valid_address(&node.info.tvu_forwards, socket_addr_space) {
|
||||||
|
udp_sockets.extend(node.sockets.tvu_forwards.iter());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut tcp_listeners = vec![];
|
||||||
|
if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
|
||||||
|
for (purpose, bind_addr, public_addr) in &[
|
||||||
|
("RPC", rpc_addr, &node.info.rpc),
|
||||||
|
("RPC pubsub", rpc_pubsub_addr, &node.info.rpc_pubsub),
|
||||||
|
] {
|
||||||
|
if ContactInfo::is_valid_address(public_addr, socket_addr_space) {
|
||||||
|
tcp_listeners.push((
|
||||||
|
bind_addr.port(),
|
||||||
|
TcpListener::bind(bind_addr).unwrap_or_else(|err| {
|
||||||
|
error!(
|
||||||
|
"Unable to bind to tcp {:?} for {}: {}",
|
||||||
|
bind_addr, purpose, err
|
||||||
|
);
|
||||||
|
exit(1);
|
||||||
|
}),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(ip_echo) = &node.sockets.ip_echo {
|
||||||
|
let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
|
||||||
|
tcp_listeners.push((ip_echo.local_addr().unwrap().port(), ip_echo));
|
||||||
|
}
|
||||||
|
|
||||||
|
solana_net_utils::verify_reachable_ports(
|
||||||
|
&cluster_entrypoint.gossip,
|
||||||
|
tcp_listeners,
|
||||||
|
&udp_sockets,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
fn is_trusted_validator(id: &Pubkey, trusted_validators: &Option<HashSet<Pubkey>>) -> bool {
|
fn is_trusted_validator(id: &Pubkey, trusted_validators: &Option<HashSet<Pubkey>>) -> bool {
|
||||||
if let Some(trusted_validators) = trusted_validators {
|
if let Some(trusted_validators) = trusted_validators {
|
||||||
trusted_validators.contains(id)
|
trusted_validators.contains(id)
|
||||||
|
@ -49,25 +160,6 @@ fn is_trusted_validator(id: &Pubkey, trusted_validators: &Option<HashSet<Pubkey>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_trusted_snapshot_hashes(
|
|
||||||
cluster_info: &ClusterInfo,
|
|
||||||
trusted_validators: &Option<HashSet<Pubkey>>,
|
|
||||||
) -> Option<HashSet<(Slot, Hash)>> {
|
|
||||||
if let Some(trusted_validators) = trusted_validators {
|
|
||||||
let mut trusted_snapshot_hashes = HashSet::new();
|
|
||||||
for trusted_validator in trusted_validators {
|
|
||||||
cluster_info.get_snapshot_hash_for_node(trusted_validator, |snapshot_hashes| {
|
|
||||||
for snapshot_hash in snapshot_hashes {
|
|
||||||
trusted_snapshot_hashes.insert(*snapshot_hash);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Some(trusted_snapshot_hashes)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn start_gossip_node(
|
fn start_gossip_node(
|
||||||
identity_keypair: Arc<Keypair>,
|
identity_keypair: Arc<Keypair>,
|
||||||
cluster_entrypoints: &[ContactInfo],
|
cluster_entrypoints: &[ContactInfo],
|
||||||
|
@ -175,126 +267,6 @@ fn get_rpc_peers(
|
||||||
Some(rpc_peers)
|
Some(rpc_peers)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_rpc_node(
|
|
||||||
cluster_info: &ClusterInfo,
|
|
||||||
cluster_entrypoints: &[ContactInfo],
|
|
||||||
validator_config: &ValidatorConfig,
|
|
||||||
blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
|
|
||||||
snapshot_not_required: bool,
|
|
||||||
no_untrusted_rpc: bool,
|
|
||||||
snapshot_archives_dir: &Path,
|
|
||||||
) -> Option<(ContactInfo, Option<(Slot, Hash)>)> {
|
|
||||||
let mut blacklist_timeout = Instant::now();
|
|
||||||
let mut newer_cluster_snapshot_timeout = None;
|
|
||||||
let mut retry_reason = None;
|
|
||||||
loop {
|
|
||||||
sleep(Duration::from_secs(1));
|
|
||||||
info!("\n{}", cluster_info.rpc_info_trace());
|
|
||||||
|
|
||||||
let rpc_peers = get_rpc_peers(
|
|
||||||
cluster_info,
|
|
||||||
cluster_entrypoints,
|
|
||||||
validator_config,
|
|
||||||
blacklisted_rpc_nodes,
|
|
||||||
&blacklist_timeout,
|
|
||||||
&mut retry_reason,
|
|
||||||
);
|
|
||||||
if rpc_peers.is_none() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let rpc_peers = rpc_peers.unwrap();
|
|
||||||
blacklist_timeout = Instant::now();
|
|
||||||
|
|
||||||
let mut highest_snapshot_hash = get_highest_local_snapshot_hash(snapshot_archives_dir);
|
|
||||||
let eligible_rpc_peers = if snapshot_not_required {
|
|
||||||
rpc_peers
|
|
||||||
} else {
|
|
||||||
let trusted_snapshot_hashes =
|
|
||||||
get_trusted_snapshot_hashes(cluster_info, &validator_config.trusted_validators);
|
|
||||||
|
|
||||||
let mut eligible_rpc_peers = vec![];
|
|
||||||
|
|
||||||
for rpc_peer in rpc_peers.iter() {
|
|
||||||
if no_untrusted_rpc
|
|
||||||
&& !is_trusted_validator(&rpc_peer.id, &validator_config.trusted_validators)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |snapshot_hashes| {
|
|
||||||
for snapshot_hash in snapshot_hashes {
|
|
||||||
if let Some(ref trusted_snapshot_hashes) = trusted_snapshot_hashes {
|
|
||||||
if !trusted_snapshot_hashes.contains(snapshot_hash) {
|
|
||||||
// Ignore all untrusted snapshot hashes
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if highest_snapshot_hash.is_none()
|
|
||||||
|| snapshot_hash.0 > highest_snapshot_hash.unwrap().0
|
|
||||||
{
|
|
||||||
// Found a higher snapshot, remove all nodes with a lower snapshot
|
|
||||||
eligible_rpc_peers.clear();
|
|
||||||
highest_snapshot_hash = Some(*snapshot_hash)
|
|
||||||
}
|
|
||||||
|
|
||||||
if Some(*snapshot_hash) == highest_snapshot_hash {
|
|
||||||
eligible_rpc_peers.push(rpc_peer.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
match highest_snapshot_hash {
|
|
||||||
None => {
|
|
||||||
assert!(eligible_rpc_peers.is_empty());
|
|
||||||
}
|
|
||||||
Some(highest_snapshot_hash) => {
|
|
||||||
if eligible_rpc_peers.is_empty() {
|
|
||||||
match newer_cluster_snapshot_timeout {
|
|
||||||
None => newer_cluster_snapshot_timeout = Some(Instant::now()),
|
|
||||||
Some(newer_cluster_snapshot_timeout) => {
|
|
||||||
if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 {
|
|
||||||
warn!("giving up newer snapshot from the cluster");
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
retry_reason = Some(format!(
|
|
||||||
"Wait for newer snapshot than local: {:?}",
|
|
||||||
highest_snapshot_hash
|
|
||||||
));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"Highest available snapshot slot is {}, available from {} node{}: {:?}",
|
|
||||||
highest_snapshot_hash.0,
|
|
||||||
eligible_rpc_peers.len(),
|
|
||||||
if eligible_rpc_peers.len() > 1 {
|
|
||||||
"s"
|
|
||||||
} else {
|
|
||||||
""
|
|
||||||
},
|
|
||||||
eligible_rpc_peers
|
|
||||||
.iter()
|
|
||||||
.map(|contact_info| contact_info.id)
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
eligible_rpc_peers
|
|
||||||
};
|
|
||||||
|
|
||||||
if !eligible_rpc_peers.is_empty() {
|
|
||||||
let contact_info =
|
|
||||||
&eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())];
|
|
||||||
return Some((contact_info.clone(), highest_snapshot_hash));
|
|
||||||
} else {
|
|
||||||
retry_reason = Some("No snapshots available".to_owned());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn check_vote_account(
|
fn check_vote_account(
|
||||||
rpc_client: &RpcClient,
|
rpc_client: &RpcClient,
|
||||||
identity_pubkey: &Pubkey,
|
identity_pubkey: &Pubkey,
|
||||||
|
@ -359,87 +331,34 @@ fn check_vote_account(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn verify_reachable_ports(
|
/// Get the Slot and Hash of the local snapshot with the highest slot. Can be either a full
|
||||||
node: &Node,
|
/// snapshot or an incremental snapshot.
|
||||||
cluster_entrypoint: &ContactInfo,
|
fn get_highest_local_snapshot_hash(
|
||||||
validator_config: &ValidatorConfig,
|
snapshot_archives_dir: impl AsRef<Path>,
|
||||||
socket_addr_space: &SocketAddrSpace,
|
) -> Option<(Slot, Hash)> {
|
||||||
) -> bool {
|
if let Some(full_snapshot_info) =
|
||||||
let mut udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];
|
snapshot_utils::get_highest_full_snapshot_archive_info(&snapshot_archives_dir)
|
||||||
|
{
|
||||||
if ContactInfo::is_valid_address(&node.info.serve_repair, socket_addr_space) {
|
if let Some(incremental_snapshot_info) =
|
||||||
udp_sockets.push(&node.sockets.serve_repair);
|
snapshot_utils::get_highest_incremental_snapshot_archive_info(
|
||||||
}
|
&snapshot_archives_dir,
|
||||||
if ContactInfo::is_valid_address(&node.info.tpu, socket_addr_space) {
|
full_snapshot_info.slot(),
|
||||||
udp_sockets.extend(node.sockets.tpu.iter());
|
|
||||||
}
|
|
||||||
if ContactInfo::is_valid_address(&node.info.tpu_forwards, socket_addr_space) {
|
|
||||||
udp_sockets.extend(node.sockets.tpu_forwards.iter());
|
|
||||||
}
|
|
||||||
if ContactInfo::is_valid_address(&node.info.tpu_vote, socket_addr_space) {
|
|
||||||
udp_sockets.extend(node.sockets.tpu_vote.iter());
|
|
||||||
}
|
|
||||||
if ContactInfo::is_valid_address(&node.info.tvu, socket_addr_space) {
|
|
||||||
udp_sockets.extend(node.sockets.tvu.iter());
|
|
||||||
udp_sockets.extend(node.sockets.broadcast.iter());
|
|
||||||
udp_sockets.extend(node.sockets.retransmit_sockets.iter());
|
|
||||||
}
|
|
||||||
if ContactInfo::is_valid_address(&node.info.tvu_forwards, socket_addr_space) {
|
|
||||||
udp_sockets.extend(node.sockets.tvu_forwards.iter());
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut tcp_listeners = vec![];
|
|
||||||
if let Some((rpc_addr, rpc_pubsub_addr)) = validator_config.rpc_addrs {
|
|
||||||
for (purpose, bind_addr, public_addr) in &[
|
|
||||||
("RPC", rpc_addr, &node.info.rpc),
|
|
||||||
("RPC pubsub", rpc_pubsub_addr, &node.info.rpc_pubsub),
|
|
||||||
] {
|
|
||||||
if ContactInfo::is_valid_address(public_addr, socket_addr_space) {
|
|
||||||
tcp_listeners.push((
|
|
||||||
bind_addr.port(),
|
|
||||||
TcpListener::bind(bind_addr).unwrap_or_else(|err| {
|
|
||||||
error!(
|
|
||||||
"Unable to bind to tcp {:?} for {}: {}",
|
|
||||||
bind_addr, purpose, err
|
|
||||||
);
|
|
||||||
exit(1);
|
|
||||||
}),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ip_echo) = &node.sockets.ip_echo {
|
|
||||||
let ip_echo = ip_echo.try_clone().expect("unable to clone tcp_listener");
|
|
||||||
tcp_listeners.push((ip_echo.local_addr().unwrap().port(), ip_echo));
|
|
||||||
}
|
|
||||||
|
|
||||||
solana_net_utils::verify_reachable_ports(
|
|
||||||
&cluster_entrypoint.gossip,
|
|
||||||
tcp_listeners,
|
|
||||||
&udp_sockets,
|
|
||||||
)
|
)
|
||||||
|
{
|
||||||
|
Some((
|
||||||
|
incremental_snapshot_info.slot(),
|
||||||
|
*incremental_snapshot_info.hash(),
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
Some((full_snapshot_info.slot(), *full_snapshot_info.hash()))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RpcBootstrapConfig {
|
mod without_incremental_snapshots {
|
||||||
pub no_genesis_fetch: bool,
|
use super::*;
|
||||||
pub no_snapshot_fetch: bool,
|
|
||||||
pub no_untrusted_rpc: bool,
|
|
||||||
pub max_genesis_archive_unpacked_size: u64,
|
|
||||||
pub no_check_vote_account: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for RpcBootstrapConfig {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
no_genesis_fetch: true,
|
|
||||||
no_snapshot_fetch: true,
|
|
||||||
no_untrusted_rpc: true,
|
|
||||||
max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
|
|
||||||
no_check_vote_account: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn rpc_bootstrap(
|
pub fn rpc_bootstrap(
|
||||||
|
@ -710,28 +629,142 @@ pub fn rpc_bootstrap(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the Slot and Hash of the local snapshot with the highest slot. Can be either a full
|
fn get_rpc_node(
|
||||||
/// snapshot or an incremental snapshot.
|
cluster_info: &ClusterInfo,
|
||||||
fn get_highest_local_snapshot_hash(
|
cluster_entrypoints: &[ContactInfo],
|
||||||
snapshot_archives_dir: impl AsRef<Path>,
|
validator_config: &ValidatorConfig,
|
||||||
) -> Option<(Slot, Hash)> {
|
blacklisted_rpc_nodes: &mut HashSet<Pubkey>,
|
||||||
if let Some(full_snapshot_info) =
|
snapshot_not_required: bool,
|
||||||
snapshot_utils::get_highest_full_snapshot_archive_info(&snapshot_archives_dir)
|
no_untrusted_rpc: bool,
|
||||||
{
|
snapshot_archives_dir: &Path,
|
||||||
if let Some(incremental_snapshot_info) =
|
) -> Option<(ContactInfo, Option<(Slot, Hash)>)> {
|
||||||
snapshot_utils::get_highest_incremental_snapshot_archive_info(
|
let mut blacklist_timeout = Instant::now();
|
||||||
&snapshot_archives_dir,
|
let mut newer_cluster_snapshot_timeout = None;
|
||||||
full_snapshot_info.slot(),
|
let mut retry_reason = None;
|
||||||
)
|
loop {
|
||||||
{
|
sleep(Duration::from_secs(1));
|
||||||
Some((
|
info!("\n{}", cluster_info.rpc_info_trace());
|
||||||
incremental_snapshot_info.slot(),
|
|
||||||
*incremental_snapshot_info.hash(),
|
let rpc_peers = get_rpc_peers(
|
||||||
))
|
cluster_info,
|
||||||
} else {
|
cluster_entrypoints,
|
||||||
Some((full_snapshot_info.slot(), *full_snapshot_info.hash()))
|
validator_config,
|
||||||
|
blacklisted_rpc_nodes,
|
||||||
|
&blacklist_timeout,
|
||||||
|
&mut retry_reason,
|
||||||
|
);
|
||||||
|
if rpc_peers.is_none() {
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
let rpc_peers = rpc_peers.unwrap();
|
||||||
|
blacklist_timeout = Instant::now();
|
||||||
|
|
||||||
|
let mut highest_snapshot_hash = get_highest_local_snapshot_hash(snapshot_archives_dir);
|
||||||
|
let eligible_rpc_peers = if snapshot_not_required {
|
||||||
|
rpc_peers
|
||||||
|
} else {
|
||||||
|
let trusted_snapshot_hashes =
|
||||||
|
get_trusted_snapshot_hashes(cluster_info, &validator_config.trusted_validators);
|
||||||
|
|
||||||
|
let mut eligible_rpc_peers = vec![];
|
||||||
|
|
||||||
|
for rpc_peer in rpc_peers.iter() {
|
||||||
|
if no_untrusted_rpc
|
||||||
|
&& !is_trusted_validator(&rpc_peer.id, &validator_config.trusted_validators)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
cluster_info.get_snapshot_hash_for_node(&rpc_peer.id, |snapshot_hashes| {
|
||||||
|
for snapshot_hash in snapshot_hashes {
|
||||||
|
if let Some(ref trusted_snapshot_hashes) = trusted_snapshot_hashes {
|
||||||
|
if !trusted_snapshot_hashes.contains(snapshot_hash) {
|
||||||
|
// Ignore all untrusted snapshot hashes
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if highest_snapshot_hash.is_none()
|
||||||
|
|| snapshot_hash.0 > highest_snapshot_hash.unwrap().0
|
||||||
|
{
|
||||||
|
// Found a higher snapshot, remove all nodes with a lower snapshot
|
||||||
|
eligible_rpc_peers.clear();
|
||||||
|
highest_snapshot_hash = Some(*snapshot_hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
if Some(*snapshot_hash) == highest_snapshot_hash {
|
||||||
|
eligible_rpc_peers.push(rpc_peer.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
match highest_snapshot_hash {
|
||||||
|
None => {
|
||||||
|
assert!(eligible_rpc_peers.is_empty());
|
||||||
|
}
|
||||||
|
Some(highest_snapshot_hash) => {
|
||||||
|
if eligible_rpc_peers.is_empty() {
|
||||||
|
match newer_cluster_snapshot_timeout {
|
||||||
|
None => newer_cluster_snapshot_timeout = Some(Instant::now()),
|
||||||
|
Some(newer_cluster_snapshot_timeout) => {
|
||||||
|
if newer_cluster_snapshot_timeout.elapsed().as_secs() > 180 {
|
||||||
|
warn!("giving up newer snapshot from the cluster");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
retry_reason = Some(format!(
|
||||||
|
"Wait for newer snapshot than local: {:?}",
|
||||||
|
highest_snapshot_hash
|
||||||
|
));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Highest available snapshot slot is {}, available from {} node{}: {:?}",
|
||||||
|
highest_snapshot_hash.0,
|
||||||
|
eligible_rpc_peers.len(),
|
||||||
|
if eligible_rpc_peers.len() > 1 {
|
||||||
|
"s"
|
||||||
|
} else {
|
||||||
|
""
|
||||||
|
},
|
||||||
|
eligible_rpc_peers
|
||||||
|
.iter()
|
||||||
|
.map(|contact_info| contact_info.id)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
eligible_rpc_peers
|
||||||
|
};
|
||||||
|
|
||||||
|
if !eligible_rpc_peers.is_empty() {
|
||||||
|
let contact_info =
|
||||||
|
&eligible_rpc_peers[thread_rng().gen_range(0, eligible_rpc_peers.len())];
|
||||||
|
return Some((contact_info.clone(), highest_snapshot_hash));
|
||||||
|
} else {
|
||||||
|
retry_reason = Some("No snapshots available".to_owned());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_trusted_snapshot_hashes(
|
||||||
|
cluster_info: &ClusterInfo,
|
||||||
|
trusted_validators: &Option<HashSet<Pubkey>>,
|
||||||
|
) -> Option<HashSet<(Slot, Hash)>> {
|
||||||
|
if let Some(trusted_validators) = trusted_validators {
|
||||||
|
let mut trusted_snapshot_hashes = HashSet::new();
|
||||||
|
for trusted_validator in trusted_validators {
|
||||||
|
cluster_info.get_snapshot_hash_for_node(trusted_validator, |snapshot_hashes| {
|
||||||
|
for snapshot_hash in snapshot_hashes {
|
||||||
|
trusted_snapshot_hashes.insert(*snapshot_hash);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Some(trusted_snapshot_hashes)
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue