adding transaction replay service
This commit is contained in:
parent
35d436aa5a
commit
d261c80574
|
@ -5,7 +5,8 @@ use crate::{
|
|||
rpc::LiteRpcServer,
|
||||
workers::{
|
||||
tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres,
|
||||
PrometheusSync, TxSender, WireTransaction,
|
||||
PrometheusSync, TransactionReplay, TransactionReplayer, TxSender, WireTransaction,
|
||||
RETRY_AFTER,
|
||||
},
|
||||
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
|
||||
};
|
||||
|
@ -31,8 +32,9 @@ use solana_sdk::{
|
|||
use solana_transaction_status::TransactionStatus;
|
||||
use tokio::{
|
||||
net::ToSocketAddrs,
|
||||
sync::mpsc::{self, Sender},
|
||||
sync::mpsc::{self, Sender, UnboundedSender},
|
||||
task::JoinHandle,
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
|
@ -62,6 +64,9 @@ pub struct LiteBridge {
|
|||
pub tx_sender: TxSender,
|
||||
pub block_listner: BlockListener,
|
||||
pub block_store: BlockStore,
|
||||
|
||||
pub tx_replayer: TransactionReplayer,
|
||||
pub tx_replay_sender: Option<UnboundedSender<TransactionReplay>>,
|
||||
}
|
||||
|
||||
impl LiteBridge {
|
||||
|
@ -91,6 +96,7 @@ impl LiteBridge {
|
|||
let block_listner =
|
||||
BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store.clone());
|
||||
|
||||
let tx_replayer = TransactionReplayer::new(tx_sender.clone());
|
||||
Ok(Self {
|
||||
rpc_client,
|
||||
tpu_service,
|
||||
|
@ -98,6 +104,8 @@ impl LiteBridge {
|
|||
tx_sender,
|
||||
block_listner,
|
||||
block_store,
|
||||
tx_replayer,
|
||||
tx_replay_sender: None,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -131,6 +139,12 @@ impl LiteBridge {
|
|||
.clone()
|
||||
.execute(tx_recv, postgres_send.clone());
|
||||
|
||||
let (replay_sender, replay_reciever) = tokio::sync::mpsc::unbounded_channel();
|
||||
let replay_service = self
|
||||
.tx_replayer
|
||||
.start_service(replay_sender.clone(), replay_reciever);
|
||||
self.tx_replay_sender = Some(replay_sender);
|
||||
|
||||
let metrics_capture = MetricsCapture::new(self.tx_sender.clone()).capture();
|
||||
let prometheus_sync = PrometheusSync.sync(prometheus_addr);
|
||||
|
||||
|
@ -193,6 +207,7 @@ impl LiteBridge {
|
|||
metrics_capture,
|
||||
prometheus_sync,
|
||||
cleaner,
|
||||
replay_service,
|
||||
];
|
||||
|
||||
services.append(&mut tpu_services);
|
||||
|
@ -216,7 +231,7 @@ impl LiteRpcServer for LiteBridge {
|
|||
|
||||
let SendTransactionConfig {
|
||||
encoding,
|
||||
max_retries: _,
|
||||
max_retries,
|
||||
} = send_transaction_config.unwrap_or_default();
|
||||
|
||||
let raw_tx = match encoding.decode(tx) {
|
||||
|
@ -242,6 +257,7 @@ impl LiteRpcServer for LiteBridge {
|
|||
return Err(jsonrpsee::core::Error::Custom("Blockhash not found in block store".to_string()));
|
||||
};
|
||||
|
||||
let raw_tx_clone = raw_tx.clone();
|
||||
if let Err(e) = self
|
||||
.tx_send_channel
|
||||
.as_ref()
|
||||
|
@ -254,6 +270,19 @@ impl LiteRpcServer for LiteBridge {
|
|||
e
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(tx_replay_sender) = &self.tx_replay_sender {
|
||||
let max_replay = max_retries.map_or(10, |x| x) as usize;
|
||||
let replay_at = Instant::now() + RETRY_AFTER;
|
||||
// ignore error for replay service
|
||||
let _ = tx_replay_sender.send(TransactionReplay {
|
||||
signature: sig.to_string(),
|
||||
tx: raw_tx_clone,
|
||||
replay_count: 0,
|
||||
max_replay,
|
||||
replay_at,
|
||||
});
|
||||
}
|
||||
TXS_IN_CHANNEL.inc();
|
||||
|
||||
Ok(BinaryEncoding::Base58.encode(sig))
|
||||
|
|
|
@ -16,10 +16,6 @@ pub const DEFAULT_RPC_ADDR: &str = "http://0.0.0.0:8899";
|
|||
pub const DEFAULT_LITE_RPC_ADDR: &str = "http://0.0.0.0:8890";
|
||||
#[from_env]
|
||||
pub const DEFAULT_WS_ADDR: &str = "ws://0.0.0.0:8900";
|
||||
#[from_env]
|
||||
pub const DEFAULT_TX_MAX_RETRIES: u16 = 1;
|
||||
#[from_env]
|
||||
pub const DEFAULT_TX_BATCH_SIZE: usize = 100;
|
||||
|
||||
#[from_env]
|
||||
pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000;
|
||||
|
@ -27,6 +23,7 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000;
|
|||
/// 25 slots in 10s send to little more leaders
|
||||
#[from_env]
|
||||
pub const DEFAULT_FANOUT_SIZE: u64 = 100;
|
||||
|
||||
#[from_env]
|
||||
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
|
||||
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
|
||||
|
|
|
@ -4,6 +4,7 @@ mod metrics_capture;
|
|||
mod postgres;
|
||||
mod prometheus_sync;
|
||||
pub mod tpu_utils;
|
||||
mod transaction_replayer;
|
||||
mod tx_sender;
|
||||
|
||||
pub use block_listenser::*;
|
||||
|
@ -11,4 +12,5 @@ pub use cleaner::*;
|
|||
pub use metrics_capture::*;
|
||||
pub use postgres::*;
|
||||
pub use prometheus_sync::*;
|
||||
pub use transaction_replayer::*;
|
||||
pub use tx_sender::*;
|
||||
|
|
|
@ -70,7 +70,7 @@ pub struct PostgresUpdateTx {
|
|||
pub processed_slot: i64, // 8 bytes
|
||||
pub cu_consumed: Option<i64>,
|
||||
pub cu_requested: Option<i64>,
|
||||
pub cu_price: Option<i64>
|
||||
pub cu_price: Option<i64>,
|
||||
}
|
||||
|
||||
impl SchemaSize for PostgresUpdateTx {
|
||||
|
@ -315,7 +315,7 @@ impl PostgresSession {
|
|||
processed_slot,
|
||||
cu_consumed,
|
||||
cu_requested,
|
||||
cu_price
|
||||
cu_price,
|
||||
} = tx;
|
||||
|
||||
args.push(signature);
|
||||
|
|
|
@ -205,6 +205,10 @@ impl ActiveConnection {
|
|||
last_stable_id: Arc<AtomicU64>,
|
||||
) {
|
||||
for _ in 0..3 {
|
||||
if exit_signal.load(Ordering::Relaxed) {
|
||||
// return
|
||||
return;
|
||||
}
|
||||
// get new connection reset if necessary
|
||||
let conn = {
|
||||
let last_stable_id = last_stable_id.load(Ordering::Relaxed) as usize;
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
use super::TxSender;
|
||||
use log::error;
|
||||
use std::time::Duration;
|
||||
use tokio::{
|
||||
sync::mpsc::{UnboundedReceiver, UnboundedSender},
|
||||
task::JoinHandle,
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
pub const RETRY_AFTER: Duration = Duration::from_secs(4);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TransactionReplay {
|
||||
pub signature: String,
|
||||
pub tx: Vec<u8>,
|
||||
pub replay_count: usize,
|
||||
pub max_replay: usize,
|
||||
pub replay_at: Instant,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TransactionReplayer {
|
||||
pub tx_sender: TxSender,
|
||||
}
|
||||
|
||||
impl TransactionReplayer {
|
||||
pub fn new(tx_sender: TxSender) -> Self {
|
||||
Self { tx_sender }
|
||||
}
|
||||
|
||||
pub fn start_service(
|
||||
&self,
|
||||
sender: UnboundedSender<TransactionReplay>,
|
||||
reciever: UnboundedReceiver<TransactionReplay>,
|
||||
) -> JoinHandle<anyhow::Result<()>> {
|
||||
let tx_sender = self.tx_sender.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut reciever = reciever;
|
||||
loop {
|
||||
let tx = reciever.recv().await;
|
||||
match tx {
|
||||
Some(mut tx_replay) => {
|
||||
if Instant::now() < tx_replay.replay_at {
|
||||
tokio::time::sleep_until(tx_replay.replay_at).await;
|
||||
}
|
||||
if let Some(tx) = tx_sender.txs_sent_store.get(&tx_replay.signature) {
|
||||
if tx.status.is_some() {
|
||||
// transaction has been confirmed / no retry needed
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
// transaction timed out
|
||||
continue;
|
||||
}
|
||||
// ignore reset error
|
||||
let _ = tx_sender.tpu_service.send_transaction(tx_replay.tx.clone());
|
||||
|
||||
if tx_replay.replay_count < tx_replay.max_replay {
|
||||
tx_replay.replay_count += 1;
|
||||
tx_replay.replay_at = Instant::now() + RETRY_AFTER;
|
||||
if let Err(e) = sender.send(tx_replay) {
|
||||
error!("error while scheduling replay ({})", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
error!("transaction replay channel broken");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue