Find by txHash in observations endpoint (#1450)

Add a txHash query parameter to the observations endpoint
Add a job to backfill nativeTxHash in the observations collection, except for Solana, Aptos and Wormchain
Store the encoded txHash in the observations collection
ftocal 2024-06-04 12:12:44 -03:00 committed by GitHub
parent 8ab5a6f5e1
commit e366f04765
17 changed files with 309 additions and 20 deletions
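
With this change, clients can filter observations by transaction hash. A minimal sketch of exercising the new parameter, assuming a placeholder base URL and route (adjust to your deployment):

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Placeholder host and path; the query parameters match the swagger docs below.
	base := "https://api.example.com/api/v1/observations"
	q := url.Values{}
	q.Set("txHash", "YOUR_TX_HASH") // native transaction hash, as stored in nativeTxHash
	q.Set("pageSize", "10")

	resp, err := http.Get(base + "?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}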

View File

@@ -674,6 +674,12 @@ const docTemplate = `{
"name": "pageSize",
"in": "query"
},
{
"type": "string",
"description": "Transaction hash of the Observations",
"name": "txHash",
"in": "query"
},
{
"enum": [
"ASC",

View File

@@ -667,6 +667,12 @@
"name": "pageSize",
"in": "query"
},
{
"type": "string",
"description": "Transaction hash of the Observations",
"name": "txHash",
"in": "query"
},
{
"enum": [
"ASC",

View File

@@ -1649,6 +1649,10 @@ paths:
        in: query
        name: pageSize
        type: integer
      - description: Transaction hash of the Observations
        in: query
        name: txHash
        type: string
      - description: Sort results in ascending or descending order.
        enum:
        - ASC

View File

@@ -6,6 +6,8 @@ import (
"strconv"
"time"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination"
"github.com/wormhole-foundation/wormhole-explorer/common/types"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
@@ -39,3 +41,9 @@ func (o *ObservationDoc) MarshalJSON() ([]byte, error) {
Alias: (*Alias)(o),
})
}
// FindAllParams passes input data to the function `FindAll`.
type FindAllParams struct {
Pagination *pagination.Pagination
TxHash *types.TxHash
}
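
The intent of the new struct: the handler gathers the pagination and the optional hash into one value, and a nil TxHash leaves the query unfiltered. A sketch of the call site (p and txHash are parsed from the request, as in the controller change further down):

params := &observations.FindAllParams{
	Pagination: p,      // *pagination.Pagination from the request middleware
	TxHash:     txHash, // *types.TxHash; nil means no hash filter
}
obs, err := svc.FindAll(ctx.Context(), params)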

View File

@@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination"
"github.com/wormhole-foundation/wormhole-explorer/common/types"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
@@ -89,6 +90,7 @@ type ObservationQuery struct {
sequence string
guardianAddr string
hash []byte
txHash *types.TxHash
uint64
}
@@ -128,6 +130,12 @@ func (q *ObservationQuery) SetHash(hash []byte) *ObservationQuery {
return q
}
// SetTxHash set the txHash field of the ObservationQuery struct.
func (q *ObservationQuery) SetTxHash(txHash *types.TxHash) *ObservationQuery {
q.txHash = txHash
return q
}
// SetPagination set the pagination field of the ObservationQuery struct.
func (q *ObservationQuery) SetPagination(p *pagination.Pagination) *ObservationQuery {
q.Pagination = *p
@@ -151,6 +159,10 @@ func (q *ObservationQuery) toBSON() *bson.D {
if q.guardianAddr != "" {
r = append(r, bson.E{"guardianAddr", q.guardianAddr})
}
if q.txHash != nil {
nativeTxHash := q.txHash.String()
r = append(r, bson.E{"nativeTxHash", nativeTxHash})
}
return &r
}
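
When txHash is set, toBSON appends a condition on the nativeTxHash field that the migration job below backfills. A self-contained sketch of the equivalent filter document (placeholder hash value):

package main

import (
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
)

func main() {
	// Equivalent of what toBSON appends when txHash is present.
	filter := bson.D{{Key: "nativeTxHash", Value: "YOUR_TX_HASH"}}
	out, err := bson.MarshalExtJSON(filter, false, false)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"nativeTxHash":"YOUR_TX_HASH"}
}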

View File

@@ -22,8 +22,8 @@ func NewService(dao *Repository, logger *zap.Logger) *Service {
}
// FindAll get all the observations.
func (s *Service) FindAll(ctx context.Context, p *pagination.Pagination) ([]*ObservationDoc, error) {
return s.repo.Find(ctx, Query().SetPagination(p))
func (s *Service) FindAll(ctx context.Context, p *FindAllParams) ([]*ObservationDoc, error) {
return s.repo.Find(ctx, Query().SetPagination(p.Pagination).SetTxHash(p.TxHash))
}
// FindByChain get all the observations by chainID.

View File

@@ -31,6 +31,7 @@ func NewController(srv *observations.Service, logger *zap.Logger) *Controller {
// @ID find-observations
// @Param page query integer false "Page number."
// @Param pageSize query integer false "Number of elements per page."
// @Param txHash query string false "Transaction hash of the Observations"
// @Param sortOrder query string false "Sort results in ascending or descending order." Enums(ASC, DESC)
// @Success 200 {object} []observations.ObservationDoc
// @Failure 400
@@ -48,7 +49,17 @@ func (c *Controller) FindAll(ctx *fiber.Ctx) error {
return response.NewInvalidParamError(ctx, "pageSize cannot be greater than 1000", nil)
}
obs, err := c.srv.FindAll(ctx.Context(), p)
txHash, err := middleware.GetTxHash(ctx, c.logger)
if err != nil {
return err
}
params := &observations.FindAllParams{
Pagination: p,
TxHash: txHash,
}
obs, err := c.srv.FindAll(ctx.Context(), params)
if err != nil {
return err
}
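
middleware.GetTxHash itself is not part of this diff. A purely hypothetical sketch of its shape, assuming a types.ParseTxHash constructor (both the helper name and the error handling are illustrative, not confirmed by the source):

// Hypothetical sketch — not the actual implementation in the middleware package.
func GetTxHash(ctx *fiber.Ctx, logger *zap.Logger) (*types.TxHash, error) {
	raw := ctx.Query("txHash")
	if raw == "" {
		return nil, nil // txHash is optional; nil disables the hash filter
	}
	txHash, err := types.ParseTxHash(raw) // assumed helper in common/types
	if err != nil {
		logger.Debug("invalid txHash query parameter", zap.Error(err))
		return nil, err
	}
	return txHash, nil
}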

View File

@@ -8,4 +8,5 @@ const (
GuardianSets = "guardianSets"
NodeGovernorVaas = "nodeGovernorVaas"
GovernorVaas = "governorVaas"
Observations = "observations"
)

View File

@@ -0,0 +1,38 @@
apiVersion: batch/v1
kind: Job
metadata:
  name: migrate-native-tx-hash
  namespace: {{ .NAMESPACE }}
spec:
  template:
    metadata:
      labels:
        app: migrate-native-tx-hash
    spec:
      restartPolicy: Never
      terminationGracePeriodSeconds: 40
      containers:
        - name: migrate-native-tx-hash
          image: {{ .IMAGE_NAME }}
          imagePullPolicy: Always
          env:
            - name: ENVIRONMENT
              value: {{ .ENVIRONMENT }}
            - name: P2P_NETWORK
              value: {{ .P2P_NETWORK }}
            - name: LOG_LEVEL
              value: {{ .LOG_LEVEL }}
            - name: JOB_ID
              value: JOB_MIGRATE_NATIVE_TX_HASH
            - name: MONGODB_URI
              valueFrom:
                secretKeyRef:
                  name: mongodb
                  key: mongo-uri
            - name: MONGODB_DATABASE
              valueFrom:
                configMapKeyRef:
                  name: config
                  key: mongo-database
            - name: PAGE_SIZE
              value: "1000"

View File

@@ -7,6 +7,7 @@ require (
github.com/aws/aws-sdk-go-v2 v1.18.0
github.com/aws/aws-sdk-go-v2/config v1.15.1
github.com/aws/aws-sdk-go-v2/credentials v1.11.0
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.2
github.com/certusone/wormhole/node v0.0.0-20240416174455-25e60611a867
github.com/dgraph-io/ristretto v0.1.1
@@ -37,7 +38,6 @@ require (
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.1 // indirect
github.com/aws/smithy-go v1.13.5 // indirect

View File

@@ -116,18 +116,27 @@ func Run(db *mongo.Database) error {
// create index in observations collection by indexedAt.
indexObservationsByIndexedAt := mongo.IndexModel{Keys: bson.D{{Key: "indexedAt", Value: 1}}}
_, err = db.Collection("observations").Indexes().CreateOne(context.TODO(), indexObservationsByIndexedAt)
_, err = db.Collection(repository.Observations).Indexes().CreateOne(context.TODO(), indexObservationsByIndexedAt)
if err != nil && isNotAlreadyExistsError(err) {
return err
}
// create index in observations collect.
// create index in observations collection.
indexObservationsByEmitterChainAndAddressAndSequence := mongo.IndexModel{
Keys: bson.D{
{Key: "emitterChain", Value: 1},
{Key: "emitterAddr", Value: 1},
{Key: "sequence", Value: 1}}}
_, err = db.Collection("observations").Indexes().CreateOne(context.TODO(), indexObservationsByEmitterChainAndAddressAndSequence)
_, err = db.Collection(repository.Observations).Indexes().CreateOne(context.TODO(), indexObservationsByEmitterChainAndAddressAndSequence)
if err != nil && isNotAlreadyExistsError(err) {
return err
}
// create index in observations collection by nativeTxHash.
indexObservationsByNativeTxHash := mongo.IndexModel{
Keys: bson.D{
{Key: "nativeTxHash", Value: 1}}}
_, err = db.Collection(repository.Observations).Indexes().CreateOne(context.TODO(), indexObservationsByNativeTxHash)
if err != nil && isNotAlreadyExistsError(err) {
return err
}

View File

@@ -82,6 +82,7 @@ type ObservationUpdate struct {
Sequence string `bson:"sequence"`
Hash []byte `bson:"hash"`
TxHash []byte `bson:"txHash"`
NativeTxHash string `bson:"nativeTxHash"`
GuardianAddr string `bson:"guardianAddr"`
Signature []byte `bson:"signature"`
UpdatedAt *time.Time `bson:"updatedAt"`

View File

@@ -69,7 +69,7 @@ func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics,
}{
vaas: db.Collection(repository.Vaas),
heartbeats: db.Collection("heartbeats"),
observations: db.Collection("observations"),
observations: db.Collection(repository.Observations),
governorConfig: db.Collection("governorConfig"),
governorStatus: db.Collection("governorStatus"),
vaasPythnet: db.Collection("vaasPythnet"),
@@ -163,14 +163,14 @@ func (s *Repository) UpsertObservation(ctx context.Context, o *gossipv1.SignedOb
id := fmt.Sprintf("%s/%s/%s", o.MessageId, hex.EncodeToString(o.Addr), hex.EncodeToString(o.Hash))
now := time.Now()
chainID, err := strconv.ParseUint(chainIDStr, 10, 16)
chainIDUint, err := strconv.ParseUint(chainIDStr, 10, 16)
if err != nil {
s.log.Error("Error parsing chainId", zap.Error(err))
return err
}
// TODO should we notify the caller that pyth observations are not stored?
if vaa.ChainID(chainID) == vaa.ChainIDPythNet {
if vaa.ChainID(chainIDUint) == vaa.ChainIDPythNet {
return nil
}
sequence, err := strconv.ParseUint(sequenceStr, 10, 64)
@@ -179,14 +179,25 @@
return err
}
chainID := vaa.ChainID(chainIDUint)
var nativeTxHash string
switch chainID {
case vaa.ChainIDSolana,
vaa.ChainIDWormchain,
vaa.ChainIDAptos:
default:
nativeTxHash, _ = domain.EncodeTrxHashByChainID(chainID, o.GetTxHash())
}
addr := eth_common.BytesToAddress(o.GetAddr())
obs := ObservationUpdate{
ChainID: vaa.ChainID(chainID),
ChainID: chainID,
Emitter: emitter,
Sequence: strconv.FormatUint(sequence, 10),
MessageID: o.GetMessageId(),
Hash: o.GetHash(),
TxHash: o.GetTxHash(),
NativeTxHash: nativeTxHash,
GuardianAddr: addr.String(),
Signature: o.GetSignature(),
UpdatedAt: &now,
@@ -217,14 +228,14 @@ func (s *Repository) UpsertObservation(ctx context.Context, o *gossipv1.SignedOb
txHash, err := domain.EncodeTrxHashByChainID(vaa.ChainID(chainID), o.GetTxHash())
if err != nil {
s.log.Warn("Error encoding tx hash",
zap.Uint64("chainId", chainID),
zap.Uint64("chainId", chainIDUint),
zap.ByteString("txHash", o.GetTxHash()),
zap.Error(err))
s.metrics.IncObservationWithoutTxHash(vaa.ChainID(chainID))
s.metrics.IncObservationWithoutTxHash(chainID)
}
vaaTxHash := txhash.TxHash{
ChainID: vaa.ChainID(chainID),
ChainID: chainID,
Emitter: emitter,
Sequence: strconv.FormatUint(sequence, 10),
TxHash: txHash,
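
The switch above leaves nativeTxHash empty for Solana, Aptos and Wormchain and encodes the raw gossip bytes for every other chain. The same gate in isolation, as a hedged sketch (EncodeTrxHashByChainID lives in common/domain; its per-chain encoding is not shown in this diff):

// Sketch of the chain gate used in UpsertObservation above.
func nativeTxHashFor(chainID vaa.ChainID, raw []byte) string {
	switch chainID {
	case vaa.ChainIDSolana, vaa.ChainIDWormchain, vaa.ChainIDAptos:
		return "" // excluded by this commit
	default:
		h, _ := domain.EncodeTrxHashByChainID(chainID, raw) // error ignored, as in the repository
		return h
	}
}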

View File

@@ -90,6 +90,9 @@ func main() {
case jobs.JobIDProtocolsStatsDaily:
statsJob := initProtocolStatsDailyJob(ctx, logger)
err = statsJob.Run(ctx)
case jobs.JobIDMigrationNativeTxHash:
job := initMigrateNativeTxHashJob(ctx, logger)
err = job.Run(ctx)
default:
logger.Fatal("Invalid job id", zap.String("job_id", cfg.JobID))
}
@@ -238,6 +241,18 @@ func initProtocolStatsDailyJob(ctx context.Context, logger *zap.Logger) *protoco
logger)
}
func initMigrateNativeTxHashJob(ctx context.Context, logger *zap.Logger) *migration.MigrateNativeTxHash {
cfgJob, errCfg := configuration.LoadFromEnv[config.MigrateNativeTxHashConfiguration](ctx)
if errCfg != nil {
log.Fatal("error creating config", errCfg)
}
db, err := dbutil.Connect(ctx, logger, cfgJob.MongoURI, cfgJob.MongoDatabase, false)
if err != nil {
logger.Fatal("Failed to connect MongoDB", zap.Error(err))
}
return migration.NewMigrationNativeTxHash(db.Database, cfgJob.PageSize, logger)
}
func handleExit() {
if r := recover(); r != nil {
if e, ok := r.(exitCode); ok {

View File

@@ -72,3 +72,9 @@ type Protocol struct {
type ProtocolsActivityConfiguration struct {
ProtocolsStatsConfiguration
}
type MigrateNativeTxHashConfiguration struct {
MongoURI string `env:"MONGODB_URI,required"`
MongoDatabase string `env:"MONGODB_DATABASE,required"`
PageSize int `env:"PAGE_SIZE,default=100"`
}
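
The three environment variables map one-to-one to the Kubernetes manifest above; only PAGE_SIZE has a default. A short sketch of loading the config with the same helper the job uses (mirrors initMigrateNativeTxHashJob):

cfg, err := configuration.LoadFromEnv[config.MigrateNativeTxHashConfiguration](ctx)
if err != nil {
	log.Fatal("error creating config", err)
}
fmt.Printf("db=%s pageSize=%d\n", cfg.MongoDatabase, cfg.PageSize) // PAGE_SIZE defaults to 100; the manifest sets 1000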

View File

@@ -5,12 +5,13 @@ import "context"
// JobIDNotional is the job id for notional job.
const (
JobIDNotional = "JOB_NOTIONAL_USD"
JobIDTransferReport = "JOB_TRANSFER_REPORT"
JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES"
JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX"
JobIDProtocolsStatsDaily = "JOB_PROTOCOLS_STATS_DAILY"
JobIDProtocolsStatsHourly = "JOB_PROTOCOLS_STATS_HOURLY"
JobIDNotional = "JOB_NOTIONAL_USD"
JobIDTransferReport = "JOB_TRANSFER_REPORT"
JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES"
JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX"
JobIDProtocolsStatsDaily = "JOB_PROTOCOLS_STATS_DAILY"
JobIDProtocolsStatsHourly = "JOB_PROTOCOLS_STATS_HOURLY"
JobIDMigrationNativeTxHash = "JOB_MIGRATE_NATIVE_TX_HASH"
)
// Job is the interface for jobs.

View File

@@ -0,0 +1,160 @@
package migration
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// MigrateNativeTxHash is the job that backfills the nativeTxHash field in the observations collection.
type MigrateNativeTxHash struct {
db *mongo.Database
pageSize int
collections struct {
observations *mongo.Collection
}
logger *zap.Logger
}
// NewMigrationNativeTxHash creates a new migration job.
func NewMigrationNativeTxHash(
db *mongo.Database,
pageSize int,
logger *zap.Logger) *MigrateNativeTxHash {
return &MigrateNativeTxHash{
db: db,
pageSize: pageSize,
collections: struct {
observations *mongo.Collection
}{
observations: db.Collection(repository.Observations),
},
logger: logger}
}
// Observation holds the observation fields the migration needs.
type Observation struct {
ID string `bson:"_id" json:"id"`
ChainID vaa.ChainID `bson:"emitterChain"`
TxHash []byte `bson:"txHash"`
IndexedAt time.Time `bson:"indexedAt"`
}
// Run runs the migration job.
func (m *MigrateNativeTxHash) Run(ctx context.Context) error {
return m.runMigration(ctx)
}
func (m *MigrateNativeTxHash) runMigration(ctx context.Context) error {
var updated atomic.Uint64
var total atomic.Uint64
var wg sync.WaitGroup
workerLimit := m.pageSize
jobs := make(chan Observation, workerLimit)
for i := 1; i <= workerLimit; i++ {
wg.Add(1)
go updateNativeTxHash(ctx, &wg, jobs, m.collections.observations, &updated, m.logger)
}
indexedAt := time.Now()
for {
observations, err := m.getObservationsToMigrate(ctx, int64(m.pageSize), indexedAt)
if err != nil {
m.logger.Error("failed to get observations", zap.Error(err))
break
}
if len(observations) == 0 {
break
}
total.Add(uint64(len(observations)))
for _, v := range observations {
jobs <- v
indexedAt = v.IndexedAt
}
m.logger.Info("migrating observations",
zap.String("indexedAt", indexedAt.Format(time.RFC3339)),
zap.Uint64("total", total.Load()),
zap.Uint64("updated", updated.Load()))
}
close(jobs)
wg.Wait()
return nil
}
func updateNativeTxHash(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Observation, collection *mongo.Collection, updated *atomic.Uint64, logger *zap.Logger) {
defer wg.Done()
for v := range jobs {
var ignoreNativeTxHash bool
var nativeTxHash string
// skip observations that have no txHash
if len(v.TxHash) == 0 {
logger.Warn("observation has no txHash", zap.String("id", v.ID))
ignoreNativeTxHash = true
}
if !ignoreNativeTxHash {
txHash, err := domain.EncodeTrxHashByChainID(v.ChainID, v.TxHash)
if err != nil {
logger.Error("failed to encode transaction hash", zap.Error(err), zap.String("id", v.ID))
} else {
nativeTxHash = txHash
}
}
// update observations
update := bson.D{
{Key: "$set", Value: bson.D{{Key: "nativeTxHash", Value: nativeTxHash}}},
}
result, err := collection.UpdateByID(ctx, v.ID, update, options.Update().SetUpsert(true))
if err != nil {
logger.Error("failed to update observation", zap.Error(err), zap.String("id", v.ID))
break
}
if result.ModifiedCount == 1 {
updated.Add(1)
logger.Debug("updated nativeTxHash observation", zap.String("id", v.ID))
} else {
logger.Info("nativeTxHash in observation already exists", zap.String("id", v.ID))
}
}
}
func (m *MigrateNativeTxHash) getObservationsToMigrate(ctx context.Context, pageSize int64, lessThan time.Time) ([]Observation, error) {
limit := pageSize
sort := bson.D{{Key: "indexedAt", Value: -1}}
solanaAndAptosAndWormchainIds := []vaa.ChainID{vaa.ChainIDSolana, vaa.ChainIDAptos, vaa.ChainIDWormchain}
filter := bson.D{
{Key: "emitterChain", Value: bson.M{"$nin": solanaAndAptosAndWormchainIds}},
{Key: "nativeTxHash", Value: bson.M{"$exists": false}},
{Key: "indexedAt", Value: bson.M{"$lte": lessThan}},
}
cur, err := m.collections.observations.Find(ctx, filter, &options.FindOptions{Limit: &limit, Sort: sort})
if err != nil {
return []Observation{}, err
}
var observations []Observation
if err := cur.All(ctx, &observations); err != nil {
return []Observation{}, err
}
return observations, nil
}
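
getObservationsToMigrate pages newest-first by indexedAt, and runMigration feeds the last indexedAt it saw back in as the next upper bound, so each page is fetched with a single query against the indexedAt index instead of a growing skip. Restated as a bare keyset-pagination loop (same names and assumptions as the code above):

// Keyset pagination: descend by indexedAt, carrying the last timestamp forward.
cursor := time.Now()
for {
	page, err := m.getObservationsToMigrate(ctx, int64(m.pageSize), cursor)
	if err != nil || len(page) == 0 {
		break
	}
	for _, o := range page {
		jobs <- o
		cursor = o.IndexedAt // sorted descending, so the bound moves toward older docs
	}
}

Because the filter also requires nativeTxHash to be missing, documents updated by the workers drop out of later pages, which in the steady state keeps the $lte bound from re-reading the same batch indefinitely.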