diff --git a/src/bridge.rs b/src/bridge.rs index dae880ae..ebb8e3bf 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -5,7 +5,7 @@ use crate::{ workers::{BlockListener, TxSender}, }; -use std::{ops::Deref, str::FromStr, sync::Arc}; +use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration}; use anyhow::bail; @@ -32,7 +32,7 @@ use tokio::{net::ToSocketAddrs, task::JoinHandle}; pub struct LiteBridge { pub tpu_client: Arc, pub rpc_url: Url, - pub tx_sender: Option, + pub tx_sender: TxSender, pub finalized_block_listenser: BlockListener, pub confirmed_block_listenser: BlockListener, pub txs_sent: Arc>>, @@ -41,11 +41,7 @@ pub struct LiteBridge { } impl LiteBridge { - pub async fn new( - rpc_url: reqwest::Url, - ws_addr: &str, - batch_transactions: bool, - ) -> anyhow::Result { + pub async fn new(rpc_url: reqwest::Url, ws_addr: &str) -> anyhow::Result { let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string())); let tpu_client = @@ -70,12 +66,10 @@ impl LiteBridge { ) .await?; + let tx_sender = TxSender::new(tpu_client.clone()); + Ok(Self { - tx_sender: if batch_transactions { - Some(TxSender::new(tpu_client.clone())) - } else { - None - }, + tx_sender, finalized_block_listenser, confirmed_block_listenser, rpc_url, @@ -143,13 +137,18 @@ impl LiteBridge { self, http_addr: T, ws_addr: T, + tx_batch_size: usize, + tx_send_interval: Duration, ) -> anyhow::Result>>> { - let tx_sender = self.tx_sender.clone(); - let finalized_block_listenser = self.finalized_block_listenser.clone().listen(); let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen(); + let tx_sender = self + .tx_sender + .clone() + .execute(tx_batch_size, tx_send_interval); + let ws_server_handle = ServerBuilder::default() .ws_only() .build(ws_addr.clone()) @@ -174,21 +173,18 @@ impl LiteBridge { bail!("HTTP server stopped"); }); - let mut services = vec![ + #[cfg(feature = "metrics")] + let capture_metrics = self.capture_metrics(); + + Ok(vec![ ws_server, http_server, + tx_sender, finalized_block_listenser, confirmed_block_listenser, - ]; - - if let Some(tx_sender) = tx_sender { - services.push(tx_sender.execute()); - } - - #[cfg(feature = "metrics")] - services.push(self.capture_metrics()); - - Ok(services) + #[cfg(feature = "metrics")] + capture_metrics, + ]) } } @@ -221,11 +217,7 @@ impl LiteRpcServer for LiteBridge { self.txs_sent.insert(sig.to_string(), None); - if let Some(tx_sender) = &self.tx_sender { - tx_sender.enqnueue_tx(raw_tx); - } else { - self.tpu_client.send_wire_transaction(raw_tx.clone()).await; - } + self.tx_sender.enqnueue_tx(raw_tx); Ok(BinaryEncoding::Base58.encode(sig)) } diff --git a/src/cli.rs b/src/cli.rs index cad44bba..7d8ee0db 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -8,10 +8,14 @@ pub struct Args { pub rpc_addr: String, #[arg(short, long, default_value_t = String::from(DEFAULT_WS_ADDR))] pub ws_addr: String, - #[arg(short='l',long, default_value_t = String::from("127.0.0.1:8890"))] + #[arg(short = 'l', long, default_value_t = String::from("127.0.0.1:8890"))] pub lite_rpc_http_addr: String, - #[arg(short='s', long, default_value_t = String::from("127.0.0.1:8891"))] + #[arg(short = 's', long, default_value_t = String::from("127.0.0.1:8891"))] pub lite_rpc_ws_addr: String, - #[arg(short, long, default_value_t = false)] - pub batch_transactions: bool, + /// batch size of each batch forward + #[arg(short = 'b', long, default_value_t = 64usize)] + pub tx_batch_size: usize, + /// interval between each batch forward + #[arg(short = 'i', long, default_value_t = 2u64)] + pub tx_batch_interval_ms: u64, } diff --git a/src/main.rs b/src/main.rs index 1ac487c3..a1e5a81d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ use std::str::FromStr; +use std::time::Duration; use anyhow::Context; use clap::Parser; @@ -18,21 +19,25 @@ pub async fn main() -> anyhow::Result<()> { let Args { rpc_addr, ws_addr, - batch_transactions, + tx_batch_size, lite_rpc_ws_addr, lite_rpc_http_addr, + tx_batch_interval_ms, } = Args::parse(); - let light_bridge = LiteBridge::new( - Url::from_str(&rpc_addr).unwrap(), - &ws_addr, - batch_transactions, - ) - .await?; + let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms); + + let light_bridge = LiteBridge::new(Url::from_str(&rpc_addr).unwrap(), &ws_addr).await?; let services = light_bridge - .start_services(lite_rpc_http_addr, lite_rpc_ws_addr) + .start_services( + lite_rpc_http_addr, + lite_rpc_ws_addr, + tx_batch_size, + tx_batch_interval_ms, + ) .await?; + let services = futures::future::try_join_all(services); let ctrl_c_signal = tokio::signal::ctrl_c(); diff --git a/src/workers/tx_sender.rs b/src/workers/tx_sender.rs index 93393b9d..67891f31 100644 --- a/src/workers/tx_sender.rs +++ b/src/workers/tx_sender.rs @@ -1,5 +1,7 @@ -use std::sync::{Arc, RwLock}; -use std::time::Duration; +use std::{ + sync::{Arc, RwLock}, + time::Duration, +}; use log::{info, warn}; @@ -7,7 +9,7 @@ use solana_client::nonblocking::tpu_client::TpuClient; use tokio::task::JoinHandle; -use crate::{WireTransaction, DEFAULT_TX_RETRY_BATCH_SIZE}; +use crate::WireTransaction; /// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions #[derive(Clone)] @@ -31,7 +33,7 @@ impl TxSender { } /// retry enqued_tx(s) - pub async fn retry_txs(&self) { + pub async fn retry_txs(&self, tx_batch_size: usize) { let mut enqueued_txs = Vec::new(); std::mem::swap(&mut enqueued_txs, &mut self.enqueued_txs.write().unwrap()); @@ -46,13 +48,13 @@ impl TxSender { return; } - let mut tx_batch = Vec::with_capacity(len / DEFAULT_TX_RETRY_BATCH_SIZE); + let mut tx_batch = Vec::with_capacity(len / tx_batch_size); let mut batch_index = 0; for (index, tx) in self.enqueued_txs.read().unwrap().iter().enumerate() { - if index % DEFAULT_TX_RETRY_BATCH_SIZE == 0 { - tx_batch.push(Vec::with_capacity(DEFAULT_TX_RETRY_BATCH_SIZE)); + if index % tx_batch_size == 0 { + tx_batch.push(Vec::with_capacity(tx_batch_size)); batch_index += 1; } @@ -70,15 +72,19 @@ impl TxSender { } } - /// retry and confirm transactions every 800ms (avg time to confirm tx) - pub fn execute(self) -> JoinHandle> { - let mut interval = tokio::time::interval(Duration::from_millis(80)); + /// retry and confirm transactions every 2ms (avg time to confirm tx) + pub fn execute( + self, + tx_batch_size: usize, + tx_send_interval: Duration, + ) -> JoinHandle> { + let mut interval = tokio::time::interval(tx_send_interval); #[allow(unreachable_code)] tokio::spawn(async move { loop { interval.tick().await; - self.retry_txs().await; + self.retry_txs(tx_batch_size).await; } // to give the correct type to JoinHandle