Add mango patches to jito

This commit is contained in:
godmodegalactus 2024-01-10 15:58:34 +01:00
parent 165a87482d
commit c1040b0129
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
22 changed files with 602 additions and 38 deletions

6
Cargo.lock generated
View File

@ -2924,7 +2924,7 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38"
[[package]]
name = "jito-programs-vote-state"
version = "0.1.5"
version = "0.1.4"
dependencies = [
"anchor-lang",
"bincode",
@ -2947,7 +2947,7 @@ dependencies = [
[[package]]
name = "jito-tip-distribution"
version = "0.1.5"
version = "0.1.4"
dependencies = [
"anchor-lang",
"default-env",
@ -2958,7 +2958,7 @@ dependencies = [
[[package]]
name = "jito-tip-payment"
version = "0.1.5"
version = "0.1.4"
dependencies = [
"anchor-lang",
"default-env",

View File

@ -1,7 +1,6 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use {
self::{
committer::Committer,
@ -504,7 +503,10 @@ impl BankingStage {
}
let (decision, make_decision_time) =
measure!(decision_maker.make_consume_or_forward_decision());
let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(decision.bank_start());
let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(
decision.bank_start(),
Some(unprocessed_transaction_storage),
);
slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us());
match decision {

View File

@ -25,7 +25,9 @@ use {
transaction_batch::TransactionBatch,
},
solana_sdk::{
borsh0_10::try_from_slice_unchecked,
clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE},
compute_budget::{self, ComputeBudgetInstruction},
feature_set,
pubkey::Pubkey,
saturating_add_assign,
@ -68,6 +70,8 @@ pub struct ExecuteAndCommitTransactionsOutput {
pub commit_transactions_result: Result<Vec<CommitTransactionDetails>, PohRecorderError>,
execute_and_commit_timings: LeaderExecuteAndCommitTimings,
error_counters: TransactionErrorMetrics,
scheduled_min_prioritization_fees: usize,
scheduled_max_prioritization_fees: usize,
}
pub struct Consumer {
@ -297,6 +301,8 @@ impl Consumer {
let mut total_execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
let mut total_error_counters = TransactionErrorMetrics::default();
let mut reached_max_poh_height = false;
let mut total_scheduled_min_prioritization_fees: usize = usize::MAX;
let mut total_scheduled_max_prioritization_fees: usize = 0;
while chunk_start != transactions.len() {
let chunk_end = std::cmp::min(
transactions.len(),
@ -327,6 +333,8 @@ impl Consumer {
commit_transactions_result: new_commit_transactions_result,
execute_and_commit_timings: new_execute_and_commit_timings,
error_counters: new_error_counters,
scheduled_min_prioritization_fees,
scheduled_max_prioritization_fees,
..
} = execute_and_commit_transactions_output;
@ -336,6 +344,14 @@ impl Consumer {
total_transactions_attempted_execution_count,
new_transactions_attempted_execution_count
);
total_scheduled_min_prioritization_fees = std::cmp::min(
total_scheduled_min_prioritization_fees,
scheduled_min_prioritization_fees,
);
total_scheduled_max_prioritization_fees = std::cmp::min(
total_scheduled_max_prioritization_fees,
scheduled_max_prioritization_fees,
);
trace!(
"process_transactions result: {:?}",
@ -396,6 +412,8 @@ impl Consumer {
cost_model_us: total_cost_model_us,
execute_and_commit_timings: total_execute_and_commit_timings,
error_counters: total_error_counters,
scheduled_min_prioritization_fees: total_scheduled_min_prioritization_fees,
scheduled_max_prioritization_fees: total_scheduled_max_prioritization_fees,
}
}
@ -425,7 +443,13 @@ impl Consumer {
// Re-sanitized transaction should be equal to the original transaction,
// but whether it will pass sanitization needs to be checked.
let resanitized_tx =
bank.fully_verify_transaction(tx.to_versioned_transaction())?;
match bank.fully_verify_transaction(tx.to_versioned_transaction()) {
Ok(resanitized_tx) => resanitized_tx,
Err(e) => {
bank.notify_transaction_error(tx, Some(e.clone()));
return Err(e);
}
};
if resanitized_tx != *tx {
// Sanitization before/after epoch give different transaction data - do not execute.
return Err(TransactionError::ResanitizationNeeded);
@ -563,6 +587,33 @@ impl Consumer {
});
execute_and_commit_timings.collect_balances_us = collect_balances_us;
let min_max = batch
.sanitized_transactions()
.iter()
.filter_map(|transaction| {
let message = transaction.message();
for (program_id, instruction) in message.program_instructions_iter() {
if compute_budget::check_id(program_id) {
match try_from_slice_unchecked(&instruction.data) {
Ok(ComputeBudgetInstruction::SetComputeUnitPrice(micro_lamports)) => {
return Some(micro_lamports);
}
Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
additional_fee,
..
}) => {
return Some(additional_fee as u64);
}
_ => {}
}
}
}
None
})
.minmax();
let (scheduled_min_prioritization_fees, scheduled_max_prioritization_fees) =
min_max.into_option().unwrap_or_default();
let (load_and_execute_transactions_output, load_execute_us) = measure_us!(bank
.load_and_execute_transactions(
batch,
@ -637,6 +688,8 @@ impl Consumer {
commit_transactions_result: Err(recorder_err),
execute_and_commit_timings,
error_counters,
scheduled_min_prioritization_fees: scheduled_min_prioritization_fees as usize,
scheduled_max_prioritization_fees: scheduled_max_prioritization_fees as usize,
};
}
@ -692,6 +745,8 @@ impl Consumer {
commit_transactions_result: Ok(commit_transaction_statuses),
execute_and_commit_timings,
error_counters,
scheduled_min_prioritization_fees: scheduled_min_prioritization_fees as usize,
scheduled_max_prioritization_fees: scheduled_max_prioritization_fees as usize,
}
}

View File

@ -1,7 +1,9 @@
use {
super::{
leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics},
unprocessed_transaction_storage::InsertPacketBatchSummary,
unprocessed_transaction_storage::{
InsertPacketBatchSummary, UnprocessedTransactionStorage,
},
},
solana_accounts_db::transaction_error_metrics::*,
solana_poh::poh_recorder::BankStart,
@ -52,6 +54,53 @@ pub(crate) struct ProcessTransactionsSummary {
// Breakdown of all the transaction errors from transactions passed for execution
pub error_counters: TransactionErrorMetrics,
pub scheduled_min_prioritization_fees: usize,
pub scheduled_max_prioritization_fees: usize,
}
// Metrics describing prioritization fee information for each transaction storage before processing transactions
#[derive(Debug, Default)]
struct LeaderPrioritizationFeesMetrics {
// minimum prioritization fees in the MinMaxHeap
min_prioritization_fees_per_cu: u64,
// maximum prioritization fees in the MinMaxHeap
max_prioritization_fees_per_cu: u64,
}
impl LeaderPrioritizationFeesMetrics {
fn new(unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>) -> Self {
if let Some(unprocessed_transaction_storage) = unprocessed_transaction_storage {
Self {
min_prioritization_fees_per_cu: unprocessed_transaction_storage
.get_min_priority()
.unwrap_or_default(),
max_prioritization_fees_per_cu: unprocessed_transaction_storage
.get_max_priority()
.unwrap_or_default(),
}
} else {
Self::default()
}
}
fn report(&self, id: u32, slot: Slot) {
datapoint_info!(
"banking_stage-leader_prioritization_fees_info_by_thread",
("id", id as i64, i64),
("slot", slot as i64, i64),
(
"min_prioritization_fees_per_cu",
self.min_prioritization_fees_per_cu as i64,
i64
),
(
"max_prioritization_fees_per_cu",
self.max_prioritization_fees_per_cu as i64,
i64
)
);
}
}
// Metrics describing packets ingested/processed in various parts of BankingStage during this
@ -138,6 +187,11 @@ struct LeaderSlotPacketCountMetrics {
// total number of forwardable batches that were attempted for forwarding. A forwardable batch
// is defined in `ForwardPacketBatchesByAccounts` in `forward_packet_batches_by_accounts.rs`
forwardable_batches_count: u64,
// min prioritization fees for scheduled transactions
scheduled_min_prioritization_fees: u64,
// max prioritization fees for scheduled transactions
scheduled_max_prioritization_fees: u64,
}
impl LeaderSlotPacketCountMetrics {
@ -255,6 +309,16 @@ impl LeaderSlotPacketCountMetrics {
self.end_of_slot_unprocessed_buffer_len as i64,
i64
),
(
"scheduled_min_prioritization_fees",
self.scheduled_min_prioritization_fees as i64,
i64
),
(
"scheduled_max_prioritization_fees",
self.scheduled_max_prioritization_fees as i64,
i64
),
);
}
}
@ -277,12 +341,19 @@ pub(crate) struct LeaderSlotMetrics {
timing_metrics: LeaderSlotTimingMetrics,
prioritization_fees_metric: LeaderPrioritizationFeesMetrics,
// Used by tests to check if the `self.report()` method was called
is_reported: bool,
}
impl LeaderSlotMetrics {
pub(crate) fn new(id: u32, slot: Slot, bank_creation_time: &Instant) -> Self {
pub(crate) fn new(
id: u32,
slot: Slot,
bank_creation_time: &Instant,
unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>,
) -> Self {
Self {
id,
slot,
@ -290,6 +361,9 @@ impl LeaderSlotMetrics {
transaction_error_metrics: TransactionErrorMetrics::new(),
vote_packet_count_metrics: VotePacketCountMetrics::new(),
timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time),
prioritization_fees_metric: LeaderPrioritizationFeesMetrics::new(
unprocessed_transaction_storage,
),
is_reported: false,
}
}
@ -301,6 +375,7 @@ impl LeaderSlotMetrics {
self.transaction_error_metrics.report(self.id, self.slot);
self.packet_count_metrics.report(self.id, self.slot);
self.vote_packet_count_metrics.report(self.id, self.slot);
self.prioritization_fees_metric.report(self.id, self.slot);
}
/// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None
@ -372,6 +447,7 @@ impl LeaderSlotMetricsTracker {
pub(crate) fn check_leader_slot_boundary(
&mut self,
bank_start: Option<&BankStart>,
unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>,
) -> MetricsTrackerAction {
match (self.leader_slot_metrics.as_mut(), bank_start) {
(None, None) => MetricsTrackerAction::Noop,
@ -387,6 +463,7 @@ impl LeaderSlotMetricsTracker {
self.id,
bank_start.working_bank.slot(),
&bank_start.bank_creation_time,
unprocessed_transaction_storage,
)))
}
@ -398,6 +475,7 @@ impl LeaderSlotMetricsTracker {
self.id,
bank_start.working_bank.slot(),
&bank_start.bank_creation_time,
unprocessed_transaction_storage,
)))
} else {
MetricsTrackerAction::Noop
@ -449,6 +527,8 @@ impl LeaderSlotMetricsTracker {
cost_model_us,
ref execute_and_commit_timings,
error_counters,
scheduled_min_prioritization_fees,
scheduled_max_prioritization_fees,
..
} = process_transactions_summary;
@ -525,6 +605,23 @@ impl LeaderSlotMetricsTracker {
*cost_model_us
);
leader_slot_metrics
.packet_count_metrics
.scheduled_min_prioritization_fees = std::cmp::min(
leader_slot_metrics
.packet_count_metrics
.scheduled_min_prioritization_fees,
*scheduled_min_prioritization_fees as u64,
);
leader_slot_metrics
.packet_count_metrics
.scheduled_max_prioritization_fees = std::cmp::min(
leader_slot_metrics
.packet_count_metrics
.scheduled_max_prioritization_fees,
*scheduled_max_prioritization_fees as u64,
);
leader_slot_metrics
.timing_metrics
.execute_and_commit_timings
@ -896,7 +993,7 @@ mod tests {
..
} = setup_test_slot_boundary_banks();
// Test that with no bank being tracked, and no new bank being tracked, nothing is reported
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::Noop),
mem::discriminant(&action)
@ -916,8 +1013,8 @@ mod tests {
// Test case where the thread has not detected a leader bank, and now sees a leader bank.
// Metrics should not be reported because leader slot has not ended
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
let action =
leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::NewTracker(None)),
mem::discriminant(&action)
@ -941,12 +1038,12 @@ mod tests {
{
// Setup first_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)
@ -959,7 +1056,7 @@ mod tests {
}
{
// Assert no-op if still no new bank
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::Noop),
mem::discriminant(&action)
@ -981,13 +1078,13 @@ mod tests {
{
// Setup with first_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert nop-op if same bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::Noop),
mem::discriminant(&action)
@ -996,7 +1093,7 @@ mod tests {
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)
@ -1025,13 +1122,13 @@ mod tests {
{
// Setup with first_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert reporting if new bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&next_poh_recorder_bank));
.check_leader_slot_boundary(Some(&next_poh_recorder_bank), None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)),
mem::discriminant(&action)
@ -1044,7 +1141,7 @@ mod tests {
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)
@ -1072,13 +1169,13 @@ mod tests {
{
// Setup with next_bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&next_poh_recorder_bank));
.check_leader_slot_boundary(Some(&next_poh_recorder_bank), None);
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
}
{
// Assert reporting if new bank
let action = leader_slot_metrics_tracker
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)),
mem::discriminant(&action)
@ -1091,7 +1188,7 @@ mod tests {
}
{
// Assert reporting if slot has ended
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
assert_eq!(
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
mem::discriminant(&action)

View File

@ -193,6 +193,14 @@ impl UnprocessedPacketBatches {
self.packet_priority_queue.is_empty()
}
pub fn get_min_priority(&self) -> Option<u64> {
self.packet_priority_queue.peek_min().map(|x| x.priority())
}
pub fn get_max_priority(&self) -> Option<u64> {
self.packet_priority_queue.peek_max().map(|x| x.priority())
}
fn push_internal(&mut self, deserialized_packet: DeserializedPacket) {
// Push into the priority queue
self.packet_priority_queue

View File

@ -18,7 +18,7 @@ use {
bundle_stage::bundle_stage_leader_metrics::BundleStageLeaderMetrics,
immutable_deserialized_bundle::ImmutableDeserializedBundle,
},
itertools::Itertools,
itertools::{Itertools, MinMaxResult},
min_max_heap::MinMaxHeap,
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
solana_bundle::BundleExecutionError,
@ -296,6 +296,32 @@ impl UnprocessedTransactionStorage {
}
}
pub fn get_min_priority(&self) -> Option<u64> {
match self {
Self::VoteStorage(_) => None,
Self::LocalTransactionStorage(transaction_storage) => {
transaction_storage.get_min_priority()
}
Self::BundleStorage(bundle_storage) => bundle_storage
.get_minmax_priorization_fees()
.into_option()
.map(|x| x.0),
}
}
pub fn get_max_priority(&self) -> Option<u64> {
match self {
Self::VoteStorage(_) => None,
Self::LocalTransactionStorage(transaction_storage) => {
transaction_storage.get_max_priority()
}
Self::BundleStorage(bundle_storage) => bundle_storage
.get_minmax_priorization_fees()
.into_option()
.map(|x| x.1),
}
}
/// Returns the maximum number of packets a receive should accept
pub fn max_receive_size(&self) -> usize {
match self {
@ -622,6 +648,14 @@ impl ThreadLocalUnprocessedPackets {
self.unprocessed_packet_batches.len()
}
pub fn get_min_priority(&self) -> Option<u64> {
self.unprocessed_packet_batches.get_min_priority()
}
pub fn get_max_priority(&self) -> Option<u64> {
self.unprocessed_packet_batches.get_max_priority()
}
fn max_receive_size(&self) -> usize {
self.unprocessed_packet_batches.capacity() - self.unprocessed_packet_batches.len()
}
@ -1364,6 +1398,36 @@ impl BundleStorage {
sanitized_bundles
}
pub fn get_minmax_priorization_fees(&self) -> MinMaxResult<u64> {
let (min, max) = self
.unprocessed_bundle_storage
.iter()
.map(|bundle| {
bundle
.get_minmax_priorization_fees()
.into_option()
.unwrap_or((u64::MAX, u64::MIN))
})
.fold((u64::MAX, u64::MIN), |(a, b), (c, d)| {
(std::cmp::min(a, c), std::cmp::max(b, d))
});
let (min_c, max_c) = self
.cost_model_buffered_bundle_storage
.iter()
.map(|bundle| {
bundle
.get_minmax_priorization_fees()
.into_option()
.unwrap_or((u64::MAX, u64::MIN))
})
.fold((u64::MAX, u64::MIN), |(a, b), (c, d)| {
(std::cmp::min(a, c), std::cmp::max(b, d))
});
MinMaxResult::MinMax(std::cmp::min(min, min_c), std::cmp::max(max, max_c))
}
}
#[cfg(test)]

View File

@ -384,8 +384,8 @@ impl BundleStage {
let (decision, make_decision_time) =
measure!(decision_maker.make_consume_or_forward_decision());
let (metrics_action, banking_stage_metrics_action) =
bundle_stage_leader_metrics.check_leader_slot_boundary(decision.bank_start());
let (metrics_action, banking_stage_metrics_action) = bundle_stage_leader_metrics
.check_leader_slot_boundary(decision.bank_start(), Some(unprocessed_bundle_storage));
bundle_stage_leader_metrics
.leader_slot_metrics_tracker()
.increment_make_decision_us(make_decision_time.as_us());

View File

@ -16,6 +16,7 @@ use {
proxy::block_engine_stage::BlockBuilderFeeInfo,
tip_manager::TipManager,
},
itertools::Itertools,
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
solana_bundle::{
bundle_execution::{load_and_execute_bundle, BundleExecutionMetrics},
@ -27,8 +28,10 @@ use {
solana_poh::poh_recorder::{BankStart, RecordTransactionsSummary, TransactionRecorder},
solana_runtime::bank::Bank,
solana_sdk::{
borsh0_10::try_from_slice_unchecked,
bundle::SanitizedBundle,
clock::{Slot, MAX_PROCESSING_AGE},
compute_budget::{self, ComputeBudgetInstruction},
feature_set,
pubkey::Pubkey,
transaction::{self},
@ -508,6 +511,33 @@ impl BundleConsumer {
sanitized_bundle.transactions.len()
);
let minmax = sanitized_bundle
.transactions
.iter()
.filter_map(|transaction| {
let message = transaction.message();
for (program_id, instruction) in message.program_instructions_iter() {
if compute_budget::check_id(program_id) {
match try_from_slice_unchecked(&instruction.data) {
Ok(ComputeBudgetInstruction::SetComputeUnitPrice(micro_lamports)) => {
return Some(micro_lamports as usize);
}
Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
additional_fee,
..
}) => {
return Some(additional_fee as usize);
}
_ => {}
}
}
}
None
})
.minmax();
let (scheduled_max_prioritization_fees, scheduled_min_prioritization_fees) =
minmax.into_option().unwrap_or_default();
let (
(transaction_qos_cost_results, _cost_model_throttled_transactions_count),
cost_model_elapsed_us,
@ -572,6 +602,8 @@ impl BundleConsumer {
cost_model_us: cost_model_elapsed_us,
execute_and_commit_timings: result.execute_and_commit_timings,
error_counters: result.transaction_error_counter,
scheduled_min_prioritization_fees,
scheduled_max_prioritization_fees,
});
match result.result {

View File

@ -1,6 +1,9 @@
use {
crate::{
banking_stage::{leader_slot_metrics, leader_slot_metrics::LeaderSlotMetricsTracker},
banking_stage::{
leader_slot_metrics, leader_slot_metrics::LeaderSlotMetricsTracker,
unprocessed_transaction_storage::UnprocessedTransactionStorage,
},
immutable_deserialized_bundle::DeserializedBundleError,
},
solana_bundle::{bundle_execution::LoadAndExecuteBundleError, BundleExecutionError},
@ -31,13 +34,14 @@ impl BundleStageLeaderMetrics {
pub(crate) fn check_leader_slot_boundary(
&mut self,
bank_start: Option<&BankStart>,
unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>,
) -> (
leader_slot_metrics::MetricsTrackerAction,
MetricsTrackerAction,
) {
let banking_stage_metrics_action = self
.leader_slot_metrics_tracker
.check_leader_slot_boundary(bank_start);
.check_leader_slot_boundary(bank_start, unprocessed_transaction_storage);
let bundle_stage_metrics_action = self
.bundle_stage_metrics_tracker
.check_leader_slot_boundary(bank_start);

View File

@ -3,6 +3,7 @@ use {
banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket,
packet_bundle::PacketBundle,
},
itertools::{Itertools, MinMaxResult},
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
solana_perf::sigverify::verify_packet,
solana_runtime::bank::Bank,
@ -166,6 +167,10 @@ impl ImmutableDeserializedBundle {
bundle_id: self.bundle_id.clone(),
})
}
pub fn get_minmax_priorization_fees(&self) -> MinMaxResult<u64> {
self.packets.iter().map(|packet| packet.priority()).minmax()
}
}
#[cfg(test)]

View File

@ -73,7 +73,7 @@ use {
saturating_add_assign,
signature::{Keypair, Signature, Signer},
timing::timestamp,
transaction::Transaction,
transaction::{BankingTransactionResultNotifier, Transaction},
},
solana_vote::vote_sender_types::ReplayVoteSender,
solana_vote_program::vote_state::VoteTransaction,
@ -249,6 +249,7 @@ pub struct ReplayStageConfig {
// duplicate voting which can lead to slashing.
pub wait_to_vote_slot: Option<Slot>,
pub replay_slots_concurrently: bool,
pub banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
}
#[derive(Default)]
@ -531,6 +532,7 @@ impl ReplayStage {
tower_storage,
wait_to_vote_slot,
replay_slots_concurrently,
banking_transaction_result_notifier: banking_transaction_result_notifier_lock,
} = config;
trace!("replay stage");
@ -1055,6 +1057,7 @@ impl ReplayStage {
&banking_tracer,
has_new_vote_been_rooted,
transaction_status_sender.is_some(),
banking_transaction_result_notifier_lock.clone(),
);
let poh_bank = poh_recorder.read().unwrap().bank();
@ -1881,6 +1884,7 @@ impl ReplayStage {
banking_tracer: &Arc<BankingTracer>,
has_new_vote_been_rooted: bool,
track_transaction_indexes: bool,
banking_transaction_result_notifier_lock: Option<BankingTransactionResultNotifier>,
) {
// all the individual calls to poh_recorder.read() are designed to
// increase granularity, decrease contention
@ -1991,7 +1995,7 @@ impl ReplayStage {
false
};
let tpu_bank = Self::new_bank_from_parent_with_notify(
let mut tpu_bank = Self::new_bank_from_parent_with_notify(
parent.clone(),
poh_slot,
root_slot,
@ -1999,6 +2003,11 @@ impl ReplayStage {
rpc_subscriptions,
NewBankOptions { vote_only_bank },
);
if banking_transaction_result_notifier_lock.is_some() {
tpu_bank.set_banking_transaction_results_notifier(
banking_transaction_result_notifier_lock,
);
}
// make sure parent is frozen for finalized hashes via the above
// new()-ing of its child bank
banking_tracer.hash_event(parent.slot(), &parent.last_blockhash(), &parent.hash());

View File

@ -45,7 +45,10 @@ use {
accounts_background_service::AbsRequestSender, bank_forks::BankForks,
commitment::BlockCommitmentCache, prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
solana_sdk::{
clock::Slot, pubkey::Pubkey, signature::Keypair,
transaction::BankingTransactionResultNotifier,
},
solana_turbine::retransmit_stage::RetransmitStage,
solana_vote::vote_sender_types::ReplayVoteSender,
std::{
@ -140,6 +143,7 @@ impl Tvu {
turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
shred_receiver_addr: Arc<RwLock<Option<SocketAddr>>>,
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
@ -259,6 +263,7 @@ impl Tvu {
tower_storage: tower_storage.clone(),
wait_to_vote_slot,
replay_slots_concurrently: tvu_config.replay_slots_concurrently,
banking_transaction_result_notifier,
};
let (voting_sender, voting_receiver) = unbounded();
@ -495,6 +500,7 @@ pub mod tests {
turbine_quic_endpoint_receiver,
repair_quic_endpoint_sender,
Arc::new(RwLock::new(None)),
None,
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);

View File

@ -665,6 +665,13 @@ impl Validator {
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());
let banking_transaction_results_notifier =
geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| {
geyser_plugin_service.get_banking_transaction_result_notifier()
});
info!(
"Geyser plugin: accounts_update_notifier: {}, \
transaction_notifier: {}, \
@ -1311,6 +1318,7 @@ impl Validator {
turbine_quic_endpoint_receiver,
repair_quic_endpoint_sender,
config.shred_receiver_address.clone(),
banking_transaction_results_notifier,
)?;
let tpu = Tpu::new(

View File

@ -6,7 +6,7 @@ use {
solana_sdk::{
clock::{Slot, UnixTimestamp},
signature::Signature,
transaction::SanitizedTransaction,
transaction::{SanitizedTransaction, TransactionError},
},
solana_transaction_status::{Reward, TransactionStatusMeta},
std::{any::Any, error, io},
@ -355,6 +355,17 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
Ok(())
}
/// Called when we get banking stage errors.
#[allow(unused_variables)]
fn notify_banking_stage_transaction_results(
&self,
transaction: &SanitizedTransaction,
error: Option<TransactionError>,
slot: Slot,
) -> Result<()> {
Ok(())
}
/// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in
/// account data, please return false.
@ -375,4 +386,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
fn entry_notifications_enabled(&self) -> bool {
false
}
/// Check if the plugin is interesed in transaction errors duing banking
/// stage Default is false -- if the plugin is interested in naking
/// stage errors
fn banking_transaction_results_notifications_enabled(&self) -> bool {
false
}
}

View File

@ -0,0 +1,67 @@
use {
crate::geyser_plugin_manager::GeyserPluginManager,
log::{error, log_enabled, trace},
solana_measure::measure::Measure,
solana_metrics::{create_counter, inc_counter, inc_new_counter, inc_new_counter_debug},
solana_sdk::{
slot_history::Slot,
transaction::{SanitizedTransaction, TransactionError, TransactionResultNotifier},
},
std::sync::{Arc, RwLock},
};
#[derive(Debug)]
pub(crate) struct BankingTransactionResultImpl {
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
}
impl TransactionResultNotifier for BankingTransactionResultImpl {
fn notify_banking_transaction_result(
&self,
transaction: &SanitizedTransaction,
result: Option<TransactionError>,
slot: Slot,
) {
let mut measure = Measure::start("geyser-plugin-notify_plugins_of_entry_info");
let plugin_manager = self.plugin_manager.read().unwrap();
if plugin_manager.plugins.is_empty() {
return;
}
for plugin in plugin_manager.plugins.iter() {
if !plugin.banking_transaction_results_notifications_enabled() {
continue;
}
match plugin.notify_banking_stage_transaction_results(transaction, result.clone(), slot)
{
Err(err) => {
error!(
"Failed to notify banking transaction result, error: ({}) to plugin {}",
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully notified banking transaction result to plugin {}",
plugin.name()
);
}
}
}
measure.stop();
inc_new_counter_debug!(
"geyser-plugin-notify_plugins_of_banking_transaction_info-us",
measure.as_us() as usize,
10000,
10000
);
}
}
impl BankingTransactionResultImpl {
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
Self { plugin_manager }
}
}

View File

@ -64,6 +64,15 @@ impl GeyserPluginManager {
false
}
pub fn banking_transaction_result_notification_enabled(&self) -> bool {
for plugin in &self.plugins {
if plugin.banking_transaction_results_notifications_enabled() {
return true;
}
}
false
}
/// Admin RPC request handler
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())

View File

@ -1,6 +1,7 @@
use {
crate::{
accounts_update_notifier::AccountsUpdateNotifierImpl,
banking_transaction_result_notifier::BankingTransactionResultImpl,
block_metadata_notifier::BlockMetadataNotifierImpl,
block_metadata_notifier_interface::BlockMetadataNotifierLock,
entry_notifier::EntryNotifierImpl,
@ -17,6 +18,7 @@ use {
optimistically_confirmed_bank_tracker::SlotNotification,
transaction_notifier_interface::TransactionNotifierLock,
},
solana_sdk::transaction::BankingTransactionResultNotifier,
std::{
path::{Path, PathBuf},
sync::{
@ -37,6 +39,7 @@ pub struct GeyserPluginService {
transaction_notifier: Option<TransactionNotifierLock>,
entry_notifier: Option<EntryNotifierLock>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
}
impl GeyserPluginService {
@ -81,6 +84,8 @@ impl GeyserPluginService {
plugin_manager.account_data_notifications_enabled();
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
let entry_notifications_enabled = plugin_manager.entry_notifications_enabled();
let banking_stage_transaction_result_notification =
plugin_manager.banking_transaction_result_notification_enabled();
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
let accounts_update_notifier: Option<AccountsUpdateNotifier> =
@ -107,6 +112,17 @@ impl GeyserPluginService {
None
};
let transaction_result_notifier: Option<BankingTransactionResultNotifier> =
if banking_stage_transaction_result_notification {
let banking_transaction_result_notifier =
BankingTransactionResultImpl::new(plugin_manager.clone());
Some(BankingTransactionResultNotifier {
lock: Arc::new(RwLock::new(banking_transaction_result_notifier)),
})
} else {
None
};
let (slot_status_observer, block_metadata_notifier): (
Option<SlotStatusObserver>,
Option<BlockMetadataNotifierLock>,
@ -143,6 +159,7 @@ impl GeyserPluginService {
transaction_notifier,
entry_notifier,
block_metadata_notifier,
banking_transaction_result_notifier: transaction_result_notifier,
})
}
@ -168,6 +185,12 @@ impl GeyserPluginService {
self.entry_notifier.clone()
}
pub fn get_banking_transaction_result_notifier(
&self,
) -> Option<BankingTransactionResultNotifier> {
self.banking_transaction_result_notifier.clone()
}
pub fn get_block_metadata_notifier(&self) -> Option<BlockMetadataNotifierLock> {
self.block_metadata_notifier.clone()
}

View File

@ -1,4 +1,5 @@
pub mod accounts_update_notifier;
pub mod banking_transaction_result_notifier;
pub mod block_metadata_notifier;
pub mod block_metadata_notifier_interface;
pub mod entry_notifier;
@ -7,5 +8,4 @@ pub mod geyser_plugin_service;
pub mod slot_status_notifier;
pub mod slot_status_observer;
pub mod transaction_notifier;
pub use geyser_plugin_manager::GeyserPluginManagerRequest;

@ -1 +1 @@
Subproject commit 4a23d81bd621084d3b8596b35e0457b954a74f98
Subproject commit 180be58cafcbbe3dac66bbe6428579d58d58f683

View File

@ -167,8 +167,9 @@ use {
sysvar::{self, last_restart_slot::LastRestartSlot, Sysvar, SysvarId},
timing::years_as_slots,
transaction::{
self, MessageHash, Result, SanitizedTransaction, Transaction, TransactionError,
TransactionVerificationMode, VersionedTransaction, MAX_TX_ACCOUNT_LOCKS,
self, BankingTransactionResultNotifier, MessageHash, Result, SanitizedTransaction,
Transaction, TransactionError, TransactionVerificationMode, VersionedTransaction,
MAX_TX_ACCOUNT_LOCKS,
},
transaction_context::{
ExecutionRecord, TransactionAccount, TransactionContext, TransactionReturnData,
@ -583,6 +584,7 @@ impl PartialEq for Bank {
loaded_programs_cache: _,
check_program_modification_slot: _,
epoch_reward_status: _,
banking_transaction_result_notifier: _,
// Ignore new fields explicitly if they do not impact PartialEq.
// Adding ".." will remove compile-time checks that if a new field
// is added to the struct, this PartialEq is accordingly updated.
@ -841,6 +843,9 @@ pub struct Bank {
pub check_program_modification_slot: bool,
epoch_reward_status: EpochRewardStatus,
/// geyser plugin to notify transaction results
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
}
struct VoteWithStakeDelegations {
@ -1091,6 +1096,7 @@ impl Bank {
))),
check_program_modification_slot: false,
epoch_reward_status: EpochRewardStatus::default(),
banking_transaction_result_notifier: None,
};
let accounts_data_size_initial = bank.get_total_accounts_stats().unwrap().data_len as u64;
@ -1442,6 +1448,7 @@ impl Bank {
loaded_programs_cache: parent.loaded_programs_cache.clone(),
check_program_modification_slot: false,
epoch_reward_status: parent.epoch_reward_status.clone(),
banking_transaction_result_notifier: None,
};
let (_, ancestors_time_us) = measure_us!({
@ -1943,6 +1950,7 @@ impl Bank {
))),
check_program_modification_slot: false,
epoch_reward_status: EpochRewardStatus::default(),
banking_transaction_result_notifier: None,
};
bank.finish_init(
genesis_config,
@ -4194,6 +4202,9 @@ impl Bank {
let mut status_cache = self.status_cache.write().unwrap();
assert_eq!(sanitized_txs.len(), execution_results.len());
for (tx, execution_result) in sanitized_txs.iter().zip(execution_results) {
if let TransactionExecutionResult::NotExecuted(err) = execution_result {
self.notify_transaction_error(tx, Some(err.clone()));
}
if let Some(details) = execution_result.details() {
// Add the message hash to the status cache to ensure that this message
// won't be processed again with a different signature.
@ -4565,6 +4576,7 @@ impl Bank {
(Ok(()), Some(NoncePartial::new(address, account)))
} else {
error_counters.blockhash_not_found += 1;
self.notify_transaction_error(tx, Some(TransactionError::BlockhashNotFound));
(Err(TransactionError::BlockhashNotFound), None)
}
}
@ -4596,6 +4608,10 @@ impl Bank {
&& self.is_transaction_already_processed(sanitized_tx, &rcache)
{
error_counters.already_processed += 1;
self.notify_transaction_error(
sanitized_tx,
Some(TransactionError::AlreadyProcessed),
);
return (Err(TransactionError::AlreadyProcessed), None);
}
@ -5055,6 +5071,7 @@ impl Bank {
error_counters.instruction_error += 1;
}
}
self.notify_transaction_error(tx, Some(err.clone()));
err
});
@ -5086,6 +5103,7 @@ impl Bank {
.is_none()
{
status = Err(TransactionError::UnbalancedTransaction);
self.notify_transaction_error(tx, Some(TransactionError::UnbalancedTransaction));
}
let mut accounts_data_len_delta = status
.as_ref()
@ -5210,6 +5228,23 @@ impl Bank {
loaded_programs_for_txs.unwrap()
}
pub fn notify_transaction_error(
&self,
transaction: &SanitizedTransaction,
result: Option<TransactionError>,
) {
if let Some(transaction_result_notifier_lock) = &self.banking_transaction_result_notifier {
let transaction_error_notifier = transaction_result_notifier_lock.lock.read();
if let Ok(transaction_error_notifier) = transaction_error_notifier {
transaction_error_notifier.notify_banking_transaction_result(
transaction,
result,
self.slot,
);
}
}
}
#[allow(clippy::type_complexity)]
pub fn load_and_execute_transactions(
&self,
@ -5262,6 +5297,23 @@ impl Bank {
})
.collect();
if let Some(transaction_result_notifier_lock) = &self.banking_transaction_result_notifier {
let transaction_error_notifier = transaction_result_notifier_lock.lock.read();
if let Ok(transaction_error_notifier) = transaction_error_notifier {
batch
.sanitized_transactions()
.iter()
.zip(batch.lock_results())
.for_each(|(transaction, result)| {
transaction_error_notifier.notify_banking_transaction_result(
transaction,
result.clone().err(),
self.slot,
);
});
}
}
let mut check_time = Measure::start("check_transactions");
let mut check_results = self.check_transactions(
sanitized_txs,
@ -8311,6 +8363,17 @@ impl Bank {
}
false
}
pub fn set_banking_transaction_results_notifier(
&mut self,
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
) {
self.banking_transaction_result_notifier = banking_transaction_result_notifier;
}
pub fn has_banking_transaction_results_notifier(&self) -> bool {
self.banking_transaction_result_notifier.is_some()
}
}
/// Compute how much an account has changed size. This function is useful when the data size delta

View File

@ -3,8 +3,14 @@ use {
instruction::InstructionError,
message::{AddressLoaderError, SanitizeMessageError},
sanitize::SanitizeError,
slot_history::Slot,
transaction::SanitizedTransaction,
},
serde::Serialize,
std::{
fmt::Debug,
sync::{Arc, RwLock},
},
thiserror::Error,
};
@ -198,3 +204,42 @@ impl From<AddressLoaderError> for TransactionError {
}
}
}
pub trait TransactionResultNotifier: Debug {
fn notify_banking_transaction_result(
&self,
transaction: &SanitizedTransaction,
error: Option<TransactionError>,
slot: Slot,
);
}
#[derive(Clone, Debug)]
pub struct BankingTransactionResultNotifier {
pub lock: Arc<RwLock<dyn TransactionResultNotifier + Sync + Send>>,
}
#[cfg(RUSTC_WITH_SPECIALIZATION)]
#[derive(Debug)]
struct DummyTransactionResultNotifier {}
#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl TransactionResultNotifier for DummyTransactionResultNotifier {
fn notify_banking_transaction_result(
&self,
_: &SanitizedTransaction,
_: Option<TransactionError>,
_: Slot,
) {
}
}
#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl solana_frozen_abi::abi_example::AbiExample for BankingTransactionResultNotifier {
fn example() -> Self {
// BankingTransactionResultNotifier isn't serializable by definition.
BankingTransactionResultNotifier {
lock: Arc::new(RwLock::new(DummyTransactionResultNotifier {})),
}
}
}

View File

@ -149,6 +149,7 @@ async fn run_server(
));
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked)));
let (sender, receiver) = async_unbounded();
tokio::spawn(packet_batch_sender(
packet_sender,
@ -980,6 +981,15 @@ pub enum ConnectionPeerType {
Staked,
}
impl ConnectionPeerType {
pub fn to_string(&self) -> String {
match self {
ConnectionPeerType::Unstaked => "unstaked".to_string(),
ConnectionPeerType::Staked => "staked".to_string(),
}
}
}
#[derive(Copy, Clone, Eq, Hash, PartialEq)]
enum ConnectionTableKey {
IP(IpAddr),
@ -992,6 +1002,13 @@ impl ConnectionTableKey {
ConnectionTableKey::Pubkey(pubkey)
})
}
pub fn to_string(&self) -> String {
match self {
ConnectionTableKey::IP(ip) => format!("ip:{}", ip),
ConnectionTableKey::Pubkey(pubkey) => format!("pubkey:{}", pubkey.to_string()),
}
}
}
// Map of IP to list of connection entries
@ -1022,7 +1039,14 @@ impl ConnectionTable {
None => break,
Some((index, connections)) => {
num_pruned += connections.len();
self.table.swap_remove_index(index);
if let Some((k, v)) = self.table.swap_remove_index(index) {
log::info!(
"QUIC Connection dropped {}, {}, count : {} ",
self.peer_type.to_string(),
k.to_string(),
v.len()
);
}
}
}
}
@ -1049,7 +1073,19 @@ impl ConnectionTable {
.take(sample_size)
.min_by_key(|&(_, stake)| stake)
.filter(|&(_, stake)| stake < Some(threshold_stake))
.and_then(|(index, _)| self.table.swap_remove_index(index))
.and_then(|(index, _)| {
if let Some((k, v)) = self.table.swap_remove_index(index) {
log::info!(
"QUIC Connection dropped {}, {}, count : {} ",
self.peer_type.to_string(),
k.to_string(),
v.len()
);
Some((k, v))
} else {
None
}
})
.map(|(_, connections)| connections.len())
.unwrap_or_default();
self.total_size = self.total_size.saturating_sub(num_pruned);
@ -1072,6 +1108,12 @@ impl ConnectionTable {
.map(|c| c <= max_connections_per_peer)
.unwrap_or(false);
if has_connection_capacity {
log::info!(
"QUIC Connection added {}, {}, count : {} ",
self.peer_type.to_string(),
key.to_string(),
connection_entry.len() + 1
);
let exit = Arc::new(AtomicBool::new(false));
let last_update = Arc::new(AtomicU64::new(last_update));
connection_entry.push(ConnectionEntry::new(
@ -1119,6 +1161,13 @@ impl ConnectionTable {
}
let connections_removed = old_size.saturating_sub(new_size);
self.total_size = self.total_size.saturating_sub(connections_removed);
log::info!(
"QUIC Connection added {}, {}, count : {} ",
self.peer_type.to_string(),
key.to_string(),
new_size
);
connections_removed
} else {
0