This commit is contained in:
Mariano 2024-04-28 03:01:36 +00:00 committed by GitHub
commit 950ba5cbd2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 380 additions and 90 deletions

View File

@ -2741,17 +2741,17 @@ const docTemplate = `{
"type": "string"
},
"sourceChain": {
"$ref": "#/definitions/operations.SourceChain"
"$ref": "#/definitions/operations.SourceChains"
},
"targetChain": {
"$ref": "#/definitions/operations.TargetChain"
"$ref": "#/definitions/operations.TargetChains"
},
"vaa": {
"$ref": "#/definitions/operations.Vaa"
}
}
},
"operations.SourceChain": {
"operations.SourceChains": {
"type": "object",
"properties": {
"attribute": {
@ -2815,7 +2815,7 @@ const docTemplate = `{
}
}
},
"operations.TargetChain": {
"operations.TargetChains": {
"type": "object",
"properties": {
"chainId": {

View File

@ -111,53 +111,46 @@ type OperationQuery struct {
Pagination pagination.Pagination
TxHash string
Address string
SourceChainID *vaa.ChainID
TargetChainID *vaa.ChainID
AppID string
SourceChainIDs []vaa.ChainID
TargetChainIDs []vaa.ChainID
AppIDs []string
ExclusiveAppId bool
}
func buildQueryOperationsByChain(sourceChainID, targetChainID *vaa.ChainID) bson.D {
func buildQueryOperationsByChain(sourceChainIDs, targetChainIDs []vaa.ChainID) bson.D {
var allMatch bson.A
if sourceChainID != nil {
matchSourceChain := bson.M{"rawStandardizedProperties.fromChain": *sourceChainID}
if len(sourceChainIDs) > 0 {
matchSourceChain := bson.M{"rawStandardizedProperties.fromChain": bson.M{"$in": sourceChainIDs}}
allMatch = append(allMatch, matchSourceChain)
}
if targetChainID != nil {
matchTargetChain := bson.M{"rawStandardizedProperties.toChain": *targetChainID}
if len(targetChainIDs) > 0 {
matchTargetChain := bson.M{"rawStandardizedProperties.toChain": bson.M{"$in": targetChainIDs}}
allMatch = append(allMatch, matchTargetChain)
}
if (sourceChainID != nil && targetChainID != nil) && (*sourceChainID == *targetChainID) {
if (len(sourceChainIDs) == 1 && len(targetChainIDs) == 1) && (sourceChainIDs[0] == targetChainIDs[0]) {
return bson.D{{Key: "$match", Value: bson.M{"$or": allMatch}}}
}
return bson.D{{Key: "$match", Value: bson.M{"$and": allMatch}}}
}
func buildQueryOperationsByAppID(appID string, exclusive bool) []bson.D {
var result []bson.D
if appID == "" {
result = append(result, bson.D{{Key: "$match", Value: bson.M{}}})
return result
func buildQueryOperationsByAppID(appIDs []string, exclusive bool) bson.D {
if !exclusive {
return bson.D{{Key: "$match", Value: bson.M{"rawStandardizedProperties.appIds": bson.M{"$in": appIDs}}}}
}
if exclusive {
result = append(result, bson.D{{Key: "$match", Value: bson.M{
"$and": bson.A{
bson.M{"rawStandardizedProperties.appIds": bson.M{"$eq": []string{appID}}},
bson.M{"rawStandardizedProperties.appIds": bson.M{"$size": 1}},
}}}})
return result
} else {
result = append(result, bson.D{{Key: "$match", Value: bson.M{"rawStandardizedProperties.appIds": bson.M{"$in": []string{appID}}}}})
matchAppID := bson.A{}
for _, appID := range appIDs {
cond := bson.M{"$and": bson.A{
bson.M{"rawStandardizedProperties.appIds": bson.M{"$eq": appID}},
bson.M{"rawStandardizedProperties.appIds": bson.M{"$size": 1}},
}}
matchAppID = append(matchAppID, cond)
}
return result
return bson.D{{Key: "$match", Value: bson.M{"$or": matchAppID}}}
}
// findOperationsIdByAddress returns all operations filtered by address.
@ -236,16 +229,36 @@ func (r *Repository) matchOperationByTxHash(ctx context.Context, txHash string)
func (r *Repository) FindByChainAndAppId(ctx context.Context, query OperationQuery) ([]*OperationDto, error) {
pipeline := BuildPipelineSearchByChainAndAppID(query)
cur, err := r.collections.parsedVaa.Aggregate(ctx, pipeline)
if err != nil {
r.logger.Error("failed execute aggregation pipeline", zap.Error(err))
return nil, err
}
// Read results from cursor
var operations []*OperationDto
err = cur.All(ctx, &operations)
if err != nil {
r.logger.Error("failed to decode cursor", zap.Error(err))
return nil, err
}
return operations, nil
}
func BuildPipelineSearchByChainAndAppID(query OperationQuery) mongo.Pipeline {
var pipeline mongo.Pipeline
if query.SourceChainID != nil || query.TargetChainID != nil {
matchBySourceTargetChain := buildQueryOperationsByChain(query.SourceChainID, query.TargetChainID)
if len(query.SourceChainIDs) > 0 || len(query.TargetChainIDs) > 0 {
matchBySourceTargetChain := buildQueryOperationsByChain(query.SourceChainIDs, query.TargetChainIDs)
pipeline = append(pipeline, matchBySourceTargetChain)
}
if len(query.AppID) > 0 {
matchByAppId := buildQueryOperationsByAppID(query.AppID, query.ExclusiveAppId)
pipeline = append(pipeline, matchByAppId...)
if len(query.AppIDs) > 0 {
matchByAppId := buildQueryOperationsByAppID(query.AppIDs, query.ExclusiveAppId)
pipeline = append(pipeline, matchByAppId)
}
pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{
@ -279,22 +292,7 @@ func (r *Repository) FindByChainAndAppId(ctx context.Context, query OperationQue
// unset
pipeline = append(pipeline, bson.D{{Key: "$unset", Value: bson.A{"transferPrices"}}})
cur, err := r.collections.parsedVaa.Aggregate(ctx, pipeline)
if err != nil {
r.logger.Error("failed execute aggregation pipeline", zap.Error(err))
return nil, err
}
// Read results from cursor
var operations []*OperationDto
err = cur.All(ctx, &operations)
if err != nil {
r.logger.Error("failed to decode cursor", zap.Error(err))
return nil, err
}
return operations, nil
return pipeline
}
// FindAll returns all operations filtered by q.

View File

@ -0,0 +1,235 @@
package operations_test
import (
"github.com/stretchr/testify/assert"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/operations"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"testing"
)
var sortStage = bson.D{{"$sort", bson.D{{"timestamp", -1}, {"_id", -1}}}}
var skipStage = bson.D{{"$skip", int64(0)}}
var limitStage = bson.D{{"$limit", int64(0)}}
var lookupVaasStage = bson.D{
{"$lookup", bson.D{
{"from", "vaas"},
{"localField", "_id"},
{"foreignField", "_id"},
{"as", "vaas"}}}}
var lookupTransferPricesStage = bson.D{{"$lookup", bson.D{
{"from", "transferPrices"},
{"localField", "_id"},
{"foreignField", "_id"},
{"as", "transferPrices"},
}}}
var lookupGlobalTransactionsStage = bson.D{{"$lookup", bson.D{
{"from", "globalTransactions"},
{"localField", "_id"},
{"foreignField", "_id"},
{"as", "globalTransactions"},
}}}
var addFieldsStage = bson.D{{"$addFields", bson.D{
{"payload", "$parsedPayload"},
{"vaa", bson.D{{"$arrayElemAt", bson.A{"$vaas", 0}}}},
{"symbol", bson.D{{"$arrayElemAt", bson.A{"$transferPrices.symbol", 0}}}},
{"usdAmount", bson.D{{"$arrayElemAt", bson.A{"$transferPrices.usdAmount", 0}}}},
{"tokenAmount", bson.D{{"$arrayElemAt", bson.A{"$transferPrices.tokenAmount", 0}}}},
{"originTx", bson.D{{"$arrayElemAt", bson.A{"$globalTransactions.originTx", 0}}}},
{"destinationTx", bson.D{{"$arrayElemAt", bson.A{"$globalTransactions.destinationTx", 0}}}},
}}}
var unSetStage = bson.D{{"$unset", bson.A{"transferPrices"}}}
func TestPipeline_FindByChainAndAppId(t *testing.T) {
cases := []struct {
name string
query operations.OperationQuery
expected mongo.Pipeline
}{
{
name: "Search with no query filters",
query: operations.OperationQuery{},
expected: mongo.Pipeline{
sortStage,
skipStage,
limitStage,
lookupVaasStage,
lookupTransferPricesStage,
lookupGlobalTransactionsStage,
addFieldsStage,
unSetStage,
},
},
{
name: "Search with single source_chain ",
query: operations.OperationQuery{
SourceChainIDs: []sdk.ChainID{1},
},
expected: mongo.Pipeline{
bson.D{{"$match", bson.M{"$and": bson.A{
bson.M{"rawStandardizedProperties.fromChain": bson.M{"$in": []sdk.ChainID{1}}},
}}}},
sortStage,
skipStage,
limitStage,
lookupVaasStage,
lookupTransferPricesStage,
lookupGlobalTransactionsStage,
addFieldsStage,
unSetStage,
},
},
{
name: "Search with multiple source_chain ",
query: operations.OperationQuery{
SourceChainIDs: []sdk.ChainID{1, 2},
},
expected: mongo.Pipeline{
bson.D{{"$match", bson.M{"$and": bson.A{
bson.M{"rawStandardizedProperties.fromChain": bson.M{"$in": []sdk.ChainID{1, 2}}},
}}}},
sortStage,
skipStage,
limitStage,
lookupVaasStage,
lookupTransferPricesStage,
lookupGlobalTransactionsStage,
addFieldsStage,
unSetStage,
},
},
{
name: "Search with single target_chain ",
query: operations.OperationQuery{
TargetChainIDs: []sdk.ChainID{1},
},
expected: mongo.Pipeline{
bson.D{{"$match", bson.M{"$and": bson.A{
bson.M{"rawStandardizedProperties.toChain": bson.M{"$in": []sdk.ChainID{1}}},
}}}},
sortStage,
skipStage,
limitStage,
lookupVaasStage,
lookupTransferPricesStage,
lookupGlobalTransactionsStage,
addFieldsStage,
unSetStage,
},
},
{
name: "Search with single target_chain ",
query: operations.OperationQuery{
TargetChainIDs: []sdk.ChainID{1, 2},
},
expected: mongo.Pipeline{
bson.D{{"$match", bson.M{"$and": bson.A{
bson.M{"rawStandardizedProperties.toChain": bson.M{"$in": []sdk.ChainID{1, 2}}},
}}}},
sortStage,
skipStage,
limitStage,
lookupVaasStage,
lookupTransferPricesStage,
lookupGlobalTransactionsStage,
addFieldsStage,
unSetStage,
},
},
{
name: "Search with same source and target chain",
query: operations.OperationQuery{
SourceChainIDs: []sdk.ChainID{1},
TargetChainIDs: []sdk.ChainID{1},
},
expected: mongo.Pipeline{
bson.D{{"$match", bson.M{"$or": bson.A{
bson.M{"rawStandardizedProperties.fromChain": bson.M{"$in": []sdk.ChainID{1}}},
bson.M{"rawStandardizedProperties.toChain": bson.M{"$in": []sdk.ChainID{1}}},
}}}},
sortStage,
skipStage,
limitStage,
lookupVaasStage,
lookupTransferPricesStage,
lookupGlobalTransactionsStage,
addFieldsStage,
unSetStage,
},
},
{
name: "Search with different source and target chain",
query: operations.OperationQuery{
SourceChainIDs: []sdk.ChainID{1},
TargetChainIDs: []sdk.ChainID{2},
},
expected: mongo.Pipeline{
bson.D{{"$match", bson.M{"$and": bson.A{
bson.M{"rawStandardizedProperties.fromChain": bson.M{"$in": []sdk.ChainID{1}}},
bson.M{"rawStandardizedProperties.toChain": bson.M{"$in": []sdk.ChainID{2}}},
}}}},
sortStage,
skipStage,
limitStage,
lookupVaasStage,
lookupTransferPricesStage,
lookupGlobalTransactionsStage,
addFieldsStage,
unSetStage,
},
},
{
name: "Search by appID exclusive",
query: operations.OperationQuery{
AppIDs: []string{"CCTP_WORMHOLE_INTEGRATION", "PORTAL_TOKEN_BRIDGE"},
ExclusiveAppId: true,
},
expected: mongo.Pipeline{
bson.D{{"$match", bson.M{"$or": bson.A{
bson.M{"$and": bson.A{
bson.M{"rawStandardizedProperties.appIds": bson.M{"$eq": "CCTP_WORMHOLE_INTEGRATION"}},
bson.M{"rawStandardizedProperties.appIds": bson.M{"$size": 1}},
}},
bson.M{"$and": bson.A{
bson.M{"rawStandardizedProperties.appIds": bson.M{"$eq": "PORTAL_TOKEN_BRIDGE"}},
bson.M{"rawStandardizedProperties.appIds": bson.M{"$size": 1}},
}},
}}}},
sortStage,
skipStage,
limitStage,
lookupVaasStage,
lookupTransferPricesStage,
lookupGlobalTransactionsStage,
addFieldsStage,
unSetStage,
},
},
{
name: "Search by appID exclusive false",
query: operations.OperationQuery{
AppIDs: []string{"CCTP_WORMHOLE_INTEGRATION", "PORTAL_TOKEN_BRIDGE"},
ExclusiveAppId: false,
},
expected: mongo.Pipeline{
bson.D{{"$match", bson.M{"rawStandardizedProperties.appIds": bson.M{"$in": []string{"CCTP_WORMHOLE_INTEGRATION", "PORTAL_TOKEN_BRIDGE"}}}}},
sortStage,
skipStage,
limitStage,
lookupVaasStage,
lookupTransferPricesStage,
lookupGlobalTransactionsStage,
addFieldsStage,
unSetStage,
},
},
}
for _, testCase := range cases {
t.Run(testCase.name, func(t *testing.T) {
result := operations.BuildPipelineSearchByChainAndAppID(testCase.query)
assert.Equal(t, testCase.expected, result, "Expected pipeline did not match actual pipeline")
})
}
}

View File

@ -34,9 +34,9 @@ func (s *Service) FindById(ctx context.Context, chainID vaa.ChainID,
type OperationFilter struct {
TxHash *types.TxHash
Address string
SourceChainID *vaa.ChainID
TargetChainID *vaa.ChainID
AppID string
SourceChainIDs []vaa.ChainID
TargetChainIDs []vaa.ChainID
AppIDs []string
ExclusiveAppId bool
Pagination pagination.Pagination
}
@ -52,13 +52,13 @@ func (s *Service) FindAll(ctx context.Context, filter OperationFilter) ([]*Opera
TxHash: txHash,
Address: filter.Address,
Pagination: filter.Pagination,
SourceChainID: filter.SourceChainID,
TargetChainID: filter.TargetChainID,
AppID: filter.AppID,
SourceChainIDs: filter.SourceChainIDs,
TargetChainIDs: filter.TargetChainIDs,
AppIDs: filter.AppIDs,
ExclusiveAppId: filter.ExclusiveAppId,
}
if operationQuery.AppID != "" || operationQuery.SourceChainID != nil || operationQuery.TargetChainID != nil {
if len(operationQuery.AppIDs) != 0 || len(operationQuery.SourceChainIDs) > 0 || len(operationQuery.TargetChainIDs) > 0 {
return s.repo.FindByChainAndAppId(ctx, operationQuery)
}

View File

@ -215,12 +215,12 @@ type TransactionDto struct {
}
type ChainActivityTopsQuery struct {
SourceChain *sdk.ChainID `json:"source_chain"`
TargetChain *sdk.ChainID `json:"target_chain"`
AppId string `json:"app_id"`
From time.Time `json:"from"`
To time.Time `json:"to"`
Timespan Timespan `json:"timespan"`
SourceChains []sdk.ChainID `json:"source_chain"`
TargetChains []sdk.ChainID `json:"target_chain"`
AppId string `json:"app_id"`
From time.Time `json:"from"`
To time.Time `json:"to"`
Timespan Timespan `json:"timespan"`
}
type Timespan string

View File

@ -1095,13 +1095,23 @@ func (r *Repository) buildChainActivityQueryTops(q ChainActivityTopsQuery) strin
}
filterTargetChain := ""
if q.TargetChain != nil {
filterTargetChain = "|> filter(fn: (r) => r.destination_chain == \"" + strconv.Itoa(int(*q.TargetChain)) + "\")"
if len(q.TargetChains) > 0 {
val := fmt.Sprintf("r.destination_chain == \"%d\"", q.TargetChains[0])
buff := ""
for _, tc := range q.TargetChains[1:] {
buff += fmt.Sprintf("or r.destination_chain == \"%d\" ", tc)
}
filterTargetChain = fmt.Sprintf("|> filter(fn: (r) => %s %s)", val, buff)
}
filterSourceChain := ""
if q.SourceChain != nil {
filterSourceChain = "|> filter(fn: (r) => r.emitter_chain == \"" + strconv.Itoa(int(*q.SourceChain)) + "\")"
if len(q.SourceChains) > 0 {
val := fmt.Sprintf("r.emitter_chain == \"%d\"", q.SourceChains[0])
buff := ""
for _, tc := range q.SourceChains[1:] {
buff += fmt.Sprintf("or r.emitter_chain == \"%d\" ", tc)
}
filterSourceChain = fmt.Sprintf("|> filter(fn: (r) => %s %s)", val, buff)
}
filterAppId := ""
@ -1109,7 +1119,7 @@ func (r *Repository) buildChainActivityQueryTops(q ChainActivityTopsQuery) strin
filterAppId = "|> filter(fn: (r) => r.app_id == \"" + q.AppId + "\")"
}
if q.TargetChain == nil && q.AppId == "" {
if len(q.TargetChains) == 0 && q.AppId == "" {
return r.buildQueryChainActivityTopsByEmitter(q, start, stop, filterSourceChain)
}

View File

@ -61,12 +61,54 @@ func ExtractToChain(c *fiber.Ctx, l *zap.Logger) (*sdk.ChainID, error) {
return &result, nil
}
func ExtractSourceChain(c *fiber.Ctx, l *zap.Logger) (*sdk.ChainID, error) {
return extractChainQueryParam(c, l, "sourceChain")
func ExtractSourceChain(c *fiber.Ctx, l *zap.Logger) ([]sdk.ChainID, error) {
param := c.Query("sourceChain")
if param == "" {
return nil, nil
}
result := make([]sdk.ChainID, 0, len(param))
for _, val := range strings.Split(param, ",") {
chain, err := parseChainIDParam(val)
if err != nil {
requestID := fmt.Sprintf("%v", c.Locals("requestid"))
l.Error("failed to parse sourceChain parameter",
zap.Error(err),
zap.String("requestID", requestID),
)
return nil, response.NewInvalidParamError(c, "INVALID SOURCE_CHAIN VALUE", errors.WithStack(err))
}
result = append(result, chain)
}
return result, nil
}
func ExtractTargetChain(c *fiber.Ctx, l *zap.Logger) (*sdk.ChainID, error) {
return extractChainQueryParam(c, l, "targetChain")
func ExtractTargetChain(c *fiber.Ctx, l *zap.Logger) ([]sdk.ChainID, error) {
param := c.Query("targetChain")
if param == "" {
return nil, nil
}
result := make([]sdk.ChainID, 0, len(param))
for _, val := range strings.Split(param, ",") {
chain, err := parseChainIDParam(val)
if err != nil {
requestID := fmt.Sprintf("%v", c.Locals("requestid"))
l.Error("failed to parse targetChain parameter",
zap.Error(err),
zap.String("requestID", requestID),
)
return nil, response.NewInvalidParamError(c, "INVALID TARGET_CHAIN VALUE", errors.WithStack(err))
}
result = append(result, chain)
}
return result, nil
}
func parseChainIDParam(param string) (sdk.ChainID, error) {
chain, err := strconv.ParseInt(param, 10, 16)
if err != nil {
return sdk.ChainIDUnset, err
}
return sdk.ChainID(chain), nil
}
func extractChainQueryParam(c *fiber.Ctx, l *zap.Logger, queryParam string) (*sdk.ChainID, error) {

View File

@ -1,13 +1,13 @@
package operations
import (
"strconv"
"github.com/gofiber/fiber/v2"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/operations"
"github.com/wormhole-foundation/wormhole-explorer/api/middleware"
"github.com/wormhole-foundation/wormhole-explorer/api/response"
"go.uber.org/zap"
"strconv"
"strings"
)
// Controller is the controller for the operation resource.
@ -75,14 +75,19 @@ func (c *Controller) FindAll(ctx *fiber.Ctx) error {
return err
}
appID := middleware.ExtractAppId(ctx, c.logger)
var appIDs []string
appIDQueryParam := ctx.Query("appId")
if appIDQueryParam != "" {
appIDs = strings.Split(appIDQueryParam, ",")
}
exclusiveAppId, err := middleware.ExtractExclusiveAppId(ctx)
if err != nil {
return err
}
searchBySourceTargetChain := sourceChain != nil || targetChain != nil
searchByAppId := appID != ""
searchBySourceTargetChain := len(sourceChain) != 0 || targetChain != nil
searchByAppId := len(appIDs) != 0
if (searchByAddress || searchByTxHash) && (searchBySourceTargetChain || searchByAppId) {
return response.NewInvalidParamError(ctx, "address/txHash cannot be combined with sourceChain/targetChain/appId query filter", nil)
@ -91,9 +96,9 @@ func (c *Controller) FindAll(ctx *fiber.Ctx) error {
filter := operations.OperationFilter{
TxHash: txHash,
Address: address,
SourceChainID: sourceChain,
TargetChainID: targetChain,
AppID: appID,
SourceChainIDs: sourceChain,
TargetChainIDs: targetChain,
AppIDs: appIDs,
ExclusiveAppId: exclusiveAppId,
Pagination: *pagination,
}

View File

@ -200,11 +200,11 @@ func (c *Controller) GetTopAssets(ctx *fiber.Ctx) error {
// @Router /api/v1/x-chain-activity/tops [get]
func (c *Controller) GetChainActivityTops(ctx *fiber.Ctx) error {
sourceChain, err := middleware.ExtractSourceChain(ctx, c.logger)
sourceChains, err := middleware.ExtractSourceChain(ctx, c.logger)
if err != nil {
return err
}
targetChain, err := middleware.ExtractTargetChain(ctx, c.logger)
targetChains, err := middleware.ExtractTargetChain(ctx, c.logger)
if err != nil {
return err
}
@ -221,12 +221,12 @@ func (c *Controller) GetChainActivityTops(ctx *fiber.Ctx) error {
}
payload := transactions.ChainActivityTopsQuery{
SourceChain: sourceChain,
TargetChain: targetChain,
From: *from,
To: *to,
AppId: middleware.ExtractAppId(ctx, c.logger),
Timespan: transactions.Timespan(ctx.Query("timespan")),
SourceChains: sourceChains,
TargetChains: targetChains,
From: *from,
To: *to,
AppId: middleware.ExtractAppId(ctx, c.logger),
Timespan: transactions.Timespan(ctx.Query("timespan")),
}
if !payload.Timespan.IsValid() {