Fixing missing pubsub notification for programSubscribe and logsSubscribe (#19092)

#18587: programSubscribe is missing notifications randomly. The issue is because of two reasons

Not all rooted slots get OptimisticallyConfirmed notifications
The OptimisticallyConfirmed notifications can be out of order for slots: slot A and B with A < B can see notification for B first before A.
Summary of Changes

Changed OptimisticallyConfirmedBankTracker to send notifications for parent banks if they have not been notified yet. We use a new variable last_notified_slot to track that.

Tests:
With my validator running against testnet, before the fix, it was failing 75% of time, with the fix, it is passing consistently. Using the program mentioned in #18587.
This commit is contained in:
Lijun Wang 2021-08-10 16:44:45 -07:00 committed by GitHub
parent e7190cc727
commit 1a372a792e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 588 additions and 11 deletions

View File

@ -66,6 +66,8 @@ impl OptimisticallyConfirmedBankTracker {
) -> Self {
let exit_ = exit.clone();
let mut pending_optimistically_confirmed_banks = HashSet::new();
let mut last_notified_confirmed_slot: Slot = 0;
let mut highest_confirmed_slot: Slot = 0;
let thread_hdl = Builder::new()
.name("solana-optimistic-bank-tracker".to_string())
.spawn(move || loop {
@ -79,6 +81,8 @@ impl OptimisticallyConfirmedBankTracker {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
) {
break;
}
@ -93,6 +97,8 @@ impl OptimisticallyConfirmedBankTracker {
optimistically_confirmed_bank: &Arc<RwLock<OptimisticallyConfirmedBank>>,
subscriptions: &Arc<RpcSubscriptions>,
mut pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
mut last_notified_confirmed_slot: &mut Slot,
mut highest_confirmed_slot: &mut Slot,
) -> Result<(), RecvTimeoutError> {
let notification = receiver.recv_timeout(Duration::from_secs(1))?;
Self::process_notification(
@ -101,31 +107,91 @@ impl OptimisticallyConfirmedBankTracker {
optimistically_confirmed_bank,
subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
Ok(())
}
fn notify_or_defer(
subscriptions: &Arc<RpcSubscriptions>,
bank_forks: &Arc<RwLock<BankForks>>,
bank: &Arc<Bank>,
last_notified_confirmed_slot: &mut Slot,
pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
) {
if bank.is_frozen() {
if bank.slot() > *last_notified_confirmed_slot {
debug!(
"notify_or_defer notifying via notify_gossip_subscribers for slot {:?}",
bank.slot()
);
subscriptions.notify_gossip_subscribers(bank.slot());
*last_notified_confirmed_slot = bank.slot();
}
} else if bank.slot() > bank_forks.read().unwrap().root_bank().slot() {
pending_optimistically_confirmed_banks.insert(bank.slot());
debug!("notify_or_defer defer notifying for slot {:?}", bank.slot());
}
}
fn notify_or_defer_confirmed_banks(
subscriptions: &Arc<RpcSubscriptions>,
bank_forks: &Arc<RwLock<BankForks>>,
bank: &Arc<Bank>,
slot_threshold: Slot,
mut last_notified_confirmed_slot: &mut Slot,
mut pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
) {
for confirmed_bank in bank.clone().parents_inclusive().iter().rev() {
if confirmed_bank.slot() > slot_threshold {
debug!(
"Calling notify_or_defer for confirmed_bank {:?}",
confirmed_bank.slot()
);
Self::notify_or_defer(
subscriptions,
bank_forks,
confirmed_bank,
&mut last_notified_confirmed_slot,
&mut pending_optimistically_confirmed_banks,
);
}
}
}
pub fn process_notification(
notification: BankNotification,
bank_forks: &Arc<RwLock<BankForks>>,
optimistically_confirmed_bank: &Arc<RwLock<OptimisticallyConfirmedBank>>,
subscriptions: &Arc<RpcSubscriptions>,
pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
mut pending_optimistically_confirmed_banks: &mut HashSet<Slot>,
mut last_notified_confirmed_slot: &mut Slot,
highest_confirmed_slot: &mut Slot,
) {
debug!("received bank notification: {:?}", notification);
match notification {
BankNotification::OptimisticallyConfirmed(slot) => {
if let Some(bank) = bank_forks
.read()
.unwrap()
.get(slot)
.filter(|b| b.is_frozen())
{
if let Some(bank) = bank_forks.read().unwrap().get(slot) {
let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
if bank.slot() > w_optimistically_confirmed_bank.bank.slot() {
if bank.slot() > w_optimistically_confirmed_bank.bank.slot() && bank.is_frozen()
{
w_optimistically_confirmed_bank.bank = bank.clone();
subscriptions.notify_gossip_subscribers(slot);
}
if slot > *highest_confirmed_slot {
Self::notify_or_defer_confirmed_banks(
subscriptions,
bank_forks,
bank,
*highest_confirmed_slot,
&mut last_notified_confirmed_slot,
&mut pending_optimistically_confirmed_banks,
);
*highest_confirmed_slot = slot;
}
drop(w_optimistically_confirmed_bank);
} else if slot > bank_forks.read().unwrap().root_bank().slot() {
@ -159,11 +225,24 @@ impl OptimisticallyConfirmedBankTracker {
}
if pending_optimistically_confirmed_banks.remove(&bank.slot()) {
debug!(
"Calling notify_gossip_subscribers to send deferred notification {:?}",
frozen_slot
);
Self::notify_or_defer_confirmed_banks(
subscriptions,
bank_forks,
&bank,
*last_notified_confirmed_slot,
&mut last_notified_confirmed_slot,
&mut pending_optimistically_confirmed_banks,
);
let mut w_optimistically_confirmed_bank =
optimistically_confirmed_bank.write().unwrap();
if frozen_slot > w_optimistically_confirmed_bank.bank.slot() {
w_optimistically_confirmed_bank.bank = bank;
subscriptions.notify_gossip_subscribers(frozen_slot);
}
drop(w_optimistically_confirmed_bank);
}
@ -231,14 +310,19 @@ mod tests {
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 0);
let mut highest_confirmed_slot: Slot = 0;
let mut last_notified_confirmed_slot: Slot = 0;
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
assert_eq!(highest_confirmed_slot, 2);
// Test max optimistically confirmed bank remains in the cache
OptimisticallyConfirmedBankTracker::process_notification(
@ -247,8 +331,11 @@ mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
assert_eq!(highest_confirmed_slot, 2);
// Test bank will only be cached when frozen
OptimisticallyConfirmedBankTracker::process_notification(
@ -257,21 +344,30 @@ mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 2);
assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
assert!(pending_optimistically_confirmed_banks.contains(&3));
assert_eq!(highest_confirmed_slot, 3);
// Test bank will only be cached when frozen
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
bank3.freeze();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Frozen(bank3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
assert_eq!(highest_confirmed_slot, 3);
assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
// Test higher root will be cached and clear pending_optimistically_confirmed_banks
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
@ -283,10 +379,13 @@ mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 3);
assert_eq!(pending_optimistically_confirmed_banks.len(), 1);
assert!(pending_optimistically_confirmed_banks.contains(&4));
assert_eq!(highest_confirmed_slot, 4);
let bank4 = bank_forks.read().unwrap().get(4).unwrap().clone();
let bank5 = Bank::new_from_parent(&bank4, &Pubkey::default(), 5);
@ -298,10 +397,13 @@ mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
assert!(!pending_optimistically_confirmed_banks.contains(&4));
assert_eq!(highest_confirmed_slot, 4);
// Banks <= root do not get added to pending list, even if not frozen
let bank5 = bank_forks.read().unwrap().get(5).unwrap().clone();
@ -320,9 +422,12 @@ mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
assert_eq!(optimistically_confirmed_bank.read().unwrap().bank.slot(), 5);
assert_eq!(pending_optimistically_confirmed_banks.len(), 0);
assert!(!pending_optimistically_confirmed_banks.contains(&6));
assert_eq!(highest_confirmed_slot, 4);
}
}

View File

@ -7529,6 +7529,8 @@ pub mod tests {
let json: Value = serde_json::from_str(&res.unwrap()).unwrap();
let slot: Slot = serde_json::from_value(json["result"].clone()).unwrap();
assert_eq!(slot, 0);
let mut highest_confirmed_slot: Slot = 0;
let mut last_notified_confirmed_slot: Slot = 0;
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(2),
@ -7536,6 +7538,8 @@ pub mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#;
@ -7551,6 +7555,8 @@ pub mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#;
@ -7566,6 +7572,8 @@ pub mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#;
@ -7582,6 +7590,8 @@ pub mod tests {
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let req =
r#"{"jsonrpc":"2.0","id":1,"method":"getSlot","params":[{"commitment": "confirmed"}]}"#;

View File

@ -1346,7 +1346,10 @@ pub(crate) mod tests {
stake, system_instruction, system_program, system_transaction,
transaction::Transaction,
},
std::{fmt::Debug, sync::mpsc::channel},
std::{
fmt::Debug,
sync::{atomic::Ordering::Relaxed, mpsc::channel},
},
tokio::{
runtime::Runtime,
time::{sleep, timeout},
@ -1619,6 +1622,454 @@ pub(crate) mod tests {
.contains_key(&stake::program::id()));
}
#[test]
#[serial]
fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot() {
// Testing if we can get the pubsub notification if a slot does not
// receive OptimisticallyConfirmed but its descendant slot get the confirmed
// notification.
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(100);
let bank = Bank::new_for_tests(&genesis_config);
bank.lazy_rent_collection.store(true, Relaxed);
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
// add account for alice and process the transaction at bank1
let alice = Keypair::new();
let tx = system_transaction::create_account(
&mint_keypair,
&alice,
blockhash,
1,
16,
&stake::program::id(),
);
bank1.process_transaction(&tx).unwrap();
let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2);
// add account for bob and process the transaction at bank2
let bob = Keypair::new();
let tx = system_transaction::create_account(
&mint_keypair,
&bob,
blockhash,
2,
16,
&stake::program::id(),
);
let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
bank2.process_transaction(&tx).unwrap();
let bank3 = Bank::new_from_parent(&bank2, &Pubkey::default(), 3);
bank_forks.write().unwrap().insert(bank3);
// add account for joe and process the transaction at bank3
let joe = Keypair::new();
let tx = system_transaction::create_account(
&mint_keypair,
&joe,
blockhash,
3,
16,
&stake::program::id(),
);
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
bank3.process_transaction(&tx).unwrap();
// now add programSubscribe at the "confirmed" commitment level
let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("programNotification");
let sub_id = SubscriptionId::Number(0);
let exit = Arc::new(AtomicBool::new(false));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut pending_optimistically_confirmed_banks = HashSet::new();
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
))),
optimistically_confirmed_bank.clone(),
));
subscriptions.add_program_subscription(
stake::program::id(),
Some(RpcProgramAccountsConfig {
account_config: RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::confirmed()),
..RpcAccountInfoConfig::default()
},
..RpcProgramAccountsConfig::default()
}),
sub_id.clone(),
subscriber,
);
assert!(subscriptions
.subscriptions
.gossip_program_subscriptions
.read()
.unwrap()
.contains_key(&stake::program::id()));
let mut highest_confirmed_slot: Slot = 0;
let mut last_notified_confirmed_slot: Slot = 0;
// Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is unfrozen, we expect
// to see transaction for alice and bob to be notified in order.
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
// a closure to reduce code duplications in building expected responses:
let build_expected_resp = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| {
json!({
"jsonrpc": "2.0",
"method": "programNotification",
"params": {
"result": {
"context": { "slot": slot },
"value": {
"account": {
"data": "1111111111111111",
"executable": false,
"lamports": lamports,
"owner": "Stake11111111111111111111111111111111111111",
"rentEpoch": 0,
},
"pubkey": pubkey,
},
},
"subscription": subscription,
}
})
};
let (response, transport_receiver) = robust_poll_or_panic(transport_receiver);
let expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0);
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
let (response, transport_receiver) = robust_poll_or_panic(transport_receiver);
let expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0);
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
bank3.freeze();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Frozen(bank3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let (response, _) = robust_poll_or_panic(transport_receiver);
let expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0);
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
subscriptions.remove_program_subscription(&sub_id);
}
#[test]
#[serial]
#[should_panic]
fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks_no_notifications(
) {
// Testing if we can get the pubsub notification if a slot does not
// receive OptimisticallyConfirmed but its descendant slot get the confirmed
// notification with a bank in the BankForks. We are not expecting to receive any notifications -- should panic.
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(100);
let bank = Bank::new_for_tests(&genesis_config);
bank.lazy_rent_collection.store(true, Relaxed);
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
// add account for alice and process the transaction at bank1
let alice = Keypair::new();
let tx = system_transaction::create_account(
&mint_keypair,
&alice,
blockhash,
1,
16,
&stake::program::id(),
);
bank1.process_transaction(&tx).unwrap();
let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2);
// add account for bob and process the transaction at bank2
let bob = Keypair::new();
let tx = system_transaction::create_account(
&mint_keypair,
&bob,
blockhash,
2,
16,
&stake::program::id(),
);
let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
bank2.process_transaction(&tx).unwrap();
// now add programSubscribe at the "confirmed" commitment level
let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("programNotification");
let sub_id = SubscriptionId::Number(0);
let exit = Arc::new(AtomicBool::new(false));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut pending_optimistically_confirmed_banks = HashSet::new();
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
))),
optimistically_confirmed_bank.clone(),
));
subscriptions.add_program_subscription(
stake::program::id(),
Some(RpcProgramAccountsConfig {
account_config: RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::confirmed()),
..RpcAccountInfoConfig::default()
},
..RpcProgramAccountsConfig::default()
}),
sub_id,
subscriber,
);
assert!(subscriptions
.subscriptions
.gossip_program_subscriptions
.read()
.unwrap()
.contains_key(&stake::program::id()));
let mut highest_confirmed_slot: Slot = 0;
let mut last_notified_confirmed_slot: Slot = 0;
// Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is not in the bankforks, we do not
// expect to see any RPC notifications.
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
// The following should panic
let (_response, _transport_receiver) = robust_poll_or_panic(transport_receiver);
}
#[test]
#[serial]
fn test_check_program_subscribe_for_missing_optimistically_confirmed_slot_with_no_banks() {
// Testing if we can get the pubsub notification if a slot does not
// receive OptimisticallyConfirmed but its descendant slot get the confirmed
// notification. It differs from the test_check_program_subscribe_for_missing_optimistically_confirmed_slot
// test in that when the descendant get confirmed, the descendant does not have a bank yet.
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(100);
let bank = Bank::new_for_tests(&genesis_config);
bank.lazy_rent_collection.store(true, Relaxed);
let blockhash = bank.last_blockhash();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let bank0 = bank_forks.read().unwrap().get(0).unwrap().clone();
let bank1 = Bank::new_from_parent(&bank0, &Pubkey::default(), 1);
bank_forks.write().unwrap().insert(bank1);
let bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
// add account for alice and process the transaction at bank1
let alice = Keypair::new();
let tx = system_transaction::create_account(
&mint_keypair,
&alice,
blockhash,
1,
16,
&stake::program::id(),
);
bank1.process_transaction(&tx).unwrap();
let bank2 = Bank::new_from_parent(&bank1, &Pubkey::default(), 2);
bank_forks.write().unwrap().insert(bank2);
// add account for bob and process the transaction at bank2
let bob = Keypair::new();
let tx = system_transaction::create_account(
&mint_keypair,
&bob,
blockhash,
2,
16,
&stake::program::id(),
);
let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
bank2.process_transaction(&tx).unwrap();
// now add programSubscribe at the "confirmed" commitment level
let (subscriber, _id_receiver, transport_receiver) =
Subscriber::new_test("programNotification");
let sub_id = SubscriptionId::Number(0);
let exit = Arc::new(AtomicBool::new(false));
let optimistically_confirmed_bank =
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks);
let mut pending_optimistically_confirmed_banks = HashSet::new();
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
bank_forks.clone(),
Arc::new(RwLock::new(BlockCommitmentCache::new_for_tests_with_slots(
1, 1,
))),
optimistically_confirmed_bank.clone(),
));
subscriptions.add_program_subscription(
stake::program::id(),
Some(RpcProgramAccountsConfig {
account_config: RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::confirmed()),
..RpcAccountInfoConfig::default()
},
..RpcProgramAccountsConfig::default()
}),
sub_id.clone(),
subscriber,
);
assert!(subscriptions
.subscriptions
.gossip_program_subscriptions
.read()
.unwrap()
.contains_key(&stake::program::id()));
let mut highest_confirmed_slot: Slot = 0;
let mut last_notified_confirmed_slot: Slot = 0;
// Optimistically notifying slot 3 without notifying slot 1 and 2, bank3 is not in the bankforks, we expect
// to see transaction for alice and bob to be notified only when bank3 is added to the fork and
// frozen. The notifications should be in the increasing order of the slot.
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
// a closure to reduce code duplications in building expected responses:
let build_expected_resp = |slot: Slot, lamports: u64, pubkey: &str, subscription: i32| {
json!({
"jsonrpc": "2.0",
"method": "programNotification",
"params": {
"result": {
"context": { "slot": slot },
"value": {
"account": {
"data": "1111111111111111",
"executable": false,
"lamports": lamports,
"owner": "Stake11111111111111111111111111111111111111",
"rentEpoch": 0,
},
"pubkey": pubkey,
},
},
"subscription": subscription,
}
})
};
let bank3 = Bank::new_from_parent(&bank2, &Pubkey::default(), 3);
bank_forks.write().unwrap().insert(bank3);
// add account for joe and process the transaction at bank3
let joe = Keypair::new();
let tx = system_transaction::create_account(
&mint_keypair,
&joe,
blockhash,
3,
16,
&stake::program::id(),
);
let bank3 = bank_forks.read().unwrap().get(3).unwrap().clone();
bank3.process_transaction(&tx).unwrap();
bank3.freeze();
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Frozen(bank3),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let (response, transport_receiver) = robust_poll_or_panic(transport_receiver);
let expected = build_expected_resp(1, 1, &alice.pubkey().to_string(), 0);
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
let (response, transport_receiver) = robust_poll_or_panic(transport_receiver);
let expected = build_expected_resp(2, 2, &bob.pubkey().to_string(), 0);
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
let (response, _) = robust_poll_or_panic(transport_receiver);
let expected = build_expected_resp(3, 3, &joe.pubkey().to_string(), 0);
assert_eq!(serde_json::to_string(&expected).unwrap(), response);
subscriptions.remove_program_subscription(&sub_id);
}
#[test]
#[serial]
fn test_check_signature_subscribe() {
@ -2062,21 +2513,28 @@ pub(crate) mod tests {
.unwrap();
// First, notify the unfrozen bank first to queue pending notification
let mut highest_confirmed_slot: Slot = 0;
let mut last_notified_confirmed_slot: Slot = 0;
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
// Now, notify the frozen bank and ensure its notifications are processed
highest_confirmed_slot = 0;
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::OptimisticallyConfirmed(1),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let (response, _) = robust_poll_or_panic(transport_receiver0);
@ -2113,12 +2571,16 @@ pub(crate) mod tests {
);
let bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
bank2.freeze();
highest_confirmed_slot = 0;
OptimisticallyConfirmedBankTracker::process_notification(
BankNotification::Frozen(bank2),
&bank_forks,
&optimistically_confirmed_bank,
&subscriptions,
&mut pending_optimistically_confirmed_banks,
&mut last_notified_confirmed_slot,
&mut highest_confirmed_slot,
);
let (response, _) = robust_poll_or_panic(transport_receiver1);
let expected = json!({