Add vaa cache
This commit is contained in:
parent
04806a551e
commit
02de29624c
|
@ -26,12 +26,13 @@ use {
|
||||||
ProofSet,
|
ProofSet,
|
||||||
UnixTimestamp,
|
UnixTimestamp,
|
||||||
},
|
},
|
||||||
wormhole::parse_and_verify_vaa,
|
wormhole::verify_vaa,
|
||||||
},
|
},
|
||||||
anyhow::{
|
anyhow::{
|
||||||
anyhow,
|
anyhow,
|
||||||
Result,
|
Result,
|
||||||
},
|
},
|
||||||
|
moka::future::Cache,
|
||||||
pyth_oracle::{
|
pyth_oracle::{
|
||||||
Message,
|
Message,
|
||||||
MessageType,
|
MessageType,
|
||||||
|
@ -45,6 +46,7 @@ use {
|
||||||
collections::HashSet,
|
collections::HashSet,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{
|
time::{
|
||||||
|
Duration,
|
||||||
SystemTime,
|
SystemTime,
|
||||||
UNIX_EPOCH,
|
UNIX_EPOCH,
|
||||||
},
|
},
|
||||||
|
@ -57,6 +59,7 @@ use {
|
||||||
Address,
|
Address,
|
||||||
Chain,
|
Chain,
|
||||||
GuardianAddress,
|
GuardianAddress,
|
||||||
|
Vaa,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -66,15 +69,20 @@ pub mod types;
|
||||||
pub mod wormhole;
|
pub mod wormhole;
|
||||||
|
|
||||||
pub struct Store {
|
pub struct Store {
|
||||||
pub storage: StorageInstance,
|
pub storage: StorageInstance,
|
||||||
pub guardian_set: RwLock<Option<Vec<GuardianAddress>>>,
|
pub observed_vaa_seqs: Cache<u64, bool>,
|
||||||
pub update_tx: Sender<()>,
|
pub guardian_set: RwLock<Option<Vec<GuardianAddress>>>,
|
||||||
|
pub update_tx: Sender<()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Store {
|
impl Store {
|
||||||
pub fn new_with_local_cache(update_tx: Sender<()>, cache_size: u64) -> Arc<Self> {
|
pub fn new_with_local_cache(update_tx: Sender<()>, cache_size: u64) -> Arc<Self> {
|
||||||
Arc::new(Self {
|
Arc::new(Self {
|
||||||
storage: storage::local_storage::LocalStorage::new_instance(cache_size),
|
storage: storage::local_storage::LocalStorage::new_instance(cache_size),
|
||||||
|
observed_vaa_seqs: Cache::builder()
|
||||||
|
.max_capacity(cache_size)
|
||||||
|
.time_to_live(Duration::from_secs(60 * 5))
|
||||||
|
.build(),
|
||||||
guardian_set: RwLock::new(None),
|
guardian_set: RwLock::new(None),
|
||||||
update_tx,
|
update_tx,
|
||||||
})
|
})
|
||||||
|
@ -84,22 +92,32 @@ impl Store {
|
||||||
pub async fn store_update(&self, update: Update) -> Result<()> {
|
pub async fn store_update(&self, update: Update) -> Result<()> {
|
||||||
let slot = match update {
|
let slot = match update {
|
||||||
Update::Vaa(vaa_bytes) => {
|
Update::Vaa(vaa_bytes) => {
|
||||||
let body = parse_and_verify_vaa(self, &vaa_bytes).await;
|
let vaa =
|
||||||
let body = match body {
|
serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(&vaa_bytes)?;
|
||||||
Ok(body) => body,
|
|
||||||
|
if vaa.emitter_chain != Chain::Pythnet
|
||||||
|
|| vaa.emitter_address != Address(pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS)
|
||||||
|
{
|
||||||
|
return Ok(()); // Ignore VAA from other emitters
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.observed_vaa_seqs.get(&vaa.sequence).is_some() {
|
||||||
|
return Ok(()); // Ignore VAA if we have already seen it
|
||||||
|
}
|
||||||
|
|
||||||
|
let vaa = verify_vaa(self, vaa).await;
|
||||||
|
|
||||||
|
let vaa = match vaa {
|
||||||
|
Ok(vaa) => vaa,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
log::info!("Ignoring invalid VAA: {:?}", err);
|
log::info!("Ignoring invalid VAA: {:?}", err);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if body.emitter_chain != Chain::Pythnet
|
self.observed_vaa_seqs.insert(vaa.sequence, true).await;
|
||||||
|| body.emitter_address != Address(pythnet_sdk::ACCUMULATOR_EMITTER_ADDRESS)
|
|
||||||
{
|
|
||||||
return Ok(()); // Ignore VAA from other emitters
|
|
||||||
}
|
|
||||||
|
|
||||||
match WormholeMessage::try_from_bytes(body.payload)?.payload {
|
match WormholeMessage::try_from_bytes(vaa.payload)?.payload {
|
||||||
WormholePayload::Merkle(proof) => {
|
WormholePayload::Merkle(proof) => {
|
||||||
log::info!("Storing merkle proof for slot {:?}", proof.slot,);
|
log::info!("Storing merkle proof for slot {:?}", proof.slot,);
|
||||||
store_wormhole_merkle_verified_message(self, proof.clone(), vaa_bytes)
|
store_wormhole_merkle_verified_message(self, proof.clone(), vaa_bytes)
|
||||||
|
|
|
@ -28,11 +28,10 @@ use {
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Parses and verifies a VAA to ensure it is signed by the Wormhole guardian set.
|
/// Parses and verifies a VAA to ensure it is signed by the Wormhole guardian set.
|
||||||
pub async fn parse_and_verify_vaa<'a>(
|
pub async fn verify_vaa<'a>(
|
||||||
store: &Store,
|
store: &Store,
|
||||||
vaa_bytes: &'a [u8],
|
vaa: Vaa<&'a RawMessage>,
|
||||||
) -> Result<Body<&'a RawMessage>> {
|
) -> Result<Vaa<&'a RawMessage>> {
|
||||||
let vaa = serde_wormhole::from_slice::<Vaa<&serde_wormhole::RawMessage>>(vaa_bytes)?;
|
|
||||||
let (header, body): (Header, Body<&RawMessage>) = vaa.into();
|
let (header, body): (Header, Body<&RawMessage>) = vaa.into();
|
||||||
let digest = body.digest()?;
|
let digest = body.digest()?;
|
||||||
|
|
||||||
|
@ -83,5 +82,5 @@ pub async fn parse_and_verify_vaa<'a>(
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(body)
|
Ok((header, body).into())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue