retry mechanisms for past blocks
This commit is contained in:
parent
77c68c5069
commit
7eb10b92ff
|
@ -52,6 +52,8 @@ pub struct BlockRange {
|
|||
const RETRY_INTERVAL: Duration = Duration::from_secs(5);
|
||||
/// How many blocks to look back for events that might be missed when starting the keeper
|
||||
const BACKLOG_RANGE: u64 = 1000;
|
||||
/// How much to wait before retrying a past blocks
|
||||
const RETRY_PAST_BLOCKS_INTERVAL: Duration = Duration::from_secs(600);
|
||||
/// How many blocks to fetch events for in a single rpc call
|
||||
const BLOCK_BATCH_SIZE: u64 = 100;
|
||||
/// How much to wait before polling the next latest block
|
||||
|
@ -101,11 +103,8 @@ pub async fn run_keeper_threads(
|
|||
|
||||
// Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
|
||||
spawn(
|
||||
process_backlog(
|
||||
BlockRange {
|
||||
from: latest_safe_block.saturating_sub(BACKLOG_RANGE),
|
||||
to: latest_safe_block,
|
||||
},
|
||||
retry_past_blocks(
|
||||
latest_safe_block,
|
||||
contract.clone(),
|
||||
chain_eth_config.gas_limit,
|
||||
chain_state.clone(),
|
||||
|
@ -471,17 +470,47 @@ pub async fn process_new_blocks(
|
|||
}
|
||||
}
|
||||
|
||||
/// Processes the backlog_range for a chain.
|
||||
/// Retry past blocks. It first retries the range `latest_safe_block - BACKLOG_RANGE` to `latest_safe_block` and
|
||||
/// then after every 10 minutes it will retry the range between last block it processed and the latest safe block.
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn process_backlog(
|
||||
backlog_range: BlockRange,
|
||||
pub async fn retry_past_blocks(
|
||||
latest_safe_block: BlockNumber,
|
||||
contract: Arc<SignablePythContract>,
|
||||
gas_limit: U256,
|
||||
chain_state: BlockchainState,
|
||||
) {
|
||||
tracing::info!("Processing backlog");
|
||||
process_block_range(backlog_range, contract, gas_limit, chain_state)
|
||||
// initialize with backlog range
|
||||
let mut from_block = latest_safe_block - BACKLOG_RANGE;
|
||||
let mut latest_safe_block = latest_safe_block;
|
||||
loop {
|
||||
tracing::info!(
|
||||
from_block = &from_block,
|
||||
to_block = &latest_safe_block,
|
||||
"Retrying past blocks"
|
||||
);
|
||||
|
||||
process_block_range(
|
||||
BlockRange {
|
||||
from: from_block,
|
||||
to: latest_safe_block,
|
||||
},
|
||||
contract.clone(),
|
||||
gas_limit,
|
||||
chain_state.clone(),
|
||||
)
|
||||
.in_current_span()
|
||||
.await;
|
||||
tracing::info!("Backlog processed");
|
||||
|
||||
tracing::info!(
|
||||
from_block = &from_block,
|
||||
to_block = &latest_safe_block,
|
||||
"Retried past blocks"
|
||||
);
|
||||
|
||||
from_block = latest_safe_block + 1;
|
||||
|
||||
tracing::info!("Waiting for 10 minutes before retrying again");
|
||||
time::sleep(RETRY_PAST_BLOCKS_INTERVAL).await;
|
||||
latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue