Add a black box local cluster harness (#3028)

Integration test harness for the network.
anatoly yakovenko 2019-03-01 10:36:52 -08:00 committed by GitHub
parent a57fb00584
commit c27726e065
7 changed files with 258 additions and 8 deletions
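
At a glance, the harness is driven the way the new integration tests below drive it: boot a LocalCluster, then run black-box checks against its entry point. A minimal sketch, mirroring tests/local_cluster.rs from this commit:

    use solana::cluster_tests;
    use solana::local_cluster::LocalCluster;

    // Boot a local cluster: 3 nodes, 10_000 lamports total, 100 per node.
    let num_nodes = 3;
    let cluster = LocalCluster::new(num_nodes, 10_000, 100);
    // Spend from every node and confirm the signature on every node.
    cluster_tests::spend_and_verify_all_nodes(
        &cluster.entry_point_info,
        &cluster.funding_keypair,
        num_nodes,
    );
    // Dropping `cluster` joins the fullnode threads and removes the ledgers.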

src/cluster_info.rs

@@ -1023,7 +1023,7 @@ impl ClusterInfo {
         let len = data.len();
         let now = Instant::now();
         let self_id = me.read().unwrap().gossip.id;
-        trace!("PullResponse me: {} len={}", self_id, len);
+        trace!("PullResponse me: {} from: {} len={}", self_id, from, len);
         me.write()
             .unwrap()
             .gossip

src/cluster_tests.rs (new file, 41 lines)

@@ -0,0 +1,41 @@
/// Cluster independent integration tests
///
/// All tests must start from an entry point and a funding keypair and
/// discover the rest of the network.
use crate::client::mk_client;
use crate::contact_info::ContactInfo;
use crate::gossip_service::discover;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;

/// Spend and verify from every node in the network
pub fn spend_and_verify_all_nodes(
    entry_point_info: &ContactInfo,
    funding_keypair: &Keypair,
    nodes: usize,
) {
    let cluster_nodes = discover(&entry_point_info, nodes);
    assert!(cluster_nodes.len() >= nodes);
    for ingress_node in &cluster_nodes {
        let random_keypair = Keypair::new();
        let mut client = mk_client(&ingress_node);
        let bal = client
            .poll_get_balance(&funding_keypair.pubkey())
            .expect("balance in source");
        assert!(bal > 0);
        let mut transaction = SystemTransaction::new_move(
            &funding_keypair,
            random_keypair.pubkey(),
            1,
            client.get_last_id(),
            0,
        );
        let sig = client
            .retry_transfer(&funding_keypair, &mut transaction, 5)
            .unwrap();
        for validator in &cluster_nodes {
            let mut client = mk_client(&validator);
            client.poll_for_signature(&sig).unwrap();
        }
    }
}

src/fullnode.rs

@@ -32,6 +32,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
 use std::sync::{Arc, Mutex, RwLock};
+use std::thread::JoinHandle;
 use std::thread::{spawn, Result};
 use std::time::Duration;
@@ -324,18 +325,18 @@ impl Fullnode {
     // Runs a thread to manage node role transitions. The returned closure can be used to signal the
     // node to exit.
-    pub fn run(
+    pub fn start(
         mut self,
         rotation_notifier: Option<Sender<(FullnodeReturnType, u64)>>,
-    ) -> impl FnOnce() {
+    ) -> (JoinHandle<()>, Arc<AtomicBool>, Receiver<bool>) {
         let (sender, receiver) = channel();
         let exit = self.exit.clone();
         let timeout = Duration::from_secs(1);
-        spawn(move || loop {
+        let handle = spawn(move || loop {
             if self.exit.load(Ordering::Relaxed) {
                 debug!("node shutdown requested");
                 self.close().expect("Unable to close node");
-                sender.send(true).expect("Unable to signal exit");
+                let _ = sender.send(true);
                 break;
             }
@@ -359,6 +360,14 @@ impl Fullnode {
                 _ => (),
             }
         });
+        (handle, exit, receiver)
+    }
+
+    pub fn run(
+        self,
+        rotation_notifier: Option<Sender<(FullnodeReturnType, u64)>>,
+    ) -> impl FnOnce() {
+        let (_, exit, receiver) = self.start(rotation_notifier);
         move || {
             exit.store(true, Ordering::Relaxed);
             receiver.recv().unwrap();
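
The refactor splits the node lifecycle in two: start() hands the caller the raw handles, and run() rebuilds the old closure-based contract on top of it. A minimal sketch of the shutdown handshake a start() caller owns (node stands in for any constructed Fullnode; LocalCluster below does exactly this):

    let (handle, exit, exited) = node.start(None);
    // ... exercise the node ...
    exit.store(true, Ordering::Relaxed);     // request shutdown
    let _ = exited.recv();                   // optionally block until the node has closed
    handle.join().expect("fullnode thread"); // reap the service thread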

src/gossip_service.rs

@@ -89,6 +89,11 @@ pub fn make_listening_node(
     (gossip_service, new_node_cluster_info_ref, new_node, id)
 }

+pub fn discover(entry_point_info: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
+    converge(entry_point_info, num_nodes)
+}
+
+//TODO: deprecate this in favor of discover
 pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
     info!("Wait for convergence with {} nodes", num_nodes);
     // Let's spy on the network
@@ -102,14 +107,21 @@ pub fn converge(node: &NodeInfo, num_nodes: usize) -> Vec<NodeInfo> {
     // Wait for the cluster to converge
     for _ in 0..15 {
         let rpc_peers = spy_ref.read().unwrap().rpc_peers();
-        if rpc_peers.len() == num_nodes {
-            debug!("converge found {} nodes: {:?}", rpc_peers.len(), rpc_peers);
+        if rpc_peers.len() >= num_nodes {
+            debug!(
+                "converge found {}/{} nodes: {:?}",
+                rpc_peers.len(),
+                num_nodes,
+                rpc_peers
+            );
             gossip_service.close().unwrap();
             return rpc_peers;
         }
         debug!(
-            "converge found {} nodes, need {} more",
+            "spy_node: {} converge found {}/{} nodes, need {} more",
+            id,
             rpc_peers.len(),
+            num_nodes,
             num_nodes - rpc_peers.len()
         );
         sleep(Duration::new(1, 0));
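
The comparison change loosens the convergence contract: discovery now succeeds once at least num_nodes RPC peers are visible, so callers assert a lower bound rather than equality, as cluster_tests.rs above does:

    let cluster_nodes = discover(&entry_point_info, num_nodes);
    assert!(cluster_nodes.len() >= num_nodes); // finding more peers than requested is fine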

src/lib.rs

@@ -31,6 +31,7 @@ pub mod blockstream;
 pub mod blockstream_service;
 pub mod blocktree_processor;
 pub mod cluster_info;
+pub mod cluster_tests;
 pub mod db_window;
 pub mod entry;
 #[cfg(feature = "erasure")]
@@ -42,6 +43,7 @@ pub mod gossip_service;
 pub mod leader_confirmation_service;
 pub mod leader_schedule;
 pub mod leader_schedule_utils;
+pub mod local_cluster;
 pub mod local_vote_signer_service;
 pub mod packet;
 pub mod poh;

src/local_cluster.rs (new file, 146 lines)

@@ -0,0 +1,146 @@
use crate::blocktree::{create_new_tmp_ledger, tmp_copy_blocktree};
use crate::client::mk_client;
use crate::cluster_info::{Node, NodeInfo};
use crate::fullnode::{Fullnode, FullnodeConfig};
use crate::gossip_service::discover;
use crate::thin_client::retry_get_balance;
use crate::thin_client::ThinClient;
use crate::voting_keypair::VotingKeypair;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction::SystemTransaction;
use std::fs::remove_dir_all;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::JoinHandle;

pub struct LocalCluster {
    /// Keypair with funding to participate in the network
    pub funding_keypair: Keypair,
    /// Entry point from which the rest of the network can be discovered
    pub entry_point_info: NodeInfo,
    fullnode_hdls: Vec<(JoinHandle<()>, Arc<AtomicBool>)>,
    ledger_paths: Vec<String>,
}

impl LocalCluster {
    pub fn new(num_nodes: usize, cluster_lamports: u64, lamports_per_node: u64) -> Self {
        let leader_keypair = Arc::new(Keypair::new());
        let leader_pubkey = leader_keypair.pubkey();
        let leader_node = Node::new_localhost_with_pubkey(leader_keypair.pubkey());
        let (genesis_block, mint_keypair) =
            GenesisBlock::new_with_leader(cluster_lamports, leader_pubkey, lamports_per_node);
        let (genesis_ledger_path, _last_id) = create_new_tmp_ledger!(&genesis_block);
        let leader_ledger_path = tmp_copy_blocktree!(&genesis_ledger_path);
        let mut ledger_paths = vec![];
        ledger_paths.push(genesis_ledger_path.clone());
        ledger_paths.push(leader_ledger_path.clone());
        let voting_keypair = VotingKeypair::new_local(&leader_keypair);
        let fullnode_config = FullnodeConfig::default();
        let leader_node_info = leader_node.info.clone();
        let leader_server = Fullnode::new(
            leader_node,
            &leader_keypair,
            &leader_ledger_path,
            voting_keypair,
            None,
            &fullnode_config,
        );
        let (thread, exit, _) = leader_server.start(None);
        let mut fullnode_hdls = vec![(thread, exit)];
        let mut client = mk_client(&leader_node_info);
        for _ in 0..(num_nodes - 1) {
            let validator_keypair = Arc::new(Keypair::new());
            let voting_keypair = VotingKeypair::new_local(&validator_keypair);
            let validator_pubkey = validator_keypair.pubkey();
            let validator_node = Node::new_localhost_with_pubkey(validator_keypair.pubkey());
            let ledger_path = tmp_copy_blocktree!(&genesis_ledger_path);
            ledger_paths.push(ledger_path.clone());

            // Send each validator some lamports to vote
            let validator_balance = Self::transfer(
                &mut client,
                &mint_keypair,
                &validator_pubkey,
                lamports_per_node,
            );
            info!(
                "validator {} balance {}",
                validator_pubkey, validator_balance
            );

            let validator_server = Fullnode::new(
                validator_node,
                &validator_keypair,
                &ledger_path,
                voting_keypair,
                Some(&leader_node_info),
                &FullnodeConfig::default(),
            );
            let (thread, exit, _) = validator_server.start(None);
            fullnode_hdls.push((thread, exit));
        }
        discover(&leader_node_info, num_nodes);
        Self {
            funding_keypair: mint_keypair,
            entry_point_info: leader_node_info,
            fullnode_hdls,
            ledger_paths,
        }
    }

    pub fn exit(&self) {
        for node in &self.fullnode_hdls {
            node.1.store(true, Ordering::Relaxed);
        }
    }

    pub fn close(&mut self) {
        self.exit();
        while let Some(node) = self.fullnode_hdls.pop() {
            node.0.join().expect("join");
        }
        // Drain the paths so a second close() (e.g. from Drop after an
        // explicit close()) does not try to remove the ledgers twice.
        for path in self.ledger_paths.drain(..) {
            remove_dir_all(&path).unwrap();
        }
    }

    fn transfer(
        client: &mut ThinClient,
        source_keypair: &Keypair,
        dest_pubkey: &Pubkey,
        lamports: u64,
    ) -> u64 {
        trace!("getting leader last_id");
        let last_id = client.get_last_id();
        let mut tx =
            SystemTransaction::new_account(&source_keypair, *dest_pubkey, lamports, last_id, 0);
        info!(
            "executing transfer of {} from {} to {}",
            lamports,
            source_keypair.pubkey(),
            *dest_pubkey
        );
        client
            .retry_transfer(&source_keypair, &mut tx, 5)
            .expect("client transfer");
        retry_get_balance(client, dest_pubkey, Some(lamports)).expect("get balance")
    }
}

impl Drop for LocalCluster {
    fn drop(&mut self) {
        self.close()
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_local_cluster_start_and_exit() {
        solana_logger::setup();
        let network = LocalCluster::new(1, 100, 2);
        drop(network)
    }
}
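
Teardown is tied to scope: Drop calls close(), which sets each node's exit flag, joins the service threads, and removes the temporary ledgers. A minimal sketch of the intended lifecycle, using only the API above:

    {
        let cluster = LocalCluster::new(2, 10_000, 100); // leader plus one validator
        // ... run black-box checks against cluster.entry_point_info ...
    } // scope ends: Drop -> close() -> threads joined, ledger dirs removed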

tests/local_cluster.rs (new file, 40 lines)

@@ -0,0 +1,40 @@
extern crate solana;

use solana::cluster_tests;
use solana::local_cluster::LocalCluster;

#[test]
fn test_spend_and_verify_all_nodes_1() {
    solana_logger::setup();
    let num_nodes = 1;
    let local = LocalCluster::new(num_nodes, 10_000, 100);
    cluster_tests::spend_and_verify_all_nodes(
        &local.entry_point_info,
        &local.funding_keypair,
        num_nodes,
    );
}

#[test]
fn test_spend_and_verify_all_nodes_2() {
    solana_logger::setup();
    let num_nodes = 2;
    let local = LocalCluster::new(num_nodes, 10_000, 100);
    cluster_tests::spend_and_verify_all_nodes(
        &local.entry_point_info,
        &local.funding_keypair,
        num_nodes,
    );
}

#[test]
fn test_spend_and_verify_all_nodes_3() {
    solana_logger::setup();
    let num_nodes = 3;
    let local = LocalCluster::new(num_nodes, 10_000, 100);
    cluster_tests::spend_and_verify_all_nodes(
        &local.entry_point_info,
        &local.funding_keypair,
        num_nodes,
    );
}