[Blockchain Watcher] (FIX) Improve rpc providers instance (#1524)

* Improve evm client instance and logs events

* Improve solana redeem log

* Improve rpc poll providers

* Create decorator class

* Change MAX_DIFF_BLOCK_HEIGHT value for 10_000

* Improve code

* Change debug log for info

* Override get function

* Improve imports

* Update rpc poll version

* Resolve test and mock

* Improve staticJob test

---------

Co-authored-by: julian merlo <julianmerlo@julians-MacBook-Pro-2.local>
This commit is contained in:
Julian 2024-07-01 17:35:37 -03:00 committed by GitHub
parent 7e4a687904
commit b248892feb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 900 additions and 749 deletions

File diff suppressed because it is too large Load Diff

View File

@ -23,7 +23,7 @@
"@certusone/wormhole-sdk": "0.10.5",
"@cosmjs/proto-signing": "^0.32.3",
"@mysten/sui.js": "^0.49.1",
"@xlabs/rpc-pool": "^0.0.4",
"@xlabs/rpc-pool": "^1.0.0",
"algosdk": "^2.8.0",
"axios": "^1.6.0",
"bs58": "^5.0.0",

View File

@ -6,7 +6,7 @@ import { GetEvmLogs } from "./GetEvmLogs";
import { Filters } from "./types";
import winston from "winston";
const MAX_DIFF_BLOCK_HEIGHT = 5_000;
const MAX_DIFF_BLOCK_HEIGHT = 10_000;
const ID = "watch-evm-logs";
/**

View File

@ -33,7 +33,7 @@ export const algorandLogMessagePublishedMapper = (
const sequence = Number(`0x${Buffer.from(innetTx.logs[0], "base64").toString("hex")}`);
logger.info(
`[algorand] Source event info: [tx: ${transaction.hash}][${CHAIN_ID_ALGORAND}/${emitterChain}/${sequence}]`
`[algorand] Source event info: [tx: ${transaction.hash}][VAA: ${CHAIN_ID_ALGORAND}/${emitterChain}/${sequence}]`
);
return {

View File

@ -16,7 +16,7 @@ export const aptosLogMessagePublishedMapper = (
const address = transaction.payload.function.split("::")[0];
logger.info(
`[aptos] Source event info: [tx: ${transaction.hash}][emitterChain: ${CHAIN_ID_APTOS}][sender: ${wormholeData.sender}}][sequence: ${wormholeData.sequence}]`
`[aptos] Source event info: [tx: ${transaction.hash}][VAA: ${CHAIN_ID_APTOS}/${wormholeData.sender}/${wormholeData.sequence}]`
);
return {

View File

@ -18,7 +18,7 @@ export const evmLogMessagePublishedMapper = (
const sequence = (parsedArgs[1] as BigNumber).toNumber();
logger.info(
`[${log.chain}] Source event info: [tx: ${txHash}][emitterChain: ${chainId}][sender: ${sender}}][sequence: ${sequence}]`
`[${log.chain}] Source event info: [tx: ${txHash}][VAA: ${chainId}/${sender}/${sequence}]`
);
return {

View File

@ -44,7 +44,7 @@ export const evmRedeemedTransactionFoundMapper = (
const emitterChain = vaaInformation.emitterChain;
const sequence = vaaInformation.sequence;
logger.debug(
logger.info(
`[${transaction.chain}] Redeemed transaction info: [hash: ${transaction.hash}][VAA: ${emitterChain}/${emitterAddress}/${sequence}][protocol: ${protocolType}/${protocolMethod}]`
);

View File

@ -43,22 +43,14 @@ export const solanaLogMessagePublishedMapper = async (
const accountId = accountKeys[instruction.accountKeyIndexes[1]];
const { message } = await getPostedMessage(connection, accountId, commitment);
const {
sequence,
emitterAddress,
emitterChain,
submissionTime: timestamp,
nonce,
payload,
consistencyLevel,
} = message || {};
const { sequence, emitterAddress, emitterChain, nonce, payload, consistencyLevel } =
message || {};
const emitterAddressToHex = emitterAddress.toString("hex");
const txHash = tx.transaction.signatures[0];
logger.debug(
`[solana] Source event info: [hash: ${txHash}][emitterChain: ${emitterChain}][sender: ${emitterAddress.toString(
"hex"
)}][sequence: ${sequence}]`
logger.info(
`[solana] Source event info: [hash: ${txHash}][VAA: ${emitterChain}/${emitterAddressToHex}/${sequence}]`
);
results.push({
@ -69,7 +61,7 @@ export const solanaLogMessagePublishedMapper = async (
blockHeight: BigInt(tx.slot.toString()),
blockTime: tx.blockTime,
attributes: {
sender: emitterAddress.toString("hex"),
sender: emitterAddressToHex,
sequence: Number(sequence),
payload: payload.toString("hex"),
nonce,

View File

@ -71,11 +71,10 @@ const processProgram = async (
const protocol = findProtocol(SOLANA_CHAIN, programId, hexData, txHash);
const protocolMethod = protocol?.method ?? "unknown";
const protocolType = protocol?.type ?? "unknown";
const emitterAddressToHex = emitterAddress.toString("hex");
logger.debug(
`[${chain}}] Redeemed transaction info: [hash: ${txHash}][VAA: ${emitterChain}/${emitterAddress.toString(
"hex"
)}/${sequence}][protocol: ${protocolType}/${protocolMethod}]`
logger.info(
`[${chain}] Redeemed transaction info: [hash: ${txHash}][VAA: ${emitterChain}/${emitterAddressToHex}/${sequence}][protocol: ${protocolType}/${protocolMethod}]`
);
results.push({
@ -89,7 +88,7 @@ const processProgram = async (
methodsByAddress: protocol?.method ?? "unknownInstruction",
status: mappedStatus(transaction),
emitterChain: emitterChain,
emitterAddress: emitterAddress.toString("hex"),
emitterAddress: emitterAddressToHex,
sequence: Number(sequence),
protocol: protocolType,
fee: transaction.meta?.fee,

View File

@ -21,7 +21,7 @@ export const suiLogMessagePublishedMapper = (
const { nonce, sender, sequence, payload, consistencyLevel } = logMessage;
logger.info(
`[sui] Source event info: [digest: ${receipt.digest}][emitterChain: ${CHAIN_ID_SUI}][sender: ${sender}}][sequence: ${sequence}]`
`[sui] Source event info: [digest: ${receipt.digest}][VAA: ${CHAIN_ID_SUI}/${sender}/${sequence}]`
);
return {

View File

@ -29,6 +29,7 @@ export abstract class RateLimitedRPCRepository<T> {
this.logger.warn("Got no healthy providers from RPC node. Retrying in 5 secs...");
return 5_000; // Wait 5 secs if we get a no healthy providers
} else {
this.logger.warn("Retry according to config...");
return true; // Retry according to config
}
},

View File

@ -6,6 +6,7 @@ import { RateLimitedSeiJsonRPCBlockRepository } from "./sei/RateLimitedSeiJsonRP
import { RateLimitedSuiJsonRPCBlockRepository } from "./sui/RateLimitedSuiJsonRPCBlockRepository";
import { WormchainJsonRPCBlockRepository } from "./wormchain/WormchainJsonRPCBlockRepository";
import { AlgorandJsonRPCBlockRepository } from "./algorand/AlgorandJsonRPCBlockRepository";
import { extendedProviderPoolSupplier } from "../rpc/http/ProviderPoolDecorator";
import { AptosJsonRPCBlockRepository } from "./aptos/AptosJsonRPCBlockRepository";
import { SNSClient, SNSClientConfig } from "@aws-sdk/client-sns";
import { SeiJsonRPCBlockRepository } from "./sei/SeiJsonRPCBlockRepository";
@ -19,13 +20,6 @@ import {
SuiRepository,
SeiRepository,
} from "../../domain/repositories";
import {
InstrumentedConnection,
InstrumentedSuiClient,
providerPoolSupplier,
ProviderPool,
RpcConfig,
} from "@xlabs/rpc-pool";
import {
MoonbeamEvmJsonRPCBlockRepository,
ArbitrumEvmJsonRPCBlockRepository,
@ -42,6 +36,12 @@ import {
SnsEventRepository,
ProviderPoolMap,
} from ".";
import {
InstrumentedConnection,
InstrumentedSuiClient,
ProviderPool,
RpcConfig,
} from "@xlabs/rpc-pool";
const WORMCHAIN_CHAIN = "wormchain";
const ALGORAND_CHAIN = "algorand";
@ -77,7 +77,7 @@ const EVM_CHAINS = new Map([
["xlayer", "evmRepo"],
]);
const POOL_STRATEGY = "weighted";
const POOL_STRATEGY = "healthy";
export class RepositoriesBuilder {
private repositories = new Map();
@ -183,7 +183,7 @@ export class RepositoriesBuilder {
private buildSolanaRepository(chain: string): void {
if (chain == SOLANA_CHAIN) {
const solanaProviderPool = providerPoolSupplier(
const solanaProviderPool = extendedProviderPoolSupplier(
this.cfg.chains[chain].rpcs.map((url) => ({ url })),
(rpcCfg: RpcConfig) =>
new InstrumentedConnection(rpcCfg.url, {
@ -235,7 +235,7 @@ export class RepositoriesBuilder {
private buildSuiRepository(chain: string): void {
if (chain == SUI_CHAIN) {
const suiProviderPool = providerPoolSupplier(
const suiProviderPool = extendedProviderPoolSupplier(
this.cfg.chains[chain].rpcs.map((url) => ({ url })),
(rpcCfg: RpcConfig) => new InstrumentedSuiClient(rpcCfg.url, 2000),
POOL_STRATEGY
@ -338,7 +338,7 @@ export class RepositoriesBuilder {
let pools: ProviderPoolMap = {};
for (const chain in this.cfg.chains) {
const cfg = this.cfg.chains[chain];
pools[chain] = providerPoolSupplier(
pools[chain] = extendedProviderPoolSupplier(
cfg.rpcs.map((url) => ({ url })),
(rpcCfg: RpcConfig) => this.createHttpClient(chain, rpcCfg.url),
POOL_STRATEGY
@ -352,8 +352,7 @@ export class RepositoriesBuilder {
rpcs = this.cfg.chains[chain].rpcs;
}
const cfg = this.cfg.chains[chain];
const pools = providerPoolSupplier(
const pools = extendedProviderPoolSupplier(
rpcs.map((url) => ({ url })),
(rpcCfg: RpcConfig) => this.createHttpClient(chain, rpcCfg.url),
POOL_STRATEGY

View File

@ -75,7 +75,7 @@ export class StaticJobRepository implements JobRepository {
new Map();
private mappers: Map<string, any> = new Map();
private targets: Map<string, () => Promise<(items: any[]) => Promise<void>>> = new Map();
private blockRepoProvider: (chain: string) => EvmBlockRepository;
private evmRepo: (chain: string) => EvmBlockRepository;
private metadataRepo: MetadataRepository<any>;
private statsRepo: StatRepository;
private snsRepo: SnsEventRepository;
@ -90,7 +90,7 @@ export class StaticJobRepository implements JobRepository {
environment: string,
path: string,
dryRun: boolean,
blockRepoProvider: (chain: string) => EvmBlockRepository,
evmRepo: (chain: string) => EvmBlockRepository,
repos: {
metadataRepo: MetadataRepository<any>;
statsRepo: StatRepository;
@ -104,7 +104,7 @@ export class StaticJobRepository implements JobRepository {
}
) {
this.fileRepo = new FileMetadataRepository(path);
this.blockRepoProvider = blockRepoProvider;
this.evmRepo = evmRepo;
this.metadataRepo = repos.metadataRepo;
this.statsRepo = repos.statsRepo;
this.snsRepo = repos.snsRepo;
@ -174,7 +174,7 @@ export class StaticJobRepository implements JobRepository {
private loadActions(): void {
const pollEvm = (jobDef: JobDefinition) =>
new PollEvm(
this.blockRepoProvider(jobDef.source.config.chain),
this.evmRepo(jobDef.source.config.chain),
this.metadataRepo,
this.statsRepo,
new PollEvmLogsConfig({

View File

@ -34,7 +34,6 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
) {
this.cfg = cfg;
this.pool = pool;
this.logger = winston.child({ module: "EvmJsonRPCBlockRepository" });
this.logger.info(`Created for ${Object.keys(this.cfg.chains)}`);
}
@ -158,10 +157,12 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
parsedFilters.address = filter.addresses;
}
const provider = this.getChainProvider(chain);
const chainCfg = this.getCurrentChain(chain);
let response: { result: Log[]; error?: ErrorBlock };
try {
response = await this.getChainProvider(chain).post<typeof response>(
response = await provider.post<typeof response>(
{
jsonrpc: "2.0",
method: "eth_getLogs",
@ -175,6 +176,9 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}
if (response.error) {
// If we get an error, we'll mark the provider as offline
provider.setProviderOffline();
throw new Error(
`[${chain}][getFilteredLogs] Error fetching logs with message: ${
response.error.message

View File

@ -6,7 +6,6 @@ import winston from "winston";
const TRANSACTION_SEARCH_ENDPOINT = "/tx_search";
const BLOCK_ENDPOINT = "/block";
const ACTION = "complete_transfer_with_payload";
type ProviderPoolMap = ProviderPool<InstrumentedHttpProvider>;

View File

@ -89,6 +89,8 @@ export class Web3SolanaSlotRepository implements SolanaSlotRepository {
return {
...tx,
chain: "solana",
chainId: 1,
transaction: {
...tx?.transaction,
message: {

View File

@ -1,15 +1,15 @@
import {
Checkpoint,
SuiEventFilter,
SuiTransactionBlockResponse,
TransactionFilter,
} from "@mysten/sui.js/client";
import { InstrumentedSuiClient, ProviderPool } from "@xlabs/rpc-pool";
import { SuiTransactionBlockReceipt } from "../../../domain/entities/sui";
import { divideIntoBatches } from "../common/utils";
import { SuiRepository } from "../../../domain/repositories";
import winston from "winston";
import { Range } from "../../../domain/entities";
import { SuiTransactionBlockReceipt } from "../../../domain/entities/sui";
import { SuiRepository } from "../../../domain/repositories";
import { divideIntoBatches } from "../common/utils";
import {
SuiTransactionBlockResponse,
TransactionFilter,
SuiEventFilter,
Checkpoint,
} from "@mysten/sui.js/client";
const QUERY_MAX_RESULT_LIMIT_CHECKPOINTS = 100;
const TX_BATCH_SIZE = 50;

View File

@ -0,0 +1,22 @@
import {
InstrumentedEthersProvider,
InstrumentedConnection,
WeightedProvidersPool,
InstrumentedSuiClient,
} from "@xlabs/rpc-pool";
export class HealthyProvidersPool<
T extends InstrumentedEthersProvider | InstrumentedConnection | InstrumentedSuiClient
> extends WeightedProvidersPool<T> {
get(): T {
const healthyProviders = this.getAllHealthy();
if (healthyProviders && healthyProviders.length > 0) {
return healthyProviders[0];
}
const unhealthyProviders = this.getAllUnhealthy();
const randomProvider =
unhealthyProviders[Math.floor(Math.random() * unhealthyProviders.length)];
return randomProvider;
}
}

View File

@ -48,6 +48,10 @@ export class InstrumentedHttpProvider {
return this.execute("GET", undefined, endpointBuild, opts);
}
public setProviderOffline(): void {
this.health.serviceOfflineSince = new Date();
}
private async execute<T>(
method: string,
body?: any,

View File

@ -0,0 +1,30 @@
import { HealthyProvidersPool } from "./HealthyProvidersPool";
import { Logger } from "winston";
import {
InstrumentedEthersProvider,
InstrumentedConnection,
providerPoolSupplier,
InstrumentedRpc,
ProviderPool,
RpcConfig,
} from "@xlabs/rpc-pool";
export function extendedProviderPoolSupplier<T extends InstrumentedRpc>(
rpcs: RpcConfig[],
createProvider: (rpcCfg: RpcConfig) => T,
type?: string,
logger?: Logger
): ProviderPool<T> {
switch (type) {
case "healthy":
return HealthyProvidersPool.fromConfigs(
rpcs,
createProvider as unknown as (
rpc: RpcConfig
) => InstrumentedEthersProvider | InstrumentedConnection,
logger
) as unknown as ProviderPool<T>;
default:
return providerPoolSupplier(rpcs, createProvider, type, logger);
}
}

View File

@ -92,8 +92,12 @@ const givenARepo = () => {
environment: "mainnet",
},
{
ethereum: { get: () => new InstrumentedHttpProvider({ url: rpc, chain: "ethereum" }) },
arbitrum: { get: () => new InstrumentedHttpProvider({ url: rpc, chain: "arbitrum" }) },
ethereum: {
get: () => new InstrumentedHttpProvider({ url: rpc, chain: "ethereum" }),
},
arbitrum: {
get: () => new InstrumentedHttpProvider({ url: rpc, chain: "arbitrum" }),
},
} as any,
givenMetadataRepository([{ associatedL1Block: 18764852, l2BlockNumber: 157542621 }])
);

View File

@ -50,7 +50,9 @@ const givenARepo = () => {
environment: "testnet",
},
{
moonbeam: { get: () => new InstrumentedHttpProvider({ url: rpc, chain: "moonbeam" }) },
moonbeam: {
get: () => new InstrumentedHttpProvider({ url: rpc, chain: "moonbeam" }),
},
} as any
);
};

View File

@ -57,38 +57,66 @@ describe("StaticJobRepository", () => {
givenJobsPresent();
const jobs = await repo.getJobDefinitions();
expect(jobs).toHaveLength(1);
expect(jobs[0].id).toEqual("poll-log-message-published-ethereum");
expect(jobs[0].id).toEqual("poll-redeemed-transactions-ethereum");
});
});
const givenJobsPresent = () => {
const jobs = [
{
id: "poll-log-message-published-ethereum",
id: "poll-redeemed-transactions-ethereum",
chain: "ethereum",
source: {
action: "PollEvm",
records: "GetEvmTransactions",
config: {
fromBlock: 10012499n,
blockBatchSize: 100,
commitment: "latest",
interval: 15_000,
addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
interval: 15000,
filters: [
{
addresses: [],
type: "Portal Token Bridge (Connect, Portico, Omniswap, tBTC, etc)",
topics: ["0xcaf280c8cfeba144da67230d9b009c8f868a75bac9a528fa0474be1ba317c169"],
strategy: "GetTransactionsByLogFiltersStrategy",
},
{
addresses: [],
type: "CCTP",
topics: ["0xf02867db6908ee5f81fd178573ae9385837f0a0a72553f8c08306759a7e0f00e"],
strategy: "GetTransactionsByLogFiltersStrategy",
},
{
addresses: [],
type: "Standard Relayer",
topics: ["0xbccc00b713f54173962e7de6098f643d8ebf53d488d71f4b2a5171496d038f9e"],
strategy: "GetTransactionsByLogFiltersStrategy",
},
{
addresses: [],
type: "NTT",
topics: ["0xf6fc529540981400dc64edf649eb5e2e0eb5812a27f8c81bac2c1d317e71a5f0"],
strategy: "GetTransactionsByLogFiltersStrategy",
},
{
addresses: ["0x6FFd7EdE62328b3Af38FCD61461Bbfc52F5651fE"],
type: "NFT",
topics: ["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"],
strategy: "GetTransactionsByBlocksStrategy",
},
],
chain: "ethereum",
topics: [],
chainId: 2,
},
},
handlers: [
{
action: "HandleEvmLogs",
action: "HandleEvmTransactions",
target: "sns",
mapper: "evmLogMessagePublishedMapper",
mapper: "evmRedeemedTransactionFoundMapper",
config: {
abi: "event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel)",
filter: {
addresses: ["0x706abc4E45D419950511e474C7B9Ed348A4a716c"],
topics: ["0x6eb224fb001ed210e379b335e35efe88672a8ce935d981a6896b27ffdf52a3b2"],
},
abi: "",
metricName: "process_vaa_event",
},
},
],

View File

@ -124,6 +124,8 @@ describe("Web3SolanaSlotRepository", () => {
{
signature: "signature1",
slot: 100,
chain: "solana",
chainId: 1,
transaction: {
message: {
version: "legacy",

View File

@ -442,7 +442,7 @@ describe("WormchainJsonRPCBlockRepository", () => {
};
it("should not be able to get redeems because do not find the cosmos client", async () => {
givenARepo({ get: (chain: number) => cosmosPools.get(chain) } as any);
givenARepo(cosmosPools);
givenBlockHeightIs();
osmosisRedeem.targetChain = 20;

View File

@ -1,5 +1,5 @@
import { SnsConfig } from "../../src/infrastructure/repositories";
import { Config, ChainRPCConfig } from "../../src/infrastructure/config";
import { SnsConfig } from "../../src/infrastructure/repositories";
export const configMock = (): Config => {
const chainsRecord: Record<string, ChainRPCConfig> = {

View File

@ -17,6 +17,12 @@ export class ProviderHealthInstrumentationMock {
};
}
class WeightedProvidersPool {
fromConfigs() {
return this;
}
}
type RpcConfig = { url: string };
type PoolSupplier = <T>(
cfg: RpcConfig,
@ -36,9 +42,16 @@ const providerPoolSupplier: PoolSupplier = <T>(
export function mockRpcPool() {
jest.mock("@xlabs/rpc-pool", () => {
return {
providerPoolRegistry: new prometheus.Registry(),
ProviderHealthInstrumentation: ProviderHealthInstrumentationMock,
providerPoolRegistry: new prometheus.Registry(),
WeightedProvidersPool,
providerPoolSupplier,
};
});
jest.mock("../../src/infrastructure/rpc/http/HealthyProvidersPool", () => ({
HealthyProvidersPool: {
fromConfigs: jest.fn().mockReturnValue({}),
},
}));
}