diff --git a/client/src/quic_client.rs b/client/src/quic_client.rs index c5bfb4855b..204b9d78f3 100644 --- a/client/src/quic_client.rs +++ b/client/src/quic_client.rs @@ -7,10 +7,8 @@ use { futures::future::join_all, itertools::Itertools, quinn::{ClientConfig, Endpoint, EndpointConfig, NewConnection, WriteError}, - rayon::iter::{IntoParallelIterator, ParallelIterator}, solana_sdk::{ quic::{QUIC_MAX_CONCURRENT_STREAMS, QUIC_PORT_OFFSET}, - transaction::Transaction, transport::Result as TransportResult, }, std::{ @@ -65,21 +63,19 @@ impl TpuConnection for QuicTpuConnection { &self.client.addr } - fn send_wire_transaction(&self, data: &[u8]) -> TransportResult<()> { + fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> { let _guard = self.client.runtime.enter(); - let send_buffer = self.client.send_buffer(data); + let send_buffer = self.client.send_buffer(wire_transaction); self.client.runtime.block_on(send_buffer)?; Ok(()) } - fn send_batch(&self, transactions: &[Transaction]) -> TransportResult<()> { - let buffers = transactions - .into_par_iter() - .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) - .collect::>(); - + fn send_wire_transaction_batch( + &self, + wire_transaction_batch: &[Vec], + ) -> TransportResult<()> { let _guard = self.client.runtime.enter(); - let send_batch = self.client.send_batch(&buffers); + let send_batch = self.client.send_batch(wire_transaction_batch); self.client.runtime.block_on(send_batch)?; Ok(()) } diff --git a/client/src/thin_client.rs b/client/src/thin_client.rs index 27b1536d0f..0ebc4f02f6 100644 --- a/client/src/thin_client.rs +++ b/client/src/thin_client.rs @@ -24,7 +24,7 @@ use { signers::Signers, system_instruction, timing::duration_as_ms, - transaction::{self, Transaction}, + transaction::{self, Transaction, VersionedTransaction}, transport::Result as TransportResult, }, std::{ @@ -215,10 +215,13 @@ impl ThinClient { let mut num_confirmed = 0; let mut wait_time = MAX_PROCESSING_AGE; // resend the same transaction until the transaction has no chance of succeeding + let wire_transaction = + bincode::serialize(&transaction).expect("transaction serialization failed"); while now.elapsed().as_secs() < wait_time as u64 { if num_confirmed == 0 { // Send the transaction if there has been no confirmation (e.g. the first time) - self.tpu_connection().send_transaction(transaction)?; + self.tpu_connection() + .send_wire_transaction(&wire_transaction)?; } if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation( @@ -601,12 +604,17 @@ impl SyncClient for ThinClient { impl AsyncClient for ThinClient { fn async_send_transaction(&self, transaction: Transaction) -> TransportResult { - self.tpu_connection().send_transaction(&transaction)?; + let transaction = VersionedTransaction::from(transaction); + self.tpu_connection() + .serialize_and_send_transaction(&transaction)?; Ok(transaction.signatures[0]) } fn async_send_batch(&self, transactions: Vec) -> TransportResult<()> { - self.tpu_connection().send_batch(&transactions) + let batch: Vec = transactions.into_iter().map(Into::into).collect(); + self.tpu_connection() + .par_serialize_and_send_transaction_batch(&batch)?; + Ok(()) } fn async_send_message( diff --git a/client/src/tpu_connection.rs b/client/src/tpu_connection.rs index c9036c5d31..8f61c27d3a 100644 --- a/client/src/tpu_connection.rs +++ b/client/src/tpu_connection.rs @@ -1,5 +1,6 @@ use { - solana_sdk::{transaction::Transaction, transport::Result as TransportResult}, + rayon::iter::{IntoParallelRefIterator, ParallelIterator}, + solana_sdk::{transaction::VersionedTransaction, transport::Result as TransportResult}, std::net::{SocketAddr, UdpSocket}, }; @@ -10,12 +11,35 @@ pub trait TpuConnection { fn tpu_addr(&self) -> &SocketAddr; - fn send_transaction(&self, tx: &Transaction) -> TransportResult<()> { - let data = bincode::serialize(tx).expect("serialize Transaction in send_transaction"); - self.send_wire_transaction(&data) + fn serialize_and_send_transaction( + &self, + transaction: &VersionedTransaction, + ) -> TransportResult<()> { + let wire_transaction = + bincode::serialize(transaction).expect("serialize Transaction in send_batch"); + self.send_wire_transaction(&wire_transaction) } - fn send_wire_transaction(&self, data: &[u8]) -> TransportResult<()>; + fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()>; - fn send_batch(&self, transactions: &[Transaction]) -> TransportResult<()>; + fn par_serialize_and_send_transaction_batch( + &self, + transaction_batch: &[VersionedTransaction], + ) -> TransportResult<()> { + let wire_transaction_batch: Vec<_> = transaction_batch + .par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect(); + self.send_wire_transaction_batch(&wire_transaction_batch) + } + + fn send_wire_transaction_batch( + &self, + wire_transaction_batch: &[Vec], + ) -> TransportResult<()> { + for wire_transaction in wire_transaction_batch { + self.send_wire_transaction(wire_transaction)?; + } + Ok(()) + } } diff --git a/client/src/udp_client.rs b/client/src/udp_client.rs index bf761eed99..ce48c134bf 100644 --- a/client/src/udp_client.rs +++ b/client/src/udp_client.rs @@ -3,7 +3,7 @@ use { crate::tpu_connection::TpuConnection, - solana_sdk::{transaction::Transaction, transport::Result as TransportResult}, + solana_sdk::transport::Result as TransportResult, std::net::{SocketAddr, UdpSocket}, }; @@ -24,19 +24,8 @@ impl TpuConnection for UdpTpuConnection { &self.addr } - fn send_wire_transaction(&self, data: &[u8]) -> TransportResult<()> { - self.socket.send_to(data, self.addr)?; - Ok(()) - } - - fn send_batch(&self, transactions: &[Transaction]) -> TransportResult<()> { - transactions - .iter() - .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) - .try_for_each(|buff| -> TransportResult<()> { - self.socket.send_to(&buff, self.addr)?; - Ok(()) - })?; + fn send_wire_transaction(&self, wire_transaction: &[u8]) -> TransportResult<()> { + self.socket.send_to(wire_transaction, self.addr)?; Ok(()) } }