diff --git a/blockchain-watcher/config/default.json b/blockchain-watcher/config/default.json index 7fe84ba3..a7a12e91 100644 --- a/blockchain-watcher/config/default.json +++ b/blockchain-watcher/config/default.json @@ -3,7 +3,7 @@ "port": 9090, "logLevel": "debug", "dryRun": true, - "enabledPlatforms": ["solana", "evm", "sui", "aptos"], + "enabledPlatforms": ["solana", "evm", "sui", "aptos", "wormchain"], "sns": { "topicArn": "arn:aws:sns:us-east-1:000000000000:localstack-topic.fifo", "region": "us-east-1", @@ -197,6 +197,13 @@ "chainId": 10006, "rpcs": ["https://rpc.ankr.com/eth_holesky"], "timeout": 10000 + }, + "wormchain": { + "name": "wormchain", + "network": "testnet", + "chainId": 0, + "rpcs": ["https://wormchain-testnet.jumpisolated.com"], + "timeout": 10000 } } } diff --git a/blockchain-watcher/config/mainnet.json b/blockchain-watcher/config/mainnet.json index 0253a072..afd5186c 100644 --- a/blockchain-watcher/config/mainnet.json +++ b/blockchain-watcher/config/mainnet.json @@ -122,6 +122,10 @@ "aptos": { "network": "mainnet", "rpcs": ["https://fullnode.mainnet.aptoslabs.com/v1"] + }, + "wormchain": { + "network": "mainnet", + "rpcs": ["https://wormchain-rpc.quickapi.com"] } } } diff --git a/blockchain-watcher/src/domain/actions/evm/HandleEvmLogs.ts b/blockchain-watcher/src/domain/actions/evm/HandleEvmLogs.ts index 0930ae03..983445ad 100644 --- a/blockchain-watcher/src/domain/actions/evm/HandleEvmLogs.ts +++ b/blockchain-watcher/src/domain/actions/evm/HandleEvmLogs.ts @@ -1,7 +1,7 @@ -import { EvmLog, EvmTopicFilter } from "../../entities"; +import { HandleEvmLogsConfig } from "./types"; import { StatRepository } from "../../repositories"; import { ethers } from "ethers"; -import { HandleEvmLogsConfig } from "./types"; +import { EvmLog } from "../../entities"; /** * Handling means mapping and forward to a given target. diff --git a/blockchain-watcher/src/domain/actions/wormchain/GetWormchainLogs.ts b/blockchain-watcher/src/domain/actions/wormchain/GetWormchainLogs.ts new file mode 100644 index 00000000..6caaa2e4 --- /dev/null +++ b/blockchain-watcher/src/domain/actions/wormchain/GetWormchainLogs.ts @@ -0,0 +1,59 @@ +import { WormchainRepository } from "../../repositories"; +import { WormchainLog } from "../../entities/wormchain"; +import winston from "winston"; + +export class GetWormchainLogs { + private readonly blockRepo: WormchainRepository; + protected readonly logger: winston.Logger; + + constructor(blockRepo: WormchainRepository) { + this.blockRepo = blockRepo; + this.logger = winston.child({ module: "GetWormchainLogs" }); + } + + async execute(range: Range, opts: GetWormchainOpts): Promise { + const fromBlock = range.fromBlock; + const toBlock = range.toBlock; + + const collectWormchainLogs: WormchainLog[] = []; + + if (fromBlock > toBlock) { + this.logger.info(`[exec] Invalid range [fromBlock: ${fromBlock} - toBlock: ${toBlock}]`); + return []; + } + + for (let blockNumber = fromBlock; blockNumber <= toBlock; blockNumber++) { + const wormchainLogs = await this.blockRepo.getBlockLogs(blockNumber); + + if (wormchainLogs && wormchainLogs.transactions && wormchainLogs.transactions.length > 0) { + collectWormchainLogs.push(wormchainLogs); + } + } + + this.logger.info( + `[wormchain][exec] Got ${ + collectWormchainLogs?.length + } transactions to process for ${this.populateLog(opts, fromBlock, toBlock)}` + ); + return collectWormchainLogs; + } + + private populateLog(opts: GetWormchainOpts, fromBlock: bigint, toBlock: bigint): string { + return `[addresses:${opts.addresses}][topics:${opts.topics}][blocks:${fromBlock} - ${toBlock}]`; + } +} + +type Range = { + fromBlock: bigint; + toBlock: bigint; +}; + +type TopicFilter = string | string[]; + +type GetWormchainOpts = { + addresses?: string[]; + topics?: TopicFilter[]; + chain: string; + chainId: number; + environment: string; +}; diff --git a/blockchain-watcher/src/domain/actions/wormchain/HandleWormchainLogs.ts b/blockchain-watcher/src/domain/actions/wormchain/HandleWormchainLogs.ts new file mode 100644 index 00000000..302224cd --- /dev/null +++ b/blockchain-watcher/src/domain/actions/wormchain/HandleWormchainLogs.ts @@ -0,0 +1,45 @@ +import { TransactionFoundEvent } from "../../entities"; +import { StatRepository } from "../../repositories"; +import { WormchainLog } from "../../entities/wormchain"; + +export class HandleWormchainLogs { + constructor( + private readonly cfg: HandleWormchainLogsOptions, + private readonly mapper: (tx: WormchainLog) => TransactionFoundEvent, + private readonly target: (parsed: TransactionFoundEvent[]) => Promise, + private readonly statsRepo: StatRepository + ) {} + + public async handle(logs: WormchainLog[]): Promise { + const mappedItems = logs.map((log) => { + const logMap = this.mapper(log); + return logMap; + }); + + const filterItems = mappedItems.filter((item) => { + if (item) { + this.report(); + return item; + } + }); + + await this.target(filterItems); + return filterItems; + } + + private report() { + const labels = { + job: this.cfg.id, + chain: "wormchain", + commitment: "immediate", + }; + this.statsRepo.count(this.cfg.metricName, labels); + } +} + +export interface HandleWormchainLogsOptions { + metricLabels?: { job: string; chain: string; commitment: string }; + eventTypes?: string[]; + metricName: string; + id: string; +} diff --git a/blockchain-watcher/src/domain/actions/wormchain/PollWormchain.ts b/blockchain-watcher/src/domain/actions/wormchain/PollWormchain.ts new file mode 100644 index 00000000..5fd0ea38 --- /dev/null +++ b/blockchain-watcher/src/domain/actions/wormchain/PollWormchain.ts @@ -0,0 +1,229 @@ +import { MetadataRepository, StatRepository, WormchainRepository } from "../../repositories"; +import { GetWormchainLogs } from "./GetWormchainLogs"; +import { RunPollingJob } from "../RunPollingJob"; +import winston from "winston"; + +const ID = "watch-wormchain-logs"; + +export class PollWormchain extends RunPollingJob { + protected readonly logger: winston.Logger; + + private readonly blockRepo: WormchainRepository; + private readonly metadataRepo: MetadataRepository; + private readonly statsRepo: StatRepository; + private readonly getWormchain: GetWormchainLogs; + + private cfg: PollWormchainLogsConfig; + private latestBlockHeight?: bigint; + private blockHeightCursor?: bigint; + private lastRange?: { fromBlock: bigint; toBlock: bigint }; + private getWormchainRecords: { [key: string]: any } = { + GetWormchainLogs, + }; + + constructor( + blockRepo: WormchainRepository, + metadataRepo: MetadataRepository, + statsRepo: StatRepository, + cfg: PollWormchainLogsConfig, + getWormchain: string + ) { + super(cfg.id, statsRepo, cfg.interval); + this.blockRepo = blockRepo; + this.metadataRepo = metadataRepo; + this.statsRepo = statsRepo; + this.cfg = cfg; + this.logger = winston.child({ module: "PollWormchain", label: this.cfg.id }); + this.getWormchain = new this.getWormchainRecords[getWormchain ?? "GetWormchainLogs"](blockRepo); + } + + protected async preHook(): Promise { + const metadata = await this.metadataRepo.get(this.cfg.id); + if (metadata) { + this.blockHeightCursor = BigInt(metadata.lastBlock); + } + } + + protected async hasNext(): Promise { + const hasFinished = this.cfg.hasFinished(this.blockHeightCursor); + if (hasFinished) { + this.logger.info( + `[hasNext] PollWormchain: (${this.cfg.id}) Finished processing all blocks from ${this.cfg.fromBlock} to ${this.cfg.toBlock}` + ); + } + + return !hasFinished; + } + + protected async get(): Promise { + const latestBlockHeight = await this.blockRepo.getBlockHeight(this.cfg.getCommitment()); + + if (!latestBlockHeight) { + throw new Error(`Could not obtain latest block height: ${latestBlockHeight}`); + } + + const range = this.getBlockRange(latestBlockHeight); + + const records = await this.getWormchain.execute(range, { + chain: this.cfg.chain, + chainId: this.cfg.chainId, + addresses: this.cfg.addresses, + topics: this.cfg.topics, + environment: this.cfg.environment, + }); + + this.lastRange = range; + + return records; + } + + private getBlockRange(latestBlockHeight: bigint): { + fromBlock: bigint; + toBlock: bigint; + } { + let fromBlock = this.blockHeightCursor + ? this.blockHeightCursor + 1n + : this.cfg.fromBlock ?? latestBlockHeight; + // fromBlock is configured and is greater than current block height, then we allow to skip blocks. + if ( + this.blockHeightCursor && + this.cfg.fromBlock && + this.cfg.fromBlock > this.blockHeightCursor + ) { + fromBlock = this.cfg.fromBlock; + } + + let toBlock = BigInt(fromBlock) + BigInt(this.cfg.getBlockBatchSize()); + // limit toBlock to obtained block height + if (toBlock > fromBlock && toBlock > latestBlockHeight) { + toBlock = latestBlockHeight; + } + // limit toBlock to configured toBlock + if (this.cfg.toBlock && toBlock > this.cfg.toBlock) { + toBlock = this.cfg.toBlock; + } + + return { fromBlock: BigInt(fromBlock), toBlock: BigInt(toBlock) }; + } + + protected async persist(): Promise { + this.blockHeightCursor = this.lastRange?.toBlock ?? this.blockHeightCursor; + if (this.blockHeightCursor) { + await this.metadataRepo.save(this.cfg.id, { lastBlock: this.blockHeightCursor }); + } + } + + protected report(): void { + const labels = { + job: this.cfg.id, + chain: this.cfg.chain ?? "", + commitment: this.cfg.getCommitment(), + }; + this.statsRepo.count("job_execution", labels); + this.statsRepo.measure("polling_cursor", this.latestBlockHeight ?? 0n, { + ...labels, + type: "max", + }); + this.statsRepo.measure("polling_cursor", this.blockHeightCursor ?? 0n, { + ...labels, + type: "current", + }); + } +} + +export type PollWormchainLogsMetadata = { + lastBlock: bigint; +}; + +export interface PollWormchainLogsConfigProps { + fromBlock?: bigint; + toBlock?: bigint; + blockBatchSize?: number; + commitment?: string; + interval?: number; + addresses: string[]; + topics: (string | string[])[]; + id?: string; + chain: string; + chainId: number; + environment: string; +} + +export class PollWormchainLogsConfig { + private props: PollWormchainLogsConfigProps; + + constructor(props: PollWormchainLogsConfigProps) { + if (props.fromBlock && props.toBlock && props.fromBlock > props.toBlock) { + throw new Error("fromBlock must be less than or equal to toBlock"); + } + + this.props = props; + } + + public getBlockBatchSize() { + return this.props.blockBatchSize ?? 100; + } + + public getCommitment() { + return this.props.commitment ?? "latest"; + } + + public hasFinished(currentFromBlock?: bigint): boolean { + return ( + currentFromBlock != undefined && + this.props.toBlock != undefined && + currentFromBlock >= this.props.toBlock + ); + } + + public get fromBlock() { + return this.props.fromBlock ? BigInt(this.props.fromBlock) : undefined; + } + + public setFromBlock(fromBlock: bigint | undefined) { + this.props.fromBlock = fromBlock; + } + + public get toBlock() { + return this.props.toBlock; + } + + public get interval() { + return this.props.interval; + } + + public get addresses() { + return this.props.addresses.map((address) => address.toLowerCase()); + } + + public get topics() { + return this.props.topics; + } + + public get id() { + return this.props.id ?? ID; + } + + public get chain() { + return this.props.chain; + } + + public get environment() { + return this.props.environment; + } + + public get chainId() { + return this.props.chainId; + } + + static fromBlock(chain: string, fromBlock: bigint) { + return new PollWormchainLogsConfig({ + chain, + fromBlock, + addresses: [], + topics: [], + environment: "", + chainId: 0, + }); + } +} diff --git a/blockchain-watcher/src/domain/entities/wormchain.ts b/blockchain-watcher/src/domain/entities/wormchain.ts new file mode 100644 index 00000000..0904b579 --- /dev/null +++ b/blockchain-watcher/src/domain/entities/wormchain.ts @@ -0,0 +1,13 @@ +export type WormchainLog = { + blockHeight: bigint; + timestamp: number; + transactions?: { + hash: string; + type: string; + attributes: { + key: string; + value: string; + index: boolean; + }[]; + }[]; +}; diff --git a/blockchain-watcher/src/domain/repositories.ts b/blockchain-watcher/src/domain/repositories.ts index f4e27abf..cf342b5c 100644 --- a/blockchain-watcher/src/domain/repositories.ts +++ b/blockchain-watcher/src/domain/repositories.ts @@ -83,6 +83,11 @@ export interface AptosRepository { ): Promise; } +export interface WormchainRepository { + getBlockHeight(finality: string): Promise; + getBlockLogs(blockNumber: bigint): Promise; +} + export interface MetadataRepository { get(id: string): Promise; save(id: string, metadata: Metadata): Promise; diff --git a/blockchain-watcher/src/infrastructure/mappers/wormchain/wormchainLogMessagePublishedMapper.ts b/blockchain-watcher/src/infrastructure/mappers/wormchain/wormchainLogMessagePublishedMapper.ts new file mode 100644 index 00000000..fb194b55 --- /dev/null +++ b/blockchain-watcher/src/infrastructure/mappers/wormchain/wormchainLogMessagePublishedMapper.ts @@ -0,0 +1,82 @@ +import { LogFoundEvent, LogMessagePublished } from "../../../domain/entities"; +import { WormchainLog } from "../../../domain/entities/wormchain"; +import winston from "winston"; + +const CHAIN_ID_WORMCHAIN = 22; +const CORE_ADDRESS = "wormhole1ufs3tlq4umljk0qfe8k5ya0x6hpavn897u2cnf9k0en9jr7qarqqaqfk2j"; + +let logger: winston.Logger = winston.child({ module: "wormchainLogMessagePublishedMapper" }); + +export const wormchainLogMessagePublishedMapper = ( + log: WormchainLog, + parsedArgs: ReadonlyArray +): LogFoundEvent | undefined => { + const { coreContract, sequence, emitter, hash } = transactionAttibutes(log); + + if (coreContract && sequence && emitter && hash) { + logger.info( + `[wormchain] Source event info: [tx: ][emitterChain: ${CHAIN_ID_WORMCHAIN}][sender: }}][sequence: ]` + ); + + return { + name: "log-message-published", + address: CORE_ADDRESS, + chainId: 0, + txHash: hash, + blockHeight: log.blockHeight, + blockTime: log.timestamp, + attributes: { + sender: emitter, + sequence: sequence, + payload: parsedArgs[3], + nonce: parsedArgs[2], + consistencyLevel: parsedArgs[4], + }, + }; + } +}; + +function transactionAttibutes(log: WormchainLog): TransactionAttributes { + let coreContract; + let sequence; + let emitter; + let hash; + + log.transactions?.forEach((tx) => { + hash = tx.hash; + + tx.attributes.forEach((attr) => { + const key = Buffer.from(attr.key, "base64").toString().toLowerCase(); + const value = Buffer.from(attr.value, "base64").toString().toLowerCase(); + + switch (key) { + case "message.sender": + emitter = value; + break; + case "message.sequence": + sequence = Number(value); + break; + case "_contract_address": + case "contract_address": + if (value.toLocaleLowerCase() === CORE_ADDRESS.toLowerCase()) { + coreContract = true; + } + break; + } + }); + }); + + return { + coreContract, + sequence, + emitter, + hash, + }; +} + +type TransactionAttributes = { + coreContract: boolean | undefined; + sequence: number | undefined; + emitter: string | undefined; + hash: string | undefined; +}; diff --git a/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts b/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts index cfa42cac..31d638e1 100644 --- a/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts +++ b/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts @@ -1,10 +1,17 @@ -import { AptosRepository, JobRepository, SuiRepository } from "../../domain/repositories"; import { RateLimitedAptosJsonRPCBlockRepository } from "./aptos/RateLimitedAptosJsonRPCBlockRepository"; import { RateLimitedEvmJsonRPCBlockRepository } from "./evm/RateLimitedEvmJsonRPCBlockRepository"; import { RateLimitedSuiJsonRPCBlockRepository } from "./sui/RateLimitedSuiJsonRPCBlockRepository"; +import { WormchainJsonRPCBlockRepository } from "./wormchain/WormchainJsonRPCBlockRepository"; +import { AptosJsonRPCBlockRepository } from "./aptos/AptosJsonRPCBlockRepository"; import { SNSClient, SNSClientConfig } from "@aws-sdk/client-sns"; import { InstrumentedHttpProvider } from "../rpc/http/InstrumentedHttpProvider"; import { Config } from "../config"; +import { + WormchainRepository, + AptosRepository, + JobRepository, + SuiRepository, +} from "../../domain/repositories"; import { InstrumentedConnection, InstrumentedSuiClient, @@ -27,8 +34,9 @@ import { SnsEventRepository, ProviderPoolMap, } from "."; -import { AptosJsonRPCBlockRepository } from "./aptos/AptosJsonRPCBlockRepository"; +import { RateLimitedWormchainJsonRPCBlockRepository } from "./wormchain/RateLimitedWormchainJsonRPCBlockRepository"; +const WORMCHAIN_CHAIN = "wormchain"; const SOLANA_CHAIN = "solana"; const APTOS_CHAIN = "aptos"; const EVM_CHAIN = "evm"; @@ -140,7 +148,7 @@ export class RepositoriesBuilder { } if (chain === APTOS_CHAIN) { - const pools = this.createAptosProviderPools(); + const pools = this.createDefaultProviderPools(APTOS_CHAIN); const aptosRepository = new RateLimitedAptosJsonRPCBlockRepository( new AptosJsonRPCBlockRepository(pools) @@ -148,6 +156,16 @@ export class RepositoriesBuilder { this.repositories.set("aptos-repo", aptosRepository); } + + if (chain === WORMCHAIN_CHAIN) { + const pools = this.createDefaultProviderPools(WORMCHAIN_CHAIN); + + const wormchainRepository = new RateLimitedWormchainJsonRPCBlockRepository( + new WormchainJsonRPCBlockRepository(pools) + ); + + this.repositories.set("wormchain-repo", wormchainRepository); + } }); this.repositories.set( @@ -164,6 +182,7 @@ export class RepositoriesBuilder { solanaSlotRepo: this.getSolanaSlotRepository(), suiRepo: this.getSuiRepository(), aptosRepo: this.getAptosRepository(), + wormchainRepo: this.getWormchainRepository(), } ) ); @@ -203,6 +222,10 @@ export class RepositoriesBuilder { return this.getRepo("aptos-repo"); } + public getWormchainRepository(): WormchainRepository { + return this.getRepo("wormchain-repo"); + } + private getRepo(name: string): any { const repo = this.repositories.get(name); if (!repo) throw new Error(`No repository ${name}`); @@ -240,11 +263,11 @@ export class RepositoriesBuilder { return pools; } - private createAptosProviderPools() { - const cfg = this.cfg.chains[APTOS_CHAIN]; + private createDefaultProviderPools(chain: string) { + const cfg = this.cfg.chains[chain]; const pools = providerPoolSupplier( cfg.rpcs.map((url) => ({ url })), - (rpcCfg: RpcConfig) => this.createHttpClient(APTOS_CHAIN, rpcCfg.url), + (rpcCfg: RpcConfig) => this.createHttpClient(chain, rpcCfg.url), POOL_STRATEGY ); return pools; diff --git a/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts b/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts index e7604905..85457eda 100644 --- a/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts +++ b/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts @@ -1,47 +1,55 @@ +import { FileMetadataRepository, SnsEventRepository } from "./index"; +import { JobDefinition, Handler, LogFoundEvent } from "../../domain/entities"; +import { aptosRedeemedTransactionFoundMapper } from "../mappers/aptos/aptosRedeemedTransactionFoundMapper"; +import { wormchainLogMessagePublishedMapper } from "../mappers/wormchain/wormchainLogMessagePublishedMapper"; +import { suiRedeemedTransactionFoundMapper } from "../mappers/sui/suiRedeemedTransactionFoundMapper"; +import { aptosLogMessagePublishedMapper } from "../mappers/aptos/aptosLogMessagePublishedMapper"; +import { suiLogMessagePublishedMapper } from "../mappers/sui/suiLogMessagePublishedMapper"; +import { HandleSolanaTransactions } from "../../domain/actions/solana/HandleSolanaTransactions"; +import { HandleAptosTransactions } from "../../domain/actions/aptos/HandleAptosTransactions"; +import { HandleEvmTransactions } from "../../domain/actions/evm/HandleEvmTransactions"; +import { HandleSuiTransactions } from "../../domain/actions/sui/HandleSuiTransactions"; +import { HandleWormchainLogs } from "../../domain/actions/wormchain/HandleWormchainLogs"; +import log from "../log"; import { + PollWormchainLogsConfigProps, + PollWormchainLogsConfig, + PollWormchain, +} from "../../domain/actions/wormchain/PollWormchain"; +import { + SolanaSlotRepository, + EvmBlockRepository, + MetadataRepository, + AptosRepository, + StatRepository, + JobRepository, + SuiRepository, + WormchainRepository, +} from "../../domain/repositories"; +import { + PollSolanaTransactionsConfig, + PollSolanaTransactions, + PollEvmLogsConfigProps, + PollEvmLogsConfig, + RunPollingJob, HandleEvmLogs, PollEvm, - PollEvmLogsConfig, - PollEvmLogsConfigProps, - PollSolanaTransactions, - PollSolanaTransactionsConfig, - RunPollingJob, } from "../../domain/actions"; -import { JobDefinition, Handler, LogFoundEvent } from "../../domain/entities"; -import { - AptosRepository, - EvmBlockRepository, - JobRepository, - MetadataRepository, - SolanaSlotRepository, - StatRepository, - SuiRepository, -} from "../../domain/repositories"; -import { FileMetadataRepository, SnsEventRepository } from "./index"; -import { HandleSolanaTransactions } from "../../domain/actions/solana/HandleSolanaTransactions"; import { + evmRedeemedTransactionFoundMapper, solanaLogMessagePublishedMapper, solanaTransferRedeemedMapper, evmLogMessagePublishedMapper, - evmRedeemedTransactionFoundMapper, } from "../mappers"; -import log from "../log"; -import { HandleEvmTransactions } from "../../domain/actions/evm/HandleEvmTransactions"; -import { suiRedeemedTransactionFoundMapper } from "../mappers/sui/suiRedeemedTransactionFoundMapper"; -import { HandleSuiTransactions } from "../../domain/actions/sui/HandleSuiTransactions"; -import { suiLogMessagePublishedMapper } from "../mappers/sui/suiLogMessagePublishedMapper"; import { - PollSuiTransactions, PollSuiTransactionsConfig, + PollSuiTransactions, } from "../../domain/actions/sui/PollSuiTransactions"; import { - PollAptos, - PollAptosTransactionsConfig, PollAptosTransactionsConfigProps, + PollAptosTransactionsConfig, + PollAptos, } from "../../domain/actions/aptos/PollAptos"; -import { HandleAptosTransactions } from "../../domain/actions/aptos/HandleAptosTransactions"; -import { aptosLogMessagePublishedMapper } from "../mappers/aptos/aptosLogMessagePublishedMapper"; -import { aptosRedeemedTransactionFoundMapper } from "../mappers/aptos/aptosRedeemedTransactionFoundMapper"; export class StaticJobRepository implements JobRepository { private fileRepo: FileMetadataRepository; @@ -59,6 +67,7 @@ export class StaticJobRepository implements JobRepository { private solanaSlotRepo: SolanaSlotRepository; private suiRepo: SuiRepository; private aptosRepo: AptosRepository; + private wormchainRepo: WormchainRepository; constructor( environment: string, @@ -72,6 +81,7 @@ export class StaticJobRepository implements JobRepository { solanaSlotRepo: SolanaSlotRepository; suiRepo: SuiRepository; aptosRepo: AptosRepository; + wormchainRepo: WormchainRepository; } ) { this.fileRepo = new FileMetadataRepository(path); @@ -82,6 +92,7 @@ export class StaticJobRepository implements JobRepository { this.solanaSlotRepo = repos.solanaSlotRepo; this.suiRepo = repos.suiRepo; this.aptosRepo = repos.aptosRepo; + this.wormchainRepo = repos.wormchainRepo; this.environment = environment; this.dryRun = dryRun; this.fill(); @@ -92,7 +103,6 @@ export class StaticJobRepository implements JobRepository { if (!persisted) { return Promise.resolve([]); } - return persisted; } @@ -101,7 +111,6 @@ export class StaticJobRepository implements JobRepository { if (!src) { throw new Error(`Source ${jobDef.source.action} not found`); } - return src(jobDef); } @@ -126,12 +135,21 @@ export class StaticJobRepository implements JobRepository { }; result.push((await maybeHandler(config, handler.target, mapper)).bind(maybeHandler)); } - return result; } + /** + * Fill all resources that applications needs to work + * Resources are: actions, mappers, targets and handlers + */ private fill() { - // Actions + this.loadActions(); + this.loadMappers(); + this.loadTargets(); + this.loadHandlers(); + } + + private loadActions(): void { const pollEvm = (jobDef: JobDefinition) => new PollEvm( this.blockRepoProvider(jobDef.source.config.chain), @@ -156,7 +174,6 @@ export class StaticJobRepository implements JobRepository { this.metadataRepo, this.suiRepo ); - const pollAptos = (jobDef: JobDefinition) => new PollAptos( new PollAptosTransactionsConfig({ @@ -169,12 +186,27 @@ export class StaticJobRepository implements JobRepository { this.aptosRepo, jobDef.source.records ); + const pollWormchain = (jobDef: JobDefinition) => + new PollWormchain( + this.wormchainRepo, + this.metadataRepo, + this.statsRepo, + new PollWormchainLogsConfig({ + ...(jobDef.source.config as PollWormchainLogsConfigProps), + id: jobDef.id, + environment: this.environment, + }), + jobDef.source.records + ); + this.sources.set("PollEvm", pollEvm); this.sources.set("PollSolanaTransactions", pollSolanaTransactions); this.sources.set("PollSuiTransactions", pollSuiTransactions); this.sources.set("PollAptos", pollAptos); + this.sources.set("PollWormchain", pollWormchain); + } - // Mappers + private loadMappers(): void { this.mappers.set("evmLogMessagePublishedMapper", evmLogMessagePublishedMapper); this.mappers.set("evmRedeemedTransactionFoundMapper", evmRedeemedTransactionFoundMapper); this.mappers.set("solanaLogMessagePublishedMapper", solanaLogMessagePublishedMapper); @@ -183,16 +215,20 @@ export class StaticJobRepository implements JobRepository { this.mappers.set("suiRedeemedTransactionFoundMapper", suiRedeemedTransactionFoundMapper); this.mappers.set("aptosLogMessagePublishedMapper", aptosLogMessagePublishedMapper); this.mappers.set("aptosRedeemedTransactionFoundMapper", aptosRedeemedTransactionFoundMapper); + this.mappers.set("wormchainLogMessagePublishedMapper", wormchainLogMessagePublishedMapper); + } - // Targets + private loadTargets(): void { const snsTarget = () => this.snsRepo.asTarget(); const dummyTarget = async () => async (events: any[]) => { log.info(`[target dummy] Got ${events.length} events`); }; + this.targets.set("sns", snsTarget); this.targets.set("dummy", dummyTarget); + } - // Handles + private loadHandlers(): void { const handleEvmLogs = async (config: any, target: string, mapper: any) => { const instance = new HandleEvmLogs>( config, @@ -200,7 +236,6 @@ export class StaticJobRepository implements JobRepository { await this.targets.get(this.dryRun ? "dummy" : target)!(), this.statsRepo ); - return instance.handle.bind(instance); }; const handleEvmTransactions = async (config: any, target: string, mapper: any) => { @@ -210,7 +245,6 @@ export class StaticJobRepository implements JobRepository { await this.targets.get(this.dryRun ? "dummy" : target)!(), this.statsRepo ); - return instance.handle.bind(instance); }; const handleSolanaTx = async (config: any, target: string, mapper: any) => { @@ -220,7 +254,6 @@ export class StaticJobRepository implements JobRepository { await this.getTarget(target), this.statsRepo ); - return instance.handle.bind(instance); }; const handleSuiTx = async (config: any, target: string, mapper: any) => { @@ -230,7 +263,6 @@ export class StaticJobRepository implements JobRepository { await this.getTarget(target), this.statsRepo ); - return instance.handle.bind(instance); }; const handleAptosTx = async (config: any, target: string, mapper: any) => { @@ -240,7 +272,16 @@ export class StaticJobRepository implements JobRepository { await this.getTarget(target), this.statsRepo ); + return instance.handle.bind(instance); + }; + const handleWormchainLogs = async (config: any, target: string, mapper: any) => { + const instance = new HandleWormchainLogs( + config, + mapper, + await this.getTarget(target), + this.statsRepo + ); return instance.handle.bind(instance); }; @@ -249,6 +290,7 @@ export class StaticJobRepository implements JobRepository { this.handlers.set("HandleSolanaTransactions", handleSolanaTx); this.handlers.set("HandleSuiTransactions", handleSuiTx); this.handlers.set("HandleAptosTransactions", handleAptosTx); + this.handlers.set("HandleWormchainLogs", handleWormchainLogs); } private async getTarget(target: string): Promise<(items: any[]) => Promise> { @@ -256,7 +298,6 @@ export class StaticJobRepository implements JobRepository { if (!maybeTarget) { throw new Error(`Target ${target} not found`); } - return maybeTarget(); } } diff --git a/blockchain-watcher/src/infrastructure/repositories/wormchain/RateLimitedWormchainJsonRPCBlockRepository.ts b/blockchain-watcher/src/infrastructure/repositories/wormchain/RateLimitedWormchainJsonRPCBlockRepository.ts new file mode 100644 index 00000000..b3f6f989 --- /dev/null +++ b/blockchain-watcher/src/infrastructure/repositories/wormchain/RateLimitedWormchainJsonRPCBlockRepository.ts @@ -0,0 +1,22 @@ +import { RateLimitedRPCRepository } from "../RateLimitedRPCRepository"; +import { WormchainRepository } from "../../../domain/repositories"; +import { Options } from "../common/rateLimitedOptions"; +import winston from "winston"; + +export class RateLimitedWormchainJsonRPCBlockRepository + extends RateLimitedRPCRepository + implements WormchainRepository +{ + constructor(delegate: WormchainRepository, opts: Options = { period: 10_000, limit: 1000 }) { + super(delegate, opts); + this.logger = winston.child({ module: "RateLimitedWormchainJsonRPCBlockRepository" }); + } + + getBlockHeight(finality: string): Promise { + return this.breaker.fn(() => this.delegate.getBlockHeight(finality)).execute(); + } + + getBlockLogs(blockNumber: bigint): Promise { + return this.breaker.fn(() => this.delegate.getBlockLogs(blockNumber)).execute(); + } +} diff --git a/blockchain-watcher/src/infrastructure/repositories/wormchain/WormchainJsonRPCBlockRepository.ts b/blockchain-watcher/src/infrastructure/repositories/wormchain/WormchainJsonRPCBlockRepository.ts new file mode 100644 index 00000000..9ce9c4ad --- /dev/null +++ b/blockchain-watcher/src/infrastructure/repositories/wormchain/WormchainJsonRPCBlockRepository.ts @@ -0,0 +1,248 @@ +import { InstrumentedHttpProvider } from "../../rpc/http/InstrumentedHttpProvider"; +import { WormchainRepository } from "../../../domain/repositories"; +import { divideIntoBatches } from "../common/utils"; +import { ProviderPool } from "@xlabs/rpc-pool"; +import { WormchainLog } from "../../../domain/entities/wormchain"; +import { SHA256 } from "jscrypto/SHA256"; +import { Base64 } from "jscrypto/Base64"; +import winston from "winston"; + +type ProviderPoolMap = ProviderPool; + +export class WormchainJsonRPCBlockRepository implements WormchainRepository { + private readonly logger: winston.Logger; + protected pool: ProviderPoolMap; + + constructor(pool: ProviderPool) { + this.logger = winston.child({ module: "WormchainJsonRPCBlockRepository" }); + this.pool = pool; + } + + async getBlockHeight(finality: string): Promise { + try { + const endpoint = `/abci_info`; + let results: ResultBlockHeight; + + results = await this.pool.get().get({ endpoint }); + + if ( + results && + results.result && + results.result.response && + results.result.response.last_block_height + ) { + const blockHeight = results.result.response.last_block_height; + return BigInt(blockHeight); + } + return undefined; + } catch (e) { + this.handleError(`Error: ${e}`, "getBlockHeight"); + throw e; + } + } + + async getBlockLogs(blockNumber: bigint): Promise { + try { + const blockEndpoint = `/block?height=${blockNumber}`; + let resultsBlock: ResultBlock; + + resultsBlock = await this.pool.get().get({ endpoint: blockEndpoint }); + const txs = resultsBlock.result.block.data.txs; + + if (!txs) { + return { + transactions: [], + blockHeight: BigInt(resultsBlock.result.block.header.height), + timestamp: Number(resultsBlock.result.block.header.time), + }; + } + + const transactions: TransactionType[] = []; + const hashNumbers = new Set(txs.map((tx) => tx)); + const batches = divideIntoBatches(hashNumbers, 10); + + for (const batch of batches) { + for (let hashBatch of batch) { + let resultTransaction: ResultTransaction; + const hash: string = this.hexToHash(hashBatch); + const txEndpoint = `/tx?hash=0x${hash}`; + resultTransaction = await this.pool + .get() + .get({ endpoint: txEndpoint }); + + if ( + resultTransaction && + resultTransaction.result.tx_result && + resultTransaction.result.tx_result.events + ) { + resultTransaction.result.tx_result.events.filter((event) => { + if (event.type === "wasm") { + transactions.push({ + hash: `0x${hash}`.toLocaleLowerCase(), + type: event.type, + attributes: event.attributes, + }); + } + }); + } + } + } + + const dateTime: Date = new Date(resultsBlock.result.block.header.time); + const timestamp: number = dateTime.getTime(); + + return { + transactions: transactions || [], + blockHeight: BigInt(resultsBlock.result.block.header.height), + timestamp: timestamp, + }; + } catch (e) { + this.handleError(`Error: ${e}`, "getBlockHeight"); + throw e; + } + } + + private handleError(e: any, method: string) { + this.logger.error(`[wormchain] Error calling ${method}: ${e.message ?? e}`); + } + + private hexToHash(data: string): string { + return SHA256.hash(Base64.parse(data)).toString().toUpperCase(); + } +} + +type ResultBlockHeight = { result: { response: { last_block_height: string } } }; + +type ResultBlock = { + result: { + block_id: { + hash: string; + parts: { + total: number; + hash: string; + }; + }; + block: { + header: { + version: { block: string }; + chain_id: string; + height: string; + time: string; // eg. '2023-01-03T12:13:00.849094631Z' + last_block_id: { hash: string; parts: { total: number; hash: string } }; + last_commit_hash: string; + data_hash: string; + validators_hash: string; + next_validators_hash: string; + consensus_hash: string; + app_hash: string; + last_results_hash: string; + evidence_hash: string; + proposer_address: string; + }; + data: { txs: string[] | null }; + evidence: { evidence: null }; + last_commit: { + height: string; + round: number; + block_id: { hash: string; parts: { total: number; hash: string } }; + signatures: string[]; + }; + }; + }; +}; + +type ResultTransaction = { + result: { + tx: { + body: { + messages: string[]; + memo: string; + timeout_height: string; + extension_options: []; + non_critical_extension_options: []; + }; + auth_info: { + signer_infos: string[]; + fee: { + amount: [{ denom: string; amount: string }]; + gas_limit: string; + payer: string; + granter: string; + }; + }; + signatures: string[]; + }; + tx_result: { + height: string; + txhash: string; + codespace: string; + code: 0; + data: string; + raw_log: string; + logs: [{ msg_index: number; log: string; events: EventsType }]; + info: string; + gas_wanted: string; + gas_used: string; + tx: { + "@type": "/cosmos.tx.v1beta1.Tx"; + body: { + messages: [ + { + "@type": "/cosmos.staking.v1beta1.MsgBeginRedelegate"; + delegator_address: string; + validator_src_address: string; + validator_dst_address: string; + amount: { denom: string; amount: string }; + } + ]; + memo: ""; + timeout_height: "0"; + extension_options: []; + non_critical_extension_options: []; + }; + auth_info: { + signer_infos: [ + { + public_key: { + "@type": "/cosmos.crypto.secp256k1.PubKey"; + key: string; + }; + mode_info: { single: { mode: string } }; + sequence: string; + } + ]; + fee: { + amount: [{ denom: string; amount: string }]; + gas_limit: string; + payer: string; + granter: string; + }; + }; + signatures: string[]; + }; + timestamp: string; // eg. '2023-01-03T12:12:54Z' + events: EventsType[]; + }; + }; +}; + +type EventsType = { + type: string; + attributes: [ + { + key: string; + value: string; + index: boolean; + } + ]; +}; + +type TransactionType = { + hash: string; + type: string; + attributes: { + key: string; + value: string; + index: boolean; + }[]; +}; diff --git a/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactions.test.ts b/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactions.test.ts index 3b3de2ba..9faa9944 100644 --- a/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactions.test.ts +++ b/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactions.test.ts @@ -125,7 +125,7 @@ describe("GetAptosTransactions", () => { givenPollAptosTx(cfg); // When - await whenPollEvmLogsStarts(); + await whenPollAptosLogsStarts(); // Then await thenWaitForAssertion( @@ -277,7 +277,7 @@ describe("GetAptosTransactions", () => { givenPollAptosTx(cfg); // Whem - await whenPollEvmLogsStarts(); + await whenPollAptosLogsStarts(); // Then await thenWaitForAssertion( @@ -297,7 +297,7 @@ describe("GetAptosTransactions", () => { givenPollAptosTx(cfg); // Whem - await whenPollEvmLogsStarts(); + await whenPollAptosLogsStarts(); // Then await thenWaitForAssertion( @@ -316,7 +316,7 @@ describe("GetAptosTransactions", () => { givenPollAptosTx(cfg); // Whem - await whenPollEvmLogsStarts(); + await whenPollAptosLogsStarts(); // Then await thenWaitForAssertion( @@ -381,6 +381,6 @@ const givenPollAptosTx = (cfg: PollAptosTransactionsConfig) => { pollAptos = new PollAptos(cfg, statsRepo, metadataRepo, aptosRepo, "GetAptosTransactions"); }; -const whenPollEvmLogsStarts = async () => { +const whenPollAptosLogsStarts = async () => { pollAptos.run([handlers.working]); }; diff --git a/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactionsByEvents.test.ts b/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactionsByEvents.test.ts index 0e218020..1c982780 100644 --- a/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactionsByEvents.test.ts +++ b/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactionsByEvents.test.ts @@ -64,7 +64,7 @@ describe("GetAptosTransactionsByEvents", () => { givenPollAptosTx(cfg); // When - await whenPollEvmLogsStarts(); + await whenPollAptosLogsStarts(); // Then await thenWaitForAssertion( @@ -94,7 +94,7 @@ describe("GetAptosTransactionsByEvents", () => { givenPollAptosTx(cfg); // When - await whenPollEvmLogsStarts(); + await whenPollAptosLogsStarts(); // Then await thenWaitForAssertion( @@ -124,7 +124,7 @@ describe("GetAptosTransactionsByEvents", () => { givenPollAptosTx(cfg); // When - await whenPollEvmLogsStarts(); + await whenPollAptosLogsStarts(); // Then await thenWaitForAssertion( @@ -154,7 +154,7 @@ describe("GetAptosTransactionsByEvents", () => { givenPollAptosTx(cfg); // When - await whenPollEvmLogsStarts(); + await whenPollAptosLogsStarts(); // Then await thenWaitForAssertion( @@ -311,6 +311,6 @@ const givenPollAptosTx = (cfg: PollAptosTransactionsConfig) => { ); }; -const whenPollEvmLogsStarts = async () => { +const whenPollAptosLogsStarts = async () => { pollAptos.run([handlers.working]); }; diff --git a/blockchain-watcher/test/domain/actions/wormchain/GetWormchainLogs.test.ts b/blockchain-watcher/test/domain/actions/wormchain/GetWormchainLogs.test.ts new file mode 100644 index 00000000..8dbf1915 --- /dev/null +++ b/blockchain-watcher/test/domain/actions/wormchain/GetWormchainLogs.test.ts @@ -0,0 +1,165 @@ +import { afterEach, describe, it, expect, jest } from "@jest/globals"; +import { thenWaitForAssertion } from "../../../wait-assertion"; +import { AptosTransaction } from "../../../../src/domain/entities/aptos"; +import { + PollWormchainLogsMetadata, + PollWormchainLogsConfig, + PollWormchain, +} from "../../../../src/domain/actions/wormchain/PollWormchain"; +import { + WormchainRepository, + MetadataRepository, + StatRepository, +} from "../../../../src/domain/repositories"; + +let getBlockHeightSpy: jest.SpiedFunction; +let getBlockLogsSpy: jest.SpiedFunction; +let metadataSaveSpy: jest.SpiedFunction["save"]>; + +let handlerSpy: jest.SpiedFunction<(txs: AptosTransaction[]) => Promise>; + +let metadataRepo: MetadataRepository; +let wormchainRepo: WormchainRepository; +let statsRepo: StatRepository; + +let handlers = { + working: (txs: AptosTransaction[]) => Promise.resolve(), + failing: (txs: AptosTransaction[]) => Promise.reject(), +}; +let pollWormchain: PollWormchain; + +let props = { + blockBatchSize: 100, + from: 0n, + limit: 0n, + environment: "testnet", + commitment: "immediate", + addresses: ["wormhole1ufs3tlq4umljk0qfe8k5ya0x6hpavn897u2cnf9k0en9jr7qarqqaqfk2j"], + interval: 5000, + topics: [], + chainId: 0, // TODO: Change to the correct chainId + filter: { + address: "wormhole1ufs3tlq4umljk0qfe8k5ya0x6hpavn897u2cnf9k0en9jr7qarqqaqfk2j", + }, + chain: "wormchain", + id: "poll-log-message-published-wormchain", +}; + +let cfg = new PollWormchainLogsConfig(props); + +describe("GetWormchainLogs", () => { + afterEach(async () => { + await pollWormchain.stop(); + }); + + it("should be skip the transations blocks, because the transactions will be undefined", async () => { + // Given + givenAptosBlockRepository(7606614n); + givenMetadataRepository({ lastBlock: 7606613n }); + givenStatsRepository(); + givenPollWormchainTx(cfg); + + // When + await whenPollWormchainLogsStarts(); + + // Then + await thenWaitForAssertion(() => expect(getBlockLogsSpy).toBeCalledWith(7606614n)); + }); + + it("should be process the log because it contains wasm transactions", async () => { + // Given + const log = { + transactions: [ + { + hash: "0xd84a9c85170c28b12a1436082e99c1ea2598cbf36f9e263bfc0b7fb79a972dfe", + type: "wasm", + attributes: [ + { + key: "X2NvbnRyYWN0X2FkZHJlc3M=", + value: + "d29ybWhvbGUxNGhqMnRhdnE4ZnBlc2R3eHhjdTQ0cnR5M2hoOTB2aHVqcnZjbXN0bDR6cjN0eG1mdnc5c3JyZzQ2NQ==", + index: true, + }, + { key: "YWN0aW9u", value: "c3VibWl0X29ic2VydmF0aW9ucw==", index: true }, + { + key: "b3duZXI=", + value: "d29ybWhvbGUxcWdqZmRmNWczMnN4d2pqanduNHZwOGZnZzJmdzBtOHh4aDdheGM=", + index: true, + }, + ], + }, + { + hash: "0x9042d7f656f2292e8a4bfa9468ee8215fd6de9ff23b447e20f96f6a70559df68", + type: "wasm", + attributes: [ + { + key: "X2NvbnRyYWN0X2FkZHJlc3M=", + value: + "d29ybWhvbGUxNGhqMnRhdnE4ZnBlc2R3eHhjdTQ0cnR5M2hoOTB2aHVqcnZjbXN0bDR6cjN0eG1mdnc5c3JyZzQ2NQ==", + index: true, + }, + { key: "YWN0aW9u", value: "c3VibWl0X29ic2VydmF0aW9ucw==", index: true }, + { + key: "b3duZXI=", + value: "d29ybWhvbGUxNW5rbTdhdnB4eHNuY3I0Z2c4dTJxbDdnY2tsbTJrcmt6d2U3N20=", + index: true, + }, + ], + }, + ], + blockHeight: "7606615", + timestamp: 1711025902418, + }; + + givenAptosBlockRepository(7606615n, log); + givenMetadataRepository({ lastBlock: 7606614n }); + givenStatsRepository(); + givenPollWormchainTx(cfg); + + // When + await whenPollWormchainLogsStarts(); + + // Then + await thenWaitForAssertion(() => expect(getBlockLogsSpy).toBeCalledWith(7606615n)); + }); +}); + +const givenAptosBlockRepository = (blockHeigh: bigint, log: any = {}) => { + wormchainRepo = { + getBlockHeight: () => Promise.resolve(blockHeigh), + getBlockLogs: () => Promise.resolve(log), + }; + + getBlockHeightSpy = jest.spyOn(wormchainRepo, "getBlockHeight"); + getBlockLogsSpy = jest.spyOn(wormchainRepo, "getBlockLogs"); +}; + +const givenMetadataRepository = (data?: PollWormchainLogsMetadata) => { + metadataRepo = { + get: () => Promise.resolve(data), + save: () => Promise.resolve(), + }; + metadataSaveSpy = jest.spyOn(metadataRepo, "save"); +}; + +const givenStatsRepository = () => { + statsRepo = { + count: () => {}, + measure: () => {}, + report: () => Promise.resolve(""), + }; +}; + +const givenPollWormchainTx = (cfg: PollWormchainLogsConfig) => { + pollWormchain = new PollWormchain( + wormchainRepo, + metadataRepo, + statsRepo, + cfg, + "GetWormchainLogs" + ); +}; + +const whenPollWormchainLogsStarts = async () => { + pollWormchain.run([handlers.working]); +}; diff --git a/blockchain-watcher/test/domain/actions/wormchain/HandleWormchainLogs.test.ts b/blockchain-watcher/test/domain/actions/wormchain/HandleWormchainLogs.test.ts new file mode 100644 index 00000000..cbaf6360 --- /dev/null +++ b/blockchain-watcher/test/domain/actions/wormchain/HandleWormchainLogs.test.ts @@ -0,0 +1,120 @@ +import { afterEach, describe, it, expect, jest } from "@jest/globals"; +import { StatRepository } from "../../../../src/domain/repositories"; +import { LogFoundEvent } from "../../../../src/domain/entities"; +import { WormchainLog } from "../../../../src/domain/entities/wormchain"; +import { + HandleWormchainLogsOptions, + HandleWormchainLogs, +} from "../../../../src/domain/actions/wormchain/HandleWormchainLogs"; + +let targetRepoSpy: jest.SpiedFunction<(typeof targetRepo)["save"]>; +let statsRepo: StatRepository; + +let handleWormchainLogs: HandleWormchainLogs; +let logs: WormchainLog[]; +let cfg: HandleWormchainLogsOptions; + +describe("HandleWormchainLogs", () => { + afterEach(async () => {}); + + it("should be able to map source events log", async () => { + // Given + givenConfig(); + givenStatsRepository(); + givenHandleEvmLogs(); + + // When + const result = await handleWormchainLogs.handle(logs); + + // Then + expect(result).toHaveLength(1); + expect(result[0].name).toBe("log-message-published"); + expect(result[0].chainId).toBe(0); // TODO: check this attribute + expect(result[0].txHash).toBe( + "0x7f61bf387fdb700d32d2b40ccecfb70ae46a2f82775242d04202bb7a538667c6" + ); + expect(result[0].address).toBe( + "wormhole1ufs3tlq4umljk0qfe8k5ya0x6hpavn897u2cnf9k0en9jr7qarqqaqfk2j" + ); + }); +}); + +const mapper = (tx: WormchainLog) => { + return { + name: "log-message-published", + address: "wormhole1ufs3tlq4umljk0qfe8k5ya0x6hpavn897u2cnf9k0en9jr7qarqqaqfk2j", + chainId: 0, // TODO: check this attribute + txHash: "0x7f61bf387fdb700d32d2b40ccecfb70ae46a2f82775242d04202bb7a538667c6", + blockHeight: 153549311n, + blockTime: 1709645685704036, + attributes: { + sender: "wormhole1ufs3tlq4umljk0qfe8k5ya0x6hpavn897u2cnf9k0en9jr7qarqqaqfk2j", // TODO: check this attribute + sequence: 203, + payload: "", + nonce: 75952, + consistencyLevel: 0, + protocol: "Token Bridge", + }, + }; +}; + +const targetRepo = { + save: async (events: LogFoundEvent>[]) => { + Promise.resolve(); + }, + failingSave: async (events: LogFoundEvent>[]) => { + Promise.reject(); + }, +}; + +const givenHandleEvmLogs = (targetFn: "save" | "failingSave" = "save") => { + targetRepoSpy = jest.spyOn(targetRepo, targetFn); + handleWormchainLogs = new HandleWormchainLogs(cfg, mapper, () => Promise.resolve(), statsRepo); +}; + +const givenConfig = () => { + cfg = { + id: "poll-log-message-published-wormchain", + metricName: "process_source_event", + metricLabels: { + job: "poll-log-message-published-wormchain", + chain: "wormchain", + commitment: "immediate", + }, + }; +}; + +const givenStatsRepository = () => { + statsRepo = { + count: () => {}, + measure: () => {}, + report: () => Promise.resolve(""), + }; +}; + +logs = [ + { + transactions: [ + { + hash: "0x7f61bf387fdb700d32d2b40ccecfb70ae46a2f82775242d04202bb7a538667c6", + type: "wasm", + attributes: [ + { + key: "X2NvbnRyYWN0X2FkZHJlc3M=", + value: + "d29ybWhvbGUxNGhqMnRhdnE4ZnBlc2R3eHhjdTQ0cnR5M2hoOTB2aHVqcnZjbXN0bDR6cjN0eG1mdnc5c3JyZzQ2NQ==", + index: true, + }, + { key: "YWN0aW9u", value: "c3VibWl0X29ic2VydmF0aW9ucw==", index: true }, + { + key: "b3duZXI=", + value: "d29ybWhvbGUxOHl3NmY4OHA3Znc2bTk5eDlrbnJmejNwMHk2OTNoaDBhaDh5Mm0=", + index: true, + }, + ], + }, + ], + blockHeight: BigInt(7606614), + timestamp: 1711025896481, + }, +]; diff --git a/blockchain-watcher/test/infrastructure/repositories/RepositoriesBuilder.test.ts b/blockchain-watcher/test/infrastructure/repositories/RepositoriesBuilder.test.ts index b2d790c3..8954e212 100644 --- a/blockchain-watcher/test/infrastructure/repositories/RepositoriesBuilder.test.ts +++ b/blockchain-watcher/test/infrastructure/repositories/RepositoriesBuilder.test.ts @@ -1,6 +1,8 @@ import { mockRpcPool } from "../../mocks/mockRpcPool"; mockRpcPool(); +import { RateLimitedWormchainJsonRPCBlockRepository } from "../../../src/infrastructure/repositories/wormchain/RateLimitedWormchainJsonRPCBlockRepository"; +import { RateLimitedAptosJsonRPCBlockRepository } from "../../../src/infrastructure/repositories/aptos/RateLimitedAptosJsonRPCBlockRepository"; import { RateLimitedEvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositories/evm/RateLimitedEvmJsonRPCBlockRepository"; import { RateLimitedSuiJsonRPCBlockRepository } from "../../../src/infrastructure/repositories/sui/RateLimitedSuiJsonRPCBlockRepository"; import { describe, expect, it } from "@jest/globals"; @@ -12,7 +14,6 @@ import { RateLimitedSolanaSlotRepository, SnsEventRepository, } from "../../../src/infrastructure/repositories"; -import { RateLimitedAptosJsonRPCBlockRepository } from "../../../src/infrastructure/repositories/aptos/RateLimitedAptosJsonRPCBlockRepository"; describe("RepositoriesBuilder", () => { it("should be throw error because dose not have any chain", async () => { @@ -97,12 +98,14 @@ describe("RepositoriesBuilder", () => { expect(repos.getEvmBlockRepository("ethereum-holesky")).toBeInstanceOf( RateLimitedEvmJsonRPCBlockRepository ); - expect(repos.getAptosRepository()).toBeInstanceOf(RateLimitedAptosJsonRPCBlockRepository); expect(repos.getMetadataRepository()).toBeInstanceOf(FileMetadataRepository); expect(repos.getSnsEventRepository()).toBeInstanceOf(SnsEventRepository); expect(repos.getStatsRepository()).toBeInstanceOf(PromStatRepository); expect(repos.getSolanaSlotRepository()).toBeInstanceOf(RateLimitedSolanaSlotRepository); expect(repos.getSuiRepository()).toBeInstanceOf(RateLimitedSuiJsonRPCBlockRepository); expect(repos.getAptosRepository()).toBeInstanceOf(RateLimitedAptosJsonRPCBlockRepository); + expect(repos.getWormchainRepository()).toBeInstanceOf( + RateLimitedWormchainJsonRPCBlockRepository + ); }); }); diff --git a/blockchain-watcher/test/infrastructure/repositories/StaticJobRepository.test.ts b/blockchain-watcher/test/infrastructure/repositories/StaticJobRepository.test.ts index 0269c19c..8d0e9f5b 100644 --- a/blockchain-watcher/test/infrastructure/repositories/StaticJobRepository.test.ts +++ b/blockchain-watcher/test/infrastructure/repositories/StaticJobRepository.test.ts @@ -11,6 +11,7 @@ import { SolanaSlotRepository, StatRepository, SuiRepository, + WormchainRepository, } from "../../../src/domain/repositories"; const dirPath = "./metadata-repo/jobs"; @@ -21,6 +22,7 @@ const snsRepo = {} as any as SnsEventRepository; const solanaSlotRepo = {} as any as SolanaSlotRepository; const suiRepo = {} as any as SuiRepository; const aptosRepo = {} as any as AptosRepository; +const wormchainRepo = {} as any as WormchainRepository; let repo: StaticJobRepository; @@ -36,6 +38,7 @@ describe("StaticJobRepository", () => { solanaSlotRepo, suiRepo, aptosRepo, + wormchainRepo, }); }); diff --git a/blockchain-watcher/test/mocks/configMock.ts b/blockchain-watcher/test/mocks/configMock.ts index 832157b4..2fa94540 100644 --- a/blockchain-watcher/test/mocks/configMock.ts +++ b/blockchain-watcher/test/mocks/configMock.ts @@ -150,6 +150,13 @@ export const configMock = (): Config => { rpcs: ["http://localhost"], timeout: 10000, }, + wormchain: { + name: "wormchain", + network: "testnet", + chainId: 0, // TODO: check this attribute + rpcs: ["http://localhost"], + timeout: 10000, + }, }; const snsConfig: SnsConfig = { @@ -177,7 +184,7 @@ export const configMock = (): Config => { dir: "./metadata-repo/jobs", }, chains: chainsRecord, - enabledPlatforms: ["solana", "evm", "sui", "aptos"], + enabledPlatforms: ["solana", "evm", "sui", "aptos", "wormchain"], }; return cfg; diff --git a/deploy/blockchain-watcher/workers/source-events-1.yaml b/deploy/blockchain-watcher/workers/source-events-1.yaml index 45fef25f..a44f67d7 100644 --- a/deploy/blockchain-watcher/workers/source-events-1.yaml +++ b/deploy/blockchain-watcher/workers/source-events-1.yaml @@ -223,20 +223,20 @@ data: }, "handlers": [ { - "action": "HandleEvmLogs", - "target": "sns", - "mapper": "evmLogMessagePublishedMapper", - "config": { - "abi": "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)", - "filter": { - "addresses": [ - "0x7bbcE28e64B3F8b84d876Ab298393c38ad7aac4C" - ], - "topics": [ - "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2" - ] - }, - "metricName": "process_source_event" + "action": "HandleEvmLogs", + "target": "sns", + "mapper": "evmLogMessagePublishedMapper", + "config": { + "abi": "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)", + "filter": { + "addresses": [ + "0x7bbcE28e64B3F8b84d876Ab298393c38ad7aac4C" + ], + "topics": [ + "0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2" + ] + }, + "metricName": "process_source_event" } } ] @@ -394,7 +394,7 @@ data: "config": { "abi": "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)", "filter": { - "addresses": ["0x98f3c9e6E3fAce36bAAd05FE09d375Ef1464288B"], + "addresses": ["0x98f3c9e6E3fAce36bAAd05FE09d375Ef1464288B", "0x27428dd2d3dd32a4d7f7c497eaaa23130d894911", "0x3ee18b2214aff97000d974cf647e7c347e8fa585"], "topics": ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"] }, "metricName": "process_source_event" diff --git a/deploy/blockchain-watcher/workers/source-events-2.yaml b/deploy/blockchain-watcher/workers/source-events-2.yaml index 873e3c85..ab3c6174 100644 --- a/deploy/blockchain-watcher/workers/source-events-2.yaml +++ b/deploy/blockchain-watcher/workers/source-events-2.yaml @@ -245,6 +245,35 @@ data: } } ] + }, + { + "id": "poll-log-message-published-wormchain", + "chain": "wormchain", + "source": { + "action": "PollWormchain", + "config": { + "blockBatchSize": 100, + "commitment": "latest", + "interval": 5000, + "addresses": ["wormhole16jzpxp0e8550c9aht6q9svcux30vtyyyyxv5w2l2djjra46580wsazcjwp"], + "chain": "wormchain", + "chainId": 0 // TODO: Update chainId + } + }, + "handlers": [ + { + "action": "HandleWormchainLogs", + "target": "sns", + "mapper": "wormchainLogMessagePublishedMapper", + "config": { + "abi": "", + "filter": { + "addresses": ["wormhole16jzpxp0e8550c9aht6q9svcux30vtyyyyxv5w2l2djjra46580wsazcjwp"] + }, + "metricName": "process_source_event" + } + } + ] } ] mainnet-jobs.json: |- @@ -458,6 +487,35 @@ data: } } ] + }, + { + "id": "poll-log-message-published-wormchain", + "chain": "wormchain", + "source": { + "action": "PollWormchain", + "config": { + "blockBatchSize": 100, + "commitment": "latest", + "interval": 5000, + "addresses": ["wormhole1ufs3tlq4umljk0qfe8k5ya0x6hpavn897u2cnf9k0en9jr7qarqqaqfk2j"], + "chain": "wormchain", + "chainId": 0 // TODO: Update chainId + } + }, + "handlers": [ + { + "action": "HandleWormchainLogs", + "target": "sns", + "mapper": "wormchainLogMessagePublishedMapper", + "config": { + "abi": "", + "filter": { + "addresses": ["wormhole1ufs3tlq4umljk0qfe8k5ya0x6hpavn897u2cnf9k0en9jr7qarqqaqfk2j"] + }, + "metricName": "process_source_event" + } + } + ] } ] ---