Solving issue in block-listner not processing blocks anymore

This commit is contained in:
Godmode Galactus 2023-06-13 10:38:16 +02:00
parent ae095fbe42
commit 18a220e790
No known key found for this signature in database
GPG Key ID: A04142C71ABB0DEA
2 changed files with 43 additions and 28 deletions

View File

@ -270,32 +270,38 @@ impl BlockListener {
let block_schedule_queue_rx = block_schedule_queue_rx.clone();
let task: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
while let Ok(slot) = block_schedule_queue_rx.recv().await {
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.dec();
} else {
BLOCKS_IN_CONFIRMED_QUEUE.dec();
loop {
match block_schedule_queue_rx.recv().await {
Ok(slot) => {
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.dec();
} else {
BLOCKS_IN_CONFIRMED_QUEUE.dec();
}
if this
.index_slot(slot, commitment_config, notifier.clone())
.await
.is_err()
{
// add a task to be queued after a delay
let retry_at = tokio::time::Instant::now()
.checked_add(Duration::from_millis(10))
.unwrap();
slot_retry_queue_sx
.send((slot, retry_at))
.context("Error sending slot to retry queue from slot indexer task")?;
BLOCKS_IN_RETRY_QUEUE.inc();
};
},
Err(e) => {
error!("Error on block listner recv stream {e:?}");
}
}
if this
.index_slot(slot, commitment_config, notifier.clone())
.await
.is_err()
{
// add a task to be queued after a delay
let retry_at = tokio::time::Instant::now()
.checked_add(Duration::from_millis(10))
.unwrap();
slot_retry_queue_sx
.send((slot, retry_at))
.context("Error sending slot to retry queue from slot indexer task")?;
BLOCKS_IN_RETRY_QUEUE.inc();
};
}
bail!("Block Slot channel closed")
//bail!("Block Slot channel closed")
});
task
@ -323,9 +329,10 @@ impl BlockListener {
tokio::time::sleep_until(instant).await;
}
block_schedule_queue_sx.send(slot).await.context(
"Slot retry que failed to send block to block_schedule_queue_sx",
)?;
if let Err(e) = block_schedule_queue_sx.send(slot).await {
error!("Error sending slot on {commitment_config:?} queue for block listner {e:?}");
continue;
}
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.inc();
@ -363,7 +370,10 @@ impl BlockListener {
// context for lock
{
for slot in new_block_slots {
block_schedule_queue_sx.send(slot).await?;
if let Err(e) = block_schedule_queue_sx.send(slot).await {
error!("Error sending slot on {commitment_config:?} queue for block listner {e:?}");
continue;
}
if commitment_config.is_finalized() {
BLOCKS_IN_FINALIZED_QUEUE.inc();

View File

@ -1,3 +1,6 @@
use std::time::Duration;
use log::error;
use prometheus::{Encoder, TextEncoder};
use tokio::{
io::AsyncWriteExt,
@ -43,6 +46,8 @@ impl PrometheusSync {
loop {
let Ok((mut stream, _addr)) = listener.accept().await else {
error!("Error accepting prometheus stream");
tokio::time::sleep(Duration::from_millis(1)).await;
continue;
};