tx store in tx_sender

This commit is contained in:
Aniket Prajapati 2023-01-10 12:36:24 +05:30
parent a4080aa396
commit bea2403e2a
No known key found for this signature in database
GPG Key ID: D4346D8C9C5398F2
5 changed files with 43 additions and 35 deletions

View File

@ -9,7 +9,6 @@ use std::{ops::Deref, str::FromStr, sync::Arc, time::Duration};
use anyhow::bail;
use dashmap::DashMap;
use log::info;
use reqwest::Url;
@ -35,7 +34,6 @@ pub struct LiteBridge {
pub tx_sender: TxSender,
pub finalized_block_listenser: BlockListener,
pub confirmed_block_listenser: BlockListener,
pub txs_sent: Arc<DashMap<String, Option<TransactionStatus>>>,
#[cfg(feature = "metrics")]
pub metrics: Arc<tokio::sync::RwLock<crate::metrics::Metrics>>,
}
@ -48,12 +46,13 @@ impl LiteBridge {
Arc::new(TpuClient::new(rpc_client.clone(), ws_addr, Default::default()).await?);
let pub_sub_client = Arc::new(PubsubClient::new(ws_addr).await?);
let txs_sent = Arc::new(DashMap::new());
let tx_sender = TxSender::new(tpu_client.clone());
let finalized_block_listenser = BlockListener::new(
pub_sub_client.clone(),
rpc_client.clone(),
txs_sent.clone(),
tx_sender.txs_sent.clone(),
CommitmentConfig::finalized(),
)
.await?;
@ -61,20 +60,17 @@ impl LiteBridge {
let confirmed_block_listenser = BlockListener::new(
pub_sub_client,
rpc_client.clone(),
txs_sent.clone(),
tx_sender.txs_sent.clone(),
CommitmentConfig::confirmed(),
)
.await?;
let tx_sender = TxSender::new(tpu_client.clone());
Ok(Self {
tx_sender,
finalized_block_listenser,
confirmed_block_listenser,
rpc_url,
tpu_client,
txs_sent,
#[cfg(feature = "metrics")]
metrics: Default::default(),
})
@ -92,11 +88,11 @@ impl LiteBridge {
loop {
one_second.tick().await;
let txs_sent = self.txs_sent.len();
let txs_sent = self.tx_sender.txs_sent.len();
let mut txs_confirmed: usize = 0;
let mut txs_finalized: usize = 0;
for tx in self.txs_sent.iter() {
for tx in self.tx_sender.txs_sent.iter() {
if let Some(tx) = tx.value() {
match tx.confirmation_status() {
TransactionConfirmationStatus::Confirmed => txs_confirmed += 1,
@ -215,9 +211,7 @@ impl LiteRpcServer for LiteBridge {
.unwrap()
.signatures[0];
self.txs_sent.insert(sig.to_string(), None);
self.tx_sender.enqnueue_tx(raw_tx);
self.tx_sender.enqnueue_tx(sig.to_string(), raw_tx);
Ok(BinaryEncoding::Base58.encode(sig))
}
@ -255,7 +249,7 @@ impl LiteRpcServer for LiteBridge {
) -> crate::rpc::Result<RpcResponse<Vec<Option<TransactionStatus>>>> {
let sig_statuses = sigs
.iter()
.map(|sig| self.txs_sent.get(sig).and_then(|v| v.clone()))
.map(|sig| self.tx_sender.txs_sent.get(sig).and_then(|v| v.clone()))
.collect();
Ok(RpcResponse {
@ -291,7 +285,7 @@ impl LiteRpcServer for LiteBridge {
.unwrap()
.to_string();
self.txs_sent.insert(airdrop_sig.clone(), None);
self.tx_sender.txs_sent.insert(airdrop_sig.clone(), None);
Ok(airdrop_sig)
}

View File

@ -1,4 +1,6 @@
use crate::{DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR};
use crate::{
DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS, DEFAULT_TX_BATCH_SIZE, DEFAULT_WS_ADDR,
};
use clap::Parser;
#[derive(Parser, Debug)]
@ -13,9 +15,9 @@ pub struct Args {
#[arg(short = 's', long, default_value_t = String::from("127.0.0.1:8891"))]
pub lite_rpc_ws_addr: String,
/// batch size of each batch forward
#[arg(short = 'b', long, default_value_t = 64usize)]
#[arg(short = 'b', long, default_value_t = DEFAULT_TX_BATCH_SIZE)]
pub tx_batch_size: usize,
/// interval between each batch forward
#[arg(short = 'i', long, default_value_t = 2u64)]
#[arg(short = 'i', long, default_value_t = DEFAULT_TX_BATCH_INTERVAL_MS)]
pub tx_batch_interval_ms: u64,
}

View File

@ -1,5 +1,8 @@
use std::sync::Arc;
use const_env::from_env;
use solana_transaction_status::TransactionConfirmationStatus;
use dashmap::DashMap;
use solana_transaction_status::{TransactionConfirmationStatus, TransactionStatus};
pub mod bridge;
pub mod cli;
@ -11,6 +14,7 @@ pub mod rpc;
pub mod workers;
pub type WireTransaction = Vec<u8>;
pub type TxsSent = Arc<DashMap<String, Option<TransactionStatus>>>;
#[from_env]
pub const DEFAULT_RPC_ADDR: &str = "http://127.0.0.1:8899";
@ -23,10 +27,13 @@ pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
#[from_env]
pub const TX_MAX_RETRIES_UPPER_LIMIT: u16 = 5;
#[from_env]
pub const DEFAULT_TX_RETRY_BATCH_SIZE: usize = 20;
pub const DEFAULT_TX_BATCH_SIZE: usize = 64;
#[from_env]
pub const DEFAULT_TX_BATCH_INTERVAL_MS: u64= 2;
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
TransactionConfirmationStatus::Finalized;
#[cfg(feature = "metrics")]
#[from_env]
pub const DEFAULT_METRIC_RESET_TIME_INTERVAL: u64 = 12;
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
TransactionConfirmationStatus::Finalized;

View File

@ -9,11 +9,13 @@ use solana_client::nonblocking::tpu_client::TpuClient;
use tokio::task::JoinHandle;
use crate::WireTransaction;
use crate::{TxsSent, WireTransaction};
/// Retry transactions to a maximum of `u16` times, keep a track of confirmed transactions
#[derive(Clone)]
pub struct TxSender {
/// Tx(s) forwarded to tpu
pub txs_sent: TxsSent,
/// Transactions queue for retrying
enqueued_txs: Arc<RwLock<Vec<WireTransaction>>>,
/// TpuClient to call the tpu port
@ -25,10 +27,12 @@ impl TxSender {
Self {
enqueued_txs: Default::default(),
tpu_client,
txs_sent: Default::default(),
}
}
/// en-queue transaction if it doesn't already exist
pub fn enqnueue_tx(&self, raw_tx: WireTransaction) {
pub fn enqnueue_tx(&self, sig: String, raw_tx: WireTransaction) {
self.txs_sent.insert(sig, None);
self.enqueued_txs.write().unwrap().push(raw_tx);
}

View File

@ -2,12 +2,12 @@ use std::sync::Arc;
use std::time::Duration;
use bench_utils::helpers::BenchHelper;
use dashmap::DashMap;
use futures::future::try_join_all;
use lite_rpc::{
encoding::BinaryEncoding,
workers::{BlockListener, TxSender},
DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR,
DEFAULT_LITE_RPC_ADDR, DEFAULT_RPC_ADDR, DEFAULT_TX_BATCH_INTERVAL_MS, DEFAULT_TX_BATCH_SIZE,
DEFAULT_WS_ADDR,
};
use solana_client::nonblocking::{
pubsub_client::PubsubClient, rpc_client::RpcClient, tpu_client::TpuClient,
@ -29,22 +29,24 @@ async fn send_and_confirm_txs() {
);
let pub_sub_client = Arc::new(PubsubClient::new(DEFAULT_WS_ADDR).await.unwrap());
let txs_sent = Arc::new(DashMap::new());
let tx_sender = TxSender::new(tpu_client);
let block_listener = BlockListener::new(
pub_sub_client.clone(),
rpc_client.clone(),
txs_sent.clone(),
tx_sender.txs_sent.clone(),
CommitmentConfig::confirmed(),
)
.await
.unwrap();
let tx_sender = TxSender::new(tpu_client);
let services = try_join_all(vec![
block_listener.clone().listen(),
tx_sender.clone().execute(),
tx_sender.clone().execute(
DEFAULT_TX_BATCH_SIZE,
Duration::from_millis(DEFAULT_TX_BATCH_INTERVAL_MS),
),
]);
let confirm = tokio::spawn(async move {
@ -58,13 +60,12 @@ async fn send_and_confirm_txs() {
let tx = bench_helper.create_transaction(&funded_payer, blockhash);
let sig = tx.signatures[0];
let tx = BinaryEncoding::Base58.encode(bincode::serialize(&tx).unwrap());
tx_sender.enqnueue_tx(tx.as_bytes().to_vec());
let sig = sig.to_string();
tx_sender.enqnueue_tx(sig.clone(), tx.as_bytes().to_vec());
for _ in 0..2 {
let tx_status = txs_sent.get(&sig).unwrap();
let tx_status = tx_sender.txs_sent.get(&sig).unwrap();
if let Some(tx_status) = tx_status.value() {
if tx_status.confirmation_status() == TransactionConfirmationStatus::Confirmed {