[hermes] Use raw_message for future-compatibility (#913)
This commit is contained in:
parent
742c37ed88
commit
dfc2df779c
|
@ -20,9 +20,7 @@ use {
|
||||||
Result,
|
Result,
|
||||||
},
|
},
|
||||||
borsh::BorshDeserialize,
|
borsh::BorshDeserialize,
|
||||||
byteorder::BE,
|
|
||||||
futures::stream::StreamExt,
|
futures::stream::StreamExt,
|
||||||
pythnet_sdk::messages::Message,
|
|
||||||
solana_account_decoder::UiAccountEncoding,
|
solana_account_decoder::UiAccountEncoding,
|
||||||
solana_client::{
|
solana_client::{
|
||||||
nonblocking::{
|
nonblocking::{
|
||||||
|
@ -157,44 +155,9 @@ pub async fn run(store: Arc<Store>, pythnet_ws_endpoint: String) -> Result<!> {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// The validators writes the accumulator messages using Borsh with
|
let accumulator_messages = AccumulatorMessages::try_from_slice(&account.data);
|
||||||
// the following struct. We cannot directly have messages as Vec<Messages>
|
|
||||||
// because they are serialized using big-endian byte order and Borsh
|
|
||||||
// uses little-endian byte order.
|
|
||||||
#[derive(BorshDeserialize)]
|
|
||||||
struct RawAccumulatorMessages {
|
|
||||||
pub magic: [u8; 4],
|
|
||||||
pub slot: u64,
|
|
||||||
pub ring_size: u32,
|
|
||||||
pub raw_messages: Vec<Vec<u8>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
let accumulator_messages = RawAccumulatorMessages::try_from_slice(&account.data);
|
|
||||||
match accumulator_messages {
|
match accumulator_messages {
|
||||||
Ok(accumulator_messages) => {
|
Ok(accumulator_messages) => {
|
||||||
let messages = accumulator_messages
|
|
||||||
.raw_messages
|
|
||||||
.iter()
|
|
||||||
.map(|message| {
|
|
||||||
pythnet_sdk::wire::from_slice::<BE, Message>(message.as_slice())
|
|
||||||
})
|
|
||||||
.collect::<Result<Vec<Message>, _>>();
|
|
||||||
|
|
||||||
let messages = match messages {
|
|
||||||
Ok(messages) => messages,
|
|
||||||
Err(err) => {
|
|
||||||
log::error!("Failed to parse messages: {:?}", err);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let accumulator_messages = AccumulatorMessages {
|
|
||||||
magic: accumulator_messages.magic,
|
|
||||||
slot: accumulator_messages.slot,
|
|
||||||
ring_size: accumulator_messages.ring_size,
|
|
||||||
messages,
|
|
||||||
};
|
|
||||||
|
|
||||||
let (candidate, _) = Pubkey::find_program_address(
|
let (candidate, _) = Pubkey::find_program_address(
|
||||||
&[
|
&[
|
||||||
b"AccumulatorState",
|
b"AccumulatorState",
|
||||||
|
|
|
@ -33,15 +33,19 @@ use {
|
||||||
anyhow,
|
anyhow,
|
||||||
Result,
|
Result,
|
||||||
},
|
},
|
||||||
|
byteorder::BigEndian,
|
||||||
pyth_sdk::PriceIdentifier,
|
pyth_sdk::PriceIdentifier,
|
||||||
pythnet_sdk::{
|
pythnet_sdk::{
|
||||||
messages::{
|
messages::{
|
||||||
Message,
|
Message,
|
||||||
MessageType,
|
MessageType,
|
||||||
},
|
},
|
||||||
wire::v1::{
|
wire::{
|
||||||
WormholeMessage,
|
from_slice,
|
||||||
WormholePayload,
|
v1::{
|
||||||
|
WormholeMessage,
|
||||||
|
WormholePayload,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
|
@ -199,12 +203,14 @@ impl Store {
|
||||||
|
|
||||||
let message_states = completed_state
|
let message_states = completed_state
|
||||||
.accumulator_messages
|
.accumulator_messages
|
||||||
.messages
|
.raw_messages
|
||||||
.iter()
|
.into_iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(idx, message)| {
|
.map(|(idx, raw_message)| {
|
||||||
Ok(MessageState::new(
|
Ok(MessageState::new(
|
||||||
*message,
|
from_slice::<BigEndian, _>(raw_message.as_ref())
|
||||||
|
.map_err(|e| anyhow!("Failed to deserialize message: {:?}", e))?,
|
||||||
|
raw_message,
|
||||||
ProofSet {
|
ProofSet {
|
||||||
wormhole_merkle_proof: wormhole_merkle_message_states_proofs
|
wormhole_merkle_proof: wormhole_merkle_message_states_proofs
|
||||||
.get(idx)
|
.get(idx)
|
||||||
|
|
|
@ -76,26 +76,20 @@ pub fn construct_message_states_proofs(
|
||||||
let accumulator_messages = &completed_accumulator_state.accumulator_messages;
|
let accumulator_messages = &completed_accumulator_state.accumulator_messages;
|
||||||
let wormhole_merkle_state = &completed_accumulator_state.wormhole_merkle_state;
|
let wormhole_merkle_state = &completed_accumulator_state.wormhole_merkle_state;
|
||||||
|
|
||||||
let raw_messages = accumulator_messages
|
|
||||||
.messages
|
|
||||||
.iter()
|
|
||||||
.map(|m| {
|
|
||||||
to_vec::<_, byteorder::BE>(m).map_err(|e| anyhow!("Failed to serialize message: {}", e))
|
|
||||||
})
|
|
||||||
.collect::<Result<Vec<Vec<u8>>>>()?;
|
|
||||||
|
|
||||||
// Check whether the state is valid
|
// Check whether the state is valid
|
||||||
let merkle_acc =
|
let merkle_acc = match MerkleTree::<Keccak160>::from_set(
|
||||||
match MerkleTree::<Keccak160>::from_set(raw_messages.iter().map(|m| m.as_ref())) {
|
accumulator_messages.raw_messages.iter().map(|m| m.as_ref()),
|
||||||
Some(merkle_acc) => merkle_acc,
|
) {
|
||||||
None => return Ok(vec![]), // It only happens when the message set is empty
|
Some(merkle_acc) => merkle_acc,
|
||||||
};
|
None => return Ok(vec![]), // It only happens when the message set is empty
|
||||||
|
};
|
||||||
|
|
||||||
if merkle_acc.root.as_bytes() != wormhole_merkle_state.root.root {
|
if merkle_acc.root.as_bytes() != wormhole_merkle_state.root.root {
|
||||||
return Err(anyhow!("Invalid merkle root"));
|
return Err(anyhow!("Invalid merkle root"));
|
||||||
}
|
}
|
||||||
|
|
||||||
raw_messages
|
accumulator_messages
|
||||||
|
.raw_messages
|
||||||
.iter()
|
.iter()
|
||||||
.map(|m| {
|
.map(|m| {
|
||||||
Ok(WormholeMerkleMessageProof {
|
Ok(WormholeMerkleMessageProof {
|
||||||
|
|
|
@ -4,6 +4,7 @@ use {
|
||||||
types::{
|
types::{
|
||||||
AccumulatorMessages,
|
AccumulatorMessages,
|
||||||
ProofSet,
|
ProofSet,
|
||||||
|
RawMessage,
|
||||||
RequestTime,
|
RequestTime,
|
||||||
Slot,
|
Slot,
|
||||||
UnixTimestamp,
|
UnixTimestamp,
|
||||||
|
@ -71,6 +72,7 @@ pub struct MessageStateTime {
|
||||||
pub struct MessageState {
|
pub struct MessageState {
|
||||||
pub slot: Slot,
|
pub slot: Slot,
|
||||||
pub message: Message,
|
pub message: Message,
|
||||||
|
pub raw_message: RawMessage,
|
||||||
pub proof_set: ProofSet,
|
pub proof_set: ProofSet,
|
||||||
pub received_at: UnixTimestamp,
|
pub received_at: UnixTimestamp,
|
||||||
}
|
}
|
||||||
|
@ -92,6 +94,7 @@ impl MessageState {
|
||||||
|
|
||||||
pub fn new(
|
pub fn new(
|
||||||
message: Message,
|
message: Message,
|
||||||
|
raw_message: RawMessage,
|
||||||
proof_set: ProofSet,
|
proof_set: ProofSet,
|
||||||
slot: Slot,
|
slot: Slot,
|
||||||
received_at: UnixTimestamp,
|
received_at: UnixTimestamp,
|
||||||
|
@ -99,6 +102,7 @@ impl MessageState {
|
||||||
Self {
|
Self {
|
||||||
slot,
|
slot,
|
||||||
message,
|
message,
|
||||||
|
raw_message,
|
||||||
proof_set,
|
proof_set,
|
||||||
received_at,
|
received_at,
|
||||||
}
|
}
|
||||||
|
@ -151,20 +155,20 @@ mod test {
|
||||||
wormhole_merkle_state: None,
|
wormhole_merkle_state: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err());
|
assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err());
|
||||||
|
|
||||||
let accumulator_state = AccumulatorState {
|
let accumulator_state = AccumulatorState {
|
||||||
slot: 1,
|
slot: 1,
|
||||||
accumulator_messages: Some(AccumulatorMessages {
|
accumulator_messages: Some(AccumulatorMessages {
|
||||||
slot: 1,
|
slot: 1,
|
||||||
magic: [0; 4],
|
magic: [0; 4],
|
||||||
ring_size: 10,
|
ring_size: 10,
|
||||||
messages: vec![],
|
raw_messages: vec![],
|
||||||
}),
|
}),
|
||||||
wormhole_merkle_state: None,
|
wormhole_merkle_state: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err());
|
assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err());
|
||||||
|
|
||||||
let accumulator_state = AccumulatorState {
|
let accumulator_state = AccumulatorState {
|
||||||
slot: 1,
|
slot: 1,
|
||||||
|
@ -179,15 +183,15 @@ mod test {
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
|
|
||||||
assert!(CompletedAccumulatorState::try_from(accumulator_state.clone()).is_err());
|
assert!(CompletedAccumulatorState::try_from(accumulator_state).is_err());
|
||||||
|
|
||||||
let accumulator_state = AccumulatorState {
|
let accumulator_state = AccumulatorState {
|
||||||
slot: 1,
|
slot: 1,
|
||||||
accumulator_messages: Some(AccumulatorMessages {
|
accumulator_messages: Some(AccumulatorMessages {
|
||||||
slot: 1,
|
slot: 1,
|
||||||
magic: [0; 4],
|
magic: [0; 4],
|
||||||
ring_size: 10,
|
ring_size: 10,
|
||||||
messages: vec![],
|
raw_messages: vec![],
|
||||||
}),
|
}),
|
||||||
wormhole_merkle_state: Some(WormholeMerkleState {
|
wormhole_merkle_state: Some(WormholeMerkleState {
|
||||||
vaa: vec![],
|
vaa: vec![],
|
||||||
|
@ -200,14 +204,14 @@ mod test {
|
||||||
};
|
};
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
CompletedAccumulatorState::try_from(accumulator_state.clone()).unwrap(),
|
CompletedAccumulatorState::try_from(accumulator_state).unwrap(),
|
||||||
CompletedAccumulatorState {
|
CompletedAccumulatorState {
|
||||||
slot: 1,
|
slot: 1,
|
||||||
accumulator_messages: AccumulatorMessages {
|
accumulator_messages: AccumulatorMessages {
|
||||||
slot: 1,
|
slot: 1,
|
||||||
magic: [0; 4],
|
magic: [0; 4],
|
||||||
ring_size: 10,
|
ring_size: 10,
|
||||||
messages: vec![],
|
raw_messages: vec![],
|
||||||
},
|
},
|
||||||
wormhole_merkle_state: WormholeMerkleState {
|
wormhole_merkle_state: WormholeMerkleState {
|
||||||
vaa: vec![],
|
vaa: vec![],
|
||||||
|
|
|
@ -211,6 +211,7 @@ mod test {
|
||||||
) -> MessageState {
|
) -> MessageState {
|
||||||
MessageState {
|
MessageState {
|
||||||
slot,
|
slot,
|
||||||
|
raw_message: vec![],
|
||||||
message: Message::PriceFeedMessage(PriceFeedMessage {
|
message: Message::PriceFeedMessage(PriceFeedMessage {
|
||||||
feed_id,
|
feed_id,
|
||||||
publish_time,
|
publish_time,
|
||||||
|
@ -574,10 +575,10 @@ mod test {
|
||||||
// Change the state to have accumulator messages
|
// Change the state to have accumulator messages
|
||||||
// We mutate the existing state because the normal flow is like this.
|
// We mutate the existing state because the normal flow is like this.
|
||||||
accumulator_state.accumulator_messages = Some(AccumulatorMessages {
|
accumulator_state.accumulator_messages = Some(AccumulatorMessages {
|
||||||
magic: [0; 4],
|
magic: [0; 4],
|
||||||
slot: 10,
|
slot: 10,
|
||||||
ring_size: 3,
|
ring_size: 3,
|
||||||
messages: vec![],
|
raw_messages: vec![],
|
||||||
});
|
});
|
||||||
|
|
||||||
// Store the accumulator state again.
|
// Store the accumulator state again.
|
||||||
|
|
|
@ -1,9 +1,7 @@
|
||||||
use {
|
use {
|
||||||
super::proof::wormhole_merkle::WormholeMerkleMessageProof,
|
super::proof::wormhole_merkle::WormholeMerkleMessageProof,
|
||||||
pythnet_sdk::messages::{
|
borsh::BorshDeserialize,
|
||||||
Message,
|
pythnet_sdk::messages::PriceFeedMessage,
|
||||||
PriceFeedMessage,
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Clone, PartialEq, Debug)]
|
#[derive(Clone, PartialEq, Debug)]
|
||||||
|
@ -20,12 +18,20 @@ pub enum RequestTime {
|
||||||
FirstAfter(UnixTimestamp),
|
FirstAfter(UnixTimestamp),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, PartialEq, Debug)]
|
pub type RawMessage = Vec<u8>;
|
||||||
|
|
||||||
|
/// Accumulator messages coming from Pythnet validators.
|
||||||
|
///
|
||||||
|
/// The validators writes the accumulator messages using Borsh with
|
||||||
|
/// the following struct. We cannot directly have messages as Vec<Messages>
|
||||||
|
/// because they are serialized using big-endian byte order and Borsh
|
||||||
|
/// uses little-endian byte order.
|
||||||
|
#[derive(Clone, PartialEq, Debug, BorshDeserialize)]
|
||||||
pub struct AccumulatorMessages {
|
pub struct AccumulatorMessages {
|
||||||
pub magic: [u8; 4],
|
pub magic: [u8; 4],
|
||||||
pub slot: Slot,
|
pub slot: u64,
|
||||||
pub ring_size: u32,
|
pub ring_size: u32,
|
||||||
pub messages: Vec<Message>,
|
pub raw_messages: Vec<RawMessage>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AccumulatorMessages {
|
impl AccumulatorMessages {
|
||||||
|
|
Loading…
Reference in New Issue