[blockchain watcher] add influxdb as supported target (#1539)

* make targets optional + fix tests

* move filtering errors to solana mappers

* increase coverage requirement

* add influx unit test

* not allowing objects or arrays to be part of influx points
This commit is contained in:
Matías Martínez 2024-07-09 13:54:17 -03:00 committed by GitHub
parent f53b030465
commit 752f6117f5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 338 additions and 35 deletions

View File

@ -14,6 +14,9 @@
"topicArn": "SNS_TOPIC_ARN",
"region": "SNS_REGION"
},
"influx": {
"token": "INFLUX_TOKEN"
},
"chains": {
"solana": {
"network": "SOLANA_NETWORK",

View File

@ -10,6 +10,7 @@
"groupId": "blockchain-watcher",
"subject": "blockchain-watcher"
},
"influx": null,
"metadata": {
"dir": "metadata-repo"
},

View File

@ -12,6 +12,7 @@
"@aws-sdk/client-sns": "^3.445.0",
"@certusone/wormhole-sdk": "0.10.5",
"@cosmjs/proto-signing": "^0.32.3",
"@influxdata/influxdb-client": "^1.33.2",
"@mysten/sui.js": "^0.49.1",
"@xlabs/rpc-pool": "^1.0.0",
"algosdk": "^2.8.0",
@ -3014,6 +3015,11 @@
"google-protobuf": "^3.14.0"
}
},
"node_modules/@influxdata/influxdb-client": {
"version": "1.33.2",
"resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.33.2.tgz",
"integrity": "sha512-RT5SxH+grHAazo/YK3UTuWK/frPWRM0N7vkrCUyqVprDgQzlLP+bSK4ak2Jv3QVF/pazTnsxWjvtKZdwskV5Xw=="
},
"node_modules/@injectivelabs/core-proto-ts": {
"version": "0.0.14",
"resolved": "https://registry.npmjs.org/@injectivelabs/core-proto-ts/-/core-proto-ts-0.0.14.tgz",

View File

@ -21,6 +21,7 @@
"dependencies": {
"@aws-sdk/client-sns": "^3.445.0",
"@certusone/wormhole-sdk": "0.10.5",
"@influxdata/influxdb-client": "^1.33.2",
"@cosmjs/proto-signing": "^0.32.3",
"@mysten/sui.js": "^0.49.1",
"@xlabs/rpc-pool": "^1.0.0",
@ -86,7 +87,7 @@
"coverageDirectory": "./coverage",
"coverageThreshold": {
"global": {
"lines": 74
"lines": 79
}
}
},

View File

@ -25,19 +25,8 @@ export class HandleSolanaTransactions<T> {
}
public async handle(txs: solana.Transaction[]): Promise<T[]> {
const filteredItems = txs.filter((tx) => {
const hasError = tx.meta?.err;
if (hasError)
this.logger.warn(
`Ignoring tx for program ${this.cfg.programId} in ${tx.slot} has error: ${JSON.stringify(
tx.meta?.err
)}`
);
return !hasError;
});
let mappedItems: T[] = [];
for (const tx of filteredItems) {
for (const tx of txs) {
const result = await this.mapper(tx, this.cfg);
if (result.length) {
const txs = result as TransactionFoundEvent<InstructionFound>[];
@ -58,6 +47,8 @@ export class HandleSolanaTransactions<T> {
}
private report(protocol: string) {
if (!this.cfg.metricName) return;
const labels = {
job: this.cfg.id,
chain: this.cfg.chain ?? "",

View File

@ -4,8 +4,10 @@ export type LogFoundEvent<T> = {
chainId: number;
txHash: string;
blockHeight: bigint;
/* value is in seconds */
blockTime: number;
attributes: T;
tags?: Record<string, string>;
};
export type LogMessagePublished = {

View File

@ -8,6 +8,7 @@ export type Transaction = {
};
meta?: {
innerInstructions?: CompiledInnerInstruction[] | null;
logMessages?: string[] | null;
fee: number;
err?: {} | string | null;
};

View File

@ -1,5 +1,6 @@
import config from "config";
import { SnsConfig } from "./repositories/SnsEventRepository";
import { InfluxConfig } from "./repositories/InfluxEventRepository";
export type Environment = "testnet" | "mainnet";
@ -11,6 +12,7 @@ export type Config = {
logLevel: LogLevel;
dryRun: boolean;
sns: SnsConfig;
influx?: InfluxConfig;
metadata?: {
dir: string;
};
@ -48,6 +50,7 @@ export const configuration = {
logLevel: config.get<string>("logLevel")?.toLowerCase() ?? "info",
dryRun: config.get<string>("dryRun") === "true" ? true : false,
sns: config.get<SnsConfig>("sns"),
influx: config.get<InfluxConfig>("influx"),
metadata: {
dir: config.get<string>("metadata.dir"),
},

View File

@ -21,6 +21,13 @@ export const solanaLogMessagePublishedMapper = async (
);
}
if (tx.meta?.err) {
logger.info(
`[solana] Ignoring tx ${tx.transaction.signatures[0]} because it failed: ${tx.meta.err}`
);
return [];
}
const message = tx.transaction.message;
const accountKeys = message.accountKeys;
const programIdIndex = accountKeys.findIndex((i) => i === programId);

View File

@ -43,6 +43,12 @@ const processProgram = async (
`[${chain}]Block time is missing for tx ${transaction?.transaction?.signatures} in slot ${transaction?.slot}`
);
}
if (transaction.meta?.err) {
logger.info(
`[${chain}] Ignoring tx ${transaction.transaction.signatures[0]} because it failed: ${transaction.meta.err}`
);
return [];
}
const message = transaction.transaction.message;
const accountKeys = message.accountKeys;

View File

@ -0,0 +1,152 @@
import { LogFoundEvent } from "../../domain/entities";
import winston from "../log";
import { InfluxDB, Point, convertTimeToNanos } from "@influxdata/influxdb-client";
export class InfluxEventRepository {
private client: InfluxDB;
private cfg: InfluxConfig;
private logger: winston.Logger;
constructor(client: InfluxDB, cfg: InfluxConfig) {
this.client = client;
this.cfg = cfg;
this.logger = winston.child({ module: "InfluxEventRepository" });
this.logger.info(`Created for bucket ${cfg.bucket}`);
}
async publish(events: LogFoundEvent<any>[]): Promise<InfluxPublishResult> {
if (!events.length) {
this.logger.debug("[publish] No events to publish, continuing...");
return {
status: "success",
};
}
const timestamps: Record<string, boolean> = {};
const inputs: Point[] = [];
try {
events.map(InfluxPoint.fromLogFoundEvent).forEach((influxPoint) => {
if (timestamps[influxPoint.timestamp]) {
// see https://docs.influxdata.com/influxdb/v2/write-data/best-practices/duplicate-points/
while (timestamps[influxPoint.timestamp]) {
influxPoint.timestamp = `${BigInt(influxPoint.timestamp) + BigInt(1)}`;
}
}
timestamps[influxPoint.timestamp] = true;
const point = new Point(influxPoint.measurement).timestamp(influxPoint.timestamp);
for (const [k, v] of influxPoint.getTags()) {
point.tag(k, v);
}
for (const [k, v] of influxPoint.getFields()) {
if (typeof v === "object" || Array.isArray(v)) {
throw new Error(`Unsupported field type for ${k}: ${typeof v}`);
}
if (typeof v === "number") {
point.intField(k, v);
} else if (typeof v === "boolean") {
point.booleanField(k, v);
} else {
point.stringField(k, v);
}
}
inputs.push(point);
});
} catch (error: Error | unknown) {
this.logger.error(`[publish] Failed to build points: ${error}`);
return {
status: "error",
reason: error instanceof Error ? error.message : "failed to build points",
};
}
try {
const writeApi = this.client.getWriteApi(this.cfg.org, this.cfg.bucket, "ns");
writeApi.writePoints(inputs);
await writeApi.close();
} catch (error: unknown) {
this.logger.error(`[publish] ${error}`);
return {
status: "error",
};
}
this.logger.info(`[publish] Published ${events.length} points to Influx`);
return {
status: "success",
};
}
async asTarget(): Promise<(events: LogFoundEvent<any>[]) => Promise<void>> {
return async (events: LogFoundEvent<any>[]) => {
const result = await this.publish(events);
if (result.status === "error") {
this.logger.error(
`[asTarget] Error publishing events to Influx: ${result.reason ?? result.reasons}`
);
throw new Error(`Error publishing events to Influx: ${result.reason}`);
}
};
}
}
/**
 * Intermediate representation of an Influx point before it is converted to
 * the SDK `Point`. `timestamp` is intentionally mutable: publish() bumps it
 * by one nanosecond to de-duplicate points within a batch.
 */
class InfluxPoint {
  constructor(
    public measurement: string,
    public source: string,
    public timestamp: string, // in nanoseconds
    public version: string,
    public fields: Record<string, any>,
    public tags: Record<string, string> = {}
  ) {}

  /**
   * Builds a point from a LogFoundEvent: the event name becomes the
   * measurement, `attributes` become fields and `tags` become tags.
   * Throws when blockTime cannot be converted to nanoseconds.
   *
   * NOTE(review): the `T extends InfluxPointData` constraint is effectively
   * unenforced (publish() passes LogFoundEvent<any>) and does not match how
   * attributes are used here (directly as fields) — consider removing it.
   */
  static fromLogFoundEvent<T extends InfluxPointData>(
    logFoundEvent: LogFoundEvent<T>
  ): InfluxPoint {
    // blockTime is in seconds (per LogFoundEvent), hence * 1000 for Date.
    const ts = convertTimeToNanos(new Date(logFoundEvent.blockTime * 1000));
    if (!ts) {
      throw new Error(`Invalid timestamp ${logFoundEvent.blockTime}`);
    }

    return new InfluxPoint(
      logFoundEvent.name,
      "blockchain-watcher",
      ts,
      "1",
      logFoundEvent.attributes,
      logFoundEvent.tags
    );
  }

  // Tag entries as [key, value] pairs for iteration in publish().
  getTags() {
    return Object.entries(this.tags);
  }

  // Field entries as [key, value] pairs for iteration in publish().
  getFields() {
    return Object.entries(this.fields);
  }
}
// Shape nominally required of event attributes (see the NOTE on
// InfluxPoint.fromLogFoundEvent — the constraint is not effectively enforced).
export type InfluxPointData = {
  tags: Record<string, string>;
  fields: Record<string, any>;
};

// Connection settings for the InfluxDB client and target bucket.
export type InfluxConfig = {
  bucket: string;
  org: string;
  token: string;
  url: string;
};

// Outcome of a publish() call; errors are reported here rather than thrown.
export type InfluxPublishResult = {
  status: "success" | "error";
  reason?: string;
  // NOTE(review): `reasons` is read in asTarget() but never populated by
  // publish() — candidate for removal.
  reasons?: string[];
};

View File

@ -42,6 +42,8 @@ import {
ProviderPool,
RpcConfig,
} from "@xlabs/rpc-pool";
import { InfluxEventRepository } from "./InfluxEventRepository";
import { InfluxDB } from "@influxdata/influxdb-client";
const WORMCHAIN_CHAIN = "wormchain";
const ALGORAND_CHAIN = "algorand";
@ -92,7 +94,17 @@ export class RepositoriesBuilder {
private build(): void {
this.snsClient = this.createSnsClient();
this.repositories.set("sns", new SnsEventRepository(this.snsClient, this.cfg.sns));
this.cfg.influx &&
this.repositories.set(
"infux",
new InfluxEventRepository(
new InfluxDB({
url: this.cfg.influx.url,
token: this.cfg.influx.token,
}),
this.cfg.influx
)
);
this.cfg.metadata?.dir &&
this.repositories.set("metadata", new FileMetadataRepository(this.cfg.metadata.dir));
@ -119,6 +131,7 @@ export class RepositoriesBuilder {
metadataRepo: this.getMetadataRepository(),
statsRepo: this.getStatsRepository(),
snsRepo: this.getSnsEventRepository(),
influxRepo: this.getInfluxEventRepository(),
solanaSlotRepo: this.getSolanaSlotRepository(),
suiRepo: this.getSuiRepository(),
aptosRepo: this.getAptosRepository(),
@ -137,8 +150,22 @@ export class RepositoriesBuilder {
return this.getRepo(instanceRepoName);
}
public getSnsEventRepository(): SnsEventRepository {
return this.getRepo("sns");
/**
 * Returns the SNS event repository, or undefined when none was registered
 * (getRepo is assumed to throw for missing repositories — hence the
 * try/catch; TODO confirm against getRepo's implementation).
 */
public getSnsEventRepository(): SnsEventRepository | undefined {
  try {
    const sns = this.getRepo("sns");
    return sns;
  } catch (e) {
    return;
  }
}
/**
 * Returns the Influx event repository, or undefined when influx was not
 * configured.
 *
 * NOTE(review): the lookup key "infux" (sic) matches the same typo used when
 * the repository is registered in build(); if renaming to "influx", change
 * both places together.
 */
public getInfluxEventRepository(): InfluxEventRepository | undefined {
  try {
    const influx = this.getRepo("infux");
    return influx;
  } catch (e) {
    return;
  }
}
public getMetadataRepository(): FileMetadataRepository {

View File

@ -65,6 +65,7 @@ import {
PollAlgorandConfig,
PollAlgorand,
} from "../../domain/actions/algorand/PollAlgorand";
import { InfluxEventRepository } from "./InfluxEventRepository";
export class StaticJobRepository implements JobRepository {
private fileRepo: FileMetadataRepository;
@ -78,7 +79,8 @@ export class StaticJobRepository implements JobRepository {
private evmRepo: (chain: string) => EvmBlockRepository;
private metadataRepo: MetadataRepository<any>;
private statsRepo: StatRepository;
private snsRepo: SnsEventRepository;
private snsRepo?: SnsEventRepository;
private influxRepo?: InfluxEventRepository;
private solanaSlotRepo: SolanaSlotRepository;
private suiRepo: SuiRepository;
private aptosRepo: AptosRepository;
@ -94,7 +96,8 @@ export class StaticJobRepository implements JobRepository {
repos: {
metadataRepo: MetadataRepository<any>;
statsRepo: StatRepository;
snsRepo: SnsEventRepository;
snsRepo?: SnsEventRepository;
influxRepo?: InfluxEventRepository;
solanaSlotRepo: SolanaSlotRepository;
suiRepo: SuiRepository;
aptosRepo: AptosRepository;
@ -108,6 +111,7 @@ export class StaticJobRepository implements JobRepository {
this.metadataRepo = repos.metadataRepo;
this.statsRepo = repos.statsRepo;
this.snsRepo = repos.snsRepo;
this.influxRepo = repos.influxRepo;
this.solanaSlotRepo = repos.solanaSlotRepo;
this.suiRepo = repos.suiRepo;
this.aptosRepo = repos.aptosRepo;
@ -272,12 +276,14 @@ export class StaticJobRepository implements JobRepository {
}
private loadTargets(): void {
const snsTarget = () => this.snsRepo.asTarget();
const snsTarget = () => this.snsRepo!.asTarget();
const influxTarget = () => this.influxRepo!.asTarget();
const dummyTarget = async () => async (events: any[]) => {
log.info(`[target dummy] Got ${events.length} events`);
};
this.targets.set("sns", snsTarget);
this.snsRepo && this.targets.set("sns", snsTarget);
this.influxRepo && this.targets.set("influx", influxTarget);
this.targets.set("dummy", dummyTarget);
}

View File

@ -9,6 +9,7 @@ if (!("toJSON" in BigInt.prototype)) {
export * from "./FileMetadataRepository";
export * from "./SnsEventRepository";
export * from "./InfluxEventRepository";
export * from "./evm/EvmJsonRPCBlockRepository";
export * from "./evm/BscEvmJsonRPCBlockRepository";
export * from "./evm/ArbitrumEvmJsonRPCBlockRepository";

View File

@ -1,5 +1,5 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { AlgorandTransaction } from "../../../../src/domain/entities/algorand";
import {
PollAlgorandMetadata,

View File

@ -1,5 +1,5 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { AlgorandTransaction } from "../../../../src/domain/entities/algorand";
import {
PollAlgorandMetadata,

View File

@ -1,5 +1,5 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { AptosTransaction } from "../../../../src/domain/entities/aptos";
import {
PollAptosTransactionsMetadata,

View File

@ -1,5 +1,5 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { AptosTransaction } from "../../../../src/domain/entities/aptos";
import {
PollAptosTransactionsMetadata,

View File

@ -6,7 +6,7 @@ import {
StatRepository,
} from "../../../../src/domain/repositories";
import { EvmBlock, EvmLog, ReceiptTransaction } from "../../../../src/domain/entities";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
let cfg = PollEvmLogsConfig.fromBlock("acala", 0n);

View File

@ -1,5 +1,5 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { SeiRedeem } from "../../../../src/domain/entities/sei";
import {
PollSeiMetadata,

View File

@ -1,5 +1,5 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { SeiRedeem } from "../../../../src/domain/entities/sei";
import {
MetadataRepository,

View File

@ -9,7 +9,7 @@ import {
SolanaSlotRepository,
StatRepository,
} from "../../../../src/domain/repositories";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { solana } from "../../../../src/domain/entities";
import { Fallible } from "../../../../src/domain/errors";

View File

@ -14,7 +14,7 @@ import {
SuiRepository,
} from "../../../../src/domain/repositories";
import { mockMetadataRepository, mockStatsRepository } from "../../../mocks/repos";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { SuiTransactionBlockReceipt } from "../../../../src/domain/entities/sui";
let statsRepo: StatRepository;

View File

@ -1,5 +1,5 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { CosmosTransaction } from "../../../../src/domain/entities/wormchain";
import {
PollWormchainLogsMetadata,

View File

@ -1,5 +1,5 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { CosmosTransaction } from "../../../../src/domain/entities/wormchain";
import {
PollWormchainLogsMetadata,

View File

@ -1,5 +1,5 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { thenWaitForAssertion } from "../../../wait-assertion";
import { thenWaitForAssertion } from "../../../waitAssertion";
import { WormchainBlockLogs } from "../../../../src/domain/entities/wormchain";
import {
PollWormchainLogsMetadata,

View File

@ -0,0 +1,87 @@
import { describe, expect, it, jest } from "@jest/globals";
import {
InfluxConfig,
InfluxEventRepository,
} from "../../../src/infrastructure/repositories/InfluxEventRepository";
import { InfluxDB, WriteApi } from "@influxdata/influxdb-client";
// Shared test fixtures, (re)initialized per test by givenInfluxEventRepository().
let eventRepository: InfluxEventRepository;
let influxClient: InfluxDB;
let influxWriteApi: WriteApi;
let config: InfluxConfig;
describe("InfluxEventRepository", () => {
  it("should not call influx client when no events given", async () => {
    givenInfluxEventRepository();

    // An empty batch must short-circuit without touching the write API.
    const result = await eventRepository.publish([]);

    expect(result).toEqual({ status: "success" });
    expect(influxWriteApi.writePoints).not.toHaveBeenCalled();
  });

  it("should publish", async () => {
    givenInfluxEventRepository();

    // Scalar attributes plus tags map cleanly to a single Influx point.
    const result = await eventRepository.publish([
      {
        chainId: 1,
        address: "0x123456",
        txHash: "0x123",
        blockHeight: 123n,
        blockTime: 0,
        name: "LogMessagePublished",
        attributes: {
          sequence: 1,
        },
        tags: {
          sender: "0x123456",
        },
      },
    ]);

    expect(result).toEqual({ status: "success" });
    expect(influxWriteApi.writePoints).toHaveBeenCalledTimes(1);
  });

  it("should fail to publish unsupported attributes", async () => {
    givenInfluxEventRepository();

    // Nested objects are not valid Influx field values: the whole batch is
    // rejected before anything is written.
    const result = await eventRepository.publish([
      {
        chainId: 1,
        address: "0x123456",
        txHash: "0x123",
        blockHeight: 123n,
        blockTime: 0,
        name: "LogMessagePublished",
        attributes: {
          sequences: { sequence: 1 },
        },
      },
    ]);

    expect(result).toEqual({
      status: "error",
      reason: "Unsupported field type for sequences: object",
    });
    expect(influxWriteApi.writePoints).toHaveBeenCalledTimes(0);
  });
});
// Builds the repository under test against a stubbed InfluxDB client so no
// network calls are made; write behavior is asserted via `influxWriteApi`.
const givenInfluxEventRepository = () => {
  config = {
    url: "http://localhost:8086",
    token: "my-token",
    org: "my-org",
    bucket: "my-bucket",
  };
  influxWriteApi = {
    writePoints: jest.fn(() => {}),
    close: jest.fn(),
  } as unknown as WriteApi;
  influxClient = {
    getWriteApi: () => influxWriteApi,
  } as unknown as InfluxDB;
  eventRepository = new InfluxEventRepository(influxClient, config);
};

View File

@ -18,7 +18,7 @@ import {
} from "../../../src/infrastructure/repositories";
describe("RepositoriesBuilder", () => {
it("should be throw error because dose not have any chain", async () => {
it("should throw error because does not have any chain", async () => {
try {
// When
new RepositoriesBuilder(configMock());
@ -28,7 +28,7 @@ describe("RepositoriesBuilder", () => {
}
});
it("should be throw error because dose not support test chain", async () => {
it("should throw error because dose not support test chain", async () => {
try {
// When
new RepositoriesBuilder(configMock());
@ -38,7 +38,7 @@ describe("RepositoriesBuilder", () => {
}
});
it("should be return all repositories instances", async () => {
it("should return all repositories instances", async () => {
// When
const repos = new RepositoriesBuilder(configMock());
// Then

View File

@ -1,5 +1,6 @@
import { Config, ChainRPCConfig } from "../../src/infrastructure/config";
import { SnsConfig } from "../../src/infrastructure/repositories";
import { InfluxConfig } from "../../src/infrastructure/repositories/InfluxEventRepository";
export const configMock = (): Config => {
const chainsRecord: Record<string, ChainRPCConfig> = {
@ -248,12 +249,20 @@ export const configMock = (): Config => {
},
};
const influxConfig: InfluxConfig = {
url: "http://localhost",
token: "aToken",
org: "anOrg",
bucket: "aBucket",
};
const cfg: Config = {
environment: "testnet",
port: 999,
logLevel: "info",
dryRun: false,
sns: snsConfig,
influx: influxConfig,
metadata: {
dir: "./metadata-repo/jobs",
},