making the parameter as arguments
This commit is contained in:
parent
d3db9dbb84
commit
dde14ab16e
|
@ -6,7 +6,6 @@ use crate::{
|
||||||
workers::{
|
workers::{
|
||||||
tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres,
|
tpu_utils::tpu_service::TpuService, BlockListener, Cleaner, MetricsCapture, Postgres,
|
||||||
PrometheusSync, TransactionReplay, TransactionReplayer, TxSender, WireTransaction,
|
PrometheusSync, TransactionReplay, TransactionReplayer, TxSender, WireTransaction,
|
||||||
RETRY_AFTER,
|
|
||||||
},
|
},
|
||||||
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
|
DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE,
|
||||||
};
|
};
|
||||||
|
@ -67,6 +66,7 @@ pub struct LiteBridge {
|
||||||
|
|
||||||
pub tx_replayer: TransactionReplayer,
|
pub tx_replayer: TransactionReplayer,
|
||||||
pub tx_replay_sender: Option<UnboundedSender<TransactionReplay>>,
|
pub tx_replay_sender: Option<UnboundedSender<TransactionReplay>>,
|
||||||
|
pub max_retries: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LiteBridge {
|
impl LiteBridge {
|
||||||
|
@ -75,6 +75,8 @@ impl LiteBridge {
|
||||||
ws_addr: String,
|
ws_addr: String,
|
||||||
fanout_slots: u64,
|
fanout_slots: u64,
|
||||||
identity: Keypair,
|
identity: Keypair,
|
||||||
|
retry_after: Duration,
|
||||||
|
max_retries: usize,
|
||||||
) -> anyhow::Result<Self> {
|
) -> anyhow::Result<Self> {
|
||||||
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
|
let rpc_client = Arc::new(RpcClient::new(rpc_url.clone()));
|
||||||
let current_slot = rpc_client.get_slot().await?;
|
let current_slot = rpc_client.get_slot().await?;
|
||||||
|
@ -96,7 +98,7 @@ impl LiteBridge {
|
||||||
let block_listner =
|
let block_listner =
|
||||||
BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store.clone());
|
BlockListener::new(rpc_client.clone(), tx_sender.clone(), block_store.clone());
|
||||||
|
|
||||||
let tx_replayer = TransactionReplayer::new(tx_sender.clone());
|
let tx_replayer = TransactionReplayer::new(tx_sender.clone(), retry_after);
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
rpc_client,
|
rpc_client,
|
||||||
tpu_service,
|
tpu_service,
|
||||||
|
@ -106,6 +108,7 @@ impl LiteBridge {
|
||||||
block_store,
|
block_store,
|
||||||
tx_replayer,
|
tx_replayer,
|
||||||
tx_replay_sender: None,
|
tx_replay_sender: None,
|
||||||
|
max_retries,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -272,8 +275,8 @@ impl LiteRpcServer for LiteBridge {
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(tx_replay_sender) = &self.tx_replay_sender {
|
if let Some(tx_replay_sender) = &self.tx_replay_sender {
|
||||||
let max_replay = max_retries.map_or(10, |x| x) as usize;
|
let max_replay = max_retries.map_or(self.max_retries, |x| x as usize);
|
||||||
let replay_at = Instant::now() + RETRY_AFTER;
|
let replay_at = Instant::now() + self.tx_replayer.retry_after;
|
||||||
// ignore error for replay service
|
// ignore error for replay service
|
||||||
let _ = tx_replay_sender.send(TransactionReplay {
|
let _ = tx_replay_sender.send(TransactionReplay {
|
||||||
signature: sig.to_string(),
|
signature: sig.to_string(),
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
use crate::{DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RPC_ADDR, DEFAULT_WS_ADDR};
|
use crate::{
|
||||||
|
DEFAULT_CLEAN_INTERVAL_MS, DEFAULT_FANOUT_SIZE, DEFAULT_RETRY_TIMEOUT, DEFAULT_RPC_ADDR,
|
||||||
|
DEFAULT_WS_ADDR, MAX_RETRIES,
|
||||||
|
};
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
|
@ -26,4 +29,8 @@ pub struct Args {
|
||||||
pub prometheus_addr: String,
|
pub prometheus_addr: String,
|
||||||
#[arg(short = 'k', long, default_value_t = String::new())]
|
#[arg(short = 'k', long, default_value_t = String::new())]
|
||||||
pub identity_keypair: String,
|
pub identity_keypair: String,
|
||||||
|
#[arg(long, default_value_t = MAX_RETRIES)]
|
||||||
|
pub maximum_retries_per_tx: usize,
|
||||||
|
#[arg(long, default_value_t = DEFAULT_RETRY_TIMEOUT)]
|
||||||
|
pub transaction_retry_after_secs: u64,
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,11 @@ pub const DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE: usize = 40_000;
|
||||||
#[from_env]
|
#[from_env]
|
||||||
pub const DEFAULT_FANOUT_SIZE: u64 = 100;
|
pub const DEFAULT_FANOUT_SIZE: u64 = 100;
|
||||||
|
|
||||||
|
#[from_env]
|
||||||
|
pub const MAX_RETRIES: usize = 10;
|
||||||
|
|
||||||
|
pub const DEFAULT_RETRY_TIMEOUT: u64 = 4;
|
||||||
|
|
||||||
#[from_env]
|
#[from_env]
|
||||||
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
|
pub const DEFAULT_CLEAN_INTERVAL_MS: u64 = 5 * 60 * 1000; // five minute
|
||||||
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
|
pub const DEFAULT_TRANSACTION_CONFIRMATION_STATUS: TransactionConfirmationStatus =
|
||||||
|
|
13
src/main.rs
13
src/main.rs
|
@ -45,6 +45,8 @@ pub async fn main() -> anyhow::Result<()> {
|
||||||
enable_postgres,
|
enable_postgres,
|
||||||
prometheus_addr,
|
prometheus_addr,
|
||||||
identity_keypair,
|
identity_keypair,
|
||||||
|
maximum_retries_per_tx,
|
||||||
|
transaction_retry_after_secs,
|
||||||
} = Args::parse();
|
} = Args::parse();
|
||||||
|
|
||||||
dotenv().ok();
|
dotenv().ok();
|
||||||
|
@ -53,7 +55,16 @@ pub async fn main() -> anyhow::Result<()> {
|
||||||
|
|
||||||
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
|
let clean_interval_ms = Duration::from_millis(clean_interval_ms);
|
||||||
|
|
||||||
let light_bridge = LiteBridge::new(rpc_addr, ws_addr, fanout_size, identity).await?;
|
let retry_after = Duration::from_secs(transaction_retry_after_secs);
|
||||||
|
let light_bridge = LiteBridge::new(
|
||||||
|
rpc_addr,
|
||||||
|
ws_addr,
|
||||||
|
fanout_size,
|
||||||
|
identity,
|
||||||
|
retry_after,
|
||||||
|
maximum_retries_per_tx,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
let services = light_bridge
|
let services = light_bridge
|
||||||
.start_services(
|
.start_services(
|
||||||
|
|
|
@ -7,8 +7,6 @@ use tokio::{
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub const RETRY_AFTER: Duration = Duration::from_secs(4);
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct TransactionReplay {
|
pub struct TransactionReplay {
|
||||||
pub signature: String,
|
pub signature: String,
|
||||||
|
@ -21,11 +19,15 @@ pub struct TransactionReplay {
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct TransactionReplayer {
|
pub struct TransactionReplayer {
|
||||||
pub tx_sender: TxSender,
|
pub tx_sender: TxSender,
|
||||||
|
pub retry_after: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TransactionReplayer {
|
impl TransactionReplayer {
|
||||||
pub fn new(tx_sender: TxSender) -> Self {
|
pub fn new(tx_sender: TxSender, retry_after: Duration) -> Self {
|
||||||
Self { tx_sender }
|
Self {
|
||||||
|
tx_sender,
|
||||||
|
retry_after,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start_service(
|
pub fn start_service(
|
||||||
|
@ -34,6 +36,7 @@ impl TransactionReplayer {
|
||||||
reciever: UnboundedReceiver<TransactionReplay>,
|
reciever: UnboundedReceiver<TransactionReplay>,
|
||||||
) -> JoinHandle<anyhow::Result<()>> {
|
) -> JoinHandle<anyhow::Result<()>> {
|
||||||
let tx_sender = self.tx_sender.clone();
|
let tx_sender = self.tx_sender.clone();
|
||||||
|
let retry_after = self.retry_after;
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut reciever = reciever;
|
let mut reciever = reciever;
|
||||||
loop {
|
loop {
|
||||||
|
@ -57,7 +60,7 @@ impl TransactionReplayer {
|
||||||
|
|
||||||
if tx_replay.replay_count < tx_replay.max_replay {
|
if tx_replay.replay_count < tx_replay.max_replay {
|
||||||
tx_replay.replay_count += 1;
|
tx_replay.replay_count += 1;
|
||||||
tx_replay.replay_at = Instant::now() + RETRY_AFTER;
|
tx_replay.replay_at = Instant::now() + retry_after;
|
||||||
if let Err(e) = sender.send(tx_replay) {
|
if let Err(e) = sender.send(tx_replay) {
|
||||||
error!("error while scheduling replay ({})", e);
|
error!("error while scheduling replay ({})", e);
|
||||||
continue;
|
continue;
|
||||||
|
|
Loading…
Reference in New Issue