From 1f9fde5f7b8ab5ca8bdcf20befb02cfa96f5ce64 Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Fri, 6 Sep 2019 09:07:40 -0700 Subject: [PATCH] ThinClient internal name grooming (#5800) --- bench-tps/src/bench.rs | 2 +- client/src/perf_utils.rs | 2 +- client/src/thin_client.rs | 56 ++++++++++++++++---------------------- core/src/gossip_service.rs | 8 +++--- core/src/rpc.rs | 10 +++---- runtime/src/bank_client.rs | 2 +- sdk/src/client.rs | 2 +- 7 files changed, 37 insertions(+), 45 deletions(-) diff --git a/bench-tps/src/bench.rs b/bench-tps/src/bench.rs index 038afc4fa..fc0072c97 100644 --- a/bench-tps/src/bench.rs +++ b/bench-tps/src/bench.rs @@ -404,7 +404,7 @@ fn do_tx_transfers( println!( "Transferring 1 unit {} times... to {}", txs0.len(), - client.as_ref().transactions_addr(), + client.as_ref().tpu_addr(), ); let tx_len = txs0.len(); let transfer_start = Instant::now(); diff --git a/client/src/perf_utils.rs b/client/src/perf_utils.rs index 303074bbd..0a1d2f7ef 100644 --- a/client/src/perf_utils.rs +++ b/client/src/perf_utils.rs @@ -79,7 +79,7 @@ pub fn sample_txs( sample_stats .write() .unwrap() - .push((client.transactions_addr(), stats)); + .push((client.tpu_addr(), stats)); return; } sleep(Duration::from_secs(sample_period)); diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index c4f6b1e63..7c51ae456 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -107,21 +107,17 @@ impl ClientOptimizer { /// An object for querying and sending transactions to the network. pub struct ThinClient { transactions_socket: UdpSocket, - transactions_addrs: Vec, + tpu_addrs: Vec, rpc_clients: Vec, optimizer: ClientOptimizer, } impl ThinClient { /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP - /// and the Tpu at `transactions_addr` over `transactions_socket` using UDP. - pub fn new( - rpc_addr: SocketAddr, - transactions_addr: SocketAddr, - transactions_socket: UdpSocket, - ) -> Self { + /// and the Tpu at `tpu_addr` over `transactions_socket` using UDP. + pub fn new(rpc_addr: SocketAddr, tpu_addr: SocketAddr, transactions_socket: UdpSocket) -> Self { Self::new_from_client( - transactions_addr, + tpu_addr, transactions_socket, RpcClient::new_socket(rpc_addr), ) @@ -129,47 +125,47 @@ impl ThinClient { pub fn new_socket_with_timeout( rpc_addr: SocketAddr, - transactions_addr: SocketAddr, + tpu_addr: SocketAddr, transactions_socket: UdpSocket, timeout: Duration, ) -> Self { let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout); - Self::new_from_client(transactions_addr, transactions_socket, rpc_client) + Self::new_from_client(tpu_addr, transactions_socket, rpc_client) } fn new_from_client( - transactions_addr: SocketAddr, + tpu_addr: SocketAddr, transactions_socket: UdpSocket, rpc_client: RpcClient, ) -> Self { Self { transactions_socket, - transactions_addrs: vec![transactions_addr], + tpu_addrs: vec![tpu_addr], rpc_clients: vec![rpc_client], optimizer: ClientOptimizer::new(0), } } pub fn new_from_addrs( - transactions_addrs: Vec, + rpc_addrs: Vec, + tpu_addrs: Vec, transactions_socket: UdpSocket, - rpc_sockets: Vec, ) -> Self { - assert!(!transactions_addrs.is_empty()); - assert!(!rpc_sockets.is_empty()); - assert_eq!(rpc_sockets.len(), transactions_addrs.len()); - let rpc_len = rpc_sockets.len(); - let rpc_clients: Vec<_> = rpc_sockets.into_iter().map(RpcClient::new_socket).collect(); + assert!(!rpc_addrs.is_empty()); + assert_eq!(rpc_addrs.len(), tpu_addrs.len()); + + let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect(); + let optimizer = ClientOptimizer::new(rpc_clients.len()); Self { - transactions_addrs, + tpu_addrs, transactions_socket, rpc_clients, - optimizer: ClientOptimizer::new(rpc_len), + optimizer, } } - fn transactions_addr(&self) -> &SocketAddr { - &self.transactions_addrs[self.optimizer.best()] + fn tpu_addr(&self) -> &SocketAddr { + &self.tpu_addrs[self.optimizer.best()] } fn rpc_client(&self) -> &RpcClient { @@ -218,7 +214,7 @@ impl ThinClient { if num_confirmed == 0 { // Send the transaction if there has been no confirmation (e.g. the first time) self.transactions_socket - .send_to(&buf[..], &self.transactions_addr())?; + .send_to(&buf[..], &self.tpu_addr())?; } if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation( @@ -237,11 +233,7 @@ impl ThinClient { ); } } - info!( - "{} tries failed transfer to {}", - x, - self.transactions_addr() - ); + info!("{} tries failed transfer to {}", x, self.tpu_addr()); let (blockhash, _fee_calculator) = self.rpc_client().get_recent_blockhash()?; transaction.sign(keypairs, blockhash); } @@ -289,8 +281,8 @@ impl ThinClient { } impl Client for ThinClient { - fn transactions_addr(&self) -> String { - self.transactions_addr().to_string() + fn tpu_addr(&self) -> String { + self.tpu_addr().to_string() } } @@ -421,7 +413,7 @@ impl AsyncClient for ThinClient { .expect("serialize Transaction in pub fn transfer_signed"); assert!(buf.len() < PACKET_DATA_SIZE); self.transactions_socket - .send_to(&buf[..], &self.transactions_addr())?; + .send_to(&buf[..], &self.tpu_addr())?; Ok(transaction.signatures[0]) } fn async_send_message( diff --git a/core/src/gossip_service.rs b/core/src/gossip_service.rs index 227a34d69..45a9e08db 100644 --- a/core/src/gossip_service.rs +++ b/core/src/gossip_service.rs @@ -139,12 +139,12 @@ pub fn get_multi_client(nodes: &[ContactInfo]) -> (ThinClient, usize) { .filter_map(ContactInfo::valid_client_facing_addr) .map(|addrs| addrs) .collect(); - let rpcs: Vec<_> = addrs.iter().map(|addr| addr.0).collect(); - let tpus: Vec<_> = addrs.iter().map(|addr| addr.1).collect(); + let rpc_addrs: Vec<_> = addrs.iter().map(|addr| addr.0).collect(); + let tpu_addrs: Vec<_> = addrs.iter().map(|addr| addr.1).collect(); let (_, transactions_socket) = solana_netutil::bind_in_range(FULLNODE_PORT_RANGE).unwrap(); - let num_nodes = tpus.len(); + let num_nodes = tpu_addrs.len(); ( - ThinClient::new_from_addrs(tpus, transactions_socket, rpcs), + ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, transactions_socket), num_nodes, ) } diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 423b4b568..942c757a0 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -585,9 +585,9 @@ impl RpcSol for RpcSolImpl { })?; let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let transactions_addr = get_tpu_addr(&meta.cluster_info)?; + let tpu_addr = get_tpu_addr(&meta.cluster_info)?; transactions_socket - .send_to(&data, transactions_addr) + .send_to(&data, tpu_addr) .map_err(|err| { info!("request_airdrop: send_to error: {:?}", err); Error::internal_error() @@ -628,10 +628,10 @@ impl RpcSol for RpcSolImpl { return Err(Error::invalid_request()); } let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let transactions_addr = get_tpu_addr(&meta.cluster_info)?; - trace!("send_transaction: leader is {:?}", &transactions_addr); + let tpu_addr = get_tpu_addr(&meta.cluster_info)?; + trace!("send_transaction: leader is {:?}", &tpu_addr); transactions_socket - .send_to(&data, transactions_addr) + .send_to(&data, tpu_addr) .map_err(|err| { info!("send_transaction: send_to error: {:?}", err); Error::internal_error() diff --git a/runtime/src/bank_client.rs b/runtime/src/bank_client.rs index 40b60191a..d76f3fdde 100644 --- a/runtime/src/bank_client.rs +++ b/runtime/src/bank_client.rs @@ -24,7 +24,7 @@ pub struct BankClient { } impl Client for BankClient { - fn transactions_addr(&self) -> String { + fn tpu_addr(&self) -> String { "Local BankClient".to_string() } } diff --git a/sdk/src/client.rs b/sdk/src/client.rs index 67e6586fb..b1f0ce9d6 100644 --- a/sdk/src/client.rs +++ b/sdk/src/client.rs @@ -19,7 +19,7 @@ use crate::transport::Result; use std::io; pub trait Client: SyncClient + AsyncClient { - fn transactions_addr(&self) -> String; + fn tpu_addr(&self) -> String; } pub trait SyncClient {