From 752f6117f5c2dbdbbe8951068414a42bf680ac3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mat=C3=ADas=20Mart=C3=ADnez?= <131624652+mat1asm@users.noreply.github.com> Date: Tue, 9 Jul 2024 13:54:17 -0300 Subject: [PATCH] [blockchain watcher] add influxdb as supported target (#1539) * make targets optional + fix tests * move filtering errors to solana mappers * increase coverage requirement * add influx unit test * not allowing objects or arrays to be part of influx points --- .../config/custom-environment-variables.json | 3 + blockchain-watcher/config/default.json | 1 + blockchain-watcher/package-lock.json | 6 + blockchain-watcher/package.json | 3 +- .../solana/HandleSolanaTransactions.ts | 15 +- .../src/domain/entities/events.ts | 2 + .../src/domain/entities/solana.ts | 1 + .../src/infrastructure/config.ts | 3 + .../solana/solanaLogMessagePublishedMapper.ts | 7 + .../solana/solanaTransferRedeemedMapper.ts | 6 + .../repositories/InfluxEventRepository.ts | 152 ++++++++++++++++++ .../repositories/RepositoriesBuilder.ts | 33 +++- .../repositories/StaticJobRepository.ts | 14 +- .../src/infrastructure/repositories/index.ts | 1 + .../algorand/GetAlgorandTransactions.test.ts | 2 +- .../actions/algorand/PollAlgorand.test.ts | 2 +- .../aptos/GetAptosTransactions.test.ts | 2 +- .../GetAptosTransactionsByEvents.test.ts | 2 +- .../test/domain/actions/evm/PollEvm.test.ts | 2 +- .../domain/actions/sei/GetSeiRedeems.test.ts | 2 +- .../test/domain/actions/sei/PollSei.test.ts | 2 +- .../solana/PollSolanaTransactions.test.ts | 2 +- .../actions/sui/PollSuiTransactions.test.ts | 2 +- .../wormchain/GetWormchainLogs.test.ts | 2 +- .../wormchain/GetWormchainRedeems.test.ts | 2 +- .../actions/wormchain/PollWormchain.test.ts | 2 +- .../InfluxEventRepository.test.ts | 87 ++++++++++ .../repositories/RepositoriesBuilder.test.ts | 6 +- blockchain-watcher/test/mocks/configMock.ts | 9 ++ .../{wait-assertion.ts => waitAssertion.ts} | 0 30 files changed, 338 insertions(+), 35 deletions(-) create mode 100644 blockchain-watcher/src/infrastructure/repositories/InfluxEventRepository.ts create mode 100644 blockchain-watcher/test/infrastructure/repositories/InfluxEventRepository.test.ts rename blockchain-watcher/test/{wait-assertion.ts => waitAssertion.ts} (100%) diff --git a/blockchain-watcher/config/custom-environment-variables.json b/blockchain-watcher/config/custom-environment-variables.json index d92466c0..1fe38668 100644 --- a/blockchain-watcher/config/custom-environment-variables.json +++ b/blockchain-watcher/config/custom-environment-variables.json @@ -14,6 +14,9 @@ "topicArn": "SNS_TOPIC_ARN", "region": "SNS_REGION" }, + "influx": { + "token": "INFLUX_TOKEN" + }, "chains": { "solana": { "network": "SOLANA_NETWORK", diff --git a/blockchain-watcher/config/default.json b/blockchain-watcher/config/default.json index aaf3e97f..4360048b 100644 --- a/blockchain-watcher/config/default.json +++ b/blockchain-watcher/config/default.json @@ -10,6 +10,7 @@ "groupId": "blockchain-watcher", "subject": "blockchain-watcher" }, + "influx": null, "metadata": { "dir": "metadata-repo" }, diff --git a/blockchain-watcher/package-lock.json b/blockchain-watcher/package-lock.json index 1ce0fbe2..5b198e30 100644 --- a/blockchain-watcher/package-lock.json +++ b/blockchain-watcher/package-lock.json @@ -12,6 +12,7 @@ "@aws-sdk/client-sns": "^3.445.0", "@certusone/wormhole-sdk": "0.10.5", "@cosmjs/proto-signing": "^0.32.3", + "@influxdata/influxdb-client": "^1.33.2", "@mysten/sui.js": "^0.49.1", "@xlabs/rpc-pool": "^1.0.0", "algosdk": "^2.8.0", @@ -3014,6 +3015,11 @@ "google-protobuf": "^3.14.0" } }, + "node_modules/@influxdata/influxdb-client": { + "version": "1.33.2", + "resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.33.2.tgz", + "integrity": "sha512-RT5SxH+grHAazo/YK3UTuWK/frPWRM0N7vkrCUyqVprDgQzlLP+bSK4ak2Jv3QVF/pazTnsxWjvtKZdwskV5Xw==" + }, "node_modules/@injectivelabs/core-proto-ts": { "version": "0.0.14", "resolved": "https://registry.npmjs.org/@injectivelabs/core-proto-ts/-/core-proto-ts-0.0.14.tgz", diff --git a/blockchain-watcher/package.json b/blockchain-watcher/package.json index 5536d962..e179e1ef 100644 --- a/blockchain-watcher/package.json +++ b/blockchain-watcher/package.json @@ -21,6 +21,7 @@ "dependencies": { "@aws-sdk/client-sns": "^3.445.0", "@certusone/wormhole-sdk": "0.10.5", + "@influxdata/influxdb-client": "^1.33.2", "@cosmjs/proto-signing": "^0.32.3", "@mysten/sui.js": "^0.49.1", "@xlabs/rpc-pool": "^1.0.0", @@ -86,7 +87,7 @@ "coverageDirectory": "./coverage", "coverageThreshold": { "global": { - "lines": 74 + "lines": 79 } } }, diff --git a/blockchain-watcher/src/domain/actions/solana/HandleSolanaTransactions.ts b/blockchain-watcher/src/domain/actions/solana/HandleSolanaTransactions.ts index e6ffbf4b..b7581863 100644 --- a/blockchain-watcher/src/domain/actions/solana/HandleSolanaTransactions.ts +++ b/blockchain-watcher/src/domain/actions/solana/HandleSolanaTransactions.ts @@ -25,19 +25,8 @@ export class HandleSolanaTransactions { } public async handle(txs: solana.Transaction[]): Promise { - const filteredItems = txs.filter((tx) => { - const hasError = tx.meta?.err; - if (hasError) - this.logger.warn( - `Ignoring tx for program ${this.cfg.programId} in ${tx.slot} has error: ${JSON.stringify( - tx.meta?.err - )}` - ); - return !hasError; - }); - let mappedItems: T[] = []; - for (const tx of filteredItems) { + for (const tx of txs) { const result = await this.mapper(tx, this.cfg); if (result.length) { const txs = result as TransactionFoundEvent[]; @@ -58,6 +47,8 @@ export class HandleSolanaTransactions { } private report(protocol: string) { + if (!this.cfg.metricName) return; + const labels = { job: this.cfg.id, chain: this.cfg.chain ?? "", diff --git a/blockchain-watcher/src/domain/entities/events.ts b/blockchain-watcher/src/domain/entities/events.ts index a73366ff..6a2f1f0a 100644 --- a/blockchain-watcher/src/domain/entities/events.ts +++ b/blockchain-watcher/src/domain/entities/events.ts @@ -4,8 +4,10 @@ export type LogFoundEvent = { chainId: number; txHash: string; blockHeight: bigint; + /* value is in seconds */ blockTime: number; attributes: T; + tags?: Record; }; export type LogMessagePublished = { diff --git a/blockchain-watcher/src/domain/entities/solana.ts b/blockchain-watcher/src/domain/entities/solana.ts index b3ce0d0b..3ac82403 100644 --- a/blockchain-watcher/src/domain/entities/solana.ts +++ b/blockchain-watcher/src/domain/entities/solana.ts @@ -8,6 +8,7 @@ export type Transaction = { }; meta?: { innerInstructions?: CompiledInnerInstruction[] | null; + logMessages?: string[] | null; fee: number; err?: {} | string | null; }; diff --git a/blockchain-watcher/src/infrastructure/config.ts b/blockchain-watcher/src/infrastructure/config.ts index ee565efe..1f22815f 100644 --- a/blockchain-watcher/src/infrastructure/config.ts +++ b/blockchain-watcher/src/infrastructure/config.ts @@ -1,5 +1,6 @@ import config from "config"; import { SnsConfig } from "./repositories/SnsEventRepository"; +import { InfluxConfig } from "./repositories/InfluxEventRepository"; export type Environment = "testnet" | "mainnet"; @@ -11,6 +12,7 @@ export type Config = { logLevel: LogLevel; dryRun: boolean; sns: SnsConfig; + influx?: InfluxConfig; metadata?: { dir: string; }; @@ -48,6 +50,7 @@ export const configuration = { logLevel: config.get("logLevel")?.toLowerCase() ?? "info", dryRun: config.get("dryRun") === "true" ? true : false, sns: config.get("sns"), + influx: config.get("influx"), metadata: { dir: config.get("metadata.dir"), }, diff --git a/blockchain-watcher/src/infrastructure/mappers/solana/solanaLogMessagePublishedMapper.ts b/blockchain-watcher/src/infrastructure/mappers/solana/solanaLogMessagePublishedMapper.ts index 62555dfb..1dbe5bb9 100644 --- a/blockchain-watcher/src/infrastructure/mappers/solana/solanaLogMessagePublishedMapper.ts +++ b/blockchain-watcher/src/infrastructure/mappers/solana/solanaLogMessagePublishedMapper.ts @@ -21,6 +21,13 @@ export const solanaLogMessagePublishedMapper = async ( ); } + if (tx.meta?.err) { + logger.info( + `[solana] Ignoring tx ${tx.transaction.signatures[0]} because it failed: ${tx.meta.err}` + ); + return []; + } + const message = tx.transaction.message; const accountKeys = message.accountKeys; const programIdIndex = accountKeys.findIndex((i) => i === programId); diff --git a/blockchain-watcher/src/infrastructure/mappers/solana/solanaTransferRedeemedMapper.ts b/blockchain-watcher/src/infrastructure/mappers/solana/solanaTransferRedeemedMapper.ts index f1fc4ffe..0ad48d77 100644 --- a/blockchain-watcher/src/infrastructure/mappers/solana/solanaTransferRedeemedMapper.ts +++ b/blockchain-watcher/src/infrastructure/mappers/solana/solanaTransferRedeemedMapper.ts @@ -43,6 +43,12 @@ const processProgram = async ( `[${chain}]Block time is missing for tx ${transaction?.transaction?.signatures} in slot ${transaction?.slot}` ); } + if (transaction.meta?.err) { + logger.info( + `[${chain}] Ignoring tx ${transaction.transaction.signatures[0]} because it failed: ${transaction.meta.err}` + ); + return []; + } const message = transaction.transaction.message; const accountKeys = message.accountKeys; diff --git a/blockchain-watcher/src/infrastructure/repositories/InfluxEventRepository.ts b/blockchain-watcher/src/infrastructure/repositories/InfluxEventRepository.ts new file mode 100644 index 00000000..89c38953 --- /dev/null +++ b/blockchain-watcher/src/infrastructure/repositories/InfluxEventRepository.ts @@ -0,0 +1,152 @@ +import { LogFoundEvent } from "../../domain/entities"; +import winston from "../log"; +import { InfluxDB, Point, convertTimeToNanos } from "@influxdata/influxdb-client"; + +export class InfluxEventRepository { + private client: InfluxDB; + private cfg: InfluxConfig; + private logger: winston.Logger; + + constructor(client: InfluxDB, cfg: InfluxConfig) { + this.client = client; + this.cfg = cfg; + this.logger = winston.child({ module: "InfluxEventRepository" }); + this.logger.info(`Created for bucket ${cfg.bucket}`); + } + + async publish(events: LogFoundEvent[]): Promise { + if (!events.length) { + this.logger.debug("[publish] No events to publish, continuing..."); + return { + status: "success", + }; + } + + const timestamps: Record = {}; + const inputs: Point[] = []; + try { + events.map(InfluxPoint.fromLogFoundEvent).forEach((influxPoint) => { + if (timestamps[influxPoint.timestamp]) { + // see https://docs.influxdata.com/influxdb/v2/write-data/best-practices/duplicate-points/ + while (timestamps[influxPoint.timestamp]) { + influxPoint.timestamp = `${BigInt(influxPoint.timestamp) + BigInt(1)}`; + } + } + timestamps[influxPoint.timestamp] = true; + + const point = new Point(influxPoint.measurement).timestamp(influxPoint.timestamp); + + for (const [k, v] of influxPoint.getTags()) { + point.tag(k, v); + } + + for (const [k, v] of influxPoint.getFields()) { + if (typeof v === "object" || Array.isArray(v)) { + throw new Error(`Unsupported field type for ${k}: ${typeof v}`); + } + + if (typeof v === "number") { + point.intField(k, v); + } else if (typeof v === "boolean") { + point.booleanField(k, v); + } else { + point.stringField(k, v); + } + } + + inputs.push(point); + }); + } catch (error: Error | unknown) { + this.logger.error(`[publish] Failed to build points: ${error}`); + + return { + status: "error", + reason: error instanceof Error ? error.message : "failed to build points", + }; + } + + try { + const writeApi = this.client.getWriteApi(this.cfg.org, this.cfg.bucket, "ns"); + writeApi.writePoints(inputs); + await writeApi.close(); + } catch (error: unknown) { + this.logger.error(`[publish] ${error}`); + + return { + status: "error", + }; + } + + this.logger.info(`[publish] Published ${events.length} points to Influx`); + return { + status: "success", + }; + } + + async asTarget(): Promise<(events: LogFoundEvent[]) => Promise> { + return async (events: LogFoundEvent[]) => { + const result = await this.publish(events); + if (result.status === "error") { + this.logger.error( + `[asTarget] Error publishing events to Influx: ${result.reason ?? result.reasons}` + ); + throw new Error(`Error publishing events to Influx: ${result.reason}`); + } + }; + } +} + +class InfluxPoint { + constructor( + public measurement: string, + public source: string, + public timestamp: string, // in nanoseconds + public version: string, + public fields: Record, + public tags: Record = {} + ) {} + + static fromLogFoundEvent( + logFoundEvent: LogFoundEvent + ): InfluxPoint { + const ts = convertTimeToNanos(new Date(logFoundEvent.blockTime * 1000)); + if (!ts) { + throw new Error(`Invalid timestamp ${logFoundEvent.blockTime}`); + } + + return new InfluxPoint( + logFoundEvent.name, + "blockchain-watcher", + ts, + "1", + logFoundEvent.attributes, + logFoundEvent.tags + ); + } + + getTags() { + return Object.entries(this.tags); + } + + getFields() { + return Object.entries(this.fields); + } +} + +export type InfluxPointData = { + tags: Record; + fields: Record; +}; + +export type InfluxConfig = { + bucket: string; + org: string; + token: string; + url: string; +}; + +export type InfluxPublishResult = { + status: "success" | "error"; + reason?: string; + reasons?: string[]; +}; diff --git a/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts b/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts index 601e6bd1..6d16d60c 100644 --- a/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts +++ b/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts @@ -42,6 +42,8 @@ import { ProviderPool, RpcConfig, } from "@xlabs/rpc-pool"; +import { InfluxEventRepository } from "./InfluxEventRepository"; +import { InfluxDB } from "@influxdata/influxdb-client"; const WORMCHAIN_CHAIN = "wormchain"; const ALGORAND_CHAIN = "algorand"; @@ -92,7 +94,17 @@ export class RepositoriesBuilder { private build(): void { this.snsClient = this.createSnsClient(); this.repositories.set("sns", new SnsEventRepository(this.snsClient, this.cfg.sns)); - + this.cfg.influx && + this.repositories.set( + "infux", + new InfluxEventRepository( + new InfluxDB({ + url: this.cfg.influx.url, + token: this.cfg.influx.token, + }), + this.cfg.influx + ) + ); this.cfg.metadata?.dir && this.repositories.set("metadata", new FileMetadataRepository(this.cfg.metadata.dir)); @@ -119,6 +131,7 @@ export class RepositoriesBuilder { metadataRepo: this.getMetadataRepository(), statsRepo: this.getStatsRepository(), snsRepo: this.getSnsEventRepository(), + influxRepo: this.getInfluxEventRepository(), solanaSlotRepo: this.getSolanaSlotRepository(), suiRepo: this.getSuiRepository(), aptosRepo: this.getAptosRepository(), @@ -137,8 +150,22 @@ export class RepositoriesBuilder { return this.getRepo(instanceRepoName); } - public getSnsEventRepository(): SnsEventRepository { - return this.getRepo("sns"); + public getSnsEventRepository(): SnsEventRepository | undefined { + try { + const sns = this.getRepo("sns"); + return sns; + } catch (e) { + return; + } + } + + public getInfluxEventRepository(): InfluxEventRepository | undefined { + try { + const influx = this.getRepo("infux"); + return influx; + } catch (e) { + return; + } } public getMetadataRepository(): FileMetadataRepository { diff --git a/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts b/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts index 55ec4503..0628c099 100644 --- a/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts +++ b/blockchain-watcher/src/infrastructure/repositories/StaticJobRepository.ts @@ -65,6 +65,7 @@ import { PollAlgorandConfig, PollAlgorand, } from "../../domain/actions/algorand/PollAlgorand"; +import { InfluxEventRepository } from "./InfluxEventRepository"; export class StaticJobRepository implements JobRepository { private fileRepo: FileMetadataRepository; @@ -78,7 +79,8 @@ export class StaticJobRepository implements JobRepository { private evmRepo: (chain: string) => EvmBlockRepository; private metadataRepo: MetadataRepository; private statsRepo: StatRepository; - private snsRepo: SnsEventRepository; + private snsRepo?: SnsEventRepository; + private influxRepo?: InfluxEventRepository; private solanaSlotRepo: SolanaSlotRepository; private suiRepo: SuiRepository; private aptosRepo: AptosRepository; @@ -94,7 +96,8 @@ export class StaticJobRepository implements JobRepository { repos: { metadataRepo: MetadataRepository; statsRepo: StatRepository; - snsRepo: SnsEventRepository; + snsRepo?: SnsEventRepository; + influxRepo?: InfluxEventRepository; solanaSlotRepo: SolanaSlotRepository; suiRepo: SuiRepository; aptosRepo: AptosRepository; @@ -108,6 +111,7 @@ export class StaticJobRepository implements JobRepository { this.metadataRepo = repos.metadataRepo; this.statsRepo = repos.statsRepo; this.snsRepo = repos.snsRepo; + this.influxRepo = repos.influxRepo; this.solanaSlotRepo = repos.solanaSlotRepo; this.suiRepo = repos.suiRepo; this.aptosRepo = repos.aptosRepo; @@ -272,12 +276,14 @@ export class StaticJobRepository implements JobRepository { } private loadTargets(): void { - const snsTarget = () => this.snsRepo.asTarget(); + const snsTarget = () => this.snsRepo!.asTarget(); + const influxTarget = () => this.influxRepo!.asTarget(); const dummyTarget = async () => async (events: any[]) => { log.info(`[target dummy] Got ${events.length} events`); }; - this.targets.set("sns", snsTarget); + this.snsRepo && this.targets.set("sns", snsTarget); + this.influxRepo && this.targets.set("influx", influxTarget); this.targets.set("dummy", dummyTarget); } diff --git a/blockchain-watcher/src/infrastructure/repositories/index.ts b/blockchain-watcher/src/infrastructure/repositories/index.ts index 80650d07..9854ec94 100644 --- a/blockchain-watcher/src/infrastructure/repositories/index.ts +++ b/blockchain-watcher/src/infrastructure/repositories/index.ts @@ -9,6 +9,7 @@ if (!("toJSON" in BigInt.prototype)) { export * from "./FileMetadataRepository"; export * from "./SnsEventRepository"; +export * from "./InfluxEventRepository"; export * from "./evm/EvmJsonRPCBlockRepository"; export * from "./evm/BscEvmJsonRPCBlockRepository"; export * from "./evm/ArbitrumEvmJsonRPCBlockRepository"; diff --git a/blockchain-watcher/test/domain/actions/algorand/GetAlgorandTransactions.test.ts b/blockchain-watcher/test/domain/actions/algorand/GetAlgorandTransactions.test.ts index f2fa5ecf..7579aca3 100644 --- a/blockchain-watcher/test/domain/actions/algorand/GetAlgorandTransactions.test.ts +++ b/blockchain-watcher/test/domain/actions/algorand/GetAlgorandTransactions.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, it, expect, jest } from "@jest/globals"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { AlgorandTransaction } from "../../../../src/domain/entities/algorand"; import { PollAlgorandMetadata, diff --git a/blockchain-watcher/test/domain/actions/algorand/PollAlgorand.test.ts b/blockchain-watcher/test/domain/actions/algorand/PollAlgorand.test.ts index 7180deba..e831d156 100644 --- a/blockchain-watcher/test/domain/actions/algorand/PollAlgorand.test.ts +++ b/blockchain-watcher/test/domain/actions/algorand/PollAlgorand.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, it, expect, jest } from "@jest/globals"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { AlgorandTransaction } from "../../../../src/domain/entities/algorand"; import { PollAlgorandMetadata, diff --git a/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactions.test.ts b/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactions.test.ts index dc207415..c72bd2e4 100644 --- a/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactions.test.ts +++ b/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactions.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, it, expect, jest } from "@jest/globals"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { AptosTransaction } from "../../../../src/domain/entities/aptos"; import { PollAptosTransactionsMetadata, diff --git a/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactionsByEvents.test.ts b/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactionsByEvents.test.ts index a45a0d2e..3f82f777 100644 --- a/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactionsByEvents.test.ts +++ b/blockchain-watcher/test/domain/actions/aptos/GetAptosTransactionsByEvents.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, it, expect, jest } from "@jest/globals"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { AptosTransaction } from "../../../../src/domain/entities/aptos"; import { PollAptosTransactionsMetadata, diff --git a/blockchain-watcher/test/domain/actions/evm/PollEvm.test.ts b/blockchain-watcher/test/domain/actions/evm/PollEvm.test.ts index 871cb197..dd2f755c 100644 --- a/blockchain-watcher/test/domain/actions/evm/PollEvm.test.ts +++ b/blockchain-watcher/test/domain/actions/evm/PollEvm.test.ts @@ -6,7 +6,7 @@ import { StatRepository, } from "../../../../src/domain/repositories"; import { EvmBlock, EvmLog, ReceiptTransaction } from "../../../../src/domain/entities"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; let cfg = PollEvmLogsConfig.fromBlock("acala", 0n); diff --git a/blockchain-watcher/test/domain/actions/sei/GetSeiRedeems.test.ts b/blockchain-watcher/test/domain/actions/sei/GetSeiRedeems.test.ts index 905a3bf4..85fd86eb 100644 --- a/blockchain-watcher/test/domain/actions/sei/GetSeiRedeems.test.ts +++ b/blockchain-watcher/test/domain/actions/sei/GetSeiRedeems.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, it, expect, jest } from "@jest/globals"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { SeiRedeem } from "../../../../src/domain/entities/sei"; import { PollSeiMetadata, diff --git a/blockchain-watcher/test/domain/actions/sei/PollSei.test.ts b/blockchain-watcher/test/domain/actions/sei/PollSei.test.ts index 0d186737..1948265d 100644 --- a/blockchain-watcher/test/domain/actions/sei/PollSei.test.ts +++ b/blockchain-watcher/test/domain/actions/sei/PollSei.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, it, expect, jest } from "@jest/globals"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { SeiRedeem } from "../../../../src/domain/entities/sei"; import { MetadataRepository, diff --git a/blockchain-watcher/test/domain/actions/solana/PollSolanaTransactions.test.ts b/blockchain-watcher/test/domain/actions/solana/PollSolanaTransactions.test.ts index a406224b..0870437b 100644 --- a/blockchain-watcher/test/domain/actions/solana/PollSolanaTransactions.test.ts +++ b/blockchain-watcher/test/domain/actions/solana/PollSolanaTransactions.test.ts @@ -9,7 +9,7 @@ import { SolanaSlotRepository, StatRepository, } from "../../../../src/domain/repositories"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { solana } from "../../../../src/domain/entities"; import { Fallible } from "../../../../src/domain/errors"; diff --git a/blockchain-watcher/test/domain/actions/sui/PollSuiTransactions.test.ts b/blockchain-watcher/test/domain/actions/sui/PollSuiTransactions.test.ts index 37a2b5c1..ded594fa 100644 --- a/blockchain-watcher/test/domain/actions/sui/PollSuiTransactions.test.ts +++ b/blockchain-watcher/test/domain/actions/sui/PollSuiTransactions.test.ts @@ -14,7 +14,7 @@ import { SuiRepository, } from "../../../../src/domain/repositories"; import { mockMetadataRepository, mockStatsRepository } from "../../../mocks/repos"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { SuiTransactionBlockReceipt } from "../../../../src/domain/entities/sui"; let statsRepo: StatRepository; diff --git a/blockchain-watcher/test/domain/actions/wormchain/GetWormchainLogs.test.ts b/blockchain-watcher/test/domain/actions/wormchain/GetWormchainLogs.test.ts index da08ad80..49b9ff3f 100644 --- a/blockchain-watcher/test/domain/actions/wormchain/GetWormchainLogs.test.ts +++ b/blockchain-watcher/test/domain/actions/wormchain/GetWormchainLogs.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, it, expect, jest } from "@jest/globals"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { CosmosTransaction } from "../../../../src/domain/entities/wormchain"; import { PollWormchainLogsMetadata, diff --git a/blockchain-watcher/test/domain/actions/wormchain/GetWormchainRedeems.test.ts b/blockchain-watcher/test/domain/actions/wormchain/GetWormchainRedeems.test.ts index 6c3f6a78..f9398365 100644 --- a/blockchain-watcher/test/domain/actions/wormchain/GetWormchainRedeems.test.ts +++ b/blockchain-watcher/test/domain/actions/wormchain/GetWormchainRedeems.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, it, expect, jest } from "@jest/globals"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { CosmosTransaction } from "../../../../src/domain/entities/wormchain"; import { PollWormchainLogsMetadata, diff --git a/blockchain-watcher/test/domain/actions/wormchain/PollWormchain.test.ts b/blockchain-watcher/test/domain/actions/wormchain/PollWormchain.test.ts index 41124dde..93de4ade 100644 --- a/blockchain-watcher/test/domain/actions/wormchain/PollWormchain.test.ts +++ b/blockchain-watcher/test/domain/actions/wormchain/PollWormchain.test.ts @@ -1,5 +1,5 @@ import { afterEach, describe, it, expect, jest } from "@jest/globals"; -import { thenWaitForAssertion } from "../../../wait-assertion"; +import { thenWaitForAssertion } from "../../../waitAssertion"; import { WormchainBlockLogs } from "../../../../src/domain/entities/wormchain"; import { PollWormchainLogsMetadata, diff --git a/blockchain-watcher/test/infrastructure/repositories/InfluxEventRepository.test.ts b/blockchain-watcher/test/infrastructure/repositories/InfluxEventRepository.test.ts new file mode 100644 index 00000000..73256678 --- /dev/null +++ b/blockchain-watcher/test/infrastructure/repositories/InfluxEventRepository.test.ts @@ -0,0 +1,87 @@ +import { describe, expect, it, jest } from "@jest/globals"; +import { + InfluxConfig, + InfluxEventRepository, +} from "../../../src/infrastructure/repositories/InfluxEventRepository"; +import { InfluxDB, WriteApi } from "@influxdata/influxdb-client"; + +let eventRepository: InfluxEventRepository; +let influxClient: InfluxDB; +let influxWriteApi: WriteApi; +let config: InfluxConfig; + +describe("InfluxEventRepository", () => { + it("should not call influx client when no events given", async () => { + givenInfluxEventRepository(); + + const result = await eventRepository.publish([]); + + expect(result).toEqual({ status: "success" }); + expect(influxWriteApi.writePoints).not.toHaveBeenCalled(); + }); + + it("should publish", async () => { + givenInfluxEventRepository(); + + const result = await eventRepository.publish([ + { + chainId: 1, + address: "0x123456", + txHash: "0x123", + blockHeight: 123n, + blockTime: 0, + name: "LogMessagePublished", + attributes: { + sequence: 1, + }, + tags: { + sender: "0x123456", + }, + }, + ]); + + expect(result).toEqual({ status: "success" }); + expect(influxWriteApi.writePoints).toHaveBeenCalledTimes(1); + }); + + it("should fail to publish unsupported attributes", async () => { + givenInfluxEventRepository(); + + const result = await eventRepository.publish([ + { + chainId: 1, + address: "0x123456", + txHash: "0x123", + blockHeight: 123n, + blockTime: 0, + name: "LogMessagePublished", + attributes: { + sequences: { sequence: 1 }, + }, + }, + ]); + + expect(result).toEqual({ + status: "error", + reason: "Unsupported field type for sequences: object", + }); + expect(influxWriteApi.writePoints).toHaveBeenCalledTimes(0); + }); +}); + +const givenInfluxEventRepository = () => { + config = { + url: "http://localhost:8086", + token: "my-token", + org: "my-org", + bucket: "my-bucket", + }; + influxWriteApi = { + writePoints: jest.fn(() => {}), + close: jest.fn(), + } as unknown as WriteApi; + influxClient = { + getWriteApi: () => influxWriteApi, + } as unknown as InfluxDB; + eventRepository = new InfluxEventRepository(influxClient, config); +}; diff --git a/blockchain-watcher/test/infrastructure/repositories/RepositoriesBuilder.test.ts b/blockchain-watcher/test/infrastructure/repositories/RepositoriesBuilder.test.ts index 88064bc7..6ea92aa7 100644 --- a/blockchain-watcher/test/infrastructure/repositories/RepositoriesBuilder.test.ts +++ b/blockchain-watcher/test/infrastructure/repositories/RepositoriesBuilder.test.ts @@ -18,7 +18,7 @@ import { } from "../../../src/infrastructure/repositories"; describe("RepositoriesBuilder", () => { - it("should be throw error because dose not have any chain", async () => { + it("should throw error because does not have any chain", async () => { try { // When new RepositoriesBuilder(configMock()); @@ -28,7 +28,7 @@ describe("RepositoriesBuilder", () => { } }); - it("should be throw error because dose not support test chain", async () => { + it("should throw error because dose not support test chain", async () => { try { // When new RepositoriesBuilder(configMock()); @@ -38,7 +38,7 @@ describe("RepositoriesBuilder", () => { } }); - it("should be return all repositories instances", async () => { + it("should return all repositories instances", async () => { // When const repos = new RepositoriesBuilder(configMock()); // Then diff --git a/blockchain-watcher/test/mocks/configMock.ts b/blockchain-watcher/test/mocks/configMock.ts index 7a5d630e..a1196ab3 100644 --- a/blockchain-watcher/test/mocks/configMock.ts +++ b/blockchain-watcher/test/mocks/configMock.ts @@ -1,5 +1,6 @@ import { Config, ChainRPCConfig } from "../../src/infrastructure/config"; import { SnsConfig } from "../../src/infrastructure/repositories"; +import { InfluxConfig } from "../../src/infrastructure/repositories/InfluxEventRepository"; export const configMock = (): Config => { const chainsRecord: Record = { @@ -248,12 +249,20 @@ export const configMock = (): Config => { }, }; + const influxConfig: InfluxConfig = { + url: "http://localhost", + token: "aToken", + org: "anOrg", + bucket: "aBucket", + }; + const cfg: Config = { environment: "testnet", port: 999, logLevel: "info", dryRun: false, sns: snsConfig, + influx: influxConfig, metadata: { dir: "./metadata-repo/jobs", }, diff --git a/blockchain-watcher/test/wait-assertion.ts b/blockchain-watcher/test/waitAssertion.ts similarity index 100% rename from blockchain-watcher/test/wait-assertion.ts rename to blockchain-watcher/test/waitAssertion.ts