adding solana worker

This commit is contained in:
matias martinez 2023-11-22 08:50:13 -03:00
parent e44ef12df0
commit a8ce767d98
6 changed files with 169 additions and 11 deletions

View File

@ -1,5 +1,12 @@
{
"platforms": {
"solana": {
"name": "solana",
"network": "devnet",
"chainId": 1,
"rpcs": ["https://api.mainnet-beta.solana.com"],
"timeout": 10000
},
"ethereum": {
"name": "ethereum",
"network": "mainnet",

View File

@ -8,7 +8,7 @@
"test": "jest",
"test:coverage": "jest --collectCoverage=true",
"build": "tsc",
"dev": "USE_ENV_FILE=true ts-node src/start.ts",
"dev": "ts-node src/start.ts",
"prettier": "prettier --write ."
},
"author": "chase-45",

View File

@ -4,15 +4,15 @@ import { solana } from "../../entities";
/**
* Handling means mapping and forward to a given target if present.
*/
export class HandleSolanaTransaction<T> {
export class HandleSolanaTransactions<T> {
cfg: HandleSolanaTxConfig;
mapper: (txs: solana.Transaction, args: { programId: string }) => T;
mapper: (txs: solana.Transaction, args: { programId: string }) => Promise<T>;
target?: (parsed: T[]) => Promise<void>;
logger: winston.Logger = winston.child({ module: "HandleSolanaTransaction" });
constructor(
cfg: HandleSolanaTxConfig,
mapper: (txs: solana.Transaction) => T,
mapper: (txs: solana.Transaction) => Promise<T>,
target?: (parsed: T[]) => Promise<void>
) {
this.cfg = cfg;
@ -30,9 +30,12 @@ export class HandleSolanaTransaction<T> {
return !hasError;
});
const mappedItems = [];
let mappedItems: T[] = [];
for (const tx of filteredItems) {
mappedItems.push(await this.mapper(tx, { programId: this.cfg.programId }));
const result = await this.mapper(tx, { programId: this.cfg.programId });
if (result) {
mappedItems = mappedItems.concat(result);
}
}
if (this.target) {

View File

@ -81,7 +81,7 @@ export class PollSolanaTransactions extends RunPollingJob {
this.cfg.signaturesLimit
);
this.logger.debug(
`Got ${sigs.length} signatures for address ${this.cfg.programId} between ${beforeSignature} and ${afterSignature}`
`Got ${sigs.length} signatures for address ${this.cfg.programId} between ${fromBlock.blockTime} and ${toBlock.blockTime}`
);
const txs = await this.slotRepository.getTransactions(sigs);
@ -101,7 +101,7 @@ export class PollSolanaTransactions extends RunPollingJob {
}
private getSlotRange(latestSlot: number): { fromSlot: number; toSlot: number } {
let fromSlot = this.slotCursor ? this.slotCursor + 1 : this.cfg.fromSlot ?? latestSlot;
let fromSlot = this.slotCursor ? this.slotCursor + 1 : (this.cfg.fromSlot ?? latestSlot);
// cfg.fromSlot is present and is greater than current slot height, then we allow to skip slots.
if (this.slotCursor && this.cfg.fromSlot && this.cfg.fromSlot > this.slotCursor) {
fromSlot = this.cfg.fromSlot;

View File

@ -16,7 +16,7 @@ import {
StatRepository,
} from "../../domain/repositories";
import { FileMetadataRepo, SnsEventRepository } from "./index";
import { HandleSolanaTransaction } from "../../domain/actions/solana/HandleSolanaTransactions";
import { HandleSolanaTransactions } from "../../domain/actions/solana/HandleSolanaTransactions";
import { solanaLogMessagePublishedMapper, evmLogMessagePublishedMapper } from "../mappers";
import log from "../log";
@ -129,12 +129,12 @@ export class StaticJobRepository implements JobRepository {
return instance.handle.bind(instance);
};
const handleSolanaTx = async (config: any, target: string, mapper: any) => {
const instance = new HandleSolanaTransaction(config, mapper, await this.getTarget(target));
const instance = new HandleSolanaTransactions(config, mapper, await this.getTarget(target));
return instance.handle.bind(instance);
};
this.handlers.set("HandleEvmLogs", handleEvmLogs);
this.handlers.set("HandleSolanaTransaction", handleSolanaTx);
this.handlers.set("HandleSolanaTransactions", handleSolanaTx);
}
private async getTarget(target: string): Promise<(items: any[]) => Promise<void>> {

View File

@ -0,0 +1,148 @@
---
apiVersion: v1
kind: Service
metadata:
name: {{ .NAME }}-solana
namespace: {{ .NAMESPACE }}
labels:
app: {{ .NAME }}-solana
spec:
selector:
app: {{ .NAME }}-solana
ports:
- port: {{ .PORT }}
targetPort: {{ .PORT }}
name: {{ .NAME }}-solana
protocol: TCP
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: blockchain-watcher-solana-pvc
namespace: {{ .NAMESPACE }}
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Mi
storageClassName: gp2
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .NAME }}-solana-jobs
namespace: {{ .NAMESPACE }}
data:
testnet-jobs.json: |-
[
{
"id": "poll-log-message-published-solana",
"chain": "solana",
"source": {
"action": "PollSolanaTransactions",
"config": {
"slotBatchSize": 10000,
"commitment": "processed",
"interval": 10000,
"programId": "3u8hJUVTA4jH1wYAyUur7FFZVQ8H635K3tSHHF4ssjQ5",
"chain": "solana"
}
},
"handlers": [
{
"action": "HandleSolanaTransactions",
"target": "sns",
"mapper": "solanaLogMessagePublishedMapper",
"config": {
"programId": "3u8hJUVTA4jH1wYAyUur7FFZVQ8H635K3tSHHF4ssjQ5"
}
}
]
}
]
mainnet-jobs.json: |-
[
{
"id": "poll-log-message-published-solana",
"chain": "solana",
"source": {
"action": "PollSolanaTransactions",
"config": {
"slotBatchSize": 10000,
"commitment": "processed",
"interval": 10000,
"signaturesLimit": 1000,
"programId": "worm2ZoG2kUd4vFXhvjh93UUH596ayRfgQ2MgjNMTth",
"chain": "solana"
}
},
"handlers": [
{
"action": "HandleSolanaTransactions",
"target": "sns",
"mapper": "solanaLogMessagePublishedMapper",
"config": {
"programId": "worm2ZoG2kUd4vFXhvjh93UUH596ayRfgQ2MgjNMTth"
}
}
]
}
]
---
apiVersion: v1
kind: Pod
metadata:
name: {{ .NAME }}-solana
namespace: {{ .NAMESPACE }}
labels:
app: {{ .NAME }}-solana
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "{{ .PORT }}"
spec:
restartPolicy: Always
terminationGracePeriodSeconds: 30
serviceAccountName: event-watcher
containers:
- name: {{ .NAME }}
image: {{ .IMAGE_NAME }}
env:
- name: NODE_ENV
value: {{ .NODE_ENV }}
- name: PORT
value: "{{ .PORT }}"
- name: LOG_LEVEL
value: {{ .LOG_LEVEL }}
- name: BLOCKCHAIN_ENV
value: {{ .BLOCKCHAIN_ENV }}
- name: DRY_RUN_ENABLED
value: "{{ .DRY_RUN_ENABLED }}"
- name: SNS_TOPIC_ARN
value: {{ .SNS_TOPIC_ARN }}
- name: SNS_REGION
value: {{ .SNS_REGION }}
- name: JOBS_DIR
value: /home/node/app/jobs
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}
cpu: {{ .RESOURCES_LIMITS_CPU }}
requests:
memory: {{ .RESOURCES_REQUESTS_MEMORY }}
cpu: {{ .RESOURCES_REQUESTS_CPU }}
volumeMounts:
- name: metadata-volume
mountPath: /home/node/app/metadata-repo
- name: jobs-volume
mountPath: /home/node/app/jobs
volumes:
- name: metadata-volume
persistentVolumeClaim:
claimName: blockchain-watcher-solana-pvc
- name: jobs-volume
configMap:
name: {{ .NAME }}-solana-jobs
items:
- key: {{ .BLOCKCHAIN_ENV }}-jobs.json
path: jobs.json