[hermes] Fix concurrency issue (#925)
This commit is contained in:
parent
c4c4a6384a
commit
96cb221a3a
|
@ -1659,7 +1659,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hermes"
|
name = "hermes"
|
||||||
version = "0.1.3"
|
version = "0.1.4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
@ -1678,6 +1678,7 @@ dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"libp2p",
|
"libp2p",
|
||||||
"log",
|
"log",
|
||||||
|
"prometheus-client",
|
||||||
"pyth-sdk",
|
"pyth-sdk",
|
||||||
"pythnet-sdk",
|
"pythnet-sdk",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
|
@ -3635,6 +3636,29 @@ dependencies = [
|
||||||
"unicode-ident",
|
"unicode-ident",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prometheus-client"
|
||||||
|
version = "0.21.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "78c2f43e8969d51935d2a7284878ae053ba30034cd563f673cde37ba5205685e"
|
||||||
|
dependencies = [
|
||||||
|
"dtoa",
|
||||||
|
"itoa",
|
||||||
|
"parking_lot 0.12.1",
|
||||||
|
"prometheus-client-derive-encode",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "prometheus-client-derive-encode"
|
||||||
|
version = "0.4.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "72b6a5217beb0ad503ee7fa752d451c905113d70721b937126158f3106a48cc1"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2 1.0.56",
|
||||||
|
"quote 1.0.27",
|
||||||
|
"syn 1.0.109",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "prost"
|
name = "prost"
|
||||||
version = "0.9.0"
|
version = "0.9.0"
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "hermes"
|
name = "hermes"
|
||||||
version = "0.1.3"
|
version = "0.1.4"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
@ -35,6 +35,7 @@ libp2p = { version = "0.42.2", features = [
|
||||||
]}
|
]}
|
||||||
|
|
||||||
log = { version = "0.4.17" }
|
log = { version = "0.4.17" }
|
||||||
|
prometheus-client = { version = "0.21.1" }
|
||||||
pyth-sdk = { version = "0.7.0" }
|
pyth-sdk = { version = "0.7.0" }
|
||||||
|
|
||||||
# Parse Wormhole attester price attestations.
|
# Parse Wormhole attester price attestations.
|
||||||
|
|
|
@ -19,10 +19,7 @@ use {
|
||||||
construct_message_states_proofs,
|
construct_message_states_proofs,
|
||||||
store_wormhole_merkle_verified_message,
|
store_wormhole_merkle_verified_message,
|
||||||
},
|
},
|
||||||
storage::{
|
storage::CompletedAccumulatorState,
|
||||||
AccumulatorState,
|
|
||||||
CompletedAccumulatorState,
|
|
||||||
},
|
|
||||||
types::{
|
types::{
|
||||||
ProofSet,
|
ProofSet,
|
||||||
UnixTimestamp,
|
UnixTimestamp,
|
||||||
|
@ -150,18 +147,14 @@ impl Store {
|
||||||
Update::AccumulatorMessages(accumulator_messages) => {
|
Update::AccumulatorMessages(accumulator_messages) => {
|
||||||
let slot = accumulator_messages.slot;
|
let slot = accumulator_messages.slot;
|
||||||
log::info!("Storing accumulator messages for slot {:?}.", slot,);
|
log::info!("Storing accumulator messages for slot {:?}.", slot,);
|
||||||
let mut accumulator_state = self
|
|
||||||
.storage
|
|
||||||
.fetch_accumulator_state(slot)
|
|
||||||
.await?
|
|
||||||
.unwrap_or(AccumulatorState {
|
|
||||||
slot,
|
|
||||||
accumulator_messages: None,
|
|
||||||
wormhole_merkle_state: None,
|
|
||||||
});
|
|
||||||
accumulator_state.accumulator_messages = Some(accumulator_messages);
|
|
||||||
self.storage
|
self.storage
|
||||||
.store_accumulator_state(accumulator_state)
|
.update_accumulator_state(
|
||||||
|
slot,
|
||||||
|
Box::new(|mut state| {
|
||||||
|
state.accumulator_messages = Some(accumulator_messages);
|
||||||
|
state
|
||||||
|
}),
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
slot
|
slot
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
use {
|
use {
|
||||||
crate::store::{
|
crate::store::{
|
||||||
storage::{
|
storage::{
|
||||||
AccumulatorState,
|
|
||||||
CompletedAccumulatorState,
|
CompletedAccumulatorState,
|
||||||
MessageState,
|
MessageState,
|
||||||
},
|
},
|
||||||
|
@ -49,23 +48,18 @@ pub async fn store_wormhole_merkle_verified_message(
|
||||||
root: WormholeMerkleRoot,
|
root: WormholeMerkleRoot,
|
||||||
vaa_bytes: Vec<u8>,
|
vaa_bytes: Vec<u8>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut accumulator_state = store
|
|
||||||
.storage
|
|
||||||
.fetch_accumulator_state(root.slot)
|
|
||||||
.await?
|
|
||||||
.unwrap_or(AccumulatorState {
|
|
||||||
slot: root.slot,
|
|
||||||
accumulator_messages: None,
|
|
||||||
wormhole_merkle_state: None,
|
|
||||||
});
|
|
||||||
|
|
||||||
accumulator_state.wormhole_merkle_state = Some(WormholeMerkleState {
|
|
||||||
root,
|
|
||||||
vaa: vaa_bytes,
|
|
||||||
});
|
|
||||||
store
|
store
|
||||||
.storage
|
.storage
|
||||||
.store_accumulator_state(accumulator_state)
|
.update_accumulator_state(
|
||||||
|
root.slot,
|
||||||
|
Box::new(|mut state| {
|
||||||
|
state.wormhole_merkle_state = Some(WormholeMerkleState {
|
||||||
|
root,
|
||||||
|
vaa: vaa_bytes,
|
||||||
|
});
|
||||||
|
state
|
||||||
|
}),
|
||||||
|
)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,8 +134,20 @@ pub trait Storage: Send + Sync {
|
||||||
filter: MessageStateFilter,
|
filter: MessageStateFilter,
|
||||||
) -> Result<Vec<MessageState>>;
|
) -> Result<Vec<MessageState>>;
|
||||||
|
|
||||||
|
/// Store the accumulator state. Please note that this call will replace the
|
||||||
|
/// existing accumulator state for the given state's slot. If you wish to
|
||||||
|
/// update the accumulator state, use `update_accumulator_state` instead.
|
||||||
async fn store_accumulator_state(&self, state: AccumulatorState) -> Result<()>;
|
async fn store_accumulator_state(&self, state: AccumulatorState) -> Result<()>;
|
||||||
async fn fetch_accumulator_state(&self, slot: Slot) -> Result<Option<AccumulatorState>>;
|
async fn fetch_accumulator_state(&self, slot: Slot) -> Result<Option<AccumulatorState>>;
|
||||||
|
|
||||||
|
/// Update the accumulator state inplace using the provided callback. The callback
|
||||||
|
/// takes the current state and returns the new state. If there is no accumulator
|
||||||
|
/// state for the given slot, the callback will be called with an empty accumulator state.
|
||||||
|
async fn update_accumulator_state(
|
||||||
|
&self,
|
||||||
|
slot: Slot,
|
||||||
|
callback: Box<dyn (FnOnce(AccumulatorState) -> AccumulatorState) + Send>,
|
||||||
|
) -> Result<()>;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type StorageInstance = Box<dyn Storage>;
|
pub type StorageInstance = Box<dyn Storage>;
|
||||||
|
|
|
@ -87,6 +87,25 @@ impl LocalStorage {
|
||||||
None => None,
|
None => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Store the accumulator state in the cache assuming that the lock is already acquired.
|
||||||
|
fn store_accumulator_state_impl(
|
||||||
|
&self,
|
||||||
|
state: AccumulatorState,
|
||||||
|
cache: &mut VecDeque<AccumulatorState>,
|
||||||
|
) {
|
||||||
|
cache.push_back(state);
|
||||||
|
|
||||||
|
let mut i = cache.len().saturating_sub(1);
|
||||||
|
while i > 0 && cache[i - 1].slot > cache[i].slot {
|
||||||
|
cache.swap(i - 1, i);
|
||||||
|
i -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if cache.len() > self.cache_size as usize {
|
||||||
|
cache.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
@ -158,18 +177,7 @@ impl Storage for LocalStorage {
|
||||||
|
|
||||||
async fn store_accumulator_state(&self, state: super::AccumulatorState) -> Result<()> {
|
async fn store_accumulator_state(&self, state: super::AccumulatorState) -> Result<()> {
|
||||||
let mut accumulator_cache = self.accumulator_cache.write().await;
|
let mut accumulator_cache = self.accumulator_cache.write().await;
|
||||||
accumulator_cache.push_back(state);
|
self.store_accumulator_state_impl(state, &mut accumulator_cache);
|
||||||
|
|
||||||
let mut i = accumulator_cache.len().saturating_sub(1);
|
|
||||||
while i > 0 && accumulator_cache[i - 1].slot > accumulator_cache[i].slot {
|
|
||||||
accumulator_cache.swap(i - 1, i);
|
|
||||||
i -= 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if accumulator_cache.len() > self.cache_size as usize {
|
|
||||||
accumulator_cache.pop_front();
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -180,6 +188,30 @@ impl Storage for LocalStorage {
|
||||||
Err(_) => Ok(None),
|
Err(_) => Ok(None),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn update_accumulator_state(
|
||||||
|
&self,
|
||||||
|
slot: Slot,
|
||||||
|
callback: Box<dyn (FnOnce(AccumulatorState) -> AccumulatorState) + Send>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut accumulator_cache = self.accumulator_cache.write().await;
|
||||||
|
match accumulator_cache.binary_search_by_key(&slot, |state| state.slot) {
|
||||||
|
Ok(idx) => {
|
||||||
|
let state = accumulator_cache.get_mut(idx).unwrap();
|
||||||
|
*state = callback(state.clone());
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
let state = callback(AccumulatorState {
|
||||||
|
slot,
|
||||||
|
accumulator_messages: None,
|
||||||
|
wormhole_merkle_state: None,
|
||||||
|
});
|
||||||
|
self.store_accumulator_state_impl(state, &mut accumulator_cache);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -187,12 +219,16 @@ mod test {
|
||||||
use {
|
use {
|
||||||
super::*,
|
super::*,
|
||||||
crate::store::{
|
crate::store::{
|
||||||
proof::wormhole_merkle::WormholeMerkleMessageProof,
|
proof::wormhole_merkle::{
|
||||||
|
WormholeMerkleMessageProof,
|
||||||
|
WormholeMerkleState,
|
||||||
|
},
|
||||||
types::{
|
types::{
|
||||||
AccumulatorMessages,
|
AccumulatorMessages,
|
||||||
ProofSet,
|
ProofSet,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
futures::future::join_all,
|
||||||
pyth_sdk::UnixTimestamp,
|
pyth_sdk::UnixTimestamp,
|
||||||
pythnet_sdk::{
|
pythnet_sdk::{
|
||||||
accumulators::merkle::MerklePath,
|
accumulators::merkle::MerklePath,
|
||||||
|
@ -201,6 +237,7 @@ mod test {
|
||||||
Message,
|
Message,
|
||||||
PriceFeedMessage,
|
PriceFeedMessage,
|
||||||
},
|
},
|
||||||
|
wire::v1::WormholeMerkleRoot,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -642,4 +679,184 @@ mod test {
|
||||||
accumulator_state_at_slot_30
|
accumulator_state_at_slot_30
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
pub async fn test_update_accumulator_state_works() {
|
||||||
|
// Initialize a storage with a cache size of 2 per key and the accumulator state.
|
||||||
|
let storage = LocalStorage::new_instance(2);
|
||||||
|
|
||||||
|
// Create an empty accumulator state at slot 10.
|
||||||
|
create_and_store_empty_accumulator_state_at_slot(&storage, 10).await;
|
||||||
|
|
||||||
|
// Update the accumulator state with slot 10.
|
||||||
|
let accumulator_messages = AccumulatorMessages {
|
||||||
|
magic: [0; 4],
|
||||||
|
slot: 10,
|
||||||
|
ring_size: 3,
|
||||||
|
raw_messages: vec![],
|
||||||
|
};
|
||||||
|
|
||||||
|
let accumulator_messages_clone = accumulator_messages.clone();
|
||||||
|
|
||||||
|
storage
|
||||||
|
.update_accumulator_state(
|
||||||
|
10,
|
||||||
|
Box::new(|mut accumulator_state| {
|
||||||
|
accumulator_state.accumulator_messages = Some(accumulator_messages_clone);
|
||||||
|
accumulator_state
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Make sure the retrieved accumulator state is what we stored.
|
||||||
|
assert_eq!(
|
||||||
|
storage.fetch_accumulator_state(10).await.unwrap(),
|
||||||
|
Some(AccumulatorState {
|
||||||
|
slot: 10,
|
||||||
|
accumulator_messages: Some(accumulator_messages),
|
||||||
|
wormhole_merkle_state: None,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
pub async fn test_update_accumulator_state_works_on_nonexistent_state() {
|
||||||
|
// Initialize an empty storage with a cache size of 2 per key and the accumulator state.
|
||||||
|
let storage = LocalStorage::new_instance(2);
|
||||||
|
|
||||||
|
// Update the accumulator state with slot 10.
|
||||||
|
let accumulator_messages = AccumulatorMessages {
|
||||||
|
magic: [0; 4],
|
||||||
|
slot: 10,
|
||||||
|
ring_size: 3,
|
||||||
|
raw_messages: vec![],
|
||||||
|
};
|
||||||
|
let accumulator_messages_clone = accumulator_messages.clone();
|
||||||
|
storage
|
||||||
|
.update_accumulator_state(
|
||||||
|
10,
|
||||||
|
Box::new(|mut accumulator_state| {
|
||||||
|
accumulator_state.accumulator_messages = Some(accumulator_messages_clone);
|
||||||
|
accumulator_state
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// Make sure the retrieved accumulator state is what we stored.
|
||||||
|
assert_eq!(
|
||||||
|
storage.fetch_accumulator_state(10).await.unwrap(),
|
||||||
|
Some(AccumulatorState {
|
||||||
|
slot: 10,
|
||||||
|
accumulator_messages: Some(accumulator_messages),
|
||||||
|
wormhole_merkle_state: None,
|
||||||
|
})
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
pub async fn test_update_accumulator_state_works_on_concurrent_updates() {
|
||||||
|
// Initialize an empty storage with a cache size of 20 per key and the accumulator state.
|
||||||
|
let storage = LocalStorage::new_instance(20);
|
||||||
|
|
||||||
|
|
||||||
|
// Run this check 10 times to make sure the concurrent updates work.
|
||||||
|
let mut futures = vec![];
|
||||||
|
|
||||||
|
for slot in 1..10 {
|
||||||
|
futures.push(storage.update_accumulator_state(
|
||||||
|
slot,
|
||||||
|
Box::new(|mut accumulator_state| {
|
||||||
|
accumulator_state.accumulator_messages = Some(AccumulatorMessages {
|
||||||
|
magic: [0; 4],
|
||||||
|
slot: 123,
|
||||||
|
ring_size: 3,
|
||||||
|
raw_messages: vec![],
|
||||||
|
});
|
||||||
|
accumulator_state
|
||||||
|
}),
|
||||||
|
));
|
||||||
|
futures.push(storage.update_accumulator_state(
|
||||||
|
slot,
|
||||||
|
Box::new(|mut accumulator_state| {
|
||||||
|
accumulator_state.wormhole_merkle_state = Some(WormholeMerkleState {
|
||||||
|
root: WormholeMerkleRoot {
|
||||||
|
root: [0; 20],
|
||||||
|
slot: 123,
|
||||||
|
ring_size: 3,
|
||||||
|
},
|
||||||
|
vaa: vec![],
|
||||||
|
});
|
||||||
|
accumulator_state
|
||||||
|
}),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
join_all(futures).await;
|
||||||
|
|
||||||
|
for slot in 1..10 {
|
||||||
|
assert_eq!(
|
||||||
|
storage.fetch_accumulator_state(slot).await.unwrap(),
|
||||||
|
Some(AccumulatorState {
|
||||||
|
slot,
|
||||||
|
accumulator_messages: Some(AccumulatorMessages {
|
||||||
|
magic: [0; 4],
|
||||||
|
slot: 123,
|
||||||
|
ring_size: 3,
|
||||||
|
raw_messages: vec![],
|
||||||
|
}),
|
||||||
|
wormhole_merkle_state: Some(WormholeMerkleState {
|
||||||
|
root: WormholeMerkleRoot {
|
||||||
|
root: [0; 20],
|
||||||
|
slot: 123,
|
||||||
|
ring_size: 3,
|
||||||
|
},
|
||||||
|
vaa: vec![],
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
pub async fn test_update_accumulator_state_evicts_cache() {
|
||||||
|
// Initialize a storage with a cache size of 2 per key and the accumulator state.
|
||||||
|
let storage = LocalStorage::new_instance(2);
|
||||||
|
|
||||||
|
storage
|
||||||
|
.update_accumulator_state(10, Box::new(|accumulator_state| accumulator_state))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
storage
|
||||||
|
.update_accumulator_state(20, Box::new(|accumulator_state| accumulator_state))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
storage
|
||||||
|
.update_accumulator_state(30, Box::new(|accumulator_state| accumulator_state))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// The accumulator state at slot 10 should be evicted from the cache.
|
||||||
|
assert_eq!(storage.fetch_accumulator_state(10).await.unwrap(), None);
|
||||||
|
|
||||||
|
// Retrieve the rest of accumulator states and make sure it is what we stored.
|
||||||
|
assert_eq!(
|
||||||
|
storage.fetch_accumulator_state(20).await.unwrap().unwrap(),
|
||||||
|
AccumulatorState {
|
||||||
|
slot: 20,
|
||||||
|
accumulator_messages: None,
|
||||||
|
wormhole_merkle_state: None,
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
storage.fetch_accumulator_state(30).await.unwrap().unwrap(),
|
||||||
|
AccumulatorState {
|
||||||
|
slot: 30,
|
||||||
|
accumulator_messages: None,
|
||||||
|
wormhole_merkle_state: None,
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue