Pyth relay (#14)

* initial commit

* Rework to make it pyth instead of spy

* Add command to listen to mainnet to readme

* More code rework

* Dump productId from messages

* Fixes for message parsing

* Drop duplicate messages

* Start of creating docker file for pyth_relay

* Trying to get docker to listen to spy_guardian

* Proving that docker image works

* Publish to Terra

* Keep connection to Terra open all the time

* Add timing to terra relayer code

* Not passing fees into transaction

* Use Winston logger

* Update commands to build and run spy guardian

* Specify config file via env variable

* Console logging not honoring log level

* Remove dependence on redis

* Add support for listen only mode

* number of publishes is off by one

* Only one worker should use the wallet at a time

* Add prometheus metrics

* Add ability to query wallet balance

* Add latency histogram

* Use a condition variable rather than sleeping to reduce delays

* Periodically query the wallet balance

* Add wallet balance metrics

* Add incoming packet counter

* Enable strict type checking

* Add design document

* Adding more stuff

* Remove use of deprecated methods.

* Back out bug from "fixing" deprecated code.

* Remove deprecated methods

* fix: Improvements to Dockerfile Wormhole pyth_relay and spy_guardian (#10)

Co-authored-by: Eran Davidovich <edavidovich@jumptrading.com>

* Revert dockerfile

* fix: fix Dockerfile entrypoint (#11)

also modify docs to reflect the change.

Co-authored-by: Eran Davidovich <edavidovich@jumptrading.com>

* Batch messages into a single tx per block

* Update the testnet params in env.sample

* Reduce lock contention in the worker

* Adding already executed metric

* Adding timeout metric

* Peg already executed and timeout metrics

* Handle "already executed" exception

* Add wallet update time for metrics

* Catch silent failure on relay

* Log file not getting written if error on init

* Add timestamp label to wallet metric

* Add support for readiness probe

* Make spy_relay support listen_only mode

* Add default metrics

* Manage seq num locally

* Manage seq num locally

* Oops! Didn't mean to commit "noEmit"!

* Update docs

* Delete app.js

This file is not relevant to pyth_relay

Co-authored-by: Paul Noel <panoel007@gmail.com>
Co-authored-by: Eran Davidovich <erancx@users.noreply.github.com>
Co-authored-by: Eran Davidovich <edavidovich@jumptrading.com>
Co-authored-by: Paul Noel <35237584+panoel@users.noreply.github.com>
This commit is contained in:
bruce-riley 2022-02-09 08:01:45 -06:00 committed by GitHub
parent a588ccf8f1
commit dc55817d6f
19 changed files with 15574 additions and 64 deletions

42
pyth_relay/.env.sample Normal file

@ -0,0 +1,42 @@
# DevNet:
SPY_SERVICE_HOST=0.0.0.0:7072
SPY_SERVICE_FILTERS=[{"chain_id":1,"emitter_address":"71f8dcb863d176e2c420ad6610cf687359612b6fb392e0642b0ca6b1f186aa3b"}]
TERRA_NODE_URL=http://localhost:1317
TERRA_PRIVATE_KEY=notice oak worry limit wrap speak medal online prefer cluster roof addict wrist behave treat actual wasp year salad speed social layer crew genius
TERRA_PYTH_CONTRACT_ADDRESS=terra1wgh6adn8geywx0v78zs9azrqtqdegufuegnwep
TERRA_CHAIN_ID=columbus-5
TERRA_NAME=localterra
TERRA_COIN=uluna
# TestNet:
#SPY_SERVICE_HOST=0.0.0.0:7073
#SPY_SERVICE_FILTERS=[{"chain_id":1,"emitter_address":"3afda841c1f43dd7d546c8a581ba1f92a139f4133f9f6ab095558f6a359df5d4"}]
#TERRA_NODE_URL=https://bombay-lcd.terra.dev
#TERRA_PRIVATE_KEY=your key here
#TERRA_PYTH_CONTRACT_ADDRESS=terra1wjkzgcrg3a2jh2cyc5lekvtjydf600splmvdk4
#TERRA_CHAIN_ID=bombay-12
#TERRA_NAME=testnet
#TERRA_COIN=uluna
# MainNet:
#SPY_SERVICE_HOST=0.0.0.0:7074
#SPY_SERVICE_FILTERS=[{"chain_id":1,"emitter_address":"b2dd468c9b8c80b3dd9211e9e3fd6ee4d652eb5997b7c9020feae971c278ab07"}]
#TERRA_NODE_URL=https://lcd.terra.dev
#TERRA_PRIVATE_KEY=your key here
#TERRA_PYTH_CONTRACT_ADDRESS=fill_this_in
#TERRA_CHAIN_ID=columbus-5
#TERRA_NAME=mainnet
#TERRA_COIN=uluna
REST_PORT=4200
PROM_PORT=8081
BAL_QUERY_INTERVAL=60000
#READINESS_PORT=2000
RETRY_MAX_ATTEMPTS=4
RETRY_DELAY_IN_MS=250
MAX_MSGS_PER_BATCH=1
# By default, logging goes to the console at level info.
LOG_DIR=/var/pyth_relay/logs
#LOG_LEVEL=debug

1
pyth_relay/.gitignore vendored Normal file

@ -0,0 +1 @@
/lib

95
pyth_relay/Design.md Normal file

@ -0,0 +1,95 @@
# Overview
The pyth_relay program is designed to listen to Pyth messages published on Solana and relay them to other chains.
Although in its initial release, the only supported destination chain is Terra, the design supports publishing to multiple chains.
<p>
The relayer listens to the spy_guardian for signed VAA messages. It can be configured to only request specific emitters, so that only Pyth messages get forwarded.
<p>
When the relayer receives messages from the spy, it drops redundant messages based on sequence numbers, verifies that each message is a Pyth message, and relays the Pyth messages to Terra.
# Operational Details
The relayer can be run as a Docker image. Additionally, you need an instance of the spy guardian running, which can also be started from a Docker image.
<p>
The relayer is configured using an env file, as specified by the PYTH_RELAY_CONFIG environment variable. Please see the .env.sample file in the source directory for the valid variables.
<p>
The relayer can be configured to log to a file in the directory specified by the LOG_DIR environment variable. If the variable is not specified, it logs to the console.
<p>
The log level can be controlled by the LOG_LEVEL environment variable, where info is the default. The valid values are debug, info, warn, and error.
# External Dependencies
The relayer connects to Terra and therefore has the following dependencies:
1. A Pyth to Wormhole publisher
2. A highly reliable connection to a local Terra node via Wormhole
3. A unique Terra wallet per instance of pyth_relay
4. A Wormhole spy guardian process running that pyth_relay can subscribe to for Pyth messages
Note that for performance reasons, pyth_relay manages the Terra wallet sequence number locally. If it did not, it would get wallet sequence number errors whenever it published faster than the Terra node could process. For this to work, the relayer should be connected to a local Terra node, to minimize the possible paths a published message could take and to maintain sequence number ordering.
# High Availability
If high availability is a goal, run two completely separate instances of pyth_relay. They should run on completely separate hardware, using separate Terra connections and wallets. Additionally, they should connect to separate instances of the spy_guardian. Both will publish messages to the Pyth Terra contract, which simply drops the duplicates.
# Design Details
The relayer code is divided into separate source files, based on functionality. The main entry point is index.ts. It invokes code in the other files as follows.
## listener.ts
The listener code parses the emitter filter parameter, which may consist of zero or more chain/emitter pairs. If any filters are specified, then only VAAs from those emitters are processed. The listener then registers those emitters with the spy guardian via an RPC callback.
<p>
When the listener receives a VAA from the spy, it verifies that it has not already been seen, based on the sequence number. This is necessary because multiple guardians sign and publish the same VAAs. It then validates that the message is a Pyth message; all Pyth payloads start with the magic string P2WH. If so, it invokes the postEvent method on the worker to forward the VAA for publishing.
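The duplicate filter and the magic check can be sketched like this (a simplified illustration of the logic in listen.ts, not the file itself):

```typescript
// Track the highest sequence number seen per price id and drop anything
// at or below it. Duplicates arise because several guardians publish the
// same signed VAA.
const lastSeqByPriceId = new Map<string, number>();

function isNewSequence(priceId: string, seq: number): boolean {
  const last = lastSeqByPriceId.get(priceId);
  if (last !== undefined && last >= seq) return false; // duplicate or stale
  lastSeqByPriceId.set(priceId, seq);
  return true;
}

// All Pyth payloads begin with the ASCII magic "P2WH".
function looksLikePyth(payload: Uint8Array): boolean {
  return (
    payload.length >= 4 &&
    payload[0] === 0x50 && // 'P'
    payload[1] === 0x32 && // '2'
    payload[2] === 0x57 && // 'W'
    payload[3] === 0x48 // 'H'
  );
}
```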
## worker.ts
The worker code is responsible for taking VAAs to be published from the listener and passing them to the relay code for relaying to Terra.
<p>
The worker maintains a map of pending events, a condition variable to signal that events are waiting to be published, and a map of the latest state of each Pyth price.
It protects all of these objects with a mutex.
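The listener-to-worker hand-off can be sketched as below. This is a simplified stand-in: the real code uses the async-mutex and condition-variable packages, while this sketch uses a bare promise as the wake-up signal and relies on the single-threaded event loop instead of an explicit mutex.

```typescript
// Latest event per price id; newer events overwrite older ones so the worker
// always publishes the freshest price.
type PendingEvent = { priceId: string; vaaBytes: string };

const pending = new Map<string, PendingEvent>();
let wake: (() => void) | null = null;

// Listener side: record the event and signal the worker if it is waiting.
function postEvent(evt: PendingEvent): void {
  pending.set(evt.priceId, evt);
  if (wake) {
    wake();
    wake = null;
  }
}

// Worker side: drain everything pending, or wait for a signal if empty.
async function takeBatch(): Promise<PendingEvent[]> {
  while (pending.size === 0) {
    await new Promise<void>((resolve) => (wake = resolve));
  }
  const batch = Array.from(pending.values());
  pending.clear();
  return batch;
}
```

Signaling instead of polling is what the "condition variable rather than sleeping" commit refers to: the worker wakes as soon as an event arrives rather than on the next sleep tick.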
<p>
The worker maintains performance metrics to be published by the Prometheus interface.
<p>
The worker also provides methods to query the status of the wallet being used for relaying and the current state of all maintained prices, and it can query Terra for the current data for a given price. These are used by the REST interface, if it is enabled in the config.
<p>
In most cases, if a Terra transaction fails, the worker retries up to a configurable number of times, with a configurable delay between attempts. For each successive retry of a given message, the delay is increased by the retry attempt number (delay * attempt).
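The retry schedule above can be sketched as follows. The tunable names mirror the env variables (RETRY_MAX_ATTEMPTS, RETRY_DELAY_IN_MS) but the code itself is illustrative, not the worker's actual implementation:

```typescript
// Linear backoff: the base delay grows with the attempt number (delay * attempt).
function computeRetryDelay(baseDelayMs: number, attempt: number): number {
  return baseDelayMs * attempt;
}

// Retry a send operation up to maxAttempts times, sleeping between attempts.
async function relayWithRetry(
  send: () => Promise<void>,
  maxAttempts: number,
  baseDelayMs: number
): Promise<void> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await send();
      return;
    } catch (e) {
      if (attempt === maxAttempts) throw e; // retry limit exceeded
      await new Promise((r) =>
        setTimeout(r, computeRetryDelay(baseDelayMs, attempt))
      );
    }
  }
}
```

With the sample defaults (4 attempts, 250 ms base delay) a message that keeps failing waits 250, 500, and 750 ms before the final attempt.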
## main.ts and terra.ts
This is the code that actually communicates with the Terra blockchain. It takes its configuration from the env file and provides methods to relay a Pyth message, query the wallet balance, and query the current data for a given price.
## promHelper.ts
Prometheus is being used as a framework for storing metrics. Currently, the following metrics are being collected:
- The last sequence number sent
- The total number of successful relays
- The total number of failed relays
- A histogram of transfer times
- The current wallet balance
- The total number of VAAs received by the listener
- The total number of VAAs already executed on Terra
- The total number of Terra transaction timeouts
- The total number of Terra sequence number errors
- The total number of Terra retry attempts
- The total number of retry limit exceeded errors
- The total number of transactions failed due to insufficient funds
All the above metrics can be viewed at http://localhost:8081/metrics
<p>
Port 8081 is the default; it can be overridden via the `PROM_PORT` tunable in the env file.
<p>
This file contains a class named `PromHelper`. It is an encapsulation of the Prometheus API.
## helpers.ts
This contains an assortment of helper functions and objects used by the other code, including logger initialization and parsing of Pyth messages.

15
pyth_relay/Dockerfile.pyth_relay Normal file

@ -0,0 +1,15 @@
FROM node:16-alpine
WORKDIR /app/pyth_relay
COPY . .
RUN npm install && npm run build && npm cache clean --force
# If you are building for production
# RUN npm ci --only=production
RUN mkdir -p /app/pyth_relay/logs
RUN addgroup -S pyth -g 10001 && adduser -S pyth -G pyth -u 10001
RUN chown -R pyth:pyth src/ logs/ lib/
USER pyth
CMD [ "node", "lib/index.js" ]

30
pyth_relay/Dockerfile.spy_guardian Normal file

@ -0,0 +1,30 @@
FROM docker.io/golang:1.17.0-alpine as builder
RUN apk add --no-cache git gcc linux-headers alpine-sdk bash
WORKDIR /app
RUN git clone https://github.com/certusone/wormhole.git
WORKDIR /app/wormhole/tools
RUN CGO_ENABLED=0 ./build.sh
WORKDIR /app/wormhole
RUN tools/bin/buf lint && tools/bin/buf generate
WORKDIR /app/wormhole/node/tools
RUN go build -mod=readonly -o /dlv github.com/go-delve/delve/cmd/dlv
WORKDIR /app/wormhole/node
RUN go build -race -gcflags="all=-N -l" -mod=readonly -o /guardiand github.com/certusone/wormhole/node
FROM docker.io/golang:1.17.0-alpine
WORKDIR /app
COPY --from=builder /guardiand /app/guardiand
ENV PATH="/app:${PATH}"
RUN addgroup -S pyth -g 10001 && adduser -S pyth -G pyth -u 10001
RUN chown -R pyth:pyth .
USER pyth
ENTRYPOINT [ "guardiand", "spy", "--nodeKey", "/tmp/node.key" ]

48
pyth_relay/README.md Normal file

@ -0,0 +1,48 @@
# Setup Spy Guardian and Pyth Relay
To build the spy_guardian docker container:
```
$ cd pyth_relay
$ docker build -f Dockerfile.spy_guardian -t spy_guardian .
```
To build the pyth_relay docker container:
```
$ cd pyth_relay
$ docker build -f Dockerfile.pyth_relay -t pyth_relay .
```
Run the spy_guardian docker container in TestNet:
```
$ docker run --platform linux/amd64 -d --network=host spy_guardian \
--bootstrap /dns4/wormhole-testnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWBY9ty9CXLBXGQzMuqkziLntsVcyz4pk1zWaJRvJn6Mmt \
--network /wormhole/testnet/2/1 \
--spyRPC "[::]:7073"
```
Or run the spy_guardian docker container in MainNet:
For the MainNet gossip network parameters, see https://github.com/certusone/wormhole-networks/blob/master/mainnetv2/info.md
```
$ docker run --platform linux/amd64 -d --network=host spy_guardian \
--bootstrap <guardianNetworkBootstrapParameterForMainNet> \
--network <guardianNetworkPathForMainNet> \
--spyRPC "[::]:7073"
```
Then to run the pyth_relay docker container using a config file called
${HOME}/pyth_relay/env and logging to directory ${HOME}/pyth_relay/logs, do the
following:
```
$ docker run \
--volume=${HOME}/pyth_relay:/var/pyth_relay \
-e PYTH_RELAY_CONFIG=/var/pyth_relay/env \
--network=host \
-d \
pyth_relay
```

13501
pyth_relay/package-lock.json generated Normal file

File diff suppressed because it is too large

49
pyth_relay/package.json Normal file

@ -0,0 +1,49 @@
{
"name": "pyth_relay",
"version": "1.0.0",
"description": "Pyth relayer",
"main": "index.js",
"scripts": {
"build": "tsc",
"start": "node lib/index.js",
"listen_only": "node lib/index.js --listen_only"
},
"author": "",
"license": "Apache-2.0",
"devDependencies": {
"@improbable-eng/grpc-web-node-http-transport": "^0.15.0",
"@types/jest": "^27.0.2",
"@types/long": "^4.0.1",
"@types/node": "^16.6.1",
"axios": "^0.24.0",
"esm": "^3.2.25",
"ethers": "^5.4.4",
"jest": "^27.3.1",
"prettier": "^2.3.2",
"ts-jest": "^27.0.7",
"tslint": "^6.1.3",
"tslint-config-prettier": "^1.18.0",
"typescript": "^4.3.5"
},
"dependencies": {
"@certusone/wormhole-sdk": "^0.1.4",
"@certusone/wormhole-spydk": "^0.0.1",
"@solana/spl-token": "^0.1.8",
"@solana/web3.js": "^1.24.0",
"@terra-money/terra.js": "^3.0.4",
"@types/express": "^4.17.13",
"async-mutex": "^0.3.2",
"body-parser": "^1.19.0",
"condition-variable": "^1.0.0",
"cors": "^2.8.5",
"dotenv": "^10.0.0",
"express": "^4.17.2",
"prom-client": "^14.0.1",
"redis": "^4.0.0",
"winston": "^3.3.3"
},
"directories": {
"lib": "lib"
},
"keywords": []
}

175
pyth_relay/src/helpers.ts Normal file

@ -0,0 +1,175 @@
////////////////////////////////// Start of Logger Stuff //////////////////////////////////////
export let logger: any;
export function initLogger() {
const winston = require("winston");
let useConsole: boolean = true;
let logFileName: string = "";
if (process.env.LOG_DIR) {
useConsole = false;
logFileName =
process.env.LOG_DIR + "/pyth_relay." + new Date().toISOString() + ".log";
}
let logLevel = "info";
if (process.env.LOG_LEVEL) {
logLevel = process.env.LOG_LEVEL;
}
let transport: any;
if (useConsole) {
console.log("pyth_relay is logging to the console at level [%s]", logLevel);
transport = new winston.transports.Console({
level: logLevel,
});
} else {
console.log(
"pyth_relay is logging to [%s] at level [%s]",
logFileName,
logLevel
);
transport = new winston.transports.File({
filename: logFileName,
level: logLevel,
});
}
const logConfiguration = {
transports: [transport],
format: winston.format.combine(
winston.format.splat(),
winston.format.simple(),
winston.format.timestamp({
format: "YYYY-MM-DD HH:mm:ss.SSS",
}),
winston.format.printf(
(info: any) => `${[info.timestamp]}|${info.level}|${info.message}`
)
),
};
logger = winston.createLogger(logConfiguration);
}
////////////////////////////////// Start of PYTH Stuff //////////////////////////////////////
/*
// Pyth PriceAttestation messages are defined in wormhole/ethereum/contracts/pyth/PythStructs.sol
// The Pyth smart contract stuff is in terra/contracts/pyth-bridge
struct Ema {
int64 value;
int64 numerator;
int64 denominator;
}
struct PriceAttestation {
uint32 magic; // constant "P2WH"
uint16 version;
// PayloadID uint8 = 1
uint8 payloadId;
bytes32 productId;
bytes32 priceId;
uint8 priceType;
int64 price;
int32 exponent;
Ema twap;
Ema twac;
uint64 confidenceInterval;
uint8 status;
uint8 corpAct;
uint64 timestamp;
}
0 uint32 magic // constant "P2WH"
4 u16 version
6 u8 payloadId // 1
7 [u8; 32] productId
39 [u8; 32] priceId
71 u8 priceType
72 i64 price
80 i32 exponent
84 PythEma twap
108 PythEma twac
132 u64 confidenceInterval
140 u8 status
141 u8 corpAct
142 u64 timestamp
*/
export const PYTH_PRICE_ATTESTATION_LENGTH: number = 150;
export type PythEma = {
value: BigInt;
numerator: BigInt;
denominator: BigInt;
};
export type PythPriceAttestation = {
magic: number;
version: number;
payloadId: number;
productId: string;
priceId: string;
priceType: number;
price: BigInt;
exponent: number;
twap: PythEma;
twac: PythEma;
confidenceInterval: BigInt;
status: number;
corpAct: number;
timestamp: BigInt;
};
export const PYTH_MAGIC: number = 0x50325748;
export function parsePythPriceAttestation(arr: Buffer): PythPriceAttestation {
return {
magic: arr.readUInt32BE(0),
version: arr.readUInt16BE(4),
payloadId: arr[6],
productId: arr.slice(7, 7 + 32).toString("hex"),
priceId: arr.slice(39, 39 + 32).toString("hex"),
priceType: arr[71],
price: arr.readBigInt64BE(72),
exponent: arr.readInt32BE(80),
twap: {
value: arr.readBigInt64BE(84),
numerator: arr.readBigInt64BE(92),
denominator: arr.readBigInt64BE(100),
},
twac: {
value: arr.readBigInt64BE(108),
numerator: arr.readBigInt64BE(116),
denominator: arr.readBigInt64BE(124),
},
confidenceInterval: arr.readBigUInt64BE(132),
status: arr[140],
corpAct: arr[141],
timestamp: arr.readBigUInt64BE(142),
};
}
////////////////////////////////// Start of Other Helpful Stuff //////////////////////////////////////
export function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
export function computePrice(rawPrice: BigInt, expo: number): number {
return Number(rawPrice) * 10 ** expo;
}

64
pyth_relay/src/index.ts Normal file

@ -0,0 +1,64 @@
import { setDefaultWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
import * as listen from "./listen";
import * as worker from "./worker";
import * as rest from "./rest";
import * as helpers from "./helpers";
import { logger } from "./helpers";
import { PromHelper } from "./promHelpers";
let configFile: string = ".env";
if (process.env.PYTH_RELAY_CONFIG) {
configFile = process.env.PYTH_RELAY_CONFIG;
}
console.log("Loading config file [%s]", configFile);
require("dotenv").config({ path: configFile });
setDefaultWasm("node");
// Set up the logger.
helpers.initLogger();
let error: boolean = false;
let listenOnly: boolean = false;
for (let idx = 0; idx < process.argv.length; ++idx) {
if (process.argv[idx] === "--listen_only") {
logger.info("running in listen only mode, will not relay anything!");
listenOnly = true;
}
}
if (
!error &&
listen.init(listenOnly) &&
worker.init(!listenOnly) &&
rest.init(!listenOnly)
) {
// Start the Prometheus client with the app name and http port
let promPort = 8081;
if (process.env.PROM_PORT) {
promPort = parseInt(process.env.PROM_PORT);
}
logger.info("prometheus client listening on port " + promPort);
const promClient = new PromHelper("pyth_relay", promPort);
listen.run(promClient);
if (!listenOnly) {
worker.run(promClient);
rest.run();
}
if (process.env.READINESS_PORT) {
const readinessPort: number = parseInt(process.env.READINESS_PORT);
const Net = require("net");
const readinessServer = new Net.Server();
readinessServer.listen(readinessPort, function () {
logger.info("listening for readiness requests on port " + readinessPort);
});
readinessServer.on("connection", function (socket: any) {
//logger.debug("readiness connection");
});
}
}

247
pyth_relay/src/listen.ts Normal file

@ -0,0 +1,247 @@
import {
ChainId,
CHAIN_ID_SOLANA,
CHAIN_ID_TERRA,
hexToUint8Array,
uint8ArrayToHex,
getEmitterAddressEth,
getEmitterAddressSolana,
getEmitterAddressTerra,
} from "@certusone/wormhole-sdk";
import {
createSpyRPCServiceClient,
subscribeSignedVAA,
} from "@certusone/wormhole-spydk";
import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
import * as helpers from "./helpers";
import { logger } from "./helpers";
import { postEvent } from "./worker";
import { PromHelper } from "./promHelpers";
let seqMap = new Map<string, number>();
let listenOnly: boolean = false;
let metrics: PromHelper;
export function init(lo: boolean): boolean {
listenOnly = lo;
if (!process.env.SPY_SERVICE_HOST) {
logger.error("Missing environment variable SPY_SERVICE_HOST");
return false;
}
return true;
}
export async function run(pm: PromHelper) {
metrics = pm;
logger.info(
"pyth_relay starting up, will listen for signed VAAs from [" +
process.env.SPY_SERVICE_HOST +
"]"
);
(async () => {
let filter = {};
if (process.env.SPY_SERVICE_FILTERS) {
const parsedJsonFilters = JSON.parse(process.env.SPY_SERVICE_FILTERS);
let myFilters = [];
for (let i = 0; i < parsedJsonFilters.length; i++) {
let myChainId = parseInt(parsedJsonFilters[i].chain_id) as ChainId;
let myEmitterAddress = parsedJsonFilters[i].emitter_address;
// let myEmitterAddress = await encodeEmitterAddress(
// myChainId,
// parsedJsonFilters[i].emitter_address
// );
let myEmitterFilter = {
emitterFilter: {
chainId: myChainId,
emitterAddress: myEmitterAddress,
},
};
logger.info(
"adding filter: chainId: [" +
myEmitterFilter.emitterFilter.chainId +
"], emitterAddress: [" +
myEmitterFilter.emitterFilter.emitterAddress +
"]"
);
myFilters.push(myEmitterFilter);
}
logger.info("setting " + myFilters.length + " filters");
filter = {
filters: myFilters,
};
} else {
logger.info("processing all signed VAAs");
}
while (true) {
let stream: any;
try {
const client = createSpyRPCServiceClient(
process.env.SPY_SERVICE_HOST || ""
);
stream = await subscribeSignedVAA(client, filter);
stream.on("data", ({ vaaBytes }: { vaaBytes: string }) => {
processVaa(vaaBytes);
});
let connected = true;
stream.on("error", (err: any) => {
logger.error("spy service returned an error: %o", err);
connected = false;
});
stream.on("close", () => {
logger.error("spy service closed the connection!");
connected = false;
});
logger.info("connected to spy service, listening for messages");
while (connected) {
await helpers.sleep(1000);
}
} catch (e) {
logger.error("spy service threw an exception: %o", e);
}
stream.end();
await helpers.sleep(5 * 1000);
logger.info("attempting to reconnect to the spy service");
}
})();
}
async function encodeEmitterAddress(
myChainId: ChainId,
emitterAddressStr: string
): Promise<string> {
if (myChainId === CHAIN_ID_SOLANA) {
return await getEmitterAddressSolana(emitterAddressStr);
}
if (myChainId === CHAIN_ID_TERRA) {
return await getEmitterAddressTerra(emitterAddressStr);
}
return getEmitterAddressEth(emitterAddressStr);
}
async function processVaa(vaaBytes: string) {
let receiveTime = new Date();
const { parse_vaa } = await importCoreWasm();
const parsedVAA = parse_vaa(hexToUint8Array(vaaBytes));
// logger.debug(
// "processVaa, vaa len: " +
// vaaBytes.length +
// ", payload len: " +
// parsedVAA.payload.length
// );
// logger.debug("listen:processVaa: parsedVAA: %o", parsedVAA);
if (isPyth(parsedVAA.payload)) {
if (parsedVAA.payload.length < helpers.PYTH_PRICE_ATTESTATION_LENGTH) {
logger.error(
"dropping vaa because the payload length is wrong: length: " +
parsedVAA.payload.length +
", expected length: " +
helpers.PYTH_PRICE_ATTESTATION_LENGTH +
", vaa: %o",
parsedVAA
);
return;
}
let pa = helpers.parsePythPriceAttestation(Buffer.from(parsedVAA.payload));
// logger.debug("listen:processVaa: price attestation: %o", pa);
let key = pa.priceId;
let lastSeqNum = seqMap.get(key);
if (lastSeqNum) {
if (lastSeqNum >= parsedVAA.sequence) {
logger.debug(
"ignoring duplicate: emitter: [" +
parsedVAA.emitter_chain +
":" +
uint8ArrayToHex(parsedVAA.emitter_address) +
"], productId: [" +
pa.productId +
"], priceId: [" +
pa.priceId +
"], seqNum: " +
parsedVAA.sequence
);
return;
}
}
seqMap.set(key, parsedVAA.sequence);
logger.info(
"received: emitter: [" +
parsedVAA.emitter_chain +
":" +
uint8ArrayToHex(parsedVAA.emitter_address) +
"], seqNum: " +
parsedVAA.sequence +
", productId: [" +
pa.productId +
"], priceId: [" +
pa.priceId +
"], priceType: " +
pa.priceType +
", price: " +
pa.price +
", exponent: " +
pa.exponent +
", confidenceInterval: " +
pa.confidenceInterval +
", timeStamp: " +
pa.timestamp +
", computedPrice: " +
helpers.computePrice(pa.price, pa.exponent) +
" +/-" +
helpers.computePrice(pa.confidenceInterval, pa.exponent)
// +
// ", payload: [" +
// uint8ArrayToHex(parsedVAA.payload) +
// "]"
);
metrics.incIncoming();
if (!listenOnly) {
logger.debug("posting to worker");
await postEvent(vaaBytes, pa, parsedVAA.sequence, receiveTime);
}
} else {
logger.debug(
"dropping non-pyth vaa, payload type " +
parsedVAA.payload[0] +
", vaa: %o",
parsedVAA
);
}
}
function isPyth(payload: Buffer): boolean {
if (payload.length < 4) return false;
if (
payload[0] === 80 &&
payload[1] === 50 &&
payload[2] === 87 &&
payload[3] === 72
) {
// P2WH
return true;
}
return false;
}

148
pyth_relay/src/promHelpers.ts Normal file

@ -0,0 +1,148 @@
import http = require("http");
import client = require("prom-client");
// NOTE: To create a new metric:
// 1) Create a private counter/gauge with appropriate name and help
// 2) Create a method to set the metric to a value
// 3) Register the metric
export class PromHelper {
private register = new client.Registry();
private walletReg = new client.Registry();
private collectDefaultMetrics = client.collectDefaultMetrics;
// Actual metrics
private seqNumGauge = new client.Gauge({
name: "seqNum",
help: "Last sent sequence number",
});
private successCounter = new client.Counter({
name: "successes",
help: "number of successful relays",
});
private failureCounter = new client.Counter({
name: "failures",
help: "number of failed relays",
});
private completeTime = new client.Histogram({
name: "complete_time",
help: "Time it took to complete the transfer",
buckets: [400, 800, 1600, 3200, 6400, 12800],
});
private walletBalance = new client.Gauge({
name: "wallet_balance",
help: "The wallet balance",
labelNames: ["timestamp"],
registers: [this.walletReg],
});
private listenCounter = new client.Counter({
name: "VAAs_received",
help: "number of Pyth VAAs received",
});
private alreadyExecutedCounter = new client.Counter({
name: "already_executed",
help: "number of transfers rejected due to already having been executed",
});
private transferTimeoutCounter = new client.Counter({
name: "transfer_timeout",
help: "number of transfers that timed out",
});
private seqNumMismatchCounter = new client.Counter({
name: "seq_num_mismatch",
help: "number of transfers that failed due to sequence number mismatch",
});
private retryCounter = new client.Counter({
name: "retries",
help: "number of retry attempts",
});
private retriesExceededCounter = new client.Counter({
name: "retries_exceeded",
help: "number of transfers that failed due to exceeding the retry count",
});
private insufficentFundsCounter = new client.Counter({
name: "insufficient_funds",
help: "number of transfers that failed due to insufficient funds",
});
// End metrics
private server = http.createServer(async (req, res) => {
if (req.url === "/metrics") {
// Return all metrics in the Prometheus exposition format
res.setHeader("Content-Type", this.register.contentType);
res.write(await this.register.metrics());
res.end(await this.walletReg.metrics());
}
});
constructor(name: string, port: number) {
this.register.setDefaultLabels({
app: name,
});
this.collectDefaultMetrics({ register: this.register });
// Register each metric
this.register.registerMetric(this.seqNumGauge);
this.register.registerMetric(this.successCounter);
this.register.registerMetric(this.failureCounter);
this.register.registerMetric(this.completeTime);
this.register.registerMetric(this.listenCounter);
this.register.registerMetric(this.alreadyExecutedCounter);
this.register.registerMetric(this.transferTimeoutCounter);
this.register.registerMetric(this.seqNumMismatchCounter);
this.register.registerMetric(this.retryCounter);
this.register.registerMetric(this.retriesExceededCounter);
this.register.registerMetric(this.insufficentFundsCounter);
// End registering metric
this.server.listen(port);
}
// These are the accessor methods for the metrics
setSeqNum(sn: number) {
this.seqNumGauge.set(sn);
}
incSuccesses() {
this.successCounter.inc();
}
incFailures() {
this.failureCounter.inc();
}
addCompleteTime(val: number) {
this.completeTime.observe(val);
}
setWalletBalance(bal: number) {
this.walletReg.clear();
// this.walletReg = new client.Registry();
this.walletBalance = new client.Gauge({
name: "wallet_balance",
help: "The wallet balance",
labelNames: ["timestamp"],
registers: [this.walletReg],
});
this.walletReg.registerMetric(this.walletBalance);
let now = new Date();
// this.walletDate = now.toString();
this.walletBalance.set({ timestamp: now.toString() }, bal);
// this.walletBalance.set(bal);
}
incIncoming() {
this.listenCounter.inc();
}
incAlreadyExec() {
this.alreadyExecutedCounter.inc();
}
incTransferTimeout() {
this.transferTimeoutCounter.inc();
}
incSeqNumMismatch() {
this.seqNumMismatchCounter.inc();
}
incRetries() {
this.retryCounter.inc();
}
incRetriesExceeded() {
this.retriesExceededCounter.inc();
}
incInsufficentFunds() {
this.insufficentFundsCounter.inc();
}
}

73
pyth_relay/src/relay/main.ts Normal file

@ -0,0 +1,73 @@
import {
connectToTerra,
queryBalanceOnTerra,
queryTerra,
relayTerra,
setAccountNumOnTerra,
setSeqNumOnTerra,
TerraConnectionData,
} from "./terra";
export type ConnectionData = {
terraData: TerraConnectionData;
};
import { logger } from "../helpers";
export function connectRelayer(): ConnectionData {
let td = connectToTerra();
return { terraData: td };
}
export async function setAccountNum(connectionData: ConnectionData) {
try {
await setAccountNumOnTerra(connectionData.terraData);
} catch (e) {
logger.error("setAccountNum: query failed: %o", e);
}
}
export async function setSeqNum(connectionData: ConnectionData) {
try {
await setSeqNumOnTerra(connectionData.terraData);
} catch (e) {
logger.error("setSeqNum: query failed: %o", e);
}
}
// Exceptions from this method are caught at the higher level.
export async function relay(
signedVAAs: Array<string>,
connectionData: ConnectionData
): Promise<any> {
return await relayTerra(connectionData.terraData, signedVAAs);
}
export async function query(
productIdStr: string,
priceIdStr: string
): Promise<any> {
let result: any;
try {
let terraData = connectToTerra();
result = await queryTerra(terraData, productIdStr, priceIdStr);
} catch (e) {
logger.error("query failed: %o", e);
result = "Error: unhandled exception";
}
return result;
}
export async function queryBalance(
connectionData: ConnectionData
): Promise<number> {
let balance: number = NaN;
try {
balance = await queryBalanceOnTerra(connectionData.terraData);
} catch (e) {
logger.error("balance query failed: %o", e);
}
return balance;
}

260
pyth_relay/src/relay/terra.ts Normal file

@ -0,0 +1,260 @@
import { fromUint8Array } from "js-base64";
import {
LCDClient,
LCDClientConfig,
MnemonicKey,
MsgExecuteContract,
} from "@terra-money/terra.js";
import { hexToUint8Array } from "@certusone/wormhole-sdk";
import { redeemOnTerra } from "@certusone/wormhole-sdk";
import { logger } from "../helpers";
export type TerraConnectionData = {
nodeUrl: string;
terraChainId: string;
terraName: string;
walletPrivateKey: string;
coin: string;
contractAddress: string;
lcdConfig: LCDClientConfig;
walletSeqNum: number;
walletAccountNum: number;
};
export function connectToTerra(): TerraConnectionData {
if (!process.env.TERRA_NODE_URL) {
throw "Missing environment variable TERRA_NODE_URL";
}
if (!process.env.TERRA_CHAIN_ID) {
throw "Missing environment variable TERRA_CHAIN_ID";
}
if (!process.env.TERRA_NAME) {
throw "Missing environment variable TERRA_NAME";
}
if (!process.env.TERRA_PRIVATE_KEY) {
throw "Missing environment variable TERRA_PRIVATE_KEY";
}
if (!process.env.TERRA_COIN) {
throw "Missing environment variable TERRA_COIN";
}
if (!process.env.TERRA_PYTH_CONTRACT_ADDRESS) {
throw "Missing environment variable TERRA_PYTH_CONTRACT_ADDRESS";
}
logger.info(
"Terra connection parameters: url: [" +
process.env.TERRA_NODE_URL +
"], terraChainId: [" +
process.env.TERRA_CHAIN_ID +
"], terraName: [" +
process.env.TERRA_NAME +
"], coin: [" +
process.env.TERRA_COIN +
"], contractAddress: [" +
process.env.TERRA_PYTH_CONTRACT_ADDRESS +
"]"
);
const lcdConfig = {
URL: process.env.TERRA_NODE_URL,
chainID: process.env.TERRA_CHAIN_ID,
name: process.env.TERRA_NAME,
};
return {
nodeUrl: process.env.TERRA_NODE_URL,
terraChainId: process.env.TERRA_CHAIN_ID,
terraName: process.env.TERRA_NAME,
walletPrivateKey: process.env.TERRA_PRIVATE_KEY,
coin: process.env.TERRA_COIN,
contractAddress: process.env.TERRA_PYTH_CONTRACT_ADDRESS,
lcdConfig: lcdConfig,
walletSeqNum: 0,
walletAccountNum: 0,
};
}
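The six copies of the check-and-throw pattern above could be collapsed into a helper. A minimal sketch (`requireEnv` is an illustrative name, not part of the relayer):

```typescript
// Hypothetical helper: read a required environment variable or fail loudly.
function requireEnv(name: string): string {
  const value = process.env[name];
  if (value === undefined || value === "") {
    throw new Error("Missing environment variable " + name);
  }
  return value;
}

// connectToTerra could then read each parameter in one line, e.g.:
// const nodeUrl = requireEnv("TERRA_NODE_URL");
```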
export async function relayTerra(
connectionData: TerraConnectionData,
signedVAAs: Array<string>
) {
logger.debug("relaying " + signedVAAs.length + " messages to terra");
logger.debug("TIME: connecting to terra");
const lcdClient = new LCDClient(connectionData.lcdConfig);
const mk = new MnemonicKey({
mnemonic: connectionData.walletPrivateKey,
});
const wallet = lcdClient.wallet(mk);
logger.debug("TIME: creating messages");
let msgs = new Array<MsgExecuteContract>();
for (let idx = 0; idx < signedVAAs.length; ++idx) {
const msg = new MsgExecuteContract(
wallet.key.accAddress,
connectionData.contractAddress,
{
submit_vaa: {
data: Buffer.from(signedVAAs[idx], "hex").toString("base64"),
},
}
);
msgs.push(msg);
}
// logger.debug("TIME: looking up gas");
// //Alternate FCD methodology
// //let gasPrices = await axios.get("http://localhost:3060/v1/txs/gas_prices").then((result) => result.data);
// const gasPrices = lcdClient.config.gasPrices;
// logger.debug("TIME: estimating fees");
// //const walletSequence = await wallet.sequence();
// const feeEstimate = await lcdClient.tx.estimateFee(
// wallet.key.accAddress,
// msgs,
// {
// //TODO figure out type mismatch
// feeDenoms: [connectionData.coin],
// gasPrices,
// }
// );
logger.debug(
"TIME: creating transaction using seq number " +
connectionData.walletSeqNum +
" and account number " +
connectionData.walletAccountNum
);
const tx = await wallet.createAndSignTx({
sequence: connectionData.walletSeqNum,
accountNumber: connectionData.walletAccountNum,
msgs: msgs,
memo: "P2T",
feeDenoms: [connectionData.coin],
});
connectionData.walletSeqNum = connectionData.walletSeqNum + 1;
logger.debug("TIME: sending msg");
const receipt = await lcdClient.tx.broadcastSync(tx);
logger.debug("TIME: submitted to terra: receipt: %o", receipt);
return receipt;
}
export async function queryTerra(
connectionData: TerraConnectionData,
productIdStr: string,
priceIdStr: string
) {
const encodedProductId = fromUint8Array(hexToUint8Array(productIdStr));
const encodedPriceId = fromUint8Array(hexToUint8Array(priceIdStr));
logger.info(
"Querying terra for price info for productId [" +
productIdStr +
"], encoded as [" +
encodedProductId +
"], priceId [" +
priceIdStr +
"], encoded as [" +
encodedPriceId +
"]"
);
const lcdClient = new LCDClient(connectionData.lcdConfig);
const mk = new MnemonicKey({
mnemonic: connectionData.walletPrivateKey,
});
const wallet = lcdClient.wallet(mk);
const query_result = await lcdClient.wasm.contractQuery(
connectionData.contractAddress,
{
price_info: {
product_id: encodedProductId,
price_id: encodedPriceId,
},
}
);
logger.debug("queryTerra: query returned: %o", query_result);
return query_result;
}
export async function queryBalanceOnTerra(connectionData: TerraConnectionData) {
const lcdClient = new LCDClient(connectionData.lcdConfig);
const mk = new MnemonicKey({
mnemonic: connectionData.walletPrivateKey,
});
const wallet = lcdClient.wallet(mk);
let balance: number = NaN;
try {
logger.debug("querying wallet balance");
let coins: any;
let pagination: any;
[coins, pagination] = await lcdClient.bank.balance(wallet.key.accAddress);
logger.debug("wallet query returned: %o", coins);
if (coins) {
let coin = coins.get(connectionData.coin);
if (coin) {
balance = parseInt(coin.toData().amount);
} else {
logger.error(
"failed to query coin balance, coin [" +
connectionData.coin +
"] is not in the wallet, coins: %o",
coins
);
}
} else {
logger.error("failed to query coin balance!");
}
} catch (e) {
logger.error("failed to query coin balance: %o", e);
}
return balance;
}
export async function setAccountNumOnTerra(
connectionData: TerraConnectionData
) {
const lcdClient = new LCDClient(connectionData.lcdConfig);
const mk = new MnemonicKey({
mnemonic: connectionData.walletPrivateKey,
});
const wallet = lcdClient.wallet(mk);
logger.debug("getting wallet account num");
connectionData.walletAccountNum = await wallet.accountNumber();
logger.debug("wallet account num is " + connectionData.walletAccountNum);
}
export async function setSeqNumOnTerra(connectionData: TerraConnectionData) {
const lcdClient = new LCDClient(connectionData.lcdConfig);
const mk = new MnemonicKey({
mnemonic: connectionData.walletPrivateKey,
});
const wallet = lcdClient.wallet(mk);
logger.debug("getting wallet seq num");
connectionData.walletSeqNum = await wallet.sequence();
logger.debug("wallet seq num is " + connectionData.walletSeqNum);
}
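relayTerra re-encodes each hex-encoded signed VAA as the base64 the contract's `submit_vaa` message expects. That conversion is pure and easy to check in isolation (`hexVaaToBase64` is an illustrative name; the relayer does this inline):

```typescript
// Re-encode a hex-encoded signed VAA as base64, the same conversion
// relayTerra performs inline when building each MsgExecuteContract.
function hexVaaToBase64(hexStr: string): string {
  return Buffer.from(hexStr, "hex").toString("base64");
}
```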

pyth_relay/src/rest.ts

@@ -0,0 +1,48 @@
import { Request, Response } from "express";
import { logger } from "./helpers";
import { getStatus, getPriceData } from "./worker";
let restPort: number = 0;
export function init(runRest: boolean): boolean {
if (!runRest) return true;
if (!process.env.REST_PORT) return true;
restPort = parseInt(process.env.REST_PORT);
return true;
}
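`init` accepts whatever `parseInt` returns, so a non-numeric `REST_PORT` silently becomes `NaN` and the server never starts. A hedged sketch of stricter port parsing (`parsePort` is illustrative, not in the codebase):

```typescript
// Return the port as a number, or null if the string is not a valid TCP port.
function parsePort(raw: string | undefined): number | null {
  if (!raw) return null;
  const port = parseInt(raw, 10);
  if (isNaN(port) || port <= 0 || port > 65535) return null;
  return port;
}
```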
export async function run() {
if (restPort == 0) return;
const express = require("express");
const cors = require("cors");
const app = express();
app.use(cors());
app.listen(restPort, () =>
logger.debug("listening on REST port " + restPort)
);
(async () => {
app.get("/status", async (req: Request, res: Response) => {
let result = await getStatus();
res.json(result);
});
app.get(
"/queryterra/:product_id/:price_id",
async (req: Request, res: Response) => {
let result = await getPriceData(
req.params.product_id,
req.params.price_id
);
res.json(result);
}
);
app.get("/", (req: Request, res: Response) =>
res.json(["/status", "/queryterra/<product_id>/<price_id>"])
);
})();
}

pyth_relay/src/worker.ts

@@ -0,0 +1,571 @@
import { Mutex } from "async-mutex";
let CondVar = require("condition-variable");
import { setDefaultWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
import { uint8ArrayToHex } from "@certusone/wormhole-sdk";
import * as helpers from "./helpers";
import { logger } from "./helpers";
import * as main from "./relay/main";
import { PromHelper } from "./promHelpers";
const mutex = new Mutex();
let condition = new CondVar();
let conditionTimeout = 20000;
type PendingPayload = {
vaa_bytes: string;
pa: helpers.PythPriceAttestation;
receiveTime: Date;
seqNum: number;
};
let pendingMap = new Map<string, PendingPayload>(); // The key to this is price_id. Note that Map maintains insertion order, not key order.
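The insertion-order property noted in the comment is what lets the worker drain pendingMap oldest-first, and re-posting an existing price_id updates the entry without moving it to the back. A quick demonstration:

```typescript
// Map iterates in insertion order; setting an existing key updates the
// value but keeps the key's original position.
const demo = new Map<string, string>();
demo.set("first", "a");
demo.set("second", "b");
demo.set("first", "updated"); // does not move "first" to the back
const keys = Array.from(demo.keys());
```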
type ProductData = {
key: string;
lastTimePublished: Date;
numTimesPublished: number;
lastPa: helpers.PythPriceAttestation;
lastResult: any;
};
type CurrentEntry = {
pendingEntry: PendingPayload;
currObj: ProductData;
};
let productMap = new Map<string, ProductData>(); // The key to this is price_id
let connectionData: main.ConnectionData;
let metrics: PromHelper;
let nextBalanceQueryTimeAsMs: number = 0;
let balanceQueryInterval = 0;
let walletTimeStamp: Date;
let maxPerBatch: number = 1;
let maxAttempts: number = 2;
let retryDelayInMs: number = 0;
export function init(runWorker: boolean): boolean {
if (!runWorker) return true;
try {
connectionData = main.connectRelayer();
} catch (e) {
logger.error("failed to load connection config: %o", e);
return false;
}
if (process.env.MAX_MSGS_PER_BATCH) {
maxPerBatch = parseInt(process.env.MAX_MSGS_PER_BATCH);
}
if (maxPerBatch <= 0) {
logger.error(
"Environment variable MAX_MSGS_PER_BATCH has an invalid value of " +
maxPerBatch +
", must be greater than zero."
);
return false;
}
if (process.env.RETRY_MAX_ATTEMPTS) {
maxAttempts = parseInt(process.env.RETRY_MAX_ATTEMPTS);
}
if (maxAttempts <= 0) {
logger.error(
"Environment variable RETRY_MAX_ATTEMPTS has an invalid value of " +
maxAttempts +
", must be greater than zero."
);
return false;
}
if (process.env.RETRY_DELAY_IN_MS) {
retryDelayInMs = parseInt(process.env.RETRY_DELAY_IN_MS);
}
if (retryDelayInMs < 0) {
logger.error(
"Environment variable RETRY_DELAY_IN_MS has an invalid value of " +
retryDelayInMs +
", must be positive or zero."
);
return false;
}
return true;
}
export async function run(met: PromHelper) {
setDefaultWasm("node");
metrics = met;
await mutex.runExclusive(async () => {
logger.info(
"will attempt to relay each pyth message at most " +
maxAttempts +
" times, with a delay of " +
retryDelayInMs +
" milliseconds between attempts, will batch up to " +
maxPerBatch +
" pyth messages in a batch"
);
if (process.env.BAL_QUERY_INTERVAL) {
balanceQueryInterval = parseInt(process.env.BAL_QUERY_INTERVAL);
}
await main.setAccountNum(connectionData);
logger.info(
"wallet account number is " + connectionData.terraData.walletAccountNum
);
await main.setSeqNum(connectionData);
logger.info(
"initial wallet sequence number is " +
connectionData.terraData.walletSeqNum
);
let balance = await main.queryBalance(connectionData);
if (!isNaN(balance)) {
walletTimeStamp = new Date();
}
if (balanceQueryInterval !== 0) {
logger.info(
"initial wallet balance is " +
balance +
", will query every " +
balanceQueryInterval +
" milliseconds."
);
metrics.setWalletBalance(balance);
nextBalanceQueryTimeAsMs = new Date().getTime() + balanceQueryInterval;
} else {
logger.info("initial wallet balance is " + balance);
metrics.setWalletBalance(balance);
}
await condition.wait(computeTimeout(), callBack);
});
}
async function callBack(err: any, result: any) {
logger.debug(
"entering callback, pendingEvents: " +
pendingMap.size +
", err: %o, result: %o",
err,
result
);
// condition = null;
// await helpers.sleep(10000);
// logger.debug("done with long sleep");
let done = false;
do {
let currObjs = new Array<CurrentEntry>();
let messages = new Array<string>();
await mutex.runExclusive(async () => {
condition = null;
logger.debug("in callback, getting pending events.");
await getPendingEventsAlreadyLocked(currObjs, messages);
if (currObjs.length === 0) {
done = true;
condition = new CondVar();
await condition.wait(computeTimeout(), callBack);
}
});
if (currObjs.length !== 0) {
logger.debug("in callback, relaying " + currObjs.length + " events.");
let sendTime = new Date();
let retVal: number;
let relayResult: any;
[retVal, relayResult] = await relayEventsNotLocked(messages);
await mutex.runExclusive(async () => {
logger.debug("in callback, finalizing " + currObjs.length + " events.");
await finalizeEventsAlreadyLocked(
currObjs,
retVal,
relayResult,
sendTime
);
if (pendingMap.size === 0) {
logger.debug("in callback, rearming the condition.");
done = true;
condition = new CondVar();
await condition.wait(computeTimeout(), callBack);
}
});
}
} while (!done);
logger.debug("leaving callback.");
}
function computeTimeout(): number {
if (balanceQueryInterval !== 0) {
let now = new Date().getTime();
if (now < nextBalanceQueryTimeAsMs) {
return nextBalanceQueryTimeAsMs - now;
}
return 0;
}
return conditionTimeout;
}
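The deadline arithmetic in computeTimeout can be factored into a pure function and checked directly (the standalone signature below is illustrative; the real function reads module-level state):

```typescript
// Pure version of computeTimeout: wait until the next balance query is due,
// or fall back to the default condition-variable timeout when balance
// queries are disabled (interval of 0).
function computeTimeoutPure(
  nowMs: number,
  nextBalanceQueryMs: number,
  balanceIntervalMs: number,
  defaultTimeoutMs: number
): number {
  if (balanceIntervalMs !== 0) {
    return nowMs < nextBalanceQueryMs ? nextBalanceQueryMs - nowMs : 0;
  }
  return defaultTimeoutMs;
}
```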
async function getPendingEventsAlreadyLocked(
currObjs: Array<CurrentEntry>,
messages: Array<string>
) {
while (pendingMap.size !== 0 && currObjs.length < maxPerBatch) {
const first = pendingMap.entries().next();
logger.debug("processing event with key [" + first.value[0] + "]");
const pendingValue = first.value[1];
let pendingKey = pendingValue.pa.priceId;
let currObj = productMap.get(pendingKey);
if (currObj) {
currObj.lastPa = pendingValue.pa;
currObj.lastTimePublished = new Date();
productMap.set(pendingKey, currObj);
logger.debug(
"processing update " +
currObj.numTimesPublished +
" for [" +
pendingKey +
"], seq num " +
pendingValue.seqNum
);
} else {
logger.debug(
"processing first update for [" +
pendingKey +
"], seq num " +
pendingValue.seqNum
);
currObj = {
key: pendingKey,
lastPa: pendingValue.pa,
lastTimePublished: new Date(),
numTimesPublished: 0,
lastResult: "",
};
productMap.set(pendingKey, currObj);
}
currObjs.push({ pendingEntry: pendingValue, currObj: currObj });
messages.push(pendingValue.vaa_bytes);
pendingMap.delete(first.value[0]);
}
}
const RELAY_SUCCESS: number = 0;
const RELAY_FAIL: number = 1;
const RELAY_ALREADY_EXECUTED: number = 2;
const RELAY_TIMEOUT: number = 3;
const RELAY_SEQ_NUM_MISMATCH: number = 4;
const RELAY_INSUFFICIENT_FUNDS: number = 5;
async function relayEventsNotLocked(
messages: Array<string>
): Promise<[number, any]> {
let retVal: number = RELAY_SUCCESS;
let relayResult: any;
let retry: boolean = false;
for (let attempt = 0; attempt < maxAttempts; ++attempt) {
retVal = RELAY_SUCCESS;
retry = false;
try {
relayResult = await main.relay(messages, connectionData);
if (relayResult.txhash) {
if (
relayResult.raw_log &&
relayResult.raw_log.search("VaaAlreadyExecuted") >= 0
) {
relayResult = "Already Executed: " + relayResult.txhash;
retVal = RELAY_ALREADY_EXECUTED;
} else if (
relayResult.raw_log &&
relayResult.raw_log.search("insufficient funds") >= 0
) {
logger.error(
"relay failed due to insufficient funds: %o",
relayResult
);
connectionData.terraData.walletSeqNum =
connectionData.terraData.walletSeqNum - 1;
retVal = RELAY_INSUFFICIENT_FUNDS;
} else if (
relayResult.raw_log &&
relayResult.raw_log.search("failed") >= 0
) {
logger.error("relay seems to have failed: %o", relayResult);
retVal = RELAY_FAIL;
retry = true;
} else {
relayResult = relayResult.txhash;
}
} else {
retVal = RELAY_FAIL;
retry = true;
if (relayResult.message) {
relayResult = relayResult.message;
} else {
logger.error("No txhash: %o", relayResult);
relayResult = "No txhash";
}
}
} catch (e: any) {
if (
e.message &&
e.message.search("timeout") >= 0 &&
e.message.search("exceeded") >= 0
) {
logger.error("relay timed out: %o", e);
retVal = RELAY_TIMEOUT;
retry = true;
} else {
logger.error("relay failed: %o", e);
if (e.response && e.response.data) {
if (
e.response.data.error &&
e.response.data.error.search("VaaAlreadyExecuted") >= 0
) {
relayResult = "Already Executed";
retVal = RELAY_ALREADY_EXECUTED;
} else if (
e.response.data.message &&
e.response.data.message.search("account sequence mismatch") >= 0
) {
relayResult = e.response.data.message;
retVal = RELAY_SEQ_NUM_MISMATCH;
retry = true;
logger.debug(
"wallet sequence number is out of sync, querying the current value"
);
await main.setSeqNum(connectionData);
logger.info(
"wallet seq number is now " +
connectionData.terraData.walletSeqNum
);
} else {
retVal = RELAY_FAIL;
retry = true;
if (e.message) {
relayResult = "Error: " + e.message;
} else {
relayResult = "Error: unexpected exception";
}
}
} else {
retVal = RELAY_FAIL;
retry = true;
if (e.message) {
relayResult = "Error: " + e.message;
} else {
relayResult = "Error: unexpected exception";
}
}
}
}
logger.debug(
"relay attempt complete: retVal: " +
retVal +
", retry: " +
retry +
", attempt " +
(attempt + 1) +
" of " +
maxAttempts
);
if (!retry) {
break;
} else {
metrics.incRetries();
if (retryDelayInMs != 0) {
logger.debug(
"delaying for " + retryDelayInMs + " milliseconds before retrying"
);
await helpers.sleep(retryDelayInMs * (attempt + 1));
}
}
}
if (retry) {
logger.error("failed to relay batch, retry count exceeded!");
metrics.incRetriesExceeded();
}
return [retVal, relayResult];
}
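Note that the retry delay grows linearly: the code sleeps `retryDelayInMs * (attempt + 1)`, so with `RETRY_DELAY_IN_MS=500` the waits are 500 ms, 1000 ms, 1500 ms, and so on. As a pure helper (illustrative name, not in the codebase):

```typescript
// Linear backoff used between relay attempts (attempt is zero-based).
function retryDelayMs(baseDelayMs: number, attempt: number): number {
  return baseDelayMs * (attempt + 1);
}
```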
async function finalizeEventsAlreadyLocked(
currObjs: Array<CurrentEntry>,
retVal: number,
relayResult: any,
sendTime: Date
) {
for (let idx = 0; idx < currObjs.length; ++idx) {
let currObj = currObjs[idx].currObj;
let currEntry = currObjs[idx].pendingEntry;
currObj.lastResult = relayResult;
currObj.numTimesPublished = currObj.numTimesPublished + 1;
if (retVal == RELAY_SUCCESS) {
metrics.incSuccesses();
} else if (retVal == RELAY_ALREADY_EXECUTED) {
metrics.incAlreadyExec();
} else if (retVal == RELAY_TIMEOUT) {
metrics.incTransferTimeout();
metrics.incFailures();
} else if (retVal == RELAY_SEQ_NUM_MISMATCH) {
metrics.incSeqNumMismatch();
metrics.incFailures();
} else if (retVal == RELAY_INSUFFICIENT_FUNDS) {
metrics.incInsufficentFunds();
metrics.incFailures();
} else {
metrics.incFailures();
}
productMap.set(currObj.key, currObj);
let completeTime = new Date();
metrics.setSeqNum(currEntry.seqNum);
metrics.addCompleteTime(
completeTime.getTime() - currEntry.receiveTime.getTime()
);
logger.info(
"complete: priceId: " +
currEntry.pa.priceId +
", seqNum: " +
currEntry.seqNum +
", price: " +
helpers.computePrice(currEntry.pa.price, currEntry.pa.exponent) +
", ci: " +
helpers.computePrice(
currEntry.pa.confidenceInterval,
currEntry.pa.exponent
) +
", rcv2SendBegin: " +
(sendTime.getTime() - currEntry.receiveTime.getTime()) +
", rcv2SendComplete: " +
(completeTime.getTime() - currEntry.receiveTime.getTime()) +
", totalSends: " +
currObj.numTimesPublished +
", result: " +
relayResult
);
}
let now = new Date();
if (balanceQueryInterval > 0 && now.getTime() >= nextBalanceQueryTimeAsMs) {
let balance = await main.queryBalance(connectionData);
if (isNaN(balance)) {
logger.error("failed to query wallet balance!");
} else {
walletTimeStamp = new Date();
logger.info(
"wallet balance: " +
balance +
", update time: " +
walletTimeStamp.toISOString()
);
metrics.setWalletBalance(balance);
}
nextBalanceQueryTimeAsMs = now.getTime() + balanceQueryInterval;
}
}
export async function postEvent(
vaaBytes: any,
pa: helpers.PythPriceAttestation,
sequence: number,
receiveTime: Date
) {
let event: PendingPayload = {
vaa_bytes: uint8ArrayToHex(vaaBytes),
pa: pa,
receiveTime: receiveTime,
seqNum: sequence,
};
let pendingKey = pa.priceId;
// pendingKey = pendingKey + ":" + sequence;
await mutex.runExclusive(() => {
logger.debug("posting event with key [" + pendingKey + "]");
pendingMap.set(pendingKey, event);
if (condition) {
logger.debug("hitting condition variable.");
condition.complete(true);
}
});
}
export async function getStatus() {
let result = "[";
await mutex.runExclusive(() => {
let first: boolean = true;
for (let [key, value] of productMap) {
if (first) {
first = false;
} else {
result = result + ", ";
}
let item: object = {
product_id: value.lastPa.productId,
price_id: value.lastPa.priceId,
price: helpers.computePrice(value.lastPa.price, value.lastPa.exponent),
ci: helpers.computePrice(
value.lastPa.confidenceInterval,
value.lastPa.exponent
),
num_times_published: value.numTimesPublished,
last_time_published: value.lastTimePublished.toISOString(),
result: value.lastResult,
};
result = result + JSON.stringify(item);
}
});
result = result + "]";
return result;
}
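getStatus assembles the JSON array by string concatenation with manual comma bookkeeping; an equivalent sketch collects the items into an array and stringifies once (shown with a simplified item shape):

```typescript
// Build the same "[{...},{...}]" payload by serializing an array of plain
// objects instead of concatenating strings and separators by hand.
function buildStatusJson(items: object[]): string {
  return JSON.stringify(items);
}
```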
// Note that querying the contract does not update the sequence number, so we don't need to be locked.
export async function getPriceData(
productId: string,
priceId: string
): Promise<any> {
let result: any;
// await mutex.runExclusive(async () => {
result = await main.query(productId, priceId);
// });
return result;
}

pyth_relay/tsconfig.json

@@ -0,0 +1,20 @@
{
"compilerOptions": {
"outDir": "lib",
"target": "esnext",
"module": "commonjs",
"moduleResolution": "node",
"lib": ["es2019"],
"skipLibCheck": true,
"allowJs": true,
"alwaysStrict": true,
"strict": true,
"forceConsistentCasingInFileNames": true,
"noFallthroughCasesInSwitch": true,
"resolveJsonModule": true,
"isolatedModules": true,
"downlevelIteration": true
},
"include": ["src"],
"exclude": ["node_modules", "**/__tests__/*"]
}


@@ -7,9 +7,8 @@
"build": "tsc",
"spy_relay": "node lib/spy_relay.js",
"listen_only": "node lib/spy_relay.js --listen_only",
"worker_only": "node lib/spy_relay.js --worker_only",
"rest_only": "node lib/spy_relay.js --rest_only",
"test": "jest --config jestconfig.json --verbose"
"spy_rest": "node lib/spy_rest.js",
"spy_worker": "node lib/spy_worker.js"
},
"author": "",
"license": "Apache-2.0",


@@ -1,80 +1,204 @@
import { setDefaultWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
import {
createSpyRPCServiceClient,
subscribeSignedVAA,
} from "@certusone/wormhole-spydk";
import { spy_listen } from "./spy_listen";
import { spy_worker } from "./spy_worker";
import { spy_rest } from "./spy_rest";
import {
ChainId,
CHAIN_ID_SOLANA,
CHAIN_ID_TERRA,
hexToUint8Array,
uint8ArrayToHex,
parseTransferPayload,
getEmitterAddressEth,
getEmitterAddressSolana,
getEmitterAddressTerra,
} from "@certusone/wormhole-sdk";
import {
importCoreWasm,
setDefaultWasm,
} from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
// import { storeKeyFromParsedVAA, storePayloadFromVaaBytes } from "./helpers";
import * as helpers from "./helpers";
import { RelayerEnvironment, validateEnvironment } from "./configureEnv";
require("dotenv").config();
var pendingMap = new Map<string, string>();
pendingMap.set("XXX", "XXX should be first");
pendingMap.set("CCC", "CCC should be second");
pendingMap.set("XXX", "XXX should still be first");
pendingMap.set("AAA", "AAA should be third");
setDefaultWasm("node");
for (let [pk, pendingValue] of pendingMap) {
console.log("key: [" + pk + "], value: [" + pendingValue + "]");
}
const env: RelayerEnvironment = validateEnvironment();
while (pendingMap.size !== 0) {
const first = pendingMap.entries().next();
var runListen: boolean = true;
var runWorker: boolean = true;
var runRest: boolean = true;
var foundOne: boolean = false;
console.log(
"deleting first item, which is: key: [" +
first.value[0] +
"], value: [" +
first.value[1] +
"]"
);
pendingMap.delete(first.value[0]);
for (let [pk, pendingValue] of pendingMap) {
console.log("key: [" + pk + "], value: [" + pendingValue + "]");
}
}
var listenOnly: boolean = false;
for (let idx = 0; idx < process.argv.length; ++idx) {
if (process.argv[idx] === "--listen_only") {
if (foundOne) {
console.error(
'May only specify one of "--listen_only", "--worker_only" or "--rest_only"'
);
process.exit(1);
}
runWorker = false;
runRest = false;
foundOne = true;
}
if (process.argv[idx] === "--worker_only") {
if (foundOne) {
console.error(
'May only specify one of "--listen_only", "--worker_only" or "--rest_only"'
);
process.exit(1);
}
runListen = false;
runRest = false;
foundOne = true;
}
if (process.argv[idx] === "--rest_only") {
if (foundOne) {
console.error(
'May only specify one of "--listen_only", "--worker_only" or "--rest_only"'
);
process.exit(1);
}
runListen = false;
runWorker = false;
foundOne = true;
console.log("running in listen only mode, will not forward to redis");
listenOnly = true;
break;
}
}
// Start the spy listener to listen to the guardians.
if (runListen) {
spy_listen();
require("dotenv").config();
if (!process.env.SPY_SERVICE_HOST) {
console.error("Missing environment variable SPY_SERVICE_HOST");
process.exit(1);
}
// Start the spy worker to process VAAs from the store.
if (runWorker) {
spy_worker();
// Connect to redis
import { createClient } from "redis";
async function connectToRedis() {
var rClient = createClient();
rClient.on("error", (err) => console.log("Redis Client Error", err));
rClient.on("connect", () => console.log("Redis Client Connected"));
console.log("Attempting to connect...");
await rClient.connect();
return rClient;
}
// Start the REST server, if configured.
if (runRest && process.env.SPY_REST_PORT) {
var restPort = parseInt(process.env.SPY_REST_PORT);
if (!restPort) {
console.error(
"Environment variable SPY_REST_PORT is set to [%s], which is not a valid port number.",
process.env.SPY_REST_PORT
async function storeInRedis(
client: typeof myRedisClient,
name: string,
value: string
) {
await (await client).set(name, value);
}
if (false) {
var myRedisClient;
if (!listenOnly) {
myRedisClient = connectToRedis();
}
console.log(
"spy_relay starting up, will listen for signed VAAs from [%s]",
process.env.SPY_SERVICE_HOST
);
setDefaultWasm("node");
(async () => {
var filter = {};
if (process.env.SPY_SERVICE_FILTERS) {
const parsedJsonFilters = eval(process.env.SPY_SERVICE_FILTERS);
var myFilters = [];
for (var i = 0; i < parsedJsonFilters.length; i++) {
var myChainId = parseInt(parsedJsonFilters[i].chain_id) as ChainId;
var myEmitterAddress = await encodeEmitterAddress(
myChainId,
parsedJsonFilters[i].emitter_address
);
var myEmitterFilter = {
emitterFilter: {
chainId: myChainId,
emitterAddress: myEmitterAddress,
},
};
console.log(
"adding filter: chainId: [%i], emitterAddress: [%s]",
myEmitterFilter.emitterFilter.chainId,
myEmitterFilter.emitterFilter.emitterAddress
);
myFilters.push(myEmitterFilter);
}
console.log("setting", myFilters.length, "filters");
filter = {
filters: myFilters,
};
} else {
console.log("processing all signed VAAs");
}
const client = createSpyRPCServiceClient(process.env.SPY_SERVICE_HOST);
const stream = await subscribeSignedVAA(client, filter);
const { parse_vaa } = await importCoreWasm();
stream.on("data", ({ vaaBytes }) => {
processVaa(parse_vaa, vaaBytes);
});
console.log("spy_relay waiting for transfer signed VAAs");
})();
}
async function encodeEmitterAddress(
myChainId: ChainId,
emitterAddressStr: string
): Promise<string> {
if (myChainId === CHAIN_ID_SOLANA) {
return await getEmitterAddressSolana(emitterAddressStr);
}
if (myChainId === CHAIN_ID_TERRA) {
return await getEmitterAddressTerra(emitterAddressStr);
}
return getEmitterAddressEth(emitterAddressStr);
}
function processVaa(parse_vaa, vaaBytes) {
// console.log(vaaBytes);
const parsedVAA = parse_vaa(hexToUint8Array(vaaBytes));
// console.log(parsedVAA);
if (parsedVAA.payload[0] === 1) {
if (!listenOnly) {
var storeKey = helpers.storeKeyFromParsedVAA(parsedVAA);
var storePayload = helpers.storePayloadFromVaaBytes(vaaBytes);
// console.log("storeKey: ", helpers.storeKeyToJson(storeKey));
// console.log("storePayload: ", helpers.storePayloadToJson(storePayload));
var newStoreKey = helpers.storeKeyFromJson(JSON.stringify(storeKey));
var newStorePayload = helpers.storePayloadFromJson(
JSON.stringify(storePayload)
);
// console.log("newStoreKey: ", newStoreKey);
// console.log("newStorePayload: ", newStorePayload);
storeInRedis(
myRedisClient,
helpers.storeKeyToJson(storeKey),
helpers.storePayloadToJson(storePayload)
);
}
var transferPayload = parseTransferPayload(Buffer.from(parsedVAA.payload));
console.log(
"transfer: emitter: [%d:%s], seqNum: %d, payload: origin: [%d:%s], target: [%d:%s], amount: %d",
parsedVAA.emitter_chain,
uint8ArrayToHex(parsedVAA.emitter_address),
parsedVAA.sequence,
transferPayload.originChain,
transferPayload.originAddress,
transferPayload.targetChain,
transferPayload.targetAddress,
transferPayload.amount
);
process.exit(1);
// console.log(transferPayload);
}
spy_rest(restPort);
}