spy nodes are now gossip entrypoints (#6532)
This commit is contained in:
parent
dadcb632d8
commit
397ea05aa7
|
@ -1491,8 +1491,12 @@ impl ClusterInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An alternative to Spy Node that has a valid gossip address and fully participate in Gossip.
|
/// An alternative to Spy Node that has a valid gossip address and fully participate in Gossip.
|
||||||
pub fn gossip_node(id: &Pubkey, gossip_addr: &SocketAddr) -> (ContactInfo, UdpSocket) {
|
pub fn gossip_node(
|
||||||
let (port, (gossip_socket, _)) = Node::get_gossip_port(gossip_addr, VALIDATOR_PORT_RANGE);
|
id: &Pubkey,
|
||||||
|
gossip_addr: &SocketAddr,
|
||||||
|
) -> (ContactInfo, UdpSocket, Option<TcpListener>) {
|
||||||
|
let (port, (gossip_socket, ip_echo)) =
|
||||||
|
Node::get_gossip_port(gossip_addr, VALIDATOR_PORT_RANGE);
|
||||||
let daddr = socketaddr_any!();
|
let daddr = socketaddr_any!();
|
||||||
|
|
||||||
let node = ContactInfo::new(
|
let node = ContactInfo::new(
|
||||||
|
@ -1507,11 +1511,11 @@ impl ClusterInfo {
|
||||||
daddr,
|
daddr,
|
||||||
timestamp(),
|
timestamp(),
|
||||||
);
|
);
|
||||||
(node, gossip_socket)
|
(node, gossip_socket, Some(ip_echo))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A Node with invalid ports to spy on gossip via pull requests
|
/// A Node with invalid ports to spy on gossip via pull requests
|
||||||
pub fn spy_node(id: &Pubkey) -> (ContactInfo, UdpSocket) {
|
pub fn spy_node(id: &Pubkey) -> (ContactInfo, UdpSocket, Option<TcpListener>) {
|
||||||
let (_, gossip_socket) = bind_in_range(VALIDATOR_PORT_RANGE).unwrap();
|
let (_, gossip_socket) = bind_in_range(VALIDATOR_PORT_RANGE).unwrap();
|
||||||
let daddr = socketaddr_any!();
|
let daddr = socketaddr_any!();
|
||||||
|
|
||||||
|
@ -1527,7 +1531,7 @@ impl ClusterInfo {
|
||||||
daddr,
|
daddr,
|
||||||
timestamp(),
|
timestamp(),
|
||||||
);
|
);
|
||||||
(node, gossip_socket)
|
(node, gossip_socket, None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1800,9 +1804,9 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn test_gossip_node() {
|
fn test_gossip_node() {
|
||||||
//check that a gossip nodes always show up as spies
|
//check that a gossip nodes always show up as spies
|
||||||
let (node, _) = ClusterInfo::spy_node(&Pubkey::new_rand());
|
let (node, _, _) = ClusterInfo::spy_node(&Pubkey::new_rand());
|
||||||
assert!(ClusterInfo::is_spy_node(&node));
|
assert!(ClusterInfo::is_spy_node(&node));
|
||||||
let (node, _) =
|
let (node, _, _) =
|
||||||
ClusterInfo::gossip_node(&Pubkey::new_rand(), &"1.1.1.1:1111".parse().unwrap());
|
ClusterInfo::gossip_node(&Pubkey::new_rand(), &"1.1.1.1:1111".parse().unwrap());
|
||||||
assert!(ClusterInfo::is_spy_node(&node));
|
assert!(ClusterInfo::is_spy_node(&node));
|
||||||
}
|
}
|
||||||
|
@ -1811,7 +1815,7 @@ mod tests {
|
||||||
fn test_cluster_spy_gossip() {
|
fn test_cluster_spy_gossip() {
|
||||||
//check that gossip doesn't try to push to invalid addresses
|
//check that gossip doesn't try to push to invalid addresses
|
||||||
let node = Node::new_localhost();
|
let node = Node::new_localhost();
|
||||||
let (spy, _) = ClusterInfo::spy_node(&Pubkey::new_rand());
|
let (spy, _, _) = ClusterInfo::spy_node(&Pubkey::new_rand());
|
||||||
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
|
let cluster_info = Arc::new(RwLock::new(ClusterInfo::new_with_invalid_keypair(
|
||||||
node.info,
|
node.info,
|
||||||
)));
|
)));
|
||||||
|
|
|
@ -10,7 +10,7 @@ use solana_ledger::bank_forks::BankForks;
|
||||||
use solana_ledger::blocktree::Blocktree;
|
use solana_ledger::blocktree::Blocktree;
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
use std::net::{IpAddr, SocketAddr, UdpSocket};
|
use std::net::{IpAddr, SocketAddr, TcpListener, UdpSocket};
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
@ -70,12 +70,14 @@ pub fn discover(
|
||||||
gossip_addr: Option<&SocketAddr>,
|
gossip_addr: Option<&SocketAddr>,
|
||||||
) -> std::io::Result<(Vec<ContactInfo>, Vec<ContactInfo>)> {
|
) -> std::io::Result<(Vec<ContactInfo>, Vec<ContactInfo>)> {
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let (gossip_service, spy_ref) = make_gossip_node(entry_point, &exit, gossip_addr);
|
let (gossip_service, ip_echo, spy_ref) = make_gossip_node(entry_point, &exit, gossip_addr);
|
||||||
|
|
||||||
let id = spy_ref.read().unwrap().keypair.pubkey();
|
let id = spy_ref.read().unwrap().keypair.pubkey();
|
||||||
info!("Gossip entry point: {:?}", entry_point);
|
info!("Gossip entry point: {:?}", entry_point);
|
||||||
info!("Spy node id: {:?}", id);
|
info!("Spy node id: {:?}", id);
|
||||||
|
|
||||||
|
let _ip_echo_server = ip_echo.map(solana_netutil::ip_echo_server);
|
||||||
|
|
||||||
let (met_criteria, secs, tvu_peers, archivers) = spy(
|
let (met_criteria, secs, tvu_peers, archivers) = spy(
|
||||||
spy_ref.clone(),
|
spy_ref.clone(),
|
||||||
num_nodes,
|
num_nodes,
|
||||||
|
@ -247,9 +249,9 @@ fn make_gossip_node(
|
||||||
entry_point: &SocketAddr,
|
entry_point: &SocketAddr,
|
||||||
exit: &Arc<AtomicBool>,
|
exit: &Arc<AtomicBool>,
|
||||||
gossip_addr: Option<&SocketAddr>,
|
gossip_addr: Option<&SocketAddr>,
|
||||||
) -> (GossipService, Arc<RwLock<ClusterInfo>>) {
|
) -> (GossipService, Option<TcpListener>, Arc<RwLock<ClusterInfo>>) {
|
||||||
let keypair = Arc::new(Keypair::new());
|
let keypair = Arc::new(Keypair::new());
|
||||||
let (node, gossip_socket) = if let Some(gossip_addr) = gossip_addr {
|
let (node, gossip_socket, ip_echo) = if let Some(gossip_addr) = gossip_addr {
|
||||||
ClusterInfo::gossip_node(&keypair.pubkey(), gossip_addr)
|
ClusterInfo::gossip_node(&keypair.pubkey(), gossip_addr)
|
||||||
} else {
|
} else {
|
||||||
ClusterInfo::spy_node(&keypair.pubkey())
|
ClusterInfo::spy_node(&keypair.pubkey())
|
||||||
|
@ -259,7 +261,7 @@ fn make_gossip_node(
|
||||||
let cluster_info = Arc::new(RwLock::new(cluster_info));
|
let cluster_info = Arc::new(RwLock::new(cluster_info));
|
||||||
let gossip_service =
|
let gossip_service =
|
||||||
GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit);
|
GossipService::new(&cluster_info.clone(), None, None, gossip_socket, &exit);
|
||||||
(gossip_service, cluster_info)
|
(gossip_service, ip_echo, cluster_info)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service for GossipService {
|
impl Service for GossipService {
|
||||||
|
|
|
@ -1,15 +1,11 @@
|
||||||
//! A command-line executable for monitoring a cluster's gossip plane.
|
//! A command-line executable for monitoring a cluster's gossip plane.
|
||||||
|
|
||||||
#[macro_use]
|
|
||||||
extern crate solana_core;
|
|
||||||
|
|
||||||
use clap::{
|
use clap::{
|
||||||
crate_description, crate_name, crate_version, value_t_or_exit, App, AppSettings, Arg,
|
crate_description, crate_name, crate_version, value_t_or_exit, App, AppSettings, Arg,
|
||||||
SubCommand,
|
SubCommand,
|
||||||
};
|
};
|
||||||
use solana_client::rpc_client::RpcClient;
|
use solana_client::rpc_client::RpcClient;
|
||||||
use solana_core::contact_info::ContactInfo;
|
use solana_core::{contact_info::ContactInfo, gossip_service::discover};
|
||||||
use solana_core::gossip_service::discover;
|
|
||||||
use solana_sdk::pubkey::Pubkey;
|
use solana_sdk::pubkey::Pubkey;
|
||||||
use std::error;
|
use std::error;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
@ -65,6 +61,14 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||||
SubCommand::with_name("spy")
|
SubCommand::with_name("spy")
|
||||||
.about("Monitor the gossip entrypoint")
|
.about("Monitor the gossip entrypoint")
|
||||||
.setting(AppSettings::DisableVersion)
|
.setting(AppSettings::DisableVersion)
|
||||||
|
.arg(
|
||||||
|
clap::Arg::with_name("gossip_port")
|
||||||
|
.long("gossip-port")
|
||||||
|
.value_name("PORT")
|
||||||
|
.takes_value(true)
|
||||||
|
.default_value("0")
|
||||||
|
.help("Gossip port number for the node"),
|
||||||
|
)
|
||||||
.arg(
|
.arg(
|
||||||
Arg::with_name("num_nodes")
|
Arg::with_name("num_nodes")
|
||||||
.short("N")
|
.short("N")
|
||||||
|
@ -121,17 +125,6 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
let gossip_addr = {
|
|
||||||
let mut addr = socketaddr_any!();
|
|
||||||
addr.set_ip(
|
|
||||||
solana_netutil::get_public_ip_addr(&entrypoint_addr).unwrap_or_else(|err| {
|
|
||||||
eprintln!("failed to contact {}: {}", entrypoint_addr, err);
|
|
||||||
exit(1);
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
Some(addr)
|
|
||||||
};
|
|
||||||
|
|
||||||
match matches.subcommand() {
|
match matches.subcommand() {
|
||||||
("spy", Some(matches)) => {
|
("spy", Some(matches)) => {
|
||||||
let num_nodes_exactly = matches
|
let num_nodes_exactly = matches
|
||||||
|
@ -148,13 +141,21 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||||
.value_of("node_pubkey")
|
.value_of("node_pubkey")
|
||||||
.map(|pubkey_str| pubkey_str.parse::<Pubkey>().unwrap());
|
.map(|pubkey_str| pubkey_str.parse::<Pubkey>().unwrap());
|
||||||
|
|
||||||
|
let gossip_addr = SocketAddr::new(
|
||||||
|
solana_netutil::get_public_ip_addr(&entrypoint_addr).unwrap_or_else(|err| {
|
||||||
|
eprintln!("failed to contact {}: {}", entrypoint_addr, err);
|
||||||
|
exit(1);
|
||||||
|
}),
|
||||||
|
value_t_or_exit!(matches, "gossip_port", u16),
|
||||||
|
);
|
||||||
|
|
||||||
let (nodes, _archivers) = discover(
|
let (nodes, _archivers) = discover(
|
||||||
&entrypoint_addr,
|
&entrypoint_addr,
|
||||||
num_nodes,
|
num_nodes,
|
||||||
timeout,
|
timeout,
|
||||||
pubkey,
|
pubkey,
|
||||||
None,
|
None,
|
||||||
gossip_addr.as_ref(),
|
Some(&gossip_addr),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
if timeout.is_some() {
|
if timeout.is_some() {
|
||||||
|
@ -197,7 +198,7 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||||
Some(timeout),
|
Some(timeout),
|
||||||
None,
|
None,
|
||||||
Some(entrypoint_addr.ip()),
|
Some(entrypoint_addr.ip()),
|
||||||
gossip_addr.as_ref(),
|
None,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let rpc_addrs: Vec<_> = nodes
|
let rpc_addrs: Vec<_> = nodes
|
||||||
|
@ -224,14 +225,8 @@ fn main() -> Result<(), Box<dyn error::Error>> {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.parse::<Pubkey>()
|
.parse::<Pubkey>()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let (nodes, _archivers) = discover(
|
let (nodes, _archivers) =
|
||||||
&entrypoint_addr,
|
discover(&entrypoint_addr, None, None, Some(pubkey), None, None)?;
|
||||||
None,
|
|
||||||
None,
|
|
||||||
Some(pubkey),
|
|
||||||
None,
|
|
||||||
gossip_addr.as_ref(),
|
|
||||||
)?;
|
|
||||||
let node = nodes.iter().find(|x| x.id == pubkey).unwrap();
|
let node = nodes.iter().find(|x| x.id == pubkey).unwrap();
|
||||||
|
|
||||||
if !ContactInfo::is_valid_address(&node.rpc) {
|
if !ContactInfo::is_valid_address(&node.rpc) {
|
||||||
|
|
Loading…
Reference in New Issue