ThinClient internal name grooming (#5800)

This commit is contained in:
Michael Vines 2019-09-06 09:07:40 -07:00 committed by GitHub
parent c3782082bc
commit 1f9fde5f7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 37 additions and 45 deletions

View File

@ -404,7 +404,7 @@ fn do_tx_transfers<T: Client>(
println!( println!(
"Transferring 1 unit {} times... to {}", "Transferring 1 unit {} times... to {}",
txs0.len(), txs0.len(),
client.as_ref().transactions_addr(), client.as_ref().tpu_addr(),
); );
let tx_len = txs0.len(); let tx_len = txs0.len();
let transfer_start = Instant::now(); let transfer_start = Instant::now();

View File

@ -79,7 +79,7 @@ pub fn sample_txs<T>(
sample_stats sample_stats
.write() .write()
.unwrap() .unwrap()
.push((client.transactions_addr(), stats)); .push((client.tpu_addr(), stats));
return; return;
} }
sleep(Duration::from_secs(sample_period)); sleep(Duration::from_secs(sample_period));

View File

@ -107,21 +107,17 @@ impl ClientOptimizer {
/// An object for querying and sending transactions to the network. /// An object for querying and sending transactions to the network.
pub struct ThinClient { pub struct ThinClient {
transactions_socket: UdpSocket, transactions_socket: UdpSocket,
transactions_addrs: Vec<SocketAddr>, tpu_addrs: Vec<SocketAddr>,
rpc_clients: Vec<RpcClient>, rpc_clients: Vec<RpcClient>,
optimizer: ClientOptimizer, optimizer: ClientOptimizer,
} }
impl ThinClient { impl ThinClient {
/// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP /// Create a new ThinClient that will interface with the Rpc at `rpc_addr` using TCP
/// and the Tpu at `transactions_addr` over `transactions_socket` using UDP. /// and the Tpu at `tpu_addr` over `transactions_socket` using UDP.
pub fn new( pub fn new(rpc_addr: SocketAddr, tpu_addr: SocketAddr, transactions_socket: UdpSocket) -> Self {
rpc_addr: SocketAddr,
transactions_addr: SocketAddr,
transactions_socket: UdpSocket,
) -> Self {
Self::new_from_client( Self::new_from_client(
transactions_addr, tpu_addr,
transactions_socket, transactions_socket,
RpcClient::new_socket(rpc_addr), RpcClient::new_socket(rpc_addr),
) )
@ -129,47 +125,47 @@ impl ThinClient {
pub fn new_socket_with_timeout( pub fn new_socket_with_timeout(
rpc_addr: SocketAddr, rpc_addr: SocketAddr,
transactions_addr: SocketAddr, tpu_addr: SocketAddr,
transactions_socket: UdpSocket, transactions_socket: UdpSocket,
timeout: Duration, timeout: Duration,
) -> Self { ) -> Self {
let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout); let rpc_client = RpcClient::new_socket_with_timeout(rpc_addr, timeout);
Self::new_from_client(transactions_addr, transactions_socket, rpc_client) Self::new_from_client(tpu_addr, transactions_socket, rpc_client)
} }
fn new_from_client( fn new_from_client(
transactions_addr: SocketAddr, tpu_addr: SocketAddr,
transactions_socket: UdpSocket, transactions_socket: UdpSocket,
rpc_client: RpcClient, rpc_client: RpcClient,
) -> Self { ) -> Self {
Self { Self {
transactions_socket, transactions_socket,
transactions_addrs: vec![transactions_addr], tpu_addrs: vec![tpu_addr],
rpc_clients: vec![rpc_client], rpc_clients: vec![rpc_client],
optimizer: ClientOptimizer::new(0), optimizer: ClientOptimizer::new(0),
} }
} }
pub fn new_from_addrs( pub fn new_from_addrs(
transactions_addrs: Vec<SocketAddr>, rpc_addrs: Vec<SocketAddr>,
tpu_addrs: Vec<SocketAddr>,
transactions_socket: UdpSocket, transactions_socket: UdpSocket,
rpc_sockets: Vec<SocketAddr>,
) -> Self { ) -> Self {
assert!(!transactions_addrs.is_empty()); assert!(!rpc_addrs.is_empty());
assert!(!rpc_sockets.is_empty()); assert_eq!(rpc_addrs.len(), tpu_addrs.len());
assert_eq!(rpc_sockets.len(), transactions_addrs.len());
let rpc_len = rpc_sockets.len(); let rpc_clients: Vec<_> = rpc_addrs.into_iter().map(RpcClient::new_socket).collect();
let rpc_clients: Vec<_> = rpc_sockets.into_iter().map(RpcClient::new_socket).collect(); let optimizer = ClientOptimizer::new(rpc_clients.len());
Self { Self {
transactions_addrs, tpu_addrs,
transactions_socket, transactions_socket,
rpc_clients, rpc_clients,
optimizer: ClientOptimizer::new(rpc_len), optimizer,
} }
} }
fn transactions_addr(&self) -> &SocketAddr { fn tpu_addr(&self) -> &SocketAddr {
&self.transactions_addrs[self.optimizer.best()] &self.tpu_addrs[self.optimizer.best()]
} }
fn rpc_client(&self) -> &RpcClient { fn rpc_client(&self) -> &RpcClient {
@ -218,7 +214,7 @@ impl ThinClient {
if num_confirmed == 0 { if num_confirmed == 0 {
// Send the transaction if there has been no confirmation (e.g. the first time) // Send the transaction if there has been no confirmation (e.g. the first time)
self.transactions_socket self.transactions_socket
.send_to(&buf[..], &self.transactions_addr())?; .send_to(&buf[..], &self.tpu_addr())?;
} }
if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation( if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
@ -237,11 +233,7 @@ impl ThinClient {
); );
} }
} }
info!( info!("{} tries failed transfer to {}", x, self.tpu_addr());
"{} tries failed transfer to {}",
x,
self.transactions_addr()
);
let (blockhash, _fee_calculator) = self.rpc_client().get_recent_blockhash()?; let (blockhash, _fee_calculator) = self.rpc_client().get_recent_blockhash()?;
transaction.sign(keypairs, blockhash); transaction.sign(keypairs, blockhash);
} }
@ -289,8 +281,8 @@ impl ThinClient {
} }
impl Client for ThinClient { impl Client for ThinClient {
fn transactions_addr(&self) -> String { fn tpu_addr(&self) -> String {
self.transactions_addr().to_string() self.tpu_addr().to_string()
} }
} }
@ -421,7 +413,7 @@ impl AsyncClient for ThinClient {
.expect("serialize Transaction in pub fn transfer_signed"); .expect("serialize Transaction in pub fn transfer_signed");
assert!(buf.len() < PACKET_DATA_SIZE); assert!(buf.len() < PACKET_DATA_SIZE);
self.transactions_socket self.transactions_socket
.send_to(&buf[..], &self.transactions_addr())?; .send_to(&buf[..], &self.tpu_addr())?;
Ok(transaction.signatures[0]) Ok(transaction.signatures[0])
} }
fn async_send_message( fn async_send_message(

View File

@ -139,12 +139,12 @@ pub fn get_multi_client(nodes: &[ContactInfo]) -> (ThinClient, usize) {
.filter_map(ContactInfo::valid_client_facing_addr) .filter_map(ContactInfo::valid_client_facing_addr)
.map(|addrs| addrs) .map(|addrs| addrs)
.collect(); .collect();
let rpcs: Vec<_> = addrs.iter().map(|addr| addr.0).collect(); let rpc_addrs: Vec<_> = addrs.iter().map(|addr| addr.0).collect();
let tpus: Vec<_> = addrs.iter().map(|addr| addr.1).collect(); let tpu_addrs: Vec<_> = addrs.iter().map(|addr| addr.1).collect();
let (_, transactions_socket) = solana_netutil::bind_in_range(FULLNODE_PORT_RANGE).unwrap(); let (_, transactions_socket) = solana_netutil::bind_in_range(FULLNODE_PORT_RANGE).unwrap();
let num_nodes = tpus.len(); let num_nodes = tpu_addrs.len();
( (
ThinClient::new_from_addrs(tpus, transactions_socket, rpcs), ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, transactions_socket),
num_nodes, num_nodes,
) )
} }

View File

@ -585,9 +585,9 @@ impl RpcSol for RpcSolImpl {
})?; })?;
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let transactions_addr = get_tpu_addr(&meta.cluster_info)?; let tpu_addr = get_tpu_addr(&meta.cluster_info)?;
transactions_socket transactions_socket
.send_to(&data, transactions_addr) .send_to(&data, tpu_addr)
.map_err(|err| { .map_err(|err| {
info!("request_airdrop: send_to error: {:?}", err); info!("request_airdrop: send_to error: {:?}", err);
Error::internal_error() Error::internal_error()
@ -628,10 +628,10 @@ impl RpcSol for RpcSolImpl {
return Err(Error::invalid_request()); return Err(Error::invalid_request());
} }
let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let transactions_addr = get_tpu_addr(&meta.cluster_info)?; let tpu_addr = get_tpu_addr(&meta.cluster_info)?;
trace!("send_transaction: leader is {:?}", &transactions_addr); trace!("send_transaction: leader is {:?}", &tpu_addr);
transactions_socket transactions_socket
.send_to(&data, transactions_addr) .send_to(&data, tpu_addr)
.map_err(|err| { .map_err(|err| {
info!("send_transaction: send_to error: {:?}", err); info!("send_transaction: send_to error: {:?}", err);
Error::internal_error() Error::internal_error()

View File

@ -24,7 +24,7 @@ pub struct BankClient {
} }
impl Client for BankClient { impl Client for BankClient {
fn transactions_addr(&self) -> String { fn tpu_addr(&self) -> String {
"Local BankClient".to_string() "Local BankClient".to_string()
} }
} }

View File

@ -19,7 +19,7 @@ use crate::transport::Result;
use std::io; use std::io;
pub trait Client: SyncClient + AsyncClient { pub trait Client: SyncClient + AsyncClient {
fn transactions_addr(&self) -> String; fn tpu_addr(&self) -> String;
} }
pub trait SyncClient { pub trait SyncClient {