diff --git a/fly-event-processor/cmd/service/run.go b/fly-event-processor/cmd/service/run.go index 18980b6a..77a1f812 100644 --- a/fly-event-processor/cmd/service/run.go +++ b/fly-event-processor/cmd/service/run.go @@ -17,19 +17,19 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/common/health" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/common/pool" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/vaa" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage" "go.mongodb.org/mongo-driver/mongo" "go.uber.org/zap" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/config" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/consumer" - consumerRepo "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/consumer" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/infrastructure" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics" ) -type exitCode int - func Run() { rootCtx, rootCtxCancel := context.WithCancel(context.Background()) @@ -48,7 +48,6 @@ func Run() { // create guardian provider pool guardianApiProviderPool, err := newGuardianProviderPool(cfg) - _, err = newGuardianProviderPool(cfg) if err != nil { logger.Fatal("Error creating guardian provider pool: ", zap.Error(err)) } @@ -59,19 +58,24 @@ func Run() { log.Fatal("Failed to initialize MongoDB client: ", err) } - repository := consumerRepo.NewRepository(logger, db.Database) + // create a new repository + repository := storage.NewRepository(logger, db.Database) + + // create a new processor + processor := processor.NewProcessor(guardianApiProviderPool, repository, logger, metrics) // start serving /health and /ready endpoints healthChecks, err := makeHealthChecks(rootCtx, cfg, db.Database) if err != nil { logger.Fatal("Failed to create health checks", zap.Error(err)) } - server := infrastructure.NewServer(logger, cfg.Port, cfg.PprofEnabled, healthChecks...) + vaaCtrl := vaa.NewController(processor.Process, repository, logger) + server := infrastructure.NewServer(logger, cfg.Port, vaaCtrl, cfg.PprofEnabled, healthChecks...) server.Start() // create and start a duplicate VAA consumer. duplicateVaaConsumeFunc := newDuplicateVaaConsumeFunc(rootCtx, cfg, metrics, logger) - duplicateVaa := consumer.New(duplicateVaaConsumeFunc, guardianApiProviderPool, rootCtx, logger, repository, metrics, cfg.P2pNetwork, cfg.ConsumerWorkerSize) + duplicateVaa := consumer.New(duplicateVaaConsumeFunc, processor.Process, logger, metrics, cfg.P2pNetwork, cfg.ConsumerWorkerSize) duplicateVaa.Start(rootCtx) logger.Info("Started wormholescan-fly-event-processor") diff --git a/fly-event-processor/consumer/consumer.go b/fly-event-processor/consumer/consumer.go index dd88787f..e9df0b9b 100644 --- a/fly-event-processor/consumer/consumer.go +++ b/fly-event-processor/consumer/consumer.go @@ -5,16 +5,18 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/common/pool" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor" "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/queue" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) // Consumer consumer struct definition. type Consumer struct { consumeFunc queue.ConsumeFunc + processor processor.ProcessorFunc guardianPool *pool.Pool logger *zap.Logger - repository *Repository metrics metrics.Metrics p2pNetwork string workersSize int @@ -23,23 +25,20 @@ type Consumer struct { // New creates a new vaa consumer. func New( consumeFunc queue.ConsumeFunc, - guardianPool *pool.Pool, - ctx context.Context, + processor processor.ProcessorFunc, logger *zap.Logger, - repository *Repository, metrics metrics.Metrics, p2pNetwork string, workersSize int, ) *Consumer { c := Consumer{ - consumeFunc: consumeFunc, - guardianPool: guardianPool, - logger: logger, - repository: repository, - metrics: metrics, - p2pNetwork: p2pNetwork, - workersSize: workersSize, + consumeFunc: consumeFunc, + processor: processor, + logger: logger, + metrics: metrics, + p2pNetwork: p2pNetwork, + workersSize: workersSize, } return &c @@ -54,17 +53,47 @@ func (c *Consumer) Start(ctx context.Context) { } func (c *Consumer) producerLoop(ctx context.Context, ch <-chan queue.ConsumerMessage) { - for { select { case <-ctx.Done(): return case msg := <-ch: - c.processEvent(ctx, *msg.Data()) + c.processEvent(ctx, msg) } } } -func (c *Consumer) processEvent(ctx context.Context, event queue.Event) { - //TODO +func (c *Consumer) processEvent(ctx context.Context, msg queue.ConsumerMessage) { + event := msg.Data() + vaaID := event.Data.VaaID + chainID := sdk.ChainID(event.Data.ChainID) + + logger := c.logger.With( + zap.String("trackId", event.TrackID), + zap.String("vaaId", vaaID)) + + if msg.IsExpired() { + msg.Failed() + logger.Debug("event is expired") + c.metrics.IncDuplicatedVaaExpired(chainID) + return + } + + params := &processor.Params{ + TrackID: event.TrackID, + VaaID: vaaID, + ChainID: chainID, + } + + err := c.processor(ctx, params) + if err != nil { + msg.Failed() + logger.Error("error processing event", zap.Error(err)) + c.metrics.IncDuplicatedVaaFailed(chainID) + return + } + + msg.Done() + logger.Debug("event processed") + c.metrics.IncDuplicatedVaaProcessed(chainID) } diff --git a/fly-event-processor/consumer/repository.go b/fly-event-processor/consumer/repository.go deleted file mode 100644 index f6d354c6..00000000 --- a/fly-event-processor/consumer/repository.go +++ /dev/null @@ -1,24 +0,0 @@ -package consumer - -import ( - commonRepo "github.com/wormhole-foundation/wormhole-explorer/common/repository" - "go.mongodb.org/mongo-driver/mongo" - "go.uber.org/zap" -) - -// Repository exposes operations over the `globalTransactions` collection. -type Repository struct { - logger *zap.Logger - vaas *mongo.Collection - duplicateVaas *mongo.Collection -} - -// New creates a new repository. -func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository { - r := Repository{ - logger: logger, - vaas: db.Collection(commonRepo.Vaas), - duplicateVaas: db.Collection(commonRepo.DuplicateVaas), - } - return &r -} diff --git a/fly-event-processor/guardian/guardian.go b/fly-event-processor/guardian/guardian.go new file mode 100644 index 00000000..1e392a8e --- /dev/null +++ b/fly-event-processor/guardian/guardian.go @@ -0,0 +1,20 @@ +package guardian + +import ( + client "github.com/wormhole-foundation/wormhole-explorer/common/client/guardian" + "github.com/wormhole-foundation/wormhole-explorer/common/pool" +) + +// GuardianAPIClient is a wrapper around the Guardian API client and the pool of providers. +type GuardianAPIClient struct { + Client *client.GuardianAPIClient + Pool *pool.Pool +} + +// NewGuardianAPIClient creates a new Guardian API client. +func NewGuardianAPIClient(client *client.GuardianAPIClient, pool *pool.Pool) *GuardianAPIClient { + return &GuardianAPIClient{ + Client: client, + Pool: pool, + } +} diff --git a/fly-event-processor/http/infrastructure/server.go b/fly-event-processor/http/infrastructure/server.go index 6448c74c..a30cc84c 100644 --- a/fly-event-processor/http/infrastructure/server.go +++ b/fly-event-processor/http/infrastructure/server.go @@ -5,6 +5,7 @@ import ( "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/pprof" health "github.com/wormhole-foundation/wormhole-explorer/common/health" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/http/vaa" "go.uber.org/zap" ) @@ -14,7 +15,7 @@ type Server struct { logger *zap.Logger } -func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...health.Check) *Server { +func NewServer(logger *zap.Logger, port string, vaaController *vaa.Controller, pprofEnabled bool, checks ...health.Check) *Server { app := fiber.New(fiber.Config{DisableStartupMessage: true}) prometheus := fiberprometheus.New("wormscan-fly-event-processor") prometheus.RegisterAt(app, "/metrics") @@ -29,7 +30,7 @@ func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...hea api := app.Group("/api") api.Get("/health", ctrl.HealthCheck) api.Get("/ready", ctrl.ReadyCheck) - + api.Post("/vaa/duplicated", vaaController.Process) return &Server{ app: app, port: port, diff --git a/fly-event-processor/http/vaa/controller.go b/fly-event-processor/http/vaa/controller.go new file mode 100644 index 00000000..f05c797f --- /dev/null +++ b/fly-event-processor/http/vaa/controller.go @@ -0,0 +1,55 @@ +package vaa + +import ( + "fmt" + + "github.com/gofiber/fiber/v2" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/processor" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage" + "go.uber.org/zap" +) + +// Controller definition. +type Controller struct { + logger *zap.Logger + repository *storage.Repository + processor processor.ProcessorFunc +} + +// NewController creates a Controller instance. +// func NewController(repository *Repository, processor processor.ProcessorFunc, logger *zap.Logger) *Controller { +func NewController(processor processor.ProcessorFunc, repository *storage.Repository, logger *zap.Logger) *Controller { + return &Controller{processor: processor, repository: repository, logger: logger} +} + +// Process processes the VAA message. +func (c *Controller) Process(ctx *fiber.Ctx) error { + request := struct { + VaaID string `json:"vaaId"` + }{} + + if err := ctx.BodyParser(&request); err != nil { + c.logger.Error("error parsing request", zap.Error(err)) + return err + } + + vaa, err := c.repository.FindVAAById(ctx.Context(), request.VaaID) + if err != nil { + c.logger.Error("error getting vaa from collection", zap.Error(err)) + return err + } + + params := processor.Params{ + TrackID: fmt.Sprintf("controller-%s", request.VaaID), + VaaID: request.VaaID, + ChainID: vaa.EmitterChain, + } + + err = c.processor(ctx.Context(), ¶ms) + if err != nil { + c.logger.Error("error processing vaa", zap.Error(err)) + return err + } + + return ctx.JSON(fiber.Map{"message": "success"}) +} diff --git a/fly-event-processor/internal/metrics/dummy.go b/fly-event-processor/internal/metrics/dummy.go index 107f0d7b..4551ab41 100644 --- a/fly-event-processor/internal/metrics/dummy.go +++ b/fly-event-processor/internal/metrics/dummy.go @@ -1,5 +1,7 @@ package metrics +import sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + // DummyMetrics is a dummy implementation of Metric interface. type DummyMetrics struct{} @@ -7,3 +9,18 @@ type DummyMetrics struct{} func NewDummyMetrics() *DummyMetrics { return &DummyMetrics{} } + +// IncDuplicatedVaaConsumedQueue dummy implementation. +func (d *DummyMetrics) IncDuplicatedVaaConsumedQueue() {} + +// IncDuplicatedVaaProcessed dummy implementation. +func (d *DummyMetrics) IncDuplicatedVaaProcessed(chainID sdk.ChainID) {} + +// IncDuplicatedVaaFailed dummy implementation. +func (d *DummyMetrics) IncDuplicatedVaaFailed(chainID sdk.ChainID) {} + +// IncDuplicatedVaaExpired dummy implementation. +func (d *DummyMetrics) IncDuplicatedVaaExpired(chainID sdk.ChainID) {} + +// IncDuplicatedVaaCanNotFixed dummy implementation. +func (d *DummyMetrics) IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID) {} diff --git a/fly-event-processor/internal/metrics/metrics.go b/fly-event-processor/internal/metrics/metrics.go index 73cce557..953c69db 100644 --- a/fly-event-processor/internal/metrics/metrics.go +++ b/fly-event-processor/internal/metrics/metrics.go @@ -1,6 +1,13 @@ package metrics -const serviceName = "wormholescan-fly-event-processor" +import sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + +const serviceName = "wormscan-fly-event-processor" type Metrics interface { + IncDuplicatedVaaConsumedQueue() + IncDuplicatedVaaProcessed(chainID sdk.ChainID) + IncDuplicatedVaaFailed(chainID sdk.ChainID) + IncDuplicatedVaaExpired(chainID sdk.ChainID) + IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID) } diff --git a/fly-event-processor/internal/metrics/prometheus.go b/fly-event-processor/internal/metrics/prometheus.go index bb2d8d59..619b9ac0 100644 --- a/fly-event-processor/internal/metrics/prometheus.go +++ b/fly-event-processor/internal/metrics/prometheus.go @@ -1,10 +1,51 @@ package metrics +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + // PrometheusMetrics is a Prometheus implementation of Metric interface. type PrometheusMetrics struct { + duplicatedVaaCount *prometheus.CounterVec } // NewPrometheusMetrics returns a new instance of PrometheusMetrics. func NewPrometheusMetrics(environment string) *PrometheusMetrics { - return &PrometheusMetrics{} + return &PrometheusMetrics{ + duplicatedVaaCount: promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "wormscan_fly_event_processor_duplicated_vaa_count", + Help: "The total number of duplicated VAA processed", + ConstLabels: map[string]string{ + "environment": environment, + "service": serviceName, + }, + }, []string{"chain", "type"}), + } +} + +func (m *PrometheusMetrics) IncDuplicatedVaaConsumedQueue() { + m.duplicatedVaaCount.WithLabelValues("all", "consumed_queue").Inc() +} + +func (m *PrometheusMetrics) IncDuplicatedVaaProcessed(chainID sdk.ChainID) { + chain := chainID.String() + m.duplicatedVaaCount.WithLabelValues(chain, "processed").Inc() +} + +func (m *PrometheusMetrics) IncDuplicatedVaaFailed(chainID sdk.ChainID) { + chain := chainID.String() + m.duplicatedVaaCount.WithLabelValues(chain, "failed").Inc() +} + +func (m *PrometheusMetrics) IncDuplicatedVaaExpired(chainID sdk.ChainID) { + chain := chainID.String() + m.duplicatedVaaCount.WithLabelValues(chain, "expired").Inc() +} + +func (m *PrometheusMetrics) IncDuplicatedVaaCanNotFixed(chainID sdk.ChainID) { + chain := chainID.String() + m.duplicatedVaaCount.WithLabelValues(chain, "can_not_fixed").Inc() } diff --git a/fly-event-processor/processor/processor.go b/fly-event-processor/processor/processor.go new file mode 100644 index 00000000..a3884293 --- /dev/null +++ b/fly-event-processor/processor/processor.go @@ -0,0 +1,220 @@ +package processor + +import ( + "errors" + "time" + + "github.com/wormhole-foundation/wormhole-explorer/common/client/guardian" + "github.com/wormhole-foundation/wormhole-explorer/common/pool" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/internal/metrics" + "github.com/wormhole-foundation/wormhole-explorer/fly-event-processor/storage" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" + "golang.org/x/net/context" +) + +type Processor struct { + guardianPool *pool.Pool + repository *storage.Repository + logger *zap.Logger + metrics metrics.Metrics +} + +func NewProcessor( + guardianPool *pool.Pool, + repository *storage.Repository, + logger *zap.Logger, + metrics metrics.Metrics, +) *Processor { + return &Processor{ + guardianPool: guardianPool, + repository: repository, + logger: logger, + metrics: metrics, + } +} + +func (p *Processor) Process(ctx context.Context, params *Params) error { + logger := p.logger.With( + zap.String("trackId", params.TrackID), + zap.String("vaaId", params.VaaID)) + + // 1. check if the vaa stored in the VAA collections is the correct one. + + // 1.1 get vaa from Vaas collection + vaaDoc, err := p.repository.FindVAAById(ctx, params.VaaID) + if err != nil { + logger.Error("error getting vaa from collection", zap.Error(err)) + return err + } + + // 1.2 if the event time has not reached the finality time, the event fail and + // will be reprocesed on the next retry. + finalityTime := getFinalityTimeByChainID(params.ChainID) + if vaaDoc.Timestamp == nil { + logger.Error("vaa timestamp is nil") + return errors.New("vaa timestamp is nil") + } + + vaaTimestamp := *vaaDoc.Timestamp + reachedFinalityTime := time.Now().After(vaaTimestamp.Add(finalityTime)) + if !reachedFinalityTime { + logger.Debug("event time has not reached the finality time", + zap.Time("finalityTime", vaaTimestamp.Add(finalityTime))) + return errors.New("event time has not reached the finality time") + } + + // 1.3 call guardian api to get signed_vaa. + guardians := p.guardianPool.GetItems() + var signedVaa *guardian.SignedVaa + for _, g := range guardians { + g.Wait(ctx) + guardianAPIClient, err := guardian.NewGuardianAPIClient( + guardian.DefaultTimeout, + g.Id, + logger) + if err != nil { + logger.Error("error creating guardian api client", zap.Error(err)) + continue + } + signedVaa, err = guardianAPIClient.GetSignedVAA(params.VaaID) + if err != nil { + logger.Error("error getting signed vaa from guardian api", zap.Error(err)) + continue + } + break + } + + if signedVaa == nil { + logger.Error("error getting signed vaa from guardian api") + return errors.New("error getting signed vaa from guardian api") + } + + // 1.4 compare digest from vaa and signedVaa + guardianVAA, err := sdk.Unmarshal(signedVaa.VaaBytes) + if err != nil { + logger.Error("error unmarshalling guardian signed vaa", zap.Error(err)) + return err + } + + vaa, err := sdk.Unmarshal(vaaDoc.Vaa) + if err != nil { + logger.Error("error unmarshalling vaa", zap.Error(err)) + return err + } + + // If the guardian digest is the same that the vaa digest, + // the stored vaa in the vaas collection is the correct one. + if guardianVAA.HexDigest() == vaa.HexDigest() { + logger.Debug("vaa stored in Vaas collections is the correct") + return nil + } + + // 2. Check for each duplicate VAAs to detect which is the correct one. + + // 2.1 This check is necessary to avoid race conditions when the vaa is processed + if vaaDoc.TxHash == "" { + logger.Error("vaa txHash is empty") + return errors.New("vaa txHash is empty") + } + + // 2.2 Get all duplicate vaas by vaaId + duplicateVaaDocs, err := p.repository.FindDuplicateVAAs(ctx, params.TrackID) + if err != nil { + logger.Error("error getting duplicate vaas from collection", zap.Error(err)) + return err + } + + // 2.3 Check each duplicate VAA to detect which is the correct one. + for _, duplicateVaaDoc := range duplicateVaaDocs { + duplicateVaa, err := sdk.Unmarshal(duplicateVaaDoc.Vaa) + if err != nil { + logger.Error("error unmarshalling vaa", zap.Error(err)) + return err + } + + if guardianVAA.HexDigest() == duplicateVaa.HexDigest() { + err := p.repository.FixVAA(ctx, params.VaaID, duplicateVaaDoc.ID) + if err != nil { + logger.Error("error fixing vaa", zap.Error(err)) + return err + } + logger.Debug("vaa fixed") + return nil + } + } + + logger.Debug("can't fix duplicate vaa") + p.metrics.IncDuplicatedVaaCanNotFixed(params.ChainID) + return errors.New("can't fix duplicate vaa") +} + +func getFinalityTimeByChainID(chainID sdk.ChainID) time.Duration { + // Time to finalize for each chain. + // ref: https://docs.wormhole.com/wormhole/reference/constants + switch chainID { + case sdk.ChainIDSolana: + return 14 * time.Second + case sdk.ChainIDEthereum: + return 975 * time.Second + case sdk.ChainIDTerra: + return 6 * time.Second + case sdk.ChainIDBSC: + return 48 * time.Second + case sdk.ChainIDPolygon: + return 66 * time.Second + case sdk.ChainIDAvalanche: + return 2 * time.Second + case sdk.ChainIDOasis: + return 12 * time.Second + case sdk.ChainIDAlgorand: + return 4 * time.Second + case sdk.ChainIDFantom: + return 5 * time.Second + case sdk.ChainIDKarura: + return 24 * time.Second + case sdk.ChainIDAcala: + return 24 * time.Second + case sdk.ChainIDKlaytn: + return 1 * time.Second + case sdk.ChainIDCelo: + return 10 * time.Second + case sdk.ChainIDNear: + return 2 * time.Second + case sdk.ChainIDMoonbeam: + return 24 * time.Second + case sdk.ChainIDTerra2: + return 6 * time.Second + case sdk.ChainIDInjective: + return 3 * time.Second + case sdk.ChainIDSui: + return 3 * time.Second + case sdk.ChainIDAptos: + return 4 * time.Second + case sdk.ChainIDArbitrum: + return 1066 * time.Second + case sdk.ChainIDOptimism: + return 1026 * time.Second + case sdk.ChainIDXpla: + return 5 * time.Second + case sdk.ChainIDBase: + return 1026 * time.Second + case sdk.ChainIDSei: + return 1 * time.Second + case sdk.ChainIDWormchain: + return 5 * time.Second + case sdk.ChainIDSepolia: + return 975 * time.Second + case sdk.ChainIDArbitrumSepolia: + return 1066 * time.Second + case sdk.ChainIDBaseSepolia: + return 1026 * time.Second + case sdk.ChainIDOptimismSepolia: + return 1026 * time.Second + case sdk.ChainIDHolesky: + return 975 * time.Second + default: + // The default value is the max finality time. + return 1066 * time.Second + } +} diff --git a/fly-event-processor/processor/types.go b/fly-event-processor/processor/types.go new file mode 100644 index 00000000..c22032ed --- /dev/null +++ b/fly-event-processor/processor/types.go @@ -0,0 +1,15 @@ +package processor + +import ( + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + "golang.org/x/net/context" +) + +type Params struct { + TrackID string + VaaID string + ChainID sdk.ChainID +} + +// ProcessorFunc is a function to process vaa message. +type ProcessorFunc func(context.Context, *Params) error diff --git a/fly-event-processor/queue/event_sql.go b/fly-event-processor/queue/event_sqs.go similarity index 96% rename from fly-event-processor/queue/event_sql.go rename to fly-event-processor/queue/event_sqs.go index 3a8d836e..266ec96c 100644 --- a/fly-event-processor/queue/event_sql.go +++ b/fly-event-processor/queue/event_sqs.go @@ -59,6 +59,8 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { q.logger.Debug("Received messages from SQS", zap.Int("count", len(messages))) expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout()) for _, msg := range messages { + + q.metrics.IncDuplicatedVaaConsumedQueue() // unmarshal body to sqsEvent var sqsEvent sqsEvent err := json.Unmarshal([]byte(*msg.Body), &sqsEvent) @@ -88,7 +90,6 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { continue } - // TODO add metrics to DuplicateVAA consume retry, _ := strconv.Atoi(msg.Attributes["ApproximateReceiveCount"]) q.wg.Add(1) q.ch <- &sqsConsumerMessage{ @@ -140,12 +141,10 @@ func (m *sqsConsumerMessage) Done() { zap.Error(err), ) } - // TODO add metrics to DuplicateVAA consume m.wg.Done() } func (m *sqsConsumerMessage) Failed() { - // TODO add metrics to duplicateVAA consume failed m.wg.Done() } diff --git a/fly-event-processor/queue/types.go b/fly-event-processor/queue/types.go index ff4d1254..5204a352 100644 --- a/fly-event-processor/queue/types.go +++ b/fly-event-processor/queue/types.go @@ -13,13 +13,15 @@ type sqsEvent struct { // Event represents a event data to be handle. type Event struct { - Type string `json:"type"` - Source string `json:"source"` - Data DuplicateVaa `json:"data"` + TrackID string `json:"trackId"` + Type string `json:"type"` + Source string `json:"source"` + Data DuplicateVaa `json:"data"` } type DuplicateVaa struct { VaaID string `json:"vaaId"` + ChainID uint16 `json:"chainId"` Version uint8 `json:"version"` GuardianSetIndex uint32 `json:"guardianSetIndex"` Vaa []byte `json:"vaas"` diff --git a/fly-event-processor/storage/repository.go b/fly-event-processor/storage/repository.go new file mode 100644 index 00000000..132d7bd9 --- /dev/null +++ b/fly-event-processor/storage/repository.go @@ -0,0 +1,127 @@ +package storage + +import ( + "context" + + commonRepo "github.com/wormhole-foundation/wormhole-explorer/common/repository" + "go.mongodb.org/mongo-driver/mongo" + "go.uber.org/zap" + "gopkg.in/mgo.v2/bson" +) + +// Repository exposes operations over the `globalTransactions` collection. +type Repository struct { + logger *zap.Logger + vaas *mongo.Collection + duplicateVaas *mongo.Collection +} + +// New creates a new repository. +func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository { + r := Repository{ + logger: logger, + vaas: db.Collection(commonRepo.Vaas), + duplicateVaas: db.Collection(commonRepo.DuplicateVaas), + } + return &r +} + +// FindVAAById find a vaa by id. +func (r *Repository) FindVAAById(ctx context.Context, vaaID string) (*VaaDoc, error) { + var vaaDoc VaaDoc + err := r.vaas.FindOne(ctx, bson.M{"_id": vaaID}).Decode(&vaaDoc) + return &vaaDoc, err +} + +// FindDuplicateVAAById find a duplicate vaa by id. +func (r *Repository) FindDuplicateVAAById(ctx context.Context, id string) (*DuplicateVaaDoc, error) { + var duplicateVaaDoc DuplicateVaaDoc + err := r.duplicateVaas.FindOne(ctx, bson.M{"_id": id}).Decode(&duplicateVaaDoc) + return &duplicateVaaDoc, err +} + +// FindDuplicateVAAs find duplicate vaas by vaa id. +func (r *Repository) FindDuplicateVAAs(ctx context.Context, vaaID string) ([]DuplicateVaaDoc, error) { + var duplicateVaaDocs []DuplicateVaaDoc + cursor, err := r.duplicateVaas.Find(ctx, bson.M{"vaaId": vaaID}) + if err != nil { + return nil, err + } + if err = cursor.All(ctx, &duplicateVaaDocs); err != nil { + return nil, err + } + return duplicateVaaDocs, nil +} + +// FixVAA fix a vaa by id. +func (r *Repository) FixVAA(ctx context.Context, vaaID, duplicateID string) error { + // start mongo transaction + session, err := r.vaas.Database().Client().StartSession() + if err != nil { + return err + } + + err = session.StartTransaction() + if err != nil { + return err + } + + // get VAA by id + vaaDoc, err := r.FindVAAById(ctx, vaaID) + if err != nil { + session.AbortTransaction(ctx) + return err + } + + // get duplicate vaa by id + duplicateVaaDoc, err := r.FindDuplicateVAAById(ctx, duplicateID) + if err != nil { + session.AbortTransaction(ctx) + return err + } + + // create new vaa and new duplicate vaa + newVaa := duplicateVaaDoc.ToVaaDoc(true) + newDuplicateVaa, err := vaaDoc.ToDuplicateVaaDoc() + if err != nil { + session.AbortTransaction(ctx) + return err + } + + // remove vaa + _, err = r.vaas.DeleteOne(ctx, bson.M{"_id": vaaID}) + if err != nil { + session.AbortTransaction(ctx) + return err + } + + // remove duplicate vaa + _, err = r.duplicateVaas.DeleteOne(ctx, bson.M{"_id": duplicateID}) + if err != nil { + session.AbortTransaction(ctx) + return err + } + + // insert new vaa + _, err = r.vaas.InsertOne(ctx, newVaa) + if err != nil { + session.AbortTransaction(ctx) + return err + } + + // insert new duplicate vaa + _, err = r.duplicateVaas.InsertOne(ctx, newDuplicateVaa) + if err != nil { + session.AbortTransaction(ctx) + return err + } + + // commit transaction + err = session.CommitTransaction(ctx) + if err != nil { + session.AbortTransaction(ctx) + return err + } + + return nil +} diff --git a/fly-event-processor/storage/types.go b/fly-event-processor/storage/types.go new file mode 100644 index 00000000..47b8852c --- /dev/null +++ b/fly-event-processor/storage/types.go @@ -0,0 +1,87 @@ +package storage + +import ( + "time" + + "github.com/wormhole-foundation/wormhole-explorer/common/domain" + "github.com/wormhole-foundation/wormhole/sdk/vaa" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +// VaaDoc represents a VAA document. +type VaaDoc struct { + ID string `bson:"_id"` + Version uint8 `bson:"version"` + EmitterChain sdk.ChainID `bson:"emitterChain"` + EmitterAddr string `bson:"emitterAddr"` + Sequence string `bson:"sequence"` + GuardianSetIndex uint32 `bson:"guardianSetIndex"` + Vaa []byte `bson:"vaas"` + TxHash string `bson:"txHash,omitempty"` + OriginTxHash *string `bson:"_originTxHash,omitempty"` //this is temporary field for fix enconding txHash + Timestamp *time.Time `bson:"timestamp"` + UpdatedAt *time.Time `bson:"updatedAt"` + Digest string `bson:"digest"` + IsDuplicated bool `bson:"isDuplicated"` + DuplicatedFixed bool `bson:"duplicatedFixed"` +} + +// DuplicateVaaDoc represents a duplicate VAA document. +type DuplicateVaaDoc struct { + ID string `bson:"_id"` + VaaID string `bson:"vaaId"` + Version uint8 `bson:"version"` + EmitterChain sdk.ChainID `bson:"emitterChain"` + EmitterAddr string `bson:"emitterAddr"` + Sequence string `bson:"sequence"` + GuardianSetIndex uint32 `bson:"guardianSetIndex"` + Vaa []byte `bson:"vaas"` + Digest string `bson:"digest"` + ConsistencyLevel uint8 `bson:"consistencyLevel"` + TxHash string `bson:"txHash,omitempty"` + Timestamp *time.Time `bson:"timestamp"` + UpdatedAt *time.Time `bson:"updatedAt"` +} + +func (d *DuplicateVaaDoc) ToVaaDoc(duplicatedFixed bool) *VaaDoc { + return &VaaDoc{ + ID: d.VaaID, + Version: d.Version, + EmitterChain: d.EmitterChain, + EmitterAddr: d.EmitterAddr, + Sequence: d.Sequence, + GuardianSetIndex: d.GuardianSetIndex, + Vaa: d.Vaa, + Digest: d.Digest, + TxHash: d.TxHash, + OriginTxHash: nil, + Timestamp: d.Timestamp, + UpdatedAt: d.UpdatedAt, + DuplicatedFixed: duplicatedFixed, + IsDuplicated: true, + } +} + +func (v *VaaDoc) ToDuplicateVaaDoc() (*DuplicateVaaDoc, error) { + vaa, err := vaa.Unmarshal(v.Vaa) + if err != nil { + return nil, err + } + + uniqueId := domain.CreateUniqueVaaID(vaa) + return &DuplicateVaaDoc{ + ID: uniqueId, + VaaID: v.ID, + Version: v.Version, + EmitterChain: v.EmitterChain, + EmitterAddr: v.EmitterAddr, + Sequence: v.Sequence, + GuardianSetIndex: v.GuardianSetIndex, + Vaa: v.Vaa, + Digest: v.Digest, + TxHash: v.TxHash, + ConsistencyLevel: vaa.ConsistencyLevel, + Timestamp: v.Timestamp, + UpdatedAt: v.UpdatedAt, + }, nil +} diff --git a/fly/event/sns.go b/fly/event/sns.go index 934dc81d..4e33ad5d 100644 --- a/fly/event/sns.go +++ b/fly/event/sns.go @@ -7,6 +7,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" aws_sns "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/wormhole-foundation/wormhole-explorer/fly/internal/track" ) type SnsEventDispatcher struct { @@ -23,9 +24,10 @@ func NewSnsEventDispatcher(awsConfig aws.Config, url string) (*SnsEventDispatche func (s *SnsEventDispatcher) NewDuplicateVaa(ctx context.Context, e DuplicateVaa) error { body, err := json.Marshal(event{ - Type: "duplicated-vaa", - Source: "fly", - Data: e, + TrackID: track.GetTrackIDForDuplicatedVAA(e.VaaID), + Type: "duplicated-vaa", + Source: "fly", + Data: e, }) if err != nil { return err diff --git a/fly/event/types.go b/fly/event/types.go index 4f6ff7f9..91200e45 100644 --- a/fly/event/types.go +++ b/fly/event/types.go @@ -17,9 +17,10 @@ type DuplicateVaa struct { } type event struct { - Type string `json:"type"` - Source string `json:"source"` - Data any `json:"data"` + TrackID string `json:"trackId"` + Type string `json:"type"` + Source string `json:"source"` + Data any `json:"data"` } type EventDispatcher interface { diff --git a/fly/internal/track/trackid.go b/fly/internal/track/trackid.go index 6d0b876e..765c137f 100644 --- a/fly/internal/track/trackid.go +++ b/fly/internal/track/trackid.go @@ -11,3 +11,8 @@ func GetTrackID(vaaID string) string { uuid := uuid.New() return fmt.Sprintf("gossip-signed-vaa-%s-%s", vaaID, uuid.String()) } + +func GetTrackIDForDuplicatedVAA(vaaID string) string { + uuid := uuid.New() + return fmt.Sprintf("fly-duplicated-vaa-%s-%s", vaaID, uuid.String()) +}