Fix intermittent failure in x-chain-activity endpoint (#420)

* Replace start_time and end_time query parameters with fixed ranges at the endpoint of the chain activity
Add InfluxDB tasks for enpdoint chain-activity for each timeSpan
Add cache for chain-activity endpoint

* Code review updates
This commit is contained in:
ftocal 2023-06-20 10:34:20 -03:00 committed by GitHub
parent 50c1d479a6
commit 69885aed0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 901 additions and 673 deletions

View File

@ -0,0 +1,31 @@
import "date"
option task = {
name: "chain activity for 1 year with 3-hour granularity",
every: 3h,
}
sourceBucket = "wormscan"
destinationBucket = "wormscan-24hours"
execution = date.truncate(t: now(), unit: 1h)
start = date.truncate(t: -1y, unit: 24h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_1_year_3h")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_1_year_3h")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -0,0 +1,31 @@
import "date"
option task = {
name: "chain activity for 30 days with 3-hour granularity",
every: 3h,
}
sourceBucket = "wormscan"
destinationBucket = "wormscan-24hours"
execution = date.truncate(t: now(), unit: 1h)
start = date.truncate(t: -30d, unit: 24h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_30_days_3h")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_30_days_3h")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -0,0 +1,31 @@
import "date"
option task = {
name: "chain activity for 7 days with 3-hour granularity",
every: 3h,
}
sourceBucket = "wormscan"
destinationBucket = "wormscan-24hours"
execution = date.truncate(t: now(), unit: 1h)
start = date.truncate(t: -7d, unit: 24h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_7_days_3h")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_7_days_3h")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -0,0 +1,31 @@
import "date"
option task = {
name: "chain activity for 90 days with 3-hour granularity",
every: 3h,
}
sourceBucket = "wormscan"
destinationBucket = "wormscan-24hours"
execution = date.truncate(t: now(), unit: 1h)
start = date.truncate(t: -90d, unit: 24h)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_90_days_3h")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: start)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_90_days_3h")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -0,0 +1,30 @@
import "date"
option task = {
name: "chain activity for all time with 3-hour granularity",
every: 3h,
}
sourceBucket = "wormscan"
destinationBucket = "wormscan-24hours"
execution = date.truncate(t: now(), unit: 1h)
from(bucket: sourceBucket)
|> range(start: 1970-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> count(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_all_time_3h")
|> set(key: "_field", value: "count")
|> to(bucket: destinationBucket)
from(bucket: sourceBucket)
|> range(start: 1970-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> group(columns: ["emitter_chain", "destination_chain", "app_id"])
|> sum(column: "_value")
|> map(fn: (r) => ({r with _time: execution}))
|> set(key: "_measurement", value: "chain_activity_all_time_3h")
|> set(key: "_field", value: "notional")
|> to(bucket: destinationBucket)

View File

@ -0,0 +1,76 @@
package cacheable
import (
"context"
"encoding/json"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
"go.uber.org/zap"
)
// GetOrLoad is a function that tries to get the result from the cache, if it is not found or it is expired, then it loads the result.
func GetOrLoad[T any](
ctx context.Context,
logger *zap.Logger,
cacheClient cache.Cache,
expirations time.Duration,
key string,
load func() (T, error),
) (T, error) {
log := logger.With(zap.String("key", key))
// Try to get the result from the cache.
value, err := cacheClient.Get(ctx, key)
foundCache := true
//If the result is not found in the cache or fails, then load the result.
if err != nil {
foundCache = false
if err != cache.ErrNotFound {
log.Warn("getting result from cache", zap.Error(err))
}
}
var cached CachedResult[T]
//If the result is found in the cache and it is not expired, then return the result.
if foundCache {
err = json.Unmarshal([]byte(value), &cached)
if err != nil {
log.Warn("unmarshal cache", zap.Error(err))
} else if cached.Timestamp.Add(expirations).After(time.Now()) {
return cached.Result, nil
}
}
//If the result is not found in the cache or it is expired, then load the result.
result, err := load()
if err != nil {
//If the load function fails and the cache was found and is expired, the cache value is returned anyway.
if foundCache {
log.Warn("load function fails but returns cached result",
zap.Error(err), zap.String("cacheTime", cached.Timestamp.String()))
return cached.Result, nil
}
return result, err
}
//Saves the result of the execution of the load function in cache.
newValue := CachedResult[T]{Timestamp: time.Now(), Result: result}
err = cacheClient.Set(ctx, key, newValue, 10*expirations)
if err != nil {
log.Warn("saving the result in the cache", zap.Error(err))
}
//Returns the result of the execution of the function load
return result, nil
}
type CachedResult[T any] struct {
Timestamp time.Time `json:"timestamp"`
Result T `json:"result"`
}
func (c CachedResult[T]) MarshalBinary() ([]byte, error) {
return json.Marshal(c)
}

View File

@ -1,4 +1,5 @@
// Package docs Code generated by swaggo/swag. DO NOT EDIT
// Code generated by swaggo/swag. DO NOT EDIT.
package docs
import "github.com/swaggo/swag"
@ -1343,7 +1344,7 @@ const docTemplate = `{
},
"/api/v1/x-chain-activity": {
"get": {
"description": "Returns a list of chain pairs by origin chain and destination chain.\nThe list could be rendered by volume or transaction count.\nThe volume is calculated using the notional price of the symbol at the day the VAA was emitted.",
"description": "Returns a list of chain pairs by origin chain and destination chain.\nThe list could be rendered by notional or transaction count.\nThe volume is calculated using the notional price of the symbol at the day the VAA was emitted.",
"tags": [
"Wormscan"
],
@ -1351,19 +1352,13 @@ const docTemplate = `{
"parameters": [
{
"type": "string",
"description": "Star time (format: ISO-8601).",
"name": "start_time",
"description": "Time span, supported values: 7d, 30d, 90d, 1y and all-time (default is 7d).",
"name": "timeSpan",
"in": "query"
},
{
"type": "string",
"description": "End time (format: ISO-8601).",
"name": "end_time",
"in": "query"
},
{
"type": "string",
"description": "Renders the results using volume or tx count (default is volume).",
"description": "Renders the results using notional or tx count (default is notional).",
"name": "by",
"in": "query"
},

View File

@ -1336,7 +1336,7 @@
},
"/api/v1/x-chain-activity": {
"get": {
"description": "Returns a list of chain pairs by origin chain and destination chain.\nThe list could be rendered by volume or transaction count.\nThe volume is calculated using the notional price of the symbol at the day the VAA was emitted.",
"description": "Returns a list of chain pairs by origin chain and destination chain.\nThe list could be rendered by notional or transaction count.\nThe volume is calculated using the notional price of the symbol at the day the VAA was emitted.",
"tags": [
"Wormscan"
],
@ -1344,19 +1344,13 @@
"parameters": [
{
"type": "string",
"description": "Star time (format: ISO-8601).",
"name": "start_time",
"description": "Time span, supported values: 7d, 30d, 90d, 1y and all-time (default is 7d).",
"name": "timeSpan",
"in": "query"
},
{
"type": "string",
"description": "End time (format: ISO-8601).",
"name": "end_time",
"in": "query"
},
{
"type": "string",
"description": "Renders the results using volume or tx count (default is volume).",
"description": "Renders the results using notional or tx count (default is notional).",
"name": "by",
"in": "query"
},

View File

@ -1634,19 +1634,16 @@ paths:
get:
description: |-
Returns a list of chain pairs by origin chain and destination chain.
The list could be rendered by volume or transaction count.
The list could be rendered by notional or transaction count.
The volume is calculated using the notional price of the symbol at the day the VAA was emitted.
operationId: x-chain-activity
parameters:
- description: 'Star time (format: ISO-8601).'
- description: 'Time span, supported values: 7d, 30d, 90d, 1y and all-time (default
is 7d).'
in: query
name: start_time
name: timeSpan
type: string
- description: 'End time (format: ISO-8601).'
in: query
name: end_time
type: string
- description: Renders the results using volume or tx count (default is volume).
- description: Renders the results using notional or tx count (default is notional).
in: query
name: by
type: string

View File

@ -130,16 +130,40 @@ type TransactionCountResult struct {
}
type ChainActivityResult struct {
ChainSourceID string `mapstructure:"emitter_chain"`
ChainDestinationID string `mapstructure:"destination_chain"`
Volume uint64 `mapstructure:"volume"`
ChainSourceID string `mapstructure:"emitter_chain" json:"emitter_chain"`
ChainDestinationID string `mapstructure:"destination_chain" json:"destination_chain"`
Volume uint64 `mapstructure:"_value" json:"volume"`
}
type ChainActivityTimeSpan string
const (
ChainActivityTs7Days ChainActivityTimeSpan = "7d"
ChainActivityTs30Days ChainActivityTimeSpan = "30d"
ChainActivityTs90Days ChainActivityTimeSpan = "90d"
ChainActivityTs1Year ChainActivityTimeSpan = "1y"
ChainActivityTsAllTime ChainActivityTimeSpan = "all-time"
)
// ParseChainActivityTimeSpan parses a string and returns a `ChainActivityTimeSpan`.
func ParseChainActivityTimeSpan(s string) (ChainActivityTimeSpan, error) {
if s == string(ChainActivityTs7Days) ||
s == string(ChainActivityTs30Days) ||
s == string(ChainActivityTs90Days) ||
s == string(ChainActivityTs1Year) ||
s == string(ChainActivityTsAllTime) {
tmp := ChainActivityTimeSpan(s)
return tmp, nil
}
return "", fmt.Errorf("invalid time span: %s", s)
}
type ChainActivityQuery struct {
Start *time.Time
End *time.Time
AppIDs []string
TimeSpan ChainActivityTimeSpan
IsNotional bool
AppIDs []string
}
func (q *ChainActivityQuery) HasAppIDS() bool {
@ -150,20 +174,6 @@ func (q *ChainActivityQuery) GetAppIDs() []string {
return q.AppIDs
}
func (q *ChainActivityQuery) GetStart() time.Time {
if q.Start == nil {
return time.UnixMilli(0)
}
return *q.Start
}
func (q *ChainActivityQuery) GetEnd() time.Time {
if q.End == nil {
return time.Now()
}
return *q.End
}
// Token represents a token.
type Token struct {
Symbol domain.Symbol `json:"symbol"`

View File

@ -23,24 +23,23 @@ import (
"go.uber.org/zap"
)
const queryTemplate = `
const queryTemplateChainActivity = `
from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> range(start: %s)
|> filter(fn: (r) => r._measurement == "%s" and r._field == "%s")
|> last()
|> group(columns: ["emitter_chain", "destination_chain"])
|> %s(column: "volume")
|> sum()
`
const queryTemplateWithApps = `
const queryTemplateChainActivityWithApps = `
from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => r._measurement == "vaa_volume")
|> filter(fn: (r) => r._field == "volume" or r._field == "app_id")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> range(start: %s)
|> filter(fn: (r) => r._measurement == "%s" and r._field == "%s")
|> filter(fn: (r) => contains(value: r.app_id, set: %s))
|> last()
|> group(columns: ["emitter_chain", "destination_chain"])
|> %s(column: "volume")
|> sum()
`
const queryTemplateTxCount24h = `
@ -315,7 +314,7 @@ func convertToDecimal(amount uint64) string {
}
func (r *Repository) FindChainActivity(ctx context.Context, q *ChainActivityQuery) ([]ChainActivityResult, error) {
query := r.buildFindVolumeQuery(q)
query := r.buildChainActivityQuery(q)
result, err := r.queryAPI.Query(ctx, query)
if err != nil {
return nil, err
@ -334,20 +333,37 @@ func (r *Repository) FindChainActivity(ctx context.Context, q *ChainActivityQuer
return response, nil
}
func (r *Repository) buildFindVolumeQuery(q *ChainActivityQuery) string {
start := q.GetStart().UTC().Format(time.RFC3339)
stop := q.GetEnd().UTC().Format(time.RFC3339)
var operation string
func (r *Repository) buildChainActivityQuery(q *ChainActivityQuery) string {
var field string
if q.IsNotional {
operation = "sum"
field = "notional"
} else {
operation = "count"
field = "count"
}
var measurement string
switch q.TimeSpan {
case ChainActivityTs7Days:
measurement = "chain_activity_7_days_3h"
case ChainActivityTs30Days:
measurement = "chain_activity_30_days_3h"
case ChainActivityTs90Days:
measurement = "chain_activity_90_days_3h"
case ChainActivityTs1Year:
measurement = "chain_activity_1_year_3h"
case ChainActivityTsAllTime:
measurement = "chain_activity_all_time_3h"
default:
measurement = "chain_activity_7_days_3h"
}
//today without hours
start := time.Now().Truncate(24 * time.Hour).UTC().Format(time.RFC3339)
if q.HasAppIDS() {
apps := `["` + strings.Join(q.GetAppIDs(), `","`) + `"]`
return fmt.Sprintf(queryTemplateWithApps, r.bucketInfiniteRetention, start, stop, apps, operation)
return fmt.Sprintf(queryTemplateChainActivityWithApps, r.bucket24HoursRetention, start, measurement, field, apps)
} else {
return fmt.Sprintf(queryTemplateChainActivity, r.bucket24HoursRetention, start, measurement, field)
}
return fmt.Sprintf(queryTemplate, r.bucketInfiniteRetention, start, stop, operation)
}
func (r *Repository) GetScorecards(ctx context.Context) (*Scorecards, error) {

View File

@ -3,10 +3,14 @@ package transactions
import (
"context"
"fmt"
"strings"
"time"
"github.com/wormhole-foundation/wormhole-explorer/api/cacheable"
errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination"
"github.com/wormhole-foundation/wormhole-explorer/api/types"
"github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
@ -14,14 +18,15 @@ import (
type Service struct {
repo *Repository
cache cache.Cache
supportedChainIDs map[vaa.ChainID]string
logger *zap.Logger
}
// NewService create a new Service.
func NewService(repo *Repository, logger *zap.Logger) *Service {
func NewService(repo *Repository, cache cache.Cache, logger *zap.Logger) *Service {
supportedChainIDs := domain.GetSupportedChainIDs()
return &Service{repo: repo, supportedChainIDs: supportedChainIDs, logger: logger.With(zap.String("module", "TransactionService"))}
return &Service{repo: repo, supportedChainIDs: supportedChainIDs, cache: cache, logger: logger.With(zap.String("module", "TransactionService"))}
}
// GetTransactionCount get the last transactions.
@ -43,7 +48,11 @@ func (s *Service) GetTopChainPairs(ctx context.Context, timeSpan *TopStatisticsT
// GetChainActivity get chain activity.
func (s *Service) GetChainActivity(ctx context.Context, q *ChainActivityQuery) ([]ChainActivityResult, error) {
return s.repo.FindChainActivity(ctx, q)
key := fmt.Sprintf("wormscan:chain-activity:%s:%v:%s", q.TimeSpan, q.IsNotional, strings.Join(q.GetAppIDs(), ","))
return cacheable.GetOrLoad(ctx, s.logger, s.cache, 5*time.Minute, key,
func() ([]ChainActivityResult, error) {
return s.repo.FindChainActivity(ctx, q)
})
}
// FindGlobalTransactionByID find a global transaction by id.

View File

@ -149,7 +149,7 @@ func main() {
governorService := governor.NewService(governorRepo, rootLogger)
infrastructureService := infrastructure.NewService(infrastructureRepo, rootLogger)
heartbeatsService := heartbeats.NewService(heartbeatsRepo, rootLogger)
transactionsService := transactions.NewService(transactionsRepo, rootLogger)
transactionsService := transactions.NewService(transactionsRepo, cache, rootLogger)
// Set up a custom error handler
response.SetEnableStackTrace(*cfg)

View File

@ -353,6 +353,15 @@ func ExtractIsNotional(ctx *fiber.Ctx) (bool, error) {
return false, response.NewInvalidQueryParamError(ctx, "INVALID <by> QUERY PARAMETER", nil)
}
func ExtractChainActivityTimeSpan(ctx *fiber.Ctx) (transactions.ChainActivityTimeSpan, error) {
s := ctx.Query("timeSpan", string(transactions.ChainActivityTs7Days))
timeSpan, err := transactions.ParseChainActivityTimeSpan(s)
if err != nil {
return "", response.NewInvalidQueryParamError(ctx, "INVALID <timeSpan> QUERY PARAMETER", nil)
}
return timeSpan, nil
}
// ExtractTopStatisticsTimeSpan parses the `timespan` parameter used on top statistics endpoints.
//
// The endpoints that accept this parameter are:
@ -369,35 +378,6 @@ func ExtractTopStatisticsTimeSpan(ctx *fiber.Ctx) (*transactions.TopStatisticsTi
return timeSpan, nil
}
func ExtractTimeRange(ctx *fiber.Ctx) (*time.Time, *time.Time, error) {
startTime, err := ExtractTime(ctx, "start_time")
if err != nil {
return nil, nil, err
}
// check if start_time is in the future
if startTime != nil && startTime.After(time.Now()) {
return nil, nil, response.NewInvalidQueryParamError(ctx, "INVALID <start_time> QUERY PARAMETER, CANNOT BE GREATER THAN TODAYS DATE", nil)
}
endTime, err := ExtractTime(ctx, "end_time")
if err != nil {
return nil, nil, err
}
if startTime != nil && endTime != nil {
// check if start_time and end_time are equal
if startTime.Equal(*endTime) {
return nil, nil, response.NewInvalidQueryParamError(ctx, "INVALID <start_time>, <end_time> QUERY PARAMETER, <start_time> CANNOT BE EQUAL TO <end_time>", nil)
}
// check if start_time is greater than end_time
if startTime.After(*endTime) {
return nil, nil, response.NewInvalidQueryParamError(ctx, "INVALID <start_time>, <end_time> QUERY PARAMETER, <start_time> CANNOT BE GREATER THAN <end_time>", nil)
}
}
return startTime, endTime, nil
}
// ExtractTokenAddress get token address from route path.
func ExtractTokenAddress(c *fiber.Ctx, l *zap.Logger) (*types.Address, error) {
strTokenAddress := c.Params("token_address")

View File

@ -181,40 +181,38 @@ func (c *Controller) GetTopAssets(ctx *fiber.Ctx) error {
// GetChainActivity godoc
// @Description Returns a list of chain pairs by origin chain and destination chain.
// @Description The list could be rendered by volume or transaction count.
// @Description The list could be rendered by notional or transaction count.
// @Description The volume is calculated using the notional price of the symbol at the day the VAA was emitted.
// @Tags Wormscan
// @ID x-chain-activity
// @Param start_time query string false "Star time (format: ISO-8601)."
// @Param end_time query string false "End time (format: ISO-8601)."
// @Param by query string false "Renders the results using volume or tx count (default is volume)."
// @Param timeSpan query string false "Time span, supported values: 7d, 30d, 90d, 1y and all-time (default is 7d)."
// @Param by query string false "Renders the results using notional or tx count (default is notional)."
// @Param apps query string false "List of apps separated by comma (default is all apps)."
// @Success 200 {object} transactions.ChainActivity
// @Failure 400
// @Failure 500
// @Router /api/v1/x-chain-activity [get]
func (c *Controller) GetChainActivity(ctx *fiber.Ctx) error {
startTime, endTime, err := middleware.ExtractTimeRange(ctx)
if err != nil {
return err
}
apps, err := middleware.ExtractApps(ctx)
if err != nil {
return err
}
isNotional, err := middleware.ExtractIsNotional(ctx)
if err != nil {
return err
}
timeSpan, err := middleware.ExtractChainActivityTimeSpan(ctx)
if err != nil {
return err
}
q := &transactions.ChainActivityQuery{
Start: startTime,
End: endTime,
AppIDs: apps,
TimeSpan: timeSpan,
IsNotional: isNotional,
AppIDs: apps,
}
// Get the chain activity.
activity, err := c.srv.GetChainActivity(ctx.Context(), q)
if err != nil {

File diff suppressed because it is too large Load Diff