From d9d49ec919853cc4eed4ebc8fd0db6c9921de63c Mon Sep 17 00:00:00 2001 From: agodnic Date: Thu, 4 May 2023 20:17:03 -0300 Subject: [PATCH] Add 24h volume to scorecards (#274) ### Summary This pull request adds volume metrics to influxdb. Also, it adds the 24h volume metric to `GET /api/v1/scorecards`. Tracking issues: https://github.com/wormhole-foundation/wormhole-explorer/issues/221, https://github.com/wormhole-foundation/wormhole-explorer/issues/280 ### Changes: * The `parser` service no longer generates metrics for influxdb. All metrics-related code was removed from that service, that code was moved to the analytics service instead. * New volume metrics were added to the analytics service. * The notional cache was modified to use token names (i.e.: ticker symbols) as keys instead of chain IDs. * The notional cache reader was moved to the `common/client/cache` package. * A little bit of duplicated code between the cache reader and writer was removed. * A 24h volume metric was added to `GET /api/v1/scorecards`. * A dictionary that stores token metadata was added under `common/domain/tokenbridge.go`. More tokens will be added to it in the near future. --- analytic/.gitignore | 4 +- analytic/cmd/main.go | 5 +- analytic/config/config.go | 2 + analytic/metric/metric.go | 193 ++++++++++++++++-- api/handlers/transactions/model.go | 3 + api/handlers/transactions/repository.go | 68 +++++- api/handlers/vaa/service.go | 21 +- api/main.go | 4 +- .../wormscan/transactions/controller.go | 4 +- api/routes/wormscan/transactions/response.go | 13 +- .../internal => common/client}/cache/cache.go | 11 +- .../client}/cache/dummycache.go | 4 +- .../client}/cache/notional/cache.go | 29 ++- .../client}/cache/notional/dummy.go | 8 +- common/domain/tokenbridge.go | 68 ++++++ jobs/.gitignore | 2 + jobs/internal/coingecko/coingecko.go | 7 +- jobs/jobs/notional/notional.go | 161 ++++++--------- parser/cmd/main.go | 5 +- parser/metrics/metrics.go | 50 ----- parser/processor/processor.go | 41 +--- 21 files changed, 448 insertions(+), 255 deletions(-) rename {api/internal => common/client}/cache/cache.go (88%) rename {api/internal => common/client}/cache/dummycache.go (83%) rename {api/internal => common/client}/cache/notional/cache.go (81%) rename {api/internal => common/client}/cache/notional/dummy.go (66%) create mode 100644 common/domain/tokenbridge.go create mode 100644 jobs/.gitignore delete mode 100644 parser/metrics/metrics.go diff --git a/analytic/.gitignore b/analytic/.gitignore index e40a4e46..6c0c3f40 100644 --- a/analytic/.gitignore +++ b/analytic/.gitignore @@ -1 +1,3 @@ -__debug_bin \ No newline at end of file +__debug_bin +.env +analytic diff --git a/analytic/cmd/main.go b/analytic/cmd/main.go index 61dab18f..10064a7c 100644 --- a/analytic/cmd/main.go +++ b/analytic/cmd/main.go @@ -59,7 +59,10 @@ func main() { } // create a metrics instance - metric := metric.New(influxCli, config.InfluxOrganization, config.InfluxBucket, logger) + metric, err := metric.New(rootCtx, influxCli, config, logger) + if err != nil { + logger.Fatal("failed to create metrics instance", zap.Error(err)) + } // create and start a consumer. vaaConsumeFunc := newVAAConsume(rootCtx, config, logger) diff --git a/analytic/config/config.go b/analytic/config/config.go index e9050046..5433df0c 100644 --- a/analytic/config/config.go +++ b/analytic/config/config.go @@ -24,6 +24,8 @@ type Configuration struct { InfluxBucket string `env:"INFLUX_BUCKET"` PprofEnabled bool `env:"PPROF_ENABLED,default=false"` P2pNetwork string `env:"P2P_NETWORK,required"` + CacheURL string `env:"CACHE_URL"` + CacheChannel string `env:"CACHE_CHANNEL"` } // New creates a configuration with the values from .env file and environment variables. diff --git a/analytic/metric/metric.go b/analytic/metric/metric.go index 8d68522e..8810ac66 100644 --- a/analytic/metric/metric.go +++ b/analytic/metric/metric.go @@ -2,31 +2,88 @@ package metric import ( "context" + "fmt" + "math" + "math/big" "strconv" "time" + "github.com/go-redis/redis/v8" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/wormhole-foundation/wormhole/sdk/vaa" + "github.com/wormhole-foundation/wormhole-explorer/analytic/config" + wormscanCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache" + wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/notional" + "github.com/wormhole-foundation/wormhole-explorer/common/domain" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) // Metric definition. type Metric struct { - influxCli influxdb2.Client - writeApi api.WriteAPIBlocking - logger *zap.Logger + influxCli influxdb2.Client + writeApi api.WriteAPIBlocking + logger *zap.Logger + notionalCache wormscanNotionalCache.NotionalLocalCacheReadable } // New create a new *Metric. -func New(influxCli influxdb2.Client, organization, bucket string, logger *zap.Logger) *Metric { - writeAPI := influxCli.WriteAPIBlocking(organization, bucket) - return &Metric{influxCli: influxCli, writeApi: writeAPI, logger: logger} +func New( + ctx context.Context, + influxCli influxdb2.Client, + cfg *config.Configuration, + logger *zap.Logger, +) (*Metric, error) { + + writeAPI := influxCli.WriteAPIBlocking(cfg.InfluxOrganization, cfg.InfluxBucket) + + _, notionalCache, err := newCache(ctx, cfg, logger) + if err != nil { + return nil, err + } + + m := Metric{ + influxCli: influxCli, + writeApi: writeAPI, + logger: logger, + notionalCache: notionalCache, + } + return &m, nil +} + +func newCache( + ctx context.Context, + cfg *config.Configuration, + logger *zap.Logger, +) (wormscanCache.CacheReadable, wormscanNotionalCache.NotionalLocalCacheReadable, error) { + + // use a distributed cache and for notional a pubsub to sync local cache. + redisClient := redis.NewClient(&redis.Options{Addr: cfg.CacheURL}) + + // get cache client + cacheClient, err := wormscanCache.NewCacheClient(redisClient, true /*enabled*/, logger) + if err != nil { + return nil, nil, fmt.Errorf("failed to create cache client: %w", err) + } + + // get notional cache client and init load to local cache + notionalCache, err := wormscanNotionalCache.NewNotionalCache(ctx, redisClient, cfg.CacheChannel, logger) + if err != nil { + return nil, nil, fmt.Errorf("failed to create notional cache client: %w", err) + } + notionalCache.Init(ctx) + + return cacheClient, notionalCache, nil } // Push implement MetricPushFunc definition. -func (m *Metric) Push(ctx context.Context, vaa *vaa.VAA) error { - return m.vaaCountMeasurement(ctx, vaa) +func (m *Metric) Push(ctx context.Context, vaa *sdk.VAA) error { + + err1 := m.vaaCountMeasurement(ctx, vaa) + err2 := m.volumeMeasurement(ctx, vaa) + + //TODO if we had go 1.20, we could just use `errors.Join(err1, err2)` here. + return fmt.Errorf("err1=%w, err2=%w", err1, err2) } // Close influx client. @@ -34,19 +91,19 @@ func (m *Metric) Close() { m.influxCli.Close() } -// vaaCountMeasurement handle the push of metric point for measurement vaa_count. -func (m *Metric) vaaCountMeasurement(ctx context.Context, vaa *vaa.VAA) error { +// vaaCountMeasurement creates a new point for the `vaa_count` measurement. +func (m *Metric) vaaCountMeasurement(ctx context.Context, vaa *sdk.VAA) error { - measurement := "vaa_count" + const measurement = "vaa_count" - // Create a new point for the `vaa_count` measurement. + // Create a new point point := influxdb2. NewPointWithMeasurement(measurement). AddTag("chain_id", strconv.Itoa(int(vaa.EmitterChain))). AddField("count", 1). SetTime(vaa.Timestamp.Add(time.Nanosecond * time.Duration(vaa.Sequence))) - // Write the point to influx. + // Write the point to influx err := m.writeApi.WritePoint(ctx, point) if err != nil { m.logger.Error("failed to write metric", @@ -59,3 +116,111 @@ func (m *Metric) vaaCountMeasurement(ctx context.Context, vaa *vaa.VAA) error { return nil } + +// volumeMeasurement creates a new point for the `vaa_volume` measurement. +func (m *Metric) volumeMeasurement(ctx context.Context, vaa *sdk.VAA) error { + + const measurement = "vaa_volume" + + // Decode the VAA payload + // + // If the VAA didn't come from the portal token bridge, we just skip it. + payload, err := sdk.DecodeTransferPayloadHdr(vaa.Payload) + if err != nil { + return nil + } + + // Get the token metadata + // + // This is complementary data about the token that is not present in the VAA itself. + tokenMeta, ok := domain.GetTokenMetadata(payload.OriginChain, "0x"+payload.OriginAddress.String()) + if !ok { + m.logger.Warn("found no token metadata for VAA", + zap.String("vaaId", vaa.MessageID()), + zap.String("tokenAddress", payload.OriginAddress.String()), + zap.Uint16("tokenChain", uint16(payload.OriginChain)), + ) + return nil + } + + // Normalize the amount to 8 decimals + amount := payload.Amount + if tokenMeta.Decimals < 8 { + + // factor = 10 ^ (8 - tokenMeta.Decimals) + var factor big.Int + factor.Exp(big.NewInt(10), big.NewInt(int64(8-tokenMeta.Decimals)), nil) + + amount = amount.Mul(amount, &factor) + } + + // Try to obtain the token notional value from the cache + notional, err := m.notionalCache.Get(tokenMeta.UnderlyingSymbol) + if err != nil { + m.logger.Warn("failed to obtain notional for this token", + zap.String("vaaId", vaa.MessageID()), + zap.String("tokenAddress", payload.OriginAddress.String()), + zap.Uint16("tokenChain", uint16(payload.OriginChain)), + zap.Any("tokenMetadata", tokenMeta), + zap.Error(err), + ) + return nil + } + + // Convert the notional value to an integer with an implicit precision of 8 decimals + notionalBigInt, err := floatToBigInt(notional.NotionalUsd) + if err != nil { + return nil + } + + // Calculate the volume, with an implicit precision of 8 decimals + var volume big.Int + volume.Mul(amount, notionalBigInt) + volume.Div(&volume, big.NewInt(1e8)) + + m.logger.Info("Pushing volume metrics", + zap.String("vaaId", vaa.MessageID()), + zap.String("amount", amount.String()), + zap.String("notional", notionalBigInt.String()), + zap.String("volume", volume.String()), + ) + + // Create a data point with volume-related fields + // + // We're converting big integers to int64 because influxdb doesn't support bigint/numeric types. + point := influxdb2.NewPointWithMeasurement(measurement). + AddTag("chain_source_id", fmt.Sprintf("%d", payload.OriginChain)). + AddTag("chain_destination_id", fmt.Sprintf("%d", payload.TargetChain)). + AddTag("app_id", domain.AppIdPortalTokenBridge). + AddField("amount", amount.Int64()). + AddField("notional", notionalBigInt.Int64()). + AddField("volume", volume.Int64()). + SetTime(vaa.Timestamp) + + // Write the point to influx + err = m.writeApi.WritePoint(ctx, point) + if err != nil { + return err + } + + return nil +} + +// toInt converts a float64 into a big.Int with 8 decimals of implicit precision. +// +// If we ever upgrade the notional cache to store prices as big integers, +// this gnarly function won't be needed anymore. +func floatToBigInt(f float64) (*big.Int, error) { + + integral, frac := math.Modf(f) + + strIntegral := strconv.FormatFloat(integral, 'f', 0, 64) + strFrac := fmt.Sprintf("%.8f", frac)[2:] + + i, err := strconv.ParseInt(strIntegral+strFrac, 10, 64) + if err != nil { + return nil, err + } + + return big.NewInt(i), nil +} diff --git a/api/handlers/transactions/model.go b/api/handlers/transactions/model.go index 9cb30020..f5b45764 100644 --- a/api/handlers/transactions/model.go +++ b/api/handlers/transactions/model.go @@ -13,6 +13,9 @@ type Scorecards struct { // Number of VAAs emitted in the last 24 hours (does not include Pyth messages). TxCount24h string + + // Volume transferred through the token bridge in the last 24 hours, in USD. + Volume24h string } type GlobalTransactionDoc struct { diff --git a/api/handlers/transactions/repository.go b/api/handlers/transactions/repository.go index e9d0bc9c..7bb3834b 100644 --- a/api/handlers/transactions/repository.go +++ b/api/handlers/transactions/repository.go @@ -60,6 +60,16 @@ from(bucket: "%s") |> count() ` +const queryTemplateVolume24h = ` +from(bucket: "%s") + |> range(start: -24h) + |> filter(fn: (r) => r._measurement == "vaa_volume") + |> filter(fn:(r) => r._field == "volume") + |> drop(columns: ["_measurement", "app_id", "chain_destination_id", "chain_source_id", "symbol"]) + |> sum(column: "_value") + |> toString() +` + type Repository struct { influxCli influxdb2.Client queryAPI api.QueryAPI @@ -119,20 +129,28 @@ func (r *Repository) buildFindVolumeQuery(q *ChainActivityQuery) string { func (r *Repository) GetScorecards(ctx context.Context) (*Scorecards, error) { - totalTxCount, err := r.getTotalTxCount(ctx) - if err != nil { - r.logger.Error("failed to query total transaction count", zap.Error(err)) - } + //TODO the underlying query in this code is not using pre-summarized data. + // We should fix that before re-enabling the metric. + //totalTxCount, err := r.getTotalTxCount(ctx) + //if err != nil { + // return nil, fmt.Errorf("failed to query all-time tx count") + //} txCount24h, err := r.getTxCount24h(ctx) if err != nil { return nil, fmt.Errorf("failed to query 24h transactions: %w", err) } + volume24h, err := r.getVolume24h(ctx) + if err != nil { + return nil, fmt.Errorf("failed to query 24h volume: %w", err) + } + // build the result and return scorecards := Scorecards{ - TotalTxCount: totalTxCount, - TxCount24h: txCount24h, + //TotalTxCount: totalTxCount, + TxCount24h: txCount24h, + Volume24h: volume24h, } return &scorecards, nil @@ -194,6 +212,44 @@ func (r *Repository) getTxCount24h(ctx context.Context) (string, error) { return fmt.Sprint(row.Value), nil } +func (r *Repository) getVolume24h(ctx context.Context) (string, error) { + + // query 24h volume + query := fmt.Sprintf(queryTemplateVolume24h, r.bucket) + result, err := r.queryAPI.Query(ctx, query) + if err != nil { + r.logger.Error("failed to query 24h volume", zap.Error(err)) + return "", err + } + if result.Err() != nil { + r.logger.Error("24h volume query result has errors", zap.Error(err)) + return "", result.Err() + } + if !result.Next() { + return "", errors.New("expected at least one record in 24h volume query result") + } + + // deserialize the row returned + row := struct { + Value string `mapstructure:"_value"` + }{} + if err := mapstructure.Decode(result.Record().Values(), &row); err != nil { + return "", fmt.Errorf("failed to decode 24h volume count query response: %w", err) + } + + // If there is less than 1 USD un volume, round it down to 0 to make math simpler in the next step + l := len(row.Value) + if l < 9 { + return "0.00000000", nil + } + + // Turn the integer amount into a decimal. + // The number always has 8 decimals, so we just need to insert a dot 8 digits from the end. + volume := row.Value[:l-8] + "." + row.Value[l-8:] + + return volume, nil +} + // GetTransactionCount get the last transactions. func (r *Repository) GetTransactionCount(ctx context.Context, q *TransactionCountQuery) ([]TransactionCountResult, error) { query := r.buildLastTrxQuery(q) diff --git a/api/handlers/vaa/service.go b/api/handlers/vaa/service.go index de01c49a..569931af 100644 --- a/api/handlers/vaa/service.go +++ b/api/handlers/vaa/service.go @@ -2,15 +2,15 @@ package vaa import ( "context" + "errors" "fmt" "strconv" - "github.com/pkg/errors" - "github.com/wormhole-foundation/wormhole-explorer/api/internal/cache" errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" "github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination" "github.com/wormhole-foundation/wormhole-explorer/api/response" "github.com/wormhole-foundation/wormhole-explorer/api/types" + "github.com/wormhole-foundation/wormhole-explorer/common/client/cache" "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) @@ -24,7 +24,14 @@ type Service struct { // NewService creates a new VAA Service. func NewService(r *Repository, getCacheFunc cache.CacheGetFunc, logger *zap.Logger) *Service { - return &Service{repo: r, getCacheFunc: getCacheFunc, logger: logger.With(zap.String("module", "VaaService"))} + + s := Service{ + repo: r, + getCacheFunc: getCacheFunc, + logger: logger.With(zap.String("module", "VaaService")), + } + + return &s } // FindAllParams passes input data to the function `FindAll`. @@ -188,10 +195,12 @@ func (s *Service) discardVaaNotIndexed(ctx context.Context, chain vaa.ChainID, e key := fmt.Sprintf("%s:%d:%s", "wormscan:vaa-max-sequence", chain, emitter.Hex()) sequence, err := s.getCacheFunc(ctx, key) if err != nil { - if errors.Is(err, errs.ErrInternalError) { + if errors.Is(err, cache.ErrInternal) { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) - s.logger.Error("error getting value from cache", - zap.Error(err), zap.String("requestID", requestID)) + s.logger.Error("encountered an internal error while getting value from cache", + zap.Error(err), + zap.String("requestID", requestID), + ) } return false } diff --git a/api/main.go b/api/main.go index 72f20e21..0c7b30f6 100644 --- a/api/main.go +++ b/api/main.go @@ -28,8 +28,6 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/api/handlers/observations" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa" - wormscanCache "github.com/wormhole-foundation/wormhole-explorer/api/internal/cache" - wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/api/internal/cache/notional" "github.com/wormhole-foundation/wormhole-explorer/api/internal/config" "github.com/wormhole-foundation/wormhole-explorer/api/internal/db" "github.com/wormhole-foundation/wormhole-explorer/api/middleware" @@ -37,6 +35,8 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/api/routes/guardian" "github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan" rpcApi "github.com/wormhole-foundation/wormhole-explorer/api/rpc" + wormscanCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache" + wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/notional" xlogger "github.com/wormhole-foundation/wormhole-explorer/common/logger" "go.uber.org/zap" ) diff --git a/api/routes/wormscan/transactions/controller.go b/api/routes/wormscan/transactions/controller.go index 538b69ea..5be21d03 100644 --- a/api/routes/wormscan/transactions/controller.go +++ b/api/routes/wormscan/transactions/controller.go @@ -70,13 +70,15 @@ func (c *Controller) GetScorecards(ctx *fiber.Ctx) error { // Query indicators from the database scorecards, err := c.srv.GetScorecards(ctx.Context()) if err != nil { + c.logger.Error("failed to get scorecards", zap.Error(err)) return err } // Convert indicators to the response model response := ScorecardsResponse{ - TxCount24h: scorecards.TxCount24h, TotalTxCount: scorecards.TotalTxCount, + TxCount24h: scorecards.TxCount24h, + Volume24h: scorecards.Volume24h, } return ctx.JSON(response) diff --git a/api/routes/wormscan/transactions/response.go b/api/routes/wormscan/transactions/response.go index cf0e28dd..7c4aa114 100644 --- a/api/routes/wormscan/transactions/response.go +++ b/api/routes/wormscan/transactions/response.go @@ -24,18 +24,19 @@ type ChainActivity struct { type ScorecardsResponse struct { //TODO: we don't have the data for these fields yet, uncomment as the data becomes available. - //TVL string `json:"tvl"` - - //TotalVolume string `json:"total_volume"` + // Number of VAAs emitted in the last 24 hours (includes Pyth messages). + //Messages24h string `json:"24h_messages"` // Number of VAAs emitted since the creation of the network (does not include Pyth messages) TotalTxCount string `json:"total_tx_count,omitempty"` - //Volume24h string `json:"24h_volume"` + //TotalVolume string `json:"total_volume"` + + //TVL string `json:"tvl"` // Number of VAAs emitted in the last 24 hours (does not include Pyth messages). TxCount24h string `json:"24h_tx_count"` - // Number of VAAs emitted in the last 24 hours (includes Pyth messages). - //Messages24h string `json:"24h_messages"` + // Volume transferred through the token bridge in the last 24 hours, in USD. + Volume24h string `json:"24h_volume"` } diff --git a/api/internal/cache/cache.go b/common/client/cache/cache.go similarity index 88% rename from api/internal/cache/cache.go rename to common/client/cache/cache.go index f6fccd31..bea5ed24 100644 --- a/api/internal/cache/cache.go +++ b/common/client/cache/cache.go @@ -9,11 +9,14 @@ import ( "fmt" "github.com/go-redis/redis/v8" - errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" "go.uber.org/zap" ) -var ErrCacheNotEnabled = errors.New("CACHE NOT ENABLED") +var ( + ErrCacheNotEnabled = errors.New("CACHE NOT ENABLED") + ErrNotFound = errors.New("KEY NOT FOUND IN CACHE") + ErrInternal = errors.New("INTERNAL CACHE ERROR") +) // CacheClient redis cache client. type CacheClient struct { @@ -52,9 +55,9 @@ func (c *CacheClient) Get(ctx context.Context, key string) (string, error) { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) c.logger.Error("key does not exist in cache", zap.Error(err), zap.String("key", key), zap.String("requestID", requestID)) - return "", errs.ErrNotFound + return "", ErrNotFound } - return "", errs.ErrInternalError + return "", ErrInternal } return value, nil } diff --git a/api/internal/cache/dummycache.go b/common/client/cache/dummycache.go similarity index 83% rename from api/internal/cache/dummycache.go rename to common/client/cache/dummycache.go index d83d6a17..767263d8 100644 --- a/api/internal/cache/dummycache.go +++ b/common/client/cache/dummycache.go @@ -2,8 +2,6 @@ package cache import ( "context" - - errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" ) // DummyCacheClient dummy cache client. @@ -18,7 +16,7 @@ func NewDummyCacheClient() *DummyCacheClient { // Get get method is a dummy method that always does not find the cache. // Use this Get function when run development enviroment func (d *DummyCacheClient) Get(ctx context.Context, key string) (string, error) { - return "", errs.ErrNotFound + return "", ErrNotFound } // Close dummy cache client. diff --git a/api/internal/cache/notional/cache.go b/common/client/cache/notional/cache.go similarity index 81% rename from api/internal/cache/notional/cache.go rename to common/client/cache/notional/cache.go index 9f116432..68af076c 100644 --- a/api/internal/cache/notional/cache.go +++ b/common/client/cache/notional/cache.go @@ -9,13 +9,13 @@ import ( "time" "github.com/go-redis/redis/v8" - "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) const ( - wormscanNotionalCacheKeyRegex = "*WORMSCAN:NOTIONAL:CHAIN_ID:*" wormscanNotionalUpdated = "NOTIONAL_UPDATED" + wormscanNotionalCacheKeyRegex = "*WORMSCAN:NOTIONAL:SYMBOL:*" + KeyFormatString = "WORMSCAN:NOTIONAL:SYMBOL:%s" ) var ( @@ -25,16 +25,23 @@ var ( // NotionalLocalCacheReadable is the interface for notional local cache. type NotionalLocalCacheReadable interface { - Get(chainID vaa.ChainID) (NotionalCacheField, error) + Get(symbol string) (PriceData, error) Close() error } -// NotionalCacheField is the notional value of assets in cache. -type NotionalCacheField struct { +// PriceData is the notional value of assets in cache. +type PriceData struct { NotionalUsd float64 `json:"notional_usd"` UpdatedAt time.Time `json:"updated_at"` } +// MarshalBinary implements the encoding.BinaryMarshaler interface. +// +// This function is used when the notional job writes data to redis. +func (p PriceData) MarshalBinary() ([]byte, error) { + return json.Marshal(p) +} + // NotionalCacheClient redis cache client. type NotionalCache struct { client *redis.Client @@ -87,7 +94,7 @@ func (c *NotionalCache) loadCache(ctx context.Context) error { // Get notional value from keys for _, key := range keys { - var field NotionalCacheField + var field PriceData value, err := c.client.Get(ctx, key).Result() json.Unmarshal([]byte(value), &field) if err != nil { @@ -125,11 +132,11 @@ func (c *NotionalCache) Close() error { } // Get notional cache value. -func (c *NotionalCache) Get(chainID vaa.ChainID) (NotionalCacheField, error) { - var notional NotionalCacheField +func (c *NotionalCache) Get(symbol string) (PriceData, error) { + var notional PriceData // get notional cache key - key := fmt.Sprintf("WORMSCAN:NOTIONAL:CHAIN_ID:%d", chainID) + key := fmt.Sprintf(KeyFormatString, symbol) // get notional cache value field, ok := c.notionalMap.Load(key) @@ -138,11 +145,11 @@ func (c *NotionalCache) Get(chainID vaa.ChainID) (NotionalCacheField, error) { } // convert any field to NotionalCacheField - notional, ok = field.(NotionalCacheField) + notional, ok = field.(PriceData) if !ok { c.logger.Error("invalid notional cache field", zap.Any("field", field), - zap.Any("chainID", chainID)) + zap.String("symbol", symbol)) return notional, ErrInvalidCacheField } return notional, nil diff --git a/api/internal/cache/notional/dummy.go b/common/client/cache/notional/dummy.go similarity index 66% rename from api/internal/cache/notional/dummy.go rename to common/client/cache/notional/dummy.go index e63b43d8..d3b2d238 100644 --- a/api/internal/cache/notional/dummy.go +++ b/common/client/cache/notional/dummy.go @@ -1,9 +1,5 @@ package notional -import ( - "github.com/wormhole-foundation/wormhole/sdk/vaa" -) - // DummyNotionalCache is a dummy notional cache. type DummyNotionalCache struct { } @@ -14,8 +10,8 @@ func NewDummyNotionalCache() *DummyNotionalCache { } // Get get notional cache value. -func (c *DummyNotionalCache) Get(chainID vaa.ChainID) (NotionalCacheField, error) { - return NotionalCacheField{}, nil +func (c *DummyNotionalCache) Get(symbol string) (PriceData, error) { + return PriceData{}, nil } // Close the dummy cache. diff --git a/common/domain/tokenbridge.go b/common/domain/tokenbridge.go new file mode 100644 index 00000000..d2321641 --- /dev/null +++ b/common/domain/tokenbridge.go @@ -0,0 +1,68 @@ +package domain + +import ( + "fmt" + + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +// TokenMetadata contains information about a token supported by Portal Token Bridge. +type TokenMetadata struct { + // UnderlyingSymbol is the name that crypto exchanges use to list the underlying asset represented by this token. + // For example, the underlying symbol of the token "WFTM (wrapped fantom)" is "FTM". + UnderlyingSymbol string + Decimals uint8 +} + +// GetTokenMetadata returns information about a token identified by the pair (tokenChain, tokenAddr). +func GetTokenMetadata(tokenChain sdk.ChainID, tokenAddr string) (*TokenMetadata, bool) { + + key := fmt.Sprintf("%d-%s", tokenChain, tokenAddr) + + result, ok := tokenMetadata[key] + if !ok { + return nil, false + } + + // The variable `result` is a copy of the value in the map, + // so we can safely return it without worrying about it being modified. + return &result, true +} + +// tokenMetadata contains information about some of the tokens supported by Portal Token Bridge. +// +// The map is indexed by "-", which you can find on Token Bridge transfer payloads. +var tokenMetadata = map[string]TokenMetadata{ + // ETH - Ether (Portal) + // + // Examples: + // * https://api.staging.wormscan.io/api/v1/vaas/1/ec7372995d5cc8732397fb0ad35c0121e0eaa90d26f828a534cab54391b3a4f5/288088?parsedPayload=true + "2-0x000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2": { + UnderlyingSymbol: "ETH", + Decimals: 8, + }, + // UST (Wormhole) + // + // Examples: + // * https://api.staging.wormscan.io/api/v1/vaas/2/0000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585/111492?parsedPayload=true + "3-0x0100000000000000000000000000000000000000000000000000000075757364": { + UnderlyingSymbol: "UST", + Decimals: 8, + }, + // Binance-Peg BSC-USD + // + // Examples: + // * https://api.staging.wormscan.io/api/v1/vaas/4/000000000000000000000000b6f6d86a8f9879a9c87f643768d9efc38c1da6e7/242342?parsedPayload=true + "4-0x00000000000000000000000055d398326f99059ff775485246999027b3197955": { + UnderlyingSymbol: "BUSD", + Decimals: 8, + }, + // WFTM - Wrapped Fantom + // + // Examples: + // * https://api.staging.wormscan.io/api/v1/vaas/10/0000000000000000000000007c9fc5741288cdfdd83ceb07f3ea7e22618d79d2/25144?parsedPayload=true + "10-0x00000000000000000000000021be370d5312f44cb42ce377bc9b8a0cef1a4c83": { + UnderlyingSymbol: "FTM", + Decimals: 8, + }, +} diff --git a/jobs/.gitignore b/jobs/.gitignore new file mode 100644 index 00000000..80c422ae --- /dev/null +++ b/jobs/.gitignore @@ -0,0 +1,2 @@ +.env +jobs/main diff --git a/jobs/internal/coingecko/coingecko.go b/jobs/internal/coingecko/coingecko.go index 955a0045..aaa4ac87 100644 --- a/jobs/internal/coingecko/coingecko.go +++ b/jobs/internal/coingecko/coingecko.go @@ -82,7 +82,12 @@ func GetChainIDs(p2pNetwork string) []string { "optimism", "xpla", "bitcoin", - "base-protocol"} + "base-protocol", + "tether", + "usd-coin", + "binance-usd", + "terrausd-wormhole", + } } // TODO: define chains ids for testnet. return []string{} diff --git a/jobs/jobs/notional/notional.go b/jobs/jobs/notional/notional.go index ab18fcfa..c61344fb 100644 --- a/jobs/jobs/notional/notional.go +++ b/jobs/jobs/notional/notional.go @@ -2,18 +2,16 @@ package notional import ( - "encoding/json" "fmt" "time" "github.com/go-redis/redis" + "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/notional" "github.com/wormhole-foundation/wormhole-explorer/jobs/internal/coingecko" - "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) -// NotionalCacheKey is the cache key for notional value by chainID -const NotionalCacheKey = "WORMSCAN:NOTIONAL:CHAIN_ID:%d" +type Symbol string // NotionalJob is the job to get the notional value of assets. type NotionalJob struct { @@ -52,7 +50,7 @@ func (j *NotionalJob) Run() error { } // convert notionals with coingecko assets ids to notionals with wormhole chainIDs. - notionals := convertToWormholeChainIDs(coingeckoNotionals) + notionals := convertToSymbols(coingeckoNotionals) // save notional value of assets in cache. err = j.updateNotionalCache(notionals) @@ -75,138 +73,101 @@ func (j *NotionalJob) Run() error { } // updateNotionalCache updates the notional value of assets in cache. -func (j *NotionalJob) updateNotionalCache(notionals map[vaa.ChainID]NotionalCacheField) error { - for chainID, notional := range notionals { - key := fmt.Sprintf(NotionalCacheKey, chainID) - err := j.cacheClient.Set(key, notional, 0).Err() +func (j *NotionalJob) updateNotionalCache(notionals map[Symbol]notional.PriceData) error { + + for chainID, n := range notionals { + key := fmt.Sprintf(notional.KeyFormatString, chainID) + err := j.cacheClient.Set(key, n, 0).Err() if err != nil { return err } } + return nil } -// NotionalCacheField is the notional value of assets in cache. -type NotionalCacheField struct { - NotionalUsd float64 `json:"notional_usd"` - UpdatedAt time.Time `json:"updated_at"` -} +// convertToSymbols converts the coingecko response into a symbol map +// +// The returned map has symbols as keys, and price data as the values. +func convertToSymbols(m map[string]coingecko.NotionalUSD) map[Symbol]notional.PriceData { -// MarshalBinary implements the encoding.BinaryMarshaler interface. -func (n NotionalCacheField) MarshalBinary() ([]byte, error) { - return json.Marshal(n) -} - -// convertToWormholeChainIDs converts the coingecko chain ids to wormhole chain ids. -func convertToWormholeChainIDs(m map[string]coingecko.NotionalUSD) map[vaa.ChainID]NotionalCacheField { - w := make(map[vaa.ChainID]NotionalCacheField, len(m)) + w := make(map[Symbol]notional.PriceData, len(m)) now := time.Now() + for k, v := range m { + + // Do not update the dictionary when the token price is nil + if v.Price == nil { + continue + } + + var symbol Symbol + switch k { case "solana": - if v.Price != nil { - w[vaa.ChainIDSolana] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "SOL" case "ethereum": - if v.Price != nil { - w[vaa.ChainIDEthereum] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "ETH" case "terra-luna": - if v.Price != nil { - w[vaa.ChainIDTerra] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "LUNC" case "binancecoin": - if v.Price != nil { - w[vaa.ChainIDBSC] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "BNB" case "matic-network": - if v.Price != nil { - w[vaa.ChainIDPolygon] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "MATIC" case "avalanche-2": - if v.Price != nil { - w[vaa.ChainIDAvalanche] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "AVAX" case "oasis-network": - if v.Price != nil { - w[vaa.ChainIDOasis] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "ROSE" case "algorand": - if v.Price != nil { - w[vaa.ChainIDAlgorand] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "ALGO" case "aurora": - if v.Price != nil { - w[vaa.ChainIDAurora] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "AURORA" case "fantom": - if v.Price != nil { - w[vaa.ChainIDFantom] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "FTM" case "karura": - if v.Price != nil { - w[vaa.ChainIDKarura] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "KAR" case "acala": - if v.Price != nil { - w[vaa.ChainIDAcala] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "ACA" case "klay-token": - if v.Price != nil { - w[vaa.ChainIDKlaytn] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "KLAY" case "celo": - if v.Price != nil { - w[vaa.ChainIDCelo] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "CELO" case "near": - if v.Price != nil { - w[vaa.ChainIDNear] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "NEAR" case "moonbeam": - if v.Price != nil { - w[vaa.ChainIDMoonbeam] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "GLMR" case "neon": - if v.Price != nil { - w[vaa.ChainIDNeon] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "NEON" case "terra-luna-2": - if v.Price != nil { - w[vaa.ChainIDTerra2] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "LUNA" case "injective-protocol": - if v.Price != nil { - w[vaa.ChainIDInjective] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "INJ" case "aptos": - if v.Price != nil { - w[vaa.ChainIDAptos] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "APT" case "sui": - if v.Price != nil { - w[vaa.ChainIDSui] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "SUI" case "arbitrum": - if v.Price != nil { - w[vaa.ChainIDArbitrum] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "ARB" case "optimism": - if v.Price != nil { - w[vaa.ChainIDOptimism] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "OP" case "xpla": - if v.Price != nil { - w[vaa.ChainIDXpla] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "XPLA" case "bitcoin": - if v.Price != nil { - w[vaa.ChainIDBtc] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "BTC" case "base-protocol": - if v.Price != nil { - w[vaa.ChainIDBase] = NotionalCacheField{NotionalUsd: *v.Price, UpdatedAt: now} - } + symbol = "BASE" + case "tether": + symbol = "USDT" + case "usd-coin": + symbol = "USDC" + case "binance-usd": + symbol = "BUSD" + case "terrausd-wormhole": + symbol = "UST" + } + + if symbol != "" { + w[symbol] = notional.PriceData{NotionalUsd: *v.Price, UpdatedAt: now} } } return w diff --git a/parser/cmd/main.go b/parser/cmd/main.go index 3cf1453f..c10ad3f2 100644 --- a/parser/cmd/main.go +++ b/parser/cmd/main.go @@ -17,7 +17,6 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/parser/http/infrastructure" "github.com/wormhole-foundation/wormhole-explorer/parser/internal/db" "github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs" - "github.com/wormhole-foundation/wormhole-explorer/parser/metrics" "github.com/wormhole-foundation/wormhole-explorer/parser/parser" "github.com/wormhole-foundation/wormhole-explorer/parser/processor" "github.com/wormhole-foundation/wormhole-explorer/parser/queue" @@ -66,9 +65,7 @@ func main() { repository := parser.NewRepository(db.Database, logger) //create a processor - influxCli := newInfluxClient(config.InfluxUrl, config.InfluxToken) - metrics := metrics.New(influxCli, config.InfluxOrg, config.InfluxBucket, logger) - processor := processor.New(repository, metrics, logger) + processor := processor.New(repository, logger) // create and start a consumer consumer := consumer.New(vaaConsumeFunc, processor.Process, parserVAAAPIClient, logger) diff --git a/parser/metrics/metrics.go b/parser/metrics/metrics.go deleted file mode 100644 index 33056bd8..00000000 --- a/parser/metrics/metrics.go +++ /dev/null @@ -1,50 +0,0 @@ -package metrics - -import ( - "context" - "fmt" - "time" - - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/influxdata/influxdb-client-go/v2/api" - "go.uber.org/zap" -) - -const measurement = "vaa_volume" - -// Metric definition. -type Metrics struct { - influxCli influxdb2.Client - writeApi api.WriteAPIBlocking - logger *zap.Logger -} - -type Volume struct { - ChainSourceID uint16 - ChainDestinationID uint16 - Value uint64 - Timestamp time.Time - AppID string -} - -// New create a new *Metric -func New(influxCli influxdb2.Client, organization, bucket string, logger *zap.Logger) *Metrics { - writeAPI := influxCli.WriteAPIBlocking(organization, bucket) - return &Metrics{influxCli: influxCli, writeApi: writeAPI, logger: logger} -} - -func (m *Metrics) PushVolume(ctx context.Context, v *Volume) error { - point := influxdb2.NewPointWithMeasurement(measurement). - AddTag("chain_source_id", fmt.Sprintf("%d", v.ChainSourceID)). - AddTag("chain_destination_id", fmt.Sprintf("%d", v.ChainDestinationID)). - AddField("volume", v.Value). - AddField("app_id", v.AppID). - SetTime(v.Timestamp) - - // write point to influx - err := m.writeApi.WritePoint(ctx, point) - if err != nil { - return err - } - return nil -} diff --git a/parser/processor/processor.go b/parser/processor/processor.go index a97080e5..7e2a7574 100644 --- a/parser/processor/processor.go +++ b/parser/processor/processor.go @@ -3,9 +3,6 @@ package processor import ( "context" - "github.com/mitchellh/mapstructure" - "github.com/wormhole-foundation/wormhole-explorer/common/domain" - "github.com/wormhole-foundation/wormhole-explorer/parser/metrics" "github.com/wormhole-foundation/wormhole-explorer/parser/parser" "go.uber.org/zap" ) @@ -24,14 +21,12 @@ type portalTokenBridgePayload struct { type Processor struct { repository *parser.Repository - metrics *metrics.Metrics logger *zap.Logger } -func New(repository *parser.Repository, metrics *metrics.Metrics, logger *zap.Logger) *Processor { +func New(repository *parser.Repository, logger *zap.Logger) *Processor { return &Processor{ repository: repository, - metrics: metrics, logger: logger, } } @@ -46,38 +41,6 @@ func (p *Processor) Process(ctx context.Context, vaaParsed *parser.ParsedVaaUpda return err } - p.logger.Info("Vaa save in repository", zap.String("id", vaaParsed.ID)) - - if vaaParsed.AppID == domain.AppIdPortalTokenBridge { - input, ok := vaaParsed.Result.(map[string]interface{}) - if ok { - var result portalTokenBridgePayload - err := mapstructure.Decode(input, &result) - if err != nil { - p.logger.Warn("Decoding map to payload struct", zap.String("id", vaaParsed.ID), zap.Error(err)) - return nil - } - if result.PayloadType == transferPayloadType || result.PayloadType == transferWithPayloadPayloadType { - if result.Amount == nil || result.ToChainID == nil { - p.logger.Warn("amount or toChain are empty", zap.String("id", vaaParsed.ID), zap.Any("payload", input)) - return nil - } - metric := &metrics.Volume{ - ChainSourceID: vaaParsed.EmitterChain, - ChainDestinationID: *result.ToChainID, - Value: *result.Amount, - Timestamp: vaaParsed.Timestamp, - AppID: vaaParsed.AppID, - } - err := p.metrics.PushVolume(ctx, metric) - if err != nil { - return err - } - } - } else { - p.logger.Warn("Casting parsed vaa to map", zap.String("id", vaaParsed.ID)) - } - } - + p.logger.Info("parsed VAA was successfully persisted", zap.String("id", vaaParsed.ID)) return nil }