[API] Add parsed payloads to `GET /api/v1/vaas` (#113)

Add query parameter `parsedPayload` to the endpoint `GET /api/v1/vaas`.
This commit is contained in:
agodnic 2023-01-31 15:24:34 -03:00 committed by GitHub
parent 1c7e58a27a
commit ed0985d502
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 156 additions and 71 deletions

View File

@ -93,54 +93,69 @@ func (r *Repository) FindOne(ctx context.Context, q *VaaQuery) (*VaaDoc, error)
return &vaaDoc, err
}
// GetVaaWithPayload get a vaa with payload if it exists.
// The input parameter [q *VaaQuery] define the filters to apply in the query.
func (r *Repository) GetVaaWithPayload(ctx context.Context, q *VaaQuery) (*VaaWithPayload, error) {
// FindVaasWithPayload returns VAAs that include a parsed payload.
// The input parameter `q` defines the filters to be applied in the query.
func (r *Repository) FindVaasWithPayload(
ctx context.Context,
q *VaaQuery,
) ([]*VaaWithPayload, error) {
// build a query pipeline based on input parameters
var pipeline mongo.Pipeline
{
// filter by emitterChain
if q.chainId != 0 {
pipeline = append(pipeline, bson.D{
{"$match", bson.D{bson.E{"emitterChain", q.chainId}}},
})
}
// filter by emitterAddr
if q.emitter != "" {
pipeline = append(pipeline, bson.D{
{"$match", bson.D{bson.E{"emitterAddr", q.emitter}}},
})
}
// filter by sequence
if q.sequence != "" {
pipeline = append(pipeline, bson.D{
{"$match", bson.D{bson.E{"sequence", q.sequence}}},
})
}
// left outer join on the `parsedVaa` collection
pipeline = append(pipeline, bson.D{
{"$lookup", bson.D{
{"from", "parsedVaa"},
{"localField", "_id"},
{"foreignField", "_id"},
{"as", "payload"},
}},
})
// add parsed payload fields
pipeline = append(pipeline, bson.D{
{"$addFields", bson.D{
{"payload", bson.M{
"$arrayElemAt": []interface{}{"$payload.result", 0},
}},
}},
})
// limit size of results
pipeline = append(pipeline, bson.D{
{"$limit", q.Pagination.PageSize},
})
}
// execute the aggregation pipeline
var err error
var cur *mongo.Cursor
matchStage1 := bson.D{
{Key: "$match", Value: bson.D{bson.E{Key: "emitterChain", Value: q.chainId}}},
}
matchStage2 := bson.D{
{Key: "$match", Value: bson.D{bson.E{Key: "emitterAddr", Value: q.emitter}}},
}
matchStage3 := bson.D{
{Key: "$match", Value: bson.D{bson.E{Key: "sequence", Value: q.sequence}}},
}
lookupStage2 := bson.D{
{Key: "$lookup", Value: bson.D{
{Key: "from", Value: "parsedVaa"},
{Key: "localField", Value: "_id"},
{Key: "foreignField", Value: "_id"},
{Key: "as", Value: "payload"},
}},
}
addFieldsStage3 := bson.D{
{Key: "$addFields", Value: bson.D{
{Key: "payload", Value: bson.M{
"$arrayElemAt": []interface{}{"$payload.result", 0},
}},
}},
}
pipeLine := mongo.Pipeline{
matchStage1,
matchStage2,
matchStage3,
lookupStage2,
addFieldsStage3,
}
// execute aggregate operations.
if q.chainId == vaa.ChainIDPythNet {
cur, err = r.collections.vaasPythnet.Aggregate(ctx, pipeLine)
cur, err = r.collections.vaasPythnet.Aggregate(ctx, pipeline)
} else {
cur, err = r.collections.vaas.Aggregate(ctx, pipeLine)
cur, err = r.collections.vaas.Aggregate(ctx, pipeline)
}
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
@ -149,30 +164,20 @@ func (r *Repository) GetVaaWithPayload(ctx context.Context, q *VaaQuery) (*VaaWi
return nil, errors.WithStack(err)
}
// decode cursor to array vaa with payload
// read results from cursor
var vaasWithPayload []*VaaWithPayload
err = cur.All(ctx, &vaasWithPayload)
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed decoding cursor to []*VaaWithPayload", zap.Error(err), zap.Any("q", q),
zap.String("requestID", requestID))
r.logger.Error("failed decoding cursor to []*VaaWithPayload",
zap.Error(err),
zap.Any("q", q),
zap.String("requestID", requestID),
)
return nil, errors.WithStack(err)
}
// check not found
if len(vaasWithPayload) == 0 {
return nil, errs.ErrNotFound
}
// check can not get more that one field in the response.
if len(vaasWithPayload) > 1 {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("can not get more that one vaa by chainID/address/sequence", zap.Any("q", q),
zap.String("requestID", requestID))
return nil, errs.ErrInternalError
}
return vaasWithPayload[0], nil
return vaasWithPayload, nil
}
// GetVaaCount get a count of vaa by chainID.
@ -180,7 +185,10 @@ func (r *Repository) GetVaaCount(ctx context.Context, q *VaaQuery) ([]*VaaStats,
if q == nil {
q = Query()
}
sort := bson.D{{q.SortBy, q.GetSortInt()}}
sort := bson.D{{
q.SortBy,
q.GetSortInt(),
}}
cur, err := r.collections.vaaCount.Find(ctx, q.toBSON(), options.Find().SetLimit(q.PageSize).SetSkip(q.Offset).SetSort(sort))
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))

View File

@ -27,17 +27,57 @@ func NewService(r *Repository, getCacheFunc cache.CacheGetFunc, logger *zap.Logg
}
// FindAll get all the the vaa.
func (s *Service) FindAll(ctx context.Context, p *pagination.Pagination, txHash *vaa.Address) (*response.Response[[]*VaaDoc], error) {
func (s *Service) FindAll(
ctx context.Context,
p *pagination.Pagination,
txHash *vaa.Address,
includeParsedPayload bool,
) (*response.Response[[]*VaaWithPayload], error) {
if p == nil {
p = pagination.FirstPage()
}
query := Query().SetPagination(p)
if txHash != nil {
query = query.SetTxHash(txHash.String())
}
vaas, err := s.repo.Find(ctx, query)
res := response.Response[[]*VaaDoc]{Data: vaas}
return &res, err
if includeParsedPayload {
vaas, err := s.repo.FindVaasWithPayload(ctx, query)
if err != nil {
return nil, err
}
return &response.Response[[]*VaaWithPayload]{Data: vaas}, nil
} else {
vaas, err := s.repo.Find(ctx, query)
if err != nil {
return nil, err
}
var vaasWithPayload []*VaaWithPayload
for i := range vaas {
vaaWithPayload := VaaWithPayload{
ID: vaas[i].ID,
Version: vaas[i].Version,
EmitterChain: vaas[i].EmitterChain,
EmitterAddr: vaas[i].EmitterAddr,
Sequence: vaas[i].Sequence,
GuardianSetIndex: vaas[i].GuardianSetIndex,
Timestamp: vaas[i].Timestamp,
IndexedAt: vaas[i].IndexedAt,
UpdatedAt: vaas[i].UpdatedAt,
Vaa: vaas[i].Vaa,
}
vaasWithPayload = append(vaasWithPayload, &vaaWithPayload)
}
resp := response.Response[[]*VaaWithPayload]{Data: vaasWithPayload}
return &resp, err
}
}
// FindByChain get all the vaa by chainID.
@ -57,7 +97,14 @@ func (s *Service) FindByEmitter(ctx context.Context, chain vaa.ChainID, emitter
}
// If the parameter [payload] is true, the parse payload is added in the response.
func (s *Service) FindById(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string, payload bool) (*response.Response[*VaaWithPayload], error) {
func (s *Service) FindById(
ctx context.Context,
chain vaa.ChainID,
emitter vaa.Address,
seq string,
payload bool,
) (*response.Response[*VaaWithPayload], error) {
// check vaa sequence indexed
isVaaNotIndexed := s.discardVaaNotIndexed(ctx, chain, emitter, seq)
if isVaaNotIndexed {
@ -99,7 +146,22 @@ func (s *Service) findById(ctx context.Context, chain vaa.ChainID, emitter vaa.A
// findByIdWithPayload get a vaa with payload data by chainID, emitter address and sequence number.
func (s *Service) findByIdWithPayload(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string) (*VaaWithPayload, error) {
query := Query().SetChain(chain).SetEmitter(emitter.String()).SetSequence(seq)
return s.repo.GetVaaWithPayload(ctx, query)
vaas, err := s.repo.FindVaasWithPayload(ctx, query)
if err != nil {
return nil, err
} else if len(vaas) == 0 {
return nil, errs.ErrNotFound
} else if len(vaas) > 1 {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
s.logger.Error("can not get more that one vaa by chainID/address/sequence",
zap.Any("q", query),
zap.String("requestID", requestID),
)
return nil, errs.ErrInternalError
}
return vaas[0], nil
}
// GetVaaCount get a list a list of vaa count grouped by chainID.

View File

@ -34,12 +34,20 @@ func NewController(serv *vaa.Service, logger *zap.Logger) *Controller {
// @Failure 500
// @Router /api/v1/vaas/ [get]
func (c *Controller) FindAll(ctx *fiber.Ctx) error {
p := middleware.GetPaginationFromContext(ctx)
txHash, err := middleware.GetTxHash(ctx, c.logger)
if err != nil {
return err
}
vaas, err := c.srv.FindAll(ctx.Context(), p, txHash)
includeParsedPayload, err := middleware.ExtractParsedPayload(ctx, c.logger)
if err != nil {
return err
}
vaas, err := c.srv.FindAll(ctx.Context(), p, txHash, includeParsedPayload)
if err != nil {
return err
}
@ -111,17 +119,24 @@ func (c *Controller) FindByEmitter(ctx *fiber.Ctx) error {
// @Failure 500
// @Router /api/v1/vaas/{chain_id}/{emitter}/{seq}/{signer}/{hash} [get]
func (c *Controller) FindById(ctx *fiber.Ctx) error {
chainID, emitter, seq, err := middleware.ExtractVAAParams(ctx, c.logger)
if err != nil {
return err
}
parsedPayload, err := middleware.ExtractParsedPayload(ctx, c.logger)
includeParsedPayload, err := middleware.ExtractParsedPayload(ctx, c.logger)
if err != nil {
return err
}
vaa, err := c.srv.FindById(ctx.Context(), chainID, *emitter, strconv.FormatUint(seq, 10), parsedPayload)
vaa, err := c.srv.FindById(
ctx.Context(),
chainID,
*emitter,
strconv.FormatUint(seq, 10),
includeParsedPayload,
)
if err != nil {
return err
}