ignore retring old slots
This commit is contained in:
parent
eb80c845fe
commit
e59bf043c7
|
@ -1,6 +1,6 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::VecDeque,
|
collections::VecDeque,
|
||||||
sync::Arc,
|
sync::{Arc, atomic::AtomicU64},
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -347,13 +347,22 @@ impl BlockListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
// a task that will queue back the slots to be retried after a certain delay
|
// a task that will queue back the slots to be retried after a certain delay
|
||||||
|
let recent_slot = Arc::new(AtomicU64::new(0));
|
||||||
{
|
{
|
||||||
let slots_task_queue = slots_task_queue.clone();
|
let slots_task_queue = slots_task_queue.clone();
|
||||||
|
let recent_slot = recent_slot.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
match slot_retry_queue_rx.recv().await {
|
match slot_retry_queue_rx.recv().await {
|
||||||
Some((slot, error_count, instant)) => {
|
Some((slot, error_count, instant)) => {
|
||||||
let now = tokio::time::Instant::now();
|
let now = tokio::time::Instant::now();
|
||||||
|
let recent_slot = recent_slot.load(std::sync::atomic::Ordering::Relaxed);
|
||||||
|
// if slot is too old ignore
|
||||||
|
if recent_slot.saturating_sub(slot) > 256 {
|
||||||
|
// slot too old to retry
|
||||||
|
// most probably its an empty slot
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if now < instant {
|
if now < instant {
|
||||||
tokio::time::sleep_until(instant).await;
|
tokio::time::sleep_until(instant).await;
|
||||||
}
|
}
|
||||||
|
@ -378,6 +387,7 @@ impl BlockListener {
|
||||||
.slot;
|
.slot;
|
||||||
// -5 for warmup
|
// -5 for warmup
|
||||||
let mut last_latest_slot = last_latest_slot - 5;
|
let mut last_latest_slot = last_latest_slot - 5;
|
||||||
|
recent_slot.store( last_latest_slot, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
// storage for recent slots processed
|
// storage for recent slots processed
|
||||||
let rpc_client = rpc_client.clone();
|
let rpc_client = rpc_client.clone();
|
||||||
|
@ -403,6 +413,7 @@ impl BlockListener {
|
||||||
}
|
}
|
||||||
|
|
||||||
last_latest_slot = new_slot;
|
last_latest_slot = new_slot;
|
||||||
|
recent_slot.store( last_latest_slot, std::sync::atomic::Ordering::Relaxed);
|
||||||
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue