Wait for signed relayer VAA, then fetch others based off that

This commit is contained in:
Joe Howarth 2023-01-09 16:30:25 -07:00
parent 0e2e6dc7f2
commit 50bd76f01d
6 changed files with 118 additions and 243 deletions

View File

@ -1,5 +1,9 @@
import { EVMChainId } from "@certusone/wormhole-sdk"
import * as relayerEngine from "@wormhole-foundation/relayer-engine"
import GenericRelayerPluginDef, { GenericRelayerPluginConfig } from "./plugin/src/plugin"
import GenericRelayerPluginDef, {
ChainInfo,
GenericRelayerPluginConfig,
} from "./plugin/src/plugin"
async function main() {
// load plugin config
@ -8,6 +12,25 @@ async function main() {
`./src/plugin/config/${envType.toLowerCase()}.json`
)) as GenericRelayerPluginConfig
const contracts = await relayerEngine.loadFileAndParseToObject(
`../ethereum/ts-scripts/config/${envType
.toLocaleLowerCase()
.replace("devnet", "testnet")}/contracts.json`
)
const supportedChains = pluginConfig.supportedChains as unknown as Record<
any,
ChainInfo
>
contracts.coreRelayers.forEach(
({ chainId, address }: contractConfigEntry) =>
(supportedChains[chainId].relayerAddress = address)
)
contracts.mockIntegrations.forEach(
({ chainId, address }: contractConfigEntry) =>
(supportedChains[chainId].mockIntegrationContractAddress = address)
)
pluginConfig.supportedChains = supportedChains as any
// run relayer engine
await relayerEngine.run({
configs: "./engine_config/" + envType.toLowerCase(),
@ -34,3 +57,5 @@ main().catch((e) => {
console.error(e)
process.exit(1)
})
type contractConfigEntry = { chainId: EVMChainId; address: "string" }

View File

@ -1,5 +1,5 @@
{
"shouldSpy": false,
"shouldSpy": true,
"shouldRest": false,
"logWatcherSleepMs": 300000,
"supportedChains": {

View File

@ -1,5 +1,5 @@
{
"shouldSpy": false,
"shouldSpy": true,
"shouldRest": false,
"logWatcherSleepMs": 300000,
"supportedChains": {

View File

@ -9,7 +9,7 @@
"watch": "tsc --watch"
},
"dependencies": {
"@certusone/wormhole-sdk": "^0.9.0",
"@certusone/wormhole-sdk": "^0.9.6",
"generic-relayer-sdk": "file:../../../sdk",
"relayer-engine": "github:wormhole-foundation/relayer-engine"
},

View File

@ -5,34 +5,20 @@ import {
CommonPluginEnv,
ContractFilter,
dbg,
EventSource,
ParsedVaaWithBytes,
parseVaaWithBytes,
Plugin,
PluginDefinition,
Providers,
sleep,
StagingAreaKeyLock,
Workflow,
} from "@wormhole-foundation/relayer-engine"
import * as wh from "@certusone/wormhole-sdk"
import { Logger } from "winston"
import { PluginError } from "./utils"
import {
EVMChainId,
parseSequenceFromLogEth,
parseSequencesFromLogEth,
SignedVaa,
} from "@certusone/wormhole-sdk"
import {
CoreRelayer__factory,
IWormhole,
IWormhole__factory,
MockRelayerIntegration__factory,
MockRelayerIntegration,
} from "../../../../sdk/src"
import { parseSequencesFromLogEth, SignedVaa } from "@certusone/wormhole-sdk"
import { CoreRelayer__factory, IWormhole, IWormhole__factory } from "../../../../sdk/src"
import * as ethers from "ethers"
import { TypedEvent } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts/commons"
import { Implementation__factory } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts"
import { LogMessagePublishedEvent } from "../../../../sdk/src/ethers-contracts/IWormhole"
import { CoreRelayerStructs } from "../../../../sdk/src/ethers-contracts/CoreRelayer"
@ -41,11 +27,11 @@ import * as _ from "lodash"
let PLUGIN_NAME: string = "GenericRelayerPlugin"
export interface ChainInfo {
chainId: wh.ChainId
coreContract?: IWormhole
relayerAddress: string
mockIntegrationContractAddress: string
}
export interface GenericRelayerPluginConfig {
supportedChains: Map<wh.EVMChainId, ChainInfo>
logWatcherSleepMs: number
@ -54,14 +40,15 @@ export interface GenericRelayerPluginConfig {
}
interface WorkflowPayload {
coreRelayerVaa: string // base64
otherVaas: string[] // base64
coreRelayerVaaIndex: number
vaas: string[] // base64
}
interface WorkflowPayloadParsed {
deliveryInstructionsContainer: DeliveryInstructionsContainer
coreRelayerVaaIndex: number
coreRelayerVaa: ParsedVaaWithBytes
otherVaas: ParsedVaaWithBytes[]
vaas: Buffer[]
}
export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
@ -76,9 +63,6 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
pluginConfigRaw: Record<string, any>,
readonly logger: Logger
) {
console.log(`Config: ${JSON.stringify(engineConfig, undefined, 2)}`)
console.log(`Plugin Env: ${JSON.stringify(pluginConfigRaw, undefined, 2)}`)
this.pluginConfig = GenericRelayerPlugin.validateConfig(pluginConfigRaw)
this.shouldRest = this.pluginConfig.shouldRest
this.shouldSpy = this.pluginConfig.shouldSpy
@ -86,126 +70,34 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
async afterSetup(
providers: Providers,
eventSource?: (event: SignedVaa) => Promise<void>
_eventSource?: (event: SignedVaa) => Promise<void>
) {
if (!eventSource) {
this.logger.info(
"No eventSource function provided, not subscribing to blockchain rpc"
)
return
}
this.logger.info("Starting logWatcher event source...")
// connect to the core wh contract for each chain
for (const [chainId, info] of this.pluginConfig.supportedChains.entries()) {
const chainName = wh.coalesceChainName(chainId)
const { core } = wh.CONTRACTS.TESTNET[chainName]
if (!core || !wh.isEVMChain(chainId)) {
this.logger.error("No known core contract for chain", chainName)
throw new Error("No known core contract for chain")
throw new PluginError("No known core contract for chain", { chainName })
}
info.coreContract = IWormhole__factory.connect(
core,
providers.evm[chainId as wh.EVMChainId]
)
}
// fire off task
this.subscribeToEvents(eventSource)
}
async subscribeToEvents(eventSource: (event: SignedVaa) => Promise<void>) {
// need to keep same function object to unsubscribe from events successfully
const fns: Record<number, any> = {}
while (true) {
// resubscribe to contract events every 5 minutes
for (const [
chainId,
{ relayerAddress, coreContract },
] of this.pluginConfig.supportedChains.entries()) {
if (!wh.isEVMChain(chainId) || !coreContract) {
throw new PluginError("Invalid chain not evm", { chainId })
}
try {
if (!fns[chainId]) {
fns[chainId] = (
_sender: string,
sequence: ethers.BigNumber,
_nonce: number,
payload: string,
_consistencyLevel: number,
typedEvent: TypedEvent<any>
) =>
this.handleRelayerEvent(
eventSource,
chainId as wh.EVMChainId,
payload,
typedEvent
)
}
coreContract.off(
coreContract.filters.LogMessagePublished(relayerAddress),
fns[chainId]
)
coreContract.on(
coreContract.filters.LogMessagePublished(relayerAddress),
fns[chainId]
)
this.logger.info(
`Subscribed to ${wh.coalesceChainName(chainId)} ${
coreContract.address
} ${relayerAddress}`
)
} catch (e: any) {
// todo: improve error handling
this.logger.error(e)
}
}
await sleep(this.pluginConfig.logWatcherSleepMs)
}
}
async handleRelayerEvent(
this: GenericRelayerPlugin,
eventSource: EventSource,
chainId: wh.EVMChainId,
payload: string,
typedEvent: TypedEvent<
[string, ethers.BigNumber, number, string, number] & {
sender: string
sequence: ethers.BigNumber
nonce: number
payload: string
consistencyLevel: number
}
>
): Promise<void> {
dbg(payload, "payload")
dbg(typedEvent.args.payload, "event args payload")
parsePayloadType(payload)
const rx = await typedEvent.getTransactionReceipt()
// todo: will need to tweak retry and backoff params
const resp = await wh.getSignedVAAWithRetry(
["https://wormhole-v2-testnet-api.certus.one"],
chainId,
wh.tryNativeToHexString(typedEvent.args.sender, "ethereum"),
typedEvent.args.sequence.toString(),
undefined,
undefined,
10
)
eventSource(resp.vaaBytes, [rx])
}
// This plugin listens to batches or blockchain rpc events,
// so we actually want to bypass the inbuilt filters.
// listen to core relayer contract on each chain
getFilters(): ContractFilter[] {
return []
return Array.from(this.pluginConfig.supportedChains.entries()).map(
([chainId, c]) => ({ emitterAddress: c.relayerAddress, chainId })
)
}
async consumeEvent(
coreRelayerVaa: ParsedVaaWithBytes,
_stagingArea: StagingAreaKeyLock,
_providers: Providers,
[rx]: [ethers.ContractReceipt]
_providers: Providers
): Promise<{ workflowData?: WorkflowPayload }> {
const payloadType = parsePayloadType(coreRelayerVaa.payload)
if (payloadType !== RelayerPayloadType.Delivery) {
@ -216,74 +108,83 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
return {}
}
const emitterChain = coreRelayerVaa.emitterChain as wh.EVMChainId
const otherVaas = await this.fetchOtherVaas(
rx,
coreRelayerVaa.nonce,
emitterChain,
coreRelayerVaa.emitterAddress
const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId
const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId)
const allVAAs = await this.fetchOtherVaas(rx, coreRelayerVaa.nonce, chainId)
const coreRelayerVaaIndex = allVAAs.findIndex((vaa) =>
vaa.emitterAddress.equals(coreRelayerVaa.emitterAddress)
)
if (coreRelayerVaaIndex === -1) {
throw new PluginError("CoreRelayerVaa not found in fetched vaas", { vaas: allVAAs })
}
return {
workflowData: {
coreRelayerVaa: dbg(
coreRelayerVaa.bytes.toString("base64"),
"coreRelayerSerialized"
),
otherVaas: otherVaas.map((vaa) => vaa.bytes.toString("base64")),
coreRelayerVaaIndex,
vaas: allVAAs.map((vaa) => vaa.bytes.toString("base64")),
},
}
}
// fetch the contract transaction receipt for the given sequence number emitted by the core relayer contract
async fetchReceipt(
sequence: BigInt,
chainId: wh.EVMChainId
): Promise<ethers.ContractReceipt> {
const config = this.pluginConfig.supportedChains.get(chainId)!
const coreWHContract = config.coreContract!
const filter = coreWHContract.filters.LogMessagePublished(config.relayerAddress)
const blockNumber = await coreWHContract.provider.getBlockNumber()
for (let i = 0; i < 20; ++i) {
let paginatedLogs
if (i === 0) {
paginatedLogs = await coreWHContract.queryFilter(filter, -20)
} else {
paginatedLogs = await coreWHContract.queryFilter(
filter,
blockNumber - (i + 1) * 20,
blockNumber - i * 20
)
}
const log = paginatedLogs.find(
(log) => log.args.sequence.toString() === sequence.toString()
)
if (log) {
return await log.getTransactionReceipt()
}
}
throw new PluginError("Could not find contract receipt", { sequence, chainId })
}
async fetchOtherVaas(
rx: ethers.ContractReceipt,
batchId: number, // aka nonce
chainId: wh.EVMChainId,
coreRelayerAddress: Buffer
chainId: wh.EVMChainId
): Promise<ParsedVaaWithBytes[]> {
// parse log and fetch vaa
const logToVaa = async (bridgeLog: ethers.providers.Log) => {
this.logger.info("Bridge log: " + JSON.stringify(bridgeLog))
const log = Implementation__factory.createInterface().parseLog(
bridgeLog
) as unknown as LogMessagePublishedEvent
const sender = wh.tryNativeToHexString(log.args.sender, "ethereum")
if (
dbg(log.args.sender.toLowerCase(), "log sender") ===
dbg(
wh.tryUint8ArrayToNative(coreRelayerAddress, "ethereum").toLowerCase(),
"coreRelayerAddress"
)
) {
this.logger.info("Hitting undefined")
return undefined
}
// todo: this should handle VAAs that are not ready for hours
const resp = await wh.getSignedVAAWithRetry(
["https://wormhole-v2-testnet-api.certus.one"],
chainId,
sender,
log.args.sequence.toString(),
undefined,
undefined,
10
)
return parseVaaWithBytes(resp.vaaBytes)
}
// collect all vaas
// @ts-ignore
const vaas: ParsedVaaWithBytes[] = (
await Promise.all(
rx.logs
.filter(
(log) =>
log.address ===
this.pluginConfig.supportedChains.get(chainId)?.coreContract?.address
)
.map(logToVaa)
)
).filter((x) => x)
const onlyVAALogs = rx.logs.filter(
(log) =>
log.address ===
this.pluginConfig.supportedChains.get(chainId)?.coreContract?.address
)
const vaas: ParsedVaaWithBytes[] = await Promise.all(
onlyVAALogs.map(async (bridgeLog: ethers.providers.Log) => {
const iface = Implementation__factory.createInterface()
const log = iface.parseLog(bridgeLog) as unknown as LogMessagePublishedEvent
const resp = await wh.getSignedVAAWithRetry(
["https://wormhole-v2-testnet-api.certus.one"],
chainId,
wh.tryNativeToHexString(log.args.sender, "ethereum"),
log.args.sequence.toString(),
undefined,
undefined,
10
)
return parseVaaWithBytes(resp.vaaBytes)
})
)
if (vaas.length == 0) {
// todo: figure out error handling for subscription code
@ -294,7 +195,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
async handleWorkflow(
workflow: Workflow<WorkflowPayload>,
providers: Providers,
_providers: Providers,
execute: ActionExecutor
): Promise<void> {
this.logger.info("Got workflow")
@ -302,35 +203,20 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
console.log("sanity console log")
const payload = this.parseWorkflowPayload(workflow)
this.logger.info("after parse")
for (let i = 0; i < payload.deliveryInstructionsContainer.instructions.length; i++) {
this.logger.info("top of loop")
const ix = payload.deliveryInstructionsContainer.instructions[i]
const budget = ix.applicationBudgetTarget.add(ix.maximumRefundTarget).add(100) // todo: add wormhole fee
const input: CoreRelayerStructs.TargetDeliveryParametersSingleStruct = {
// todo: get vaas in order they were emitted
encodedVMs: [
...payload.otherVaas.map((v) => v.bytes),
payload.coreRelayerVaa.bytes,
],
deliveryIndex: 1,
encodedVMs: payload.vaas,
deliveryIndex: payload.coreRelayerVaaIndex,
multisendIndex: i,
}
const chainId = ix.targetChain as wh.EVMChainId
// todo: consider parallelizing this
await execute.onEVM({
chainId,
f: async ({ wallet }) => {
let message = await MockRelayerIntegration__factory.connect(
this.pluginConfig.supportedChains.get(chainId)!
.mockIntegrationContractAddress,
wallet
).getMessage()
arrayify(message)
this.logger.info(
`Message is ${message} ${Buffer.from(message, "hex").toString()}`
)
const coreRelayer = CoreRelayer__factory.connect(
this.pluginConfig.supportedChains.get(chainId)!.relayerAddress,
wallet
@ -339,45 +225,11 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
.deliverSingle(input, { value: budget, gasLimit: 3000000 })
.then((x) => x.wait())
const seqs = parseSequencesFromLogEth(
rx,
wh.tryNativeToHexString(
this.pluginConfig.supportedChains.get(chainId)!.coreContract?.address!,
"ethereum"
)
)
seqs.forEach(async (seq) => {
const resp = await wh.getSignedVAAWithRetry(
["https://wormhole-v2-testnet-api.certus.one"],
chainId,
wh.tryNativeToHexString(coreRelayer.address, "ethereum"),
seq.toString(),
undefined,
undefined,
10
)
console.log(resp)
console.log("forward vaa: ", wh.parseVaa(resp.vaaBytes))
})
this.logger.info(
`Relayed instruction ${i + 1} of ${
payload.deliveryInstructionsContainer.instructions.length
} to chain ${chainId}`
)
setTimeout(async () => {
message = await MockRelayerIntegration__factory.connect(
this.pluginConfig.supportedChains.get(chainId)!
.mockIntegrationContractAddress,
wallet
).getMessage()
this.logger.info(
`Message is ${message} ${Buffer.from(message, "hex").toString()}`
)
}, 2000)
// delivery receipt will be picked up by listener for bulk redemption later
},
})
}
@ -408,16 +260,13 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
}
parseWorkflowPayload(workflow: Workflow<WorkflowPayload>): WorkflowPayloadParsed {
this.logger.info("parse workflow")
const coreRelayerVaa = parseVaaWithBytes(
dbg(Buffer.from(dbg(workflow.data.coreRelayerVaa), "base64"))
)
this.logger.info("after parse core relayer")
this.logger.info("Parse workflow")
const vaas = workflow.data.vaas.map((s) => Buffer.from(s, "base64"))
const coreRelayerVaa = parseVaaWithBytes(vaas[workflow.data.coreRelayerVaaIndex])
return {
coreRelayerVaa,
otherVaas: workflow.data.otherVaas.map((s) =>
parseVaaWithBytes(Buffer.from(s, "base64"))
),
coreRelayerVaaIndex: workflow.data.coreRelayerVaaIndex,
vaas,
deliveryInstructionsContainer: parseDeliveryInstructionsContainer(
coreRelayerVaa.payload
),
@ -441,6 +290,7 @@ class Definition implements PluginDefinition<GenericRelayerPluginConfig, Plugin>
}
}
// todo: move to sdk
export default new Definition()
import { arrayify } from "ethers/lib/utils"

View File

@ -1,5 +1,5 @@
export class PluginError extends Error {
constructor(msg: string, public args: Record<any, any>) {
constructor(msg: string, public args?: Record<any, any>) {
super(msg)
}
}