This commit is contained in:
Reisen 2023-06-01 19:13:45 +02:00 committed by Reisen
parent 065fba29b1
commit 454cd03b90
7 changed files with 178 additions and 123 deletions

View File

@ -65,7 +65,7 @@ impl IntoResponse for RestError {
pub async fn price_feed_ids( pub async fn price_feed_ids(
State(state): State<super::State>, State(state): State<super::State>,
) -> Result<Json<HashSet<PriceIdentifier>>, RestError> { ) -> Result<Json<HashSet<PriceIdentifier>>, RestError> {
let price_feeds = state.store.get_price_feed_ids(); let price_feeds = state.store.get_price_feed_ids().await;
Ok(Json(price_feeds)) Ok(Json(price_feeds))
} }
@ -83,6 +83,7 @@ pub async fn latest_vaas(
let price_feeds_with_update_data = state let price_feeds_with_update_data = state
.store .store
.get_price_feeds_with_update_data(price_ids, RequestTime::Latest) .get_price_feeds_with_update_data(price_ids, RequestTime::Latest)
.await
.map_err(|_| RestError::UpdateDataNotFound)?; .map_err(|_| RestError::UpdateDataNotFound)?;
Ok(Json( Ok(Json(
price_feeds_with_update_data price_feeds_with_update_data
@ -111,6 +112,7 @@ pub async fn latest_price_feeds(
let price_feeds_with_update_data = state let price_feeds_with_update_data = state
.store .store
.get_price_feeds_with_update_data(price_ids, RequestTime::Latest) .get_price_feeds_with_update_data(price_ids, RequestTime::Latest)
.await
.map_err(|_| RestError::UpdateDataNotFound)?; .map_err(|_| RestError::UpdateDataNotFound)?;
Ok(Json( Ok(Json(
price_feeds_with_update_data price_feeds_with_update_data
@ -148,6 +150,7 @@ pub async fn get_vaa(
vec![price_id], vec![price_id],
RequestTime::FirstAfter(params.publish_time), RequestTime::FirstAfter(params.publish_time),
) )
.await
.map_err(|_| RestError::UpdateDataNotFound)?; .map_err(|_| RestError::UpdateDataNotFound)?;
let vaa = price_feeds_with_update_data let vaa = price_feeds_with_update_data
@ -198,6 +201,7 @@ pub async fn get_vaa_ccip(
let price_feeds_with_update_data = state let price_feeds_with_update_data = state
.store .store
.get_price_feeds_with_update_data(vec![price_id], RequestTime::FirstAfter(publish_time)) .get_price_feeds_with_update_data(vec![price_id], RequestTime::FirstAfter(publish_time))
.await
.map_err(|_| RestError::CcipUpdateDataNotFound)?; .map_err(|_| RestError::CcipUpdateDataNotFound)?;
let bytes = price_feeds_with_update_data let bytes = price_feeds_with_update_data

View File

@ -161,31 +161,34 @@ impl Subscriber {
} }
async fn handle_price_feeds_update(&mut self) -> Result<()> { async fn handle_price_feeds_update(&mut self) -> Result<()> {
let messages = self let price_feed_ids = self.price_feeds_with_config.keys().cloned().collect();
.price_feeds_with_config for update in self
.iter() .store
.map(|(price_feed_id, config)| { .get_price_feeds_with_update_data(price_feed_ids, RequestTime::Latest)
let price_feeds_with_update_data = self .await?
.store .price_feeds
.get_price_feeds_with_update_data(vec![*price_feed_id], RequestTime::Latest)?; {
let price_feed = price_feeds_with_update_data let config = self
.price_feeds .price_feeds_with_config
.into_iter() .get(&PriceIdentifier::new(update.price_feed.id))
.next() .ok_or(anyhow::anyhow!(
.ok_or_else(|| { "Config missing, price feed list was poisoned during iteration."
anyhow::anyhow!("Price feed {} not found.", price_feed_id.to_string()) ))?;
})?;
let price_feed =
RpcPriceFeed::from_price_feed_update(price_feed, config.verbose, config.binary);
Ok(Message::Text(serde_json::to_string( self.sender
&ServerMessage::PriceUpdate { price_feed }, .feed(Message::Text(serde_json::to_string(
&ServerMessage::PriceUpdate {
price_feed: RpcPriceFeed::from_price_feed_update(
update,
config.verbose,
config.binary,
),
},
)?)) )?))
}) .await?;
.collect::<Result<Vec<Message>>>()?; }
self.sender
.send_all(&mut iter(messages.into_iter().map(Ok))) self.sender.flush().await?;
.await?;
Ok(()) Ok(())
} }

View File

@ -1,7 +1,10 @@
use { use {
self::{ self::{
proof::wormhole_merkle::construct_update_data, proof::wormhole_merkle::construct_update_data,
storage::StorageInstance, storage::{
MessageStateFilter,
StorageInstance,
},
types::{ types::{
AccumulatorMessages, AccumulatorMessages,
MessageType, MessageType,
@ -17,6 +20,7 @@ use {
construct_message_states_proofs, construct_message_states_proofs,
store_wormhole_merkle_verified_message, store_wormhole_merkle_verified_message,
}, },
storage::AccumulatorState,
types::{ types::{
MessageState, MessageState,
ProofSet, ProofSet,
@ -41,7 +45,6 @@ use {
collections::HashSet, collections::HashSet,
sync::Arc, sync::Arc,
time::{ time::{
Duration,
SystemTime, SystemTime,
UNIX_EPOCH, UNIX_EPOCH,
}, },
@ -62,28 +65,16 @@ pub mod storage;
pub mod types; pub mod types;
pub mod wormhole; pub mod wormhole;
#[derive(Clone, PartialEq, Debug, Builder)]
#[builder(derive(Debug), pattern = "immutable")]
pub struct AccumulatorState {
pub accumulator_messages: AccumulatorMessages,
pub wormhole_merkle_proof: (WormholeMerkleRoot, Vec<u8>),
}
pub struct Store { pub struct Store {
pub storage: StorageInstance, pub storage: StorageInstance,
pub pending_accumulations: Cache<Slot, AccumulatorStateBuilder>, pub guardian_set: RwLock<Option<Vec<GuardianAddress>>>,
pub guardian_set: RwLock<Option<Vec<GuardianAddress>>>, pub update_tx: Sender<()>,
pub update_tx: Sender<()>,
} }
impl Store { impl Store {
pub fn new_with_local_cache(update_tx: Sender<()>, max_size_per_key: usize) -> Arc<Self> { pub fn new_with_local_cache(update_tx: Sender<()>, cache_size: u64) -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
storage: storage::local_storage::LocalStorage::new_instance(max_size_per_key), storage: storage::local_storage::LocalStorage::new_instance(cache_size),
pending_accumulations: Cache::builder()
.max_capacity(10_000)
.time_to_live(Duration::from_secs(60 * 5))
.build(), // FIXME: Make this configurable
guardian_set: RwLock::new(None), guardian_set: RwLock::new(None),
update_tx, update_tx,
}) })
@ -117,44 +108,47 @@ impl Store {
} }
} }
} }
Update::AccumulatorMessages(accumulator_messages) => { Update::AccumulatorMessages(accumulator_messages) => {
let slot = accumulator_messages.slot; let slot = accumulator_messages.slot;
log::info!("Storing accumulator messages for slot {:?}.", slot,); log::info!("Storing accumulator messages for slot {:?}.", slot,);
let mut accumulator_state = self
let pending_acc = self .storage
.pending_accumulations .fetch_accumulator_state(slot)
.entry(slot) .await?
.or_default() .unwrap_or(AccumulatorState {
.await slot,
.into_value(); accumulator_messages: None,
self.pending_accumulations wormhole_merkle_proof: None,
.insert(slot, pending_acc.accumulator_messages(accumulator_messages)) });
.await; accumulator_state.accumulator_messages = Some(accumulator_messages);
self.storage
.store_accumulator_state(accumulator_state)
.await?;
slot slot
} }
}; };
let pending_state = self.pending_accumulations.get(&slot); let state = match self.storage.fetch_accumulator_state(slot).await? {
let pending_state = match pending_state { Some(state) => state,
Some(pending_state) => pending_state,
// Due to some race conditions this might happen when it's processed before
None => return Ok(()), None => return Ok(()),
}; };
let state = match pending_state.build() { let (accumulator_messages, wormhole_merkle_proof) =
Ok(state) => state, match (state.accumulator_messages, state.wormhole_merkle_proof) {
Err(_) => return Ok(()), (Some(accumulator_messages), Some(wormhole_merkle_proof)) => {
}; (accumulator_messages, wormhole_merkle_proof)
}
_ => return Ok(()),
};
let wormhole_merkle_message_states_proofs = construct_message_states_proofs(state.clone())?; let wormhole_merkle_message_states_proofs =
construct_message_states_proofs(&accumulator_messages, &wormhole_merkle_proof)?;
let current_time: UnixTimestamp = let current_time: UnixTimestamp =
SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as _; SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() as _;
let message_states = state let message_states = accumulator_messages
.accumulator_messages
.messages .messages
.iter() .iter()
.enumerate() .enumerate()
@ -170,7 +164,7 @@ impl Store {
.ok_or(anyhow!("Missing proof for message"))? .ok_or(anyhow!("Missing proof for message"))?
.clone(), .clone(),
}, },
state.accumulator_messages.slot, accumulator_messages.slot,
current_time, current_time,
)) ))
}) })
@ -178,9 +172,7 @@ impl Store {
log::info!("Message states len: {:?}", message_states.len()); log::info!("Message states len: {:?}", message_states.len());
self.storage.store_message_states(message_states)?; self.storage.store_message_states(message_states).await?;
self.pending_accumulations.invalidate(&slot).await;
self.update_tx.send(()).await?; self.update_tx.send(()).await?;
@ -191,16 +183,19 @@ impl Store {
self.guardian_set.write().await.replace(guardian_set); self.guardian_set.write().await.replace(guardian_set);
} }
pub fn get_price_feeds_with_update_data( pub async fn get_price_feeds_with_update_data(
&self, &self,
price_ids: Vec<PriceIdentifier>, price_ids: Vec<PriceIdentifier>,
request_time: RequestTime, request_time: RequestTime,
) -> Result<PriceFeedsWithUpdateData> { ) -> Result<PriceFeedsWithUpdateData> {
let messages = self.storage.retrieve_message_states( let messages = self
price_ids, .storage
request_time, .fetch_message_states(
Some(&|message_type| *message_type == MessageType::PriceFeedMessage), price_ids,
)?; request_time,
MessageStateFilter::Only(MessageType::PriceFeedMessage),
)
.await?;
let price_feeds = messages let price_feeds = messages
.iter() .iter()
@ -226,7 +221,12 @@ impl Store {
}) })
} }
pub fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier> { pub async fn get_price_feed_ids(&self) -> HashSet<PriceIdentifier> {
self.storage.keys().iter().map(|key| key.price_id).collect() self.storage
.message_state_keys()
.await
.iter()
.map(|key| key.price_id)
.collect()
} }
} }

View File

@ -1,7 +1,10 @@
use { use {
crate::store::{ crate::store::{
types::MessageState, storage::AccumulatorState,
AccumulatorState, types::{
AccumulatorMessages,
MessageState,
},
Store, Store,
}, },
anyhow::{ anyhow::{
@ -40,45 +43,43 @@ pub async fn store_wormhole_merkle_verified_message(
proof: WormholeMerkleRoot, proof: WormholeMerkleRoot,
vaa_bytes: Vec<u8>, vaa_bytes: Vec<u8>,
) -> Result<()> { ) -> Result<()> {
let pending_acc = store let mut accumulator_state = store
.pending_accumulations .storage
.entry(proof.slot) .fetch_accumulator_state(proof.slot)
.or_default() .await?
.await .unwrap_or(AccumulatorState {
.into_value(); slot: proof.slot,
accumulator_messages: None,
wormhole_merkle_proof: None,
});
accumulator_state.wormhole_merkle_proof = Some((proof, vaa_bytes));
store store
.pending_accumulations .storage
.insert( .store_accumulator_state(accumulator_state)
proof.slot, .await?;
pending_acc.wormhole_merkle_proof((proof, vaa_bytes)),
)
.await;
Ok(()) Ok(())
} }
pub fn construct_message_states_proofs( pub fn construct_message_states_proofs(
state: AccumulatorState, accumulator_messages: &AccumulatorMessages,
wormhole_merkle_proof: &(WormholeMerkleRoot, Vec<u8>),
) -> Result<Vec<WormholeMerkleMessageProof>> { ) -> Result<Vec<WormholeMerkleMessageProof>> {
// Check whether the state is valid // Check whether the state is valid
let merkle_acc = match MerkleAccumulator::<Keccak160>::from_set( let merkle_acc = match MerkleAccumulator::<Keccak160>::from_set(
state accumulator_messages.messages.iter().map(|m| m.as_ref()),
.accumulator_messages
.messages
.iter()
.map(|m| m.as_ref()),
) { ) {
Some(merkle_acc) => merkle_acc, Some(merkle_acc) => merkle_acc,
None => return Ok(vec![]), // It only happens when the message set is empty None => return Ok(vec![]), // It only happens when the message set is empty
}; };
let (proof, vaa) = &state.wormhole_merkle_proof; let (proof, vaa) = &wormhole_merkle_proof;
if merkle_acc.root != proof.root { if merkle_acc.root != proof.root {
return Err(anyhow!("Invalid merkle root")); return Err(anyhow!("Invalid merkle root"));
} }
state accumulator_messages
.accumulator_messages
.messages .messages
.iter() .iter()
.map(|m| { .map(|m| {

View File

@ -1,16 +1,33 @@
use { use {
super::types::{ super::types::{
AccumulatorMessages,
MessageIdentifier, MessageIdentifier,
MessageState, MessageState,
MessageType, MessageType,
RequestTime, RequestTime,
Slot,
}, },
anyhow::Result, anyhow::Result,
async_trait::async_trait,
pyth_sdk::PriceIdentifier, pyth_sdk::PriceIdentifier,
pythnet_sdk::wire::v1::WormholeMerkleRoot,
}; };
pub mod local_storage; pub mod local_storage;
#[derive(Clone, PartialEq, Debug)]
pub struct AccumulatorState {
pub slot: Slot,
pub accumulator_messages: Option<AccumulatorMessages>,
pub wormhole_merkle_proof: Option<(WormholeMerkleRoot, Vec<u8>)>,
}
#[derive(Clone, Copy)]
pub enum MessageStateFilter {
All,
Only(MessageType),
}
/// This trait defines the interface for update data storage /// This trait defines the interface for update data storage
/// ///
/// Price update data for Pyth can come in multiple formats, for example VAA's and /// Price update data for Pyth can come in multiple formats, for example VAA's and
@ -18,15 +35,19 @@ pub mod local_storage;
/// data to abstract the details of the update data, and so each update data is stored /// data to abstract the details of the update data, and so each update data is stored
/// under a separate key. The caller is responsible for specifying the right /// under a separate key. The caller is responsible for specifying the right
/// key for the update data they wish to access. /// key for the update data they wish to access.
#[async_trait]
pub trait Storage: Send + Sync { pub trait Storage: Send + Sync {
fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>; async fn message_state_keys(&self) -> Vec<MessageIdentifier>;
fn retrieve_message_states( async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()>;
async fn fetch_message_states(
&self, &self,
ids: Vec<PriceIdentifier>, ids: Vec<PriceIdentifier>,
request_time: RequestTime, request_time: RequestTime,
filter: Option<&dyn Fn(&MessageType) -> bool>, filter: MessageStateFilter,
) -> Result<Vec<MessageState>>; ) -> Result<Vec<MessageState>>;
fn keys(&self) -> Vec<MessageIdentifier>;
async fn store_accumulator_state(&self, state: AccumulatorState) -> Result<()>;
async fn fetch_accumulator_state(&self, slot: u64) -> Result<Option<AccumulatorState>>;
} }
pub type StorageInstance = Box<dyn Storage>; pub type StorageInstance = Box<dyn Storage>;

View File

@ -1,17 +1,24 @@
use { use {
super::{ super::{
AccumulatorState,
MessageIdentifier, MessageIdentifier,
MessageState, MessageState,
MessageStateFilter,
RequestTime, RequestTime,
Storage, Storage,
StorageInstance, StorageInstance,
}, },
crate::store::types::MessageType, crate::store::types::{
MessageType,
Slot,
},
anyhow::{ anyhow::{
anyhow, anyhow,
Result, Result,
}, },
async_trait::async_trait,
dashmap::DashMap, dashmap::DashMap,
moka::sync::Cache,
pyth_sdk::PriceIdentifier, pyth_sdk::PriceIdentifier,
std::{ std::{
collections::VecDeque, collections::VecDeque,
@ -22,15 +29,20 @@ use {
#[derive(Clone)] #[derive(Clone)]
pub struct LocalStorage { pub struct LocalStorage {
cache: Arc<DashMap<MessageIdentifier, VecDeque<MessageState>>>, message_cache: Arc<DashMap<MessageIdentifier, VecDeque<MessageState>>>,
max_size_per_key: usize, accumulator_cache: Cache<Slot, AccumulatorState>,
cache_size: u64,
} }
impl LocalStorage { impl LocalStorage {
pub fn new_instance(max_size_per_key: usize) -> StorageInstance { pub fn new_instance(cache_size: u64) -> StorageInstance {
Box::new(Self { Box::new(Self {
cache: Arc::new(DashMap::new()), message_cache: Arc::new(DashMap::new()),
max_size_per_key, accumulator_cache: Cache::builder()
.max_capacity(cache_size)
.time_to_live(std::time::Duration::from_secs(60 * 60 * 24))
.build(),
cache_size,
}) })
} }
@ -39,7 +51,7 @@ impl LocalStorage {
key: MessageIdentifier, key: MessageIdentifier,
request_time: RequestTime, request_time: RequestTime,
) -> Option<MessageState> { ) -> Option<MessageState> {
match self.cache.get(&key) { match self.message_cache.get(&key) {
Some(key_cache) => { Some(key_cache) => {
match request_time { match request_time {
RequestTime::Latest => key_cache.back().cloned(), RequestTime::Latest => key_cache.back().cloned(),
@ -74,6 +86,7 @@ impl LocalStorage {
} }
} }
#[async_trait]
impl Storage for LocalStorage { impl Storage for LocalStorage {
/// Add a new db entry to the cache. /// Add a new db entry to the cache.
/// ///
@ -81,11 +94,11 @@ impl Storage for LocalStorage {
/// the oldest record in the cache if the max_size is reached. Entries are /// the oldest record in the cache if the max_size is reached. Entries are
/// usually added in increasing order and likely to be inserted near the /// usually added in increasing order and likely to be inserted near the
/// end of the deque. The function is optimized for this specific case. /// end of the deque. The function is optimized for this specific case.
fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> { async fn store_message_states(&self, message_states: Vec<MessageState>) -> Result<()> {
for message_state in message_states { for message_state in message_states {
let key = message_state.key(); let key = message_state.key();
let mut key_cache = self.cache.entry(key).or_insert_with(VecDeque::new); let mut key_cache = self.message_cache.entry(key).or_insert_with(VecDeque::new);
key_cache.push_back(message_state); key_cache.push_back(message_state);
@ -99,7 +112,7 @@ impl Storage for LocalStorage {
// FIXME remove equal elements by key and time // FIXME remove equal elements by key and time
// Remove the oldest record if the max size is reached. // Remove the oldest record if the max size is reached.
if key_cache.len() > self.max_size_per_key { if key_cache.len() > self.cache_size as usize {
key_cache.pop_front(); key_cache.pop_front();
} }
} }
@ -107,20 +120,20 @@ impl Storage for LocalStorage {
Ok(()) Ok(())
} }
fn retrieve_message_states( async fn fetch_message_states(
&self, &self,
ids: Vec<PriceIdentifier>, ids: Vec<PriceIdentifier>,
request_time: RequestTime, request_time: RequestTime,
filter: Option<&dyn Fn(&MessageType) -> bool>, filter: MessageStateFilter,
) -> Result<Vec<MessageState>> { ) -> Result<Vec<MessageState>> {
// TODO: Should we return an error if any of the ids are not found?
ids.into_iter() ids.into_iter()
.flat_map(|id| { .flat_map(|id| {
let request_time = request_time.clone(); let request_time = request_time.clone();
let message_types: Vec<MessageType> = match filter { let message_types: Vec<MessageType> = match filter {
Some(filter) => MessageType::iter().filter(filter).collect(), MessageStateFilter::All => MessageType::iter().collect(),
None => MessageType::iter().collect(), MessageStateFilter::Only(t) => vec![t],
}; };
message_types.into_iter().map(move |message_type| { message_types.into_iter().map(move |message_type| {
let key = MessageIdentifier { let key = MessageIdentifier {
price_id: id, price_id: id,
@ -133,7 +146,20 @@ impl Storage for LocalStorage {
.collect() .collect()
} }
fn keys(&self) -> Vec<MessageIdentifier> { async fn message_state_keys(&self) -> Vec<MessageIdentifier> {
self.cache.iter().map(|entry| entry.key().clone()).collect() self.message_cache
.iter()
.map(|entry| entry.key().clone())
.collect()
}
async fn store_accumulator_state(&self, state: super::AccumulatorState) -> Result<()> {
let key = state.slot;
self.accumulator_cache.insert(key, state);
Ok(())
}
async fn fetch_accumulator_state(&self, slot: Slot) -> Result<Option<super::AccumulatorState>> {
Ok(self.accumulator_cache.get(&slot))
} }
} }

View File

@ -11,7 +11,7 @@ use {
// TODO: We can use strum on Message enum to derive this. // TODO: We can use strum on Message enum to derive this.
#[derive(Clone, Debug, Eq, PartialEq, Hash, EnumIter)] #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, EnumIter)]
pub enum MessageType { pub enum MessageType {
PriceFeedMessage, PriceFeedMessage,
TwapMessage, TwapMessage,