remove handle backlog and add re processing

This commit is contained in:
0xfirefist 2024-05-07 14:20:44 +05:30
parent 42d54e79ee
commit 9a6ac7e397
1 changed files with 21 additions and 41 deletions

View File

@ -53,7 +53,7 @@ const RETRY_INTERVAL: Duration = Duration::from_secs(5);
/// How many blocks to look back for events that might be missed when starting the keeper /// How many blocks to look back for events that might be missed when starting the keeper
const BACKLOG_RANGE: u64 = 1000; const BACKLOG_RANGE: u64 = 1000;
/// How much to wait before retrying a past blocks /// How much to wait before retrying a past blocks
const RETRY_PAST_BLOCKS_INTERVAL: Duration = Duration::from_secs(600); const RETRY_BLOCKS_INTERVAL: Duration = Duration::from_secs(600);
/// How many blocks to fetch events for in a single rpc call /// How many blocks to fetch events for in a single rpc call
const BLOCK_BATCH_SIZE: u64 = 100; const BLOCK_BATCH_SIZE: u64 = 100;
/// How much to wait before polling the next latest block /// How much to wait before polling the next latest block
@ -101,13 +101,10 @@ pub async fn run_keeper_threads(
.expect("Chain config should be valid"), .expect("Chain config should be valid"),
); );
// Spawn a thread to handle the events from last BACKLOG_RANGE blocks. // Spawn a thread to re-process blocks starting from the `latest_safe_block - BACKLOG_RANGE`.
spawn( spawn(
process_backlog( re_process_blocks(
BlockRange { latest_safe_block - BACKLOG_RANGE,
from: latest_safe_block.saturating_sub(BACKLOG_RANGE),
to: latest_safe_block,
},
contract.clone(), contract.clone(),
chain_eth_config.gas_limit, chain_eth_config.gas_limit,
chain_state.clone(), chain_state.clone(),
@ -473,41 +470,29 @@ pub async fn process_new_blocks(
} }
} }
/// Processes the backlog_range for a chain. /// Re-process blocks. It waits for 10 minutes before re-processing the blocks.
/// It re-processes the blocks from_block to latest_safe_block-BLOCK_BATCH_SIZE.
/// It lags behind the latest_safe_block as we don't want to re-process the same blocks
/// as the watch_blocks at the same time.
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub async fn process_backlog( pub async fn re_process_blocks(
backlog_range: BlockRange, from_block: BlockNumber,
contract: Arc<SignablePythContract>, contract: Arc<SignablePythContract>,
gas_limit: U256, gas_limit: U256,
chain_state: BlockchainState, chain_state: BlockchainState,
) { ) {
tracing::info!("Processing backlog"); let mut from_block = from_block;
process_block_range(backlog_range, contract, gas_limit, chain_state)
.in_current_span()
.await;
tracing::info!("Backlog processed");
}
/// Retry past blocks. It first retries the range `latest_safe_block - BACKLOG_RANGE` to `latest_safe_block` and
/// then after every 10 minutes it will retry the range between last block it processed and the latest safe block.
#[tracing::instrument(skip_all)]
pub async fn retry_past_blocks(
latest_safe_block: BlockNumber,
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: BlockchainState,
) {
// initialize with backlog range
let mut from_block = latest_safe_block - BACKLOG_RANGE;
let mut to_block = latest_safe_block;
loop { loop {
// As we are adding a lag to latest_safe_block by subtracting BLOCK_BATCH_SIZE, // We don't want to process the same blocks as the watch_blocks at the same time.
// we need to check the following condition // If we process them at the same time, the events might be missed by the both.
// We will lag the to_block by `BLOCK_BATCH_SIZE`.
let to_block =
get_latest_safe_block(&chain_state).in_current_span().await - BLOCK_BATCH_SIZE;
if to_block > from_block { if to_block > from_block {
tracing::info!( tracing::info!(
from_block = &from_block, from_block = &from_block,
to_block = &to_block, to_block = &to_block,
"Retrying past blocks" "Re-processing past blocks"
); );
process_block_range( process_block_range(
@ -525,17 +510,12 @@ pub async fn retry_past_blocks(
tracing::info!( tracing::info!(
from_block = &from_block, from_block = &from_block,
to_block = &to_block, to_block = &to_block,
"Retried past blocks" "Re-processed past blocks"
); );
from_block = to_block + 1; from_block = to_block + 1;
tracing::info!("Waiting for 10 minutes before retrying again");
} }
time::sleep(RETRY_PAST_BLOCKS_INTERVAL).await;
// We don't want to process the same blocks as the watch_blocks at the same time. tracing::info!("Waiting for 10 minutes before re-processing blocks");
// If we process them at the same time, the events might be missed by the both. time::sleep(RETRY_BLOCKS_INTERVAL).await;
// We will lag the range by `BLOCK_BATCH_SIZE`.
to_block = get_latest_safe_block(&chain_state).in_current_span().await - BLOCK_BATCH_SIZE;
} }
} }