From faa8e38b2ee87d4963e88a2e32a3110d40c845ec Mon Sep 17 00:00:00 2001 From: Mariano <9205080+marianososto@users.noreply.github.com> Date: Mon, 18 Mar 2024 10:21:42 -0300 Subject: [PATCH] [ISSUE-1213] Refactor of top-contributors stats and activity job (#1215) * start refactor of stats and activity job change approach for collecting stats and activity metrics multiple stuff fix unfinished merging conflicts changes update Delete .run/wormhole-explorer-api.run.xml add unit-tesdt add unit-test for mayan and allbrdige add protocols-stats-1h job add new job change stats query fix query start refactor of stats and activity job change approach for collecting stats and activity metrics multiple stuff fix unfinished merging conflicts changes update Delete .run/wormhole-explorer-api.run.xml add unit-tesdt add unit-test for mayan and allbrdige add protocols-stats-1h job add new job change stats query fix query * remove temp backfiller * fix unit-tests * remove useless dbconsts * remove useless consts * fix some renaming * fix unit-tests * fix tvl portal_token_bridge --- .gitignore | 2 +- api/handlers/protocols/repository.go | 234 +++++++++++------- api/handlers/protocols/service.go | 34 ++- api/handlers/protocols/service_test.go | 91 ++++--- api/internal/config/config.go | 4 +- api/main.go | 9 +- common/dbconsts/consts.go | 6 +- deploy/jobs/protocols-stats-daily.yaml | 45 ++++ deploy/jobs/protocols-stats-hourly.yaml | 45 ++++ jobs/cmd/main.go | 68 +++-- jobs/config/config.go | 15 +- jobs/jobs/jobs.go | 12 +- jobs/jobs/protocols/activity/activity.go | 82 ------ jobs/jobs/protocols/activity/activity_test.go | 80 ------ .../internal/repositories/allbridge.go | 141 ----------- .../internal/repositories/repository.go | 26 -- jobs/jobs/protocols/activity/types.go | 41 --- .../jobs/protocols/internal/commons/common.go | 5 + jobs/jobs/protocols/repository/allbridge.go | 234 ++++++++++++++++++ .../allbridge_test.go | 33 +-- .../repositories => repository}/mayan.go | 88 +++++-- .../repositories => repository}/mayan_test.go | 33 +-- jobs/jobs/protocols/repository/types.go | 48 ++++ jobs/jobs/protocols/stats.go | 136 ++++++++++ jobs/jobs/protocols/stats/stats.go | 227 ----------------- jobs/jobs/protocols/stats/stats_test.go | 165 ------------ jobs/jobs/protocols/stats_test.go | 184 ++++++++++++++ 27 files changed, 1090 insertions(+), 998 deletions(-) create mode 100644 deploy/jobs/protocols-stats-daily.yaml create mode 100644 deploy/jobs/protocols-stats-hourly.yaml delete mode 100644 jobs/jobs/protocols/activity/activity.go delete mode 100644 jobs/jobs/protocols/activity/activity_test.go delete mode 100644 jobs/jobs/protocols/activity/internal/repositories/allbridge.go delete mode 100644 jobs/jobs/protocols/activity/internal/repositories/repository.go delete mode 100644 jobs/jobs/protocols/activity/types.go create mode 100644 jobs/jobs/protocols/repository/allbridge.go rename jobs/jobs/protocols/{activity/internal/repositories => repository}/allbridge_test.go (66%) rename jobs/jobs/protocols/{activity/internal/repositories => repository}/mayan.go (52%) rename jobs/jobs/protocols/{activity/internal/repositories => repository}/mayan_test.go (65%) create mode 100644 jobs/jobs/protocols/repository/types.go create mode 100644 jobs/jobs/protocols/stats.go delete mode 100644 jobs/jobs/protocols/stats/stats.go delete mode 100644 jobs/jobs/protocols/stats/stats_test.go create mode 100644 jobs/jobs/protocols/stats_test.go diff --git a/.gitignore b/.gitignore index 8ac2603c..68e22e7f 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,4 @@ serviceAccountKey.json bigtableAccountKey.json tsconfig.tsbuildinfo serviceAccount.json -.run \ No newline at end of file +.run/ \ No newline at end of file diff --git a/api/handlers/protocols/repository.go b/api/handlers/protocols/repository.go index 1fb724cc..8b3fc27f 100644 --- a/api/handlers/protocols/repository.go +++ b/api/handlers/protocols/repository.go @@ -8,38 +8,13 @@ import ( "github.com/mitchellh/mapstructure" "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" "go.uber.org/zap" + "time" ) -const QueryTemplateLatestPoint = ` -from(bucket: "%s") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") - |> last() - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") -` - -const QueryTemplateLast24Point = ` -from(bucket: "%s") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") - |> first() - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") -` - -const QueryTemplateActivityLatestPoint = ` -from(bucket: "%s") - |> range(start: 1970-01-01T00:00:00Z) - |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") - |> keep(columns: ["_time","_field","protocol", "_value", "total_value_secure", "total_value_transferred"]) - |> last() - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") -` - -// QueryIntProtocolsTotalStartOfDay Query template for internal protocols (cctp and portal_token_bridge) to fetch total values till the start of current day -const QueryIntProtocolsTotalStartOfDay = ` +// QueryCoreProtocolTotalStartOfDay Query template for core protocols (cctp and portal_token_bridge) to fetch total values till the start of current day +const QueryCoreProtocolTotalStartOfDay = ` import "date" - import "types" - + startOfCurrentDay = date.truncate(t: now(), unit: 1d) data = from(bucket: "%s") @@ -58,6 +33,7 @@ totalMsgs = data |> group() |> sum() |> set(key:"_field",value:"total_messages") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) union(tables:[tvt,totalMsgs]) |> set(key:"_time",value:string(v:startOfCurrentDay)) @@ -65,8 +41,8 @@ union(tables:[tvt,totalMsgs]) |> set(key:"app_id",value:"%s") ` -// QueryIntProtocolsDeltaSinceStartOfDay calculate delta since the beginning of current day -const QueryIntProtocolsDeltaSinceStartOfDay = ` +// QueryCoreProtocolDeltaSinceStartOfDay calculate delta since the beginning of current day +const QueryCoreProtocolDeltaSinceStartOfDay = ` import "date" import "types" @@ -89,6 +65,7 @@ totalMsgs = data |> group() |> sum() |> set(key:"_field",value:"total_messages") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) union(tables:[tvt,totalMsgs]) |> set(key:"_time",value:string(v:startOfDay)) @@ -96,11 +73,10 @@ union(tables:[tvt,totalMsgs]) |> set(key:"app_id",value:"%s") ` -// QueryIntProtocolsDeltaLastDay calculate last day delta -const QueryIntProtocolsDeltaLastDay = ` +// QueryCoreProtocolDeltaLastDay calculate last day delta +const QueryCoreProtocolDeltaLastDay = ` import "date" import "types" - ts = date.truncate(t: now(), unit: 1h) yesterday = date.sub(d: 1d, from: ts) @@ -121,6 +97,7 @@ totalMsgs = data |> group() |> sum() |> set(key:"_field",value:"total_messages") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) union(tables:[tvt,totalMsgs]) |> set(key:"_time",value:string(v:yesterday)) @@ -128,23 +105,87 @@ union(tables:[tvt,totalMsgs]) |> set(key:"app_id",value:"%s") ` +const QueryTemplateProtocolStatsLastDay = ` + from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s") + |> first() + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +` + +const QueryTemplateProtocolStats = ` + data = from(bucket: "%s") + |> range(start: -2d) + |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s") + + totalMsg = data + |> filter(fn: (r) => r._field == "total_messages") + |> sort(columns:["_time"],desc:false) + |> last() + + tvl = data + |> filter(fn: (r) => r._field == "total_value_locked") + |> sort(columns:["_time"],desc:false) + |> last() + + volume = data + |> filter(fn: (r) => r._field == "volume") + |> sort(columns:["_time"],desc:false) + |> last() + + union(tables:[totalMsg,tvl,volume]) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +` + +const QueryTemplateProtocolActivity = ` + data = + from(bucket: "%s") + |> range(start: %s) + |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s") + + tvs = data + |> filter(fn: (r) => r._field == "total_value_secure") + |> cumulativeSum() + |> last() + + tvt = data + |> filter(fn: (r) => r._field == "total_value_transferred") + |> cumulativeSum() + |> last() + + volume = data + |> filter(fn: (r) => r._field == "volume") + |> sort(columns:["_time"],desc:false) + |> cumulativeSum() + |> last() + + txs = data + |> filter(fn: (r) => r._field == "txs") + |> sort(columns:["_time"],desc:false) + |> cumulativeSum() + |> last() + + union(tables:[tvs,tvt,volume,txs]) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +` + type Repository struct { - queryAPI QueryDoer - logger *zap.Logger - bucketInfinite string - bucket30d string - statsVersion string - activityVersion string - intProtocolMeasurement map[string]struct { + queryAPI QueryDoer + logger *zap.Logger + bucketInfinite string + bucket30d string + coreProtocolMeasurement map[string]struct { Daily string Hourly string } } type rowStat struct { - Protocol string `mapstructure:"protocol"` - TotalMessages uint64 `mapstructure:"total_messages"` - TotalValueLocked float64 `mapstructure:"total_value_locked"` + Protocol string `mapstructure:"protocol"` + TotalMessages uint64 `mapstructure:"total_messages"` + TotalValueLocked float64 `mapstructure:"total_value_locked"` + Volume float64 `mapstructure:"volume"` + Time time.Time `mapstructure:"_time"` } type intRowStat struct { @@ -159,14 +200,12 @@ type intStats struct { } type rowActivity struct { - Protocol string `mapstructure:"protocol"` - DestinationChainId string `mapstructure:"destination_chain_id"` - EmitterChainId string `mapstructure:"emitter_chain_id"` - From string `mapstructure:"from"` - TotalUsd float64 `mapstructure:"total_usd"` - TotalValueTransferred float64 `mapstructure:"total_value_transferred"` - TotalVolumeSecure float64 `mapstructure:"total_value_secure"` - Txs uint64 `mapstructure:"txs"` + Protocol string `mapstructure:"protocol"` + Time time.Time `mapstructure:"_time"` + TotalUsd float64 `mapstructure:"total_usd"` + TotalValueTransferred float64 `mapstructure:"total_value_transferred"` + TotalValueSecure float64 `mapstructure:"total_value_secure"` + Txs uint64 `mapstructure:"txs"` } type stats struct { @@ -193,15 +232,13 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer { return &queryApiWrapper{qApi: qApi} } -func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d, statsVersion, activityVersion string, logger *zap.Logger) *Repository { +func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d string, logger *zap.Logger) *Repository { return &Repository{ - queryAPI: qApi, - bucketInfinite: bucketInfinite, - bucket30d: bucket30d, - statsVersion: statsVersion, - activityVersion: activityVersion, - logger: logger, - intProtocolMeasurement: map[string]struct { + queryAPI: qApi, + bucketInfinite: bucketInfinite, + bucket30d: bucket30d, + logger: logger, + coreProtocolMeasurement: map[string]struct { Daily string Hourly string }{ @@ -215,41 +252,74 @@ func (q *queryApiWrapper) Query(ctx context.Context, query string) (QueryResult, return q.qApi.Query(ctx, query) } -// returns latest and last 24 hr stats for a given protocol -func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (stats, error) { +func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (rowStat, error) { - // fetch latest stat - q := buildQuery(QueryTemplateLatestPoint, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion) - latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) + q := fmt.Sprintf(QueryTemplateProtocolStats, r.bucket30d, dbconsts.ProtocolsStatsMeasurementHourly, protocol) + + statsData, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) if err != nil { - return stats{}, err + r.logger.Error("error fetching latest daily stats", zap.Error(err)) + return rowStat{}, err } - // fetch last 24 hr stat - q = buildQuery(QueryTemplateLast24Point, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion) - last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) - return stats{ - Latest: latest, - Last24: last24hr, - }, err + + return rowStat{ + Protocol: protocol, + TotalMessages: statsData.TotalMessages, + TotalValueLocked: statsData.TotalValueLocked, + Volume: statsData.Volume, + }, nil + +} + +func (r *Repository) getProtocolStatsLastDay(ctx context.Context, protocol string) (rowStat, error) { + + to := time.Now().UTC().Truncate(24 * time.Hour) + from := to.Add(-24 * time.Hour) + q := fmt.Sprintf(QueryTemplateProtocolStatsLastDay, r.bucket30d, from.Format(time.RFC3339), to.Format(time.RFC3339), dbconsts.ProtocolsStatsMeasurementHourly, protocol) + + lastDayData, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) + if err != nil { + r.logger.Error("error fetching last day stats", zap.Error(err)) + return rowStat{}, err + } + + return lastDayData, nil + } func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (rowActivity, error) { - q := buildQuery(QueryTemplateActivityLatestPoint, r.bucket30d, dbconsts.ProtocolsActivityMeasurement, protocol, r.activityVersion) - return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol) + + q := fmt.Sprintf(QueryTemplateProtocolActivity, r.bucketInfinite, "1970-01-01T00:00:00Z", dbconsts.ProtocolsActivityMeasurementDaily, protocol) + activityDaily, err := fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol) + if err != nil { + r.logger.Error("error fetching latest daily activity", zap.Error(err)) + return rowActivity{}, err + } + + q = fmt.Sprintf(QueryTemplateProtocolActivity, r.bucket30d, activityDaily.Time.Format(time.RFC3339), dbconsts.ProtocolsActivityMeasurementHourly, protocol) + activityHourly, err := fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol) + + return rowActivity{ + Protocol: protocol, + Txs: activityDaily.Txs + activityHourly.Txs, + TotalUsd: activityDaily.TotalUsd + activityHourly.TotalUsd, + TotalValueTransferred: activityDaily.TotalValueTransferred + activityHourly.TotalValueTransferred, + TotalValueSecure: activityDaily.TotalValueSecure + activityHourly.TotalValueSecure, + }, nil } -// returns latest and last 24 hr for internal protocols (cctp and portal_token_bridge) -func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol string) (intStats, error) { +// returns latest and last 24 hr for core protocols (cctp and portal_token_bridge) +func (r *Repository) getCoreProtocolStats(ctx context.Context, protocol string) (intStats, error) { // calculate total values till the start of current day - totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.intProtocolMeasurement[protocol].Daily, protocol, protocol) + totalTillCurrentDayQuery := fmt.Sprintf(QueryCoreProtocolTotalStartOfDay, r.bucketInfinite, r.coreProtocolMeasurement[protocol].Daily, protocol, protocol) totalsUntilToday, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, totalTillCurrentDayQuery, protocol) if err != nil { return intStats{}, err } // calculate delta since the beginning of current day - q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol) + q2 := fmt.Sprintf(QueryCoreProtocolDeltaSinceStartOfDay, r.bucket30d, r.coreProtocolMeasurement[protocol].Hourly, protocol, protocol) currentDayStats, errCD := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q2, protocol) if errCD != nil { return intStats{}, errCD @@ -266,7 +336,7 @@ func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol stri } // calculate last day delta - q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol) + q3 := fmt.Sprintf(QueryCoreProtocolDeltaLastDay, r.bucket30d, r.coreProtocolMeasurement[protocol].Hourly, protocol, protocol) deltaYesterdayStats, errQ3 := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q3, protocol) if errQ3 != nil { return result, errQ3 @@ -297,7 +367,3 @@ func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx co err = mapstructure.Decode(result.Record().Values(), &res) return res, err } - -func buildQuery(queryTemplate, bucket, measurement, contributorName, version string) string { - return fmt.Sprintf(queryTemplate, bucket, measurement, contributorName, version) -} diff --git a/api/handlers/protocols/service.go b/api/handlers/protocols/service.go index bf62698f..c3ac4b65 100644 --- a/api/handlers/protocols/service.go +++ b/api/handlers/protocols/service.go @@ -19,7 +19,7 @@ type Service struct { Protocols []string repo *Repository logger *zap.Logger - intProtocols []string + coreProtocols []string cache cache.Cache cacheKeyPrefix string cacheTTL int @@ -46,12 +46,12 @@ type tvlProvider interface { Get(ctx context.Context) (string, error) } -func NewService(extProtocols, intProtocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int, metrics metrics.Metrics, tvlProvider tvlProvider) *Service { +func NewService(extProtocols, coreProtocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int, metrics metrics.Metrics, tvlProvider tvlProvider) *Service { return &Service{ Protocols: extProtocols, repo: repo, logger: logger, - intProtocols: intProtocols, + coreProtocols: coreProtocols, cache: cache, cacheKeyPrefix: cacheKeyPrefix, cacheTTL: cacheTTL, @@ -63,15 +63,15 @@ func NewService(extProtocols, intProtocols []string, repo *Repository, logger *z func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalValuesDTO { wg := &sync.WaitGroup{} - totalProtocols := len(s.Protocols) + len(s.intProtocols) + totalProtocols := len(s.Protocols) + len(s.coreProtocols) wg.Add(totalProtocols) results := make(chan ProtocolTotalValuesDTO, totalProtocols) for _, p := range s.Protocols { go s.fetchProtocolValues(ctx, wg, p, results, s.getProtocolStats) } - for _, p := range s.intProtocols { - go s.fetchProtocolValues(ctx, wg, p, results, s.getIntProtocolStats) + for _, p := range s.coreProtocols { + go s.fetchProtocolValues(ctx, wg, p, results, s.getCoreProtocolStats) } wg.Wait() close(results) @@ -119,9 +119,9 @@ func (s *Service) fetchProtocolValues(ctx context.Context, wg *sync.WaitGroup, p } // getProtocolStats fetches stats for CCTP and PortalTokenBridge -func (s *Service) getIntProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) { +func (s *Service) getCoreProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) { - protocolStats, err := s.repo.getInternalProtocolStats(ctx, protocol) + protocolStats, err := s.repo.getCoreProtocolStats(ctx, protocol) if err != nil { return ProtocolStats{ Protocol: protocol, @@ -144,7 +144,7 @@ func (s *Service) getIntProtocolStats(ctx context.Context, protocol string) (Pro val.LastDayDiffPercentage = percentage } - if CCTP == protocol { + if PortalTokenBridge == protocol { tvl, errTvl := s.tvl.Get(ctx) if errTvl != nil { s.logger.Error("error fetching tvl", zap.Error(errTvl), zap.String("protocol", protocol)) @@ -169,16 +169,24 @@ func (s *Service) getProtocolStats(ctx context.Context, protocol string) (Protoc } statsRes := make(chan statsResult, 1) go func() { + defer close(statsRes) rowStats, errStats := s.repo.getProtocolStats(ctx, protocol) - statsRes <- statsResult{result: rowStats, Err: errStats} - close(statsRes) + if errStats != nil { + statsRes <- statsResult{Err: errStats} + return + } + lastDayStats, errStats := s.repo.getProtocolStatsLastDay(ctx, protocol) + if errStats != nil { + statsRes <- statsResult{Err: errStats} + return + } + statsRes <- statsResult{result: stats{Latest: rowStats, Last24: lastDayStats}} }() activity, err := s.repo.getProtocolActivity(ctx, protocol) if err != nil { s.logger.Error("error fetching protocol activity", zap.Error(err), zap.String("protocol", protocol)) return ProtocolStats{Protocol: protocol}, err - } rStats := <-statsRes @@ -192,7 +200,7 @@ func (s *Service) getProtocolStats(ctx context.Context, protocol string) (Protoc TotalValueLocked: rStats.result.Latest.TotalValueLocked, TotalMessages: rStats.result.Latest.TotalMessages, TotalValueTransferred: activity.TotalValueTransferred, - TotalValueSecured: activity.TotalVolumeSecure, + TotalValueSecured: activity.TotalValueSecure, } totalMsgNow := rStats.result.Latest.TotalMessages diff --git a/api/handlers/protocols/service_test.go b/api/handlers/protocols/service_test.go index d759d639..c695f596 100644 --- a/api/handlers/protocols/service_test.go +++ b/api/handlers/protocols/service_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/influxdata/influxdb-client-go/v2/api" "github.com/influxdata/influxdb-client-go/v2/api/query" "github.com/stretchr/testify/assert" "github.com/test-go/testify/mock" @@ -44,22 +43,36 @@ func TestService_GetProtocolsTotalValues(t *testing.T) { respActivityLast.On("Next").Return(true) respActivityLast.On("Err").Return(errNil) respActivityLast.On("Close").Return(errNil) + ts := time.Now().UTC() respActivityLast.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ "protocol": "protocol1", "total_messages": uint64(4), "total_value_transferred": float64(7), "total_value_secure": float64(9), + "_time": ts, + })) + + respActivity2 := &mockQueryTableResult{} + respActivity2.On("Next").Return(true) + respActivity2.On("Err").Return(errNil) + respActivity2.On("Close").Return(errNil) + respActivity2.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(4), + "total_value_transferred": float64(7), + "total_value_secure": float64(9), })) ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStats, "bucket30d", dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLatest, nil) + to := time.Now().UTC().Truncate(24 * time.Hour) + from := to.Add(-24 * time.Hour) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStatsLastDay, "bucket30d", from.Format(time.RFC3339), to.Format(time.RFC3339), dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucketInfinite", "1970-01-01T00:00:00Z", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(respActivityLast, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucket30d", ts.Format(time.RFC3339), dbconsts.ProtocolsActivityMeasurementHourly, "protocol1")).Return(respActivity2, nil) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") - queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil) - - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) @@ -67,8 +80,8 @@ func TestService_GetProtocolsTotalValues(t *testing.T) { assert.Equal(t, "protocol1", values[0].Protocol) assert.Equal(t, 5.00, values[0].TotalValueLocked) assert.Equal(t, uint64(7), values[0].TotalMessages) - assert.Equal(t, 9.00, values[0].TotalValueSecured) - assert.Equal(t, 7.00, values[0].TotalValueTransferred) + assert.Equal(t, 18.00, values[0].TotalValueSecured) + assert.Equal(t, 14.00, values[0].TotalValueTransferred) assert.Equal(t, uint64(3), values[0].LastDayMessages) assert.Equal(t, "75.00%", values[0].LastDayDiffPercentage) @@ -98,20 +111,20 @@ func TestService_GetProtocolsTotalValues_FailedFetchingActivity(t *testing.T) { ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStats, "bucket30d", dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLatest, nil) + to := time.Now().UTC().Truncate(24 * time.Hour) + from := to.Add(-24 * time.Hour) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStatsLastDay, "bucket30d", from.Format(time.RFC3339), to.Format(time.RFC3339), dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucketInfinite", "1970-01-01T00:00:00Z", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(&mockQueryTableResult{}, errors.New("mocked_error")) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") - queryAPI.On("Query", ctx, activityQuery).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_activity_error")) - - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) assert.Equal(t, "protocol1", values[0].Protocol) assert.NotNil(t, values[0].Error) - assert.Equal(t, "mocked_fetching_activity_error", values[0].Error) + assert.Equal(t, "mocked_error", values[0].Error) } func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) { @@ -131,29 +144,43 @@ func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) { respActivityLast.On("Next").Return(true) respActivityLast.On("Err").Return(errNil) respActivityLast.On("Close").Return(errNil) + ts := time.Now().UTC() respActivityLast.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ "protocol": "protocol1", "total_messages": uint64(4), "total_value_transferred": float64(7), "total_volume_secure": float64(9), + "_time": ts, + })) + + respActivity2 := &mockQueryTableResult{} + respActivity2.On("Next").Return(true) + respActivity2.On("Err").Return(errNil) + respActivity2.On("Close").Return(errNil) + respActivity2.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(4), + "total_value_transferred": float64(7), + "total_value_secure": float64(9), })) ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_stats_error")) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStats, "bucket30d", dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(&mockQueryTableResult{}, errors.New("mocked_error")) + to := time.Now().UTC().Truncate(24 * time.Hour) + from := to.Add(-24 * time.Hour) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStatsLastDay, "bucket30d", from.Format(time.RFC3339), to.Format(time.RFC3339), dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucketInfinite", "1970-01-01T00:00:00Z", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(respActivityLast, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucket30d", ts.Format(time.RFC3339), dbconsts.ProtocolsActivityMeasurementHourly, "protocol1")).Return(respActivity2, nil) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") - queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, errNil) - - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) assert.Equal(t, "protocol1", values[0].Protocol) assert.NotNil(t, values[0].Error) - assert.Equal(t, "mocked_fetching_stats_error", values[0].Error) + assert.Equal(t, "mocked_error", values[0].Error) } func TestService_GetProtocolsTotalValues_CacheHit(t *testing.T) { @@ -184,7 +211,7 @@ func TestService_GetCCTP_Stats(t *testing.T) { totalStartOfCurrentDay.On("Err").Return(errNil) totalStartOfCurrentDay.On("Close").Return(errNil) totalStartOfCurrentDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ - "app_id": protocols.CCTP, + "app_id": protocols.PortalTokenBridge, "total_messages": uint64(50), "total_value_transferred": 4e8, })) @@ -194,7 +221,7 @@ func TestService_GetCCTP_Stats(t *testing.T) { deltaSinceStartOfDay.On("Err").Return(errNil) deltaSinceStartOfDay.On("Close").Return(errNil) deltaSinceStartOfDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ - "app_id": protocols.CCTP, + "app_id": protocols.PortalTokenBridge, "total_messages": uint64(6), "total_value_transferred": 2e8, })) @@ -204,7 +231,7 @@ func TestService_GetCCTP_Stats(t *testing.T) { deltaLastDay.On("Err").Return(errNil) deltaLastDay.On("Close").Return(errNil) deltaLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ - "app_id": protocols.CCTP, + "app_id": protocols.PortalTokenBridge, "total_messages": uint64(7), "total_value_transferred": 132, })) @@ -212,18 +239,18 @@ func TestService_GetCCTP_Stats(t *testing.T) { ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsTotalStartOfDay, "bucketInfinite", dbconsts.CctpStatsMeasurementDaily, protocols.CCTP, protocols.CCTP)).Return(totalStartOfCurrentDay, errNil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsDeltaSinceStartOfDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.CCTP, protocols.CCTP)).Return(deltaSinceStartOfDay, errNil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsDeltaLastDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.CCTP, protocols.CCTP)).Return(deltaLastDay, errNil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryCoreProtocolTotalStartOfDay, "bucketInfinite", dbconsts.CctpStatsMeasurementDaily, protocols.PortalTokenBridge, protocols.PortalTokenBridge)).Return(totalStartOfCurrentDay, errNil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryCoreProtocolDeltaSinceStartOfDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.PortalTokenBridge, protocols.PortalTokenBridge)).Return(deltaSinceStartOfDay, errNil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryCoreProtocolDeltaLastDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.PortalTokenBridge, protocols.PortalTokenBridge)).Return(deltaLastDay, errNil) - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{}, []string{protocols.CCTP}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) + service := protocols.NewService([]string{}, []string{protocols.PortalTokenBridge}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.NotNil(t, values) assert.Equal(t, 1, len(values)) for i := range values { switch values[i].Protocol { - case "cctp": + case "portal_token_bridge": assert.Equal(t, uint64(56), values[i].TotalMessages) assert.Equal(t, 6.0, values[i].TotalValueTransferred) assert.Equal(t, uint64(7), values[i].LastDayMessages) diff --git a/api/internal/config/config.go b/api/internal/config/config.go index 1fcd00f7..ac9aecc0 100644 --- a/api/internal/config/config.go +++ b/api/internal/config/config.go @@ -69,9 +69,7 @@ type AppConfig struct { //Api Tokens Tokens string } - Protocols []string - ProtocolsStatsVersion string - ProtocolsActivityVersion string + Protocols []string } // GetLogLevel get zapcore.Level define in the configuraion. diff --git a/api/main.go b/api/main.go index 0f14683a..b19a4767 100644 --- a/api/main.go +++ b/api/main.go @@ -159,14 +159,7 @@ func main() { relaysRepo := relays.NewRepository(db.Database, rootLogger) operationsRepo := operations.NewRepository(db.Database, rootLogger) statsRepo := stats.NewRepository(influxCli, cfg.Influx.Organization, cfg.Influx.Bucket24Hours, rootLogger) - protocolsRepo := protocols.NewRepository( - protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)), - cfg.Influx.BucketInfinite, - cfg.Influx.Bucket30Days, - cfg.ProtocolsStatsVersion, - cfg.ProtocolsActivityVersion, - rootLogger, - ) + protocolsRepo := protocols.NewRepository(protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)), cfg.Influx.BucketInfinite, cfg.Influx.Bucket30Days, rootLogger) // create token provider tokenProvider := domain.NewTokenProvider(cfg.P2pNetwork) diff --git a/common/dbconsts/consts.go b/common/dbconsts/consts.go index 12e7fbd1..0b9acca1 100644 --- a/common/dbconsts/consts.go +++ b/common/dbconsts/consts.go @@ -2,8 +2,10 @@ package dbconsts // influx-db constants const ( - ProtocolsActivityMeasurement = "protocols_activity" - ProtocolsStatsMeasurement = "protocols_stats_v1" + ProtocolsActivityMeasurementHourly = "protocols_activity_1h" + ProtocolsActivityMeasurementDaily = "protocols_activity_1d" + ProtocolsStatsMeasurementDaily = "protocols_stats_1d" + ProtocolsStatsMeasurementHourly = "protocols_stats_1h" CctpStatsMeasurementHourly = intProtocolStatsMeasurement1h TokenBridgeStatsMeasurementHourly = intProtocolStatsMeasurement1h diff --git a/deploy/jobs/protocols-stats-daily.yaml b/deploy/jobs/protocols-stats-daily.yaml new file mode 100644 index 00000000..2e8b96c1 --- /dev/null +++ b/deploy/jobs/protocols-stats-daily.yaml @@ -0,0 +1,45 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: protocols-stats-daily + namespace: {{ .NAMESPACE }} +spec: #cronjob specs + schedule: "0 0 * * *" + jobTemplate: + spec: # job specs + template: + spec: # pod specs + containers: + - name: protocols-stats-daily + image: {{ .IMAGE_NAME }} + imagePullPolicy: Always + env: + - name: ENVIRONMENT + value: {{ .ENVIRONMENT }} + - name: LOG_LEVEL + value: {{ .LOG_LEVEL }} + - name: JOB_ID + value: JOB_PROTOCOLS_STATS_DAILY + - name: INFLUX_URL + valueFrom: + configMapKeyRef: + name: config + key: influxdb-url + - name: INFLUX_TOKEN + valueFrom: + secretKeyRef: + name: influxdb + key: token + - name: INFLUX_ORGANIZATION + valueFrom: + configMapKeyRef: + name: config + key: influxdb-organization + - name: INFLUX_BUCKET_INFINITE + valueFrom: + configMapKeyRef: + name: config + key: influxdb-bucket-infinite + - name: PROTOCOLS_JSON + value: {{ .PROTOCOLS_JSON }} + restartPolicy: OnFailure \ No newline at end of file diff --git a/deploy/jobs/protocols-stats-hourly.yaml b/deploy/jobs/protocols-stats-hourly.yaml new file mode 100644 index 00000000..edc14395 --- /dev/null +++ b/deploy/jobs/protocols-stats-hourly.yaml @@ -0,0 +1,45 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: protocols-stats-hourly + namespace: {{ .NAMESPACE }} +spec: #cronjob specs + schedule: "0 * * * *" + jobTemplate: + spec: # job specs + template: + spec: # pod specs + containers: + - name: protocols-stats-hourly + image: {{ .IMAGE_NAME }} + imagePullPolicy: Always + env: + - name: ENVIRONMENT + value: {{ .ENVIRONMENT }} + - name: LOG_LEVEL + value: {{ .LOG_LEVEL }} + - name: JOB_ID + value: JOB_PROTOCOLS_STATS_HOURLY + - name: INFLUX_URL + valueFrom: + configMapKeyRef: + name: config + key: influxdb-url + - name: INFLUX_TOKEN + valueFrom: + secretKeyRef: + name: influxdb + key: token + - name: INFLUX_ORGANIZATION + valueFrom: + configMapKeyRef: + name: config + key: influxdb-organization + - name: INFLUX_BUCKET_30_DAYS + valueFrom: + configMapKeyRef: + name: config + key: influxdb-bucket-30-days + - name: PROTOCOLS_JSON + value: {{ .PROTOCOLS_JSON }} + restartPolicy: OnFailure \ No newline at end of file diff --git a/jobs/cmd/main.go b/jobs/cmd/main.go index 52ca3e93..3721e80b 100644 --- a/jobs/cmd/main.go +++ b/jobs/cmd/main.go @@ -3,16 +3,16 @@ package main import ( "context" "encoding/json" + "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository" "log" - "net/http" "os" "strings" "time" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/wormhole-foundation/wormhole-explorer/common/configuration" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/stats" "github.com/go-redis/redis" txtrackerProcessVaa "github.com/wormhole-foundation/wormhole-explorer/common/client/txtracker" @@ -83,12 +83,12 @@ func main() { migrationJob := initMigrateSourceTxJob(ctx, mCfg, chainID, logger) err = migrationJob.Run(ctx) - case jobs.JobIDProtocolsStats: - statsJob := initProtocolStatsJob(ctx, logger) + case jobs.JobIDProtocolsStatsHourly: + statsJob := initProtocolStatsHourlyJob(ctx, logger) + err = statsJob.Run(ctx) + case jobs.JobIDProtocolsStatsDaily: + statsJob := initProtocolStatsDailyJob(ctx, logger) err = statsJob.Run(ctx) - case jobs.JobIDProtocolsActivity: - activityJob := initProtocolActivityJob(ctx, logger) - err = activityJob.Run(ctx) default: logger.Fatal("Invalid job id", zap.String("job_id", cfg.JobID)) } @@ -173,7 +173,7 @@ func initMigrateSourceTxJob(ctx context.Context, cfg *config.MigrateSourceTxConf return migration.NewMigrationSourceChainTx(db.Database, cfg.PageSize, sdk.ChainID(cfg.ChainID), fromDate, toDate, txTrackerAPIClient, sleepTime, logger) } -func initProtocolStatsJob(ctx context.Context, logger *zap.Logger) *stats.ProtocolsStatsJob { +func initProtocolStatsHourlyJob(ctx context.Context, logger *zap.Logger) *protocols.StatsJob { cfgJob, errCfg := configuration.LoadFromEnv[config.ProtocolsStatsConfiguration](ctx) if errCfg != nil { log.Fatal("error creating config", errCfg) @@ -184,19 +184,28 @@ func initProtocolStatsJob(ctx context.Context, logger *zap.Logger) *stats.Protoc } dbClient := influxdb2.NewClient(cfgJob.InfluxUrl, cfgJob.InfluxToken) dbWriter := dbClient.WriteAPIBlocking(cfgJob.InfluxOrganization, cfgJob.InfluxBucket30Days) - statsFetchers := make([]stats.ClientStats, 0, len(cfgJob.Protocols)) + + protocolRepos := make([]repository.ProtocolRepository, 0, len(cfgJob.Protocols)) for _, c := range cfgJob.Protocols { - cs := stats.NewHttpRestClientStats(c.Name, - c.Url, - logger.With(zap.String("protocol", c.Name), zap.String("url", c.Url)), - &http.Client{}, - ) - statsFetchers = append(statsFetchers, cs) + builder, ok := repository.ProtocolsRepositoryFactory[c.Name] + if !ok { + log.Fatal("error creating protocol stats client. Unknown protocol:", c.Name, errCfg) + } + cs := builder(c.Url, logger.With(zap.String("protocol", c.Name), zap.String("url", c.Url))) + protocolRepos = append(protocolRepos, cs) } - return stats.NewProtocolsStatsJob(dbWriter, logger, cfgJob.StatsVersion, statsFetchers...) + to := time.Now().UTC().Truncate(1 * time.Hour) + from := to.Add(-1 * time.Hour) + return protocols.NewStatsJob(dbWriter, + from, + to, + dbconsts.ProtocolsActivityMeasurementHourly, + dbconsts.ProtocolsStatsMeasurementHourly, + protocolRepos, + logger) } -func initProtocolActivityJob(ctx context.Context, logger *zap.Logger) *activity.ProtocolsActivityJob { +func initProtocolStatsDailyJob(ctx context.Context, logger *zap.Logger) *protocols.StatsJob { cfgJob, errCfg := configuration.LoadFromEnv[config.ProtocolsStatsConfiguration](ctx) if errCfg != nil { log.Fatal("error creating config", errCfg) @@ -206,17 +215,26 @@ func initProtocolActivityJob(ctx context.Context, logger *zap.Logger) *activity. log.Fatal("error unmarshalling protocols config", errUnmarshal) } dbClient := influxdb2.NewClient(cfgJob.InfluxUrl, cfgJob.InfluxToken) - dbWriter := dbClient.WriteAPIBlocking(cfgJob.InfluxOrganization, cfgJob.InfluxBucket30Days) - activityFetchers := make([]activity.ClientActivity, 0, len(cfgJob.Protocols)) + dbWriter := dbClient.WriteAPIBlocking(cfgJob.InfluxOrganization, cfgJob.InfluxBucketInfinite) + + protocolRepos := make([]repository.ProtocolRepository, 0, len(cfgJob.Protocols)) for _, c := range cfgJob.Protocols { - builder, ok := activity.ActivitiesClientsFactory[c.Name] + builder, ok := repository.ProtocolsRepositoryFactory[c.Name] if !ok { - log.Fatal("error creating protocol activity fetcher. Unknown protocol:", c.Name, errCfg) + log.Fatal("error creating protocol stats client. Unknown protocol:", c.Name, errCfg) } - cs := builder(c.Name, c.Url, logger.With(zap.String("protocol", c.Name), zap.String("url", c.Url))) - activityFetchers = append(activityFetchers, cs) + cs := builder(c.Url, logger.With(zap.String("protocol", c.Name), zap.String("url", c.Url))) + protocolRepos = append(protocolRepos, cs) } - return activity.NewProtocolActivityJob(dbWriter, logger, cfgJob.ActivityVersion, activityFetchers...) + to := time.Now().UTC().Truncate(24 * time.Hour) + from := to.Add(-24 * time.Hour) + return protocols.NewStatsJob(dbWriter, + from, + to, + dbconsts.ProtocolsActivityMeasurementDaily, + dbconsts.ProtocolsStatsMeasurementDaily, + protocolRepos, + logger) } func handleExit() { diff --git a/jobs/config/config.go b/jobs/config/config.go index 9e682ea5..a09fe61b 100644 --- a/jobs/config/config.go +++ b/jobs/config/config.go @@ -53,14 +53,13 @@ type MigrateSourceTxConfiguration struct { } type ProtocolsStatsConfiguration struct { - InfluxUrl string `env:"INFLUX_URL"` - InfluxToken string `env:"INFLUX_TOKEN"` - InfluxOrganization string `env:"INFLUX_ORGANIZATION"` - InfluxBucket30Days string `env:"INFLUX_BUCKET_30_DAYS"` - StatsVersion string `env:"STATS_VERSION"` - ActivityVersion string `env:"ACTIVITY_VERSION"` - ProtocolsJson string `env:"PROTOCOLS_JSON"` - Protocols []Protocol `json:"PROTOCOLS"` + InfluxUrl string `env:"INFLUX_URL"` + InfluxToken string `env:"INFLUX_TOKEN"` + InfluxOrganization string `env:"INFLUX_ORGANIZATION"` + InfluxBucket30Days string `env:"INFLUX_BUCKET_30_DAYS"` + InfluxBucketInfinite string `env:"INFLUX_BUCKET_INFINITE"` + ProtocolsJson string `env:"PROTOCOLS_JSON"` + Protocols []Protocol `json:"PROTOCOLS"` } type Protocol struct { diff --git a/jobs/jobs/jobs.go b/jobs/jobs/jobs.go index a72a8564..e18d4978 100644 --- a/jobs/jobs/jobs.go +++ b/jobs/jobs/jobs.go @@ -5,12 +5,12 @@ import "context" // JobIDNotional is the job id for notional job. const ( - JobIDNotional = "JOB_NOTIONAL_USD" - JobIDTransferReport = "JOB_TRANSFER_REPORT" - JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES" - JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX" - JobIDProtocolsStats = "JOB_PROTOCOLS_STATS" - JobIDProtocolsActivity = "JOB_PROTOCOLS_ACTIVITY" + JobIDNotional = "JOB_NOTIONAL_USD" + JobIDTransferReport = "JOB_TRANSFER_REPORT" + JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES" + JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX" + JobIDProtocolsStatsDaily = "JOB_PROTOCOLS_STATS_DAILY" + JobIDProtocolsStatsHourly = "JOB_PROTOCOLS_STATS_HOURLY" ) // Job is the interface for jobs. diff --git a/jobs/jobs/protocols/activity/activity.go b/jobs/jobs/protocols/activity/activity.go deleted file mode 100644 index 151aa762..00000000 --- a/jobs/jobs/protocols/activity/activity.go +++ /dev/null @@ -1,82 +0,0 @@ -package activity - -import ( - "context" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/influxdata/influxdb-client-go/v2/api/write" - "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity/internal/repositories" - "go.uber.org/zap" - "strconv" - "sync" - "time" -) - -// NewProtocolActivityJob creates an instance of the job implementation. -func NewProtocolActivityJob(statsDB api.WriteAPIBlocking, logger *zap.Logger, version string, activityFetchers ...ClientActivity) *ProtocolsActivityJob { - return &ProtocolsActivityJob{ - statsDB: statsDB, - logger: logger.With(zap.String("module", "ProtocolsActivityJob")), - activityFetchers: activityFetchers, - version: version, - } -} - -func (m *ProtocolsActivityJob) Run(ctx context.Context) error { - - clientsQty := len(m.activityFetchers) - wg := sync.WaitGroup{} - wg.Add(clientsQty) - errs := make(chan error, clientsQty) - ts := time.Now().UTC().Truncate(time.Hour) // make minutes and seconds zero, so we only work with date and hour - from := time.Unix(0, 0).UTC() - m.logger.Info("running protocols activity job ", zap.Time("from", from), zap.Time("to", ts)) - for _, cs := range m.activityFetchers { - go func(c ClientActivity) { - defer wg.Done() - activity, err := c.Get(ctx, from, ts) - if err != nil { - errs <- err - return - } - errs <- m.updateActivity(ctx, c.ProtocolName(), m.version, activity, ts) - }(cs) - } - - wg.Wait() - close(errs) - for err := range errs { - if err != nil { - return err - } - } - - return nil -} - -func (m *ProtocolsActivityJob) updateActivity(ctx context.Context, protocol, version string, activity repositories.ProtocolActivity, ts time.Time) error { - - points := make([]*write.Point, 0, len(activity.Activities)) - - for i := range activity.Activities { - point := influxdb2. - NewPointWithMeasurement(dbconsts.ProtocolsActivityMeasurement). - AddTag("protocol", protocol). - AddTag("emitter_chain_id", strconv.FormatUint(activity.Activities[i].EmitterChainID, 10)). - AddTag("destination_chain_id", strconv.FormatUint(activity.Activities[i].DestinationChainID, 10)). - AddTag("version", version). - AddField("total_value_secure", activity.TotalValueSecure). - AddField("total_value_transferred", activity.TotalValueTransferred). - AddField("txs", activity.Activities[i].Txs). - AddField("total_usd", activity.Activities[i].TotalUSD). - SetTime(ts) - points = append(points, point) - } - - err := m.statsDB.WritePoint(ctx, points...) - if err != nil { - m.logger.Error("failed updating protocol Activities in influxdb", zap.Error(err), zap.String("protocol", protocol)) - } - return err -} diff --git a/jobs/jobs/protocols/activity/activity_test.go b/jobs/jobs/protocols/activity/activity_test.go deleted file mode 100644 index b9426194..00000000 --- a/jobs/jobs/protocols/activity/activity_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package activity_test - -import ( - "context" - "errors" - "github.com/stretchr/testify/assert" - "github.com/test-go/testify/mock" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity/internal/repositories" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" - "go.uber.org/zap" - "testing" - "time" -) - -func Test_ProtocolsActivityJob_Succeed(t *testing.T) { - var mockErr error - activityFetcher := &mockActivityFetch{} - act := repositories.ProtocolActivity{ - Activities: []repositories.Activity{ - { - EmitterChainID: 1, - DestinationChainID: 2, - Txs: 150, - TotalUSD: 250000, - }, - }, - } - - activityFetcher.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(act, mockErr) - activityFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) - - job := activity.NewProtocolActivityJob(mockWriterDB, zap.NewNop(), "v1", activityFetcher) - resultErr := job.Run(context.Background()) - assert.Nil(t, resultErr) -} - -func Test_ProtocolsActivityJob_FailFetching(t *testing.T) { - var mockErr error - activityFetcher := &mockActivityFetch{} - activityFetcher.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(repositories.ProtocolActivity{}, errors.New("mocked_error_fetch")) - activityFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) - - job := activity.NewProtocolActivityJob(mockWriterDB, zap.NewNop(), "v1", activityFetcher) - resultErr := job.Run(context.Background()) - assert.NotNil(t, resultErr) - assert.Equal(t, "mocked_error_fetch", resultErr.Error()) -} - -func Test_ProtocolsActivityJob_FailedUpdatingDB(t *testing.T) { - var mockErr error - activityFetcher := &mockActivityFetch{} - activityFetcher.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(repositories.ProtocolActivity{}, mockErr) - activityFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(errors.New("mocked_error_update_db")) - - job := activity.NewProtocolActivityJob(mockWriterDB, zap.NewNop(), "v1", activityFetcher) - resultErr := job.Run(context.Background()) - assert.NotNil(t, resultErr) - assert.Equal(t, "mocked_error_update_db", resultErr.Error()) -} - -type mockActivityFetch struct { - mock.Mock -} - -func (m *mockActivityFetch) Get(ctx context.Context, from, to time.Time) (repositories.ProtocolActivity, error) { - args := m.Called(ctx, from, to) - return args.Get(0).(repositories.ProtocolActivity), args.Error(1) -} - -func (m *mockActivityFetch) ProtocolName() string { - args := m.Called() - return args.String(0) -} diff --git a/jobs/jobs/protocols/activity/internal/repositories/allbridge.go b/jobs/jobs/protocols/activity/internal/repositories/allbridge.go deleted file mode 100644 index 2b9174e9..00000000 --- a/jobs/jobs/protocols/activity/internal/repositories/allbridge.go +++ /dev/null @@ -1,141 +0,0 @@ -package repositories - -import ( - "context" - "encoding/json" - "github.com/google/uuid" - "github.com/pkg/errors" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons" - "go.uber.org/zap" - "io" - "net/http" - "strconv" - "time" -) - -func NewAllBridgeRestClient(name, url string, logger *zap.Logger, httpClient commons.HttpDo) *AllBridgeRestClient { - return &AllBridgeRestClient{ - name: name, - url: url, - logger: logger, - client: httpClient, - } -} - -type AllBridgeRestClient struct { - name string - url string - client commons.HttpDo - logger *zap.Logger -} - -func (d *AllBridgeRestClient) ProtocolName() string { - return d.name -} - -func (d *AllBridgeRestClient) Get(ctx context.Context, from, to time.Time) (ProtocolActivity, error) { - decoratedLogger := d.logger - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil) - if err != nil { - decoratedLogger.Error("failed creating http request for retrieving protocol Activities", - zap.Error(err), - ) - return ProtocolActivity{}, errors.WithStack(err) - } - q := req.URL.Query() - q.Set("from", from.Format(time.RFC3339)) - q.Set("to", to.Format(time.RFC3339)) - req.URL.RawQuery = q.Encode() - - reqId := uuid.New().String() - req.Header.Set("X-Request-ID", reqId) - decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) - - resp, err := d.client.Do(req) - if err != nil { - decoratedLogger.Error("failed retrieving protocol Activities", - zap.Error(err), - ) - return ProtocolActivity{}, errors.WithStack(err) - } - defer resp.Body.Close() - - decoratedLogger = decoratedLogger. - With(zap.String("status_code", http.StatusText(resp.StatusCode))). - With(zap.String("response_headers", commons.ToJson(resp.Header))) - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - decoratedLogger.Error("error retrieving protocol Activities: got an invalid response status code", - zap.String("response_body", string(body)), - ) - return ProtocolActivity{}, errors.Errorf("failed retrieving protocol Activities from url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - decoratedLogger.Error("failed reading response body", zap.Error(err)) - return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol Activities. url:%s - status_code:%d", d.url, resp.StatusCode) - } - - var temp allBridgeActivity - err = json.Unmarshal(body, &temp) - if err != nil { - decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) - return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol Activities. url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) - } - - return temp.toProtocolActivity() -} - -type allBridgeActivity struct { - TotalValueSecured string `json:"total_value_secure"` - TotalValueTransferred string `json:"total_value_transferred"` - Activities []struct { - EmitterChainID uint64 `json:"emitter_chain_id"` - DestinationChainID uint64 `json:"destination_chain_id"` - Txs string `json:"txs"` - TotalUSD string `json:"total_usd"` - } `json:"activity"` -} - -func (m *allBridgeActivity) toProtocolActivity() (ProtocolActivity, error) { - result := ProtocolActivity{} - - totalValueSecured, err := strconv.ParseFloat(m.TotalValueSecured, 64) - if err != nil { - return result, errors.Wrap(err, "failed parsing string TotalValueSecure to float64") - } - result.TotalValueSecure = totalValueSecured - - totalValueTransferred, err := strconv.ParseFloat(m.TotalValueTransferred, 64) - if err != nil { - return result, errors.Wrap(err, "failed parsing string TotalValueTransferred to float64") - } - result.TotalValueTransferred = totalValueTransferred - - for i := range m.Activities { - - act := m.Activities[i] - txs, errTxs := strconv.ParseUint(act.Txs, 10, 64) - if errTxs != nil { - return result, errors.Wrap(errTxs, "failed parsing string txs to uint64") - } - - totalUSD, errTotalUSD := strconv.ParseFloat(act.TotalUSD, 64) - if errTotalUSD != nil { - return result, errors.Wrap(errTxs, "failed parsing string total_usd to float64") - } - - a := Activity{ - EmitterChainID: m.Activities[i].EmitterChainID, - DestinationChainID: m.Activities[i].DestinationChainID, - Txs: txs, - TotalUSD: totalUSD, - } - result.Activities = append(result.Activities, a) - } - - return result, nil -} diff --git a/jobs/jobs/protocols/activity/internal/repositories/repository.go b/jobs/jobs/protocols/activity/internal/repositories/repository.go deleted file mode 100644 index d96e2024..00000000 --- a/jobs/jobs/protocols/activity/internal/repositories/repository.go +++ /dev/null @@ -1,26 +0,0 @@ -package repositories - -import ( - "context" - "time" -) - -type ProtocolActivityRepository interface { - Get(ctx context.Context, from, to time.Time) (ProtocolActivity, error) - ProtocolName() string -} - -type ProtocolActivity struct { - TotalValueSecure float64 `json:"total_value_secure"` - TotalValueTransferred float64 `json:"total_value_transferred"` - Volume float64 `json:"volume"` - TotalMessages uint64 `json:"total_messages"` - Activities []Activity `json:"activity"` -} - -type Activity struct { - EmitterChainID uint64 `json:"emitter_chain_id"` - DestinationChainID uint64 `json:"destination_chain_id"` - Txs uint64 `json:"txs"` - TotalUSD float64 `json:"total_usd"` -} diff --git a/jobs/jobs/protocols/activity/types.go b/jobs/jobs/protocols/activity/types.go deleted file mode 100644 index 9cafa6a5..00000000 --- a/jobs/jobs/protocols/activity/types.go +++ /dev/null @@ -1,41 +0,0 @@ -package activity - -import ( - "context" - "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity/internal/repositories" - "go.uber.org/zap" - "net/http" - "time" -) - -// Protocols -const ( - MayanProtocol = "mayan" - AllBridgeProtocol = "allbridge" -) - -type ProtocolsActivityJob struct { - statsDB api.WriteAPIBlocking - logger *zap.Logger - activityFetchers []ClientActivity - version string -} - -// ClientActivity Abstraction for fetching protocol Activity since each client may have different implementation details. -type ClientActivity interface { - Get(ctx context.Context, from, to time.Time) (repositories.ProtocolActivity, error) - ProtocolName() string -} - -// ActivitiesClientsFactory RestClient Factory to create the right client for each protocol. -var ActivitiesClientsFactory = map[string]func(name, url string, logger *zap.Logger) ClientActivity{ - - MayanProtocol: func(name, url string, logger *zap.Logger) ClientActivity { - return repositories.NewMayanRestClient(name, url, logger, &http.Client{}) - }, - - AllBridgeProtocol: func(name, url string, logger *zap.Logger) ClientActivity { - return repositories.NewAllBridgeRestClient(name, url, logger, &http.Client{}) - }, -} diff --git a/jobs/jobs/protocols/internal/commons/common.go b/jobs/jobs/protocols/internal/commons/common.go index e8178a6b..83425036 100644 --- a/jobs/jobs/protocols/internal/commons/common.go +++ b/jobs/jobs/protocols/internal/commons/common.go @@ -5,6 +5,11 @@ import ( "net/http" ) +const ( + MayanProtocol = "mayan" + AllBridgeProtocol = "allbridge" +) + func ToJson(headers http.Header) string { bytes, _ := json.Marshal(headers) return string(bytes) diff --git a/jobs/jobs/protocols/repository/allbridge.go b/jobs/jobs/protocols/repository/allbridge.go new file mode 100644 index 00000000..fb62e5b5 --- /dev/null +++ b/jobs/jobs/protocols/repository/allbridge.go @@ -0,0 +1,234 @@ +package repository + +import ( + "context" + "encoding/json" + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons" + "go.uber.org/zap" + "io" + "math" + "net/http" + "strconv" + "time" +) + +func NewAllBridgeRestClient(baseURL string, logger *zap.Logger, httpClient commons.HttpDo) *AllBridgeRestClient { + return &AllBridgeRestClient{ + baseURL: baseURL, + logger: logger, + client: httpClient, + } +} + +type AllBridgeRestClient struct { + baseURL string + client commons.HttpDo + logger *zap.Logger +} + +type allBridgeActivity struct { + TotalValueSecured string `json:"total_value_secure"` + TotalValueTransferred string `json:"total_value_transferred"` + Activities []struct { + EmitterChainID uint64 `json:"emitter_chain_id"` + DestinationChainID uint64 `json:"destination_chain_id"` + Txs string `json:"txs"` + TotalUSD string `json:"total_usd"` + } `json:"activity"` +} + +func (d *AllBridgeRestClient) ProtocolName() string { + return commons.AllBridgeProtocol +} + +func (d *AllBridgeRestClient) GetActivity(ctx context.Context, from, to time.Time) (ProtocolActivity, error) { + decoratedLogger := d.logger + + url := d.baseURL + "/wormhole/activity" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + decoratedLogger.Error("failed creating http request for retrieving protocol Activities", + zap.Error(err), + ) + return ProtocolActivity{}, errors.WithStack(err) + } + q := req.URL.Query() + q.Set("from", from.Format(time.RFC3339)) + q.Set("to", to.Format(time.RFC3339)) + req.URL.RawQuery = q.Encode() + + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) + decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) + + resp, err := d.client.Do(req) + if err != nil { + decoratedLogger.Error("failed retrieving protocol Activities", + zap.Error(err), + ) + return ProtocolActivity{}, errors.WithStack(err) + } + defer resp.Body.Close() + + decoratedLogger = decoratedLogger. + With(zap.String("status_code", http.StatusText(resp.StatusCode))). + With(zap.String("response_headers", commons.ToJson(resp.Header))) + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + decoratedLogger.Error("error retrieving protocol Activities: got an invalid response status code", + zap.String("response_body", string(body)), zap.Int("status_code", resp.StatusCode), + ) + return ProtocolActivity{}, errors.Errorf("failed retrieving protocol Activities from baseURL:%s - status_code:%d - response_body:%s", url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err)) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol Activities. baseURL:%s - status_code:%d", d.baseURL, resp.StatusCode) + } + + var temp allBridgeActivity + err = json.Unmarshal(body, &temp) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol Activities. baseURL:%s - status_code:%d - response_body:%s", d.baseURL, resp.StatusCode, string(body)) + } + + return temp.toProtocolActivity() +} + +func (d *AllBridgeRestClient) GetStats(ctx context.Context) (Stats, error) { + + decoratedLogger := d.logger + + url := d.baseURL + "/wormhole/stats" + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + decoratedLogger.Error("failed creating http request for retrieving protocol stats", zap.Error(err)) + return Stats{}, errors.WithStack(err) + } + + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) + decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) + + resp, err := d.client.Do(req) + if err != nil { + decoratedLogger.Error("failed retrieving protocol stats", zap.Error(err)) + return Stats{}, errors.WithStack(err) + } + defer resp.Body.Close() + + decoratedLogger = decoratedLogger. + With(zap.String("status_code", http.StatusText(resp.StatusCode))). + With(zap.String("response_headers", commons.ToJson(resp.Header))) + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + decoratedLogger.Error("error retrieving protocol stats: got an invalid response status code", zap.String("response_body", string(body)), zap.Int("status_code", resp.StatusCode)) + return Stats{}, errors.Errorf("failed retrieving protocol stats from baseURL:%s - status_code:%d - response_body:%s", d.baseURL, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err)) + return Stats{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol stats. baseURL:%s - status_code:%d", d.baseURL, resp.StatusCode) + } + + var allbridgeStats allBridgeStatsResponseDTO + err = json.Unmarshal(body, &allbridgeStats) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) + return Stats{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol stats. baseURL:%s - status_code:%d - response_body:%s", d.baseURL, resp.StatusCode, string(body)) + } + + return d.toStats(allbridgeStats) +} + +func (m *allBridgeActivity) toProtocolActivity() (ProtocolActivity, error) { + result := ProtocolActivity{} + + totalValueSecured, err := strconv.ParseFloat(m.TotalValueSecured, 64) + if err != nil || math.IsNaN(totalValueSecured) { + return result, errors.Wrap(err, "failed parsing string TotalValueSecure to float64") + } + result.TotalValueSecure = totalValueSecured + + totalValueTransferred, err := strconv.ParseFloat(m.TotalValueTransferred, 64) + if err != nil || math.IsNaN(totalValueTransferred) { + return result, errors.Wrap(err, "failed parsing string TotalValueTransferred to float64") + } + result.TotalValueTransferred = totalValueTransferred + + for i := range m.Activities { + + act := m.Activities[i] + txs, errTxs := strconv.ParseUint(act.Txs, 10, 64) + if errTxs != nil { + return result, errors.Wrap(errTxs, "failed parsing string txs to uint64") + } + + totalUSD, errTotalUSD := strconv.ParseFloat(act.TotalUSD, 64) + if errTotalUSD != nil { + return result, errors.Wrap(errTxs, "failed parsing string total_usd to float64") + } + + a := Activity{ + EmitterChainID: m.Activities[i].EmitterChainID, + DestinationChainID: m.Activities[i].DestinationChainID, + Txs: txs, + TotalUSD: totalUSD, + } + result.Activities = append(result.Activities, a) + } + + return result, nil +} + +type allBridgeStatsResponseDTO struct { + TotalValueLocked string `json:"total_value_locked"` + TotalMessages string `json:"total_messages"` + Volume string `json:"volume"` +} + +func (d *AllBridgeRestClient) toStats(t allBridgeStatsResponseDTO) (Stats, error) { + + convertAndLoad := func(val string, target *float64) error { + if len(val) == 0 { + *target = 0 + return nil + } + floatVal, err := strconv.ParseFloat(val, 64) + if err != nil { + d.logger.Error("failed converting value", zap.Error(err), zap.String("value", val)) + return err + } + *target = floatVal + return nil + } + + var stats Stats + + err := convertAndLoad(t.TotalValueLocked, &stats.TotalValueLocked) + if err != nil { + return stats, err + } + + err = convertAndLoad(t.Volume, &stats.Volume) + if err != nil { + return stats, err + } + + totalMsg, err := strconv.ParseUint(t.TotalMessages, 10, 64) + if err != nil { + return stats, err + } + stats.TotalMessages = totalMsg + + return stats, nil + +} diff --git a/jobs/jobs/protocols/activity/internal/repositories/allbridge_test.go b/jobs/jobs/protocols/repository/allbridge_test.go similarity index 66% rename from jobs/jobs/protocols/activity/internal/repositories/allbridge_test.go rename to jobs/jobs/protocols/repository/allbridge_test.go index 4f650805..3e1e434e 100644 --- a/jobs/jobs/protocols/activity/internal/repositories/allbridge_test.go +++ b/jobs/jobs/protocols/repository/allbridge_test.go @@ -1,4 +1,4 @@ -package repositories +package repository_test import ( "bytes" @@ -6,6 +6,7 @@ import ( "errors" "github.com/stretchr/testify/assert" "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository" "go.uber.org/zap" "io" "net/http" @@ -15,77 +16,77 @@ import ( func Test_AllbridgeRestClientActivity_FailRequestCreation(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return nil, nil })) - _, err := a.Get(nil, time.Now(), time.Now()) // passing ctx nil to force request creation error + _, err := a.GetActivity(nil, time.Now(), time.Now()) // passing ctx nil to force request creation error assert.NotNil(t, err) } func Test_AllbridgeRestClientActivity_FailedRequestExecution(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return nil, errors.New("mocked_http_client_do") })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) assert.Equal(t, "mocked_http_client_do", err.Error()) } func Test_AllbridgeRestClientActivity_Status500(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusInternalServerError, Body: io.NopCloser(bytes.NewBufferString("response_body_test")), }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed retrieving protocol Activities from url:localhost - status_code:500 - response_body:response_body_test", err.Error()) + assert.Equal(t, "failed retrieving protocol Activities from baseURL:localhost/wormhole/activity - status_code:500 - response_body:response_body_test", err.Error()) } func Test_AllbridgeRestClientActivity_Status200_FailedReadBody(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: &mocks.MockFailReadCloser{}, }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed reading response body from protocol Activities. url:localhost - status_code:200: mocked_fail_read", err.Error()) + assert.Equal(t, "failed reading response body from protocol Activities. baseURL:localhost - status_code:200: mocked_fail_read", err.Error()) } func Test_AllbridgeRestClientActivity_Status200_FailedParsing(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewBufferString("this should be a json")), }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed unmarshalling response body from protocol Activities. url:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) + assert.Equal(t, "failed unmarshalling response body from protocol Activities. baseURL:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) } func Test_AllbridgeRestClientActivity_Status200_Succeed(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewBufferString("{\"activity\":[{\"emitter_chain_id\":5,\"destination_chain_id\":1,\"txs\":\"1827\",\"total_usd\":\"445743.185719500000\"}],\"total_value_secure\":\"0\",\"total_value_transferred\":\"5734947.136079277\"}")), }, nil })) - resp, err := a.Get(context.Background(), time.Now(), time.Now()) + resp, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.Nil(t, err) assert.Equal(t, float64(0), resp.TotalValueSecure) assert.Equal(t, 5734947.136079277, resp.TotalValueTransferred) diff --git a/jobs/jobs/protocols/activity/internal/repositories/mayan.go b/jobs/jobs/protocols/repository/mayan.go similarity index 52% rename from jobs/jobs/protocols/activity/internal/repositories/mayan.go rename to jobs/jobs/protocols/repository/mayan.go index 74f8032a..3a547acb 100644 --- a/jobs/jobs/protocols/activity/internal/repositories/mayan.go +++ b/jobs/jobs/protocols/repository/mayan.go @@ -1,4 +1,4 @@ -package repositories +package repository import ( "context" @@ -13,32 +13,27 @@ import ( "time" ) -func NewMayanRestClient(name, url string, logger *zap.Logger, httpClient commons.HttpDo) *MayanRestClient { +func NewMayanRestClient(baseURL string, logger *zap.Logger, httpClient commons.HttpDo) *MayanRestClient { return &MayanRestClient{ - name: name, - url: url, - logger: logger, - client: httpClient, + baseURL: baseURL, + logger: logger, + client: httpClient, } } type MayanRestClient struct { - name string - url string - client commons.HttpDo - logger *zap.Logger + baseURL string + client commons.HttpDo + logger *zap.Logger } -func (d *MayanRestClient) ProtocolName() string { - return d.name -} - -func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (ProtocolActivity, error) { +func (d *MayanRestClient) GetActivity(ctx context.Context, from, to time.Time) (ProtocolActivity, error) { decoratedLogger := d.logger - req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil) + url := d.baseURL + "/v3/stats/wh/activity" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { - decoratedLogger.Error("failed creating http request for retrieving protocol Activities", + decoratedLogger.Error("failed creating http request for retrieving protocol activities", zap.Error(err), ) return ProtocolActivity{}, errors.WithStack(err) @@ -54,7 +49,7 @@ func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (Protocol resp, err := d.client.Do(req) if err != nil { - decoratedLogger.Error("failed retrieving protocol Activities", + decoratedLogger.Error("failed retrieving protocol activities", zap.Error(err), ) return ProtocolActivity{}, errors.WithStack(err) @@ -67,16 +62,16 @@ func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (Protocol if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - decoratedLogger.Error("error retrieving protocol Activities: got an invalid response status code", + decoratedLogger.Error("error retrieving protocol activities: got an invalid response status code", zap.String("response_body", string(body)), ) - return ProtocolActivity{}, errors.Errorf("failed retrieving protocol Activities from url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) + return ProtocolActivity{}, errors.Errorf("failed retrieving protocol activities from url:%s - status_code:%d - response_body:%s", url, resp.StatusCode, string(body)) } body, err := io.ReadAll(resp.Body) if err != nil { decoratedLogger.Error("failed reading response body", zap.Error(err)) - return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol Activities. url:%s - status_code:%d", d.url, resp.StatusCode) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol activities. url:%s - status_code:%d", url, resp.StatusCode) } type mayanActivity struct { @@ -93,7 +88,7 @@ func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (Protocol err = json.Unmarshal(body, &mayanResp) if err != nil { decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) - return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol Activities. url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol activities. url:%s - status_code:%d - response_body:%s", url, resp.StatusCode, string(body)) } result := ProtocolActivity{ @@ -126,3 +121,52 @@ func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (Protocol return result, nil } + +func (d *MayanRestClient) GetStats(ctx context.Context) (Stats, error) { + decoratedLogger := d.logger + url := d.baseURL + "/v3/stats/wh/stats" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + decoratedLogger.Error("failed creating http request for retrieving protocol stats", zap.Error(err)) + return Stats{}, errors.WithStack(err) + } + + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) + decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) + + resp, err := d.client.Do(req) + if err != nil { + decoratedLogger.Error("failed retrieving protocol stats", zap.Error(err)) + return Stats{}, errors.WithStack(err) + } + defer resp.Body.Close() + + decoratedLogger = decoratedLogger. + With(zap.String("status_code", http.StatusText(resp.StatusCode))). + With(zap.String("response_headers", commons.ToJson(resp.Header))) + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + decoratedLogger.Error("error retrieving client stats: got an invalid response status code", zap.String("response_body", string(body))) + return Stats{}, errors.Errorf("failed retrieving protocol stats from url:%s - status_code:%d - response_body:%s", url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err)) + return Stats{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol stats. url:%s - status_code:%d", url, resp.StatusCode) + } + var stats Stats + err = json.Unmarshal(body, &stats) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) + return Stats{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol stats. url:%s - status_code:%d - response_body:%s", url, resp.StatusCode, string(body)) + } + + return stats, nil +} + +func (d *MayanRestClient) ProtocolName() string { + return commons.MayanProtocol +} diff --git a/jobs/jobs/protocols/activity/internal/repositories/mayan_test.go b/jobs/jobs/protocols/repository/mayan_test.go similarity index 65% rename from jobs/jobs/protocols/activity/internal/repositories/mayan_test.go rename to jobs/jobs/protocols/repository/mayan_test.go index 2c4be316..ed54c165 100644 --- a/jobs/jobs/protocols/activity/internal/repositories/mayan_test.go +++ b/jobs/jobs/protocols/repository/mayan_test.go @@ -1,4 +1,4 @@ -package repositories +package repository_test import ( "bytes" @@ -6,6 +6,7 @@ import ( "errors" "github.com/stretchr/testify/assert" "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository" "go.uber.org/zap" "io" "net/http" @@ -15,77 +16,77 @@ import ( func Test_HttpRestClientActivity_FailRequestCreation(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return nil, nil })) - _, err := a.Get(nil, time.Now(), time.Now()) // passing ctx nil to force request creation error + _, err := a.GetActivity(nil, time.Now(), time.Now()) // passing ctx nil to force request creation error assert.NotNil(t, err) } func Test_HttpRestClientActivity_FailedRequestExecution(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return nil, errors.New("mocked_http_client_do") })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) assert.Equal(t, "mocked_http_client_do", err.Error()) } func Test_HttpRestClientActivity_Status500(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusInternalServerError, Body: io.NopCloser(bytes.NewBufferString("response_body_test")), }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed retrieving protocol Activities from url:localhost - status_code:500 - response_body:response_body_test", err.Error()) + assert.Equal(t, "failed retrieving protocol activities from url:localhost/v3/stats/wh/activity - status_code:500 - response_body:response_body_test", err.Error()) } func Test_HttpRestClientActivity_Status200_FailedReadBody(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: &mocks.MockFailReadCloser{}, }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed reading response body from protocol Activities. url:localhost - status_code:200: mocked_fail_read", err.Error()) + assert.Equal(t, "failed reading response body from protocol activities. url:localhost/v3/stats/wh/activity - status_code:200: mocked_fail_read", err.Error()) } func Test_HttpRestClientActivity_Status200_FailedParsing(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewBufferString("this should be a json")), }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed unmarshalling response body from protocol Activities. url:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) + assert.Equal(t, "failed unmarshalling response body from protocol activities. url:localhost/v3/stats/wh/activity - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) } func Test_HttpRestClientActivity_Status200_Succeed(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewBufferString("{\"total_value_secure\":1640898.7106282723,\"total_value_transferred\":2600395.040031102,\"total_messages\":2225,\"activity\":[{\"emmiter_chain_id\":\"1\",\"destination_chain_id\":\"2\",\"txs\":88,\"total_usd\":648500.9762709612}],\"volume\":2761848.9678057004}")), }, nil })) - resp, err := a.Get(context.Background(), time.Now(), time.Now()) + resp, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.Nil(t, err) assert.Equal(t, 1640898.7106282723, resp.TotalValueSecure) assert.Equal(t, 2600395.040031102, resp.TotalValueTransferred) diff --git a/jobs/jobs/protocols/repository/types.go b/jobs/jobs/protocols/repository/types.go new file mode 100644 index 00000000..49e23621 --- /dev/null +++ b/jobs/jobs/protocols/repository/types.go @@ -0,0 +1,48 @@ +package repository + +import ( + "context" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons" + "go.uber.org/zap" + "net/http" + "time" +) + +type ProtocolRepository interface { + GetActivity(ctx context.Context, from, to time.Time) (ProtocolActivity, error) + GetStats(ctx context.Context) (Stats, error) + ProtocolName() string +} + +type ProtocolActivity struct { + TotalValueSecure float64 `json:"total_value_secure"` + TotalValueTransferred float64 `json:"total_value_transferred"` + Volume float64 `json:"volume"` + TotalMessages uint64 `json:"total_messages"` + Activities []Activity `json:"activity"` +} + +type Stats struct { + TotalValueLocked float64 `json:"total_value_locked"` + TotalMessages uint64 `json:"total_messages"` + Volume float64 `json:"volume"` +} + +type Activity struct { + EmitterChainID uint64 `json:"emitter_chain_id"` + DestinationChainID uint64 `json:"destination_chain_id"` + Txs uint64 `json:"txs"` + TotalUSD float64 `json:"total_usd"` +} + +// ProtocolsRepositoryFactory RestClient Factory to create the right client for each protocol. +var ProtocolsRepositoryFactory = map[string]func(url string, logger *zap.Logger) ProtocolRepository{ + + commons.MayanProtocol: func(baseURL string, logger *zap.Logger) ProtocolRepository { + return NewMayanRestClient(baseURL, logger, &http.Client{}) + }, + + commons.AllBridgeProtocol: func(baseURL string, logger *zap.Logger) ProtocolRepository { + return NewAllBridgeRestClient(baseURL, logger, &http.Client{}) + }, +} diff --git a/jobs/jobs/protocols/stats.go b/jobs/jobs/protocols/stats.go new file mode 100644 index 00000000..7bffdb98 --- /dev/null +++ b/jobs/jobs/protocols/stats.go @@ -0,0 +1,136 @@ +package protocols + +import ( + "context" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository" + "go.uber.org/zap" + "sync" + "time" +) + +type StatsJob struct { + writerDB api.WriteAPIBlocking + logger *zap.Logger + repositories []repository.ProtocolRepository + from time.Time + to time.Time + destinationMeasurement string + statsMeasurement string +} + +// NewStatsJob creates an instance of the job implementation. +func NewStatsJob(writerDB api.WriteAPIBlocking, from, to time.Time, activityMeasurement, statsMeasurement string, repositories []repository.ProtocolRepository, logger *zap.Logger) *StatsJob { + return &StatsJob{ + writerDB: writerDB, + logger: logger.With(zap.String("module", "ProtocolsStatsJob")), + repositories: repositories, + from: from, + to: to, + destinationMeasurement: activityMeasurement, + statsMeasurement: statsMeasurement, + } +} + +func (s *StatsJob) Run(ctx context.Context) error { + + wg := sync.WaitGroup{} + wg.Add(len(s.repositories)) + + s.logger.Info("running protocols stats job", zap.Time("from", s.from), zap.Time("to", s.to)) + + for _, repo := range s.repositories { + go s.processProtocol(ctx, repo, &wg) + } + wg.Wait() + return nil +} + +func (s *StatsJob) processProtocol(ctx context.Context, protocolRepo repository.ProtocolRepository, wg *sync.WaitGroup) { + defer wg.Done() + + var stats repository.Stats + var errStats error + wgStats := sync.WaitGroup{} + wgStats.Add(1) + go func() { + defer wgStats.Done() + stats, errStats = protocolRepo.GetStats(ctx) + }() + + activity, errAct := protocolRepo.GetActivity(ctx, s.from, s.to) + if errAct != nil { + s.logger.Error("failed to get protocol activity", zap.Error(errAct), zap.String("protocol", protocolRepo.ProtocolName()), zap.Time("from", s.from), zap.Time("to", s.to)) + return + } + + wgStats.Wait() + if errStats != nil { + s.logger.Error("failed to get protocol stats", zap.Error(errStats), zap.String("protocol", protocolRepo.ProtocolName())) + return + } + + data := protocolData{ + Stats: stats, + Activity: activity, + } + + errAct = s.updateActivity(ctx, protocolRepo.ProtocolName(), data.Activity, s.from) + if errAct != nil { + s.logger.Error("failed updating protocol activities in influxdb", zap.Error(errAct), zap.String("protocol", protocolRepo.ProtocolName())) + } + + errStats = s.updateStats(ctx, protocolRepo.ProtocolName(), data.Stats, s.to) + if errStats != nil { + s.logger.Error("failed updating protocol stats in influxdb", zap.Error(errStats), zap.String("protocol", protocolRepo.ProtocolName())) + } + +} + +type protocolData struct { + Stats repository.Stats + Activity repository.ProtocolActivity +} + +func (s *StatsJob) updateActivity(ctx context.Context, protocol string, data repository.ProtocolActivity, ts time.Time) error { + + txs := uint64(0) + totalUsd := float64(0) + for _, act := range data.Activities { + txs += act.Txs + totalUsd += act.TotalUSD + } + + point := influxdb2.NewPointWithMeasurement(s.destinationMeasurement). + AddTag("protocol", protocol). + AddField("total_value_secure", data.TotalValueSecure). + AddField("total_value_transferred", data.TotalValueTransferred). + AddField("volume", data.Volume). + AddField("txs", txs). + AddField("total_usd", totalUsd). + SetTime(ts) + + err := s.writerDB.WritePoint(ctx, point) + if err != nil { + s.logger.Error("failed updating protocol Activities in influxdb", zap.Error(err), zap.String("protocol", protocol)) + } + return err +} + +func (s *StatsJob) updateStats(ctx context.Context, protocol string, data repository.Stats, ts time.Time) error { + + point := influxdb2. + NewPointWithMeasurement(s.statsMeasurement). + AddTag("protocol", protocol). + AddField("total_messages", data.TotalMessages). + AddField("total_value_locked", data.TotalValueLocked). + AddField("volume", data.Volume). + SetTime(ts) + + err := s.writerDB.WritePoint(ctx, point) + if err != nil { + s.logger.Error("failed updating protocol stats in influxdb", zap.Error(err), zap.String("protocol", protocol)) + } + return err +} diff --git a/jobs/jobs/protocols/stats/stats.go b/jobs/jobs/protocols/stats/stats.go deleted file mode 100644 index d29e1a46..00000000 --- a/jobs/jobs/protocols/stats/stats.go +++ /dev/null @@ -1,227 +0,0 @@ -package stats - -import ( - "context" - "encoding/json" - "github.com/google/uuid" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/influxdata/influxdb-client-go/v2/api/write" - "github.com/pkg/errors" - "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" - "go.uber.org/zap" - "io" - "net/http" - "strconv" - "sync" - "time" -) - -type ProtocolsStatsJob struct { - statsDB api.WriteAPIBlocking - logger *zap.Logger - statsClientsFetchers []ClientStats - version string -} - -// ClientStats Abstraction for fetching stats since each protocol may have different implementation details. -type ClientStats interface { - Get(ctx context.Context) (Stats, error) - ProtocolName() string -} - -type protocolStats struct { - Stats - Name string -} - -type Stats struct { - TotalValueLocked float64 - TotalMessages uint64 -} - -// NewProtocolsStatsJob creates an instance of the job implementation. -func NewProtocolsStatsJob(statsDB api.WriteAPIBlocking, logger *zap.Logger, version string, statsFetchers ...ClientStats) *ProtocolsStatsJob { - return &ProtocolsStatsJob{ - statsDB: statsDB, - logger: logger.With(zap.String("module", "ProtocolsStatsJob")), - statsClientsFetchers: statsFetchers, - version: version, - } -} - -func (s *ProtocolsStatsJob) Run(ctx context.Context) error { - - clientsQty := len(s.statsClientsFetchers) - wg := sync.WaitGroup{} - wg.Add(clientsQty) - stats := make(chan protocolStats, clientsQty) - var anyError error - - for _, cs := range s.statsClientsFetchers { - go func(c ClientStats) { - defer wg.Done() - st, err := c.Get(ctx) - if err != nil { - anyError = err - return - } - stats <- protocolStats{st, c.ProtocolName()} - }(cs) - } - - wg.Wait() - close(stats) - - err := s.updateStats(ctx, stats) - if err != nil { - anyError = err - } - - return anyError -} - -func (s *ProtocolsStatsJob) updateStats(ctx context.Context, stats <-chan protocolStats) error { - - ts := time.Now().UTC().Truncate(time.Hour) // make minutes and seconds zero, so we only work with date and hour - points := make([]*write.Point, 0, len(stats)) - - for st := range stats { - point := influxdb2. - NewPointWithMeasurement(dbconsts.ProtocolsStatsMeasurement). - AddTag("protocol", st.Name). - AddTag("version", s.version). - AddField("total_messages", st.TotalMessages). - AddField("total_value_locked", st.TotalValueLocked). - SetTime(ts) - - points = append(points, point) - } - - err := s.statsDB.WritePoint(ctx, points...) - if err != nil { - s.logger.Error("failed updating protocol stats in influxdb", zap.Error(err)) - } - return err -} - -// Default implementation of ClientStats interface. Encapsulate the url and http.client for calling a specific external service to retrieve stats -type httpRestClientStats struct { - name string - url string - client httpDo - logger *zap.Logger -} - -type httpDo interface { - Do(req *http.Request) (*http.Response, error) -} - -func NewHttpRestClientStats(name, url string, logger *zap.Logger, httpClient httpDo) ClientStats { - return &httpRestClientStats{ - name: name, - url: url, - logger: logger, - client: httpClient, - } -} - -func (d *httpRestClientStats) ProtocolName() string { - return d.name -} - -func (d *httpRestClientStats) Get(ctx context.Context) (Stats, error) { - - decoratedLogger := d.logger - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil) - if err != nil { - decoratedLogger.Error("failed creating http request for retrieving client stats", - zap.Error(err), - ) - return Stats{}, errors.WithStack(err) - } - - reqId := uuid.New().String() - req.Header.Set("X-Request-ID", reqId) - decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) - - resp, err := d.client.Do(req) - if err != nil { - decoratedLogger.Error("failed retrieving client stats", - zap.Error(err), - ) - return Stats{}, errors.WithStack(err) - } - defer resp.Body.Close() - - decoratedLogger = decoratedLogger. - With(zap.String("status_code", http.StatusText(resp.StatusCode))). - With(zap.String("response_headers", toJson(resp.Header))) - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - decoratedLogger.Error("error retrieving client stats: got an invalid response status code", - zap.String("response_body", string(body)), - ) - return Stats{}, errors.Errorf("failed retrieving client stats from url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - decoratedLogger.Error("failed reading response body", zap.Error(err)) - return Stats{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from client stats. url:%s - status_code:%d", d.url, resp.StatusCode) - } - var stats Stats - err = json.Unmarshal(body, &stats) - if err != nil { - decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) - return Stats{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from client stats. url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) - } - - return stats, nil - -} - -func toJson(headers http.Header) string { - bytes, _ := json.Marshal(headers) - return string(bytes) -} - -func (rd *Stats) UnmarshalJSON(data []byte) error { - - temp := struct { - TotalValueLocked json.RawMessage `json:"total_value_locked"` - TotalMessages json.RawMessage `json:"total_messages"` - }{} - - if err := json.Unmarshal(data, &temp); err != nil { - return err - } - - if err := parseJSONNumber(temp.TotalValueLocked, &rd.TotalValueLocked); err != nil { - return err - } - - var totalMsg float64 - if err := parseJSONNumber(temp.TotalMessages, &totalMsg); err != nil { - return err - } - - rd.TotalMessages = uint64(totalMsg) - return nil -} - -// parseJSONNumber helps to support both string and numeric JSON values since different protocols return different types for the same fields. -func parseJSONNumber(raw json.RawMessage, dest *float64) error { - var strVal string - if err := json.Unmarshal(raw, &strVal); err == nil { - val, err1 := strconv.ParseFloat(strVal, 64) - if err1 != nil { - return err1 - } - *dest = val - return nil - } - return json.Unmarshal(raw, dest) -} diff --git a/jobs/jobs/protocols/stats/stats_test.go b/jobs/jobs/protocols/stats/stats_test.go deleted file mode 100644 index e9c91c36..00000000 --- a/jobs/jobs/protocols/stats/stats_test.go +++ /dev/null @@ -1,165 +0,0 @@ -package stats_test - -import ( - "bytes" - "context" - "errors" - "github.com/stretchr/testify/assert" - "github.com/test-go/testify/mock" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/stats" - "go.uber.org/zap" - "io" - "net/http" - "testing" -) - -func Test_ProtocolsStatsJob_Succeed(t *testing.T) { - var mockErr error - statsFetcher := &mockStatsFetch{} - statsFetcher.On("Get", mock.Anything).Return(stats.Stats{}, mockErr) - statsFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) - - job := stats.NewProtocolsStatsJob(mockWriterDB, zap.NewNop(), "v1", statsFetcher) - resultErr := job.Run(context.Background()) - assert.Nil(t, resultErr) -} - -func Test_ProtocolsStatsJob_FailFetching(t *testing.T) { - var mockErr error - statsFetcher := &mockStatsFetch{} - statsFetcher.On("Get", mock.Anything).Return(stats.Stats{}, errors.New("mocked_error_fetch")) - statsFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) - - job := stats.NewProtocolsStatsJob(mockWriterDB, zap.NewNop(), "v1", statsFetcher) - resultErr := job.Run(context.Background()) - assert.NotNil(t, resultErr) - assert.Equal(t, "mocked_error_fetch", resultErr.Error()) -} - -func Test_ProtocolsStatsJob_FailedUpdatingDB(t *testing.T) { - var mockErr error - statsFetcher := &mockStatsFetch{} - statsFetcher.On("Get", mock.Anything).Return(stats.Stats{}, mockErr) - statsFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(errors.New("mocked_error_update_db")) - - job := stats.NewProtocolsStatsJob(mockWriterDB, zap.NewNop(), "v1", statsFetcher) - resultErr := job.Run(context.Background()) - assert.NotNil(t, resultErr) - assert.Equal(t, "mocked_error_update_db", resultErr.Error()) -} - -func Test_HttpRestClientStats_FailRequestCreation(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return nil, nil - })) - _, err := a.Get(nil) // passing ctx nil to force request creation error - assert.NotNil(t, err) -} - -func Test_HttpRestClientStats_FailedRequestExecution(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return nil, errors.New("mocked_http_client_do") - })) - _, err := a.Get(context.Background()) - assert.NotNil(t, err) - assert.Equal(t, "mocked_http_client_do", err.Error()) -} - -func Test_HttpRestClientStats_Status500(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusInternalServerError, - Body: io.NopCloser(bytes.NewBufferString("response_body_test")), - }, nil - })) - _, err := a.Get(context.Background()) - assert.NotNil(t, err) - assert.Equal(t, "failed retrieving client stats from url:localhost - status_code:500 - response_body:response_body_test", err.Error()) -} - -func Test_HttpRestClientStats_Status200_FailedReadBody(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Body: &mockFailReadCloser{}, - }, nil - })) - _, err := a.Get(context.Background()) - assert.NotNil(t, err) - assert.Equal(t, "failed reading response body from client stats. url:localhost - status_code:200: mocked_fail_read", err.Error()) -} - -func Test_HttpRestClientStats_Status200_FailedParsing(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewBufferString("this should be a json")), - }, nil - })) - _, err := a.Get(context.Background()) - assert.NotNil(t, err) - assert.Equal(t, "failed unmarshalling response body from client stats. url:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) -} - -func Test_HttpRestClientStats_Status200_Succeed(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewBufferString("{\"total_value_locked\":\"123\",\"total_messages\":\"456\"}")), - }, nil - })) - resp, err := a.Get(context.Background()) - assert.Nil(t, err) - assert.Equal(t, float64(123), resp.TotalValueLocked) - assert.Equal(t, uint64(456), resp.TotalMessages) -} - -type mockStatsFetch struct { - mock.Mock -} - -func (m *mockStatsFetch) Get(ctx context.Context) (stats.Stats, error) { - args := m.Called(ctx) - return args.Get(0).(stats.Stats), args.Error(1) -} - -func (m *mockStatsFetch) ProtocolName() string { - args := m.Called() - return args.String(0) -} - -type mockHttpClient func(req *http.Request) (*http.Response, error) - -func (m mockHttpClient) Do(req *http.Request) (*http.Response, error) { - return m(req) -} - -type mockFailReadCloser struct { -} - -func (m *mockFailReadCloser) Read(p []byte) (n int, err error) { - return 0, errors.New("mocked_fail_read") -} - -func (m *mockFailReadCloser) Close() error { - return nil -} diff --git a/jobs/jobs/protocols/stats_test.go b/jobs/jobs/protocols/stats_test.go new file mode 100644 index 00000000..ff458fc6 --- /dev/null +++ b/jobs/jobs/protocols/stats_test.go @@ -0,0 +1,184 @@ +package protocols_test + +import ( + "context" + "errors" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api/write" + "github.com/stretchr/testify/assert" + "github.com/test-go/testify/mock" + "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository" + "go.uber.org/zap" + "testing" + "time" +) + +func Test_ProtocolsStatsJob_Success(t *testing.T) { + + ctx := context.Background() + from, _ := time.Parse(time.RFC3339, "2024-02-01T00:00:00Z") + to, _ := time.Parse(time.RFC3339, "2024-02-02T00:00:00Z") + + mr := &mockProtocolRepo{} + activity := repository.ProtocolActivity{ + TotalValueSecure: 10, + TotalValueTransferred: 20, + Volume: 30, + TotalMessages: 40, + Activities: []repository.Activity{ + { + EmitterChainID: 1, + DestinationChainID: 2, + Txs: 50, + TotalUSD: 60, + }, + { + EmitterChainID: 1, + DestinationChainID: 2, + Txs: 25, + TotalUSD: 30, + }, + }, + } + + stats := repository.Stats{ + TotalValueLocked: 70, + TotalMessages: 80, + Volume: 90, + } + + mr.On("GetActivity", ctx, from, to).Return(activity, nil) + mr.On("GetStats", ctx).Return(stats, nil) + mr.On("ProtocolName").Return(commons.MayanProtocol) + + mockWriterDB := &mocks.MockWriterApi{} + + expectedStatsPoint := influxdb2. + NewPointWithMeasurement(dbconsts.ProtocolsStatsMeasurementDaily). + AddTag("protocol", commons.MayanProtocol). + AddField("total_messages", stats.TotalMessages). + AddField("total_value_locked", stats.TotalValueLocked). + AddField("volume", stats.Volume). + SetTime(from) + + expectedActivityPoint := influxdb2.NewPointWithMeasurement(dbconsts.ProtocolsActivityMeasurementDaily). + AddTag("protocol", commons.MayanProtocol). + AddField("total_value_secure", activity.TotalValueSecure). + AddField("total_value_transferred", activity.TotalValueTransferred). + AddField("volume", activity.Volume). + AddField("txs", 75). + AddField("total_usd", 90). + SetTime(from) + + mockWriterDB.On("WritePoint", ctx, mock.MatchedBy(pointMatcher{Expected: expectedStatsPoint}.Matches)).Return(nil) + mockWriterDB.On("WritePoint", ctx, mock.MatchedBy(pointMatcher{Expected: expectedActivityPoint}.Matches)).Return(nil).Times(1) + + job := protocols.NewStatsJob(mockWriterDB, + from, + to, + dbconsts.ProtocolsActivityMeasurementDaily, + dbconsts.ProtocolsStatsMeasurementDaily, + []repository.ProtocolRepository{mr}, + zap.NewNop()) + + err := job.Run(ctx) + assert.Nil(t, err) + mockWriterDB.AssertNumberOfCalls(t, "WritePoint", 2) +} + +func Test_ProtocolsStatsJob_FailedFetchingStats(t *testing.T) { + ctx := context.Background() + from, _ := time.Parse(time.RFC3339, "2024-02-01T00:00:00Z") + to, _ := time.Parse(time.RFC3339, "2024-02-02T00:00:00Z") + + mr := &mockProtocolRepo{} + activity := repository.ProtocolActivity{} + + stats := repository.Stats{} + + mr.On("GetActivity", ctx, from, to).Return(activity, nil) + mr.On("GetStats", ctx).Return(stats, errors.New("mocked_error")) + mr.On("ProtocolName").Return(commons.MayanProtocol) + + mockWriterDB := &mocks.MockWriterApi{} + job := protocols.NewStatsJob(mockWriterDB, + from, + to, + dbconsts.ProtocolsActivityMeasurementDaily, + dbconsts.ProtocolsStatsMeasurementDaily, + []repository.ProtocolRepository{mr}, + zap.NewNop()) + + err := job.Run(ctx) + assert.Nil(t, err) + mockWriterDB.AssertNumberOfCalls(t, "WritePoint", 0) +} + +func Test_ProtocolsStatsJob_FailedFetchingActivity(t *testing.T) { + ctx := context.Background() + from, _ := time.Parse(time.RFC3339, "2024-02-01T00:00:00Z") + to, _ := time.Parse(time.RFC3339, "2024-02-02T00:00:00Z") + + mr := &mockProtocolRepo{} + activity := repository.ProtocolActivity{} + + stats := repository.Stats{} + + mr.On("GetActivity", ctx, from, to).Return(activity, errors.New("mocked_error")) + mr.On("GetStats", ctx).Return(stats, nil) + mr.On("ProtocolName").Return(commons.MayanProtocol) + + mockWriterDB := &mocks.MockWriterApi{} + job := protocols.NewStatsJob(mockWriterDB, + from, + to, + dbconsts.ProtocolsActivityMeasurementDaily, + dbconsts.ProtocolsStatsMeasurementDaily, + []repository.ProtocolRepository{mr}, + zap.NewNop()) + + err := job.Run(ctx) + assert.Nil(t, err) + mockWriterDB.AssertNumberOfCalls(t, "WritePoint", 0) + +} + +type mockProtocolRepo struct { + mock.Mock +} + +func (m *mockProtocolRepo) GetActivity(ctx context.Context, from, to time.Time) (repository.ProtocolActivity, error) { + args := m.Called(ctx, from, to) + return args.Get(0).(repository.ProtocolActivity), args.Error(1) +} +func (m *mockProtocolRepo) GetStats(ctx context.Context) (repository.Stats, error) { + args := m.Called(ctx) + return args.Get(0).(repository.Stats), args.Error(1) +} +func (m *mockProtocolRepo) ProtocolName() string { + args := m.Called() + return args.String(0) +} + +type pointMatcher struct { + Expected *write.Point +} + +func (p pointMatcher) Matches(x interface{}) bool { + actual, ok := x.([]*write.Point) + if !ok || len(actual) != 1 { + return false + } + + // Perform your comparison logic here + // For example, check if the measurement name matches + return actual[0].Name() == p.Expected.Name() +} + +func (p pointMatcher) String() string { + return "matches the expected point" +}