Add mango patches to jito / v1.17.15, hash c1040b012
This commit is contained in:
parent
405550972d
commit
5eb1fdf4ec
|
@ -1,7 +1,6 @@
|
|||
//! The `banking_stage` processes Transaction messages. It is intended to be used
|
||||
//! to construct a software pipeline. The stage uses all available CPU cores and
|
||||
//! can do its processing in parallel with signature verification on the GPU.
|
||||
|
||||
use {
|
||||
self::{
|
||||
committer::Committer,
|
||||
|
@ -504,7 +503,10 @@ impl BankingStage {
|
|||
}
|
||||
let (decision, make_decision_time) =
|
||||
measure!(decision_maker.make_consume_or_forward_decision());
|
||||
let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(decision.bank_start());
|
||||
let metrics_action = slot_metrics_tracker.check_leader_slot_boundary(
|
||||
decision.bank_start(),
|
||||
Some(unprocessed_transaction_storage),
|
||||
);
|
||||
slot_metrics_tracker.increment_make_decision_us(make_decision_time.as_us());
|
||||
|
||||
match decision {
|
||||
|
|
|
@ -25,7 +25,9 @@ use {
|
|||
transaction_batch::TransactionBatch,
|
||||
},
|
||||
solana_sdk::{
|
||||
borsh0_10::try_from_slice_unchecked,
|
||||
clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE},
|
||||
compute_budget::{self, ComputeBudgetInstruction},
|
||||
feature_set,
|
||||
pubkey::Pubkey,
|
||||
saturating_add_assign,
|
||||
|
@ -68,6 +70,8 @@ pub struct ExecuteAndCommitTransactionsOutput {
|
|||
pub commit_transactions_result: Result<Vec<CommitTransactionDetails>, PohRecorderError>,
|
||||
execute_and_commit_timings: LeaderExecuteAndCommitTimings,
|
||||
error_counters: TransactionErrorMetrics,
|
||||
scheduled_min_prioritization_fees: usize,
|
||||
scheduled_max_prioritization_fees: usize,
|
||||
}
|
||||
|
||||
pub struct Consumer {
|
||||
|
@ -297,6 +301,8 @@ impl Consumer {
|
|||
let mut total_execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
|
||||
let mut total_error_counters = TransactionErrorMetrics::default();
|
||||
let mut reached_max_poh_height = false;
|
||||
let mut total_scheduled_min_prioritization_fees: usize = usize::MAX;
|
||||
let mut total_scheduled_max_prioritization_fees: usize = 0;
|
||||
while chunk_start != transactions.len() {
|
||||
let chunk_end = std::cmp::min(
|
||||
transactions.len(),
|
||||
|
@ -327,6 +333,8 @@ impl Consumer {
|
|||
commit_transactions_result: new_commit_transactions_result,
|
||||
execute_and_commit_timings: new_execute_and_commit_timings,
|
||||
error_counters: new_error_counters,
|
||||
scheduled_min_prioritization_fees,
|
||||
scheduled_max_prioritization_fees,
|
||||
..
|
||||
} = execute_and_commit_transactions_output;
|
||||
|
||||
|
@ -336,6 +344,14 @@ impl Consumer {
|
|||
total_transactions_attempted_execution_count,
|
||||
new_transactions_attempted_execution_count
|
||||
);
|
||||
total_scheduled_min_prioritization_fees = std::cmp::min(
|
||||
total_scheduled_min_prioritization_fees,
|
||||
scheduled_min_prioritization_fees,
|
||||
);
|
||||
total_scheduled_max_prioritization_fees = std::cmp::min(
|
||||
total_scheduled_max_prioritization_fees,
|
||||
scheduled_max_prioritization_fees,
|
||||
);
|
||||
|
||||
trace!(
|
||||
"process_transactions result: {:?}",
|
||||
|
@ -396,6 +412,8 @@ impl Consumer {
|
|||
cost_model_us: total_cost_model_us,
|
||||
execute_and_commit_timings: total_execute_and_commit_timings,
|
||||
error_counters: total_error_counters,
|
||||
scheduled_min_prioritization_fees: total_scheduled_min_prioritization_fees,
|
||||
scheduled_max_prioritization_fees: total_scheduled_max_prioritization_fees,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -425,7 +443,13 @@ impl Consumer {
|
|||
// Re-sanitized transaction should be equal to the original transaction,
|
||||
// but whether it will pass sanitization needs to be checked.
|
||||
let resanitized_tx =
|
||||
bank.fully_verify_transaction(tx.to_versioned_transaction())?;
|
||||
match bank.fully_verify_transaction(tx.to_versioned_transaction()) {
|
||||
Ok(resanitized_tx) => resanitized_tx,
|
||||
Err(e) => {
|
||||
bank.notify_transaction_error(tx, Some(e.clone()));
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
if resanitized_tx != *tx {
|
||||
// Sanitization before/after epoch give different transaction data - do not execute.
|
||||
return Err(TransactionError::ResanitizationNeeded);
|
||||
|
@ -563,6 +587,33 @@ impl Consumer {
|
|||
});
|
||||
execute_and_commit_timings.collect_balances_us = collect_balances_us;
|
||||
|
||||
let min_max = batch
|
||||
.sanitized_transactions()
|
||||
.iter()
|
||||
.filter_map(|transaction| {
|
||||
let message = transaction.message();
|
||||
for (program_id, instruction) in message.program_instructions_iter() {
|
||||
if compute_budget::check_id(program_id) {
|
||||
match try_from_slice_unchecked(&instruction.data) {
|
||||
Ok(ComputeBudgetInstruction::SetComputeUnitPrice(micro_lamports)) => {
|
||||
return Some(micro_lamports);
|
||||
}
|
||||
Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
|
||||
additional_fee,
|
||||
..
|
||||
}) => {
|
||||
return Some(additional_fee as u64);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
.minmax();
|
||||
let (scheduled_min_prioritization_fees, scheduled_max_prioritization_fees) =
|
||||
min_max.into_option().unwrap_or_default();
|
||||
|
||||
let (load_and_execute_transactions_output, load_execute_us) = measure_us!(bank
|
||||
.load_and_execute_transactions(
|
||||
batch,
|
||||
|
@ -638,6 +689,8 @@ impl Consumer {
|
|||
commit_transactions_result: Err(recorder_err),
|
||||
execute_and_commit_timings,
|
||||
error_counters,
|
||||
scheduled_min_prioritization_fees: scheduled_min_prioritization_fees as usize,
|
||||
scheduled_max_prioritization_fees: scheduled_max_prioritization_fees as usize,
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -693,6 +746,8 @@ impl Consumer {
|
|||
commit_transactions_result: Ok(commit_transaction_statuses),
|
||||
execute_and_commit_timings,
|
||||
error_counters,
|
||||
scheduled_min_prioritization_fees: scheduled_min_prioritization_fees as usize,
|
||||
scheduled_max_prioritization_fees: scheduled_max_prioritization_fees as usize,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
use {
|
||||
super::{
|
||||
leader_slot_timing_metrics::{LeaderExecuteAndCommitTimings, LeaderSlotTimingMetrics},
|
||||
unprocessed_transaction_storage::InsertPacketBatchSummary,
|
||||
unprocessed_transaction_storage::{
|
||||
InsertPacketBatchSummary, UnprocessedTransactionStorage,
|
||||
},
|
||||
},
|
||||
solana_accounts_db::transaction_error_metrics::*,
|
||||
solana_poh::poh_recorder::BankStart,
|
||||
|
@ -52,6 +54,53 @@ pub(crate) struct ProcessTransactionsSummary {
|
|||
|
||||
// Breakdown of all the transaction errors from transactions passed for execution
|
||||
pub error_counters: TransactionErrorMetrics,
|
||||
|
||||
pub scheduled_min_prioritization_fees: usize,
|
||||
pub scheduled_max_prioritization_fees: usize,
|
||||
}
|
||||
|
||||
// Metrics describing prioritization fee information for each transaction storage before processing transactions
|
||||
#[derive(Debug, Default)]
|
||||
struct LeaderPrioritizationFeesMetrics {
|
||||
// minimum prioritization fees in the MinMaxHeap
|
||||
min_prioritization_fees_per_cu: u64,
|
||||
// maximum prioritization fees in the MinMaxHeap
|
||||
max_prioritization_fees_per_cu: u64,
|
||||
}
|
||||
|
||||
impl LeaderPrioritizationFeesMetrics {
|
||||
fn new(unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>) -> Self {
|
||||
if let Some(unprocessed_transaction_storage) = unprocessed_transaction_storage {
|
||||
Self {
|
||||
min_prioritization_fees_per_cu: unprocessed_transaction_storage
|
||||
.get_min_priority()
|
||||
.unwrap_or_default(),
|
||||
max_prioritization_fees_per_cu: unprocessed_transaction_storage
|
||||
.get_max_priority()
|
||||
.unwrap_or_default(),
|
||||
}
|
||||
} else {
|
||||
Self::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn report(&self, id: u32, slot: Slot) {
|
||||
datapoint_info!(
|
||||
"banking_stage-leader_prioritization_fees_info_by_thread",
|
||||
("id", id as i64, i64),
|
||||
("slot", slot as i64, i64),
|
||||
(
|
||||
"min_prioritization_fees_per_cu",
|
||||
self.min_prioritization_fees_per_cu as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"max_prioritization_fees_per_cu",
|
||||
self.max_prioritization_fees_per_cu as i64,
|
||||
i64
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Metrics describing packets ingested/processed in various parts of BankingStage during this
|
||||
|
@ -138,6 +187,11 @@ struct LeaderSlotPacketCountMetrics {
|
|||
// total number of forwardable batches that were attempted for forwarding. A forwardable batch
|
||||
// is defined in `ForwardPacketBatchesByAccounts` in `forward_packet_batches_by_accounts.rs`
|
||||
forwardable_batches_count: u64,
|
||||
|
||||
// min prioritization fees for scheduled transactions
|
||||
scheduled_min_prioritization_fees: u64,
|
||||
// max prioritization fees for scheduled transactions
|
||||
scheduled_max_prioritization_fees: u64,
|
||||
}
|
||||
|
||||
impl LeaderSlotPacketCountMetrics {
|
||||
|
@ -255,6 +309,16 @@ impl LeaderSlotPacketCountMetrics {
|
|||
self.end_of_slot_unprocessed_buffer_len as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"scheduled_min_prioritization_fees",
|
||||
self.scheduled_min_prioritization_fees as i64,
|
||||
i64
|
||||
),
|
||||
(
|
||||
"scheduled_max_prioritization_fees",
|
||||
self.scheduled_max_prioritization_fees as i64,
|
||||
i64
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -277,12 +341,19 @@ pub(crate) struct LeaderSlotMetrics {
|
|||
|
||||
timing_metrics: LeaderSlotTimingMetrics,
|
||||
|
||||
prioritization_fees_metric: LeaderPrioritizationFeesMetrics,
|
||||
|
||||
// Used by tests to check if the `self.report()` method was called
|
||||
is_reported: bool,
|
||||
}
|
||||
|
||||
impl LeaderSlotMetrics {
|
||||
pub(crate) fn new(id: u32, slot: Slot, bank_creation_time: &Instant) -> Self {
|
||||
pub(crate) fn new(
|
||||
id: u32,
|
||||
slot: Slot,
|
||||
bank_creation_time: &Instant,
|
||||
unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>,
|
||||
) -> Self {
|
||||
Self {
|
||||
id,
|
||||
slot,
|
||||
|
@ -290,6 +361,9 @@ impl LeaderSlotMetrics {
|
|||
transaction_error_metrics: TransactionErrorMetrics::new(),
|
||||
vote_packet_count_metrics: VotePacketCountMetrics::new(),
|
||||
timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time),
|
||||
prioritization_fees_metric: LeaderPrioritizationFeesMetrics::new(
|
||||
unprocessed_transaction_storage,
|
||||
),
|
||||
is_reported: false,
|
||||
}
|
||||
}
|
||||
|
@ -301,6 +375,7 @@ impl LeaderSlotMetrics {
|
|||
self.transaction_error_metrics.report(self.id, self.slot);
|
||||
self.packet_count_metrics.report(self.id, self.slot);
|
||||
self.vote_packet_count_metrics.report(self.id, self.slot);
|
||||
self.prioritization_fees_metric.report(self.id, self.slot);
|
||||
}
|
||||
|
||||
/// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None
|
||||
|
@ -372,6 +447,7 @@ impl LeaderSlotMetricsTracker {
|
|||
pub(crate) fn check_leader_slot_boundary(
|
||||
&mut self,
|
||||
bank_start: Option<&BankStart>,
|
||||
unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>,
|
||||
) -> MetricsTrackerAction {
|
||||
match (self.leader_slot_metrics.as_mut(), bank_start) {
|
||||
(None, None) => MetricsTrackerAction::Noop,
|
||||
|
@ -387,6 +463,7 @@ impl LeaderSlotMetricsTracker {
|
|||
self.id,
|
||||
bank_start.working_bank.slot(),
|
||||
&bank_start.bank_creation_time,
|
||||
unprocessed_transaction_storage,
|
||||
)))
|
||||
}
|
||||
|
||||
|
@ -398,6 +475,7 @@ impl LeaderSlotMetricsTracker {
|
|||
self.id,
|
||||
bank_start.working_bank.slot(),
|
||||
&bank_start.bank_creation_time,
|
||||
unprocessed_transaction_storage,
|
||||
)))
|
||||
} else {
|
||||
MetricsTrackerAction::Noop
|
||||
|
@ -449,6 +527,8 @@ impl LeaderSlotMetricsTracker {
|
|||
cost_model_us,
|
||||
ref execute_and_commit_timings,
|
||||
error_counters,
|
||||
scheduled_min_prioritization_fees,
|
||||
scheduled_max_prioritization_fees,
|
||||
..
|
||||
} = process_transactions_summary;
|
||||
|
||||
|
@ -525,6 +605,23 @@ impl LeaderSlotMetricsTracker {
|
|||
*cost_model_us
|
||||
);
|
||||
|
||||
leader_slot_metrics
|
||||
.packet_count_metrics
|
||||
.scheduled_min_prioritization_fees = std::cmp::min(
|
||||
leader_slot_metrics
|
||||
.packet_count_metrics
|
||||
.scheduled_min_prioritization_fees,
|
||||
*scheduled_min_prioritization_fees as u64,
|
||||
);
|
||||
leader_slot_metrics
|
||||
.packet_count_metrics
|
||||
.scheduled_max_prioritization_fees = std::cmp::min(
|
||||
leader_slot_metrics
|
||||
.packet_count_metrics
|
||||
.scheduled_max_prioritization_fees,
|
||||
*scheduled_max_prioritization_fees as u64,
|
||||
);
|
||||
|
||||
leader_slot_metrics
|
||||
.timing_metrics
|
||||
.execute_and_commit_timings
|
||||
|
@ -896,7 +993,7 @@ mod tests {
|
|||
..
|
||||
} = setup_test_slot_boundary_banks();
|
||||
// Test that with no bank being tracked, and no new bank being tracked, nothing is reported
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
|
||||
assert_eq!(
|
||||
mem::discriminant(&MetricsTrackerAction::Noop),
|
||||
mem::discriminant(&action)
|
||||
|
@ -916,8 +1013,8 @@ mod tests {
|
|||
// Test case where the thread has not detected a leader bank, and now sees a leader bank.
|
||||
// Metrics should not be reported because leader slot has not ended
|
||||
assert!(leader_slot_metrics_tracker.leader_slot_metrics.is_none());
|
||||
let action =
|
||||
leader_slot_metrics_tracker.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
|
||||
let action = leader_slot_metrics_tracker
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
|
||||
assert_eq!(
|
||||
mem::discriminant(&MetricsTrackerAction::NewTracker(None)),
|
||||
mem::discriminant(&action)
|
||||
|
@ -941,12 +1038,12 @@ mod tests {
|
|||
{
|
||||
// Setup first_bank
|
||||
let action = leader_slot_metrics_tracker
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
|
||||
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
|
||||
}
|
||||
{
|
||||
// Assert reporting if slot has ended
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
|
||||
assert_eq!(
|
||||
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
|
||||
mem::discriminant(&action)
|
||||
|
@ -959,7 +1056,7 @@ mod tests {
|
|||
}
|
||||
{
|
||||
// Assert no-op if still no new bank
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
|
||||
assert_eq!(
|
||||
mem::discriminant(&MetricsTrackerAction::Noop),
|
||||
mem::discriminant(&action)
|
||||
|
@ -981,13 +1078,13 @@ mod tests {
|
|||
{
|
||||
// Setup with first_bank
|
||||
let action = leader_slot_metrics_tracker
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
|
||||
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
|
||||
}
|
||||
{
|
||||
// Assert nop-op if same bank
|
||||
let action = leader_slot_metrics_tracker
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
|
||||
assert_eq!(
|
||||
mem::discriminant(&MetricsTrackerAction::Noop),
|
||||
mem::discriminant(&action)
|
||||
|
@ -996,7 +1093,7 @@ mod tests {
|
|||
}
|
||||
{
|
||||
// Assert reporting if slot has ended
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
|
||||
assert_eq!(
|
||||
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
|
||||
mem::discriminant(&action)
|
||||
|
@ -1025,13 +1122,13 @@ mod tests {
|
|||
{
|
||||
// Setup with first_bank
|
||||
let action = leader_slot_metrics_tracker
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
|
||||
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
|
||||
}
|
||||
{
|
||||
// Assert reporting if new bank
|
||||
let action = leader_slot_metrics_tracker
|
||||
.check_leader_slot_boundary(Some(&next_poh_recorder_bank));
|
||||
.check_leader_slot_boundary(Some(&next_poh_recorder_bank), None);
|
||||
assert_eq!(
|
||||
mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)),
|
||||
mem::discriminant(&action)
|
||||
|
@ -1044,7 +1141,7 @@ mod tests {
|
|||
}
|
||||
{
|
||||
// Assert reporting if slot has ended
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
|
||||
assert_eq!(
|
||||
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
|
||||
mem::discriminant(&action)
|
||||
|
@ -1072,13 +1169,13 @@ mod tests {
|
|||
{
|
||||
// Setup with next_bank
|
||||
let action = leader_slot_metrics_tracker
|
||||
.check_leader_slot_boundary(Some(&next_poh_recorder_bank));
|
||||
.check_leader_slot_boundary(Some(&next_poh_recorder_bank), None);
|
||||
assert!(leader_slot_metrics_tracker.apply_action(action).is_none());
|
||||
}
|
||||
{
|
||||
// Assert reporting if new bank
|
||||
let action = leader_slot_metrics_tracker
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank));
|
||||
.check_leader_slot_boundary(Some(&first_poh_recorder_bank), None);
|
||||
assert_eq!(
|
||||
mem::discriminant(&MetricsTrackerAction::ReportAndNewTracker(None)),
|
||||
mem::discriminant(&action)
|
||||
|
@ -1091,7 +1188,7 @@ mod tests {
|
|||
}
|
||||
{
|
||||
// Assert reporting if slot has ended
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None);
|
||||
let action = leader_slot_metrics_tracker.check_leader_slot_boundary(None, None);
|
||||
assert_eq!(
|
||||
mem::discriminant(&MetricsTrackerAction::ReportAndResetTracker),
|
||||
mem::discriminant(&action)
|
||||
|
|
|
@ -193,6 +193,14 @@ impl UnprocessedPacketBatches {
|
|||
self.packet_priority_queue.is_empty()
|
||||
}
|
||||
|
||||
pub fn get_min_priority(&self) -> Option<u64> {
|
||||
self.packet_priority_queue.peek_min().map(|x| x.priority())
|
||||
}
|
||||
|
||||
pub fn get_max_priority(&self) -> Option<u64> {
|
||||
self.packet_priority_queue.peek_max().map(|x| x.priority())
|
||||
}
|
||||
|
||||
fn push_internal(&mut self, deserialized_packet: DeserializedPacket) {
|
||||
// Push into the priority queue
|
||||
self.packet_priority_queue
|
||||
|
|
|
@ -18,7 +18,7 @@ use {
|
|||
bundle_stage::bundle_stage_leader_metrics::BundleStageLeaderMetrics,
|
||||
immutable_deserialized_bundle::ImmutableDeserializedBundle,
|
||||
},
|
||||
itertools::Itertools,
|
||||
itertools::{Itertools, MinMaxResult},
|
||||
min_max_heap::MinMaxHeap,
|
||||
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
|
||||
solana_bundle::{bundle_execution::LoadAndExecuteBundleError, BundleExecutionError},
|
||||
|
@ -296,6 +296,32 @@ impl UnprocessedTransactionStorage {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn get_min_priority(&self) -> Option<u64> {
|
||||
match self {
|
||||
Self::VoteStorage(_) => None,
|
||||
Self::LocalTransactionStorage(transaction_storage) => {
|
||||
transaction_storage.get_min_priority()
|
||||
}
|
||||
Self::BundleStorage(bundle_storage) => bundle_storage
|
||||
.get_minmax_priorization_fees()
|
||||
.into_option()
|
||||
.map(|x| x.0),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_max_priority(&self) -> Option<u64> {
|
||||
match self {
|
||||
Self::VoteStorage(_) => None,
|
||||
Self::LocalTransactionStorage(transaction_storage) => {
|
||||
transaction_storage.get_max_priority()
|
||||
}
|
||||
Self::BundleStorage(bundle_storage) => bundle_storage
|
||||
.get_minmax_priorization_fees()
|
||||
.into_option()
|
||||
.map(|x| x.1),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the maximum number of packets a receive should accept
|
||||
pub fn max_receive_size(&self) -> usize {
|
||||
match self {
|
||||
|
@ -622,6 +648,14 @@ impl ThreadLocalUnprocessedPackets {
|
|||
self.unprocessed_packet_batches.len()
|
||||
}
|
||||
|
||||
pub fn get_min_priority(&self) -> Option<u64> {
|
||||
self.unprocessed_packet_batches.get_min_priority()
|
||||
}
|
||||
|
||||
pub fn get_max_priority(&self) -> Option<u64> {
|
||||
self.unprocessed_packet_batches.get_max_priority()
|
||||
}
|
||||
|
||||
fn max_receive_size(&self) -> usize {
|
||||
self.unprocessed_packet_batches.capacity() - self.unprocessed_packet_batches.len()
|
||||
}
|
||||
|
@ -1378,6 +1412,36 @@ impl BundleStorage {
|
|||
|
||||
sanitized_bundles
|
||||
}
|
||||
|
||||
pub fn get_minmax_priorization_fees(&self) -> MinMaxResult<u64> {
|
||||
let (min, max) = self
|
||||
.unprocessed_bundle_storage
|
||||
.iter()
|
||||
.map(|bundle| {
|
||||
bundle
|
||||
.get_minmax_priorization_fees()
|
||||
.into_option()
|
||||
.unwrap_or((u64::MAX, u64::MIN))
|
||||
})
|
||||
.fold((u64::MAX, u64::MIN), |(a, b), (c, d)| {
|
||||
(std::cmp::min(a, c), std::cmp::max(b, d))
|
||||
});
|
||||
|
||||
let (min_c, max_c) = self
|
||||
.cost_model_buffered_bundle_storage
|
||||
.iter()
|
||||
.map(|bundle| {
|
||||
bundle
|
||||
.get_minmax_priorization_fees()
|
||||
.into_option()
|
||||
.unwrap_or((u64::MAX, u64::MIN))
|
||||
})
|
||||
.fold((u64::MAX, u64::MIN), |(a, b), (c, d)| {
|
||||
(std::cmp::min(a, c), std::cmp::max(b, d))
|
||||
});
|
||||
|
||||
MinMaxResult::MinMax(std::cmp::min(min, min_c), std::cmp::max(max, max_c))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -384,8 +384,8 @@ impl BundleStage {
|
|||
let (decision, make_decision_time) =
|
||||
measure!(decision_maker.make_consume_or_forward_decision());
|
||||
|
||||
let (metrics_action, banking_stage_metrics_action) =
|
||||
bundle_stage_leader_metrics.check_leader_slot_boundary(decision.bank_start());
|
||||
let (metrics_action, banking_stage_metrics_action) = bundle_stage_leader_metrics
|
||||
.check_leader_slot_boundary(decision.bank_start(), Some(unprocessed_bundle_storage));
|
||||
bundle_stage_leader_metrics
|
||||
.leader_slot_metrics_tracker()
|
||||
.increment_make_decision_us(make_decision_time.as_us());
|
||||
|
|
|
@ -16,6 +16,7 @@ use {
|
|||
proxy::block_engine_stage::BlockBuilderFeeInfo,
|
||||
tip_manager::TipManager,
|
||||
},
|
||||
itertools::Itertools,
|
||||
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
|
||||
solana_bundle::{
|
||||
bundle_execution::{load_and_execute_bundle, BundleExecutionMetrics},
|
||||
|
@ -27,8 +28,10 @@ use {
|
|||
solana_poh::poh_recorder::{BankStart, RecordTransactionsSummary, TransactionRecorder},
|
||||
solana_runtime::bank::Bank,
|
||||
solana_sdk::{
|
||||
borsh0_10::try_from_slice_unchecked,
|
||||
bundle::SanitizedBundle,
|
||||
clock::{Slot, MAX_PROCESSING_AGE},
|
||||
compute_budget::{self, ComputeBudgetInstruction},
|
||||
feature_set,
|
||||
pubkey::Pubkey,
|
||||
transaction::{self},
|
||||
|
@ -508,6 +511,33 @@ impl BundleConsumer {
|
|||
sanitized_bundle.transactions.len()
|
||||
);
|
||||
|
||||
let minmax = sanitized_bundle
|
||||
.transactions
|
||||
.iter()
|
||||
.filter_map(|transaction| {
|
||||
let message = transaction.message();
|
||||
for (program_id, instruction) in message.program_instructions_iter() {
|
||||
if compute_budget::check_id(program_id) {
|
||||
match try_from_slice_unchecked(&instruction.data) {
|
||||
Ok(ComputeBudgetInstruction::SetComputeUnitPrice(micro_lamports)) => {
|
||||
return Some(micro_lamports as usize);
|
||||
}
|
||||
Ok(ComputeBudgetInstruction::RequestUnitsDeprecated {
|
||||
additional_fee,
|
||||
..
|
||||
}) => {
|
||||
return Some(additional_fee as usize);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
.minmax();
|
||||
let (scheduled_max_prioritization_fees, scheduled_min_prioritization_fees) =
|
||||
minmax.into_option().unwrap_or_default();
|
||||
|
||||
let (
|
||||
(transaction_qos_cost_results, _cost_model_throttled_transactions_count),
|
||||
cost_model_elapsed_us,
|
||||
|
@ -572,6 +602,8 @@ impl BundleConsumer {
|
|||
cost_model_us: cost_model_elapsed_us,
|
||||
execute_and_commit_timings: result.execute_and_commit_timings,
|
||||
error_counters: result.transaction_error_counter,
|
||||
scheduled_min_prioritization_fees,
|
||||
scheduled_max_prioritization_fees,
|
||||
});
|
||||
|
||||
match result.result {
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
use {
|
||||
crate::{
|
||||
banking_stage::{leader_slot_metrics, leader_slot_metrics::LeaderSlotMetricsTracker},
|
||||
banking_stage::{
|
||||
leader_slot_metrics, leader_slot_metrics::LeaderSlotMetricsTracker,
|
||||
unprocessed_transaction_storage::UnprocessedTransactionStorage,
|
||||
},
|
||||
immutable_deserialized_bundle::DeserializedBundleError,
|
||||
},
|
||||
solana_bundle::{bundle_execution::LoadAndExecuteBundleError, BundleExecutionError},
|
||||
|
@ -31,13 +34,14 @@ impl BundleStageLeaderMetrics {
|
|||
pub(crate) fn check_leader_slot_boundary(
|
||||
&mut self,
|
||||
bank_start: Option<&BankStart>,
|
||||
unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>,
|
||||
) -> (
|
||||
leader_slot_metrics::MetricsTrackerAction,
|
||||
MetricsTrackerAction,
|
||||
) {
|
||||
let banking_stage_metrics_action = self
|
||||
.leader_slot_metrics_tracker
|
||||
.check_leader_slot_boundary(bank_start);
|
||||
.check_leader_slot_boundary(bank_start, unprocessed_transaction_storage);
|
||||
let bundle_stage_metrics_action = self
|
||||
.bundle_stage_metrics_tracker
|
||||
.check_leader_slot_boundary(bank_start);
|
||||
|
|
|
@ -3,6 +3,7 @@ use {
|
|||
banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket,
|
||||
packet_bundle::PacketBundle,
|
||||
},
|
||||
itertools::{Itertools, MinMaxResult},
|
||||
solana_accounts_db::transaction_error_metrics::TransactionErrorMetrics,
|
||||
solana_perf::sigverify::verify_packet,
|
||||
solana_runtime::bank::Bank,
|
||||
|
@ -166,6 +167,10 @@ impl ImmutableDeserializedBundle {
|
|||
bundle_id: self.bundle_id.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_minmax_priorization_fees(&self) -> MinMaxResult<u64> {
|
||||
self.packets.iter().map(|packet| packet.priority()).minmax()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -73,7 +73,7 @@ use {
|
|||
saturating_add_assign,
|
||||
signature::{Keypair, Signature, Signer},
|
||||
timing::timestamp,
|
||||
transaction::Transaction,
|
||||
transaction::{BankingTransactionResultNotifier, Transaction},
|
||||
},
|
||||
solana_vote::vote_sender_types::ReplayVoteSender,
|
||||
solana_vote_program::vote_state::VoteTransaction,
|
||||
|
@ -249,6 +249,7 @@ pub struct ReplayStageConfig {
|
|||
// duplicate voting which can lead to slashing.
|
||||
pub wait_to_vote_slot: Option<Slot>,
|
||||
pub replay_slots_concurrently: bool,
|
||||
pub banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
@ -531,6 +532,7 @@ impl ReplayStage {
|
|||
tower_storage,
|
||||
wait_to_vote_slot,
|
||||
replay_slots_concurrently,
|
||||
banking_transaction_result_notifier: banking_transaction_result_notifier_lock,
|
||||
} = config;
|
||||
|
||||
trace!("replay stage");
|
||||
|
@ -1054,6 +1056,7 @@ impl ReplayStage {
|
|||
&banking_tracer,
|
||||
has_new_vote_been_rooted,
|
||||
transaction_status_sender.is_some(),
|
||||
banking_transaction_result_notifier_lock.clone(),
|
||||
);
|
||||
|
||||
let poh_bank = poh_recorder.read().unwrap().bank();
|
||||
|
@ -1913,6 +1916,7 @@ impl ReplayStage {
|
|||
banking_tracer: &Arc<BankingTracer>,
|
||||
has_new_vote_been_rooted: bool,
|
||||
track_transaction_indexes: bool,
|
||||
banking_transaction_result_notifier_lock: Option<BankingTransactionResultNotifier>,
|
||||
) {
|
||||
// all the individual calls to poh_recorder.read() are designed to
|
||||
// increase granularity, decrease contention
|
||||
|
@ -2023,7 +2027,7 @@ impl ReplayStage {
|
|||
false
|
||||
};
|
||||
|
||||
let tpu_bank = Self::new_bank_from_parent_with_notify(
|
||||
let mut tpu_bank = Self::new_bank_from_parent_with_notify(
|
||||
parent.clone(),
|
||||
poh_slot,
|
||||
root_slot,
|
||||
|
@ -2031,6 +2035,11 @@ impl ReplayStage {
|
|||
rpc_subscriptions,
|
||||
NewBankOptions { vote_only_bank },
|
||||
);
|
||||
if banking_transaction_result_notifier_lock.is_some() {
|
||||
tpu_bank.set_banking_transaction_results_notifier(
|
||||
banking_transaction_result_notifier_lock,
|
||||
);
|
||||
}
|
||||
// make sure parent is frozen for finalized hashes via the above
|
||||
// new()-ing of its child bank
|
||||
banking_tracer.hash_event(parent.slot(), &parent.last_blockhash(), &parent.hash());
|
||||
|
|
|
@ -45,7 +45,10 @@ use {
|
|||
accounts_background_service::AbsRequestSender, bank_forks::BankForks,
|
||||
commitment::BlockCommitmentCache, prioritization_fee_cache::PrioritizationFeeCache,
|
||||
},
|
||||
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
|
||||
solana_sdk::{
|
||||
clock::Slot, pubkey::Pubkey, signature::Keypair,
|
||||
transaction::BankingTransactionResultNotifier,
|
||||
},
|
||||
solana_turbine::retransmit_stage::RetransmitStage,
|
||||
solana_vote::vote_sender_types::ReplayVoteSender,
|
||||
std::{
|
||||
|
@ -140,6 +143,7 @@ impl Tvu {
|
|||
turbine_quic_endpoint_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>,
|
||||
repair_quic_endpoint_sender: AsyncSender<LocalRequest>,
|
||||
shred_receiver_addr: Arc<RwLock<Option<SocketAddr>>>,
|
||||
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
|
||||
) -> Result<Self, String> {
|
||||
let TvuSockets {
|
||||
repair: repair_socket,
|
||||
|
@ -259,6 +263,7 @@ impl Tvu {
|
|||
tower_storage: tower_storage.clone(),
|
||||
wait_to_vote_slot,
|
||||
replay_slots_concurrently: tvu_config.replay_slots_concurrently,
|
||||
banking_transaction_result_notifier,
|
||||
};
|
||||
|
||||
let (voting_sender, voting_receiver) = unbounded();
|
||||
|
@ -495,6 +500,7 @@ pub mod tests {
|
|||
turbine_quic_endpoint_receiver,
|
||||
repair_quic_endpoint_sender,
|
||||
Arc::new(RwLock::new(None)),
|
||||
None,
|
||||
)
|
||||
.expect("assume success");
|
||||
exit.store(true, Ordering::Relaxed);
|
||||
|
|
|
@ -665,6 +665,13 @@ impl Validator {
|
|||
.as_ref()
|
||||
.and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());
|
||||
|
||||
let banking_transaction_results_notifier =
|
||||
geyser_plugin_service
|
||||
.as_ref()
|
||||
.and_then(|geyser_plugin_service| {
|
||||
geyser_plugin_service.get_banking_transaction_result_notifier()
|
||||
});
|
||||
|
||||
info!(
|
||||
"Geyser plugin: accounts_update_notifier: {}, \
|
||||
transaction_notifier: {}, \
|
||||
|
@ -1311,6 +1318,7 @@ impl Validator {
|
|||
turbine_quic_endpoint_receiver,
|
||||
repair_quic_endpoint_sender,
|
||||
config.shred_receiver_address.clone(),
|
||||
banking_transaction_results_notifier,
|
||||
)?;
|
||||
|
||||
let tpu = Tpu::new(
|
||||
|
|
|
@ -6,7 +6,7 @@ use {
|
|||
solana_sdk::{
|
||||
clock::{Slot, UnixTimestamp},
|
||||
signature::Signature,
|
||||
transaction::SanitizedTransaction,
|
||||
transaction::{SanitizedTransaction, TransactionError},
|
||||
},
|
||||
solana_transaction_status::{Reward, TransactionStatusMeta},
|
||||
std::{any::Any, error, io},
|
||||
|
@ -355,6 +355,17 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when we get banking stage errors.
|
||||
#[allow(unused_variables)]
|
||||
fn notify_banking_stage_transaction_results(
|
||||
&self,
|
||||
transaction: &SanitizedTransaction,
|
||||
error: Option<TransactionError>,
|
||||
slot: Slot,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if the plugin is interested in account data
|
||||
/// Default is true -- if the plugin is not interested in
|
||||
/// account data, please return false.
|
||||
|
@ -375,4 +386,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
|
|||
fn entry_notifications_enabled(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Check if the plugin is interesed in transaction errors duing banking
|
||||
/// stage Default is false -- if the plugin is interested in naking
|
||||
/// stage errors
|
||||
fn banking_transaction_results_notifications_enabled(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
use {
|
||||
crate::geyser_plugin_manager::GeyserPluginManager,
|
||||
log::{error, log_enabled, trace},
|
||||
solana_measure::measure::Measure,
|
||||
solana_metrics::{create_counter, inc_counter, inc_new_counter, inc_new_counter_debug},
|
||||
solana_sdk::{
|
||||
slot_history::Slot,
|
||||
transaction::{SanitizedTransaction, TransactionError, TransactionResultNotifier},
|
||||
},
|
||||
std::sync::{Arc, RwLock},
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct BankingTransactionResultImpl {
|
||||
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
|
||||
}
|
||||
|
||||
impl TransactionResultNotifier for BankingTransactionResultImpl {
|
||||
fn notify_banking_transaction_result(
|
||||
&self,
|
||||
transaction: &SanitizedTransaction,
|
||||
result: Option<TransactionError>,
|
||||
slot: Slot,
|
||||
) {
|
||||
let mut measure = Measure::start("geyser-plugin-notify_plugins_of_entry_info");
|
||||
|
||||
let plugin_manager = self.plugin_manager.read().unwrap();
|
||||
if plugin_manager.plugins.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
for plugin in plugin_manager.plugins.iter() {
|
||||
if !plugin.banking_transaction_results_notifications_enabled() {
|
||||
continue;
|
||||
}
|
||||
match plugin.notify_banking_stage_transaction_results(transaction, result.clone(), slot)
|
||||
{
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Failed to notify banking transaction result, error: ({}) to plugin {}",
|
||||
err,
|
||||
plugin.name()
|
||||
)
|
||||
}
|
||||
Ok(_) => {
|
||||
trace!(
|
||||
"Successfully notified banking transaction result to plugin {}",
|
||||
plugin.name()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
measure.stop();
|
||||
inc_new_counter_debug!(
|
||||
"geyser-plugin-notify_plugins_of_banking_transaction_info-us",
|
||||
measure.as_us() as usize,
|
||||
10000,
|
||||
10000
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
impl BankingTransactionResultImpl {
|
||||
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
|
||||
Self { plugin_manager }
|
||||
}
|
||||
}
|
|
@ -100,6 +100,15 @@ impl GeyserPluginManager {
|
|||
false
|
||||
}
|
||||
|
||||
pub fn banking_transaction_result_notification_enabled(&self) -> bool {
|
||||
for plugin in &self.plugins {
|
||||
if plugin.banking_transaction_results_notifications_enabled() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Admin RPC request handler
|
||||
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
|
||||
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use {
|
||||
crate::{
|
||||
accounts_update_notifier::AccountsUpdateNotifierImpl,
|
||||
banking_transaction_result_notifier::BankingTransactionResultImpl,
|
||||
block_metadata_notifier::BlockMetadataNotifierImpl,
|
||||
block_metadata_notifier_interface::BlockMetadataNotifierLock,
|
||||
entry_notifier::EntryNotifierImpl,
|
||||
|
@ -17,6 +18,7 @@ use {
|
|||
optimistically_confirmed_bank_tracker::SlotNotification,
|
||||
transaction_notifier_interface::TransactionNotifierLock,
|
||||
},
|
||||
solana_sdk::transaction::BankingTransactionResultNotifier,
|
||||
std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::{
|
||||
|
@ -37,6 +39,7 @@ pub struct GeyserPluginService {
|
|||
transaction_notifier: Option<TransactionNotifierLock>,
|
||||
entry_notifier: Option<EntryNotifierLock>,
|
||||
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
|
||||
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
|
||||
}
|
||||
|
||||
impl GeyserPluginService {
|
||||
|
@ -81,6 +84,8 @@ impl GeyserPluginService {
|
|||
plugin_manager.account_data_notifications_enabled();
|
||||
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
|
||||
let entry_notifications_enabled = plugin_manager.entry_notifications_enabled();
|
||||
let banking_stage_transaction_result_notification =
|
||||
plugin_manager.banking_transaction_result_notification_enabled();
|
||||
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
|
||||
|
||||
let accounts_update_notifier: Option<AccountsUpdateNotifier> =
|
||||
|
@ -107,6 +112,17 @@ impl GeyserPluginService {
|
|||
None
|
||||
};
|
||||
|
||||
let transaction_result_notifier: Option<BankingTransactionResultNotifier> =
|
||||
if banking_stage_transaction_result_notification {
|
||||
let banking_transaction_result_notifier =
|
||||
BankingTransactionResultImpl::new(plugin_manager.clone());
|
||||
Some(BankingTransactionResultNotifier {
|
||||
lock: Arc::new(RwLock::new(banking_transaction_result_notifier)),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let (slot_status_observer, block_metadata_notifier): (
|
||||
Option<SlotStatusObserver>,
|
||||
Option<BlockMetadataNotifierLock>,
|
||||
|
@ -143,6 +159,7 @@ impl GeyserPluginService {
|
|||
transaction_notifier,
|
||||
entry_notifier,
|
||||
block_metadata_notifier,
|
||||
banking_transaction_result_notifier: transaction_result_notifier,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -168,6 +185,12 @@ impl GeyserPluginService {
|
|||
self.entry_notifier.clone()
|
||||
}
|
||||
|
||||
pub fn get_banking_transaction_result_notifier(
|
||||
&self,
|
||||
) -> Option<BankingTransactionResultNotifier> {
|
||||
self.banking_transaction_result_notifier.clone()
|
||||
}
|
||||
|
||||
pub fn get_block_metadata_notifier(&self) -> Option<BlockMetadataNotifierLock> {
|
||||
self.block_metadata_notifier.clone()
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
pub mod accounts_update_notifier;
|
||||
pub mod banking_transaction_result_notifier;
|
||||
pub mod block_metadata_notifier;
|
||||
pub mod block_metadata_notifier_interface;
|
||||
pub mod entry_notifier;
|
||||
|
@ -7,5 +8,4 @@ pub mod geyser_plugin_service;
|
|||
pub mod slot_status_notifier;
|
||||
pub mod slot_status_observer;
|
||||
pub mod transaction_notifier;
|
||||
|
||||
pub use geyser_plugin_manager::GeyserPluginManagerRequest;
|
||||
|
|
|
@ -167,8 +167,9 @@ use {
|
|||
sysvar::{self, last_restart_slot::LastRestartSlot, Sysvar, SysvarId},
|
||||
timing::years_as_slots,
|
||||
transaction::{
|
||||
self, MessageHash, Result, SanitizedTransaction, Transaction, TransactionError,
|
||||
TransactionVerificationMode, VersionedTransaction, MAX_TX_ACCOUNT_LOCKS,
|
||||
self, BankingTransactionResultNotifier, MessageHash, Result, SanitizedTransaction,
|
||||
Transaction, TransactionError, TransactionVerificationMode, VersionedTransaction,
|
||||
MAX_TX_ACCOUNT_LOCKS,
|
||||
},
|
||||
transaction_context::{
|
||||
ExecutionRecord, TransactionAccount, TransactionContext, TransactionReturnData,
|
||||
|
@ -583,6 +584,7 @@ impl PartialEq for Bank {
|
|||
loaded_programs_cache: _,
|
||||
check_program_modification_slot: _,
|
||||
epoch_reward_status: _,
|
||||
banking_transaction_result_notifier: _,
|
||||
// Ignore new fields explicitly if they do not impact PartialEq.
|
||||
// Adding ".." will remove compile-time checks that if a new field
|
||||
// is added to the struct, this PartialEq is accordingly updated.
|
||||
|
@ -841,6 +843,9 @@ pub struct Bank {
|
|||
pub check_program_modification_slot: bool,
|
||||
|
||||
epoch_reward_status: EpochRewardStatus,
|
||||
|
||||
/// geyser plugin to notify transaction results
|
||||
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
|
||||
}
|
||||
|
||||
struct VoteWithStakeDelegations {
|
||||
|
@ -1091,6 +1096,7 @@ impl Bank {
|
|||
))),
|
||||
check_program_modification_slot: false,
|
||||
epoch_reward_status: EpochRewardStatus::default(),
|
||||
banking_transaction_result_notifier: None,
|
||||
};
|
||||
|
||||
let accounts_data_size_initial = bank.get_total_accounts_stats().unwrap().data_len as u64;
|
||||
|
@ -1442,6 +1448,7 @@ impl Bank {
|
|||
loaded_programs_cache: parent.loaded_programs_cache.clone(),
|
||||
check_program_modification_slot: false,
|
||||
epoch_reward_status: parent.epoch_reward_status.clone(),
|
||||
banking_transaction_result_notifier: None,
|
||||
};
|
||||
|
||||
let (_, ancestors_time_us) = measure_us!({
|
||||
|
@ -1943,6 +1950,7 @@ impl Bank {
|
|||
))),
|
||||
check_program_modification_slot: false,
|
||||
epoch_reward_status: EpochRewardStatus::default(),
|
||||
banking_transaction_result_notifier: None,
|
||||
};
|
||||
bank.finish_init(
|
||||
genesis_config,
|
||||
|
@ -4194,6 +4202,9 @@ impl Bank {
|
|||
let mut status_cache = self.status_cache.write().unwrap();
|
||||
assert_eq!(sanitized_txs.len(), execution_results.len());
|
||||
for (tx, execution_result) in sanitized_txs.iter().zip(execution_results) {
|
||||
if let TransactionExecutionResult::NotExecuted(err) = execution_result {
|
||||
self.notify_transaction_error(tx, Some(err.clone()));
|
||||
}
|
||||
if let Some(details) = execution_result.details() {
|
||||
// Add the message hash to the status cache to ensure that this message
|
||||
// won't be processed again with a different signature.
|
||||
|
@ -4573,6 +4584,7 @@ impl Bank {
|
|||
(Ok(()), Some(NoncePartial::new(address, account)))
|
||||
} else {
|
||||
error_counters.blockhash_not_found += 1;
|
||||
self.notify_transaction_error(tx, Some(TransactionError::BlockhashNotFound));
|
||||
(Err(TransactionError::BlockhashNotFound), None)
|
||||
}
|
||||
}
|
||||
|
@ -4604,6 +4616,10 @@ impl Bank {
|
|||
&& self.is_transaction_already_processed(sanitized_tx, &rcache)
|
||||
{
|
||||
error_counters.already_processed += 1;
|
||||
self.notify_transaction_error(
|
||||
sanitized_tx,
|
||||
Some(TransactionError::AlreadyProcessed),
|
||||
);
|
||||
return (Err(TransactionError::AlreadyProcessed), None);
|
||||
}
|
||||
|
||||
|
@ -5063,6 +5079,7 @@ impl Bank {
|
|||
error_counters.instruction_error += 1;
|
||||
}
|
||||
}
|
||||
self.notify_transaction_error(tx, Some(err.clone()));
|
||||
err
|
||||
});
|
||||
|
||||
|
@ -5094,6 +5111,7 @@ impl Bank {
|
|||
.is_none()
|
||||
{
|
||||
status = Err(TransactionError::UnbalancedTransaction);
|
||||
self.notify_transaction_error(tx, Some(TransactionError::UnbalancedTransaction));
|
||||
}
|
||||
let mut accounts_data_len_delta = status
|
||||
.as_ref()
|
||||
|
@ -5227,6 +5245,24 @@ impl Bank {
|
|||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments, clippy::type_complexity)]
|
||||
pub fn notify_transaction_error(
|
||||
&self,
|
||||
transaction: &SanitizedTransaction,
|
||||
result: Option<TransactionError>,
|
||||
) {
|
||||
if let Some(transaction_result_notifier_lock) = &self.banking_transaction_result_notifier {
|
||||
let transaction_error_notifier = transaction_result_notifier_lock.lock.read();
|
||||
if let Ok(transaction_error_notifier) = transaction_error_notifier {
|
||||
transaction_error_notifier.notify_banking_transaction_result(
|
||||
transaction,
|
||||
result,
|
||||
self.slot,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
pub fn load_and_execute_transactions(
|
||||
&self,
|
||||
batch: &TransactionBatch,
|
||||
|
@ -5279,6 +5315,23 @@ impl Bank {
|
|||
})
|
||||
.collect();
|
||||
|
||||
if let Some(transaction_result_notifier_lock) = &self.banking_transaction_result_notifier {
|
||||
let transaction_error_notifier = transaction_result_notifier_lock.lock.read();
|
||||
if let Ok(transaction_error_notifier) = transaction_error_notifier {
|
||||
batch
|
||||
.sanitized_transactions()
|
||||
.iter()
|
||||
.zip(batch.lock_results())
|
||||
.for_each(|(transaction, result)| {
|
||||
transaction_error_notifier.notify_banking_transaction_result(
|
||||
transaction,
|
||||
result.clone().err(),
|
||||
self.slot,
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut check_time = Measure::start("check_transactions");
|
||||
let mut check_results = self.check_transactions(
|
||||
sanitized_txs,
|
||||
|
@ -8342,6 +8395,17 @@ impl Bank {
|
|||
}
|
||||
false
|
||||
}
|
||||
|
||||
pub fn set_banking_transaction_results_notifier(
|
||||
&mut self,
|
||||
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
|
||||
) {
|
||||
self.banking_transaction_result_notifier = banking_transaction_result_notifier;
|
||||
}
|
||||
|
||||
pub fn has_banking_transaction_results_notifier(&self) -> bool {
|
||||
self.banking_transaction_result_notifier.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
/// Compute how much an account has changed size. This function is useful when the data size delta
|
||||
|
|
|
@ -3,8 +3,14 @@ use {
|
|||
instruction::InstructionError,
|
||||
message::{AddressLoaderError, SanitizeMessageError},
|
||||
sanitize::SanitizeError,
|
||||
slot_history::Slot,
|
||||
transaction::SanitizedTransaction,
|
||||
},
|
||||
serde::Serialize,
|
||||
std::{
|
||||
fmt::Debug,
|
||||
sync::{Arc, RwLock},
|
||||
},
|
||||
thiserror::Error,
|
||||
};
|
||||
|
||||
|
@ -198,3 +204,42 @@ impl From<AddressLoaderError> for TransactionError {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait TransactionResultNotifier: Debug {
|
||||
fn notify_banking_transaction_result(
|
||||
&self,
|
||||
transaction: &SanitizedTransaction,
|
||||
error: Option<TransactionError>,
|
||||
slot: Slot,
|
||||
);
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BankingTransactionResultNotifier {
|
||||
pub lock: Arc<RwLock<dyn TransactionResultNotifier + Sync + Send>>,
|
||||
}
|
||||
|
||||
#[cfg(RUSTC_WITH_SPECIALIZATION)]
|
||||
#[derive(Debug)]
|
||||
struct DummyTransactionResultNotifier {}
|
||||
|
||||
#[cfg(RUSTC_WITH_SPECIALIZATION)]
|
||||
impl TransactionResultNotifier for DummyTransactionResultNotifier {
|
||||
fn notify_banking_transaction_result(
|
||||
&self,
|
||||
_: &SanitizedTransaction,
|
||||
_: Option<TransactionError>,
|
||||
_: Slot,
|
||||
) {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(RUSTC_WITH_SPECIALIZATION)]
|
||||
impl solana_frozen_abi::abi_example::AbiExample for BankingTransactionResultNotifier {
|
||||
fn example() -> Self {
|
||||
// BankingTransactionResultNotifier isn't serializable by definition.
|
||||
BankingTransactionResultNotifier {
|
||||
lock: Arc::new(RwLock::new(DummyTransactionResultNotifier {})),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -163,6 +163,7 @@ async fn run_server(
|
|||
));
|
||||
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
|
||||
Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked)));
|
||||
|
||||
let (sender, receiver) = async_unbounded();
|
||||
tokio::spawn(packet_batch_sender(
|
||||
packet_sender,
|
||||
|
@ -1061,6 +1062,15 @@ pub enum ConnectionPeerType {
|
|||
Staked,
|
||||
}
|
||||
|
||||
impl ConnectionPeerType {
|
||||
pub fn to_string(&self) -> String {
|
||||
match self {
|
||||
ConnectionPeerType::Unstaked => "unstaked".to_string(),
|
||||
ConnectionPeerType::Staked => "staked".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Eq, Hash, PartialEq)]
|
||||
enum ConnectionTableKey {
|
||||
IP(IpAddr),
|
||||
|
@ -1073,6 +1083,13 @@ impl ConnectionTableKey {
|
|||
ConnectionTableKey::Pubkey(pubkey)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn to_string(&self) -> String {
|
||||
match self {
|
||||
ConnectionTableKey::IP(ip) => format!("ip:{}", ip),
|
||||
ConnectionTableKey::Pubkey(pubkey) => format!("pubkey:{}", pubkey.to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Map of IP to list of connection entries
|
||||
|
@ -1103,7 +1120,14 @@ impl ConnectionTable {
|
|||
None => break,
|
||||
Some((index, connections)) => {
|
||||
num_pruned += connections.len();
|
||||
self.table.swap_remove_index(index);
|
||||
if let Some((k, v)) = self.table.swap_remove_index(index) {
|
||||
log::info!(
|
||||
"QUIC Connection dropped {}, {}, count : {} ",
|
||||
self.peer_type.to_string(),
|
||||
k.to_string(),
|
||||
v.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1130,7 +1154,19 @@ impl ConnectionTable {
|
|||
.take(sample_size)
|
||||
.min_by_key(|&(_, stake)| stake)
|
||||
.filter(|&(_, stake)| stake < Some(threshold_stake))
|
||||
.and_then(|(index, _)| self.table.swap_remove_index(index))
|
||||
.and_then(|(index, _)| {
|
||||
if let Some((k, v)) = self.table.swap_remove_index(index) {
|
||||
log::info!(
|
||||
"QUIC Connection dropped {}, {}, count : {} ",
|
||||
self.peer_type.to_string(),
|
||||
k.to_string(),
|
||||
v.len()
|
||||
);
|
||||
Some((k, v))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.map(|(_, connections)| connections.len())
|
||||
.unwrap_or_default();
|
||||
self.total_size = self.total_size.saturating_sub(num_pruned);
|
||||
|
@ -1153,6 +1189,12 @@ impl ConnectionTable {
|
|||
.map(|c| c <= max_connections_per_peer)
|
||||
.unwrap_or(false);
|
||||
if has_connection_capacity {
|
||||
log::info!(
|
||||
"QUIC Connection added {}, {}, count : {} ",
|
||||
self.peer_type.to_string(),
|
||||
key.to_string(),
|
||||
connection_entry.len() + 1
|
||||
);
|
||||
let exit = Arc::new(AtomicBool::new(false));
|
||||
let last_update = Arc::new(AtomicU64::new(last_update));
|
||||
connection_entry.push(ConnectionEntry::new(
|
||||
|
@ -1200,6 +1242,13 @@ impl ConnectionTable {
|
|||
}
|
||||
let connections_removed = old_size.saturating_sub(new_size);
|
||||
self.total_size = self.total_size.saturating_sub(connections_removed);
|
||||
|
||||
log::info!(
|
||||
"QUIC Connection added {}, {}, count : {} ",
|
||||
self.peer_type.to_string(),
|
||||
key.to_string(),
|
||||
new_size
|
||||
);
|
||||
connections_removed
|
||||
} else {
|
||||
0
|
||||
|
|
Loading…
Reference in New Issue