diff --git a/hermes/src/main.rs b/hermes/src/main.rs index 69108cd3..de5ef2e4 100644 --- a/hermes/src/main.rs +++ b/hermes/src/main.rs @@ -1,5 +1,6 @@ #![feature(never_type)] #![feature(slice_group_by)] +#![feature(btree_cursors)] use { crate::store::Store, @@ -33,7 +34,7 @@ async fn init() -> Result<()> { let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000); log::info!("Running Hermes..."); - let store = Store::new_with_local_cache(update_tx, 1000); + let store = Store::new(update_tx, 1000); // Spawn the P2P layer. log::info!("Starting P2P server on {:?}", wh_listen_addrs); diff --git a/hermes/src/store.rs b/hermes/src/store.rs index ae745db9..c02885e8 100644 --- a/hermes/src/store.rs +++ b/hermes/src/store.rs @@ -1,12 +1,16 @@ use { self::{ - proof::wormhole_merkle::construct_update_data, + proof::wormhole_merkle::{ + construct_update_data, + WormholeMerkleState, + }, storage::{ MessageState, MessageStateFilter, - StorageInstance, + Storage, }, types::{ + AccumulatorMessages, PriceFeedUpdate, PriceFeedsWithUpdateData, RequestTime, @@ -19,7 +23,6 @@ use { construct_message_states_proofs, store_wormhole_merkle_verified_message, }, - storage::CompletedAccumulatorState, types::{ ProofSet, UnixTimestamp, @@ -82,17 +85,27 @@ pub mod wormhole; const OBSERVED_CACHE_SIZE: usize = 1000; pub struct Store { - pub storage: StorageInstance, + /// Storage is a short-lived cache of the state of all the updates + /// that have been passed to the store. + pub storage: Storage, + /// Sequence numbers of lately observed Vaas. Store uses this set + /// to ignore the previously observed Vaas as a performance boost. pub observed_vaa_seqs: RwLock>, + /// Wormhole guardian sets. It is used to verify Vaas before using + /// them. pub guardian_set: RwLock>, + /// The sender to the channel between Store and Api to notify + /// completed updates. pub update_tx: Sender<()>, + /// Time of the last completed update. This is used for the health + /// probes. pub last_completed_update_at: RwLock>, } impl Store { - pub fn new_with_local_cache(update_tx: Sender<()>, cache_size: u64) -> Arc { + pub fn new(update_tx: Sender<()>, cache_size: u64) -> Arc { Arc::new(Self { - storage: storage::local_storage::LocalStorage::new_instance(cache_size), + storage: Storage::new(cache_size), observed_vaa_seqs: RwLock::new(Default::default()), guardian_set: RwLock::new(Default::default()), update_tx, @@ -148,34 +161,27 @@ impl Store { let slot = accumulator_messages.slot; log::info!("Storing accumulator messages for slot {:?}.", slot,); self.storage - .update_accumulator_state( - slot, - Box::new(|mut state| { - state.accumulator_messages = Some(accumulator_messages); - state - }), - ) + .store_accumulator_messages(accumulator_messages) .await?; slot } }; - let state = match self.storage.fetch_accumulator_state(slot).await? { - Some(state) => state, - None => return Ok(()), - }; + let accumulator_messages = self.storage.fetch_accumulator_messages(slot).await?; + let wormhole_merkle_state = self.storage.fetch_wormhole_merkle_state(slot).await?; - let completed_state = state.try_into(); - let completed_state: CompletedAccumulatorState = match completed_state { - Ok(completed_state) => completed_state, - Err(_) => { - return Ok(()); - } - }; + let (accumulator_messages, wormhole_merkle_state) = + match (accumulator_messages, wormhole_merkle_state) { + (Some(accumulator_messages), Some(wormhole_merkle_state)) => { + (accumulator_messages, wormhole_merkle_state) + } + _ => return Ok(()), + }; // Once the accumulator reaches a complete state for a specific slot // we can build the message states - self.build_message_states(completed_state).await?; + self.build_message_states(accumulator_messages, wormhole_merkle_state) + .await?; self.update_tx.send(()).await?; @@ -187,15 +193,18 @@ impl Store { Ok(()) } - async fn build_message_states(&self, completed_state: CompletedAccumulatorState) -> Result<()> { + async fn build_message_states( + &self, + accumulator_messages: AccumulatorMessages, + wormhole_merkle_state: WormholeMerkleState, + ) -> Result<()> { let wormhole_merkle_message_states_proofs = - construct_message_states_proofs(&completed_state)?; + construct_message_states_proofs(&accumulator_messages, &wormhole_merkle_state)?; let current_time: UnixTimestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as _; - let message_states = completed_state - .accumulator_messages + let message_states = accumulator_messages .raw_messages .into_iter() .enumerate() @@ -210,7 +219,7 @@ impl Store { .ok_or(anyhow!("Missing proof for message"))? .clone(), }, - completed_state.slot, + accumulator_messages.slot, current_time, )) }) diff --git a/hermes/src/store/proof/wormhole_merkle.rs b/hermes/src/store/proof/wormhole_merkle.rs index 99b5d2a0..1d6fe5c5 100644 --- a/hermes/src/store/proof/wormhole_merkle.rs +++ b/hermes/src/store/proof/wormhole_merkle.rs @@ -1,9 +1,7 @@ use { crate::store::{ - storage::{ - CompletedAccumulatorState, - MessageState, - }, + storage::MessageState, + types::AccumulatorMessages, Store, }, anyhow::{ @@ -50,26 +48,18 @@ pub async fn store_wormhole_merkle_verified_message( ) -> Result<()> { store .storage - .update_accumulator_state( - root.slot, - Box::new(|mut state| { - state.wormhole_merkle_state = Some(WormholeMerkleState { - root, - vaa: vaa_bytes, - }); - state - }), - ) + .store_wormhole_merkle_state(WormholeMerkleState { + root, + vaa: vaa_bytes, + }) .await?; Ok(()) } pub fn construct_message_states_proofs( - completed_accumulator_state: &CompletedAccumulatorState, + accumulator_messages: &AccumulatorMessages, + wormhole_merkle_state: &WormholeMerkleState, ) -> Result> { - let accumulator_messages = &completed_accumulator_state.accumulator_messages; - let wormhole_merkle_state = &completed_accumulator_state.wormhole_merkle_state; - // Check whether the state is valid let merkle_acc = match MerkleTree::::from_set( accumulator_messages.raw_messages.iter().map(|m| m.as_ref()), diff --git a/hermes/src/store/storage.rs b/hermes/src/store/storage.rs index 1d1b10af..457ee3ea 100644 --- a/hermes/src/store/storage.rs +++ b/hermes/src/store/storage.rs @@ -14,48 +14,21 @@ use { anyhow, Result, }, - async_trait::async_trait, + dashmap::DashMap, pythnet_sdk::messages::{ FeedId, Message, MessageType, }, + std::{ + collections::BTreeMap, + ops::Bound, + sync::Arc, + }, + strum::IntoEnumIterator, + tokio::sync::RwLock, }; -pub mod local_storage; - -#[derive(Clone, PartialEq, Debug)] -pub struct AccumulatorState { - pub slot: Slot, - pub accumulator_messages: Option, - pub wormhole_merkle_state: Option, -} - -#[derive(Clone, PartialEq, Debug)] -pub struct CompletedAccumulatorState { - pub slot: Slot, - pub accumulator_messages: AccumulatorMessages, - pub wormhole_merkle_state: WormholeMerkleState, -} - -impl TryFrom for CompletedAccumulatorState { - type Error = anyhow::Error; - - fn try_from(state: AccumulatorState) -> Result { - let accumulator_messages = state - .accumulator_messages - .ok_or_else(|| anyhow!("missing accumulator messages"))?; - let wormhole_merkle_state = state - .wormhole_merkle_state - .ok_or_else(|| anyhow!("missing wormhole merkle state"))?; - Ok(Self { - slot: state.slot, - accumulator_messages, - wormhole_merkle_state, - }) - } -} - #[derive(Clone, PartialEq, Eq, Debug, Hash)] pub struct MessageStateKey { pub feed_id: FeedId, @@ -116,124 +89,656 @@ pub enum MessageStateFilter { Only(MessageType), } -/// This trait defines the interface for update data storage -/// -/// Price update data for Pyth can come in multiple formats, for example VAA's and -/// Merkle proofs. The abstraction therefore allows storing these as binary -/// data to abstract the details of the update data, and so each update data is stored -/// under a separate key. The caller is responsible for specifying the right -/// key for the update data they wish to access. -#[async_trait] -pub trait Storage: Send + Sync { - async fn message_state_keys(&self) -> Vec; - async fn store_message_states(&self, message_states: Vec) -> Result<()>; - async fn fetch_message_states( +pub struct Storage { + message_cache: Arc>>, + /// Accumulator messages cache + /// + /// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap. + accumulator_messages_cache: Arc>>, + /// Wormhole merkle state cache + /// + /// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap. + wormhole_merkle_state_cache: Arc>>, + cache_size: u64, +} + +impl Storage { + pub fn new(cache_size: u64) -> Self { + Self { + message_cache: Arc::new(DashMap::new()), + accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())), + wormhole_merkle_state_cache: Arc::new(RwLock::new(BTreeMap::new())), + cache_size, + } + } + + pub async fn message_state_keys(&self) -> Vec { + self.message_cache + .iter() + .map(|entry| entry.key().clone()) + .collect::>() + } + + pub async fn store_message_states(&self, message_states: Vec) -> Result<()> { + for message_state in message_states { + let key = message_state.key(); + let time = message_state.time(); + let mut cache = self.message_cache.entry(key).or_insert_with(BTreeMap::new); + + cache.insert(time, message_state); + + // Remove the earliest message states if the cache size is exceeded + while cache.len() > self.cache_size as usize { + cache.pop_first(); + } + } + Ok(()) + } + + fn retrieve_message_state( + &self, + key: MessageStateKey, + request_time: RequestTime, + ) -> Option { + match self.message_cache.get(&key) { + Some(key_cache) => { + match request_time { + RequestTime::Latest => key_cache.last_key_value().map(|(_, v)| v).cloned(), + RequestTime::FirstAfter(time) => { + // If the requested time is before the first element in the vector, we are + // not sure that the first element is the closest one. + if let Some((_, oldest_record_value)) = key_cache.first_key_value() { + if time < oldest_record_value.time().publish_time { + return None; + } + } + + let lookup_time = MessageStateTime { + publish_time: time, + slot: 0, + }; + + // Get the first element that is greater than or equal to the lookup time. + key_cache + .lower_bound(Bound::Included(&lookup_time)) + .value() + .cloned() + } + } + } + None => None, + } + } + + pub async fn fetch_message_states( &self, ids: Vec, request_time: RequestTime, filter: MessageStateFilter, - ) -> Result>; + ) -> Result> { + ids.into_iter() + .flat_map(|id| { + let request_time = request_time.clone(); + let message_types: Vec = match filter { + MessageStateFilter::All => MessageType::iter().collect(), + MessageStateFilter::Only(t) => vec![t], + }; - /// Store the accumulator state. Please note that this call will replace the - /// existing accumulator state for the given state's slot. If you wish to - /// update the accumulator state, use `update_accumulator_state` instead. - async fn store_accumulator_state(&self, state: AccumulatorState) -> Result<()>; - async fn fetch_accumulator_state(&self, slot: Slot) -> Result>; + message_types.into_iter().map(move |message_type| { + let key = MessageStateKey { + feed_id: id, + type_: message_type, + }; + self.retrieve_message_state(key, request_time.clone()) + .ok_or(anyhow!("Message not found")) + }) + }) + .collect() + } - /// Update the accumulator state inplace using the provided callback. The callback - /// takes the current state and returns the new state. If there is no accumulator - /// state for the given slot, the callback will be called with an empty accumulator state. - async fn update_accumulator_state( + pub async fn store_accumulator_messages( + &self, + accumulator_messages: AccumulatorMessages, + ) -> Result<()> { + let mut cache = self.accumulator_messages_cache.write().await; + cache.insert(accumulator_messages.slot, accumulator_messages); + while cache.len() > self.cache_size as usize { + cache.pop_first(); + } + Ok(()) + } + + pub async fn fetch_accumulator_messages( &self, slot: Slot, - callback: Box AccumulatorState) + Send>, - ) -> Result<()>; -} + ) -> Result> { + let cache = self.accumulator_messages_cache.read().await; + Ok(cache.get(&slot).cloned()) + } -pub type StorageInstance = Box; + pub async fn store_wormhole_merkle_state( + &self, + wormhole_merkle_state: WormholeMerkleState, + ) -> Result<()> { + let mut cache = self.wormhole_merkle_state_cache.write().await; + cache.insert(wormhole_merkle_state.root.slot, wormhole_merkle_state); + while cache.len() > self.cache_size as usize { + cache.pop_first(); + } + Ok(()) + } + + pub async fn fetch_wormhole_merkle_state( + &self, + slot: Slot, + ) -> Result> { + let cache = self.wormhole_merkle_state_cache.read().await; + Ok(cache.get(&slot).cloned()) + } +} #[cfg(test)] mod test { use { super::*, - pythnet_sdk::wire::v1::WormholeMerkleRoot, + crate::store::{ + proof::wormhole_merkle::{ + WormholeMerkleMessageProof, + WormholeMerkleState, + }, + types::{ + AccumulatorMessages, + ProofSet, + }, + }, + pyth_sdk::UnixTimestamp, + pythnet_sdk::{ + accumulators::merkle::MerklePath, + hashers::keccak256_160::Keccak160, + messages::{ + Message, + PriceFeedMessage, + }, + wire::v1::WormholeMerkleRoot, + }, }; - #[test] - pub fn test_complete_accumulator_state_try_from_accumulator_state_works() { - let accumulator_state = AccumulatorState { - slot: 1, - accumulator_messages: None, - wormhole_merkle_state: None, - }; - - assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err()); - - let accumulator_state = AccumulatorState { - slot: 1, - accumulator_messages: Some(AccumulatorMessages { - slot: 1, - magic: [0; 4], - ring_size: 10, - raw_messages: vec![], + pub fn create_dummy_price_feed_message_state( + feed_id: FeedId, + publish_time: i64, + slot: Slot, + ) -> MessageState { + MessageState { + slot, + raw_message: vec![], + message: Message::PriceFeedMessage(PriceFeedMessage { + feed_id, + publish_time, + price: 1, + conf: 2, + exponent: 3, + ema_price: 4, + ema_conf: 5, + prev_publish_time: 6, }), - wormhole_merkle_state: None, - }; - - assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err()); - - let accumulator_state = AccumulatorState { - slot: 1, - accumulator_messages: None, - wormhole_merkle_state: Some(WormholeMerkleState { - vaa: vec![], - root: WormholeMerkleRoot { - slot: 1, - ring_size: 10, - root: [0; 20], + received_at: publish_time, + proof_set: ProofSet { + wormhole_merkle_proof: WormholeMerkleMessageProof { + vaa: vec![], + proof: MerklePath::::new(vec![]), }, - }), - }; + }, + } + } - assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err()); + pub async fn create_and_store_dummy_price_feed_message_state( + storage: &Storage, + feed_id: FeedId, + publish_time: UnixTimestamp, + slot: Slot, + ) -> MessageState { + let message_state = create_dummy_price_feed_message_state(feed_id, publish_time, slot); + storage + .store_message_states(vec![message_state.clone()]) + .await + .unwrap(); + message_state + } - let accumulator_state = AccumulatorState { - slot: 1, - accumulator_messages: Some(AccumulatorMessages { - slot: 1, - magic: [0; 4], - ring_size: 10, - raw_messages: vec![], - }), - wormhole_merkle_state: Some(WormholeMerkleState { - vaa: vec![], - root: WormholeMerkleRoot { - slot: 1, - ring_size: 10, - root: [0; 20], - }, - }), - }; + #[tokio::test] + pub async fn test_store_and_retrieve_latest_message_state_works() { + // Initialize a storage with a cache size of 2 per key. + let storage = Storage::new(2); + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // The latest message state should be the one we just stored. assert_eq!( - CompletedAccumulatorState::try_from(accumulator_state).unwrap(), - CompletedAccumulatorState { - slot: 1, - accumulator_messages: AccumulatorMessages { - slot: 1, - magic: [0; 4], - ring_size: 10, - raw_messages: vec![], - }, - wormhole_merkle_state: WormholeMerkleState { - vaa: vec![], - root: WormholeMerkleRoot { - slot: 1, - ring_size: 10, - root: [0; 20], - }, - }, - } + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage), + ) + .await + .unwrap(), + vec![message_state] ); } + + #[tokio::test] + pub async fn test_store_and_retrieve_latest_message_state_with_multiple_update_works() { + // Initialize a storage with a cache size of 2 per key. + let storage = Storage::new(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let _old_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 20 at slot 10. + let new_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await; + + // The latest message state should be the one with publish time 20. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .unwrap(), + vec![new_message_state] + ); + } + + #[tokio::test] + pub async fn test_store_and_retrieve_latest_message_state_with_out_of_order_update_works() { + // Initialize a storage with a cache size of 2 per key. + let storage = Storage::new(2); + + // Create and store a message state with feed id [1....] and publish time 20 at slot 10. + let new_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await; + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let _old_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // The latest message state should be the one with publish time 20. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .unwrap(), + vec![new_message_state] + ); + } + + #[tokio::test] + pub async fn test_store_and_retrieve_first_after_message_state_works() { + // Initialize a storage with a cache size of 2 per key. + let storage = Storage::new(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let old_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 13 at slot 10. + let new_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; + + // The first message state after time 10 should be the old message state. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(10), + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .unwrap(), + vec![old_message_state] + ); + + // Querying the first after pub time 11, 12, 13 should all return the new message state. + for request_time in 11..14 { + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(request_time), + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .unwrap(), + vec![new_message_state.clone()] + ); + } + } + + #[tokio::test] + pub async fn test_store_and_retrieve_latest_message_state_with_same_pubtime_works() { + // Initialize a storage with a cache size of 2 per key. + let storage = Storage::new(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let slightly_older_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 10 at slot 7. + let slightly_newer_message_state = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 7).await; + + // The latest message state should be the one with the higher slot. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage), + ) + .await + .unwrap(), + vec![slightly_newer_message_state] + ); + + // Querying the first message state after time 10 should return the one with the lower slot. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(10), + MessageStateFilter::Only(MessageType::PriceFeedMessage), + ) + .await + .unwrap(), + vec![slightly_older_message_state] + ); + } + + + #[tokio::test] + pub async fn test_store_and_retrieve_first_after_message_state_fails_for_past_time() { + // Initialize a storage with a cache size of 2 per key. + let storage = Storage::new(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 13 at slot 10. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; + + // Query the message state before the available times should return an error. + // This is because we are not sure that the first available message is really the first. + assert!(storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(9), + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .is_err()); + } + + #[tokio::test] + pub async fn test_store_and_retrieve_first_after_message_state_fails_for_future_time() { + // Initialize a storage with a cache size of 2 per key. + let storage = Storage::new(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 13 at slot 10. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; + + // Query the message state after the available times should return an error. + assert!(storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(14), + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .is_err()); + } + + #[tokio::test] + pub async fn test_store_more_message_states_than_cache_size_evicts_old_messages() { + // Initialize a storage with a cache size of 2 per key. + let storage = Storage::new(2); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [1....] and publish time 13 at slot 10. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; + + // Create and store a message state with feed id [1....] and publish time 20 at slot 14. + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 14).await; + + // The message at time 10 should be evicted and querying for it should return an error. + assert!(storage + .fetch_message_states( + vec![[1; 32]], + RequestTime::FirstAfter(10), + MessageStateFilter::Only(MessageType::PriceFeedMessage) + ) + .await + .is_err()); + } + + #[tokio::test] + pub async fn test_store_and_fetch_multiple_message_feed_ids_works() { + // Initialize a storage with a cache size of 1 per key. + let storage = Storage::new(1); + + // Create and store a message state with feed id [1....] and publish time 10 at slot 5. + let message_state_1 = + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Create and store a message state with feed id [2....] and publish time 13 at slot 10. + let message_state_2 = + create_and_store_dummy_price_feed_message_state(&storage, [2; 32], 10, 5).await; + + // Check both message states can be retrieved. + assert_eq!( + storage + .fetch_message_states( + vec![[1; 32], [2; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage), + ) + .await + .unwrap(), + vec![message_state_1, message_state_2] + ); + } + + #[tokio::test] + pub async fn test_fetch_not_existent_message_fails() { + // Initialize a storage with a cache size of 2 per key. + let storage = Storage::new(2); + + create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; + + // Check both message states can be retrieved. + assert!(storage + .fetch_message_states( + vec![[2; 32]], + RequestTime::Latest, + MessageStateFilter::Only(MessageType::PriceFeedMessage), + ) + .await + .is_err()); + } + + pub fn create_empty_accumulator_messages_at_slot(slot: Slot) -> AccumulatorMessages { + AccumulatorMessages { + magic: [0; 4], + slot, + ring_size: 3, + raw_messages: vec![], + } + } + + #[tokio::test] + pub async fn test_store_and_fetch_accumulator_messages_works() { + // Initialize a storage with a cache size of 2 per key and the accumulator state. + let storage = Storage::new(2); + + // Make sure the retrieved accumulator messages is what we store. + let mut accumulator_messages_at_10 = create_empty_accumulator_messages_at_slot(10); + storage + .store_accumulator_messages(accumulator_messages_at_10.clone()) + .await + .unwrap(); + assert_eq!( + storage + .fetch_accumulator_messages(10) + .await + .unwrap() + .unwrap(), + accumulator_messages_at_10 + ); + + // Make sure overwriting the accumulator messages works. + accumulator_messages_at_10.ring_size = 5; // Change the ring size from 3 to 5. + storage + .store_accumulator_messages(accumulator_messages_at_10.clone()) + .await + .unwrap(); + assert_eq!( + storage + .fetch_accumulator_messages(10) + .await + .unwrap() + .unwrap(), + accumulator_messages_at_10 + ); + + // Create and store an accumulator messages with slot 5 and check it's stored. + let accumulator_messages_at_5 = create_empty_accumulator_messages_at_slot(5); + storage + .store_accumulator_messages(accumulator_messages_at_5.clone()) + .await + .unwrap(); + assert_eq!( + storage + .fetch_accumulator_messages(5) + .await + .unwrap() + .unwrap(), + accumulator_messages_at_5 + ); + + // Add a newer accumulator messages with slot 15 to exceed cache size and make sure the earliest is evicted. + let accumulator_messages_at_15 = create_empty_accumulator_messages_at_slot(15); + storage + .store_accumulator_messages(accumulator_messages_at_15.clone()) + .await + .unwrap(); + assert_eq!( + storage + .fetch_accumulator_messages(15) + .await + .unwrap() + .unwrap(), + accumulator_messages_at_15 + ); + assert!(storage + .fetch_accumulator_messages(5) + .await + .unwrap() + .is_none()); + } + + pub fn create_empty_wormhole_merkle_state_at_slot(slot: Slot) -> WormholeMerkleState { + WormholeMerkleState { + vaa: vec![], + root: WormholeMerkleRoot { + slot, + root: [0; 20], + ring_size: 3, + }, + } + } + + #[tokio::test] + pub async fn test_store_and_fetch_wormhole_merkle_state_works() { + // Initialize a storage with a cache size of 2 per key and the accumulator state. + let storage = Storage::new(2); + + // Make sure the retrieved wormhole merkle state is what we store + let mut wormhole_merkle_state_at_10 = create_empty_wormhole_merkle_state_at_slot(10); + storage + .store_wormhole_merkle_state(wormhole_merkle_state_at_10.clone()) + .await + .unwrap(); + assert_eq!( + storage + .fetch_wormhole_merkle_state(10) + .await + .unwrap() + .unwrap(), + wormhole_merkle_state_at_10 + ); + + // Make sure overwriting the wormhole merkle state works. + wormhole_merkle_state_at_10.root.ring_size = 5; // Change the ring size from 3 to 5. + storage + .store_wormhole_merkle_state(wormhole_merkle_state_at_10.clone()) + .await + .unwrap(); + assert_eq!( + storage + .fetch_wormhole_merkle_state(10) + .await + .unwrap() + .unwrap(), + wormhole_merkle_state_at_10 + ); + + // Create and store an wormhole merkle state with slot 5 and check it's stored. + let wormhole_merkle_state_at_5 = create_empty_wormhole_merkle_state_at_slot(5); + storage + .store_wormhole_merkle_state(wormhole_merkle_state_at_5.clone()) + .await + .unwrap(); + assert_eq!( + storage + .fetch_wormhole_merkle_state(5) + .await + .unwrap() + .unwrap(), + wormhole_merkle_state_at_5 + ); + + // Add a newer wormhole merkle state with slot 15 to exceed cache size and make sure the earliest is evicted. + let wormhole_merkle_state_at_15 = create_empty_wormhole_merkle_state_at_slot(15); + storage + .store_wormhole_merkle_state(wormhole_merkle_state_at_15.clone()) + .await + .unwrap(); + assert_eq!( + storage + .fetch_wormhole_merkle_state(15) + .await + .unwrap() + .unwrap(), + wormhole_merkle_state_at_15 + ); + assert!(storage + .fetch_wormhole_merkle_state(5) + .await + .unwrap() + .is_none()); + } } diff --git a/hermes/src/store/storage/local_storage.rs b/hermes/src/store/storage/local_storage.rs deleted file mode 100644 index c081dde2..00000000 --- a/hermes/src/store/storage/local_storage.rs +++ /dev/null @@ -1,862 +0,0 @@ -use { - super::{ - AccumulatorState, - MessageState, - MessageStateFilter, - MessageStateKey, - MessageStateTime, - RequestTime, - Storage, - StorageInstance, - }, - crate::store::types::Slot, - anyhow::{ - anyhow, - Result, - }, - async_trait::async_trait, - dashmap::DashMap, - pythnet_sdk::messages::{ - FeedId, - MessageType, - }, - std::{ - collections::VecDeque, - sync::Arc, - }, - strum::IntoEnumIterator, - tokio::sync::RwLock, -}; - -#[derive(Clone)] -pub struct LocalStorage { - message_cache: Arc>>, - accumulator_cache: Arc>>, - cache_size: u64, -} - -impl LocalStorage { - pub fn new_instance(cache_size: u64) -> StorageInstance { - Box::new(Self { - message_cache: Arc::new(DashMap::new()), - accumulator_cache: Arc::new(RwLock::new(VecDeque::new())), - cache_size, - }) - } - - fn retrieve_message_state( - &self, - key: MessageStateKey, - request_time: RequestTime, - ) -> Option { - match self.message_cache.get(&key) { - Some(key_cache) => { - match request_time { - RequestTime::Latest => key_cache.back().cloned(), - RequestTime::FirstAfter(time) => { - // If the requested time is before the first element in the vector, we are - // not sure that the first element is the closest one. - if let Some(oldest_record) = key_cache.front() { - if time < oldest_record.time().publish_time { - return None; - } - } - - let lookup_time = MessageStateTime { - publish_time: time, - slot: 0, - }; - - // Binary search returns Ok(idx) if the element is found at index idx or Err(idx) if it's not - // found which idx is the index where the element should be inserted to keep the vector sorted. - // Getting idx within any of the match arms will give us the index of the element that is - // closest after or equal to the requested time. - let idx = match key_cache - .binary_search_by_key(&lookup_time, |record| record.time()) - { - Ok(idx) => idx, - Err(idx) => idx, - }; - - // We are using `get` to handle out of bound idx. This happens if the - // requested time is after the last element in the vector. - key_cache.get(idx).cloned() - } - } - } - None => None, - } - } - - /// Store the accumulator state in the cache assuming that the lock is already acquired. - fn store_accumulator_state_impl( - &self, - state: AccumulatorState, - cache: &mut VecDeque, - ) { - cache.push_back(state); - - let mut i = cache.len().saturating_sub(1); - while i > 0 && cache[i - 1].slot > cache[i].slot { - cache.swap(i - 1, i); - i -= 1; - } - - if cache.len() > self.cache_size as usize { - cache.pop_front(); - } - } -} - -#[async_trait] -impl Storage for LocalStorage { - /// Add a new db entry to the cache. - /// - /// This method keeps the backed store sorted for efficiency, and removes - /// the oldest record in the cache if the max_size is reached. Entries are - /// usually added in increasing order and likely to be inserted near the - /// end of the deque. The function is optimized for this specific case. - async fn store_message_states(&self, message_states: Vec) -> Result<()> { - for message_state in message_states { - let key = message_state.key(); - - let mut key_cache = self.message_cache.entry(key).or_insert_with(VecDeque::new); - - key_cache.push_back(message_state); - - // Shift the pushed record until it's in the right place. - let mut i = key_cache.len().saturating_sub(1); - while i > 0 && key_cache[i - 1].time() > key_cache[i].time() { - key_cache.swap(i - 1, i); - i -= 1; - } - - // FIXME remove equal elements by key and time - - // Remove the oldest record if the max size is reached. - if key_cache.len() > self.cache_size as usize { - key_cache.pop_front(); - } - } - - Ok(()) - } - - async fn fetch_message_states( - &self, - ids: Vec, - request_time: RequestTime, - filter: MessageStateFilter, - ) -> Result> { - ids.into_iter() - .flat_map(|id| { - let request_time = request_time.clone(); - let message_types: Vec = match filter { - MessageStateFilter::All => MessageType::iter().collect(), - MessageStateFilter::Only(t) => vec![t], - }; - - message_types.into_iter().map(move |message_type| { - let key = MessageStateKey { - feed_id: id, - type_: message_type, - }; - self.retrieve_message_state(key, request_time.clone()) - .ok_or(anyhow!("Message not found")) - }) - }) - .collect() - } - - async fn message_state_keys(&self) -> Vec { - self.message_cache - .iter() - .map(|entry| entry.key().clone()) - .collect() - } - - async fn store_accumulator_state(&self, state: super::AccumulatorState) -> Result<()> { - let mut accumulator_cache = self.accumulator_cache.write().await; - self.store_accumulator_state_impl(state, &mut accumulator_cache); - Ok(()) - } - - async fn fetch_accumulator_state(&self, slot: Slot) -> Result> { - let accumulator_cache = self.accumulator_cache.read().await; - match accumulator_cache.binary_search_by_key(&slot, |state| state.slot) { - Ok(idx) => Ok(accumulator_cache.get(idx).cloned()), - Err(_) => Ok(None), - } - } - - async fn update_accumulator_state( - &self, - slot: Slot, - callback: Box AccumulatorState) + Send>, - ) -> Result<()> { - let mut accumulator_cache = self.accumulator_cache.write().await; - match accumulator_cache.binary_search_by_key(&slot, |state| state.slot) { - Ok(idx) => { - let state = accumulator_cache.get_mut(idx).unwrap(); - *state = callback(state.clone()); - } - Err(_) => { - let state = callback(AccumulatorState { - slot, - accumulator_messages: None, - wormhole_merkle_state: None, - }); - self.store_accumulator_state_impl(state, &mut accumulator_cache); - } - } - - Ok(()) - } -} - -#[cfg(test)] -mod test { - use { - super::*, - crate::store::{ - proof::wormhole_merkle::{ - WormholeMerkleMessageProof, - WormholeMerkleState, - }, - types::{ - AccumulatorMessages, - ProofSet, - }, - }, - futures::future::join_all, - pyth_sdk::UnixTimestamp, - pythnet_sdk::{ - accumulators::merkle::MerklePath, - hashers::keccak256_160::Keccak160, - messages::{ - Message, - PriceFeedMessage, - }, - wire::v1::WormholeMerkleRoot, - }, - }; - - pub fn create_dummy_price_feed_message_state( - feed_id: FeedId, - publish_time: i64, - slot: Slot, - ) -> MessageState { - MessageState { - slot, - raw_message: vec![], - message: Message::PriceFeedMessage(PriceFeedMessage { - feed_id, - publish_time, - price: 1, - conf: 2, - exponent: 3, - ema_price: 4, - ema_conf: 5, - prev_publish_time: 6, - }), - received_at: publish_time, - proof_set: ProofSet { - wormhole_merkle_proof: WormholeMerkleMessageProof { - vaa: vec![], - proof: MerklePath::::new(vec![]), - }, - }, - } - } - - pub async fn create_and_store_dummy_price_feed_message_state( - storage: &StorageInstance, - feed_id: FeedId, - publish_time: UnixTimestamp, - slot: Slot, - ) -> MessageState { - let message_state = create_dummy_price_feed_message_state(feed_id, publish_time, slot); - storage - .store_message_states(vec![message_state.clone()]) - .await - .unwrap(); - message_state - } - - #[tokio::test] - pub async fn test_store_and_retrieve_latest_message_state_works() { - // Initialize a storage with a cache size of 2 per key. - let storage = LocalStorage::new_instance(2); - - // Create and store a message state with feed id [1....] and publish time 10 at slot 5. - let message_state = - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; - - // The latest message state should be the one we just stored. - assert_eq!( - storage - .fetch_message_states( - vec![[1; 32]], - RequestTime::Latest, - MessageStateFilter::Only(MessageType::PriceFeedMessage), - ) - .await - .unwrap(), - vec![message_state] - ); - } - - #[tokio::test] - pub async fn test_store_and_retrieve_latest_message_state_with_multiple_update_works() { - // Initialize a storage with a cache size of 2 per key. - let storage = LocalStorage::new_instance(2); - - // Create and store a message state with feed id [1....] and publish time 10 at slot 5. - let _old_message_state = - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; - - // Create and store a message state with feed id [1....] and publish time 20 at slot 10. - let new_message_state = - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await; - - // The latest message state should be the one with publish time 20. - assert_eq!( - storage - .fetch_message_states( - vec![[1; 32]], - RequestTime::Latest, - MessageStateFilter::Only(MessageType::PriceFeedMessage) - ) - .await - .unwrap(), - vec![new_message_state] - ); - } - - #[tokio::test] - pub async fn test_store_and_retrieve_latest_message_state_with_out_of_order_update_works() { - // Initialize a storage with a cache size of 2 per key. - let storage = LocalStorage::new_instance(2); - - // Create and store a message state with feed id [1....] and publish time 20 at slot 10. - let new_message_state = - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 10).await; - - // Create and store a message state with feed id [1....] and publish time 10 at slot 5. - let _old_message_state = - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; - - // The latest message state should be the one with publish time 20. - assert_eq!( - storage - .fetch_message_states( - vec![[1; 32]], - RequestTime::Latest, - MessageStateFilter::Only(MessageType::PriceFeedMessage) - ) - .await - .unwrap(), - vec![new_message_state] - ); - } - - #[tokio::test] - pub async fn test_store_and_retrieve_first_after_message_state_works() { - // Initialize a storage with a cache size of 2 per key. - let storage = LocalStorage::new_instance(2); - - // Create and store a message state with feed id [1....] and publish time 10 at slot 5. - let old_message_state = - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; - - // Create and store a message state with feed id [1....] and publish time 13 at slot 10. - let new_message_state = - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; - - // The first message state after time 10 should be the old message state. - assert_eq!( - storage - .fetch_message_states( - vec![[1; 32]], - RequestTime::FirstAfter(10), - MessageStateFilter::Only(MessageType::PriceFeedMessage) - ) - .await - .unwrap(), - vec![old_message_state] - ); - - // Querying the first after pub time 11, 12, 13 should all return the new message state. - for request_time in 11..14 { - assert_eq!( - storage - .fetch_message_states( - vec![[1; 32]], - RequestTime::FirstAfter(request_time), - MessageStateFilter::Only(MessageType::PriceFeedMessage) - ) - .await - .unwrap(), - vec![new_message_state.clone()] - ); - } - } - - #[tokio::test] - pub async fn test_store_and_retrieve_latest_message_state_with_same_pubtime_works() { - // Initialize a storage with a cache size of 2 per key. - let storage = LocalStorage::new_instance(2); - - // Create and store a message state with feed id [1....] and publish time 10 at slot 5. - let slightly_older_message_state = - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; - - // Create and store a message state with feed id [1....] and publish time 10 at slot 7. - let slightly_newer_message_state = - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 7).await; - - // The latest message state should be the one with the higher slot. - assert_eq!( - storage - .fetch_message_states( - vec![[1; 32]], - RequestTime::Latest, - MessageStateFilter::Only(MessageType::PriceFeedMessage), - ) - .await - .unwrap(), - vec![slightly_newer_message_state] - ); - - // Querying the first message state after time 10 should return the one with the lower slot. - assert_eq!( - storage - .fetch_message_states( - vec![[1; 32]], - RequestTime::FirstAfter(10), - MessageStateFilter::Only(MessageType::PriceFeedMessage), - ) - .await - .unwrap(), - vec![slightly_older_message_state] - ); - } - - - #[tokio::test] - pub async fn test_store_and_retrieve_first_after_message_state_fails_for_past_time() { - // Initialize a storage with a cache size of 2 per key. - let storage = LocalStorage::new_instance(2); - - // Create and store a message state with feed id [1....] and publish time 10 at slot 5. - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; - - // Create and store a message state with feed id [1....] and publish time 13 at slot 10. - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; - - // Query the message state before the available times should return an error. - // This is because we are not sure that the first available message is really the first. - assert!(storage - .fetch_message_states( - vec![[1; 32]], - RequestTime::FirstAfter(9), - MessageStateFilter::Only(MessageType::PriceFeedMessage) - ) - .await - .is_err()); - } - - #[tokio::test] - pub async fn test_store_and_retrieve_first_after_message_state_fails_for_future_time() { - // Initialize a storage with a cache size of 2 per key. - let storage = LocalStorage::new_instance(2); - - // Create and store a message state with feed id [1....] and publish time 10 at slot 5. - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; - - // Create and store a message state with feed id [1....] and publish time 13 at slot 10. - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; - - // Query the message state after the available times should return an error. - assert!(storage - .fetch_message_states( - vec![[1; 32]], - RequestTime::FirstAfter(14), - MessageStateFilter::Only(MessageType::PriceFeedMessage) - ) - .await - .is_err()); - } - - #[tokio::test] - pub async fn test_store_more_message_states_than_cache_size_evicts_old_messages() { - // Initialize a storage with a cache size of 2 per key. - let storage = LocalStorage::new_instance(2); - - // Create and store a message state with feed id [1....] and publish time 10 at slot 5. - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; - - // Create and store a message state with feed id [1....] and publish time 13 at slot 10. - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 13, 10).await; - - // Create and store a message state with feed id [1....] and publish time 20 at slot 14. - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 20, 14).await; - - // The message at time 10 should be evicted and querying for it should return an error. - assert!(storage - .fetch_message_states( - vec![[1; 32]], - RequestTime::FirstAfter(10), - MessageStateFilter::Only(MessageType::PriceFeedMessage) - ) - .await - .is_err()); - } - - #[tokio::test] - pub async fn test_store_and_receive_multiple_message_feed_ids_works() { - // Initialize a storage with a cache size of 1 per key. - let storage = LocalStorage::new_instance(1); - - // Create and store a message state with feed id [1....] and publish time 10 at slot 5. - let message_state_1 = - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; - - // Create and store a message state with feed id [2....] and publish time 13 at slot 10. - let message_state_2 = - create_and_store_dummy_price_feed_message_state(&storage, [2; 32], 10, 5).await; - - // Check both message states can be retrieved. - assert_eq!( - storage - .fetch_message_states( - vec![[1; 32], [2; 32]], - RequestTime::Latest, - MessageStateFilter::Only(MessageType::PriceFeedMessage), - ) - .await - .unwrap(), - vec![message_state_1, message_state_2] - ); - } - - #[tokio::test] - pub async fn test_receive_not_existent_message_fails() { - // Initialize a storage with a cache size of 2 per key. - let storage = LocalStorage::new_instance(2); - - create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await; - - // Check both message states can be retrieved. - assert!(storage - .fetch_message_states( - vec![[2; 32]], - RequestTime::Latest, - MessageStateFilter::Only(MessageType::PriceFeedMessage), - ) - .await - .is_err()); - } - - pub fn create_empty_accumulator_state_at_slot(slot: Slot) -> AccumulatorState { - AccumulatorState { - slot, - accumulator_messages: None, - wormhole_merkle_state: None, - } - } - - pub async fn create_and_store_empty_accumulator_state_at_slot( - storage: &StorageInstance, - slot: Slot, - ) -> AccumulatorState { - let accumulator_state = create_empty_accumulator_state_at_slot(slot); - storage - .store_accumulator_state(accumulator_state.clone()) - .await - .unwrap(); - accumulator_state - } - - #[tokio::test] - pub async fn test_store_and_receive_accumulator_state_works() { - // Initialize a storage with a cache size of 2 per key and the accumulator state. - let storage = LocalStorage::new_instance(2); - - // Create and store an accumulator state with slot 10. - let accumulator_state = - create_and_store_empty_accumulator_state_at_slot(&storage, 10).await; - - // Make sure the retrieved accumulator state is what we stored. - assert_eq!( - storage.fetch_accumulator_state(10).await.unwrap().unwrap(), - accumulator_state - ); - } - - #[tokio::test] - pub async fn test_store_and_receive_accumulator_state_works_on_overwrite() { - // Initialize a storage with a cache size of 2 per key and the accumulator state. - let storage = LocalStorage::new_instance(2); - - // Create and store an accumulator state with slot 10. - let mut accumulator_state = - create_and_store_empty_accumulator_state_at_slot(&storage, 10).await; - - // Retrieve the accumulator state and make sure it is what we stored. - assert_eq!( - storage.fetch_accumulator_state(10).await.unwrap().unwrap(), - accumulator_state - ); - - // Change the state to have accumulator messages - // We mutate the existing state because the normal flow is like this. - accumulator_state.accumulator_messages = Some(AccumulatorMessages { - magic: [0; 4], - slot: 10, - ring_size: 3, - raw_messages: vec![], - }); - - // Store the accumulator state again. - storage - .store_accumulator_state(accumulator_state.clone()) - .await - .unwrap(); - - // Make sure the retrieved accumulator state is what we stored. - assert_eq!( - storage.fetch_accumulator_state(10).await.unwrap().unwrap(), - accumulator_state - ); - } - - #[tokio::test] - pub async fn test_store_and_receive_multiple_accumulator_state_works() { - // Initialize a storage with a cache size of 2 per key and the accumulator state. - let storage = LocalStorage::new_instance(2); - - let accumulator_state_at_slot_10 = - create_and_store_empty_accumulator_state_at_slot(&storage, 10).await; - let accumulator_state_at_slot_20 = - create_and_store_empty_accumulator_state_at_slot(&storage, 20).await; - - // Retrieve the accumulator states and make sure it is what we stored. - assert_eq!( - storage.fetch_accumulator_state(10).await.unwrap().unwrap(), - accumulator_state_at_slot_10 - ); - - assert_eq!( - storage.fetch_accumulator_state(20).await.unwrap().unwrap(), - accumulator_state_at_slot_20 - ); - } - - #[tokio::test] - pub async fn test_store_and_receive_accumulator_state_evicts_cache() { - // Initialize a storage with a cache size of 2 per key and the accumulator state. - let storage = LocalStorage::new_instance(2); - - let _accumulator_state_at_slot_10 = - create_and_store_empty_accumulator_state_at_slot(&storage, 10).await; - let accumulator_state_at_slot_20 = - create_and_store_empty_accumulator_state_at_slot(&storage, 20).await; - let accumulator_state_at_slot_30 = - create_and_store_empty_accumulator_state_at_slot(&storage, 30).await; - - // The accumulator state at slot 10 should be evicted from the cache. - assert_eq!(storage.fetch_accumulator_state(10).await.unwrap(), None); - - // Retrieve the rest of accumulator states and make sure it is what we stored. - assert_eq!( - storage.fetch_accumulator_state(20).await.unwrap().unwrap(), - accumulator_state_at_slot_20 - ); - - assert_eq!( - storage.fetch_accumulator_state(30).await.unwrap().unwrap(), - accumulator_state_at_slot_30 - ); - } - - #[tokio::test] - pub async fn test_update_accumulator_state_works() { - // Initialize a storage with a cache size of 2 per key and the accumulator state. - let storage = LocalStorage::new_instance(2); - - // Create an empty accumulator state at slot 10. - create_and_store_empty_accumulator_state_at_slot(&storage, 10).await; - - // Update the accumulator state with slot 10. - let accumulator_messages = AccumulatorMessages { - magic: [0; 4], - slot: 10, - ring_size: 3, - raw_messages: vec![], - }; - - let accumulator_messages_clone = accumulator_messages.clone(); - - storage - .update_accumulator_state( - 10, - Box::new(|mut accumulator_state| { - accumulator_state.accumulator_messages = Some(accumulator_messages_clone); - accumulator_state - }), - ) - .await - .unwrap(); - - // Make sure the retrieved accumulator state is what we stored. - assert_eq!( - storage.fetch_accumulator_state(10).await.unwrap(), - Some(AccumulatorState { - slot: 10, - accumulator_messages: Some(accumulator_messages), - wormhole_merkle_state: None, - }) - ); - } - - #[tokio::test] - pub async fn test_update_accumulator_state_works_on_nonexistent_state() { - // Initialize an empty storage with a cache size of 2 per key and the accumulator state. - let storage = LocalStorage::new_instance(2); - - // Update the accumulator state with slot 10. - let accumulator_messages = AccumulatorMessages { - magic: [0; 4], - slot: 10, - ring_size: 3, - raw_messages: vec![], - }; - let accumulator_messages_clone = accumulator_messages.clone(); - storage - .update_accumulator_state( - 10, - Box::new(|mut accumulator_state| { - accumulator_state.accumulator_messages = Some(accumulator_messages_clone); - accumulator_state - }), - ) - .await - .unwrap(); - - // Make sure the retrieved accumulator state is what we stored. - assert_eq!( - storage.fetch_accumulator_state(10).await.unwrap(), - Some(AccumulatorState { - slot: 10, - accumulator_messages: Some(accumulator_messages), - wormhole_merkle_state: None, - }) - ); - } - - #[tokio::test] - pub async fn test_update_accumulator_state_works_on_concurrent_updates() { - // Initialize an empty storage with a cache size of 20 per key and the accumulator state. - let storage = LocalStorage::new_instance(20); - - - // Run this check 10 times to make sure the concurrent updates work. - let mut futures = vec![]; - - for slot in 1..10 { - futures.push(storage.update_accumulator_state( - slot, - Box::new(|mut accumulator_state| { - accumulator_state.accumulator_messages = Some(AccumulatorMessages { - magic: [0; 4], - slot: 123, - ring_size: 3, - raw_messages: vec![], - }); - accumulator_state - }), - )); - futures.push(storage.update_accumulator_state( - slot, - Box::new(|mut accumulator_state| { - accumulator_state.wormhole_merkle_state = Some(WormholeMerkleState { - root: WormholeMerkleRoot { - root: [0; 20], - slot: 123, - ring_size: 3, - }, - vaa: vec![], - }); - accumulator_state - }), - )); - } - - join_all(futures).await; - - for slot in 1..10 { - assert_eq!( - storage.fetch_accumulator_state(slot).await.unwrap(), - Some(AccumulatorState { - slot, - accumulator_messages: Some(AccumulatorMessages { - magic: [0; 4], - slot: 123, - ring_size: 3, - raw_messages: vec![], - }), - wormhole_merkle_state: Some(WormholeMerkleState { - root: WormholeMerkleRoot { - root: [0; 20], - slot: 123, - ring_size: 3, - }, - vaa: vec![], - }), - }) - ) - } - } - - #[tokio::test] - pub async fn test_update_accumulator_state_evicts_cache() { - // Initialize a storage with a cache size of 2 per key and the accumulator state. - let storage = LocalStorage::new_instance(2); - - storage - .update_accumulator_state(10, Box::new(|accumulator_state| accumulator_state)) - .await - .unwrap(); - storage - .update_accumulator_state(20, Box::new(|accumulator_state| accumulator_state)) - .await - .unwrap(); - storage - .update_accumulator_state(30, Box::new(|accumulator_state| accumulator_state)) - .await - .unwrap(); - - // The accumulator state at slot 10 should be evicted from the cache. - assert_eq!(storage.fetch_accumulator_state(10).await.unwrap(), None); - - // Retrieve the rest of accumulator states and make sure it is what we stored. - assert_eq!( - storage.fetch_accumulator_state(20).await.unwrap().unwrap(), - AccumulatorState { - slot: 20, - accumulator_messages: None, - wormhole_merkle_state: None, - } - ); - - assert_eq!( - storage.fetch_accumulator_state(30).await.unwrap().unwrap(), - AccumulatorState { - slot: 30, - accumulator_messages: None, - wormhole_merkle_state: None, - } - ); - } -}