Merge pull request #71 from blockworks-foundation/solving_memory_issue

ignore retring old slots
This commit is contained in:
galactus 2023-03-03 18:06:23 +01:00 committed by GitHub
commit 5b6e853629
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 1 deletions

View File

@ -1,6 +1,6 @@
use std::{
collections::VecDeque,
sync::Arc,
sync::{atomic::AtomicU64, Arc},
time::{Duration, Instant},
};
@ -347,13 +347,23 @@ impl BlockListener {
}
// a task that will queue back the slots to be retried after a certain delay
let recent_slot = Arc::new(AtomicU64::new(0));
{
let slots_task_queue = slots_task_queue.clone();
let recent_slot = recent_slot.clone();
tokio::spawn(async move {
loop {
match slot_retry_queue_rx.recv().await {
Some((slot, error_count, instant)) => {
let now = tokio::time::Instant::now();
let recent_slot =
recent_slot.load(std::sync::atomic::Ordering::Relaxed);
// if slot is too old ignore
if recent_slot.saturating_sub(slot) > 256 {
// slot too old to retry
// most probably its an empty slot
continue;
}
if now < instant {
tokio::time::sleep_until(instant).await;
}
@ -378,6 +388,7 @@ impl BlockListener {
.slot;
// -5 for warmup
let mut last_latest_slot = last_latest_slot - 5;
recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
// storage for recent slots processed
let rpc_client = rpc_client.clone();
@ -403,6 +414,7 @@ impl BlockListener {
}
last_latest_slot = new_slot;
recent_slot.store(last_latest_slot, std::sync::atomic::Ordering::Relaxed);
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}
})