From 93e9c49b6092d533395165749a01f3953cfbfcbb Mon Sep 17 00:00:00 2001
From: Ali Behjati
Date: Tue, 5 Dec 2023 15:47:40 +0100
Subject: [PATCH] fix(hermes): handle price feed removal properly

---
 hermes/Cargo.lock         |   2 +-
 hermes/Cargo.toml         |   2 +-
 hermes/src/aggregate.rs   | 109 +++++++++++++++++++++++++++++++++-----
 hermes/src/api/ws.rs      |  41 ++++++++++----
 hermes/src/state/cache.rs |  28 +++++++++-
 5 files changed, 156 insertions(+), 26 deletions(-)

diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock
index 2cfb6725..47d78a0e 100644
--- a/hermes/Cargo.lock
+++ b/hermes/Cargo.lock
@@ -1574,7 +1574,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
 
 [[package]]
 name = "hermes"
-version = "0.4.3"
+version = "0.4.4"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml
index c759668a..3fb7334a 100644
--- a/hermes/Cargo.toml
+++ b/hermes/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "hermes"
-version = "0.4.3"
+version = "0.4.4"
 description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
 edition = "2021"
 
diff --git a/hermes/src/aggregate.rs b/hermes/src/aggregate.rs
index f91fd6d8..cd56b2e9 100644
--- a/hermes/src/aggregate.rs
+++ b/hermes/src/aggregate.rs
@@ -251,7 +251,15 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
 
     // Once the accumulator reaches a complete state for a specific slot
     // we can build the message states
-    build_message_states(state, accumulator_messages, wormhole_merkle_state).await?;
+    let message_states = build_message_states(accumulator_messages, wormhole_merkle_state)?;
+
+    let message_state_keys = message_states
+        .iter()
+        .map(|message_state| message_state.key())
+        .collect::<HashSet<_>>();
+
+    tracing::info!(len = message_states.len(), "Storing Message States.");
+    state.store_message_states(message_states).await?;
 
     // Update the aggregate state
     let mut aggregate_state = state.aggregate_state.write().await;
@@ -266,6 +274,7 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
                 .await?;
         }
         Some(latest) if slot > latest => {
+            state.prune_removed_keys(message_state_keys).await;
             aggregate_state.latest_completed_slot.replace(slot);
             state
                 .api_update_tx
@@ -296,18 +305,17 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
     Ok(())
 }
 
-#[tracing::instrument(skip(state, accumulator_messages, wormhole_merkle_state))]
-async fn build_message_states(
-    state: &State,
+#[tracing::instrument(skip(accumulator_messages, wormhole_merkle_state))]
+fn build_message_states(
     accumulator_messages: AccumulatorMessages,
     wormhole_merkle_state: WormholeMerkleState,
-) -> Result<()> {
+) -> Result<Vec<MessageState>> {
     let wormhole_merkle_message_states_proofs =
         construct_message_states_proofs(&accumulator_messages, &wormhole_merkle_state)?;
 
     let current_time: UnixTimestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as _;
-    let message_states = accumulator_messages
+    accumulator_messages
         .raw_messages
         .into_iter()
         .enumerate()
         .map(|(idx, raw_message)| {
@@ -326,13 +334,7 @@ fn build_message_states(
                 current_time,
             ))
         })
-        .collect::<Result<Vec<_>>>()?;
-
-    tracing::info!(len = message_states.len(), "Storing Message States.");
-
-    state.store_message_states(message_states).await?;
-
-    Ok(())
+        .collect::<Result<Vec<_>>>()
 }
 
 async fn get_verified_price_feeds(
@@ -677,6 +679,87 @@ mod test {
         }
     }
 
+    /// In this test we initially have two price feeds. We then send an update containing only
+    /// price feed 1 (without price feed 2) and make sure that price feed 2 is no longer stored.
+    #[tokio::test]
+    pub async fn test_getting_price_ids_works_fine_after_price_removal() {
+        let (state, mut update_rx) = setup_state(10).await;
+
+        let price_feed_1 = create_dummy_price_feed_message(100, 10, 9);
+        let price_feed_2 = create_dummy_price_feed_message(200, 10, 9);
+
+        // Populate the state
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(
+                vec![
+                    Message::PriceFeedMessage(price_feed_1),
+                    Message::PriceFeedMessage(price_feed_2),
+                ],
+                10,
+                20,
+            ),
+        )
+        .await;
+
+        // Check that the update_rx channel has received a message
+        assert_eq!(
+            update_rx.recv().await,
+            Some(AggregationEvent::New { slot: 10 })
+        );
+
+        // Check that the price ids are stored correctly
+        assert_eq!(
+            get_price_feed_ids(&*state).await,
+            vec![
+                PriceIdentifier::new([100; 32]),
+                PriceIdentifier::new([200; 32])
+            ]
+            .into_iter()
+            .collect()
+        );
+
+        // Check that price feed 2 exists
+        assert!(get_price_feeds_with_update_data(
+            &*state,
+            &[PriceIdentifier::new([200; 32])],
+            RequestTime::Latest,
+        )
+        .await
+        .is_ok());
+
+        // Now send an update with only price feed 1 (without price feed 2)
+        // and make sure that price feed 2 is not stored anymore.
+        let price_feed_1 = create_dummy_price_feed_message(100, 12, 10);
+
+        // Populate the state
+        store_multiple_concurrent_valid_updates(
+            state.clone(),
+            generate_update(vec![Message::PriceFeedMessage(price_feed_1)], 15, 30),
+        )
+        .await;
+
+        // Check that the update_rx channel has received a message
+        assert_eq!(
+            update_rx.recv().await,
+            Some(AggregationEvent::New { slot: 15 })
+        );
+
+        // Check that price feed 2 does not exist anymore
+        assert_eq!(
+            get_price_feed_ids(&*state).await,
+            vec![PriceIdentifier::new([100; 32])].into_iter().collect()
+        );
+
+        assert!(get_price_feeds_with_update_data(
+            &*state,
+            &[PriceIdentifier::new([200; 32])],
+            RequestTime::Latest,
+        )
+        .await
+        .is_err());
+    }
+
     #[tokio::test]
     pub async fn test_metadata_times_and_readiness_work() {
         // The receiver channel should stay open for the state to work
diff --git a/hermes/src/api/ws.rs b/hermes/src/api/ws.rs
index 4a4b6c14..d38ae0d9 100644
--- a/hermes/src/api/ws.rs
+++ b/hermes/src/api/ws.rs
@@ -350,22 +350,43 @@ impl Subscriber {
             .keys()
             .cloned()
             .collect::<Vec<_>>();
-        for update in crate::aggregate::get_price_feeds_with_update_data(
+
+        let updates = match crate::aggregate::get_price_feeds_with_update_data(
             &*self.store,
             &price_feed_ids,
             RequestTime::AtSlot(event.slot()),
         )
         .await
-        .map_err(|e| {
-            tracing::warn!(
-                "Failed to get price feeds {:?} with update data: {:?}",
-                price_feed_ids,
-                e
-            );
-            e
-        })?
-        .price_feeds
         {
+            Ok(updates) => updates,
+            Err(_) => {
+                // The error can only happen when a price feed was available
+                // and is no longer there, because we check the price feed ids
+                // upon subscription. In this case we remove the non-existent
+                // price feed from the list and keep sending updates for the
+                // rest.
+                let available_price_feed_ids =
+                    crate::aggregate::get_price_feed_ids(&*self.store).await;
+
+                self.price_feeds_with_config
+                    .retain(|price_feed_id, _| available_price_feed_ids.contains(price_feed_id));
+
+                let price_feed_ids = self
+                    .price_feeds_with_config
+                    .keys()
+                    .cloned()
+                    .collect::<Vec<_>>();
+
+                crate::aggregate::get_price_feeds_with_update_data(
+                    &*self.store,
+                    &price_feed_ids,
+                    RequestTime::AtSlot(event.slot()),
+                )
+                .await?
+            }
+        };
+
+        for update in updates.price_feeds {
             let config = self
                 .price_feeds_with_config
                 .get(&update.price_feed.id)
diff --git a/hermes/src/state/cache.rs b/hermes/src/state/cache.rs
index f263c528..d086a319 100644
--- a/hermes/src/state/cache.rs
+++ b/hermes/src/state/cache.rs
@@ -22,6 +22,7 @@ use {
         collections::{
             BTreeMap,
             HashMap,
+            HashSet,
         },
         ops::Bound,
         sync::Arc,
@@ -169,6 +170,7 @@ impl Cache {
 pub trait AggregateCache {
     async fn message_state_keys(&self) -> Vec<MessageStateKey>;
     async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
+    async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>);
     async fn fetch_message_states(
         &self,
         ids: Vec<FeedId>,
@@ -206,7 +208,6 @@ impl AggregateCache for crate::state::State {
             let key = message_state.key();
             let time = message_state.time();
             let cache = message_cache.entry(key).or_insert_with(BTreeMap::new);
-
             cache.insert(time, message_state);
 
             // Remove the earliest message states if the cache size is exceeded
@@ -214,9 +215,34 @@ impl AggregateCache for crate::state::State {
                 cache.pop_first();
             }
         }
+
         Ok(())
     }
 
+    /// This method takes the current feed ids and prunes the cache of any
+    /// keys that are no longer present in them.
+    ///
+    /// There is a side effect of this: if a key gets removed, we lose the
+    /// cache for that key and can no longer retrieve it for historical
+    /// price queries.
+    async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>) {
+        let mut message_cache = self.cache.message_cache.write().await;
+
+        // Sometimes keys are removed from the accumulator. We track which keys
+        // are no longer present in the message states and remove them from the cache.
+        let keys_in_cache = message_cache
+            .iter()
+            .map(|(key, _)| key.clone())
+            .collect::<Vec<_>>();
+
+        for key in keys_in_cache {
+            if !current_keys.contains(&key) {
+                tracing::info!("Feed {:?} seems to be removed. Removing it from cache", key);
+                message_cache.remove(&key);
+            }
+        }
+    }
+
     async fn fetch_message_states(
         &self,
         ids: Vec<FeedId>,
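For readers who want to see the pruning behaviour in isolation, below is a minimal, self-contained sketch (not part of the patch) of the idea behind the new prune_removed_keys method. It uses simplified stand-in types, a plain String key and an empty BTreeMap per feed, instead of Hermes' MessageStateKey and message-state cache, and expresses the removal with retain rather than collecting the stale keys first, purely for brevity.

use std::collections::{BTreeMap, HashMap, HashSet};

// Simplified stand-ins for Hermes' MessageStateKey and per-feed time series.
type Key = String;
type TimedStates = BTreeMap<i64, String>;

// Drop every cached feed whose key is absent from the latest complete update,
// mirroring what prune_removed_keys does on the real message cache.
fn prune_removed_keys(cache: &mut HashMap<Key, TimedStates>, current_keys: &HashSet<Key>) {
    cache.retain(|key, _| {
        let keep = current_keys.contains(key);
        if !keep {
            println!("Feed {key:?} seems to be removed. Removing it from cache");
        }
        keep
    });
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert("feed-1".to_string(), TimedStates::new());
    cache.insert("feed-2".to_string(), TimedStates::new());

    // A newer completed slot only carried feed-1, so feed-2 gets pruned and
    // its cached history is lost (the side effect documented in the patch).
    let current_keys: HashSet<Key> = ["feed-1".to_string()].into_iter().collect();
    prune_removed_keys(&mut cache, &current_keys);

    assert!(cache.contains_key("feed-1"));
    assert!(!cache.contains_key("feed-2"));
}

The real implementation operates on the shared message cache behind a write lock and is invoked from store_update only when a newer slot completes, as shown in the aggregate.rs hunks above.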