adding solana repo implementation

This commit is contained in:
matias martinez 2023-11-21 09:37:21 -03:00
parent c8be7bfceb
commit a64a13878b
19 changed files with 349 additions and 839 deletions

View File

@ -0,0 +1,32 @@
import { solana } from "../../entities";
/**
* Handling means mapping and forwarding to a given target, if one is present.
*/
export class HandleSolanaTransaction<T> {
cfg: HandleSolanaTxConfig;
mapper: (txs: solana.Transaction) => T;
target?: (parsed: T[]) => Promise<void>;
constructor(
cfg: HandleSolanaTxConfig,
mapper: (txs: solana.Transaction) => T,
target?: (parsed: T[]) => Promise<void>
) {
this.cfg = cfg;
this.mapper = mapper;
this.target = target;
}
public async handle(txs: solana.Transaction[]): Promise<T[]> {
const mappedItems = txs.filter((tx) => !tx.meta?.err).map((tx) => this.mapper(tx));
if (this.target) await this.target(mappedItems);
return mappedItems;
}
}
export type HandleSolanaTxConfig = {
programId: string;
};
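For context, a minimal usage sketch of HandleSolanaTransaction (not part of the commit): the mapper and target below are illustrative placeholders, the relative import path is assumed, and the program id is the well-known Wormhole core program id on Solana mainnet, used only as an example.

import { solana } from "../../entities";
import { HandleSolanaTransaction } from "./HandleSolanaTransactions"; // path assumed

// Illustrative mapper: keep the first signature of each transaction.
const signatureMapper = (tx: solana.Transaction): string => tx.transaction.signatures[0];

// Illustrative target: just log how many items were forwarded.
const logTarget = async (signatures: string[]): Promise<void> => {
  console.log(`forwarding ${signatures.length} signatures`);
};

const handler = new HandleSolanaTransaction<string>(
  { programId: "worm2ZoG2kUd4vFXhvjh93UUH596ayRfgQ2MgjNMTth" },
  signatureMapper,
  logTarget
);

// handler.handle(txs) skips failed transactions (meta.err set), maps the rest,
// and forwards the mapped items to the target before returning them.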

View File

@ -1,12 +1,13 @@
import winston from "winston";
import { RunPollingJob } from "../RunPollingJob";
import { MetadataRepository, SolanaSlotRepository } from "../../repositories";
import { MetadataRepository, SolanaSlotRepository, StatRepository } from "../../repositories";
import { solana } from "../../entities";
export class PollSolanaTransactions extends RunPollingJob {
private cfg: PollSolanaTransactionsConfig;
private metadataRepo: MetadataRepository<PollSolanaTransactionsMetadata>;
private slotRepository: SolanaSlotRepository;
private statsRepo: StatRepository;
private latestSlot?: number;
private slotCursor?: number;
@ -16,12 +17,14 @@ export class PollSolanaTransactions extends RunPollingJob {
constructor(
metadataRepo: MetadataRepository<any>,
slotRepo: SolanaSlotRepository,
statsRepo: StatRepository,
cfg: PollSolanaTransactionsConfig
) {
super(1_000);
this.metadataRepo = metadataRepo;
this.slotRepository = slotRepo;
this.statsRepo = statsRepo;
this.cfg = cfg;
this.logger = winston.child({ module: "PollSolanaTransactions", label: this.cfg.id });
}

View File

@ -1,4 +1,14 @@
export type Transaction = {};
export type Transaction = {
slot: number;
transaction: {
message: Message;
signatures: string[];
};
meta?: {
err?: {} | string | null;
};
blockTime?: number | null;
};
export type CompiledInstruction = {
programIdIndex: number;
@ -33,7 +43,7 @@ export type MessageCompiledInstruction = {
export type ConfirmedSignatureInfo = {
signature: string;
err: any | null;
err?: any | null;
blockTime?: number | null;
};
@ -43,9 +53,9 @@ export enum ErrorType {
}
export class Failure extends Error {
readonly code?: number;
readonly code?: number | unknown;
readonly type?: ErrorType;
constructor(code: number, message: string, type?: ErrorType) {
constructor(code: number | unknown, message: string, type?: ErrorType) {
super(message);
this.code = code;
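A tiny consumer sketch for the expanded Transaction type above (not part of the commit); it touches only fields visible in this hunk, and the import mirrors the style used in HandleSolanaTransactions earlier in the diff.

import { solana } from "../../entities"; // import style as in HandleSolanaTransactions

// Summarize successful transactions using only slot, signatures, meta.err and blockTime.
const summarizeTransactions = (txs: solana.Transaction[]) =>
  txs
    .filter((tx) => !tx.meta?.err)
    .map((tx) => ({
      slot: tx.slot,
      signature: tx.transaction.signatures[0],
      blockTime: tx.blockTime ?? undefined,
    }));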

View File

@ -1,4 +1,5 @@
import { SNSClient, SNSClientConfig } from "@aws-sdk/client-sns";
import { Connection } from "@solana/web3.js";
import { Config } from "./config";
import {
SnsEventRepository,
@ -7,6 +8,7 @@ import {
FileMetadataRepo,
PromStatRepository,
StaticJobRepository,
Web3SolanaSlotRepository,
} from "./repositories";
import { HttpClient } from "./repositories/HttpClient";
@ -32,13 +34,26 @@ export class RepositoriesBuilder {
this.repositories.set("metadata", new FileMetadataRepo(this.cfg.metadata.dir));
this.cfg.supportedChains.forEach((chain) => {
const httpClient = this.createHttpClient(this.cfg.platforms[chain].timeout);
const repoCfg: EvmJsonRPCBlockRepositoryCfg = {
chain,
rpc: this.cfg.platforms[chain].rpcs[0],
timeout: this.cfg.platforms[chain].timeout,
};
this.repositories.set(`${chain}-evmRepo`, new EvmJsonRPCBlockRepository(repoCfg, httpClient));
if (!this.cfg.platforms[chain]) throw new Error(`No config for chain ${chain}`);
if (chain === "solana") {
const cfg = this.cfg.platforms[chain];
const solanaSlotRepository = new Web3SolanaSlotRepository(new Connection(cfg.rpcs[0]));
this.repositories.set("solana-slotRepo", solanaSlotRepository);
}
if (chain === "ethereum") {
const httpClient = this.createHttpClient(this.cfg.platforms[chain].timeout);
const repoCfg: EvmJsonRPCBlockRepositoryCfg = {
chain,
rpc: this.cfg.platforms[chain].rpcs[0],
timeout: this.cfg.platforms[chain].timeout,
};
this.repositories.set(
`${chain}-evmRepo`,
new EvmJsonRPCBlockRepository(repoCfg, httpClient)
);
}
});
this.repositories.set(
@ -57,36 +72,32 @@ export class RepositoriesBuilder {
}
public getEvmBlockRepository(chain: string): EvmJsonRPCBlockRepository {
const repo = this.repositories.get(`${chain}-evmRepo`);
if (!repo) throw new Error(`No EvmJsonRPCBlockRepository for chain ${chain}`);
return repo;
return this.getRepo(`${chain}-evmRepo`);
}
public getSnsEventRepository(): SnsEventRepository {
const repo = this.repositories.get("sns");
if (!repo) throw new Error(`No SnsEventRepository`);
return repo;
return this.getRepo("sns");
}
public getMetadataRepository(): FileMetadataRepo {
const repo = this.repositories.get("metadata");
if (!repo) throw new Error(`No FileMetadataRepo`);
return repo;
return this.getRepo("metadata");
}
public getStatsRepository(): PromStatRepository {
const repo = this.repositories.get("metrics");
if (!repo) throw new Error(`No PromStatRepository`);
return repo;
return this.getRepo("metrics");
}
public getJobsRepository(): JobRepository {
const repo = this.repositories.get("jobs");
if (!repo) throw new Error(`No JobRepository`);
return this.getRepo("jobs");
}
public getSolanaSlotRepository(): Web3SolanaSlotRepository {
return this.getRepo("solana-slotRepo");
}
private getRepo(name: string): any {
const repo = this.repositories.get(name);
if (!repo) throw new Error(`No repository ${name}`);
return repo;
}
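For orientation, a sketch of the configuration shape RepositoriesBuilder reads above, inferred only from the fields it accesses (metadata.dir, supportedChains, platforms[chain].rpcs and timeout); the real Config type lives in ./config and may contain more.

// Hypothetical configuration fragment matching the fields accessed in build().
const exampleCfg = {
  metadata: { dir: "./metadata" },
  supportedChains: ["ethereum", "solana"],
  platforms: {
    ethereum: { rpcs: ["https://rpc.ankr.com/eth"], timeout: 10_000 },
    solana: { rpcs: ["https://api.mainnet-beta.solana.com"], timeout: 10_000 },
  },
};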

View File

@ -0,0 +1,73 @@
import { BigNumber } from "ethers";
import { solana, LogFoundEvent, LogMessagePublished } from "../../domain/entities";
export const solanaLogMessagePublishedMapper = (
tx: solana.Transaction
): LogFoundEvent<LogMessagePublished>[] => {
if (!tx || !tx.blockTime) {
throw new Error(`Block time is missing for tx in slot ${tx?.slot} @ time ${tx?.blockTime}`);
}
/*
const message = res.transaction.message;
const accountKeys = isLegacyMessage(message)
? message.accountKeys
: message.staticAccountKeys;
const programIdIndex = accountKeys.findIndex((i) => i.toBase58() === WORMHOLE_PROGRAM_ID);
const instructions = message.compiledInstructions;
const innerInstructions =
res.meta?.innerInstructions?.flatMap((i) =>
i.instructions.map(normalizeCompileInstruction),
) || [];
const whInstructions = innerInstructions
.concat(instructions)
.filter((i) => i.programIdIndex === programIdIndex);
for (const instruction of whInstructions) {
// skip if not postMessage instruction
const instructionId = instruction.data;
if (instructionId[0] !== 0x01) continue;
const accountId = accountKeys[instruction.accountKeyIndexes[1]];
const { message } = await getPostedMessage(connection, accountId.toBase58(), COMMITMENT);
const {
sequence,
emitterAddress,
emitterChain,
submissionTime: timestamp,
nonce,
payload,
consistencyLevel,
} = message || {};
// We store `blockNumber` with the slot number.
const blockNumber = res.slot.toString();
const chainId = emitterChain;
const emitter = emitterAddress.toString('hex');
const parsePayload = payload.toString('hex');
const parseSequence = Number(sequence);
const txHash = res.transaction.signatures[0];
}
*/
return [
{
name: "log-message-published",
      address: "", // TODO: emitter account, not known until the plan above is implemented
      chainId: 1, // Wormhole chain id for Solana
      txHash: tx.transaction.signatures[0],
      blockHeight: tx.slot, // slot number used as block height, per the plan above
      blockTime: tx.blockTime,
      attributes: {
        // TODO: fill from the parsed Wormhole message (sender, sequence, payload, nonce, consistencyLevel)
        sender: "",
        sequence: 0,
        payload: "",
        nonce: 0,
        consistencyLevel: 0,
},
},
];
};
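As a companion to the commented-out plan above, a rough sketch of its instruction-filtering step (not part of the commit); it assumes solana.MessageCompiledInstruction exposes programIdIndex and a byte-array data field, as the plan implies.

import { solana } from "../../domain/entities";

// Hypothetical helper: keep instructions that target the Wormhole program and
// look like postMessage instructions (first data byte 0x01), as in the plan above.
const filterPostMessageInstructions = (
  instructions: solana.MessageCompiledInstruction[],
  programIdIndex: number
): solana.MessageCompiledInstruction[] =>
  instructions.filter((ix) => ix.programIdIndex === programIdIndex && ix.data[0] === 0x01);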

View File

@ -3,6 +3,8 @@ import {
PollEvmLogs,
PollEvmLogsConfig,
PollEvmLogsConfigProps,
PollSolanaTransactions,
PollSolanaTransactionsConfig,
RunPollingJob,
} from "../../domain/actions";
import { JobDefinition, Handler, LogFoundEvent } from "../../domain/entities";
@ -10,11 +12,13 @@ import {
EvmBlockRepository,
JobRepository,
MetadataRepository,
SolanaSlotRepository,
StatRepository,
} from "../../domain/repositories";
import { FileMetadataRepo, SnsEventRepository } from "./index";
import { evmLogMessagePublishedMapper } from "../mappers/evmLogMessagePublishedMapper";
import log from "../log";
import { HandleSolanaTransaction } from "../../domain/actions/solana/HandleSolanaTransactions";
export class StaticJobRepository implements JobRepository {
private fileRepo: FileMetadataRepo;
@ -28,6 +32,7 @@ export class StaticJobRepository implements JobRepository {
private metadataRepo: MetadataRepository<any>;
private statsRepo: StatRepository;
private snsRepo: SnsEventRepository;
private solanaSlotRepo: SolanaSlotRepository;
constructor(
path: string,
@ -37,6 +42,7 @@ export class StaticJobRepository implements JobRepository {
metadataRepo: MetadataRepository<any>;
statsRepo: StatRepository;
snsRepo: SnsEventRepository;
solanaSlotRepo: SolanaSlotRepository;
}
) {
this.fileRepo = new FileMetadataRepo(path);
@ -44,6 +50,7 @@ export class StaticJobRepository implements JobRepository {
this.metadataRepo = repos.metadataRepo;
this.statsRepo = repos.statsRepo;
this.snsRepo = repos.snsRepo;
this.solanaSlotRepo = repos.solanaSlotRepo;
this.dryRun = dryRun;
this.fill();
}
@ -94,7 +101,13 @@ export class StaticJobRepository implements JobRepository {
id: jobDef.id,
})
);
const pollSolanaTransactions = (jobDef: JobDefinition) =>
new PollSolanaTransactions(this.metadataRepo, this.solanaSlotRepo, this.statsRepo, {
...(jobDef.source.config as PollSolanaTransactionsConfig),
id: jobDef.id,
});
this.sources.set("PollEvmLogs", pollEvmLogs);
this.sources.set("PollSolanaTransactions", pollSolanaTransactions);
this.mappers.set("evmLogMessagePublishedMapper", evmLogMessagePublishedMapper);
@ -114,7 +127,21 @@ export class StaticJobRepository implements JobRepository {
return instance.handle.bind(instance);
};
const handleSolanaTx = async (config: any, target: string, mapper: any) => {
const instance = new HandleSolanaTransaction(config, mapper, await this.getTarget(target));
return instance.handle.bind(instance);
};
this.handlers.set("HandleEvmLogs", handleEvmLogs);
this.handlers.set("HandleSolanaTransaction", handleSolanaTx);
}
private async getTarget(target: string): Promise<(items: any[]) => Promise<void>> {
const maybeTarget = this.targets.get(this.dryRun ? "dummy" : target);
if (!maybeTarget) {
throw new Error(`Target ${target} not found`);
}
return maybeTarget();
}
}
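For orientation, a hypothetical job definition that would exercise the Solana source and handler registered above; only id and source.config are read in the code shown, so every other field name here is an assumption about the JobDefinition schema.

// Hypothetical job definition (shape assumed); the program id is the Wormhole
// core program id on Solana mainnet, used only as an example.
const solanaJob = {
  id: "poll-log-message-published-solana",
  source: {
    action: "PollSolanaTransactions",
    config: { commitment: "finalized" }, // PollSolanaTransactionsConfig fields assumed
  },
  handlers: [
    {
      action: "HandleSolanaTransaction",
      target: "sns",
      mapper: "solanaLogMessagePublishedMapper",
      config: { programId: "worm2ZoG2kUd4vFXhvjh93UUH596ayRfgQ2MgjNMTth" },
    },
  ],
};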

View File

@ -0,0 +1,67 @@
import { Commitment, Connection, PublicKey, SolanaJSONRPCError } from "@solana/web3.js";
import { Fallible, solana } from "../../domain/entities";
import { SolanaSlotRepository } from "../../domain/repositories";
export class Web3SolanaSlotRepository implements SolanaSlotRepository {
connection: Connection;
constructor(connection: Connection) {
this.connection = connection;
}
getLatestSlot(commitment: string): Promise<number> {
return this.connection.getSlot(commitment as Commitment);
}
getBlock(slot: number): Promise<Fallible<solana.Block, solana.Failure>> {
return this.connection
.getBlock(slot, { maxSupportedTransactionVersion: 0 })
.then((block) => {
if (block === null) {
return Fallible.error<solana.Block, solana.Failure>(
new solana.Failure(0, "Block not found")
);
}
return Fallible.ok<solana.Block, solana.Failure>(block as solana.Block);
})
.catch((err) => {
if (err instanceof SolanaJSONRPCError) {
return Fallible.error(new solana.Failure(err.code, err.message));
}
return Fallible.error(new solana.Failure(0, err.message));
});
}
getSignaturesForAddress(
address: string,
beforeSig: string,
afterSig: string,
limit: number
): Promise<solana.ConfirmedSignatureInfo[]> {
return this.connection.getSignaturesForAddress(new PublicKey(address), {
limit: limit,
before: beforeSig,
until: afterSig,
});
}
async getTransactions(sigs: solana.ConfirmedSignatureInfo[]): Promise<solana.Transaction[]> {
const txs = await this.connection.getTransactions(
sigs.map((sig) => sig.signature),
{ maxSupportedTransactionVersion: 0 }
);
if (txs.length !== sigs.length) {
throw new Error(`Expected ${sigs.length} transactions, but got ${txs.length} instead`);
}
    return txs
      .filter((tx) => tx !== null)
      .map((tx) => tx as solana.Transaction);
}
}
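A brief caller sketch for the repository above (not part of the commit): RPC failures and missing blocks come back as a wrapped solana.Failure instead of a rejection, and successful values are read with getValue(), as in the tests at the end of this diff. The import path is assumed.

import { Connection } from "@solana/web3.js";
import { Web3SolanaSlotRepository } from "./Web3SolanaSlotRepository"; // path assumed

const example = async () => {
  const repo = new Web3SolanaSlotRepository(
    new Connection("https://api.mainnet-beta.solana.com")
  );
  const latest = await repo.getLatestSlot("finalized");
  const block = await repo.getBlock(latest); // Fallible<solana.Block, solana.Failure>
  return block.getValue(); // successful results are read with getValue(), as in the tests below
};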

View File

@ -12,3 +12,4 @@ export * from "./SnsEventRepository";
export * from "./EvmJsonRPCBlockRepository";
export * from "./PromStatRepository";
export * from "./StaticJobRepository";
export * from "./Web3SolanaSlotRepository";

View File

@ -1,37 +0,0 @@
import { ChainId, Network } from "@certusone/wormhole-sdk";
import AbstractHandler from "./handlers/AbstractHandler";
export default abstract class AbstractWatcher {
//store class fields from constructor
public watcherName: string;
public environment: Network;
public events: AbstractHandler<any>[];
public chain: ChainId;
public rpc: string;
public logger: any;
//TODO add persistence module(s) as class fields
//or, alternatively, pull necessary config from the persistence module here
//TODO resumeBlock is needed for the query processor
constructor(
watcherName: string,
environment: Network,
events: AbstractHandler<any>[],
chain: ChainId,
rpc: string,
logger: any
) {
this.watcherName = watcherName;
this.environment = environment;
this.events = events;
this.chain = chain;
this.rpc = rpc;
this.logger = logger;
}
abstract startWebsocketProcessor(): Promise<void>;
abstract startQueryProcessor(): Promise<void>;
abstract startGapProcessor(): Promise<void>;
}

View File

@ -1,26 +0,0 @@
import { ChainId, Network } from "@certusone/wormhole-sdk";
import AbstractWatcher from "./AbstractWatcher";
import AbstractHandler from "./handlers/AbstractHandler";
export default class EvmWatcher extends AbstractWatcher {
constructor(
watcherName: string,
environment: Network,
events: AbstractHandler<any>[],
chain: ChainId,
rpc: string,
logger: any
) {
super(watcherName, environment, events, chain, rpc, logger);
}
async startWebsocketProcessor(): Promise<void> {
throw new Error("Method not implemented.");
}
async startQueryProcessor(): Promise<void> {
throw new Error("Method not implemented.");
}
async startGapProcessor(): Promise<void> {
throw new Error("Method not implemented.");
}
}

View File

@ -1,170 +0,0 @@
import { ChainId, ChainName, Network, toChainName } from "@certusone/wormhole-sdk";
import AbstractWatcher from "./AbstractWatcher";
import winston from "winston";
import EvmWatcher from "./EvmWatcher";
import AbstractHandler from "./handlers/AbstractHandler";
const MAINNET_RPCS: { [key in ChainName]?: string } = {
ethereum: process.env.ETH_RPC || "https://rpc.ankr.com/eth",
bsc: process.env.BSC_RPC || "https://bsc-dataseed2.defibit.io",
polygon: "https://rpc.ankr.com/polygon",
avalanche: "https://rpc.ankr.com/avalanche",
oasis: "https://emerald.oasis.dev",
algorand: "https://mainnet-api.algonode.cloud",
fantom: "https://rpc.ankr.com/fantom",
karura: "https://eth-rpc-karura.aca-api.network",
acala: "https://eth-rpc-acala.aca-api.network",
klaytn: "https://klaytn-mainnet-rpc.allthatnode.com:8551",
celo: "https://forno.celo.org",
moonbeam: "https://rpc.ankr.com/moonbeam",
arbitrum: "https://arb1.arbitrum.io/rpc",
optimism: "https://rpc.ankr.com/optimism",
aptos: "https://fullnode.mainnet.aptoslabs.com/",
near: "https://rpc.mainnet.near.org",
xpla: "https://dimension-lcd.xpla.dev",
terra2: "https://phoenix-lcd.terra.dev",
terra: "https://terra-classic-fcd.publicnode.com",
injective: "https://api.injective.network",
solana: process.env.SOLANA_RPC ?? "https://api.mainnet-beta.solana.com",
sui: "https://rpc.mainnet.sui.io",
};
const TESTNET_RPCS: { [key in ChainName]?: string } = {
bsc: "https://data-seed-prebsc-2-s3.binance.org:8545",
polygon: "https://matic-mumbai.chainstacklabs.com",
avalanche: "https://api.avax-test.network/ext/bc/C/rpc",
celo: "https://alfajores-forno.celo-testnet.org",
moonbeam: "https://rpc.api.moonbase.moonbeam.network",
};
const DEVNET_RPCS: { [key in ChainName]?: string } = {
ethereum: "http://localhost:8545",
bsc: "http://localhost:8546",
};
let hasLoadedDotEnv = false;
function loadDotEnv() {
if (readEnvironmentVariable("USE_ENV_FILE") === "true" && !hasLoadedDotEnv) {
//use the dotenv library to load in the .env file
require("dotenv").config();
hasLoadedDotEnv = true;
}
}
const readEnvironmentVariable = (name: string): string | null => {
const value = process.env[name];
if (!value) {
return null;
}
return value;
};
export type HandlerConfig = {
name: string;
config: any;
};
export type ConfigFile = {
network: Network;
supportedChains: ChainId[];
rpcs: { chain: ChainId; rpc: string }[];
handlers: HandlerConfig[];
};
export type Environment = {
network: Network;
configurationPath: any;
configuration: ConfigFile;
supportedChains: ChainId[];
rpcs: Map<ChainId, string>;
logger: winston.Logger;
};
let environment: Environment | null = null;
export function getEnvironment(): Environment {
if (environment) {
return environment;
} else {
throw new Error("Environment not set");
}
}
export async function initializeEnvironment(configurationPath: string) {
loadDotEnv();
const configuration = require(configurationPath);
const json: ConfigFile = JSON.parse(JSON.stringify(configuration));
const network = json.network;
if (network !== "MAINNET" && network !== "TESTNET" && network !== "DEVNET") {
throw new Error("Invalid network provided in the configuration file");
}
const supportedChains = json.supportedChains;
if (!supportedChains || supportedChains.length === 0) {
throw new Error("No supported chains provided in the configuration file");
}
const configRpcs = json.rpcs;
const rpcs = new Map<ChainId, string>();
for (const chain of supportedChains) {
configRpcs.forEach((item: any) => {
//double equals for string/int equality
if (item.chain == chain) {
if (!item.rpc) {
throw new Error(`No RPC provided for chain ${chain}`);
}
rpcs.set(chain, item.rpc);
}
});
}
environment = {
network,
configurationPath,
configuration,
supportedChains,
rpcs,
logger: winston.child({}),
};
}
//TODO this
export function createHandlers(env: Environment): AbstractHandler<any>[] {
const handlerArray: AbstractHandler<any>[] = [];
for (const handler of env.configuration.handlers) {
const handlerInstance = new (require(`./handlers/${handler.name}`).default)(
env,
handler.config
);
handlerArray.push(handlerInstance);
}
return handlerArray;
}
//TODO this process probably needs persistence
export function createWatchers(
env: Environment,
handlers: AbstractHandler<any>[]
): AbstractWatcher[] {
const watchers: AbstractWatcher[] = [];
for (const chain of env.supportedChains) {
const rpc = env.rpcs.get(chain);
if (!rpc) {
throw new Error(`No RPC provided for chain ${chain}`);
}
const watcher = new EvmWatcher(
toChainName(chain) + " Watcher",
env.network,
handlers,
chain,
rpc,
env.logger
);
watchers.push(watcher);
}
return watchers;
}

View File

@ -1,95 +0,0 @@
import { ChainId, Network } from "@certusone/wormhole-sdk";
import { v4 as uuidv4 } from "uuid";
import { Environment } from "../environment";
const { createHash } = require("crypto");
export type SyntheticEvent<T> = {
eventName: string;
eventVersion: number;
eventChain: ChainId;
observationTimestamp: number;
uuid: string; //UUID for the event, good for deduping
dataHash: string; //sha256 hash of the event data, good for deduping
data: T;
};
export default abstract class AbstractHandler<T> {
public name: string;
public environment: Environment;
public config: any;
constructor(name: string, environment: Environment, config: any) {
this.name = name;
this.environment = environment;
this.config = config;
}
//These top level functions must always be implemented
public abstract shouldSupportChain(network: Network, chainId: ChainId): boolean;
//These functions must be implemented if an EVM chain is supported.
//Event to be listened for in ABI format. Example:
//"event Delivery(address indexed recipientContract, uint16 indexed sourceChain, uint64 indexed sequence, bytes32 deliveryVaaHash, uint8 status, uint256 gasUsed, uint8 refundStatus, bytes additionalStatusInfo, bytes overridesInfo)",
public abstract getEventAbiEvm(): string[] | null;
//Event to be listened for in signature format. Example:
//"Delivery(address,uint16,uint64,bytes32,uint8,uint256,uint8,bytes,bytes)"
public abstract getEventSignatureEvm(): string | null;
//This function will be called when a subscribed event is received from the ethers provider.
//TODO pretty sure the ...args is always an ethers.Event object
public abstract handleEventEvm(chainId: ChainId, ...args: any): Promise<SyntheticEvent<T>>;
public abstract getContractAddressEvm(network: Network, chainId: ChainId): string;
//*** Non-abstract functions
//Wrapper function to hand to the EVM RPC provider.
//The wrapper is necessary; otherwise we can't figure out which chain ID the event came from.
public getEventListener(handler: AbstractHandler<T>, chainId: ChainId) {
//@ts-ignore
return (...args) => {
// @ts-ignore
return handler
.handleEventEvm(chainId, ...args)
.then((records) => {
if (records) {
//TODO persist records. Unsure how exactly this happens atm.
//handler.persistRecord(record);
}
})
.catch((e) => {
console.error("Unexpected error processing the following event: ", chainId, ...args);
console.error(e);
});
};
}
public getName(): string {
return this.name;
}
public generateUuid(): string {
return uuidv4();
}
public getEnvironment(): Environment {
return this.environment;
}
public getConfig(): any {
return this.config;
}
protected wrapEvent(chainId: ChainId, version: number, data: T): SyntheticEvent<T> {
return {
eventName: this.name,
eventVersion: version,
eventChain: chainId,
observationTimestamp: Date.now(),
uuid: this.generateUuid(),
dataHash: createHash("sha256").update(JSON.stringify(data)).digest("hex"),
data: data,
};
}
}

View File

@ -1,100 +0,0 @@
import { ChainId, Network } from "@certusone/wormhole-sdk";
import AbstractHandler, { SyntheticEvent } from "./AbstractHandler";
import { Environment } from "../environment";
import { ethers } from "ethers";
const CURRENT_VERSION = 1;
type LogMessagePublishedConfig = {
chains: {
chainId: ChainId;
coreContract: string;
}[];
};
//VAA structure is the same on all chains.
//therefore, as long as the content of the VAA is readable on-chain, we should be able to create this object for all ecosystems
type LogMessagePublished = {
timestamp: number;
nonce: number;
emitterChain: ChainId;
emitterAddress: string;
sequence: number;
consistencyLevel: number;
payload: string;
hash: string;
};
export default class LogMessagePublishedHandler extends AbstractHandler<LogMessagePublished> {
constructor(env: Environment, config: any) {
super("LogMessagePublished", env, config);
}
public shouldSupportChain(network: Network, chainId: ChainId): boolean {
const found = this.config.chains.find((c: any) => c.chainId === chainId);
return found !== undefined;
}
public getEventAbiEvm(): string[] | null {
return [
"event LogMessagePublished(address indexed sender, uint64 sequence, uint32 nonce, bytes payload, uint8 consistencyLevel);",
];
}
public getEventSignatureEvm(): string | null {
return "LogMessagePublished(address,uint64,uint32,bytes,uint8)";
}
public async handleEventEvm(
chainId: ChainId,
event: ethers.Event
): Promise<SyntheticEvent<LogMessagePublished>> {
const abi = this.getEventAbiEvm() as string[];
const iface = new ethers.utils.Interface(abi);
const parsedLog = iface.parseLog(event);
const timestamp = (await event.getBlock()).timestamp; //TODO see if there's a way we can do this without pulling the block header
const nonce = parsedLog.args[2].toNumber();
const emitterChain = chainId;
const emitterAddress = parsedLog.args[0].toString("hex"); //TODO unsure if this is correct
const sequence = parsedLog.args[1].toNumber();
const consistencyLevel = parsedLog.args[4].toNumber();
const payload = parsedLog.args[3].toString("hex"); //TODO unsure if this is correct
//Encoding from Wormhole ts-sdk
// timestamp: body.readUInt32BE(0),
// nonce: body.readUInt32BE(4),
// emitterChain: body.readUInt16BE(8),
// emitterAddress: body.subarray(10, 42),
// sequence: body.readBigUInt64BE(42),
// consistencyLevel: body[50],
// payload: body.subarray(51),
const body = ethers.utils.defaultAbiCoder.encode(
["uint32", "uint32", "uint16", "bytes32", "uint64", "uint8", "bytes"],
[timestamp, nonce, chainId, emitterAddress, sequence, consistencyLevel, payload]
);
const hash = this.keccak256(body).toString("hex");
const parsedEvent = {
timestamp,
nonce,
emitterChain,
emitterAddress,
sequence,
consistencyLevel,
payload,
hash,
};
return Promise.resolve(this.wrapEvent(chainId, CURRENT_VERSION, parsedEvent));
}
public getContractAddressEvm(network: Network, chainId: ChainId): string {
const found = this.config.chains.find((c: any) => c.chainId === chainId);
if (found === undefined) {
throw new Error("Chain not supported");
}
return found.coreContract;
}
//TODO move to utils
private keccak256(data: ethers.BytesLike): Buffer {
return Buffer.from(ethers.utils.arrayify(ethers.utils.keccak256(data)));
}
}

View File

@ -1,105 +0,0 @@
//TODO refactor this to use the new event handler system
export {};
// import { TypedEvent } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts/common";
// import { ethers } from "ethers";
// import { getEnvironment } from "../environment";
// import { CHAIN_ID_TO_NAME, ChainId, Network } from "@certusone/wormhole-sdk";
// import AbstractHandler from "./AbstractHandler";
// //TODO consider additional fields:
// // - timestamp
// // - block number
// // - call data
// // - transaction cost
// // - full transaction receipt
// export type WormholeRelayerDeliveryEventRecord = {
// environment: string;
// chainId: ChainId;
// txHash: string;
// recipientContract: string;
// sourceChain: number;
// sequence: string;
// deliveryVaaHash: string;
// status: number;
// gasUsed: string;
// refundStatus: number;
// additionalStatusInfo: string;
// overridesInfo: string;
// };
// //TODO implement this such that it pushes the event to a database
// async function persistRecord(record: WormholeRelayerDeliveryEventRecord) {
// console.log(JSON.stringify(record));
// }
// function getEventAbi(): string[] {
// return [
// "event Delivery(address indexed recipientContract, uint16 indexed sourceChain, uint64 indexed sequence, bytes32 deliveryVaaHash, uint8 status, uint256 gasUsed, uint8 refundStatus, bytes additionalStatusInfo, bytes overridesInfo)",
// ];
// }
// function getEventSignatureEvm(): string {
// return "Delivery(address,uint16,uint64,bytes32,uint8,uint256,uint8,bytes,bytes)";
// }
// async function handleEventEvm(
// chainId: ChainId,
// eventObj: ethers.Event
// ): Promise<SyntheticEvent<WormholeRelayerDeliveryEventRecord>> {
// console.log(
// `Received Delivery event for Wormhole Relayer Contract, txHash: ${eventObj.transactionHash}`
// );
// const environment = await getEnvironment();
// const txHash = eventObj.transactionHash;
// var abi = getEventAbi();
// var iface = new ethers.utils.Interface(abi);
// var parsedLog = iface.parseLog(eventObj);
// const recipientContract = parsedLog.args[0];
// const sourceChain = parsedLog.args[1];
// const sequence = parsedLog.args[2].toString();
// const deliveryVaaHash = parsedLog.args[3];
// const status = parsedLog.args[4];
// const gasUsed = parsedLog.args[5].toString();
// const refundStatus = parsedLog.args[6];
// const additionalStatusInfo = parsedLog.args[7];
// const overridesInfo = parsedLog.args[8];
// return AbstractHandler.prototype.wrapEvent(chainId, 1, {
// environment: environment.network,
// chainId,
// txHash,
// recipientContract,
// sourceChain,
// sequence,
// deliveryVaaHash,
// status,
// gasUsed,
// refundStatus,
// additionalStatusInfo,
// overridesInfo,
// });
// }
// function getContractAddressEvm(network: Network, chainId: ChainId): string {
// return ""; //TODO //getWormholeRelayerAddressWrapped(CHAIN_ID_TO_NAME[chainId], network);
// }
// function shouldSupportChain(network: Network, chainId: ChainId): boolean {
// return true; //TODO currently the supported chains are determined by the relayer contract, so this is trivially true.
// //It might not be true in the future.
// }
// const WormholeRelayerEventHandler: AbstractHandler =
// {
// name: "Wormhole Relayer Delivery Event Handler",
// getEventSignatureEvm,
// getEventAbiEvm: getEventAbi,
// handleEventEvm,
// persistRecord,
// getContractAddressEvm,
// shouldSupportChain,
// getEventListener: AbstractHandler.prototype.getEventListener, //TODO not any of this
// };
// export default WormholeRelayerEventHandler;

View File

@ -1,80 +0,0 @@
//TODO refactor this to use the new event handler system
export {};
// import { TypedEvent } from "@certusone/wormhole-sdk/lib/cjs/ethers-contracts/common";
// import { ethers } from "ethers";
// import { getEnvironment } from "../environment";
// import { CHAIN_ID_TO_NAME, ChainId, Network } from "@certusone/wormhole-sdk";
// import AbstractHandler from "./AbstractHandler";
// //TODO consider additional fields:
// // - timestamp
// // - entire transaction receipt
// // - deduplication info
// export type WormholeRelayerSendEventRecord = {
// environment: string;
// chainId: ChainId;
// txHash: string;
// sequence: string;
// deliveryQuote: string;
// paymentForExtraReceiverValue: string;
// };
// //TODO implement this such that it pushes the event to a database
// async function persistRecord(record: WormholeRelayerSendEventRecord) {
// console.log(JSON.stringify(record));
// }
// function getEventAbiEvm(): string[] {
// return [
// "event SendEvent(uint64 indexed sequence, uint256 deliveryQuote, uint256 paymentForExtraReceiverValue)",
// ];
// }
// async function handleEventEvm(
// chainId: ChainId,
// eventObj: ethers.Event
// ): Promise<WormholeRelayerSendEventRecord | null> {
// console.log(
// `Received Send event for Wormhole Relayer Contract, txHash: ${eventObj.transactionHash}`
// );
// const abi = getEventAbiEvm();
// var iface = new ethers.utils.Interface(abi);
// var parsedLog = iface.parseLog(eventObj);
// return {
// //TODO env type broke
// environment: await getEnvironment().network,
// chainId: chainId,
// txHash: eventObj.transactionHash,
// sequence: parsedLog.args[0].toString(),
// deliveryQuote: parsedLog.args[1].toString(),
// paymentForExtraReceiverValue: parsedLog.args[2].toString(),
// };
// }
// function getContractAddressEvm(network: Network, chainId: ChainId): string {
// return ""; //TODO //getWormholeRelayerAddressWrapped(CHAIN_ID_TO_NAME[chainId], network);
// }
// function shouldSupportChain(network: Network, chainId: ChainId): boolean {
// return true; //TODO currently the supported chains are determined by the relayer contract, so this is trivially true.
// //It might not be true in the future.
// }
// function getEventSignatureEvm(): string {
// return "SendEvent(uint64,uint256,uint256)";
// }
// const WormholeRelayerSendEventHandler: AbstractHandler<WormholeRelayerSendEventRecord> =
// {
// name: "Wormhole Relayer Send Event Handler",
// getEventSignatureEvm,
// getEventAbiEvm,
// handleEventEvm,
// persistRecord,
// getContractAddressEvm,
// shouldSupportChain,
// getEventListener: AbstractHandler.prototype.getEventListener, //TODO not any of this
// };
// export default WormholeRelayerSendEventHandler;

View File

@ -1,112 +0,0 @@
import {
createHandlers,
createWatchers,
getEnvironment,
initializeEnvironment,
} from "./environment";
import AbstractWatcher from "./AbstractWatcher";
async function run() {
initializeEnvironment(process.env.WATCHER_CONFIG_PATH || "../config/local.json");
const ENVIRONMENT = await getEnvironment();
//TODO instantiate the persistence module(s)
//TODO either hand the persistence module to the watcher, or pull necessary config from the persistence module here
//TODO the event watchers currently instantiate themselves, which isn't ideal. Refactor for next version
const handlers = createHandlers(ENVIRONMENT);
const watchers = createWatchers(ENVIRONMENT, handlers);
await runAllProcesses(watchers);
}
async function runAllProcesses(allWatchers: AbstractWatcher[]) {
//These are all the raw processes that will run, wrapped to contain their process ID and a top level error handler
let allProcesses = new Map<number, () => Promise<number>>();
let processIdCounter = 0;
//These are all the processes, keyed by their process ID, that we know are not currently running.
const unstartedProcesses = new Set<number>();
//Go through all the watchers, wrap their processes, and add them to the unstarted processes set
for (const watcher of allWatchers) {
allProcesses.set(
processIdCounter,
wrapProcessWithTracker(processIdCounter, watcher.startWebsocketProcessor)
);
unstartedProcesses.add(processIdCounter);
processIdCounter++;
allProcesses.set(
processIdCounter,
wrapProcessWithTracker(processIdCounter, watcher.startQueryProcessor)
);
unstartedProcesses.add(processIdCounter);
processIdCounter++;
allProcesses.set(
processIdCounter,
wrapProcessWithTracker(processIdCounter, watcher.startGapProcessor)
);
unstartedProcesses.add(processIdCounter);
processIdCounter++;
}
//If a process ends, reenqueue it into the unstarted processes set
const reenqueueCallback = (processId: number) => {
unstartedProcesses.add(processId);
};
//Every 5 seconds, try to start any unstarted processes
while (true) {
for (const processId of unstartedProcesses) {
const process = allProcesses.get(processId);
if (process) {
//TODO the process ID is a good key but is difficult to track to meaningful information
console.log(`Starting process ${processId}`);
unstartedProcesses.delete(processId);
process()
.then((processId) => {
reenqueueCallback(processId);
})
.catch((e) => {
reenqueueCallback(processId);
});
} else {
//should never happen
console.error(`Process ${processId} not found`);
}
}
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
function wrapProcessWithTracker(
processId: number,
process: () => Promise<void>
): () => Promise<number> {
return () => {
return process()
.then(() => {
console.log(`Process ${processId} exited via promise resolution`);
return processId;
})
.catch((e) => {
console.error(`Process ${processId} exited via promise rejection`);
console.error(e);
return processId;
});
};
}
//run should never stop, unless an unexpected fatal error occurs
run()
.then(() => {
console.log("run() finished");
})
.catch((e) => {
console.error(e);
console.error("Fatal error caused process to exit");
});

View File

@ -1,82 +0,0 @@
import { ethers } from "ethers";
const WEBSOCKET_PING_INTERVAL = 10000;
const WEBSOCKET_PONG_TIMEOUT = 5000;
const WEBSOCKET_RECONNECT_DELAY = 100;
const WebSocketProviderClass = (): new () => ethers.providers.WebSocketProvider =>
class {} as never;
export class WebSocketProvider extends WebSocketProviderClass() {
private provider?: ethers.providers.WebSocketProvider;
private events: ethers.providers.WebSocketProvider["_events"] = [];
private requests: ethers.providers.WebSocketProvider["_requests"] = {};
private handler = {
get(target: WebSocketProvider, prop: string, receiver: unknown) {
const value = target.provider && Reflect.get(target.provider, prop, receiver);
return value instanceof Function ? value.bind(target.provider) : value;
},
};
constructor(private providerUrl: string) {
super();
this.create();
return new Proxy(this, this.handler);
}
private create() {
if (this.provider) {
this.events = [...this.events, ...this.provider._events];
this.requests = { ...this.requests, ...this.provider._requests };
}
const provider = new ethers.providers.WebSocketProvider(
this.providerUrl,
this.provider?.network?.chainId
);
let pingInterval: NodeJS.Timer | undefined;
let pongTimeout: NodeJS.Timeout | undefined;
provider._websocket.on("open", () => {
pingInterval = setInterval(() => {
provider._websocket.ping();
pongTimeout = setTimeout(() => {
provider._websocket.terminate();
}, WEBSOCKET_PONG_TIMEOUT);
}, WEBSOCKET_PING_INTERVAL);
let event;
while ((event = this.events.pop())) {
provider._events.push(event);
provider._startEvent(event);
}
for (const key in this.requests) {
provider._requests[key] = this.requests[key];
provider._websocket.send(this.requests[key].payload);
delete this.requests[key];
}
});
provider._websocket.on("pong", () => {
if (pongTimeout) clearTimeout(pongTimeout);
});
provider._websocket.on("close", (code: number) => {
provider._wsReady = false;
if (pingInterval) clearInterval(pingInterval);
if (pongTimeout) clearTimeout(pongTimeout);
if (code !== 1000) {
setTimeout(() => this.create(), WEBSOCKET_RECONNECT_DELAY);
}
});
this.provider = provider;
}
}

View File

@ -14,7 +14,7 @@ async function run(): Promise<void> {
repos = new RepositoriesBuilder(configuration);
const startJobs = new StartJobs(repos.getJobsRepository());
await startServer(repos, startJobs);
await startServer(repos);
await startJobs.run();
// Just keep this running until killed
@ -29,7 +29,7 @@ async function run(): Promise<void> {
process.on("SIGTERM", handleShutdown);
}
const startServer = async (repos: RepositoriesBuilder, startJobs: StartJobs) => {
const startServer = async (repos: RepositoriesBuilder) => {
server = new WebServer(configuration.port, new HealthController(repos.getStatsRepository()));
};

View File

@ -0,0 +1,93 @@
import { expect, describe, it } from "@jest/globals";
import { solana } from "../../../src/domain/entities";
import { Web3SolanaSlotRepository } from "../../../src/infrastructure/repositories";
describe("Web3SolanaSlotRepository", () => {
describe("getLatestSlot", () => {
it("should return the latest slot number", async () => {
const connectionMock = {
getSlot: () => Promise.resolve(100),
};
const repository = new Web3SolanaSlotRepository(connectionMock as any);
const latestSlot = await repository.getLatestSlot("finalized");
expect(latestSlot).toBe(100);
});
});
describe("getBlock", () => {
it("should return a block for a given slot number", async () => {
const expected = {
blockTime: 100,
transactions: [],
};
const connectionMock = {
getBlock: (slot: number) => Promise.resolve(expected),
};
const repository = new Web3SolanaSlotRepository(connectionMock as any);
const block = (await repository.getBlock(100)).getValue();
expect(block.blockTime).toBe(expected.blockTime);
expect(block.transactions).toHaveLength(expected.transactions.length);
});
});
describe("getSignaturesForAddress", () => {
it("should return confirmed signature info for a given address", async () => {
const expected = [
{
signature: "signature1",
slot: 100,
},
{
signature: "signature2",
slot: 200,
},
];
const connectionMock = {
getSignaturesForAddress: () => Promise.resolve(expected),
};
const repository = new Web3SolanaSlotRepository(connectionMock as any);
const signatures = await repository.getSignaturesForAddress(
"BTcueXFisZiqE49Ne2xTZjHV9bT5paVZhpKc1k4L3n1c",
"before",
"after",
10
);
expect(signatures).toBe(expected);
});
});
describe("getTransactions", () => {
it("should return transactions for a given array of confirmed signature info", async () => {
const expected = [
{
signature: "signature1",
slot: 100,
transaction: {
message: {
accountKeys: [],
instructions: [],
},
},
},
];
const connectionMock = {
getTransactions: (sigs: solana.ConfirmedSignatureInfo[]) => Promise.resolve(expected),
};
const repository = new Web3SolanaSlotRepository(connectionMock as any);
const transactions = await repository.getTransactions([
{
signature: "signature1",
},
]);
expect(transactions).toStrictEqual(expected);
});
});
});