Fix race in RPC subscriptions test (#9142)

This commit is contained in:
Justin Starry 2020-03-29 01:58:51 +08:00 committed by GitHub
parent d7fa40087c
commit 8bbf6e3f54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 35 deletions

View File

@ -159,8 +159,11 @@ where
let root = if root.len() == 1 { root[0] } else { 0 }; let root = if root.len() == 1 { root[0] } else { 0 };
if desired_slot.len() == 1 { if desired_slot.len() == 1 {
let slot = desired_slot[0]; let slot = desired_slot[0];
let desired_bank = bank_forks.read().unwrap().get(slot).unwrap().clone(); let results = {
let results = bank_method(&desired_bank, hashmap_key); let bank_forks = bank_forks.read().unwrap();
let desired_bank = bank_forks.get(slot).unwrap();
bank_method(&desired_bank, hashmap_key)
};
for result in filter_results(results, root) { for result in filter_results(results, root) {
notifier.notify( notifier.notify(
Response { Response {

View File

@ -16,6 +16,7 @@ use solana_sdk::{
commitment_config::CommitmentConfig, commitment_config::CommitmentConfig,
hash::Hash, hash::Hash,
pubkey::Pubkey, pubkey::Pubkey,
signature::Signer,
system_transaction, system_transaction,
transaction::{self, Transaction}, transaction::{self, Transaction},
}; };
@ -24,7 +25,6 @@ use std::{
fs::remove_dir_all, fs::remove_dir_all,
net::UdpSocket, net::UdpSocket,
sync::mpsc::channel, sync::mpsc::channel,
sync::{Arc, Mutex},
thread::sleep, thread::sleep,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -210,9 +210,11 @@ fn test_rpc_subscriptions() {
.. ..
} = TestValidator::run(); } = TestValidator::run();
// Create transaction signatures to subscribe to
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let transactions: Vec<Transaction> = (0..100) transactions_socket.connect(leader_data.tpu).unwrap();
// Create transaction signatures to subscribe to
let transactions: Vec<Transaction> = (0..1000)
.map(|_| system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash)) .map(|_| system_transaction::transfer(&alice, &Pubkey::new_rand(), 1, genesis_hash))
.collect(); .collect();
let mut signature_set: HashSet<String> = transactions let mut signature_set: HashSet<String> = transactions
@ -220,15 +222,15 @@ fn test_rpc_subscriptions() {
.map(|tx| tx.signatures[0].to_string()) .map(|tx| tx.signatures[0].to_string())
.collect(); .collect();
// Track when subscriptions are ready
let (ready_sender, ready_receiver) = channel::<()>();
// Track when status notifications are received
let (status_sender, status_receiver) = channel::<(String, Response<transaction::Result<()>>)>();
// Create the pub sub runtime // Create the pub sub runtime
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub); let rpc_pubsub_url = format!("ws://{}/", leader_data.rpc_pubsub);
let (status_sender, status_receiver) = channel::<(String, Response<transaction::Result<()>>)>();
let status_sender = Arc::new(Mutex::new(status_sender));
let (sent_sender, sent_receiver) = channel::<()>();
let sent_sender = Arc::new(Mutex::new(sent_sender));
// Subscribe to all signatures // Subscribe to all signatures
rt.spawn({ rt.spawn({
let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap(); let connect = ws::try_connect::<PubsubClient>(&rpc_pubsub_url).unwrap();
@ -237,18 +239,12 @@ fn test_rpc_subscriptions() {
.and_then(move |client| { .and_then(move |client| {
for sig in signature_set { for sig in signature_set {
let status_sender = status_sender.clone(); let status_sender = status_sender.clone();
let sent_sender = sent_sender.clone();
tokio::spawn( tokio::spawn(
client client
.signature_subscribe(sig.clone(), None) .signature_subscribe(sig.clone(), None)
.and_then(move |sig_stream| { .and_then(move |sig_stream| {
sent_sender.lock().unwrap().send(()).unwrap();
sig_stream.for_each(move |result| { sig_stream.for_each(move |result| {
status_sender status_sender.send((sig.clone(), result)).unwrap();
.lock()
.unwrap()
.send((sig.clone(), result))
.unwrap();
future::ok(()) future::ok(())
}) })
}) })
@ -257,37 +253,50 @@ fn test_rpc_subscriptions() {
}), }),
); );
} }
tokio::spawn(
client
.slot_subscribe()
.and_then(move |slot_stream| {
slot_stream.for_each(move |_| {
ready_sender.send(()).unwrap();
future::ok(())
})
})
.map_err(|err| {
eprintln!("slot sub err: {:#?}", err);
}),
);
future::ok(()) future::ok(())
}) })
.map_err(|_| ()) .map_err(|_| ())
}); });
// Wait for signature subscriptions // Wait for signature subscriptions
let deadline = Instant::now() + Duration::from_secs(2); ready_receiver.recv_timeout(Duration::from_secs(2)).unwrap();
(0..transactions.len()).for_each(|_| {
sent_receiver
.recv_timeout(deadline.saturating_duration_since(Instant::now()))
.unwrap();
});
let rpc_client = RpcClient::new_socket(leader_data.rpc); let rpc_client = RpcClient::new_socket(leader_data.rpc);
let mut transaction_count = rpc_client let mut mint_balance = rpc_client
.get_transaction_count_with_commitment(CommitmentConfig::recent()) .get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::recent())
.unwrap(); .unwrap()
.value;
assert!(mint_balance >= transactions.len() as u64);
// Send all transactions to tpu socket for processing // Send all transactions to tpu socket for processing
transactions.iter().for_each(|tx| { transactions.iter().for_each(|tx| {
transactions_socket transactions_socket
.send_to(&bincode::serialize(&tx).unwrap(), leader_data.tpu) .send(&bincode::serialize(&tx).unwrap())
.unwrap(); .unwrap();
}); });
// Track mint balance to know when transactions have completed
let now = Instant::now(); let now = Instant::now();
let expected_transaction_count = transaction_count + transactions.len() as u64; let expected_mint_balance = mint_balance - transactions.len() as u64;
while transaction_count < expected_transaction_count && now.elapsed() < Duration::from_secs(5) { while mint_balance != expected_mint_balance && now.elapsed() < Duration::from_secs(5) {
transaction_count = rpc_client mint_balance = rpc_client
.get_transaction_count_with_commitment(CommitmentConfig::recent()) .get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::recent())
.unwrap(); .unwrap()
sleep(Duration::from_millis(200)); .value;
sleep(Duration::from_millis(100));
} }
// Wait for all signature subscriptions // Wait for all signature subscriptions
@ -300,12 +309,12 @@ fn test_rpc_subscriptions() {
assert!(signature_set.remove(&sig)); assert!(signature_set.remove(&sig));
} }
Err(_err) => { Err(_err) => {
eprintln!( assert!(
false,
"recv_timeout, {}/{} signatures remaining", "recv_timeout, {}/{} signatures remaining",
signature_set.len(), signature_set.len(),
transactions.len() transactions.len()
); );
assert!(false)
} }
} }
} }