diff --git a/api/docs/docs.go b/api/docs/docs.go index d97a7657..83c56b6b 100644 --- a/api/docs/docs.go +++ b/api/docs/docs.go @@ -674,6 +674,12 @@ const docTemplate = `{ "name": "pageSize", "in": "query" }, + { + "type": "string", + "description": "Transaction hash of the Observations", + "name": "txHash", + "in": "query" + }, { "enum": [ "ASC", diff --git a/api/docs/swagger.json b/api/docs/swagger.json index 3cface37..1ad71869 100644 --- a/api/docs/swagger.json +++ b/api/docs/swagger.json @@ -667,6 +667,12 @@ "name": "pageSize", "in": "query" }, + { + "type": "string", + "description": "Transaction hash of the Observations", + "name": "txHash", + "in": "query" + }, { "enum": [ "ASC", diff --git a/api/docs/swagger.yaml b/api/docs/swagger.yaml index 2da42a2a..a832f381 100644 --- a/api/docs/swagger.yaml +++ b/api/docs/swagger.yaml @@ -1649,6 +1649,10 @@ paths: in: query name: pageSize type: integer + - description: Transaction hash of the Observations + in: query + name: txHash + type: string - description: Sort results in ascending or descending order. enum: - ASC diff --git a/api/handlers/observations/model.go b/api/handlers/observations/model.go index 8b1fa8b7..d2e762cb 100644 --- a/api/handlers/observations/model.go +++ b/api/handlers/observations/model.go @@ -6,6 +6,8 @@ import ( "strconv" "time" + "github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination" + "github.com/wormhole-foundation/wormhole-explorer/common/types" "github.com/wormhole-foundation/wormhole/sdk/vaa" ) @@ -39,3 +41,9 @@ func (o *ObservationDoc) MarshalJSON() ([]byte, error) { Alias: (*Alias)(o), }) } + +// FindAllParams passes input data to the function `FindAll`. 
+type FindAllParams struct { + Pagination *pagination.Pagination + TxHash *types.TxHash +} diff --git a/api/handlers/observations/repository.go b/api/handlers/observations/repository.go index f78c1605..ce91d8da 100644 --- a/api/handlers/observations/repository.go +++ b/api/handlers/observations/repository.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" "github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination" + "github.com/wormhole-foundation/wormhole-explorer/common/types" "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -89,6 +90,7 @@ type ObservationQuery struct { sequence string guardianAddr string hash []byte + txHash *types.TxHash uint64 } @@ -128,6 +130,12 @@ func (q *ObservationQuery) SetHash(hash []byte) *ObservationQuery { return q } +// SetTxHash set the txHash field of the ObservationQuery struct. +func (q *ObservationQuery) SetTxHash(txHash *types.TxHash) *ObservationQuery { + q.txHash = txHash + return q +} + // SetPagination set the pagination field of the ObservationQuery struct. func (q *ObservationQuery) SetPagination(p *pagination.Pagination) *ObservationQuery { q.Pagination = *p @@ -151,6 +159,10 @@ func (q *ObservationQuery) toBSON() *bson.D { if q.guardianAddr != "" { r = append(r, bson.E{"guardianAddr", q.guardianAddr}) } + if q.txHash != nil { + nativeTxHash := q.txHash.String() + r = append(r, bson.E{"nativeTxHash", nativeTxHash}) + } return &r } diff --git a/api/handlers/observations/service.go b/api/handlers/observations/service.go index 9d3b765d..b554d3de 100644 --- a/api/handlers/observations/service.go +++ b/api/handlers/observations/service.go @@ -22,8 +22,8 @@ func NewService(dao *Repository, logger *zap.Logger) *Service { } // FindAll get all the observations. 
-func (s *Service) FindAll(ctx context.Context, p *pagination.Pagination) ([]*ObservationDoc, error) { - return s.repo.Find(ctx, Query().SetPagination(p)) +func (s *Service) FindAll(ctx context.Context, p *FindAllParams) ([]*ObservationDoc, error) { + return s.repo.Find(ctx, Query().SetPagination(p.Pagination).SetTxHash(p.TxHash)) } // FindByChain get all the observations by chainID. diff --git a/api/routes/wormscan/observations/controller.go b/api/routes/wormscan/observations/controller.go index 0bdb7b3c..4cedd88f 100644 --- a/api/routes/wormscan/observations/controller.go +++ b/api/routes/wormscan/observations/controller.go @@ -31,6 +31,7 @@ func NewController(srv *observations.Service, logger *zap.Logger) *Controller { // @ID find-observations // @Param page query integer false "Page number." // @Param pageSize query integer false "Number of elements per page." +// @Param txHash query string false "Transaction hash of the Observations" // @Param sortOrder query string false "Sort results in ascending or descending order." 
Enums(ASC, DESC) // @Success 200 {object} []observations.ObservationDoc // @Failure 400 @@ -48,7 +49,17 @@ func (c *Controller) FindAll(ctx *fiber.Ctx) error { return response.NewInvalidParamError(ctx, "pageSize cannot be greater than 1000", nil) } - obs, err := c.srv.FindAll(ctx.Context(), p) + txHash, err := middleware.GetTxHash(ctx, c.logger) + if err != nil { + return err + } + + params := &observations.FindAllParams{ + Pagination: p, + TxHash: txHash, + } + + obs, err := c.srv.FindAll(ctx.Context(), params) if err != nil { return err } diff --git a/common/repository/names.go b/common/repository/names.go index 73b8bde8..fb5ba2d9 100644 --- a/common/repository/names.go +++ b/common/repository/names.go @@ -8,4 +8,5 @@ const ( GuardianSets = "guardianSets" NodeGovernorVaas = "nodeGovernorVaas" GovernorVaas = "governorVaas" + Observations = "observations" ) diff --git a/deploy/jobs/migrate-native-tx-hash.yml b/deploy/jobs/migrate-native-tx-hash.yml new file mode 100644 index 00000000..7267cae1 --- /dev/null +++ b/deploy/jobs/migrate-native-tx-hash.yml @@ -0,0 +1,38 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: migrate-native-tx-hash + namespace: {{ .NAMESPACE }} +spec: + template: + metadata: + labels: + app: migrate-native-tx-hash + spec: + restartPolicy: Never + terminationGracePeriodSeconds: 40 + containers: + - name: migrate-native-tx-hash + image: {{ .IMAGE_NAME }} + imagePullPolicy: Always + env: + - name: ENVIRONMENT + value: {{ .ENVIRONMENT }} + - name: P2P_NETWORK + value: {{ .P2P_NETWORK }} + - name: LOG_LEVEL + value: {{ .LOG_LEVEL }} + - name: JOB_ID + value: JOB_MIGRATE_NATIVE_TX_HASH + - name: MONGODB_URI + valueFrom: + secretKeyRef: + name: mongodb + key: mongo-uri + - name: MONGODB_DATABASE + valueFrom: + configMapKeyRef: + name: config + key: mongo-database + - name: PAGE_SIZE + value: "1000" \ No newline at end of file diff --git a/fly/go.mod b/fly/go.mod index 3446b885..24817830 100644 --- a/fly/go.mod +++ b/fly/go.mod @@ -7,6 +7,7 @@ 
require ( github.com/aws/aws-sdk-go-v2 v1.18.0 github.com/aws/aws-sdk-go-v2/config v1.15.1 github.com/aws/aws-sdk-go-v2/credentials v1.11.0 + github.com/aws/aws-sdk-go-v2/service/sns v1.20.2 github.com/aws/aws-sdk-go-v2/service/sqs v1.20.2 github.com/certusone/wormhole/node v0.0.0-20240416174455-25e60611a867 github.com/dgraph-io/ristretto v0.1.1 @@ -37,7 +38,6 @@ require ( github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.1 // indirect - github.com/aws/aws-sdk-go-v2/service/sns v1.20.2 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.11.1 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.16.1 // indirect github.com/aws/smithy-go v1.13.5 // indirect diff --git a/fly/migration/migration.go b/fly/migration/migration.go index bc7c4997..59fbfebf 100644 --- a/fly/migration/migration.go +++ b/fly/migration/migration.go @@ -116,18 +116,27 @@ func Run(db *mongo.Database) error { // create index in observations collection by indexedAt. indexObservationsByIndexedAt := mongo.IndexModel{Keys: bson.D{{Key: "indexedAt", Value: 1}}} - _, err = db.Collection("observations").Indexes().CreateOne(context.TODO(), indexObservationsByIndexedAt) + _, err = db.Collection(repository.Observations).Indexes().CreateOne(context.TODO(), indexObservationsByIndexedAt) if err != nil && isNotAlreadyExistsError(err) { return err } - // create index in observations collect. + // create index in observations collection. 
indexObservationsByEmitterChainAndAddressAndSequence := mongo.IndexModel{ Keys: bson.D{ {Key: "emitterChain", Value: 1}, {Key: "emitterAddr", Value: 1}, {Key: "sequence", Value: 1}}} - _, err = db.Collection("observations").Indexes().CreateOne(context.TODO(), indexObservationsByEmitterChainAndAddressAndSequence) + _, err = db.Collection(repository.Observations).Indexes().CreateOne(context.TODO(), indexObservationsByEmitterChainAndAddressAndSequence) + if err != nil && isNotAlreadyExistsError(err) { + return err + } + + // create index in observations collection by nativeTxHash. + indexObservationsByNativeTxHash := mongo.IndexModel{ + Keys: bson.D{ + {Key: "nativeTxHash", Value: 1}}} + _, err = db.Collection(repository.Observations).Indexes().CreateOne(context.TODO(), indexObservationsByNativeTxHash) if err != nil && isNotAlreadyExistsError(err) { return err } diff --git a/fly/storage/documents.go b/fly/storage/documents.go index f847319b..6e35ed29 100644 --- a/fly/storage/documents.go +++ b/fly/storage/documents.go @@ -82,6 +82,7 @@ type ObservationUpdate struct { Sequence string `bson:"sequence"` Hash []byte `bson:"hash"` TxHash []byte `bson:"txHash"` + NativeTxHash string `bson:"nativeTxHash"` GuardianAddr string `bson:"guardianAddr"` Signature []byte `bson:"signature"` UpdatedAt *time.Time `bson:"updatedAt"` diff --git a/fly/storage/repository.go b/fly/storage/repository.go index 47ac7d77..b8196928 100644 --- a/fly/storage/repository.go +++ b/fly/storage/repository.go @@ -69,7 +69,7 @@ func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics, }{ vaas: db.Collection(repository.Vaas), heartbeats: db.Collection("heartbeats"), - observations: db.Collection("observations"), + observations: db.Collection(repository.Observations), governorConfig: db.Collection("governorConfig"), governorStatus: db.Collection("governorStatus"), vaasPythnet: db.Collection("vaasPythnet"), @@ -163,14 +163,14 @@ func (s *Repository) UpsertObservation(ctx context.Context, o 
*gossipv1.SignedOb id := fmt.Sprintf("%s/%s/%s", o.MessageId, hex.EncodeToString(o.Addr), hex.EncodeToString(o.Hash)) now := time.Now() - chainID, err := strconv.ParseUint(chainIDStr, 10, 16) + chainIDUint, err := strconv.ParseUint(chainIDStr, 10, 16) if err != nil { s.log.Error("Error parsing chainId", zap.Error(err)) return err } // TODO should we notify the caller that pyth observations are not stored? - if vaa.ChainID(chainID) == vaa.ChainIDPythNet { + if vaa.ChainID(chainIDUint) == vaa.ChainIDPythNet { return nil } sequence, err := strconv.ParseUint(sequenceStr, 10, 64) @@ -179,14 +179,25 @@ func (s *Repository) UpsertObservation(ctx context.Context, o *gossipv1.SignedOb return err } + chainID := vaa.ChainID(chainIDUint) + var nativeTxHash string + switch chainID { + case vaa.ChainIDSolana, + vaa.ChainIDWormchain, + vaa.ChainIDAptos: + default: + nativeTxHash, _ = domain.EncodeTrxHashByChainID(chainID, o.GetTxHash()) + } + addr := eth_common.BytesToAddress(o.GetAddr()) obs := ObservationUpdate{ - ChainID: vaa.ChainID(chainID), + ChainID: chainID, Emitter: emitter, Sequence: strconv.FormatUint(sequence, 10), MessageID: o.GetMessageId(), Hash: o.GetHash(), TxHash: o.GetTxHash(), + NativeTxHash: nativeTxHash, GuardianAddr: addr.String(), Signature: o.GetSignature(), UpdatedAt: &now, @@ -217,14 +228,14 @@ func (s *Repository) UpsertObservation(ctx context.Context, o *gossipv1.SignedOb txHash, err := domain.EncodeTrxHashByChainID(vaa.ChainID(chainID), o.GetTxHash()) if err != nil { s.log.Warn("Error encoding tx hash", - zap.Uint64("chainId", chainID), + zap.Uint64("chainId", chainIDUint), zap.ByteString("txHash", o.GetTxHash()), zap.Error(err)) - s.metrics.IncObservationWithoutTxHash(vaa.ChainID(chainID)) + s.metrics.IncObservationWithoutTxHash(chainID) } vaaTxHash := txhash.TxHash{ - ChainID: vaa.ChainID(chainID), + ChainID: chainID, Emitter: emitter, Sequence: strconv.FormatUint(sequence, 10), TxHash: txHash, diff --git a/jobs/cmd/main.go b/jobs/cmd/main.go index 
ecff3d81..785d071e 100644 --- a/jobs/cmd/main.go +++ b/jobs/cmd/main.go @@ -90,6 +90,9 @@ func main() { case jobs.JobIDProtocolsStatsDaily: statsJob := initProtocolStatsDailyJob(ctx, logger) err = statsJob.Run(ctx) + case jobs.JobIDMigrationNativeTxHash: + job := initMigrateNativeTxHashJob(ctx, logger) + err = job.Run(ctx) default: logger.Fatal("Invalid job id", zap.String("job_id", cfg.JobID)) } @@ -238,6 +241,18 @@ func initProtocolStatsDailyJob(ctx context.Context, logger *zap.Logger) *protoco logger) } +func initMigrateNativeTxHashJob(ctx context.Context, logger *zap.Logger) *migration.MigrateNativeTxHash { + cfgJob, errCfg := configuration.LoadFromEnv[config.MigrateNativeTxHashConfiguration](ctx) + if errCfg != nil { + log.Fatal("error creating config", errCfg) + } + db, err := dbutil.Connect(ctx, logger, cfgJob.MongoURI, cfgJob.MongoDatabase, false) + if err != nil { + logger.Fatal("Failed to connect MongoDB", zap.Error(err)) + } + return migration.NewMigrationNativeTxHash(db.Database, cfgJob.PageSize, logger) +} + func handleExit() { if r := recover(); r != nil { if e, ok := r.(exitCode); ok { diff --git a/jobs/config/config.go b/jobs/config/config.go index 5c3ce48d..df52fc7c 100644 --- a/jobs/config/config.go +++ b/jobs/config/config.go @@ -72,3 +72,9 @@ type Protocol struct { type ProtocolsActivityConfiguration struct { ProtocolsStatsConfiguration } + +type MigrateNativeTxHashConfiguration struct { + MongoURI string `env:"MONGODB_URI,required"` + MongoDatabase string `env:"MONGODB_DATABASE,required"` + PageSize int `env:"PAGE_SIZE,default=100"` +} diff --git a/jobs/jobs/jobs.go b/jobs/jobs/jobs.go index e18d4978..c8005cba 100644 --- a/jobs/jobs/jobs.go +++ b/jobs/jobs/jobs.go @@ -5,12 +5,13 @@ import "context" // JobIDNotional is the job id for notional job. 
const ( - JobIDNotional = "JOB_NOTIONAL_USD" - JobIDTransferReport = "JOB_TRANSFER_REPORT" - JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES" - JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX" - JobIDProtocolsStatsDaily = "JOB_PROTOCOLS_STATS_DAILY" - JobIDProtocolsStatsHourly = "JOB_PROTOCOLS_STATS_HOURLY" + JobIDNotional = "JOB_NOTIONAL_USD" + JobIDTransferReport = "JOB_TRANSFER_REPORT" + JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES" + JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX" + JobIDProtocolsStatsDaily = "JOB_PROTOCOLS_STATS_DAILY" + JobIDProtocolsStatsHourly = "JOB_PROTOCOLS_STATS_HOURLY" + JobIDMigrationNativeTxHash = "JOB_MIGRATE_NATIVE_TX_HASH" ) // Job is the interface for jobs. diff --git a/jobs/jobs/migration/migration_native_tx_hash.go b/jobs/jobs/migration/migration_native_tx_hash.go new file mode 100644 index 00000000..0e28fafd --- /dev/null +++ b/jobs/jobs/migration/migration_native_tx_hash.go @@ -0,0 +1,160 @@ +package migration + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/wormhole-foundation/wormhole-explorer/common/domain" + "github.com/wormhole-foundation/wormhole-explorer/common/repository" + "github.com/wormhole-foundation/wormhole/sdk/vaa" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.uber.org/zap" +) + +// MigrateNativeTxHash is the job to backfill the nativeTxHash field for existing documents in the observations collection. +type MigrateNativeTxHash struct { + db *mongo.Database + pageSize int + collections struct { + observations *mongo.Collection + } + logger *zap.Logger +} + +// NewMigrationNativeTxHash creates a new migration job. 
+func NewMigrationNativeTxHash( + db *mongo.Database, + pageSize int, + logger *zap.Logger) *MigrateNativeTxHash { + return &MigrateNativeTxHash{ + db: db, + pageSize: pageSize, + collections: struct { + observations *mongo.Collection + }{ + observations: db.Collection(repository.Observations), + }, + logger: logger} +} + +// Observation represents the subset of an observation document used by this migration. +type Observation struct { + ID string `bson:"_id" json:"id"` + ChainID vaa.ChainID `bson:"emitterChain"` + TxHash []byte `bson:"txHash"` + IndexedAt time.Time `bson:"indexedAt"` +} + +func (m *MigrateNativeTxHash) Run(ctx context.Context) error { + return m.runMigration(ctx) +} + +// runMigration pages through the observations collection and dispatches documents to the workers. +func (m *MigrateNativeTxHash) runMigration(ctx context.Context) error { + var updated atomic.Uint64 + var total atomic.Uint64 + var wg sync.WaitGroup + workerLimit := m.pageSize + jobs := make(chan Observation, workerLimit) + + for i := 1; i <= workerLimit; i++ { + wg.Add(1) + go updateNativeTxHash(ctx, &wg, jobs, m.collections.observations, &updated, m.logger) + } + + indexedAt := time.Now() + for { + observations, err := m.getObservationsToMigrate(ctx, int64(m.pageSize), indexedAt) + if err != nil { + m.logger.Error("failed to get observations", zap.Error(err)) + break + } + + if len(observations) == 0 { + break + } + total.Add(uint64(len(observations))) + for _, v := range observations { + jobs <- v + indexedAt = v.IndexedAt + } + m.logger.Info("migrating observations", + zap.String("indexedAt", indexedAt.Format(time.RFC3339)), + zap.Uint64("total", total.Load()), + zap.Uint64("updated", updated.Load())) + } + close(jobs) + wg.Wait() + + return nil +} + +func updateNativeTxHash(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Observation, collection *mongo.Collection, updated *atomic.Uint64, logger *zap.Logger) { + defer wg.Done() + for v := range jobs { + var ignoreNativeTxHash bool + var nativeTxHash string + + // if not exist txhash, skip + if v.TxHash == nil || 
len(v.TxHash) == 0 { + logger.Warn("txHash is nil", zap.String("id", v.ID)) + ignoreNativeTxHash = true + } + + if !ignoreNativeTxHash { + txHash, err := domain.EncodeTrxHashByChainID(v.ChainID, v.TxHash) + if err != nil { + logger.Error("failed to encode transaction hash", zap.Error(err), zap.String("id", v.ID)) + } else { + nativeTxHash = txHash + } + } + + // update observations + update := bson.D{ + {Key: "$set", Value: bson.D{{Key: "nativeTxHash", Value: nativeTxHash}}}, + } + + result, err := collection.UpdateByID(ctx, v.ID, update, &options.UpdateOptions{Upsert: &[]bool{true}[0]}) + if err != nil { + logger.Error("failed to update observation", zap.Error(err), zap.String("id", v.ID)) + break + } + if result.ModifiedCount == 1 { + updated.Add(1) + logger.Debug("updated nativeTxHash observation", zap.String("id", v.ID)) + } else { + logger.Info("nativeTxHash in observation already exists", zap.String("id", v.ID)) + } + } +} + +func (m *MigrateNativeTxHash) getObservationsToMigrate(ctx context.Context, pageSize int64, lessThan time.Time) ([]Observation, error) { + + limit := pageSize + sort := bson.D{{Key: "indexedAt", Value: -1}} + + solanaAndAptosAndWormchainIds := []sdk.ChainID{sdk.ChainIDSolana, sdk.ChainIDAptos, sdk.ChainIDWormchain} + filter := bson.D{ + {Key: "emitterChain", Value: bson.M{"$nin": solanaAndAptosAndWormchainIds}}, + {Key: "nativeTxHash", Value: bson.M{"$exists": false}}, + {Key: "indexedAt", Value: bson.M{"$lte": lessThan}}, + } + + cur, err := m.collections.observations.Find(ctx, filter, &options.FindOptions{Limit: &limit, Sort: sort}) + if err != nil { + return []Observation{}, err + } + + var observations []Observation + if err := cur.All(ctx, &observations); err != nil { + return []Observation{}, err + } + + return observations, nil +}