feat: add EVM event-watcher support with mongodb / sns (#638)

* feat: add EVM event-watcher support with mongodb

* fix commons

* fix: PR comments

* feat: add SNS Client support

* fix: remove bigtable database support

---------

Co-authored-by: gipsh <gipshm@gmail.com>
This commit is contained in:
Ricardo Olarte 2023-08-18 07:47:11 -05:00 committed by GitHub
parent 47b27ec13a
commit 64dc83e079
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 2238 additions and 482 deletions

View File

@ -4,11 +4,12 @@ LOG_LEVEL=info
ETH_RPC=
DB_SOURCE=
JSON_DB_FILE=db.json
FIRESTORE_ACCOUNT_KEY_PATH=
FIRESTORE_COLLECTION=
FIRESTORE_LATEST_COLLECTION=
GOOGLE_APPLICATION_CREDENTIALS=
BIGTABLE_TABLE_ID=
BIGTABLE_INSTANCE_ID=
BIGTABLE_SIGNED_VAAS_TABLE_ID=
BIGTABLE_VAAS_BY_TX_HASH_TABLE_ID=
MONGODB_URI=mongodb://localhost:27017
MONGODB_DATABASE=wormhole
AWS_SNS_REGION=
AWS_TOPIC_ARN=
AWS_SNS_SUBJECT=EventWatcher
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

File diff suppressed because it is too large Load Diff

View File

@ -21,6 +21,7 @@
"read-firestore": "ts-node scripts/readFirestore.ts"
},
"dependencies": {
"@aws-sdk/client-sns": "3.391.0",
"@celo-tools/celo-ethers-wrapper": "^0.3.0",
"@certusone/wormhole-sdk": "^0.9.22",
"@fastify/swagger": "^8.8.0",

View File

@ -0,0 +1,7 @@
export function chunkArray<T>(arr: T[], size: number): T[][] {
const chunks = [];
for (let i = 0; i < arr.length; i += size) {
chunks.push(arr.slice(i, i + size));
}
return chunks;
}

View File

@ -0,0 +1,86 @@
import {
ChainId,
ChainName,
coalesceChainName,
} from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
export const INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN: {
[key in ChainName]?: string;
} = {
ethereum: '12959638',
terra: '4810000', // not sure exactly but this should be before the first known message
bsc: '9745450',
polygon: '20629146',
avalanche: '8237163',
oasis: '1757',
algorand: '22931277',
fantom: '31817467',
karura: '1824665',
acala: '1144161',
klaytn: '90563824',
celo: '12947144',
moonbeam: '1486591',
terra2: '399813',
injective: '20908376',
arbitrum: '18128584',
optimism: '69401779',
aptos: '0', // block is 1094390 but AptosWatcher uses sequence number instead
near: '72767136',
xpla: '777549',
solana: '94401321', // https://explorer.solana.com/tx/KhLy688yDxbP7xbXVXK7TGpZU5DAFHbYiaoX16zZArxvVySz8i8g7N7Ss2noQYoq9XRbg6HDzrQBjUfmNcSWwhe
sui: '1485552', // https://explorer.sui.io/txblock/671SoTvVUvBZQWKXeameDvAwzHQvnr8Nj7dR9MUwm3CV?network=https%3A%2F%2Frpc.mainnet.sui.io
base: '1422314',
};
export const TOKEN_BRIDGE_EMITTERS: { [key in ChainName]?: string } = {
solana: 'ec7372995d5cc8732397fb0ad35c0121e0eaa90d26f828a534cab54391b3a4f5',
ethereum: '0000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585',
terra: '0000000000000000000000007cf7b764e38a0a5e967972c1df77d432510564e2',
terra2: 'a463ad028fb79679cfc8ce1efba35ac0e77b35080a1abe9bebe83461f176b0a3',
bsc: '000000000000000000000000b6f6d86a8f9879a9c87f643768d9efc38c1da6e7',
polygon: '0000000000000000000000005a58505a96d1dbf8df91cb21b54419fc36e93fde',
avalanche: '0000000000000000000000000e082f06ff657d94310cb8ce8b0d9a04541d8052',
oasis: '0000000000000000000000005848c791e09901b40a9ef749f2a6735b418d7564',
algorand: '67e93fa6c8ac5c819990aa7340c0c16b508abb1178be9b30d024b8ac25193d45',
aptos: '0000000000000000000000000000000000000000000000000000000000000001',
aurora: '00000000000000000000000051b5123a7b0f9b2ba265f9c4c8de7d78d52f510f',
fantom: '0000000000000000000000007c9fc5741288cdfdd83ceb07f3ea7e22618d79d2',
karura: '000000000000000000000000ae9d7fe007b3327aa64a32824aaac52c42a6e624',
acala: '000000000000000000000000ae9d7fe007b3327aa64a32824aaac52c42a6e624',
klaytn: '0000000000000000000000005b08ac39eaed75c0439fc750d9fe7e1f9dd0193f',
celo: '000000000000000000000000796dff6d74f3e27060b71255fe517bfb23c93eed',
near: '148410499d3fcda4dcfd68a1ebfcdddda16ab28326448d4aae4d2f0465cdfcb7',
moonbeam: '000000000000000000000000b1731c586ca89a23809861c6103f0b96b3f57d92',
arbitrum: '0000000000000000000000000b2402144bb366a632d14b83f244d2e0e21bd39c',
optimism: '0000000000000000000000001d68124e65fafc907325e3edbf8c4d84499daa8b',
xpla: '8f9cf727175353b17a5f574270e370776123d90fd74956ae4277962b4fdee24c',
injective: '00000000000000000000000045dbea4617971d93188eda21530bc6503d153313',
sui: 'ccceeb29348f71bdd22ffef43a2a19c1f5b5e17c5cca5411529120182672ade5',
base: '0000000000000000000000008d2de8d2f73F1F4cAB472AC9A881C9b123C79627',
};
export const isTokenBridgeEmitter = (chain: ChainId | ChainName, emitter: string) =>
TOKEN_BRIDGE_EMITTERS[coalesceChainName(chain)] === emitter;
export const NFT_BRIDGE_EMITTERS: { [key in ChainName]?: string } = {
solana: '0def15a24423e1edd1a5ab16f557b9060303ddbab8c803d2ee48f4b78a1cfd6b',
ethereum: '0000000000000000000000006ffd7ede62328b3af38fcd61461bbfc52f5651fe',
bsc: '0000000000000000000000005a58505a96d1dbf8df91cb21b54419fc36e93fde',
polygon: '00000000000000000000000090bbd86a6fe93d3bc3ed6335935447e75fab7fcf',
avalanche: '000000000000000000000000f7b6737ca9c4e08ae573f75a97b73d7a813f5de5',
oasis: '00000000000000000000000004952d522ff217f40b5ef3cbf659eca7b952a6c1',
aurora: '0000000000000000000000006dcc0484472523ed9cdc017f711bcbf909789284',
fantom: '000000000000000000000000a9c7119abda80d4a4e0c06c8f4d8cf5893234535',
karura: '000000000000000000000000b91e3638f82a1facb28690b37e3aae45d2c33808',
acala: '000000000000000000000000b91e3638f82a1facb28690b37e3aae45d2c33808',
klaytn: '0000000000000000000000003c3c561757baa0b78c5c025cdeaa4ee24c1dffef',
celo: '000000000000000000000000a6a377d75ca5c9052c9a77ed1e865cc25bd97bf3',
moonbeam: '000000000000000000000000453cfbe096c0f8d763e8c5f24b441097d577bde2',
arbitrum: '0000000000000000000000003dd14d553cfd986eac8e3bddf629d82073e188c8',
optimism: '000000000000000000000000fe8cd454b4a1ca468b57d79c0cc77ef5b6f64585',
aptos: '0000000000000000000000000000000000000000000000000000000000000005',
base: '000000000000000000000000DA3adC6621B2677BEf9aD26598e6939CF0D92f88',
};
export const isNFTBridgeEmitter = (chain: ChainId | ChainName, emitter: string) =>
NFT_BRIDGE_EMITTERS[coalesceChainName(chain)] === emitter;

View File

@ -0,0 +1,3 @@
export * from './arrays';
export * from './consts';
export * from './utils';

View File

@ -1,7 +0,0 @@
{
"name": "@wormhole-foundation/wormhole-monitor-common",
"version": "0.0.1",
"private": true,
"main": "dist/index.js",
"types": "dist/index.d.ts"
}

View File

@ -1,8 +0,0 @@
{
"extends": "../tsconfig.base.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
},
"include": ["src"]
}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,15 @@
export async function sleep(timeout: number) {
return new Promise((resolve) => setTimeout(resolve, timeout));
}
export const assertEnvironmentVariable = (varName: string) => {
if (varName in process.env) return process.env[varName]!;
throw new Error(`Missing required environment variable: ${varName}`);
};
export const MAX_UINT_16 = '65535';
export const padUint16 = (s: string): string => s.padStart(MAX_UINT_16.length, '0');
export const MAX_UINT_64 = '18446744073709551615';
export const padUint64 = (s: string): string => s.padStart(MAX_UINT_64.length, '0');
// make a bigtable row key for the `signedVAAs` table
export const makeSignedVAAsRowKey = (chain: number, emitter: string, sequence: string): string =>
`${padUint16(chain.toString())}/${emitter}/${padUint64(sequence)}`;

View File

@ -26,7 +26,7 @@ export const TIMEOUT = 0.5 * 1000;
export const RPCS_BY_CHAIN: { [key in ChainName]?: string } = {
ethereum: process.env.ETH_RPC || 'https://svc.blockdaemon.com/ethereum/mainnet/native',
bsc: process.env.BSC_RPC || 'https://bsc-dataseed2.defibit.io',
bsc: 'https://bsc-dataseed2.defibit.io',
polygon: 'https://rpc.ankr.com/polygon',
avalanche: 'https://rpc.ankr.com/avalanche',
oasis: 'https://emerald.oasis.dev',
@ -75,9 +75,10 @@ export const DB_LAST_BLOCK_FILE = process.env.DB_LAST_BLOCK_FILE || './lastBlock
// without this, axios request will error `Z_BUF_ERROR`: https://github.com/axios/axios/issues/5346
export const AXIOS_CONFIG_JSON: AxiosRequestConfig = {
headers: { 'Accept-Encoding': 'application/json',
'Authorization': 'Bearer zpka_213d294a9a5a44619cd6a02e55a20417_5f43e4d0'
},
headers: {
'Accept-Encoding': 'application/json',
Authorization: 'Bearer zpka_213d294a9a5a44619cd6a02e55a20417_5f43e4d0',
},
};
export const GUARDIAN_RPC_HOSTS = [

View File

@ -1,309 +0,0 @@
import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { parseVaa } from '@certusone/wormhole-sdk/lib/cjs/vaa/wormhole';
import { Bigtable } from '@google-cloud/bigtable';
import {
assertEnvironmentVariable,
chunkArray,
sleep,
} from '../common';
import { cert, initializeApp } from 'firebase-admin/app';
import { getFirestore } from 'firebase-admin/firestore';
import { Database } from './Database';
import {
BigtableMessagesResultRow,
BigtableMessagesRow,
BigtableSignedVAAsResultRow,
BigtableSignedVAAsRow,
BigtableVAAsByTxHashRow,
VaasByBlock,
} from './types';
import {
makeMessageId,
makeVAAsByTxHashRowKey,
makeSignedVAAsRowKey,
parseMessageId,
} from './utils';
import { getSignedVAA } from '../utils/getSignedVAA';
import { PubSub } from '@google-cloud/pubsub';
const WATCH_MISSING_TIMEOUT = 5 * 60 * 1000;
export class BigtableDatabase extends Database {
msgTableId: string;
signedVAAsTableId: string;
vaasByTxHashTableId: string;
instanceId: string;
bigtable: Bigtable;
firestoreDb: FirebaseFirestore.Firestore;
latestCollectionName: string;
pubsubSignedVAATopic: string;
pubsub: PubSub;
constructor() {
super();
this.msgTableId = assertEnvironmentVariable('BIGTABLE_TABLE_ID');
this.signedVAAsTableId = assertEnvironmentVariable('BIGTABLE_SIGNED_VAAS_TABLE_ID');
this.vaasByTxHashTableId = assertEnvironmentVariable('BIGTABLE_VAAS_BY_TX_HASH_TABLE_ID');
this.instanceId = assertEnvironmentVariable('BIGTABLE_INSTANCE_ID');
this.latestCollectionName = assertEnvironmentVariable('FIRESTORE_LATEST_COLLECTION');
this.pubsubSignedVAATopic = assertEnvironmentVariable('PUBSUB_SIGNED_VAA_TOPIC');
try {
this.bigtable = new Bigtable();
const serviceAccount = require(assertEnvironmentVariable('FIRESTORE_ACCOUNT_KEY_PATH'));
initializeApp({
credential: cert(serviceAccount),
});
this.firestoreDb = getFirestore();
this.pubsub = new PubSub();
} catch (e) {
throw new Error('Could not load bigtable db');
}
}
async getLastBlockByChain(chain: ChainName): Promise<string | null> {
const chainId = coalesceChainId(chain);
const lastObservedBlock = this.firestoreDb
.collection(this.latestCollectionName)
.doc(chainId.toString());
const lastObservedBlockByChain = await lastObservedBlock.get();
const blockKeyData = lastObservedBlockByChain.data();
const lastBlockKey = blockKeyData?.lastBlockKey;
if (lastBlockKey) {
this.logger.info(`for chain=${chain}, found most recent firestore block=${lastBlockKey}`);
const tokens = lastBlockKey.split('/');
return chain === 'aptos' ? tokens.at(-1) : tokens[0];
}
return null;
}
async storeLatestBlock(chain: ChainName, lastBlockKey: string): Promise<void> {
if (this.firestoreDb === undefined) {
this.logger.error('no firestore db set');
return;
}
const chainId = coalesceChainId(chain);
this.logger.info(`storing last block=${lastBlockKey} for chain=${chainId}`);
const lastObservedBlock = this.firestoreDb
.collection(this.latestCollectionName)
.doc(`${chainId.toString()}`);
await lastObservedBlock.set({ lastBlockKey });
}
async storeVaasByBlock(
chain: ChainName,
vaasByBlock: VaasByBlock,
updateLatestBlock: boolean = true
): Promise<void> {
if (this.bigtable === undefined) {
this.logger.warn('no bigtable instance set');
return;
}
const chainId = coalesceChainId(chain);
const filteredBlocks = BigtableDatabase.filterEmptyBlocks(vaasByBlock);
const instance = this.bigtable.instance(this.instanceId);
const table = instance.table(this.msgTableId);
const vaasByTxHashTable = instance.table(this.vaasByTxHashTableId);
const rowsToInsert: BigtableMessagesRow[] = [];
const vaasByTxHash: { [key: string]: string[] } = {};
Object.keys(filteredBlocks).forEach((blockKey) => {
const [block, timestamp] = blockKey.split('/');
filteredBlocks[blockKey].forEach((msgKey) => {
const [txHash, vaaKey] = msgKey.split(':');
const [, emitter, seq] = vaaKey.split('/');
rowsToInsert.push({
key: makeMessageId(chainId, block, emitter, seq),
data: {
info: {
timestamp: {
value: timestamp,
// write 0 timestamp to only keep 1 cell each
// https://cloud.google.com/bigtable/docs/gc-latest-value
timestamp: '0',
},
txHash: {
value: txHash,
timestamp: '0',
},
hasSignedVaa: {
value: 0,
timestamp: '0',
},
},
},
});
const txHashRowKey = makeVAAsByTxHashRowKey(txHash, chainId);
const vaaRowKey = makeSignedVAAsRowKey(chainId, emitter, seq);
vaasByTxHash[txHashRowKey] = [...(vaasByTxHash[txHashRowKey] || []), vaaRowKey];
});
});
const txHashRowsToInsert = Object.entries(vaasByTxHash).map<BigtableVAAsByTxHashRow>(
([txHashRowKey, vaaRowKeys]) => ({
key: txHashRowKey,
data: {
info: {
vaaKeys: { value: JSON.stringify(vaaRowKeys), timestamp: '0' },
},
},
})
);
await Promise.all([table.insert(rowsToInsert), vaasByTxHashTable.insert(txHashRowsToInsert)]);
if (updateLatestBlock) {
// store latest vaasByBlock to firestore
const blockKeys = Object.keys(vaasByBlock).sort(
(bk1, bk2) => Number(bk1.split('/')[0]) - Number(bk2.split('/')[0])
);
if (blockKeys.length) {
const lastBlockKey = blockKeys[blockKeys.length - 1];
this.logger.info(`for chain=${chain}, storing last bigtable block=${lastBlockKey}`);
await this.storeLatestBlock(chain, lastBlockKey);
}
}
}
async updateMessageStatuses(messageKeys: string[], value: number = 1): Promise<void> {
const instance = this.bigtable.instance(this.instanceId);
const table = instance.table(this.msgTableId);
const chunkedMessageKeys = chunkArray(messageKeys, 1000);
for (const chunk of chunkedMessageKeys) {
const rowsToInsert: BigtableMessagesRow[] = chunk.map((id) => ({
key: id,
data: {
info: {
hasSignedVaa: {
value: value,
timestamp: '0',
},
},
},
}));
// console.log(rowsToInsert[0].data.info)
await table.insert(rowsToInsert);
}
}
async fetchMissingVaaMessages(): Promise<BigtableMessagesResultRow[]> {
const instance = this.bigtable.instance(this.instanceId);
const messageTable = instance.table(this.msgTableId);
// TODO: how to filter to only messages with hasSignedVaa === 0
const observedMessages = (await messageTable.getRows())[0] as BigtableMessagesResultRow[];
const missingVaaMessages = observedMessages.filter(
(x) => x.data.info.hasSignedVaa?.[0].value === 0
);
return missingVaaMessages;
}
async watchMissing(): Promise<void> {
const instance = this.bigtable.instance(this.instanceId);
const signedVAAsTable = instance.table(this.signedVAAsTableId);
while (true) {
try {
// this array first stores all of the messages which are missing VAAs
// messages which we find VAAs for are then pruned from the array
// lastly we try to fetch VAAs for the messages in the pruned array from the guardians
const missingVaaMessages = await this.fetchMissingVaaMessages();
const total = missingVaaMessages.length;
this.logger.info(`locating ${total} messages with hasSignedVAA === 0`);
let found = 0;
const chunkedVAAIds = chunkArray(
missingVaaMessages.map((observedMessage) => {
const { chain, emitter, sequence } = parseMessageId(observedMessage.id);
return makeSignedVAAsRowKey(chain, emitter, sequence.toString());
}),
1000
);
let chunkNum = 0;
const foundKeys: string[] = [];
for (const chunk of chunkedVAAIds) {
this.logger.info(`processing chunk ${++chunkNum} of ${chunkedVAAIds.length}`);
const vaaRows = (
await signedVAAsTable.getRows({
keys: chunk,
decode: false,
})
)[0] as BigtableSignedVAAsResultRow[];
for (const row of vaaRows) {
try {
const vaaBytes = row.data.info.bytes[0].value;
const parsed = parseVaa(vaaBytes);
const matchingIndex = missingVaaMessages.findIndex((observedMessage) => {
const { chain, emitter, sequence } = parseMessageId(observedMessage.id);
if (
parsed.emitterChain === chain &&
parsed.emitterAddress.toString('hex') === emitter &&
parsed.sequence === sequence
) {
return true;
}
});
if (matchingIndex !== -1) {
found++;
// remove matches to keep array lean
// messages with missing VAAs will be kept in the array
const [matching] = missingVaaMessages.splice(matchingIndex, 1);
foundKeys.push(matching.id);
}
} catch (e) {}
}
}
this.logger.info(`processed ${total} messages, found ${found}, missing ${total - found}`);
this.updateMessageStatuses(foundKeys);
// attempt to fetch VAAs missing from messages from the guardians and store them
// this is useful for cases where the VAA doesn't exist in the `signedVAAsTable` (perhaps due to an outage) but is available
const missingSignedVAARows: BigtableSignedVAAsRow[] = [];
for (const msg of missingVaaMessages) {
const { chain, emitter, sequence } = parseMessageId(msg.id);
const seq = sequence.toString();
const vaaBytes = await getSignedVAA(chain, emitter, seq);
if (vaaBytes) {
const key = makeSignedVAAsRowKey(chain, emitter, seq);
missingSignedVAARows.push({
key,
data: {
info: {
bytes: { value: vaaBytes, timestamp: '0' },
},
},
});
}
}
this.storeSignedVAAs(missingSignedVAARows);
this.publishSignedVAAs(missingSignedVAARows.map((r) => r.key));
// TODO: add slack message alerts
} catch (e) {
this.logger.error(e);
}
await sleep(WATCH_MISSING_TIMEOUT);
}
}
async storeSignedVAAs(rows: BigtableSignedVAAsRow[]): Promise<void> {
const instance = this.bigtable.instance(this.instanceId);
const table = instance.table(this.signedVAAsTableId);
const chunks = chunkArray(rows, 1000);
for (const chunk of chunks) {
await table.insert(chunk);
this.logger.info(`wrote ${chunk.length} signed VAAs to the ${this.signedVAAsTableId} table`);
}
}
async publishSignedVAAs(keys: string[]): Promise<void> {
if (keys.length === 0) {
return;
}
try {
const topic = this.pubsub.topic(this.pubsubSignedVAATopic);
if (!(await topic.exists())) {
this.logger.error(`pubsub topic doesn't exist: ${this.publishSignedVAAs}`);
return;
}
for (const key of keys) {
await topic.publishMessage({ data: Buffer.from(key) });
}
this.logger.info(
`published ${keys.length} signed VAAs to pubsub topic: ${this.pubsubSignedVAATopic}`
);
} catch (e) {
this.logger.error(`pubsub error - ${e}`);
}
}
}

View File

@ -1,6 +1,6 @@
import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { getLogger, WormholeLogger } from '../utils/logger';
import { VaasByBlock } from './types';
import { VaaLog, VaasByBlock } from './types';
export class Database {
logger: WormholeLogger;
@ -14,10 +14,20 @@ export class Database {
}
return filteredVaasByBlock;
}
async getLastBlockByChain(chain: ChainName): Promise<string | null> {
throw new Error('Not Implemented');
}
async storeVaasByBlock(chain: ChainName, vaasByBlock: VaasByBlock): Promise<void> {
throw new Error('Not Implemented');
}
async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void> {
throw new Error('Not Implemented');
}
async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void> {
throw new Error('Not Implemented');
}
}

View File

@ -1,80 +1,105 @@
import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { readFileSync, writeFileSync } from 'fs';
import { DB_LAST_BLOCK_FILE, JSON_DB_FILE } from '../consts';
import { Database } from './Database';
import { DB, LastBlockByChain, VaasByBlock } from './types';
import * as mongoDB from "mongodb";
import { SequenceNumber } from '@mysten/sui.js';
import { LastBlockByChain, VaaLog, VaasByBlock } from './types';
import * as mongoDB from 'mongodb';
export const collections: { wormholeTx?: mongoDB.Collection } = {}
const ENCODING = 'utf8';
const WORMHOLE_TX_COLLECTION: string = 'wormholeTx';
const WORMHOLE_LAST_BLOCK_COLLECTION: string = 'lastBlockByChain';
export class MongoDatabase extends Database {
lastBlockByChain: LastBlockByChain;
dbFile: string;
dbLastBlockFile: string;
client: mongoDB.MongoClient;
db: mongoDB.Db;
wormholeTx: mongoDB.Collection;
private client: mongoDB.MongoClient | null = null;
private db: mongoDB.Db | null = null;
private wormholeTx: mongoDB.Collection | null = null;
private lastTxBlockByChain: mongoDB.Collection | null = null;
private lastBlockByChain: LastBlockByChain | null = null;
constructor() {
super();
this.client = new mongoDB.MongoClient("mongodb://localhost:27017");
this.client.connect();
this.db = this.client.db("wormhole");
this.wormholeTx = this.db.collection("wormholeTx");
// collections.games = gamesCollection;
console.log(`Successfully connected to database: ${this.db.databaseName} `);
//this.db = client.db("wormhole");
this.lastBlockByChain = {};
if (!process.env.DB_LAST_BLOCK_FILE) {
this.logger.info(`no db file set, using default path=${DB_LAST_BLOCK_FILE}`);
}
this.dbFile = JSON_DB_FILE;
this.dbLastBlockFile = DB_LAST_BLOCK_FILE;
this.lastBlockByChain = null;
try {
const rawLast = readFileSync(this.dbLastBlockFile, ENCODING);
this.lastBlockByChain = JSON.parse(rawLast);
this.client = new mongoDB.MongoClient(process.env.MONGODB_URI as string);
this.connectDB();
this.db = this.client.db(process.env.MONGODB_DATABASE ?? 'wormhole');
this.wormholeTx = this.db.collection(WORMHOLE_TX_COLLECTION);
this.lastTxBlockByChain = this.db.collection(WORMHOLE_LAST_BLOCK_COLLECTION);
} catch (e) {
this.logger.warn('Failed to load DB, initiating a fresh one.');
throw new Error(`(MongoDB) Error: ${e}`);
}
}
async connectDB() {
await this.client?.connect();
console.log(`Successfully connected to database: ${this.db?.databaseName} `);
}
async getLastBlockByChainFromDB() {
const latestBlocks = await this.lastTxBlockByChain?.findOne({});
const json = JSON.parse(JSON.stringify(latestBlocks));
this.lastBlockByChain = json;
}
async getLastBlockByChain(chain: ChainName): Promise<string | null> {
if (!this.lastBlockByChain) await this.getLastBlockByChainFromDB();
const chainId = coalesceChainId(chain);
const blockInfo = this.lastBlockByChain[chainId];
const blockInfo: string | undefined = this.lastBlockByChain?.[chainId];
if (blockInfo) {
const tokens = blockInfo.split('/');
const tokens = String(blockInfo)?.split('/');
return chain === 'aptos' ? tokens.at(-1)! : tokens[0];
}
return null;
}
async storeVaasByBlock(chain: ChainName, vaasByBlock: VaasByBlock): Promise<void> {
const chainId = coalesceChainId(chain);
const filteredVaasByBlock = Database.filterEmptyBlocks(vaasByBlock);
if (Object.keys(filteredVaasByBlock).length) {
}
// const chainId = coalesceChainId(chain);
// const filteredVaasByBlock = Database.filterEmptyBlocks(vaasByBlock);
// if (Object.keys(filteredVaasByBlock).length) {
// }
// this will always overwrite the "last" block, so take caution if manually backfilling gaps
const blockKeys = Object.keys(vaasByBlock).sort(
(bk1, bk2) => Number(bk1.split('/')[0]) - Number(bk2.split('/')[0])
);
if (blockKeys.length) {
this.lastBlockByChain[chainId] = blockKeys[blockKeys.length - 1];
this.wormholeTx.insertOne({chainId: chainId, block: this.lastBlockByChain[chainId], data: vaasByBlock});
//writeFileSync(this.dbLastBlockFile, JSON.stringify(this.lastBlockByChain), ENCODING);
}
// const blockKeys = Object.keys(vaasByBlock).sort(
// (bk1, bk2) => Number(bk1.split('/')[0]) - Number(bk2.split('/')[0]),
// );
// if (blockKeys.length) {
// this.lastBlockByChain[chainId] = blockKeys[blockKeys.length - 1];
// await this.wormholeTx.insertOne({
// chainId: chainId,
// block: this.lastBlockByChain[chainId],
// data: vaasByBlock,
// });
// }
}
async storeVaa(chain: ChainName, txHash: string, vaa_id:string, payload: string): Promise<void> {
async storeVaaLogs(chain: ChainName, vaaLogs: VaaLog[]): Promise<void> {
await this.wormholeTx?.insertMany(vaaLogs);
}
async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void> {
const chainId = coalesceChainId(chain);
this.wormholeTx.insertOne({chainId: chainId, txHash: txHash, vaa_id: vaa_id, payload: payload});
await this.lastTxBlockByChain?.findOneAndUpdate(
{},
{
$set: {
[chainId]: lastBlock,
updatedAt: new Date().getTime(),
},
},
{
upsert: true,
},
);
}
async storeVaa(chain: ChainName, txHash: string, vaa_id: string, payload: string): Promise<void> {
const chainId = coalesceChainId(chain);
this.wormholeTx?.insertOne({
chainId: chainId,
txHash: txHash,
vaa_id: vaa_id,
payload: payload,
});
}
}

View File

@ -1,5 +1,16 @@
import { ChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { Row } from '@google-cloud/bigtable';
export type VaaLog = {
id: string;
chainId: number;
chainName: string;
emitter: string;
sequence: number;
txHash: string;
sender: string;
payload: any;
blockNumber: number;
};
export type VaasByBlock = { [blockInfo: string]: string[] };
export type DB = { [chain in ChainId]?: VaasByBlock };
export type LastBlockByChain = { [chain in ChainId]?: string };

View File

@ -1,15 +1,9 @@
import { ChainId, ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import {
INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN,
MAX_UINT_64,
padUint16,
padUint64,
} from '../common';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN, MAX_UINT_64, padUint16, padUint64 } from '../common';
import { DB_SOURCE } from '../consts';
import { BigtableDatabase } from './BigtableDatabase';
import { Database } from './Database';
import { JsonDatabase } from './JsonDatabase';
import { VaasByBlock } from './types';
import { VaaLog, VaasByBlock } from './types';
import { MongoDatabase } from './MongoDB';
// Bigtable Message ID format
@ -20,10 +14,10 @@ export function makeMessageId(
chainId: number,
block: string,
emitter: string,
sequence: string
sequence: string,
): string {
return `${padUint16(chainId.toString())}/${padUint64(
(BigInt(MAX_UINT_64) - BigInt(block)).toString()
(BigInt(MAX_UINT_64) - BigInt(block)).toString(),
)}/${emitter}/${padUint64(sequence)}`;
}
@ -49,7 +43,7 @@ export const makeVaaKey = (
transactionHash: string,
chain: ChainId | ChainName,
emitter: string,
seq: string
seq: string,
): string => `${transactionHash}:${coalesceChainId(chain)}/${emitter}/${seq}`;
// make a bigtable row key for the `vaasByTxHash` table
@ -62,10 +56,7 @@ export const makeSignedVAAsRowKey = (chain: number, emitter: string, sequence: s
let database: Database = new Database();
export const initDb = (): Database => {
if (DB_SOURCE === 'bigtable') {
database = new BigtableDatabase();
(database as BigtableDatabase).watchMissing();
} else if (DB_SOURCE === 'mongo') {
if (DB_SOURCE === 'mongo') {
database = new MongoDatabase();
} else {
database = new JsonDatabase();
@ -76,6 +67,7 @@ export const initDb = (): Database => {
export const getResumeBlockByChain = async (chain: ChainName): Promise<number | null> => {
const lastBlock = await database.getLastBlockByChain(chain);
const initialBlock = INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN[chain];
return lastBlock !== null
? Number(lastBlock) + 1
: initialBlock !== undefined
@ -85,11 +77,19 @@ export const getResumeBlockByChain = async (chain: ChainName): Promise<number |
export const storeVaasByBlock = async (
chain: ChainName,
vaasByBlock: VaasByBlock
vaasByBlock: VaasByBlock,
): Promise<void> => {
return database.storeVaasByBlock(chain, vaasByBlock);
};
export const storeVaaLogs = (chain: ChainName, vaaLogs: VaaLog[]): Promise<void> => {
return database.storeVaaLogs(chain, vaaLogs);
};
export const storeLatestProcessBlock = (chain: ChainName, lastBlock: number): Promise<void> => {
return database.storeLatestProcessBlock(chain, lastBlock);
};
export function printRow(rowkey: string, rowData: { [x: string]: any }) {
console.log(`Reading data for ${rowkey}:`);

View File

@ -1,60 +1,74 @@
import * as dotenv from 'dotenv';
dotenv.config();
import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { ChainName, EVMChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { initDb } from './databases/utils';
import { makeFinalizedWatcher } from './watchers/utils';
import { InfrastructureController } from "./infrastructure/infrastructure.controller";
import { createServer } from "./builder/server";
import { InfrastructureController } from './infrastructure/infrastructure.controller';
import { createServer } from './builder/server';
// EVM Chains not supported
// aurora, gnosis, neon, sepolia
initDb();
const evmChains: EVMChainName[] = [
'acala',
'arbitrum',
'avalanche',
'base',
'bsc',
'celo',
'ethereum',
'fantom',
'karura',
'klaytn',
'moonbeam',
'oasis',
'optimism',
'polygon',
];
const supportedChains: ChainName[] = [
...evmChains,
'algorand',
'aptos',
'injective',
'near',
'solana',
'sui',
'terra',
'terra2',
'xpla',
];
const db = initDb();
const infrastructureController = new InfrastructureController();
const startServer = async () => {
const port = Number(process.env.PORT) || 3005;
const server = await createServer(port);
const server = await createServer(port);
server.get("/ready", { logLevel: "silent" }, infrastructureController.ready);
server.get("/health",{ logLevel: "silent" }, infrastructureController.health);
server.get('/ready', { logLevel: 'silent' }, infrastructureController.ready);
server.get('/health', { logLevel: 'silent' }, infrastructureController.health);
server.listen({ port, host: "0.0.0.0" }, (err: any, address: any) => {
server.listen({ port, host: '0.0.0.0' }, (err: any, address: any) => {
if (err) {
process.exit(1);
}
console.log(`Server listening at ${address}`);
});
}
};
startServer();
const supportedChains: ChainName[] = [
'solana',
'ethereum',
'bsc',
'polygon',
'avalanche',
'oasis',
'algorand',
'fantom',
'karura',
'acala',
'klaytn',
'celo',
'moonbeam',
'arbitrum',
'optimism',
'aptos',
'near',
'terra2',
'terra',
'xpla',
'injective',
'sui',
'base',
];
const start = async () => {
// We wait to the database to fetch the `latestBlocks` (avoid multi requests)
// Im trying not to change too much the codebase.
await db.getLastBlockByChain('unset');
for (const chain of supportedChains) {
makeFinalizedWatcher(chain).watch();
}
// for (const chain of supportedChains) {
for (const chain of evmChains) {
makeFinalizedWatcher(chain).watch();
}
};
start();

View File

@ -0,0 +1,109 @@
import crypto from 'node:crypto';
import {
SNSClient,
PublishCommand,
PublishCommandInput,
PublishBatchCommand,
PublishBatchCommandInput,
PublishBatchRequestEntry,
} from '@aws-sdk/client-sns';
import { SNSConfig, SNSImplementation, SNSInput, SNSPublishMessageOutput } from '../types';
class AwsSNS implements SNSImplementation {
private client: SNSClient | null = null;
private subject: string | null = null;
private topicArn: string | null = null;
constructor(private config: SNSConfig) {
const { region, credentials, subject, topicArn } = this.config;
this.subject = subject;
this.topicArn = topicArn;
this.client = new SNSClient({
region,
credentials,
});
}
async publishMessage({ subject, message }: SNSInput): Promise<SNSPublishMessageOutput> {
const input: PublishCommandInput = {
TopicArn: this.topicArn!,
Subject: subject ?? this.subject!,
Message: message,
};
try {
const command = new PublishCommand(input);
await this.client?.send(command);
} catch (error) {
console.error(error);
return {
status: 'error',
};
}
return {
status: 'success',
};
}
async publishMessages(messages: SNSInput[]): Promise<SNSPublishMessageOutput> {
const CHUNK_SIZE = 10;
const batches: PublishBatchCommandInput[] = [];
const inputs: PublishBatchRequestEntry[] = messages.map(({ subject, message }) => ({
Id: crypto.randomUUID(),
Subject: subject ?? this.subject!,
Message: message,
}));
// PublishBatchCommand: only supports max 10 items per batch
for (let i = 0; i <= inputs.length; i += CHUNK_SIZE) {
const batch: PublishBatchCommandInput = {
TopicArn: this.topicArn!,
PublishBatchRequestEntries: inputs.slice(i, i + CHUNK_SIZE),
};
batches.push(batch);
}
try {
const promises = [];
const errors = [];
for (const batch of batches) {
const command = new PublishBatchCommand(batch);
promises.push(this.client?.send(command));
}
const results = await Promise.allSettled(promises);
for (const result of results) {
if (result.status !== 'fulfilled') {
console.error(result.reason);
errors.push(result.reason);
}
}
if (errors.length > 0) {
console.error(errors);
return {
status: 'error',
reasons: errors,
};
}
} catch (error) {
console.error(error);
return {
status: 'error',
};
}
return {
status: 'success',
};
}
}
export default AwsSNS;

View File

@ -0,0 +1,25 @@
export interface SNSImplementation {
publishMessage(message: SNSInput): Promise<SNSPublishMessageOutput>;
publishMessages(messages: SNSInput[]): Promise<SNSPublishMessageOutput>;
}
export interface SNSConfig {
region: string;
topicArn: string;
subject: string;
credentials: {
accessKeyId: string;
secretAccessKey: string;
};
}
export interface SNSInput {
subject?: string;
message: string;
}
export interface SNSPublishMessageOutput {
status: 'success' | 'error';
reason?: string;
reasons?: string[];
}

View File

@ -1,10 +1,14 @@
import { Implementation__factory } from '@certusone/wormhole-sdk/lib/cjs/ethers-contracts/factories/Implementation__factory';
import { CONTRACTS, EVMChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import {
CONTRACTS,
EVMChainName,
coalesceChainId,
} from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { Log } from '@ethersproject/abstract-provider';
import axios from 'axios';
import { BigNumber } from 'ethers';
import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts';
import { VaasByBlock } from '../databases/types';
import { VaaLog, VaasByBlock } from '../databases/types';
import { makeBlockKey, makeVaaKey } from '../databases/utils';
import { Watcher } from './Watcher';
@ -60,14 +64,8 @@ export class EVMWatcher extends Watcher {
false,
],
},
{
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer zpka_213d294a9a5a44619cd6a02e55a20417_5f43e4d0'
}
}
],
AXIOS_CONFIG_JSON
AXIOS_CONFIG_JSON,
)
)?.data?.[0];
if (result && result.result === null) {
@ -100,7 +98,7 @@ export class EVMWatcher extends Watcher {
};
}
throw new Error(
`Unable to parse result of eth_getBlockByNumber for ${blockNumberOrTag} on ${rpc}`
`Unable to parse result of eth_getBlockByNumber for ${blockNumberOrTag} on ${rpc}`,
);
}
async getBlocks(fromBlock: number, toBlock: number): Promise<Block[]> {
@ -151,20 +149,20 @@ export class EVMWatcher extends Watcher {
}
console.error(reqs[idx], response, idx);
throw new Error(
`Unable to parse result of eth_getBlockByNumber for ${fromBlock + idx} on ${rpc}`
`Unable to parse result of eth_getBlockByNumber for ${fromBlock + idx} on ${rpc}`,
);
}
},
);
}
throw new Error(
`Unable to parse result of eth_getBlockByNumber for range ${fromBlock}-${toBlock} on ${rpc}`
`Unable to parse result of eth_getBlockByNumber for range ${fromBlock}-${toBlock} on ${rpc}`,
);
}
async getLogs(
fromBlock: number,
toBlock: number,
address: string,
topics: string[]
topics: string[],
): Promise<Array<Log>> {
const rpc = RPCS_BY_CHAIN[this.chain];
if (!rpc) {
@ -188,7 +186,7 @@ export class EVMWatcher extends Watcher {
],
},
],
AXIOS_CONFIG_JSON
AXIOS_CONFIG_JSON,
)
)?.data?.[0]?.result;
if (result) {
@ -227,11 +225,12 @@ export class EVMWatcher extends Watcher {
vaasByBlock[makeBlockKey(block.number.toString(), timestamp)] = [];
}
this.logger.info(`processing ${logs.length} logs`);
for (const log of logs) {
const blockNumber = log.blockNumber;
const emitter = log.topics[1].slice(2);
const {
args: { sequence },
args: { sequence, sender, payload },
} = wormholeInterface.parseLog(log);
const vaaKey = makeVaaKey(log.transactionHash, this.chain, emitter, sequence.toString());
const blockKey = makeBlockKey(blockNumber.toString(), timestampsByBlock[blockNumber]);
@ -239,4 +238,46 @@ export class EVMWatcher extends Watcher {
}
return vaasByBlock;
}
async getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
const vaaLogs: VaaLog[] = [];
const address = CONTRACTS.MAINNET[this.chain].core;
if (!address) {
throw new Error(`Core contract not defined for ${this.chain}`);
}
// this.logger.info(`fetching info for blocks ${fromBlock} to ${toBlock}`);
const logs = await this.getLogs(fromBlock, toBlock, address, [LOG_MESSAGE_PUBLISHED_TOPIC]);
this.logger.info(`processing ${logs.length} logs`);
for (const log of logs) {
// console.log('log', log);
// console.log('parseLog', wormholeInterface.parseLog(log));
const { args } = wormholeInterface.parseLog(log);
const { sequence, sender, payload } = args || {};
const chainName = this.chain;
const blockNumber = log.blockNumber;
const emitter = log.topics[1].slice(2);
const chainId = coalesceChainId(this.chain);
const vaaId = `${chainId}/${emitter}/${sequence.toString()}`;
const vaaLog: VaaLog = {
id: vaaId,
chainName,
chainId,
emitter,
sequence: sequence.toString(),
txHash: log.transactionHash,
sender,
payload,
blockNumber,
};
vaaLogs.push(vaaLog);
}
return vaaLogs;
}
}

View File

@ -1,22 +1,38 @@
import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import {
INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN,
sleep,
} from '../common';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN, sleep } from '../common';
import { z } from 'zod';
import { TIMEOUT } from '../consts';
import { VaasByBlock } from '../databases/types';
import { getResumeBlockByChain, storeVaasByBlock } from '../databases/utils';
import { VaaLog, VaasByBlock } from '../databases/types';
import {
getResumeBlockByChain,
storeVaaLogs,
storeVaasByBlock,
storeLatestProcessBlock,
} from '../databases/utils';
import { getLogger, WormholeLogger } from '../utils/logger';
import AwsSNS from '../services/SNS/AwsSNS';
import { SNSConfig, SNSInput } from '../services/SNS/types';
const config: SNSConfig = {
region: process.env.AWS_SNS_REGION as string,
subject: process.env.AWS_SNS_SUBJECT as string,
topicArn: process.env.AWS_TOPIC_ARN as string,
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID as string,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY as string,
},
};
export class Watcher {
chain: ChainName;
logger: WormholeLogger;
maximumBatchSize: number = 100;
SNSClient: AwsSNS;
constructor(chain: ChainName) {
this.chain = chain;
this.logger = getLogger(chain);
this.SNSClient = new AwsSNS(config);
}
async getFinalizedBlockNumber(): Promise<number> {
@ -27,6 +43,10 @@ export class Watcher {
throw new Error('Not Implemented');
}
async getVaaLogs(fromBlock: number, toBlock: number): Promise<VaaLog[]> {
throw new Error('Not Implemented');
}
isValidBlockKey(key: string) {
try {
const [block, timestamp] = key.split('/');
@ -51,16 +71,36 @@ export class Watcher {
let toBlock: number | null = null;
let fromBlock: number | null = await getResumeBlockByChain(this.chain);
let retry = 0;
while (true) {
try {
if (fromBlock !== null && toBlock !== null && fromBlock <= toBlock) {
// fetch logs for the block range, inclusive of toBlock
toBlock = Math.min(fromBlock + this.maximumBatchSize - 1, toBlock);
this.logger.info(`fetching messages from ${fromBlock} to ${toBlock}`);
const vaasByBlock = await this.getMessagesForBlocks(fromBlock, toBlock);
await storeVaasByBlock(this.chain, vaasByBlock);
// const vaasByBlock = await this.getMessagesForBlocks(fromBlock, toBlock);
// await storeVaasByBlock(this.chain, vaasByBlock);
// Here we get all the vaa logs from LOG_MESSAGE_PUBLISHED_TOPIC
// Then store the latest processed block by Chain Id
try {
const vaaLogs = await this.getVaaLogs(fromBlock, toBlock);
if (vaaLogs?.length > 0) {
await storeVaaLogs(this.chain, vaaLogs);
const messages: SNSInput[] = vaaLogs.map((log) => ({
message: JSON.stringify({ ...log }),
}));
this.SNSClient.publishMessages(messages);
}
await storeLatestProcessBlock(this.chain, toBlock);
} catch (e) {
this.logger.error(e);
}
fromBlock = toBlock + 1;
}
try {
this.logger.info('fetching finalized block');
toBlock = await this.getFinalizedBlockNumber();

View File

@ -1,4 +1,4 @@
import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { ChainName, EVMChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { AlgorandWatcher } from './AlgorandWatcher';
import { AptosWatcher } from './AptosWatcher';
import { ArbitrumWatcher } from './ArbitrumWatcher';
@ -17,22 +17,16 @@ import { SuiWatcher } from './SuiWatcher';
export function makeFinalizedWatcher(chainName: ChainName): Watcher {
if (chainName === 'solana') {
return new SolanaWatcher();
} else if (chainName === 'ethereum' || chainName === 'karura' || chainName === 'acala') {
return new EVMWatcher(chainName, 'finalized');
} else if (['ethereum', 'karura', 'acala'].includes(chainName)) {
return new EVMWatcher(chainName as EVMChainName, 'finalized');
} else if (chainName === 'bsc') {
return new BSCWatcher();
} else if (chainName === 'polygon') {
return new PolygonWatcher();
} else if (
chainName === 'avalanche' ||
chainName === 'oasis' ||
chainName === 'fantom' ||
chainName === 'klaytn' ||
chainName === 'celo' ||
chainName === 'optimism' ||
chainName === 'base'
['avalanche', 'oasis', 'fantom', 'klaytn', 'celo', 'optimism', 'base'].includes(chainName)
) {
return new EVMWatcher(chainName);
return new EVMWatcher(chainName as EVMChainName);
} else if (chainName === 'algorand') {
return new AlgorandWatcher();
} else if (chainName === 'moonbeam') {

View File

@ -14,6 +14,7 @@
"forceConsistentCasingInFileNames": true,
"noFallthroughCasesInSwitch": true,
"resolveJsonModule": true,
"noEmit": true,
"lib": ["es2022"]
},
"include": ["scripts", "src", "src/abi/*.json"]