This commit is contained in:
Aniket Prajapati 2023-01-10 12:10:43 +05:30
parent fba16b9941
commit e16e0ed545
No known key found for this signature in database
GPG Key ID: D4346D8C9C5398F2
4 changed files with 60 additions and 53 deletions

View File

@ -5,7 +5,7 @@ use crate::{
workers::{BlockListener, TxSender}, workers::{BlockListener, TxSender},
}; };
use std::{ops::Deref, str::FromStr, sync::Arc}; use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
use anyhow::bail; use anyhow::bail;
@ -32,7 +32,7 @@ use tokio::{net::ToSocketAddrs, task::JoinHandle};
pub struct LiteBridge { pub struct LiteBridge {
pub tpu_client: Arc<TpuClient>, pub tpu_client: Arc<TpuClient>,
pub rpc_url: Url, pub rpc_url: Url,
pub tx_sender: Option<TxSender>, pub tx_sender: TxSender,
pub finalized_block_listenser: BlockListener, pub finalized_block_listenser: BlockListener,
pub confirmed_block_listenser: BlockListener, pub confirmed_block_listenser: BlockListener,
pub txs_sent: Arc<DashMap<String, Option<TransactionStatus>>>, pub txs_sent: Arc<DashMap<String, Option<TransactionStatus>>>,
@ -41,11 +41,7 @@ pub struct LiteBridge {
} }
impl LiteBridge { impl LiteBridge {
pub async fn new( pub async fn new(rpc_url: reqwest::Url, ws_addr: &str) -> anyhow::Result<Self> {
rpc_url: reqwest::Url,
ws_addr: &str,
batch_transactions: bool,
) -> anyhow::Result<Self> {
let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string())); let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string()));
let tpu_client = let tpu_client =
@ -70,12 +66,10 @@ impl LiteBridge {
) )
.await?; .await?;
let tx_sender = TxSender::new(tpu_client.clone());
Ok(Self { Ok(Self {
tx_sender: if batch_transactions { tx_sender,
Some(TxSender::new(tpu_client.clone()))
} else {
None
},
finalized_block_listenser, finalized_block_listenser,
confirmed_block_listenser, confirmed_block_listenser,
rpc_url, rpc_url,
@ -143,13 +137,18 @@ impl LiteBridge {
self, self,
http_addr: T, http_addr: T,
ws_addr: T, ws_addr: T,
tx_batch_size: usize,
tx_send_interval: Duration,
) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> { ) -> anyhow::Result<Vec<JoinHandle<anyhow::Result<()>>>> {
let tx_sender = self.tx_sender.clone();
let finalized_block_listenser = self.finalized_block_listenser.clone().listen(); let finalized_block_listenser = self.finalized_block_listenser.clone().listen();
let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen(); let confirmed_block_listenser = self.confirmed_block_listenser.clone().listen();
let tx_sender = self
.tx_sender
.clone()
.execute(tx_batch_size, tx_send_interval);
let ws_server_handle = ServerBuilder::default() let ws_server_handle = ServerBuilder::default()
.ws_only() .ws_only()
.build(ws_addr.clone()) .build(ws_addr.clone())
@ -174,21 +173,18 @@ impl LiteBridge {
bail!("HTTP server stopped"); bail!("HTTP server stopped");
}); });
let mut services = vec![ #[cfg(feature = "metrics")]
let capture_metrics = self.capture_metrics();
Ok(vec![
ws_server, ws_server,
http_server, http_server,
tx_sender,
finalized_block_listenser, finalized_block_listenser,
confirmed_block_listenser, confirmed_block_listenser,
]; #[cfg(feature = "metrics")]
capture_metrics,
if let Some(tx_sender) = tx_sender { ])
services.push(tx_sender.execute());
}
#[cfg(feature = "metrics")]
services.push(self.capture_metrics());
Ok(services)
} }
} }
@ -221,11 +217,7 @@ impl LiteRpcServer for LiteBridge {
self.txs_sent.insert(sig.to_string(), None); self.txs_sent.insert(sig.to_string(), None);
if let Some(tx_sender) = &self.tx_sender { self.tx_sender.enqnueue_tx(raw_tx);
tx_sender.enqnueue_tx(raw_tx);
} else {
self.tpu_client.send_wire_transaction(raw_tx.clone()).await;
}
Ok(BinaryEncoding::Base58.encode(sig)) Ok(BinaryEncoding::Base58.encode(sig))
} }

View File

@ -8,10 +8,14 @@ pub struct Args {
pub rpc_addr: String, pub rpc_addr: String,
#[arg(short, long, default_value_t = String::from(DEFAULT_WS_ADDR))] #[arg(short, long, default_value_t = String::from(DEFAULT_WS_ADDR))]
pub ws_addr: String, pub ws_addr: String,
#[arg(short='l',long, default_value_t = String::from("127.0.0.1:8890"))] #[arg(short = 'l', long, default_value_t = String::from("127.0.0.1:8890"))]
pub lite_rpc_http_addr: String, pub lite_rpc_http_addr: String,
#[arg(short='s', long, default_value_t = String::from("127.0.0.1:8891"))] #[arg(short = 's', long, default_value_t = String::from("127.0.0.1:8891"))]
pub lite_rpc_ws_addr: String, pub lite_rpc_ws_addr: String,
#[arg(short, long, default_value_t = false)] /// batch size of each batch forward
pub batch_transactions: bool, #[arg(short = 'b', long, default_value_t = 64usize)]
pub tx_batch_size: usize,
/// interval between each batch forward
#[arg(short = 'i', long, default_value_t = 2u64)]
pub tx_batch_interval_ms: u64,
} }

View File

@ -1,4 +1,5 @@
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration;
use anyhow::Context; use anyhow::Context;
use clap::Parser; use clap::Parser;
@ -18,21 +19,25 @@ pub async fn main() -> anyhow::Result<()> {
let Args { let Args {
rpc_addr, rpc_addr,
ws_addr, ws_addr,
batch_transactions, tx_batch_size,
lite_rpc_ws_addr, lite_rpc_ws_addr,
lite_rpc_http_addr, lite_rpc_http_addr,
tx_batch_interval_ms,
} = Args::parse(); } = Args::parse();
let light_bridge = LiteBridge::new( let tx_batch_interval_ms = Duration::from_millis(tx_batch_interval_ms);
Url::from_str(&rpc_addr).unwrap(),
&ws_addr, let light_bridge = LiteBridge::new(Url::from_str(&rpc_addr).unwrap(), &ws_addr).await?;
batch_transactions,
)
.await?;
let services = light_bridge let services = light_bridge
.start_services(lite_rpc_http_addr, lite_rpc_ws_addr) .start_services(
lite_rpc_http_addr,
lite_rpc_ws_addr,
tx_batch_size,
tx_batch_interval_ms,
)
.await?; .await?;
let services = futures::future::try_join_all(services); let services = futures::future::try_join_all(services);
let ctrl_c_signal = tokio::signal::ctrl_c(); let ctrl_c_signal = tokio::signal::ctrl_c();

View File

@ -1,5 +1,7 @@
use std::sync::{Arc, RwLock}; use std::{
use std::time::Duration; sync::{Arc, RwLock},
time::Duration,
};
use log::{info, warn}; use log::{info, warn};
@ -7,7 +9,7 @@ use solana_client::nonblocking::tpu_client::TpuClient;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::{WireTransaction, DEFAULT_TX_RETRY_BATCH_SIZE}; use crate::WireTransaction;
/// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions /// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions
#[derive(Clone)] #[derive(Clone)]
@ -31,7 +33,7 @@ impl TxSender {
} }
/// retry enqued_tx(s) /// retry enqued_tx(s)
pub async fn retry_txs(&self) { pub async fn retry_txs(&self, tx_batch_size: usize) {
let mut enqueued_txs = Vec::new(); let mut enqueued_txs = Vec::new();
std::mem::swap(&mut enqueued_txs, &mut self.enqueued_txs.write().unwrap()); std::mem::swap(&mut enqueued_txs, &mut self.enqueued_txs.write().unwrap());
@ -46,13 +48,13 @@ impl TxSender {
return; return;
} }
let mut tx_batch = Vec::with_capacity(len / DEFAULT_TX_RETRY_BATCH_SIZE); let mut tx_batch = Vec::with_capacity(len / tx_batch_size);
let mut batch_index = 0; let mut batch_index = 0;
for (index, tx) in self.enqueued_txs.read().unwrap().iter().enumerate() { for (index, tx) in self.enqueued_txs.read().unwrap().iter().enumerate() {
if index % DEFAULT_TX_RETRY_BATCH_SIZE == 0 { if index % tx_batch_size == 0 {
tx_batch.push(Vec::with_capacity(DEFAULT_TX_RETRY_BATCH_SIZE)); tx_batch.push(Vec::with_capacity(tx_batch_size));
batch_index += 1; batch_index += 1;
} }
@ -70,15 +72,19 @@ impl TxSender {
} }
} }
/// retry and confirm transactions every 800ms (avg time to confirm tx) /// retry and confirm transactions every 2ms (avg time to confirm tx)
pub fn execute(self) -> JoinHandle<anyhow::Result<()>> { pub fn execute(
let mut interval = tokio::time::interval(Duration::from_millis(80)); self,
tx_batch_size: usize,
tx_send_interval: Duration,
) -> JoinHandle<anyhow::Result<()>> {
let mut interval = tokio::time::interval(tx_send_interval);
#[allow(unreachable_code)] #[allow(unreachable_code)]
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
interval.tick().await; interval.tick().await;
self.retry_txs().await; self.retry_txs(tx_batch_size).await;
} }
// to give the correct type to JoinHandle // to give the correct type to JoinHandle