diff --git a/relayer_engine/src/plugin/src/plugin.ts b/relayer_engine/src/plugin/src/plugin.ts index 26a9149..fee28c6 100644 --- a/relayer_engine/src/plugin/src/plugin.ts +++ b/relayer_engine/src/plugin/src/plugin.ts @@ -1,27 +1,24 @@ import { ActionExecutor, assertBool, + assertEvmChainId, assertInt, CommonPluginEnv, ContractFilter, - getScopedLogger, ParsedVaaWithBytes, parseVaaWithBytes, Plugin, PluginDefinition, Providers, - sleep, StagingAreaKeyLock, Workflow, } from "@wormhole-foundation/relayer-engine" import * as wh from "@certusone/wormhole-sdk" -import { Logger } from "winston" -import { PluginError } from "./utils" +import { config, Logger } from "winston" +import { convertAddressBytesToHex, PluginError } from "./utils" import { SignedVaa } from "@certusone/wormhole-sdk" import { - IWormhole__factory, RelayProvider__factory, - LogMessagePublishedEvent, IDelivery, DeliveryInstructionsContainer, parseDeliveryInstructionsContainer, @@ -31,9 +28,8 @@ import { RedeliveryByTxHashInstruction, } from "../../../pkgs/sdk/src" import * as ethers from "ethers" -import { Implementation__factory } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts" -import * as grpcWebNodeHttpTransport from "@improbable-eng/grpc-web-node-http-transport" -import { retryAsyncUntilDefined } from "ts-retry/lib/cjs/retry" +import * as vaaFetching from "./vaaFetching" +import * as syntheticBatch from "./syntheticBatch" let PLUGIN_NAME: string = "GenericRelayerPlugin" @@ -71,31 +67,6 @@ interface WorkflowPayloadParsed { vaas: Buffer[] } -/* - * DB types - */ - -const PENDING = "pending" -interface Pending { - startTime: string - numTimesRetried: number - hash: string - nextRetryTime: string -} - -interface Resolved { - hash: string -} - -interface Entry { - chainId: number - deliveryVaaIdx: number - vaas: { emitter: string; sequence: string; bytes: string }[] - allFetched: boolean - // only present for Redeliveries - redeliveryVaa?: string -} - export class GenericRelayerPlugin implements Plugin { readonly shouldSpy: boolean readonly shouldRest: boolean @@ -114,7 +85,7 @@ export class GenericRelayerPlugin implements Plugin { } async afterSetup( - providers: Providers, + _providers: Providers, listenerResources?: { eventSource: (event: SignedVaa) => Promise db: StagingAreaKeyLock @@ -129,116 +100,15 @@ export class GenericRelayerPlugin implements Plugin { } if (listenerResources) { - setTimeout( - () => this.fetchVaaWorker(listenerResources.eventSource, listenerResources.db), - 0 + vaaFetching.fetchVaaWorker( + listenerResources.eventSource, + listenerResources.db, + this.logger, + this.engineConfig ) } } - async fetchVaaWorker( - eventSource: (event: SignedVaa) => Promise, - db: StagingAreaKeyLock - ): Promise { - const logger = getScopedLogger(["fetchWorker"], this.logger) - logger.debug(`Started fetchVaaWorker`) - while (true) { - await sleep(3_000) // todo: make configurable - - // track which delivery vaa hashes have all vaas ready this iteration - let newlyResolved = new Map() - await db.withKey([PENDING], async (kv: { [PENDING]?: Pending[] }, tx) => { - // if objects have not been created, initialize - if (!kv.pending) { - kv.pending = [] - } - logger.debug(`Pending: ${JSON.stringify(kv.pending, undefined, 4)}`) - - // filter to the pending items that are due to be retried - const entriesToFetch = kv.pending.filter( - (delivery) => - new Date(JSON.parse(delivery.nextRetryTime)).getTime() < Date.now() - ) - if (entriesToFetch.length === 0) { - return { newKV: kv, val: undefined } - } - - logger.info(`Attempting to fetch ${entriesToFetch.length} entries`) - await db.withKey( - // get `Entry`s for each hash - entriesToFetch.map((d) => d.hash), - async (kv: Record) => { - const promises = Object.entries(kv).map(async ([hash, entry]) => { - if (entry.allFetched) { - // nothing to do - logger.warn("Entry in pending but nothing to fetch " + hash) - return [hash, entry] - } - const newEntry: Entry = await this.fetchEntry(hash, entry, logger) - if (newEntry.allFetched) { - newlyResolved.set(hash, newEntry) - } - return [hash, newEntry] - }) - - const newKV = Object.fromEntries(await Promise.all(promises)) - return { newKV, val: undefined } - }, - tx - ) - - kv.pending = kv.pending.filter((p) => !newlyResolved.has(p.hash)) - return { newKV: kv, val: undefined } - }) - // kick off an engine listener event for each resolved delivery vaa - for (const entry of newlyResolved.values()) { - this.logger.info("Kicking off engine listener event for resolved entry") - if (entry.redeliveryVaa) { - eventSource(Buffer.from(entry.redeliveryVaa, "base64")) - } else { - eventSource(Buffer.from(entry.vaas[entry.deliveryVaaIdx].bytes, "base64")) - } - } - } - } - - async fetchEntry(hash: string, value: Entry, logger: Logger): Promise { - // track if there are missing vaas after trying to fetch - this.logger.info("Fetching entry...", { hash }) - let hasMissingVaas = false - const vaas = await Promise.all( - value.vaas.map(async ({ emitter, sequence, bytes }, idx) => { - // skip if vaa has already been fetched - if (bytes.length !== 0) { - return { emitter, sequence, bytes } - } - try { - // try to fetch vaa from guardian rpc - const resp = await wh.getSignedVAA( - this.engineConfig.wormholeRpc, - value.chainId as wh.EVMChainId, - emitter, - sequence, - { transport: grpcWebNodeHttpTransport.NodeHttpTransport() } - ) - logger.info(`Fetched vaa ${idx} for delivery ${hash}`) - return { - emitter, - sequence, - // base64 encode - bytes: Buffer.from(resp.vaaBytes).toString("base64"), - } - } catch (e) { - hasMissingVaas = true - this.logger.debug(e) - return { emitter, sequence, bytes: "" } - } - }) - ) - // if all vaas have been fetched, mark this hash as resolved - return { ...value, vaas, allFetched: !hasMissingVaas } - } - // listen to core relayer contract on each chain getFilters(): ContractFilter[] { return Array.from(this.pluginConfig.supportedChains.entries()).map( @@ -251,18 +121,18 @@ export class GenericRelayerPlugin implements Plugin { db: StagingAreaKeyLock, providers: Providers ): Promise<{ workflowData: WorkflowPayload } | undefined> { + const hash = coreRelayerVaa.hash.toString("base64") this.logger.debug( `Consuming event from chain ${ coreRelayerVaa.emitterChain - } with seq ${coreRelayerVaa.sequence.toString()} and hash ${Buffer.from( - coreRelayerVaa.hash - ).toString("base64")}` + } with seq ${coreRelayerVaa.sequence.toString()} and hash ${hash}` ) // Kick off workflow if entry has already been fetched const payloadId = parsePayloadType(coreRelayerVaa.payload) - const hash = coreRelayerVaa.hash.toString("base64") - const { [hash]: fetched } = await db.getKeys>([hash]) + const { [hash]: fetched } = await db.getKeys< + Record + >([hash]) if (fetched?.allFetched) { // if all vaas have been fetched, kick off workflow this.logger.info(`All fetched, queueing workflow for ${hash}...`) @@ -284,48 +154,6 @@ export class GenericRelayerPlugin implements Plugin { } } - async consumeRedeliveryEvent( - redeliveryVaa: ParsedVaaWithBytes, - db: StagingAreaKeyLock, - providers: Providers - ): Promise<{ workflowData: WorkflowPayload } | undefined> { - const redeliveryInstruction = parseRedeliveryByTxHashInstruction( - redeliveryVaa.payload - ) - const chainId = redeliveryInstruction.sourceChain as wh.EVMChainId - const provider = providers.evm[chainId] - const config = this.pluginConfig.supportedChains.get(chainId)! - const rx = await provider.getTransactionReceipt( - ethers.utils.hexlify(redeliveryInstruction.sourceTxHash, { - allowMissingPrefix: true, - }) - ) - const { vaas, deliveryVaaIdx } = this.filterLogs( - rx, - chainId, - wh.tryNativeToHexString(config.relayerAddress, "ethereum"), - redeliveryInstruction.sourceNonce.toNumber() - ) - - // create entry and pending in db - const newEntry: Entry = { - vaas, - chainId, - deliveryVaaIdx, - redeliveryVaa: redeliveryVaa.bytes.toString("base64"), - allFetched: false, - } - - this.logger.debug(`Entry: ${JSON.stringify(newEntry, undefined, 4)}`) - await this.addEntryToPendingQueue( - Buffer.from(redeliveryVaa.hash).toString("base64"), - newEntry, - db - ) - - return {} as any - } - async consumeDeliveryEvent( coreRelayerVaa: ParsedVaaWithBytes, db: StagingAreaKeyLock, @@ -336,172 +164,96 @@ export class GenericRelayerPlugin implements Plugin { `Not fetched, fetching receipt and filtering to synthetic batch for ${hash}...` ) const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId - const provider = providers.evm[chainId] - const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId, provider) - - const emitter = wh.tryNativeToHexString( - wh.tryUint8ArrayToNative(coreRelayerVaa.emitterAddress, "ethereum"), - "ethereum" - ) - const { vaas, deliveryVaaIdx } = this.filterLogs( - rx, + const rx = await syntheticBatch.fetchReceipt( + coreRelayerVaa.sequence, chainId, - emitter, - coreRelayerVaa.nonce + providers.evm[chainId], + this.pluginConfig.supportedChains.get(chainId)! + ) + + const chainConfig = this.pluginConfig.supportedChains.get(chainId)! + const { vaas, deliveryVaaIdx } = syntheticBatch.filterLogs( + rx, + coreRelayerVaa.nonce, + chainConfig, + this.logger ) vaas[deliveryVaaIdx].bytes = coreRelayerVaa.bytes.toString("base64") // create entry and pending in db - const newEntry: Entry = { + const newEntry: vaaFetching.SyntheticBatchEntry = { vaas, chainId, deliveryVaaIdx, allFetched: false, } + return await this.addWorkflowOrQueueEntryForFetching(db, hash, newEntry) + } - const maybeResolvedEntry = await this.fetchEntry(hash, newEntry, this.logger) - if (maybeResolvedEntry.allFetched) { + async consumeRedeliveryEvent( + redeliveryVaa: ParsedVaaWithBytes, + db: StagingAreaKeyLock, + providers: Providers + ): Promise<{ workflowData: WorkflowPayload } | undefined> { + const redeliveryInstruction = parseRedeliveryByTxHashInstruction( + redeliveryVaa.payload + ) + const chainId = redeliveryInstruction.sourceChain as wh.EVMChainId + const provider = providers.evm[chainId] + const rx = await provider.getTransactionReceipt( + ethers.utils.hexlify(redeliveryInstruction.sourceTxHash, { + allowMissingPrefix: true, + }) + ) + const chainConfig = this.pluginConfig.supportedChains.get(chainId)! + const { vaas, deliveryVaaIdx } = syntheticBatch.filterLogs( + rx, + redeliveryInstruction.sourceNonce.toNumber(), + chainConfig, + this.logger + ) + + // create entry and pending in db + const newEntry: vaaFetching.SyntheticBatchEntry = { + vaas, + chainId, + deliveryVaaIdx, + redeliveryVaa: redeliveryVaa.bytes.toString("base64"), + allFetched: false, + } + const hash = Buffer.from(redeliveryVaa.hash).toString("base64") + return this.addWorkflowOrQueueEntryForFetching(db, hash, newEntry) + } + + async addWorkflowOrQueueEntryForFetching( + db: StagingAreaKeyLock, + hash: string, + entry: vaaFetching.SyntheticBatchEntry + ): Promise<{ workflowData: WorkflowPayload } | undefined> { + const resolvedEntry = await vaaFetching.fetchEntry( + hash, + entry, + this.logger, + this.engineConfig + ) + if (resolvedEntry.allFetched) { this.logger.info("Resolved entry immediately") return { workflowData: { - payloadId: RelayerPayloadId.Delivery, - deliveryVaaIndex: maybeResolvedEntry.deliveryVaaIdx, - vaas: maybeResolvedEntry.vaas.map((v) => v.bytes), + payloadId: entry.redeliveryVaa + ? RelayerPayloadId.Redelivery + : RelayerPayloadId.Delivery, + deliveryVaaIndex: resolvedEntry.deliveryVaaIdx, + vaas: resolvedEntry.vaas.map((v) => v.bytes), + redeliveryVaa: resolvedEntry.redeliveryVaa, }, } } - this.logger.debug(`Entry: ${JSON.stringify(newEntry, undefined, 4)}`) - // todo: retry if withKey throws (possible contention with worker process) - await this.addEntryToPendingQueue(hash, newEntry, db) - - // do not create workflow until we have collected all VAAs + await vaaFetching.addEntryToPendingQueue(hash, entry, db) return } - async addEntryToPendingQueue(hash: string, newEntry: Entry, db: StagingAreaKeyLock) { - await retryAsyncUntilDefined(async () => { - try { - return db.withKey( - [hash, PENDING], - // note _hash is actually the value of the variable `hash`, but ts will not - // let this be expressed - async (kv: { [PENDING]: Pending[]; _hash: Entry }) => { - // @ts-ignore - let oldEntry: Entry | null = kv[hash] - if (oldEntry?.allFetched) { - return { newKV: kv, val: true } - } - if (kv[PENDING].findIndex((e) => e.hash === hash) !== -1) { - return { newKV: kv, val: true } - } - - const now = Date.now().toString() - kv.pending.push({ - nextRetryTime: now, - numTimesRetried: 0, - startTime: now, - hash, - }) - // @ts-ignore - kv[hash] = newEntry - return { newKV: kv, val: true } - } - ) - } catch (e) { - this.logger.warn(e) - } - }) - } - - // fetch the contract transaction receipt for the given sequence number emitted by the core relayer contract - async fetchReceipt( - sequence: BigInt, - chainId: wh.EVMChainId, - provider: ethers.providers.Provider - ): Promise { - const config = this.pluginConfig.supportedChains.get(chainId)! - const coreWHContract = IWormhole__factory.connect(config.coreContract!, provider) - const filter = coreWHContract.filters.LogMessagePublished(config.relayerAddress) - const blockNumber = await coreWHContract.provider.getBlockNumber() - for (let i = 0; i < 20; ++i) { - let paginatedLogs - if (i === 0) { - paginatedLogs = await coreWHContract.queryFilter(filter, -30) - } else { - paginatedLogs = await coreWHContract.queryFilter( - filter, - blockNumber - (i + 1) * 20, - blockNumber - i * 20 - ) - } - const log = paginatedLogs.find( - (log) => log.args.sequence.toString() === sequence.toString() - ) - if (log) { - return log.getTransactionReceipt() - } - } - try { - return retryAsyncUntilDefined( - async () => { - const paginatedLogs = await coreWHContract.queryFilter(filter, -50) - const log = paginatedLogs.find( - (log) => log.args.sequence.toString() === sequence.toString() - ) - if (log) { - return log.getTransactionReceipt() - } - return undefined - }, - { maxTry: 10, delay: 500 } - ) - } catch { - throw new PluginError("Could not find contract receipt", { sequence, chainId }) - } - } - - filterLogs( - rx: ethers.ContractReceipt, - chainId: wh.EVMChainId, - emitterAddress: string, //hex - nonce: number - ): { - vaas: { - sequence: string - emitter: string - bytes: string - }[] - deliveryVaaIdx: number - } { - const onlyVAALogs = rx.logs.filter( - (log) => - log.address === this.pluginConfig.supportedChains.get(chainId)?.coreContract - ) - const vaas = onlyVAALogs.flatMap((bridgeLog: ethers.providers.Log) => { - const iface = Implementation__factory.createInterface() - const log = iface.parseLog(bridgeLog) as unknown as LogMessagePublishedEvent - // filter down to just synthetic batch - if (log.args.nonce !== nonce) { - return [] - } - return [ - { - sequence: log.args.sequence.toString(), - emitter: wh.tryNativeToHexString(log.args.sender, "ethereum"), - bytes: "", - }, - ] - }) - this.logger.debug(vaas) - const deliveryVaaIdx = vaas.findIndex((vaa) => vaa.emitter === emitterAddress) - if (deliveryVaaIdx === -1) { - throw new PluginError("CoreRelayerVaa not found in fetched vaas", { - vaas, - }) - } - return { vaas, deliveryVaaIdx } - } async handleWorkflow( workflow: Workflow, _providers: Providers, @@ -509,8 +261,6 @@ export class GenericRelayerPlugin implements Plugin { ): Promise { this.logger.info("Got workflow") this.logger.info(JSON.stringify(workflow, undefined, 2)) - this.logger.info(workflow.data.deliveryVaaIndex) - this.logger.info(workflow.data.vaas) const payload = this.parseWorkflowPayload(workflow) switch (payload.payloadId) { case RelayerPayloadId.Delivery: @@ -533,7 +283,6 @@ export class GenericRelayerPlugin implements Plugin { const chainId = assertEvmChainId(ix.targetChain) const budget = ix.receiverValueTarget.add(ix.maximumRefundTarget).add(100) - // todo: consider parallelizing this await execute.onEVM({ chainId, f: async ({ wallet }) => { @@ -576,7 +325,6 @@ export class GenericRelayerPlugin implements Plugin { ): Promise { const redelivery = payload.redelivery! const chainId = assertEvmChainId(redelivery.ix.targetChain) - // todo: consider parallelizing this await execute.onEVM({ chainId, f: async ({ wallet }) => { @@ -677,14 +425,4 @@ class Definition implements PluginDefinition } } -function assertEvmChainId(chainId: number): wh.EVMChainId { - if (!wh.isEVMChain(chainId as wh.ChainId)) { - throw new PluginError("Expected wh evm chainId for target chain", { - chainId, - }) - } - return chainId as wh.EVMChainId -} - -// todo: move to sdk export default new Definition() diff --git a/relayer_engine/src/plugin/src/syntheticBatch.ts b/relayer_engine/src/plugin/src/syntheticBatch.ts new file mode 100644 index 0000000..49caba4 --- /dev/null +++ b/relayer_engine/src/plugin/src/syntheticBatch.ts @@ -0,0 +1,96 @@ +import * as wh from "@certusone/wormhole-sdk" +import { PluginError } from "./utils" +import { IWormhole__factory, LogMessagePublishedEvent } from "../../../pkgs/sdk/src" +import * as ethers from "ethers" +import { Implementation__factory } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts" +import { retryAsyncUntilDefined } from "ts-retry/lib/cjs/retry" +import { ChainInfo } from "./plugin" +import { ScopedLogger } from "@wormhole-foundation/relayer-engine/relayer-engine/lib/helpers/logHelper" +import { tryNativeToHexString } from "@certusone/wormhole-sdk" + +// fetch the contract transaction receipt for the given sequence number emitted by the core relayer contract +export async function fetchReceipt( + sequence: BigInt, + chainId: wh.EVMChainId, + provider: ethers.providers.Provider, + chainConfig: ChainInfo +): Promise { + const coreWHContract = IWormhole__factory.connect(chainConfig.coreContract!, provider) + const filter = coreWHContract.filters.LogMessagePublished(chainConfig.relayerAddress) + const blockNumber = await coreWHContract.provider.getBlockNumber() + for (let i = 0; i < 20; ++i) { + let paginatedLogs + if (i === 0) { + paginatedLogs = await coreWHContract.queryFilter(filter, -30) + } else { + paginatedLogs = await coreWHContract.queryFilter( + filter, + blockNumber - (i + 1) * 20, + blockNumber - i * 20 + ) + } + const log = paginatedLogs.find( + (log) => log.args.sequence.toString() === sequence.toString() + ) + if (log) { + return await log.getTransactionReceipt() + } + } + try { + return await retryAsyncUntilDefined( + async () => { + const paginatedLogs = await coreWHContract.queryFilter(filter, -50) + const log = paginatedLogs.find( + (log) => log.args.sequence.toString() === sequence.toString() + ) + if (log) { + return await log.getTransactionReceipt() + } + return undefined + }, + { maxTry: 10, delay: 500 } + ) + } catch { + throw new PluginError("Could not find contract receipt", { sequence, chainId }) + } +} + +export function filterLogs( + rx: ethers.ContractReceipt, + nonce: number, + chainConfig: ChainInfo, + logger: ScopedLogger +): { + vaas: { + sequence: string + emitter: string + bytes: string + }[] + deliveryVaaIdx: number +} { + const onlyVAALogs = rx.logs.filter((log) => log.address === chainConfig.coreContract) + const vaas = onlyVAALogs.flatMap((bridgeLog: ethers.providers.Log) => { + const iface = Implementation__factory.createInterface() + const log = iface.parseLog(bridgeLog) as unknown as LogMessagePublishedEvent + // filter down to just synthetic batch + if (log.args.nonce !== nonce) { + return [] + } + return [ + { + sequence: log.args.sequence.toString(), + emitter: wh.tryNativeToHexString(log.args.sender, "ethereum"), + bytes: "", + }, + ] + }) + logger.debug(vaas) + const emitterAddress = tryNativeToHexString(chainConfig.relayerAddress, "ethereum") + const deliveryVaaIdx = vaas.findIndex((vaa) => vaa.emitter === emitterAddress) + if (deliveryVaaIdx === -1) { + throw new PluginError("CoreRelayerVaa not found in fetched vaas", { + vaas, + }) + } + return { vaas, deliveryVaaIdx } +} diff --git a/relayer_engine/src/plugin/src/utils.ts b/relayer_engine/src/plugin/src/utils.ts index 0b486b6..cb6880f 100644 --- a/relayer_engine/src/plugin/src/utils.ts +++ b/relayer_engine/src/plugin/src/utils.ts @@ -1,5 +1,17 @@ +import { + ChainId, + EVMChainId, + isEVMChain, + tryNativeToHexString, + tryUint8ArrayToNative, +} from "@certusone/wormhole-sdk" + export class PluginError extends Error { constructor(msg: string, public args?: Record) { super(msg) } } + +export function convertAddressBytesToHex(bytes: Uint8Array | Buffer): string { + return tryNativeToHexString(tryUint8ArrayToNative(bytes, "ethereum"), "ethereum") +} diff --git a/relayer_engine/src/plugin/src/vaaFetching.ts b/relayer_engine/src/plugin/src/vaaFetching.ts new file mode 100644 index 0000000..901c820 --- /dev/null +++ b/relayer_engine/src/plugin/src/vaaFetching.ts @@ -0,0 +1,193 @@ +import { redeemOnXpla, SignedVaa } from "@certusone/wormhole-sdk" +import { + StagingAreaKeyLock, + getScopedLogger, + sleep, + second, +} from "@wormhole-foundation/relayer-engine" +import { ScopedLogger } from "@wormhole-foundation/relayer-engine/relayer-engine/lib/helpers/logHelper" +import { logger } from "ethers" +import { retryAsyncUntilDefined } from "ts-retry/lib/cjs/retry" +import { Logger } from "winston" +import * as wh from "@certusone/wormhole-sdk" +import * as grpcWebNodeHttpTransport from "@improbable-eng/grpc-web-node-http-transport" + +/* + * DB types + */ + +export const PENDING = "pending" +export interface Pending { + startTime: string + numTimesRetried: number + hash: string + nextRetryTime: string +} + +export interface SyntheticBatchEntry { + chainId: number + deliveryVaaIdx: number + vaas: { emitter: string; sequence: string; bytes: string }[] + allFetched: boolean + // only present for Redeliveries + redeliveryVaa?: string +} + +export async function fetchVaaWorker( + eventSource: (event: SignedVaa) => Promise, + db: StagingAreaKeyLock, + parentLogger: ScopedLogger, + engineConfig: { wormholeRpc: string } +): Promise { + const logger = getScopedLogger(["fetchWorker"], parentLogger) + logger.info(`Started fetchVaaWorker`) + while (true) { + await sleep(3_000) // todo: make configurable + + // track which delivery vaa hashes have all vaas ready this iteration + let newlyResolved = new Map() + await db.withKey([PENDING], async (kv: { [PENDING]?: Pending[] }, tx) => { + // if objects have not been created, initialize + if (!kv.pending) { + kv.pending = [] + } + logger.debug(`Pending: ${JSON.stringify(kv.pending, undefined, 4)}`) + + // filter to the pending items that are due to be retried + const entriesToFetch = kv.pending.filter( + (delivery) => new Date(JSON.parse(delivery.nextRetryTime)).getTime() < Date.now() + ) + if (entriesToFetch.length === 0) { + return { newKV: kv, val: undefined } + } + + logger.info(`Attempting to fetch ${entriesToFetch.length} entries`) + await db.withKey( + // get `SyntheticBatchEntry`s for each hash + entriesToFetch.map((d) => d.hash), + async (kv: Record) => { + const promises = Object.entries(kv).map(async ([hash, entry]) => { + if (entry.allFetched) { + // nothing to do + logger.warn("Entry in pending but nothing to fetch " + hash) + return [hash, entry] + } + const newEntry: SyntheticBatchEntry = await fetchEntry( + hash, + entry, + logger, + engineConfig + ) + if (newEntry.allFetched) { + newlyResolved.set(hash, newEntry) + } + return [hash, newEntry] + }) + + const newKV = Object.fromEntries(await Promise.all(promises)) + return { newKV, val: undefined } + }, + tx + ) + + kv[PENDING] = kv[PENDING].filter((p) => !newlyResolved.has(p.hash)).map((x) => ({ + ...x, + numTimesRetried: x.numTimesRetried + 1, + nextRetryTime: new Date(Date.now() + second * x.numTimesRetried).toString(), + })) + return { newKV: kv, val: undefined } + }) + // kick off an engine listener event for each resolved delivery vaa + for (const entry of newlyResolved.values()) { + logger.info("Kicking off engine listener event for resolved entry") + if (entry.redeliveryVaa) { + eventSource(Buffer.from(entry.redeliveryVaa, "base64")) + } else { + eventSource(Buffer.from(entry.vaas[entry.deliveryVaaIdx].bytes, "base64")) + } + } + } +} + +export async function fetchEntry( + hash: string, + value: SyntheticBatchEntry, + logger: Logger, + engineConfig: { wormholeRpc: string } +): Promise { + logger.info("Fetching SyntheticBatchEntry...", { hash }) + // track if there are missing vaas after trying to fetch + let hasMissingVaas = false + + // for each entry, attempt to fetch vaas from wormhole rpc + const vaas = await Promise.all( + value.vaas.map(async ({ emitter, sequence, bytes }, idx) => { + // skip if vaa has already been fetched + if (bytes.length !== 0) { + return { emitter, sequence, bytes } + } + try { + // try to fetch vaa from rpc + const resp = await wh.getSignedVAA( + engineConfig.wormholeRpc, + value.chainId as wh.EVMChainId, + emitter, + sequence, + { transport: grpcWebNodeHttpTransport.NodeHttpTransport() } + ) + logger.info(`Fetched vaa ${idx} for delivery ${hash}`) + return { + emitter, + sequence, + // base64 encode + bytes: Buffer.from(resp.vaaBytes).toString("base64"), + } + } catch (e) { + hasMissingVaas = true + logger.debug(e) + return { emitter, sequence, bytes: "" } + } + }) + ) + // if all vaas have been fetched, mark this hash as resolved + return { ...value, vaas, allFetched: !hasMissingVaas } +} + +export async function addEntryToPendingQueue( + hash: string, + newEntry: SyntheticBatchEntry, + db: StagingAreaKeyLock +) { + await retryAsyncUntilDefined(async () => { + try { + return db.withKey( + [hash, PENDING], + // note _hash is actually the value of the variable `hash`, but ts will not + // let this be expressed + async (kv: { [PENDING]: Pending[]; _hash: SyntheticBatchEntry }) => { + // @ts-ignore + let oldEntry: SyntheticBatchEntry | null = kv[hash] + if (oldEntry?.allFetched) { + return { newKV: kv, val: true } + } + if (kv[PENDING].findIndex((e) => e.hash === hash) !== -1) { + return { newKV: kv, val: true } + } + + const now = Date.now().toString() + kv.pending.push({ + nextRetryTime: now, + numTimesRetried: 0, + startTime: now, + hash, + }) + // @ts-ignore + kv[hash] = newEntry + return { newKV: kv, val: true } + } + ) + } catch (e) { + logger.warn(e) + } + }) +}