From a00cd74bd0a0bcd4f4edf25c706d6f2d5d21b736 Mon Sep 17 00:00:00 2001 From: Mariano <9205080+marianososto@users.noreply.github.com> Date: Thu, 12 Sep 2024 14:21:10 -0300 Subject: [PATCH] [1587] Change CCTP stats for /protocols/stats (#1673) * #1587 add task to downsample total cctp volume and txs * fix identation * add adjustment to task * change query for cctp in /protocols/stats * fix identations * fix unit-tests --- analytics/scripts/cctp_all_time_1h.flux | 67 +++++++++++++++++++++++++ api/handlers/protocols/repository.go | 63 ++++++++++++++++++++--- api/handlers/protocols/service.go | 26 +++++++--- api/handlers/protocols/service_test.go | 8 +-- api/main.go | 1 + 5 files changed, 148 insertions(+), 17 deletions(-) create mode 100644 analytics/scripts/cctp_all_time_1h.flux diff --git a/analytics/scripts/cctp_all_time_1h.flux b/analytics/scripts/cctp_all_time_1h.flux new file mode 100644 index 00000000..1c5872fd --- /dev/null +++ b/analytics/scripts/cctp_all_time_1h.flux @@ -0,0 +1,67 @@ +import "date" + +option task = { + name: "cctp total volume all-time every hour", + every: 1h, +} + +bucketInfinite = "wormscan" +bucket24Hr = "wormscan-24hours" +destMeasurement = "cctp_status_total_v2" +nowts = date.truncate(t: now(), unit: 1h) + +lastData = from(bucket: bucket24Hr) + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == destMeasurement) + |> last() + |> drop(columns:["_start","_stop","_measurement"]) + +lastTxs = lastData + |> filter(fn: (r) => r._field == "txs") + +lastVolume = lastData + |> filter(fn: (r) => r._field == "volume") + +lastExecutionTime = lastData + |> keep(columns:["_time"]) + |> tableFind(fn: (key) => true) + |> getRecord(idx: 0) + +deltaData = from(bucket: bucketInfinite) + |> range(start: lastExecutionTime._time, stop:now()) + |> filter(fn: (r) => r._measurement == "circle-message-sent") + |> filter(fn: (r) => r.messageProtocol == "wormhole") + |> filter(fn: (r) => r._field == "amount") + |> keep(columns:["_field","_value"]) + |> toUInt() + |> reduce( + identity: { + volume: uint(v:0), + txs: uint(v:0) + }, + fn: (r, accumulator) => ({ + volume: accumulator.volume + r._value, + txs: accumulator.txs + uint(v:1) + }) + ) + +deltaTxs = deltaData + |> drop(columns:["volume"]) + |> rename(columns: {txs: "_value"}) + |> set(key:"_field",value:"txs") + +deltaVolume = deltaData + |> drop(columns:["txs"]) + |> rename(columns: {volume: "_value"}) + |> set(key:"_field",value:"volume") + +txs = union(tables:[lastTxs, deltaTxs]) + |> sum() + +volume = union(tables:[lastVolume, deltaVolume]) + |> sum() + +union(tables:[txs, volume]) + |> set(key:"_measurement", value: destMeasurement) + |> map(fn: (r) => ({ r with _time: nowts })) + |> to(bucket: bucket24Hr) diff --git a/api/handlers/protocols/repository.go b/api/handlers/protocols/repository.go index 12d4ac24..68898cef 100644 --- a/api/handlers/protocols/repository.go +++ b/api/handlers/protocols/repository.go @@ -186,6 +186,7 @@ const QueryLast24HrActivity = ` type Repository struct { queryAPI QueryDoer logger *zap.Logger + bucket24Hrs string bucketInfinite string bucket30d string coreProtocolMeasurement struct { @@ -203,9 +204,9 @@ type rowStat struct { } type intRowStat struct { - Protocol string `mapstructure:"app_id"` - TotalMessages uint64 `mapstructure:"total_messages"` - TotalValueTransferred uint64 `mapstructure:"total_value_transferred"` + Protocol string `mapstructure:"app_id"` + TotalMessages uint64 `mapstructure:"total_messages"` + TotalValueTransferred float64 `mapstructure:"total_value_transferred"` } type intStats struct { @@ -247,11 +248,12 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer { return &queryApiWrapper{qApi: qApi} } -func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d string, logger *zap.Logger) *Repository { +func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d, bucket24Hrs string, logger *zap.Logger) *Repository { return &Repository{ queryAPI: qApi, bucketInfinite: bucketInfinite, bucket30d: bucket30d, + bucket24Hrs: bucket24Hrs, logger: logger, coreProtocolMeasurement: struct { Daily string @@ -335,7 +337,7 @@ func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) ( }, nil } -// returns latest and last 24 hr for core protocols (cctp and portal_token_bridge) +// returns latest and last 24 hr for core protocols (portal_token_bridge and ntt) func (r *Repository) getCoreProtocolStats(ctx context.Context, protocol string) (intStats, error) { // calculate total values till the start of current day @@ -355,7 +357,7 @@ func (r *Repository) getCoreProtocolStats(ctx context.Context, protocol string) latestTotal := intRowStat{ Protocol: protocol, TotalMessages: totalsUntilToday.TotalMessages + currentDayStats.TotalMessages, - TotalValueTransferred: totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred, + TotalValueTransferred: (totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred) / getProtocolDecimals(protocol), } result := intStats{ @@ -368,11 +370,51 @@ func (r *Repository) getCoreProtocolStats(ctx context.Context, protocol string) if errQ3 != nil { return result, errQ3 } + deltaYesterdayStats.TotalValueTransferred = deltaYesterdayStats.TotalValueTransferred / getProtocolDecimals(protocol) result.DeltaLast24hr = deltaYesterdayStats return result, nil } +func (r *Repository) getCCTPStats(ctx context.Context, protocol string) (intStats, error) { + + queryTemplate := ` + from(bucket: "%s") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "cctp_status_total_v2") + %s + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + |> rename(columns: {txs: "total_messages", volume: "total_value_transferred"}) + ` + q := fmt.Sprintf(queryTemplate, r.bucket24Hrs, "|> last()") + statsData, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q, protocol) + if err != nil { + r.logger.Error("error fetching cctp totals stats", zap.Error(err)) + return intStats{}, err + } + + q = fmt.Sprintf(queryTemplate, r.bucket24Hrs, "|> first()") + totals24HrAgo, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q, protocol) + if err != nil { + r.logger.Error("error fetching cctp totals stats", zap.Error(err)) + return intStats{}, err + } + + return intStats{ + Latest: intRowStat{ + Protocol: protocol, + TotalMessages: statsData.TotalMessages, + TotalValueTransferred: statsData.TotalValueTransferred / getProtocolDecimals(protocol), + }, + DeltaLast24hr: intRowStat{ + Protocol: protocol, + TotalMessages: statsData.TotalMessages - totals24HrAgo.TotalMessages, + TotalValueTransferred: (statsData.TotalValueTransferred - totals24HrAgo.TotalValueTransferred) / getProtocolDecimals(protocol), + }, + }, nil + +} + func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, query, protocol string) (T, error) { var res T result, err := queryAPI.Query(ctx, query) @@ -394,3 +436,12 @@ func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx co err = mapstructure.Decode(result.Record().Values(), &res) return res, err } + +func getProtocolDecimals(protocol string) float64 { + switch protocol { + case CCTP: + return 1e6 + default: + return 1e8 + } +} diff --git a/api/handlers/protocols/service.go b/api/handlers/protocols/service.go index e12d225f..4ffdee5c 100644 --- a/api/handlers/protocols/service.go +++ b/api/handlers/protocols/service.go @@ -48,6 +48,8 @@ type tvlProvider interface { Get(ctx context.Context) (string, error) } +type fetchProtocolStats func(ctx context.Context, protocol string) (intStats, error) + func NewService(extProtocols, coreProtocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int, metrics metrics.Metrics, tvlProvider tvlProvider) *Service { return &Service{ Protocols: extProtocols, @@ -64,10 +66,10 @@ func NewService(extProtocols, coreProtocols []string, repo *Repository, logger * func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalValuesDTO { - wg := &sync.WaitGroup{} totalProtocols := len(s.Protocols) + len(s.coreProtocols) - wg.Add(totalProtocols) results := make(chan ProtocolTotalValuesDTO, totalProtocols) + wg := &sync.WaitGroup{} + wg.Add(totalProtocols) for _, p := range s.Protocols { go s.fetchProtocolValues(ctx, wg, p, results, s.getProtocolStats) @@ -75,6 +77,7 @@ func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalVa for _, p := range s.coreProtocols { go s.fetchProtocolValues(ctx, wg, p, results, s.getCoreProtocolStats) } + wg.Wait() close(results) @@ -122,14 +125,14 @@ func (s *Service) fetchProtocolValues(ctx context.Context, wg *sync.WaitGroup, p results <- res } -// getProtocolStats fetches stats for CCTP and PortalTokenBridge +// getProtocolStats fetches stats for PortalTokenBridge and NTT func (s *Service) getCoreProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) { - protocolStats, err := s.repo.getCoreProtocolStats(ctx, protocol) + protocolStats, err := s.getFetchFn(protocol)(ctx, protocol) if err != nil { return ProtocolStats{ Protocol: protocol, - TotalValueTransferred: float64(protocolStats.Latest.TotalValueTransferred) / 1e8, + TotalValueTransferred: protocolStats.Latest.TotalValueTransferred, TotalMessages: protocolStats.Latest.TotalMessages, }, err } @@ -137,10 +140,10 @@ func (s *Service) getCoreProtocolStats(ctx context.Context, protocol string) (Pr diffLastDay := protocolStats.DeltaLast24hr.TotalMessages val := ProtocolStats{ Protocol: protocol, - TotalValueTransferred: float64(protocolStats.Latest.TotalValueTransferred) / 1e8, + TotalValueTransferred: protocolStats.Latest.TotalValueTransferred, TotalMessages: protocolStats.Latest.TotalMessages, LastDayMessages: diffLastDay, - Last24HourVolume: float64(protocolStats.DeltaLast24hr.TotalValueTransferred) / 1e8, + Last24HourVolume: protocolStats.DeltaLast24hr.TotalValueTransferred, } lastDayTotalMessages := protocolStats.Latest.TotalMessages - diffLastDay @@ -166,6 +169,15 @@ func (s *Service) getCoreProtocolStats(ctx context.Context, protocol string) (Pr return val, nil } +func (s *Service) getFetchFn(protocol string) fetchProtocolStats { + switch protocol { + case CCTP: + return s.repo.getCCTPStats + default: + return s.repo.getCoreProtocolStats + } +} + func (s *Service) getProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) { type statsResult struct { diff --git a/api/handlers/protocols/service_test.go b/api/handlers/protocols/service_test.go index 39c4a178..565b3ea0 100644 --- a/api/handlers/protocols/service_test.go +++ b/api/handlers/protocols/service_test.go @@ -84,7 +84,7 @@ func TestService_GetProtocolsTotalValues(t *testing.T) { queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucket30d", ts.Format(time.RFC3339), dbconsts.ProtocolsActivityMeasurementHourly, "protocol1")).Return(respActivity2, nil) queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryLast24HrActivity, "bucketInfinite", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(last24respActivity, nil) - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "bucket24hr", zap.NewNop()) service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) @@ -130,7 +130,7 @@ func TestService_GetProtocolsTotalValues_FailedFetchingActivity(t *testing.T) { queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStatsLastDay, "bucket30d", from.Format(time.RFC3339), to.Format(time.RFC3339), dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLastDay, nil) queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucketInfinite", "1970-01-01T00:00:00Z", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(&mockQueryTableResult{}, errors.New("mocked_error")) - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "bucket24hr", zap.NewNop()) service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) @@ -187,7 +187,7 @@ func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) { queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucket30d", ts.Format(time.RFC3339), dbconsts.ProtocolsActivityMeasurementHourly, "protocol1")).Return(respActivity2, nil) queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryLast24HrActivity, "bucketInfinite", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(respActivity2, nil) - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "bucket24hr", zap.NewNop()) service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) @@ -257,7 +257,7 @@ func TestService_GetCCTP_Stats(t *testing.T) { queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryCoreProtocolDeltaSinceStartOfDay, "bucket30d", dbconsts.TotalProtocolsStatsHourly, protocols.PortalTokenBridge, protocols.PortalTokenBridge)).Return(deltaSinceStartOfDay, errNil) queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryCoreProtocolDeltaLastDay, "bucket30d", dbconsts.TotalProtocolsStatsHourly, protocols.PortalTokenBridge, protocols.PortalTokenBridge)).Return(deltaLastDay, errNil) - repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop()) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "bucket24hr", zap.NewNop()) service := protocols.NewService([]string{}, []string{protocols.PortalTokenBridge}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.NotNil(t, values) diff --git a/api/main.go b/api/main.go index c19491be..77d43d56 100644 --- a/api/main.go +++ b/api/main.go @@ -197,6 +197,7 @@ func main() { protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)), cfg.Influx.BucketInfinite, cfg.Influx.Bucket30Days, + cfg.Influx.Bucket24Hours, rootLogger) guardianSetRepository := repository.NewGuardianSetRepository(db.Database, rootLogger)