Feat/evm transfer redeemed support (#727)

* feat(717): add support for transferRedeemed log message - EVM

* build: update .gitignore file
This commit is contained in:
Ricardo Olarte 2023-10-03 15:03:52 -05:00 committed by GitHub
parent 6be2607c65
commit bc8a3114aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 546 additions and 171 deletions

2
.gitignore vendored
View File

@ -10,6 +10,8 @@ dist/
db.json
lastBlockByChain.json
lastBlocksByChain.json
globalTxs.json
wormholeTxs.json
.env
serviceAccountKey.json
bigtableAccountKey.json

View File

@ -4,14 +4,17 @@ P2P_NETWORK=mainnet
LOG_DIR=
LOG_LEVEL=info
#Database source
DB_SOURCE=mongo
JSON_DB_FILE=db.json
JSON_LAST_BLOCK_FILE=lastBlockByChain.json
#Server
PORT=3005
#Database source (json or mongo)
DB_SOURCE=
#JsonDB
JSON_WH_TXS_FILE=wormholeTxs.json
JSON_GLOBAL_TXS_FILE=globalTxs.json
JSON_LAST_BLOCKS_FILE=lastBlocksByChain.json
#MongoDB
MONGODB_URI=mongodb://localhost:27017
MONGODB_DATABASE=wormhole

View File

@ -8,8 +8,10 @@ export const env = {
LOG_LEVEL: process.env.LOG_LEVEL || 'info',
DB_SOURCE: process.env.DB_SOURCE || 'local',
JSON_DB_FILE: process.env.JSON_DB_FILE || './db.json',
JSON_LAST_BLOCK_FILE: process.env.JSON_LAST_BLOCK_FILE || './lastBlockByChain.json',
JSON_WH_TXS_FILE: process.env.JSON_WH_TXS_FILE || './wormholeTxs.json',
JSON_GLOBAL_TXS_FILE: process.env.JSON_GLOBAL_TXS_FILE || './globalTxs.json',
JSON_LAST_BLOCKS_FILE: process.env.JSON_LAST_BLOCKS_FILE || './lastBlocksByChain.json',
PORT: process.env.PORT,

View File

@ -5,7 +5,7 @@ import {
} from '../common/consts';
import { NETWORK } from '../consts';
import { getLogger, WormholeLogger } from '../utils/logger';
import { DBImplementation, LastBlockByChain, WHTransaction } from './types';
import { DBImplementation, LastBlockByChain, WHTransaction, WHTransferRedeemed } from './types';
abstract class BaseDB implements DBImplementation {
public logger: WormholeLogger;
public lastBlocksByChain: LastBlockByChain[] = [];
@ -66,6 +66,7 @@ abstract class BaseDB implements DBImplementation {
abstract isConnected(): Promise<boolean>;
abstract getLastBlocksProcessed(): Promise<void>;
abstract storeWhTxs(chain: ChainName, whTxs: WHTransaction[]): Promise<void>;
abstract storeRedeemedTxs(chain: ChainName, redeemedTxs: WHTransferRedeemed[]): Promise<void>;
abstract storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void>;
}

View File

@ -2,32 +2,42 @@ import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/util
import { readFileSync, writeFileSync } from 'fs';
import { env } from '../config';
import BaseDB from './BaseDB';
import { WHTransaction } from './types';
import { WHTransaction, WHTransferRedeemed } from './types';
const ENCODING = 'utf8';
const WORMHOLE_TX_FILE: string = env.JSON_WH_TXS_FILE;
const GLOBAL_TX_FILE: string = env.JSON_GLOBAL_TXS_FILE;
const WORMHOLE_LAST_BLOCKS_FILE: string = env.JSON_LAST_BLOCKS_FILE;
export default class JsonDB extends BaseDB {
db: WHTransaction[] = [];
dbFile: string;
dbLastBlockFile: string;
wormholeTxFile: WHTransaction[] = [];
redeemedTxFile: WHTransferRedeemed[] = [];
constructor() {
super('JsonDB');
this.db = [];
this.wormholeTxFile = [];
this.redeemedTxFile = [];
this.lastBlocksByChain = [];
this.dbFile = env.JSON_DB_FILE;
this.dbLastBlockFile = env.JSON_LAST_BLOCK_FILE;
this.logger.info('Connecting...');
}
async connect(): Promise<void> {
try {
const rawDb = readFileSync(this.dbFile, ENCODING);
this.db = rawDb ? JSON.parse(rawDb) : [];
this.logger.info(`${this.dbFile} file ready`);
const whTxsFileRawData = readFileSync(WORMHOLE_TX_FILE, ENCODING);
this.wormholeTxFile = whTxsFileRawData ? JSON.parse(whTxsFileRawData) : [];
this.logger.info(`${WORMHOLE_TX_FILE} file ready`);
} catch (e) {
this.logger.warn(`${this.dbFile} file does not exists, creating new file`);
this.db = [];
this.logger.warn(`${WORMHOLE_TX_FILE} file does not exists, creating new file`);
this.wormholeTxFile = [];
}
try {
const whRedeemedTxsFileRawData = readFileSync(GLOBAL_TX_FILE, ENCODING);
this.redeemedTxFile = whRedeemedTxsFileRawData ? JSON.parse(whRedeemedTxsFileRawData) : [];
this.logger.info(`${GLOBAL_TX_FILE} file ready`);
} catch (e) {
this.logger.warn(`${GLOBAL_TX_FILE} file does not exists, creating new file`);
this.redeemedTxFile = [];
}
}
@ -42,11 +52,11 @@ export default class JsonDB extends BaseDB {
async getLastBlocksProcessed(): Promise<void> {
try {
const lastBlocksByChain = readFileSync(this.dbLastBlockFile, ENCODING);
const lastBlocksByChain = readFileSync(WORMHOLE_LAST_BLOCKS_FILE, ENCODING);
this.lastBlocksByChain = lastBlocksByChain ? JSON.parse(lastBlocksByChain) : [];
this.logger.info(`${this.dbLastBlockFile} file ready`);
this.logger.info(`${WORMHOLE_LAST_BLOCKS_FILE} file ready`);
} catch (e) {
this.logger.warn(`${this.dbLastBlockFile} file does not exists, creating new file`);
this.logger.warn(`${WORMHOLE_LAST_BLOCKS_FILE} file does not exists, creating new file`);
this.lastBlocksByChain = [];
}
}
@ -54,7 +64,7 @@ export default class JsonDB extends BaseDB {
override async storeWhTxs(chainName: ChainName, whTxs: WHTransaction[]): Promise<void> {
try {
for (let i = 0; i < whTxs.length; i++) {
let message = 'Save VAA log to JsonDB';
let message = 'Insert Wormhole Transaction Event Log to JSON file';
const currentWhTx = whTxs[i];
const { id } = currentWhTx;
@ -62,20 +72,20 @@ export default class JsonDB extends BaseDB {
? Buffer.from(currentWhTx.eventLog.unsignedVaa).toString('base64')
: currentWhTx.eventLog.unsignedVaa;
const whTxIndex = this.db?.findIndex((whTx) => whTx.id === id.toString());
const whTxIndex = this.wormholeTxFile?.findIndex((whTx) => whTx.id === id.toString());
if (whTxIndex >= 0) {
const whTx = this.db[whTxIndex];
const whTx = this.wormholeTxFile[whTxIndex];
whTx.eventLog.updatedAt = new Date();
whTx.eventLog.revision ? (whTx.eventLog.revision += 1) : (whTx.eventLog.revision = 1);
message = 'Update VAA log to MongoDB';
message = 'Update Wormhole Transaction Event Log to JSON file';
} else {
this.db.push(currentWhTx);
this.wormholeTxFile.push(currentWhTx);
}
writeFileSync(this.dbFile, JSON.stringify(this.db, null, 2), ENCODING);
writeFileSync(WORMHOLE_TX_FILE, JSON.stringify(this.wormholeTxFile, null, 2), ENCODING);
if (currentWhTx) {
const { id, eventLog } = currentWhTx;
@ -92,7 +102,68 @@ export default class JsonDB extends BaseDB {
}
}
} catch (e: unknown) {
this.logger.error(`Error while storing VAA logs: ${e}`);
this.logger.error(`Error Upsert Wormhole Transaction Event Log: ${e}`);
}
}
override async storeRedeemedTxs(
chainName: ChainName,
redeemedTxs: WHTransferRedeemed[],
): Promise<void> {
// For JsonDB we are only pushing all the "redeemed" logs into GLOBAL_TX_FILE simulating a globalTransactions collection
try {
for (let i = 0; i < redeemedTxs.length; i++) {
let message = 'Insert Wormhole Transfer Redeemed Event Log to JSON file';
const currentRedeemedTx = redeemedTxs[i];
const { id, destinationTx } = currentRedeemedTx;
const { method, status } = destinationTx;
const whTxIndex = this.wormholeTxFile?.findIndex((whTx) => whTx.id === id.toString());
if (whTxIndex >= 0) {
const whTx = this.wormholeTxFile[whTxIndex];
whTx.status = status;
whTx.eventLog.updatedAt = new Date();
whTx.eventLog.revision ? (whTx.eventLog.revision += 1) : (whTx.eventLog.revision = 1);
writeFileSync(WORMHOLE_TX_FILE, JSON.stringify(this.wormholeTxFile, null, 2), ENCODING);
}
const whRedeemedTxIndex = this.redeemedTxFile?.findIndex(
(whRedeemedTx) => whRedeemedTx.id === id.toString(),
);
if (whRedeemedTxIndex >= 0) {
const whRedeemedTx = this.redeemedTxFile[whRedeemedTxIndex];
whRedeemedTx.destinationTx.method = method;
whRedeemedTx.destinationTx.status = status;
whRedeemedTx.destinationTx.updatedAt = new Date();
whRedeemedTx.revision ? (whRedeemedTx.revision += 1) : (whRedeemedTx.revision = 1);
message = 'Update Wormhole Transfer Redeemed Event Log to JSON file';
} else {
this.redeemedTxFile.push(currentRedeemedTx);
}
writeFileSync(GLOBAL_TX_FILE, JSON.stringify(this.redeemedTxFile, null, 2), ENCODING);
if (currentRedeemedTx) {
const { id, destinationTx } = currentRedeemedTx;
const { chainId } = destinationTx;
this.logger.info({
id,
chainId,
chainName,
message,
});
}
}
} catch (e: unknown) {
this.logger.error(`Error Upsert Wormhole Transfer Redeemed Event Log: ${e}`);
}
}
@ -124,12 +195,12 @@ export default class JsonDB extends BaseDB {
try {
writeFileSync(
this.dbLastBlockFile,
WORMHOLE_LAST_BLOCKS_FILE,
JSON.stringify(this.lastBlocksByChain, null, 2),
ENCODING,
);
} catch (e: unknown) {
this.logger.error(`Error while storing latest processed block: ${e}`);
this.logger.error(`Error Insert latest processed block: ${e}`);
}
}
}

View File

@ -1,16 +1,18 @@
import { ChainName, coalesceChainId } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import BaseDB from './BaseDB';
import { LastBlockByChain, WHTransaction } from './types';
import { LastBlockByChain, WHTransaction, WHTransferRedeemed } from './types';
import * as mongoDB from 'mongodb';
import { env } from '../config';
const WORMHOLE_TX_COLLECTION: string = 'wormholeTxs';
const GLOBAL_TX_COLLECTION: string = 'globalTransactions';
const WORMHOLE_LAST_BLOCK_COLLECTION: string = 'lastBlocksByChain';
export default class MongoDB extends BaseDB {
private client: mongoDB.MongoClient | null = null;
private db: mongoDB.Db | null = null;
private wormholeTxCollection: mongoDB.Collection | null = null;
private globalTxCollection: mongoDB.Collection | null = null;
private lastTxBlockByChainCollection: mongoDB.Collection | null = null;
constructor() {
@ -19,6 +21,7 @@ export default class MongoDB extends BaseDB {
this.client = new mongoDB.MongoClient(env.MONGODB_URI as string);
this.db = this.client.db(env.MONGODB_DATABASE ?? 'wormhole');
this.wormholeTxCollection = this.db.collection(WORMHOLE_TX_COLLECTION);
this.globalTxCollection = this.db.collection(GLOBAL_TX_COLLECTION);
this.lastTxBlockByChainCollection = this.db.collection(WORMHOLE_LAST_BLOCK_COLLECTION);
}
@ -60,8 +63,7 @@ export default class MongoDB extends BaseDB {
override async storeWhTxs(chainName: ChainName, whTxs: WHTransaction[]): Promise<void> {
try {
for (let i = 0; i < whTxs.length; i++) {
let _upsertedId = null;
let message = 'Save VAA log to MongoDB';
let message = 'Insert Wormhole Transaction Event Log to MongoDB collection';
const currentWhTx = whTxs[i];
const { id, ...rest } = currentWhTx;
@ -69,7 +71,7 @@ export default class MongoDB extends BaseDB {
const whTxDocument = await this.wormholeTxCollection?.findOne({ _id: id });
if (whTxDocument) {
const response = await this.wormholeTxCollection?.findOneAndUpdate(
await this.wormholeTxCollection?.findOneAndUpdate(
{
// @ts-ignore - I want to pass a custom _id field, but TypeScript doesn't like it (ObjectId error)
_id: id,
@ -82,18 +84,18 @@ export default class MongoDB extends BaseDB {
'eventLog.revision': 1,
},
},
{
returnDocument: 'after',
},
);
_upsertedId = response?.upsertedId;
message = 'Update VAA log to MongoDB';
message = 'Update Wormhole Transaction Event Log to MongoDB collection';
} else {
const response = await this.wormholeTxCollection?.insertOne({
...rest,
await this.wormholeTxCollection?.insertOne({
// @ts-ignore - I want to pass a custom _id field, but TypeScript doesn't like it (ObjectId error)
_id: id,
...rest,
});
_upsertedId = response?.insertedId;
}
if (currentWhTx) {
@ -111,7 +113,90 @@ export default class MongoDB extends BaseDB {
}
}
} catch (e: unknown) {
this.logger.error(`Error while storing VAA logs: ${e}`);
this.logger.error(`Error Upsert Wormhole Transaction Event Log: ${e}`);
}
}
override async storeRedeemedTxs(
chainName: ChainName,
redeemedTxs: WHTransferRedeemed[],
): Promise<void> {
try {
for (let i = 0; i < redeemedTxs.length; i++) {
const message = 'Update Wormhole Transfer Redeemed Event Log to MongoDB collection';
const currentWhRedeemedTx = redeemedTxs[i];
const { id, destinationTx } = currentWhRedeemedTx;
const { method, status } = destinationTx;
const whTxResponse = await this.wormholeTxCollection?.findOneAndUpdate(
{
// @ts-ignore - I want to pass a custom _id field, but TypeScript doesn't like it (ObjectId error)
_id: id,
},
{
$set: {
'eventLog.updatedAt': new Date(),
status: status,
},
$inc: {
'eventLog.revision': 1,
},
},
{
returnDocument: 'after',
},
);
if (!whTxResponse?.value) {
this.logger.error(
`Error Update Wormhole Transfer Redeemed Event Log: ${id} does not exist on ${WORMHOLE_TX_COLLECTION} collection`,
);
return;
}
const globalTxResponse = await this.globalTxCollection?.findOneAndUpdate(
{
// @ts-ignore - I want to pass a custom _id field, but TypeScript doesn't like it (ObjectId error)
_id: id,
},
{
$set: {
'destinationTx.method': method,
'destinationTx.status': status,
'destinationTx.updatedAt': new Date(),
},
$inc: {
revision: 1,
},
},
{
returnDocument: 'after',
},
);
if (!globalTxResponse?.value) {
this.logger.error(
`Error Update Wormhole Transfer Redeemed Event Log: ${id} does not exist on ${GLOBAL_TX_COLLECTION} collection`,
);
return;
}
if (currentWhRedeemedTx) {
const { id, destinationTx } = currentWhRedeemedTx;
const { chainId } = destinationTx;
this.logger.info({
id,
chainId,
chainName,
message,
});
}
}
} catch (e: unknown) {
this.logger.error(`Error Update Wormhole Transfer Redeemed Event Log: ${e}`);
}
}

View File

@ -1,4 +1,4 @@
import { ChainId, ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import JsonDB from './JsonDB';
import MongoDB from './MongoDB';
@ -10,6 +10,7 @@ export interface DBImplementation {
getLastBlocksProcessed(): Promise<void>;
getLastBlockByChain(chain: ChainName): string | null;
storeWhTxs(chain: ChainName, whTxs: WHTransaction[]): Promise<void>;
storeRedeemedTxs(chain: ChainName, redeemedTxs: WHTransferRedeemed[]): Promise<void>;
storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void>;
}
@ -51,3 +52,15 @@ type LastBlockByChainWith_Id = LastBlockItem & {
};
export type LastBlockByChain = LastBlockByChainWith_Id | LastBlockByChainWithId;
export type WHTransferRedeemed = {
id: string;
destinationTx: {
chainId: number;
status: string;
method: string;
timestamp: Date;
updatedAt: Date;
};
revision: number;
};

View File

@ -3,7 +3,7 @@ import { MAX_UINT_64, padUint16, padUint64 } from '../common';
import JsonDB from './JsonDB';
import MongoDB from './MongoDB';
import { env } from '../config';
import { DBOptionTypes, WHTransaction } from './types';
import { DBOptionTypes, WHTransaction, WHTransferRedeemed } from './types';
import { checkIfDateIsInMilliseconds } from '../utils/date';
// Bigtable Message ID format
@ -78,3 +78,29 @@ export const makeWHTransaction = async ({
status: WH_TX_STATUS,
};
};
export const makeWHRedeemedTransaction = async ({
emitterChainId,
emitterAddress,
sequence,
}: {
emitterChainId: number;
emitterAddress: string;
sequence: number;
}): Promise<WHTransferRedeemed> => {
const vaaId = `${emitterChainId}/${emitterAddress}/${sequence}`;
const REDEEMED_TX_STATUS = 'completed';
const REDEEMED_TX_METHOD = 'event-watcher-redeemed';
return {
id: vaaId,
destinationTx: {
chainId: emitterChainId,
status: REDEEMED_TX_STATUS,
method: REDEEMED_TX_METHOD,
timestamp: new Date(),
updatedAt: new Date(),
},
revision: 1,
};
};

View File

@ -35,8 +35,15 @@ class EventWatcher {
watcher.setDB(this.db);
watcher.setServices(this.sns);
watcher.watch();
} catch (error: unknown) {
logInfo({ labels: ['EventWatcher'], message: `${error}` });
} catch (e: unknown) {
let message;
if (e instanceof Error) {
message = e.message;
} else {
message = e;
}
logInfo({ labels: ['EventWatcher'], message: `${message}` });
}
}
}

View File

@ -1,17 +1,21 @@
import crypto from 'node:crypto';
import {
SNSClient,
PublishCommand,
PublishCommandInput,
PublishBatchCommand,
PublishBatchCommandInput,
PublishBatchRequestEntry,
} from '@aws-sdk/client-sns';
import { AwsSNSConfig, SNSInput, SNSMessage, SNSPublishMessageOutput } from '../types';
import {
AwsSNSConfig,
SNSInput,
SNSPublishMessageOutput,
WhEventType,
WhTxSNSMessage,
} from '../types';
import BaseSNS from '../BaseSNS';
import { env } from '../../../config';
import { WHTransaction } from '../../../databases/types';
import { makeSnsMessage } from '../utils';
import { WHTransaction, WHTransferRedeemed } from '../../../databases/types';
import { makeRedeemedTxSnsMessage, makeWhTxSnsMessage } from '../utils';
import { ChainId, coalesceChainName } from '@certusone/wormhole-sdk';
const isDev = env.NODE_ENV !== 'production';
@ -39,69 +43,43 @@ class AwsSNS extends BaseSNS {
this.logger.info('Client initialized');
}
makeSNSInput(whTx: WHTransaction): SNSInput {
const snsMessage = makeSnsMessage(whTx, this.metadata);
makeSNSInput(data: WHTransaction | WHTransferRedeemed, eventType: WhEventType): SNSInput {
let snsMessage;
let deduplicationId;
if (eventType === 'whTx') {
const whTx = data as WHTransaction;
snsMessage = makeWhTxSnsMessage(whTx, this.metadata);
deduplicationId = whTx.id;
}
if (eventType === 'redeemedTx') {
const redeemedTx = data as WHTransferRedeemed;
snsMessage = makeRedeemedTxSnsMessage(redeemedTx, this.metadata);
deduplicationId = 'redeemedTx.id';
}
return {
message: JSON.stringify(snsMessage),
subject: env.AWS_SNS_SUBJECT,
groupId: env.AWS_SNS_SUBJECT,
deduplicationId: whTx.id,
deduplicationId,
};
}
override async publishMessage(
whTx: WHTransaction,
async createMessages(
txs: WHTransaction[] | WHTransferRedeemed[],
eventType: WhEventType,
fifo: boolean = false,
): Promise<SNSPublishMessageOutput> {
const { message, subject, groupId, deduplicationId } = this.makeSNSInput(whTx);
const input: PublishCommandInput = {
TopicArn: this.topicArn!,
Subject: subject ?? this.subject!,
Message: message,
...(fifo && { MessageGroupId: groupId }),
...(fifo && { MessageDeduplicationId: deduplicationId }),
};
) {
const messages: SNSInput[] = txs.map((tx) => this.makeSNSInput(tx, eventType));
try {
const command = new PublishCommand(input);
await this.client?.send(command);
if (input) {
const { Message } = input;
if (Message) {
const snsMessage: SNSMessage = JSON.parse(Message);
const { payload } = snsMessage;
const { id, emitterChain, txHash } = payload;
const chainName = coalesceChainName(emitterChain as ChainId);
this.logger.info({
id,
emitterChain,
chainName,
txHash,
message: 'Publish VAA log to SNS',
});
}
}
} catch (error: unknown) {
this.logger.error(error);
return {
status: 'error',
};
}
return {
status: 'success',
};
this.publishMessages(messages, eventType, fifo);
}
override async publishMessages(
whTxs: WHTransaction[],
messages: SNSInput[],
eventType: WhEventType,
fifo: boolean = false,
): Promise<SNSPublishMessageOutput> {
const messages: SNSInput[] = whTxs.map((whTx) => this.makeSNSInput(whTx));
const CHUNK_SIZE = 10;
const batches: PublishBatchCommandInput[] = [];
const inputs: PublishBatchRequestEntry[] = messages.map(
@ -147,18 +125,27 @@ class AwsSNS extends BaseSNS {
if (input) {
const { Message } = input;
if (Message) {
const snsMessage: SNSMessage = JSON.parse(Message);
const { payload } = snsMessage;
const { id, emitterChain, txHash } = payload;
const chainName = coalesceChainName(emitterChain as ChainId);
let snsMessage;
this.logger.info({
id,
emitterChain,
chainName,
txHash,
message: 'Publish VAA log to SNS',
});
if (eventType === 'whTx') {
snsMessage = JSON.parse(Message) as WhTxSNSMessage;
const { payload } = snsMessage;
const { id, emitterChain, txHash } = payload;
const chainName = coalesceChainName(emitterChain as ChainId);
this.logger.info({
id,
emitterChain,
chainName,
txHash,
message: 'Publish Wormhole Transaction Event Log to SNS',
});
}
if (eventType === 'redeemedTx') {
this.logger.info({
message: 'Publish Wormhole Transfer Redeemed Event Log to SNS',
});
}
}
}
});

View File

@ -1,6 +1,6 @@
import { getLogger, WormholeLogger } from '../../utils/logger';
import { SNSImplementation, SNSInput, SNSPublishMessageOutput } from './types';
import { WHTransaction } from '../../databases/types';
import { SNSImplementation, SNSInput, SNSPublishMessageOutput, WhEventType } from './types';
import { WHTransaction, WHTransferRedeemed } from '../../databases/types';
abstract class BaseSNS implements SNSImplementation {
public logger: WormholeLogger;
@ -14,10 +14,20 @@ abstract class BaseSNS implements SNSImplementation {
this.logger.info(`Initializing as ${this.snsTypeName}...`);
}
abstract makeSNSInput(whTx: WHTransaction): SNSInput;
abstract publishMessage(message: WHTransaction, fifo?: boolean): Promise<SNSPublishMessageOutput>;
abstract makeSNSInput(
data: WHTransaction | WHTransferRedeemed,
type: 'whTx' | 'redeemedTx',
): SNSInput;
abstract createMessages(
txs: WHTransaction[] | WHTransferRedeemed[],
eventType: WhEventType,
fifo?: boolean,
): Promise<void>;
abstract publishMessages(
messages: WHTransaction[],
messages: SNSInput[],
eventType: WhEventType,
fifo?: boolean,
): Promise<SNSPublishMessageOutput>;
}

View File

@ -1,10 +1,19 @@
import { WHTransaction } from '../../databases/types';
import { WHTransaction, WHTransferRedeemed } from '../../databases/types';
import AwsSNS from './AwsSNS';
export type SNSOptionTypes = AwsSNS | null;
export interface SNSImplementation {
publishMessage(message: WHTransaction, fifo?: boolean): Promise<SNSPublishMessageOutput>;
publishMessages(message: WHTransaction[], fifo?: boolean): Promise<SNSPublishMessageOutput>;
createMessages(
txs: WHTransaction[] | WHTransferRedeemed[],
eventType: WhEventType,
fifo?: boolean,
): Promise<void>;
publishMessages(
messages: SNSInput[],
eventType: WhEventType,
fifo?: boolean,
): Promise<SNSPublishMessageOutput>;
}
export interface AwsSNSConfig {
@ -24,7 +33,7 @@ export interface SNSInput {
deduplicationId?: string;
}
export interface SNSMessage {
export interface WhTxSNSMessage {
trackId: string;
source: string;
type: string;
@ -39,8 +48,11 @@ export interface SNSMessage {
};
}
export type RedeemedTxSNSMessage = object;
export interface SNSPublishMessageOutput {
status: 'success' | 'error';
reason?: string;
reasons?: string[];
}
export type WhEventType = 'whTx' | 'redeemedTx';

View File

@ -1,7 +1,7 @@
import { env } from '../../config';
import AwsSNS from './AwsSNS';
import { AwsSNSConfig, SNSOptionTypes, SNSMessage } from './types';
import { WHTransaction } from '../../databases/types';
import { AwsSNSConfig, SNSOptionTypes, WhTxSNSMessage } from './types';
import { WHTransaction, WHTransferRedeemed } from '../../databases/types';
import crypto from 'node:crypto';
const AwsConfig: AwsSNSConfig = {
@ -19,17 +19,17 @@ export const getSNS = (): SNSOptionTypes => {
return null;
};
export const makeSnsMessage = (
export const makeWhTxSnsMessage = (
whTx: WHTransaction,
metadata: { source: string; type: string },
): SNSMessage => {
): WhTxSNSMessage => {
const { id, eventLog } = whTx;
const { emitterChain, emitterAddr, sequence, unsignedVaa, txHash, indexedAt } = eventLog;
const timestamp = indexedAt ? new Date(indexedAt).toISOString() : new Date().toISOString();
const uuid = crypto.randomUUID();
const trackId = `chain-event-${id}-${uuid}`;
const snsMessage: SNSMessage = {
const snsMessage: WhTxSNSMessage = {
trackId,
source: metadata.source,
type: metadata.type,
@ -46,3 +46,10 @@ export const makeSnsMessage = (
return snsMessage;
};
export const makeRedeemedTxSnsMessage = (
_redeemedTx: WHTransferRedeemed,
_metadata: { source: string; type: string },
): string => {
return '';
};

View File

@ -2,7 +2,7 @@ import algosdk from 'algosdk';
import BaseWatcher from './BaseWatcher';
import { ALGORAND_INFO } from '../consts';
import { makeBlockKey, makeVaaKey, makeWHTransaction } from '../databases/utils';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import { makeSerializedVAA } from './utils';
import { coalesceChainId } from '@certusone/wormhole-sdk';
@ -192,7 +192,7 @@ export class AlgorandWatcher extends BaseWatcher {
txHash: txHash!,
blockNumber: blockNumber!,
unsignedVaa: unsignedVaaBuffer,
sender: emitter,
sender: '', // sender is not coming from the event log
indexedAt: timestamp,
},
});
@ -202,4 +202,11 @@ export class AlgorandWatcher extends BaseWatcher {
return whTxs;
}
override async getRedeemedTxs(
_fromBlock: number,
_toBlock: number,
): Promise<WHTransferRedeemed[]> {
return [];
}
}

View File

@ -6,7 +6,7 @@ import { NETWORK_CONTRACTS, NETWORK_RPCS_BY_CHAIN } from '../consts';
import { makeVaaKey, makeWHTransaction } from '../databases/utils';
import { AptosEvent } from '../types/aptos';
import BaseWatcher from './BaseWatcher';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import { makeSerializedVAA } from './utils';
const APTOS_CORE_BRIDGE_ADDRESS = NETWORK_CONTRACTS.aptos.core;
@ -119,7 +119,7 @@ export class AptosWatcher extends BaseWatcher {
txHash,
blockNumber: blockNumber,
unsignedVaa: unsignedVaaBuffer,
sender: parsedEmitter,
sender: '', // sender is not coming from the event log
indexedAt: Number(timestamp),
},
});
@ -131,6 +131,13 @@ export class AptosWatcher extends BaseWatcher {
return whTxs;
}
override async getRedeemedTxs(
_fromBlock: number,
_toBlock: number,
): Promise<WHTransferRedeemed[]> {
return [];
}
override isValidBlockKey(key: string) {
try {
const [block, timestamp, sequence] = key.split('/');

View File

@ -2,7 +2,7 @@ import { ChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import { INITIAL_DEPLOYMENT_BLOCK_BY_CHAIN, sleep } from '../common';
import { z } from 'zod';
import { TIMEOUT } from '../consts';
import { DBOptionTypes, WHTransaction, VaasByBlock } from '../databases/types';
import { DBOptionTypes, WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import { getLogger, WormholeLogger } from '../utils/logger';
import { SNSOptionTypes } from '../services/SNS/types';
import { WatcherImplementation } from './types';
@ -20,6 +20,10 @@ abstract class BaseWatcher implements WatcherImplementation {
this.logger = getLogger(chain);
}
abstract getFinalizedBlockNumber(): Promise<number>;
abstract getWhTxs(fromBlock: number, toBlock: number): Promise<WHTransaction[]>;
abstract getRedeemedTxs(fromBlock: number, toBlock: number): Promise<WHTransferRedeemed[]>;
setDB(db: DBOptionTypes) {
this.db = db;
}
@ -32,9 +36,6 @@ abstract class BaseWatcher implements WatcherImplementation {
throw new Error('Method not implemented.');
}
abstract getFinalizedBlockNumber(): Promise<number>;
abstract getWhTxs(fromBlock: number, toBlock: number): Promise<WHTransaction[]>;
isValidVaaKey(_key: string): boolean {
throw new Error('Method not implemented.');
}
@ -55,6 +56,21 @@ abstract class BaseWatcher implements WatcherImplementation {
}
}
async getWhEvents(
fromBlock: number,
toBlock: number,
): Promise<{ whTxs: WHTransaction[]; redeemedTxs: WHTransferRedeemed[] }> {
const whEvents: { whTxs: WHTransaction[]; redeemedTxs: WHTransferRedeemed[] } = {
whTxs: [],
redeemedTxs: [],
};
whEvents.whTxs = await this.getWhTxs(fromBlock, toBlock);
whEvents.redeemedTxs = await this.getRedeemedTxs(fromBlock, toBlock);
return whEvents;
}
async stop() {
this.stopWatcher = true;
}
@ -84,20 +100,35 @@ abstract class BaseWatcher implements WatcherImplementation {
try {
this.logger.debug(`fetching messages from ${fromBlock} to ${toBlock}`);
// Here we get all the vaa logs from LOG_MESSAGE_PUBLISHED_TOPIC
const whTxs = await this.getWhTxs(fromBlock, toBlock);
// Events from:
// whTxs: LOG_MESSAGE_PUBLISHED_TOPIC (Core Contract)
// redeemedTxs: TRANSFER_REDEEMED_TOPIC (Token Bridge Contract)
const { whTxs, redeemedTxs } = await this.getWhEvents(fromBlock, toBlock);
if (whTxs?.length > 0) {
// Then store the vaa logs processed in db
// Then store the wormhole txs logs processed in db
await this.db?.storeWhTxs(this.chain, whTxs);
// Then publish the vaa logs processed in SNS
await this.sns?.publishMessages(whTxs, true);
// Then publish the wormhole txs logs processed in SNS
await this.sns?.createMessages(whTxs, 'whTx', true);
}
if (redeemedTxs?.length > 0) {
// Then store the redeemed transfers logs processed in db
await this.db?.storeRedeemedTxs(this.chain, redeemedTxs);
}
// Then store the latest processed block by Chain Id
await this.db?.storeLatestProcessBlock(this.chain, toBlock);
} catch (e) {
this.logger.error(e);
} catch (e: unknown) {
let message;
if (e instanceof Error) {
message = e.message;
} else {
message = e;
}
this.logger.error(message);
}
fromBlock = toBlock + 1;
@ -115,12 +146,19 @@ abstract class BaseWatcher implements WatcherImplementation {
} catch (e) {
// skip attempting to fetch messages until getting the finalized block succeeds
toBlock = null;
this.logger.error(`error fetching finalized block`);
this.logger.error(`Error fetching finalized block`);
throw e;
}
} catch (e) {
retry++;
this.logger.error(e);
let message;
if (e instanceof Error) {
message = e.message;
} else {
message = e;
}
this.logger.error(message);
const backOffTimeoutMS = TIMEOUT * 2 ** retry;
this.logger.warn(`backing off for ${backOffTimeoutMS}ms`);
await sleep(backOffTimeoutMS);

View File

@ -5,7 +5,7 @@ import { makeBlockKey, makeVaaKey, makeWHTransaction } from '../databases/utils'
import BaseWatcher from './BaseWatcher';
import { SHA256 } from 'jscrypto/SHA256';
import { Base64 } from 'jscrypto/Base64';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import { makeSerializedVAA } from './utils';
export class CosmwasmWatcher extends BaseWatcher {
@ -281,7 +281,7 @@ export class CosmwasmWatcher extends BaseWatcher {
txHash,
blockNumber: blockNumber,
unsignedVaa: unsignedVaaBuffer,
sender: emitter!,
sender: '', // sender is not coming from the event log
indexedAt: timestamp!,
},
});
@ -311,6 +311,13 @@ export class CosmwasmWatcher extends BaseWatcher {
}
return whTxs;
}
override async getRedeemedTxs(
_fromBlock: number,
_toBlock: number,
): Promise<WHTransferRedeemed[]> {
return [];
}
}
type CosmwasmBlockResult = {

View File

@ -4,16 +4,25 @@ import { Log } from '@ethersproject/abstract-provider';
import axios from 'axios';
import { BigNumber } from 'ethers';
import { AXIOS_CONFIG_JSON, NETWORK_CONTRACTS, NETWORK_RPCS_BY_CHAIN } from '../consts';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import BaseWatcher from './BaseWatcher';
import { makeBlockKey, makeVaaKey, makeWHTransaction } from '../databases/utils';
import {
makeBlockKey,
makeVaaKey,
makeWHRedeemedTransaction,
makeWHTransaction,
} from '../databases/utils';
import { makeSerializedVAA } from './utils';
export const wormholeInterface = Implementation__factory.createInterface();
// This is the hash for topic[0] of the core contract event LogMessagePublished
// https://github.com/wormhole-foundation/wormhole/blob/main/ethereum/contracts/Implementation.sol#L12
export const LOG_MESSAGE_PUBLISHED_TOPIC =
'0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2';
export const wormholeInterface = Implementation__factory.createInterface();
// This is the hash for topic[0] of the token bridge contract event TransferRedeemed
// https://github.com/wormhole-foundation/wormhole/blob/99d01324b80d2e86d0e5b8ea832f9cf9d4119fcd/ethereum/contracts/bridge/Bridge.sol#L29
export const TRANSFER_REDEEMED_TOPIC =
'0xcaf280c8cfeba144da67230d9b009c8f868a75bac9a528fa0474be1ba317c169';
export type BlockTag = 'finalized' | 'safe' | 'latest';
export type Block = {
@ -160,7 +169,7 @@ export class EVMWatcher extends BaseWatcher {
toBlock: number,
address: string,
topics: string[],
): Promise<Array<Log>> {
): Promise<Log[]> {
const rpc = NETWORK_RPCS_BY_CHAIN[this.chain];
if (!rpc) {
throw new Error(`${this.chain} RPC is not defined!`);
@ -250,21 +259,21 @@ export class EVMWatcher extends BaseWatcher {
timestampsByBlock[block.number] = timestamp;
}
const logs = await this.getLogs(fromBlock, toBlock, address, [LOG_MESSAGE_PUBLISHED_TOPIC]);
this.logger.debug(`processing ${logs.length} logs`);
const txLogs = await this.getLogs(fromBlock, toBlock, address, [LOG_MESSAGE_PUBLISHED_TOPIC]);
for (const log of logs) {
// console.log('log', log);
// console.log('parseLog', wormholeInterface.parseLog(log));
this.logger.debug(`processing ${txLogs.length} txLogs`);
for (const txLog of txLogs) {
// console.log('txLog', txLog);
// console.log('txLog::parseLog', wormholeInterface.parseLog(txLog));
const { args } = wormholeInterface.parseLog(log);
const { args } = wormholeInterface.parseLog(txLog);
const { sequence, payload, nonce, consistencyLevel } = args || {};
const blockNumber = log.blockNumber;
const blockNumber = txLog.blockNumber;
const chainName = this.chain;
const chainId = coalesceChainId(chainName);
const emitter = log.topics[1].slice(2);
const emitter = txLog.topics[1].slice(2);
const parseSequence = Number(sequence.toString());
const txHash = log.transactionHash;
const txHash = txLog.transactionHash;
const parsePayload = Buffer.from(payload).toString().slice(2);
const timestamp = timestampsByBlock[blockNumber];
@ -287,7 +296,7 @@ export class EVMWatcher extends BaseWatcher {
txHash,
blockNumber: blockNumber,
unsignedVaa: unsignedVaaBuffer,
sender: emitter,
sender: '', // sender is not coming from the event log
indexedAt: timestamp,
},
});
@ -297,4 +306,38 @@ export class EVMWatcher extends BaseWatcher {
return whTxs;
}
override async getRedeemedTxs(fromBlock: number, toBlock: number): Promise<WHTransferRedeemed[]> {
const redeemedTxs: WHTransferRedeemed[] = [];
const tokenBridgeAddress = NETWORK_CONTRACTS[this.chain].token_bridge;
if (!tokenBridgeAddress) {
throw new Error(`Token Bridge contract not defined for ${this.chain}`);
}
const transferRedeemedLogs = await this.getLogs(fromBlock, toBlock, tokenBridgeAddress, [
TRANSFER_REDEEMED_TOPIC,
]);
this.logger.debug(`processing ${transferRedeemedLogs.length} transferRedeemedLogs`);
for (const transferRedeemedLog of transferRedeemedLogs) {
const [, emitterChainId, emitterAddress, sequence] = transferRedeemedLog?.topics || [];
if (emitterChainId && emitterAddress && sequence) {
const parsedEmitterChainId = Number(emitterChainId.toString());
const parsedEmitterAddress = emitterAddress.slice(2);
const parsedSequence = Number(sequence.toString());
const redeemedTx = await makeWHRedeemedTransaction({
emitterChainId: parsedEmitterChainId,
emitterAddress: parsedEmitterAddress,
sequence: parsedSequence,
});
redeemedTxs.push(redeemedTx);
}
}
return redeemedTxs;
}
}

View File

@ -1,6 +1,6 @@
import axios from 'axios';
import { NETWORK_CONTRACTS, NETWORK_RPCS_BY_CHAIN } from '../consts';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import { makeBlockKey, makeVaaKey, makeWHTransaction } from '../databases/utils';
import { EventObjectsTypes, RawLogEvents } from './TerraExplorerWatcher';
import BaseWatcher from './BaseWatcher';
@ -302,7 +302,7 @@ export class InjectiveExplorerWatcher extends BaseWatcher {
txHash,
blockNumber: blockNumber,
unsignedVaa: unsignedVaaBuffer,
sender: emitter!,
sender: '', // sender is not coming from the event log
indexedAt: timestamp!,
},
});
@ -329,6 +329,13 @@ export class InjectiveExplorerWatcher extends BaseWatcher {
return whTxs;
}
override async getRedeemedTxs(
_fromBlock: number,
_toBlock: number,
): Promise<WHTransferRedeemed[]> {
return [];
}
}
type ExplorerBlocks = {

View File

@ -5,7 +5,7 @@ import { BlockResult, ExecutionStatus } from 'near-api-js/lib/providers/provider
import ora from 'ora';
import { z } from 'zod';
import { NETWORK_CONTRACTS, NETWORK_RPCS_BY_CHAIN } from '../consts';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import { makeBlockKey, makeVaaKey, makeWHTransaction } from '../databases/utils';
import { EventLog } from '../types/near';
import { getNearProvider, isWormholePublishEventLog } from '../utils/near';
@ -85,6 +85,13 @@ export class NearWatcher extends BaseWatcher {
return await getWhTxsResults(provider, blocks);
}
override async getRedeemedTxs(
_fromBlock: number,
_toBlock: number,
): Promise<WHTransferRedeemed[]> {
return [];
}
async getProvider(): Promise<Provider> {
return (this.provider = this.provider || (await getNearProvider(NETWORK_RPCS_BY_CHAIN.near!)));
}
@ -215,7 +222,7 @@ export const getWhTxsResults = async (
txHash,
blockNumber: blockNumber,
unsignedVaa: unsignedVaaBuffer,
sender: emitter,
sender: '', // sender is not coming from the event log
indexedAt: timestampDate,
},
});

View File

@ -6,7 +6,7 @@ import {
SEI_EXPLORER_GRAPHQL,
SEI_EXPLORER_TXS,
} from '../consts';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import { makeBlockKey, makeVaaKey, makeWHTransaction } from '../databases/utils';
import { CosmwasmHashResult, CosmwasmWatcher } from './CosmwasmWatcher';
import { makeSerializedVAA } from './utils';
@ -352,7 +352,7 @@ export class SeiExplorerWatcher extends CosmwasmWatcher {
txHash,
blockNumber: blockNumber,
unsignedVaa: unsignedVaaBuffer,
sender: emitter!,
sender: '', // sender is not coming from the event log
indexedAt: timestamp!,
},
});
@ -394,4 +394,11 @@ export class SeiExplorerWatcher extends CosmwasmWatcher {
// is synced with the block height. Therefore, the latest block will only update when a new transaction appears.
return whTxs;
}
override async getRedeemedTxs(
_fromBlock: number,
_toBlock: number,
): Promise<WHTransferRedeemed[]> {
return [];
}
}

View File

@ -10,7 +10,7 @@ import {
import { decode } from 'bs58';
import { z } from 'zod';
import { NETWORK_CONTRACTS, NETWORK_RPCS_BY_CHAIN } from '../consts';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import { makeBlockKey, makeVaaKey, makeWHTransaction } from '../databases/utils';
import { isLegacyMessage, normalizeCompileInstruction } from '../utils/solana';
import BaseWatcher from './BaseWatcher';
@ -328,7 +328,7 @@ export class SolanaWatcher extends BaseWatcher {
txHash,
blockNumber: blockNumber,
unsignedVaa: unsignedVaaBuffer,
sender: emitter,
sender: '', // sender is not coming from the event log
indexedAt: timestamp,
},
});
@ -344,6 +344,13 @@ export class SolanaWatcher extends BaseWatcher {
return whTxs;
}
override async getRedeemedTxs(
_fromBlock: number,
_toBlock: number,
): Promise<WHTransferRedeemed[]> {
return [];
}
override isValidVaaKey(key: string) {
try {
const [txHash, vaaKey] = key.split(':');

View File

@ -9,7 +9,7 @@ import { array } from 'superstruct';
import { NETWORK_RPCS_BY_CHAIN } from '../consts';
import BaseWatcher from './BaseWatcher';
import { makeBlockKey, makeVaaKey, makeWHTransaction } from '../databases/utils';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import { makeSerializedVAA } from './utils';
const SUI_EVENT_HANDLE = `0x5306f64e312b581766351c07af79c72fcb1cd25147157fdc2f8ad76de9a3fb6a::publish_message::WormholeMessage`;
@ -218,7 +218,7 @@ export class SuiWatcher extends BaseWatcher {
txHash,
blockNumber: blockNumber,
unsignedVaa: unsignedVaaBuffer,
sender: emitter,
sender: '', // sender is not coming from the event log
indexedAt: timestampDate,
},
});
@ -229,4 +229,11 @@ export class SuiWatcher extends BaseWatcher {
return whTxs;
}
override async getRedeemedTxs(
_fromBlock: number,
_toBlock: number,
): Promise<WHTransferRedeemed[]> {
return [];
}
}

View File

@ -1,7 +1,7 @@
import { CosmWasmChainName } from '@certusone/wormhole-sdk/lib/cjs/utils/consts';
import axios from 'axios';
import { AXIOS_CONFIG_JSON, NETWORK_CONTRACTS, NETWORK_RPCS_BY_CHAIN } from '../consts';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import { makeBlockKey, makeVaaKey, makeWHTransaction } from '../databases/utils';
import BaseWatcher from './BaseWatcher';
import { makeSerializedVAA } from './utils';
@ -294,7 +294,7 @@ export class TerraExplorerWatcher extends BaseWatcher {
txHash,
blockNumber: blockNumber,
unsignedVaa: unsignedVaaBuffer,
sender: emitter!,
sender: '', // sender is not coming from the event log
indexedAt: timestamp!,
},
});
@ -319,6 +319,13 @@ export class TerraExplorerWatcher extends BaseWatcher {
}
return whTxs;
}
override async getRedeemedTxs(
_fromBlock: number,
_toBlock: number,
): Promise<WHTransferRedeemed[]> {
return [];
}
}
type BulkTxnResult = {

View File

@ -1,6 +1,6 @@
import { ChainName } from '@certusone/wormhole-sdk';
import BaseDB from '../databases/BaseDB';
import { WHTransaction, VaasByBlock } from '../databases/types';
import { WHTransaction, VaasByBlock, WHTransferRedeemed } from '../databases/types';
import BaseSNS from '../services/SNS/BaseSNS';
import { WormholeLogger } from '../utils/logger';
import { AlgorandWatcher } from './AlgorandWatcher';
@ -33,7 +33,12 @@ export interface WatcherImplementation {
db?: BaseDB;
getFinalizedBlockNumber(): Promise<number>;
getMessagesForBlocks(fromBlock: number, toBlock: number): Promise<VaasByBlock>;
getWhEvents(
fromBlock: number,
toBlock: number,
): Promise<{ whTxs: WHTransaction[]; redeemedTxs: WHTransferRedeemed[] }>;
getWhTxs(fromBlock: number, toBlock: number): Promise<WHTransaction[]>;
getRedeemedTxs(fromBlock: number, toBlock: number): Promise<WHTransferRedeemed[]>;
isValidBlockKey(key: string): boolean;
isValidVaaKey(key: string): boolean;
watch(): Promise<void>;