creating a seperate queuing task

This commit is contained in:
Godmode Galactus 2023-03-02 13:21:33 +01:00
parent 6a54591b45
commit 592d99ec6e
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
1 changed files with 39 additions and 13 deletions

View File

@ -1,4 +1,8 @@
use std::{collections::VecDeque, sync::Arc, time::Instant};
use std::{
collections::VecDeque,
sync::Arc,
time::{Duration, Instant},
};
use dashmap::DashMap;
use jsonrpsee::SubscriptionSink;
@ -292,11 +296,14 @@ impl BlockListener {
postgres: Option<PostgresMpscSend>,
) -> JoinHandle<anyhow::Result<()>> {
let slots_task_queue = Arc::new(Mutex::new(VecDeque::<(u64, u8)>::new()));
let (slot_retry_queue_sx, mut slot_retry_queue_rx) = tokio::sync::mpsc::unbounded_channel();
// task to fetch blocks
for _i in 0..6 {
let this = self.clone();
let postgres = postgres.clone();
let slots_task_queue = slots_task_queue.clone();
let slot_retry_queue_sx = slot_retry_queue_sx.clone();
tokio::spawn(async move {
let slots_task_queue = slots_task_queue.clone();
@ -317,6 +324,9 @@ impl BlockListener {
.index_slot(slot, commitment_config, postgres.clone())
.await
{
// usually as we index all the slots even if they are not been processed we get some errors for slot
// as they are not in long term storage of the rpc // we check 5 times before ignoring the slot
if error_count > 5 {
// retried for 10 times / there should be no block for this slot
warn!(
@ -324,20 +334,36 @@ impl BlockListener {
slot, commitment_config.commitment
);
continue;
} else {
// add a task to be queued after a delay
let retry_at = tokio::time::Instant::now()
.checked_add(Duration::from_millis(100))
.unwrap();
let _ = slot_retry_queue_sx.send((slot, error_count, retry_at));
}
// usually as we index all the slots even if they are not been processed we get some errors for slot
// as they are not in long term storage of the rpc // we check 5 times before ignoring the slot
let slots_task_queue = slots_task_queue.clone();
// create a task that will add errored task to the queue after a timeout
tokio::spawn( async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
};
}
});
}
// a task that will queue back the slots to be retried after a certain delay
{
let slots_task_queue = slots_task_queue.clone();
tokio::spawn(async move {
loop {
match slot_retry_queue_rx.recv().await {
Some((slot, error_count, instant)) => {
let now = tokio::time::Instant::now();
if now < instant {
tokio::time::sleep_until(instant).await;
}
let mut queue = slots_task_queue.lock().await;
queue.push_back((slot, error_count + 1));
}
} );
};
// println!("{i} thread done slot {slot}");
None => {
break;
}
}
}
});
}