[API] Add `toChain` filter to `GET /api/v1/vaas/{emitterChain}/{emitterAddr}` (#598)
### Description This pull request adds the parameter `toChain` to the endpoint `GET /api/v1/vaas/{emitterChain}/{emitterAddress}`. Other VAA-related endpoints do not support this parameter. Additionally, for performance reasons, a composite index must be created in MongoDB: `db.parsedVaa.createIndex({"emitterChain": -1, "emitterAddr": -1, "rawStandardizedProperties.toChain": -1, "indexedAt": -1})`
This commit is contained in:
parent
008f7aab88
commit
0f1797e44a
|
@ -1287,6 +1287,12 @@ const docTemplate = `{
|
||||||
"in": "path",
|
"in": "path",
|
||||||
"required": true
|
"required": true
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"type": "integer",
|
||||||
|
"description": "destination chain",
|
||||||
|
"name": "toChain",
|
||||||
|
"in": "query"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"type": "integer",
|
"type": "integer",
|
||||||
"description": "Page number.",
|
"description": "Page number.",
|
||||||
|
|
|
@ -1280,6 +1280,12 @@
|
||||||
"in": "path",
|
"in": "path",
|
||||||
"required": true
|
"required": true
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"type": "integer",
|
||||||
|
"description": "destination chain",
|
||||||
|
"name": "toChain",
|
||||||
|
"in": "query"
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"type": "integer",
|
"type": "integer",
|
||||||
"description": "Page number.",
|
"description": "Page number.",
|
||||||
|
|
|
@ -1611,6 +1611,10 @@ paths:
|
||||||
name: emitter
|
name: emitter
|
||||||
required: true
|
required: true
|
||||||
type: string
|
type: string
|
||||||
|
- description: destination chain
|
||||||
|
in: query
|
||||||
|
name: toChain
|
||||||
|
type: integer
|
||||||
- description: Page number.
|
- description: Page number.
|
||||||
in: query
|
in: query
|
||||||
name: page
|
name: page
|
||||||
|
|
|
@ -21,6 +21,7 @@ type Repository struct {
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
collections struct {
|
collections struct {
|
||||||
vaas *mongo.Collection
|
vaas *mongo.Collection
|
||||||
|
parsedVaa *mongo.Collection
|
||||||
vaasPythnet *mongo.Collection
|
vaasPythnet *mongo.Collection
|
||||||
invalidVaas *mongo.Collection
|
invalidVaas *mongo.Collection
|
||||||
vaaCount *mongo.Collection
|
vaaCount *mongo.Collection
|
||||||
|
@ -34,12 +35,14 @@ func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository {
|
||||||
logger: logger.With(zap.String("module", "VaaRepository")),
|
logger: logger.With(zap.String("module", "VaaRepository")),
|
||||||
collections: struct {
|
collections: struct {
|
||||||
vaas *mongo.Collection
|
vaas *mongo.Collection
|
||||||
|
parsedVaa *mongo.Collection
|
||||||
vaasPythnet *mongo.Collection
|
vaasPythnet *mongo.Collection
|
||||||
invalidVaas *mongo.Collection
|
invalidVaas *mongo.Collection
|
||||||
vaaCount *mongo.Collection
|
vaaCount *mongo.Collection
|
||||||
globalTransactions *mongo.Collection
|
globalTransactions *mongo.Collection
|
||||||
}{
|
}{
|
||||||
vaas: db.Collection("vaas"),
|
vaas: db.Collection("vaas"),
|
||||||
|
parsedVaa: db.Collection("parsedVaa"),
|
||||||
vaasPythnet: db.Collection("vaasPythnet"),
|
vaasPythnet: db.Collection("vaasPythnet"),
|
||||||
invalidVaas: db.Collection("invalid_vaas"),
|
invalidVaas: db.Collection("invalid_vaas"),
|
||||||
vaaCount: db.Collection("vaaCounts"),
|
vaaCount: db.Collection("vaaCounts"),
|
||||||
|
@ -114,6 +117,81 @@ func (r *Repository) FindVaasByTxHashWorkaround(
|
||||||
return r.FindVaas(ctx, &q)
|
return r.FindVaas(ctx, &q)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindVaasByEmitterAndToChain searches the database for VAAs that match a given emitter chain, address and toChain.
|
||||||
|
func (r *Repository) FindVaasByEmitterAndToChain(
|
||||||
|
ctx context.Context,
|
||||||
|
query *VaaQuery,
|
||||||
|
toChain sdk.ChainID,
|
||||||
|
) ([]*VaaDoc, error) {
|
||||||
|
|
||||||
|
// build a query pipeline based on input parameters
|
||||||
|
var pipeline mongo.Pipeline
|
||||||
|
{
|
||||||
|
// filter by emitterChain, emitterAddr, and toChain
|
||||||
|
pipeline = append(pipeline, bson.D{
|
||||||
|
{"$match", bson.D{bson.E{"emitterChain", query.chainId}}},
|
||||||
|
})
|
||||||
|
pipeline = append(pipeline, bson.D{
|
||||||
|
{"$match", bson.D{bson.E{"emitterAddr", query.emitter}}},
|
||||||
|
})
|
||||||
|
pipeline = append(pipeline, bson.D{
|
||||||
|
{"$match", bson.D{bson.E{"rawStandardizedProperties.toChain", toChain}}},
|
||||||
|
})
|
||||||
|
|
||||||
|
// specify sorting criteria
|
||||||
|
pipeline = append(pipeline, bson.D{{"$sort", bson.D{bson.E{"indexedAt", query.GetSortInt()}}}})
|
||||||
|
|
||||||
|
// skip initial results
|
||||||
|
if query.Pagination.Skip != 0 {
|
||||||
|
pipeline = append(pipeline, bson.D{{"$skip", query.Pagination.Skip}})
|
||||||
|
}
|
||||||
|
|
||||||
|
// limit size of results
|
||||||
|
pipeline = append(pipeline, bson.D{{"$limit", query.Pagination.Limit}})
|
||||||
|
}
|
||||||
|
|
||||||
|
// execute the aggregation pipeline
|
||||||
|
cur, err := r.collections.parsedVaa.Aggregate(ctx, pipeline)
|
||||||
|
if err != nil {
|
||||||
|
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
|
||||||
|
r.logger.Error("failed execute Aggregate command to get vaa by emitter and toChain",
|
||||||
|
zap.Error(err),
|
||||||
|
zap.Any("q", query),
|
||||||
|
zap.Any("toChain", toChain),
|
||||||
|
zap.String("requestID", requestID),
|
||||||
|
)
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// read results from cursor
|
||||||
|
var vaas []struct {
|
||||||
|
ID string `bson:"_id"`
|
||||||
|
}
|
||||||
|
err = cur.All(ctx, &vaas)
|
||||||
|
if err != nil {
|
||||||
|
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
|
||||||
|
r.logger.Error("failed to decode cursor",
|
||||||
|
zap.Error(err),
|
||||||
|
zap.Any("q", query),
|
||||||
|
zap.Any("toChain", toChain),
|
||||||
|
zap.String("requestID", requestID),
|
||||||
|
)
|
||||||
|
return nil, errors.WithStack(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// if no results were found, return an empty slice instead of nil.
|
||||||
|
if len(vaas) == 0 {
|
||||||
|
return make([]*VaaDoc, 0), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// call FindVaas with the IDs we've found
|
||||||
|
q := *query // make a copy to avoid modifying the struct passed by the caller
|
||||||
|
for _, vaa := range vaas {
|
||||||
|
q.ids = append(q.ids, vaa.ID)
|
||||||
|
}
|
||||||
|
return r.FindVaas(ctx, &q)
|
||||||
|
}
|
||||||
|
|
||||||
// FindVaas searches the database for VAAs matching the given filters.
|
// FindVaas searches the database for VAAs matching the given filters.
|
||||||
//
|
//
|
||||||
// When the `q.txHash` field is set, this function will look up transaction hashes in the `vaas` collection.
|
// When the `q.txHash` field is set, this function will look up transaction hashes in the `vaas` collection.
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
"github.com/wormhole-foundation/wormhole-explorer/api/response"
|
"github.com/wormhole-foundation/wormhole-explorer/api/response"
|
||||||
"github.com/wormhole-foundation/wormhole-explorer/api/types"
|
"github.com/wormhole-foundation/wormhole-explorer/api/types"
|
||||||
"github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
|
"github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
|
||||||
"github.com/wormhole-foundation/wormhole/sdk/vaa"
|
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -86,7 +86,7 @@ func (s *Service) FindAll(
|
||||||
// FindByChain get all the vaa by chainID.
|
// FindByChain get all the vaa by chainID.
|
||||||
func (s *Service) FindByChain(
|
func (s *Service) FindByChain(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
chain vaa.ChainID,
|
chain sdk.ChainID,
|
||||||
p *pagination.Pagination,
|
p *pagination.Pagination,
|
||||||
) (*response.Response[[]*VaaDoc], error) {
|
) (*response.Response[[]*VaaDoc], error) {
|
||||||
|
|
||||||
|
@ -101,21 +101,38 @@ func (s *Service) FindByChain(
|
||||||
return &res, err
|
return &res, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindByEmitterParams contains the input parameters for the function `FindByEmitter`.
|
||||||
|
type FindByEmitterParams struct {
|
||||||
|
EmitterChain sdk.ChainID
|
||||||
|
EmitterAddress *types.Address
|
||||||
|
ToChain *sdk.ChainID
|
||||||
|
IncludeParsedPayload bool
|
||||||
|
Pagination *pagination.Pagination
|
||||||
|
}
|
||||||
|
|
||||||
// FindByEmitter get all the vaa by chainID and emitter address.
|
// FindByEmitter get all the vaa by chainID and emitter address.
|
||||||
func (s *Service) FindByEmitter(
|
func (s *Service) FindByEmitter(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
chain vaa.ChainID,
|
params *FindByEmitterParams,
|
||||||
emitter *types.Address,
|
|
||||||
p *pagination.Pagination,
|
|
||||||
) (*response.Response[[]*VaaDoc], error) {
|
) (*response.Response[[]*VaaDoc], error) {
|
||||||
|
|
||||||
query := Query().
|
query := Query().
|
||||||
SetChain(chain).
|
SetChain(params.EmitterChain).
|
||||||
SetEmitter(emitter.Hex()).
|
SetEmitter(params.EmitterAddress.Hex()).
|
||||||
SetPagination(p).
|
SetPagination(params.Pagination).
|
||||||
IncludeParsedPayload(false)
|
IncludeParsedPayload(params.IncludeParsedPayload)
|
||||||
|
|
||||||
vaas, err := s.repo.FindVaas(ctx, query)
|
// In most cases, the data is obtained from the VAA collection.
|
||||||
|
//
|
||||||
|
// The special case of filtering VAAs by `toChain` requires querying
|
||||||
|
// the data from a different collection.
|
||||||
|
var vaas []*VaaDoc
|
||||||
|
var err error
|
||||||
|
if params.ToChain != nil {
|
||||||
|
vaas, err = s.repo.FindVaasByEmitterAndToChain(ctx, query, *params.ToChain)
|
||||||
|
} else {
|
||||||
|
vaas, err = s.repo.FindVaas(ctx, query)
|
||||||
|
}
|
||||||
|
|
||||||
res := response.Response[[]*VaaDoc]{Data: vaas}
|
res := response.Response[[]*VaaDoc]{Data: vaas}
|
||||||
return &res, err
|
return &res, err
|
||||||
|
@ -124,7 +141,7 @@ func (s *Service) FindByEmitter(
|
||||||
// If the parameter [payload] is true, the parse payload is added in the response.
|
// If the parameter [payload] is true, the parse payload is added in the response.
|
||||||
func (s *Service) FindById(
|
func (s *Service) FindById(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
chain vaa.ChainID,
|
chain sdk.ChainID,
|
||||||
emitter *types.Address,
|
emitter *types.Address,
|
||||||
seq string,
|
seq string,
|
||||||
includeParsedPayload bool,
|
includeParsedPayload bool,
|
||||||
|
@ -150,7 +167,7 @@ func (s *Service) FindById(
|
||||||
// findById get a vaa by chainID, emitter address and sequence number.
|
// findById get a vaa by chainID, emitter address and sequence number.
|
||||||
func (s *Service) findById(
|
func (s *Service) findById(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
chain vaa.ChainID,
|
chain sdk.ChainID,
|
||||||
emitter *types.Address,
|
emitter *types.Address,
|
||||||
seq string,
|
seq string,
|
||||||
includeParsedPayload bool,
|
includeParsedPayload bool,
|
||||||
|
@ -193,7 +210,7 @@ func (s *Service) GetVaaCount(ctx context.Context) (*response.Response[[]*VaaSta
|
||||||
// discardVaaNotIndexed discard a vaa request if the input sequence for a chainID, address is greatter than or equals
|
// discardVaaNotIndexed discard a vaa request if the input sequence for a chainID, address is greatter than or equals
|
||||||
// the cached value of the sequence for this chainID, address.
|
// the cached value of the sequence for this chainID, address.
|
||||||
// If the sequence does not exist we can not discard the request.
|
// If the sequence does not exist we can not discard the request.
|
||||||
func (s *Service) discardVaaNotIndexed(ctx context.Context, chain vaa.ChainID, emitter *types.Address, seq string) bool {
|
func (s *Service) discardVaaNotIndexed(ctx context.Context, chain sdk.ChainID, emitter *types.Address, seq string) bool {
|
||||||
key := fmt.Sprintf("%s:%d:%s", "wormscan:vaa-max-sequence", chain, emitter.Hex())
|
key := fmt.Sprintf("%s:%d:%s", "wormscan:vaa-max-sequence", chain, emitter.Hex())
|
||||||
sequence, err := s.getCacheFunc(ctx, key)
|
sequence, err := s.getCacheFunc(ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -35,6 +35,31 @@ func ExtractChainID(c *fiber.Ctx, l *zap.Logger) (sdk.ChainID, error) {
|
||||||
return sdk.ChainID(chain), nil
|
return sdk.ChainID(chain), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ExtractFromChain obtains the "toChain" query parameter from the request.
|
||||||
|
//
|
||||||
|
// When the parameter is not present, the function returns: a nil ChainID and a nil error.
|
||||||
|
func ExtractToChain(c *fiber.Ctx, l *zap.Logger) (*sdk.ChainID, error) {
|
||||||
|
|
||||||
|
param := c.Query("toChain")
|
||||||
|
if param == "" {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
chain, err := strconv.ParseInt(param, 10, 16)
|
||||||
|
if err != nil {
|
||||||
|
requestID := fmt.Sprintf("%v", c.Locals("requestid"))
|
||||||
|
l.Error("failed to parse toChain parameter",
|
||||||
|
zap.Error(err),
|
||||||
|
zap.String("requestID", requestID),
|
||||||
|
)
|
||||||
|
|
||||||
|
return nil, response.NewInvalidParamError(c, "INVALID TO_CHAIN VALUE", errors.WithStack(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
result := sdk.ChainID(chain)
|
||||||
|
return &result, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ExtractEmitterAddr parses the emitter address from the request path.
|
// ExtractEmitterAddr parses the emitter address from the request path.
|
||||||
//
|
//
|
||||||
// When the parameter `chainIdHint` is not nil, this function will attempt to parse the
|
// When the parameter `chainIdHint` is not nil, this function will attempt to parse the
|
||||||
|
|
|
@ -109,6 +109,7 @@ func (c *Controller) FindByChain(ctx *fiber.Ctx) error {
|
||||||
// @ID find-vaas-by-emitter
|
// @ID find-vaas-by-emitter
|
||||||
// @Param chain_id path integer true "id of the blockchain"
|
// @Param chain_id path integer true "id of the blockchain"
|
||||||
// @Param emitter path string true "address of the emitter"
|
// @Param emitter path string true "address of the emitter"
|
||||||
|
// @Param toChain query integer false "destination chain"
|
||||||
// @Param page query integer false "Page number."
|
// @Param page query integer false "Page number."
|
||||||
// @Param pageSize query integer false "Number of elements per page."
|
// @Param pageSize query integer false "Number of elements per page."
|
||||||
// @Param sortOrder query string false "Sort results in ascending or descending order." Enums(ASC, DESC)
|
// @Param sortOrder query string false "Sort results in ascending or descending order." Enums(ASC, DESC)
|
||||||
|
@ -118,17 +119,33 @@ func (c *Controller) FindByChain(ctx *fiber.Ctx) error {
|
||||||
// @Router /api/v1/vaas/{chain_id}/{emitter} [get]
|
// @Router /api/v1/vaas/{chain_id}/{emitter} [get]
|
||||||
func (c *Controller) FindByEmitter(ctx *fiber.Ctx) error {
|
func (c *Controller) FindByEmitter(ctx *fiber.Ctx) error {
|
||||||
|
|
||||||
p, err := middleware.ExtractPagination(ctx)
|
// Get query parameters
|
||||||
|
pagination, err := middleware.ExtractPagination(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
chainID, emitter, err := middleware.ExtractVAAChainIDEmitter(ctx, c.logger)
|
chainID, emitter, err := middleware.ExtractVAAChainIDEmitter(ctx, c.logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
toChain, err := middleware.ExtractToChain(ctx, c.logger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
includeParsedPayload, err := middleware.ExtractParsedPayload(ctx, c.logger)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
vaas, err := c.srv.FindByEmitter(ctx.Context(), chainID, emitter, p)
|
// Call the VAA service
|
||||||
|
p := vaa.FindByEmitterParams{
|
||||||
|
EmitterChain: chainID,
|
||||||
|
EmitterAddress: emitter,
|
||||||
|
ToChain: toChain,
|
||||||
|
IncludeParsedPayload: includeParsedPayload,
|
||||||
|
Pagination: pagination,
|
||||||
|
}
|
||||||
|
vaas, err := c.srv.FindByEmitter(ctx.Context(), &p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue