Merge remote-tracking branch 'origin/dev.v2' into drozdziak1/p2w-client-async-retries
commit-id:0d22d6a2
This commit is contained in:
commit
add82b4e33
|
@ -44,24 +44,24 @@ ENV EMITTER_ADDRESS="11111111111111111111111111111115"
|
|||
ENV BRIDGE_ADDRESS="Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o"
|
||||
|
||||
# Build Wormhole Solana programs
|
||||
RUN --mount=type=cache,target=bridge/target \
|
||||
--mount=type=cache,target=modules/token_bridge/target \
|
||||
--mount=type=cache,target=modules/nft_bridge/target \
|
||||
--mount=type=cache,target=pyth2wormhole/target \
|
||||
--mount=type=cache,target=migration/target \
|
||||
cargo build-bpf --manifest-path "bridge/program/Cargo.toml" -- --locked && \
|
||||
cargo build-bpf --manifest-path "bridge/cpi_poster/Cargo.toml" -- --locked && \
|
||||
cargo build-bpf --manifest-path "modules/token_bridge/program/Cargo.toml" -- --locked && \
|
||||
cargo build-bpf --manifest-path "pyth2wormhole/program/Cargo.toml" -- --locked && \
|
||||
cargo build-bpf --manifest-path "modules/nft_bridge/program/Cargo.toml" -- --locked && \
|
||||
cargo build-bpf --manifest-path "migration/Cargo.toml" -- --locked && \
|
||||
cp bridge/target/deploy/bridge.so /opt/solana/deps/bridge.so && \
|
||||
cp bridge/target/deploy/cpi_poster.so /opt/solana/deps/cpi_poster.so && \
|
||||
cp migration/target/deploy/wormhole_migration.so /opt/solana/deps/wormhole_migration.so && \
|
||||
cp modules/token_bridge/target/deploy/token_bridge.so /opt/solana/deps/token_bridge.so && \
|
||||
cp modules/nft_bridge/target/deploy/nft_bridge.so /opt/solana/deps/nft_bridge.so && \
|
||||
cp modules/token_bridge/token-metadata/spl_token_metadata.so /opt/solana/deps/spl_token_metadata.so && \
|
||||
cp pyth2wormhole/target/deploy/pyth2wormhole.so /opt/solana/deps/pyth2wormhole.so
|
||||
RUN --mount=type=cache,target=solana/bridge/target \
|
||||
--mount=type=cache,target=solana/modules/token_bridge/target \
|
||||
--mount=type=cache,target=solana/modules/nft_bridge/target \
|
||||
--mount=type=cache,target=solana/pyth2wormhole/target \
|
||||
--mount=type=cache,target=solana/migration/target \
|
||||
cargo build-bpf --manifest-path "solana/bridge/program/Cargo.toml" -- --locked && \
|
||||
cargo build-bpf --manifest-path "solana/bridge/cpi_poster/Cargo.toml" -- --locked && \
|
||||
cargo build-bpf --manifest-path "solana/modules/token_bridge/program/Cargo.toml" -- --locked && \
|
||||
cargo build-bpf --manifest-path "solana/pyth2wormhole/program/Cargo.toml" -- --locked && \
|
||||
cargo build-bpf --manifest-path "solana/modules/nft_bridge/program/Cargo.toml" -- --locked && \
|
||||
cargo build-bpf --manifest-path "solana/migration/Cargo.toml" -- --locked && \
|
||||
cp solana/bridge/target/deploy/bridge.so /opt/solana/deps/bridge.so && \
|
||||
cp solana/bridge/target/deploy/cpi_poster.so /opt/solana/deps/cpi_poster.so && \
|
||||
cp solana/migration/target/deploy/wormhole_migration.so /opt/solana/deps/wormhole_migration.so && \
|
||||
cp solana/modules/token_bridge/target/deploy/token_bridge.so /opt/solana/deps/token_bridge.so && \
|
||||
cp solana/modules/nft_bridge/target/deploy/nft_bridge.so /opt/solana/deps/nft_bridge.so && \
|
||||
cp solana/modules/token_bridge/token-metadata/spl_token_metadata.so /opt/solana/deps/spl_token_metadata.so && \
|
||||
cp solana/pyth2wormhole/target/deploy/pyth2wormhole.so /opt/solana/deps/pyth2wormhole.so
|
||||
|
||||
# Build the Pyth Solana program
|
||||
WORKDIR $PYTH_DIR/pyth-client/program
|
|
@ -14,6 +14,7 @@ ENV EMITTER_ADDRESS="11111111111111111111111111111115"
|
|||
ENV BRIDGE_ADDRESS="Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o"
|
||||
|
||||
COPY solana solana
|
||||
COPY third_party third_party
|
||||
|
||||
# wasm-bindgen 0.2.74 generates JavaScript bindings for SystemInstruction exported from solana-program 1.9.4.
|
||||
# The generated JavaScript references a non-existent function (wasm.__wbg_systeminstruction_free) that leads
|
||||
|
@ -65,13 +66,13 @@ RUN --mount=type=cache,target=/root/.cache \
|
|||
|
||||
# Compile pyth2wormhole
|
||||
RUN --mount=type=cache,target=/root/.cache \
|
||||
--mount=type=cache,target=solana/pyth2wormhole/target \
|
||||
cd solana/pyth2wormhole/program \
|
||||
--mount=type=cache,target=third_party/pyth/p2w-sdk/rust/target \
|
||||
cd third_party/pyth/p2w-sdk/rust \
|
||||
&& /usr/local/cargo/bin/wasm-pack build --target bundler -d bundler -- --features wasm --locked
|
||||
|
||||
RUN --mount=type=cache,target=/root/.cache \
|
||||
--mount=type=cache,target=solana/pyth2wormhole/target \
|
||||
cd solana/pyth2wormhole/program \
|
||||
--mount=type=cache,target=third_party/pyth/p2w-sdk/rust/target \
|
||||
cd third_party/pyth/p2w-sdk/rust \
|
||||
&& /usr/local/cargo/bin/wasm-pack build --target nodejs -d nodejs -- --features wasm --locked
|
||||
|
||||
FROM scratch AS export
|
||||
|
@ -81,7 +82,7 @@ COPY --from=build /usr/src/bridge/solana/modules/token_bridge/program/bundler sd
|
|||
COPY --from=build /usr/src/bridge/solana/migration/bundler sdk/js/src/solana/migration
|
||||
COPY --from=build /usr/src/bridge/solana/modules/nft_bridge/program/bundler sdk/js/src/solana/nft
|
||||
|
||||
COPY --from=build /usr/src/bridge/solana/pyth2wormhole/program/bundler third_party/pyth/p2w-sdk/js/src/solana/p2w-core
|
||||
COPY --from=build /usr/src/bridge/third_party/pyth/p2w-sdk/rust/bundler third_party/pyth/p2w-sdk/js/src/solana/p2w-core
|
||||
COPY --from=build /usr/src/bridge/solana/bridge/program/bundler third_party/pyth/p2w-sdk/js/src/solana/wormhole-core
|
||||
|
||||
COPY --from=build /usr/src/bridge/solana/bridge/program/nodejs sdk/js/src/solana/core-node
|
||||
|
|
4
Tiltfile
4
Tiltfile
|
@ -207,8 +207,8 @@ docker_build(
|
|||
|
||||
docker_build(
|
||||
ref = "solana-contract",
|
||||
context = "solana",
|
||||
dockerfile = "solana/Dockerfile",
|
||||
context = ".",
|
||||
dockerfile = "Dockerfile.solana",
|
||||
)
|
||||
|
||||
# solana local devnet
|
||||
|
|
|
@ -1594,6 +1594,16 @@ dependencies = [
|
|||
"syn 1.0.73",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "p2w-sdk"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"pyth-client 0.5.0",
|
||||
"serde",
|
||||
"solana-program",
|
||||
"solitaire",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.11.1"
|
||||
|
@ -1746,12 +1756,29 @@ version = "0.2.2"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "44de48029c54ec1ca570786b5baeb906b0fc2409c8e0145585e287ee7a526c72"
|
||||
|
||||
[[package]]
|
||||
name = "pyth-client"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f779e98b8c8016d0c1409247a204bd4fcdea8b67ceeef545f04e324d66c49e52"
|
||||
dependencies = [
|
||||
"borsh",
|
||||
"borsh-derive",
|
||||
"bytemuck",
|
||||
"num-derive",
|
||||
"num-traits",
|
||||
"serde",
|
||||
"solana-program",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pyth2wormhole"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"borsh",
|
||||
"pyth-client",
|
||||
"p2w-sdk",
|
||||
"pyth-client 0.2.2",
|
||||
"rocksalt",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
|
@ -1759,7 +1786,6 @@ dependencies = [
|
|||
"solana-program",
|
||||
"solitaire",
|
||||
"solitaire-client",
|
||||
"wasm-bindgen",
|
||||
"wormhole-bridge-solana",
|
||||
]
|
||||
|
||||
|
@ -1771,6 +1797,7 @@ dependencies = [
|
|||
"clap 3.0.0-beta.2",
|
||||
"env_logger 0.8.4",
|
||||
"log",
|
||||
"p2w-sdk",
|
||||
"pyth2wormhole",
|
||||
"serde",
|
||||
"serde_yaml",
|
||||
|
@ -2124,9 +2151,9 @@ checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012"
|
|||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.133"
|
||||
version = "1.0.136"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97565067517b60e2d1ea8b268e59ce036de907ac523ad83a0475da04e818989a"
|
||||
checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789"
|
||||
dependencies = [
|
||||
"serde_derive",
|
||||
]
|
||||
|
@ -2142,9 +2169,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "serde_derive"
|
||||
version = "1.0.133"
|
||||
version = "1.0.136"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ed201699328568d8d08208fdd080e3ff594e6c422e438b6705905da01005d537"
|
||||
checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9"
|
||||
dependencies = [
|
||||
"proc-macro2 1.0.27",
|
||||
"quote 1.0.9",
|
||||
|
@ -3376,8 +3403,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"wasm-bindgen-macro",
|
||||
]
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ env_logger = "0.8.4"
|
|||
log = "0.4.14"
|
||||
wormhole-bridge-solana = {path = "../../bridge/program"}
|
||||
pyth2wormhole = {path = "../program"}
|
||||
p2w-sdk = { path = "../../../third_party/pyth/p2w-sdk/rust" }
|
||||
serde = "1"
|
||||
serde_yaml = "0.8"
|
||||
shellexpand = "2.1.0"
|
||||
|
|
|
@ -53,7 +53,7 @@ pub enum Action {
|
|||
short = 'n',
|
||||
long = "--retries",
|
||||
about = "How many times to retry each batch on failure",
|
||||
default_value = "3"
|
||||
default_value = "3"
|
||||
)]
|
||||
n_retries: usize,
|
||||
},
|
||||
|
|
|
@ -41,11 +41,10 @@ use bridge::{
|
|||
types::ConsistencyLevel,
|
||||
};
|
||||
|
||||
use p2w_sdk::P2WEmitter;
|
||||
|
||||
use pyth2wormhole::{
|
||||
attest::{
|
||||
P2WEmitter,
|
||||
P2W_MAX_BATCH_SIZE,
|
||||
},
|
||||
attest::P2W_MAX_BATCH_SIZE,
|
||||
config::P2WConfigAccount,
|
||||
initialize::InitializeAccounts,
|
||||
set_config::SetConfigAccounts,
|
||||
|
|
|
@ -28,7 +28,7 @@ use cli::{
|
|||
Cli,
|
||||
};
|
||||
|
||||
use pyth2wormhole::attest::P2WEmitter;
|
||||
use p2w_sdk::P2WEmitter;
|
||||
|
||||
use pyth2wormhole_client::*;
|
||||
|
||||
|
@ -74,8 +74,8 @@ fn main() -> Result<(), ErrBox> {
|
|||
payer,
|
||||
p2w_addr,
|
||||
read_keypair_file(&*shellexpand::tilde(&owner))?,
|
||||
new_owner_addr,
|
||||
new_wh_prog,
|
||||
new_owner_addr,
|
||||
new_wh_prog,
|
||||
new_pyth_owner_addr,
|
||||
latest_blockhash,
|
||||
)?;
|
||||
|
@ -146,7 +146,6 @@ fn handle_attest(
|
|||
// If no batches are scheduled for retry, the vector eventually drains
|
||||
while !batches.is_empty() {
|
||||
for (batch_no, symbols, attempt_no) in batches {
|
||||
let sym_msg_keypair = Keypair::new();
|
||||
info!(
|
||||
"Batch {}/{} contents: {:?}",
|
||||
batch_no,
|
||||
|
|
|
@ -13,7 +13,6 @@ default = ["wormhole-bridge-solana/no-entrypoint"]
|
|||
client = ["solitaire/client", "solitaire-client", "no-entrypoint"]
|
||||
trace = ["solitaire/trace", "wormhole-bridge-solana/trace"]
|
||||
no-entrypoint = []
|
||||
wasm = ["no-entrypoint", "wasm-bindgen", "serde", "serde_derive", "serde_json"]
|
||||
|
||||
[dependencies]
|
||||
wormhole-bridge-solana = {path = "../../bridge/program"}
|
||||
|
@ -23,8 +22,7 @@ rocksalt = { path = "../../solitaire/rocksalt" }
|
|||
solana-program = "=1.9.4"
|
||||
borsh = "=0.9.1"
|
||||
pyth-client = "0.2.2"
|
||||
# Crates needed for easier wasm data passing
|
||||
wasm-bindgen = { version = "0.2.74", features = ["serde-serialize"], optional = true}
|
||||
p2w-sdk = { path = "../../../third_party/pyth/p2w-sdk/rust" }
|
||||
serde = { version = "1", optional = true}
|
||||
serde_derive = { version = "1", optional = true}
|
||||
serde_json = { version = "1", optional = true}
|
||||
|
|
|
@ -1,10 +1,4 @@
|
|||
use crate::{
|
||||
config::P2WConfigAccount,
|
||||
types::{
|
||||
batch_serialize,
|
||||
PriceAttestation,
|
||||
},
|
||||
};
|
||||
use crate::config::P2WConfigAccount;
|
||||
use borsh::{
|
||||
BorshDeserialize,
|
||||
BorshSerialize,
|
||||
|
@ -24,6 +18,12 @@ use solana_program::{
|
|||
rent::Rent,
|
||||
};
|
||||
|
||||
use p2w_sdk::{
|
||||
BatchPriceAttestation,
|
||||
PriceAttestation,
|
||||
P2WEmitter,
|
||||
};
|
||||
|
||||
use bridge::{
|
||||
accounts::BridgeData,
|
||||
types::ConsistencyLevel,
|
||||
|
@ -50,8 +50,6 @@ use solitaire::{
|
|||
ToInstruction,
|
||||
};
|
||||
|
||||
pub type P2WEmitter<'b> = Derive<Info<'b>, "p2w-emitter">;
|
||||
|
||||
/// Important: must be manually maintained until native Solitaire
|
||||
/// variable len vector support.
|
||||
///
|
||||
|
@ -209,7 +207,11 @@ pub fn attest(ctx: &ExecutionContext, accs: &mut Attest, data: AttestData) -> So
|
|||
price.key.clone(),
|
||||
accs.clock.unix_timestamp,
|
||||
&*price.try_borrow_data()?,
|
||||
)?;
|
||||
)
|
||||
.map_err(|e| {
|
||||
trace!(e.to_string());
|
||||
ProgramError::InvalidAccountData
|
||||
})?;
|
||||
|
||||
// The following check is crucial against poorly ordered
|
||||
// account inputs, e.g. [Some(prod1), Some(price1),
|
||||
|
@ -230,6 +232,10 @@ pub fn attest(ctx: &ExecutionContext, accs: &mut Attest, data: AttestData) -> So
|
|||
attestations.push(attestation);
|
||||
}
|
||||
|
||||
let batch_attestation = BatchPriceAttestation {
|
||||
price_attestations: attestations,
|
||||
};
|
||||
|
||||
trace!("Attestations successfully created");
|
||||
|
||||
let bridge_config = BridgeData::try_from_slice(&accs.wh_bridge.try_borrow_mut_data()?)?.config;
|
||||
|
@ -247,7 +253,7 @@ pub fn attest(ctx: &ExecutionContext, accs: &mut Attest, data: AttestData) -> So
|
|||
bridge::instruction::Instruction::PostMessage,
|
||||
PostMessageData {
|
||||
nonce: 0, // Superseded by the sequence number
|
||||
payload: batch_serialize(attestations.as_slice().iter()).map_err(|e| {
|
||||
payload: batch_attestation.serialize().map_err(|e| {
|
||||
trace!(e.to_string());
|
||||
ProgramError::InvalidAccountData
|
||||
})?,
|
||||
|
|
|
@ -3,7 +3,6 @@ pub mod attest;
|
|||
pub mod config;
|
||||
pub mod initialize;
|
||||
pub mod set_config;
|
||||
pub mod types;
|
||||
|
||||
use solitaire::solitaire;
|
||||
|
||||
|
|
|
@ -1,653 +0,0 @@
|
|||
//! Constants and values common to every p2w custom-serialized message.
|
||||
//!
|
||||
//! The format makes no attempt to provide human-readable symbol names
|
||||
//! in favor of explicit product/price Solana account addresses
|
||||
//! (IDs). This choice was made to disambiguate any symbols with
|
||||
//! similar human-readable names and provide a failsafe for some of
|
||||
//! the probable adversarial scenarios.
|
||||
|
||||
pub mod pyth_extensions;
|
||||
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
convert::{
|
||||
TryFrom,
|
||||
TryInto,
|
||||
},
|
||||
io::Read,
|
||||
iter::Iterator,
|
||||
mem,
|
||||
};
|
||||
|
||||
use borsh::BorshSerialize;
|
||||
use pyth_client::{
|
||||
AccountType,
|
||||
CorpAction,
|
||||
Ema,
|
||||
Price,
|
||||
PriceStatus,
|
||||
PriceType,
|
||||
};
|
||||
use solana_program::{
|
||||
clock::UnixTimestamp,
|
||||
program_error::ProgramError,
|
||||
pubkey::Pubkey,
|
||||
};
|
||||
use solitaire::{
|
||||
trace,
|
||||
ErrBox,
|
||||
Result as SoliResult,
|
||||
SolitaireError,
|
||||
};
|
||||
|
||||
use self::pyth_extensions::{
|
||||
P2WCorpAction,
|
||||
P2WEma,
|
||||
P2WPriceStatus,
|
||||
P2WPriceType,
|
||||
};
|
||||
|
||||
/// Precedes every message implementing the p2w serialization format
|
||||
pub const P2W_MAGIC: &'static [u8] = b"P2WH";
|
||||
|
||||
/// Format version used and understood by this codebase
|
||||
pub const P2W_FORMAT_VERSION: u16 = 2;
|
||||
|
||||
pub const PUBKEY_LEN: usize = 32;
|
||||
|
||||
/// Decides the format of following bytes
|
||||
#[repr(u8)]
|
||||
pub enum PayloadId {
|
||||
PriceAttestation = 1, // Not in use, currently batch attestations imply PriceAttestation messages inside
|
||||
PriceBatchAttestation,
|
||||
}
|
||||
|
||||
// On-chain data types
|
||||
|
||||
/// The main attestation data type.
|
||||
///
|
||||
/// Important: For maximum security, *both* product_id and price_id
|
||||
/// should be used as storage keys for known attestations in target
|
||||
/// chain logic.
|
||||
#[derive(Clone, Default, Debug, Eq, PartialEq)]
|
||||
#[cfg_attr(
|
||||
feature = "wasm",
|
||||
derive(serde_derive::Serialize, serde_derive::Deserialize)
|
||||
)]
|
||||
pub struct PriceAttestation {
|
||||
pub product_id: Pubkey,
|
||||
pub price_id: Pubkey,
|
||||
pub price_type: P2WPriceType,
|
||||
pub price: i64,
|
||||
pub expo: i32,
|
||||
pub twap: P2WEma,
|
||||
pub twac: P2WEma,
|
||||
pub confidence_interval: u64,
|
||||
pub status: P2WPriceStatus,
|
||||
pub corp_act: P2WCorpAction,
|
||||
pub timestamp: UnixTimestamp,
|
||||
}
|
||||
|
||||
/// Turn a bunch of attestations into a combined payload.
|
||||
///
|
||||
/// Batches assume constant-size attestations within a single batch.
|
||||
pub fn batch_serialize(
|
||||
attestations: impl Iterator<Item = impl Borrow<PriceAttestation>>,
|
||||
) -> Result<Vec<u8>, ErrBox> {
|
||||
// magic
|
||||
let mut buf = P2W_MAGIC.to_vec();
|
||||
|
||||
// version
|
||||
buf.extend_from_slice(&P2W_FORMAT_VERSION.to_be_bytes()[..]);
|
||||
|
||||
// payload_id
|
||||
buf.push(PayloadId::PriceBatchAttestation as u8);
|
||||
|
||||
let collected: Vec<_> = attestations.collect();
|
||||
|
||||
// n_attestations
|
||||
buf.extend_from_slice(&(collected.len() as u16).to_be_bytes()[..]);
|
||||
|
||||
let mut attestation_size = 0; // Will be determined as we serialize attestations
|
||||
let mut serialized_attestations = Vec::with_capacity(collected.len());
|
||||
for (idx, a) in collected.iter().enumerate() {
|
||||
// Learn the current attestation's size
|
||||
let serialized = PriceAttestation::serialize(a.borrow());
|
||||
let a_len = serialized.len();
|
||||
|
||||
// Verify it's the same as the first one we saw for the batch, assign if we're first.
|
||||
if attestation_size > 0 {
|
||||
if a_len != attestation_size {
|
||||
return Err(format!(
|
||||
"attestation {} serializes to {} bytes, {} expected",
|
||||
idx + 1,
|
||||
a_len,
|
||||
attestation_size
|
||||
)
|
||||
.into());
|
||||
}
|
||||
} else {
|
||||
attestation_size = a_len;
|
||||
}
|
||||
|
||||
serialized_attestations.push(serialized);
|
||||
}
|
||||
|
||||
// attestation_size
|
||||
buf.extend_from_slice(&(attestation_size as u16).to_be_bytes()[..]);
|
||||
|
||||
for mut s in serialized_attestations.into_iter() {
|
||||
buf.append(&mut s)
|
||||
}
|
||||
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
/// Undo `batch_serialize`
|
||||
pub fn batch_deserialize(mut bytes: impl Read) -> Result<Vec<PriceAttestation>, ErrBox> {
|
||||
let mut magic_vec = vec![0u8; P2W_MAGIC.len()];
|
||||
bytes.read_exact(magic_vec.as_mut_slice())?;
|
||||
|
||||
if magic_vec.as_slice() != P2W_MAGIC {
|
||||
return Err(format!(
|
||||
"Invalid magic {:02X?}, expected {:02X?}",
|
||||
magic_vec, P2W_MAGIC,
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let mut version_vec = vec![0u8; mem::size_of_val(&P2W_FORMAT_VERSION)];
|
||||
bytes.read_exact(version_vec.as_mut_slice())?;
|
||||
let version = u16::from_be_bytes(version_vec.as_slice().try_into()?);
|
||||
|
||||
if version != P2W_FORMAT_VERSION {
|
||||
return Err(format!(
|
||||
"Unsupported format version {}, expected {}",
|
||||
version, P2W_FORMAT_VERSION
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let mut payload_id_vec = vec![0u8; mem::size_of::<PayloadId>()];
|
||||
bytes.read_exact(payload_id_vec.as_mut_slice())?;
|
||||
|
||||
if payload_id_vec[0] != PayloadId::PriceBatchAttestation as u8 {
|
||||
return Err(format!(
|
||||
"Invalid Payload ID {}, expected {}",
|
||||
payload_id_vec[0],
|
||||
PayloadId::PriceBatchAttestation as u8,
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let mut batch_len_vec = vec![0u8; 2];
|
||||
bytes.read_exact(batch_len_vec.as_mut_slice())?;
|
||||
let batch_len = u16::from_be_bytes(batch_len_vec.as_slice().try_into()?);
|
||||
|
||||
let mut attestation_size_vec = vec![0u8; 2];
|
||||
bytes.read_exact(attestation_size_vec.as_mut_slice())?;
|
||||
let attestation_size = u16::from_be_bytes(attestation_size_vec.as_slice().try_into()?);
|
||||
|
||||
let mut ret = Vec::with_capacity(batch_len as usize);
|
||||
|
||||
for i in 0..batch_len {
|
||||
let mut attestation_buf = vec![0u8; attestation_size as usize];
|
||||
bytes.read_exact(attestation_buf.as_mut_slice())?;
|
||||
|
||||
dbg!(&attestation_buf.len());
|
||||
|
||||
match PriceAttestation::deserialize(attestation_buf.as_slice()) {
|
||||
Ok(attestation) => ret.push(attestation),
|
||||
Err(e) => return Err(format!("PriceAttestation {}/{}: {}", i + 1, batch_len, e).into()),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
impl PriceAttestation {
|
||||
pub fn from_pyth_price_bytes(
|
||||
price_id: Pubkey,
|
||||
timestamp: UnixTimestamp,
|
||||
value: &[u8],
|
||||
) -> Result<Self, SolitaireError> {
|
||||
let price = parse_pyth_price(value)?;
|
||||
|
||||
Ok(PriceAttestation {
|
||||
product_id: Pubkey::new(&price.prod.val[..]),
|
||||
price_id,
|
||||
price_type: (&price.ptype).into(),
|
||||
price: price.agg.price,
|
||||
twap: (&price.twap).into(),
|
||||
twac: (&price.twac).into(),
|
||||
expo: price.expo,
|
||||
confidence_interval: price.agg.conf,
|
||||
status: (&price.agg.status).into(),
|
||||
corp_act: (&price.agg.corp_act).into(),
|
||||
timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
/// Serialize this attestation according to the Pyth-over-wormhole serialization format
|
||||
pub fn serialize(&self) -> Vec<u8> {
|
||||
// A nifty trick to get us yelled at if we forget to serialize a field
|
||||
#[deny(warnings)]
|
||||
let PriceAttestation {
|
||||
product_id,
|
||||
price_id,
|
||||
price_type,
|
||||
price,
|
||||
expo,
|
||||
twap,
|
||||
twac,
|
||||
confidence_interval,
|
||||
status,
|
||||
corp_act,
|
||||
timestamp,
|
||||
} = self;
|
||||
|
||||
// magic
|
||||
let mut buf = P2W_MAGIC.to_vec();
|
||||
|
||||
// version
|
||||
buf.extend_from_slice(&P2W_FORMAT_VERSION.to_be_bytes()[..]);
|
||||
|
||||
// payload_id
|
||||
buf.push(PayloadId::PriceAttestation as u8);
|
||||
|
||||
// product_id
|
||||
buf.extend_from_slice(&product_id.to_bytes()[..]);
|
||||
|
||||
// price_id
|
||||
buf.extend_from_slice(&price_id.to_bytes()[..]);
|
||||
|
||||
// price_type
|
||||
buf.push(price_type.clone() as u8);
|
||||
|
||||
// price
|
||||
buf.extend_from_slice(&price.to_be_bytes()[..]);
|
||||
|
||||
// exponent
|
||||
buf.extend_from_slice(&expo.to_be_bytes()[..]);
|
||||
|
||||
// twap
|
||||
buf.append(&mut twap.serialize());
|
||||
|
||||
// twac
|
||||
buf.append(&mut twac.serialize());
|
||||
|
||||
// confidence_interval
|
||||
buf.extend_from_slice(&confidence_interval.to_be_bytes()[..]);
|
||||
|
||||
// status
|
||||
buf.push(status.clone() as u8);
|
||||
|
||||
// corp_act
|
||||
buf.push(corp_act.clone() as u8);
|
||||
|
||||
// timestamp
|
||||
buf.extend_from_slice(×tamp.to_be_bytes()[..]);
|
||||
|
||||
buf
|
||||
}
|
||||
pub fn deserialize(mut bytes: impl Read) -> Result<Self, ErrBox> {
|
||||
use P2WCorpAction::*;
|
||||
use P2WPriceStatus::*;
|
||||
use P2WPriceType::*;
|
||||
|
||||
let mut magic_vec = vec![0u8; P2W_MAGIC.len()];
|
||||
|
||||
bytes.read_exact(magic_vec.as_mut_slice())?;
|
||||
|
||||
if magic_vec.as_slice() != P2W_MAGIC {
|
||||
return Err(format!(
|
||||
"Invalid magic {:02X?}, expected {:02X?}",
|
||||
magic_vec, P2W_MAGIC,
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let mut version_vec = vec![0u8; mem::size_of_val(&P2W_FORMAT_VERSION)];
|
||||
bytes.read_exact(version_vec.as_mut_slice())?;
|
||||
let version = u16::from_be_bytes(version_vec.as_slice().try_into()?);
|
||||
|
||||
if version != P2W_FORMAT_VERSION {
|
||||
return Err(format!(
|
||||
"Unsupported format version {}, expected {}",
|
||||
version, P2W_FORMAT_VERSION
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let mut payload_id_vec = vec![0u8; mem::size_of::<PayloadId>()];
|
||||
bytes.read_exact(payload_id_vec.as_mut_slice())?;
|
||||
|
||||
if PayloadId::PriceAttestation as u8 != payload_id_vec[0] {
|
||||
return Err(format!(
|
||||
"Invalid Payload ID {}, expected {}",
|
||||
payload_id_vec[0],
|
||||
PayloadId::PriceAttestation as u8,
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let mut product_id_vec = vec![0u8; PUBKEY_LEN];
|
||||
bytes.read_exact(product_id_vec.as_mut_slice())?;
|
||||
let product_id = Pubkey::new(product_id_vec.as_slice());
|
||||
|
||||
let mut price_id_vec = vec![0u8; PUBKEY_LEN];
|
||||
bytes.read_exact(price_id_vec.as_mut_slice())?;
|
||||
let price_id = Pubkey::new(price_id_vec.as_slice());
|
||||
|
||||
let mut price_type_vec = vec![0u8; mem::size_of::<P2WPriceType>()];
|
||||
bytes.read_exact(price_type_vec.as_mut_slice())?;
|
||||
let price_type = match price_type_vec[0] {
|
||||
a if a == Price as u8 => Price,
|
||||
a if a == P2WPriceType::Unknown as u8 => P2WPriceType::Unknown,
|
||||
other => {
|
||||
return Err(format!("Invalid price_type value {}", other).into());
|
||||
}
|
||||
};
|
||||
|
||||
let mut price_vec = vec![0u8; mem::size_of::<i64>()];
|
||||
bytes.read_exact(price_vec.as_mut_slice())?;
|
||||
let price = i64::from_be_bytes(price_vec.as_slice().try_into()?);
|
||||
|
||||
let mut expo_vec = vec![0u8; mem::size_of::<i32>()];
|
||||
bytes.read_exact(expo_vec.as_mut_slice())?;
|
||||
let expo = i32::from_be_bytes(expo_vec.as_slice().try_into()?);
|
||||
|
||||
let twap = P2WEma::deserialize(&mut bytes)?;
|
||||
let twac = P2WEma::deserialize(&mut bytes)?;
|
||||
|
||||
println!("twac OK");
|
||||
let mut confidence_interval_vec = vec![0u8; mem::size_of::<u64>()];
|
||||
bytes.read_exact(confidence_interval_vec.as_mut_slice())?;
|
||||
let confidence_interval =
|
||||
u64::from_be_bytes(confidence_interval_vec.as_slice().try_into()?);
|
||||
|
||||
let mut status_vec = vec![0u8; mem::size_of::<P2WPriceType>()];
|
||||
bytes.read_exact(status_vec.as_mut_slice())?;
|
||||
let status = match status_vec[0] {
|
||||
a if a == P2WPriceStatus::Unknown as u8 => P2WPriceStatus::Unknown,
|
||||
a if a == Trading as u8 => Trading,
|
||||
a if a == Halted as u8 => Halted,
|
||||
a if a == Auction as u8 => Auction,
|
||||
other => {
|
||||
return Err(format!("Invalid status value {}", other).into());
|
||||
}
|
||||
};
|
||||
|
||||
let mut corp_act_vec = vec![0u8; mem::size_of::<P2WPriceType>()];
|
||||
bytes.read_exact(corp_act_vec.as_mut_slice())?;
|
||||
let corp_act = match corp_act_vec[0] {
|
||||
a if a == NoCorpAct as u8 => NoCorpAct,
|
||||
other => {
|
||||
return Err(format!("Invalid corp_act value {}", other).into());
|
||||
}
|
||||
};
|
||||
|
||||
let mut timestamp_vec = vec![0u8; mem::size_of::<UnixTimestamp>()];
|
||||
bytes.read_exact(timestamp_vec.as_mut_slice())?;
|
||||
let timestamp = UnixTimestamp::from_be_bytes(timestamp_vec.as_slice().try_into()?);
|
||||
|
||||
Ok(Self {
|
||||
product_id,
|
||||
price_id,
|
||||
price_type,
|
||||
price,
|
||||
expo,
|
||||
twap,
|
||||
twac,
|
||||
confidence_interval,
|
||||
status,
|
||||
corp_act,
|
||||
timestamp,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Deserializes Price from raw bytes, sanity-check.
|
||||
fn parse_pyth_price(price_data: &[u8]) -> SoliResult<&Price> {
|
||||
if price_data.len() != mem::size_of::<Price>() {
|
||||
trace!(&format!(
|
||||
"parse_pyth_price: buffer length mismatch ({} expected, got {})",
|
||||
mem::size_of::<Price>(),
|
||||
price_data.len()
|
||||
));
|
||||
return Err(ProgramError::InvalidAccountData.into());
|
||||
}
|
||||
let price_account = pyth_client::cast::<Price>(price_data);
|
||||
|
||||
if price_account.atype != AccountType::Price as u32 {
|
||||
trace!(&format!(
|
||||
"parse_pyth_price: AccountType mismatch ({} expected, got {})",
|
||||
mem::size_of::<Price>(),
|
||||
price_data.len()
|
||||
));
|
||||
return Err(ProgramError::InvalidAccountData.into());
|
||||
}
|
||||
|
||||
Ok(price_account)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pyth_client::{
|
||||
AccKey,
|
||||
AccountType,
|
||||
PriceComp,
|
||||
PriceInfo,
|
||||
};
|
||||
|
||||
macro_rules! empty_acckey {
|
||||
() => {
|
||||
AccKey { val: [0u8; 32] }
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! empty_priceinfo {
|
||||
() => {
|
||||
PriceInfo {
|
||||
price: 0,
|
||||
conf: 0,
|
||||
status: PriceStatus::Unknown,
|
||||
corp_act: CorpAction::NoCorpAct,
|
||||
pub_slot: 0,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! empty_pricecomp {
|
||||
() => {
|
||||
PriceComp {
|
||||
publisher: empty_acckey!(),
|
||||
agg: empty_priceinfo!(),
|
||||
latest: empty_priceinfo!(),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! empty_ema {
|
||||
() => {
|
||||
(&P2WEma::default()).into()
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! empty_price {
|
||||
() => {
|
||||
Price {
|
||||
magic: pyth_client::MAGIC,
|
||||
ver: pyth_client::VERSION,
|
||||
atype: AccountType::Price as u32,
|
||||
size: 0,
|
||||
ptype: PriceType::Price,
|
||||
expo: 0,
|
||||
num: 0,
|
||||
num_qt: 0,
|
||||
last_slot: 0,
|
||||
valid_slot: 0,
|
||||
drv1: 0,
|
||||
drv2: 0,
|
||||
drv3: 0,
|
||||
twap: empty_ema!(),
|
||||
twac: empty_ema!(),
|
||||
prod: empty_acckey!(),
|
||||
next: empty_acckey!(),
|
||||
prev_slot: 0, // valid slot of previous update
|
||||
prev_price: 0, // aggregate price of previous update
|
||||
prev_conf: 0, // confidence interval of previous update
|
||||
agg: empty_priceinfo!(),
|
||||
// A nice macro might come in handy if this gets annoying
|
||||
comp: [
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
empty_pricecomp!(),
|
||||
],
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn mock_attestation(prod: Option<[u8; 32]>, price: Option<[u8; 32]>) -> PriceAttestation {
|
||||
let product_id_bytes = prod.unwrap_or([21u8; 32]);
|
||||
let price_id_bytes = prod.unwrap_or([222u8; 32]);
|
||||
PriceAttestation {
|
||||
product_id: Pubkey::new_from_array(product_id_bytes),
|
||||
price_id: Pubkey::new_from_array(price_id_bytes),
|
||||
price: (0xdeadbeefdeadbabe as u64) as i64,
|
||||
price_type: P2WPriceType::Price,
|
||||
twap: P2WEma {
|
||||
val: -42,
|
||||
numer: 15,
|
||||
denom: 37,
|
||||
},
|
||||
twac: P2WEma {
|
||||
val: 42,
|
||||
numer: 1111,
|
||||
denom: 2222,
|
||||
},
|
||||
expo: -3,
|
||||
status: P2WPriceStatus::Trading,
|
||||
confidence_interval: 101,
|
||||
corp_act: P2WCorpAction::NoCorpAct,
|
||||
timestamp: 123456789i64,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_pyth_price_wrong_size_slices() {
|
||||
assert!(parse_pyth_price(&[]).is_err());
|
||||
assert!(parse_pyth_price(vec![0u8; 1].as_slice()).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_pyth_price() -> SoliResult<()> {
|
||||
let price = Price {
|
||||
expo: 5,
|
||||
agg: PriceInfo {
|
||||
price: 42,
|
||||
..empty_priceinfo!()
|
||||
},
|
||||
..empty_price!()
|
||||
};
|
||||
let price_vec = vec![price];
|
||||
|
||||
// use the C repr to mock pyth's format
|
||||
let (_, bytes, _) = unsafe { price_vec.as_slice().align_to::<u8>() };
|
||||
|
||||
parse_pyth_price(bytes)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_attestation_serde() -> Result<(), ErrBox> {
|
||||
let product_id_bytes = [21u8; 32];
|
||||
let price_id_bytes = [222u8; 32];
|
||||
let attestation: PriceAttestation =
|
||||
mock_attestation(Some(product_id_bytes), Some(price_id_bytes));
|
||||
|
||||
println!("Hex product_id: {:02X?}", &product_id_bytes);
|
||||
println!("Hex price_id: {:02X?}", &price_id_bytes);
|
||||
|
||||
println!("Regular: {:#?}", &attestation);
|
||||
println!("Hex: {:#02X?}", &attestation);
|
||||
let bytes = attestation.serialize();
|
||||
println!("Hex Bytes: {:02X?}", bytes);
|
||||
|
||||
assert_eq!(
|
||||
PriceAttestation::deserialize(bytes.as_slice())?,
|
||||
attestation
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_attestation_serde_wrong_size() -> Result<(), ErrBox> {
|
||||
assert!(PriceAttestation::deserialize(&[][..]).is_err());
|
||||
assert!(PriceAttestation::deserialize(vec![0u8; 1].as_slice()).is_err());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_batch_serde() -> Result<(), ErrBox> {
|
||||
let attestations: Vec<_> = (0..65535)
|
||||
.map(|i| mock_attestation(Some([(i % 256) as u8; 32]), None))
|
||||
.collect();
|
||||
|
||||
let serialized = batch_serialize(attestations.iter())?;
|
||||
|
||||
let deserialized = batch_deserialize(serialized.as_slice())?;
|
||||
|
||||
assert_eq!(attestations, deserialized);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_batch_serde_wrong_size() -> Result<(), ErrBox> {
|
||||
assert!(batch_deserialize(&[][..]).is_err());
|
||||
assert!(batch_deserialize(vec![0u8; 1].as_slice()).is_err());
|
||||
|
||||
let attestations: Vec<_> = (0..20)
|
||||
.map(|i| mock_attestation(Some([(i % 256) as u8; 32]), None))
|
||||
.collect();
|
||||
|
||||
let serialized = batch_serialize(attestations.iter())?;
|
||||
|
||||
// Missing last byte in last attestation must be an error
|
||||
let len = serialized.len();
|
||||
assert!(batch_deserialize(&serialized.as_slice()[..len - 1]).is_err());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -1,176 +0,0 @@
|
|||
//! This module contains 1:1 (or close) copies of selected Pyth types
|
||||
//! with quick and dirty enhancements.
|
||||
|
||||
use std::{
|
||||
convert::TryInto,
|
||||
io::Read,
|
||||
mem,
|
||||
};
|
||||
|
||||
use pyth_client::{
|
||||
CorpAction,
|
||||
Ema,
|
||||
PriceStatus,
|
||||
PriceType,
|
||||
};
|
||||
use solitaire::ErrBox;
|
||||
|
||||
/// 1:1 Copy of pyth_client::PriceType with derived additional traits.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
#[cfg_attr(
|
||||
feature = "wasm",
|
||||
derive(serde_derive::Serialize, serde_derive::Deserialize)
|
||||
)]
|
||||
#[repr(u8)]
|
||||
pub enum P2WPriceType {
|
||||
Unknown,
|
||||
Price,
|
||||
}
|
||||
|
||||
impl From<&PriceType> for P2WPriceType {
|
||||
fn from(pt: &PriceType) -> Self {
|
||||
match pt {
|
||||
PriceType::Unknown => Self::Unknown,
|
||||
PriceType::Price => Self::Price,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for P2WPriceType {
|
||||
fn default() -> Self {
|
||||
Self::Price
|
||||
}
|
||||
}
|
||||
|
||||
/// 1:1 Copy of pyth_client::PriceStatus with derived additional traits.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
#[cfg_attr(
|
||||
feature = "wasm",
|
||||
derive(serde_derive::Serialize, serde_derive::Deserialize)
|
||||
)]
|
||||
pub enum P2WPriceStatus {
|
||||
Unknown,
|
||||
Trading,
|
||||
Halted,
|
||||
Auction,
|
||||
}
|
||||
|
||||
impl From<&PriceStatus> for P2WPriceStatus {
|
||||
fn from(ps: &PriceStatus) -> Self {
|
||||
match ps {
|
||||
PriceStatus::Unknown => Self::Unknown,
|
||||
PriceStatus::Trading => Self::Trading,
|
||||
PriceStatus::Halted => Self::Halted,
|
||||
PriceStatus::Auction => Self::Auction,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for P2WPriceStatus {
|
||||
fn default() -> Self {
|
||||
Self::Trading
|
||||
}
|
||||
}
|
||||
|
||||
/// 1:1 Copy of pyth_client::CorpAction with derived additional traits.
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
#[cfg_attr(
|
||||
feature = "wasm",
|
||||
derive(serde_derive::Serialize, serde_derive::Deserialize)
|
||||
)]
|
||||
pub enum P2WCorpAction {
|
||||
NoCorpAct,
|
||||
}
|
||||
|
||||
impl Default for P2WCorpAction {
|
||||
fn default() -> Self {
|
||||
Self::NoCorpAct
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&CorpAction> for P2WCorpAction {
|
||||
fn from(ca: &CorpAction) -> Self {
|
||||
match ca {
|
||||
CorpAction::NoCorpAct => P2WCorpAction::NoCorpAct,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// 1:1 Copy of pyth_client::Ema with all-pub fields.
|
||||
#[derive(Clone, Default, Debug, Eq, PartialEq)]
|
||||
#[cfg_attr(
|
||||
feature = "wasm",
|
||||
derive(serde_derive::Serialize, serde_derive::Deserialize)
|
||||
)]
|
||||
#[repr(C)]
|
||||
pub struct P2WEma {
|
||||
pub val: i64,
|
||||
pub numer: i64,
|
||||
pub denom: i64,
|
||||
}
|
||||
|
||||
/// CAUTION: This impl may panic and requires an unsafe cast
|
||||
impl From<&Ema> for P2WEma {
|
||||
fn from(ema: &Ema) -> Self {
|
||||
let our_size = mem::size_of::<P2WEma>();
|
||||
let upstream_size = mem::size_of::<Ema>();
|
||||
if our_size == upstream_size {
|
||||
unsafe { std::mem::transmute_copy(ema) }
|
||||
} else {
|
||||
dbg!(our_size);
|
||||
dbg!(upstream_size);
|
||||
// Because of private upstream fields it's impossible to
|
||||
// complain about type-level changes at compile-time
|
||||
panic!("P2WEma sizeof mismatch")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// CAUTION: This impl may panic and requires an unsafe cast
|
||||
impl Into<Ema> for &P2WEma {
|
||||
fn into(self) -> Ema {
|
||||
let our_size = mem::size_of::<P2WEma>();
|
||||
let upstream_size = mem::size_of::<Ema>();
|
||||
if our_size == upstream_size {
|
||||
unsafe { std::mem::transmute_copy(self) }
|
||||
} else {
|
||||
dbg!(our_size);
|
||||
dbg!(upstream_size);
|
||||
// Because of private upstream fields it's impossible to
|
||||
// complain about type-level changes at compile-time
|
||||
panic!("P2WEma sizeof mismatch")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl P2WEma {
|
||||
pub fn serialize(&self) -> Vec<u8> {
|
||||
let mut v = vec![];
|
||||
// val
|
||||
v.extend(&self.val.to_be_bytes()[..]);
|
||||
|
||||
// numer
|
||||
v.extend(&self.numer.to_be_bytes()[..]);
|
||||
|
||||
// denom
|
||||
v.extend(&self.denom.to_be_bytes()[..]);
|
||||
|
||||
v
|
||||
}
|
||||
|
||||
pub fn deserialize(mut bytes: impl Read) -> Result<Self, ErrBox> {
|
||||
let mut val_vec = vec![0u8; mem::size_of::<i64>()];
|
||||
bytes.read_exact(val_vec.as_mut_slice())?;
|
||||
let val = i64::from_be_bytes(val_vec.as_slice().try_into()?);
|
||||
|
||||
let mut numer_vec = vec![0u8; mem::size_of::<i64>()];
|
||||
bytes.read_exact(numer_vec.as_mut_slice())?;
|
||||
let numer = i64::from_be_bytes(numer_vec.as_slice().try_into()?);
|
||||
|
||||
let mut denom_vec = vec![0u8; mem::size_of::<i64>()];
|
||||
bytes.read_exact(denom_vec.as_mut_slice())?;
|
||||
let denom = i64::from_be_bytes(denom_vec.as_slice().try_into()?);
|
||||
|
||||
Ok(Self { val, numer, denom })
|
||||
}
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
use solana_program::pubkey::Pubkey;
|
||||
use solitaire::Seeded;
|
||||
use wasm_bindgen::prelude::*;
|
||||
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::{
|
||||
attest::P2WEmitter,
|
||||
types,
|
||||
};
|
||||
|
||||
#[wasm_bindgen]
|
||||
pub fn get_emitter_address(program_id: String) -> Vec<u8> {
|
||||
let program_id = Pubkey::from_str(program_id.as_str()).unwrap();
|
||||
let emitter = P2WEmitter::key(None, &program_id);
|
||||
|
||||
emitter.to_bytes().to_vec()
|
||||
}
|
||||
|
||||
#[wasm_bindgen]
|
||||
pub fn parse_attestation(bytes: Vec<u8>) -> JsValue {
|
||||
let a = types::PriceAttestation::deserialize(bytes.as_slice()).unwrap();
|
||||
|
||||
JsValue::from_serde(&a).unwrap()
|
||||
}
|
||||
|
||||
#[wasm_bindgen]
|
||||
pub fn parse_batch_attestation(bytes: Vec<u8>) -> JsValue {
|
||||
let a = types::batch_deserialize(bytes.as_slice()).unwrap();
|
||||
|
||||
JsValue::from_serde(&a).unwrap()
|
||||
}
|
|
@ -4,6 +4,7 @@ RUN apt-get install -y python3
|
|||
|
||||
ADD third_party/pyth/pyth_utils.py /usr/src/pyth/pyth_utils.py
|
||||
ADD third_party/pyth/p2w_autoattest.py /usr/src/pyth/p2w_autoattest.py
|
||||
ADD third_party/pyth/p2w-sdk/rust /usr/src/third_party/pyth/p2w-sdk/rust
|
||||
|
||||
RUN --mount=type=cache,target=/root/.cache \
|
||||
--mount=type=cache,target=target \
|
||||
|
|
|
@ -146,7 +146,7 @@ async function readinessProbeRoutine(port: number) {
|
|||
|
||||
let parsedAttestations = await parseBatchAttestation(parsedVaa.payload);
|
||||
|
||||
console.log(`[seqno ${poolEntry}] Parsed ${parsedAttestations.length} price attestations:\n`, parsedAttestations);
|
||||
console.log(`[seqno ${poolEntry}] Parsed ${parsedAttestations.price_attestations.length} price attestations:\n`, parsedAttestations);
|
||||
|
||||
// try {
|
||||
// let tx = await p2w_eth.attestPrice(vaaResponse.vaaBytes, {gasLimit: 1000000});
|
||||
|
|
|
@ -8,7 +8,7 @@ var P2W_INSTANCE: any = undefined;
|
|||
export async function p2w_core(): Promise<any> {
|
||||
// Only import once if P2W wasm is needed
|
||||
if (!P2W_INSTANCE) {
|
||||
P2W_INSTANCE = await import("./solana/p2w-core/pyth2wormhole");
|
||||
P2W_INSTANCE = await import("./solana/p2w-core/p2w_sdk");
|
||||
}
|
||||
return P2W_INSTANCE;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
# DevNet:
|
||||
SPY_SERVICE_HOST=0.0.0.0:7072
|
||||
SPY_SERVICE_FILTERS=[{"chain_id":1,"emitter_address":"71f8dcb863d176e2c420ad6610cf687359612b6fb392e0642b0ca6b1f186aa3b"}]
|
||||
TERRA_NODE_URL=http://localhost:1317
|
||||
TERRA_PRIVATE_KEY=notice oak worry limit wrap speak medal online prefer cluster roof addict wrist behave treat actual wasp year salad speed social layer crew genius
|
||||
TERRA_PYTH_CONTRACT_ADDRESS=terra1wgh6adn8geywx0v78zs9azrqtqdegufuegnwep
|
||||
TERRA_CHAIN_ID=columbus-5
|
||||
TERRA_NAME=localterra
|
||||
TERRA_COIN=uluna
|
||||
|
||||
# TestNet:
|
||||
#SPY_SERVICE_HOST=0.0.0.0:7073
|
||||
#SPY_SERVICE_FILTERS=[{"chain_id":1,"emitter_address":"3afda841c1f43dd7d546c8a581ba1f92a139f4133f9f6ab095558f6a359df5d4"}]
|
||||
#TERRA_NODE_URL=https://bombay-lcd.terra.dev
|
||||
#TERRA_PRIVATE_KEY=your key here
|
||||
#TERRA_PYTH_CONTRACT_ADDRESS=terra1wjkzgcrg3a2jh2cyc5lekvtjydf600splmvdk4
|
||||
#TERRA_CHAIN_ID=bombay-12
|
||||
#TERRA_NAME=testnet
|
||||
#TERRA_COIN=uluna
|
||||
|
||||
# MainNet:
|
||||
#SPY_SERVICE_HOST=0.0.0.0:7074
|
||||
#SPY_SERVICE_FILTERS=[{"chain_id":1,"emitter_address":"b2dd468c9b8c80b3dd9211e9e3fd6ee4d652eb5997b7c9020feae971c278ab07"}]
|
||||
#TERRA_NODE_URL=https://lcd.terra.dev
|
||||
#TERRA_PRIVATE_KEY=your key here
|
||||
#TERRA_PYTH_CONTRACT_ADDRESS=fill_this_in
|
||||
#TERRA_CHAIN_ID=columbus-5
|
||||
#TERRA_NAME=mainnet
|
||||
#TERRA_COIN=uluna
|
||||
|
||||
REST_PORT=4200
|
||||
PROM_PORT=8081
|
||||
BAL_QUERY_INTERVAL=60000
|
||||
#READINESS_PORT=2000
|
||||
|
||||
RETRY_MAX_ATTEMPTS=4
|
||||
RETRY_DELAY_IN_MS=250
|
||||
MAX_MSGS_PER_BATCH=1
|
||||
|
||||
# The default is to log the console with level info.
|
||||
LOG_DIR=/var/pyth_relay/logs
|
||||
#LOG_LEVEL=debug
|
|
@ -0,0 +1 @@
|
|||
/lib
|
|
@ -0,0 +1,95 @@
|
|||
# Overview
|
||||
|
||||
The pyth_relay program is designed to listen to Pyth messages published on Solana and relay them to other chains.
|
||||
Although in its initial release, the only supported destination chain is Terra, the design supports publishing to multiple chains.
|
||||
|
||||
<p>
|
||||
The relayer listens to the spy_guardian for signed VAA messages. It can be configured to only request specific emitters, so that only Pyth messages get forwarded.
|
||||
<p>
|
||||
When the relayer receives messages from the spy, it drops redundant messages based on sequence numbers, verifies the message is a Pyth message, and relays the pyth
|
||||
messages to Terra.
|
||||
|
||||
# Operational Details
|
||||
|
||||
The relayer can be run as a docker image. Additionally, you need to have an instance of the spy guardian running, which can be started using a docker image.
|
||||
|
||||
<p>
|
||||
The relayer is configured using an env file, as specified by the PYTH_RELAY_CONFIG environment variable. Please see the env.samples file in the source directory for
|
||||
valid variables.
|
||||
<p>
|
||||
The relayer can be configured to log to a file in the directory specified by the LOG_DIR environment variable. If the variable is not specified, it logs to the console.
|
||||
<p>
|
||||
The log level can be controlled by the LOG_LEVEL environment variable, where info is the default. The valid values are debug, info, warn, and error.
|
||||
|
||||
# External Dependencies
|
||||
|
||||
The relayer connects to Terra, so it therefore has the following dependencies
|
||||
|
||||
1. A Pyth to Wormhole publisher
|
||||
2. A highly reliable connection to a local Terra node via Wormhole
|
||||
3. A unique Terra Wallet per instance of pyth_relayer
|
||||
4. A Wormhole spy guardian process running that the pyth_relayer can subscribe to for Pyth messages
|
||||
|
||||
Note that for performance reasons, pyth_relayer manages the Terra wallet sequence number locally. If it does not do so, it will get wallet sequence number errors if it publishes faster than the Terra node can handle it. For this to work, the relayer should be connected to a local Terra node, to minimize the possible paths the published message could take, and maintain sequence number ordering.
|
||||
|
||||
# High Availability
|
||||
|
||||
If high availability is a goal, then two completely seperate instances of pyth_relay should be run. They should run on completely separate hardware, using separate Terra connections and wallets. Additionally, they should connect to separate instances of the spy_guardian. They will both be publishing messages to the Pyth Terra contract, which will simply drop the duplicates.
|
||||
|
||||
# Design Details
|
||||
|
||||
The relayer code is divided into separate source files, based on functionality. The main entry point is index.ts. It invokes code in the other files as follows.
|
||||
|
||||
## listener.ts
|
||||
|
||||
The listener code parses the emitter filter parameter, which may consist of none, one or more chain / emitter pairs. If any filters are specified, then only VAAs from those emitters will be processed. The listener then registers those emitters with the spy guardian via RPC callback.
|
||||
|
||||
<p>
|
||||
When the listener receives a VAA from the spy, it verifies that it has not already been seen, based on the sequence number. This is necessary since there are multiple guardians signing and publishing the same VAAs. It then validates that it is a Pyth message. All Pyth payloads start with P2WH. If so, it invokes the postEvent method on the worker to forward the VAA for publishing.
|
||||
|
||||
## worker.ts
|
||||
|
||||
The worker code is responsible for taking VAAs to be published from the listener and passing them to the relay code for relaying to Terra.
|
||||
|
||||
<p>
|
||||
The worker uses a map of pending events, and a condition variable to signal that there are events waiting to be published, and a map of the latest state of each Pyth price.
|
||||
The worker protects all of these objects with a mutex.
|
||||
<p>
|
||||
The worker maintains performance metrics to be published by the Prometeus interface.
|
||||
<p>
|
||||
The worker also provides methods to query the status of the wallet being used for relaying, the current status of all maintained prices, and can query Terra for the current
|
||||
data for a given price. These are used by the REST interface, if it is enabled in the config.
|
||||
<p>
|
||||
In most cases, if a Terra transaction fails, the worker will retry up to a configurable number of times, with a configurable delay between each time. For each successive retry of a given message, they delay is increased by the retry attempt number (delay * attempt).
|
||||
|
||||
## main.ts and terra.ts
|
||||
|
||||
This is the code that actually communicates with the Terra block chain. It takes configuration data from the env file, and provides methods to relay a Pyth message, query the wallet balance, and query the current data for a given price.
|
||||
|
||||
## promHelper.ts
|
||||
|
||||
Prometheus is being used as a framework for storing metrics. Currently, the following metrics are being collected:
|
||||
|
||||
- The last sequence number sent
|
||||
- The total number of successful relays
|
||||
- The total number of failed relays
|
||||
- A histogram of transfer times
|
||||
- The current wallet balance
|
||||
- The total number of VAAs received by the listener
|
||||
- The total number of VAAs already executed on Terra
|
||||
- The total number of Terra transaction timeouts
|
||||
- The total number of Terra sequence number errors
|
||||
- The total number of Terra retry attempts
|
||||
- The total number of retry limit exceeded errors
|
||||
- The total number of transactions failed due to insufficient funds
|
||||
|
||||
All the above metrics can be viewed at http://localhost:8081/metrics
|
||||
|
||||
<p>
|
||||
The port 8081 is the default. The port can be specified by the `PROM_PORT` tunable in the env file.
|
||||
<p>
|
||||
This file contains a class named `PromHelper`. It is an encapsulation of the Prometheus API.
|
||||
|
||||
## helpers.ts
|
||||
|
||||
This contains an assortment of helper functions and objects used by the other code, including logger initialization and parsing of Pyth messages.
|
|
@ -0,0 +1,15 @@
|
|||
FROM node:16-alpine
|
||||
|
||||
WORKDIR /app/pyth_relay
|
||||
COPY . .
|
||||
RUN npm install && npm run build && npm cache clean --force
|
||||
|
||||
# If you are building for production
|
||||
# RUN npm ci --only=production
|
||||
|
||||
RUN mkdir -p /app/pyth_relay/logs
|
||||
RUN addgroup -S pyth -g 10001 && adduser -S pyth -G pyth -u 10001
|
||||
RUN chown -R pyth:pyth src/ logs/ lib/
|
||||
USER pyth
|
||||
|
||||
CMD [ "node", "lib/index.js" ]
|
|
@ -0,0 +1,30 @@
|
|||
FROM docker.io/golang:1.17.0-alpine as builder
|
||||
|
||||
RUN apk add --no-cache git gcc linux-headers alpine-sdk bash
|
||||
|
||||
WORKDIR /app
|
||||
RUN git clone https://github.com/certusone/wormhole.git
|
||||
|
||||
WORKDIR /app/wormhole/tools
|
||||
RUN CGO_ENABLED=0 ./build.sh
|
||||
|
||||
WORKDIR /app/wormhole
|
||||
RUN tools/bin/buf lint && tools/bin/buf generate
|
||||
|
||||
WORKDIR /app/wormhole/node/tools
|
||||
RUN go build -mod=readonly -o /dlv github.com/go-delve/delve/cmd/dlv
|
||||
|
||||
WORKDIR /app/wormhole/node
|
||||
RUN go build -race -gcflags="all=-N -l" -mod=readonly -o /guardiand github.com/certusone/wormhole/node
|
||||
|
||||
FROM docker.io/golang:1.17.0-alpine
|
||||
|
||||
WORKDIR /app
|
||||
COPY --from=builder /guardiand /app/guardiand
|
||||
|
||||
ENV PATH="/app:${PATH}"
|
||||
RUN addgroup -S pyth -g 10001 && adduser -S pyth -G pyth -u 10001
|
||||
RUN chown -R pyth:pyth .
|
||||
USER pyth
|
||||
|
||||
ENTRYPOINT [ "guardiand", "spy", "--nodeKey", "/tmp/node.key" ]
|
|
@ -0,0 +1,46 @@
|
|||
# Setup Spy Guardian and Pyth Relay
|
||||
|
||||
To build the spy_guardian docker container:
|
||||
|
||||
```
|
||||
$ docker build -f Dockerfile.spy_guardian -t spy_guardian .
|
||||
```
|
||||
|
||||
To build the pyth_relay docker container:
|
||||
|
||||
```
|
||||
$ docker build -f Dockerfile.pyth_relay -t pyth_relay .
|
||||
```
|
||||
|
||||
Run the spy_guardian docker container in TestNet:
|
||||
|
||||
```
|
||||
$ docker run --platform linux/amd64 -d --network=host spy_guardian \
|
||||
--bootstrap /dns4/wormhole-testnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWBY9ty9CXLBXGQzMuqkziLntsVcyz4pk1zWaJRvJn6Mmt \
|
||||
--network /wormhole/testnet/2/1 \
|
||||
--spyRPC "[::]:7073"
|
||||
```
|
||||
|
||||
Or run the spy_guardian docker container in MainNet:
|
||||
For the MainNet gossip network parameters, see https://github.com/certusone/wormhole-networks/blob/master/mainnetv2/info.md
|
||||
|
||||
```
|
||||
$ docker run --platform linux/amd64 -d --network=host spy_guardian \
|
||||
--bootstrap <guardianNetworkBootstrapParameterForMainNet> \
|
||||
--network <guardianNetworkPathForMainNet> \
|
||||
--spyRPC "[::]:7073"
|
||||
|
||||
```
|
||||
|
||||
Then to run the pyth_relay docker container using a config file called
|
||||
${HOME}/pyth_relay/env and logging to directory ${HOME}/pyth_relay/logs, do the
|
||||
following:
|
||||
|
||||
```
|
||||
$ docker run \
|
||||
--volume=${HOME}/pyth_relay:/var/pyth_relay \
|
||||
-e PYTH_RELAY_CONFIG=/var/pyth_relay/env \
|
||||
--network=host \
|
||||
-d \
|
||||
pyth_relay
|
||||
```
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,49 @@
|
|||
{
|
||||
"name": "pyth_relay",
|
||||
"version": "1.0.0",
|
||||
"description": "Pyth relayer",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
"build": "tsc",
|
||||
"start": "node lib/index.js",
|
||||
"listen_only": "node lib/index.js --listen_only"
|
||||
},
|
||||
"author": "",
|
||||
"license": "Apache-2.0",
|
||||
"devDependencies": {
|
||||
"@improbable-eng/grpc-web-node-http-transport": "^0.15.0",
|
||||
"@types/jest": "^27.0.2",
|
||||
"@types/long": "^4.0.1",
|
||||
"@types/node": "^16.6.1",
|
||||
"axios": "^0.24.0",
|
||||
"esm": "^3.2.25",
|
||||
"ethers": "^5.4.4",
|
||||
"jest": "^27.3.1",
|
||||
"prettier": "^2.3.2",
|
||||
"ts-jest": "^27.0.7",
|
||||
"tslint": "^6.1.3",
|
||||
"tslint-config-prettier": "^1.18.0",
|
||||
"typescript": "^4.3.5"
|
||||
},
|
||||
"dependencies": {
|
||||
"@certusone/wormhole-sdk": "^0.1.4",
|
||||
"@certusone/wormhole-spydk": "^0.0.1",
|
||||
"@solana/spl-token": "^0.1.8",
|
||||
"@solana/web3.js": "^1.24.0",
|
||||
"@terra-money/terra.js": "^3.0.4",
|
||||
"@types/express": "^4.17.13",
|
||||
"async-mutex": "^0.3.2",
|
||||
"body-parser": "^1.19.0",
|
||||
"condition-variable": "^1.0.0",
|
||||
"cors": "^2.8.5",
|
||||
"dotenv": "^10.0.0",
|
||||
"express": "^4.17.2",
|
||||
"prom-client": "^14.0.1",
|
||||
"redis": "^4.0.0",
|
||||
"winston": "^3.3.3"
|
||||
},
|
||||
"directories": {
|
||||
"lib": "lib"
|
||||
},
|
||||
"keywords": []
|
||||
}
|
|
@ -0,0 +1,175 @@
|
|||
////////////////////////////////// Start of Logger Stuff //////////////////////////////////////
|
||||
|
||||
export let logger: any;
|
||||
|
||||
export function initLogger() {
|
||||
const winston = require("winston");
|
||||
|
||||
let useConsole: boolean = true;
|
||||
let logFileName: string = "";
|
||||
if (process.env.LOG_DIR) {
|
||||
useConsole = false;
|
||||
logFileName =
|
||||
process.env.LOG_DIR + "/pyth_relay." + new Date().toISOString() + ".log";
|
||||
}
|
||||
|
||||
let logLevel = "info";
|
||||
if (process.env.LOG_LEVEL) {
|
||||
logLevel = process.env.LOG_LEVEL;
|
||||
}
|
||||
|
||||
let transport: any;
|
||||
if (useConsole) {
|
||||
console.log("pyth_relay is logging to the console at level [%s]", logLevel);
|
||||
|
||||
transport = new winston.transports.Console({
|
||||
level: logLevel,
|
||||
});
|
||||
} else {
|
||||
console.log(
|
||||
"pyth_relay is logging to [%s] at level [%s]",
|
||||
logFileName,
|
||||
logLevel
|
||||
);
|
||||
|
||||
transport = new winston.transports.File({
|
||||
filename: logFileName,
|
||||
level: logLevel,
|
||||
});
|
||||
}
|
||||
|
||||
const logConfiguration = {
|
||||
transports: [transport],
|
||||
format: winston.format.combine(
|
||||
winston.format.splat(),
|
||||
winston.format.simple(),
|
||||
winston.format.timestamp({
|
||||
format: "YYYY-MM-DD HH:mm:ss.SSS",
|
||||
}),
|
||||
winston.format.printf(
|
||||
(info: any) => `${[info.timestamp]}|${info.level}|${info.message}`
|
||||
)
|
||||
),
|
||||
};
|
||||
|
||||
logger = winston.createLogger(logConfiguration);
|
||||
}
|
||||
|
||||
////////////////////////////////// Start of PYTH Stuff //////////////////////////////////////
|
||||
|
||||
/*
|
||||
// Pyth PriceAttestation messages are defined in wormhole/ethereum/contracts/pyth/PythStructs.sol
|
||||
// The Pyth smart contract stuff is in terra/contracts/pyth-bridge
|
||||
|
||||
struct Ema {
|
||||
int64 value;
|
||||
int64 numerator;
|
||||
int64 denominator;
|
||||
}
|
||||
|
||||
struct PriceAttestation {
|
||||
uint32 magic; // constant "P2WH"
|
||||
uint16 version;
|
||||
|
||||
// PayloadID uint8 = 1
|
||||
uint8 payloadId;
|
||||
|
||||
bytes32 productId;
|
||||
bytes32 priceId;
|
||||
|
||||
uint8 priceType;
|
||||
|
||||
int64 price;
|
||||
int32 exponent;
|
||||
|
||||
Ema twap;
|
||||
Ema twac;
|
||||
|
||||
uint64 confidenceInterval;
|
||||
|
||||
uint8 status;
|
||||
uint8 corpAct;
|
||||
|
||||
uint64 timestamp;
|
||||
}
|
||||
|
||||
0 uint32 magic // constant "P2WH"
|
||||
4 u16 version
|
||||
6 u8 payloadId // 1
|
||||
7 [u8; 32] productId
|
||||
39 [u8; 32] priceId
|
||||
71 u8 priceType
|
||||
72 i64 price
|
||||
80 i32 exponent
|
||||
84 PythEma twap
|
||||
108 PythEma twac
|
||||
132 u64 confidenceInterval
|
||||
140 u8 status
|
||||
141 u8 corpAct
|
||||
142 u64 timestamp
|
||||
|
||||
*/
|
||||
|
||||
export const PYTH_PRICE_ATTESTATION_LENGTH: number = 150;
|
||||
|
||||
export type PythEma = {
|
||||
value: BigInt;
|
||||
numerator: BigInt;
|
||||
denominator: BigInt;
|
||||
};
|
||||
|
||||
export type PythPriceAttestation = {
|
||||
magic: number;
|
||||
version: number;
|
||||
payloadId: number;
|
||||
productId: string;
|
||||
priceId: string;
|
||||
priceType: number;
|
||||
price: BigInt;
|
||||
exponent: number;
|
||||
twap: PythEma;
|
||||
twac: PythEma;
|
||||
confidenceInterval: BigInt;
|
||||
status: number;
|
||||
corpAct: number;
|
||||
timestamp: BigInt;
|
||||
};
|
||||
|
||||
export const PYTH_MAGIC: number = 0x50325748;
|
||||
|
||||
export function parsePythPriceAttestation(arr: Buffer): PythPriceAttestation {
|
||||
return {
|
||||
magic: arr.readUInt32BE(0),
|
||||
version: arr.readUInt16BE(4),
|
||||
payloadId: arr[6],
|
||||
productId: arr.slice(7, 7 + 32).toString("hex"),
|
||||
priceId: arr.slice(39, 39 + 32).toString("hex"),
|
||||
priceType: arr[71],
|
||||
price: arr.readBigInt64BE(72),
|
||||
exponent: arr.readInt32BE(80),
|
||||
twap: {
|
||||
value: arr.readBigInt64BE(84),
|
||||
numerator: arr.readBigInt64BE(92),
|
||||
denominator: arr.readBigInt64BE(100),
|
||||
},
|
||||
twac: {
|
||||
value: arr.readBigInt64BE(108),
|
||||
numerator: arr.readBigInt64BE(116),
|
||||
denominator: arr.readBigInt64BE(124),
|
||||
},
|
||||
confidenceInterval: arr.readBigUInt64BE(132),
|
||||
status: arr.readUInt32BE(140),
|
||||
corpAct: arr.readUInt32BE(141),
|
||||
timestamp: arr.readBigUInt64BE(142),
|
||||
};
|
||||
}
|
||||
|
||||
////////////////////////////////// Start of Other Helpful Stuff //////////////////////////////////////
|
||||
|
||||
export function sleep(ms: number) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
export function computePrice(rawPrice: BigInt, expo: number): number {
|
||||
return Number(rawPrice) * 10 ** expo;
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
import { setDefaultWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
|
||||
|
||||
import * as listen from "./listen";
|
||||
import * as worker from "./worker";
|
||||
import * as rest from "./rest";
|
||||
import * as helpers from "./helpers";
|
||||
import { logger } from "./helpers";
|
||||
import { PromHelper } from "./promHelpers";
|
||||
|
||||
let configFile: string = ".env";
|
||||
if (process.env.PYTH_RELAY_CONFIG) {
|
||||
configFile = process.env.PYTH_RELAY_CONFIG;
|
||||
}
|
||||
|
||||
console.log("Loading config file [%s]", configFile);
|
||||
require("dotenv").config({ path: configFile });
|
||||
|
||||
setDefaultWasm("node");
|
||||
|
||||
// Set up the logger.
|
||||
helpers.initLogger();
|
||||
|
||||
let error: boolean = false;
|
||||
let listenOnly: boolean = false;
|
||||
for (let idx = 0; idx < process.argv.length; ++idx) {
|
||||
if (process.argv[idx] === "--listen_only") {
|
||||
logger.info("running in listen only mode, will not relay anything!");
|
||||
listenOnly = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (
|
||||
!error &&
|
||||
listen.init(listenOnly) &&
|
||||
worker.init(!listenOnly) &&
|
||||
rest.init(!listenOnly)
|
||||
) {
|
||||
// Start the Prometheus client with the app name and http port
|
||||
let promPort = 8081;
|
||||
if (process.env.PROM_PORT) {
|
||||
promPort = parseInt(process.env.PROM_PORT);
|
||||
}
|
||||
logger.info("prometheus client listening on port " + promPort);
|
||||
const promClient = new PromHelper("pyth_relay", promPort);
|
||||
|
||||
listen.run(promClient);
|
||||
if (!listenOnly) {
|
||||
worker.run(promClient);
|
||||
rest.run();
|
||||
}
|
||||
|
||||
if (process.env.READINESS_PORT) {
|
||||
const readinessPort: number = parseInt(process.env.READINESS_PORT);
|
||||
const Net = require("net");
|
||||
const readinessServer = new Net.Server();
|
||||
readinessServer.listen(readinessPort, function () {
|
||||
logger.info("listening for readiness requests on port " + readinessPort);
|
||||
});
|
||||
|
||||
readinessServer.on("connection", function (socket: any) {
|
||||
//logger.debug("readiness connection");
|
||||
});
|
||||
}
|
||||
}
|
|
@ -0,0 +1,247 @@
|
|||
import {
|
||||
ChainId,
|
||||
CHAIN_ID_SOLANA,
|
||||
CHAIN_ID_TERRA,
|
||||
hexToUint8Array,
|
||||
uint8ArrayToHex,
|
||||
getEmitterAddressEth,
|
||||
getEmitterAddressSolana,
|
||||
getEmitterAddressTerra,
|
||||
} from "@certusone/wormhole-sdk";
|
||||
|
||||
import {
|
||||
createSpyRPCServiceClient,
|
||||
subscribeSignedVAA,
|
||||
} from "@certusone/wormhole-spydk";
|
||||
|
||||
import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
|
||||
|
||||
import * as helpers from "./helpers";
|
||||
import { logger } from "./helpers";
|
||||
import { postEvent } from "./worker";
|
||||
import { PromHelper } from "./promHelpers";
|
||||
|
||||
let seqMap = new Map<string, number>();
|
||||
let listenOnly: boolean = false;
|
||||
let metrics: PromHelper;
|
||||
|
||||
export function init(lo: boolean): boolean {
|
||||
listenOnly = lo;
|
||||
|
||||
if (!process.env.SPY_SERVICE_HOST) {
|
||||
logger.error("Missing environment variable SPY_SERVICE_HOST");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
export async function run(pm: PromHelper) {
|
||||
metrics = pm;
|
||||
logger.info(
|
||||
"pyth_relay starting up, will listen for signed VAAs from [" +
|
||||
process.env.SPY_SERVICE_HOST +
|
||||
"]"
|
||||
);
|
||||
|
||||
(async () => {
|
||||
let filter = {};
|
||||
if (process.env.SPY_SERVICE_FILTERS) {
|
||||
const parsedJsonFilters = eval(process.env.SPY_SERVICE_FILTERS);
|
||||
|
||||
let myFilters = [];
|
||||
for (let i = 0; i < parsedJsonFilters.length; i++) {
|
||||
let myChainId = parseInt(parsedJsonFilters[i].chain_id) as ChainId;
|
||||
let myEmitterAddress = parsedJsonFilters[i].emitter_address;
|
||||
// let myEmitterAddress = await encodeEmitterAddress(
|
||||
// myChainId,
|
||||
// parsedJsonFilters[i].emitter_address
|
||||
// );
|
||||
let myEmitterFilter = {
|
||||
emitterFilter: {
|
||||
chainId: myChainId,
|
||||
emitterAddress: myEmitterAddress,
|
||||
},
|
||||
};
|
||||
logger.info(
|
||||
"adding filter: chainId: [" +
|
||||
myEmitterFilter.emitterFilter.chainId +
|
||||
"], emitterAddress: [" +
|
||||
myEmitterFilter.emitterFilter.emitterAddress +
|
||||
"]"
|
||||
);
|
||||
myFilters.push(myEmitterFilter);
|
||||
}
|
||||
|
||||
logger.info("setting " + myFilters.length + " filters");
|
||||
filter = {
|
||||
filters: myFilters,
|
||||
};
|
||||
} else {
|
||||
logger.info("processing all signed VAAs");
|
||||
}
|
||||
|
||||
while (true) {
|
||||
let stream: any;
|
||||
try {
|
||||
const client = createSpyRPCServiceClient(
|
||||
process.env.SPY_SERVICE_HOST || ""
|
||||
);
|
||||
stream = await subscribeSignedVAA(client, filter);
|
||||
|
||||
stream.on("data", ({ vaaBytes }: { vaaBytes: string }) => {
|
||||
processVaa(vaaBytes);
|
||||
});
|
||||
|
||||
let connected = true;
|
||||
stream.on("error", (err: any) => {
|
||||
logger.error("spy service returned an error: %o", err);
|
||||
connected = false;
|
||||
});
|
||||
|
||||
stream.on("close", () => {
|
||||
logger.error("spy service closed the connection!");
|
||||
connected = false;
|
||||
});
|
||||
|
||||
logger.info("connected to spy service, listening for messages");
|
||||
|
||||
while (connected) {
|
||||
await helpers.sleep(1000);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error("spy service threw an exception: %o", e);
|
||||
}
|
||||
|
||||
stream.end;
|
||||
await helpers.sleep(5 * 1000);
|
||||
logger.info("attempting to reconnect to the spy service");
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
async function encodeEmitterAddress(
|
||||
myChainId: ChainId,
|
||||
emitterAddressStr: string
|
||||
): Promise<string> {
|
||||
if (myChainId === CHAIN_ID_SOLANA) {
|
||||
return await getEmitterAddressSolana(emitterAddressStr);
|
||||
}
|
||||
|
||||
if (myChainId === CHAIN_ID_TERRA) {
|
||||
return await getEmitterAddressTerra(emitterAddressStr);
|
||||
}
|
||||
|
||||
return getEmitterAddressEth(emitterAddressStr);
|
||||
}
|
||||
|
||||
async function processVaa(vaaBytes: string) {
|
||||
let receiveTime = new Date();
|
||||
const { parse_vaa } = await importCoreWasm();
|
||||
const parsedVAA = parse_vaa(hexToUint8Array(vaaBytes));
|
||||
// logger.debug(
|
||||
// "processVaa, vaa len: " +
|
||||
// vaaBytes.length +
|
||||
// ", payload len: " +
|
||||
// parsedVAA.payload.length
|
||||
// );
|
||||
|
||||
// logger.debug("listen:processVaa: parsedVAA: %o", parsedVAA);
|
||||
|
||||
if (isPyth(parsedVAA.payload)) {
|
||||
if (parsedVAA.payload.length < helpers.PYTH_PRICE_ATTESTATION_LENGTH) {
|
||||
logger.error(
|
||||
"dropping vaa because the payload length is wrong: length: " +
|
||||
parsedVAA.payload.length +
|
||||
", expected length:",
|
||||
helpers.PYTH_PRICE_ATTESTATION_LENGTH + ", vaa: %o",
|
||||
parsedVAA
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
let pa = helpers.parsePythPriceAttestation(Buffer.from(parsedVAA.payload));
|
||||
// logger.debug("listen:processVaa: price attestation: %o", pa);
|
||||
|
||||
let key = pa.priceId;
|
||||
let lastSeqNum = seqMap.get(key);
|
||||
if (lastSeqNum) {
|
||||
if (lastSeqNum >= parsedVAA.sequence) {
|
||||
logger.debug(
|
||||
"ignoring duplicate: emitter: [" +
|
||||
parsedVAA.emitter_chain +
|
||||
":" +
|
||||
uint8ArrayToHex(parsedVAA.emitter_address) +
|
||||
"], productId: [" +
|
||||
pa.productId +
|
||||
"], priceId: [" +
|
||||
pa.priceId +
|
||||
"], seqNum: " +
|
||||
parsedVAA.sequence
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
seqMap.set(key, parsedVAA.sequence);
|
||||
|
||||
logger.info(
|
||||
"received: emitter: [" +
|
||||
parsedVAA.emitter_chain +
|
||||
":" +
|
||||
uint8ArrayToHex(parsedVAA.emitter_address) +
|
||||
"], seqNum: " +
|
||||
parsedVAA.sequence +
|
||||
", productId: [" +
|
||||
pa.productId +
|
||||
"], priceId: [" +
|
||||
pa.priceId +
|
||||
"], priceType: " +
|
||||
pa.priceType +
|
||||
", price: " +
|
||||
pa.price +
|
||||
", exponent: " +
|
||||
pa.exponent +
|
||||
", confidenceInterval: " +
|
||||
pa.confidenceInterval +
|
||||
", timeStamp: " +
|
||||
pa.timestamp +
|
||||
", computedPrice: " +
|
||||
helpers.computePrice(pa.price, pa.exponent) +
|
||||
" +/-" +
|
||||
helpers.computePrice(pa.confidenceInterval, pa.exponent)
|
||||
// +
|
||||
// ", payload: [" +
|
||||
// uint8ArrayToHex(parsedVAA.payload) +
|
||||
// "]"
|
||||
);
|
||||
|
||||
metrics.incIncoming();
|
||||
if (!listenOnly) {
|
||||
logger.debug("posting to worker");
|
||||
await postEvent(vaaBytes, pa, parsedVAA.sequence, receiveTime);
|
||||
}
|
||||
} else {
|
||||
logger.debug(
|
||||
"dropping non-pyth vaa, payload type " +
|
||||
parsedVAA.payload[0] +
|
||||
", vaa: %o",
|
||||
parsedVAA
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function isPyth(payload: Buffer): boolean {
|
||||
if (payload.length < 4) return false;
|
||||
if (
|
||||
payload[0] === 80 &&
|
||||
payload[1] === 50 &&
|
||||
payload[2] === 87 &&
|
||||
payload[3] === 72
|
||||
) {
|
||||
// P2WH
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
|
@ -0,0 +1,148 @@
|
|||
import http = require("http");
|
||||
import client = require("prom-client");
|
||||
|
||||
// NOTE: To create a new metric:
|
||||
// 1) Create a private counter/gauge with appropriate name and help
|
||||
// 2) Create a method to set the metric to a value
|
||||
// 3) Register the metric
|
||||
|
||||
export class PromHelper {
|
||||
private register = new client.Registry();
|
||||
private walletReg = new client.Registry();
|
||||
private collectDefaultMetrics = client.collectDefaultMetrics;
|
||||
|
||||
// Actual metrics
|
||||
private seqNumGauge = new client.Gauge({
|
||||
name: "seqNum",
|
||||
help: "Last sent sequence number",
|
||||
});
|
||||
private successCounter = new client.Counter({
|
||||
name: "successes",
|
||||
help: "number of successful relays",
|
||||
});
|
||||
private failureCounter = new client.Counter({
|
||||
name: "failures",
|
||||
help: "number of failed relays",
|
||||
});
|
||||
private completeTime = new client.Histogram({
|
||||
name: "complete_time",
|
||||
help: "Time is took to complete transfer",
|
||||
buckets: [400, 800, 1600, 3200, 6400, 12800],
|
||||
});
|
||||
private walletBalance = new client.Gauge({
|
||||
name: "wallet_balance",
|
||||
help: "The wallet balance",
|
||||
labelNames: ["timestamp"],
|
||||
registers: [this.walletReg],
|
||||
});
|
||||
private listenCounter = new client.Counter({
|
||||
name: "VAAs_received",
|
||||
help: "number of Pyth VAAs received",
|
||||
});
|
||||
private alreadyExecutedCounter = new client.Counter({
|
||||
name: "already_executed",
|
||||
help: "number of transfers rejected due to already having been executed",
|
||||
});
|
||||
private transferTimeoutCounter = new client.Counter({
|
||||
name: "transfer_timeout",
|
||||
help: "number of transfers that timed out",
|
||||
});
|
||||
private seqNumMismatchCounter = new client.Counter({
|
||||
name: "seq_num_mismatch",
|
||||
help: "number of transfers that failed due to sequence number mismatch",
|
||||
});
|
||||
private retryCounter = new client.Counter({
|
||||
name: "retries",
|
||||
help: "number of retry attempts",
|
||||
});
|
||||
private retriesExceededCounter = new client.Counter({
|
||||
name: "retries_exceeded",
|
||||
help: "number of transfers that failed due to exceeding the retry count",
|
||||
});
|
||||
private insufficentFundsCounter = new client.Counter({
|
||||
name: "insufficient_funds",
|
||||
help: "number of transfers that failed due to insufficient funds count",
|
||||
});
|
||||
// End metrics
|
||||
|
||||
private server = http.createServer(async (req, res) => {
|
||||
if (req.url === "/metrics") {
|
||||
// Return all metrics in the Prometheus exposition format
|
||||
res.setHeader("Content-Type", this.register.contentType);
|
||||
res.write(await this.register.metrics());
|
||||
res.end(await this.walletReg.metrics());
|
||||
}
|
||||
});
|
||||
|
||||
constructor(name: string, port: number) {
|
||||
this.register.setDefaultLabels({
|
||||
app: name,
|
||||
});
|
||||
this.collectDefaultMetrics({ register: this.register });
|
||||
// Register each metric
|
||||
this.register.registerMetric(this.seqNumGauge);
|
||||
this.register.registerMetric(this.successCounter);
|
||||
this.register.registerMetric(this.failureCounter);
|
||||
this.register.registerMetric(this.completeTime);
|
||||
this.register.registerMetric(this.listenCounter);
|
||||
this.register.registerMetric(this.alreadyExecutedCounter);
|
||||
this.register.registerMetric(this.transferTimeoutCounter);
|
||||
this.register.registerMetric(this.seqNumMismatchCounter);
|
||||
this.register.registerMetric(this.retryCounter);
|
||||
this.register.registerMetric(this.retriesExceededCounter);
|
||||
this.register.registerMetric(this.insufficentFundsCounter);
|
||||
// End registering metric
|
||||
|
||||
this.server.listen(port);
|
||||
}
|
||||
|
||||
// These are the accessor methods for the metrics
|
||||
setSeqNum(sn: number) {
|
||||
this.seqNumGauge.set(sn);
|
||||
}
|
||||
incSuccesses() {
|
||||
this.successCounter.inc();
|
||||
}
|
||||
incFailures() {
|
||||
this.failureCounter.inc();
|
||||
}
|
||||
addCompleteTime(val: number) {
|
||||
this.completeTime.observe(val);
|
||||
}
|
||||
setWalletBalance(bal: number) {
|
||||
this.walletReg.clear();
|
||||
// this.walletReg = new client.Registry();
|
||||
this.walletBalance = new client.Gauge({
|
||||
name: "wallet_balance",
|
||||
help: "The wallet balance",
|
||||
labelNames: ["timestamp"],
|
||||
registers: [this.walletReg],
|
||||
});
|
||||
this.walletReg.registerMetric(this.walletBalance);
|
||||
let now = new Date();
|
||||
// this.walletDate = now.toString();
|
||||
this.walletBalance.set({ timestamp: now.toString() }, bal);
|
||||
// this.walletBalance.set(bal);
|
||||
}
|
||||
incIncoming() {
|
||||
this.listenCounter.inc();
|
||||
}
|
||||
incAlreadyExec() {
|
||||
this.alreadyExecutedCounter.inc();
|
||||
}
|
||||
incTransferTimeout() {
|
||||
this.transferTimeoutCounter.inc();
|
||||
}
|
||||
incSeqNumMismatch() {
|
||||
this.seqNumMismatchCounter.inc();
|
||||
}
|
||||
incRetries() {
|
||||
this.retryCounter.inc();
|
||||
}
|
||||
incRetriesExceeded() {
|
||||
this.retriesExceededCounter.inc();
|
||||
}
|
||||
incInsufficentFunds() {
|
||||
this.insufficentFundsCounter.inc();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
import {
|
||||
connectToTerra,
|
||||
queryBalanceOnTerra,
|
||||
queryTerra,
|
||||
relayTerra,
|
||||
setAccountNumOnTerra,
|
||||
setSeqNumOnTerra,
|
||||
TerraConnectionData,
|
||||
} from "./terra";
|
||||
|
||||
// Aggregates the per-chain connection state used by the relayer.
// Terra is currently the only target chain wired in.
export type ConnectionData = {
  terraData: TerraConnectionData;
};
|
||||
|
||||
import { logger } from "../helpers";
|
||||
|
||||
export function connectRelayer(): ConnectionData {
|
||||
let td = connectToTerra();
|
||||
return { terraData: td };
|
||||
}
|
||||
|
||||
export async function setAccountNum(connectionData: ConnectionData) {
|
||||
try {
|
||||
await setAccountNumOnTerra(connectionData.terraData);
|
||||
} catch (e) {
|
||||
logger.error("setAccountNum: query failed: %o", e);
|
||||
}
|
||||
}
|
||||
|
||||
export async function setSeqNum(connectionData: ConnectionData) {
|
||||
try {
|
||||
await setSeqNumOnTerra(connectionData.terraData);
|
||||
} catch (e) {
|
||||
logger.error("setSeqNum: query failed: %o", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Exceptions from this method are caught at the higher level.
|
||||
export async function relay(
|
||||
signedVAAs: Array<string>,
|
||||
connectionData: ConnectionData
|
||||
): Promise<any> {
|
||||
return await relayTerra(connectionData.terraData, signedVAAs);
|
||||
}
|
||||
|
||||
export async function query(
|
||||
productIdStr: string,
|
||||
priceIdStr: string
|
||||
): Promise<any> {
|
||||
let result: any;
|
||||
try {
|
||||
let terraData = connectToTerra();
|
||||
result = await queryTerra(terraData, productIdStr, priceIdStr);
|
||||
} catch (e) {
|
||||
logger.error("query failed: %o", e);
|
||||
result = "Error: unhandled exception";
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
export async function queryBalance(
|
||||
connectionData: ConnectionData
|
||||
): Promise<number> {
|
||||
let balance: number = NaN;
|
||||
try {
|
||||
balance = await queryBalanceOnTerra(connectionData.terraData);
|
||||
} catch (e) {
|
||||
logger.error("balance query failed: %o", e);
|
||||
}
|
||||
|
||||
return balance;
|
||||
}
|
|
@ -0,0 +1,260 @@
|
|||
import { fromUint8Array } from "js-base64";
|
||||
import {
|
||||
LCDClient,
|
||||
LCDClientConfig,
|
||||
MnemonicKey,
|
||||
MsgExecuteContract,
|
||||
} from "@terra-money/terra.js";
|
||||
import { hexToUint8Array } from "@certusone/wormhole-sdk";
|
||||
import { redeemOnTerra } from "@certusone/wormhole-sdk";
|
||||
|
||||
import { logger } from "../helpers";
|
||||
|
||||
// Everything needed to talk to a Terra node as the relayer wallet.
// Built once by connectToTerra(); the two *Num fields are mutable cached
// wallet state maintained by setAccountNumOnTerra/setSeqNumOnTerra and
// relayTerra.
export type TerraConnectionData = {
  nodeUrl: string;
  terraChainId: string;
  terraName: string;
  // Despite the name, this is a mnemonic phrase (fed to MnemonicKey).
  walletPrivateKey: string;
  // Fee denomination, e.g. the coin looked up in queryBalanceOnTerra.
  coin: string;
  // Address of the pyth contract on Terra.
  contractAddress: string;
  lcdConfig: LCDClientConfig;
  // Cached wallet sequence number; incremented locally after each signed tx.
  walletSeqNum: number;
  // Cached wallet account number, fetched once at startup.
  walletAccountNum: number;
};
|
||||
|
||||
export function connectToTerra(): TerraConnectionData {
|
||||
if (!process.env.TERRA_NODE_URL) {
|
||||
throw "Missing environment variable TERRA_NODE_URL";
|
||||
}
|
||||
|
||||
if (!process.env.TERRA_CHAIN_ID) {
|
||||
throw "Missing environment variable TERRA_CHAIN_ID";
|
||||
}
|
||||
|
||||
if (!process.env.TERRA_NAME) {
|
||||
throw "Missing environment variable TERRA_NAME";
|
||||
}
|
||||
|
||||
if (!process.env.TERRA_PRIVATE_KEY) {
|
||||
throw "Missing environment variable TERRA_PRIVATE_KEY";
|
||||
}
|
||||
|
||||
if (!process.env.TERRA_COIN) {
|
||||
throw "Missing environment variable TERRA_COIN";
|
||||
}
|
||||
|
||||
if (!process.env.TERRA_PYTH_CONTRACT_ADDRESS) {
|
||||
throw "Missing environment variable TERRA_PYTH_CONTRACT_ADDRESS";
|
||||
}
|
||||
|
||||
logger.info(
|
||||
"Terra connection parameters: url: [" +
|
||||
process.env.TERRA_NODE_URL +
|
||||
"], terraChainId: [" +
|
||||
process.env.TERRA_CHAIN_ID +
|
||||
"], terraName: [" +
|
||||
process.env.TERRA_NAME +
|
||||
"], coin: [" +
|
||||
process.env.TERRA_COIN +
|
||||
"], contractAddress: [" +
|
||||
process.env.TERRA_PYTH_CONTRACT_ADDRESS +
|
||||
"]"
|
||||
);
|
||||
|
||||
const lcdConfig = {
|
||||
URL: process.env.TERRA_NODE_URL,
|
||||
chainID: process.env.TERRA_CHAIN_ID,
|
||||
name: process.env.TERRA_NAME,
|
||||
};
|
||||
|
||||
return {
|
||||
nodeUrl: process.env.TERRA_NODE_URL,
|
||||
terraChainId: process.env.TERRA_CHAIN_ID,
|
||||
terraName: process.env.TERRA_NAME,
|
||||
walletPrivateKey: process.env.TERRA_PRIVATE_KEY,
|
||||
coin: process.env.TERRA_COIN,
|
||||
contractAddress: process.env.TERRA_PYTH_CONTRACT_ADDRESS,
|
||||
lcdConfig: lcdConfig,
|
||||
walletSeqNum: 0,
|
||||
walletAccountNum: 0,
|
||||
};
|
||||
}
|
||||
|
||||
// Build one MsgExecuteContract per signed VAA, sign them all into a single
// transaction using the locally cached sequence/account numbers, and
// broadcast it (sync mode — the receipt does not wait for block inclusion).
// Side effect: increments connectionData.walletSeqNum after signing so the
// next batch uses the following sequence number.
export async function relayTerra(
  connectionData: TerraConnectionData,
  signedVAAs: Array<string>
) {
  logger.debug("relaying " + signedVAAs.length + " messages to terra");

  logger.debug("TIME: connecting to terra");
  const lcdClient = new LCDClient(connectionData.lcdConfig);

  const mk = new MnemonicKey({
    mnemonic: connectionData.walletPrivateKey,
  });

  const wallet = lcdClient.wallet(mk);

  logger.debug("TIME: creating messages");
  // One submit_vaa message per VAA; the hex VAA is re-encoded as base64 for
  // the contract.
  let msgs = new Array<MsgExecuteContract>();
  for (let idx = 0; idx < signedVAAs.length; ++idx) {
    const msg = new MsgExecuteContract(
      wallet.key.accAddress,
      connectionData.contractAddress,
      {
        submit_vaa: {
          data: Buffer.from(signedVAAs[idx], "hex").toString("base64"),
        },
      }
    );

    msgs.push(msg);
  }

  // logger.debug("TIME: looking up gas");
  // //Alternate FCD methodology
  // //let gasPrices = await axios.get("http://localhost:3060/v1/txs/gas_prices").then((result) => result.data);
  // const gasPrices = lcdClient.config.gasPrices;

  // logger.debug("TIME: estimating fees");
  // //const walletSequence = await wallet.sequence();
  // const feeEstimate = await lcdClient.tx.estimateFee(
  //   wallet.key.accAddress,
  //   msgs,
  //   {
  //     //TODO figure out type mismatch
  //     feeDenoms: [connectionData.coin],
  //     gasPrices,
  //   }
  // );

  logger.debug(
    "TIME: creating transaction using seq number " +
      connectionData.walletSeqNum +
      " and account number " +
      connectionData.walletAccountNum
  );
  // Explicit sequence/accountNumber avoid a node round trip per batch; the
  // caller resyncs via setSeqNumOnTerra when the chain reports a mismatch.
  const tx = await wallet.createAndSignTx({
    sequence: connectionData.walletSeqNum,
    accountNumber: connectionData.walletAccountNum,
    msgs: msgs,
    memo: "P2T",
    feeDenoms: [connectionData.coin],
  });

  // NOTE(review): the bump happens before broadcastSync, so a failed
  // broadcast still consumes a local sequence number (the worker compensates
  // on "insufficient funds" and "sequence mismatch" errors) — confirm this is
  // the intended protocol before restructuring.
  connectionData.walletSeqNum = connectionData.walletSeqNum + 1;

  logger.debug("TIME: sending msg");
  const receipt = await lcdClient.tx.broadcastSync(tx);
  logger.debug("TIME:submitted to terra: receipt: %o", receipt);
  return receipt;
}
|
||||
|
||||
export async function queryTerra(
|
||||
connectionData: TerraConnectionData,
|
||||
productIdStr: string,
|
||||
priceIdStr: string
|
||||
) {
|
||||
const encodedProductId = fromUint8Array(hexToUint8Array(productIdStr));
|
||||
const encodedPriceId = fromUint8Array(hexToUint8Array(priceIdStr));
|
||||
|
||||
logger.info(
|
||||
"Querying terra for price info for productId [" +
|
||||
productIdStr +
|
||||
"], encoded as [" +
|
||||
encodedProductId +
|
||||
"], priceId [" +
|
||||
priceIdStr +
|
||||
"], encoded as [" +
|
||||
encodedPriceId +
|
||||
"]"
|
||||
);
|
||||
|
||||
const lcdClient = new LCDClient(connectionData.lcdConfig);
|
||||
|
||||
const mk = new MnemonicKey({
|
||||
mnemonic: connectionData.walletPrivateKey,
|
||||
});
|
||||
|
||||
const wallet = lcdClient.wallet(mk);
|
||||
|
||||
const query_result = await lcdClient.wasm.contractQuery(
|
||||
connectionData.contractAddress,
|
||||
{
|
||||
price_info: {
|
||||
product_id: encodedProductId,
|
||||
price_id: encodedPriceId,
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
logger.debug("queryTerra: query returned: %o", query_result);
|
||||
return query_result;
|
||||
}
|
||||
|
||||
export async function queryBalanceOnTerra(connectionData: TerraConnectionData) {
|
||||
const lcdClient = new LCDClient(connectionData.lcdConfig);
|
||||
|
||||
const mk = new MnemonicKey({
|
||||
mnemonic: connectionData.walletPrivateKey,
|
||||
});
|
||||
|
||||
const wallet = lcdClient.wallet(mk);
|
||||
|
||||
let balance: number = NaN;
|
||||
try {
|
||||
logger.debug("querying wallet balance");
|
||||
let coins: any;
|
||||
let pagnation: any;
|
||||
[coins, pagnation] = await lcdClient.bank.balance(wallet.key.accAddress);
|
||||
logger.debug("wallet query returned: %o", coins);
|
||||
if (coins) {
|
||||
let coin = coins.get(connectionData.coin);
|
||||
if (coin) {
|
||||
balance = parseInt(coin.toData().amount);
|
||||
} else {
|
||||
logger.error(
|
||||
"failed to query coin balance, coin [" +
|
||||
connectionData.coin +
|
||||
"] is not in the wallet, coins: %o",
|
||||
coins
|
||||
);
|
||||
}
|
||||
} else {
|
||||
logger.error("failed to query coin balance!");
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error("failed to query coin balance: %o", e);
|
||||
}
|
||||
|
||||
return balance;
|
||||
}
|
||||
|
||||
export async function setAccountNumOnTerra(
|
||||
connectionData: TerraConnectionData
|
||||
) {
|
||||
const lcdClient = new LCDClient(connectionData.lcdConfig);
|
||||
|
||||
const mk = new MnemonicKey({
|
||||
mnemonic: process.env.TERRA_PRIVATE_KEY,
|
||||
});
|
||||
|
||||
const wallet = lcdClient.wallet(mk);
|
||||
logger.debug("getting wallet account num");
|
||||
connectionData.walletAccountNum = await wallet.accountNumber();
|
||||
logger.debug("wallet account num is " + connectionData.walletAccountNum);
|
||||
}
|
||||
|
||||
export async function setSeqNumOnTerra(connectionData: TerraConnectionData) {
|
||||
const lcdClient = new LCDClient(connectionData.lcdConfig);
|
||||
|
||||
const mk = new MnemonicKey({
|
||||
mnemonic: process.env.TERRA_PRIVATE_KEY,
|
||||
});
|
||||
|
||||
const wallet = lcdClient.wallet(mk);
|
||||
|
||||
logger.debug("getting wallet seq num");
|
||||
connectionData.walletSeqNum = await wallet.sequence();
|
||||
logger.debug("wallet seq num is " + connectionData.walletSeqNum);
|
||||
}
|
|
@ -0,0 +1,48 @@
|
|||
import { Request, Response } from "express";
|
||||
import { logger } from "./helpers";
|
||||
import { getStatus, getPriceData } from "./worker";
|
||||
|
||||
// TCP port for the REST API; 0 means the REST server is disabled (see run()).
let restPort: number = 0;
|
||||
|
||||
export function init(runRest: boolean): boolean {
|
||||
if (!runRest) return true;
|
||||
if (!process.env.REST_PORT) return true;
|
||||
|
||||
restPort = parseInt(process.env.REST_PORT);
|
||||
return true;
|
||||
}
|
||||
|
||||
export async function run() {
|
||||
if (restPort == 0) return;
|
||||
|
||||
const express = require("express");
|
||||
const cors = require("cors");
|
||||
const app = express();
|
||||
app.use(cors());
|
||||
|
||||
app.listen(restPort, () =>
|
||||
logger.debug("listening on REST port " + restPort)
|
||||
);
|
||||
|
||||
(async () => {
|
||||
app.get("/status", async (req: Request, res: Response) => {
|
||||
let result = await getStatus();
|
||||
res.json(result);
|
||||
});
|
||||
|
||||
app.get(
|
||||
"/queryterra/:product_id/:price_id",
|
||||
async (req: Request, res: Response) => {
|
||||
let result = await getPriceData(
|
||||
req.params.product_id,
|
||||
req.params.price_id
|
||||
);
|
||||
res.json(result);
|
||||
}
|
||||
);
|
||||
|
||||
app.get("/", (req: Request, res: Response) =>
|
||||
res.json(["/status", "/queryterra/<product_id>/<price_id>"])
|
||||
);
|
||||
})();
|
||||
}
|
|
@ -0,0 +1,571 @@
|
|||
import { Mutex } from "async-mutex";
|
||||
let CondVar = require("condition-variable");
|
||||
|
||||
import { setDefaultWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
|
||||
import { uint8ArrayToHex } from "@certusone/wormhole-sdk";
|
||||
|
||||
import * as helpers from "./helpers";
|
||||
import { logger } from "./helpers";
|
||||
import * as main from "./relay/main";
|
||||
import { PromHelper } from "./promHelpers";
|
||||
|
||||
// Guards all module-level worker state below.
const mutex = new Mutex();
// Re-armed condvar; set to null while the callback is draining pendingMap.
let condition = new CondVar();
// Default condvar wait in milliseconds (overridden by computeTimeout() when
// periodic balance queries are enabled).
let conditionTimeout = 20000;

// One VAA queued for relaying, with bookkeeping for metrics.
type PendingPayload = {
  vaa_bytes: string;
  pa: helpers.PythPriceAttestation;
  receiveTime: Date;
  seqNum: number;
};

let pendingMap = new Map<string, PendingPayload>(); // The key to this is price_id. Note that Map maintains insertion order, not key order.

// Last-published state per price id, exposed via the REST /status endpoint.
type ProductData = {
  key: string;
  lastTimePublished: Date;
  numTimesPublished: number;
  lastPa: helpers.PythPriceAttestation;
  lastResult: any;
};

// Pairs a drained pending event with its ProductData for finalization.
type CurrentEntry = {
  pendingEntry: PendingPayload;
  currObj: ProductData;
};

let productMap = new Map<string, ProductData>(); // The key to this is price_id

// Relayer chain connection; populated by init().
let connectionData: main.ConnectionData;
let metrics: PromHelper;
// Next wall-clock time (ms since epoch) at which to re-query the balance.
let nextBalanceQueryTimeAsMs: number = 0;
// Balance query period in ms; 0 disables periodic balance queries.
let balanceQueryInterval = 0;
// Timestamp of the last successful balance query.
let walletTimeStamp: Date;
// Tuning knobs, overridable via env vars in init().
let maxPerBatch: number = 1;
let maxAttempts: number = 2;
let retryDelayInMs: number = 0;
|
||||
|
||||
export function init(runWorker: boolean): boolean {
|
||||
if (!runWorker) return true;
|
||||
|
||||
try {
|
||||
connectionData = main.connectRelayer();
|
||||
} catch (e) {
|
||||
logger.error("failed to load connection config: %o", e);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (process.env.MAX_MSGS_PER_BATCH) {
|
||||
maxPerBatch = parseInt(process.env.MAX_MSGS_PER_BATCH);
|
||||
}
|
||||
|
||||
if (maxPerBatch <= 0) {
|
||||
logger.error(
|
||||
"Environment variable MAX_MSGS_PER_BATCH has an invalid value of " +
|
||||
maxPerBatch +
|
||||
", must be greater than zero."
|
||||
);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (process.env.RETRY_MAX_ATTEMPTS) {
|
||||
maxAttempts = parseInt(process.env.RETRY_MAX_ATTEMPTS);
|
||||
}
|
||||
|
||||
if (maxAttempts <= 0) {
|
||||
logger.error(
|
||||
"Environment variable RETRY_MAX_ATTEMPTS has an invalid value of " +
|
||||
maxAttempts +
|
||||
", must be greater than zero."
|
||||
);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (process.env.RETRY_DELAY_IN_MS) {
|
||||
retryDelayInMs = parseInt(process.env.RETRY_DELAY_IN_MS);
|
||||
}
|
||||
|
||||
if (retryDelayInMs < 0) {
|
||||
logger.error(
|
||||
"Environment variable RETRY_DELAY_IN_MS has an invalid value of " +
|
||||
retryDelayInMs +
|
||||
", must be positive or zero."
|
||||
);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Start the worker: sync wallet account/sequence numbers, take an initial
// balance reading, and arm the condvar whose callback drains pendingMap.
// All state setup happens under the mutex so postEvent cannot interleave.
export async function run(met: PromHelper) {
  setDefaultWasm("node");

  metrics = met;

  await mutex.runExclusive(async () => {
    logger.info(
      "will attempt to relay each pyth message at most " +
        maxAttempts +
        " times, with a delay of " +
        retryDelayInMs +
        " milliseconds between attempts, will batch up to " +
        maxPerBatch +
        " pyth messages in a batch"
    );

    if (process.env.BAL_QUERY_INTERVAL) {
      balanceQueryInterval = parseInt(process.env.BAL_QUERY_INTERVAL);
    }

    await main.setAccountNum(connectionData);
    logger.info(
      "wallet account number is " + connectionData.terraData.walletAccountNum
    );

    await main.setSeqNum(connectionData);
    logger.info(
      "initial wallet sequence number is " +
        connectionData.terraData.walletSeqNum
    );

    // queryBalance returns NaN on failure; only record a timestamp for a
    // successful reading.
    let balance = await main.queryBalance(connectionData);
    if (!isNaN(balance)) {
      walletTimeStamp = new Date();
    }
    if (balanceQueryInterval !== 0) {
      logger.info(
        "initial wallet balance is " +
          balance +
          ", will query every " +
          balanceQueryInterval +
          " milliseconds."
      );
      metrics.setWalletBalance(balance);

      // Schedule the next periodic balance query (see computeTimeout()).
      nextBalanceQueryTimeAsMs = new Date().getTime() + balanceQueryInterval;
    } else {
      logger.info("initial wallet balance is " + balance);
      metrics.setWalletBalance(balance);
    }

    // Arm the condvar: callBack fires when postEvent signals or the timeout
    // elapses.
    await condition.wait(computeTimeout(), callBack);
  });
}
|
||||
|
||||
// Condvar callback: repeatedly drain up to maxPerBatch pending events, relay
// them outside the lock, then finalize under the lock. When pendingMap is
// empty the condvar is re-armed (with this same callback) and the loop exits.
// `condition` is nulled while draining so postEvent queues instead of
// signaling.
async function callBack(err: any, result: any) {
  logger.debug(
    "entering callback, pendingEvents: " +
      pendingMap.size +
      ", err: %o, result: %o",
    err,
    result
  );
  // condition = null;
  // await helpers.sleep(10000);
  // logger.debug("done with long sleep");
  let done = false;
  do {
    let currObjs = new Array<CurrentEntry>();
    let messages = new Array<string>();

    await mutex.runExclusive(async () => {
      condition = null;
      logger.debug("in callback, getting pending events.");
      await getPendingEventsAlreadyLocked(currObjs, messages);

      // Nothing pending: re-arm the condvar and leave the loop.
      if (currObjs.length === 0) {
        done = true;
        condition = new CondVar();
        await condition.wait(computeTimeout(), callBack);
      }
    });

    if (currObjs.length !== 0) {
      logger.debug("in callback, relaying " + currObjs.length + " events.");
      let sendTime = new Date();
      let retVal: number;
      let relayResult: any;
      // Relay WITHOUT holding the mutex so the listener can keep queueing.
      [retVal, relayResult] = await relayEventsNotLocked(messages);

      await mutex.runExclusive(async () => {
        logger.debug("in callback, finalizing " + currObjs.length + " events.");
        await finalizeEventsAlreadyLocked(
          currObjs,
          retVal,
          relayResult,
          sendTime
        );

        // Drained everything that arrived meanwhile: re-arm and exit;
        // otherwise loop again to process the new arrivals.
        if (pendingMap.size === 0) {
          logger.debug("in callback, rearming the condition.");
          done = true;
          condition = new CondVar();
          await condition.wait(computeTimeout(), callBack);
        }
      });
    }
  } while (!done);

  logger.debug("leaving callback.");
}
|
||||
|
||||
function computeTimeout(): number {
|
||||
if (balanceQueryInterval !== 0) {
|
||||
let now = new Date().getTime();
|
||||
if (now < nextBalanceQueryTimeAsMs) {
|
||||
return nextBalanceQueryTimeAsMs - now;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
return conditionTimeout;
|
||||
}
|
||||
|
||||
// Move up to maxPerBatch events from pendingMap into `currObjs` (with their
// ProductData) and their raw VAAs into `messages`, creating/updating
// productMap entries along the way. Must be called with the mutex held.
async function getPendingEventsAlreadyLocked(
  currObjs: Array<CurrentEntry>,
  messages: Array<string>
) {
  while (pendingMap.size !== 0 && currObjs.length < maxPerBatch) {
    // Map iterates in insertion order; size !== 0 guarantees next() has a value.
    const first = pendingMap.entries().next();
    logger.debug("processing event with key [" + first.value[0] + "]");
    const pendingValue = first.value[1];
    let pendingKey = pendingValue.pa.priceId;
    let currObj = productMap.get(pendingKey);
    if (currObj) {
      currObj.lastPa = pendingValue.pa;
      currObj.lastTimePublished = new Date();
      productMap.set(pendingKey, currObj);
      logger.debug(
        "processing update " +
          currObj.numTimesPublished +
          " for [" +
          pendingKey +
          "], seq num " +
          pendingValue.seqNum
      );
    } else {
      logger.debug(
        "processing first update for [" +
          pendingKey +
          "], seq num " +
          pendingValue.seqNum
      );
      currObj = {
        key: pendingKey,
        lastPa: pendingValue.pa,
        lastTimePublished: new Date(),
        numTimesPublished: 0,
        lastResult: "",
      };
      productMap.set(pendingKey, currObj);
    }

    currObjs.push({ pendingEntry: pendingValue, currObj: currObj });
    messages.push(pendingValue.vaa_bytes);
    pendingMap.delete(first.value[0]);
  }

  // NOTE(review): entries were already deleted by their map key above; this
  // second pass deletes by currObj.key (pa.priceId). If the map key is always
  // the priceId — as the pendingMap comment states — this loop is redundant;
  // confirm before removing.
  if (currObjs.length !== 0) {
    for (let idx = 0; idx < currObjs.length; ++idx) {
      pendingMap.delete(currObjs[idx].currObj.key);
    }
  }
}
|
||||
|
||||
// Outcome codes returned by relayEventsNotLocked and mapped to metrics in
// finalizeEventsAlreadyLocked.
const RELAY_SUCCESS: number = 0;
const RELAY_FAIL: number = 1;
const RELAY_ALREADY_EXECUTED: number = 2;
const RELAY_TIMEOUT: number = 3;
const RELAY_SEQ_NUM_MISMATCH: number = 4;
const RELAY_INSUFFICIENT_FUNDS: number = 5;
|
||||
|
||||
// Relay one batch of VAAs with retries. Classifies the broadcast result (or
// thrown error) into a RELAY_* code, retries the retryable classes up to
// maxAttempts times, and returns [code, human-readable result]. Called
// WITHOUT the mutex held (see callBack).
async function relayEventsNotLocked(
  messages: Array<string>
): Promise<[number, any]> {
  let retVal: number = RELAY_SUCCESS;
  let relayResult: any;
  let retry: boolean = false;

  for (let attempt = 0; attempt < maxAttempts; ++attempt) {
    retVal = RELAY_SUCCESS;
    retry = false;

    try {
      relayResult = await main.relay(messages, connectionData);
      if (relayResult.txhash) {
        // A txhash does not mean success: inspect raw_log for known failures.
        if (
          relayResult.raw_log &&
          relayResult.raw_log.search("VaaAlreadyExecuted") >= 0
        ) {
          relayResult = "Already Executed: " + relayResult.txhash;
          retVal = RELAY_ALREADY_EXECUTED;
        } else if (
          relayResult.raw_log &&
          relayResult.raw_log.search("insufficient funds") >= 0
        ) {
          logger.error(
            "relay failed due to insufficient funds: %o",
            relayResult
          );
          // The tx never landed, so take back the sequence number that
          // relayTerra optimistically consumed.
          connectionData.terraData.walletSeqNum =
            connectionData.terraData.walletSeqNum - 1;
          retVal = RELAY_INSUFFICIENT_FUNDS;
        } else if (
          relayResult.raw_log &&
          relayResult.raw_log.search("failed") >= 0
        ) {
          logger.error("relay seems to have failed: %o", relayResult);
          retVal = RELAY_FAIL;
          retry = true;
        } else {
          relayResult = relayResult.txhash;
        }
      } else {
        retVal = RELAY_FAIL;
        retry = true;
        if (relayResult.message) {
          relayResult = relayResult.message;
        } else {
          logger.error("No txhash: %o", relayResult);
          relayResult = "No txhash";
        }
      }
    } catch (e: any) {
      // Classify thrown errors: timeout, then LCD response payloads, then
      // everything else.
      if (
        e.message &&
        e.message.search("timeout") >= 0 &&
        e.message.search("exceeded") >= 0
      ) {
        logger.error("relay timed out: %o", e);
        retVal = RELAY_TIMEOUT;
        retry = true;
      } else {
        logger.error("relay failed: %o", e);
        if (e.response && e.response.data) {
          if (
            e.response.data.error &&
            e.response.data.error.search("VaaAlreadyExecuted") >= 0
          ) {
            relayResult = "Already Executed";
            retVal = RELAY_ALREADY_EXECUTED;
          } else if (
            e.response.data.message &&
            e.response.data.message.search("account sequence mismatch") >= 0
          ) {
            relayResult = e.response.data.message;
            retVal = RELAY_SEQ_NUM_MISMATCH;
            retry = true;

            // Resync the cached sequence number from the chain before the
            // next attempt.
            logger.debug(
              "wallet sequence number is out of sync, querying the current value"
            );
            await main.setSeqNum(connectionData);
            logger.info(
              "wallet seq number is now " +
                connectionData.terraData.walletSeqNum
            );
          } else {
            retVal = RELAY_FAIL;
            retry = true;
            if (e.message) {
              relayResult = "Error: " + e.message;
            } else {
              relayResult = "Error: unexpected exception";
            }
          }
        } else {
          retVal = RELAY_FAIL;
          retry = true;
          if (e.message) {
            relayResult = "Error: " + e.message;
          } else {
            relayResult = "Error: unexpected exception";
          }
        }
      }
    }

    logger.debug(
      "relay attempt complete: retVal: " +
        retVal +
        ", retry: " +
        retry +
        ", attempt " +
        attempt +
        " of " +
        maxAttempts
    );

    if (!retry) {
      break;
    } else {
      metrics.incRetries();
      if (retryDelayInMs != 0) {
        logger.debug(
          "delaying for " + retryDelayInMs + " milliseconds before retrying"
        );
        // NOTE: linear back-off — the actual delay grows with the attempt
        // number, although the log line above prints only the base delay.
        await helpers.sleep(retryDelayInMs * (attempt + 1));
      }
    }
  }

  // `retry` still true here means the final attempt also wanted a retry:
  // the retry budget is exhausted.
  if (retry) {
    logger.error("failed to relay batch, retry count exceeded!");
    metrics.incRetriesExceeded();
  }

  return [retVal, relayResult];
}
|
||||
|
||||
async function finalizeEventsAlreadyLocked(
|
||||
currObjs: Array<CurrentEntry>,
|
||||
retVal: number,
|
||||
relayResult: any,
|
||||
sendTime: Date
|
||||
) {
|
||||
for (let idx = 0; idx < currObjs.length; ++idx) {
|
||||
let currObj = currObjs[idx].currObj;
|
||||
let currEntry = currObjs[idx].pendingEntry;
|
||||
currObj.lastResult = relayResult;
|
||||
currObj.numTimesPublished = currObj.numTimesPublished + 1;
|
||||
if (retVal == RELAY_SUCCESS) {
|
||||
metrics.incSuccesses();
|
||||
} else if (retVal == RELAY_ALREADY_EXECUTED) {
|
||||
metrics.incAlreadyExec();
|
||||
} else if (retVal == RELAY_TIMEOUT) {
|
||||
metrics.incTransferTimeout();
|
||||
metrics.incFailures();
|
||||
} else if (retVal == RELAY_SEQ_NUM_MISMATCH) {
|
||||
metrics.incSeqNumMismatch();
|
||||
metrics.incFailures();
|
||||
} else if (retVal == RELAY_INSUFFICIENT_FUNDS) {
|
||||
metrics.incInsufficentFunds();
|
||||
metrics.incFailures();
|
||||
} else {
|
||||
metrics.incFailures();
|
||||
}
|
||||
productMap.set(currObj.key, currObj);
|
||||
|
||||
let completeTime = new Date();
|
||||
metrics.setSeqNum(currEntry.seqNum);
|
||||
metrics.addCompleteTime(
|
||||
completeTime.getTime() - currEntry.receiveTime.getTime()
|
||||
);
|
||||
|
||||
logger.info(
|
||||
"complete: priceId: " +
|
||||
currEntry.pa.priceId +
|
||||
", seqNum: " +
|
||||
currEntry.seqNum +
|
||||
", price: " +
|
||||
helpers.computePrice(currEntry.pa.price, currEntry.pa.exponent) +
|
||||
", ci: " +
|
||||
helpers.computePrice(
|
||||
currEntry.pa.confidenceInterval,
|
||||
currEntry.pa.exponent
|
||||
) +
|
||||
", rcv2SendBegin: " +
|
||||
(sendTime.getTime() - currEntry.receiveTime.getTime()) +
|
||||
", rcv2SendComplete: " +
|
||||
(completeTime.getTime() - currEntry.receiveTime.getTime()) +
|
||||
", totalSends: " +
|
||||
currObj.numTimesPublished +
|
||||
", result: " +
|
||||
relayResult
|
||||
);
|
||||
}
|
||||
|
||||
let now = new Date();
|
||||
if (balanceQueryInterval > 0 && now.getTime() >= nextBalanceQueryTimeAsMs) {
|
||||
let balance = await main.queryBalance(connectionData);
|
||||
if (isNaN(balance)) {
|
||||
logger.error("failed to query wallet balance!");
|
||||
} else {
|
||||
if (!isNaN(balance)) {
|
||||
walletTimeStamp = new Date();
|
||||
}
|
||||
logger.info(
|
||||
"wallet balance: " +
|
||||
balance +
|
||||
", update time: " +
|
||||
walletTimeStamp.toISOString()
|
||||
);
|
||||
metrics.setWalletBalance(balance);
|
||||
}
|
||||
nextBalanceQueryTimeAsMs = now.getTime() + balanceQueryInterval;
|
||||
}
|
||||
}
|
||||
|
||||
export async function postEvent(
|
||||
vaaBytes: any,
|
||||
pa: helpers.PythPriceAttestation,
|
||||
sequence: number,
|
||||
receiveTime: Date
|
||||
) {
|
||||
let event: PendingPayload = {
|
||||
vaa_bytes: uint8ArrayToHex(vaaBytes),
|
||||
pa: pa,
|
||||
receiveTime: receiveTime,
|
||||
seqNum: sequence,
|
||||
};
|
||||
let pendingKey = pa.priceId;
|
||||
// pendingKey = pendingKey + ":" + sequence;
|
||||
await mutex.runExclusive(() => {
|
||||
logger.debug("posting event with key [" + pendingKey + "]");
|
||||
pendingMap.set(pendingKey, event);
|
||||
if (condition) {
|
||||
logger.debug("hitting condition variable.");
|
||||
condition.complete(true);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export async function getStatus() {
|
||||
let result = "[";
|
||||
await mutex.runExclusive(() => {
|
||||
let first: boolean = true;
|
||||
for (let [key, value] of productMap) {
|
||||
if (first) {
|
||||
first = false;
|
||||
} else {
|
||||
result = result + ", ";
|
||||
}
|
||||
|
||||
let item: object = {
|
||||
product_id: value.lastPa.productId,
|
||||
price_id: value.lastPa.priceId,
|
||||
price: helpers.computePrice(value.lastPa.price, value.lastPa.exponent),
|
||||
ci: helpers.computePrice(
|
||||
value.lastPa.confidenceInterval,
|
||||
value.lastPa.exponent
|
||||
),
|
||||
num_times_published: value.numTimesPublished,
|
||||
last_time_published: value.lastTimePublished.toISOString(),
|
||||
result: value.lastResult,
|
||||
};
|
||||
|
||||
result = result + JSON.stringify(item);
|
||||
}
|
||||
});
|
||||
|
||||
result = result + "]";
|
||||
return result;
|
||||
}
|
||||
|
||||
// Note that querying the contract does not update the sequence number, so we don't need to be locked.
|
||||
export async function getPriceData(
|
||||
productId: string,
|
||||
priceId: string
|
||||
): Promise<any> {
|
||||
let result: any;
|
||||
// await mutex.runExclusive(async () => {
|
||||
result = await main.query(productId, priceId);
|
||||
// });
|
||||
|
||||
return result;
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
{
|
||||
"compilerOptions": {
|
||||
"outDir": "lib",
|
||||
"target": "esnext",
|
||||
"module": "commonjs",
|
||||
"moduleResolution": "node",
|
||||
"lib": ["es2019"],
|
||||
"skipLibCheck": true,
|
||||
"allowJs": true,
|
||||
"alwaysStrict": true,
|
||||
"strict": true,
|
||||
"forceConsistentCasingInFileNames": true,
|
||||
"noFallthroughCasesInSwitch": true,
|
||||
"resolveJsonModule": true,
|
||||
"isolatedModules": true,
|
||||
"downlevelIteration": true
|
||||
},
|
||||
"include": ["src"],
|
||||
"exclude": ["node_modules", "**/__tests__/*"]
|
||||
}
|
|
@ -3,6 +3,7 @@
|
|||
from pyth_utils import *
|
||||
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
|
||||
import json
|
||||
import os
|
||||
|
@ -57,24 +58,8 @@ def accounts_endpoint():
|
|||
httpd.serve_forever()
|
||||
|
||||
|
||||
# Fund the publisher
|
||||
sol_run_or_die("airdrop", [
|
||||
str(SOL_AIRDROP_AMT),
|
||||
"--keypair", PYTH_PUBLISHER_KEYPAIR,
|
||||
"--commitment", "finalized",
|
||||
])
|
||||
|
||||
# Create a mapping
|
||||
pyth_run_or_die("init_mapping")
|
||||
|
||||
print(f"Creating {PYTH_TEST_SYMBOL_COUNT} test Pyth symbols")
|
||||
|
||||
publisher_pubkey = sol_run_or_die("address", args=[
|
||||
"--keypair", PYTH_PUBLISHER_KEYPAIR
|
||||
], capture_output=True).stdout.strip()
|
||||
|
||||
for i in range(PYTH_TEST_SYMBOL_COUNT):
|
||||
symbol_name = f"Test symbol {i}"
|
||||
def add_symbol(num: int):
|
||||
symbol_name = f"Test symbol {num}"
|
||||
# Add a product
|
||||
prod_pubkey = pyth_run_or_die(
|
||||
"add_product", capture_output=True).stdout.strip()
|
||||
|
@ -112,6 +97,31 @@ for i in range(PYTH_TEST_SYMBOL_COUNT):
|
|||
|
||||
sys.stdout.flush()
|
||||
|
||||
return num
|
||||
|
||||
|
||||
# Fund the publisher account so it can pay for the setup transactions below.
sol_run_or_die("airdrop", [
    str(SOL_AIRDROP_AMT),
    "--keypair", PYTH_PUBLISHER_KEYPAIR,
    "--commitment", "finalized",
])

# Create a mapping
pyth_run_or_die("init_mapping")

print(f"Creating {PYTH_TEST_SYMBOL_COUNT} test Pyth symbols")

publisher_pubkey = sol_run_or_die("address", args=[
    "--keypair", PYTH_PUBLISHER_KEYPAIR
], capture_output=True).stdout.strip()

# Register the test symbols concurrently; each worker calls add_symbol()
# and returns its symbol number when done.
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = {executor.submit(add_symbol, i) for i in range(PYTH_TEST_SYMBOL_COUNT)}

    # Report completions as they land rather than in submission order.
    for fut in as_completed(futures):
        print(f"Completed {fut.result()}")

print(
    f"Mock updates ready to roll. Updating every {str(PYTH_PUBLISHER_INTERVAL)} seconds")
|
||||
|
||||
|
|
Loading…
Reference in New Issue