chore: improve solana-client-test (#29255)

* separate test port

* make server check more deterministic

* remove serial_test

* use atomic auto incremental port

* make NEXT_RPC_PUBSUB_PORT as an global static variable

* make check_server_is_ready become check_server_is_ready_or_panic

* use processed commitment in test_rpc_client

* lint
This commit is contained in:
Yihau Chen 2022-12-15 16:27:31 +08:00 committed by GitHub
parent 50c1de5597
commit 90f74302cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 60 additions and 43 deletions

2
Cargo.lock generated
View File

@ -5121,7 +5121,6 @@ version = "1.15.0"
dependencies = [ dependencies = [
"futures-util", "futures-util",
"serde_json", "serde_json",
"serial_test",
"solana-ledger", "solana-ledger",
"solana-logger 1.15.0", "solana-logger 1.15.0",
"solana-measure", "solana-measure",
@ -5141,6 +5140,7 @@ dependencies = [
"solana-version", "solana-version",
"systemstat", "systemstat",
"tokio", "tokio",
"tungstenite",
] ]
[[package]] [[package]]

View File

@ -13,7 +13,6 @@ publish = false
[dependencies] [dependencies]
futures-util = "0.3.21" futures-util = "0.3.21"
serde_json = "1.0.83" serde_json = "1.0.83"
serial_test = "0.9.0"
solana-ledger = { path = "../ledger", version = "=1.15.0" } solana-ledger = { path = "../ledger", version = "=1.15.0" }
solana-measure = { path = "../measure", version = "=1.15.0" } solana-measure = { path = "../measure", version = "=1.15.0" }
solana-merkle-tree = { path = "../merkle-tree", version = "=1.15.0" } solana-merkle-tree = { path = "../merkle-tree", version = "=1.15.0" }
@ -32,6 +31,7 @@ solana-transaction-status = { path = "../transaction-status", version = "=1.15.0
solana-version = { path = "../version", version = "=1.15.0" } solana-version = { path = "../version", version = "=1.15.0" }
systemstat = "0.2.0" systemstat = "0.2.0"
tokio = { version = "~1.14.1", features = ["full"] } tokio = { version = "~1.14.1", features = ["full"] }
tungstenite = { version = "0.17.2", features = ["rustls-tls-webpki-roots"] }
[dev-dependencies] [dev-dependencies]
solana-logger = { path = "../logger", version = "=1.15.0" } solana-logger = { path = "../logger", version = "=1.15.0" }

View File

@ -1,7 +1,6 @@
use { use {
futures_util::StreamExt, futures_util::StreamExt,
serde_json::{json, Value}, serde_json::{json, Value},
serial_test::serial,
solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path}, solana_ledger::{blockstore::Blockstore, get_tmp_ledger_path},
solana_pubsub_client::{nonblocking, pubsub_client::PubsubClient}, solana_pubsub_client::{nonblocking, pubsub_client::PubsubClient},
solana_rpc::{ solana_rpc::{
@ -42,15 +41,25 @@ use {
collections::HashSet, collections::HashSet,
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
sync::{ sync::{
atomic::{AtomicBool, AtomicU64, Ordering}, atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
Arc, RwLock, Arc, RwLock,
}, },
thread::sleep, thread::sleep,
time::{Duration, Instant}, time::{Duration, Instant},
}, },
systemstat::Ipv4Addr, systemstat::Ipv4Addr,
tungstenite::connect,
}; };
static NEXT_RPC_PUBSUB_PORT: AtomicU16 = AtomicU16::new(rpc_port::DEFAULT_RPC_PUBSUB_PORT);
fn pubsub_addr() -> SocketAddr {
SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
NEXT_RPC_PUBSUB_PORT.fetch_add(1, Ordering::Relaxed),
)
}
#[test] #[test]
fn test_rpc_client() { fn test_rpc_client() {
solana_logger::setup(); solana_logger::setup();
@ -84,7 +93,7 @@ fn test_rpc_client() {
let now = Instant::now(); let now = Instant::now();
while now.elapsed().as_secs() <= 20 { while now.elapsed().as_secs() <= 20 {
let response = client let response = client
.confirm_transaction_with_commitment(&signature, CommitmentConfig::default()) .confirm_transaction_with_commitment(&signature, CommitmentConfig::processed())
.unwrap(); .unwrap();
if response.value { if response.value {
@ -98,22 +107,24 @@ fn test_rpc_client() {
assert!(confirmed_tx); assert!(confirmed_tx);
assert_eq!( assert_eq!(
client.get_balance(&bob_pubkey).unwrap(), client
.get_balance_with_commitment(&bob_pubkey, CommitmentConfig::processed())
.unwrap()
.value,
sol_to_lamports(20.0) sol_to_lamports(20.0)
); );
assert_eq!( assert_eq!(
client.get_balance(&alice.pubkey()).unwrap(), client
.get_balance_with_commitment(&alice.pubkey(), CommitmentConfig::processed())
.unwrap()
.value,
original_alice_balance - sol_to_lamports(20.0) original_alice_balance - sol_to_lamports(20.0)
); );
} }
#[test] #[test]
#[serial]
fn test_account_subscription() { fn test_account_subscription() {
let pubsub_addr = SocketAddr::new( let pubsub_addr = pubsub_addr();
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { let GenesisConfigInfo {
@ -138,7 +149,9 @@ fn test_account_subscription() {
)); ));
let (trigger, pubsub_service) = let (trigger, pubsub_service) =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
std::thread::sleep(Duration::from_millis(400));
check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));
let config = Some(RpcAccountInfoConfig { let config = Some(RpcAccountInfoConfig {
commitment: Some(CommitmentConfig::finalized()), commitment: Some(CommitmentConfig::finalized()),
encoding: None, encoding: None,
@ -207,7 +220,6 @@ fn test_account_subscription() {
} }
#[test] #[test]
#[serial]
fn test_block_subscription() { fn test_block_subscription() {
// setup BankForks // setup BankForks
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
@ -253,17 +265,14 @@ fn test_block_subscription() {
Arc::new(RwLock::new(BlockCommitmentCache::default())), Arc::new(RwLock::new(BlockCommitmentCache::default())),
OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks), OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks),
)); ));
let pubsub_addr = SocketAddr::new( let pubsub_addr = pubsub_addr();
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let pub_cfg = PubSubConfig { let pub_cfg = PubSubConfig {
enable_block_subscription: true, enable_block_subscription: true,
..PubSubConfig::default() ..PubSubConfig::default()
}; };
let (trigger, pubsub_service) = PubSubService::new(pub_cfg, &subscriptions, pubsub_addr); let (trigger, pubsub_service) = PubSubService::new(pub_cfg, &subscriptions, pubsub_addr);
std::thread::sleep(Duration::from_millis(400)); check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));
// setup PubsubClient // setup PubsubClient
let (mut client, receiver) = PubsubClient::block_subscribe( let (mut client, receiver) = PubsubClient::block_subscribe(
@ -316,12 +325,8 @@ fn test_block_subscription() {
} }
#[test] #[test]
#[serial]
fn test_program_subscription() { fn test_program_subscription() {
let pubsub_addr = SocketAddr::new( let pubsub_addr = pubsub_addr();
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { let GenesisConfigInfo {
@ -346,7 +351,9 @@ fn test_program_subscription() {
)); ));
let (trigger, pubsub_service) = let (trigger, pubsub_service) =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
std::thread::sleep(Duration::from_millis(400));
check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));
let config = Some(RpcProgramAccountsConfig { let config = Some(RpcProgramAccountsConfig {
..RpcProgramAccountsConfig::default() ..RpcProgramAccountsConfig::default()
}); });
@ -408,12 +415,8 @@ fn test_program_subscription() {
} }
#[test] #[test]
#[serial]
fn test_root_subscription() { fn test_root_subscription() {
let pubsub_addr = SocketAddr::new( let pubsub_addr = pubsub_addr();
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
@ -432,7 +435,9 @@ fn test_root_subscription() {
)); ));
let (trigger, pubsub_service) = let (trigger, pubsub_service) =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
std::thread::sleep(Duration::from_millis(400));
check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));
let (mut client, receiver) = let (mut client, receiver) =
PubsubClient::root_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap(); PubsubClient::root_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();
@ -461,12 +466,8 @@ fn test_root_subscription() {
} }
#[test] #[test]
#[serial]
fn test_slot_subscription() { fn test_slot_subscription() {
let pubsub_addr = SocketAddr::new( let pubsub_addr = pubsub_addr();
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config); let bank = Bank::new_for_tests(&genesis_config);
@ -483,7 +484,8 @@ fn test_slot_subscription() {
)); ));
let (trigger, pubsub_service) = let (trigger, pubsub_service) =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
std::thread::sleep(Duration::from_millis(400));
check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(300));
let (mut client, receiver) = let (mut client, receiver) =
PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap(); PubsubClient::slot_subscribe(&format!("ws://0.0.0.0:{}/", pubsub_addr.port())).unwrap();
@ -523,7 +525,6 @@ fn test_slot_subscription() {
} }
#[tokio::test] #[tokio::test]
#[serial]
async fn test_slot_subscription_async() { async fn test_slot_subscription_async() {
let sync_service = Arc::new(AtomicU64::new(0)); let sync_service = Arc::new(AtomicU64::new(0));
let sync_client = Arc::clone(&sync_service); let sync_client = Arc::clone(&sync_service);
@ -533,10 +534,7 @@ async fn test_slot_subscription_async() {
} }
} }
let pubsub_addr = SocketAddr::new( let pubsub_addr = pubsub_addr();
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
rpc_port::DEFAULT_RPC_PUBSUB_PORT,
);
tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
@ -555,7 +553,9 @@ async fn test_slot_subscription_async() {
)); ));
let (trigger, pubsub_service) = let (trigger, pubsub_service) =
PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr); PubSubService::new(PubSubConfig::default(), &subscriptions, pubsub_addr);
sleep(Duration::from_millis(100));
check_server_is_ready_or_panic(&pubsub_addr, 10, Duration::from_millis(100));
sync_service.store(1, Ordering::Relaxed); sync_service.store(1, Ordering::Relaxed);
wait_until(&sync_service, 2); wait_until(&sync_service, 2);
@ -604,3 +604,20 @@ async fn test_slot_subscription_async() {
unsubscribe().await; unsubscribe().await;
} }
fn check_server_is_ready_or_panic(socket_addr: &SocketAddr, retry: u8, sleep_duration: Duration) {
loop {
if retry == 0 {
break;
} else {
retry.checked_sub(1).unwrap();
}
if connect(format!("ws://{}", socket_addr)).is_ok() {
return;
}
sleep(sleep_duration);
}
panic!("server hasn't been ready");
}