Refactor LocalCluster and add support for listener nodes (#3790)

Sagar Dhawan 2019-04-15 15:27:45 -07:00 committed by GitHub
parent 80f3568062
commit 3fcf03ff3e
6 changed files with 202 additions and 139 deletions
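
Summary of the API change: the `new_with_config`, `new_with_tick_config`, and `new_with_config_replicators` constructors are collapsed into a single `ClusterConfig` struct consumed by `LocalCluster::new`, and unstaked, non-voting "listener" nodes can be requested via the new `num_listeners` field. A minimal sketch of the new call style (field values are illustrative, not taken from the commit):

use solana::local_cluster::{ClusterConfig, LocalCluster};

// Two staked validators plus one unstaked, non-voting listener.
// Fields left unspecified fall back to ClusterConfig::default().
let config = ClusterConfig {
    node_stakes: vec![100, 100],
    cluster_lamports: 10_000,
    num_listeners: 1,
    ..ClusterConfig::default()
};
let cluster = LocalCluster::new(&config);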

View File

@@ -726,7 +726,7 @@ fn should_switch_directions(num_lamports_per_account: u64, i: u64) -> bool {
mod tests {
use super::*;
use solana::fullnode::FullnodeConfig;
use solana::local_cluster::LocalCluster;
use solana::local_cluster::{ClusterConfig, LocalCluster};
use solana_drone::drone::run_local_drone;
use std::sync::mpsc::channel;
@@ -750,8 +750,12 @@ mod tests {
fn test_bench_tps() {
let fullnode_config = FullnodeConfig::default();
const NUM_NODES: usize = 1;
let cluster =
LocalCluster::new_with_config(&[999_990; NUM_NODES], 2_000_000, &fullnode_config, &[]);
let cluster = LocalCluster::new(&ClusterConfig {
node_stakes: vec![999_990; NUM_NODES],
cluster_lamports: 2_000_000,
fullnode_config,
..ClusterConfig::default()
});
let drone_keypair = Keypair::new();
cluster.transfer(&cluster.funding_keypair, &drone_keypair.pubkey(), 1_000_000);

View File

@@ -36,7 +36,7 @@ use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::Result;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct FullnodeConfig {
pub sigverify_disabled: bool,
pub voting_disabled: bool,

View File

@@ -51,13 +51,46 @@ impl ReplicatorInfo {
}
}
#[derive(Clone, Debug)]
pub struct ClusterConfig {
/// The fullnode config that should be applied to every node in the cluster
pub fullnode_config: FullnodeConfig,
/// Number of replicators in the cluster
pub num_replicators: u64,
/// Number of nodes that are unstaked and not voting (a.k.a. listening)
pub num_listeners: u64,
/// The stakes of each node
pub node_stakes: Vec<u64>,
/// The total lamports available to the cluster
pub cluster_lamports: u64,
pub ticks_per_slot: u64,
pub slots_per_epoch: u64,
pub native_instruction_processors: Vec<(String, Pubkey)>,
}
impl Default for ClusterConfig {
fn default() -> Self {
ClusterConfig {
fullnode_config: FullnodeConfig::default(),
num_replicators: 0,
num_listeners: 0,
node_stakes: vec![],
cluster_lamports: 0,
ticks_per_slot: DEFAULT_TICKS_PER_SLOT,
slots_per_epoch: DEFAULT_SLOTS_PER_EPOCH,
native_instruction_processors: vec![],
}
}
}
pub struct LocalCluster {
/// Keypair with funding to particpiate in the network
/// Keypair with funding to participate in the network
pub funding_keypair: Keypair,
pub fullnode_config: FullnodeConfig,
/// Entry point from which the rest of the network can be discovered
pub entry_point_info: ContactInfo,
pub fullnode_infos: HashMap<Pubkey, FullnodeInfo>,
pub listener_infos: HashMap<Pubkey, FullnodeInfo>,
fullnodes: HashMap<Pubkey, Fullnode>,
genesis_ledger_path: String,
pub genesis_block: GenesisBlock,
@@ -66,67 +99,35 @@ pub struct LocalCluster {
}
impl LocalCluster {
pub fn new(num_nodes: usize, cluster_lamports: u64, lamports_per_node: u64) -> Self {
pub fn new_with_equal_stakes(
num_nodes: usize,
cluster_lamports: u64,
lamports_per_node: u64,
) -> Self {
let stakes: Vec<_> = (0..num_nodes).map(|_| lamports_per_node).collect();
Self::new_with_config(&stakes, cluster_lamports, &FullnodeConfig::default(), &[])
}
pub fn new_with_config(
node_stakes: &[u64],
cluster_lamports: u64,
fullnode_config: &FullnodeConfig,
native_instruction_processors: &[(String, Pubkey)],
) -> Self {
Self::new_with_config_replicators(
node_stakes,
let config = ClusterConfig {
node_stakes: stakes,
cluster_lamports,
fullnode_config,
0,
DEFAULT_TICKS_PER_SLOT,
DEFAULT_SLOTS_PER_EPOCH,
native_instruction_processors,
)
..ClusterConfig::default()
};
Self::new(&config)
}
pub fn new_with_tick_config(
node_stakes: &[u64],
cluster_lamports: u64,
fullnode_config: &FullnodeConfig,
ticks_per_slot: u64,
slots_per_epoch: u64,
native_instruction_processors: &[(String, Pubkey)],
) -> Self {
Self::new_with_config_replicators(
node_stakes,
cluster_lamports,
fullnode_config,
0,
ticks_per_slot,
slots_per_epoch,
native_instruction_processors,
)
}
pub fn new_with_config_replicators(
node_stakes: &[u64],
cluster_lamports: u64,
fullnode_config: &FullnodeConfig,
num_replicators: usize,
ticks_per_slot: u64,
slots_per_epoch: u64,
native_instruction_processors: &[(String, Pubkey)],
) -> Self {
pub fn new(config: &ClusterConfig) -> Self {
let voting_keypair = Keypair::new();
let leader_keypair = Arc::new(Keypair::new());
let leader_pubkey = leader_keypair.pubkey();
let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
let (mut genesis_block, mint_keypair) =
GenesisBlock::new_with_leader(cluster_lamports, &leader_pubkey, node_stakes[0]);
genesis_block.ticks_per_slot = ticks_per_slot;
genesis_block.slots_per_epoch = slots_per_epoch;
let (mut genesis_block, mint_keypair) = GenesisBlock::new_with_leader(
config.cluster_lamports,
&leader_pubkey,
config.node_stakes[0],
);
genesis_block.ticks_per_slot = config.ticks_per_slot;
genesis_block.slots_per_epoch = config.slots_per_epoch;
genesis_block
.native_instruction_processors
.extend_from_slice(native_instruction_processors);
.extend_from_slice(&config.native_instruction_processors);
genesis_block.bootstrap_leader_vote_account_id = voting_keypair.pubkey();
let (genesis_ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block);
let leader_ledger_path = tmp_copy_blocktree!(&genesis_ledger_path);
@@ -139,7 +140,7 @@ impl LocalCluster {
&voting_keypair.pubkey(),
voting_keypair,
None,
fullnode_config,
&config.fullnode_config,
);
let mut fullnodes = HashMap::new();
@@ -159,22 +160,33 @@ impl LocalCluster {
genesis_block,
fullnode_infos,
replicator_infos: HashMap::new(),
fullnode_config: fullnode_config.clone(),
fullnode_config: config.fullnode_config.clone(),
listener_infos: HashMap::new(),
};
for stake in &node_stakes[1..] {
cluster.add_validator(&fullnode_config, *stake);
for stake in &config.node_stakes[1..] {
cluster.add_validator(&config.fullnode_config, *stake);
}
discover_nodes(&cluster.entry_point_info.gossip, node_stakes.len()).unwrap();
let listener_config = FullnodeConfig {
voting_disabled: true,
..config.fullnode_config.clone()
};
(0..config.num_listeners).for_each(|_| cluster.add_validator(&listener_config, 0));
for _ in 0..num_replicators {
discover_nodes(
&cluster.entry_point_info.gossip,
config.node_stakes.len() + config.num_listeners as usize,
)
.unwrap();
for _ in 0..config.num_replicators {
cluster.add_replicator();
}
discover_nodes(
&cluster.entry_point_info.gossip,
node_stakes.len() + num_replicators,
config.node_stakes.len() + config.num_replicators as usize,
)
.unwrap();
@@ -205,23 +217,37 @@ impl LocalCluster {
);
// Must have enough tokens to fund vote account and set delegate
assert!(stake > 2);
let validator_keypair = Arc::new(Keypair::new());
let voting_keypair = Keypair::new();
let validator_pubkey = validator_keypair.pubkey();
let validator_node = Node::new_localhost_with_pubkey(&validator_keypair.pubkey());
let ledger_path = tmp_copy_blocktree!(&self.genesis_ledger_path);
// Send each validator some lamports to vote
let validator_balance =
Self::transfer_with_client(&client, &self.funding_keypair, &validator_pubkey, stake);
info!(
"validator {} balance {}",
validator_pubkey, validator_balance
);
if fullnode_config.voting_disabled {
// set up as a listener
info!("listener {}", validator_pubkey);
} else {
assert!(stake > 2);
// Send each validator some lamports to vote
let validator_balance = Self::transfer_with_client(
&client,
&self.funding_keypair,
&validator_pubkey,
stake,
);
info!(
"validator {} balance {}",
validator_pubkey, validator_balance
);
Self::create_and_fund_vote_account(&client, &voting_keypair, &validator_keypair, stake - 1)
Self::create_and_fund_vote_account(
&client,
&voting_keypair,
&validator_keypair,
stake - 1,
)
.unwrap();
}
let validator_server = Fullnode::new(
validator_node,
@@ -230,15 +256,22 @@ impl LocalCluster {
&voting_keypair.pubkey(),
voting_keypair,
Some(&self.entry_point_info),
fullnode_config,
&fullnode_config,
);
self.fullnodes
.insert(validator_keypair.pubkey(), validator_server);
self.fullnode_infos.insert(
validator_keypair.pubkey(),
FullnodeInfo::new(validator_keypair.clone(), ledger_path),
);
if fullnode_config.voting_disabled {
self.listener_infos.insert(
validator_keypair.pubkey(),
FullnodeInfo::new(validator_keypair.clone(), ledger_path),
);
} else {
self.fullnode_infos.insert(
validator_keypair.pubkey(),
FullnodeInfo::new(validator_keypair.clone(), ledger_path),
);
}
}
fn add_replicator(&mut self) {
@@ -376,6 +409,10 @@ impl LocalCluster {
}
impl Cluster for LocalCluster {
fn get_node_ids(&self) -> Vec<Pubkey> {
self.fullnodes.keys().cloned().collect()
}
fn restart_node(&mut self, pubkey: Pubkey) {
// Shut down the fullnode
let node = self.fullnodes.remove(&pubkey).unwrap();
@@ -401,10 +438,6 @@ impl Cluster for LocalCluster {
self.fullnodes.insert(pubkey, restarted_node);
}
fn get_node_ids(&self) -> Vec<Pubkey> {
self.fullnodes.keys().cloned().collect()
}
}
impl Drop for LocalCluster {
@@ -421,7 +454,7 @@ mod test {
fn test_local_cluster_start_and_exit() {
solana_logger::setup();
let num_nodes = 1;
let cluster = LocalCluster::new(num_nodes, 100, 3);
let cluster = LocalCluster::new_with_equal_stakes(num_nodes, 100, 3);
assert_eq!(cluster.fullnodes.len(), num_nodes);
assert_eq!(cluster.replicators.len(), 0);
}
@@ -429,19 +462,20 @@ mod test {
#[test]
fn test_local_cluster_start_and_exit_with_config() {
solana_logger::setup();
let mut fullnode_exit = FullnodeConfig::default();
fullnode_exit.rpc_config.enable_fullnode_exit = true;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
const NUM_NODES: usize = 1;
let num_replicators = 1;
let cluster = LocalCluster::new_with_config_replicators(
&[3; NUM_NODES],
100,
&fullnode_exit,
num_replicators,
16,
16,
&[],
);
let config = ClusterConfig {
fullnode_config,
num_replicators: 1,
node_stakes: vec![3; NUM_NODES],
cluster_lamports: 100,
ticks_per_slot: 16,
slots_per_epoch: 16,
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&config);
assert_eq!(cluster.fullnodes.len(), NUM_NODES);
assert_eq!(cluster.replicators.len(), num_replicators);
}
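
The listener support added above is deliberately thin: `LocalCluster::new` clones the cluster-wide `FullnodeConfig`, flips `voting_disabled`, and starts each listener through the same `add_validator` path with zero stake, which skips vote-account funding and records the node in `listener_infos` instead of `fullnode_infos`. Condensed from the diff above, not standalone code:

// Listeners reuse the cluster's fullnode config with voting switched off.
let listener_config = FullnodeConfig {
    voting_disabled: true,
    ..config.fullnode_config.clone()
};
// Zero stake: add_validator takes the voting_disabled branch, skipping
// the transfer and vote-account setup a staked validator would get.
(0..config.num_listeners).for_each(|_| cluster.add_validator(&listener_config, 0));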

View File

@@ -9,7 +9,7 @@ use std::sync::{Arc, Mutex};
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum PohServiceConfig {
/// * `Tick` - Run full PoH thread. Tick is a rough estimate of how many hashes to roll before
/// transmitting a new entry.

View File

@@ -4,7 +4,7 @@ use solana::cluster::Cluster;
use solana::cluster_tests;
use solana::fullnode::FullnodeConfig;
use solana::gossip_service::discover_nodes;
use solana::local_cluster::LocalCluster;
use solana::local_cluster::{ClusterConfig, LocalCluster};
use solana::poh_service::PohServiceConfig;
use solana_sdk::timing;
use std::time::Duration;
@@ -13,7 +13,7 @@ use std::time::Duration;
fn test_spend_and_verify_all_nodes_1() {
solana_logger::setup();
let num_nodes = 1;
let local = LocalCluster::new(num_nodes, 10_000, 100);
let local = LocalCluster::new_with_equal_stakes(num_nodes, 10_000, 100);
cluster_tests::spend_and_verify_all_nodes(
&local.entry_point_info,
&local.funding_keypair,
@@ -25,7 +25,7 @@ fn test_spend_and_verify_all_nodes_1() {
fn test_spend_and_verify_all_nodes_2() {
solana_logger::setup();
let num_nodes = 2;
let local = LocalCluster::new(num_nodes, 10_000, 100);
let local = LocalCluster::new_with_equal_stakes(num_nodes, 10_000, 100);
cluster_tests::spend_and_verify_all_nodes(
&local.entry_point_info,
&local.funding_keypair,
@@ -37,7 +37,7 @@ fn test_spend_and_verify_all_nodes_2() {
fn test_spend_and_verify_all_nodes_3() {
solana_logger::setup();
let num_nodes = 3;
let local = LocalCluster::new(num_nodes, 10_000, 100);
let local = LocalCluster::new_with_equal_stakes(num_nodes, 10_000, 100);
cluster_tests::spend_and_verify_all_nodes(
&local.entry_point_info,
&local.funding_keypair,
@@ -50,7 +50,7 @@ fn test_spend_and_verify_all_nodes_3() {
fn test_fullnode_exit_default_config_should_panic() {
solana_logger::setup();
let num_nodes = 2;
let local = LocalCluster::new(num_nodes, 10_000, 100);
let local = LocalCluster::new_with_equal_stakes(num_nodes, 10_000, 100);
cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes);
}
@@ -60,7 +60,13 @@ fn test_fullnode_exit_2() {
let num_nodes = 2;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
let local = LocalCluster::new_with_config(&[100; 2], 10_000, &fullnode_config, &[]);
let config = ClusterConfig {
cluster_lamports: 10_000,
node_stakes: vec![100; 2],
fullnode_config,
..ClusterConfig::default()
};
let local = LocalCluster::new(&config);
cluster_tests::fullnode_exit(&local.entry_point_info, num_nodes);
}
@@ -71,7 +77,13 @@ fn test_leader_failure_4() {
let num_nodes = 4;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.rpc_config.enable_fullnode_exit = true;
let local = LocalCluster::new_with_config(&[100; 4], 10_000, &fullnode_config, &[]);
let config = ClusterConfig {
cluster_lamports: 10_000,
node_stakes: vec![100; 4],
fullnode_config,
..ClusterConfig::default()
};
let local = LocalCluster::new(&config);
cluster_tests::kill_entry_and_spend_and_verify_rest(
&local.entry_point_info,
&local.funding_keypair,
@@ -87,14 +99,14 @@ fn test_two_unbalanced_stakes() {
fullnode_config.tick_config =
PohServiceConfig::Sleep(Duration::from_millis(100 / num_ticks_per_second));
fullnode_config.rpc_config.enable_fullnode_exit = true;
let mut cluster = LocalCluster::new_with_tick_config(
&[999_990, 3],
1_000_000,
&fullnode_config,
num_ticks_per_slot,
num_slots_per_epoch,
&[],
);
let mut cluster = LocalCluster::new(&ClusterConfig {
node_stakes: vec![999_990, 3],
cluster_lamports: 1_000_000,
fullnode_config: fullnode_config.clone(),
ticks_per_slot: num_ticks_per_slot,
slots_per_epoch: num_slots_per_epoch,
..ClusterConfig::default()
});
cluster_tests::sleep_n_epochs(
10.0,
&fullnode_config.tick_config,
@@ -113,8 +125,12 @@ fn test_two_unbalanced_stakes() {
fn test_forwarding() {
// Set up a cluster where one node is never the leader, so all txs sent to this node
// will have to be forwarded in order to be confirmed
let fullnode_config = FullnodeConfig::default();
let cluster = LocalCluster::new_with_config(&[999_990, 3], 2_000_000, &fullnode_config, &[]);
let config = ClusterConfig {
node_stakes: vec![999_990, 3],
cluster_lamports: 2_000_000,
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&config);
let cluster_nodes = discover_nodes(&cluster.entry_point_info.gossip, 2).unwrap();
assert!(cluster_nodes.len() >= 2);
@@ -132,14 +148,14 @@ fn test_restart_node() {
let fullnode_config = FullnodeConfig::default();
let slots_per_epoch = 8;
let ticks_per_slot = 16;
let mut cluster = LocalCluster::new_with_tick_config(
&[3],
100,
&fullnode_config,
let mut cluster = LocalCluster::new(&ClusterConfig {
node_stakes: vec![3],
cluster_lamports: 100,
fullnode_config: fullnode_config.clone(),
ticks_per_slot,
slots_per_epoch,
&[],
);
..ClusterConfig::default()
});
let nodes = cluster.get_node_ids();
cluster_tests::sleep_n_epochs(
1.0,
@@ -156,3 +172,16 @@ fn test_restart_node() {
);
cluster_tests::send_many_transactions(&cluster.entry_point_info, &cluster.funding_keypair, 1);
}
#[test]
fn test_listener_startup() {
let config = ClusterConfig {
node_stakes: vec![100; 1],
cluster_lamports: 1_000,
num_listeners: 3,
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&config);
let cluster_nodes = discover_nodes(&cluster.entry_point_info.gossip, 4).unwrap();
assert_eq!(cluster_nodes.len(), 4);
}
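
Because listeners are tracked separately from voting validators, a test can also assert the bookkeeping directly. A hypothetical extension of `test_listener_startup` (both maps are public fields per the local_cluster.rs diff above):

// One staked bootstrap leader, three unstaked listeners.
assert_eq!(cluster.fullnode_infos.len(), 1);
assert_eq!(cluster.listener_infos.len(), 3);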

View File

@@ -10,7 +10,7 @@ use solana::cluster_info::{ClusterInfo, Node, FULLNODE_PORT_RANGE};
use solana::contact_info::ContactInfo;
use solana::fullnode::FullnodeConfig;
use solana::gossip_service::discover_nodes;
use solana::local_cluster::LocalCluster;
use solana::local_cluster::{ClusterConfig, LocalCluster};
use solana::replicator::Replicator;
use solana::replicator::ReplicatorRequest;
use solana::storage_stage::STORAGE_ROTATE_TEST_COUNT;
@@ -19,8 +19,6 @@ use solana_client::thin_client::create_client;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::timing::DEFAULT_SLOTS_PER_EPOCH;
use solana_sdk::timing::DEFAULT_TICKS_PER_SLOT;
use std::fs::remove_dir_all;
use std::net::SocketAddr;
use std::net::UdpSocket;
@@ -107,15 +105,14 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) {
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
let cluster = LocalCluster::new_with_config_replicators(
&vec![100; num_nodes],
10_000,
&fullnode_config,
num_replicators,
DEFAULT_TICKS_PER_SLOT,
DEFAULT_SLOTS_PER_EPOCH,
&[],
);
let config = ClusterConfig {
fullnode_config,
num_replicators: num_replicators as u64,
node_stakes: vec![100; num_nodes],
cluster_lamports: 10_000,
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&config);
let cluster_nodes = discover_nodes(
&cluster.entry_point_info.gossip,
@@ -193,7 +190,7 @@ fn test_replicator_startup_ledger_hang() {
info!("starting replicator test");
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
let cluster = LocalCluster::new(2, 10_000, 100);;
let cluster = LocalCluster::new_with_equal_stakes(2, 10_000, 100);
info!("starting replicator node");
let bad_keys = Arc::new(Keypair::new());
@@ -222,19 +219,18 @@ fn test_account_setup() {
let num_replicators = 1;
let mut fullnode_config = FullnodeConfig::default();
fullnode_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
let cluster = LocalCluster::new_with_config_replicators(
&vec![100; num_nodes],
10_000,
&fullnode_config,
let config = ClusterConfig {
fullnode_config,
num_replicators,
DEFAULT_TICKS_PER_SLOT,
DEFAULT_SLOTS_PER_EPOCH,
&[],
);
node_stakes: vec![100; num_nodes],
cluster_lamports: 10_000,
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&config);
let _ = discover_nodes(
&cluster.entry_point_info.gossip,
num_nodes + num_replicators,
num_nodes + num_replicators as usize,
)
.unwrap();
// now check that the cluster actually has accounts for the replicator.