Implementing getting banking stage transaction errors over geyser

Fixing abi example issue during tests and some minor changes

Add more messages to notification
This commit is contained in:
godmodegalactus 2023-10-12 16:09:18 +02:00
parent 3f9a7a52ea
commit 7ea018db97
No known key found for this signature in database
GPG Key ID: 22DA4A30887FDA3C
12 changed files with 258 additions and 9 deletions

View File

@ -1,7 +1,6 @@
//! The `banking_stage` processes Transaction messages. It is intended to be used
//! to construct a software pipeline. The stage uses all available CPU cores and
//! can do its processing in parallel with signature verification on the GPU.
use {
self::{
committer::Committer,

View File

@ -445,7 +445,13 @@ impl Consumer {
// Re-sanitized transaction should be equal to the original transaction,
// but whether it will pass sanitization needs to be checked.
let resanitized_tx =
bank.fully_verify_transaction(tx.to_versioned_transaction())?;
match bank.fully_verify_transaction(tx.to_versioned_transaction()) {
Ok(resanitized_tx) => resanitized_tx,
Err(e) => {
bank.notify_transaction_error(tx, Some(e.clone()));
return Err(e);
}
};
if resanitized_tx != *tx {
// Sanitization before/after epoch give different transaction data - do not execute.
return Err(TransactionError::ResanitizationNeeded);

View File

@ -74,7 +74,7 @@ use {
saturating_add_assign,
signature::{Keypair, Signature, Signer},
timing::timestamp,
transaction::Transaction,
transaction::{BankingTransactionResultNotifier, Transaction},
},
solana_vote::vote_sender_types::ReplayVoteSender,
solana_vote_program::vote_state::VoteTransaction,
@ -295,6 +295,7 @@ pub struct ReplayStageConfig {
// duplicate voting which can lead to slashing.
pub wait_to_vote_slot: Option<Slot>,
pub replay_slots_concurrently: bool,
pub banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
}
/// Timing information for the ReplayStage main processing loop
@ -578,6 +579,7 @@ impl ReplayStage {
tower_storage,
wait_to_vote_slot,
replay_slots_concurrently,
banking_transaction_result_notifier: banking_transaction_result_notifier_lock,
} = config;
trace!("replay stage");
@ -1131,6 +1133,7 @@ impl ReplayStage {
&banking_tracer,
has_new_vote_been_rooted,
transaction_status_sender.is_some(),
banking_transaction_result_notifier_lock.clone(),
);
let poh_bank = poh_recorder.read().unwrap().bank();
@ -1994,6 +1997,7 @@ impl ReplayStage {
banking_tracer: &Arc<BankingTracer>,
has_new_vote_been_rooted: bool,
track_transaction_indexes: bool,
banking_transaction_result_notifier_lock: Option<BankingTransactionResultNotifier>,
) {
// all the individual calls to poh_recorder.read() are designed to
// increase granularity, decrease contention
@ -2104,7 +2108,7 @@ impl ReplayStage {
false
};
let tpu_bank = Self::new_bank_from_parent_with_notify(
let mut tpu_bank = Self::new_bank_from_parent_with_notify(
parent.clone(),
poh_slot,
root_slot,
@ -2112,6 +2116,11 @@ impl ReplayStage {
rpc_subscriptions,
NewBankOptions { vote_only_bank },
);
if banking_transaction_result_notifier_lock.is_some() {
tpu_bank.set_banking_transaction_results_notifier(
banking_transaction_result_notifier_lock,
);
}
// make sure parent is frozen for finalized hashes via the above
// new()-ing of its child bank
banking_tracer.hash_event(parent.slot(), &parent.last_blockhash(), &parent.hash());

View File

@ -47,7 +47,10 @@ use {
accounts_background_service::AbsRequestSender, bank_forks::BankForks,
commitment::BlockCommitmentCache, prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
solana_sdk::{
clock::Slot, pubkey::Pubkey, signature::Keypair,
transaction::BankingTransactionResultNotifier,
},
solana_turbine::retransmit_stage::RetransmitStage,
solana_vote::vote_sender_types::ReplayVoteSender,
std::{
@ -144,6 +147,7 @@ impl Tvu {
outstanding_repair_requests: Arc<RwLock<OutstandingShredRepairs>>,
cluster_slots: Arc<ClusterSlots>,
wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
@ -266,6 +270,7 @@ impl Tvu {
tower_storage: tower_storage.clone(),
wait_to_vote_slot,
replay_slots_concurrently: tvu_config.replay_slots_concurrently,
banking_transaction_result_notifier,
};
let (voting_sender, voting_receiver) = unbounded();

View File

@ -658,6 +658,13 @@ impl Validator {
.as_ref()
.and_then(|geyser_plugin_service| geyser_plugin_service.get_block_metadata_notifier());
let banking_transaction_results_notifier =
geyser_plugin_service
.as_ref()
.and_then(|geyser_plugin_service| {
geyser_plugin_service.get_banking_transaction_result_notifier()
});
info!(
"Geyser plugin: accounts_update_notifier: {}, \
transaction_notifier: {}, \
@ -1321,6 +1328,7 @@ impl Validator {
outstanding_repair_requests.clone(),
cluster_slots.clone(),
wen_restart_repair_slots.clone(),
banking_transaction_results_notifier,
)?;
if in_wen_restart {

View File

@ -6,7 +6,7 @@ use {
solana_sdk::{
clock::{Slot, UnixTimestamp},
signature::Signature,
transaction::SanitizedTransaction,
transaction::{SanitizedTransaction, TransactionError},
},
solana_transaction_status::{Reward, TransactionStatusMeta},
std::{any::Any, error, io},
@ -418,6 +418,17 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
Ok(())
}
/// Called when we get banking stage errors.
#[allow(unused_variables)]
fn notify_banking_stage_transaction_results(
&self,
transaction: &SanitizedTransaction,
error: Option<TransactionError>,
slot: Slot,
) -> Result<()> {
Ok(())
}
/// Check if the plugin is interested in account data
/// Default is true -- if the plugin is not interested in
/// account data, please return false.
@ -438,4 +449,11 @@ pub trait GeyserPlugin: Any + Send + Sync + std::fmt::Debug {
fn entry_notifications_enabled(&self) -> bool {
false
}
/// Check if the plugin is interesed in transaction errors duing banking
/// stage Default is false -- if the plugin is interested in naking
/// stage errors
fn banking_transaction_results_notifications_enabled(&self) -> bool {
false
}
}

View File

@ -0,0 +1,67 @@
use {
crate::geyser_plugin_manager::GeyserPluginManager,
log::{error, log_enabled, trace},
solana_measure::measure::Measure,
solana_metrics::{create_counter, inc_counter, inc_new_counter, inc_new_counter_debug},
solana_sdk::{
slot_history::Slot,
transaction::{SanitizedTransaction, TransactionError, TransactionResultNotifier},
},
std::sync::{Arc, RwLock},
};
#[derive(Debug)]
pub(crate) struct BankingTransactionResultImpl {
plugin_manager: Arc<RwLock<GeyserPluginManager>>,
}
impl TransactionResultNotifier for BankingTransactionResultImpl {
fn notify_banking_transaction_result(
&self,
transaction: &SanitizedTransaction,
result: Option<TransactionError>,
slot: Slot,
) {
let mut measure = Measure::start("geyser-plugin-notify_plugins_of_entry_info");
let plugin_manager = self.plugin_manager.read().unwrap();
if plugin_manager.plugins.is_empty() {
return;
}
for plugin in plugin_manager.plugins.iter() {
if !plugin.banking_transaction_results_notifications_enabled() {
continue;
}
match plugin.notify_banking_stage_transaction_results(transaction, result.clone(), slot)
{
Err(err) => {
error!(
"Failed to notify banking transaction result, error: ({}) to plugin {}",
err,
plugin.name()
)
}
Ok(_) => {
trace!(
"Successfully notified banking transaction result to plugin {}",
plugin.name()
);
}
}
}
measure.stop();
inc_new_counter_debug!(
"geyser-plugin-notify_plugins_of_banking_transaction_info-us",
measure.as_us() as usize,
10000,
10000
);
}
}
impl BankingTransactionResultImpl {
pub fn new(plugin_manager: Arc<RwLock<GeyserPluginManager>>) -> Self {
Self { plugin_manager }
}
}

View File

@ -100,6 +100,15 @@ impl GeyserPluginManager {
false
}
pub fn banking_transaction_result_notification_enabled(&self) -> bool {
for plugin in &self.plugins {
if plugin.banking_transaction_results_notifications_enabled() {
return true;
}
}
false
}
/// Admin RPC request handler
pub(crate) fn list_plugins(&self) -> JsonRpcResult<Vec<String>> {
Ok(self.plugins.iter().map(|p| p.name().to_owned()).collect())

View File

@ -1,6 +1,7 @@
use {
crate::{
accounts_update_notifier::AccountsUpdateNotifierImpl,
banking_transaction_result_notifier::BankingTransactionResultImpl,
block_metadata_notifier::BlockMetadataNotifierImpl,
block_metadata_notifier_interface::BlockMetadataNotifierArc,
entry_notifier::EntryNotifierImpl,
@ -17,6 +18,7 @@ use {
optimistically_confirmed_bank_tracker::SlotNotification,
transaction_notifier_interface::TransactionNotifierArc,
},
solana_sdk::transaction::BankingTransactionResultNotifier,
std::{
path::{Path, PathBuf},
sync::{
@ -37,6 +39,7 @@ pub struct GeyserPluginService {
transaction_notifier: Option<TransactionNotifierArc>,
entry_notifier: Option<EntryNotifierArc>,
block_metadata_notifier: Option<BlockMetadataNotifierArc>,
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
}
impl GeyserPluginService {
@ -81,6 +84,8 @@ impl GeyserPluginService {
plugin_manager.account_data_notifications_enabled();
let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled();
let entry_notifications_enabled = plugin_manager.entry_notifications_enabled();
let banking_stage_transaction_result_notification =
plugin_manager.banking_transaction_result_notification_enabled();
let plugin_manager = Arc::new(RwLock::new(plugin_manager));
let accounts_update_notifier: Option<AccountsUpdateNotifier> =
@ -107,6 +112,17 @@ impl GeyserPluginService {
None
};
let transaction_result_notifier: Option<BankingTransactionResultNotifier> =
if banking_stage_transaction_result_notification {
let banking_transaction_result_notifier =
BankingTransactionResultImpl::new(plugin_manager.clone());
Some(BankingTransactionResultNotifier {
lock: Arc::new(RwLock::new(banking_transaction_result_notifier)),
})
} else {
None
};
let (slot_status_observer, block_metadata_notifier): (
Option<SlotStatusObserver>,
Option<BlockMetadataNotifierArc>,
@ -143,6 +159,7 @@ impl GeyserPluginService {
transaction_notifier,
entry_notifier,
block_metadata_notifier,
banking_transaction_result_notifier: transaction_result_notifier,
})
}
@ -168,6 +185,12 @@ impl GeyserPluginService {
self.entry_notifier.clone()
}
pub fn get_banking_transaction_result_notifier(
&self,
) -> Option<BankingTransactionResultNotifier> {
self.banking_transaction_result_notifier.clone()
}
pub fn get_block_metadata_notifier(&self) -> Option<BlockMetadataNotifierArc> {
self.block_metadata_notifier.clone()
}

View File

@ -1,4 +1,5 @@
pub mod accounts_update_notifier;
pub mod banking_transaction_result_notifier;
pub mod block_metadata_notifier;
pub mod block_metadata_notifier_interface;
pub mod entry_notifier;
@ -7,5 +8,4 @@ pub mod geyser_plugin_service;
pub mod slot_status_notifier;
pub mod slot_status_observer;
pub mod transaction_notifier;
pub use geyser_plugin_manager::GeyserPluginManagerRequest;

View File

@ -151,8 +151,9 @@ use {
sysvar::{self, last_restart_slot::LastRestartSlot, Sysvar, SysvarId},
timing::years_as_slots,
transaction::{
self, MessageHash, Result, SanitizedTransaction, Transaction, TransactionError,
TransactionVerificationMode, VersionedTransaction, MAX_TX_ACCOUNT_LOCKS,
self, BankingTransactionResultNotifier, MessageHash, Result, SanitizedTransaction,
Transaction, TransactionError, TransactionVerificationMode, VersionedTransaction,
MAX_TX_ACCOUNT_LOCKS,
},
transaction_context::{TransactionAccount, TransactionReturnData},
},
@ -547,6 +548,7 @@ impl PartialEq for Bank {
loaded_programs_cache: _,
epoch_reward_status: _,
transaction_processor: _,
banking_transaction_result_notifier: _,
// Ignore new fields explicitly if they do not impact PartialEq.
// Adding ".." will remove compile-time checks that if a new field
// is added to the struct, this PartialEq is accordingly updated.
@ -807,6 +809,8 @@ pub struct Bank {
epoch_reward_status: EpochRewardStatus,
transaction_processor: TransactionBatchProcessor<BankForks>,
/// geyser plugin to notify transaction results
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
}
struct VoteWithStakeDelegations {
@ -993,6 +997,7 @@ impl Bank {
))),
epoch_reward_status: EpochRewardStatus::default(),
transaction_processor: TransactionBatchProcessor::default(),
banking_transaction_result_notifier: None,
};
bank.transaction_processor = TransactionBatchProcessor::new(
@ -1311,6 +1316,7 @@ impl Bank {
loaded_programs_cache: parent.loaded_programs_cache.clone(),
epoch_reward_status: parent.epoch_reward_status.clone(),
transaction_processor: TransactionBatchProcessor::default(),
banking_transaction_result_notifier: None,
};
new.transaction_processor = TransactionBatchProcessor::new(
@ -1828,6 +1834,7 @@ impl Bank {
))),
epoch_reward_status: fields.epoch_reward_status,
transaction_processor: TransactionBatchProcessor::default(),
banking_transaction_result_notifier: None,
};
bank.transaction_processor = TransactionBatchProcessor::new(
@ -4083,6 +4090,9 @@ impl Bank {
let mut status_cache = self.status_cache.write().unwrap();
assert_eq!(sanitized_txs.len(), execution_results.len());
for (tx, execution_result) in sanitized_txs.iter().zip(execution_results) {
if let TransactionExecutionResult::NotExecuted(err) = execution_result {
self.notify_transaction_error(tx, Some(err.clone()));
}
if let Some(details) = execution_result.details() {
// Add the message hash to the status cache to ensure that this message
// won't be processed again with a different signature.
@ -4439,6 +4449,7 @@ impl Bank {
(Ok(()), Some(nonce), lamports_per_signature)
} else {
error_counters.blockhash_not_found += 1;
self.notify_transaction_error(tx, Some(TransactionError::BlockhashNotFound));
(Err(TransactionError::BlockhashNotFound), None, None)
}
}
@ -4471,6 +4482,10 @@ impl Bank {
&& self.is_transaction_already_processed(sanitized_tx, &rcache)
{
error_counters.already_processed += 1;
self.notify_transaction_error(
sanitized_tx,
Some(TransactionError::AlreadyProcessed),
);
return (Err(TransactionError::AlreadyProcessed), None, None);
}
@ -4542,6 +4557,23 @@ impl Bank {
balances
}
pub fn notify_transaction_error(
&self,
transaction: &SanitizedTransaction,
result: Option<TransactionError>,
) {
if let Some(transaction_result_notifier_lock) = &self.banking_transaction_result_notifier {
let transaction_error_notifier = transaction_result_notifier_lock.lock.read();
if let Ok(transaction_error_notifier) = transaction_error_notifier {
transaction_error_notifier.notify_banking_transaction_result(
transaction,
result,
self.slot,
);
}
}
}
#[allow(clippy::too_many_arguments, clippy::type_complexity)]
pub fn load_and_execute_transactions(
&self,
@ -4595,6 +4627,23 @@ impl Bank {
})
.collect();
if let Some(transaction_result_notifier_lock) = &self.banking_transaction_result_notifier {
let transaction_error_notifier = transaction_result_notifier_lock.lock.read();
if let Ok(transaction_error_notifier) = transaction_error_notifier {
batch
.sanitized_transactions()
.iter()
.zip(batch.lock_results())
.for_each(|(transaction, result)| {
transaction_error_notifier.notify_banking_transaction_result(
transaction,
result.clone().err(),
self.slot,
);
});
}
}
let mut check_time = Measure::start("check_transactions");
let mut check_results = self.check_transactions(
sanitized_txs,
@ -7505,6 +7554,17 @@ impl Bank {
self.transaction_processor
.load_program(self, pubkey, reload, effective_epoch)
}
pub fn set_banking_transaction_results_notifier(
&mut self,
banking_transaction_result_notifier: Option<BankingTransactionResultNotifier>,
) {
self.banking_transaction_result_notifier = banking_transaction_result_notifier;
}
pub fn has_banking_transaction_results_notifier(&self) -> bool {
self.banking_transaction_result_notifier.is_some()
}
}
impl TransactionProcessingCallback for Bank {

View File

@ -3,8 +3,14 @@ use {
instruction::InstructionError,
message::{AddressLoaderError, SanitizeMessageError},
sanitize::SanitizeError,
slot_history::Slot,
transaction::SanitizedTransaction,
},
serde::Serialize,
std::{
fmt::Debug,
sync::{Arc, RwLock},
},
thiserror::Error,
};
@ -198,3 +204,42 @@ impl From<AddressLoaderError> for TransactionError {
}
}
}
pub trait TransactionResultNotifier: Debug {
fn notify_banking_transaction_result(
&self,
transaction: &SanitizedTransaction,
error: Option<TransactionError>,
slot: Slot,
);
}
#[derive(Clone, Debug)]
pub struct BankingTransactionResultNotifier {
pub lock: Arc<RwLock<dyn TransactionResultNotifier + Sync + Send>>,
}
#[cfg(RUSTC_WITH_SPECIALIZATION)]
#[derive(Debug)]
struct DummyTransactionResultNotifier {}
#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl TransactionResultNotifier for DummyTransactionResultNotifier {
fn notify_banking_transaction_result(
&self,
_: &SanitizedTransaction,
_: Option<TransactionError>,
_: Slot,
) {
}
}
#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl solana_frozen_abi::abi_example::AbiExample for BankingTransactionResultNotifier {
fn example() -> Self {
// BankingTransactionResultNotifier isn't serializable by definition.
BankingTransactionResultNotifier {
lock: Arc::new(RwLock::new(DummyTransactionResultNotifier {})),
}
}
}