From 1a372a792ec4a646a24ad2faaac9793d84784757 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Tue, 10 Aug 2021 16:44:45 -0700 Subject: [PATCH] Fixing missing pubsub notification for programSubscribe and logsSubscribe (#19092) #18587: programSubscribe is missing notifications randomly. The issue is because of two reasons Not all rooted slots get OptimisticallyConfirmed notifications The OptimisticallyConfirmed notifications can be out of order for slots: slot A and B with A < B can see notification for B first before A. Summary of Changes Changed OptimisticallyConfirmedBankTracker to send notifications for parent banks if they have not been notified yet. We use a new variable last_notified_slot to track that. Tests: With my validator running against testnet, before the fix, it was failing 75% of time, with the fix, it is passing consistently. Using the program mentioned in #18587. --- .../optimistically_confirmed_bank_tracker.rs | 125 ++++- rpc/src/rpc.rs | 10 + rpc/src/rpc_subscriptions.rs | 464 +++++++++++++++++- 3 files changed, 588 insertions(+), 11 deletions(-) diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index 72182aa4e..12de21495 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -66,6 +66,8 @@ impl OptimisticallyConfirmedBankTracker { ) -> Self { let exit_ = exit.clone(); let mut pending_optimistically_confirmed_banks = HashSet::new(); + let mut last_notified_confirmed_slot: Slot = 0; + let mut highest_confirmed_slot: Slot = 0; let thread_hdl = Builder::new() .name("solana-optimistic-bank-tracker".to_string()) .spawn(move || loop { @@ -79,6 +81,8 @@ impl OptimisticallyConfirmedBankTracker { &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ) { break; } @@ -93,6 +97,8 @@ impl OptimisticallyConfirmedBankTracker { optimistically_confirmed_bank: &Arc>, subscriptions: &Arc, mut pending_optimistically_confirmed_banks: &mut HashSet, + mut last_notified_confirmed_slot: &mut Slot, + mut highest_confirmed_slot: &mut Slot, ) -> Result<(), RecvTimeoutError> { let notification = receiver.recv_timeout(Duration::from_secs(1))?; Self::process_notification( @@ -101,31 +107,91 @@ impl OptimisticallyConfirmedBankTracker { optimistically_confirmed_bank, subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); Ok(()) } + fn notify_or_defer( + subscriptions: &Arc, + bank_forks: &Arc>, + bank: &Arc, + last_notified_confirmed_slot: &mut Slot, + pending_optimistically_confirmed_banks: &mut HashSet, + ) { + if bank.is_frozen() { + if bank.slot() > *last_notified_confirmed_slot { + debug!( + "notify_or_defer notifying via notify_gossip_subscribers for slot {:?}", + bank.slot() + ); + subscriptions.notify_gossip_subscribers(bank.slot()); + *last_notified_confirmed_slot = bank.slot(); + } + } else if bank.slot() > bank_forks.read().unwrap().root_bank().slot() { + pending_optimistically_confirmed_banks.insert(bank.slot()); + debug!("notify_or_defer defer notifying for slot {:?}", bank.slot()); + } + } + + fn notify_or_defer_confirmed_banks( + subscriptions: &Arc, + bank_forks: &Arc>, + bank: &Arc, + slot_threshold: Slot, + mut last_notified_confirmed_slot: &mut Slot, + mut pending_optimistically_confirmed_banks: &mut HashSet, + ) { + for confirmed_bank in bank.clone().parents_inclusive().iter().rev() { + if confirmed_bank.slot() > slot_threshold { + debug!( + "Calling notify_or_defer for confirmed_bank {:?}", + confirmed_bank.slot() + ); + Self::notify_or_defer( + subscriptions, + bank_forks, + confirmed_bank, + &mut last_notified_confirmed_slot, + &mut pending_optimistically_confirmed_banks, + ); + } + } + } + pub fn process_notification( notification: BankNotification, bank_forks: &Arc>, optimistically_confirmed_bank: &Arc>, subscriptions: &Arc, - pending_optimistically_confirmed_banks: &mut HashSet, + mut pending_optimistically_confirmed_banks: &mut HashSet, + mut last_notified_confirmed_slot: &mut Slot, + highest_confirmed_slot: &mut Slot, ) { debug!("received bank notification: {:?}", notification); match notification { BankNotification::OptimisticallyConfirmed(slot) => { - if let Some(bank) = bank_forks - .read() - .unwrap() - .get(slot) - .filter(|b| b.is_frozen()) - { + if let Some(bank) = bank_forks.read().unwrap().get(slot) { let mut w_optimistically_confirmed_bank = optimistically_confirmed_bank.write().unwrap(); - if bank.slot() > w_optimistically_confirmed_bank.bank.slot() { + + if bank.slot() > w_optimistically_confirmed_bank.bank.slot() && bank.is_frozen() + { w_optimistically_confirmed_bank.bank = bank.clone(); - subscriptions.notify_gossip_subscribers(slot); + } + + if slot > *highest_confirmed_slot { + Self::notify_or_defer_confirmed_banks( + subscriptions, + bank_forks, + bank, + *highest_confirmed_slot, + &mut last_notified_confirmed_slot, + &mut pending_optimistically_confirmed_banks, + ); + + *highest_confirmed_slot = slot; } drop(w_optimistically_confirmed_bank); } else if slot > bank_forks.read().unwrap().root_bank().slot() { @@ -159,11 +225,24 @@ impl OptimisticallyConfirmedBankTracker { } if pending_optimistically_confirmed_banks.remove(&bank.slot()) { + debug!( + "Calling notify_gossip_subscribers to send deferred notification {:?}", + frozen_slot + ); + + Self::notify_or_defer_confirmed_banks( + subscriptions, + bank_forks, + &bank, + *last_notified_confirmed_slot, + &mut last_notified_confirmed_slot, + &mut pending_optimistically_confirmed_banks, + ); + let mut w_optimistically_confirmed_bank = optimistically_confirmed_bank.write().unwrap(); if frozen_slot > w_optimistically_confirmed_bank.bank.slot() { w_optimistically_confirmed_bank.bank = bank; - subscriptions.notify_gossip_subscribers(frozen_slot); } drop(w_optimistically_confirmed_bank); } @@ -231,14 +310,19 @@ mod tests { assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0); + let mut highest_confirmed_slot: Slot = 0; + let mut last_notified_confirmed_slot: Slot = 0; OptimisticallyConfirmedBankTracker::process_notification( BankNotification::OptimisticallyConfirmed(2), &bank_forks, &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); + assert_eq!(highest_confirmed_slot, 2); // Test max optimistically confirmed bank remains in the cache OptimisticallyConfirmedBankTracker::process_notification( @@ -247,8 +331,11 @@ mod tests { &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); + assert_eq!(highest_confirmed_slot, 2); // Test bank will only be cached when frozen OptimisticallyConfirmedBankTracker::process_notification( @@ -257,21 +344,30 @@ mod tests { &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2); assert_eq!(pending_optimistically_confirmed_banks.len(), 1); assert!(pending_optimistically_confirmed_banks.contains(&3)); + assert_eq!(highest_confirmed_slot, 3); // Test bank will only be cached when frozen let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone(); + bank3.freeze(); + OptimisticallyConfirmedBankTracker::process_notification( BankNotification::Frozen(bank3), &bank_forks, &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); + assert_eq!(highest_confirmed_slot, 3); + assert_eq!(pending_optimistically_confirmed_banks.len(), 0); // Test higher root will be cached and clear pending_optimistically_confirmed_banks let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone(); @@ -283,10 +379,13 @@ mod tests { &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3); assert_eq!(pending_optimistically_confirmed_banks.len(), 1); assert!(pending_optimistically_confirmed_banks.contains(&4)); + assert_eq!(highest_confirmed_slot, 4); let bank4 = bank_forks.read().unwrap().get(4).unwrap().clone(); let bank5 = Bank::new_from_parent(&bank4, &Pubkey::default(), 5); @@ -298,10 +397,13 @@ mod tests { &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); assert!(!pending_optimistically_confirmed_banks.contains(&4)); + assert_eq!(highest_confirmed_slot, 4); // Banks <= root do not get added to pending list, even if not frozen let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone(); @@ -320,9 +422,12 @@ mod tests { &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5); assert_eq!(pending_optimistically_confirmed_banks.len(), 0); assert!(!pending_optimistically_confirmed_banks.contains(&6)); + assert_eq!(highest_confirmed_slot, 4); } } diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 1ac694740..af30684d8 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -7529,6 +7529,8 @@ pub mod tests { let json: Value = serde_json::from_str(&res.unwrap()).unwrap(); let slot: Slot = serde_json::from_value(json["result"].clone()).unwrap(); assert_eq!(slot, 0); + let mut highest_confirmed_slot: Slot = 0; + let mut last_notified_confirmed_slot: Slot = 0; OptimisticallyConfirmedBankTracker::process_notification( BankNotification::OptimisticallyConfirmed(2), @@ -7536,6 +7538,8 @@ pub mod tests { &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7551,6 +7555,8 @@ pub mod tests { &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7566,6 +7572,8 @@ pub mod tests { &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; @@ -7582,6 +7590,8 @@ pub mod tests { &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); let req = r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#; diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 6e364dcab..5537cfdc0 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -1346,7 +1346,10 @@ pub(crate) mod tests { stake, system_instruction, system_program, system_transaction, transaction::Transaction, }, - std::{fmt::Debug, sync::mpsc::channel}, + std::{ + fmt::Debug, + sync::{atomic::Ordering::Relaxed, mpsc::channel}, + }, tokio::{ runtime::Runtime, time::{sleep, timeout}, @@ -1619,6 +1622,454 @@ pub(crate) mod tests { .contains_key(&stake::program::id())); } + #[test] + #[serial] + fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot() { + // Testing if we can get the pubsub notification if a slot does not + // receive OptimisticallyConfirmed but its descendant slot get the confirmed + // notification. + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(100); + let bank = Bank::new_for_tests(&genesis_config); + bank.lazy_rent_collection.store(true, Relaxed); + + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); + let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone(); + + // add account for alice and process the transaction at bank1 + let alice = Keypair::new(); + let tx = system_transaction::create_account( + &mint_keypair, + &alice, + blockhash, + 1, + 16, + &stake::program::id(), + ); + + bank1.process_transaction(&tx).unwrap(); + + let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); + bank_forks.write().unwrap().insert(bank2); + + // add account for bob and process the transaction at bank2 + let bob = Keypair::new(); + let tx = system_transaction::create_account( + &mint_keypair, + &bob, + blockhash, + 2, + 16, + &stake::program::id(), + ); + let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); + + bank2.process_transaction(&tx).unwrap(); + + let bank3 = Bank::new_from_parent(&bank2, &Pubkey::default(), 3); + bank_forks.write().unwrap().insert(bank3); + + // add account for joe and process the transaction at bank3 + let joe = Keypair::new(); + let tx = system_transaction::create_account( + &mint_keypair, + &joe, + blockhash, + 3, + 16, + &stake::program::id(), + ); + let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone(); + + bank3.process_transaction(&tx).unwrap(); + + // now add programSubscribe at the "confirmed" commitment level + let (subscriber, _id_receiver, transport_receiver) = + Subscriber::new_test("programNotification"); + let sub_id = SubscriptionId::Number(0); + let exit = Arc::new(AtomicBool::new(false)); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let mut pending_optimistically_confirmed_banks = HashSet::new(); + + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( + 1, 1, + ))), + optimistically_confirmed_bank.clone(), + )); + subscriptions.add_program_subscription( + stake::program::id(), + Some(RpcProgramAccountsConfig { + account_config: RpcAccountInfoConfig { + commitment: Some(CommitmentConfig::confirmed()), + ..RpcAccountInfoConfig::default() + }, + ..RpcProgramAccountsConfig::default() + }), + sub_id.clone(), + subscriber, + ); + + assert!(subscriptions + .subscriptions + .gossip_program_subscriptions + .read() + .unwrap() + .contains_key(&stake::program::id())); + + let mut highest_confirmed_slot: Slot = 0; + let mut last_notified_confirmed_slot: Slot = 0; + // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is unfrozen, we expect + // to see transaction for alice and bob to be notified in order. + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::OptimisticallyConfirmed(3), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, + ); + + // a closure to reduce code duplications in building expected responses: + let build_expected_resp = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| { + json!({ + "jsonrpc": "2.0", + "method": "programNotification", + "params": { + "result": { + "context": { "slot": slot }, + "value": { + "account": { + "data": "1111111111111111", + "executable": false, + "lamports": lamports, + "owner": "Stake11111111111111111111111111111111111111", + "rentEpoch": 0, + }, + "pubkey": pubkey, + }, + }, + "subscription": subscription, + } + }) + }; + + let (response, transport_receiver) = robust_poll_or_panic(transport_receiver); + let expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + let (response, transport_receiver) = robust_poll_or_panic(transport_receiver); + let expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + bank3.freeze(); + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::Frozen(bank3), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, + ); + + let (response, _) = robust_poll_or_panic(transport_receiver); + let expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + subscriptions.remove_program_subscription(&sub_id); + } + + #[test] + #[serial] + #[should_panic] + fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks_no_notifications( + ) { + // Testing if we can get the pubsub notification if a slot does not + // receive OptimisticallyConfirmed but its descendant slot get the confirmed + // notification with a bank in the BankForks. We are not expecting to receive any notifications -- should panic. + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(100); + let bank = Bank::new_for_tests(&genesis_config); + bank.lazy_rent_collection.store(true, Relaxed); + + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); + let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone(); + + // add account for alice and process the transaction at bank1 + let alice = Keypair::new(); + let tx = system_transaction::create_account( + &mint_keypair, + &alice, + blockhash, + 1, + 16, + &stake::program::id(), + ); + + bank1.process_transaction(&tx).unwrap(); + + let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); + bank_forks.write().unwrap().insert(bank2); + + // add account for bob and process the transaction at bank2 + let bob = Keypair::new(); + let tx = system_transaction::create_account( + &mint_keypair, + &bob, + blockhash, + 2, + 16, + &stake::program::id(), + ); + let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); + + bank2.process_transaction(&tx).unwrap(); + + // now add programSubscribe at the "confirmed" commitment level + let (subscriber, _id_receiver, transport_receiver) = + Subscriber::new_test("programNotification"); + let sub_id = SubscriptionId::Number(0); + let exit = Arc::new(AtomicBool::new(false)); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let mut pending_optimistically_confirmed_banks = HashSet::new(); + + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( + 1, 1, + ))), + optimistically_confirmed_bank.clone(), + )); + subscriptions.add_program_subscription( + stake::program::id(), + Some(RpcProgramAccountsConfig { + account_config: RpcAccountInfoConfig { + commitment: Some(CommitmentConfig::confirmed()), + ..RpcAccountInfoConfig::default() + }, + ..RpcProgramAccountsConfig::default() + }), + sub_id, + subscriber, + ); + + assert!(subscriptions + .subscriptions + .gossip_program_subscriptions + .read() + .unwrap() + .contains_key(&stake::program::id())); + + let mut highest_confirmed_slot: Slot = 0; + let mut last_notified_confirmed_slot: Slot = 0; + // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is not in the bankforks, we do not + // expect to see any RPC notifications. + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::OptimisticallyConfirmed(3), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, + ); + + // The following should panic + let (_response, _transport_receiver) = robust_poll_or_panic(transport_receiver); + } + + #[test] + #[serial] + fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks() { + // Testing if we can get the pubsub notification if a slot does not + // receive OptimisticallyConfirmed but its descendant slot get the confirmed + // notification. It differs from the test_check_program_subscribe_for_missing_optimistically_confirmed_slot + // test in that when the descendant get confirmed, the descendant does not have a bank yet. + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(100); + let bank = Bank::new_for_tests(&genesis_config); + bank.lazy_rent_collection.store(true, Relaxed); + + let blockhash = bank.last_blockhash(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + + let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone(); + let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1); + bank_forks.write().unwrap().insert(bank1); + let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone(); + + // add account for alice and process the transaction at bank1 + let alice = Keypair::new(); + let tx = system_transaction::create_account( + &mint_keypair, + &alice, + blockhash, + 1, + 16, + &stake::program::id(), + ); + + bank1.process_transaction(&tx).unwrap(); + + let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2); + bank_forks.write().unwrap().insert(bank2); + + // add account for bob and process the transaction at bank2 + let bob = Keypair::new(); + let tx = system_transaction::create_account( + &mint_keypair, + &bob, + blockhash, + 2, + 16, + &stake::program::id(), + ); + let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); + + bank2.process_transaction(&tx).unwrap(); + + // now add programSubscribe at the "confirmed" commitment level + let (subscriber, _id_receiver, transport_receiver) = + Subscriber::new_test("programNotification"); + let sub_id = SubscriptionId::Number(0); + let exit = Arc::new(AtomicBool::new(false)); + let optimistically_confirmed_bank = + OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let mut pending_optimistically_confirmed_banks = HashSet::new(); + + let subscriptions = Arc::new(RpcSubscriptions::new( + &exit, + bank_forks.clone(), + Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots( + 1, 1, + ))), + optimistically_confirmed_bank.clone(), + )); + subscriptions.add_program_subscription( + stake::program::id(), + Some(RpcProgramAccountsConfig { + account_config: RpcAccountInfoConfig { + commitment: Some(CommitmentConfig::confirmed()), + ..RpcAccountInfoConfig::default() + }, + ..RpcProgramAccountsConfig::default() + }), + sub_id.clone(), + subscriber, + ); + + assert!(subscriptions + .subscriptions + .gossip_program_subscriptions + .read() + .unwrap() + .contains_key(&stake::program::id())); + + let mut highest_confirmed_slot: Slot = 0; + let mut last_notified_confirmed_slot: Slot = 0; + // Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is not in the bankforks, we expect + // to see transaction for alice and bob to be notified only when bank3 is added to the fork and + // frozen. The notifications should be in the increasing order of the slot. + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::OptimisticallyConfirmed(3), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, + ); + + // a closure to reduce code duplications in building expected responses: + let build_expected_resp = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| { + json!({ + "jsonrpc": "2.0", + "method": "programNotification", + "params": { + "result": { + "context": { "slot": slot }, + "value": { + "account": { + "data": "1111111111111111", + "executable": false, + "lamports": lamports, + "owner": "Stake11111111111111111111111111111111111111", + "rentEpoch": 0, + }, + "pubkey": pubkey, + }, + }, + "subscription": subscription, + } + }) + }; + + let bank3 = Bank::new_from_parent(&bank2, &Pubkey::default(), 3); + bank_forks.write().unwrap().insert(bank3); + + // add account for joe and process the transaction at bank3 + let joe = Keypair::new(); + let tx = system_transaction::create_account( + &mint_keypair, + &joe, + blockhash, + 3, + 16, + &stake::program::id(), + ); + let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone(); + + bank3.process_transaction(&tx).unwrap(); + bank3.freeze(); + OptimisticallyConfirmedBankTracker::process_notification( + BankNotification::Frozen(bank3), + &bank_forks, + &optimistically_confirmed_bank, + &subscriptions, + &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, + ); + + let (response, transport_receiver) = robust_poll_or_panic(transport_receiver); + let expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + let (response, transport_receiver) = robust_poll_or_panic(transport_receiver); + let expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + + let (response, _) = robust_poll_or_panic(transport_receiver); + let expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0); + assert_eq!(serde_json::to_string(&expected).unwrap(), response); + subscriptions.remove_program_subscription(&sub_id); + } + #[test] #[serial] fn test_check_signature_subscribe() { @@ -2062,21 +2513,28 @@ pub(crate) mod tests { .unwrap(); // First, notify the unfrozen bank first to queue pending notification + let mut highest_confirmed_slot: Slot = 0; + let mut last_notified_confirmed_slot: Slot = 0; OptimisticallyConfirmedBankTracker::process_notification( BankNotification::OptimisticallyConfirmed(2), &bank_forks, &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); // Now, notify the frozen bank and ensure its notifications are processed + highest_confirmed_slot = 0; OptimisticallyConfirmedBankTracker::process_notification( BankNotification::OptimisticallyConfirmed(1), &bank_forks, &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); let (response, _) = robust_poll_or_panic(transport_receiver0); @@ -2113,12 +2571,16 @@ pub(crate) mod tests { ); let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone(); + bank2.freeze(); + highest_confirmed_slot = 0; OptimisticallyConfirmedBankTracker::process_notification( BankNotification::Frozen(bank2), &bank_forks, &optimistically_confirmed_bank, &subscriptions, &mut pending_optimistically_confirmed_banks, + &mut last_notified_confirmed_slot, + &mut highest_confirmed_slot, ); let (response, _) = robust_poll_or_panic(transport_receiver1); let expected = json!({