From 1e5aeedfd96167c9f3ae49c11e55340bc99b0be4 Mon Sep 17 00:00:00 2001 From: walker-16 Date: Tue, 18 Jul 2023 11:33:17 -0300 Subject: [PATCH] =?UTF-8?q?Fix=20notional=20error=20with=20coingecko=20and?= =?UTF-8?q?=20add=20endpoint=20to=20push=20metrics=20in=E2=80=A6=20(#551)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix notional error with coingecko and add endpoint to push metrics in analytics Co-authored-by: ftocal --- analytics/cmd/service/run.go | 9 ++- analytics/http/{infrastructure => }/server.go | 9 ++- analytics/http/vaa/controller.go | 56 +++++++++++++++ analytics/http/vaa/repository.go | 37 ++++++++++ jobs/internal/coingecko/coingecko.go | 70 +++++++++++++------ jobs/jobs/notional/notional.go | 8 ++- 6 files changed, 162 insertions(+), 27 deletions(-) rename analytics/http/{infrastructure => }/server.go (76%) create mode 100644 analytics/http/vaa/controller.go create mode 100644 analytics/http/vaa/repository.go diff --git a/analytics/cmd/service/run.go b/analytics/cmd/service/run.go index df18477c..a4bed472 100644 --- a/analytics/cmd/service/run.go +++ b/analytics/cmd/service/run.go @@ -12,11 +12,13 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/go-redis/redis/v8" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/wormhole-foundation/wormhole-explorer/analytics/config" "github.com/wormhole-foundation/wormhole-explorer/analytics/consumer" - "github.com/wormhole-foundation/wormhole-explorer/analytics/http/infrastructure" + "github.com/wormhole-foundation/wormhole-explorer/analytics/http" + "github.com/wormhole-foundation/wormhole-explorer/analytics/http/vaa" "github.com/wormhole-foundation/wormhole-explorer/analytics/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/analytics/metric" "github.com/wormhole-foundation/wormhole-explorer/analytics/queue" @@ -99,7 +101,10 @@ func Run() { // create and start server. logger.Info("initializing infrastructure server...") - server := infrastructure.NewServer(logger, config.Port, config.PprofEnabled, healthChecks...) + + vaaRepository := vaa.NewRepository(db.Database, logger) + vaaController := vaa.NewController(metric.Push, vaaRepository, logger) + server := http.NewServer(logger, config.Port, config.PprofEnabled, vaaController, healthChecks...) server.Start() // Waiting for signal diff --git a/analytics/http/infrastructure/server.go b/analytics/http/server.go similarity index 76% rename from analytics/http/infrastructure/server.go rename to analytics/http/server.go index 4dc0f8d7..639826a7 100644 --- a/analytics/http/infrastructure/server.go +++ b/analytics/http/server.go @@ -1,9 +1,11 @@ -package infrastructure +package http import ( "github.com/ansrivas/fiberprometheus/v2" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/pprof" + "github.com/wormhole-foundation/wormhole-explorer/analytics/http/infrastructure" + "github.com/wormhole-foundation/wormhole-explorer/analytics/http/vaa" health "github.com/wormhole-foundation/wormhole-explorer/common/health" "go.uber.org/zap" ) @@ -14,7 +16,7 @@ type Server struct { logger *zap.Logger } -func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...health.Check) *Server { +func NewServer(logger *zap.Logger, port string, pprofEnabled bool, vaaController *vaa.Controller, checks ...health.Check) *Server { app := fiber.New(fiber.Config{DisableStartupMessage: true}) // Configure prometheus middleware @@ -27,10 +29,11 @@ func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...hea app.Use(pprof.New()) } - ctrl := NewController(checks, logger) + ctrl := infrastructure.NewController(checks, logger) api := app.Group("/api") api.Get("/health", ctrl.HealthCheck) api.Get("/ready", ctrl.ReadyCheck) + api.Post("/vaa/metrics", vaaController.PushVAAMetrics) return &Server{ app: app, diff --git a/analytics/http/vaa/controller.go b/analytics/http/vaa/controller.go new file mode 100644 index 00000000..5941e5bf --- /dev/null +++ b/analytics/http/vaa/controller.go @@ -0,0 +1,56 @@ +package vaa + +import ( + "github.com/gofiber/fiber/v2" + "github.com/wormhole-foundation/wormhole-explorer/analytics/metric" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" +) + +// Controller controller struct definition. +type Controller struct { + pushMetric metric.MetricPushFunc + repository *Repository + logger *zap.Logger +} + +// NewController create a new controller. +func NewController(pushMetric metric.MetricPushFunc, repository *Repository, logger *zap.Logger) *Controller { + return &Controller{pushMetric: pushMetric, repository: repository, logger: logger} +} + +// PushVAAMetrics push vaa metrics. +func (c *Controller) PushVAAMetrics(ctx *fiber.Ctx) error { + payload := struct { + ID string `json:"id"` + }{} + + if err := ctx.BodyParser(&payload); err != nil { + c.logger.Error("Error parsing request body", zap.Error(err)) + return err + } + + c.logger.Info("Push VAA from endpoint", zap.String("id", payload.ID)) + + vaaDoc, err := c.repository.FindById(ctx.Context(), payload.ID) + if err != nil { + c.logger.Error("Error finding VAA", zap.Error(err)) + return err + } + + vaa, err := sdk.Unmarshal(vaaDoc.Vaa) + if err != nil { + c.logger.Error("Error unmarshalling VAA", zap.Error(err)) + return err + } + + err = c.pushMetric(ctx.Context(), vaa) + if err != nil { + c.logger.Error("Error pushing metric", zap.Error(err)) + return err + } + + return ctx.Status(fiber.StatusOK).JSON(struct { + Push bool `json:"push"` + }{Push: true}) +} diff --git a/analytics/http/vaa/repository.go b/analytics/http/vaa/repository.go new file mode 100644 index 00000000..3bbcf851 --- /dev/null +++ b/analytics/http/vaa/repository.go @@ -0,0 +1,37 @@ +package vaa + +import ( + "context" + + "go.mongodb.org/mongo-driver/mongo" + "go.uber.org/zap" + "gopkg.in/mgo.v2/bson" +) + +// Repository repository struct definition. +type Repository struct { + db *mongo.Database + logger *zap.Logger + vaas *mongo.Collection +} + +// VaaDoc vaa document struct definition. +type VaaDoc struct { + ID string `bson:"_id" json:"id"` + Vaa []byte `bson:"vaas" json:"vaa"` +} + +// NewRepository create a new Repository. +func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository { + return &Repository{db: db, + logger: logger.With(zap.String("module", "VaaRepository")), + vaas: db.Collection("vaas"), + } +} + +// FindById find a vaa by id. +func (r *Repository) FindById(ctx context.Context, id string) (*VaaDoc, error) { + var vaaDoc VaaDoc + err := r.vaas.FindOne(ctx, bson.M{"_id": id}).Decode(&vaaDoc) + return &vaaDoc, err +} diff --git a/jobs/internal/coingecko/coingecko.go b/jobs/internal/coingecko/coingecko.go index a30e3c67..8c62b485 100644 --- a/jobs/internal/coingecko/coingecko.go +++ b/jobs/internal/coingecko/coingecko.go @@ -13,15 +13,17 @@ import ( // CoingeckoAPI is a client for the coingecko API type CoingeckoAPI struct { - url string - client *http.Client + url string + chunkSize int + client *http.Client } // NewCoingeckoAPI creates a new coingecko client func NewCoingeckoAPI(url string) *CoingeckoAPI { return &CoingeckoAPI{ - url: url, - client: http.DefaultClient, + url: url, + chunkSize: 200, + client: http.DefaultClient, } } @@ -33,25 +35,53 @@ type NotionalUSD struct { // GetNotionalUSD returns the notional USD value for the given ids // ids is a list of coingecko chain identifier. func (c *CoingeckoAPI) GetNotionalUSD(ids []string) (map[string]NotionalUSD, error) { - var response map[string]NotionalUSD - notionalUrl := fmt.Sprintf("%s/simple/price?ids=%s&vs_currencies=usd", c.url, strings.Join(ids, ",")) + response := map[string]NotionalUSD{} + chunksIds := chunkChainIds(ids, c.chunkSize) - req, err := http.NewRequest(http.MethodGet, notionalUrl, nil) - if err != nil { - return response, err - } - res, err := c.client.Do(req) - if err != nil { - return response, err - } - defer res.Body.Close() + // iterate over chunks of ids. + for _, chunk := range chunksIds { + notionalUrl := fmt.Sprintf("%s/simple/price?ids=%s&vs_currencies=usd", c.url, strings.Join(chunk, ",")) - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return response, err + req, err := http.NewRequest(http.MethodGet, notionalUrl, nil) + if err != nil { + return response, err + } + res, err := c.client.Do(req) + if err != nil { + return response, err + } + defer res.Body.Close() + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return response, err + } + + chunkResponse := map[string]NotionalUSD{} + err = json.Unmarshal(body, &chunkResponse) + if err != nil { + return response, err + } + + // merge chunk response with response. + for k, v := range chunkResponse { + response[k] = v + } } - err = json.Unmarshal(body, &response) - return response, err + + return response, nil +} + +func chunkChainIds(slice []string, chunkSize int) [][]string { + var chunks [][]string + for i := 0; i < len(slice); i += chunkSize { + end := i + chunkSize + if end > len(slice) { + end = len(slice) + } + chunks = append(chunks, slice[i:end]) + } + return chunks } // GetChainIDs returns the coingecko chain ids for the given p2p network. diff --git a/jobs/jobs/notional/notional.go b/jobs/jobs/notional/notional.go index 0aa3592d..cc97a163 100644 --- a/jobs/jobs/notional/notional.go +++ b/jobs/jobs/notional/notional.go @@ -50,9 +50,11 @@ func (j *NotionalJob) Run() error { zap.Error(err)) return err } + j.logger.Info("found notionals", zap.Int("chainIDs", len(chainIDs)), zap.Int("notionals", len(coingeckoNotionals))) // convert notionals with coingecko assets ids to notionals with wormhole chainIDs. - notionals := convertToSymbols(coingeckoNotionals) + notionals := j.convertToSymbols(coingeckoNotionals) + j.logger.Info("convert to symbol", zap.Int("notionals", len(coingeckoNotionals)), zap.Int("symbols", len(notionals))) // save notional value of assets in cache. err = j.updateNotionalCache(notionals) @@ -91,7 +93,7 @@ func (j *NotionalJob) updateNotionalCache(notionals map[domain.Symbol]notional.P // convertToSymbols converts the coingecko response into a symbol map // // The returned map has symbols as keys, and price data as the values. -func convertToSymbols(m map[string]coingecko.NotionalUSD) map[domain.Symbol]notional.PriceData { +func (j *NotionalJob) convertToSymbols(m map[string]coingecko.NotionalUSD) map[domain.Symbol]notional.PriceData { w := make(map[domain.Symbol]notional.PriceData, len(m)) now := time.Now() @@ -100,12 +102,14 @@ func convertToSymbols(m map[string]coingecko.NotionalUSD) map[domain.Symbol]noti // Do not update the dictionary when the token price is nil if v.Price == nil { + j.logger.Info("skipping nil price", zap.String("coingeckoID", k)) continue } // Translate coingecko IDs into their associated ticker symbols tokenMeta, ok := domain.GetTokenByCoingeckoID(k) if !ok { + j.logger.Info("skipping unknown coingecko ID", zap.String("coingeckoID", k)) continue }