[Blockchain Watcher] (FIX) Improve code (#1586)

* Validate txs process in repository

* Change commitment in circle jobs for latest

* Improve rateLimited class

* Map interval and attempts variable

* Improve retry log

* Improve for process

* Remove arbitrum rpc

* Add testnet jobs for cctp

* Resolve comment in PR

* Set up finalized commitment

* Improve log error in evm repository

* Map fromConfigs method into HealthyProvidersPool

* Reduce test coverage

* Resolve comment in PR

* Remove reqs param in log

* Change blockBatchSize value for acala

* Change blockBatchSize value for acala
This commit is contained in:
Julian 2024-08-09 16:19:59 -03:00 committed by GitHub
parent a1d6a1325b
commit bdb9e5df1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 436 additions and 93 deletions

View File

@ -130,7 +130,7 @@
"arbitrum": {
"network": "mainnet",
"chainId": 23,
"rpcs": ["https://arbitrum.blockpi.network/v1/rpc/public", "https://arb1.arbitrum.io/rpc"]
"rpcs": ["https://arb1.arbitrum.io/rpc"]
},
"optimism": {
"network": "mainnet",

View File

@ -91,7 +91,7 @@
"coverageDirectory": "./coverage",
"coverageThreshold": {
"global": {
"lines": 77
"lines": 76
}
}
},

View File

@ -5,31 +5,39 @@ import winston from "winston";
export abstract class RateLimitedRPCRepository<T> {
protected delegate: T;
protected breaker: Circuit;
protected chain: string;
protected logger: winston.Logger = winston.child({
module: "RateLimitedRPCRepository",
});
constructor(delegate: T, opts: Options = { period: 10_000, limit: 1_000 }) {
constructor(
delegate: T,
chain: string,
opts: Options = { period: 10_000, limit: 1_000, interval: 1_000, attempts: 10 }
) {
this.delegate = delegate;
this.chain = chain;
this.breaker = new Circuit({
options: {
modules: [
new Ratelimit({ limitPeriod: opts.period, limitForPeriod: opts.limit }),
new Retry({
attempts: 10,
interval: 1_000,
attempts: opts.attempts,
interval: opts.interval,
fastFirst: false,
mode: RetryMode.EXPONENTIAL,
factor: 1,
onRejection: (err: Error | any) => {
if (err.message?.includes("429")) {
this.logger.warn("Got 429 from RPC node. Retrying in 5 secs...");
this.logger.warn(`[${chain}] Got 429 from RPC node. Retrying in 5 secs...`);
return 5_000; // Wait 5 secs if we get a 429
} else if (err.message?.includes("healthy providers")) {
this.logger.warn("Got no healthy providers from RPC node. Retrying in 5 secs...");
this.logger.warn(
`[${chain}] Got no healthy providers from RPC node. Retrying in 5 secs...`
);
return 5_000; // Wait 5 secs if we get a no healthy providers
} else {
this.logger.warn("Retry according to config...");
this.logger.warn(`[${chain}] Retry according to config...`);
return true; // Retry according to config
}
},

View File

@ -226,7 +226,13 @@ export class RepositoriesBuilder {
const cfg = this.cfg.chains[chain];
const solanaSlotRepository = new RateLimitedSolanaSlotRepository(
new Web3SolanaSlotRepository(solanaProviderPool),
cfg.rateLimit
SOLANA_CHAIN,
{
period: cfg.rateLimit?.period ?? 10_000,
limit: cfg.rateLimit?.limit ?? 1_000,
interval: cfg.timeout ?? 1_000,
attempts: cfg.retries ?? 10,
}
);
this.repositories.set("solana-slotRepo", solanaSlotRepository);
}
@ -240,19 +246,24 @@ export class RepositoriesBuilder {
};
const moonbeamRepository = new RateLimitedEvmJsonRPCBlockRepository(
new MoonbeamEvmJsonRPCBlockRepository(repoCfg, pools)
new MoonbeamEvmJsonRPCBlockRepository(repoCfg, pools),
"moonbeam"
);
const arbitrumRepository = new RateLimitedEvmJsonRPCBlockRepository(
new ArbitrumEvmJsonRPCBlockRepository(repoCfg, pools, this.getMetadataRepository())
new ArbitrumEvmJsonRPCBlockRepository(repoCfg, pools, this.getMetadataRepository()),
"arbitrum"
);
const polygonRepository = new RateLimitedEvmJsonRPCBlockRepository(
new PolygonJsonRPCBlockRepository(repoCfg, pools)
new PolygonJsonRPCBlockRepository(repoCfg, pools),
"polygon"
);
const bscRepository = new RateLimitedEvmJsonRPCBlockRepository(
new BscEvmJsonRPCBlockRepository(repoCfg, pools)
new BscEvmJsonRPCBlockRepository(repoCfg, pools),
"bsc"
);
const evmRepository = new RateLimitedEvmJsonRPCBlockRepository(
new EvmJsonRPCBlockRepository(repoCfg, pools)
new EvmJsonRPCBlockRepository(repoCfg, pools),
"evm"
);
this.repositories.set("moonbeam-evmRepo", moonbeamRepository);
@ -272,7 +283,8 @@ export class RepositoriesBuilder {
);
const suiRepository = new RateLimitedSuiJsonRPCBlockRepository(
new SuiJsonRPCBlockRepository(suiProviderPool)
new SuiJsonRPCBlockRepository(suiProviderPool),
SUI_CHAIN
);
this.repositories.set("sui-repo", suiRepository);
@ -284,7 +296,8 @@ export class RepositoriesBuilder {
const pools = this.createDefaultProviderPools(chain);
const aptosRepository = new RateLimitedAptosJsonRPCBlockRepository(
new AptosJsonRPCBlockRepository(pools)
new AptosJsonRPCBlockRepository(pools),
APTOS_CHAIN
);
this.repositories.set("aptos-repo", aptosRepository);
@ -299,7 +312,8 @@ export class RepositoriesBuilder {
};
const CosmosRepository = new RateLimitedCosmosJsonRPCBlockRepository(
new CosmosJsonRPCBlockRepository(repoCfg, pools)
new CosmosJsonRPCBlockRepository(repoCfg, pools),
COSMOS_CHAIN
);
this.repositories.set("cosmos-repo", CosmosRepository);
@ -314,7 +328,8 @@ export class RepositoriesBuilder {
};
const wormchainRepository = new RateLimitedWormchainJsonRPCBlockRepository(
new WormchainJsonRPCBlockRepository(repoCfg, pools)
new WormchainJsonRPCBlockRepository(repoCfg, pools),
WORMCHAIN_CHAIN
);
this.repositories.set("wormchain-repo", wormchainRepository);
@ -330,7 +345,8 @@ export class RepositoriesBuilder {
const algoV2Pools = this.createDefaultProviderPools(chain, algoRpcs);
const algorandRepository = new RateLimitedAlgorandJsonRPCBlockRepository(
new AlgorandJsonRPCBlockRepository(algoV2Pools, algoIndexerPools)
new AlgorandJsonRPCBlockRepository(algoV2Pools, algoIndexerPools),
ALGORAND_CHAIN
);
this.repositories.set("algorand-repo", algorandRepository);

View File

@ -8,8 +8,12 @@ export class RateLimitedAlgorandJsonRPCBlockRepository
extends RateLimitedRPCRepository<AlgorandRepository>
implements AlgorandRepository
{
constructor(delegate: AlgorandRepository, opts: Options = { period: 10_000, limit: 1000 }) {
super(delegate, opts);
constructor(
delegate: AlgorandRepository,
chain: string,
opts: Options = { period: 10_000, limit: 1000, interval: 1_000, attempts: 10 }
) {
super(delegate, chain, opts);
this.logger = winston.child({ module: "RateLimitedAlgorandJsonRPCBlockRepository" });
}

View File

@ -9,8 +9,12 @@ export class RateLimitedAptosJsonRPCBlockRepository
extends RateLimitedRPCRepository<AptosRepository>
implements AptosRepository
{
constructor(delegate: AptosRepository, opts: Options = { period: 10_000, limit: 1000 }) {
super(delegate, opts);
constructor(
delegate: AptosRepository,
chain: string,
opts: Options = { period: 10_000, limit: 1000, interval: 1_000, attempts: 10 }
) {
super(delegate, chain, opts);
this.logger = winston.child({ module: "RateLimitedAptosJsonRPCBlockRepository" });
}

View File

@ -1,4 +1,6 @@
export type Options = {
attempts: number;
interval: number;
period: number;
limit: number;
};

View File

@ -9,8 +9,12 @@ export class RateLimitedCosmosJsonRPCBlockRepository
extends RateLimitedRPCRepository<CosmosRepository>
implements CosmosRepository
{
constructor(delegate: CosmosRepository, opts: Options = { period: 10_000, limit: 1000 }) {
super(delegate, opts);
constructor(
delegate: CosmosRepository,
chain: string,
opts: Options = { period: 10_000, limit: 1000, interval: 1_000, attempts: 10 }
) {
super(delegate, chain, opts);
this.logger = winston.child({ module: "RateLimitedCosmosJsonRPCBlockRepository" });
}

View File

@ -83,9 +83,18 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}
for (let result of results) {
if (result) {
combinedResults.push(result);
// If result is not present or error is present, we throw an error to re-try get the transaction
if (!result || !result.result || result.error) {
const requestDetails = JSON.stringify(reqs.find((r) => r.id === result?.id));
this.logger.error(
`[${chain}][getBlocks] Cannot process this tx: ${requestDetails}, error ${JSON.stringify(
result?.error
)} on ${provider.getUrl()}`
);
provider.setProviderOffline();
throw new Error("Unable to parse result of eth_getBlockByNumber");
}
combinedResults.push(result);
}
}
@ -119,17 +128,13 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
};
}
const msg = `[${chain}][getBlocks] Got error ${
response?.error?.message
} for eth_getBlockByNumber for ${response?.id ?? idx} on ${chainCfg.rpc.hostname}`;
this.logger.error(msg);
throw new Error(
`Unable to parse result of eth_getBlockByNumber[${chain}] for ${
response?.id ?? idx
}: ${msg}`
this.logger.error(
`[${chain}][getBlocks] Got error ${
response?.error?.message
} for eth_getBlockByNumber for ${response?.id ?? idx} on ${provider.getUrl()}`
);
provider.setProviderOffline();
throw new Error("Unable to parse result of eth_getBlockByNumber");
}
)
.reduce((acc: Record<string, EvmBlock>, block: EvmBlock) => {
@ -138,14 +143,13 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}, {});
}
// If we get an error, we'll mark the provider as offline
provider.setProviderOffline();
throw new Error(
`Unable to parse ${
this.logger.error(
`[${chain}][getBlocks] Unable to parse ${
combinedResults?.length ?? 0
} blocks for eth_getBlockByNumber for numbers ${blockNumbers} on ${chainCfg.rpc.hostname}`
} blocks for eth_getBlockByNumber for numbers ${blockNumbers} on ${provider.getUrl()}`
);
provider.setProviderOffline();
throw new Error("Unable to parse result of eth_getBlockByNumber");
}
async getFilteredLogs(chain: string, filter: EvmLogFilter): Promise<EvmLog[]> {
@ -178,14 +182,13 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}
if (response.error) {
// If we get an error, we'll mark the provider as offline
provider.setProviderOffline();
throw new Error(
this.logger.error(
`[${chain}][getFilteredLogs] Error fetching logs with message: ${
response.error.message
}. Filter: ${JSON.stringify(filter)}`
}. Filter: ${JSON.stringify(filter)} on ${provider.getUrl()}`
);
provider.setProviderOffline();
throw new Error("Unable to parse result of eth_getLogs");
}
const logs = response?.result;
@ -196,7 +199,7 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
this.logger.info(
`[${chain}][getFilteredLogs] Got ${logs.length} logs for ${this.describeFilter(
filter
)} from ${chainCfg.rpc.hostname}`
)} from ${provider.getUrl()}`
);
return logs.map((log) => ({
@ -208,10 +211,6 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}));
}
private describeFilter(filter: EvmLogFilter): string {
return `[addresses:${filter.addresses}][topics:${filter.topics}][blocks:${filter.fromBlock} - ${filter.toBlock}]`;
}
/**
* Loosely based on the wormhole-dashboard implementation (minus some specially crafted blocks when null result is obtained)
*/
@ -225,10 +224,12 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
? `${HEXADECIMAL_PREFIX}${blockNumberOrTag.toString(16)}`
: blockNumberOrTag;
const provider = getChainProvider(chain, this.pool);
const chainCfg = this.getCurrentChain(chain);
let response: { result?: EvmBlock; error?: ErrorBlock };
try {
response = await getChainProvider(chain, this.pool).post<typeof response>(
response = await provider.post<typeof response>(
{
jsonrpc: "2.0",
method: "eth_getBlockByNumber",
@ -242,7 +243,6 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}
const result = response?.result;
if (result && result.hash && result.number && result.timestamp) {
// Convert to our domain compatible type
return {
@ -252,11 +252,14 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
transactions: result.transactions,
};
}
throw new Error(
`Unable to parse result of eth_getBlockByNumber for ${blockNumberOrTag} on ${
chainCfg.rpc
}. Response error: ${JSON.stringify(response)}`
this.logger.error(
`[${chain}][getBlock] Unable to parse result of eth_getBlockByNumber for ${blockNumberOrTag} on ${provider.getUrl()}. Response error: ${JSON.stringify(
response
)}`
);
provider.setProviderOffline();
throw new Error("Unable to parse result of eth_getBlockByNumber");
}
/**
@ -274,6 +277,7 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
* This method divide in batches the object to send, because we have one restriction about how many object send to the endpoint
* the maximum is 10 object per request
*/
const provider = getChainProvider(chain, this.pool);
const batches = divideIntoBatches(hashNumbers, TX_BATCH_SIZE);
let combinedResults: ResultTransactionReceipt[] = [];
@ -290,7 +294,7 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}
try {
results = await getChainProvider(chain, this.pool).post<typeof results>(reqs, {
results = await provider.post<typeof results>(reqs, {
timeout: chainCfg.timeout,
retries: chainCfg.retries,
});
@ -299,11 +303,18 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
}
for (let result of results) {
if (result && result.result) {
combinedResults.push(result);
} else {
this.logger.warn(`[${chain}] Can not process this tx: ${JSON.stringify(reqs)}`);
// If result is not present or error is present, we throw an error to re-try get the transaction
if (!result || !result.result || result.error) {
const requestDetails = JSON.stringify(reqs.find((r) => r.id === result?.id));
this.logger.error(
`[${chain}][getTransactionReceipt] Cannot process this tx: ${requestDetails}, error ${JSON.stringify(
result.error
)} on ${provider.getUrl()}`
);
provider.setProviderOffline();
throw new Error("Unable to parse result of eth_getTransactionReceipt");
}
combinedResults.push(result);
}
}
@ -320,17 +331,15 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
};
}
const msg = `[${chain}][getTransactionReceipt] Got error ${
response?.error ?? JSON.stringify(response)
} for eth_getTransactionReceipt for ${JSON.stringify(hashNumbers)} on ${
chainCfg.rpc.hostname
}`;
this.logger.error(msg);
throw new Error(
`Unable to parse result of eth_getTransactionReceipt[${chain}] for ${response?.result}: ${msg}`
this.logger.error(
`[${chain}][getTransactionReceipt] Got error ${
response?.error ?? JSON.stringify(response)
} for eth_getTransactionReceipt for ${JSON.stringify(
hashNumbers
)} on ${provider.getUrl()}`
);
provider.setProviderOffline();
throw new Error("Unable to parse result of eth_getTransactionReceipt");
})
.reduce(
(acc: Record<string, ReceiptTransaction>, receiptTransaction: ReceiptTransaction) => {
@ -340,11 +349,12 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
{}
);
}
throw new Error(
`Unable to parse result of eth_getTransactionReceipt for ${JSON.stringify(hashNumbers)} on ${
chainCfg.rpc
}. Result error: ${JSON.stringify(combinedResults)}`
this.logger.error(
`[${chain}][getTransactionReceipt] Unable to parse result of eth_getTransactionReceipt
for ${JSON.stringify(hashNumbers)} on ${provider.getUrl()}`
);
provider.setProviderOffline();
throw new Error("Unable to parse result of eth_getTransactionReceipt");
}
protected handleError(chain: string, e: any, method: string, apiMethod: string) {
@ -371,6 +381,10 @@ export class EvmJsonRPCBlockRepository implements EvmBlockRepository {
retries: cfg.retries ?? 2,
};
}
private describeFilter(filter: EvmLogFilter): string {
return `[addresses:${filter.addresses}][topics:${filter.topics}][blocks:${filter.fromBlock} - ${filter.toBlock}]`;
}
}
type ErrorBlock = {
@ -391,6 +405,7 @@ type Log = {
};
type ResultTransactionReceipt = {
id: string;
result: ReceiptTransaction;
error?: ErrorBlock;
};

View File

@ -14,8 +14,12 @@ export class RateLimitedEvmJsonRPCBlockRepository
extends RateLimitedRPCRepository<EvmBlockRepository>
implements EvmBlockRepository
{
constructor(delegate: EvmBlockRepository, opts: Options = { period: 10_000, limit: 1000 }) {
super(delegate, opts);
constructor(
delegate: EvmBlockRepository,
chain: string,
opts: Options = { period: 10_000, limit: 1000, interval: 1_000, attempts: 10 }
) {
super(delegate, chain, opts);
this.logger = winston.child({ module: "RateLimitedEvmJsonRPCBlockRepository" });
}

View File

@ -10,8 +10,12 @@ export class RateLimitedSolanaSlotRepository
extends RateLimitedRPCRepository<SolanaSlotRepository>
implements SolanaSlotRepository
{
constructor(delegate: SolanaSlotRepository, opts: Options = { period: 10_000, limit: 50 }) {
super(delegate, opts);
constructor(
delegate: SolanaSlotRepository,
chain: string,
opts: Options = { period: 10_000, limit: 50, interval: 1_000, attempts: 10 }
) {
super(delegate, chain, opts);
this.logger = winston.child({ module: "RateLimitedSolanaSlotRepository" });
}

View File

@ -10,8 +10,12 @@ export class RateLimitedSuiJsonRPCBlockRepository
extends RateLimitedRPCRepository<SuiRepository>
implements SuiRepository
{
constructor(delegate: SuiRepository, opts: Options = { period: 10_000, limit: 1000 }) {
super(delegate, opts);
constructor(
delegate: SuiRepository,
chain: string,
opts: Options = { period: 10_000, limit: 1000, interval: 1_000, attempts: 10 }
) {
super(delegate, chain, opts);
this.logger = winston.child({ module: "RateLimitedSuiJsonRPCBlockRepository" });
}

View File

@ -12,8 +12,12 @@ export class RateLimitedWormchainJsonRPCBlockRepository
extends RateLimitedRPCRepository<WormchainRepository>
implements WormchainRepository
{
constructor(delegate: WormchainRepository, opts: Options = { period: 10_000, limit: 1000 }) {
super(delegate, opts);
constructor(
delegate: WormchainRepository,
chain: string,
opts: Options = { period: 10_000, limit: 1000, interval: 1_000, attempts: 10 }
) {
super(delegate, chain, opts);
this.logger = winston.child({ module: "RateLimitedWormchainJsonRPCBlockRepository" });
}

View File

@ -1,10 +1,15 @@
import { Logger } from "winston";
import {
InstrumentedEthersProvider,
InstrumentedConnection,
WeightedProvidersPool,
InstrumentedSuiClient,
ProviderPoolStrategy,
RpcConfig,
} from "@xlabs/rpc-pool";
type Weighted<T> = { provider: T; weight: number };
export class HealthyProvidersPool<
T extends InstrumentedEthersProvider | InstrumentedConnection | InstrumentedSuiClient
> extends WeightedProvidersPool<T> {
@ -19,4 +24,18 @@ export class HealthyProvidersPool<
unhealthyProviders[Math.floor(Math.random() * unhealthyProviders.length)];
return randomProvider;
}
static fromConfigs<
T extends InstrumentedEthersProvider | InstrumentedConnection | InstrumentedSuiClient
>(
rpcs: RpcConfig[],
createProvider: (rpcCfg: RpcConfig) => T,
logger?: Logger
): ProviderPoolStrategy<T> {
const providers: Weighted<T>[] = [];
for (const rpcCfg of rpcs) {
providers.push({ provider: createProvider(rpcCfg), weight: rpcCfg.weight ?? 1 });
}
return new HealthyProvidersPool(providers, logger);
}
}

View File

@ -52,6 +52,10 @@ export class InstrumentedHttpProvider {
this.health.serviceOfflineSince = new Date();
}
public getUrl(): string {
return this.url;
}
private async execute<T>(
method: string,
body?: any,

View File

@ -18,7 +18,12 @@ const repoMock = {
describe("RateLimitedSolanaSlotRepository", () => {
describe("getLatestSlot", () => {
it("should fail when ratelimit is exceeded", async () => {
const repository = new RateLimitedSolanaSlotRepository(repoMock, { period: 1000, limit: 1 });
const repository = new RateLimitedSolanaSlotRepository(repoMock, "solana", {
period: 1000,
limit: 1,
interval: 1_000,
attempts: 10,
});
await repository.getLatestSlot("confirmed");
await expect(repository.getLatestSlot("confirmed")).rejects.toThrowError();
@ -27,7 +32,12 @@ describe("RateLimitedSolanaSlotRepository", () => {
describe("getBlock", () => {
it("should fail when ratelimit is exceeded", async () => {
const repository = new RateLimitedSolanaSlotRepository(repoMock, { period: 1000, limit: 1 });
const repository = new RateLimitedSolanaSlotRepository(repoMock, "solana", {
period: 1000,
limit: 1,
interval: 1_000,
attempts: 10,
});
await repository.getBlock(1);
const failure = await repository.getBlock(1);
@ -38,7 +48,12 @@ describe("RateLimitedSolanaSlotRepository", () => {
describe("getSignaturesForAddress", () => {
it("should fail when ratelimit is exceeded", async () => {
const repository = new RateLimitedSolanaSlotRepository(repoMock, { period: 1000, limit: 1 });
const repository = new RateLimitedSolanaSlotRepository(repoMock, "solana", {
period: 1000,
limit: 1,
interval: 1_000,
attempts: 10,
});
await repository.getSignaturesForAddress("address", "before", "after", 1);
await expect(
@ -49,7 +64,12 @@ describe("RateLimitedSolanaSlotRepository", () => {
describe("getTransactions", () => {
it("should fail when ratelimit is exceeded", async () => {
const repository = new RateLimitedSolanaSlotRepository(repoMock, { period: 1000, limit: 1 });
const repository = new RateLimitedSolanaSlotRepository(repoMock, "solana", {
period: 1000,
limit: 1,
interval: 1_000,
attempts: 10,
});
await repository.getTransactions([]);
await expect(repository.getTransactions([])).rejects.toThrowError();

View File

@ -36,6 +36,217 @@ metadata:
data:
testnet-jobs.json: |-
[
{
"id": "poll-log-message-sent-ethereum-sepolia",
"chain": "ethereum-sepolia",
"source": {
"action": "PollEvm",
"records": "GetEvmTransactions",
"config": {
"blockBatchSize": 100,
"commitment": "finalized",
"interval": 15000,
"filters": [
{
"addresses": ["0x7865fAfC2db2093669d92c0F33AeEF291086BEFD"],
"type": "CCTP",
"topics": ["0x8c5261668696ce22758910d05bab8f186d6eb247ceac2af2e82c7dc17669b036"],
"strategy": "GetTransactionsByLogFiltersStrategy"
}
],
"chain": "ethereum-sepolia",
"chainId": 10002
}
},
"handlers": [
{
"action": "HandleEvmTransactions",
"target": "influx",
"mapper": "evmLogCircleMessageSentMapper",
"config": {
"abi": "event MessageSent (bytes message)",
"metricName": "process_message_sent_event",
"environment": "testnet"
}
}
]
},
{
"id": "poll-log-message-sent-avalanche",
"chain": "avalanche",
"source": {
"action": "PollEvm",
"records": "GetEvmTransactions",
"config": {
"blockBatchSize": 100,
"commitment": "finalized",
"interval": 15000,
"fromBlock": 16853349,
"filters": [
{
"addresses": ["0xa9fb1b3009dcb79e2fe346c16a604b8fa8ae0a79"],
"type": "CCTP",
"topics": ["0x8c5261668696ce22758910d05bab8f186d6eb247ceac2af2e82c7dc17669b036"],
"strategy": "GetTransactionsByLogFiltersStrategy"
}
],
"chain": "avalanche",
"chainId": 6
}
},
"handlers": [
{
"action": "HandleEvmTransactions",
"target": "influx",
"mapper": "evmLogCircleMessageSentMapper",
"config": {
"abi": "event MessageSent (bytes message)",
"metricName": "process_message_sent_event",
"environment": "testnet"
}
}
]
},
{
"id": "poll-log-message-sent-optimism-sepolia",
"chain": "optimism-sepolia",
"source": {
"action": "PollEvm",
"records": "GetEvmTransactions",
"config": {
"blockBatchSize": 100,
"commitment": "finalized",
"interval": 15000,
"filters": [
{
"addresses": ["0x7865fAfC2db2093669d92c0F33AeEF291086BEFD"],
"type": "CCTP",
"topics": ["0x8c5261668696ce22758910d05bab8f186d6eb247ceac2af2e82c7dc17669b036"],
"strategy": "GetTransactionsByLogFiltersStrategy"
}
],
"chain": "optimism-sepolia",
"chainId": 10005
}
},
"handlers": [
{
"action": "HandleEvmTransactions",
"target": "influx",
"mapper": "evmLogCircleMessageSentMapper",
"config": {
"abi": "event MessageSent (bytes message)",
"metricName": "process_message_sent_event",
"environment": "testnet"
}
}
]
},
{
"id": "poll-log-message-sent-arbitrum-sepolia",
"chain": "arbitrum-sepolia",
"source": {
"action": "PollEvm",
"records": "GetEvmTransactions",
"config": {
"blockBatchSize": 1000,
"commitment": "finalized",
"interval": 15000,
"filters": [
{
"addresses": ["0xaCF1ceeF35caAc005e15888dDb8A3515C41B4872"],
"type": "CCTP",
"topics": ["0x8c5261668696ce22758910d05bab8f186d6eb247ceac2af2e82c7dc17669b036"],
"strategy": "GetTransactionsByLogFiltersStrategy"
}
],
"chain": "arbitrum-sepolia",
"chainId": 10003
}
},
"handlers": [
{
"action": "HandleEvmTransactions",
"target": "influx",
"mapper": "evmLogCircleMessageSentMapper",
"config": {
"abi": "event MessageSent (bytes message)",
"metricName": "process_message_sent_event",
"environment": "testnet"
}
}
]
},
{
"id": "poll-log-message-sent-base-sepolia",
"chain": "base-sepolia",
"source": {
"action": "PollEvm",
"records": "GetEvmTransactions",
"config": {
"blockBatchSize": 100,
"commitment": "finalized",
"interval": 15000,
"filters": [
{
"addresses": ["0x7865fAfC2db2093669d92c0F33AeEF291086BEFD"],
"type": "CCTP",
"topics": ["0x8c5261668696ce22758910d05bab8f186d6eb247ceac2af2e82c7dc17669b036"],
"strategy": "GetTransactionsByLogFiltersStrategy"
}
],
"chain": "base-sepolia",
"chainId": 10004
}
},
"handlers": [
{
"action": "HandleEvmTransactions",
"target": "influx",
"mapper": "evmLogCircleMessageSentMapper",
"config": {
"abi": "event MessageSent (bytes message)",
"metricName": "process_message_sent_event",
"environment": "testnet"
}
}
]
},
{
"id": "poll-log-message-sent-polygon-sepolia",
"chain": "polygon-sepolia",
"source": {
"action": "PollEvm",
"records": "GetEvmTransactions",
"config": {
"blockBatchSize": 100,
"commitment": "finalized",
"interval": 15000,
"filters": [
{
"addresses": ["0x7865fAfC2db2093669d92c0F33AeEF291086BEFD"],
"type": "CCTP",
"topics": ["0x8c5261668696ce22758910d05bab8f186d6eb247ceac2af2e82c7dc17669b036"],
"strategy": "GetTransactionsByLogFiltersStrategy"
}
],
"chain": "polygon-sepolia",
"chainId": 10007
}
},
"handlers": [
{
"action": "HandleEvmTransactions",
"target": "influx",
"mapper": "evmLogCircleMessageSentMapper",
"config": {
"abi": "event MessageSent (bytes message)",
"metricName": "process_message_sent_event",
"environment": "testnet"
}
}
]
}
]
mainnet-jobs.json: |-
[
@ -151,7 +362,7 @@ data:
"action": "PollEvm",
"records": "GetEvmTransactions",
"config": {
"blockBatchSize": 100,
"blockBatchSize": 1000,
"commitment": "finalized",
"interval": 15000,
"filters": [
@ -298,6 +509,26 @@ spec:
value: {{ .NODE_OPTIONS }}
- name: JOBS_DIR
value: /home/node/app/jobs
{{ if .POLYGON_SEPOLIA_RPCS }}
- name: POLYGON_SEPOLIA_RPCS
value: '{{ .POLYGON_SEPOLIA_RPCS }}'
{{ end }}
{{ if .ETHEREUM_SEPOLIA_RPCS }}
- name: ETHEREUM_SEPOLIA_RPCS
value: '{{ .ETHEREUM_SEPOLIA_RPCS }}'
{{ end }}
{{ if .ARBITRUM_SEPOLIA_RPCS }}
- name: ARBITRUM_SEPOLIA_RPCS
value: '{{ .ARBITRUM_SEPOLIA_RPCS }}'
{{ end }}
{{ if .BASE_SEPOLIA_RPCS }}
- name: BASE_SEPOLIA_RPCS
value: '{{ .BASE_SEPOLIA_RPCS }}'
{{ end }}
{{ if .OPTIMISM_SEPOLIA_RPCS }}
- name: OPTIMISM_SEPOLIA_RPCS
value: '{{ .OPTIMISM_SEPOLIA_RPCS }}'
{{ end }}
{{ if .ETHEREUM_RPCS }}
- name: ETHEREUM_RPCS
value: '{{ .ETHEREUM_RPCS }}'

View File

@ -238,7 +238,7 @@ data:
"action": "PollEvm",
"records": "GetEvmTransactions",
"config": {
"blockBatchSize": 10,
"blockBatchSize": 100,
"commitment": "finalized",
"interval": 60000,
"filters": [
@ -504,7 +504,7 @@ data:
"action": "PollEvm",
"records": "GetEvmTransactions",
"config": {
"blockBatchSize": 10,
"blockBatchSize": 100,
"commitment": "finalized",
"interval": 60000,
"filters": [