2023-02-28 12:58:26 -08:00
|
|
|
package metric
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2023-05-04 16:17:03 -07:00
|
|
|
"fmt"
|
|
|
|
"math"
|
|
|
|
"math/big"
|
2023-03-07 11:25:42 -08:00
|
|
|
"strconv"
|
|
|
|
"time"
|
2023-02-28 12:58:26 -08:00
|
|
|
|
|
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
|
|
"github.com/influxdata/influxdb-client-go/v2/api"
|
2023-05-04 16:17:03 -07:00
|
|
|
wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/notional"
|
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
|
|
|
|
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
|
2023-02-28 12:58:26 -08:00
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Metric definition.
|
|
|
|
type Metric struct {
|
2023-05-10 14:18:32 -07:00
|
|
|
influxCli influxdb2.Client
|
|
|
|
apiBucketInfinite api.WriteAPIBlocking
|
|
|
|
apiBucket30Days api.WriteAPIBlocking
|
|
|
|
notionalCache wormscanNotionalCache.NotionalLocalCacheReadable
|
|
|
|
logger *zap.Logger
|
2023-02-28 12:58:26 -08:00
|
|
|
}
|
|
|
|
|
2023-03-07 11:25:42 -08:00
|
|
|
// New create a new *Metric.
|
2023-05-04 16:17:03 -07:00
|
|
|
func New(
|
|
|
|
ctx context.Context,
|
|
|
|
influxCli influxdb2.Client,
|
2023-05-10 14:18:32 -07:00
|
|
|
organization string,
|
|
|
|
bucketInifite string,
|
|
|
|
bucket30Days string,
|
|
|
|
notionalCache wormscanNotionalCache.NotionalLocalCacheReadable,
|
2023-05-04 16:17:03 -07:00
|
|
|
logger *zap.Logger,
|
|
|
|
) (*Metric, error) {
|
|
|
|
|
2023-05-10 14:18:32 -07:00
|
|
|
apiBucketInfinite := influxCli.WriteAPIBlocking(organization, bucketInifite)
|
|
|
|
apiBucket30Days := influxCli.WriteAPIBlocking(organization, bucket30Days)
|
2023-05-04 16:17:03 -07:00
|
|
|
|
|
|
|
m := Metric{
|
2023-05-10 14:18:32 -07:00
|
|
|
influxCli: influxCli,
|
|
|
|
apiBucketInfinite: apiBucketInfinite,
|
|
|
|
apiBucket30Days: apiBucket30Days,
|
|
|
|
logger: logger,
|
|
|
|
notionalCache: notionalCache,
|
2023-05-04 16:17:03 -07:00
|
|
|
}
|
|
|
|
return &m, nil
|
|
|
|
}
|
|
|
|
|
2023-03-07 11:25:42 -08:00
|
|
|
// Push implement MetricPushFunc definition.
|
2023-05-04 16:17:03 -07:00
|
|
|
func (m *Metric) Push(ctx context.Context, vaa *sdk.VAA) error {
|
|
|
|
|
|
|
|
err1 := m.vaaCountMeasurement(ctx, vaa)
|
|
|
|
err2 := m.volumeMeasurement(ctx, vaa)
|
|
|
|
|
|
|
|
//TODO if we had go 1.20, we could just use `errors.Join(err1, err2)` here.
|
2023-05-10 09:15:37 -07:00
|
|
|
if err1 != nil || err2 != nil {
|
|
|
|
return fmt.Errorf("err1=%w, err2=%w", err1, err2)
|
|
|
|
}
|
|
|
|
return nil
|
2023-02-28 12:58:26 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
// Close influx client.
|
|
|
|
func (m *Metric) Close() {
|
|
|
|
m.influxCli.Close()
|
|
|
|
}
|
|
|
|
|
2023-05-04 16:17:03 -07:00
|
|
|
// vaaCountMeasurement creates a new point for the `vaa_count` measurement.
|
|
|
|
func (m *Metric) vaaCountMeasurement(ctx context.Context, vaa *sdk.VAA) error {
|
2023-04-12 13:51:16 -07:00
|
|
|
|
2023-05-04 16:17:03 -07:00
|
|
|
const measurement = "vaa_count"
|
2023-02-28 12:58:26 -08:00
|
|
|
|
2023-05-04 16:17:03 -07:00
|
|
|
// Create a new point
|
2023-04-12 13:51:16 -07:00
|
|
|
point := influxdb2.
|
|
|
|
NewPointWithMeasurement(measurement).
|
|
|
|
AddTag("chain_id", strconv.Itoa(int(vaa.EmitterChain))).
|
|
|
|
AddField("count", 1).
|
|
|
|
SetTime(vaa.Timestamp.Add(time.Nanosecond * time.Duration(vaa.Sequence)))
|
|
|
|
|
2023-05-04 16:17:03 -07:00
|
|
|
// Write the point to influx
|
2023-05-10 14:18:32 -07:00
|
|
|
err := m.apiBucket30Days.WritePoint(ctx, point)
|
2023-02-28 12:58:26 -08:00
|
|
|
if err != nil {
|
2023-04-12 13:51:16 -07:00
|
|
|
m.logger.Error("failed to write metric",
|
|
|
|
zap.String("measurement", measurement),
|
|
|
|
zap.Uint16("chain_id", uint16(vaa.EmitterChain)),
|
|
|
|
zap.Error(err),
|
|
|
|
)
|
2023-02-28 12:58:26 -08:00
|
|
|
return err
|
|
|
|
}
|
2023-04-12 13:51:16 -07:00
|
|
|
|
2023-02-28 12:58:26 -08:00
|
|
|
return nil
|
|
|
|
}
|
2023-05-04 16:17:03 -07:00
|
|
|
|
|
|
|
// volumeMeasurement creates a new point for the `vaa_volume` measurement.
|
|
|
|
func (m *Metric) volumeMeasurement(ctx context.Context, vaa *sdk.VAA) error {
|
|
|
|
|
|
|
|
const measurement = "vaa_volume"
|
|
|
|
|
|
|
|
// Decode the VAA payload
|
|
|
|
//
|
|
|
|
// If the VAA didn't come from the portal token bridge, we just skip it.
|
|
|
|
payload, err := sdk.DecodeTransferPayloadHdr(vaa.Payload)
|
|
|
|
if err != nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the token metadata
|
|
|
|
//
|
|
|
|
// This is complementary data about the token that is not present in the VAA itself.
|
2023-05-08 13:51:18 -07:00
|
|
|
tokenMeta, ok := domain.GetTokenByAddress(payload.OriginChain, payload.OriginAddress.String())
|
2023-05-04 16:17:03 -07:00
|
|
|
if !ok {
|
2023-05-08 13:51:18 -07:00
|
|
|
m.logger.Debug("found no token metadata for VAA",
|
2023-05-04 16:17:03 -07:00
|
|
|
zap.String("vaaId", vaa.MessageID()),
|
|
|
|
zap.String("tokenAddress", payload.OriginAddress.String()),
|
|
|
|
zap.Uint16("tokenChain", uint16(payload.OriginChain)),
|
|
|
|
)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Normalize the amount to 8 decimals
|
|
|
|
amount := payload.Amount
|
|
|
|
if tokenMeta.Decimals < 8 {
|
|
|
|
|
|
|
|
// factor = 10 ^ (8 - tokenMeta.Decimals)
|
|
|
|
var factor big.Int
|
|
|
|
factor.Exp(big.NewInt(10), big.NewInt(int64(8-tokenMeta.Decimals)), nil)
|
|
|
|
|
|
|
|
amount = amount.Mul(amount, &factor)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Try to obtain the token notional value from the cache
|
|
|
|
notional, err := m.notionalCache.Get(tokenMeta.UnderlyingSymbol)
|
|
|
|
if err != nil {
|
|
|
|
m.logger.Warn("failed to obtain notional for this token",
|
|
|
|
zap.String("vaaId", vaa.MessageID()),
|
|
|
|
zap.String("tokenAddress", payload.OriginAddress.String()),
|
|
|
|
zap.Uint16("tokenChain", uint16(payload.OriginChain)),
|
|
|
|
zap.Any("tokenMetadata", tokenMeta),
|
|
|
|
zap.Error(err),
|
|
|
|
)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Convert the notional value to an integer with an implicit precision of 8 decimals
|
|
|
|
notionalBigInt, err := floatToBigInt(notional.NotionalUsd)
|
|
|
|
if err != nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Calculate the volume, with an implicit precision of 8 decimals
|
|
|
|
var volume big.Int
|
|
|
|
volume.Mul(amount, notionalBigInt)
|
|
|
|
volume.Div(&volume, big.NewInt(1e8))
|
|
|
|
|
|
|
|
m.logger.Info("Pushing volume metrics",
|
|
|
|
zap.String("vaaId", vaa.MessageID()),
|
|
|
|
zap.String("amount", amount.String()),
|
|
|
|
zap.String("notional", notionalBigInt.String()),
|
|
|
|
zap.String("volume", volume.String()),
|
2023-05-08 13:51:18 -07:00
|
|
|
zap.String("underlyingSymbol", tokenMeta.UnderlyingSymbol.String()),
|
2023-05-04 16:17:03 -07:00
|
|
|
)
|
|
|
|
|
|
|
|
// Create a data point with volume-related fields
|
|
|
|
//
|
|
|
|
// We're converting big integers to int64 because influxdb doesn't support bigint/numeric types.
|
|
|
|
point := influxdb2.NewPointWithMeasurement(measurement).
|
2023-05-08 13:51:18 -07:00
|
|
|
// This is always set to the portal token bridge app ID, but we may have other apps in the future
|
2023-05-04 16:17:03 -07:00
|
|
|
AddTag("app_id", domain.AppIdPortalTokenBridge).
|
2023-05-08 13:51:18 -07:00
|
|
|
AddTag("emitter_chain", fmt.Sprintf("%d", vaa.EmitterChain)).
|
|
|
|
// Receiver chain
|
|
|
|
AddTag("destination_chain", fmt.Sprintf("%d", payload.TargetChain)).
|
|
|
|
// Original mint address
|
|
|
|
AddTag("token_address", payload.OriginAddress.String()).
|
|
|
|
// Original mint chain
|
|
|
|
AddTag("token_chain", fmt.Sprintf("%d", payload.OriginChain)).
|
|
|
|
// Amount of tokens transferred, integer, 8 decimals of precision
|
2023-05-10 14:18:32 -07:00
|
|
|
AddField("amount", amount.Uint64()).
|
2023-05-08 13:51:18 -07:00
|
|
|
// Token price at the time the VAA was processed, integer, 8 decimals of precision
|
|
|
|
//
|
|
|
|
// TODO: We should use the price at the time the VAA was emitted instead.
|
2023-05-10 14:18:32 -07:00
|
|
|
AddField("notional", notionalBigInt.Uint64()).
|
2023-05-08 13:51:18 -07:00
|
|
|
// Volume in USD, integer, 8 decimals of precision
|
2023-05-10 14:18:32 -07:00
|
|
|
AddField("volume", volume.Uint64()).
|
2023-05-04 16:17:03 -07:00
|
|
|
SetTime(vaa.Timestamp)
|
|
|
|
|
|
|
|
// Write the point to influx
|
2023-05-10 14:18:32 -07:00
|
|
|
err = m.apiBucketInfinite.WritePoint(ctx, point)
|
2023-05-04 16:17:03 -07:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// floatToBigInt converts a float64 into a big.Int with 8 decimals of implicit precision,
// i.e. it returns f * 10^8 rounded to the nearest integer (1.5 becomes 150000000).
//
// An error is returned when f is negative, NaN or infinite, or when the scaled
// value does not fit in an int64.
//
// If we ever upgrade the notional cache to store prices as big integers,
// this gnarly function won't be needed anymore.
func floatToBigInt(f float64) (*big.Int, error) {

	if math.IsNaN(f) || math.IsInf(f, 0) || f < 0 {
		return nil, fmt.Errorf("cannot convert %v to a fixed-point integer", f)
	}

	// Format the whole number at once so that rounding of the fractional part
	// carries into the integral part (e.g. 0.999999999 -> "1.00000000").
	// Formatting the fraction on its own would drop that carry.
	s := strconv.FormatFloat(f, 'f', 8, 64)

	// With a precision of 8, the decimal point is always the 9th character
	// from the end; drop it to obtain the scaled integer digits.
	digits := s[:len(s)-9] + s[len(s)-8:]

	i, err := strconv.ParseInt(digits, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("converting %v to a fixed-point integer: %w", f, err)
	}

	return big.NewInt(i), nil
}
|