Merge branch 'main' into feat/adds-k8s-namespace
This commit is contained in:
commit
845737bd33
|
@ -10,6 +10,13 @@ import "../interfaces/IWormholeReceiver.sol";
|
||||||
|
|
||||||
import "forge-std/console.sol";
|
import "forge-std/console.sol";
|
||||||
|
|
||||||
|
interface Structs {
|
||||||
|
struct XAddress {
|
||||||
|
uint16 chainId;
|
||||||
|
bytes32 addr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
contract MockRelayerIntegration is IWormholeReceiver {
|
contract MockRelayerIntegration is IWormholeReceiver {
|
||||||
using BytesLib for bytes;
|
using BytesLib for bytes;
|
||||||
|
|
||||||
|
@ -28,7 +35,8 @@ contract MockRelayerIntegration is IWormholeReceiver {
|
||||||
// mapping of other MockRelayerIntegration contracts
|
// mapping of other MockRelayerIntegration contracts
|
||||||
mapping(uint16 => bytes32) registeredContracts;
|
mapping(uint16 => bytes32) registeredContracts;
|
||||||
|
|
||||||
bytes[] messages;
|
// bytes[] messages;
|
||||||
|
bytes[][] messageHistory;
|
||||||
|
|
||||||
struct FurtherInstructions {
|
struct FurtherInstructions {
|
||||||
bool keepSending;
|
bool keepSending;
|
||||||
|
@ -139,14 +147,17 @@ contract MockRelayerIntegration is IWormholeReceiver {
|
||||||
function receiveWormholeMessages(bytes[] memory wormholeObservations, bytes[] memory) public payable override {
|
function receiveWormholeMessages(bytes[] memory wormholeObservations, bytes[] memory) public payable override {
|
||||||
// loop through the array of wormhole observations from the batch and store each payload
|
// loop through the array of wormhole observations from the batch and store each payload
|
||||||
uint256 numObservations = wormholeObservations.length;
|
uint256 numObservations = wormholeObservations.length;
|
||||||
messages = new bytes[](wormholeObservations.length - 2);
|
bytes[] memory messages = new bytes[](wormholeObservations.length - 2);
|
||||||
|
uint16 emitterChainId;
|
||||||
for (uint256 i = 0; i < numObservations - 2; i++) {
|
for (uint256 i = 0; i < numObservations - 2; i++) {
|
||||||
(IWormhole.VM memory parsed, bool valid, string memory reason) =
|
(IWormhole.VM memory parsed, bool valid, string memory reason) =
|
||||||
wormhole.parseAndVerifyVM(wormholeObservations[i]);
|
wormhole.parseAndVerifyVM(wormholeObservations[i]);
|
||||||
require(valid, reason);
|
require(valid, reason);
|
||||||
require(registeredContracts[parsed.emitterChainId] == parsed.emitterAddress);
|
require(registeredContracts[parsed.emitterChainId] == parsed.emitterAddress);
|
||||||
|
emitterChainId = parsed.emitterChainId;
|
||||||
messages[i] = parsed.payload;
|
messages[i] = parsed.payload;
|
||||||
}
|
}
|
||||||
|
messageHistory.push(messages);
|
||||||
|
|
||||||
(IWormhole.VM memory parsed, bool valid, string memory reason) =
|
(IWormhole.VM memory parsed, bool valid, string memory reason) =
|
||||||
wormhole.parseAndVerifyVM(wormholeObservations[wormholeObservations.length - 2]);
|
wormhole.parseAndVerifyVM(wormholeObservations[wormholeObservations.length - 2]);
|
||||||
|
@ -183,14 +194,21 @@ contract MockRelayerIntegration is IWormholeReceiver {
|
||||||
}
|
}
|
||||||
|
|
||||||
function getMessage() public view returns (bytes memory) {
|
function getMessage() public view returns (bytes memory) {
|
||||||
if (messages.length == 0) {
|
if (messageHistory.length == 0 || messageHistory[messageHistory.length - 1].length == 0) {
|
||||||
return new bytes(0);
|
return new bytes(0);
|
||||||
}
|
}
|
||||||
return messages[0];
|
return messageHistory[messageHistory.length - 1][0];
|
||||||
}
|
}
|
||||||
|
|
||||||
function getMessages() public view returns (bytes[] memory) {
|
function getMessages() public view returns (bytes[] memory) {
|
||||||
return messages;
|
if (messageHistory.length == 0 || messageHistory[messageHistory.length - 1].length == 0) {
|
||||||
|
return new bytes[](0);
|
||||||
|
}
|
||||||
|
return messageHistory[messageHistory.length - 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
function getMessageHistory() public view returns (bytes[][] memory) {
|
||||||
|
return messageHistory;
|
||||||
}
|
}
|
||||||
|
|
||||||
function clearPayload(bytes32 hash) public {
|
function clearPayload(bytes32 hash) public {
|
||||||
|
@ -210,6 +228,13 @@ contract MockRelayerIntegration is IWormholeReceiver {
|
||||||
registeredContracts[chainId] = emitterAddress;
|
registeredContracts[chainId] = emitterAddress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function registerEmitters(Structs.XAddress[] calldata emitters) public {
|
||||||
|
require(msg.sender == owner);
|
||||||
|
for (uint256 i = 0; i < emitters.length; i++) {
|
||||||
|
registeredContracts[emitters[i].chainId] = emitters[i].addr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
function encodeFurtherInstructions(FurtherInstructions memory furtherInstructions)
|
function encodeFurtherInstructions(FurtherInstructions memory furtherInstructions)
|
||||||
public
|
public
|
||||||
view
|
view
|
||||||
|
|
|
@ -23,7 +23,7 @@ import {IWormhole} from "../contracts/interfaces/IWormhole.sol";
|
||||||
import {WormholeSimulator, FakeWormholeSimulator} from "./WormholeSimulator.sol";
|
import {WormholeSimulator, FakeWormholeSimulator} from "./WormholeSimulator.sol";
|
||||||
import {IWormholeReceiver} from "../contracts/interfaces/IWormholeReceiver.sol";
|
import {IWormholeReceiver} from "../contracts/interfaces/IWormholeReceiver.sol";
|
||||||
import {AttackForwardIntegration} from "../contracts/mock/AttackForwardIntegration.sol";
|
import {AttackForwardIntegration} from "../contracts/mock/AttackForwardIntegration.sol";
|
||||||
import {MockRelayerIntegration} from "../contracts/mock/MockRelayerIntegration.sol";
|
import {MockRelayerIntegration, Structs} from "../contracts/mock/MockRelayerIntegration.sol";
|
||||||
import "../contracts/libraries/external/BytesLib.sol";
|
import "../contracts/libraries/external/BytesLib.sol";
|
||||||
|
|
||||||
import "forge-std/Test.sol";
|
import "forge-std/Test.sol";
|
||||||
|
@ -255,6 +255,9 @@ contract TestCoreRelayer is Test {
|
||||||
);
|
);
|
||||||
map[i].relayProvider.updateMaximumBudget(j, maxBudget);
|
map[i].relayProvider.updateMaximumBudget(j, maxBudget);
|
||||||
map[i].integration.registerEmitter(j, bytes32(uint256(uint160(address(map[j].integration)))));
|
map[i].integration.registerEmitter(j, bytes32(uint256(uint160(address(map[j].integration)))));
|
||||||
|
Structs.XAddress[] memory addresses = new Structs.XAddress[](1);
|
||||||
|
addresses[0] = Structs.XAddress(j, bytes32(uint256(uint160(address(map[j].integration)))));
|
||||||
|
map[i].integration.registerEmitters(addresses);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -7,6 +7,8 @@
|
||||||
"@poanet/solidity-flattener": "^3.0.8",
|
"@poanet/solidity-flattener": "^3.0.8",
|
||||||
"@typechain/ethers-v5": "^10.1.1",
|
"@typechain/ethers-v5": "^10.1.1",
|
||||||
"@types/chai": "^4.3.3",
|
"@types/chai": "^4.3.3",
|
||||||
|
"@types/koa": "^2.13.5",
|
||||||
|
"@types/koa-router": "^7.4.4",
|
||||||
"@types/mocha": "^9.1.1",
|
"@types/mocha": "^9.1.1",
|
||||||
"chai": "^4.3.6",
|
"chai": "^4.3.6",
|
||||||
"dotenv": "^16.0.3",
|
"dotenv": "^16.0.3",
|
||||||
|
@ -19,6 +21,7 @@
|
||||||
"build": "forge build -o build --via-ir",
|
"build": "forge build -o build --via-ir",
|
||||||
"forge-test": "forge test -vvv --via-ir",
|
"forge-test": "forge test -vvv --via-ir",
|
||||||
"integration-test": "bash shell-scripts/run_integration_tests.sh",
|
"integration-test": "bash shell-scripts/run_integration_tests.sh",
|
||||||
|
"load-test": "ENV=testnet ts-node ts-scripts/mockIntegrations/loadGeneration.ts",
|
||||||
"typechain": "bash ../sdk/scripts/make_ethers_types.sh",
|
"typechain": "bash ../sdk/scripts/make_ethers_types.sh",
|
||||||
"flatten": "mkdir -p node_modules/@poanet/solidity-flattener/contracts && cp -r contracts/* node_modules/@poanet/solidity-flattener/contracts/ && poa-solidity-flattener",
|
"flatten": "mkdir -p node_modules/@poanet/solidity-flattener/contracts && cp -r contracts/* node_modules/@poanet/solidity-flattener/contracts/ && poa-solidity-flattener",
|
||||||
"deployAndConfigureTilt": "ENV=tilt bash ./ts-scripts/shell/deployConfigureTest.sh",
|
"deployAndConfigureTilt": "ENV=tilt bash ./ts-scripts/shell/deployConfigureTest.sh",
|
||||||
|
@ -32,6 +35,9 @@
|
||||||
"@improbable-eng/grpc-web-node-http-transport": "^0.15.0",
|
"@improbable-eng/grpc-web-node-http-transport": "^0.15.0",
|
||||||
"elliptic": "^6.5.4",
|
"elliptic": "^6.5.4",
|
||||||
"jsonfile": "^6.1.0",
|
"jsonfile": "^6.1.0",
|
||||||
|
"koa": "^2.14.1",
|
||||||
|
"koa-router": "^12.0.0",
|
||||||
|
"prom-client": "^14.2.0",
|
||||||
"typescript": "^4.8.3"
|
"typescript": "^4.8.3"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,23 +48,23 @@
|
||||||
"mockIntegrations": [
|
"mockIntegrations": [
|
||||||
{
|
{
|
||||||
"chainId": 6,
|
"chainId": 6,
|
||||||
"address": "0x0819CaB27473e585DD28c0D908DCc218dE4cB05C"
|
"address": "0xaC19a80C3409F8296043FC24F955790164Ccb15E"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"chainId": 14,
|
"chainId": 14,
|
||||||
"address": "0xB7c205B9E147e4371E28fAe3F9530566de374Dcc"
|
"address": "0xC02d04f9067Ede6fF4CaDC31233c795ec982eFd7"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"chainId": 4,
|
"chainId": 4,
|
||||||
"address": "0xD1d6Ae0149A460204967F91Cb358C3581D4F1692"
|
"address": "0x3C1cd47E8D9aa563B238841B8D95c80d11E6B3A6"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"chainId": 5,
|
"chainId": 5,
|
||||||
"address": "0xE8cfD947Eb14b0F08DEE7bf0E352480164ED2e6e"
|
"address": "0xf4092f14f74E119fae5343d08C3A253FB9D2e8d6"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"chainId": 16,
|
"chainId": 16,
|
||||||
"address": "0x87c26Ad201cfC0A969911143985B39855C9CB704"
|
"address": "0x0896fB74CaBeDb603E3A6EdA36f40af00010Bf4c"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
|
@ -1,7 +1,13 @@
|
||||||
import { init, loadChains, writeOutputFiles, getMockIntegration } from "../helpers/env"
|
import {
|
||||||
|
init,
|
||||||
|
loadChains,
|
||||||
|
writeOutputFiles,
|
||||||
|
getMockIntegration,
|
||||||
|
Deployment,
|
||||||
|
} from "../helpers/env"
|
||||||
import { deployMockIntegration } from "../helpers/deployments"
|
import { deployMockIntegration } from "../helpers/deployments"
|
||||||
import { BigNumber } from "ethers"
|
import { BigNumber, BigNumberish, BytesLike } from "ethers"
|
||||||
import { tryNativeToHexString } from "@certusone/wormhole-sdk"
|
import { tryNativeToHexString, tryNativeToUint8Array } from "@certusone/wormhole-sdk"
|
||||||
import { MockRelayerIntegration__factory } from "../../../sdk/src"
|
import { MockRelayerIntegration__factory } from "../../../sdk/src"
|
||||||
import { wait } from "../helpers/utils"
|
import { wait } from "../helpers/utils"
|
||||||
|
|
||||||
|
@ -11,13 +17,12 @@ const chains = loadChains()
|
||||||
|
|
||||||
async function run() {
|
async function run() {
|
||||||
console.log("Start!")
|
console.log("Start!")
|
||||||
const output: any = {
|
const output = {
|
||||||
mockIntegrations: [],
|
mockIntegrations: [] as Deployment[],
|
||||||
}
|
}
|
||||||
|
|
||||||
for (let i = 0; i < chains.length; i++) {
|
for (let i = 0; i < chains.length; i++) {
|
||||||
const mockIntegration = await deployMockIntegration(chains[i])
|
const mockIntegration = await deployMockIntegration(chains[i])
|
||||||
|
|
||||||
output.mockIntegrations.push(mockIntegration)
|
output.mockIntegrations.push(mockIntegration)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,18 +30,17 @@ async function run() {
|
||||||
|
|
||||||
for (let i = 0; i < chains.length; i++) {
|
for (let i = 0; i < chains.length; i++) {
|
||||||
console.log(`Registering emitters for chainId ${chains[i].chainId}`)
|
console.log(`Registering emitters for chainId ${chains[i].chainId}`)
|
||||||
|
// note: must use useLastRun = true
|
||||||
const mockIntegration = getMockIntegration(chains[i])
|
const mockIntegration = getMockIntegration(chains[i])
|
||||||
for (let j = 0; j < chains.length; j++) {
|
|
||||||
console.log(`Registering emitter ${chains[j].chainId}`)
|
const arg: {
|
||||||
const secondMockIntegration = output.mockIntegrations[j]
|
chainId: BigNumberish
|
||||||
await mockIntegration
|
addr: BytesLike
|
||||||
.registerEmitter(
|
}[] = chains.map((c, j) => ({
|
||||||
secondMockIntegration.chainId,
|
chainId: c.chainId,
|
||||||
"0x" + tryNativeToHexString(secondMockIntegration.address, "ethereum"),
|
addr: "0x" + tryNativeToHexString(output.mockIntegrations[j].address, "ethereum"),
|
||||||
{ gasLimit: 500000 }
|
}))
|
||||||
)
|
await mockIntegration.registerEmitters(arg, { gasLimit: 500000 }).then(wait)
|
||||||
.then(wait)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
import { ChainInfo, init, loadChains } from "../helpers/env"
|
||||||
|
import { sendMessage, sleep } from "./messageUtils"
|
||||||
|
import { Counter, register } from "prom-client"
|
||||||
|
import Koa from "koa"
|
||||||
|
import Router from "koa-router"
|
||||||
|
|
||||||
|
const promPort = 3000
|
||||||
|
|
||||||
|
init()
|
||||||
|
const chains = loadChains()
|
||||||
|
|
||||||
|
export const undeliveredMessages = new Counter({
|
||||||
|
name: "undelivered_messages",
|
||||||
|
help: "Counter for number of messages that were not delivered",
|
||||||
|
labelNames: ["sourceChain", "targetChain"],
|
||||||
|
})
|
||||||
|
|
||||||
|
export const deliveredMessages = new Counter({
|
||||||
|
name: "delivered_messages",
|
||||||
|
help: "Counter for number of messages that were successfully delivered",
|
||||||
|
labelNames: ["sourceChain", "targetChain"],
|
||||||
|
})
|
||||||
|
|
||||||
|
async function run() {
|
||||||
|
const chainIntervalIdx = process.argv.findIndex((arg) => arg === "--chainInterval")
|
||||||
|
const salvoIntervalIdx = process.argv.findIndex((arg) => arg === "--salvoInterval")
|
||||||
|
const chainInterval =
|
||||||
|
chainIntervalIdx !== -1 ? Number(process.argv[chainIntervalIdx + 1]) : 5_000
|
||||||
|
const salvoInterval =
|
||||||
|
salvoIntervalIdx !== -1 ? Number(process.argv[salvoIntervalIdx + 1]) : 60_000
|
||||||
|
|
||||||
|
console.log(`chainInterval: ${chainInterval}`)
|
||||||
|
console.log(`salvoInterval: ${salvoInterval}`)
|
||||||
|
|
||||||
|
if (process.argv.find((arg) => arg === "--per-chain")) {
|
||||||
|
await perChain(chainInterval, salvoInterval)
|
||||||
|
} else {
|
||||||
|
await matrix(chainInterval, salvoInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function perChain(chainIntervalMS: number, salvoIntervalMS: number) {
|
||||||
|
console.log(`Sending test messages to and from each chain...`)
|
||||||
|
for (let salvo = 0; true; salvo++) {
|
||||||
|
console.log("")
|
||||||
|
console.log(`Sending salvo ${salvo}`)
|
||||||
|
|
||||||
|
for (let i = 0; i < chains.length; ++i) {
|
||||||
|
const j = i === 0 ? chains.length - 1 : 0
|
||||||
|
await sendMessageAndReportMetric(chains[i], chains[j], chainIntervalMS)
|
||||||
|
}
|
||||||
|
|
||||||
|
await sleep(salvoIntervalMS)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function matrix(chainIntervalMS: number, salvoIntervalMS: number) {
|
||||||
|
console.log(`Sending test messages to and from every combination of chains...`)
|
||||||
|
for (let salvo = 0; true; salvo++) {
|
||||||
|
console.log("")
|
||||||
|
console.log(`Sending salvo ${salvo}`)
|
||||||
|
|
||||||
|
for (let i = 0; i < chains.length; ++i) {
|
||||||
|
for (let j = 0; i < chains.length; ++i) {
|
||||||
|
await sendMessageAndReportMetric(chains[i], chains[j], chainIntervalMS)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await sleep(salvoIntervalMS)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function sendMessageAndReportMetric(
|
||||||
|
sourceChain: ChainInfo,
|
||||||
|
targetChain: ChainInfo,
|
||||||
|
chainInterval: number
|
||||||
|
) {
|
||||||
|
try {
|
||||||
|
const notFound = await sendMessage(sourceChain, targetChain, false, true)
|
||||||
|
const counter = notFound ? undeliveredMessages : deliveredMessages
|
||||||
|
counter
|
||||||
|
.labels({
|
||||||
|
sourceChain: sourceChain.chainId,
|
||||||
|
targetChain: targetChain.chainId,
|
||||||
|
})
|
||||||
|
.inc()
|
||||||
|
} catch (e) {
|
||||||
|
console.error(e)
|
||||||
|
}
|
||||||
|
await sleep(chainInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
async function launchMetricsServer() {
|
||||||
|
const app = new Koa()
|
||||||
|
const router = new Router()
|
||||||
|
|
||||||
|
router.get("/metrics", async (ctx, next) => {
|
||||||
|
let metrics = await register.metrics()
|
||||||
|
ctx.body = metrics
|
||||||
|
})
|
||||||
|
|
||||||
|
app.use(router.allowedMethods())
|
||||||
|
app.use(router.routes())
|
||||||
|
app.listen(promPort, () =>
|
||||||
|
console.log(`Prometheus metrics running on port ${promPort}`)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log("Start!")
|
||||||
|
run().then(() => console.log("Done!"))
|
|
@ -1,107 +1,18 @@
|
||||||
import * as wh from "@certusone/wormhole-sdk"
|
|
||||||
import { Implementation__factory } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts"
|
|
||||||
import { LogMessagePublishedEvent } from "../../../sdk/src"
|
|
||||||
import {
|
import {
|
||||||
ChainInfo,
|
ChainInfo,
|
||||||
getCoreRelayer,
|
|
||||||
getCoreRelayerAddress,
|
|
||||||
getMockIntegration,
|
|
||||||
getMockIntegrationAddress,
|
|
||||||
getRelayProviderAddress,
|
|
||||||
init,
|
init,
|
||||||
loadChains,
|
loadChains,
|
||||||
} from "../helpers/env"
|
} from "../helpers/env"
|
||||||
import * as grpcWebNodeHttpTransport from "@improbable-eng/grpc-web-node-http-transport"
|
import { sendMessage } from "./messageUtils"
|
||||||
|
|
||||||
init()
|
init()
|
||||||
const chains = loadChains()
|
const chains = loadChains()
|
||||||
|
|
||||||
async function sendMessage(
|
|
||||||
sourceChain: ChainInfo,
|
|
||||||
targetChain: ChainInfo,
|
|
||||||
fetchSignedVaa: boolean = false,
|
|
||||||
queryMessageOnTarget: boolean = false
|
|
||||||
) {
|
|
||||||
console.log(
|
|
||||||
`Sending message from chain ${sourceChain.chainId} to ${targetChain.chainId}...`
|
|
||||||
)
|
|
||||||
|
|
||||||
const sourceRelayer = getCoreRelayer(sourceChain)
|
|
||||||
|
|
||||||
// todo: remove
|
|
||||||
const registeredChain = await sourceRelayer.registeredCoreRelayerContract(
|
|
||||||
sourceChain.chainId
|
|
||||||
)
|
|
||||||
console.log("The source chain should be registered to itself")
|
|
||||||
console.log(registeredChain)
|
|
||||||
console.log(getCoreRelayerAddress(sourceChain))
|
|
||||||
console.log("")
|
|
||||||
|
|
||||||
const defaultRelayerProvider = await sourceRelayer.getDefaultRelayProvider()
|
|
||||||
console.log("Default relay provider should be this chains relayProvider ")
|
|
||||||
console.log(defaultRelayerProvider)
|
|
||||||
console.log(getRelayProviderAddress(sourceChain))
|
|
||||||
console.log("")
|
|
||||||
|
|
||||||
const relayQuote = await (
|
|
||||||
await sourceRelayer.quoteGas(
|
|
||||||
targetChain.chainId,
|
|
||||||
2000000,
|
|
||||||
await sourceRelayer.getDefaultRelayProvider()
|
|
||||||
)
|
|
||||||
).add(10000000000)
|
|
||||||
console.log("relay quote: " + relayQuote)
|
|
||||||
|
|
||||||
const mockIntegration = getMockIntegration(sourceChain)
|
|
||||||
const targetAddress = getMockIntegrationAddress(targetChain)
|
|
||||||
|
|
||||||
const sentMessage = Buffer.from("Hello World: " + String(Math.ceil(Math.random() * 100)))
|
|
||||||
const tx = await mockIntegration.sendMessage(
|
|
||||||
sentMessage,
|
|
||||||
targetChain.chainId,
|
|
||||||
targetAddress,
|
|
||||||
{
|
|
||||||
gasLimit: 1000000,
|
|
||||||
value: relayQuote,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
const rx = await tx.wait()
|
|
||||||
const sequences = wh.parseSequencesFromLogEth(rx, sourceChain.wormholeAddress)
|
|
||||||
console.log("Tx hash: ", rx.transactionHash)
|
|
||||||
console.log(`Sequences: ${sequences}`)
|
|
||||||
if (fetchSignedVaa) {
|
|
||||||
for (let i = 0; i < 120; i++) {
|
|
||||||
try {
|
|
||||||
const vaa1 = await fetchVaaFromLog(rx.logs[0], sourceChain.chainId)
|
|
||||||
console.log(vaa1)
|
|
||||||
const vaa2 = await fetchVaaFromLog(rx.logs[1], sourceChain.chainId)
|
|
||||||
console.log(vaa2)
|
|
||||||
break
|
|
||||||
} catch (e) {
|
|
||||||
console.error(`${i} seconds`)
|
|
||||||
if (i === 0) {
|
|
||||||
console.error(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, 1_000))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (queryMessageOnTarget) {
|
|
||||||
await new Promise<void>((resolve) => setTimeout(() => resolve(), 5000))
|
|
||||||
const targetIntegration = getMockIntegration(targetChain)
|
|
||||||
const message = await targetIntegration.getMessage()
|
|
||||||
const messageParsed = Buffer.from(message, "hex").toString("utf-8")
|
|
||||||
console.log(`Sent message: ${sentMessage}`)
|
|
||||||
console.log(`Received message: ${message}`)
|
|
||||||
}
|
|
||||||
console.log("")
|
|
||||||
}
|
|
||||||
|
|
||||||
async function run() {
|
async function run() {
|
||||||
console.log(process.argv)
|
console.log(process.argv)
|
||||||
const fetchSignedVaa = !!process.argv.find((arg) => arg === "--fetchSignedVaa")
|
const fetchSignedVaa = !!process.argv.find((arg) => arg === "--fetchSignedVaa")
|
||||||
const queryMessageOnTarget = !!process.argv.find(
|
const queryMessageOnTarget = !process.argv.find(
|
||||||
(arg) => arg === "--queryMessageOnTarget"
|
(arg) => arg === "--noQueryMessageOnTarget"
|
||||||
)
|
)
|
||||||
if (process.argv[2] === "--from" && process.argv[4] === "--to") {
|
if (process.argv[2] === "--from" && process.argv[4] === "--to") {
|
||||||
await sendMessage(
|
await sendMessage(
|
||||||
|
@ -148,35 +59,3 @@ function getChainById(id: number | string): ChainInfo {
|
||||||
|
|
||||||
console.log("Start!")
|
console.log("Start!")
|
||||||
run().then(() => console.log("Done!"))
|
run().then(() => console.log("Done!"))
|
||||||
|
|
||||||
export async function encodeEmitterAddress(
|
|
||||||
myChainId: wh.ChainId,
|
|
||||||
emitterAddressStr: string
|
|
||||||
): Promise<string> {
|
|
||||||
if (myChainId === wh.CHAIN_ID_SOLANA || myChainId === wh.CHAIN_ID_PYTHNET) {
|
|
||||||
return wh.getEmitterAddressSolana(emitterAddressStr)
|
|
||||||
}
|
|
||||||
if (wh.isTerraChain(myChainId)) {
|
|
||||||
return wh.getEmitterAddressTerra(emitterAddressStr)
|
|
||||||
}
|
|
||||||
if (wh.isEVMChain(myChainId)) {
|
|
||||||
return wh.getEmitterAddressEth(emitterAddressStr)
|
|
||||||
}
|
|
||||||
throw new Error(`Unrecognized wormhole chainId ${myChainId}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
function fetchVaaFromLog(bridgeLog: any, chainId: wh.ChainId): Promise<wh.SignedVaa> {
|
|
||||||
const iface = Implementation__factory.createInterface()
|
|
||||||
const log = iface.parseLog(bridgeLog) as unknown as LogMessagePublishedEvent
|
|
||||||
const sequence = log.args.sequence.toString()
|
|
||||||
const emitter = wh.tryNativeToHexString(log.args.sender, "ethereum")
|
|
||||||
return wh
|
|
||||||
.getSignedVAA(
|
|
||||||
"https://wormhole-v2-testnet-api.certus.one",
|
|
||||||
chainId,
|
|
||||||
emitter,
|
|
||||||
sequence,
|
|
||||||
{ transport: grpcWebNodeHttpTransport.NodeHttpTransport() }
|
|
||||||
)
|
|
||||||
.then((r) => r.vaaBytes)
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,141 @@
|
||||||
|
import * as wh from "@certusone/wormhole-sdk"
|
||||||
|
import { Implementation__factory } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts"
|
||||||
|
import { LogMessagePublishedEvent } from "../../../sdk/src"
|
||||||
|
import {
|
||||||
|
ChainInfo,
|
||||||
|
getCoreRelayer,
|
||||||
|
getMockIntegration,
|
||||||
|
getMockIntegrationAddress,
|
||||||
|
} from "../helpers/env"
|
||||||
|
import * as grpcWebNodeHttpTransport from "@improbable-eng/grpc-web-node-http-transport"
|
||||||
|
import { ethers } from "ethers"
|
||||||
|
|
||||||
|
export async function sendMessage(
|
||||||
|
sourceChain: ChainInfo,
|
||||||
|
targetChain: ChainInfo,
|
||||||
|
fetchSignedVaa: boolean = false,
|
||||||
|
queryMessageOnTargetFlag: boolean = true
|
||||||
|
): Promise<boolean | undefined> {
|
||||||
|
console.log(
|
||||||
|
`Sending message from chain ${sourceChain.chainId} to ${targetChain.chainId}...`
|
||||||
|
)
|
||||||
|
|
||||||
|
const sourceRelayer = getCoreRelayer(sourceChain)
|
||||||
|
|
||||||
|
const relayQuote = await (
|
||||||
|
await sourceRelayer.quoteGas(
|
||||||
|
targetChain.chainId,
|
||||||
|
2000000,
|
||||||
|
await sourceRelayer.getDefaultRelayProvider()
|
||||||
|
)
|
||||||
|
).add(10000000000)
|
||||||
|
console.log("relay quote: " + relayQuote)
|
||||||
|
|
||||||
|
const mockIntegration = getMockIntegration(sourceChain)
|
||||||
|
const targetAddress = getMockIntegrationAddress(targetChain)
|
||||||
|
|
||||||
|
const sentMessage = "ID: " + String(Math.ceil(Math.random() * 10000))
|
||||||
|
console.log(`Sent message: ${sentMessage}`)
|
||||||
|
const tx = await mockIntegration.sendMessage(
|
||||||
|
Buffer.from(sentMessage),
|
||||||
|
targetChain.chainId,
|
||||||
|
targetAddress,
|
||||||
|
{
|
||||||
|
gasLimit: 1000000,
|
||||||
|
value: relayQuote,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
const rx = await tx.wait()
|
||||||
|
const sequences = wh.parseSequencesFromLogEth(rx, sourceChain.wormholeAddress)
|
||||||
|
console.log("Tx hash: ", rx.transactionHash)
|
||||||
|
console.log(`Sequences: ${sequences}`)
|
||||||
|
if (fetchSignedVaa) {
|
||||||
|
for (let i = 0; i < 120; i++) {
|
||||||
|
try {
|
||||||
|
const vaa1 = await fetchVaaFromLog(rx.logs[0], sourceChain.chainId)
|
||||||
|
console.log(vaa1)
|
||||||
|
const vaa2 = await fetchVaaFromLog(rx.logs[1], sourceChain.chainId)
|
||||||
|
console.log(vaa2)
|
||||||
|
break
|
||||||
|
} catch (e) {
|
||||||
|
console.error(`${i} seconds`)
|
||||||
|
if (i === 0) {
|
||||||
|
console.error(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 1_000))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (queryMessageOnTargetFlag) {
|
||||||
|
return await queryMessageOnTarget(sentMessage, targetChain)
|
||||||
|
}
|
||||||
|
console.log("")
|
||||||
|
}
|
||||||
|
|
||||||
|
async function queryMessageOnTarget(
|
||||||
|
sentMessage: string,
|
||||||
|
targetChain: ChainInfo
|
||||||
|
): Promise<boolean> {
|
||||||
|
let messageHistory: string[][] = []
|
||||||
|
const targetIntegration = getMockIntegration(targetChain)
|
||||||
|
|
||||||
|
let notFound = true
|
||||||
|
for (let i = 0; i < 20 && notFound; i++) {
|
||||||
|
await new Promise<void>((resolve) => setTimeout(() => resolve(), 2000))
|
||||||
|
const messageHistoryResp = await targetIntegration.getMessageHistory()
|
||||||
|
messageHistory = messageHistoryResp.map((messages) =>
|
||||||
|
messages.map((message) => ethers.utils.toUtf8String(message))
|
||||||
|
)
|
||||||
|
notFound = !messageHistory
|
||||||
|
.slice(messageHistory.length - 20)
|
||||||
|
.find((msgs) => msgs.find((m) => m === sentMessage))
|
||||||
|
process.stdout.write("..")
|
||||||
|
}
|
||||||
|
console.log("")
|
||||||
|
|
||||||
|
if (notFound) {
|
||||||
|
console.log(`ERROR: Did not receive message!`)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Received message: ${messageHistory[messageHistory.length - 1][0]}`)
|
||||||
|
console.log(`Received messageHistory: ${messageHistory.join(", ")}`)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function encodeEmitterAddress(
|
||||||
|
myChainId: wh.ChainId,
|
||||||
|
emitterAddressStr: string
|
||||||
|
): Promise<string> {
|
||||||
|
if (myChainId === wh.CHAIN_ID_SOLANA || myChainId === wh.CHAIN_ID_PYTHNET) {
|
||||||
|
return wh.getEmitterAddressSolana(emitterAddressStr)
|
||||||
|
}
|
||||||
|
if (wh.isTerraChain(myChainId)) {
|
||||||
|
return wh.getEmitterAddressTerra(emitterAddressStr)
|
||||||
|
}
|
||||||
|
if (wh.isEVMChain(myChainId)) {
|
||||||
|
return wh.getEmitterAddressEth(emitterAddressStr)
|
||||||
|
}
|
||||||
|
throw new Error(`Unrecognized wormhole chainId ${myChainId}`)
|
||||||
|
}
|
||||||
|
|
||||||
|
function fetchVaaFromLog(bridgeLog: any, chainId: wh.ChainId): Promise<wh.SignedVaa> {
|
||||||
|
const iface = Implementation__factory.createInterface()
|
||||||
|
const log = iface.parseLog(bridgeLog) as unknown as LogMessagePublishedEvent
|
||||||
|
const sequence = log.args.sequence.toString()
|
||||||
|
const emitter = wh.tryNativeToHexString(log.args.sender, "ethereum")
|
||||||
|
return wh
|
||||||
|
.getSignedVAA(
|
||||||
|
"https://wormhole-v2-testnet-api.certus.one",
|
||||||
|
chainId,
|
||||||
|
emitter,
|
||||||
|
sequence,
|
||||||
|
{ transport: grpcWebNodeHttpTransport.NodeHttpTransport() }
|
||||||
|
)
|
||||||
|
.then((r) => r.vaaBytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function sleep(ms: number) {
|
||||||
|
return new Promise((r) => setTimeout(r, ms))
|
||||||
|
}
|
|
@ -1,27 +1,24 @@
|
||||||
import {
|
import {
|
||||||
ActionExecutor,
|
ActionExecutor,
|
||||||
assertBool,
|
assertBool,
|
||||||
|
assertEvmChainId,
|
||||||
assertInt,
|
assertInt,
|
||||||
CommonPluginEnv,
|
CommonPluginEnv,
|
||||||
ContractFilter,
|
ContractFilter,
|
||||||
getScopedLogger,
|
|
||||||
ParsedVaaWithBytes,
|
ParsedVaaWithBytes,
|
||||||
parseVaaWithBytes,
|
parseVaaWithBytes,
|
||||||
Plugin,
|
Plugin,
|
||||||
Providers,
|
Providers,
|
||||||
sleep,
|
|
||||||
StagingAreaKeyLock,
|
StagingAreaKeyLock,
|
||||||
Workflow,
|
Workflow,
|
||||||
} from "@wormhole-foundation/relayer-engine"
|
} from "@wormhole-foundation/relayer-engine"
|
||||||
import * as wh from "@certusone/wormhole-sdk"
|
import * as wh from "@certusone/wormhole-sdk"
|
||||||
import {SignedVaa} from "@certusone/wormhole-sdk"
|
import {SignedVaa} from "@certusone/wormhole-sdk"
|
||||||
import {Logger} from "winston"
|
import { Logger } from "winston"
|
||||||
import {PluginError} from "./utils"
|
import { PluginError } from "./utils"
|
||||||
import {
|
import {
|
||||||
DeliveryInstructionsContainer,
|
DeliveryInstructionsContainer,
|
||||||
IDelivery,
|
IDelivery,
|
||||||
IWormhole__factory,
|
|
||||||
LogMessagePublishedEvent,
|
|
||||||
parseDeliveryInstructionsContainer,
|
parseDeliveryInstructionsContainer,
|
||||||
parsePayloadType,
|
parsePayloadType,
|
||||||
parseRedeliveryByTxHashInstruction,
|
parseRedeliveryByTxHashInstruction,
|
||||||
|
@ -30,9 +27,8 @@ import {
|
||||||
RelayProvider__factory,
|
RelayProvider__factory,
|
||||||
} from "../../../pkgs/sdk/src"
|
} from "../../../pkgs/sdk/src"
|
||||||
import * as ethers from "ethers"
|
import * as ethers from "ethers"
|
||||||
import {Implementation__factory} from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts"
|
import * as vaaFetching from "./vaaFetching"
|
||||||
import * as grpcWebNodeHttpTransport from "@improbable-eng/grpc-web-node-http-transport"
|
import * as syntheticBatch from "./syntheticBatch"
|
||||||
import {retryAsyncUntilDefined} from "ts-retry/lib/cjs/retry"
|
|
||||||
|
|
||||||
let PLUGIN_NAME: string = "GenericRelayerPlugin"
|
let PLUGIN_NAME: string = "GenericRelayerPlugin"
|
||||||
|
|
||||||
|
@ -70,27 +66,6 @@ interface WorkflowPayloadParsed {
|
||||||
vaas: Buffer[]
|
vaas: Buffer[]
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* DB types
|
|
||||||
*/
|
|
||||||
|
|
||||||
const PENDING = "pending"
|
|
||||||
interface Pending {
|
|
||||||
startTime: string
|
|
||||||
numTimesRetried: number
|
|
||||||
hash: string
|
|
||||||
nextRetryTime: string
|
|
||||||
}
|
|
||||||
|
|
||||||
interface Entry {
|
|
||||||
chainId: number
|
|
||||||
deliveryVaaIdx: number
|
|
||||||
vaas: { emitter: string; sequence: string; bytes: string }[]
|
|
||||||
allFetched: boolean
|
|
||||||
// only present for Redeliveries
|
|
||||||
redeliveryVaa?: string
|
|
||||||
}
|
|
||||||
|
|
||||||
export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||||
readonly shouldSpy: boolean
|
readonly shouldSpy: boolean
|
||||||
readonly shouldRest: boolean
|
readonly shouldRest: boolean
|
||||||
|
@ -109,7 +84,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async afterSetup(
|
async afterSetup(
|
||||||
providers: Providers,
|
_providers: Providers,
|
||||||
listenerResources?: {
|
listenerResources?: {
|
||||||
eventSource: (event: SignedVaa) => Promise<void>
|
eventSource: (event: SignedVaa) => Promise<void>
|
||||||
db: StagingAreaKeyLock
|
db: StagingAreaKeyLock
|
||||||
|
@ -124,116 +99,15 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (listenerResources) {
|
if (listenerResources) {
|
||||||
setTimeout(
|
vaaFetching.fetchVaaWorker(
|
||||||
() => this.fetchVaaWorker(listenerResources.eventSource, listenerResources.db),
|
listenerResources.eventSource,
|
||||||
0
|
listenerResources.db,
|
||||||
|
this.logger,
|
||||||
|
this.engineConfig
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fetchVaaWorker(
|
|
||||||
eventSource: (event: SignedVaa) => Promise<void>,
|
|
||||||
db: StagingAreaKeyLock
|
|
||||||
): Promise<void> {
|
|
||||||
const logger = getScopedLogger(["fetchWorker"], this.logger)
|
|
||||||
logger.debug(`Started fetchVaaWorker`)
|
|
||||||
while (true) {
|
|
||||||
await sleep(3_000) // todo: make configurable
|
|
||||||
|
|
||||||
// track which delivery vaa hashes have all vaas ready this iteration
|
|
||||||
let newlyResolved = new Map<string, Entry>()
|
|
||||||
await db.withKey([PENDING], async (kv: { [PENDING]?: Pending[] }, tx) => {
|
|
||||||
// if objects have not been created, initialize
|
|
||||||
if (!kv.pending) {
|
|
||||||
kv.pending = []
|
|
||||||
}
|
|
||||||
logger.debug(`Pending: ${JSON.stringify(kv.pending, undefined, 4)}`)
|
|
||||||
|
|
||||||
// filter to the pending items that are due to be retried
|
|
||||||
const entriesToFetch = kv.pending.filter(
|
|
||||||
(delivery) =>
|
|
||||||
new Date(JSON.parse(delivery.nextRetryTime)).getTime() < Date.now()
|
|
||||||
)
|
|
||||||
if (entriesToFetch.length === 0) {
|
|
||||||
return { newKV: kv, val: undefined }
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info(`Attempting to fetch ${entriesToFetch.length} entries`)
|
|
||||||
await db.withKey(
|
|
||||||
// get `Entry`s for each hash
|
|
||||||
entriesToFetch.map((d) => d.hash),
|
|
||||||
async (kv: Record<string, Entry>) => {
|
|
||||||
const promises = Object.entries(kv).map(async ([hash, entry]) => {
|
|
||||||
if (entry.allFetched) {
|
|
||||||
// nothing to do
|
|
||||||
logger.warn("Entry in pending but nothing to fetch " + hash)
|
|
||||||
return [hash, entry]
|
|
||||||
}
|
|
||||||
const newEntry: Entry = await this.fetchEntry(hash, entry, logger)
|
|
||||||
if (newEntry.allFetched) {
|
|
||||||
newlyResolved.set(hash, newEntry)
|
|
||||||
}
|
|
||||||
return [hash, newEntry]
|
|
||||||
})
|
|
||||||
|
|
||||||
const newKV = Object.fromEntries(await Promise.all(promises))
|
|
||||||
return { newKV, val: undefined }
|
|
||||||
},
|
|
||||||
tx
|
|
||||||
)
|
|
||||||
|
|
||||||
kv.pending = kv.pending.filter((p) => !newlyResolved.has(p.hash))
|
|
||||||
return { newKV: kv, val: undefined }
|
|
||||||
})
|
|
||||||
// kick off an engine listener event for each resolved delivery vaa
|
|
||||||
for (const entry of newlyResolved.values()) {
|
|
||||||
this.logger.info("Kicking off engine listener event for resolved entry")
|
|
||||||
if (entry.redeliveryVaa) {
|
|
||||||
eventSource(Buffer.from(entry.redeliveryVaa, "base64"))
|
|
||||||
} else {
|
|
||||||
eventSource(Buffer.from(entry.vaas[entry.deliveryVaaIdx].bytes, "base64"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fetchEntry(hash: string, value: Entry, logger: Logger): Promise<Entry> {
|
|
||||||
// track if there are missing vaas after trying to fetch
|
|
||||||
this.logger.info("Fetching entry...", { hash })
|
|
||||||
let hasMissingVaas = false
|
|
||||||
const vaas = await Promise.all(
|
|
||||||
value.vaas.map(async ({ emitter, sequence, bytes }, idx) => {
|
|
||||||
// skip if vaa has already been fetched
|
|
||||||
if (bytes.length !== 0) {
|
|
||||||
return { emitter, sequence, bytes }
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
// try to fetch vaa from guardian rpc
|
|
||||||
const resp = await wh.getSignedVAA(
|
|
||||||
this.engineConfig.wormholeRpc,
|
|
||||||
value.chainId as wh.EVMChainId,
|
|
||||||
emitter,
|
|
||||||
sequence,
|
|
||||||
{ transport: grpcWebNodeHttpTransport.NodeHttpTransport() }
|
|
||||||
)
|
|
||||||
logger.info(`Fetched vaa ${idx} for delivery ${hash}`)
|
|
||||||
return {
|
|
||||||
emitter,
|
|
||||||
sequence,
|
|
||||||
// base64 encode
|
|
||||||
bytes: Buffer.from(resp.vaaBytes).toString("base64"),
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
hasMissingVaas = true
|
|
||||||
this.logger.debug(e)
|
|
||||||
return { emitter, sequence, bytes: "" }
|
|
||||||
}
|
|
||||||
})
|
|
||||||
)
|
|
||||||
// if all vaas have been fetched, mark this hash as resolved
|
|
||||||
return { ...value, vaas, allFetched: !hasMissingVaas }
|
|
||||||
}
|
|
||||||
|
|
||||||
// listen to core relayer contract on each chain
|
// listen to core relayer contract on each chain
|
||||||
getFilters(): ContractFilter[] {
|
getFilters(): ContractFilter[] {
|
||||||
return Array.from(this.pluginConfig.supportedChains.entries()).map(
|
return Array.from(this.pluginConfig.supportedChains.entries()).map(
|
||||||
|
@ -246,18 +120,18 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||||
db: StagingAreaKeyLock,
|
db: StagingAreaKeyLock,
|
||||||
providers: Providers
|
providers: Providers
|
||||||
): Promise<{ workflowData: WorkflowPayload } | undefined> {
|
): Promise<{ workflowData: WorkflowPayload } | undefined> {
|
||||||
|
const hash = coreRelayerVaa.hash.toString("base64")
|
||||||
this.logger.debug(
|
this.logger.debug(
|
||||||
`Consuming event from chain ${
|
`Consuming event from chain ${
|
||||||
coreRelayerVaa.emitterChain
|
coreRelayerVaa.emitterChain
|
||||||
} with seq ${coreRelayerVaa.sequence.toString()} and hash ${Buffer.from(
|
} with seq ${coreRelayerVaa.sequence.toString()} and hash ${hash}`
|
||||||
coreRelayerVaa.hash
|
|
||||||
).toString("base64")}`
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Kick off workflow if entry has already been fetched
|
// Kick off workflow if entry has already been fetched
|
||||||
const payloadId = parsePayloadType(coreRelayerVaa.payload)
|
const payloadId = parsePayloadType(coreRelayerVaa.payload)
|
||||||
const hash = coreRelayerVaa.hash.toString("base64")
|
const { [hash]: fetched } = await db.getKeys<
|
||||||
const { [hash]: fetched } = await db.getKeys<Record<typeof hash, Entry>>([hash])
|
Record<typeof hash, vaaFetching.SyntheticBatchEntry>
|
||||||
|
>([hash])
|
||||||
if (fetched?.allFetched) {
|
if (fetched?.allFetched) {
|
||||||
// if all vaas have been fetched, kick off workflow
|
// if all vaas have been fetched, kick off workflow
|
||||||
this.logger.info(`All fetched, queueing workflow for ${hash}...`)
|
this.logger.info(`All fetched, queueing workflow for ${hash}...`)
|
||||||
|
@ -279,48 +153,6 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async consumeRedeliveryEvent(
|
|
||||||
redeliveryVaa: ParsedVaaWithBytes,
|
|
||||||
db: StagingAreaKeyLock,
|
|
||||||
providers: Providers
|
|
||||||
): Promise<{ workflowData: WorkflowPayload } | undefined> {
|
|
||||||
const redeliveryInstruction = parseRedeliveryByTxHashInstruction(
|
|
||||||
redeliveryVaa.payload
|
|
||||||
)
|
|
||||||
const chainId = redeliveryInstruction.sourceChain as wh.EVMChainId
|
|
||||||
const provider = providers.evm[chainId]
|
|
||||||
const config = this.pluginConfig.supportedChains.get(chainId)!
|
|
||||||
const rx = await provider.getTransactionReceipt(
|
|
||||||
ethers.utils.hexlify(redeliveryInstruction.sourceTxHash, {
|
|
||||||
allowMissingPrefix: true,
|
|
||||||
})
|
|
||||||
)
|
|
||||||
const { vaas, deliveryVaaIdx } = this.filterLogs(
|
|
||||||
rx,
|
|
||||||
chainId,
|
|
||||||
wh.tryNativeToHexString(config.relayerAddress, "ethereum"),
|
|
||||||
redeliveryInstruction.sourceNonce.toNumber()
|
|
||||||
)
|
|
||||||
|
|
||||||
// create entry and pending in db
|
|
||||||
const newEntry: Entry = {
|
|
||||||
vaas,
|
|
||||||
chainId,
|
|
||||||
deliveryVaaIdx,
|
|
||||||
redeliveryVaa: redeliveryVaa.bytes.toString("base64"),
|
|
||||||
allFetched: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.debug(`Entry: ${JSON.stringify(newEntry, undefined, 4)}`)
|
|
||||||
await this.addEntryToPendingQueue(
|
|
||||||
Buffer.from(redeliveryVaa.hash).toString("base64"),
|
|
||||||
newEntry,
|
|
||||||
db
|
|
||||||
)
|
|
||||||
|
|
||||||
return {} as any
|
|
||||||
}
|
|
||||||
|
|
||||||
async consumeDeliveryEvent(
|
async consumeDeliveryEvent(
|
||||||
coreRelayerVaa: ParsedVaaWithBytes,
|
coreRelayerVaa: ParsedVaaWithBytes,
|
||||||
db: StagingAreaKeyLock,
|
db: StagingAreaKeyLock,
|
||||||
|
@ -331,172 +163,96 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||||
`Not fetched, fetching receipt and filtering to synthetic batch for ${hash}...`
|
`Not fetched, fetching receipt and filtering to synthetic batch for ${hash}...`
|
||||||
)
|
)
|
||||||
const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId
|
const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId
|
||||||
const provider = providers.evm[chainId]
|
const rx = await syntheticBatch.fetchReceipt(
|
||||||
const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId, provider)
|
coreRelayerVaa.sequence,
|
||||||
|
|
||||||
const emitter = wh.tryNativeToHexString(
|
|
||||||
wh.tryUint8ArrayToNative(coreRelayerVaa.emitterAddress, "ethereum"),
|
|
||||||
"ethereum"
|
|
||||||
)
|
|
||||||
const { vaas, deliveryVaaIdx } = this.filterLogs(
|
|
||||||
rx,
|
|
||||||
chainId,
|
chainId,
|
||||||
emitter,
|
providers.evm[chainId],
|
||||||
coreRelayerVaa.nonce
|
this.pluginConfig.supportedChains.get(chainId)!
|
||||||
|
)
|
||||||
|
|
||||||
|
const chainConfig = this.pluginConfig.supportedChains.get(chainId)!
|
||||||
|
const { vaas, deliveryVaaIdx } = syntheticBatch.filterLogs(
|
||||||
|
rx,
|
||||||
|
coreRelayerVaa.nonce,
|
||||||
|
chainConfig,
|
||||||
|
this.logger
|
||||||
)
|
)
|
||||||
vaas[deliveryVaaIdx].bytes = coreRelayerVaa.bytes.toString("base64")
|
vaas[deliveryVaaIdx].bytes = coreRelayerVaa.bytes.toString("base64")
|
||||||
|
|
||||||
// create entry and pending in db
|
// create entry and pending in db
|
||||||
const newEntry: Entry = {
|
const newEntry: vaaFetching.SyntheticBatchEntry = {
|
||||||
vaas,
|
vaas,
|
||||||
chainId,
|
chainId,
|
||||||
deliveryVaaIdx,
|
deliveryVaaIdx,
|
||||||
allFetched: false,
|
allFetched: false,
|
||||||
}
|
}
|
||||||
|
return await this.addWorkflowOrQueueEntryForFetching(db, hash, newEntry)
|
||||||
|
}
|
||||||
|
|
||||||
const maybeResolvedEntry = await this.fetchEntry(hash, newEntry, this.logger)
|
async consumeRedeliveryEvent(
|
||||||
if (maybeResolvedEntry.allFetched) {
|
redeliveryVaa: ParsedVaaWithBytes,
|
||||||
|
db: StagingAreaKeyLock,
|
||||||
|
providers: Providers
|
||||||
|
): Promise<{ workflowData: WorkflowPayload } | undefined> {
|
||||||
|
const redeliveryInstruction = parseRedeliveryByTxHashInstruction(
|
||||||
|
redeliveryVaa.payload
|
||||||
|
)
|
||||||
|
const chainId = redeliveryInstruction.sourceChain as wh.EVMChainId
|
||||||
|
const provider = providers.evm[chainId]
|
||||||
|
const rx = await provider.getTransactionReceipt(
|
||||||
|
ethers.utils.hexlify(redeliveryInstruction.sourceTxHash, {
|
||||||
|
allowMissingPrefix: true,
|
||||||
|
})
|
||||||
|
)
|
||||||
|
const chainConfig = this.pluginConfig.supportedChains.get(chainId)!
|
||||||
|
const { vaas, deliveryVaaIdx } = syntheticBatch.filterLogs(
|
||||||
|
rx,
|
||||||
|
redeliveryInstruction.sourceNonce.toNumber(),
|
||||||
|
chainConfig,
|
||||||
|
this.logger
|
||||||
|
)
|
||||||
|
|
||||||
|
// create entry and pending in db
|
||||||
|
const newEntry: vaaFetching.SyntheticBatchEntry = {
|
||||||
|
vaas,
|
||||||
|
chainId,
|
||||||
|
deliveryVaaIdx,
|
||||||
|
redeliveryVaa: redeliveryVaa.bytes.toString("base64"),
|
||||||
|
allFetched: false,
|
||||||
|
}
|
||||||
|
const hash = Buffer.from(redeliveryVaa.hash).toString("base64")
|
||||||
|
return this.addWorkflowOrQueueEntryForFetching(db, hash, newEntry)
|
||||||
|
}
|
||||||
|
|
||||||
|
async addWorkflowOrQueueEntryForFetching(
|
||||||
|
db: StagingAreaKeyLock,
|
||||||
|
hash: string,
|
||||||
|
entry: vaaFetching.SyntheticBatchEntry
|
||||||
|
): Promise<{ workflowData: WorkflowPayload } | undefined> {
|
||||||
|
const resolvedEntry = await vaaFetching.fetchEntry(
|
||||||
|
hash,
|
||||||
|
entry,
|
||||||
|
this.logger,
|
||||||
|
this.engineConfig
|
||||||
|
)
|
||||||
|
if (resolvedEntry.allFetched) {
|
||||||
this.logger.info("Resolved entry immediately")
|
this.logger.info("Resolved entry immediately")
|
||||||
return {
|
return {
|
||||||
workflowData: {
|
workflowData: {
|
||||||
payloadId: RelayerPayloadId.Delivery,
|
payloadId: entry.redeliveryVaa
|
||||||
deliveryVaaIndex: maybeResolvedEntry.deliveryVaaIdx,
|
? RelayerPayloadId.Redelivery
|
||||||
vaas: maybeResolvedEntry.vaas.map((v) => v.bytes),
|
: RelayerPayloadId.Delivery,
|
||||||
|
deliveryVaaIndex: resolvedEntry.deliveryVaaIdx,
|
||||||
|
vaas: resolvedEntry.vaas.map((v) => v.bytes),
|
||||||
|
redeliveryVaa: resolvedEntry.redeliveryVaa,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.debug(`Entry: ${JSON.stringify(newEntry, undefined, 4)}`)
|
await vaaFetching.addEntryToPendingQueue(hash, entry, db)
|
||||||
// todo: retry if withKey throws (possible contention with worker process)
|
|
||||||
await this.addEntryToPendingQueue(hash, newEntry, db)
|
|
||||||
|
|
||||||
// do not create workflow until we have collected all VAAs
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
async addEntryToPendingQueue(hash: string, newEntry: Entry, db: StagingAreaKeyLock) {
|
|
||||||
await retryAsyncUntilDefined(async () => {
|
|
||||||
try {
|
|
||||||
return db.withKey(
|
|
||||||
[hash, PENDING],
|
|
||||||
// note _hash is actually the value of the variable `hash`, but ts will not
|
|
||||||
// let this be expressed
|
|
||||||
async (kv: { [PENDING]: Pending[]; _hash: Entry }) => {
|
|
||||||
// @ts-ignore
|
|
||||||
let oldEntry: Entry | null = kv[hash]
|
|
||||||
if (oldEntry?.allFetched) {
|
|
||||||
return { newKV: kv, val: true }
|
|
||||||
}
|
|
||||||
if (kv[PENDING].findIndex((e) => e.hash === hash) !== -1) {
|
|
||||||
return { newKV: kv, val: true }
|
|
||||||
}
|
|
||||||
|
|
||||||
const now = Date.now().toString()
|
|
||||||
kv.pending.push({
|
|
||||||
nextRetryTime: now,
|
|
||||||
numTimesRetried: 0,
|
|
||||||
startTime: now,
|
|
||||||
hash,
|
|
||||||
})
|
|
||||||
// @ts-ignore
|
|
||||||
kv[hash] = newEntry
|
|
||||||
return { newKV: kv, val: true }
|
|
||||||
}
|
|
||||||
)
|
|
||||||
} catch (e) {
|
|
||||||
this.logger.warn(e)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// fetch the contract transaction receipt for the given sequence number emitted by the core relayer contract
|
|
||||||
async fetchReceipt(
|
|
||||||
sequence: BigInt,
|
|
||||||
chainId: wh.EVMChainId,
|
|
||||||
provider: ethers.providers.Provider
|
|
||||||
): Promise<ethers.ContractReceipt> {
|
|
||||||
const config = this.pluginConfig.supportedChains.get(chainId)!
|
|
||||||
const coreWHContract = IWormhole__factory.connect(config.coreContract!, provider)
|
|
||||||
const filter = coreWHContract.filters.LogMessagePublished(config.relayerAddress)
|
|
||||||
const blockNumber = await coreWHContract.provider.getBlockNumber()
|
|
||||||
for (let i = 0; i < 20; ++i) {
|
|
||||||
let paginatedLogs
|
|
||||||
if (i === 0) {
|
|
||||||
paginatedLogs = await coreWHContract.queryFilter(filter, -30)
|
|
||||||
} else {
|
|
||||||
paginatedLogs = await coreWHContract.queryFilter(
|
|
||||||
filter,
|
|
||||||
blockNumber - (i + 1) * 20,
|
|
||||||
blockNumber - i * 20
|
|
||||||
)
|
|
||||||
}
|
|
||||||
const log = paginatedLogs.find(
|
|
||||||
(log) => log.args.sequence.toString() === sequence.toString()
|
|
||||||
)
|
|
||||||
if (log) {
|
|
||||||
return log.getTransactionReceipt()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
return retryAsyncUntilDefined(
|
|
||||||
async () => {
|
|
||||||
const paginatedLogs = await coreWHContract.queryFilter(filter, -50)
|
|
||||||
const log = paginatedLogs.find(
|
|
||||||
(log) => log.args.sequence.toString() === sequence.toString()
|
|
||||||
)
|
|
||||||
if (log) {
|
|
||||||
return log.getTransactionReceipt()
|
|
||||||
}
|
|
||||||
return undefined
|
|
||||||
},
|
|
||||||
{ maxTry: 10, delay: 500 }
|
|
||||||
)
|
|
||||||
} catch {
|
|
||||||
throw new PluginError("Could not find contract receipt", { sequence, chainId })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
filterLogs(
|
|
||||||
rx: ethers.ContractReceipt,
|
|
||||||
chainId: wh.EVMChainId,
|
|
||||||
emitterAddress: string, //hex
|
|
||||||
nonce: number
|
|
||||||
): {
|
|
||||||
vaas: {
|
|
||||||
sequence: string
|
|
||||||
emitter: string
|
|
||||||
bytes: string
|
|
||||||
}[]
|
|
||||||
deliveryVaaIdx: number
|
|
||||||
} {
|
|
||||||
const onlyVAALogs = rx.logs.filter(
|
|
||||||
(log) =>
|
|
||||||
log.address === this.pluginConfig.supportedChains.get(chainId)?.coreContract
|
|
||||||
)
|
|
||||||
const vaas = onlyVAALogs.flatMap((bridgeLog: ethers.providers.Log) => {
|
|
||||||
const iface = Implementation__factory.createInterface()
|
|
||||||
const log = iface.parseLog(bridgeLog) as unknown as LogMessagePublishedEvent
|
|
||||||
// filter down to just synthetic batch
|
|
||||||
if (log.args.nonce !== nonce) {
|
|
||||||
return []
|
|
||||||
}
|
|
||||||
return [
|
|
||||||
{
|
|
||||||
sequence: log.args.sequence.toString(),
|
|
||||||
emitter: wh.tryNativeToHexString(log.args.sender, "ethereum"),
|
|
||||||
bytes: "",
|
|
||||||
},
|
|
||||||
]
|
|
||||||
})
|
|
||||||
this.logger.debug(vaas)
|
|
||||||
const deliveryVaaIdx = vaas.findIndex((vaa) => vaa.emitter === emitterAddress)
|
|
||||||
if (deliveryVaaIdx === -1) {
|
|
||||||
throw new PluginError("CoreRelayerVaa not found in fetched vaas", {
|
|
||||||
vaas,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return { vaas, deliveryVaaIdx }
|
|
||||||
}
|
|
||||||
async handleWorkflow(
|
async handleWorkflow(
|
||||||
workflow: Workflow<WorkflowPayload>,
|
workflow: Workflow<WorkflowPayload>,
|
||||||
_providers: Providers,
|
_providers: Providers,
|
||||||
|
@ -504,8 +260,6 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
this.logger.info("Got workflow")
|
this.logger.info("Got workflow")
|
||||||
this.logger.info(JSON.stringify(workflow, undefined, 2))
|
this.logger.info(JSON.stringify(workflow, undefined, 2))
|
||||||
this.logger.info(workflow.data.deliveryVaaIndex)
|
|
||||||
this.logger.info(workflow.data.vaas)
|
|
||||||
const payload = this.parseWorkflowPayload(workflow)
|
const payload = this.parseWorkflowPayload(workflow)
|
||||||
switch (payload.payloadId) {
|
switch (payload.payloadId) {
|
||||||
case RelayerPayloadId.Delivery:
|
case RelayerPayloadId.Delivery:
|
||||||
|
@ -528,7 +282,6 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||||
const chainId = assertEvmChainId(ix.targetChain)
|
const chainId = assertEvmChainId(ix.targetChain)
|
||||||
const budget = ix.receiverValueTarget.add(ix.maximumRefundTarget).add(100)
|
const budget = ix.receiverValueTarget.add(ix.maximumRefundTarget).add(100)
|
||||||
|
|
||||||
// todo: consider parallelizing this
|
|
||||||
await execute.onEVM({
|
await execute.onEVM({
|
||||||
chainId,
|
chainId,
|
||||||
f: async ({ wallet }) => {
|
f: async ({ wallet }) => {
|
||||||
|
@ -571,7 +324,6 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const redelivery = payload.redelivery!
|
const redelivery = payload.redelivery!
|
||||||
const chainId = assertEvmChainId(redelivery.ix.targetChain)
|
const chainId = assertEvmChainId(redelivery.ix.targetChain)
|
||||||
// todo: consider parallelizing this
|
|
||||||
await execute.onEVM({
|
await execute.onEVM({
|
||||||
chainId,
|
chainId,
|
||||||
f: async ({ wallet }) => {
|
f: async ({ wallet }) => {
|
||||||
|
@ -656,11 +408,3 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function assertEvmChainId(chainId: number): wh.EVMChainId {
|
|
||||||
if (!wh.isEVMChain(chainId as wh.ChainId)) {
|
|
||||||
throw new PluginError("Expected wh evm chainId for target chain", {
|
|
||||||
chainId,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return chainId as wh.EVMChainId
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,96 @@
|
||||||
|
import * as wh from "@certusone/wormhole-sdk"
|
||||||
|
import { PluginError } from "./utils"
|
||||||
|
import { IWormhole__factory, LogMessagePublishedEvent } from "../../../pkgs/sdk/src"
|
||||||
|
import * as ethers from "ethers"
|
||||||
|
import { Implementation__factory } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts"
|
||||||
|
import { retryAsyncUntilDefined } from "ts-retry/lib/cjs/retry"
|
||||||
|
import { ChainInfo } from "./plugin"
|
||||||
|
import { ScopedLogger } from "@wormhole-foundation/relayer-engine/relayer-engine/lib/helpers/logHelper"
|
||||||
|
import { tryNativeToHexString } from "@certusone/wormhole-sdk"
|
||||||
|
|
||||||
|
// fetch the contract transaction receipt for the given sequence number emitted by the core relayer contract
|
||||||
|
export async function fetchReceipt(
|
||||||
|
sequence: BigInt,
|
||||||
|
chainId: wh.EVMChainId,
|
||||||
|
provider: ethers.providers.Provider,
|
||||||
|
chainConfig: ChainInfo
|
||||||
|
): Promise<ethers.ContractReceipt> {
|
||||||
|
const coreWHContract = IWormhole__factory.connect(chainConfig.coreContract!, provider)
|
||||||
|
const filter = coreWHContract.filters.LogMessagePublished(chainConfig.relayerAddress)
|
||||||
|
const blockNumber = await coreWHContract.provider.getBlockNumber()
|
||||||
|
for (let i = 0; i < 20; ++i) {
|
||||||
|
let paginatedLogs
|
||||||
|
if (i === 0) {
|
||||||
|
paginatedLogs = await coreWHContract.queryFilter(filter, -30)
|
||||||
|
} else {
|
||||||
|
paginatedLogs = await coreWHContract.queryFilter(
|
||||||
|
filter,
|
||||||
|
blockNumber - (i + 1) * 20,
|
||||||
|
blockNumber - i * 20
|
||||||
|
)
|
||||||
|
}
|
||||||
|
const log = paginatedLogs.find(
|
||||||
|
(log) => log.args.sequence.toString() === sequence.toString()
|
||||||
|
)
|
||||||
|
if (log) {
|
||||||
|
return await log.getTransactionReceipt()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return await retryAsyncUntilDefined(
|
||||||
|
async () => {
|
||||||
|
const paginatedLogs = await coreWHContract.queryFilter(filter, -50)
|
||||||
|
const log = paginatedLogs.find(
|
||||||
|
(log) => log.args.sequence.toString() === sequence.toString()
|
||||||
|
)
|
||||||
|
if (log) {
|
||||||
|
return await log.getTransactionReceipt()
|
||||||
|
}
|
||||||
|
return undefined
|
||||||
|
},
|
||||||
|
{ maxTry: 10, delay: 500 }
|
||||||
|
)
|
||||||
|
} catch {
|
||||||
|
throw new PluginError("Could not find contract receipt", { sequence, chainId })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function filterLogs(
|
||||||
|
rx: ethers.ContractReceipt,
|
||||||
|
nonce: number,
|
||||||
|
chainConfig: ChainInfo,
|
||||||
|
logger: ScopedLogger
|
||||||
|
): {
|
||||||
|
vaas: {
|
||||||
|
sequence: string
|
||||||
|
emitter: string
|
||||||
|
bytes: string
|
||||||
|
}[]
|
||||||
|
deliveryVaaIdx: number
|
||||||
|
} {
|
||||||
|
const onlyVAALogs = rx.logs.filter((log) => log.address === chainConfig.coreContract)
|
||||||
|
const vaas = onlyVAALogs.flatMap((bridgeLog: ethers.providers.Log) => {
|
||||||
|
const iface = Implementation__factory.createInterface()
|
||||||
|
const log = iface.parseLog(bridgeLog) as unknown as LogMessagePublishedEvent
|
||||||
|
// filter down to just synthetic batch
|
||||||
|
if (log.args.nonce !== nonce) {
|
||||||
|
return []
|
||||||
|
}
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
sequence: log.args.sequence.toString(),
|
||||||
|
emitter: wh.tryNativeToHexString(log.args.sender, "ethereum"),
|
||||||
|
bytes: "",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
})
|
||||||
|
logger.debug(vaas)
|
||||||
|
const emitterAddress = tryNativeToHexString(chainConfig.relayerAddress, "ethereum")
|
||||||
|
const deliveryVaaIdx = vaas.findIndex((vaa) => vaa.emitter === emitterAddress)
|
||||||
|
if (deliveryVaaIdx === -1) {
|
||||||
|
throw new PluginError("CoreRelayerVaa not found in fetched vaas", {
|
||||||
|
vaas,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return { vaas, deliveryVaaIdx }
|
||||||
|
}
|
|
@ -1,5 +1,17 @@
|
||||||
|
import {
|
||||||
|
ChainId,
|
||||||
|
EVMChainId,
|
||||||
|
isEVMChain,
|
||||||
|
tryNativeToHexString,
|
||||||
|
tryUint8ArrayToNative,
|
||||||
|
} from "@certusone/wormhole-sdk"
|
||||||
|
|
||||||
export class PluginError extends Error {
|
export class PluginError extends Error {
|
||||||
constructor(msg: string, public args?: Record<any, any>) {
|
constructor(msg: string, public args?: Record<any, any>) {
|
||||||
super(msg)
|
super(msg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function convertAddressBytesToHex(bytes: Uint8Array | Buffer): string {
|
||||||
|
return tryNativeToHexString(tryUint8ArrayToNative(bytes, "ethereum"), "ethereum")
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,193 @@
|
||||||
|
import { redeemOnXpla, SignedVaa } from "@certusone/wormhole-sdk"
|
||||||
|
import {
|
||||||
|
StagingAreaKeyLock,
|
||||||
|
getScopedLogger,
|
||||||
|
sleep,
|
||||||
|
second,
|
||||||
|
} from "@wormhole-foundation/relayer-engine"
|
||||||
|
import { ScopedLogger } from "@wormhole-foundation/relayer-engine/relayer-engine/lib/helpers/logHelper"
|
||||||
|
import { logger } from "ethers"
|
||||||
|
import { retryAsyncUntilDefined } from "ts-retry/lib/cjs/retry"
|
||||||
|
import { Logger } from "winston"
|
||||||
|
import * as wh from "@certusone/wormhole-sdk"
|
||||||
|
import * as grpcWebNodeHttpTransport from "@improbable-eng/grpc-web-node-http-transport"
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DB types
|
||||||
|
*/
|
||||||
|
|
||||||
|
export const PENDING = "pending"
|
||||||
|
export interface Pending {
|
||||||
|
startTime: string
|
||||||
|
numTimesRetried: number
|
||||||
|
hash: string
|
||||||
|
nextRetryTime: string
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface SyntheticBatchEntry {
|
||||||
|
chainId: number
|
||||||
|
deliveryVaaIdx: number
|
||||||
|
vaas: { emitter: string; sequence: string; bytes: string }[]
|
||||||
|
allFetched: boolean
|
||||||
|
// only present for Redeliveries
|
||||||
|
redeliveryVaa?: string
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function fetchVaaWorker(
|
||||||
|
eventSource: (event: SignedVaa) => Promise<void>,
|
||||||
|
db: StagingAreaKeyLock,
|
||||||
|
parentLogger: ScopedLogger,
|
||||||
|
engineConfig: { wormholeRpc: string }
|
||||||
|
): Promise<void> {
|
||||||
|
const logger = getScopedLogger(["fetchWorker"], parentLogger)
|
||||||
|
logger.info(`Started fetchVaaWorker`)
|
||||||
|
while (true) {
|
||||||
|
await sleep(3_000) // todo: make configurable
|
||||||
|
|
||||||
|
// track which delivery vaa hashes have all vaas ready this iteration
|
||||||
|
let newlyResolved = new Map<string, SyntheticBatchEntry>()
|
||||||
|
await db.withKey([PENDING], async (kv: { [PENDING]?: Pending[] }, tx) => {
|
||||||
|
// if objects have not been created, initialize
|
||||||
|
if (!kv.pending) {
|
||||||
|
kv.pending = []
|
||||||
|
}
|
||||||
|
logger.debug(`Pending: ${JSON.stringify(kv.pending, undefined, 4)}`)
|
||||||
|
|
||||||
|
// filter to the pending items that are due to be retried
|
||||||
|
const entriesToFetch = kv.pending.filter(
|
||||||
|
(delivery) => new Date(JSON.parse(delivery.nextRetryTime)).getTime() < Date.now()
|
||||||
|
)
|
||||||
|
if (entriesToFetch.length === 0) {
|
||||||
|
return { newKV: kv, val: undefined }
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(`Attempting to fetch ${entriesToFetch.length} entries`)
|
||||||
|
await db.withKey(
|
||||||
|
// get `SyntheticBatchEntry`s for each hash
|
||||||
|
entriesToFetch.map((d) => d.hash),
|
||||||
|
async (kv: Record<string, SyntheticBatchEntry>) => {
|
||||||
|
const promises = Object.entries(kv).map(async ([hash, entry]) => {
|
||||||
|
if (entry.allFetched) {
|
||||||
|
// nothing to do
|
||||||
|
logger.warn("Entry in pending but nothing to fetch " + hash)
|
||||||
|
return [hash, entry]
|
||||||
|
}
|
||||||
|
const newEntry: SyntheticBatchEntry = await fetchEntry(
|
||||||
|
hash,
|
||||||
|
entry,
|
||||||
|
logger,
|
||||||
|
engineConfig
|
||||||
|
)
|
||||||
|
if (newEntry.allFetched) {
|
||||||
|
newlyResolved.set(hash, newEntry)
|
||||||
|
}
|
||||||
|
return [hash, newEntry]
|
||||||
|
})
|
||||||
|
|
||||||
|
const newKV = Object.fromEntries(await Promise.all(promises))
|
||||||
|
return { newKV, val: undefined }
|
||||||
|
},
|
||||||
|
tx
|
||||||
|
)
|
||||||
|
|
||||||
|
kv[PENDING] = kv[PENDING].filter((p) => !newlyResolved.has(p.hash)).map((x) => ({
|
||||||
|
...x,
|
||||||
|
numTimesRetried: x.numTimesRetried + 1,
|
||||||
|
nextRetryTime: new Date(Date.now() + second * x.numTimesRetried).toString(),
|
||||||
|
}))
|
||||||
|
return { newKV: kv, val: undefined }
|
||||||
|
})
|
||||||
|
// kick off an engine listener event for each resolved delivery vaa
|
||||||
|
for (const entry of newlyResolved.values()) {
|
||||||
|
logger.info("Kicking off engine listener event for resolved entry")
|
||||||
|
if (entry.redeliveryVaa) {
|
||||||
|
eventSource(Buffer.from(entry.redeliveryVaa, "base64"))
|
||||||
|
} else {
|
||||||
|
eventSource(Buffer.from(entry.vaas[entry.deliveryVaaIdx].bytes, "base64"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function fetchEntry(
|
||||||
|
hash: string,
|
||||||
|
value: SyntheticBatchEntry,
|
||||||
|
logger: Logger,
|
||||||
|
engineConfig: { wormholeRpc: string }
|
||||||
|
): Promise<SyntheticBatchEntry> {
|
||||||
|
logger.info("Fetching SyntheticBatchEntry...", { hash })
|
||||||
|
// track if there are missing vaas after trying to fetch
|
||||||
|
let hasMissingVaas = false
|
||||||
|
|
||||||
|
// for each entry, attempt to fetch vaas from wormhole rpc
|
||||||
|
const vaas = await Promise.all(
|
||||||
|
value.vaas.map(async ({ emitter, sequence, bytes }, idx) => {
|
||||||
|
// skip if vaa has already been fetched
|
||||||
|
if (bytes.length !== 0) {
|
||||||
|
return { emitter, sequence, bytes }
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// try to fetch vaa from rpc
|
||||||
|
const resp = await wh.getSignedVAA(
|
||||||
|
engineConfig.wormholeRpc,
|
||||||
|
value.chainId as wh.EVMChainId,
|
||||||
|
emitter,
|
||||||
|
sequence,
|
||||||
|
{ transport: grpcWebNodeHttpTransport.NodeHttpTransport() }
|
||||||
|
)
|
||||||
|
logger.info(`Fetched vaa ${idx} for delivery ${hash}`)
|
||||||
|
return {
|
||||||
|
emitter,
|
||||||
|
sequence,
|
||||||
|
// base64 encode
|
||||||
|
bytes: Buffer.from(resp.vaaBytes).toString("base64"),
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
hasMissingVaas = true
|
||||||
|
logger.debug(e)
|
||||||
|
return { emitter, sequence, bytes: "" }
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
// if all vaas have been fetched, mark this hash as resolved
|
||||||
|
return { ...value, vaas, allFetched: !hasMissingVaas }
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function addEntryToPendingQueue(
|
||||||
|
hash: string,
|
||||||
|
newEntry: SyntheticBatchEntry,
|
||||||
|
db: StagingAreaKeyLock
|
||||||
|
) {
|
||||||
|
await retryAsyncUntilDefined(async () => {
|
||||||
|
try {
|
||||||
|
return db.withKey(
|
||||||
|
[hash, PENDING],
|
||||||
|
// note _hash is actually the value of the variable `hash`, but ts will not
|
||||||
|
// let this be expressed
|
||||||
|
async (kv: { [PENDING]: Pending[]; _hash: SyntheticBatchEntry }) => {
|
||||||
|
// @ts-ignore
|
||||||
|
let oldEntry: SyntheticBatchEntry | null = kv[hash]
|
||||||
|
if (oldEntry?.allFetched) {
|
||||||
|
return { newKV: kv, val: true }
|
||||||
|
}
|
||||||
|
if (kv[PENDING].findIndex((e) => e.hash === hash) !== -1) {
|
||||||
|
return { newKV: kv, val: true }
|
||||||
|
}
|
||||||
|
|
||||||
|
const now = Date.now().toString()
|
||||||
|
kv.pending.push({
|
||||||
|
nextRetryTime: now,
|
||||||
|
numTimesRetried: 0,
|
||||||
|
startTime: now,
|
||||||
|
hash,
|
||||||
|
})
|
||||||
|
// @ts-ignore
|
||||||
|
kv[hash] = newEntry
|
||||||
|
return { newKV: kv, val: true }
|
||||||
|
}
|
||||||
|
)
|
||||||
|
} catch (e) {
|
||||||
|
logger.warn(e)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue