implement chunk+parallel sending

This commit is contained in:
GroovieGermanikus 2023-07-25 12:54:47 +02:00
parent 391a42ec22
commit 5a4546c807
2 changed files with 68 additions and 8 deletions

View File

@ -1,4 +1,4 @@
use log::{error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use quinn::{ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream, TokioRuntime, TransportConfig, WriteError}; use quinn::{ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream, TokioRuntime, TransportConfig, WriteError};
use solana_sdk::pubkey::Pubkey; use solana_sdk::pubkey::Pubkey;
use std::{ use std::{
@ -11,6 +11,8 @@ use std::{
time::Duration, time::Duration,
}; };
use anyhow::bail; use anyhow::bail;
use futures::future::join_all;
use itertools::Itertools;
use tokio::{sync::RwLock, time::timeout}; use tokio::{sync::RwLock, time::timeout};
use tokio::time::error::Elapsed; use tokio::time::error::Elapsed;
use tracing::instrument; use tracing::instrument;
@ -185,9 +187,10 @@ impl QuicConnectionUtils {
} }
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all, level = "debug")] #[tracing::instrument(skip_all, level = "debug")]
pub async fn send_transaction_batch( pub async fn send_transaction_batch_serial(
connection: Connection, connection: Connection,
txs: Vec<Vec<u8>>, txs: Vec<Vec<u8>>,
exit_signal: Arc<AtomicBool>, exit_signal: Arc<AtomicBool>,
@ -227,6 +230,52 @@ impl QuicConnectionUtils {
panic!("no retry handling"); // FIXME panic!("no retry handling"); // FIXME
} }
} }
// open streams in parallel
// one stream is used for one transaction
// number of parallel streams that connect to TPU must be limited by caller (should be 8)
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip_all, level = "debug")]
pub async fn send_transaction_batch_parallel(
connection: Connection,
txs: Vec<Vec<u8>>,
exit_signal: Arc<AtomicBool>,
connection_timeout: Duration,
) {
assert_ne!(txs.len(), 0, "no transactions to send");
debug!("Opening {} parallel quic streams", txs.len());
let all_send_fns = (0..txs.len()).map(|i| Self::send_tx_to_new_stream(&txs[i], connection.clone(), connection_timeout)).collect_vec();
join_all(all_send_fns).await;
}
async fn send_tx_to_new_stream(tx: &Vec<u8>, connection: Connection, connection_timeout: Duration) {
let mut send_stream = Self::open_unistream(connection.clone(), connection_timeout)
.await.0
.unwrap();
let write_timeout_res =
timeout(connection_timeout, send_stream.write_all(tx.as_slice())).await;
match write_timeout_res {
Ok(no_timeout) => {
match no_timeout {
Ok(()) => {}
Err(write_error) => {
error!("Error writing transaction to stream: {}", write_error);
}
}
}
Err(elapsed) => {
warn!("timeout sending transactions");
}
}
// TODO wrap in timeout
send_stream.finish().await.unwrap();
}
} }
pub struct SkipServerVerification; pub struct SkipServerVerification;

View File

@ -28,6 +28,10 @@ use crate::tls_config_provicer::{ProxyTlsConfigProvider, SelfSignedTlsConfigProv
const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); const QUIC_CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);
pub const CONNECTION_RETRY_COUNT: usize = 10; pub const CONNECTION_RETRY_COUNT: usize = 10;
pub const MAX_TRANSACTIONS_PER_BATCH: usize = 10;
pub const MAX_BYTES_PER_BATCH: usize = 10;
const MAX_PARALLEL_STREAMS: usize = 6;
/// stable connect to TPU to send transactions - optimized for proxy use case /// stable connect to TPU to send transactions - optimized for proxy use case
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct TpuQuicClient { pub struct TpuQuicClient {
@ -93,12 +97,19 @@ impl TpuQuicClient {
txs: &Vec<VersionedTransaction>, txs: &Vec<VersionedTransaction>,
exit_signal: Arc<AtomicBool>, exit_signal: Arc<AtomicBool>,
) { ) {
QuicConnectionUtils::send_transaction_batch(
connection, for chunk in txs.chunks(MAX_PARALLEL_STREAMS) {
serialize_to_vecvec(txs), let vecvec = chunk.iter().map(|tx| {
exit_signal.clone(), let tx_raw = bincode::serialize(tx).unwrap();
QUIC_CONNECTION_TIMEOUT, tx_raw
).await; }).collect_vec();
QuicConnectionUtils::send_transaction_batch_parallel(
connection.clone(),
vecvec,
exit_signal.clone(),
QUIC_CONNECTION_TIMEOUT,
).await;
}
} }