Employ gossip_service::discover()
This commit is contained in:
parent
679a718cbf
commit
96c0222b30
|
@ -1,16 +1,13 @@
|
||||||
mod bench;
|
mod bench;
|
||||||
mod cli;
|
mod cli;
|
||||||
|
|
||||||
|
use crate::bench::*;
|
||||||
use solana::client::mk_client;
|
use solana::client::mk_client;
|
||||||
use solana::cluster_info::{ClusterInfo, NodeInfo};
|
|
||||||
use solana::gossip_service::GossipService;
|
|
||||||
|
|
||||||
use solana::gen_keys::GenKeys;
|
use solana::gen_keys::GenKeys;
|
||||||
use solana::service::Service;
|
use solana::gossip_service::discover;
|
||||||
use solana::thin_client::poll_gossip_for_leader;
|
use solana::thin_client::poll_gossip_for_leader;
|
||||||
use solana_metrics;
|
use solana_metrics;
|
||||||
use solana_sdk::signature::{Keypair, KeypairUtil};
|
use solana_sdk::signature::{Keypair, KeypairUtil};
|
||||||
|
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::process::exit;
|
use std::process::exit;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
|
||||||
|
@ -20,56 +17,6 @@ use std::thread::Builder;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use crate::bench::*;
|
|
||||||
|
|
||||||
/// Creates a cluster and waits for the network to converge, returning the peers, leader, and gossip service
|
|
||||||
/// # Arguments
|
|
||||||
/// `leader` - the input leader node
|
|
||||||
/// `exit_signal` - atomic bool used to signal early exit to cluster
|
|
||||||
/// `num_nodes` - the number of nodes
|
|
||||||
/// # Panics
|
|
||||||
/// Panics if the spy node `RwLock` somehow ends up unreadable
|
|
||||||
fn converge(
|
|
||||||
leader: &NodeInfo,
|
|
||||||
exit_signal: &Arc<AtomicBool>,
|
|
||||||
num_nodes: usize,
|
|
||||||
) -> (Vec<NodeInfo>, Option<NodeInfo>, GossipService) {
|
|
||||||
//lets spy on the network
|
|
||||||
let (node, gossip_socket) = ClusterInfo::spy_node();
|
|
||||||
println!("Spy node: {}", node.id);
|
|
||||||
let mut spy_cluster_info = ClusterInfo::new_with_invalid_keypair(node);
|
|
||||||
spy_cluster_info.insert_info(leader.clone());
|
|
||||||
spy_cluster_info.set_leader(leader.id);
|
|
||||||
let spy_ref = Arc::new(RwLock::new(spy_cluster_info));
|
|
||||||
let gossip_service = GossipService::new(&spy_ref, None, None, gossip_socket, &exit_signal);
|
|
||||||
let mut v: Vec<NodeInfo> = vec![];
|
|
||||||
// wait for the network to converge, 30 seconds should be plenty
|
|
||||||
for _ in 0..30 {
|
|
||||||
{
|
|
||||||
let spy_ref = spy_ref.read().unwrap();
|
|
||||||
|
|
||||||
println!("{}", spy_ref.node_info_trace());
|
|
||||||
|
|
||||||
if spy_ref.leader_data().is_some() {
|
|
||||||
v = spy_ref.rpc_peers();
|
|
||||||
if v.len() >= num_nodes {
|
|
||||||
println!("CONVERGED!");
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
println!(
|
|
||||||
"{} node(s) discovered (looking for {} or more)",
|
|
||||||
v.len(),
|
|
||||||
num_nodes
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sleep(Duration::new(1, 0));
|
|
||||||
}
|
|
||||||
let leader = spy_ref.read().unwrap().leader_data().cloned();
|
|
||||||
(v, leader, gossip_service)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
solana_metrics::set_panic_hook("bench-tps");
|
solana_metrics::set_panic_hook("bench-tps");
|
||||||
|
@ -101,9 +48,7 @@ fn main() {
|
||||||
exit(1);
|
exit(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
let exit_signal = Arc::new(AtomicBool::new(false));
|
let nodes = discover(&leader, num_nodes);
|
||||||
let (nodes, leader, gossip_service) = converge(&leader, &exit_signal, num_nodes);
|
|
||||||
|
|
||||||
if nodes.len() < num_nodes {
|
if nodes.len() < num_nodes {
|
||||||
println!(
|
println!(
|
||||||
"Error: Insufficient nodes discovered. Expecting {} or more",
|
"Error: Insufficient nodes discovered. Expecting {} or more",
|
||||||
|
@ -119,17 +64,10 @@ fn main() {
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if leader.is_none() {
|
|
||||||
println!("no leader");
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if converge_only {
|
if converge_only {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let leader = leader.unwrap();
|
|
||||||
|
|
||||||
println!("leader RPC is at {} {}", leader.rpc, leader.id);
|
println!("leader RPC is at {} {}", leader.rpc, leader.id);
|
||||||
let mut client = mk_client(&leader);
|
let mut client = mk_client(&leader);
|
||||||
let mut barrier_client = mk_client(&leader);
|
let mut barrier_client = mk_client(&leader);
|
||||||
|
@ -176,6 +114,8 @@ fn main() {
|
||||||
let first_tx_count = client.transaction_count();
|
let first_tx_count = client.transaction_count();
|
||||||
println!("Initial transaction count {}", first_tx_count);
|
println!("Initial transaction count {}", first_tx_count);
|
||||||
|
|
||||||
|
let exit_signal = Arc::new(AtomicBool::new(false));
|
||||||
|
|
||||||
// Setup a thread per validator to sample every period
|
// Setup a thread per validator to sample every period
|
||||||
// collect the max transaction rate and total tx count seen
|
// collect the max transaction rate and total tx count seen
|
||||||
let maxes = Arc::new(RwLock::new(Vec::new()));
|
let maxes = Arc::new(RwLock::new(Vec::new()));
|
||||||
|
@ -294,9 +234,6 @@ fn main() {
|
||||||
&start.elapsed(),
|
&start.elapsed(),
|
||||||
total_tx_sent_count.load(Ordering::Relaxed),
|
total_tx_sent_count.load(Ordering::Relaxed),
|
||||||
);
|
);
|
||||||
|
|
||||||
// join the cluster_info client threads
|
|
||||||
gossip_service.join().unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
Loading…
Reference in New Issue