Entrypoint RPC service discovery now blocks until the entrypoint is actually found (#5756)

automerge
This commit is contained in:
Michael Vines 2019-08-30 16:12:58 -07:00 committed by Grimes
parent 6089c8030b
commit e3a6c9234a
3 changed files with 87 additions and 28 deletions

View File

@ -2,8 +2,7 @@
use crate::bank_forks::BankForks;
use crate::blocktree::Blocktree;
use crate::cluster_info::ClusterInfo;
use crate::cluster_info::FULLNODE_PORT_RANGE;
use crate::cluster_info::{ClusterInfo, FULLNODE_PORT_RANGE};
use crate::contact_info::ContactInfo;
use crate::service::Service;
use crate::streamer;
@ -11,13 +10,11 @@ use rand::{thread_rng, Rng};
use solana_client::thin_client::{create_client, ThinClient};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::net::SocketAddr;
use std::net::UdpSocket;
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::thread::{self, JoinHandle};
use std::thread::{self, sleep, JoinHandle};
use std::time::{Duration, Instant};
pub struct GossipService {
@ -61,14 +58,15 @@ pub fn discover_cluster(
entry_point: &SocketAddr,
num_nodes: usize,
) -> std::io::Result<(Vec<ContactInfo>, Vec<ContactInfo>)> {
discover(entry_point, Some(num_nodes), Some(30), None, None)
discover(entry_point, Some(num_nodes), Some(30), None, None, None)
}
pub fn discover(
entry_point: &SocketAddr,
num_nodes: Option<usize>,
timeout: Option<u64>,
find_node: Option<Pubkey>,
find_node_by_pubkey: Option<Pubkey>,
find_node_by_ipaddr: Option<IpAddr>,
gossip_addr: Option<&SocketAddr>,
) -> std::io::Result<(Vec<ContactInfo>, Vec<ContactInfo>)> {
let exit = Arc::new(AtomicBool::new(false));
@ -78,8 +76,13 @@ pub fn discover(
info!("Gossip entry point: {:?}", entry_point);
info!("Spy node id: {:?}", id);
let (met_criteria, secs, tvu_peers, replicators) =
spy(spy_ref.clone(), num_nodes, timeout, find_node);
let (met_criteria, secs, tvu_peers, replicators) = spy(
spy_ref.clone(),
num_nodes,
timeout,
find_node_by_pubkey,
find_node_by_ipaddr,
);
exit.store(true, Ordering::Relaxed);
gossip_service.join().unwrap();
@ -150,7 +153,8 @@ fn spy(
spy_ref: Arc<RwLock<ClusterInfo>>,
num_nodes: Option<usize>,
timeout: Option<u64>,
find_node: Option<Pubkey>,
find_node_by_pubkey: Option<Pubkey>,
find_node_by_ipaddr: Option<IpAddr>,
) -> (bool, u64, Vec<ContactInfo>, Vec<ContactInfo>) {
let now = Instant::now();
let mut met_criteria = false;
@ -175,7 +179,17 @@ fn spy(
replicators = spy_ref.read().unwrap().storage_peers();
if let Some(num) = num_nodes {
if tvu_peers.len() + replicators.len() >= num {
if let Some(pubkey) = find_node {
if let Some(ipaddr) = find_node_by_ipaddr {
if tvu_peers
.iter()
.chain(replicators.iter())
.any(|x| x.gossip.ip() == ipaddr)
{
met_criteria = true;
break;
}
}
if let Some(pubkey) = find_node_by_pubkey {
if tvu_peers
.iter()
.chain(replicators.iter())
@ -184,21 +198,33 @@ fn spy(
met_criteria = true;
break;
}
} else {
}
if find_node_by_pubkey.is_none() && find_node_by_ipaddr.is_none() {
met_criteria = true;
break;
}
}
}
if let Some(pubkey) = find_node {
if num_nodes.is_none()
&& tvu_peers
if num_nodes.is_none() {
if let Some(pubkey) = find_node_by_pubkey {
if tvu_peers
.iter()
.chain(replicators.iter())
.any(|x| x.id == pubkey)
{
met_criteria = true;
break;
{
met_criteria = true;
break;
}
}
if let Some(ipaddr) = find_node_by_ipaddr {
if tvu_peers
.iter()
.chain(replicators.iter())
.any(|x| x.gossip.ip() == ipaddr)
{
met_criteria = true;
break;
}
}
}
if i % 20 == 0 {
@ -286,30 +312,59 @@ mod tests {
let spy_ref = Arc::new(RwLock::new(cluster_info));
let (met_criteria, secs, tvu_peers, _) = spy(spy_ref.clone(), None, Some(1), None);
let (met_criteria, secs, tvu_peers, _) = spy(spy_ref.clone(), None, Some(1), None, None);
assert_eq!(met_criteria, false);
assert_eq!(secs, 1);
assert_eq!(tvu_peers, spy_ref.read().unwrap().tvu_peers());
// Find num_nodes
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, None);
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, None, None);
assert_eq!(met_criteria, true);
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(2), None, None);
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(2), None, None, None);
assert_eq!(met_criteria, true);
// Find specific node by pubkey
let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, None, Some(peer0));
let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, None, Some(peer0), None);
assert_eq!(met_criteria, true);
let (met_criteria, _, _, _) = spy(spy_ref.clone(), None, Some(0), Some(Pubkey::new_rand()));
let (met_criteria, _, _, _) = spy(
spy_ref.clone(),
None,
Some(0),
Some(Pubkey::new_rand()),
None,
);
assert_eq!(met_criteria, false);
// Find num_nodes *and* specific node by pubkey
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, Some(peer0));
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(1), None, Some(peer0), None);
assert_eq!(met_criteria, true);
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(3), Some(0), Some(peer0));
let (met_criteria, _, _, _) = spy(spy_ref.clone(), Some(3), Some(0), Some(peer0), None);
assert_eq!(met_criteria, false);
let (met_criteria, _, _, _) =
spy(spy_ref.clone(), Some(1), Some(0), Some(Pubkey::new_rand()));
let (met_criteria, _, _, _) = spy(
spy_ref.clone(),
Some(1),
Some(0),
Some(Pubkey::new_rand()),
None,
);
assert_eq!(met_criteria, false);
// Find specific node by ip address
let (met_criteria, _, _, _) = spy(
spy_ref.clone(),
None,
None,
None,
Some("127.0.0.1".parse().unwrap()),
);
assert_eq!(met_criteria, true);
let (met_criteria, _, _, _) = spy(
spy_ref.clone(),
None,
Some(0),
None,
Some("1.1.1.1".parse().unwrap()),
);
assert_eq!(met_criteria, false);
}
}

View File

@ -147,6 +147,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
num_nodes,
timeout,
pubkey,
None,
gossip_addr.as_ref(),
)?;
@ -184,6 +185,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
Some(1),
Some(timeout),
None,
Some(entrypoint_addr.ip()),
gossip_addr.as_ref(),
)?;
@ -211,6 +213,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
None,
None,
Some(pubkey),
None,
gossip_addr.as_ref(),
)?;
let node = nodes.iter().find(|x| x.id == pubkey).unwrap();

View File

@ -162,6 +162,7 @@ fn initialize_ledger_path(
Some(1),
Some(60),
None,
Some(entrypoint.gossip.ip()),
Some(&gossip_addr),
)
.map_err(|err| err.to_string())?;