[API/TX-TRACKER] Add support for Solana tx hashes (#218)

### Summary
This pull request adds the functionality to search for VAAs using Solana tx hashes, e.g.:

`GET /api/v1/vaas?txHash=2qnNNBQQQ152sGkQ1Rnq2Kdr48LvdN4oXwnTWyEmJqYYWFusLWxbwhpGS5KTXH1bKKx9gg9BLZMKd3g1cXiy7QMz`.

Also, the VAA objects returned by the API now have a new field `nativeTxHash`, which contains the tx hash that generated the VAA in the native format of the emitter chain.

Tracking issues: https://github.com/wormhole-foundation/wormhole-explorer/issues/212, https://github.com/wormhole-foundation/wormhole-explorer/issues/214

### Testing guidelines

Endpoints affected:
* `GET /api/v1/vaas` now supports Solana tx hashes by using the `?txHash=` query parameter. This search is limited to VAAs created by the portal token bridge.
* Some VAAs returned by the API should have a new field `nativeTxHash`. All VAAs related to the portal token bridge should have this field set.
* An internal refactoring affected pretty much all routes under `GET /api/v1/vaas*`, which should be re-tested.
* In the response of `/api/v1/global-tx/`, the field `originTx.signer` has been renamed to `originTx.from`
This commit is contained in:
agodnic 2023-04-05 10:33:28 -03:00 committed by GitHub
parent a5df48c967
commit 1f47f887da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 582 additions and 138 deletions

View File

@ -16,10 +16,10 @@ type GlobalTransactionDoc struct {
// OriginTx represents an origin transaction.
type OriginTx struct {
ChainID vaa.ChainID `bson:"chainId" json:"chainId"`
TxHash string `bson:"txHash" json:"txHash"`
TxHash string `bson:"nativeTxHash" json:"txHash"`
Status string `bson:"status" json:"status"`
Timestamp *time.Time `bson:"timestamp" json:"timestamp"`
Signer *string `bson:"signer" json:"signer"`
From *string `bson:"signer" json:"from"`
}
// DestinationTx represents a destination transaction.

View File

@ -31,6 +31,8 @@ type VaaDoc struct {
AppId string `bson:"appId" json:"appId,omitempty"`
// Payload is an extension field - it is not present in the guardian API.
Payload map[string]interface{} `bson:"payload" json:"payload,omitempty"`
// NativeTxHash is an extension field - it is not present in the guardian API.
NativeTxHash string `bson:"nativeTxHash" json:"nativeTxHash,omitempty"`
}
// MarshalJSON interface implementation.

View File

@ -5,7 +5,7 @@ import (
"fmt"
"github.com/pkg/errors"
errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
@ -19,10 +19,11 @@ type Repository struct {
db *mongo.Database
logger *zap.Logger
collections struct {
vaas *mongo.Collection
vaasPythnet *mongo.Collection
invalidVaas *mongo.Collection
vaaCount *mongo.Collection
vaas *mongo.Collection
vaasPythnet *mongo.Collection
invalidVaas *mongo.Collection
vaaCount *mongo.Collection
globalTransactions *mongo.Collection
}
}
@ -31,89 +32,73 @@ func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository {
return &Repository{db: db,
logger: logger.With(zap.String("module", "VaaRepository")),
collections: struct {
vaas *mongo.Collection
vaasPythnet *mongo.Collection
invalidVaas *mongo.Collection
vaaCount *mongo.Collection
}{vaas: db.Collection("vaas"), vaasPythnet: db.Collection("vaasPythnet"), invalidVaas: db.Collection("invalid_vaas"),
vaaCount: db.Collection("vaaCounts")}}
vaas *mongo.Collection
vaasPythnet *mongo.Collection
invalidVaas *mongo.Collection
vaaCount *mongo.Collection
globalTransactions *mongo.Collection
}{
vaas: db.Collection("vaas"),
vaasPythnet: db.Collection("vaasPythnet"),
invalidVaas: db.Collection("invalid_vaas"),
vaaCount: db.Collection("vaaCounts"),
globalTransactions: db.Collection("globalTransactions"),
},
}
}
// Find searches the database for VAAs.
// The input parameter [q *VaaQuery] define the filters to apply in the query.
func (r *Repository) Find(ctx context.Context, q *VaaQuery) ([]*VaaDoc, error) {
// FindVaasBySolanaTxHash searches the database for VAAs that match a given Solana transaction hash.
func (r *Repository) FindVaasBySolanaTxHash(
ctx context.Context,
txHash string,
includeParsedPayload bool,
) ([]*VaaDoc, error) {
var err error
var cur *mongo.Cursor
if q.chainId == vaa.ChainIDPythNet {
cur, err = r.collections.vaasPythnet.Find(ctx, q.toBSON(), q.findOptions())
} else {
cur, err = r.collections.vaas.Find(ctx, q.toBSON(), q.findOptions())
}
// Find globalTransactions that match the given Solana TxHash
cur, err := r.collections.globalTransactions.Find(
ctx,
bson.D{bson.E{"originTx.nativeTxHash", txHash}},
nil,
)
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed execute Find command to get vaas",
zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID))
r.logger.Error("failed to find globalTransactions by Solana TxHash",
zap.Error(err),
zap.String("requestID", requestID),
)
return nil, errors.WithStack(err)
}
var vaas []*VaaDoc
err = cur.All(ctx, &vaas)
// Read results from cursor
var globalTxs []transactions.GlobalTransactionDoc
err = cur.All(ctx, &globalTxs)
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed decoding cursor to []*VaaDoc", zap.Error(err), zap.Any("q", q),
zap.String("requestID", requestID))
r.logger.Error("failed to decode cursor to []GlobalTransactionDoc",
zap.Error(err),
zap.String("requestID", requestID),
)
return nil, errors.WithStack(err)
}
// Clear the `Payload` and `AppId` fields.
// (this function doesn't return those fields currently, but may do so in the future by adding new parameters)
for i := range vaas {
vaas[i].Payload = nil
vaas[i].AppId = ""
}
// If no results were found, return an empty slice instead of nil.
if vaas == nil {
vaas = make([]*VaaDoc, 0)
if len(globalTxs) == 0 {
result := make([]*VaaDoc, 0)
return result, nil
}
if len(globalTxs) > 1 {
return nil, fmt.Errorf("expected at most one transaction, but found %d", len(globalTxs))
}
return vaas, err
// Find VAAs that match the given VAA ID
q := Query().
SetID(globalTxs[0].ID).
IncludeParsedPayload(includeParsedPayload)
return r.FindVaas(ctx, q)
}
// FindOne get *VaaDoc.
// The input parameter [q *VaaQuery] define the filters to apply in the query.
func (r *Repository) FindOne(ctx context.Context, q *VaaQuery) (*VaaDoc, error) {
var vaaDoc VaaDoc
var err error
if q.chainId == vaa.ChainIDPythNet {
err = r.collections.vaasPythnet.FindOne(ctx, q.toBSON()).Decode(&vaaDoc)
} else {
err = r.collections.vaas.FindOne(ctx, q.toBSON()).Decode(&vaaDoc)
}
if err != nil {
if errors.Is(err, mongo.ErrNoDocuments) {
return nil, errs.ErrNotFound
}
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed execute FindOne command to get vaas",
zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID))
return nil, errors.WithStack(err)
}
// Clear the `Payload` and `AppId` fields.
// (this function doesn't return those fields currently, but may do so in the future by adding new parameters)
vaaDoc.Payload = nil
vaaDoc.AppId = ""
return &vaaDoc, err
}
// FindVaasWithPayload returns VAAs that include a parsed payload.
// The input parameter `q` defines the filters to be applied in the query.
func (r *Repository) FindVaasWithPayload(
// FindVaas searches the database for VAAs matching the given filters.
func (r *Repository) FindVaas(
ctx context.Context,
q *VaaQuery,
) ([]*VaaDoc, error) {
@ -126,6 +111,13 @@ func (r *Repository) FindVaasWithPayload(
{"$sort", bson.D{bson.E{q.SortBy, q.GetSortInt()}}},
})
// filter by _id
if q.id != "" {
pipeline = append(pipeline, bson.D{
{"$match", bson.D{bson.E{"_id", q.id}}},
})
}
// filter by emitterChain
if q.chainId != 0 {
pipeline = append(pipeline, bson.D{
@ -172,6 +164,23 @@ func (r *Repository) FindVaasWithPayload(
}},
})
// left outer join on the `globalTransaction` collection
pipeline = append(pipeline, bson.D{
{"$lookup", bson.D{
{"from", "globalTransactions"},
{"localField", "_id"},
{"foreignField", "_id"},
{"as", "globalTransaction"},
}},
})
// add globalTransaction fields
pipeline = append(pipeline, bson.D{
{"$addFields", bson.D{
{"nativeTxHash", bson.M{"$arrayElemAt": []interface{}{"$globalTransaction.originTx.nativeTxHash", 0}}},
}},
})
// filter by appId
if q.appId != "" {
pipeline = append(pipeline, bson.D{
@ -225,6 +234,13 @@ func (r *Repository) FindVaasWithPayload(
vaasWithPayload = make([]*VaaDoc, 0)
}
// If the payload field was not requested, remove it from the results.
if !q.includeParsedPayload && q.appId == "" {
for i := range vaasWithPayload {
vaasWithPayload[i].Payload = nil
}
}
return vaasWithPayload, nil
}
@ -252,11 +268,13 @@ func (r *Repository) GetVaaCount(ctx context.Context, q *VaaQuery) ([]*VaaStats,
// VaaQuery represents a query for the vaa mongodb document.
type VaaQuery struct {
pagination.Pagination
chainId vaa.ChainID
emitter string
sequence string
txHash string
appId string
id string
chainId vaa.ChainID
emitter string
sequence string
txHash string
appId string
includeParsedPayload bool
}
// Query creates a new VaaQuery with default pagination values.
@ -265,6 +283,12 @@ func Query() *VaaQuery {
return &VaaQuery{Pagination: *p}
}
// SetID sets the id field of the VaaQuery struct.
func (q *VaaQuery) SetID(id string) *VaaQuery {
q.id = id
return q
}
// SetChain set the chainId field of the VaaQuery struct.
func (q *VaaQuery) SetChain(chainID vaa.ChainID) *VaaQuery {
q.chainId = chainID
@ -300,8 +324,16 @@ func (q *VaaQuery) SetAppId(appId string) *VaaQuery {
return q
}
func (q *VaaQuery) IncludeParsedPayload(val bool) *VaaQuery {
q.includeParsedPayload = val
return q
}
func (q *VaaQuery) toBSON() *bson.D {
r := bson.D{}
if q.id != "" {
r = append(r, bson.E{"_id", q.id})
}
if q.chainId > 0 {
r = append(r, bson.E{"emitterChain", q.chainId})
}

View File

@ -30,7 +30,7 @@ func NewService(r *Repository, getCacheFunc cache.CacheGetFunc, logger *zap.Logg
// FindAllParams passes input data to the function `FindAll`.
type FindAllParams struct {
Pagination *pagination.Pagination
TxHash *vaa.Address
TxHash *types.TxHash
IncludeParsedPayload bool
AppId string
}
@ -42,7 +42,8 @@ func (s *Service) FindAll(
) (*response.Response[[]*VaaDoc], error) {
// set up query parameters
query := Query()
query := Query().
IncludeParsedPayload(params.IncludeParsedPayload)
if params.Pagination != nil {
query.SetPagination(params.Pagination)
}
@ -56,10 +57,10 @@ func (s *Service) FindAll(
// execute the database query
var err error
var vaas []*VaaDoc
if params.IncludeParsedPayload {
vaas, err = s.repo.FindVaasWithPayload(ctx, query)
if params.TxHash != nil && params.TxHash.IsSolanaTxHash() {
vaas, err = s.repo.FindVaasBySolanaTxHash(ctx, params.TxHash.String(), params.IncludeParsedPayload)
} else {
vaas, err = s.repo.Find(ctx, query)
vaas, err = s.repo.FindVaas(ctx, query)
}
if err != nil {
return nil, err
@ -79,9 +80,10 @@ func (s *Service) FindByChain(
query := Query().
SetChain(chain).
SetPagination(p)
SetPagination(p).
IncludeParsedPayload(false)
vaas, err := s.repo.Find(ctx, query)
vaas, err := s.repo.FindVaas(ctx, query)
res := response.Response[[]*VaaDoc]{Data: vaas}
return &res, err
@ -98,9 +100,10 @@ func (s *Service) FindByEmitter(
query := Query().
SetChain(chain).
SetEmitter(emitter.Hex()).
SetPagination(p)
SetPagination(p).
IncludeParsedPayload(false)
vaas, err := s.repo.Find(ctx, query)
vaas, err := s.repo.FindVaas(ctx, query)
res := response.Response[[]*VaaDoc]{Data: vaas}
return &res, err
@ -122,13 +125,7 @@ func (s *Service) FindById(
}
// execute the database query
var err error
var vaa *VaaDoc
if includeParsedPayload {
vaa, err = s.findByIdWithPayload(ctx, chain, emitter, seq)
} else {
vaa, err = s.findById(ctx, chain, emitter, seq)
}
vaa, err := s.findById(ctx, chain, emitter, seq, includeParsedPayload)
if err != nil {
return &response.Response[*VaaDoc]{}, err
}
@ -144,39 +141,33 @@ func (s *Service) findById(
chain vaa.ChainID,
emitter *types.Address,
seq string,
includeParsedPayload bool,
) (*VaaDoc, error) {
// query matching documents from the database
query := Query().
SetChain(chain).
SetEmitter(emitter.Hex()).
SetSequence(seq)
return s.repo.FindOne(ctx, query)
}
// findByIdWithPayload get a vaa with payload data by chainID, emitter address and sequence number.
func (s *Service) findByIdWithPayload(ctx context.Context, chain vaa.ChainID, emitter *types.Address, seq string) (*VaaDoc, error) {
query := Query().
SetChain(chain).
SetEmitter(emitter.Hex()).
SetSequence(seq)
vaas, err := s.repo.FindVaasWithPayload(ctx, query)
SetSequence(seq).
IncludeParsedPayload(includeParsedPayload)
docs, err := s.repo.FindVaas(ctx, query)
if err != nil {
return nil, err
} else if len(vaas) == 0 {
}
// we're expecting exactly one document
if len(docs) == 0 {
return nil, errs.ErrNotFound
} else if len(vaas) > 1 {
}
if len(docs) > 1 {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
s.logger.Error("can not get more that one vaa by chainID/address/sequence",
zap.Any("q", query),
zap.String("requestID", requestID),
)
zap.String("requestID", requestID))
return nil, errs.ErrInternalError
}
return vaas[0], nil
return docs[0], nil
}
// GetVaaCount get a list a list of vaa count grouped by chainID.

View File

@ -188,26 +188,26 @@ func ExtractObservationHash(c *fiber.Ctx, l *zap.Logger) (string, error) {
return hash, nil
}
// GetTxHash get txHash parameter from query param.
func GetTxHash(c *fiber.Ctx, l *zap.Logger) (*sdk.Address, error) {
// GetTxHash parses the `txHash` parameter from query params.
func GetTxHash(c *fiber.Ctx, l *zap.Logger) (*types.TxHash, error) {
txHash := c.Query("txHash")
if txHash == "" {
value := c.Query("txHash")
if value == "" {
return nil, nil
}
txHashAddr, err := sdk.StringToAddress(txHash)
txHash, err := types.ParseTxHash(value)
if err != nil {
requestID := fmt.Sprintf("%v", c.Locals("requestid"))
l.Error("failed to covert txHash to address",
l.Error("failed to parse txHash",
zap.Error(err),
zap.String("txHash", txHash),
zap.String("txHash", value),
zap.String("requestID", requestID),
)
return nil, response.NewInvalidParamError(c, "MALFORMED TX HASH", errors.WithStack(err))
}
return &txHashAddr, nil
return txHash, nil
}
// ExtractParsedPayload get parsedPayload query parameter.

102
api/types/tx_hash.go Normal file
View File

@ -0,0 +1,102 @@
package types
import (
"encoding/hex"
"fmt"
"strings"
"github.com/mr-tron/base58"
)
// Accepted input lengths for the supported txHash encodings.
const (
	// solanaTxHashLen is the length of a base58-encoded Solana tx hash (64 bytes of data).
	solanaTxHashLen = 88
	// wormholeMinTxHashLen is the length of a hex-encoded Wormhole tx hash (32 bytes) without a "0x" prefix.
	wormholeMinTxHashLen = 64
	// wormholeMaxTxHashLen is the length of a hex-encoded Wormhole tx hash (32 bytes) including a "0x" prefix.
	wormholeMaxTxHashLen = 66
)

// TxHash represents a transaction hash passed by query params.
//
// Use ParseTxHash to obtain a value of this type; the zero value
// reports neither encoding.
type TxHash struct {
	// hash holds the normalized string form: base58 for Solana,
	// lowercase hex (no "0x" prefix) for Wormhole.
	hash string
	// isSolana is set when the hash was parsed as a Solana tx hash.
	isSolana bool
	// isWormhole is set when the hash was parsed as a Wormhole tx hash.
	isWormhole bool
}
// ParseTxHash parses a transaction hash from a string.
//
// The transaction hash can be provided in different formats,
// depending on the blockchain it belongs to:
// * Solana: 64 bytes, encoded as base58.
// * All other chains: 32 bytes, encoded as hex.
//
// More cases could be added in the future as needed.
func ParseTxHash(value string) (*TxHash, error) {
	// Dispatch on the input length to decide which decoder to use.
	switch n := len(value); {
	case n == solanaTxHashLen:
		// Solana txHashes are 64 bytes long, encoded as base58.
		//
		// NOTE(review): base58 can encode 64 bytes into fewer than 88
		// characters when the value has leading small digits, so requiring
		// exactly 88 characters may reject some valid Solana tx hashes —
		// confirm before relying on this check.
		return parseSolanaTxHash(value)
	case n >= wormholeMinTxHashLen && n <= wormholeMaxTxHashLen:
		// Wormhole txHashes are 32 bytes long, encoded as hex.
		// Optionally, they can be prefixed with "0x" or "0X".
		return parseWormholeTxHash(value)
	default:
		return nil, fmt.Errorf("invalid txHash length: %d", n)
	}
}
// parseSolanaTxHash parses a base58-encoded Solana transaction hash,
// which must decode to exactly 64 bytes.
func parseSolanaTxHash(value string) (*TxHash, error) {
	// Reject anything that is not valid base58.
	decoded, err := base58.Decode(value)
	if err != nil {
		return nil, fmt.Errorf("failed to decode txHash from base58: %w", err)
	}

	// Reject inputs that do not decode to exactly 64 bytes.
	if len(decoded) != 64 {
		return nil, fmt.Errorf("solana txHash must be exactly 64 bytes, but got %d bytes", len(decoded))
	}

	// Re-encode the binary hash to obtain its canonical base58 form.
	return &TxHash{
		hash:     base58.Encode(decoded),
		isSolana: true,
	}, nil
}
// parseWormholeTxHash parses a hex-encoded Wormhole transaction hash,
// which must decode to exactly 32 bytes. An optional "0x" or "0X"
// prefix is accepted.
func parseWormholeTxHash(value string) (*TxHash, error) {
	// Strip the optional prefix before decoding.
	value = strings.TrimPrefix(value, "0x")
	value = strings.TrimPrefix(value, "0X")

	// Reject anything that is not valid hex.
	raw, err := hex.DecodeString(value)
	if err != nil {
		return nil, fmt.Errorf("failed to decode txHash from hex: %w", err)
	}

	// Reject inputs that do not decode to exactly 32 bytes.
	if len(raw) != 32 {
		return nil, fmt.Errorf("wormhole txHash must be exactly 32 bytes, but got %d bytes", len(raw))
	}

	// Re-encode to obtain the canonical lowercase hex form.
	return &TxHash{
		hash:       hex.EncodeToString(raw),
		isWormhole: true,
	}, nil
}
// IsSolanaTxHash returns true if the hash was parsed as a Solana tx hash.
func (h *TxHash) IsSolanaTxHash() bool {
	return h.isSolana
}
// IsWormholeTxHash returns true if the hash was parsed as a Wormhole tx hash.
func (h *TxHash) IsWormholeTxHash() bool {
	return h.isWormhole
}
// String returns the normalized string form of the hash
// (base58 for Solana, lowercase hex for Wormhole).
func (h *TxHash) String() string {
	return h.hash
}

88
api/types/tx_hash_test.go Normal file
View File

@ -0,0 +1,88 @@
package types
import "testing"
// TestParseTxHash tests the ParseTxHash function.
// TestParseTxHash tests the ParseTxHash function against a table of
// valid and invalid inputs for both supported encodings.
func TestParseTxHash(t *testing.T) {
	// A table containing several test cases.
	// An empty `output` means parsing is expected to fail.
	tcs := []struct {
		input            string
		output           string
		isSolanaTxHash   bool
		isWormholeTxHash bool
	}{
		{
			// Solana hash (88 base58 characters)
			input:          "2maR6uDZzroV7JFF76rp5QR4CFP1PFUe76VRE8gF8QtWRifpGAKJQo4SQDBNs3TAM9RrchJhnJ644jUL2yfagZco",
			output:         "2maR6uDZzroV7JFF76rp5QR4CFP1PFUe76VRE8gF8QtWRifpGAKJQo4SQDBNs3TAM9RrchJhnJ644jUL2yfagZco",
			isSolanaTxHash: true,
		},
		{
			// Solana hash w/ invalid length (87 characters)
			input: "2maR6uDZzroV7JFF76rp5QR4CFP1PFUe76VRE8gF8QtWRifpGAKJQo4SQDBNs3TAM9RrchJhnJ644jUL2yfagZc",
		},
		{
			// Solana hash w/ invalid length (89 characters)
			input: "2maR6uDZzroV7JFF76rp5QR4CFP1PFUe76VRE8gF8QtWRifpGAKJQo4SQDBNs3TAM9RrchJhnJ644jUL2yfagZco2",
		},
		{
			// Wormhole hash with 0x prefix
			input:            "0x3f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637",
			output:           "3f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637",
			isWormholeTxHash: true,
		},
		{
			// Wormhole hash with 0X prefix (output is normalized to lowercase)
			input:            "0X3F77F8B44F35FF047A74EE8235CE007AFBAB357D4E30010D51B6F6990F921637",
			output:           "3f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637",
			isWormholeTxHash: true,
		},
		{
			// Wormhole hash with no prefix
			input:            "3f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637",
			output:           "3f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637",
			isWormholeTxHash: true,
		},
		{
			// Wormhole hash w/ invalid length (65 hex characters)
			input:  "33f77f8b44f35ff047a74ee8235ce007afbab357d4e30010d51b6f6990f921637",
			output: "",
		},
		{
			// A bunch of random characters
			input:  "434234i32042oiu08d8sauf0suif",
			output: "",
		},
	}
	// Run each test case in the table.
	for i := range tcs {
		tc := tcs[i]
		// Try to parse the hash; an empty expected output means failure is expected.
		txHash, err := ParseTxHash(tc.input)
		if tc.output == "" && err == nil {
			t.Fatalf("expected parseTxHash(%s) to fail", tc.input)
		} else if tc.output != "" && err != nil {
			t.Fatalf("parseTxHash(%s) failed with error %v", tc.input, err)
		}
		if tc.output == "" {
			continue
		}
		// Make assertions about the output struct.
		if tc.output != txHash.String() {
			t.Fatalf("expected TxHash.String()=%s, got %s", tc.output, txHash.String())
		}
		if tc.isSolanaTxHash != txHash.IsSolanaTxHash() {
			t.Fatalf("expected TxHash.IsSolanaHash()=%t, but got %t", tc.isSolanaTxHash, txHash.IsSolanaTxHash())
		}
		if tc.isWormholeTxHash != txHash.IsWormholeTxHash() {
			t.Fatalf("expected TxHash.IsWormholeHash()=%t, but got %t", tc.isWormholeTxHash, txHash.IsWormholeTxHash())
		}
	}
}

View File

@ -101,6 +101,12 @@ spec:
value: "20"
- name: BULK_SIZE
value: "500"
- name: STRATEGY_NAME
value: "{{ .STRATEGY_NAME }}"
- name: STRATEGY_TIMESTAMP_AFTER
value: "{{ .STRATEGY_TIMESTAMP_AFTER }}"
- name: STRATEGY_TIMESTAMP_BEFORE
value: "{{ .STRATEGY_TIMESTAMP_BEFORE }}"
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}

View File

@ -2,6 +2,7 @@ package chains
import (
"context"
"errors"
"fmt"
"strings"
@ -46,6 +47,9 @@ func fetchEthTx(
if err != nil {
return nil, fmt.Errorf("failed to get tx by hash: %w", err)
}
if txReply.BlockHash == "" || txReply.From == "" {
return nil, errors.New("received empty response from the RPC service")
}
// query block data
blkParams := []interface{}{

View File

@ -87,7 +87,10 @@ func fetchSolanaTx(
if err != nil {
return nil, fmt.Errorf("failed to get signatures for account: %w (%+v)", err, err)
}
if len(sigs) != 1 {
if len(sigs) == 0 {
return nil, ErrTransactionNotFound
}
if len(sigs) > 1 {
return nil, fmt.Errorf("expected exactly one signature, but found %d", len(sigs))
}

View File

@ -0,0 +1,7 @@
# Backfiller
Reprocess all VAAs in a specified time range:
`STRATEGY_NAME=time_range STRATEGY_TIMESTAMP_AFTER=2023-01-01T00:00:00.000Z STRATEGY_TIMESTAMP_BEFORE=2023-04-01T00:00:00.000Z ./backfiller`
Reprocess only VAAs that failed due to internal errors:
`STRATEGY_NAME=reprocess_failed ./backfiller`

View File

@ -9,6 +9,7 @@ import (
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
@ -19,6 +20,8 @@ import (
"go.uber.org/zap"
)
const layoutRcf3339 = "2006-01-02T15:04:05.000Z"
func makeLogger(logger *zap.Logger, name string) *zap.Logger {
rightPadding := fmt.Sprintf("%-10s", name)
@ -69,8 +72,13 @@ func main() {
defer cli.Disconnect(rootCtx)
repository := consumer.NewRepository(rootLogger, cli.Database(cfg.MongodbDatabase))
strategyCallbacks, err := parseStrategyCallbacks(mainLogger, cfg, repository)
if err != nil {
log.Fatal("Failed to parse strategy callbacks: ", err)
}
// Count the number of documents to process
totalDocuments, err := repository.CountIncompleteDocuments(rootCtx)
totalDocuments, err := strategyCallbacks.countFn(rootCtx)
if err != nil {
log.Fatal("Closing - failed to count number of global transactions: ", err)
}
@ -81,10 +89,11 @@ func main() {
// The producer sends tasks to the workers via a buffered channel.
queue := make(chan consumer.GlobalTransaction, cfg.BulkSize)
p := producerParams{
logger: makeLogger(rootLogger, "producer"),
repository: repository,
queueTx: queue,
bulkSize: cfg.BulkSize,
logger: makeLogger(rootLogger, "producer"),
repository: repository,
queueTx: queue,
bulkSize: cfg.BulkSize,
strategyCallbacks: strategyCallbacks,
}
go produce(rootCtx, &p)
@ -112,12 +121,69 @@ func main() {
mainLogger.Info("Closing main goroutine")
}
func parseStrategyCallbacks(
logger *zap.Logger,
cfg *config.BackfillerSettings,
r *consumer.Repository,
) (*strategyCallbacks, error) {
switch cfg.Strategy.Name {
case config.BackfillerStrategyReprocessFailed:
cb := strategyCallbacks{
countFn: r.CountIncompleteDocuments,
iteratorFn: r.GetIncompleteDocuments,
}
logger.Info("backfilling incomplete documents")
return &cb, nil
case config.BackfillerStrategyTimeRange:
timestampAfter, err := time.Parse(layoutRcf3339, cfg.Strategy.TimestampAfter)
if err != nil {
return nil, fmt.Errorf("failed to parse timestampAfter: %w", err)
}
timestampBefore, err := time.Parse(layoutRcf3339, cfg.Strategy.TimestampBefore)
if err != nil {
return nil, fmt.Errorf("failed to parse timestampBefore: %w", err)
}
cb := strategyCallbacks{
countFn: func(ctx context.Context) (uint64, error) {
return r.CountDocumentsByTimeRange(ctx, timestampAfter, timestampBefore)
},
iteratorFn: func(ctx context.Context, maxId string, limit uint) ([]consumer.GlobalTransaction, error) {
return r.GetDocumentsByTimeRange(ctx, maxId, limit, timestampAfter, timestampBefore)
},
}
logger.Info("backfilling by time range",
zap.Time("after", timestampAfter),
zap.Time("before", timestampBefore),
)
return &cb, nil
default:
return nil, fmt.Errorf("unknown strategy: %s", cfg.Strategy.Name)
}
}
type strategyCallbacks struct {
countFn func(ctx context.Context) (uint64, error)
iteratorFn func(ctx context.Context, maxId string, limit uint) ([]consumer.GlobalTransaction, error)
}
// producerParams contains the parameters for the producer goroutine.
type producerParams struct {
logger *zap.Logger
repository *consumer.Repository
queueTx chan<- consumer.GlobalTransaction
bulkSize uint
logger *zap.Logger
repository *consumer.Repository
queueTx chan<- consumer.GlobalTransaction
bulkSize uint
strategyCallbacks *strategyCallbacks
}
// produce reads VAA IDs from the database, and sends them through a channel for the workers to consume.
@ -134,7 +200,7 @@ func produce(ctx context.Context, params *producerParams) {
for {
// Get a batch of VAA IDs from the database
globalTxs, err := params.repository.GetIncompleteDocuments(ctx, maxId, params.bulkSize)
globalTxs, err := params.strategyCallbacks.iteratorFn(ctx, maxId, params.bulkSize)
if err != nil {
params.logger.Error("Closing: failed to read from cursor", zap.Error(err))
return

View File

@ -7,11 +7,29 @@ import (
"github.com/kelseyhightower/envconfig"
)
type BackfillingStrategy string
const (
// BackfillerStrategyReprocessFailed will reprocess documents in the `globalTransactions`
// collection that don't have the `sourceTx` field set, or that have the
// `sourceTx.status` field set to "internalError".
BackfillerStrategyReprocessFailed BackfillingStrategy = "reprocess_failed"
// BackfillerStrategyTimeRange will reprocess all VAAs that have a timestamp between the specified range.
BackfillerStrategyTimeRange BackfillingStrategy = "time_range"
)
type BackfillerSettings struct {
LogLevel string `split_words:"true" default:"INFO"`
NumWorkers uint `split_words:"true" required:"true"`
BulkSize uint `split_words:"true" required:"true"`
// Strategy determines which VAAs will be affected by the backfiller.
Strategy struct {
Name BackfillingStrategy `split_words:"true" required:"true"`
TimestampAfter string `split_words:"true" required:"false"`
TimestampBefore string `split_words:"true" required:"false"`
}
VaaPayloadParserSettings
MongodbSettings
RpcProviderSettings

View File

@ -3,6 +3,7 @@ package consumer
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa"
@ -19,6 +20,7 @@ import (
type Repository struct {
logger *zap.Logger
globalTransactions *mongo.Collection
vaas *mongo.Collection
}
// New creates a new repository.
@ -27,6 +29,7 @@ func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository {
r := Repository{
logger: logger,
globalTransactions: db.Collection("globalTransactions"),
vaas: db.Collection("vaas"),
}
return &r
@ -52,10 +55,7 @@ func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentP
if params.TxDetail != nil {
fields = append(fields, primitive.E{Key: "timestamp", Value: params.TxDetail.Timestamp})
fields = append(fields, primitive.E{Key: "signer", Value: params.TxDetail.Signer})
// It is still to be defined whether we want to expose this field to the API consumers,
// since it can be obtained from the original TxHash.
//fields = append(fields, primitive.E{Key: "nativeTxHash", Value: txDetail.NativeTxHash})
fields = append(fields, primitive.E{Key: "nativeTxHash", Value: params.TxDetail.NativeTxHash})
}
update := bson.D{
@ -80,6 +80,61 @@ func (r *Repository) UpsertDocument(ctx context.Context, params *UpsertDocumentP
return nil
}
// CountDocumentsByTimeRange returns the number of documents that match the given time range.
func (r *Repository) CountDocumentsByTimeRange(
ctx context.Context,
timeAfter time.Time,
timeBefore time.Time,
) (uint64, error) {
// Build the aggregation pipeline
var pipeline mongo.Pipeline
{
// filter by time range
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"timestamp", bson.D{{"$gte", timeAfter}}},
}},
})
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"timestamp", bson.D{{"$lte", timeBefore}}},
}},
})
// Count the number of results
pipeline = append(pipeline, bson.D{
{"$count", "numDocuments"},
})
}
// Execute the aggregation pipeline
cur, err := r.vaas.Aggregate(ctx, pipeline)
if err != nil {
r.logger.Error("failed execute aggregation pipeline", zap.Error(err))
return 0, err
}
// Read results from cursor
var results []struct {
NumDocuments uint64 `bson:"numDocuments"`
}
err = cur.All(ctx, &results)
if err != nil {
r.logger.Error("failed to decode cursor", zap.Error(err))
return 0, err
}
if len(results) == 0 {
return 0, nil
}
if len(results) > 1 {
r.logger.Error("too many results", zap.Int("numResults", len(results)))
return 0, err
}
return results[0].NumDocuments, nil
}
// CountIncompleteDocuments returns the number of documents that have destTx data, but don't have sourceTx data.
func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, error) {
@ -100,7 +155,7 @@ func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, erro
// Count the number of results
pipeline = append(pipeline, bson.D{
{"$count", "numGlobalTransactions"},
{"$count", "numDocuments"},
})
}
@ -113,7 +168,7 @@ func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, erro
// Read results from cursor
var results []struct {
NumGlobalTransactions uint64 `bson:"numGlobalTransactions"`
NumDocuments uint64 `bson:"numDocuments"`
}
err = cur.All(ctx, &results)
if err != nil {
@ -128,7 +183,7 @@ func (r *Repository) CountIncompleteDocuments(ctx context.Context) (uint64, erro
return 0, err
}
return results[0].NumGlobalTransactions, nil
return results[0].NumDocuments, nil
}
type GlobalTransaction struct {
@ -136,6 +191,76 @@ type GlobalTransaction struct {
Vaas []vaa.VaaDoc `bson:"vaas"`
}
// GetDocumentsByTimeRange iterates through documents within a specified time range.
func (r *Repository) GetDocumentsByTimeRange(
ctx context.Context,
maxId string,
limit uint,
timeAfter time.Time,
timeBefore time.Time,
) ([]GlobalTransaction, error) {
// Build the aggregation pipeline
var pipeline mongo.Pipeline
{
// Specify sorting criteria
pipeline = append(pipeline, bson.D{
{"$sort", bson.D{bson.E{"_id", 1}}},
})
// filter out already processed documents
//
// We use the _id field as a pagination cursor
pipeline = append(pipeline, bson.D{
{"$match", bson.D{{"_id", bson.M{"$gt": maxId}}}},
})
// filter by time range
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"timestamp", bson.D{{"$gte", timeAfter}}},
}},
})
pipeline = append(pipeline, bson.D{
{"$match", bson.D{
{"timestamp", bson.D{{"$lte", timeBefore}}},
}},
})
// Limit size of results
pipeline = append(pipeline, bson.D{
{"$limit", limit},
})
}
// Execute the aggregation pipeline
cur, err := r.vaas.Aggregate(ctx, pipeline)
if err != nil {
r.logger.Error("failed execute aggregation pipeline", zap.Error(err))
return nil, errors.WithStack(err)
}
// Read results from cursor
var documents []vaa.VaaDoc
err = cur.All(ctx, &documents)
if err != nil {
r.logger.Error("failed to decode cursor", zap.Error(err))
return nil, errors.WithStack(err)
}
// Build the result
var globalTransactions []GlobalTransaction
for i := range documents {
globalTransaction := GlobalTransaction{
Id: documents[i].ID,
Vaas: []vaa.VaaDoc{documents[i]},
}
globalTransactions = append(globalTransactions, globalTransaction)
}
return globalTransactions, nil
}
// GetIncompleteDocuments gets a batch of VAA IDs from the database.
func (r *Repository) GetIncompleteDocuments(
ctx context.Context,