diff --git a/analytics/cmd/metrics/volume.go b/analytics/cmd/metrics/volume.go index a206d6fb..23cf2a1b 100644 --- a/analytics/cmd/metrics/volume.go +++ b/analytics/cmd/metrics/volume.go @@ -43,11 +43,11 @@ func (c *VaaConverter) Convert(vaaBytes []byte) (string, error) { } // Look up token metadata - tokenMetadata, ok := domain.GetTokenByAddress(vaa.EmitterChain, payload.OriginAddress.String()) + tokenMetadata, ok := domain.GetTokenByAddress(payload.OriginChain, payload.OriginAddress.String()) if !ok { // if not found, add to missing tokens - c.MissingTokens[payload.OriginAddress] = vaa.EmitterChain + c.MissingTokens[payload.OriginAddress] = payload.OriginChain c.MissingTokensCounter[payload.OriginAddress] = c.MissingTokensCounter[payload.OriginAddress] + 1 return "", fmt.Errorf("unknown token: %s %s", payload.OriginChain.String(), payload.OriginAddress.String()) @@ -58,7 +58,7 @@ func (c *VaaConverter) Convert(vaaBytes []byte) (string, error) { { p := metric.MakePointForVaaVolumeParams{ Vaa: vaa, - TokenPriceFunc: func(_ domain.Symbol, timestamp time.Time) (decimal.Decimal, error) { + TokenPriceFunc: func(_ string, timestamp time.Time) (decimal.Decimal, error) { // fetch the historic price from cache price, err := c.PriceCache.GetPriceByTime(tokenMetadata.CoingeckoID, timestamp) diff --git a/analytics/metric/metric.go b/analytics/metric/metric.go index 8d81401c..f0c2915a 100644 --- a/analytics/metric/metric.go +++ b/analytics/metric/metric.go @@ -85,9 +85,9 @@ func (m *Metric) Push(ctx context.Context, vaa *sdk.VAA) error { m.logger, vaa, m.transferPrices, - func(symbol domain.Symbol, timestamp time.Time) (decimal.Decimal, error) { + func(tokenID string, timestamp time.Time) (decimal.Decimal, error) { - priceData, err := m.notionalCache.Get(symbol) + priceData, err := m.notionalCache.Get(tokenID) if err != nil { return decimal.NewFromInt(0), err } @@ -192,9 +192,9 @@ func (m *Metric) volumeMeasurement(ctx context.Context, vaa *sdk.VAA) error { p := MakePointForVaaVolumeParams{ Logger: m.logger, Vaa: vaa, - TokenPriceFunc: func(symbol domain.Symbol, timestamp time.Time) (decimal.Decimal, error) { + TokenPriceFunc: func(tokenID string, timestamp time.Time) (decimal.Decimal, error) { - priceData, err := m.notionalCache.Get(symbol) + priceData, err := m.notionalCache.Get(tokenID) if err != nil { return decimal.NewFromInt(0), err } @@ -257,7 +257,7 @@ type MakePointForVaaVolumeParams struct { Vaa *sdk.VAA // TokenPriceFunc returns the price of the given token at the specified timestamp. - TokenPriceFunc func(symbol domain.Symbol, timestamp time.Time) (decimal.Decimal, error) + TokenPriceFunc func(tokenID string, timestamp time.Time) (decimal.Decimal, error) // Logger is an optional parameter, in case the caller wants additional visibility. Logger *zap.Logger @@ -349,7 +349,7 @@ func MakePointForVaaVolume(params *MakePointForVaaVolumeParams) (*write.Point, e } // Try to obtain the token notional value from the cache - notionalUSD, err := params.TokenPriceFunc(tokenMeta.Symbol, params.Vaa.Timestamp) + notionalUSD, err := params.TokenPriceFunc(tokenMeta.GetTokenID(), params.Vaa.Timestamp) if err != nil { params.Metrics.IncMissingNotional(tokenMeta.Symbol.String()) if params.Logger != nil { diff --git a/analytics/metric/mongo.go b/analytics/metric/mongo.go index 89c0685c..dc371b61 100644 --- a/analytics/metric/mongo.go +++ b/analytics/metric/mongo.go @@ -34,7 +34,7 @@ func upsertTransferPrices( logger *zap.Logger, vaa *sdk.VAA, transferPrices *mongo.Collection, - tokenPriceFunc func(symbol domain.Symbol, timestamp time.Time) (decimal.Decimal, error), + tokenPriceFunc func(tokenID string, timestamp time.Time) (decimal.Decimal, error), ) error { // Do not generate this metric for PythNet VAAs @@ -57,7 +57,7 @@ func upsertTransferPrices( } // Try to obtain the token notional value from the cache - notionalUSD, err := tokenPriceFunc(tokenMeta.Symbol, vaa.Timestamp) + notionalUSD, err := tokenPriceFunc(tokenMeta.GetTokenID(), vaa.Timestamp) if err != nil { logger.Warn("failed to obtain notional for this token", zap.String("vaaId", vaa.MessageID()), diff --git a/common/client/cache/notional/cache.go b/common/client/cache/notional/cache.go index 4691e9f8..396d1d22 100644 --- a/common/client/cache/notional/cache.go +++ b/common/client/cache/notional/cache.go @@ -10,14 +10,13 @@ import ( "github.com/go-redis/redis/v8" "github.com/shopspring/decimal" - "github.com/wormhole-foundation/wormhole-explorer/common/domain" "go.uber.org/zap" ) const ( - wormscanNotionalUpdated = "NOTIONAL_UPDATED" - wormscanNotionalCacheKeyRegex = "WORMSCAN:NOTIONAL:SYMBOL:*" - KeyFormatString = "WORMSCAN:NOTIONAL:SYMBOL:%s" + wormscanNotionalUpdated = "NOTIONAL_UPDATED" + wormscanTokenNotionalCacheKeyRegex = "WORMSCAN:NOTIONAL:TOKEN:*" + KeyTokenFormatString = "WORMSCAN:NOTIONAL:TOKEN:%s" ) var ( @@ -27,7 +26,7 @@ var ( // NotionalLocalCacheReadable is the interface for notional local cache. type NotionalLocalCacheReadable interface { - Get(symbol domain.Symbol) (PriceData, error) + Get(tokenID string) (PriceData, error) Close() error } @@ -48,7 +47,6 @@ func (p PriceData) MarshalBinary() ([]byte, error) { type NotionalCache struct { client *redis.Client pubSub *redis.PubSub - channel string notionalMap sync.Map prefix string logger *zap.Logger @@ -60,12 +58,10 @@ func NewNotionalCache(ctx context.Context, redisClient *redis.Client, prefix str if redisClient == nil { return nil, errors.New("redis client is nil") } - - pubsub := redisClient.Subscribe(ctx, channel) + pubsub := redisClient.Subscribe(ctx, formatChannel(prefix, channel)) return &NotionalCache{ client: redisClient, pubSub: pubsub, - channel: formatChannel(prefix, channel), notionalMap: sync.Map{}, prefix: prefix, logger: log}, nil @@ -131,6 +127,7 @@ func (c *NotionalCache) subscribe(ctx context.Context) { go func() { for msg := range ch { + c.logger.Info("receive message from channel", zap.String("channel", msg.Channel), zap.String("payload", msg.Payload)) if wormscanNotionalUpdated == msg.Payload { // update notional cache c.loadCache(ctx) @@ -145,11 +142,11 @@ func (c *NotionalCache) Close() error { } // Get notional cache value. -func (c *NotionalCache) Get(symbol domain.Symbol) (PriceData, error) { +func (c *NotionalCache) Get(tokenID string) (PriceData, error) { var notional PriceData // get notional cache key - key := fmt.Sprintf(KeyFormatString, symbol) + key := fmt.Sprintf(KeyTokenFormatString, tokenID) key = c.renderKey(key) // get notional cache value @@ -163,7 +160,7 @@ func (c *NotionalCache) Get(symbol domain.Symbol) (PriceData, error) { if !ok { c.logger.Error("invalid notional cache field", zap.Any("field", field), - zap.String("symbol", symbol.String())) + zap.String("tokenId", tokenID)) return notional, ErrInvalidCacheField } return notional, nil @@ -178,7 +175,7 @@ func (c *NotionalCache) renderKey(key string) string { } func (c *NotionalCache) renderRegExp() string { - return "*" + c.renderKey(wormscanNotionalCacheKeyRegex) + return "*" + c.renderKey(wormscanTokenNotionalCacheKeyRegex) } func formatChannel(prefix string, channel string) string { diff --git a/common/domain/tokens.go b/common/domain/tokens.go index 7ed26c63..959a38c3 100644 --- a/common/domain/tokens.go +++ b/common/domain/tokens.go @@ -30,6 +30,10 @@ var ( tokenMetadataByCoingeckoID = make(map[string]*TokenMetadata) ) +func (t *TokenMetadata) GetTokenID() string { + return fmt.Sprintf("%d/%s", t.TokenChain, t.TokenAddress) +} + func init() { for i := range tokenMetadata { diff --git a/jobs/jobs/notional/notional.go b/jobs/jobs/notional/notional.go index e9686113..3059a48d 100644 --- a/jobs/jobs/notional/notional.go +++ b/jobs/jobs/notional/notional.go @@ -77,10 +77,10 @@ func (j *NotionalJob) Run() error { } // updateNotionalCache updates the notional value of assets in cache. -func (j *NotionalJob) updateNotionalCache(notionals map[domain.Symbol]notional.PriceData) error { +func (j *NotionalJob) updateNotionalCache(notionals map[string]notional.PriceData) error { for chainID, n := range notionals { - key := j.renderKey(fmt.Sprintf(notional.KeyFormatString, chainID)) + key := j.renderKey(fmt.Sprintf(notional.KeyTokenFormatString, chainID)) err := j.cacheClient.Set(key, n, 0).Err() if err != nil { return err @@ -93,28 +93,24 @@ func (j *NotionalJob) updateNotionalCache(notionals map[domain.Symbol]notional.P // convertToSymbols converts the coingecko response into a symbol map // // The returned map has symbols as keys, and price data as the values. -func (j *NotionalJob) convertToSymbols(m map[string]coingecko.NotionalUSD) map[domain.Symbol]notional.PriceData { +func (j *NotionalJob) convertToSymbols(m map[string]coingecko.NotionalUSD) map[string]notional.PriceData { - w := make(map[domain.Symbol]notional.PriceData, len(m)) + w := make(map[string]notional.PriceData, len(m)) now := time.Now() - for k, v := range m { - - // Do not update the dictionary when the token price is nil - if v.Price == nil { - j.logger.Info("skipping nil price", zap.String("coingeckoID", k)) - continue - } - - // Translate coingecko IDs into their associated ticker symbols - tokenMeta, ok := domain.GetTokenByCoingeckoID(k) + for _, v := range domain.GetAllTokens() { + notionalUSD, ok := m[v.CoingeckoID] if !ok { - j.logger.Info("skipping unknown coingecko ID", zap.String("coingeckoID", k)) + j.logger.Info("skipping unknown coingecko ID", zap.String("coingeckoID", v.CoingeckoID)) continue } - - // Set price data for the current symbol - w[tokenMeta.Symbol] = notional.PriceData{NotionalUsd: *v.Price, UpdatedAt: now} + // Do not update the dictionary when the token price is nil + if notionalUSD.Price == nil { + j.logger.Info("skipping nil price", zap.String("coingeckoID", v.CoingeckoID)) + continue + } + // Set price data for the current token + w[v.GetTokenID()] = notional.PriceData{NotionalUsd: *notionalUSD.Price, UpdatedAt: now} } return w