diff --git a/.gitignore b/.gitignore index 8ac2603c..68e22e7f 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,4 @@ serviceAccountKey.json bigtableAccountKey.json tsconfig.tsbuildinfo serviceAccount.json -.run \ No newline at end of file +.run/ \ No newline at end of file diff --git a/api/handlers/protocols/repository.go b/api/handlers/protocols/repository.go index 1fb724cc..8b3fc27f 100644 --- a/api/handlers/protocols/repository.go +++ b/api/handlers/protocols/repository.go @@ -8,38 +8,13 @@ import ( "github.com/mitchellh/mapstructure" "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" "go.uber.org/zap" + "time" ) -const QueryTemplateLatestPoint = ` -from(bucket: "%s") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") - |> last() - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") -` - -const QueryTemplateLast24Point = ` -from(bucket: "%s") - |> range(start: -1d) - |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") - |> first() - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") -` - -const QueryTemplateActivityLatestPoint = ` -from(bucket: "%s") - |> range(start: 1970-01-01T00:00:00Z) - |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") - |> keep(columns: ["_time","_field","protocol", "_value", "total_value_secure", "total_value_transferred"]) - |> last() - |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") -` - -// QueryIntProtocolsTotalStartOfDay Query template for internal protocols (cctp and portal_token_bridge) to fetch total values till the start of current day -const QueryIntProtocolsTotalStartOfDay = ` +// QueryCoreProtocolTotalStartOfDay Query template for core protocols (cctp and portal_token_bridge) to fetch total values till the start of current day +const QueryCoreProtocolTotalStartOfDay = ` import "date" - import "types" - + startOfCurrentDay = date.truncate(t: now(), unit: 1d) data = from(bucket: "%s") @@ -58,6 +33,7 @@ totalMsgs = data |> group() |> sum() |> set(key:"_field",value:"total_messages") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) union(tables:[tvt,totalMsgs]) |> set(key:"_time",value:string(v:startOfCurrentDay)) @@ -65,8 +41,8 @@ union(tables:[tvt,totalMsgs]) |> set(key:"app_id",value:"%s") ` -// QueryIntProtocolsDeltaSinceStartOfDay calculate delta since the beginning of current day -const QueryIntProtocolsDeltaSinceStartOfDay = ` +// QueryCoreProtocolDeltaSinceStartOfDay calculate delta since the beginning of current day +const QueryCoreProtocolDeltaSinceStartOfDay = ` import "date" import "types" @@ -89,6 +65,7 @@ totalMsgs = data |> group() |> sum() |> set(key:"_field",value:"total_messages") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) union(tables:[tvt,totalMsgs]) |> set(key:"_time",value:string(v:startOfDay)) @@ -96,11 +73,10 @@ union(tables:[tvt,totalMsgs]) |> set(key:"app_id",value:"%s") ` -// QueryIntProtocolsDeltaLastDay calculate last day delta -const QueryIntProtocolsDeltaLastDay = ` +// QueryCoreProtocolDeltaLastDay calculate last day delta +const QueryCoreProtocolDeltaLastDay = ` import "date" import "types" - ts = date.truncate(t: now(), unit: 1h) yesterday = date.sub(d: 1d, from: ts) @@ -121,6 +97,7 @@ totalMsgs = data |> group() |> sum() |> set(key:"_field",value:"total_messages") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) union(tables:[tvt,totalMsgs]) |> set(key:"_time",value:string(v:yesterday)) @@ -128,23 +105,87 @@ union(tables:[tvt,totalMsgs]) |> set(key:"app_id",value:"%s") ` +const QueryTemplateProtocolStatsLastDay = ` + from(bucket: "%s") + |> range(start: %s, stop: %s) + |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s") + |> first() + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +` + +const QueryTemplateProtocolStats = ` + data = from(bucket: "%s") + |> range(start: -2d) + |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s") + + totalMsg = data + |> filter(fn: (r) => r._field == "total_messages") + |> sort(columns:["_time"],desc:false) + |> last() + + tvl = data + |> filter(fn: (r) => r._field == "total_value_locked") + |> sort(columns:["_time"],desc:false) + |> last() + + volume = data + |> filter(fn: (r) => r._field == "volume") + |> sort(columns:["_time"],desc:false) + |> last() + + union(tables:[totalMsg,tvl,volume]) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +` + +const QueryTemplateProtocolActivity = ` + data = + from(bucket: "%s") + |> range(start: %s) + |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s") + + tvs = data + |> filter(fn: (r) => r._field == "total_value_secure") + |> cumulativeSum() + |> last() + + tvt = data + |> filter(fn: (r) => r._field == "total_value_transferred") + |> cumulativeSum() + |> last() + + volume = data + |> filter(fn: (r) => r._field == "volume") + |> sort(columns:["_time"],desc:false) + |> cumulativeSum() + |> last() + + txs = data + |> filter(fn: (r) => r._field == "txs") + |> sort(columns:["_time"],desc:false) + |> cumulativeSum() + |> last() + + union(tables:[tvs,tvt,volume,txs]) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +` + type Repository struct { - queryAPI QueryDoer - logger *zap.Logger - bucketInfinite string - bucket30d string - statsVersion string - activityVersion string - intProtocolMeasurement map[string]struct { + queryAPI QueryDoer + logger *zap.Logger + bucketInfinite string + bucket30d string + coreProtocolMeasurement map[string]struct { Daily string Hourly string } } type rowStat struct { - Protocol string `mapstructure:"protocol"` - TotalMessages uint64 `mapstructure:"total_messages"` - TotalValueLocked float64 `mapstructure:"total_value_locked"` + Protocol string `mapstructure:"protocol"` + TotalMessages uint64 `mapstructure:"total_messages"` + TotalValueLocked float64 `mapstructure:"total_value_locked"` + Volume float64 `mapstructure:"volume"` + Time time.Time `mapstructure:"_time"` } type intRowStat struct { @@ -159,14 +200,12 @@ type intStats struct { } type rowActivity struct { - Protocol string `mapstructure:"protocol"` - DestinationChainId string `mapstructure:"destination_chain_id"` - EmitterChainId string `mapstructure:"emitter_chain_id"` - From string `mapstructure:"from"` - TotalUsd float64 `mapstructure:"total_usd"` - TotalValueTransferred float64 `mapstructure:"total_value_transferred"` - TotalVolumeSecure float64 `mapstructure:"total_value_secure"` - Txs uint64 `mapstructure:"txs"` + Protocol string `mapstructure:"protocol"` + Time time.Time `mapstructure:"_time"` + TotalUsd float64 `mapstructure:"total_usd"` + TotalValueTransferred float64 `mapstructure:"total_value_transferred"` + TotalValueSecure float64 `mapstructure:"total_value_secure"` + Txs uint64 `mapstructure:"txs"` } type stats struct { @@ -193,15 +232,13 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer { return &queryApiWrapper{qApi: qApi} } -func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d, statsVersion, activityVersion string, logger *zap.Logger) *Repository { +func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d string, logger *zap.Logger) *Repository { return &Repository{ - queryAPI: qApi, - bucketInfinite: bucketInfinite, - bucket30d: bucket30d, - statsVersion: statsVersion, - activityVersion: activityVersion, - logger: logger, - intProtocolMeasurement: map[string]struct { + queryAPI: qApi, + bucketInfinite: bucketInfinite, + bucket30d: bucket30d, + logger: logger, + coreProtocolMeasurement: map[string]struct { Daily string Hourly string }{ @@ -215,41 +252,74 @@ func (q *queryApiWrapper) Query(ctx context.Context, query string) (QueryResult, return q.qApi.Query(ctx, query) } -// returns latest and last 24 hr stats for a given protocol -func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (stats, error) { +func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (rowStat, error) { - // fetch latest stat - q := buildQuery(QueryTemplateLatestPoint, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion) - latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) + q := fmt.Sprintf(QueryTemplateProtocolStats, r.bucket30d, dbconsts.ProtocolsStatsMeasurementHourly, protocol) + + statsData, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) if err != nil { - return stats{}, err + r.logger.Error("error fetching latest daily stats", zap.Error(err)) + return rowStat{}, err } - // fetch last 24 hr stat - q = buildQuery(QueryTemplateLast24Point, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion) - last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) - return stats{ - Latest: latest, - Last24: last24hr, - }, err + + return rowStat{ + Protocol: protocol, + TotalMessages: statsData.TotalMessages, + TotalValueLocked: statsData.TotalValueLocked, + Volume: statsData.Volume, + }, nil + +} + +func (r *Repository) getProtocolStatsLastDay(ctx context.Context, protocol string) (rowStat, error) { + + to := time.Now().UTC().Truncate(24 * time.Hour) + from := to.Add(-24 * time.Hour) + q := fmt.Sprintf(QueryTemplateProtocolStatsLastDay, r.bucket30d, from.Format(time.RFC3339), to.Format(time.RFC3339), dbconsts.ProtocolsStatsMeasurementHourly, protocol) + + lastDayData, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) + if err != nil { + r.logger.Error("error fetching last day stats", zap.Error(err)) + return rowStat{}, err + } + + return lastDayData, nil + } func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (rowActivity, error) { - q := buildQuery(QueryTemplateActivityLatestPoint, r.bucket30d, dbconsts.ProtocolsActivityMeasurement, protocol, r.activityVersion) - return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol) + + q := fmt.Sprintf(QueryTemplateProtocolActivity, r.bucketInfinite, "1970-01-01T00:00:00Z", dbconsts.ProtocolsActivityMeasurementDaily, protocol) + activityDaily, err := fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol) + if err != nil { + r.logger.Error("error fetching latest daily activity", zap.Error(err)) + return rowActivity{}, err + } + + q = fmt.Sprintf(QueryTemplateProtocolActivity, r.bucket30d, activityDaily.Time.Format(time.RFC3339), dbconsts.ProtocolsActivityMeasurementHourly, protocol) + activityHourly, err := fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol) + + return rowActivity{ + Protocol: protocol, + Txs: activityDaily.Txs + activityHourly.Txs, + TotalUsd: activityDaily.TotalUsd + activityHourly.TotalUsd, + TotalValueTransferred: activityDaily.TotalValueTransferred + activityHourly.TotalValueTransferred, + TotalValueSecure: activityDaily.TotalValueSecure + activityHourly.TotalValueSecure, + }, nil } -// returns latest and last 24 hr for internal protocols (cctp and portal_token_bridge) -func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol string) (intStats, error) { +// returns latest and last 24 hr for core protocols (cctp and portal_token_bridge) +func (r *Repository) getCoreProtocolStats(ctx context.Context, protocol string) (intStats, error) { // calculate total values till the start of current day - totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.intProtocolMeasurement[protocol].Daily, protocol, protocol) + totalTillCurrentDayQuery := fmt.Sprintf(QueryCoreProtocolTotalStartOfDay, r.bucketInfinite, r.coreProtocolMeasurement[protocol].Daily, protocol, protocol) totalsUntilToday, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, totalTillCurrentDayQuery, protocol) if err != nil { return intStats{}, err } // calculate delta since the beginning of current day - q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol) + q2 := fmt.Sprintf(QueryCoreProtocolDeltaSinceStartOfDay, r.bucket30d, r.coreProtocolMeasurement[protocol].Hourly, protocol, protocol) currentDayStats, errCD := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q2, protocol) if errCD != nil { return intStats{}, errCD @@ -266,7 +336,7 @@ func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol stri } // calculate last day delta - q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol) + q3 := fmt.Sprintf(QueryCoreProtocolDeltaLastDay, r.bucket30d, r.coreProtocolMeasurement[protocol].Hourly, protocol, protocol) deltaYesterdayStats, errQ3 := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q3, protocol) if errQ3 != nil { return result, errQ3 @@ -297,7 +367,3 @@ func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx co err = mapstructure.Decode(result.Record().Values(), &res) return res, err } - -func buildQuery(queryTemplate, bucket, measurement, contributorName, version string) string { - return fmt.Sprintf(queryTemplate, bucket, measurement, contributorName, version) -} diff --git a/api/handlers/protocols/service.go b/api/handlers/protocols/service.go index bf62698f..c3ac4b65 100644 --- a/api/handlers/protocols/service.go +++ b/api/handlers/protocols/service.go @@ -19,7 +19,7 @@ type Service struct { Protocols []string repo *Repository logger *zap.Logger - intProtocols []string + coreProtocols []string cache cache.Cache cacheKeyPrefix string cacheTTL int @@ -46,12 +46,12 @@ type tvlProvider interface { Get(ctx context.Context) (string, error) } -func NewService(extProtocols, intProtocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int, metrics metrics.Metrics, tvlProvider tvlProvider) *Service { +func NewService(extProtocols, coreProtocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int, metrics metrics.Metrics, tvlProvider tvlProvider) *Service { return &Service{ Protocols: extProtocols, repo: repo, logger: logger, - intProtocols: intProtocols, + coreProtocols: coreProtocols, cache: cache, cacheKeyPrefix: cacheKeyPrefix, cacheTTL: cacheTTL, @@ -63,15 +63,15 @@ func NewService(extProtocols, intProtocols []string, repo *Repository, logger *z func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalValuesDTO { wg := &sync.WaitGroup{} - totalProtocols := len(s.Protocols) + len(s.intProtocols) + totalProtocols := len(s.Protocols) + len(s.coreProtocols) wg.Add(totalProtocols) results := make(chan ProtocolTotalValuesDTO, totalProtocols) for _, p := range s.Protocols { go s.fetchProtocolValues(ctx, wg, p, results, s.getProtocolStats) } - for _, p := range s.intProtocols { - go s.fetchProtocolValues(ctx, wg, p, results, s.getIntProtocolStats) + for _, p := range s.coreProtocols { + go s.fetchProtocolValues(ctx, wg, p, results, s.getCoreProtocolStats) } wg.Wait() close(results) @@ -119,9 +119,9 @@ func (s *Service) fetchProtocolValues(ctx context.Context, wg *sync.WaitGroup, p } // getProtocolStats fetches stats for CCTP and PortalTokenBridge -func (s *Service) getIntProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) { +func (s *Service) getCoreProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) { - protocolStats, err := s.repo.getInternalProtocolStats(ctx, protocol) + protocolStats, err := s.repo.getCoreProtocolStats(ctx, protocol) if err != nil { return ProtocolStats{ Protocol: protocol, @@ -144,7 +144,7 @@ func (s *Service) getIntProtocolStats(ctx context.Context, protocol string) (Pro val.LastDayDiffPercentage = percentage } - if CCTP == protocol { + if PortalTokenBridge == protocol { tvl, errTvl := s.tvl.Get(ctx) if errTvl != nil { s.logger.Error("error fetching tvl", zap.Error(errTvl), zap.String("protocol", protocol)) @@ -169,16 +169,24 @@ func (s *Service) getProtocolStats(ctx context.Context, protocol string) (Protoc } statsRes := make(chan statsResult, 1) go func() { + defer close(statsRes) rowStats, errStats := s.repo.getProtocolStats(ctx, protocol) - statsRes <- statsResult{result: rowStats, Err: errStats} - close(statsRes) + if errStats != nil { + statsRes <- statsResult{Err: errStats} + return + } + lastDayStats, errStats := s.repo.getProtocolStatsLastDay(ctx, protocol) + if errStats != nil { + statsRes <- statsResult{Err: errStats} + return + } + statsRes <- statsResult{result: stats{Latest: rowStats, Last24: lastDayStats}} }() activity, err := s.repo.getProtocolActivity(ctx, protocol) if err != nil { s.logger.Error("error fetching protocol activity", zap.Error(err), zap.String("protocol", protocol)) return ProtocolStats{Protocol: protocol}, err - } rStats := <-statsRes @@ -192,7 +200,7 @@ func (s *Service) getProtocolStats(ctx context.Context, protocol string) (Protoc TotalValueLocked: rStats.result.Latest.TotalValueLocked, TotalMessages: rStats.result.Latest.TotalMessages, TotalValueTransferred: activity.TotalValueTransferred, - TotalValueSecured: activity.TotalVolumeSecure, + TotalValueSecured: activity.TotalValueSecure, } totalMsgNow := rStats.result.Latest.TotalMessages diff --git a/api/handlers/protocols/service_test.go b/api/handlers/protocols/service_test.go index d759d639..c695f596 100644 --- a/api/handlers/protocols/service_test.go +++ b/api/handlers/protocols/service_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/influxdata/influxdb-client-go/v2/api" "github.com/influxdata/influxdb-client-go/v2/api/query" "github.com/stretchr/testify/assert" "github.com/test-go/testify/mock" @@ -44,22 +43,36 @@ func TestService_GetProtocolsTotalValues(t *testing.T) { respActivityLast.On("Next").Return(true) respActivityLast.On("Err").Return(errNil) respActivityLast.On("Close").Return(errNil) + ts := time.Now().UTC() respActivityLast.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ "protocol": "protocol1", "total_messages": uint64(4), "total_value_transferred": float64(7), "total_value_secure": float64(9), + "_time": ts, + })) + + respActivity2 := &mockQueryTableResult{} + respActivity2.On("Next").Return(true) + respActivity2.On("Err").Return(errNil) + respActivity2.On("Close").Return(errNil) + respActivity2.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(4), + "total_value_transferred": float64(7), + "total_value_secure": float64(9), })) ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStats, "bucket30d", dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLatest, nil) + to := time.Now().UTC().Truncate(24 * time.Hour) + from := to.Add(-24 * time.Hour) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStatsLastDay, "bucket30d", from.Format(time.RFC3339), to.Format(time.RFC3339), dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucketInfinite", "1970-01-01T00:00:00Z", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(respActivityLast, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucket30d", ts.Format(time.RFC3339), dbconsts.ProtocolsActivityMeasurementHourly, "protocol1")).Return(respActivity2, nil) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") - queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil) - - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) @@ -67,8 +80,8 @@ func TestService_GetProtocolsTotalValues(t *testing.T) { assert.Equal(t, "protocol1", values[0].Protocol) assert.Equal(t, 5.00, values[0].TotalValueLocked) assert.Equal(t, uint64(7), values[0].TotalMessages) - assert.Equal(t, 9.00, values[0].TotalValueSecured) - assert.Equal(t, 7.00, values[0].TotalValueTransferred) + assert.Equal(t, 18.00, values[0].TotalValueSecured) + assert.Equal(t, 14.00, values[0].TotalValueTransferred) assert.Equal(t, uint64(3), values[0].LastDayMessages) assert.Equal(t, "75.00%", values[0].LastDayDiffPercentage) @@ -98,20 +111,20 @@ func TestService_GetProtocolsTotalValues_FailedFetchingActivity(t *testing.T) { ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStats, "bucket30d", dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLatest, nil) + to := time.Now().UTC().Truncate(24 * time.Hour) + from := to.Add(-24 * time.Hour) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStatsLastDay, "bucket30d", from.Format(time.RFC3339), to.Format(time.RFC3339), dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucketInfinite", "1970-01-01T00:00:00Z", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(&mockQueryTableResult{}, errors.New("mocked_error")) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") - queryAPI.On("Query", ctx, activityQuery).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_activity_error")) - - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) assert.Equal(t, "protocol1", values[0].Protocol) assert.NotNil(t, values[0].Error) - assert.Equal(t, "mocked_fetching_activity_error", values[0].Error) + assert.Equal(t, "mocked_error", values[0].Error) } func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) { @@ -131,29 +144,43 @@ func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) { respActivityLast.On("Next").Return(true) respActivityLast.On("Err").Return(errNil) respActivityLast.On("Close").Return(errNil) + ts := time.Now().UTC() respActivityLast.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ "protocol": "protocol1", "total_messages": uint64(4), "total_value_transferred": float64(7), "total_volume_secure": float64(9), + "_time": ts, + })) + + respActivity2 := &mockQueryTableResult{} + respActivity2.On("Next").Return(true) + respActivity2.On("Err").Return(errNil) + respActivity2.On("Close").Return(errNil) + respActivity2.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(4), + "total_value_transferred": float64(7), + "total_value_secure": float64(9), })) ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_stats_error")) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStats, "bucket30d", dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(&mockQueryTableResult{}, errors.New("mocked_error")) + to := time.Now().UTC().Truncate(24 * time.Hour) + from := to.Add(-24 * time.Hour) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStatsLastDay, "bucket30d", from.Format(time.RFC3339), to.Format(time.RFC3339), dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucketInfinite", "1970-01-01T00:00:00Z", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(respActivityLast, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucket30d", ts.Format(time.RFC3339), dbconsts.ProtocolsActivityMeasurementHourly, "protocol1")).Return(respActivity2, nil) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") - queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, errNil) - - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) assert.Equal(t, "protocol1", values[0].Protocol) assert.NotNil(t, values[0].Error) - assert.Equal(t, "mocked_fetching_stats_error", values[0].Error) + assert.Equal(t, "mocked_error", values[0].Error) } func TestService_GetProtocolsTotalValues_CacheHit(t *testing.T) { @@ -184,7 +211,7 @@ func TestService_GetCCTP_Stats(t *testing.T) { totalStartOfCurrentDay.On("Err").Return(errNil) totalStartOfCurrentDay.On("Close").Return(errNil) totalStartOfCurrentDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ - "app_id": protocols.CCTP, + "app_id": protocols.PortalTokenBridge, "total_messages": uint64(50), "total_value_transferred": 4e8, })) @@ -194,7 +221,7 @@ func TestService_GetCCTP_Stats(t *testing.T) { deltaSinceStartOfDay.On("Err").Return(errNil) deltaSinceStartOfDay.On("Close").Return(errNil) deltaSinceStartOfDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ - "app_id": protocols.CCTP, + "app_id": protocols.PortalTokenBridge, "total_messages": uint64(6), "total_value_transferred": 2e8, })) @@ -204,7 +231,7 @@ func TestService_GetCCTP_Stats(t *testing.T) { deltaLastDay.On("Err").Return(errNil) deltaLastDay.On("Close").Return(errNil) deltaLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ - "app_id": protocols.CCTP, + "app_id": protocols.PortalTokenBridge, "total_messages": uint64(7), "total_value_transferred": 132, })) @@ -212,18 +239,18 @@ func TestService_GetCCTP_Stats(t *testing.T) { ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsTotalStartOfDay, "bucketInfinite", dbconsts.CctpStatsMeasurementDaily, protocols.CCTP, protocols.CCTP)).Return(totalStartOfCurrentDay, errNil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsDeltaSinceStartOfDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.CCTP, protocols.CCTP)).Return(deltaSinceStartOfDay, errNil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsDeltaLastDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.CCTP, protocols.CCTP)).Return(deltaLastDay, errNil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryCoreProtocolTotalStartOfDay, "bucketInfinite", dbconsts.CctpStatsMeasurementDaily, protocols.PortalTokenBridge, protocols.PortalTokenBridge)).Return(totalStartOfCurrentDay, errNil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryCoreProtocolDeltaSinceStartOfDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.PortalTokenBridge, protocols.PortalTokenBridge)).Return(deltaSinceStartOfDay, errNil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryCoreProtocolDeltaLastDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.PortalTokenBridge, protocols.PortalTokenBridge)).Return(deltaLastDay, errNil) - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{}, []string{protocols.CCTP}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) + service := protocols.NewService([]string{}, []string{protocols.PortalTokenBridge}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.NotNil(t, values) assert.Equal(t, 1, len(values)) for i := range values { switch values[i].Protocol { - case "cctp": + case "portal_token_bridge": assert.Equal(t, uint64(56), values[i].TotalMessages) assert.Equal(t, 6.0, values[i].TotalValueTransferred) assert.Equal(t, uint64(7), values[i].LastDayMessages) diff --git a/api/internal/config/config.go b/api/internal/config/config.go index 1fcd00f7..ac9aecc0 100644 --- a/api/internal/config/config.go +++ b/api/internal/config/config.go @@ -69,9 +69,7 @@ type AppConfig struct { //Api Tokens Tokens string } - Protocols []string - ProtocolsStatsVersion string - ProtocolsActivityVersion string + Protocols []string } // GetLogLevel get zapcore.Level define in the configuraion. diff --git a/api/main.go b/api/main.go index 0f14683a..b19a4767 100644 --- a/api/main.go +++ b/api/main.go @@ -159,14 +159,7 @@ func main() { relaysRepo := relays.NewRepository(db.Database, rootLogger) operationsRepo := operations.NewRepository(db.Database, rootLogger) statsRepo := stats.NewRepository(influxCli, cfg.Influx.Organization, cfg.Influx.Bucket24Hours, rootLogger) - protocolsRepo := protocols.NewRepository( - protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)), - cfg.Influx.BucketInfinite, - cfg.Influx.Bucket30Days, - cfg.ProtocolsStatsVersion, - cfg.ProtocolsActivityVersion, - rootLogger, - ) + protocolsRepo := protocols.NewRepository(protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)), cfg.Influx.BucketInfinite, cfg.Influx.Bucket30Days, rootLogger) // create token provider tokenProvider := domain.NewTokenProvider(cfg.P2pNetwork) diff --git a/common/dbconsts/consts.go b/common/dbconsts/consts.go index 12e7fbd1..0b9acca1 100644 --- a/common/dbconsts/consts.go +++ b/common/dbconsts/consts.go @@ -2,8 +2,10 @@ package dbconsts // influx-db constants const ( - ProtocolsActivityMeasurement = "protocols_activity" - ProtocolsStatsMeasurement = "protocols_stats_v1" + ProtocolsActivityMeasurementHourly = "protocols_activity_1h" + ProtocolsActivityMeasurementDaily = "protocols_activity_1d" + ProtocolsStatsMeasurementDaily = "protocols_stats_1d" + ProtocolsStatsMeasurementHourly = "protocols_stats_1h" CctpStatsMeasurementHourly = intProtocolStatsMeasurement1h TokenBridgeStatsMeasurementHourly = intProtocolStatsMeasurement1h diff --git a/deploy/jobs/protocols-stats-daily.yaml b/deploy/jobs/protocols-stats-daily.yaml new file mode 100644 index 00000000..2e8b96c1 --- /dev/null +++ b/deploy/jobs/protocols-stats-daily.yaml @@ -0,0 +1,45 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: protocols-stats-daily + namespace: {{ .NAMESPACE }} +spec: #cronjob specs + schedule: "0 0 * * *" + jobTemplate: + spec: # job specs + template: + spec: # pod specs + containers: + - name: protocols-stats-daily + image: {{ .IMAGE_NAME }} + imagePullPolicy: Always + env: + - name: ENVIRONMENT + value: {{ .ENVIRONMENT }} + - name: LOG_LEVEL + value: {{ .LOG_LEVEL }} + - name: JOB_ID + value: JOB_PROTOCOLS_STATS_DAILY + - name: INFLUX_URL + valueFrom: + configMapKeyRef: + name: config + key: influxdb-url + - name: INFLUX_TOKEN + valueFrom: + secretKeyRef: + name: influxdb + key: token + - name: INFLUX_ORGANIZATION + valueFrom: + configMapKeyRef: + name: config + key: influxdb-organization + - name: INFLUX_BUCKET_INFINITE + valueFrom: + configMapKeyRef: + name: config + key: influxdb-bucket-infinite + - name: PROTOCOLS_JSON + value: {{ .PROTOCOLS_JSON }} + restartPolicy: OnFailure \ No newline at end of file diff --git a/deploy/jobs/protocols-stats-hourly.yaml b/deploy/jobs/protocols-stats-hourly.yaml new file mode 100644 index 00000000..edc14395 --- /dev/null +++ b/deploy/jobs/protocols-stats-hourly.yaml @@ -0,0 +1,45 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: protocols-stats-hourly + namespace: {{ .NAMESPACE }} +spec: #cronjob specs + schedule: "0 * * * *" + jobTemplate: + spec: # job specs + template: + spec: # pod specs + containers: + - name: protocols-stats-hourly + image: {{ .IMAGE_NAME }} + imagePullPolicy: Always + env: + - name: ENVIRONMENT + value: {{ .ENVIRONMENT }} + - name: LOG_LEVEL + value: {{ .LOG_LEVEL }} + - name: JOB_ID + value: JOB_PROTOCOLS_STATS_HOURLY + - name: INFLUX_URL + valueFrom: + configMapKeyRef: + name: config + key: influxdb-url + - name: INFLUX_TOKEN + valueFrom: + secretKeyRef: + name: influxdb + key: token + - name: INFLUX_ORGANIZATION + valueFrom: + configMapKeyRef: + name: config + key: influxdb-organization + - name: INFLUX_BUCKET_30_DAYS + valueFrom: + configMapKeyRef: + name: config + key: influxdb-bucket-30-days + - name: PROTOCOLS_JSON + value: {{ .PROTOCOLS_JSON }} + restartPolicy: OnFailure \ No newline at end of file diff --git a/jobs/cmd/main.go b/jobs/cmd/main.go index 52ca3e93..3721e80b 100644 --- a/jobs/cmd/main.go +++ b/jobs/cmd/main.go @@ -3,16 +3,16 @@ package main import ( "context" "encoding/json" + "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository" "log" - "net/http" "os" "strings" "time" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/wormhole-foundation/wormhole-explorer/common/configuration" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/stats" "github.com/go-redis/redis" txtrackerProcessVaa "github.com/wormhole-foundation/wormhole-explorer/common/client/txtracker" @@ -83,12 +83,12 @@ func main() { migrationJob := initMigrateSourceTxJob(ctx, mCfg, chainID, logger) err = migrationJob.Run(ctx) - case jobs.JobIDProtocolsStats: - statsJob := initProtocolStatsJob(ctx, logger) + case jobs.JobIDProtocolsStatsHourly: + statsJob := initProtocolStatsHourlyJob(ctx, logger) + err = statsJob.Run(ctx) + case jobs.JobIDProtocolsStatsDaily: + statsJob := initProtocolStatsDailyJob(ctx, logger) err = statsJob.Run(ctx) - case jobs.JobIDProtocolsActivity: - activityJob := initProtocolActivityJob(ctx, logger) - err = activityJob.Run(ctx) default: logger.Fatal("Invalid job id", zap.String("job_id", cfg.JobID)) } @@ -173,7 +173,7 @@ func initMigrateSourceTxJob(ctx context.Context, cfg *config.MigrateSourceTxConf return migration.NewMigrationSourceChainTx(db.Database, cfg.PageSize, sdk.ChainID(cfg.ChainID), fromDate, toDate, txTrackerAPIClient, sleepTime, logger) } -func initProtocolStatsJob(ctx context.Context, logger *zap.Logger) *stats.ProtocolsStatsJob { +func initProtocolStatsHourlyJob(ctx context.Context, logger *zap.Logger) *protocols.StatsJob { cfgJob, errCfg := configuration.LoadFromEnv[config.ProtocolsStatsConfiguration](ctx) if errCfg != nil { log.Fatal("error creating config", errCfg) @@ -184,19 +184,28 @@ func initProtocolStatsJob(ctx context.Context, logger *zap.Logger) *stats.Protoc } dbClient := influxdb2.NewClient(cfgJob.InfluxUrl, cfgJob.InfluxToken) dbWriter := dbClient.WriteAPIBlocking(cfgJob.InfluxOrganization, cfgJob.InfluxBucket30Days) - statsFetchers := make([]stats.ClientStats, 0, len(cfgJob.Protocols)) + + protocolRepos := make([]repository.ProtocolRepository, 0, len(cfgJob.Protocols)) for _, c := range cfgJob.Protocols { - cs := stats.NewHttpRestClientStats(c.Name, - c.Url, - logger.With(zap.String("protocol", c.Name), zap.String("url", c.Url)), - &http.Client{}, - ) - statsFetchers = append(statsFetchers, cs) + builder, ok := repository.ProtocolsRepositoryFactory[c.Name] + if !ok { + log.Fatal("error creating protocol stats client. Unknown protocol:", c.Name, errCfg) + } + cs := builder(c.Url, logger.With(zap.String("protocol", c.Name), zap.String("url", c.Url))) + protocolRepos = append(protocolRepos, cs) } - return stats.NewProtocolsStatsJob(dbWriter, logger, cfgJob.StatsVersion, statsFetchers...) + to := time.Now().UTC().Truncate(1 * time.Hour) + from := to.Add(-1 * time.Hour) + return protocols.NewStatsJob(dbWriter, + from, + to, + dbconsts.ProtocolsActivityMeasurementHourly, + dbconsts.ProtocolsStatsMeasurementHourly, + protocolRepos, + logger) } -func initProtocolActivityJob(ctx context.Context, logger *zap.Logger) *activity.ProtocolsActivityJob { +func initProtocolStatsDailyJob(ctx context.Context, logger *zap.Logger) *protocols.StatsJob { cfgJob, errCfg := configuration.LoadFromEnv[config.ProtocolsStatsConfiguration](ctx) if errCfg != nil { log.Fatal("error creating config", errCfg) @@ -206,17 +215,26 @@ func initProtocolActivityJob(ctx context.Context, logger *zap.Logger) *activity. log.Fatal("error unmarshalling protocols config", errUnmarshal) } dbClient := influxdb2.NewClient(cfgJob.InfluxUrl, cfgJob.InfluxToken) - dbWriter := dbClient.WriteAPIBlocking(cfgJob.InfluxOrganization, cfgJob.InfluxBucket30Days) - activityFetchers := make([]activity.ClientActivity, 0, len(cfgJob.Protocols)) + dbWriter := dbClient.WriteAPIBlocking(cfgJob.InfluxOrganization, cfgJob.InfluxBucketInfinite) + + protocolRepos := make([]repository.ProtocolRepository, 0, len(cfgJob.Protocols)) for _, c := range cfgJob.Protocols { - builder, ok := activity.ActivitiesClientsFactory[c.Name] + builder, ok := repository.ProtocolsRepositoryFactory[c.Name] if !ok { - log.Fatal("error creating protocol activity fetcher. Unknown protocol:", c.Name, errCfg) + log.Fatal("error creating protocol stats client. Unknown protocol:", c.Name, errCfg) } - cs := builder(c.Name, c.Url, logger.With(zap.String("protocol", c.Name), zap.String("url", c.Url))) - activityFetchers = append(activityFetchers, cs) + cs := builder(c.Url, logger.With(zap.String("protocol", c.Name), zap.String("url", c.Url))) + protocolRepos = append(protocolRepos, cs) } - return activity.NewProtocolActivityJob(dbWriter, logger, cfgJob.ActivityVersion, activityFetchers...) + to := time.Now().UTC().Truncate(24 * time.Hour) + from := to.Add(-24 * time.Hour) + return protocols.NewStatsJob(dbWriter, + from, + to, + dbconsts.ProtocolsActivityMeasurementDaily, + dbconsts.ProtocolsStatsMeasurementDaily, + protocolRepos, + logger) } func handleExit() { diff --git a/jobs/config/config.go b/jobs/config/config.go index 9e682ea5..a09fe61b 100644 --- a/jobs/config/config.go +++ b/jobs/config/config.go @@ -53,14 +53,13 @@ type MigrateSourceTxConfiguration struct { } type ProtocolsStatsConfiguration struct { - InfluxUrl string `env:"INFLUX_URL"` - InfluxToken string `env:"INFLUX_TOKEN"` - InfluxOrganization string `env:"INFLUX_ORGANIZATION"` - InfluxBucket30Days string `env:"INFLUX_BUCKET_30_DAYS"` - StatsVersion string `env:"STATS_VERSION"` - ActivityVersion string `env:"ACTIVITY_VERSION"` - ProtocolsJson string `env:"PROTOCOLS_JSON"` - Protocols []Protocol `json:"PROTOCOLS"` + InfluxUrl string `env:"INFLUX_URL"` + InfluxToken string `env:"INFLUX_TOKEN"` + InfluxOrganization string `env:"INFLUX_ORGANIZATION"` + InfluxBucket30Days string `env:"INFLUX_BUCKET_30_DAYS"` + InfluxBucketInfinite string `env:"INFLUX_BUCKET_INFINITE"` + ProtocolsJson string `env:"PROTOCOLS_JSON"` + Protocols []Protocol `json:"PROTOCOLS"` } type Protocol struct { diff --git a/jobs/jobs/jobs.go b/jobs/jobs/jobs.go index a72a8564..e18d4978 100644 --- a/jobs/jobs/jobs.go +++ b/jobs/jobs/jobs.go @@ -5,12 +5,12 @@ import "context" // JobIDNotional is the job id for notional job. const ( - JobIDNotional = "JOB_NOTIONAL_USD" - JobIDTransferReport = "JOB_TRANSFER_REPORT" - JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES" - JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX" - JobIDProtocolsStats = "JOB_PROTOCOLS_STATS" - JobIDProtocolsActivity = "JOB_PROTOCOLS_ACTIVITY" + JobIDNotional = "JOB_NOTIONAL_USD" + JobIDTransferReport = "JOB_TRANSFER_REPORT" + JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES" + JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX" + JobIDProtocolsStatsDaily = "JOB_PROTOCOLS_STATS_DAILY" + JobIDProtocolsStatsHourly = "JOB_PROTOCOLS_STATS_HOURLY" ) // Job is the interface for jobs. diff --git a/jobs/jobs/protocols/activity/activity.go b/jobs/jobs/protocols/activity/activity.go deleted file mode 100644 index 151aa762..00000000 --- a/jobs/jobs/protocols/activity/activity.go +++ /dev/null @@ -1,82 +0,0 @@ -package activity - -import ( - "context" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/influxdata/influxdb-client-go/v2/api/write" - "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity/internal/repositories" - "go.uber.org/zap" - "strconv" - "sync" - "time" -) - -// NewProtocolActivityJob creates an instance of the job implementation. -func NewProtocolActivityJob(statsDB api.WriteAPIBlocking, logger *zap.Logger, version string, activityFetchers ...ClientActivity) *ProtocolsActivityJob { - return &ProtocolsActivityJob{ - statsDB: statsDB, - logger: logger.With(zap.String("module", "ProtocolsActivityJob")), - activityFetchers: activityFetchers, - version: version, - } -} - -func (m *ProtocolsActivityJob) Run(ctx context.Context) error { - - clientsQty := len(m.activityFetchers) - wg := sync.WaitGroup{} - wg.Add(clientsQty) - errs := make(chan error, clientsQty) - ts := time.Now().UTC().Truncate(time.Hour) // make minutes and seconds zero, so we only work with date and hour - from := time.Unix(0, 0).UTC() - m.logger.Info("running protocols activity job ", zap.Time("from", from), zap.Time("to", ts)) - for _, cs := range m.activityFetchers { - go func(c ClientActivity) { - defer wg.Done() - activity, err := c.Get(ctx, from, ts) - if err != nil { - errs <- err - return - } - errs <- m.updateActivity(ctx, c.ProtocolName(), m.version, activity, ts) - }(cs) - } - - wg.Wait() - close(errs) - for err := range errs { - if err != nil { - return err - } - } - - return nil -} - -func (m *ProtocolsActivityJob) updateActivity(ctx context.Context, protocol, version string, activity repositories.ProtocolActivity, ts time.Time) error { - - points := make([]*write.Point, 0, len(activity.Activities)) - - for i := range activity.Activities { - point := influxdb2. - NewPointWithMeasurement(dbconsts.ProtocolsActivityMeasurement). - AddTag("protocol", protocol). - AddTag("emitter_chain_id", strconv.FormatUint(activity.Activities[i].EmitterChainID, 10)). - AddTag("destination_chain_id", strconv.FormatUint(activity.Activities[i].DestinationChainID, 10)). - AddTag("version", version). - AddField("total_value_secure", activity.TotalValueSecure). - AddField("total_value_transferred", activity.TotalValueTransferred). - AddField("txs", activity.Activities[i].Txs). - AddField("total_usd", activity.Activities[i].TotalUSD). - SetTime(ts) - points = append(points, point) - } - - err := m.statsDB.WritePoint(ctx, points...) - if err != nil { - m.logger.Error("failed updating protocol Activities in influxdb", zap.Error(err), zap.String("protocol", protocol)) - } - return err -} diff --git a/jobs/jobs/protocols/activity/activity_test.go b/jobs/jobs/protocols/activity/activity_test.go deleted file mode 100644 index b9426194..00000000 --- a/jobs/jobs/protocols/activity/activity_test.go +++ /dev/null @@ -1,80 +0,0 @@ -package activity_test - -import ( - "context" - "errors" - "github.com/stretchr/testify/assert" - "github.com/test-go/testify/mock" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity/internal/repositories" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" - "go.uber.org/zap" - "testing" - "time" -) - -func Test_ProtocolsActivityJob_Succeed(t *testing.T) { - var mockErr error - activityFetcher := &mockActivityFetch{} - act := repositories.ProtocolActivity{ - Activities: []repositories.Activity{ - { - EmitterChainID: 1, - DestinationChainID: 2, - Txs: 150, - TotalUSD: 250000, - }, - }, - } - - activityFetcher.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(act, mockErr) - activityFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) - - job := activity.NewProtocolActivityJob(mockWriterDB, zap.NewNop(), "v1", activityFetcher) - resultErr := job.Run(context.Background()) - assert.Nil(t, resultErr) -} - -func Test_ProtocolsActivityJob_FailFetching(t *testing.T) { - var mockErr error - activityFetcher := &mockActivityFetch{} - activityFetcher.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(repositories.ProtocolActivity{}, errors.New("mocked_error_fetch")) - activityFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) - - job := activity.NewProtocolActivityJob(mockWriterDB, zap.NewNop(), "v1", activityFetcher) - resultErr := job.Run(context.Background()) - assert.NotNil(t, resultErr) - assert.Equal(t, "mocked_error_fetch", resultErr.Error()) -} - -func Test_ProtocolsActivityJob_FailedUpdatingDB(t *testing.T) { - var mockErr error - activityFetcher := &mockActivityFetch{} - activityFetcher.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(repositories.ProtocolActivity{}, mockErr) - activityFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(errors.New("mocked_error_update_db")) - - job := activity.NewProtocolActivityJob(mockWriterDB, zap.NewNop(), "v1", activityFetcher) - resultErr := job.Run(context.Background()) - assert.NotNil(t, resultErr) - assert.Equal(t, "mocked_error_update_db", resultErr.Error()) -} - -type mockActivityFetch struct { - mock.Mock -} - -func (m *mockActivityFetch) Get(ctx context.Context, from, to time.Time) (repositories.ProtocolActivity, error) { - args := m.Called(ctx, from, to) - return args.Get(0).(repositories.ProtocolActivity), args.Error(1) -} - -func (m *mockActivityFetch) ProtocolName() string { - args := m.Called() - return args.String(0) -} diff --git a/jobs/jobs/protocols/activity/internal/repositories/allbridge.go b/jobs/jobs/protocols/activity/internal/repositories/allbridge.go deleted file mode 100644 index 2b9174e9..00000000 --- a/jobs/jobs/protocols/activity/internal/repositories/allbridge.go +++ /dev/null @@ -1,141 +0,0 @@ -package repositories - -import ( - "context" - "encoding/json" - "github.com/google/uuid" - "github.com/pkg/errors" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons" - "go.uber.org/zap" - "io" - "net/http" - "strconv" - "time" -) - -func NewAllBridgeRestClient(name, url string, logger *zap.Logger, httpClient commons.HttpDo) *AllBridgeRestClient { - return &AllBridgeRestClient{ - name: name, - url: url, - logger: logger, - client: httpClient, - } -} - -type AllBridgeRestClient struct { - name string - url string - client commons.HttpDo - logger *zap.Logger -} - -func (d *AllBridgeRestClient) ProtocolName() string { - return d.name -} - -func (d *AllBridgeRestClient) Get(ctx context.Context, from, to time.Time) (ProtocolActivity, error) { - decoratedLogger := d.logger - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil) - if err != nil { - decoratedLogger.Error("failed creating http request for retrieving protocol Activities", - zap.Error(err), - ) - return ProtocolActivity{}, errors.WithStack(err) - } - q := req.URL.Query() - q.Set("from", from.Format(time.RFC3339)) - q.Set("to", to.Format(time.RFC3339)) - req.URL.RawQuery = q.Encode() - - reqId := uuid.New().String() - req.Header.Set("X-Request-ID", reqId) - decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) - - resp, err := d.client.Do(req) - if err != nil { - decoratedLogger.Error("failed retrieving protocol Activities", - zap.Error(err), - ) - return ProtocolActivity{}, errors.WithStack(err) - } - defer resp.Body.Close() - - decoratedLogger = decoratedLogger. - With(zap.String("status_code", http.StatusText(resp.StatusCode))). - With(zap.String("response_headers", commons.ToJson(resp.Header))) - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - decoratedLogger.Error("error retrieving protocol Activities: got an invalid response status code", - zap.String("response_body", string(body)), - ) - return ProtocolActivity{}, errors.Errorf("failed retrieving protocol Activities from url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - decoratedLogger.Error("failed reading response body", zap.Error(err)) - return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol Activities. url:%s - status_code:%d", d.url, resp.StatusCode) - } - - var temp allBridgeActivity - err = json.Unmarshal(body, &temp) - if err != nil { - decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) - return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol Activities. url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) - } - - return temp.toProtocolActivity() -} - -type allBridgeActivity struct { - TotalValueSecured string `json:"total_value_secure"` - TotalValueTransferred string `json:"total_value_transferred"` - Activities []struct { - EmitterChainID uint64 `json:"emitter_chain_id"` - DestinationChainID uint64 `json:"destination_chain_id"` - Txs string `json:"txs"` - TotalUSD string `json:"total_usd"` - } `json:"activity"` -} - -func (m *allBridgeActivity) toProtocolActivity() (ProtocolActivity, error) { - result := ProtocolActivity{} - - totalValueSecured, err := strconv.ParseFloat(m.TotalValueSecured, 64) - if err != nil { - return result, errors.Wrap(err, "failed parsing string TotalValueSecure to float64") - } - result.TotalValueSecure = totalValueSecured - - totalValueTransferred, err := strconv.ParseFloat(m.TotalValueTransferred, 64) - if err != nil { - return result, errors.Wrap(err, "failed parsing string TotalValueTransferred to float64") - } - result.TotalValueTransferred = totalValueTransferred - - for i := range m.Activities { - - act := m.Activities[i] - txs, errTxs := strconv.ParseUint(act.Txs, 10, 64) - if errTxs != nil { - return result, errors.Wrap(errTxs, "failed parsing string txs to uint64") - } - - totalUSD, errTotalUSD := strconv.ParseFloat(act.TotalUSD, 64) - if errTotalUSD != nil { - return result, errors.Wrap(errTxs, "failed parsing string total_usd to float64") - } - - a := Activity{ - EmitterChainID: m.Activities[i].EmitterChainID, - DestinationChainID: m.Activities[i].DestinationChainID, - Txs: txs, - TotalUSD: totalUSD, - } - result.Activities = append(result.Activities, a) - } - - return result, nil -} diff --git a/jobs/jobs/protocols/activity/internal/repositories/repository.go b/jobs/jobs/protocols/activity/internal/repositories/repository.go deleted file mode 100644 index d96e2024..00000000 --- a/jobs/jobs/protocols/activity/internal/repositories/repository.go +++ /dev/null @@ -1,26 +0,0 @@ -package repositories - -import ( - "context" - "time" -) - -type ProtocolActivityRepository interface { - Get(ctx context.Context, from, to time.Time) (ProtocolActivity, error) - ProtocolName() string -} - -type ProtocolActivity struct { - TotalValueSecure float64 `json:"total_value_secure"` - TotalValueTransferred float64 `json:"total_value_transferred"` - Volume float64 `json:"volume"` - TotalMessages uint64 `json:"total_messages"` - Activities []Activity `json:"activity"` -} - -type Activity struct { - EmitterChainID uint64 `json:"emitter_chain_id"` - DestinationChainID uint64 `json:"destination_chain_id"` - Txs uint64 `json:"txs"` - TotalUSD float64 `json:"total_usd"` -} diff --git a/jobs/jobs/protocols/activity/types.go b/jobs/jobs/protocols/activity/types.go deleted file mode 100644 index 9cafa6a5..00000000 --- a/jobs/jobs/protocols/activity/types.go +++ /dev/null @@ -1,41 +0,0 @@ -package activity - -import ( - "context" - "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity/internal/repositories" - "go.uber.org/zap" - "net/http" - "time" -) - -// Protocols -const ( - MayanProtocol = "mayan" - AllBridgeProtocol = "allbridge" -) - -type ProtocolsActivityJob struct { - statsDB api.WriteAPIBlocking - logger *zap.Logger - activityFetchers []ClientActivity - version string -} - -// ClientActivity Abstraction for fetching protocol Activity since each client may have different implementation details. -type ClientActivity interface { - Get(ctx context.Context, from, to time.Time) (repositories.ProtocolActivity, error) - ProtocolName() string -} - -// ActivitiesClientsFactory RestClient Factory to create the right client for each protocol. -var ActivitiesClientsFactory = map[string]func(name, url string, logger *zap.Logger) ClientActivity{ - - MayanProtocol: func(name, url string, logger *zap.Logger) ClientActivity { - return repositories.NewMayanRestClient(name, url, logger, &http.Client{}) - }, - - AllBridgeProtocol: func(name, url string, logger *zap.Logger) ClientActivity { - return repositories.NewAllBridgeRestClient(name, url, logger, &http.Client{}) - }, -} diff --git a/jobs/jobs/protocols/internal/commons/common.go b/jobs/jobs/protocols/internal/commons/common.go index e8178a6b..83425036 100644 --- a/jobs/jobs/protocols/internal/commons/common.go +++ b/jobs/jobs/protocols/internal/commons/common.go @@ -5,6 +5,11 @@ import ( "net/http" ) +const ( + MayanProtocol = "mayan" + AllBridgeProtocol = "allbridge" +) + func ToJson(headers http.Header) string { bytes, _ := json.Marshal(headers) return string(bytes) diff --git a/jobs/jobs/protocols/repository/allbridge.go b/jobs/jobs/protocols/repository/allbridge.go new file mode 100644 index 00000000..fb62e5b5 --- /dev/null +++ b/jobs/jobs/protocols/repository/allbridge.go @@ -0,0 +1,234 @@ +package repository + +import ( + "context" + "encoding/json" + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons" + "go.uber.org/zap" + "io" + "math" + "net/http" + "strconv" + "time" +) + +func NewAllBridgeRestClient(baseURL string, logger *zap.Logger, httpClient commons.HttpDo) *AllBridgeRestClient { + return &AllBridgeRestClient{ + baseURL: baseURL, + logger: logger, + client: httpClient, + } +} + +type AllBridgeRestClient struct { + baseURL string + client commons.HttpDo + logger *zap.Logger +} + +type allBridgeActivity struct { + TotalValueSecured string `json:"total_value_secure"` + TotalValueTransferred string `json:"total_value_transferred"` + Activities []struct { + EmitterChainID uint64 `json:"emitter_chain_id"` + DestinationChainID uint64 `json:"destination_chain_id"` + Txs string `json:"txs"` + TotalUSD string `json:"total_usd"` + } `json:"activity"` +} + +func (d *AllBridgeRestClient) ProtocolName() string { + return commons.AllBridgeProtocol +} + +func (d *AllBridgeRestClient) GetActivity(ctx context.Context, from, to time.Time) (ProtocolActivity, error) { + decoratedLogger := d.logger + + url := d.baseURL + "/wormhole/activity" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + decoratedLogger.Error("failed creating http request for retrieving protocol Activities", + zap.Error(err), + ) + return ProtocolActivity{}, errors.WithStack(err) + } + q := req.URL.Query() + q.Set("from", from.Format(time.RFC3339)) + q.Set("to", to.Format(time.RFC3339)) + req.URL.RawQuery = q.Encode() + + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) + decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) + + resp, err := d.client.Do(req) + if err != nil { + decoratedLogger.Error("failed retrieving protocol Activities", + zap.Error(err), + ) + return ProtocolActivity{}, errors.WithStack(err) + } + defer resp.Body.Close() + + decoratedLogger = decoratedLogger. + With(zap.String("status_code", http.StatusText(resp.StatusCode))). + With(zap.String("response_headers", commons.ToJson(resp.Header))) + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + decoratedLogger.Error("error retrieving protocol Activities: got an invalid response status code", + zap.String("response_body", string(body)), zap.Int("status_code", resp.StatusCode), + ) + return ProtocolActivity{}, errors.Errorf("failed retrieving protocol Activities from baseURL:%s - status_code:%d - response_body:%s", url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err)) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol Activities. baseURL:%s - status_code:%d", d.baseURL, resp.StatusCode) + } + + var temp allBridgeActivity + err = json.Unmarshal(body, &temp) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol Activities. baseURL:%s - status_code:%d - response_body:%s", d.baseURL, resp.StatusCode, string(body)) + } + + return temp.toProtocolActivity() +} + +func (d *AllBridgeRestClient) GetStats(ctx context.Context) (Stats, error) { + + decoratedLogger := d.logger + + url := d.baseURL + "/wormhole/stats" + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + decoratedLogger.Error("failed creating http request for retrieving protocol stats", zap.Error(err)) + return Stats{}, errors.WithStack(err) + } + + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) + decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) + + resp, err := d.client.Do(req) + if err != nil { + decoratedLogger.Error("failed retrieving protocol stats", zap.Error(err)) + return Stats{}, errors.WithStack(err) + } + defer resp.Body.Close() + + decoratedLogger = decoratedLogger. + With(zap.String("status_code", http.StatusText(resp.StatusCode))). + With(zap.String("response_headers", commons.ToJson(resp.Header))) + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + decoratedLogger.Error("error retrieving protocol stats: got an invalid response status code", zap.String("response_body", string(body)), zap.Int("status_code", resp.StatusCode)) + return Stats{}, errors.Errorf("failed retrieving protocol stats from baseURL:%s - status_code:%d - response_body:%s", d.baseURL, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err)) + return Stats{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol stats. baseURL:%s - status_code:%d", d.baseURL, resp.StatusCode) + } + + var allbridgeStats allBridgeStatsResponseDTO + err = json.Unmarshal(body, &allbridgeStats) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) + return Stats{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol stats. baseURL:%s - status_code:%d - response_body:%s", d.baseURL, resp.StatusCode, string(body)) + } + + return d.toStats(allbridgeStats) +} + +func (m *allBridgeActivity) toProtocolActivity() (ProtocolActivity, error) { + result := ProtocolActivity{} + + totalValueSecured, err := strconv.ParseFloat(m.TotalValueSecured, 64) + if err != nil || math.IsNaN(totalValueSecured) { + return result, errors.Wrap(err, "failed parsing string TotalValueSecure to float64") + } + result.TotalValueSecure = totalValueSecured + + totalValueTransferred, err := strconv.ParseFloat(m.TotalValueTransferred, 64) + if err != nil || math.IsNaN(totalValueTransferred) { + return result, errors.Wrap(err, "failed parsing string TotalValueTransferred to float64") + } + result.TotalValueTransferred = totalValueTransferred + + for i := range m.Activities { + + act := m.Activities[i] + txs, errTxs := strconv.ParseUint(act.Txs, 10, 64) + if errTxs != nil { + return result, errors.Wrap(errTxs, "failed parsing string txs to uint64") + } + + totalUSD, errTotalUSD := strconv.ParseFloat(act.TotalUSD, 64) + if errTotalUSD != nil { + return result, errors.Wrap(errTxs, "failed parsing string total_usd to float64") + } + + a := Activity{ + EmitterChainID: m.Activities[i].EmitterChainID, + DestinationChainID: m.Activities[i].DestinationChainID, + Txs: txs, + TotalUSD: totalUSD, + } + result.Activities = append(result.Activities, a) + } + + return result, nil +} + +type allBridgeStatsResponseDTO struct { + TotalValueLocked string `json:"total_value_locked"` + TotalMessages string `json:"total_messages"` + Volume string `json:"volume"` +} + +func (d *AllBridgeRestClient) toStats(t allBridgeStatsResponseDTO) (Stats, error) { + + convertAndLoad := func(val string, target *float64) error { + if len(val) == 0 { + *target = 0 + return nil + } + floatVal, err := strconv.ParseFloat(val, 64) + if err != nil { + d.logger.Error("failed converting value", zap.Error(err), zap.String("value", val)) + return err + } + *target = floatVal + return nil + } + + var stats Stats + + err := convertAndLoad(t.TotalValueLocked, &stats.TotalValueLocked) + if err != nil { + return stats, err + } + + err = convertAndLoad(t.Volume, &stats.Volume) + if err != nil { + return stats, err + } + + totalMsg, err := strconv.ParseUint(t.TotalMessages, 10, 64) + if err != nil { + return stats, err + } + stats.TotalMessages = totalMsg + + return stats, nil + +} diff --git a/jobs/jobs/protocols/activity/internal/repositories/allbridge_test.go b/jobs/jobs/protocols/repository/allbridge_test.go similarity index 66% rename from jobs/jobs/protocols/activity/internal/repositories/allbridge_test.go rename to jobs/jobs/protocols/repository/allbridge_test.go index 4f650805..3e1e434e 100644 --- a/jobs/jobs/protocols/activity/internal/repositories/allbridge_test.go +++ b/jobs/jobs/protocols/repository/allbridge_test.go @@ -1,4 +1,4 @@ -package repositories +package repository_test import ( "bytes" @@ -6,6 +6,7 @@ import ( "errors" "github.com/stretchr/testify/assert" "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository" "go.uber.org/zap" "io" "net/http" @@ -15,77 +16,77 @@ import ( func Test_AllbridgeRestClientActivity_FailRequestCreation(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return nil, nil })) - _, err := a.Get(nil, time.Now(), time.Now()) // passing ctx nil to force request creation error + _, err := a.GetActivity(nil, time.Now(), time.Now()) // passing ctx nil to force request creation error assert.NotNil(t, err) } func Test_AllbridgeRestClientActivity_FailedRequestExecution(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return nil, errors.New("mocked_http_client_do") })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) assert.Equal(t, "mocked_http_client_do", err.Error()) } func Test_AllbridgeRestClientActivity_Status500(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusInternalServerError, Body: io.NopCloser(bytes.NewBufferString("response_body_test")), }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed retrieving protocol Activities from url:localhost - status_code:500 - response_body:response_body_test", err.Error()) + assert.Equal(t, "failed retrieving protocol Activities from baseURL:localhost/wormhole/activity - status_code:500 - response_body:response_body_test", err.Error()) } func Test_AllbridgeRestClientActivity_Status200_FailedReadBody(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: &mocks.MockFailReadCloser{}, }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed reading response body from protocol Activities. url:localhost - status_code:200: mocked_fail_read", err.Error()) + assert.Equal(t, "failed reading response body from protocol Activities. baseURL:localhost - status_code:200: mocked_fail_read", err.Error()) } func Test_AllbridgeRestClientActivity_Status200_FailedParsing(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewBufferString("this should be a json")), }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed unmarshalling response body from protocol Activities. url:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) + assert.Equal(t, "failed unmarshalling response body from protocol Activities. baseURL:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) } func Test_AllbridgeRestClientActivity_Status200_Succeed(t *testing.T) { - a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewAllBridgeRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewBufferString("{\"activity\":[{\"emitter_chain_id\":5,\"destination_chain_id\":1,\"txs\":\"1827\",\"total_usd\":\"445743.185719500000\"}],\"total_value_secure\":\"0\",\"total_value_transferred\":\"5734947.136079277\"}")), }, nil })) - resp, err := a.Get(context.Background(), time.Now(), time.Now()) + resp, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.Nil(t, err) assert.Equal(t, float64(0), resp.TotalValueSecure) assert.Equal(t, 5734947.136079277, resp.TotalValueTransferred) diff --git a/jobs/jobs/protocols/activity/internal/repositories/mayan.go b/jobs/jobs/protocols/repository/mayan.go similarity index 52% rename from jobs/jobs/protocols/activity/internal/repositories/mayan.go rename to jobs/jobs/protocols/repository/mayan.go index 74f8032a..3a547acb 100644 --- a/jobs/jobs/protocols/activity/internal/repositories/mayan.go +++ b/jobs/jobs/protocols/repository/mayan.go @@ -1,4 +1,4 @@ -package repositories +package repository import ( "context" @@ -13,32 +13,27 @@ import ( "time" ) -func NewMayanRestClient(name, url string, logger *zap.Logger, httpClient commons.HttpDo) *MayanRestClient { +func NewMayanRestClient(baseURL string, logger *zap.Logger, httpClient commons.HttpDo) *MayanRestClient { return &MayanRestClient{ - name: name, - url: url, - logger: logger, - client: httpClient, + baseURL: baseURL, + logger: logger, + client: httpClient, } } type MayanRestClient struct { - name string - url string - client commons.HttpDo - logger *zap.Logger + baseURL string + client commons.HttpDo + logger *zap.Logger } -func (d *MayanRestClient) ProtocolName() string { - return d.name -} - -func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (ProtocolActivity, error) { +func (d *MayanRestClient) GetActivity(ctx context.Context, from, to time.Time) (ProtocolActivity, error) { decoratedLogger := d.logger - req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil) + url := d.baseURL + "/v3/stats/wh/activity" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { - decoratedLogger.Error("failed creating http request for retrieving protocol Activities", + decoratedLogger.Error("failed creating http request for retrieving protocol activities", zap.Error(err), ) return ProtocolActivity{}, errors.WithStack(err) @@ -54,7 +49,7 @@ func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (Protocol resp, err := d.client.Do(req) if err != nil { - decoratedLogger.Error("failed retrieving protocol Activities", + decoratedLogger.Error("failed retrieving protocol activities", zap.Error(err), ) return ProtocolActivity{}, errors.WithStack(err) @@ -67,16 +62,16 @@ func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (Protocol if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - decoratedLogger.Error("error retrieving protocol Activities: got an invalid response status code", + decoratedLogger.Error("error retrieving protocol activities: got an invalid response status code", zap.String("response_body", string(body)), ) - return ProtocolActivity{}, errors.Errorf("failed retrieving protocol Activities from url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) + return ProtocolActivity{}, errors.Errorf("failed retrieving protocol activities from url:%s - status_code:%d - response_body:%s", url, resp.StatusCode, string(body)) } body, err := io.ReadAll(resp.Body) if err != nil { decoratedLogger.Error("failed reading response body", zap.Error(err)) - return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol Activities. url:%s - status_code:%d", d.url, resp.StatusCode) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol activities. url:%s - status_code:%d", url, resp.StatusCode) } type mayanActivity struct { @@ -93,7 +88,7 @@ func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (Protocol err = json.Unmarshal(body, &mayanResp) if err != nil { decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) - return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol Activities. url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol activities. url:%s - status_code:%d - response_body:%s", url, resp.StatusCode, string(body)) } result := ProtocolActivity{ @@ -126,3 +121,52 @@ func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (Protocol return result, nil } + +func (d *MayanRestClient) GetStats(ctx context.Context) (Stats, error) { + decoratedLogger := d.logger + url := d.baseURL + "/v3/stats/wh/stats" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + decoratedLogger.Error("failed creating http request for retrieving protocol stats", zap.Error(err)) + return Stats{}, errors.WithStack(err) + } + + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) + decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) + + resp, err := d.client.Do(req) + if err != nil { + decoratedLogger.Error("failed retrieving protocol stats", zap.Error(err)) + return Stats{}, errors.WithStack(err) + } + defer resp.Body.Close() + + decoratedLogger = decoratedLogger. + With(zap.String("status_code", http.StatusText(resp.StatusCode))). + With(zap.String("response_headers", commons.ToJson(resp.Header))) + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + decoratedLogger.Error("error retrieving client stats: got an invalid response status code", zap.String("response_body", string(body))) + return Stats{}, errors.Errorf("failed retrieving protocol stats from url:%s - status_code:%d - response_body:%s", url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err)) + return Stats{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol stats. url:%s - status_code:%d", url, resp.StatusCode) + } + var stats Stats + err = json.Unmarshal(body, &stats) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) + return Stats{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol stats. url:%s - status_code:%d - response_body:%s", url, resp.StatusCode, string(body)) + } + + return stats, nil +} + +func (d *MayanRestClient) ProtocolName() string { + return commons.MayanProtocol +} diff --git a/jobs/jobs/protocols/activity/internal/repositories/mayan_test.go b/jobs/jobs/protocols/repository/mayan_test.go similarity index 65% rename from jobs/jobs/protocols/activity/internal/repositories/mayan_test.go rename to jobs/jobs/protocols/repository/mayan_test.go index 2c4be316..ed54c165 100644 --- a/jobs/jobs/protocols/activity/internal/repositories/mayan_test.go +++ b/jobs/jobs/protocols/repository/mayan_test.go @@ -1,4 +1,4 @@ -package repositories +package repository_test import ( "bytes" @@ -6,6 +6,7 @@ import ( "errors" "github.com/stretchr/testify/assert" "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository" "go.uber.org/zap" "io" "net/http" @@ -15,77 +16,77 @@ import ( func Test_HttpRestClientActivity_FailRequestCreation(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return nil, nil })) - _, err := a.Get(nil, time.Now(), time.Now()) // passing ctx nil to force request creation error + _, err := a.GetActivity(nil, time.Now(), time.Now()) // passing ctx nil to force request creation error assert.NotNil(t, err) } func Test_HttpRestClientActivity_FailedRequestExecution(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return nil, errors.New("mocked_http_client_do") })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) assert.Equal(t, "mocked_http_client_do", err.Error()) } func Test_HttpRestClientActivity_Status500(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusInternalServerError, Body: io.NopCloser(bytes.NewBufferString("response_body_test")), }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed retrieving protocol Activities from url:localhost - status_code:500 - response_body:response_body_test", err.Error()) + assert.Equal(t, "failed retrieving protocol activities from url:localhost/v3/stats/wh/activity - status_code:500 - response_body:response_body_test", err.Error()) } func Test_HttpRestClientActivity_Status200_FailedReadBody(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: &mocks.MockFailReadCloser{}, }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed reading response body from protocol Activities. url:localhost - status_code:200: mocked_fail_read", err.Error()) + assert.Equal(t, "failed reading response body from protocol activities. url:localhost/v3/stats/wh/activity - status_code:200: mocked_fail_read", err.Error()) } func Test_HttpRestClientActivity_Status200_FailedParsing(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewBufferString("this should be a json")), }, nil })) - _, err := a.Get(context.Background(), time.Now(), time.Now()) + _, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.NotNil(t, err) - assert.Equal(t, "failed unmarshalling response body from protocol Activities. url:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) + assert.Equal(t, "failed unmarshalling response body from protocol activities. url:localhost/v3/stats/wh/activity - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) } func Test_HttpRestClientActivity_Status200_Succeed(t *testing.T) { - a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + a := repository.NewMayanRestClient("localhost", zap.NewNop(), mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { return &http.Response{ StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewBufferString("{\"total_value_secure\":1640898.7106282723,\"total_value_transferred\":2600395.040031102,\"total_messages\":2225,\"activity\":[{\"emmiter_chain_id\":\"1\",\"destination_chain_id\":\"2\",\"txs\":88,\"total_usd\":648500.9762709612}],\"volume\":2761848.9678057004}")), }, nil })) - resp, err := a.Get(context.Background(), time.Now(), time.Now()) + resp, err := a.GetActivity(context.Background(), time.Now(), time.Now()) assert.Nil(t, err) assert.Equal(t, 1640898.7106282723, resp.TotalValueSecure) assert.Equal(t, 2600395.040031102, resp.TotalValueTransferred) diff --git a/jobs/jobs/protocols/repository/types.go b/jobs/jobs/protocols/repository/types.go new file mode 100644 index 00000000..49e23621 --- /dev/null +++ b/jobs/jobs/protocols/repository/types.go @@ -0,0 +1,48 @@ +package repository + +import ( + "context" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons" + "go.uber.org/zap" + "net/http" + "time" +) + +type ProtocolRepository interface { + GetActivity(ctx context.Context, from, to time.Time) (ProtocolActivity, error) + GetStats(ctx context.Context) (Stats, error) + ProtocolName() string +} + +type ProtocolActivity struct { + TotalValueSecure float64 `json:"total_value_secure"` + TotalValueTransferred float64 `json:"total_value_transferred"` + Volume float64 `json:"volume"` + TotalMessages uint64 `json:"total_messages"` + Activities []Activity `json:"activity"` +} + +type Stats struct { + TotalValueLocked float64 `json:"total_value_locked"` + TotalMessages uint64 `json:"total_messages"` + Volume float64 `json:"volume"` +} + +type Activity struct { + EmitterChainID uint64 `json:"emitter_chain_id"` + DestinationChainID uint64 `json:"destination_chain_id"` + Txs uint64 `json:"txs"` + TotalUSD float64 `json:"total_usd"` +} + +// ProtocolsRepositoryFactory RestClient Factory to create the right client for each protocol. +var ProtocolsRepositoryFactory = map[string]func(url string, logger *zap.Logger) ProtocolRepository{ + + commons.MayanProtocol: func(baseURL string, logger *zap.Logger) ProtocolRepository { + return NewMayanRestClient(baseURL, logger, &http.Client{}) + }, + + commons.AllBridgeProtocol: func(baseURL string, logger *zap.Logger) ProtocolRepository { + return NewAllBridgeRestClient(baseURL, logger, &http.Client{}) + }, +} diff --git a/jobs/jobs/protocols/stats.go b/jobs/jobs/protocols/stats.go new file mode 100644 index 00000000..7bffdb98 --- /dev/null +++ b/jobs/jobs/protocols/stats.go @@ -0,0 +1,136 @@ +package protocols + +import ( + "context" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository" + "go.uber.org/zap" + "sync" + "time" +) + +type StatsJob struct { + writerDB api.WriteAPIBlocking + logger *zap.Logger + repositories []repository.ProtocolRepository + from time.Time + to time.Time + destinationMeasurement string + statsMeasurement string +} + +// NewStatsJob creates an instance of the job implementation. +func NewStatsJob(writerDB api.WriteAPIBlocking, from, to time.Time, activityMeasurement, statsMeasurement string, repositories []repository.ProtocolRepository, logger *zap.Logger) *StatsJob { + return &StatsJob{ + writerDB: writerDB, + logger: logger.With(zap.String("module", "ProtocolsStatsJob")), + repositories: repositories, + from: from, + to: to, + destinationMeasurement: activityMeasurement, + statsMeasurement: statsMeasurement, + } +} + +func (s *StatsJob) Run(ctx context.Context) error { + + wg := sync.WaitGroup{} + wg.Add(len(s.repositories)) + + s.logger.Info("running protocols stats job", zap.Time("from", s.from), zap.Time("to", s.to)) + + for _, repo := range s.repositories { + go s.processProtocol(ctx, repo, &wg) + } + wg.Wait() + return nil +} + +func (s *StatsJob) processProtocol(ctx context.Context, protocolRepo repository.ProtocolRepository, wg *sync.WaitGroup) { + defer wg.Done() + + var stats repository.Stats + var errStats error + wgStats := sync.WaitGroup{} + wgStats.Add(1) + go func() { + defer wgStats.Done() + stats, errStats = protocolRepo.GetStats(ctx) + }() + + activity, errAct := protocolRepo.GetActivity(ctx, s.from, s.to) + if errAct != nil { + s.logger.Error("failed to get protocol activity", zap.Error(errAct), zap.String("protocol", protocolRepo.ProtocolName()), zap.Time("from", s.from), zap.Time("to", s.to)) + return + } + + wgStats.Wait() + if errStats != nil { + s.logger.Error("failed to get protocol stats", zap.Error(errStats), zap.String("protocol", protocolRepo.ProtocolName())) + return + } + + data := protocolData{ + Stats: stats, + Activity: activity, + } + + errAct = s.updateActivity(ctx, protocolRepo.ProtocolName(), data.Activity, s.from) + if errAct != nil { + s.logger.Error("failed updating protocol activities in influxdb", zap.Error(errAct), zap.String("protocol", protocolRepo.ProtocolName())) + } + + errStats = s.updateStats(ctx, protocolRepo.ProtocolName(), data.Stats, s.to) + if errStats != nil { + s.logger.Error("failed updating protocol stats in influxdb", zap.Error(errStats), zap.String("protocol", protocolRepo.ProtocolName())) + } + +} + +type protocolData struct { + Stats repository.Stats + Activity repository.ProtocolActivity +} + +func (s *StatsJob) updateActivity(ctx context.Context, protocol string, data repository.ProtocolActivity, ts time.Time) error { + + txs := uint64(0) + totalUsd := float64(0) + for _, act := range data.Activities { + txs += act.Txs + totalUsd += act.TotalUSD + } + + point := influxdb2.NewPointWithMeasurement(s.destinationMeasurement). + AddTag("protocol", protocol). + AddField("total_value_secure", data.TotalValueSecure). + AddField("total_value_transferred", data.TotalValueTransferred). + AddField("volume", data.Volume). + AddField("txs", txs). + AddField("total_usd", totalUsd). + SetTime(ts) + + err := s.writerDB.WritePoint(ctx, point) + if err != nil { + s.logger.Error("failed updating protocol Activities in influxdb", zap.Error(err), zap.String("protocol", protocol)) + } + return err +} + +func (s *StatsJob) updateStats(ctx context.Context, protocol string, data repository.Stats, ts time.Time) error { + + point := influxdb2. + NewPointWithMeasurement(s.statsMeasurement). + AddTag("protocol", protocol). + AddField("total_messages", data.TotalMessages). + AddField("total_value_locked", data.TotalValueLocked). + AddField("volume", data.Volume). + SetTime(ts) + + err := s.writerDB.WritePoint(ctx, point) + if err != nil { + s.logger.Error("failed updating protocol stats in influxdb", zap.Error(err), zap.String("protocol", protocol)) + } + return err +} diff --git a/jobs/jobs/protocols/stats/stats.go b/jobs/jobs/protocols/stats/stats.go deleted file mode 100644 index d29e1a46..00000000 --- a/jobs/jobs/protocols/stats/stats.go +++ /dev/null @@ -1,227 +0,0 @@ -package stats - -import ( - "context" - "encoding/json" - "github.com/google/uuid" - influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/influxdata/influxdb-client-go/v2/api" - "github.com/influxdata/influxdb-client-go/v2/api/write" - "github.com/pkg/errors" - "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" - "go.uber.org/zap" - "io" - "net/http" - "strconv" - "sync" - "time" -) - -type ProtocolsStatsJob struct { - statsDB api.WriteAPIBlocking - logger *zap.Logger - statsClientsFetchers []ClientStats - version string -} - -// ClientStats Abstraction for fetching stats since each protocol may have different implementation details. -type ClientStats interface { - Get(ctx context.Context) (Stats, error) - ProtocolName() string -} - -type protocolStats struct { - Stats - Name string -} - -type Stats struct { - TotalValueLocked float64 - TotalMessages uint64 -} - -// NewProtocolsStatsJob creates an instance of the job implementation. -func NewProtocolsStatsJob(statsDB api.WriteAPIBlocking, logger *zap.Logger, version string, statsFetchers ...ClientStats) *ProtocolsStatsJob { - return &ProtocolsStatsJob{ - statsDB: statsDB, - logger: logger.With(zap.String("module", "ProtocolsStatsJob")), - statsClientsFetchers: statsFetchers, - version: version, - } -} - -func (s *ProtocolsStatsJob) Run(ctx context.Context) error { - - clientsQty := len(s.statsClientsFetchers) - wg := sync.WaitGroup{} - wg.Add(clientsQty) - stats := make(chan protocolStats, clientsQty) - var anyError error - - for _, cs := range s.statsClientsFetchers { - go func(c ClientStats) { - defer wg.Done() - st, err := c.Get(ctx) - if err != nil { - anyError = err - return - } - stats <- protocolStats{st, c.ProtocolName()} - }(cs) - } - - wg.Wait() - close(stats) - - err := s.updateStats(ctx, stats) - if err != nil { - anyError = err - } - - return anyError -} - -func (s *ProtocolsStatsJob) updateStats(ctx context.Context, stats <-chan protocolStats) error { - - ts := time.Now().UTC().Truncate(time.Hour) // make minutes and seconds zero, so we only work with date and hour - points := make([]*write.Point, 0, len(stats)) - - for st := range stats { - point := influxdb2. - NewPointWithMeasurement(dbconsts.ProtocolsStatsMeasurement). - AddTag("protocol", st.Name). - AddTag("version", s.version). - AddField("total_messages", st.TotalMessages). - AddField("total_value_locked", st.TotalValueLocked). - SetTime(ts) - - points = append(points, point) - } - - err := s.statsDB.WritePoint(ctx, points...) - if err != nil { - s.logger.Error("failed updating protocol stats in influxdb", zap.Error(err)) - } - return err -} - -// Default implementation of ClientStats interface. Encapsulate the url and http.client for calling a specific external service to retrieve stats -type httpRestClientStats struct { - name string - url string - client httpDo - logger *zap.Logger -} - -type httpDo interface { - Do(req *http.Request) (*http.Response, error) -} - -func NewHttpRestClientStats(name, url string, logger *zap.Logger, httpClient httpDo) ClientStats { - return &httpRestClientStats{ - name: name, - url: url, - logger: logger, - client: httpClient, - } -} - -func (d *httpRestClientStats) ProtocolName() string { - return d.name -} - -func (d *httpRestClientStats) Get(ctx context.Context) (Stats, error) { - - decoratedLogger := d.logger - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil) - if err != nil { - decoratedLogger.Error("failed creating http request for retrieving client stats", - zap.Error(err), - ) - return Stats{}, errors.WithStack(err) - } - - reqId := uuid.New().String() - req.Header.Set("X-Request-ID", reqId) - decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) - - resp, err := d.client.Do(req) - if err != nil { - decoratedLogger.Error("failed retrieving client stats", - zap.Error(err), - ) - return Stats{}, errors.WithStack(err) - } - defer resp.Body.Close() - - decoratedLogger = decoratedLogger. - With(zap.String("status_code", http.StatusText(resp.StatusCode))). - With(zap.String("response_headers", toJson(resp.Header))) - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - decoratedLogger.Error("error retrieving client stats: got an invalid response status code", - zap.String("response_body", string(body)), - ) - return Stats{}, errors.Errorf("failed retrieving client stats from url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - decoratedLogger.Error("failed reading response body", zap.Error(err)) - return Stats{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from client stats. url:%s - status_code:%d", d.url, resp.StatusCode) - } - var stats Stats - err = json.Unmarshal(body, &stats) - if err != nil { - decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) - return Stats{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from client stats. url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) - } - - return stats, nil - -} - -func toJson(headers http.Header) string { - bytes, _ := json.Marshal(headers) - return string(bytes) -} - -func (rd *Stats) UnmarshalJSON(data []byte) error { - - temp := struct { - TotalValueLocked json.RawMessage `json:"total_value_locked"` - TotalMessages json.RawMessage `json:"total_messages"` - }{} - - if err := json.Unmarshal(data, &temp); err != nil { - return err - } - - if err := parseJSONNumber(temp.TotalValueLocked, &rd.TotalValueLocked); err != nil { - return err - } - - var totalMsg float64 - if err := parseJSONNumber(temp.TotalMessages, &totalMsg); err != nil { - return err - } - - rd.TotalMessages = uint64(totalMsg) - return nil -} - -// parseJSONNumber helps to support both string and numeric JSON values since different protocols return different types for the same fields. -func parseJSONNumber(raw json.RawMessage, dest *float64) error { - var strVal string - if err := json.Unmarshal(raw, &strVal); err == nil { - val, err1 := strconv.ParseFloat(strVal, 64) - if err1 != nil { - return err1 - } - *dest = val - return nil - } - return json.Unmarshal(raw, dest) -} diff --git a/jobs/jobs/protocols/stats/stats_test.go b/jobs/jobs/protocols/stats/stats_test.go deleted file mode 100644 index e9c91c36..00000000 --- a/jobs/jobs/protocols/stats/stats_test.go +++ /dev/null @@ -1,165 +0,0 @@ -package stats_test - -import ( - "bytes" - "context" - "errors" - "github.com/stretchr/testify/assert" - "github.com/test-go/testify/mock" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" - "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/stats" - "go.uber.org/zap" - "io" - "net/http" - "testing" -) - -func Test_ProtocolsStatsJob_Succeed(t *testing.T) { - var mockErr error - statsFetcher := &mockStatsFetch{} - statsFetcher.On("Get", mock.Anything).Return(stats.Stats{}, mockErr) - statsFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) - - job := stats.NewProtocolsStatsJob(mockWriterDB, zap.NewNop(), "v1", statsFetcher) - resultErr := job.Run(context.Background()) - assert.Nil(t, resultErr) -} - -func Test_ProtocolsStatsJob_FailFetching(t *testing.T) { - var mockErr error - statsFetcher := &mockStatsFetch{} - statsFetcher.On("Get", mock.Anything).Return(stats.Stats{}, errors.New("mocked_error_fetch")) - statsFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) - - job := stats.NewProtocolsStatsJob(mockWriterDB, zap.NewNop(), "v1", statsFetcher) - resultErr := job.Run(context.Background()) - assert.NotNil(t, resultErr) - assert.Equal(t, "mocked_error_fetch", resultErr.Error()) -} - -func Test_ProtocolsStatsJob_FailedUpdatingDB(t *testing.T) { - var mockErr error - statsFetcher := &mockStatsFetch{} - statsFetcher.On("Get", mock.Anything).Return(stats.Stats{}, mockErr) - statsFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") - mockWriterDB := &mocks.MockWriterApi{} - mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(errors.New("mocked_error_update_db")) - - job := stats.NewProtocolsStatsJob(mockWriterDB, zap.NewNop(), "v1", statsFetcher) - resultErr := job.Run(context.Background()) - assert.NotNil(t, resultErr) - assert.Equal(t, "mocked_error_update_db", resultErr.Error()) -} - -func Test_HttpRestClientStats_FailRequestCreation(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return nil, nil - })) - _, err := a.Get(nil) // passing ctx nil to force request creation error - assert.NotNil(t, err) -} - -func Test_HttpRestClientStats_FailedRequestExecution(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return nil, errors.New("mocked_http_client_do") - })) - _, err := a.Get(context.Background()) - assert.NotNil(t, err) - assert.Equal(t, "mocked_http_client_do", err.Error()) -} - -func Test_HttpRestClientStats_Status500(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusInternalServerError, - Body: io.NopCloser(bytes.NewBufferString("response_body_test")), - }, nil - })) - _, err := a.Get(context.Background()) - assert.NotNil(t, err) - assert.Equal(t, "failed retrieving client stats from url:localhost - status_code:500 - response_body:response_body_test", err.Error()) -} - -func Test_HttpRestClientStats_Status200_FailedReadBody(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Body: &mockFailReadCloser{}, - }, nil - })) - _, err := a.Get(context.Background()) - assert.NotNil(t, err) - assert.Equal(t, "failed reading response body from client stats. url:localhost - status_code:200: mocked_fail_read", err.Error()) -} - -func Test_HttpRestClientStats_Status200_FailedParsing(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewBufferString("this should be a json")), - }, nil - })) - _, err := a.Get(context.Background()) - assert.NotNil(t, err) - assert.Equal(t, "failed unmarshalling response body from client stats. url:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) -} - -func Test_HttpRestClientStats_Status200_Succeed(t *testing.T) { - - a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), - mockHttpClient(func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewBufferString("{\"total_value_locked\":\"123\",\"total_messages\":\"456\"}")), - }, nil - })) - resp, err := a.Get(context.Background()) - assert.Nil(t, err) - assert.Equal(t, float64(123), resp.TotalValueLocked) - assert.Equal(t, uint64(456), resp.TotalMessages) -} - -type mockStatsFetch struct { - mock.Mock -} - -func (m *mockStatsFetch) Get(ctx context.Context) (stats.Stats, error) { - args := m.Called(ctx) - return args.Get(0).(stats.Stats), args.Error(1) -} - -func (m *mockStatsFetch) ProtocolName() string { - args := m.Called() - return args.String(0) -} - -type mockHttpClient func(req *http.Request) (*http.Response, error) - -func (m mockHttpClient) Do(req *http.Request) (*http.Response, error) { - return m(req) -} - -type mockFailReadCloser struct { -} - -func (m *mockFailReadCloser) Read(p []byte) (n int, err error) { - return 0, errors.New("mocked_fail_read") -} - -func (m *mockFailReadCloser) Close() error { - return nil -} diff --git a/jobs/jobs/protocols/stats_test.go b/jobs/jobs/protocols/stats_test.go new file mode 100644 index 00000000..ff458fc6 --- /dev/null +++ b/jobs/jobs/protocols/stats_test.go @@ -0,0 +1,184 @@ +package protocols_test + +import ( + "context" + "errors" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api/write" + "github.com/stretchr/testify/assert" + "github.com/test-go/testify/mock" + "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/repository" + "go.uber.org/zap" + "testing" + "time" +) + +func Test_ProtocolsStatsJob_Success(t *testing.T) { + + ctx := context.Background() + from, _ := time.Parse(time.RFC3339, "2024-02-01T00:00:00Z") + to, _ := time.Parse(time.RFC3339, "2024-02-02T00:00:00Z") + + mr := &mockProtocolRepo{} + activity := repository.ProtocolActivity{ + TotalValueSecure: 10, + TotalValueTransferred: 20, + Volume: 30, + TotalMessages: 40, + Activities: []repository.Activity{ + { + EmitterChainID: 1, + DestinationChainID: 2, + Txs: 50, + TotalUSD: 60, + }, + { + EmitterChainID: 1, + DestinationChainID: 2, + Txs: 25, + TotalUSD: 30, + }, + }, + } + + stats := repository.Stats{ + TotalValueLocked: 70, + TotalMessages: 80, + Volume: 90, + } + + mr.On("GetActivity", ctx, from, to).Return(activity, nil) + mr.On("GetStats", ctx).Return(stats, nil) + mr.On("ProtocolName").Return(commons.MayanProtocol) + + mockWriterDB := &mocks.MockWriterApi{} + + expectedStatsPoint := influxdb2. + NewPointWithMeasurement(dbconsts.ProtocolsStatsMeasurementDaily). + AddTag("protocol", commons.MayanProtocol). + AddField("total_messages", stats.TotalMessages). + AddField("total_value_locked", stats.TotalValueLocked). + AddField("volume", stats.Volume). + SetTime(from) + + expectedActivityPoint := influxdb2.NewPointWithMeasurement(dbconsts.ProtocolsActivityMeasurementDaily). + AddTag("protocol", commons.MayanProtocol). + AddField("total_value_secure", activity.TotalValueSecure). + AddField("total_value_transferred", activity.TotalValueTransferred). + AddField("volume", activity.Volume). + AddField("txs", 75). + AddField("total_usd", 90). + SetTime(from) + + mockWriterDB.On("WritePoint", ctx, mock.MatchedBy(pointMatcher{Expected: expectedStatsPoint}.Matches)).Return(nil) + mockWriterDB.On("WritePoint", ctx, mock.MatchedBy(pointMatcher{Expected: expectedActivityPoint}.Matches)).Return(nil).Times(1) + + job := protocols.NewStatsJob(mockWriterDB, + from, + to, + dbconsts.ProtocolsActivityMeasurementDaily, + dbconsts.ProtocolsStatsMeasurementDaily, + []repository.ProtocolRepository{mr}, + zap.NewNop()) + + err := job.Run(ctx) + assert.Nil(t, err) + mockWriterDB.AssertNumberOfCalls(t, "WritePoint", 2) +} + +func Test_ProtocolsStatsJob_FailedFetchingStats(t *testing.T) { + ctx := context.Background() + from, _ := time.Parse(time.RFC3339, "2024-02-01T00:00:00Z") + to, _ := time.Parse(time.RFC3339, "2024-02-02T00:00:00Z") + + mr := &mockProtocolRepo{} + activity := repository.ProtocolActivity{} + + stats := repository.Stats{} + + mr.On("GetActivity", ctx, from, to).Return(activity, nil) + mr.On("GetStats", ctx).Return(stats, errors.New("mocked_error")) + mr.On("ProtocolName").Return(commons.MayanProtocol) + + mockWriterDB := &mocks.MockWriterApi{} + job := protocols.NewStatsJob(mockWriterDB, + from, + to, + dbconsts.ProtocolsActivityMeasurementDaily, + dbconsts.ProtocolsStatsMeasurementDaily, + []repository.ProtocolRepository{mr}, + zap.NewNop()) + + err := job.Run(ctx) + assert.Nil(t, err) + mockWriterDB.AssertNumberOfCalls(t, "WritePoint", 0) +} + +func Test_ProtocolsStatsJob_FailedFetchingActivity(t *testing.T) { + ctx := context.Background() + from, _ := time.Parse(time.RFC3339, "2024-02-01T00:00:00Z") + to, _ := time.Parse(time.RFC3339, "2024-02-02T00:00:00Z") + + mr := &mockProtocolRepo{} + activity := repository.ProtocolActivity{} + + stats := repository.Stats{} + + mr.On("GetActivity", ctx, from, to).Return(activity, errors.New("mocked_error")) + mr.On("GetStats", ctx).Return(stats, nil) + mr.On("ProtocolName").Return(commons.MayanProtocol) + + mockWriterDB := &mocks.MockWriterApi{} + job := protocols.NewStatsJob(mockWriterDB, + from, + to, + dbconsts.ProtocolsActivityMeasurementDaily, + dbconsts.ProtocolsStatsMeasurementDaily, + []repository.ProtocolRepository{mr}, + zap.NewNop()) + + err := job.Run(ctx) + assert.Nil(t, err) + mockWriterDB.AssertNumberOfCalls(t, "WritePoint", 0) + +} + +type mockProtocolRepo struct { + mock.Mock +} + +func (m *mockProtocolRepo) GetActivity(ctx context.Context, from, to time.Time) (repository.ProtocolActivity, error) { + args := m.Called(ctx, from, to) + return args.Get(0).(repository.ProtocolActivity), args.Error(1) +} +func (m *mockProtocolRepo) GetStats(ctx context.Context) (repository.Stats, error) { + args := m.Called(ctx) + return args.Get(0).(repository.Stats), args.Error(1) +} +func (m *mockProtocolRepo) ProtocolName() string { + args := m.Called() + return args.String(0) +} + +type pointMatcher struct { + Expected *write.Point +} + +func (p pointMatcher) Matches(x interface{}) bool { + actual, ok := x.([]*write.Point) + if !ok || len(actual) != 1 { + return false + } + + // Perform your comparison logic here + // For example, check if the measurement name matches + return actual[0].Name() == p.Expected.Name() +} + +func (p pointMatcher) String() string { + return "matches the expected point" +}