diff --git a/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts b/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts index 9f89ac45..3d553e71 100644 --- a/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts +++ b/blockchain-watcher/src/infrastructure/repositories/RepositoriesBuilder.ts @@ -1,3 +1,4 @@ +import { InstrumentedSuiClient, ProviderPool, RpcConfig } from "@xlabs/rpc-pool"; import { RateLimitedWormchainJsonRPCBlockRepository } from "./wormchain/RateLimitedWormchainJsonRPCBlockRepository"; import { RateLimitedAlgorandJsonRPCBlockRepository } from "./algorand/RateLimitedAlgorandJsonRPCBlockRepository"; import { RateLimitedCosmosJsonRPCBlockRepository } from "./cosmos/RateLimitedCosmosJsonRPCBlockRepository"; @@ -7,6 +8,7 @@ import { RateLimitedEvmJsonRPCBlockRepository } from "./evm/RateLimitedEvmJsonRP import { RateLimitedSuiJsonRPCBlockRepository } from "./sui/RateLimitedSuiJsonRPCBlockRepository"; import { WormchainJsonRPCBlockRepository } from "./wormchain/WormchainJsonRPCBlockRepository"; import { AlgorandJsonRPCBlockRepository } from "./algorand/AlgorandJsonRPCBlockRepository"; +import { InstrumentedConnectionWrapper } from "../rpc/http/InstrumentedConnectionWrapper"; import { CosmosJsonRPCBlockRepository } from "./cosmos/CosmosJsonRPCBlockRepository"; import { extendedProviderPoolSupplier } from "../rpc/http/ProviderPoolDecorator"; import { AptosJsonRPCBlockRepository } from "./aptos/AptosJsonRPCBlockRepository"; @@ -39,12 +41,6 @@ import { PromStatRepository, SnsEventRepository, } from "."; -import { - InstrumentedConnection, - InstrumentedSuiClient, - ProviderPool, - RpcConfig, -} from "@xlabs/rpc-pool"; const WORMCHAIN_CHAIN = "wormchain"; const ALGORAND_CHAIN = "algorand"; @@ -225,16 +221,22 @@ export class RepositoriesBuilder { private buildSolanaRepository(chain: string): void { if (chain == SOLANA_CHAIN) { + const cfg = this.cfg.chains[chain]; + const solanaProviderPool = extendedProviderPoolSupplier( this.cfg.chains[chain].rpcs.map((url) => ({ url })), (rpcCfg: RpcConfig) => - new InstrumentedConnection(rpcCfg.url, { - commitment: rpcCfg.commitment || "confirmed", - }), + new InstrumentedConnectionWrapper( + rpcCfg.url, + { + commitment: rpcCfg.commitment || "confirmed", + }, + cfg.timeout ?? 1_000, + SOLANA_CHAIN + ), POOL_STRATEGY ); - const cfg = this.cfg.chains[chain]; const solanaSlotRepository = new RateLimitedSolanaSlotRepository( new Web3SolanaSlotRepository(solanaProviderPool), SOLANA_CHAIN, diff --git a/blockchain-watcher/src/infrastructure/repositories/solana/Web3SolanaSlotRepository.ts b/blockchain-watcher/src/infrastructure/repositories/solana/Web3SolanaSlotRepository.ts index aa692c90..2a25b2a6 100644 --- a/blockchain-watcher/src/infrastructure/repositories/solana/Web3SolanaSlotRepository.ts +++ b/blockchain-watcher/src/infrastructure/repositories/solana/Web3SolanaSlotRepository.ts @@ -1,34 +1,39 @@ -import { - Commitment, - Finality, - PublicKey, - SolanaJSONRPCError, - VersionedTransactionResponse, -} from "@solana/web3.js"; -import { InstrumentedConnection, ProviderPool } from "@xlabs/rpc-pool"; -import { solana } from "../../../domain/entities"; +import { InstrumentedConnectionWrapper } from "../../rpc/http/InstrumentedConnectionWrapper"; import { Fallible, SolanaFailure } from "../../../domain/errors"; import { SolanaSlotRepository } from "../../../domain/repositories"; +import { ProviderPool } from "@xlabs/rpc-pool"; +import { solana } from "../../../domain/entities"; +import winston from "../../log"; +import { + VersionedTransactionResponse, + SolanaJSONRPCError, + Commitment, + PublicKey, + Finality, +} from "@solana/web3.js"; export class Web3SolanaSlotRepository implements SolanaSlotRepository { - constructor(private readonly pool: ProviderPool) {} + protected readonly logger; + + constructor(private readonly pool: ProviderPool) { + this.logger = winston.child({ module: "Web3SolanaSlotRepository" }); + } getLatestSlot(commitment: string): Promise { return this.pool.get().getSlot(commitment as Commitment); } getBlock(slot: number, finality?: string): Promise> { - return this.pool - .get() + const provider = this.pool.get(); + return provider .getBlock(slot, { maxSupportedTransactionVersion: 0, commitment: this.normalizeFinality(finality), }) .then((block) => { if (block === null) { - return Fallible.error( - new SolanaFailure(0, "Block not found") - ); + // In this case we throw and error and we retry the request + throw new Error("Unable to parse result of getBlock"); } return Fallible.ok({ ...block, @@ -40,9 +45,16 @@ export class Web3SolanaSlotRepository implements SolanaSlotRepository { }) .catch((err) => { if (err instanceof SolanaJSONRPCError) { + // We skip the block if it is not available (e.g Slot N was skipped - Error code: -32007, -32009) return Fallible.error(new SolanaFailure(err.code, err.message)); } + this.logger.error( + `[solana][getBlock] Cannot process this slot: ${slot}}, error ${JSON.stringify( + err + )} on ${provider.getUrl()}` + ); + provider.setProviderOffline(); throw err; }); } @@ -69,13 +81,20 @@ export class Web3SolanaSlotRepository implements SolanaSlotRepository { sigs: solana.ConfirmedSignatureInfo[], finality?: string ): Promise { - const txs = await this.pool.get().getTransactions( + const provider = this.pool.get(); + const txs = await provider.getTransactions( sigs.map((sig) => sig.signature), { maxSupportedTransactionVersion: 0, commitment: this.normalizeFinality(finality) } ); if (txs.length !== sigs.length) { - throw new Error(`Expected ${sigs.length} transactions, but got ${txs.length} instead`); + this.logger.error( + `[solana][getTransactions] Expected ${sigs.length} transactions, but got ${ + txs.length + } instead on ${provider.getUrl()}` + ); + provider.setProviderOffline(); + throw new Error("Unable to parse result of getTransactions"); } return txs diff --git a/blockchain-watcher/src/infrastructure/rpc/http/InstrumentedConnectionWrapper.ts b/blockchain-watcher/src/infrastructure/rpc/http/InstrumentedConnectionWrapper.ts new file mode 100644 index 00000000..29f2e5aa --- /dev/null +++ b/blockchain-watcher/src/infrastructure/rpc/http/InstrumentedConnectionWrapper.ts @@ -0,0 +1,26 @@ +import { Commitment, Connection, ConnectionConfig } from "@solana/web3.js"; +import { ProviderHealthInstrumentation } from "@xlabs/rpc-pool"; + +export class InstrumentedConnectionWrapper extends Connection { + health: ProviderHealthInstrumentation; + private url: string; + + constructor( + endpoint: string, + commitment: Commitment | ConnectionConfig, + timeout: number, + chain: string + ) { + super(endpoint, commitment); + this.health = new ProviderHealthInstrumentation(timeout, chain); + this.url = endpoint; + } + + public setProviderOffline(): void { + this.health.serviceOfflineSince = new Date(); + } + + public getUrl(): string { + return this.url; + } +} diff --git a/blockchain-watcher/test/infrastructure/repositories/Web3SolanaSlotRepository.test.ts b/blockchain-watcher/test/infrastructure/repositories/Web3SolanaSlotRepository.test.ts index 4452fd2c..8c36f3f5 100644 --- a/blockchain-watcher/test/infrastructure/repositories/Web3SolanaSlotRepository.test.ts +++ b/blockchain-watcher/test/infrastructure/repositories/Web3SolanaSlotRepository.test.ts @@ -74,15 +74,19 @@ describe("Web3SolanaSlotRepository", () => { const connectionMock = { rpcEndpoint: "http://solanafake.com", getBlock: (slot: number) => Promise.resolve(null), + getUrl: () => "https://api.mainnet-beta.solana.com", + setProviderOffline: () => new Date(), }; const poolMock = { get: () => connectionMock, }; const repository = new Web3SolanaSlotRepository(poolMock as any); - const block = await repository.getBlock(100); - - expect(block.getError()).toBeDefined(); + try { + await repository.getBlock(100); + } catch (e) { + expect(e).toBeDefined(); + } }); }); diff --git a/deploy/blockchain-watcher/workers/solana-source-events-1.yaml b/deploy/blockchain-watcher/workers/solana-source-events-1.yaml index 5b4f001f..0ca32e4c 100644 --- a/deploy/blockchain-watcher/workers/solana-source-events-1.yaml +++ b/deploy/blockchain-watcher/workers/solana-source-events-1.yaml @@ -145,13 +145,12 @@ spec: value: {{ .SNS_REGION }} - name: NODE_OPTIONS value: {{ .NODE_OPTIONS }} - - name: SOLANA_RPCS - valueFrom: - secretKeyRef: - name: blockchain-watcher - key: solana-urls - name: JOBS_DIR value: /home/node/app/jobs + {{ if .SOLANA_RPCS }} + - name: SOLANA_RPCS + value: '{{ .SOLANA_RPCS }}' + {{ end }} resources: limits: cpu: {{ .SOLANA_RESOURCES_LIMITS_CPU }} diff --git a/deploy/blockchain-watcher/workers/solana-target-events-1.yaml b/deploy/blockchain-watcher/workers/solana-target-events-1.yaml index e7a1f282..be01a840 100644 --- a/deploy/blockchain-watcher/workers/solana-target-events-1.yaml +++ b/deploy/blockchain-watcher/workers/solana-target-events-1.yaml @@ -230,13 +230,12 @@ spec: value: {{ .SNS_REGION }} - name: NODE_OPTIONS value: {{ .NODE_OPTIONS }} - - name: SOLANA_RPCS - valueFrom: - secretKeyRef: - name: blockchain-watcher - key: solana-urls - name: JOBS_DIR value: /home/node/app/jobs + {{ if .SOLANA_RPCS }} + - name: SOLANA_RPCS + value: '{{ .SOLANA_RPCS }}' + {{ end }} resources: limits: cpu: {{ .SOLANA_RESOURCES_LIMITS_CPU }}