solana: add retry handling for 429s

This commit is contained in:
matias martinez 2023-11-28 14:54:07 -03:00
parent aaf9614230
commit 933e4dfab0
6 changed files with 43 additions and 12 deletions

View File

@ -25,7 +25,7 @@
"timeout": 10000, "timeout": 10000,
"rateLimit": { "rateLimit": {
"period": 10000, "period": 10000,
"limit": 20 "limit": 40
} }
}, },
"ethereum": { "ethereum": {

View File

@ -37,7 +37,7 @@ export abstract class RunPollingJob {
items = await this.get(); items = await this.get();
await Promise.all(handlers.map((handler) => handler(items))); await Promise.all(handlers.map((handler) => handler(items)));
} catch (e: Error | any) { } catch (e: Error | any) {
this.logger.error("Error processing items", e, e.stack); this.logger.error("Error processing items", e);
this.statRepo?.count("job_runs_total", { id: this.id, status: "error" }); this.statRepo?.count("job_runs_total", { id: this.id, status: "error" });
await setTimeout(this.interval); await setTimeout(this.interval);
continue; continue;

View File

@ -40,7 +40,9 @@ export class RepositoriesBuilder {
if (chain === "solana") { if (chain === "solana") {
const cfg = this.cfg.platforms[chain]; const cfg = this.cfg.platforms[chain];
const solanaSlotRepository = new RateLimitedSolanaSlotRepository( const solanaSlotRepository = new RateLimitedSolanaSlotRepository(
new Web3SolanaSlotRepository(new Connection(cfg.rpcs[0])), new Web3SolanaSlotRepository(
new Connection(cfg.rpcs[0], { disableRetryOnRateLimit: true })
),
cfg.rateLimit cfg.rateLimit
); );
this.repositories.set("solana-slotRepo", solanaSlotRepository); this.repositories.set("solana-slotRepo", solanaSlotRepository);

View File

@ -1,18 +1,36 @@
import { Commitment } from "@solana/web3.js"; import { Circuit, Ratelimit, RatelimitError, Retry, RetryMode } from "mollitia";
import { Circuit, Ratelimit, RatelimitError } from "mollitia";
import { solana } from "../../../domain/entities"; import { solana } from "../../../domain/entities";
import { SolanaSlotRepository } from "../../../domain/repositories"; import { SolanaSlotRepository } from "../../../domain/repositories";
import { Fallible, SolanaFailure, ErrorType } from "../../../domain/errors"; import { Fallible, SolanaFailure, ErrorType } from "../../../domain/errors";
import winston from "../../../infrastructure/log";
export class RateLimitedSolanaSlotRepository implements SolanaSlotRepository { export class RateLimitedSolanaSlotRepository implements SolanaSlotRepository {
delegate: SolanaSlotRepository; delegate: SolanaSlotRepository;
breaker: Circuit; breaker: Circuit;
logger: winston.Logger = winston.child({ module: "RateLimitedSolanaSlotRepository" });
constructor(delegate: SolanaSlotRepository, opts: Options = { period: 10_000, limit: 50 }) { constructor(delegate: SolanaSlotRepository, opts: Options = { period: 10_000, limit: 50 }) {
this.delegate = delegate; this.delegate = delegate;
this.breaker = new Circuit({ this.breaker = new Circuit({
options: { options: {
modules: [new Ratelimit({ limitPeriod: opts.period, limitForPeriod: opts.limit })], modules: [
new Ratelimit({ limitPeriod: opts.period, limitForPeriod: opts.limit }),
new Retry({
attempts: 1,
interval: 10_000,
fastFirst: false,
mode: RetryMode.LINEAR,
factor: 1,
onRejection: (err: Error | any) => {
if (err.message?.startsWith("429 Too Many Requests")) {
this.logger.warn("Got 429 from solana RPC node. Retrying in 10 secs...");
return 10_000; // Wait 10 secs if we get a 429
} else {
return false; // Dont retry, let the caller handle it
}
},
}),
],
}, },
}); });
} }
@ -26,13 +44,23 @@ export class RateLimitedSolanaSlotRepository implements SolanaSlotRepository {
const result: Fallible<solana.Block, SolanaFailure> = await this.breaker const result: Fallible<solana.Block, SolanaFailure> = await this.breaker
.fn(() => this.delegate.getBlock(slot, finality)) .fn(() => this.delegate.getBlock(slot, finality))
.execute(); .execute();
if (!result.isOk()) {
throw result.getError();
}
return result; return result;
} catch (err) { } catch (err: SolanaFailure | any) {
// this needs more handling due to delegate.getBlock returning a Fallible with a SolanaFailure
if (err instanceof RatelimitError) { if (err instanceof RatelimitError) {
return Fallible.error(new SolanaFailure(0, err.message, ErrorType.Ratelimit)); return Fallible.error(new SolanaFailure(0, err.message, ErrorType.Ratelimit));
} }
return Fallible.error(new SolanaFailure(err, "unknown error")); if (err instanceof SolanaFailure) {
return Fallible.error(err);
}
return Fallible.error(new SolanaFailure(err, err?.message ?? "unknown error"));
} }
} }

View File

@ -43,7 +43,7 @@ export class Web3SolanaSlotRepository implements SolanaSlotRepository {
return Fallible.error(new SolanaFailure(err.code, err.message)); return Fallible.error(new SolanaFailure(err.code, err.message));
} }
return Fallible.error(new SolanaFailure(0, err.message)); throw err;
}); });
} }

View File

@ -46,8 +46,9 @@ data:
"commitment": "confirmed", "commitment": "confirmed",
"interval": 5000, "interval": 5000,
"signaturesLimit": 100, "signaturesLimit": 100,
"programId": "3u8hJUVTA4jH1wYAyUur7FFZVQ8H635K3tSHHF4ssjQ5", "programId": "Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o",
"chain": "solana" "chain": "solana",
"network": "devnet"
} }
}, },
"handlers": [ "handlers": [
@ -56,7 +57,7 @@ data:
"target": "sns", "target": "sns",
"mapper": "solanaLogMessagePublishedMapper", "mapper": "solanaLogMessagePublishedMapper",
"config": { "config": {
"programId": "3u8hJUVTA4jH1wYAyUur7FFZVQ8H635K3tSHHF4ssjQ5" "programId": "Bridge1p5gheXUvJ6jGWGeCsgPKgnE3YgdGKRVCMY9o"
} }
} }
] ]