diff --git a/rpc/src/rpc_subscription_tracker.rs b/rpc/src/rpc_subscription_tracker.rs index ffeffcb7b6..e0bc041bd5 100644 --- a/rpc/src/rpc_subscription_tracker.rs +++ b/rpc/src/rpc_subscription_tracker.rs @@ -13,10 +13,7 @@ use { }, solana_transaction_status::{TransactionDetails, UiTransactionEncoding}, std::{ - collections::{ - hash_map::{Entry, HashMap}, - HashSet, - }, + collections::hash_map::{Entry, HashMap}, fmt, sync::{ atomic::{AtomicU64, Ordering}, @@ -290,6 +287,21 @@ impl SubscriptionControl { }) } + #[cfg(test)] + pub fn logs_subscribed(&self, pubkey: Option<&Pubkey>) -> bool { + self.0.subscriptions.iter().any(|item| { + if let SubscriptionParams::Logs(params) = item.key() { + let subscribed_pubkey = match ¶ms.kind { + LogsSubscriptionKind::All | LogsSubscriptionKind::AllWithVotes => None, + LogsSubscriptionKind::Single(pubkey) => Some(pubkey), + }; + subscribed_pubkey == pubkey + } else { + false + } + }) + } + #[cfg(test)] pub fn signature_subscribed(&self, signature: &Signature) -> bool { self.0.subscriptions.iter().any(|item| { @@ -373,20 +385,21 @@ impl LogsSubscriptionsIndex { } fn update_config(&self) { + let mentioned_addresses = self.single_count.keys().copied().collect(); let config = if self.all_with_votes_count > 0 { TransactionLogCollectorConfig { filter: TransactionLogCollectorFilter::AllWithVotes, - mentioned_addresses: HashSet::new(), + mentioned_addresses, } } else if self.all_count > 0 { TransactionLogCollectorConfig { filter: TransactionLogCollectorFilter::All, - mentioned_addresses: HashSet::new(), + mentioned_addresses, } } else { TransactionLogCollectorConfig { filter: TransactionLogCollectorFilter::OnlyMentionedAddresses, - mentioned_addresses: self.single_count.keys().copied().collect(), + mentioned_addresses, } }; diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 1710c5f107..d25670ef1b 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -1210,7 +1210,8 @@ pub(crate) mod tests { serial_test::serial, solana_client::rpc_config::{ RpcAccountInfoConfig, RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, - RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsFilter, + RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, RpcTransactionLogsConfig, + RpcTransactionLogsFilter, }, solana_runtime::{ commitment::BlockCommitment, @@ -2791,6 +2792,114 @@ pub(crate) mod tests { assert!(!subscriptions.control.account_subscribed(&alice.pubkey())); } + fn make_logs_result(signature: &str, subscription_id: u64) -> serde_json::Value { + json!({ + "jsonrpc": "2.0", + "method": "logsNotification", + "params": { + "result": { + "context": { + "slot": 0 + }, + "value": { + "signature": signature, + "err": null, + "logs": [ + "Program 11111111111111111111111111111111 invoke [1]", + "Program 11111111111111111111111111111111 success" + ] + } + }, + "subscription": subscription_id + } + }) + } + + #[test] + #[serial] + fn test_logs_subscribe() { + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(100); + let bank = Bank::new_for_tests(&genesis_config); + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + + let alice = Keypair::new(); + + let exit = Arc::new(AtomicBool::new(false)); + let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let subscriptions = Arc::new(RpcSubscriptions::new_for_tests( + &exit, + max_complete_transaction_status_slot, + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests())), + optimistically_confirmed_bank, + )); + + let sub_config = RpcTransactionLogsConfig { + commitment: Some(CommitmentConfig::processed()), + }; + + let (rpc_all, mut receiver_all) = rpc_pubsub_service::test_connection(&subscriptions); + let sub_id_for_all = rpc_all + .logs_subscribe(RpcTransactionLogsFilter::All, Some(sub_config.clone())) + .unwrap(); + assert!(subscriptions.control.logs_subscribed(None)); + + let (rpc_alice, mut receiver_alice) = rpc_pubsub_service::test_connection(&subscriptions); + let sub_id_for_alice = rpc_alice + .logs_subscribe( + RpcTransactionLogsFilter::Mentions(vec![alice.pubkey().to_string()]), + Some(sub_config), + ) + .unwrap(); + assert!(subscriptions.control.logs_subscribed(Some(&alice.pubkey()))); + + let tx = system_transaction::create_account( + &mint_keypair, + &alice, + blockhash, + 1, + 0, + &system_program::id(), + ); + + bank_forks + .read() + .unwrap() + .get(0) + .unwrap() + .process_transaction_with_logs(&tx) + .unwrap(); + + subscriptions.notify_subscribers(CommitmentSlots::new_from_slot(0)); + + let expected_response_all = + make_logs_result(&tx.signatures[0].to_string(), u64::from(sub_id_for_all)); + let response_all = receiver_all.recv(); + assert_eq!( + expected_response_all, + serde_json::from_str::(&response_all).unwrap(), + ); + let expected_response_alice = + make_logs_result(&tx.signatures[0].to_string(), u64::from(sub_id_for_alice)); + let response_alice = receiver_alice.recv(); + assert_eq!( + expected_response_alice, + serde_json::from_str::(&response_alice).unwrap(), + ); + + rpc_all.logs_unsubscribe(sub_id_for_all).unwrap(); + assert!(!subscriptions.control.logs_subscribed(None)); + rpc_alice.logs_unsubscribe(sub_id_for_alice).unwrap(); + assert!(!subscriptions.control.logs_subscribed(Some(&alice.pubkey()))); + } + #[test] fn test_total_subscriptions() { let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(100); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 352d66b73a..d6ec4df95f 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -5672,6 +5672,25 @@ impl Bank { .map_or(Ok(()), |sig| self.get_signature_status(sig).unwrap()) } + /// Process a Transaction and store program log data. This is used for unit tests, and simply + /// replicates the vector Bank::process_transactions method with `enable_cpi_recording: true` + pub fn process_transaction_with_logs(&self, tx: &Transaction) -> Result<()> { + let txs = vec![VersionedTransaction::from(tx.clone())]; + let batch = self.prepare_entry_batch(txs)?; + let _results = self.load_execute_and_commit_transactions( + &batch, + MAX_PROCESSING_AGE, + false, + false, + true, + false, + &mut ExecuteTimings::default(), + ); + tx.signatures + .get(0) + .map_or(Ok(()), |sig| self.get_signature_status(sig).unwrap()) + } + /// Process multiple transaction in a single batch. This is used for benches and unit tests. /// /// # Panics