From 90f74302cf16a890cf9fbf5473be55710494c17d Mon Sep 17 00:00:00 2001 From: Yihau Chen Date: Thu, 15 Dec 2022 16:27:31 +0800 Subject: [PATCH] chore: improve solana-client-test (#29255) * separate test port * make server check more deterministic * remove serial_test * use atomic auto incremental port * make NEXT_RPC_PUBSUB_PORT as an global static variable * make check_server_is_ready become check_server_is_ready_or_panic * use processed commitment in test_rpc_client * lint --- Cargo.lock | 2 +- client-test/Cargo.toml | 2 +- client-test/tests/client.rs | 99 ++++++++++++++++++++++--------------- 3 files changed, 60 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da776c285d..afaebf6099 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5121,7 +5121,6 @@ version = "1.15.0" dependencies = [ "futures-util", "serde_json", - "serial_test", "solana-ledger", "solana-logger 1.15.0", "solana-measure", @@ -5141,6 +5140,7 @@ dependencies = [ "solana-version", "systemstat", "tokio", + "tungstenite", ] [[package]] diff --git a/client-test/Cargo.toml b/client-test/Cargo.toml index 59863b33bc..574ee68dff 100644 --- a/client-test/Cargo.toml +++ b/client-test/Cargo.toml @@ -13,7 +13,6 @@ publish = false [dependencies] futures-util = "0.3.21" serde_json = "1.0.83" -serial_test = "0.9.0" solana-ledger = { path = "../ledger", version = "=1.15.0" } solana-measure = { path = "../measure", version = "=1.15.0" } solana-merkle-tree = { path = "../merkle-tree", version = "=1.15.0" } @@ -32,6 +31,7 @@ solana-transaction-status = { path = "../transaction-status", version = "=1.15.0 solana-version = { path = "../version", version = "=1.15.0" } systemstat = "0.2.0" tokio = { version = "~1.14.1", features = ["full"] } +tungstenite = { version = "0.17.2", features = ["rustls-tls-webpki-roots"] } [dev-dependencies] solana-logger = { path = "../logger", version = "=1.15.0" } diff --git a/client-test/tests/client.rs b/client-test/tests/client.rs index 7af15cb3f1..7e168f6f3e 100644 --- a/client-test/tests/client.rs +++ b/client-test/tests/client.rs @@ -1,7 +1,6 @@ use { futures_util::StreamExt, serde_json::{json, Value}, - serial_test::serial, solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}, solana_pubsub_client::{nonblocking, pubsub_client::PubsubClient}, solana_rpc::{ @@ -42,15 +41,25 @@ use { collections::HashSet, net::{IpAddr, SocketAddr}, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering}, Arc, RwLock, }, thread::sleep, time::{Duration, Instant}, }, systemstat::Ipv4Addr, + tungstenite::connect, }; +static NEXT_RPC_PUBSUB_PORT: AtomicU16 = AtomicU16::new(rpc_port::DEFAULT_RPC_PUBSUB_PORT); + +fn pubsub_addr() -> SocketAddr { + SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + NEXT_RPC_PUBSUB_PORT.fetch_add(1, Ordering::Relaxed), + ) +} + #[test] fn test_rpc_client() { solana_logger::setup(); @@ -84,7 +93,7 @@ fn test_rpc_client() { let now = Instant::now(); while now.elapsed().as_secs() <= 20 { let response = client - .confirm_transaction_with_commitment(&signature, CommitmentConfig::default()) + .confirm_transaction_with_commitment(&signature, CommitmentConfig::processed()) .unwrap(); if response.value { @@ -98,22 +107,24 @@ fn test_rpc_client() { assert!(confirmed_tx); assert_eq!( - client.get_balance(&bob_pubkey).unwrap(), + client + .get_balance_with_commitment(&bob_pubkey, CommitmentConfig::processed()) + .unwrap() + .value, sol_to_lamports(20.0) ); assert_eq!( - client.get_balance(&alice.pubkey()).unwrap(), + client + .get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::processed()) + .unwrap() + .value, original_alice_balance - sol_to_lamports(20.0) ); } #[test] -#[serial] fn test_account_subscription() { - let pubsub_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - rpc_port::DEFAULT_RPC_PUBSUB_PORT, - ); + let pubsub_addr = pubsub_addr(); let exit = Arc::new(AtomicBool::new(false)); let GenesisConfigInfo { @@ -138,7 +149,9 @@ fn test_account_subscription() { )); let (trigger, pubsub_service) = PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); - std::thread::sleep(Duration::from_millis(400)); + + check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300)); + let config = Some(RpcAccountInfoConfig { commitment: Some(CommitmentConfig::finalized()), encoding: None, @@ -207,7 +220,6 @@ fn test_account_subscription() { } #[test] -#[serial] fn test_block_subscription() { // setup BankForks let exit = Arc::new(AtomicBool::new(false)); @@ -253,17 +265,14 @@ fn test_block_subscription() { Arc::new(RwLock::new(BlockCommitmentCache::default())), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), )); - let pubsub_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - rpc_port::DEFAULT_RPC_PUBSUB_PORT, - ); + let pubsub_addr = pubsub_addr(); let pub_cfg = PubSubConfig { enable_block_subscription: true, ..PubSubConfig::default() }; let (trigger, pubsub_service) = PubSubService::new(pub_cfg, &subscriptions, pubsub_addr); - std::thread::sleep(Duration::from_millis(400)); + check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300)); // setup PubsubClient let (mut client, receiver) = PubsubClient::block_subscribe( @@ -316,12 +325,8 @@ fn test_block_subscription() { } #[test] -#[serial] fn test_program_subscription() { - let pubsub_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - rpc_port::DEFAULT_RPC_PUBSUB_PORT, - ); + let pubsub_addr = pubsub_addr(); let exit = Arc::new(AtomicBool::new(false)); let GenesisConfigInfo { @@ -346,7 +351,9 @@ fn test_program_subscription() { )); let (trigger, pubsub_service) = PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); - std::thread::sleep(Duration::from_millis(400)); + + check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300)); + let config = Some(RpcProgramAccountsConfig { ..RpcProgramAccountsConfig::default() }); @@ -408,12 +415,8 @@ fn test_program_subscription() { } #[test] -#[serial] fn test_root_subscription() { - let pubsub_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - rpc_port::DEFAULT_RPC_PUBSUB_PORT, - ); + let pubsub_addr = pubsub_addr(); let exit = Arc::new(AtomicBool::new(false)); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); @@ -432,7 +435,9 @@ fn test_root_subscription() { )); let (trigger, pubsub_service) = PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); - std::thread::sleep(Duration::from_millis(400)); + + check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300)); + let (mut client, receiver) = PubsubClient::root_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap(); @@ -461,12 +466,8 @@ fn test_root_subscription() { } #[test] -#[serial] fn test_slot_subscription() { - let pubsub_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - rpc_port::DEFAULT_RPC_PUBSUB_PORT, - ); + let pubsub_addr = pubsub_addr(); let exit = Arc::new(AtomicBool::new(false)); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_tests(&genesis_config); @@ -483,7 +484,8 @@ fn test_slot_subscription() { )); let (trigger, pubsub_service) = PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); - std::thread::sleep(Duration::from_millis(400)); + + check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300)); let (mut client, receiver) = PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap(); @@ -523,7 +525,6 @@ fn test_slot_subscription() { } #[tokio::test] -#[serial] async fn test_slot_subscription_async() { let sync_service = Arc::new(AtomicU64::new(0)); let sync_client = Arc::clone(&sync_service); @@ -533,10 +534,7 @@ async fn test_slot_subscription_async() { } } - let pubsub_addr = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - rpc_port::DEFAULT_RPC_PUBSUB_PORT, - ); + let pubsub_addr = pubsub_addr(); tokio::task::spawn_blocking(move || { let exit = Arc::new(AtomicBool::new(false)); @@ -555,7 +553,9 @@ async fn test_slot_subscription_async() { )); let (trigger, pubsub_service) = PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); - sleep(Duration::from_millis(100)); + + check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(100)); + sync_service.store(1, Ordering::Relaxed); wait_until(&sync_service, 2); @@ -604,3 +604,20 @@ async fn test_slot_subscription_async() { unsubscribe().await; } + +fn check_server_is_ready_or_panic(socket_addr: &SocketAddr, retry: u8, sleep_duration: Duration) { + loop { + if retry == 0 { + break; + } else { + retry.checked_sub(1).unwrap(); + } + + if connect(format!("ws://{}", socket_addr)).is_ok() { + return; + } + sleep(sleep_duration); + } + + panic!("server hasn't been ready"); +}