Send async quic batch of txs (#24298)
Add an interface send_wire_transaction_batch_async to TpuConnection to allow for sending batches without waiting for completion Co-authored-by: Anatoly Yakovenko <anatoly@solana.com>
This commit is contained in:
parent
5e8c12ebdf
commit
8cfc010b84
|
@ -256,6 +256,25 @@ pub fn send_wire_transaction_async(
|
||||||
r
|
r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn send_wire_transaction_batch_async(
|
||||||
|
packets: Vec<Vec<u8>>,
|
||||||
|
addr: &SocketAddr,
|
||||||
|
) -> Result<(), TransportError> {
|
||||||
|
let (conn, stats) = get_connection(addr);
|
||||||
|
let client_stats = Arc::new(ClientStats::default());
|
||||||
|
let len = packets.len();
|
||||||
|
let r = match conn {
|
||||||
|
Connection::Udp(conn) => {
|
||||||
|
conn.send_wire_transaction_batch_async(packets, client_stats.clone())
|
||||||
|
}
|
||||||
|
Connection::Quic(conn) => {
|
||||||
|
conn.send_wire_transaction_batch_async(packets, client_stats.clone())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
stats.add_client_stats(&client_stats, len, r.is_ok());
|
||||||
|
r
|
||||||
|
}
|
||||||
|
|
||||||
pub fn send_wire_transaction(
|
pub fn send_wire_transaction(
|
||||||
wire_transaction: &[u8],
|
wire_transaction: &[u8],
|
||||||
addr: &SocketAddr,
|
addr: &SocketAddr,
|
||||||
|
|
|
@ -2,6 +2,9 @@
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate serde_derive;
|
extern crate serde_derive;
|
||||||
|
|
||||||
|
#[macro_use]
|
||||||
|
extern crate solana_metrics;
|
||||||
|
|
||||||
pub mod blockhash_query;
|
pub mod blockhash_query;
|
||||||
pub mod client_error;
|
pub mod client_error;
|
||||||
pub mod connection_cache;
|
pub mod connection_cache;
|
||||||
|
@ -27,9 +30,6 @@ pub mod tpu_connection;
|
||||||
pub mod transaction_executor;
|
pub mod transaction_executor;
|
||||||
pub mod udp_client;
|
pub mod udp_client;
|
||||||
|
|
||||||
#[macro_use]
|
|
||||||
extern crate solana_metrics;
|
|
||||||
|
|
||||||
pub mod mock_sender_for_cli {
|
pub mod mock_sender_for_cli {
|
||||||
/// Magic `SIGNATURE` value used by `solana-cli` unit tests.
|
/// Magic `SIGNATURE` value used by `solana-cli` unit tests.
|
||||||
/// Please don't use this constant.
|
/// Please don't use this constant.
|
||||||
|
|
|
@ -119,16 +119,31 @@ impl TpuConnection for QuicTpuConnection {
|
||||||
stats: Arc<ClientStats>,
|
stats: Arc<ClientStats>,
|
||||||
) -> TransportResult<()> {
|
) -> TransportResult<()> {
|
||||||
let _guard = RUNTIME.enter();
|
let _guard = RUNTIME.enter();
|
||||||
//drop and detach the task
|
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
inc_new_counter_info!("send_wire_transaction_async", 1);
|
//drop and detach the task
|
||||||
let _ = RUNTIME.spawn(async move {
|
let _ = RUNTIME.spawn(async move {
|
||||||
let send_buffer = client.send_buffer(wire_transaction, &stats);
|
let send_buffer = client.send_buffer(wire_transaction, &stats);
|
||||||
if let Err(e) = send_buffer.await {
|
if let Err(e) = send_buffer.await {
|
||||||
inc_new_counter_warn!("send_wire_transaction_async_fail", 1);
|
|
||||||
warn!("Failed to send transaction async to {:?}", e);
|
warn!("Failed to send transaction async to {:?}", e);
|
||||||
} else {
|
datapoint_warn!("send-wire-async", ("failure", 1, i64),);
|
||||||
inc_new_counter_info!("send_wire_transaction_async_pass", 1);
|
}
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_wire_transaction_batch_async(
|
||||||
|
&self,
|
||||||
|
buffers: Vec<Vec<u8>>,
|
||||||
|
stats: Arc<ClientStats>,
|
||||||
|
) -> TransportResult<()> {
|
||||||
|
let _guard = RUNTIME.enter();
|
||||||
|
let client = self.client.clone();
|
||||||
|
//drop and detach the task
|
||||||
|
let _ = RUNTIME.spawn(async move {
|
||||||
|
let send_batch = client.send_batch(&buffers, &stats);
|
||||||
|
if let Err(e) = send_batch.await {
|
||||||
|
warn!("Failed to send transaction batch async to {:?}", e);
|
||||||
|
datapoint_warn!("send-wire-batch-async", ("failure", 1, i64),);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -269,13 +284,16 @@ impl QuicClient {
|
||||||
.iter()
|
.iter()
|
||||||
.chunks(QUIC_MAX_CONCURRENT_STREAMS);
|
.chunks(QUIC_MAX_CONCURRENT_STREAMS);
|
||||||
|
|
||||||
let futures = chunks.into_iter().map(|buffs| {
|
let futures: Vec<_> = chunks
|
||||||
join_all(
|
.into_iter()
|
||||||
buffs
|
.map(|buffs| {
|
||||||
.into_iter()
|
join_all(
|
||||||
.map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)),
|
buffs
|
||||||
)
|
.into_iter()
|
||||||
});
|
.map(|buf| Self::_send_buffer_using_conn(buf.as_ref(), connection_ref)),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
for f in futures {
|
for f in futures {
|
||||||
f.await.into_iter().try_for_each(|res| res)?;
|
f.await.into_iter().try_for_each(|res| res)?;
|
||||||
|
|
|
@ -70,4 +70,10 @@ pub trait TpuConnection {
|
||||||
) -> TransportResult<()>
|
) -> TransportResult<()>
|
||||||
where
|
where
|
||||||
T: AsRef<[u8]>;
|
T: AsRef<[u8]>;
|
||||||
|
|
||||||
|
fn send_wire_transaction_batch_async(
|
||||||
|
&self,
|
||||||
|
buffers: Vec<Vec<u8>>,
|
||||||
|
stats: Arc<ClientStats>,
|
||||||
|
) -> TransportResult<()>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,4 +62,13 @@ impl TpuConnection for UdpTpuConnection {
|
||||||
batch_send(&self.socket, &pkts)?;
|
batch_send(&self.socket, &pkts)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
fn send_wire_transaction_batch_async(
|
||||||
|
&self,
|
||||||
|
buffers: Vec<Vec<u8>>,
|
||||||
|
_stats: Arc<ClientStats>,
|
||||||
|
) -> TransportResult<()> {
|
||||||
|
let pkts: Vec<_> = buffers.into_iter().zip(repeat(self.tpu_addr())).collect();
|
||||||
|
batch_send(&self.socket, &pkts)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue