fix(fortuna): watch blocks from start and infinite get queries (#1551)

* fix: watch blocks from start and infinite get queries

* formatting

* fix

* undo small change
This commit is contained in:
Dev Kalra 2024-05-07 14:21:52 +05:30 committed by GitHub
parent bf2c8b5d43
commit 77c68c5069
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 33 additions and 17 deletions

View File

@ -1488,7 +1488,7 @@ dependencies = [
[[package]]
name = "fortuna"
version = "5.2.1"
version = "5.2.2"
dependencies = [
"anyhow",
"axum",

View File

@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "5.2.1"
version = "5.2.2"
edition = "2021"
[dependencies]

View File

@ -13,7 +13,10 @@ use {
},
config::EthereumConfig,
},
anyhow::Result,
anyhow::{
anyhow,
Result,
},
ethers::{
contract::ContractError,
providers::{
@ -64,10 +67,14 @@ async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
.await
{
Ok(latest_confirmed_block) => {
return latest_confirmed_block - chain_state.reveal_delay_blocks
tracing::info!(
"Fetched latest safe block {}",
latest_confirmed_block - chain_state.reveal_delay_blocks
);
return latest_confirmed_block - chain_state.reveal_delay_blocks;
}
Err(e) => {
tracing::error!("error while getting block number. error: {:?}", e);
tracing::error!("Error while getting block number. error: {:?}", e);
time::sleep(RETRY_INTERVAL).await;
}
}
@ -346,10 +353,11 @@ pub async fn watch_blocks_wrapper(
tx: mpsc::Sender<BlockRange>,
geth_rpc_wss: Option<String>,
) {
let mut last_safe_block_processed = latest_safe_block;
loop {
if let Err(e) = watch_blocks(
chain_state.clone(),
latest_safe_block,
&mut last_safe_block_processed,
tx.clone(),
geth_rpc_wss.clone(),
)
@ -368,12 +376,11 @@ pub async fn watch_blocks_wrapper(
/// know about it.
pub async fn watch_blocks(
chain_state: BlockchainState,
latest_safe_block: BlockNumber,
last_safe_block_processed: &mut BlockNumber,
tx: mpsc::Sender<BlockRange>,
geth_rpc_wss: Option<String>,
) -> Result<()> {
tracing::info!("Watching blocks to handle new events");
let mut last_safe_block_processed = latest_safe_block;
let provider_option = match geth_rpc_wss {
Some(wss) => Some(match Provider::<Ws>::connect(wss.clone()).await {
@ -390,14 +397,23 @@ pub async fn watch_blocks(
};
let mut stream_option = match provider_option {
Some(ref provider) => Some(provider.subscribe_blocks().await?),
Some(ref provider) => Some(match provider.subscribe_blocks().await {
Ok(client) => client,
Err(e) => {
tracing::error!("Error while subscribing to blocks. error {:?}", e);
return Err(e.into());
}
}),
None => None,
};
loop {
match stream_option {
Some(ref mut stream) => {
stream.next().await;
if let None = stream.next().await {
tracing::error!("Error blocks subscription stream ended");
return Err(anyhow!("Error blocks subscription stream ended"));
}
}
None => {
time::sleep(POLL_INTERVAL).await;
@ -405,27 +421,27 @@ pub async fn watch_blocks(
}
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
if latest_safe_block > last_safe_block_processed {
if latest_safe_block > *last_safe_block_processed {
match tx
.send(BlockRange {
from: last_safe_block_processed + 1,
from: *last_safe_block_processed + 1,
to: latest_safe_block,
})
.await
{
Ok(_) => {
tracing::info!(
from_block = &last_safe_block_processed + 1,
from_block = *last_safe_block_processed + 1,
to_block = &latest_safe_block,
"Block range sent to handle events",
);
last_safe_block_processed = latest_safe_block;
*last_safe_block_processed = latest_safe_block;
}
Err(e) => {
tracing::error!(
"Error while sending block range to handle events. These will be handled in next call. error: {:?}",
e
);
"Error while sending block range to handle events. These will be handled in next call. error: {:?}",
e
);
}
};
}