diff --git a/api/handlers/operations/repository.go b/api/handlers/operations/repository.go index bb9be2de..6312e808 100644 --- a/api/handlers/operations/repository.go +++ b/api/handlers/operations/repository.go @@ -122,31 +122,19 @@ func buildQueryOperationsByChain(sourceChainID, targetChainID *vaa.ChainID) bson var allMatch bson.A if sourceChainID != nil { - matchByEmitterChain := bson.M{"emitterChain": *sourceChainID} - matchByRawStandardizedProperties := bson.M{"rawStandardizedProperties.fromChain": *sourceChainID} - matchByStandardizedProperties := bson.M{"standardizedProperties.fromChain": *sourceChainID} - - matchSourceChain := bson.D{ - {Key: "$or", Value: bson.A{ - bson.D{{Key: "vaas", Value: bson.M{"$elemMatch": matchByEmitterChain}}}, - bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": matchByRawStandardizedProperties}}}, - bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": matchByStandardizedProperties}}}, - }}, - } + matchSourceChain := bson.M{"rawStandardizedProperties.fromChain": *sourceChainID} allMatch = append(allMatch, matchSourceChain) } if targetChainID != nil { - matchTargetChain := bson.D{ - {Key: "$or", Value: bson.A{ - bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"targetChain": *targetChainID}}}}, - bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"rawStandardizedProperties.toChain": *targetChainID}}}}, - bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"standardizedProperties.toChain": *targetChainID}}}}, - }}, - } + matchTargetChain := bson.M{"rawStandardizedProperties.toChain": *targetChainID} allMatch = append(allMatch, matchTargetChain) } + if (sourceChainID != nil && targetChainID != nil) && (*sourceChainID == *targetChainID) { + return bson.D{{Key: "$match", Value: bson.M{"$or": allMatch}}} + } + return bson.D{{Key: "$match", Value: bson.M{"$and": allMatch}}} } @@ -163,130 +151,11 @@ func buildQueryOperationsByAppID(appID string, exclusive bool) bson.D { appIdsCondition = bson.M{"$in": []string{appID}} } - matchParsedVaa := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{ - bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"appIds": appIdsCondition}}}}, - bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"rawStandardizedProperties.appIds": appIdsCondition}}}}, - bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"standardizedProperties.appIds": appIdsCondition}}}}, - }}}}} + matchParsedVaa := bson.D{{Key: "$match", Value: bson.M{"rawStandardizedProperties.appIds": appIdsCondition}}} return matchParsedVaa } -// strict flag is to force that both source and target chain id must match -func findOperationsIdByChain(ctx context.Context, db *mongo.Database, sourceChainID, targetChainID *vaa.ChainID, strict bool) ([]string, error) { - - var allMatch bson.A - - if sourceChainID != nil { - matchSourceChain := bson.D{{Key: "$or", Value: bson.A{ - bson.D{{Key: "rawStandardizedProperties.fromChain", Value: bson.M{"$eq": sourceChainID}}}, - bson.D{{Key: "standardizedProperties.fromChain", Value: bson.M{"$eq": sourceChainID}}}, - }}} - allMatch = append(allMatch, matchSourceChain) - } - if targetChainID != nil { - matchTargetChain := bson.D{{Key: "$or", Value: bson.A{ - bson.D{{Key: "parsedPayload.toChain", Value: bson.M{"$eq": targetChainID}}}, - bson.D{{Key: "parsedPayload.targetChainId", Value: bson.M{"$eq": targetChainID}}}, - bson.D{{Key: "standardizedProperties.toChain", Value: bson.M{"$eq": targetChainID}}}, - bson.D{{Key: "rawStandardizedProperties.toChain", Value: bson.M{"$eq": targetChainID}}}, - }}} - allMatch = append(allMatch, matchTargetChain) - } - - var matchParsedVaa bson.D - if strict { - matchParsedVaa = bson.D{{Key: "$match", Value: bson.D{{Key: "$and", Value: allMatch}}}} - } else { - matchParsedVaa = bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: allMatch}}}} - } - - cur, err := db.Collection("parsedVaa").Aggregate(ctx, mongo.Pipeline{matchParsedVaa}) - if err != nil { - return nil, err - } - var documents []mongoID - err = cur.All(ctx, &documents) - if err != nil { - return nil, err - } - var ids []string - for _, doc := range documents { - ids = append(ids, doc.Id) - } - return ids, nil -} - -/* -func findOperationsIdByChain(ctx context.Context, db *mongo.Database, chainId vaa.ChainID) ([]string, error) { - - matchGlobalTransactions := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{ - bson.D{{Key: "destinationTx.chainId", Value: bson.M{"$eq": chainId}}}}, - }}}} - - matchParsedVaa := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{ - bson.D{{Key: "parsedPayload.targetChainId", Value: bson.M{"$eq": chainId}}}, - bson.D{{Key: "rawStandardizedProperties.fromChain", Value: bson.M{"$eq": chainId}}}, - bson.D{{Key: "rawStandardizedProperties.toChain", Value: bson.M{"$eq": chainId}}}, - bson.D{{Key: "standardizedProperties.fromChain", Value: bson.M{"$eq": chainId}}}, - bson.D{{Key: "standardizedProperties.toChain", Value: bson.M{"$eq": chainId}}}, - }}}}} - - globalTransactionFilter := bson.D{{Key: "$unionWith", Value: bson.D{{Key: "coll", Value: "globalTransactions"}, {Key: "pipeline", Value: bson.A{matchGlobalTransactions}}}}} - parserFilter := bson.D{{Key: "$unionWith", Value: bson.D{{Key: "coll", Value: "parsedVaa"}, {Key: "pipeline", Value: bson.A{matchParsedVaa}}}}} - group := bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: "$_id"}}}} - pipeline := []bson.D{globalTransactionFilter, parserFilter, group} - - cur, err := db.Collection("_operationsTemporal").Aggregate(ctx, pipeline) - if err != nil { - return nil, err - } - var documents []mongoID - err = cur.All(ctx, &documents) - if err != nil { - return nil, err - } - var ids []string - for _, doc := range documents { - ids = append(ids, doc.Id) - } - return ids, nil -} - - -func findOperationsIdByAppID(ctx context.Context, db *mongo.Database, appID string, exclusive bool) ([]string, error) { - - var appIdsCondition interface{} - if exclusive { - appIdsCondition = bson.M{"$eq": []string{appID}} - } else { - appIdsCondition = bson.M{"$in": []string{appID}} - } - - matchParsedVaa := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{ - bson.D{{Key: "appIds", Value: appIdsCondition}}, - bson.D{{Key: "rawStandardizedProperties.appIds", Value: appIdsCondition}}, - bson.D{{Key: "standardizedProperties.appIds", Value: appIdsCondition}}, - }}}}} - - query := mongo.Pipeline{matchParsedVaa} - cur, err := db.Collection("parsedVaa").Aggregate(ctx, query) - if err != nil { - return nil, err - } - var documents []mongoID - err = cur.All(ctx, &documents) - if err != nil { - return nil, err - } - var ids []string - for _, doc := range documents { - ids = append(ids, doc.Id) - } - return ids, nil -} -*/ - // findOperationsIdByAddress returns all operations filtered by address. func findOperationsIdByAddress(ctx context.Context, db *mongo.Database, address string, pagination *pagination.Pagination) ([]string, error) { addressHex := strings.ToLower(address) @@ -361,8 +230,7 @@ func (r *Repository) matchOperationByTxHash(ctx context.Context, txHash string) }}}}} } -/* -func (r *Repository) findOpsIdByChainAndAppId(ctx context.Context, query OperationQuery) ([]string, error) { +func (r *Repository) FindByChainAndAppId(ctx context.Context, query OperationQuery) ([]*OperationDto, error) { var pipeline mongo.Pipeline @@ -370,27 +238,63 @@ func (r *Repository) findOpsIdByChainAndAppId(ctx context.Context, query Operati matchBySourceTargetChain := buildQueryOperationsByChain(query.SourceChainID, query.TargetChainID) pipeline = append(pipeline, matchBySourceTargetChain) } + if len(query.AppID) > 0 { matchByAppId := buildQueryOperationsByAppID(query.AppID, query.ExclusiveAppId) pipeline = append(pipeline, matchByAppId) } - cur, err := r.db.Collection("parsedVaa").Aggregate(ctx, pipeline) + pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{ + bson.E{Key: "updatedAt", Value: query.Pagination.GetSortInt()}, + bson.E{Key: "_id", Value: -1}, + }}}) + + // Skip initial results + pipeline = append(pipeline, bson.D{{Key: "$skip", Value: query.Pagination.Skip}}) + + // Limit size of results + pipeline = append(pipeline, bson.D{{Key: "$limit", Value: query.Pagination.Limit}}) + + pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "vaas"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "vaas"}}}}) + + // lookup globalTransactions + pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "globalTransactions"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "globalTransactions"}}}}) + + // lookup transferPrices + pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "transferPrices"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "transferPrices"}}}}) + + // lookup parsedVaa + pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "parsedVaa"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "parsedVaa"}}}}) + + // add fields + pipeline = append(pipeline, bson.D{{Key: "$addFields", Value: bson.D{ + {Key: "payload", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$parsedVaa.parsedPayload", 0}}}}, + {Key: "vaa", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$vaas", 0}}}}, + {Key: "standardizedProperties", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$parsedVaa.standardizedProperties", 0}}}}, + {Key: "symbol", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$transferPrices.symbol", 0}}}}, + {Key: "usdAmount", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$transferPrices.usdAmount", 0}}}}, + {Key: "tokenAmount", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$transferPrices.tokenAmount", 0}}}}, + }}}) + + // unset + pipeline = append(pipeline, bson.D{{Key: "$unset", Value: bson.A{"transferPrices", "parsedVaa"}}}) + + cur, err := r.collections.parsedVaa.Aggregate(ctx, pipeline) if err != nil { + r.logger.Error("failed execute aggregation pipeline", zap.Error(err)) return nil, err } - var documents []mongoID - err = cur.All(ctx, &documents) + + // Read results from cursor + var operations []*OperationDto + err = cur.All(ctx, &operations) if err != nil { + r.logger.Error("failed to decode cursor", zap.Error(err)) return nil, err } - var ids []string - for _, doc := range documents { - ids = append(ids, doc.Id) - } - return ids, nil + + return operations, nil } -*/ // FindAll returns all operations filtered by q. func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*OperationDto, error) { @@ -414,20 +318,18 @@ func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*Oper pipeline = append(pipeline, matchByTxHash) } - /* - // sort - pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{ - bson.E{Key: "originTx.timestamp", Value: query.Pagination.GetSortInt()}, - bson.E{Key: "_id", Value: -1}, - }}}) + // sort + pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{ + bson.E{Key: "originTx.timestamp", Value: query.Pagination.GetSortInt()}, + bson.E{Key: "_id", Value: -1}, + }}}) - // Skip initial results - pipeline = append(pipeline, bson.D{{Key: "$skip", Value: query.Pagination.Skip}}) + // Skip initial results + pipeline = append(pipeline, bson.D{{Key: "$skip", Value: query.Pagination.Skip}}) - // Limit size of results - pipeline = append(pipeline, bson.D{{Key: "$limit", Value: query.Pagination.Limit}}) + // Limit size of results + pipeline = append(pipeline, bson.D{{Key: "$limit", Value: query.Pagination.Limit}}) - */ // lookup vaas pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "vaas"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "vaas"}}}}) @@ -440,27 +342,6 @@ func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*Oper // lookup parsedVaa pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "parsedVaa"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "parsedVaa"}}}}) - if query.SourceChainID != nil || query.TargetChainID != nil { - matchBySourceTargetChain := buildQueryOperationsByChain(query.SourceChainID, query.TargetChainID) - pipeline = append(pipeline, matchBySourceTargetChain) - } - - if len(query.AppID) > 0 { - matchByAppId := buildQueryOperationsByAppID(query.AppID, query.ExclusiveAppId) - pipeline = append(pipeline, matchByAppId) - } - - pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{ - bson.E{Key: "originTx.timestamp", Value: query.Pagination.GetSortInt()}, - bson.E{Key: "_id", Value: -1}, - }}}) - - // Skip initial results - pipeline = append(pipeline, bson.D{{Key: "$skip", Value: query.Pagination.Skip}}) - - // Limit size of results - pipeline = append(pipeline, bson.D{{Key: "$limit", Value: query.Pagination.Limit}}) - // add fields pipeline = append(pipeline, bson.D{{Key: "$addFields", Value: bson.D{ {Key: "payload", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$parsedVaa.parsedPayload", 0}}}}, diff --git a/api/handlers/operations/service.go b/api/handlers/operations/service.go index 43b5bf4d..340a2efa 100644 --- a/api/handlers/operations/service.go +++ b/api/handlers/operations/service.go @@ -58,6 +58,10 @@ func (s *Service) FindAll(ctx context.Context, filter OperationFilter) ([]*Opera ExclusiveAppId: filter.ExclusiveAppId, } + if operationQuery.AppID != "" || operationQuery.SourceChainID != nil || operationQuery.TargetChainID != nil { + return s.repo.FindByChainAndAppId(ctx, operationQuery) + } + operations, err := s.repo.FindAll(ctx, operationQuery) if err != nil { return nil, err