diff --git a/Cargo.lock b/Cargo.lock index 74213e6d11..8cb598a981 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4349,6 +4349,8 @@ name = "solana-accountsdb-plugin-interface" version = "1.9.0" dependencies = [ "log 0.4.14", + "solana-sdk", + "solana-transaction-status", "thiserror", ] @@ -4370,6 +4372,7 @@ dependencies = [ "solana-rpc", "solana-runtime", "solana-sdk", + "solana-transaction-status", "thiserror", ] diff --git a/accountsdb-plugin-interface/Cargo.toml b/accountsdb-plugin-interface/Cargo.toml index 308c83ffd7..a63f1e131e 100644 --- a/accountsdb-plugin-interface/Cargo.toml +++ b/accountsdb-plugin-interface/Cargo.toml @@ -12,6 +12,8 @@ documentation = "https://docs.rs/solana-validator" [dependencies] log = "0.4.11" thiserror = "1.0.30" +solana-sdk = { path = "../sdk", version = "=1.9.0" } +solana-transaction-status = { path = "../transaction-status", version = "=1.9.0" } [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs index 11948e4322..3a6caa53a2 100644 --- a/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs +++ b/accountsdb-plugin-interface/src/accountsdb_plugin_interface.rs @@ -3,6 +3,8 @@ /// In addition, the dynamic library must export a "C" function _create_plugin which /// creates the implementation of the plugin. use { + solana_sdk::{signature::Signature, transaction::SanitizedTransaction}, + solana_transaction_status::TransactionStatusMeta, std::{any::Any, error, io}, thiserror::Error, }; @@ -24,6 +26,18 @@ pub enum ReplicaAccountInfoVersions<'a> { V0_0_1(&'a ReplicaAccountInfo<'a>), } +#[derive(Clone, Debug)] +pub struct ReplicaTransactionInfo<'a> { + pub signature: &'a Signature, + pub is_vote: bool, + pub transaction: &'a SanitizedTransaction, + pub transaction_status_meta: &'a TransactionStatusMeta, +} + +pub enum ReplicaTransactionInfoVersions<'a> { + V0_0_1(&'a ReplicaTransactionInfo<'a>), +} + #[derive(Error, Debug)] pub enum AccountsDbPluginError { #[error("Error opening config file. Error detail: ({0}).")] @@ -105,10 +119,27 @@ pub trait AccountsDbPlugin: Any + Send + Sync + std::fmt::Debug { Ok(()) } + /// Called when a transaction is updated at a slot. + #[allow(unused_variables)] + fn notify_transaction( + &mut self, + transaction: ReplicaTransactionInfoVersions, + slot: u64, + ) -> Result<()> { + Ok(()) + } + /// Check if the plugin is interested in account data /// Default is true -- if the plugin is not interested in /// account data, please return false. - fn to_notify_account_data(&self) -> bool { + fn account_data_notifications_enabled(&self) -> bool { true } + + /// Check if the plugin is interested in transaction data + /// Default is false -- if the plugin is not interested in + /// transaction data, please return false. + fn transaction_notifications_enabled(&self) -> bool { + false + } } diff --git a/accountsdb-plugin-manager/Cargo.toml b/accountsdb-plugin-manager/Cargo.toml index 5d7176c1a9..582b02243e 100644 --- a/accountsdb-plugin-manager/Cargo.toml +++ b/accountsdb-plugin-manager/Cargo.toml @@ -24,6 +24,7 @@ solana-metrics = { path = "../metrics", version = "=1.9.0" } solana-rpc = { path = "../rpc", version = "=1.9.0" } solana-runtime = { path = "../runtime", version = "=1.9.0" } solana-sdk = { path = "../sdk", version = "=1.9.0" } +solana-transaction-status = { path = "../transaction-status", version = "=1.9.0" } thiserror = "1.0.30" [package.metadata.docs.rs] diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs index d628814f23..52085545f2 100644 --- a/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_manager.rs @@ -54,9 +54,19 @@ impl AccountsDbPluginManager { } /// Check if there is any plugin interested in account data - pub fn to_notify_account_data(&self) -> bool { + pub fn account_data_notifications_enabled(&self) -> bool { for plugin in &self.plugins { - if plugin.to_notify_account_data() { + if plugin.account_data_notifications_enabled() { + return true; + } + } + false + } + + /// Check if there is any plugin interested in transaction data + pub fn transaction_notifications_enabled(&self) -> bool { + for plugin in &self.plugins { + if plugin.transaction_notifications_enabled() { return true; } } diff --git a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs index 2fced0442e..fefaa8c1b8 100644 --- a/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs +++ b/accountsdb-plugin-manager/src/accountsdb_plugin_service.rs @@ -3,11 +3,15 @@ use { accounts_update_notifier::AccountsUpdateNotifierImpl, accountsdb_plugin_manager::AccountsDbPluginManager, slot_status_notifier::SlotStatusNotifierImpl, slot_status_observer::SlotStatusObserver, + transaction_notifier::TransactionNotifierImpl, }, crossbeam_channel::Receiver, log::*, serde_json, - solana_rpc::optimistically_confirmed_bank_tracker::BankNotification, + solana_rpc::{ + optimistically_confirmed_bank_tracker::BankNotification, + transaction_notifier_interface::TransactionNotifierLock, + }, solana_runtime::accounts_update_notifier_interface::AccountsUpdateNotifier, std::{ fs::File, @@ -45,6 +49,7 @@ pub struct AccountsDbPluginService { slot_status_observer: Option, plugin_manager: Arc>, accounts_update_notifier: Option, + transaction_notifier: Option, } impl AccountsDbPluginService { @@ -74,33 +79,47 @@ impl AccountsDbPluginService { for accountsdb_plugin_config_file in accountsdb_plugin_config_files { Self::load_plugin(&mut plugin_manager, accountsdb_plugin_config_file)?; } - let to_notify_account_data = plugin_manager.to_notify_account_data(); + let account_data_notifications_enabled = + plugin_manager.account_data_notifications_enabled(); + let transaction_notifications_enabled = plugin_manager.transaction_notifications_enabled(); let plugin_manager = Arc::new(RwLock::new(plugin_manager)); - let accounts_update_notifier: Option = if to_notify_account_data { - let accounts_update_notifier = AccountsUpdateNotifierImpl::new(plugin_manager.clone()); - Some(Arc::new(RwLock::new(accounts_update_notifier))) - } else { - None - }; + let accounts_update_notifier: Option = + if account_data_notifications_enabled { + let accounts_update_notifier = + AccountsUpdateNotifierImpl::new(plugin_manager.clone()); + Some(Arc::new(RwLock::new(accounts_update_notifier))) + } else { + None + }; - let slot_status_observer = if to_notify_account_data { - let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone()); - let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier)); - Some(SlotStatusObserver::new( - confirmed_bank_receiver, - slot_status_notifier, - )) - } else { - None - }; + let transaction_notifier: Option = + if transaction_notifications_enabled { + let transaction_notifier = TransactionNotifierImpl::new(plugin_manager.clone()); + Some(Arc::new(RwLock::new(transaction_notifier))) + } else { + None + }; + + let slot_status_observer = + if account_data_notifications_enabled || transaction_notifications_enabled { + let slot_status_notifier = SlotStatusNotifierImpl::new(plugin_manager.clone()); + let slot_status_notifier = Arc::new(RwLock::new(slot_status_notifier)); + Some(SlotStatusObserver::new( + confirmed_bank_receiver, + slot_status_notifier, + )) + } else { + None + }; info!("Started AccountsDbPluginService"); Ok(AccountsDbPluginService { slot_status_observer, plugin_manager, accounts_update_notifier, + transaction_notifier, }) } @@ -163,6 +182,10 @@ impl AccountsDbPluginService { self.accounts_update_notifier.clone() } + pub fn get_transaction_notifier(&self) -> Option { + self.transaction_notifier.clone() + } + pub fn join(self) -> thread::Result<()> { if let Some(mut slot_status_observer) = self.slot_status_observer { slot_status_observer.join()?; diff --git a/accountsdb-plugin-manager/src/lib.rs b/accountsdb-plugin-manager/src/lib.rs index 89c305a29a..e12484eb05 100644 --- a/accountsdb-plugin-manager/src/lib.rs +++ b/accountsdb-plugin-manager/src/lib.rs @@ -3,3 +3,4 @@ pub mod accountsdb_plugin_manager; pub mod accountsdb_plugin_service; pub mod slot_status_notifier; pub mod slot_status_observer; +pub mod transaction_notifier; diff --git a/accountsdb-plugin-manager/src/transaction_notifier.rs b/accountsdb-plugin-manager/src/transaction_notifier.rs new file mode 100644 index 0000000000..02429501ae --- /dev/null +++ b/accountsdb-plugin-manager/src/transaction_notifier.rs @@ -0,0 +1,93 @@ +/// Module responsible for notifying plugins of transactions +use { + crate::accountsdb_plugin_manager::AccountsDbPluginManager, + log::*, + solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ + ReplicaTransactionInfo, ReplicaTransactionInfoVersions, + }, + solana_measure::measure::Measure, + solana_metrics::*, + solana_rpc::transaction_notifier_interface::TransactionNotifier, + solana_runtime::bank, + solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction}, + solana_transaction_status::TransactionStatusMeta, + std::sync::{Arc, RwLock}, +}; + +/// This implementation of TransactionNotifier is passed to the rpc's TransactionStatusService +/// at the validator startup. TransactionStatusService invokes the notify_transaction method +/// for new transactions. The implementation in turn invokes the notify_transaction of each +/// plugin enabled with transaction notification managed by the AccountsDbPluginManager. +pub(crate) struct TransactionNotifierImpl { + plugin_manager: Arc>, +} + +impl TransactionNotifier for TransactionNotifierImpl { + fn notify_transaction( + &self, + slot: Slot, + signature: &Signature, + transaction_status_meta: &TransactionStatusMeta, + transaction: &SanitizedTransaction, + ) { + let mut measure = Measure::start("accountsdb-plugin-notify_plugins_of_transaction_info"); + let transaction_log_info = + Self::build_replica_transaction_info(signature, transaction_status_meta, transaction); + + let mut plugin_manager = self.plugin_manager.write().unwrap(); + + if plugin_manager.plugins.is_empty() { + return; + } + + for plugin in plugin_manager.plugins.iter_mut() { + if !plugin.transaction_notifications_enabled() { + continue; + } + match plugin.notify_transaction( + ReplicaTransactionInfoVersions::V0_0_1(&transaction_log_info), + slot, + ) { + Err(err) => { + error!( + "Failed to notify transaction, error: ({}) to plugin {}", + err, + plugin.name() + ) + } + Ok(_) => { + trace!( + "Successfully notified transaction to plugin {}", + plugin.name() + ); + } + } + } + measure.stop(); + inc_new_counter_debug!( + "accountsdb-plugin-notify_plugins_of_transaction_info-us", + measure.as_us() as usize, + 10000, + 10000 + ); + } +} + +impl TransactionNotifierImpl { + pub fn new(plugin_manager: Arc>) -> Self { + Self { plugin_manager } + } + + fn build_replica_transaction_info<'a>( + signature: &'a Signature, + transaction_status_meta: &'a TransactionStatusMeta, + transaction: &'a SanitizedTransaction, + ) -> ReplicaTransactionInfo<'a> { + ReplicaTransactionInfo { + signature, + is_vote: bank::is_simple_vote_transaction(transaction), + transaction, + transaction_status_meta, + } + } +} diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs index 165f971e67..111b5f6dd9 100644 --- a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs +++ b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs @@ -280,7 +280,7 @@ impl AccountsDbPlugin for AccountsDbPluginPostgres { /// Check if the plugin is interested in account data /// Default is true -- if the plugin is not interested in /// account data, please return false. - fn to_notify_account_data(&self) -> bool { + fn account_data_notifications_enabled(&self) -> bool { self.accounts_selector .as_ref() .map_or_else(|| false, |selector| selector.is_enabled()) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 200866d664..d92d6d04c5 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -2541,6 +2541,8 @@ mod tests { let transaction_status_service = TransactionStatusService::new( transaction_status_receiver, Arc::new(AtomicU64::default()), + true, + None, blockstore.clone(), &Arc::new(AtomicBool::new(false)), ); diff --git a/core/src/validator.rs b/core/src/validator.rs index 08d2a71fbe..f081ea7328 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -60,6 +60,7 @@ use { rpc_pubsub_service::{PubSubConfig, PubSubService}, rpc_service::JsonRpcService, rpc_subscriptions::RpcSubscriptions, + transaction_notifier_interface::TransactionNotifierLock, transaction_status_service::TransactionStatusService, }, solana_runtime::{ @@ -420,9 +421,18 @@ impl Validator { .and_then(|accountsdb_plugin_service| { accountsdb_plugin_service.get_accounts_update_notifier() }); + + let transaction_notifier = + accountsdb_plugin_service + .as_ref() + .and_then(|accountsdb_plugin_service| { + accountsdb_plugin_service.get_transaction_notifier() + }); + info!( - "AccountsDb plugin: accounts_update_notifier: {}", - accounts_update_notifier.is_some() + "AccountsDb plugin: accounts_update_notifier: {} transaction_notifier: {}", + accounts_update_notifier.is_some(), + transaction_notifier.is_some() ); let system_monitor_service = Some(SystemMonitorService::new( @@ -461,6 +471,7 @@ impl Validator { config.no_poh_speed_test, accounts_package_channel.0.clone(), accounts_update_notifier, + transaction_notifier, ); *start_progress.write().unwrap() = ValidatorStartProgress::StartingServices; @@ -1183,6 +1194,7 @@ fn new_banks_from_ledger( no_poh_speed_test: bool, accounts_package_sender: AccountsPackageSender, accounts_update_notifier: Option, + transaction_notifier: Option, ) -> ( GenesisConfig, BankForks, @@ -1275,12 +1287,17 @@ fn new_banks_from_ledger( ..blockstore_processor::ProcessOptions::default() }; + let enable_rpc_transaction_history = + config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history; + let is_plugin_transaction_history_required = transaction_notifier.as_ref().is_some(); let transaction_history_services = - if config.rpc_addrs.is_some() && config.rpc_config.enable_rpc_transaction_history { + if enable_rpc_transaction_history || is_plugin_transaction_history_required { initialize_rpc_transaction_history_services( blockstore.clone(), exit, + enable_rpc_transaction_history, config.rpc_config.enable_cpi_and_log_storage, + transaction_notifier, ) } else { TransactionHistoryServices::default() @@ -1471,7 +1488,9 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi fn initialize_rpc_transaction_history_services( blockstore: Arc, exit: &Arc, + enable_rpc_transaction_history: bool, enable_cpi_and_log_storage: bool, + transaction_notifier: Option, ) -> TransactionHistoryServices { let max_complete_transaction_status_slot = Arc::new(AtomicU64::new(blockstore.max_root())); let (transaction_status_sender, transaction_status_receiver) = unbounded(); @@ -1482,6 +1501,8 @@ fn initialize_rpc_transaction_history_services( let transaction_status_service = Some(TransactionStatusService::new( transaction_status_receiver, max_complete_transaction_status_slot.clone(), + enable_rpc_transaction_history, + transaction_notifier.clone(), blockstore.clone(), exit, )); diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 9f2563146a..e934981ac1 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -11,6 +11,7 @@ pub mod rpc_pubsub_service; pub mod rpc_service; pub mod rpc_subscription_tracker; pub mod rpc_subscriptions; +pub mod transaction_notifier_interface; pub mod transaction_status_service; #[macro_use] diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 6b89dadde2..2fc558d6bf 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -4263,6 +4263,8 @@ pub fn create_test_transactions_and_populate_blockstore( crate::transaction_status_service::TransactionStatusService::new( transaction_status_receiver, max_complete_transaction_status_slot, + true, + None, blockstore, &Arc::new(AtomicBool::new(false)), ); diff --git a/rpc/src/transaction_notifier_interface.rs b/rpc/src/transaction_notifier_interface.rs new file mode 100644 index 0000000000..5d67674e03 --- /dev/null +++ b/rpc/src/transaction_notifier_interface.rs @@ -0,0 +1,17 @@ +use { + solana_sdk::{clock::Slot, signature::Signature, transaction::SanitizedTransaction}, + solana_transaction_status::TransactionStatusMeta, + std::sync::{Arc, RwLock}, +}; + +pub trait TransactionNotifier { + fn notify_transaction( + &self, + slot: Slot, + signature: &Signature, + transaction_status_meta: &TransactionStatusMeta, + transaction: &SanitizedTransaction, + ); +} + +pub type TransactionNotifierLock = Arc>; diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index d28099e864..50d7ecb09f 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -1,4 +1,5 @@ use { + crate::transaction_notifier_interface::TransactionNotifierLock, crossbeam_channel::{Receiver, RecvTimeoutError}, itertools::izip, solana_ledger::{ @@ -28,6 +29,8 @@ impl TransactionStatusService { pub fn new( write_transaction_status_receiver: Receiver, max_complete_transaction_status_slot: Arc, + enable_rpc_transaction_history: bool, + transaction_notifier: Option, blockstore: Arc, exit: &Arc, ) -> Self { @@ -38,9 +41,12 @@ impl TransactionStatusService { if exit.load(Ordering::Relaxed) { break; } + if let Err(RecvTimeoutError::Disconnected) = Self::write_transaction_status_batch( &write_transaction_status_receiver, &max_complete_transaction_status_slot, + enable_rpc_transaction_history, + transaction_notifier.clone(), &blockstore, ) { break; @@ -53,6 +59,8 @@ impl TransactionStatusService { fn write_transaction_status_batch( write_transaction_status_receiver: &Receiver, max_complete_transaction_status_slot: &Arc, + enable_rpc_transaction_history: bool, + transaction_notifier: Option, blockstore: &Arc, ) -> Result<(), RecvTimeoutError> { match write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))? { @@ -145,31 +153,43 @@ impl TransactionStatusService { .collect(), ); - if let Some(memos) = extract_and_fmt_memos(transaction.message()) { - blockstore - .write_transaction_memos(transaction.signature(), memos) - .expect("Expect database write to succeed: TransactionMemos"); - } + let transaction_status_meta = TransactionStatusMeta { + status, + fee, + pre_balances, + post_balances, + inner_instructions, + log_messages, + pre_token_balances, + post_token_balances, + rewards, + }; - blockstore - .write_transaction_status( + if let Some(transaction_notifier) = transaction_notifier.as_ref() { + transaction_notifier.write().unwrap().notify_transaction( slot, - *transaction.signature(), - tx_account_locks.writable, - tx_account_locks.readonly, - TransactionStatusMeta { - status, - fee, - pre_balances, - post_balances, - inner_instructions, - log_messages, - pre_token_balances, - post_token_balances, - rewards, - }, - ) - .expect("Expect database write to succeed: TransactionStatus"); + transaction.signature(), + &transaction_status_meta, + &transaction, + ); + } + if enable_rpc_transaction_history { + if let Some(memos) = extract_and_fmt_memos(transaction.message()) { + blockstore + .write_transaction_memos(transaction.signature(), memos) + .expect("Expect database write to succeed: TransactionMemos"); + } + + blockstore + .write_transaction_status( + slot, + *transaction.signature(), + tx_account_locks.writable, + tx_account_locks.readonly, + transaction_status_meta, + ) + .expect("Expect database write to succeed: TransactionStatus"); + } } } } @@ -184,3 +204,209 @@ impl TransactionStatusService { self.thread_hdl.join() } } + +#[cfg(test)] +pub(crate) mod tests { + use { + super::*, + crate::transaction_notifier_interface::TransactionNotifier, + crossbeam_channel::unbounded, + dashmap::DashMap, + solana_account_decoder::parse_token::token_amount_to_ui_amount, + solana_ledger::{genesis_utils::create_genesis_config, get_tmp_ledger_path}, + solana_runtime::bank::{NonceFull, NoncePartial, RentDebits, TransactionBalancesSet}, + solana_sdk::{ + account_utils::StateMut, + clock::Slot, + hash::Hash, + instruction::CompiledInstruction, + message::{Message, MessageHeader, SanitizedMessage}, + nonce, nonce_account, + pubkey::Pubkey, + signature::{Keypair, Signature, Signer}, + system_transaction, + transaction::{ + SanitizedTransaction, Transaction, TransactionError, VersionedTransaction, + }, + }, + solana_transaction_status::{ + token_balances::TransactionTokenBalancesSet, TransactionStatusMeta, + TransactionTokenBalance, + }, + std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread::sleep, + time::Duration, + }, + }; + + struct TestTransactionNotifier { + notifications: DashMap<(Slot, Signature), (TransactionStatusMeta, SanitizedTransaction)>, + } + + impl TestTransactionNotifier { + pub fn new() -> Self { + Self { + notifications: DashMap::default(), + } + } + } + + impl TransactionNotifier for TestTransactionNotifier { + fn notify_transaction( + &self, + slot: Slot, + signature: &Signature, + transaction_status_meta: &TransactionStatusMeta, + transaction: &SanitizedTransaction, + ) { + self.notifications.insert( + (slot, *signature), + (transaction_status_meta.clone(), transaction.clone()), + ); + } + } + + fn build_test_transaction_legacy() -> Transaction { + let keypair1 = Keypair::new(); + let pubkey1 = keypair1.pubkey(); + let zero = Hash::default(); + system_transaction::transfer(&keypair1, &pubkey1, 42, zero) + } + + fn build_message() -> Message { + Message { + header: MessageHeader { + num_readonly_signed_accounts: 11, + num_readonly_unsigned_accounts: 12, + num_required_signatures: 13, + }, + account_keys: vec![Pubkey::new_unique()], + recent_blockhash: Hash::new_unique(), + instructions: vec![CompiledInstruction { + program_id_index: 1, + accounts: vec![1, 2, 3], + data: vec![4, 5, 6], + }], + } + } + + #[test] + fn test_notify_transaction() { + let genesis_config = create_genesis_config(2).genesis_config; + let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); + + let (transaction_status_sender, transaction_status_receiver) = unbounded(); + let ledger_path = get_tmp_ledger_path!(); + let blockstore = + Blockstore::open(&ledger_path).expect("Expected to be able to open database ledger"); + let blockstore = Arc::new(blockstore); + + let message_hash = Hash::new_unique(); + let transaction = build_test_transaction_legacy(); + let transaction = VersionedTransaction::from(transaction); + let transaction = + SanitizedTransaction::try_create(transaction, message_hash, Some(true), |_| { + Err(TransactionError::UnsupportedVersion) + }) + .unwrap(); + + let expected_transaction = transaction.clone(); + let pubkey = Pubkey::new_unique(); + + let mut nonce_account = nonce_account::create_account(1).into_inner(); + let data = nonce::state::Data::new(Pubkey::new(&[1u8; 32]), Hash::new(&[42u8; 32]), 42); + nonce_account + .set_state(&nonce::state::Versions::new_current( + nonce::State::Initialized(data), + )) + .unwrap(); + + let message = build_message(); + + let rollback_partial = NoncePartial::new(pubkey, nonce_account.clone()); + + let mut rent_debits = RentDebits::default(); + rent_debits.insert(&pubkey, 123, 456); + + let transaction_result = ( + Ok(()), + Some( + NonceFull::from_partial( + rollback_partial, + &SanitizedMessage::Legacy(message), + &[(pubkey, nonce_account)], + &rent_debits, + ) + .unwrap(), + ), + ); + + let balances = TransactionBalancesSet { + pre_balances: vec![vec![123456]], + post_balances: vec![vec![234567]], + }; + + let owner = Pubkey::new_unique().to_string(); + let pre_token_balance = TransactionTokenBalance { + account_index: 0, + mint: Pubkey::new_unique().to_string(), + ui_token_amount: token_amount_to_ui_amount(42, 2), + owner: owner.clone(), + }; + + let post_token_balance = TransactionTokenBalance { + account_index: 0, + mint: Pubkey::new_unique().to_string(), + ui_token_amount: token_amount_to_ui_amount(58, 2), + owner, + }; + + let token_balances = TransactionTokenBalancesSet { + pre_token_balances: vec![vec![pre_token_balance]], + post_token_balances: vec![vec![post_token_balance]], + }; + + let slot = bank.slot(); + let signature = *transaction.signature(); + let transaction_status_batch = TransactionStatusBatch { + bank, + transactions: vec![transaction], + statuses: vec![transaction_result], + balances, + token_balances, + inner_instructions: None, + transaction_logs: None, + rent_debits: vec![rent_debits], + }; + + let test_notifier = Arc::new(RwLock::new(TestTransactionNotifier::new())); + + let exit = Arc::new(AtomicBool::new(false)); + let transaction_status_service = TransactionStatusService::new( + transaction_status_receiver, + Arc::new(AtomicU64::default()), + false, + Some(test_notifier.clone()), + blockstore, + &exit, + ); + + transaction_status_sender + .send(TransactionStatusMessage::Batch(transaction_status_batch)) + .unwrap(); + sleep(Duration::from_millis(500)); + + exit.store(true, Ordering::Relaxed); + transaction_status_service.join().unwrap(); + let notifier = test_notifier.read().unwrap(); + assert_eq!(notifier.notifications.len(), 1); + assert!(notifier.notifications.contains_key(&(slot, signature))); + + let result = &*notifier.notifications.get(&(slot, signature)).unwrap(); + assert_eq!(expected_transaction.signature(), result.1.signature()); + } +}