Add analysis for bench-tps transactions (#92)

* save progress

* rename threads handler

* added writer for txs

* after extracting structure to handle tx confirmations

* extract LogWriter

* Replace pair TimestampedTransaction with struct

* add compute_unit_price to TimestampedTransaction

* add cu_price to LogWriter

* add block time to the logs

* Fix warnings

* add comments and restructure code

* some small improvements

* Renamed conformation_processing.rs to log_transaction_service.rs

* address numerous PR comments

* split LogWriter into two structs

* simplify code of LogWriters

* extract process_blocks

* specify commitment in LogTransactionService

* break thread loop if receiver happens to be dropped

* update start_slot when processing blocks

* address pr comments

* fix clippy error

* minor changes

* fix ms problem

* fix bug with time in clear transaction map
This commit is contained in:
kirill lykov 2024-03-26 03:47:24 -07:00 committed by GitHub
parent 30eecd62b1
commit 1261f1f900
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 679 additions and 41 deletions

3
Cargo.lock generated
View File

@ -5560,11 +5560,14 @@ dependencies = [
name = "solana-bench-tps"
version = "2.0.0"
dependencies = [
"chrono",
"clap 2.33.3",
"crossbeam-channel",
"csv",
"log",
"rand 0.8.5",
"rayon",
"serde",
"serde_json",
"serde_yaml 0.9.32",
"serial_test",

View File

@ -9,11 +9,14 @@ license = { workspace = true }
edition = { workspace = true }
[dependencies]
chrono = { workspace = true }
clap = { workspace = true }
crossbeam-channel = { workspace = true }
csv = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
solana-clap-utils = { workspace = true }

View File

@ -2,9 +2,13 @@ use {
crate::{
bench_tps_client::*,
cli::{ComputeUnitPrice, Config, InstructionPaddingConfig},
log_transaction_service::{
create_log_transactions_service_and_sender, SignatureBatchSender, TransactionInfoBatch,
},
perf_utils::{sample_txs, SampleStats},
send_batch::*,
},
chrono::Utc,
log::*,
rand::distributions::{Distribution, Uniform},
rayon::prelude::*,
@ -87,8 +91,14 @@ fn get_transaction_loaded_accounts_data_size(enable_padding: bool) -> u32 {
}
}
pub type TimestampedTransaction = (Transaction, Option<u64>);
pub type SharedTransactions = Arc<RwLock<VecDeque<Vec<TimestampedTransaction>>>>;
#[derive(Debug, PartialEq, Default, Eq, Clone)]
pub(crate) struct TimestampedTransaction {
transaction: Transaction,
timestamp: Option<u64>,
compute_unit_price: Option<u64>,
}
pub(crate) type SharedTransactions = Arc<RwLock<VecDeque<Vec<TimestampedTransaction>>>>;
/// Keypairs split into source and destination
/// used for transfer transactions
@ -356,6 +366,7 @@ fn create_sender_threads<T>(
threads: usize,
exit_signal: Arc<AtomicBool>,
shared_tx_active_thread_count: &Arc<AtomicIsize>,
signatures_sender: Option<SignatureBatchSender>,
) -> Vec<JoinHandle<()>>
where
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
@ -367,6 +378,7 @@ where
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone();
let client = client.clone();
let signatures_sender = signatures_sender.clone();
Builder::new()
.name("solana-client-sender".to_string())
.spawn(move || {
@ -377,6 +389,7 @@ where
&total_tx_sent_count,
thread_batch_sleep_ms,
&client,
signatures_sender,
);
})
.unwrap()
@ -406,6 +419,8 @@ where
use_durable_nonce,
instruction_padding_config,
num_conflict_groups,
block_data_file,
transaction_data_file,
..
} = config;
@ -464,7 +479,13 @@ where
None
};
let s_threads = create_sender_threads(
let (log_transaction_service, signatures_sender) = create_log_transactions_service_and_sender(
&client,
block_data_file.as_deref(),
transaction_data_file.as_deref(),
);
let sender_threads = create_sender_threads(
&client,
&shared_txs,
thread_batch_sleep_ms,
@ -472,6 +493,7 @@ where
threads,
exit_signal.clone(),
&shared_tx_active_thread_count,
signatures_sender,
);
wait_for_target_slots_per_epoch(target_slots_per_epoch, &client);
@ -499,7 +521,7 @@ where
// join the tx send threads
info!("Waiting for transmit threads...");
for t in s_threads {
for t in sender_threads {
if let Err(err) = t.join() {
info!(" join() failed with: {:?}", err);
}
@ -512,6 +534,13 @@ where
}
}
if let Some(log_transaction_service) = log_transaction_service {
info!("Waiting for log_transaction_service thread...");
if let Err(err) = log_transaction_service.join() {
info!(" join() failed with: {:?}", err);
}
}
if let Some(nonce_keypairs) = nonce_keypairs {
withdraw_durable_nonce_accounts(client.clone(), &gen_keypairs, &nonce_keypairs);
}
@ -575,36 +604,37 @@ fn generate_system_txs(
pairs_with_compute_unit_prices
.par_iter()
.map(|((from, to), compute_unit_price)| {
(
transfer_with_compute_unit_price_and_padding(
let compute_unit_price = Some(**compute_unit_price);
TimestampedTransaction {
transaction: transfer_with_compute_unit_price_and_padding(
from,
&to.pubkey(),
1,
*blockhash,
instruction_padding_config,
Some(**compute_unit_price),
compute_unit_price,
skip_tx_account_data_size,
),
Some(timestamp()),
)
timestamp: Some(timestamp()),
compute_unit_price,
}
})
.collect()
} else {
pairs
.par_iter()
.map(|(from, to)| {
(
transfer_with_compute_unit_price_and_padding(
from,
&to.pubkey(),
1,
*blockhash,
instruction_padding_config,
None,
skip_tx_account_data_size,
),
Some(timestamp()),
)
.map(|(from, to)| TimestampedTransaction {
transaction: transfer_with_compute_unit_price_and_padding(
from,
&to.pubkey(),
1,
*blockhash,
instruction_padding_config,
None,
skip_tx_account_data_size,
),
timestamp: Some(timestamp()),
compute_unit_price: None,
})
.collect()
}
@ -779,8 +809,8 @@ fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized
let blockhashes: Vec<Hash> = get_nonce_blockhashes(&client, &pubkeys);
for i in 0..length {
transactions.push((
nonced_transfer_with_padding(
transactions.push(TimestampedTransaction {
transaction: nonced_transfer_with_padding(
source[i],
&dest[i].pubkey(),
1,
@ -790,16 +820,17 @@ fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized
skip_tx_account_data_size,
instruction_padding_config,
),
None,
));
timestamp: None,
compute_unit_price: None,
});
}
} else {
let pubkeys: Vec<Pubkey> = dest_nonce.iter().map(|keypair| keypair.pubkey()).collect();
let blockhashes: Vec<Hash> = get_nonce_blockhashes(&client, &pubkeys);
for i in 0..length {
transactions.push((
nonced_transfer_with_padding(
transactions.push(TimestampedTransaction {
transaction: nonced_transfer_with_padding(
dest[i],
&source[i].pubkey(),
1,
@ -809,8 +840,9 @@ fn generate_nonced_system_txs<T: 'static + BenchTpsClient + Send + Sync + ?Sized
skip_tx_account_data_size,
instruction_padding_config,
),
None,
));
timestamp: None,
compute_unit_price: None,
});
}
}
transactions
@ -916,9 +948,10 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
total_tx_sent_count: &Arc<AtomicUsize>,
thread_batch_sleep_ms: usize,
client: &Arc<T>,
signatures_sender: Option<SignatureBatchSender>,
) {
let mut last_sent_time = timestamp();
loop {
'thread_loop: loop {
if thread_batch_sleep_ms > 0 {
sleep(Duration::from_millis(thread_batch_sleep_ms as u64));
}
@ -926,19 +959,21 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
let mut shared_txs_wl = shared_txs.write().expect("write lock in do_tx_transfers");
shared_txs_wl.pop_front()
};
if let Some(txs0) = txs {
if let Some(txs) = txs {
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
info!("Transferring 1 unit {} times...", txs0.len());
let tx_len = txs0.len();
let num_txs = txs.len();
info!("Transferring 1 unit {} times...", num_txs);
let transfer_start = Instant::now();
let mut old_transactions = false;
let mut transactions = Vec::<_>::new();
let mut min_timestamp = u64::MAX;
for tx in txs0 {
let mut transactions = Vec::<_>::with_capacity(num_txs);
let mut signatures = Vec::<_>::with_capacity(num_txs);
let mut compute_unit_prices = Vec::<_>::with_capacity(num_txs);
for tx in txs {
let now = timestamp();
// Transactions without durable nonce that are too old will be rejected by the cluster Don't bother
// sending them.
if let Some(tx_timestamp) = tx.1 {
if let Some(tx_timestamp) = tx.timestamp {
if tx_timestamp < min_timestamp {
min_timestamp = tx_timestamp;
}
@ -947,7 +982,9 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
continue;
}
}
transactions.push(tx.0);
signatures.push(tx.transaction.signatures[0]);
transactions.push(tx.transaction);
compute_unit_prices.push(tx.compute_unit_price);
}
if min_timestamp != u64::MAX {
@ -957,6 +994,17 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
);
}
if let Some(signatures_sender) = &signatures_sender {
if let Err(error) = signatures_sender.send(TransactionInfoBatch {
signatures,
sent_at: Utc::now(),
compute_unit_prices,
}) {
error!("Receiver has been dropped with error `{error}`, stop sending transactions.");
break 'thread_loop;
}
}
if let Err(error) = client.send_batch(transactions) {
warn!("send_batch_sync in do_tx_transfers failed: {}", error);
}
@ -977,16 +1025,16 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
shared_txs_wl.clear();
}
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
total_tx_sent_count.fetch_add(tx_len, Ordering::Relaxed);
total_tx_sent_count.fetch_add(num_txs, Ordering::Relaxed);
info!(
"Tx send done. {} ms {} tps",
duration_as_ms(&transfer_start.elapsed()),
tx_len as f32 / duration_as_s(&transfer_start.elapsed()),
num_txs as f32 / duration_as_s(&transfer_start.elapsed()),
);
datapoint_info!(
"bench-tps-do_tx_transfers",
("duration", duration_as_us(&transfer_start.elapsed()), i64),
("count", tx_len, i64)
("count", num_txs, i64)
);
}
if exit_signal.load(Ordering::Relaxed) {

View File

@ -76,6 +76,8 @@ pub struct Config {
pub bind_address: IpAddr,
pub client_node_id: Option<Keypair>,
pub commitment_config: CommitmentConfig,
pub block_data_file: Option<String>,
pub transaction_data_file: Option<String>,
}
impl Eq for Config {}
@ -109,6 +111,8 @@ impl Default for Config {
bind_address: IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
client_node_id: None,
commitment_config: CommitmentConfig::confirmed(),
block_data_file: None,
transaction_data_file: None,
}
}
}
@ -419,6 +423,23 @@ pub fn build_args<'a>(version: &'_ str) -> App<'a, '_> {
.default_value("confirmed")
.help("Block commitment config for getting latest blockhash"),
)
.arg(
Arg::with_name("block_data_file")
.long("block-data-file")
.value_name("FILENAME")
.takes_value(true)
.help("File to save block statistics relevant to the submitted transactions."),
)
.arg(
Arg::with_name("transaction_data_file")
.long("transaction-data-file")
.value_name("FILENAME")
.takes_value(true)
.help(
"File to save details about all the submitted transactions.\
This option is useful for debug purposes."
),
)
}
/// Parses a clap `ArgMatches` structure into a `Config`
@ -587,6 +608,10 @@ pub fn parse_args(matches: &ArgMatches) -> Result<Config, &'static str> {
}
args.commitment_config = value_t_or_exit!(matches, "commitment_config", CommitmentConfig);
args.block_data_file = matches.value_of("block_data_file").map(|s| s.to_string());
args.transaction_data_file = matches
.value_of("transaction_data_file")
.map(|s| s.to_string());
Ok(args)
}

View File

@ -3,5 +3,7 @@ pub mod bench;
pub mod bench_tps_client;
pub mod cli;
pub mod keypairs;
mod log_transaction_service;
mod perf_utils;
mod rpc_with_retry_utils;
pub mod send_batch;

View File

@ -0,0 +1,496 @@
//! `LogTransactionService` requests confirmed blocks, analyses transactions submitted by bench-tps,
//! and saves log files in csv format.
use {
crate::{
bench_tps_client::BenchTpsClient,
rpc_with_retry_utils::{get_blocks_with_retry, get_slot_with_retry},
},
chrono::{DateTime, TimeZone, Utc},
crossbeam_channel::{select, tick, unbounded, Receiver, Sender},
log::*,
serde::Serialize,
solana_client::rpc_config::RpcBlockConfig,
solana_measure::measure::Measure,
solana_sdk::{
clock::{DEFAULT_MS_PER_SLOT, MAX_PROCESSING_AGE},
commitment_config::{CommitmentConfig, CommitmentLevel},
signature::Signature,
slot_history::Slot,
},
solana_transaction_status::{
option_serializer::OptionSerializer, EncodedTransactionWithStatusMeta, RewardType,
TransactionDetails, UiConfirmedBlock, UiTransactionEncoding, UiTransactionStatusMeta,
},
std::{
collections::HashMap,
fs::File,
sync::Arc,
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
// Data to establish communication between sender thread and
// LogTransactionService.
#[derive(Clone)]
pub(crate) struct TransactionInfoBatch {
pub signatures: Vec<Signature>,
pub sent_at: DateTime<Utc>,
pub compute_unit_prices: Vec<Option<u64>>,
}
pub(crate) type SignatureBatchSender = Sender<TransactionInfoBatch>;
pub(crate) struct LogTransactionService {
thread_handler: JoinHandle<()>,
}
pub(crate) fn create_log_transactions_service_and_sender<Client>(
client: &Arc<Client>,
block_data_file: Option<&str>,
transaction_data_file: Option<&str>,
) -> (Option<LogTransactionService>, Option<SignatureBatchSender>)
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
if data_file_provided(block_data_file, transaction_data_file) {
let (sender, receiver) = unbounded();
let log_tx_service =
LogTransactionService::new(client, receiver, block_data_file, transaction_data_file);
(Some(log_tx_service), Some(sender))
} else {
(None, None)
}
}
// How many blocks to process during one iteration.
// The time to process blocks is dominated by get_block calls.
// Each call takes slightly less time than slot.
const NUM_SLOTS_PER_ITERATION: u64 = 16;
// How often process blocks.
const PROCESS_BLOCKS_EVERY_MS: u64 = NUM_SLOTS_PER_ITERATION * DEFAULT_MS_PER_SLOT;
// Max age for transaction in the transaction map, older transactions are cleaned up and marked as timeout.
const REMOVE_TIMEOUT_TX_EVERY_MS: i64 = MAX_PROCESSING_AGE as i64 * DEFAULT_MS_PER_SLOT as i64;
// Map used to filter submitted transactions.
#[derive(Clone)]
struct TransactionSendInfo {
pub sent_at: DateTime<Utc>,
pub compute_unit_price: Option<u64>,
}
type MapSignatureToTxInfo = HashMap<Signature, TransactionSendInfo>;
type SignatureBatchReceiver = Receiver<TransactionInfoBatch>;
impl LogTransactionService {
fn new<Client>(
client: &Arc<Client>,
signature_receiver: SignatureBatchReceiver,
block_data_file: Option<&str>,
transaction_data_file: Option<&str>,
) -> Self
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
if !data_file_provided(block_data_file, transaction_data_file) {
panic!("Expect block-data-file or transaction-data-file is specified, must have been verified by callee.");
}
let client = client.clone();
let tx_log_writer = TransactionLogWriter::new(transaction_data_file);
let block_log_writer = BlockLogWriter::new(block_data_file);
let thread_handler = Builder::new()
.name("LogTransactionService".to_string())
.spawn(move || {
Self::run(client, signature_receiver, tx_log_writer, block_log_writer);
})
.expect("LogTransactionService should have started successfully.");
Self { thread_handler }
}
pub fn join(self) -> thread::Result<()> {
self.thread_handler.join()
}
fn run<Client>(
client: Arc<Client>,
signature_receiver: SignatureBatchReceiver,
mut tx_log_writer: TransactionLogWriter,
mut block_log_writer: BlockLogWriter,
) where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
// used to request blocks data and only confirmed makes sense in this context.
let commitment: CommitmentConfig = CommitmentConfig {
commitment: CommitmentLevel::Confirmed,
};
let block_processing_timer_receiver = tick(Duration::from_millis(PROCESS_BLOCKS_EVERY_MS));
let mut start_slot = get_slot_with_retry(&client, commitment)
.expect("get_slot_with_retry should have succeed, cannot proceed without having slot. Must be a problem with RPC.");
let mut sender_stopped = false;
let mut signature_to_tx_info = MapSignatureToTxInfo::new();
loop {
select! {
recv(signature_receiver) -> msg => {
match msg {
Ok(TransactionInfoBatch {
signatures,
sent_at,
compute_unit_prices
}) => {
signatures.iter().zip(compute_unit_prices).for_each( |(sign, compute_unit_price)| {signature_to_tx_info.insert(*sign, TransactionSendInfo {
sent_at,
compute_unit_price
});});
}
Err(_) => {
sender_stopped = true;
}
}
},
recv(block_processing_timer_receiver) -> _ => {
info!("sign_receiver queue len: {}", signature_receiver.len());
if !signature_receiver.is_empty() {
continue;
}
let mut measure_get_blocks = Measure::start("measure_get_blocks");
let block_slots = get_blocks_with_retry(&client, start_slot, Some(start_slot + NUM_SLOTS_PER_ITERATION - 1), commitment);
measure_get_blocks.stop();
let time_get_blocks_us = measure_get_blocks.as_us();
info!("Time to get_blocks : {time_get_blocks_us}us.");
let Ok(block_slots) = block_slots else {
error!("Failed to get blocks, stop LogWriterService.");
break;
};
if block_slots.is_empty() {
continue;
}
let last_block_time = Self::process_blocks(
&client,
block_slots,
&mut signature_to_tx_info,
&mut tx_log_writer,
&mut block_log_writer,
commitment,
);
Self::clean_transaction_map(&mut tx_log_writer, &mut signature_to_tx_info, last_block_time);
start_slot = start_slot.saturating_add(NUM_SLOTS_PER_ITERATION);
tx_log_writer.flush();
block_log_writer.flush();
if sender_stopped && signature_to_tx_info.is_empty() {
info!("Stop LogTransactionService");
break;
}
},
}
}
}
/// Download and process the blocks.
/// Returns the time when the last processed block has been confirmed or now().
fn process_blocks<Client>(
client: &Arc<Client>,
block_slots: Vec<Slot>,
signature_to_tx_info: &mut MapSignatureToTxInfo,
tx_log_writer: &mut TransactionLogWriter,
block_log_writer: &mut BlockLogWriter,
commitment: CommitmentConfig,
) -> DateTime<Utc>
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
let rpc_block_config = RpcBlockConfig {
encoding: Some(UiTransactionEncoding::Base64),
transaction_details: Some(TransactionDetails::Full),
rewards: Some(true),
commitment: Some(commitment),
max_supported_transaction_version: Some(0),
};
let mut measure_process_blocks = Measure::start("measure_process_blocks");
let blocks = block_slots
.iter()
.map(|slot| client.get_block_with_config(*slot, rpc_block_config));
let num_blocks = blocks.len();
let mut last_block_time = None;
for (block, slot) in blocks.zip(&block_slots) {
let Ok(block) = block else {
continue;
};
let block_time = Self::process_block(
block,
signature_to_tx_info,
*slot,
tx_log_writer,
block_log_writer,
);
// if last_time is some, it means that the there is at least one valid block
if block_time.is_some() {
last_block_time = block_time;
}
}
measure_process_blocks.stop();
let time_process_blocks_us = measure_process_blocks.as_us();
info!("Time to process {num_blocks} blocks: {time_process_blocks_us}us.");
last_block_time.unwrap_or_else(Utc::now)
}
fn process_block(
block: UiConfirmedBlock,
signature_to_tx_info: &mut MapSignatureToTxInfo,
slot: u64,
tx_log_writer: &mut TransactionLogWriter,
block_log_writer: &mut BlockLogWriter,
) -> Option<DateTime<Utc>> {
let rewards = block
.rewards
.as_ref()
.expect("Rewards should be part of the block information.");
let slot_leader = rewards
.iter()
.find(|r| r.reward_type == Some(RewardType::Fee))
.map_or("".to_string(), |x| x.pubkey.clone());
let Some(transactions) = &block.transactions else {
warn!("Empty block: {slot}");
return None;
};
let mut num_bench_tps_transactions: usize = 0;
let mut total_cu_consumed: u64 = 0;
let mut bench_tps_cu_consumed: u64 = 0;
for EncodedTransactionWithStatusMeta {
transaction, meta, ..
} in transactions
{
let Some(transaction) = transaction.decode() else {
continue;
};
let cu_consumed = meta
.as_ref()
.map_or(0, |meta| match meta.compute_units_consumed {
OptionSerializer::Some(cu_consumed) => cu_consumed,
_ => 0,
});
let signature = &transaction.signatures[0];
total_cu_consumed = total_cu_consumed.saturating_add(cu_consumed);
if let Some(TransactionSendInfo {
sent_at,
compute_unit_price,
}) = signature_to_tx_info.remove(signature)
{
num_bench_tps_transactions = num_bench_tps_transactions.saturating_add(1);
bench_tps_cu_consumed = bench_tps_cu_consumed.saturating_add(cu_consumed);
tx_log_writer.write(
Some(block.blockhash.clone()),
Some(slot_leader.clone()),
signature,
sent_at,
Some(slot),
block.block_time,
meta.as_ref(),
false,
compute_unit_price,
);
}
}
block_log_writer.write(
block.blockhash.clone(),
slot_leader,
slot,
block.block_time,
num_bench_tps_transactions,
transactions.len(),
bench_tps_cu_consumed,
total_cu_consumed,
);
block.block_time.map(|time| {
Utc.timestamp_opt(time, 0)
.latest()
.expect("valid timestamp")
})
}
/// Remove from map all the signatures which we haven't processed before and they are
/// older than the the timestamp of the last processed block plus max blockhash age.
fn clean_transaction_map(
tx_log_writer: &mut TransactionLogWriter,
signature_to_tx_info: &mut MapSignatureToTxInfo,
last_block_time: DateTime<Utc>,
) {
signature_to_tx_info.retain(|signature, tx_info| {
let duration_since_sent = last_block_time.signed_duration_since(tx_info.sent_at);
let is_timeout_tx = duration_since_sent.num_milliseconds() > REMOVE_TIMEOUT_TX_EVERY_MS;
if is_timeout_tx {
tx_log_writer.write(
None,
None,
signature,
tx_info.sent_at,
None,
None,
None,
true,
tx_info.compute_unit_price,
);
}
!is_timeout_tx
});
}
}
fn data_file_provided(block_data_file: Option<&str>, transaction_data_file: Option<&str>) -> bool {
block_data_file.is_some() || transaction_data_file.is_some()
}
type CsvFileWriter = csv::Writer<File>;
#[derive(Clone, Serialize)]
struct BlockData {
pub blockhash: String,
pub block_slot: Slot,
pub slot_leader: String,
pub block_time: Option<DateTime<Utc>>,
pub total_num_transactions: usize,
pub num_bench_tps_transactions: usize,
pub total_cu_consumed: u64,
pub bench_tps_cu_consumed: u64,
}
struct BlockLogWriter {
log_writer: Option<CsvFileWriter>,
}
impl BlockLogWriter {
fn new(block_data_file: Option<&str>) -> Self {
let block_log_writer = block_data_file.map(|block_data_file| {
CsvFileWriter::from_writer(
File::create(block_data_file)
.expect("Application should be able to create a file."),
)
});
Self {
log_writer: block_log_writer,
}
}
#[allow(clippy::too_many_arguments)]
fn write(
&mut self,
blockhash: String,
slot_leader: String,
slot: Slot,
block_time: Option<i64>,
num_bench_tps_transactions: usize,
total_num_transactions: usize,
bench_tps_cu_consumed: u64,
total_cu_consumed: u64,
) {
let Some(block_log_writer) = &mut self.log_writer else {
return;
};
let block_data = BlockData {
blockhash,
slot_leader,
block_slot: slot,
block_time: block_time.map(|time| {
Utc.timestamp_opt(time, 0)
.latest()
.expect("timestamp should be valid")
}),
num_bench_tps_transactions,
total_num_transactions,
bench_tps_cu_consumed,
total_cu_consumed,
};
let _ = block_log_writer.serialize(block_data);
}
fn flush(&mut self) {
if let Some(block_log_writer) = &mut self.log_writer {
let _ = block_log_writer.flush();
}
}
}
#[derive(Clone, Serialize)]
struct TransactionData {
pub blockhash: Option<String>,
pub slot_leader: Option<String>,
pub signature: String,
pub sent_at: Option<DateTime<Utc>>,
pub confirmed_slot: Option<Slot>,
pub block_time: Option<DateTime<Utc>>,
pub successful: bool,
pub error: Option<String>,
pub timed_out: bool,
pub compute_unit_price: u64,
}
struct TransactionLogWriter {
log_writer: Option<CsvFileWriter>,
}
impl TransactionLogWriter {
fn new(transaction_data_file: Option<&str>) -> Self {
let transaction_log_writer = transaction_data_file.map(|transaction_data_file| {
CsvFileWriter::from_writer(
File::create(transaction_data_file)
.expect("Application should be able to create a file."),
)
});
Self {
log_writer: transaction_log_writer,
}
}
#[allow(clippy::too_many_arguments)]
fn write(
&mut self,
blockhash: Option<String>,
slot_leader: Option<String>,
signature: &Signature,
sent_at: DateTime<Utc>,
confirmed_slot: Option<Slot>,
block_time: Option<i64>,
meta: Option<&UiTransactionStatusMeta>,
timed_out: bool,
compute_unit_price: Option<u64>,
) {
let Some(transaction_log_writer) = &mut self.log_writer else {
return;
};
let tx_data = TransactionData {
blockhash,
slot_leader,
signature: signature.to_string(),
sent_at: Some(sent_at),
confirmed_slot,
block_time: block_time.map(|time| {
Utc.timestamp_opt(time, 0)
.latest()
.expect("valid timestamp")
}),
successful: meta.as_ref().map_or(false, |m| m.status.is_ok()),
error: meta
.as_ref()
.and_then(|m| m.err.as_ref().map(|x| x.to_string())),
timed_out,
compute_unit_price: compute_unit_price.unwrap_or(0),
};
let _ = transaction_log_writer.serialize(tx_data);
}
fn flush(&mut self) {
if let Some(transaction_log_writer) = &mut self.log_writer {
let _ = transaction_log_writer.flush();
}
}
}

View File

@ -0,0 +1,61 @@
use {
crate::bench_tps_client::{BenchTpsClient, Result},
log::*,
solana_sdk::{
clock::DEFAULT_MS_PER_SLOT, commitment_config::CommitmentConfig, slot_history::Slot,
},
std::{sync::Arc, thread::sleep, time::Duration},
};
const NUM_RETRY: u64 = 5;
const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT;
fn call_rpc_with_retry<Func, Data>(f: Func, retry_warning: &str) -> Result<Data>
where
Func: Fn() -> Result<Data>,
{
let mut iretry = 0;
loop {
match f() {
Ok(slot) => {
return Ok(slot);
}
Err(error) => {
if iretry == NUM_RETRY {
return Err(error);
}
warn!("{retry_warning}: {error}, retry.");
sleep(Duration::from_millis(RETRY_EVERY_MS));
}
}
iretry += 1;
}
}
pub(crate) fn get_slot_with_retry<Client>(
client: &Arc<Client>,
commitment: CommitmentConfig,
) -> Result<Slot>
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
call_rpc_with_retry(
|| client.get_slot_with_commitment(commitment),
"Failed to get slot",
)
}
pub(crate) fn get_blocks_with_retry<Client>(
client: &Arc<Client>,
start_slot: Slot,
end_slot: Option<Slot>,
commitment: CommitmentConfig,
) -> Result<Vec<Slot>>
where
Client: 'static + BenchTpsClient + Send + Sync + ?Sized,
{
call_rpc_with_retry(
|| client.get_blocks_with_commitment(start_slot, end_slot, commitment),
"Failed to download blocks",
)
}