From ffeb5927f8f055d09c9821792db73b3d6d6f2e51 Mon Sep 17 00:00:00 2001 From: walker-16 Date: Thu, 20 Apr 2023 17:02:25 -0300 Subject: [PATCH] Add notional cache client (#241) * Add notional cache client * gracefull shutdown pubsub and distributed cache --- api/go.mod | 5 +- api/go.sum | 3 +- api/internal/cache/cache.go | 23 +++-- api/internal/cache/dummycache.go | 9 +- api/internal/cache/notional/cache.go | 149 +++++++++++++++++++++++++++ api/internal/cache/notional/dummy.go | 24 +++++ api/internal/config/config.go | 1 + api/main.go | 33 ++++-- deploy/api/api-service.yaml | 2 + jobs/jobs/notional/notional.go | 2 +- 10 files changed, 230 insertions(+), 21 deletions(-) create mode 100644 api/internal/cache/notional/cache.go create mode 100644 api/internal/cache/notional/dummy.go diff --git a/api/go.mod b/api/go.mod index c596e8d8..82f5128b 100644 --- a/api/go.mod +++ b/api/go.mod @@ -7,7 +7,6 @@ require ( github.com/certusone/wormhole/node v0.0.0-20230315165931-62bef9ffb441 github.com/ethereum/go-ethereum v1.10.21 github.com/gagliardetto/solana-go v1.7.1 - github.com/go-redis/redis/v8 v8.11.5 github.com/gofiber/adaptor/v2 v2.1.29 github.com/gofiber/fiber/v2 v2.39.0 github.com/improbable-eng/grpc-web v0.15.0 @@ -26,6 +25,8 @@ require ( google.golang.org/grpc v1.50.1 ) +require github.com/go-redis/redis/v8 v8.11.5 + require ( contrib.go.opencensus.io/exporter/stackdriver v0.13.14 // indirect filippo.io/edwards25519 v1.0.0 // indirect @@ -38,7 +39,7 @@ require ( github.com/blendle/zapdriver v1.3.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/deepmap/oapi-codegen v1.8.2 // indirect diff --git a/api/go.sum b/api/go.sum index b98d434b..9aaef819 100644 --- a/api/go.sum +++ b/api/go.sum @@ -113,8 +113,9 @@ github.com/certusone/wormhole/node v0.0.0-20230315165931-62bef9ffb441 h1:GA0tsKb github.com/certusone/wormhole/node v0.0.0-20230315165931-62bef9ffb441/go.mod h1:U/qHGQY1vAw//UlPYVQKSL9b3C2nL6YakCYorMEz3GY= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= diff --git a/api/internal/cache/cache.go b/api/internal/cache/cache.go index f5fb844f..f6fccd31 100644 --- a/api/internal/cache/cache.go +++ b/api/internal/cache/cache.go @@ -22,15 +22,20 @@ type CacheClient struct { logger *zap.Logger } +// CacheReadable is the interface for notiona cache client. +type CacheReadable interface { + Get(ctx context.Context, key string) (string, error) + Close() error +} + type CacheGetFunc func(ctx context.Context, key string) (string, error) // NewCacheClient init a new cache client. -func NewCacheClient(url string, enabled bool, log *zap.Logger) *CacheClient { - client := redis.NewClient( - &redis.Options{ - Addr: url, - }) - return &CacheClient{Client: client, Enabled: enabled, logger: log} +func NewCacheClient(redisClient *redis.Client, enabled bool, log *zap.Logger) (*CacheClient, error) { + if redisClient == nil { + return nil, errors.New("redis client is nil") + } + return &CacheClient{Client: redisClient, Enabled: enabled, logger: log}, nil } // Get get a cache value or error from a key. @@ -45,7 +50,7 @@ func (c *CacheClient) Get(ctx context.Context, key string) (string, error) { if err != nil { if err != redis.Nil { requestID := fmt.Sprintf("%v", ctx.Value("requestid")) - c.logger.Error("ket does not exist in cache", + c.logger.Error("key does not exist in cache", zap.Error(err), zap.String("key", key), zap.String("requestID", requestID)) return "", errs.ErrNotFound } @@ -53,3 +58,7 @@ func (c *CacheClient) Get(ctx context.Context, key string) (string, error) { } return value, nil } + +func (c *CacheClient) Close() error { + return c.Client.Close() +} diff --git a/api/internal/cache/dummycache.go b/api/internal/cache/dummycache.go index b1964c14..d83d6a17 100644 --- a/api/internal/cache/dummycache.go +++ b/api/internal/cache/dummycache.go @@ -11,8 +11,8 @@ type DummyCacheClient struct { } // NewDummyCacheClient create a new instance of DummyCacheClient -func NewDummyCacheClient() DummyCacheClient { - return DummyCacheClient{} +func NewDummyCacheClient() *DummyCacheClient { + return &DummyCacheClient{} } // Get get method is a dummy method that always does not find the cache. @@ -20,3 +20,8 @@ func NewDummyCacheClient() DummyCacheClient { func (d *DummyCacheClient) Get(ctx context.Context, key string) (string, error) { return "", errs.ErrNotFound } + +// Close dummy cache client. +func (d *DummyCacheClient) Close() error { + return nil +} diff --git a/api/internal/cache/notional/cache.go b/api/internal/cache/notional/cache.go new file mode 100644 index 00000000..9f116432 --- /dev/null +++ b/api/internal/cache/notional/cache.go @@ -0,0 +1,149 @@ +package notional + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sync" + "time" + + "github.com/go-redis/redis/v8" + "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" +) + +const ( + wormscanNotionalCacheKeyRegex = "*WORMSCAN:NOTIONAL:CHAIN_ID:*" + wormscanNotionalUpdated = "NOTIONAL_UPDATED" +) + +var ( + ErrNotFound = errors.New("NOT FOUND") + ErrInvalidCacheField = errors.New("INVALID CACHE FIELD") +) + +// NotionalLocalCacheReadable is the interface for notional local cache. +type NotionalLocalCacheReadable interface { + Get(chainID vaa.ChainID) (NotionalCacheField, error) + Close() error +} + +// NotionalCacheField is the notional value of assets in cache. +type NotionalCacheField struct { + NotionalUsd float64 `json:"notional_usd"` + UpdatedAt time.Time `json:"updated_at"` +} + +// NotionalCacheClient redis cache client. +type NotionalCache struct { + client *redis.Client + pubSub *redis.PubSub + channel string + notionalMap sync.Map + logger *zap.Logger +} + +// NewNotionalCache create a new cache client. +// After create a NotionalCache use the Init method to initialize pubsub and load the cache. +func NewNotionalCache(ctx context.Context, redisClient *redis.Client, channel string, log *zap.Logger) (*NotionalCache, error) { + if redisClient == nil { + return nil, errors.New("redis client is nil") + } + + pubsub := redisClient.Subscribe(ctx, channel) + return &NotionalCache{ + client: redisClient, + pubSub: pubsub, + channel: channel, + notionalMap: sync.Map{}, + logger: log}, nil +} + +// Init subscribe to notional pubsub and load the cache. +func (c *NotionalCache) Init(ctx context.Context) error { + // load notional cache + err := c.loadCache(ctx) + if err != nil { + return err + } + + // notional cache updated channel subscribe + c.subscribe(ctx) + + return nil +} + +// loadCache load notional cache from redis. +func (c *NotionalCache) loadCache(ctx context.Context) error { + scanCom := c.client.Scan(ctx, 0, wormscanNotionalCacheKeyRegex, 100) + for { + // Scan for notional keys + keys, cursor, err := scanCom.Result() + if err != nil { + c.logger.Error("loadCache", zap.Error(err)) + return err + } + + // Get notional value from keys + for _, key := range keys { + var field NotionalCacheField + value, err := c.client.Get(ctx, key).Result() + json.Unmarshal([]byte(value), &field) + if err != nil { + c.logger.Error("loadCache", zap.Error(err)) + return err + } + // Save notional value to local cache + c.notionalMap.Store(key, field) + } + + if cursor == 0 { + break + } + } + return nil +} + +// Subscribe to a notional update channel and load new values for the notional cache. +func (c *NotionalCache) subscribe(ctx context.Context) { + ch := c.pubSub.Channel() + + go func() { + for msg := range ch { + if wormscanNotionalUpdated == msg.Payload { + // update notional cache + c.loadCache(ctx) + } + } + }() +} + +// Close the pubsub channel. +func (c *NotionalCache) Close() error { + return c.pubSub.Close() +} + +// Get notional cache value. +func (c *NotionalCache) Get(chainID vaa.ChainID) (NotionalCacheField, error) { + var notional NotionalCacheField + + // get notional cache key + key := fmt.Sprintf("WORMSCAN:NOTIONAL:CHAIN_ID:%d", chainID) + + // get notional cache value + field, ok := c.notionalMap.Load(key) + if !ok { + return notional, ErrNotFound + } + + // convert any field to NotionalCacheField + notional, ok = field.(NotionalCacheField) + if !ok { + c.logger.Error("invalid notional cache field", + zap.Any("field", field), + zap.Any("chainID", chainID)) + return notional, ErrInvalidCacheField + } + return notional, nil +} diff --git a/api/internal/cache/notional/dummy.go b/api/internal/cache/notional/dummy.go new file mode 100644 index 00000000..e63b43d8 --- /dev/null +++ b/api/internal/cache/notional/dummy.go @@ -0,0 +1,24 @@ +package notional + +import ( + "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +// DummyNotionalCache is a dummy notional cache. +type DummyNotionalCache struct { +} + +// NewDummyNotionalCache init a new dummy notional cache. +func NewDummyNotionalCache() *DummyNotionalCache { + return &DummyNotionalCache{} +} + +// Get get notional cache value. +func (c *DummyNotionalCache) Get(chainID vaa.ChainID) (NotionalCacheField, error) { + return NotionalCacheField{}, nil +} + +// Close the dummy cache. +func (c *DummyNotionalCache) Close() error { + return nil +} diff --git a/api/internal/config/config.go b/api/internal/config/config.go index 4c2c3b82..e9733edd 100644 --- a/api/internal/config/config.go +++ b/api/internal/config/config.go @@ -33,6 +33,7 @@ type AppConfig struct { } Cache struct { URL string + Channel string Enabled bool } PORT int diff --git a/api/main.go b/api/main.go index 06cc6113..72f20e21 100644 --- a/api/main.go +++ b/api/main.go @@ -11,6 +11,7 @@ import ( "syscall" "github.com/ansrivas/fiberprometheus/v2" + "github.com/go-redis/redis/v8" "github.com/gofiber/adaptor/v2" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" @@ -28,6 +29,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa" wormscanCache "github.com/wormhole-foundation/wormhole-explorer/api/internal/cache" + wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/api/internal/cache/notional" "github.com/wormhole-foundation/wormhole-explorer/api/internal/config" "github.com/wormhole-foundation/wormhole-explorer/api/internal/db" "github.com/wormhole-foundation/wormhole-explorer/api/middleware" @@ -36,7 +38,6 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan" rpcApi "github.com/wormhole-foundation/wormhole-explorer/api/rpc" xlogger "github.com/wormhole-foundation/wormhole-explorer/common/logger" - "go.uber.org/zap" ) @@ -99,7 +100,7 @@ func main() { db := cli.Database(cfg.DB.Name) // Get cache get function - cacheGetFunc := NewCache(cfg, rootLogger) + cache, notionalCache := NewCache(appCtx, cfg, rootLogger) //InfluxDB client influxCli := newInfluxClient(cfg.Influx.URL, cfg.Influx.Token) @@ -115,7 +116,7 @@ func main() { // Set up services addressService := address.NewService(addressRepo, rootLogger) - vaaService := vaa.NewService(vaaRepo, cacheGetFunc, rootLogger) + vaaService := vaa.NewService(vaaRepo, cache.Get, rootLogger) obsService := observations.NewService(obsRepo, rootLogger) governorService := governor.NewService(governorRepo, rootLogger) infrastructureService := infrastructure.NewService(infrastructureRepo, rootLogger) @@ -179,17 +180,33 @@ func main() { rootLogger.Info("cleanup tasks...") rootLogger.Info("shutdown server...") app.Shutdown() + rootLogger.Info("close pubsub notional...") + notionalCache.Close() + rootLogger.Info("close cache...") + cache.Close() rootLogger.Info("finished successfully wormhole api") } -// NewCache return a CacheGetFunc to get a value by a Key from cache. -func NewCache(cfg *config.AppConfig, looger *zap.Logger) wormscanCache.CacheGetFunc { +// NewCache get a CacheGetFunc to get a value by a Key from cache and a CacheReadable to get a value by a Key from notional local cache. +func NewCache(ctx context.Context, cfg *config.AppConfig, logger *zap.Logger) (wormscanCache.CacheReadable, wormscanNotionalCache.NotionalLocalCacheReadable) { + // if run mode is development with cache is disabled, return a dummy cache client and a dummy notional cache client. if cfg.RunMode == config.RunModeDevelopmernt && !cfg.Cache.Enabled { dummyCacheClient := wormscanCache.NewDummyCacheClient() - return dummyCacheClient.Get + dummyNotionalCache := wormscanNotionalCache.NewDummyNotionalCache() + return dummyCacheClient, dummyNotionalCache } - cacheClient := wormscanCache.NewCacheClient(cfg.Cache.URL, cfg.Cache.Enabled, looger) - return cacheClient.Get + + // if we are not in development mode, use a distributed cache and for notional a pubsub to sync local cache. + redisClient := redis.NewClient(&redis.Options{Addr: cfg.Cache.URL}) + + // get cache client + cacheClient, _ := wormscanCache.NewCacheClient(redisClient, cfg.Cache.Enabled, logger) + + // get notional cache client and init load to local cache + notionalCache, _ := wormscanNotionalCache.NewNotionalCache(ctx, redisClient, cfg.Cache.Channel, logger) + notionalCache.Init(ctx) + + return cacheClient, notionalCache } func newInfluxClient(url, token string) influxdb2.Client { diff --git a/deploy/api/api-service.yaml b/deploy/api/api-service.yaml index 303cdfbe..4087849e 100644 --- a/deploy/api/api-service.yaml +++ b/deploy/api/api-service.yaml @@ -78,6 +78,8 @@ spec: key: redis-uri - name: WORMSCAN_CACHE_ENABLED value: "true" + - name: WORMSCAN_CACHE_CHANNEL + value: ""WORMSCAN:NOTIONAL" - name: WORMSCAN_PPROF_ENABLED value: "{{ .WORMSCAN_PPROF_ENABLED }}" - name: WORMSCAN_INFLUX_URL diff --git a/jobs/jobs/notional/notional.go b/jobs/jobs/notional/notional.go index 1304915d..ab18fcfa 100644 --- a/jobs/jobs/notional/notional.go +++ b/jobs/jobs/notional/notional.go @@ -64,7 +64,7 @@ func (j *NotionalJob) Run() error { } // publish notional value of assets to redis pubsub. - err = j.cacheClient.Publish(j.cacheChannel, "NOTIONA_UPDATED").Err() + err = j.cacheClient.Publish(j.cacheChannel, "NOTIONAL_UPDATED").Err() if err != nil { j.logger.Error("failed to publish notional update message to redis pubsub", zap.Error(err))