updating the default values
This commit is contained in:
parent
a80b84762b
commit
33dfcd1895
|
@ -6,6 +6,7 @@ use crate::{
|
|||
workers::{
|
||||
tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres,
|
||||
PrometheusSync, TransactionReplay, TransactionReplayer, TxSender, WireTransaction,
|
||||
MESSAGES_IN_REPLAY_QUEUE,
|
||||
},
|
||||
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
|
||||
};
|
||||
|
@ -278,13 +279,18 @@ impl LiteRpcServer for LiteBridge {
|
|||
let max_replay = max_retries.map_or(self.max_retries, |x| x as usize);
|
||||
let replay_at = Instant::now() + self.tx_replayer.retry_after;
|
||||
// ignore error for replay service
|
||||
let _ = tx_replay_sender.send(TransactionReplay {
|
||||
signature: sig.to_string(),
|
||||
tx: raw_tx_clone,
|
||||
replay_count: 0,
|
||||
max_replay,
|
||||
replay_at,
|
||||
});
|
||||
if tx_replay_sender
|
||||
.send(TransactionReplay {
|
||||
signature: sig.to_string(),
|
||||
tx: raw_tx_clone,
|
||||
replay_count: 0,
|
||||
max_replay,
|
||||
replay_at,
|
||||
})
|
||||
.is_ok()
|
||||
{
|
||||
MESSAGES_IN_REPLAY_QUEUE.inc();
|
||||
}
|
||||
}
|
||||
TXS_IN_CHANNEL.inc();
|
||||
|
||||
|
|
|
@ -22,12 +22,12 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000;
|
|||
|
||||
/// 25 slots in 10s send to little more leaders
|
||||
#[from_env]
|
||||
pub const DEFAULT_FANOUT_SIZE: u64 = 100;
|
||||
pub const DEFAULT_FANOUT_SIZE: u64 = 16;
|
||||
|
||||
#[from_env]
|
||||
pub const MAX_RETRIES: usize = 10;
|
||||
pub const MAX_RETRIES: usize = 40;
|
||||
|
||||
pub const DEFAULT_RETRY_TIMEOUT: u64 = 4;
|
||||
pub const DEFAULT_RETRY_TIMEOUT: u64 = 2;
|
||||
|
||||
#[from_env]
|
||||
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
|
||||
|
|
|
@ -222,8 +222,6 @@ impl ActiveConnection {
|
|||
if conn.stable_id() != current_stable_id {
|
||||
conn.clone()
|
||||
} else {
|
||||
NB_QUIC_CONNECTIONS.dec();
|
||||
|
||||
let new_conn = Self::connect(
|
||||
identity,
|
||||
true,
|
||||
|
@ -233,6 +231,7 @@ impl ActiveConnection {
|
|||
)
|
||||
.await;
|
||||
if let Some(new_conn) = new_conn {
|
||||
NB_QUIC_CONNECTIONS.dec();
|
||||
*conn = new_conn;
|
||||
conn.clone()
|
||||
} else {
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use super::TxSender;
|
||||
use log::error;
|
||||
use prometheus::{core::GenericGauge, opts, register_int_gauge};
|
||||
use std::time::Duration;
|
||||
use tokio::{
|
||||
sync::mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
|
@ -7,6 +8,11 @@ use tokio::{
|
|||
time::Instant,
|
||||
};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref MESSAGES_IN_REPLAY_QUEUE: GenericGauge<prometheus::core::AtomicI64> =
|
||||
register_int_gauge!(opts!("literpc_messages_in_replay_queue", "Number of quic connections open")).unwrap();
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TransactionReplay {
|
||||
pub signature: String,
|
||||
|
@ -43,6 +49,7 @@ impl TransactionReplayer {
|
|||
let tx = reciever.recv().await;
|
||||
match tx {
|
||||
Some(mut tx_replay) => {
|
||||
MESSAGES_IN_REPLAY_QUEUE.dec();
|
||||
if Instant::now() < tx_replay.replay_at {
|
||||
tokio::time::sleep_until(tx_replay.replay_at).await;
|
||||
}
|
||||
|
@ -64,6 +71,8 @@ impl TransactionReplayer {
|
|||
if let Err(e) = sender.send(tx_replay) {
|
||||
error!("error while scheduling replay ({})", e);
|
||||
continue;
|
||||
} else {
|
||||
MESSAGES_IN_REPLAY_QUEUE.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue