solve db deadlock and incorrect wormholeRpc

This commit is contained in:
Joe Howarth 2023-02-08 23:26:16 +00:00 committed by Joe Howarth
parent d34db742b1
commit 2a3283dc8e
4 changed files with 217 additions and 3791 deletions

File diff suppressed because it is too large Load Diff

View File

@ -17,7 +17,7 @@
},
"dependencies": {
"@certusone/wormhole-sdk": "^0.9.6",
"@wormhole-foundation/relayer-engine": "github:wormhole-foundation/relayer-engine#7fb24b5bd9193205494cdee8a9bf82694f7ef42a",
"@wormhole-foundation/relayer-engine": "github:wormhole-foundation/relayer-engine#5ee0aeedb9888bf9a1a0474d3436e4b236975857",
"ts-retry": "^4.1.1"
},
"author": "Chase Moran",

View File

@ -1,5 +1,11 @@
import { EVMChainId } from "@certusone/wormhole-sdk"
import {
EVMChainId,
CONTRACTS,
coalesceChainName,
ChainId,
} from "@certusone/wormhole-sdk"
import * as relayerEngine from "@wormhole-foundation/relayer-engine"
import { EnvType, validateStringEnum } from "@wormhole-foundation/relayer-engine"
import GenericRelayerPluginDef, {
ChainInfo,
GenericRelayerPluginConfig,
@ -12,9 +18,19 @@ type ContractsJson = {
mockIntegrations: ContractConfigEntry[]
}
enum Flag {
Tilt = "--tilt",
Testnet = "--testnet",
K8sTestnet = "--k8s-testnet",
Mainnet = "--mainnet",
}
async function main() {
// todo: turn flag into enum
const flag: Flag = validateStringEnum(Flag, process.argv[2])
// load plugin config
const envType = selectPluginConfig(process.argv[2] ?? "")
const envType = selectPluginConfig(flag)
const pluginConfig = (await relayerEngine.loadFileAndParseToObject(
`./src/plugin/config/${envType}.json`
)) as GenericRelayerPluginConfig
@ -25,7 +41,8 @@ async function main() {
)) as ContractsJson
pluginConfig.supportedChains = transfromContractsToSupportedChains(
contracts,
pluginConfig.supportedChains as any
pluginConfig.supportedChains as any,
flag
) as any
// run relayer engine
@ -36,24 +53,10 @@ async function main() {
})
}
function selectPluginConfig(flag: string): string {
switch (flag) {
case "--testnet":
return relayerEngine.EnvType.DEVNET.toLowerCase()
case "--mainnet":
return relayerEngine.EnvType.MAINNET.toLowerCase()
case "--tilt":
return relayerEngine.EnvType.TILT.toLowerCase()
case "--k8s-testnet":
return "k8s-testnet"
default:
return relayerEngine.EnvType.TILT.toLowerCase()
}
}
function transfromContractsToSupportedChains(
contracts: ContractsJson,
supportedChains: Record<EVMChainId, ChainInfo>
supportedChains: Record<EVMChainId, ChainInfo>,
flag: Flag
): Record<EVMChainId, ChainInfo> {
contracts.relayProviders.forEach(
({ chainId, address }: ContractConfigEntry) =>
@ -67,9 +70,44 @@ function transfromContractsToSupportedChains(
({ chainId, address }: ContractConfigEntry) =>
(supportedChains[chainId].mockIntegrationContractAddress = address)
)
const whContracts = CONTRACTS[flagToWormholeContracts(flag)]
for (const [chain, entry] of Object.entries(supportedChains)) {
const chainName = coalesceChainName(Number(chain) as ChainId)
entry.coreContract = whContracts[chainName].core!
}
return supportedChains
}
function selectPluginConfig(flag: Flag): string {
switch (flag) {
case Flag.Testnet:
return relayerEngine.EnvType.DEVNET.toLowerCase()
case Flag.Mainnet:
return relayerEngine.EnvType.MAINNET.toLowerCase()
case Flag.Tilt:
return relayerEngine.EnvType.TILT.toLowerCase()
case Flag.K8sTestnet:
return "k8s-testnet"
default:
return relayerEngine.EnvType.TILT.toLowerCase()
}
}
function flagToWormholeContracts(flag: string): "MAINNET" | "TESTNET" | "DEVNET" {
switch (flag) {
case Flag.K8sTestnet:
return "TESTNET"
case Flag.Testnet:
return "TESTNET"
case Flag.Mainnet:
return "MAINNET"
case Flag.Tilt:
return "DEVNET"
default:
throw new Error("Unexpected flag ")
}
}
// allow main to be an async function and block until it rejects or resolves
main().catch((e) => {
console.error(e)

View File

@ -38,13 +38,11 @@ import * as grpcWebNodeHttpTransport from "@improbable-eng/grpc-web-node-http-tr
import { retryAsyncUntilDefined } from "ts-retry/lib/cjs/retry"
import { hexToNativeStringAlgorand } from "@certusone/wormhole-sdk/lib/cjs/algorand"
const wormholeRpc = "https://wormhole-v2-testnet-api.certus.one"
let PLUGIN_NAME: string = "GenericRelayerPlugin"
export interface ChainInfo {
relayProvider: string
coreContract?: IWormhole
coreContract: string
relayerAddress: string
mockIntegrationContractAddress: string
}
@ -109,7 +107,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
pluginConfig: GenericRelayerPluginConfig
constructor(
readonly engineConfig: CommonPluginEnv,
readonly engineConfig: CommonPluginEnv & { wormholeRpc: string },
pluginConfigRaw: Record<string, any>,
readonly logger: Logger
) {
@ -127,14 +125,10 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
) {
// connect to the core wh contract for each chain
for (const [chainId, info] of this.pluginConfig.supportedChains.entries()) {
const chainName = wh.coalesceChainName(chainId)
const { core } = wh.CONTRACTS.TESTNET[chainName]
if (!core || !wh.isEVMChain(chainId)) {
this.logger.error("No known core contract for chain", chainName)
throw new PluginError("No known core contract for chain", { chainName })
const { coreContract } = info
if (!coreContract || !wh.isEVMChain(chainId)) {
throw new PluginError("No known core contract for chain", { chainId })
}
const provider = providers.evm[chainId as wh.EVMChainId]
info.coreContract = IWormhole__factory.connect(core, provider)
}
if (listenerResources) {
@ -156,7 +150,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
// track which delivery vaa hashes have all vaas ready this iteration
let newlyResolved = new Map<string, Entry>()
await db.withKey([PENDING], async (kv: { [PENDING]?: Pending[] }) => {
await db.withKey([PENDING], async (kv: { [PENDING]?: Pending[] }, tx) => {
// if objects have not been created, initialize
if (!kv.pending) {
kv.pending = []
@ -192,7 +186,8 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
const newKV = Object.fromEntries(await Promise.all(promises))
return { newKV, val: undefined }
}
},
tx
)
kv.pending = kv.pending.filter((p) => !newlyResolved.has(p.hash))
@ -212,6 +207,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
async fetchEntry(hash: string, value: Entry, logger: Logger): Promise<Entry> {
// track if there are missing vaas after trying to fetch
this.logger.info("Fetching entry...", { hash })
let hasMissingVaas = false
const vaas = await Promise.all(
value.vaas.map(async ({ emitter, sequence, bytes }, idx) => {
@ -222,7 +218,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
try {
// try to fetch vaa from guardian rpc
const resp = await wh.getSignedVAA(
wormholeRpc,
this.engineConfig.wormholeRpc,
value.chainId as wh.EVMChainId,
emitter,
sequence,
@ -256,7 +252,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
async consumeEvent(
coreRelayerVaa: ParsedVaaWithBytes,
db: StagingAreaKeyLock,
_providers: Providers
providers: Providers
): Promise<{ workflowData: WorkflowPayload } | undefined> {
this.logger.debug(
`Consuming event from chain ${
@ -285,24 +281,24 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
switch (payloadId) {
case RelayerPayloadId.Delivery:
return this.consumeDeliveryEvent(coreRelayerVaa, db, hash)
return this.consumeDeliveryEvent(coreRelayerVaa, db, hash, providers)
case RelayerPayloadId.Redelivery:
return this.consumeRedeliveryEvent(coreRelayerVaa, db, hash)
return this.consumeRedeliveryEvent(coreRelayerVaa, db, providers)
}
}
async consumeRedeliveryEvent(
redeliveryVaa: ParsedVaaWithBytes,
db: StagingAreaKeyLock,
hash: string
providers: Providers
): Promise<{ workflowData: WorkflowPayload } | undefined> {
const redeliveryInstruction = parseRedeliveryByTxHashInstruction(
redeliveryVaa.payload
)
const chainId = redeliveryInstruction.sourceChain as wh.EVMChainId
const provider = providers.evm[chainId]
const config = this.pluginConfig.supportedChains.get(chainId)!
const coreWHContract = config.coreContract!
const rx = await coreWHContract.provider.getTransactionReceipt(
const rx = await provider.getTransactionReceipt(
ethers.utils.hexlify(redeliveryInstruction.sourceTxHash, {
allowMissingPrefix: true,
})
@ -336,13 +332,15 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
async consumeDeliveryEvent(
coreRelayerVaa: ParsedVaaWithBytes,
db: StagingAreaKeyLock,
hash: string
hash: string,
providers: Providers
): Promise<{ workflowData: WorkflowPayload } | undefined> {
this.logger.info(
`Not fetched, fetching receipt and filtering to synthetic batch for ${hash}...`
)
const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId
const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId)
const provider = providers.evm[chainId]
const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId, provider)
const emitter = wh.tryNativeToHexString(
wh.tryUint8ArrayToNative(coreRelayerVaa.emitterAddress, "ethereum"),
@ -422,15 +420,12 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
// fetch the contract transaction receipt for the given sequence number emitted by the core relayer contract
async fetchReceipt(
sequence: BigInt,
chainId: wh.EVMChainId
chainId: wh.EVMChainId,
provider: ethers.providers.Provider
): Promise<ethers.ContractReceipt> {
const config = this.pluginConfig.supportedChains.get(chainId)!
const coreWHContract = config.coreContract!
const coreWHContract = IWormhole__factory.connect(config.coreContract!, provider)
const filter = coreWHContract.filters.LogMessagePublished(config.relayerAddress)
this.logger.info(`Relayer address: ${config.relayerAddress}`)
console.log(JSON.stringify(coreWHContract.provider, undefined, 2))
const blockNumber = await coreWHContract.provider.getBlockNumber()
for (let i = 0; i < 20; ++i) {
let paginatedLogs
@ -443,7 +438,6 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
blockNumber - i * 20
)
}
console.log(paginatedLogs)
const log = paginatedLogs.find(
(log) => log.args.sequence.toString() === sequence.toString()
)
@ -455,7 +449,6 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
return await retryAsyncUntilDefined(
async () => {
const paginatedLogs = await coreWHContract.queryFilter(filter, -50)
console.log(paginatedLogs)
const log = paginatedLogs.find(
(log) => log.args.sequence.toString() === sequence.toString()
)
@ -486,8 +479,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
} {
const onlyVAALogs = rx.logs.filter(
(log) =>
log.address ===
this.pluginConfig.supportedChains.get(chainId)?.coreContract?.address
log.address === this.pluginConfig.supportedChains.get(chainId)?.coreContract
)
const vaas = onlyVAALogs.flatMap((bridgeLog: ethers.providers.Log) => {
const iface = Implementation__factory.createInterface()