Merge pull request #111 from blockworks-foundation/custom_tpu_client

Reconnect to the quic server
This commit is contained in:
galactus 2023-04-05 10:42:39 +02:00 committed by GitHub
commit 17009dca40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 11 deletions

View File

@ -243,9 +243,10 @@ impl BlockListener {
TXS_CONFIRMED.inc(); TXS_CONFIRMED.inc();
} }
info!( trace!(
"got transaction {} confrimation level {}", "got transaction {} confrimation level {}",
sig, commitment_config.commitment sig,
commitment_config.commitment
); );
tx_status.value_mut().status = Some(TransactionStatus { tx_status.value_mut().status = Some(TransactionStatus {

View File

@ -9,7 +9,7 @@ use std::{
}; };
use dashmap::DashMap; use dashmap::DashMap;
use log::{error, info, trace}; use log::{error, info, trace, warn};
use prometheus::{core::GenericGauge, opts, register_int_gauge}; use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{ use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime,
@ -139,11 +139,29 @@ impl ActiveConnection {
let mut send_stream = match &connection { let mut send_stream = match &connection {
Some(conn) => { Some(conn) => {
let unistream = conn.open_uni().await; let unistream = conn.open_uni().await;
if let Err(e) = unistream { if let Err(_) = unistream {
error!("error opening a unistream for {} error {}", identity, e); // reconnect as connection is closed by the server and then retry
continue; let conn = Self::make_connection_0rtt(endpoint.clone(), addr.clone()).await;
match conn {
Ok(conn) => {
let unistream = conn.open_uni().await;
connection = Some(conn);
match unistream{
Ok(stream) => stream,
Err(e) => {
warn!("error opening a unistream for {} error {}", identity, e);
continue;
}
}
},
Err(e) => {
warn!("Could not reconnect to {} because of error {}", identity, e);
continue;
}
}
} else {
unistream.unwrap()
} }
unistream.unwrap()
}, },
None => { None => {
let conn = if already_connected { let conn = if already_connected {
@ -159,7 +177,7 @@ impl ActiveConnection {
already_connected = true; already_connected = true;
let unistream = conn.open_uni().await; let unistream = conn.open_uni().await;
if let Err(e) = unistream { if let Err(e) = unistream {
error!("error opening a unistream for {} error {}", identity, e); warn!("error opening a unistream for {} error {}", identity, e);
continue; continue;
} }
@ -167,7 +185,7 @@ impl ActiveConnection {
unistream.unwrap() unistream.unwrap()
}, },
Err(e) => { Err(e) => {
error!("Could not connect to {} because of error {}", identity, e); warn!("Could not connect to {} because of error {}", identity, e);
continue; continue;
} }
} }

View File

@ -5,7 +5,7 @@ use std::{
use anyhow::bail; use anyhow::bail;
use dashmap::DashMap; use dashmap::DashMap;
use log::{info, warn}; use log::{info, trace, warn};
use prometheus::{ use prometheus::{
core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter, core::GenericGauge, histogram_opts, opts, register_histogram, register_int_counter,
@ -96,7 +96,7 @@ impl TxSender {
let txs_sent = self.txs_sent_store.clone(); let txs_sent = self.txs_sent_store.clone();
for (sig, _) in &sigs_and_slots { for (sig, _) in &sigs_and_slots {
info!("sending transaction {}", sig); trace!("sending transaction {}", sig);
txs_sent.insert(sig.to_owned(), TxProps::default()); txs_sent.insert(sig.to_owned(), TxProps::default());
} }