From ed0985d50251213478abbf0606bd305e81f8fd63 Mon Sep 17 00:00:00 2001 From: agodnic Date: Tue, 31 Jan 2023 15:24:34 -0300 Subject: [PATCH] [API] Add parsed payloads to `GET /api/v1/vaas` (#113) Add query parameter `parsedPayload` to the endpoint `GET /api/v1/vaas`. --- api/handlers/vaa/repository.go | 132 ++++++++++++++------------ api/handlers/vaa/service.go | 74 +++++++++++++-- api/routes/wormscan/vaa/controller.go | 21 +++- 3 files changed, 156 insertions(+), 71 deletions(-) diff --git a/api/handlers/vaa/repository.go b/api/handlers/vaa/repository.go index ab1c9367..474affa4 100644 --- a/api/handlers/vaa/repository.go +++ b/api/handlers/vaa/repository.go @@ -93,54 +93,69 @@ func (r *Repository) FindOne(ctx context.Context, q *VaaQuery) (*VaaDoc, error) return &vaaDoc, err } -// GetVaaWithPayload get a vaa with payload if it exists. -// The input parameter [q *VaaQuery] define the filters to apply in the query. -func (r *Repository) GetVaaWithPayload(ctx context.Context, q *VaaQuery) (*VaaWithPayload, error) { +// FindVaasWithPayload returns VAAs that include a parsed payload. +// The input parameter `q` defines the filters to be applied in the query. +func (r *Repository) FindVaasWithPayload( + ctx context.Context, + q *VaaQuery, +) ([]*VaaWithPayload, error) { + + // build a query pipeline based on input parameters + var pipeline mongo.Pipeline + { + // filter by emitterChain + if q.chainId != 0 { + pipeline = append(pipeline, bson.D{ + {"$match", bson.D{bson.E{"emitterChain", q.chainId}}}, + }) + } + + // filter by emitterAddr + if q.emitter != "" { + pipeline = append(pipeline, bson.D{ + {"$match", bson.D{bson.E{"emitterAddr", q.emitter}}}, + }) + } + + // filter by sequence + if q.sequence != "" { + pipeline = append(pipeline, bson.D{ + {"$match", bson.D{bson.E{"sequence", q.sequence}}}, + }) + } + + // left outer join on the `parsedVaa` collection + pipeline = append(pipeline, bson.D{ + {"$lookup", bson.D{ + {"from", "parsedVaa"}, + {"localField", "_id"}, + {"foreignField", "_id"}, + {"as", "payload"}, + }}, + }) + + // add parsed payload fields + pipeline = append(pipeline, bson.D{ + {"$addFields", bson.D{ + {"payload", bson.M{ + "$arrayElemAt": []interface{}{"$payload.result", 0}, + }}, + }}, + }) + + // limit size of results + pipeline = append(pipeline, bson.D{ + {"$limit", q.Pagination.PageSize}, + }) + } + + // execute the aggregation pipeline var err error var cur *mongo.Cursor - - matchStage1 := bson.D{ - {Key: "$match", Value: bson.D{bson.E{Key: "emitterChain", Value: q.chainId}}}, - } - - matchStage2 := bson.D{ - {Key: "$match", Value: bson.D{bson.E{Key: "emitterAddr", Value: q.emitter}}}, - } - - matchStage3 := bson.D{ - {Key: "$match", Value: bson.D{bson.E{Key: "sequence", Value: q.sequence}}}, - } - - lookupStage2 := bson.D{ - {Key: "$lookup", Value: bson.D{ - {Key: "from", Value: "parsedVaa"}, - {Key: "localField", Value: "_id"}, - {Key: "foreignField", Value: "_id"}, - {Key: "as", Value: "payload"}, - }}, - } - - addFieldsStage3 := bson.D{ - {Key: "$addFields", Value: bson.D{ - {Key: "payload", Value: bson.M{ - "$arrayElemAt": []interface{}{"$payload.result", 0}, - }}, - }}, - } - - pipeLine := mongo.Pipeline{ - matchStage1, - matchStage2, - matchStage3, - lookupStage2, - addFieldsStage3, - } - - // execute aggregate operations. if q.chainId == vaa.ChainIDPythNet { - cur, err = r.collections.vaasPythnet.Aggregate(ctx, pipeLine) + cur, err = r.collections.vaasPythnet.Aggregate(ctx, pipeline) } else { - cur, err = r.collections.vaas.Aggregate(ctx, pipeLine) + cur, err = r.collections.vaas.Aggregate(ctx, pipeline) } if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) @@ -149,30 +164,20 @@ func (r *Repository) GetVaaWithPayload(ctx context.Context, q *VaaQuery) (*VaaWi return nil, errors.WithStack(err) } - // decode cursor to array vaa with payload + // read results from cursor var vaasWithPayload []*VaaWithPayload err = cur.All(ctx, &vaasWithPayload) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) - r.logger.Error("failed decoding cursor to []*VaaWithPayload", zap.Error(err), zap.Any("q", q), - zap.String("requestID", requestID)) + r.logger.Error("failed decoding cursor to []*VaaWithPayload", + zap.Error(err), + zap.Any("q", q), + zap.String("requestID", requestID), + ) return nil, errors.WithStack(err) } - // check not found - if len(vaasWithPayload) == 0 { - return nil, errs.ErrNotFound - } - - // check can not get more that one field in the response. - if len(vaasWithPayload) > 1 { - requestID := fmt.Sprintf("%v", ctx.Value("requestid")) - r.logger.Error("can not get more that one vaa by chainID/address/sequence", zap.Any("q", q), - zap.String("requestID", requestID)) - return nil, errs.ErrInternalError - } - - return vaasWithPayload[0], nil + return vaasWithPayload, nil } // GetVaaCount get a count of vaa by chainID. @@ -180,7 +185,10 @@ func (r *Repository) GetVaaCount(ctx context.Context, q *VaaQuery) ([]*VaaStats, if q == nil { q = Query() } - sort := bson.D{{q.SortBy, q.GetSortInt()}} + sort := bson.D{{ + q.SortBy, + q.GetSortInt(), + }} cur, err := r.collections.vaaCount.Find(ctx, q.toBSON(), options.Find().SetLimit(q.PageSize).SetSkip(q.Offset).SetSort(sort)) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) diff --git a/api/handlers/vaa/service.go b/api/handlers/vaa/service.go index 1c53e8d2..9800a4fe 100644 --- a/api/handlers/vaa/service.go +++ b/api/handlers/vaa/service.go @@ -27,17 +27,57 @@ func NewService(r *Repository, getCacheFunc cache.CacheGetFunc, logger *zap.Logg } // FindAll get all the the vaa. -func (s *Service) FindAll(ctx context.Context, p *pagination.Pagination, txHash *vaa.Address) (*response.Response[[]*VaaDoc], error) { +func (s *Service) FindAll( + ctx context.Context, + p *pagination.Pagination, + txHash *vaa.Address, + includeParsedPayload bool, +) (*response.Response[[]*VaaWithPayload], error) { + if p == nil { p = pagination.FirstPage() } + query := Query().SetPagination(p) if txHash != nil { query = query.SetTxHash(txHash.String()) } - vaas, err := s.repo.Find(ctx, query) - res := response.Response[[]*VaaDoc]{Data: vaas} - return &res, err + + if includeParsedPayload { + vaas, err := s.repo.FindVaasWithPayload(ctx, query) + if err != nil { + return nil, err + } + + return &response.Response[[]*VaaWithPayload]{Data: vaas}, nil + + } else { + vaas, err := s.repo.Find(ctx, query) + if err != nil { + return nil, err + } + + var vaasWithPayload []*VaaWithPayload + for i := range vaas { + vaaWithPayload := VaaWithPayload{ + ID: vaas[i].ID, + Version: vaas[i].Version, + EmitterChain: vaas[i].EmitterChain, + EmitterAddr: vaas[i].EmitterAddr, + Sequence: vaas[i].Sequence, + GuardianSetIndex: vaas[i].GuardianSetIndex, + Timestamp: vaas[i].Timestamp, + IndexedAt: vaas[i].IndexedAt, + UpdatedAt: vaas[i].UpdatedAt, + Vaa: vaas[i].Vaa, + } + + vaasWithPayload = append(vaasWithPayload, &vaaWithPayload) + } + + resp := response.Response[[]*VaaWithPayload]{Data: vaasWithPayload} + return &resp, err + } } // FindByChain get all the vaa by chainID. @@ -57,7 +97,14 @@ func (s *Service) FindByEmitter(ctx context.Context, chain vaa.ChainID, emitter } // If the parameter [payload] is true, the parse payload is added in the response. -func (s *Service) FindById(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string, payload bool) (*response.Response[*VaaWithPayload], error) { +func (s *Service) FindById( + ctx context.Context, + chain vaa.ChainID, + emitter vaa.Address, + seq string, + payload bool, +) (*response.Response[*VaaWithPayload], error) { + // check vaa sequence indexed isVaaNotIndexed := s.discardVaaNotIndexed(ctx, chain, emitter, seq) if isVaaNotIndexed { @@ -99,7 +146,22 @@ func (s *Service) findById(ctx context.Context, chain vaa.ChainID, emitter vaa.A // findByIdWithPayload get a vaa with payload data by chainID, emitter address and sequence number. func (s *Service) findByIdWithPayload(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string) (*VaaWithPayload, error) { query := Query().SetChain(chain).SetEmitter(emitter.String()).SetSequence(seq) - return s.repo.GetVaaWithPayload(ctx, query) + + vaas, err := s.repo.FindVaasWithPayload(ctx, query) + if err != nil { + return nil, err + } else if len(vaas) == 0 { + return nil, errs.ErrNotFound + } else if len(vaas) > 1 { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + s.logger.Error("can not get more that one vaa by chainID/address/sequence", + zap.Any("q", query), + zap.String("requestID", requestID), + ) + return nil, errs.ErrInternalError + } + + return vaas[0], nil } // GetVaaCount get a list a list of vaa count grouped by chainID. diff --git a/api/routes/wormscan/vaa/controller.go b/api/routes/wormscan/vaa/controller.go index 660f0376..32b8e4d6 100644 --- a/api/routes/wormscan/vaa/controller.go +++ b/api/routes/wormscan/vaa/controller.go @@ -34,12 +34,20 @@ func NewController(serv *vaa.Service, logger *zap.Logger) *Controller { // @Failure 500 // @Router /api/v1/vaas/ [get] func (c *Controller) FindAll(ctx *fiber.Ctx) error { + p := middleware.GetPaginationFromContext(ctx) + txHash, err := middleware.GetTxHash(ctx, c.logger) if err != nil { return err } - vaas, err := c.srv.FindAll(ctx.Context(), p, txHash) + + includeParsedPayload, err := middleware.ExtractParsedPayload(ctx, c.logger) + if err != nil { + return err + } + + vaas, err := c.srv.FindAll(ctx.Context(), p, txHash, includeParsedPayload) if err != nil { return err } @@ -111,17 +119,24 @@ func (c *Controller) FindByEmitter(ctx *fiber.Ctx) error { // @Failure 500 // @Router /api/v1/vaas/{chain_id}/{emitter}/{seq}/{signer}/{hash} [get] func (c *Controller) FindById(ctx *fiber.Ctx) error { + chainID, emitter, seq, err := middleware.ExtractVAAParams(ctx, c.logger) if err != nil { return err } - parsedPayload, err := middleware.ExtractParsedPayload(ctx, c.logger) + includeParsedPayload, err := middleware.ExtractParsedPayload(ctx, c.logger) if err != nil { return err } - vaa, err := c.srv.FindById(ctx.Context(), chainID, *emitter, strconv.FormatUint(seq, 10), parsedPayload) + vaa, err := c.srv.FindById( + ctx.Context(), + chainID, + *emitter, + strconv.FormatUint(seq, 10), + includeParsedPayload, + ) if err != nil { return err }