From d261c805741a13d85531d461229f7ecd55fcc652 Mon Sep 17 00:00:00 2001 From: Godmode Galactus Date: Tue, 18 Apr 2023 11:05:01 +0200 Subject: [PATCH] adding transaction replay service --- src/bridge.rs | 35 ++++++++- src/lib.rs | 5 +- src/workers/block_listenser.rs | 2 +- src/workers/mod.rs | 2 + src/workers/postgres.rs | 4 +- .../tpu_utils/tpu_connection_manager.rs | 4 + src/workers/transaction_replayer.rs | 76 +++++++++++++++++++ 7 files changed, 118 insertions(+), 10 deletions(-) create mode 100644 src/workers/transaction_replayer.rs diff --git a/src/bridge.rs b/src/bridge.rs index 9c56731b..ed89ec04 100644 --- a/src/bridge.rs +++ b/src/bridge.rs @@ -5,7 +5,8 @@ use crate::{ rpc::LiteRpcServer, workers::{ tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres, - PrometheusSync, TxSender, WireTransaction, + PrometheusSync, TransactionReplay, TransactionReplayer, TxSender, WireTransaction, + RETRY_AFTER, }, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, }; @@ -31,8 +32,9 @@ use solana_sdk::{ use solana_transaction_status::TransactionStatus; use tokio::{ net::ToSocketAddrs, - sync::mpsc::{self, Sender}, + sync::mpsc::{self, Sender, UnboundedSender}, task::JoinHandle, + time::Instant, }; lazy_static::lazy_static! { @@ -62,6 +64,9 @@ pub struct LiteBridge { pub tx_sender: TxSender, pub block_listner: BlockListener, pub block_store: BlockStore, + + pub tx_replayer: TransactionReplayer, + pub tx_replay_sender: Option>, } impl LiteBridge { @@ -91,6 +96,7 @@ impl LiteBridge { let block_listner = BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store.clone()); + let tx_replayer = TransactionReplayer::new(tx_sender.clone()); Ok(Self { rpc_client, tpu_service, @@ -98,6 +104,8 @@ impl LiteBridge { tx_sender, block_listner, block_store, + tx_replayer, + tx_replay_sender: None, }) } @@ -131,6 +139,12 @@ impl LiteBridge { .clone() .execute(tx_recv, postgres_send.clone()); + let (replay_sender, replay_reciever) = tokio::sync::mpsc::unbounded_channel(); + let replay_service = self + .tx_replayer + .start_service(replay_sender.clone(), replay_reciever); + self.tx_replay_sender = Some(replay_sender); + let metrics_capture = MetricsCapture::new(self.tx_sender.clone()).capture(); let prometheus_sync = PrometheusSync.sync(prometheus_addr); @@ -193,6 +207,7 @@ impl LiteBridge { metrics_capture, prometheus_sync, cleaner, + replay_service, ]; services.append(&mut tpu_services); @@ -216,7 +231,7 @@ impl LiteRpcServer for LiteBridge { let SendTransactionConfig { encoding, - max_retries: _, + max_retries, } = send_transaction_config.unwrap_or_default(); let raw_tx = match encoding.decode(tx) { @@ -242,6 +257,7 @@ impl LiteRpcServer for LiteBridge { return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string())); }; + let raw_tx_clone = raw_tx.clone(); if let Err(e) = self .tx_send_channel .as_ref() @@ -254,6 +270,19 @@ impl LiteRpcServer for LiteBridge { e ); } + + if let Some(tx_replay_sender) = &self.tx_replay_sender { + let max_replay = max_retries.map_or(10, |x| x) as usize; + let replay_at = Instant::now() + RETRY_AFTER; + // ignore error for replay service + let _ = tx_replay_sender.send(TransactionReplay { + signature: sig.to_string(), + tx: raw_tx_clone, + replay_count: 0, + max_replay, + replay_at, + }); + } TXS_IN_CHANNEL.inc(); Ok(BinaryEncoding::Base58.encode(sig)) diff --git a/src/lib.rs b/src/lib.rs index 658cbc32..fa93e038 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,10 +16,6 @@ pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899"; pub const DEFAULT_LITE_RPC_ADDR: &str = "http://0.0.0.0:8890"; #[from_env] pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900"; -#[from_env] -pub const DEFAULT_TX_MAX_RETRIES: u16 = 1; -#[from_env] -pub const DEFAULT_TX_BATCH_SIZE: usize = 100; #[from_env] pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000; @@ -27,6 +23,7 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000; /// 25 slots in 10s send to little more leaders #[from_env] pub const DEFAULT_FANOUT_SIZE: u64 = 100; + #[from_env] pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus = diff --git a/src/workers/block_listenser.rs b/src/workers/block_listenser.rs index 4d7fdea5..041ef80d 100644 --- a/src/workers/block_listenser.rs +++ b/src/workers/block_listenser.rs @@ -313,7 +313,7 @@ impl BlockListener { None }); - if let Some((units, additional_fee)) = legacy_compute_budget { + if let Some((units, additional_fee)) = legacy_compute_budget { cu_requested = Some(units); if additional_fee > 0 { cu_price = Some((units * 1000) / additional_fee) diff --git a/src/workers/mod.rs b/src/workers/mod.rs index cdad24ab..f250f269 100644 --- a/src/workers/mod.rs +++ b/src/workers/mod.rs @@ -4,6 +4,7 @@ mod metrics_capture; mod postgres; mod prometheus_sync; pub mod tpu_utils; +mod transaction_replayer; mod tx_sender; pub use block_listenser::*; @@ -11,4 +12,5 @@ pub use cleaner::*; pub use metrics_capture::*; pub use postgres::*; pub use prometheus_sync::*; +pub use transaction_replayer::*; pub use tx_sender::*; diff --git a/src/workers/postgres.rs b/src/workers/postgres.rs index 2164d40b..844ad394 100644 --- a/src/workers/postgres.rs +++ b/src/workers/postgres.rs @@ -70,7 +70,7 @@ pub struct PostgresUpdateTx { pub processed_slot: i64, // 8 bytes pub cu_consumed: Option, pub cu_requested: Option, - pub cu_price: Option + pub cu_price: Option, } impl SchemaSize for PostgresUpdateTx { @@ -315,7 +315,7 @@ impl PostgresSession { processed_slot, cu_consumed, cu_requested, - cu_price + cu_price, } = tx; args.push(signature); diff --git a/src/workers/tpu_utils/tpu_connection_manager.rs b/src/workers/tpu_utils/tpu_connection_manager.rs index 80f05a3f..034f330d 100644 --- a/src/workers/tpu_utils/tpu_connection_manager.rs +++ b/src/workers/tpu_utils/tpu_connection_manager.rs @@ -205,6 +205,10 @@ impl ActiveConnection { last_stable_id: Arc, ) { for _ in 0..3 { + if exit_signal.load(Ordering::Relaxed) { + // return + return; + } // get new connection reset if necessary let conn = { let last_stable_id = last_stable_id.load(Ordering::Relaxed) as usize; diff --git a/src/workers/transaction_replayer.rs b/src/workers/transaction_replayer.rs new file mode 100644 index 00000000..aecfb6bf --- /dev/null +++ b/src/workers/transaction_replayer.rs @@ -0,0 +1,76 @@ +use super::TxSender; +use log::error; +use std::time::Duration; +use tokio::{ + sync::mpsc::{UnboundedReceiver, UnboundedSender}, + task::JoinHandle, + time::Instant, +}; + +pub const RETRY_AFTER: Duration = Duration::from_secs(4); + +#[derive(Debug, Clone)] +pub struct TransactionReplay { + pub signature: String, + pub tx: Vec, + pub replay_count: usize, + pub max_replay: usize, + pub replay_at: Instant, +} + +#[derive(Clone)] +pub struct TransactionReplayer { + pub tx_sender: TxSender, +} + +impl TransactionReplayer { + pub fn new(tx_sender: TxSender) -> Self { + Self { tx_sender } + } + + pub fn start_service( + &self, + sender: UnboundedSender, + reciever: UnboundedReceiver, + ) -> JoinHandle> { + let tx_sender = self.tx_sender.clone(); + tokio::spawn(async move { + let mut reciever = reciever; + loop { + let tx = reciever.recv().await; + match tx { + Some(mut tx_replay) => { + if Instant::now() < tx_replay.replay_at { + tokio::time::sleep_until(tx_replay.replay_at).await; + } + if let Some(tx) = tx_sender.txs_sent_store.get(&tx_replay.signature) { + if tx.status.is_some() { + // transaction has been confirmed / no retry needed + continue; + } + } else { + // transaction timed out + continue; + } + // ignore reset error + let _ = tx_sender.tpu_service.send_transaction(tx_replay.tx.clone()); + + if tx_replay.replay_count < tx_replay.max_replay { + tx_replay.replay_count += 1; + tx_replay.replay_at = Instant::now() + RETRY_AFTER; + if let Err(e) = sender.send(tx_replay) { + error!("error while scheduling replay ({})", e); + continue; + } + } + } + None => { + error!("transaction replay channel broken"); + break; + } + } + } + Ok(()) + }) + } +}