refactor(hermes): removed `impl Store`

This commit is contained in:
Reisen 2023-09-13 13:32:27 +00:00 committed by Ali Behjati
parent f2ca88176a
commit e144dd2bd0
11 changed files with 336 additions and 331 deletions

View File

@ -65,9 +65,8 @@ pub async fn get_price_feed(
) -> Result<Json<RpcPriceFeed>, RestError> { ) -> Result<Json<RpcPriceFeed>, RestError> {
let price_id: PriceIdentifier = params.id.into(); let price_id: PriceIdentifier = params.id.into();
let price_feeds_with_update_data = state let price_feeds_with_update_data = crate::store::get_price_feeds_with_update_data(
.store &state.store,
.get_price_feeds_with_update_data(
vec![price_id], vec![price_id],
RequestTime::FirstAfter(params.publish_time), RequestTime::FirstAfter(params.publish_time),
) )

View File

@ -5,11 +5,14 @@ use {
types::PriceIdInput, types::PriceIdInput,
}, },
doc_examples, doc_examples,
store::types::{ store::{
self,
types::{
RequestTime, RequestTime,
UnixTimestamp, UnixTimestamp,
}, },
}, },
},
anyhow::Result, anyhow::Result,
axum::{ axum::{
extract::State, extract::State,
@ -72,9 +75,8 @@ pub async fn get_vaa(
) -> Result<Json<GetVaaResponse>, RestError> { ) -> Result<Json<GetVaaResponse>, RestError> {
let price_id: PriceIdentifier = params.id.into(); let price_id: PriceIdentifier = params.id.into();
let price_feeds_with_update_data = state let price_feeds_with_update_data = store::get_price_feeds_with_update_data(
.store &state.store,
.get_price_feeds_with_update_data(
vec![price_id], vec![price_id],
RequestTime::FirstAfter(params.publish_time), RequestTime::FirstAfter(params.publish_time),
) )

View File

@ -68,9 +68,11 @@ pub async fn get_vaa_ccip(
.map_err(|_| RestError::InvalidCCIPInput)?, .map_err(|_| RestError::InvalidCCIPInput)?,
); );
let price_feeds_with_update_data = state let price_feeds_with_update_data = crate::store::get_price_feeds_with_update_data(
.store &state.store,
.get_price_feeds_with_update_data(vec![price_id], RequestTime::FirstAfter(publish_time)) vec![price_id],
RequestTime::FirstAfter(publish_time),
)
.await .await
.map_err(|_| RestError::CcipUpdateDataNotFound)?; .map_err(|_| RestError::CcipUpdateDataNotFound)?;

View File

@ -63,9 +63,11 @@ pub async fn latest_price_feeds(
QsQuery(params): QsQuery<LatestPriceFeedsQueryParams>, QsQuery(params): QsQuery<LatestPriceFeedsQueryParams>,
) -> Result<Json<Vec<RpcPriceFeed>>, RestError> { ) -> Result<Json<Vec<RpcPriceFeed>>, RestError> {
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect(); let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
let price_feeds_with_update_data = state let price_feeds_with_update_data = crate::store::get_price_feeds_with_update_data(
.store &state.store,
.get_price_feeds_with_update_data(price_ids, RequestTime::Latest) price_ids,
RequestTime::Latest,
)
.await .await
.map_err(|_| RestError::UpdateDataNotFound)?; .map_err(|_| RestError::UpdateDataNotFound)?;

View File

@ -58,9 +58,11 @@ pub async fn latest_vaas(
QsQuery(params): QsQuery<LatestVaasQueryParams>, QsQuery(params): QsQuery<LatestVaasQueryParams>,
) -> Result<Json<Vec<String>>, RestError> { ) -> Result<Json<Vec<String>>, RestError> {
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect(); let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
let price_feeds_with_update_data = state let price_feeds_with_update_data = crate::store::get_price_feeds_with_update_data(
.store &state.store,
.get_price_feeds_with_update_data(price_ids, RequestTime::Latest) price_ids,
RequestTime::Latest,
)
.await .await
.map_err(|_| RestError::UpdateDataNotFound)?; .map_err(|_| RestError::UpdateDataNotFound)?;

View File

@ -24,9 +24,7 @@ use {
pub async fn price_feed_ids( pub async fn price_feed_ids(
State(state): State<crate::api::State>, State(state): State<crate::api::State>,
) -> Result<Json<Vec<RpcPriceIdentifier>>, RestError> { ) -> Result<Json<Vec<RpcPriceIdentifier>>, RestError> {
let price_feed_ids = state let price_feed_ids = crate::store::get_price_feed_ids(&state.store)
.store
.get_price_feed_ids()
.await .await
.iter() .iter()
.map(RpcPriceIdentifier::from) .map(RpcPriceIdentifier::from)

View File

@ -8,7 +8,7 @@ use axum::{
}; };
pub async fn ready(State(state): State<crate::api::State>) -> Response { pub async fn ready(State(state): State<crate::api::State>) -> Response {
match state.store.is_ready().await { match crate::store::is_ready(&state.store).await {
true => (StatusCode::OK, "OK").into_response(), true => (StatusCode::OK, "OK").into_response(),
false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(), false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(),
} }

View File

@ -149,9 +149,11 @@ impl Subscriber {
async fn handle_price_feeds_update(&mut self) -> Result<()> { async fn handle_price_feeds_update(&mut self) -> Result<()> {
let price_feed_ids = self.price_feeds_with_config.keys().cloned().collect(); let price_feed_ids = self.price_feeds_with_config.keys().cloned().collect();
for update in self for update in crate::store::get_price_feeds_with_update_data(
.store &self.store,
.get_price_feeds_with_update_data(price_feed_ids, RequestTime::Latest) price_feed_ids,
RequestTime::Latest,
)
.await? .await?
.price_feeds .price_feeds
{ {
@ -231,7 +233,7 @@ impl Subscriber {
binary, binary,
}) => { }) => {
let price_ids: Vec<PriceIdentifier> = ids.into_iter().map(|id| id.into()).collect(); let price_ids: Vec<PriceIdentifier> = ids.into_iter().map(|id| id.into()).collect();
let available_price_ids = self.store.get_price_feed_ids().await; let available_price_ids = crate::store::get_price_feed_ids(&self.store).await;
let not_found_price_ids: Vec<&PriceIdentifier> = price_ids let not_found_price_ids: Vec<&PriceIdentifier> = price_ids
.iter() .iter()

View File

@ -213,7 +213,7 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
let store = store.clone(); let store = store.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = store.store_update(Update::Vaa(vaa)).await { if let Err(e) = crate::store::store_update(&store, Update::Vaa(vaa)).await {
tracing::error!(error = ?e, "Failed to process VAA."); tracing::error!(error = ?e, "Failed to process VAA.");
} }
}); });

View File

@ -175,8 +175,10 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<()> {
if candidate.to_string() == update.value.pubkey { if candidate.to_string() == update.value.pubkey {
let store = store.clone(); let store = store.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(err) = store if let Err(err) = crate::store::store_update(
.store_update(Update::AccumulatorMessages(accumulator_messages)) &store,
Update::AccumulatorMessages(accumulator_messages),
)
.await .await
{ {
tracing::error!(error = ?err, "Failed to store accumulator messages."); tracing::error!(error = ?err, "Failed to store accumulator messages.");
@ -228,9 +230,7 @@ async fn fetch_existing_guardian_sets(
"Retrieved Current GuardianSet.", "Retrieved Current GuardianSet.",
); );
store crate::store::update_guardian_set(&store, bridge.guardian_set_index, current).await;
.update_guardian_set(bridge.guardian_set_index, current)
.await;
// If there are more than one guardian set, we want to fetch the previous one as well as it // If there are more than one guardian set, we want to fetch the previous one as well as it
// may still be in transition phase if a guardian upgrade has just occurred. // may still be in transition phase if a guardian upgrade has just occurred.
@ -248,9 +248,7 @@ async fn fetch_existing_guardian_sets(
"Retrieved Previous GuardianSet.", "Retrieved Previous GuardianSet.",
); );
store crate::store::update_guardian_set(&store, bridge.guardian_set_index - 1, previous).await;
.update_guardian_set(bridge.guardian_set_index - 1, previous)
.await;
} }
Ok(()) Ok(())

View File

@ -91,20 +91,17 @@ const OBSERVED_CACHE_SIZE: usize = 1000;
const READINESS_STALENESS_THRESHOLD: Duration = Duration::from_secs(30); const READINESS_STALENESS_THRESHOLD: Duration = Duration::from_secs(30);
pub struct Store { pub struct Store {
/// Storage is a short-lived cache of the state of all the updates /// Storage is a short-lived cache of the state of all the updates that have been passed to the
/// that have been passed to the store. /// store.
pub storage: Storage, pub storage: Storage,
/// Sequence numbers of lately observed Vaas. Store uses this set /// Sequence numbers of lately observed Vaas. Store uses this set
/// to ignore the previously observed Vaas as a performance boost. /// to ignore the previously observed Vaas as a performance boost.
pub observed_vaa_seqs: RwLock<BTreeSet<u64>>, pub observed_vaa_seqs: RwLock<BTreeSet<u64>>,
/// Wormhole guardian sets. It is used to verify Vaas before using /// Wormhole guardian sets. It is used to verify Vaas before using them.
/// them.
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>, pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
/// The sender to the channel between Store and Api to notify /// The sender to the channel between Store and Api to notify completed updates.
/// completed updates.
pub update_tx: Sender<()>, pub update_tx: Sender<()>,
/// Time of the last completed update. This is used for the health /// Time of the last completed update. This is used for the health probes.
/// probes.
pub last_completed_update_at: RwLock<Option<Instant>>, pub last_completed_update_at: RwLock<Option<Instant>>,
/// Benchmarks endpoint /// Benchmarks endpoint
pub benchmarks_endpoint: Option<Url>, pub benchmarks_endpoint: Option<Url>,
@ -125,23 +122,23 @@ impl Store {
benchmarks_endpoint, benchmarks_endpoint,
}) })
} }
}
/// Stores the update data in the store /// Stores the update data in the store
#[tracing::instrument(skip(self, update))] #[tracing::instrument(skip(store, update))]
pub async fn store_update(&self, update: Update) -> Result<()> { pub async fn store_update(store: &Store, update: Update) -> Result<()> {
// The slot that the update is originating from. It should be available // The slot that the update is originating from. It should be available
// in all the updates. // in all the updates.
let slot = match update { let slot = match update {
Update::Vaa(update_vaa) => { Update::Vaa(update_vaa) => {
// FIXME: Move to wormhole.rs // FIXME: Move to wormhole.rs
let vaa = let vaa = serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(&update_vaa)?;
serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(&update_vaa)?;
if self.observed_vaa_seqs.read().await.contains(&vaa.sequence) { if store.observed_vaa_seqs.read().await.contains(&vaa.sequence) {
return Ok(()); // Ignore VAA if we have already seen it return Ok(()); // Ignore VAA if we have already seen it
} }
let vaa = verify_vaa(self, vaa).await; let vaa = verify_vaa(store, vaa).await;
let vaa = match vaa { let vaa = match vaa {
Ok(vaa) => vaa, Ok(vaa) => vaa,
@ -152,7 +149,7 @@ impl Store {
}; };
{ {
let mut observed_vaa_seqs = self.observed_vaa_seqs.write().await; let mut observed_vaa_seqs = store.observed_vaa_seqs.write().await;
if observed_vaa_seqs.contains(&vaa.sequence) { if observed_vaa_seqs.contains(&vaa.sequence) {
return Ok(()); // Ignore VAA if we have already seen it return Ok(()); // Ignore VAA if we have already seen it
} }
@ -167,7 +164,7 @@ impl Store {
tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof."); tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");
store_wormhole_merkle_verified_message( store_wormhole_merkle_verified_message(
self, store,
proof.clone(), proof.clone(),
update_vaa.to_owned(), update_vaa.to_owned(),
) )
@ -182,15 +179,16 @@ impl Store {
let slot = accumulator_messages.slot; let slot = accumulator_messages.slot;
tracing::info!(slot = slot, "Storing Accumulator Messages."); tracing::info!(slot = slot, "Storing Accumulator Messages.");
self.storage store
.storage
.store_accumulator_messages(accumulator_messages) .store_accumulator_messages(accumulator_messages)
.await?; .await?;
slot slot
} }
}; };
let accumulator_messages = self.storage.fetch_accumulator_messages(slot).await?; let accumulator_messages = store.storage.fetch_accumulator_messages(slot).await?;
let wormhole_merkle_state = self.storage.fetch_wormhole_merkle_state(slot).await?; let wormhole_merkle_state = store.storage.fetch_wormhole_merkle_state(slot).await?;
let (accumulator_messages, wormhole_merkle_state) = let (accumulator_messages, wormhole_merkle_state) =
match (accumulator_messages, wormhole_merkle_state) { match (accumulator_messages, wormhole_merkle_state) {
@ -204,30 +202,29 @@ impl Store {
// Once the accumulator reaches a complete state for a specific slot // Once the accumulator reaches a complete state for a specific slot
// we can build the message states // we can build the message states
self.build_message_states(accumulator_messages, wormhole_merkle_state) build_message_states(store, accumulator_messages, wormhole_merkle_state).await?;
.await?;
self.update_tx.send(()).await?; store.update_tx.send(()).await?;
self.last_completed_update_at store
.last_completed_update_at
.write() .write()
.await .await
.replace(Instant::now()); .replace(Instant::now());
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, accumulator_messages, wormhole_merkle_state))] #[tracing::instrument(skip(store, accumulator_messages, wormhole_merkle_state))]
async fn build_message_states( async fn build_message_states(
&self, store: &Store,
accumulator_messages: AccumulatorMessages, accumulator_messages: AccumulatorMessages,
wormhole_merkle_state: WormholeMerkleState, wormhole_merkle_state: WormholeMerkleState,
) -> Result<()> { ) -> Result<()> {
let wormhole_merkle_message_states_proofs = let wormhole_merkle_message_states_proofs =
construct_message_states_proofs(&accumulator_messages, &wormhole_merkle_state)?; construct_message_states_proofs(&accumulator_messages, &wormhole_merkle_state)?;
let current_time: UnixTimestamp = let current_time: UnixTimestamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as _;
SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as _;
let message_states = accumulator_messages let message_states = accumulator_messages
.raw_messages .raw_messages
@ -252,22 +249,22 @@ impl Store {
tracing::info!(len = message_states.len(), "Storing Message States."); tracing::info!(len = message_states.len(), "Storing Message States.");
self.storage.store_message_states(message_states).await?; store.storage.store_message_states(message_states).await?;
Ok(()) Ok(())
} }
pub async fn update_guardian_set(&self, id: u32, guardian_set: GuardianSet) { pub async fn update_guardian_set(store: &Store, id: u32, guardian_set: GuardianSet) {
let mut guardian_sets = self.guardian_set.write().await; let mut guardian_sets = store.guardian_set.write().await;
guardian_sets.insert(id, guardian_set); guardian_sets.insert(id, guardian_set);
} }
async fn get_price_feeds_with_update_data_from_storage( async fn get_price_feeds_with_update_data_from_storage(
&self, store: &Store,
price_ids: Vec<PriceIdentifier>, price_ids: Vec<PriceIdentifier>,
request_time: RequestTime, request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData> { ) -> Result<PriceFeedsWithUpdateData> {
let messages = self let messages = store
.storage .storage
.fetch_message_states( .fetch_message_states(
price_ids price_ids
@ -317,21 +314,24 @@ impl Store {
price_feeds, price_feeds,
update_data, update_data,
}) })
} }
pub async fn get_price_feeds_with_update_data( pub async fn get_price_feeds_with_update_data(
&self, store: &Store,
price_ids: Vec<PriceIdentifier>, price_ids: Vec<PriceIdentifier>,
request_time: RequestTime, request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData> { ) -> Result<PriceFeedsWithUpdateData> {
match self match get_price_feeds_with_update_data_from_storage(
.get_price_feeds_with_update_data_from_storage(price_ids.clone(), request_time.clone()) store,
price_ids.clone(),
request_time.clone(),
)
.await .await
{ {
Ok(price_feeds_with_update_data) => Ok(price_feeds_with_update_data), Ok(price_feeds_with_update_data) => Ok(price_feeds_with_update_data),
Err(e) => { Err(e) => {
if let RequestTime::FirstAfter(publish_time) = request_time { if let RequestTime::FirstAfter(publish_time) = request_time {
if let Some(endpoint) = &self.benchmarks_endpoint { if let Some(endpoint) = &store.benchmarks_endpoint {
return benchmarks::get_price_feeds_with_update_data_from_benchmarks( return benchmarks::get_price_feeds_with_update_data_from_benchmarks(
endpoint.clone(), endpoint.clone(),
price_ids, price_ids,
@ -343,26 +343,26 @@ impl Store {
Err(e) Err(e)
} }
} }
} }
pub async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier> { pub async fn get_price_feed_ids(store: &Store) -> HashSet<PriceIdentifier> {
self.storage store
.storage
.message_state_keys() .message_state_keys()
.await .await
.iter() .iter()
.map(|key| PriceIdentifier::new(key.feed_id)) .map(|key| PriceIdentifier::new(key.feed_id))
.collect() .collect()
} }
pub async fn is_ready(&self) -> bool { pub async fn is_ready(store: &Store) -> bool {
let last_completed_update_at = self.last_completed_update_at.read().await; let last_completed_update_at = store.last_completed_update_at.read().await;
match last_completed_update_at.as_ref() { match last_completed_update_at.as_ref() {
Some(last_completed_update_at) => { Some(last_completed_update_at) => {
last_completed_update_at.elapsed() < READINESS_STALENESS_THRESHOLD last_completed_update_at.elapsed() < READINESS_STALENESS_THRESHOLD
} }
None => false, None => false,
} }
}
} }
#[cfg(test)] #[cfg(test)]
@ -478,8 +478,8 @@ mod test {
let store = Store::new(update_tx, cache_size, None); let store = Store::new(update_tx, cache_size, None);
// Add an initial guardian set with public key 0 // Add an initial guardian set with public key 0
store update_guardian_set(
.update_guardian_set( &store,
0, 0,
GuardianSet { GuardianSet {
keys: vec![[0; 20]], keys: vec![[0; 20]],
@ -491,7 +491,7 @@ mod test {
} }
pub async fn store_multiple_concurrent_valid_updates(store: Arc<Store>, updates: Vec<Update>) { pub async fn store_multiple_concurrent_valid_updates(store: Arc<Store>, updates: Vec<Update>) {
let res = join_all(updates.into_iter().map(|u| store.store_update(u))).await; let res = join_all(updates.into_iter().map(|u| store_update(&store, u))).await;
// Check that all store_update calls succeeded // Check that all store_update calls succeeded
assert!(res.into_iter().all(|r| r.is_ok())); assert!(res.into_iter().all(|r| r.is_ok()));
} }
@ -514,14 +514,14 @@ mod test {
// Check the price ids are stored correctly // Check the price ids are stored correctly
assert_eq!( assert_eq!(
store.get_price_feed_ids().await, get_price_feed_ids(&store).await,
vec![PriceIdentifier::new([100; 32])].into_iter().collect() vec![PriceIdentifier::new([100; 32])].into_iter().collect()
); );
// Check get_price_feeds_with_update_data retrieves the correct // Check get_price_feeds_with_update_data retrieves the correct
// price feed with correct update data. // price feed with correct update data.
let price_feeds_with_update_data = store let price_feeds_with_update_data = get_price_feeds_with_update_data(
.get_price_feeds_with_update_data( &store,
vec![PriceIdentifier::new([100; 32])], vec![PriceIdentifier::new([100; 32])],
RequestTime::Latest, RequestTime::Latest,
) )
@ -640,8 +640,8 @@ mod test {
MockClock::advance(Duration::from_secs(1)); MockClock::advance(Duration::from_secs(1));
// Get the price feeds with update data // Get the price feeds with update data
let price_feeds_with_update_data = store let price_feeds_with_update_data = get_price_feeds_with_update_data(
.get_price_feeds_with_update_data( &store,
vec![PriceIdentifier::new([100; 32])], vec![PriceIdentifier::new([100; 32])],
RequestTime::Latest, RequestTime::Latest,
) )
@ -656,13 +656,13 @@ mod test {
); );
// Check the store is ready // Check the store is ready
assert!(store.is_ready().await); assert!(is_ready(&store).await);
// Advance the clock to make the prices stale // Advance the clock to make the prices stale
MockClock::advance_system_time(READINESS_STALENESS_THRESHOLD); MockClock::advance_system_time(READINESS_STALENESS_THRESHOLD);
MockClock::advance(READINESS_STALENESS_THRESHOLD); MockClock::advance(READINESS_STALENESS_THRESHOLD);
// Check the store is not ready // Check the store is not ready
assert!(!store.is_ready().await); assert!(!is_ready(&store).await);
} }
/// Test that the store retains the latest slots upon cache eviction. /// Test that the store retains the latest slots upon cache eviction.
@ -705,8 +705,8 @@ mod test {
// Check the last 100 slots are retained // Check the last 100 slots are retained
for slot in 900..1000 { for slot in 900..1000 {
let price_feeds_with_update_data = store let price_feeds_with_update_data = get_price_feeds_with_update_data(
.get_price_feeds_with_update_data( &store,
vec![ vec![
PriceIdentifier::new([100; 32]), PriceIdentifier::new([100; 32]),
PriceIdentifier::new([200; 32]), PriceIdentifier::new([200; 32]),
@ -722,8 +722,8 @@ mod test {
// Check nothing else is retained // Check nothing else is retained
for slot in 0..900 { for slot in 0..900 {
assert!(store assert!(get_price_feeds_with_update_data(
.get_price_feeds_with_update_data( &store,
vec![ vec![
PriceIdentifier::new([100; 32]), PriceIdentifier::new([100; 32]),
PriceIdentifier::new([200; 32]), PriceIdentifier::new([200; 32]),