Feature/handle governor fly event processor (#1436)

* changes in fly component to send governor status events

* split processor into vaaProcessor and governor processor

* add governor processor v1

* Add endpoint in tx-tracker to calculate txHash for a vaa id

* fly-event-processor integration with new tx-tracker endpoint and refactor

* Add governor vaas endpoint in api

* api, fix amount data type in governor vaas endpoint

* fly-event-processor normalize emitter and txHash

* fly-event-processor fix nodeGovernorVaa id

* fly-event-processor ensure arrays are not empty in insertMany/deleteMany operations

* add index in nodeGovernorVaas collection by vaaId

* add prometheus metrics

* add tx-tracker url for fly-event-processor deployment

* Add sns attributes into sns messages

Co-authored-by: walker-16 <agpazos85@gmail.com>

* fix governor vaa endpoint empty response

---------

Co-authored-by: Fernando Torres <fert1335@gmail.com>
walker-16 2024-05-31 15:38:21 -03:00 committed by GitHub
parent 8628d1f915
commit 7545acb77e
45 changed files with 1584 additions and 100 deletions

View File

@ -563,6 +563,29 @@ const docTemplate = `{
}
}
},
"/api/v1/governor/vaas": {
"get": {
"description": "Returns all vaas in Governor.",
"tags": [
"wormholescan"
],
"operationId": "governor-vaas",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/response.Response-array_governor_GovernorVaasResponse"
}
},
"400": {
"description": "Bad Request"
},
"500": {
"description": "Internal Server Error"
}
}
}
},
"/api/v1/health": {
"get": {
"description": "Health check",
@ -2431,6 +2454,35 @@ const docTemplate = `{
}
}
},
"governor.GovernorVaasResponse": {
"type": "object",
"properties": {
"amount": {
"type": "integer"
},
"chainId": {
"$ref": "#/definitions/vaa.ChainID"
},
"emitterAddress": {
"type": "string"
},
"releaseTime": {
"type": "string"
},
"sequence": {
"type": "string"
},
"status": {
"type": "string"
},
"txHash": {
"type": "string"
},
"vaaId": {
"type": "string"
}
}
},
"governor.MaxNotionalAvailableRecord": {
"type": "object",
"properties": {
@ -3156,6 +3208,20 @@ const docTemplate = `{
}
}
},
"response.Response-array_governor_GovernorVaasResponse": {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"$ref": "#/definitions/governor.GovernorVaasResponse"
}
},
"pagination": {
"$ref": "#/definitions/response.ResponsePagination"
}
}
},
"response.Response-array_governor_NotionalAvailable": {
"type": "object",
"properties": {

View File

@ -556,6 +556,29 @@
}
}
},
"/api/v1/governor/vaas": {
"get": {
"description": "Returns all vaas in Governor.",
"tags": [
"wormholescan"
],
"operationId": "governor-vaas",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/response.Response-array_governor_GovernorVaasResponse"
}
},
"400": {
"description": "Bad Request"
},
"500": {
"description": "Internal Server Error"
}
}
}
},
"/api/v1/health": {
"get": {
"description": "Health check",
@ -2424,6 +2447,35 @@
}
}
},
"governor.GovernorVaasResponse": {
"type": "object",
"properties": {
"amount": {
"type": "integer"
},
"chainId": {
"$ref": "#/definitions/vaa.ChainID"
},
"emitterAddress": {
"type": "string"
},
"releaseTime": {
"type": "string"
},
"sequence": {
"type": "string"
},
"status": {
"type": "string"
},
"txHash": {
"type": "string"
},
"vaaId": {
"type": "string"
}
}
},
"governor.MaxNotionalAvailableRecord": {
"type": "object",
"properties": {
@ -3149,6 +3201,20 @@
}
}
},
"response.Response-array_governor_GovernorVaasResponse": {
"type": "object",
"properties": {
"data": {
"type": "array",
"items": {
"$ref": "#/definitions/governor.GovernorVaasResponse"
}
},
"pagination": {
"$ref": "#/definitions/response.ResponsePagination"
}
}
},
"response.Response-array_governor_NotionalAvailable": {
"type": "object",
"properties": {

View File

@ -199,6 +199,25 @@ definitions:
notionalLimit:
type: integer
type: object
governor.GovernorVaasResponse:
properties:
amount:
type: integer
chainId:
$ref: '#/definitions/vaa.ChainID'
emitterAddress:
type: string
releaseTime:
type: string
sequence:
type: string
status:
type: string
txHash:
type: string
vaaId:
type: string
type: object
governor.MaxNotionalAvailableRecord:
properties:
availableNotional:
@ -672,6 +691,15 @@ definitions:
pagination:
$ref: '#/definitions/response.ResponsePagination'
type: object
response.Response-array_governor_GovernorVaasResponse:
properties:
data:
items:
$ref: '#/definitions/governor.GovernorVaasResponse'
type: array
pagination:
$ref: '#/definitions/response.ResponsePagination'
type: object
response.Response-array_governor_NotionalAvailable:
properties:
data:
@ -1546,6 +1574,21 @@ paths:
description: Internal Server Error
tags:
- wormholescan
/api/v1/governor/vaas:
get:
description: Returns all vaas in Governor.
operationId: governor-vaas
responses:
"200":
description: OK
schema:
$ref: '#/definitions/response.Response-array_governor_GovernorVaasResponse'
"400":
description: Bad Request
"500":
description: Internal Server Error
tags:
- wormholescan
/api/v1/health:
get:
description: Health check

View File

@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors"
mongoTypes "github.com/wormhole-foundation/wormhole-explorer/api/internal/mongo"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination"
"github.com/wormhole-foundation/wormhole-explorer/common/types"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -27,6 +28,7 @@ type Repository struct {
collections struct {
governorConfig *mongo.Collection
governorStatus *mongo.Collection
governorVaas *mongo.Collection
}
}
@ -37,9 +39,11 @@ func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository {
collections: struct {
governorConfig *mongo.Collection
governorStatus *mongo.Collection
governorVaas *mongo.Collection
}{
governorConfig: db.Collection("governorConfig"),
governorStatus: db.Collection("governorStatus"),
governorVaas: db.Collection("governorVaas"),
},
}
}
@ -1725,3 +1729,46 @@ func (r *Repository) IsVaaEnqueued(
return true, nil
}
type GovernorVaaDoc struct {
ID string `bson:"_id"`
ChainID vaa.ChainID `bson:"chainId"`
EmitterAddress string `bson:"emitterAddress"`
Sequence string `bson:"sequence"`
TxHash string `bson:"txHash"`
ReleaseTime time.Time `bson:"releaseTime"`
Amount mongoTypes.Uint64 `bson:"amount"`
Vaas []any `bson:"vaas"`
}
func (r *Repository) GetGovernorVaas(ctx context.Context) ([]GovernorVaaDoc, error) {
// left outer join on the `vaas` collection
pipeline := []bson.D{{{Key: "$lookup", Value: bson.D{
{Key: "from", Value: "vaas"},
{Key: "localField", Value: "_id"},
{Key: "foreignField", Value: "_id"},
{Key: "as", Value: "vaas"},
}}},
}
cur, err := r.collections.governorVaas.Aggregate(ctx, pipeline)
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed execute aggregate command to get governor enqueded vaas",
zap.Error(err), zap.String("requestID", requestID))
return nil, errors.WithStack(err)
}
// read results from cursor
var result []GovernorVaaDoc
err = cur.All(ctx, &result)
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed decoding cursor to []*VaaDoc",
zap.Error(err),
zap.String("requestID", requestID),
)
return nil, errors.WithStack(err)
}
return result, nil
}
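
For illustration only (not part of this diff), this is roughly what one aggregation result decodes to, with invented values. An empty Vaas slice means no matching document was found in the `vaas` collection, which the controller added later in this change reports as status "pending" (non-empty becomes "issued"):

doc := GovernorVaaDoc{
	ID:             "2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/1234", // hypothetical vaaId
	ChainID:        vaa.ChainIDEthereum,
	EmitterAddress: "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16",
	Sequence:       "1234",
	TxHash:         "a1b2c3d4e5f6", // hypothetical
	ReleaseTime:    time.Date(2024, 5, 31, 0, 0, 0, 0, time.UTC),
	Amount:         50_000,
	Vaas:           []any{}, // empty => no signed VAA yet => "pending"
}
_ = doc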

View File

@ -228,3 +228,13 @@ func (s *Service) IsVaaEnqueued(ctx context.Context, chainID vaa.ChainID, emitte
isEnqueued, err := s.repo.IsVaaEnqueued(ctx, chainID, emitter, seq)
return isEnqueued, err
}
// GetGovernorVaas gets the enqueued VAAs.
// Guardian api migration.
func (s *Service) GetGovernorVaas(ctx context.Context) ([]GovernorVaaDoc, error) {
result, err := s.repo.GetGovernorVaas(ctx)
if err != nil {
return nil, err
}
return result, nil
}

View File

@ -399,3 +399,38 @@ func (c *Controller) GetEnqueuedVaasByChainID(ctx *fiber.Ctx) error {
return ctx.JSON(enqueuedVaas)
}
// GetGovernorVaas godoc
// @Description Returns all vaas in Governor.
// @Tags wormholescan
// @ID governor-vaas
// @Success 200 {object} response.Response[[]governor.GovernorVaasResponse]
// @Failure 400
// @Failure 500
// @Router /api/v1/governor/vaas [get]
func (c *Controller) GetGovernorVaas(ctx *fiber.Ctx) error {
enqueuedVaas, err := c.srv.GetGovernorVaas(ctx.Context())
if err != nil {
return err
}
result := make([]GovernorVaasResponse, 0)
for _, v := range enqueuedVaas {
status := "pending"
if len(v.Vaas) > 0 {
status = "issued"
}
result = append(result, GovernorVaasResponse{
VaaID: v.ID,
ChainID: v.ChainID,
EmitterAddress: v.EmitterAddress,
Sequence: v.Sequence,
TxHash: v.TxHash,
ReleaseTime: v.ReleaseTime,
Amount: uint64(v.Amount),
Status: status,
})
}
return ctx.JSON(result)
}
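
For illustration only (not part of this diff), a minimal client sketch for the new endpoint. The base URL is a placeholder, and because the handler returns the slice directly via ctx.JSON, the sketch decodes a plain JSON array into a local struct that mirrors GovernorVaasResponse:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// governorVaa mirrors the JSON fields of GovernorVaasResponse.
type governorVaa struct {
	VaaID          string `json:"vaaId"`
	ChainID        uint16 `json:"chainId"`
	EmitterAddress string `json:"emitterAddress"`
	Sequence       string `json:"sequence"`
	TxHash         string `json:"txHash"`
	ReleaseTime    string `json:"releaseTime"`
	Amount         uint64 `json:"amount"`
	Status         string `json:"status"`
}

func main() {
	// Base URL is hypothetical; point it at a wormholescan API deployment.
	resp, err := http.Get("http://localhost:8000/api/v1/governor/vaas")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var vaas []governorVaa
	if err := json.NewDecoder(resp.Body).Decode(&vaas); err != nil {
		panic(err)
	}
	for _, v := range vaas {
		fmt.Printf("%s status=%s amount=%d\n", v.VaaID, v.Status, v.Amount)
	}
}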

View File

@ -0,0 +1,18 @@
package governor
import (
"time"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type GovernorVaasResponse struct {
VaaID string `json:"vaaId"`
ChainID vaa.ChainID `json:"chainId"`
EmitterAddress string `json:"emitterAddress"`
Sequence string `json:"sequence"`
TxHash string `json:"txHash"`
ReleaseTime time.Time `json:"releaseTime"`
Amount uint64 `json:"amount"`
Status string `json:"status"`
}

View File

@ -144,6 +144,7 @@ func RegisterRoutes(
enqueueVaas := governor.Group("/enqueued_vaas")
enqueueVaas.Get("/", governorCtrl.GetEnqueuedVaas)
enqueueVaas.Get("/:chain", governorCtrl.GetEnqueuedVaasByChainID)
governor.Get("/vaas", governorCtrl.GetGovernorVaas)
relays := api.Group("/relays")
relays.Get("/:chain/:emitter/:sequence", relaysCtrl.FindOne)

View File

@ -15,6 +15,7 @@ const DefaultTimeout = 30
var (
ErrCallEndpoint = errors.New("ERROR CALL ENPOINT")
ErrBadRequest = errors.New("BAD REQUEST")
ErrInternalError = errors.New("INTERNAL ERROR")
)
@ -84,3 +85,55 @@ func (c *TxTrackerAPIClient) Process(vaaID string) (*ProcessVaaResponse, error)
return nil, ErrInternalError
}
}
// CreateTxHashFunc represents a create tx hash function.
type CreateTxHashFunc func(vaaID, txHash string) (*TxHashResponse, error)
// TxHashResponse represents a create tx hash response.
type TxHashResponse struct {
NativeTxHash string `json:"nativeTxHash"`
}
// CreateTxHash creates the tx hash for a VAA by calling the tx-tracker endpoint.
func (c *TxTrackerAPIClient) CreateTxHash(vaaID, txHash string) (*TxHashResponse, error) {
endpoint := fmt.Sprintf("%s/vaa/tx-hash", c.BaseURL)
// create request body.
payload := struct {
VaaID string `json:"id"`
TxHash string `json:"txHash"`
}{
VaaID: vaaID,
TxHash: txHash,
}
body, err := json.Marshal(payload)
if err != nil {
c.Logger.Error("error marshalling payload", zap.Error(err), zap.String("vaaID", vaaID), zap.String("txHash", txHash))
return nil, err
}
response, err := c.Client.Post(endpoint, "application/json", bytes.NewBuffer(body))
if err != nil {
c.Logger.Error("error call create tx hash endpoint",
zap.Error(err),
zap.String("vaaID", vaaID),
zap.String("txHash", txHash))
return nil, ErrCallEndpoint
}
defer response.Body.Close()
switch response.StatusCode {
case http.StatusOK:
var txHashResponse TxHashResponse
if err := json.NewDecoder(response.Body).Decode(&txHashResponse); err != nil {
return nil, err
}
return &txHashResponse, nil
case http.StatusBadRequest:
return nil, ErrBadRequest
case http.StatusInternalServerError:
return nil, ErrInternalError
default:
return nil, ErrInternalError
}
}
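
A minimal usage sketch of the new client method (not part of this diff), assuming a *zap.Logger is available. The base URL matches the TX_TRACKER_URL used in the deployment env files below; the VAA id and tx hash are invented placeholders:

client, err := txTracker.NewTxTrackerAPIClient(30, "http://wormscan-tx-tracker.wormscan/api", logger)
if err != nil {
	logger.Fatal("failed to create tx-tracker client", zap.Error(err))
}
res, err := client.CreateTxHash("2/0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16/1234", "a1b2c3d4e5f6")
if err != nil {
	logger.Error("create tx hash failed", zap.Error(err))
	return
}
logger.Info("native tx hash", zap.String("nativeTxHash", res.NativeTxHash))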

View File

@ -1,9 +1,11 @@
package repository
const (
VaaIdTxHash = "vaaIdTxHash"
TransferPrices = "transferPrices"
Vaas = "vaas"
DuplicateVaas = "duplicateVaas"
GuardianSets = "guardianSets"
VaaIdTxHash = "vaaIdTxHash"
TransferPrices = "transferPrices"
Vaas = "vaas"
DuplicateVaas = "duplicateVaas"
GuardianSets = "guardianSets"
NodeGovernorVaas = "nodeGovernorVaas"
GovernorVaas = "governorVaas"
)

View File

@ -6,4 +6,5 @@ metadata:
namespace: {{ .NAMESPACE }}
data:
aws-region: {{ .SQS_AWS_REGION }}
duplicate-vaa-sqs-url: {{ .DUPLICATE_VAA_SQS_URL }}
duplicate-vaa-sqs-url: {{ .DUPLICATE_VAA_SQS_URL }}
governor-sqs-url: {{ .GOVERNOR_SQS_URL }}

View File

@ -15,3 +15,5 @@ AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
CONSUMER_WORKER_SIZE=1
TX_TRACKER_URL=http://wormscan-tx-tracker.wormscan/api
TX_TRACKER_TIMEOUT=30

View File

@ -15,3 +15,5 @@ AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
CONSUMER_WORKER_SIZE=1
TX_TRACKER_URL=http://wormscan-tx-tracker.wormscan-testnet/api
TX_TRACKER_TIMEOUT=30

View File

@ -15,3 +15,5 @@ AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
CONSUMER_WORKER_SIZE=1
TX_TRACKER_URL=http://wormscan-tx-tracker.wormscan/api
TX_TRACKER_TIMEOUT=30

View File

@ -15,3 +15,5 @@ AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
CONSUMER_WORKER_SIZE=1
TX_TRACKER_URL=http://wormscan-tx-tracker.wormscan-testnet/api
TX_TRACKER_TIMEOUT=30

View File

@ -64,6 +64,11 @@ spec:
configMapKeyRef:
name: fly-event-processor
key: duplicate-vaa-sqs-url
- name: GOVERNOR_SQS_URL
valueFrom:
configMapKeyRef:
name: fly-event-processor
key: governor-sqs-url
- name: AWS_REGION
valueFrom:
configMapKeyRef:
@ -86,6 +91,10 @@ spec:
value: "{{ .CONSUMER_WORKER_SIZE }}"
- name: GUARDIAN_API_PROVIDER_PATH
value: "/opt/fly-event-processor/guardian-provider.json"
- name: TX_TRACKER_URL
value: "{{ .TX_TRACKER_URL }}"
- name: TX_TRACKER_TIMEOUT
value: "{{ .TX_TRACKER_TIMEOUT }}"
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}

View File

@ -3,6 +3,7 @@ package service
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
@ -17,16 +18,22 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/vaa"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor"
governorConsumer "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/consumer/governor"
vaaConsumer "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/consumer/vaa"
governorProcessor "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor/governor"
vaaprocessor "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor/vaa"
txTracker "github.com/wormhole-foundation/wormhole-explorer/common/client/txtracker"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/config"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/consumer"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/infrastructure"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/vaa"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics"
)
@ -61,23 +68,35 @@ func Run() {
// create a new repository
repository := storage.NewRepository(logger, db.Database)
//TxTracker createTxHash client
createTxHashFunc, err := newCreateTxHashFunc(cfg, logger)
if err != nil {
logger.Fatal("failed to initialize VAA parser", zap.Error(err))
}
// create a new processor
processor := processor.NewProcessor(guardianApiProviderPool, repository, logger, metrics)
dupVaaProcessor := vaaprocessor.NewProcessor(guardianApiProviderPool, repository, logger, metrics)
governorProcessor := governorProcessor.NewProcessor(repository, createTxHashFunc, logger, metrics)
// start serving /health and /ready endpoints
healthChecks, err := makeHealthChecks(rootCtx, cfg, db.Database)
if err != nil {
logger.Fatal("Failed to create health checks", zap.Error(err))
}
vaaCtrl := vaa.NewController(processor.Process, repository, logger)
vaaCtrl := vaa.NewController(dupVaaProcessor.Process, repository, logger)
server := infrastructure.NewServer(logger, cfg.Port, vaaCtrl, cfg.PprofEnabled, healthChecks...)
server.Start()
// create and start a duplicate VAA consumer.
duplicateVaaConsumeFunc := newDuplicateVaaConsumeFunc(rootCtx, cfg, metrics, logger)
duplicateVaa := consumer.New(duplicateVaaConsumeFunc, processor.Process, logger, metrics, cfg.P2pNetwork, cfg.ConsumerWorkerSize)
duplicateVaa := vaaConsumer.New(duplicateVaaConsumeFunc, dupVaaProcessor.Process, logger, metrics, cfg.P2pNetwork, cfg.ConsumerWorkerSize)
duplicateVaa.Start(rootCtx)
// create and start a governor status consumer.
governorStatusConsumerFunc := newGovernorStatusConsumeFunc(rootCtx, cfg, metrics, logger)
governorStatus := governorConsumer.New(governorStatusConsumerFunc, governorProcessor.Process, logger, metrics, cfg.P2pNetwork, cfg.GovernorConsumerWorkerSize)
governorStatus.Start(rootCtx)
logger.Info("Started wormholescan-fly-event-processor")
// Waiting for signal
@ -203,13 +222,49 @@ func newDuplicateVaaConsumeFunc(
cfg *config.ServiceConfiguration,
metrics metrics.Metrics,
logger *zap.Logger,
) queue.ConsumeFunc {
) queue.ConsumeFunc[queue.EventDuplicateVaa] {
sqsConsumer, err := newSqsConsumer(ctx, cfg, cfg.DuplicateVaaSQSUrl)
if err != nil {
logger.Fatal("failed to create sqs consumer", zap.Error(err))
}
vaaQueue := queue.NewEventSqs(sqsConsumer, metrics, logger)
vaaQueue := queue.NewEventSqs[queue.EventDuplicateVaa](sqsConsumer,
metrics.IncDuplicatedVaaConsumedQueue, logger)
return vaaQueue.Consume
}
func newGovernorStatusConsumeFunc(
ctx context.Context,
cfg *config.ServiceConfiguration,
metrics metrics.Metrics,
logger *zap.Logger,
) queue.ConsumeFunc[queue.EventGovernorStatus] {
sqsConsumer, err := newSqsConsumer(ctx, cfg, cfg.GovernorSQSUrl)
if err != nil {
logger.Fatal("failed to create sqs consumer", zap.Error(err))
}
governorStatusQueue := queue.NewEventSqs[queue.EventGovernorStatus](sqsConsumer,
metrics.IncGovernorStatusConsumedQueue, logger)
return governorStatusQueue.Consume
}
func newCreateTxHashFunc(
cfg *config.ServiceConfiguration,
logger *zap.Logger,
) (txTracker.CreateTxHashFunc, error) {
if cfg.Environment == config.EnvironmentLocal {
return func(vaaID, txHash string) (*txTracker.TxHashResponse, error) {
return &txTracker.TxHashResponse{
NativeTxHash: txHash,
}, nil
}, nil
}
createTxHashClient, err := txTracker.NewTxTrackerAPIClient(cfg.TxTrackerTimeout, cfg.TxTrackerUrl, logger)
if err != nil {
return nil, fmt.Errorf("failed to initialize TxTracker client: %w", err)
}
return createTxHashClient.CreateTxHash, nil
}

View File

@ -10,6 +10,10 @@ import (
"github.com/sethvargo/go-envconfig"
)
const (
EnvironmentLocal = "local"
)
// p2p network constants.
const (
P2pMainNet = "mainnet"
@ -29,7 +33,9 @@ type ServiceConfiguration struct {
AlertApiKey string `env:"ALERT_API_KEY"`
MetricsEnabled bool `env:"METRICS_ENABLED,default=false"`
// Fly event consumer configuration
ConsumerWorkerSize int `env:"CONSUMER_WORKER_SIZE,default=1"`
ConsumerWorkerSize int `env:"CONSUMER_WORKER_SIZE,default=1"`
GovernorConsumerWorkerSize int `env:"GOVERNOR_CONSUMER_WORKER_SIZE,default=1"`
// Database configuration
MongoURI string `env:"MONGODB_URI,required"`
MongoDatabase string `env:"MONGODB_DATABASE,required"`
@ -39,6 +45,11 @@ type ServiceConfiguration struct {
AwsSecretAccessKey string `env:"AWS_SECRET_ACCESS_KEY"`
AwsRegion string `env:"AWS_REGION"`
DuplicateVaaSQSUrl string `env:"DUPLICATE_VAA_SQS_URL"`
GovernorSQSUrl string `env:"GOVERNOR_SQS_URL"`
// Tx-tracker client configuration
TxTrackerUrl string `env:"TX_TRACKER_URL,required"`
TxTrackerTimeout int64 `env:"TX_TRACKER_TIMEOUT,default=10"`
// Guardian api provider configuration
GuardianAPIProviderPath string `env:"GUARDIAN_API_PROVIDER_PATH,required"`
*GuardianAPIConfigurationJson `required:"false"`

View File

@ -0,0 +1,103 @@
package governor
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/domain"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics"
govprocessor "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor/governor"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue"
"go.uber.org/zap"
)
// Consumer consumer struct definition.
type Consumer struct {
consumeFunc queue.ConsumeFunc[queue.EventGovernorStatus]
processor govprocessor.ProcessorFunc
logger *zap.Logger
metrics metrics.Metrics
p2pNetwork string
workersSize int
}
// New creates a new governor status consumer.
func New(
consumeFunc queue.ConsumeFunc[queue.EventGovernorStatus],
processor govprocessor.ProcessorFunc,
logger *zap.Logger,
metrics metrics.Metrics,
p2pNetwork string,
workersSize int,
) *Consumer {
c := Consumer{
consumeFunc: consumeFunc,
processor: processor,
logger: logger,
metrics: metrics,
p2pNetwork: p2pNetwork,
workersSize: workersSize,
}
return &c
}
// Start consumes messages from the governor status queue and dispatches them to the processor.
func (c *Consumer) Start(ctx context.Context) {
ch := c.consumeFunc(ctx)
for i := 0; i < c.workersSize; i++ {
go c.producerLoop(ctx, ch)
}
}
func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMessage[queue.EventGovernorStatus]) {
for {
select {
case <-ctx.Done():
return
case msg := <-ch:
c.processEvent(ctx, msg)
}
}
}
func (c *Consumer) processEvent(ctx context.Context, msg queue.ConsumerMessage[queue.EventGovernorStatus]) {
event := msg.Data()
// Check if the event is a governor status event.
if event.Type != queue.GovernorStatusEventType {
msg.Done()
c.logger.Debug("event is not a governor status",
zap.Any("event", event))
return
}
logger := c.logger.With(
zap.String("trackId", event.TrackID),
zap.String("type", event.Type),
zap.String("node", event.Data.NodeName))
if msg.IsExpired() {
msg.Failed()
logger.Debug("event is expired")
c.metrics.IncGovernorStatusExpired(event.Data.NodeName, event.Data.NodeAddress)
return
}
params := &govprocessor.Params{
TrackID: event.TrackID,
NodeGovernorVaa: domain.ConvertEventToGovernorVaa(&event),
}
err := c.processor(ctx, params)
if err != nil {
msg.Failed()
logger.Error("failed to process governor-status event", zap.Error(err))
c.metrics.IncGovernorStatusFailed(params.NodeGovernorVaa.Name, params.NodeGovernorVaa.Address)
return
}
msg.Done()
logger.Debug("governor-status event processed")
c.metrics.IncGovernorStatusProcessed(params.NodeGovernorVaa.Name, params.NodeGovernorVaa.Address)
}

View File

@ -1,11 +1,10 @@
package consumer
package vaa
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor"
processor "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor/vaa"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
@ -13,18 +12,17 @@ import (
// Consumer consumer struct definition.
type Consumer struct {
consumeFunc queue.ConsumeFunc
processor processor.ProcessorFunc
guardianPool *pool.Pool
logger *zap.Logger
metrics metrics.Metrics
p2pNetwork string
workersSize int
consumeFunc queue.ConsumeFunc[queue.EventDuplicateVaa]
processor processor.ProcessorFunc
logger *zap.Logger
metrics metrics.Metrics
p2pNetwork string
workersSize int
}
// New creates a new vaa consumer.
func New(
consumeFunc queue.ConsumeFunc,
consumeFunc queue.ConsumeFunc[queue.EventDuplicateVaa],
processor processor.ProcessorFunc,
logger *zap.Logger,
metrics metrics.Metrics,
@ -52,7 +50,7 @@ func (c *Consumer) Start(ctx context.Context) {
}
}
func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMessage) {
func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMessage[queue.EventDuplicateVaa]) {
for {
select {
case <-ctx.Done():
@ -63,13 +61,23 @@ func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMes
}
}
func (c *Consumer) processEvent(ctx context.Context, msg queue.ConsumerMessage) {
func (c *Consumer) processEvent(ctx context.Context, msg queue.ConsumerMessage[queue.EventDuplicateVaa]) {
event := msg.Data()
// Check if the event is a duplicate VAA event.
if event.Type != queue.DeduplicateVaaEventType {
msg.Done()
c.logger.Debug("event is not a duplicate VAA",
zap.Any("event", event))
return
}
vaaID := event.Data.VaaID
chainID := sdk.ChainID(event.Data.ChainID)
logger := c.logger.With(
zap.String("trackId", event.TrackID),
zap.String("type", event.Type),
zap.String("vaaId", vaaID))
if msg.IsExpired() {

View File

@ -0,0 +1,79 @@
package domain
import (
"fmt"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/utils"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type Node struct {
Name string
Address string
}
type NodeGovernorVaa struct {
Node
GovernorVaas map[string]GovernorVaa
}
type GovernorVaa struct {
ID string
ChainID sdk.ChainID
EmitterAddress string
Sequence string
TxHash string
ReleaseTime time.Time
Amount uint64
}
// ConvertEventToGovernorVaa converts an event *queue.EventGovernorStatus to a *NodeGovernorVaa.
func ConvertEventToGovernorVaa(event *queue.EventGovernorStatus) *NodeGovernorVaa {
// check if event is nil.
if event == nil {
return nil
}
// check if chains is empty.
if len(event.Data.Chains) == 0 {
return nil
}
governorVaas := make(map[string]GovernorVaa)
for _, chain := range event.Data.Chains {
for _, emitter := range chain.Emitters {
for _, enqueuedVAA := range emitter.EnqueuedVaas {
normalizeEmitter := utils.NormalizeHex(emitter.EmitterAddress)
normalizeTxHash := utils.NormalizeHex(enqueuedVAA.TxHash)
vaaID := fmt.Sprintf("%d/%s/%s",
chain.ChainId,
normalizeEmitter,
enqueuedVAA.Sequence)
gs := GovernorVaa{
ID: vaaID,
ChainID: sdk.ChainID(chain.ChainId),
EmitterAddress: normalizeEmitter,
Sequence: enqueuedVAA.Sequence,
TxHash: normalizeTxHash,
ReleaseTime: time.Unix(int64(enqueuedVAA.ReleaseTime), 0),
Amount: enqueuedVAA.NotionalValue,
}
governorVaas[vaaID] = gs
}
}
}
return &NodeGovernorVaa{
Node: Node{
Name: event.Data.NodeName,
Address: event.Data.NodeAddress,
},
GovernorVaas: governorVaas,
}
}
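
A small sketch (not part of this diff) of what the conversion above produces for an invented governor-status event:

evt := &queue.EventGovernorStatus{
	TrackID: "fly-governor-status-node-a-1717171717-uuid",
	Type:    queue.GovernorStatusEventType,
	Source:  "fly",
	Data: queue.GovernorStatus{
		NodeName:    "node-a",
		NodeAddress: "0x1111", // hypothetical guardian address
		Chains: []*queue.ChainStatus{{
			ChainId: 2, // ethereum
			Emitters: []*queue.Emitter{{
				EmitterAddress: "0x0290FB167208AF455BB137780163B7B7A9A10C16",
				EnqueuedVaas: []*queue.EnqueuedVAA{{
					Sequence:      "1234",
					ReleaseTime:   1717171717,
					NotionalValue: 50_000,
					TxHash:        "0xA1B2C3D4",
				}},
			}},
		}},
	},
}

nodeGovernorVaa := domain.ConvertEventToGovernorVaa(evt)
// nodeGovernorVaa.GovernorVaas is keyed by "<chainId>/<normalized emitter>/<sequence>",
// with the emitter address and txHash passed through utils.NormalizeHex.
_ = nodeGovernorVaa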

View File

@ -4,7 +4,7 @@ import (
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor"
processor "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor/vaa"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage"
"go.uber.org/zap"
)

View File

@ -24,3 +24,21 @@ func (d *DummyMetrics) IncDuplicatedVaaExpired(chainID sdk.ChainID) {}
// IncDuplicatedVaaCanNotFixed dummy implementation.
func (d *DummyMetrics) IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID) {}
// IncGovernorStatusConsumedQueue dummy implementation.
func (d *DummyMetrics) IncGovernorStatusConsumedQueue() {}
// IncGovernorStatusProcessed dummy implementation.
func (d *DummyMetrics) IncGovernorStatusProcessed(node string, address string) {}
// IncGovernorStatusFailed dummy implementation.
func (d *DummyMetrics) IncGovernorStatusFailed(node string, address string) {}
// IncGovernorStatusExpired dummy implementation.
func (d *DummyMetrics) IncGovernorStatusExpired(node string, address string) {}
// IncGovernorVaaAdded dummy implementation.
func (d *DummyMetrics) IncGovernorVaaAdded(chainID sdk.ChainID) {}
// IncGovernorVaaDeleted dummy implementation.
func (d *DummyMetrics) IncGovernorVaaDeleted(chainID sdk.ChainID) {}

View File

@ -10,4 +10,18 @@ type Metrics interface {
IncDuplicatedVaaFailed(chainID sdk.ChainID)
IncDuplicatedVaaExpired(chainID sdk.ChainID)
IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID)
IncGovernorStatusConsumedQueue()
IncGovernorStatusProcessed(node string, address string)
IncGovernorStatusFailed(node string, address string)
IncGovernorStatusExpired(node string, address string)
IncGovernorVaaAdded(chainID sdk.ChainID)
IncGovernorVaaDeleted(chainID sdk.ChainID)
}
// IncConsumedQueue increments the consumed-queue counter for a queue.
type IncConsumedQueue func()

View File

@ -8,7 +8,9 @@ import (
// PrometheusMetrics is a Prometheus implementation of Metric interface.
type PrometheusMetrics struct {
duplicatedVaaCount *prometheus.CounterVec
duplicatedVaaCount *prometheus.CounterVec
governorStatusCount *prometheus.CounterVec
governorVaaCount *prometheus.CounterVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
@ -23,29 +25,84 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
"service": serviceName,
},
}, []string{"chain", "type"}),
governorStatusCount: promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormscan_fly_event_processor_governor_status_count",
Help: "The total number of governor status processed",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"node", "address", "type"}),
governorVaaCount: promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wormscan_fly_event_processor_governor_vaa_count",
Help: "The total number of governor VAA processed",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"chain", "type"}),
}
}
// IncDuplicatedVaaConsumedQueue increments the total number of duplicated VAA consumed queue.
func (m *PrometheusMetrics) IncDuplicatedVaaConsumedQueue() {
m.duplicatedVaaCount.WithLabelValues("all", "consumed_queue").Inc()
}
// IncDuplicatedVaaProcessed increments the total number of duplicated VAA processed.
func (m *PrometheusMetrics) IncDuplicatedVaaProcessed(chainID sdk.ChainID) {
chain := chainID.String()
m.duplicatedVaaCount.WithLabelValues(chain, "processed").Inc()
}
// IncDuplicatedVaaFailed increments the total number of duplicated VAA failed.
func (m *PrometheusMetrics) IncDuplicatedVaaFailed(chainID sdk.ChainID) {
chain := chainID.String()
m.duplicatedVaaCount.WithLabelValues(chain, "failed").Inc()
}
// IncDuplicatedVaaExpired increments the total number of duplicated VAA expired.
func (m *PrometheusMetrics) IncDuplicatedVaaExpired(chainID sdk.ChainID) {
chain := chainID.String()
m.duplicatedVaaCount.WithLabelValues(chain, "expired").Inc()
}
// IncDuplicatedVaaCanNotFixed increments the total number of duplicated VAAs that could not be fixed.
func (m *PrometheusMetrics) IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID) {
chain := chainID.String()
m.duplicatedVaaCount.WithLabelValues(chain, "can_not_fixed").Inc()
}
// IncGovernorStatusConsumedQueue increments the total number of governor status consumed queue.
func (m *PrometheusMetrics) IncGovernorStatusConsumedQueue() {
m.governorStatusCount.WithLabelValues("all", "", "consumed_queue").Inc()
}
// IncGovernorStatusProcessed increments the total number of governor status processed.
func (m *PrometheusMetrics) IncGovernorStatusProcessed(node string, address string) {
m.governorStatusCount.WithLabelValues(node, address, "processed").Inc()
}
// IncGovernorStatusFailed increments the total number of governor status failed.
func (m *PrometheusMetrics) IncGovernorStatusFailed(node string, address string) {
m.governorStatusCount.WithLabelValues(node, address, "failed").Inc()
}
// IncGovernorStatusExpired increments the total number of governor status expired.
func (m *PrometheusMetrics) IncGovernorStatusExpired(node string, address string) {
m.governorStatusCount.WithLabelValues(node, address, "expired").Inc()
}
// IncGovernorVaaAdded increments the total number of governor VAA added.
func (m *PrometheusMetrics) IncGovernorVaaAdded(chainID sdk.ChainID) {
chain := chainID.String()
m.governorVaaCount.WithLabelValues(chain, "added").Inc()
}
// IncGovernorVaaDeleted increments the total number of governor VAAs deleted.
func (m *PrometheusMetrics) IncGovernorVaaDeleted(chainID sdk.ChainID) {
chain := chainID.String()
m.governorVaaCount.WithLabelValues(chain, "deleted").Inc()
}

View File

@ -0,0 +1,320 @@
package governor
import (
"context"
"errors"
"fmt"
txTracker "github.com/wormhole-foundation/wormhole-explorer/common/client/txtracker"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/domain"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage"
"go.uber.org/zap"
)
// Processor is a governor processor.
type Processor struct {
repository *storage.Repository
createTxHashFunc txTracker.CreateTxHashFunc
logger *zap.Logger
metrics metrics.Metrics
}
// NewProcessor creates a new governor processor.
func NewProcessor(
repository *storage.Repository,
createTxHashFunc txTracker.CreateTxHashFunc,
logger *zap.Logger,
metrics metrics.Metrics,
) *Processor {
return &Processor{
repository: repository,
createTxHashFunc: createTxHashFunc,
logger: logger,
metrics: metrics,
}
}
// Process processes a governor event.
func (p *Processor) Process(
ctx context.Context,
params *Params) error {
logger := p.logger.With(
zap.String("trackId", params.TrackID),
)
// 1. Check if the event is valid.
if params.NodeGovernorVaa == nil {
logger.Info("event is nil")
return errors.New("event cannot be nil")
}
node := params.NodeGovernorVaa.Node
if node.Address == "" {
logger.Info("node is invalid")
return errors.New("node is invalid")
}
// 2. Get new and current governorVaa by node.
newNodeGovernorVaas := params.NodeGovernorVaa
nodeGovernorVaaIds, err := p.getNodeGovernorVaaIds(ctx, node, logger)
if err != nil {
logger.Error("failed to get current governorVaa",
zap.Error(err),
zap.String("nodeAddress", node.Address))
return err
}
// 3. Get nodeGovernorVaa to add and delete.
nodeGovernorVaasToAdd := getNodeGovernorVaasToAdd(
newNodeGovernorVaas.GovernorVaas, nodeGovernorVaaIds)
nodeGovernorVaaIdsToDelete := getNodeGovernorVaasToDelete(
newNodeGovernorVaas.GovernorVaas, nodeGovernorVaaIds)
// 4. Get governorVaa to add and delete.
governorVaasToAdd, err := p.getGovernorVaaToAdd(ctx, nodeGovernorVaasToAdd, logger)
if err != nil {
logger.Error("failed to get governorVaa to insert",
zap.Error(err),
zap.String("nodeAddress", node.Address))
return err
}
governorVaaIdsToDelete, err := p.getGovernorVaaToDelete(ctx, node, nodeGovernorVaaIdsToDelete, logger)
if err != nil {
logger.Error("failed to get governorVaa to delete",
zap.Error(err),
zap.String("nodeAddress", node.Address))
return err
}
// 5. Check if there are no changes in governor.
changeNodeGovernorVaas := len(nodeGovernorVaasToAdd) > 0 || len(nodeGovernorVaaIdsToDelete) > 0
changeGovernorVaas := len(governorVaasToAdd) > 0 || len(governorVaaIdsToDelete) > 0
if !changeNodeGovernorVaas && !changeGovernorVaas {
logger.Info("no changes in governor",
zap.String("nodeAddress", node.Address))
return nil
}
// 6. Update governor data for the node.
err = p.updateGovernor(ctx,
node,
nodeGovernorVaasToAdd,
nodeGovernorVaaIdsToDelete,
governorVaasToAdd,
governorVaaIdsToDelete)
if err != nil {
logger.Error("failed to update governorVaa",
zap.Error(err),
zap.String("nodeAddress", node.Address),
zap.String("node", node.Name))
return err
}
return nil
}
// getNodeGovernorVaaIds gets the current governor vaaIds stored in the database by node address.
func (p *Processor) getNodeGovernorVaaIds(
ctx context.Context,
node domain.Node,
logger *zap.Logger,
) (Set[string], error) {
// get current nodeGovernorVaa by nodeAddress.
nodeGovernorVaaDoc, err := p.repository.FindNodeGovernorVaaByNodeAddress(ctx, node.Address)
if err != nil {
logger.Error("failed to find nodeGovernorVaa by nodeAddress",
zap.Error(err),
zap.String("nodeAddress", node.Address))
return Set[string]{}, err
}
// convert nodeGovernorVaaDoc to Set[string]
nodeGovernorVaaId := make(Set[string])
for _, governorVaaDoc := range nodeGovernorVaaDoc {
nodeGovernorVaaId.Add(governorVaaDoc.VaaID)
}
return nodeGovernorVaaId, nil
}
// getNodeGovernorVaasToAdd gets the node governor vaas to add.
func getNodeGovernorVaasToAdd(
newNodeGovernorVaas map[string]domain.GovernorVaa,
nodeGovernorVaaIds Set[string],
) map[string]domain.GovernorVaa {
nodeGovernorVaasToAdd := make(map[string]domain.GovernorVaa)
for vaaID, governorVaa := range newNodeGovernorVaas {
if ok := nodeGovernorVaaIds.Contains(vaaID); !ok {
nodeGovernorVaasToAdd[vaaID] = governorVaa
}
}
return nodeGovernorVaasToAdd
}
// getNodeGovernorVaasToDelete gets the node governor vaas to delete.
func getNodeGovernorVaasToDelete(
newNodeGovernorVaas map[string]domain.GovernorVaa,
nodeGovernorVaaIds Set[string],
) Set[string] {
nodeGovernorVaasToDelete := make(Set[string])
for vaaID := range nodeGovernorVaaIds {
if _, ok := newNodeGovernorVaas[vaaID]; !ok {
nodeGovernorVaasToDelete.Add(vaaID)
}
}
return nodeGovernorVaasToDelete
}
// getGovernorVaaToAdd gets the governor vaas to add.
func (p *Processor) getGovernorVaaToAdd(
ctx context.Context,
nodeGovernorVaas map[string]domain.GovernorVaa,
logger *zap.Logger,
) ([]domain.GovernorVaa, error) {
// get vaaIDs from the nodeGovernorVaas.
vaaIds := make([]string, 0, len(nodeGovernorVaas))
for vaaId := range nodeGovernorVaas {
vaaIds = append(vaaIds, vaaId)
}
// get governorVaas already added by vaaIDs.
governorVaas, err := p.repository.FindGovernorVaaByVaaIDs(ctx, vaaIds)
if err != nil {
logger.Error("failed to find governor vaas by a list of vaaIDs",
zap.Error(err),
zap.Strings("vaaIDs", vaaIds))
return nil, err
}
if len(vaaIds) < len(governorVaas) {
logger.Error("found more governorVaas than requested vaaIDs",
zap.Strings("vaaIDs", vaaIds))
return nil, errors.New("found more governorVaas than requested vaaIDs")
}
// check if all the governorVaa are already added
if len(vaaIds) == len(governorVaas) {
return nil, nil
}
// convert governorVaas to a set of vaaIDs.
governorVaaIds := make(Set[string])
for _, governorVaa := range governorVaas {
governorVaaIds.Add(governorVaa.ID)
}
// get governorVaa to insert
var governorVaasToInsert []domain.GovernorVaa
for vaaID, governorVaa := range nodeGovernorVaas {
if ok := governorVaaIds.Contains(vaaID); !ok {
// fix governor vaa txHash
txHash, err := p.createTxHashFunc(governorVaa.ID, governorVaa.TxHash)
if err != nil {
logger.Error("failed to create txHash",
zap.Error(err),
zap.String("vaaID", governorVaa.ID),
zap.String("txHash", governorVaa.TxHash))
return nil, err
}
governorVaa.TxHash = txHash.NativeTxHash
governorVaasToInsert = append(governorVaasToInsert, governorVaa)
}
}
return governorVaasToInsert, nil
}
// getGovernorVaaToDelete gets the governor vaas to delete.
func (p *Processor) getGovernorVaaToDelete(
ctx context.Context,
node domain.Node,
nodeGovernorVaaIds Set[string],
logger *zap.Logger,
) (Set[string], error) {
// get vaaIDs from the nodeGovernorVaaIds.
vaaIds := make([]string, 0, nodeGovernorVaaIds.Len())
for vaaID := range nodeGovernorVaaIds {
vaaIds = append(vaaIds, vaaID)
}
// nodeGovernorVaas contains all the node governor vaas that have the same vaaID.
nodeGovernorVaas, err := p.repository.FindNodeGovernorVaaByVaaIDs(ctx, vaaIds)
if err != nil {
logger.Error("failed to find governorVaa by vaaIDs",
zap.Error(err),
zap.Strings("vaaIDs", vaaIds))
return nil, err
}
// nodeAddressByVaaId contains all the node address grouped by vaaID.
nodeAddressByVaaId := make(map[string][]string)
for _, n := range nodeGovernorVaas {
if _, ok := nodeAddressByVaaId[n.VaaID]; !ok {
nodeAddressByVaaId[n.VaaID] = make([]string, 0)
}
nodeAddressByVaaId[n.VaaID] = append(nodeAddressByVaaId[n.VaaID], n.NodeAddress)
}
// get governorVaa to delete
governorVaaToDelete := make(Set[string])
for vaaID, nodeAddresses := range nodeAddressByVaaId {
deleteGovernorVaa := len(nodeAddresses) == 1 && node.Address == nodeAddresses[0]
if deleteGovernorVaa {
governorVaaToDelete.Add(vaaID)
}
}
return governorVaaToDelete, nil
}
func (p *Processor) updateGovernor(ctx context.Context,
node domain.Node,
nodeGovernorVaasToAdd map[string]domain.GovernorVaa,
nodeGovernorVaaIdsToDelete Set[string],
governorVaasToAdd []domain.GovernorVaa,
governorVaaIdsToDelete Set[string]) error {
// convert nodeGovernorVaasToAdd to []storage.NodeGovernorVaaDoc
var nodeGovernorVaasToAddDoc []storage.NodeGovernorVaaDoc
for vaaID := range nodeGovernorVaasToAdd {
nodeGovernorVaasToAddDoc = append(nodeGovernorVaasToAddDoc, storage.NodeGovernorVaaDoc{
ID: fmt.Sprintf("%s-%s", node.Address, vaaID),
NodeName: node.Name,
NodeAddress: node.Address,
VaaID: vaaID,
})
}
// convert governorVaasToAdd to []storage.GovernorVaaDoc
var governorVaasToAddDoc []storage.GovernorVaaDoc
for _, governorVaa := range governorVaasToAdd {
governorVaasToAddDoc = append(governorVaasToAddDoc, storage.GovernorVaaDoc{
ID: governorVaa.ID,
ChainID: governorVaa.ChainID,
EmitterAddress: governorVaa.EmitterAddress,
Sequence: governorVaa.Sequence,
TxHash: governorVaa.TxHash,
ReleaseTime: governorVaa.ReleaseTime,
Amount: storage.Uint64(governorVaa.Amount),
})
}
// convert nodeGovernorVaas vaaIds to ids
var nodeGovVaaIdsToDelete []string
for vaaID := range nodeGovernorVaaIdsToDelete {
nodeGovVaaIdsToDelete = append(nodeGovVaaIdsToDelete, fmt.Sprintf("%s-%s", node.Address, vaaID))
}
return p.repository.UpdateGovernor(
ctx,
nodeGovernorVaasToAddDoc,
nodeGovVaaIdsToDelete,
governorVaasToAddDoc,
governorVaaIdsToDelete.ToSlice())
}
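
To make the reconciliation helpers above concrete, a tiny illustration with invented IDs (not part of this diff): the node currently has {A, B} stored in nodeGovernorVaas and the incoming governor status reports {B, C}.

current := Set[string]{"A": {}, "B": {}}                    // IDs already stored for this node
incoming := map[string]domain.GovernorVaa{"B": {}, "C": {}} // IDs reported in the new status

toAdd := getNodeGovernorVaasToAdd(incoming, current)       // contains only "C"
toDelete := getNodeGovernorVaasToDelete(incoming, current) // contains only "A"
_, _ = toAdd, toDelete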

View File

@ -0,0 +1,43 @@
package governor
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/domain"
)
// Set generic type definition.
type Set[T comparable] map[T]struct{}
// Add adds a value to the set.
func (s Set[T]) Add(v T) {
s[v] = struct{}{}
}
// Contains reports whether the set contains a value.
func (s Set[T]) Contains(v T) bool {
_, ok := s[v]
return ok
}
// Len returns the number of elements in the set.
func (s Set[T]) Len() int {
return len(s)
}
// ToSlice returns the set elements as a slice.
func (s Set[T]) ToSlice() []T {
var slice []T
for k := range s {
slice = append(slice, k)
}
return slice
}
type Params struct {
TrackID string
NodeGovernorVaa *domain.NodeGovernorVaa
}
// ProcessorFunc is a function to process a governor message.
type ProcessorFunc func(context.Context, *Params) error
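
A tiny usage sketch of the Set helper (not part of this diff); the ID is invented:

ids := make(Set[string])
ids.Add("vaa-1")
if ids.Contains("vaa-1") {
	// already tracked
}
_ = ids.Len()     // 1
_ = ids.ToSlice() // []string{"vaa-1"}; map iteration order is not guaranteed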

View File

@ -1,4 +1,4 @@
package processor
package vaa
import (
"errors"

View File

@ -1,4 +1,4 @@
package processor
package vaa
import (
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"

View File

@ -13,42 +13,46 @@ import (
)
// SQSOption represents a VAA queue in SQS option function.
type SQSOption func(*SQS)
type SQSOption[T Event] func(*SQS[T])
// SQS represents a VAA queue in SQS.
type SQS struct {
consumer *sqs_client.Consumer
ch chan ConsumerMessage
chSize int
wg sync.WaitGroup
metrics metrics.Metrics
logger *zap.Logger
type SQS[T Event] struct {
consumer *sqs_client.Consumer
ch chan ConsumerMessage[T]
chSize int
wg sync.WaitGroup
incConsumedQueueFunc metrics.IncConsumedQueue
logger *zap.Logger
}
// NewEventSqs creates a VAA queue in SQS instances.
func NewEventSqs(consumer *sqs_client.Consumer, metrics metrics.Metrics, logger *zap.Logger, opts ...SQSOption) *SQS {
s := &SQS{
consumer: consumer,
chSize: 10,
metrics: metrics,
logger: logger.With(zap.String("queueUrl", consumer.GetQueueUrl())),
func NewEventSqs[T Event](
consumer *sqs_client.Consumer,
incConsumedQueueFunc metrics.IncConsumedQueue,
logger *zap.Logger,
opts ...SQSOption[T]) *SQS[T] {
s := &SQS[T]{
consumer: consumer,
chSize: 10,
incConsumedQueueFunc: incConsumedQueueFunc,
logger: logger.With(zap.String("queueUrl", consumer.GetQueueUrl())),
}
for _, opt := range opts {
opt(s)
}
s.ch = make(chan ConsumerMessage, s.chSize)
s.ch = make(chan ConsumerMessage[T], s.chSize)
return s
}
// WithChannelSize allows specifying a channel size when setting a value.
func WithChannelSize(size int) SQSOption {
return func(d *SQS) {
func WithChannelSize[T Event](size int) SQSOption[T] {
return func(d *SQS[T]) {
d.chSize = size
}
}
// Consume returns the channel with the received messages from SQS queue.
func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
func (q *SQS[T]) Consume(ctx context.Context) <-chan ConsumerMessage[T] {
go func() {
for {
messages, err := q.consumer.GetMessages(ctx)
@ -60,7 +64,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
for _, msg := range messages {
q.metrics.IncDuplicatedVaaConsumedQueue()
q.incConsumedQueueFunc()
// unmarshal body to sqsEvent
var sqsEvent sqsEvent
err := json.Unmarshal([]byte(*msg.Body), &sqsEvent)
@ -72,7 +76,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
continue
}
var event Event
var event T
err = json.Unmarshal([]byte(sqsEvent.Message), &event)
if err != nil {
q.logger.Error("Error decoding message from SQS", zap.String("body", sqsEvent.Message), zap.Error(err))
@ -84,15 +88,14 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"])
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
q.ch <- &sqsConsumerMessage[T]{
id: msg.ReceiptHandle,
data: &event,
data: event,
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
retry: uint8(retry),
metrics: q.metrics,
ctx: ctx,
}
}
@ -104,30 +107,24 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
}
// Close closes all consumer resources.
func (q *SQS) Close() {
func (q *SQS[T]) Close() {
close(q.ch)
}
type sqsConsumerMessage struct {
data *Event
type sqsConsumerMessage[T Event] struct {
data T
consumer *sqs_client.Consumer
wg *sync.WaitGroup
id *string
logger *zap.Logger
expiredAt time.Time
retry uint8
metrics metrics.Metrics
ctx context.Context
}
func (m *sqsConsumerMessage) Data() *Event {
return m.data
}
func (m *sqsConsumerMessage) Done() {
func (m *sqsConsumerMessage[T]) Done() {
if err := m.consumer.DeleteMessage(m.ctx, m.id); err != nil {
m.logger.Error("Error deleting message from SQS",
zap.String("vaaId", m.data.Data.VaaID),
zap.Bool("isExpired", m.IsExpired()),
zap.Time("expiredAt", m.expiredAt),
zap.Error(err),
@ -136,14 +133,18 @@ func (m *sqsConsumerMessage) Done() {
m.wg.Done()
}
func (m *sqsConsumerMessage) Failed() {
func (m *sqsConsumerMessage[T]) Data() T {
return m.data
}
func (m *sqsConsumerMessage[T]) Failed() {
m.wg.Done()
}
func (m *sqsConsumerMessage) IsExpired() bool {
func (m *sqsConsumerMessage[T]) IsExpired() bool {
return m.expiredAt.Before(time.Now())
}
func (m *sqsConsumerMessage) Retry() uint8 {
func (m *sqsConsumerMessage[T]) Retry() uint8 {
return m.retry
}

View File

@ -5,20 +5,31 @@ import (
"time"
)
const (
DeduplicateVaaEventType = "duplicated-vaa"
GovernorStatusEventType = "governor-status"
)
// sqsEvent represents a event data from SQS.
type sqsEvent struct {
MessageID string `json:"MessageId"`
Message string `json:"Message"`
}
// Event represents a event data to be handle.
type Event struct {
// Event represents an event data type handled by the queue.
type Event interface {
EventDuplicateVaa | EventGovernorStatus
}
// EventDuplicateVaa definition.
type EventDuplicateVaa struct {
TrackID string `json:"trackId"`
Type string `json:"type"`
Source string `json:"source"`
Data DuplicateVaa `json:"data"`
}
// DuplicateVaa definition.
type DuplicateVaa struct {
VaaID string `json:"vaaId"`
ChainID uint16 `json:"chainId"`
@ -30,14 +41,53 @@ type DuplicateVaa struct {
Timestamp *time.Time `json:"timestamp"`
}
// EventGovernorStatus definition.
type EventGovernorStatus struct {
TrackID string `json:"trackId"`
Type string `json:"type"`
Source string `json:"source"`
Data GovernorStatus `json:"data"`
}
// GovernorStatus definition.
type GovernorStatus struct {
NodeAddress string `json:"nodeAddress"`
NodeName string `json:"nodeName"`
Counter int64 `json:"counter"`
Timestamp int64 `json:"timestamp"`
Chains []*ChainStatus `json:"chains"`
}
// ChainStatus definition.
type ChainStatus struct {
ChainId uint32 `json:"chainId"`
RemainingAvailableNotional uint64 `json:"remainingAvailableNotional"`
Emitters []*Emitter `json:"emitters"`
}
// Emitter definition.
type Emitter struct {
EmitterAddress string `bson:"emitteraddress" json:"emitterAddress"`
TotalEnqueuedVaas uint64 `bson:"totalenqueuedvaas" json:"totalEnqueuedVaas"`
EnqueuedVaas []*EnqueuedVAA `bson:"enqueuedvaas" json:"enqueuedVaas"`
}
// EnqueuedVAA definition.
type EnqueuedVAA struct {
Sequence string `bson:"sequence" json:"sequence"`
ReleaseTime uint64 `bson:"releasetime" json:"releaseTime"`
NotionalValue uint64 `bson:"notionalvalue" json:"notionalValue"`
TxHash string `bson:"txhash" json:"txHash"`
}
// ConsumerMessage definition.
type ConsumerMessage interface {
type ConsumerMessage[T any] interface {
Retry() uint8
Data() *Event
Data() T
Done()
Failed()
IsExpired() bool
}
// ConsumeFunc is a function to consume Event.
type ConsumeFunc func(context.Context) <-chan ConsumerMessage
type ConsumeFunc[T any] func(context.Context) <-chan ConsumerMessage[T]
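
For illustration (not part of this diff), the two-step decode the generic SQS consumer performs for a governor-status message, sketched inside the queue package with an invented payload:

raw := []byte(`{"MessageId":"11111111-2222-3333-4444-555555555555","Message":"{\"trackId\":\"fly-governor-status-node-a-1717171717-x\",\"type\":\"governor-status\",\"source\":\"fly\",\"data\":{\"nodeAddress\":\"0x1111\",\"nodeName\":\"node-a\",\"counter\":1,\"timestamp\":1717171717,\"chains\":[]}}"}`)

var envelope sqsEvent
if err := json.Unmarshal(raw, &envelope); err != nil {
	panic(err)
}
var evt EventGovernorStatus
if err := json.Unmarshal([]byte(envelope.Message), &evt); err != nil {
	panic(err)
}
// evt.Type == GovernorStatusEventType, evt.Data.NodeName == "node-a"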

View File

@ -11,17 +11,21 @@ import (
// Repository exposes operations over the `globalTransactions` collection.
type Repository struct {
logger *zap.Logger
vaas *mongo.Collection
duplicateVaas *mongo.Collection
logger *zap.Logger
vaas *mongo.Collection
duplicateVaas *mongo.Collection
nodeGovernorVaas *mongo.Collection
governorVaas *mongo.Collection
}
// New creates a new repository.
func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository {
r := Repository{
logger: logger,
vaas: db.Collection(commonRepo.Vaas),
duplicateVaas: db.Collection(commonRepo.DuplicateVaas),
logger: logger,
vaas: db.Collection(commonRepo.Vaas),
duplicateVaas: db.Collection(commonRepo.DuplicateVaas),
nodeGovernorVaas: db.Collection(commonRepo.NodeGovernorVaas),
governorVaas: db.Collection(commonRepo.GovernorVaas),
}
return &r
}
@ -125,3 +129,127 @@ func (r *Repository) FixVAA(ctx context.Context, vaaID, duplicateID string) erro
return nil
}
// FindNodeGovernorVaaByNodeAddress finds governor vaas by node address.
func (r *Repository) FindNodeGovernorVaaByNodeAddress(ctx context.Context, nodeAddress string) ([]NodeGovernorVaaDoc, error) {
var nodeGovernorVaa []NodeGovernorVaaDoc
cursor, err := r.nodeGovernorVaas.Find(ctx, bson.M{"nodeAddress": nodeAddress})
if err != nil {
return nil, err
}
if err = cursor.All(ctx, &nodeGovernorVaa); err != nil {
return nil, err
}
return nodeGovernorVaa, nil
}
// FindNodeGovernorVaaByVaaID finds governor vaas by vaa id.
func (r *Repository) FindNodeGovernorVaaByVaaID(ctx context.Context, vaaID string) ([]NodeGovernorVaaDoc, error) {
var nodeGovernorVaa []NodeGovernorVaaDoc
cursor, err := r.nodeGovernorVaas.Find(ctx, bson.M{"vaaId": vaaID})
if err != nil {
return nil, err
}
if err = cursor.All(ctx, &nodeGovernorVaa); err != nil {
return nil, err
}
return nodeGovernorVaa, nil
}
// FindNodeGovernorVaaByVaaIDs finds governor vaas by vaa ids.
func (r *Repository) FindNodeGovernorVaaByVaaIDs(ctx context.Context, vaaID []string) ([]NodeGovernorVaaDoc, error) {
var nodeGovernorVaa []NodeGovernorVaaDoc
cursor, err := r.nodeGovernorVaas.Find(ctx, bson.M{"vaaId": bson.M{"$in": vaaID}})
if err != nil {
return nil, err
}
if err = cursor.All(ctx, &nodeGovernorVaa); err != nil {
return nil, err
}
return nodeGovernorVaa, nil
}
// FindGovernorVaaByVaaIDs finds governor vaas by a list of vaaIDs.
func (r *Repository) FindGovernorVaaByVaaIDs(ctx context.Context, vaaID []string) ([]GovernorVaaDoc, error) {
var governorVaa []GovernorVaaDoc
cursor, err := r.governorVaas.Find(ctx, bson.M{"_id": bson.M{"$in": vaaID}})
if err != nil {
return nil, err
}
if err = cursor.All(ctx, &governorVaa); err != nil {
return nil, err
}
return governorVaa, nil
}
func (r *Repository) UpdateGovernor(
ctx context.Context,
nodeGovernorVaaDocToInsert []NodeGovernorVaaDoc,
nodeGovernorVaaDocToDelete []string,
governorVaasToInsert []GovernorVaaDoc,
governorVaaIdsToDelete []string) error {
// 1. start mongo transaction
session, err := r.vaas.Database().Client().StartSession()
if err != nil {
return err
}
err = session.StartTransaction()
if err != nil {
return err
}
// 2. insert node governor vaas.
if len(nodeGovernorVaaDocToInsert) > 0 {
var nodeGovVaadocs []interface{}
for _, doc := range nodeGovernorVaaDocToInsert {
nodeGovVaadocs = append(nodeGovVaadocs, doc)
}
_, err = r.nodeGovernorVaas.InsertMany(ctx, nodeGovVaadocs)
if err != nil {
session.AbortTransaction(ctx)
return err
}
}
// 3. delete node governor vaas.
if len(nodeGovernorVaaDocToDelete) > 0 {
_, err = r.nodeGovernorVaas.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": nodeGovernorVaaDocToDelete}})
if err != nil {
session.AbortTransaction(ctx)
return err
}
}
// 4. insert governor vaas.
if len(governorVaasToInsert) > 0 {
var govVaaDocs []interface{}
for _, doc := range governorVaasToInsert {
govVaaDocs = append(govVaaDocs, doc)
}
_, err = r.governorVaas.InsertMany(ctx, govVaaDocs)
if err != nil {
session.AbortTransaction(ctx)
return err
}
}
// 5. delete governor vaas.
if len(governorVaaIdsToDelete) > 0 {
_, err = r.governorVaas.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": governorVaaIdsToDelete}})
if err != nil {
session.AbortTransaction(ctx)
return err
}
}
// 6. commit transaction
err = session.CommitTransaction(ctx)
if err != nil {
session.AbortTransaction(ctx)
return err
}
return nil
}

View File

@ -3,9 +3,15 @@ package storage
import (
"time"
"errors"
"strconv"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson/bsontype"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)
// VaaDoc represents a VAA document.
@ -43,6 +49,23 @@ type DuplicateVaaDoc struct {
UpdatedAt *time.Time `bson:"updatedAt"`
}
type NodeGovernorVaaDoc struct {
ID string `bson:"_id"`
NodeName string `bson:"nodeName"`
NodeAddress string `bson:"nodeAddress"`
VaaID string `bson:"vaaId"`
}
type GovernorVaaDoc struct {
ID string `bson:"_id"`
ChainID sdk.ChainID `bson:"chainId"`
EmitterAddress string `bson:"emitterAddress"`
Sequence string `bson:"sequence"`
TxHash string `bson:"txHash"`
ReleaseTime time.Time `bson:"releaseTime"`
Amount Uint64 `bson:"amount"`
}
func (d *DuplicateVaaDoc) ToVaaDoc(duplicatedFixed bool) *VaaDoc {
return &VaaDoc{
ID: d.VaaID,
@ -63,7 +86,7 @@ func (d *DuplicateVaaDoc) ToVaaDoc(duplicatedFixed bool) *VaaDoc {
}
func (v *VaaDoc) ToDuplicateVaaDoc() (*DuplicateVaaDoc, error) {
vaa, err := vaa.Unmarshal(v.Vaa)
vaa, err := sdk.Unmarshal(v.Vaa)
if err != nil {
return nil, err
}
@ -85,3 +108,26 @@ func (v *VaaDoc) ToDuplicateVaaDoc() (*DuplicateVaaDoc, error) {
UpdatedAt: v.UpdatedAt,
}, nil
}
type Uint64 uint64
func (u Uint64) MarshalBSONValue() (bsontype.Type, []byte, error) {
ui64Str := strconv.FormatUint(uint64(u), 10)
d128, err := primitive.ParseDecimal128(ui64Str)
return bsontype.Decimal128, bsoncore.AppendDecimal128(nil, d128), err
}
func (u *Uint64) UnmarshalBSONValue(t bsontype.Type, b []byte) error {
d128, _, ok := bsoncore.ReadDecimal128(b)
if !ok {
return errors.New("Uint64 UnmarshalBSONValue error")
}
ui64, err := strconv.ParseUint(d128.String(), 10, 64)
if err != nil {
return err
}
*u = Uint64(ui64)
return nil
}
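
A small round-trip sketch of the Uint64 helper above (not part of this diff), written as it would appear inside the storage package: the amount is written as a Decimal128 value and read back into a uint64.

var amount Uint64 = 18446744073709551615 // max uint64, for illustration
bsonType, raw, err := amount.MarshalBSONValue() // stored as Decimal128 in Mongo
if err != nil {
	panic(err)
}
var decoded Uint64
if err := decoded.UnmarshalBSONValue(bsonType, raw); err != nil {
	panic(err)
}
// decoded == amount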

View File

@ -11,3 +11,7 @@ func NewNoopEventDispatcher() *NoopEventDispatcher {
func (n *NoopEventDispatcher) NewDuplicateVaa(context.Context, DuplicateVaa) error {
return nil
}
func (n *NoopEventDispatcher) NewGovernorStatus(context.Context, GovernorStatus) error {
return nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
aws_sns "github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/aws/aws-sdk-go-v2/service/sns/types"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/track"
)
@ -26,6 +27,12 @@ func NewSnsEventDispatcher(awsConfig aws.Config, url string) (*SnsEventDispatche
}
func (s *SnsEventDispatcher) NewDuplicateVaa(ctx context.Context, e DuplicateVaa) error {
attrs := map[string]types.MessageAttributeValue{
"messageType": {
DataType: aws.String("String"),
StringValue: aws.String("duplicated-vaa"),
},
}
body, err := json.Marshal(event{
TrackID: track.GetTrackIDForDuplicatedVAA(e.VaaID),
Type: "duplicated-vaa",
@ -42,6 +49,7 @@ func (s *SnsEventDispatcher) NewDuplicateVaa(ctx context.Context, e DuplicateVaa
MessageDeduplicationId: aws.String(groupID),
Message: aws.String(string(body)),
TopicArn: aws.String(s.url),
MessageAttributes: attrs,
})
return err
}
@ -56,3 +64,31 @@ func createDeduplicationIDForDuplicateVaa(e DuplicateVaa) string {
}
return deduplicationID
}
func (s *SnsEventDispatcher) NewGovernorStatus(ctx context.Context, e GovernorStatus) error {
attrs := map[string]types.MessageAttributeValue{
"messageType": {
DataType: aws.String("String"),
StringValue: aws.String("governor"),
},
}
body, err := json.Marshal(event{
TrackID: track.GetTrackIDForGovernorStatus(e.NodeName, e.Timestamp),
Type: "governor-status",
Source: "fly",
Data: e,
})
if err != nil {
return err
}
groupID := fmt.Sprintf("%s-%v", e.NodeAddress, e.Timestamp)
_, err = s.api.Publish(ctx,
&aws_sns.PublishInput{
MessageGroupId: aws.String(groupID),
MessageDeduplicationId: aws.String(groupID),
Message: aws.String(string(body)),
TopicArn: aws.String(s.url),
MessageAttributes: attrs,
})
return err
}
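The messageType attribute set on both Publish calls enables attribute-based filtering downstream. The sketch below shows a subscription that would only receive governor events; the ARNs are placeholders and the filter policy is standard SNS behavior, not something defined in this commit.
package sketch

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	aws_sns "github.com/aws/aws-sdk-go-v2/service/sns"
)

// subscribeGovernorOnly shows how a downstream queue could rely on the new
// messageType attribute: SNS evaluates the filter policy against message
// attributes, so only governor events reach this subscription. The ARNs are
// caller-supplied placeholders, not values from this repository.
func subscribeGovernorOnly(ctx context.Context, client *aws_sns.Client, topicARN, queueARN string) error {
	_, err := client.Subscribe(ctx, &aws_sns.SubscribeInput{
		TopicArn: aws.String(topicARN),
		Protocol: aws.String("sqs"),
		Endpoint: aws.String(queueARN),
		Attributes: map[string]string{
			"FilterPolicy": `{"messageType": ["governor"]}`,
		},
	})
	return err
}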

View File

@ -16,6 +16,14 @@ type DuplicateVaa struct {
Timestamp *time.Time `json:"timestamp"`
}
type GovernorStatus struct {
NodeAddress string `json:"nodeAddress"`
NodeName string `json:"nodeName"`
Counter int64 `json:"counter"`
Timestamp int64 `json:"timestamp"`
Chains any `json:"chains"`
}
type event struct {
TrackID string `json:"trackId"`
Type string `json:"type"`
@ -25,4 +33,5 @@ type event struct {
type EventDispatcher interface {
NewDuplicateVaa(ctx context.Context, e DuplicateVaa) error
NewGovernorStatus(ctx context.Context, e GovernorStatus) error
}
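For reference, a consumer-side sketch of the envelope a governor-status message carries. The trackId/type/source tags mirror the event struct above; the "data" tag on the payload field is an assumption, and all sample values are made up.
package main

import (
	"encoding/json"
	"fmt"
)

// governorStatusEnvelope mirrors the message body published by the SNS
// dispatcher; field names are taken from the json tags above.
type governorStatusEnvelope struct {
	TrackID string `json:"trackId"`
	Type    string `json:"type"`
	Source  string `json:"source"`
	Data    struct {
		NodeAddress string `json:"nodeAddress"`
		NodeName    string `json:"nodeName"`
		Counter     int64  `json:"counter"`
		Timestamp   int64  `json:"timestamp"`
		Chains      any    `json:"chains"`
	} `json:"data"`
}

func main() {
	// Illustrative payload only; values are not from a real guardian.
	raw := []byte(`{"trackId":"fly-governor-status-guardian-0-1717171717-4f9c2c0e-0c9f-4e39-9f51-5b6f0e2a7c11","type":"governor-status","source":"fly","data":{"nodeAddress":"0x0000000000000000000000000000000000000000","nodeName":"guardian-0","counter":42,"timestamp":1717171717,"chains":[]}}`)

	var evt governorStatusEnvelope
	if err := json.Unmarshal(raw, &evt); err != nil {
		panic(err)
	}
	fmt.Println(evt.Type, evt.Data.NodeName, evt.Data.Counter)
}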

View File

@ -16,3 +16,8 @@ func GetTrackIDForDuplicatedVAA(vaaID string) string {
uuid := uuid.New()
return fmt.Sprintf("fly-duplicated-vaa-%s-%s", vaaID, uuid.String())
}
func GetTrackIDForGovernorStatus(nodeName string, timestamp int64) string {
uuid := uuid.New()
return fmt.Sprintf("fly-governor-status-%s-%v-%s", nodeName, timestamp, uuid.String())
}

View File

@ -260,6 +260,14 @@ func Run(db *mongo.Database) error {
return err
}
// create index in nodeGovernorVaas collection by vaaId.
indexNodeGovernorVaasByVaaId := mongo.IndexModel{
Keys: bson.D{{Key: "vaaId", Value: 1}}}
_, err = db.Collection("nodeGovernorVaas").Indexes().CreateOne(context.TODO(), indexNodeGovernorVaasByVaaId)
if err != nil && isNotAlreadyExistsError(err) {
return err
}
return nil
}
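The new index serves lookups of every guardian entry currently associated with a given vaa id. A query sketch along those lines (hypothetical helper; only the collection and field names come from the migration above):
package sketch

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// findNodeGovernorVaas sketches the lookup the vaaId index accelerates:
// all node (guardian) entries that reference the given vaa id.
func findNodeGovernorVaas(ctx context.Context, db *mongo.Database, vaaID string) ([]bson.M, error) {
	cur, err := db.Collection("nodeGovernorVaas").Find(ctx, bson.D{{Key: "vaaId", Value: vaaID}})
	if err != nil {
		return nil, err
	}
	defer cur.Close(ctx)

	var docs []bson.M
	if err := cur.All(ctx, &docs); err != nil {
		return nil, err
	}
	return docs, nil
}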

View File

@ -131,22 +131,22 @@ type GovernorStatusUpdate struct {
}
type ChainGovernorStatusChain struct {
ChainId uint32 `bson:"chainid"`
RemainingAvailableNotional Uint64 `bson:"remainingavailablenotional"`
Emitters []*ChainGovernorStatusEmitter `bson:"emitters"`
ChainId uint32 `bson:"chainid" json:"chainId"`
RemainingAvailableNotional Uint64 `bson:"remainingavailablenotional" json:"remainingAvailableNotional"`
Emitters []*ChainGovernorStatusEmitter `bson:"emitters" json:"emitters"`
}
type ChainGovernorStatusEmitter struct {
EmitterAddress string `bson:"emitteraddress"`
TotalEnqueuedVaas Uint64 `bson:"totalenqueuedvaas"`
EnqueuedVaas []*ChainGovernorStatusEnqueuedVAA `bson:"enqueuedvaas"`
EmitterAddress string `bson:"emitteraddress" json:"emitterAddress"`
TotalEnqueuedVaas Uint64 `bson:"totalenqueuedvaas" json:"totalEnqueuedVaas"`
EnqueuedVaas []*ChainGovernorStatusEnqueuedVAA `bson:"enqueuedvaas" json:"enqueuedVaas"`
}
type ChainGovernorStatusEnqueuedVAA struct {
Sequence string `bson:"sequence"`
ReleaseTime uint32 `bson:"releasetime"`
NotionalValue Uint64 `bson:"notionalvalue"`
TxHash string `bson:"txhash"`
Sequence string `bson:"sequence" json:"sequence"`
ReleaseTime uint32 `bson:"releasetime" json:"releaseTime"`
NotionalValue Uint64 `bson:"notionalvalue" json:"notionalValue"`
TxHash string `bson:"txhash" json:"txHash"`
}
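These json tags matter because the same structs are presumably what travels inside the governor-status event's Chains payload, so the SNS message now serializes with camelCase keys. A sketch, assuming the same package with "encoding/json" and "fmt" imported and that Uint64 has no custom JSON marshaller:
// chainsJSONSketch only illustrates that the new tags produce camelCase keys.
func chainsJSONSketch() {
	c := ChainGovernorStatusChain{
		ChainId:                    2,
		RemainingAvailableNotional: Uint64(1000000),
		Emitters:                   []*ChainGovernorStatusEmitter{},
	}
	b, _ := json.Marshal(c)
	fmt.Println(string(b)) // e.g. {"chainId":2,"remainingAvailableNotional":1000000,"emitters":[]}
}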
type ChainGovernorConfigUpdate struct {

View File

@ -335,8 +335,24 @@ func (s *Repository) UpsertGovernorStatus(govS *gossipv1.SignedChainGovernorStat
Error: err2,
}
s.alertClient.CreateAndSend(context.TODO(), flyAlert.ErrorSaveGovernorStatus, alertContext)
return err2
}
return err2
// send governor status to topic.
err3 := s.eventDispatcher.NewGovernorStatus(context.TODO(), event.GovernorStatus{
NodeAddress: id,
NodeName: status.NodeName,
Counter: status.Counter,
Timestamp: status.Timestamp,
Chains: status.Chains,
})
if err3 != nil {
s.log.Error("Error sending governor status to topic",
zap.String("guardian", status.NodeName),
zap.Error(err3))
}
return err3
}
func (s *Repository) updateVAACount(chainID vaa.ChainID) {

View File

@ -82,6 +82,7 @@ func createChangesDoc(source, _type string, timestamp *time.Time) bson.D {
}
}
// UpsertOriginTx upserts a source transaction document.
func (r *Repository) UpsertOriginTx(ctx context.Context, params *UpsertOriginTxParams) error {
now := time.Now()
@ -364,3 +365,25 @@ func (r *Repository) GetDocumentsByVaas(
return globalTransactions, nil
}
// SourceTxDoc represents a source transaction document.
type SourceTxDoc struct {
ID string `bson:"_id"`
OriginTx *struct {
ChainID int `bson:"chainId"`
Status string `bson:"status"`
Processed bool `bson:"processed"`
NativeTxHash string `bson:"nativeTxHash"`
From string `bson:"from"`
} `bson:"originTx"`
}
// FindSourceTxById returns the source transaction document with the given ID.
func (r *Repository) FindSourceTxById(ctx context.Context, id string) (*SourceTxDoc, error) {
var sourceTxDoc SourceTxDoc
err := r.globalTransactions.FindOne(ctx, bson.M{"_id": id}).Decode(&sourceTxDoc)
if err != nil {
return nil, err
}
return &sourceTxDoc, err
}

View File

@ -32,6 +32,7 @@ func NewServer(logger *zap.Logger, port string, pprofEnabled bool, vaaController
api.Get("/ready", ctrl.ReadyCheck)
api.Post("/vaa/process", vaaController.Process)
api.Post("/vaa/tx-hash", vaaController.CreateTxHash)
return &Server{
app: app,

View File

@ -1,10 +1,14 @@
package vaa
import (
"encoding/hex"
"strconv"
"strings"
"github.com/gofiber/fiber/v2"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/pool"
"github.com/wormhole-foundation/wormhole-explorer/common/utils"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
@ -35,9 +39,7 @@ func NewController(rpcPool map[sdk.ChainID]*pool.Pool, wormchainRpcPool map[sdk.
}
func (c *Controller) Process(ctx *fiber.Ctx) error {
payload := struct {
ID string `json:"id"`
}{}
var payload ProcessVaaRequest
if err := ctx.BodyParser(&payload); err != nil {
return err
@ -79,3 +81,72 @@ func (c *Controller) Process(ctx *fiber.Ctx) error {
Result any `json:"result"`
}{Result: result})
}
func (c *Controller) CreateTxHash(ctx *fiber.Ctx) error {
var payload TxHashRequest
if err := ctx.BodyParser(&payload); err != nil {
return err
}
txHash, err := hex.DecodeString(utils.Remove0x(payload.TxHash))
if err != nil {
return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid tx hash", "details": err.Error()})
}
c.logger.Info("Processing txHash from endpoint", zap.String("id", payload.ID))
vaaID := strings.Split(payload.ID, "/")
if len(vaaID) != 3 {
return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid vaa id"})
}
chainIDStr, emitter, sequenceStr := vaaID[0], vaaID[1], vaaID[2]
chainIDUint, err := strconv.ParseUint(chainIDStr, 10, 16)
if err != nil {
return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "chain id is not a number", "details": err.Error()})
}
chainID := sdk.ChainID(chainIDUint)
if !domain.ChainIdIsValid(chainID) {
return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid chain id"})
}
encodedTxHash, err := domain.EncodeTrxHashByChainID(chainID, txHash)
if err != nil {
return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid tx hash", "details": err.Error()})
}
if chainID != sdk.ChainIDSolana && chainID != sdk.ChainIDAptos && chainID != sdk.ChainIDWormchain {
return ctx.JSON(TxHashResponse{NativeTxHash: encodedTxHash})
}
sourceTx, err := c.repository.FindSourceTxById(ctx.Context(), payload.ID)
if err == nil && sourceTx != nil {
if sourceTx.OriginTx != nil && sourceTx.OriginTx.NativeTxHash != "" {
return ctx.JSON(TxHashResponse{NativeTxHash: sourceTx.OriginTx.NativeTxHash})
}
}
p := &consumer.ProcessSourceTxParams{
TrackID: "controller-tx-hash",
Source: "controller",
Timestamp: nil,
VaaId: payload.ID,
ChainId: chainID,
Emitter: emitter,
Sequence: sequenceStr,
TxHash: encodedTxHash,
IsVaaSigned: false,
Metrics: c.metrics,
Overwrite: true,
DisableDBUpsert: true,
}
result, err := consumer.ProcessSourceTx(ctx.Context(), c.logger, c.rpcPool, c.wormchainRpcPool, c.repository, p, c.p2pNetwork)
if err != nil {
return err
}
return ctx.JSON(TxHashResponse{NativeTxHash: result.NativeTxHash})
}
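The handler computes the native hash locally for most chains and only falls back to the globalTransactions lookup and on-chain reprocessing for Solana, Aptos and Wormchain. Below is a client-side sketch of calling the endpoint; the base URL (including whatever prefix the fiber route group adds in front of /vaa/tx-hash) is a placeholder, and the request/response shapes follow TxHashRequest and TxHashResponse defined below.
package sketch

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
)

// callCreateTxHash sketches how a caller such as fly-event-processor could use
// the endpoint; the helper name and baseURL are placeholders.
func callCreateTxHash(ctx context.Context, baseURL, vaaID, observedTxHash string) (string, error) {
	body, err := json.Marshal(map[string]string{"id": vaaID, "txHash": observedTxHash})
	if err != nil {
		return "", err
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, baseURL+"/vaa/tx-hash", bytes.NewReader(body))
	if err != nil {
		return "", err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("tx-hash endpoint returned status %d", resp.StatusCode)
	}

	var out struct {
		NativeTxHash string `json:"nativeTxHash"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return "", err
	}
	return out.NativeTxHash, nil
}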

View File

@ -9,9 +9,10 @@ import (
)
type Repository struct {
db *mongo.Database
logger *zap.Logger
vaas *mongo.Collection
db *mongo.Database
logger *zap.Logger
vaas *mongo.Collection
globalTransactions *mongo.Collection
}
type VaaDoc struct {
@ -23,8 +24,9 @@ type VaaDoc struct {
// NewRepository create a new Repository.
func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository {
return &Repository{db: db,
logger: logger.With(zap.String("module", "VaaRepository")),
vaas: db.Collection("vaas"),
logger: logger.With(zap.String("module", "VaaRepository")),
vaas: db.Collection("vaas"),
globalTransactions: db.Collection("globalTransactions"),
}
}

View File

@ -0,0 +1,17 @@
package vaa
// ProcessVaaRequest is the request body for processing a vaa.
type ProcessVaaRequest struct {
ID string `json:"id"`
}
// TxHashRequest is the request body for computing the native tx hash of a vaa id.
type TxHashRequest struct {
ID string `json:"id"`
TxHash string `json:"txHash"`
}
// TxHashResponse is the response containing the native tx hash.
type TxHashResponse struct {
NativeTxHash string `json:"nativeTxHash"`
}