Add top-symbols-by-volume endpoint and influx tasks (#887)

This commit is contained in:
ftocal 2023-12-11 14:32:25 -03:00 committed by GitHub
parent 6edd3c4711
commit cff86e4469
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 502 additions and 2 deletions

View File

@ -0,0 +1,35 @@
import "date"
import "influxdata/influxdb/schema"
import "json"
option task = {
name: "assets by symbol for 15 days with 3-hour granularity",
every: 3h,
}
sourceBucket = "wormscan"
destinationBucket = "wormscan-24hours"
measurement = "assets_by_symbol_15_days_3h_v2"
start = date.truncate(t: -15d, unit: 24h)
execution = date.truncate(t: now(), unit: 1h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and (r._field == "symbol" or r._field == "volume"))
|> schema.fieldsAsCols()
|> filter(fn: (r) => r.symbol != "")
|> map(fn: (r) => ({r with _value: r.volume}))
|> group(columns: ["symbol","emitter_chain", "token_address", "token_chain"])
|> reduce(
fn: (r, accumulator) => ({
volume: accumulator.volume + r._value,
count: accumulator.count + 1,
}),
identity: {volume: uint(v: 0), count: 0}
)
|> group()
|> map(fn: (r) => ({r with _time: execution, _field: "txs_volume", _value: string(v: json.encode(v: {"txs": r.count, "volume": r.volume}))}))
|> drop(columns: ["volume", "count"])
|> set(key: "_measurement", value: measurement)
|> to(bucket: destinationBucket)

View File

@ -0,0 +1,35 @@
import "date"
import "influxdata/influxdb/schema"
import "json"
option task = {
name: "assets by symbol for 30 days with 3-hour granularity",
every: 3h,
}
sourceBucket = "wormscan"
destinationBucket = "wormscan-24hours"
measurement = "assets_by_symbol_30_days_3h_v2"
start = date.truncate(t: -30d, unit: 24h)
execution = date.truncate(t: now(), unit: 1h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and (r._field == "symbol" or r._field == "volume"))
|> schema.fieldsAsCols()
|> filter(fn: (r) => r.symbol != "")
|> map(fn: (r) => ({r with _value: r.volume}))
|> group(columns: ["symbol","emitter_chain", "token_address", "token_chain"])
|> reduce(
fn: (r, accumulator) => ({
volume: accumulator.volume + r._value,
count: accumulator.count + 1,
}),
identity: {volume: uint(v: 0), count: 0}
)
|> group()
|> map(fn: (r) => ({r with _time: execution, _field: "txs_volume", _value: string(v: json.encode(v: {"txs": r.count, "volume": r.volume}))}))
|> drop(columns: ["volume", "count"])
|> set(key: "_measurement", value: measurement)
|> to(bucket: destinationBucket)

View File

@ -0,0 +1,35 @@
import "date"
import "influxdata/influxdb/schema"
import "json"
option task = {
name: "assets by symbol for 7 days with 3-hour granularity",
every: 3h,
}
sourceBucket = "wormscan"
destinationBucket = "wormscan-24hours"
measurement = "assets_by_symbol_7_days_3h_v2"
start = date.truncate(t: -7d, unit: 24h)
execution = date.truncate(t: now(), unit: 1h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and (r._field == "symbol" or r._field == "volume"))
|> schema.fieldsAsCols()
|> filter(fn: (r) => r.symbol != "")
|> map(fn: (r) => ({r with _value: r.volume}))
|> group(columns: ["symbol","emitter_chain", "token_address", "token_chain"])
|> reduce(
fn: (r, accumulator) => ({
volume: accumulator.volume + r._value,
count: accumulator.count + 1,
}),
identity: {volume: uint(v: 0), count: 0}
)
|> group()
|> map(fn: (r) => ({r with _time: execution, _field: "txs_volume", _value: string(v: json.encode(v: {"txs": r.count, "volume": r.volume}))}))
|> drop(columns: ["volume", "count"])
|> set(key: "_measurement", value: measurement)
|> to(bucket: destinationBucket)

View File

@ -0,0 +1,19 @@
package stats
import (
"fmt"
"time"
)
const queryTemplateSymbolWithAssets = `
from(bucket: "%s")
|> range(start: %s)
|> filter(fn: (r) => r._measurement == "%s" and r._field == "txs_volume")
|> last()
|> group()
`
func buildSymbolWithAssets(bucket string, t time.Time, measurement string) string {
start := t.Truncate(time.Hour * 24).Format(time.RFC3339Nano)
return fmt.Sprintf(queryTemplateSymbolWithAssets, bucket, start, measurement)
}

View File

@ -0,0 +1,130 @@
package stats
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/mitchellh/mapstructure"
"github.com/shopspring/decimal"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
type Repository struct {
influxCli influxdb2.Client
queryAPI api.QueryAPI
bucket24HoursRetention string
logger *zap.Logger
}
func NewRepository(
client influxdb2.Client,
org string,
bucket24HoursRetention string,
logger *zap.Logger,
) *Repository {
r := Repository{
influxCli: client,
queryAPI: client.QueryAPI(org),
bucket24HoursRetention: bucket24HoursRetention,
logger: logger,
}
return &r
}
func (r *Repository) GetSymbolWithAssets(ctx context.Context, timeSpan SymbolWithAssetsTimeSpan) ([]SymbolWithAssetDTO, error) {
var measurement string
switch timeSpan {
case TimeSpan7Days:
measurement = "assets_by_symbol_7_days_3h_v2"
case TimeSpan15Days:
measurement = "assets_by_symbol_15_days_3h_v2"
case TimeSpan30Days:
measurement = "assets_by_symbol_30_days_3h_v2"
default:
measurement = "assets_by_symbol_7_days_3h_v2"
}
query := buildSymbolWithAssets(r.bucket24HoursRetention, time.Now(), measurement)
result, err := r.queryAPI.Query(ctx, query)
if err != nil {
return nil, err
}
if result.Err() != nil {
return nil, result.Err()
}
// Scan query results
type Row struct {
Symbol string `mapstructure:"symbol"`
EmitterChain string `mapstructure:"emitter_chain"`
TokenChain string `mapstructure:"token_chain"`
TokenAddress string `mapstructure:"token_address"`
JsonValue string `mapstructure:"_value"`
}
type TxsVolume struct {
Txs decimal.Decimal
Volume decimal.Decimal
}
var rows []Row
for result.Next() {
var row Row
if err := mapstructure.Decode(result.Record().Values(), &row); err != nil {
return nil, err
}
rows = append(rows, row)
}
divisor := decimal.NewFromInt(1_0000_0000)
// Convert the rows into the response model
var values []SymbolWithAssetDTO
for _, row := range rows {
// parse emitter chain
emitterChain, err := strconv.ParseUint(row.EmitterChain, 10, 16)
if err != nil {
return nil, fmt.Errorf("failed to convert emitter chain field to uint16. %v", err)
}
// parse token chain
tokenChain, err := strconv.ParseUint(row.TokenChain, 10, 16)
if err != nil {
return nil, fmt.Errorf("failed to convert token chain field to uint16. %v", err)
}
// parse the json value
var txsVolume TxsVolume
if err := json.Unmarshal([]byte(row.JsonValue), &txsVolume); err != nil {
return nil, fmt.Errorf("failed to convert _value to struct. %v", err)
}
// append the new item to the response
value := SymbolWithAssetDTO{
Symbol: row.Symbol,
EmitterChainID: sdk.ChainID(emitterChain),
TokenChainID: sdk.ChainID(tokenChain),
TokenAddress: row.TokenAddress,
Volume: txsVolume.Volume.Div(divisor),
Txs: txsVolume.Txs,
}
// do not include invalid chain IDs in the response
if !domain.ChainIdIsValid(value.EmitterChainID) {
continue
}
values = append(values, value)
}
return values, nil
}

View File

@ -0,0 +1,23 @@
package stats
import (
"context"
"testing"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)
func Test_convertToDecimal(t *testing.T) {
url := "https://us-east-1-1.aws.cloud2.influxdata.com"
token := "FQ14tMrjuumxGGPlCIQvWfX_JDLUPJDOaTXKH_t3pHNDIvN13rbbmlG0JuuWvqo15Gw_qEjRqaeZ-BnCf0VaXA=="
cli := influxdb2.NewClient(url, token)
logger := zap.NewExample()
ctx := context.Background()
repo := NewRepository(cli, "xlabs", "wormscan-24hours-mainnet-staging", logger)
result, err := repo.GetSymbolWithAssets(ctx, TimeSpan30Days)
assert.NoError(t, err)
assert.NotNil(t, result)
}

View File

@ -0,0 +1,36 @@
package stats
import (
"context"
"fmt"
"time"
"github.com/wormhole-foundation/wormhole-explorer/api/cacheable"
"github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
"go.uber.org/zap"
)
type Service struct {
repo *Repository
cache cache.Cache
expiration time.Duration
logger *zap.Logger
}
const (
topSymbolsByVolumeKey = "wormscan:top-assets-symbol-by-volume"
)
// NewService create a new Service.
func NewService(repo *Repository, cache cache.Cache, expiration time.Duration, logger *zap.Logger) *Service {
return &Service{repo: repo, cache: cache, expiration: expiration, logger: logger.With(zap.String("module", "StatsService"))}
}
func (s *Service) GetSymbolWithAssets(ctx context.Context, ts SymbolWithAssetsTimeSpan) ([]SymbolWithAssetDTO, error) {
key := topSymbolsByVolumeKey
key = fmt.Sprintf("%s:%s", key, ts)
return cacheable.GetOrLoad(ctx, s.logger, s.cache, s.expiration, key,
func() ([]SymbolWithAssetDTO, error) {
return s.repo.GetSymbolWithAssets(ctx, ts)
})
}

View File

@ -0,0 +1,40 @@
package stats
import (
"fmt"
"github.com/shopspring/decimal"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// SymbolWithAssetsTimeSpan is used as an input parameter for the functions `GetTopAssets` and `GetTopChainPairs`.
type SymbolWithAssetsTimeSpan string
const (
TimeSpan7Days SymbolWithAssetsTimeSpan = "7d"
TimeSpan15Days SymbolWithAssetsTimeSpan = "15d"
TimeSpan30Days SymbolWithAssetsTimeSpan = "30d"
)
// ParseSymbolsWithAssetsTimeSpan parses a string and returns a `SymbolsWithAssetsTimeSpan`.
func ParseSymbolsWithAssetsTimeSpan(s string) (*SymbolWithAssetsTimeSpan, error) {
if s == string(TimeSpan7Days) ||
s == string(TimeSpan15Days) ||
s == string(TimeSpan30Days) {
tmp := SymbolWithAssetsTimeSpan(s)
return &tmp, nil
}
return nil, fmt.Errorf("invalid time span: %s", s)
}
type SymbolWithAssetDTO struct {
Symbol string
EmitterChainID sdk.ChainID
TokenChainID sdk.ChainID
TokenAddress string
Volume decimal.Decimal
Txs decimal.Decimal
}

View File

@ -31,6 +31,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/observations"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/operations"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/relays"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/stats"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/config"
@ -154,21 +155,24 @@ func main() {
)
relaysRepo := relays.NewRepository(db.Database, rootLogger)
operationsRepo := operations.NewRepository(db.Database, rootLogger)
statsRepo := stats.NewRepository(influxCli, cfg.Influx.Organization, cfg.Influx.Bucket24Hours, rootLogger)
// create token provider
tokenProvider := domain.NewTokenProvider(cfg.P2pNetwork)
// Set up services
rootLogger.Info("initializing services")
expirationTime := time.Duration(cfg.Cache.MetricExpiration) * time.Second
addressService := address.NewService(addressRepo, rootLogger)
vaaService := vaa.NewService(vaaRepo, cache.Get, vaaParserFunc, rootLogger)
obsService := observations.NewService(obsRepo, rootLogger)
governorService := governor.NewService(governorRepo, rootLogger)
infrastructureService := infrastructure.NewService(infrastructureRepo, rootLogger)
heartbeatsService := heartbeats.NewService(heartbeatsRepo, rootLogger)
transactionsService := transactions.NewService(transactionsRepo, cache, time.Duration(cfg.Cache.MetricExpiration)*time.Second, tokenProvider, rootLogger)
transactionsService := transactions.NewService(transactionsRepo, cache, expirationTime, tokenProvider, rootLogger)
relaysService := relays.NewService(relaysRepo, rootLogger)
operationsService := operations.NewService(operationsRepo, rootLogger)
statsService := stats.NewService(statsRepo, cache, expirationTime, rootLogger)
// Set up a custom error handler
response.SetEnableStackTrace(*cfg)
@ -210,7 +214,7 @@ func main() {
// Set up route handlers
app.Get("/swagger.json", GetSwagger)
wormscan.RegisterRoutes(app, rootLogger, addressService, vaaService, obsService, governorService, infrastructureService, transactionsService, relaysService, operationsService)
wormscan.RegisterRoutes(app, rootLogger, addressService, vaaService, obsService, governorService, infrastructureService, transactionsService, relaysService, operationsService, statsService)
guardian.RegisterRoutes(cfg, app, rootLogger, vaaService, governorService, heartbeatsService)
// Set up gRPC handlers

View File

@ -10,6 +10,7 @@ import (
"github.com/gofiber/fiber/v2"
"github.com/pkg/errors"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/stats"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions"
"github.com/wormhole-foundation/wormhole-explorer/api/response"
"github.com/wormhole-foundation/wormhole-explorer/common/types"
@ -391,3 +392,17 @@ func ExtractTokenAddress(c *fiber.Ctx, l *zap.Logger) (*types.Address, error) {
}
return tokenAddress, nil
}
func ExtractSymbolWithAssetsTimeSpan(ctx *fiber.Ctx) (*stats.SymbolWithAssetsTimeSpan, error) {
defaultTimeSpan := stats.TimeSpan7Days
s := ctx.Query("timeSpan")
if s == "" {
return &defaultTimeSpan, nil
}
timeSpan, err := stats.ParseSymbolsWithAssetsTimeSpan(s)
if err != nil {
return nil, response.NewInvalidQueryParamError(ctx, "INVALID <timeSpan> QUERY PARAMETER", nil)
}
return timeSpan, nil
}

View File

@ -12,6 +12,7 @@ import (
obssvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/observations"
opsvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/operations"
relayssvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/relays"
statssvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/stats"
trxsvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions"
vaasvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/address"
@ -20,6 +21,8 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/observations"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/operations"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/relays"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/stats"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/transactions"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/vaa"
@ -47,6 +50,7 @@ func RegisterRoutes(
transactionsService *trxsvc.Service,
relaysService *relayssvc.Service,
operationsService *opsvc.Service,
statsService *statssvc.Service,
) {
// Set up controllers
@ -58,6 +62,7 @@ func RegisterRoutes(
transactionCtrl := transactions.NewController(transactionsService, rootLogger)
relaysCtrl := relays.NewController(relaysService, rootLogger)
opsCtrl := operations.NewController(operationsService, rootLogger)
statsCrtl := stats.NewController(statsService, rootLogger)
// Set up route handlers
api := app.Group("/api/v1")
@ -82,6 +87,9 @@ func RegisterRoutes(
api.Get("/transactions", transactionCtrl.ListTransactions)
api.Get("/transactions/:chain/:emitter/:sequence", transactionCtrl.GetTransactionByID)
// stats custom endpoints
api.Get("/top-symbols-by-volume", statsCrtl.GetTopSymbolsByVolume)
// operations resource
operations := api.Group("/operations")
operations.Get("/", opsCtrl.FindAll)

View File

@ -0,0 +1,95 @@
package stats
import (
"sort"
"github.com/gofiber/fiber/v2"
"github.com/shopspring/decimal"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/stats"
"github.com/wormhole-foundation/wormhole-explorer/api/middleware"
"go.uber.org/zap"
)
// Controller is the controller for the transactions resource.
type Controller struct {
srv *stats.Service
logger *zap.Logger
}
// NewController create a new controler.
func NewController(statsService *stats.Service, logger *zap.Logger) *Controller {
return &Controller{
srv: statsService,
logger: logger.With(zap.String("module", "StatsController")),
}
}
// GetTopSymbolsByVolume godoc
// @Description Returns a list of symbols by origin chain and tokens.
// @Description The volume is calculated using the notional price of the symbol at the day the VAA was emitted.
// @Tags wormholescan
// @ID top-symbols-by-volume
// @Param timeSpan query string false "Time span, supported values: 7d, 15d and 30d (default is 7d)."
// @Success 200 {object} stats.TopSymbolByVolumeResult
// @Failure 400
// @Failure 500
// @Router /api/v1/top-symbols-by-volume [get]
func (c *Controller) GetTopSymbolsByVolume(ctx *fiber.Ctx) error {
timeSpan, err := middleware.ExtractSymbolWithAssetsTimeSpan(ctx)
if err != nil {
return err
}
// Get the chain activity.
assets, err := c.srv.GetSymbolWithAssets(ctx.Context(), *timeSpan)
if err != nil {
c.logger.Error("Error getting symbol with assets", zap.Error(err))
return err
}
// Convert the result to the expected format.
symbols, err := c.createTopSymbolsByVolumeResult(assets)
if err != nil {
return err
}
return ctx.JSON(TopSymbolByVolumeResult{Symbols: symbols})
}
func (c *Controller) createTopSymbolsByVolumeResult(assets []stats.SymbolWithAssetDTO) ([]*TopSymbolResult, error) {
txByChainID := make(map[string]*TopSymbolResult)
for _, item := range assets {
t, ok := txByChainID[item.Symbol]
if !ok {
tokens := make([]TokenResult, 0)
t = &TopSymbolResult{Symbol: item.Symbol, Volume: decimal.Zero, Txs: decimal.Zero, Tokens: tokens}
}
token := TokenResult{
EmitterChainID: item.EmitterChainID,
TokenChainID: item.TokenChainID,
TokenAddress: item.TokenAddress,
Volume: item.Volume,
Txs: item.Txs}
t.Tokens = append(t.Tokens, token)
t.Volume = t.Volume.Add(item.Volume)
t.Txs = t.Txs.Add(item.Txs)
txByChainID[item.Symbol] = t
}
values := make([]*TopSymbolResult, 0, len(txByChainID))
for _, value := range txByChainID {
values = append(values, value)
}
sort.Slice(values[:], func(i, j int) bool {
return values[i].Volume.GreaterThan(values[j].Volume)
})
if len(values) >= 7 {
return values[:7], nil
}
return values, nil
}

View File

@ -0,0 +1,25 @@
package stats
import (
"github.com/shopspring/decimal"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
)
type TopSymbolResult struct {
Symbol string `json:"symbol"`
Volume decimal.Decimal `json:"volume"`
Txs decimal.Decimal `json:"txs"`
Tokens []TokenResult `json:"tokens"`
}
type TokenResult struct {
EmitterChainID sdk.ChainID `json:"emitter_chain"`
TokenChainID sdk.ChainID `json:"token_chain"`
TokenAddress string `json:"token_address"`
Volume decimal.Decimal `json:"volume"`
Txs decimal.Decimal `json:"txs"`
}
type TopSymbolByVolumeResult struct {
Symbols []*TopSymbolResult `json:"symbols"`
}