Merge remote-tracking branch 'origin' into feature/blockchain-watcher-solana-log

This commit is contained in:
matias martinez 2023-11-28 16:03:54 -03:00
commit 72f0e8a618
5 changed files with 436 additions and 1 deletions

View File

@ -0,0 +1,27 @@
import fs from "fs";
import { MetadataRepository } from "../../domain/repositories";
export class FileMetadataRepo implements MetadataRepository<any> {
private readonly dirPath: string;
constructor(dirPath: string) {
this.dirPath = dirPath;
if (!fs.existsSync(this.dirPath)) {
fs.mkdirSync(this.dirPath, { recursive: true });
}
}
async get(id: string): Promise<any> {
const filePath = `${this.dirPath}/${id}.json`;
return fs.promises
.readFile(filePath, "utf8")
.then(JSON.parse)
.catch((err) => null);
}
async save(id: string, metadata: any): Promise<void> {
const filePath = `${this.dirPath}/${id}.json`;
return fs.promises.writeFile(filePath, JSON.stringify(metadata), "utf8");
}
}

View File

@ -0,0 +1,130 @@
import axios, { AxiosError, AxiosInstance } from "axios";
import { setTimeout } from "timers/promises";
/**
* A simple HTTP client with exponential backoff retries and 429 handling.
*/
export class HttpClient {
private initialDelay: number = 1_000;
private maxDelay: number = 60_000;
private retries: number = 0;
private timeout: number = 5_000;
private axios: AxiosInstance;
constructor(options?: HttpClientOptions) {
options?.initialDelay && (this.initialDelay = options.initialDelay);
options?.maxDelay && (this.maxDelay = options.maxDelay);
options?.retries && (this.retries = options.retries);
options?.timeout && (this.timeout = options.timeout);
this.axios = axios.create();
}
public async post<T>(url: string, body: any, opts?: HttpClientOptions): Promise<T> {
return this.executeWithRetry(url, "POST", body, opts);
}
private async execute<T>(
url: string,
method: string,
body?: any,
opts?: HttpClientOptions
): Promise<T> {
let response;
try {
response = await this.axios.request<T>({
url: url,
method: method,
data: body,
timeout: opts?.timeout ?? this.timeout,
signal: AbortSignal.timeout(opts?.timeout ?? this.timeout),
});
} catch (err: AxiosError | any) {
// Connection / timeout error:
if (err instanceof AxiosError) {
throw new HttpClientError(err.message ?? err.code, { status: err?.status ?? 0 }, err);
}
throw new HttpClientError(err.message ?? err.code, undefined, err);
}
if (!(response.status > 200) && !(response.status < 300)) {
throw new HttpClientError(undefined, response, response.data);
}
return response.data;
}
private async executeWithRetry<T>(
url: string,
method: string,
body?: any,
opts?: HttpClientOptions
): Promise<T> {
const maxRetries = opts?.retries ?? this.retries;
let retries = 0;
const initialDelay = opts?.initialDelay ?? this.initialDelay;
const maxDelay = opts?.maxDelay ?? this.maxDelay;
while (maxRetries >= 0) {
try {
return await this.execute(url, method, body, opts);
} catch (err) {
if (err instanceof HttpClientError) {
if (retries < maxRetries) {
const retryAfter = err.getRetryAfter(maxDelay, err);
if (retryAfter) {
await setTimeout(retryAfter, { ref: false });
} else {
const timeout = Math.min(initialDelay * 2 ** maxRetries, maxDelay);
await setTimeout(timeout, { ref: false });
}
retries++;
continue;
}
}
throw err;
}
}
throw new Error(`Failed to reach ${url}`);
}
}
export type HttpClientOptions = {
initialDelay?: number;
maxDelay?: number;
retries?: number;
timeout?: number;
};
export class HttpClientError extends Error {
public readonly status?: number;
public readonly data?: any;
public readonly headers?: any;
constructor(message?: string, response?: { status: number; headers?: any }, data?: any) {
super(message ?? `Unexpected status code: ${response?.status}`);
this.status = response?.status;
this.data = data;
this.headers = response?.headers;
Error.captureStackTrace(this, this.constructor);
}
/**
* Parses the Retry-After header and returns the value in milliseconds.
* @param maxDelay
* @param error
* @throws {HttpClientError} if retry-after is bigger than maxDelay.
* @returns the retry-after value in milliseconds.
*/
public getRetryAfter(maxDelay: number, error: HttpClientError): number | undefined {
const retryAfter = this.headers?.get("Retry-After");
if (retryAfter) {
const value = parseInt(retryAfter) * 1000; // header value is in seconds
if (value <= maxDelay) {
return value;
}
throw error;
}
}
}

View File

@ -0,0 +1,98 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { HandleEvmLogs, HandleEvmLogsConfig } from "../../src/domain/actions/HandleEvmLogs";
import { EvmLog, LogFoundEvent } from "../../src/domain/entities";
const ABI =
"event SendEvent(uint64 indexed sequence, uint256 deliveryQuote, uint256 paymentForExtraReceiverValue)";
const mapper = (log: EvmLog, args: ReadonlyArray<any>) => {
return {
name: "send-event",
address: log.address,
chainId: 1,
txHash: "0x0",
blockHeight: 0n,
blockTime: 0,
attributes: {
sequence: args[0].toString(),
deliveryQuote: args[1].toString(),
paymentForExtraReceiverValue: args[2].toString(),
},
};
};
const targetRepo = {
save: async (events: LogFoundEvent<Record<string, string>>[]) => {
Promise.resolve();
},
failingSave: async (events: LogFoundEvent<Record<string, string>>[]) => {
Promise.reject();
},
};
let targetRepoSpy: jest.SpiedFunction<(typeof targetRepo)["save"]>;
let evmLogs: EvmLog[];
let cfg: HandleEvmLogsConfig;
let handleEvmLogs: HandleEvmLogs<LogFoundEvent<Record<string, string>>>;
describe("HandleEvmLogs", () => {
afterEach(async () => {});
it("should be able to map logs", async () => {
const expectedLength = 5;
givenConfig(ABI);
givenEvmLogs(expectedLength, expectedLength);
givenHandleEvmLogs();
const result = await handleEvmLogs.handle(evmLogs);
expect(result).toHaveLength(expectedLength);
expect(result[0].attributes.sequence).toBe("3389");
expect(result[0].attributes.deliveryQuote).toBe("75150000000000000");
expect(result[0].attributes.paymentForExtraReceiverValue).toBe("0");
expect(targetRepoSpy).toBeCalledWith(result);
});
});
const givenHandleEvmLogs = (targetFn: "save" | "failingSave" = "save") => {
targetRepoSpy = jest.spyOn(targetRepo, targetFn);
handleEvmLogs = new HandleEvmLogs(cfg, mapper, targetRepo[targetFn]);
};
const givenConfig = (abi: string) => {
cfg = {
filter: {
addresses: ["0x28D8F1Be96f97C1387e94A53e00eCcFb4E75175a"],
topics: ["0xda8540426b64ece7b164a9dce95448765f0a7263ef3ff85091c9c7361e485364"],
},
abi,
};
};
const givenEvmLogs = (length: number, matchingFilterOnes: number) => {
evmLogs = [];
let matchingCount = 0;
for (let i = 0; i < length; i++) {
let address = "0x392f472048681816e91026cd768c60958b55352add2837adea9ea6249178b8a8";
let topic: string | undefined = undefined;
if (matchingCount < matchingFilterOnes) {
address = cfg.filter.addresses![0].toUpperCase();
topic = cfg.filter.topics![0];
matchingCount++;
}
evmLogs.push({
blockTime: 0,
blockNumber: BigInt(i + 1),
blockHash: "0x1a07d0bd31c84f0dab36eac31a2f3aa801852bf8240ffba19113c62463f694fa",
address: address,
removed: false,
data: "0x000000000000000000000000000000000000000000000000010afc86dedee0000000000000000000000000000000000000000000000000000000000000000000",
transactionHash: "0x2077dbd0c685c264dfa4e8e048ff15b03775043070216644258bf1bd3e419aa8",
transactionIndex: "0x4",
topics: topic
? [topic, "0x0000000000000000000000000000000000000000000000000000000000000d3d"]
: [],
logIndex: 0,
});
}
};

View File

@ -0,0 +1,180 @@
import { afterEach, describe, it, expect, jest } from "@jest/globals";
import { setTimeout } from "timers/promises";
import {
PollEvmLogsMetadata,
PollEvmLogs,
PollEvmLogsConfig,
} from "../../src/domain/actions/PollEvmLogs";
import {
EvmBlockRepository,
MetadataRepository,
StatRepository,
} from "../../src/domain/repositories";
import { EvmBlock, EvmLog } from "../../src/domain/entities";
let cfg = PollEvmLogsConfig.fromBlock(0n);
let getBlocksSpy: jest.SpiedFunction<EvmBlockRepository["getBlocks"]>;
let getLogsSpy: jest.SpiedFunction<EvmBlockRepository["getFilteredLogs"]>;
let handlerSpy: jest.SpiedFunction<(logs: EvmLog[]) => Promise<void>>;
let metadataSaveSpy: jest.SpiedFunction<MetadataRepository<PollEvmLogsMetadata>["save"]>;
let metadataRepo: MetadataRepository<PollEvmLogsMetadata>;
let evmBlockRepo: EvmBlockRepository;
let statsRepo: StatRepository;
let handlers = {
working: (logs: EvmLog[]) => Promise.resolve(),
failing: (logs: EvmLog[]) => Promise.reject(),
};
let pollEvmLogs: PollEvmLogs;
describe("PollEvmLogs", () => {
afterEach(async () => {
await pollEvmLogs.stop();
});
it("should be able to read logs from latest block when no fromBlock is configured", async () => {
const currentHeight = 10n;
const blocksAhead = 1n;
givenEvmBlockRepository(currentHeight, blocksAhead);
givenMetadataRepository();
givenStatsRepository();
givenPollEvmLogs();
await whenPollEvmLogsStarts();
await thenWaitForAssertion(
() => expect(getBlocksSpy).toHaveReturnedTimes(1),
() => expect(getBlocksSpy).toHaveBeenCalledWith(new Set([currentHeight, currentHeight + 1n])),
() =>
expect(getLogsSpy).toBeCalledWith({
addresses: cfg.addresses,
topics: cfg.topics,
fromBlock: currentHeight + blocksAhead,
toBlock: currentHeight + blocksAhead,
})
);
});
it("should be able to read logs from last known block when configured from is before", async () => {
const lastExtractedBlock = 10n;
const blocksAhead = 10n;
givenEvmBlockRepository(lastExtractedBlock, blocksAhead);
givenMetadataRepository({ lastBlock: lastExtractedBlock });
givenStatsRepository();
givenPollEvmLogs(lastExtractedBlock - 10n);
await whenPollEvmLogsStarts();
await thenWaitForAssertion(
() => () =>
expect(getBlocksSpy).toHaveBeenCalledWith(
new Set([lastExtractedBlock, lastExtractedBlock + 1n])
),
() =>
expect(getLogsSpy).toBeCalledWith({
addresses: cfg.addresses,
topics: cfg.topics,
fromBlock: lastExtractedBlock + 1n,
toBlock: lastExtractedBlock + blocksAhead,
})
);
});
it("should pass logs to handlers and persist metadata", async () => {
const currentHeight = 10n;
const blocksAhead = 1n;
givenEvmBlockRepository(currentHeight, blocksAhead);
givenMetadataRepository();
givenStatsRepository();
givenPollEvmLogs(currentHeight);
await whenPollEvmLogsStarts();
await thenWaitForAssertion(
() => expect(handlerSpy).toHaveBeenCalledWith(expect.any(Array)),
() =>
expect(metadataSaveSpy).toBeCalledWith("watch-evm-logs", {
lastBlock: currentHeight + blocksAhead,
})
);
});
});
const givenEvmBlockRepository = (height?: bigint, blocksAhead?: bigint) => {
const logsResponse: EvmLog[] = [];
const blocksResponse: Record<string, EvmBlock> = {};
if (height) {
for (let index = 0n; index <= (blocksAhead ?? 1n); index++) {
logsResponse.push({
blockNumber: height + index,
blockHash: `0x0${index}`,
blockTime: 0,
address: "",
removed: false,
data: "",
transactionHash: "",
transactionIndex: "",
topics: [],
logIndex: 0,
});
blocksResponse[`0x0${index}`] = {
timestamp: 0,
hash: `0x0${index}`,
number: height + index,
};
}
}
evmBlockRepo = {
getBlocks: () => Promise.resolve(blocksResponse),
getBlockHeight: () => Promise.resolve(height ? height + (blocksAhead ?? 10n) : 10n),
getFilteredLogs: () => Promise.resolve(logsResponse),
};
getBlocksSpy = jest.spyOn(evmBlockRepo, "getBlocks");
getLogsSpy = jest.spyOn(evmBlockRepo, "getFilteredLogs");
handlerSpy = jest.spyOn(handlers, "working");
};
const givenMetadataRepository = (data?: PollEvmLogsMetadata) => {
metadataRepo = {
get: () => Promise.resolve(data),
save: () => Promise.resolve(),
};
metadataSaveSpy = jest.spyOn(metadataRepo, "save");
};
const givenStatsRepository = () => {
statsRepo = {
count: () => {},
measure: () => {},
report: () => Promise.resolve(""),
};
};
const givenPollEvmLogs = (from?: bigint) => {
cfg.setFromBlock(from);
pollEvmLogs = new PollEvmLogs(evmBlockRepo, metadataRepo, statsRepo, cfg);
};
const whenPollEvmLogsStarts = async () => {
pollEvmLogs.run([handlers.working]);
};
const thenWaitForAssertion = async (...assertions: (() => void)[]) => {
for (let index = 1; index < 5; index++) {
try {
for (const assertion of assertions) {
assertion();
}
break;
} catch (error) {
if (index === 4) {
throw error;
}
await setTimeout(10, undefined, { ref: false });
}
}
};

View File

@ -18,4 +18,4 @@ SNS_TOPIC_ARN=
SNS_REGION=
SOLANA_RPCS='[""]'
AWS_IAM_ROLE=
AWS_IAM_ROLE=