Merge pull request #114 from blockworks-foundation/custom_tpu_bugfixes

Try connection 10 times, adding timeouts for connect, writeall and fi…
This commit is contained in:
galactus 2023-04-06 16:12:24 +02:00 committed by GitHub
commit b0856507f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 171 additions and 87 deletions

12
Cargo.lock generated
View File

@ -310,6 +310,17 @@ dependencies = [
"event-listener",
]
[[package]]
name = "async-recursion"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba"
dependencies = [
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 2.0.13",
]
[[package]]
name = "async-trait"
version = "0.1.64"
@ -2247,6 +2258,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"async-recursion",
"base64 0.21.0",
"bench",
"bincode",

View File

@ -45,6 +45,7 @@ dotenv = "0.15.0"
async-channel = "1.8.0"
quinn = "0.9.3"
rustls = { version = "0.20.6", default-features = false }
async-recursion = "1.0.4"
[dev-dependencies]
bench = { path = "./bench" }
@ -84,3 +85,4 @@ dotenv = { workspace = true }
async-channel = { workspace = true }
quinn = { workspace = true }
rustls = { workspace = true }
async-recursion = { workspace = true }

View File

@ -8,12 +8,13 @@ use std::{
time::Duration,
};
use async_recursion::async_recursion;
use dashmap::DashMap;
use log::{error, info, trace, warn};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime,
TransportConfig,
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
TokioRuntime, TransportConfig,
};
use solana_sdk::pubkey::Pubkey;
use tokio::{
@ -24,7 +25,8 @@ use tokio::{
use super::rotating_queue::RotatingQueue;
pub const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";
const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: u64 = 5;
const QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC: Duration = Duration::from_secs(10);
const CONNECTION_RETRY_COUNT: usize = 10;
lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
@ -54,13 +56,7 @@ impl ActiveConnection {
async fn make_connection(endpoint: Endpoint, addr: SocketAddr) -> anyhow::Result<Connection> {
let connecting = endpoint.connect(addr, "connect")?;
// let res = timeout(
// Duration::from_secs(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC),
// connecting,
// )
// .await?;
let res = connecting.await;
let res = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, connecting).await?;
Ok(res.unwrap())
}
@ -71,23 +67,15 @@ impl ActiveConnection {
let connecting = endpoint.connect(addr, "connect")?;
let connection = match connecting.into_0rtt() {
Ok((connection, zero_rtt)) => {
if let Ok(_) = timeout(
Duration::from_secs(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC),
zero_rtt,
)
.await
{
if let Ok(_) = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, zero_rtt).await {
connection
} else {
return Err(ConnectionError::TimedOut.into());
}
}
Err(connecting) => {
if let Ok(connecting_result) = timeout(
Duration::from_millis(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC),
connecting,
)
.await
if let Ok(connecting_result) =
timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, connecting).await
{
connecting_result?
} else {
@ -98,6 +86,98 @@ impl ActiveConnection {
Ok(connection)
}
async fn connect(
identity: Pubkey,
already_connected: bool,
endpoint: Endpoint,
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
) -> Option<Arc<Connection>> {
for _i in 0..CONNECTION_RETRY_COUNT {
let conn = if already_connected {
info!("making make_connection_0rtt");
Self::make_connection_0rtt(endpoint.clone(), addr.clone()).await
} else {
info!("making make_connection");
Self::make_connection(endpoint.clone(), addr.clone()).await
};
match conn {
Ok(conn) => {
NB_QUIC_CONNECTIONS.inc();
return Some(Arc::new(conn));
}
Err(e) => {
trace!("Could not connect to {} because of error {}", identity, e);
if exit_signal.load(Ordering::Relaxed) {
break;
}
}
}
}
None
}
#[async_recursion]
async fn open_unistream(
connection: &mut Option<Arc<Connection>>,
reconnect: bool,
identity: Pubkey,
already_connected: bool,
endpoint: Endpoint,
addr: SocketAddr,
exit_signal: Arc<AtomicBool>,
) -> Option<SendStream> {
let (unistream, reconnect_and_try_again) = match connection {
Some(conn) => {
let unistream_maybe_timeout =
timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, conn.open_uni()).await;
match unistream_maybe_timeout {
Ok(unistream_res) => match unistream_res {
Ok(unistream) => (Some(unistream), false),
Err(_) => (None, reconnect),
},
Err(_) => {
// timed out
(None, false)
}
}
}
None => (None, true),
};
if reconnect_and_try_again {
let conn = Self::connect(
identity,
already_connected,
endpoint.clone(),
addr.clone(),
exit_signal.clone(),
)
.await;
match conn {
Some(conn) => {
*connection = Some(conn);
Self::open_unistream(
connection,
false,
identity,
already_connected,
endpoint,
addr,
exit_signal,
)
.await
}
None => {
// connection with the peer is not possible
None
}
}
} else {
unistream
}
}
async fn listen(
transaction_reciever: Receiver<Vec<u8>>,
exit_oneshot_channel: tokio::sync::mpsc::Receiver<()>,
@ -108,9 +188,10 @@ impl ActiveConnection {
) {
NB_QUIC_TASKS.inc();
let mut already_connected = false;
let mut connection: Option<Connection> = None;
let mut connection: Option<Arc<Connection>> = None;
let mut transaction_reciever = transaction_reciever;
let mut exit_oneshot_channel = exit_oneshot_channel;
loop {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
@ -118,7 +199,7 @@ impl ActiveConnection {
}
tokio::select! {
tx_or_timeout = timeout(Duration::from_secs(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC), transaction_reciever.recv() ) => {
tx_or_timeout = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, transaction_reciever.recv() ) => {
// exit signal set
if exit_signal.load(Ordering::Relaxed) {
break;
@ -136,76 +217,65 @@ impl ActiveConnection {
continue;
}
};
let mut send_stream = match &connection {
Some(conn) => {
let unistream = conn.open_uni().await;
if let Err(_) = unistream {
// reconnect as connection is closed by the server and then retry
let conn = Self::make_connection_0rtt(endpoint.clone(), addr.clone()).await;
match conn {
Ok(conn) => {
let unistream = conn.open_uni().await;
connection = Some(conn);
match unistream{
Ok(stream) => stream,
Err(e) => {
warn!("error opening a unistream for {} error {}", identity, e);
break;
}
}
},
Err(e) => {
// validator no longer accepting connection
trace!("Could not reconnect to {} because of error {}", identity, e);
break;
let unistream = Self::open_unistream(
&mut connection,
true,
identity.clone(),
already_connected,
endpoint.clone(),
addr.clone(),
exit_signal.clone(),
).await;
if !already_connected && connection.is_some() {
already_connected = true;
}
match unistream {
Some(mut send_stream) => {
trace!("Sending {} transaction", identity);
let write_timeout_res = timeout( QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, send_stream.write_all(tx.as_slice())).await;
match write_timeout_res {
Ok(write_res) => {
if let Err(e) = write_res {
warn!(
"Error while writing transaction for {}, error {}",
identity,
e
);
}
},
Err(_) => {
warn!(
"timeout while writing transaction for {}",
identity
);
}
}
let finish_timeout_res = timeout(QUIC_CONNECTION_TIMEOUT_DURATION_IN_SEC, send_stream.finish()).await;
match finish_timeout_res {
Ok(finish_res) => {
if let Err(e) = finish_res {
warn!(
"Error while writing transaction for {}, error {}",
identity,
e
);
}
},
Err(_) => {
warn!(
"timeout while writing transaction for {}",
identity
);
}
} else {
unistream.unwrap()
}
},
None => {
let conn = if already_connected {
info!("making make_connection_0rtt");
Self::make_connection_0rtt(endpoint.clone(), addr.clone()).await
} else {
info!("making make_connection");
Self::make_connection(endpoint.clone(), addr.clone()).await
};
match conn {
Ok(conn) => {
NB_QUIC_CONNECTIONS.inc();
already_connected = true;
let unistream = conn.open_uni().await;
if let Err(e) = unistream {
warn!("error opening a unistream for {} error {}", identity, e);
continue;
}
connection = Some(conn);
unistream.unwrap()
},
Err(e) => {
warn!("Could not connect to {} because of error {}", identity, e);
break;
}
}
trace!("could not create a unistream for {}", identity);
break;
}
};
trace!("Sending {} transaction", identity);
if let Err(e) = send_stream.write_all(tx.as_slice()).await {
warn!(
"Error while writing transaction for {} error {}",
identity, e
);
}
if let Err(e) = send_stream.finish().await {
warn!(
"Error finishing for {}, error {}",
identity, e,
)
}
},
Err(_) => {