metrics for request reveal processed

This commit is contained in:
0xfirefist 2024-05-08 17:58:55 +05:30
parent cf3dc52ec9
commit 08c8b70953
3 changed files with 81 additions and 8 deletions

View File

@ -108,6 +108,7 @@ pub async fn run_keeper(
chains: HashMap<String, api::BlockchainState>,
config: Config,
private_key: String,
metrics_registry: Arc<metrics::Metrics>,
) -> Result<()> {
let mut handles = Vec::new();
for (chain_id, chain_config) in chains {
@ -121,6 +122,7 @@ pub async fn run_keeper(
private_key,
chain_eth_config,
chain_config.clone(),
metrics_registry.clone(),
)));
}
@ -232,6 +234,7 @@ pub async fn run(opts: &RunOptions) -> Result<()> {
chains.clone(),
config.clone(),
keeper_private_key,
metrics_registry.clone(),
));
}

View File

@ -12,6 +12,10 @@ use {
},
},
config::EthereumConfig,
metrics::{
Metrics,
ProviderLabel,
},
},
anyhow::{
anyhow,
@ -88,6 +92,7 @@ pub async fn run_keeper_threads(
private_key: String,
chain_eth_config: EthereumConfig,
chain_state: BlockchainState,
metrics: Arc<Metrics>,
) {
tracing::info!("starting keeper");
let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
@ -109,6 +114,7 @@ pub async fn run_keeper_threads(
contract.clone(),
chain_eth_config.gas_limit,
chain_state.clone(),
metrics.clone(),
)
.in_current_span(),
);
@ -131,6 +137,7 @@ pub async fn run_keeper_threads(
rx,
Arc::clone(&contract),
chain_eth_config.gas_limit,
metrics.clone(),
)
.in_current_span(),
);
@ -146,6 +153,7 @@ pub async fn process_event(
chain_config: &BlockchainState,
contract: &Arc<SignablePythContract>,
gas_limit: U256,
metrics: Arc<Metrics>,
) -> Result<()> {
if chain_config.provider_address != event.provider_address {
return Ok(());
@ -230,6 +238,13 @@ pub async fn process_event(
"Revealed with res: {:?}",
res
);
metrics
.reveals
.get_or_create(&ProviderLabel {
chain_id: chain_config.id.clone(),
address: chain_config.provider_address.to_string(),
})
.inc();
Ok(())
}
None => {
@ -280,6 +295,7 @@ pub async fn process_block_range(
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: api::BlockchainState,
metrics: Arc<Metrics>,
) {
let BlockRange {
from: first_block,
@ -300,6 +316,7 @@ pub async fn process_block_range(
contract.clone(),
gas_limit,
chain_state.clone(),
metrics.clone(),
)
.in_current_span()
.await;
@ -316,6 +333,7 @@ pub async fn process_single_block_batch(
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: api::BlockchainState,
metrics: Arc<Metrics>,
) {
loop {
let events_res = chain_state
@ -327,11 +345,23 @@ pub async fn process_single_block_batch(
Ok(events) => {
tracing::info!(num_of_events = &events.len(), "Processing",);
for event in &events {
metrics
.requests
.get_or_create(&ProviderLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
})
.inc();
tracing::info!(sequence_number = &event.sequence_number, "Processing event",);
while let Err(e) =
process_event(event.clone(), &chain_state, &contract, gas_limit)
.in_current_span()
.await
while let Err(e) = process_event(
event.clone(),
&chain_state,
&contract,
gas_limit,
metrics.clone(),
)
.in_current_span()
.await
{
tracing::error!(
sequence_number = &event.sequence_number,
@ -342,6 +372,13 @@ pub async fn process_single_block_batch(
time::sleep(RETRY_INTERVAL).await;
}
tracing::info!(sequence_number = &event.sequence_number, "Processed event",);
metrics
.requests_processed
.get_or_create(&ProviderLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
})
.inc();
}
tracing::info!(num_of_events = &events.len(), "Processed",);
break;
@ -469,6 +506,7 @@ pub async fn process_new_blocks(
mut rx: mpsc::Receiver<BlockRange>,
contract: Arc<SignablePythContract>,
gas_limit: U256,
metrics: Arc<Metrics>,
) {
tracing::info!("Waiting for new block ranges to process");
loop {
@ -478,6 +516,7 @@ pub async fn process_new_blocks(
Arc::clone(&contract),
gas_limit,
chain_state.clone(),
metrics.clone(),
)
.in_current_span()
.await;
@ -492,9 +531,10 @@ pub async fn process_backlog(
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: BlockchainState,
metrics: Arc<Metrics>,
) {
tracing::info!("Processing backlog");
process_block_range(backlog_range, contract, gas_limit, chain_state)
process_block_range(backlog_range, contract, gas_limit, chain_state, metrics)
.in_current_span()
.await;
tracing::info!("Backlog processed");

View File

@ -41,9 +41,9 @@ pub struct Metrics {
//
// pub rpc: Family<Label, Counter>,
//
// pub requests: Family<Label, Counter>,
// pub reveal: Family<Label, Counter>,
pub requests: Family<ProviderLabel, Counter>,
pub requests_processed: Family<ProviderLabel, Counter>,
pub reveals: Family<ProviderLabel, Counter>,
// NOTE: gas_spending is not part of metrics.
// why?
// - it is not a value that increases or decreases over time. Not a counter or a gauge
@ -81,11 +81,41 @@ impl Metrics {
end_sequence_number.clone(),
);
let requests = Family::<ProviderLabel, Counter>::default();
metrics_registry.register(
// With the metric name.
"requests",
// And the metric help text.
"Number of requests received",
requests.clone(),
);
let requests_processed = Family::<ProviderLabel, Counter>::default();
metrics_registry.register(
// With the metric name.
"requests_processed",
// And the metric help text.
"Number of requests processed",
requests_processed.clone(),
);
let reveals = Family::<ProviderLabel, Counter>::default();
metrics_registry.register(
// With the metric name.
"reveal",
// And the metric help text.
"Number of reveals",
reveals.clone(),
);
Metrics {
registry: RwLock::new(metrics_registry),
request_counter: http_requests,
current_sequence_number,
end_sequence_number,
requests,
requests_processed,
reveals,
}
}
}