From 33dfcd189529e781cbc5d14bf224cec52929df2a Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Wed, 19 Apr 2023 18:51:04 +0200 Subject: [PATCH] updating the default values --- src/bridge.rs | 20 ++++++++++++------- src/lib.rs | 6 +++--- .../tpu_utils/tpu_connection_manager.rs | 3 +-- src/workers/transaction_replayer.rs | 9 +++++++++ 4 files changed, 26 insertions(+), 12 deletions(-) diff --git a/src/bridge.rs b/src/bridge.rs index 7803779e..4051dc62 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -6,6 +6,7 @@ use crate::{ workers::{ tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres, PrometheusSync, TransactionReplay, TransactionReplayer, TxSender, WireTransaction, + MESSAGES_IN_REPLAY_QUEUE, }, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, }; @@ -278,13 +279,18 @@ impl LiteRpcServer for LiteBridge { let max_replay = max_retries.map_or(self.max_retries, |x| x as usize); let replay_at = Instant::now() + self.tx_replayer.retry_after; // ignore error for replay service - let _ = tx_replay_sender.send(TransactionReplay { - signature: sig.to_string(), - tx: raw_tx_clone, - replay_count: 0, - max_replay, - replay_at, - }); + if tx_replay_sender + .send(TransactionReplay { + signature: sig.to_string(), + tx: raw_tx_clone, + replay_count: 0, + max_replay, + replay_at, + }) + .is_ok() + { + MESSAGES_IN_REPLAY_QUEUE.inc(); + } } TXS_IN_CHANNEL.inc(); diff --git a/src/lib.rs b/src/lib.rs index b9c6125d..3376fac8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,12 +22,12 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000; /// 25 slots in 10s send to little more leaders #[from_env] -pub const DEFAULT_FANOUT_SIZE: u64 = 100; +pub const DEFAULT_FANOUT_SIZE: u64 = 16; #[from_env] -pub const MAX_RETRIES: usize = 10; +pub const MAX_RETRIES: usize = 40; -pub const DEFAULT_RETRY_TIMEOUT: u64 = 4; +pub const DEFAULT_RETRY_TIMEOUT: u64 = 2; #[from_env] pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs index 365f66ad..4c5bda5c 100644 --- a/src/workers/tpu_utils/tpu_connection_manager.rs +++ b/src/workers/tpu_utils/tpu_connection_manager.rs @@ -222,8 +222,6 @@ impl ActiveConnection { if conn.stable_id() != current_stable_id { conn.clone() } else { - NB_QUIC_CONNECTIONS.dec(); - let new_conn = Self::connect( identity, true, @@ -233,6 +231,7 @@ impl ActiveConnection { ) .await; if let Some(new_conn) = new_conn { + NB_QUIC_CONNECTIONS.dec(); *conn = new_conn; conn.clone() } else { diff --git a/src/workers/transaction_replayer.rs b/src/workers/transaction_replayer.rs index 3c12d084..179aec08 100644 --- a/src/workers/transaction_replayer.rs +++ b/src/workers/transaction_replayer.rs @@ -1,5 +1,6 @@ use super::TxSender; use log::error; +use prometheus::{core::GenericGauge, opts, register_int_gauge}; use std::time::Duration; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender}, @@ -7,6 +8,11 @@ use tokio::{ time::Instant, }; +lazy_static::lazy_static! { + pub static ref MESSAGES_IN_REPLAY_QUEUE: GenericGauge = + register_int_gauge!(opts!("literpc_messages_in_replay_queue", "Number of quic connections open")).unwrap(); +} + #[derive(Debug, Clone)] pub struct TransactionReplay { pub signature: String, @@ -43,6 +49,7 @@ impl TransactionReplayer { let tx = reciever.recv().await; match tx { Some(mut tx_replay) => { + MESSAGES_IN_REPLAY_QUEUE.dec(); if Instant::now() < tx_replay.replay_at { tokio::time::sleep_until(tx_replay.replay_at).await; } @@ -64,6 +71,8 @@ impl TransactionReplayer { if let Err(e) = sender.send(tx_replay) { error!("error while scheduling replay ({})", e); continue; + } else { + MESSAGES_IN_REPLAY_QUEUE.inc(); } } }