Implement rpc pool library (#1085)

* Add rpc-pool dependency and make the minimum changes to make the codebase compatible with it

* Implement rpc provider pools for EVMs and Sui

* Implement rpc provider pools for Solana

* Adapt tests to new repo interfaces and constructors

* Fix tests

* Modify gh action to use the xlabs registry

* Mock rpc-pool module for tests

* Set up husky and pre commit hook to run prettier

* Bump rpc-pool

* Expand default mainnet rpc configs

* Add link-staged to run prettier for staged files

* prettier config

* Address PR comments

* Add bsc rpc node

* fix rpc configs
This commit is contained in:
Martin Picco 2024-02-07 16:50:05 -03:00 committed by GitHub
parent c8665d68f5
commit 59705a5bad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1884 additions and 251 deletions

View File

@ -20,12 +20,16 @@ jobs:
- uses: actions/setup-node@v3
with:
node-version: 18
registry-url: https://npm.pkg.github.com
scope: '@xlabs'
cache: "npm"
cache-dependency-path: |
./blockchain-watcher/package-lock.json
- name: npm ci
run: npm ci
working-directory: ./blockchain-watcher
env:
NODE_AUTH_TOKEN: ${{ secrets.XLABS_TOKEN }}
- name: typecheck
run: npm run build
working-directory: ./blockchain-watcher

View File

@ -3,4 +3,6 @@ lib
coverage
metadata-repo
config/dev.json
config/dev.json
.ignore

View File

@ -0,0 +1,4 @@
#!/usr/bin/env sh
cd blockchain-watcher
npx lint-staged

View File

@ -0,0 +1 @@
@xlabs:registry=https://npm.pkg.github.com

View File

@ -22,6 +22,7 @@
"network": "devnet",
"chainId": 1,
"rpcs": ["https://api.devnet.solana.com"],
"commitment": "finalized",
"timeout": 10000,
"rateLimit": {
"period": 9000,
@ -39,7 +40,12 @@
"name": "bsc",
"network": "BNB Smart Chain testnet",
"chainId": 4,
"rpcs": ["https://endpoints.omniatech.io/v1/bsc/testnet/public"],
"rpcs": [
"https://data-seed-prebsc-1-s1.bnbchain.org:8545",
"https://data-seed-prebsc-2-s2.bnbchain.org:8545",
"https://data-seed-prebsc-2-s1.bnbchain.org:8545",
"https://data-seed-prebsc-2-s1.bnbchain.org:8545"
],
"timeout": 10000
},
"polygon": {

View File

@ -7,19 +7,35 @@
},
"ethereum": {
"network": "mainnet",
"rpcs": ["https://rpc.notadegen.com/eth"]
"rpcs": [
"https://rpc.notadegen.com/eth",
"https://rpc.flashbots.net/fast",
"https://rpc.notadegen.com/eth",
"https://api.securerpc.com/v1",
"https://rpc.mevblocker.io/noreverts"
]
},
"bsc": {
"network": "mainnet",
"rpcs": ["https://bscrpc.com"]
"rpcs": [
"https://bscrpc.com",
"https://bsc-dataseed1.defibit.io",
"https://bsc-dataseed3.defibit.io",
"https://bsc-dataseed3.bnbchain.org",
"https://bsc-dataseed.bnbchain.org"
]
},
"polygon": {
"network": "mainnet",
"rpcs": ["https://rpc-mainnet.matic.quiknode.pro"]
"rpcs": [
"https://rpc-mainnet.matic.quiknode.pro",
"https://polygon-rpc.com",
"https://rpc-mainnet.maticvigil.com"
]
},
"avalanche": {
"network": "mainnet",
"rpcs": ["https://api.avax.network/ext/bc/C/rpc"]
"rpcs": ["https://api.avax.network/ext/bc/C/rpc", "https://avalanche.public-rpc.com"]
},
"oasis": {
"network": "mainnet",
@ -27,19 +43,32 @@
},
"fantom": {
"network": "mainnet",
"rpcs": ["https://fantom.publicnode.com"]
"rpcs": [
"https://fantom.publicnode.com",
"https://rpc.fantom.network",
"https://rpc2.fantom.network",
"https://rpc.ftm.tools",
"https://rpcapi.fantom.network"
]
},
"karura": {
"network": "mainnet",
"rpcs": ["https://eth-rpc-karura.aca-api.network"]
"rpcs": [
"https://eth-rpc-karura.aca-api.network",
"https://eth-rpc-karura.aca-staging.network",
"https://rpc.evm.karura.network"
]
},
"acala": {
"network": "mainnet",
"rpcs": ["https://eth-rpc-acala.aca-api.network"]
"rpcs": ["https://eth-rpc-acala.aca-api.network", "https://rpc.evm.acala.network"]
},
"klaytn": {
"network": "mainnet",
"rpcs": ["https://klaytn-mainnet-rpc.allthatnode.com:8551"]
"rpcs": [
"https://klaytn-mainnet-rpc.allthatnode.com:8551",
"https://public-en-cypress.klaytn.net"
]
},
"celo": {
"network": "mainnet",
@ -51,7 +80,11 @@
},
"arbitrum": {
"network": "mainnet",
"rpcs": ["https://arb1.arbitrum.io/rpc"]
"rpcs": [
"https://arb1.arbitrum.io/rpc",
"https://arbitrum.blockpi.network/v1/rpc/public",
"https://arbitrum-one.public.blastapi.io"
]
},
"optimism": {
"network": "mainnet",
@ -59,7 +92,7 @@
},
"base": {
"network": "mainnet",
"rpcs": ["https://base.publicnode.com"]
"rpcs": ["https://base.publicnode.com", "https://mainnet.base.org"]
},
"sui": {
"network": "mainnet",

File diff suppressed because it is too large Load Diff

View File

@ -3,6 +3,7 @@
"version": "0.0.5",
"description": "A process for watching blockchain events and moving them to persistent storage",
"main": "index.js",
"type": "module",
"scripts": {
"start": "node lib/start.js",
"start:ncc": "node lib/index.js",
@ -10,10 +11,11 @@
"test:coverage": "jest --collectCoverage=true",
"build": "tsc",
"build:ncc": "ncc build src/start.ts -o lib",
"dev": "ts-node src/start.ts",
"dev:debug:testnet": "ts-node src/start.ts start --debug --watch",
"dev:debug:mainnet": "NODE_ENV=mainnet ts-node src/start.ts start --debug --watch",
"prettier": "prettier --write ."
"dev": "npx tsx src/start.ts",
"dev:debug:testnet": "npx tsx src/start.ts start --debug --watch",
"dev:debug:mainnet": "NODE_ENV=mainnet npx tsx src/start.ts start --debug --watch",
"prettier": "prettier --write .",
"prepare": "cd .. && husky blockchain-watcher/.husky"
},
"author": "chase-45",
"license": "ISC",
@ -21,6 +23,7 @@
"@aws-sdk/client-sns": "^3.445.0",
"@certusone/wormhole-sdk": "0.10.5",
"@mysten/sui.js": "^0.49.1",
"@xlabs/rpc-pool": "^0.0.3",
"axios": "^1.6.0",
"bs58": "^5.0.0",
"config": "^3.3.9",
@ -37,7 +40,9 @@
"@types/node": "^20.11.5",
"@types/ws": "^8.5.10",
"@vercel/ncc": "^0.38.1",
"husky": "^9.0.10",
"jest": "^29.7.0",
"lint-staged": "^15.2.2",
"nock": "^13.3.8",
"prettier": "^2.8.7",
"ts-jest": "^29.1.1",
@ -82,5 +87,8 @@
"lines": 74
}
}
},
"lint-staged": {
"**/*": "prettier --workspaces --if-present --write --ignore-unknown"
}
}

View File

@ -1,9 +1,9 @@
import { decode } from "bs58";
import { Connection, Commitment } from "@solana/web3.js";
import { getPostedMessage } from "@certusone/wormhole-sdk/lib/cjs/solana/wormhole";
import { solana, LogFoundEvent, LogMessagePublished } from "../../../domain/entities";
import { CompiledInstruction, MessageCompiledInstruction } from "../../../domain/entities/solana";
import { configuration } from "../../config";
import bs58 from "bs58";
import winston from "winston";
const connection = new Connection(configuration.chains.solana.rpcs[0]); // TODO: should be better to inject this to improve testability
@ -87,7 +87,7 @@ const normalizeCompileInstruction = (
if ("accounts" in instruction) {
return {
accountKeyIndexes: instruction.accounts,
data: decode(instruction.data),
data: bs58.decode(instruction.data),
programIdIndex: instruction.programIdIndex,
};
} else {

View File

@ -4,8 +4,8 @@ import { Protocol, contractsMapperConfig } from "../contractsMapper";
import { Connection, Commitment } from "@solana/web3.js";
import { getPostedMessage } from "@certusone/wormhole-sdk/lib/cjs/solana/wormhole";
import { configuration } from "../../config";
import { decode } from "bs58";
import winston from "winston";
import bs58 from "bs58";
let logger: winston.Logger;
logger = winston.child({ module: "solanaTransferRedeemedMapper" });
@ -96,7 +96,7 @@ const normalizeCompileInstruction = (
if ("accounts" in instruction) {
return {
accountKeyIndexes: instruction.accounts,
data: decode(instruction.data),
data: bs58.decode(instruction.data),
programIdIndex: instruction.programIdIndex,
};
} else {

View File

@ -1,23 +1,29 @@
import { SNSClient, SNSClientConfig } from "@aws-sdk/client-sns";
import { Connection } from "@solana/web3.js";
import { Config } from "../config";
import {
SnsEventRepository,
InstrumentedConnection,
InstrumentedSuiClient,
RpcConfig,
providerPoolSupplier,
} from "@xlabs/rpc-pool";
import {
ArbitrumEvmJsonRPCBlockRepository,
BscEvmJsonRPCBlockRepository,
EvmJsonRPCBlockRepository,
EvmJsonRPCBlockRepositoryCfg,
FileMetadataRepository,
PromStatRepository,
StaticJobRepository,
Web3SolanaSlotRepository,
RateLimitedSolanaSlotRepository,
BscEvmJsonRPCBlockRepository,
ArbitrumEvmJsonRPCBlockRepository,
PolygonJsonRPCBlockRepository,
MoonbeamEvmJsonRPCBlockRepository,
PolygonJsonRPCBlockRepository,
PromStatRepository,
ProviderPoolMap,
RateLimitedSolanaSlotRepository,
SnsEventRepository,
StaticJobRepository,
SuiJsonRPCBlockRepository,
Web3SolanaSlotRepository,
} from ".";
import { HttpClient } from "../rpc/http/HttpClient";
import { JobRepository, SuiRepository } from "../../domain/repositories";
import { Config } from "../config";
import { InstrumentedHttpProvider } from "../rpc/http/InstrumentedHttpProvider";
const SOLANA_CHAIN = "solana";
const EVM_CHAIN = "evm";
@ -43,6 +49,8 @@ const EVM_CHAINS = new Map([
]);
const SUI_CHAIN = "sui";
const POOL_STRATEGY = "weighted";
export class RepositoriesBuilder {
private cfg: Config;
private snsClient?: SNSClient;
@ -64,44 +72,49 @@ export class RepositoriesBuilder {
this.cfg.enabledPlatforms.forEach((chain) => {
if (chain === SOLANA_CHAIN) {
const solanaProviderPool = providerPoolSupplier(
this.cfg.chains[chain].rpcs.map((url) => ({ url })),
(rpcCfg: RpcConfig) =>
new InstrumentedConnection(rpcCfg.url, {
commitment: rpcCfg.commitment || "confirmed",
}),
POOL_STRATEGY
);
const cfg = this.cfg.chains[chain];
const solanaSlotRepository = new RateLimitedSolanaSlotRepository(
new Web3SolanaSlotRepository(
new Connection(cfg.rpcs[0], { disableRetryOnRateLimit: true })
),
new Web3SolanaSlotRepository(solanaProviderPool),
cfg.rateLimit
);
this.repositories.set("solana-slotRepo", solanaSlotRepository);
}
if (chain === EVM_CHAIN) {
const httpClient = this.createHttpClient();
const pools = this.createEvmProviderPools();
const repoCfg: EvmJsonRPCBlockRepositoryCfg = {
chains: this.cfg.chains,
};
this.repositories.set("bsc-evmRepo", new BscEvmJsonRPCBlockRepository(repoCfg, httpClient));
this.repositories.set("evmRepo", new EvmJsonRPCBlockRepository(repoCfg, httpClient));
this.repositories.set(
"polygon-evmRepo",
new PolygonJsonRPCBlockRepository(repoCfg, httpClient)
);
this.repositories.set("bsc-evmRepo", new BscEvmJsonRPCBlockRepository(repoCfg, pools));
this.repositories.set("evmRepo", new EvmJsonRPCBlockRepository(repoCfg, pools));
this.repositories.set("polygon-evmRepo", new PolygonJsonRPCBlockRepository(repoCfg, pools));
this.repositories.set(
"moonbeam-evmRepo",
new MoonbeamEvmJsonRPCBlockRepository(repoCfg, httpClient)
new MoonbeamEvmJsonRPCBlockRepository(repoCfg, pools)
);
this.repositories.set(
"arbitrum-evmRepo",
new ArbitrumEvmJsonRPCBlockRepository(repoCfg, httpClient, this.getMetadataRepository())
new ArbitrumEvmJsonRPCBlockRepository(repoCfg, pools, this.getMetadataRepository())
);
}
if (chain === SUI_CHAIN) {
this.repositories.set(
"sui-repo",
new SuiJsonRPCBlockRepository({
rpc: this.cfg.chains[chain].rpcs[0],
})
const suiProviderPool = providerPoolSupplier(
this.cfg.chains[chain].rpcs.map((url) => ({ url })),
(rpcCfg: RpcConfig) => new InstrumentedSuiClient(rpcCfg.url, 2000),
POOL_STRATEGY
);
this.repositories.set("sui-repo", new SuiJsonRPCBlockRepository(suiProviderPool));
}
});
@ -177,8 +190,23 @@ export class RepositoriesBuilder {
return new SNSClient(snsCfg);
}
private createHttpClient(): HttpClient {
return new HttpClient({
private createEvmProviderPools(): ProviderPoolMap {
let pools: ProviderPoolMap = {};
for (const chain in this.cfg.chains) {
const cfg = this.cfg.chains[chain];
pools[chain] = providerPoolSupplier(
cfg.rpcs.map((url) => ({ url })),
(rpcCfg: RpcConfig) => this.createHttpClient(chain, rpcCfg.url),
POOL_STRATEGY
);
}
return pools;
}
private createHttpClient(chain: string, url: string): InstrumentedHttpProvider {
return new InstrumentedHttpProvider({
chain,
url,
retries: 3,
timeout: 1_0000,
initialDelay: 1_000,

View File

@ -1,11 +1,11 @@
import { EvmTag } from "../../../domain/entities";
import { MetadataRepository } from "../../../domain/repositories";
import { HttpClientError } from "../../errors/HttpClientError";
import { HttpClient } from "../../rpc/http/HttpClient";
import { EvmTag } from "../../../domain/entities";
import winston from "../../log";
import {
EvmJsonRPCBlockRepository,
EvmJsonRPCBlockRepositoryCfg,
ProviderPoolMap,
} from "./EvmJsonRPCBlockRepository";
const FINALIZED = "finalized";
@ -19,10 +19,10 @@ export class ArbitrumEvmJsonRPCBlockRepository extends EvmJsonRPCBlockRepository
constructor(
cfg: EvmJsonRPCBlockRepositoryCfg,
httpClient: HttpClient,
pools: ProviderPoolMap,
metadataRepo: MetadataRepository<any>
) {
super(cfg, httpClient);
super(cfg, pools);
this.metadataRepo = metadataRepo;
this.latestL2Finalized = 0;
this.latestEthTime = 0;
@ -35,8 +35,7 @@ export class ArbitrumEvmJsonRPCBlockRepository extends EvmJsonRPCBlockRepository
try {
// This gets the latest L2 block so we can get the associated L1 block number
response = await this.httpClient.post<typeof response>(
chainCfg.rpc.href,
response = await this.getChainProvider(chain).post<typeof response>(
{
jsonrpc: "2.0",
id: 1,

View File

@ -1,13 +1,13 @@
import { EvmTag } from "../../../domain/entities";
import { HttpClient } from "../../rpc/http/HttpClient";
import {
EvmJsonRPCBlockRepository,
EvmJsonRPCBlockRepositoryCfg,
ProviderPoolMap,
} from "./EvmJsonRPCBlockRepository";
export class BscEvmJsonRPCBlockRepository extends EvmJsonRPCBlockRepository {
constructor(cfg: EvmJsonRPCBlockRepositoryCfg, httpClient: HttpClient) {
super(cfg, httpClient);
constructor(cfg: EvmJsonRPCBlockRepositoryCfg, pools: ProviderPoolMap) {
super(cfg, pools);
}
async getBlockHeight(chain: string, finality: EvmTag): Promise<bigint> {

View File

@ -7,10 +7,11 @@ import {
} from "../../../domain/entities";
import { EvmBlockRepository } from "../../../domain/repositories";
import winston from "../../log";
import { HttpClient } from "../../rpc/http/HttpClient";
import { InstrumentedHttpProvider } from "../../rpc/http/InstrumentedHttpProvider";
import { HttpClientError } from "../../errors/HttpClientError";
import { ChainRPCConfig } from "../../config";
import { divideIntoBatches } from "../common/utils";
import { ProviderPool } from "@xlabs/rpc-pool";
/**
* EvmJsonRPCBlockRepository is a repository that uses a JSON RPC endpoint to fetch blocks.
@ -20,14 +21,19 @@ import { divideIntoBatches } from "../common/utils";
const HEXADECIMAL_PREFIX = "0x";
const TX_BATCH_SIZE = 10;
export type ProviderPoolMap = Record<string, ProviderPool<InstrumentedHttpProvider>>;
export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
protected httpClient: HttpClient;
protected pool: ProviderPoolMap;
protected cfg: EvmJsonRPCBlockRepositoryCfg;
protected readonly logger;
constructor(cfg: EvmJsonRPCBlockRepositoryCfg, httpClient: HttpClient) {
this.httpClient = httpClient;
constructor(
cfg: EvmJsonRPCBlockRepositoryCfg,
pool: Record<string, ProviderPool<InstrumentedHttpProvider>>
) {
this.cfg = cfg;
this.pool = pool;
this.logger = winston.child({ module: "EvmJsonRPCBlockRepository" });
this.logger.info(`Created for ${Object.keys(this.cfg.chains)}`);
@ -66,7 +72,7 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
let results: (undefined | ResultBlocks)[] = [];
try {
results = await this.httpClient.post<typeof results>(chainCfg.rpc.href, reqs, {
results = await this.getChainProvider(chain).post<typeof results>(reqs, {
timeout: chainCfg.timeout,
retries: chainCfg.retries,
});
@ -151,8 +157,7 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
const chainCfg = this.getCurrentChain(chain);
let response: { result: Log[]; error?: ErrorBlock };
try {
response = await this.httpClient.post<typeof response>(
chainCfg.rpc.href,
response = await this.getChainProvider(chain).post<typeof response>(
{
jsonrpc: "2.0",
method: "eth_getLogs",
@ -203,8 +208,7 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
const chainCfg = this.getCurrentChain(chain);
let response: { result?: EvmBlock; error?: ErrorBlock };
try {
response = await this.httpClient.post<typeof response>(
chainCfg.rpc.href,
response = await this.getChainProvider(chain).post<typeof response>(
{
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
@ -267,7 +271,7 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}
try {
results = await this.httpClient.post<typeof results>(chainCfg.rpc.href, reqs, {
results = await this.getChainProvider(chain).post<typeof results>(reqs, {
timeout: chainCfg.timeout,
retries: chainCfg.retries,
});
@ -336,6 +340,14 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}
}
protected getChainProvider(chain: string): InstrumentedHttpProvider {
const pool = this.pool[chain];
if (!pool) {
throw new Error(`No provider pool configured for chain ${chain}`);
}
return pool.get();
}
protected getCurrentChain(chain: string) {
const cfg = this.cfg.chains[chain];
return {

View File

@ -1,10 +1,10 @@
import { HttpClient } from "../../rpc/http/HttpClient";
import { setTimeout } from "timers/promises";
import { EvmTag } from "../../../domain/entities";
import winston from "../../log";
import {
EvmJsonRPCBlockRepository,
EvmJsonRPCBlockRepositoryCfg,
ProviderPoolMap,
} from "./EvmJsonRPCBlockRepository";
const GROW_SLEEP_TIME = 350;
@ -13,8 +13,8 @@ const MAX_ATTEMPTS = 10;
export class MoonbeamEvmJsonRPCBlockRepository extends EvmJsonRPCBlockRepository {
override readonly logger = winston.child({ module: "MoonbeamEvmJsonRPCBlockRepository" });
constructor(cfg: EvmJsonRPCBlockRepositoryCfg, httpClient: HttpClient) {
super(cfg, httpClient);
constructor(cfg: EvmJsonRPCBlockRepositoryCfg, pools: ProviderPoolMap) {
super(cfg, pools);
}
async getBlockHeight(chain: string, finality: EvmTag): Promise<bigint> {
@ -31,8 +31,7 @@ export class MoonbeamEvmJsonRPCBlockRepository extends EvmJsonRPCBlockRepository
const { hash } = await super.getBlock(chain, blockNumber);
const { result } = await this.httpClient.post<BlockIsFinalizedResult>(
chainCfg.rpc.href,
const { result } = await this.getChainProvider(chain).post<BlockIsFinalizedResult>(
{
jsonrpc: "2.0",
id: 1,

View File

@ -1,18 +1,17 @@
import { BytesLike, ethers } from "ethers";
import { EvmTag } from "../../../domain/entities";
import { HttpClient } from "../../rpc/http/HttpClient";
import {
EvmJsonRPCBlockRepository,
EvmJsonRPCBlockRepositoryCfg,
ProviderPoolMap,
} from "./EvmJsonRPCBlockRepository";
const POLYGON_ROOT_CHAIN_ADDRESS = "0x86E4Dc95c7FBdBf52e33D563BbDB00823894C287";
const FINALIZED = "finalized";
const ETHEREUM = "ethereum";
export class PolygonJsonRPCBlockRepository extends EvmJsonRPCBlockRepository {
constructor(cfg: EvmJsonRPCBlockRepositoryCfg, httpClient: HttpClient) {
super(cfg, httpClient);
constructor(cfg: EvmJsonRPCBlockRepositoryCfg, pools: ProviderPoolMap) {
super(cfg, pools);
}
async getBlockHeight(chain: string, finality: EvmTag): Promise<bigint> {
@ -23,17 +22,14 @@ export class PolygonJsonRPCBlockRepository extends EvmJsonRPCBlockRepository {
]);
const callData = rootChain.encodeFunctionData("getLastChildBlock");
const callResult: CallResult[] = await this.httpClient.post(
this.cfg.chains[ETHEREUM].rpcs[0],
[
{
jsonrpc: "2.0",
id: 1,
method: "eth_call",
params: [{ to: POLYGON_ROOT_CHAIN_ADDRESS, data: callData }, FINALIZED],
},
]
);
const callResult: CallResult[] = await this.getChainProvider(chain).post([
{
jsonrpc: "2.0",
id: 1,
method: "eth_call",
params: [{ to: POLYGON_ROOT_CHAIN_ADDRESS, data: callData }, FINALIZED],
},
]);
const block = rootChain.decodeFunctionResult("getLastChildBlock", callResult[0].result)[0];
return BigInt(block);

View File

@ -1,32 +1,25 @@
import {
Commitment,
Connection,
Finality,
PublicKey,
VersionedTransactionResponse,
SolanaJSONRPCError,
VersionedTransactionResponse,
} from "@solana/web3.js";
import { InstrumentedConnection, ProviderPool } from "@xlabs/rpc-pool";
import { solana } from "../../../domain/entities";
import { SolanaSlotRepository } from "../../../domain/repositories";
import { Fallible, SolanaFailure } from "../../../domain/errors";
import winston from "../../../infrastructure/log";
import { SolanaSlotRepository } from "../../../domain/repositories";
export class Web3SolanaSlotRepository implements SolanaSlotRepository {
private connection: Connection;
private logger: winston.Logger = winston.child({ module: "Web3SolanaSlotRepository" });
constructor(connection: Connection) {
this.connection = connection;
this.logger.info(`Using RPC node ${new URL(connection.rpcEndpoint).hostname}`);
}
constructor(private readonly pool: ProviderPool<InstrumentedConnection>) {}
getLatestSlot(commitment: string): Promise<number> {
return this.connection.getSlot(commitment as Commitment);
return this.pool.get().getSlot(commitment as Commitment);
}
getBlock(slot: number, finality?: string): Promise<Fallible<solana.Block, SolanaFailure>> {
return this.connection
return this.pool
.get()
.getBlock(slot, {
maxSupportedTransactionVersion: 0,
commitment: this.normalizeFinality(finality),
@ -58,7 +51,7 @@ export class Web3SolanaSlotRepository implements SolanaSlotRepository {
limit: number,
finality?: string
): Promise<solana.ConfirmedSignatureInfo[]> {
return this.connection.getSignaturesForAddress(
return this.pool.get().getSignaturesForAddress(
new PublicKey(address),
{
limit: limit,
@ -73,7 +66,7 @@ export class Web3SolanaSlotRepository implements SolanaSlotRepository {
sigs: solana.ConfirmedSignatureInfo[],
finality?: string
): Promise<solana.Transaction[]> {
const txs = await this.connection.getTransactions(
const txs = await this.pool.get().getTransactions(
sigs.map((sig) => sig.signature),
{ maxSupportedTransactionVersion: 0, commitment: this.normalizeFinality(finality) }
);

View File

@ -1,10 +1,10 @@
import {
Checkpoint,
SuiClient,
SuiEventFilter,
SuiTransactionBlockResponse,
TransactionFilter,
} from "@mysten/sui.js/client";
import { InstrumentedSuiClient, ProviderPool } from "@xlabs/rpc-pool";
import winston from "winston";
import { Range } from "../../../domain/entities";
import { SuiTransactionBlockReceipt } from "../../../domain/entities/sui";
@ -15,18 +15,15 @@ const QUERY_MAX_RESULT_LIMIT_CHECKPOINTS = 100;
const TX_BATCH_SIZE = 50;
export class SuiJsonRPCBlockRepository implements SuiRepository {
private readonly client: SuiClient;
private readonly logger: winston.Logger;
constructor(private readonly cfg: SuiJsonRPCBlockRepositoryConfig) {
this.client = new SuiClient({ url: this.cfg.rpc });
constructor(private readonly pool: ProviderPool<InstrumentedSuiClient>) {
this.logger = winston.child({ module: "SuiJsonRPCBlockRepository" });
this.logger.info(`[sui] Using RPC node ${this.cfg.rpc}`);
}
async getLastCheckpointNumber(): Promise<bigint> {
try {
const res = await this.client.getLatestCheckpointSequenceNumber();
const res = await this.pool.get().getLatestCheckpointSequenceNumber();
return BigInt(res);
} catch (e) {
this.handleError(e, "getLatestCheckpointNumber");
@ -46,7 +43,7 @@ export class SuiJsonRPCBlockRepository implements SuiRepository {
for (const batch of batches) {
let res;
try {
res = await this.client.getCheckpoints({
res = await this.pool.get().getCheckpoints({
cursor: (BigInt(Array.from(batch)[0]) - 1n).toString(),
descendingOrder: false,
limit: Math.min(count, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS),
@ -71,7 +68,7 @@ export class SuiJsonRPCBlockRepository implements SuiRepository {
for (const batch of batches) {
let res;
try {
res = await this.client.multiGetTransactionBlocks({
res = await this.pool.get().multiGetTransactionBlocks({
digests: Array.from(batch),
options: { showEvents: true, showInput: true, showEffects: true },
});
@ -106,7 +103,7 @@ export class SuiJsonRPCBlockRepository implements SuiRepository {
): Promise<SuiTransactionBlockReceipt[]> {
let response;
try {
response = await this.client.queryTransactionBlocks({
response = await this.pool.get().queryTransactionBlocks({
filter,
order: "ascending",
cursor,
@ -130,7 +127,7 @@ export class SuiJsonRPCBlockRepository implements SuiRepository {
): Promise<SuiTransactionBlockReceipt[]> {
let response;
try {
response = await this.client.queryEvents({
response = await this.pool.get().queryEvents({
query,
order: "ascending",
cursor: cursor ? { txDigest: cursor, eventSeq: "0" } : undefined,
@ -148,7 +145,7 @@ export class SuiJsonRPCBlockRepository implements SuiRepository {
async getCheckpoint(id: string | bigint | number): Promise<Checkpoint> {
try {
return this.client.getCheckpoint({ id: id.toString() });
return this.pool.get().getCheckpoint({ id: id.toString() });
} catch (e) {
this.handleError(e, "getCheckpoint");
throw e;
@ -161,10 +158,6 @@ export class SuiJsonRPCBlockRepository implements SuiRepository {
}
private handleError(e: any, method: string) {
this.logger.error(`[sui] Error calling ${method}: ${e.message} (rpc ${this.cfg.rpc})`);
this.logger.error(`[sui] Error calling ${method}: ${e.message}`);
}
}
export type SuiJsonRPCBlockRepositoryConfig = {
rpc: string;
};

View File

@ -1,42 +1,47 @@
import axios, { AxiosError, AxiosInstance } from "axios";
import { ProviderHealthInstrumentation } from "@xlabs/rpc-pool";
import { AxiosError } from "axios";
import { setTimeout } from "timers/promises";
import { HttpClientError } from "../../errors/HttpClientError";
// make url and chain required
type InstrumentedHttpProviderOptions = Required<Pick<HttpClientOptions, "url" | "chain">> &
HttpClientOptions;
/**
* A simple HTTP client with exponential backoff retries and 429 handling.
*/
export class HttpClient {
export class InstrumentedHttpProvider {
private initialDelay: number = 1_000;
private maxDelay: number = 60_000;
private retries: number = 0;
private timeout: number = 5_000;
private axios: AxiosInstance;
private url: string;
health: ProviderHealthInstrumentation;
constructor(options?: HttpClientOptions) {
constructor(options: InstrumentedHttpProviderOptions) {
options?.initialDelay && (this.initialDelay = options.initialDelay);
options?.maxDelay && (this.maxDelay = options.maxDelay);
options?.retries && (this.retries = options.retries);
options?.timeout && (this.timeout = options.timeout);
this.axios = axios.create();
if (!options.url) throw new Error("URL is required");
this.url = options.url;
if (!options.chain) throw new Error("Chain is required");
this.health = new ProviderHealthInstrumentation(this.timeout, options.chain);
}
public async post<T>(url: string, body: any, opts?: HttpClientOptions): Promise<T> {
return this.executeWithRetry(url, "POST", body, opts);
public async post<T>(body: any, opts?: HttpClientOptions): Promise<T> {
return this.executeWithRetry("POST", body, opts);
}
private async execute<T>(
url: string,
method: string,
body?: any,
opts?: HttpClientOptions
): Promise<T> {
private async execute<T>(method: string, body?: any, opts?: HttpClientOptions): Promise<T> {
let response;
try {
response = await this.axios.request<T>({
url: url,
response = await this.health.fetch(this.url, {
method: method,
data: body,
timeout: opts?.timeout ?? this.timeout,
body: JSON.stringify(body),
signal: AbortSignal.timeout(opts?.timeout ?? this.timeout),
});
} catch (err: AxiosError | any) {
@ -49,14 +54,13 @@ export class HttpClient {
}
if (!(response.status > 200) && !(response.status < 300)) {
throw new HttpClientError(undefined, response, response.data);
throw new HttpClientError(undefined, response, response.json());
}
return response.data;
return response.json() as T;
}
private async executeWithRetry<T>(
url: string,
method: string,
body?: any,
opts?: HttpClientOptions
@ -67,7 +71,7 @@ export class HttpClient {
const maxDelay = opts?.maxDelay ?? this.maxDelay;
while (maxRetries >= 0) {
try {
return await this.execute(url, method, body, opts);
return await this.execute(method, body, opts);
} catch (err) {
if (err instanceof HttpClientError) {
if (retries < maxRetries) {
@ -86,11 +90,13 @@ export class HttpClient {
}
}
throw new Error(`Failed to reach ${url}`);
throw new Error(`Failed to reach ${this.url}`);
}
}
export type HttpClientOptions = {
chain?: string;
url?: string;
initialDelay?: number;
maxDelay?: number;
retries?: number;

View File

@ -1,11 +1,15 @@
import { mockRpcPool } from "../../mocks/mockRpcPool";
mockRpcPool();
import { describe, it, expect, afterEach, afterAll, jest } from "@jest/globals";
import { ArbitrumEvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositories";
import { MetadataRepository } from "../../../src/domain/repositories";
import { HttpClient } from "../../../src/infrastructure/rpc/http/HttpClient";
import { InstrumentedHttpProvider } from "../../../src/infrastructure/rpc/http/InstrumentedHttpProvider";
import { EvmTag } from "../../../src/domain/entities/evm";
import axios from "axios";
import nock from "nock";
import fs from "fs";
import { FirstProviderPool } from "@xlabs/rpc-pool";
const dirPath = "./metadata-repo";
axios.defaults.adapter = "http"; // needed by nock
@ -87,7 +91,10 @@ const givenARepo = () => {
},
},
},
new HttpClient(),
{
ethereum: { get: () => new InstrumentedHttpProvider({ url: rpc, chain: "ethereum" }) },
arbitrum: { get: () => new InstrumentedHttpProvider({ url: rpc, chain: "arbitrum" }) },
} as any,
givenMetadataRepository([{ associatedL1Block: 18764852, l2BlockNumber: 157542621 }])
);
};

View File

@ -1,6 +1,9 @@
import { mockRpcPool } from "../../mocks/mockRpcPool";
mockRpcPool();
import { describe, it, expect, afterEach, afterAll } from "@jest/globals";
import { BscEvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositories";
import { HttpClient } from "../../../src/infrastructure/rpc/http/HttpClient";
import { InstrumentedHttpProvider } from "../../../src/infrastructure/rpc/http/InstrumentedHttpProvider";
import { EvmTag } from "../../../src/domain/entities/evm";
import axios from "axios";
import nock from "nock";
@ -43,7 +46,7 @@ const givenARepo = () => {
bsc: { rpcs: [rpc], timeout: 100, name: bsc, network: "mainnet", chainId: 4 },
},
},
new HttpClient()
{ bsc: { get: () => new InstrumentedHttpProvider({ url: rpc, chain: "bsc" }) } } as any
);
};

View File

@ -1,9 +1,12 @@
import { mockRpcPool } from "../../mocks/mockRpcPool";
mockRpcPool();
import { describe, it, expect, afterEach, afterAll } from "@jest/globals";
import { EvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositories";
import axios from "axios";
import nock from "nock";
import { EvmLogFilter, EvmTag } from "../../../src/domain/entities";
import { HttpClient } from "../../../src/infrastructure/rpc/http/HttpClient";
import { InstrumentedHttpProvider } from "../../../src/infrastructure/rpc/http/InstrumentedHttpProvider";
axios.defaults.adapter = "http"; // needed by nock
const eth = "ethereum";
@ -107,7 +110,9 @@ const givenARepo = () => {
ethereum: { rpcs: [rpc], timeout: 100, name: "ethereum", network: "mainnet", chainId: 2 },
},
},
new HttpClient()
{
ethereum: { get: () => new InstrumentedHttpProvider({ url: rpc, chain: "ethereum" }) },
} as any
);
};

View File

@ -1,6 +1,9 @@
import { mockRpcPool } from "../../mocks/mockRpcPool";
mockRpcPool();
import { describe, it, expect, afterEach, afterAll } from "@jest/globals";
import { MoonbeamEvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositories";
import { HttpClient } from "../../../src/infrastructure/rpc/http/HttpClient";
import { InstrumentedHttpProvider } from "../../../src/infrastructure/rpc/http/InstrumentedHttpProvider";
import { EvmTag } from "../../../src/domain/entities";
import axios from "axios";
import nock from "nock";
@ -45,7 +48,9 @@ const givenARepo = () => {
moonbeam: { rpcs: [rpc], timeout: 100, name: moonbeam, network: "mainnet", chainId: 16 },
},
},
new HttpClient()
{
moonbeam: { get: () => new InstrumentedHttpProvider({ url: rpc, chain: "moonbeam" }) },
} as any
);
};

View File

@ -1,3 +1,6 @@
import { mockRpcPool } from "../../mocks/mockRpcPool";
mockRpcPool();
import { MoonbeamEvmJsonRPCBlockRepository } from "../../../src/infrastructure/repositories/evm/MoonbeamEvmJsonRPCBlockRepository";
import { describe, expect, it } from "@jest/globals";
import { RepositoriesBuilder } from "../../../src/infrastructure/repositories/RepositoriesBuilder";

View File

@ -1,10 +1,9 @@
import { afterAll, afterEach, describe, expect, it, jest } from "@jest/globals";
import { SuiClient } from "@mysten/sui.js/client";
import base58 from "bs58";
import { randomBytes } from "crypto";
import nock from "nock";
import { SuiJsonRPCBlockRepository } from "../../../src/infrastructure/repositories";
import { SuiClient, getFullnodeUrl } from "@mysten/sui.js/client";
import { count } from "console";
import { randomBytes } from "crypto";
import base58 from "bs58";
const rpc = "http://localhost";
let repo: SuiJsonRPCBlockRepository;
@ -86,10 +85,14 @@ describe("SuiJsonRPCBlockRepository", () => {
});
const givenARepo = () => {
repo = new SuiJsonRPCBlockRepository({ rpc });
const client = new SuiClient({ url: rpc });
const pool = {
get: () => client,
};
repo = new SuiJsonRPCBlockRepository(pool as any);
getTxsSpy = jest.spyOn((repo as any).client as SuiClient, "multiGetTransactionBlocks");
getCheckpointsSpy = jest.spyOn((repo as any).client as SuiClient, "getCheckpoints");
getTxsSpy = jest.spyOn(client, "multiGetTransactionBlocks");
getCheckpointsSpy = jest.spyOn(client, "getCheckpoints");
};
const givenLastCheckpointIs = (sequence: bigint) => {

View File

@ -10,7 +10,10 @@ describe("Web3SolanaSlotRepository", () => {
rpcEndpoint: "http://solanafake.com",
getSlot: () => Promise.resolve(100),
};
const repository = new Web3SolanaSlotRepository(connectionMock as any);
const poolMock = {
get: () => connectionMock,
};
const repository = new Web3SolanaSlotRepository(poolMock as any);
const latestSlot = await repository.getLatestSlot("finalized");
@ -53,7 +56,10 @@ describe("Web3SolanaSlotRepository", () => {
rpcEndpoint: "http://solanafake.com",
getBlock: (slot: number) => Promise.resolve(expected),
};
const repository = new Web3SolanaSlotRepository(connectionMock as any);
const poolMock = {
get: () => connectionMock,
};
const repository = new Web3SolanaSlotRepository(poolMock as any);
const block = (await repository.getBlock(100)).getValue();
@ -66,7 +72,10 @@ describe("Web3SolanaSlotRepository", () => {
rpcEndpoint: "http://solanafake.com",
getBlock: (slot: number) => Promise.resolve(null),
};
const repository = new Web3SolanaSlotRepository(connectionMock as any);
const poolMock = {
get: () => connectionMock,
};
const repository = new Web3SolanaSlotRepository(poolMock as any);
const block = await repository.getBlock(100);
@ -90,7 +99,10 @@ describe("Web3SolanaSlotRepository", () => {
rpcEndpoint: "http://solanafake.com",
getSignaturesForAddress: () => Promise.resolve(expected),
};
const repository = new Web3SolanaSlotRepository(connectionMock as any);
const poolMock = {
get: () => connectionMock,
};
const repository = new Web3SolanaSlotRepository(poolMock as any);
const signatures = await repository.getSignaturesForAddress(
"BTcueXFisZiqE49Ne2xTZjHV9bT5paVZhpKc1k4L3n1c",
@ -123,7 +135,10 @@ describe("Web3SolanaSlotRepository", () => {
rpcEndpoint: "http://solanafake.com",
getTransactions: (sigs: solana.ConfirmedSignatureInfo[]) => Promise.resolve(expected),
};
const repository = new Web3SolanaSlotRepository(connectionMock as any);
const poolMock = {
get: () => connectionMock,
};
const repository = new Web3SolanaSlotRepository(poolMock as any);
const transactions = await repository.getTransactions([
{

View File

@ -0,0 +1,42 @@
import { jest } from "@jest/globals";
import axios from "axios";
export class ProviderHealthInstrumentationMock {
fetch = async (input: string | URL | Request, init?: RequestInit) => {
const body = typeof init?.body === "string" ? JSON.parse(init.body) : init?.body;
const res = await axios.request({
url: input.toString(),
method: "POST",
data: body,
});
return {
status: 200,
json: () => res.data,
};
};
}
type RpcConfig = { url: string };
type PoolSupplier = <T>(
cfg: RpcConfig,
createProvider: (cfg: RpcConfig) => T,
type?: string
) => { get: () => T };
const providerPoolSupplier: PoolSupplier = <T>(
cfg: RpcConfig,
createProvider: (cfg: RpcConfig) => T,
type?: string
) => {
return {
get: () => createProvider(cfg),
};
};
export function mockRpcPool() {
jest.mock("@xlabs/rpc-pool", () => {
return {
ProviderHealthInstrumentation: ProviderHealthInstrumentationMock,
providerPoolSupplier,
};
});
}