diff --git a/src/bridge.rs b/src/bridge.rs index ed89ec04..7803779e 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -6,7 +6,6 @@ use crate::{ workers::{ tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres, PrometheusSync, TransactionReplay, TransactionReplayer, TxSender, WireTransaction, - RETRY_AFTER, }, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, }; @@ -67,6 +66,7 @@ pub struct LiteBridge { pub tx_replayer: TransactionReplayer, pub tx_replay_sender: Option>, + pub max_retries: usize, } impl LiteBridge { @@ -75,6 +75,8 @@ impl LiteBridge { ws_addr: String, fanout_slots: u64, identity: Keypair, + retry_after: Duration, + max_retries: usize, ) -> anyhow::Result { let rpc_client = Arc::new(RpcClient::new(rpc_url.clone())); let current_slot = rpc_client.get_slot().await?; @@ -96,7 +98,7 @@ impl LiteBridge { let block_listner = BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store.clone()); - let tx_replayer = TransactionReplayer::new(tx_sender.clone()); + let tx_replayer = TransactionReplayer::new(tx_sender.clone(), retry_after); Ok(Self { rpc_client, tpu_service, @@ -106,6 +108,7 @@ impl LiteBridge { block_store, tx_replayer, tx_replay_sender: None, + max_retries, }) } @@ -272,8 +275,8 @@ impl LiteRpcServer for LiteBridge { } if let Some(tx_replay_sender) = &self.tx_replay_sender { - let max_replay = max_retries.map_or(10, |x| x) as usize; - let replay_at = Instant::now() + RETRY_AFTER; + let max_replay = max_retries.map_or(self.max_retries, |x| x as usize); + let replay_at = Instant::now() + self.tx_replayer.retry_after; // ignore error for replay service let _ = tx_replay_sender.send(TransactionReplay { signature: sig.to_string(), diff --git a/src/cli.rs b/src/cli.rs index c33a0eb0..cc93aba5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,4 +1,7 @@ -use crate::{DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR}; +use crate::{ + DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RETRY_TIMEOUT, DEFAULT_RPC_ADDR, + DEFAULT_WS_ADDR, MAX_RETRIES, +}; use clap::Parser; #[derive(Parser, Debug)] @@ -26,4 +29,8 @@ pub struct Args { pub prometheus_addr: String, #[arg(short = 'k', long, default_value_t = String::new())] pub identity_keypair: String, + #[arg(long, default_value_t = MAX_RETRIES)] + pub maximum_retries_per_tx: usize, + #[arg(long, default_value_t = DEFAULT_RETRY_TIMEOUT)] + pub transaction_retry_after_secs: u64, } diff --git a/src/lib.rs b/src/lib.rs index fa93e038..b9c6125d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,6 +24,11 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000; #[from_env] pub const DEFAULT_FANOUT_SIZE: u64 = 100; +#[from_env] +pub const MAX_RETRIES: usize = 10; + +pub const DEFAULT_RETRY_TIMEOUT: u64 = 4; + #[from_env] pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus = diff --git a/src/main.rs b/src/main.rs index 73ba3683..7c375172 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,6 +45,8 @@ pub async fn main() -> anyhow::Result<()> { enable_postgres, prometheus_addr, identity_keypair, + maximum_retries_per_tx, + transaction_retry_after_secs, } = Args::parse(); dotenv().ok(); @@ -53,7 +55,16 @@ pub async fn main() -> anyhow::Result<()> { let clean_interval_ms = Duration::from_millis(clean_interval_ms); - let light_bridge = LiteBridge::new(rpc_addr, ws_addr, fanout_size, identity).await?; + let retry_after = Duration::from_secs(transaction_retry_after_secs); + let light_bridge = LiteBridge::new( + rpc_addr, + ws_addr, + fanout_size, + identity, + retry_after, + maximum_retries_per_tx, + ) + .await?; let services = light_bridge .start_services( diff --git a/src/workers/transaction_replayer.rs b/src/workers/transaction_replayer.rs index aecfb6bf..3c12d084 100644 --- a/src/workers/transaction_replayer.rs +++ b/src/workers/transaction_replayer.rs @@ -7,8 +7,6 @@ use tokio::{ time::Instant, }; -pub const RETRY_AFTER: Duration = Duration::from_secs(4); - #[derive(Debug, Clone)] pub struct TransactionReplay { pub signature: String, @@ -21,11 +19,15 @@ pub struct TransactionReplay { #[derive(Clone)] pub struct TransactionReplayer { pub tx_sender: TxSender, + pub retry_after: Duration, } impl TransactionReplayer { - pub fn new(tx_sender: TxSender) -> Self { - Self { tx_sender } + pub fn new(tx_sender: TxSender, retry_after: Duration) -> Self { + Self { + tx_sender, + retry_after, + } } pub fn start_service( @@ -34,6 +36,7 @@ impl TransactionReplayer { reciever: UnboundedReceiver, ) -> JoinHandle> { let tx_sender = self.tx_sender.clone(); + let retry_after = self.retry_after; tokio::spawn(async move { let mut reciever = reciever; loop { @@ -57,7 +60,7 @@ impl TransactionReplayer { if tx_replay.replay_count < tx_replay.max_replay { tx_replay.replay_count += 1; - tx_replay.replay_at = Instant::now() + RETRY_AFTER; + tx_replay.replay_at = Instant::now() + retry_after; if let Err(e) = sender.send(tx_replay) { error!("error while scheduling replay ({})", e); continue;