Add notional cache client (#241)
* Add notional cache client * gracefull shutdown pubsub and distributed cache
This commit is contained in:
parent
c8aba636e4
commit
ffeb5927f8
|
@ -7,7 +7,6 @@ require (
|
|||
github.com/certusone/wormhole/node v0.0.0-20230315165931-62bef9ffb441
|
||||
github.com/ethereum/go-ethereum v1.10.21
|
||||
github.com/gagliardetto/solana-go v1.7.1
|
||||
github.com/go-redis/redis/v8 v8.11.5
|
||||
github.com/gofiber/adaptor/v2 v2.1.29
|
||||
github.com/gofiber/fiber/v2 v2.39.0
|
||||
github.com/improbable-eng/grpc-web v0.15.0
|
||||
|
@ -26,6 +25,8 @@ require (
|
|||
google.golang.org/grpc v1.50.1
|
||||
)
|
||||
|
||||
require github.com/go-redis/redis/v8 v8.11.5
|
||||
|
||||
require (
|
||||
contrib.go.opencensus.io/exporter/stackdriver v0.13.14 // indirect
|
||||
filippo.io/edwards25519 v1.0.0 // indirect
|
||||
|
@ -38,7 +39,7 @@ require (
|
|||
github.com/blendle/zapdriver v1.3.1 // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
|
||||
github.com/deepmap/oapi-codegen v1.8.2 // indirect
|
||||
|
|
|
@ -113,8 +113,9 @@ github.com/certusone/wormhole/node v0.0.0-20230315165931-62bef9ffb441 h1:GA0tsKb
|
|||
github.com/certusone/wormhole/node v0.0.0-20230315165931-62bef9ffb441/go.mod h1:U/qHGQY1vAw//UlPYVQKSL9b3C2nL6YakCYorMEz3GY=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
|
||||
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
|
|
|
@ -22,15 +22,20 @@ type CacheClient struct {
|
|||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// CacheReadable is the interface for notiona cache client.
|
||||
type CacheReadable interface {
|
||||
Get(ctx context.Context, key string) (string, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
type CacheGetFunc func(ctx context.Context, key string) (string, error)
|
||||
|
||||
// NewCacheClient init a new cache client.
|
||||
func NewCacheClient(url string, enabled bool, log *zap.Logger) *CacheClient {
|
||||
client := redis.NewClient(
|
||||
&redis.Options{
|
||||
Addr: url,
|
||||
})
|
||||
return &CacheClient{Client: client, Enabled: enabled, logger: log}
|
||||
func NewCacheClient(redisClient *redis.Client, enabled bool, log *zap.Logger) (*CacheClient, error) {
|
||||
if redisClient == nil {
|
||||
return nil, errors.New("redis client is nil")
|
||||
}
|
||||
return &CacheClient{Client: redisClient, Enabled: enabled, logger: log}, nil
|
||||
}
|
||||
|
||||
// Get get a cache value or error from a key.
|
||||
|
@ -45,7 +50,7 @@ func (c *CacheClient) Get(ctx context.Context, key string) (string, error) {
|
|||
if err != nil {
|
||||
if err != redis.Nil {
|
||||
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
|
||||
c.logger.Error("ket does not exist in cache",
|
||||
c.logger.Error("key does not exist in cache",
|
||||
zap.Error(err), zap.String("key", key), zap.String("requestID", requestID))
|
||||
return "", errs.ErrNotFound
|
||||
}
|
||||
|
@ -53,3 +58,7 @@ func (c *CacheClient) Get(ctx context.Context, key string) (string, error) {
|
|||
}
|
||||
return value, nil
|
||||
}
|
||||
|
||||
func (c *CacheClient) Close() error {
|
||||
return c.Client.Close()
|
||||
}
|
||||
|
|
|
@ -11,8 +11,8 @@ type DummyCacheClient struct {
|
|||
}
|
||||
|
||||
// NewDummyCacheClient create a new instance of DummyCacheClient
|
||||
func NewDummyCacheClient() DummyCacheClient {
|
||||
return DummyCacheClient{}
|
||||
func NewDummyCacheClient() *DummyCacheClient {
|
||||
return &DummyCacheClient{}
|
||||
}
|
||||
|
||||
// Get get method is a dummy method that always does not find the cache.
|
||||
|
@ -20,3 +20,8 @@ func NewDummyCacheClient() DummyCacheClient {
|
|||
func (d *DummyCacheClient) Get(ctx context.Context, key string) (string, error) {
|
||||
return "", errs.ErrNotFound
|
||||
}
|
||||
|
||||
// Close dummy cache client.
|
||||
func (d *DummyCacheClient) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
package notional
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
wormscanNotionalCacheKeyRegex = "*WORMSCAN:NOTIONAL:CHAIN_ID:*"
|
||||
wormscanNotionalUpdated = "NOTIONAL_UPDATED"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNotFound = errors.New("NOT FOUND")
|
||||
ErrInvalidCacheField = errors.New("INVALID CACHE FIELD")
|
||||
)
|
||||
|
||||
// NotionalLocalCacheReadable is the interface for notional local cache.
|
||||
type NotionalLocalCacheReadable interface {
|
||||
Get(chainID vaa.ChainID) (NotionalCacheField, error)
|
||||
Close() error
|
||||
}
|
||||
|
||||
// NotionalCacheField is the notional value of assets in cache.
|
||||
type NotionalCacheField struct {
|
||||
NotionalUsd float64 `json:"notional_usd"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
|
||||
// NotionalCacheClient redis cache client.
|
||||
type NotionalCache struct {
|
||||
client *redis.Client
|
||||
pubSub *redis.PubSub
|
||||
channel string
|
||||
notionalMap sync.Map
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewNotionalCache create a new cache client.
|
||||
// After create a NotionalCache use the Init method to initialize pubsub and load the cache.
|
||||
func NewNotionalCache(ctx context.Context, redisClient *redis.Client, channel string, log *zap.Logger) (*NotionalCache, error) {
|
||||
if redisClient == nil {
|
||||
return nil, errors.New("redis client is nil")
|
||||
}
|
||||
|
||||
pubsub := redisClient.Subscribe(ctx, channel)
|
||||
return &NotionalCache{
|
||||
client: redisClient,
|
||||
pubSub: pubsub,
|
||||
channel: channel,
|
||||
notionalMap: sync.Map{},
|
||||
logger: log}, nil
|
||||
}
|
||||
|
||||
// Init subscribe to notional pubsub and load the cache.
|
||||
func (c *NotionalCache) Init(ctx context.Context) error {
|
||||
// load notional cache
|
||||
err := c.loadCache(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// notional cache updated channel subscribe
|
||||
c.subscribe(ctx)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// loadCache load notional cache from redis.
|
||||
func (c *NotionalCache) loadCache(ctx context.Context) error {
|
||||
scanCom := c.client.Scan(ctx, 0, wormscanNotionalCacheKeyRegex, 100)
|
||||
for {
|
||||
// Scan for notional keys
|
||||
keys, cursor, err := scanCom.Result()
|
||||
if err != nil {
|
||||
c.logger.Error("loadCache", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// Get notional value from keys
|
||||
for _, key := range keys {
|
||||
var field NotionalCacheField
|
||||
value, err := c.client.Get(ctx, key).Result()
|
||||
json.Unmarshal([]byte(value), &field)
|
||||
if err != nil {
|
||||
c.logger.Error("loadCache", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
// Save notional value to local cache
|
||||
c.notionalMap.Store(key, field)
|
||||
}
|
||||
|
||||
if cursor == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe to a notional update channel and load new values for the notional cache.
|
||||
func (c *NotionalCache) subscribe(ctx context.Context) {
|
||||
ch := c.pubSub.Channel()
|
||||
|
||||
go func() {
|
||||
for msg := range ch {
|
||||
if wormscanNotionalUpdated == msg.Payload {
|
||||
// update notional cache
|
||||
c.loadCache(ctx)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Close the pubsub channel.
|
||||
func (c *NotionalCache) Close() error {
|
||||
return c.pubSub.Close()
|
||||
}
|
||||
|
||||
// Get notional cache value.
|
||||
func (c *NotionalCache) Get(chainID vaa.ChainID) (NotionalCacheField, error) {
|
||||
var notional NotionalCacheField
|
||||
|
||||
// get notional cache key
|
||||
key := fmt.Sprintf("WORMSCAN:NOTIONAL:CHAIN_ID:%d", chainID)
|
||||
|
||||
// get notional cache value
|
||||
field, ok := c.notionalMap.Load(key)
|
||||
if !ok {
|
||||
return notional, ErrNotFound
|
||||
}
|
||||
|
||||
// convert any field to NotionalCacheField
|
||||
notional, ok = field.(NotionalCacheField)
|
||||
if !ok {
|
||||
c.logger.Error("invalid notional cache field",
|
||||
zap.Any("field", field),
|
||||
zap.Any("chainID", chainID))
|
||||
return notional, ErrInvalidCacheField
|
||||
}
|
||||
return notional, nil
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
package notional
|
||||
|
||||
import (
|
||||
"github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||
)
|
||||
|
||||
// DummyNotionalCache is a dummy notional cache.
|
||||
type DummyNotionalCache struct {
|
||||
}
|
||||
|
||||
// NewDummyNotionalCache init a new dummy notional cache.
|
||||
func NewDummyNotionalCache() *DummyNotionalCache {
|
||||
return &DummyNotionalCache{}
|
||||
}
|
||||
|
||||
// Get get notional cache value.
|
||||
func (c *DummyNotionalCache) Get(chainID vaa.ChainID) (NotionalCacheField, error) {
|
||||
return NotionalCacheField{}, nil
|
||||
}
|
||||
|
||||
// Close the dummy cache.
|
||||
func (c *DummyNotionalCache) Close() error {
|
||||
return nil
|
||||
}
|
|
@ -33,6 +33,7 @@ type AppConfig struct {
|
|||
}
|
||||
Cache struct {
|
||||
URL string
|
||||
Channel string
|
||||
Enabled bool
|
||||
}
|
||||
PORT int
|
||||
|
|
33
api/main.go
33
api/main.go
|
@ -11,6 +11,7 @@ import (
|
|||
"syscall"
|
||||
|
||||
"github.com/ansrivas/fiberprometheus/v2"
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/gofiber/adaptor/v2"
|
||||
"github.com/gofiber/fiber/v2"
|
||||
"github.com/gofiber/fiber/v2/middleware/cors"
|
||||
|
@ -28,6 +29,7 @@ import (
|
|||
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa"
|
||||
wormscanCache "github.com/wormhole-foundation/wormhole-explorer/api/internal/cache"
|
||||
wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/api/internal/cache/notional"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/api/internal/config"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/api/internal/db"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/api/middleware"
|
||||
|
@ -36,7 +38,6 @@ import (
|
|||
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan"
|
||||
rpcApi "github.com/wormhole-foundation/wormhole-explorer/api/rpc"
|
||||
xlogger "github.com/wormhole-foundation/wormhole-explorer/common/logger"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -99,7 +100,7 @@ func main() {
|
|||
db := cli.Database(cfg.DB.Name)
|
||||
|
||||
// Get cache get function
|
||||
cacheGetFunc := NewCache(cfg, rootLogger)
|
||||
cache, notionalCache := NewCache(appCtx, cfg, rootLogger)
|
||||
|
||||
//InfluxDB client
|
||||
influxCli := newInfluxClient(cfg.Influx.URL, cfg.Influx.Token)
|
||||
|
@ -115,7 +116,7 @@ func main() {
|
|||
|
||||
// Set up services
|
||||
addressService := address.NewService(addressRepo, rootLogger)
|
||||
vaaService := vaa.NewService(vaaRepo, cacheGetFunc, rootLogger)
|
||||
vaaService := vaa.NewService(vaaRepo, cache.Get, rootLogger)
|
||||
obsService := observations.NewService(obsRepo, rootLogger)
|
||||
governorService := governor.NewService(governorRepo, rootLogger)
|
||||
infrastructureService := infrastructure.NewService(infrastructureRepo, rootLogger)
|
||||
|
@ -179,17 +180,33 @@ func main() {
|
|||
rootLogger.Info("cleanup tasks...")
|
||||
rootLogger.Info("shutdown server...")
|
||||
app.Shutdown()
|
||||
rootLogger.Info("close pubsub notional...")
|
||||
notionalCache.Close()
|
||||
rootLogger.Info("close cache...")
|
||||
cache.Close()
|
||||
rootLogger.Info("finished successfully wormhole api")
|
||||
}
|
||||
|
||||
// NewCache return a CacheGetFunc to get a value by a Key from cache.
|
||||
func NewCache(cfg *config.AppConfig, looger *zap.Logger) wormscanCache.CacheGetFunc {
|
||||
// NewCache get a CacheGetFunc to get a value by a Key from cache and a CacheReadable to get a value by a Key from notional local cache.
|
||||
func NewCache(ctx context.Context, cfg *config.AppConfig, logger *zap.Logger) (wormscanCache.CacheReadable, wormscanNotionalCache.NotionalLocalCacheReadable) {
|
||||
// if run mode is development with cache is disabled, return a dummy cache client and a dummy notional cache client.
|
||||
if cfg.RunMode == config.RunModeDevelopmernt && !cfg.Cache.Enabled {
|
||||
dummyCacheClient := wormscanCache.NewDummyCacheClient()
|
||||
return dummyCacheClient.Get
|
||||
dummyNotionalCache := wormscanNotionalCache.NewDummyNotionalCache()
|
||||
return dummyCacheClient, dummyNotionalCache
|
||||
}
|
||||
cacheClient := wormscanCache.NewCacheClient(cfg.Cache.URL, cfg.Cache.Enabled, looger)
|
||||
return cacheClient.Get
|
||||
|
||||
// if we are not in development mode, use a distributed cache and for notional a pubsub to sync local cache.
|
||||
redisClient := redis.NewClient(&redis.Options{Addr: cfg.Cache.URL})
|
||||
|
||||
// get cache client
|
||||
cacheClient, _ := wormscanCache.NewCacheClient(redisClient, cfg.Cache.Enabled, logger)
|
||||
|
||||
// get notional cache client and init load to local cache
|
||||
notionalCache, _ := wormscanNotionalCache.NewNotionalCache(ctx, redisClient, cfg.Cache.Channel, logger)
|
||||
notionalCache.Init(ctx)
|
||||
|
||||
return cacheClient, notionalCache
|
||||
}
|
||||
|
||||
func newInfluxClient(url, token string) influxdb2.Client {
|
||||
|
|
|
@ -78,6 +78,8 @@ spec:
|
|||
key: redis-uri
|
||||
- name: WORMSCAN_CACHE_ENABLED
|
||||
value: "true"
|
||||
- name: WORMSCAN_CACHE_CHANNEL
|
||||
value: ""WORMSCAN:NOTIONAL"
|
||||
- name: WORMSCAN_PPROF_ENABLED
|
||||
value: "{{ .WORMSCAN_PPROF_ENABLED }}"
|
||||
- name: WORMSCAN_INFLUX_URL
|
||||
|
|
|
@ -64,7 +64,7 @@ func (j *NotionalJob) Run() error {
|
|||
}
|
||||
|
||||
// publish notional value of assets to redis pubsub.
|
||||
err = j.cacheClient.Publish(j.cacheChannel, "NOTIONA_UPDATED").Err()
|
||||
err = j.cacheClient.Publish(j.cacheChannel, "NOTIONAL_UPDATED").Err()
|
||||
if err != nil {
|
||||
j.logger.Error("failed to publish notional update message to redis pubsub",
|
||||
zap.Error(err))
|
||||
|
|
Loading…
Reference in New Issue