Test redelivery

This commit is contained in:
Joe Howarth 2023-01-31 13:49:55 -07:00
parent fc06699266
commit 935a6732c1
3 changed files with 188 additions and 123 deletions

View File

@ -123,7 +123,7 @@ async function run() {
}
}
function getChainById(id: number | string): ChainInfo {
export function getChainById(id: number | string): ChainInfo {
id = Number(id)
const chain = chains.find((c) => c.chainId === id)
if (!chain) {

View File

@ -0,0 +1,76 @@
import * as wh from "@certusone/wormhole-sdk"
import {
Implementation__factory,
Migrations,
} from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts"
import { getSignatureSetData } from "@certusone/wormhole-sdk/lib/cjs/solana/wormhole"
import { LogMessagePublishedEvent } from "../../../sdk/src"
import {
ChainInfo,
getCoreRelayer,
getCoreRelayerAddress,
getMockIntegration,
getMockIntegrationAddress,
getRelayProvider,
getRelayProviderAddress,
init,
loadChains,
} from "../helpers/env"
import * as grpcWebNodeHttpTransport from "@improbable-eng/grpc-web-node-http-transport"
import { BigNumber } from "ethers"
import { wait } from "../helpers/utils"
init()
const chains = loadChains()
async function run(
sourceChain: ChainInfo,
targetChain: ChainInfo,
nonce: number,
sourceTxHash: string
) {
const coreRelayer = getCoreRelayer(sourceChain)
const relayProvider = coreRelayer.getDefaultRelayProvider()
const relayQuote = await (
await coreRelayer.quoteGasDeliveryFee(targetChain.chainId, 2000000, relayProvider)
).add(10000000000)
const rx = await coreRelayer
.requestRedelivery(
{
sourceChain: sourceChain.chainId,
sourceNonce: nonce,
sourceTxHash: sourceTxHash,
targetChain: targetChain.chainId,
deliveryIndex: 1,
multisendIndex: 0,
newComputeBudget: relayQuote,
newApplicationBudget: BigNumber.from(0),
newRelayParameters: new Uint8Array(),
},
nonce,
relayProvider,
{ value: relayQuote, gasLimit: 1000000 }
)
.then(wait)
console.log(rx)
}
async function main() {
await run(getChainById(6), getChainById(14), 1, process.argv[2])
}
console.log("Start!")
main().then(() => console.log("Done!"))
/* Helpers */
export function getChainById(id: number | string): ChainInfo {
id = Number(id)
const chain = chains.find((c) => c.chainId === id)
if (!chain) {
throw new Error("chainId not found, " + id)
}
return chain
}

View File

@ -79,7 +79,6 @@ interface WorkflowPayloadParsed {
* DB types
*/
const RESOLVED = "resolved"
const PENDING = "pending"
interface Pending {
startTime: string
@ -156,64 +155,48 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
// track which delivery vaa hashes have all vaas ready this iteration
let newlyResolved = new Map<string, Entry>()
await db.withKey(
[PENDING, RESOLVED],
async (kv: { [RESOLVED]?: Resolved[]; [PENDING]?: Pending[] }) => {
// if objects have not been crearted, initialize
if (!kv.pending) {
kv.pending = []
}
if (!kv.resolved) {
kv.resolved = []
}
logger.debug(`Pending: ${JSON.stringify(kv.pending, undefined, 4)}`)
logger.debug(`Resolved: ${JSON.stringify(kv.resolved, undefined, 4)}`)
// filter to the pending items that are due to be retried
const entriesToFetch = kv.pending.filter(
(delivery) =>
new Date(JSON.parse(delivery.nextRetryTime)).getTime() < Date.now()
)
if (entriesToFetch.length === 0) {
return { newKV: kv, val: undefined }
}
logger.info(`Attempting to fetch ${entriesToFetch.length} entries`)
await db.withKey(
// get `Entry`s for each hash
entriesToFetch.map((d) => d.hash),
async (kv: Record<string, Entry>) => {
const promises = Object.entries(kv).map(async ([hash, entry]) => {
if (entry.allFetched) {
// nothing to do
logger.warn("Entry in pending but nothing to fetch " + hash)
return [hash, entry]
}
const newEntry: Entry = await this.fetchEntry(hash, entry, logger)
if (newEntry.allFetched) {
newlyResolved.set(hash, newEntry)
}
return [hash, newEntry]
})
const newKV = Object.fromEntries(await Promise.all(promises))
return { newKV, val: undefined }
}
)
// todo: gc resolved eventually
// todo: currently not used, but the idea is to refire resolved events
// in the case of a restart or smt. Maybe should just remove it for now...
kv.resolved.push(
...Array.from(newlyResolved.keys()).map((hash) => ({
hash,
}))
)
kv.pending = kv.pending.filter((p) => !newlyResolved.has(p.hash))
await db.withKey([PENDING], async (kv: { [PENDING]?: Pending[] }) => {
// if objects have not been crearted, initialize
if (!kv.pending) {
kv.pending = []
}
logger.debug(`Pending: ${JSON.stringify(kv.pending, undefined, 4)}`)
// filter to the pending items that are due to be retried
const entriesToFetch = kv.pending.filter(
(delivery) =>
new Date(JSON.parse(delivery.nextRetryTime)).getTime() < Date.now()
)
if (entriesToFetch.length === 0) {
return { newKV: kv, val: undefined }
}
)
logger.info(`Attempting to fetch ${entriesToFetch.length} entries`)
await db.withKey(
// get `Entry`s for each hash
entriesToFetch.map((d) => d.hash),
async (kv: Record<string, Entry>) => {
const promises = Object.entries(kv).map(async ([hash, entry]) => {
if (entry.allFetched) {
// nothing to do
logger.warn("Entry in pending but nothing to fetch " + hash)
return [hash, entry]
}
const newEntry: Entry = await this.fetchEntry(hash, entry, logger)
if (newEntry.allFetched) {
newlyResolved.set(hash, newEntry)
}
return [hash, newEntry]
})
const newKV = Object.fromEntries(await Promise.all(promises))
return { newKV, val: undefined }
}
)
kv.pending = kv.pending.filter((p) => !newlyResolved.has(p.hash))
return { newKV: kv, val: undefined }
})
// kick off an engine listener event for each resolved delivery vaa
for (const entry of newlyResolved.values()) {
this.logger.info("Kicking off engine listener event for resolved entry")
@ -281,18 +264,36 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
coreRelayerVaa.hash
).toString("base64")}`
)
// Kick off workflow if entry has already been fetched
const payloadId = parsePayloadType(coreRelayerVaa.payload)
const hash = coreRelayerVaa.hash.toString("base64")
const { [hash]: fetched } = await db.getKeys<Record<typeof hash, Entry>>([hash])
if (fetched?.allFetched) {
// if all vaas have been fetched, kick off workflow
this.logger.info(`All fetched, queueing workflow for ${hash}...`)
return {
workflowData: {
payloadId,
deliveryVaaIndex: fetched.deliveryVaaIdx,
vaas: fetched.vaas.map((v) => v.bytes),
redeliveryVaa: fetched.redeliveryVaa,
},
}
}
switch (payloadId) {
case RelayerPayloadId.Delivery:
return this.consumeDeliveryEvent(coreRelayerVaa, db)
return this.consumeDeliveryEvent(coreRelayerVaa, db, hash)
case RelayerPayloadId.Redelivery:
return this.consumeRedeliveryEvent(coreRelayerVaa, db)
return this.consumeRedeliveryEvent(coreRelayerVaa, db, hash)
}
}
async consumeRedeliveryEvent(
redeliveryVaa: ParsedVaaWithBytes,
db: StagingAreaKeyLock
db: StagingAreaKeyLock,
hash: string
): Promise<{ workflowData?: WorkflowPayload }> {
const redeliveryInstruction = parseRedeliveryByTxHashInstruction(
redeliveryVaa.payload
@ -333,70 +334,53 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
async consumeDeliveryEvent(
coreRelayerVaa: ParsedVaaWithBytes,
db: StagingAreaKeyLock
db: StagingAreaKeyLock,
hash: string
): Promise<{ workflowData?: WorkflowPayload }> {
const hash = coreRelayerVaa.hash.toString("base64")
const { [hash]: fetched } = await db.getKeys<Record<typeof hash, Entry>>([hash])
this.logger.info(
`Not fetched, fetching receipt and filtering to synthetic batch for ${hash}...`
)
const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId
const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId)
if (fetched?.allFetched) {
// if all vaas have been fetched, kick off workflow
this.logger.info(`All fetched, queueing workflow for ${hash}...`)
return dbg(
{
workflowData: {
payloadId: RelayerPayloadId.Delivery,
deliveryVaaIndex: fetched.deliveryVaaIdx,
vaas: fetched.vaas.map((v) => v.bytes),
},
},
"workflow from consume event"
)
} else {
this.logger.info(
`Not fetched, fetching receipt and filtering to synthetic batch for ${hash}...`
)
const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId
const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId)
const emitter = wh.tryNativeToHexString(
wh.tryUint8ArrayToNative(coreRelayerVaa.emitterAddress, "ethereum"),
"ethereum"
)
const { vaas, deliveryVaaIdx } = this.filterLogs(
rx,
chainId,
emitter,
coreRelayerVaa.nonce
)
vaas[deliveryVaaIdx].bytes = coreRelayerVaa.bytes.toString("base64")
const emitter = wh.tryNativeToHexString(
wh.tryUint8ArrayToNative(coreRelayerVaa.emitterAddress, "ethereum"),
"ethereum"
)
const { vaas, deliveryVaaIdx } = this.filterLogs(
rx,
chainId,
emitter,
coreRelayerVaa.nonce
)
vaas[deliveryVaaIdx].bytes = coreRelayerVaa.bytes.toString("base64")
// create entry and pending in db
const newEntry: Entry = {
vaas,
chainId,
deliveryVaaIdx,
allFetched: false,
}
const maybeResolvedEntry = await this.fetchEntry(hash, newEntry, this.logger)
if (maybeResolvedEntry.allFetched) {
this.logger.info("Resolved entry immediately")
return {
workflowData: {
payloadId: RelayerPayloadId.Delivery,
deliveryVaaIndex: maybeResolvedEntry.deliveryVaaIdx,
vaas: maybeResolvedEntry.vaas.map((v) => v.bytes),
},
}
}
this.logger.debug(`Entry: ${JSON.stringify(newEntry, undefined, 4)}`)
// todo: retry if withKey throws (possible contention with worker process)
await this.addEntryToPendingQueue(hash, newEntry, db)
// do not create workflow until we have collected all VAAs
return {}
// create entry and pending in db
const newEntry: Entry = {
vaas,
chainId,
deliveryVaaIdx,
allFetched: false,
}
const maybeResolvedEntry = await this.fetchEntry(hash, newEntry, this.logger)
if (maybeResolvedEntry.allFetched) {
this.logger.info("Resolved entry immediately")
return {
workflowData: {
payloadId: RelayerPayloadId.Delivery,
deliveryVaaIndex: maybeResolvedEntry.deliveryVaaIdx,
vaas: maybeResolvedEntry.vaas.map((v) => v.bytes),
},
}
}
this.logger.debug(`Entry: ${JSON.stringify(newEntry, undefined, 4)}`)
// todo: retry if withKey throws (possible contention with worker process)
await this.addEntryToPendingQueue(hash, newEntry, db)
// do not create workflow until we have collected all VAAs
return {}
}
async addEntryToPendingQueue(hash: string, newEntry: Entry, db: StagingAreaKeyLock) {
@ -410,9 +394,12 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
// @ts-ignore
let oldEntry: Entry | null = kv[hash]
if (oldEntry?.allFetched) {
return { newKV: kv, val: undefined }
return { newKV: kv, val: true }
}
// todo: check that hash is not in pending list already
if (kv[PENDING].findIndex((e) => e.hash === hash) !== -1) {
return { newKV: kv, val: true }
}
const now = Date.now().toString()
kv.pending.push({
nextRetryTime: now,
@ -425,7 +412,9 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
return { newKV: kv, val: true }
}
)
} catch {}
} catch (e) {
this.logger.warn(e)
}
})
}