Allow thin clients to be passed across thread boundaries (#3887)

* Remove ThinClient wrapper

* Allow RpcClient (and ThinClient) to be passed across thread boundaries

* Pass clients, not constructors

* Fix bad rebase
This commit is contained in:
Greg Fitzgerald 2019-04-19 08:54:21 -06:00 committed by GitHub
parent baac21209e
commit 809b051f10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 62 deletions

View File

@ -11,7 +11,7 @@ use solana_exchange_api::exchange_state::*;
use solana_exchange_api::id; use solana_exchange_api::id;
use solana_metrics::influxdb; use solana_metrics::influxdb;
use solana_sdk::client::Client; use solana_sdk::client::Client;
use solana_sdk::client::{AsyncClient, SyncClient}; use solana_sdk::client::SyncClient;
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_instruction; use solana_sdk::system_instruction;
@ -74,11 +74,9 @@ pub struct SampleStats {
pub tx_count: u64, pub tx_count: u64,
} }
pub fn do_bench_exchange<F, T>(client_ctors: Vec<F>, config: Config) pub fn do_bench_exchange<T>(clients: Vec<T>, config: Config)
where where
F: Fn() -> T, T: 'static + Client + Send + Sync,
F: 'static + std::marker::Sync + std::marker::Send,
T: Client,
{ {
let Config { let Config {
identity, identity,
@ -91,8 +89,8 @@ where
} = config; } = config;
let accounts_in_groups = batch_size * account_groups; let accounts_in_groups = batch_size * account_groups;
let exit_signal = Arc::new(AtomicBool::new(false)); let exit_signal = Arc::new(AtomicBool::new(false));
let client_ctors: Vec<_> = client_ctors.into_iter().map(Arc::new).collect(); let clients: Vec<_> = clients.into_iter().map(Arc::new).collect();
let client = client_ctors[0](); let client = clients[0].as_ref();
let total_keys = accounts_in_groups as u64 * 5; let total_keys = accounts_in_groups as u64 * 5;
info!("Generating {:?} keys", total_keys); info!("Generating {:?} keys", total_keys);
@ -119,33 +117,31 @@ where
.collect(); .collect();
info!("Fund trader accounts"); info!("Fund trader accounts");
fund_keys(&client, &identity, &trader_signers, fund_amount); fund_keys(client, &identity, &trader_signers, fund_amount);
info!("Fund swapper accounts"); info!("Fund swapper accounts");
fund_keys(&client, &identity, &swapper_signers, fund_amount); fund_keys(client, &identity, &swapper_signers, fund_amount);
info!("Create {:?} source token accounts", src_pubkeys.len()); info!("Create {:?} source token accounts", src_pubkeys.len());
create_token_accounts(&client, &trader_signers, &src_pubkeys); create_token_accounts(client, &trader_signers, &src_pubkeys);
info!("Create {:?} destination token accounts", dst_pubkeys.len()); info!("Create {:?} destination token accounts", dst_pubkeys.len());
create_token_accounts(&client, &trader_signers, &dst_pubkeys); create_token_accounts(client, &trader_signers, &dst_pubkeys);
info!("Create {:?} profit token accounts", profit_pubkeys.len()); info!("Create {:?} profit token accounts", profit_pubkeys.len());
create_token_accounts(&client, &swapper_signers, &profit_pubkeys); create_token_accounts(client, &swapper_signers, &profit_pubkeys);
// Collect the max transaction rate and total tx count seen (single node only) // Collect the max transaction rate and total tx count seen (single node only)
let sample_stats = Arc::new(RwLock::new(Vec::new())); let sample_stats = Arc::new(RwLock::new(Vec::new()));
let sample_period = 1; // in seconds let sample_period = 1; // in seconds
info!("Sampling clients for tps every {} s", sample_period); info!("Sampling clients for tps every {} s", sample_period);
let sample_threads: Vec<_> = client_ctors let sample_threads: Vec<_> = clients
.iter() .iter()
.map(|ctor| { .map(|client| {
let exit_signal = exit_signal.clone(); let exit_signal = exit_signal.clone();
let sample_stats = sample_stats.clone(); let sample_stats = sample_stats.clone();
let client_ctor = ctor.clone(); let client = client.clone();
Builder::new() Builder::new()
.name("solana-exchange-sample".to_string()) .name("solana-exchange-sample".to_string())
.spawn(move || { .spawn(move || sample_tx_count(&exit_signal, &sample_stats, sample_period, &client))
sample_tx_count(&exit_signal, &sample_stats, sample_period, &client_ctor)
})
.unwrap() .unwrap()
}) })
.collect(); .collect();
@ -159,7 +155,7 @@ where
let shared_txs = shared_txs.clone(); let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let total_tx_sent_count = total_tx_sent_count.clone(); let total_tx_sent_count = total_tx_sent_count.clone();
let client_ctor = client_ctors[0].clone(); let client = clients[0].clone();
Builder::new() Builder::new()
.name("solana-exchange-transfer".to_string()) .name("solana-exchange-transfer".to_string())
.spawn(move || { .spawn(move || {
@ -168,7 +164,7 @@ where
&shared_txs, &shared_txs,
&shared_tx_active_thread_count, &shared_tx_active_thread_count,
&total_tx_sent_count, &total_tx_sent_count,
&client_ctor, &client,
) )
}) })
.unwrap() .unwrap()
@ -181,7 +177,7 @@ where
let exit_signal = exit_signal.clone(); let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone(); let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let client_ctor = client_ctors[0].clone(); let client = clients[0].clone();
Builder::new() Builder::new()
.name("solana-exchange-swapper".to_string()) .name("solana-exchange-swapper".to_string())
.spawn(move || { .spawn(move || {
@ -194,7 +190,7 @@ where
&profit_pubkeys, &profit_pubkeys,
batch_size, batch_size,
account_groups, account_groups,
&client_ctor, &client,
) )
}) })
.unwrap() .unwrap()
@ -205,7 +201,7 @@ where
let exit_signal = exit_signal.clone(); let exit_signal = exit_signal.clone();
let shared_txs = shared_txs.clone(); let shared_txs = shared_txs.clone();
let shared_tx_active_thread_count = shared_tx_active_thread_count.clone(); let shared_tx_active_thread_count = shared_tx_active_thread_count.clone();
let client_ctor = client_ctors[0].clone(); let client = clients[0].clone();
Builder::new() Builder::new()
.name("solana-exchange-trader".to_string()) .name("solana-exchange-trader".to_string())
.spawn(move || { .spawn(move || {
@ -220,7 +216,7 @@ where
trade_delay, trade_delay,
batch_size, batch_size,
account_groups, account_groups,
&client_ctor, &client,
) )
}) })
.unwrap() .unwrap()
@ -242,16 +238,14 @@ where
compute_and_report_stats(&sample_stats, total_tx_sent_count.load(Ordering::Relaxed)); compute_and_report_stats(&sample_stats, total_tx_sent_count.load(Ordering::Relaxed));
} }
fn sample_tx_count<F, T>( fn sample_tx_count<T>(
exit_signal: &Arc<AtomicBool>, exit_signal: &Arc<AtomicBool>,
sample_stats: &Arc<RwLock<Vec<SampleStats>>>, sample_stats: &Arc<RwLock<Vec<SampleStats>>>,
sample_period: u64, sample_period: u64,
client_ctor: &Arc<F>, client: &Arc<T>,
) where ) where
F: Fn() -> T,
T: Client, T: Client,
{ {
let client = client_ctor();
let mut max_tps = 0.0; let mut max_tps = 0.0;
let mut total_tx_time; let mut total_tx_time;
let mut total_tx_count; let mut total_tx_count;
@ -300,18 +294,15 @@ fn sample_tx_count<F, T>(
} }
} }
fn do_tx_transfers<F, T>( fn do_tx_transfers<T>(
exit_signal: &Arc<AtomicBool>, exit_signal: &Arc<AtomicBool>,
shared_txs: &SharedTransactions, shared_txs: &SharedTransactions,
shared_tx_thread_count: &Arc<AtomicIsize>, shared_tx_thread_count: &Arc<AtomicIsize>,
total_tx_sent_count: &Arc<AtomicUsize>, total_tx_sent_count: &Arc<AtomicUsize>,
client_ctor: &Arc<F>, client: &Arc<T>,
) where ) where
F: Fn() -> T,
T: Client, T: Client,
{ {
let client = client_ctor();
let async_client: &AsyncClient = &client;
let mut stats = Stats::default(); let mut stats = Stats::default();
loop { loop {
let txs; let txs;
@ -326,7 +317,7 @@ fn do_tx_transfers<F, T>(
shared_tx_thread_count.fetch_add(1, Ordering::Relaxed); shared_tx_thread_count.fetch_add(1, Ordering::Relaxed);
let now = Instant::now(); let now = Instant::now();
for tx in txs0 { for tx in txs0 {
async_client.async_send_transaction(tx).expect("Transfer"); client.async_send_transaction(tx).expect("Transfer");
} }
let duration = now.elapsed(); let duration = now.elapsed();
shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed); shared_tx_thread_count.fetch_add(-1, Ordering::Relaxed);
@ -382,7 +373,7 @@ struct TradeInfo {
order_info: TradeOrderInfo, order_info: TradeOrderInfo,
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn swapper<F, T>( fn swapper<T>(
exit_signal: &Arc<AtomicBool>, exit_signal: &Arc<AtomicBool>,
receiver: &Receiver<Vec<TradeInfo>>, receiver: &Receiver<Vec<TradeInfo>>,
shared_txs: &SharedTransactions, shared_txs: &SharedTransactions,
@ -391,12 +382,10 @@ fn swapper<F, T>(
profit_pubkeys: &[Pubkey], profit_pubkeys: &[Pubkey],
batch_size: usize, batch_size: usize,
account_groups: usize, account_groups: usize,
client_ctor: &Arc<F>, client: &Arc<T>,
) where ) where
F: Fn() -> T,
T: Client, T: Client,
{ {
let client = client_ctor();
let mut stats = Stats::default(); let mut stats = Stats::default();
let mut order_book = OrderBook::default(); let mut order_book = OrderBook::default();
let mut account_group: usize = 0; let mut account_group: usize = 0;
@ -554,7 +543,7 @@ fn swapper<F, T>(
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn trader<F, T>( fn trader<T>(
exit_signal: &Arc<AtomicBool>, exit_signal: &Arc<AtomicBool>,
sender: &Sender<Vec<TradeInfo>>, sender: &Sender<Vec<TradeInfo>>,
shared_txs: &SharedTransactions, shared_txs: &SharedTransactions,
@ -565,12 +554,10 @@ fn trader<F, T>(
delay: u64, delay: u64,
batch_size: usize, batch_size: usize,
account_groups: usize, account_groups: usize,
client_ctor: &Arc<F>, client: &Arc<T>,
) where ) where
F: Fn() -> T,
T: Client, T: Client,
{ {
let client = client_ctor();
let mut stats = Stats::default(); let mut stats = Stats::default();
// TODO Hard coded for now // TODO Hard coded for now
@ -1029,7 +1016,6 @@ mod tests {
use solana::gossip_service::discover_nodes; use solana::gossip_service::discover_nodes;
use solana::local_cluster::{ClusterConfig, LocalCluster}; use solana::local_cluster::{ClusterConfig, LocalCluster};
use solana_client::thin_client::create_client; use solana_client::thin_client::create_client;
use solana_client::thin_client::ThinClient;
use solana_drone::drone::run_local_drone; use solana_drone::drone::run_local_drone;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
@ -1085,7 +1071,8 @@ mod tests {
error!("Failed to discover {} nodes: {:?}", NUM_NODES, err); error!("Failed to discover {} nodes: {:?}", NUM_NODES, err);
exit(1); exit(1);
}); });
let client_ctors: Vec<_> = nodes
let clients: Vec<_> = nodes
.iter() .iter()
.filter_map(|node| { .filter_map(|node| {
let cluster_entrypoint = node.clone(); let cluster_entrypoint = node.clone();
@ -1093,16 +1080,15 @@ mod tests {
if ContactInfo::is_valid_address(&cluster_addrs.0) if ContactInfo::is_valid_address(&cluster_addrs.0)
&& ContactInfo::is_valid_address(&cluster_addrs.1) && ContactInfo::is_valid_address(&cluster_addrs.1)
{ {
let client_ctor = let client = create_client(cluster_addrs, FULLNODE_PORT_RANGE);
move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) }; Some(client)
Some(client_ctor)
} else { } else {
None None
} }
}) })
.collect(); .collect();
if client_ctors.len() < NUM_NODES { if clients.len() < NUM_NODES {
error!( error!(
"Error: Insufficient nodes discovered. Expecting {} or more", "Error: Insufficient nodes discovered. Expecting {} or more",
NUM_NODES NUM_NODES
@ -1110,14 +1096,13 @@ mod tests {
exit(1); exit(1);
} }
let client = client_ctors[0]();
airdrop_lamports( airdrop_lamports(
&client, &clients[0],
&drone_addr, &drone_addr,
&config.identity, &config.identity,
fund_amount * (accounts_in_groups + 1) as u64 * 2, fund_amount * (accounts_in_groups + 1) as u64 * 2,
); );
do_bench_exchange(client_ctors, config); do_bench_exchange(clients, config);
} }
} }

View File

@ -8,7 +8,6 @@ use solana::cluster_info::FULLNODE_PORT_RANGE;
use solana::contact_info::ContactInfo; use solana::contact_info::ContactInfo;
use solana::gossip_service::discover_nodes; use solana::gossip_service::discover_nodes;
use solana_client::thin_client::create_client; use solana_client::thin_client::create_client;
use solana_client::thin_client::ThinClient;
use solana_sdk::signature::KeypairUtil; use solana_sdk::signature::KeypairUtil;
fn main() { fn main() {
@ -35,7 +34,8 @@ fn main() {
let nodes = discover_nodes(&network_addr, num_nodes).unwrap_or_else(|_| { let nodes = discover_nodes(&network_addr, num_nodes).unwrap_or_else(|_| {
panic!("Failed to discover nodes"); panic!("Failed to discover nodes");
}); });
let client_ctors: Vec<_> = nodes
let clients: Vec<_> = nodes
.iter() .iter()
.filter_map(|node| { .filter_map(|node| {
let cluster_entrypoint = node.clone(); let cluster_entrypoint = node.clone();
@ -43,26 +43,24 @@ fn main() {
if ContactInfo::is_valid_address(&cluster_addrs.0) if ContactInfo::is_valid_address(&cluster_addrs.0)
&& ContactInfo::is_valid_address(&cluster_addrs.1) && ContactInfo::is_valid_address(&cluster_addrs.1)
{ {
let client_ctor = let client = create_client(cluster_addrs, FULLNODE_PORT_RANGE);
move || -> ThinClient { create_client(cluster_addrs, FULLNODE_PORT_RANGE) }; Some(client)
Some(client_ctor)
} else { } else {
None None
} }
}) })
.collect(); .collect();
info!("{} nodes found", client_ctors.len()); info!("{} nodes found", clients.len());
if client_ctors.len() < num_nodes { if clients.len() < num_nodes {
panic!("Error: Insufficient nodes discovered"); panic!("Error: Insufficient nodes discovered");
} }
info!("Funding keypair: {}", identity.pubkey()); info!("Funding keypair: {}", identity.pubkey());
let client = client_ctors[0]();
let accounts_in_groups = batch_size * account_groups; let accounts_in_groups = batch_size * account_groups;
airdrop_lamports( airdrop_lamports(
&client, &clients[0],
&drone_addr, &drone_addr,
&identity, &identity,
fund_amount * (accounts_in_groups + 1) as u64 * 2, fund_amount * (accounts_in_groups + 1) as u64 * 2,
@ -78,5 +76,5 @@ fn main() {
account_groups, account_groups,
}; };
do_bench_exchange(client_ctors, config); do_bench_exchange(clients, config);
} }

View File

@ -19,7 +19,7 @@ use std::thread::sleep;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
pub struct RpcClient { pub struct RpcClient {
client: Box<GenericRpcClientRequest>, client: Box<GenericRpcClientRequest + Send + Sync>,
} }
impl RpcClient { impl RpcClient {
@ -743,4 +743,9 @@ mod tests {
); );
} }
#[test]
fn test_rpc_client_thread() {
let rpc_client = RpcClient::new_mock("succeeds".to_string());
thread::spawn(move || rpc_client);
}
} }