[Blockchain Watcher] (SOLANA) Create wrapper and offline method (#1669)

* Create wrapper and offline method

* Improve comments

* Improve InstrumentedConnectionWrapper

---------

Co-authored-by: julian merlo <julianmerlo@julians-MacBook-Air.local>
This commit is contained in:
Julian 2024-09-11 08:33:49 -03:00 committed by GitHub
parent ccf605af40
commit ad9e22a820
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 89 additions and 40 deletions

View File

@ -1,3 +1,4 @@
import { InstrumentedSuiClient, ProviderPool, RpcConfig } from "@xlabs/rpc-pool";
import { RateLimitedWormchainJsonRPCBlockRepository } from "./wormchain/RateLimitedWormchainJsonRPCBlockRepository";
import { RateLimitedAlgorandJsonRPCBlockRepository } from "./algorand/RateLimitedAlgorandJsonRPCBlockRepository";
import { RateLimitedCosmosJsonRPCBlockRepository } from "./cosmos/RateLimitedCosmosJsonRPCBlockRepository";
@ -7,6 +8,7 @@ import { RateLimitedEvmJsonRPCBlockRepository } from "./evm/RateLimitedEvmJsonRP
import { RateLimitedSuiJsonRPCBlockRepository } from "./sui/RateLimitedSuiJsonRPCBlockRepository";
import { WormchainJsonRPCBlockRepository } from "./wormchain/WormchainJsonRPCBlockRepository";
import { AlgorandJsonRPCBlockRepository } from "./algorand/AlgorandJsonRPCBlockRepository";
import { InstrumentedConnectionWrapper } from "../rpc/http/InstrumentedConnectionWrapper";
import { CosmosJsonRPCBlockRepository } from "./cosmos/CosmosJsonRPCBlockRepository";
import { extendedProviderPoolSupplier } from "../rpc/http/ProviderPoolDecorator";
import { AptosJsonRPCBlockRepository } from "./aptos/AptosJsonRPCBlockRepository";
@ -39,12 +41,6 @@ import {
PromStatRepository,
SnsEventRepository,
} from ".";
import {
InstrumentedConnection,
InstrumentedSuiClient,
ProviderPool,
RpcConfig,
} from "@xlabs/rpc-pool";
const WORMCHAIN_CHAIN = "wormchain";
const ALGORAND_CHAIN = "algorand";
@ -225,16 +221,22 @@ export class RepositoriesBuilder {
private buildSolanaRepository(chain: string): void {
if (chain == SOLANA_CHAIN) {
const cfg = this.cfg.chains[chain];
const solanaProviderPool = extendedProviderPoolSupplier(
this.cfg.chains[chain].rpcs.map((url) => ({ url })),
(rpcCfg: RpcConfig) =>
new InstrumentedConnection(rpcCfg.url, {
commitment: rpcCfg.commitment || "confirmed",
}),
new InstrumentedConnectionWrapper(
rpcCfg.url,
{
commitment: rpcCfg.commitment || "confirmed",
},
cfg.timeout ?? 1_000,
SOLANA_CHAIN
),
POOL_STRATEGY
);
const cfg = this.cfg.chains[chain];
const solanaSlotRepository = new RateLimitedSolanaSlotRepository(
new Web3SolanaSlotRepository(solanaProviderPool),
SOLANA_CHAIN,

View File

@ -1,34 +1,39 @@
import {
Commitment,
Finality,
PublicKey,
SolanaJSONRPCError,
VersionedTransactionResponse,
} from "@solana/web3.js";
import { InstrumentedConnection, ProviderPool } from "@xlabs/rpc-pool";
import { solana } from "../../../domain/entities";
import { InstrumentedConnectionWrapper } from "../../rpc/http/InstrumentedConnectionWrapper";
import { Fallible, SolanaFailure } from "../../../domain/errors";
import { SolanaSlotRepository } from "../../../domain/repositories";
import { ProviderPool } from "@xlabs/rpc-pool";
import { solana } from "../../../domain/entities";
import winston from "../../log";
import {
VersionedTransactionResponse,
SolanaJSONRPCError,
Commitment,
PublicKey,
Finality,
} from "@solana/web3.js";
export class Web3SolanaSlotRepository implements SolanaSlotRepository {
constructor(private readonly pool: ProviderPool<InstrumentedConnection>) {}
protected readonly logger;
constructor(private readonly pool: ProviderPool<InstrumentedConnectionWrapper>) {
this.logger = winston.child({ module: "Web3SolanaSlotRepository" });
}
getLatestSlot(commitment: string): Promise<number> {
return this.pool.get().getSlot(commitment as Commitment);
}
getBlock(slot: number, finality?: string): Promise<Fallible<solana.Block, SolanaFailure>> {
return this.pool
.get()
const provider = this.pool.get();
return provider
.getBlock(slot, {
maxSupportedTransactionVersion: 0,
commitment: this.normalizeFinality(finality),
})
.then((block) => {
if (block === null) {
return Fallible.error<solana.Block, SolanaFailure>(
new SolanaFailure(0, "Block not found")
);
// In this case we throw and error and we retry the request
throw new Error("Unable to parse result of getBlock");
}
return Fallible.ok<solana.Block, SolanaFailure>({
...block,
@ -40,9 +45,16 @@ export class Web3SolanaSlotRepository implements SolanaSlotRepository {
})
.catch((err) => {
if (err instanceof SolanaJSONRPCError) {
// We skip the block if it is not available (e.g Slot N was skipped - Error code: -32007, -32009)
return Fallible.error(new SolanaFailure(err.code, err.message));
}
this.logger.error(
`[solana][getBlock] Cannot process this slot: ${slot}}, error ${JSON.stringify(
err
)} on ${provider.getUrl()}`
);
provider.setProviderOffline();
throw err;
});
}
@ -69,13 +81,20 @@ export class Web3SolanaSlotRepository implements SolanaSlotRepository {
sigs: solana.ConfirmedSignatureInfo[],
finality?: string
): Promise<solana.Transaction[]> {
const txs = await this.pool.get().getTransactions(
const provider = this.pool.get();
const txs = await provider.getTransactions(
sigs.map((sig) => sig.signature),
{ maxSupportedTransactionVersion: 0, commitment: this.normalizeFinality(finality) }
);
if (txs.length !== sigs.length) {
throw new Error(`Expected ${sigs.length} transactions, but got ${txs.length} instead`);
this.logger.error(
`[solana][getTransactions] Expected ${sigs.length} transactions, but got ${
txs.length
} instead on ${provider.getUrl()}`
);
provider.setProviderOffline();
throw new Error("Unable to parse result of getTransactions");
}
return txs

View File

@ -0,0 +1,26 @@
import { Commitment, Connection, ConnectionConfig } from "@solana/web3.js";
import { ProviderHealthInstrumentation } from "@xlabs/rpc-pool";
export class InstrumentedConnectionWrapper extends Connection {
health: ProviderHealthInstrumentation;
private url: string;
constructor(
endpoint: string,
commitment: Commitment | ConnectionConfig,
timeout: number,
chain: string
) {
super(endpoint, commitment);
this.health = new ProviderHealthInstrumentation(timeout, chain);
this.url = endpoint;
}
public setProviderOffline(): void {
this.health.serviceOfflineSince = new Date();
}
public getUrl(): string {
return this.url;
}
}

View File

@ -74,15 +74,19 @@ describe("Web3SolanaSlotRepository", () => {
const connectionMock = {
rpcEndpoint: "http://solanafake.com",
getBlock: (slot: number) => Promise.resolve(null),
getUrl: () => "https://api.mainnet-beta.solana.com",
setProviderOffline: () => new Date(),
};
const poolMock = {
get: () => connectionMock,
};
const repository = new Web3SolanaSlotRepository(poolMock as any);
const block = await repository.getBlock(100);
expect(block.getError()).toBeDefined();
try {
await repository.getBlock(100);
} catch (e) {
expect(e).toBeDefined();
}
});
});

View File

@ -145,13 +145,12 @@ spec:
value: {{ .SNS_REGION }}
- name: NODE_OPTIONS
value: {{ .NODE_OPTIONS }}
- name: SOLANA_RPCS
valueFrom:
secretKeyRef:
name: blockchain-watcher
key: solana-urls
- name: JOBS_DIR
value: /home/node/app/jobs
{{ if .SOLANA_RPCS }}
- name: SOLANA_RPCS
value: '{{ .SOLANA_RPCS }}'
{{ end }}
resources:
limits:
cpu: {{ .SOLANA_RESOURCES_LIMITS_CPU }}

View File

@ -230,13 +230,12 @@ spec:
value: {{ .SNS_REGION }}
- name: NODE_OPTIONS
value: {{ .NODE_OPTIONS }}
- name: SOLANA_RPCS
valueFrom:
secretKeyRef:
name: blockchain-watcher
key: solana-urls
- name: JOBS_DIR
value: /home/node/app/jobs
{{ if .SOLANA_RPCS }}
- name: SOLANA_RPCS
value: '{{ .SOLANA_RPCS }}'
{{ end }}
resources:
limits:
cpu: {{ .SOLANA_RESOURCES_LIMITS_CPU }}