From e3a6c9234abdab66d3f8709664f3c0e7ad4fcd1c Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 30 Aug 2019 16:12:58 -0700 Subject: [PATCH] Entrypoint RPC service discovery now blocks until the entrypoint is actually found (#5756) automerge --- core/src/gossip_service.rs | 111 +++++++++++++++++++++++++++---------- gossip/src/main.rs | 3 + validator/src/main.rs | 1 + 3 files changed, 87 insertions(+), 28 deletions(-) diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 0972c905c..227a34d69 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -2,8 +2,7 @@ use crate::bank_forks::BankForks; use crate::blocktree::Blocktree; -use crate::cluster_info::ClusterInfo; -use crate::cluster_info::FULLNODE_PORT_RANGE; +use crate::cluster_info::{ClusterInfo, FULLNODE_PORT_RANGE}; use crate::contact_info::ContactInfo; use crate::service::Service; use crate::streamer; @@ -11,13 +10,11 @@ use rand::{thread_rng, Rng}; use solana_client::thin_client::{create_client, ThinClient}; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; -use std::net::SocketAddr; -use std::net::UdpSocket; +use std::net::{IpAddr, SocketAddr, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; -use std::thread::sleep; -use std::thread::{self, JoinHandle}; +use std::thread::{self, sleep, JoinHandle}; use std::time::{Duration, Instant}; pub struct GossipService { @@ -61,14 +58,15 @@ pub fn discover_cluster( entry_point: &SocketAddr, num_nodes: usize, ) -> std::io::Result<(Vec, Vec)> { - discover(entry_point, Some(num_nodes), Some(30), None, None) + discover(entry_point, Some(num_nodes), Some(30), None, None, None) } pub fn discover( entry_point: &SocketAddr, num_nodes: Option, timeout: Option, - find_node: Option, + find_node_by_pubkey: Option, + find_node_by_ipaddr: Option, gossip_addr: Option<&SocketAddr>, ) -> std::io::Result<(Vec, Vec)> { let exit = Arc::new(AtomicBool::new(false)); @@ -78,8 +76,13 @@ pub fn discover( info!("Gossip entry point: {:?}", entry_point); info!("Spy node id: {:?}", id); - let (met_criteria, secs, tvu_peers, replicators) = - spy(spy_ref.clone(), num_nodes, timeout, find_node); + let (met_criteria, secs, tvu_peers, replicators) = spy( + spy_ref.clone(), + num_nodes, + timeout, + find_node_by_pubkey, + find_node_by_ipaddr, + ); exit.store(true, Ordering::Relaxed); gossip_service.join().unwrap(); @@ -150,7 +153,8 @@ fn spy( spy_ref: Arc>, num_nodes: Option, timeout: Option, - find_node: Option, + find_node_by_pubkey: Option, + find_node_by_ipaddr: Option, ) -> (bool, u64, Vec, Vec) { let now = Instant::now(); let mut met_criteria = false; @@ -175,7 +179,17 @@ fn spy( replicators = spy_ref.read().unwrap().storage_peers(); if let Some(num) = num_nodes { if tvu_peers.len() + replicators.len() >= num { - if let Some(pubkey) = find_node { + if let Some(ipaddr) = find_node_by_ipaddr { + if tvu_peers + .iter() + .chain(replicators.iter()) + .any(|x| x.gossip.ip() == ipaddr) + { + met_criteria = true; + break; + } + } + if let Some(pubkey) = find_node_by_pubkey { if tvu_peers .iter() .chain(replicators.iter()) @@ -184,21 +198,33 @@ fn spy( met_criteria = true; break; } - } else { + } + if find_node_by_pubkey.is_none() && find_node_by_ipaddr.is_none() { met_criteria = true; break; } } } - if let Some(pubkey) = find_node { - if num_nodes.is_none() - && tvu_peers + if num_nodes.is_none() { + if let Some(pubkey) = find_node_by_pubkey { + if tvu_peers .iter() .chain(replicators.iter()) .any(|x| x.id == pubkey) - { - met_criteria = true; - break; + { + met_criteria = true; + break; + } + } + if let Some(ipaddr) = find_node_by_ipaddr { + if tvu_peers + .iter() + .chain(replicators.iter()) + .any(|x| x.gossip.ip() == ipaddr) + { + met_criteria = true; + break; + } } } if i % 20 == 0 { @@ -286,30 +312,59 @@ mod tests { let spy_ref = Arc::new(RwLock::new(cluster_info)); - let (met_criteria, secs, tvu_peers, _) = spy(spy_ref.clone(), None, Some(1), None); + let (met_criteria, secs, tvu_peers, _) = spy(spy_ref.clone(), None, Some(1), None, None); assert_eq!(met_criteria, false); assert_eq!(secs, 1); assert_eq!(tvu_peers, spy_ref.read().unwrap().tvu_peers()); // Find num_nodes - let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, None); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, None, None); assert_eq!(met_criteria, true); - let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(2), None, None); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(2), None, None, None); assert_eq!(met_criteria, true); // Find specific node by pubkey - let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, None, Some(peer0)); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, None, Some(peer0), None); assert_eq!(met_criteria, true); - let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, Some(0), Some(Pubkey::new_rand())); + let (met_criteria, _, _, _) = spy( + spy_ref.clone(), + None, + Some(0), + Some(Pubkey::new_rand()), + None, + ); assert_eq!(met_criteria, false); // Find num_nodes *and* specific node by pubkey - let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, Some(peer0)); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, Some(peer0), None); assert_eq!(met_criteria, true); - let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(3), Some(0), Some(peer0)); + let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(3), Some(0), Some(peer0), None); assert_eq!(met_criteria, false); - let (met_criteria, _, _, _) = - spy(spy_ref.clone(), Some(1), Some(0), Some(Pubkey::new_rand())); + let (met_criteria, _, _, _) = spy( + spy_ref.clone(), + Some(1), + Some(0), + Some(Pubkey::new_rand()), + None, + ); + assert_eq!(met_criteria, false); + + // Find specific node by ip address + let (met_criteria, _, _, _) = spy( + spy_ref.clone(), + None, + None, + None, + Some("127.0.0.1".parse().unwrap()), + ); + assert_eq!(met_criteria, true); + let (met_criteria, _, _, _) = spy( + spy_ref.clone(), + None, + Some(0), + None, + Some("1.1.1.1".parse().unwrap()), + ); assert_eq!(met_criteria, false); } } diff --git a/gossip/src/main.rs b/gossip/src/main.rs index cc7aee5e9..715792512 100644 --- a/gossip/src/main.rs +++ b/gossip/src/main.rs @@ -147,6 +147,7 @@ fn main() -> Result<(), Box> { num_nodes, timeout, pubkey, + None, gossip_addr.as_ref(), )?; @@ -184,6 +185,7 @@ fn main() -> Result<(), Box> { Some(1), Some(timeout), None, + Some(entrypoint_addr.ip()), gossip_addr.as_ref(), )?; @@ -211,6 +213,7 @@ fn main() -> Result<(), Box> { None, None, Some(pubkey), + None, gossip_addr.as_ref(), )?; let node = nodes.iter().find(|x| x.id == pubkey).unwrap(); diff --git a/validator/src/main.rs b/validator/src/main.rs index c34002152..5dbd015c7 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -162,6 +162,7 @@ fn initialize_ledger_path( Some(1), Some(60), None, + Some(entrypoint.gossip.ip()), Some(&gossip_addr), ) .map_err(|err| err.to_string())?;