Using same connection and reconnecting when necessary

Godmode Galactus 2023-04-14 11:50:52 +02:00
parent 46c123121c
commit ddd45dd478
2 changed files with 101 additions and 69 deletions
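
The change below keeps a single quinn Connection behind a tokio RwLock for each active TPU connection instead of dialing per batch; a shared AtomicU64 (last_stable_id) records the stable_id of a connection that failed, and sender tasks reconnect only when the connection they hold still carries that id. A minimal, hedged sketch of this reuse-then-reconnect pattern follows; MockConnection and reconnect() are placeholders for quinn::Connection and the crate's own connect helpers, not code from this commit.

use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use tokio::sync::RwLock;

// Stand-in for quinn::Connection; only the stable_id matters for the pattern.
#[derive(Clone)]
struct MockConnection {
    stable_id: u64,
}

// Stand-in for the crate's connect()/make_connection_0rtt() helpers.
async fn reconnect(next_id: &AtomicU64) -> Option<MockConnection> {
    Some(MockConnection {
        stable_id: next_id.fetch_add(1, Ordering::Relaxed),
    })
}

// Mirrors the logic added to send_transaction_batch: reuse the shared connection unless
// its stable_id matches the one flagged as bad, in which case replace it under the write lock.
async fn get_or_refresh(
    shared: &Arc<RwLock<MockConnection>>,
    last_stable_id: &Arc<AtomicU64>,
    next_id: &AtomicU64,
) -> Option<MockConnection> {
    let bad_id = last_stable_id.load(Ordering::Relaxed);
    let guard = shared.read().await;
    if guard.stable_id != bad_id {
        return Some((*guard).clone()); // connection not flagged, keep using it
    }
    drop(guard); // release the read lock before taking the write lock
    let mut guard = shared.write().await;
    let fresh = reconnect(next_id).await?;
    *guard = fresh.clone();
    Some(fresh)
}

Because the staleness check runs under the read lock and the reconnect under a separate write lock, two tasks that both observe a flagged stable_id can reconnect back to back, the second replacement overwriting the first; the commit appears to accept that in exchange for keeping the common path on the cheaper read lock.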

View File

@@ -12,7 +12,7 @@ use bench::{
use clap::Parser;
use log::info;
use solana_rpc_client::{nonblocking::rpc_client::RpcClient, rpc_client::SerializableTransaction};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature};
use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature, signer::Signer};
#[tokio::main]
async fn main() {
@@ -60,6 +60,8 @@ async fn main() {
async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
let funded_payer = BenchHelper::get_payer().await.unwrap();
println!("payer {}", funded_payer.pubkey());
let blockhash = rpc_client.get_latest_blockhash().await.unwrap();
let txs = BenchHelper::generate_txs(tx_count, &funded_payer, blockhash, None);
@@ -106,7 +108,7 @@ async fn bench(rpc_client: Arc<RpcClient>, tx_count: usize) -> Metric {
metrics.txs_confirmed += 1;
to_remove_txs.push(sig);
} else if time_elapsed_since_last_confirmed.unwrap().elapsed()
> Duration::from_secs(3)
> Duration::from_secs(30)
{
metrics.txs_un_confirmed += 1;
to_remove_txs.push(sig);
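
The last hunk above also widens the bench's give-up window: a pending signature is only counted as unconfirmed once no new confirmation has arrived for 30 seconds (previously 3). A rough, hypothetical sketch of that bookkeeping; the u64 ids and the tally_confirmations helper are placeholders, not the bench's actual types.

use std::collections::HashSet;
use std::time::{Duration, Instant};

const GIVE_UP_AFTER: Duration = Duration::from_secs(30);

struct Metric {
    txs_confirmed: u64,
    txs_un_confirmed: u64,
}

// One polling round: confirmed signatures bump the counter and refresh the progress
// timestamp; if no confirmation has landed for 30s, everything still pending is written off.
fn tally_confirmations(
    pending: &mut HashSet<u64>,
    confirmed_this_round: &[u64],
    last_confirmed_at: &mut Option<Instant>,
    metrics: &mut Metric,
) {
    for sig in confirmed_this_round {
        if pending.remove(sig) {
            metrics.txs_confirmed += 1;
            *last_confirmed_at = Some(Instant::now());
        }
    }
    if let Some(t) = last_confirmed_at {
        if t.elapsed() > GIVE_UP_AFTER {
            metrics.txs_un_confirmed += pending.len() as u64;
            pending.clear();
        }
    }
}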

View File

@@ -9,7 +9,7 @@ use std::{
};
use dashmap::DashMap;
use log::{error, info, trace, warn};
use log::{error, trace, warn};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
@@ -21,14 +21,14 @@ use solana_sdk::{
};
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use tokio::{
sync::{broadcast::Receiver, broadcast::Sender},
sync::{broadcast::Receiver, broadcast::Sender, RwLock},
time::timeout,
};
use super::{rotating_queue::RotatingQueue, tpu_service::IdentityStakes};
pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(5);
const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
const CONNECTION_RETRY_COUNT: usize = 10;
lazy_static::lazy_static! {
@@ -93,19 +93,16 @@ impl ActiveConnection {
async fn connect(
identity: Pubkey,
already_connected: Arc<AtomicBool>,
already_connected: bool,
endpoint: Endpoint,
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
) -> Option<Connection> {
for _i in 0..CONNECTION_RETRY_COUNT {
let conn = if already_connected.load(Ordering::Relaxed) {
info!("making make_connection_0rtt");
let conn = if already_connected {
Self::make_connection_0rtt(endpoint.clone(), addr).await
} else {
info!("making make_connection");
let conn = Self::make_connection(endpoint.clone(), addr).await;
already_connected.store(true, Ordering::Relaxed);
conn
};
match conn {
@@ -124,26 +121,27 @@ impl ActiveConnection {
None
}
async fn write_all(mut send_stream: SendStream, txs: Vec<Vec<u8>>, identity: Pubkey) {
for tx in txs {
let write_timeout_res = timeout(
QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
send_stream.write_all(tx.as_slice()),
)
.await;
match write_timeout_res {
Ok(write_res) => {
if let Err(e) = write_res {
warn!(
"Error while writing transaction for {}, error {}",
identity, e
);
}
}
Err(_) => {
warn!("timeout while writing transaction for {}", identity);
async fn write_all(mut send_stream: SendStream, tx: &Vec<u8>, identity: Pubkey, last_stable_id : Arc<AtomicU64>, connection_stable_id: u64) -> bool {
let write_timeout_res = timeout(
QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
send_stream.write_all(tx.as_slice()),
)
.await;
match write_timeout_res {
Ok(write_res) => {
if let Err(e) = write_res {
trace!(
"Error while writing transaction for {}, error {}",
identity, e
);
// retry
last_stable_id.store(connection_stable_id, Ordering::Relaxed);
return true;
}
}
Err(_) => {
warn!("timeout while writing transaction for {}", identity);
}
}
let finish_timeout_res = timeout(
@@ -164,55 +162,72 @@ impl ActiveConnection {
warn!("timeout while writing transaction for {}", identity);
}
}
false
}
async fn open_unistream(
identity: Pubkey,
already_connected: Arc<AtomicBool>,
endpoint: Endpoint,
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
) -> Option<(Connection, SendStream)> {
let connection =
Self::connect(identity, already_connected, endpoint, addr, exit_signal).await;
if let Some(connection) = connection {
match timeout(
QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
connection.open_uni(),
)
.await
{
Ok(Ok(unistream)) => return Some((connection, unistream)),
Ok(Err(_)) => None,
Err(_) => return None,
}
} else {
None
async fn open_unistream(connection: Connection, last_stable_id : Arc<AtomicU64>) -> (Option<SendStream>, bool) {
match timeout(
QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC,
connection.open_uni(),
)
.await
{
Ok(Ok(unistream)) => return (Some(unistream), false),
Ok(Err(_)) => {
// reset connection for next retry
last_stable_id.store(connection.stable_id() as u64, Ordering::Relaxed);
(None, true)
},
Err(_) => return (None, false),
}
}
async fn send_transaction_batch(
connection: Arc<RwLock<Connection>>,
txs: Vec<Vec<u8>>,
identity: Pubkey,
already_connected: Arc<AtomicBool>,
identity: Pubkey,
endpoint: Endpoint,
socket_addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
last_stable_id : Arc<AtomicU64>
) {
let stream = Self::open_unistream(
identity,
already_connected,
endpoint,
socket_addr,
exit_signal.clone(),
)
.await;
if let Some((_connection, send_stream)) = stream {
if exit_signal.load(Ordering::Relaxed) {
return;
}
for _ in 0..3 {
let conn = {
let last_stable_id = last_stable_id.load(Ordering::Relaxed) as usize;
let conn = connection.read().await;
if conn.stable_id() == last_stable_id {
// problematic connection
drop(conn);
let mut conn = connection.write().await;
let new_conn = Self::connect(identity, true, endpoint.clone(), socket_addr.clone(), exit_signal.clone()).await;
if let Some(new_conn) = new_conn {
*conn = new_conn;
conn.clone()
} else {
// could not connect
return;
}
} else {
conn.clone()
}
};
let mut retry = false;
for tx in &txs {
let (stream, retry_conn) = Self::open_unistream(conn.clone(), last_stable_id.clone()).await;
if let Some(send_stream) = stream {
if exit_signal.load(Ordering::Relaxed) {
return;
}
Self::write_all(send_stream, txs, identity).await;
retry = Self::write_all(send_stream, tx, identity, last_stable_id.clone(), conn.stable_id() as u64).await;
} else {
retry = retry_conn;
}
}
if !retry {
break;
}
}
}
@@ -248,7 +263,6 @@ impl ActiveConnection {
identity_stakes: IdentityStakes,
) {
NB_QUIC_ACTIVE_CONNECTIONS.inc();
let already_connected: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
let mut transaction_reciever = transaction_reciever;
let mut exit_oneshot_channel = exit_oneshot_channel;
@@ -269,6 +283,8 @@ impl ActiveConnection {
};
let task_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
let mut connection : Option<Arc<RwLock<Connection>>> = None;
let last_stable_id : Arc<AtomicU64> = Arc::new(AtomicU64::new(0));
loop {
// exit signal set
@@ -307,21 +323,35 @@
txs.push(tx);
}
}
if connection.is_none() {
// initial connection
let conn = Self::connect(identity, false, endpoint.clone(), addr.clone(), exit_signal.clone()).await;
if let Some(conn) = conn {
// could connect
connection = Some(Arc::new(RwLock::new(conn)));
} else {
break;
}
}
let task_counter = task_counter.clone();
let endpoint = endpoint.clone();
let exit_signal = exit_signal.clone();
let addr = addr.clone();
let already_connected = already_connected.clone();
let connection = connection.clone();
let last_stable_id = last_stable_id.clone();
tokio::spawn(async move {
task_counter.fetch_add(1, Ordering::Relaxed);
NB_QUIC_TASKS.inc();
Self::send_transaction_batch(txs, identity, already_connected, endpoint, addr, exit_signal).await;
let connection = connection.unwrap();
Self::send_transaction_batch(connection, txs, identity, endpoint, addr, exit_signal, last_stable_id).await;
NB_QUIC_TASKS.dec();
task_counter.fetch_sub(1, Ordering::Relaxed);
});
tokio::time::sleep(tokio::time::Duration::from_micros(100)).await;
},
Err(_) => {
// timed out