feat: Add solana vaa logs support (#656)

This commit is contained in:
Ricardo Olarte 2023-08-23 10:18:33 -05:00 committed by GitHub
parent 503a9da5d3
commit 23d349c9b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 376 additions and 167 deletions

View File

@ -47,13 +47,13 @@ export const evmChains: EVMChainName[] = [
export const supportedChains: ChainName[] = [ export const supportedChains: ChainName[] = [
...evmChains, ...evmChains,
'algorand', // 'algorand',
'aptos', // 'aptos',
'injective', // 'injective',
'near', // 'near',
'solana', 'solana',
'sui', // 'sui',
'terra', // 'terra',
'terra2', // 'terra2',
'xpla', // 'xpla',
]; ];

View File

@ -7,13 +7,13 @@ import { VaaLog } from './types';
const ENCODING = 'utf8'; const ENCODING = 'utf8';
export default class JsonDB extends BaseDB { export default class JsonDB extends BaseDB {
db: {} | null = null; db: VaaLog[] = [];
dbFile: string; dbFile: string;
dbLastBlockFile: string; dbLastBlockFile: string;
constructor() { constructor() {
super(); super();
this.db = {}; this.db = [];
this.lastBlockByChain = {}; this.lastBlockByChain = {};
this.dbFile = env.JSON_DB_FILE; this.dbFile = env.JSON_DB_FILE;
this.dbLastBlockFile = env.JSON_LAST_BLOCK_FILE; this.dbLastBlockFile = env.JSON_LAST_BLOCK_FILE;
@ -22,11 +22,11 @@ export default class JsonDB extends BaseDB {
async connect(): Promise<void> { async connect(): Promise<void> {
try { try {
const rawDb = readFileSync(this.dbFile, ENCODING); const rawDb = readFileSync(this.dbFile, ENCODING);
this.db = rawDb ? JSON.parse(rawDb) : {}; this.db = rawDb ? JSON.parse(rawDb) : [];
console.log('---CONNECTED TO JsonDB---'); console.log('---CONNECTED TO JsonDB---');
} catch (e) { } catch (e) {
this.logger.warn(`${this.dbFile} does not exists, creating new file`); this.logger.warn(`${this.dbFile} does not exists, creating new file`);
this.db = {}; this.db = [];
} }
} }
@ -41,7 +41,7 @@ export default class JsonDB extends BaseDB {
} }
override async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void> { override async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void> {
this.db = [{ ...this.db, ...vaaLogs }]; this.db = [...this.db, ...vaaLogs];
writeFileSync(this.dbFile, JSON.stringify(this.db, null, 2), ENCODING); writeFileSync(this.dbFile, JSON.stringify(this.db, null, 2), ENCODING);
} }

View File

@ -13,16 +13,17 @@ export interface DBImplementation {
storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void>; storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void>;
} }
export type VaasByBlock = { [blockInfo: string]: string[] };
export interface VaaLog { export interface VaaLog {
vaaId: string; vaaId: string;
chainId: number; chainId: number;
chainName: string; chainName: string;
emitter: string; emitter: string;
sequence: number; sequence: number | string;
txHash: string; txHash: string | null;
sender: string; sender: string | null;
payload: any; payload: any;
blockNumber: number; blockNumber: number | string | null;
indexedAt?: string | number; indexedAt?: string | number;
updatedAt?: string | number; updatedAt?: string | number;
createdAt?: string | number; createdAt?: string | number;

View File

@ -3,7 +3,7 @@ import { MAX_UINT_64, padUint16, padUint64 } from '../common';
import JsonDB from './JsonDB'; import JsonDB from './JsonDB';
import MongoDB from './MongoDB'; import MongoDB from './MongoDB';
import { env } from '../config'; import { env } from '../config';
import { DBOptionTypes } from './types'; import { DBOptionTypes, VaaLog } from './types';
// Bigtable Message ID format // Bigtable Message ID format
// chain/MAX_UINT64-block/emitter/sequence // chain/MAX_UINT64-block/emitter/sequence
@ -49,3 +49,30 @@ export const makeVaaKey = (
emitter: string, emitter: string,
seq: string, seq: string,
): string => `${transactionHash}:${coalesceChainId(chain)}/${emitter}/${seq}`; ): string => `${transactionHash}:${coalesceChainId(chain)}/${emitter}/${seq}`;
export const makeVaaLog = ({
chainName,
emitter,
sequence,
txHash,
sender,
blockNumber,
payload,
}: Omit<VaaLog, 'vaaId' | 'chainId'>): VaaLog => {
const chainId = coalesceChainId(chainName as ChainName);
return {
vaaId: `${chainId}/${emitter}/${sequence}`,
chainId: chainId,
chainName,
emitter,
sequence,
txHash,
sender,
payload,
blockNumber,
indexedAt: new Date().getTime(),
updatedAt: new Date().getTime(),
createdAt: new Date().getTime(),
};
};

View File

@ -6,7 +6,7 @@ import { getSNS } from './services/SNS/utils';
import { makeFinalizedWatcher } from './watchers/utils'; import { makeFinalizedWatcher } from './watchers/utils';
import { InfrastructureController } from './infrastructure/infrastructure.controller'; import { InfrastructureController } from './infrastructure/infrastructure.controller';
import { createServer } from './builder/server'; import { createServer } from './builder/server';
import { env, evmChains } from './config'; import { env, supportedChains } from './config';
import { DBOptionTypes } from './databases/types'; import { DBOptionTypes } from './databases/types';
import { SNSOptionTypes } from './services/SNS/types'; import { SNSOptionTypes } from './services/SNS/types';
class EventWatcher { class EventWatcher {
@ -36,13 +36,18 @@ class EventWatcher {
async run() { async run() {
await this.db.start(); await this.db.start();
// for (const chain of supportedChains) { for (const chain of supportedChains) {
for (const chain of evmChains) {
const watcher = makeFinalizedWatcher(chain); const watcher = makeFinalizedWatcher(chain);
watcher.setDB(this.db); watcher.setDB(this.db);
watcher.setServices(this.sns); watcher.setServices(this.sns);
watcher.watch(); watcher.watch();
} }
// TEST
// const watcher = makeFinalizedWatcher('solana');
// watcher.setDB(this.db);
// watcher.setServices(this.sns);
// watcher.watch();
} }
} }

View File

@ -2,7 +2,7 @@ import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN, sleep } from '../common'; import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN, sleep } from '../common';
import { z } from 'zod'; import { z } from 'zod';
import { TIMEOUT } from '../consts'; import { TIMEOUT } from '../consts';
import { DBOptionTypes, VaaLog } from '../databases/types'; import { DBOptionTypes, VaaLog, VaasByBlock } from '../databases/types';
import { getLogger, WormholeLogger } from '../utils/logger'; import { getLogger, WormholeLogger } from '../utils/logger';
import { SNSInput, SNSOptionTypes } from '../services/SNS/types'; import { SNSInput, SNSOptionTypes } from '../services/SNS/types';
import { WatcherImplementation } from './types'; import { WatcherImplementation } from './types';
@ -25,6 +25,10 @@ abstract class BaseWatcher implements WatcherImplementation {
this.sns = sns; this.sns = sns;
} }
getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
throw new Error('Method not implemented.');
}
abstract getFinalizedBlockNumber(): Promise<number>; abstract getFinalizedBlockNumber(): Promise<number>;
abstract getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]>; abstract getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]>;

View File

@ -8,8 +8,9 @@ import { Log } from '@ethersproject/abstract-provider';
import axios from 'axios'; import axios from 'axios';
import { BigNumber } from 'ethers'; import { BigNumber } from 'ethers';
import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts'; import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts';
import { VaaLog } from '../databases/types'; import { VaaLog, VaasByBlock } from '../databases/types';
import BaseWatcher from './BaseWatcher'; import BaseWatcher from './BaseWatcher';
import { makeBlockKey, makeVaaKey, makeVaaLog } from '../databases/utils';
// This is the hash for topic[0] of the core contract event LogMessagePublished // This is the hash for topic[0] of the core contract event LogMessagePublished
// https://github.com/wormhole-foundation/wormhole/blob/main/ethereum/contracts/Implementation.sol#L12 // https://github.com/wormhole-foundation/wormhole/blob/main/ethereum/contracts/Implementation.sol#L12
@ -207,6 +208,36 @@ export class EVMWatcher extends BaseWatcher {
return block.number; return block.number;
} }
override async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
const address = CONTRACTS.MAINNET[this.chain].core;
if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
}
const logs = await this.getLogs(fromBlock, toBlock, address, [LOG_MESSAGE_PUBLISHED_TOPIC]);
const timestampsByBlock: { [block: number]: string } = {};
// fetch timestamps for each block
const vaasByBlock: VaasByBlock = {};
this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
const blocks = await this.getBlocks(fromBlock, toBlock);
for (const block of blocks) {
const timestamp = new Date(block.timestamp * 1000).toISOString();
timestampsByBlock[block.number] = timestamp;
vaasByBlock[makeBlockKey(block.number.toString(), timestamp)] = [];
}
this.logger.info(`processing ${logs.length} logs`);
for (const log of logs) {
const blockNumber = log.blockNumber;
const emitter = log.topics[1].slice(2);
const {
args: { sequence },
} = wormholeInterface.parseLog(log);
const vaaKey = makeVaaKey(log.transactionHash, this.chain, emitter, sequence.toString());
const blockKey = makeBlockKey(blockNumber.toString(), timestampsByBlock[blockNumber]);
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
return vaasByBlock;
}
override async getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> { override async getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
const vaaLogs: VaaLog[] = []; const vaaLogs: VaaLog[] = [];
const address = CONTRACTS.MAINNET[this.chain].core; const address = CONTRACTS.MAINNET[this.chain].core;
@ -224,26 +255,21 @@ export class EVMWatcher extends BaseWatcher {
const { args } = wormholeInterface.parseLog(log); const { args } = wormholeInterface.parseLog(log);
const { sequence, sender, payload } = args || {}; const { sequence, sender, payload } = args || {};
const chainName = this.chain;
const blockNumber = log.blockNumber; const blockNumber = log.blockNumber;
const chainName = this.chain;
const emitter = log.topics[1].slice(2); const emitter = log.topics[1].slice(2);
const chainId = coalesceChainId(this.chain); const parseSequence = sequence.toString();
const vaaId = `${chainId}/${emitter}/${sequence.toString()}`; const txHash = log.transactionHash;
const vaaLog: VaaLog = { const vaaLog = makeVaaLog({
vaaId,
chainName, chainName,
chainId,
emitter, emitter,
sequence: sequence.toString(), sequence: parseSequence,
txHash: log.transactionHash, txHash,
sender, sender,
payload,
blockNumber, blockNumber,
indexedAt: new Date().getTime(), payload,
updatedAt: new Date().getTime(), });
createdAt: new Date().getTime(),
};
vaaLogs.push(vaaLog); vaaLogs.push(vaaLog);
} }

View File

@ -1,5 +1,9 @@
import { getPostedMessage } from '@certusone/wormhole-sdk/lib/cjs/solana/wormhole'; import { getPostedMessage } from '@certusone/wormhole-sdk/lib/cjs/solana/wormhole';
import { CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts'; import {
coalesceChainId,
CONTRACTS,
ChainName,
} from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { import {
Commitment, Commitment,
ConfirmedSignatureInfo, ConfirmedSignatureInfo,
@ -11,8 +15,8 @@ import {
import { decode } from 'bs58'; import { decode } from 'bs58';
import { z } from 'zod'; import { z } from 'zod';
import { RPCS_BY_CHAIN } from '../consts'; import { RPCS_BY_CHAIN } from '../consts';
import { VaaLog } from '../databases/types'; import { VaaLog, VaasByBlock } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils'; import { makeBlockKey, makeVaaKey, makeVaaLog } from '../databases/utils';
import { isLegacyMessage, normalizeCompileInstruction } from '../utils/solana'; import { isLegacyMessage, normalizeCompileInstruction } from '../utils/solana';
import BaseWatcher from './BaseWatcher'; import BaseWatcher from './BaseWatcher';
@ -41,143 +45,284 @@ export class SolanaWatcher extends BaseWatcher {
return connection.getSlot(); return connection.getSlot();
} }
// async getMessagesForBlocks(fromSlot: number, toSlot: number): Promise<VaasByBlock> { override async getMessagesForBlocks(fromSlot: number, toSlot: number): Promise<VaasByBlock> {
// const connection = new Connection(this.rpc, COMMITMENT); const connection = new Connection(this.rpc, COMMITMENT);
// // in the rare case of maximumBatchSize skipped blocks in a row, // in the rare case of maximumBatchSize skipped blocks in a row,
// // you might hit this error due to the recursion below // you might hit this error due to the recursion below
// if (fromSlot > toSlot) throw new Error('solana: invalid block range'); if (fromSlot > toSlot) throw new Error('solana: invalid block range');
// this.logger.info(`fetching info for blocks ${fromSlot} to ${toSlot}`); this.logger.info(`fetching info for blocks ${fromSlot} to ${toSlot}`);
// const vaasByBlock: VaasByBlock = {}; const vaasByBlock: VaasByBlock = {};
// // identify block range by fetching signatures of the first and last transactions // identify block range by fetching signatures of the first and last transactions
// // getSignaturesForAddress walks backwards so fromSignature occurs after toSignature // getSignaturesForAddress walks backwards so fromSignature occurs after toSignature
// let toBlock: VersionedBlockResponse | null = null; let toBlock: VersionedBlockResponse | null = null;
// try { try {
// toBlock = await connection.getBlock(toSlot, { maxSupportedTransactionVersion: 0 }); toBlock = await connection.getBlock(toSlot, { maxSupportedTransactionVersion: 0 });
// } catch (e) { } catch (e) {
// if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) { if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) {
// // failed to get confirmed block: slot was skipped or missing in long-term storage // failed to get confirmed block: slot was skipped or missing in long-term storage
// return this.getMessagesForBlocks(fromSlot, toSlot - 1); return this.getMessagesForBlocks(fromSlot, toSlot - 1);
// } else { } else {
// throw e; throw e;
// } }
// } }
// if (!toBlock || !toBlock.blockTime || toBlock.transactions.length === 0) { if (!toBlock || !toBlock.blockTime || toBlock.transactions.length === 0) {
// return this.getMessagesForBlocks(fromSlot, toSlot - 1); return this.getMessagesForBlocks(fromSlot, toSlot - 1);
// } }
// const fromSignature = const fromSignature =
// toBlock.transactions[toBlock.transactions.length - 1].transaction.signatures[0]; toBlock.transactions[toBlock.transactions.length - 1].transaction.signatures[0];
// let fromBlock: VersionedBlockResponse | null = null; let fromBlock: VersionedBlockResponse | null = null;
// try { try {
// fromBlock = await connection.getBlock(fromSlot, { maxSupportedTransactionVersion: 0 }); fromBlock = await connection.getBlock(fromSlot, { maxSupportedTransactionVersion: 0 });
// } catch (e) { } catch (e) {
// if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) { if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) {
// // failed to get confirmed block: slot was skipped or missing in long-term storage // failed to get confirmed block: slot was skipped or missing in long-term storage
// return this.getMessagesForBlocks(fromSlot + 1, toSlot); return this.getMessagesForBlocks(fromSlot + 1, toSlot);
// } else { } else {
// throw e; throw e;
// } }
// } }
// if (!fromBlock || !fromBlock.blockTime || fromBlock.transactions.length === 0) { if (!fromBlock || !fromBlock.blockTime || fromBlock.transactions.length === 0) {
// return this.getMessagesForBlocks(fromSlot + 1, toSlot); return this.getMessagesForBlocks(fromSlot + 1, toSlot);
// } }
// const toSignature = fromBlock.transactions[0].transaction.signatures[0]; const toSignature = fromBlock.transactions[0].transaction.signatures[0];
// // get all core bridge signatures between fromTransaction and toTransaction // get all core bridge signatures between fromTransaction and toTransaction
// let numSignatures = this.getSignaturesLimit; let numSignatures = this.getSignaturesLimit;
// let currSignature: string | undefined = fromSignature; let currSignature: string | undefined = fromSignature;
// while (numSignatures === this.getSignaturesLimit) { while (numSignatures === this.getSignaturesLimit) {
// const signatures: ConfirmedSignatureInfo[] = await connection.getSignaturesForAddress( const signatures: ConfirmedSignatureInfo[] = await connection.getSignaturesForAddress(
// new PublicKey(WORMHOLE_PROGRAM_ID), new PublicKey(WORMHOLE_PROGRAM_ID),
// { {
// before: currSignature, before: currSignature,
// until: toSignature, until: toSignature,
// limit: this.getSignaturesLimit, limit: this.getSignaturesLimit,
// } },
// ); );
// this.logger.info(`processing ${signatures.length} transactions`); this.logger.info(`processing ${signatures.length} transactions`);
// // In order to determine if a transaction has a Wormhole message, we normalize and iterate // In order to determine if a transaction has a Wormhole message, we normalize and iterate
// // through all instructions in the transaction. Only PostMessage instructions are relevant // through all instructions in the transaction. Only PostMessage instructions are relevant
// // when looking for messages. PostMessageUnreliable instructions are ignored because there // when looking for messages. PostMessageUnreliable instructions are ignored because there
// // are no data availability guarantees (ie the associated message accounts may have been // are no data availability guarantees (ie the associated message accounts may have been
// // reused, overwriting previous data). Then, the message account is the account given by // reused, overwriting previous data). Then, the message account is the account given by
// // the second index in the instruction's account key indices. From here, we can fetch the // the second index in the instruction's account key indices. From here, we can fetch the
// // message data from the account and parse out the emitter and sequence. // message data from the account and parse out the emitter and sequence.
// const results = await connection.getTransactions( const results = await connection.getTransactions(
// signatures.map((s) => s.signature), signatures.map((s) => s.signature),
// { {
// maxSupportedTransactionVersion: 0, maxSupportedTransactionVersion: 0,
// } },
// ); );
// if (results.length !== signatures.length) { if (results.length !== signatures.length) {
// throw new Error(`solana: failed to fetch tx for signatures`); throw new Error(`solana: failed to fetch tx for signatures`);
// } }
// for (const res of results) { for (const res of results) {
// if (res?.meta?.err) { if (res?.meta?.err) {
// // skip errored txs // skip errored txs
// continue; continue;
// } }
// if (!res || !res.blockTime) { if (!res || !res.blockTime) {
// throw new Error( throw new Error(
// `solana: failed to fetch tx for signature ${ `solana: failed to fetch tx for signature ${
// res?.transaction.signatures[0] || 'unknown' res?.transaction.signatures[0] || 'unknown'
// }` }`,
// ); );
// } }
// const message = res.transaction.message; const message = res.transaction.message;
// const accountKeys = isLegacyMessage(message) const accountKeys = isLegacyMessage(message)
// ? message.accountKeys ? message.accountKeys
// : message.staticAccountKeys; : message.staticAccountKeys;
// const programIdIndex = accountKeys.findIndex((i) => i.toBase58() === WORMHOLE_PROGRAM_ID); const programIdIndex = accountKeys.findIndex((i) => i.toBase58() === WORMHOLE_PROGRAM_ID);
// const instructions = message.compiledInstructions; const instructions = message.compiledInstructions;
// const innerInstructions = const innerInstructions =
// res.meta?.innerInstructions?.flatMap((i) => res.meta?.innerInstructions?.flatMap((i) =>
// i.instructions.map(normalizeCompileInstruction) i.instructions.map(normalizeCompileInstruction),
// ) || []; ) || [];
// const whInstructions = innerInstructions const whInstructions = innerInstructions
// .concat(instructions) .concat(instructions)
// .filter((i) => i.programIdIndex === programIdIndex); .filter((i) => i.programIdIndex === programIdIndex);
// for (const instruction of whInstructions) { for (const instruction of whInstructions) {
// // skip if not postMessage instruction // skip if not postMessage instruction
// const instructionId = instruction.data; const instructionId = instruction.data;
// if (instructionId[0] !== 0x01) continue; if (instructionId[0] !== 0x01) continue;
// const accountId = accountKeys[instruction.accountKeyIndexes[1]]; const accountId = accountKeys[instruction.accountKeyIndexes[1]];
// const { const {
// message: { emitterAddress, sequence }, message: { emitterAddress, sequence },
// } = await getPostedMessage(connection, accountId.toBase58(), COMMITMENT); } = await getPostedMessage(connection, accountId.toBase58(), COMMITMENT);
// const blockKey = makeBlockKey( const blockKey = makeBlockKey(
// res.slot.toString(), res.slot.toString(),
// new Date(res.blockTime * 1000).toISOString() new Date(res.blockTime * 1000).toISOString(),
// ); );
// const vaaKey = makeVaaKey( const vaaKey = makeVaaKey(
// res.transaction.signatures[0], res.transaction.signatures[0],
// this.chain, this.chain,
// emitterAddress.toString('hex'), emitterAddress.toString('hex'),
// sequence.toString() sequence.toString(),
// ); );
// vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey]; vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
// } }
// } }
// numSignatures = signatures.length; numSignatures = signatures.length;
// currSignature = signatures.at(-1)?.signature; currSignature = signatures.at(-1)?.signature;
// } }
// // add last block for storeVaasByBlock // add last block for storeVaasByBlock
// const lastBlockKey = makeBlockKey( const lastBlockKey = makeBlockKey(
// toSlot.toString(), toSlot.toString(),
// new Date(toBlock.blockTime * 1000).toISOString() new Date(toBlock.blockTime * 1000).toISOString(),
// ); );
// return { [lastBlockKey]: [], ...vaasByBlock }; return { [lastBlockKey]: [], ...vaasByBlock };
// } }
override getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> { override async getVaaLogs(fromSlot: number, toSlot: number): Promise<VaaLog[]> {
throw new Error('Not Implemented'); const vaaLogs: VaaLog[] = [];
const connection = new Connection(this.rpc, COMMITMENT);
// in the rare case of maximumBatchSize skipped blocks in a row,
// you might hit this error due to the recursion below
if (fromSlot > toSlot) throw new Error('solana: invalid block range');
this.logger.info(`fetching info for blocks ${fromSlot} to ${toSlot}`);
// identify block range by fetching signatures of the first and last transactions
// getSignaturesForAddress walks backwards so fromSignature occurs after toSignature
let toBlock: VersionedBlockResponse | null = null;
try {
toBlock = await connection.getBlock(toSlot, { maxSupportedTransactionVersion: 0 });
} catch (e) {
if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) {
// failed to get confirmed block: slot was skipped or missing in long-term storage
return this.getVaaLogs(fromSlot, toSlot - 1);
} else {
throw e;
}
}
if (!toBlock || !toBlock.blockTime || toBlock.transactions.length === 0) {
return this.getVaaLogs(fromSlot, toSlot - 1);
}
const fromSignature =
toBlock.transactions[toBlock.transactions.length - 1].transaction.signatures[0];
let fromBlock: VersionedBlockResponse | null = null;
try {
fromBlock = await connection.getBlock(fromSlot, { maxSupportedTransactionVersion: 0 });
} catch (e) {
if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) {
// failed to get confirmed block: slot was skipped or missing in long-term storage
return this.getVaaLogs(fromSlot + 1, toSlot);
} else {
throw e;
}
}
if (!fromBlock || !fromBlock.blockTime || fromBlock.transactions.length === 0) {
return this.getVaaLogs(fromSlot + 1, toSlot);
}
const toSignature = fromBlock.transactions[0].transaction.signatures[0];
// get all core bridge signatures between fromTransaction and toTransaction
let numSignatures = this.getSignaturesLimit;
let currSignature: string | undefined = fromSignature;
while (numSignatures === this.getSignaturesLimit) {
const signatures: ConfirmedSignatureInfo[] = await connection.getSignaturesForAddress(
new PublicKey(WORMHOLE_PROGRAM_ID),
{
before: currSignature,
until: toSignature,
limit: this.getSignaturesLimit,
},
);
this.logger.info(`processing ${signatures.length} transactions`);
// In order to determine if a transaction has a Wormhole message, we normalize and iterate
// through all instructions in the transaction. Only PostMessage instructions are relevant
// when looking for messages. PostMessageUnreliable instructions are ignored because there
// are no data availability guarantees (ie the associated message accounts may have been
// reused, overwriting previous data). Then, the message account is the account given by
// the second index in the instruction's account key indices. From here, we can fetch the
// message data from the account and parse out the emitter and sequence.
const results = await connection.getTransactions(
signatures.map((s) => s.signature),
{
maxSupportedTransactionVersion: 0,
},
);
if (results.length !== signatures.length) {
throw new Error(`solana: failed to fetch tx for signatures`);
}
for (const res of results) {
if (res?.meta?.err) {
// skip errored txs
continue;
}
if (!res || !res.blockTime) {
throw new Error(
`solana: failed to fetch tx for signature ${
res?.transaction.signatures[0] || 'unknown'
}`,
);
}
const message = res.transaction.message;
const accountKeys = isLegacyMessage(message)
? message.accountKeys
: message.staticAccountKeys;
const programIdIndex = accountKeys.findIndex((i) => i.toBase58() === WORMHOLE_PROGRAM_ID);
const instructions = message.compiledInstructions;
const innerInstructions =
res.meta?.innerInstructions?.flatMap((i) =>
i.instructions.map(normalizeCompileInstruction),
) || [];
const whInstructions = innerInstructions
.concat(instructions)
.filter((i) => i.programIdIndex === programIdIndex);
for (const instruction of whInstructions) {
// skip if not postMessage instruction
const instructionId = instruction.data;
if (instructionId[0] !== 0x01) continue;
const accountId = accountKeys[instruction.accountKeyIndexes[1]];
const { message } = await getPostedMessage(connection, accountId.toBase58(), COMMITMENT);
const { sequence, emitterAddress, payload } = message || {};
// console.log('res', res);
// console.log('instruction', instruction);
// console.log(
// 'parseLog',
// await getPostedMessage(connection, accountId.toBase58(), COMMITMENT),
// );
const blockNumber = res.slot.toString();
const chainName = this.chain;
const emitter = emitterAddress.toString('hex');
const parsePayload = payload.toString('hex');
const parseSequence = sequence.toString();
const sender = null;
const txHash = res.transaction.signatures[0];
const vaaLog = makeVaaLog({
chainName,
emitter,
sequence: parseSequence,
txHash,
sender,
blockNumber,
payload: parsePayload,
});
vaaLogs.push(vaaLog);
}
}
numSignatures = signatures.length;
currSignature = signatures.at(-1)?.signature;
}
return vaaLogs;
} }
override isValidVaaKey(key: string) { override isValidVaaKey(key: string) {

View File

@ -1,6 +1,6 @@
import { ChainName } from '@certusone/wormhole-sdk'; import { ChainName } from '@certusone/wormhole-sdk';
import BaseDB from '../databases/BaseDB'; import BaseDB from '../databases/BaseDB';
import { VaaLog } from '../databases/types'; import { VaaLog, VaasByBlock } from '../databases/types';
import BaseSNS from '../services/SNS/BaseSNS'; import BaseSNS from '../services/SNS/BaseSNS';
import { WormholeLogger } from '../utils/logger'; import { WormholeLogger } from '../utils/logger';
import { AlgorandWatcher } from './AlgorandWatcher'; import { AlgorandWatcher } from './AlgorandWatcher';
@ -32,6 +32,7 @@ export interface WatcherImplementation {
sns?: BaseSNS | null; sns?: BaseSNS | null;
db?: BaseDB; db?: BaseDB;
getFinalizedBlockNumber(): Promise<number>; getFinalizedBlockNumber(): Promise<number>;
getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock>;
getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]>; getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]>;
isValidBlockKey(key: string): boolean; isValidBlockKey(key: string): boolean;
isValidVaaKey(key: string): boolean; isValidVaaKey(key: string): boolean;