Spy relay (#15)

* Initial spy relayer

* Adding "scan" functionality

* try to may spy_rest send to rest_relay

* Refactor into a single process

* Create docker image

* Update README.md

* Have a consistently formatted file

* Remove extraneous err

* remove extraneous error

* Changed keys to scanIterator

* Refactor async/await code

* Add ability to only run certain features

* Fix Broken listener

* Process pyth messages

* Fix async problem

* Initial attempt to submit transfers

* relay method needs an await

* Make redis client local

* Added some constants

* Relay to EVM, Solana and Terra

* Make terra chainID come from the env

* Make relay method figure out the target chain

* Adding worker code

* Start of integration tests

* Eth to Solana integration test

* ETH->Sol test querying of spy relay is wrong

* Upgrade to sdk version 1.4

* Add Eth to Terra test

* Tested multi thread scenario

* Beef up tests to check if redeemed

* Use sleep in helpers

* Make relay verify the redeem happened

* Storing of pyth in redis needs to await

* Handle duplicates

* Remove obsolete pyth stuff

* Update README.md file

* Make listen_only not use redis

Co-authored-by: Paul Noel <panoel007@gmail.com>
Co-authored-by: Bruce Riley <95238258+brucer1963@users.noreply.github.com>
Co-authored-by: Paul Noel <35237584+panoel@users.noreply.github.com>
This commit is contained in:
bruce-riley 2022-02-09 08:47:45 -06:00 committed by GitHub
parent dc55817d6f
commit f6f7e8fa65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 96 additions and 174 deletions

View File

@ -1,7 +1,8 @@
In order to compile spy_relay you need to do:
```
npm install redis
npm ci
```
In order to run spy_relay successfully you need to do:

View File

@ -7,8 +7,10 @@
"build": "tsc",
"spy_relay": "node lib/spy_relay.js",
"listen_only": "node lib/spy_relay.js --listen_only",
"spy_rest": "node lib/spy_rest.js",
"spy_worker": "node lib/spy_worker.js"
"worker_only": "node lib/spy_relay.js --worker_only",
"rest_only": "node lib/spy_relay.js --rest_only",
"test": "jest --config jestconfig.json --verbose"
},
"author": "",
"license": "Apache-2.0",

View File

@ -23,7 +23,8 @@ import { importCoreWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
import * as helpers from "./helpers";
export async function spy_listen() {
export async function spy_listen(useRedis: boolean) {
require("dotenv").config();
if (!process.env.SPY_SERVICE_HOST) {
console.error("Missing environment variable SPY_SERVICE_HOST");
@ -79,7 +80,7 @@ export async function spy_listen() {
const stream = await subscribeSignedVAA(client, filter);
stream.on("data", ({ vaaBytes }) => {
processVaa(vaaBytes);
processVaa(vaaBytes, useRedis);
});
console.log("spy_relay waiting for transfer signed VAAs");
@ -101,21 +102,13 @@ async function encodeEmitterAddress(
return getEmitterAddressEth(emitterAddressStr);
}
async function processVaa(vaaBytes) {
async function processVaa(vaaBytes, useRedis: boolean) {
// console.log("processVaa");
console.log(vaaBytes);
const { parse_vaa } = await importCoreWasm();
const parsedVAA = parse_vaa(hexToUint8Array(vaaBytes));
console.log(parsedVAA);
// Connect to redis
const myRedisClient = await connectToRedis();
if (myRedisClient) {
console.log("Got a valid client from connect");
} else {
console.error("Invalid client from connect");
return;
}
if (parsedVAA.payload[0] === 1) {
var storeKey = helpers.storeKeyFromParsedVAA(parsedVAA);
var storePayload = helpers.storePayloadFromVaaBytes(vaaBytes);
@ -128,12 +121,26 @@ async function processVaa(vaaBytes) {
storeKey.sequence,
helpers.storePayloadToJson(storePayload)
);
await storeInRedis(
myRedisClient,
helpers.storeKeyToJson(storeKey),
helpers.storePayloadToJson(storePayload)
);
console.log("Finished storing in redis.");
if (useRedis) {
// Connect to redis
const myRedisClient = await connectToRedis();
if (myRedisClient) {
console.log("Got a valid client from connect");
} else {
console.error("Invalid client from connect");
return;
}
await storeInRedis(
myRedisClient,
helpers.storeKeyToJson(storeKey),
helpers.storePayloadToJson(storePayload)
);
console.log("Finished storing in redis.");
await myRedisClient.quit();
}
var transferPayload = parseTransferPayload(Buffer.from(parsedVAA.payload));
console.log(
@ -167,7 +174,6 @@ async function processVaa(vaaBytes) {
parsedVAA
);
}
await myRedisClient.quit();
}
async function connectToRedis() {

View File

@ -1,27 +1,10 @@
import {
createSpyRPCServiceClient,
subscribeSignedVAA,
} from "@certusone/wormhole-spydk";
import { setDefaultWasm } from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
import {
ChainId,
CHAIN_ID_SOLANA,
CHAIN_ID_TERRA,
hexToUint8Array,
uint8ArrayToHex,
parseTransferPayload,
getEmitterAddressEth,
getEmitterAddressSolana,
getEmitterAddressTerra,
} from "@certusone/wormhole-sdk";
import {
importCoreWasm,
setDefaultWasm,
} from "@certusone/wormhole-sdk/lib/cjs/solana/wasm";
// import { storeKeyFromParsedVAA, storePayloadFromVaaBytes } from "./helpers";
import { spy_listen } from "./spy_listen";
import { spy_worker } from "./spy_worker";
import { spy_rest } from "./spy_rest";
import * as helpers from "./helpers";
import { RelayerEnvironment, validateEnvironment } from "./configureEnv";
var pendingMap = new Map<string, string>();
pendingMap.set("XXX", "XXX should be first");
@ -61,144 +44,74 @@ for (let idx = 0; idx < process.argv.length; ++idx) {
}
require("dotenv").config();
if (!process.env.SPY_SERVICE_HOST) {
console.error("Missing environment variable SPY_SERVICE_HOST");
process.exit(1);
}
// Connect to redis
import { createClient } from "redis";
setDefaultWasm("node");
async function connectToRedis() {
var rClient = createClient();
const env: RelayerEnvironment = validateEnvironment();
rClient.on("error", (err) => console.log("Redis Client Error", err));
rClient.on("connect", (err) => console.log("Redis Client Connected", err));
var runListen: boolean = true;
var runWorker: boolean = true;
var runRest: boolean = true;
var foundOne: boolean = false;
console.log("Attempting to connect...");
await rClient.connect();
return rClient;
}
async function storeInRedis(
client: typeof myRedisClient,
name: string,
value: string
) {
await (await client).set(name, value);
}
if (false) {
var myRedisClient;
if (!listenOnly) {
myRedisClient = connectToRedis();
}
console.log(
"spy_relay starting up, will listen for signed VAAs from [%s]",
process.env.SPY_SERVICE_HOST
);
setDefaultWasm("node");
(async () => {
var filter = {};
if (process.env.SPY_SERVICE_FILTERS) {
const parsedJsonFilters = eval(process.env.SPY_SERVICE_FILTERS);
var myFilters = [];
for (var i = 0; i < parsedJsonFilters.length; i++) {
var myChainId = parseInt(parsedJsonFilters[i].chain_id) as ChainId;
var myEmitterAddress = await encodeEmitterAddress(
myChainId,
parsedJsonFilters[i].emitter_address
);
var myEmitterFilter = {
emitterFilter: {
chainId: myChainId,
emitterAddress: myEmitterAddress,
},
};
console.log(
"adding filter: chainId: [%i], emitterAddress: [%s]",
myEmitterFilter.emitterFilter.chainId,
myEmitterFilter.emitterFilter.emitterAddress
);
myFilters.push(myEmitterFilter);
}
console.log("setting", myFilters.length, "filters");
filter = {
filters: myFilters,
};
} else {
console.log("processing all signed VAAs");
}
const client = createSpyRPCServiceClient(process.env.SPY_SERVICE_HOST);
const stream = await subscribeSignedVAA(client, filter);
const { parse_vaa } = await importCoreWasm();
stream.on("data", ({ vaaBytes }) => {
processVaa(parse_vaa, vaaBytes);
});
console.log("spy_relay waiting for transfer signed VAAs");
})();
}
async function encodeEmitterAddress(
myChainId,
emitterAddressStr
): Promise<string> {
if (myChainId === CHAIN_ID_SOLANA) {
return await getEmitterAddressSolana(emitterAddressStr);
}
if (myChainId === CHAIN_ID_TERRA) {
return await getEmitterAddressTerra(emitterAddressStr);
}
return getEmitterAddressEth(emitterAddressStr);
}
function processVaa(parse_vaa, vaaBytes) {
// console.log(vaaBytes);
const parsedVAA = parse_vaa(hexToUint8Array(vaaBytes));
// console.log(parsedVAA);
if (parsedVAA.payload[0] === 1) {
if (!listenOnly) {
var storeKey = helpers.storeKeyFromParsedVAA(parsedVAA);
var storePayload = helpers.storePayloadFromVaaBytes(vaaBytes);
// console.log("storeKey: ", helpers.storeKeyToJson(storeKey));
// console.log("storePayload: ", helpers.storePayloadToJson(storePayload));
var newStoreKey = helpers.storeKeyFromJson(JSON.stringify(storeKey));
var newStorePayload = helpers.storeePayloadFromJson(
JSON.stringify(storePayload)
);
// console.log("newStoreKey: ", newStoreKey);
// console.log("newStorePayload: ", newStorePayload);
storeInRedis(
myRedisClient,
helpers.storeKeyToJson(storeKey),
helpers.storePayloadToJson(storePayload)
for (let idx = 0; idx < process.argv.length; ++idx) {
if (process.argv[idx] === "--listen_only") {
if (foundOne) {
console.error(
'May only specify one of "--listen_only", "--worker_only" or "--rest_only"'
);
process.exit(1);
}
runWorker = false;
runRest = false;
foundOne = true;
}
var transferPayload = parseTransferPayload(Buffer.from(parsedVAA.payload));
console.log(
"transfer: emitter: [%d:%s], seqNum: %d, payload: origin: [%d:%s], target: [%d:%s], amount: %d",
parsedVAA.emitter_chain,
uint8ArrayToHex(parsedVAA.emitter_address),
parsedVAA.sequence,
transferPayload.originChain,
transferPayload.originAddress,
transferPayload.targetChain,
transferPayload.targetAddress,
transferPayload.amount
if (process.argv[idx] === "--worker_only") {
if (foundOne) {
console.error(
'May only specify one of "--listen_only", "--worker_only" or "--rest_only"'
);
process.exit(1);
}
runListen = false;
runRest = false;
foundOne = true;
}
if (process.argv[idx] === "--rest_only") {
if (foundOne) {
console.error(
'May only specify one of "--listen_only", "--worker_only" or "--rest_only"'
);
process.exit(1);
}
runListen = false;
runWorker = false;
foundOne = true;
}
}
// Start the spy listener to listen to the guardians.
if (runListen) {
spy_listen(runWorker);
}
// Start the spy worker to process VAAs from the store.
if (runWorker) {
spy_worker();
}
// Start the REST server, if configured.
if (runRest && process.env.SPY_REST_PORT) {
var restPort = parseInt(process.env.SPY_REST_PORT);
if (!restPort) {
console.error(
"Environment variable SPY_REST_PORT is set to [%s], which is not a valid port number.",
process.env.SPY_REST_PORT
);
// console.log(transferPayload);
process.exit(1);
}
spy_rest(restPort);
}