diff --git a/hermes/src/api.rs b/hermes/src/api.rs index cce10edc..b4d74f63 100644 --- a/hermes/src/api.rs +++ b/hermes/src/api.rs @@ -27,7 +27,7 @@ use { mod doc_examples; mod metrics_middleware; mod rest; -mod types; +pub mod types; mod ws; #[derive(Clone)] @@ -154,6 +154,10 @@ pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> { .route("/api/latest_vaas", get(rest::latest_vaas)) .route("/api/price_feed_ids", get(rest::price_feed_ids)) .route("/v2/updates/price/latest", get(rest::latest_price_updates)) + .route( + "/v2/updates/price/:publish_time", + get(rest::timestamp_price_updates), + ) .route("/live", get(rest::live)) .route("/ready", get(rest::ready)) .route("/ws", get(ws::ws_route_handler)) diff --git a/hermes/src/api/benchmarks.rs b/hermes/src/api/benchmarks.rs new file mode 100644 index 00000000..63f945f7 --- /dev/null +++ b/hermes/src/api/benchmarks.rs @@ -0,0 +1,134 @@ +//! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates. + +use { + crate::{ + aggregate::{ + PriceFeedUpdate, + PriceFeedsWithUpdateData, + UnixTimestamp, + }, + api::types::PriceUpdate, + }, + anyhow::Result, + base64::{ + engine::general_purpose::STANDARD as base64_standard_engine, + Engine as _, + }, + pyth_sdk::{ + Price, + PriceFeed, + PriceIdentifier, + }, + serde::Deserialize, +}; + +const BENCHMARKS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); + +#[derive(Deserialize, Debug, Clone)] +enum BlobEncoding { + #[serde(rename = "base64")] + Base64, + #[serde(rename = "hex")] + Hex, +} + +#[derive(Deserialize, Debug, Clone)] +struct BinaryBlob { + pub encoding: BlobEncoding, + pub data: Vec, +} + +impl TryFrom for PriceFeedsWithUpdateData { + type Error = anyhow::Error; + fn try_from(price_update: PriceUpdate) -> Result { + let price_feeds = match price_update.parsed { + Some(parsed_updates) => parsed_updates + .into_iter() + .map(|parsed_price_update| { + Ok(PriceFeedUpdate { + price_feed: PriceFeed::new( + parsed_price_update.id, + Price { + price: parsed_price_update.price.price, + conf: parsed_price_update.price.conf, + expo: parsed_price_update.price.expo, + publish_time: parsed_price_update.price.publish_time, + }, + Price { + price: parsed_price_update.ema_price.price, + conf: parsed_price_update.ema_price.conf, + expo: parsed_price_update.ema_price.expo, + publish_time: parsed_price_update.ema_price.publish_time, + }, + ), + slot: parsed_price_update.metadata.slot, + received_at: parsed_price_update.metadata.proof_available_time, + update_data: None, // This field is not available in ParsedPriceUpdate + prev_publish_time: parsed_price_update.metadata.prev_publish_time, + }) + }) + .collect::>>(), + None => Err(anyhow::anyhow!("No parsed price updates available")), + }?; + + let update_data = price_update + .binary + .data + .iter() + .map(|hex_str| hex::decode(hex_str).unwrap_or_default()) + .collect::>>(); + + Ok(PriceFeedsWithUpdateData { + price_feeds, + update_data, + }) + } +} + +#[async_trait::async_trait] +pub trait Benchmarks { + async fn get_verified_price_feeds( + &self, + price_ids: &[PriceIdentifier], + publish_time: UnixTimestamp, + ) -> Result; +} + +#[async_trait::async_trait] +impl Benchmarks for crate::state::State { + async fn get_verified_price_feeds( + &self, + price_ids: &[PriceIdentifier], + publish_time: UnixTimestamp, + ) -> Result { + let endpoint = self + .benchmarks_endpoint + .as_ref() + .ok_or_else(|| anyhow::anyhow!("Benchmarks endpoint is not set"))? + .join(&format!("/v1/updates/price/{}", publish_time)) + .unwrap(); + + let client = reqwest::Client::new(); + let mut request = client + .get(endpoint) + .timeout(BENCHMARKS_REQUEST_TIMEOUT) + .query(&[("encoding", "hex")]) + .query(&[("parsed", "true")]); + + for price_id in price_ids { + request = request.query(&[("ids", price_id)]) + } + + let response = request.send().await?; + + if response.status() != reqwest::StatusCode::OK { + return Err(anyhow::anyhow!(format!( + "Price update for price ids {:?} with publish time {} not found in benchmarks. Status code: {}, message: {}", + price_ids, publish_time, response.status(), response.text().await? + ))); + } + + let price_update: PriceUpdate = response.json().await?; + price_update.try_into() + } +} diff --git a/hermes/src/api/rest.rs b/hermes/src/api/rest.rs index 7c29e441..def245bc 100644 --- a/hermes/src/api/rest.rs +++ b/hermes/src/api/rest.rs @@ -31,7 +31,10 @@ pub use { live::*, price_feed_ids::*, ready::*, - v2::latest_price_updates::*, + v2::{ + latest_price_updates::*, + timestamp_price_updates::*, + }, }; pub enum RestError { diff --git a/hermes/src/api/rest/index.rs b/hermes/src/api/rest/index.rs index 5acdd636..8b985803 100644 --- a/hermes/src/api/rest/index.rs +++ b/hermes/src/api/rest/index.rs @@ -17,5 +17,6 @@ pub async fn index() -> impl IntoResponse { "/api/get_vaa?id=&publish_time=", "/api/get_vaa_ccip?data=<0x+>", "/v2/updates/price/latest?ids[]=&ids[]=&..(&encoding=hex|base64)(&parsed=false)", + "/v2/updates/price/?ids[]=&ids[]=&..(&encoding=hex|base64)(&parsed=false)", ]) } diff --git a/hermes/src/api/rest/v2/latest_price_updates.rs b/hermes/src/api/rest/v2/latest_price_updates.rs index 3ad7ec13..408eb0fd 100644 --- a/hermes/src/api/rest/v2/latest_price_updates.rs +++ b/hermes/src/api/rest/v2/latest_price_updates.rs @@ -25,12 +25,13 @@ use { Engine as _, }, pyth_sdk::PriceIdentifier, + serde::Deserialize, serde_qs::axum::QsQuery, utoipa::IntoParams, }; -#[derive(Debug, serde::Deserialize, IntoParams)] +#[derive(Debug, Deserialize, IntoParams)] #[into_params(parameter_in=Query)] pub struct LatestPriceUpdatesQueryParams { /// Get the most recent price update for this set of price feed ids. diff --git a/hermes/src/api/rest/v2/mod.rs b/hermes/src/api/rest/v2/mod.rs index 413ee04f..24ef531e 100644 --- a/hermes/src/api/rest/v2/mod.rs +++ b/hermes/src/api/rest/v2/mod.rs @@ -1 +1,2 @@ pub mod latest_price_updates; +pub mod timestamp_price_updates; diff --git a/hermes/src/api/rest/v2/timestamp_price_updates.rs b/hermes/src/api/rest/v2/timestamp_price_updates.rs new file mode 100644 index 00000000..4a01704d --- /dev/null +++ b/hermes/src/api/rest/v2/timestamp_price_updates.rs @@ -0,0 +1,143 @@ +use { + crate::{ + aggregate::{ + RequestTime, + UnixTimestamp, + }, + api::{ + doc_examples, + rest::{ + verify_price_ids_exist, + RestError, + }, + types::{ + BinaryPriceUpdate, + EncodingType, + ParsedPriceUpdate, + PriceIdInput, + PriceUpdate, + }, + }, + }, + anyhow::Result, + axum::{ + extract::{ + Path, + State, + }, + Json, + }, + pyth_sdk::PriceIdentifier, + serde::Deserialize, + serde_qs::axum::QsQuery, + utoipa::IntoParams, +}; + +#[derive(Debug, Deserialize, IntoParams)] +#[into_params(parameter_in=Path)] +pub struct TimestampPriceUpdatesPathParams { + /// The unix timestamp in seconds. This endpoint will return the first update whose + /// publish_time is >= the provided value. + #[param(value_type = i64)] + #[param(example = doc_examples::timestamp_example)] + publish_time: UnixTimestamp, +} + +#[derive(Debug, Deserialize, IntoParams)] +#[into_params(parameter_in=Query)] +pub struct TimestampPriceUpdatesQueryParams { + /// Get the most recent price update for this set of price feed ids. + /// + /// This parameter can be provided multiple times to retrieve multiple price updates, + /// for example see the following query string: + /// + /// ``` + /// ?ids[]=a12...&ids[]=b4c... + /// ``` + #[param(rename = "ids[]")] + #[param(example = "e62df6c8b4a85fe1a67db44dc12de5db330f7ac66b72dc658afedf0f4a415b43")] + ids: Vec, + + /// If true, include the parsed price update in the `parsed` field of each returned feed. + #[serde(default)] + encoding: EncodingType, + + /// If true, include the parsed price update in the `parsed` field of each returned feed. + #[serde(default = "default_true")] + parsed: bool, +} + + +fn default_true() -> bool { + true +} + +/// Get the latest price updates by price feed id. +/// +/// Given a collection of price feed ids, retrieve the latest Pyth price for each price feed. +#[utoipa::path( + get, + path = "/v2/updates/price/{publish_time}", + responses( + (status = 200, description = "Price updates retrieved successfully", body = Vec), + (status = 404, description = "Price ids not found", body = String) + ), + params( + TimestampPriceUpdatesPathParams, + TimestampPriceUpdatesQueryParams + ) +)] +pub async fn timestamp_price_updates( + State(state): State, + Path(path_params): Path, + QsQuery(query_params): QsQuery, +) -> Result>, RestError> { + let price_ids: Vec = + query_params.ids.into_iter().map(|id| id.into()).collect(); + + verify_price_ids_exist(&state, &price_ids).await?; + + let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( + &*state.state, + &price_ids, + RequestTime::FirstAfter(path_params.publish_time), + ) + .await + .map_err(|e| { + tracing::warn!( + "Error getting price feeds {:?} with update data: {:?}", + price_ids, + e + ); + RestError::UpdateDataNotFound + })?; + + let price_update_data = price_feeds_with_update_data.update_data; + let encoded_data: Vec = price_update_data + .into_iter() + .map(|data| query_params.encoding.encode_str(&data)) + .collect(); + let binary_price_update = BinaryPriceUpdate { + encoding: query_params.encoding, + data: encoded_data, + }; + let parsed_price_updates: Option> = if query_params.parsed { + Some( + price_feeds_with_update_data + .price_feeds + .into_iter() + .map(|price_feed| price_feed.into()) + .collect(), + ) + } else { + None + }; + + let compressed_price_update = PriceUpdate { + binary: binary_price_update, + parsed: parsed_price_updates, + }; + + + Ok(Json(vec![compressed_price_update])) +} diff --git a/hermes/src/api/types.rs b/hermes/src/api/types.rs index fe0a66ea..bdafe9f4 100644 --- a/hermes/src/api/types.rs +++ b/hermes/src/api/types.rs @@ -2,9 +2,11 @@ use { super::doc_examples, crate::aggregate::{ PriceFeedUpdate, + PriceFeedsWithUpdateData, Slot, UnixTimestamp, }, + anyhow::Result, base64::{ engine::general_purpose::STANDARD as base64_standard_engine, Engine as _, @@ -17,7 +19,11 @@ use { Deref, DerefMut, }, - pyth_sdk::PriceIdentifier, + pyth_sdk::{ + Price, + PriceFeed, + PriceIdentifier, + }, serde::{ Deserialize, Serialize, @@ -199,6 +205,15 @@ pub enum EncodingType { Base64, } +impl EncodingType { + pub fn encode_str(&self, data: &[u8]) -> String { + match self { + EncodingType::Base64 => base64_standard_engine.encode(data), + EncodingType::Hex => hex::encode(data), + } + } +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] pub struct BinaryPriceUpdate { pub encoding: EncodingType, @@ -207,7 +222,7 @@ pub struct BinaryPriceUpdate { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, ToSchema)] pub struct ParsedPriceUpdate { - pub id: String, + pub id: PriceIdentifier, pub price: RpcPrice, pub ema_price: RpcPrice, pub metadata: RpcPriceFeedMetadataV2, @@ -218,7 +233,7 @@ impl From for ParsedPriceUpdate { let price_feed = price_feed_update.price_feed; Self { - id: price_feed.id.to_string(), + id: price_feed.id, price: RpcPrice { price: price_feed.get_price_unchecked().price, conf: price_feed.get_price_unchecked().conf, @@ -246,3 +261,50 @@ pub struct PriceUpdate { #[serde(skip_serializing_if = "Option::is_none")] pub parsed: Option>, } + +impl TryFrom for PriceFeedsWithUpdateData { + type Error = anyhow::Error; + fn try_from(price_update: PriceUpdate) -> Result { + let price_feeds = match price_update.parsed { + Some(parsed_updates) => parsed_updates + .into_iter() + .map(|parsed_price_update| { + Ok(PriceFeedUpdate { + price_feed: PriceFeed::new( + parsed_price_update.id, + Price { + price: parsed_price_update.price.price, + conf: parsed_price_update.price.conf, + expo: parsed_price_update.price.expo, + publish_time: parsed_price_update.price.publish_time, + }, + Price { + price: parsed_price_update.ema_price.price, + conf: parsed_price_update.ema_price.conf, + expo: parsed_price_update.ema_price.expo, + publish_time: parsed_price_update.ema_price.publish_time, + }, + ), + slot: parsed_price_update.metadata.slot, + received_at: parsed_price_update.metadata.proof_available_time, + update_data: None, // This field is not available in ParsedPriceUpdate + prev_publish_time: parsed_price_update.metadata.prev_publish_time, + }) + }) + .collect::>>(), + None => Err(anyhow::anyhow!("No parsed price updates available")), + }?; + + let update_data = price_update + .binary + .data + .iter() + .map(|hex_str| hex::decode(hex_str).unwrap_or_default()) + .collect::>>(); + + Ok(PriceFeedsWithUpdateData { + price_feeds, + update_data, + }) + } +} diff --git a/hermes/src/state/benchmarks.rs b/hermes/src/state/benchmarks.rs index bc24c78c..0b484b52 100644 --- a/hermes/src/state/benchmarks.rs +++ b/hermes/src/state/benchmarks.rs @@ -1,25 +1,25 @@ //! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates. use { - crate::aggregate::{ - PriceFeedUpdate, - PriceFeedsWithUpdateData, - UnixTimestamp, + crate::{ + aggregate::{ + PriceFeedsWithUpdateData, + UnixTimestamp, + }, + api::types::PriceUpdate, }, anyhow::Result, base64::{ engine::general_purpose::STANDARD as base64_standard_engine, Engine as _, }, - pyth_sdk::{ - PriceFeed, - PriceIdentifier, - }, + pyth_sdk::PriceIdentifier, + serde::Deserialize, }; const BENCHMARKS_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); -#[derive(serde::Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug, Clone)] enum BlobEncoding { #[serde(rename = "base64")] Base64, @@ -27,18 +27,12 @@ enum BlobEncoding { Hex, } -#[derive(serde::Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug, Clone)] struct BinaryBlob { pub encoding: BlobEncoding, pub data: Vec, } -#[derive(serde::Deserialize, Debug, Clone)] -struct BenchmarkUpdates { - pub parsed: Vec, - pub binary: BinaryBlob, -} - impl TryFrom for Vec> { type Error = anyhow::Error; @@ -56,26 +50,6 @@ impl TryFrom for Vec> { } } -impl TryFrom for PriceFeedsWithUpdateData { - type Error = anyhow::Error; - fn try_from(benchmark_updates: BenchmarkUpdates) -> Result { - Ok(PriceFeedsWithUpdateData { - price_feeds: benchmark_updates - .parsed - .into_iter() - .map(|price_feed| PriceFeedUpdate { - price_feed, - slot: None, - received_at: None, - update_data: None, - prev_publish_time: None, // TODO: Set this field when Benchmarks API supports it. - }) - .collect::>(), - update_data: benchmark_updates.binary.try_into()?, - }) - } -} - #[async_trait::async_trait] pub trait Benchmarks { async fn get_verified_price_feeds( @@ -119,7 +93,7 @@ impl Benchmarks for crate::state::State { ))); } - let benchmark_updates: BenchmarkUpdates = response.json().await?; - benchmark_updates.try_into() + let price_update: PriceUpdate = response.json().await?; + price_update.try_into() } }