vaa fetching background job
This commit is contained in:
parent
121ad6d84b
commit
704d080075
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"mode": "BOTH",
|
||||
"logLevel": "info",
|
||||
"logLevel": "debug",
|
||||
"readinessPort": 2000,
|
||||
"numGuardians": 1,
|
||||
"supportedChains": [
|
||||
|
|
|
@ -10,8 +10,10 @@
|
|||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"@certusone/wormhole-sdk": "^0.9.6",
|
||||
"@improbable-eng/grpc-web-node-http-transport": "^0.15.0",
|
||||
"@wormhole-foundation/relayer-engine": "file:../../relayer-engine",
|
||||
"lodash": "^4.17.21"
|
||||
"lodash": "^4.17.21",
|
||||
"ts-retry": "^4.1.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"nodemon": "^2.0.20",
|
||||
|
@ -4653,6 +4655,11 @@
|
|||
"node": ">=0.4.0"
|
||||
}
|
||||
},
|
||||
"node_modules/ts-retry": {
|
||||
"version": "4.1.1",
|
||||
"resolved": "https://registry.npmjs.org/ts-retry/-/ts-retry-4.1.1.tgz",
|
||||
"integrity": "sha512-AXeVdVUMBdCH/wSXtjpXqZv9FC1ECWqX7VAmSImvEM09SCOYoMh4mb39t1Q49nCRpu2S1WVA+2oNQTkhjMOuZQ=="
|
||||
},
|
||||
"node_modules/tslib": {
|
||||
"version": "2.4.1",
|
||||
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.4.1.tgz",
|
||||
|
@ -8556,6 +8563,11 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
"ts-retry": {
|
||||
"version": "4.1.1",
|
||||
"resolved": "https://registry.npmjs.org/ts-retry/-/ts-retry-4.1.1.tgz",
|
||||
"integrity": "sha512-AXeVdVUMBdCH/wSXtjpXqZv9FC1ECWqX7VAmSImvEM09SCOYoMh4mb39t1Q49nCRpu2S1WVA+2oNQTkhjMOuZQ=="
|
||||
},
|
||||
"tslib": {
|
||||
"version": "2.4.1",
|
||||
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.4.1.tgz",
|
||||
|
|
|
@ -16,8 +16,10 @@
|
|||
},
|
||||
"dependencies": {
|
||||
"@certusone/wormhole-sdk": "^0.9.6",
|
||||
"@improbable-eng/grpc-web-node-http-transport": "^0.15.0",
|
||||
"@wormhole-foundation/relayer-engine": "file:../../relayer-engine",
|
||||
"lodash": "^4.17.21"
|
||||
"lodash": "^4.17.21",
|
||||
"ts-retry": "^4.1.1"
|
||||
},
|
||||
"author": "Chase Moran",
|
||||
"license": "ISC",
|
||||
|
|
|
@ -5,24 +5,30 @@ import {
|
|||
CommonPluginEnv,
|
||||
ContractFilter,
|
||||
dbg,
|
||||
getScopedLogger,
|
||||
ParsedVaaWithBytes,
|
||||
parseVaaWithBytes,
|
||||
Plugin,
|
||||
PluginDefinition,
|
||||
Providers,
|
||||
sleep,
|
||||
StagingAreaKeyLock,
|
||||
Workflow,
|
||||
} from "@wormhole-foundation/relayer-engine"
|
||||
import * as wh from "@certusone/wormhole-sdk"
|
||||
import { Logger } from "winston"
|
||||
import { PluginError } from "./utils"
|
||||
import { parseSequencesFromLogEth, SignedVaa } from "@certusone/wormhole-sdk"
|
||||
import { logNearGas, parseSequencesFromLogEth, SignedVaa } from "@certusone/wormhole-sdk"
|
||||
import { CoreRelayer__factory, IWormhole, IWormhole__factory } from "../../../../sdk/src"
|
||||
import * as ethers from "ethers"
|
||||
import { Implementation__factory } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts"
|
||||
import { LogMessagePublishedEvent } from "../../../../sdk/src/ethers-contracts/IWormhole"
|
||||
import { CoreRelayerStructs } from "../../../../sdk/src/ethers-contracts/CoreRelayer"
|
||||
import * as _ from "lodash"
|
||||
import * as grpcWebNodeHttpTransport from "@improbable-eng/grpc-web-node-http-transport"
|
||||
import { retryAsync } from "ts-retry"
|
||||
|
||||
const wormholeRpc = "https://wormhole-v2-testnet-api.certus.one"
|
||||
|
||||
let PLUGIN_NAME: string = "GenericRelayerPlugin"
|
||||
|
||||
|
@ -51,6 +57,28 @@ interface WorkflowPayloadParsed {
|
|||
vaas: Buffer[]
|
||||
}
|
||||
|
||||
/*
|
||||
* DB types
|
||||
*/
|
||||
|
||||
interface Pending {
|
||||
startTime: string
|
||||
numTimesRetried: number
|
||||
hash: string
|
||||
nextRetryTime: string
|
||||
}
|
||||
|
||||
interface Resolved {
|
||||
hash: string
|
||||
}
|
||||
|
||||
interface Entry {
|
||||
chainId: number
|
||||
deliveryVaaIdx: number
|
||||
vaas: { emitter: string; sequence: string; bytes: string }[]
|
||||
allFetched: boolean
|
||||
}
|
||||
|
||||
export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
||||
readonly shouldSpy: boolean
|
||||
readonly shouldRest: boolean
|
||||
|
@ -70,7 +98,10 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
|||
|
||||
async afterSetup(
|
||||
providers: Providers,
|
||||
_eventSource?: (event: SignedVaa) => Promise<void>
|
||||
listenerResources?: {
|
||||
eventSource: (event: SignedVaa) => Promise<void>
|
||||
db: StagingAreaKeyLock
|
||||
}
|
||||
) {
|
||||
// connect to the core wh contract for each chain
|
||||
for (const [chainId, info] of this.pluginConfig.supportedChains.entries()) {
|
||||
|
@ -85,6 +116,137 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
|||
providers.evm[chainId as wh.EVMChainId]
|
||||
)
|
||||
}
|
||||
if (listenerResources) {
|
||||
setTimeout(
|
||||
() => this.fetchVaaWorker(listenerResources.eventSource, listenerResources.db),
|
||||
0
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fetchVaaWorker(
|
||||
eventSource: (event: SignedVaa) => Promise<void>,
|
||||
db: StagingAreaKeyLock
|
||||
): Promise<void> {
|
||||
const logger = getScopedLogger(["fetchWorker"], this.logger)
|
||||
logger.debug(`Started fetchVaaWorker`)
|
||||
while (true) {
|
||||
await sleep(3_000) // todo: make configurable
|
||||
|
||||
// track which delivery vaa hashes have all vaas ready this iteration
|
||||
let newlyResolved = new Map<string, SignedVaa>()
|
||||
await db.withKey(
|
||||
["pending", "resolved"],
|
||||
async (kv: { resolved?: Resolved[]; pending?: Pending[] }) => {
|
||||
// if objects have not been crearted, initialize
|
||||
if (!kv.pending) {
|
||||
kv.pending = []
|
||||
}
|
||||
if (!kv.resolved) {
|
||||
kv.resolved = []
|
||||
}
|
||||
console.log("")
|
||||
logger.debug(`Pending: ${JSON.stringify(kv.pending, undefined, 4)}`)
|
||||
logger.debug(`Resolved: ${JSON.stringify(kv.resolved, undefined, 4)}`)
|
||||
|
||||
// filter to the pending items that are due to be retried
|
||||
const entriesToFetch = kv.pending.filter(
|
||||
(delivery) =>
|
||||
dbg(new Date(JSON.parse(dbg(delivery.nextRetryTime))).getTime()) <
|
||||
dbg(Date.now())
|
||||
)
|
||||
if (entriesToFetch.length === 0) {
|
||||
logger.debug("no entries to fetch")
|
||||
return { newKV: kv, val: undefined }
|
||||
}
|
||||
logger.info(`Attempting to fetch ${entriesToFetch.length} entries`)
|
||||
// try fetching vaas
|
||||
await db.withKey(
|
||||
// get `Entry`s for each hash
|
||||
entriesToFetch.map((d) => d.hash),
|
||||
async (kv: Record<string, Entry>) => {
|
||||
const promises = Object.entries(kv).map(async ([hash, value]) => {
|
||||
// track if there are missing vaas after trying to fetch
|
||||
let hasMissingVaas = false
|
||||
const vaas = await Promise.all(
|
||||
value.vaas.map(async ({ emitter, sequence, bytes }, idx) => {
|
||||
// skip if vaa has already been fetched
|
||||
if (bytes.length !== 0) {
|
||||
return { emitter, sequence, bytes }
|
||||
}
|
||||
try {
|
||||
// try to fetch vaa from guardian rpc
|
||||
const resp = await wh.getSignedVAA(
|
||||
wormholeRpc,
|
||||
value.chainId as wh.EVMChainId,
|
||||
emitter,
|
||||
sequence,
|
||||
{ transport: grpcWebNodeHttpTransport.NodeHttpTransport() }
|
||||
)
|
||||
logger.info(`Fetched vaa ${idx} for delivery ${hash}`)
|
||||
return {
|
||||
emitter,
|
||||
sequence,
|
||||
// base64 encode
|
||||
bytes: Buffer.from(resp.vaaBytes).toString("base64"),
|
||||
}
|
||||
} catch (e) {
|
||||
hasMissingVaas = true
|
||||
this.logger.debug(e)
|
||||
return { emitter, sequence, bytes: "" }
|
||||
}
|
||||
})
|
||||
)
|
||||
// if all vaas have been fetched, mark this hash as resolved
|
||||
if (!hasMissingVaas) {
|
||||
// todo: remove
|
||||
logger.debug(`All fetched for ${hash}`)
|
||||
const deliveryVaa = Buffer.from(
|
||||
value.vaas[value.deliveryVaaIdx].bytes,
|
||||
"base64"
|
||||
)
|
||||
newlyResolved.set(hash, deliveryVaa)
|
||||
} else {
|
||||
const entry = entriesToFetch.find((k) => k.hash === hash)!
|
||||
// todo: remove
|
||||
logger.debug(`Old entry: ${JSON.stringify(entry, undefined, 2)}`)
|
||||
|
||||
entry.numTimesRetried += 1
|
||||
entry.nextRetryTime = (
|
||||
Date.now() +
|
||||
entry.numTimesRetried * 2000
|
||||
).toString()
|
||||
|
||||
// todo: remove
|
||||
logger.debug(`New entry: ${JSON.stringify(entry, undefined, 2)}`)
|
||||
}
|
||||
|
||||
const newEntry: Entry = { ...value, vaas, allFetched: !hasMissingVaas }
|
||||
return [hash, newEntry]
|
||||
})
|
||||
|
||||
const newKV = Object.fromEntries(await Promise.all(promises))
|
||||
return { newKV, val: undefined }
|
||||
}
|
||||
)
|
||||
|
||||
// todo: gc resolved eventually
|
||||
kv.resolved.push(
|
||||
...Array.from(newlyResolved.keys()).map((hash) => ({
|
||||
hash,
|
||||
}))
|
||||
)
|
||||
kv.pending = kv.pending.filter((p) => !newlyResolved.has(p.hash))
|
||||
|
||||
return { newKV: kv, val: undefined }
|
||||
}
|
||||
)
|
||||
// kick off an engine listener event for each resolved delivery vaa
|
||||
for (const deliveryVAA of newlyResolved.values()) {
|
||||
this.logger.info("Kicking off engine listener event for resolved deliveryVAA")
|
||||
eventSource(deliveryVAA)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// listen to core relayer contract on each chain
|
||||
|
@ -96,9 +258,14 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
|||
|
||||
async consumeEvent(
|
||||
coreRelayerVaa: ParsedVaaWithBytes,
|
||||
_stagingArea: StagingAreaKeyLock,
|
||||
db: StagingAreaKeyLock,
|
||||
_providers: Providers
|
||||
): Promise<{ workflowData?: WorkflowPayload }> {
|
||||
this.logger.debug(
|
||||
`Top of consume event ${coreRelayerVaa.sequence.toString()}, hash: ${Buffer.from(
|
||||
coreRelayerVaa.hash
|
||||
).toString("base64")}`
|
||||
)
|
||||
const payloadType = parsePayloadType(coreRelayerVaa.payload)
|
||||
if (payloadType !== RelayerPayloadType.Delivery) {
|
||||
// todo: support redelivery
|
||||
|
@ -108,21 +275,96 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
|||
return {}
|
||||
}
|
||||
|
||||
const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId
|
||||
const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId)
|
||||
const allVAAs = await this.fetchOtherVaas(rx, coreRelayerVaa.nonce, chainId)
|
||||
const coreRelayerVaaIndex = allVAAs.findIndex((vaa) =>
|
||||
vaa.emitterAddress.equals(coreRelayerVaa.emitterAddress)
|
||||
)
|
||||
if (coreRelayerVaaIndex === -1) {
|
||||
throw new PluginError("CoreRelayerVaa not found in fetched vaas", { vaas: allVAAs })
|
||||
}
|
||||
const hash = coreRelayerVaa.hash.toString("base64")
|
||||
const { [hash]: fetched } = await db.getKeys<Record<typeof hash, Entry>>([hash])
|
||||
|
||||
return {
|
||||
workflowData: {
|
||||
coreRelayerVaaIndex,
|
||||
vaas: allVAAs.map((vaa) => vaa.bytes.toString("base64")),
|
||||
},
|
||||
if (fetched?.allFetched) {
|
||||
// if all vaas have been fetched, kick off workflow
|
||||
dbg(fetched, "fetched")
|
||||
return dbg(
|
||||
{
|
||||
workflowData: {
|
||||
coreRelayerVaaIndex: fetched.deliveryVaaIdx,
|
||||
vaas: fetched.vaas.map((v) => v.bytes),
|
||||
},
|
||||
},
|
||||
"workflow from consume event"
|
||||
)
|
||||
} else {
|
||||
this.logger.debug("Not fetched branch")
|
||||
const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId
|
||||
const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId)
|
||||
// parse rx for seqs and emitters
|
||||
|
||||
const onlyVAALogs = rx.logs.filter(
|
||||
(log) =>
|
||||
log.address ===
|
||||
this.pluginConfig.supportedChains.get(chainId)?.coreContract?.address
|
||||
)
|
||||
const vaas = onlyVAALogs.flatMap((bridgeLog: ethers.providers.Log) => {
|
||||
const iface = Implementation__factory.createInterface()
|
||||
const log = iface.parseLog(bridgeLog) as unknown as LogMessagePublishedEvent
|
||||
// filter down to just synthetic batch
|
||||
if (log.args.nonce !== coreRelayerVaa.nonce) {
|
||||
return []
|
||||
}
|
||||
return [
|
||||
{
|
||||
sequence: log.args.sequence.toString(),
|
||||
emitter: wh.tryNativeToHexString(log.args.sender, "ethereum"),
|
||||
bytes: "",
|
||||
},
|
||||
]
|
||||
})
|
||||
this.logger.debug(vaas)
|
||||
const deliveryVaaIdx = vaas.findIndex(
|
||||
(vaa) =>
|
||||
vaa.emitter ===
|
||||
wh.tryNativeToHexString(
|
||||
wh.tryUint8ArrayToNative(coreRelayerVaa.emitterAddress, "ethereum"),
|
||||
"ethereum"
|
||||
)
|
||||
)
|
||||
if (deliveryVaaIdx === -1) {
|
||||
throw new PluginError("CoreRelayerVaa not found in fetched vaas", {
|
||||
vaas,
|
||||
})
|
||||
}
|
||||
vaas[deliveryVaaIdx].bytes = coreRelayerVaa.bytes.toString("base64")
|
||||
|
||||
// create entry and pending in db
|
||||
const newEntry: Entry = {
|
||||
vaas,
|
||||
chainId,
|
||||
deliveryVaaIdx,
|
||||
allFetched: false,
|
||||
}
|
||||
this.logger.debug(`Entry: ${JSON.stringify(newEntry, undefined, 4)}`)
|
||||
await db.withKey(
|
||||
[hash, "pending"],
|
||||
// note _hash is actually the value of the variable `hash`, but ts will not
|
||||
// let this be expressed
|
||||
async (kv: { pending: Pending[]; _hash: Entry }) => {
|
||||
// @ts-ignore
|
||||
let oldEntry: Entry | null = kv[hash]
|
||||
if (oldEntry?.allFetched) {
|
||||
return { newKV: kv, val: undefined }
|
||||
}
|
||||
const now = Date.now().toString()
|
||||
kv.pending.push({
|
||||
nextRetryTime: now,
|
||||
numTimesRetried: 0,
|
||||
startTime: now,
|
||||
hash,
|
||||
})
|
||||
// @ts-ignore
|
||||
kv[hash] = newEntry
|
||||
return { newKV: kv, val: undefined }
|
||||
}
|
||||
)
|
||||
|
||||
// do not create workflow until we have collected all VAAs
|
||||
return {}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -154,43 +396,23 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
|||
return await log.getTransactionReceipt()
|
||||
}
|
||||
}
|
||||
throw new PluginError("Could not find contract receipt", { sequence, chainId })
|
||||
}
|
||||
|
||||
async fetchOtherVaas(
|
||||
rx: ethers.ContractReceipt,
|
||||
batchId: number, // aka nonce
|
||||
chainId: wh.EVMChainId
|
||||
): Promise<ParsedVaaWithBytes[]> {
|
||||
// collect all vaas
|
||||
// @ts-ignore
|
||||
const onlyVAALogs = rx.logs.filter(
|
||||
(log) =>
|
||||
log.address ===
|
||||
this.pluginConfig.supportedChains.get(chainId)?.coreContract?.address
|
||||
)
|
||||
const vaas: ParsedVaaWithBytes[] = await Promise.all(
|
||||
onlyVAALogs.map(async (bridgeLog: ethers.providers.Log) => {
|
||||
const iface = Implementation__factory.createInterface()
|
||||
const log = iface.parseLog(bridgeLog) as unknown as LogMessagePublishedEvent
|
||||
const resp = await wh.getSignedVAAWithRetry(
|
||||
["https://wormhole-v2-testnet-api.certus.one"],
|
||||
chainId,
|
||||
wh.tryNativeToHexString(log.args.sender, "ethereum"),
|
||||
log.args.sequence.toString(),
|
||||
undefined,
|
||||
undefined,
|
||||
10
|
||||
)
|
||||
return parseVaaWithBytes(resp.vaaBytes)
|
||||
})
|
||||
)
|
||||
|
||||
if (vaas.length == 0) {
|
||||
// todo: figure out error handling for subscription code
|
||||
this.logger.error("Expected generic relay tx to have >0 VAAs")
|
||||
try {
|
||||
return await retryAsyncUntilDefined(
|
||||
async () => {
|
||||
const paginatedLogs = await coreWHContract.queryFilter(filter, -20)
|
||||
const log = paginatedLogs.find(
|
||||
(log) => log.args.sequence.toString() === sequence.toString()
|
||||
)
|
||||
if (log) {
|
||||
return await log.getTransactionReceipt()
|
||||
}
|
||||
return undefined
|
||||
},
|
||||
{ maxTry: 10, delay: 500 }
|
||||
)
|
||||
} catch {
|
||||
throw new PluginError("Could not find contract receipt", { sequence, chainId })
|
||||
}
|
||||
return vaas.filter((vaa) => vaa.nonce === batchId)
|
||||
}
|
||||
|
||||
async handleWorkflow(
|
||||
|
@ -200,19 +422,23 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
|
|||
): Promise<void> {
|
||||
this.logger.info("Got workflow")
|
||||
this.logger.info(JSON.stringify(workflow, undefined, 2))
|
||||
this.logger.info(workflow.data.coreRelayerVaaIndex)
|
||||
this.logger.info(workflow.data.vaas)
|
||||
console.log("sanity console log")
|
||||
|
||||
const payload = this.parseWorkflowPayload(workflow)
|
||||
for (let i = 0; i < payload.deliveryInstructionsContainer.instructions.length; i++) {
|
||||
const ix = payload.deliveryInstructionsContainer.instructions[i]
|
||||
const budget = ix.applicationBudgetTarget.add(ix.maximumRefundTarget).add(100) // todo: add wormhole fee
|
||||
|
||||
// todo: add wormhole fee
|
||||
const budget = ix.applicationBudgetTarget.add(ix.maximumRefundTarget).add(100)
|
||||
const input: CoreRelayerStructs.TargetDeliveryParametersSingleStruct = {
|
||||
encodedVMs: payload.vaas,
|
||||
deliveryIndex: payload.coreRelayerVaaIndex,
|
||||
multisendIndex: i,
|
||||
}
|
||||
const chainId = ix.targetChain as wh.EVMChainId
|
||||
|
||||
const chainId = ix.targetChain as wh.EVMChainId
|
||||
// todo: consider parallelizing this
|
||||
await execute.onEVM({
|
||||
chainId,
|
||||
|
@ -294,6 +520,7 @@ class Definition implements PluginDefinition<GenericRelayerPluginConfig, Plugin>
|
|||
export default new Definition()
|
||||
|
||||
import { arrayify } from "ethers/lib/utils"
|
||||
import { retryAsyncUntilDefined } from "ts-retry/lib/cjs/retry"
|
||||
|
||||
export enum RelayerPayloadType {
|
||||
Delivery = 1,
|
||||
|
|
Loading…
Reference in New Issue