Refactor/event watcher (#647)

* feat: add EVM event-watcher support with mongodb

* refactor: DB, Watcher, SNS Classes
This commit is contained in:
Ricardo Olarte 2023-08-22 13:52:30 -05:00 committed by GitHub
parent 3fa1804c1a
commit bf02b2054b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 1209 additions and 1227 deletions

View File

@ -1,15 +1,28 @@
LOG_DIR=.
#Log
LOG_DIR=
LOG_LEVEL=info
ETH_RPC=
#RPC
ETH_RPC=https://eth.llamarpc.com
#Database source
DB_SOURCE=
JSON_DB_FILE=db.json
JSON_LAST_BLOCK_FILE=lastBlockByChain.json
#Server
PORT=3005
#MongoDB
MONGODB_URI=mongodb://localhost:27017
MONGODB_DATABASE=wormhole
#SNS source
SNS_SOURCE=aws
#AWS
AWS_SNS_REGION=
AWS_TOPIC_ARN=
AWS_SNS_TOPIC_ARN=
AWS_SNS_SUBJECT=EventWatcher
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_SECRET_ACCESS_KEY=

View File

@ -3,13 +3,13 @@ dotenv.config();
import { ChainId, coalesceChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { chunkArray, sleep } from '../src/common';
import { BigtableDatabase } from '../src/databases/BigtableDatabase';
import { JsonDatabase } from '../src/databases/JsonDatabase';
import { VaasByBlock } from '../src/databases/types';
import JsonDB from '../src/databases/JsonDB';
// This script backfills the bigtable db from a json db
(async () => {
const localDb = new JsonDatabase();
const localDb = new JsonDB();
const remoteDb = new BigtableDatabase();
const dbEntries = Object.entries(localDb.db);
@ -25,7 +25,7 @@ import { VaasByBlock } from '../src/databases/types';
}, {});
await remoteDb.storeVaasByBlock(
coalesceChainName(Number(chain) as ChainId),
chunkedVaasByBlock
chunkedVaasByBlock,
);
await sleep(500);
}

View File

@ -3,7 +3,7 @@ dotenv.config();
import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import axios from 'axios';
import ora from 'ora';
import { initDb } from '../src/databases/utils';
import { getDB } from '../src/databases/utils';
import { AXIOS_CONFIG_JSON } from '../src/consts';
import { ArbitrumWatcher } from '../src/watchers/ArbitrumWatcher';
import { LOG_MESSAGE_PUBLISHED_TOPIC } from '../src/watchers/EVMWatcher';
@ -11,14 +11,14 @@ import { LOG_MESSAGE_PUBLISHED_TOPIC } from '../src/watchers/EVMWatcher';
// This script exists because the Arbitrum RPC node only supports a 10 block range which is super slow
(async () => {
const db = initDb();
const db = getDB();
const chain: ChainName = 'arbitrum';
const endpoint = `https://api.arbiscan.io/api?module=logs&action=getLogs&address=${CONTRACTS.MAINNET.arbitrum.core}&topic0=${LOG_MESSAGE_PUBLISHED_TOPIC}&apikey=YourApiKeyToken`;
// fetch all message publish logs for core bridge contract from explorer
let log = ora('Fetching logs from Arbiscan...').start();
const blockNumbers = (await axios.get(endpoint, AXIOS_CONFIG_JSON)).data.result.map((x: any) =>
parseInt(x.blockNumber, 16)
parseInt(x.blockNumber, 16),
);
log.succeed(`Fetched ${blockNumbers.length} logs from Arbiscan`);
// use the watcher to fetch corresponding blocks

View File

@ -4,7 +4,7 @@ import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/cons
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '../src/common';
import { BlockResult } from 'near-api-js/lib/providers/provider';
import ora from 'ora';
import { initDb } from '../src/databases/utils';
import { getDB } from '../src/databases/utils';
import { getNearProvider, getTransactionsByAccountId, NEAR_ARCHIVE_RPC } from '../src/utils/near';
import { getMessagesFromBlockResults } from '../src/watchers/NearWatcher';
@ -19,11 +19,11 @@ import { getMessagesFromBlockResults } from '../src/watchers/NearWatcher';
const BATCH_SIZE = 1000;
(async () => {
const db = initDb();
const db = getDB();
const chain: ChainName = 'near';
const provider = await getNearProvider(NEAR_ARCHIVE_RPC);
const fromBlock = Number(
(await db.getLastBlockByChain(chain)) ?? INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain] ?? 0
(await db.getLastBlockByChain(chain)) ?? INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain] ?? 0,
);
// fetch all transactions for core bridge contract from explorer
@ -32,7 +32,7 @@ const BATCH_SIZE = 1000;
const transactions = await getTransactionsByAccountId(
CONTRACTS.MAINNET.near.core,
BATCH_SIZE,
toBlock.header.timestamp.toString().padEnd(19, '9') // pad to nanoseconds
toBlock.header.timestamp.toString().padEnd(19, '9'), // pad to nanoseconds
);
log.succeed(`Fetched ${transactions.length} transactions from NEAR Explorer`);

View File

@ -0,0 +1,59 @@
import { ChainName, EVMChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
// Raw process environment, read once here so the rest of the codebase only
// ever goes through this typed, frozen `env` snapshot.
// Defaults: LOG_LEVEL 'info', DB_SOURCE 'local' (always 'local' under tests),
// JSON db files in the working directory.
const vars = process.env;

export const env = {
  NODE_ENV: vars.NODE_ENV,
  LOG_DIR: vars.LOG_DIR,
  LOG_LEVEL: vars.LOG_LEVEL || 'info',
  ETH_RPC: vars.ETH_RPC,
  // Tests are pinned to the local JSON db regardless of DB_SOURCE.
  DB_SOURCE: vars.NODE_ENV === 'test' ? 'local' : vars.DB_SOURCE || 'local',
  JSON_DB_FILE: vars.JSON_DB_FILE || './db.json',
  JSON_LAST_BLOCK_FILE: vars.JSON_LAST_BLOCK_FILE || './lastBlockByChain.json',
  PORT: vars.PORT,
  MONGODB_URI: vars.MONGODB_URI,
  MONGODB_DATABASE: vars.MONGODB_DATABASE,
  SNS_SOURCE: vars.SNS_SOURCE,
  AWS_SNS_REGION: vars.AWS_SNS_REGION,
  AWS_SNS_TOPIC_ARN: vars.AWS_SNS_TOPIC_ARN,
  AWS_SNS_SUBJECT: vars.AWS_SNS_SUBJECT,
  AWS_ACCESS_KEY_ID: vars.AWS_ACCESS_KEY_ID,
  AWS_SECRET_ACCESS_KEY: vars.AWS_SECRET_ACCESS_KEY,
} as const;
/**
 * EVM chains this watcher currently indexes.
 * EVM chains not yet supported: aurora, gnosis, neon, sepolia.
 */
export const evmChains: EVMChainName[] = [
  'acala', 'arbitrum', 'avalanche', 'base', 'bsc', 'celo', 'ethereum',
  'fantom', 'karura', 'klaytn', 'moonbeam', 'oasis', 'optimism', 'polygon',
];
/** Every chain the watcher can follow: the EVM set plus the non-EVM chains. */
export const supportedChains: ChainName[] = [
  ...evmChains,
  'algorand', 'aptos', 'injective', 'near', 'solana', 'sui',
  'terra', 'terra2', 'xpla',
];

View File

@ -1,5 +1,6 @@
import { ChainName, CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { AxiosRequestConfig } from 'axios';
import { env } from './config';
export const TIMEOUT = 0.5 * 1000;
@ -25,37 +26,36 @@ export const TIMEOUT = 0.5 * 1000;
// This node didn't work: 'https://arb1.arbitrum.io/rpc',
export const RPCS_BY_CHAIN: { [key in ChainName]?: string } = {
ethereum: process.env.ETH_RPC || 'https://svc.blockdaemon.com/ethereum/mainnet/native',
bsc: 'https://bsc-dataseed2.defibit.io',
polygon: 'https://rpc.ankr.com/polygon',
avalanche: 'https://rpc.ankr.com/avalanche',
oasis: 'https://emerald.oasis.dev',
algorand: 'https://mainnet-api.algonode.cloud',
fantom: 'https://rpc.ankr.com/fantom',
karura: 'https://eth-rpc-karura.aca-api.network',
acala: 'https://eth-rpc-acala.aca-api.network',
klaytn: 'https://klaytn-mainnet-rpc.allthatnode.com:8551',
celo: 'https://forno.celo.org',
moonbeam: 'https://rpc.ankr.com/moonbeam',
arbitrum: 'https://arb1.arbitrum.io/rpc',
optimism: 'https://rpc.ankr.com/optimism',
algorand: 'https://mainnet-api.algonode.cloud',
aptos: 'https://fullnode.mainnet.aptoslabs.com/',
near: 'https://rpc.mainnet.near.org',
xpla: 'https://dimension-lcd.xpla.dev',
terra2: 'https://phoenix-lcd.terra.dev',
// terra: 'https://columbus-fcd.terra.dev',
terra: 'https://terra-classic-fcd.publicnode.com',
injective: 'https://api.injective.network',
solana: process.env.SOLANA_RPC ?? 'https://api.mainnet-beta.solana.com',
sui: 'https://rpc.mainnet.sui.io',
arbitrum: 'https://arb1.arbitrum.io/rpc',
avalanche: 'https://rpc.ankr.com/avalanche',
base: 'https://developer-access-mainnet.base.org',
bsc: 'https://bsc-dataseed2.defibit.io',
celo: 'https://forno.celo.org',
ethereum: env.ETH_RPC || 'https://svc.blockdaemon.com/ethereum/mainnet/native',
fantom: 'https://rpc.ankr.com/fantom',
injective: 'https://api.injective.network',
karura: 'https://eth-rpc-karura.aca-api.network',
klaytn: 'https://klaytn-mainnet-rpc.allthatnode.com:8551',
moonbeam: 'https://rpc.ankr.com/moonbeam',
near: 'https://rpc.mainnet.near.org',
oasis: 'https://emerald.oasis.dev',
optimism: 'https://rpc.ankr.com/optimism',
polygon: 'https://rpc.ankr.com/polygon',
solana: 'https://api.mainnet-beta.solana.com',
sui: 'https://rpc.mainnet.sui.io',
terra: 'https://terra-classic-fcd.publicnode.com', // 'https://columbus-fcd.terra.dev',
terra2: 'https://phoenix-lcd.terra.dev',
xpla: 'https://dimension-lcd.xpla.dev',
};
// Separating for now so if we max out infura we can keep Polygon going
export const POLYGON_ROOT_CHAIN_RPC = 'https://rpc.ankr.com/eth';
export const POLYGON_ROOT_CHAIN_ADDRESS = '0x86E4Dc95c7FBdBf52e33D563BbDB00823894C287';
// Optimism watcher relies on finalized calls which don't work right on Ankr
export const OPTIMISM_CTC_CHAIN_RPC = process.env.ETH_RPC;
export const OPTIMISM_CTC_CHAIN_RPC = env.ETH_RPC;
export const OPTIMISM_CTC_CHAIN_ADDRESS = '0x5E4e65926BA27467555EB562121fac00D24E9dD2';
export const ALGORAND_INFO = {
@ -68,11 +68,6 @@ export const ALGORAND_INFO = {
token: '',
};
export const DB_SOURCE =
process.env.NODE_ENV === 'test' ? 'local' : process.env.DB_SOURCE || 'local';
export const JSON_DB_FILE = process.env.JSON_DB_FILE || './db.json';
export const DB_LAST_BLOCK_FILE = process.env.DB_LAST_BLOCK_FILE || './lastBlockByChain.json';
// without this, axios request will error `Z_BUF_ERROR`: https://github.com/axios/axios/issues/5346
export const AXIOS_CONFIG_JSON: AxiosRequestConfig = {
headers: {

View File

@ -0,0 +1,47 @@
import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '../common/consts';
import { getLogger, WormholeLogger } from '../utils/logger';
import { DBImplementation, LastBlockByChain, VaaLog } from './types';
/**
 * Shared behavior for all database implementations (JsonDB, MongoDB).
 *
 * Concrete subclasses provide connection handling and persistence; this base
 * class owns the logger and the in-memory cache of the last block processed
 * per chain, plus the logic for deciding where a watcher should resume.
 */
abstract class BaseDB implements DBImplementation {
  public logger: WormholeLogger;
  public lastBlockByChain: LastBlockByChain = {};

  constructor() {
    this.logger = getLogger('db');
  }

  /**
   * Connects to the backing store and hydrates `lastBlockByChain`.
   * Must be awaited before any store/resume operation.
   */
  public async start(): Promise<void> {
    await this.connect();
    await this.getLastBlocksProcessed();
    // Route through the shared logger (not console.log) so this message
    // honors LOG_LEVEL / LOG_DIR like every other db log line.
    this.logger.info('----------DB CONFIGURED------------');
  }

  /**
   * Block a watcher should resume from for `chain`: one past the last
   * processed block, else the chain's initial deployment block, else null.
   */
  public async getResumeBlockByChain(chain: ChainName): Promise<number | null> {
    const lastBlock = this.getLastBlockByChain(chain);
    const initialBlock = INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain];
    if (lastBlock) return Number(lastBlock) + 1;
    if (initialBlock) return Number(initialBlock);
    return null;
  }

  /**
   * Last processed block for `chain` from the in-memory cache, or null.
   * Stored block info may be a "a/b/..."-delimited key; aptos keeps its block
   * in the last token, every other chain in the first.
   */
  public getLastBlockByChain(chain: ChainName): string | null {
    const chainId = coalesceChainId(chain);
    const blockInfo = this.lastBlockByChain?.[chainId];
    if (blockInfo) {
      const tokens = String(blockInfo)?.split('/');
      return chain === 'aptos' ? tokens.at(-1)! : tokens[0];
    }
    return null;
  }

  /** Opens the connection to the backing store. */
  abstract connect(): Promise<void>;
  /** Loads the persisted last-block-by-chain map into memory. */
  abstract getLastBlocksProcessed(): Promise<void>;
  /** Persists a batch of VAA logs for `chain`. */
  abstract storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void>;
  /** Persists the latest processed block number for `chain`. */
  abstract storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void>;
}
export default BaseDB;

View File

@ -1,33 +0,0 @@
import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { getLogger, WormholeLogger } from '../utils/logger';
import { VaaLog, VaasByBlock } from './types';
export class Database {
logger: WormholeLogger;
constructor() {
this.logger = getLogger('db');
}
static filterEmptyBlocks(vaasByBlock: VaasByBlock): VaasByBlock {
const filteredVaasByBlock: VaasByBlock = {};
for (const [block, vaas] of Object.entries(vaasByBlock)) {
if (vaas.length > 0) filteredVaasByBlock[block] = [...vaas];
}
return filteredVaasByBlock;
}
async getLastBlockByChain(chain: ChainName): Promise<string | null> {
throw new Error('Not Implemented');
}
async storeVaasByBlock(chain: ChainName, vaasByBlock: VaasByBlock): Promise<void> {
throw new Error('Not Implemented');
}
async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void> {
throw new Error('Not Implemented');
}
async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void> {
throw new Error('Not Implemented');
}
}

View File

@ -0,0 +1,54 @@
import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { readFileSync, writeFileSync } from 'fs';
import { env } from '../config';
import BaseDB from './BaseDB';
import { VaaLog } from './types';
const ENCODING = 'utf8';
/**
 * File-backed database: VAA logs and the last-block-by-chain map are kept in
 * memory and flushed to the JSON files configured via env on every write.
 */
export default class JsonDB extends BaseDB {
  // In-memory copy of the on-disk db; VAA logs keyed by chain id.
  db: {} | null = null;
  dbFile: string;
  dbLastBlockFile: string;

  constructor() {
    super();
    this.db = {};
    this.lastBlockByChain = {};
    this.dbFile = env.JSON_DB_FILE;
    this.dbLastBlockFile = env.JSON_LAST_BLOCK_FILE;
  }

  /** Loads the db file, starting from an empty db when it is missing. */
  async connect(): Promise<void> {
    try {
      const rawDb = readFileSync(this.dbFile, ENCODING);
      this.db = rawDb ? JSON.parse(rawDb) : {};
      // Use the shared logger so the message honors LOG_LEVEL / LOG_DIR.
      this.logger.info('---CONNECTED TO JsonDB---');
    } catch (e) {
      this.logger.warn(`${this.dbFile} does not exists, creating new file`);
      this.db = {};
    }
  }

  /** Loads the persisted last-block map, starting empty when missing. */
  async getLastBlocksProcessed(): Promise<void> {
    try {
      const rawLastBlockByChain = readFileSync(this.dbLastBlockFile, ENCODING);
      this.lastBlockByChain = rawLastBlockByChain ? JSON.parse(rawLastBlockByChain) : {};
    } catch (e) {
      this.logger.warn(`${this.dbLastBlockFile} does not exists, creating new file`);
      this.lastBlockByChain = {};
    }
  }

  /**
   * Appends `vaaLogs` under the chain id and persists the whole db to disk.
   *
   * Fixes the previous implementation (`this.db = [{ ...this.db, ...vaaLogs }]`),
   * which spread the logs *array* into an object (producing numeric index
   * keys), wrapped the result in a one-element array, and replaced the whole
   * db on every call — dropping previously stored logs.
   */
  override async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void> {
    const chainId = coalesceChainId(chain);
    const current = (this.db ?? {}) as { [chainId: string]: VaaLog[] };
    const existing = current[chainId] ?? [];
    this.db = { ...current, [chainId]: [...existing, ...vaaLogs] };
    writeFileSync(this.dbFile, JSON.stringify(this.db, null, 2), ENCODING);
  }

  /** Records the latest processed block for `chain` and flushes to disk. */
  override async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void> {
    const chainId = coalesceChainId(chain);
    this.lastBlockByChain[chainId] = String(lastBlock);
    writeFileSync(this.dbLastBlockFile, JSON.stringify(this.lastBlockByChain, null, 2), ENCODING);
  }
}

View File

@ -1,63 +0,0 @@
import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { readFileSync, writeFileSync } from 'fs';
import { DB_LAST_BLOCK_FILE, JSON_DB_FILE } from '../consts';
import { Database } from './Database';
import { DB, LastBlockByChain, VaasByBlock } from './types';
const ENCODING = 'utf8';
export class JsonDatabase extends Database {
db: DB;
lastBlockByChain: LastBlockByChain;
dbFile: string;
dbLastBlockFile: string;
constructor() {
super();
this.db = {};
this.lastBlockByChain = {};
if (!process.env.JSON_DB_FILE) {
this.logger.info(`no db file set, using default path=${JSON_DB_FILE}`);
}
if (!process.env.DB_LAST_BLOCK_FILE) {
this.logger.info(`no db file set, using default path=${DB_LAST_BLOCK_FILE}`);
}
this.dbFile = JSON_DB_FILE;
this.dbLastBlockFile = DB_LAST_BLOCK_FILE;
try {
const rawDb = readFileSync(this.dbFile, ENCODING);
this.db = JSON.parse(rawDb);
const rawLast = readFileSync(this.dbLastBlockFile, ENCODING);
this.lastBlockByChain = JSON.parse(rawLast);
} catch (e) {
this.logger.warn('Failed to load DB, initiating a fresh one.');
this.db = {};
}
}
async getLastBlockByChain(chain: ChainName): Promise<string | null> {
const chainId = coalesceChainId(chain);
const blockInfo = this.lastBlockByChain[chainId];
if (blockInfo) {
const tokens = blockInfo.split('/');
return chain === 'aptos' ? tokens.at(-1)! : tokens[0];
}
return null;
}
async storeVaasByBlock(chain: ChainName, vaasByBlock: VaasByBlock): Promise<void> {
const chainId = coalesceChainId(chain);
const filteredVaasByBlock = Database.filterEmptyBlocks(vaasByBlock);
if (Object.keys(filteredVaasByBlock).length) {
this.db[chainId] = { ...(this.db[chainId] || {}), ...filteredVaasByBlock };
writeFileSync(this.dbFile, JSON.stringify(this.db), ENCODING);
}
// this will always overwrite the "last" block, so take caution if manually backfilling gaps
const blockKeys = Object.keys(vaasByBlock).sort(
(bk1, bk2) => Number(bk1.split('/')[0]) - Number(bk2.split('/')[0])
);
if (blockKeys.length) {
this.lastBlockByChain[chainId] = blockKeys[blockKeys.length - 1];
writeFileSync(this.dbLastBlockFile, JSON.stringify(this.lastBlockByChain), ENCODING);
}
}
}

View File

@ -1,85 +1,50 @@
import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { Database } from './Database';
import { LastBlockByChain, VaaLog, VaasByBlock } from './types';
import BaseDB from './BaseDB';
import { VaaLog } from './types';
import * as mongoDB from 'mongodb';
import { env } from '../config';
const WORMHOLE_TX_COLLECTION: string = 'wormholeTx';
const WORMHOLE_LAST_BLOCK_COLLECTION: string = 'lastBlockByChain';
export class MongoDatabase extends Database {
export default class MongoDB extends BaseDB {
private client: mongoDB.MongoClient | null = null;
private db: mongoDB.Db | null = null;
private wormholeTx: mongoDB.Collection | null = null;
private lastTxBlockByChain: mongoDB.Collection | null = null;
private lastBlockByChain: LastBlockByChain | null = null;
private wormholeTxCollection: mongoDB.Collection | null = null;
private lastTxBlockByChainCollection: mongoDB.Collection | null = null;
constructor() {
super();
}
this.lastBlockByChain = null;
async connect(): Promise<void> {
try {
this.client = new mongoDB.MongoClient(process.env.MONGODB_URI as string);
this.connectDB();
this.db = this.client.db(process.env.MONGODB_DATABASE ?? 'wormhole');
this.wormholeTx = this.db.collection(WORMHOLE_TX_COLLECTION);
this.lastTxBlockByChain = this.db.collection(WORMHOLE_LAST_BLOCK_COLLECTION);
this.client = new mongoDB.MongoClient(env.MONGODB_URI as string);
this.db = this.client.db(env.MONGODB_DATABASE ?? 'wormhole');
this.wormholeTxCollection = this.db.collection(WORMHOLE_TX_COLLECTION);
this.lastTxBlockByChainCollection = this.db.collection(WORMHOLE_LAST_BLOCK_COLLECTION);
await this.client?.connect();
console.log('---CONNECTED TO MongoDB---');
} catch (e) {
throw new Error(`(MongoDB) Error: ${e}`);
}
}
async connectDB() {
await this.client?.connect();
console.log(`Successfully connected to database: ${this.db?.databaseName} `);
}
async getLastBlockByChainFromDB() {
const latestBlocks = await this.lastTxBlockByChain?.findOne({});
async getLastBlocksProcessed(): Promise<void> {
const latestBlocks = await this.lastTxBlockByChainCollection?.findOne({});
const json = JSON.parse(JSON.stringify(latestBlocks));
this.lastBlockByChain = json;
this.lastBlockByChain = json || {};
}
async getLastBlockByChain(chain: ChainName): Promise<string | null> {
if (!this.lastBlockByChain) await this.getLastBlockByChainFromDB();
const chainId = coalesceChainId(chain);
const blockInfo: string | undefined = this.lastBlockByChain?.[chainId];
if (blockInfo) {
const tokens = String(blockInfo)?.split('/');
return chain === 'aptos' ? tokens.at(-1)! : tokens[0];
}
return null;
override async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void> {
await this.wormholeTxCollection?.insertMany(vaaLogs);
}
async storeVaasByBlock(chain: ChainName, vaasByBlock: VaasByBlock): Promise<void> {
// const chainId = coalesceChainId(chain);
// const filteredVaasByBlock = Database.filterEmptyBlocks(vaasByBlock);
// if (Object.keys(filteredVaasByBlock).length) {
// }
// this will always overwrite the "last" block, so take caution if manually backfilling gaps
// const blockKeys = Object.keys(vaasByBlock).sort(
// (bk1, bk2) => Number(bk1.split('/')[0]) - Number(bk2.split('/')[0]),
// );
// if (blockKeys.length) {
// this.lastBlockByChain[chainId] = blockKeys[blockKeys.length - 1];
// await this.wormholeTx.insertOne({
// chainId: chainId,
// block: this.lastBlockByChain[chainId],
// data: vaasByBlock,
// });
// }
}
async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void> {
await this.wormholeTx?.insertMany(vaaLogs);
}
async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void> {
override async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void> {
const chainId = coalesceChainId(chain);
await this.lastTxBlockByChain?.findOneAndUpdate(
await this.lastTxBlockByChainCollection?.findOneAndUpdate(
{},
{
$set: {
@ -92,14 +57,4 @@ export class MongoDatabase extends Database {
},
);
}
async storeVaa(chain: ChainName, txHash: string, vaa_id: string, payload: string): Promise<void> {
const chainId = coalesceChainId(chain);
this.wormholeTx?.insertOne({
chainId: chainId,
txHash: txHash,
vaa_id: vaa_id,
payload: payload,
});
}
}

View File

@ -2,10 +2,10 @@ import { CHAIN_ID_SOLANA } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { expect, test } from '@jest/globals';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '../../common';
import { JsonDatabase } from '../JsonDatabase';
import { getResumeBlockByChain, initDb, makeBlockKey } from '../utils';
import { getResumeBlockByChain, getDB, makeBlockKey } from '../utils';
test('getResumeBlockByChain', async () => {
const db = initDb() as JsonDatabase;
const db = getDB() as JsonDatabase;
const fauxBlock = '98765';
const blockKey = makeBlockKey(fauxBlock, new Date().toISOString());
db.lastBlockByChain = { [CHAIN_ID_SOLANA]: blockKey };
@ -15,7 +15,7 @@ test('getResumeBlockByChain', async () => {
// if a chain is not in the database, the initial deployment block should be returned
expect(INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN.moonbeam).toBeDefined();
expect(await getResumeBlockByChain('moonbeam')).toEqual(
Number(INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN.moonbeam)
Number(INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN.moonbeam),
);
// if neither, null should be returned
expect(INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN.unset).toBeUndefined();

View File

@ -1,7 +1,20 @@
import { ChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { Row } from '@google-cloud/bigtable';
export type VaaLog = {
id: string;
import { ChainId, ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import JsonDB from './JsonDB';
import MongoDB from './MongoDB';
export type DBOptionTypes = MongoDB | JsonDB;
export interface DBImplementation {
start(): Promise<void>;
connect(): Promise<void>;
getResumeBlockByChain(chain: ChainName): Promise<number | null>;
getLastBlocksProcessed(): Promise<void>;
getLastBlockByChain(chain: ChainName): string | null;
storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void>;
storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void>;
}
export interface VaaLog {
vaaId: string;
chainId: number;
chainName: string;
emitter: string;
@ -10,62 +23,9 @@ export type VaaLog = {
sender: string;
payload: any;
blockNumber: number;
};
export type VaasByBlock = { [blockInfo: string]: string[] };
export type DB = { [chain in ChainId]?: VaasByBlock };
indexedAt?: string | number;
updatedAt?: string | number;
createdAt?: string | number;
}
export type LastBlockByChain = { [chain in ChainId]?: string };
export type JSONArray = string;
export type BigtableMessagesRow = {
key: string;
data: {
// column family
info: {
// columns
timestamp?: { value: string; timestamp: string };
txHash?: { value: string; timestamp: string };
hasSignedVaa?: { value: number; timestamp: string };
};
};
};
export interface BigtableSignedVAAsRow {
key: string;
data: {
// column family
info: {
// columns
bytes: { value: Buffer; timestamp: string };
};
};
}
export interface BigtableVAAsByTxHashRow {
key: string;
data: {
// column family
info: {
// columns
vaaKeys: { value: JSONArray; timestamp: string };
};
};
}
export interface BigtableMessagesResultRow extends Row {
key: string;
data: {
// column family
info: {
// columns
timestamp?: [{ value: string; timestamp: string }];
txHash?: [{ value: string; timestamp: string }];
hasSignedVaa?: [{ value: number; timestamp: string }];
};
};
}
export interface BigtableSignedVAAsResultRow extends Row {
key: string;
data: {
// column family
info: {
// columns
bytes: [{ value: Buffer; timestamp: string }];
};
};
}

View File

@ -1,15 +1,19 @@
import { ChainId, ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN, MAX_UINT_64, padUint16, padUint64 } from '../common';
import { DB_SOURCE } from '../consts';
import { Database } from './Database';
import { JsonDatabase } from './JsonDatabase';
import { VaaLog, VaasByBlock } from './types';
import { MongoDatabase } from './MongoDB';
import { MAX_UINT_64, padUint16, padUint64 } from '../common';
import JsonDB from './JsonDB';
import MongoDB from './MongoDB';
import { env } from '../config';
import { DBOptionTypes } from './types';
// Bigtable Message ID format
// chain/MAX_UINT64-block/emitter/sequence
// 00002/00000000000013140651/0000000000000000000000008ea8874192c8c715e620845f833f48f39b24e222/00000000000000000000
export const getDB = (): DBOptionTypes => {
if (env.DB_SOURCE === 'mongo') return new MongoDB();
return new JsonDB();
};
export function makeMessageId(
chainId: number,
block: string,
@ -45,66 +49,3 @@ export const makeVaaKey = (
emitter: string,
seq: string,
): string => `${transactionHash}:${coalesceChainId(chain)}/${emitter}/${seq}`;
// make a bigtable row key for the `vaasByTxHash` table
export const makeVAAsByTxHashRowKey = (txHash: string, chain: number): string =>
`${txHash}/${padUint16(chain.toString())}`;
// make a bigtable row key for the `signedVAAs` table
export const makeSignedVAAsRowKey = (chain: number, emitter: string, sequence: string): string =>
`${padUint16(chain.toString())}/${emitter}/${padUint64(sequence)}`;
let database: Database = new Database();
export const initDb = (): Database => {
if (DB_SOURCE === 'mongo') {
database = new MongoDatabase();
} else {
database = new JsonDatabase();
}
return database;
};
export const getResumeBlockByChain = async (chain: ChainName): Promise<number | null> => {
const lastBlock = await database.getLastBlockByChain(chain);
const initialBlock = INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain];
return lastBlock !== null
? Number(lastBlock) + 1
: initialBlock !== undefined
? Number(initialBlock)
: null;
};
export const storeVaasByBlock = async (
chain: ChainName,
vaasByBlock: VaasByBlock,
): Promise<void> => {
return database.storeVaasByBlock(chain, vaasByBlock);
};
export const storeVaaLogs = (chain: ChainName, vaaLogs: VaaLog[]): Promise<void> => {
return database.storeVaaLogs(chain, vaaLogs);
};
export const storeLatestProcessBlock = (chain: ChainName, lastBlock: number): Promise<void> => {
return database.storeLatestProcessBlock(chain, lastBlock);
};
export function printRow(rowkey: string, rowData: { [x: string]: any }) {
console.log(`Reading data for ${rowkey}:`);
for (const columnFamily of Object.keys(rowData)) {
const columnFamilyData = rowData[columnFamily];
console.log(`Column Family ${columnFamily}`);
for (const columnQualifier of Object.keys(columnFamilyData)) {
const col = columnFamilyData[columnQualifier];
for (const cell of col) {
const labels = cell.labels.length ? ` [${cell.labels.join(',')}]` : '';
console.log(`\t${columnQualifier}: ${cell.value} @${cell.timestamp}${labels}`);
}
}
}
console.log();
}

View File

@ -1,74 +1,53 @@
import * as dotenv from 'dotenv';
import dotenv from 'dotenv';
dotenv.config();
import { ChainName, EVMChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { initDb } from './databases/utils';
import { getDB } from './databases/utils';
import { getSNS } from './services/SNS/utils';
import { makeFinalizedWatcher } from './watchers/utils';
import { InfrastructureController } from './infrastructure/infrastructure.controller';
import { createServer } from './builder/server';
import { env, evmChains } from './config';
import { DBOptionTypes } from './databases/types';
import { SNSOptionTypes } from './services/SNS/types';
class EventWatcher {
private infrastructureController = new InfrastructureController();
// EVM Chains not supported
// aurora, gnosis, neon, sepolia
const evmChains: EVMChainName[] = [
'acala',
'arbitrum',
'avalanche',
'base',
'bsc',
'celo',
'ethereum',
'fantom',
'karura',
'klaytn',
'moonbeam',
'oasis',
'optimism',
'polygon',
];
const supportedChains: ChainName[] = [
...evmChains,
'algorand',
'aptos',
'injective',
'near',
'solana',
'sui',
'terra',
'terra2',
'xpla',
];
const db = initDb();
const infrastructureController = new InfrastructureController();
const startServer = async () => {
const port = Number(process.env.PORT) || 3005;
const server = await createServer(port);
server.get('/ready', { logLevel: 'silent' }, infrastructureController.ready);
server.get('/health', { logLevel: 'silent' }, infrastructureController.health);
server.listen({ port, host: '0.0.0.0' }, (err: any, address: any) => {
if (err) {
process.exit(1);
}
console.log(`Server listening at ${address}`);
});
};
startServer();
const start = async () => {
// We wait to the database to fetch the `latestBlocks` (avoid multi requests)
// Im trying not to change too much the codebase.
await db.getLastBlockByChain('unset');
// for (const chain of supportedChains) {
for (const chain of evmChains) {
makeFinalizedWatcher(chain).watch();
constructor(private db: DBOptionTypes, private sns: SNSOptionTypes) {
this.setup();
}
};
start();
async setup() {
await this.startServer();
}
async startServer() {
const port = Number(env.PORT) || 3005;
const server = await createServer(port);
server.get('/ready', { logLevel: 'silent' }, this.infrastructureController.ready);
server.get('/health', { logLevel: 'silent' }, this.infrastructureController.health);
server.listen({ port, host: '0.0.0.0' }, (err: any, address: any) => {
if (err) process.exit(1);
console.log(`Server listening at ${address}`);
});
}
async run() {
await this.db.start();
// for (const chain of supportedChains) {
for (const chain of evmChains) {
const watcher = makeFinalizedWatcher(chain);
watcher.setDB(this.db);
watcher.setServices(this.sns);
watcher.watch();
}
}
}
// Init and run the event watcher
const db: DBOptionTypes = getDB();
const sns: SNSOptionTypes = getSNS();
const eventWatcher = new EventWatcher(db, sns);
eventWatcher.run();

View File

@ -7,14 +7,17 @@ import {
PublishBatchCommandInput,
PublishBatchRequestEntry,
} from '@aws-sdk/client-sns';
import { SNSConfig, SNSImplementation, SNSInput, SNSPublishMessageOutput } from '../types';
import { AwsSNSConfig, SNSInput, SNSPublishMessageOutput } from '../types';
import BaseSNS from '../BaseSNS';
class AwsSNS implements SNSImplementation {
private client: SNSClient | null = null;
private subject: string | null = null;
private topicArn: string | null = null;
class AwsSNS extends BaseSNS {
private client: SNSClient;
private subject: string;
private topicArn: string;
constructor(private config: AwsSNSConfig) {
super();
constructor(private config: SNSConfig) {
const { region, credentials, subject, topicArn } = this.config;
this.subject = subject;
@ -25,7 +28,7 @@ class AwsSNS implements SNSImplementation {
});
}
async publishMessage({ subject, message }: SNSInput): Promise<SNSPublishMessageOutput> {
override async publishMessage({ subject, message }: SNSInput): Promise<SNSPublishMessageOutput> {
const input: PublishCommandInput = {
TopicArn: this.topicArn!,
Subject: subject ?? this.subject!,
@ -48,7 +51,7 @@ class AwsSNS implements SNSImplementation {
};
}
async publishMessages(messages: SNSInput[]): Promise<SNSPublishMessageOutput> {
override async publishMessages(messages: SNSInput[]): Promise<SNSPublishMessageOutput> {
const CHUNK_SIZE = 10;
const batches: PublishBatchCommandInput[] = [];
const inputs: PublishBatchRequestEntry[] = messages.map(({ subject, message }) => ({

View File

@ -0,0 +1,8 @@
import { SNSImplementation, SNSInput, SNSPublishMessageOutput } from './types';
/**
 * Base contract for SNS publisher backends (e.g. AwsSNS).
 * Concrete subclasses implement the actual delivery; callers only depend on
 * this interface via SNSImplementation.
 */
abstract class BaseSNS implements SNSImplementation {
  /** Publishes a single message and resolves with the publish result. */
  abstract publishMessage(message: SNSInput): Promise<SNSPublishMessageOutput>;
  /** Publishes a batch of messages and resolves with the aggregate result. */
  abstract publishMessages(messages: SNSInput[]): Promise<SNSPublishMessageOutput>;
}
export default BaseSNS;

View File

@ -1,9 +1,12 @@
import AwsSNS from './AwsSNS';
export type SNSOptionTypes = AwsSNS | null;
export interface SNSImplementation {
publishMessage(message: SNSInput): Promise<SNSPublishMessageOutput>;
publishMessages(messages: SNSInput[]): Promise<SNSPublishMessageOutput>;
}
export interface SNSConfig {
export interface AwsSNSConfig {
region: string;
topicArn: string;
subject: string;

View File

@ -0,0 +1,18 @@
import { env } from '../../config';
import AwsSNS from './AwsSNS';
import { AwsSNSConfig, SNSOptionTypes } from './types';
const AwsConfig: AwsSNSConfig = {
region: env.AWS_SNS_REGION as string,
subject: env.AWS_SNS_SUBJECT as string,
topicArn: env.AWS_SNS_TOPIC_ARN as string,
credentials: {
accessKeyId: env.AWS_ACCESS_KEY_ID as string,
secretAccessKey: env.AWS_SECRET_ACCESS_KEY as string,
},
};
export const getSNS = (): SNSOptionTypes => {
if (env.SNS_SOURCE === 'aws') return new AwsSNS(AwsConfig);
return null;
};

View File

@ -1,3 +1,5 @@
import { env } from '../config';
let loggingEnv: LoggingEnvironment | undefined = undefined;
export type LoggingEnvironment = {
@ -10,8 +12,8 @@ export const getEnvironment = () => {
return loggingEnv;
} else {
loggingEnv = {
logLevel: process.env.LOG_LEVEL || 'info',
logDir: process.env.LOG_DIR,
logLevel: env.LOG_LEVEL,
logDir: env.LOG_DIR,
};
return loggingEnv;
}

View File

@ -1,17 +1,17 @@
import algosdk from 'algosdk';
import { Watcher } from './Watcher';
import BaseWatcher from './BaseWatcher';
import { ALGORAND_INFO } from '../consts';
import { VaasByBlock } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { VaaLog } from '../databases/types';
type Message = {
blockKey: string;
vaaKey: string;
};
export class AlgorandWatcher extends Watcher {
export class AlgorandWatcher extends BaseWatcher {
// Arbitrarily large since the code here is capable of pulling all logs from all via indexer pagination
maximumBatchSize: number = 100000;
override maximumBatchSize: number = 100_000;
algodClient: algosdk.Algodv2;
indexerClient: algosdk.Indexer;
@ -26,16 +26,16 @@ export class AlgorandWatcher extends Watcher {
this.algodClient = new algosdk.Algodv2(
ALGORAND_INFO.algodToken,
ALGORAND_INFO.algodServer,
ALGORAND_INFO.algodPort
ALGORAND_INFO.algodPort,
);
this.indexerClient = new algosdk.Indexer(
ALGORAND_INFO.token,
ALGORAND_INFO.server,
ALGORAND_INFO.port
ALGORAND_INFO.port,
);
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
this.logger.info(`fetching final block for ${this.chain}`);
let status = await this.algodClient.status().do();
@ -78,13 +78,13 @@ export class AlgorandWatcher extends Watcher {
messages.push({
blockKey: makeBlockKey(
transaction['confirmed-round'].toString(),
new Date(transaction['round-time'] * 1000).toISOString()
new Date(transaction['round-time'] * 1000).toISOString(),
),
vaaKey: makeVaaKey(
parentId || transaction.id,
this.chain,
Buffer.from(algosdk.decodeAddress(transaction.sender).publicKey).toString('hex'),
BigInt(`0x${Buffer.from(transaction.logs[0], 'base64').toString('hex')}`).toString()
BigInt(`0x${Buffer.from(transaction.logs[0], 'base64').toString('hex')}`).toString(),
),
});
}
@ -96,32 +96,36 @@ export class AlgorandWatcher extends Watcher {
return messages;
}
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
const txIds = await this.getApplicationLogTransactionIds(fromBlock, toBlock);
const transactions = [];
for (const txId of txIds) {
const response = await this.indexerClient.searchForTransactions().txid(txId).do();
if (response?.transactions?.[0]) {
transactions.push(response.transactions[0]);
}
}
let messages: Message[] = [];
for (const transaction of transactions) {
messages = [...messages, ...this.processTransaction(transaction)];
}
const vaasByBlock = messages.reduce((vaasByBlock, message) => {
if (!vaasByBlock[message.blockKey]) {
vaasByBlock[message.blockKey] = [];
}
vaasByBlock[message.blockKey].push(message.vaaKey);
return vaasByBlock;
}, {} as VaasByBlock);
const toBlockInfo = await this.indexerClient.lookupBlock(toBlock).do();
const toBlockTimestamp = new Date(toBlockInfo.timestamp * 1000).toISOString();
const toBlockKey = makeBlockKey(toBlock.toString(), toBlockTimestamp);
if (!vaasByBlock[toBlockKey]) {
vaasByBlock[toBlockKey] = [];
}
return vaasByBlock;
// async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
// const txIds = await this.getApplicationLogTransactionIds(fromBlock, toBlock);
// const transactions = [];
// for (const txId of txIds) {
// const response = await this.indexerClient.searchForTransactions().txid(txId).do();
// if (response?.transactions?.[0]) {
// transactions.push(response.transactions[0]);
// }
// }
// let messages: Message[] = [];
// for (const transaction of transactions) {
// messages = [...messages, ...this.processTransaction(transaction)];
// }
// const vaasByBlock = messages.reduce((vaasByBlock, message) => {
// if (!vaasByBlock[message.blockKey]) {
// vaasByBlock[message.blockKey] = [];
// }
// vaasByBlock[message.blockKey].push(message.vaaKey);
// return vaasByBlock;
// }, {} as VaasByBlock);
// const toBlockInfo = await this.indexerClient.lookupBlock(toBlock).do();
// const toBlockTimestamp = new Date(toBlockInfo.timestamp * 1000).toISOString();
// const toBlockKey = makeBlockKey(toBlock.toString(), toBlockTimestamp);
// if (!vaasByBlock[toBlockKey]) {
// vaasByBlock[toBlockKey] = [];
// }
// return vaasByBlock;
// }
/**
 * VAA-log extraction is not implemented for Algorand — always throws.
 * Required override of the abstract BaseWatcher.getVaaLogs contract.
 */
override getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
throw new Error('Not Implemented');
}
}

View File

@ -3,10 +3,10 @@ import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN } from '../common';
import { AptosClient } from 'aptos';
import { z } from 'zod';
import { RPCS_BY_CHAIN } from '../consts';
import { VaasByBlock } from '../databases/types';
import { makeVaaKey } from '../databases/utils';
import { AptosEvent } from '../types/aptos';
import { Watcher } from './Watcher';
import BaseWatcher from './BaseWatcher';
import { VaaLog } from '../databases/types';
const APTOS_CORE_BRIDGE_ADDRESS = CONTRACTS.MAINNET.aptos.core;
const APTOS_EVENT_HANDLE = `${APTOS_CORE_BRIDGE_ADDRESS}::state::WormholeMessageHandle`;
@ -16,54 +16,58 @@ const APTOS_FIELD_NAME = 'event';
* NOTE: The Aptos watcher differs from other watchers in that it uses the event sequence number to
* fetch Wormhole messages and therefore also stores sequence numbers instead of block numbers.
*/
export class AptosWatcher extends Watcher {
export class AptosWatcher extends BaseWatcher {
client: AptosClient;
maximumBatchSize: number = 25;
override maximumBatchSize: number = 25;
constructor() {
super('aptos');
this.client = new AptosClient(RPCS_BY_CHAIN[this.chain]!);
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
return Number(
(
await this.client.getEventsByEventHandle(
APTOS_CORE_BRIDGE_ADDRESS,
APTOS_EVENT_HANDLE,
APTOS_FIELD_NAME,
{ limit: 1 }
{ limit: 1 },
)
)[0].sequence_number
)[0].sequence_number,
);
}
async getMessagesForBlocks(fromSequence: number, toSequence: number): Promise<VaasByBlock> {
const limit = toSequence - fromSequence + 1;
const events: AptosEvent[] = (await this.client.getEventsByEventHandle(
APTOS_CORE_BRIDGE_ADDRESS,
APTOS_EVENT_HANDLE,
APTOS_FIELD_NAME,
{ start: fromSequence, limit }
)) as AptosEvent[];
const vaasByBlock: VaasByBlock = {};
await Promise.all(
events.map(async ({ data, sequence_number, version }) => {
const [block, transaction] = await Promise.all([
this.client.getBlockByVersion(Number(version)),
this.client.getTransactionByVersion(Number(version)),
]);
const timestamp = new Date(Number(block.block_timestamp) / 1000).toISOString();
const blockKey = [block.block_height, timestamp, sequence_number].join('/'); // use custom block key for now so we can include sequence number
const emitter = data.sender.padStart(64, '0');
const vaaKey = makeVaaKey(transaction.hash, this.chain, emitter, data.sequence);
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] ?? []), vaaKey];
})
);
return vaasByBlock;
// async getMessagesForBlocks(fromSequence: number, toSequence: number): Promise<VaasByBlock> {
// const limit = toSequence - fromSequence + 1;
// const events: AptosEvent[] = (await this.client.getEventsByEventHandle(
// APTOS_CORE_BRIDGE_ADDRESS,
// APTOS_EVENT_HANDLE,
// APTOS_FIELD_NAME,
// { start: fromSequence, limit }
// )) as AptosEvent[];
// const vaasByBlock: VaasByBlock = {};
// await Promise.all(
// events.map(async ({ data, sequence_number, version }) => {
// const [block, transaction] = await Promise.all([
// this.client.getBlockByVersion(Number(version)),
// this.client.getTransactionByVersion(Number(version)),
// ]);
// const timestamp = new Date(Number(block.block_timestamp) / 1000).toISOString();
// const blockKey = [block.block_height, timestamp, sequence_number].join('/'); // use custom block key for now so we can include sequence number
// const emitter = data.sender.padStart(64, '0');
// const vaaKey = makeVaaKey(transaction.hash, this.chain, emitter, data.sequence);
// vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] ?? []), vaaKey];
// })
// );
// return vaasByBlock;
// }
/**
 * VAA-log extraction is not implemented for Aptos — always throws.
 * Required override of the abstract BaseWatcher.getVaaLogs contract.
 */
override getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
throw new Error('Not Implemented');
}
isValidBlockKey(key: string) {
override isValidBlockKey(key: string) {
try {
const [block, timestamp, sequence] = key.split('/');
const initialSequence = z
@ -80,7 +84,7 @@ export class AptosWatcher extends Watcher {
}
}
isValidVaaKey(key: string) {
override isValidVaaKey(key: string) {
try {
const [txHash, vaaKey] = key.split(':');
const [_, emitter, sequence] = vaaKey.split('/');

View File

@ -23,7 +23,7 @@ export class ArbitrumWatcher extends EVMWatcher {
this.maximumBatchSize = 25;
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
if (!this.rpc) {
throw new Error(`${this.chain} RPC is not defined!`);
}
@ -40,18 +40,21 @@ export class ArbitrumWatcher extends EVMWatcher {
params: ['latest', false],
},
],
AXIOS_CONFIG_JSON
AXIOS_CONFIG_JSON,
)
)?.data?.[0]?.result;
if (!l1Result || !l1Result.l1BlockNumber || !l1Result.number) {
throw new Error(
`Unable to parse result of ArbitrumWatcher::eth_getBlockByNumber for latest on ${this.rpc}`
`Unable to parse result of ArbitrumWatcher::eth_getBlockByNumber for latest on ${this.rpc}`,
);
}
const associatedL1: number = parseInt(l1Result.l1BlockNumber, 16);
const l2BlkNum: number = parseInt(l1Result.number, 16);
this.logger.debug(
'getFinalizedBlockNumber() checking map L1Block: ' + associatedL1 + ' => L2Block: ' + l2BlkNum
'getFinalizedBlockNumber() checking map L1Block: ' +
associatedL1 +
' => L2Block: ' +
l2BlkNum,
);
// Only update the map, if the L2 block number is newer

View File

@ -4,7 +4,8 @@ export class BSCWatcher extends EVMWatcher {
constructor() {
super('bsc');
}
async getFinalizedBlockNumber(): Promise<number> {
/**
 * Finality for BSC: take the parent watcher's latest block and step back
 * 15 blocks (presumably a reorg-safety margin — TODO confirm), clamped
 * at 0 so early chain heights never go negative.
 */
override async getFinalizedBlockNumber(): Promise<number> {
const latestBlock = await super.getFinalizedBlockNumber();
return Math.max(latestBlock - 15, 0);
}

View File

@ -2,52 +2,37 @@ import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN, sleep } from '../common';
import { z } from 'zod';
import { TIMEOUT } from '../consts';
import { VaaLog, VaasByBlock } from '../databases/types';
import {
getResumeBlockByChain,
storeVaaLogs,
storeVaasByBlock,
storeLatestProcessBlock,
} from '../databases/utils';
import { DBOptionTypes, VaaLog } from '../databases/types';
import { getLogger, WormholeLogger } from '../utils/logger';
import AwsSNS from '../services/SNS/AwsSNS';
import { SNSConfig, SNSInput } from '../services/SNS/types';
import { SNSInput, SNSOptionTypes } from '../services/SNS/types';
import { WatcherImplementation } from './types';
const config: SNSConfig = {
region: process.env.AWS_SNS_REGION as string,
subject: process.env.AWS_SNS_SUBJECT as string,
topicArn: process.env.AWS_TOPIC_ARN as string,
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID as string,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY as string,
},
};
export class Watcher {
chain: ChainName;
logger: WormholeLogger;
abstract class BaseWatcher implements WatcherImplementation {
public logger: WormholeLogger;
maximumBatchSize: number = 100;
SNSClient: AwsSNS;
sns?: SNSOptionTypes;
db?: DBOptionTypes;
constructor(chain: ChainName) {
this.chain = chain;
constructor(public chain: ChainName) {
this.logger = getLogger(chain);
this.SNSClient = new AwsSNS(config);
}
async getFinalizedBlockNumber(): Promise<number> {
throw new Error('Not Implemented');
setDB(db: DBOptionTypes) {
this.db = db;
}
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
throw new Error('Not Implemented');
setServices(sns: SNSOptionTypes) {
this.sns = sns;
}
async getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
throw new Error('Not Implemented');
abstract getFinalizedBlockNumber(): Promise<number>;
abstract getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]>;
isValidVaaKey(key: string): boolean {
throw new Error('Method not implemented.');
}
isValidBlockKey(key: string) {
isValidBlockKey(key: string): boolean {
try {
const [block, timestamp] = key.split('/');
const initialBlock = z
@ -63,13 +48,11 @@ export class Watcher {
}
}
isValidVaaKey(key: string): boolean {
throw new Error('Not Implemented');
}
async watch(): Promise<void> {
let toBlock: number | null = null;
let fromBlock: number | null = await getResumeBlockByChain(this.chain);
let fromBlock: number | null = this.db
? await this.db?.getResumeBlockByChain(this.chain)
: null;
let retry = 0;
while (true) {
@ -77,23 +60,27 @@ export class Watcher {
if (fromBlock !== null && toBlock !== null && fromBlock <= toBlock) {
// fetch logs for the block range, inclusive of toBlock
toBlock = Math.min(fromBlock + this.maximumBatchSize - 1, toBlock);
this.logger.info(`fetching messages from ${fromBlock} to ${toBlock}`);
// const vaasByBlock = await this.getMessagesForBlocks(fromBlock, toBlock);
// await storeVaasByBlock(this.chain, vaasByBlock);
// Here we get all the vaa logs from LOG_MESSAGE_PUBLISHED_TOPIC
// Then store the latest processed block by Chain Id
try {
this.logger.info(`fetching messages from ${fromBlock} to ${toBlock}`);
// Here we get all the vaa logs from LOG_MESSAGE_PUBLISHED_TOPIC
const vaaLogs = await this.getVaaLogs(fromBlock, toBlock);
if (vaaLogs?.length > 0) {
await storeVaaLogs(this.chain, vaaLogs);
// Then store the vaa logs processed in db
// TODO: handle store logs failure
await this.db?.storeVaaLogs(this.chain, vaaLogs);
// Then publish the vaa logs processed in SNS
const messages: SNSInput[] = vaaLogs.map((log) => ({
message: JSON.stringify({ ...log }),
}));
this.SNSClient.publishMessages(messages);
// TODO: handle publish failure
this.sns?.publishMessages(messages);
}
await storeLatestProcessBlock(this.chain, toBlock);
// Then store the latest processed block by Chain Id
// TODO: handle store last blocks failure
await this.db?.storeLatestProcessBlock(this.chain, toBlock);
} catch (e) {
this.logger.error(e);
}
@ -119,10 +106,12 @@ export class Watcher {
} catch (e) {
retry++;
this.logger.error(e);
const expoBacko = TIMEOUT * 2 ** retry;
this.logger.warn(`backing off for ${expoBacko}ms`);
await sleep(expoBacko);
const backOffTimeoutMS = TIMEOUT * 2 ** retry;
this.logger.warn(`backing off for ${backOffTimeoutMS}ms`);
await sleep(backOffTimeoutMS);
}
}
}
}
export default BaseWatcher;

View File

@ -1,13 +1,13 @@
import { CONTRACTS, CosmWasmChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import axios from 'axios';
import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts';
import { VaasByBlock } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { Watcher } from './Watcher';
import BaseWatcher from './BaseWatcher';
import { SHA256 } from 'jscrypto/SHA256';
import { Base64 } from 'jscrypto/Base64';
import { VaaLog } from '../databases/types';
export class CosmwasmWatcher extends Watcher {
export class CosmwasmWatcher extends BaseWatcher {
latestBlockTag: string;
getBlockTag: string;
hashTag: string;
@ -38,7 +38,7 @@ export class CosmwasmWatcher extends Watcher {
return SHA256.hash(Base64.parse(data)).toString().toUpperCase();
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
const result = (await axios.get(`${this.rpc}/${this.latestBlockTag}`)).data;
if (result && result.block.header.height) {
let blockHeight: number = parseInt(result.block.header.height);
@ -51,105 +51,109 @@ export class CosmwasmWatcher extends Watcher {
throw new Error(`Unable to parse result of ${this.latestBlockTag} on ${this.rpc}`);
}
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
const address = CONTRACTS.MAINNET[this.chain].core;
if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
}
this.logger.debug(`core contract for ${this.chain} is ${address}`);
let vaasByBlock: VaasByBlock = {};
this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
// async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
// const address = CONTRACTS.MAINNET[this.chain].core;
// if (!address) {
// throw new Error(`Core contract not defined for ${this.chain}`);
// }
// this.logger.debug(`core contract for ${this.chain} is ${address}`);
// let vaasByBlock: VaasByBlock = {};
// this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
// For each block number, call {RPC}/{getBlockTag}/{block_number}
// Foreach block.data.txs[] do hexToHash() to get the txHash
// Then call {RPC}/{hashTag}/{hash} to get the logs/events
// Walk the logs/events
// // For each block number, call {RPC}/{getBlockTag}/{block_number}
// // Foreach block.data.txs[] do hexToHash() to get the txHash
// // Then call {RPC}/{hashTag}/{hash} to get the logs/events
// // Walk the logs/events
for (let blockNumber = fromBlock; blockNumber <= toBlock; blockNumber++) {
this.logger.debug('Getting block number ' + blockNumber);
const blockResult: CosmwasmBlockResult = (
await axios.get(`${this.rpc}/${this.getBlockTag}${blockNumber}`)
).data;
if (!blockResult || !blockResult.block.data) {
throw new Error('bad result for block ${blockNumber}');
}
const blockKey = makeBlockKey(
blockNumber.toString(),
new Date(blockResult.block.header.time).toISOString()
);
vaasByBlock[blockKey] = [];
let vaaKey: string = '';
let numTxs: number = 0;
if (blockResult.block.data.txs) {
numTxs = blockResult.block.data.txs.length;
}
for (let i = 0; i < numTxs; i++) {
// The following check is not needed because of the check for numTxs.
// But typescript wanted it anyway.
if (!blockResult.block.data.txs) {
continue;
}
let hash: string = this.hexToHash(blockResult.block.data.txs[i]);
this.logger.debug('blockNumber = ' + blockNumber + ', txHash[' + i + '] = ' + hash);
// console.log('Attempting to get hash', `${this.rpc}/${this.hashTag}${hash}`);
try {
const hashResult: CosmwasmHashResult = (
await axios.get(`${this.rpc}/${this.hashTag}${hash}`, AXIOS_CONFIG_JSON)
).data;
if (hashResult && hashResult.tx_response.events) {
const numEvents = hashResult.tx_response.events.length;
for (let j = 0; j < numEvents; j++) {
let type: string = hashResult.tx_response.events[j].type;
if (type === 'wasm') {
if (hashResult.tx_response.events[j].attributes) {
let attrs = hashResult.tx_response.events[j].attributes;
let emitter: string = '';
let sequence: string = '';
let coreContract: boolean = false;
// only care about _contract_address, message.sender and message.sequence
const numAttrs = attrs.length;
for (let k = 0; k < numAttrs; k++) {
const key = Buffer.from(attrs[k].key, 'base64').toString().toLowerCase();
this.logger.debug('Encoded Key = ' + attrs[k].key + ', decoded = ' + key);
if (key === 'message.sender') {
emitter = Buffer.from(attrs[k].value, 'base64').toString();
} else if (key === 'message.sequence') {
sequence = Buffer.from(attrs[k].value, 'base64').toString();
} else if (key === '_contract_address' || key === 'contract_address') {
let addr = Buffer.from(attrs[k].value, 'base64').toString();
if (addr === address) {
coreContract = true;
}
}
}
if (coreContract && emitter !== '' && sequence !== '') {
vaaKey = makeVaaKey(hash, this.chain, emitter, sequence);
this.logger.debug('blockKey: ' + blockKey);
this.logger.debug('Making vaaKey: ' + vaaKey);
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
}
}
}
} else {
this.logger.error('There were no hashResults');
}
} catch (e: any) {
// console.error(e);
if (
e?.response?.status === 500 &&
e?.response?.data?.code === 2 &&
e?.response?.data?.message.startsWith('json: error calling MarshalJSON')
) {
// Just skip this one...
} else {
// Rethrow the error because we only want to catch the above error
throw e;
}
}
}
}
return vaasByBlock;
// for (let blockNumber = fromBlock; blockNumber <= toBlock; blockNumber++) {
// this.logger.debug('Getting block number ' + blockNumber);
// const blockResult: CosmwasmBlockResult = (
// await axios.get(`${this.rpc}/${this.getBlockTag}${blockNumber}`)
// ).data;
// if (!blockResult || !blockResult.block.data) {
// throw new Error('bad result for block ${blockNumber}');
// }
// const blockKey = makeBlockKey(
// blockNumber.toString(),
// new Date(blockResult.block.header.time).toISOString()
// );
// vaasByBlock[blockKey] = [];
// let vaaKey: string = '';
// let numTxs: number = 0;
// if (blockResult.block.data.txs) {
// numTxs = blockResult.block.data.txs.length;
// }
// for (let i = 0; i < numTxs; i++) {
// // The following check is not needed because of the check for numTxs.
// // But typescript wanted it anyway.
// if (!blockResult.block.data.txs) {
// continue;
// }
// let hash: string = this.hexToHash(blockResult.block.data.txs[i]);
// this.logger.debug('blockNumber = ' + blockNumber + ', txHash[' + i + '] = ' + hash);
// // console.log('Attempting to get hash', `${this.rpc}/${this.hashTag}${hash}`);
// try {
// const hashResult: CosmwasmHashResult = (
// await axios.get(`${this.rpc}/${this.hashTag}${hash}`, AXIOS_CONFIG_JSON)
// ).data;
// if (hashResult && hashResult.tx_response.events) {
// const numEvents = hashResult.tx_response.events.length;
// for (let j = 0; j < numEvents; j++) {
// let type: string = hashResult.tx_response.events[j].type;
// if (type === 'wasm') {
// if (hashResult.tx_response.events[j].attributes) {
// let attrs = hashResult.tx_response.events[j].attributes;
// let emitter: string = '';
// let sequence: string = '';
// let coreContract: boolean = false;
// // only care about _contract_address, message.sender and message.sequence
// const numAttrs = attrs.length;
// for (let k = 0; k < numAttrs; k++) {
// const key = Buffer.from(attrs[k].key, 'base64').toString().toLowerCase();
// this.logger.debug('Encoded Key = ' + attrs[k].key + ', decoded = ' + key);
// if (key === 'message.sender') {
// emitter = Buffer.from(attrs[k].value, 'base64').toString();
// } else if (key === 'message.sequence') {
// sequence = Buffer.from(attrs[k].value, 'base64').toString();
// } else if (key === '_contract_address' || key === 'contract_address') {
// let addr = Buffer.from(attrs[k].value, 'base64').toString();
// if (addr === address) {
// coreContract = true;
// }
// }
// }
// if (coreContract && emitter !== '' && sequence !== '') {
// vaaKey = makeVaaKey(hash, this.chain, emitter, sequence);
// this.logger.debug('blockKey: ' + blockKey);
// this.logger.debug('Making vaaKey: ' + vaaKey);
// vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
// }
// }
// }
// }
// } else {
// this.logger.error('There were no hashResults');
// }
// } catch (e: any) {
// // console.error(e);
// if (
// e?.response?.status === 500 &&
// e?.response?.data?.code === 2 &&
// e?.response?.data?.message.startsWith('json: error calling MarshalJSON')
// ) {
// // Just skip this one...
// } else {
// // Rethrow the error because we only want to catch the above error
// throw e;
// }
// }
// }
// }
// return vaasByBlock;
// }
/**
 * VAA-log extraction is not implemented for CosmWasm chains — always throws.
 * Required override of the abstract BaseWatcher.getVaaLogs contract.
 */
override getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
throw new Error('Not Implemented');
}
}
@ -230,7 +234,7 @@ type CosmwasmHashResult = {
validator_src_address: string;
validator_dst_address: string;
amount: { denom: string; amount: string };
}
},
];
memo: '';
timeout_height: '0';
@ -246,7 +250,7 @@ type CosmwasmHashResult = {
};
mode_info: { single: { mode: string } };
sequence: string;
}
},
];
fee: {
amount: [{ denom: string; amount: string }];
@ -269,6 +273,6 @@ type EventsType = {
key: string;
value: string;
index: boolean;
}
},
];
};

View File

@ -8,9 +8,8 @@ import { Log } from '@ethersproject/abstract-provider';
import axios from 'axios';
import { BigNumber } from 'ethers';
import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts';
import { VaaLog, VaasByBlock } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { Watcher } from './Watcher';
import { VaaLog } from '../databases/types';
import BaseWatcher from './BaseWatcher';
// This is the hash for topic[0] of the core contract event LogMessagePublished
// https://github.com/wormhole-foundation/wormhole/blob/main/ethereum/contracts/Implementation.sol#L12
@ -29,7 +28,7 @@ export type ErrorBlock = {
message: string; //'Error: No response received from RPC endpoint in 60s'
};
export class EVMWatcher extends Watcher {
export class EVMWatcher extends BaseWatcher {
finalizedBlockTag: BlockTag;
lastTimestamp: number;
latestFinalizedBlockNumber: number;
@ -39,7 +38,7 @@ export class EVMWatcher extends Watcher {
this.lastTimestamp = 0;
this.latestFinalizedBlockNumber = 0;
this.finalizedBlockTag = finalizedBlockTag;
if (chain === 'acala' || chain === 'karura') {
if (['acala', 'karura'].includes(chain)) {
this.maximumBatchSize = 50;
}
}
@ -201,45 +200,14 @@ export class EVMWatcher extends Watcher {
throw new Error(`Unable to parse result of eth_getLogs for ${fromBlock}-${toBlock} on ${rpc}`);
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
this.logger.info(`fetching block ${this.finalizedBlockTag}`);
const block: Block = await this.getBlock(this.finalizedBlockTag);
this.latestFinalizedBlockNumber = block.number;
return block.number;
}
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
const address = CONTRACTS.MAINNET[this.chain].core;
if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
}
const logs = await this.getLogs(fromBlock, toBlock, address, [LOG_MESSAGE_PUBLISHED_TOPIC]);
const timestampsByBlock: { [block: number]: string } = {};
// fetch timestamps for each block
const vaasByBlock: VaasByBlock = {};
this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
const blocks = await this.getBlocks(fromBlock, toBlock);
for (const block of blocks) {
const timestamp = new Date(block.timestamp * 1000).toISOString();
timestampsByBlock[block.number] = timestamp;
vaasByBlock[makeBlockKey(block.number.toString(), timestamp)] = [];
}
this.logger.info(`processing ${logs.length} logs`);
for (const log of logs) {
const blockNumber = log.blockNumber;
const emitter = log.topics[1].slice(2);
const {
args: { sequence, sender, payload },
} = wormholeInterface.parseLog(log);
const vaaKey = makeVaaKey(log.transactionHash, this.chain, emitter, sequence.toString());
const blockKey = makeBlockKey(blockNumber.toString(), timestampsByBlock[blockNumber]);
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
return vaasByBlock;
}
async getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
override async getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
const vaaLogs: VaaLog[] = [];
const address = CONTRACTS.MAINNET[this.chain].core;
@ -247,7 +215,6 @@ export class EVMWatcher extends Watcher {
throw new Error(`Core contract not defined for ${this.chain}`);
}
// this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
const logs = await this.getLogs(fromBlock, toBlock, address, [LOG_MESSAGE_PUBLISHED_TOPIC]);
this.logger.info(`processing ${logs.length} logs`);
@ -264,7 +231,7 @@ export class EVMWatcher extends Watcher {
const vaaId = `${chainId}/${emitter}/${sequence.toString()}`;
const vaaLog: VaaLog = {
id: vaaId,
vaaId,
chainName,
chainId,
emitter,
@ -273,6 +240,9 @@ export class EVMWatcher extends Watcher {
sender,
payload,
blockNumber,
indexedAt: new Date().getTime(),
updatedAt: new Date().getTime(),
createdAt: new Date().getTime(),
};
vaaLogs.push(vaaLog);

View File

@ -1,14 +1,14 @@
import { CONTRACTS } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import axios from 'axios';
import { RPCS_BY_CHAIN } from '../consts';
import { VaasByBlock } from '../databases/types';
import { VaaLog } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { EventObjectsTypes, RawLogEvents } from './TerraExplorerWatcher';
import { Watcher } from './Watcher';
import BaseWatcher from './BaseWatcher';
export class InjectiveExplorerWatcher extends Watcher {
export class InjectiveExplorerWatcher extends BaseWatcher {
// Arbitrarily large since the code here is capable of pulling all logs from all via indexer pagination
maximumBatchSize: number = 1_000_000;
override maximumBatchSize: number = 1_000_000;
latestBlockTag: string;
getBlockTag: string;
@ -30,7 +30,7 @@ export class InjectiveExplorerWatcher extends Watcher {
this.contractTag = 'api/explorer/v1/contractTxs/';
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
const result: ExplorerBlocks = (await axios.get(`${this.rpc}/${this.latestBlockTag}`)).data;
if (result && result.paging.total) {
let blockHeight: number = result.paging.total;
@ -47,129 +47,133 @@ export class InjectiveExplorerWatcher extends Watcher {
// should be core, but the explorer doesn't support it yet
// use "to": as the pagination key
// compare block height ("block_number":) with what is passed in.
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
const coreAddress = CONTRACTS.MAINNET[this.chain].core;
const address = CONTRACTS.MAINNET[this.chain].token_bridge;
if (!address) {
throw new Error(`Token Bridge contract not defined for ${this.chain}`);
}
this.logger.debug(`Token Bridge contract for ${this.chain} is ${address}`);
let vaasByBlock: VaasByBlock = {};
this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
// async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
// const coreAddress = CONTRACTS.MAINNET[this.chain].core;
// const address = CONTRACTS.MAINNET[this.chain].token_bridge;
// if (!address) {
// throw new Error(`Token Bridge contract not defined for ${this.chain}`);
// }
// this.logger.debug(`Token Bridge contract for ${this.chain} is ${address}`);
// let vaasByBlock: VaasByBlock = {};
// this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
const limit: number = 50;
let done: boolean = false;
let skip: number = 0;
let lastBlockInserted: number = 0;
while (!done) {
// This URL gets the paginated list of transactions for the token bridge contract
let url: string = `${this.rpc}/${this.contractTag}${address}?skip=${skip}&limit=${limit}`;
this.logger.debug(`Query string = ${url}`);
const bulkTxnResult = (
await axios.get<ContractTxnResult>(url, {
headers: {
'User-Agent': 'Mozilla/5.0',
},
})
).data;
if (!bulkTxnResult) {
throw new Error('bad bulkTxnResult');
}
skip = bulkTxnResult.paging.to;
const bulkTxns: ContractTxnData[] = bulkTxnResult.data;
if (!bulkTxns) {
throw new Error('No transactions');
}
for (let i: number = 0; i < bulkTxns.length; ++i) {
// Walk the transactions
const txn: ContractTxnData = bulkTxns[i];
const height: number = txn.block_number;
if (height >= fromBlock && height <= toBlock) {
// We only care about the transactions in the given block range
this.logger.debug(`Found one: ${fromBlock}, ${height}, ${toBlock}`);
const blockKey = makeBlockKey(
txn.block_number.toString(),
new Date(txn.block_unix_timestamp).toISOString()
);
vaasByBlock[blockKey] = [];
lastBlockInserted = height;
this.logger.debug(`lastBlockInserted = ${lastBlockInserted}`);
let vaaKey: string = '';
// Each txn has an array of raw_logs
if (txn.logs) {
const rawLogs: RawLogEvents[] = txn.logs;
for (let j: number = 0; j < rawLogs.length; ++j) {
const rawLog: RawLogEvents = rawLogs[j];
const events: EventObjectsTypes[] = rawLog.events;
if (!events) {
this.logger.debug(
`No events in rawLog${j} for block ${height}, hash = ${txn.hash}`
);
continue;
}
for (let k: number = 0; k < events.length; k++) {
const event: EventObjectsTypes = events[k];
if (event.type === 'wasm') {
if (event.attributes) {
let attrs = event.attributes;
let emitter: string = '';
let sequence: string = '';
let coreContract: boolean = false;
// only care about _contract_address, message.sender and message.sequence
const numAttrs = attrs.length;
for (let l = 0; l < numAttrs; l++) {
const key = attrs[l].key;
if (key === 'message.sender') {
emitter = attrs[l].value;
} else if (key === 'message.sequence') {
sequence = attrs[l].value;
} else if (key === '_contract_address' || key === 'contract_address') {
let addr = attrs[l].value;
if (addr === coreAddress) {
coreContract = true;
}
}
}
if (coreContract && emitter !== '' && sequence !== '') {
vaaKey = makeVaaKey(txn.hash, this.chain, emitter, sequence);
this.logger.debug('blockKey: ' + blockKey);
this.logger.debug('Making vaaKey: ' + vaaKey);
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
}
}
}
}
}
}
if (height < fromBlock) {
this.logger.debug('Breaking out due to height < fromBlock');
done = true;
break;
}
}
if (bulkTxns.length < limit) {
this.logger.debug('Breaking out due to ran out of txns.');
done = true;
}
}
if (lastBlockInserted < toBlock) {
// Need to create something for the last requested block because it will
// become the new starting point for subsequent calls.
this.logger.debug(`Adding filler for block ${toBlock}`);
const blkUrl = `${this.rpc}/${this.getBlockTag}${toBlock}`;
this.logger.debug(`Query string for block = ${blkUrl}`);
const result = (await axios.get<ExplorerBlock>(blkUrl)).data;
if (!result) {
throw new Error(`Unable to get block information for block ${toBlock}`);
}
const blockKey = makeBlockKey(
result.data.height.toString(),
new Date(result.data.timestamp).toISOString()
);
vaasByBlock[blockKey] = [];
}
return vaasByBlock;
// const limit: number = 50;
// let done: boolean = false;
// let skip: number = 0;
// let lastBlockInserted: number = 0;
// while (!done) {
// // This URL gets the paginated list of transactions for the token bridge contract
// let url: string = `${this.rpc}/${this.contractTag}${address}?skip=${skip}&limit=${limit}`;
// this.logger.debug(`Query string = ${url}`);
// const bulkTxnResult = (
// await axios.get<ContractTxnResult>(url, {
// headers: {
// 'User-Agent': 'Mozilla/5.0',
// },
// })
// ).data;
// if (!bulkTxnResult) {
// throw new Error('bad bulkTxnResult');
// }
// skip = bulkTxnResult.paging.to;
// const bulkTxns: ContractTxnData[] = bulkTxnResult.data;
// if (!bulkTxns) {
// throw new Error('No transactions');
// }
// for (let i: number = 0; i < bulkTxns.length; ++i) {
// // Walk the transactions
// const txn: ContractTxnData = bulkTxns[i];
// const height: number = txn.block_number;
// if (height >= fromBlock && height <= toBlock) {
// // We only care about the transactions in the given block range
// this.logger.debug(`Found one: ${fromBlock}, ${height}, ${toBlock}`);
// const blockKey = makeBlockKey(
// txn.block_number.toString(),
// new Date(txn.block_unix_timestamp).toISOString()
// );
// vaasByBlock[blockKey] = [];
// lastBlockInserted = height;
// this.logger.debug(`lastBlockInserted = ${lastBlockInserted}`);
// let vaaKey: string = '';
// // Each txn has an array of raw_logs
// if (txn.logs) {
// const rawLogs: RawLogEvents[] = txn.logs;
// for (let j: number = 0; j < rawLogs.length; ++j) {
// const rawLog: RawLogEvents = rawLogs[j];
// const events: EventObjectsTypes[] = rawLog.events;
// if (!events) {
// this.logger.debug(
// `No events in rawLog${j} for block ${height}, hash = ${txn.hash}`
// );
// continue;
// }
// for (let k: number = 0; k < events.length; k++) {
// const event: EventObjectsTypes = events[k];
// if (event.type === 'wasm') {
// if (event.attributes) {
// let attrs = event.attributes;
// let emitter: string = '';
// let sequence: string = '';
// let coreContract: boolean = false;
// // only care about _contract_address, message.sender and message.sequence
// const numAttrs = attrs.length;
// for (let l = 0; l < numAttrs; l++) {
// const key = attrs[l].key;
// if (key === 'message.sender') {
// emitter = attrs[l].value;
// } else if (key === 'message.sequence') {
// sequence = attrs[l].value;
// } else if (key === '_contract_address' || key === 'contract_address') {
// let addr = attrs[l].value;
// if (addr === coreAddress) {
// coreContract = true;
// }
// }
// }
// if (coreContract && emitter !== '' && sequence !== '') {
// vaaKey = makeVaaKey(txn.hash, this.chain, emitter, sequence);
// this.logger.debug('blockKey: ' + blockKey);
// this.logger.debug('Making vaaKey: ' + vaaKey);
// vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
// }
// }
// }
// }
// }
// }
// }
// if (height < fromBlock) {
// this.logger.debug('Breaking out due to height < fromBlock');
// done = true;
// break;
// }
// }
// if (bulkTxns.length < limit) {
// this.logger.debug('Breaking out due to ran out of txns.');
// done = true;
// }
// }
// if (lastBlockInserted < toBlock) {
// // Need to create something for the last requested block because it will
// // become the new starting point for subsequent calls.
// this.logger.debug(`Adding filler for block ${toBlock}`);
// const blkUrl = `${this.rpc}/${this.getBlockTag}${toBlock}`;
// this.logger.debug(`Query string for block = ${blkUrl}`);
// const result = (await axios.get<ExplorerBlock>(blkUrl)).data;
// if (!result) {
// throw new Error(`Unable to get block information for block ${toBlock}`);
// }
// const blockKey = makeBlockKey(
// result.data.height.toString(),
// new Date(result.data.timestamp).toISOString()
// );
// vaasByBlock[blockKey] = [];
// }
// return vaasByBlock;
// }
/**
 * Per-log VAA extraction is not supported by this watcher.
 *
 * NOTE: the method is not `async`, so the error is thrown synchronously
 * rather than surfaced as a rejected Promise — callers invoking this
 * path must be prepared for a synchronous throw.
 *
 * @param _fromBlock - unused; start of the requested block range.
 * @param _toBlock - unused; end of the requested block range.
 * @throws Error always ('Not Implemented').
 */
override getVaaLogs(_fromBlock: number, _toBlock: number): Promise<VaaLog[]> {
  const notSupported = new Error('Not Implemented');
  throw notSupported;
}
}

View File

@ -7,7 +7,8 @@ export class MoonbeamWatcher extends EVMWatcher {
constructor() {
super('moonbeam');
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
const latestBlock = await super.getFinalizedBlockNumber();
let isBlockFinalized = false;
while (!isBlockFinalized) {
@ -30,7 +31,7 @@ export class MoonbeamWatcher extends EVMWatcher {
params: [blockFromNumber.hash],
},
],
AXIOS_CONFIG_JSON
AXIOS_CONFIG_JSON,
)
)?.data?.[0]?.result || false;
} catch (e) {

View File

@ -5,60 +5,64 @@ import { BlockResult, ExecutionStatus } from 'near-api-js/lib/providers/provider
import ora from 'ora';
import { z } from 'zod';
import { RPCS_BY_CHAIN } from '../consts';
import { VaasByBlock } from '../databases/types';
import { VaaLog } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { EventLog } from '../types/near';
import { getNearProvider, isWormholePublishEventLog } from '../utils/near';
import { Watcher } from './Watcher';
import BaseWatcher from './BaseWatcher';
export class NearWatcher extends Watcher {
export class NearWatcher extends BaseWatcher {
provider: Provider | null = null;
constructor() {
super('near');
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
this.logger.info(`fetching final block for ${this.chain}`);
const provider = await this.getProvider();
const block = await provider.block({ finality: 'final' });
return block.header.height;
}
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
// assume toBlock was retrieved from getFinalizedBlockNumber and is finalized
this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
const provider = await this.getProvider();
const blocks: BlockResult[] = [];
let block: BlockResult | null = null;
try {
block = await provider.block({ blockId: toBlock });
blocks.push(block);
while (true) {
// traverse backwards via block hashes: https://github.com/wormhole-foundation/wormhole-monitor/issues/35
block = await provider.block({ blockId: block.header.prev_hash });
if (block.header.height < fromBlock) break;
blocks.push(block);
}
} catch (e) {
if (e instanceof TypedError && e.type === 'HANDLER_ERROR') {
const error = block
? `block ${block.header.prev_hash} is too old, use backfillNear for blocks before height ${block.header.height}`
: `toBlock ${toBlock} is too old, use backfillNear for this range`; // starting block too old
this.logger.error(error);
} else {
throw e;
}
}
// async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
// // assume toBlock was retrieved from getFinalizedBlockNumber and is finalized
// this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
// const provider = await this.getProvider();
// const blocks: BlockResult[] = [];
// let block: BlockResult | null = null;
// try {
// block = await provider.block({ blockId: toBlock });
// blocks.push(block);
// while (true) {
// // traverse backwards via block hashes: https://github.com/wormhole-foundation/wormhole-monitor/issues/35
// block = await provider.block({ blockId: block.header.prev_hash });
// if (block.header.height < fromBlock) break;
// blocks.push(block);
// }
// } catch (e) {
// if (e instanceof TypedError && e.type === 'HANDLER_ERROR') {
// const error = block
// ? `block ${block.header.prev_hash} is too old, use backfillNear for blocks before height ${block.header.height}`
// : `toBlock ${toBlock} is too old, use backfillNear for this range`; // starting block too old
// this.logger.error(error);
// } else {
// throw e;
// }
// }
return getMessagesFromBlockResults(provider, blocks);
// return getMessagesFromBlockResults(provider, blocks);
// }
/**
 * Not implemented for the NEAR watcher: always throws 'Not Implemented'.
 * Because the method is not `async`, the throw is synchronous rather than
 * a rejected Promise. Presumably required by the BaseWatcher contract
 * (hence `override`) — TODO confirm against BaseWatcher.
 */
override getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
throw new Error('Not Implemented');
}
async getProvider(): Promise<Provider> {
return (this.provider = this.provider || (await getNearProvider(RPCS_BY_CHAIN.near!)));
}
isValidVaaKey(key: string) {
override isValidVaaKey(key: string) {
try {
const [txHash, vaaKey] = key.split(':');
const txHashDecoded = Buffer.from(decode(txHash)).toString('hex');
@ -74,49 +78,49 @@ export class NearWatcher extends Watcher {
}
}
export const getMessagesFromBlockResults = async (
provider: Provider,
blocks: BlockResult[],
debug: boolean = false
): Promise<VaasByBlock> => {
const vaasByBlock: VaasByBlock = {};
let log: ora.Ora;
if (debug) log = ora(`Fetching messages from ${blocks.length} blocks...`).start();
for (let i = 0; i < blocks.length; i++) {
if (debug) log!.text = `Fetching messages from block ${i + 1}/${blocks.length}...`;
const { height, timestamp } = blocks[i].header;
const blockKey = makeBlockKey(height.toString(), new Date(timestamp / 1_000_000).toISOString());
vaasByBlock[blockKey] = [];
// export const getMessagesFromBlockResults = async (
// provider: Provider,
// blocks: BlockResult[],
// debug: boolean = false
// ): Promise<VaasByBlock> => {
// const vaasByBlock: VaasByBlock = {};
// let log: ora.Ora;
// if (debug) log = ora(`Fetching messages from ${blocks.length} blocks...`).start();
// for (let i = 0; i < blocks.length; i++) {
// if (debug) log!.text = `Fetching messages from block ${i + 1}/${blocks.length}...`;
// const { height, timestamp } = blocks[i].header;
// const blockKey = makeBlockKey(height.toString(), new Date(timestamp / 1_000_000).toISOString());
// vaasByBlock[blockKey] = [];
const chunks = [];
for (const chunk of blocks[i].chunks) {
chunks.push(await provider.chunk(chunk.chunk_hash));
}
// const chunks = [];
// for (const chunk of blocks[i].chunks) {
// chunks.push(await provider.chunk(chunk.chunk_hash));
// }
const transactions = chunks.flatMap(({ transactions }) => transactions);
for (const tx of transactions) {
const outcome = await provider.txStatus(tx.hash, CONTRACTS.MAINNET.near.core);
const logs = outcome.receipts_outcome
.filter(
({ outcome }) =>
(outcome as any).executor_id === CONTRACTS.MAINNET.near.core &&
(outcome.status as ExecutionStatus).SuccessValue
)
.flatMap(({ outcome }) => outcome.logs)
.filter((log) => log.startsWith('EVENT_JSON:')) // https://nomicon.io/Standards/EventsFormat
.map((log) => JSON.parse(log.slice(11)) as EventLog)
.filter(isWormholePublishEventLog);
for (const log of logs) {
const vaaKey = makeVaaKey(tx.hash, 'near', log.emitter, log.seq.toString());
vaasByBlock[blockKey] = [...vaasByBlock[blockKey], vaaKey];
}
}
}
// const transactions = chunks.flatMap(({ transactions }) => transactions);
// for (const tx of transactions) {
// const outcome = await provider.txStatus(tx.hash, CONTRACTS.MAINNET.near.core);
// const logs = outcome.receipts_outcome
// .filter(
// ({ outcome }) =>
// (outcome as any).executor_id === CONTRACTS.MAINNET.near.core &&
// (outcome.status as ExecutionStatus).SuccessValue
// )
// .flatMap(({ outcome }) => outcome.logs)
// .filter((log) => log.startsWith('EVENT_JSON:')) // https://nomicon.io/Standards/EventsFormat
// .map((log) => JSON.parse(log.slice(11)) as EventLog)
// .filter(isWormholePublishEventLog);
// for (const log of logs) {
// const vaaKey = makeVaaKey(tx.hash, 'near', log.emitter, log.seq.toString());
// vaasByBlock[blockKey] = [...vaasByBlock[blockKey], vaaKey];
// }
// }
// }
if (debug) {
const numMessages = Object.values(vaasByBlock).flat().length;
log!.succeed(`Fetched ${numMessages} messages from ${blocks.length} blocks`);
}
// if (debug) {
// const numMessages = Object.values(vaasByBlock).flat().length;
// log!.succeed(`Fetched ${numMessages} messages from ${blocks.length} blocks`);
// }
return vaasByBlock;
};
// return vaasByBlock;
// };

View File

@ -7,7 +7,8 @@ export class PolygonWatcher extends EVMWatcher {
constructor() {
super('polygon');
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
this.logger.info('fetching last child block from Ethereum');
const rootChain = new ethers.utils.Interface([
`function getLastChildBlock() external view returns (uint256)`,
@ -27,7 +28,7 @@ export class PolygonWatcher extends EVMWatcher {
],
},
],
AXIOS_CONFIG_JSON
AXIOS_CONFIG_JSON,
)
)?.data?.[0]?.result;
const block = rootChain.decodeFunctionResult('getLastChildBlock', callResult)[0].toNumber();

View File

@ -11,16 +11,16 @@ import {
import { decode } from 'bs58';
import { z } from 'zod';
import { RPCS_BY_CHAIN } from '../consts';
import { VaasByBlock } from '../databases/types';
import { VaaLog } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { isLegacyMessage, normalizeCompileInstruction } from '../utils/solana';
import { Watcher } from './Watcher';
import BaseWatcher from './BaseWatcher';
const WORMHOLE_PROGRAM_ID = CONTRACTS.MAINNET.solana.core;
const COMMITMENT: Commitment = 'finalized';
const GET_SIGNATURES_LIMIT = 1000;
export class SolanaWatcher extends Watcher {
export class SolanaWatcher extends BaseWatcher {
rpc: string;
// this is set as a class field so we can modify it in tests
getSignaturesLimit = GET_SIGNATURES_LIMIT;
@ -29,154 +29,158 @@ export class SolanaWatcher extends Watcher {
// transactions returned. Since we don't know the number of transactions in advance, we use
// a block range of 100K slots. Technically, batch size can be arbitrarily large since pagination
// of the WH transactions within that range is handled internally below.
maximumBatchSize = 100_000;
override maximumBatchSize = 100_000;
constructor() {
super('solana');
this.rpc = RPCS_BY_CHAIN.solana!;
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
const connection = new Connection(this.rpc, COMMITMENT);
return connection.getSlot();
}
async getMessagesForBlocks(fromSlot: number, toSlot: number): Promise<VaasByBlock> {
const connection = new Connection(this.rpc, COMMITMENT);
// in the rare case of maximumBatchSize skipped blocks in a row,
// you might hit this error due to the recursion below
if (fromSlot > toSlot) throw new Error('solana: invalid block range');
this.logger.info(`fetching info for blocks ${fromSlot} to ${toSlot}`);
const vaasByBlock: VaasByBlock = {};
// async getMessagesForBlocks(fromSlot: number, toSlot: number): Promise<VaasByBlock> {
// const connection = new Connection(this.rpc, COMMITMENT);
// // in the rare case of maximumBatchSize skipped blocks in a row,
// // you might hit this error due to the recursion below
// if (fromSlot > toSlot) throw new Error('solana: invalid block range');
// this.logger.info(`fetching info for blocks ${fromSlot} to ${toSlot}`);
// const vaasByBlock: VaasByBlock = {};
// identify block range by fetching signatures of the first and last transactions
// getSignaturesForAddress walks backwards so fromSignature occurs after toSignature
let toBlock: VersionedBlockResponse | null = null;
try {
toBlock = await connection.getBlock(toSlot, { maxSupportedTransactionVersion: 0 });
} catch (e) {
if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) {
// failed to get confirmed block: slot was skipped or missing in long-term storage
return this.getMessagesForBlocks(fromSlot, toSlot - 1);
} else {
throw e;
}
}
if (!toBlock || !toBlock.blockTime || toBlock.transactions.length === 0) {
return this.getMessagesForBlocks(fromSlot, toSlot - 1);
}
const fromSignature =
toBlock.transactions[toBlock.transactions.length - 1].transaction.signatures[0];
// // identify block range by fetching signatures of the first and last transactions
// // getSignaturesForAddress walks backwards so fromSignature occurs after toSignature
// let toBlock: VersionedBlockResponse | null = null;
// try {
// toBlock = await connection.getBlock(toSlot, { maxSupportedTransactionVersion: 0 });
// } catch (e) {
// if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) {
// // failed to get confirmed block: slot was skipped or missing in long-term storage
// return this.getMessagesForBlocks(fromSlot, toSlot - 1);
// } else {
// throw e;
// }
// }
// if (!toBlock || !toBlock.blockTime || toBlock.transactions.length === 0) {
// return this.getMessagesForBlocks(fromSlot, toSlot - 1);
// }
// const fromSignature =
// toBlock.transactions[toBlock.transactions.length - 1].transaction.signatures[0];
let fromBlock: VersionedBlockResponse | null = null;
try {
fromBlock = await connection.getBlock(fromSlot, { maxSupportedTransactionVersion: 0 });
} catch (e) {
if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) {
// failed to get confirmed block: slot was skipped or missing in long-term storage
return this.getMessagesForBlocks(fromSlot + 1, toSlot);
} else {
throw e;
}
}
if (!fromBlock || !fromBlock.blockTime || fromBlock.transactions.length === 0) {
return this.getMessagesForBlocks(fromSlot + 1, toSlot);
}
const toSignature = fromBlock.transactions[0].transaction.signatures[0];
// let fromBlock: VersionedBlockResponse | null = null;
// try {
// fromBlock = await connection.getBlock(fromSlot, { maxSupportedTransactionVersion: 0 });
// } catch (e) {
// if (e instanceof SolanaJSONRPCError && (e.code === -32007 || e.code === -32009)) {
// // failed to get confirmed block: slot was skipped or missing in long-term storage
// return this.getMessagesForBlocks(fromSlot + 1, toSlot);
// } else {
// throw e;
// }
// }
// if (!fromBlock || !fromBlock.blockTime || fromBlock.transactions.length === 0) {
// return this.getMessagesForBlocks(fromSlot + 1, toSlot);
// }
// const toSignature = fromBlock.transactions[0].transaction.signatures[0];
// get all core bridge signatures between fromTransaction and toTransaction
let numSignatures = this.getSignaturesLimit;
let currSignature: string | undefined = fromSignature;
while (numSignatures === this.getSignaturesLimit) {
const signatures: ConfirmedSignatureInfo[] = await connection.getSignaturesForAddress(
new PublicKey(WORMHOLE_PROGRAM_ID),
{
before: currSignature,
until: toSignature,
limit: this.getSignaturesLimit,
}
);
// // get all core bridge signatures between fromTransaction and toTransaction
// let numSignatures = this.getSignaturesLimit;
// let currSignature: string | undefined = fromSignature;
// while (numSignatures === this.getSignaturesLimit) {
// const signatures: ConfirmedSignatureInfo[] = await connection.getSignaturesForAddress(
// new PublicKey(WORMHOLE_PROGRAM_ID),
// {
// before: currSignature,
// until: toSignature,
// limit: this.getSignaturesLimit,
// }
// );
this.logger.info(`processing ${signatures.length} transactions`);
// this.logger.info(`processing ${signatures.length} transactions`);
// In order to determine if a transaction has a Wormhole message, we normalize and iterate
// through all instructions in the transaction. Only PostMessage instructions are relevant
// when looking for messages. PostMessageUnreliable instructions are ignored because there
// are no data availability guarantees (ie the associated message accounts may have been
// reused, overwriting previous data). Then, the message account is the account given by
// the second index in the instruction's account key indices. From here, we can fetch the
// message data from the account and parse out the emitter and sequence.
const results = await connection.getTransactions(
signatures.map((s) => s.signature),
{
maxSupportedTransactionVersion: 0,
}
);
if (results.length !== signatures.length) {
throw new Error(`solana: failed to fetch tx for signatures`);
}
for (const res of results) {
if (res?.meta?.err) {
// skip errored txs
continue;
}
if (!res || !res.blockTime) {
throw new Error(
`solana: failed to fetch tx for signature ${
res?.transaction.signatures[0] || 'unknown'
}`
);
}
// // In order to determine if a transaction has a Wormhole message, we normalize and iterate
// // through all instructions in the transaction. Only PostMessage instructions are relevant
// // when looking for messages. PostMessageUnreliable instructions are ignored because there
// // are no data availability guarantees (ie the associated message accounts may have been
// // reused, overwriting previous data). Then, the message account is the account given by
// // the second index in the instruction's account key indices. From here, we can fetch the
// // message data from the account and parse out the emitter and sequence.
// const results = await connection.getTransactions(
// signatures.map((s) => s.signature),
// {
// maxSupportedTransactionVersion: 0,
// }
// );
// if (results.length !== signatures.length) {
// throw new Error(`solana: failed to fetch tx for signatures`);
// }
// for (const res of results) {
// if (res?.meta?.err) {
// // skip errored txs
// continue;
// }
// if (!res || !res.blockTime) {
// throw new Error(
// `solana: failed to fetch tx for signature ${
// res?.transaction.signatures[0] || 'unknown'
// }`
// );
// }
const message = res.transaction.message;
const accountKeys = isLegacyMessage(message)
? message.accountKeys
: message.staticAccountKeys;
const programIdIndex = accountKeys.findIndex((i) => i.toBase58() === WORMHOLE_PROGRAM_ID);
const instructions = message.compiledInstructions;
const innerInstructions =
res.meta?.innerInstructions?.flatMap((i) =>
i.instructions.map(normalizeCompileInstruction)
) || [];
const whInstructions = innerInstructions
.concat(instructions)
.filter((i) => i.programIdIndex === programIdIndex);
for (const instruction of whInstructions) {
// skip if not postMessage instruction
const instructionId = instruction.data;
if (instructionId[0] !== 0x01) continue;
// const message = res.transaction.message;
// const accountKeys = isLegacyMessage(message)
// ? message.accountKeys
// : message.staticAccountKeys;
// const programIdIndex = accountKeys.findIndex((i) => i.toBase58() === WORMHOLE_PROGRAM_ID);
// const instructions = message.compiledInstructions;
// const innerInstructions =
// res.meta?.innerInstructions?.flatMap((i) =>
// i.instructions.map(normalizeCompileInstruction)
// ) || [];
// const whInstructions = innerInstructions
// .concat(instructions)
// .filter((i) => i.programIdIndex === programIdIndex);
// for (const instruction of whInstructions) {
// // skip if not postMessage instruction
// const instructionId = instruction.data;
// if (instructionId[0] !== 0x01) continue;
const accountId = accountKeys[instruction.accountKeyIndexes[1]];
const {
message: { emitterAddress, sequence },
} = await getPostedMessage(connection, accountId.toBase58(), COMMITMENT);
const blockKey = makeBlockKey(
res.slot.toString(),
new Date(res.blockTime * 1000).toISOString()
);
const vaaKey = makeVaaKey(
res.transaction.signatures[0],
this.chain,
emitterAddress.toString('hex'),
sequence.toString()
);
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
}
// const accountId = accountKeys[instruction.accountKeyIndexes[1]];
// const {
// message: { emitterAddress, sequence },
// } = await getPostedMessage(connection, accountId.toBase58(), COMMITMENT);
// const blockKey = makeBlockKey(
// res.slot.toString(),
// new Date(res.blockTime * 1000).toISOString()
// );
// const vaaKey = makeVaaKey(
// res.transaction.signatures[0],
// this.chain,
// emitterAddress.toString('hex'),
// sequence.toString()
// );
// vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
// }
// }
numSignatures = signatures.length;
currSignature = signatures.at(-1)?.signature;
}
// numSignatures = signatures.length;
// currSignature = signatures.at(-1)?.signature;
// }
// add last block for storeVaasByBlock
const lastBlockKey = makeBlockKey(
toSlot.toString(),
new Date(toBlock.blockTime * 1000).toISOString()
);
return { [lastBlockKey]: [], ...vaasByBlock };
// // add last block for storeVaasByBlock
// const lastBlockKey = makeBlockKey(
// toSlot.toString(),
// new Date(toBlock.blockTime * 1000).toISOString()
// );
// return { [lastBlockKey]: [], ...vaasByBlock };
// }
/**
 * Not implemented for the Solana watcher: always throws 'Not Implemented'.
 * Because the method is not `async`, the throw is synchronous rather than
 * a rejected Promise. Presumably required by the BaseWatcher contract
 * (hence `override`) — TODO confirm against BaseWatcher.
 */
override getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
throw new Error('Not Implemented');
}
isValidVaaKey(key: string) {
override isValidVaaKey(key: string) {
try {
const [txHash, vaaKey] = key.split(':');
const txHashDecoded = Buffer.from(decode(txHash)).toString('hex');

View File

@ -7,9 +7,9 @@ import {
} from '@mysten/sui.js';
import { array } from 'superstruct';
import { RPCS_BY_CHAIN } from '../consts';
import { VaasByBlock } from '../databases/types';
import { Watcher } from './Watcher';
import BaseWatcher from './BaseWatcher';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { VaaLog } from '../databases/types';
const SUI_EVENT_HANDLE = `0x5306f64e312b581766351c07af79c72fcb1cd25147157fdc2f8ad76de9a3fb6a::publish_message::WormholeMessage`;
@ -22,9 +22,9 @@ type PublishMessageEvent = {
timestamp: string;
};
export class SuiWatcher extends Watcher {
export class SuiWatcher extends BaseWatcher {
client: JsonRpcClient;
maximumBatchSize: number = 100000; // arbitrarily large as this pages back by events
override maximumBatchSize: number = 100_000; // arbitrarily large as this pages back by events
constructor() {
super('sui');
@ -32,92 +32,96 @@ export class SuiWatcher extends Watcher {
}
// TODO: this might break using numbers, the whole service needs a refactor to use BigInt
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
return Number(
(await this.client.request('sui_getLatestCheckpointSequenceNumber', undefined)).result
(await this.client.request('sui_getLatestCheckpointSequenceNumber', undefined)).result,
);
}
// TODO: this might break using numbers, the whole service needs a refactor to use BigInt
async getMessagesForBlocks(fromCheckpoint: number, toCheckpoint: number): Promise<VaasByBlock> {
this.logger.info(`fetching info for checkpoints ${fromCheckpoint} to ${toCheckpoint}`);
const vaasByBlock: VaasByBlock = {};
// async getMessagesForBlocks(fromCheckpoint: number, toCheckpoint: number): Promise<VaasByBlock> {
// this.logger.info(`fetching info for checkpoints ${fromCheckpoint} to ${toCheckpoint}`);
// const vaasByBlock: VaasByBlock = {};
{
// reserve empty slot for initial block so query is cataloged
const fromCheckpointTimestamp = new Date(
Number(
(
await this.client.requestWithType(
'sui_getCheckpoint',
{ id: fromCheckpoint.toString() },
Checkpoint
)
).timestampMs
)
).toISOString();
const fromBlockKey = makeBlockKey(fromCheckpoint.toString(), fromCheckpointTimestamp);
vaasByBlock[fromBlockKey] = [];
}
// {
// // reserve empty slot for initial block so query is cataloged
// const fromCheckpointTimestamp = new Date(
// Number(
// (
// await this.client.requestWithType(
// 'sui_getCheckpoint',
// { id: fromCheckpoint.toString() },
// Checkpoint,
// )
// ).timestampMs,
// ),
// ).toISOString();
// const fromBlockKey = makeBlockKey(fromCheckpoint.toString(), fromCheckpointTimestamp);
// vaasByBlock[fromBlockKey] = [];
// }
let lastCheckpoint: null | number = null;
let cursor: any = undefined;
let hasNextPage = false;
do {
const response = await this.client.requestWithType(
'suix_queryEvents',
{
query: { MoveEventType: SUI_EVENT_HANDLE },
cursor,
descending_order: true,
},
PaginatedEvents
);
const digest = response.data.length
? response.data[response.data.length - 1].id.txDigest
: null;
lastCheckpoint = digest
? Number(
(
await this.client.requestWithType(
'sui_getTransactionBlock',
{ digest },
SuiTransactionBlockResponse
)
).checkpoint!
)
: null;
cursor = response.nextCursor;
hasNextPage = response.hasNextPage;
const txBlocks = await this.client.requestWithType(
'sui_multiGetTransactionBlocks',
{ digests: response.data.map((e) => e.id.txDigest) },
array(SuiTransactionBlockResponse)
);
const checkpointByTxDigest = txBlocks.reduce<Record<string, string | undefined>>(
(value, { digest, checkpoint }) => {
value[digest] = checkpoint;
return value;
},
{}
);
for (const event of response.data) {
const checkpoint = checkpointByTxDigest[event.id.txDigest];
if (!checkpoint) continue;
const checkpointNum = Number(checkpoint);
if (checkpointNum < fromCheckpoint || checkpointNum > toCheckpoint) continue;
const msg = event.parsedJson as PublishMessageEvent;
const timestamp = new Date(Number(msg.timestamp) * 1000).toISOString();
const vaaKey = makeVaaKey(
event.id.txDigest,
CHAIN_ID_SUI,
msg.sender.slice(2),
msg.sequence
);
const blockKey = makeBlockKey(checkpoint, timestamp);
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
} while (hasNextPage && lastCheckpoint && fromCheckpoint < lastCheckpoint);
return vaasByBlock;
// let lastCheckpoint: null | number = null;
// let cursor: any = undefined;
// let hasNextPage = false;
// do {
// const response = await this.client.requestWithType(
// 'suix_queryEvents',
// {
// query: { MoveEventType: SUI_EVENT_HANDLE },
// cursor,
// descending_order: true,
// },
// PaginatedEvents,
// );
// const digest = response.data.length
// ? response.data[response.data.length - 1].id.txDigest
// : null;
// lastCheckpoint = digest
// ? Number(
// (
// await this.client.requestWithType(
// 'sui_getTransactionBlock',
// { digest },
// SuiTransactionBlockResponse,
// )
// ).checkpoint!,
// )
// : null;
// cursor = response.nextCursor;
// hasNextPage = response.hasNextPage;
// const txBlocks = await this.client.requestWithType(
// 'sui_multiGetTransactionBlocks',
// { digests: response.data.map((e) => e.id.txDigest) },
// array(SuiTransactionBlockResponse),
// );
// const checkpointByTxDigest = txBlocks.reduce<Record<string, string | undefined>>(
// (value, { digest, checkpoint }) => {
// value[digest] = checkpoint;
// return value;
// },
// {},
// );
// for (const event of response.data) {
// const checkpoint = checkpointByTxDigest[event.id.txDigest];
// if (!checkpoint) continue;
// const checkpointNum = Number(checkpoint);
// if (checkpointNum < fromCheckpoint || checkpointNum > toCheckpoint) continue;
// const msg = event.parsedJson as PublishMessageEvent;
// const timestamp = new Date(Number(msg.timestamp) * 1000).toISOString();
// const vaaKey = makeVaaKey(
// event.id.txDigest,
// CHAIN_ID_SUI,
// msg.sender.slice(2),
// msg.sequence,
// );
// const blockKey = makeBlockKey(checkpoint, timestamp);
// vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
// }
// } while (hasNextPage && lastCheckpoint && fromCheckpoint < lastCheckpoint);
// return vaasByBlock;
// }
// NOTE(review): log-based VAA extraction is not implemented for this
// watcher; the checkpoint/event flow above is used instead. The signature
// exists only to satisfy the base-class contract (noImplicitOverride).
// Throws synchronously rather than returning a rejected Promise.
override getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
throw new Error('Not Implemented');
}
}

View File

@@ -1,13 +1,13 @@
import { CONTRACTS, CosmWasmChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import axios from 'axios';
import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts';
import { VaasByBlock } from '../databases/types';
import { VaaLog } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { Watcher } from './Watcher';
import BaseWatcher from './BaseWatcher';
export class TerraExplorerWatcher extends Watcher {
export class TerraExplorerWatcher extends BaseWatcher {
// Arbitrarily large since the code here is capable of pulling all logs from all via indexer pagination
maximumBatchSize: number = 100000;
override maximumBatchSize: number = 100_000;
latestBlockTag: string;
getBlockTag: string;
@@ -27,7 +27,7 @@ export class TerraExplorerWatcher extends Watcher {
this.latestBlockHeight = 0;
}
async getFinalizedBlockNumber(): Promise<number> {
override async getFinalizedBlockNumber(): Promise<number> {
const result = (await axios.get(`${this.rpc}/${this.latestBlockTag}`, AXIOS_CONFIG_JSON)).data;
if (result && result.block.header.height) {
let blockHeight: number = parseInt(result.block.header.height);
@@ -43,123 +43,127 @@ export class TerraExplorerWatcher extends Watcher {
// retrieve blocks for core contract.
// use "next": as the pagination key
// compare block height ("height":) with what is passed in.
async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
const address = CONTRACTS.MAINNET[this.chain].core;
if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
}
this.logger.debug(`core contract for ${this.chain} is ${address}`);
let vaasByBlock: VaasByBlock = {};
this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
// async getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock> {
// const address = CONTRACTS.MAINNET[this.chain].core;
// if (!address) {
// throw new Error(`Core contract not defined for ${this.chain}`);
// }
// this.logger.debug(`core contract for ${this.chain} is ${address}`);
// let vaasByBlock: VaasByBlock = {};
// this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
const limit: number = 100;
let done: boolean = false;
let offset: number = 0;
let lastBlockInserted: number = 0;
while (!done) {
// This URL gets the paginated list of transactions for the core contract
let url: string = `${this.rpc}/${this.allTxsTag}offset=${offset}&limit=${limit}&account=${address}`;
this.logger.debug(`Query string = ${url}`);
const bulkTxnResult: BulkTxnResult = (
await axios.get(url, {
headers: {
'User-Agent': 'Mozilla/5.0',
'Accept-Encoding': 'application/json',
},
})
).data;
if (!bulkTxnResult) {
throw new Error('bad bulkTxnResult');
}
offset = bulkTxnResult.next;
const bulkTxns: BulkTxn[] = bulkTxnResult.txs;
if (!bulkTxns) {
throw new Error('No transactions');
}
for (let i: number = 0; i < bulkTxns.length; ++i) {
// Walk the transactions
const txn: BulkTxn = bulkTxns[i];
const height: number = parseInt(txn.height);
if (height >= fromBlock && height <= toBlock) {
// We only care about the transactions in the given block range
this.logger.debug(`Found one: ${fromBlock}, ${height}, ${toBlock}`);
const blockKey = makeBlockKey(txn.height, new Date(txn.timestamp).toISOString());
vaasByBlock[blockKey] = [];
lastBlockInserted = height;
this.logger.debug(`lastBlockInserted = ${lastBlockInserted}`);
let vaaKey: string = '';
// Each txn has an array of raw_logs
const rawLogs: RawLogEvents[] = JSON.parse(txn.raw_log);
for (let j: number = 0; j < rawLogs.length; ++j) {
const rawLog: RawLogEvents = rawLogs[j];
const events: EventObjectsTypes[] = rawLog.events;
if (!events) {
this.logger.debug(
`No events in rawLog${j} for block ${height}, hash = ${txn.txhash}`
);
continue;
}
for (let k: number = 0; k < events.length; k++) {
const event: EventObjectsTypes = events[k];
if (event.type === 'wasm') {
if (event.attributes) {
let attrs = event.attributes;
let emitter: string = '';
let sequence: string = '';
let coreContract: boolean = false;
// only care about _contract_address, message.sender and message.sequence
const numAttrs = attrs.length;
for (let l = 0; l < numAttrs; l++) {
const key = attrs[l].key;
if (key === 'message.sender') {
emitter = attrs[l].value;
} else if (key === 'message.sequence') {
sequence = attrs[l].value;
} else if (key === '_contract_address' || key === 'contract_address') {
let addr = attrs[l].value;
if (addr === address) {
coreContract = true;
}
}
}
if (coreContract && emitter !== '' && sequence !== '') {
vaaKey = makeVaaKey(txn.txhash, this.chain, emitter, sequence);
this.logger.debug('blockKey: ' + blockKey);
this.logger.debug('Making vaaKey: ' + vaaKey);
vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
}
}
}
}
}
}
if (height < fromBlock) {
this.logger.debug('Breaking out due to height < fromBlock');
done = true;
break;
}
}
if (bulkTxns.length < limit) {
this.logger.debug('Breaking out due to ran out of txns.');
done = true;
}
}
if (lastBlockInserted < toBlock) {
// Need to create something for the last requested block because it will
// become the new starting point for subsequent calls.
this.logger.debug(`Adding filler for block ${toBlock}`);
const blkUrl = `${this.rpc}/${this.getBlockTag}${toBlock}`;
const result: CosmwasmBlockResult = (await axios.get(blkUrl, AXIOS_CONFIG_JSON)).data;
if (!result) {
throw new Error(`Unable to get block information for block ${toBlock}`);
}
const blockKey = makeBlockKey(
result.block.header.height.toString(),
new Date(result.block.header.time).toISOString()
);
vaasByBlock[blockKey] = [];
}
return vaasByBlock;
// const limit: number = 100;
// let done: boolean = false;
// let offset: number = 0;
// let lastBlockInserted: number = 0;
// while (!done) {
// // This URL gets the paginated list of transactions for the core contract
// let url: string = `${this.rpc}/${this.allTxsTag}offset=${offset}&limit=${limit}&account=${address}`;
// this.logger.debug(`Query string = ${url}`);
// const bulkTxnResult: BulkTxnResult = (
// await axios.get(url, {
// headers: {
// 'User-Agent': 'Mozilla/5.0',
// 'Accept-Encoding': 'application/json',
// },
// })
// ).data;
// if (!bulkTxnResult) {
// throw new Error('bad bulkTxnResult');
// }
// offset = bulkTxnResult.next;
// const bulkTxns: BulkTxn[] = bulkTxnResult.txs;
// if (!bulkTxns) {
// throw new Error('No transactions');
// }
// for (let i: number = 0; i < bulkTxns.length; ++i) {
// // Walk the transactions
// const txn: BulkTxn = bulkTxns[i];
// const height: number = parseInt(txn.height);
// if (height >= fromBlock && height <= toBlock) {
// // We only care about the transactions in the given block range
// this.logger.debug(`Found one: ${fromBlock}, ${height}, ${toBlock}`);
// const blockKey = makeBlockKey(txn.height, new Date(txn.timestamp).toISOString());
// vaasByBlock[blockKey] = [];
// lastBlockInserted = height;
// this.logger.debug(`lastBlockInserted = ${lastBlockInserted}`);
// let vaaKey: string = '';
// // Each txn has an array of raw_logs
// const rawLogs: RawLogEvents[] = JSON.parse(txn.raw_log);
// for (let j: number = 0; j < rawLogs.length; ++j) {
// const rawLog: RawLogEvents = rawLogs[j];
// const events: EventObjectsTypes[] = rawLog.events;
// if (!events) {
// this.logger.debug(
// `No events in rawLog${j} for block ${height}, hash = ${txn.txhash}`,
// );
// continue;
// }
// for (let k: number = 0; k < events.length; k++) {
// const event: EventObjectsTypes = events[k];
// if (event.type === 'wasm') {
// if (event.attributes) {
// let attrs = event.attributes;
// let emitter: string = '';
// let sequence: string = '';
// let coreContract: boolean = false;
// // only care about _contract_address, message.sender and message.sequence
// const numAttrs = attrs.length;
// for (let l = 0; l < numAttrs; l++) {
// const key = attrs[l].key;
// if (key === 'message.sender') {
// emitter = attrs[l].value;
// } else if (key === 'message.sequence') {
// sequence = attrs[l].value;
// } else if (key === '_contract_address' || key === 'contract_address') {
// let addr = attrs[l].value;
// if (addr === address) {
// coreContract = true;
// }
// }
// }
// if (coreContract && emitter !== '' && sequence !== '') {
// vaaKey = makeVaaKey(txn.txhash, this.chain, emitter, sequence);
// this.logger.debug('blockKey: ' + blockKey);
// this.logger.debug('Making vaaKey: ' + vaaKey);
// vaasByBlock[blockKey] = [...(vaasByBlock[blockKey] || []), vaaKey];
// }
// }
// }
// }
// }
// }
// if (height < fromBlock) {
// this.logger.debug('Breaking out due to height < fromBlock');
// done = true;
// break;
// }
// }
// if (bulkTxns.length < limit) {
// this.logger.debug('Breaking out due to ran out of txns.');
// done = true;
// }
// }
// if (lastBlockInserted < toBlock) {
// // Need to create something for the last requested block because it will
// // become the new starting point for subsequent calls.
// this.logger.debug(`Adding filler for block ${toBlock}`);
// const blkUrl = `${this.rpc}/${this.getBlockTag}${toBlock}`;
// const result: CosmwasmBlockResult = (await axios.get(blkUrl, AXIOS_CONFIG_JSON)).data;
// if (!result) {
// throw new Error(`Unable to get block information for block ${toBlock}`);
// }
// const blockKey = makeBlockKey(
// result.block.header.height.toString(),
// new Date(result.block.header.time).toISOString(),
// );
// vaasByBlock[blockKey] = [];
// }
// return vaasByBlock;
// }
// NOTE(review): log-based VAA extraction is not implemented for this
// watcher; the explorer/indexer pagination flow above is used instead.
// The signature exists only to satisfy the base-class contract.
// Throws synchronously rather than returning a rejected Promise.
override getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
throw new Error('Not Implemented');
}
}

View File

@@ -0,0 +1,39 @@
import { ChainName } from '@certusone/wormhole-sdk';
import BaseDB from '../databases/BaseDB';
import { VaaLog } from '../databases/types';
import BaseSNS from '../services/SNS/BaseSNS';
import { WormholeLogger } from '../utils/logger';
import { AlgorandWatcher } from './AlgorandWatcher';
import { AptosWatcher } from './AptosWatcher';
import { BSCWatcher } from './BSCWatcher';
import { CosmwasmWatcher } from './CosmwasmWatcher';
import { EVMWatcher } from './EVMWatcher';
import { InjectiveExplorerWatcher } from './InjectiveExplorerWatcher';
import { NearWatcher } from './NearWatcher';
import { SolanaWatcher } from './SolanaWatcher';
import { SuiWatcher } from './SuiWatcher';
import { TerraExplorerWatcher } from './TerraExplorerWatcher';
/**
 * Union of every concrete watcher implementation that a factory such as
 * makeFinalizedWatcher() may return, letting call sites hold any
 * chain-specific watcher behind a single type.
 */
export type WatcherOptionTypes =
| SolanaWatcher
| EVMWatcher
| BSCWatcher
| AlgorandWatcher
| AptosWatcher
| NearWatcher
| InjectiveExplorerWatcher
| TerraExplorerWatcher
| CosmwasmWatcher
| SuiWatcher;
/**
 * Structural contract shared by all chain watchers.
 * NOTE(review): appears to mirror BaseWatcher's public surface — keep the
 * two in sync when either changes.
 */
export interface WatcherImplementation {
/** Chain this watcher observes (e.g. 'solana', 'terra'). */
chain: ChainName;
/** Logger used for per-watcher diagnostics. */
logger: WormholeLogger;
/** Upper bound on the block range processed per polling batch — assumed from TerraExplorerWatcher's usage; TODO confirm in BaseWatcher. */
maximumBatchSize: number;
/** Optional SNS publisher for watcher events; null/undefined when publishing is disabled. */
sns?: BaseSNS | null;
/** Optional persistence layer for observed data. */
db?: BaseDB;
/** Resolves to the latest finalized block/checkpoint height on the chain. */
getFinalizedBlockNumber(): Promise<number>;
/** Collects VAA logs for the given block range; several watchers throw 'Not Implemented'. */
getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]>;
/** Returns true when `key` is a well-formed block key. */
isValidBlockKey(key: string): boolean;
/** Returns true when `key` is a well-formed VAA key. */
isValidVaaKey(key: string): boolean;
/** Entry point for the watcher — presumably the long-running poll loop; confirm against BaseWatcher. */
watch(): Promise<void>;
}

View File

@@ -1,4 +1,8 @@
import { ChainName, EVMChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import {
ChainName,
CosmWasmChainName,
EVMChainName,
} from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { AlgorandWatcher } from './AlgorandWatcher';
import { AptosWatcher } from './AptosWatcher';
import { ArbitrumWatcher } from './ArbitrumWatcher';
@@ -11,10 +15,10 @@ import { NearWatcher } from './NearWatcher';
import { PolygonWatcher } from './PolygonWatcher';
import { SolanaWatcher } from './SolanaWatcher';
import { TerraExplorerWatcher } from './TerraExplorerWatcher';
import { Watcher } from './Watcher';
import { SuiWatcher } from './SuiWatcher';
import { WatcherOptionTypes } from './types';
export function makeFinalizedWatcher(chainName: ChainName): Watcher {
export function makeFinalizedWatcher(chainName: ChainName): WatcherOptionTypes {
if (chainName === 'solana') {
return new SolanaWatcher();
} else if (['ethereum', 'karura', 'acala'].includes(chainName)) {
@@ -41,8 +45,8 @@ export function makeFinalizedWatcher(chainName: ChainName): Watcher {
return new InjectiveExplorerWatcher();
} else if (chainName === 'terra') {
return new TerraExplorerWatcher('terra');
} else if (chainName === 'terra2' || chainName === 'xpla') {
return new CosmwasmWatcher(chainName);
} else if (['terra2', 'xpla'].includes(chainName)) {
return new CosmwasmWatcher(chainName as CosmWasmChainName);
} else if (chainName === 'sui') {
return new SuiWatcher();
} else {

View File

@@ -15,6 +15,7 @@
"noFallthroughCasesInSwitch": true,
"resolveJsonModule": true,
"noEmit": true,
"noImplicitOverride": true,
"lib": ["es2022"]
},
"include": ["scripts", "src", "src/abi/*.json"]