Add backfiller for analytics from mongodb (#562)

This commit is contained in:
ftocal 2023-07-25 15:36:05 -03:00 committed by GitHub
parent 8fc89e35b2
commit 13819e2d7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 403 additions and 167 deletions

View File

@ -71,24 +71,64 @@ func addVaaCountCommand(parent *cobra.Command) {
parent.AddCommand(vaaCountCmd)
}
func addVaaVolumeCommand(parent *cobra.Command) {
func addVaaVolumeFromFileCommand(parent *cobra.Command) {
var input, output, prices string
vaaVolumeCmd := &cobra.Command{
Use: "vaa-volume",
//vaa-volume from csv file
vaaVolumeFileCmd := &cobra.Command{
Use: "file",
Short: "Generate volume metrics from a VAA csv file",
Run: func(_ *cobra.Command, _ []string) {
metrics.RunVaaVolume(input, output, prices)
metrics.RunVaaVolumeFromFile(input, output, prices)
},
}
// input flag
vaaVolumeCmd.Flags().StringVar(&input, "input", "", "path to input vaa file")
vaaVolumeCmd.MarkFlagRequired("input")
// output flag
vaaVolumeCmd.Flags().StringVar(&output, "output", "", "path to output file")
vaaVolumeCmd.MarkFlagRequired("output")
// prices flag
vaaVolumeCmd.Flags().StringVar(&prices, "prices", "prices.csv", "path to prices file")
// input flag
vaaVolumeFileCmd.Flags().StringVar(&input, "input", "", "path to input vaa file")
vaaVolumeFileCmd.MarkFlagRequired("input")
// output flag
vaaVolumeFileCmd.Flags().StringVar(&output, "output", "", "path to output file")
vaaVolumeFileCmd.MarkFlagRequired("output")
// prices flag
vaaVolumeFileCmd.Flags().StringVar(&prices, "prices", "prices.csv", "path to prices file")
parent.AddCommand(vaaVolumeFileCmd)
}
// addVaaVolumeFromMongoCommand registers the "mongo" subcommand on parent.
//
// The subcommand collects the --mongo-uri, --mongo-database, --output and
// --prices flags and passes them to metrics.RunVaaVolumeFromMongo.
func addVaaVolumeFromMongoCommand(parent *cobra.Command) {
	var mongoUri, mongoDb, output, prices string

	// vaa-volume from MongoDB
	vaaVolumeMongoCmd := &cobra.Command{
		Use:   "mongo",
		Short: "Generate volume metrics from MongoDB",
		Run: func(_ *cobra.Command, _ []string) {
			metrics.RunVaaVolumeFromMongo(mongoUri, mongoDb, output, prices)
		},
	}

	// mongo flags: both default to "" and there is no usable fallback, so they
	// are required, the same way --input/--output are on the "file" subcommand.
	vaaVolumeMongoCmd.Flags().StringVar(&mongoUri, "mongo-uri", "", "Mongo connection")
	vaaVolumeMongoCmd.MarkFlagRequired("mongo-uri")
	vaaVolumeMongoCmd.Flags().StringVar(&mongoDb, "mongo-database", "", "Mongo database")
	vaaVolumeMongoCmd.MarkFlagRequired("mongo-database")

	// output flag
	vaaVolumeMongoCmd.Flags().StringVar(&output, "output", "", "path to output file")
	vaaVolumeMongoCmd.MarkFlagRequired("output")

	// prices flag
	vaaVolumeMongoCmd.Flags().StringVar(&prices, "prices", "prices.csv", "path to prices file")

	parent.AddCommand(vaaVolumeMongoCmd)
}
// addVaaVolumeCommand attaches the "vaa-volume" command group to parent.
// The group has no Run of its own; each data source contributes a subcommand.
func addVaaVolumeCommand(parent *cobra.Command) {
	volumeCmd := &cobra.Command{
		Use:   "vaa-volume",
		Short: "Generate volume metric",
	}

	// Register the subcommands in a fixed order: file first, then mongo.
	registrars := []func(*cobra.Command){
		addVaaVolumeFromFileCommand,
		addVaaVolumeFromMongoCommand,
	}
	for _, register := range registrars {
		register(volumeCmd)
	}

	parent.AddCommand(volumeCmd)
}

134
analytics/cmd/metrics/volume.go Executable file → Normal file
View File

@ -1,13 +1,8 @@
package metrics
import (
"bufio"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/influxdata/influxdb-client-go/v2/api/write"
@ -16,102 +11,18 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/analytics/metric"
"github.com/wormhole-foundation/wormhole-explorer/analytics/prices"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
type LineParser struct {
type VaaConverter struct {
MissingTokens map[sdk.Address]sdk.ChainID
MissingTokensCounter map[sdk.Address]int
PriceCache *prices.CoinPricesCache
Metrics metrics.Metrics
}
// read a csv file with VAAs and convert into a decoded csv file
// ready to upload to the database
func RunVaaVolume(inputFile, outputFile, pricesFile string) {
// build logger
logger := logger.New("wormhole-explorer-analytics")
logger.Info("starting wormhole-explorer-analytics ...")
// open input file
f, err := os.Open(inputFile)
if err != nil {
logger.Fatal("opening input file", zap.Error(err))
}
defer f.Close()
// create missing tokens file
missingTokensFile := "missing_tokens.csv"
fmissingTokens, err := os.Create(missingTokensFile)
if err != nil {
logger.Fatal("creating missing tokens file", zap.Error(err))
}
defer fmissingTokens.Close()
//open output file for writing
fout, err := os.Create(outputFile)
if err != nil {
logger.Fatal("creating output file", zap.Error(err))
}
defer fout.Close()
// init price cache!
logger.Info("loading historical prices...")
lp := NewLineParser(pricesFile)
lp.PriceCache.InitCache()
logger.Info("loaded historical prices")
r := bufio.NewReader(f)
c := 0
i := 0
// read file line by line and send to workpool
for {
line, _, err := r.ReadLine() //loading chunk into buffer
if err != nil {
if err == io.EOF {
break
}
logger.Fatal("a real error happened here", zap.Error(err))
}
nl, err := lp.ParseLine(line)
if err != nil {
//fmt.Printf(",")
} else {
c++
fout.Write([]byte(nl))
if c == 10000 {
fmt.Printf(".")
c = 0
i := i + 1
if i == 10 {
fmt.Printf("\n")
i = 0
}
}
}
}
for k := range lp.MissingTokensCounter {
fmissingTokens.WriteString(fmt.Sprintf("%s,%s,%d\n", k.String(), lp.MissingTokens[k], lp.MissingTokensCounter[k]))
}
logger.Info("missing tokens", zap.Int("count", len(lp.MissingTokens)))
logger.Info("finished wormhole-explorer-analytics")
}
func NewLineParser(filename string) *LineParser {
priceCache := prices.NewCoinPricesCache(filename)
return &LineParser{
func NewVaaConverter(priceCache *prices.CoinPricesCache) *VaaConverter {
return &VaaConverter{
MissingTokens: make(map[sdk.Address]sdk.ChainID),
MissingTokensCounter: make(map[sdk.Address]int),
PriceCache: priceCache,
@ -119,31 +30,16 @@ func NewLineParser(filename string) *LineParser {
}
}
// ParseLine takes a CSV line as input, and generates a line protocol entry as output.
//
// The format for InfluxDB line protocol is: vaa,tags fields timestamp
func (lp *LineParser) ParseLine(line []byte) (string, error) {
func (c *VaaConverter) Convert(vaaBytes []byte) (string, error) {
// Parse the VAA and payload
var vaa *sdk.VAA
var payload *sdk.TransferPayloadHdr
{
tt := strings.Split(string(line), ",")
if len(tt) != 2 {
return "", fmt.Errorf("expected line to have two tokens, but has %d: %s", len(tt), line)
}
vaaBytes, err := hex.DecodeString(tt[1])
if err != nil {
return "", fmt.Errorf("error decoding: %v", err)
}
vaa, err = sdk.Unmarshal(vaaBytes)
if err != nil {
return "", fmt.Errorf("error unmarshaling vaa: %v", err)
}
payload, err = sdk.DecodeTransferPayloadHdr(vaa.Payload)
if err != nil {
return "", fmt.Errorf("error decoding payload: %v", err)
}
vaa, err := sdk.Unmarshal(vaaBytes)
if err != nil {
return "", fmt.Errorf("error unmarshaling vaa: %v", err)
}
payload, err := sdk.DecodeTransferPayloadHdr(vaa.Payload)
if err != nil {
return "", fmt.Errorf("error decoding payload: %v", err)
}
// Look up token metadata
@ -151,8 +47,8 @@ func (lp *LineParser) ParseLine(line []byte) (string, error) {
if !ok {
// if not found, add to missing tokens
lp.MissingTokens[payload.OriginAddress] = vaa.EmitterChain
lp.MissingTokensCounter[payload.OriginAddress] = lp.MissingTokensCounter[payload.OriginAddress] + 1
c.MissingTokens[payload.OriginAddress] = vaa.EmitterChain
c.MissingTokensCounter[payload.OriginAddress] = c.MissingTokensCounter[payload.OriginAddress] + 1
return "", fmt.Errorf("unknown token: %s %s", payload.OriginChain.String(), payload.OriginAddress.String())
}
@ -165,14 +61,14 @@ func (lp *LineParser) ParseLine(line []byte) (string, error) {
TokenPriceFunc: func(_ domain.Symbol, timestamp time.Time) (decimal.Decimal, error) {
// fetch the historic price from cache
price, err := lp.PriceCache.GetPriceByTime(tokenMetadata.CoingeckoID, timestamp)
price, err := c.PriceCache.GetPriceByTime(tokenMetadata.CoingeckoID, timestamp)
if err != nil {
return decimal.NewFromInt(0), err
}
return price, nil
},
Metrics: lp.Metrics,
Metrics: c.Metrics,
}
var err error

View File

@ -0,0 +1,124 @@
package metrics
import (
"bufio"
"encoding/hex"
"fmt"
"io"
"os"
"strings"
"github.com/wormhole-foundation/wormhole-explorer/analytics/prices"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"go.uber.org/zap"
)
// LineParser parses CSV lines that carry a hex-encoded VAA and hands the
// decoded bytes to a VaaConverter (see ParseLine).
type LineParser struct {
// converter receives the decoded VAA bytes from ParseLine.
converter *VaaConverter
}
// RunVaaVolumeFromFile reads a CSV file of VAAs, converts every line that
// parses into an output entry, and writes the entries to outputFile, ready to
// upload to the database.
//
// inputFile is read line by line and each line is passed to
// LineParser.ParseLine. outputFile is created (or truncated). pricesFile is
// used to build the historical price cache. Tokens recorded as missing by the
// converter are written to "missing_tokens.csv" in the working directory, one
// "address,chain,count" row per token.
//
// Lines that fail to parse or convert are skipped. Failing to open or create a
// file, a read error other than EOF, or a failed write to the output file is
// reported through logger.Fatal.
func RunVaaVolumeFromFile(inputFile, outputFile, pricesFile string) {
	// build logger
	logger := logger.New("wormhole-explorer-analytics")

	logger.Info("starting wormhole-explorer-analytics ...")

	// open input file
	f, err := os.Open(inputFile)
	if err != nil {
		logger.Fatal("opening input file", zap.Error(err))
	}
	defer f.Close()

	// create missing tokens file
	missingTokensFile := "missing_tokens.csv"
	fmissingTokens, err := os.Create(missingTokensFile)
	if err != nil {
		logger.Fatal("creating missing tokens file", zap.Error(err))
	}
	defer fmissingTokens.Close()

	// open output file for writing
	fout, err := os.Create(outputFile)
	if err != nil {
		logger.Fatal("creating output file", zap.Error(err))
	}
	defer fout.Close()

	// init price cache!
	logger.Info("loading historical prices...")
	priceCache := prices.NewCoinPricesCache(pricesFile)
	priceCache.InitCache()
	converter := NewVaaConverter(priceCache)
	lp := NewLineParser(converter)
	logger.Info("loaded historical prices")

	r := bufio.NewReader(f)
	// Buffer the output: one entry is written per converted line.
	w := bufio.NewWriter(fout)

	c := 0 // converted lines since the last progress dot
	i := 0 // progress dots printed on the current console row

	// read the file line by line and convert each line in turn
	for {
		// ReadBytes returns the whole line regardless of its length; ReadLine
		// would hand back long lines in buffer-sized fragments.
		raw, readErr := r.ReadBytes('\n')
		if readErr != nil && readErr != io.EOF {
			logger.Fatal("a real error happened here", zap.Error(readErr))
		}

		// raw is empty only at EOF; a final line without a trailing newline
		// arrives together with io.EOF and must still be processed.
		if len(raw) > 0 {
			line := raw
			if n := len(line); line[n-1] == '\n' {
				line = line[:n-1]
				if n := len(line); n > 0 && line[n-1] == '\r' {
					line = line[:n-1]
				}
			}

			// Lines that cannot be parsed or converted are skipped on purpose;
			// unknown tokens are tallied by the converter itself.
			if nl, err := lp.ParseLine(line); err == nil {
				if _, err := w.WriteString(nl); err != nil {
					logger.Fatal("writing output file", zap.Error(err))
				}
				c++
				if c == 10000 {
					// One dot per 10000 converted lines, ten dots per row.
					fmt.Printf(".")
					c = 0
					i++
					if i == 10 {
						fmt.Printf("\n")
						i = 0
					}
				}
			}
		}

		if readErr == io.EOF {
			break
		}
	}

	if err := w.Flush(); err != nil {
		logger.Fatal("writing output file", zap.Error(err))
	}

	for k := range converter.MissingTokensCounter {
		if _, err := fmt.Fprintf(fmissingTokens, "%s,%s,%d\n", k.String(), converter.MissingTokens[k], converter.MissingTokensCounter[k]); err != nil {
			logger.Error("writing missing tokens file", zap.Error(err))
			break
		}
	}

	logger.Info("missing tokens", zap.Int("count", len(converter.MissingTokens)))

	logger.Info("finished wormhole-explorer-analytics")
}
// NewLineParser returns a LineParser that forwards decoded VAA bytes to
// converter.
func NewLineParser(converter *VaaConverter) *LineParser {
	parser := new(LineParser)
	parser.converter = converter
	return parser
}
// ParseLine takes a CSV line as input, and generates a line protocol entry as output.
//
// The line must consist of exactly two comma-separated fields; the second one
// is the hex-encoded VAA. The first field is not inspected here. The decoded
// bytes are handed to the converter, whose result is returned unchanged.
//
// The format for InfluxDB line protocol is: vaa,tags fields timestamp
//
// An error is returned when the line does not have two fields, when the second
// field is not valid hex (the decode error is wrapped and can be inspected
// with errors.Is / errors.As), or when the converter rejects the VAA.
func (lp *LineParser) ParseLine(line []byte) (string, error) {
	// Split the line into its two fields.
	tt := strings.Split(string(line), ",")
	if len(tt) != 2 {
		return "", fmt.Errorf("expected line to have two tokens, but has %d: %s", len(tt), line)
	}

	// Decode the hex-encoded VAA.
	vaaBytes, err := hex.DecodeString(tt[1])
	if err != nil {
		return "", fmt.Errorf("error decoding: %w", err)
	}

	return lp.converter.Convert(vaaBytes)
}

View File

@ -0,0 +1,112 @@
package metrics
import (
"context"
"fmt"
"os"
"time"
"github.com/wormhole-foundation/wormhole-explorer/analytics/prices"
"github.com/wormhole-foundation/wormhole-explorer/common/db"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
"go.uber.org/zap"
)
// RunVaaVolumeFromMongo pages through the VAAs stored in MongoDB, converts each
// one with a VaaConverter, and writes the converted entries to outputFile,
// ready to upload to the database.
//
// mongoUri and mongoDb select the server and the database. outputFile is
// created (or truncated). pricesFile is used to build the historical price
// cache. VAAs are requested for the range from 2021-01-01 UTC up to the moment
// the run starts, sorted ascending, in pages of 1000. Tokens recorded as
// missing by the converter are written to "missing_tokens.csv" in the working
// directory, one "address,chain,count" row per token.
//
// VAAs that fail to convert are skipped. A failed page query is logged and
// ends the backfill early; whatever was converted up to that point stays in
// outputFile. Failing to connect, to create a file, or to write to the output
// file is reported through logger.Fatal.
func RunVaaVolumeFromMongo(mongoUri, mongoDb, outputFile, pricesFile string) {
	rootCtx := context.Background()

	// build logger
	logger := logger.New("wormhole-explorer-analytics")

	logger.Info("starting wormhole-explorer-analytics ...")

	// setup DB connection
	db, err := db.New(rootCtx, logger, mongoUri, mongoDb)
	if err != nil {
		logger.Fatal("Failed to connect MongoDB", zap.Error(err))
	}

	// create a new VAA repository
	vaaRepository := repository.NewVaaRepository(db.Database, logger)

	// create missing tokens file
	missingTokensFile := "missing_tokens.csv"
	fmissingTokens, err := os.Create(missingTokensFile)
	if err != nil {
		logger.Fatal("creating missing tokens file", zap.Error(err))
	}
	defer fmissingTokens.Close()

	// open output file for writing
	fout, err := os.Create(outputFile)
	if err != nil {
		logger.Fatal("creating output file", zap.Error(err))
	}
	defer fout.Close()

	// init price cache!
	logger.Info("loading historical prices...")
	priceCache := prices.NewCoinPricesCache(pricesFile)
	priceCache.InitCache()
	converter := NewVaaConverter(priceCache)
	logger.Info("loaded historical prices")

	// The end of the range is fixed once so every page query uses the same window.
	endTime := time.Now()
	startTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)

	// start backfilling
	page := int64(0)
	c := 0    // converted VAAs since the last progress dot
	dots := 0 // progress dots printed on the current console row
	for {
		logger.Info("Processing page", zap.Int64("page", page),
			zap.String("start_time", startTime.Format(time.RFC3339)),
			zap.String("end_time", endTime.Format(time.RFC3339)))

		vaas, err := vaaRepository.FindPageByTimeRange(rootCtx, startTime, endTime, page, 1000, true)
		if err != nil {
			logger.Error("Failed to get vaas", zap.Error(err))
			break
		}

		// An empty page means the range has been exhausted.
		if len(vaas) == 0 {
			logger.Info("Empty page", zap.Int64("page", page))
			break
		}

		for _, v := range vaas {
			logger.Debug("Processing vaa", zap.String("id", v.ID))
			nl, err := converter.Convert(v.Vaa)
			if err != nil {
				// VAAs that cannot be converted are skipped on purpose;
				// unknown tokens are tallied by the converter itself.
				continue
			}
			if _, err := fout.WriteString(nl); err != nil {
				logger.Fatal("writing output file", zap.Error(err))
			}
			c++
			if c == 10000 {
				// One dot per 10000 converted VAAs, ten dots per row.
				fmt.Printf(".")
				c = 0
				dots++
				if dots == 10 {
					fmt.Printf("\n")
					dots = 0
				}
			}
		}
		page++
	}

	logger.Info("Closing database connections ...")
	if err := db.Close(); err != nil {
		logger.Error("closing database connection", zap.Error(err))
	}

	for k := range converter.MissingTokensCounter {
		if _, err := fmt.Fprintf(fmissingTokens, "%s,%s,%d\n", k.String(), converter.MissingTokens[k], converter.MissingTokensCounter[k]); err != nil {
			logger.Error("writing missing tokens file", zap.Error(err))
			break
		}
	}

	logger.Info("missing tokens", zap.Int("count", len(converter.MissingTokens)))

	logger.Info("finished wormhole-explorer-analytics")
}

View File

@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"syscall"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
@ -24,10 +23,10 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/analytics/queue"
wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/notional"
sqs_client "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs"
"github.com/wormhole-foundation/wormhole-explorer/common/db"
health "github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
@ -58,7 +57,7 @@ func Run() {
// setup DB connection
logger.Info("connecting to MongoDB...")
db, err := NewDatabase(rootCtx, logger, config.MongodbURI, config.MongodbDatabase)
db, err := db.New(rootCtx, logger, config.MongodbURI, config.MongodbDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
@ -220,36 +219,3 @@ func newNotionalCache(
return notionalCache, nil
}
// Database contains handles to MongoDB.
type Database struct {
Database *mongo.Database
client *mongo.Client
}
// NewDatabase connects to DB and returns a client that will disconnect when the passed in context is cancelled.
func NewDatabase(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) {
cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
return &Database{client: cli, Database: cli.Database(databaseName)}, err
}
const databaseCloseDeadline = 30 * time.Second
// Close attempts to gracefully Close the database connection.
func (d *Database) Close() error {
ctx, cancelFunc := context.WithDeadline(
context.Background(),
time.Now().Add(databaseCloseDeadline),
)
err := d.client.Disconnect(ctx)
cancelFunc()
return err
}

View File

@ -20,6 +20,7 @@ require (
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8
go.mongodb.org/mongo-driver v1.11.2
go.uber.org/zap v1.24.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
)
require (

View File

@ -706,7 +706,10 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw=
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

31
common/db/db.go Normal file
View File

@ -0,0 +1,31 @@
package db
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// Database bundles a MongoDB database handle with the client that owns the
// connection.
type Database struct {
// Database is the handle for the database name given to New.
Database *mongo.Database
// client is the connected MongoDB client; Close disconnects it.
client *mongo.Client
}
// New connects to MongoDB at uri and returns a Database bound to databaseName.
//
// appCtx is passed to mongo.Connect. Nothing in this function ties the
// lifetime of the connection to appCtx, so callers should call Close when they
// are done with the returned Database. log is accepted but not used here.
//
// On a connection error the returned *Database is nil.
func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) {
	cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
	if err != nil {
		return nil, err
	}

	return &Database{client: cli, Database: cli.Database(databaseName)}, nil
}
// Close disconnects the MongoDB client, giving the disconnect at most 30
// seconds to finish, and returns the error reported by Disconnect.
func (d *Database) Close() error {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	// Release the context's timer as soon as Disconnect returns instead of
	// leaving it pending until the deadline expires.
	defer cancel()

	return d.client.Disconnect(ctx)
}

63
common/repository/vaa.go Normal file
View File

@ -0,0 +1,63 @@
package repository
import (
"context"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// VaaRepository is a repository for VAA documents stored in MongoDB.
type VaaRepository struct {
// db is the database handle the repository was created with.
db *mongo.Database
// logger is the caller's logger tagged with module=VaaRepository.
logger *zap.Logger
// vaas is the "vaas" collection that all queries run against.
vaas *mongo.Collection
}
// VaaDoc is the subset of a VAA document that this package decodes.
type VaaDoc struct {
// ID is the document's "_id".
ID string `bson:"_id" json:"id"`
// Vaa holds the bytes stored in the document's "vaas" field.
Vaa []byte `bson:"vaas" json:"vaa"`
}
// NewVaaRepository creates a VaaRepository on top of db. The repository logs
// through logger with a module=VaaRepository field attached and reads from the
// "vaas" collection.
func NewVaaRepository(db *mongo.Database, logger *zap.Logger) *VaaRepository {
	repo := &VaaRepository{db: db}
	repo.logger = logger.With(zap.String("module", "VaaRepository"))
	repo.vaas = db.Collection("vaas")
	return repo
}
// FindById finds the VAA document whose "_id" equals id.
//
// On success it returns the decoded document and a nil error. If the lookup or
// the decoding fails, the returned document is nil and the error from the
// driver is returned as is, so callers never receive a zero-valued document
// alongside an error.
func (r *VaaRepository) FindById(ctx context.Context, id string) (*VaaDoc, error) {
	var vaaDoc VaaDoc
	if err := r.vaas.FindOne(ctx, bson.M{"_id": id}).Decode(&vaaDoc); err != nil {
		return nil, err
	}
	return &vaaDoc, nil
}
// FindPageByTimeRange returns one page of VAA documents whose "timestamp" is
// in the half-open range [startTime, endTime).
//
// page is zero-based and pageSize is the maximum number of documents returned;
// page*pageSize documents are skipped. Results are sorted by "timestamp",
// ascending when sortAsc is true and descending otherwise.
//
// If the query or the decoding fails, the returned slice is nil and the error
// from the driver is returned as is.
func (r *VaaRepository) FindPageByTimeRange(ctx context.Context, startTime time.Time, endTime time.Time, page, pageSize int64, sortAsc bool) ([]*VaaDoc, error) {
	filter := bson.M{
		"timestamp": bson.M{
			"$gte": startTime,
			"$lt":  endTime,
		},
	}

	sort := -1
	if sortAsc {
		sort = 1
	}

	// NOTE(review): the sort key is "timestamp" only. If several documents
	// share a timestamp, their relative order is presumably not guaranteed to
	// be the same across page queries, so skip/limit paging could repeat or
	// miss documents at page boundaries. Confirm before relying on exact
	// completeness.
	skip := page * pageSize
	opts := &options.FindOptions{Skip: &skip, Limit: &pageSize, Sort: bson.M{"timestamp": sort}}

	cur, err := r.vaas.Find(ctx, filter, opts)
	if err != nil {
		return nil, err
	}

	var vaas []*VaaDoc
	if err := cur.All(ctx, &vaas); err != nil {
		// Do not hand back a possibly partially decoded page with an error.
		return nil, err
	}
	return vaas, nil
}