diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock index bfa025aa..b902a9a1 100644 --- a/hermes/Cargo.lock +++ b/hermes/Cargo.lock @@ -1659,7 +1659,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermes" -version = "0.1.3" +version = "0.1.4" dependencies = [ "anyhow", "async-trait", @@ -1678,6 +1678,7 @@ dependencies = [ "libc", "libp2p", "log", + "prometheus-client", "pyth-sdk", "pythnet-sdk", "rand 0.8.5", @@ -3635,6 +3636,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus-client" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c2f43e8969d51935d2a7284878ae053ba30034cd563f673cde37ba5205685e" +dependencies = [ + "dtoa", + "itoa", + "parking_lot 0.12.1", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b6a5217beb0ad503ee7fa752d451c905113d70721b937126158f3106a48cc1" +dependencies = [ + "proc-macro2 1.0.56", + "quote 1.0.27", + "syn 1.0.109", +] + [[package]] name = "prost" version = "0.9.0" diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml index 1bfc06eb..ec5f6796 100644 --- a/hermes/Cargo.toml +++ b/hermes/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.1.3" +version = "0.1.4" edition = "2021" [dependencies] @@ -35,6 +35,7 @@ libp2p = { version = "0.42.2", features = [ ]} log = { version = "0.4.17" } +prometheus-client = { version = "0.21.1" } pyth-sdk = { version = "0.7.0" } # Parse Wormhole attester price attestations. diff --git a/hermes/src/store.rs b/hermes/src/store.rs index f8a741b9..ae745db9 100644 --- a/hermes/src/store.rs +++ b/hermes/src/store.rs @@ -19,10 +19,7 @@ use { construct_message_states_proofs, store_wormhole_merkle_verified_message, }, - storage::{ - AccumulatorState, - CompletedAccumulatorState, - }, + storage::CompletedAccumulatorState, types::{ ProofSet, UnixTimestamp, @@ -150,18 +147,14 @@ impl Store { Update::AccumulatorMessages(accumulator_messages) => { let slot = accumulator_messages.slot; log::info!("Storing accumulator messages for slot {:?}.", slot,); - let mut accumulator_state = self - .storage - .fetch_accumulator_state(slot) - .await? - .unwrap_or(AccumulatorState { - slot, - accumulator_messages: None, - wormhole_merkle_state: None, - }); - accumulator_state.accumulator_messages = Some(accumulator_messages); self.storage - .store_accumulator_state(accumulator_state) + .update_accumulator_state( + slot, + Box::new(|mut state| { + state.accumulator_messages = Some(accumulator_messages); + state + }), + ) .await?; slot } diff --git a/hermes/src/store/proof/wormhole_merkle.rs b/hermes/src/store/proof/wormhole_merkle.rs index 7450a55b..99b5d2a0 100644 --- a/hermes/src/store/proof/wormhole_merkle.rs +++ b/hermes/src/store/proof/wormhole_merkle.rs @@ -1,7 +1,6 @@ use { crate::store::{ storage::{ - AccumulatorState, CompletedAccumulatorState, MessageState, }, @@ -49,23 +48,18 @@ pub async fn store_wormhole_merkle_verified_message( root: WormholeMerkleRoot, vaa_bytes: Vec, ) -> Result<()> { - let mut accumulator_state = store - .storage - .fetch_accumulator_state(root.slot) - .await? - .unwrap_or(AccumulatorState { - slot: root.slot, - accumulator_messages: None, - wormhole_merkle_state: None, - }); - - accumulator_state.wormhole_merkle_state = Some(WormholeMerkleState { - root, - vaa: vaa_bytes, - }); store .storage - .store_accumulator_state(accumulator_state) + .update_accumulator_state( + root.slot, + Box::new(|mut state| { + state.wormhole_merkle_state = Some(WormholeMerkleState { + root, + vaa: vaa_bytes, + }); + state + }), + ) .await?; Ok(()) } diff --git a/hermes/src/store/storage.rs b/hermes/src/store/storage.rs index 371d708c..1d1b10af 100644 --- a/hermes/src/store/storage.rs +++ b/hermes/src/store/storage.rs @@ -134,8 +134,20 @@ pub trait Storage: Send + Sync { filter: MessageStateFilter, ) -> Result>; + /// Store the accumulator state. Please note that this call will replace the + /// existing accumulator state for the given state's slot. If you wish to + /// update the accumulator state, use `update_accumulator_state` instead. async fn store_accumulator_state(&self, state: AccumulatorState) -> Result<()>; async fn fetch_accumulator_state(&self, slot: Slot) -> Result>; + + /// Update the accumulator state inplace using the provided callback. The callback + /// takes the current state and returns the new state. If there is no accumulator + /// state for the given slot, the callback will be called with an empty accumulator state. + async fn update_accumulator_state( + &self, + slot: Slot, + callback: Box AccumulatorState) + Send>, + ) -> Result<()>; } pub type StorageInstance = Box; diff --git a/hermes/src/store/storage/local_storage.rs b/hermes/src/store/storage/local_storage.rs index b6cbc09d..c081dde2 100644 --- a/hermes/src/store/storage/local_storage.rs +++ b/hermes/src/store/storage/local_storage.rs @@ -87,6 +87,25 @@ impl LocalStorage { None => None, } } + + /// Store the accumulator state in the cache assuming that the lock is already acquired. + fn store_accumulator_state_impl( + &self, + state: AccumulatorState, + cache: &mut VecDeque, + ) { + cache.push_back(state); + + let mut i = cache.len().saturating_sub(1); + while i > 0 && cache[i - 1].slot > cache[i].slot { + cache.swap(i - 1, i); + i -= 1; + } + + if cache.len() > self.cache_size as usize { + cache.pop_front(); + } + } } #[async_trait] @@ -158,18 +177,7 @@ impl Storage for LocalStorage { async fn store_accumulator_state(&self, state: super::AccumulatorState) -> Result<()> { let mut accumulator_cache = self.accumulator_cache.write().await; - accumulator_cache.push_back(state); - - let mut i = accumulator_cache.len().saturating_sub(1); - while i > 0 && accumulator_cache[i - 1].slot > accumulator_cache[i].slot { - accumulator_cache.swap(i - 1, i); - i -= 1; - } - - if accumulator_cache.len() > self.cache_size as usize { - accumulator_cache.pop_front(); - } - + self.store_accumulator_state_impl(state, &mut accumulator_cache); Ok(()) } @@ -180,6 +188,30 @@ impl Storage for LocalStorage { Err(_) => Ok(None), } } + + async fn update_accumulator_state( + &self, + slot: Slot, + callback: Box AccumulatorState) + Send>, + ) -> Result<()> { + let mut accumulator_cache = self.accumulator_cache.write().await; + match accumulator_cache.binary_search_by_key(&slot, |state| state.slot) { + Ok(idx) => { + let state = accumulator_cache.get_mut(idx).unwrap(); + *state = callback(state.clone()); + } + Err(_) => { + let state = callback(AccumulatorState { + slot, + accumulator_messages: None, + wormhole_merkle_state: None, + }); + self.store_accumulator_state_impl(state, &mut accumulator_cache); + } + } + + Ok(()) + } } #[cfg(test)] @@ -187,12 +219,16 @@ mod test { use { super::*, crate::store::{ - proof::wormhole_merkle::WormholeMerkleMessageProof, + proof::wormhole_merkle::{ + WormholeMerkleMessageProof, + WormholeMerkleState, + }, types::{ AccumulatorMessages, ProofSet, }, }, + futures::future::join_all, pyth_sdk::UnixTimestamp, pythnet_sdk::{ accumulators::merkle::MerklePath, @@ -201,6 +237,7 @@ mod test { Message, PriceFeedMessage, }, + wire::v1::WormholeMerkleRoot, }, }; @@ -642,4 +679,184 @@ mod test { accumulator_state_at_slot_30 ); } + + #[tokio::test] + pub async fn test_update_accumulator_state_works() { + // Initialize a storage with a cache size of 2 per key and the accumulator state. + let storage = LocalStorage::new_instance(2); + + // Create an empty accumulator state at slot 10. + create_and_store_empty_accumulator_state_at_slot(&storage, 10).await; + + // Update the accumulator state with slot 10. + let accumulator_messages = AccumulatorMessages { + magic: [0; 4], + slot: 10, + ring_size: 3, + raw_messages: vec![], + }; + + let accumulator_messages_clone = accumulator_messages.clone(); + + storage + .update_accumulator_state( + 10, + Box::new(|mut accumulator_state| { + accumulator_state.accumulator_messages = Some(accumulator_messages_clone); + accumulator_state + }), + ) + .await + .unwrap(); + + // Make sure the retrieved accumulator state is what we stored. + assert_eq!( + storage.fetch_accumulator_state(10).await.unwrap(), + Some(AccumulatorState { + slot: 10, + accumulator_messages: Some(accumulator_messages), + wormhole_merkle_state: None, + }) + ); + } + + #[tokio::test] + pub async fn test_update_accumulator_state_works_on_nonexistent_state() { + // Initialize an empty storage with a cache size of 2 per key and the accumulator state. + let storage = LocalStorage::new_instance(2); + + // Update the accumulator state with slot 10. + let accumulator_messages = AccumulatorMessages { + magic: [0; 4], + slot: 10, + ring_size: 3, + raw_messages: vec![], + }; + let accumulator_messages_clone = accumulator_messages.clone(); + storage + .update_accumulator_state( + 10, + Box::new(|mut accumulator_state| { + accumulator_state.accumulator_messages = Some(accumulator_messages_clone); + accumulator_state + }), + ) + .await + .unwrap(); + + // Make sure the retrieved accumulator state is what we stored. + assert_eq!( + storage.fetch_accumulator_state(10).await.unwrap(), + Some(AccumulatorState { + slot: 10, + accumulator_messages: Some(accumulator_messages), + wormhole_merkle_state: None, + }) + ); + } + + #[tokio::test] + pub async fn test_update_accumulator_state_works_on_concurrent_updates() { + // Initialize an empty storage with a cache size of 20 per key and the accumulator state. + let storage = LocalStorage::new_instance(20); + + + // Run this check 10 times to make sure the concurrent updates work. + let mut futures = vec![]; + + for slot in 1..10 { + futures.push(storage.update_accumulator_state( + slot, + Box::new(|mut accumulator_state| { + accumulator_state.accumulator_messages = Some(AccumulatorMessages { + magic: [0; 4], + slot: 123, + ring_size: 3, + raw_messages: vec![], + }); + accumulator_state + }), + )); + futures.push(storage.update_accumulator_state( + slot, + Box::new(|mut accumulator_state| { + accumulator_state.wormhole_merkle_state = Some(WormholeMerkleState { + root: WormholeMerkleRoot { + root: [0; 20], + slot: 123, + ring_size: 3, + }, + vaa: vec![], + }); + accumulator_state + }), + )); + } + + join_all(futures).await; + + for slot in 1..10 { + assert_eq!( + storage.fetch_accumulator_state(slot).await.unwrap(), + Some(AccumulatorState { + slot, + accumulator_messages: Some(AccumulatorMessages { + magic: [0; 4], + slot: 123, + ring_size: 3, + raw_messages: vec![], + }), + wormhole_merkle_state: Some(WormholeMerkleState { + root: WormholeMerkleRoot { + root: [0; 20], + slot: 123, + ring_size: 3, + }, + vaa: vec![], + }), + }) + ) + } + } + + #[tokio::test] + pub async fn test_update_accumulator_state_evicts_cache() { + // Initialize a storage with a cache size of 2 per key and the accumulator state. + let storage = LocalStorage::new_instance(2); + + storage + .update_accumulator_state(10, Box::new(|accumulator_state| accumulator_state)) + .await + .unwrap(); + storage + .update_accumulator_state(20, Box::new(|accumulator_state| accumulator_state)) + .await + .unwrap(); + storage + .update_accumulator_state(30, Box::new(|accumulator_state| accumulator_state)) + .await + .unwrap(); + + // The accumulator state at slot 10 should be evicted from the cache. + assert_eq!(storage.fetch_accumulator_state(10).await.unwrap(), None); + + // Retrieve the rest of accumulator states and make sure it is what we stored. + assert_eq!( + storage.fetch_accumulator_state(20).await.unwrap().unwrap(), + AccumulatorState { + slot: 20, + accumulator_messages: None, + wormhole_merkle_state: None, + } + ); + + assert_eq!( + storage.fetch_accumulator_state(30).await.unwrap().unwrap(), + AccumulatorState { + slot: 30, + accumulator_messages: None, + wormhole_merkle_state: None, + } + ); + } }