ratelimited solana client + increase coverage

This commit is contained in:
matias martinez 2023-11-28 11:29:59 -03:00
parent a469be1331
commit 4d71c0038c
15 changed files with 243 additions and 11 deletions

View File

@ -1,6 +1,7 @@
/** @type {import('ts-jest').JestConfigWithTsJest} */
module.exports = {
moduleFileExtensions: ["js", "json", "ts"],
setupFiles: ["<rootDir>/src/infrastructure/log.ts"],
roots: ["test", "src"],
testRegex: ".*\\.test\\.ts$",
transform: {
@ -12,7 +13,7 @@ module.exports = {
coverageDirectory: "./coverage",
coverageThreshold: {
global: {
lines: 55,
lines: 63,
},
},
};

View File

@ -17,6 +17,7 @@
"config": "^3.3.9",
"dotenv": "^16.3.1",
"ethers": "^5",
"mollitia": "^0.1.0",
"prom-client": "^15.0.0",
"uuid": "^9.0.1",
"winston": "3.8.2"
@ -9683,6 +9684,11 @@
"node": ">=10"
}
},
"node_modules/mollitia": {
"version": "0.1.0",
"resolved": "https://registry.npmjs.org/mollitia/-/mollitia-0.1.0.tgz",
"integrity": "sha512-lbbFJdhrNEuReGlbsMqXyTnTiO8Pt+8rKAlLcVyRPNmsRyL+YWR3MlC9Sx8UkOSSLIAUNvIoNlXqD/BWBv9TFQ=="
},
"node_modules/ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",
@ -19057,6 +19063,11 @@
"integrity": "sha512-vVqVZQyf3WLx2Shd0qJ9xuvqgAyKPLAiqITEtqW0oIUjzo3PePDd6fW9iFz30ef7Ysp/oiWqbhszeGWW2T6Gzw==",
"optional": true
},
"mollitia": {
"version": "0.1.0",
"resolved": "https://registry.npmjs.org/mollitia/-/mollitia-0.1.0.tgz",
"integrity": "sha512-lbbFJdhrNEuReGlbsMqXyTnTiO8Pt+8rKAlLcVyRPNmsRyL+YWR3MlC9Sx8UkOSSLIAUNvIoNlXqD/BWBv9TFQ=="
},
"ms": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz",

View File

@ -23,6 +23,7 @@
"config": "^3.3.9",
"dotenv": "^16.3.1",
"ethers": "^5",
"mollitia": "^0.1.0",
"prom-client": "^15.0.0",
"uuid": "^9.0.1",
"winston": "3.8.2"

View File

@ -31,8 +31,8 @@ export abstract class RunPollingJob {
try {
items = await this.get();
await Promise.all(handlers.map((handler) => handler(items)));
} catch (e) {
this.logger.error("Error processing items", e);
} catch (e: Error | any) {
this.logger.error("Error processing items", e.stack ?? e);
await setTimeout(this.interval);
continue;
}

View File

@ -12,7 +12,7 @@ export class HandleSolanaTransactions<T> {
constructor(
cfg: HandleSolanaTxConfig,
mapper: (txs: solana.Transaction) => Promise<T[]>,
mapper: (tx: solana.Transaction) => Promise<T[]>,
target?: (parsed: T[]) => Promise<void>
) {
this.cfg = cfg;

View File

@ -1,6 +1,7 @@
export enum ErrorType {
SkippedSlot,
NoBlockOrBlockTime,
Ratelimit,
}
export class SolanaFailure extends Error {

View File

@ -9,6 +9,7 @@ import {
PromStatRepository,
StaticJobRepository,
Web3SolanaSlotRepository,
RateLimitedSolanaSlotRepository,
} from "./repositories";
import { HttpClient } from "./http/HttpClient";
@ -38,7 +39,10 @@ export class RepositoriesBuilder {
if (chain === "solana") {
const cfg = this.cfg.platforms[chain];
const solanaSlotRepository = new Web3SolanaSlotRepository(new Connection(cfg.rpcs[0]));
const solanaSlotRepository = new RateLimitedSolanaSlotRepository(
new Web3SolanaSlotRepository(new Connection(cfg.rpcs[0])),
cfg.rateLimit
);
this.repositories.set("solana-slotRepo", solanaSlotRepository);
}

View File

@ -23,6 +23,10 @@ export type PlatformConfig = {
chainId: number;
rpcs: string[];
timeout?: number;
rateLimit?: {
period: number;
limit: number;
};
};
/*

View File

@ -82,6 +82,7 @@ export class SnsEventRepository {
};
}
this.logger.info(`Published ${events.length} events to SNS`);
return {
status: "success",
};
@ -94,7 +95,6 @@ export class SnsEventRepository {
this.logger.error(`Error publishing events to SNS: ${result.reason ?? result.reasons}`);
throw new Error(`Error publishing events to SNS: ${result.reason}`);
}
this.logger.info(`Published ${events.length} events to SNS`);
};
}
}

View File

@ -12,4 +12,5 @@ export * from "./SnsEventRepository";
export * from "./EvmJsonRPCBlockRepository";
export * from "./PromStatRepository";
export * from "./StaticJobRepository";
export * from "./Web3SolanaSlotRepository";
export * from "./solana/Web3SolanaSlotRepository";
export * from "./solana/RateLimitedSolanaSlotRepository";

View File

@ -0,0 +1,58 @@
import { Commitment } from "@solana/web3.js";
import { Circuit, Ratelimit, RatelimitError } from "mollitia";
import { solana } from "../../../domain/entities";
import { SolanaSlotRepository } from "../../../domain/repositories";
import { Fallible, SolanaFailure, ErrorType } from "../../../domain/errors";
export class RateLimitedSolanaSlotRepository implements SolanaSlotRepository {
delegate: SolanaSlotRepository;
breaker: Circuit;
constructor(delegate: SolanaSlotRepository, opts: Options = { period: 10_000, limit: 50 }) {
this.delegate = delegate;
this.breaker = new Circuit({
options: {
modules: [new Ratelimit({ limitPeriod: opts.period, limitForPeriod: opts.limit })],
},
});
}
getLatestSlot(commitment: string): Promise<number> {
return this.breaker.fn(() => this.delegate.getLatestSlot(commitment)).execute();
}
async getBlock(slot: number, finality?: string): Promise<Fallible<solana.Block, SolanaFailure>> {
try {
const result: Fallible<solana.Block, SolanaFailure> = await this.breaker
.fn(() => this.delegate.getBlock(slot, finality))
.execute();
return result;
} catch (err) {
if (err instanceof RatelimitError) {
return Fallible.error(new SolanaFailure(0, err.message, ErrorType.Ratelimit));
}
return Fallible.error(new SolanaFailure(err, "unknown error"));
}
}
getSignaturesForAddress(
address: string,
beforeSig: string,
afterSig: string,
limit: number
): Promise<solana.ConfirmedSignatureInfo[]> {
return this.breaker
.fn(() => this.delegate.getSignaturesForAddress(address, beforeSig, afterSig, limit))
.execute(address, beforeSig, afterSig, limit);
}
getTransactions(sigs: solana.ConfirmedSignatureInfo[]): Promise<solana.Transaction[]> {
return this.breaker.fn(() => this.delegate.getTransactions(sigs)).execute(sigs);
}
}
export type Options = {
period: number;
limit: number;
};

View File

@ -6,9 +6,9 @@ import {
SolanaJSONRPCError,
} from "@solana/web3.js";
import { solana } from "../../domain/entities";
import { SolanaSlotRepository } from "../../domain/repositories";
import { Fallible, SolanaFailure } from "../../domain/errors";
import { solana } from "../../../domain/entities";
import { SolanaSlotRepository } from "../../../domain/repositories";
import { Fallible, SolanaFailure } from "../../../domain/errors";
export class Web3SolanaSlotRepository implements SolanaSlotRepository {
connection: Connection;

View File

@ -0,0 +1,59 @@
import { describe, jest, it, expect } from "@jest/globals";
import {
HandleSolanaTransactions,
HandleSolanaTxConfig,
} from "../../../../src/domain/actions/solana/HandleSolanaTransactions";
import { solana } from "../../../../src/domain/entities";
let solanaTxs: solana.Transaction[];
describe("HandleSolanaTransactions", () => {
let handleSolanaTransactions: HandleSolanaTransactions<any>;
const mockConfig: HandleSolanaTxConfig = {
programId: "mockProgramId",
};
it("should handle Solana transactions", async () => {
givenSolanaTransactions();
handleSolanaTransactions = new HandleSolanaTransactions<any>(
mockConfig,
async (tx: solana.Transaction) => {
return [tx];
}
);
const result = await handleSolanaTransactions.handle(solanaTxs);
expect(result).toEqual(solanaTxs);
});
it("should handle Solana transactions with a target", async () => {
givenSolanaTransactions();
const mockTarget = jest.fn<(parsed: any[]) => Promise<void>>();
handleSolanaTransactions = new HandleSolanaTransactions<any>(
mockConfig,
async (tx: solana.Transaction) => {
return [tx];
},
mockTarget
);
const mockTransactions: solana.Transaction[] = await handleSolanaTransactions.handle(solanaTxs);
expect(mockTarget).toHaveBeenCalledWith(mockTransactions);
});
});
const givenSolanaTransactions = () =>
(solanaTxs = [
{
slot: 1,
transaction: {
message: {
accountKeys: [],
instructions: [],
compiledInstructions: [],
},
signatures: [],
},
},
]);

View File

@ -0,0 +1,55 @@
import { expect, describe, it } from "@jest/globals";
import {
Web3SolanaSlotRepository,
RateLimitedSolanaSlotRepository,
} from "../../../src/infrastructure/repositories";
const repoMock = {
getSlot: () => Promise.resolve(100),
getLatestSlot: () => Promise.resolve(100),
getBlock: () => Promise.resolve({ blockTime: 100, transactions: [] }),
getSignaturesForAddress: () => Promise.resolve([]),
getTransactions: () => Promise.resolve([]),
} as any as Web3SolanaSlotRepository;
describe("RateLimitedSolanaSlotRepository", () => {
describe("getLatestSlot", () => {
it("should fail when ratelimit is exceeded", async () => {
const repository = new RateLimitedSolanaSlotRepository(repoMock, { period: 1000, limit: 1 });
await repository.getLatestSlot("confirmed");
await expect(repository.getLatestSlot("confirmed")).rejects.toThrowError();
});
});
describe("getBlock", () => {
it("should fail when ratelimit is exceeded", async () => {
const repository = new RateLimitedSolanaSlotRepository(repoMock, { period: 1000, limit: 1 });
await repository.getBlock(1);
const failure = await repository.getBlock(1);
expect(failure.getError()).toHaveProperty("message", "Ratelimited");
});
});
describe("getSignaturesForAddress", () => {
it("should fail when ratelimit is exceeded", async () => {
const repository = new RateLimitedSolanaSlotRepository(repoMock, { period: 1000, limit: 1 });
await repository.getSignaturesForAddress("address", "before", "after", 1);
await expect(
repository.getSignaturesForAddress("address", "before", "after", 1)
).rejects.toThrowError();
});
});
describe("getTransactions", () => {
it("should fail when ratelimit is exceeded", async () => {
const repository = new RateLimitedSolanaSlotRepository(repoMock, { period: 1000, limit: 1 });
await repository.getTransactions([]);
await expect(repository.getTransactions([])).rejects.toThrowError();
});
});
});

View File

@ -1,4 +1,5 @@
import { expect, describe, it } from "@jest/globals";
import { PublicKey } from "@solana/web3.js";
import { solana } from "../../../src/domain/entities";
import { Web3SolanaSlotRepository } from "../../../src/infrastructure/repositories";
@ -20,7 +21,32 @@ describe("Web3SolanaSlotRepository", () => {
it("should return a block for a given slot number", async () => {
const expected = {
blockTime: 100,
transactions: [],
transactions: [
{
signature: "signature1",
slot: 100,
transaction: {
message: {
version: "legacy",
accountKeys: [new PublicKey("3u8hJUVTA4jH1wYAyUur7FFZVQ8H635K3tSHHF4ssjQ5")],
instructions: [],
compiledInstructions: [],
},
},
},
{
signature: "signature1",
slot: 100,
transaction: {
message: {
version: 0,
staticAccountKeys: [new PublicKey("3u8hJUVTA4jH1wYAyUur7FFZVQ8H635K3tSHHF4ssjQ5")],
instructions: [],
compiledInstructions: [],
},
},
},
],
};
const connectionMock = {
getBlock: (slot: number) => Promise.resolve(expected),
@ -32,6 +58,17 @@ describe("Web3SolanaSlotRepository", () => {
expect(block.blockTime).toBe(expected.blockTime);
expect(block.transactions).toHaveLength(expected.transactions.length);
});
it("should return an error when the block is not found", async () => {
const connectionMock = {
getBlock: (slot: number) => Promise.resolve(null),
};
const repository = new Web3SolanaSlotRepository(connectionMock as any);
const block = await repository.getBlock(100);
expect(block.getError()).toBeDefined();
});
});
describe("getSignaturesForAddress", () => {