diff --git a/api/go.mod b/api/go.mod index 915071f6..ffeed106 100644 --- a/api/go.mod +++ b/api/go.mod @@ -6,6 +6,7 @@ require ( github.com/ansrivas/fiberprometheus/v2 v2.4.1 github.com/certusone/wormhole/node v0.0.0-20220907133901-8e231501b6cd github.com/ethereum/go-ethereum v1.10.6 + github.com/go-redis/redis/v8 v8.11.5 github.com/gofiber/fiber/v2 v2.39.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/pkg/errors v0.9.1 @@ -24,6 +25,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect diff --git a/api/go.sum b/api/go.sum index 7ac1ad93..702152b0 100644 --- a/api/go.sum +++ b/api/go.sum @@ -142,6 +142,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dlclark/regexp2 v1.2.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/docker/docker v1.4.2-0.20180625184442-8e610b2b55bf/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= @@ -187,6 +189,8 @@ github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KE github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-sourcemap/sourcemap v2.1.2+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= @@ -436,7 +440,7 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= diff --git a/api/handlers/vaa/service.go b/api/handlers/vaa/service.go index 179c142b..d02e85b3 100644 --- a/api/handlers/vaa/service.go +++ b/api/handlers/vaa/service.go @@ -2,8 +2,13 @@ package vaa import ( "context" + "fmt" + "strconv" "github.com/certusone/wormhole/node/pkg/vaa" + "github.com/pkg/errors" + "github.com/wormhole-foundation/wormhole-explorer/api/internal/cache" + errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" "github.com/wormhole-foundation/wormhole-explorer/api/internal/pagination" "github.com/wormhole-foundation/wormhole-explorer/api/response" "go.uber.org/zap" @@ -11,13 +16,14 @@ import ( // Service definition. type Service struct { - repo *Repository - logger *zap.Logger + repo *Repository + getCacheFunc cache.CacheGetFunc + logger *zap.Logger } // NewService create a new Service. -func NewService(r *Repository, logger *zap.Logger) *Service { - return &Service{repo: r, logger: logger.With(zap.String("module", "VaaService"))} +func NewService(r *Repository, getCacheFunc cache.CacheGetFunc, logger *zap.Logger) *Service { + return &Service{repo: r, getCacheFunc: getCacheFunc, logger: logger.With(zap.String("module", "VaaService"))} } // FindAll get all the the vaa. @@ -50,6 +56,12 @@ func (s *Service) FindByEmitter(ctx context.Context, chain vaa.ChainID, emitter // FindById get a vaa by chainID, emitter address and sequence number. func (s *Service) FindById(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string) (*response.Response[*VaaDoc], error) { + // check vaa sequence indexed + isVaaNotIndexed := s.discardVaaNotIndexed(ctx, chain, emitter, seq) + if isVaaNotIndexed { + return nil, errs.ErrNotFound + } + query := Query().SetChain(chain).SetEmitter(emitter.String()).SetSequence(seq) vaas, err := s.repo.FindOne(ctx, query) res := response.Response[*VaaDoc]{Data: vaas} @@ -66,3 +78,40 @@ func (s *Service) GetVaaCount(ctx context.Context, p *pagination.Pagination) (*r res := response.Response[[]*VaaStats]{Data: stats} return &res, err } + +// discardVaaNotIndexed discard a vaa request if the input sequence for a chainID, address is greatter than or equals +// the cached value of the sequence for this chainID, address. +// If the sequence does not exist we can not discard the request. +func (s *Service) discardVaaNotIndexed(ctx context.Context, chain vaa.ChainID, emitter vaa.Address, seq string) bool { + key := fmt.Sprintf("%s:%d:%s", "wormscan:vaa-max-sequence", chain, emitter.String()) + sequence, err := s.getCacheFunc(ctx, key) + if err != nil { + if errors.Is(err, errs.ErrInternalError) { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + s.logger.Error("error getting value from cache", + zap.Error(err), zap.String("requestID", requestID)) + } + return false + } + + inputSquence, err := strconv.ParseUint(seq, 10, 64) + if err != nil { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + s.logger.Error("error invalid input sequence number", + zap.Error(err), zap.String("seq", seq), zap.String("requestID", requestID)) + return false + } + cacheSequence, err := strconv.ParseUint(sequence, 10, 64) + if err != nil { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + s.logger.Error("error invalid cached sequence number", + zap.Error(err), zap.String("sequence", sequence), zap.String("requestID", requestID)) + return false + } + + // Check that the input sequence is indexed. + if cacheSequence >= inputSquence { + return false + } + return true +} diff --git a/api/internal/cache/cache.go b/api/internal/cache/cache.go new file mode 100644 index 00000000..f5fb844f --- /dev/null +++ b/api/internal/cache/cache.go @@ -0,0 +1,55 @@ +// Package cache implement a simple cache redis client. +// It define a type [Cache] that represent the cache client and +// It define the methods Get to get a valur from a cache key. +package cache + +import ( + "context" + "errors" + "fmt" + + "github.com/go-redis/redis/v8" + errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" + "go.uber.org/zap" +) + +var ErrCacheNotEnabled = errors.New("CACHE NOT ENABLED") + +// CacheClient redis cache client. +type CacheClient struct { + Client *redis.Client + Enabled bool + logger *zap.Logger +} + +type CacheGetFunc func(ctx context.Context, key string) (string, error) + +// NewCacheClient init a new cache client. +func NewCacheClient(url string, enabled bool, log *zap.Logger) *CacheClient { + client := redis.NewClient( + &redis.Options{ + Addr: url, + }) + return &CacheClient{Client: client, Enabled: enabled, logger: log} +} + +// Get get a cache value or error from a key. +// If the cache is not enabled, the error value +// If the cache not contain a value from a key, the error value errors.ErrNotFound is returned. +// If exist some internal error in the cache, the error value errros.ErrInternalError is returned. +func (c *CacheClient) Get(ctx context.Context, key string) (string, error) { + if !c.Enabled { + return "", ErrCacheNotEnabled + } + value, err := c.Client.Get(ctx, key).Result() + if err != nil { + if err != redis.Nil { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + c.logger.Error("ket does not exist in cache", + zap.Error(err), zap.String("key", key), zap.String("requestID", requestID)) + return "", errs.ErrNotFound + } + return "", errs.ErrInternalError + } + return value, nil +} diff --git a/api/internal/cache/dummycache.go b/api/internal/cache/dummycache.go new file mode 100644 index 00000000..b1964c14 --- /dev/null +++ b/api/internal/cache/dummycache.go @@ -0,0 +1,22 @@ +package cache + +import ( + "context" + + errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" +) + +// DummyCacheClient dummy cache client. +type DummyCacheClient struct { +} + +// NewDummyCacheClient create a new instance of DummyCacheClient +func NewDummyCacheClient() DummyCacheClient { + return DummyCacheClient{} +} + +// Get get method is a dummy method that always does not find the cache. +// Use this Get function when run development enviroment +func (d *DummyCacheClient) Get(ctx context.Context, key string) (string, error) { + return "", errs.ErrNotFound +} diff --git a/api/internal/config/config.go b/api/internal/config/config.go index 0fa7cb9d..06b70b5d 100644 --- a/api/internal/config/config.go +++ b/api/internal/config/config.go @@ -24,7 +24,10 @@ type AppConfig struct { // database name Name string } - + Cache struct { + URL string + Enabled bool + } PORT int LogLevel string RunMode string diff --git a/api/main.go b/api/main.go index 93165bb9..7e217683 100644 --- a/api/main.go +++ b/api/main.go @@ -20,10 +20,12 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/api/handlers/infraestructure" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/observations" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa" + wormscanCache "github.com/wormhole-foundation/wormhole-explorer/api/internal/cache" "github.com/wormhole-foundation/wormhole-explorer/api/internal/config" "github.com/wormhole-foundation/wormhole-explorer/api/internal/db" "github.com/wormhole-foundation/wormhole-explorer/api/middleware" "github.com/wormhole-foundation/wormhole-explorer/api/response" + "go.uber.org/zap" ) var cacheConfig = cache.Config{ @@ -68,6 +70,9 @@ func main() { } db := cli.Database(cfg.DB.Name) + // Get cache get function + cacheGetFunc := NewCache(cfg, rootLogger) + // Setup repositories vaaRepo := vaa.NewRepository(db, rootLogger) obsRepo := observations.NewRepository(db, rootLogger) @@ -76,7 +81,7 @@ func main() { heartbeatsRepo := heartbeats.NewRepository(db, rootLogger) // Setup services - vaaService := vaa.NewService(vaaRepo, rootLogger) + vaaService := vaa.NewService(vaaRepo, cacheGetFunc, rootLogger) obsService := observations.NewService(obsRepo, rootLogger) governorService := governor.NewService(governorRepo, rootLogger) infraestructureService := infraestructure.NewService(infraestructureRepo, rootLogger) @@ -175,3 +180,13 @@ func main() { app.Listen(":" + strconv.Itoa(cfg.PORT)) } + +// NewCache return a CacheGetFunc to get a value by a Key from cache. +func NewCache(cfg *config.AppConfig, looger *zap.Logger) wormscanCache.CacheGetFunc { + if cfg.RunMode == config.RunModeDevelopmernt && !cfg.Cache.Enabled { + dummyCacheClient := wormscanCache.NewDummyCacheClient() + return dummyCacheClient.Get + } + cacheClient := wormscanCache.NewCacheClient(cfg.Cache.URL, cfg.Cache.Enabled, looger) + return cacheClient.Get +} diff --git a/deploy/api/api-service.yaml b/deploy/api/api-service.yaml index 427ef912..3821e654 100644 --- a/deploy/api/api-service.yaml +++ b/deploy/api/api-service.yaml @@ -68,7 +68,17 @@ spec: name: mongodb key: mongo-uri - name: WORMSCAN_DB_NAME - value: {{ .WORMSCAN_DB_NAME }} + valueFrom: + configMapKeyRef: + name: config + key: mongo-database + - name: WORMSCAN_CACHE_URL + valueFrom: + configMapKeyRef: + name: config + key: redis-uri + - name: WORMSCAN_CACHE_ENABLED + value: "true" resources: limits: memory: {{ .RESOURCES_LIMITS_MEMORY }} diff --git a/deploy/api/env/dev.env b/deploy/api/env/dev.env index 8381d499..8a2fb7af 100644 --- a/deploy/api/env/dev.env +++ b/deploy/api/env/dev.env @@ -7,7 +7,6 @@ RESOURCES_LIMITS_MEMORY=256Mi RESOURCES_LIMITS_CPU=500m RESOURCES_REQUESTS_MEMORY=128Mi RESOURCES_REQUESTS_CPU=250m -WORMSCAN_DB_NAME=wormhole WORMSCAN_RUNMODE=DEVELOPMENT WORMSCAN_LOGLEVEL=INFO HOSTNAME=staging-api.wormscan.io diff --git a/deploy/common/configmap.yaml b/deploy/common/configmap.yaml new file mode 100644 index 00000000..3db9b896 --- /dev/null +++ b/deploy/common/configmap.yaml @@ -0,0 +1,9 @@ +--- +kind: ConfigMap +apiVersion: v1 +metadata: + name: config + namespace: {{ .NAMESPACE }} +data: + mongo-database: {{ .MONGODB_DATABASE }} + redis-uri: {{ .REDIS_URI }} diff --git a/deploy/common/env/dev.env b/deploy/common/env/dev.env index d2bf30cc..25ca15f2 100644 --- a/deploy/common/env/dev.env +++ b/deploy/common/env/dev.env @@ -1,2 +1,4 @@ NAMESPACE=wormscan MONGODB_URI= +MONGODB_DATABASE=wormhole +REDIS_URI= diff --git a/deploy/fly/fly-service.yaml b/deploy/fly/fly-service.yaml index 0668e561..349653dd 100644 --- a/deploy/fly/fly-service.yaml +++ b/deploy/fly/fly-service.yaml @@ -44,6 +44,11 @@ spec: secretKeyRef: name: mongodb key: mongo-uri + - name: MONGODB_DATABASE + valueFrom: + configMapKeyRef: + name: config + key: mongo-database - name: SQS_URL value: {{ .SQS_URL }} - name: AWS_REGION @@ -58,6 +63,11 @@ spec: secretKeyRef: name: fly-sqs key: aws-secret-access-key + - name: REDIS_URI + valueFrom: + configMapKeyRef: + name: config + key: redis-uri resources: limits: memory: {{ .RESOURCES_LIMITS_MEMORY }} diff --git a/deploy/spy/env/dev.env b/deploy/spy/env/dev.env index fa01b416..cf3de7a6 100644 --- a/deploy/spy/env/dev.env +++ b/deploy/spy/env/dev.env @@ -7,6 +7,5 @@ RESOURCES_LIMITS_MEMORY=256Mi RESOURCES_LIMITS_CPU=500m RESOURCES_REQUESTS_MEMORY=128Mi RESOURCES_REQUESTS_CPU=250m -MONGODB_DATABASE=wormhole GRPC_ADDRESS=0.0.0.0:7777 HOSTNAME=staging-spy.wormscan.io diff --git a/deploy/spy/spy-service.yaml b/deploy/spy/spy-service.yaml index 8be93f2f..453c09a2 100644 --- a/deploy/spy/spy-service.yaml +++ b/deploy/spy/spy-service.yaml @@ -62,7 +62,10 @@ spec: name: mongodb key: mongo-uri - name: MONGODB_DATABASE - value: {{ .MONGODB_DATABASE }} + valueFrom: + configMapKeyRef: + name: config + key: mongo-database - name: GRPC_ADDRESS value: {{ .GRPC_ADDRESS }} - name: PORT diff --git a/devnet/api.yaml b/devnet/api.yaml index eb9393bf..9ed29b15 100644 --- a/devnet/api.yaml +++ b/devnet/api.yaml @@ -37,6 +37,8 @@ spec: value: mongodb://mongo-0.mongo/?replicaSet=rs0 - name: WORMSCAN_PORT value: "8000" + - name: WORMSCAN_CACHE_ENABLED + value: "false" readinessProbe: tcpSocket: port: 8000 diff --git a/fly/go.mod b/fly/go.mod index bbb944b1..e6abf56a 100644 --- a/fly/go.mod +++ b/fly/go.mod @@ -8,6 +8,7 @@ require ( github.com/dgraph-io/ristretto v0.1.1 github.com/eko/gocache/v3 v3.1.2 github.com/ethereum/go-ethereum v1.10.21 + github.com/go-redis/redis/v8 v8.11.5 github.com/gofiber/fiber/v2 v2.40.1 github.com/ipfs/go-log/v2 v2.5.1 github.com/joho/godotenv v1.4.0 @@ -44,7 +45,6 @@ require ( github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect - github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.3 // indirect diff --git a/fly/main.go b/fly/main.go index 330f4b03..56799048 100644 --- a/fly/main.go +++ b/fly/main.go @@ -10,10 +10,12 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" + "github.com/go-redis/redis/v8" "github.com/wormhole-foundation/wormhole-explorer/fly/deduplicator" "github.com/wormhole-foundation/wormhole-explorer/fly/guardiansets" "github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs" "github.com/wormhole-foundation/wormhole-explorer/fly/migration" + "github.com/wormhole-foundation/wormhole-explorer/fly/notifier" "github.com/wormhole-foundation/wormhole-explorer/fly/processor" "github.com/wormhole-foundation/wormhole-explorer/fly/queue" "github.com/wormhole-foundation/wormhole-explorer/fly/server" @@ -144,6 +146,24 @@ func newVAAConsumePublish(isLocal bool, logger *zap.Logger) (*sqs.Consumer, proc return sqsConsumer, vaaQueue.Consume, vaaQueue.Publish } +func newVAANotifierFunc(isLocal bool, logger *zap.Logger) processor.VAANotifyFunc { + + if isLocal { + return func(context.Context, *vaa.VAA, []byte) error { + return nil + } + } + + redisUri, err := getenv("REDIS_URI") + if err != nil { + logger.Fatal("could not create vaa notifier ", zap.Error(err)) + } + + client := redis.NewClient(&redis.Options{Addr: redisUri}) + + return notifier.NewLastSequenceNotifier(client).Notify +} + func main() { // Node's main lifecycle context. @@ -191,7 +211,12 @@ func main() { logger.Fatal("You must set your 'MONGODB_URI' environmental variable. See\n\t https://www.mongodb.com/docs/drivers/go/current/usage-examples/#environment-variable") } - db, err := storage.GetDB(rootCtx, logger, uri, "wormhole") + databaseName := os.Getenv("MONGODB_DATABASE") + if databaseName == "" { + logger.Fatal("You must set your 'MONGODB_DATABASE' environmental variable. See\n\t https://www.mongodb.com/docs/drivers/go/current/usage-examples/#environment-variable") + } + + db, err := storage.GetDB(rootCtx, logger, uri, databaseName) if err != nil { logger.Fatal("could not connect to DB", zap.Error(err)) } @@ -261,17 +286,20 @@ func main() { if err != nil { logger.Fatal("could not create cache", zap.Error(err)) } + isLocalFlag := isLocal != nil && *isLocal // Creates a deduplicator to discard VAA messages that were processed previously deduplicator := deduplicator.New(cache, logger) // Creates two callbacks - sqsConsumer, vaaQueueConsume, nonPythVaaPublish := newVAAConsumePublish(isLocal != nil && *isLocal, logger) + sqsConsumer, vaaQueueConsume, nonPythVaaPublish := newVAAConsumePublish(isLocalFlag, logger) + // Create a vaa notifier + notifierFunc := newVAANotifierFunc(isLocalFlag, logger) // Creates a instance to consume VAA messages from Gossip network and handle the messages // When recive a message, the message filter by deduplicator // if VAA is from pyhnet should be saved directly to repository // if VAA is from non pyhnet should be publish with nonPythVaaPublish vaaGossipConsumer := processor.NewVAAGossipConsumer(gst, deduplicator, nonPythVaaPublish, repository.UpsertVaa, logger) // Creates a instance to consume VAA messages (non pyth) from a queue and store in a storage - vaaQueueConsumer := processor.NewVAAQueueConsumer(vaaQueueConsume, repository, logger) + vaaQueueConsumer := processor.NewVAAQueueConsumer(vaaQueueConsume, repository, notifierFunc, logger) // Creates a wrapper that splits the incoming VAAs into 2 channels (pyth to non pyth) in order // to be able to process them in a differentiated way vaaGossipConsumerSplitter := processor.NewVAAGossipSplitterConsumer(vaaGossipConsumer.Push, logger) diff --git a/fly/notifier/vaa_last_sequence.go b/fly/notifier/vaa_last_sequence.go new file mode 100644 index 00000000..f3981b25 --- /dev/null +++ b/fly/notifier/vaa_last_sequence.go @@ -0,0 +1,53 @@ +package notifier + +import ( + "context" + "fmt" + "strconv" + + "github.com/go-redis/redis/v8" + "github.com/wormhole-foundation/wormhole/sdk/vaa" +) + +const LUA_SCRIPT = ` +local newValue = ARGV[1]; +if (newValue == "" or newValue:find("%D")) then + return redis.error_reply(string.format("[%s] is not a valid number", newValue)); +end +local currentValue = redis.call('get', KEYS[1]); +if currentValue then + if string.len(newValue) > string.len(currentValue) then + redis.call('set', KEYS[1], ARGV[1]); + return newValue + elseif string.len(newValue) < string.len(currentValue) then + return currentValue; + elseif newValue > currentValue then + redis.call('set', KEYS[1], ARGV[1]) + return newValue + else + return currentValue + end +else + redis.call('set', KEYS[1], ARGV[1]) + return newValue +end +` + +type LastSequenceNotifier struct { + client *redis.Client + script *redis.Script +} + +func NewLastSequenceNotifier(c *redis.Client) *LastSequenceNotifier { + return &LastSequenceNotifier{ + client: c, + script: redis.NewScript(LUA_SCRIPT), + } +} + +func (l *LastSequenceNotifier) Notify(ctx context.Context, v *vaa.VAA, _ []byte) error { + key := fmt.Sprintf("wormscan:vaa-max-sequence:%d:%s", v.EmitterChain, v.EmitterAddress.String()) + sequence := strconv.FormatUint(v.Sequence, 10) + _, err := l.script.Run(ctx, l.client, []string{key}, sequence).Result() + return err +} diff --git a/fly/processor/vaa_gossip_consumer.go b/fly/processor/vaa_gossip_consumer.go index c1dddb1e..cc1c0bd6 100644 --- a/fly/processor/vaa_gossip_consumer.go +++ b/fly/processor/vaa_gossip_consumer.go @@ -54,5 +54,6 @@ func (p *vaaGossipConsumer) Push(ctx context.Context, v *vaa.VAA, serializedVaa zap.Error(err)) return err } + return nil } diff --git a/fly/processor/vaa_queue_consumer.go b/fly/processor/vaa_queue_consumer.go index 79fd9357..c8bc88c9 100644 --- a/fly/processor/vaa_queue_consumer.go +++ b/fly/processor/vaa_queue_consumer.go @@ -17,6 +17,7 @@ type VAAQueueConsumeFunc func(context.Context) <-chan *queue.Message type VAAQueueConsumer struct { consume VAAQueueConsumeFunc repository *storage.Repository + notifyFunc VAANotifyFunc logger *zap.Logger } @@ -24,10 +25,12 @@ type VAAQueueConsumer struct { func NewVAAQueueConsumer( consume VAAQueueConsumeFunc, repository *storage.Repository, + notifyFunc VAANotifyFunc, logger *zap.Logger) *VAAQueueConsumer { return &VAAQueueConsumer{ consume: consume, repository: repository, + notifyFunc: notifyFunc, logger: logger, } } @@ -58,6 +61,15 @@ func (c *VAAQueueConsumer) Start(ctx context.Context) { zap.Error(err)) continue } + + err = c.notifyFunc(ctx, v, msg.Data) + if err != nil { + c.logger.Error("Error notifying vaa", + zap.String("id", v.MessageID()), + zap.Error(err)) + continue + } + msg.Ack() c.logger.Info("Vaa save in repository", zap.String("id", v.MessageID())) } diff --git a/fly/processor/vaa_types.go b/fly/processor/vaa_types.go index 5dfcba61..7db929a1 100644 --- a/fly/processor/vaa_types.go +++ b/fly/processor/vaa_types.go @@ -8,3 +8,6 @@ import ( // VAAPushFunc is a function to push VAA message. type VAAPushFunc func(context.Context, *vaa.VAA, []byte) error + +// VAANotifyFunc is a function to notify saved VAA message. +type VAANotifyFunc func(context.Context, *vaa.VAA, []byte) error