refactor(hermes): state->benchmarks downcasting

This commit is contained in:
Reisen 2024-04-09 12:38:03 +00:00 committed by Reisen
parent 110c6dcea3
commit b4ed825cd6
3 changed files with 45 additions and 20 deletions

View File

@ -403,7 +403,9 @@ where
Ok(price_feeds_with_update_data) => Ok(price_feeds_with_update_data),
Err(e) => {
if let RequestTime::FirstAfter(publish_time) = request_time {
return Benchmarks::get_verified_price_feeds(state, price_ids, publish_time).await;
return state
.get_verified_price_feeds(price_ids, publish_time)
.await;
}
Err(e)
}

View File

@ -1,7 +1,10 @@
//! This module contains the global state of the application.
use {
self::cache::CacheState,
self::{
benchmarks::BenchmarksState,
cache::CacheState,
},
crate::{
aggregate::{
AggregateState,
@ -29,10 +32,12 @@ pub mod benchmarks;
pub mod cache;
pub struct State {
/// Storage is a short-lived cache of the state of all the updates that have been passed to the
/// store.
/// State for the `Cache` service for short-lived storage of updates.
pub cache: CacheState,
/// State for the `Benchmarks` service for looking up historical updates.
pub benchmarks: BenchmarksState,
/// Sequence numbers of lately observed Vaas. Store uses this set
/// to ignore the previously observed Vaas as a performance boost.
pub observed_vaa_seqs: RwLock<BTreeSet<u64>>,
@ -46,9 +51,6 @@ pub struct State {
/// The aggregate module state.
pub aggregate_state: RwLock<AggregateState>,
/// Benchmarks endpoint
pub benchmarks_endpoint: Option<Url>,
/// Metrics registry
pub metrics_registry: RwLock<Registry>,
@ -64,13 +66,13 @@ impl State {
) -> Arc<Self> {
let mut metrics_registry = Registry::default();
Arc::new(Self {
cache: CacheState::new(cache_size),
observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()),
api_update_tx: update_tx,
aggregate_state: RwLock::new(AggregateState::new(&mut metrics_registry)),
benchmarks_endpoint,
metrics_registry: RwLock::new(metrics_registry),
cache: CacheState::new(cache_size),
benchmarks: BenchmarksState::new(benchmarks_endpoint),
observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()),
api_update_tx: update_tx,
aggregate_state: RwLock::new(AggregateState::new(&mut metrics_registry)),
metrics_registry: RwLock::new(metrics_registry),
price_feeds_metadata: RwLock::new(Default::default()),
})
}

View File

@ -1,6 +1,7 @@
//! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates.
use {
super::State,
crate::{
aggregate::{
PriceFeedsWithUpdateData,
@ -14,6 +15,7 @@ use {
Engine as _,
},
pyth_sdk::PriceIdentifier,
reqwest::Url,
serde::Deserialize,
};
@ -50,7 +52,23 @@ impl TryFrom<BinaryBlob> for Vec<Vec<u8>> {
}
}
#[async_trait::async_trait]
pub struct BenchmarksState {
endpoint: Option<Url>,
}
impl BenchmarksState {
pub fn new(url: Option<Url>) -> Self {
Self { endpoint: url }
}
}
/// Allow downcasting State into BenchmarksState for functions that depend on the `Benchmarks` service.
impl<'a> From<&'a State> for &'a BenchmarksState {
fn from(state: &'a State) -> &'a BenchmarksState {
&state.benchmarks
}
}
pub trait Benchmarks {
async fn get_verified_price_feeds(
&self,
@ -59,22 +77,25 @@ pub trait Benchmarks {
) -> Result<PriceFeedsWithUpdateData>;
}
#[async_trait::async_trait]
impl Benchmarks for crate::state::State {
impl<T> Benchmarks for T
where
for<'a> &'a T: Into<&'a BenchmarksState>,
T: Sync,
{
async fn get_verified_price_feeds(
&self,
price_ids: &[PriceIdentifier],
publish_time: UnixTimestamp,
) -> Result<PriceFeedsWithUpdateData> {
let endpoint = self
.benchmarks_endpoint
.into()
.endpoint
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Benchmarks endpoint is not set"))?
.join(&format!("/v1/updates/price/{}", publish_time))
.unwrap();
let client = reqwest::Client::new();
let mut request = client
let mut request = reqwest::Client::new()
.get(endpoint)
.timeout(BENCHMARKS_REQUEST_TIMEOUT)
.query(&[("encoding", "hex")])