diff --git a/api/handlers/vaa/model.go b/api/handlers/vaa/model.go index 26d5ffae..9d6d875c 100644 --- a/api/handlers/vaa/model.go +++ b/api/handlers/vaa/model.go @@ -50,3 +50,18 @@ type VaaStats struct { ChainID vaa.ChainID `bson:"_id" json:"chainId"` Count int64 `bson:"count" json:"count"` } + +// VaaWithPayload vaa document with payload. +type VaaWithPayload struct { + ID string `bson:"_id" json:"id"` + Version uint8 `bson:"version" json:"version"` + EmitterChain vaa.ChainID `bson:"emitterChain" json:"emitterChain"` + EmitterAddr string `bson:"emitterAddr" json:"emitterAddr"` + Sequence string `bson:"sequence" json:"-"` + GuardianSetIndex uint32 `bson:"guardianSetIndex" json:"guardianSetIndex"` + Vaa []byte `bson:"vaas" json:"vaa"` + Timestamp *time.Time `bson:"timestamp" json:"timestamp"` + UpdatedAt *time.Time `bson:"updatedAt" json:"updatedAt"` + IndexedAt *time.Time `bson:"indexedAt" json:"indexedAt"` + Payload map[string]interface{} `bson:"payload" json:"payload,omitempty"` +} diff --git a/api/handlers/vaa/repository.go b/api/handlers/vaa/repository.go index f8821938..ab1c9367 100644 --- a/api/handlers/vaa/repository.go +++ b/api/handlers/vaa/repository.go @@ -93,6 +93,88 @@ func (r *Repository) FindOne(ctx context.Context, q *VaaQuery) (*VaaDoc, error) return &vaaDoc, err } +// GetVaaWithPayload get a vaa with payload if it exists. +// The input parameter [q *VaaQuery] define the filters to apply in the query. +func (r *Repository) GetVaaWithPayload(ctx context.Context, q *VaaQuery) (*VaaWithPayload, error) { + var err error + var cur *mongo.Cursor + + matchStage1 := bson.D{ + {Key: "$match", Value: bson.D{bson.E{Key: "emitterChain", Value: q.chainId}}}, + } + + matchStage2 := bson.D{ + {Key: "$match", Value: bson.D{bson.E{Key: "emitterAddr", Value: q.emitter}}}, + } + + matchStage3 := bson.D{ + {Key: "$match", Value: bson.D{bson.E{Key: "sequence", Value: q.sequence}}}, + } + + lookupStage2 := bson.D{ + {Key: "$lookup", Value: bson.D{ + {Key: "from", Value: "parsedVaa"}, + {Key: "localField", Value: "_id"}, + {Key: "foreignField", Value: "_id"}, + {Key: "as", Value: "payload"}, + }}, + } + + addFieldsStage3 := bson.D{ + {Key: "$addFields", Value: bson.D{ + {Key: "payload", Value: bson.M{ + "$arrayElemAt": []interface{}{"$payload.result", 0}, + }}, + }}, + } + + pipeLine := mongo.Pipeline{ + matchStage1, + matchStage2, + matchStage3, + lookupStage2, + addFieldsStage3, + } + + // execute aggregate operations. + if q.chainId == vaa.ChainIDPythNet { + cur, err = r.collections.vaasPythnet.Aggregate(ctx, pipeLine) + } else { + cur, err = r.collections.vaas.Aggregate(ctx, pipeLine) + } + if err != nil { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + r.logger.Error("failed execute Aggregate command to get vaa with payload", + zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID)) + return nil, errors.WithStack(err) + } + + // decode cursor to array vaa with payload + var vaasWithPayload []*VaaWithPayload + err = cur.All(ctx, &vaasWithPayload) + if err != nil { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + r.logger.Error("failed decoding cursor to []*VaaWithPayload", zap.Error(err), zap.Any("q", q), + zap.String("requestID", requestID)) + return nil, errors.WithStack(err) + } + + // check not found + if len(vaasWithPayload) == 0 { + return nil, errs.ErrNotFound + } + + // check can not get more that one field in the response. + if len(vaasWithPayload) > 1 { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + r.logger.Error("can not get more that one vaa by chainID/address/sequence", zap.Any("q", q), + zap.String("requestID", requestID)) + return nil, errs.ErrInternalError + } + + return vaasWithPayload[0], nil +} + // GetVaaCount get a count of vaa by chainID. func (r *Repository) GetVaaCount(ctx context.Context, q *VaaQuery) ([]*VaaStats, error) { if q == nil { diff --git a/api/handlers/vaa/service.go b/api/handlers/vaa/service.go index e678ac4d..1c53e8d2 100644 --- a/api/handlers/vaa/service.go +++ b/api/handlers/vaa/service.go @@ -56,18 +56,50 @@ func (s *Service) FindByEmitter(ctx context.Context, chain vaa.ChainID, emitter return &res, err } -// FindById get a vaa by chainID, emitter address and sequence number. -func (s *Service) FindById(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string) (*response.Response[*VaaDoc], error) { +// If the parameter [payload] is true, the parse payload is added in the response. +func (s *Service) FindById(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string, payload bool) (*response.Response[*VaaWithPayload], error) { // check vaa sequence indexed isVaaNotIndexed := s.discardVaaNotIndexed(ctx, chain, emitter, seq) if isVaaNotIndexed { return nil, errs.ErrNotFound } + if payload { + vaaWithPayload, err := s.findByIdWithPayload(ctx, chain, emitter, seq) + resp := response.Response[*VaaWithPayload]{Data: vaaWithPayload} + return &resp, err + } else { + vaa, err := s.findById(ctx, chain, emitter, seq) + if err != nil { + return &response.Response[*VaaWithPayload]{}, err + } + vaaWithPayload := VaaWithPayload{ + ID: vaa.ID, + Version: vaa.Version, + EmitterChain: vaa.EmitterChain, + EmitterAddr: vaa.EmitterAddr, + Sequence: vaa.Sequence, + GuardianSetIndex: vaa.GuardianSetIndex, + Timestamp: vaa.Timestamp, + IndexedAt: vaa.IndexedAt, + UpdatedAt: vaa.UpdatedAt, + Vaa: vaa.Vaa, + } + resp := response.Response[*VaaWithPayload]{Data: &vaaWithPayload} + return &resp, err + } +} + +// findById get a vaa by chainID, emitter address and sequence number. +func (s *Service) findById(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string) (*VaaDoc, error) { query := Query().SetChain(chain).SetEmitter(emitter.String()).SetSequence(seq) - vaas, err := s.repo.FindOne(ctx, query) - res := response.Response[*VaaDoc]{Data: vaas} - return &res, err + return s.repo.FindOne(ctx, query) +} + +// findByIdWithPayload get a vaa with payload data by chainID, emitter address and sequence number. +func (s *Service) findByIdWithPayload(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string) (*VaaWithPayload, error) { + query := Query().SetChain(chain).SetEmitter(emitter.String()).SetSequence(seq) + return s.repo.GetVaaWithPayload(ctx, query) } // GetVaaCount get a list a list of vaa count grouped by chainID. diff --git a/api/middleware/extract_parameters.go b/api/middleware/extract_parameters.go index 1ec1dbfd..438e003d 100644 --- a/api/middleware/extract_parameters.go +++ b/api/middleware/extract_parameters.go @@ -128,3 +128,13 @@ func GetTxHash(c *fiber.Ctx, l *zap.Logger) (*vaa.Address, error) { } return &txHashAddr, nil } + +// ExtractParsedPayload get parsedPayload query parameter. +func ExtractParsedPayload(c *fiber.Ctx, l *zap.Logger) (bool, error) { + parsedPayloadStr := c.Query("parsedPayload", "false") + parsedPayload, err := strconv.ParseBool(parsedPayloadStr) + if err != nil { + return false, response.NewInvalidQueryParamError(c, "INVALID QUERY PARAMETER", errors.WithStack(err)) + } + return parsedPayload, nil +} diff --git a/api/response/error.go b/api/response/error.go index fe374c78..42b8278d 100644 --- a/api/response/error.go +++ b/api/response/error.go @@ -126,3 +126,22 @@ func NewNotFoundError(ctx *fiber.Ctx) APIError { }}, } } + +// NewInvalidQueryParamError create a query param error +func NewInvalidQueryParamError(ctx *fiber.Ctx, message string, err error) APIError { + if message == "" { + message = "INVALID QUERY PARAM" + } + detail := ErrorDetail{ + RequestID: fmt.Sprintf("%v", ctx.Locals("requestid")), + } + if enableStackTrace && err != nil { + detail.StackTrace = fmt.Sprintf("%+v\n", err) + } + return APIError{ + StatusCode: fiber.StatusBadRequest, + Code: InvalidParam, + Message: message, + Details: []ErrorDetail{detail}, + } +} diff --git a/api/routes/guardian/vaa/controller.go b/api/routes/guardian/vaa/controller.go index bdb50c67..25ca15a4 100644 --- a/api/routes/guardian/vaa/controller.go +++ b/api/routes/guardian/vaa/controller.go @@ -45,7 +45,7 @@ func (c *Controller) FindSignedVAAByID(ctx *fiber.Ctx) error { // return response.NewApiError(ctx, fiber.StatusBadRequest, response.InvalidParam, // "not supported for PythNet", nil) //} - vaa, err := c.srv.FindById(ctx.Context(), chainID, *emitter, strconv.FormatUint(seq, 10)) + vaa, err := c.srv.FindById(ctx.Context(), chainID, *emitter, strconv.FormatUint(seq, 10), false) if err != nil { return err } diff --git a/api/routes/wormscan/vaa/controller.go b/api/routes/wormscan/vaa/controller.go index 2e85ec78..660f0376 100644 --- a/api/routes/wormscan/vaa/controller.go +++ b/api/routes/wormscan/vaa/controller.go @@ -116,7 +116,12 @@ func (c *Controller) FindById(ctx *fiber.Ctx) error { return err } - vaa, err := c.srv.FindById(ctx.Context(), chainID, *emitter, strconv.FormatUint(seq, 10)) + parsedPayload, err := middleware.ExtractParsedPayload(ctx, c.logger) + if err != nil { + return err + } + + vaa, err := c.srv.FindById(ctx.Context(), chainID, *emitter, strconv.FormatUint(seq, 10), parsedPayload) if err != nil { return err } diff --git a/api/rpc/handler.go b/api/rpc/handler.go index 40e39054..5af301e4 100644 --- a/api/rpc/handler.go +++ b/api/rpc/handler.go @@ -62,7 +62,7 @@ func (h *Handler) GetSignedVAA(ctx context.Context, request *publicrpcv1.GetSign sequence := strconv.FormatUint(request.MessageId.Sequence, 10) // get vaa by Id. - vaa, err := h.vaaSrv.FindById(ctx, chainID, addr, sequence) + vaa, err := h.vaaSrv.FindById(ctx, chainID, addr, sequence, false) if err != nil { if errors.Is(err, errs.ErrNotFound) { return nil, status.Error(codes.NotFound, "requested VAA not found in store")