If a bad RPC node is selected try another one instead of aborting

This commit is contained in:
Michael Vines 2020-01-17 21:32:18 -07:00
parent e28508ad56
commit 30d40e9a32
1 changed files with 27 additions and 20 deletions

View File

@ -197,7 +197,7 @@ fn get_rpc_addr(
node: &Node, node: &Node,
identity_keypair: &Arc<Keypair>, identity_keypair: &Arc<Keypair>,
entrypoint_gossip: &SocketAddr, entrypoint_gossip: &SocketAddr,
) -> (Pubkey, SocketAddr) { ) -> (RpcClient, SocketAddr) {
let mut cluster_info = ClusterInfo::new(node.info.clone(), identity_keypair.clone()); let mut cluster_info = ClusterInfo::new(node.info.clone(), identity_keypair.clone());
cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint_gossip)); cluster_info.set_entrypoint(ContactInfo::new_gossip_entry_point(entrypoint_gossip));
let cluster_info = Arc::new(RwLock::new(cluster_info)); let cluster_info = Arc::new(RwLock::new(cluster_info));
@ -211,7 +211,7 @@ fn get_rpc_addr(
&exit, &exit,
); );
let (id, rpc_addr) = loop { let (rpc_client, rpc_addr) = loop {
info!( info!(
"Searching for RPC service...\n{}", "Searching for RPC service...\n{}",
cluster_info.read().unwrap().contact_info_trace() cluster_info.read().unwrap().contact_info_trace()
@ -227,16 +227,30 @@ fn get_rpc_addr(
.any(|contact_info| contact_info.gossip == *entrypoint_gossip); .any(|contact_info| contact_info.gossip == *entrypoint_gossip);
if found_entrypoint & !rpc_peers.is_empty() { if found_entrypoint & !rpc_peers.is_empty() {
// Prefer the entrypoint's RPC service if present, otherwise pick a node at random let (id, rpc_addr) = {
if let Some(contact_info) = rpc_peers // Prefer the entrypoint's RPC service if present, otherwise pick a node at random
.iter() if let Some(contact_info) = rpc_peers
.find(|contact_info| contact_info.gossip == *entrypoint_gossip) .iter()
{ .find(|contact_info| contact_info.gossip == *entrypoint_gossip)
break (contact_info.id, contact_info.rpc); {
} (contact_info.id, contact_info.rpc)
} else {
let i = thread_rng().gen_range(0, rpc_peers.len());
(rpc_peers[i].id, rpc_peers[i].rpc)
}
};
let i = thread_rng().gen_range(0, rpc_peers.len()); info!("Contacting RPC port of node {}: {:?}", id, rpc_addr);
break (rpc_peers[i].id, rpc_peers[i].rpc); let rpc_client = RpcClient::new_socket(rpc_addr);
match rpc_client.get_version() {
Ok(rpc_version) => {
info!("RPC node version: {}", rpc_version.solana_core);
break (rpc_client, rpc_addr);
}
Err(err) => {
warn!("Failed to get RPC version: {}", err);
}
}
} }
sleep(Duration::from_secs(1)); sleep(Duration::from_secs(1));
@ -245,7 +259,7 @@ fn get_rpc_addr(
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
gossip_service.join().unwrap(); gossip_service.join().unwrap();
(id, rpc_addr) (rpc_client, rpc_addr)
} }
fn check_vote_account( fn check_vote_account(
@ -807,15 +821,8 @@ pub fn main() {
); );
if !no_genesis_fetch { if !no_genesis_fetch {
let (rpc_node_id, rpc_addr) = let (rpc_client, rpc_addr) =
get_rpc_addr(&node, &identity_keypair, &cluster_entrypoint.gossip); get_rpc_addr(&node, &identity_keypair, &cluster_entrypoint.gossip);
info!("Using RPC from node {}: {:?}", rpc_node_id, rpc_addr);
let rpc_client = RpcClient::new_socket(rpc_addr);
let rpc_version = rpc_client.get_version().unwrap_or_else(|err| {
error!("Failed to get version: {}", err);
exit(1);
});
info!("RPC node version: {}", rpc_version.solana_core);
download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| { download_ledger(&rpc_addr, &ledger_path, no_snapshot_fetch).unwrap_or_else(|err| {
error!("Failed to initialize ledger: {}", err); error!("Failed to initialize ledger: {}", err);