Add parsedPayload query param to /vaa/:chainID/:address/:sequence (#102)
This commit is contained in:
parent
e929104ed9
commit
ca6710d5d5
|
@ -50,3 +50,18 @@ type VaaStats struct {
|
||||||
ChainID vaa.ChainID `bson:"_id" json:"chainId"`
|
ChainID vaa.ChainID `bson:"_id" json:"chainId"`
|
||||||
Count int64 `bson:"count" json:"count"`
|
Count int64 `bson:"count" json:"count"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// VaaWithPayload vaa document with payload.
|
||||||
|
type VaaWithPayload struct {
|
||||||
|
ID string `bson:"_id" json:"id"`
|
||||||
|
Version uint8 `bson:"version" json:"version"`
|
||||||
|
EmitterChain vaa.ChainID `bson:"emitterChain" json:"emitterChain"`
|
||||||
|
EmitterAddr string `bson:"emitterAddr" json:"emitterAddr"`
|
||||||
|
Sequence string `bson:"sequence" json:"-"`
|
||||||
|
GuardianSetIndex uint32 `bson:"guardianSetIndex" json:"guardianSetIndex"`
|
||||||
|
Vaa []byte `bson:"vaas" json:"vaa"`
|
||||||
|
Timestamp *time.Time `bson:"timestamp" json:"timestamp"`
|
||||||
|
UpdatedAt *time.Time `bson:"updatedAt" json:"updatedAt"`
|
||||||
|
IndexedAt *time.Time `bson:"indexedAt" json:"indexedAt"`
|
||||||
|
Payload map[string]interface{} `bson:"payload" json:"payload,omitempty"`
|
||||||
|
}
|
||||||
|
|
|
@ -93,6 +93,88 @@ func (r *Repository) FindOne(ctx context.Context, q *VaaQuery) (*VaaDoc, error)
|
||||||
return &vaaDoc, err
|
return &vaaDoc, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetVaaWithPayload get a vaa with payload if it exists.
|
||||||
|
// The input parameter [q *VaaQuery] define the filters to apply in the query.
|
||||||
|
func (r *Repository) GetVaaWithPayload(ctx context.Context, q *VaaQuery) (*VaaWithPayload, error) {
|
||||||
|
var err error
|
||||||
|
var cur *mongo.Cursor
|
||||||
|
|
||||||
|
matchStage1 := bson.D{
|
||||||
|
{Key: "$match", Value: bson.D{bson.E{Key: "emitterChain", Value: q.chainId}}},
|
||||||
|
}
|
||||||
|
|
||||||
|
matchStage2 := bson.D{
|
||||||
|
{Key: "$match", Value: bson.D{bson.E{Key: "emitterAddr", Value: q.emitter}}},
|
||||||
|
}
|
||||||
|
|
||||||
|
matchStage3 := bson.D{
|
||||||
|
{Key: "$match", Value: bson.D{bson.E{Key: "sequence", Value: q.sequence}}},
|
||||||
|
}
|
||||||
|
|
||||||
|
lookupStage2 := bson.D{
|
||||||
|
{Key: "$lookup", Value: bson.D{
|
||||||
|
{Key: "from", Value: "parsedVaa"},
|
||||||
|
{Key: "localField", Value: "_id"},
|
||||||
|
{Key: "foreignField", Value: "_id"},
|
||||||
|
{Key: "as", Value: "payload"},
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
|
||||||
|
addFieldsStage3 := bson.D{
|
||||||
|
{Key: "$addFields", Value: bson.D{
|
||||||
|
{Key: "payload", Value: bson.M{
|
||||||
|
"$arrayElemAt": []interface{}{"$payload.result", 0},
|
||||||
|
}},
|
||||||
|
}},
|
||||||
|
}
|
||||||
|
|
||||||
|
pipeLine := mongo.Pipeline{
|
||||||
|
matchStage1,
|
||||||
|
matchStage2,
|
||||||
|
matchStage3,
|
||||||
|
lookupStage2,
|
||||||
|
addFieldsStage3,
|
||||||
|
}
|
||||||
|
|
||||||
|
// execute aggregate operations.
|
||||||
|
if q.chainId == vaa.ChainIDPythNet {
|
||||||
|
cur, err = r.collections.vaasPythnet.Aggregate(ctx, pipeLine)
|
||||||
|
} else {
|
||||||
|
cur, err = r.collections.vaas.Aggregate(ctx, pipeLine)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
|
||||||
|
r.logger.Error("failed execute Aggregate command to get vaa with payload",
|
||||||
|
zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID))
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// decode cursor to array vaa with payload
|
||||||
|
var vaasWithPayload []*VaaWithPayload
|
||||||
|
err = cur.All(ctx, &vaasWithPayload)
|
||||||
|
if err != nil {
|
||||||
|
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
|
||||||
|
r.logger.Error("failed decoding cursor to []*VaaWithPayload", zap.Error(err), zap.Any("q", q),
|
||||||
|
zap.String("requestID", requestID))
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check not found
|
||||||
|
if len(vaasWithPayload) == 0 {
|
||||||
|
return nil, errs.ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// check can not get more that one field in the response.
|
||||||
|
if len(vaasWithPayload) > 1 {
|
||||||
|
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
|
||||||
|
r.logger.Error("can not get more that one vaa by chainID/address/sequence", zap.Any("q", q),
|
||||||
|
zap.String("requestID", requestID))
|
||||||
|
return nil, errs.ErrInternalError
|
||||||
|
}
|
||||||
|
|
||||||
|
return vaasWithPayload[0], nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetVaaCount get a count of vaa by chainID.
|
// GetVaaCount get a count of vaa by chainID.
|
||||||
func (r *Repository) GetVaaCount(ctx context.Context, q *VaaQuery) ([]*VaaStats, error) {
|
func (r *Repository) GetVaaCount(ctx context.Context, q *VaaQuery) ([]*VaaStats, error) {
|
||||||
if q == nil {
|
if q == nil {
|
||||||
|
|
|
@ -56,18 +56,50 @@ func (s *Service) FindByEmitter(ctx context.Context, chain vaa.ChainID, emitter
|
||||||
return &res, err
|
return &res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// FindById get a vaa by chainID, emitter address and sequence number.
|
// If the parameter [payload] is true, the parse payload is added in the response.
|
||||||
func (s *Service) FindById(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string) (*response.Response[*VaaDoc], error) {
|
func (s *Service) FindById(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string, payload bool) (*response.Response[*VaaWithPayload], error) {
|
||||||
// check vaa sequence indexed
|
// check vaa sequence indexed
|
||||||
isVaaNotIndexed := s.discardVaaNotIndexed(ctx, chain, emitter, seq)
|
isVaaNotIndexed := s.discardVaaNotIndexed(ctx, chain, emitter, seq)
|
||||||
if isVaaNotIndexed {
|
if isVaaNotIndexed {
|
||||||
return nil, errs.ErrNotFound
|
return nil, errs.ErrNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if payload {
|
||||||
|
vaaWithPayload, err := s.findByIdWithPayload(ctx, chain, emitter, seq)
|
||||||
|
resp := response.Response[*VaaWithPayload]{Data: vaaWithPayload}
|
||||||
|
return &resp, err
|
||||||
|
} else {
|
||||||
|
vaa, err := s.findById(ctx, chain, emitter, seq)
|
||||||
|
if err != nil {
|
||||||
|
return &response.Response[*VaaWithPayload]{}, err
|
||||||
|
}
|
||||||
|
vaaWithPayload := VaaWithPayload{
|
||||||
|
ID: vaa.ID,
|
||||||
|
Version: vaa.Version,
|
||||||
|
EmitterChain: vaa.EmitterChain,
|
||||||
|
EmitterAddr: vaa.EmitterAddr,
|
||||||
|
Sequence: vaa.Sequence,
|
||||||
|
GuardianSetIndex: vaa.GuardianSetIndex,
|
||||||
|
Timestamp: vaa.Timestamp,
|
||||||
|
IndexedAt: vaa.IndexedAt,
|
||||||
|
UpdatedAt: vaa.UpdatedAt,
|
||||||
|
Vaa: vaa.Vaa,
|
||||||
|
}
|
||||||
|
resp := response.Response[*VaaWithPayload]{Data: &vaaWithPayload}
|
||||||
|
return &resp, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// findById get a vaa by chainID, emitter address and sequence number.
|
||||||
|
func (s *Service) findById(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string) (*VaaDoc, error) {
|
||||||
query := Query().SetChain(chain).SetEmitter(emitter.String()).SetSequence(seq)
|
query := Query().SetChain(chain).SetEmitter(emitter.String()).SetSequence(seq)
|
||||||
vaas, err := s.repo.FindOne(ctx, query)
|
return s.repo.FindOne(ctx, query)
|
||||||
res := response.Response[*VaaDoc]{Data: vaas}
|
}
|
||||||
return &res, err
|
|
||||||
|
// findByIdWithPayload get a vaa with payload data by chainID, emitter address and sequence number.
|
||||||
|
func (s *Service) findByIdWithPayload(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string) (*VaaWithPayload, error) {
|
||||||
|
query := Query().SetChain(chain).SetEmitter(emitter.String()).SetSequence(seq)
|
||||||
|
return s.repo.GetVaaWithPayload(ctx, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetVaaCount get a list a list of vaa count grouped by chainID.
|
// GetVaaCount get a list a list of vaa count grouped by chainID.
|
||||||
|
|
|
@ -128,3 +128,13 @@ func GetTxHash(c *fiber.Ctx, l *zap.Logger) (*vaa.Address, error) {
|
||||||
}
|
}
|
||||||
return &txHashAddr, nil
|
return &txHashAddr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExtractParsedPayload get parsedPayload query parameter.
|
||||||
|
func ExtractParsedPayload(c *fiber.Ctx, l *zap.Logger) (bool, error) {
|
||||||
|
parsedPayloadStr := c.Query("parsedPayload", "false")
|
||||||
|
parsedPayload, err := strconv.ParseBool(parsedPayloadStr)
|
||||||
|
if err != nil {
|
||||||
|
return false, response.NewInvalidQueryParamError(c, "INVALID <parsedPayload> QUERY PARAMETER", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
return parsedPayload, nil
|
||||||
|
}
|
||||||
|
|
|
@ -126,3 +126,22 @@ func NewNotFoundError(ctx *fiber.Ctx) APIError {
|
||||||
}},
|
}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewInvalidQueryParamError create a query param error
|
||||||
|
func NewInvalidQueryParamError(ctx *fiber.Ctx, message string, err error) APIError {
|
||||||
|
if message == "" {
|
||||||
|
message = "INVALID QUERY PARAM"
|
||||||
|
}
|
||||||
|
detail := ErrorDetail{
|
||||||
|
RequestID: fmt.Sprintf("%v", ctx.Locals("requestid")),
|
||||||
|
}
|
||||||
|
if enableStackTrace && err != nil {
|
||||||
|
detail.StackTrace = fmt.Sprintf("%+v\n", err)
|
||||||
|
}
|
||||||
|
return APIError{
|
||||||
|
StatusCode: fiber.StatusBadRequest,
|
||||||
|
Code: InvalidParam,
|
||||||
|
Message: message,
|
||||||
|
Details: []ErrorDetail{detail},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ func (c *Controller) FindSignedVAAByID(ctx *fiber.Ctx) error {
|
||||||
// return response.NewApiError(ctx, fiber.StatusBadRequest, response.InvalidParam,
|
// return response.NewApiError(ctx, fiber.StatusBadRequest, response.InvalidParam,
|
||||||
// "not supported for PythNet", nil)
|
// "not supported for PythNet", nil)
|
||||||
//}
|
//}
|
||||||
vaa, err := c.srv.FindById(ctx.Context(), chainID, *emitter, strconv.FormatUint(seq, 10))
|
vaa, err := c.srv.FindById(ctx.Context(), chainID, *emitter, strconv.FormatUint(seq, 10), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -116,7 +116,12 @@ func (c *Controller) FindById(ctx *fiber.Ctx) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
vaa, err := c.srv.FindById(ctx.Context(), chainID, *emitter, strconv.FormatUint(seq, 10))
|
parsedPayload, err := middleware.ExtractParsedPayload(ctx, c.logger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
vaa, err := c.srv.FindById(ctx.Context(), chainID, *emitter, strconv.FormatUint(seq, 10), parsedPayload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,7 +62,7 @@ func (h *Handler) GetSignedVAA(ctx context.Context, request *publicrpcv1.GetSign
|
||||||
sequence := strconv.FormatUint(request.MessageId.Sequence, 10)
|
sequence := strconv.FormatUint(request.MessageId.Sequence, 10)
|
||||||
|
|
||||||
// get vaa by Id.
|
// get vaa by Id.
|
||||||
vaa, err := h.vaaSrv.FindById(ctx, chainID, addr, sequence)
|
vaa, err := h.vaaSrv.FindById(ctx, chainID, addr, sequence, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, errs.ErrNotFound) {
|
if errors.Is(err, errs.ErrNotFound) {
|
||||||
return nil, status.Error(codes.NotFound, "requested VAA not found in store")
|
return nil, status.Error(codes.NotFound, "requested VAA not found in store")
|
||||||
|
|
Loading…
Reference in New Issue