Refactor thin_client::create_client (#24067)

Refactor the thin_client::create_client to take addresses separately instead of as a tuple

Co-authored-by: Bijie Zhu <bijiezhu@Bijies-MBP.cable.rcn.com>
This commit is contained in:
BG Zhu 2022-04-06 11:03:38 -04:00 committed by GitHub
parent a38bd4acc8
commit 22224127e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 24 deletions

View File

@ -44,10 +44,10 @@ fn test_bench_tps_local_cluster(config: Config) {
100_000_000, 100_000_000,
); );
let client = Arc::new(create_client(( let client = Arc::new(create_client(
cluster.entry_point_info.rpc, cluster.entry_point_info.rpc,
cluster.entry_point_info.tpu, cluster.entry_point_info.tpu,
))); ));
let (addr_sender, addr_receiver) = unbounded(); let (addr_sender, addr_receiver) = unbounded();
run_local_faucet_with_port(faucet_keypair, addr_sender, None, 0); run_local_faucet_with_port(faucet_keypair, addr_sender, None, 0);

View File

@ -612,12 +612,13 @@ impl AsyncClient for ThinClient {
} }
} }
pub fn create_client((rpc, tpu): (SocketAddr, SocketAddr)) -> ThinClient { pub fn create_client(rpc: SocketAddr, tpu: SocketAddr) -> ThinClient {
ThinClient::new(rpc, tpu) ThinClient::new(rpc, tpu)
} }
pub fn create_client_with_timeout( pub fn create_client_with_timeout(
(rpc, tpu): (SocketAddr, SocketAddr), rpc: SocketAddr,
tpu: SocketAddr,
timeout: Duration, timeout: Duration,
) -> ThinClient { ) -> ThinClient {
ThinClient::new_socket_with_timeout(rpc, tpu, timeout) ThinClient::new_socket_with_timeout(rpc, tpu, timeout)

View File

@ -195,7 +195,7 @@ pub fn get_clients(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) -
nodes nodes
.iter() .iter()
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) .filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space))
.map(create_client) .map(|(rpc, tpu)| create_client(rpc, tpu))
.collect() .collect()
} }
@ -206,7 +206,8 @@ pub fn get_client(nodes: &[ContactInfo], socket_addr_space: &SocketAddrSpace) ->
.filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space)) .filter_map(|node| ContactInfo::valid_client_facing_addr(node, socket_addr_space))
.collect(); .collect();
let select = thread_rng().gen_range(0, nodes.len()); let select = thread_rng().gen_range(0, nodes.len());
create_client(nodes[select]) let (rpc, tpu) = nodes[select];
create_client(rpc, tpu)
} }
pub fn get_multi_client( pub fn get_multi_client(

View File

@ -60,7 +60,8 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
return; return;
} }
let random_keypair = Keypair::new(); let random_keypair = Keypair::new();
let client = create_client(ingress_node.client_facing_addr()); let (rpc, tpu) = ingress_node.client_facing_addr();
let client = create_client(rpc, tpu);
let bal = client let bal = client
.poll_get_balance_with_commitment( .poll_get_balance_with_commitment(
&funding_keypair.pubkey(), &funding_keypair.pubkey(),
@ -81,7 +82,8 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
if ignore_nodes.contains(&validator.id) { if ignore_nodes.contains(&validator.id) {
continue; continue;
} }
let client = create_client(validator.client_facing_addr()); let (rpc, tpu) = validator.client_facing_addr();
let client = create_client(rpc, tpu);
client.poll_for_signature_confirmation(&sig, confs).unwrap(); client.poll_for_signature_confirmation(&sig, confs).unwrap();
} }
}); });
@ -91,7 +93,8 @@ pub fn verify_balances<S: ::std::hash::BuildHasher>(
expected_balances: HashMap<Pubkey, u64, S>, expected_balances: HashMap<Pubkey, u64, S>,
node: &ContactInfo, node: &ContactInfo,
) { ) {
let client = create_client(node.client_facing_addr()); let (rpc, tpu) = node.client_facing_addr();
let client = create_client(rpc, tpu);
for (pk, b) in expected_balances { for (pk, b) in expected_balances {
let bal = client let bal = client
.poll_get_balance_with_commitment(&pk, CommitmentConfig::processed()) .poll_get_balance_with_commitment(&pk, CommitmentConfig::processed())
@ -106,7 +109,8 @@ pub fn send_many_transactions(
max_tokens_per_transfer: u64, max_tokens_per_transfer: u64,
num_txs: u64, num_txs: u64,
) -> HashMap<Pubkey, u64> { ) -> HashMap<Pubkey, u64> {
let client = create_client(node.client_facing_addr()); let (rpc, tpu) = node.client_facing_addr();
let client = create_client(rpc, tpu);
let mut expected_balances = HashMap::new(); let mut expected_balances = HashMap::new();
for _ in 0..num_txs { for _ in 0..num_txs {
let random_keypair = Keypair::new(); let random_keypair = Keypair::new();
@ -197,7 +201,9 @@ pub fn kill_entry_and_spend_and_verify_rest(
let cluster_nodes = let cluster_nodes =
discover_cluster(&entry_point_info.gossip, nodes, socket_addr_space).unwrap(); discover_cluster(&entry_point_info.gossip, nodes, socket_addr_space).unwrap();
assert!(cluster_nodes.len() >= nodes); assert!(cluster_nodes.len() >= nodes);
let client = create_client(entry_point_info.client_facing_addr()); let (rpc, tpu) = entry_point_info.client_facing_addr();
let client = create_client(rpc, tpu);
// sleep long enough to make sure we are in epoch 3 // sleep long enough to make sure we are in epoch 3
let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1); let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1);
@ -225,7 +231,8 @@ pub fn kill_entry_and_spend_and_verify_rest(
continue; continue;
} }
let client = create_client(ingress_node.client_facing_addr()); let (rpc, tpu) = ingress_node.client_facing_addr();
let client = create_client(rpc, tpu);
let balance = client let balance = client
.poll_get_balance_with_commitment( .poll_get_balance_with_commitment(
&funding_keypair.pubkey(), &funding_keypair.pubkey(),
@ -296,7 +303,8 @@ pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo],
assert!(loop_start.elapsed() < loop_timeout); assert!(loop_start.elapsed() < loop_timeout);
for (i, ingress_node) in contact_infos.iter().enumerate() { for (i, ingress_node) in contact_infos.iter().enumerate() {
let client = create_client(ingress_node.client_facing_addr()); let (rpc, tpu) = ingress_node.client_facing_addr();
let client = create_client(rpc, tpu);
let root_slot = client let root_slot = client
.get_slot_with_commitment(CommitmentConfig::finalized()) .get_slot_with_commitment(CommitmentConfig::finalized())
.unwrap_or(0); .unwrap_or(0);
@ -327,7 +335,8 @@ pub fn check_no_new_roots(
.iter() .iter()
.enumerate() .enumerate()
.map(|(i, ingress_node)| { .map(|(i, ingress_node)| {
let client = create_client(ingress_node.client_facing_addr()); let (rpc, tpu) = ingress_node.client_facing_addr();
let client = create_client(rpc, tpu);
let initial_root = client let initial_root = client
.get_slot() .get_slot()
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.id)); .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.id));
@ -345,7 +354,8 @@ pub fn check_no_new_roots(
let mut reached_end_slot = false; let mut reached_end_slot = false;
loop { loop {
for contact_info in contact_infos { for contact_info in contact_infos {
let client = create_client(contact_info.client_facing_addr()); let (rpc, tpu) = contact_info.client_facing_addr();
let client = create_client(rpc, tpu);
current_slot = client current_slot = client
.get_slot_with_commitment(CommitmentConfig::processed()) .get_slot_with_commitment(CommitmentConfig::processed())
.unwrap_or_else(|_| panic!("get_slot for {} failed", contact_infos[0].id)); .unwrap_or_else(|_| panic!("get_slot for {} failed", contact_infos[0].id));
@ -367,7 +377,8 @@ pub fn check_no_new_roots(
} }
for (i, ingress_node) in contact_infos.iter().enumerate() { for (i, ingress_node) in contact_infos.iter().enumerate() {
let client = create_client(ingress_node.client_facing_addr()); let (rpc, tpu) = ingress_node.client_facing_addr();
let client = create_client(rpc, tpu);
assert_eq!( assert_eq!(
client client
.get_slot() .get_slot()
@ -387,7 +398,8 @@ fn poll_all_nodes_for_signature(
if validator.id == entry_point_info.id { if validator.id == entry_point_info.id {
continue; continue;
} }
let client = create_client(validator.client_facing_addr()); let (rpc, tpu) = validator.client_facing_addr();
let client = create_client(rpc, tpu);
client.poll_for_signature_confirmation(sig, confs)?; client.poll_for_signature_confirmation(sig, confs)?;
} }

View File

@ -388,7 +388,8 @@ impl LocalCluster {
mut voting_keypair: Option<Arc<Keypair>>, mut voting_keypair: Option<Arc<Keypair>>,
socket_addr_space: SocketAddrSpace, socket_addr_space: SocketAddrSpace,
) -> Pubkey { ) -> Pubkey {
let client = create_client(self.entry_point_info.client_facing_addr()); let (rpc, tpu) = self.entry_point_info.client_facing_addr();
let client = create_client(rpc, tpu);
// Must have enough tokens to fund vote account and set delegate // Must have enough tokens to fund vote account and set delegate
let should_create_vote_pubkey = voting_keypair.is_none(); let should_create_vote_pubkey = voting_keypair.is_none();
@ -472,7 +473,8 @@ impl LocalCluster {
} }
pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 { pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 {
let client = create_client(self.entry_point_info.client_facing_addr()); let (rpc, tpu) = self.entry_point_info.client_facing_addr();
let client = create_client(rpc, tpu);
Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports) Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports)
} }
@ -694,9 +696,10 @@ impl Cluster for LocalCluster {
} }
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> { fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> {
self.validators self.validators.get(pubkey).map(|f| {
.get(pubkey) let (rpc, tpu) = f.info.contact_info.client_facing_addr();
.map(|f| create_client(f.info.contact_info.client_facing_addr())) create_client(rpc, tpu)
})
} }
fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo { fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo {

View File

@ -211,7 +211,9 @@ fn test_local_cluster_signature_subscribe() {
.unwrap(); .unwrap();
let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap();
let tx_client = create_client(non_bootstrap_info.client_facing_addr()); let (rpc, tpu) = non_bootstrap_info.client_facing_addr();
let tx_client = create_client(rpc, tpu);
let (blockhash, _) = tx_client let (blockhash, _) = tx_client
.get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap(); .unwrap();
@ -516,7 +518,8 @@ fn test_mainnet_beta_cluster_type() {
.unwrap(); .unwrap();
assert_eq!(cluster_nodes.len(), 1); assert_eq!(cluster_nodes.len(), 1);
let client = create_client(cluster.entry_point_info.client_facing_addr()); let (rpc, tpu) = cluster.entry_point_info.client_facing_addr();
let client = create_client(rpc, tpu);
// Programs that are available at epoch 0 // Programs that are available at epoch 0
for program_id in [ for program_id in [