migrate vaa to globaltransaction origintx (#940)

* migrate vaa to globaltransaction origintx

* Add deploy configuration for job to migrate vaas to originTx

* Add option to run migration process by range of date

* Update go dependencies for jobs

* Fix kind of job in deployment

---------

Co-authored-by: Fernando Torres <fert1335@gmail.com>
This commit is contained in:
walker-16 2024-01-30 12:19:31 -03:00 committed by GitHub
parent 59c2de9d0c
commit fb0573c160
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 604 additions and 14 deletions

View File

@ -0,0 +1,86 @@
package txtracker
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"go.uber.org/zap"
)
// DefaultTimeout is the fallback HTTP client timeout in seconds, applied when
// the caller passes a non-positive timeout to NewTxTrackerAPIClient.
const DefaultTimeout = 30

var (
	// ErrCallEndpoint is returned when the HTTP call to the tx-tracker
	// endpoint cannot be performed at all (network error, timeout, ...).
	// Message fixed: was "ERROR CALL ENPOINT" (typo) and uppercase,
	// against Go error-string convention.
	ErrCallEndpoint = errors.New("error call endpoint")
	// ErrInternalError is returned when the endpoint answers with a
	// non-OK HTTP status.
	ErrInternalError = errors.New("internal error")
)
// TxTrackerAPIClient is an HTTP client for the tx-tracker service.
type TxTrackerAPIClient struct {
	Client  http.Client // HTTP client carrying the configured timeout
	BaseURL string      // base URL of the tx-tracker service, no trailing slash expected
	Logger  *zap.Logger
}
// NewTxTrackerAPIClient creates a new instance of TxTrackerAPIClient.
//
// timeout is the HTTP client timeout in seconds; non-positive values fall
// back to DefaultTimeout. baseURL must be non-empty, otherwise an error is
// returned.
func NewTxTrackerAPIClient(timeout int64, baseURL string, logger *zap.Logger) (TxTrackerAPIClient, error) {
	// Guard <= 0, not just == 0: a negative timeout would reach
	// http.Client, which treats non-positive timeouts as "no timeout".
	if timeout <= 0 {
		timeout = DefaultTimeout
	}
	if baseURL == "" {
		return TxTrackerAPIClient{}, errors.New("baseURL can not be empty")
	}
	return TxTrackerAPIClient{
		Client: http.Client{
			Timeout: time.Duration(timeout) * time.Second,
		},
		BaseURL: baseURL,
		Logger:  logger,
	}, nil
}
// ProcessVaaResponse represents the body returned by the tx-tracker
// "process vaa" endpoint.
type ProcessVaaResponse struct {
	From         string `json:"from"`
	NativeTxHash string `json:"nativeTxHash"`
	// Attributes carries endpoint-specific extra data; its schema is not
	// defined in this package, so it is decoded as an opaque value.
	Attributes any `json:"attributes"`
}
// Process calls the tx-tracker "process vaa" endpoint for the given vaaID
// and returns the parsed response.
//
// It returns ErrCallEndpoint when the HTTP request itself fails, and
// ErrInternalError when the endpoint answers with a non-OK status.
func (c *TxTrackerAPIClient) Process(vaaID string) (*ProcessVaaResponse, error) {
	endpointUrl := fmt.Sprintf("%s/vaa/process", c.BaseURL)
	// create request body.
	payload := struct {
		VaaID string `json:"id"`
	}{
		VaaID: vaaID,
	}
	body, err := json.Marshal(payload)
	if err != nil {
		c.Logger.Error("error marshalling payload", zap.Error(err), zap.String("vaaID", vaaID))
		return nil, err
	}
	response, err := c.Client.Post(endpointUrl, "application/json", bytes.NewBuffer(body))
	if err != nil {
		c.Logger.Error("error call parse vaa endpoint", zap.Error(err), zap.String("vaaID", vaaID))
		return nil, ErrCallEndpoint
	}
	defer response.Body.Close()
	switch response.StatusCode {
	case http.StatusOK:
		var processVaaResponse ProcessVaaResponse
		// The decode error was previously discarded, so a malformed body
		// was reported as success with a zero-value response.
		if err := json.NewDecoder(response.Body).Decode(&processVaaResponse); err != nil {
			c.Logger.Error("error decoding response body", zap.Error(err), zap.String("vaaID", vaaID))
			return nil, err
		}
		return &processVaaResponse, nil
	case http.StatusInternalServerError:
		return nil, ErrInternalError
	default:
		// Any other unexpected status is treated as an internal error.
		return nil, ErrInternalError
	}
}

View File

@ -19,4 +19,6 @@ REQUEST_LIMIT_TIME_SECONDS=1
HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * *
#transfer reports jobs
PRICES_URI=http://wormscan-notional.wormscan
OUTPUT_PATH=
OUTPUT_PATH=
#migrate vaa to origintx job
TX_TRACKER_URL=

View File

@ -20,3 +20,5 @@ HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * *
#transfer reports jobs
PRICES_URI=http://wormscan-notional.wormscan-testnet
OUTPUT_PATH=
#migrate vaa to origintx job
TX_TRACKER_URL=

View File

@ -20,3 +20,5 @@ HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * *
#transfer reports jobs
PRICES_URI=http://wormscan-notional.wormscan
OUTPUT_PATH=
#migrate vaa to origintx job
TX_TRACKER_URL=

View File

@ -20,3 +20,5 @@ HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * *
#transfer reports jobs
PRICES_URI=http://wormscan-notional.wormscan-testnet
OUTPUT_PATH=
#migrate vaas to origintx job
TX_TRACKER_URL=

View File

@ -0,0 +1,50 @@
# Kubernetes Job that runs the jobs binary once with JOB_ID
# JOB_MIGRATE_SOURCE_TX to backfill originTx data in global transactions.
apiVersion: batch/v1
kind: Job
metadata:
  name: migrate-vaas-to-origintx
  namespace: {{ .NAMESPACE }}
spec:
  template:
    metadata:
      labels:
        app: migrate-vaas-to-origintx
    spec:
      # Do not restart on failure; re-run the Job manually if needed.
      restartPolicy: Never
      terminationGracePeriodSeconds: 40
      containers:
        - name: migrate-vaas-to-origintx
          image: {{ .IMAGE_NAME }}
          imagePullPolicy: Always
          env:
            - name: ENVIRONMENT
              value: {{ .ENVIRONMENT }}
            - name: P2P_NETWORK
              value: {{ .P2P_NETWORK }}
            - name: LOG_LEVEL
              value: {{ .LOG_LEVEL }}
            # Selects which job the jobs binary executes.
            - name: JOB_ID
              value: JOB_MIGRATE_SOURCE_TX
            - name: MONGODB_URI
              valueFrom:
                secretKeyRef:
                  name: mongodb
                  key: mongo-uri
            - name: MONGODB_DATABASE
              valueFrom:
                configMapKeyRef:
                  name: config
                  key: mongo-database
            # Number of VAAs fetched per aggregation page.
            - name: PAGE_SIZE
              value: "1000"
            # 0 = all chains except Solana/Aptos (those need a dedicated
            # run that goes through the tx-tracker service).
            - name: CHAIN_ID
              value: "0"
            # Empty FROM_DATE/TO_DATE = no date bound on the migration.
            - name: FROM_DATE
              value: ""
            - name: TO_DATE
              value: ""
            - name: TX_TRACKER_URL
              value: {{ .TX_TRACKER_URL }}
            # Timeout (seconds) for tx-tracker HTTP calls.
            - name: TX_TRACKER_TIMEOUT
              value: "30"
            # Pause (seconds) between tx-tracker calls.
            - name: SLEEP_TIME_SECONDS
              value: "5"

View File

@ -114,6 +114,14 @@ func Run(db *mongo.Database) error {
return err
}
// create index in globaltransactions collect.
indexGlobalTransactionsByOriginTx := mongo.IndexModel{
Keys: bson.D{{Key: "originTx.from", Value: 1}}}
_, err = db.Collection("globaltransactions").Indexes().CreateOne(context.TODO(), indexGlobalTransactionsByOriginTx)
if err != nil && isNotAlreadyExistsError(err) {
return err
}
return nil
}

View File

@ -5,8 +5,10 @@ import (
"log"
"os"
"strings"
"time"
"github.com/go-redis/redis"
txtrackerProcessVaa "github.com/wormhole-foundation/wormhole-explorer/common/client/txtracker"
common "github.com/wormhole-foundation/wormhole-explorer/common/coingecko"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
@ -16,8 +18,10 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/jobs/internal/coingecko"
apiPrices "github.com/wormhole-foundation/wormhole-explorer/jobs/internal/prices"
"github.com/wormhole-foundation/wormhole-explorer/jobs/jobs"
"github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/migration"
"github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/notional"
"github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/report"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
@ -60,6 +64,15 @@ func main() {
historyPrices := initHistoricalPricesJob(context, hCfg, logger)
err = historyPrices.Run(context)
case jobs.JobIDMigrationSourceTx:
mCfg, errCfg := config.NewMigrateSourceTxConfiguration(context)
if errCfg != nil {
log.Fatal("error creating config", errCfg)
}
chainID := sdk.ChainID(mCfg.ChainID)
migrationJob := initMigrateSourceTxJob(context, mCfg, chainID, logger)
err = migrationJob.Run(context)
default:
logger.Fatal("Invalid job id", zap.String("job_id", cfg.JobID))
}
@ -124,6 +137,25 @@ func initHistoricalPricesJob(ctx context.Context, cfg *config.HistoricalPricesCo
return notionalJob
}
// initMigrateSourceTxJob builds the migrate-source-tx job: it connects to
// MongoDB, creates the tx-tracker API client and parses the optional date
// range from configuration.
func initMigrateSourceTxJob(ctx context.Context, cfg *config.MigrateSourceTxConfiguration, chainID sdk.ChainID, logger *zap.Logger) *migration.MigrateSourceChainTx {
	//setup DB connection
	db, err := dbutil.Connect(ctx, logger, cfg.MongoURI, cfg.MongoDatabase, false)
	if err != nil {
		logger.Fatal("Failed to connect MongoDB", zap.Error(err))
	}
	// init tx tracker api client.
	txTrackerAPIClient, err := txtrackerProcessVaa.NewTxTrackerAPIClient(cfg.TxTrackerTimeout, cfg.TxTrackerURL, logger)
	if err != nil {
		logger.Fatal("Failed to create txtracker api client", zap.Error(err))
	}
	sleepTime := time.Duration(cfg.SleepTimeSeconds) * time.Second
	// Previously parse errors were discarded: an invalid non-empty date
	// silently became the zero time, i.e. an unbounded migration. Keep
	// "" -> zero time (unbounded) but fail fast on malformed values.
	fromDate := parseDateOrZero(cfg.FromDate, "FROM_DATE", logger)
	toDate := parseDateOrZero(cfg.ToDate, "TO_DATE", logger)
	// chainID was already derived from cfg.ChainID by the caller; reuse it.
	return migration.NewMigrationSourceChainTx(db.Database, cfg.PageSize, chainID, fromDate, toDate, txTrackerAPIClient, sleepTime, logger)
}

// parseDateOrZero parses an RFC3339 date. An empty value yields the zero
// time (meaning "unbounded"); an invalid non-empty value aborts the job.
func parseDateOrZero(value, name string, logger *zap.Logger) time.Time {
	if value == "" {
		return time.Time{}
	}
	t, err := time.Parse(time.RFC3339, value)
	if err != nil {
		logger.Fatal("invalid date, expected RFC3339", zap.String("name", name), zap.Error(err))
	}
	return t
}
func handleExit() {
if r := recover(); r != nil {
if e, ok := r.(exitCode); ok {

View File

@ -45,6 +45,18 @@ type HistoricalPricesConfiguration struct {
PriceDays string `env:"PRICE_DAYS,default=max"`
}
// MigrateSourceTxConfiguration is the configuration for the
// migrate-source-tx job (JOB_MIGRATE_SOURCE_TX).
type MigrateSourceTxConfiguration struct {
	MongoURI      string `env:"MONGODB_URI,required"`
	MongoDatabase string `env:"MONGODB_DATABASE,required"`
	// PageSize is the number of VAAs fetched per aggregation page.
	PageSize int `env:"PAGE_SIZE,default=100"`
	// ChainID restricts the migration to a single chain; 0 means all chains.
	ChainID int64 `env:"CHAIN_ID,default=0"`
	// FromDate/ToDate bound the VAA timestamp range (RFC3339 strings).
	// NOTE(review): tagged as required, yet the deployment sets them to ""
	// and the job treats empty as "unbounded" — confirm that envconfig
	// accepts set-but-empty values for required variables.
	FromDate string `env:"FROM_DATE,required"`
	ToDate   string `env:"TO_DATE,required"`
	// TxTrackerURL is the base URL of the tx-tracker service (used for
	// Solana/Aptos VAAs).
	TxTrackerURL string `env:"TX_TRACKER_URL,required"`
	// TxTrackerTimeout is the tx-tracker HTTP timeout in seconds.
	TxTrackerTimeout int64 `env:"TX_TRACKER_TIMEOUT,default=30"`
	// SleepTimeSeconds is the pause between tx-tracker calls, in seconds.
	SleepTimeSeconds int64 `env:"SLEEP_TIME_SECONDS,default=5"`
}
// New creates a default configuration with the values from .env file and environment variables.
func New(ctx context.Context) (*Configuration, error) {
_ = godotenv.Load(".env", "../.env")
@ -92,3 +104,15 @@ func NewHistoricalPricesConfiguration(ctx context.Context) (*HistoricalPricesCon
return &configuration, nil
}
// NewMigrateSourceTxConfiguration creates a migration source tx configuration
// with the values from .env file and environment variables.
func NewMigrateSourceTxConfiguration(ctx context.Context) (*MigrateSourceTxConfiguration, error) {
	// Best-effort load of local .env files; the error is deliberately
	// ignored because configuration may come entirely from the environment.
	_ = godotenv.Load(".env", "../.env")
	var configuration MigrateSourceTxConfiguration
	if err := envconfig.Process(ctx, &configuration); err != nil {
		return nil, err
	}
	return &configuration, nil
}

View File

@ -6,10 +6,10 @@ require (
github.com/go-redis/redis v6.15.9+incompatible
github.com/go-resty/resty/v2 v2.10.0
github.com/joho/godotenv v1.5.1
github.com/sethvargo/go-envconfig v0.9.0
github.com/sethvargo/go-envconfig v1.0.0
github.com/shopspring/decimal v1.3.1
github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-20230713181709-0425a89e7533
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229
go.mongodb.org/mongo-driver v1.11.2
go.uber.org/zap v1.24.0
)
@ -25,7 +25,6 @@ require (
github.com/ethereum/go-ethereum v1.10.21 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/holiman/uint256 v1.2.1 // indirect
github.com/klauspost/compress v1.16.3 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect

View File

@ -33,8 +33,7 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/holiman/uint256 v1.2.1 h1:XRtyuda/zw2l+Bq/38n5XUoEF72aSOu/77Thd9pPp2o=
github.com/holiman/uint256 v1.2.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
@ -59,8 +58,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE=
github.com/sethvargo/go-envconfig v0.9.0/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YIlt8GMUX6yyNFs0=
github.com/sethvargo/go-envconfig v1.0.0 h1:1C66wzy4QrROf5ew4KdVw942CQDa55qmlYmw9FZxZdU=
github.com/sethvargo/go-envconfig v1.0.0/go.mod h1:Lzc75ghUn5ucmcRGIdGQ33DKJrcjk4kihFYgSTBmjIc=
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -70,8 +69,8 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 h1:rrOyHd+H9a6Op1iUyZNCaI5v9D1syq8jDAYyX/2Q4L8=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8/go.mod h1:dE12DOucCq23gjGGGhtbyx41FBxuHxjpPvG+ArO+8t0=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229 h1:fqcC4qwEVaJfcpqUVKi5+imz+JpxviQYPW4qu3zILz4=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229/go.mod h1:pE/jYet19kY4P3V6mE2+01zvEfxdyBqv6L6HsnSa5uc=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=

View File

@ -3,9 +3,10 @@ package jobs
// JobIDNotional is the job id for notional job.
const (
JobIDNotional = "JOB_NOTIONAL_USD"
JobIDTransferReport = "JOB_TRANSFER_REPORT"
JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES"
JobIDNotional = "JOB_NOTIONAL_USD"
JobIDTransferReport = "JOB_TRANSFER_REPORT"
JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES"
JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX"
)
// Job is the interface for jobs.

View File

@ -0,0 +1,383 @@
package migration
import (
"context"
"errors"
"strings"
"sync"
"time"
txtrackerProcessVaa "github.com/wormhole-foundation/wormhole-explorer/common/client/txtracker"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// MigrateSourceChainTx is the job to migrate vaa txHash and timestamp from
// the vaa collection to the globalTransactions collection.
type MigrateSourceChainTx struct {
	db       *mongo.Database
	pageSize int         // number of VAAs fetched per aggregation page
	chainID  sdk.ChainID // chain filter; ChainIDUnset means all chains
	// FromDate/ToDate bound the VAA timestamp range; zero values mean
	// unbounded on that side.
	FromDate time.Time
	ToDate   time.Time
	// txTrackerAPIClient resolves origin transactions for Solana/Aptos VAAs.
	txTrackerAPIClient txtrackerProcessVaa.TxTrackerAPIClient
	// sleepTime is the configured delay between tx-tracker calls.
	sleepTime time.Duration
	// collections caches the two collection handles used by the job.
	collections struct {
		vaas               *mongo.Collection
		globalTransactions *mongo.Collection
	}
	logger *zap.Logger
}
// NewMigrationSourceChainTx creates a new migration job with its vaas and
// globalTransactions collection handles already resolved from db.
func NewMigrationSourceChainTx(
	db *mongo.Database,
	pageSize int,
	chainID sdk.ChainID,
	fromDate time.Time,
	toDate time.Time,
	txTrackerAPIClient txtrackerProcessVaa.TxTrackerAPIClient,
	sleepTime time.Duration,
	logger *zap.Logger) *MigrateSourceChainTx {
	job := &MigrateSourceChainTx{
		db:                 db,
		pageSize:           pageSize,
		chainID:            chainID,
		FromDate:           fromDate,
		ToDate:             toDate,
		txTrackerAPIClient: txTrackerAPIClient,
		sleepTime:          sleepTime,
		logger:             logger,
	}
	job.collections.vaas = db.Collection("vaas")
	job.collections.globalTransactions = db.Collection("globalTransactions")
	return job
}
// VAASourceChain defines the structure of vaa fields needed for migration.
type VAASourceChain struct {
	ID           string      `bson:"_id"`
	EmitterChain sdk.ChainID `bson:"emitterChain" json:"emitterChain"`
	Timestamp    *time.Time  `bson:"timestamp" json:"timestamp"`
	// TxHash is a pointer because some vaa documents may lack the field;
	// nil values are skipped by the migration workers.
	TxHash *string `bson:"txHash" json:"txHash,omitempty"`
}
// GlobalTransaction represents a global transaction.
type GlobalTransaction struct {
	ID       string    `bson:"_id" json:"id"`
	OriginTx *OriginTx `bson:"originTx" json:"originTx"`
}

// OriginTx represents an origin transaction.
type OriginTx struct {
	// TxHash is stored as "nativeTxHash" in Mongo but serialized as
	// "txHash" in JSON.
	TxHash string `bson:"nativeTxHash" json:"txHash"`
	From   string `bson:"from" json:"from"`
	Status string `bson:"status" json:"status"`
}
// Run executes the migration. Solana and Aptos need the tx-tracker service
// to resolve their origin transaction, so they take the "complex" path;
// every other chain is migrated directly from the vaas collection.
func (m *MigrateSourceChainTx) Run(ctx context.Context) error {
	switch m.chainID {
	case sdk.ChainIDSolana, sdk.ChainIDAptos:
		return m.runComplexMigration(ctx)
	default:
		return m.runMigration(ctx)
	}
}
// runComplexMigration runs the migration job for solana and aptos chains by
// delegating each VAA to the txtracker "process vaa" endpoint, which knows
// how to resolve the origin transaction for those chains. It pages through
// the vaas collection and pauses m.sleepTime between tx-tracker calls.
func (m *MigrateSourceChainTx) runComplexMigration(ctx context.Context) error {
	if sdk.ChainIDSolana != m.chainID && sdk.ChainIDAptos != m.chainID {
		return errors.New("invalid chainID")
	}
	var page int64 = 0
	for {
		// get vaas to migrate by page and pageSize.
		vaas, err := m.getVaasToMigrate(ctx, m.chainID, m.FromDate, m.ToDate, page, int64(m.pageSize))
		if err != nil {
			m.logger.Error("failed to get vaas", zap.Error(err), zap.Int64("page", page))
			break
		}
		if len(vaas) == 0 {
			break
		}
		for _, v := range vaas {
			// check if global transaction exists and nested originTx exists.
			filter := bson.D{
				{Key: "_id", Value: v.ID},
				{Key: "originTx", Value: bson.D{{Key: "$exists", Value: true}}},
			}
			var globalTransaction GlobalTransaction
			err := m.collections.globalTransactions.FindOne(ctx, filter).Decode(&globalTransaction)
			// if global transaction exists, skip.
			if err == nil {
				m.logger.Info("global transaction already exists", zap.String("id", v.ID))
				continue
			}
			// log unexpected lookup errors and move on.
			if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
				m.logger.Error("failed to get global transaction", zap.Error(err), zap.String("id", v.ID))
				continue
			}
			// skip documents without a txHash.
			if v.TxHash == nil {
				m.logger.Error("txHash is nil", zap.String("id", v.ID))
				continue
			}
			_, err = m.txTrackerAPIClient.Process(v.ID)
			if err != nil {
				m.logger.Error("failed to process vaa", zap.Error(err), zap.String("id", v.ID))
				continue
			}
			// Throttle with the configured sleep time; previously a
			// hard-coded 5s ignored the SLEEP_TIME_SECONDS setting.
			time.Sleep(m.sleepTime)
		}
		page++
	}
	return nil
}
// runMigration runs the migration job for every chain except Solana and
// Aptos: a pool of workers copies txHash/timestamp from the vaas collection
// into globalTransactions.
//
// NOTE(review): page is never incremented here (unlike runComplexMigration),
// so every iteration re-queries page 0 and relies on the aggregation
// filtering out already-migrated documents. Documents the workers skip
// (e.g. nil txHash) or repeatedly fail to update would be fetched forever,
// preventing termination — confirm whether this is handled operationally.
func (m *MigrateSourceChainTx) runMigration(ctx context.Context) error {
	var page int64 = 0
	var wg sync.WaitGroup
	// One worker (and one buffered channel slot) per document in a page.
	workerLimit := m.pageSize
	jobs := make(chan VAASourceChain, workerLimit)
	for i := 1; i <= workerLimit; i++ {
		wg.Add(1)
		go worker(ctx, &wg, jobs, m.collections.globalTransactions, m.logger)
	}
	for {
		// get vaas to migrate by page and pageSize.
		vaas, err := m.getVaasToMigrate(ctx, m.chainID, m.FromDate, m.ToDate, page, int64(m.pageSize))
		if err != nil {
			m.logger.Error("failed to get vaas", zap.Error(err), zap.Int64("page", page))
			break
		}
		if len(vaas) == 0 {
			break
		}
		for _, v := range vaas {
			jobs <- v
		}
	}
	// Closing the channel drains the workers; wait for them to finish.
	close(jobs)
	wg.Wait()
	return nil
}
// worker consumes VAAs from jobs and upserts their origin-transaction data
// (timestamp, nativeTxHash, status) into the globalTransactions collection.
// It exits when the jobs channel is closed.
func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan VAASourceChain, collection *mongo.Collection, logger *zap.Logger) {
	defer wg.Done()
	for v := range jobs {
		// Solana/Aptos are handled by runComplexMigration via tx-tracker.
		if v.EmitterChain == sdk.ChainIDSolana || v.EmitterChain == sdk.ChainIDAptos {
			logger.Debug("skip migration", zap.String("id", v.ID), zap.String("chain", v.EmitterChain.String()))
			continue
		}
		// check if global transaction exists and nested originTx exists.
		filter := bson.D{
			{Key: "_id", Value: v.ID},
			{Key: "originTx", Value: bson.D{{Key: "$exists", Value: true}}},
		}
		var globalTransaction GlobalTransaction
		err := collection.FindOne(ctx, filter).Decode(&globalTransaction)
		// if global transaction exists, skip.
		if err == nil {
			logger.Info("global transaction already exists", zap.String("id", v.ID))
			continue
		}
		// log unexpected lookup errors and move on.
		if err != nil && !errors.Is(err, mongo.ErrNoDocuments) {
			logger.Error("failed to get global transaction", zap.Error(err), zap.String("id", v.ID))
			continue
		}
		// if not exist txhash, skip.
		if v.TxHash == nil {
			logger.Error("txHash is nil", zap.String("id", v.ID))
			continue
		}
		// set txHash format by chain: EVM-style chains store the hash
		// lowercased and 0x-prefixed; other chains keep it as-is.
		var txHash string
		switch v.EmitterChain {
		case sdk.ChainIDAcala,
			sdk.ChainIDArbitrum,
			sdk.ChainIDAvalanche,
			sdk.ChainIDBase,
			sdk.ChainIDBSC,
			sdk.ChainIDCelo,
			sdk.ChainIDEthereum,
			sdk.ChainIDFantom,
			sdk.ChainIDKarura,
			sdk.ChainIDKlaytn,
			sdk.ChainIDMoonbeam,
			sdk.ChainIDOasis,
			sdk.ChainIDOptimism,
			sdk.ChainIDPolygon:
			txHash = txHashLowerCaseWith0x(*v.TxHash)
		default:
			txHash = *v.TxHash
		}
		// Update the global transaction. All fields go in a single $set:
		// the previous update document repeated the "$set" key three times
		// in one bson.D (duplicate BSON keys), which is unreliable and can
		// drop the timestamp/nativeTxHash values.
		update := bson.D{
			{Key: "$set", Value: bson.D{
				{Key: "originTx.timestamp", Value: v.Timestamp},
				{Key: "originTx.nativeTxHash", Value: txHash},
				{Key: "originTx.status", Value: "confirmed"},
			}},
		}
		opts := options.Update().SetUpsert(true)
		result, err := collection.UpdateByID(ctx, v.ID, update, opts)
		if err != nil {
			// Keep the worker alive (continue, not break): a dead worker
			// can leave the producer blocked sending on the jobs channel.
			logger.Error("failed to update global transaction", zap.Error(err), zap.String("id", v.ID))
			continue
		}
		if result.UpsertedCount == 1 {
			logger.Info("inserted global transaction", zap.String("id", v.ID))
		} else {
			logger.Debug("global transaction already exists", zap.String("id", v.ID))
		}
	}
}
// txHashLowerCaseWith0x normalizes a transaction hash to lower case,
// ensuring it carries a "0x" prefix.
func txHashLowerCaseWith0x(v string) string {
	lowered := strings.ToLower(v)
	if strings.HasPrefix(v, "0x") {
		return lowered
	}
	return "0x" + lowered
}
// getVaasToMigrate returns one page of VAAs that still need their originTx
// migrated into globalTransactions. The aggregation:
//  1. filters by chain (all chains except Solana/Aptos when chainID is unset),
//  2. filters by the optional [from, to) timestamp range,
//  3. keeps only documents that have a txHash,
//  4. joins globalTransactions and drops documents that already have originTx,
//  5. sorts by timestamp, then applies skip/limit pagination.
func (m *MigrateSourceChainTx) getVaasToMigrate(ctx context.Context, chainID sdk.ChainID, from time.Time, to time.Time, page int64, pageSize int64) ([]VAASourceChain, error) {
	skip := page * pageSize
	limit := pageSize
	// add match step by chain.
	// NOTE: the previous guard rejected Solana/Aptos here, which made
	// runComplexMigration fail on its very first fetch — that caller must
	// pass exactly those chains. The caller decides which chains are valid.
	var matchStage1 bson.D
	if chainID != sdk.ChainIDUnset {
		matchStage1 = bson.D{{Key: "$match", Value: bson.D{
			{Key: "emitterChain", Value: chainID},
		}}}
	} else {
		// get all the vaas without solana and aptos.
		solanaAndAptosIds := []sdk.ChainID{sdk.ChainIDSolana, sdk.ChainIDAptos}
		matchStage1 = bson.D{{Key: "$match", Value: bson.D{
			{Key: "emitterChain", Value: bson.M{"$nin": solanaAndAptosIds}},
		}}}
	}
	// add match step by range date: a zero from/to means unbounded on that side.
	var matchStage2 bson.D
	switch {
	case from.IsZero() && to.IsZero():
		matchStage2 = bson.D{{Key: "$match", Value: bson.D{}}}
	case from.IsZero():
		matchStage2 = bson.D{{Key: "$match", Value: bson.D{
			{Key: "timestamp", Value: bson.M{"$lt": to}},
		}}}
	case to.IsZero():
		matchStage2 = bson.D{{Key: "$match", Value: bson.D{
			{Key: "timestamp", Value: bson.M{"$gte": from}},
		}}}
	default:
		matchStage2 = bson.D{{Key: "$match", Value: bson.D{
			{Key: "timestamp", Value: bson.M{"$gte": from, "$lt": to}},
		}}}
	}
	// add match step that txHash exists.
	matchStage3 := bson.D{{Key: "$match", Value: bson.D{
		{Key: "txHash", Value: bson.D{{Key: "$exists", Value: true}}},
	}}}
	// add lookup step with globalTransactions collection.
	lookupStage := bson.D{{Key: "$lookup", Value: bson.D{
		{Key: "from", Value: "globalTransactions"},
		{Key: "localField", Value: "_id"},
		{Key: "foreignField", Value: "_id"},
		{Key: "as", Value: "globalTransactions"},
	}}}
	// drop documents whose originTx was already migrated.
	matchStage4 := bson.D{{Key: "$match", Value: bson.D{
		{Key: "globalTransactions.originTx", Value: bson.D{{Key: "$exists", Value: false}}},
	}}}
	// add project step.
	projectStage := bson.D{{Key: "$project", Value: bson.D{
		{Key: "_id", Value: 1},
		{Key: "emitterChain", Value: 1},
		{Key: "timestamp", Value: 1},
		{Key: "txHash", Value: 1},
	}}}
	// Sort BEFORE skip/limit: the previous pipeline sorted after $limit,
	// which only ordered the already-selected page and made skip-based
	// pagination non-deterministic (documents repeated or missed).
	sortStage := bson.D{{Key: "$sort", Value: bson.D{{Key: "timestamp", Value: 1}}}}
	skipStage := bson.D{{Key: "$skip", Value: skip}}
	limitStage := bson.D{{Key: "$limit", Value: limit}}
	// define pipeline.
	pipeline := mongo.Pipeline{
		matchStage1,
		matchStage2,
		matchStage3,
		lookupStage,
		matchStage4,
		projectStage,
		sortStage,
		skipStage,
		limitStage,
	}
	// find vaas.
	cur, err := m.collections.vaas.Aggregate(ctx, pipeline)
	if err != nil {
		return []VAASourceChain{}, err
	}
	// decode vaas: Cursor.All resizes the slice itself, so preallocate
	// capacity only instead of a zero-filled length-pageSize slice.
	vaas := make([]VAASourceChain, 0, pageSize)
	if err := cur.All(ctx, &vaas); err != nil {
		return []VAASourceChain{}, err
	}
	return vaas, nil
}

View File

@ -17,8 +17,8 @@ type Controller struct {
vaaRepository *Repository
repository *consumer.Repository
rpcProviderSettings *config.RpcProviderSettings
p2pNetwork string
metrics metrics.Metrics
p2pNetwork string
}
// NewController creates a Controller instance.