add search by chain

 change chainId query param handling

 change condition

 only from chain

change query

 add filter by appId

 add payload type for operations query

add logs

 add log for error in mongodb call

 add more logs and recover to find possible panic

change type to float64

add more logs for troubleshooting

 add more logs for troubleshooting payloadType query param

add another defer
This commit is contained in:
Mariano 2024-03-19 12:06:47 -03:00
parent e55f6e35e3
commit 7e9bbf2a41
4 changed files with 237 additions and 20 deletions

View File

@ -3,6 +3,8 @@ package operations
import (
"context"
"fmt"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"math"
"strings"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/errors"
@ -107,9 +109,113 @@ type mongoID struct {
}
type OperationQuery struct {
Pagination pagination.Pagination
TxHash string
Address string
Pagination pagination.Pagination
TxHash string
Address string
ChainId *vaa.ChainID
AppId string
PayloadType *float64
}
func findOperationsIdByChain(ctx context.Context, db *mongo.Database, chainId vaa.ChainID) ([]string, error) {
matchGlobalTransactions := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "destinationTx.chainId", Value: bson.M{"$eq": chainId}}}},
}}}}
matchParsedVaa := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "parsedPayload.targetChainId", Value: bson.M{"$eq": chainId}}},
bson.D{{Key: "rawStandardizedProperties.fromChain", Value: bson.M{"$eq": chainId}}},
bson.D{{Key: "rawStandardizedProperties.toChain", Value: bson.M{"$eq": chainId}}},
bson.D{{Key: "standardizedProperties.fromChain", Value: bson.M{"$eq": chainId}}},
bson.D{{Key: "standardizedProperties.toChain", Value: bson.M{"$eq": chainId}}},
}}}}}
globalTransactionFilter := bson.D{{Key: "$unionWith", Value: bson.D{{Key: "coll", Value: "globalTransactions"}, {Key: "pipeline", Value: bson.A{matchGlobalTransactions}}}}}
parserFilter := bson.D{{Key: "$unionWith", Value: bson.D{{Key: "coll", Value: "parsedVaa"}, {Key: "pipeline", Value: bson.A{matchParsedVaa}}}}}
group := bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: "$_id"}}}}
pipeline := []bson.D{globalTransactionFilter, parserFilter, group}
cur, err := db.Collection("_operationsTemporal").Aggregate(ctx, pipeline)
if err != nil {
return nil, err
}
var documents []mongoID
err = cur.All(ctx, &documents)
if err != nil {
return nil, err
}
var ids []string
for _, doc := range documents {
ids = append(ids, doc.Id)
}
return ids, nil
}
func findOperationsIdByAppID(ctx context.Context, db *mongo.Database, appID string) ([]string, error) {
includesAppId := bson.M{"$in": []string{appID}}
matchParsedVaa := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "appIds", Value: includesAppId}},
bson.D{{Key: "rawStandardizedProperties.appIds", Value: includesAppId}},
bson.D{{Key: "standardizedProperties.appIds", Value: includesAppId}},
}}}}}
parserFilter := bson.D{{Key: "$unionWith", Value: bson.D{{Key: "coll", Value: "parsedVaa"}, {Key: "pipeline", Value: bson.A{matchParsedVaa}}}}}
group := bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: "$_id"}}}}
pipeline := []bson.D{parserFilter, group}
cur, err := db.Collection("_operationsTemporal").Aggregate(ctx, pipeline)
if err != nil {
return nil, err
}
var documents []mongoID
err = cur.All(ctx, &documents)
if err != nil {
return nil, err
}
var ids []string
for _, doc := range documents {
ids = append(ids, doc.Id)
}
return ids, nil
}
func findOperationsIdByPayloadType(ctx context.Context, db *mongo.Database, logger *zap.Logger, payloadType float64) ([]string, error) {
defer func() {
if r := recover(); r != nil {
logger.Error("recovered from panic in findOperationsIdByPayloadType", zap.Any("recovered", r))
}
}()
logger.Info("findOperationsIdByPayloadType: building pipeline")
matchParsedVaa := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "parsedPayload.payloadType", Value: bson.M{"$eq": payloadType}}},
}}}}}
parserFilter := bson.D{{Key: "$unionWith", Value: bson.D{{Key: "coll", Value: "parsedVaa"}, {Key: "pipeline", Value: bson.A{matchParsedVaa}}}}}
group := bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: "$_id"}}}}
pipeline := []bson.D{parserFilter, group}
logger.Info("findOperationsIdByPayloadType: finished building pipeline")
cur, err := db.Collection("_operationsTemporal").Aggregate(ctx, pipeline)
if err != nil {
logger.Error("failed execute aggregation pipeline for querying by payload type", zap.Error(err))
return nil, err
}
var documents []mongoID
err = cur.All(ctx, &documents)
if err != nil {
logger.Error("failed executing cur.All for querying by payload type", zap.Error(err))
return nil, err
}
var ids []string
for _, doc := range documents {
ids = append(ids, doc.Id)
}
return ids, nil
}
// findOperationsIdByAddress returns all operations filtered by address.
@ -189,6 +295,12 @@ func (r *Repository) matchOperationByTxHash(ctx context.Context, txHash string)
// FindAll returns all operations filtered by q.
func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*OperationDto, error) {
defer func() {
if er := recover(); er != nil {
r.logger.Error("recovered from panic in FindAll", zap.Any("recovered", er))
}
}()
var pipeline mongo.Pipeline
// filter operations by address or txHash
@ -198,7 +310,6 @@ func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*Oper
if err != nil {
return nil, err
}
if len(ids) == 0 {
return []*OperationDto{}, nil
}
@ -207,6 +318,41 @@ func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*Oper
// match operation by txHash (source tx and destination tx)
matchByTxHash := r.matchOperationByTxHash(ctx, query.TxHash)
pipeline = append(pipeline, matchByTxHash)
} else if query.ChainId != nil {
ids, err := findOperationsIdByChain(ctx, r.db, *query.ChainId)
if err != nil {
return nil, err
}
if len(ids) == 0 {
return []*OperationDto{}, nil
}
pipeline = append(pipeline, bson.D{{Key: "$match", Value: bson.D{{Key: "_id", Value: bson.D{{Key: "$in", Value: ids}}}}}})
} else if len(query.AppId) > 0 {
ids, err := findOperationsIdByAppID(ctx, r.db, query.AppId)
if err != nil {
return nil, err
}
if len(ids) == 0 {
return []*OperationDto{}, nil
}
pipeline = append(pipeline, bson.D{{Key: "$match", Value: bson.D{{Key: "_id", Value: bson.D{{Key: "$in", Value: ids}}}}}})
} else if query.PayloadType != nil {
r.logger.Info("searching by payloadType")
if math.IsNaN(*query.PayloadType) {
r.logger.Info("searching by payloadType: payloadType is NaN")
return []*OperationDto{}, nil
}
r.logger.Info("searching by payloadType: calling method findOperationsIdByPayloadType with payload type hardcoded to 1")
ids, err := findOperationsIdByPayloadType(ctx, r.db, r.logger, 1)
r.logger.Info("searching by payloadType: finished method findOperationsIdByPayloadType")
if err != nil {
r.logger.Info("searching by payloadType: returning err", zap.Error(err))
return nil, err
}
if len(ids) == 0 {
return []*OperationDto{}, nil
}
pipeline = append(pipeline, bson.D{{Key: "$match", Value: bson.D{{Key: "_id", Value: bson.D{{Key: "$in", Value: ids}}}}}})
}
// sort

View File

@ -32,9 +32,12 @@ func (s *Service) FindById(ctx context.Context, chainID vaa.ChainID,
}
type OperationFilter struct {
TxHash *types.TxHash
Address string
Pagination pagination.Pagination
TxHash *types.TxHash
Address string
ChainID *vaa.ChainID
AppID string
Pagination pagination.Pagination
PayloadType *float64
}
// FindAll returns all operations filtered by q.
@ -45,9 +48,12 @@ func (s *Service) FindAll(ctx context.Context, filter OperationFilter) ([]*Opera
}
operationQuery := OperationQuery{
TxHash: txHash,
Address: filter.Address,
Pagination: filter.Pagination,
TxHash: txHash,
Address: filter.Address,
Pagination: filter.Pagination,
ChainId: filter.ChainID,
AppId: filter.AppID,
PayloadType: filter.PayloadType,
}
operations, err := s.repo.FindAll(ctx, operationQuery)

View File

@ -61,6 +61,28 @@ func ExtractToChain(c *fiber.Ctx, l *zap.Logger) (*sdk.ChainID, error) {
return &result, nil
}
func ExtractChain(c *fiber.Ctx, l *zap.Logger) (*sdk.ChainID, error) {
param := c.Query("chain")
if param == "" {
return nil, nil
}
chain, err := strconv.ParseInt(param, 10, 16)
if err != nil {
requestID := fmt.Sprintf("%v", c.Locals("requestid"))
l.Error("failed to parse toChain parameter",
zap.Error(err),
zap.String("requestID", requestID),
)
return nil, response.NewInvalidParamError(c, "INVALID CHAIN VALUE", errors.WithStack(err))
}
result := sdk.ChainID(chain)
return &result, nil
}
// ExtractEmitterAddr parses the emitter address from the request path.
//
// When the parameter `chainIdHint` is not nil, this function will attempt to parse the
@ -257,6 +279,24 @@ func ExtractAppId(c *fiber.Ctx, l *zap.Logger) string {
return c.Query("appId")
}
func ExtractPayloadType(c *fiber.Ctx, l *zap.Logger) (*float64, error) {
payloadTypeParam := c.Query("payloadType")
if payloadTypeParam == "" {
return nil, nil
}
payloadType, err := strconv.ParseFloat(payloadTypeParam, 64)
if err != nil {
requestID := fmt.Sprintf("%v", c.Locals("requestid"))
l.Error("failed to parse payload type parameter",
zap.Error(err),
zap.String("requestID", requestID),
)
return nil, response.NewInvalidParamError(c, "INVALID PAYLOAD TYPE", errors.WithStack(err))
}
return &payloadType, nil
}
func ExtractTimeSpan(c *fiber.Ctx, l *zap.Logger) (string, error) {
// get the timeSpan from query params
timeSpanStr := c.Query("timeSpan", "1d")

View File

@ -54,28 +54,53 @@ func (c *Controller) FindAll(ctx *fiber.Ctx) error {
return err
}
// Check if address and txHash query param are used together
if address != "" && txHash != nil {
if txHash.String() != "" {
return response.NewInvalidParamError(ctx, "address and txHash query param cannot be used together", nil)
chainID, err := middleware.ExtractChain(ctx, c.logger)
if err != nil {
return err
}
appID := middleware.ExtractAppId(ctx, c.logger)
payloadType, err := middleware.ExtractPayloadType(ctx, c.logger)
if err != nil {
return err
}
searchByAddress := address != ""
searchByTxHash := txHash != nil && txHash.String() != ""
searchByChainId := chainID != nil
searchByAppID := len(appID) > 0
searchByPayloadType := payloadType != nil
searchCriteria := []bool{searchByAddress, searchByTxHash, searchByChainId, searchByAppID, searchByPayloadType}
searchCriteriaCount := 0
for _, sc := range searchCriteria {
if sc {
searchCriteriaCount++
}
}
if searchCriteriaCount > 1 {
return response.NewInvalidParamError(ctx, "only one search-criteria can be used at once", nil)
}
filter := operations.OperationFilter{
TxHash: txHash,
Address: address,
Pagination: *pagination,
TxHash: txHash,
Address: address,
ChainID: chainID,
AppID: appID,
Pagination: *pagination,
PayloadType: payloadType,
}
// Find operations by q search param.
operations, err := c.srv.FindAll(ctx.Context(), filter)
ops, err := c.srv.FindAll(ctx.Context(), filter)
if err != nil {
return err
}
// build response
response := toListOperationResponse(operations, c.logger)
return ctx.JSON(response)
resp := toListOperationResponse(ops, c.logger)
return ctx.JSON(resp)
}
// FindById godoc