diff --git a/Cargo.lock b/Cargo.lock index 6320cd1c8..1c728d1b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2346,6 +2346,8 @@ dependencies = [ "jsonrpc-core 10.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-http-server 10.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.9.17 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.91 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/bench-exchange/src/bench.rs b/bench-exchange/src/bench.rs index ddabd9a12..99d68f052 100644 --- a/bench-exchange/src/bench.rs +++ b/bench-exchange/src/bench.rs @@ -892,7 +892,7 @@ pub fn airdrop_lamports(client: &Client, drone_addr: &SocketAddr, id: &Keypair, #[cfg(test)] mod tests { use super::*; - use solana::gossip_service::{discover_cluster, get_clients}; + use solana::gossip_service::{discover_cluster, get_multi_client}; use solana::local_cluster::{ClusterConfig, LocalCluster}; use solana::validator::ValidatorConfig; use solana_drone::drone::run_local_drone; @@ -952,25 +952,20 @@ mod tests { exit(1); }); - let clients = get_clients(&nodes); + let (client, num_clients) = get_multi_client(&nodes); - if clients.len() < NUM_NODES { - error!( - "Error: Insufficient nodes discovered. Expecting {} or more", - NUM_NODES - ); - exit(1); - } + info!("clients: {}", num_clients); + assert!(num_clients >= NUM_NODES); const NUM_SIGNERS: u64 = 2; airdrop_lamports( - &clients[0], + &client, &drone_addr, &config.identity, fund_amount * (accounts_in_groups + 1) as u64 * NUM_SIGNERS, ); - do_bench_exchange(clients, config); + do_bench_exchange(vec![client], config); } #[test] diff --git a/bench-exchange/src/main.rs b/bench-exchange/src/main.rs index b6d9fc022..1cc9b0e28 100644 --- a/bench-exchange/src/main.rs +++ b/bench-exchange/src/main.rs @@ -8,7 +8,7 @@ extern crate solana_exchange_program; use crate::bench::{airdrop_lamports, do_bench_exchange, Config}; use log::*; -use solana::gossip_service::{discover_cluster, get_clients}; +use solana::gossip_service::{discover_cluster, get_multi_client}; use solana_sdk::signature::KeypairUtil; fn main() { @@ -39,10 +39,10 @@ fn main() { panic!("Failed to discover nodes"); }); - let clients = get_clients(&nodes); + let (client, num_clients) = get_multi_client(&nodes); - info!("{} nodes found", clients.len()); - if clients.len() < num_nodes { + info!("{} nodes found", num_clients); + if num_clients < num_nodes { panic!("Error: Insufficient nodes discovered"); } @@ -51,7 +51,7 @@ fn main() { let accounts_in_groups = batch_size * account_groups; const NUM_SIGNERS: u64 = 2; airdrop_lamports( - &clients[0], + &client, &drone_addr, &identity, fund_amount * (accounts_in_groups + 1) as u64 * NUM_SIGNERS, @@ -68,5 +68,5 @@ fn main() { account_groups, }; - do_bench_exchange(clients, config); + do_bench_exchange(vec![client], config); } diff --git a/bench-tps/src/main.rs b/bench-tps/src/main.rs index 38744eef8..faecf1361 100644 --- a/bench-tps/src/main.rs +++ b/bench-tps/src/main.rs @@ -2,7 +2,7 @@ mod bench; mod cli; use crate::bench::{do_bench_tps, generate_and_fund_keypairs, Config, NUM_LAMPORTS_PER_ACCOUNT}; -use solana::gossip_service::{discover_cluster, get_clients}; +use solana::gossip_service::{discover_cluster, get_multi_client}; use std::process::exit; fn main() { @@ -30,7 +30,10 @@ fn main() { eprintln!("Failed to discover {} nodes: {:?}", num_nodes, err); exit(1); }); - if nodes.len() < num_nodes { + + let (client, num_clients) = get_multi_client(&nodes); + + if nodes.len() < num_clients { eprintln!( "Error: Insufficient nodes discovered. Expecting {} or more", num_nodes @@ -38,10 +41,8 @@ fn main() { exit(1); } - let clients = get_clients(&nodes); - let (keypairs, keypair_balance) = generate_and_fund_keypairs( - &clients[0], + &client, Some(drone_addr), &id, tx_count, @@ -57,5 +58,5 @@ fn main() { sustained, }; - do_bench_tps(clients, config, keypairs, keypair_balance); + do_bench_tps(vec![client], config, keypairs, keypair_balance); } diff --git a/client/Cargo.toml b/client/Cargo.toml index ac7bc437f..835148c3a 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -14,6 +14,8 @@ bs58 = "0.2.0" log = "0.4.2" jsonrpc-core = "10.1.0" reqwest = "0.9.17" +rand = "0.6.5" +rayon = "1.0.0" serde = "1.0.89" serde_derive = "1.0.91" serde_json = "1.0.39" diff --git a/client/src/perf_utils.rs b/client/src/perf_utils.rs index eaceae3d7..303074bbd 100644 --- a/client/src/perf_utils.rs +++ b/client/src/perf_utils.rs @@ -36,7 +36,18 @@ pub fn sample_txs( total_elapsed = start_time.elapsed(); let elapsed = now.elapsed(); now = Instant::now(); - let mut txs = client.get_transaction_count().expect("transaction count"); + let mut txs; + match client.get_transaction_count() { + Err(e) => { + // ThinClient with multiple options should pick a better one now. + info!("Couldn't get transaction count {:?}", e); + sleep(Duration::from_secs(sample_period)); + continue; + } + Ok(tx_count) => { + txs = tx_count; + } + } if txs < last_txs { info!("Expected txs({}) >= last_txs({})", txs, last_txs); diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 3be1733b6..360250021 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -15,17 +15,100 @@ use solana_sdk::packet::PACKET_DATA_SIZE; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil, Signature}; use solana_sdk::system_instruction; +use solana_sdk::timing::duration_as_ms; use solana_sdk::transaction::{self, Transaction}; use solana_sdk::transport::Result as TransportResult; use std::io; use std::net::{SocketAddr, UdpSocket}; -use std::time::Duration; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::RwLock; +use std::time::{Duration, Instant}; + +struct ClientOptimizer { + cur_index: AtomicUsize, + experiment_index: AtomicUsize, + experiment_done: AtomicBool, + times: RwLock>, + num_clients: usize, +} + +fn min_index(array: &[u64]) -> (u64, usize) { + let mut min_time = std::u64::MAX; + let mut min_index = 0; + for (i, time) in array.iter().enumerate() { + if *time < min_time { + min_time = *time; + min_index = i; + } + } + (min_time, min_index) +} + +impl ClientOptimizer { + fn new(num_clients: usize) -> Self { + Self { + cur_index: AtomicUsize::new(0), + experiment_index: AtomicUsize::new(0), + experiment_done: AtomicBool::new(false), + times: RwLock::new(vec![std::u64::MAX; num_clients]), + num_clients, + } + } + + fn experiment(&self) -> usize { + if self.experiment_index.load(Ordering::Relaxed) < self.num_clients { + let old = self.experiment_index.fetch_add(1, Ordering::Relaxed); + if old < self.num_clients { + old + } else { + self.best() + } + } else { + self.best() + } + } + + fn report(&self, index: usize, time_ms: u64) { + if self.num_clients > 1 + && (!self.experiment_done.load(Ordering::Relaxed) || time_ms == std::u64::MAX) + { + trace!( + "report {} with {} exp: {}", + index, + time_ms, + self.experiment_index.load(Ordering::Relaxed) + ); + + self.times.write().unwrap()[index] = time_ms; + + if index == (self.num_clients - 1) || time_ms == std::u64::MAX { + let times = self.times.read().unwrap(); + let (min_time, min_index) = min_index(×); + trace!( + "done experimenting min: {} time: {} times: {:?}", + min_index, + min_time, + times + ); + + // Only 1 thread should grab the num_clients-1 index, so this should be ok. + self.cur_index.store(min_index, Ordering::Relaxed); + self.experiment_done.store(true, Ordering::Relaxed); + } + } + } + + fn best(&self) -> usize { + self.cur_index.load(Ordering::Relaxed) + } +} /// An object for querying and sending transactions to the network. pub struct ThinClient { - transactions_addr: SocketAddr, transactions_socket: UdpSocket, - rpc_client: RpcClient, + transactions_addrs: Vec, + rpc_clients: Vec, + optimizer: ClientOptimizer, } impl ThinClient { @@ -59,12 +142,39 @@ impl ThinClient { rpc_client: RpcClient, ) -> Self { Self { - rpc_client, - transactions_addr, transactions_socket, + transactions_addrs: vec![transactions_addr], + rpc_clients: vec![rpc_client], + optimizer: ClientOptimizer::new(0), } } + pub fn new_from_addrs( + transactions_addrs: Vec, + transactions_socket: UdpSocket, + rpc_sockets: Vec, + ) -> Self { + assert!(!transactions_addrs.is_empty()); + assert!(!rpc_sockets.is_empty()); + assert_eq!(rpc_sockets.len(), transactions_addrs.len()); + let rpc_len = rpc_sockets.len(); + let rpc_clients: Vec<_> = rpc_sockets.into_iter().map(RpcClient::new_socket).collect(); + Self { + transactions_addrs, + transactions_socket, + rpc_clients, + optimizer: ClientOptimizer::new(rpc_len), + } + } + + fn transactions_addr(&self) -> &SocketAddr { + &self.transactions_addrs[self.optimizer.best()] + } + + fn rpc_client(&self) -> &RpcClient { + &self.rpc_clients[self.optimizer.best()] + } + /// Retry a sending a signed Transaction to the server for processing. pub fn retry_transfer_until_confirmed( &self, @@ -100,15 +210,19 @@ impl ThinClient { serialize_into(&mut wr, &transaction) .expect("serialize Transaction in pub fn transfer_signed"); self.transactions_socket - .send_to(&buf[..], &self.transactions_addr)?; + .send_to(&buf[..], &self.transactions_addr())?; if self .poll_for_signature_confirmation(&transaction.signatures[0], min_confirmed_blocks) .is_ok() { return Ok(transaction.signatures[0]); } - info!("{} tries failed transfer to {}", x, self.transactions_addr); - let (blockhash, _fee_calculator) = self.rpc_client.get_recent_blockhash()?; + info!( + "{} tries failed transfer to {}", + x, + self.transactions_addr() + ); + let (blockhash, _fee_calculator) = self.rpc_client().get_recent_blockhash()?; transaction.sign(keypairs, blockhash); } Err(io::Error::new( @@ -123,39 +237,40 @@ impl ThinClient { polling_frequency: &Duration, timeout: &Duration, ) -> io::Result { - self.rpc_client + self.rpc_client() .poll_balance_with_timeout(pubkey, polling_frequency, timeout) } pub fn poll_get_balance(&self, pubkey: &Pubkey) -> io::Result { - self.rpc_client.poll_get_balance(pubkey) + self.rpc_client().poll_get_balance(pubkey) } pub fn wait_for_balance(&self, pubkey: &Pubkey, expected_balance: Option) -> Option { - self.rpc_client.wait_for_balance(pubkey, expected_balance) + self.rpc_client().wait_for_balance(pubkey, expected_balance) } /// Check a signature in the bank. This method blocks /// until the server sends a response. pub fn check_signature(&self, signature: &Signature) -> bool { - self.rpc_client.check_signature(signature) + self.rpc_client().check_signature(signature) } pub fn fullnode_exit(&self) -> io::Result { - self.rpc_client.fullnode_exit() + self.rpc_client().fullnode_exit() } + pub fn get_num_blocks_since_signature_confirmation( &mut self, sig: &Signature, ) -> io::Result { - self.rpc_client + self.rpc_client() .get_num_blocks_since_signature_confirmation(sig) } } impl Client for ThinClient { fn transactions_addr(&self) -> String { - self.transactions_addr.to_string() + self.transactions_addr().to_string() } } @@ -188,11 +303,11 @@ impl SyncClient for ThinClient { } fn get_account_data(&self, pubkey: &Pubkey) -> TransportResult>> { - Ok(self.rpc_client.get_account_data(pubkey).ok()) + Ok(self.rpc_client().get_account_data(pubkey).ok()) } fn get_balance(&self, pubkey: &Pubkey) -> TransportResult { - let balance = self.rpc_client.get_balance(pubkey)?; + let balance = self.rpc_client().get_balance(pubkey)?; Ok(balance) } @@ -201,7 +316,7 @@ impl SyncClient for ThinClient { signature: &Signature, ) -> TransportResult>> { let status = self - .rpc_client + .rpc_client() .get_signature_status(&signature.to_string()) .map_err(|err| { io::Error::new( @@ -213,12 +328,34 @@ impl SyncClient for ThinClient { } fn get_recent_blockhash(&self) -> TransportResult<(Hash, FeeCalculator)> { - Ok(self.rpc_client.get_recent_blockhash()?) + let index = self.optimizer.experiment(); + let now = Instant::now(); + let recent_blockhash = self.rpc_clients[index].get_recent_blockhash(); + match recent_blockhash { + Ok(recent_blockhash) => { + self.optimizer.report(index, duration_as_ms(&now.elapsed())); + Ok(recent_blockhash) + } + Err(e) => { + self.optimizer.report(index, std::u64::MAX); + Err(e)? + } + } } fn get_transaction_count(&self) -> TransportResult { - let transaction_count = self.rpc_client.get_transaction_count()?; - Ok(transaction_count) + let index = self.optimizer.experiment(); + let now = Instant::now(); + match self.rpc_client().get_transaction_count() { + Ok(transaction_count) => { + self.optimizer.report(index, duration_as_ms(&now.elapsed())); + Ok(transaction_count) + } + Err(e) => { + self.optimizer.report(index, std::u64::MAX); + Err(e)? + } + } } /// Poll the server until the signature has been confirmed by at least `min_confirmed_blocks` @@ -228,16 +365,17 @@ impl SyncClient for ThinClient { min_confirmed_blocks: usize, ) -> TransportResult<()> { Ok(self - .rpc_client + .rpc_client() .poll_for_signature_confirmation(signature, min_confirmed_blocks)?) } fn poll_for_signature(&self, signature: &Signature) -> TransportResult<()> { - Ok(self.rpc_client.poll_for_signature(signature)?) + Ok(self.rpc_client().poll_for_signature(signature)?) } fn get_new_blockhash(&self, blockhash: &Hash) -> TransportResult<(Hash, FeeCalculator)> { - Ok(self.rpc_client.get_new_blockhash(blockhash)?) + let new_blockhash = self.rpc_client().get_new_blockhash(blockhash)?; + Ok(new_blockhash) } } @@ -249,7 +387,7 @@ impl AsyncClient for ThinClient { .expect("serialize Transaction in pub fn transfer_signed"); assert!(buf.len() < PACKET_DATA_SIZE); self.transactions_socket - .send_to(&buf[..], &self.transactions_addr)?; + .send_to(&buf[..], &self.transactions_addr())?; Ok(transaction.signatures[0]) } fn async_send_message( @@ -296,3 +434,28 @@ pub fn create_client_with_timeout( let (_, transactions_socket) = solana_netutil::bind_in_range(range).unwrap(); ThinClient::new_socket_with_timeout(rpc, tpu, transactions_socket, timeout) } + +#[cfg(test)] +mod tests { + use super::*; + use rayon::prelude::*; + + #[test] + fn test_client_optimizer() { + solana_logger::setup(); + + const NUM_CLIENTS: usize = 5; + let optimizer = ClientOptimizer::new(NUM_CLIENTS); + (0..NUM_CLIENTS).into_par_iter().for_each(|_| { + let index = optimizer.experiment(); + optimizer.report(index, (NUM_CLIENTS - index) as u64); + }); + + let index = optimizer.experiment(); + optimizer.report(index, 50); + assert_eq!(optimizer.best(), NUM_CLIENTS - 1); + + optimizer.report(optimizer.best(), std::u64::MAX); + assert_eq!(optimizer.best(), NUM_CLIENTS - 2); + } +} diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 4d0b41ea4..35e1caadd 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -129,6 +129,22 @@ pub fn get_client(nodes: &[ContactInfo]) -> ThinClient { create_client(nodes[select], FULLNODE_PORT_RANGE) } +pub fn get_multi_client(nodes: &[ContactInfo]) -> (ThinClient, usize) { + let addrs: Vec<_> = nodes + .iter() + .filter_map(ContactInfo::valid_client_facing_addr) + .map(|addrs| addrs) + .collect(); + let rpcs: Vec<_> = addrs.iter().map(|addr| addr.0).collect(); + let tpus: Vec<_> = addrs.iter().map(|addr| addr.1).collect(); + let (_, transactions_socket) = solana_netutil::bind_in_range(FULLNODE_PORT_RANGE).unwrap(); + let num_nodes = tpus.len(); + ( + ThinClient::new_from_addrs(tpus, transactions_socket, rpcs), + num_nodes, + ) +} + fn spy( spy_ref: Arc>, num_nodes: Option,