Changes after groovies review
This commit is contained in:
parent
8c795345dd
commit
5f48f21cd9
|
@ -26,22 +26,22 @@ pub struct TransactionReplay {
|
|||
/// Transaction Replayer
|
||||
/// It will replay transaction sent to the cluster if they are not confirmed
|
||||
/// They will be replayed max_replay times
|
||||
/// The replay time will be exponentialy increasing by after count * replay after
|
||||
/// The replay time will be linearly increasing by after count * replay after
|
||||
/// So the transasctions will be replayed like retry_after, retry_after*2, retry_after*3 ...
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TransactionReplayer {
|
||||
pub tpu_service: TpuService,
|
||||
pub tx_store: TxStore,
|
||||
pub retry_after: Duration,
|
||||
pub retry_offset: Duration,
|
||||
}
|
||||
|
||||
impl TransactionReplayer {
|
||||
pub fn new(tpu_service: TpuService, tx_store: TxStore, retry_after: Duration) -> Self {
|
||||
pub fn new(tpu_service: TpuService, tx_store: TxStore, retry_offset: Duration) -> Self {
|
||||
Self {
|
||||
tpu_service,
|
||||
tx_store,
|
||||
retry_after,
|
||||
retry_offset,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -52,14 +52,14 @@ impl TransactionReplayer {
|
|||
) -> AnyhowJoinHandle {
|
||||
let tpu_service = self.tpu_service.clone();
|
||||
let tx_store = self.tx_store.clone();
|
||||
let retry_after = self.retry_after;
|
||||
let retry_offset = self.retry_offset;
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(mut tx_replay) = reciever.recv().await {
|
||||
MESSAGES_IN_REPLAY_QUEUE.dec();
|
||||
let now = Instant::now();
|
||||
if now < tx_replay.replay_at {
|
||||
if tx_replay.replay_at > now + retry_after {
|
||||
if tx_replay.replay_at > now + retry_offset {
|
||||
// requeue the transactions will be replayed after retry_after duration
|
||||
sender.send(tx_replay).context("replay channel closed")?;
|
||||
MESSAGES_IN_REPLAY_QUEUE.inc();
|
||||
|
@ -83,7 +83,7 @@ impl TransactionReplayer {
|
|||
if tx_replay.replay_count < tx_replay.max_replay {
|
||||
tx_replay.replay_count += 1;
|
||||
tx_replay.replay_at =
|
||||
Instant::now() + retry_after.mul_f32(tx_replay.replay_count as f32);
|
||||
Instant::now() + retry_offset.mul_f32(tx_replay.replay_count as f32);
|
||||
sender.send(tx_replay).context("replay channel closed")?;
|
||||
MESSAGES_IN_REPLAY_QUEUE.inc();
|
||||
}
|
||||
|
|
|
@ -88,7 +88,7 @@ impl TransactionServiceBuilder {
|
|||
replay_channel,
|
||||
block_store,
|
||||
max_retries,
|
||||
replay_after: self.tx_replayer.retry_after,
|
||||
replay_offset: self.tx_replayer.retry_offset,
|
||||
},
|
||||
jh_services,
|
||||
)
|
||||
|
@ -101,7 +101,7 @@ pub struct TransactionService {
|
|||
pub replay_channel: UnboundedSender<TransactionReplay>,
|
||||
pub block_store: BlockInformationStore,
|
||||
pub max_retries: usize,
|
||||
pub replay_after: Duration,
|
||||
pub replay_offset: Duration,
|
||||
}
|
||||
|
||||
impl TransactionService {
|
||||
|
@ -146,7 +146,7 @@ impl TransactionService {
|
|||
e
|
||||
);
|
||||
}
|
||||
let replay_at = Instant::now() + self.replay_after;
|
||||
let replay_at = Instant::now() + self.replay_offset;
|
||||
// ignore error for replay service
|
||||
if self
|
||||
.replay_channel
|
||||
|
|
Loading…
Reference in New Issue