diff --git a/api/docs/docs.go b/api/docs/docs.go index c9d0916b..d97a7657 100644 --- a/api/docs/docs.go +++ b/api/docs/docs.go @@ -563,6 +563,29 @@ const docTemplate = `{ } } }, + "/api/v1/governor/vaas": { + "get": { + "description": "Returns all vaas in Governor.", + "tags": [ + "wormholescan" + ], + "operationId": "governor-vaas", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/response.Response-array_governor_GovernorVaasResponse" + } + }, + "400": { + "description": "Bad Request" + }, + "500": { + "description": "Internal Server Error" + } + } + } + }, "/api/v1/health": { "get": { "description": "Health check", @@ -2431,6 +2454,35 @@ const docTemplate = `{ } } }, + "governor.GovernorVaasResponse": { + "type": "object", + "properties": { + "amount": { + "type": "integer" + }, + "chainId": { + "$ref": "#/definitions/vaa.ChainID" + }, + "emitterAddress": { + "type": "string" + }, + "releaseTime": { + "type": "string" + }, + "sequence": { + "type": "string" + }, + "status": { + "type": "string" + }, + "txHash": { + "type": "string" + }, + "vaaId": { + "type": "string" + } + } + }, "governor.MaxNotionalAvailableRecord": { "type": "object", "properties": { @@ -3156,6 +3208,20 @@ const docTemplate = `{ } } }, + "response.Response-array_governor_GovernorVaasResponse": { + "type": "object", + "properties": { + "data": { + "type": "array", + "items": { + "$ref": "#/definitions/governor.GovernorVaasResponse" + } + }, + "pagination": { + "$ref": "#/definitions/response.ResponsePagination" + } + } + }, "response.Response-array_governor_NotionalAvailable": { "type": "object", "properties": { diff --git a/api/docs/swagger.json b/api/docs/swagger.json index a4561415..3cface37 100644 --- a/api/docs/swagger.json +++ b/api/docs/swagger.json @@ -556,6 +556,29 @@ } } }, + "/api/v1/governor/vaas": { + "get": { + "description": "Returns all vaas in Governor.", + "tags": [ + "wormholescan" + ], + "operationId": "governor-vaas", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/response.Response-array_governor_GovernorVaasResponse" + } + }, + "400": { + "description": "Bad Request" + }, + "500": { + "description": "Internal Server Error" + } + } + } + }, "/api/v1/health": { "get": { "description": "Health check", @@ -2424,6 +2447,35 @@ } } }, + "governor.GovernorVaasResponse": { + "type": "object", + "properties": { + "amount": { + "type": "integer" + }, + "chainId": { + "$ref": "#/definitions/vaa.ChainID" + }, + "emitterAddress": { + "type": "string" + }, + "releaseTime": { + "type": "string" + }, + "sequence": { + "type": "string" + }, + "status": { + "type": "string" + }, + "txHash": { + "type": "string" + }, + "vaaId": { + "type": "string" + } + } + }, "governor.MaxNotionalAvailableRecord": { "type": "object", "properties": { @@ -3149,6 +3201,20 @@ } } }, + "response.Response-array_governor_GovernorVaasResponse": { + "type": "object", + "properties": { + "data": { + "type": "array", + "items": { + "$ref": "#/definitions/governor.GovernorVaasResponse" + } + }, + "pagination": { + "$ref": "#/definitions/response.ResponsePagination" + } + } + }, "response.Response-array_governor_NotionalAvailable": { "type": "object", "properties": { diff --git a/api/docs/swagger.yaml b/api/docs/swagger.yaml index ff8bee2b..2da42a2a 100644 --- a/api/docs/swagger.yaml +++ b/api/docs/swagger.yaml @@ -199,6 +199,25 @@ definitions: notionalLimit: type: integer type: object + governor.GovernorVaasResponse: + properties: + amount: + type: integer + chainId: + $ref: '#/definitions/vaa.ChainID' + emitterAddress: + type: string + releaseTime: + type: string + sequence: + type: string + status: + type: string + txHash: + type: string + vaaId: + type: string + type: object governor.MaxNotionalAvailableRecord: properties: availableNotional: @@ -672,6 +691,15 @@ definitions: pagination: $ref: '#/definitions/response.ResponsePagination' type: object + response.Response-array_governor_GovernorVaasResponse: + properties: + data: + items: + $ref: '#/definitions/governor.GovernorVaasResponse' + type: array + pagination: + $ref: '#/definitions/response.ResponsePagination' + type: object response.Response-array_governor_NotionalAvailable: properties: data: @@ -1546,6 +1574,21 @@ paths: description: Internal Server Error tags: - wormholescan + /api/v1/governor/vaas: + get: + description: Returns all vaas in Governor. + operationId: governor-vaas + responses: + "200": + description: OK + schema: + $ref: '#/definitions/response.Response-array_governor_GovernorVaasResponse' + "400": + description: Bad Request + "500": + description: Internal Server Error + tags: + - wormholescan /api/v1/health: get: description: Health check diff --git a/api/handlers/governor/repository.go b/api/handlers/governor/repository.go index 39741569..39cfea5a 100644 --- a/api/handlers/governor/repository.go +++ b/api/handlers/governor/repository.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" + mongoTypes "github.com/wormhole-foundation/wormhole-explorer/api/internal/mongo" "github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination" "github.com/wormhole-foundation/wormhole-explorer/common/types" "github.com/wormhole-foundation/wormhole/sdk/vaa" @@ -27,6 +28,7 @@ type Repository struct { collections struct { governorConfig *mongo.Collection governorStatus *mongo.Collection + governorVaas *mongo.Collection } } @@ -37,9 +39,11 @@ func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository { collections: struct { governorConfig *mongo.Collection governorStatus *mongo.Collection + governorVaas *mongo.Collection }{ governorConfig: db.Collection("governorConfig"), governorStatus: db.Collection("governorStatus"), + governorVaas: db.Collection("governorVaas"), }, } } @@ -1725,3 +1729,46 @@ func (r *Repository) IsVaaEnqueued( return true, nil } + +type GovernorVaaDoc struct { + ID string `bson:"_id"` + ChainID vaa.ChainID `bson:"chainId"` + EmitterAddress string `bson:"emitterAddress"` + Sequence string `bson:"sequence"` + TxHash string `bson:"txHash"` + ReleaseTime time.Time `bson:"releaseTime"` + Amount mongoTypes.Uint64 `bson:"amount"` + Vaas []any `bson:"vaas"` +} + +func (r *Repository) GetGovernorVaas(ctx context.Context) ([]GovernorVaaDoc, error) { + // left outer join on the `vaas` collection + pipeline := []bson.D{{{Key: "$lookup", Value: bson.D{ + {Key: "from", Value: "vaas"}, + {Key: "localField", Value: "_id"}, + {Key: "foreignField", Value: "_id"}, + {Key: "as", Value: "vaas"}, + }}}, + } + + cur, err := r.collections.governorVaas.Aggregate(ctx, pipeline) + if err != nil { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + r.logger.Error("failed execute aggregate command to get governor enqueded vaas", + zap.Error(err), zap.String("requestID", requestID)) + return nil, errors.WithStack(err) + } + + // read results from cursor + var result []GovernorVaaDoc + err = cur.All(ctx, &result) + if err != nil { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + r.logger.Error("failed decoding cursor to []*VaaDoc", + zap.Error(err), + zap.String("requestID", requestID), + ) + return nil, errors.WithStack(err) + } + return result, nil +} diff --git a/api/handlers/governor/service.go b/api/handlers/governor/service.go index b6fa2848..e5f35374 100644 --- a/api/handlers/governor/service.go +++ b/api/handlers/governor/service.go @@ -228,3 +228,13 @@ func (s *Service) IsVaaEnqueued(ctx context.Context, chainID vaa.ChainID, emitte isEnqueued, err := s.repo.IsVaaEnqueued(ctx, chainID, emitter, seq) return isEnqueued, err } + +// GetGovernorVaas get enqueued vaas. +// Guardian api migration. +func (s *Service) GetGovernorVaas(ctx context.Context) ([]GovernorVaaDoc, error) { + result, err := s.repo.GetGovernorVaas(ctx) + if err != nil { + return nil, err + } + return result, nil +} diff --git a/api/routes/wormscan/governor/controller.go b/api/routes/wormscan/governor/controller.go index 97484dc4..45b948d3 100644 --- a/api/routes/wormscan/governor/controller.go +++ b/api/routes/wormscan/governor/controller.go @@ -399,3 +399,38 @@ func (c *Controller) GetEnqueuedVaasByChainID(ctx *fiber.Ctx) error { return ctx.JSON(enqueuedVaas) } + +// GetGovernorVaas godoc +// @Description Returns all vaas in Governor. +// @Tags wormholescan +// @ID governor-vaas +// @Success 200 {object} response.Response[[]governor.GovernorVaasResponse] +// @Failure 400 +// @Failure 500 +// @Router /api/v1/governor/vaas [get] +func (c *Controller) GetGovernorVaas(ctx *fiber.Ctx) error { + enqueuedVaas, err := c.srv.GetGovernorVaas(ctx.Context()) + if err != nil { + return err + } + + result := make([]GovernorVaasResponse, 0) + for _, v := range enqueuedVaas { + status := "pending" + if len(v.Vaas) > 0 { + status = "issued" + } + result = append(result, GovernorVaasResponse{ + VaaID: v.ID, + ChainID: v.ChainID, + EmitterAddress: v.EmitterAddress, + Sequence: v.Sequence, + TxHash: v.TxHash, + ReleaseTime: v.ReleaseTime, + Amount: uint64(v.Amount), + Status: status, + }) + } + + return ctx.JSON(result) +} diff --git a/api/routes/wormscan/governor/types.go b/api/routes/wormscan/governor/types.go new file mode 100644 index 00000000..f7c65715 --- /dev/null +++ b/api/routes/wormscan/governor/types.go @@ -0,0 +1,18 @@ +package governor + +import ( + "time" + + "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +type GovernorVaasResponse struct { + VaaID string `json:"vaaId"` + ChainID vaa.ChainID `json:"chainId"` + EmitterAddress string `json:"emitterAddress"` + Sequence string `json:"sequence"` + TxHash string `json:"txHash"` + ReleaseTime time.Time `json:"releaseTime"` + Amount uint64 `json:"amount"` + Status string `json:"status"` +} diff --git a/api/routes/wormscan/routes.go b/api/routes/wormscan/routes.go index 00c5a77c..2e158d8a 100644 --- a/api/routes/wormscan/routes.go +++ b/api/routes/wormscan/routes.go @@ -144,6 +144,7 @@ func RegisterRoutes( enqueueVaas := governor.Group("/enqueued_vaas") enqueueVaas.Get("/", governorCtrl.GetEnqueuedVaas) enqueueVaas.Get("/:chain", governorCtrl.GetEnqueuedVaasByChainID) + governor.Get("/vaas", governorCtrl.GetGovernorVaas) relays := api.Group("/relays") relays.Get("/:chain/:emitter/:sequence", relaysCtrl.FindOne) diff --git a/common/client/txtracker/txtracker.go b/common/client/txtracker/txtracker.go index 26b51dd7..3e338f7a 100644 --- a/common/client/txtracker/txtracker.go +++ b/common/client/txtracker/txtracker.go @@ -15,6 +15,7 @@ const DefaultTimeout = 30 var ( ErrCallEndpoint = errors.New("ERROR CALL ENPOINT") + ErrBadRequest = errors.New("BAD REQUEST") ErrInternalError = errors.New("INTERNAL ERROR") ) @@ -84,3 +85,55 @@ func (c *TxTrackerAPIClient) Process(vaaID string) (*ProcessVaaResponse, error) return nil, ErrInternalError } } + +// CreateTxHashFunc represent a create tx hash function. +type CreateTxHashFunc func(vaaID, txHash string) (*TxHashResponse, error) + +// TxHashResponse represent a create tx hash response. +type TxHashResponse struct { + NativeTxHash string `json:"nativeTxHash"` +} + +// CreateTxHash create tx hash. +func (c *TxTrackerAPIClient) CreateTxHash(vaaID, txHash string) (*TxHashResponse, error) { + endpoint := fmt.Sprintf("%s/vaa/tx-hash", c.BaseURL) + + // create request body. + payload := struct { + VaaID string `json:"id"` + TxHash string `json:"txHash"` + }{ + VaaID: vaaID, + TxHash: txHash, + } + + body, err := json.Marshal(payload) + if err != nil { + c.Logger.Error("error marshalling payload", zap.Error(err), zap.String("vaaID", vaaID), zap.String("txHash", txHash)) + return nil, err + } + + response, err := c.Client.Post(endpoint, "application/json", bytes.NewBuffer(body)) + if err != nil { + c.Logger.Error("error call create tx hash endpoint", + zap.Error(err), + zap.String("vaaID", vaaID), + zap.String("txHash", txHash)) + return nil, ErrCallEndpoint + } + + defer response.Body.Close() + switch response.StatusCode { + case http.StatusOK: + var txHashResponse TxHashResponse + json.NewDecoder(response.Body).Decode(&txHashResponse) + return &txHashResponse, nil + case http.StatusBadRequest: + return nil, ErrBadRequest + case http.StatusInternalServerError: + return nil, ErrInternalError + default: + return nil, ErrInternalError + } + +} diff --git a/common/repository/names.go b/common/repository/names.go index c97ccc96..73b8bde8 100644 --- a/common/repository/names.go +++ b/common/repository/names.go @@ -1,9 +1,11 @@ package repository const ( - VaaIdTxHash = "vaaIdTxHash" - TransferPrices = "transferPrices" - Vaas = "vaas" - DuplicateVaas = "duplicateVaas" - GuardianSets = "guardianSets" + VaaIdTxHash = "vaaIdTxHash" + TransferPrices = "transferPrices" + Vaas = "vaas" + DuplicateVaas = "duplicateVaas" + GuardianSets = "guardianSets" + NodeGovernorVaas = "nodeGovernorVaas" + GovernorVaas = "governorVaas" ) diff --git a/deploy/fly-event-processor/configmap.yaml b/deploy/fly-event-processor/configmap.yaml index ffd4490d..a40114ed 100644 --- a/deploy/fly-event-processor/configmap.yaml +++ b/deploy/fly-event-processor/configmap.yaml @@ -6,4 +6,5 @@ metadata: namespace: {{ .NAMESPACE }} data: aws-region: {{ .SQS_AWS_REGION }} - duplicate-vaa-sqs-url: {{ .DUPLICATE_VAA_SQS_URL }} \ No newline at end of file + duplicate-vaa-sqs-url: {{ .DUPLICATE_VAA_SQS_URL }} + governor-sqs-url: {{ .GOVERNOR_SQS_URL }} diff --git a/deploy/fly-event-processor/env/production-mainnet.env b/deploy/fly-event-processor/env/production-mainnet.env index f0dd2719..b900b004 100644 --- a/deploy/fly-event-processor/env/production-mainnet.env +++ b/deploy/fly-event-processor/env/production-mainnet.env @@ -15,3 +15,5 @@ AWS_IAM_ROLE= ALERT_ENABLED=false METRICS_ENABLED=true CONSUMER_WORKER_SIZE=1 +TX_TRACKER_URL=http://wormscan-tx-tracker.wormscan/api +TX_TRACKER_TIMEOUT=30 diff --git a/deploy/fly-event-processor/env/production-testnet.env b/deploy/fly-event-processor/env/production-testnet.env index 5c5421e6..162827af 100644 --- a/deploy/fly-event-processor/env/production-testnet.env +++ b/deploy/fly-event-processor/env/production-testnet.env @@ -15,3 +15,5 @@ AWS_IAM_ROLE= ALERT_ENABLED=false METRICS_ENABLED=true CONSUMER_WORKER_SIZE=1 +TX_TRACKER_URL=http://wormscan-tx-tracker.wormscan-testnet/api +TX_TRACKER_TIMEOUT=30 diff --git a/deploy/fly-event-processor/env/staging-mainnet.env b/deploy/fly-event-processor/env/staging-mainnet.env index fdb63261..54c03fbd 100644 --- a/deploy/fly-event-processor/env/staging-mainnet.env +++ b/deploy/fly-event-processor/env/staging-mainnet.env @@ -15,3 +15,5 @@ AWS_IAM_ROLE= ALERT_ENABLED=false METRICS_ENABLED=true CONSUMER_WORKER_SIZE=1 +TX_TRACKER_URL=http://wormscan-tx-tracker.wormscan/api +TX_TRACKER_TIMEOUT=30 diff --git a/deploy/fly-event-processor/env/staging-testnet.env b/deploy/fly-event-processor/env/staging-testnet.env index f6b8c598..5cd5afcb 100644 --- a/deploy/fly-event-processor/env/staging-testnet.env +++ b/deploy/fly-event-processor/env/staging-testnet.env @@ -15,3 +15,5 @@ AWS_IAM_ROLE= ALERT_ENABLED=false METRICS_ENABLED=true CONSUMER_WORKER_SIZE=1 +TX_TRACKER_URL=http://wormscan-tx-tracker.wormscan-testnet/api +TX_TRACKER_TIMEOUT=30 diff --git a/deploy/fly-event-processor/fly-event-processor-service.yaml b/deploy/fly-event-processor/fly-event-processor-service.yaml index fe5e059b..ea90be06 100644 --- a/deploy/fly-event-processor/fly-event-processor-service.yaml +++ b/deploy/fly-event-processor/fly-event-processor-service.yaml @@ -64,6 +64,11 @@ spec: configMapKeyRef: name: fly-event-processor key: duplicate-vaa-sqs-url + - name: GOVERNOR_SQS_URL + valueFrom: + configMapKeyRef: + name: fly-event-processor + key: governor-sqs-url - name: AWS_REGION valueFrom: configMapKeyRef: @@ -86,6 +91,10 @@ spec: value: "{{ .CONSUMER_WORKER_SIZE }}" - name: GUARDIAN_API_PROVIDER_PATH value: "/opt/fly-event-processor/guardian-provider.json" + - name: TX_TRACKER_URL + value: "{{ .TX_TRACKER_URL }}" + - name: TX_TRACKER_TIMEOUT + value: "{{ .TX_TRACKER_TIMEOUT }}" resources: limits: memory: {{ .RESOURCES_LIMITS_MEMORY }} diff --git a/fly-event-processor/cmd/service/run.go b/fly-event-processor/cmd/service/run.go index 77a1f812..f87fdf2f 100644 --- a/fly-event-processor/cmd/service/run.go +++ b/fly-event-processor/cmd/service/run.go @@ -3,6 +3,7 @@ package service import ( "context" "errors" + "fmt" "log" "os" "os/signal" @@ -17,16 +18,22 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/common/health" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/common/pool" - "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/vaa" - "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor" + + governorConsumer "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/consumer/governor" + vaaConsumer "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/consumer/vaa" + governorProcessor "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor/governor" + vaaprocessor "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor/vaa" + + txTracker "github.com/wormhole-foundation/wormhole-explorer/common/client/txtracker" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage" "go.mongodb.org/mongo-driver/mongo" "go.uber.org/zap" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/config" - "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/consumer" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/infrastructure" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/vaa" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics" ) @@ -61,23 +68,35 @@ func Run() { // create a new repository repository := storage.NewRepository(logger, db.Database) + //TxTracker createTxHash client + createTxHashFunc, err := newCreateTxHashFunc(cfg, logger) + if err != nil { + logger.Fatal("failed to initialize VAA parser", zap.Error(err)) + } + // create a new processor - processor := processor.NewProcessor(guardianApiProviderPool, repository, logger, metrics) + dupVaaProcessor := vaaprocessor.NewProcessor(guardianApiProviderPool, repository, logger, metrics) + governorProcessor := governorProcessor.NewProcessor(repository, createTxHashFunc, logger, metrics) // start serving /health and /ready endpoints healthChecks, err := makeHealthChecks(rootCtx, cfg, db.Database) if err != nil { logger.Fatal("Failed to create health checks", zap.Error(err)) } - vaaCtrl := vaa.NewController(processor.Process, repository, logger) + vaaCtrl := vaa.NewController(dupVaaProcessor.Process, repository, logger) server := infrastructure.NewServer(logger, cfg.Port, vaaCtrl, cfg.PprofEnabled, healthChecks...) server.Start() // create and start a duplicate VAA consumer. duplicateVaaConsumeFunc := newDuplicateVaaConsumeFunc(rootCtx, cfg, metrics, logger) - duplicateVaa := consumer.New(duplicateVaaConsumeFunc, processor.Process, logger, metrics, cfg.P2pNetwork, cfg.ConsumerWorkerSize) + duplicateVaa := vaaConsumer.New(duplicateVaaConsumeFunc, dupVaaProcessor.Process, logger, metrics, cfg.P2pNetwork, cfg.ConsumerWorkerSize) duplicateVaa.Start(rootCtx) + // create and start a governor status consumer. + governorStatusConsumerFunc := newGovernorStatusConsumeFunc(rootCtx, cfg, metrics, logger) + governorStatus := governorConsumer.New(governorStatusConsumerFunc, governorProcessor.Process, logger, metrics, cfg.P2pNetwork, cfg.GovernorConsumerWorkerSize) + governorStatus.Start(rootCtx) + logger.Info("Started wormholescan-fly-event-processor") // Waiting for signal @@ -203,13 +222,49 @@ func newDuplicateVaaConsumeFunc( cfg *config.ServiceConfiguration, metrics metrics.Metrics, logger *zap.Logger, -) queue.ConsumeFunc { +) queue.ConsumeFunc[queue.EventDuplicateVaa] { sqsConsumer, err := newSqsConsumer(ctx, cfg, cfg.DuplicateVaaSQSUrl) if err != nil { logger.Fatal("failed to create sqs consumer", zap.Error(err)) } - vaaQueue := queue.NewEventSqs(sqsConsumer, metrics, logger) + vaaQueue := queue.NewEventSqs[queue.EventDuplicateVaa](sqsConsumer, + metrics.IncDuplicatedVaaConsumedQueue, logger) return vaaQueue.Consume } + +func newGovernorStatusConsumeFunc( + ctx context.Context, + cfg *config.ServiceConfiguration, + metrics metrics.Metrics, + logger *zap.Logger, +) queue.ConsumeFunc[queue.EventGovernorStatus] { + + sqsConsumer, err := newSqsConsumer(ctx, cfg, cfg.GovernorSQSUrl) + if err != nil { + logger.Fatal("failed to create sqs consumer", zap.Error(err)) + } + + governorStatusQueue := queue.NewEventSqs[queue.EventGovernorStatus](sqsConsumer, + metrics.IncGovernorStatusConsumedQueue, logger) + return governorStatusQueue.Consume +} + +func newCreateTxHashFunc( + cfg *config.ServiceConfiguration, + logger *zap.Logger, +) (txTracker.CreateTxHashFunc, error) { + if cfg.Environment == config.EnvironmentLocal { + return func(vaaID, txHash string) (*txTracker.TxHashResponse, error) { + return &txTracker.TxHashResponse{ + NativeTxHash: txHash, + }, nil + }, nil + } + createTxHashClient, err := txTracker.NewTxTrackerAPIClient(cfg.TxTrackerTimeout, cfg.TxTrackerUrl, logger) + if err != nil { + return nil, fmt.Errorf("failed to initialize TxTracker client: %w", err) + } + return createTxHashClient.CreateTxHash, nil +} diff --git a/fly-event-processor/config/config.go b/fly-event-processor/config/config.go index f8e38f59..d3c9e260 100644 --- a/fly-event-processor/config/config.go +++ b/fly-event-processor/config/config.go @@ -10,6 +10,10 @@ import ( "github.com/sethvargo/go-envconfig" ) +const ( + EnvironmentLocal = "local" +) + // p2p network constants. const ( P2pMainNet = "mainnet" @@ -29,7 +33,9 @@ type ServiceConfiguration struct { AlertApiKey string `env:"ALERT_API_KEY"` MetricsEnabled bool `env:"METRICS_ENABLED,default=false"` // Fly event consumer configuration - ConsumerWorkerSize int `env:"CONSUMER_WORKER_SIZE,default=1"` + ConsumerWorkerSize int `env:"CONSUMER_WORKER_SIZE,default=1"` + GovernorConsumerWorkerSize int `env:"GOVERNOR_CONSUMER_WORKER_SIZE,default=1"` + // Database configuration MongoURI string `env:"MONGODB_URI,required"` MongoDatabase string `env:"MONGODB_DATABASE,required"` @@ -39,6 +45,11 @@ type ServiceConfiguration struct { AwsSecretAccessKey string `env:"AWS_SECRET_ACCESS_KEY"` AwsRegion string `env:"AWS_REGION"` DuplicateVaaSQSUrl string `env:"DUPLICATE_VAA_SQS_URL"` + GovernorSQSUrl string `env:"GOVERNOR_SQS_URL"` + // Tx-tracker client configuration + TxTrackerUrl string `env:"TX_TRACKER_URL,required"` + TxTrackerTimeout int64 `env:"TX_TRACKER_TIMEOUT,default=10"` + // Guardian api provider configuration GuardianAPIProviderPath string `env:"GUARDIAN_API_PROVIDER_PATH,required"` *GuardianAPIConfigurationJson `required:"false"` diff --git a/fly-event-processor/consumer/governor/consumer.go b/fly-event-processor/consumer/governor/consumer.go new file mode 100644 index 00000000..64ae8a2b --- /dev/null +++ b/fly-event-processor/consumer/governor/consumer.go @@ -0,0 +1,103 @@ +package governor + +import ( + "context" + + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/domain" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics" + govprocessor "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor/governor" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue" + "go.uber.org/zap" +) + +// Consumer consumer struct definition. +type Consumer struct { + consumeFunc queue.ConsumeFunc[queue.EventGovernorStatus] + processor govprocessor.ProcessorFunc + logger *zap.Logger + metrics metrics.Metrics + p2pNetwork string + workersSize int +} + +// New creates a new vaa consumer. +func New( + consumeFunc queue.ConsumeFunc[queue.EventGovernorStatus], + processor govprocessor.ProcessorFunc, + logger *zap.Logger, + metrics metrics.Metrics, + p2pNetwork string, + workersSize int, +) *Consumer { + + c := Consumer{ + consumeFunc: consumeFunc, + processor: processor, + logger: logger, + metrics: metrics, + p2pNetwork: p2pNetwork, + workersSize: workersSize, + } + + return &c +} + +// Start consumes messages from VAA queue, parse and store those messages in a repository. +func (c *Consumer) Start(ctx context.Context) { + ch := c.consumeFunc(ctx) + for i := 0; i < c.workersSize; i++ { + go c.producerLoop(ctx, ch) + } +} + +func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMessage[queue.EventGovernorStatus]) { + for { + select { + case <-ctx.Done(): + return + case msg := <-ch: + c.processEvent(ctx, msg) + } + } +} + +func (c *Consumer) processEvent(ctx context.Context, msg queue.ConsumerMessage[queue.EventGovernorStatus]) { + event := msg.Data() + + // Check if the event is a governor status event. + if event.Type != queue.GovernorStatusEventType { + msg.Done() + c.logger.Debug("event is not a governor status", + zap.Any("event", event)) + return + } + + logger := c.logger.With( + zap.String("trackId", event.TrackID), + zap.String("type", event.Type), + zap.String("node", event.Data.NodeName)) + + if msg.IsExpired() { + msg.Failed() + logger.Debug("event is expired") + c.metrics.IncGovernorStatusExpired(event.Data.NodeName, event.Data.NodeAddress) + return + } + + params := &govprocessor.Params{ + TrackID: event.TrackID, + NodeGovernorVaa: domain.ConvertEventToGovernorVaa(&event), + } + + err := c.processor(ctx, params) + if err != nil { + msg.Failed() + logger.Error("failed to process governor-status event", zap.Error(err)) + c.metrics.IncGovernorStatusFailed(params.NodeGovernorVaa.Name, params.NodeGovernorVaa.Address) + return + } + + msg.Done() + logger.Debug("governor-status event processed") + c.metrics.IncGovernorStatusProcessed(params.NodeGovernorVaa.Name, params.NodeGovernorVaa.Address) +} diff --git a/fly-event-processor/consumer/consumer.go b/fly-event-processor/consumer/vaa/consumer.go similarity index 73% rename from fly-event-processor/consumer/consumer.go rename to fly-event-processor/consumer/vaa/consumer.go index e9df0b9b..eb43dd79 100644 --- a/fly-event-processor/consumer/consumer.go +++ b/fly-event-processor/consumer/vaa/consumer.go @@ -1,11 +1,10 @@ -package consumer +package vaa import ( "context" - "github.com/wormhole-foundation/wormhole-explorer/common/pool" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics" - "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor" + processor "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor/vaa" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue" sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" @@ -13,18 +12,17 @@ import ( // Consumer consumer struct definition. type Consumer struct { - consumeFunc queue.ConsumeFunc - processor processor.ProcessorFunc - guardianPool *pool.Pool - logger *zap.Logger - metrics metrics.Metrics - p2pNetwork string - workersSize int + consumeFunc queue.ConsumeFunc[queue.EventDuplicateVaa] + processor processor.ProcessorFunc + logger *zap.Logger + metrics metrics.Metrics + p2pNetwork string + workersSize int } // New creates a new vaa consumer. func New( - consumeFunc queue.ConsumeFunc, + consumeFunc queue.ConsumeFunc[queue.EventDuplicateVaa], processor processor.ProcessorFunc, logger *zap.Logger, metrics metrics.Metrics, @@ -52,7 +50,7 @@ func (c *Consumer) Start(ctx context.Context) { } } -func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMessage) { +func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMessage[queue.EventDuplicateVaa]) { for { select { case <-ctx.Done(): @@ -63,13 +61,23 @@ func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMes } } -func (c *Consumer) processEvent(ctx context.Context, msg queue.ConsumerMessage) { +func (c *Consumer) processEvent(ctx context.Context, msg queue.ConsumerMessage[queue.EventDuplicateVaa]) { event := msg.Data() + + // Check if the event is a duplicate VAA event. + if event.Type != queue.DeduplicateVaaEventType { + msg.Done() + c.logger.Debug("event is not a duplicate VAA", + zap.Any("event", event)) + return + } + vaaID := event.Data.VaaID chainID := sdk.ChainID(event.Data.ChainID) logger := c.logger.With( zap.String("trackId", event.TrackID), + zap.String("type", event.Type), zap.String("vaaId", vaaID)) if msg.IsExpired() { diff --git a/fly-event-processor/domain/governor.go b/fly-event-processor/domain/governor.go new file mode 100644 index 00000000..eccc5ee3 --- /dev/null +++ b/fly-event-processor/domain/governor.go @@ -0,0 +1,79 @@ +package domain + +import ( + "fmt" + "time" + + "github.com/wormhole-foundation/wormhole-explorer/common/utils" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +type Node struct { + Name string + Address string +} + +type NodeGovernorVaa struct { + Node + GovernorVaas map[string]GovernorVaa +} + +type GovernorVaa struct { + ID string + ChainID sdk.ChainID + EmitterAddress string + Sequence string + TxHash string + ReleaseTime time.Time + Amount uint64 +} + +// ConvertEventToGovernorVaa convert a event *queue.EventGovernorStatus to a *NodeGovernorVaa. +func ConvertEventToGovernorVaa(event *queue.EventGovernorStatus) *NodeGovernorVaa { + + // check if event is nil. + if event == nil { + return nil + } + + // check if chains is empty. + if len(event.Data.Chains) == 0 { + return nil + } + + governorVaas := make(map[string]GovernorVaa) + for _, chain := range event.Data.Chains { + for _, emitter := range chain.Emitters { + for _, enqueuedVAA := range emitter.EnqueuedVaas { + + normalizeEmitter := utils.NormalizeHex(emitter.EmitterAddress) + normalizeTxHash := utils.NormalizeHex(enqueuedVAA.TxHash) + vaaID := fmt.Sprintf("%d/%s/%s", + chain.ChainId, + normalizeEmitter, + enqueuedVAA.Sequence) + + gs := GovernorVaa{ + ID: vaaID, + ChainID: sdk.ChainID(chain.ChainId), + EmitterAddress: normalizeEmitter, + Sequence: enqueuedVAA.Sequence, + TxHash: normalizeTxHash, + ReleaseTime: time.Unix(int64(enqueuedVAA.ReleaseTime), 0), + Amount: enqueuedVAA.NotionalValue, + } + + governorVaas[vaaID] = gs + } + } + } + + return &NodeGovernorVaa{ + Node: Node{ + Name: event.Data.NodeName, + Address: event.Data.NodeAddress, + }, + GovernorVaas: governorVaas, + } +} diff --git a/fly-event-processor/http/vaa/controller.go b/fly-event-processor/http/vaa/controller.go index d6efb875..6318a22e 100644 --- a/fly-event-processor/http/vaa/controller.go +++ b/fly-event-processor/http/vaa/controller.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/gofiber/fiber/v2" - "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor" + processor "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor/vaa" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage" "go.uber.org/zap" ) diff --git a/fly-event-processor/internal/metrics/dummy.go b/fly-event-processor/internal/metrics/dummy.go index 4551ab41..956dcc37 100644 --- a/fly-event-processor/internal/metrics/dummy.go +++ b/fly-event-processor/internal/metrics/dummy.go @@ -24,3 +24,21 @@ func (d *DummyMetrics) IncDuplicatedVaaExpired(chainID sdk.ChainID) {} // IncDuplicatedVaaCanNotFixed dummy implementation. func (d *DummyMetrics) IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID) {} + +// IncGovernorStatusConsumedQueue dummy implementation. +func (d *DummyMetrics) IncGovernorStatusConsumedQueue() {} + +// IncGovernorStatusProcessed dummy implementation. +func (d *DummyMetrics) IncGovernorStatusProcessed(node string, address string) {} + +// IncGovernorStatusFailed dummy implementation. +func (d *DummyMetrics) IncGovernorStatusFailed(node string, address string) {} + +// IncGovernorStatusExpired dummy implementation. +func (d *DummyMetrics) IncGovernorStatusExpired(node string, address string) {} + +// IncGovernorVaaAdded dummy implementation. +func (d *DummyMetrics) IncGovernorVaaAdded(chainID sdk.ChainID) {} + +// IndGovenorVaaDeleted dummy implementation. +func (d *DummyMetrics) IndGovenorVaaDeleted(chainID sdk.ChainID) {} diff --git a/fly-event-processor/internal/metrics/metrics.go b/fly-event-processor/internal/metrics/metrics.go index 953c69db..bce77d3c 100644 --- a/fly-event-processor/internal/metrics/metrics.go +++ b/fly-event-processor/internal/metrics/metrics.go @@ -10,4 +10,18 @@ type Metrics interface { IncDuplicatedVaaFailed(chainID sdk.ChainID) IncDuplicatedVaaExpired(chainID sdk.ChainID) IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID) + IncGovernorStatusConsumedQueue() + IncGovernorStatusProcessed(node string, address string) + IncGovernorStatusFailed(node string, address string) + IncGovernorStatusExpired(node string, address string) + IncGovernorVaaAdded(chainID sdk.ChainID) + IndGovenorVaaDeleted(chainID sdk.ChainID) } + +// IncDuplicatedVaaConsumedQueue increments the counter of consumed queue +type IncConsumedQueue func() + +/* +// ProcessorFunc is a function to process a governor message. +type ProcessorFunc func(context.Context, *Params) error +*/ diff --git a/fly-event-processor/internal/metrics/prometheus.go b/fly-event-processor/internal/metrics/prometheus.go index 619b9ac0..bf9e2a4a 100644 --- a/fly-event-processor/internal/metrics/prometheus.go +++ b/fly-event-processor/internal/metrics/prometheus.go @@ -8,7 +8,9 @@ import ( // PrometheusMetrics is a Prometheus implementation of Metric interface. type PrometheusMetrics struct { - duplicatedVaaCount *prometheus.CounterVec + duplicatedVaaCount *prometheus.CounterVec + governorStatusCount *prometheus.CounterVec + governorVaaCount *prometheus.CounterVec } // NewPrometheusMetrics returns a new instance of PrometheusMetrics. @@ -23,29 +25,84 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics { "service": serviceName, }, }, []string{"chain", "type"}), + governorStatusCount: promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "wormscan_fly_event_processor_governor_status_count", + Help: "The total number of governor status processed", + ConstLabels: map[string]string{ + "environment": environment, + "service": serviceName, + }, + }, []string{"node", "address", "type"}), + governorVaaCount: promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "wormscan_fly_event_processor_governor_vaa_count", + Help: "The total number of governor VAA processed", + ConstLabels: map[string]string{ + "environment": environment, + "service": serviceName, + }, + }, []string{"chain", "type"}), } } +// IncDuplicatedVaaConsumedQueue increments the total number of duplicated VAA consumed queue. func (m *PrometheusMetrics) IncDuplicatedVaaConsumedQueue() { m.duplicatedVaaCount.WithLabelValues("all", "consumed_queue").Inc() } +// IncDuplicatedVaaProcessed increments the total number of duplicated VAA processed. func (m *PrometheusMetrics) IncDuplicatedVaaProcessed(chainID sdk.ChainID) { chain := chainID.String() m.duplicatedVaaCount.WithLabelValues(chain, "processed").Inc() } +// IncDuplicatedVaaFailed increments the total number of duplicated VAA failed. func (m *PrometheusMetrics) IncDuplicatedVaaFailed(chainID sdk.ChainID) { chain := chainID.String() m.duplicatedVaaCount.WithLabelValues(chain, "failed").Inc() } +// IncDuplicatedVaaExpired increments the total number of duplicated VAA expired. func (m *PrometheusMetrics) IncDuplicatedVaaExpired(chainID sdk.ChainID) { chain := chainID.String() m.duplicatedVaaCount.WithLabelValues(chain, "expired").Inc() } +// IncDuplicatedVaaCanNotFixed increments the total number of duplicated VAA can not fixed. func (m *PrometheusMetrics) IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID) { chain := chainID.String() m.duplicatedVaaCount.WithLabelValues(chain, "can_not_fixed").Inc() } + +// IncGovernorStatusConsumedQueue increments the total number of governor status consumed queue. +func (m *PrometheusMetrics) IncGovernorStatusConsumedQueue() { + m.governorStatusCount.WithLabelValues("all", "", "consumed_queue").Inc() +} + +// IncGovernorStatusProcessed increments the total number of governor status processed. +func (m *PrometheusMetrics) IncGovernorStatusProcessed(node string, address string) { + m.governorStatusCount.WithLabelValues(node, address, "processed").Inc() +} + +// IncGovernorStatusFailed increments the total number of governor status failed. +func (m *PrometheusMetrics) IncGovernorStatusFailed(node string, address string) { + m.governorStatusCount.WithLabelValues(node, address, "failed").Inc() +} + +// IncGovernorStatusExpired increments the total number of governor status expired. +func (m *PrometheusMetrics) IncGovernorStatusExpired(node string, address string) { + m.governorStatusCount.WithLabelValues(node, address, "expired").Inc() +} + +// IncGovernorVaaAdded increments the total number of governor VAA added. +func (m *PrometheusMetrics) IncGovernorVaaAdded(chainID sdk.ChainID) { + chain := chainID.String() + m.governorVaaCount.WithLabelValues(chain, "added").Inc() +} + +// IndGovenorVaaDeleted increments the total number of governor VAA deleted. +func (m *PrometheusMetrics) IndGovenorVaaDeleted(chainID sdk.ChainID) { + chain := chainID.String() + m.governorVaaCount.WithLabelValues(chain, "deleted").Inc() +} diff --git a/fly-event-processor/processor/governor/processor.go b/fly-event-processor/processor/governor/processor.go new file mode 100644 index 00000000..1b371d25 --- /dev/null +++ b/fly-event-processor/processor/governor/processor.go @@ -0,0 +1,320 @@ +package governor + +import ( + "context" + "errors" + "fmt" + + txTracker "github.com/wormhole-foundation/wormhole-explorer/common/client/txtracker" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/domain" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage" + "go.uber.org/zap" +) + +// Processor is a governor processor. +type Processor struct { + repository *storage.Repository + createTxHashFunc txTracker.CreateTxHashFunc + logger *zap.Logger + metrics metrics.Metrics +} + +// NewProcessor creates a new governor processor. +func NewProcessor( + repository *storage.Repository, + createTxHashFunc txTracker.CreateTxHashFunc, + logger *zap.Logger, + metrics metrics.Metrics, +) *Processor { + + return &Processor{ + repository: repository, + createTxHashFunc: createTxHashFunc, + logger: logger, + metrics: metrics, + } +} + +// Process processes a governor event. +func (p *Processor) Process( + ctx context.Context, + params *Params) error { + + logger := p.logger.With( + zap.String("trackId", params.TrackID), + ) + + // 1. Check if the event is valid. + if params.NodeGovernorVaa == nil { + logger.Info("event is nil") + return errors.New("event cannot be nil") + } + node := params.NodeGovernorVaa.Node + if node.Address == "" { + logger.Info("node is invalid") + return errors.New("node is invalid") + } + + // 2. Get new and current governorVaa by node. + newNodeGovernorVaas := params.NodeGovernorVaa + nodeGovernorVaaIds, err := p.getNodeGovernorVaaIds(ctx, node, logger) + if err != nil { + logger.Error("failed to get current governorVaa", + zap.Error(err), + zap.String("nodeAddress", node.Address)) + return err + } + + // 3. Get nodeGovernorVaa to add and delete. + nodeGovernorVaasToAdd := getNodeGovernorVaasToAdd( + newNodeGovernorVaas.GovernorVaas, nodeGovernorVaaIds) + nodeGovernorVaaIdsToDelete := getNodeGovernorVaasToDelete( + newNodeGovernorVaas.GovernorVaas, nodeGovernorVaaIds) + + // 4. Get governorVaa to add and delete. + governorVaasToAdd, err := p.getGovernorVaaToAdd(ctx, nodeGovernorVaasToAdd, logger) + if err != nil { + logger.Error("failed to get governorVaa to insert", + zap.Error(err), + zap.String("nodeAddress", node.Address)) + return err + } + governorVaaIdsToDelete, err := p.getGovernorVaaToDelete(ctx, node, nodeGovernorVaaIdsToDelete, logger) + if err != nil { + logger.Error("failed to get governorVaa to delete", + zap.Error(err), + zap.String("nodeAddress", node.Address)) + return err + } + + // 5. Check if there are no changes in governor. + changeNodeGovernorVaas := len(nodeGovernorVaasToAdd) > 0 || len(nodeGovernorVaaIdsToDelete) > 0 + changeGovernorVaas := len(governorVaasToAdd) > 0 || len(governorVaaIdsToDelete) > 0 + if !changeNodeGovernorVaas && !changeGovernorVaas { + logger.Info("no changes in governor", + zap.String("nodeAddress", node.Address)) + return nil + } + + // 6. Update governor data for the node. + err = p.updateGovernor(ctx, + node, + nodeGovernorVaasToAdd, + nodeGovernorVaaIdsToDelete, + governorVaasToAdd, + governorVaaIdsToDelete) + if err != nil { + logger.Error("failed to update governorVaa", + zap.Error(err), + zap.String("nodeAddress", node.Address), + zap.String("node", node.Name)) + return err + } + + return nil +} + +// getNodeGovernorVaaIds gets the current governor vaaIds stored in the database by node address. +func (p *Processor) getNodeGovernorVaaIds( + ctx context.Context, + node domain.Node, + logger *zap.Logger, +) (Set[string], error) { + + // get current nodeGovernorVaa by nodeAddress. + nodeGovernorVaaDoc, err := p.repository.FindNodeGovernorVaaByNodeAddress(ctx, node.Address) + if err != nil { + logger.Error("failed to find nodeGovernorVaa by nodeAddress", + zap.Error(err), + zap.String("nodeAddress", node.Address)) + return Set[string]{}, err + } + + // convert nodeGovernorVaaDoc to Set[string] + nodeGovernorVaaId := make(Set[string]) + for _, governorVaaDoc := range nodeGovernorVaaDoc { + nodeGovernorVaaId.Add(governorVaaDoc.VaaID) + } + return nodeGovernorVaaId, nil +} + +// getNodeGovernorVaasToAdd gets the node governor vaas to add. +func getNodeGovernorVaasToAdd( + newNodeGovernorVaas map[string]domain.GovernorVaa, + nodeGovernorVaaIds Set[string], +) map[string]domain.GovernorVaa { + + nodeGovernorVaasToAdd := make(map[string]domain.GovernorVaa) + for vaaID, governorVaa := range newNodeGovernorVaas { + if ok := nodeGovernorVaaIds.Contains(vaaID); !ok { + nodeGovernorVaasToAdd[vaaID] = governorVaa + } + } + return nodeGovernorVaasToAdd +} + +// getNodeGovernorVaasToDelete gets the node governor vaas to delete. +func getNodeGovernorVaasToDelete( + newNodeGovernorVaas map[string]domain.GovernorVaa, + nodeGovernorVaaIds Set[string], +) Set[string] { + + nodeGovernorVaasToDelete := make(Set[string]) + for vaaID := range nodeGovernorVaaIds { + if _, ok := newNodeGovernorVaas[vaaID]; !ok { + nodeGovernorVaasToDelete.Add(vaaID) + } + } + return nodeGovernorVaasToDelete +} + +// getGovernorVaaToAdd gets the governor vaas to add. +func (p *Processor) getGovernorVaaToAdd( + ctx context.Context, + nodeGovernorVaas map[string]domain.GovernorVaa, + logger *zap.Logger, +) ([]domain.GovernorVaa, error) { + + // get vaaIDs from the nodeGovernorVaas. + vaaIds := make([]string, 0, len(nodeGovernorVaas)) + for vaaId, _ := range nodeGovernorVaas { + vaaIds = append(vaaIds, vaaId) + } + + // get governoVaas already added by vaaIDs. + governorVaas, err := p.repository.FindGovernorVaaByVaaIDs(ctx, vaaIds) + if err != nil { + logger.Error("failed to find governor vaas by a list of vaaIDs", + zap.Error(err), + zap.Strings("vaaIDs", vaaIds)) + return nil, err + } + if len(vaaIds) < len(governorVaas) { + logger.Error("failed to find governorVaa by a list of vaaIDs", + zap.Error(err), + zap.Strings("vaaIDs", vaaIds)) + return nil, errors.New("failed to find governorVaa by vaaIDs") + } + + // check if all the governorVaa are already added + if len(vaaIds) == len(governorVaas) { + return nil, nil + } + + // convert governorVaas to a set of vaaIDs. + governorVaaIds := make(Set[string]) + for _, governorVaa := range governorVaas { + governorVaaIds.Add(governorVaa.ID) + } + + // get governorVaa to insert + var governorVaasToInsert []domain.GovernorVaa + for vaaID, governorVaa := range nodeGovernorVaas { + if ok := governorVaaIds.Contains(vaaID); !ok { + // fix governor vaa txHash + txHash, err := p.createTxHashFunc(governorVaa.ID, governorVaa.TxHash) + if err != nil { + logger.Error("failed to create txHash", + zap.Error(err), + zap.String("vaaID", governorVaa.ID), + zap.String("txHash", governorVaa.TxHash)) + return nil, err + } + governorVaa.TxHash = txHash.NativeTxHash + governorVaasToInsert = append(governorVaasToInsert, governorVaa) + } + } + + return governorVaasToInsert, nil +} + +// getGovernorVaaToDelete gets the governor vaas to delete. +func (p *Processor) getGovernorVaaToDelete( + ctx context.Context, + node domain.Node, + nodeGovernorVaaIds Set[string], + logger *zap.Logger, +) (Set[string], error) { + + // get vaaIDs from the nodeGovernorVaaIds. + vaaIds := make([]string, 0, nodeGovernorVaaIds.Len()) + for vaaID := range nodeGovernorVaaIds { + vaaIds = append(vaaIds, vaaID) + } + + // nodeGovernorVaas contains all the node governor vaas that have the same vaaID. + nodeGovernorVaas, err := p.repository.FindNodeGovernorVaaByVaaIDs(ctx, vaaIds) + if err != nil { + logger.Error("failed to find governorVaa by vaaIDs", + zap.Error(err), + zap.Strings("vaaIDs", vaaIds)) + return nil, err + } + + // nodeAddressByVaaId contains all the node address grouped by vaaID. + nodeAddressByVaaId := make(map[string][]string) + for _, n := range nodeGovernorVaas { + if _, ok := nodeAddressByVaaId[n.VaaID]; !ok { + nodeAddressByVaaId[n.VaaID] = make([]string, 0) + } + nodeAddressByVaaId[n.VaaID] = append(nodeAddressByVaaId[n.VaaID], n.NodeAddress) + } + + // get governorVaa to delete + governorVaaToDelete := make(Set[string]) + for vaaID, nodeAddresses := range nodeAddressByVaaId { + deleteGovernorVaa := len(nodeAddresses) == 1 && node.Address == nodeAddresses[0] + if deleteGovernorVaa { + governorVaaToDelete.Add(vaaID) + } + } + + return governorVaaToDelete, nil +} + +func (p *Processor) updateGovernor(ctx context.Context, + node domain.Node, + nodeGovernorVaasToAdd map[string]domain.GovernorVaa, + nodeGovernorVaaIdsToDelete Set[string], + governorVaasToAdd []domain.GovernorVaa, + governorVaaIdsToDelete Set[string]) error { + + // convert nodeGovernorVaasToAdd to []storage.NodeGovernorVaaDoc + var nodeGovernorVaasToAddDoc []storage.NodeGovernorVaaDoc + for vaaID, _ := range nodeGovernorVaasToAdd { + nodeGovernorVaasToAddDoc = append(nodeGovernorVaasToAddDoc, storage.NodeGovernorVaaDoc{ + ID: fmt.Sprintf("%s-%s", node.Address, vaaID), + NodeName: node.Name, + NodeAddress: node.Address, + VaaID: vaaID, + }) + } + + // convert governorVaasToAdd to []storage.GovernorVaaDoc + var governorVaasToAddDoc []storage.GovernorVaaDoc + for _, governorVaa := range governorVaasToAdd { + governorVaasToAddDoc = append(governorVaasToAddDoc, storage.GovernorVaaDoc{ + ID: governorVaa.ID, + ChainID: governorVaa.ChainID, + EmitterAddress: governorVaa.EmitterAddress, + Sequence: governorVaa.Sequence, + TxHash: governorVaa.TxHash, + ReleaseTime: governorVaa.ReleaseTime, + Amount: storage.Uint64(governorVaa.Amount), + }) + } + + // convert nodeGovernorVaas vaaIds to ids + var nodeGovVaaIdsToDelete []string + for vaaID := range nodeGovernorVaaIdsToDelete { + nodeGovVaaIdsToDelete = append(nodeGovVaaIdsToDelete, fmt.Sprintf("%s-%s", node.Address, vaaID)) + } + + return p.repository.UpdateGovernor( + ctx, + nodeGovernorVaasToAddDoc, + nodeGovVaaIdsToDelete, + governorVaasToAddDoc, + governorVaaIdsToDelete.ToSlice()) +} diff --git a/fly-event-processor/processor/governor/types.go b/fly-event-processor/processor/governor/types.go new file mode 100644 index 00000000..6d64e5ae --- /dev/null +++ b/fly-event-processor/processor/governor/types.go @@ -0,0 +1,43 @@ +package governor + +import ( + "context" + + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/domain" +) + +// Set generic type definition. +type Set[T comparable] map[T]struct{} + +// add a value to the set +func (s Set[T]) Add(v T) { + s[v] = struct{}{} +} + +// check if the set contains a value +func (s Set[T]) Contains(v T) bool { + _, ok := s[v] + return ok +} + +// get length of the set +func (s Set[T]) Len() int { + return len(s) +} + +// to slice +func (s Set[T]) ToSlice() []T { + var slice []T + for k := range s { + slice = append(slice, k) + } + return slice +} + +type Params struct { + TrackID string + NodeGovernorVaa *domain.NodeGovernorVaa +} + +// ProcessorFunc is a function to process a governor message. +type ProcessorFunc func(context.Context, *Params) error diff --git a/fly-event-processor/processor/processor.go b/fly-event-processor/processor/vaa/processor.go similarity index 99% rename from fly-event-processor/processor/processor.go rename to fly-event-processor/processor/vaa/processor.go index fc56d69a..5e763f93 100644 --- a/fly-event-processor/processor/processor.go +++ b/fly-event-processor/processor/vaa/processor.go @@ -1,4 +1,4 @@ -package processor +package vaa import ( "errors" diff --git a/fly-event-processor/processor/types.go b/fly-event-processor/processor/vaa/types.go similarity index 94% rename from fly-event-processor/processor/types.go rename to fly-event-processor/processor/vaa/types.go index c22032ed..df793b27 100644 --- a/fly-event-processor/processor/types.go +++ b/fly-event-processor/processor/vaa/types.go @@ -1,4 +1,4 @@ -package processor +package vaa import ( sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" diff --git a/fly-event-processor/queue/event_sqs.go b/fly-event-processor/queue/event_sqs.go index 472ae1a7..94da49af 100644 --- a/fly-event-processor/queue/event_sqs.go +++ b/fly-event-processor/queue/event_sqs.go @@ -13,42 +13,46 @@ import ( ) // SQSOption represents a VAA queue in SQS option function. -type SQSOption func(*SQS) +type SQSOption[T Event] func(*SQS[T]) // SQS represents a VAA queue in SQS. -type SQS struct { - consumer *sqs_client.Consumer - ch chan ConsumerMessage - chSize int - wg sync.WaitGroup - metrics metrics.Metrics - logger *zap.Logger +type SQS[T Event] struct { + consumer *sqs_client.Consumer + ch chan ConsumerMessage[T] + chSize int + wg sync.WaitGroup + incConsumedQueueFunc metrics.IncConsumedQueue + logger *zap.Logger } // NewEventSqs creates a VAA queue in SQS instances. -func NewEventSqs(consumer *sqs_client.Consumer, metrics metrics.Metrics, logger *zap.Logger, opts ...SQSOption) *SQS { - s := &SQS{ - consumer: consumer, - chSize: 10, - metrics: metrics, - logger: logger.With(zap.String("queueUrl", consumer.GetQueueUrl())), +func NewEventSqs[T Event]( + consumer *sqs_client.Consumer, + incConsumedQueueFunc metrics.IncConsumedQueue, + logger *zap.Logger, + opts ...SQSOption[T]) *SQS[T] { + s := &SQS[T]{ + consumer: consumer, + chSize: 10, + incConsumedQueueFunc: incConsumedQueueFunc, + logger: logger.With(zap.String("queueUrl", consumer.GetQueueUrl())), } for _, opt := range opts { opt(s) } - s.ch = make(chan ConsumerMessage, s.chSize) + s.ch = make(chan ConsumerMessage[T], s.chSize) return s } // WithChannelSize allows to specify an channel size when setting a value. -func WithChannelSize(size int) SQSOption { - return func(d *SQS) { +func WithChannelSize[T Event](size int) SQSOption[T] { + return func(d *SQS[T]) { d.chSize = size } } // Consume returns the channel with the received messages from SQS queue. -func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { +func (q *SQS[T]) Consume(ctx context.Context) <-chan ConsumerMessage[T] { go func() { for { messages, err := q.consumer.GetMessages(ctx) @@ -60,7 +64,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout()) for _, msg := range messages { - q.metrics.IncDuplicatedVaaConsumedQueue() + q.incConsumedQueueFunc() // unmarshal body to sqsEvent var sqsEvent sqsEvent err := json.Unmarshal([]byte(*msg.Body), &sqsEvent) @@ -72,7 +76,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { continue } - var event Event + var event T err = json.Unmarshal([]byte(sqsEvent.Message), &event) if err != nil { q.logger.Error("Error decoding message from SQS", zap.String("body", sqsEvent.Message), zap.Error(err)) @@ -84,15 +88,14 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"]) q.wg.Add(1) - q.ch <- &sqsConsumerMessage{ + q.ch <- &sqsConsumerMessage[T]{ id: msg.ReceiptHandle, - data: &event, + data: event, wg: &q.wg, logger: q.logger, consumer: q.consumer, expiredAt: expiredAt, retry: uint8(retry), - metrics: q.metrics, ctx: ctx, } } @@ -104,30 +107,24 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { } // Close closes all consumer resources. -func (q *SQS) Close() { +func (q *SQS[T]) Close() { close(q.ch) } -type sqsConsumerMessage struct { - data *Event +type sqsConsumerMessage[T Event] struct { + data T consumer *sqs_client.Consumer wg *sync.WaitGroup id *string logger *zap.Logger expiredAt time.Time retry uint8 - metrics metrics.Metrics ctx context.Context } -func (m *sqsConsumerMessage) Data() *Event { - return m.data -} - -func (m *sqsConsumerMessage) Done() { +func (m *sqsConsumerMessage[T]) Done() { if err := m.consumer.DeleteMessage(m.ctx, m.id); err != nil { m.logger.Error("Error deleting message from SQS", - zap.String("vaaId", m.data.Data.VaaID), zap.Bool("isExpired", m.IsExpired()), zap.Time("expiredAt", m.expiredAt), zap.Error(err), @@ -136,14 +133,18 @@ func (m *sqsConsumerMessage) Done() { m.wg.Done() } -func (m *sqsConsumerMessage) Failed() { +func (m *sqsConsumerMessage[T]) Data() T { + return m.data +} + +func (m *sqsConsumerMessage[T]) Failed() { m.wg.Done() } -func (m *sqsConsumerMessage) IsExpired() bool { +func (m *sqsConsumerMessage[T]) IsExpired() bool { return m.expiredAt.Before(time.Now()) } -func (m *sqsConsumerMessage) Retry() uint8 { +func (m *sqsConsumerMessage[T]) Retry() uint8 { return m.retry } diff --git a/fly-event-processor/queue/types.go b/fly-event-processor/queue/types.go index 5204a352..c777cf0c 100644 --- a/fly-event-processor/queue/types.go +++ b/fly-event-processor/queue/types.go @@ -5,20 +5,31 @@ import ( "time" ) +const ( + DeduplicateVaaEventType = "duplicated-vaa" + GovernorStatusEventType = "governor-status" +) + // sqsEvent represents a event data from SQS. type sqsEvent struct { MessageID string `json:"MessageId"` Message string `json:"Message"` } -// Event represents a event data to be handle. -type Event struct { +// Event represents a event data. +type Event interface { + EventDuplicateVaa | EventGovernorStatus +} + +// EventDuplicateVaa defition. +type EventDuplicateVaa struct { TrackID string `json:"trackId"` Type string `json:"type"` Source string `json:"source"` Data DuplicateVaa `json:"data"` } +// DuplicateVaa defition. type DuplicateVaa struct { VaaID string `json:"vaaId"` ChainID uint16 `json:"chainId"` @@ -30,14 +41,53 @@ type DuplicateVaa struct { Timestamp *time.Time `json:"timestamp"` } +// EventGovernorStatus defition. +type EventGovernorStatus struct { + TrackID string `json:"trackId"` + Type string `json:"type"` + Source string `json:"source"` + Data GovernorStatus `json:"data"` +} + +// GovernorStatus defition. +type GovernorStatus struct { + NodeAddress string `json:"nodeAddress"` + NodeName string `json:"nodeName"` + Counter int64 `json:"counter"` + Timestamp int64 `json:"timestamp"` + Chains []*ChainStatus `json:"chains"` +} + +// ChainStatus defition. +type ChainStatus struct { + ChainId uint32 `json:"chainId"` + RemainingAvailableNotional uint64 `json:"remainingAvailableNotional"` + Emitters []*Emitter `json:"emitters"` +} + +// Emitter defition. +type Emitter struct { + EmitterAddress string `bson:"emitteraddress" json:"emitterAddress"` + TotalEnqueuedVaas uint64 `bson:"totalenqueuedvaas" json:"totalEnqueuedVaas"` + EnqueuedVaas []*EnqueuedVAA `bson:"enqueuedvaas" json:"enqueuedVaas"` +} + +// EnqueuedVAA defition. +type EnqueuedVAA struct { + Sequence string `bson:"sequence" json:"sequence"` + ReleaseTime uint64 `bson:"releasetime" json:"releaseTime"` + NotionalValue uint64 `bson:"notionalvalue" json:"notionalValue"` + TxHash string `bson:"txhash" json:"txHash"` +} + // ConsumerMessage defition. -type ConsumerMessage interface { +type ConsumerMessage[T any] interface { Retry() uint8 - Data() *Event + Data() T Done() Failed() IsExpired() bool } // ConsumeFunc is a function to consume Event. -type ConsumeFunc func(context.Context) <-chan ConsumerMessage +type ConsumeFunc[T any] func(context.Context) <-chan ConsumerMessage[T] diff --git a/fly-event-processor/storage/repository.go b/fly-event-processor/storage/repository.go index 132d7bd9..a9ebde97 100644 --- a/fly-event-processor/storage/repository.go +++ b/fly-event-processor/storage/repository.go @@ -11,17 +11,21 @@ import ( // Repository exposes operations over the `globalTransactions` collection. type Repository struct { - logger *zap.Logger - vaas *mongo.Collection - duplicateVaas *mongo.Collection + logger *zap.Logger + vaas *mongo.Collection + duplicateVaas *mongo.Collection + nodeGovernorVaas *mongo.Collection + governorVaas *mongo.Collection } // New creates a new repository. func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository { r := Repository{ - logger: logger, - vaas: db.Collection(commonRepo.Vaas), - duplicateVaas: db.Collection(commonRepo.DuplicateVaas), + logger: logger, + vaas: db.Collection(commonRepo.Vaas), + duplicateVaas: db.Collection(commonRepo.DuplicateVaas), + nodeGovernorVaas: db.Collection(commonRepo.NodeGovernorVaas), + governorVaas: db.Collection(commonRepo.GovernorVaas), } return &r } @@ -125,3 +129,127 @@ func (r *Repository) FixVAA(ctx context.Context, vaaID, duplicateID string) erro return nil } + +// FindNodeGovernorVaaByNodeAddress find governor vaas by node address. +func (r *Repository) FindNodeGovernorVaaByNodeAddress(ctx context.Context, nodeAddress string) ([]NodeGovernorVaaDoc, error) { + var nodeGovernorVaa []NodeGovernorVaaDoc + cursor, err := r.nodeGovernorVaas.Find(ctx, bson.M{"nodeAddress": nodeAddress}) + if err != nil { + return nil, err + } + if err = cursor.All(ctx, &nodeGovernorVaa); err != nil { + return nil, err + } + return nodeGovernorVaa, nil +} + +// FindNodeGovernorVaaByVaaID find governor vaas by vaa id. +func (r *Repository) FindNodeGovernorVaaByVaaID(ctx context.Context, vaaID string) ([]NodeGovernorVaaDoc, error) { + var nodeGovernorVaa []NodeGovernorVaaDoc + cursor, err := r.nodeGovernorVaas.Find(ctx, bson.M{"vaaId": vaaID}) + if err != nil { + return nil, err + } + if err = cursor.All(ctx, &nodeGovernorVaa); err != nil { + return nil, err + } + return nodeGovernorVaa, nil +} + +// FindNodeGovernorVaaByVaaIDs find governor vaas by vaa ids. +func (r *Repository) FindNodeGovernorVaaByVaaIDs(ctx context.Context, vaaID []string) ([]NodeGovernorVaaDoc, error) { + var nodeGovernorVaa []NodeGovernorVaaDoc + cursor, err := r.nodeGovernorVaas.Find(ctx, bson.M{"vaaId": bson.M{"$in": vaaID}}) + if err != nil { + return nil, err + } + if err = cursor.All(ctx, &nodeGovernorVaa); err != nil { + return nil, err + } + return nodeGovernorVaa, nil +} + +// FindGovernorVaaByVaaID find governor vaas by a list of vaaIds +func (r *Repository) FindGovernorVaaByVaaIDs(ctx context.Context, vaaID []string) ([]GovernorVaaDoc, error) { + var governorVaa []GovernorVaaDoc + cursor, err := r.governorVaas.Find(ctx, bson.M{"_id": bson.M{"$in": vaaID}}) + if err != nil { + return nil, err + } + if err = cursor.All(ctx, &governorVaa); err != nil { + return nil, err + } + return governorVaa, nil +} + +func (r *Repository) UpdateGovernor( + ctx context.Context, + nodeGovernorVaaDocToInsert []NodeGovernorVaaDoc, + nodeGovernorVaaDocToDelete []string, + governorVaasToInsert []GovernorVaaDoc, + governorVaaIdsToDelete []string) error { + + // 1. start mongo transaction + session, err := r.vaas.Database().Client().StartSession() + if err != nil { + return err + } + + err = session.StartTransaction() + if err != nil { + return err + } + + // 2. insert node governor vaas. + if len(nodeGovernorVaaDocToInsert) > 0 { + var nodeGovVaadocs []interface{} + for _, doc := range nodeGovernorVaaDocToInsert { + nodeGovVaadocs = append(nodeGovVaadocs, doc) + } + _, err = r.nodeGovernorVaas.InsertMany(ctx, nodeGovVaadocs) + if err != nil { + session.AbortTransaction(ctx) + return err + } + } + + // 3. delete node governor vaas. + if len(nodeGovernorVaaDocToDelete) > 0 { + _, err = r.nodeGovernorVaas.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": nodeGovernorVaaDocToDelete}}) + if err != nil { + session.AbortTransaction(ctx) + return err + } + } + + // 4. insert governor vaas. + if len(governorVaasToInsert) > 0 { + var govVaaDocs []interface{} + for _, doc := range governorVaasToInsert { + govVaaDocs = append(govVaaDocs, doc) + } + _, err = r.governorVaas.InsertMany(ctx, govVaaDocs) + if err != nil { + session.AbortTransaction(ctx) + return err + } + } + + // 5. delete governor vaas. + if len(governorVaaIdsToDelete) > 0 { + _, err = r.governorVaas.DeleteMany(ctx, bson.M{"_id": bson.M{"$in": governorVaaIdsToDelete}}) + if err != nil { + session.AbortTransaction(ctx) + return err + } + } + + // 6. commit transaction + err = session.CommitTransaction(ctx) + if err != nil { + session.AbortTransaction(ctx) + return err + } + + return nil +} diff --git a/fly-event-processor/storage/types.go b/fly-event-processor/storage/types.go index 47b8852c..46ade614 100644 --- a/fly-event-processor/storage/types.go +++ b/fly-event-processor/storage/types.go @@ -3,9 +3,15 @@ package storage import ( "time" + "errors" + "strconv" + "github.com/wormhole-foundation/wormhole-explorer/common/domain" - "github.com/wormhole-foundation/wormhole/sdk/vaa" sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + + "go.mongodb.org/mongo-driver/bson/bsontype" + "go.mongodb.org/mongo-driver/bson/primitive" + "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" ) // VaaDoc represents a VAA document. @@ -43,6 +49,23 @@ type DuplicateVaaDoc struct { UpdatedAt *time.Time `bson:"updatedAt"` } +type NodeGovernorVaaDoc struct { + ID string `bson:"_id"` + NodeName string `bson:"nodeName"` + NodeAddress string `bson:"nodeAddress"` + VaaID string `bson:"vaaId"` +} + +type GovernorVaaDoc struct { + ID string `bson:"_id"` + ChainID sdk.ChainID `bson:"chainId"` + EmitterAddress string `bson:"emitterAddress"` + Sequence string `bson:"sequence"` + TxHash string `bson:"txHash"` + ReleaseTime time.Time `bson:"releaseTime"` + Amount Uint64 `bson:"amount"` +} + func (d *DuplicateVaaDoc) ToVaaDoc(duplicatedFixed bool) *VaaDoc { return &VaaDoc{ ID: d.VaaID, @@ -63,7 +86,7 @@ func (d *DuplicateVaaDoc) ToVaaDoc(duplicatedFixed bool) *VaaDoc { } func (v *VaaDoc) ToDuplicateVaaDoc() (*DuplicateVaaDoc, error) { - vaa, err := vaa.Unmarshal(v.Vaa) + vaa, err := sdk.Unmarshal(v.Vaa) if err != nil { return nil, err } @@ -85,3 +108,26 @@ func (v *VaaDoc) ToDuplicateVaaDoc() (*DuplicateVaaDoc, error) { UpdatedAt: v.UpdatedAt, }, nil } + +type Uint64 uint64 + +func (u Uint64) MarshalBSONValue() (bsontype.Type, []byte, error) { + ui64Str := strconv.FormatUint(uint64(u), 10) + d128, err := primitive.ParseDecimal128(ui64Str) + return bsontype.Decimal128, bsoncore.AppendDecimal128(nil, d128), err +} + +func (u *Uint64) UnmarshalBSONValue(t bsontype.Type, b []byte) error { + d128, _, ok := bsoncore.ReadDecimal128(b) + if !ok { + return errors.New("Uint64 UnmarshalBSONValue error") + } + + ui64, err := strconv.ParseUint(d128.String(), 10, 64) + if err != nil { + return err + } + + *u = Uint64(ui64) + return nil +} diff --git a/fly/event/noop.go b/fly/event/noop.go index a73b644b..f72a1527 100644 --- a/fly/event/noop.go +++ b/fly/event/noop.go @@ -11,3 +11,7 @@ func NewNoopEventDispatcher() *NoopEventDispatcher { func (n *NoopEventDispatcher) NewDuplicateVaa(context.Context, DuplicateVaa) error { return nil } + +func (n *NoopEventDispatcher) NewGovernorStatus(context.Context, GovernorStatus) error { + return nil +} diff --git a/fly/event/sns.go b/fly/event/sns.go index c2c64722..cc3524c4 100644 --- a/fly/event/sns.go +++ b/fly/event/sns.go @@ -10,6 +10,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" aws_sns "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/aws/aws-sdk-go-v2/service/sns/types" "github.com/wormhole-foundation/wormhole-explorer/fly/internal/track" ) @@ -26,6 +27,12 @@ func NewSnsEventDispatcher(awsConfig aws.Config, url string) (*SnsEventDispatche } func (s *SnsEventDispatcher) NewDuplicateVaa(ctx context.Context, e DuplicateVaa) error { + attrs := map[string]types.MessageAttributeValue{ + "messageType": { + DataType: aws.String("String"), + StringValue: aws.String("duplicated-vaa"), + }, + } body, err := json.Marshal(event{ TrackID: track.GetTrackIDForDuplicatedVAA(e.VaaID), Type: "duplicated-vaa", @@ -42,6 +49,7 @@ func (s *SnsEventDispatcher) NewDuplicateVaa(ctx context.Context, e DuplicateVaa MessageDeduplicationId: aws.String(groupID), Message: aws.String(string(body)), TopicArn: aws.String(s.url), + MessageAttributes: attrs, }) return err } @@ -56,3 +64,31 @@ func createDeduplicationIDForDuplicateVaa(e DuplicateVaa) string { } return deduplicationID } + +func (s *SnsEventDispatcher) NewGovernorStatus(ctx context.Context, e GovernorStatus) error { + attrs := map[string]types.MessageAttributeValue{ + "messageType": { + DataType: aws.String("String"), + StringValue: aws.String("governor"), + }, + } + body, err := json.Marshal(event{ + TrackID: track.GetTrackIDForGovernorStatus(e.NodeName, e.Timestamp), + Type: "governor-status", + Source: "fly", + Data: e, + }) + if err != nil { + return err + } + groupID := fmt.Sprintf("%s-%v", e.NodeAddress, e.Timestamp) + _, err = s.api.Publish(ctx, + &aws_sns.PublishInput{ + MessageGroupId: aws.String(groupID), + MessageDeduplicationId: aws.String(groupID), + Message: aws.String(string(body)), + TopicArn: aws.String(s.url), + MessageAttributes: attrs, + }) + return err +} diff --git a/fly/event/types.go b/fly/event/types.go index 91200e45..57a983dc 100644 --- a/fly/event/types.go +++ b/fly/event/types.go @@ -16,6 +16,14 @@ type DuplicateVaa struct { Timestamp *time.Time `json:"timestamp"` } +type GovernorStatus struct { + NodeAddress string `json:"nodeAddress"` + NodeName string `json:"nodeName"` + Counter int64 `json:"counter"` + Timestamp int64 `json:"timestamp"` + Chains any `json:"chains"` +} + type event struct { TrackID string `json:"trackId"` Type string `json:"type"` @@ -25,4 +33,5 @@ type event struct { type EventDispatcher interface { NewDuplicateVaa(ctx context.Context, e DuplicateVaa) error + NewGovernorStatus(ctx context.Context, e GovernorStatus) error } diff --git a/fly/internal/track/trackid.go b/fly/internal/track/trackid.go index 765c137f..5fb88f22 100644 --- a/fly/internal/track/trackid.go +++ b/fly/internal/track/trackid.go @@ -16,3 +16,8 @@ func GetTrackIDForDuplicatedVAA(vaaID string) string { uuid := uuid.New() return fmt.Sprintf("fly-duplicated-vaa-%s-%s", vaaID, uuid.String()) } + +func GetTrackIDForGovernorStatus(nodeName string, timestamp int64) string { + uuid := uuid.New() + return fmt.Sprintf("fly-governor-status-%s-%v-%s", nodeName, timestamp, uuid.String()) +} diff --git a/fly/migration/migration.go b/fly/migration/migration.go index 5706c4a6..bc7c4997 100644 --- a/fly/migration/migration.go +++ b/fly/migration/migration.go @@ -260,6 +260,14 @@ func Run(db *mongo.Database) error { return err } + // create index in nodeGovernorVaas collection by vaaId. + indexNodeGovernorVaasByVaaId := mongo.IndexModel{ + Keys: bson.D{{Key: "vaaId", Value: 1}}} + _, err = db.Collection("nodeGovernorVaas").Indexes().CreateOne(context.TODO(), indexNodeGovernorVaasByVaaId) + if err != nil && isNotAlreadyExistsError(err) { + return err + } + return nil } diff --git a/fly/storage/documents.go b/fly/storage/documents.go index 20d5f208..f847319b 100644 --- a/fly/storage/documents.go +++ b/fly/storage/documents.go @@ -131,22 +131,22 @@ type GovernorStatusUpdate struct { } type ChainGovernorStatusChain struct { - ChainId uint32 `bson:"chainid"` - RemainingAvailableNotional Uint64 `bson:"remainingavailablenotional"` - Emitters []*ChainGovernorStatusEmitter `bson:"emitters"` + ChainId uint32 `bson:"chainid" json:"chainId"` + RemainingAvailableNotional Uint64 `bson:"remainingavailablenotional" json:"remainingAvailableNotional"` + Emitters []*ChainGovernorStatusEmitter `bson:"emitters" json:"emitters"` } type ChainGovernorStatusEmitter struct { - EmitterAddress string `bson:"emitteraddress"` - TotalEnqueuedVaas Uint64 `bson:"totalenqueuedvaas"` - EnqueuedVaas []*ChainGovernorStatusEnqueuedVAA `bson:"enqueuedvaas"` + EmitterAddress string `bson:"emitteraddress" json:"emitterAddress"` + TotalEnqueuedVaas Uint64 `bson:"totalenqueuedvaas" json:"totalEnqueuedVaas"` + EnqueuedVaas []*ChainGovernorStatusEnqueuedVAA `bson:"enqueuedvaas" json:"enqueuedVaas"` } type ChainGovernorStatusEnqueuedVAA struct { - Sequence string `bson:"sequence"` - ReleaseTime uint32 `bson:"releasetime"` - NotionalValue Uint64 `bson:"notionalvalue"` - TxHash string `bson:"txhash"` + Sequence string `bson:"sequence" json:"sequence"` + ReleaseTime uint32 `bson:"releasetime" json:"releaseTime"` + NotionalValue Uint64 `bson:"notionalvalue" json:"notionalValue"` + TxHash string `bson:"txhash" json:"txHash"` } type ChainGovernorConfigUpdate struct { diff --git a/fly/storage/repository.go b/fly/storage/repository.go index adbc8ec2..47ac7d77 100644 --- a/fly/storage/repository.go +++ b/fly/storage/repository.go @@ -335,8 +335,24 @@ func (s *Repository) UpsertGovernorStatus(govS *gossipv1.SignedChainGovernorStat Error: err2, } s.alertClient.CreateAndSend(context.TODO(), flyAlert.ErrorSaveGovernorStatus, alertContext) + return err2 } - return err2 + + // send governor status to topic. + err3 := s.eventDispatcher.NewGovernorStatus(context.TODO(), event.GovernorStatus{ + NodeAddress: id, + NodeName: status.NodeName, + Counter: status.Counter, + Timestamp: status.Timestamp, + Chains: status.Chains, + }) + + if err3 != nil { + s.log.Error("Error sending governor status to topic", + zap.String("guardian", status.NodeName), + zap.Error(err3)) + } + return err3 } func (s *Repository) updateVAACount(chainID vaa.ChainID) { diff --git a/tx-tracker/consumer/repository.go b/tx-tracker/consumer/repository.go index f514a51f..b44dc520 100644 --- a/tx-tracker/consumer/repository.go +++ b/tx-tracker/consumer/repository.go @@ -82,6 +82,7 @@ func createChangesDoc(source, _type string, timestamp *time.Time) bson.D { } } +// UpsertOriginTx upserts a source transaction document. func (r *Repository) UpsertOriginTx(ctx context.Context, params *UpsertOriginTxParams) error { now := time.Now() @@ -364,3 +365,25 @@ func (r *Repository) GetDocumentsByVaas( return globalTransactions, nil } + +// SourceTxDoc represents a source transaction document. +type SourceTxDoc struct { + ID string `bson:"_id"` + OriginTx *struct { + ChainID int `bson:"chainId"` + Status string `bson:"status"` + Processed bool `bson:"processed"` + NativeTxHash string `bson:"nativeTxHash"` + From string `bson:"from"` + } `bson:"originTx"` +} + +// FindSourceTxById returns the source transaction document with the given ID. +func (r *Repository) FindSourceTxById(ctx context.Context, id string) (*SourceTxDoc, error) { + var sourceTxDoc SourceTxDoc + err := r.globalTransactions.FindOne(ctx, bson.M{"_id": id}).Decode(&sourceTxDoc) + if err != nil { + return nil, err + } + return &sourceTxDoc, err +} diff --git a/tx-tracker/http/infrastructure/server.go b/tx-tracker/http/infrastructure/server.go index 7f701fc3..c45dc2d8 100644 --- a/tx-tracker/http/infrastructure/server.go +++ b/tx-tracker/http/infrastructure/server.go @@ -32,6 +32,7 @@ func NewServer(logger *zap.Logger, port string, pprofEnabled bool, vaaController api.Get("/ready", ctrl.ReadyCheck) api.Post("/vaa/process", vaaController.Process) + api.Post("/vaa/tx-hash", vaaController.CreateTxHash) return &Server{ app: app, diff --git a/tx-tracker/http/vaa/controller.go b/tx-tracker/http/vaa/controller.go index 1e561ef2..b74c17fa 100644 --- a/tx-tracker/http/vaa/controller.go +++ b/tx-tracker/http/vaa/controller.go @@ -1,10 +1,14 @@ package vaa import ( + "encoding/hex" "strconv" + "strings" "github.com/gofiber/fiber/v2" + "github.com/wormhole-foundation/wormhole-explorer/common/domain" "github.com/wormhole-foundation/wormhole-explorer/common/pool" + "github.com/wormhole-foundation/wormhole-explorer/common/utils" "github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer" "github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics" sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" @@ -35,9 +39,7 @@ func NewController(rpcPool map[sdk.ChainID]*pool.Pool, wormchainRpcPool map[sdk. } func (c *Controller) Process(ctx *fiber.Ctx) error { - payload := struct { - ID string `json:"id"` - }{} + var payload ProcessVaaRequest if err := ctx.BodyParser(&payload); err != nil { return err @@ -79,3 +81,72 @@ func (c *Controller) Process(ctx *fiber.Ctx) error { Result any `json:"result"` }{Result: result}) } + +func (c *Controller) CreateTxHash(ctx *fiber.Ctx) error { + + var payload TxHashRequest + + if err := ctx.BodyParser(&payload); err != nil { + return err + } + + txHash, err := hex.DecodeString(utils.Remove0x(payload.TxHash)) + if err != nil { + return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid tx hash", "details": err.Error()}) + } + + c.logger.Info("Processing txHash from endpoint", zap.String("id", payload.ID)) + + vaaID := strings.Split(payload.ID, "/") + if len(vaaID) != 3 { + return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid vaa id"}) + } + + chainIDStr, emitter, sequenceStr := vaaID[0], vaaID[1], vaaID[2] + chainIDUint, err := strconv.ParseUint(chainIDStr, 10, 16) + if err != nil { + return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "chain id is not a number", "details": err.Error()}) + } + chainID := sdk.ChainID(chainIDUint) + if !domain.ChainIdIsValid(chainID) { + return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid chain id"}) + } + + encodedTxHash, err := domain.EncodeTrxHashByChainID(chainID, txHash) + if err != nil { + return ctx.Status(fiber.StatusBadRequest).JSON(fiber.Map{"error": "invalid tx hash", "details": err.Error()}) + } + + if chainID != sdk.ChainIDSolana && chainID != sdk.ChainIDAptos && chainID != sdk.ChainIDWormchain { + return ctx.JSON(TxHashResponse{NativeTxHash: encodedTxHash}) + } + + sourceTx, err := c.repository.FindSourceTxById(ctx.Context(), payload.ID) + if err == nil && sourceTx != nil { + if sourceTx.OriginTx != nil && sourceTx.OriginTx.NativeTxHash != "" { + return ctx.JSON(TxHashResponse{NativeTxHash: sourceTx.OriginTx.NativeTxHash}) + } + } + + p := &consumer.ProcessSourceTxParams{ + TrackID: "controller-tx-hash", + Source: "controller", + Timestamp: nil, + VaaId: payload.ID, + ChainId: chainID, + Emitter: emitter, + Sequence: sequenceStr, + TxHash: encodedTxHash, + IsVaaSigned: false, + Metrics: c.metrics, + Overwrite: true, + DisableDBUpsert: true, + } + + result, err := consumer.ProcessSourceTx(ctx.Context(), c.logger, c.rpcPool, c.wormchainRpcPool, c.repository, p, c.p2pNetwork) + if err != nil { + return err + } + + return ctx.JSON(TxHashResponse{NativeTxHash: result.NativeTxHash}) +} diff --git a/tx-tracker/http/vaa/repository.go b/tx-tracker/http/vaa/repository.go index f064616e..0870443f 100644 --- a/tx-tracker/http/vaa/repository.go +++ b/tx-tracker/http/vaa/repository.go @@ -9,9 +9,10 @@ import ( ) type Repository struct { - db *mongo.Database - logger *zap.Logger - vaas *mongo.Collection + db *mongo.Database + logger *zap.Logger + vaas *mongo.Collection + globalTransactions *mongo.Collection } type VaaDoc struct { @@ -23,8 +24,9 @@ type VaaDoc struct { // NewRepository create a new Repository. func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository { return &Repository{db: db, - logger: logger.With(zap.String("module", "VaaRepository")), - vaas: db.Collection("vaas"), + logger: logger.With(zap.String("module", "VaaRepository")), + vaas: db.Collection("vaas"), + globalTransactions: db.Collection("globalTransactions"), } } diff --git a/tx-tracker/http/vaa/types.go b/tx-tracker/http/vaa/types.go new file mode 100644 index 00000000..9f677934 --- /dev/null +++ b/tx-tracker/http/vaa/types.go @@ -0,0 +1,17 @@ +package vaa + +// ProcessVaaRequest request a vaa to process. +type ProcessVaaRequest struct { + ID string `json:"id"` +} + +// TxHashRequest request a tx hash. +type TxHashRequest struct { + ID string `json:"id"` + TxHash string `json:"txHash"` +} + +// ProcessVaaResponse response from processing a vaa. +type TxHashResponse struct { + NativeTxHash string `json:"nativeTxHash"` +}