move query to a separate pipeline which starts from parsedVAA

This commit is contained in:
Mariano 2024-03-26 18:25:15 -03:00
parent c0dc5ccc77
commit 015f8f16ad
2 changed files with 66 additions and 181 deletions

View File

@ -122,31 +122,19 @@ func buildQueryOperationsByChain(sourceChainID, targetChainID *vaa.ChainID) bson
var allMatch bson.A
if sourceChainID != nil {
matchByEmitterChain := bson.M{"emitterChain": *sourceChainID}
matchByRawStandardizedProperties := bson.M{"rawStandardizedProperties.fromChain": *sourceChainID}
matchByStandardizedProperties := bson.M{"standardizedProperties.fromChain": *sourceChainID}
matchSourceChain := bson.D{
{Key: "$or", Value: bson.A{
bson.D{{Key: "vaas", Value: bson.M{"$elemMatch": matchByEmitterChain}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": matchByRawStandardizedProperties}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": matchByStandardizedProperties}}},
}},
}
matchSourceChain := bson.M{"rawStandardizedProperties.fromChain": *sourceChainID}
allMatch = append(allMatch, matchSourceChain)
}
if targetChainID != nil {
matchTargetChain := bson.D{
{Key: "$or", Value: bson.A{
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"targetChain": *targetChainID}}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"rawStandardizedProperties.toChain": *targetChainID}}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"standardizedProperties.toChain": *targetChainID}}}},
}},
}
matchTargetChain := bson.M{"rawStandardizedProperties.toChain": *targetChainID}
allMatch = append(allMatch, matchTargetChain)
}
if (sourceChainID != nil && targetChainID != nil) && (*sourceChainID == *targetChainID) {
return bson.D{{Key: "$match", Value: bson.M{"$or": allMatch}}}
}
return bson.D{{Key: "$match", Value: bson.M{"$and": allMatch}}}
}
@ -163,130 +151,11 @@ func buildQueryOperationsByAppID(appID string, exclusive bool) bson.D {
appIdsCondition = bson.M{"$in": []string{appID}}
}
matchParsedVaa := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"appIds": appIdsCondition}}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"rawStandardizedProperties.appIds": appIdsCondition}}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"standardizedProperties.appIds": appIdsCondition}}}},
}}}}}
matchParsedVaa := bson.D{{Key: "$match", Value: bson.M{"rawStandardizedProperties.appIds": appIdsCondition}}}
return matchParsedVaa
}
// strict flag is to force that both source and target chain id must match
func findOperationsIdByChain(ctx context.Context, db *mongo.Database, sourceChainID, targetChainID *vaa.ChainID, strict bool) ([]string, error) {
var allMatch bson.A
if sourceChainID != nil {
matchSourceChain := bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "rawStandardizedProperties.fromChain", Value: bson.M{"$eq": sourceChainID}}},
bson.D{{Key: "standardizedProperties.fromChain", Value: bson.M{"$eq": sourceChainID}}},
}}}
allMatch = append(allMatch, matchSourceChain)
}
if targetChainID != nil {
matchTargetChain := bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "parsedPayload.toChain", Value: bson.M{"$eq": targetChainID}}},
bson.D{{Key: "parsedPayload.targetChainId", Value: bson.M{"$eq": targetChainID}}},
bson.D{{Key: "standardizedProperties.toChain", Value: bson.M{"$eq": targetChainID}}},
bson.D{{Key: "rawStandardizedProperties.toChain", Value: bson.M{"$eq": targetChainID}}},
}}}
allMatch = append(allMatch, matchTargetChain)
}
var matchParsedVaa bson.D
if strict {
matchParsedVaa = bson.D{{Key: "$match", Value: bson.D{{Key: "$and", Value: allMatch}}}}
} else {
matchParsedVaa = bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: allMatch}}}}
}
cur, err := db.Collection("parsedVaa").Aggregate(ctx, mongo.Pipeline{matchParsedVaa})
if err != nil {
return nil, err
}
var documents []mongoID
err = cur.All(ctx, &documents)
if err != nil {
return nil, err
}
var ids []string
for _, doc := range documents {
ids = append(ids, doc.Id)
}
return ids, nil
}
/*
func findOperationsIdByChain(ctx context.Context, db *mongo.Database, chainId vaa.ChainID) ([]string, error) {
matchGlobalTransactions := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "destinationTx.chainId", Value: bson.M{"$eq": chainId}}}},
}}}}
matchParsedVaa := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "parsedPayload.targetChainId", Value: bson.M{"$eq": chainId}}},
bson.D{{Key: "rawStandardizedProperties.fromChain", Value: bson.M{"$eq": chainId}}},
bson.D{{Key: "rawStandardizedProperties.toChain", Value: bson.M{"$eq": chainId}}},
bson.D{{Key: "standardizedProperties.fromChain", Value: bson.M{"$eq": chainId}}},
bson.D{{Key: "standardizedProperties.toChain", Value: bson.M{"$eq": chainId}}},
}}}}}
globalTransactionFilter := bson.D{{Key: "$unionWith", Value: bson.D{{Key: "coll", Value: "globalTransactions"}, {Key: "pipeline", Value: bson.A{matchGlobalTransactions}}}}}
parserFilter := bson.D{{Key: "$unionWith", Value: bson.D{{Key: "coll", Value: "parsedVaa"}, {Key: "pipeline", Value: bson.A{matchParsedVaa}}}}}
group := bson.D{{Key: "$group", Value: bson.D{{Key: "_id", Value: "$_id"}}}}
pipeline := []bson.D{globalTransactionFilter, parserFilter, group}
cur, err := db.Collection("_operationsTemporal").Aggregate(ctx, pipeline)
if err != nil {
return nil, err
}
var documents []mongoID
err = cur.All(ctx, &documents)
if err != nil {
return nil, err
}
var ids []string
for _, doc := range documents {
ids = append(ids, doc.Id)
}
return ids, nil
}
func findOperationsIdByAppID(ctx context.Context, db *mongo.Database, appID string, exclusive bool) ([]string, error) {
var appIdsCondition interface{}
if exclusive {
appIdsCondition = bson.M{"$eq": []string{appID}}
} else {
appIdsCondition = bson.M{"$in": []string{appID}}
}
matchParsedVaa := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "appIds", Value: appIdsCondition}},
bson.D{{Key: "rawStandardizedProperties.appIds", Value: appIdsCondition}},
bson.D{{Key: "standardizedProperties.appIds", Value: appIdsCondition}},
}}}}}
query := mongo.Pipeline{matchParsedVaa}
cur, err := db.Collection("parsedVaa").Aggregate(ctx, query)
if err != nil {
return nil, err
}
var documents []mongoID
err = cur.All(ctx, &documents)
if err != nil {
return nil, err
}
var ids []string
for _, doc := range documents {
ids = append(ids, doc.Id)
}
return ids, nil
}
*/
// findOperationsIdByAddress returns all operations filtered by address.
func findOperationsIdByAddress(ctx context.Context, db *mongo.Database, address string, pagination *pagination.Pagination) ([]string, error) {
addressHex := strings.ToLower(address)
@ -361,8 +230,7 @@ func (r *Repository) matchOperationByTxHash(ctx context.Context, txHash string)
}}}}}
}
/*
func (r *Repository) findOpsIdByChainAndAppId(ctx context.Context, query OperationQuery) ([]string, error) {
func (r *Repository) FindByChainAndAppId(ctx context.Context, query OperationQuery) ([]*OperationDto, error) {
var pipeline mongo.Pipeline
@ -370,27 +238,63 @@ func (r *Repository) findOpsIdByChainAndAppId(ctx context.Context, query Operati
matchBySourceTargetChain := buildQueryOperationsByChain(query.SourceChainID, query.TargetChainID)
pipeline = append(pipeline, matchBySourceTargetChain)
}
if len(query.AppID) > 0 {
matchByAppId := buildQueryOperationsByAppID(query.AppID, query.ExclusiveAppId)
pipeline = append(pipeline, matchByAppId)
}
cur, err := r.db.Collection("parsedVaa").Aggregate(ctx, pipeline)
pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{
bson.E{Key: "updatedAt", Value: query.Pagination.GetSortInt()},
bson.E{Key: "_id", Value: -1},
}}})
// Skip initial results
pipeline = append(pipeline, bson.D{{Key: "$skip", Value: query.Pagination.Skip}})
// Limit size of results
pipeline = append(pipeline, bson.D{{Key: "$limit", Value: query.Pagination.Limit}})
pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "vaas"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "vaas"}}}})
// lookup globalTransactions
pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "globalTransactions"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "globalTransactions"}}}})
// lookup transferPrices
pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "transferPrices"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "transferPrices"}}}})
// lookup parsedVaa
pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "parsedVaa"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "parsedVaa"}}}})
// add fields
pipeline = append(pipeline, bson.D{{Key: "$addFields", Value: bson.D{
{Key: "payload", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$parsedVaa.parsedPayload", 0}}}},
{Key: "vaa", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$vaas", 0}}}},
{Key: "standardizedProperties", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$parsedVaa.standardizedProperties", 0}}}},
{Key: "symbol", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$transferPrices.symbol", 0}}}},
{Key: "usdAmount", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$transferPrices.usdAmount", 0}}}},
{Key: "tokenAmount", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$transferPrices.tokenAmount", 0}}}},
}}})
// unset
pipeline = append(pipeline, bson.D{{Key: "$unset", Value: bson.A{"transferPrices", "parsedVaa"}}})
cur, err := r.collections.parsedVaa.Aggregate(ctx, pipeline)
if err != nil {
r.logger.Error("failed execute aggregation pipeline", zap.Error(err))
return nil, err
}
var documents []mongoID
err = cur.All(ctx, &documents)
// Read results from cursor
var operations []*OperationDto
err = cur.All(ctx, &operations)
if err != nil {
r.logger.Error("failed to decode cursor", zap.Error(err))
return nil, err
}
var ids []string
for _, doc := range documents {
ids = append(ids, doc.Id)
}
return ids, nil
return operations, nil
}
*/
// FindAll returns all operations filtered by q.
func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*OperationDto, error) {
@ -414,20 +318,18 @@ func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*Oper
pipeline = append(pipeline, matchByTxHash)
}
/*
// sort
pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{
bson.E{Key: "originTx.timestamp", Value: query.Pagination.GetSortInt()},
bson.E{Key: "_id", Value: -1},
}}})
// sort
pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{
bson.E{Key: "originTx.timestamp", Value: query.Pagination.GetSortInt()},
bson.E{Key: "_id", Value: -1},
}}})
// Skip initial results
pipeline = append(pipeline, bson.D{{Key: "$skip", Value: query.Pagination.Skip}})
// Skip initial results
pipeline = append(pipeline, bson.D{{Key: "$skip", Value: query.Pagination.Skip}})
// Limit size of results
pipeline = append(pipeline, bson.D{{Key: "$limit", Value: query.Pagination.Limit}})
// Limit size of results
pipeline = append(pipeline, bson.D{{Key: "$limit", Value: query.Pagination.Limit}})
*/
// lookup vaas
pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "vaas"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "vaas"}}}})
@ -440,27 +342,6 @@ func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*Oper
// lookup parsedVaa
pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "parsedVaa"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "parsedVaa"}}}})
if query.SourceChainID != nil || query.TargetChainID != nil {
matchBySourceTargetChain := buildQueryOperationsByChain(query.SourceChainID, query.TargetChainID)
pipeline = append(pipeline, matchBySourceTargetChain)
}
if len(query.AppID) > 0 {
matchByAppId := buildQueryOperationsByAppID(query.AppID, query.ExclusiveAppId)
pipeline = append(pipeline, matchByAppId)
}
pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{
bson.E{Key: "originTx.timestamp", Value: query.Pagination.GetSortInt()},
bson.E{Key: "_id", Value: -1},
}}})
// Skip initial results
pipeline = append(pipeline, bson.D{{Key: "$skip", Value: query.Pagination.Skip}})
// Limit size of results
pipeline = append(pipeline, bson.D{{Key: "$limit", Value: query.Pagination.Limit}})
// add fields
pipeline = append(pipeline, bson.D{{Key: "$addFields", Value: bson.D{
{Key: "payload", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$parsedVaa.parsedPayload", 0}}}},

View File

@ -58,6 +58,10 @@ func (s *Service) FindAll(ctx context.Context, filter OperationFilter) ([]*Opera
ExclusiveAppId: filter.ExclusiveAppId,
}
if operationQuery.AppID != "" || operationQuery.SourceChainID != nil || operationQuery.TargetChainID != nil {
return s.repo.FindByChainAndAppId(ctx, operationQuery)
}
operations, err := s.repo.FindAll(ctx, operationQuery)
if err != nil {
return nil, err