From 397ea05aa7f911d5a6949cb574b4fb20be4db8be Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Thu, 24 Oct 2019 15:35:33 -0700 Subject: [PATCH] spy nodes are now gossip entrypoints (#6532) --- core/src/cluster_info.rs | 20 +++++++++------- core/src/gossip_service.rs | 12 ++++++---- gossip/src/main.rs | 47 +++++++++++++++++--------------------- 3 files changed, 40 insertions(+), 39 deletions(-) diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 4831641c0..bbeffaeeb 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -1491,8 +1491,12 @@ impl ClusterInfo { } /// An alternative to Spy Node that has a valid gossip address and fully participate in Gossip. - pub fn gossip_node(id: &Pubkey, gossip_addr: &SocketAddr) -> (ContactInfo, UdpSocket) { - let (port, (gossip_socket, _)) = Node::get_gossip_port(gossip_addr, VALIDATOR_PORT_RANGE); + pub fn gossip_node( + id: &Pubkey, + gossip_addr: &SocketAddr, + ) -> (ContactInfo, UdpSocket, Option) { + let (port, (gossip_socket, ip_echo)) = + Node::get_gossip_port(gossip_addr, VALIDATOR_PORT_RANGE); let daddr = socketaddr_any!(); let node = ContactInfo::new( @@ -1507,11 +1511,11 @@ impl ClusterInfo { daddr, timestamp(), ); - (node, gossip_socket) + (node, gossip_socket, Some(ip_echo)) } /// A Node with invalid ports to spy on gossip via pull requests - pub fn spy_node(id: &Pubkey) -> (ContactInfo, UdpSocket) { + pub fn spy_node(id: &Pubkey) -> (ContactInfo, UdpSocket, Option) { let (_, gossip_socket) = bind_in_range(VALIDATOR_PORT_RANGE).unwrap(); let daddr = socketaddr_any!(); @@ -1527,7 +1531,7 @@ impl ClusterInfo { daddr, timestamp(), ); - (node, gossip_socket) + (node, gossip_socket, None) } } @@ -1800,9 +1804,9 @@ mod tests { #[test] fn test_gossip_node() { //check that a gossip nodes always show up as spies - let (node, _) = ClusterInfo::spy_node(&Pubkey::new_rand()); + let (node, _, _) = ClusterInfo::spy_node(&Pubkey::new_rand()); assert!(ClusterInfo::is_spy_node(&node)); - let (node, _) = + let (node, _, _) = ClusterInfo::gossip_node(&Pubkey::new_rand(), &"1.1.1.1:1111".parse().unwrap()); assert!(ClusterInfo::is_spy_node(&node)); } @@ -1811,7 +1815,7 @@ mod tests { fn test_cluster_spy_gossip() { //check that gossip doesn't try to push to invalid addresses let node = Node::new_localhost(); - let (spy, _) = ClusterInfo::spy_node(&Pubkey::new_rand()); + let (spy, _, _) = ClusterInfo::spy_node(&Pubkey::new_rand()); let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair( node.info, ))); diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 1519115b2..609ccc3db 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -10,7 +10,7 @@ use solana_ledger::bank_forks::BankForks; use solana_ledger::blocktree::Blocktree; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; -use std::net::{IpAddr, SocketAddr, UdpSocket}; +use std::net::{IpAddr, SocketAddr, TcpListener, UdpSocket}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; @@ -70,12 +70,14 @@ pub fn discover( gossip_addr: Option<&SocketAddr>, ) -> std::io::Result<(Vec, Vec)> { let exit = Arc::new(AtomicBool::new(false)); - let (gossip_service, spy_ref) = make_gossip_node(entry_point, &exit, gossip_addr); + let (gossip_service, ip_echo, spy_ref) = make_gossip_node(entry_point, &exit, gossip_addr); let id = spy_ref.read().unwrap().keypair.pubkey(); info!("Gossip entry point: {:?}", entry_point); info!("Spy node id: {:?}", id); + let _ip_echo_server = ip_echo.map(solana_netutil::ip_echo_server); + let (met_criteria, secs, tvu_peers, archivers) = spy( spy_ref.clone(), num_nodes, @@ -247,9 +249,9 @@ fn make_gossip_node( entry_point: &SocketAddr, exit: &Arc, gossip_addr: Option<&SocketAddr>, -) -> (GossipService, Arc>) { +) -> (GossipService, Option, Arc>) { let keypair = Arc::new(Keypair::new()); - let (node, gossip_socket) = if let Some(gossip_addr) = gossip_addr { + let (node, gossip_socket, ip_echo) = if let Some(gossip_addr) = gossip_addr { ClusterInfo::gossip_node(&keypair.pubkey(), gossip_addr) } else { ClusterInfo::spy_node(&keypair.pubkey()) @@ -259,7 +261,7 @@ fn make_gossip_node( let cluster_info = Arc::new(RwLock::new(cluster_info)); let gossip_service = GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit); - (gossip_service, cluster_info) + (gossip_service, ip_echo, cluster_info) } impl Service for GossipService { diff --git a/gossip/src/main.rs b/gossip/src/main.rs index 0099b44f3..8eb8f620d 100644 --- a/gossip/src/main.rs +++ b/gossip/src/main.rs @@ -1,15 +1,11 @@ //! A command-line executable for monitoring a cluster's gossip plane. -#[macro_use] -extern crate solana_core; - use clap::{ crate_description, crate_name, crate_version, value_t_or_exit, App, AppSettings, Arg, SubCommand, }; use solana_client::rpc_client::RpcClient; -use solana_core::contact_info::ContactInfo; -use solana_core::gossip_service::discover; +use solana_core::{contact_info::ContactInfo, gossip_service::discover}; use solana_sdk::pubkey::Pubkey; use std::error; use std::net::SocketAddr; @@ -65,6 +61,14 @@ fn main() -> Result<(), Box> { SubCommand::with_name("spy") .about("Monitor the gossip entrypoint") .setting(AppSettings::DisableVersion) + .arg( + clap::Arg::with_name("gossip_port") + .long("gossip-port") + .value_name("PORT") + .takes_value(true) + .default_value("0") + .help("Gossip port number for the node"), + ) .arg( Arg::with_name("num_nodes") .short("N") @@ -121,17 +125,6 @@ fn main() -> Result<(), Box> { }); } - let gossip_addr = { - let mut addr = socketaddr_any!(); - addr.set_ip( - solana_netutil::get_public_ip_addr(&entrypoint_addr).unwrap_or_else(|err| { - eprintln!("failed to contact {}: {}", entrypoint_addr, err); - exit(1); - }), - ); - Some(addr) - }; - match matches.subcommand() { ("spy", Some(matches)) => { let num_nodes_exactly = matches @@ -148,13 +141,21 @@ fn main() -> Result<(), Box> { .value_of("node_pubkey") .map(|pubkey_str| pubkey_str.parse::().unwrap()); + let gossip_addr = SocketAddr::new( + solana_netutil::get_public_ip_addr(&entrypoint_addr).unwrap_or_else(|err| { + eprintln!("failed to contact {}: {}", entrypoint_addr, err); + exit(1); + }), + value_t_or_exit!(matches, "gossip_port", u16), + ); + let (nodes, _archivers) = discover( &entrypoint_addr, num_nodes, timeout, pubkey, None, - gossip_addr.as_ref(), + Some(&gossip_addr), )?; if timeout.is_some() { @@ -197,7 +198,7 @@ fn main() -> Result<(), Box> { Some(timeout), None, Some(entrypoint_addr.ip()), - gossip_addr.as_ref(), + None, )?; let rpc_addrs: Vec<_> = nodes @@ -224,14 +225,8 @@ fn main() -> Result<(), Box> { .unwrap() .parse::() .unwrap(); - let (nodes, _archivers) = discover( - &entrypoint_addr, - None, - None, - Some(pubkey), - None, - gossip_addr.as_ref(), - )?; + let (nodes, _archivers) = + discover(&entrypoint_addr, None, None, Some(pubkey), None, None)?; let node = nodes.iter().find(|x| x.id == pubkey).unwrap(); if !ContactInfo::is_valid_address(&node.rpc) {