redelivery code drafted, testing inconclusive

This commit is contained in:
Joe Howarth 2023-01-19 17:15:48 -07:00 committed by chase-45
parent 312df2d862
commit 387450aa5c
2 changed files with 96 additions and 48 deletions

View File

@ -9,7 +9,6 @@ metadata:
labels:
app: spy
spec:
# type: LoadBalancer
selector:
app: spy
ports:
@ -39,20 +38,20 @@ spec:
containers:
- name: spy
image: ghcr.io/wormhole-foundation/guardiand:latest
args:
- spy
- --nodeKey
- /node.key
- --spyRPC
- "[::]:7073"
- --network
- /wormhole/testnet/2/1
- --bootstrap
- /dns4/wormhole-testnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWAkB9ynDur1Jtoa97LBUp8RXdhzS5uHgAfdTquJbrbN7i
args:
- spy
- --nodeKey
- /node.key
- --spyRPC
- "[::]:7073"
- --network
- /wormhole/testnet/2/1
- --bootstrap
- /dns4/wormhole-testnet-v2-bootstrap.certus.one/udp/8999/quic/p2p/12D3KooWAkB9ynDur1Jtoa97LBUp8RXdhzS5uHgAfdTquJbrbN7i
resources:
limits:
memory: 256Mi
cpu: 500m
requests:
memory: 128Mi
cpu: 250m
cpu: 250m

View File

@ -35,6 +35,7 @@ import * as ethers from "ethers"
import { Implementation__factory } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts"
import * as grpcWebNodeHttpTransport from "@improbable-eng/grpc-web-node-http-transport"
import { retryAsyncUntilDefined } from "ts-retry/lib/cjs/retry"
import { entropyToMnemonic } from "ethers/lib/utils"
const wormholeRpc = "https://wormhole-v2-testnet-api.certus.one"
@ -290,9 +291,45 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
}
async consumeRedeliveryEvent(
coreRelayerVaa: ParsedVaaWithBytes,
redeliveryVaa: ParsedVaaWithBytes,
db: StagingAreaKeyLock
): Promise<{ workflowData?: WorkflowPayload }> {}
): Promise<{ workflowData?: WorkflowPayload }> {
const redeliveryInstruction = parseRedeliveryByTxHashInstruction(
redeliveryVaa.payload
)
const chainId = redeliveryInstruction.sourceChain as wh.EVMChainId
const config = this.pluginConfig.supportedChains.get(chainId)!
const coreWHContract = config.coreContract!
const rx = await coreWHContract.provider.getTransactionReceipt(
ethers.utils.hexlify(redeliveryInstruction.sourceTxHash, {
allowMissingPrefix: true,
})
)
const { vaas, deliveryVaaIdx } = this.filterLogs(
rx,
chainId,
wh.tryNativeToHexString(config.relayerAddress, "ethereum"),
redeliveryInstruction.sourceNonce.toNumber()
)
// create entry and pending in db
const newEntry: Entry = {
vaas,
chainId,
deliveryVaaIdx,
redeliveryVaa: redeliveryVaa.bytes.toString("base64"),
allFetched: false,
}
this.logger.debug(`Entry: ${JSON.stringify(newEntry, undefined, 4)}`)
await this.addEntryToPendingQueue(
Buffer.from(redeliveryVaa.hash).toString("base64"),
newEntry,
db
)
return {} as any
}
async consumeDeliveryEvent(
coreRelayerVaa: ParsedVaaWithBytes,
@ -320,9 +357,17 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
)
const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId
const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId)
// parse rx for seqs and emitters
const { vaas, deliveryVaaIdx } = this.filterLogs(rx, chainId, coreRelayerVaa)
const emitter = wh.tryNativeToHexString(
wh.tryUint8ArrayToNative(coreRelayerVaa.emitterAddress, "ethereum"),
"ethereum"
)
const { vaas, deliveryVaaIdx } = this.filterLogs(
rx,
chainId,
emitter,
coreRelayerVaa.nonce
)
vaas[deliveryVaaIdx].bytes = coreRelayerVaa.bytes.toString("base64")
// create entry and pending in db
@ -346,34 +391,44 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
}
this.logger.debug(`Entry: ${JSON.stringify(newEntry, undefined, 4)}`)
await db.withKey(
[hash, PENDING],
// note _hash is actually the value of the variable `hash`, but ts will not
// let this be expressed
async (kv: { [PENDING]: Pending[]; _hash: Entry }) => {
// @ts-ignore
let oldEntry: Entry | null = kv[hash]
if (oldEntry?.allFetched) {
return { newKV: kv, val: undefined }
}
const now = Date.now().toString()
kv.pending.push({
nextRetryTime: now,
numTimesRetried: 0,
startTime: now,
hash,
})
// @ts-ignore
kv[hash] = newEntry
return { newKV: kv, val: undefined }
}
)
// todo: retry if withKey throws (possible contention with worker process)
await this.addEntryToPendingQueue(hash, newEntry, db)
// do not create workflow until we have collected all VAAs
return {}
}
}
async addEntryToPendingQueue(hash: string, newEntry: Entry, db: StagingAreaKeyLock) {
await retryAsyncUntilDefined(async () => {
try {
return db.withKey(
[hash, PENDING],
// note _hash is actually the value of the variable `hash`, but ts will not
// let this be expressed
async (kv: { [PENDING]: Pending[]; _hash: Entry }) => {
// @ts-ignore
let oldEntry: Entry | null = kv[hash]
if (oldEntry?.allFetched) {
return { newKV: kv, val: undefined }
}
// todo: check that hash is not in pending list already
const now = Date.now().toString()
kv.pending.push({
nextRetryTime: now,
numTimesRetried: 0,
startTime: now,
hash,
})
// @ts-ignore
kv[hash] = newEntry
return { newKV: kv, val: true }
}
)
} catch {}
})
}
// fetch the contract transaction receipt for the given sequence number emitted by the core relayer contract
async fetchReceipt(
sequence: BigInt,
@ -424,7 +479,8 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
filterLogs(
rx: ethers.ContractReceipt,
chainId: wh.EVMChainId,
coreRelayerVaa: ParsedVaaWithBytes
emitterAddress: string, //hex
nonce: number
): {
vaas: {
sequence: string
@ -442,7 +498,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
const iface = Implementation__factory.createInterface()
const log = iface.parseLog(bridgeLog) as unknown as LogMessagePublishedEvent
// filter down to just synthetic batch
if (log.args.nonce !== coreRelayerVaa.nonce) {
if (log.args.nonce !== nonce) {
return []
}
return [
@ -454,14 +510,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
]
})
this.logger.debug(vaas)
const deliveryVaaIdx = vaas.findIndex(
(vaa) =>
vaa.emitter ===
wh.tryNativeToHexString(
wh.tryUint8ArrayToNative(coreRelayerVaa.emitterAddress, "ethereum"),
"ethereum"
)
)
const deliveryVaaIdx = vaas.findIndex((vaa) => vaa.emitter === emitterAddress)
if (deliveryVaaIdx === -1) {
throw new PluginError("CoreRelayerVaa not found in fetched vaas", {
vaas,