[1587] Change CCTP stats for /protocols/stats (#1673)

* #1587 add task to downsample total cctp volume and txs

* fix identation

* add adjustment to task

* change query for cctp in /protocols/stats

* fix identations

* fix unit-tests
This commit is contained in:
Mariano 2024-09-12 14:21:10 -03:00 committed by GitHub
parent 847a9b1336
commit a00cd74bd0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 148 additions and 17 deletions

View File

@ -0,0 +1,67 @@
import "date"
option task = {
name: "cctp total volume all-time every hour",
every: 1h,
}
bucketInfinite = "wormscan"
bucket24Hr = "wormscan-24hours"
destMeasurement = "cctp_status_total_v2"
nowts = date.truncate(t: now(), unit: 1h)
lastData = from(bucket: bucket24Hr)
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == destMeasurement)
|> last()
|> drop(columns:["_start","_stop","_measurement"])
lastTxs = lastData
|> filter(fn: (r) => r._field == "txs")
lastVolume = lastData
|> filter(fn: (r) => r._field == "volume")
lastExecutionTime = lastData
|> keep(columns:["_time"])
|> tableFind(fn: (key) => true)
|> getRecord(idx: 0)
deltaData = from(bucket: bucketInfinite)
|> range(start: lastExecutionTime._time, stop:now())
|> filter(fn: (r) => r._measurement == "circle-message-sent")
|> filter(fn: (r) => r.messageProtocol == "wormhole")
|> filter(fn: (r) => r._field == "amount")
|> keep(columns:["_field","_value"])
|> toUInt()
|> reduce(
identity: {
volume: uint(v:0),
txs: uint(v:0)
},
fn: (r, accumulator) => ({
volume: accumulator.volume + r._value,
txs: accumulator.txs + uint(v:1)
})
)
deltaTxs = deltaData
|> drop(columns:["volume"])
|> rename(columns: {txs: "_value"})
|> set(key:"_field",value:"txs")
deltaVolume = deltaData
|> drop(columns:["txs"])
|> rename(columns: {volume: "_value"})
|> set(key:"_field",value:"volume")
txs = union(tables:[lastTxs, deltaTxs])
|> sum()
volume = union(tables:[lastVolume, deltaVolume])
|> sum()
union(tables:[txs, volume])
|> set(key:"_measurement", value: destMeasurement)
|> map(fn: (r) => ({ r with _time: nowts }))
|> to(bucket: bucket24Hr)

View File

@ -186,6 +186,7 @@ const QueryLast24HrActivity = `
type Repository struct {
queryAPI QueryDoer
logger *zap.Logger
bucket24Hrs string
bucketInfinite string
bucket30d string
coreProtocolMeasurement struct {
@ -203,9 +204,9 @@ type rowStat struct {
}
type intRowStat struct {
Protocol string `mapstructure:"app_id"`
TotalMessages uint64 `mapstructure:"total_messages"`
TotalValueTransferred uint64 `mapstructure:"total_value_transferred"`
Protocol string `mapstructure:"app_id"`
TotalMessages uint64 `mapstructure:"total_messages"`
TotalValueTransferred float64 `mapstructure:"total_value_transferred"`
}
type intStats struct {
@ -247,11 +248,12 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer {
return &queryApiWrapper{qApi: qApi}
}
func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d string, logger *zap.Logger) *Repository {
func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d, bucket24Hrs string, logger *zap.Logger) *Repository {
return &Repository{
queryAPI: qApi,
bucketInfinite: bucketInfinite,
bucket30d: bucket30d,
bucket24Hrs: bucket24Hrs,
logger: logger,
coreProtocolMeasurement: struct {
Daily string
@ -335,7 +337,7 @@ func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (
}, nil
}
// returns latest and last 24 hr for core protocols (cctp and portal_token_bridge)
// returns latest and last 24 hr for core protocols (portal_token_bridge and ntt)
func (r *Repository) getCoreProtocolStats(ctx context.Context, protocol string) (intStats, error) {
// calculate total values till the start of current day
@ -355,7 +357,7 @@ func (r *Repository) getCoreProtocolStats(ctx context.Context, protocol string)
latestTotal := intRowStat{
Protocol: protocol,
TotalMessages: totalsUntilToday.TotalMessages + currentDayStats.TotalMessages,
TotalValueTransferred: totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred,
TotalValueTransferred: (totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred) / getProtocolDecimals(protocol),
}
result := intStats{
@ -368,11 +370,51 @@ func (r *Repository) getCoreProtocolStats(ctx context.Context, protocol string)
if errQ3 != nil {
return result, errQ3
}
deltaYesterdayStats.TotalValueTransferred = deltaYesterdayStats.TotalValueTransferred / getProtocolDecimals(protocol)
result.DeltaLast24hr = deltaYesterdayStats
return result, nil
}
func (r *Repository) getCCTPStats(ctx context.Context, protocol string) (intStats, error) {
queryTemplate := `
from(bucket: "%s")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "cctp_status_total_v2")
%s
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> rename(columns: {txs: "total_messages", volume: "total_value_transferred"})
`
q := fmt.Sprintf(queryTemplate, r.bucket24Hrs, "|> last()")
statsData, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q, protocol)
if err != nil {
r.logger.Error("error fetching cctp totals stats", zap.Error(err))
return intStats{}, err
}
q = fmt.Sprintf(queryTemplate, r.bucket24Hrs, "|> first()")
totals24HrAgo, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q, protocol)
if err != nil {
r.logger.Error("error fetching cctp totals stats", zap.Error(err))
return intStats{}, err
}
return intStats{
Latest: intRowStat{
Protocol: protocol,
TotalMessages: statsData.TotalMessages,
TotalValueTransferred: statsData.TotalValueTransferred / getProtocolDecimals(protocol),
},
DeltaLast24hr: intRowStat{
Protocol: protocol,
TotalMessages: statsData.TotalMessages - totals24HrAgo.TotalMessages,
TotalValueTransferred: (statsData.TotalValueTransferred - totals24HrAgo.TotalValueTransferred) / getProtocolDecimals(protocol),
},
}, nil
}
func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, query, protocol string) (T, error) {
var res T
result, err := queryAPI.Query(ctx, query)
@ -394,3 +436,12 @@ func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx co
err = mapstructure.Decode(result.Record().Values(), &res)
return res, err
}
func getProtocolDecimals(protocol string) float64 {
switch protocol {
case CCTP:
return 1e6
default:
return 1e8
}
}

View File

@ -48,6 +48,8 @@ type tvlProvider interface {
Get(ctx context.Context) (string, error)
}
type fetchProtocolStats func(ctx context.Context, protocol string) (intStats, error)
func NewService(extProtocols, coreProtocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int, metrics metrics.Metrics, tvlProvider tvlProvider) *Service {
return &Service{
Protocols: extProtocols,
@ -64,10 +66,10 @@ func NewService(extProtocols, coreProtocols []string, repo *Repository, logger *
func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalValuesDTO {
wg := &sync.WaitGroup{}
totalProtocols := len(s.Protocols) + len(s.coreProtocols)
wg.Add(totalProtocols)
results := make(chan ProtocolTotalValuesDTO, totalProtocols)
wg := &sync.WaitGroup{}
wg.Add(totalProtocols)
for _, p := range s.Protocols {
go s.fetchProtocolValues(ctx, wg, p, results, s.getProtocolStats)
@ -75,6 +77,7 @@ func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalVa
for _, p := range s.coreProtocols {
go s.fetchProtocolValues(ctx, wg, p, results, s.getCoreProtocolStats)
}
wg.Wait()
close(results)
@ -122,14 +125,14 @@ func (s *Service) fetchProtocolValues(ctx context.Context, wg *sync.WaitGroup, p
results <- res
}
// getProtocolStats fetches stats for CCTP and PortalTokenBridge
// getProtocolStats fetches stats for PortalTokenBridge and NTT
func (s *Service) getCoreProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) {
protocolStats, err := s.repo.getCoreProtocolStats(ctx, protocol)
protocolStats, err := s.getFetchFn(protocol)(ctx, protocol)
if err != nil {
return ProtocolStats{
Protocol: protocol,
TotalValueTransferred: float64(protocolStats.Latest.TotalValueTransferred) / 1e8,
TotalValueTransferred: protocolStats.Latest.TotalValueTransferred,
TotalMessages: protocolStats.Latest.TotalMessages,
}, err
}
@ -137,10 +140,10 @@ func (s *Service) getCoreProtocolStats(ctx context.Context, protocol string) (Pr
diffLastDay := protocolStats.DeltaLast24hr.TotalMessages
val := ProtocolStats{
Protocol: protocol,
TotalValueTransferred: float64(protocolStats.Latest.TotalValueTransferred) / 1e8,
TotalValueTransferred: protocolStats.Latest.TotalValueTransferred,
TotalMessages: protocolStats.Latest.TotalMessages,
LastDayMessages: diffLastDay,
Last24HourVolume: float64(protocolStats.DeltaLast24hr.TotalValueTransferred) / 1e8,
Last24HourVolume: protocolStats.DeltaLast24hr.TotalValueTransferred,
}
lastDayTotalMessages := protocolStats.Latest.TotalMessages - diffLastDay
@ -166,6 +169,15 @@ func (s *Service) getCoreProtocolStats(ctx context.Context, protocol string) (Pr
return val, nil
}
func (s *Service) getFetchFn(protocol string) fetchProtocolStats {
switch protocol {
case CCTP:
return s.repo.getCCTPStats
default:
return s.repo.getCoreProtocolStats
}
}
func (s *Service) getProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) {
type statsResult struct {

View File

@ -84,7 +84,7 @@ func TestService_GetProtocolsTotalValues(t *testing.T) {
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucket30d", ts.Format(time.RFC3339), dbconsts.ProtocolsActivityMeasurementHourly, "protocol1")).Return(respActivity2, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryLast24HrActivity, "bucketInfinite", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(last24respActivity, nil)
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop())
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "bucket24hr", zap.NewNop())
service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
values := service.GetProtocolsTotalValues(ctx)
@ -130,7 +130,7 @@ func TestService_GetProtocolsTotalValues_FailedFetchingActivity(t *testing.T) {
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolStatsLastDay, "bucket30d", from.Format(time.RFC3339), to.Format(time.RFC3339), dbconsts.ProtocolsStatsMeasurementHourly, "protocol1")).Return(respStatsLastDay, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucketInfinite", "1970-01-01T00:00:00Z", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(&mockQueryTableResult{}, errors.New("mocked_error"))
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop())
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "bucket24hr", zap.NewNop())
service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
values := service.GetProtocolsTotalValues(ctx)
@ -187,7 +187,7 @@ func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) {
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateProtocolActivity, "bucket30d", ts.Format(time.RFC3339), dbconsts.ProtocolsActivityMeasurementHourly, "protocol1")).Return(respActivity2, nil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryLast24HrActivity, "bucketInfinite", dbconsts.ProtocolsActivityMeasurementDaily, "protocol1")).Return(respActivity2, nil)
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop())
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "bucket24hr", zap.NewNop())
service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
values := service.GetProtocolsTotalValues(ctx)
@ -257,7 +257,7 @@ func TestService_GetCCTP_Stats(t *testing.T) {
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryCoreProtocolDeltaSinceStartOfDay, "bucket30d", dbconsts.TotalProtocolsStatsHourly, protocols.PortalTokenBridge, protocols.PortalTokenBridge)).Return(deltaSinceStartOfDay, errNil)
queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryCoreProtocolDeltaLastDay, "bucket30d", dbconsts.TotalProtocolsStatsHourly, protocols.PortalTokenBridge, protocols.PortalTokenBridge)).Return(deltaLastDay, errNil)
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", zap.NewNop())
repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "bucket24hr", zap.NewNop())
service := protocols.NewService([]string{}, []string{protocols.PortalTokenBridge}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
values := service.GetProtocolsTotalValues(ctx)
assert.NotNil(t, values)

View File

@ -197,6 +197,7 @@ func main() {
protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)),
cfg.Influx.BucketInfinite,
cfg.Influx.Bucket30Days,
cfg.Influx.Bucket24Hours,
rootLogger)
guardianSetRepository := repository.NewGuardianSetRepository(db.Database, rootLogger)