From 6bbfa7bf237f3a91dd8e7f60b7e9c5170c0c046d Mon Sep 17 00:00:00 2001 From: Mariano <9205080+marianososto@users.noreply.github.com> Date: Mon, 4 Mar 2024 15:55:53 -0300 Subject: [PATCH] Add cache to protocols stats endpoints (/protocols/stats) (#1149) * add cache to protocols stats endpoint add mock for cache add unit test case for cache miss change from to current * add configs * add missing config for api-service.yaml * add cache ttl for staging-mainnet --- api/handlers/protocols/service.go | 58 +++++++++++---- api/handlers/protocols/service_test.go | 95 +++++++++++++++++++++++- api/internal/config/config.go | 28 ++++--- api/main.go | 2 +- common/client/cache/mock/mock.go | 26 +++++++ deploy/api/api-service.yaml | 4 + deploy/api/env/production-mainnet.env | 1 + deploy/api/env/production-testnet.env | 1 + deploy/api/env/staging-mainnet.env | 1 + deploy/api/env/staging-testnet.env | 1 + jobs/jobs/protocols/activity/activity.go | 4 +- 11 files changed, 188 insertions(+), 33 deletions(-) create mode 100644 common/client/cache/mock/mock.go diff --git a/api/handlers/protocols/service.go b/api/handlers/protocols/service.go index 45bfbe3a..7c28cf4e 100644 --- a/api/handlers/protocols/service.go +++ b/api/handlers/protocols/service.go @@ -2,15 +2,22 @@ package protocols import ( "context" + "encoding/json" + "github.com/wormhole-foundation/wormhole-explorer/common/client/cache" "go.uber.org/zap" "strconv" + "strings" "sync" + "time" ) type Service struct { - Protocols []string - repo *Repository - logger *zap.Logger + Protocols []string + repo *Repository + logger *zap.Logger + cache cache.Cache + cacheKeyPrefix string + cacheTTL int } type ProtocolTotalValuesDTO struct { @@ -24,11 +31,14 @@ type ProtocolTotalValuesDTO struct { Error string `json:"error,omitempty"` } -func NewService(protocols []string, repo *Repository, logger *zap.Logger) *Service { +func NewService(protocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int) *Service { return &Service{ - Protocols: protocols, - repo: repo, - logger: logger, + Protocols: protocols, + repo: repo, + logger: logger, + cache: cache, + cacheKeyPrefix: cacheKeyPrefix, + cacheTTL: cacheTTL, } } @@ -51,36 +61,48 @@ func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalVa return resultsSlice } -func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup, contributor string, results chan<- ProtocolTotalValuesDTO) { +func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup, protocol string, results chan<- ProtocolTotalValuesDTO) { defer wg.Done() + cacheKey := s.cacheKeyPrefix + ":" + strings.ToUpper(protocol) + cachedValue, errCache := s.cache.Get(ctx, cacheKey) + if errCache == nil { + var val ProtocolTotalValuesDTO + errCacheUnmarshall := json.Unmarshal([]byte(cachedValue), &val) + if errCacheUnmarshall == nil { + results <- val + return + } + s.logger.Error("error unmarshalling cache value", zap.Error(errCacheUnmarshall), zap.String("cache_key", cacheKey)) + } + type statsResult struct { result stats Err error } statsRes := make(chan statsResult, 1) go func() { - rowStats, errStats := s.repo.getProtocolStats(ctx, contributor) + rowStats, errStats := s.repo.getProtocolStats(ctx, protocol) statsRes <- statsResult{result: rowStats, Err: errStats} close(statsRes) }() - activity, err := s.repo.getProtocolActivity(ctx, contributor) + activity, err := s.repo.getProtocolActivity(ctx, protocol) if err != nil { - s.logger.Error("error fetching protocol activity", zap.Error(err), zap.String("protocol", contributor)) - results <- ProtocolTotalValuesDTO{Protocol: contributor, Error: err.Error()} + s.logger.Error("error fetching protocol activity", zap.Error(err), zap.String("protocol", protocol)) + results <- ProtocolTotalValuesDTO{Protocol: protocol, Error: err.Error()} return } rStats := <-statsRes if rStats.Err != nil { - s.logger.Error("error fetching protocol stats", zap.Error(rStats.Err), zap.String("protocol", contributor)) - results <- ProtocolTotalValuesDTO{Protocol: contributor, Error: rStats.Err.Error()} + s.logger.Error("error fetching protocol stats", zap.Error(rStats.Err), zap.String("protocol", protocol)) + results <- ProtocolTotalValuesDTO{Protocol: protocol, Error: rStats.Err.Error()} return } dto := ProtocolTotalValuesDTO{ - Protocol: contributor, + Protocol: protocol, TotalValueLocked: rStats.result.Latest.TotalValueLocked, TotalMessages: rStats.result.Latest.TotalMessages, TotalValueTransferred: activity.TotalValueTransferred, @@ -95,5 +117,11 @@ func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup dto.LastDayDiffPercentage = strconv.FormatFloat(float64(last24HrMessages)/float64(totalMessagesAsFromLast24hr)*100, 'f', 2, 64) + "%" } + dtoJson, _ := json.Marshal(dto) // don't handle error since the full lifecycle of the dto is under this scope + errCache = s.cache.Set(ctx, cacheKey, string(dtoJson), time.Duration(s.cacheTTL)*time.Minute) + if errCache != nil { + s.logger.Error("error setting cache", zap.Error(errCache), zap.String("cache_key", cacheKey)) + } + results <- dto } diff --git a/api/handlers/protocols/service_test.go b/api/handlers/protocols/service_test.go index 4ab43039..0c6f6e09 100644 --- a/api/handlers/protocols/service_test.go +++ b/api/handlers/protocols/service_test.go @@ -9,9 +9,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/test-go/testify/mock" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/protocols" + "github.com/wormhole-foundation/wormhole-explorer/common/client/cache" + cacheMock "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/mock" "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" "go.uber.org/zap" "testing" + "time" ) func TestService_GetProtocolsTotalValues(t *testing.T) { @@ -56,7 +59,7 @@ func TestService_GetProtocolsTotalValues(t *testing.T) { queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil) repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) @@ -101,7 +104,7 @@ func TestService_GetProtocolsTotalValues_FailedFetchingActivity(t *testing.T) { queryAPI.On("Query", ctx, activityQuery).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_activity_error")) repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) @@ -143,7 +146,7 @@ func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) { queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, errNil) repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) @@ -152,6 +155,92 @@ func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) { assert.Equal(t, "mocked_fetching_stats_error", values[0].Error) } +func TestService_GetProtocolsTotalValues_CacheHit(t *testing.T) { + ctx := context.Background() + mockCache := &cacheMock.CacheMock{} + var cacheErr error + cacheErr = nil + mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return(`{"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":4,"last_day_diff_percentage":"75.00%"}`, cacheErr) + service := protocols.NewService([]string{"protocol1"}, nil, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 0) + values := service.GetProtocolsTotalValues(ctx) + assert.Equal(t, 1, len(values)) + assert.Equal(t, "protocol1", values[0].Protocol) + assert.Equal(t, 5.00, values[0].TotalValueLocked) + assert.Equal(t, uint64(7), values[0].TotalMessages) + assert.Equal(t, 9.00, values[0].TotalValueSecured) + assert.Equal(t, 7.00, values[0].TotalValueTransferred) + assert.Equal(t, uint64(4), values[0].LastDayMessages) + assert.Equal(t, "75.00%", values[0].LastDayDiffPercentage) + +} + +func TestService_GetProtocolsTotalValues_CacheMiss_FetchAndUpdate(t *testing.T) { + + ctx := context.Background() + mockCache := &cacheMock.CacheMock{} + mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return("", cache.ErrNotFound) // mock cache miss + + // mock cache update, validate it's called once. + mockCache.On("Set", + ctx, + "WORMSCAN:PROTOCOLS:PROTOCOL1", + `{"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":3,"last_day_diff_percentage":"75.00%"}`, + time.Duration(60)*time.Minute). + Return(nil). + Times(1) + + var errNil error + respStatsLatest := &mockQueryTableResult{} + respStatsLatest.On("Next").Return(true) + respStatsLatest.On("Err").Return(errNil) + respStatsLatest.On("Close").Return(errNil) + respStatsLatest.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(7), + "total_value_locked": float64(5), + })) + + respStatsLastDay := &mockQueryTableResult{} + respStatsLastDay.On("Next").Return(true) + respStatsLastDay.On("Err").Return(errNil) + respStatsLastDay.On("Close").Return(errNil) + respStatsLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(4), + "total_value_locked": float64(5), + })) + + respActivityLast := &mockQueryTableResult{} + respActivityLast.On("Next").Return(true) + respActivityLast.On("Err").Return(errNil) + respActivityLast.On("Close").Return(errNil) + respActivityLast.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(15), + "total_value_transferred": float64(7), + "total_value_secure": float64(9), + })) + + queryAPI := &mockQueryAPI{} + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + + activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") + queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil) + repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 60) + + values := service.GetProtocolsTotalValues(ctx) + assert.Equal(t, 1, len(values)) + assert.Equal(t, "protocol1", values[0].Protocol) + assert.Equal(t, 5.00, values[0].TotalValueLocked) + assert.Equal(t, uint64(7), values[0].TotalMessages) + assert.Equal(t, 9.00, values[0].TotalValueSecured) + assert.Equal(t, 7.00, values[0].TotalValueTransferred) + assert.Equal(t, uint64(3), values[0].LastDayMessages) + assert.Equal(t, "75.00%", values[0].LastDayDiffPercentage) +} + type mockQueryAPI struct { mock.Mock } diff --git a/api/internal/config/config.go b/api/internal/config/config.go index afc61483..1fcd00f7 100644 --- a/api/internal/config/config.go +++ b/api/internal/config/config.go @@ -32,12 +32,14 @@ type AppConfig struct { Name string } Cache struct { - URL string - TvlKey string - TvlExpiration int - Enabled bool - MetricExpiration int - Prefix string + URL string + TvlKey string + TvlExpiration int + Enabled bool + MetricExpiration int + Prefix string + ProtocolsStatsKey string + ProtocolsStatsExpiration int } PORT int LogLevel string @@ -80,12 +82,14 @@ func (cfg *AppConfig) GetLogLevel() (ipfslog.LogLevel, error) { func defaulConfig() *AppConfig { return &AppConfig{ Cache: struct { - URL string - TvlKey string - TvlExpiration int - Enabled bool - MetricExpiration int - Prefix string + URL string + TvlKey string + TvlExpiration int + Enabled bool + MetricExpiration int + Prefix string + ProtocolsStatsKey string + ProtocolsStatsExpiration int }{ MetricExpiration: 10, }, diff --git a/api/main.go b/api/main.go index 50f8f38a..f66ea69c 100644 --- a/api/main.go +++ b/api/main.go @@ -179,7 +179,7 @@ func main() { relaysService := relays.NewService(relaysRepo, rootLogger) operationsService := operations.NewService(operationsRepo, rootLogger) statsService := stats.NewService(statsRepo, cache, expirationTime, metrics, rootLogger) - protocolsService := protocols.NewService(cfg.Protocols, protocolsRepo, rootLogger) + protocolsService := protocols.NewService(cfg.Protocols, protocolsRepo, rootLogger, cache, cfg.Cache.ProtocolsStatsKey, cfg.Cache.ProtocolsStatsExpiration) // Set up a custom error handler response.SetEnableStackTrace(*cfg) diff --git a/common/client/cache/mock/mock.go b/common/client/cache/mock/mock.go new file mode 100644 index 00000000..76c6bb73 --- /dev/null +++ b/common/client/cache/mock/mock.go @@ -0,0 +1,26 @@ +package mock + +import ( + "context" + "github.com/test-go/testify/mock" + "time" +) + +// CacheMock exported type to provide mock for cache.Cache interface +type CacheMock struct { + mock.Mock +} + +func (c *CacheMock) Get(ctx context.Context, key string) (string, error) { + args := c.Called(ctx, key) + return args.String(0), args.Error(1) +} + +func (c *CacheMock) Close() error { + return nil +} + +func (c *CacheMock) Set(ctx context.Context, key string, value interface{}, expirations time.Duration) error { + args := c.Called(ctx, key, value, expirations) + return args.Error(0) +} diff --git a/deploy/api/api-service.yaml b/deploy/api/api-service.yaml index 19530b40..65e5db8d 100644 --- a/deploy/api/api-service.yaml +++ b/deploy/api/api-service.yaml @@ -151,6 +151,10 @@ spec: key: protocols-activity-version - name: WORMSCAN_PROTOCOLS value: {{ .WORMSCAN_PROTOCOLS }} + - name: WORMSCAN_CACHE_PROTOCOLSSTATSEXPIRATION + value: "{{ .WORMSCAN_CACHE_PROTOCOLSSTATSEXPIRATION }}" + - name: WORMSCAN_CACHE_PROTOCOLSSTATSKEY + value: "WORMSCAN:PROTOCOLS_STATS" resources: limits: memory: {{ .RESOURCES_LIMITS_MEMORY }} diff --git a/deploy/api/env/production-mainnet.env b/deploy/api/env/production-mainnet.env index 675fef72..058d138a 100644 --- a/deploy/api/env/production-mainnet.env +++ b/deploy/api/env/production-mainnet.env @@ -21,3 +21,4 @@ WORMSCAN_VAAPAYLOADPARSER_URL= WORMSCAN_VAAPAYLOADPARSER_TIMEOUT=10 WORMSCAN_VAAPAYLOADPARSER_ENABLED=true WORMSCAN_PROTOCOLS= +WORMSCAN_CACHE_PROTOCOLSSTATSEXPIRATION=60 \ No newline at end of file diff --git a/deploy/api/env/production-testnet.env b/deploy/api/env/production-testnet.env index 2d20b1e2..41381130 100644 --- a/deploy/api/env/production-testnet.env +++ b/deploy/api/env/production-testnet.env @@ -21,3 +21,4 @@ WORMSCAN_VAAPAYLOADPARSER_URL= WORMSCAN_VAAPAYLOADPARSER_TIMEOUT=10 WORMSCAN_VAAPAYLOADPARSER_ENABLED=true WORMSCAN_PROTOCOLS= +WORMSCAN_CACHE_PROTOCOLSSTATSEXPIRATION=60 \ No newline at end of file diff --git a/deploy/api/env/staging-mainnet.env b/deploy/api/env/staging-mainnet.env index 9abb47cc..18860396 100644 --- a/deploy/api/env/staging-mainnet.env +++ b/deploy/api/env/staging-mainnet.env @@ -21,3 +21,4 @@ WORMSCAN_VAAPAYLOADPARSER_URL= WORMSCAN_VAAPAYLOADPARSER_TIMEOUT=10 WORMSCAN_VAAPAYLOADPARSER_ENABLED=true WORMSCAN_PROTOCOLS=allbridge,mayan +WORMSCAN_CACHE_PROTOCOLSSTATSEXPIRATION=60 \ No newline at end of file diff --git a/deploy/api/env/staging-testnet.env b/deploy/api/env/staging-testnet.env index e8daa771..05b8eab3 100644 --- a/deploy/api/env/staging-testnet.env +++ b/deploy/api/env/staging-testnet.env @@ -21,3 +21,4 @@ WORMSCAN_VAAPAYLOADPARSER_URL= WORMSCAN_VAAPAYLOADPARSER_TIMEOUT=10 WORMSCAN_VAAPAYLOADPARSER_ENABLED=true WORMSCAN_PROTOCOLS= +WORMSCAN_CACHE_PROTOCOLSSTATSEXPIRATION=60 \ No newline at end of file diff --git a/jobs/jobs/protocols/activity/activity.go b/jobs/jobs/protocols/activity/activity.go index bfc54c08..151aa762 100644 --- a/jobs/jobs/protocols/activity/activity.go +++ b/jobs/jobs/protocols/activity/activity.go @@ -30,7 +30,7 @@ func (m *ProtocolsActivityJob) Run(ctx context.Context) error { wg.Add(clientsQty) errs := make(chan error, clientsQty) ts := time.Now().UTC().Truncate(time.Hour) // make minutes and seconds zero, so we only work with date and hour - from := ts.Add(-1 * time.Hour) + from := time.Unix(0, 0).UTC() m.logger.Info("running protocols activity job ", zap.Time("from", from), zap.Time("to", ts)) for _, cs := range m.activityFetchers { go func(c ClientActivity) { @@ -40,7 +40,7 @@ func (m *ProtocolsActivityJob) Run(ctx context.Context) error { errs <- err return } - errs <- m.updateActivity(ctx, c.ProtocolName(), m.version, activity, from) + errs <- m.updateActivity(ctx, c.ProtocolName(), m.version, activity, ts) }(cs) }