From 1f47f887da653e455fcb0b3c76e69514241118d5 Mon Sep 17 00:00:00 2001 From: agodnic Date: Wed, 5 Apr 2023 10:33:28 -0300 Subject: [PATCH] [API/TX-TRACKER] Add support for Solana tx hashes (#218) ### Summary This pull request adds support for searching VAAs by Solana tx hash, e.g.: `GET /api/v1/vaas?txHash=2qnNNBQQQ152sGkQ1Rnq2Kdr48LvdN4oXwnTWyEmJqYYWFusLWxbwhpGS5KTXH1bKKx9gg9BLZMKd3g1cXiy7QMz`. Also, the VAA objects returned by the API now have a new field `nativeTxHash`, which contains the hash of the transaction that generated the VAA, in the emitter chain's native format. Tracking issues: https://github.com/wormhole-foundation/wormhole-explorer/issues/212, https://github.com/wormhole-foundation/wormhole-explorer/issues/214 ### Testing guidelines Endpoints affected: * `GET /api/v1/vaas` now supports Solana tx hashes by using the `?txHash=` query parameter. This search is limited to VAAs created by the portal token bridge. * Some VAAs returned by the API should have a new field `nativeTxHash`. All VAAs related to the portal token bridge should have this field set. * An internal refactoring touched nearly all routes under `GET /api/v1/vaas*`; these routes should be re-tested. * In the response of `/api/v1/global-tx/`, the field `originTx.signer` has been renamed to `originTx.from`. --- api/handlers/transactions/model.go | 4 +- api/handlers/vaa/model.go | 2 + api/handlers/vaa/repository.go | 182 ++++++++++-------- api/handlers/vaa/service.go | 61 +++--- api/middleware/extract_parameters.go | 16 +- api/types/tx_hash.go | 102 ++++++++++ api/types/tx_hash_test.go | 88 +++++++++ .../tx-tracker-backfiller-job.yaml | 6 + .../chains/{ankr.go => ankrAdvancedApi.go} | 0 tx-tracker/chains/eth.go | 4 + tx-tracker/chains/solanaRpc.go | 5 +- tx-tracker/cmd/backfiller/README.md | 7 + tx-tracker/cmd/backfiller/main.go | 86 ++++++++- tx-tracker/config/structs.go | 18 ++ tx-tracker/consumer/repository.go | 139 ++++++++++++- 15 files changed, 582 insertions(+), 138 deletions(-) create mode 100644 api/types/tx_hash.go create mode 100644 api/types/tx_hash_test.go rename tx-tracker/chains/{ankr.go => ankrAdvancedApi.go} (100%) create mode 100644 tx-tracker/cmd/backfiller/README.md diff --git a/api/handlers/transactions/model.go b/api/handlers/transactions/model.go index 2bc6a22a..fd856d82 100644 --- a/api/handlers/transactions/model.go +++ b/api/handlers/transactions/model.go @@ -16,10 +16,10 @@ type GlobalTransactionDoc struct { // OriginTx represents an origin transaction. type OriginTx struct { ChainID vaa.ChainID `bson:"chainId" json:"chainId"` - TxHash string `bson:"txHash" json:"txHash"` + TxHash string `bson:"nativeTxHash" json:"txHash"` Status string `bson:"status" json:"status"` Timestamp *time.Time `bson:"timestamp" json:"timestamp"` - Signer *string `bson:"signer" json:"signer"` + From *string `bson:"signer" json:"from"` } // DestinationTx represents a destination transaction. diff --git a/api/handlers/vaa/model.go b/api/handlers/vaa/model.go index 2e947bb7..7610b222 100644 --- a/api/handlers/vaa/model.go +++ b/api/handlers/vaa/model.go @@ -31,6 +31,8 @@ type VaaDoc struct { AppId string `bson:"appId" json:"appId,omitempty"` // Payload is an extension field - it is not present in the guardian API. Payload map[string]interface{} `bson:"payload" json:"payload,omitempty"` + // NativeTxHash is an extension field - it is not present in the guardian API. + NativeTxHash string `bson:"nativeTxHash" json:"nativeTxHash,omitempty"` } // MarshalJSON interface implementation.
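A quick way to exercise the new query path (the host and the trimmed response sketch below are illustrative, not part of the patch):

```sh
# Look up the VAA(s) generated by a Solana transaction (portal token bridge only).
curl "https://<api-host>/api/v1/vaas?txHash=2qnNNBQQQ152sGkQ1Rnq2Kdr48LvdN4oXwnTWyEmJqYYWFusLWxbwhpGS5KTXH1bKKx9gg9BLZMKd3g1cXiy7QMz"

# Expected response shape (trimmed): matching VAAs carry the new `nativeTxHash`
# field, i.e. the hash in the emitter chain's native encoding (base58 for Solana).
# { "data": [ { "emitterChain": 1, ..., "nativeTxHash": "2qnNNBQQQ152...cXiy7QMz" } ] }
```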
diff --git a/api/handlers/vaa/repository.go b/api/handlers/vaa/repository.go index cf0c81d8..4f8ef428 100644 --- a/api/handlers/vaa/repository.go +++ b/api/handlers/vaa/repository.go @@ -5,7 +5,7 @@ import ( "fmt" "github.com/pkg/errors" - errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" + "github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions" "github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination" "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.mongodb.org/mongo-driver/bson" @@ -19,10 +19,11 @@ type Repository struct { db *mongo.Database logger *zap.Logger collections struct { - vaas *mongo.Collection - vaasPythnet *mongo.Collection - invalidVaas *mongo.Collection - vaaCount *mongo.Collection + vaas *mongo.Collection + vaasPythnet *mongo.Collection + invalidVaas *mongo.Collection + vaaCount *mongo.Collection + globalTransactions *mongo.Collection } } @@ -31,89 +32,73 @@ func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository { return &Repository{db: db, logger: logger.With(zap.String("module", "VaaRepository")), collections: struct { - vaas *mongo.Collection - vaasPythnet *mongo.Collection - invalidVaas *mongo.Collection - vaaCount *mongo.Collection - }{vaas: db.Collection("vaas"), vaasPythnet: db.Collection("vaasPythnet"), invalidVaas: db.Collection("invalid_vaas"), - vaaCount: db.Collection("vaaCounts")}} + vaas *mongo.Collection + vaasPythnet *mongo.Collection + invalidVaas *mongo.Collection + vaaCount *mongo.Collection + globalTransactions *mongo.Collection + }{ + vaas: db.Collection("vaas"), + vaasPythnet: db.Collection("vaasPythnet"), + invalidVaas: db.Collection("invalid_vaas"), + vaaCount: db.Collection("vaaCounts"), + globalTransactions: db.Collection("globalTransactions"), + }, + } } -// Find searches the database for VAAs. -// The input parameter [q *VaaQuery] define the filters to apply in the query. -func (r *Repository) Find(ctx context.Context, q *VaaQuery) ([]*VaaDoc, error) { +// FindVaasBySolanaTxHash searches the database for VAAs that match a given Solana transaction hash. 
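+// The search runs in two steps: the `globalTransactions` collection is queried by
+// `originTx.nativeTxHash` to resolve the matching VAA ID, then the VAA itself is
+// fetched via FindVaas.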
+func (r *Repository) FindVaasBySolanaTxHash( + ctx context.Context, + txHash string, + includeParsedPayload bool, +) ([]*VaaDoc, error) { - var err error - var cur *mongo.Cursor - if q.chainId == vaa.ChainIDPythNet { - cur, err = r.collections.vaasPythnet.Find(ctx, q.toBSON(), q.findOptions()) - } else { - cur, err = r.collections.vaas.Find(ctx, q.toBSON(), q.findOptions()) - } + // Find globalTransactions that match the given Solana TxHash + cur, err := r.collections.globalTransactions.Find( + ctx, + bson.D{bson.E{"originTx.nativeTxHash", txHash}}, + nil, + ) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) - r.logger.Error("failed execute Find command to get vaas", - zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID)) + r.logger.Error("failed to find globalTransactions by Solana TxHash", + zap.Error(err), + zap.String("requestID", requestID), + ) return nil, errors.WithStack(err) } - var vaas []*VaaDoc - err = cur.All(ctx, &vaas) + // Read results from cursor + var globalTxs []transactions.GlobalTransactionDoc + err = cur.All(ctx, &globalTxs) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) - r.logger.Error("failed decoding cursor to []*VaaDoc", zap.Error(err), zap.Any("q", q), - zap.String("requestID", requestID)) + r.logger.Error("failed to decode cursor to []GlobalTransactionDoc", + zap.Error(err), + zap.String("requestID", requestID), + ) return nil, errors.WithStack(err) } - // Clear the `Payload` and `AppId` fields. - // (this function doesn't return those fields currently, but may do so in the future by adding new parameters) - for i := range vaas { - vaas[i].Payload = nil - vaas[i].AppId = "" - } - // If no results were found, return an empty slice instead of nil. - if vaas == nil { - vaas = make([]*VaaDoc, 0) + if len(globalTxs) == 0 { + result := make([]*VaaDoc, 0) + return result, nil + } + if len(globalTxs) > 1 { + return nil, fmt.Errorf("expected at most one transaction, but found %d", len(globalTxs)) } - return vaas, err + // Find VAAs that match the given VAA ID + q := Query(). + SetID(globalTxs[0].ID). + IncludeParsedPayload(includeParsedPayload) + return r.FindVaas(ctx, q) } -// FindOne get *VaaDoc. -// The input parameter [q *VaaQuery] define the filters to apply in the query. -func (r *Repository) FindOne(ctx context.Context, q *VaaQuery) (*VaaDoc, error) { - - var vaaDoc VaaDoc - var err error - - if q.chainId == vaa.ChainIDPythNet { - err = r.collections.vaasPythnet.FindOne(ctx, q.toBSON()).Decode(&vaaDoc) - } else { - err = r.collections.vaas.FindOne(ctx, q.toBSON()).Decode(&vaaDoc) - } - if err != nil { - if errors.Is(err, mongo.ErrNoDocuments) { - return nil, errs.ErrNotFound - } - requestID := fmt.Sprintf("%v", ctx.Value("requestid")) - r.logger.Error("failed execute FindOne command to get vaas", - zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID)) - return nil, errors.WithStack(err) - } - - // Clear the `Payload` and `AppId` fields. - // (this function doesn't return those fields currently, but may do so in the future by adding new parameters) - vaaDoc.Payload = nil - vaaDoc.AppId = "" - - return &vaaDoc, err -} - -// FindVaasWithPayload returns VAAs that include a parsed payload. -// The input parameter `q` defines the filters to be applied in the query. -func (r *Repository) FindVaasWithPayload( +// FindVaas searches the database for VAAs matching the given filters. 
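+// Filters are applied as stages of a Mongo aggregation pipeline; a $lookup on the
+// `globalTransactions` collection populates each result's `nativeTxHash` field.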
+func (r *Repository) FindVaas( ctx context.Context, q *VaaQuery, ) ([]*VaaDoc, error) { @@ -126,6 +111,13 @@ func (r *Repository) FindVaasWithPayload( {"$sort", bson.D{bson.E{q.SortBy, q.GetSortInt()}}}, }) + // filter by _id + if q.id != "" { + pipeline = append(pipeline, bson.D{ + {"$match", bson.D{bson.E{"_id", q.id}}}, + }) + } + // filter by emitterChain if q.chainId != 0 { pipeline = append(pipeline, bson.D{ @@ -172,6 +164,23 @@ func (r *Repository) FindVaasWithPayload( }}, }) + // left outer join on the `globalTransactions` collection + pipeline = append(pipeline, bson.D{ + {"$lookup", bson.D{ + {"from", "globalTransactions"}, + {"localField", "_id"}, + {"foreignField", "_id"}, + {"as", "globalTransaction"}, + }}, + }) + + // add globalTransaction fields + pipeline = append(pipeline, bson.D{ + {"$addFields", bson.D{ + {"nativeTxHash", bson.M{"$arrayElemAt": []interface{}{"$globalTransaction.originTx.nativeTxHash", 0}}}, + }}, + }) + // filter by appId if q.appId != "" { pipeline = append(pipeline, bson.D{ @@ -225,6 +234,13 @@ func (r *Repository) FindVaasWithPayload( vaasWithPayload = make([]*VaaDoc, 0) } + // If the payload field was not requested, remove it from the results. + if !q.includeParsedPayload && q.appId == "" { + for i := range vaasWithPayload { + vaasWithPayload[i].Payload = nil + } + } + return vaasWithPayload, nil } @@ -252,11 +268,13 @@ func (r *Repository) GetVaaCount(ctx context.Context, q *VaaQuery) ([]*VaaStats, // VaaQuery represents a query for the vaa mongodb document. type VaaQuery struct { pagination.Pagination - chainId vaa.ChainID - emitter string - sequence string - txHash string - appId string + id string + chainId vaa.ChainID + emitter string + sequence string + txHash string + appId string + includeParsedPayload bool } // Query creates a new VaaQuery with default pagination values. @@ -265,6 +283,12 @@ func Query() *VaaQuery { return &VaaQuery{Pagination: *p} } +// SetID sets the id field of the VaaQuery struct. +func (q *VaaQuery) SetID(id string) *VaaQuery { + q.id = id + return q +} + // SetChain sets the chainId field of the VaaQuery struct. func (q *VaaQuery) SetChain(chainID vaa.ChainID) *VaaQuery { q.chainId = chainID @@ -300,8 +324,16 @@ func (q *VaaQuery) SetAppId(appId string) *VaaQuery { return q } +func (q *VaaQuery) IncludeParsedPayload(val bool) *VaaQuery { + q.includeParsedPayload = val + return q +} + func (q *VaaQuery) toBSON() *bson.D { r := bson.D{} + if q.id != "" { + r = append(r, bson.E{"_id", q.id}) + } if q.chainId > 0 { r = append(r, bson.E{"emitterChain", q.chainId}) } diff --git a/api/handlers/vaa/service.go b/api/handlers/vaa/service.go index 856af01a..de01c49a 100644 --- a/api/handlers/vaa/service.go +++ b/api/handlers/vaa/service.go @@ -30,7 +30,7 @@ func NewService(r *Repository, getCacheFunc cache.CacheGetFunc, logger *zap.Logg // FindAllParams passes input data to the function `FindAll`. type FindAllParams struct { Pagination *pagination.Pagination - TxHash *vaa.Address + TxHash *types.TxHash IncludeParsedPayload bool AppId string } @@ -42,7 +42,8 @@ func (s *Service) FindAll( ) (*response.Response[[]*VaaDoc], error) { // set up query parameters - query := Query() + query := Query().
+ IncludeParsedPayload(params.IncludeParsedPayload) if params.Pagination != nil { query.SetPagination(params.Pagination) } @@ -56,10 +57,10 @@ func (s *Service) FindAll( // execute the database query var err error var vaas []*VaaDoc - if params.IncludeParsedPayload { - vaas, err = s.repo.FindVaasWithPayload(ctx, query) + if params.TxHash != nil && params.TxHash.IsSolanaTxHash() { + vaas, err = s.repo.FindVaasBySolanaTxHash(ctx, params.TxHash.String(), params.IncludeParsedPayload) } else { - vaas, err = s.repo.Find(ctx, query) + vaas, err = s.repo.FindVaas(ctx, query) } if err != nil { return nil, err @@ -79,9 +80,10 @@ func (s *Service) FindByChain( query := Query(). SetChain(chain). - SetPagination(p) + SetPagination(p). + IncludeParsedPayload(false) - vaas, err := s.repo.Find(ctx, query) + vaas, err := s.repo.FindVaas(ctx, query) res := response.Response[[]*VaaDoc]{Data: vaas} return &res, err @@ -98,9 +100,10 @@ func (s *Service) FindByEmitter( query := Query(). SetChain(chain). SetEmitter(emitter.Hex()). - SetPagination(p) + SetPagination(p). + IncludeParsedPayload(false) - vaas, err := s.repo.Find(ctx, query) + vaas, err := s.repo.FindVaas(ctx, query) res := response.Response[[]*VaaDoc]{Data: vaas} return &res, err @@ -122,13 +125,7 @@ func (s *Service) FindById( } // execute the database query - var err error - var vaa *VaaDoc - if includeParsedPayload { - vaa, err = s.findByIdWithPayload(ctx, chain, emitter, seq) - } else { - vaa, err = s.findById(ctx, chain, emitter, seq) - } + vaa, err := s.findById(ctx, chain, emitter, seq, includeParsedPayload) if err != nil { return &response.Response[*VaaDoc]{}, err } @@ -144,39 +141,33 @@ func (s *Service) findById( chain vaa.ChainID, emitter *types.Address, seq string, + includeParsedPayload bool, ) (*VaaDoc, error) { + // query matching documents from the database query := Query(). SetChain(chain). SetEmitter(emitter.Hex()). - SetSequence(seq) - - return s.repo.FindOne(ctx, query) -} - -// findByIdWithPayload get a vaa with payload data by chainID, emitter address and sequence number. -func (s *Service) findByIdWithPayload(ctx context.Context, chain vaa.ChainID, emitter *types.Address, seq string) (*VaaDoc, error) { - - query := Query(). - SetChain(chain). - SetEmitter(emitter.Hex()). - SetSequence(seq) - - vaas, err := s.repo.FindVaasWithPayload(ctx, query) + SetSequence(seq). + IncludeParsedPayload(includeParsedPayload) + docs, err := s.repo.FindVaas(ctx, query) if err != nil { return nil, err - } else if len(vaas) == 0 { + } + + // we're expecting exactly one document + if len(docs) == 0 { return nil, errs.ErrNotFound - } else if len(vaas) > 1 { + } + if len(docs) > 1 { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) s.logger.Error("cannot get more than one vaa by chainID/address/sequence", zap.Any("q", query), - zap.String("requestID", requestID), - ) + zap.String("requestID", requestID)) return nil, errs.ErrInternalError } - return vaas[0], nil + return docs[0], nil } // GetVaaCount gets a list of vaa counts grouped by chainID. diff --git a/api/middleware/extract_parameters.go b/api/middleware/extract_parameters.go index 1dcd1a68..ca6e3e1c 100644 --- a/api/middleware/extract_parameters.go +++ b/api/middleware/extract_parameters.go @@ -188,26 +188,26 @@ func ExtractObservationHash(c *fiber.Ctx, l *zap.Logger) (string, error) { return hash, nil } -// GetTxHash get txHash parameter from query param.
-func GetTxHash(c *fiber.Ctx, l *zap.Logger) (*sdk.Address, error) { +// GetTxHash parses the `txHash` parameter from query params. +func GetTxHash(c *fiber.Ctx, l *zap.Logger) (*types.TxHash, error) { - txHash := c.Query("txHash") - if txHash == "" { + value := c.Query("txHash") + if value == "" { return nil, nil } - txHashAddr, err := sdk.StringToAddress(txHash) + txHash, err := types.ParseTxHash(value) if err != nil { requestID := fmt.Sprintf("%v", c.Locals("requestid")) - l.Error("failed to covert txHash to address", + l.Error("failed to parse txHash", zap.Error(err), - zap.String("txHash", txHash), + zap.String("txHash", value), zap.String("requestID", requestID), ) return nil, response.NewInvalidParamError(c, "MALFORMED TX HASH", errors.WithStack(err)) } - return &txHashAddr, nil + return txHash, nil } // ExtractParsedPayload gets the parsedPayload query parameter. diff --git a/api/types/tx_hash.go b/api/types/tx_hash.go new file mode 100644 index 00000000..9c98dde6 --- /dev/null +++ b/api/types/tx_hash.go @@ -0,0 +1,102 @@ +package types + +import ( + "encoding/hex" + "fmt" + "strings" + + "github.com/mr-tron/base58" +) + +const solanaTxHashLen = 88 +const wormholeMinTxHashLen = 64 +const wormholeMaxTxHashLen = 66 + +// TxHash represents a transaction hash passed by query params. +type TxHash struct { + hash string + isWormhole bool + isSolana bool +} + +// ParseTxHash parses a transaction hash from a string. + +// The transaction hash can be provided in different formats, +// depending on the blockchain it belongs to: +// * Solana: 64 bytes, encoded as base58. +// * All other chains: 32 bytes, encoded as hex. + +// More cases could be added in the future as needed. +func ParseTxHash(value string) (*TxHash, error) { + + // Solana txHashes are 64 bytes long, encoded as base58. + if len(value) == solanaTxHashLen { + return parseSolanaTxHash(value) + } + + // Wormhole txHashes are 32 bytes long, encoded as hex. + // Optionally, they can be prefixed with "0x" or "0X".
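+ // (32 bytes hex-encode to 64 characters; the optional two-character prefix
+ // raises the maximum accepted length to 66.)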
+ if len(value) >= wormholeMinTxHashLen && len(value) <= wormholeMaxTxHashLen { + return parseWormholeTxHash(value) + } + + return nil, fmt.Errorf("invalid txHash length: %d", len(value)) +} + +func parseSolanaTxHash(value string) (*TxHash, error) { + + // Decode the string from base58 to binary + bytes, err := base58.Decode(value) + if err != nil { + return nil, fmt.Errorf("failed to decode txHash from base58: %w", err) + } + + // Make sure we have the expected number of bytes + if len(bytes) != 64 { + return nil, fmt.Errorf("solana txHash must be exactly 64 bytes, but got %d bytes", len(bytes)) + } + + // Populate the result struct and return + result := TxHash{ + hash: base58.Encode(bytes), + isSolana: true, + } + return &result, nil +} + +func parseWormholeTxHash(value string) (*TxHash, error) { + + // Trim any leading "0x" or "0X" prefix + value = strings.TrimPrefix(value, "0x") + value = strings.TrimPrefix(value, "0X") + + // Decode the string from hex to binary + bytes, err := hex.DecodeString(value) + if err != nil { + return nil, fmt.Errorf("failed to decode txHash from hex: %w", err) + } + + // Make sure we have the expected number of bytes + if len(bytes) != 32 { + return nil, fmt.Errorf("wormhole txHash must be exactly 32 bytes, but got %d bytes", len(bytes)) + } + + // Populate the result struct and return + result := TxHash{ + hash: hex.EncodeToString(bytes), + isWormhole: true, + } + return &result, nil +} + +func (h *TxHash) IsSolanaTxHash() bool { + return h.isSolana +} + +func (h *TxHash) IsWormholeTxHash() bool { + return h.isWormhole +} + +func (h *TxHash) String() string { + return h.hash } diff --git a/api/types/tx_hash_test.go b/api/types/tx_hash_test.go new file mode 100644 index 00000000..a3d7b5b1 --- /dev/null +++ b/api/types/tx_hash_test.go @@ -0,0 +1,88 @@ +package types + +import "testing" + +// TestParseTxHash tests the ParseTxHash function.
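+// Each case pairs an input with its expected canonical output; an empty `output`
+// marks an input that is expected to fail parsing.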
+func TestParseTxHash(t *testing.T) { + + // a table containing several test cases + tcs := []struct { + input string + output string + isSolanaTxHash bool + isWormholeTxHash bool + }{ + { + // Solana hash + input: "2maR6uDZzroV7JFF76rp5QR4CFP1PFUe76VRE8gF8QtWRifpGAKJQo4SQDBNs3TAM9RrchJhnJ644jUL2yfagZco", + output: "2maR6uDZzroV7JFF76rp5QR4CFP1PFUe76VRE8gF8QtWRifpGAKJQo4SQDBNs3TAM9RrchJhnJ644jUL2yfagZco", + isSolanaTxHash: true, + }, + { + // Solana hash w/ invalid length + input: "2maR6uDZzroV7JFF76rp5QR4CFP1PFUe76VRE8gF8QtWRifpGAKJQo4SQDBNs3TAM9RrchJhnJ644jUL2yfagZc", + }, + { + // Solana hash w/ invalid length + input: "2maR6uDZzroV7JFF76rp5QR4CFP1PFUe76VRE8gF8QtWRifpGAKJQo4SQDBNs3TAM9RrchJhnJ644jUL2yfagZco2", + }, + { + // Wormhole hash with 0x prefix + input: "0x3f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637", + output: "3f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637", + isWormholeTxHash: true, + }, + { + // Wormhole hash with 0X prefix + input: "0X3F77F8B44F35FF047A74EE8235CE007AFBAB357D4E30010D51B6F6990F921637", + output: "3f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637", + isWormholeTxHash: true, + }, + { + // Wormhole hash with no prefix + input: "3f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637", + output: "3f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637", + isWormholeTxHash: true, + }, + { + // Wormhole hash w/ invalid length + input: "33f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637", + output: "", + }, + { + // A bunch of random characters + input: "434234i32042oiu08d8sauf0suif", + output: "", + }, + } + + // run each test case in the table + for i := range tcs { + tc := tcs[i] + + // try to parse the hash + txHash, err := ParseTxHash(tc.input) + if tc.output == "" && err == nil { + t.Fatalf("expected ParseTxHash(%s) to fail", tc.input) + } else if tc.output != "" && err != nil { + t.Fatalf("ParseTxHash(%s) failed with error %v", tc.input, err) + } + + if tc.output == "" { + continue + } + + // make assertions about the output struct + if tc.output != txHash.String() { + t.Fatalf("expected TxHash.String()=%s, got %s", tc.output, txHash.String()) + } + if tc.isSolanaTxHash != txHash.IsSolanaTxHash() { + t.Fatalf("expected TxHash.IsSolanaTxHash()=%t, but got %t", tc.isSolanaTxHash, txHash.IsSolanaTxHash()) + } + if tc.isWormholeTxHash != txHash.IsWormholeTxHash() { + t.Fatalf("expected TxHash.IsWormholeTxHash()=%t, but got %t", tc.isWormholeTxHash, txHash.IsWormholeTxHash()) + } + + } + +} diff --git a/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml b/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml index 3d3331d1..46127e52 100644 --- a/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml +++ b/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml @@ -101,6 +101,12 @@ spec: value: "20" - name: BULK_SIZE value: "500" + - name: STRATEGY_NAME + value: "{{ .STRATEGY_NAME }}" + - name: STRATEGY_TIMESTAMP_AFTER + value: "{{ .STRATEGY_TIMESTAMP_AFTER }}" + - name: STRATEGY_TIMESTAMP_BEFORE + value: "{{ .STRATEGY_TIMESTAMP_BEFORE }}" resources: limits: memory: {{ .RESOURCES_LIMITS_MEMORY }} diff --git a/tx-tracker/chains/ankr.go b/tx-tracker/chains/ankrAdvancedApi.go similarity index 100% rename from tx-tracker/chains/ankr.go rename to tx-tracker/chains/ankrAdvancedApi.go diff --git a/tx-tracker/chains/eth.go b/tx-tracker/chains/eth.go index f77ebc5a..54481402 100644 --- a/tx-tracker/chains/eth.go +++ b/tx-tracker/chains/eth.go @@ -2,6 +2,7 @@ package
chains import ( "context" + "errors" "fmt" "strings" @@ -46,6 +47,9 @@ func fetchEthTx( if err != nil { return nil, fmt.Errorf("failed to get tx by hash: %w", err) } + if txReply.BlockHash == "" || txReply.From == "" { + return nil, errors.New("received empty response from the RPC service") + } // query block data blkParams := []interface{}{ diff --git a/tx-tracker/chains/solanaRpc.go b/tx-tracker/chains/solanaRpc.go index 8864f7ef..d768901f 100644 --- a/tx-tracker/chains/solanaRpc.go +++ b/tx-tracker/chains/solanaRpc.go @@ -87,7 +87,10 @@ func fetchSolanaTx( if err != nil { return nil, fmt.Errorf("failed to get signatures for account: %w (%+v)", err, err) } - if len(sigs) != 1 { + if len(sigs) == 0 { + return nil, ErrTransactionNotFound + } + if len(sigs) > 1 { return nil, fmt.Errorf("expected exactly one signature, but found %d", len(sigs)) } diff --git a/tx-tracker/cmd/backfiller/README.md b/tx-tracker/cmd/backfiller/README.md new file mode 100644 index 00000000..6070ccf2 --- /dev/null +++ b/tx-tracker/cmd/backfiller/README.md @@ -0,0 +1,7 @@ +# Backfiller + +Reprocess all VAAs in a specified time range: +`STRATEGY_NAME=time_range STRATEGY_TIMESTAMP_AFTER=2023-01-01T00:00:00.000Z STRATEGY_TIMESTAMP_BEFORE=2023-04-01T00:00:00.000Z ./backfiller` + +Reprocess only VAAs that failed due to internal errors: +`STRATEGY_NAME=reprocess_failed ./backfiller` \ No newline at end of file diff --git a/tx-tracker/cmd/backfiller/main.go b/tx-tracker/cmd/backfiller/main.go index f42e188a..6ae296b9 100644 --- a/tx-tracker/cmd/backfiller/main.go +++ b/tx-tracker/cmd/backfiller/main.go @@ -9,6 +9,7 @@ import ( "sync" "sync/atomic" "syscall" + "time" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" @@ -19,6 +20,8 @@ import ( "go.uber.org/zap" ) +const layoutRfc3339 = "2006-01-02T15:04:05.000Z" + func makeLogger(logger *zap.Logger, name string) *zap.Logger { rightPadding := fmt.Sprintf("%-10s", name) @@ -69,8 +72,13 @@ func main() { defer cli.Disconnect(rootCtx) repository := consumer.NewRepository(rootLogger, cli.Database(cfg.MongodbDatabase)) + strategyCallbacks, err := parseStrategyCallbacks(mainLogger, cfg, repository) + if err != nil { + log.Fatal("Failed to parse strategy callbacks: ", err) + } + // Count the number of documents to process - totalDocuments, err := repository.CountIncompleteDocuments(rootCtx) + totalDocuments, err := strategyCallbacks.countFn(rootCtx) if err != nil { log.Fatal("Closing - failed to count number of global transactions: ", err) } @@ -81,10 +89,11 @@ func main() { // The producer sends tasks to the workers via a buffered channel.
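// The channel capacity matches BULK_SIZE, so the producer can stage one full batch ahead of the workers.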
queue := make(chan consumer.GlobalTransaction, cfg.BulkSize) p := producerParams{ - logger: makeLogger(rootLogger, "producer"), - repository: repository, - queueTx: queue, - bulkSize: cfg.BulkSize, + logger: makeLogger(rootLogger, "producer"), + repository: repository, + queueTx: queue, + bulkSize: cfg.BulkSize, + strategyCallbacks: strategyCallbacks, } go produce(rootCtx, &p) @@ -112,12 +121,69 @@ func main() { mainLogger.Info("Closing main goroutine") } +func parseStrategyCallbacks( + logger *zap.Logger, + cfg *config.BackfillerSettings, + r *consumer.Repository, +) (*strategyCallbacks, error) { + + switch cfg.Strategy.Name { + + case config.BackfillerStrategyReprocessFailed: + cb := strategyCallbacks{ + countFn: r.CountIncompleteDocuments, + iteratorFn: r.GetIncompleteDocuments, + } + + logger.Info("backfilling incomplete documents") + + return &cb, nil + + case config.BackfillerStrategyTimeRange: + + timestampAfter, err := time.Parse(layoutRfc3339, cfg.Strategy.TimestampAfter) + if err != nil { + return nil, fmt.Errorf("failed to parse timestampAfter: %w", err) + } + timestampBefore, err := time.Parse(layoutRfc3339, cfg.Strategy.TimestampBefore) + if err != nil { + return nil, fmt.Errorf("failed to parse timestampBefore: %w", err) + } + + cb := strategyCallbacks{ + countFn: func(ctx context.Context) (uint64, error) { + return r.CountDocumentsByTimeRange(ctx, timestampAfter, timestampBefore) + }, + iteratorFn: func(ctx context.Context, maxId string, limit uint) ([]consumer.GlobalTransaction, error) { + return r.GetDocumentsByTimeRange(ctx, maxId, limit, timestampAfter, timestampBefore) + }, + } + + logger.Info("backfilling by time range", + zap.Time("after", timestampAfter), + zap.Time("before", timestampBefore), + ) + + return &cb, nil + + default: + return nil, fmt.Errorf("unknown strategy: %s", cfg.Strategy.Name) + } + +} + +type strategyCallbacks struct { + countFn func(ctx context.Context) (uint64, error) + iteratorFn func(ctx context.Context, maxId string, limit uint) ([]consumer.GlobalTransaction, error) +} + // producerParams contains the parameters for the producer goroutine. type producerParams struct { - logger *zap.Logger - repository *consumer.Repository - queueTx chan<- consumer.GlobalTransaction - bulkSize uint + logger *zap.Logger + repository *consumer.Repository + queueTx chan<- consumer.GlobalTransaction + bulkSize uint + strategyCallbacks *strategyCallbacks } // produce reads VAA IDs from the database, and sends them through a channel for the workers to consume. @@ -134,7 +200,7 @@ func produce(ctx context.Context, params *producerParams) { for { // Get a batch of VAA IDs from the database - globalTxs, err := params.repository.GetIncompleteDocuments(ctx, maxId, params.bulkSize) + globalTxs, err := params.strategyCallbacks.iteratorFn(ctx, maxId, params.bulkSize) if err != nil { params.logger.Error("Closing: failed to read from cursor", zap.Error(err)) return diff --git a/tx-tracker/config/structs.go b/tx-tracker/config/structs.go index a750a595..82021165 100644 --- a/tx-tracker/config/structs.go +++ b/tx-tracker/config/structs.go @@ -7,11 +7,29 @@ import ( "github.com/kelseyhightower/envconfig" ) +type BackfillingStrategy string + +const ( + // BackfillerStrategyReprocessFailed will reprocess documents in the `globalTransactions` + // collection that don't have the `sourceTx` field set, or that have the + // `sourceTx.status` field set to "internalError".
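+ // Selected by setting STRATEGY_NAME=reprocess_failed (see tx-tracker/cmd/backfiller/README.md).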
+ BackfillerStrategyReprocessFailed BackfillingStrategy = "reprocess_failed" + // BackfillerStrategyTimeRange will reprocess all VAAs that have a timestamp within the specified range. + BackfillerStrategyTimeRange BackfillingStrategy = "time_range" +) + type BackfillerSettings struct { LogLevel string `split_words:"true" default:"INFO"` NumWorkers uint `split_words:"true" required:"true"` BulkSize uint `split_words:"true" required:"true"` + // Strategy determines which VAAs will be affected by the backfiller. + Strategy struct { + Name BackfillingStrategy `split_words:"true" required:"true"` + TimestampAfter string `split_words:"true" required:"false"` + TimestampBefore string `split_words:"true" required:"false"` + } + VaaPayloadParserSettings MongodbSettings RpcProviderSettings diff --git a/tx-tracker/consumer/repository.go b/tx-tracker/consumer/repository.go index 4fb094e0..d6b13c49 100644 --- a/tx-tracker/consumer/repository.go +++ b/tx-tracker/consumer/repository.go @@ -3,6 +3,7 @@ package consumer import ( "context" "fmt" + "time" "github.com/pkg/errors" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa" @@ -19,6 +20,7 @@ import ( type Repository struct { logger *zap.Logger globalTransactions *mongo.Collection + vaas *mongo.Collection } // NewRepository creates a new repository. func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository { r := Repository{ logger: logger, globalTransactions: db.Collection("globalTransactions"), + vaas: db.Collection("vaas"), } return &r @@ -52,10 +55,7 @@ func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentP if params.TxDetail != nil { fields = append(fields, primitive.E{Key: "timestamp", Value: params.TxDetail.Timestamp}) fields = append(fields, primitive.E{Key: "signer", Value: params.TxDetail.Signer}) - - // It is still to be defined whether we want to expose this field to the API consumers, - // since it can be obtained from the original TxHash. - //fields = append(fields, primitive.E{Key: "nativeTxHash", Value: txDetail.NativeTxHash}) + fields = append(fields, primitive.E{Key: "nativeTxHash", Value: params.TxDetail.NativeTxHash}) } update := bson.D{ @@ -80,6 +80,61 @@ func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentP return nil } +// CountDocumentsByTimeRange returns the number of documents that match the given time range.
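+// Both bounds are inclusive ($gte / $lte); the count is taken over the `vaas` collection.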
+func (r *Repository) CountDocumentsByTimeRange( + ctx context.Context, + timeAfter time.Time, + timeBefore time.Time, +) (uint64, error) { + + // Build the aggregation pipeline + var pipeline mongo.Pipeline + { + // filter by time range + pipeline = append(pipeline, bson.D{ + {"$match", bson.D{ + {"timestamp", bson.D{{"$gte", timeAfter}}}, + }}, + }) + pipeline = append(pipeline, bson.D{ + {"$match", bson.D{ + {"timestamp", bson.D{{"$lte", timeBefore}}}, + }}, + }) + + // Count the number of results + pipeline = append(pipeline, bson.D{ + {"$count", "numDocuments"}, + }) + } + + // Execute the aggregation pipeline + cur, err := r.vaas.Aggregate(ctx, pipeline) + if err != nil { + r.logger.Error("failed to execute aggregation pipeline", zap.Error(err)) + return 0, err + } + + // Read results from cursor + var results []struct { + NumDocuments uint64 `bson:"numDocuments"` + } + err = cur.All(ctx, &results) + if err != nil { + r.logger.Error("failed to decode cursor", zap.Error(err)) + return 0, err + } + if len(results) == 0 { + return 0, nil + } + if len(results) > 1 { + r.logger.Error("too many results", zap.Int("numResults", len(results))) + return 0, fmt.Errorf("expected at most one result, but found %d", len(results)) + } + + return results[0].NumDocuments, nil +} + // CountIncompleteDocuments returns the number of documents that have destTx data, but don't have sourceTx data. func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, error) { @@ -100,7 +155,7 @@ func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, erro // Count the number of results pipeline = append(pipeline, bson.D{ - {"$count", "numGlobalTransactions"}, + {"$count", "numDocuments"}, }) } @@ -113,7 +168,7 @@ func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, erro // Read results from cursor var results []struct { - NumGlobalTransactions uint64 `bson:"numGlobalTransactions"` + NumDocuments uint64 `bson:"numDocuments"` } err = cur.All(ctx, &results) if err != nil { @@ -128,7 +183,7 @@ func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, erro return 0, err } - return results[0].NumGlobalTransactions, nil + return results[0].NumDocuments, nil } type GlobalTransaction struct { @@ -136,6 +191,76 @@ type GlobalTransaction struct { Vaas []vaa.VaaDoc `bson:"vaas"` } +// GetDocumentsByTimeRange iterates through documents within a specified time range.
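+// The `maxId` argument is an exclusive lower bound on `_id`, used as a pagination
+// cursor; at most `limit` documents are returned per call.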
+func (r *Repository) GetDocumentsByTimeRange( + ctx context.Context, + maxId string, + limit uint, + timeAfter time.Time, + timeBefore time.Time, +) ([]GlobalTransaction, error) { + + // Build the aggregation pipeline + var pipeline mongo.Pipeline + { + // Specify sorting criteria + pipeline = append(pipeline, bson.D{ + {"$sort", bson.D{bson.E{"_id", 1}}}, + }) + + // filter out already processed documents + // + // We use the _id field as a pagination cursor + pipeline = append(pipeline, bson.D{ + {"$match", bson.D{{"_id", bson.M{"$gt": maxId}}}}, + }) + + // filter by time range + pipeline = append(pipeline, bson.D{ + {"$match", bson.D{ + {"timestamp", bson.D{{"$gte", timeAfter}}}, + }}, + }) + pipeline = append(pipeline, bson.D{ + {"$match", bson.D{ + {"timestamp", bson.D{{"$lte", timeBefore}}}, + }}, + }) + + // Limit size of results + pipeline = append(pipeline, bson.D{ + {"$limit", limit}, + }) + } + + // Execute the aggregation pipeline + cur, err := r.vaas.Aggregate(ctx, pipeline) + if err != nil { + r.logger.Error("failed to execute aggregation pipeline", zap.Error(err)) + return nil, errors.WithStack(err) + } + + // Read results from cursor + var documents []vaa.VaaDoc + err = cur.All(ctx, &documents) + if err != nil { + r.logger.Error("failed to decode cursor", zap.Error(err)) + return nil, errors.WithStack(err) + } + + // Build the result + var globalTransactions []GlobalTransaction + for i := range documents { + globalTransaction := GlobalTransaction{ + Id: documents[i].ID, + Vaas: []vaa.VaaDoc{documents[i]}, + } + globalTransactions = append(globalTransactions, globalTransaction) + } + + return globalTransactions, nil +} + // GetIncompleteDocuments gets a batch of VAA IDs from the database. func (r *Repository) GetIncompleteDocuments( ctx context.Context,