refactor(hermes): rename store to aggregate

Ali Behjati 2023-09-14 17:08:51 +02:00
parent cbeada6c6d
commit 09e2b17d1c
22 changed files with 229 additions and 216 deletions
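Note, for orientation before the per-file hunks: the commit splits the old `store` module in two. A sketch of the module layout before and after, reconstructed from the `mod` declarations in this diff:

mod store;      // before: store::{benchmarks, cache, proof, types, wormhole}

mod aggregate;  // after: aggregate::{proof, types, wormhole} (update ingestion, proofs, VAA verification)
mod state;      // after: state::{benchmarks, cache} (the shared State struct and the traits implemented on it)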

View File

@ -12,13 +12,6 @@ use std::time::{
};
use {
self::{
benchmarks::Benchmarks,
cache::{
Cache,
CacheStore,
MessageState,
MessageStateFilter,
},
proof::wormhole_merkle::{
construct_update_data,
WormholeMerkleState,
@ -32,16 +25,27 @@ use {
},
wormhole::GuardianSet,
},
crate::store::{
proof::wormhole_merkle::{
construct_message_states_proofs,
store_wormhole_merkle_verified_message,
crate::{
aggregate::{
proof::wormhole_merkle::{
construct_message_states_proofs,
store_wormhole_merkle_verified_message,
},
types::{
ProofSet,
UnixTimestamp,
},
wormhole::verify_vaa,
},
types::{
ProofSet,
UnixTimestamp,
state::{
benchmarks::Benchmarks,
cache::{
CacheStore,
MessageState,
MessageStateFilter,
},
State,
},
wormhole::verify_vaa,
},
anyhow::{
anyhow,
@ -66,25 +70,13 @@ use {
},
},
},
reqwest::Url,
std::{
collections::{
BTreeMap,
BTreeSet,
HashSet,
},
sync::Arc,
collections::HashSet,
time::Duration,
},
tokio::sync::{
mpsc::Sender,
RwLock,
},
wormhole_sdk::Vaa,
};
pub mod benchmarks;
pub mod cache;
pub mod proof;
pub mod types;
pub mod wormhole;
@ -92,54 +84,9 @@ pub mod wormhole;
const OBSERVED_CACHE_SIZE: usize = 1000;
const READINESS_STALENESS_THRESHOLD: Duration = Duration::from_secs(30);
pub struct Store {
/// Storage is a short-lived cache of the state of all the updates that have been passed to the
/// store.
pub cache: Cache,
/// Sequence numbers of lately observed Vaas. Store uses this set
/// to ignore the previously observed Vaas as a performance boost.
pub observed_vaa_seqs: RwLock<BTreeSet<u64>>,
/// Wormhole guardian sets. It is used to verify Vaas before using them.
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
/// The sender to the channel between Store and Api to notify completed updates.
pub update_tx: Sender<()>,
/// Time of the last completed update. This is used for the health probes.
pub last_completed_update_at: RwLock<Option<Instant>>,
/// Benchmarks endpoint
pub benchmarks_endpoint: Option<Url>,
}
// impl CacheStore for Store {
// }
//
// impl Benchmarks for Store {
// }
impl Store {
pub fn new(
update_tx: Sender<()>,
cache_size: u64,
benchmarks_endpoint: Option<Url>,
) -> Arc<Self> {
Arc::new(Self {
cache: Cache::new(cache_size),
observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()),
update_tx,
last_completed_update_at: RwLock::new(None),
benchmarks_endpoint,
})
}
}
/// Stores the update data in the store
#[tracing::instrument(skip(store, update))]
pub async fn store_update(store: &Store, update: Update) -> Result<()> {
#[tracing::instrument(skip(state, update))]
pub async fn store_update(state: &State, update: Update) -> Result<()> {
// The slot that the update originates from. It should be available
// in all the updates.
let slot = match update {
@ -147,11 +94,11 @@ pub async fn store_update(store: &Store, update: Update) -> Result<()> {
// FIXME: Move to wormhole.rs
let vaa = serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(&update_vaa)?;
if store.observed_vaa_seqs.read().await.contains(&vaa.sequence) {
if state.observed_vaa_seqs.read().await.contains(&vaa.sequence) {
return Ok(()); // Ignore VAA if we have already seen it
}
let vaa = verify_vaa(store, vaa).await;
let vaa = verify_vaa(state, vaa).await;
let vaa = match vaa {
Ok(vaa) => vaa,
@ -162,7 +109,7 @@ pub async fn store_update(store: &Store, update: Update) -> Result<()> {
};
{
let mut observed_vaa_seqs = store.observed_vaa_seqs.write().await;
let mut observed_vaa_seqs = state.observed_vaa_seqs.write().await;
if observed_vaa_seqs.contains(&vaa.sequence) {
return Ok(()); // Ignore VAA if we have already seen it
}
@ -177,7 +124,7 @@ pub async fn store_update(store: &Store, update: Update) -> Result<()> {
tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");
store_wormhole_merkle_verified_message(
store,
state,
proof.clone(),
update_vaa.to_owned(),
)
@ -192,15 +139,15 @@ pub async fn store_update(store: &Store, update: Update) -> Result<()> {
let slot = accumulator_messages.slot;
tracing::info!(slot = slot, "Storing Accumulator Messages.");
store
state
.store_accumulator_messages(accumulator_messages)
.await?;
slot
}
};
let accumulator_messages = store.fetch_accumulator_messages(slot).await?;
let wormhole_merkle_state = store.fetch_wormhole_merkle_state(slot).await?;
let accumulator_messages = state.fetch_accumulator_messages(slot).await?;
let wormhole_merkle_state = state.fetch_wormhole_merkle_state(slot).await?;
let (accumulator_messages, wormhole_merkle_state) =
match (accumulator_messages, wormhole_merkle_state) {
@ -214,11 +161,11 @@ pub async fn store_update(store: &Store, update: Update) -> Result<()> {
// Once the accumulator reaches a complete state for a specific slot
// we can build the message states
build_message_states(store, accumulator_messages, wormhole_merkle_state).await?;
build_message_states(state, accumulator_messages, wormhole_merkle_state).await?;
store.update_tx.send(()).await?;
state.update_tx.send(()).await?;
store
state
.last_completed_update_at
.write()
.await
@ -227,9 +174,9 @@ pub async fn store_update(store: &Store, update: Update) -> Result<()> {
Ok(())
}
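Note: `store_update` only calls `build_message_states` once both halves of an update have been stored for a slot, in either arrival order. A minimal sketch of that gate, with generic stand-ins for the real `AccumulatorMessages` and `WormholeMerkleState` types:

// Hypothetical helper, not in the codebase: an aggregate for a slot is
// complete only when both the accumulator messages and the wormhole merkle
// state have been observed for it.
fn slot_is_complete<A, W>(
    accumulator_messages: Option<A>,
    wormhole_merkle_state: Option<W>,
) -> bool {
    matches!(
        (accumulator_messages, wormhole_merkle_state),
        (Some(_), Some(_))
    )
}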
#[tracing::instrument(skip(store, accumulator_messages, wormhole_merkle_state))]
#[tracing::instrument(skip(state, accumulator_messages, wormhole_merkle_state))]
async fn build_message_states(
store: &Store,
state: &State,
accumulator_messages: AccumulatorMessages,
wormhole_merkle_state: WormholeMerkleState,
) -> Result<()> {
@ -261,25 +208,25 @@ async fn build_message_states(
tracing::info!(len = message_states.len(), "Storing Message States.");
store.store_message_states(message_states).await?;
state.store_message_states(message_states).await?;
Ok(())
}
pub async fn update_guardian_set(store: &Store, id: u32, guardian_set: GuardianSet) {
let mut guardian_sets = store.guardian_set.write().await;
pub async fn update_guardian_set(state: &State, id: u32, guardian_set: GuardianSet) {
let mut guardian_sets = state.guardian_set.write().await;
guardian_sets.insert(id, guardian_set);
}
async fn get_verified_price_feeds<S>(
store: &S,
state: &S,
price_ids: Vec<PriceIdentifier>,
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData>
where
S: CacheStore,
{
let messages = store
let messages = state
.fetch_message_states(
price_ids
.iter()
@ -331,7 +278,7 @@ where
}
pub async fn get_price_feeds_with_update_data<S>(
store: &S,
state: &S,
price_ids: Vec<PriceIdentifier>,
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData>
@ -339,22 +286,22 @@ where
S: CacheStore,
S: Benchmarks,
{
match get_verified_price_feeds(store, price_ids.clone(), request_time.clone()).await {
match get_verified_price_feeds(state, price_ids.clone(), request_time.clone()).await {
Ok(price_feeds_with_update_data) => Ok(price_feeds_with_update_data),
Err(e) => {
if let RequestTime::FirstAfter(publish_time) = request_time {
return Benchmarks::get_verified_price_feeds(store, price_ids, publish_time).await;
return Benchmarks::get_verified_price_feeds(state, price_ids, publish_time).await;
}
Err(e)
}
}
}
pub async fn get_price_feed_ids<S>(store: &S) -> HashSet<PriceIdentifier>
pub async fn get_price_feed_ids<S>(state: &S) -> HashSet<PriceIdentifier>
where
S: CacheStore,
{
store
state
.message_state_keys()
.await
.iter()
@ -362,8 +309,8 @@ where
.collect()
}
pub async fn is_ready(store: &Store) -> bool {
let last_completed_update_at = store.last_completed_update_at.read().await;
pub async fn is_ready(state: &State) -> bool {
let last_completed_update_at = state.last_completed_update_at.read().await;
match last_completed_update_at.as_ref() {
Some(last_completed_update_at) => {
last_completed_update_at.elapsed() < READINESS_STALENESS_THRESHOLD
@ -480,13 +427,13 @@ mod test {
}
}
pub async fn setup_store(cache_size: u64) -> (Arc<Store>, Receiver<()>) {
pub async fn setup_store(cache_size: u64) -> (Arc<State>, Receiver<()>) {
let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
let store = Store::new(update_tx, cache_size, None);
let state = State::new(update_tx, cache_size, None);
// Add an initial guardian set with public key 0
update_guardian_set(
&store,
&state,
0,
GuardianSet {
keys: vec![[0; 20]],
@ -494,24 +441,24 @@ mod test {
)
.await;
(store, update_rx)
(state, update_rx)
}
pub async fn store_multiple_concurrent_valid_updates(store: Arc<Store>, updates: Vec<Update>) {
let res = join_all(updates.into_iter().map(|u| store_update(&store, u))).await;
pub async fn store_multiple_concurrent_valid_updates(state: Arc<State>, updates: Vec<Update>) {
let res = join_all(updates.into_iter().map(|u| store_update(&state, u))).await;
// Check that all store_update calls succeeded
assert!(res.into_iter().all(|r| r.is_ok()));
}
#[tokio::test]
pub async fn test_store_works() {
let (store, mut update_rx) = setup_store(10).await;
let (state, mut update_rx) = setup_store(10).await;
let price_feed_message = create_dummy_price_feed_message(100, 10, 9);
// Populate the store
// Populate the state
store_multiple_concurrent_valid_updates(
store.clone(),
state.clone(),
generate_update(vec![Message::PriceFeedMessage(price_feed_message)], 10, 20),
)
.await;
@ -521,14 +468,14 @@ mod test {
// Check the price ids are stored correctly
assert_eq!(
get_price_feed_ids(&*store).await,
get_price_feed_ids(&*state).await,
vec![PriceIdentifier::new([100; 32])].into_iter().collect()
);
// Check get_price_feeds_with_update_data retrieves the correct
// price feed with correct update data.
let price_feeds_with_update_data = get_price_feeds_with_update_data(
&store,
&state,
vec![PriceIdentifier::new([100; 32])],
RequestTime::Latest,
)
@ -616,10 +563,10 @@ mod test {
#[tokio::test]
pub async fn test_metadata_times_and_readiness_work() {
// The receiver channel should stay open for the store to work
// The receiver channel should stay open for the state to work
// properly. That is why we don't use _ here as it drops the channel
// immediately.
let (store, _receiver_tx) = setup_store(10).await;
let (state, _receiver_tx) = setup_store(10).await;
let price_feed_message = create_dummy_price_feed_message(100, 10, 9);
@ -635,9 +582,9 @@ mod test {
.unwrap()
.as_secs();
// Populate the store
// Populate the state
store_multiple_concurrent_valid_updates(
store.clone(),
state.clone(),
generate_update(vec![Message::PriceFeedMessage(price_feed_message)], 10, 20),
)
.await;
@ -648,7 +595,7 @@ mod test {
// Get the price feeds with update data
let price_feeds_with_update_data = get_price_feeds_with_update_data(
&store,
&state,
vec![PriceIdentifier::new([100; 32])],
RequestTime::Latest,
)
@ -662,28 +609,28 @@ mod test {
Some(unix_timestamp as i64)
);
// Check the store is ready
assert!(is_ready(&store).await);
// Check the state is ready
assert!(is_ready(&state).await);
// Advance the clock to make the prices stale
MockClock::advance_system_time(READINESS_STALENESS_THRESHOLD);
MockClock::advance(READINESS_STALENESS_THRESHOLD);
// Check the store is not ready
assert!(!is_ready(&store).await);
// Check the state is not ready
assert!(!is_ready(&state).await);
}
/// Test that the store retains the latest slots upon cache eviction.
/// Test that the state retains the latest slots upon cache eviction.
///
/// Store is set up with cache size of 100 and 1000 slot updates will
/// State is set up with a cache size of 100 and 1000 slot updates will
/// be stored all at the same time, in random order.
/// After the cache eviction, the store should retain the latest 100
/// After the cache eviction, the state should retain the latest 100
/// slots regardless of the order.
#[tokio::test]
pub async fn test_store_retains_latest_slots_upon_cache_eviction() {
// The receiver channel should stay open for the store to work
// properly. That is why we don't use _ here as it drops the channel
// immediately.
let (store, _receiver_tx) = setup_store(100).await;
let (state, _receiver_tx) = setup_store(100).await;
let mut updates: Vec<Update> = (0..1000)
.flat_map(|slot| {
@ -708,12 +655,12 @@ mod test {
updates.shuffle(&mut rng);
// Store the updates
store_multiple_concurrent_valid_updates(store.clone(), updates).await;
store_multiple_concurrent_valid_updates(state.clone(), updates).await;
// Check the last 100 slots are retained
for slot in 900..1000 {
let price_feeds_with_update_data = get_price_feeds_with_update_data(
&store,
&state,
vec![
PriceIdentifier::new([100; 32]),
PriceIdentifier::new([200; 32]),
@ -730,7 +677,7 @@ mod test {
// Check nothing else is retained
for slot in 0..900 {
assert!(get_price_feeds_with_update_data(
&store,
&state,
vec![
PriceIdentifier::new([100; 32]),
PriceIdentifier::new([200; 32]),

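Note on the `_receiver_tx` bindings in the tests above: a bare `_` pattern in Rust drops the value immediately, which closes the update channel and makes the `update_tx.send(())` call inside `store_update` fail, while a named `_receiver_tx` keeps the receiver alive for the whole test. A self-contained sketch of the difference:

// Sketch of why the tests bind the receiver to a named `_receiver_tx`.
#[tokio::main]
async fn main() {
    let (tx, _rx) = tokio::sync::mpsc::channel::<()>(1);
    // `_rx` is a real binding: the receiver stays alive, so the send succeeds.
    assert!(tx.send(()).await.is_ok());

    let (tx, _) = tokio::sync::mpsc::channel::<()>(1);
    // A bare `_` never binds the receiver; it is dropped on the spot, so the
    // channel is already closed by the time we send.
    assert!(tx.send(()).await.is_err());
}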
View File

@ -1,14 +1,14 @@
use {
crate::store::{
cache::{
CacheStore,
MessageState,
},
types::{
crate::{
aggregate::types::{
AccumulatorMessages,
RawMessage,
Slot,
},
state::cache::{
CacheStore,
MessageState,
},
},
anyhow::{
anyhow,

View File

@ -1,5 +1,5 @@
use {
super::Store,
super::State,
anyhow::{
anyhow,
Result,
@ -74,12 +74,12 @@ pub struct GuardianSetData {
/// Verifies a VAA to ensure it is signed by the Wormhole guardian set.
pub async fn verify_vaa<'a>(
store: &Store,
state: &State,
vaa: Vaa<&'a RawMessage>,
) -> Result<Vaa<&'a RawMessage>> {
let (header, body): (Header, Body<&RawMessage>) = vaa.into();
let digest = body.digest()?;
let guardian_set = store.guardian_set.read().await;
let guardian_set = state.guardian_set.read().await;
let guardian_set = guardian_set
.get(&header.guardian_set_index)
.ok_or_else(|| {

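Note: the call site for `verify_vaa` is in `aggregate::store_update` (first file above). A usage sketch, not compilable on its own, assuming `state: &State` and raw `update_vaa` bytes are in scope:

// Parse the raw bytes, then check the signatures against the registered
// guardian sets; the error branch here is illustrative, the real one is
// elided in the hunk above.
let vaa = serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(&update_vaa)?;
match verify_vaa(state, vaa).await {
    Ok(vaa) => { /* proceed to store the verified update */ }
    Err(err) => tracing::warn!(error = ?err, "Ignoring invalid VAA."),
}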
View File

@ -2,7 +2,7 @@ use {
self::ws::notify_updates,
crate::{
config::RunOptions,
store::Store,
state::State,
},
anyhow::Result,
axum::{
@ -29,15 +29,15 @@ mod types;
mod ws;
#[derive(Clone)]
pub struct State {
pub store: Arc<Store>,
pub struct ApiState {
pub state: Arc<State>,
pub ws: Arc<ws::WsState>,
}
impl State {
pub fn new(store: Arc<Store>) -> Self {
impl ApiState {
pub fn new(state: Arc<State>) -> Self {
Self {
store,
state,
ws: Arc::new(ws::WsState::new()),
}
}
@ -47,8 +47,8 @@ impl State {
///
/// Currently this is based on Axum due to the simplicity and strong ecosystem support for the
/// packages it is built on (tokio & hyper).
#[tracing::instrument(skip(opts, store, update_rx))]
pub async fn run(opts: RunOptions, store: Arc<Store>, mut update_rx: Receiver<()>) -> Result<()> {
#[tracing::instrument(skip(opts, state, update_rx))]
pub async fn run(opts: RunOptions, state: Arc<State>, mut update_rx: Receiver<()>) -> Result<()> {
tracing::info!(endpoint = %opts.api_addr, "Starting RPC Server.");
#[derive(OpenApi)]
@ -79,7 +79,7 @@ pub async fn run(opts: RunOptions, store: Arc<Store>, mut update_rx: Receiver<()
)]
struct ApiDoc;
let state = State::new(store);
let state = ApiState::new(state);
// Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
// `with_state` method which replaces `Body` with `State` in the type signature.

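Note: the `Router<State>` / `with_state` remark above describes standard axum behavior rather than anything specific to this commit. A minimal self-contained sketch (axum 0.6-era API, with a hypothetical `AppState` standing in for `ApiState`):

use axum::{extract::State, routing::get, Router};

#[derive(Clone)]
struct AppState {
    greeting: &'static str,
}

async fn hello(State(state): State<AppState>) -> String {
    format!("{}, world", state.greeting)
}

#[tokio::main]
async fn main() {
    // `Router::new().route(...)` is a `Router<AppState>` until `with_state`
    // supplies the state, after which the router can be served as usual.
    let app = Router::new()
        .route("/", get(hello))
        .with_state(AppState { greeting: "hello" });
    axum::Server::bind(&"127.0.0.1:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}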
View File

@ -1,5 +1,9 @@
use {
crate::{
aggregate::types::{
RequestTime,
UnixTimestamp,
},
api::{
rest::RestError,
types::{
@ -8,10 +12,6 @@ use {
},
},
doc_examples,
store::types::{
RequestTime,
UnixTimestamp,
},
},
anyhow::Result,
axum::{
@ -60,13 +60,13 @@ pub struct GetPriceFeedQueryParams {
)
)]
pub async fn get_price_feed(
State(state): State<crate::api::State>,
State(state): State<crate::api::ApiState>,
QsQuery(params): QsQuery<GetPriceFeedQueryParams>,
) -> Result<Json<RpcPriceFeed>, RestError> {
let price_id: PriceIdentifier = params.id.into();
let price_feeds_with_update_data = crate::store::get_price_feeds_with_update_data(
&*state.store,
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
&*state.state,
vec![price_id],
RequestTime::FirstAfter(params.publish_time),
)

View File

@ -1,17 +1,17 @@
use {
crate::{
api::{
rest::RestError,
types::PriceIdInput,
},
doc_examples,
store::{
aggregate::{
self,
types::{
RequestTime,
UnixTimestamp,
},
},
api::{
rest::RestError,
types::PriceIdInput,
},
doc_examples,
},
anyhow::Result,
axum::{
@ -70,13 +70,13 @@ pub struct GetVaaResponse {
)
)]
pub async fn get_vaa(
State(state): State<crate::api::State>,
State(state): State<crate::api::ApiState>,
QsQuery(params): QsQuery<GetVaaQueryParams>,
) -> Result<Json<GetVaaResponse>, RestError> {
let price_id: PriceIdentifier = params.id.into();
let price_feeds_with_update_data = store::get_price_feeds_with_update_data(
&*state.store,
let price_feeds_with_update_data = aggregate::get_price_feeds_with_update_data(
&*state.state,
vec![price_id],
RequestTime::FirstAfter(params.publish_time),
)

View File

@ -1,11 +1,11 @@
use {
crate::{
api::rest::RestError,
impl_deserialize_for_hex_string_wrapper,
store::types::{
aggregate::types::{
RequestTime,
UnixTimestamp,
},
api::rest::RestError,
impl_deserialize_for_hex_string_wrapper,
},
anyhow::Result,
axum::{
@ -54,7 +54,7 @@ pub struct GetVaaCcipResponse {
)
)]
pub async fn get_vaa_ccip(
State(state): State<crate::api::State>,
State(state): State<crate::api::ApiState>,
QsQuery(params): QsQuery<GetVaaCcipQueryParams>,
) -> Result<Json<GetVaaCcipResponse>, RestError> {
let price_id: PriceIdentifier = PriceIdentifier::new(
@ -68,8 +68,8 @@ pub async fn get_vaa_ccip(
.map_err(|_| RestError::InvalidCCIPInput)?,
);
let price_feeds_with_update_data = crate::store::get_price_feeds_with_update_data(
&*state.store,
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
&*state.state,
vec![price_id],
RequestTime::FirstAfter(publish_time),
)

View File

@ -1,5 +1,6 @@
use {
crate::{
aggregate::types::RequestTime,
api::{
rest::RestError,
types::{
@ -7,7 +8,6 @@ use {
RpcPriceFeed,
},
},
store::types::RequestTime,
},
anyhow::Result,
axum::{
@ -59,12 +59,12 @@ pub struct LatestPriceFeedsQueryParams {
)
)]
pub async fn latest_price_feeds(
State(state): State<crate::api::State>,
State(state): State<crate::api::ApiState>,
QsQuery(params): QsQuery<LatestPriceFeedsQueryParams>,
) -> Result<Json<Vec<RpcPriceFeed>>, RestError> {
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
let price_feeds_with_update_data = crate::store::get_price_feeds_with_update_data(
&*state.store,
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
&*state.state,
price_ids,
RequestTime::Latest,
)

View File

@ -1,11 +1,11 @@
use {
crate::{
aggregate::types::RequestTime,
api::{
rest::RestError,
types::PriceIdInput,
},
doc_examples,
store::types::RequestTime,
},
anyhow::Result,
axum::{
@ -54,12 +54,12 @@ pub struct LatestVaasQueryParams {
),
)]
pub async fn latest_vaas(
State(state): State<crate::api::State>,
State(state): State<crate::api::ApiState>,
QsQuery(params): QsQuery<LatestVaasQueryParams>,
) -> Result<Json<Vec<String>>, RestError> {
let price_ids: Vec<PriceIdentifier> = params.ids.into_iter().map(|id| id.into()).collect();
let price_feeds_with_update_data = crate::store::get_price_feeds_with_update_data(
&*state.store,
let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data(
&*state.state,
price_ids,
RequestTime::Latest,
)

View File

@ -22,9 +22,9 @@ use {
),
)]
pub async fn price_feed_ids(
State(state): State<crate::api::State>,
State(state): State<crate::api::ApiState>,
) -> Result<Json<Vec<RpcPriceIdentifier>>, RestError> {
let price_feed_ids = crate::store::get_price_feed_ids(&*state.store)
let price_feed_ids = crate::aggregate::get_price_feed_ids(&*state.state)
.await
.iter()
.map(RpcPriceIdentifier::from)

View File

@ -7,8 +7,8 @@ use axum::{
},
};
pub async fn ready(State(state): State<crate::api::State>) -> Response {
match crate::store::is_ready(&state.store).await {
pub async fn ready(State(state): State<crate::api::ApiState>) -> Response {
match crate::aggregate::is_ready(&state.state).await {
true => (StatusCode::OK, "OK").into_response(),
false => (StatusCode::SERVICE_UNAVAILABLE, "Service Unavailable").into_response(),
}

View File

@ -1,12 +1,12 @@
use {
crate::{
doc_examples,
impl_deserialize_for_hex_string_wrapper,
store::types::{
aggregate::types::{
PriceFeedUpdate,
Slot,
UnixTimestamp,
},
doc_examples,
impl_deserialize_for_hex_string_wrapper,
},
base64::{
engine::general_purpose::STANDARD as base64_standard_engine,

View File

@ -3,9 +3,9 @@ use {
PriceIdInput,
RpcPriceFeed,
},
crate::store::{
types::RequestTime,
Store,
crate::{
aggregate::types::RequestTime,
state::State,
},
anyhow::{
anyhow,
@ -18,7 +18,7 @@ use {
WebSocket,
WebSocketUpgrade,
},
State,
State as AxumState,
},
response::IntoResponse,
},
@ -56,13 +56,13 @@ pub const NOTIFICATIONS_CHAN_LEN: usize = 1000;
pub async fn ws_route_handler(
ws: WebSocketUpgrade,
State(state): State<super::State>,
AxumState(state): AxumState<super::ApiState>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| websocket_handler(socket, state))
}
#[tracing::instrument(skip(stream, state))]
async fn websocket_handler(stream: WebSocket, state: super::State) {
async fn websocket_handler(stream: WebSocket, state: super::ApiState) {
let ws_state = state.ws.clone();
let id = ws_state.subscriber_counter.fetch_add(1, Ordering::SeqCst);
tracing::debug!(id, "New Websocket Connection");
@ -70,7 +70,7 @@ async fn websocket_handler(stream: WebSocket, state: super::State) {
let (notify_sender, notify_receiver) = mpsc::channel::<()>(NOTIFICATIONS_CHAN_LEN);
let (sender, receiver) = stream.split();
let mut subscriber =
Subscriber::new(id, state.store.clone(), notify_receiver, receiver, sender);
Subscriber::new(id, state.state.clone(), notify_receiver, receiver, sender);
ws_state.subscribers.insert(id, notify_sender);
subscriber.run().await;
@ -83,7 +83,7 @@ pub type SubscriberId = usize;
pub struct Subscriber {
id: SubscriberId,
closed: bool,
store: Arc<Store>,
store: Arc<State>,
notify_receiver: mpsc::Receiver<()>,
receiver: SplitStream<WebSocket>,
sender: SplitSink<WebSocket, Message>,
@ -95,7 +95,7 @@ pub struct Subscriber {
impl Subscriber {
pub fn new(
id: SubscriberId,
store: Arc<Store>,
store: Arc<State>,
notify_receiver: mpsc::Receiver<()>,
receiver: SplitStream<WebSocket>,
sender: SplitSink<WebSocket, Message>,
@ -149,7 +149,7 @@ impl Subscriber {
async fn handle_price_feeds_update(&mut self) -> Result<()> {
let price_feed_ids = self.price_feeds_with_config.keys().cloned().collect();
for update in crate::store::get_price_feeds_with_update_data(
for update in crate::aggregate::get_price_feeds_with_update_data(
&*self.store,
price_feed_ids,
RequestTime::Latest,
@ -233,7 +233,7 @@ impl Subscriber {
binary,
}) => {
let price_ids: Vec<PriceIdentifier> = ids.into_iter().map(|id| id.into()).collect();
let available_price_ids = crate::store::get_price_feed_ids(&*self.store).await;
let available_price_ids = crate::aggregate::get_price_feed_ids(&*self.store).await;
let not_found_price_ids: Vec<&PriceIdentifier> = price_ids
.iter()

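Note: the WebSocket layer above is a simple fan-out: each connection gets an id from an atomic counter plus an mpsc notification channel, the sender half is kept in `ws_state.subscribers`, and each completed update pings every sender so subscribers re-fetch the latest feeds. A reduced sketch of the pattern (`Registry` is illustrative, not the crate's type):

use std::{
    collections::HashMap,
    sync::atomic::{AtomicUsize, Ordering},
};
use tokio::sync::{mpsc, Mutex};

// Illustrative stand-in for WsState: one notification channel per subscriber.
struct Registry {
    counter: AtomicUsize,
    subscribers: Mutex<HashMap<usize, mpsc::Sender<()>>>,
}

impl Registry {
    async fn subscribe(&self) -> (usize, mpsc::Receiver<()>) {
        let id = self.counter.fetch_add(1, Ordering::SeqCst);
        let (tx, rx) = mpsc::channel(1000);
        self.subscribers.lock().await.insert(id, tx);
        (id, rx)
    }

    // Called after each completed update: every subscriber is woken up and
    // re-fetches the latest price feeds on its own task.
    async fn notify_all(&self) {
        for tx in self.subscribers.lock().await.values() {
            let _ = tx.send(()).await;
        }
    }
}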
View File

@ -1,4 +1,4 @@
use crate::store::types::UnixTimestamp;
use crate::aggregate::types::UnixTimestamp;
// Example values for the utoipa API docs.
// Note that each of these expressions is only evaluated once when the documentation is created,

View File

@ -2,7 +2,7 @@
#![feature(btree_cursors)]
use {
crate::store::Store,
crate::state::State,
anyhow::Result,
futures::future::join_all,
std::{
@ -13,12 +13,13 @@ use {
tokio::spawn,
};
mod aggregate;
mod api;
mod config;
mod doc_examples;
mod macros;
mod network;
mod store;
mod state;
// A static exit flag to indicate to running threads that we're shutting down. This is used to
// gracefully shut down the application.
@ -43,7 +44,7 @@ async fn init() -> Result<()> {
let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
// Initialize a cache store with a 1000 element circular buffer.
let store = Store::new(update_tx.clone(), 1000, opts.benchmarks_endpoint.clone());
let store = State::new(update_tx.clone(), 1000, opts.benchmarks_endpoint.clone());
// Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. We
// also send off any notifications needed to close off any waiting tasks.

View File

@ -11,11 +11,9 @@
use {
crate::{
aggregate::types::Update,
config::RunOptions,
store::{
types::Update,
Store,
},
state::State,
},
anyhow::Result,
libp2p::Multiaddr,
@ -176,7 +174,7 @@ pub fn bootstrap(
// Spawns the P2P layer as a separate thread via Go.
#[tracing::instrument(skip(opts, store))]
pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
pub async fn spawn(opts: RunOptions, store: Arc<State>) -> Result<()> {
tracing::info!(listeners = ?opts.wh_listen_addrs, "Starting P2P Server");
std::thread::spawn(|| {
@ -213,7 +211,7 @@ pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
let store = store.clone();
tokio::spawn(async move {
if let Err(e) = crate::store::store_update(&store, Update::Vaa(vaa)).await {
if let Err(e) = crate::aggregate::store_update(&store, Update::Vaa(vaa)).await {
tracing::error!(error = ?e, "Failed to process VAA.");
}
});

View File

@ -4,8 +4,7 @@
use {
crate::{
config::RunOptions,
store::{
aggregate::{
types::{
AccumulatorMessages,
Update,
@ -15,8 +14,9 @@ use {
GuardianSet,
GuardianSetData,
},
Store,
},
config::RunOptions,
state::State,
},
anyhow::{
anyhow,
@ -128,7 +128,7 @@ async fn fetch_bridge_data(
}
}
pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<()> {
pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<()> {
let client = PubsubClient::new(pythnet_ws_endpoint.as_ref()).await?;
let config = RpcProgramAccountsConfig {
@ -175,7 +175,7 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<()> {
if candidate.to_string() == update.value.pubkey {
let store = store.clone();
tokio::spawn(async move {
if let Err(err) = crate::store::store_update(
if let Err(err) = crate::aggregate::store_update(
&store,
Update::AccumulatorMessages(accumulator_messages),
)
@ -213,7 +213,7 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<()> {
/// sets from a deployed Wormhole contract. Note that we only fetch the last two accounts because,
/// during a Wormhole upgrade, messages will only be produced by those two.
async fn fetch_existing_guardian_sets(
store: Arc<Store>,
store: Arc<State>,
pythnet_http_endpoint: String,
wormhole_contract_addr: Pubkey,
) -> Result<()> {
@ -230,7 +230,7 @@ async fn fetch_existing_guardian_sets(
"Retrieved Current GuardianSet.",
);
crate::store::update_guardian_set(&store, bridge.guardian_set_index, current).await;
crate::aggregate::update_guardian_set(&store, bridge.guardian_set_index, current).await;
// If there is more than one guardian set, we also fetch the previous one, as it
// may still be in a transition phase if a guardian upgrade has just occurred.
@ -248,14 +248,15 @@ async fn fetch_existing_guardian_sets(
"Retrieved Previous GuardianSet.",
);
crate::store::update_guardian_set(&store, bridge.guardian_set_index - 1, previous).await;
crate::aggregate::update_guardian_set(&store, bridge.guardian_set_index - 1, previous)
.await;
}
Ok(())
}
#[tracing::instrument(skip(opts, store))]
pub async fn spawn(opts: RunOptions, store: Arc<Store>) -> Result<()> {
pub async fn spawn(opts: RunOptions, store: Arc<State>) -> Result<()> {
tracing::info!(
endpoint = opts.pythnet_ws_endpoint,
"Started Pythnet Listener."

hermes/src/state.rs (new file, 66 lines)
View File

@ -0,0 +1,66 @@
#[cfg(test)]
use mock_instant::{
Instant,
SystemTime,
UNIX_EPOCH,
};
#[cfg(not(test))]
use std::time::Instant;
use {
self::cache::Cache,
crate::aggregate::wormhole::GuardianSet,
reqwest::Url,
std::{
collections::{
BTreeMap,
BTreeSet,
},
sync::Arc,
},
tokio::sync::{
mpsc::Sender,
RwLock,
},
};
pub mod benchmarks;
pub mod cache;
pub struct State {
/// A short-lived cache of all the updates that have been passed to this
/// state.
pub cache: Cache,
/// Sequence numbers of recently observed VAAs. This set is used to skip
/// previously observed VAAs, as a performance optimization.
pub observed_vaa_seqs: RwLock<BTreeSet<u64>>,
/// Wormhole guardian sets, used to verify VAAs before they are processed.
pub guardian_set: RwLock<BTreeMap<u32, GuardianSet>>,
/// The sender half of the channel between the state and the API, used to notify completed updates.
pub update_tx: Sender<()>,
/// Time of the last completed update. This is used for the health probes.
pub last_completed_update_at: RwLock<Option<Instant>>,
/// Optional Pyth Benchmarks endpoint.
pub benchmarks_endpoint: Option<Url>,
}
impl State {
pub fn new(
update_tx: Sender<()>,
cache_size: u64,
benchmarks_endpoint: Option<Url>,
) -> Arc<Self> {
Arc::new(Self {
cache: Cache::new(cache_size),
observed_vaa_seqs: RwLock::new(Default::default()),
guardian_set: RwLock::new(Default::default()),
update_tx,
last_completed_update_at: RwLock::new(None),
benchmarks_endpoint,
})
}
}

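Note: `State::new` returns `Arc<Self>`, so the API server and the network listeners all share one instance by cloning the Arc into their tasks. A sketch of the wiring, condensed from `init` in main.rs (the spawned body is illustrative):

let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
let state = State::new(update_tx.clone(), 1000, None);

// Cloning an Arc is cheap; every task observes the same underlying State.
let api_state = state.clone();
tokio::spawn(async move {
    // e.g. api::run(opts, api_state, update_rx).await
});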
View File

@ -1,7 +1,7 @@
//! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates.
use {
super::types::{
crate::aggregate::types::{
PriceFeedUpdate,
PriceFeedsWithUpdateData,
UnixTimestamp,
@ -85,7 +85,7 @@ pub trait Benchmarks {
}
#[async_trait::async_trait]
impl Benchmarks for crate::store::Store {
impl Benchmarks for crate::state::State {
async fn get_verified_price_feeds(
&self,
price_ids: Vec<PriceIdentifier>,

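Note: this trait is the fallback leg of `aggregate::get_price_feeds_with_update_data` (first file above): the cache is consulted first, and only a miss on a `RequestTime::FirstAfter(publish_time)` request falls through to the Benchmarks HTTP API. Condensed from that function:

// Cache first, Benchmarks only as a historical fallback for FirstAfter
// requests; Latest requests never fall through to the network.
match get_verified_price_feeds(state, price_ids.clone(), request_time.clone()).await {
    Ok(feeds) => Ok(feeds),
    Err(e) => {
        if let RequestTime::FirstAfter(publish_time) = request_time {
            return Benchmarks::get_verified_price_feeds(state, price_ids, publish_time).await;
        }
        Err(e)
    }
}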
View File

@ -1,5 +1,5 @@
use {
super::{
crate::aggregate::{
proof::wormhole_merkle::WormholeMerkleState,
types::{
AccumulatorMessages,
@ -178,7 +178,7 @@ impl Cache {
}
#[async_trait::async_trait]
impl CacheStore for crate::store::Store {
impl CacheStore for crate::state::State {
async fn message_state_keys(&self) -> Vec<MessageStateKey> {
self.cache
.message_cache
@ -272,7 +272,7 @@ impl CacheStore for crate::store::Store {
mod test {
use {
super::*,
crate::store::{
crate::aggregate::{
proof::wormhole_merkle::{
WormholeMerkleMessageProof,
WormholeMerkleState,