optimistic pre-fetch

This commit is contained in:
Joe Howarth 2023-01-10 14:52:11 -07:00
parent 704d080075
commit cc240bbd08
1 changed files with 67 additions and 64 deletions

View File

@ -145,83 +145,36 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
if (!kv.resolved) {
kv.resolved = []
logger.debug(`Pending: ${JSON.stringify(kv.pending, undefined, 4)}`)
logger.debug(`Resolved: ${JSON.stringify(kv.resolved, undefined, 4)}`)
// filter to the pending items that are due to be retried
const entriesToFetch = kv.pending.filter(
(delivery) =>
dbg(new Date(JSON.parse(dbg(delivery.nextRetryTime))).getTime()) <
new Date(JSON.parse(delivery.nextRetryTime)).getTime() < Date.now()
if (entriesToFetch.length === 0) {
logger.debug("no entries to fetch")
return { newKV: kv, val: undefined }
logger.info(`Attempting to fetch ${entriesToFetch.length} entries`)
// try fetching vaas
await db.withKey(
// get `Entry`s for each hash
entriesToFetch.map((d) => d.hash),
async (kv: Record<string, Entry>) => {
const promises = Object.entries(kv).map(async ([hash, value]) => {
// track if there are missing vaas after trying to fetch
let hasMissingVaas = false
const vaas = await Promise.all(
value.vaas.map(async ({ emitter, sequence, bytes }, idx) => {
// skip if vaa has already been fetched
if (bytes.length !== 0) {
return { emitter, sequence, bytes }
try {
// try to fetch vaa from guardian rpc
const resp = await wh.getSignedVAA(
value.chainId as wh.EVMChainId,
{ transport: grpcWebNodeHttpTransport.NodeHttpTransport() }
logger.info(`Fetched vaa ${idx} for delivery ${hash}`)
return {
// base64 encode
bytes: Buffer.from(resp.vaaBytes).toString("base64"),
} catch (e) {
hasMissingVaas = true
return { emitter, sequence, bytes: "" }
// if all vaas have been fetched, mark this hash as resolved
if (!hasMissingVaas) {
// todo: remove
logger.debug(`All fetched for ${hash}`)
const deliveryVaa = Buffer.from(
newlyResolved.set(hash, deliveryVaa)
} else {
const entry = entriesToFetch.find((k) => k.hash === hash)!
// todo: remove
logger.debug(`Old entry: ${JSON.stringify(entry, undefined, 2)}`)
entry.numTimesRetried += 1
entry.nextRetryTime = (
Date.now() +
entry.numTimesRetried * 2000
// todo: remove
logger.debug(`New entry: ${JSON.stringify(entry, undefined, 2)}`)
const promises = Object.entries(kv).map(async ([hash, entry]) => {
if (entry.allFetched) {
// nothing to do
logger.warn("Entry in pending but nothing to fetch " + hash)
return [hash, entry]
const newEntry: Entry = await this.fetchEntry(hash, entry, logger)
if (newEntry.allFetched) {
Buffer.from(newEntry.vaas[newEntry.deliveryVaaIdx].bytes, "base64")
const newEntry: Entry = { ...value, vaas, allFetched: !hasMissingVaas }
return [hash, newEntry]
@ -249,6 +202,42 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
async fetchEntry(hash: string, value: Entry, logger: Logger): Promise<Entry> {
// track if there are missing vaas after trying to fetch
let hasMissingVaas = false
const vaas = await Promise.all(
value.vaas.map(async ({ emitter, sequence, bytes }, idx) => {
// skip if vaa has already been fetched
if (bytes.length !== 0) {
return { emitter, sequence, bytes }
try {
// try to fetch vaa from guardian rpc
const resp = await wh.getSignedVAA(
value.chainId as wh.EVMChainId,
{ transport: grpcWebNodeHttpTransport.NodeHttpTransport() }
logger.info(`Fetched vaa ${idx} for delivery ${hash}`)
return {
// base64 encode
bytes: Buffer.from(resp.vaaBytes).toString("base64"),
} catch (e) {
hasMissingVaas = true
return { emitter, sequence, bytes: "" }
// if all vaas have been fetched, mark this hash as resolved
return { ...value, vaas, allFetched: !hasMissingVaas }
// listen to core relayer contract on each chain
getFilters(): ContractFilter[] {
return Array.from(this.pluginConfig.supportedChains.entries()).map(
@ -262,7 +251,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
_providers: Providers
): Promise<{ workflowData?: WorkflowPayload }> {
`Top of consume event ${coreRelayerVaa.sequence.toString()}, hash: ${Buffer.from(
`Consuming event ${coreRelayerVaa.sequence.toString()}, hash: ${Buffer.from(
@ -280,7 +269,7 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
if (fetched?.allFetched) {
// if all vaas have been fetched, kick off workflow
dbg(fetched, "fetched")
this.logger.info(`All fetched, queueing workflow for ${hash}...`)
return dbg(
workflowData: {
@ -291,7 +280,9 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
"workflow from consume event"
} else {
this.logger.debug("Not fetched branch")
`Not fetched, fetching receipt and filtering to synthetic batch for ${hash}...`
const chainId = coreRelayerVaa.emitterChain as wh.EVMChainId
const rx = await this.fetchReceipt(coreRelayerVaa.sequence, chainId)
// parse rx for seqs and emitters
@ -339,6 +330,18 @@ export class GenericRelayerPlugin implements Plugin<WorkflowPayload> {
allFetched: false,
// const maybeResolvedEntry = await this.fetchEntry(hash, newEntry, this.logger)
// if (maybeResolvedEntry.allFetched) {
// this.logger.info("Resolved entry immediately")
// return {
// workflowData: {
// coreRelayerVaaIndex: maybeResolvedEntry.deliveryVaaIdx,
// vaas: maybeResolvedEntry.vaas.map((v) => v.bytes),
// },
// }
// }
this.logger.debug(`Entry: ${JSON.stringify(newEntry, undefined, 4)}`)
await db.withKey(
[hash, "pending"],