// Package governor handle the request of governor data from governor endpoint defined in the api. package governor import ( "context" "fmt" "sort" "time" "github.com/pkg/errors" errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" "github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination" "github.com/wormhole-foundation/wormhole-explorer/common/types" "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" ) const minGuardianNum = 13 // Repository definition. type Repository struct { db *mongo.Database logger *zap.Logger collections struct { governorConfig *mongo.Collection governorStatus *mongo.Collection } } // NewRepository create a new Repository. func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository { return &Repository{db: db, logger: logger.With(zap.String("module", "GovernorRepository")), collections: struct { governorConfig *mongo.Collection governorStatus *mongo.Collection }{ governorConfig: db.Collection("governorConfig"), governorStatus: db.Collection("governorStatus"), }, } } // GovernorQuery respresent a query for the governors mongodb documents. type GovernorQuery struct { pagination.Pagination id *types.Address } // NewGovernorQuery creates a new `*GovernorQuery` with default pagination values. func NewGovernorQuery() *GovernorQuery { p := pagination.Default() return &GovernorQuery{Pagination: *p} } // SetID sets the `id` field of the GovernorQuery struct. func (q *GovernorQuery) SetID(id *types.Address) *GovernorQuery { // Make a deep copy to avoid aliasing bugs q.id = id.Copy() return q } // SetPagination set the pagination field of the GovernorQuery struct. func (q *GovernorQuery) SetPagination(p *pagination.Pagination) *GovernorQuery { q.Pagination = *p return q } func (q *GovernorQuery) toBSON() *bson.D { r := bson.D{} if q.id != nil { // In the `governorConfig` and `governorStatus` collections, addresses // are stored as 40 hex digits (instead of the full 64 hex digits). // // Hence, we're using the `ShortHex()` function instead of just `Hex()`. r = append(r, bson.E{Key: "_id", Value: q.id.ShortHex()}) } return &r } // FindGovConfigurations get a list of *GovConfig. func (r *Repository) FindGovConfigurations( ctx context.Context, q *GovernorQuery, ) ([]*GovConfig, error) { sort := bson.D{{Key: "_id", Value: 1}} projection := bson.D{ {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodename", Value: "$parsedConfig.nodename"}, {Key: "counter", Value: "$parsedConfig.counter"}, {Key: "chains", Value: "$parsedConfig.chains"}, {Key: "tokens", Value: "$parsedConfig.tokens"}, } options := options. Find(). SetProjection(projection). SetLimit(q.Limit). SetSkip(q.Skip). SetSort(sort) cur, err := r.collections.governorConfig.Find(ctx, q.toBSON(), options) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Find command to get governor configurations", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } var govConfigs []*GovConfig err = cur.All(ctx, &govConfigs) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*GovConfig", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } return govConfigs, err } // FindGovernorStatus get a list of *GovStatus. func (r *Repository) FindGovernorStatus( ctx context.Context, q *GovernorQuery, ) ([]*GovStatus, error) { // Sort guardians by ascending ID to guarantee deterministic output. sort := bson.D{{Key: "_id", Value: 1}} projection := bson.D{ {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodename", Value: "$parsedStatus.nodename"}, {Key: "chains", Value: "$parsedStatus.chains"}, } options := options. Find(). SetProjection(projection). SetLimit(q.Limit). SetSkip(q.Skip). SetSort(sort) cur, err := r.collections.governorStatus.Find(ctx, q.toBSON(), options) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Find command to get all governor status", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } var govStatus []*GovStatus err = cur.All(ctx, &govStatus) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*GovStatus", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } return govStatus, err } // FindOneGovernorStatus get a *GovStatus. The q parameter define the filter to apply to the query. func (r *Repository) FindOneGovernorStatus( ctx context.Context, q *GovernorQuery, ) (*GovStatus, error) { projection := bson.D{ {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodename", Value: "$parsedStatus.nodename"}, {Key: "chains", Value: "$parsedStatus.chains"}, } options := options. FindOne(). SetProjection(projection) var govConfig GovStatus err := r.collections.governorStatus.FindOne(ctx, q.toBSON(), options).Decode(&govConfig) if err != nil { if errors.Is(err, mongo.ErrNoDocuments) { return nil, errs.ErrNotFound } requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute FindOne command to get governor status", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } return &govConfig, err } // NotionalLimitQuery type NotionalLimitQuery struct { pagination.Pagination id string chainID vaa.ChainID } // QueryNotionalLimit create a new NotionalLimitQuery with default pagination values. func QueryNotionalLimit() *NotionalLimitQuery { p := pagination.Default() return &NotionalLimitQuery{Pagination: *p} } // SetID set the id field of the NotionalLimitQuery struct. func (q *NotionalLimitQuery) SetID(id string) *NotionalLimitQuery { q.id = id return q } // SetChain set the chainID field of the NotionalLimitQuery struct. func (q *NotionalLimitQuery) SetChain(chainID vaa.ChainID) *NotionalLimitQuery { q.chainID = chainID return q } // SetPagination set the Pagination field of the NotionalLimitQuery struct. func (q *NotionalLimitQuery) SetPagination(p *pagination.Pagination) *NotionalLimitQuery { q.Pagination = *p return q } // FindNotionalLimit get a list *NotionalLimit. func (r *Repository) FindNotionalLimit( ctx context.Context, q *NotionalLimitQuery, ) ([]*NotionalLimit, error) { // agreggation stages to get notionalLimit for each chainID. matchStage1 := bson.D{{Key: "$match", Value: bson.D{}}} projectStage2 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chains", Value: "$parsedConfig.chains"}, }}, } unwindStage3 := bson.D{ {Key: "$unwind", Value: "$chains"}, } sortStage4 := bson.D{ {Key: "$sort", Value: bson.D{ {Key: "chains.chainid", Value: 1}, {Key: "chains.notionallimit", Value: -1}, {Key: "chains.bigtransactionsize", Value: -1}, }}, } groupStage5 := bson.D{ {Key: "$group", Value: bson.D{ {Key: "_id", Value: "$chains.chainid"}, {Key: "notionalLimits", Value: bson.D{ {Key: "$push", Value: bson.D{ {Key: "notionalLimit", Value: "$chains.notionallimit"}, {Key: "maxTransactionSize", Value: "$chains.bigtransactionsize"}, }}, }}, }}, } projectStage6 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainId", Value: "$_id"}, {Key: "notionalLimit", Value: bson.M{ "$arrayElemAt": []interface{}{"$notionalLimits", minGuardianNum - 1}, }}, }}, } projectStage7 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainId", Value: 1}, {Key: "notionalLimit", Value: "$notionalLimit.notionalLimit"}, {Key: "maxTransactionSize", Value: "$notionalLimit.maxTransactionSize"}, }}, } sortStage8 := bson.D{ {Key: "$sort", Value: bson.D{ {Key: "chainId", Value: 1}, }}, } // define aggregate pipeline pipeLine := mongo.Pipeline{ matchStage1, projectStage2, unwindStage3, sortStage4, groupStage5, projectStage6, projectStage7, sortStage8, } // execute aggregate operations. cur, err := r.collections.governorConfig.Aggregate(ctx, pipeLine) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get notional limit", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // decodes to NotionalLimit. var notionalLimits []*NotionalLimit err = cur.All(ctx, ¬ionalLimits) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*NotionalLimit", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // check records exists. if len(notionalLimits) == 0 { return nil, errs.ErrNotFound } return notionalLimits, nil } // GetNotionalLimitByChainID get a list *NotionalLimitDetail. func (r *Repository) GetNotionalLimitByChainID( ctx context.Context, q *NotionalLimitQuery, ) ([]*NotionalLimitDetail, error) { // agreggation stages to get notionalLimit by chainID. matchStage1 := bson.D{{Key: "$match", Value: bson.D{}}} projectStage2 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: "$parsedConfig.nodename"}, {Key: "parsedConfig.chains", Value: bson.D{ {Key: "$filter", Value: bson.D{ {Key: "input", Value: "$parsedConfig.chains"}, {Key: "as", Value: "chain"}, {Key: "cond", Value: bson.D{ {Key: "$eq", Value: bson.A{"$$chain.chainid", q.chainID}}, }}, }}, }}, }}, } projectStage3 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: 1}, {Key: "notionalLimits", Value: bson.M{ "$arrayElemAt": []interface{}{"$parsedConfig.chains", 0}, }}, }}, } projectStage4 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: 1}, {Key: "chainId", Value: "$notionalLimits.chainid"}, {Key: "notionalLimit", Value: "$notionalLimits.notionallimit"}, {Key: "maxTransactionSize", Value: "$notionalLimits.bigtransactionsize"}, }}, } pipeLine := mongo.Pipeline{ matchStage1, projectStage2, projectStage3, projectStage4, } // execute aggregate operations. cur, err := r.collections.governorConfig.Aggregate(ctx, pipeLine) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get notional limit by chainID", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // decode to []NotionalLimitRecord. var notionalLimits []*NotionalLimitDetail err = cur.All(ctx, ¬ionalLimits) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*NotionalLimitDetail", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } return notionalLimits, nil } // GetAvailableNotional get a list of *NotionalAvailable. func (r *Repository) GetAvailableNotional( ctx context.Context, q *NotionalLimitQuery, ) ([]*NotionalAvailable, error) { // stage. matchStage1 := bson.D{{Key: "$match", Value: bson.D{}}} // project. projectStage2 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chains", Value: "$parsedStatus.chains"}, }}, } // deconstructs. unwindStage3 := bson.D{ {Key: "$unwind", Value: "$chains"}, } // sort. sortStage4 := bson.D{ {Key: "$sort", Value: bson.D{ {Key: "chains.chainid", Value: 1}, {Key: "chains.remainingavailablenotional", Value: -1}, }}, } // group. groupStage5 := bson.D{ {Key: "$group", Value: bson.D{ {Key: "_id", Value: "$chains.chainid"}, {Key: "availableNotionals", Value: bson.D{ {Key: "$push", Value: bson.D{ {Key: "availableNotional", Value: "$chains.remainingavailablenotional"}, }}, }}, }}, } // project. projectStage6 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainId", Value: "$_id"}, {Key: "availableNotional", Value: bson.M{ "$arrayElemAt": []interface{}{"$availableNotionals", minGuardianNum - 1}, }}, }}, } // projects. projectStage7 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainId", Value: 1}, {Key: "availableNotional", Value: "$availableNotional.availableNotional"}, }}, } // sort stage sortStage8 := bson.D{ {Key: "$sort", Value: bson.D{ {Key: "chainId", Value: 1}, }}, } // skip initial pages skipStage9 := bson.D{{"$skip", q.Pagination.Skip}} // limit size of results limitStage10 := bson.D{{"$limit", q.Pagination.Limit}} pipeLine := mongo.Pipeline{ matchStage1, projectStage2, unwindStage3, sortStage4, groupStage5, projectStage6, projectStage7, sortStage8, skipStage9, limitStage10, } // execute aggregate operations. cur, err := r.collections.governorStatus.Aggregate(ctx, pipeLine) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get available notional", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // decode to []NotionalLimitRecord. var notionalAvailables []*NotionalAvailable err = cur.All(ctx, ¬ionalAvailables) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*NotionalAvailable", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // check exists records if len(notionalAvailables) == 0 { return nil, errs.ErrNotFound } return notionalAvailables, nil } // GetAvailableNotionalByChainID get a list of *NotionalAvailableDetail. func (r *Repository) GetAvailableNotionalByChainID( ctx context.Context, q *NotionalLimitQuery, ) ([]*NotionalAvailableDetail, error) { // sort documents to provide deterministic output sortStage1 := bson.D{ { Key: "$sort", Value: bson.D{ {Key: "_id", Value: 1}, }, }, } // projection projectStage2 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: "$parsedStatus.nodename"}, {Key: "parsedStatus.chains", Value: bson.D{ {Key: "$filter", Value: bson.D{ {Key: "input", Value: "$parsedStatus.chains"}, {Key: "as", Value: "chain"}, {Key: "cond", Value: bson.D{ {Key: "$eq", Value: bson.A{"$$chain.chainid", q.chainID}}, }}, }}, }}, }}, } // projection projectStage3 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: 1}, {Key: "availableNotional", Value: bson.M{ "$arrayElemAt": []interface{}{"$parsedStatus.chains", 0}, }}, }}, } // projection projectStage4 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: 1}, {Key: "chainId", Value: "$availableNotional.chainid"}, {Key: "availableNotional", Value: "$availableNotional.remainingavailablenotional"}, }}, } // skip initial pages skipStage5 := bson.D{{"$skip", q.Pagination.Skip}} // limit size of results limitStage6 := bson.D{{"$limit", q.Pagination.Limit}} pipeLine := mongo.Pipeline{ sortStage1, projectStage2, projectStage3, projectStage4, skipStage5, limitStage6, } // execute aggregate operations. cur, err := r.collections.governorStatus.Aggregate(ctx, pipeLine) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get available notional by chainID", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // decode to []NotionalLimitRecord. var notionalAvailability []*NotionalAvailableDetail err = cur.All(ctx, ¬ionalAvailability) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*NotionalAvailableDetail", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } return notionalAvailability, nil } // GetMaxNotionalAvailableByChainID get a *MaxNotionalAvailableRecord. func (r *Repository) GetMaxNotionalAvailableByChainID( ctx context.Context, q *NotionalLimitQuery, ) (*MaxNotionalAvailableRecord, error) { // stage definitions. matchStage1 := bson.D{{Key: "$match", Value: bson.D{}}} // projection projectStage2 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: "$parsedStatus.nodename"}, {Key: "parsedStatus.chains", Value: bson.D{ {Key: "$filter", Value: bson.D{ {Key: "input", Value: "$parsedStatus.chains"}, {Key: "as", Value: "chain"}, {Key: "cond", Value: bson.D{ {Key: "$eq", Value: bson.A{"$$chain.chainid", q.chainID}}, }}, }}, }}, }}, } // projection projectStage3 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: 1}, {Key: "availableNotional", Value: bson.M{ "$arrayElemAt": []interface{}{"$parsedStatus.chains", 0}, }}, }}, } // projection projectStage4 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: 1}, {Key: "chainId", Value: "$availableNotional.chainid"}, {Key: "availableNotional", Value: "$availableNotional.remainingavailablenotional"}, {Key: "emitters", Value: "$availableNotional.emitters"}, }}, } // TODO: CHECK sortStage5 := bson.D{ {Key: "$sort", Value: bson.D{ {Key: "availableNotional", Value: -1}, }}, } pipeLine := mongo.Pipeline{ matchStage1, projectStage2, projectStage3, projectStage4, sortStage5, } // execute aggregate operations. cur, err := r.collections.governorStatus.Aggregate(ctx, pipeLine) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get maximun available notional by chainID", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // decode to []NotionalLimitRecord. var rows []*MaxNotionalAvailableRecord err = cur.All(ctx, &rows) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*MaxNotionalAvailableRecord", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // check exists records if len(rows) == 0 { return nil, errs.ErrNotFound } if len(rows) < minGuardianNum { return nil, errs.ErrNotFound } maxNotionalLimit := rows[minGuardianNum-1] return maxNotionalLimit, nil } // EnqueuedVaaQuery respresent a query for enqueuedVaa queries. type EnqueuedVaaQuery struct { pagination.Pagination id string chainID vaa.ChainID } // QueryEnqueuedVaa create a new EnqueuedVaaQuery with default pagination values. func QueryEnqueuedVaa() *EnqueuedVaaQuery { page := pagination.Default() return &EnqueuedVaaQuery{Pagination: *page} } // SetID set the id field of the EnqueuedVaaQuery struct. func (q *EnqueuedVaaQuery) SetID(id string) *EnqueuedVaaQuery { q.id = id return q } // SetChain set the chainID field of the EnqueuedVaaQuery struct. func (q *EnqueuedVaaQuery) SetChain(chainID vaa.ChainID) *EnqueuedVaaQuery { q.chainID = chainID return q } // SetPagination set the pagination field of the EnqueuedVaaQuery struct. func (q *EnqueuedVaaQuery) SetPagination(p *pagination.Pagination) *EnqueuedVaaQuery { q.Pagination = *p return q } // GetEnqueueVass get a list of *EnqueuedVaas. func (r *Repository) GetEnqueueVass(ctx context.Context, q *EnqueuedVaaQuery) ([]*EnqueuedVaas, error) { // match stage. matchStage1 := bson.D{{Key: "$match", Value: bson.D{}}} // match project. projectStage2 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chains", Value: "$parsedStatus.chains"}, }}, } // match unwind. unwindStage3 := bson.D{ {Key: "$unwind", Value: "$chains"}, } // match project. projectStage4 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "chainId", Value: "$chains.chainid"}, {Key: "emitters", Value: "$chains.emitters"}, }}, } // match group. groupStage5 := bson.D{ {Key: "$group", Value: bson.D{ {Key: "_id", Value: "$chainId"}, {Key: "emitters", Value: bson.D{ {Key: "$push", Value: bson.D{ {Key: "emitterAddress", Value: bson.M{ "$arrayElemAt": []interface{}{"$emitters.emitteraddress", 0}, }}, {Key: "enqueuedVaas", Value: bson.M{ "$arrayElemAt": []interface{}{"$emitters.enqueuedvaas", 0}, }}, }}, }}, }}, } pipeLine := mongo.Pipeline{ matchStage1, projectStage2, unwindStage3, projectStage4, groupStage5, } cur, err := r.collections.governorStatus.Aggregate(ctx, pipeLine) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get enqueued vaas", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } var rows []struct { ID vaa.ChainID `bson:"_id"` Emitters []*struct { Address string `bson:"emitterAddress"` EnqueuedVaas []*struct { Sequence string `bson:"sequence"` ReleaseTime *time.Time `bson:"releasetime"` NotionalValue int64 `bson:"notionalValue"` TxHash string `bson:"txhash"` } `bson:"enqueuedVaas"` } `bson:"emitters"` } // decode query response. err = cur.All(ctx, &rows) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into rows", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // TODO: Change this logic to mongo query code. // proecess and build the response. keys := map[string]string{} enqueuedVaas := []*EnqueuedVaa{} for _, row := range rows { chainID := row.ID emiiterAddress := row.Emitters for _, ea := range emiiterAddress { emitterAddress := ea.Address enqueuedVaa := ea.EnqueuedVaas for _, v := range enqueuedVaa { key := fmt.Sprintf("%s/%s/%s", emitterAddress, v.Sequence, v.TxHash) if _, ok := keys[key]; !ok { enqueuedVaa := EnqueuedVaa{ ChainID: chainID, EmitterAddress: emitterAddress, Sequence: v.Sequence, NotionalValue: v.NotionalValue, TxHash: v.TxHash, } enqueuedVaas = append(enqueuedVaas, &enqueuedVaa) keys[key] = key } } } } // group by chainID. enqueuedVaasGroupedByChainID := map[vaa.ChainID][]*EnqueuedVaa{} for _, f := range enqueuedVaas { if _, ok := enqueuedVaasGroupedByChainID[f.ChainID]; !ok { enqueuedVaasGroupedByChainID[f.ChainID] = []*EnqueuedVaa{f} } else { fr := enqueuedVaasGroupedByChainID[f.ChainID] fr = append(fr, f) enqueuedVaasGroupedByChainID[f.ChainID] = fr } } // create response. response := []*EnqueuedVaas{} for k, v := range enqueuedVaasGroupedByChainID { r := EnqueuedVaas{ ChainID: k, EnqueuedVaa: v, } response = append(response, &r) } return response, nil } // GetEnqueueVassByChainID get a list of *EnqueuedVaaDetail by chainID. func (r *Repository) GetEnqueueVassByChainID( ctx context.Context, q *EnqueuedVaaQuery, ) ([]*EnqueuedVaaDetail, error) { // stage definitions. matchStage1 := bson.D{{Key: "$match", Value: bson.D{}}} // project stage. projectStage2 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: "$parsedStatus.nodename"}, {Key: "parsedStatus.chains", Value: bson.D{ {Key: "$filter", Value: bson.D{ {Key: "input", Value: "$parsedStatus.chains"}, {Key: "as", Value: "chain"}, {Key: "cond", Value: bson.D{ {Key: "$eq", Value: bson.A{"$$chain.chainid", q.chainID}}, }}, }}, }}, }}, } // project stage. projectStage3 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 1}, {Key: "createdAt", Value: 1}, {Key: "updatedAt", Value: 1}, {Key: "nodeName", Value: 1}, {Key: "emitters", Value: "$parsedStatus.chains.emitters"}, }}, } // unwind stage. unwindStage4 := bson.D{ {Key: "$unwind", Value: "$emitters"}, } // group stage. groupStage5 := bson.D{ {Key: "$group", Value: bson.D{ {Key: "_id", Value: bson.M{ "$arrayElemAt": []interface{}{"$emitters.emitteraddress", 0}, }}, {Key: "enqueuedVaas", Value: bson.D{ {Key: "$push", Value: bson.D{ {Key: "enqueuedVaa", Value: "$emitters.enqueuedvaas"}, }}, }}, }}, } pipeline := mongo.Pipeline{ matchStage1, projectStage2, projectStage3, unwindStage4, groupStage5, } cur, err := r.collections.governorStatus.Aggregate(ctx, pipeline) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get enqueued vaas by chainID", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // decode query response. var rows []*struct { ID string `bson:"_id"` EnqueuedVaas []*struct { EnqueuedVaas [][]*struct { Sequence string `bson:"sequence"` ReleaseTime int64 `bson:"releasetime"` NotionalValue int64 `bson:"notionalValue"` TxHash string `bson:"txhash"` } `bson:"enqueuedVaa"` } `bson:"enqueuedVaas"` } err = cur.All(ctx, &rows) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into rows", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // TODO: Change this logic to mongo query code. // build response. keys := map[string]string{} response := []*EnqueuedVaaDetail{} for _, row := range rows { emitterAddress := row.ID enqueuedVaas := row.EnqueuedVaas for _, ev := range enqueuedVaas { for _, v := range ev.EnqueuedVaas[0] { key := fmt.Sprintf("%s/%s/%s", emitterAddress, v.Sequence, v.TxHash) if _, ok := keys[key]; !ok { fr := EnqueuedVaaDetail{ ChainID: q.chainID, EmitterAddress: emitterAddress, Sequence: v.Sequence, NotionalValue: v.NotionalValue, TxHash: v.TxHash, ReleaseTime: v.ReleaseTime, } response = append(response, &fr) keys[key] = key } } } } if len(response) == 0 { return nil, errs.ErrNotFound } // sort response by sequence. sort.Slice(response, func(i, j int) bool { return response[i].Sequence < response[j].Sequence }) return response, nil } // GetGovernorLimit get a list of *GovernorLimit. func (r *Repository) GetGovernorLimit( ctx context.Context, q *GovernorQuery, ) ([]*GovernorLimit, error) { // lookup. lookupStage1 := bson.D{ {Key: "$lookup", Value: bson.D{ {Key: "from", Value: "governorStatus"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "status"}, }}, } // unwind. unwindStage2 := bson.D{ {Key: "$unwind", Value: "$status"}, } projectStage3 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "configChains", Value: "$parsedConfig.chains"}, {Key: "statusChains", Value: "$status.parsedStatus.chains"}, }}, } unwindStage4 := bson.D{ {Key: "$unwind", Value: "$configChains"}, } unwindStage5 := bson.D{ {Key: "$unwind", Value: "$statusChains"}, } matchStage6 := bson.D{ {Key: "$match", Value: bson.D{ {Key: "$expr", Value: bson.D{ {Key: "$eq", Value: bson.A{"$configChains.chainid", "$statusChains.chainid"}}, }}, }}, } groupStage7 := bson.D{ {Key: "$group", Value: bson.D{ {Key: "_id", Value: "$configChains.chainid"}, {Key: "notionalLimits", Value: bson.D{ {Key: "$push", Value: bson.D{ {Key: "notionalLimit", Value: "$configChains.notionallimit"}, }}, }}, {Key: "maxTransactionSizes", Value: bson.D{ {Key: "$push", Value: bson.D{ {Key: "maxTransactionSize", Value: "$configChains.bigtransactionsize"}, }}, }}, {Key: "availableNotionals", Value: bson.D{ {Key: "$push", Value: bson.D{ {Key: "availableNotional", Value: "$statusChains.remainingavailablenotional"}, }}, }}, }}, } projectStage8 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "notionalLimits", Value: bson.D{ {Key: "$sortArray", Value: bson.D{ {Key: "input", Value: "$notionalLimits"}, {Key: "sortBy", Value: bson.D{ {Key: "notionalLimit", Value: -1}, }}, }}, }}, {Key: "maxTransactionSizes", Value: bson.D{ {Key: "$sortArray", Value: bson.D{ {Key: "input", Value: "$maxTransactionSizes"}, {Key: "sortBy", Value: bson.D{ {Key: "maxTransactionSize", Value: -1}, }}, }}, }}, {Key: "availableNotionals", Value: bson.D{ {Key: "$sortArray", Value: bson.D{ {Key: "input", Value: "$availableNotionals"}, {Key: "sortBy", Value: bson.D{ {Key: "availableNotional", Value: -1}, }}, }}, }}, }}, } projectStage9 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "notionalLimit", Value: bson.M{ "$arrayElemAt": []interface{}{"$notionalLimits", minGuardianNum - 1}, }}, {Key: "maxTransactionSize", Value: bson.M{ "$arrayElemAt": []interface{}{"$maxTransactionSizes", minGuardianNum - 1}, }}, {Key: "availableNotional", Value: bson.M{ "$arrayElemAt": []interface{}{"$availableNotionals", minGuardianNum - 1}, }}, }}, } projectStage10 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainId", Value: "$_id"}, {Key: "notionalLimit", Value: "$notionalLimit.notionalLimit"}, {Key: "maxTransactionSize", Value: "$maxTransactionSize.maxTransactionSize"}, {Key: "availableNotional", Value: "$availableNotional.availableNotional"}, }}, } sortStage11 := bson.D{ {Key: "$sort", Value: bson.D{ {Key: "chainId", Value: 1}, }}, } // define aggregate pipeline pipeline := mongo.Pipeline{ lookupStage1, unwindStage2, projectStage3, unwindStage4, unwindStage5, matchStage6, groupStage7, projectStage8, projectStage9, projectStage10, sortStage11, } // skip initial results if q.Pagination.Skip != 0 { pipeline = append(pipeline, bson.D{ {"$skip", q.Pagination.Skip}, }) } // limit size of results pipeline = append(pipeline, bson.D{ {"$limit", q.Pagination.Limit}, }) // execute aggregate operations. cur, err := r.collections.governorConfig.Aggregate(ctx, pipeline) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get governor limit", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // decodes to RawDocRecord. governorLimits := []*GovernorLimit{} err = cur.All(ctx, &governorLimits) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*GovernorLimit", zap.Error(err), zap.Any("q", q), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } return governorLimits, nil } // GetAvailNotionByChain get the limits by chainID. // // In this version returns the minimum value of the availableNotional per chainID // by analyzing the data of all guardian nodes. func (r *Repository) GetAvailNotionByChain( ctx context.Context, ) ([]*AvailableNotionalByChain, error) { lookupStage1 := bson.D{ {Key: "$lookup", Value: bson.D{ {Key: "from", Value: "governorStatus"}, {Key: "localField", Value: "_id"}, {Key: "foreignField", Value: "_id"}, {Key: "as", Value: "status"}, }}, } unwindStage2 := bson.D{ {Key: "$unwind", Value: "$status"}, } projectStage3 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "configChains", Value: "$parsedConfig.chains"}, {Key: "statusChains", Value: "$status.parsedStatus.chains"}, }}, } unwindStage4 := bson.D{ {Key: "$unwind", Value: "$configChains"}, } unwindStage5 := bson.D{ {Key: "$unwind", Value: "$statusChains"}, } matchStage6 := bson.D{ {Key: "$match", Value: bson.D{ {Key: "$expr", Value: bson.D{ {Key: "$eq", Value: bson.A{"$configChains.chainid", "$statusChains.chainid"}}, }}, }}, } groupStage7 := bson.D{ {Key: "$group", Value: bson.D{ {Key: "_id", Value: "$configChains.chainid"}, {Key: "notionalLimits", Value: bson.D{ {Key: "$push", Value: bson.D{ {Key: "notionalLimit", Value: "$configChains.notionallimit"}, {Key: "maxTransactionSize", Value: "$configChains.bigtransactionsize"}, {Key: "availableNotional", Value: "$statusChains.remainingavailablenotional"}, }}, }}, }}, } projectStage8 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "governorLimit", Value: bson.D{ {Key: "$sortArray", Value: bson.D{ {Key: "input", Value: "$notionalLimits"}, {Key: "sortBy", Value: bson.D{ {Key: "availableNotional", Value: 1}, }}, }}, }}, }}, } projectStage9 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "governorLimit", Value: bson.M{ "$arrayElemAt": []interface{}{"$governorLimit", 0}, }}, }}, } projectStage10 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainId", Value: "$_id"}, {Key: "notionalLimit", Value: "$governorLimit.notionalLimit"}, {Key: "maxTransactionSize", Value: "$governorLimit.maxTransactionSize"}, {Key: "availableNotional", Value: "$governorLimit.availableNotional"}, }}, } sortStage11 := bson.D{ {Key: "$sort", Value: bson.D{ {Key: "chainId", Value: 1}, }}, } // define aggregate pipeline pipeLine := mongo.Pipeline{ lookupStage1, unwindStage2, projectStage3, unwindStage4, unwindStage5, matchStage6, groupStage7, projectStage8, projectStage9, projectStage10, sortStage11, } // execute aggregate operations. cur, err := r.collections.governorConfig.Aggregate(ctx, pipeLine) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get governor limit", zap.Error(err), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // decodes to GovernorLimitV2. var availbleNotional []*AvailableNotionalByChain err = cur.All(ctx, &availbleNotional) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*AvailableNotionalByChain", zap.Error(err), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // check exists records if len(availbleNotional) == 0 { return nil, errs.ErrNotFound } return availbleNotional, nil } // GetTokenList get token lists. func (r *Repository) GetTokenList(ctx context.Context) ([]*TokenList, error) { projectStage1 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "tokens", Value: "$parsedConfig.tokens"}, }}, } unwindStage2 := bson.D{ {Key: "$unwind", Value: "$tokens"}, } projectStage3 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "originaddress", Value: "$tokens.originaddress"}, {Key: "originchainid", Value: "$tokens.originchainid"}, {Key: "price", Value: "$tokens.price"}, }}, } groupStage4 := bson.D{ {Key: "$group", Value: bson.D{ {Key: "_id", Value: bson.D{ {Key: "originaddress", Value: "$originaddress"}, {Key: "originchainid", Value: "$originchainid"}, }}, {Key: "prices", Value: bson.D{ {Key: "$push", Value: bson.D{ {Key: "price", Value: "$price"}, }}, }}, }}, } unwindStage5 := bson.D{ {Key: "$unwind", Value: "$prices"}, } groupStage6 := bson.D{ {Key: "$group", Value: bson.D{ {Key: "_id", Value: bson.D{ {Key: "_id", Value: "$_id"}, {Key: "prices", Value: "$prices"}, }}, {Key: "count", Value: bson.D{ {Key: "$sum", Value: 1}, }}, }}, } sortStage7 := bson.D{ {Key: "$sort", Value: bson.D{ {Key: "_id._id.originchainid", Value: 1}, {Key: "_id._id.originaddress", Value: 1}, {Key: "count", Value: -1}, }}, } groupStage8 := bson.D{ {Key: "$group", Value: bson.D{ {Key: "_id", Value: bson.D{ {Key: "originchainid", Value: "$_id._id.originchainid"}, {Key: "originaddress", Value: "$_id._id.originaddress"}, }}, {Key: "price", Value: bson.D{ {Key: "$first", Value: "$_id.prices"}, }}, {Key: "count", Value: bson.D{ {Key: "$first", Value: "$count"}, }}, }}, } sortStage9 := bson.D{ {Key: "$sort", Value: bson.D{ {Key: "_id.originchainid", Value: 1}, {Key: "_id.originaddress", Value: 1}, }}, } projectStage10 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "_id", Value: 0}, {Key: "originchainid", Value: "$_id.originchainid"}, {Key: "originaddress", Value: "$_id.originaddress"}, {Key: "price", Value: "$price.price"}, }}, } // define aggregate pipeline pipeLine := mongo.Pipeline{ projectStage1, unwindStage2, projectStage3, groupStage4, unwindStage5, groupStage6, sortStage7, groupStage8, sortStage9, projectStage10, } // execute aggregate operations. cur, err := r.collections.governorConfig.Aggregate(ctx, pipeLine) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get token list", zap.Error(err), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // decodes to RawDocRecord. var tokens []*TokenList err = cur.All(ctx, &tokens) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*TokenList", zap.Error(err), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // check exists records if len(tokens) == 0 { return nil, errs.ErrNotFound } return tokens, nil } // GetEnqueuedVaas get enqueued vaas. func (r *Repository) GetEnqueuedVaas(ctx context.Context) ([]*EnqueuedVaaItem, error) { projectStage1 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chains", Value: "$parsedStatus.chains"}, }}, } unwindStage2 := bson.D{ {Key: "$unwind", Value: "$chains"}, } matchStage3 := bson.D{ {Key: "$match", Value: bson.D{ {Key: "$expr", Value: bson.D{ {Key: "$gt", Value: bson.A{"chains.emitters.totalenqueuedvaas", 0}}, }}, }}, } projectStage4 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainid", Value: "$chains.chainid"}, {Key: "emitters", Value: "$chains.emitters"}, }}, } unwindStage5 := bson.D{ {Key: "$unwind", Value: "$emitters"}, } projectStage6 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainid", Value: "$chainid"}, {Key: "emitteraddress", Value: "$emitters.emitteraddress"}, {Key: "enqueuedvaas", Value: "$emitters.enqueuedvaas"}, }}, } unwindStage7 := bson.D{ {Key: "$unwind", Value: "$enqueuedvaas"}, } projectStage8 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainid", Value: 1}, {Key: "emitteraddress", Value: 1}, {Key: "notionalvalue", Value: "$enqueuedvaas.notionalvalue"}, {Key: "releasetime", Value: "$enqueuedvaas.releasetime"}, {Key: "sequence", Value: "$enqueuedvaas.sequence"}, {Key: "txhash", Value: "$enqueuedvaas.txhash"}, }}, } sortStage9 := bson.D{ {Key: "$sort", Value: bson.D{ {Key: "chainId", Value: 1}, {Key: "emitteraddress", Value: 1}, {Key: "sequence", Value: 1}, {Key: "releasetime", Value: -1}, }}, } // define aggregate pipeline pipeLine := mongo.Pipeline{ projectStage1, unwindStage2, matchStage3, projectStage4, unwindStage5, projectStage6, unwindStage7, projectStage8, sortStage9, } // execute aggregate operations. cur, err := r.collections.governorStatus.Aggregate(ctx, pipeLine) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get enqueuedVAA", zap.Error(err), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } // decodes to []*EnqueuedVaaItem. var enqueuedVAA []*EnqueuedVaaItem err = cur.All(ctx, &enqueuedVAA) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*EnqueuedVaaItem", zap.Error(err), zap.String("requestID", requestID), ) return nil, errors.WithStack(err) } return enqueuedVAA, nil } type EnqueuedResponse struct { ChainID int64 `bson:"chainid" json:"chainID"` EmitterAddress string `bson:"emitteraddress" json:"emitterAddress"` Sequence string `bson:"sequence" json:"sequence"` } // IsVaaEnqueued check vaa is enqueued. func (r *Repository) IsVaaEnqueued( ctx context.Context, chainID vaa.ChainID, emitter *types.Address, sequence string, ) (bool, error) { projectStage1 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chains", Value: "$parsedStatus.chains"}, }}, } unwindStage2 := bson.D{ {Key: "$unwind", Value: "$chains"}, } projectStage3 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainid", Value: "$chains.chainid"}, {Key: "availableNotion", Value: "$chains.remainingavailablenotional"}, {Key: "emitters", Value: "$chains.emitters"}, }}, } unwindStage4 := bson.D{ {Key: "$unwind", Value: "$emitters"}, } unwindStage5 := bson.D{ {Key: "$unwind", Value: "$emitters.enqueuedvaas"}, } matchStage6 := bson.D{ {"$match", bson.D{ {"chainid", chainID}, {"emitters.emitteraddress", fmt.Sprintf("0x%s", emitter.Hex())}, {"emitters.enqueuedvaas.sequence", sequence}, }}, } projectStage7 := bson.D{ {Key: "$project", Value: bson.D{ {Key: "chainid", Value: "$chainid"}, {Key: "emitteraddress", Value: "$emitters.emitteraddress"}, {Key: "sequence", Value: "$emitters.enqueuedvaas.sequence"}, }}, } // define aggregate pipeline pipeLine := mongo.Pipeline{ projectStage1, unwindStage2, projectStage3, unwindStage4, unwindStage5, matchStage6, projectStage7, } // execute aggregate operations. cur, err := r.collections.governorStatus.Aggregate(ctx, pipeLine) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to execute Aggregate command to get token list", zap.Error(err), zap.String("requestID", requestID), ) return false, errors.WithStack(err) } // decodes to RawDocRecord. var response []*EnqueuedResponse err = cur.All(ctx, &response) if err != nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) r.logger.Error("failed to decode cursor into []*EnqueuedResponse", zap.Error(err), zap.String("requestID", requestID), ) return false, errors.WithStack(err) } // check exists records if len(response) == 0 { return false, nil } return true, nil }