Feat/748 add last sequence number (#749)

* feat(743): add Wormchain support

* feat(748): Add lastSequeceNumber to lastBlocksByChain collection
This commit is contained in:
Ricardo Olarte 2023-10-17 12:39:21 -05:00 committed by GitHub
parent b6ce099a05
commit 14161d1569
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 98 additions and 32 deletions

View File

@ -29,8 +29,8 @@ export const TIMEOUT = 1000;
export const RPCS_BY_CHAIN_MAINNET: { [key in ChainName]?: string } = {
acala: env.ACALA_RPC || 'https://eth-rpc-acala.aca-api.network',
algorand: env.ALGORAND_RPC || 'https://mainnet-api.algonode.cloud',
aptos: env.APTOS_RPC || 'https://rpc.ankr.com/aptos',
arbitrum: env.ARBITRUM_RPC || 'https://rpc.ankr.com/arbitrum',
aptos: env.APTOS_RPC || 'https://fullnode.mainnet.aptoslabs.com',
arbitrum: env.ARBITRUM_RPC || 'https://arb1.arbitrum.io/rpc',
avalanche: env.AVALANCHE_RPC || 'https://rpc.ankr.com/avalanche',
base: env.BASE_RPC || 'https://mainnet.base.org',
bsc: env.BSC_RPC || 'https://rpc.ankr.com/bsc_testnet_chapel',
@ -45,9 +45,9 @@ export const RPCS_BY_CHAIN_MAINNET: { [key in ChainName]?: string } = {
oasis: env.OASIS_RPC || 'https://emerald.oasis.dev',
optimism: env.OPTIMISM_RPC || 'https://rpc.ankr.com/optimism',
polygon: env.POLYGON_RPC || 'https://rpc.ankr.com/polygon',
sei: env.SEI_RPC || 'https://rpc.ankr.com/sei', // https://docs.sei.io/develop/resources
sei: env.SEI_RPC || 'https://sei-rest.brocha.in', // https://docs.sei.io/develop/resources
solana: env.SOLANA_RPC || 'https://api.mainnet-beta.solana.com',
sui: env.SUI_RPC || 'https://rpc.ankr.com/sui',
sui: env.SUI_RPC || 'https://rpc.mainnet.sui.io',
terra: env.TERRA_RPC || 'https://terra-classic-fcd.publicnode.com', // 'https://columbus-fcd.terra.dev',
terra2: env.TERRA2_RPC || 'https://phoenix-lcd.terra.dev',
wormchain: env.WORMCHAIN_RPC || 'https://wormchain.jumpisolated.com',

View File

@ -67,7 +67,11 @@ abstract class BaseDB implements DBImplementation {
abstract getLastBlocksProcessed(): Promise<void>;
abstract storeWhTxs(chain: ChainName, whTxs: WHTransaction[]): Promise<void>;
abstract storeRedeemedTxs(chain: ChainName, redeemedTxs: WHTransferRedeemed[]): Promise<void>;
abstract storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void>;
abstract storeLatestProcessBlock(
chain: ChainName,
lastBlock: number,
lastSequenceNumber: number | null,
): Promise<void>;
}
export default BaseDB;

View File

@ -61,7 +61,7 @@ export default class JsonDB extends BaseDB {
}
}
override async storeWhTxs(chainName: ChainName, whTxs: WHTransaction[]): Promise<void> {
async storeWhTxs(chainName: ChainName, whTxs: WHTransaction[]): Promise<void> {
try {
for (let i = 0; i < whTxs.length; i++) {
let message = 'Insert Wormhole Transaction Event Log to JSON file';
@ -106,10 +106,7 @@ export default class JsonDB extends BaseDB {
}
}
override async storeRedeemedTxs(
chainName: ChainName,
redeemedTxs: WHTransferRedeemed[],
): Promise<void> {
async storeRedeemedTxs(chainName: ChainName, redeemedTxs: WHTransferRedeemed[]): Promise<void> {
// For JsonDB we are only pushing all the "redeemed" logs into GLOBAL_TX_FILE simulating a globalTransactions collection
try {
@ -167,7 +164,11 @@ export default class JsonDB extends BaseDB {
}
}
override async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void> {
async storeLatestProcessBlock(
chain: ChainName,
lastBlock: number,
lastSequenceNumber: number | null,
): Promise<void> {
const chainId = coalesceChainId(chain);
const updatedLastBlocksByChain = [...this.lastBlocksByChain];
const itemIndex = updatedLastBlocksByChain.findIndex((item) => {
@ -179,12 +180,14 @@ export default class JsonDB extends BaseDB {
updatedLastBlocksByChain[itemIndex] = {
...updatedLastBlocksByChain[itemIndex],
blockNumber: lastBlock,
lastSequenceNumber,
updatedAt: new Date(),
};
} else {
updatedLastBlocksByChain.push({
id: chain,
blockNumber: lastBlock,
lastSequenceNumber,
chainId,
createdAt: new Date(),
updatedAt: new Date(),

View File

@ -60,7 +60,7 @@ export default class MongoDB extends BaseDB {
}
}
override async storeWhTxs(chainName: ChainName, whTxs: WHTransaction[]): Promise<void> {
async storeWhTxs(chainName: ChainName, whTxs: WHTransaction[]): Promise<void> {
try {
for (let i = 0; i < whTxs.length; i++) {
let message = `Insert Wormhole Transaction Event Log to ${WORMHOLE_TX_COLLECTION} collection`;
@ -108,10 +108,7 @@ export default class MongoDB extends BaseDB {
}
}
override async storeRedeemedTxs(
chainName: ChainName,
redeemedTxs: WHTransferRedeemed[],
): Promise<void> {
async storeRedeemedTxs(chainName: ChainName, redeemedTxs: WHTransferRedeemed[]): Promise<void> {
try {
for (let i = 0; i < redeemedTxs.length; i++) {
let message = `Insert Wormhole Transfer Redeemed Event Log to ${GLOBAL_TX_COLLECTION} collection`;
@ -182,7 +179,11 @@ export default class MongoDB extends BaseDB {
}
}
override async storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void> {
async storeLatestProcessBlock(
chain: ChainName,
lastBlock: number,
lastSequenceNumber: number | null,
): Promise<void> {
const chainId = coalesceChainId(chain);
try {
@ -195,6 +196,7 @@ export default class MongoDB extends BaseDB {
},
$set: {
blockNumber: lastBlock,
lastSequenceNumber,
updatedAt: new Date(),
},
},

View File

@ -11,7 +11,11 @@ export interface DBImplementation {
getLastBlockByChain(chain: ChainName): string | null;
storeWhTxs(chain: ChainName, whTxs: WHTransaction[]): Promise<void>;
storeRedeemedTxs(chain: ChainName, redeemedTxs: WHTransferRedeemed[]): Promise<void>;
storeLatestProcessBlock(chain: ChainName, lastBlock: number): Promise<void>;
storeLatestProcessBlock(
chain: ChainName,
lastBlock: number,
lastSequenceNumber: number | null,
): Promise<void>;
}
export type VaasByBlock = { [blockInfo: string]: string[] };
@ -38,6 +42,7 @@ export type EventLog = {
type LastBlockItem = {
blockNumber: number;
lastSequenceNumber: number | null;
chainId: number;
createdAt: Date | string;
updatedAt: Date | string;

View File

@ -60,17 +60,41 @@ abstract class BaseWatcher implements WatcherImplementation {
}
}
async getLastSequenceNumber(whTxs: WHTransaction[]): Promise<number | null> {
if (whTxs.length > 0) {
return whTxs[whTxs.length - 1].eventLog.sequence;
}
return null;
}
async getWhEvents(
fromBlock: number,
toBlock: number,
): Promise<{ whTxs: WHTransaction[]; redeemedTxs: WHTransferRedeemed[] }> {
const whEvents: { whTxs: WHTransaction[]; redeemedTxs: WHTransferRedeemed[] } = {
): Promise<{
whTxs: WHTransaction[];
redeemedTxs: WHTransferRedeemed[];
lastSequenceNumber: number | null;
}> {
const whEvents: {
whTxs: WHTransaction[];
redeemedTxs: WHTransferRedeemed[];
lastSequenceNumber: number | null;
} = {
whTxs: [],
redeemedTxs: [],
lastSequenceNumber: null,
};
whEvents.whTxs = await this.getWhTxs(fromBlock, toBlock);
whEvents.redeemedTxs = await this.getRedeemedTxs(fromBlock, toBlock);
const sortedWhTxs = (await this.getWhTxs(fromBlock, toBlock))?.sort((a, b) => {
return a.eventLog.sequence - b.eventLog.sequence;
});
const sortedRedeemedTxs = await this.getRedeemedTxs(fromBlock, toBlock);
const lastSequenceNumber = await this.getLastSequenceNumber(sortedWhTxs);
whEvents.whTxs = sortedWhTxs;
whEvents.redeemedTxs = sortedRedeemedTxs;
whEvents.lastSequenceNumber = lastSequenceNumber;
return whEvents;
}
@ -102,7 +126,10 @@ abstract class BaseWatcher implements WatcherImplementation {
// Events from:
// whTxs: LOG_MESSAGE_PUBLISHED_TOPIC (Core Contract)
// redeemedTxs: TRANSFER_REDEEMED_TOPIC (Token Bridge Contract)
const { whTxs, redeemedTxs } = await this.getWhEvents(fromBlock, toBlock);
const { whTxs, redeemedTxs, lastSequenceNumber } = await this.getWhEvents(
fromBlock,
toBlock,
);
if (whTxs?.length > 0) {
// Then store the wormhole txs logs processed in db
@ -118,7 +145,7 @@ abstract class BaseWatcher implements WatcherImplementation {
}
// Then store the latest processed block by Chain Id
await this.db?.storeLatestProcessBlock(this.chain, toBlock);
await this.db?.storeLatestProcessBlock(this.chain, toBlock, lastSequenceNumber);
} catch (e: unknown) {
let message;
if (e instanceof Error) {

View File

@ -107,6 +107,7 @@ export class EVMWatcher extends BaseWatcher {
`Unable to parse result of eth_getBlockByNumber for ${blockNumberOrTag} on ${rpc}`,
);
}
async getBlocks(fromBlock: number, toBlock: number): Promise<Block[]> {
const rpc = NETWORK_RPCS_BY_CHAIN[this.chain];
if (!rpc) {
@ -164,6 +165,7 @@ export class EVMWatcher extends BaseWatcher {
`Unable to parse result of eth_getBlockByNumber for range ${fromBlock}-${toBlock} on ${rpc}`,
);
}
async getLogs(
fromBlock: number,
toBlock: number,
@ -207,7 +209,7 @@ export class EVMWatcher extends BaseWatcher {
throw new Error(`Unable to parse result of eth_getLogs for ${fromBlock}-${toBlock} on ${rpc}`);
}
override async getFinalizedBlockNumber(): Promise<number> {
async getFinalizedBlockNumber(): Promise<number> {
this.logger.debug(`fetching block ${this.finalizedBlockTag}`);
const block: Block = await this.getBlock(this.finalizedBlockTag);
this.latestFinalizedBlockNumber = block.number;
@ -247,10 +249,19 @@ export class EVMWatcher extends BaseWatcher {
override async getWhEvents(
fromBlock: number,
toBlock: number,
): Promise<{ whTxs: WHTransaction[]; redeemedTxs: WHTransferRedeemed[] }> {
const whEvents: { whTxs: WHTransaction[]; redeemedTxs: WHTransferRedeemed[] } = {
): Promise<{
whTxs: WHTransaction[];
redeemedTxs: WHTransferRedeemed[];
lastSequenceNumber: number | null;
}> {
const whEvents: {
whTxs: WHTransaction[];
redeemedTxs: WHTransferRedeemed[];
lastSequenceNumber: number | null;
} = {
whTxs: [],
redeemedTxs: [],
lastSequenceNumber: null,
};
// We collect the blocks data here to avoid making multiple requests to the RPC
@ -261,13 +272,22 @@ export class EVMWatcher extends BaseWatcher {
timestampsByBlock[block.number] = timestamp;
}
whEvents.whTxs = await this.getWhTxs(fromBlock, toBlock, timestampsByBlock);
whEvents.redeemedTxs = await this.getRedeemedTxs(fromBlock, toBlock, timestampsByBlock);
const sortedWhTxs = (await this.getWhTxs(fromBlock, toBlock, timestampsByBlock))?.sort(
(a, b) => {
return a.eventLog.sequence - b.eventLog.sequence;
},
);
const sortedRedeemedTxs = await this.getRedeemedTxs(fromBlock, toBlock, timestampsByBlock);
const lastSequenceNumber = await this.getLastSequenceNumber(sortedWhTxs);
whEvents.whTxs = sortedWhTxs;
whEvents.redeemedTxs = sortedRedeemedTxs;
whEvents.lastSequenceNumber = lastSequenceNumber;
return whEvents;
}
override async getWhTxs(
async getWhTxs(
fromBlock: number,
toBlock: number,
timestampsByBlock?: Record<number, Date>,
@ -327,7 +347,7 @@ export class EVMWatcher extends BaseWatcher {
return whTxs;
}
override async getRedeemedTxs(
async getRedeemedTxs(
fromBlock: number,
toBlock: number,
timestampsByBlock?: Record<number, Date>,

View File

@ -36,9 +36,14 @@ export interface WatcherImplementation {
getWhEvents(
fromBlock: number,
toBlock: number,
): Promise<{ whTxs: WHTransaction[]; redeemedTxs: WHTransferRedeemed[] }>;
): Promise<{
whTxs: WHTransaction[];
redeemedTxs: WHTransferRedeemed[];
lastSequenceNumber: number | null;
}>;
getWhTxs(fromBlock: number, toBlock: number): Promise<WHTransaction[]>;
getRedeemedTxs(fromBlock: number, toBlock: number): Promise<WHTransferRedeemed[]>;
getLastSequenceNumber(whTxs: WHTransaction[]): Promise<number | null>;
isValidBlockKey(key: string): boolean;
isValidVaaKey(key: string): boolean;
watch(): Promise<void>;