Add retry for evm and sui

This commit is contained in:
julian merlo 2024-02-12 16:24:35 -03:00
parent c9bc9660f6
commit e9be36bfd6
10 changed files with 254 additions and 88 deletions

View File

@ -8,11 +8,10 @@
"ethereum": {
"network": "mainnet",
"rpcs": [
"https://rpc.notadegen.com/eth",
"https://rpc.flashbots.net/fast",
"https://rpc.notadegen.com/eth",
"https://api.securerpc.com/v1",
"https://rpc.mevblocker.io/noreverts"
"https://rpc.mevblocker.io/noreverts",
"https://rpc.notadegen.com/eth"
]
},
"bsc": {
@ -81,9 +80,9 @@
"arbitrum": {
"network": "mainnet",
"rpcs": [
"https://arb1.arbitrum.io/rpc",
"https://arbitrum.blockpi.network/v1/rpc/public",
"https://arbitrum-one.public.blastapi.io"
"https://arbitrum-one.public.blastapi.io",
"https://arb1.arbitrum.io/rpc"
]
},
"optimism": {

View File

@ -24,6 +24,8 @@ import {
import { JobRepository, SuiRepository } from "../../domain/repositories";
import { Config } from "../config";
import { InstrumentedHttpProvider } from "../rpc/http/InstrumentedHttpProvider";
import { RateLimitedEvmJsonRPCBlockRepository } from "./evm/RateLimitedEvmJsonRPCBlockRepository";
import { RateLimitedSuiJsonRPCBlockRepository } from "./sui/RateLimitedSuiJsonRPCBlockRepository";
const SOLANA_CHAIN = "solana";
const EVM_CHAIN = "evm";
@ -94,17 +96,28 @@ export class RepositoriesBuilder {
const repoCfg: EvmJsonRPCBlockRepositoryCfg = {
chains: this.cfg.chains,
};
this.repositories.set("bsc-evmRepo", new BscEvmJsonRPCBlockRepository(repoCfg, pools));
this.repositories.set("evmRepo", new EvmJsonRPCBlockRepository(repoCfg, pools));
this.repositories.set("polygon-evmRepo", new PolygonJsonRPCBlockRepository(repoCfg, pools));
this.repositories.set(
"moonbeam-evmRepo",
const moonbeamRepository = new RateLimitedEvmJsonRPCBlockRepository(
new MoonbeamEvmJsonRPCBlockRepository(repoCfg, pools)
);
this.repositories.set(
"arbitrum-evmRepo",
const arbitrumRepository = new RateLimitedEvmJsonRPCBlockRepository(
new ArbitrumEvmJsonRPCBlockRepository(repoCfg, pools, this.getMetadataRepository())
);
const polygonRepository = new RateLimitedEvmJsonRPCBlockRepository(
new PolygonJsonRPCBlockRepository(repoCfg, pools)
);
const bscRepository = new RateLimitedEvmJsonRPCBlockRepository(
new BscEvmJsonRPCBlockRepository(repoCfg, pools)
);
const evmRepository = new RateLimitedEvmJsonRPCBlockRepository(
new EvmJsonRPCBlockRepository(repoCfg, pools)
);
this.repositories.set("moonbeam-evmRepo", moonbeamRepository);
this.repositories.set("arbitrum-evmRepo", arbitrumRepository);
this.repositories.set("polygon-evmRepo", polygonRepository);
this.repositories.set("bsc-evmRepo", bscRepository);
this.repositories.set("evmRepo", evmRepository);
}
if (chain === SUI_CHAIN) {
@ -114,7 +127,11 @@ export class RepositoriesBuilder {
POOL_STRATEGY
);
this.repositories.set("sui-repo", new SuiJsonRPCBlockRepository(suiProviderPool));
const evmsuiRepository = new RateLimitedSuiJsonRPCBlockRepository(
new SuiJsonRPCBlockRepository(suiProviderPool)
);
this.repositories.set("sui-repo", evmsuiRepository);
}
});

View File

@ -36,6 +36,7 @@ export class ArbitrumEvmJsonRPCBlockRepository extends EvmJsonRPCBlockRepository
try {
// This gets the latest L2 block so we can get the associated L1 block number
response = await this.getChainProvider(chain).post<typeof response>(
chain,
{
jsonrpc: "2.0",
id: 1,

View File

@ -72,12 +72,11 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
let results: (undefined | ResultBlocks)[] = [];
try {
results = await this.getChainProvider(chain).post<typeof results>(reqs, {
results = await this.getChainProvider(chain).post<typeof results>(chain, reqs, {
timeout: chainCfg.timeout,
retries: chainCfg.retries,
});
} catch (e: HttpClientError | any) {
this.handleError(chain, e, "getBlocks", "eth_getBlockByNumber");
throw e;
}
@ -158,6 +157,7 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
let response: { result: Log[]; error?: ErrorBlock };
try {
response = await this.getChainProvider(chain).post<typeof response>(
chain,
{
jsonrpc: "2.0",
method: "eth_getLogs",
@ -167,7 +167,6 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
{ timeout: chainCfg.timeout, retries: chainCfg.retries }
);
} catch (e: HttpClientError | any) {
this.handleError(chain, e, "getFilteredLogs", "eth_getLogs");
throw e;
}
@ -209,6 +208,7 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
let response: { result?: EvmBlock; error?: ErrorBlock };
try {
response = await this.getChainProvider(chain).post<typeof response>(
chain,
{
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
@ -218,7 +218,6 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
{ timeout: chainCfg.timeout, retries: chainCfg.retries }
);
} catch (e: HttpClientError | any) {
this.handleError(chain, e, "getBlock", "eth_getBlockByNumber");
throw e;
}
@ -271,12 +270,11 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}
try {
results = await this.getChainProvider(chain).post<typeof results>(reqs, {
results = await this.getChainProvider(chain).post<typeof results>(chain, reqs, {
timeout: chainCfg.timeout,
retries: chainCfg.retries,
});
} catch (e: HttpClientError | any) {
this.handleError(chain, e, "getTransactionReceipt", "eth_getTransactionReceipt");
throw e;
}

View File

@ -32,6 +32,7 @@ export class MoonbeamEvmJsonRPCBlockRepository extends EvmJsonRPCBlockRepository
const { hash } = await super.getBlock(chain, blockNumber);
const { result } = await this.getChainProvider(chain).post<BlockIsFinalizedResult>(
chain,
{
jsonrpc: "2.0",
id: 1,

View File

@ -22,7 +22,7 @@ export class PolygonJsonRPCBlockRepository extends EvmJsonRPCBlockRepository {
]);
const callData = rootChain.encodeFunctionData("getLastChildBlock");
const callResult: CallResult[] = await this.getChainProvider(chain).post([
const callResult: CallResult[] = await this.getChainProvider(chain).post(chain, [
{
jsonrpc: "2.0",
id: 1,

View File

@ -0,0 +1,74 @@
import {
EvmBlock,
EvmLogFilter,
EvmLog,
EvmTag,
ReceiptTransaction,
} from "../../../domain/entities";
import { EvmBlockRepository } from "../../../domain/repositories";
import winston from "../../log";
import { Circuit, Ratelimit, Retry, RetryMode } from "mollitia";
import { Options } from "../solana/RateLimitedSolanaSlotRepository";
export class RateLimitedEvmJsonRPCBlockRepository implements EvmBlockRepository {
private delegate: EvmBlockRepository;
private breaker: Circuit;
private logger: winston.Logger = winston.child({
module: "RateLimitedEvmJsonRPCBlockRepository",
});
constructor(delegate: EvmBlockRepository, opts: Options = { period: 10_000, limit: 50 }) {
this.delegate = delegate;
this.breaker = new Circuit({
options: {
modules: [
new Ratelimit({ limitPeriod: opts.period, limitForPeriod: opts.limit }),
new Retry({
attempts: 2,
interval: 1_000,
fastFirst: false,
mode: RetryMode.EXPONENTIAL,
factor: 1,
onRejection: (err: Error | any) => {
if (err.message?.startsWith("429 Too Many Requests")) {
this.logger.warn("Got 429 from solana RPC node. Retrying in 10 secs...");
return 10_000; // Wait 10 secs if we get a 429
} else {
return true; // Retry according to config
}
},
}),
],
},
});
}
getBlockHeight(chain: string, finality: string): Promise<bigint> {
return this.breaker.fn(() => this.delegate.getBlockHeight(chain, finality)).execute();
}
getBlocks(chain: string, blockNumbers: Set<bigint>): Promise<Record<string, EvmBlock>> {
return this.breaker.fn(() => this.delegate.getBlocks(chain, blockNumbers)).execute();
}
getFilteredLogs(chain: string, filter: EvmLogFilter): Promise<EvmLog[]> {
return this.breaker.fn(() => this.delegate.getFilteredLogs(chain, filter)).execute();
}
getTransactionReceipt(
chain: string,
hashNumbers: Set<string>
): Promise<Record<string, ReceiptTransaction>> {
return this.breaker.fn(() => this.delegate.getTransactionReceipt(chain, hashNumbers)).execute();
}
getBlock(
chain: string,
blockNumberOrTag: bigint | EvmTag,
isTransactionsPresent: boolean
): Promise<EvmBlock> {
return this.breaker
.fn(() => this.delegate.getBlock(chain, blockNumberOrTag, isTransactionsPresent))
.execute();
}
}

View File

@ -0,0 +1,75 @@
import { Circuit, Ratelimit, Retry, RetryMode } from "mollitia";
import { SuiTransactionBlockReceipt } from "../../../domain/entities/sui";
import { SuiRepository } from "../../../domain/repositories";
import { Options } from "../solana/RateLimitedSolanaSlotRepository";
import { Range } from "../../../domain/entities";
import winston from "winston";
import { Checkpoint, SuiEventFilter, TransactionFilter } from "@mysten/sui.js/client";
export class RateLimitedSuiJsonRPCBlockRepository implements SuiRepository {
private delegate: SuiRepository;
private breaker: Circuit;
private logger: winston.Logger = winston.child({
module: "RateLimitedSuiJsonRPCBlockRepository",
});
constructor(delegate: SuiRepository, opts: Options = { period: 10_000, limit: 50 }) {
this.delegate = delegate;
this.breaker = new Circuit({
options: {
modules: [
new Ratelimit({ limitPeriod: opts.period, limitForPeriod: opts.limit }),
new Retry({
attempts: 2,
interval: 1_000,
fastFirst: false,
mode: RetryMode.EXPONENTIAL,
factor: 1,
onRejection: (err: Error | any) => {
if (err.message?.startsWith("429 Too Many Requests")) {
this.logger.warn("Got 429 from solana RPC node. Retrying in 10 secs...");
return 10_000; // Wait 10 secs if we get a 429
} else {
return true; // Retry according to config
}
},
}),
],
},
});
}
getLastCheckpointNumber(): Promise<bigint> {
return this.breaker.fn(() => this.delegate.getLastCheckpointNumber()).execute();
}
getCheckpoint(sequence: string | number | bigint): Promise<Checkpoint> {
return this.breaker.fn(() => this.delegate.getCheckpoint(sequence)).execute();
}
getLastCheckpoint(): Promise<Checkpoint> {
return this.breaker.fn(() => this.delegate.getLastCheckpoint()).execute();
}
getCheckpoints(range: Range): Promise<Checkpoint[]> {
return this.breaker.fn(() => this.delegate.getCheckpoints(range)).execute();
}
getTransactionBlockReceipts(digests: string[]): Promise<SuiTransactionBlockReceipt[]> {
return this.breaker.fn(() => this.delegate.getTransactionBlockReceipts(digests)).execute();
}
queryTransactions(
filter?: TransactionFilter | undefined,
cursor?: string | undefined
): Promise<SuiTransactionBlockReceipt[]> {
return this.breaker.fn(() => this.delegate.queryTransactions(filter, cursor)).execute();
}
queryTransactionsByEvent(
filter: SuiEventFilter,
cursor?: string | undefined
): Promise<SuiTransactionBlockReceipt[]> {
return this.breaker.fn(() => this.delegate.queryTransactionsByEvent(filter, cursor)).execute();
}
}

View File

@ -1,7 +1,7 @@
import { ProviderHealthInstrumentation } from "@xlabs/rpc-pool";
import { AxiosError } from "axios";
import { setTimeout } from "timers/promises";
import { HttpClientError } from "../../errors/HttpClientError";
import { AxiosError } from "axios";
import winston from "winston";
// make url and chain required
type InstrumentedHttpProviderOptions = Required<Pick<HttpClientOptions, "url" | "chain">> &
@ -18,6 +18,8 @@ export class InstrumentedHttpProvider {
private url: string;
health: ProviderHealthInstrumentation;
private logger: winston.Logger = winston.child({ module: "RateLimitedSolanaSlotRepository" });
constructor(options: InstrumentedHttpProviderOptions) {
options?.initialDelay && (this.initialDelay = options.initialDelay);
options?.maxDelay && (this.maxDelay = options.maxDelay);
@ -32,25 +34,37 @@ export class InstrumentedHttpProvider {
this.health = new ProviderHealthInstrumentation(this.timeout, options.chain);
}
public async post<T>(body: any, opts?: HttpClientOptions): Promise<T> {
return this.executeWithRetry("POST", body, opts);
public async post<T>(chain: string, body: any, opts?: HttpClientOptions): Promise<T> {
return this.execute(chain, "POST", body, opts);
}
private async execute<T>(method: string, body?: any, opts?: HttpClientOptions): Promise<T> {
private async execute<T>(
chain: string,
method: string,
body?: any,
opts?: HttpClientOptions
): Promise<T> {
let response;
try {
response = await this.health.fetch(this.url, {
method: method,
body: JSON.stringify(body),
signal: AbortSignal.timeout(opts?.timeout ?? this.timeout),
headers: {
"Content-Type": "application/json",
},
});
} catch (err: AxiosError | any) {
} catch (e: AxiosError | any) {
this.logger.error(
`[${chain}][${body?.method}] Got error from ${this.url} rpc. ${e?.message ?? `${e}`}`
);
// Connection / timeout error:
if (err instanceof AxiosError) {
throw new HttpClientError(err.message ?? err.code, { status: err?.status ?? 0 }, err);
if (e instanceof AxiosError) {
throw new HttpClientError(e.message ?? e.code, { status: e?.status ?? 0 }, e);
}
throw new HttpClientError(err.message ?? err.code, undefined, err);
throw new HttpClientError(e.message ?? e.code, undefined, e);
}
if (!(response.status > 200) && !(response.status < 300)) {
@ -59,39 +73,6 @@ export class InstrumentedHttpProvider {
return response.json() as T;
}
private async executeWithRetry<T>(
method: string,
body?: any,
opts?: HttpClientOptions
): Promise<T> {
const maxRetries = opts?.retries ?? this.retries;
let retries = 0;
const initialDelay = opts?.initialDelay ?? this.initialDelay;
const maxDelay = opts?.maxDelay ?? this.maxDelay;
while (maxRetries >= 0) {
try {
return await this.execute(method, body, opts);
} catch (err) {
if (err instanceof HttpClientError) {
if (retries < maxRetries) {
const retryAfter = err.getRetryAfter(maxDelay, err);
if (retryAfter) {
await setTimeout(retryAfter, { ref: false });
} else {
const timeout = Math.min(initialDelay * 2 ** maxRetries, maxDelay);
await setTimeout(timeout, { ref: false });
}
retries++;
continue;
}
}
throw err;
}
}
throw new Error(`Failed to reach ${this.url}`);
}
}
export type HttpClientOptions = {

View File

@ -1,20 +1,16 @@
import { mockRpcPool } from "../../mocks/mockRpcPool";
mockRpcPool();
import { MoonbeamEvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositories/evm/MoonbeamEvmJsonRPCBlockRepository";
import { RateLimitedEvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositories/evm/RateLimitedEvmJsonRPCBlockRepository";
import { RateLimitedSuiJsonRPCBlockRepository } from "../../../src/infrastructure/repositories/sui/RateLimitedSuiJsonRPCBlockRepository";
import { describe, expect, it } from "@jest/globals";
import { RepositoriesBuilder } from "../../../src/infrastructure/repositories/RepositoriesBuilder";
import { configMock } from "../../mocks/configMock";
import {
ArbitrumEvmJsonRPCBlockRepository,
BscEvmJsonRPCBlockRepository,
EvmJsonRPCBlockRepository,
FileMetadataRepository,
PolygonJsonRPCBlockRepository,
PromStatRepository,
RateLimitedSolanaSlotRepository,
SnsEventRepository,
SuiJsonRPCBlockRepository,
} from "../../../src/infrastructure/repositories";
describe("RepositoriesBuilder", () => {
@ -45,38 +41,62 @@ describe("RepositoriesBuilder", () => {
const job = repos.getJobsRepository();
expect(job).toBeTruthy();
expect(repos.getEvmBlockRepository("ethereum")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("ethereum-sepolia")).toBeInstanceOf(
EvmJsonRPCBlockRepository
expect(repos.getEvmBlockRepository("ethereum")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("ethereum-sepolia")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("bsc")).toBeInstanceOf(RateLimitedEvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("polygon")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("avalanche")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("oasis")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("fantom")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("karura")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("acala")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("klaytn")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("celo")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("bsc")).toBeInstanceOf(BscEvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("polygon")).toBeInstanceOf(PolygonJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("avalanche")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("oasis")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("fantom")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("karura")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("acala")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("klaytn")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("celo")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("arbitrum")).toBeInstanceOf(
ArbitrumEvmJsonRPCBlockRepository
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("arbitrum-sepolia")).toBeInstanceOf(
ArbitrumEvmJsonRPCBlockRepository
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("moonbeam")).toBeInstanceOf(
MoonbeamEvmJsonRPCBlockRepository
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("optimism")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("optimism")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("optimism-sepolia")).toBeInstanceOf(
EvmJsonRPCBlockRepository
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("base")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("base-sepolia")).toBeInstanceOf(
RateLimitedEvmJsonRPCBlockRepository
);
expect(repos.getEvmBlockRepository("base")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getEvmBlockRepository("base-sepolia")).toBeInstanceOf(EvmJsonRPCBlockRepository);
expect(repos.getMetadataRepository()).toBeInstanceOf(FileMetadataRepository);
expect(repos.getSnsEventRepository()).toBeInstanceOf(SnsEventRepository);
expect(repos.getStatsRepository()).toBeInstanceOf(PromStatRepository);
expect(repos.getSolanaSlotRepository()).toBeInstanceOf(RateLimitedSolanaSlotRepository);
expect(repos.getSuiRepository()).toBeInstanceOf(SuiJsonRPCBlockRepository);
expect(repos.getSuiRepository()).toBeInstanceOf(RateLimitedSuiJsonRPCBlockRepository);
});
});