refactor(hermes): introduce CacheStore

This commit is contained in:
Reisen 2023-09-14 09:11:57 +00:00 committed by Ali Behjati
parent e144dd2bd0
commit 263b80b1f7
8 changed files with 164 additions and 116 deletions

5
hermes/Cargo.lock generated
View File

@ -351,9 +351,9 @@ checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae"
[[package]]
name = "async-trait"
version = "0.1.71"
version = "0.1.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a564d521dd56509c4c47480d00b80ee55f7e385ae48db5744c67ad50c92d2ebf"
checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
dependencies = [
"proc-macro2 1.0.66",
"quote 1.0.31",
@ -1767,6 +1767,7 @@ name = "hermes"
version = "0.1.16"
dependencies = [
"anyhow",
"async-trait",
"axum",
"axum-macros",
"base64 0.21.2",

View File

@ -4,6 +4,7 @@ version = "0.1.16"
edition = "2021"
[dependencies]
async-trait = { version = "0.1.73" }
anyhow = { version = "1.0.69" }
axum = { version = "0.6.20", features = ["json", "ws", "macros"] }
axum-macros = { version = "0.3.8" }

View File

@ -24,7 +24,7 @@ use {
pub async fn price_feed_ids(
State(state): State<crate::api::State>,
) -> Result<Json<Vec<RpcPriceIdentifier>>, RestError> {
let price_feed_ids = crate::store::get_price_feed_ids(&state.store)
let price_feed_ids = crate::store::get_price_feed_ids(&*state.store)
.await
.iter()
.map(RpcPriceIdentifier::from)

View File

@ -233,7 +233,7 @@ impl Subscriber {
binary,
}) => {
let price_ids: Vec<PriceIdentifier> = ids.into_iter().map(|id| id.into()).collect();
let available_price_ids = crate::store::get_price_feed_ids(&self.store).await;
let available_price_ids = crate::store::get_price_feed_ids(&*self.store).await;
let not_found_price_ids: Vec<&PriceIdentifier> = price_ids
.iter()

View File

@ -12,15 +12,16 @@ use std::time::{
};
use {
self::{
cache::{
Cache,
CacheStore,
MessageState,
MessageStateFilter,
},
proof::wormhole_merkle::{
construct_update_data,
WormholeMerkleState,
},
storage::{
MessageState,
MessageStateFilter,
Storage,
},
types::{
AccumulatorMessages,
PriceFeedUpdate,
@ -82,8 +83,8 @@ use {
};
pub mod benchmarks;
pub mod cache;
pub mod proof;
pub mod storage;
pub mod types;
pub mod wormhole;
@ -93,20 +94,31 @@ const READINESS_STALENESS_THRESHOLD: Duration = Duration::from_secs(30);
pub struct Store {
/// Storage is a short-lived cache of the state of all the updates that have been passed to the
/// store.
pub storage: Storage,
pub cache: Cache,
/// Sequence numbers of lately observed Vaas. Store uses this set
/// to ignore the previously observed Vaas as a performance boost.
pub observed_vaa_seqs: RwLock<BTreeSet<u64>>,
pub observed_vaa_seqs: RwLock<BTreeSet<u64>>,
/// Wormhole guardian sets. It is used to verify Vaas before using them.
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
/// The sender to the channel between Store and Api to notify completed updates.
pub update_tx: Sender<()>,
pub update_tx: Sender<()>,
/// Time of the last completed update. This is used for the health probes.
pub last_completed_update_at: RwLock<Option<Instant>>,
/// Benchmarks endpoint
pub benchmarks_endpoint: Option<Url>,
pub benchmarks_endpoint: Option<Url>,
}
// impl CacheStore for Store {
// }
//
// impl Benchmarks for Store {
// }
impl Store {
pub fn new(
update_tx: Sender<()>,
@ -114,7 +126,7 @@ impl Store {
benchmarks_endpoint: Option<Url>,
) -> Arc<Self> {
Arc::new(Self {
storage: Storage::new(cache_size),
cache: Cache::new(cache_size),
observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()),
update_tx,
@ -180,15 +192,14 @@ pub async fn store_update(store: &Store, update: Update) -> Result<()> {
tracing::info!(slot = slot, "Storing Accumulator Messages.");
store
.storage
.store_accumulator_messages(accumulator_messages)
.await?;
slot
}
};
let accumulator_messages = store.storage.fetch_accumulator_messages(slot).await?;
let wormhole_merkle_state = store.storage.fetch_wormhole_merkle_state(slot).await?;
let accumulator_messages = store.fetch_accumulator_messages(slot).await?;
let wormhole_merkle_state = store.fetch_wormhole_merkle_state(slot).await?;
let (accumulator_messages, wormhole_merkle_state) =
match (accumulator_messages, wormhole_merkle_state) {
@ -249,7 +260,7 @@ async fn build_message_states(
tracing::info!(len = message_states.len(), "Storing Message States.");
store.storage.store_message_states(message_states).await?;
store.store_message_states(message_states).await?;
Ok(())
}
@ -259,13 +270,15 @@ pub async fn update_guardian_set(store: &Store, id: u32, guardian_set: GuardianS
guardian_sets.insert(id, guardian_set);
}
async fn get_price_feeds_with_update_data_from_storage(
store: &Store,
async fn get_verified_price_feeds<S>(
store: &S,
price_ids: Vec<PriceIdentifier>,
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData> {
) -> Result<PriceFeedsWithUpdateData>
where
S: CacheStore,
{
let messages = store
.storage
.fetch_message_states(
price_ids
.iter()
@ -321,13 +334,7 @@ pub async fn get_price_feeds_with_update_data(
price_ids: Vec<PriceIdentifier>,
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData> {
match get_price_feeds_with_update_data_from_storage(
store,
price_ids.clone(),
request_time.clone(),
)
.await
{
match get_verified_price_feeds(store, price_ids.clone(), request_time.clone()).await {
Ok(price_feeds_with_update_data) => Ok(price_feeds_with_update_data),
Err(e) => {
if let RequestTime::FirstAfter(publish_time) = request_time {
@ -345,9 +352,11 @@ pub async fn get_price_feeds_with_update_data(
}
}
pub async fn get_price_feed_ids(store: &Store) -> HashSet<PriceIdentifier> {
pub async fn get_price_feed_ids<S>(store: &S) -> HashSet<PriceIdentifier>
where
S: CacheStore,
{
store
.storage
.message_state_keys()
.await
.iter()
@ -514,7 +523,7 @@ mod test {
// Check the price ids are stored correctly
assert_eq!(
get_price_feed_ids(&store).await,
get_price_feed_ids(&*store).await,
vec![PriceIdentifier::new([100; 32])].into_iter().collect()
);

View File

@ -76,6 +76,13 @@ impl TryFrom<BenchmarkUpdates> for PriceFeedsWithUpdateData {
}
}
trait Benchmarks {
fn get_verified_price_feeds(
&self,
price_ids: Vec<PriceIdentifier>,
publish_time: UnixTimestamp,
) -> Result<PriceFeedsWithUpdateData>;
}
pub async fn get_price_feeds_with_update_data_from_benchmarks(
endpoint: Url,

View File

@ -94,20 +94,79 @@ pub enum MessageStateFilter {
Only(MessageType),
}
pub struct Storage {
message_cache: Arc<DashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>,
pub struct Cache {
/// Accumulator messages cache
///
/// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
accumulator_messages_cache: Arc<RwLock<BTreeMap<Slot, AccumulatorMessages>>>,
accumulator_messages_cache: Arc<RwLock<BTreeMap<Slot, AccumulatorMessages>>>,
/// Wormhole merkle state cache
///
/// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
wormhole_merkle_state_cache: Arc<RwLock<BTreeMap<Slot, WormholeMerkleState>>>,
cache_size: u64,
message_cache: Arc<DashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>,
cache_size: u64,
}
impl Storage {
fn retrieve_message_state(
cache: &Cache,
key: MessageStateKey,
request_time: RequestTime,
) -> Option<MessageState> {
match cache.message_cache.get(&key) {
Some(key_cache) => {
match request_time {
RequestTime::Latest => key_cache.last_key_value().map(|(_, v)| v).cloned(),
RequestTime::FirstAfter(time) => {
// If the requested time is before the first element in the vector, we are
// not sure that the first element is the closest one.
if let Some((_, oldest_record_value)) = key_cache.first_key_value() {
if time < oldest_record_value.time().publish_time {
return None;
}
}
let lookup_time = MessageStateTime {
publish_time: time,
slot: 0,
};
// Get the first element that is greater than or equal to the lookup time.
key_cache
.lower_bound(Bound::Included(&lookup_time))
.value()
.cloned()
}
}
}
None => None,
}
}
#[async_trait::async_trait]
pub trait CacheStore {
async fn message_state_keys(&self) -> Vec<MessageStateKey>;
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
async fn fetch_message_states(
&self,
ids: Vec<FeedId>,
request_time: RequestTime,
filter: MessageStateFilter,
) -> Result<Vec<MessageState>>;
async fn store_accumulator_messages(
&self,
accumulator_messages: AccumulatorMessages,
) -> Result<()>;
async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>>;
async fn store_wormhole_merkle_state(
&self,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()>;
async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>;
}
impl Cache {
pub fn new(cache_size: u64) -> Self {
Self {
message_cache: Arc::new(DashMap::new()),
@ -116,66 +175,39 @@ impl Storage {
cache_size,
}
}
}
pub async fn message_state_keys(&self) -> Vec<MessageStateKey> {
self.message_cache
#[async_trait::async_trait]
impl CacheStore for crate::store::Store {
async fn message_state_keys(&self) -> Vec<MessageStateKey> {
self.cache
.message_cache
.iter()
.map(|entry| entry.key().clone())
.collect::<Vec<_>>()
}
pub async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
for message_state in message_states {
let key = message_state.key();
let time = message_state.time();
let mut cache = self.message_cache.entry(key).or_insert_with(BTreeMap::new);
let mut cache = self
.cache
.message_cache
.entry(key)
.or_insert_with(BTreeMap::new);
cache.insert(time, message_state);
// Remove the earliest message states if the cache size is exceeded
while cache.len() > self.cache_size as usize {
while cache.len() > self.cache.cache_size as usize {
cache.pop_first();
}
}
Ok(())
}
fn retrieve_message_state(
&self,
key: MessageStateKey,
request_time: RequestTime,
) -> Option<MessageState> {
match self.message_cache.get(&key) {
Some(key_cache) => {
match request_time {
RequestTime::Latest => key_cache.last_key_value().map(|(_, v)| v).cloned(),
RequestTime::FirstAfter(time) => {
// If the requested time is before the first element in the vector, we are
// not sure that the first element is the closest one.
if let Some((_, oldest_record_value)) = key_cache.first_key_value() {
if time < oldest_record_value.time().publish_time {
return None;
}
}
let lookup_time = MessageStateTime {
publish_time: time,
slot: 0,
};
// Get the first element that is greater than or equal to the lookup time.
key_cache
.lower_bound(Bound::Included(&lookup_time))
.value()
.cloned()
}
}
}
None => None,
}
}
pub async fn fetch_message_states(
async fn fetch_message_states(
&self,
ids: Vec<FeedId>,
request_time: RequestTime,
@ -194,50 +226,44 @@ impl Storage {
feed_id: id,
type_: message_type,
};
self.retrieve_message_state(key, request_time.clone())
retrieve_message_state(&self.cache, key, request_time.clone())
.ok_or(anyhow!("Message not found"))
})
})
.collect()
}
pub async fn store_accumulator_messages(
async fn store_accumulator_messages(
&self,
accumulator_messages: AccumulatorMessages,
) -> Result<()> {
let mut cache = self.accumulator_messages_cache.write().await;
let mut cache = self.cache.accumulator_messages_cache.write().await;
cache.insert(accumulator_messages.slot, accumulator_messages);
while cache.len() > self.cache_size as usize {
while cache.len() > self.cache.cache_size as usize {
cache.pop_first();
}
Ok(())
}
pub async fn fetch_accumulator_messages(
&self,
slot: Slot,
) -> Result<Option<AccumulatorMessages>> {
let cache = self.accumulator_messages_cache.read().await;
async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>> {
let cache = self.cache.accumulator_messages_cache.read().await;
Ok(cache.get(&slot).cloned())
}
pub async fn store_wormhole_merkle_state(
async fn store_wormhole_merkle_state(
&self,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()> {
let mut cache = self.wormhole_merkle_state_cache.write().await;
let mut cache = self.cache.wormhole_merkle_state_cache.write().await;
cache.insert(wormhole_merkle_state.root.slot, wormhole_merkle_state);
while cache.len() > self.cache_size as usize {
while cache.len() > self.cache.cache_size as usize {
cache.pop_first();
}
Ok(())
}
pub async fn fetch_wormhole_merkle_state(
&self,
slot: Slot,
) -> Result<Option<WormholeMerkleState>> {
let cache = self.wormhole_merkle_state_cache.read().await;
async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>> {
let cache = self.cache.wormhole_merkle_state_cache.read().await;
Ok(cache.get(&slot).cloned())
}
}
@ -298,7 +324,7 @@ mod test {
#[cfg(test)]
pub async fn create_and_store_dummy_price_feed_message_state(
storage: &Storage,
storage: &Cache,
feed_id: FeedId,
publish_time: UnixTimestamp,
slot: Slot,
@ -314,7 +340,7 @@ mod test {
#[tokio::test]
pub async fn test_store_and_retrieve_latest_message_state_works() {
// Initialize a storage with a cache size of 2 per key.
let storage = Storage::new(2);
let storage = Cache::new(2);
// Create and store a message state with feed id [1....] and publish time 10 at slot 5.
let message_state =
@ -337,7 +363,7 @@ mod test {
#[tokio::test]
pub async fn test_store_and_retrieve_latest_message_state_with_multiple_update_works() {
// Initialize a storage with a cache size of 2 per key.
let storage = Storage::new(2);
let storage = Cache::new(2);
// Create and store a message state with feed id [1....] and publish time 10 at slot 5.
let _old_message_state =
@ -364,7 +390,7 @@ mod test {
#[tokio::test]
pub async fn test_store_and_retrieve_latest_message_state_with_out_of_order_update_works() {
// Initialize a storage with a cache size of 2 per key.
let storage = Storage::new(2);
let storage = Cache::new(2);
// Create and store a message state with feed id [1....] and publish time 20 at slot 10.
let new_message_state =
@ -391,7 +417,7 @@ mod test {
#[tokio::test]
pub async fn test_store_and_retrieve_first_after_message_state_works() {
// Initialize a storage with a cache size of 2 per key.
let storage = Storage::new(2);
let storage = Cache::new(2);
// Create and store a message state with feed id [1....] and publish time 10 at slot 5.
let old_message_state =
@ -433,7 +459,7 @@ mod test {
#[tokio::test]
pub async fn test_store_and_retrieve_latest_message_state_with_same_pubtime_works() {
// Initialize a storage with a cache size of 2 per key.
let storage = Storage::new(2);
let storage = Cache::new(2);
// Create and store a message state with feed id [1....] and publish time 10 at slot 5.
let slightly_older_message_state =
@ -474,7 +500,7 @@ mod test {
#[tokio::test]
pub async fn test_store_and_retrieve_first_after_message_state_fails_for_past_time() {
// Initialize a storage with a cache size of 2 per key.
let storage = Storage::new(2);
let storage = Cache::new(2);
// Create and store a message state with feed id [1....] and publish time 10 at slot 5.
create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
@ -497,7 +523,7 @@ mod test {
#[tokio::test]
pub async fn test_store_and_retrieve_first_after_message_state_fails_for_future_time() {
// Initialize a storage with a cache size of 2 per key.
let storage = Storage::new(2);
let storage = Cache::new(2);
// Create and store a message state with feed id [1....] and publish time 10 at slot 5.
create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
@ -519,7 +545,7 @@ mod test {
#[tokio::test]
pub async fn test_store_more_message_states_than_cache_size_evicts_old_messages() {
// Initialize a storage with a cache size of 2 per key.
let storage = Storage::new(2);
let storage = Cache::new(2);
// Create and store a message state with feed id [1....] and publish time 10 at slot 5.
create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
@ -544,7 +570,7 @@ mod test {
#[tokio::test]
pub async fn test_store_and_fetch_multiple_message_feed_ids_works() {
// Initialize a storage with a cache size of 1 per key.
let storage = Storage::new(1);
let storage = Cache::new(1);
// Create and store a message state with feed id [1....] and publish time 10 at slot 5.
let message_state_1 =
@ -571,7 +597,7 @@ mod test {
#[tokio::test]
pub async fn test_fetch_not_existent_message_fails() {
// Initialize a storage with a cache size of 2 per key.
let storage = Storage::new(2);
let storage = Cache::new(2);
create_and_store_dummy_price_feed_message_state(&storage, [1; 32], 10, 5).await;
@ -598,7 +624,7 @@ mod test {
#[tokio::test]
pub async fn test_store_and_fetch_accumulator_messages_works() {
// Initialize a storage with a cache size of 2 per key and the accumulator state.
let storage = Storage::new(2);
let storage = Cache::new(2);
// Make sure the retrieved accumulator messages is what we store.
let mut accumulator_messages_at_10 = create_empty_accumulator_messages_at_slot(10);
@ -680,7 +706,7 @@ mod test {
#[tokio::test]
pub async fn test_store_and_fetch_wormhole_merkle_state_works() {
// Initialize a storage with a cache size of 2 per key and the accumulator state.
let storage = Storage::new(2);
let storage = Cache::new(2);
// Make sure the retrieved wormhole merkle state is what we store
let mut wormhole_merkle_state_at_10 = create_empty_wormhole_merkle_state_at_slot(10);

View File

@ -1,12 +1,14 @@
use {
crate::store::{
storage::MessageState,
cache::{
CacheStore,
MessageState,
},
types::{
AccumulatorMessages,
RawMessage,
Slot,
},
Store,
},
anyhow::{
anyhow,
@ -68,13 +70,15 @@ impl From<MessageState> for RawMessageWithMerkleProof {
}
}
pub async fn store_wormhole_merkle_verified_message(
store: &Store,
pub async fn store_wormhole_merkle_verified_message<S>(
store: &S,
root: WormholeMerkleRoot,
vaa: Vaa,
) -> Result<()> {
) -> Result<()>
where
S: CacheStore,
{
store
.storage
.store_wormhole_merkle_state(WormholeMerkleState { root, vaa })
.await?;
Ok(())