adding finality param to get solana block

This commit is contained in:
matias martinez 2023-11-22 15:39:24 -03:00
parent a8ce767d98
commit 782d8d168c
5 changed files with 19 additions and 12 deletions

View File

@ -6,13 +6,13 @@ import { solana } from "../../entities";
*/ */
export class HandleSolanaTransactions<T> { export class HandleSolanaTransactions<T> {
cfg: HandleSolanaTxConfig; cfg: HandleSolanaTxConfig;
mapper: (txs: solana.Transaction, args: { programId: string }) => Promise<T>; mapper: (txs: solana.Transaction, args: { programId: string }) => Promise<T[]>;
target?: (parsed: T[]) => Promise<void>; target?: (parsed: T[]) => Promise<void>;
logger: winston.Logger = winston.child({ module: "HandleSolanaTransaction" }); logger: winston.Logger = winston.child({ module: "HandleSolanaTransaction" });
constructor( constructor(
cfg: HandleSolanaTxConfig, cfg: HandleSolanaTxConfig,
mapper: (txs: solana.Transaction) => Promise<T>, mapper: (txs: solana.Transaction) => Promise<T[]>,
target?: (parsed: T[]) => Promise<void> target?: (parsed: T[]) => Promise<void>
) { ) {
this.cfg = cfg; this.cfg = cfg;
@ -33,7 +33,7 @@ export class HandleSolanaTransactions<T> {
let mappedItems: T[] = []; let mappedItems: T[] = [];
for (const tx of filteredItems) { for (const tx of filteredItems) {
const result = await this.mapper(tx, { programId: this.cfg.programId }); const result = await this.mapper(tx, { programId: this.cfg.programId });
if (result) { if (result.length) {
mappedItems = mappedItems.concat(result); mappedItems = mappedItems.concat(result);
} }
} }

View File

@ -81,7 +81,7 @@ export class PollSolanaTransactions extends RunPollingJob {
this.cfg.signaturesLimit this.cfg.signaturesLimit
); );
this.logger.debug( this.logger.debug(
`Got ${sigs.length} signatures for address ${this.cfg.programId} between ${fromBlock.blockTime} and ${toBlock.blockTime}` `Got ${sigs.length} signatures for address ${this.cfg.programId} between sigs ${afterSignature} and ${beforeSignature} [slots: ${range.fromSlot} - ${range.toSlot}]`
); );
const txs = await this.slotRepository.getTransactions(sigs); const txs = await this.slotRepository.getTransactions(sigs);
@ -101,7 +101,7 @@ export class PollSolanaTransactions extends RunPollingJob {
} }
private getSlotRange(latestSlot: number): { fromSlot: number; toSlot: number } { private getSlotRange(latestSlot: number): { fromSlot: number; toSlot: number } {
let fromSlot = this.slotCursor ? this.slotCursor + 1 : (this.cfg.fromSlot ?? latestSlot); let fromSlot = this.slotCursor ? this.slotCursor + 1 : this.cfg.fromSlot ?? latestSlot;
// cfg.fromSlot is present and is greater than current slot height, then we allow to skip slots. // cfg.fromSlot is present and is greater than current slot height, then we allow to skip slots.
if (this.slotCursor && this.cfg.fromSlot && this.cfg.fromSlot > this.slotCursor) { if (this.slotCursor && this.cfg.fromSlot && this.cfg.fromSlot > this.slotCursor) {
fromSlot = this.cfg.fromSlot; fromSlot = this.cfg.fromSlot;
@ -114,7 +114,9 @@ export class PollSolanaTransactions extends RunPollingJob {
} }
if (fromSlot > toSlot) { if (fromSlot > toSlot) {
throw new Error(`Invalid slot range: fromSlot=${fromSlot} toSlot=${toSlot}`); throw new Error(
`Invalid slot range: fromSlot=${fromSlot} toSlot=${toSlot}. Might be cause we are up to date.`
);
} }
return { fromSlot, toSlot }; return { fromSlot, toSlot };
@ -141,12 +143,13 @@ export class PollSolanaTransactions extends RunPollingJob {
slot: number, slot: number,
nextSlot: (slot: number) => number nextSlot: (slot: number) => number
): Promise<solana.Block> { ): Promise<solana.Block> {
const blockResult = await this.slotRepository.getBlock(slot); const blockResult = await this.slotRepository.getBlock(slot, this.cfg.commitment);
if (blockResult.isOk()) { if (blockResult.isOk()) {
return Promise.resolve(blockResult.getValue()); return Promise.resolve(blockResult.getValue());
} }
if (blockResult.getError().skippedSlot() || blockResult.getError().noBlockOrBlockTime()) { if (blockResult.getError().skippedSlot() || blockResult.getError().noBlockOrBlockTime()) {
this.logger.warn(`No block found for slot ${slot}, trying next slot ${nextSlot(slot)}`);
return this.findValidBlock(nextSlot(slot), nextSlot); return this.findValidBlock(nextSlot(slot), nextSlot);
} }

View File

@ -18,7 +18,7 @@ export interface EvmBlockRepository {
export interface SolanaSlotRepository { export interface SolanaSlotRepository {
getLatestSlot(commitment: string): Promise<number>; getLatestSlot(commitment: string): Promise<number>;
getBlock(slot: number): Promise<Fallible<solana.Block, solana.Failure>>; getBlock(slot: number, finality?: string): Promise<Fallible<solana.Block, solana.Failure>>;
getSignaturesForAddress( getSignaturesForAddress(
address: string, address: string,
beforeSig: string, beforeSig: string,

View File

@ -4,6 +4,7 @@ import {
PublicKey, PublicKey,
VersionedTransactionResponse, VersionedTransactionResponse,
SolanaJSONRPCError, SolanaJSONRPCError,
Finality,
} from "@solana/web3.js"; } from "@solana/web3.js";
import { Fallible, solana } from "../../domain/entities"; import { Fallible, solana } from "../../domain/entities";
@ -20,9 +21,12 @@ export class Web3SolanaSlotRepository implements SolanaSlotRepository {
return this.connection.getSlot(commitment as Commitment); return this.connection.getSlot(commitment as Commitment);
} }
getBlock(slot: number): Promise<Fallible<solana.Block, solana.Failure>> { getBlock(slot: number, finality?: string): Promise<Fallible<solana.Block, solana.Failure>> {
return this.connection return this.connection
.getBlock(slot, { maxSupportedTransactionVersion: 0 }) .getBlock(slot, {
maxSupportedTransactionVersion: 0,
commitment: finality === "finalized" || finality === "confirmed" ? finality : undefined,
})
.then((block) => { .then((block) => {
if (block === null) { if (block === null) {
return Fallible.error<solana.Block, solana.Failure>( return Fallible.error<solana.Block, solana.Failure>(

View File

@ -49,7 +49,7 @@ describe("PollSolanaTransactions", () => {
await thenWaitForAssertion( await thenWaitForAssertion(
() => expect(metadataGetSpy).toHaveBeenCalledWith(cfg.id), () => expect(metadataGetSpy).toHaveBeenCalledWith(cfg.id),
() => expect(getLatestSlotSpy).toHaveBeenCalledWith(cfg.commitment), () => expect(getLatestSlotSpy).toHaveBeenCalledWith(cfg.commitment),
() => expect(getBlockSpy).toHaveBeenCalledWith(currentSlot), () => expect(getBlockSpy).toHaveBeenCalledWith(currentSlot, cfg.commitment),
() => expect(handlerSpy).toHaveBeenCalledWith(expectedTxs), () => expect(handlerSpy).toHaveBeenCalledWith(expectedTxs),
() => () =>
expect(getSigsSpy).toHaveBeenCalledWith( expect(getSigsSpy).toHaveBeenCalledWith(
@ -80,7 +80,7 @@ describe("PollSolanaTransactions", () => {
pollSolanaTransactions.run([handlers.working]); pollSolanaTransactions.run([handlers.working]);
await thenWaitForAssertion( await thenWaitForAssertion(
() => expect(getBlockSpy).toHaveBeenCalledWith(lastSlot + 1), () => expect(getBlockSpy).toHaveBeenCalledWith(lastSlot + 1, cfg.commitment),
() => expect(handlerSpy).toHaveBeenCalledWith(expectedTxs), () => expect(handlerSpy).toHaveBeenCalledWith(expectedTxs),
() => () =>
expect(getSigsSpy).toHaveBeenCalledWith( expect(getSigsSpy).toHaveBeenCalledWith(