batch_transactions

This commit is contained in:
Aniket Prajapati 2023-01-02 20:42:26 +05:30
parent 6172a4f4c3
commit 26c104a186
5 changed files with 55 additions and 74 deletions

View File

@ -6,7 +6,6 @@ use crate::{
SendTransactionParams,
},
workers::{BlockListener, TxSender},
DEFAULT_TX_MAX_RETRIES,
};
use std::{net::ToSocketAddrs, ops::Deref, sync::Arc};
@ -26,12 +25,16 @@ use tokio::task::JoinHandle;
pub struct LiteBridge {
pub tpu_client: Arc<TpuClient>,
pub rpc_url: Url,
pub tx_sender: TxSender,
pub tx_sender: Option<TxSender>,
pub block_listner: BlockListener,
}
impl LiteBridge {
pub async fn new(rpc_url: reqwest::Url, ws_addr: &str) -> anyhow::Result<Self> {
pub async fn new(
rpc_url: reqwest::Url,
ws_addr: &str,
batch_transactions: bool,
) -> anyhow::Result<Self> {
let rpc_client = Arc::new(RpcClient::new(rpc_url.to_string()));
let tpu_client =
@ -40,7 +43,11 @@ impl LiteBridge {
let block_listner = BlockListener::new(rpc_client.clone(), ws_addr).await?;
Ok(Self {
tx_sender: TxSender::new(tpu_client.clone(), block_listner.clone()),
tx_sender: if batch_transactions {
Some(TxSender::new(tpu_client.clone()))
} else {
None
},
block_listner,
rpc_url,
tpu_client,
@ -53,7 +60,7 @@ impl LiteBridge {
tx,
SendTransactionConfig {
encoding,
max_retries,
max_retries: _,
},
): SendTransactionParams,
) -> Result<String, JsonRpcError> {
@ -61,11 +68,11 @@ impl LiteBridge {
let sig = bincode::deserialize::<VersionedTransaction>(&raw_tx)?.signatures[0];
self.tpu_client.send_wire_transaction(raw_tx.clone()).await;
self.tx_sender
.enqnueue_tx(sig, raw_tx, max_retries.unwrap_or(DEFAULT_TX_MAX_RETRIES))
.await;
if let Some(tx_sender) = &self.tx_sender {
tx_sender.enqnueue_tx(raw_tx);
} else {
self.tpu_client.send_wire_transaction(raw_tx.clone()).await;
}
Ok(BinaryEncoding::Base58.encode(sig))
}
@ -117,7 +124,8 @@ impl LiteBridge {
addr: impl ToSocketAddrs + Send + 'static,
) -> Vec<JoinHandle<anyhow::Result<()>>> {
let this = Arc::new(self);
let tx_sender = this.tx_sender.clone().execute();
let tx_sender = this.tx_sender.clone();
let finalized_block_listenser = this
.block_listner
.clone()
@ -149,12 +157,13 @@ impl LiteBridge {
Ok(())
});
vec![
server,
finalized_block_listenser,
confirmed_block_listenser,
tx_sender,
]
let mut services = vec![server, finalized_block_listenser, confirmed_block_listenser];
if let Some(tx_sender) = tx_sender {
services.push(tx_sender.execute());
}
services
}
async fn rpc_route(body: bytes::Bytes, state: web::Data<Arc<LiteBridge>>) -> JsonRpcRes {

View File

@ -10,4 +10,6 @@ pub struct Args {
pub ws_addr: String,
#[arg(short, long, default_value_t = String::from(DEFAULT_LITE_RPC_ADDR))]
pub lite_rpc_addr: String,
#[arg(short, long, default_value_t = false)]
pub batch_transactions: bool,
}

View File

@ -19,9 +19,15 @@ pub async fn main() -> anyhow::Result<()> {
rpc_addr,
ws_addr,
lite_rpc_addr,
batch_transactions,
} = Args::parse();
let light_bridge = LiteBridge::new(Url::from_str(&rpc_addr).unwrap(), &ws_addr).await?;
let light_bridge = LiteBridge::new(
Url::from_str(&rpc_addr).unwrap(),
&ws_addr,
batch_transactions,
)
.await?;
let services = light_bridge.start_services(lite_rpc_addr);
let services = futures::future::try_join_all(services);

View File

@ -1,98 +1,62 @@
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use log::{info, warn};
use solana_client::nonblocking::tpu_client::TpuClient;
use solana_sdk::signature::Signature;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use crate::{WireTransaction, DEFAULT_TX_RETRY_BATCH_SIZE, TX_MAX_RETRIES_UPPER_LIMIT};
use super::block_listenser::BlockListener;
use crate::{WireTransaction, DEFAULT_TX_RETRY_BATCH_SIZE};
/// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions
#[derive(Clone)]
pub struct TxSender {
/// Transactions queue for retrying
enqueued_txs: Arc<RwLock<HashMap<Signature, (WireTransaction, u16)>>>,
/// block_listner
block_listner: BlockListener,
enqueued_txs: Arc<RwLock<Vec<WireTransaction>>>,
/// TpuClient to call the tpu port
tpu_client: Arc<TpuClient>,
}
impl TxSender {
pub fn new(tpu_client: Arc<TpuClient>, block_listner: BlockListener) -> Self {
pub fn new(tpu_client: Arc<TpuClient>) -> Self {
Self {
enqueued_txs: Default::default(),
block_listner,
tpu_client,
}
}
/// en-queue transaction if it doesn't already exist
pub async fn enqnueue_tx(&self, sig: Signature, raw_tx: WireTransaction, max_retries: u16) {
if max_retries == 0 {
return;
}
if !self.block_listner.blocks.contains_key(&sig.to_string()) {
let max_retries = max_retries.min(TX_MAX_RETRIES_UPPER_LIMIT);
info!("en-queuing {sig} with max retries {max_retries}");
self.enqueued_txs
.write()
.await
.insert(sig, (raw_tx, max_retries));
println!("{:?}", self.enqueued_txs.read().await.len());
}
pub fn enqnueue_tx(&self, raw_tx: WireTransaction) {
self.enqueued_txs.write().unwrap().push(raw_tx);
}
/// retry enqued_tx(s)
pub async fn retry_txs(&self) {
let len = self.enqueued_txs.read().await.len();
let mut enqueued_txs = Vec::new();
info!("retrying {len} tx(s)");
std::mem::swap(&mut enqueued_txs, &mut self.enqueued_txs.write().unwrap());
let enqueued_txs = self.enqueued_txs.read().unwrap().clone();
let len = enqueued_txs.len();
info!("sending {len} tx(s)");
if len == 0 {
return;
}
let mut enqued_tx = self.enqueued_txs.write().await;
let mut tx_batch = Vec::with_capacity(enqued_tx.len() / DEFAULT_TX_RETRY_BATCH_SIZE);
let mut stale_txs = vec![];
let mut tx_batch = Vec::with_capacity(len / DEFAULT_TX_RETRY_BATCH_SIZE);
let mut batch_index = 0;
for (index, (sig, (tx, retries))) in enqued_tx.iter_mut().enumerate() {
if self.block_listner.blocks.contains_key(&sig.to_string()) {
stale_txs.push(sig.to_owned());
continue;
}
for (index, tx) in self.enqueued_txs.read().unwrap().iter().enumerate() {
if index % DEFAULT_TX_RETRY_BATCH_SIZE == 0 {
tx_batch.push(Vec::with_capacity(DEFAULT_TX_RETRY_BATCH_SIZE));
batch_index += 1;
}
tx_batch[batch_index - 1].push(tx.clone());
let Some(retries_left) = retries.checked_sub(1) else {
stale_txs.push(sig.to_owned());
continue;
};
info!("retrying {sig} with {retries_left} retries left");
*retries = retries_left;
}
// remove stale tx(s)
for stale_tx in stale_txs {
enqued_tx.remove(&stale_tx);
tx_batch[batch_index - 1].push(tx.to_owned());
}
for tx_batch in tx_batch {
@ -108,7 +72,7 @@ impl TxSender {
/// retry and confirm transactions every 800ms (avg time to confirm tx)
pub fn execute(self) -> JoinHandle<anyhow::Result<()>> {
let mut interval = tokio::time::interval(Duration::from_millis(800));
let mut interval = tokio::time::interval(Duration::from_millis(80));
#[allow(unreachable_code)]
tokio::spawn(async move {

View File

@ -28,7 +28,7 @@ async fn send_and_confirm_txs() {
.await
.unwrap();
let tx_sender = TxSender::new(tpu_client, block_listener.clone());
let tx_sender = TxSender::new(tpu_client);
let services = try_join_all(vec![
block_listener.clone().listen(CommitmentConfig::confirmed()),
@ -47,7 +47,7 @@ async fn send_and_confirm_txs() {
let sig = tx.signatures[0];
let tx = BinaryEncoding::Base58.encode(bincode::serialize(&tx).unwrap());
tx_sender.enqnueue_tx(sig, tx.as_bytes().to_vec(), 2).await;
tx_sender.enqnueue_tx(tx.as_bytes().to_vec());
let sig = sig.to_string();