diff --git a/common/client/txtracker/txtracker.go b/common/client/txtracker/txtracker.go new file mode 100644 index 00000000..26b51dd7 --- /dev/null +++ b/common/client/txtracker/txtracker.go @@ -0,0 +1,86 @@ +package txtracker + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "net/http" + "time" + + "go.uber.org/zap" +) + +const DefaultTimeout = 30 + +var ( + ErrCallEndpoint = errors.New("ERROR CALL ENPOINT") + ErrInternalError = errors.New("INTERNAL ERROR") +) + +// TxTrackerAPIClient tx tracker api client. +type TxTrackerAPIClient struct { + Client http.Client + BaseURL string + Logger *zap.Logger +} + +// NewTxTrackerAPIClient create new instances of TxTrackerAPIClient. +func NewTxTrackerAPIClient(timeout int64, baseURL string, logger *zap.Logger) (TxTrackerAPIClient, error) { + if timeout == 0 { + timeout = DefaultTimeout + } + if baseURL == "" { + return TxTrackerAPIClient{}, errors.New("baseURL can not be empty") + } + + return TxTrackerAPIClient{ + Client: http.Client{ + Timeout: time.Duration(timeout) * time.Second, + }, + BaseURL: baseURL, + Logger: logger, + }, nil +} + +// ProcessVaaResponse represent a process vaa response. +type ProcessVaaResponse struct { + From string `json:"from"` + NativeTxHash string `json:"nativeTxHash"` + Attributes any `json:"attributes"` +} + +// Process process vaa. +func (c *TxTrackerAPIClient) Process(vaaID string) (*ProcessVaaResponse, error) { + endpointUrl := fmt.Sprintf("%s/vaa/process", c.BaseURL) + + // create request body. + payload := struct { + VaaID string `json:"id"` + }{ + VaaID: vaaID, + } + + body, err := json.Marshal(payload) + if err != nil { + c.Logger.Error("error marshalling payload", zap.Error(err), zap.String("vaaID", vaaID)) + return nil, err + } + + response, err := c.Client.Post(endpointUrl, "application/json", bytes.NewBuffer(body)) + if err != nil { + c.Logger.Error("error call parse vaa endpoint", zap.Error(err), zap.String("vaaID", vaaID)) + return nil, ErrCallEndpoint + } + defer response.Body.Close() + switch response.StatusCode { + case http.StatusOK: + var processVaaResponse ProcessVaaResponse + json.NewDecoder(response.Body).Decode(&processVaaResponse) + return &processVaaResponse, nil + case http.StatusInternalServerError: + return nil, ErrInternalError + default: + return nil, ErrInternalError + } +} diff --git a/deploy/jobs/env/production-mainnet.env b/deploy/jobs/env/production-mainnet.env index e872e193..43ee2957 100644 --- a/deploy/jobs/env/production-mainnet.env +++ b/deploy/jobs/env/production-mainnet.env @@ -19,4 +19,6 @@ REQUEST_LIMIT_TIME_SECONDS=1 HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * * #transfer reports jobs PRICES_URI=http://wormscan-notional.wormscan -OUTPUT_PATH= \ No newline at end of file +OUTPUT_PATH= +#migrate vaa to origintx job +TX_TRACKER_URL= \ No newline at end of file diff --git a/deploy/jobs/env/production-testnet.env b/deploy/jobs/env/production-testnet.env index 50662cdc..b30ae399 100644 --- a/deploy/jobs/env/production-testnet.env +++ b/deploy/jobs/env/production-testnet.env @@ -20,3 +20,5 @@ HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * * #transfer reports jobs PRICES_URI=http://wormscan-notional.wormscan-testnet OUTPUT_PATH= +#migrate vaa to origintx job +TX_TRACKER_URL= \ No newline at end of file diff --git a/deploy/jobs/env/staging-mainnet.env b/deploy/jobs/env/staging-mainnet.env index 7428b016..633eb6a6 100644 --- a/deploy/jobs/env/staging-mainnet.env +++ b/deploy/jobs/env/staging-mainnet.env @@ -20,3 +20,5 @@ HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * * #transfer reports jobs PRICES_URI=http://wormscan-notional.wormscan OUTPUT_PATH= +#migrate vaa to origintx job +TX_TRACKER_URL= \ No newline at end of file diff --git a/deploy/jobs/env/staging-testnet.env b/deploy/jobs/env/staging-testnet.env index 76fb1e07..67ba5095 100644 --- a/deploy/jobs/env/staging-testnet.env +++ b/deploy/jobs/env/staging-testnet.env @@ -20,3 +20,5 @@ HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * * #transfer reports jobs PRICES_URI=http://wormscan-notional.wormscan-testnet OUTPUT_PATH= +#migrate vaas to origintx job +TX_TRACKER_URL= \ No newline at end of file diff --git a/deploy/jobs/migrate-vaa-to-origintx.yml b/deploy/jobs/migrate-vaa-to-origintx.yml new file mode 100644 index 00000000..8f042e16 --- /dev/null +++ b/deploy/jobs/migrate-vaa-to-origintx.yml @@ -0,0 +1,50 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: migrate-vaas-to-origintx + namespace: {{ .NAMESPACE }} +spec: + template: + metadata: + labels: + app: migrate-vaas-to-origintx + spec: + restartPolicy: Never + terminationGracePeriodSeconds: 40 + containers: + - name: migrate-vaas-to-origintx + image: {{ .IMAGE_NAME }} + imagePullPolicy: Always + env: + - name: ENVIRONMENT + value: {{ .ENVIRONMENT }} + - name: P2P_NETWORK + value: {{ .P2P_NETWORK }} + - name: LOG_LEVEL + value: {{ .LOG_LEVEL }} + - name: JOB_ID + value: JOB_MIGRATE_SOURCE_TX + - name: MONGODB_URI + valueFrom: + secretKeyRef: + name: mongodb + key: mongo-uri + - name: MONGODB_DATABASE + valueFrom: + configMapKeyRef: + name: config + key: mongo-database + - name: PAGE_SIZE + value: "1000" + - name: CHAIN_ID + value: "0" + - name: FROM_DATE + value: "" + - name: TO_DATE + value: "" + - name: TX_TRACKER_URL + value: {{ .TX_TRACKER_URL }} + - name: TX_TRACKER_TIMEOUT + value: "30" + - name: SLEEP_TIME_SECONDS + value: "5" \ No newline at end of file diff --git a/fly/migration/migration.go b/fly/migration/migration.go index 4c60929d..915c70b5 100644 --- a/fly/migration/migration.go +++ b/fly/migration/migration.go @@ -114,6 +114,14 @@ func Run(db *mongo.Database) error { return err } + // create index in globaltransactions collect. + indexGlobalTransactionsByOriginTx := mongo.IndexModel{ + Keys: bson.D{{Key: "originTx.from", Value: 1}}} + _, err = db.Collection("globaltransactions").Indexes().CreateOne(context.TODO(), indexGlobalTransactionsByOriginTx) + if err != nil && isNotAlreadyExistsError(err) { + return err + } + return nil } diff --git a/jobs/cmd/main.go b/jobs/cmd/main.go index 7b10e338..877a256e 100644 --- a/jobs/cmd/main.go +++ b/jobs/cmd/main.go @@ -5,8 +5,10 @@ import ( "log" "os" "strings" + "time" "github.com/go-redis/redis" + txtrackerProcessVaa "github.com/wormhole-foundation/wormhole-explorer/common/client/txtracker" common "github.com/wormhole-foundation/wormhole-explorer/common/coingecko" "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/domain" @@ -16,8 +18,10 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/jobs/internal/coingecko" apiPrices "github.com/wormhole-foundation/wormhole-explorer/jobs/internal/prices" "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/migration" "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/notional" "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/report" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) @@ -60,6 +64,15 @@ func main() { historyPrices := initHistoricalPricesJob(context, hCfg, logger) err = historyPrices.Run(context) + case jobs.JobIDMigrationSourceTx: + mCfg, errCfg := config.NewMigrateSourceTxConfiguration(context) + if errCfg != nil { + log.Fatal("error creating config", errCfg) + } + + chainID := sdk.ChainID(mCfg.ChainID) + migrationJob := initMigrateSourceTxJob(context, mCfg, chainID, logger) + err = migrationJob.Run(context) default: logger.Fatal("Invalid job id", zap.String("job_id", cfg.JobID)) } @@ -124,6 +137,25 @@ func initHistoricalPricesJob(ctx context.Context, cfg *config.HistoricalPricesCo return notionalJob } +func initMigrateSourceTxJob(ctx context.Context, cfg *config.MigrateSourceTxConfiguration, chainID sdk.ChainID, logger *zap.Logger) *migration.MigrateSourceChainTx { + //setup DB connection + db, err := dbutil.Connect(ctx, logger, cfg.MongoURI, cfg.MongoDatabase, false) + if err != nil { + logger.Fatal("Failed to connect MongoDB", zap.Error(err)) + } + + // init tx tracker api client. + txTrackerAPIClient, err := txtrackerProcessVaa.NewTxTrackerAPIClient(cfg.TxTrackerTimeout, cfg.TxTrackerURL, logger) + if err != nil { + logger.Fatal("Failed to create txtracker api client", zap.Error(err)) + } + sleepTime := time.Duration(cfg.SleepTimeSeconds) * time.Second + fromDate, _ := time.Parse(time.RFC3339, cfg.FromDate) + toDate, _ := time.Parse(time.RFC3339, cfg.ToDate) + + return migration.NewMigrationSourceChainTx(db.Database, cfg.PageSize, sdk.ChainID(cfg.ChainID), fromDate, toDate, txTrackerAPIClient, sleepTime, logger) +} + func handleExit() { if r := recover(); r != nil { if e, ok := r.(exitCode); ok { diff --git a/jobs/config/config.go b/jobs/config/config.go index c3bffc52..5904786c 100644 --- a/jobs/config/config.go +++ b/jobs/config/config.go @@ -45,6 +45,18 @@ type HistoricalPricesConfiguration struct { PriceDays string `env:"PRICE_DAYS,default=max"` } +type MigrateSourceTxConfiguration struct { + MongoURI string `env:"MONGODB_URI,required"` + MongoDatabase string `env:"MONGODB_DATABASE,required"` + PageSize int `env:"PAGE_SIZE,default=100"` + ChainID int64 `env:"CHAIN_ID,default=0"` + FromDate string `env:"FROM_DATE,required"` + ToDate string `env:"TO_DATE,required"` + TxTrackerURL string `env:"TX_TRACKER_URL,required"` + TxTrackerTimeout int64 `env:"TX_TRACKER_TIMEOUT,default=30"` + SleepTimeSeconds int64 `env:"SLEEP_TIME_SECONDS,default=5"` +} + // New creates a default configuration with the values from .env file and environment variables. func New(ctx context.Context) (*Configuration, error) { _ = godotenv.Load(".env", "../.env") @@ -92,3 +104,15 @@ func NewHistoricalPricesConfiguration(ctx context.Context) (*HistoricalPricesCon return &configuration, nil } + +// New creates a migration source tx configuration with the values from .env file and environment variables. +func NewMigrateSourceTxConfiguration(ctx context.Context) (*MigrateSourceTxConfiguration, error) { + _ = godotenv.Load(".env", "../.env") + + var configuration MigrateSourceTxConfiguration + if err := envconfig.Process(ctx, &configuration); err != nil { + return nil, err + } + + return &configuration, nil +} diff --git a/jobs/go.mod b/jobs/go.mod index b1ba406d..ddeebeb0 100644 --- a/jobs/go.mod +++ b/jobs/go.mod @@ -6,10 +6,10 @@ require ( github.com/go-redis/redis v6.15.9+incompatible github.com/go-resty/resty/v2 v2.10.0 github.com/joho/godotenv v1.5.1 - github.com/sethvargo/go-envconfig v0.9.0 + github.com/sethvargo/go-envconfig v1.0.0 github.com/shopspring/decimal v1.3.1 github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-20230713181709-0425a89e7533 - github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 + github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229 go.mongodb.org/mongo-driver v1.11.2 go.uber.org/zap v1.24.0 ) @@ -25,7 +25,6 @@ require ( github.com/ethereum/go-ethereum v1.10.21 // indirect github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/go-cmp v0.5.9 // indirect github.com/holiman/uint256 v1.2.1 // indirect github.com/klauspost/compress v1.16.3 // indirect github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect diff --git a/jobs/go.sum b/jobs/go.sum index 9ffd32a7..509822ce 100644 --- a/jobs/go.sum +++ b/jobs/go.sum @@ -33,8 +33,7 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/holiman/uint256 v1.2.1 h1:XRtyuda/zw2l+Bq/38n5XUoEF72aSOu/77Thd9pPp2o= github.com/holiman/uint256 v1.2.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= @@ -59,8 +58,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE= -github.com/sethvargo/go-envconfig v0.9.0/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YIlt8GMUX6yyNFs0= +github.com/sethvargo/go-envconfig v1.0.0 h1:1C66wzy4QrROf5ew4KdVw942CQDa55qmlYmw9FZxZdU= +github.com/sethvargo/go-envconfig v1.0.0/go.mod h1:Lzc75ghUn5ucmcRGIdGQ33DKJrcjk4kihFYgSTBmjIc= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -70,8 +69,8 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= -github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 h1:rrOyHd+H9a6Op1iUyZNCaI5v9D1syq8jDAYyX/2Q4L8= -github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8/go.mod h1:dE12DOucCq23gjGGGhtbyx41FBxuHxjpPvG+ArO+8t0= +github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229 h1:fqcC4qwEVaJfcpqUVKi5+imz+JpxviQYPW4qu3zILz4= +github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229/go.mod h1:pE/jYet19kY4P3V6mE2+01zvEfxdyBqv6L6HsnSa5uc= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= diff --git a/jobs/jobs/jobs.go b/jobs/jobs/jobs.go index b2eded35..7785104b 100644 --- a/jobs/jobs/jobs.go +++ b/jobs/jobs/jobs.go @@ -3,9 +3,10 @@ package jobs // JobIDNotional is the job id for notional job. const ( - JobIDNotional = "JOB_NOTIONAL_USD" - JobIDTransferReport = "JOB_TRANSFER_REPORT" - JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES" + JobIDNotional = "JOB_NOTIONAL_USD" + JobIDTransferReport = "JOB_TRANSFER_REPORT" + JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES" + JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX" ) // Job is the interface for jobs. diff --git a/jobs/jobs/migration/migration_source_tx.go b/jobs/jobs/migration/migration_source_tx.go new file mode 100644 index 00000000..47d22bc8 --- /dev/null +++ b/jobs/jobs/migration/migration_source_tx.go @@ -0,0 +1,383 @@ +package migration + +import ( + "context" + "errors" + "strings" + "sync" + "time" + + txtrackerProcessVaa "github.com/wormhole-foundation/wormhole-explorer/common/client/txtracker" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.uber.org/zap" +) + +// MigrateSourceChainTx is the job to migrate vaa txHash and timestamp from vaa collection to globalTx collection. +type MigrateSourceChainTx struct { + db *mongo.Database + pageSize int + chainID sdk.ChainID + FromDate time.Time + ToDate time.Time + txTrackerAPIClient txtrackerProcessVaa.TxTrackerAPIClient + sleepTime time.Duration + collections struct { + vaas *mongo.Collection + globalTransactions *mongo.Collection + } + logger *zap.Logger +} + +// NewMigrationSourceChainTx creates a new migration job. +func NewMigrationSourceChainTx( + db *mongo.Database, + pageSize int, + chainID sdk.ChainID, + FromDate time.Time, + ToDate time.Time, + txTrackerAPIClient txtrackerProcessVaa.TxTrackerAPIClient, + sleepTime time.Duration, + logger *zap.Logger) *MigrateSourceChainTx { + return &MigrateSourceChainTx{ + db: db, + pageSize: pageSize, + chainID: chainID, + FromDate: FromDate, + ToDate: ToDate, + txTrackerAPIClient: txTrackerAPIClient, + sleepTime: sleepTime, + collections: struct { + vaas *mongo.Collection + globalTransactions *mongo.Collection + }{ + vaas: db.Collection("vaas"), + globalTransactions: db.Collection("globalTransactions"), + }, + logger: logger} +} + +// VAASourceChain defines the structure of vaa fields needed for migration. +type VAASourceChain struct { + ID string `bson:"_id"` + EmitterChain sdk.ChainID `bson:"emitterChain" json:"emitterChain"` + Timestamp *time.Time `bson:"timestamp" json:"timestamp"` + TxHash *string `bson:"txHash" json:"txHash,omitempty"` +} + +// GlobalTransaction represents a global transaction. +type GlobalTransaction struct { + ID string `bson:"_id" json:"id"` + OriginTx *OriginTx `bson:"originTx" json:"originTx"` +} + +// OriginTx represents a origin transaction. +type OriginTx struct { + TxHash string `bson:"nativeTxHash" json:"txHash"` + From string `bson:"from" json:"from"` + Status string `bson:"status" json:"status"` +} + +func (m *MigrateSourceChainTx) Run(ctx context.Context) error { + if m.chainID == sdk.ChainIDSolana || m.chainID == sdk.ChainIDAptos { + return m.runComplexMigration(ctx) + } else { + return m.runMigration(ctx) + } +} + +// runComplexMigration runs the migration job for solana and aptos chains calling the txtracker endpoint. +func (m *MigrateSourceChainTx) runComplexMigration(ctx context.Context) error { + if sdk.ChainIDSolana != m.chainID && sdk.ChainIDAptos != m.chainID { + return errors.New("invalid chainID") + } + + var page int64 = 0 + for { + // get vaas to migrate by page and pageSize. + vaas, err := m.getVaasToMigrate(ctx, m.chainID, m.FromDate, m.ToDate, page, int64(m.pageSize)) + if err != nil { + m.logger.Error("failed to get vaas", zap.Error(err), zap.Int64("page", page)) + break + } + + if len(vaas) == 0 { + break + } + + for _, v := range vaas { + + // check if global transaction exists and nested originTx exists + filter := bson.D{ + {Key: "_id", Value: v.ID}, + {Key: "originTx", Value: bson.D{{Key: "$exists", Value: true}}}, + } + + var globalTransacations GlobalTransaction + err := m.collections.globalTransactions.FindOne(ctx, filter).Decode(&globalTransacations) + + // if global transaction exists, skip + if err == nil { + m.logger.Info("global transaction already exists", zap.String("id", v.ID)) + continue + } + + // if exist and error getting global transaction, log error + if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { + m.logger.Error("failed to get global transaction", zap.Error(err), zap.String("id", v.ID)) + continue + } + + // if not exist txhash, skip + if v.TxHash == nil { + m.logger.Error("txHash is nil", zap.String("id", v.ID)) + continue + } + + _, err = m.txTrackerAPIClient.Process(v.ID) + if err != nil { + m.logger.Error("failed to process vaa", zap.Error(err), zap.String("id", v.ID)) + continue + } + time.Sleep(5 * time.Second) + } + page++ + } + return nil +} + +// Run runs the migration job. +func (m *MigrateSourceChainTx) runMigration(ctx context.Context) error { + var page int64 = 0 + var wg sync.WaitGroup + workerLimit := m.pageSize + jobs := make(chan VAASourceChain, workerLimit) + + for i := 1; i <= workerLimit; i++ { + wg.Add(1) + go worker(ctx, &wg, jobs, m.collections.globalTransactions, m.logger) + } + + for { + // get vaas to migrate by page and pageSize. + vaas, err := m.getVaasToMigrate(ctx, m.chainID, m.FromDate, m.ToDate, page, int64(m.pageSize)) + if err != nil { + m.logger.Error("failed to get vaas", zap.Error(err), zap.Int64("page", page)) + break + } + + if len(vaas) == 0 { + break + } + + for _, v := range vaas { + jobs <- v + } + + } + close(jobs) + wg.Wait() + + return nil +} + +func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan VAASourceChain, collection *mongo.Collection, logger *zap.Logger) { + defer wg.Done() + for v := range jobs { + if v.EmitterChain == sdk.ChainIDSolana || v.EmitterChain == sdk.ChainIDAptos { + logger.Debug("skip migration", zap.String("id", v.ID), zap.String("chain", v.EmitterChain.String())) + continue + } + + // check if global transaction exists and nested originTx exists + filter := bson.D{ + {Key: "_id", Value: v.ID}, + {Key: "originTx", Value: bson.D{{Key: "$exists", Value: true}}}, + } + + var globalTransacations GlobalTransaction + err := collection.FindOne(ctx, filter).Decode(&globalTransacations) + + // if global transaction exists, skip + if err == nil { + logger.Info("global transaction already exists", zap.String("id", v.ID)) + continue + } + + // if exist and error getting global transaction, log error + if err != nil && !errors.Is(err, mongo.ErrNoDocuments) { + logger.Error("failed to get global transaction", zap.Error(err), zap.String("id", v.ID)) + continue + } + + // if not exist txhash, skip + if v.TxHash == nil { + logger.Error("txHash is nil", zap.String("id", v.ID)) + continue + } + + // set txHash format by chain + var txHash string + switch v.EmitterChain { + case sdk.ChainIDAcala, + sdk.ChainIDArbitrum, + sdk.ChainIDAvalanche, + sdk.ChainIDBase, + sdk.ChainIDBSC, + sdk.ChainIDCelo, + sdk.ChainIDEthereum, + sdk.ChainIDFantom, + sdk.ChainIDKarura, + sdk.ChainIDKlaytn, + sdk.ChainIDMoonbeam, + sdk.ChainIDOasis, + sdk.ChainIDOptimism, + sdk.ChainIDPolygon: + txHash = txHashLowerCaseWith0x(*v.TxHash) + default: + txHash = *v.TxHash + } + + // update global transaction + update := bson.D{ + {Key: "$set", Value: bson.D{{Key: "originTx.timestamp", Value: v.Timestamp}}}, + {Key: "$set", Value: bson.D{{Key: "originTx.nativeTxHash", Value: txHash}}}, + {Key: "$set", Value: bson.D{{Key: "originTx.status", Value: "confirmed"}}}, + } + + opts := options.Update().SetUpsert(true) + result, err := collection.UpdateByID(ctx, v.ID, update, opts) + if err != nil { + logger.Error("failed to update global transaction", zap.Error(err), zap.String("id", v.ID)) + break + } + if result.UpsertedCount == 1 { + logger.Info("inserted global transaction", zap.String("id", v.ID)) + } else { + logger.Debug("global transaction already exists", zap.String("id", v.ID)) + } + } +} + +func txHashLowerCaseWith0x(v string) string { + if strings.HasPrefix(v, "0x") { + return strings.ToLower(v) + } + return "0x" + strings.ToLower(v) +} + +func (m *MigrateSourceChainTx) getVaasToMigrate(ctx context.Context, chainID sdk.ChainID, from time.Time, to time.Time, page int64, pageSize int64) ([]VAASourceChain, error) { + + skip := page * pageSize + limit := pageSize + sort := bson.D{{Key: "timestamp", Value: 1}} + + // add match step by chain + var matchStage1 bson.D + if chainID != sdk.ChainIDUnset { + if chainID == sdk.ChainIDSolana || chainID == sdk.ChainIDAptos { + return []VAASourceChain{}, errors.New("invalid chainID") + } + matchStage1 = bson.D{{Key: "$match", Value: bson.D{ + {Key: "emitterChain", Value: chainID}, + }}} + } else { + // get all the vaas without solana and aptos + solanaAndAptosIds := []sdk.ChainID{sdk.ChainIDSolana, sdk.ChainIDAptos} + matchStage1 = bson.D{{Key: "$match", Value: bson.D{ + {Key: "emitterChain", Value: bson.M{"$nin": solanaAndAptosIds}}, + }}} + } + + // add match step by range date + var matchStage2 bson.D + if from.IsZero() && to.IsZero() { + matchStage2 = bson.D{{Key: "$match", Value: bson.D{}}} + } + if from.IsZero() && !to.IsZero() { + matchStage2 = bson.D{{Key: "$match", Value: bson.D{ + {Key: "timestamp", Value: bson.M{ + "$lt": to, + }}, + }}} + } + if !from.IsZero() && to.IsZero() { + matchStage2 = bson.D{{Key: "$match", Value: bson.D{ + {Key: "timestamp", Value: bson.M{ + "$gte": from, + }}, + }}} + } + if !from.IsZero() && !to.IsZero() { + matchStage2 = bson.D{{Key: "$match", Value: bson.D{ + {Key: "timestamp", Value: bson.M{ + "$gte": from, + "$lt": to, + }}, + }}} + } + + // add match step that txHash exists + var matchStage3 bson.D + matchStage3 = bson.D{{Key: "$match", Value: bson.D{ + {Key: "txHash", Value: bson.D{{Key: "$exists", Value: true}}}, + }}} + + // add lookup step with globalTransactions collection + lookupStage := bson.D{{Key: "$lookup", Value: bson.D{ + {Key: "from", Value: "globalTransactions"}, + {Key: "localField", Value: "_id"}, + {Key: "foreignField", Value: "_id"}, + {Key: "as", Value: "globalTransactions"}, + }}} + + matchStage4 := bson.D{{Key: "$match", Value: bson.D{ + {Key: "globalTransactions.originTx", Value: bson.D{{Key: "$exists", Value: false}}}, + }}} + + // add project step + projectStage := bson.D{{Key: "$project", Value: bson.D{ + {Key: "_id", Value: 1}, + {Key: "emitterChain", Value: 1}, + {Key: "timestamp", Value: 1}, + {Key: "txHash", Value: 1}, + }}} + + // add skip step + skipStage := bson.D{{Key: "$skip", Value: skip}} + + // add limit step + limitStage := bson.D{{Key: "$limit", Value: limit}} + + // add sort step + sortStage := bson.D{{Key: "$sort", Value: sort}} + + // define pipeline + pipeline := mongo.Pipeline{ + matchStage1, + matchStage2, + matchStage3, + lookupStage, + matchStage4, + projectStage, + skipStage, + limitStage, + sortStage, + } + + // find vaas + cur, err := m.collections.vaas.Aggregate(ctx, pipeline) + if err != nil { + return []VAASourceChain{}, err + } + + // decode vaas + vaas := make([]VAASourceChain, pageSize) + if err := cur.All(ctx, &vaas); err != nil { + return []VAASourceChain{}, err + } + + return vaas, nil +} diff --git a/tx-tracker/http/vaa/controller.go b/tx-tracker/http/vaa/controller.go index 746f11b3..8e14fefe 100644 --- a/tx-tracker/http/vaa/controller.go +++ b/tx-tracker/http/vaa/controller.go @@ -17,8 +17,8 @@ type Controller struct { vaaRepository *Repository repository *consumer.Repository rpcProviderSettings *config.RpcProviderSettings - p2pNetwork string metrics metrics.Metrics + p2pNetwork string } // NewController creates a Controller instance.