Merge pull request #124 from blockworks-foundation/adding_tx_replayer

Adding transaction replay service
This commit is contained in:
galactus 2023-04-18 14:52:57 +02:00 committed by GitHub
commit a80b84762b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 153 additions and 13 deletions

View File

@ -5,7 +5,7 @@ use crate::{
rpc::LiteRpcServer, rpc::LiteRpcServer,
workers::{ workers::{
tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres, tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres,
PrometheusSync, TxSender, WireTransaction, PrometheusSync, TransactionReplay, TransactionReplayer, TxSender, WireTransaction,
}, },
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
}; };
@ -31,8 +31,9 @@ use solana_sdk::{
use solana_transaction_status::TransactionStatus; use solana_transaction_status::TransactionStatus;
use tokio::{ use tokio::{
net::ToSocketAddrs, net::ToSocketAddrs,
sync::mpsc::{self, Sender}, sync::mpsc::{self, Sender, UnboundedSender},
task::JoinHandle, task::JoinHandle,
time::Instant,
}; };
lazy_static::lazy_static! { lazy_static::lazy_static! {
@ -62,6 +63,10 @@ pub struct LiteBridge {
pub tx_sender: TxSender, pub tx_sender: TxSender,
pub block_listner: BlockListener, pub block_listner: BlockListener,
pub block_store: BlockStore, pub block_store: BlockStore,
pub tx_replayer: TransactionReplayer,
pub tx_replay_sender: Option<UnboundedSender<TransactionReplay>>,
pub max_retries: usize,
} }
impl LiteBridge { impl LiteBridge {
@ -70,6 +75,8 @@ impl LiteBridge {
ws_addr: String, ws_addr: String,
fanout_slots: u64, fanout_slots: u64,
identity: Keypair, identity: Keypair,
retry_after: Duration,
max_retries: usize,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone())); let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
let current_slot = rpc_client.get_slot().await?; let current_slot = rpc_client.get_slot().await?;
@ -91,6 +98,7 @@ impl LiteBridge {
let block_listner = let block_listner =
BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store.clone()); BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store.clone());
let tx_replayer = TransactionReplayer::new(tx_sender.clone(), retry_after);
Ok(Self { Ok(Self {
rpc_client, rpc_client,
tpu_service, tpu_service,
@ -98,6 +106,9 @@ impl LiteBridge {
tx_sender, tx_sender,
block_listner, block_listner,
block_store, block_store,
tx_replayer,
tx_replay_sender: None,
max_retries,
}) })
} }
@ -131,6 +142,12 @@ impl LiteBridge {
.clone() .clone()
.execute(tx_recv, postgres_send.clone()); .execute(tx_recv, postgres_send.clone());
let (replay_sender, replay_reciever) = tokio::sync::mpsc::unbounded_channel();
let replay_service = self
.tx_replayer
.start_service(replay_sender.clone(), replay_reciever);
self.tx_replay_sender = Some(replay_sender);
let metrics_capture = MetricsCapture::new(self.tx_sender.clone()).capture(); let metrics_capture = MetricsCapture::new(self.tx_sender.clone()).capture();
let prometheus_sync = PrometheusSync.sync(prometheus_addr); let prometheus_sync = PrometheusSync.sync(prometheus_addr);
@ -193,6 +210,7 @@ impl LiteBridge {
metrics_capture, metrics_capture,
prometheus_sync, prometheus_sync,
cleaner, cleaner,
replay_service,
]; ];
services.append(&mut tpu_services); services.append(&mut tpu_services);
@ -216,7 +234,7 @@ impl LiteRpcServer for LiteBridge {
let SendTransactionConfig { let SendTransactionConfig {
encoding, encoding,
max_retries: _, max_retries,
} = send_transaction_config.unwrap_or_default(); } = send_transaction_config.unwrap_or_default();
let raw_tx = match encoding.decode(tx) { let raw_tx = match encoding.decode(tx) {
@ -242,6 +260,7 @@ impl LiteRpcServer for LiteBridge {
return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string())); return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string()));
}; };
let raw_tx_clone = raw_tx.clone();
if let Err(e) = self if let Err(e) = self
.tx_send_channel .tx_send_channel
.as_ref() .as_ref()
@ -254,6 +273,19 @@ impl LiteRpcServer for LiteBridge {
e e
); );
} }
if let Some(tx_replay_sender) = &self.tx_replay_sender {
let max_replay = max_retries.map_or(self.max_retries, |x| x as usize);
let replay_at = Instant::now() + self.tx_replayer.retry_after;
// ignore error for replay service
let _ = tx_replay_sender.send(TransactionReplay {
signature: sig.to_string(),
tx: raw_tx_clone,
replay_count: 0,
max_replay,
replay_at,
});
}
TXS_IN_CHANNEL.inc(); TXS_IN_CHANNEL.inc();
Ok(BinaryEncoding::Base58.encode(sig)) Ok(BinaryEncoding::Base58.encode(sig))

View File

@ -1,4 +1,7 @@
use crate::{DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR}; use crate::{
DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RETRY_TIMEOUT, DEFAULT_RPC_ADDR,
DEFAULT_WS_ADDR, MAX_RETRIES,
};
use clap::Parser; use clap::Parser;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@ -26,4 +29,8 @@ pub struct Args {
pub prometheus_addr: String, pub prometheus_addr: String,
#[arg(short = 'k', long, default_value_t = String::new())] #[arg(short = 'k', long, default_value_t = String::new())]
pub identity_keypair: String, pub identity_keypair: String,
#[arg(long, default_value_t = MAX_RETRIES)]
pub maximum_retries_per_tx: usize,
#[arg(long, default_value_t = DEFAULT_RETRY_TIMEOUT)]
pub transaction_retry_after_secs: u64,
} }

View File

@ -16,10 +16,6 @@ pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899";
pub const DEFAULT_LITE_RPC_ADDR: &str = "http://0.0.0.0:8890"; pub const DEFAULT_LITE_RPC_ADDR: &str = "http://0.0.0.0:8890";
#[from_env] #[from_env]
pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900"; pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900";
#[from_env]
pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
#[from_env]
pub const DEFAULT_TX_BATCH_SIZE: usize = 100;
#[from_env] #[from_env]
pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000; pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000;
@ -27,6 +23,12 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000;
/// 25 slots in 10s send to little more leaders /// 25 slots in 10s send to little more leaders
#[from_env] #[from_env]
pub const DEFAULT_FANOUT_SIZE: u64 = 100; pub const DEFAULT_FANOUT_SIZE: u64 = 100;
#[from_env]
pub const MAX_RETRIES: usize = 10;
pub const DEFAULT_RETRY_TIMEOUT: u64 = 4;
#[from_env] #[from_env]
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus = pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =

View File

@ -45,6 +45,8 @@ pub async fn main() -> anyhow::Result<()> {
enable_postgres, enable_postgres,
prometheus_addr, prometheus_addr,
identity_keypair, identity_keypair,
maximum_retries_per_tx,
transaction_retry_after_secs,
} = Args::parse(); } = Args::parse();
dotenv().ok(); dotenv().ok();
@ -53,7 +55,16 @@ pub async fn main() -> anyhow::Result<()> {
let clean_interval_ms = Duration::from_millis(clean_interval_ms); let clean_interval_ms = Duration::from_millis(clean_interval_ms);
let light_bridge = LiteBridge::new(rpc_addr, ws_addr, fanout_size, identity).await?; let retry_after = Duration::from_secs(transaction_retry_after_secs);
let light_bridge = LiteBridge::new(
rpc_addr,
ws_addr,
fanout_size,
identity,
retry_after,
maximum_retries_per_tx,
)
.await?;
let services = light_bridge let services = light_bridge
.start_services( .start_services(

View File

@ -313,7 +313,7 @@ impl BlockListener {
None None
}); });
if let Some((units, additional_fee)) = legacy_compute_budget { if let Some((units, additional_fee)) = legacy_compute_budget {
cu_requested = Some(units); cu_requested = Some(units);
if additional_fee > 0 { if additional_fee > 0 {
cu_price = Some((units * 1000) / additional_fee) cu_price = Some((units * 1000) / additional_fee)

View File

@ -4,6 +4,7 @@ mod metrics_capture;
mod postgres; mod postgres;
mod prometheus_sync; mod prometheus_sync;
pub mod tpu_utils; pub mod tpu_utils;
mod transaction_replayer;
mod tx_sender; mod tx_sender;
pub use block_listenser::*; pub use block_listenser::*;
@ -11,4 +12,5 @@ pub use cleaner::*;
pub use metrics_capture::*; pub use metrics_capture::*;
pub use postgres::*; pub use postgres::*;
pub use prometheus_sync::*; pub use prometheus_sync::*;
pub use transaction_replayer::*;
pub use tx_sender::*; pub use tx_sender::*;

View File

@ -70,7 +70,7 @@ pub struct PostgresUpdateTx {
pub processed_slot: i64, // 8 bytes pub processed_slot: i64, // 8 bytes
pub cu_consumed: Option<i64>, pub cu_consumed: Option<i64>,
pub cu_requested: Option<i64>, pub cu_requested: Option<i64>,
pub cu_price: Option<i64> pub cu_price: Option<i64>,
} }
impl SchemaSize for PostgresUpdateTx { impl SchemaSize for PostgresUpdateTx {
@ -315,7 +315,7 @@ impl PostgresSession {
processed_slot, processed_slot,
cu_consumed, cu_consumed,
cu_requested, cu_requested,
cu_price cu_price,
} = tx; } = tx;
args.push(signature); args.push(signature);

View File

@ -205,6 +205,10 @@ impl ActiveConnection {
last_stable_id: Arc<AtomicU64>, last_stable_id: Arc<AtomicU64>,
) { ) {
for _ in 0..3 { for _ in 0..3 {
if exit_signal.load(Ordering::Relaxed) {
// return
return;
}
// get new connection reset if necessary // get new connection reset if necessary
let conn = { let conn = {
let last_stable_id = last_stable_id.load(Ordering::Relaxed) as usize; let last_stable_id = last_stable_id.load(Ordering::Relaxed) as usize;
@ -218,6 +222,8 @@ impl ActiveConnection {
if conn.stable_id() != current_stable_id { if conn.stable_id() != current_stable_id {
conn.clone() conn.clone()
} else { } else {
NB_QUIC_CONNECTIONS.dec();
let new_conn = Self::connect( let new_conn = Self::connect(
identity, identity,
true, true,
@ -398,6 +404,7 @@ impl ActiveConnection {
}; };
} }
drop(transaction_reciever); drop(transaction_reciever);
NB_QUIC_CONNECTIONS.dec();
NB_QUIC_ACTIVE_CONNECTIONS.dec(); NB_QUIC_ACTIVE_CONNECTIONS.dec();
} }

View File

@ -0,0 +1,79 @@
use super::TxSender;
use log::error;
use std::time::Duration;
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
task::JoinHandle,
time::Instant,
};
/// A single queued request to resend a transaction through the replay service.
#[derive(Debug, Clone)]
pub struct TransactionReplay {
    /// Signature of the transaction; used as the lookup key in the sender's
    /// `txs_sent_store` to decide whether a replay is still needed.
    pub signature: String,
    /// Transaction bytes handed to the TPU service on each replay.
    pub tx: Vec<u8>,
    /// Counter incremented by the replay service each time it reschedules this entry.
    pub replay_count: usize,
    /// Upper bound that `replay_count` is compared against when deciding whether
    /// to reschedule this entry.
    pub max_replay: usize,
    /// Instant before which the replay service will not resend this transaction.
    pub replay_at: Instant,
}
/// Service handle that resends transactions which have no status yet.
#[derive(Clone)]
pub struct TransactionReplayer {
    /// Sender whose `txs_sent_store` is consulted for the transaction status and
    /// whose `tpu_service` performs the resend.
    pub tx_sender: TxSender,
    /// Delay added to the current time to compute the next `replay_at` of a
    /// rescheduled transaction.
    pub retry_after: Duration,
}
impl TransactionReplayer {
    /// Builds a replayer that resends through `tx_sender` and spaces successive
    /// replays of the same transaction by `retry_after`.
    pub fn new(tx_sender: TxSender, retry_after: Duration) -> Self {
        Self {
            tx_sender,
            retry_after,
        }
    }

    /// Spawns the replay loop on the tokio runtime and returns its join handle.
    ///
    /// The task pulls [`TransactionReplay`] entries from `reciever`, waits until
    /// each entry's `replay_at`, and resends the transaction through the TPU
    /// service as long as the transaction is still present in `txs_sent_store`
    /// without a status. Entries that still have retry budget left are pushed
    /// back through `sender` with an updated `replay_at`.
    ///
    /// A transaction is resent at most `max_replay` times; `max_replay == 0`
    /// disables replay for that transaction entirely.
    ///
    /// The task returns `Ok(())` once `reciever` yields `None`.
    pub fn start_service(
        &self,
        sender: UnboundedSender<TransactionReplay>,
        reciever: UnboundedReceiver<TransactionReplay>,
    ) -> JoinHandle<anyhow::Result<()>> {
        let tx_sender = self.tx_sender.clone();
        let retry_after = self.retry_after;
        tokio::spawn(async move {
            let mut reciever = reciever;
            // NOTE(review): this task owns a clone of the sender, so the channel
            // presumably cannot close while the task is alive - confirm whether a
            // dedicated shutdown path is intended.
            while let Some(mut tx_replay) = reciever.recv().await {
                // Retry budget spent (or retries disabled with max_replay == 0):
                // drop the entry before sleeping so it does not delay the queue.
                if tx_replay.replay_count >= tx_replay.max_replay {
                    continue;
                }

                if Instant::now() < tx_replay.replay_at {
                    tokio::time::sleep_until(tx_replay.replay_at).await;
                }

                // Replay only while the transaction is still tracked and has no
                // status: a missing entry means it timed out, a present status
                // means it has been confirmed. The lookup guard is released
                // before anything is sent.
                let needs_replay = tx_sender
                    .txs_sent_store
                    .get(&tx_replay.signature)
                    .map_or(false, |tx| tx.status.is_none());
                if !needs_replay {
                    continue;
                }

                // ignore reset error
                let _ = tx_sender.tpu_service.send_transaction(tx_replay.tx.clone());
                tx_replay.replay_count += 1;

                // Reschedule only if another replay is still allowed.
                if tx_replay.replay_count < tx_replay.max_replay {
                    tx_replay.replay_at = Instant::now() + retry_after;
                    if let Err(e) = sender.send(tx_replay) {
                        error!("error while scheduling replay ({})", e);
                    }
                }
            }
            error!("transaction replay channel broken");
            Ok(())
        })
    }
}

View File

@ -170,11 +170,11 @@ impl TxSender {
{ {
Ok(value) => match value { Ok(value) => match value {
Some((sig, tx, slot)) => { Some((sig, tx, slot)) => {
TXS_IN_CHANNEL.dec();
if self.txs_sent_store.contains_key(&sig) { if self.txs_sent_store.contains_key(&sig) {
// duplicate transaction // duplicate transaction
continue; continue;
} }
TXS_IN_CHANNEL.dec();
sigs_and_slots.push((sig, slot)); sigs_and_slots.push((sig, slot));
txs.push(tx); txs.push(tx);
// update the timeout inteval // update the timeout inteval