Add option to wait for a specific epoch length to bench-tps (#10083)

sakridge 2020-05-20 16:42:46 -07:00 committed by GitHub
parent 417f0e41fa
commit ce17de7d25
13 changed files with 256 additions and 128 deletions


@ -28,7 +28,7 @@ use std::{
atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
},
thread::{sleep, Builder},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
};
@ -64,105 +64,63 @@ fn get_recent_blockhash<T: Client>(client: &T) -> (Hash, FeeCalculator) {
}
}
pub fn do_bench_tps<T>(
client: Arc<T>,
config: Config,
gen_keypairs: Vec<Keypair>,
libra_args: Option<LibraKeys>,
) -> u64
where
T: 'static + Client + Send + Sync,
{
let Config {
id,
threads,
thread_batch_sleep_ms,
duration,
tx_count,
sustained,
..
} = config;
let mut source_keypair_chunks: Vec<Vec<&Keypair>> = Vec::new();
let mut dest_keypair_chunks: Vec<VecDeque<&Keypair>> = Vec::new();
assert!(gen_keypairs.len() >= 2 * tx_count);
for chunk in gen_keypairs.chunks_exact(2 * tx_count) {
source_keypair_chunks.push(chunk[..tx_count].iter().collect());
dest_keypair_chunks.push(chunk[tx_count..].iter().collect());
}
let first_tx_count = loop {
match client.get_transaction_count() {
Ok(count) => break count,
Err(err) => {
info!("Couldn't get transaction count: {:?}", err);
sleep(Duration::from_secs(1));
}
}
};
info!("Initial transaction count {}", first_tx_count);
let exit_signal = Arc::new(AtomicBool::new(false));
// Setup a thread per validator to sample every period
// collect the max transaction rate and total tx count seen
let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
info!("Sampling TPS every {} second...", sample_period);
let sample_thread = {
let exit_signal = exit_signal.clone();
let maxes = maxes.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_txs(&exit_signal, &maxes, sample_period, &client);
})
.unwrap()
};
let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));
let recent_blockhash = Arc::new(RwLock::new(get_recent_blockhash(client.as_ref()).0));
let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
let total_tx_sent_count = Arc::new(AtomicUsize::new(0));
let blockhash_thread = {
let exit_signal = exit_signal.clone();
let recent_blockhash = recent_blockhash.clone();
let client = client.clone();
let id = id.pubkey();
Builder::new()
.name("solana-blockhash-poller".to_string())
.spawn(move || {
poll_blockhash(&exit_signal, &recent_blockhash, &client, &id);
})
.unwrap()
};
let s_threads: Vec<_> = (0..threads)
.map(|_| {
let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
do_tx_transfers(
&exit_signal,
&shared_txs,
&shared_tx_active_thread_count,
&total_tx_sent_count,
thread_batch_sleep_ms,
&client,
);
})
.unwrap()
})
.collect();

fn wait_for_target_slots_per_epoch<T>(target_slots_per_epoch: u64, client: &Arc<T>)
where
T: 'static + Client + Send + Sync,
{
if target_slots_per_epoch != 0 {
info!(
"Waiting until epochs are {} slots long..",
target_slots_per_epoch
);
loop {
if let Ok(epoch_info) = client.get_epoch_info() {
if epoch_info.slots_in_epoch >= target_slots_per_epoch {
info!("Done epoch_info: {:?}", epoch_info);
break;
}
info!(
"Waiting for epoch: {} now: {}",
target_slots_per_epoch, epoch_info.slots_in_epoch
);
}
sleep(Duration::from_secs(3));
}
}
}

fn create_sampler_thread<T>(
client: &Arc<T>,
exit_signal: &Arc<AtomicBool>,
sample_period: u64,
maxes: &Arc<RwLock<Vec<(String, SampleStats)>>>,
) -> JoinHandle<()>
where
T: 'static + Client + Send + Sync,
{
info!("Sampling TPS every {} second...", sample_period);
let exit_signal = exit_signal.clone();
let maxes = maxes.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sample".to_string())
.spawn(move || {
sample_txs(&exit_signal, &maxes, sample_period, &client);
})
.unwrap()
}
fn generate_chunked_transfers(
recent_blockhash: Arc<RwLock<Hash>>,
shared_txs: &SharedTransactions,
shared_tx_active_thread_count: Arc<AtomicIsize>,
source_keypair_chunks: Vec<Vec<&Keypair>>,
dest_keypair_chunks: &mut Vec<VecDeque<&Keypair>>,
threads: usize,
duration: Duration,
sustained: bool,
libra_args: Option<LibraKeys>,
) {
// generate and send transactions for the specified duration
let start = Instant::now();
let keypair_chunks = source_keypair_chunks.len();
@ -170,7 +128,7 @@ where
let mut chunk_index = 0;
while start.elapsed() < duration {
generate_txs(
&shared_txs,
shared_txs,
&recent_blockhash,
&source_keypair_chunks[chunk_index],
&dest_keypair_chunks[chunk_index],
@ -206,6 +164,135 @@ where
reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account;
}
}
}
fn create_sender_threads<T>(
client: &Arc<T>,
shared_txs: &SharedTransactions,
thread_batch_sleep_ms: usize,
total_tx_sent_count: &Arc<AtomicUsize>,
threads: usize,
exit_signal: &Arc<AtomicBool>,
shared_tx_active_thread_count: &Arc<AtomicIsize>,
) -> Vec<JoinHandle<()>>
where
T: 'static + Client + Send + Sync,
{
(0..threads)
.map(|_| {
let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
let client = client.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
do_tx_transfers(
&exit_signal,
&shared_txs,
&shared_tx_active_thread_count,
&total_tx_sent_count,
thread_batch_sleep_ms,
&client,
);
})
.unwrap()
})
.collect()
}
pub fn do_bench_tps<T>(
client: Arc<T>,
config: Config,
gen_keypairs: Vec<Keypair>,
libra_args: Option<LibraKeys>,
) -> u64
where
T: 'static + Client + Send + Sync,
{
let Config {
id,
threads,
thread_batch_sleep_ms,
duration,
tx_count,
sustained,
target_slots_per_epoch,
..
} = config;
let mut source_keypair_chunks: Vec<Vec<&Keypair>> = Vec::new();
let mut dest_keypair_chunks: Vec<VecDeque<&Keypair>> = Vec::new();
assert!(gen_keypairs.len() >= 2 * tx_count);
for chunk in gen_keypairs.chunks_exact(2 * tx_count) {
source_keypair_chunks.push(chunk[..tx_count].iter().collect());
dest_keypair_chunks.push(chunk[tx_count..].iter().collect());
}
let first_tx_count = loop {
match client.get_transaction_count() {
Ok(count) => break count,
Err(err) => {
info!("Couldn't get transaction count: {:?}", err);
sleep(Duration::from_secs(1));
}
}
};
info!("Initial transaction count {}", first_tx_count);
let exit_signal = Arc::new(AtomicBool::new(false));
// Setup a thread per validator to sample every period
// collect the max transaction rate and total tx count seen
let maxes = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds
let sample_thread = create_sampler_thread(&client, &exit_signal, sample_period, &maxes);
let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new()));
let recent_blockhash = Arc::new(RwLock::new(get_recent_blockhash(client.as_ref()).0));
let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0));
let total_tx_sent_count = Arc::new(AtomicUsize::new(0));
let blockhash_thread = {
let exit_signal = exit_signal.clone();
let recent_blockhash = recent_blockhash.clone();
let client = client.clone();
let id = id.pubkey();
Builder::new()
.name("solana-blockhash-poller".to_string())
.spawn(move || {
poll_blockhash(&exit_signal, &recent_blockhash, &client, &id);
})
.unwrap()
};
let s_threads = create_sender_threads(
&client,
&shared_txs,
thread_batch_sleep_ms,
&total_tx_sent_count,
threads,
&exit_signal,
&shared_tx_active_thread_count,
);
wait_for_target_slots_per_epoch(target_slots_per_epoch, &client);
let start = Instant::now();
generate_chunked_transfers(
recent_blockhash,
&shared_txs,
shared_tx_active_thread_count,
source_keypair_chunks,
&mut dest_keypair_chunks,
threads,
duration,
sustained,
libra_args,
);
// Stop the sampling threads so it will collect the stats
exit_signal.store(true, Ordering::Relaxed);
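
The same wait can be driven directly against an RpcClient, since get_epoch_info() now returns solana_sdk::epoch_info::EpochInfo. The sketch below is illustrative only; the helper name, endpoint URL, and 3-second poll interval are assumptions, not part of this diff.

    use solana_client::rpc_client::RpcClient;
    use std::{thread::sleep, time::Duration};

    fn wait_for_slots_per_epoch(target_slots_per_epoch: u64, client: &RpcClient) {
        // 0 means "don't wait", matching the bench-tps default.
        if target_slots_per_epoch == 0 {
            return;
        }
        loop {
            if let Ok(epoch_info) = client.get_epoch_info() {
                if epoch_info.slots_in_epoch >= target_slots_per_epoch {
                    break;
                }
            }
            sleep(Duration::from_secs(3));
        }
    }

    fn main() {
        // Assumed local RPC endpoint; adjust as needed.
        let client = RpcClient::new("http://127.0.0.1:8899".to_string());
        wait_for_slots_per_epoch(432_000, &client);
    }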


@ -25,6 +25,7 @@ pub struct Config {
pub multi_client: bool,
pub use_move: bool,
pub num_lamports_per_account: u64,
pub target_slots_per_epoch: u64,
}
impl Default for Config {
@ -47,6 +48,7 @@ impl Default for Config {
multi_client: true,
use_move: false,
num_lamports_per_account: NUM_LAMPORTS_PER_ACCOUNT_DEFAULT,
target_slots_per_epoch: 0,
}
}
}
@ -172,6 +174,15 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> {
"Number of lamports per account.",
),
)
.arg(
Arg::with_name("target_slots_per_epoch")
.long("target-slots-per-epoch")
.value_name("SLOTS")
.takes_value(true)
.help(
"Wait until epochs are this many slots long.",
),
)
}
/// Parses a clap `ArgMatches` structure into a `Config`
@ -259,5 +270,12 @@ pub fn extract_args<'a>(matches: &ArgMatches<'a>) -> Config {
args.num_lamports_per_account = v.to_string().parse().expect("can't parse lamports");
}
if let Some(t) = matches.value_of("target_slots_per_epoch") {
args.target_slots_per_epoch = t
.to_string()
.parse()
.expect("can't parse target slots per epoch");
}
args
}
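
Because Config::default() sets target_slots_per_epoch to 0 and the bench code treats 0 as "do not wait", the flag is strictly opt-in. A hypothetical in-crate test sketch (not part of this diff):

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn target_slots_per_epoch_defaults_to_zero() {
            // 0 is the "disabled" sentinel checked by wait_for_target_slots_per_epoch.
            let config = Config::default();
            assert_eq!(config.target_slots_per_epoch, 0);
        }
    }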


@ -5,10 +5,11 @@ use inflector::cases::titlecase::to_title_case;
use serde::Serialize;
use serde_json::{Map, Value};
use solana_client::rpc_response::{
RpcAccountBalance, RpcEpochInfo, RpcKeyedAccount, RpcSupply, RpcVoteAccountInfo,
RpcAccountBalance, RpcKeyedAccount, RpcSupply, RpcVoteAccountInfo,
};
use solana_sdk::{
clock::{self, Epoch, Slot, UnixTimestamp},
epoch_info::EpochInfo,
native_token::lamports_to_sol,
stake_history::StakeHistoryEntry,
};
@ -186,11 +187,11 @@ pub struct CliSlotStatus {
#[serde(rename_all = "camelCase")]
pub struct CliEpochInfo {
#[serde(flatten)]
pub epoch_info: RpcEpochInfo,
pub epoch_info: EpochInfo,
}
impl From<RpcEpochInfo> for CliEpochInfo {
fn from(epoch_info: RpcEpochInfo) -> Self {
impl From<EpochInfo> for CliEpochInfo {
fn from(epoch_info: EpochInfo) -> Self {
Self { epoch_info }
}
}


@ -18,6 +18,7 @@ use solana_sdk::{
MAX_HASH_AGE_IN_SECONDS,
},
commitment_config::CommitmentConfig,
epoch_info::EpochInfo,
epoch_schedule::EpochSchedule,
fee_calculator::{FeeCalculator, FeeRateGovernor},
hash::Hash,
@ -309,14 +310,14 @@ impl RpcClient {
.map_err(|err| err.into_with_request(request))?
}
pub fn get_epoch_info(&self) -> ClientResult<RpcEpochInfo> {
pub fn get_epoch_info(&self) -> ClientResult<EpochInfo> {
self.get_epoch_info_with_commitment(CommitmentConfig::default())
}
pub fn get_epoch_info_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> ClientResult<RpcEpochInfo> {
) -> ClientResult<EpochInfo> {
self.send(RpcRequest::GetEpochInfo, json!([commitment_config]), 0)
}


@ -114,22 +114,6 @@ pub struct RpcContactInfo {
/// Map of leader base58 identity pubkeys to the slot indices relative to the first epoch slot
pub type RpcLeaderSchedule = HashMap<String, Vec<usize>>;
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RpcEpochInfo {
/// The current epoch
pub epoch: Epoch,
/// The current slot, relative to the start of the current epoch
pub slot_index: u64,
/// The number of slots in this epoch
pub slots_in_epoch: u64,
/// The absolute current slot
pub absolute_slot: Slot,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct RpcVersionInfo {


@ -11,6 +11,7 @@ use solana_sdk::{
client::{AsyncClient, Client, SyncClient},
clock::MAX_PROCESSING_AGE,
commitment_config::CommitmentConfig,
epoch_info::EpochInfo,
fee_calculator::{FeeCalculator, FeeRateGovernor},
hash::Hash,
instruction::Instruction,
@ -518,6 +519,10 @@ impl SyncClient for ThinClient {
Ok(slot)
}
fn get_epoch_info(&self) -> TransportResult<EpochInfo> {
self.rpc_client().get_epoch_info().map_err(|e| e.into())
}
fn get_transaction_count(&self) -> TransportResult<u64> {
let index = self.optimizer.experiment();
let now = Instant::now();


@ -28,6 +28,7 @@ use solana_runtime::{accounts::AccountAddressFilter, bank::Bank};
use solana_sdk::{
clock::{Slot, UnixTimestamp},
commitment_config::{CommitmentConfig, CommitmentLevel},
epoch_info::EpochInfo,
epoch_schedule::EpochSchedule,
hash::Hash,
inflation::Inflation,
@ -758,7 +759,7 @@ pub trait RpcSol {
&self,
meta: Self::Metadata,
commitment: Option<CommitmentConfig>,
) -> Result<RpcEpochInfo>;
) -> Result<EpochInfo>;
#[rpc(meta, name = "getBlockCommitment")]
fn get_block_commitment(
@ -1057,18 +1058,9 @@ impl RpcSol for RpcSolImpl {
&self,
meta: Self::Metadata,
commitment: Option<CommitmentConfig>,
) -> Result<RpcEpochInfo> {
) -> Result<EpochInfo> {
let bank = meta.request_processor.read().unwrap().bank(commitment)?;
let epoch_schedule = bank.epoch_schedule();
let slot = bank.slot();
let (epoch, slot_index) = epoch_schedule.get_epoch_and_slot_index(slot);
Ok(RpcEpochInfo {
epoch,
slot_index,
slots_in_epoch: epoch_schedule.get_slots_in_epoch(epoch),
absolute_slot: slot,
})
Ok(bank.get_epoch_info())
}
fn get_block_commitment(


@ -1,8 +1,9 @@
use crate::utils;
use log::*;
use solana_client::{rpc_client::RpcClient, rpc_response::RpcEpochInfo};
use solana_client::rpc_client::RpcClient;
use solana_sdk::{
clock::Epoch,
epoch_info::EpochInfo,
genesis_config::GenesisConfig,
stake_history::StakeHistoryEntry,
sysvar::{
@ -60,7 +61,7 @@ fn stake_history_entry(epoch: Epoch, rpc_client: &RpcClient) -> Option<StakeHist
/// Wait until stake warms up and return the current epoch
pub fn wait_for_warm_up(
activation_epoch: Epoch,
mut epoch_info: RpcEpochInfo,
mut epoch_info: EpochInfo,
rpc_client: &RpcClient,
stake_config: &StakeConfig,
genesis_config: &GenesisConfig,


@ -36,6 +36,7 @@ use solana_sdk::{
Epoch, Slot, SlotCount, SlotIndex, UnixTimestamp, DEFAULT_TICKS_PER_SECOND,
MAX_PROCESSING_AGE, MAX_RECENT_BLOCKHASHES, SECONDS_PER_DAY,
},
epoch_info::EpochInfo,
epoch_schedule::EpochSchedule,
fee_calculator::{FeeCalculator, FeeRateGovernor},
genesis_config::{GenesisConfig, OperatingMode},
@ -2468,6 +2469,18 @@ impl Bank {
self.epoch_schedule.get_epoch_and_slot_index(slot)
}
pub fn get_epoch_info(&self) -> EpochInfo {
let absolute_slot = self.slot();
let (epoch, slot_index) = self.get_epoch_and_slot_index(absolute_slot);
let slots_in_epoch = self.get_slots_in_epoch(epoch);
EpochInfo {
epoch,
slot_index,
slots_in_epoch,
absolute_slot,
}
}
pub fn is_empty(&self) -> bool {
!self.is_delta.load(Ordering::Relaxed)
}


@ -3,6 +3,7 @@ use solana_sdk::{
account::Account,
client::{AsyncClient, Client, SyncClient},
commitment_config::CommitmentConfig,
epoch_info::EpochInfo,
fee_calculator::{FeeCalculator, FeeRateGovernor},
hash::Hash,
instruction::Instruction,
@ -241,6 +242,10 @@ impl SyncClient for BankClient {
)))
}
}
fn get_epoch_info(&self) -> Result<EpochInfo> {
Ok(self.bank.get_epoch_info())
}
}
impl BankClient {
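
With get_epoch_info() added to the SyncClient trait, BankClient can serve epoch info in tests without an RPC endpoint. A rough sketch; the genesis helper and lamport amount are assumptions for illustration:

    use solana_runtime::{bank::Bank, bank_client::BankClient};
    use solana_sdk::{client::SyncClient, genesis_config::create_genesis_config};

    fn main() {
        let (genesis_config, _mint_keypair) = create_genesis_config(10_000);
        let bank = Bank::new(&genesis_config);
        let bank_client = BankClient::new(bank);

        // The new trait method delegates to Bank::get_epoch_info().
        let epoch_info = bank_client.get_epoch_info().unwrap();
        assert_eq!(epoch_info.absolute_slot, 0); // a fresh bank starts at slot 0
        println!("slots in this epoch: {}", epoch_info.slots_in_epoch);
    }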


@ -11,6 +11,7 @@ use crate::{
account::Account,
clock::Slot,
commitment_config::CommitmentConfig,
epoch_info::EpochInfo,
fee_calculator::{FeeCalculator, FeeRateGovernor},
hash::Hash,
instruction::Instruction,
@ -106,6 +107,8 @@ pub trait SyncClient {
commitment_config: CommitmentConfig,
) -> Result<u64>;
fn get_epoch_info(&self) -> Result<EpochInfo>;
/// Poll until the signature has been confirmed by at least `min_confirmed_blocks`
fn poll_for_signature_confirmation(
&self,

sdk/src/epoch_info.rs (new file, 17 lines)

@ -0,0 +1,17 @@
use crate::clock::{Epoch, Slot};
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct EpochInfo {
/// The current epoch
pub epoch: Epoch,
/// The current slot, relative to the start of the current epoch
pub slot_index: u64,
/// The number of slots in this epoch
pub slots_in_epoch: u64,
/// The absolute current slot
pub absolute_slot: Slot,
}
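
EpochInfo keeps the camelCase serialization that RpcEpochInfo had, so the getEpochInfo JSON-RPC response shape is unchanged by the move. Illustrative example only; the field values are made up and serde_json is assumed to be available:

    use solana_sdk::epoch_info::EpochInfo;

    fn main() {
        let info = EpochInfo {
            epoch: 12,
            slot_index: 40,
            slots_in_epoch: 64,
            absolute_slot: 808,
        };
        // Prints: {"epoch":12,"slotIndex":40,"slotsInEpoch":64,"absoluteSlot":808}
        println!("{}", serde_json::to_string(&info).unwrap());
    }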


@ -7,6 +7,7 @@ pub mod bpf_loader;
pub mod clock;
pub mod commitment_config;
pub mod entrypoint_native;
pub mod epoch_info;
pub mod epoch_schedule;
pub mod fee_calculator;
pub mod hash;