[ISSUE-1089] Collect CCTP - Metrics (#1165)

Add cctp and portal_token_bridge stats to protocols-stats endpoint

changes

 indent

more changes on the script

 new working approach on influx task

 indent script

 tested insert

 add 2 versions of the script

multiple changes

 add 1day task

 add logic to retrieve internal protocols

 remove unecessary code

 readd empty script

fix  unit-tests and measurement namings

 fix queries

 fix alignment

rename function task

fix names

improvements on influx task

 add .run config to gitignore

add .run to gitignore

fix task and rename

working api

 multiple things

Delete .run/wormscan api.run.xml

Delete analytics/scripts/test_query.flux

wip

 multiple fixes

 fix test

wip

 fix queries

fix unit-test due to query changes
This commit is contained in:
Mariano 2024-03-11 17:33:36 -03:00 committed by GitHub
parent 2d61225ac2
commit 4009482dcc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 503 additions and 133 deletions

3
.gitignore vendored
View File

@ -16,4 +16,5 @@ wormholeTxs.json
serviceAccountKey.json
bigtableAccountKey.json
tsconfig.tsbuildinfo
serviceAccount.json
serviceAccount.json
.run

View File

@ -0,0 +1,56 @@
import "date"
calculateProtocolStats = (protocol,protocolVersion,taskCfg) => {
totalValueTransferred = from(bucket: taskCfg.sourceBucket)
|> range(start: taskCfg.since, stop:taskCfg.ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume" and r._value > 0)
|> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"])
|> group()
|> sum()
|> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
|> set(key: "_field", value: "total_value_transferred")
totalMessages = from(bucket: taskCfg.sourceBucket)
|> range(start: taskCfg.since, stop:taskCfg.ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume")
|> group()
|> count()
|> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
|> set(key: "_field", value: "total_messages")
return union(tables:[totalMessages,totalValueTransferred])
|> set(key: "app_id", value: protocol)
|> set(key: "version", value: protocolVersion)
|> set(key: "_measurement", value: taskCfg.destMeasurement)
|> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
|>to(bucket: taskCfg.destBucket)
}
ts = date.truncate(t: now(), unit: 1d)
bucketInfinite = "wormscan"
bucket30d = "wormscan-30days"
cfg = {
sourceBucket:bucketInfinite,
destBucket:bucketInfinite,
destMeasurement:"core_protocols_stats_1d",
since: date.sub(d: 1d, from: ts),
ts:ts,
}
// Set this variable with the cfg of the desired task
option task = {
name: "cctp and portal_token_bridge metrics every day",
every: 1d,
}
calculateProtocolStats(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",taskCfg:cfg)
calculateProtocolStats(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",taskCfg:cfg)

View File

@ -0,0 +1,50 @@
import "date"
option task = {
name: "cctp and portal_token_bridge metrics every hour",
every: 1h,
}
calculateLastHourMetrics = (protocol,protocolVersion,ts) => {
since = date.sub(d: 1h, from: ts)
sourceBucket = "wormscan"
destMeasurement = "core_protocols_stats_1h"
bucket30d = "wormscan-30days"
totalValueTransferred = from(bucket: sourceBucket)
|> range(start: since, stop:ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume" and r._value > 0)
|> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"])
|> group()
|> sum()
|> map(fn: (r) => ({r with _time: since}))
|> set(key: "_field", value: "total_value_transferred")
totalMessages = from(bucket: sourceBucket)
|> range(start: since, stop:ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume")
|> group()
|> count()
|> map(fn: (r) => ({r with _time: since}))
|> set(key: "_field", value: "total_messages")
return union(tables:[totalMessages,totalValueTransferred]) // if nothing happened during the last hour then union will result in empty and no point will be added.
|> set(key: "app_id", value: protocol)
|> set(key: "version", value: protocolVersion)
|> set(key: "_measurement", value: destMeasurement)
|> map(fn: (r) => ({r with _time: since}))
|>to(bucket: bucket30d)
}
ts = date.truncate(t: now(), unit: 1h)
// execute function for CCTP_WORMHOLE_INTEGRATION
calculateLastHourMetrics(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",ts:ts)
// execute function for PORTAL_TOKEN_BRIDGE
calculateLastHourMetrics(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",ts:ts)

View File

@ -28,20 +28,117 @@ from(bucket: "%s")
const QueryTemplateActivityLatestPoint = `
from(bucket: "%s")
|> range(start: -1d)
|> range(start: 1970-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s")
|> keep(columns: ["_time","_field","protocol", "_value", "total_value_secure", "total_value_transferred"])
|> last()
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
`
// QueryIntProtocolsTotalStartOfDay Query template for internal protocols (cctp and portal_token_bridge) to fetch total values till the start of current day
const QueryIntProtocolsTotalStartOfDay = `
import "date"
import "types"
startOfCurrentDay = date.truncate(t: now(), unit: 1d)
data = from(bucket: "%s")
|> range(start: 1970-01-01T00:00:00Z,stop:startOfCurrentDay)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))
totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")
union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:startOfCurrentDay))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`
// QueryIntProtocolsDeltaSinceStartOfDay calculate delta since the beginning of current day
const QueryIntProtocolsDeltaSinceStartOfDay = `
import "date"
import "types"
ts = date.truncate(t: now(), unit: 1h)
startOfDay = date.truncate(t: now(), unit: 1d)
data = from(bucket: "%s")
|> range(start: startOfDay,stop:ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))
totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")
union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:startOfDay))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`
// QueryIntProtocolsDeltaLastDay calculate last day delta
const QueryIntProtocolsDeltaLastDay = `
import "date"
import "types"
ts = date.truncate(t: now(), unit: 1h)
yesterday = date.sub(d: 1d, from: ts)
data = from(bucket: "%s")
|> range(start: yesterday,stop:ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))
totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")
union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:yesterday))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`
type Repository struct {
queryAPI QueryDoer
logger *zap.Logger
statsBucket string
activityBucket string
statsVersion string
activityVersion string
queryAPI QueryDoer
logger *zap.Logger
bucketInfinite string
bucket30d string
statsVersion string
activityVersion string
intProtocolMeasurement map[string]struct {
Daily string
Hourly string
}
}
type rowStat struct {
@ -50,6 +147,17 @@ type rowStat struct {
TotalValueLocked float64 `mapstructure:"total_value_locked"`
}
type intRowStat struct {
Protocol string `mapstructure:"app_id"`
TotalMessages uint64 `mapstructure:"total_messages"`
TotalValueTransferred uint64 `mapstructure:"total_value_transferred"`
}
type intStats struct {
Latest intRowStat
DeltaLast24hr intRowStat
}
type rowActivity struct {
Protocol string `mapstructure:"protocol"`
DestinationChainId string `mapstructure:"destination_chain_id"`
@ -85,14 +193,21 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer {
return &queryApiWrapper{qApi: qApi}
}
func NewRepository(qApi QueryDoer, statsBucket, activityBucket, statsVersion, activityVersion string, logger *zap.Logger) *Repository {
func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d, statsVersion, activityVersion string, logger *zap.Logger) *Repository {
return &Repository{
queryAPI: qApi,
statsBucket: statsBucket,
activityBucket: activityBucket,
bucketInfinite: bucketInfinite,
bucket30d: bucket30d,
statsVersion: statsVersion,
activityVersion: activityVersion,
logger: logger,
intProtocolMeasurement: map[string]struct {
Daily string
Hourly string
}{
CCTP: {Daily: dbconsts.CctpStatsMeasurementDaily, Hourly: dbconsts.CctpStatsMeasurementHourly},
PortalTokenBridge: {Daily: dbconsts.TokenBridgeStatsMeasurementDaily, Hourly: dbconsts.TokenBridgeStatsMeasurementHourly},
},
}
}
@ -101,40 +216,81 @@ func (q *queryApiWrapper) Query(ctx context.Context, query string) (QueryResult,
}
// returns latest and last 24 hr stats for a given protocol
func (r *Repository) getProtocolStats(ctx context.Context, contributor string) (stats, error) {
func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (stats, error) {
// fetch latest stat
latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLatestPoint, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
q := buildQuery(QueryTemplateLatestPoint, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
if err != nil {
return stats{}, err
}
// fetch last 24 hr stat
last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLast24Point, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
q = buildQuery(QueryTemplateLast24Point, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
return stats{
Latest: latest,
Last24: last24hr,
}, err
}
func (r *Repository) getProtocolActivity(ctx context.Context, contributor string) (rowActivity, error) {
return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, r.activityBucket, QueryTemplateActivityLatestPoint, dbconsts.ProtocolsActivityMeasurement, contributor, r.activityVersion)
func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (rowActivity, error) {
q := buildQuery(QueryTemplateActivityLatestPoint, r.bucket30d, dbconsts.ProtocolsActivityMeasurement, protocol, r.activityVersion)
return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol)
}
func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, bucket, queryTemplate, measurement, contributor, version string) (T, error) {
var res T
q := buildQuery(queryTemplate, bucket, measurement, contributor, version)
result, err := queryAPI.Query(ctx, q)
// returns latest and last 24 hr for internal protocols (cctp and portal_token_bridge)
func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol string) (intStats, error) {
// calculate total values till the start of current day
totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.intProtocolMeasurement[protocol].Daily, protocol, protocol)
totalsUntilToday, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, totalTillCurrentDayQuery, protocol)
if err != nil {
logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", contributor), zap.String("query", q))
return intStats{}, err
}
// calculate delta since the beginning of current day
q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol)
currentDayStats, errCD := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q2, protocol)
if errCD != nil {
return intStats{}, errCD
}
latestTotal := intRowStat{
Protocol: protocol,
TotalMessages: totalsUntilToday.TotalMessages + currentDayStats.TotalMessages,
TotalValueTransferred: totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred,
}
result := intStats{
Latest: latestTotal,
}
// calculate last day delta
q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol)
deltaYesterdayStats, errQ3 := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q3, protocol)
if errQ3 != nil {
return result, errQ3
}
result.DeltaLast24hr = deltaYesterdayStats
return result, nil
}
func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, query, protocol string) (T, error) {
var res T
result, err := queryAPI.Query(ctx, query)
if err != nil {
logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", protocol), zap.String("query", query))
return res, err
}
defer result.Close()
if !result.Next() {
if result.Err() != nil {
logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", contributor), zap.String("query", q))
logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", protocol), zap.String("query", query))
return res, result.Err()
}
logger.Info("empty query response", zap.String("protocol", contributor), zap.String("query", q))
logger.Info("empty query response", zap.String("protocol", protocol), zap.String("query", query))
return res, err
}

View File

@ -2,7 +2,8 @@ package protocols
import (
"context"
"encoding/json"
"github.com/wormhole-foundation/wormhole-explorer/api/cacheable"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
"go.uber.org/zap"
"strconv"
@ -11,16 +12,27 @@ import (
"time"
)
const CCTP = "CCTP_WORMHOLE_INTEGRATION"
const PortalTokenBridge = "PORTAL_TOKEN_BRIDGE"
type Service struct {
Protocols []string
repo *Repository
logger *zap.Logger
intProtocols []string
cache cache.Cache
cacheKeyPrefix string
cacheTTL int
metrics metrics.Metrics
tvl tvlProvider
}
type ProtocolTotalValuesDTO struct {
ProtocolStats
Error string `json:"error,omitempty"`
}
type ProtocolStats struct {
Protocol string `json:"protocol"`
TotalMessages uint64 `json:"total_messages"`
TotalValueLocked float64 `json:"total_value_locked,omitempty"`
@ -28,53 +40,128 @@ type ProtocolTotalValuesDTO struct {
TotalValueTransferred float64 `json:"total_value_transferred,omitempty"`
LastDayMessages uint64 `json:"last_day_messages,omitempty"`
LastDayDiffPercentage string `json:"last_day_diff_percentage,omitempty"`
Error string `json:"error,omitempty"`
}
func NewService(protocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int) *Service {
type tvlProvider interface {
Get(ctx context.Context) (string, error)
}
func NewService(extProtocols, intProtocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int, metrics metrics.Metrics, tvlProvider tvlProvider) *Service {
return &Service{
Protocols: protocols,
Protocols: extProtocols,
repo: repo,
logger: logger,
intProtocols: intProtocols,
cache: cache,
cacheKeyPrefix: cacheKeyPrefix,
cacheTTL: cacheTTL,
metrics: metrics,
tvl: tvlProvider,
}
}
func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalValuesDTO {
wg := &sync.WaitGroup{}
wg.Add(len(s.Protocols))
results := make(chan ProtocolTotalValuesDTO, len(s.Protocols))
totalProtocols := len(s.Protocols) + len(s.intProtocols)
wg.Add(totalProtocols)
results := make(chan ProtocolTotalValuesDTO, totalProtocols)
for i := range s.Protocols {
go s.getProtocolTotalValues(ctx, wg, s.Protocols[i], results)
for _, p := range s.Protocols {
go s.fetchProtocolValues(ctx, wg, p, results, s.getProtocolStats)
}
for _, p := range s.intProtocols {
go s.fetchProtocolValues(ctx, wg, p, results, s.getIntProtocolStats)
}
wg.Wait()
close(results)
resultsSlice := make([]ProtocolTotalValuesDTO, 0, len(s.Protocols))
for r := range results {
r.Protocol = getProtocolNameDto(r.Protocol)
resultsSlice = append(resultsSlice, r)
}
return resultsSlice
}
func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup, protocol string, results chan<- ProtocolTotalValuesDTO) {
func getProtocolNameDto(protocol string) string {
switch protocol {
case CCTP:
return "cctp"
case PortalTokenBridge:
return "portal_token_bridge"
default:
return protocol
}
}
func (s *Service) fetchProtocolValues(ctx context.Context, wg *sync.WaitGroup, protocol string, results chan<- ProtocolTotalValuesDTO, fetch func(context.Context, string) (ProtocolStats, error)) {
defer wg.Done()
cacheKey := s.cacheKeyPrefix + ":" + strings.ToUpper(protocol)
cachedValue, errCache := s.cache.Get(ctx, cacheKey)
if errCache == nil {
var val ProtocolTotalValuesDTO
errCacheUnmarshall := json.Unmarshal([]byte(cachedValue), &val)
if errCacheUnmarshall == nil {
results <- val
return
}
s.logger.Error("error unmarshalling cache value", zap.Error(errCacheUnmarshall), zap.String("cache_key", cacheKey))
val, err := cacheable.GetOrLoad[ProtocolStats](ctx,
s.logger,
s.cache,
time.Duration(s.cacheTTL)*time.Minute,
s.cacheKeyPrefix+":"+strings.ToUpper(protocol),
s.metrics,
func() (ProtocolStats, error) {
return fetch(ctx, protocol)
},
)
res := ProtocolTotalValuesDTO{
ProtocolStats: val,
}
if err != nil {
res.Error = err.Error()
}
results <- res
}
// getProtocolStats fetches stats for CCTP and PortalTokenBridge
func (s *Service) getIntProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) {
protocolStats, err := s.repo.getInternalProtocolStats(ctx, protocol)
if err != nil {
return ProtocolStats{
Protocol: protocol,
TotalValueTransferred: float64(protocolStats.Latest.TotalValueTransferred) / 1e8,
TotalMessages: protocolStats.Latest.TotalMessages,
}, err
}
diffLastDay := protocolStats.DeltaLast24hr.TotalMessages
val := ProtocolStats{
Protocol: protocol,
TotalValueTransferred: float64(protocolStats.Latest.TotalValueTransferred) / 1e8,
TotalMessages: protocolStats.Latest.TotalMessages,
LastDayMessages: diffLastDay,
}
lastDayTotalMessages := protocolStats.Latest.TotalMessages - diffLastDay
if lastDayTotalMessages != 0 {
percentage := strconv.FormatFloat(float64(diffLastDay)/float64(lastDayTotalMessages)*100, 'f', 2, 64) + "%"
val.LastDayDiffPercentage = percentage
}
if CCTP == protocol {
tvl, errTvl := s.tvl.Get(ctx)
if errTvl != nil {
s.logger.Error("error fetching tvl", zap.Error(errTvl), zap.String("protocol", protocol))
return val, errTvl
}
tvlFloat, errTvl := strconv.ParseFloat(tvl, 64)
if errTvl != nil {
s.logger.Error("error parsing tvl value", zap.Error(errTvl), zap.String("protocol", protocol), zap.String("tvl_str", tvl))
return val, errTvl
}
val.TotalValueLocked = tvlFloat
}
return val, nil
}
func (s *Service) getProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) {
type statsResult struct {
result stats
@ -90,18 +177,17 @@ func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup
activity, err := s.repo.getProtocolActivity(ctx, protocol)
if err != nil {
s.logger.Error("error fetching protocol activity", zap.Error(err), zap.String("protocol", protocol))
results <- ProtocolTotalValuesDTO{Protocol: protocol, Error: err.Error()}
return
return ProtocolStats{Protocol: protocol}, err
}
rStats := <-statsRes
if rStats.Err != nil {
s.logger.Error("error fetching protocol stats", zap.Error(rStats.Err), zap.String("protocol", protocol))
results <- ProtocolTotalValuesDTO{Protocol: protocol, Error: rStats.Err.Error()}
return
return ProtocolStats{Protocol: protocol}, rStats.Err
}
dto := ProtocolTotalValuesDTO{
dto := ProtocolStats{
Protocol: protocol,
TotalValueLocked: rStats.result.Latest.TotalValueLocked,
TotalMessages: rStats.result.Latest.TotalMessages,
@ -117,11 +203,5 @@ func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup
dto.LastDayDiffPercentage = strconv.FormatFloat(float64(last24HrMessages)/float64(totalMessagesAsFromLast24hr)*100, 'f', 2, 64) + "%"
}
dtoJson, _ := json.Marshal(dto) // don't handle error since the full lifecycle of the dto is under this scope
errCache = s.cache.Set(ctx, cacheKey, string(dtoJson), time.Duration(s.cacheTTL)*time.Minute)
if errCache != nil {
s.logger.Error("error setting cache", zap.Error(errCache), zap.String("cache_key", cacheKey))
}
results <- dto
return dto, nil
}

View File

@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/test-go/testify/mock"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/protocols"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
cacheMock "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/mock"
"github.com/wormhole-foundation/wormhole-explorer/common/dbconsts"
@ -52,14 +53,14 @@ func TestService_GetProtocolsTotalValues(t *testing.T) {
ctx := context.Background()
queryAPI := &mockQueryAPI{}
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil)
activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1")
activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1")
queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil)
repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop())
service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0)
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop())
service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
values := service.GetProtocolsTotalValues(ctx)
assert.Equal(t, 1, len(values))
@ -97,14 +98,14 @@ func TestService_GetProtocolsTotalValues_FailedFetchingActivity(t *testing.T) {
ctx := context.Background()
queryAPI := &mockQueryAPI{}
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil)
activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1")
activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1")
queryAPI.On("Query", ctx, activityQuery).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_activity_error"))
repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop())
service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0)
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop())
service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
values := service.GetProtocolsTotalValues(ctx)
assert.Equal(t, 1, len(values))
@ -139,14 +140,14 @@ func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) {
ctx := context.Background()
queryAPI := &mockQueryAPI{}
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_stats_error"))
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_stats_error"))
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil)
activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1")
activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1")
queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, errNil)
repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop())
service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0)
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop())
service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
values := service.GetProtocolsTotalValues(ctx)
assert.Equal(t, 1, len(values))
@ -160,8 +161,9 @@ func TestService_GetProtocolsTotalValues_CacheHit(t *testing.T) {
mockCache := &cacheMock.CacheMock{}
var cacheErr error
cacheErr = nil
mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return(`{"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":4,"last_day_diff_percentage":"75.00%"}`, cacheErr)
service := protocols.NewService([]string{"protocol1"}, nil, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 0)
cachedValue := fmt.Sprintf(`{"result": {"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":4,"last_day_diff_percentage":"75.00%%"},"timestamp":"%s"}`, time.Now().Format(time.RFC3339))
mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return(cachedValue, cacheErr)
service := protocols.NewService([]string{"protocol1"}, nil, nil, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 60, metrics.NewNoOpMetrics(), &mockTvl{})
values := service.GetProtocolsTotalValues(ctx)
assert.Equal(t, 1, len(values))
assert.Equal(t, "protocol1", values[0].Protocol)
@ -174,71 +176,63 @@ func TestService_GetProtocolsTotalValues_CacheHit(t *testing.T) {
}
func TestService_GetProtocolsTotalValues_CacheMiss_FetchAndUpdate(t *testing.T) {
func TestService_GetCCTP_Stats(t *testing.T) {
var errNil error
totalStartOfCurrentDay := &mockQueryTableResult{}
totalStartOfCurrentDay.On("Next").Return(true)
totalStartOfCurrentDay.On("Err").Return(errNil)
totalStartOfCurrentDay.On("Close").Return(errNil)
totalStartOfCurrentDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
"app_id": protocols.CCTP,
"total_messages": uint64(50),
"total_value_transferred": 4e8,
}))
deltaSinceStartOfDay := &mockQueryTableResult{}
deltaSinceStartOfDay.On("Next").Return(true)
deltaSinceStartOfDay.On("Err").Return(errNil)
deltaSinceStartOfDay.On("Close").Return(errNil)
deltaSinceStartOfDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
"app_id": protocols.CCTP,
"total_messages": uint64(6),
"total_value_transferred": 2e8,
}))
deltaLastDay := &mockQueryTableResult{}
deltaLastDay.On("Next").Return(true)
deltaLastDay.On("Err").Return(errNil)
deltaLastDay.On("Close").Return(errNil)
deltaLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
"app_id": protocols.CCTP,
"total_messages": uint64(7),
"total_value_transferred": 132,
}))
ctx := context.Background()
mockCache := &cacheMock.CacheMock{}
mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return("", cache.ErrNotFound) // mock cache miss
// mock cache update, validate it's called once.
mockCache.On("Set",
ctx,
"WORMSCAN:PROTOCOLS:PROTOCOL1",
`{"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":3,"last_day_diff_percentage":"75.00%"}`,
time.Duration(60)*time.Minute).
Return(nil).
Times(1)
var errNil error
respStatsLatest := &mockQueryTableResult{}
respStatsLatest.On("Next").Return(true)
respStatsLatest.On("Err").Return(errNil)
respStatsLatest.On("Close").Return(errNil)
respStatsLatest.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
"protocol": "protocol1",
"total_messages": uint64(7),
"total_value_locked": float64(5),
}))
respStatsLastDay := &mockQueryTableResult{}
respStatsLastDay.On("Next").Return(true)
respStatsLastDay.On("Err").Return(errNil)
respStatsLastDay.On("Close").Return(errNil)
respStatsLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
"protocol": "protocol1",
"total_messages": uint64(4),
"total_value_locked": float64(5),
}))
respActivityLast := &mockQueryTableResult{}
respActivityLast.On("Next").Return(true)
respActivityLast.On("Err").Return(errNil)
respActivityLast.On("Close").Return(errNil)
respActivityLast.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
"protocol": "protocol1",
"total_messages": uint64(15),
"total_value_transferred": float64(7),
"total_value_secure": float64(9),
}))
queryAPI := &mockQueryAPI{}
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil)
activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1")
queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil)
repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop())
service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 60)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsTotalStartOfDay, "bucketInfinite", dbconsts.CctpStatsMeasurementDaily, protocols.CCTP, protocols.CCTP)).Return(totalStartOfCurrentDay, errNil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsDeltaSinceStartOfDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.CCTP, protocols.CCTP)).Return(deltaSinceStartOfDay, errNil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsDeltaLastDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.CCTP, protocols.CCTP)).Return(deltaLastDay, errNil)
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop())
service := protocols.NewService([]string{}, []string{protocols.CCTP}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
values := service.GetProtocolsTotalValues(ctx)
assert.NotNil(t, values)
assert.Equal(t, 1, len(values))
assert.Equal(t, "protocol1", values[0].Protocol)
assert.Equal(t, 5.00, values[0].TotalValueLocked)
assert.Equal(t, uint64(7), values[0].TotalMessages)
assert.Equal(t, 9.00, values[0].TotalValueSecured)
assert.Equal(t, 7.00, values[0].TotalValueTransferred)
assert.Equal(t, uint64(3), values[0].LastDayMessages)
assert.Equal(t, "75.00%", values[0].LastDayDiffPercentage)
for i := range values {
switch values[i].Protocol {
case "cctp":
assert.Equal(t, uint64(56), values[i].TotalMessages)
assert.Equal(t, 6.0, values[i].TotalValueTransferred)
assert.Equal(t, uint64(7), values[i].LastDayMessages)
assert.Equal(t, "14.29%", values[i].LastDayDiffPercentage)
assert.Equal(t, 1235.523, values[i].TotalValueLocked)
default:
t.Errorf("unexpected protocol %s", values[i].Protocol)
}
}
}
type mockQueryAPI struct {
@ -273,3 +267,10 @@ func (m *mockQueryTableResult) Close() error {
args := m.Called()
return args.Error(0)
}
type mockTvl struct {
}
func (t *mockTvl) Get(ctx context.Context) (string, error) {
return "1235.523", nil
}

View File

@ -30,3 +30,12 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
func (m *PrometheusMetrics) IncExpiredCacheResponse(key string) {
m.expiredCacheResponseCount.WithLabelValues(key).Inc()
}
type noOpMetrics struct{}
func (s *noOpMetrics) IncExpiredCacheResponse(_ string) {
}
func NewNoOpMetrics() Metrics {
return &noOpMetrics{}
}

View File

@ -159,7 +159,14 @@ func main() {
relaysRepo := relays.NewRepository(db.Database, rootLogger)
operationsRepo := operations.NewRepository(db.Database, rootLogger)
statsRepo := stats.NewRepository(influxCli, cfg.Influx.Organization, cfg.Influx.Bucket24Hours, rootLogger)
protocolsRepo := protocols.NewRepository(protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)), cfg.Influx.Bucket30Days, cfg.Influx.Bucket30Days, cfg.ProtocolsStatsVersion, cfg.ProtocolsActivityVersion, rootLogger)
protocolsRepo := protocols.NewRepository(
protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)),
cfg.Influx.BucketInfinite,
cfg.Influx.Bucket30Days,
cfg.ProtocolsStatsVersion,
cfg.ProtocolsActivityVersion,
rootLogger,
)
// create token provider
tokenProvider := domain.NewTokenProvider(cfg.P2pNetwork)
@ -179,7 +186,7 @@ func main() {
relaysService := relays.NewService(relaysRepo, rootLogger)
operationsService := operations.NewService(operationsRepo, rootLogger)
statsService := stats.NewService(statsRepo, cache, expirationTime, metrics, rootLogger)
protocolsService := protocols.NewService(cfg.Protocols, protocolsRepo, rootLogger, cache, cfg.Cache.ProtocolsStatsKey, cfg.Cache.ProtocolsStatsExpiration)
protocolsService := protocols.NewService(cfg.Protocols, []string{protocols.CCTP, protocols.PortalTokenBridge}, protocolsRepo, rootLogger, cache, cfg.Cache.ProtocolsStatsKey, cfg.Cache.ProtocolsStatsExpiration, metrics, tvl)
// Set up a custom error handler
response.SetEnableStackTrace(*cfg)

View File

@ -45,8 +45,10 @@ func TestGetContributorsTotalValues(t *testing.T) {
service := mockService(func(ctx context.Context) []contributorsHandlerPkg.ProtocolTotalValuesDTO {
return []contributorsHandlerPkg.ProtocolTotalValuesDTO{
{
Protocol: "protocol1",
Error: inputArgs.mockError,
ProtocolStats: contributorsHandlerPkg.ProtocolStats{
Protocol: "protocol1",
},
Error: inputArgs.mockError,
},
}
})

View File

@ -4,4 +4,12 @@ package dbconsts
const (
ProtocolsActivityMeasurement = "protocols_activity"
ProtocolsStatsMeasurement = "protocols_stats_v1"
CctpStatsMeasurementHourly = intProtocolStatsMeasurement1h
TokenBridgeStatsMeasurementHourly = intProtocolStatsMeasurement1h
intProtocolStatsMeasurement1h = "core_protocols_stats_1h"
CctpStatsMeasurementDaily = intProtocolStatsMeasurement1d
TokenBridgeStatsMeasurementDaily = intProtocolStatsMeasurement1d
intProtocolStatsMeasurement1d = "core_protocols_stats_1d"
)