Use batch send in bench-tps to send transactions (#27527)

* Use batch send in bench-tps to send transactions

* serialize using par iter
This commit is contained in:
Pankaj Garg 2022-09-01 10:32:23 -07:00 committed by GitHub
parent 31087b8aba
commit 49df1c47e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 3 deletions

View File

@ -14,9 +14,7 @@ impl BenchTpsClient for TpuClient {
Ok(signature)
}
fn send_batch(&self, transactions: Vec<Transaction>) -> Result<()> {
for transaction in transactions {
BenchTpsClient::send_transaction(self, transaction)?;
}
self.try_send_transaction_batch(&transactions)?;
Ok(())
}
fn get_latest_blockhash(&self) -> Result<Hash> {

View File

@ -231,6 +231,15 @@ async fn send_wire_transaction_to_addr(
conn.send_wire_transaction(wire_transaction.clone()).await
}
async fn send_wire_transaction_batch_to_addr(
connection_cache: &ConnectionCache,
addr: &SocketAddr,
wire_transactions: &[Vec<u8>],
) -> TransportResult<()> {
let conn = connection_cache.get_nonblocking_connection(addr);
conn.send_wire_transaction_batch(wire_transactions).await
}
impl TpuClient {
/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
@ -297,6 +306,50 @@ impl TpuClient {
}
}
/// Send a batch of wire transactions to the current and upcoming leader TPUs according to
/// fanout size
/// Returns the last error if all sends fail
pub async fn try_send_wire_transaction_batch(
&self,
wire_transactions: Vec<Vec<u8>>,
) -> TransportResult<()> {
let leaders = self
.leader_tpu_service
.leader_tpu_sockets(self.fanout_slots);
let futures = leaders
.iter()
.map(|addr| {
send_wire_transaction_batch_to_addr(
&self.connection_cache,
addr,
&wire_transactions,
)
})
.collect::<Vec<_>>();
let results: Vec<TransportResult<()>> = join_all(futures).await;
let mut last_error: Option<TransportError> = None;
let mut some_success = false;
for result in results {
if let Err(e) = result {
if last_error.is_none() {
last_error = Some(e);
}
} else {
some_success = true;
}
}
if !some_success {
Err(if let Some(err) = last_error {
err
} else {
std::io::Error::new(std::io::ErrorKind::Other, "No sends attempted").into()
})
} else {
Ok(())
}
}
/// Create a new client that disconnects when dropped
pub async fn new(
rpc_client: Arc<RpcClient>,

View File

@ -4,6 +4,7 @@ use {
connection_cache::ConnectionCache,
nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
},
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult},
std::{
@ -77,6 +78,20 @@ impl TpuClient {
self.invoke(self.tpu_client.try_send_transaction(transaction))
}
/// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
/// to fanout size
/// Returns the last error if all sends fail
pub fn try_send_transaction_batch(&self, transactions: &[Transaction]) -> TransportResult<()> {
let wire_transactions = transactions
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect::<Vec<_>>();
self.invoke(
self.tpu_client
.try_send_wire_transaction_batch(wire_transactions),
)
}
/// Send a wire transaction to the current and upcoming leader TPUs according to fanout size
/// Returns the last error if all sends fail
pub fn try_send_wire_transaction(&self, wire_transaction: Vec<u8>) -> TransportResult<()> {