diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index bca151280..9d033059a 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -28,7 +28,7 @@ use std::{ atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}, Arc, Mutex, RwLock, }, - thread::{sleep, Builder}, + thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, }; @@ -64,105 +64,63 @@ fn get_recent_blockhash(client: &T) -> (Hash, FeeCalculator) { } } -pub fn do_bench_tps( - client: Arc, - config: Config, - gen_keypairs: Vec, - libra_args: Option, -) -> u64 +fn wait_for_target_slots_per_epoch(target_slots_per_epoch: u64, client: &Arc) where T: 'static + Client + Send + Sync, { - let Config { - id, - threads, - thread_batch_sleep_ms, - duration, - tx_count, - sustained, - .. - } = config; - - let mut source_keypair_chunks: Vec> = Vec::new(); - let mut dest_keypair_chunks: Vec> = Vec::new(); - assert!(gen_keypairs.len() >= 2 * tx_count); - for chunk in gen_keypairs.chunks_exact(2 * tx_count) { - source_keypair_chunks.push(chunk[..tx_count].iter().collect()); - dest_keypair_chunks.push(chunk[tx_count..].iter().collect()); - } - - let first_tx_count = loop { - match client.get_transaction_count() { - Ok(count) => break count, - Err(err) => { - info!("Couldn't get transaction count: {:?}", err); - sleep(Duration::from_secs(1)); + if target_slots_per_epoch != 0 { + info!( + "Waiting until epochs are {} slots long..", + target_slots_per_epoch + ); + loop { + if let Ok(epoch_info) = client.get_epoch_info() { + if epoch_info.slots_in_epoch >= target_slots_per_epoch { + info!("Done epoch_info: {:?}", epoch_info); + break; + } + info!( + "Waiting for epoch: {} now: {}", + target_slots_per_epoch, epoch_info.slots_in_epoch + ); } + sleep(Duration::from_secs(3)); } - }; - info!("Initial transaction count {}", first_tx_count); + } +} - let exit_signal = Arc::new(AtomicBool::new(false)); - - // Setup a thread per validator to sample every period - // collect the max transaction rate and total tx count seen - let maxes = Arc::new(RwLock::new(Vec::new())); - let sample_period = 1; // in seconds +fn create_sampler_thread( + client: &Arc, + exit_signal: &Arc, + sample_period: u64, + maxes: &Arc>>, +) -> JoinHandle<()> +where + T: 'static + Client + Send + Sync, +{ info!("Sampling TPS every {} second...", sample_period); - let sample_thread = { - let exit_signal = exit_signal.clone(); - let maxes = maxes.clone(); - let client = client.clone(); - Builder::new() - .name("solana-client-sample".to_string()) - .spawn(move || { - sample_txs(&exit_signal, &maxes, sample_period, &client); - }) - .unwrap() - }; - - let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new())); - - let recent_blockhash = Arc::new(RwLock::new(get_recent_blockhash(client.as_ref()).0)); - let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0)); - let total_tx_sent_count = Arc::new(AtomicUsize::new(0)); - - let blockhash_thread = { - let exit_signal = exit_signal.clone(); - let recent_blockhash = recent_blockhash.clone(); - let client = client.clone(); - let id = id.pubkey(); - Builder::new() - .name("solana-blockhash-poller".to_string()) - .spawn(move || { - poll_blockhash(&exit_signal, &recent_blockhash, &client, &id); - }) - .unwrap() - }; - - let s_threads: Vec<_> = (0..threads) - .map(|_| { - let exit_signal = exit_signal.clone(); - let shared_txs = shared_txs.clone(); - let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); - let total_tx_sent_count = total_tx_sent_count.clone(); - let client = client.clone(); - Builder::new() - .name("solana-client-sender".to_string()) - .spawn(move || { - do_tx_transfers( - &exit_signal, - &shared_txs, - &shared_tx_active_thread_count, - &total_tx_sent_count, - thread_batch_sleep_ms, - &client, - ); - }) - .unwrap() + let exit_signal = exit_signal.clone(); + let maxes = maxes.clone(); + let client = client.clone(); + Builder::new() + .name("solana-client-sample".to_string()) + .spawn(move || { + sample_txs(&exit_signal, &maxes, sample_period, &client); }) - .collect(); + .unwrap() +} +fn generate_chunked_transfers( + recent_blockhash: Arc>, + shared_txs: &SharedTransactions, + shared_tx_active_thread_count: Arc, + source_keypair_chunks: Vec>, + dest_keypair_chunks: &mut Vec>, + threads: usize, + duration: Duration, + sustained: bool, + libra_args: Option, +) { // generate and send transactions for the specified duration let start = Instant::now(); let keypair_chunks = source_keypair_chunks.len(); @@ -170,7 +128,7 @@ where let mut chunk_index = 0; while start.elapsed() < duration { generate_txs( - &shared_txs, + shared_txs, &recent_blockhash, &source_keypair_chunks[chunk_index], &dest_keypair_chunks[chunk_index], @@ -206,6 +164,135 @@ where reclaim_lamports_back_to_source_account = !reclaim_lamports_back_to_source_account; } } +} + +fn create_sender_threads( + client: &Arc, + shared_txs: &SharedTransactions, + thread_batch_sleep_ms: usize, + total_tx_sent_count: &Arc, + threads: usize, + exit_signal: &Arc, + shared_tx_active_thread_count: &Arc, +) -> Vec> +where + T: 'static + Client + Send + Sync, +{ + (0..threads) + .map(|_| { + let exit_signal = exit_signal.clone(); + let shared_txs = shared_txs.clone(); + let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); + let total_tx_sent_count = total_tx_sent_count.clone(); + let client = client.clone(); + Builder::new() + .name("solana-client-sender".to_string()) + .spawn(move || { + do_tx_transfers( + &exit_signal, + &shared_txs, + &shared_tx_active_thread_count, + &total_tx_sent_count, + thread_batch_sleep_ms, + &client, + ); + }) + .unwrap() + }) + .collect() +} + +pub fn do_bench_tps( + client: Arc, + config: Config, + gen_keypairs: Vec, + libra_args: Option, +) -> u64 +where + T: 'static + Client + Send + Sync, +{ + let Config { + id, + threads, + thread_batch_sleep_ms, + duration, + tx_count, + sustained, + target_slots_per_epoch, + .. + } = config; + + let mut source_keypair_chunks: Vec> = Vec::new(); + let mut dest_keypair_chunks: Vec> = Vec::new(); + assert!(gen_keypairs.len() >= 2 * tx_count); + for chunk in gen_keypairs.chunks_exact(2 * tx_count) { + source_keypair_chunks.push(chunk[..tx_count].iter().collect()); + dest_keypair_chunks.push(chunk[tx_count..].iter().collect()); + } + + let first_tx_count = loop { + match client.get_transaction_count() { + Ok(count) => break count, + Err(err) => { + info!("Couldn't get transaction count: {:?}", err); + sleep(Duration::from_secs(1)); + } + } + }; + info!("Initial transaction count {}", first_tx_count); + + let exit_signal = Arc::new(AtomicBool::new(false)); + + // Setup a thread per validator to sample every period + // collect the max transaction rate and total tx count seen + let maxes = Arc::new(RwLock::new(Vec::new())); + let sample_period = 1; // in seconds + let sample_thread = create_sampler_thread(&client, &exit_signal, sample_period, &maxes); + + let shared_txs: SharedTransactions = Arc::new(RwLock::new(VecDeque::new())); + + let recent_blockhash = Arc::new(RwLock::new(get_recent_blockhash(client.as_ref()).0)); + let shared_tx_active_thread_count = Arc::new(AtomicIsize::new(0)); + let total_tx_sent_count = Arc::new(AtomicUsize::new(0)); + + let blockhash_thread = { + let exit_signal = exit_signal.clone(); + let recent_blockhash = recent_blockhash.clone(); + let client = client.clone(); + let id = id.pubkey(); + Builder::new() + .name("solana-blockhash-poller".to_string()) + .spawn(move || { + poll_blockhash(&exit_signal, &recent_blockhash, &client, &id); + }) + .unwrap() + }; + + let s_threads = create_sender_threads( + &client, + &shared_txs, + thread_batch_sleep_ms, + &total_tx_sent_count, + threads, + &exit_signal, + &shared_tx_active_thread_count, + ); + + wait_for_target_slots_per_epoch(target_slots_per_epoch, &client); + + let start = Instant::now(); + + generate_chunked_transfers( + recent_blockhash, + &shared_txs, + shared_tx_active_thread_count, + source_keypair_chunks, + &mut dest_keypair_chunks, + threads, + duration, + sustained, + libra_args, + ); // Stop the sampling threads so it will collect the stats exit_signal.store(true, Ordering::Relaxed); diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index 3efa6eb76..719d260cb 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -25,6 +25,7 @@ pub struct Config { pub multi_client: bool, pub use_move: bool, pub num_lamports_per_account: u64, + pub target_slots_per_epoch: u64, } impl Default for Config { @@ -47,6 +48,7 @@ impl Default for Config { multi_client: true, use_move: false, num_lamports_per_account: NUM_LAMPORTS_PER_ACCOUNT_DEFAULT, + target_slots_per_epoch: 0, } } } @@ -172,6 +174,15 @@ pub fn build_args<'a, 'b>(version: &'b str) -> App<'a, 'b> { "Number of lamports per account.", ), ) + .arg( + Arg::with_name("target_slots_per_epoch") + .long("target-slots-per-epoch") + .value_name("SLOTS") + .takes_value(true) + .help( + "Wait until epochs are this many slots long.", + ), + ) } /// Parses a clap `ArgMatches` structure into a `Config` @@ -259,5 +270,12 @@ pub fn extract_args<'a>(matches: &ArgMatches<'a>) -> Config { args.num_lamports_per_account = v.to_string().parse().expect("can't parse lamports"); } + if let Some(t) = matches.value_of("target_slots_per_epoch") { + args.target_slots_per_epoch = t + .to_string() + .parse() + .expect("can't parse target slots per epoch"); + } + args } diff --git a/cli/src/cli_output.rs b/cli/src/cli_output.rs index 685b278c0..10653372b 100644 --- a/cli/src/cli_output.rs +++ b/cli/src/cli_output.rs @@ -5,10 +5,11 @@ use inflector::cases::titlecase::to_title_case; use serde::Serialize; use serde_json::{Map, Value}; use solana_client::rpc_response::{ - RpcAccountBalance, RpcEpochInfo, RpcKeyedAccount, RpcSupply, RpcVoteAccountInfo, + RpcAccountBalance, RpcKeyedAccount, RpcSupply, RpcVoteAccountInfo, }; use solana_sdk::{ clock::{self, Epoch, Slot, UnixTimestamp}, + epoch_info::EpochInfo, native_token::lamports_to_sol, stake_history::StakeHistoryEntry, }; @@ -186,11 +187,11 @@ pub struct CliSlotStatus { #[serde(rename_all = "camelCase")] pub struct CliEpochInfo { #[serde(flatten)] - pub epoch_info: RpcEpochInfo, + pub epoch_info: EpochInfo, } -impl From for CliEpochInfo { - fn from(epoch_info: RpcEpochInfo) -> Self { +impl From for CliEpochInfo { + fn from(epoch_info: EpochInfo) -> Self { Self { epoch_info } } } diff --git a/client/src/rpc_client.rs b/client/src/rpc_client.rs index 1aa2b97ea..8ecf4fb45 100644 --- a/client/src/rpc_client.rs +++ b/client/src/rpc_client.rs @@ -18,6 +18,7 @@ use solana_sdk::{ MAX_HASH_AGE_IN_SECONDS, }, commitment_config::CommitmentConfig, + epoch_info::EpochInfo, epoch_schedule::EpochSchedule, fee_calculator::{FeeCalculator, FeeRateGovernor}, hash::Hash, @@ -309,14 +310,14 @@ impl RpcClient { .map_err(|err| err.into_with_request(request))? } - pub fn get_epoch_info(&self) -> ClientResult { + pub fn get_epoch_info(&self) -> ClientResult { self.get_epoch_info_with_commitment(CommitmentConfig::default()) } pub fn get_epoch_info_with_commitment( &self, commitment_config: CommitmentConfig, - ) -> ClientResult { + ) -> ClientResult { self.send(RpcRequest::GetEpochInfo, json!([commitment_config]), 0) } diff --git a/client/src/rpc_response.rs b/client/src/rpc_response.rs index e15a74943..1fbed4e7a 100644 --- a/client/src/rpc_response.rs +++ b/client/src/rpc_response.rs @@ -114,22 +114,6 @@ pub struct RpcContactInfo { /// Map of leader base58 identity pubkeys to the slot indices relative to the first epoch slot pub type RpcLeaderSchedule = HashMap>; -#[derive(Serialize, Deserialize, Clone, Debug)] -#[serde(rename_all = "camelCase")] -pub struct RpcEpochInfo { - /// The current epoch - pub epoch: Epoch, - - /// The current slot, relative to the start of the current epoch - pub slot_index: u64, - - /// The number of slots in this epoch - pub slots_in_epoch: u64, - - /// The absolute current slot - pub absolute_slot: Slot, -} - #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(rename_all = "kebab-case")] pub struct RpcVersionInfo { diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 2384a66a3..c5d35f193 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -11,6 +11,7 @@ use solana_sdk::{ client::{AsyncClient, Client, SyncClient}, clock::MAX_PROCESSING_AGE, commitment_config::CommitmentConfig, + epoch_info::EpochInfo, fee_calculator::{FeeCalculator, FeeRateGovernor}, hash::Hash, instruction::Instruction, @@ -518,6 +519,10 @@ impl SyncClient for ThinClient { Ok(slot) } + fn get_epoch_info(&self) -> TransportResult { + self.rpc_client().get_epoch_info().map_err(|e| e.into()) + } + fn get_transaction_count(&self) -> TransportResult { let index = self.optimizer.experiment(); let now = Instant::now(); diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 784da5ce6..96fefb865 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -28,6 +28,7 @@ use solana_runtime::{accounts::AccountAddressFilter, bank::Bank}; use solana_sdk::{ clock::{Slot, UnixTimestamp}, commitment_config::{CommitmentConfig, CommitmentLevel}, + epoch_info::EpochInfo, epoch_schedule::EpochSchedule, hash::Hash, inflation::Inflation, @@ -758,7 +759,7 @@ pub trait RpcSol { &self, meta: Self::Metadata, commitment: Option, - ) -> Result; + ) -> Result; #[rpc(meta, name = "getBlockCommitment")] fn get_block_commitment( @@ -1057,18 +1058,9 @@ impl RpcSol for RpcSolImpl { &self, meta: Self::Metadata, commitment: Option, - ) -> Result { + ) -> Result { let bank = meta.request_processor.read().unwrap().bank(commitment)?; - let epoch_schedule = bank.epoch_schedule(); - - let slot = bank.slot(); - let (epoch, slot_index) = epoch_schedule.get_epoch_and_slot_index(slot); - Ok(RpcEpochInfo { - epoch, - slot_index, - slots_in_epoch: epoch_schedule.get_slots_in_epoch(epoch), - absolute_slot: slot, - }) + Ok(bank.get_epoch_info()) } fn get_block_commitment( diff --git a/ramp-tps/src/stake.rs b/ramp-tps/src/stake.rs index 9b3ce1bc4..8218524ed 100644 --- a/ramp-tps/src/stake.rs +++ b/ramp-tps/src/stake.rs @@ -1,8 +1,9 @@ use crate::utils; use log::*; -use solana_client::{rpc_client::RpcClient, rpc_response::RpcEpochInfo}; +use solana_client::rpc_client::RpcClient; use solana_sdk::{ clock::Epoch, + epoch_info::EpochInfo, genesis_config::GenesisConfig, stake_history::StakeHistoryEntry, sysvar::{ @@ -60,7 +61,7 @@ fn stake_history_entry(epoch: Epoch, rpc_client: &RpcClient) -> Option EpochInfo { + let absolute_slot = self.slot(); + let (epoch, slot_index) = self.get_epoch_and_slot_index(absolute_slot); + let slots_in_epoch = self.get_slots_in_epoch(epoch); + EpochInfo { + epoch, + slot_index, + slots_in_epoch, + absolute_slot, + } + } + pub fn is_empty(&self) -> bool { !self.is_delta.load(Ordering::Relaxed) } diff --git a/runtime/src/bank_client.rs b/runtime/src/bank_client.rs index fa9731c2e..3b2eb6b7f 100644 --- a/runtime/src/bank_client.rs +++ b/runtime/src/bank_client.rs @@ -3,6 +3,7 @@ use solana_sdk::{ account::Account, client::{AsyncClient, Client, SyncClient}, commitment_config::CommitmentConfig, + epoch_info::EpochInfo, fee_calculator::{FeeCalculator, FeeRateGovernor}, hash::Hash, instruction::Instruction, @@ -241,6 +242,10 @@ impl SyncClient for BankClient { ))) } } + + fn get_epoch_info(&self) -> Result { + Ok(self.bank.get_epoch_info()) + } } impl BankClient { diff --git a/sdk/src/client.rs b/sdk/src/client.rs index e80d39c84..42675e7f5 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -11,6 +11,7 @@ use crate::{ account::Account, clock::Slot, commitment_config::CommitmentConfig, + epoch_info::EpochInfo, fee_calculator::{FeeCalculator, FeeRateGovernor}, hash::Hash, instruction::Instruction, @@ -106,6 +107,8 @@ pub trait SyncClient { commitment_config: CommitmentConfig, ) -> Result; + fn get_epoch_info(&self) -> Result; + /// Poll until the signature has been confirmed by at least `min_confirmed_blocks` fn poll_for_signature_confirmation( &self, diff --git a/sdk/src/epoch_info.rs b/sdk/src/epoch_info.rs new file mode 100644 index 000000000..65de26578 --- /dev/null +++ b/sdk/src/epoch_info.rs @@ -0,0 +1,17 @@ +use crate::clock::{Epoch, Slot}; + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct EpochInfo { + /// The current epoch + pub epoch: Epoch, + + /// The current slot, relative to the start of the current epoch + pub slot_index: u64, + + /// The number of slots in this epoch + pub slots_in_epoch: u64, + + /// The absolute current slot + pub absolute_slot: Slot, +} diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index dedc4ca9d..7a8d4e46b 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -7,6 +7,7 @@ pub mod bpf_loader; pub mod clock; pub mod commitment_config; pub mod entrypoint_native; +pub mod epoch_info; pub mod epoch_schedule; pub mod fee_calculator; pub mod hash;