refactor(hermes): move wormhole out of aggregate

This commit is contained in:
Ali Behjati 2023-09-15 14:17:11 +02:00
parent 09e2b17d1c
commit 3a9df67ef2
19 changed files with 235 additions and 252 deletions

View File

@ -11,46 +11,30 @@ use std::time::{
UNIX_EPOCH,
};
use {
self::{
proof::wormhole_merkle::{
construct_update_data,
WormholeMerkleState,
},
types::{
AccumulatorMessages,
PriceFeedUpdate,
PriceFeedsWithUpdateData,
RequestTime,
Update,
},
wormhole::GuardianSet,
self::wormhole_merkle::{
construct_message_states_proofs,
construct_update_data,
store_wormhole_merkle_verified_message,
WormholeMerkleMessageProof,
WormholeMerkleState,
},
crate::{
aggregate::{
proof::wormhole_merkle::{
construct_message_states_proofs,
store_wormhole_merkle_verified_message,
},
types::{
ProofSet,
UnixTimestamp,
},
wormhole::verify_vaa,
},
state::{
benchmarks::Benchmarks,
cache::{
CacheStore,
AggregateCache,
MessageState,
MessageStateFilter,
},
State,
},
wormhole::VaaBytes,
},
anyhow::{
anyhow,
Result,
},
borsh::BorshDeserialize,
byteorder::BigEndian,
pyth_sdk::{
Price,
@ -77,11 +61,68 @@ use {
wormhole_sdk::Vaa,
};
pub mod proof;
pub mod types;
pub mod wormhole;
pub mod wormhole_merkle;
#[derive(Clone, PartialEq, Debug)]
pub struct ProofSet {
pub wormhole_merkle_proof: WormholeMerkleMessageProof,
}
pub type Slot = u64;
/// The number of seconds since the Unix epoch (00:00:00 UTC on 1 Jan 1970). The timestamp is
/// always positive, but represented as a signed integer because that's the standard on Unix
/// systems and allows easy subtraction to compute durations.
pub type UnixTimestamp = i64;
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum RequestTime {
Latest,
FirstAfter(UnixTimestamp),
}
pub type RawMessage = Vec<u8>;
/// Accumulator messages coming from Pythnet validators.
///
/// The validators writes the accumulator messages using Borsh with
/// the following struct. We cannot directly have messages as Vec<Messages>
/// because they are serialized using big-endian byte order and Borsh
/// uses little-endian byte order.
#[derive(Clone, PartialEq, Debug, BorshDeserialize)]
pub struct AccumulatorMessages {
pub magic: [u8; 4],
pub slot: u64,
pub ring_size: u32,
pub raw_messages: Vec<RawMessage>,
}
impl AccumulatorMessages {
pub fn ring_index(&self) -> u32 {
(self.slot % self.ring_size as u64) as u32
}
}
#[derive(Debug)]
pub enum Update {
Vaa(VaaBytes),
AccumulatorMessages(AccumulatorMessages),
}
#[derive(Debug, PartialEq)]
pub struct PriceFeedUpdate {
pub price_feed: PriceFeed,
pub slot: Option<Slot>,
pub received_at: Option<UnixTimestamp>,
pub update_data: Option<Vec<u8>>,
}
#[derive(Debug, PartialEq)]
pub struct PriceFeedsWithUpdateData {
pub price_feeds: Vec<PriceFeedUpdate>,
pub update_data: Vec<Vec<u8>>,
}
const OBSERVED_CACHE_SIZE: usize = 1000;
const READINESS_STALENESS_THRESHOLD: Duration = Duration::from_secs(30);
/// Stores the update data in the store
@ -91,34 +132,9 @@ pub async fn store_update(state: &State, update: Update) -> Result<()> {
// in all the updates.
let slot = match update {
Update::Vaa(update_vaa) => {
// FIXME: Move to wormhole.rs
let vaa = serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(&update_vaa)?;
if state.observed_vaa_seqs.read().await.contains(&vaa.sequence) {
return Ok(()); // Ignore VAA if we have already seen it
}
let vaa = verify_vaa(state, vaa).await;
let vaa = match vaa {
Ok(vaa) => vaa,
Err(err) => {
tracing::warn!(error = ?err, "Ignoring invalid VAA.");
return Ok(());
}
};
{
let mut observed_vaa_seqs = state.observed_vaa_seqs.write().await;
if observed_vaa_seqs.contains(&vaa.sequence) {
return Ok(()); // Ignore VAA if we have already seen it
}
observed_vaa_seqs.insert(vaa.sequence);
while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
observed_vaa_seqs.pop_first();
}
}
let vaa = serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(
update_vaa.as_ref(),
)?;
match WormholeMessage::try_from_bytes(vaa.payload)?.payload {
WormholePayload::Merkle(proof) => {
tracing::info!(slot = proof.slot, "Storing VAA Merkle Proof.");
@ -213,18 +229,13 @@ async fn build_message_states(
Ok(())
}
pub async fn update_guardian_set(state: &State, id: u32, guardian_set: GuardianSet) {
let mut guardian_sets = state.guardian_set.write().await;
guardian_sets.insert(id, guardian_set);
}
async fn get_verified_price_feeds<S>(
state: &S,
price_ids: Vec<PriceIdentifier>,
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData>
where
S: CacheStore,
S: AggregateCache,
{
let messages = state
.fetch_message_states(
@ -283,7 +294,7 @@ pub async fn get_price_feeds_with_update_data<S>(
request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData>
where
S: CacheStore,
S: AggregateCache,
S: Benchmarks,
{
match get_verified_price_feeds(state, price_ids.clone(), request_time.clone()).await {
@ -299,7 +310,7 @@ where
pub async fn get_price_feed_ids<S>(state: &S) -> HashSet<PriceIdentifier>
where
S: CacheStore,
S: AggregateCache,
{
state
.message_state_keys()

View File

@ -1 +0,0 @@
pub mod wormhole_merkle;

View File

@ -1,66 +0,0 @@
use {
super::proof::wormhole_merkle::WormholeMerkleMessageProof,
crate::network::p2p::Vaa,
borsh::BorshDeserialize,
pyth_sdk::PriceFeed,
};
#[derive(Clone, PartialEq, Debug)]
pub struct ProofSet {
pub wormhole_merkle_proof: WormholeMerkleMessageProof,
}
pub type Slot = u64;
/// The number of seconds since the Unix epoch (00:00:00 UTC on 1 Jan 1970). The timestamp is
/// always positive, but represented as a signed integer because that's the standard on Unix
/// systems and allows easy subtraction to compute durations.
pub type UnixTimestamp = i64;
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum RequestTime {
Latest,
FirstAfter(UnixTimestamp),
}
pub type RawMessage = Vec<u8>;
/// Accumulator messages coming from Pythnet validators.
///
/// The validators writes the accumulator messages using Borsh with
/// the following struct. We cannot directly have messages as Vec<Messages>
/// because they are serialized using big-endian byte order and Borsh
/// uses little-endian byte order.
#[derive(Clone, PartialEq, Debug, BorshDeserialize)]
pub struct AccumulatorMessages {
pub magic: [u8; 4],
pub slot: u64,
pub ring_size: u32,
pub raw_messages: Vec<RawMessage>,
}
impl AccumulatorMessages {
pub fn ring_index(&self) -> u32 {
(self.slot % self.ring_size as u64) as u32
}
}
#[derive(Debug)]
pub enum Update {
Vaa(Vaa),
AccumulatorMessages(AccumulatorMessages),
}
#[derive(Debug, PartialEq)]
pub struct PriceFeedUpdate {
pub price_feed: PriceFeed,
pub slot: Option<Slot>,
pub received_at: Option<UnixTimestamp>,
pub update_data: Option<Vec<u8>>,
}
#[derive(Debug, PartialEq)]
pub struct PriceFeedsWithUpdateData {
pub price_feeds: Vec<PriceFeedUpdate>,
pub update_data: Vec<Vec<u8>>,
}

View File

@ -1,14 +1,15 @@
use {
super::{
AccumulatorMessages,
RawMessage,
Slot,
},
crate::{
aggregate::types::{
AccumulatorMessages,
RawMessage,
Slot,
},
state::cache::{
CacheStore,
AggregateCache,
MessageState,
},
wormhole::VaaBytes,
},
anyhow::{
anyhow,
@ -39,18 +40,16 @@ use {
// u8 in the wire format. So, we can't have more than 255 messages.
pub const MAX_MESSAGE_IN_SINGLE_UPDATE_DATA: usize = 255;
pub type Vaa = Vec<u8>;
#[derive(Clone, PartialEq, Debug)]
pub struct WormholeMerkleState {
pub root: WormholeMerkleRoot,
pub vaa: Vaa,
pub vaa: VaaBytes,
}
#[derive(Clone, PartialEq, Debug)]
pub struct WormholeMerkleMessageProof {
pub proof: MerklePath<Keccak160>,
pub vaa: Vaa,
pub vaa: VaaBytes,
}
#[derive(Clone, PartialEq, Debug)]
@ -73,10 +72,10 @@ impl From<MessageState> for RawMessageWithMerkleProof {
pub async fn store_wormhole_merkle_verified_message<S>(
store: &S,
root: WormholeMerkleRoot,
vaa: Vaa,
vaa: VaaBytes,
) -> Result<()>
where
S: CacheStore,
S: AggregateCache,
{
store
.store_wormhole_merkle_state(WormholeMerkleState { root, vaa })

View File

@ -1,6 +1,6 @@
use {
crate::{
aggregate::types::{
aggregate::{
RequestTime,
UnixTimestamp,
},

View File

@ -1,11 +1,9 @@
use {
crate::{
aggregate::{
self,
types::{
RequestTime,
UnixTimestamp,
},
get_price_feeds_with_update_data,
RequestTime,
UnixTimestamp,
},
api::{
rest::RestError,
@ -75,7 +73,7 @@ pub async fn get_vaa(
) -> Result<Json<GetVaaResponse>, RestError> {
let price_id: PriceIdentifier = params.id.into();
let price_feeds_with_update_data = aggregate::get_price_feeds_with_update_data(
let price_feeds_with_update_data = get_price_feeds_with_update_data(
&*state.state,
vec![price_id],
RequestTime::FirstAfter(params.publish_time),

View File

@ -1,6 +1,6 @@
use {
crate::{
aggregate::types::{
aggregate::{
RequestTime,
UnixTimestamp,
},

View File

@ -1,6 +1,6 @@
use {
crate::{
aggregate::types::RequestTime,
aggregate::RequestTime,
api::{
rest::RestError,
types::{

View File

@ -1,6 +1,6 @@
use {
crate::{
aggregate::types::RequestTime,
aggregate::RequestTime,
api::{
rest::RestError,
types::PriceIdInput,

View File

@ -1,6 +1,6 @@
use {
crate::{
aggregate::types::{
aggregate::{
PriceFeedUpdate,
Slot,
UnixTimestamp,

View File

@ -4,7 +4,7 @@ use {
RpcPriceFeed,
},
crate::{
aggregate::types::RequestTime,
aggregate::RequestTime,
state::State,
},
anyhow::{

View File

@ -1,4 +1,4 @@
use crate::aggregate::types::UnixTimestamp;
use crate::aggregate::UnixTimestamp;
// Example values for the utoipa API docs.
// Note that each of these expressions is only evaluated once when the documentation is created,

View File

@ -20,6 +20,7 @@ mod doc_examples;
mod macros;
mod network;
mod state;
mod wormhole;
// A static exit flag to indicate to running threads that we're shutting down. This is used to
// gracefully shutdown the application.

View File

@ -11,16 +11,15 @@
use {
crate::{
aggregate::types::Update,
config::RunOptions,
state::State,
wormhole::{
forward_vaa,
VaaBytes,
},
},
anyhow::Result,
libp2p::Multiaddr,
pythnet_sdk::wire::v1::{
WormholeMessage,
WormholePayload,
},
std::{
ffi::{
c_char,
@ -38,10 +37,6 @@ use {
},
Mutex,
},
wormhole_sdk::{
Address,
Chain,
},
};
extern "C" {
@ -61,17 +56,15 @@ pub struct ObservationC {
pub vaa_len: usize,
}
pub type Vaa = Vec<u8>;
pub const CHANNEL_SIZE: usize = 1000;
const CHANNEL_SIZE: usize = 1000;
// A Static Channel to pipe the `Observation` from the callback into the local Rust handler for
// observation messages. It has to be static for now because there's no way to capture state in
// the callback passed into Go-land.
lazy_static::lazy_static! {
pub static ref OBSERVATIONS: (
Mutex<Sender<Vaa>>,
Mutex<Receiver<Vaa>>,
Mutex<Sender<VaaBytes>>,
Mutex<Receiver<VaaBytes>>,
) = {
let (tx, rc) = tokio::sync::mpsc::channel(CHANNEL_SIZE);
(Mutex::new(tx), Mutex::new(rc))
@ -86,36 +79,6 @@ extern "C" fn proxy(o: ObservationC) {
// Create a fixed slice from the pointer and length.
let vaa = unsafe { std::slice::from_raw_parts(o.vaa, o.vaa_len) }.to_owned();
// Deserialize VAA to check Creation Time
let deserialized_vaa = {
serde_wormhole::from_slice::<wormhole_sdk::Vaa<&serde_wormhole::RawMessage>>(&vaa)
.map_err(|e| {
tracing::error!(error = ?e, "Failed to deserialize VAA.");
})
.ok()
}
.unwrap();
if deserialized_vaa.emitter_chain != Chain::Pythnet
|| deserialized_vaa.emitter_address != Address(pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS)
{
return; // Ignore VAA from other emitters
}
// Get the slot from the VAA.
let slot = match WormholeMessage::try_from_bytes(deserialized_vaa.payload)
.unwrap()
.payload
{
WormholePayload::Merkle(proof) => proof.slot,
};
// Find the observation time for said VAA (which is a unix timestamp) and serialize as a ISO 8601 string.
let vaa_timestamp = deserialized_vaa.timestamp;
let vaa_timestamp = chrono::NaiveDateTime::from_timestamp_opt(vaa_timestamp as i64, 0).unwrap();
let vaa_timestamp = vaa_timestamp.format("%Y-%m-%dT%H:%M:%S.%fZ").to_string();
tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA");
// The chances of the mutex getting poisioned is very low and if it happens there is no way for
// us to recover from it.
if OBSERVATIONS
@ -173,8 +136,8 @@ pub fn bootstrap(
}
// Spawn's the P2P layer as a separate thread via Go.
#[tracing::instrument(skip(opts, store))]
pub async fn spawn(opts: RunOptions, store: Arc<State>) -> Result<()> {
#[tracing::instrument(skip(opts, state))]
pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
tracing::info!(listeners = ?opts.wh_listen_addrs, "Starting P2P Server");
std::thread::spawn(|| {
@ -194,7 +157,7 @@ pub async fn spawn(opts: RunOptions, store: Arc<State>) -> Result<()> {
// Listen in the background for new VAA's from the p2p layer
// and update the state accordingly.
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
let vaa = {
let vaa_bytes = {
let mut observation = OBSERVATIONS.1.lock().await;
match observation.recv().await {
@ -209,12 +172,7 @@ pub async fn spawn(opts: RunOptions, store: Arc<State>) -> Result<()> {
}
};
let store = store.clone();
tokio::spawn(async move {
if let Err(e) = crate::aggregate::store_update(&store, Update::Vaa(vaa)).await {
tracing::error!(error = ?e, "Failed to process VAA.");
}
});
forward_vaa(state.clone(), vaa_bytes).await;
}
tracing::info!("Shutting down P2P server...");

View File

@ -5,18 +5,17 @@
use {
crate::{
aggregate::{
types::{
AccumulatorMessages,
Update,
},
wormhole::{
BridgeData,
GuardianSet,
GuardianSetData,
},
AccumulatorMessages,
Update,
},
config::RunOptions,
state::State,
wormhole::{
update_guardian_set,
BridgeData,
GuardianSet,
GuardianSetData,
},
},
anyhow::{
anyhow,
@ -213,7 +212,7 @@ pub async fn run(store: Arc<State>, pythnet_ws_endpoint: String) -> Result<()> {
/// sets from a deployed Wormhole contract. Note that we only fetch the last two accounts due to
/// the fact that during a Wormhole upgrade, there will only be messages produces from those two.
async fn fetch_existing_guardian_sets(
store: Arc<State>,
state: Arc<State>,
pythnet_http_endpoint: String,
wormhole_contract_addr: Pubkey,
) -> Result<()> {
@ -230,7 +229,7 @@ async fn fetch_existing_guardian_sets(
"Retrieved Current GuardianSet.",
);
crate::aggregate::update_guardian_set(&store, bridge.guardian_set_index, current).await;
update_guardian_set(&state, bridge.guardian_set_index, current).await;
// If there are more than one guardian set, we want to fetch the previous one as well as it
// may still be in transition phase if a guardian upgrade has just occurred.
@ -248,29 +247,28 @@ async fn fetch_existing_guardian_sets(
"Retrieved Previous GuardianSet.",
);
crate::aggregate::update_guardian_set(&store, bridge.guardian_set_index - 1, previous)
.await;
update_guardian_set(&state, bridge.guardian_set_index - 1, previous).await;
}
Ok(())
}
#[tracing::instrument(skip(opts, store))]
pub async fn spawn(opts: RunOptions, store: Arc<State>) -> Result<()> {
#[tracing::instrument(skip(opts, state))]
pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {
tracing::info!(
endpoint = opts.pythnet_ws_endpoint,
"Started Pythnet Listener."
);
fetch_existing_guardian_sets(
store.clone(),
state.clone(),
opts.pythnet_http_endpoint.clone(),
opts.wh_contract_addr,
)
.await?;
let task_listener = {
let store = store.clone();
let store = state.clone();
let pythnet_ws_endpoint = opts.pythnet_ws_endpoint.clone();
tokio::spawn(async move {
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
@ -290,7 +288,7 @@ pub async fn spawn(opts: RunOptions, store: Arc<State>) -> Result<()> {
};
let task_guadian_watcher = {
let store = store.clone();
let store = state.clone();
let pythnet_http_endpoint = opts.pythnet_http_endpoint.clone();
tokio::spawn(async move {
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {

View File

@ -1,3 +1,5 @@
//! This module contains the global state of the application.
#[cfg(test)]
use mock_instant::{
Instant,
@ -8,7 +10,7 @@ use mock_instant::{
use std::time::Instant;
use {
self::cache::Cache,
crate::aggregate::wormhole::GuardianSet,
crate::wormhole::GuardianSet,
reqwest::Url,
std::{
collections::{

View File

@ -1,7 +1,7 @@
//! This module communicates with Pyth Benchmarks, an API for historical price feeds and their updates.
use {
crate::aggregate::types::{
crate::aggregate::{
PriceFeedUpdate,
PriceFeedsWithUpdateData,
UnixTimestamp,

View File

@ -1,14 +1,12 @@
use {
crate::aggregate::{
proof::wormhole_merkle::WormholeMerkleState,
types::{
AccumulatorMessages,
ProofSet,
RawMessage,
RequestTime,
Slot,
UnixTimestamp,
},
wormhole_merkle::WormholeMerkleState,
AccumulatorMessages,
ProofSet,
RawMessage,
RequestTime,
Slot,
UnixTimestamp,
},
anyhow::{
anyhow,
@ -144,8 +142,19 @@ fn retrieve_message_state(
}
}
impl Cache {
pub fn new(cache_size: u64) -> Self {
Self {
message_cache: Arc::new(DashMap::new()),
accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())),
wormhole_merkle_state_cache: Arc::new(RwLock::new(BTreeMap::new())),
cache_size,
}
}
}
#[async_trait::async_trait]
pub trait CacheStore {
pub trait AggregateCache {
async fn message_state_keys(&self) -> Vec<MessageStateKey>;
async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
async fn fetch_message_states(
@ -166,19 +175,8 @@ pub trait CacheStore {
async fn fetch_wormhole_merkle_state(&self, slot: Slot) -> Result<Option<WormholeMerkleState>>;
}
impl Cache {
pub fn new(cache_size: u64) -> Self {
Self {
message_cache: Arc::new(DashMap::new()),
accumulator_messages_cache: Arc::new(RwLock::new(BTreeMap::new())),
wormhole_merkle_state_cache: Arc::new(RwLock::new(BTreeMap::new())),
cache_size,
}
}
}
#[async_trait::async_trait]
impl CacheStore for crate::state::State {
impl AggregateCache for crate::state::State {
async fn message_state_keys(&self) -> Vec<MessageStateKey> {
self.cache
.message_cache

View File

@ -1,9 +1,14 @@
use {
super::State,
crate::aggregate::Update,
anyhow::{
anyhow,
Result,
},
pythnet_sdk::wire::v1::{
WormholeMessage,
WormholePayload,
},
secp256k1::{
ecdsa::{
RecoverableSignature,
@ -17,15 +22,23 @@ use {
Digest,
Keccak256,
},
std::sync::Arc,
tracing::trace,
wormhole_sdk::{
vaa::{
Body,
Header,
},
Address,
Chain,
Vaa,
},
};
pub type VaaBytes = Vec<u8>;
const OBSERVED_CACHE_SIZE: usize = 1000;
#[derive(Eq, PartialEq, Clone, Hash, Debug)]
pub struct GuardianSet {
pub keys: Vec<[u8; 20]>,
@ -138,3 +151,75 @@ pub async fn verify_vaa<'a>(
Ok((header, body).into())
}
/// Update the guardian set with the given ID in the state.
pub async fn update_guardian_set(state: &State, id: u32, guardian_set: GuardianSet) {
let mut guardian_sets = state.guardian_set.write().await;
guardian_sets.insert(id, guardian_set);
}
/// Process a VAA from the Wormhole p2p and aggregate it if it is
/// verified and is new and belongs to the Accumulator.
pub async fn forward_vaa(state: Arc<State>, vaa_bytes: VaaBytes) {
// Deserialize VAA
let vaa = match serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(&vaa_bytes) {
Ok(vaa) => vaa,
Err(e) => {
tracing::error!(error = ?e, "Failed to deserialize VAA.");
return;
}
};
if vaa.emitter_chain != Chain::Pythnet
|| vaa.emitter_address != Address(pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS)
{
return; // Ignore VAA from other emitters
}
// Get the slot from the VAA.
let slot = match WormholeMessage::try_from_bytes(vaa.payload)
.unwrap()
.payload
{
WormholePayload::Merkle(proof) => proof.slot,
};
// Find the observation time for said VAA (which is a unix timestamp) and serialize as a ISO 8601 string.
let vaa_timestamp = vaa.timestamp;
let vaa_timestamp = chrono::NaiveDateTime::from_timestamp_opt(vaa_timestamp as i64, 0).unwrap();
let vaa_timestamp = vaa_timestamp.format("%Y-%m-%dT%H:%M:%S.%fZ").to_string();
tracing::info!(slot = slot, vaa_timestamp = vaa_timestamp, "Observed VAA");
if state.observed_vaa_seqs.read().await.contains(&vaa.sequence) {
return; // Ignore VAA if we have already seen it
}
let vaa = match verify_vaa(&state, vaa).await {
Ok(vaa) => vaa,
Err(e) => {
trace!(error = ?e, "Ignoring invalid VAA.");
return;
}
};
{
let mut observed_vaa_seqs = state.observed_vaa_seqs.write().await;
// Check again if we have already seen the VAA. Due to concurrency
// the above check might not catch all the cases.
if observed_vaa_seqs.contains(&vaa.sequence) {
return; // Ignore VAA if we have already seen it
}
observed_vaa_seqs.insert(vaa.sequence);
while observed_vaa_seqs.len() > OBSERVED_CACHE_SIZE {
observed_vaa_seqs.pop_first();
}
}
let state = state.clone();
tokio::spawn(async move {
if let Err(e) = crate::aggregate::store_update(&state, Update::Vaa(vaa_bytes)).await {
tracing::error!(error = ?e, "Failed to process VAA.");
}
});
}