gossip_service::discover() now reports the leader

This commit is contained in:
Michael Vines 2019-03-07 10:13:54 -08:00
parent 54ff9b3ac2
commit 5edbd6a7fb
4 changed files with 48 additions and 43 deletions

View File

@ -3,9 +3,9 @@ mod cli;
use crate::bench::*; use crate::bench::*;
use solana::client::mk_client; use solana::client::mk_client;
use solana::contact_info::ContactInfo;
use solana::gen_keys::GenKeys; use solana::gen_keys::GenKeys;
use solana::gossip_service::discover; use solana::gossip_service::discover;
use solana::thin_client::poll_gossip_for_leader;
use solana_metrics; use solana_metrics;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use std::collections::VecDeque; use std::collections::VecDeque;
@ -39,17 +39,9 @@ fn main() {
converge_only, converge_only,
} = cfg; } = cfg;
println!("Looking for leader at {:?}", network); let cluster_entrypoint = ContactInfo::new_entry_point(&network);
let leader = poll_gossip_for_leader(network, Duration::from_secs(30)).unwrap_or_else(|err| { let (leader, nodes) = discover(&cluster_entrypoint, num_nodes).unwrap_or_else(|err| {
eprintln!( eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err);
"Error: unable to find leader on network after 30 seconds: {:?}",
err
);
exit(1);
});
let nodes = discover(&leader, num_nodes).unwrap_or_else(|err| {
eprintln!("{:?}", err);
exit(1); exit(1);
}); });
if nodes.len() < num_nodes { if nodes.len() < num_nodes {
@ -67,11 +59,16 @@ fn main() {
exit(1); exit(1);
} }
if leader.is_none() {
eprintln!("Error: No leader");
exit(1);
}
if converge_only { if converge_only {
return; return;
} }
println!("leader RPC is at {} {}", leader.rpc, leader.id); let leader = leader.unwrap();
let mut client = mk_client(&leader); let mut client = mk_client(&leader);
let mut barrier_client = mk_client(&leader); let mut barrier_client = mk_client(&leader);

View File

@ -222,21 +222,28 @@ impl ClusterInfo {
} }
pub fn node_info_trace(&self) -> String { pub fn node_info_trace(&self) -> String {
let leader_id = self.leader_id(); let leader_id = self.leader_id();
let gossip_top_leader = self.get_gossip_top_leader();
let nodes: Vec<_> = self let nodes: Vec<_> = self
.rpc_peers() .rpc_peers()
.into_iter() .into_iter()
.map(|node| { .map(|node| {
let mut annotation = String::new();
if let Some(top_leader) = gossip_top_leader {
if node.id == top_leader.id {
annotation.push_str(" [gossip top leader]");
}
}
if node.id == leader_id {
annotation.push_str(" [leader]");
}
format!( format!(
" gossip: {:20} | {}{}\n \ "- gossip: {:20} | {}{}\n \
tpu: {:20} |\n \ tpu: {:20} |\n \
rpc: {:20} |\n", rpc: {:20} |\n",
node.gossip.to_string(), node.gossip.to_string(),
node.id, node.id,
if node.id == leader_id { annotation,
" <==== leader"
} else {
""
},
node.tpu.to_string(), node.tpu.to_string(),
node.rpc.to_string() node.rpc.to_string()
) )
@ -244,9 +251,9 @@ impl ClusterInfo {
.collect(); .collect();
format!( format!(
" NodeInfo.contact_info | Node identifier\n\ " Node contact info | Node identifier\n\
---------------------------+------------------\n\ -------------------------------+------------------\n\
{}\n \ {}\
Nodes: {}", Nodes: {}",
nodes.join(""), nodes.join(""),
nodes.len() nodes.len()

View File

@ -19,7 +19,7 @@ pub fn spend_and_verify_all_nodes(
funding_keypair: &Keypair, funding_keypair: &Keypair,
nodes: usize, nodes: usize,
) { ) {
let cluster_nodes = discover(&entry_point_info, nodes).unwrap(); let (_leader_id, cluster_nodes) = discover(&entry_point_info, nodes).unwrap();
assert!(cluster_nodes.len() >= nodes); assert!(cluster_nodes.len() >= nodes);
for ingress_node in &cluster_nodes { for ingress_node in &cluster_nodes {
let random_keypair = Keypair::new(); let random_keypair = Keypair::new();
@ -46,7 +46,7 @@ pub fn spend_and_verify_all_nodes(
} }
pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) { pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) {
let cluster_nodes = discover(&entry_point_info, nodes).unwrap(); let (_leader_id, cluster_nodes) = discover(&entry_point_info, nodes).unwrap();
assert!(cluster_nodes.len() >= nodes); assert!(cluster_nodes.len() >= nodes);
for node in &cluster_nodes { for node in &cluster_nodes {
let mut client = mk_client(&node); let mut client = mk_client(&node);
@ -65,7 +65,7 @@ pub fn kill_entry_and_spend_and_verify_rest(
nodes: usize, nodes: usize,
) { ) {
solana_logger::setup(); solana_logger::setup();
let cluster_nodes = discover(&entry_point_info, nodes).unwrap(); let (_leader_id, cluster_nodes) = discover(&entry_point_info, nodes).unwrap();
assert!(cluster_nodes.len() >= nodes); assert!(cluster_nodes.len() >= nodes);
let mut client = mk_client(&entry_point_info); let mut client = mk_client(&entry_point_info);
info!("sleeping for an epoch"); info!("sleeping for an epoch");

View File

@ -80,9 +80,7 @@ pub fn make_listening_node(
pub fn discover( pub fn discover(
entry_point_info: &NodeInfo, entry_point_info: &NodeInfo,
num_nodes: usize, num_nodes: usize,
) -> Result<Vec<NodeInfo>, &'static str> { ) -> Result<(Option<NodeInfo>, Vec<NodeInfo>), &'static str> {
info!("Wait for convergence with {} nodes", num_nodes);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let (gossip_service, spy_ref) = make_spy_node(entry_point_info, &exit); let (gossip_service, spy_ref) = make_spy_node(entry_point_info, &exit);
let id = spy_ref.read().unwrap().keypair.pubkey(); let id = spy_ref.read().unwrap().keypair.pubkey();
@ -95,25 +93,24 @@ pub fn discover(
// Wait for the cluster to converge // Wait for the cluster to converge
let now = Instant::now(); let now = Instant::now();
let mut i = 0; let mut i = 0;
while now.elapsed() < Duration::from_secs(15) { while now.elapsed() < Duration::from_secs(30) {
let rpc_peers = spy_ref.read().unwrap().rpc_peers(); let rpc_peers = spy_ref.read().unwrap().rpc_peers();
if i % 20 == 0 {
info!(
"discover: spy_node {} found {}/{} nodes",
id,
rpc_peers.len(),
num_nodes,
);
}
if rpc_peers.len() >= num_nodes { if rpc_peers.len() >= num_nodes {
info!(
"discover success in {}s...\n{}",
now.elapsed().as_secs(),
spy_ref.read().unwrap().node_info_trace()
);
let leader = spy_ref.read().unwrap().get_gossip_top_leader().cloned();
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
gossip_service.join().unwrap(); gossip_service.join().unwrap();
return Ok(rpc_peers); return Ok((leader, rpc_peers));
} }
if i % 20 == 0 { if i % 20 == 0 {
debug!( info!(
"discover: expecting an additional {} nodes", "discovering...\n{}",
num_nodes - rpc_peers.len() spy_ref.read().unwrap().node_info_trace()
); );
} }
sleep(Duration::from_millis( sleep(Duration::from_millis(
@ -124,6 +121,10 @@ pub fn discover(
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
gossip_service.join().unwrap(); gossip_service.join().unwrap();
info!(
"discover failed...\n{}",
spy_ref.read().unwrap().node_info_trace()
);
Err("Failed to converge") Err("Failed to converge")
} }