Persist symbol and price for VAAs (#384)

### Description

Tracking issue: https://github.com/wormhole-foundation/wormhole-explorer/issues/377

This pull request modifies the analytics service so that it persists price information for a VAA in MongoDB. Data is stored only for token bridge VAAs, and only if the price is cached for the symbol being used.

A new collection `transferPrices` has been created, with the following document model:
```
{
  "_id": "22/0000000000000000000000000000000000000000000000000000000000000001/18087",
  "price": "0.999684",
  "symbol": "USDC",
  "timestamp": 2023-06-06T14:04:47.000+00:00
  "tokenAmount": "0.1",
  "usdAmount": "0.0999684"
}
```
This commit is contained in:
agodnic 2023-06-07 12:18:38 -03:00 committed by GitHub
parent b4c217e32c
commit 598db1b876
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 207 additions and 8 deletions

View File

@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"syscall"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
@ -22,6 +23,8 @@ import (
sqs_client "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs"
health "github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
@ -50,6 +53,13 @@ func Run() {
logger := logger.New("wormhole-explorer-analytics", logger.WithLevel(config.LogLevel))
logger.Info("starting analytics service...")
// setup DB connection
logger.Info("connecting to MongoDB...")
db, err := NewDatabase(rootCtx, logger, config.MongodbURI, config.MongodbDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
// create influxdb client.
logger.Info("initializing InfluxDB client...")
influxCli := newInfluxClient(config.InfluxUrl, config.InfluxToken)
@ -57,7 +67,7 @@ func Run() {
// get health check functions.
logger.Info("creating health check functions...")
healthChecks, err := newHealthChecks(rootCtx, config, influxCli)
healthChecks, err := newHealthChecks(rootCtx, config, influxCli, db.Database)
if err != nil {
logger.Fatal("failed to create health checks", zap.Error(err))
}
@ -71,7 +81,7 @@ func Run() {
// create a metrics instance
logger.Info("initializing metrics instance...")
metric, err := metric.New(rootCtx, influxCli, config.InfluxOrganization, config.InfluxBucketInfinite,
metric, err := metric.New(rootCtx, db.Database, influxCli, config.InfluxOrganization, config.InfluxBucketInfinite,
config.InfluxBucket30Days, config.InfluxBucket24Hours, notionalCache, logger)
if err != nil {
logger.Fatal("failed to create metrics instance", zap.Error(err))
@ -89,7 +99,7 @@ func Run() {
server.Start()
// Waiting for signal
logger.Info("waiting for termination signal or context cancellation")
logger.Info("waiting for termination signal or context cancellation...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
@ -105,6 +115,8 @@ func Run() {
metric.Close()
logger.Info("closing HTTP server...")
server.Stop()
logger.Info("closing MongoDB connection...")
db.Close()
logger.Info("terminated successfully")
}
@ -161,12 +173,24 @@ func newInfluxClient(url, token string) influxdb2.Client {
return influxdb2.NewClient(url, token)
}
func newHealthChecks(ctx context.Context, config *config.Configuration, influxCli influxdb2.Client) ([]health.Check, error) {
func newHealthChecks(
ctx context.Context,
config *config.Configuration,
influxCli influxdb2.Client,
db *mongo.Database,
) ([]health.Check, error) {
awsConfig, err := newAwsConfig(ctx, config)
if err != nil {
return nil, err
}
return []health.Check{health.SQS(awsConfig, config.SQSUrl), health.Influx(influxCli)}, nil
healthChecks := []health.Check{
health.SQS(awsConfig, config.SQSUrl),
health.Influx(influxCli),
health.Mongo(db),
}
return healthChecks, nil
}
func newNotionalCache(
@ -187,3 +211,36 @@ func newNotionalCache(
return notionalCache, nil
}
// Database contains handles to MongoDB.
type Database struct {
Database *mongo.Database
client *mongo.Client
}
// NewDatabase connects to DB and returns a client that will disconnect when the passed in context is cancelled.
func NewDatabase(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) {
cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
return &Database{client: cli, Database: cli.Database(databaseName)}, err
}
const databaseCloseDeadline = 30 * time.Second
// Close attempts to gracefully Close the database connection.
func (d *Database) Close() error {
ctx, cancelFunc := context.WithDeadline(
context.Background(),
time.Now().Add(databaseCloseDeadline),
)
err := d.client.Disconnect(ctx)
cancelFunc()
return err
}

View File

@ -24,6 +24,8 @@ type Configuration struct {
InfluxBucketInfinite string `env:"INFLUX_BUCKET_INFINITE"`
InfluxBucket30Days string `env:"INFLUX_BUCKET_30_DAYS"`
InfluxBucket24Hours string `env:"INFLUX_BUCKET_24_HOURS"`
MongodbURI string `env:"MONGODB_URI,required"`
MongodbDatabase string `env:"MONGODB_DATABASE,required"`
PprofEnabled bool `env:"PPROF_ENABLED,default=false"`
P2pNetwork string `env:"P2P_NETWORK,required"`
CacheURL string `env:"CACHE_URL,required"`

View File

@ -14,11 +14,15 @@ import (
wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/notional"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
// Metric definition.
type Metric struct {
db *mongo.Database
// transferPrices contains the notional price for each token bridge transfer.
transferPrices *mongo.Collection
influxCli influxdb2.Client
apiBucketInfinite api.WriteAPIBlocking
apiBucket30Days api.WriteAPIBlocking
@ -30,6 +34,7 @@ type Metric struct {
// New create a new *Metric.
func New(
ctx context.Context,
db *mongo.Database,
influxCli influxdb2.Client,
organization string,
bucketInifite string,
@ -45,6 +50,8 @@ func New(
apiBucket24Hours.EnableBatching()
m := Metric{
db: db,
transferPrices: db.Collection("transferPrices"),
influxCli: influxCli,
apiBucketInfinite: apiBucketInfinite,
apiBucket24Hours: apiBucket24Hours,
@ -59,12 +66,29 @@ func New(
func (m *Metric) Push(ctx context.Context, vaa *sdk.VAA) error {
err1 := m.vaaCountMeasurement(ctx, vaa)
err2 := m.volumeMeasurement(ctx, vaa)
err3 := m.vaaCountAllMessagesMeasurement(ctx, vaa)
//TODO if we had go 1.20, we could just use `errors.Join(err1, err2, err3)` here.
if err1 != nil || err2 != nil || err3 != nil {
return fmt.Errorf("err1=%w, err2=%w, err3=%w", err1, err2, err3)
err4 := upsertTransferPrices(
m.logger,
vaa,
m.transferPrices,
func(symbol domain.Symbol, timestamp time.Time) (decimal.Decimal, error) {
priceData, err := m.notionalCache.Get(symbol)
if err != nil {
return decimal.NewFromInt(0), err
}
return priceData.NotionalUsd, nil
},
)
//TODO if we had go 1.20, we could just use `errors.Join(err1, err2, err3, ...)` here.
if err1 != nil || err2 != nil || err3 != nil || err4 != nil {
return fmt.Errorf("err1=%w, err2=%w, err3=%w err4=%w", err1, err2, err3, err4)
}
return nil

106
analytics/metric/mongo.go Normal file
View File

@ -0,0 +1,106 @@
package metric
import (
"context"
"fmt"
"time"
"github.com/shopspring/decimal"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// TransferPriceDoc models a document in the `transferPrices` collection
type TransferPriceDoc struct {
// ID is the unique identifier of the VAA for which we are storing price information.
ID string `bson:"_id"`
// Timestamp is the timestamp of the VAA for which we are storing price information.
Timestamp time.Time `bson:"timestamp"`
// Symbol is the trading symbol of the token being transferred.
Symbol string `bson:"symbol"`
// SymbolPriceUsd is the price of the token in USD at the moment of the transfer.
SymbolPriceUsd string `bson:"price"`
// TokenAmount is the amount of the token being transferred.
TokenAmount string `bson:"tokenAmount"`
// UsdAmount is the value in USD of the token being transferred.
UsdAmount string `bson:"usdAmount"`
}
func upsertTransferPrices(
logger *zap.Logger,
vaa *sdk.VAA,
transferPrices *mongo.Collection,
tokenPriceFunc func(symbol domain.Symbol, timestamp time.Time) (decimal.Decimal, error),
) error {
// Do not generate this metric for PythNet VAAs
if vaa.EmitterChain == sdk.ChainIDPythNet {
return nil
}
// Decode the VAA payload
payload, err := sdk.DecodeTransferPayloadHdr(vaa.Payload)
if err != nil {
return nil
}
// Get the token metadata
//
// This is complementary data about the token that is not present in the VAA itself.
tokenMeta, ok := domain.GetTokenByAddress(payload.OriginChain, payload.OriginAddress.String())
if !ok {
return nil
}
// Try to obtain the token notional value from the cache
notionalUSD, err := tokenPriceFunc(tokenMeta.Symbol, vaa.Timestamp)
if err != nil {
logger.Warn("failed to obtain notional for this token",
zap.String("vaaId", vaa.MessageID()),
zap.String("tokenAddress", payload.OriginAddress.String()),
zap.Uint16("tokenChain", uint16(payload.OriginChain)),
zap.Any("tokenMetadata", tokenMeta),
zap.Error(err),
)
return nil
}
// Compute the amount with decimals
var exp int32
if tokenMeta.Decimals > 8 {
exp = 8
} else {
exp = int32(tokenMeta.Decimals)
}
tokenAmount := decimal.NewFromBigInt(payload.Amount, -exp)
// Compute the amount in USD
usdAmount := tokenAmount.Mul(notionalUSD)
// Upsert the `TransferPrices` collection
update := bson.M{
"$set": TransferPriceDoc{
ID: vaa.MessageID(),
Timestamp: vaa.Timestamp,
Symbol: tokenMeta.Symbol.String(),
SymbolPriceUsd: notionalUSD.Truncate(8).String(),
TokenAmount: tokenAmount.Truncate(8).String(),
UsdAmount: usdAmount.Truncate(8).String(),
},
}
_, err = transferPrices.UpdateByID(
context.Background(),
vaa.MessageID(),
update,
options.Update().SetUpsert(true),
)
if err != nil {
return fmt.Errorf("failed to update transfer price collection: %w", err)
}
return nil
}

View File

@ -51,6 +51,16 @@ spec:
value: "{{ .PPROF_ENABLED }}"
- name: P2P_NETWORK
value: {{ .P2P_NETWORK }}
- name: MONGODB_URI
valueFrom:
secretKeyRef:
name: mongodb
key: mongo-uri
- name: MONGODB_DATABASE
valueFrom:
configMapKeyRef:
name: config
key: mongo-database
- name: INFLUX_URL
valueFrom:
configMapKeyRef: