Add hidden cli option to allow validator reports replayed transaction cost metrics (#22369)

* add hidden cli option to allow validator reports replayed transaction cost detail metrics

* Update validator/src/main.rs

Co-authored-by: Michael Vines <mvines@gmail.com>

* - rebase master, using unbounded instead of channel; dowgrade to datapoint_trace

* removed cli arg, prefer log at trace

Co-authored-by: Michael Vines <mvines@gmail.com>
This commit is contained in:
Tao Zhu 2022-01-14 18:31:21 -06:00 committed by GitHub
parent 2aa113fd8c
commit a724fa2347
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 147 additions and 0 deletions

View File

@ -51,6 +51,7 @@ use {
bank::{Bank, NewBankOptions},
bank_forks::BankForks,
commitment::BlockCommitmentCache,
transaction_cost_metrics_sender::TransactionCostMetricsSender,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{
@ -339,6 +340,7 @@ impl ReplayStage {
voting_sender: Sender<VoteOp>,
drop_bank_sender: Sender<Vec<Arc<Bank>>>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
transaction_cost_metrics_sender: Option<TransactionCostMetricsSender>,
) -> Self {
let ReplayStageConfig {
vote_account,
@ -445,6 +447,7 @@ impl ReplayStage {
&mut duplicate_slots_to_repair,
&ancestor_hashes_replay_update_sender,
block_metadata_notifier.clone(),
transaction_cost_metrics_sender.as_ref(),
);
replay_active_banks_time.stop();
@ -1570,6 +1573,7 @@ impl ReplayStage {
bank_progress: &mut ForkProgress,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: &ReplayVoteSender,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
verify_recyclers: &VerifyRecyclers,
) -> result::Result<usize, BlockstoreProcessorError> {
let tx_count_before = bank_progress.replay_progress.num_txs;
@ -1581,6 +1585,7 @@ impl ReplayStage {
false,
transaction_status_sender,
Some(replay_vote_sender),
transaction_cost_metrics_sender,
None,
verify_recyclers,
false,
@ -2070,6 +2075,7 @@ impl ReplayStage {
duplicate_slots_to_repair: &mut DuplicateSlotsToRepair,
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
) -> bool {
let mut did_complete_bank = false;
let mut tx_count = 0;
@ -2119,6 +2125,7 @@ impl ReplayStage {
bank_progress,
transaction_status_sender,
replay_vote_sender,
transaction_cost_metrics_sender,
verify_recyclers,
);
match replay_result {
@ -3652,6 +3659,7 @@ pub mod tests {
bank1_progress,
None,
&replay_vote_sender,
None,
&VerifyRecyclers::default(),
);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());

View File

@ -49,6 +49,9 @@ use {
snapshot_package::{
AccountsPackageReceiver, AccountsPackageSender, PendingSnapshotPackage,
},
transaction_cost_metrics_sender::{
TransactionCostMetricsSender, TransactionCostMetricsService,
},
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
@ -72,6 +75,7 @@ pub struct Tvu {
cost_update_service: CostUpdateService,
voting_service: VotingService,
drop_bank_service: DropBankService,
transaction_cost_metrics_service: TransactionCostMetricsService,
}
pub struct TvuSockets {
@ -309,6 +313,15 @@ impl Tvu {
);
let (drop_bank_sender, drop_bank_receiver) = unbounded();
let (tx_cost_metrics_sender, tx_cost_metrics_receiver) = unbounded();
let transaction_cost_metrics_sender = Some(TransactionCostMetricsSender::new(
cost_model.clone(),
tx_cost_metrics_sender,
));
let transaction_cost_metrics_service =
TransactionCostMetricsService::new(tx_cost_metrics_receiver);
let drop_bank_service = DropBankService::new(drop_bank_receiver);
let replay_stage = ReplayStage::new(
@ -332,6 +345,7 @@ impl Tvu {
voting_sender,
drop_bank_sender,
block_metadata_notifier,
transaction_cost_metrics_sender,
);
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
@ -366,6 +380,7 @@ impl Tvu {
cost_update_service,
voting_service,
drop_bank_service,
transaction_cost_metrics_service,
}
}
@ -382,6 +397,7 @@ impl Tvu {
self.cost_update_service.join()?;
self.voting_service.join()?;
self.drop_bank_service.join()?;
self.transaction_cost_metrics_service.join()?;
Ok(())
}
}

View File

@ -32,6 +32,7 @@ use {
snapshot_package::{AccountsPackageSender, SnapshotType},
snapshot_utils::{self, BankFromArchiveTimings},
transaction_batch::TransactionBatch,
transaction_cost_metrics_sender::TransactionCostMetricsSender,
vote_account::VoteAccount,
vote_sender_types::ReplayVoteSender,
},
@ -317,6 +318,7 @@ pub fn process_entries_for_tests(
None,
transaction_status_sender,
replay_vote_sender,
None,
&mut timings,
Arc::new(RwLock::new(BlockCostCapacityMeter::default())),
);
@ -333,6 +335,7 @@ fn process_entries_with_callback(
entry_callback: Option<&ProcessCallback>,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
timings: &mut ExecuteTimings,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
) -> Result<()> {
@ -366,6 +369,11 @@ fn process_entries_with_callback(
}
}
EntryType::Transactions(transactions) => {
if let Some(transaction_cost_metrics_sender) = transaction_cost_metrics_sender {
transaction_cost_metrics_sender
.send_cost_details(bank.clone(), transactions.iter());
}
if randomize {
transactions.shuffle(&mut rng);
}
@ -791,6 +799,7 @@ fn confirm_full_slot(
skip_verification,
transaction_status_sender,
replay_vote_sender,
None,
opts.entry_callback.as_ref(),
recyclers,
opts.allow_dead_slots,
@ -858,6 +867,7 @@ pub fn confirm_slot(
skip_verification: bool,
transaction_status_sender: Option<&TransactionStatusSender>,
replay_vote_sender: Option<&ReplayVoteSender>,
transaction_cost_metrics_sender: Option<&TransactionCostMetricsSender>,
entry_callback: Option<&ProcessCallback>,
recyclers: &VerifyRecyclers,
allow_dead_slots: bool,
@ -954,6 +964,7 @@ pub fn confirm_slot(
entry_callback,
transaction_status_sender,
replay_vote_sender,
transaction_cost_metrics_sender,
&mut execute_timings,
cost_capacity_meter,
)

View File

@ -57,6 +57,7 @@ pub mod stakes;
pub mod status_cache;
mod system_instruction_processor;
pub mod transaction_batch;
pub mod transaction_cost_metrics_sender;
pub mod vote_account;
pub mod vote_sender_types;
pub mod waitable_condvar;

View File

@ -0,0 +1,111 @@
use {
crate::{bank::Bank, cost_model::CostModel},
crossbeam_channel::{Receiver, Sender},
log::*,
solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction},
std::{
sync::{Arc, RwLock},
thread::{self, Builder, JoinHandle},
},
};
pub enum TransactionCostMetrics {
TransactionCostDetail {
slot: Slot,
tx_signature: Signature,
signature_cost: u64,
write_lock_cost: u64,
data_bytes_cost: u64,
execution_cost: u64,
},
}
pub struct TransactionCostMetricsSender {
cost_model: Arc<RwLock<CostModel>>,
metrics_sender: Sender<TransactionCostMetrics>,
}
impl TransactionCostMetricsSender {
pub fn new(
cost_model: Arc<RwLock<CostModel>>,
metrics_sender: Sender<TransactionCostMetrics>,
) -> Self {
Self {
cost_model,
metrics_sender,
}
}
pub fn send_cost_details<'a>(
&self,
bank: Arc<Bank>,
txs: impl Iterator<Item = &'a SanitizedTransaction>,
) {
let cost_model = self.cost_model.read().unwrap();
txs.for_each(|tx| {
let cost = cost_model.calculate_cost(tx);
self.metrics_sender
.send(TransactionCostMetrics::TransactionCostDetail {
slot: bank.slot(),
tx_signature: *tx.signature(),
signature_cost: cost.signature_cost,
write_lock_cost: cost.write_lock_cost,
data_bytes_cost: cost.data_bytes_cost,
execution_cost: cost.execution_cost,
})
.unwrap_or_else(|err| {
warn!(
"transaction cost metrics service report cost detail failed: {:?}",
err
)
});
});
}
}
pub struct TransactionCostMetricsService {
thread_hdl: JoinHandle<()>,
}
impl TransactionCostMetricsService {
pub fn new(transaction_cost_metrics_receiver: Receiver<TransactionCostMetrics>) -> Self {
let thread_hdl = Builder::new()
.name("transaction_cost_metrics_service".to_string())
.spawn(move || {
Self::service_loop(transaction_cost_metrics_receiver);
})
.unwrap();
Self { thread_hdl }
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
fn service_loop(transaction_cost_metrics_receiver: Receiver<TransactionCostMetrics>) {
for tx_cost_metrics in transaction_cost_metrics_receiver.iter() {
match tx_cost_metrics {
TransactionCostMetrics::TransactionCostDetail {
slot,
tx_signature,
signature_cost,
write_lock_cost,
data_bytes_cost,
execution_cost,
} => {
// report transaction cost details per slot|signature
datapoint_trace!(
"transaction-cost-details",
("slot", slot as i64, i64),
("tx_signature", tx_signature.to_string(), String),
("signature_cost", signature_cost as i64, i64),
("write_lock_cost", write_lock_cost as i64, i64),
("data_bytes_cost", data_bytes_cost as i64, i64),
("execution_cost", execution_cost as i64, i64),
);
}
}
}
}
}