diff --git a/src/main.rs b/src/main.rs index 9448e82..d49d6ff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -109,6 +109,8 @@ pub async fn main() -> anyhow::Result<()> { ) .await; + tpu_manager.force_reset_after_every(Duration::from_secs(300)); + info!( "accounts:{:?} markets:{:?} quotes_per_second:{:?} expected_tps:{:?} duration:{:?}", account_keys_parsed.len(), diff --git a/src/tpu_manager.rs b/src/tpu_manager.rs index dddc5a3..8ced98a 100644 --- a/src/tpu_manager.rs +++ b/src/tpu_manager.rs @@ -1,10 +1,11 @@ use bincode::serialize; -use log::{info, warn}; +use log::{info, warn, error}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_client::{connection_cache::ConnectionCache, nonblocking::tpu_client::TpuClient}; use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool}; use solana_sdk::signature::Keypair; use solana_sdk::transaction::Transaction; +use std::time::Duration; use std::{ net::{IpAddr, Ipv4Addr}, sync::{ @@ -120,6 +121,16 @@ impl TpuManager { Ok(()) } + pub fn force_reset_after_every(&self, duration: Duration) { + let this = self.clone(); + tokio::spawn(async move { + tokio::time::sleep(duration).await; + if let Err(e) = this.reset_tpu_client().await { + error!("timely restart of tpu client failed {}", e); + } + }); + } + async fn get_tpu_client(&self) -> Arc { self.tpu_client.read().await.clone() } @@ -140,6 +151,9 @@ impl TpuManager { "sending error on channel : {}", sent.err().unwrap().to_string() ); + if let Err(e) = self.reset().await { + error!("error while reseting tpu client {}", e); + } } tpu_client.send_transaction(transaction).await