Fix RPC tests race condition (#29589)

This commit is contained in:
Brennan Watt 2023-01-09 18:51:58 -08:00 committed by GitHub
parent 0a6ff82911
commit 226e1921bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 21 deletions

View File

@ -1,5 +1,6 @@
//! The `pubsub` module implements a threaded subscription service on client RPC request //! The `pubsub` module implements a threaded subscription service on client RPC request
#[cfg(test)]
use crate::{rpc_pubsub_service, rpc_subscriptions::RpcSubscriptions};
use { use {
crate::{ crate::{
rpc::check_is_at_least_confirmed, rpc::check_is_at_least_confirmed,
@ -400,6 +401,14 @@ impl RpcSolPubSubImpl {
}) })
} }
} }
#[cfg(test)]
pub fn block_until_processed(&self, rpc_subscriptions: &Arc<RpcSubscriptions>) {
let (rpc, mut receiver) = rpc_pubsub_service::test_connection(rpc_subscriptions);
rpc.slot_subscribe().unwrap();
rpc_subscriptions.notify_slot(1, 0, 0);
receiver.recv();
}
} }
fn param<T: FromStr>(param_str: &str, thing: &str) -> Result<T> { fn param<T: FromStr>(param_str: &str, thing: &str) -> Result<T> {
@ -886,12 +895,7 @@ mod tests {
}), }),
) )
.unwrap(); .unwrap();
rpc.block_until_processed(&rpc_subscriptions);
// Make sure the subscription is processed before continuing.
let (rpc2, mut receiver2) = rpc_pubsub_service::test_connection(&rpc_subscriptions);
rpc2.slot_subscribe().unwrap();
rpc_subscriptions.notify_slot(1, 0, 0);
receiver2.recv();
let balance = { let balance = {
let bank = bank_forks.read().unwrap().working_bank(); let bank = bank_forks.read().unwrap().working_bank();
@ -1015,12 +1019,7 @@ mod tests {
}), }),
) )
.unwrap(); .unwrap();
rpc.block_until_processed(&rpc_subscriptions);
// Make sure the subscription is processed before continuing.
let (rpc2, mut receiver2) = rpc_pubsub_service::test_connection(&rpc_subscriptions);
rpc2.slot_subscribe().unwrap();
rpc_subscriptions.notify_slot(1, 0, 0);
receiver2.recv();
let ixs = system_instruction::create_nonce_account( let ixs = system_instruction::create_nonce_account(
&alice.pubkey(), &alice.pubkey(),

View File

@ -1352,7 +1352,7 @@ pub(crate) mod tests {
false, false,
AccountResult { AccountResult {
lamports: 0, lamports: 0,
subscription: 1, subscription: 2,
space: 0, space: 0,
data: "", data: "",
}, },
@ -1370,7 +1370,7 @@ pub(crate) mod tests {
true, true,
AccountResult { AccountResult {
lamports: 1, lamports: 1,
subscription: 2, subscription: 4,
space: 1024, space: 1024,
data: "error: data too large for bs58 encoding", data: "error: data too large for bs58 encoding",
}, },
@ -1406,11 +1406,7 @@ pub(crate) mod tests {
encoding: UiAccountEncoding::Binary, encoding: UiAccountEncoding::Binary,
})); }));
// Sleep here to ensure adequate time for the async thread to fully process the rpc.block_until_processed(&subscriptions);
// subscribed notification before the bank transaction is processed. Without this
// sleep, the bank transaction ocassionally completes first and we hang forever
// waiting to receive a bank notification.
std::thread::sleep(Duration::from_millis(100));
bank_forks bank_forks
.read() .read()
@ -2746,6 +2742,7 @@ pub(crate) mod tests {
.unwrap(); .unwrap();
assert!(subscriptions.control.account_subscribed(&alice.pubkey())); assert!(subscriptions.control.account_subscribed(&alice.pubkey()));
rpc0.block_until_processed(&subscriptions);
let tx = system_transaction::create_account( let tx = system_transaction::create_account(
&mint_keypair, &mint_keypair,
@ -2821,6 +2818,7 @@ pub(crate) mod tests {
serde_json::from_str::<serde_json::Value>(&response).unwrap(), serde_json::from_str::<serde_json::Value>(&response).unwrap(),
); );
rpc0.account_unsubscribe(sub_id0).unwrap(); rpc0.account_unsubscribe(sub_id0).unwrap();
rpc0.block_until_processed(&subscriptions);
let sub_id1 = rpc1 let sub_id1 = rpc1
.account_subscribe( .account_subscribe(
@ -2833,6 +2831,7 @@ pub(crate) mod tests {
}), }),
) )
.unwrap(); .unwrap();
rpc1.block_until_processed(&subscriptions);
let bank2 = bank_forks.read().unwrap().get(2).unwrap(); let bank2 = bank_forks.read().unwrap().get(2).unwrap();
bank2.freeze(); bank2.freeze();
@ -2863,7 +2862,7 @@ pub(crate) mod tests {
"space": 16, "space": 16,
}, },
}, },
"subscription": 1, "subscription": 3,
} }
}); });
assert_eq!( assert_eq!(
@ -2942,6 +2941,7 @@ pub(crate) mod tests {
) )
.unwrap(); .unwrap();
assert!(subscriptions.control.logs_subscribed(Some(&alice.pubkey()))); assert!(subscriptions.control.logs_subscribed(Some(&alice.pubkey())));
rpc_alice.block_until_processed(&subscriptions);
let tx = system_transaction::create_account( let tx = system_transaction::create_account(
&mint_keypair, &mint_keypair,