Support for custom BroadcastStage in local cluster tests (#4716)

* Refactor BroadcastStage to support custom implementations, add FailEntryVerificationBroadcastRun implementation

* Plumb switch on broadcast type through validator

* Add test for validator generating non-verifiable entries to local_cluster

* Fix bad initializers

* Refactor broadcast run code into utils
This commit is contained in:
carllin 2019-06-19 00:13:19 -07:00 committed by GitHub
parent 6bc0d2a0cb
commit 46bb79df29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 575 additions and 238 deletions

View File

@ -963,7 +963,6 @@ mod tests {
solana_logger::setup();
const NUM_NODES: usize = 1;
let validator_config = ValidatorConfig::default();
let mut config = Config::default();
config.identity = Keypair::new();
@ -985,7 +984,7 @@ mod tests {
let cluster = LocalCluster::new(&ClusterConfig {
node_stakes: vec![100_000; NUM_NODES],
cluster_lamports: 100_000_000_000_000,
validator_config,
validator_configs: vec![ValidatorConfig::default(); NUM_NODES],
native_instruction_processors: [solana_exchange_program!()].to_vec(),
..ClusterConfig::default()
});

View File

@ -668,12 +668,11 @@ mod tests {
#[test]
fn test_bench_tps_local_cluster() {
solana_logger::setup();
let validator_config = ValidatorConfig::default();
const NUM_NODES: usize = 1;
let cluster = LocalCluster::new(&ClusterConfig {
node_stakes: vec![999_990; NUM_NODES],
cluster_lamports: 2_000_000,
validator_config,
validator_configs: vec![ValidatorConfig::default(); NUM_NODES],
..ClusterConfig::default()
});

View File

@ -1,28 +1,28 @@
//! A stage to broadcast data from a leader node to validators
//!
use self::fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun;
use self::standard_broadcast_run::StandardBroadcastRun;
use crate::blocktree::Blocktree;
use crate::cluster_info::{ClusterInfo, ClusterInfoError};
use crate::entry::EntrySlice;
use crate::erasure::CodingGenerator;
use crate::packet::index_blobs;
use crate::poh_recorder::WorkingBankEntries;
use crate::result::{Error, Result};
use crate::service::Service;
use crate::staking_utils;
use rayon::prelude::*;
use rayon::ThreadPool;
use solana_metrics::{
datapoint, inc_new_counter_debug, inc_new_counter_error, inc_new_counter_info,
};
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signable;
use solana_sdk::timing::duration_as_ms;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::sync::{Arc, RwLock};
use std::thread::{self, Builder, JoinHandle};
use std::time::{Duration, Instant};
use std::time::Instant;
mod broadcast_utils;
mod fail_entry_verification_broadcast_run;
mod standard_broadcast_run;
pub const NUM_THREADS: u32 = 10;
@ -31,170 +31,57 @@ pub enum BroadcastStageReturnType {
ChannelDisconnected,
}
#[derive(Default)]
struct BroadcastStats {
num_entries: Vec<usize>,
run_elapsed: Vec<u64>,
to_blobs_elapsed: Vec<u64>,
#[derive(PartialEq, Clone, Debug)]
pub enum BroadcastStageType {
Standard,
FailEntryVerification,
}
struct Broadcast {
id: Pubkey,
coding_generator: CodingGenerator,
stats: BroadcastStats,
thread_pool: ThreadPool,
impl BroadcastStageType {
pub fn new_broadcast_stage(
&self,
sock: UdpSocket,
cluster_info: Arc<RwLock<ClusterInfo>>,
receiver: Receiver<WorkingBankEntries>,
exit_sender: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
) -> BroadcastStage {
match self {
BroadcastStageType::Standard => BroadcastStage::new(
sock,
cluster_info,
receiver,
exit_sender,
blocktree,
StandardBroadcastRun::new(),
),
BroadcastStageType::FailEntryVerification => BroadcastStage::new(
sock,
cluster_info,
receiver,
exit_sender,
blocktree,
FailEntryVerificationBroadcastRun::new(),
),
}
}
}
impl Broadcast {
trait BroadcastRun {
fn run(
&mut self,
broadcast: &mut Broadcast,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
let timer = Duration::new(1, 0);
let (mut bank, entries) = receiver.recv_timeout(timer)?;
let mut max_tick_height = bank.max_tick_height();
) -> Result<()>;
}
let run_start = Instant::now();
let mut num_entries = entries.len();
let mut ventries = Vec::new();
let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
assert!(last_tick <= max_tick_height);
if last_tick != max_tick_height {
while let Ok((same_bank, entries)) = receiver.try_recv() {
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if same_bank.slot() != bank.slot() {
num_entries = 0;
ventries.clear();
bank = same_bank.clone();
max_tick_height = bank.max_tick_height();
}
num_entries += entries.len();
last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
assert!(last_tick <= max_tick_height,);
if last_tick == max_tick_height {
break;
}
}
}
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
let to_blobs_start = Instant::now();
let blobs: Vec<_> = self.thread_pool.install(|| {
ventries
.into_par_iter()
.map(|p| {
let entries: Vec<_> = p.into_iter().map(|e| e.0).collect();
entries.to_shared_blobs()
})
.flatten()
.collect()
});
let blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
index_blobs(
&blobs,
&self.id,
blob_index,
bank.slot(),
bank.parent().map_or(0, |parent| parent.slot()),
);
if last_tick == max_tick_height {
blobs.last().unwrap().write().unwrap().set_is_last_in_slot();
}
// Make sure not to modify the blob header or data after signing it here
self.thread_pool.install(|| {
blobs.par_iter().for_each(|b| {
b.write()
.unwrap()
.sign(&cluster_info.read().unwrap().keypair);
})
});
blocktree.write_shared_blobs(&blobs)?;
let coding = self.coding_generator.next(&blobs);
self.thread_pool.install(|| {
coding.par_iter().for_each(|c| {
c.write()
.unwrap()
.sign(&cluster_info.read().unwrap().keypair);
})
});
let to_blobs_elapsed = duration_as_ms(&to_blobs_start.elapsed());
let broadcast_start = Instant::now();
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
// Send out data
cluster_info
.read()
.unwrap()
.broadcast(sock, &blobs, stakes.as_ref())?;
inc_new_counter_debug!("streamer-broadcast-sent", blobs.len());
// send out erasures
cluster_info
.read()
.unwrap()
.broadcast(sock, &coding, stakes.as_ref())?;
self.update_broadcast_stats(
duration_as_ms(&broadcast_start.elapsed()),
duration_as_ms(&run_start.elapsed()),
num_entries,
to_blobs_elapsed,
blob_index,
);
Ok(())
}
fn update_broadcast_stats(
&mut self,
broadcast_elapsed: u64,
run_elapsed: u64,
num_entries: usize,
to_blobs_elapsed: u64,
blob_index: u64,
) {
inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);
self.stats.num_entries.push(num_entries);
self.stats.to_blobs_elapsed.push(to_blobs_elapsed);
self.stats.run_elapsed.push(run_elapsed);
if self.stats.num_entries.len() >= 16 {
info!(
"broadcast: entries: {:?} blob times ms: {:?} broadcast times ms: {:?}",
self.stats.num_entries, self.stats.to_blobs_elapsed, self.stats.run_elapsed
);
self.stats.num_entries.clear();
self.stats.to_blobs_elapsed.clear();
self.stats.run_elapsed.clear();
}
datapoint!("broadcast-service", ("transmit-index", blob_index, i64));
}
struct Broadcast {
coding_generator: CodingGenerator,
thread_pool: ThreadPool,
}
// Implement a destructor for the BroadcastStage thread to signal it exited
@ -226,14 +113,12 @@ impl BroadcastStage {
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
blocktree: &Arc<Blocktree>,
mut broadcast_stage_run: impl BroadcastRun,
) -> BroadcastStageReturnType {
let me = cluster_info.read().unwrap().my_data().clone();
let coding_generator = CodingGenerator::default();
let mut broadcast = Broadcast {
id: me.id,
coding_generator,
stats: BroadcastStats::default(),
thread_pool: rayon::ThreadPoolBuilder::new()
.num_threads(sys_info::cpu_num().unwrap_or(NUM_THREADS) as usize)
.build()
@ -241,7 +126,9 @@ impl BroadcastStage {
};
loop {
if let Err(e) = broadcast.run(&cluster_info, receiver, sock, blocktree) {
if let Err(e) =
broadcast_stage_run.run(&mut broadcast, &cluster_info, receiver, sock, blocktree)
{
match e {
Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => {
return BroadcastStageReturnType::ChannelDisconnected;
@ -273,12 +160,13 @@ impl BroadcastStage {
/// which will then close FetchStage in the Tpu, and then the rest of the Tpu,
/// completing the cycle.
#[allow(clippy::too_many_arguments)]
pub fn new(
fn new(
sock: UdpSocket,
cluster_info: Arc<RwLock<ClusterInfo>>,
receiver: Receiver<WorkingBankEntries>,
exit_sender: &Arc<AtomicBool>,
blocktree: &Arc<Blocktree>,
broadcast_stage_run: impl BroadcastRun + Send + 'static,
) -> Self {
let blocktree = blocktree.clone();
let exit_sender = exit_sender.clone();
@ -286,7 +174,13 @@ impl BroadcastStage {
.name("solana-broadcaster".to_string())
.spawn(move || {
let _finalizer = Finalizer::new(exit_sender);
Self::run(&sock, &cluster_info, &receiver, &blocktree)
Self::run(
&sock,
&cluster_info,
&receiver,
&blocktree,
broadcast_stage_run,
)
})
.unwrap();
@ -312,6 +206,7 @@ mod test {
use crate::service::Service;
use solana_runtime::bank::Bank;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
@ -357,6 +252,7 @@ mod test {
entry_receiver,
&exit_sender,
&blocktree,
StandardBroadcastRun::new(),
);
MockBroadcastStage {

View File

@ -0,0 +1,156 @@
use crate::entry::Entry;
use crate::entry::EntrySlice;
use crate::erasure::CodingGenerator;
use crate::packet::{self, SharedBlob};
use crate::poh_recorder::WorkingBankEntries;
use crate::result::Result;
use rayon::prelude::*;
use rayon::ThreadPool;
use solana_runtime::bank::Bank;
use solana_sdk::signature::{Keypair, KeypairUtil, Signable};
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::time::{Duration, Instant};
pub(super) struct ReceiveResults {
pub ventries: Vec<Vec<(Entry, u64)>>,
pub num_entries: usize,
pub time_elapsed: Duration,
pub bank: Arc<Bank>,
pub last_tick: u64,
}
impl ReceiveResults {
pub fn new(
ventries: Vec<Vec<(Entry, u64)>>,
num_entries: usize,
time_elapsed: Duration,
bank: Arc<Bank>,
last_tick: u64,
) -> Self {
Self {
ventries,
num_entries,
time_elapsed,
bank,
last_tick,
}
}
}
pub(super) fn recv_slot_blobs(receiver: &Receiver<WorkingBankEntries>) -> Result<ReceiveResults> {
let timer = Duration::new(1, 0);
let (mut bank, entries) = receiver.recv_timeout(timer)?;
let recv_start = Instant::now();
let mut max_tick_height = bank.max_tick_height();
let mut num_entries = entries.len();
let mut ventries = Vec::new();
let mut last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
assert!(last_tick <= max_tick_height);
if last_tick != max_tick_height {
while let Ok((same_bank, entries)) = receiver.try_recv() {
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if same_bank.slot() != bank.slot() {
num_entries = 0;
ventries.clear();
bank = same_bank.clone();
max_tick_height = bank.max_tick_height();
}
num_entries += entries.len();
last_tick = entries.last().map(|v| v.1).unwrap_or(0);
ventries.push(entries);
assert!(last_tick <= max_tick_height,);
if last_tick == max_tick_height {
break;
}
}
}
let recv_end = recv_start.elapsed();
let receive_results = ReceiveResults::new(ventries, num_entries, recv_end, bank, last_tick);
Ok(receive_results)
}
pub(super) fn entries_to_blobs(
ventries: Vec<Vec<(Entry, u64)>>,
thread_pool: &ThreadPool,
latest_blob_index: u64,
last_tick: u64,
bank: &Bank,
keypair: &Keypair,
coding_generator: &mut CodingGenerator,
) -> (Vec<SharedBlob>, Vec<SharedBlob>) {
let blobs = generate_data_blobs(
ventries,
thread_pool,
latest_blob_index,
last_tick,
&bank,
&keypair,
);
let coding = generate_coding_blobs(&blobs, &thread_pool, coding_generator, &keypair);
(blobs, coding)
}
pub(super) fn generate_data_blobs(
ventries: Vec<Vec<(Entry, u64)>>,
thread_pool: &ThreadPool,
latest_blob_index: u64,
last_tick: u64,
bank: &Bank,
keypair: &Keypair,
) -> Vec<SharedBlob> {
let blobs: Vec<SharedBlob> = thread_pool.install(|| {
ventries
.into_par_iter()
.map(|p| {
let entries: Vec<_> = p.into_iter().map(|e| e.0).collect();
entries.to_shared_blobs()
})
.flatten()
.collect()
});
packet::index_blobs(
&blobs,
&keypair.pubkey(),
latest_blob_index,
bank.slot(),
bank.parent().map_or(0, |parent| parent.slot()),
);
if last_tick == bank.max_tick_height() {
blobs.last().unwrap().write().unwrap().set_is_last_in_slot();
}
// Make sure not to modify the blob header or data after signing it here
thread_pool.install(|| {
blobs.par_iter().for_each(|b| {
b.write().unwrap().sign(keypair);
})
});
blobs
}
pub(super) fn generate_coding_blobs(
blobs: &[SharedBlob],
thread_pool: &ThreadPool,
coding_generator: &mut CodingGenerator,
keypair: &Keypair,
) -> Vec<SharedBlob> {
let coding = coding_generator.next(&blobs);
thread_pool.install(|| {
coding.par_iter().for_each(|c| {
c.write().unwrap().sign(keypair);
})
});
coding
}

View File

@ -0,0 +1,70 @@
use super::*;
use solana_sdk::hash::Hash;
pub(super) struct FailEntryVerificationBroadcastRun {}
impl FailEntryVerificationBroadcastRun {
pub(super) fn new() -> Self {
Self {}
}
}
impl BroadcastRun for FailEntryVerificationBroadcastRun {
fn run(
&mut self,
broadcast: &mut Broadcast,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
// 1) Pull entries from banking stage
let mut receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
let bank = receive_results.bank.clone();
let last_tick = receive_results.last_tick;
// 2) Convert entries to blobs + generate coding blobs. Set a garbage PoH on the last entry
// in the slot to make verification fail on validators
if last_tick == bank.max_tick_height() {
let mut last_entry = receive_results
.ventries
.last_mut()
.unwrap()
.last_mut()
.unwrap();
last_entry.0.hash = Hash::default();
}
let keypair = &cluster_info.read().unwrap().keypair.clone();
let latest_blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs(
receive_results.ventries,
&broadcast.thread_pool,
latest_blob_index,
last_tick,
&bank,
&keypair,
&mut broadcast.coding_generator,
);
blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
// 3) Start broadcast step
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
// Broadcast data + erasures
cluster_info.read().unwrap().broadcast(
sock,
data_blobs.iter().chain(coding_blobs.iter()),
stakes.as_ref(),
)?;
Ok(())
}
}

View File

@ -0,0 +1,116 @@
use super::broadcast_utils;
use super::*;
#[derive(Default)]
struct BroadcastStats {
num_entries: Vec<usize>,
run_elapsed: Vec<u64>,
to_blobs_elapsed: Vec<u64>,
}
pub(super) struct StandardBroadcastRun {
stats: BroadcastStats,
}
impl StandardBroadcastRun {
pub(super) fn new() -> Self {
Self {
stats: BroadcastStats::default(),
}
}
fn update_broadcast_stats(
&mut self,
broadcast_elapsed: u64,
run_elapsed: u64,
num_entries: usize,
to_blobs_elapsed: u64,
blob_index: u64,
) {
inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);
self.stats.num_entries.push(num_entries);
self.stats.to_blobs_elapsed.push(to_blobs_elapsed);
self.stats.run_elapsed.push(run_elapsed);
if self.stats.num_entries.len() >= 16 {
info!(
"broadcast: entries: {:?} blob times ms: {:?} broadcast times ms: {:?}",
self.stats.num_entries, self.stats.to_blobs_elapsed, self.stats.run_elapsed
);
self.stats.num_entries.clear();
self.stats.to_blobs_elapsed.clear();
self.stats.run_elapsed.clear();
}
datapoint!("broadcast-service", ("transmit-index", blob_index, i64));
}
}
impl BroadcastRun for StandardBroadcastRun {
fn run(
&mut self,
broadcast: &mut Broadcast,
cluster_info: &Arc<RwLock<ClusterInfo>>,
receiver: &Receiver<WorkingBankEntries>,
sock: &UdpSocket,
blocktree: &Arc<Blocktree>,
) -> Result<()> {
// 1) Pull entries from banking stage
let receive_results = broadcast_utils::recv_slot_blobs(receiver)?;
let receive_elapsed = receive_results.time_elapsed;
let num_entries = receive_results.num_entries;
let bank = receive_results.bank.clone();
let last_tick = receive_results.last_tick;
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
// 2) Convert entries to blobs + generate coding blobs
let to_blobs_start = Instant::now();
let keypair = &cluster_info.read().unwrap().keypair.clone();
let latest_blob_index = blocktree
.meta(bank.slot())
.expect("Database error")
.map(|meta| meta.consumed)
.unwrap_or(0);
let (data_blobs, coding_blobs) = broadcast_utils::entries_to_blobs(
receive_results.ventries,
&broadcast.thread_pool,
latest_blob_index,
last_tick,
&bank,
&keypair,
&mut broadcast.coding_generator,
);
blocktree.write_shared_blobs(data_blobs.iter().chain(coding_blobs.iter()))?;
let to_blobs_elapsed = to_blobs_start.elapsed();
// 3) Start broadcast step
let broadcast_start = Instant::now();
let bank_epoch = bank.get_stakers_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
// Broadcast data + erasures
cluster_info.read().unwrap().broadcast(
sock,
data_blobs.iter().chain(coding_blobs.iter()),
stakes.as_ref(),
)?;
inc_new_counter_debug!(
"streamer-broadcast-sent",
data_blobs.len() + coding_blobs.len()
);
let broadcast_elapsed = broadcast_start.elapsed();
self.update_broadcast_stats(
duration_as_ms(&broadcast_elapsed),
duration_as_ms(&(receive_elapsed + to_blobs_elapsed + broadcast_elapsed)),
num_entries,
duration_as_ms(&to_blobs_elapsed),
latest_blob_index,
);
Ok(())
}
}

View File

@ -44,6 +44,7 @@ use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::timing::{duration_as_ms, timestamp};
use solana_sdk::transaction::Transaction;
use std::borrow::Borrow;
use std::borrow::Cow;
use std::cmp::min;
use std::collections::{BTreeSet, HashMap};
@ -709,16 +710,22 @@ impl ClusterInfo {
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
pub fn broadcast(
pub fn broadcast<I>(
&self,
s: &UdpSocket,
blobs: &[SharedBlob],
blobs: I,
stakes: Option<&HashMap<Pubkey, u64>>,
) -> Result<()> {
) -> Result<()>
where
I: IntoIterator,
I::Item: Borrow<SharedBlob>,
{
let mut last_err = Ok(());
let mut broadcast_table_len = 0;
blobs.iter().for_each(|b| {
let blob = b.read().unwrap();
let mut blobs_len = 0;
blobs.into_iter().for_each(|b| {
blobs_len += 1;
let blob = b.borrow().read().unwrap();
let broadcast_table = self.sorted_tvu_peers(stakes, ChaChaRng::from_seed(blob.seed()));
broadcast_table_len = cmp::max(broadcast_table_len, broadcast_table.len());
@ -732,7 +739,7 @@ impl ClusterInfo {
last_err?;
inc_new_counter_debug!("cluster_info-broadcast-max_idx", blobs.len());
inc_new_counter_debug!("cluster_info-broadcast-max_idx", blobs_len);
if broadcast_table_len != 0 {
inc_new_counter_warn!("broadcast_service-num_peers", broadcast_table_len + 1);
}

View File

@ -8,11 +8,13 @@ use crate::contact_info::ContactInfo;
use crate::entry::{Entry, EntrySlice};
use crate::gossip_service::discover_cluster;
use crate::locktower::VOTE_THRESHOLD_DEPTH;
use hashbrown::HashSet;
use solana_client::thin_client::create_client;
use solana_runtime::epoch_schedule::MINIMUM_SLOTS_PER_EPOCH;
use solana_sdk::client::SyncClient;
use solana_sdk::hash::Hash;
use solana_sdk::poh_config::PohConfig;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil, Signature};
use solana_sdk::system_transaction;
use solana_sdk::timing::{
@ -26,14 +28,18 @@ use std::time::Duration;
const DEFAULT_SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / DEFAULT_NUM_TICKS_PER_SECOND;
/// Spend and verify from every node in the network
pub fn spend_and_verify_all_nodes(
pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher>(
entry_point_info: &ContactInfo,
funding_keypair: &Keypair,
nodes: usize,
ignore_nodes: HashSet<Pubkey, S>,
) {
let (cluster_nodes, _) = discover_cluster(&entry_point_info.gossip, nodes).unwrap();
assert!(cluster_nodes.len() >= nodes);
for ingress_node in &cluster_nodes {
if ignore_nodes.contains(&ingress_node.id) {
continue;
}
let random_keypair = Keypair::new();
let client = create_client(ingress_node.client_facing_addr(), FULLNODE_PORT_RANGE);
let bal = client
@ -48,6 +54,9 @@ pub fn spend_and_verify_all_nodes(
.retry_transfer_until_confirmed(&funding_keypair, &mut transaction, 5, confs)
.unwrap();
for validator in &cluster_nodes {
if ignore_nodes.contains(&validator.id) {
continue;
}
let client = create_client(validator.client_facing_addr(), FULLNODE_PORT_RANGE);
client.poll_for_signature_confirmation(&sig, confs).unwrap();
}

View File

@ -51,10 +51,24 @@ impl ReplicatorInfo {
}
}
pub struct ClusterValidatorInfo {
pub info: ValidatorInfo,
pub config: ValidatorConfig,
}
impl ClusterValidatorInfo {
pub fn new(validator_info: ValidatorInfo, config: ValidatorConfig) -> Self {
Self {
info: validator_info,
config,
}
}
}
#[derive(Clone, Debug)]
pub struct ClusterConfig {
/// The fullnode config that should be applied to every node in the cluster
pub validator_config: ValidatorConfig,
pub validator_configs: Vec<ValidatorConfig>,
/// Number of replicators in the cluster
/// Note- replicators will timeout if ticks_per_slot is much larger than the default 8
pub num_replicators: usize,
@ -74,7 +88,7 @@ pub struct ClusterConfig {
impl Default for ClusterConfig {
fn default() -> Self {
ClusterConfig {
validator_config: ValidatorConfig::default(),
validator_configs: vec![],
num_replicators: 0,
num_listeners: 0,
node_stakes: vec![],
@ -91,11 +105,10 @@ impl Default for ClusterConfig {
pub struct LocalCluster {
/// Keypair with funding to participate in the network
pub funding_keypair: Keypair,
pub validator_config: ValidatorConfig,
/// Entry point from which the rest of the network can be discovered
pub entry_point_info: ContactInfo,
pub fullnode_infos: HashMap<Pubkey, ValidatorInfo>,
pub listener_infos: HashMap<Pubkey, ValidatorInfo>,
pub fullnode_infos: HashMap<Pubkey, ClusterValidatorInfo>,
pub listener_infos: HashMap<Pubkey, ClusterValidatorInfo>,
fullnodes: HashMap<Pubkey, Validator>,
genesis_ledger_path: String,
pub genesis_block: GenesisBlock,
@ -113,12 +126,14 @@ impl LocalCluster {
let config = ClusterConfig {
node_stakes: stakes,
cluster_lamports,
validator_configs: vec![ValidatorConfig::default(); num_nodes],
..ClusterConfig::default()
};
Self::new(&config)
}
pub fn new(config: &ClusterConfig) -> Self {
assert_eq!(config.validator_configs.len(), config.node_stakes.len());
let leader_keypair = Arc::new(Keypair::new());
let leader_pubkey = leader_keypair.pubkey();
let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
@ -161,22 +176,24 @@ impl LocalCluster {
&leader_voting_keypair,
&leader_storage_keypair,
None,
&config.validator_config,
&config.validator_configs[0],
);
let mut fullnodes = HashMap::new();
let mut fullnode_infos = HashMap::new();
fullnodes.insert(leader_pubkey, leader_server);
fullnode_infos.insert(
leader_pubkey,
ValidatorInfo {
keypair: leader_keypair,
voting_keypair: leader_voting_keypair,
storage_keypair: leader_storage_keypair,
ledger_path: leader_ledger_path,
contact_info: leader_contact_info.clone(),
},
);
let leader_info = ValidatorInfo {
keypair: leader_keypair,
voting_keypair: leader_voting_keypair,
storage_keypair: leader_storage_keypair,
ledger_path: leader_ledger_path,
contact_info: leader_contact_info.clone(),
};
let cluster_leader =
ClusterValidatorInfo::new(leader_info, config.validator_configs[0].clone());
fullnode_infos.insert(leader_pubkey, cluster_leader);
let mut cluster = Self {
funding_keypair: mint_keypair,
@ -187,17 +204,19 @@ impl LocalCluster {
genesis_block,
fullnode_infos,
replicator_infos: HashMap::new(),
validator_config: config.validator_config.clone(),
listener_infos: HashMap::new(),
};
for stake in &config.node_stakes[1..] {
cluster.add_validator(&config.validator_config, *stake);
for (stake, validator_config) in (&config.node_stakes[1..])
.iter()
.zip((&config.validator_configs[1..]).iter())
{
cluster.add_validator(validator_config, *stake);
}
let listener_config = ValidatorConfig {
voting_disabled: true,
..config.validator_config.clone()
..config.validator_configs[0].clone()
};
(0..config.num_listeners).for_each(|_| cluster.add_validator(&listener_config, 0));
@ -294,28 +313,22 @@ impl LocalCluster {
self.fullnodes
.insert(validator_keypair.pubkey(), validator_server);
let validator_pubkey = validator_keypair.pubkey();
let validator_info = ClusterValidatorInfo::new(
ValidatorInfo {
keypair: validator_keypair,
voting_keypair,
storage_keypair,
ledger_path,
contact_info,
},
validator_config.clone(),
);
if validator_config.voting_disabled {
self.listener_infos.insert(
validator_keypair.pubkey(),
ValidatorInfo {
keypair: validator_keypair,
voting_keypair,
storage_keypair,
ledger_path,
contact_info,
},
);
self.listener_infos.insert(validator_pubkey, validator_info);
} else {
self.fullnode_infos.insert(
validator_keypair.pubkey(),
ValidatorInfo {
keypair: validator_keypair,
voting_keypair,
storage_keypair,
ledger_path,
contact_info,
},
);
self.fullnode_infos.insert(validator_pubkey, validator_info);
}
}
@ -362,7 +375,7 @@ impl LocalCluster {
for ledger_path in self
.fullnode_infos
.values()
.map(|f| &f.ledger_path)
.map(|f| &f.info.ledger_path)
.chain(self.replicator_infos.values().map(|info| &info.ledger_path))
{
remove_dir_all(&ledger_path)
@ -519,9 +532,12 @@ impl Cluster for LocalCluster {
}
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> {
self.fullnode_infos
.get(pubkey)
.map(|f| create_client(f.contact_info.client_facing_addr(), FULLNODE_PORT_RANGE))
self.fullnode_infos.get(pubkey).map(|f| {
create_client(
f.info.contact_info.client_facing_addr(),
FULLNODE_PORT_RANGE,
)
})
}
fn restart_node(&mut self, pubkey: Pubkey) {
@ -531,7 +547,8 @@ impl Cluster for LocalCluster {
node.join().unwrap();
// Restart the node
let fullnode_info = &self.fullnode_infos[&pubkey];
let fullnode_info = &self.fullnode_infos[&pubkey].info;
let config = &self.fullnode_infos[&pubkey].config;
let node = Node::new_localhost_with_pubkey(&fullnode_info.keypair.pubkey());
if pubkey == self.entry_point_info.id {
self.entry_point_info = node.info.clone();
@ -544,7 +561,7 @@ impl Cluster for LocalCluster {
&fullnode_info.voting_keypair,
&fullnode_info.storage_keypair,
None,
&self.validator_config,
config,
);
self.fullnodes.insert(pubkey, restarted_node);
@ -581,7 +598,7 @@ mod test {
const NUM_NODES: usize = 1;
let num_replicators = 1;
let config = ClusterConfig {
validator_config,
validator_configs: vec![ValidatorConfig::default(); NUM_NODES],
num_replicators,
node_stakes: vec![3; NUM_NODES],
cluster_lamports: 100,
@ -593,5 +610,4 @@ mod test {
assert_eq!(cluster.fullnodes.len(), NUM_NODES);
assert_eq!(cluster.replicators.len(), num_replicators);
}
}

View File

@ -3,7 +3,7 @@
use crate::banking_stage::BankingStage;
use crate::blocktree::Blocktree;
use crate::broadcast_stage::BroadcastStage;
use crate::broadcast_stage::{BroadcastStage, BroadcastStageType};
use crate::cluster_info::ClusterInfo;
use crate::cluster_info_vote_listener::ClusterInfoVoteListener;
use crate::fetch_stage::FetchStage;
@ -37,6 +37,7 @@ impl Tpu {
broadcast_socket: UdpSocket,
sigverify_disabled: bool,
blocktree: &Arc<Blocktree>,
broadcast_type: &BroadcastStageType,
exit: &Arc<AtomicBool>,
) -> Self {
cluster_info.write().unwrap().set_leader(id);
@ -70,7 +71,7 @@ impl Tpu {
verified_vote_receiver,
);
let broadcast_stage = BroadcastStage::new(
let broadcast_stage = broadcast_type.new_broadcast_stage(
broadcast_socket,
cluster_info.clone(),
entry_receiver,

View File

@ -3,6 +3,7 @@
use crate::bank_forks::BankForks;
use crate::blocktree::{Blocktree, CompletedSlotsReceiver};
use crate::blocktree_processor::{self, BankForksInfo};
use crate::broadcast_stage::BroadcastStageType;
use crate::cluster_info::{ClusterInfo, Node};
use crate::contact_info::ContactInfo;
use crate::gossip_service::{discover_cluster, GossipService};
@ -39,7 +40,9 @@ pub struct ValidatorConfig {
pub account_paths: Option<String>,
pub rpc_config: JsonRpcConfig,
pub snapshot_path: Option<String>,
pub broadcast_stage_type: BroadcastStageType,
}
impl Default for ValidatorConfig {
fn default() -> Self {
// TODO: remove this, temporary parameter to configure
@ -54,6 +57,7 @@ impl Default for ValidatorConfig {
account_paths: None,
rpc_config: JsonRpcConfig::default(),
snapshot_path: None,
broadcast_stage_type: BroadcastStageType::Standard,
}
}
}
@ -262,6 +266,7 @@ impl Validator {
node.sockets.broadcast,
config.sigverify_disabled,
&blocktree,
&config.broadcast_stage_type,
&exit,
);

View File

@ -2,6 +2,7 @@ extern crate solana;
use hashbrown::HashSet;
use log::*;
use solana::broadcast_stage::BroadcastStageType;
use solana::cluster::Cluster;
use solana::cluster_tests;
use solana::gossip_service::discover_cluster;
@ -23,6 +24,7 @@ fn test_spend_and_verify_all_nodes_1() {
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
HashSet::new(),
);
}
@ -35,6 +37,7 @@ fn test_spend_and_verify_all_nodes_2() {
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
HashSet::new(),
);
}
@ -47,6 +50,7 @@ fn test_spend_and_verify_all_nodes_3() {
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
HashSet::new(),
);
}
@ -63,6 +67,7 @@ fn test_spend_and_verify_all_nodes_env_num_nodes() {
&local.entry_point_info,
&local.funding_keypair,
num_nodes,
HashSet::new(),
);
}
@ -83,8 +88,8 @@ fn test_fullnode_exit_2() {
validator_config.rpc_config.enable_fullnode_exit = true;
let config = ClusterConfig {
cluster_lamports: 10_000,
node_stakes: vec![100; 2],
validator_config,
node_stakes: vec![100; num_nodes],
validator_configs: vec![validator_config.clone(); num_nodes],
..ClusterConfig::default()
};
let local = LocalCluster::new(&config);
@ -101,7 +106,7 @@ fn test_leader_failure_4() {
let config = ClusterConfig {
cluster_lamports: 10_000,
node_stakes: vec![100; 4],
validator_config: validator_config.clone(),
validator_configs: vec![validator_config.clone(); num_nodes],
..ClusterConfig::default()
};
let local = LocalCluster::new(&config);
@ -124,7 +129,7 @@ fn test_two_unbalanced_stakes() {
let mut cluster = LocalCluster::new(&ClusterConfig {
node_stakes: vec![999_990, 3],
cluster_lamports: 1_000_000,
validator_config: validator_config.clone(),
validator_configs: vec![validator_config.clone(); 2],
ticks_per_slot: num_ticks_per_slot,
slots_per_epoch: num_slots_per_epoch,
poh_config: PohConfig::new_sleep(Duration::from_millis(1000 / num_ticks_per_second)),
@ -139,7 +144,10 @@ fn test_two_unbalanced_stakes() {
);
cluster.close_preserve_ledgers();
let leader_pubkey = cluster.entry_point_info.id;
let leader_ledger = cluster.fullnode_infos[&leader_pubkey].ledger_path.clone();
let leader_ledger = cluster.fullnode_infos[&leader_pubkey]
.info
.ledger_path
.clone();
cluster_tests::verify_ledger_ticks(&leader_ledger, num_ticks_per_slot as usize);
}
@ -151,6 +159,7 @@ fn test_forwarding() {
let config = ClusterConfig {
node_stakes: vec![999_990, 3],
cluster_lamports: 2_000_000,
validator_configs: vec![ValidatorConfig::default(); 3],
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&config);
@ -171,13 +180,12 @@ fn test_forwarding() {
#[test]
fn test_restart_node() {
let validator_config = ValidatorConfig::default();
let slots_per_epoch = MINIMUM_SLOTS_PER_EPOCH as u64;
let ticks_per_slot = 16;
let mut cluster = LocalCluster::new(&ClusterConfig {
node_stakes: vec![3],
cluster_lamports: 100,
validator_config: validator_config.clone(),
validator_configs: vec![ValidatorConfig::default()],
ticks_per_slot,
slots_per_epoch,
..ClusterConfig::default()
@ -205,6 +213,7 @@ fn test_listener_startup() {
node_stakes: vec![100; 1],
cluster_lamports: 1_000,
num_listeners: 3,
validator_configs: vec![ValidatorConfig::default(); 1],
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&config);
@ -212,6 +221,60 @@ fn test_listener_startup() {
assert_eq!(cluster_nodes.len(), 4);
}
#[test]
#[ignore]
fn test_fail_entry_verification_leader() {
solana_logger::setup();
let num_nodes = 4;
let validator_config = ValidatorConfig::default();
let mut error_validator_config = ValidatorConfig::default();
error_validator_config.broadcast_stage_type = BroadcastStageType::FailEntryVerification;
let mut validator_configs = vec![validator_config; num_nodes - 1];
validator_configs.push(error_validator_config);
let cluster_config = ClusterConfig {
cluster_lamports: 10_000,
node_stakes: vec![100; 4],
validator_configs: validator_configs,
slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2 as u64,
stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2 as u64,
..ClusterConfig::default()
};
let cluster = LocalCluster::new(&cluster_config);
let epoch_schedule = EpochSchedule::new(
cluster_config.slots_per_epoch,
cluster_config.stakers_slot_offset,
true,
);
let num_warmup_epochs = epoch_schedule.get_stakers_epoch(0) + 1;
// Wait for the corrupted leader to be scheduled afer the warmup epochs expire
cluster_tests::sleep_n_epochs(
(num_warmup_epochs + 1) as f64,
&cluster.genesis_block.poh_config,
cluster_config.ticks_per_slot,
cluster_config.slots_per_epoch,
);
let corrupt_node = cluster
.fullnode_infos
.iter()
.find(|(_, v)| v.config.broadcast_stage_type == BroadcastStageType::FailEntryVerification)
.unwrap()
.0;
let mut ignore = HashSet::new();
ignore.insert(*corrupt_node);
// Verify that we can still spend and verify even in the presence of corrupt nodes
cluster_tests::spend_and_verify_all_nodes(
&cluster.entry_point_info,
&cluster.funding_keypair,
num_nodes,
ignore,
);
}
#[test]
fn test_repairman_catchup() {
run_repairman_catchup(3);
@ -223,7 +286,7 @@ fn run_repairman_catchup(num_repairmen: u64) {
let num_ticks_per_slot = 40;
let num_slots_per_epoch = MINIMUM_SLOTS_PER_EPOCH as u64;
let num_root_buffer_slots = 10;
// Calculate the leader schedule num_root_buffer slots ahead. Otherwise, if stakers_slot_offset ==
// Calculate the leader schedule num_root_buffer_slots ahead. Otherwise, if stakers_slot_offset ==
// num_slots_per_epoch, and num_slots_per_epoch == MINIMUM_SLOTS_PER_EPOCH, then repairmen
// will stop sending repairs after the last slot in epoch 1 (0-indexed), because the root
// is at most in the first epoch.
@ -256,7 +319,7 @@ fn run_repairman_catchup(num_repairmen: u64) {
let mut cluster = LocalCluster::new(&ClusterConfig {
node_stakes,
cluster_lamports,
validator_config: validator_config.clone(),
validator_configs: vec![validator_config.clone(); num_repairmen as usize],
ticks_per_slot: num_ticks_per_slot,
slots_per_epoch: num_slots_per_epoch,
stakers_slot_offset,

View File

@ -27,7 +27,7 @@ fn run_replicator_startup_basic(num_nodes: usize, num_replicators: usize) {
let mut validator_config = ValidatorConfig::default();
validator_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
let config = ClusterConfig {
validator_config,
validator_configs: vec![ValidatorConfig::default(); num_nodes],
num_replicators,
node_stakes: vec![100; num_nodes],
cluster_lamports: 10_000,
@ -149,7 +149,7 @@ fn test_account_setup() {
let mut validator_config = ValidatorConfig::default();
validator_config.storage_rotate_count = STORAGE_ROTATE_TEST_COUNT;
let config = ClusterConfig {
validator_config,
validator_configs: vec![ValidatorConfig::default(); num_nodes],
num_replicators,
node_stakes: vec![100; num_nodes],
cluster_lamports: 10_000,