insert filtering by chain and by appid as stages in aggregation pipeline

This commit is contained in:
Mariano 2024-03-25 23:31:30 -03:00
parent 3ff29e1639
commit cdfc975eea
1 changed files with 47 additions and 71 deletions

View File

@ -118,52 +118,36 @@ type OperationQuery struct {
}
func buildQueryOperationsByChain(sourceChainID, targetChainID *vaa.ChainID) bson.D {
var allMatch bson.A
if sourceChainID != nil {
matchByEmitterChain := bson.M{"emitterChain": *sourceChainID}
matchByRawStandardizedProperties := bson.M{"rawStandardizedProperties.fromChain": *sourceChainID}
matchByStandardizedProperties := bson.M{"standardizedProperties.fromChain": *sourceChainID}
matchSourceChain := bson.D{
{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"$or": bson.A{
bson.M{"rawStandardizedProperties.fromChain": sourceChainID},
bson.M{"standardizedProperties.fromChain": sourceChainID},
bson.M{"emitterChain": sourceChainID},
}}}},
{Key: "$or", Value: bson.A{
bson.D{{Key: "vaas", Value: bson.M{"$elemMatch": matchByEmitterChain}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": matchByRawStandardizedProperties}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": matchByStandardizedProperties}}},
}},
}
allMatch = append(allMatch, matchSourceChain)
}
if targetChainID != nil {
matchTargetChain := bson.D{
{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"$or": bson.A{
bson.M{"rawStandardizedProperties.toChain": targetChainID},
bson.M{"standardizedProperties.toChain": targetChainID},
bson.M{"parsedPayload.targetChainId": targetChainID},
}}}},
{Key: "$or", Value: bson.A{
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"targetChain": *targetChainID}}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"rawStandardizedProperties.toChain": *targetChainID}}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"standardizedProperties.toChain": *targetChainID}}}},
}},
}
allMatch = append(allMatch, matchTargetChain)
}
/*
if sourceChainID != nil {
matchSourceChain := bson.D{{Key: "$or", Value: bson.A{
//bson.D{{Key: "parsedVaa.rawStandardizedProperties.fromChain", Value: bson.M{"$eq": sourceChainID}}},
//bson.D{{Key: "parsedVaa.standardizedProperties.fromChain", Value: bson.M{"$eq": sourceChainID}}},
}}}
allMatch = append(allMatch, matchSourceChain)
}
if targetChainID != nil {
matchTargetChain := bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "parsedVaa.parsedPayload.toChain", Value: bson.M{"$eq": targetChainID}}},
bson.D{{Key: "parsedVaa.parsedPayload.targetChainId", Value: bson.M{"$eq": targetChainID}}},
bson.D{{Key: "parsedVaa.standardizedProperties.toChain", Value: bson.M{"$eq": targetChainID}}},
bson.D{{Key: "parsedVaa.rawStandardizedProperties.toChain", Value: bson.M{"$eq": targetChainID}}},
}}}
allMatch = append(allMatch, matchTargetChain)
}
*/
return bson.D{{Key: "$match", Value: bson.D{{Key: "$and", Value: allMatch}}}}
return bson.D{{Key: "$match", Value: bson.M{"$and": allMatch}}}
}
func buildQueryOperationsByAppID(appID string, exclusive bool) bson.D {
@ -180,10 +164,9 @@ func buildQueryOperationsByAppID(appID string, exclusive bool) bson.D {
}
matchParsedVaa := bson.D{{Key: "$match", Value: bson.D{{Key: "$or", Value: bson.A{
bson.D{{Key: "parsedVaa.appIds", Value: appIdsCondition}},
bson.D{{Key: "parsedVaa.rawStandardizedProperties.appIds", Value: appIdsCondition}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"appIds": appIdsCondition}}}},
bson.D{{Key: "parsedVaa", Value: bson.M{"$elemMatch": bson.M{"rawStandardizedProperties.appIds": appIdsCondition}}}},
bson.D{{Key: "standardizedProperties.appIds", Value: appIdsCondition}},
//bson.D{{Key: "parsedVaa.standardizedProperties.appIds", Value: appIdsCondition}},
}}}}}
return matchParsedVaa
@ -378,6 +361,7 @@ func (r *Repository) matchOperationByTxHash(ctx context.Context, txHash string)
}}}}}
}
/*
func (r *Repository) findOpsIdByChainAndAppId(ctx context.Context, query OperationQuery) ([]string, error) {
var pipeline mongo.Pipeline
@ -406,16 +390,11 @@ func (r *Repository) findOpsIdByChainAndAppId(ctx context.Context, query Operati
}
return ids, nil
}
*/
// FindAll returns all operations filtered by q.
func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*OperationDto, error) {
defer func() {
if er := recover(); er != nil {
r.logger.Error("recovered from panic in FindAll", zap.Any("recovered", er))
}
}()
var pipeline mongo.Pipeline
// filter operations by address or txHash
@ -434,21 +413,8 @@ func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*Oper
matchByTxHash := r.matchOperationByTxHash(ctx, query.TxHash)
pipeline = append(pipeline, matchByTxHash)
}
/*
if query.SourceChainID != nil || query.TargetChainID != nil || query.AppID != "" {
// find all ids that match by source and target chain id
ids, err := r.findOpsIdByChainAndAppId(ctx, query)
if err != nil {
return nil, err
}
if len(ids) == 0 {
return []*OperationDto{}, nil
}
pipeline = append(pipeline, bson.D{{Key: "$match", Value: bson.D{{Key: "_id", Value: bson.D{{Key: "$in", Value: ids}}}}}})
}
*/
// sort
pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{
bson.E{Key: "originTx.timestamp", Value: query.Pagination.GetSortInt()},
@ -461,6 +427,7 @@ func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*Oper
// Limit size of results
pipeline = append(pipeline, bson.D{{Key: "$limit", Value: query.Pagination.Limit}})
*/
// lookup vaas
pipeline = append(pipeline, bson.D{{Key: "$lookup", Value: bson.D{{Key: "from", Value: "vaas"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "vaas"}}}})
@ -478,12 +445,21 @@ func (r *Repository) FindAll(ctx context.Context, query OperationQuery) ([]*Oper
pipeline = append(pipeline, matchBySourceTargetChain)
}
/*
if len(query.AppID) > 0 {
matchByAppId := buildQueryOperationsByAppID(query.AppID, query.ExclusiveAppId)
pipeline = append(pipeline, matchByAppId)
}
*/
pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{
bson.E{Key: "originTx.timestamp", Value: query.Pagination.GetSortInt()},
bson.E{Key: "_id", Value: -1},
}}})
// Skip initial results
pipeline = append(pipeline, bson.D{{Key: "$skip", Value: query.Pagination.Skip}})
// Limit size of results
pipeline = append(pipeline, bson.D{{Key: "$limit", Value: query.Pagination.Limit}})
// add fields
pipeline = append(pipeline, bson.D{{Key: "$addFields", Value: bson.D{