[ISSUE-1220] Add filters to /operations endpoint (#1262)

* start

 add search by chain

 change chainId query param handling

 change condition

 only from chain

change query

 add filter by appId

 add payload type for operations query

add logs

 add log for error in mongodb call

 add more logs and recover to find possible panic

change type to float64

add more logs for troubleshooting

 add more logs for troubleshooting payloadType query param

add another defer

* change query

* add exclusiveAppId,sourceChain and targetChain

* unify search criteria

* change queries

* combine query params filters

* change implementation of sourceChain and targetChain

* insert filtering by chain and by appid as stages in aggregation pipeline

* fix appIds matching condition

* move query to a separate pipeline which starts from parsedVAA

* adjust query by appId

* add matching also for standardizedProperties

* change

* try using  instead of

* simplify query

* add query logging and remove other parts of the query for troubleshooting

* working

* add index creation

* update swagger docs

* tweak index performance and fix timestamp on parsedVaa collection

* start

 add search by chain

 change chainId query param handling

 change condition

 only from chain

change query

 add filter by appId

 add payload type for operations query

add logs

 add log for error in mongodb call

 add more logs and recover to find possible panic

change type to float64

add more logs for troubleshooting

 add more logs for troubleshooting payloadType query param

add another defer

* change query

* add exclusiveAppId,sourceChain and targetChain

* unify search criteria

* change queries

* combine query params filters

* change implementation of sourceChain and targetChain

* insert filtering by chain and by appid as stages in aggregation pipeline

* fix appIds matching condition

* move query to a separate pipeline which starts from parsedVAA

* adjust query by appId

* add matching also for standardizedProperties

* change

* try using  instead of

* simplify query

* add query logging and remove other parts of the query for troubleshooting

* working

* add index creation

* update swagger docs

* tweak index performance and fix timestamp on parsedVaa collection

* add lookup for globaltransactions
This commit is contained in:
Mariano 2024-04-09 09:57:08 -03:00 committed by GitHub
parent bc0014885e
commit e06057acfd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 321 additions and 60 deletions

View File

@ -912,6 +912,30 @@ const docTemplate = `{
"description": "pageSize",
"name": "pageSize",
"in": "query"
},
{
"type": "string",
"description": "source chain of the operation",
"name": "sourceChain",
"in": "query"
},
{
"type": "string",
"description": "target chain of the operation",
"name": "targetChain",
"in": "query"
},
{
"type": "string",
"description": "appID of the operation",
"name": "appId",
"in": "query"
},
{
"type": "boolean",
"description": "single appId of the operation",
"name": "exclusiveAppId",
"in": "query"
}
],
"responses": {
@ -2729,25 +2753,12 @@ const docTemplate = `{
"parser.ParseVaaWithStandarizedPropertiesdResponse": {
"type": "object",
"properties": {
"parsedPayload": {
"$ref": "#/definitions/parser.ParsedPayload"
},
"parsedPayload": {},
"standardizedProperties": {
"$ref": "#/definitions/parser.StandardizedProperties"
}
}
},
"parser.ParsedPayload": {
"type": "object",
"properties": {
"tokenAddress": {
"type": "string"
},
"tokenChain": {
"type": "integer"
}
}
},
"parser.StandardizedProperties": {
"type": "object",
"properties": {

View File

@ -905,6 +905,30 @@
"description": "pageSize",
"name": "pageSize",
"in": "query"
},
{
"type": "string",
"description": "source chain of the operation",
"name": "sourceChain",
"in": "query"
},
{
"type": "string",
"description": "target chain of the operation",
"name": "targetChain",
"in": "query"
},
{
"type": "string",
"description": "appID of the operation",
"name": "appId",
"in": "query"
},
{
"type": "boolean",
"description": "single appId of the operation",
"name": "exclusiveAppId",
"in": "query"
}
],
"responses": {
@ -2722,25 +2746,12 @@
"parser.ParseVaaWithStandarizedPropertiesdResponse": {
"type": "object",
"properties": {
"parsedPayload": {
"$ref": "#/definitions/parser.ParsedPayload"
},
"parsedPayload": {},
"standardizedProperties": {
"$ref": "#/definitions/parser.StandardizedProperties"
}
}
},
"parser.ParsedPayload": {
"type": "object",
"properties": {
"tokenAddress": {
"type": "string"
},
"tokenChain": {
"type": "integer"
}
}
},
"parser.StandardizedProperties": {
"type": "object",
"properties": {

View File

@ -483,18 +483,10 @@ definitions:
type: object
parser.ParseVaaWithStandarizedPropertiesdResponse:
properties:
parsedPayload:
$ref: '#/definitions/parser.ParsedPayload'
parsedPayload: {}
standardizedProperties:
$ref: '#/definitions/parser.StandardizedProperties'
type: object
parser.ParsedPayload:
properties:
tokenAddress:
type: string
tokenChain:
type: integer
type: object
parser.StandardizedProperties:
properties:
amount:
@ -1749,6 +1741,22 @@ paths:
in: query
name: pageSize
type: integer
- description: source chain of the operation
in: query
name: sourceChain
type: string
- description: target chain of the operation
in: query
name: targetChain
type: string
- description: appID of the operation
in: query
name: appId
type: string
- description: single appId of the operation
in: query
name: exclusiveAppId
type: boolean
responses:
"200":
description: OK

View File

@ -3,6 +3,7 @@ package operations
import (
"context"
"fmt"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"strings"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/errors"
@ -107,9 +108,56 @@ type mongoID struct {
}
type OperationQuery struct {
Pagination pagination.Pagination
TxHash string
Address string
Pagination pagination.Pagination
TxHash string
Address string
SourceChainID *vaa.ChainID
TargetChainID *vaa.ChainID
AppID string
ExclusiveAppId bool
}
// buildQueryOperationsByChain builds a $match aggregation stage filtering
// operations by source and/or target chain.
//
// When both chains are provided and equal, the conditions are combined with
// $or (the chain may appear on either side of the operation); otherwise all
// provided conditions must hold ($and). When neither chain is provided the
// stage matches everything — this guard matters because MongoDB rejects an
// empty $and/$or array, which the unguarded version would produce.
func buildQueryOperationsByChain(sourceChainID, targetChainID *vaa.ChainID) bson.D {
	var conditions bson.A
	if sourceChainID != nil {
		conditions = append(conditions, bson.M{"rawStandardizedProperties.fromChain": *sourceChainID})
	}
	if targetChainID != nil {
		conditions = append(conditions, bson.M{"rawStandardizedProperties.toChain": *targetChainID})
	}
	// Guard against an empty $and/$or array (invalid in MongoDB): with no
	// filters requested, emit a match-all stage instead.
	if len(conditions) == 0 {
		return bson.D{{Key: "$match", Value: bson.M{}}}
	}
	// Same chain on both sides: match operations touching that chain at all.
	if sourceChainID != nil && targetChainID != nil && *sourceChainID == *targetChainID {
		return bson.D{{Key: "$match", Value: bson.M{"$or": conditions}}}
	}
	return bson.D{{Key: "$match", Value: bson.M{"$and": conditions}}}
}
// buildQueryOperationsByAppID builds the $match aggregation stage(s) that
// filter operations by application ID.
//
// An empty appID yields a single match-all stage so the caller's pipeline
// shape stays uniform. With exclusive=true the operation's appIds array must
// be exactly [appID]; otherwise appID only needs to be one of its elements.
func buildQueryOperationsByAppID(appID string, exclusive bool) []bson.D {
	if appID == "" {
		// No filter requested: keep the pipeline valid with a match-all stage.
		return []bson.D{{{Key: "$match", Value: bson.M{}}}}
	}
	if exclusive {
		// appIds must contain appID and nothing else.
		return []bson.D{{{Key: "$match", Value: bson.M{
			"$and": bson.A{
				bson.M{"rawStandardizedProperties.appIds": bson.M{"$eq": []string{appID}}},
				bson.M{"rawStandardizedProperties.appIds": bson.M{"$size": 1}},
			}}}}}
	}
	// Non-exclusive: appID may be any element of the appIds array.
	return []bson.D{{{Key: "$match", Value: bson.M{"rawStandardizedProperties.appIds": bson.M{"$in": []string{appID}}}}}}
}
// findOperationsIdByAddress returns all operations filtered by address.
@ -186,6 +234,69 @@ func (r *Repository) matchOperationByTxHash(ctx context.Context, txHash string)
}}}}}
}
// FindByChainAndAppId returns operations filtered by source/target chain and
// application ID, sorted by timestamp and paginated per query.Pagination.
// The aggregation runs against the parsedVaa collection and joins the vaas,
// transferPrices and globalTransactions collections by _id.
func (r *Repository) FindByChainAndAppId(ctx context.Context, query OperationQuery) ([]*OperationDto, error) {
	var stages mongo.Pipeline

	// Optional filter by source/target chain.
	if query.SourceChainID != nil || query.TargetChainID != nil {
		stages = append(stages, buildQueryOperationsByChain(query.SourceChainID, query.TargetChainID))
	}
	// Optional filter by application ID.
	if len(query.AppID) > 0 {
		stages = append(stages, buildQueryOperationsByAppID(query.AppID, query.ExclusiveAppId)...)
	}

	// Sort, then apply pagination (skip + limit).
	stages = append(stages,
		bson.D{{Key: "$sort", Value: bson.D{
			bson.E{Key: "timestamp", Value: query.Pagination.GetSortInt()},
			bson.E{Key: "_id", Value: -1},
		}}},
		bson.D{{Key: "$skip", Value: query.Pagination.Skip}},
		bson.D{{Key: "$limit", Value: query.Pagination.Limit}},
	)

	// Join a sibling collection by _id; the joined array is named after the
	// collection itself (all three lookups share this shape).
	lookup := func(collection string) bson.D {
		return bson.D{{Key: "$lookup", Value: bson.D{
			{Key: "from", Value: collection},
			{Key: "localField", Value: "_id"},
			{Key: "foreignField", Value: "_id"},
			{Key: "as", Value: collection},
		}}}
	}
	stages = append(stages, lookup("vaas"), lookup("transferPrices"), lookup("globalTransactions"))

	// Flatten the joined arrays into the DTO's scalar fields.
	stages = append(stages, bson.D{{Key: "$addFields", Value: bson.D{
		{Key: "payload", Value: "$parsedPayload"},
		{Key: "vaa", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$vaas", 0}}}},
		{Key: "symbol", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$transferPrices.symbol", 0}}}},
		{Key: "usdAmount", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$transferPrices.usdAmount", 0}}}},
		{Key: "tokenAmount", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$transferPrices.tokenAmount", 0}}}},
		{Key: "originTx", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$globalTransactions.originTx", 0}}}},
		{Key: "destinationTx", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$globalTransactions.destinationTx", 0}}}},
	}}})

	// Drop the raw transferPrices join from the output documents.
	stages = append(stages, bson.D{{Key: "$unset", Value: bson.A{"transferPrices"}}})

	cur, err := r.collections.parsedVaa.Aggregate(ctx, stages)
	if err != nil {
		r.logger.Error("failed execute aggregation pipeline", zap.Error(err))
		return nil, err
	}

	var operations []*OperationDto
	if err := cur.All(ctx, &operations); err != nil {
		r.logger.Error("failed to decode cursor", zap.Error(err))
		return nil, err
	}
	return operations, nil
}
// FindAll returns all operations filtered by q.
func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*OperationDto, error) {
@ -198,7 +309,6 @@ func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*Oper
if err != nil {
return nil, err
}
if len(ids) == 0 {
return []*OperationDto{}, nil
}

View File

@ -32,9 +32,13 @@ func (s *Service) FindById(ctx context.Context, chainID vaa.ChainID,
}
type OperationFilter struct {
TxHash *types.TxHash
Address string
Pagination pagination.Pagination
TxHash *types.TxHash
Address string
SourceChainID *vaa.ChainID
TargetChainID *vaa.ChainID
AppID string
ExclusiveAppId bool
Pagination pagination.Pagination
}
// FindAll returns all operations filtered by q.
@ -45,9 +49,17 @@ func (s *Service) FindAll(ctx context.Context, filter OperationFilter) ([]*Opera
}
operationQuery := OperationQuery{
TxHash: txHash,
Address: filter.Address,
Pagination: filter.Pagination,
TxHash: txHash,
Address: filter.Address,
Pagination: filter.Pagination,
SourceChainID: filter.SourceChainID,
TargetChainID: filter.TargetChainID,
AppID: filter.AppID,
ExclusiveAppId: filter.ExclusiveAppId,
}
if operationQuery.AppID != "" || operationQuery.SourceChainID != nil || operationQuery.TargetChainID != nil {
return s.repo.FindByChainAndAppId(ctx, operationQuery)
}
operations, err := s.repo.FindAll(ctx, operationQuery)

View File

@ -61,6 +61,40 @@ func ExtractToChain(c *fiber.Ctx, l *zap.Logger) (*sdk.ChainID, error) {
return &result, nil
}
// ExtractChain reads the optional "chain" query parameter as a chain ID.
func ExtractChain(c *fiber.Ctx, l *zap.Logger) (*sdk.ChainID, error) {
return extractChainQueryParam(c, l, "chain")
}
// ExtractSourceChain reads the optional "sourceChain" query parameter as a chain ID.
func ExtractSourceChain(c *fiber.Ctx, l *zap.Logger) (*sdk.ChainID, error) {
return extractChainQueryParam(c, l, "sourceChain")
}
// ExtractTargetChain reads the optional "targetChain" query parameter as a chain ID.
func ExtractTargetChain(c *fiber.Ctx, l *zap.Logger) (*sdk.ChainID, error) {
return extractChainQueryParam(c, l, "targetChain")
}
// extractChainQueryParam parses an optional chain ID query parameter.
//
// It returns (nil, nil) when the parameter is absent. On a malformed value
// it logs the failing parameter name and replies with an invalid-param error.
func extractChainQueryParam(c *fiber.Ctx, l *zap.Logger, queryParam string) (*sdk.ChainID, error) {
	param := c.Query(queryParam)
	if param == "" {
		return nil, nil
	}
	// Chain IDs are unsigned 16-bit values; ParseUint rejects negative input
	// that ParseInt would accept and then silently wrap in the conversion to
	// sdk.ChainID below.
	chain, err := strconv.ParseUint(param, 10, 16)
	if err != nil {
		requestID := fmt.Sprintf("%v", c.Locals("requestid"))
		// Log the actual parameter name; the previous message hard-coded
		// "toChain" even for sourceChain/targetChain/chain.
		l.Error("failed to parse chain query parameter",
			zap.Error(err),
			zap.String("queryParam", queryParam),
			zap.String("requestID", requestID),
		)
		return nil, response.NewInvalidParamError(c, "INVALID CHAIN VALUE", errors.WithStack(err))
	}
	result := sdk.ChainID(chain)
	return &result, nil
}
// ExtractEmitterAddr parses the emitter address from the request path.
//
// When the parameter `chainIdHint` is not nil, this function will attempt to parse the
@ -257,6 +291,14 @@ func ExtractAppId(c *fiber.Ctx, l *zap.Logger) string {
return c.Query("appId")
}
// ExtractExclusiveAppId reads the optional "exclusiveAppId" boolean query
// parameter. An absent parameter defaults to false.
func ExtractExclusiveAppId(c *fiber.Ctx) (bool, error) {
	query := c.Query("exclusiveAppId")
	if query == "" {
		return false, nil
	}
	exclusive, err := strconv.ParseBool(query)
	if err != nil {
		// Reply with an invalid-param (400) response like the other query
		// parameter extractors in this file, instead of surfacing the raw
		// strconv error to the handler (which would produce a 500).
		return false, response.NewInvalidParamError(c, "INVALID EXCLUSIVEAPPID VALUE", errors.WithStack(err))
	}
	return exclusive, nil
}
func ExtractTimeSpan(c *fiber.Ctx, l *zap.Logger) (string, error) {
// get the timeSpan from query params
timeSpanStr := c.Query("timeSpan", "1d")

View File

@ -32,6 +32,10 @@ func NewController(operationService *operations.Service, logger *zap.Logger) *Co
// @Param txHash query string false "hash of the transaction"
// @Param page query integer false "page number"
// @Param pageSize query integer false "pageSize". Maximum value is 100.
// @Param sourceChain query string false "source chain of the operation".
// @Param targetChain query string false "target chain of the operation".
// @Param appId query string false "appID of the operation".
// @Param exclusiveAppId query boolean false "single appId of the operation".
// @Success 200 {object} []OperationResponse
// @Failure 400
// @Failure 500
@ -54,28 +58,55 @@ func (c *Controller) FindAll(ctx *fiber.Ctx) error {
return err
}
// Check if address and txHash query param are used together
if address != "" && txHash != nil {
if txHash.String() != "" {
return response.NewInvalidParamError(ctx, "address and txHash query param cannot be used together", nil)
}
searchByAddress := address != ""
searchByTxHash := txHash != nil && txHash.String() != ""
if searchByAddress && searchByTxHash {
return response.NewInvalidParamError(ctx, "address and txHash cannot be used at the same time", nil)
}
sourceChain, err := middleware.ExtractSourceChain(ctx, c.logger)
if err != nil {
return err
}
targetChain, err := middleware.ExtractTargetChain(ctx, c.logger)
if err != nil {
return err
}
appID := middleware.ExtractAppId(ctx, c.logger)
exclusiveAppId, err := middleware.ExtractExclusiveAppId(ctx)
if err != nil {
return err
}
searchBySourceTargetChain := sourceChain != nil || targetChain != nil
searchByAppId := appID != ""
if (searchByAddress || searchByTxHash) && (searchBySourceTargetChain || searchByAppId) {
return response.NewInvalidParamError(ctx, "address/txHash cannot be combined with sourceChain/targetChain/appId query filter", nil)
}
filter := operations.OperationFilter{
TxHash: txHash,
Address: address,
Pagination: *pagination,
TxHash: txHash,
Address: address,
SourceChainID: sourceChain,
TargetChainID: targetChain,
AppID: appID,
ExclusiveAppId: exclusiveAppId,
Pagination: *pagination,
}
// Find operations by q search param.
operations, err := c.srv.FindAll(ctx.Context(), filter)
ops, err := c.srv.FindAll(ctx.Context(), filter)
if err != nil {
return err
}
// build response
response := toListOperationResponse(operations, c.logger)
return ctx.JSON(response)
resp := toListOperationResponse(ops, c.logger)
return ctx.JSON(resp)
}
// FindById godoc

View File

@ -3,7 +3,6 @@ package migration
import (
"context"
"errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
@ -205,6 +204,43 @@ func Run(db *mongo.Database) error {
return err
}
// create index in parsedVaa collection by rawStandardizedProperties.appIds.
indexParsedVaaRawByAppIds := mongo.IndexModel{
Keys: bson.D{{Key: "rawStandardizedProperties.appIds", Value: 1},
{Key: "timestamp", Value: -1},
{Key: "_id", Value: -1},
}}
_, err = db.Collection("parsedVaa").Indexes().CreateOne(context.TODO(), indexParsedVaaRawByAppIds)
if err != nil && isNotAlreadyExistsError(err) {
return err
}
// create index for querying by fromChain
compoundIndexParsedVaaByFromToChain := mongo.IndexModel{
Keys: bson.D{
{Key: "rawStandardizedProperties.fromChain", Value: -1},
{Key: "timestamp", Value: -1},
{Key: "_id", Value: -1},
}}
_, err = db.Collection("parsedVaa").Indexes().CreateOne(context.TODO(), compoundIndexParsedVaaByFromToChain)
if err != nil && isNotAlreadyExistsError(err) {
return err
}
// create index for querying by toChain
indexParsedVaaByToChain := mongo.IndexModel{
Keys: bson.D{
{Key: "rawStandardizedProperties.toChain", Value: 1},
{Key: "timestamp", Value: -1},
{Key: "_id", Value: -1},
},
}
_, err = db.Collection("parsedVaa").Indexes().CreateOne(context.TODO(), indexParsedVaaByToChain)
if err != nil && isNotAlreadyExistsError(err) {
return err
}
return nil
}

View File

@ -18,5 +18,5 @@ type ParsedVaaUpdate struct {
RawStandardizedProperties vaaPayloadParser.StandardizedProperties `bson:"rawStandardizedProperties" json:"rawStandardizedProperties"`
StandardizedProperties vaaPayloadParser.StandardizedProperties `bson:"standardizedProperties" json:"standardizedProperties"`
UpdatedAt *time.Time `bson:"updatedAt" json:"updatedAt"`
Timestamp time.Time `bson:"-" json:"-"`
Timestamp time.Time `bson:"timestamp" json:"timestamp"`
}