refactor(hermes): state->cache downcasting

Reisen 2024-04-08 10:56:01 +00:00 committed by Reisen
parent 62d189e3b5
commit 68a2ce1221
4 changed files with 189 additions and 190 deletions
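The core of this change: the old `AggregateCache` trait, implemented directly on `State`, becomes a `Cache` trait with a blanket implementation for any type that can be viewed as a `&CacheState`. Below is a condensed, non-async sketch of that downcasting pattern; the field contents and numbers are simplified placeholders, not the real Hermes types.

use std::collections::BTreeMap;

// Simplified stand-in for the cache-owned slice of application state.
pub struct CacheState {
    messages: BTreeMap<u64, String>,
    cache_size: u64,
}

// The full application state owns a CacheState alongside its other services.
pub struct State {
    pub cache: CacheState,
}

// "Downcast" a &State into the &CacheState it contains.
impl<'a> From<&'a State> for &'a CacheState {
    fn from(state: &'a State) -> &'a CacheState {
        &state.cache
    }
}

// The service trait that callers bound on instead of the concrete State.
pub trait Cache {
    fn capacity(&self) -> u64;
    fn fetch(&self, slot: u64) -> Option<String>;
}

// Blanket impl: any type viewable as &CacheState gets Cache for free, so State
// (and test-only states) implement it without duplicating the logic.
impl<T> Cache for T
where
    for<'a> &'a T: Into<&'a CacheState>,
{
    fn capacity(&self) -> u64 {
        let cache: &CacheState = self.into();
        cache.cache_size
    }

    fn fetch(&self, slot: u64) -> Option<String> {
        let cache: &CacheState = self.into();
        cache.messages.get(&slot).cloned()
    }
}

fn main() {
    let state = State {
        cache: CacheState {
            messages: BTreeMap::new(),
            cache_size: 1000,
        },
    };
    // Callers only need S: Cache; they never see the rest of State.
    assert_eq!(state.capacity(), 1000);
    assert!(state.fetch(42).is_none());
}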

View File

@@ -23,7 +23,7 @@ use {
state::{
benchmarks::Benchmarks,
cache::{
AggregateCache,
Cache,
MessageState,
MessageStateFilter,
},
@@ -336,7 +336,7 @@ async fn get_verified_price_feeds<S>(
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData>
where
S: AggregateCache,
S: Cache,
{
let messages = state
.fetch_message_states(
@@ -396,7 +396,7 @@ pub async fn get_price_feeds_with_update_data<S>(
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData>
where
S: AggregateCache,
S: Cache,
S: Benchmarks,
{
match get_verified_price_feeds(state, price_ids, request_time.clone()).await {
@@ -412,7 +412,7 @@ where
pub async fn get_price_feed_ids<S>(state: &S) -> HashSet<PriceIdentifier>
where
S: AggregateCache,
S: Cache,
{
state
.message_state_keys()
@@ -468,10 +468,7 @@ mod test {
Accumulator,
},
hashers::keccak256_160::Keccak160,
messages::{
Message,
PriceFeedMessage,
},
messages::PriceFeedMessage,
wire::v1::{
AccumulatorUpdateData,
Proof,

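In this file (the aggregate module) only the trait bounds change: each generic function takes `&S` and bounds `S` on exactly the services it uses, e.g. `get_price_feeds_with_update_data` needs both `Cache` and `Benchmarks`. A small sketch of that consumer side, with hypothetical method names and a mock state rather than the real APIs:

// Hypothetical service traits, standing in for the real Cache and Benchmarks.
trait Cache {
    fn latest_price(&self, id: u64) -> Option<f64>;
}

trait Benchmarks {
    fn historical_price(&self, id: u64) -> Option<f64>;
}

// Mirrors the shape of get_price_feeds_with_update_data: read the cache and, on a
// miss, fall back to the benchmarks service, so S is bounded on both traits.
fn get_price<S>(state: &S, id: u64) -> Option<f64>
where
    S: Cache,
    S: Benchmarks,
{
    state.latest_price(id).or_else(|| state.historical_price(id))
}

// Any state type that implements the required traits can be passed in,
// which is what makes these bounds easy to mock in tests.
struct MockState;

impl Cache for MockState {
    fn latest_price(&self, _id: u64) -> Option<f64> {
        None // simulate a cache miss
    }
}

impl Benchmarks for MockState {
    fn historical_price(&self, _id: u64) -> Option<f64> {
        Some(42.0)
    }
}

fn main() {
    assert_eq!(get_price(&MockState, 7), Some(42.0));
}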
View File

@@ -7,7 +7,7 @@ use {
crate::{
network::wormhole::VaaBytes,
state::cache::{
AggregateCache,
Cache,
MessageState,
},
},
@@ -70,14 +70,14 @@ impl From<MessageState> for RawMessageWithMerkleProof {
}
pub async fn store_wormhole_merkle_verified_message<S>(
store: &S,
state: &S,
root: WormholeMerkleRoot,
vaa: VaaBytes,
) -> Result<()>
where
S: AggregateCache,
S: Cache,
{
store
state
.store_wormhole_merkle_state(WormholeMerkleState { root, vaa })
.await?;
Ok(())

View File

@@ -1,7 +1,7 @@
//! This module contains the global state of the application.
use {
self::cache::Cache,
self::cache::CacheState,
crate::{
aggregate::{
AggregateState,
@@ -31,7 +31,7 @@ pub mod cache;
pub struct State {
/// Storage is a short-lived cache of the state of all the updates that have been passed to the
/// store.
pub cache: Cache,
pub cache: CacheState,
/// Sequence numbers of lately observed Vaas. Store uses this set
/// to ignore the previously observed Vaas as a performance boost.
@@ -64,7 +64,7 @@ impl State {
) -> Arc<Self> {
let mut metrics_registry = Registry::default();
Arc::new(Self {
cache: Cache::new(cache_size),
cache: CacheState::new(cache_size),
observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()),
api_update_tx: update_tx,

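`State` now embeds the cache slice as `cache: CacheState`, constructed via `CacheState::new(cache_size)`. As the next file shows, the fields of `CacheState` are `Arc<RwLock<...>>` maps, which is why the `Cache` trait's store methods can mutate through a shared `&self`. A minimal synchronous sketch of that interior-mutability shape, using a hypothetical `SlotCache` (the real code uses tokio's async `RwLock`):

use std::{
    collections::BTreeMap,
    sync::{Arc, RwLock},
};

// Each cache field sits behind Arc<RwLock<...>>, so a shared &self is enough to write.
// (The real fields use tokio::sync::RwLock with async methods; std::sync::RwLock keeps
// this sketch runnable without a runtime.)
struct SlotCache {
    inner: Arc<RwLock<BTreeMap<u64, String>>>,
}

impl SlotCache {
    fn new() -> Self {
        Self {
            inner: Arc::new(RwLock::new(BTreeMap::new())),
        }
    }

    // Mirrors the shape of store_accumulator_messages: mutation through &self.
    fn store(&self, slot: u64, value: String) {
        self.inner.write().unwrap().insert(slot, value);
    }

    fn fetch(&self, slot: u64) -> Option<String> {
        self.inner.read().unwrap().get(&slot).cloned()
    }
}

fn main() {
    let cache = SlotCache::new();
    cache.store(7, "update".into());
    assert_eq!(cache.fetch(7).as_deref(), Some("update"));
}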
View File

@@ -1,4 +1,5 @@
use {
super::State,
crate::aggregate::{
wormhole_merkle::WormholeMerkleState,
AccumulatorMessages,
@@ -96,23 +97,184 @@ pub enum MessageStateFilter {
Only(MessageType),
}
pub struct Cache {
/// Accumulator messages cache
///
/// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
accumulator_messages_cache: Arc<RwLock<BTreeMap<Slot, AccumulatorMessages>>>,
/// A Cache of AccumulatorMessage by slot. We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
type AccumulatorMessagesCache = Arc<RwLock<BTreeMap<Slot, AccumulatorMessages>>>;
/// Wormhole merkle state cache
///
/// We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
wormhole_merkle_state_cache: Arc<RwLock<BTreeMap<Slot, WormholeMerkleState>>>,
/// A Cache of WormholeMerkleState by slot. We do not write to this cache much, so we can use a simple RwLock instead of a DashMap.
type WormholeMerkleStateCache = Arc<RwLock<BTreeMap<Slot, WormholeMerkleState>>>;
message_cache: Arc<RwLock<HashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>>,
cache_size: u64,
/// A Cache of `Time<->MessageState` by feed id.
type MessageCache = Arc<RwLock<HashMap<MessageStateKey, BTreeMap<MessageStateTime, MessageState>>>>;
/// A collection of caches for various program state.
pub struct CacheState {
accumulator_messages_cache: AccumulatorMessagesCache,
wormhole_merkle_state_cache: WormholeMerkleStateCache,
message_cache: MessageCache,
cache_size: u64,
}
impl CacheState {
pub fn new(size: u64) -> Self {
Self {
accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())),
wormhole_merkle_state_cache: Arc::new(RwLock::new(BTreeMap::new())),
message_cache: Arc::new(RwLock::new(HashMap::new())),
cache_size: size,
}
}
}
/// Allow downcasting State into CacheState for functions that depend on the `Cache` service.
impl<'a> From<&'a State> for &'a CacheState {
fn from(state: &'a State) -> &'a CacheState {
&state.cache
}
}
pub trait Cache {
async fn message_state_keys(&self) -> Vec<MessageStateKey>;
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>);
async fn fetch_message_states(
&self,
ids: Vec<FeedId>,
request_time: RequestTime,
filter: MessageStateFilter,
) -> Result<Vec<MessageState>>;
async fn store_accumulator_messages(
&self,
accumulator_messages: AccumulatorMessages,
) -> Result<()>;
async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>>;
async fn store_wormhole_merkle_state(
&self,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()>;
async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>;
}
impl<T> Cache for T
where
for<'a> &'a T: Into<&'a CacheState>,
T: Sync,
{
async fn message_state_keys(&self) -> Vec<MessageStateKey> {
self.into()
.message_cache
.read()
.await
.iter()
.map(|entry| entry.0.clone())
.collect::<Vec<_>>()
}
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
let mut message_cache = self.into().message_cache.write().await;
for message_state in message_states {
let key = message_state.key();
let time = message_state.time();
let cache = message_cache.entry(key).or_insert_with(BTreeMap::new);
cache.insert(time, message_state);
// Remove the earliest message states if the cache size is exceeded
while cache.len() > self.into().cache_size as usize {
cache.pop_first();
}
}
Ok(())
}
/// This method takes the current feed ids and prunes the cache for the keys
/// that are not present in the current feed ids.
///
/// There is a side-effect of this: if a key gets removed, we will
/// lose the cache for that key and cannot retrieve it for historical
/// price queries.
async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>) {
let mut message_cache = self.into().message_cache.write().await;
// Sometimes, some keys are removed from the accumulator. We track which keys are not
// present in the message states and remove them from the cache.
let keys_in_cache = message_cache
.iter()
.map(|(key, _)| key.clone())
.collect::<HashSet<_>>();
for key in keys_in_cache {
if !current_keys.contains(&key) {
tracing::info!("Feed {:?} seems to be removed. Removing it from cache", key);
message_cache.remove(&key);
}
}
}
async fn fetch_message_states(
&self,
ids: Vec<FeedId>,
request_time: RequestTime,
filter: MessageStateFilter,
) -> Result<Vec<MessageState>> {
join_all(ids.into_iter().flat_map(|id| {
let request_time = request_time.clone();
let message_types: Vec<MessageType> = match filter {
MessageStateFilter::All => MessageType::iter().collect(),
MessageStateFilter::Only(t) => vec![t],
};
message_types.into_iter().map(move |message_type| {
let key = MessageStateKey {
feed_id: id,
type_: message_type,
};
retrieve_message_state(self.into(), key, request_time.clone())
})
}))
.await
.into_iter()
.collect::<Option<Vec<_>>>()
.ok_or(anyhow!("Message not found"))
}
async fn store_accumulator_messages(
&self,
accumulator_messages: AccumulatorMessages,
) -> Result<()> {
let mut cache = self.into().accumulator_messages_cache.write().await;
cache.insert(accumulator_messages.slot, accumulator_messages);
while cache.len() > self.into().cache_size as usize {
cache.pop_first();
}
Ok(())
}
async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>> {
let cache = self.into().accumulator_messages_cache.read().await;
Ok(cache.get(&slot).cloned())
}
async fn store_wormhole_merkle_state(
&self,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()> {
let mut cache = self.into().wormhole_merkle_state_cache.write().await;
cache.insert(wormhole_merkle_state.root.slot, wormhole_merkle_state);
while cache.len() > self.into().cache_size as usize {
cache.pop_first();
}
Ok(())
}
async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>> {
let cache = self.into().wormhole_merkle_state_cache.read().await;
Ok(cache.get(&slot).cloned())
}
}
async fn retrieve_message_state(
cache: &Cache,
cache: &CacheState,
key: MessageStateKey,
request_time: RequestTime,
) -> Option<MessageState> {
@@ -156,179 +318,19 @@ async fn retrieve_message_state
}
}
impl Cache {
pub fn new(cache_size: u64) -> Self {
Self {
message_cache: Arc::new(RwLock::new(HashMap::new())),
accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())),
wormhole_merkle_state_cache: Arc::new(RwLock::new(BTreeMap::new())),
cache_size,
}
}
}
#[async_trait::async_trait]
pub trait AggregateCache {
async fn message_state_keys(&self) -> Vec<MessageStateKey>;
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>);
async fn fetch_message_states(
&self,
ids: Vec<FeedId>,
request_time: RequestTime,
filter: MessageStateFilter,
) -> Result<Vec<MessageState>>;
async fn store_accumulator_messages(
&self,
accumulator_messages: AccumulatorMessages,
) -> Result<()>;
async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>>;
async fn store_wormhole_merkle_state(
&self,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()>;
async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>;
}
#[async_trait::async_trait]
impl AggregateCache for crate::state::State {
async fn message_state_keys(&self) -> Vec<MessageStateKey> {
self.cache
.message_cache
.read()
.await
.iter()
.map(|entry| entry.0.clone())
.collect::<Vec<_>>()
}
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
let mut message_cache = self.cache.message_cache.write().await;
for message_state in message_states {
let key = message_state.key();
let time = message_state.time();
let cache = message_cache.entry(key).or_insert_with(BTreeMap::new);
cache.insert(time, message_state);
// Remove the earliest message states if the cache size is exceeded
while cache.len() > self.cache.cache_size as usize {
cache.pop_first();
}
}
Ok(())
}
/// This method takes the current feed ids and prunes the cache for the keys
/// that are not present in the current feed ids.
///
/// There is a side-effect of this: if a key gets removed, we will
/// lose the cache for that key and cannot retrieve it for historical
/// price queries.
async fn prune_removed_keys(&self, current_keys: HashSet<MessageStateKey>) {
let mut message_cache = self.cache.message_cache.write().await;
// Sometimes, some keys are removed from the accumulator. We track which keys are not
// present in the message states and remove them from the cache.
let keys_in_cache = message_cache
.iter()
.map(|(key, _)| key.clone())
.collect::<HashSet<_>>();
for key in keys_in_cache {
if !current_keys.contains(&key) {
tracing::info!("Feed {:?} seems to be removed. Removing it from cache", key);
message_cache.remove(&key);
}
}
}
async fn fetch_message_states(
&self,
ids: Vec<FeedId>,
request_time: RequestTime,
filter: MessageStateFilter,
) -> Result<Vec<MessageState>> {
join_all(ids.into_iter().flat_map(|id| {
let request_time = request_time.clone();
let message_types: Vec<MessageType> = match filter {
MessageStateFilter::All => MessageType::iter().collect(),
MessageStateFilter::Only(t) => vec![t],
};
message_types.into_iter().map(move |message_type| {
let key = MessageStateKey {
feed_id: id,
type_: message_type,
};
retrieve_message_state(&self.cache, key, request_time.clone())
})
}))
.await
.into_iter()
.collect::<Option<Vec<_>>>()
.ok_or(anyhow!("Message not found"))
}
async fn store_accumulator_messages(
&self,
accumulator_messages: AccumulatorMessages,
) -> Result<()> {
let mut cache = self.cache.accumulator_messages_cache.write().await;
cache.insert(accumulator_messages.slot, accumulator_messages);
while cache.len() > self.cache.cache_size as usize {
cache.pop_first();
}
Ok(())
}
async fn fetch_accumulator_messages(&self, slot: Slot) -> Result<Option<AccumulatorMessages>> {
let cache = self.cache.accumulator_messages_cache.read().await;
Ok(cache.get(&slot).cloned())
}
async fn store_wormhole_merkle_state(
&self,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()> {
let mut cache = self.cache.wormhole_merkle_state_cache.write().await;
cache.insert(wormhole_merkle_state.root.slot, wormhole_merkle_state);
while cache.len() > self.cache.cache_size as usize {
cache.pop_first();
}
Ok(())
}
async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>> {
let cache = self.cache.wormhole_merkle_state_cache.read().await;
Ok(cache.get(&slot).cloned())
}
}
#[cfg(test)]
mod test {
use {
super::*,
crate::{
aggregate::{
wormhole_merkle::{
WormholeMerkleMessageProof,
WormholeMerkleState,
},
AccumulatorMessages,
ProofSet,
},
aggregate::wormhole_merkle::WormholeMerkleMessageProof,
state::test::setup_state,
},
pyth_sdk::UnixTimestamp,
pythnet_sdk::{
accumulators::merkle::MerklePath,
hashers::keccak256_160::Keccak160,
messages::{
Message,
PriceFeedMessage,
},
messages::PriceFeedMessage,
wire::v1::WormholeMerkleRoot,
},
};
@@ -369,7 +371,7 @@ mod test {
slot: Slot,
) -> MessageState
where
S: AggregateCache,
S: Cache,
{
let message_state = create_dummy_price_feed_message_state(feed_id, publish_time, slot);
state
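One detail of the cache module worth calling out: each feed keeps its message states in a `BTreeMap` keyed by time, and `store_message_states` evicts with `pop_first()` once a feed exceeds `cache_size`, so the oldest entries are dropped first while time-ordered lookups stay cheap. A standalone sketch of such a bounded, time-ordered cache (simplified types, synchronous; the lookup shape is a plausible reading of the elided `retrieve_message_state`, not the actual implementation):

use std::collections::BTreeMap;

// A bounded per-feed cache: keys are publish times, values are the cached message.
struct BoundedTimeCache {
    entries: BTreeMap<i64, String>,
    cache_size: usize,
}

impl BoundedTimeCache {
    fn new(cache_size: usize) -> Self {
        Self {
            entries: BTreeMap::new(),
            cache_size,
        }
    }

    // Insert and, as in store_message_states, pop the earliest entries while the
    // size bound is exceeded.
    fn insert(&mut self, publish_time: i64, message: String) {
        self.entries.insert(publish_time, message);
        while self.entries.len() > self.cache_size {
            self.entries.pop_first();
        }
    }

    // Latest entry at or before the requested time: the kind of query the
    // time-ordered map makes cheap.
    fn latest_at_or_before(&self, time: i64) -> Option<&String> {
        self.entries.range(..=time).next_back().map(|(_, v)| v)
    }
}

fn main() {
    let mut cache = BoundedTimeCache::new(2);
    cache.insert(10, "a".into());
    cache.insert(20, "b".into());
    cache.insert(30, "c".into()); // evicts the entry at t=10
    assert!(cache.latest_at_or_before(15).is_none());
    assert_eq!(cache.latest_at_or_before(25).map(String::as_str), Some("b"));
}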