Improve fly reliability (#1094)

Add sqs queue for observations
Add tx hash strategy for vaas
Refactoring to organize the construction
This commit is contained in:
ftocal 2024-02-06 21:00:45 -03:00 committed by GitHub
parent 61c5f3e29d
commit 7d5a3bbe52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
62 changed files with 1823 additions and 865 deletions

View File

@ -11,3 +11,9 @@ type ReadyCheck struct {
}
type Check func(context.Context) error
func Noop() Check {
return func(ctx context.Context) error {
return nil
}
}

View File

@ -0,0 +1,3 @@
package repository
const VaaIdTxHash = "vaaIdTxHash"

10
deploy/fly/configmap.yaml Normal file
View File

@ -0,0 +1,10 @@
---
kind: ConfigMap
apiVersion: v1
metadata:
name: fly
namespace: {{ .NAMESPACE }}
data:
aws-region: {{ .SQS_AWS_REGION }}
vaas-sqs-url: {{ .VAAS_SQS_URL }}
observations-sqs-url: {{ .OBSERVATIONS_SQS_URL }}

View File

@ -8,8 +8,9 @@ RESOURCES_LIMITS_MEMORY=512Mi
RESOURCES_LIMITS_CPU=700m
RESOURCES_REQUESTS_MEMORY=384Mi
RESOURCES_REQUESTS_CPU=500m
SQS_URL=
SQS_AWS_REGION=
VAAS_SQS_URL=
OBSERVATIONS_SQS_URL=
P2P_NETWORK=mainnet
P2P_PORT=8999
PPROF_ENABLED=false

View File

@ -8,8 +8,9 @@ RESOURCES_LIMITS_MEMORY=256Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=250m
SQS_URL=
SQS_AWS_REGION=
VAAS_SQS_URL=
OBSERVATIONS_SQS_URL=
P2P_NETWORK=testnet
P2P_PORT=8999
PPROF_ENABLED=false

View File

@ -8,8 +8,9 @@ RESOURCES_LIMITS_MEMORY=512Mi
RESOURCES_LIMITS_CPU=700m
RESOURCES_REQUESTS_MEMORY=384Mi
RESOURCES_REQUESTS_CPU=500m
SQS_URL=
SQS_AWS_REGION=
VAAS_SQS_URL=
OBSERVATIONS_SQS_URL=
P2P_NETWORK=mainnet
P2P_PORT=8999
PPROF_ENABLED=true

View File

@ -8,8 +8,9 @@ RESOURCES_LIMITS_MEMORY=256Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=250m
SQS_URL=
SQS_AWS_REGION=
VAAS_SQS_URL=
OBSERVATIONS_SQS_URL=
P2P_NETWORK=testnet
P2P_PORT=8998
PPROF_ENABLED=false

View File

@ -69,10 +69,21 @@ spec:
value: "{{ .P2P_PORT }}"
- name: PPROF_ENABLED
value: "{{ .PPROF_ENABLED }}"
- name: SQS_URL
value: {{ .SQS_URL }}
- name: AWS_REGION
value: {{ .SQS_AWS_REGION }}
valueFrom:
configMapKeyRef:
name: fly
key: aws-region
- name: SQS_URL
valueFrom:
configMapKeyRef:
name: fly
key: vaas-sqs-url
- name: OBSERVATIONS_SQS_URL
valueFrom:
configMapKeyRef:
name: fly
key: observations-sqs-url
- name: REDIS_URI
valueFrom:
configMapKeyRef:

View File

@ -5,8 +5,8 @@ WORKDIR /app
COPY fly fly
COPY common common
# Build the Go app
RUN cd fly && CGO_ENABLED=1 GOOS=linux go build -o "./fly" main.go && \
RUN --mount=type=cache,target=/root/.go --mount=type=cache,target=/go \
cd fly && CGO_ENABLED=1 GOOS=linux go build -o "./fly" main.go && \
go get github.com/CosmWasm/wasmvm@v1.0.0 && \
cp /go/pkg/mod/github.com/!cosm!wasm/wasmvm@v1.0.0/api/libwasmvm.x86_64.so /usr/lib/

60
fly/builder/aws.go Normal file
View File

@ -0,0 +1,60 @@
package builder
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
)
func NewAwsConfig(ctx context.Context, config *config.Configuration) (aws.Config, error) {
if config.IsLocal {
return *aws.NewConfig(), nil
}
awsSecretId := config.Aws.AwsAccessKeyID
awsSecretKey := config.Aws.AwsSecretAccessKey
if awsSecretId != "" && awsSecretKey != "" {
credentials := credentials.NewStaticCredentialsProvider(awsSecretId, awsSecretKey, "")
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
awsEndpoint := config.Aws.AwsEndpoint
if awsEndpoint != "" {
return aws.Endpoint{
PartitionID: "aws",
URL: awsEndpoint,
SigningRegion: region,
}, nil
}
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
})
awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
awsconfig.WithRegion(config.Aws.AwsRegion),
awsconfig.WithEndpointResolver(customResolver),
awsconfig.WithCredentialsProvider(credentials),
)
return awsCfg, err
}
return awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(config.Aws.AwsRegion))
}
func NewSQSProducer(awsConfig aws.Config, sqsURL string) (*sqs.Producer, error) {
return sqs.NewProducer(awsConfig, sqsURL)
}
func NewSQSConsumer(sqsURL string, ctx context.Context, cfg *config.Configuration) (*sqs.Consumer, error) {
awsConfig, err := NewAwsConfig(ctx, cfg)
if err != nil {
return nil, err
}
return sqs.NewConsumer(awsConfig, sqsURL,
sqs.WithMaxMessages(10),
sqs.WithVisibilityTimeout(120))
}

31
fly/builder/cache.go Normal file
View File

@ -0,0 +1,31 @@
package builder
import (
"github.com/dgraph-io/ristretto"
"github.com/eko/gocache/v3/cache"
"github.com/eko/gocache/v3/store"
"github.com/wormhole-foundation/wormhole-explorer/fly/deduplicator"
"go.uber.org/zap"
)
func NewCache[T any]() (cache.CacheInterface[T], error) {
c, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 10000, // Num keys to track frequency of (1000).
MaxCost: 10 * (1 << 20), // Maximum cost of cache (10 MB).
BufferItems: 64, // Number of keys per Get buffer.
})
if err != nil {
return nil, err
}
store := store.NewRistretto(c)
return cache.New[T](store), nil
}
func NewDeduplicator(logger *zap.Logger) (*deduplicator.Deduplicator, error) {
// Creates a deduplicator to discard VAA messages that were processed previously
deduplicatorCache, err := NewCache[bool]()
if err != nil {
return nil, err
}
return deduplicator.New(deduplicatorCache, logger), nil
}

28
fly/builder/gossip.go Normal file
View File

@ -0,0 +1,28 @@
package builder
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
"github.com/wormhole-foundation/wormhole-explorer/fly/gossip"
)
func NewGossipChannels(cfg *config.Configuration) *gossip.GossipChannels {
return &gossip.GossipChannels{
SendChannel: make(chan []byte),
ObsvChannel: make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], cfg.ObservationsChannelSize),
ObsvReqChannel: make(chan *gossipv1.ObservationRequest, 50),
SignedInChannel: make(chan *gossipv1.SignedVAAWithQuorum, cfg.VaasChannelSize),
HeartbeatChannel: make(chan *gossipv1.Heartbeat, cfg.HeartbeatsChannelSize),
GovConfigChannel: make(chan *gossipv1.SignedChainGovernorConfig, cfg.GovernorConfigChannelSize),
GovStatusChannel: make(chan *gossipv1.SignedChainGovernorStatus, cfg.GovernorStatusChannelSize),
}
}

13
fly/builder/mongo.go Normal file
View File

@ -0,0 +1,13 @@
package builder
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
"go.uber.org/zap"
)
func NewDatabase(ctx context.Context, cfg *config.Configuration, logger *zap.Logger) (*dbutil.Session, error) {
return dbutil.Connect(ctx, logger, cfg.MongoUri, cfg.MongoDatabase, cfg.MongoEnableQueryLog)
}

42
fly/builder/monitoring.go Normal file
View File

@ -0,0 +1,42 @@
package builder
import (
"context"
"errors"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
healthcheck "github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
flyAlert "github.com/wormhole-foundation/wormhole-explorer/fly/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
)
func NewAlertClient(cfg *config.Configuration) (alert.AlertClient, error) {
if !cfg.AlertEnabled {
return alert.NewDummyClient(), nil
}
alertConfig := alert.AlertConfig{
Environment: cfg.Environment,
Enabled: cfg.AlertEnabled,
ApiKey: cfg.AlertApiKey,
}
return alert.NewAlertService(alertConfig, flyAlert.LoadAlerts)
}
func NewMetrics(cfg *config.Configuration) metrics.Metrics {
if !cfg.MetricsEnabled {
return metrics.NewDummyMetrics()
}
return metrics.NewPrometheusMetrics(cfg.Environment)
}
func CheckGuardian(guardian *health.GuardianCheck) healthcheck.Check {
return func(ctx context.Context) error {
isAlive := guardian.IsAlive()
if !isAlive {
return errors.New("guardian healthcheck not arrive in time")
}
return nil
}
}

View File

@ -0,0 +1,68 @@
package builder
import (
"context"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
"github.com/wormhole-foundation/wormhole-explorer/fly/deduplicator"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/processor"
"github.com/wormhole-foundation/wormhole-explorer/fly/queue"
"github.com/wormhole-foundation/wormhole-explorer/fly/txhash"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
// Creates two callbacks depending on whether the execution is local (memory queue) or not (SQS queue)
// callback to obtain queue messages from a queue
// callback to publish vaa non pyth messages to a sink
func NewObservationConsumePublish(ctx context.Context, config *config.Configuration, logger *zap.Logger) (health.Check, processor.ObservationQueueConsumeFunc, processor.ObservationPushFunc) {
if config.IsLocal {
obsQueue := queue.NewObservationInMemory()
return health.Noop(), obsQueue.Consume, obsQueue.Publish
}
awsConfig, err := NewAwsConfig(ctx, config)
if err != nil {
logger.Fatal("could not create aws config", zap.Error(err))
}
sqsProducer, err := NewSQSProducer(awsConfig, config.Aws.ObservationsSqsUrl)
if err != nil {
logger.Fatal("could not create sqs producer", zap.Error(err))
}
sqsConsumer, err := NewSQSConsumer(config.Aws.ObservationsSqsUrl, ctx, config)
if err != nil {
logger.Fatal("could not create sqs consumer", zap.Error(err))
}
observationQueue := queue.NewObservationSqs(sqsProducer, sqsConsumer, logger)
return health.SQS(awsConfig, config.Aws.ObservationsSqsUrl), observationQueue.Consume, observationQueue.Publish
}
func NewTxHashStore(ctx context.Context, config *config.Configuration, metrics metrics.Metrics, db *mongo.Database, logger *zap.Logger) (txhash.TxHashStore, error) {
// Creates a deduplicator to discard VAA messages that were processed previously
deduplicatorCache, err := NewCache[bool]()
if err != nil {
return nil, err
}
deduplicator := deduplicator.New(deduplicatorCache, logger)
cacheTxHash, err := NewCache[txhash.TxHash]()
if err != nil {
return nil, err
}
var txHashStores []txhash.TxHashStore
txHashStores = append(txHashStores, txhash.NewCacheTxHash(cacheTxHash, 30*time.Minute, logger))
if !config.IsLocal {
redisClient := NewRedisClient(config)
txHashStores = append(txHashStores, txhash.NewRedisTxHash(redisClient, config.Redis.RedisPrefix, 30*time.Minute, logger))
}
txHashStores = append(txHashStores, txhash.NewMongoTxHash(db, logger))
txHashStore := txhash.NewComposite(txHashStores, metrics, logger)
dedupTxHashStore := txhash.NewDedupTxHashStore(txHashStore, deduplicator, logger)
return dedupTxHashStore, nil
}

10
fly/builder/redis.go Normal file
View File

@ -0,0 +1,10 @@
package builder
import (
"github.com/go-redis/redis/v8"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
)
func NewRedisClient(cfg *config.Configuration) *redis.Client {
return redis.NewClient(&redis.Options{Addr: cfg.Redis.RedisUri})
}

70
fly/builder/vaa.go Normal file
View File

@ -0,0 +1,70 @@
package builder
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
"github.com/wormhole-foundation/wormhole-explorer/fly/notifier"
"github.com/wormhole-foundation/wormhole-explorer/fly/processor"
"github.com/wormhole-foundation/wormhole-explorer/fly/producer"
"github.com/wormhole-foundation/wormhole-explorer/fly/queue"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
// Creates a callback to publish VAA messages to a redis pubsub
func NewVAARedisProducerFunc(cfg *config.Configuration, logger *zap.Logger) (producer.PushFunc, error) {
if cfg.IsLocal {
return func(context.Context, *producer.Notification) error {
return nil
}, nil
}
client := NewRedisClient(cfg)
channel := fmt.Sprintf("%s:%s", cfg.Redis.RedisPrefix, cfg.Redis.RedisVaaChannel)
logger.Info("using redis producer", zap.String("channel", channel))
return producer.NewRedisProducer(client, channel).Push, nil
}
// Creates two callbacks depending on whether the execution is local (memory queue) or not (SQS queue)
// callback to obtain queue messages from a queue
// callback to publish vaa non pyth messages to a sink
func NewVAAConsumePublish(ctx context.Context, cfg *config.Configuration, logger *zap.Logger) (health.Check, processor.VAAQueueConsumeFunc, processor.VAAPushFunc) {
if cfg.IsLocal {
vaaQueue := queue.NewVAAInMemory()
return health.Noop(), vaaQueue.Consume, vaaQueue.Publish
}
awsConfig, err := NewAwsConfig(ctx, cfg)
if err != nil {
logger.Fatal("could not create aws config", zap.Error(err))
}
sqsProducer, err := NewSQSProducer(awsConfig, cfg.Aws.SqsUrl)
if err != nil {
logger.Fatal("could not create sqs producer", zap.Error(err))
}
sqsConsumer, err := NewSQSConsumer(cfg.Aws.SqsUrl, ctx, cfg)
if err != nil {
logger.Fatal("could not create sqs consumer", zap.Error(err))
}
vaaQueue := queue.NewVaaSqs(sqsProducer, sqsConsumer, logger)
return health.SQS(awsConfig, cfg.Aws.SqsUrl), vaaQueue.Consume, vaaQueue.Publish
}
func NewVAANotifierFunc(cfg *config.Configuration, logger *zap.Logger) processor.VAANotifyFunc {
if cfg.IsLocal {
return func(context.Context, *vaa.VAA, []byte) error {
return nil
}
}
logger.Info("using redis notifier", zap.String("prefix", cfg.Redis.RedisPrefix))
client := redis.NewClient(&redis.Options{Addr: cfg.Redis.RedisUri})
return notifier.NewLastSequenceNotifier(client, cfg.Redis.RedisPrefix).Notify
}

View File

@ -12,6 +12,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/producer"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"github.com/wormhole-foundation/wormhole-explorer/fly/txhash"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
@ -39,6 +40,7 @@ func RunTxHashEncoding(cfg TxHashEncondingConfig) {
metrics.NewDummyMetrics(),
db.Database,
producer.NewVAAInMemory(logger).Push,
txhash.NewMongoTxHash(db.Database, logger),
logger)
workerTxHashEncoding(ctx, logger, repository, vaa.ChainID(cfg.ChainID), cfg.PageSize)

View File

@ -5,13 +5,13 @@ import (
"fmt"
"strconv"
"strings"
"time"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"github.com/wormhole-foundation/wormhole-explorer/fly/txhash"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
func workerTxHash(ctx context.Context, repo *storage.Repository, line string) error {
func workerTxHash(ctx context.Context, repo *storage.Repository, txHashStore txhash.TxHashStore, line string) error {
tokens := strings.Split(line, ",")
if len(tokens) != 4 {
return fmt.Errorf("invalid line: %s", line)
@ -33,17 +33,16 @@ func workerTxHash(ctx context.Context, repo *storage.Repository, line string) er
txHash := strings.ToLower(tokens[3])
now := time.Now()
id := fmt.Sprintf("%d/%s%s", intChainID, tokens[1], tokens[2])
vaaTxHash := storage.VaaIdTxHashUpdate{
ChainID: vaa.ChainID(intChainID),
Emitter: tokens[1],
Sequence: tokens[2],
TxHash: txHash,
UpdatedAt: &now,
vaaTxHash := txhash.TxHash{
ChainID: vaa.ChainID(intChainID),
Emitter: tokens[1],
Sequence: tokens[2],
TxHash: txHash,
}
err = repo.UpsertTxHash(ctx, vaaTxHash)
err = txHashStore.Set(ctx, id, vaaTxHash)
if err != nil {
return fmt.Errorf("error upserting vaa: %v\n", err)
}

View File

@ -7,10 +7,11 @@ import (
"strings"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"github.com/wormhole-foundation/wormhole-explorer/fly/txhash"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
func workerVaa(ctx context.Context, repo *storage.Repository, line string) error {
func workerVaa(ctx context.Context, repo *storage.Repository, txHashStore txhash.TxHashStore, line string) error {
tokens := strings.Split(line, ",")
//fmt.Printf("bcid %s, emmiter %s, seq %s\n", header[0], header[1], header[2])
@ -31,7 +32,7 @@ func workerVaa(ctx context.Context, repo *storage.Repository, line string) error
err = repo.UpsertVaa(ctx, v, data)
if err != nil {
return fmt.Errorf("error upserting vaa: %v\n", err)
return fmt.Errorf("error upserting vaa: %v", err)
}
return nil

View File

@ -9,20 +9,22 @@ import (
"github.com/schollz/progressbar/v3"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"github.com/wormhole-foundation/wormhole-explorer/fly/txhash"
"go.uber.org/zap"
)
type GenericWorker func(ctx context.Context, repo *storage.Repository, item string) error
type GenericWorker func(ctx context.Context, repo *storage.Repository, txHashStore txhash.TxHashStore, item string) error
type Workpool struct {
Workers int
Queue chan string
WG sync.WaitGroup
DB *dbutil.Session
Log *zap.Logger
Bar *progressbar.ProgressBar
WorkerFunc GenericWorker
Repository *storage.Repository
Workers int
Queue chan string
WG sync.WaitGroup
DB *dbutil.Session
Log *zap.Logger
Bar *progressbar.ProgressBar
WorkerFunc GenericWorker
Repository *storage.Repository
TxHashStore txhash.TxHashStore
}
type WorkerConfiguration struct {
@ -76,7 +78,7 @@ func (w *Workpool) Process(ctx context.Context) error {
w.WG.Done()
return nil
}
err = w.WorkerFunc(ctx, w.Repository, line)
err = w.WorkerFunc(ctx, w.Repository, w.TxHashStore, line)
if err != nil {
fmt.Println(err)
break

View File

@ -3,17 +3,12 @@ package config
import (
"context"
"fmt"
"os"
"strconv"
"github.com/joho/godotenv"
"github.com/sethvargo/go-envconfig"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
)
const defaultMaxHealthTimeSeconds = 60
// p2p network configuration constants.
const (
// mainnet p2p config.
@ -39,10 +34,79 @@ type P2pNetworkConfig struct {
P2pBootstrap string
}
// GetP2pNetwork get p2p network config.
func GetP2pNetwork() (*P2pNetworkConfig, error) {
type Configuration struct {
P2pNetwork string `env:"P2P_NETWORK,required"`
Environment string `env:"ENVIRONMENT,required"`
LogLevel string `env:"LOG_LEVEL,default=warn"`
MongoUri string `env:"MONGODB_URI,required"`
MongoDatabase string `env:"MONGODB_DATABASE,required"`
MongoEnableQueryLog bool `env:"MONGODB_ENABLE_QUERY_LOG"`
ObservationsChannelSize int `env:"OBSERVATIONS_CHANNEL_SIZE,required"`
VaasChannelSize int `env:"VAAS_CHANNEL_SIZE,required"`
HeartbeatsChannelSize int `env:"HEARTBEATS_CHANNEL_SIZE,required"`
GovernorConfigChannelSize int `env:"GOVERNOR_CONFIG_CHANNEL_SIZE,required"`
GovernorStatusChannelSize int `env:"GOVERNOR_STATUS_CHANNEL_SIZE,required"`
VaasWorkersSize int `env:"VAAS_WORKERS_SIZE,default=5"`
ObservationsWorkersSize int `env:"OBSERVATIONS_WORKERS_SIZE,default=10"`
AlertEnabled bool `env:"ALERT_ENABLED"`
AlertApiKey string `env:"ALERT_API_KEY"`
MetricsEnabled bool `env:"METRICS_ENABLED"`
ApiPort uint `env:"API_PORT,required"`
P2pPort uint `env:"P2P_PORT,required"`
PprofEnabled bool `env:"PPROF_ENABLED"`
MaxHealthTimeSeconds int64 `env:"MAX_HEALTH_TIME_SECONDS,default=60"`
IsLocal bool
Redis *RedisConfiguration
Aws *AwsConfiguration
}
p2pEnviroment := os.Getenv("P2P_NETWORK")
type RedisConfiguration struct {
RedisUri string `env:"REDIS_URI,required"`
RedisPrefix string `env:"REDIS_PREFIX,required"`
RedisVaaChannel string `env:"REDIS_VAA_CHANNEL,required"`
}
type AwsConfiguration struct {
AwsRegion string `env:"AWS_REGION,required"`
AwsAccessKeyID string `env:"AWS_ACCESS_KEY_ID"`
AwsSecretAccessKey string `env:"AWS_SECRET_ACCESS_KEY"`
AwsEndpoint string `env:"AWS_ENDPOINT"`
SqsUrl string `env:"SQS_URL,required"`
ObservationsSqsUrl string `env:"OBSERVATIONS_SQS_URL,required"`
}
// New creates a configuration with the values from .env file and environment variables.
func New(ctx context.Context, isLocal *bool) (*Configuration, error) {
_ = godotenv.Load(".env", "../.env")
var configuration Configuration
if err := envconfig.Process(ctx, &configuration); err != nil {
return nil, err
}
configuration.IsLocal = isLocal != nil && *isLocal
if !configuration.IsLocal {
var redis RedisConfiguration
if err := envconfig.Process(ctx, &redis); err != nil {
return nil, err
}
configuration.Redis = &redis
var aws AwsConfiguration
if err := envconfig.Process(ctx, &aws); err != nil {
return nil, err
}
configuration.Aws = &aws
}
return &configuration, nil
}
// GetP2pNetwork get p2p network config.
func (c *Configuration) GetP2pNetwork() (*P2pNetworkConfig, error) {
p2pEnviroment := c.P2pNetwork
switch p2pEnviroment {
case domain.P2pMainNet:
@ -56,91 +120,7 @@ func GetP2pNetwork() (*P2pNetworkConfig, error) {
}
}
// GetPprofEnabled get if pprof is enabled.
func GetPprofEnabled() bool {
strPprofEnable := os.Getenv("PPROF_ENABLED")
pprofEnabled, _ := strconv.ParseBool(strPprofEnable)
return pprofEnabled
}
// GetMaxHealthTimeSeconds get MaxHealthTimeSeconds env value.
func GetMaxHealthTimeSeconds() int64 {
var maxHealthTimeSeconds int
strMaxHealthTimeSeconds := os.Getenv("MAX_HEALTH_TIME_SECONDS")
maxHealthTimeSeconds, err := strconv.Atoi(strMaxHealthTimeSeconds)
if err != nil {
maxHealthTimeSeconds = defaultMaxHealthTimeSeconds
}
return int64(maxHealthTimeSeconds)
}
// GetEnvironment get environment.
func GetEnvironment() string {
return os.Getenv("ENVIRONMENT")
}
// GetAlertConfig get alert config.
func GetAlertConfig() (alert.AlertConfig, error) {
return alert.AlertConfig{
Environment: GetEnvironment(),
Enabled: getAlertEnabled(),
ApiKey: getAlertApiKey(),
}, nil
}
// getAlertEnabled get if alert is enabled.
func getAlertEnabled() bool {
strAlertEnabled := os.Getenv("ALERT_ENABLED")
alertEnabled, err := strconv.ParseBool(strAlertEnabled)
if err != nil {
alertEnabled = false
}
return alertEnabled
}
// getAlertApiKey get alert api key.
func getAlertApiKey() string {
return os.Getenv("ALERT_API_KEY")
}
// GetMetricsEnabled get if metrics is enabled.
func GetMetricsEnabled() bool {
strMetricsEnabled := os.Getenv("METRICS_ENABLED")
metricsEnabled, err := strconv.ParseBool(strMetricsEnabled)
if err != nil {
metricsEnabled = false
}
return metricsEnabled
}
func GetPrefix() string {
p2pNetwork, err := GetP2pNetwork()
if err != nil {
return ""
}
prefix := p2pNetwork.Enviroment + "-" + GetEnvironment()
func (c *Configuration) GetPrefix() string {
prefix := c.P2pNetwork + "-" + c.Environment
return prefix
}
type Configuration struct {
ObservationsChannelSize int `env:"OBSERVATIONS_CHANNEL_SIZE,required"`
VaasChannelSize int `env:"VAAS_CHANNEL_SIZE,required"`
HeartbeatsChannelSize int `env:"HEARTBEATS_CHANNEL_SIZE,required"`
GovernorConfigChannelSize int `env:"GOVERNOR_CONFIG_CHANNEL_SIZE,required"`
GovernorStatusChannelSize int `env:"GOVERNOR_STATUS_CHANNEL_SIZE,required"`
ObservationsWorkersSize int `env:"OBSERVATIONS_WORKERS_SIZE,default=10"`
ApiPort uint `env:"API_PORT,required"`
P2pPort uint `env:"P2P_PORT,required"`
}
// New creates a configuration with the values from .env file and environment variables.
func New(ctx context.Context) (*Configuration, error) {
_ = godotenv.Load(".env", "../.env")
var configuration Configuration
if err := envconfig.Process(ctx, &configuration); err != nil {
return nil, err
}
return &configuration, nil
}

View File

@ -1,6 +1,7 @@
package config
import (
"context"
"os"
"testing"
@ -9,11 +10,11 @@ import (
func TestGetPrefix(t *testing.T) {
os.Clearenv()
os.Setenv("P2P_NETWORK", "mainnet")
os.Setenv("ENVIRONMENT", "staging")
prefix := GetPrefix()
cfg := Configuration{
P2pNetwork: "mainnet",
Environment: "staging",
}
prefix := cfg.GetPrefix()
assert.Equal(t, "mainnet-staging", prefix)
}
@ -21,7 +22,7 @@ func TestGetPrefixNoP2P(t *testing.T) {
os.Clearenv()
os.Setenv("ENVIRONMENT", "staging")
prefix := GetPrefix()
assert.Equal(t, "", prefix)
isLocal := true
_, err := New(context.TODO(), &isLocal)
assert.NotNil(t, err)
}

View File

@ -14,9 +14,9 @@ require (
github.com/ethereum/go-ethereum v1.10.21
github.com/go-redis/redis/v8 v8.11.5
github.com/gofiber/fiber/v2 v2.47.0
github.com/joho/godotenv v1.4.0
github.com/joho/godotenv v1.5.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/sethvargo/go-envconfig v0.9.0
github.com/sethvargo/go-envconfig v1.0.0
github.com/stretchr/testify v1.8.4
github.com/test-go/testify v1.1.4
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229
@ -41,6 +41,7 @@ require (
github.com/cosmos/cosmos-proto v1.0.0-alpha8 // indirect
github.com/cosmos/gogoproto v1.4.3 // indirect
github.com/cosmos/ibc-go/v4 v4.2.2 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
@ -52,6 +53,8 @@ require (
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
github.com/holiman/uint256 v1.2.1 // indirect
github.com/influxdata/influxdb-client-go/v2 v2.12.2 // indirect
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect
github.com/ipfs/boxo v0.8.0 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
@ -146,7 +149,7 @@ require (
github.com/gtank/ristretto255 v0.1.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hdevalence/ed25519consensus v0.1.0 // indirect

View File

@ -200,6 +200,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creachadair/taskgroup v0.3.2 h1:zlfutDS+5XG40AOxcHDSThxKzns8Tnr9jnr6VqkYlkM=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/danieljoos/wincred v1.1.2 h1:QLdCxFs1/Yl4zduvBdcHB8goaYk9RARS2SgLLRuAyr0=
github.com/danieljoos/wincred v1.1.2/go.mod h1:GijpziifJoIBfYh+S7BbkdUTU4LfM+QnGqR5Vl2tAx0=
github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -211,6 +212,8 @@ github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c/go.mod h1:6Uh
github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0=
github.com/deepmap/oapi-codegen v1.8.2 h1:SegyeYGcdi0jLLrpbCMoJxnUUn8GBXHsvr4rbzjuhfU=
github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw=
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f h1:U5y3Y5UE0w7amNe7Z5G/twsBW0KEalRQXZzf8ufSh9I=
github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o=
github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk=
@ -220,6 +223,7 @@ github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KP
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
@ -270,9 +274,11 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/getkin/kin-openapi v0.61.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4=
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
@ -298,9 +304,11 @@ github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+35s3my2LFTysnkMfxsJBAMHj/DoqoB9knIWoYG/Vk0=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg=
github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc=
github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I=
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
@ -363,6 +371,7 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
@ -382,7 +391,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
@ -483,6 +492,10 @@ github.com/improbable-eng/grpc-web v0.15.0 h1:BN+7z6uNXZ1tQGcNAuaU1YjsLTApzkjt2t
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc=
github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/influxdata/influxdb-client-go/v2 v2.12.2 h1:uYABKdrEKlYm+++qfKdbgaHKBPmoWR5wpbmj6MBB/2g=
github.com/influxdata/influxdb-client-go/v2 v2.12.2/go.mod h1:YteV91FiQxRdccyJ2cHvj2f/5sq4y4Njqu1fQzsQCOU=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 h1:vilfsDSy7TDxedi9gyBkMvAirat/oRcL0lFdJBf6tdM=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/ipfs/boxo v0.8.0 h1:UdjAJmHzQHo/j3g3b1bAcAXCj/GM6iTwvSlBDvPBNBs=
github.com/ipfs/boxo v0.8.0/go.mod h1:RIsi4CnTyQ7AUsNn5gXljJYZlQrHBMnJp94p73liFiA=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
@ -512,8 +525,8 @@ github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHW
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/jmhodges/levigo v1.0.0 h1:q5EC36kV79HWeTBWsod3mG11EgStG3qArTKcvlksN1U=
github.com/jmhodges/levigo v1.0.0/go.mod h1:Q6Qx+uH3RAqyK4rFQroq9RL7mdkABMcfhEI+nNuzMJQ=
github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg=
github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
@ -551,6 +564,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs=
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
@ -587,12 +602,19 @@ github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamh
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU=
github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
@ -807,8 +829,8 @@ github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJ
github.com/schollz/progressbar/v3 v3.13.0 h1:9TeeWRcjW2qd05I8Kf9knPkW4vLM/hYoa6z9ABvxje8=
github.com/schollz/progressbar/v3 v3.13.0/go.mod h1:ZBYnSuLAX2LU8P8UiKN/KgF2DY58AJC8yfVYLPC8Ly4=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE=
github.com/sethvargo/go-envconfig v0.9.0/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YIlt8GMUX6yyNFs0=
github.com/sethvargo/go-envconfig v1.0.0 h1:1C66wzy4QrROf5ew4KdVw942CQDa55qmlYmw9FZxZdU=
github.com/sethvargo/go-envconfig v1.0.0/go.mod h1:Lzc75ghUn5ucmcRGIdGQ33DKJrcjk4kihFYgSTBmjIc=
github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY=
github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM=
github.com/shurcooL/github_flavored_markdown v0.0.0-20181002035957-2122de532470/go.mod h1:2dOwnU2uBioM+SGy2aZoq1f/Sd1l9OkAeAUvjSyvgU0=
@ -916,6 +938,8 @@ github.com/valyala/fasthttp v1.43.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seB
github.com/valyala/fasthttp v1.44.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY=
github.com/valyala/fasthttp v1.47.0 h1:y7moDoxYzMooFpT5aHgNgVOQDrS3qlkfiP9mDtGGK9c=
github.com/valyala/fasthttp v1.47.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
@ -1004,6 +1028,8 @@ golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
@ -1159,6 +1185,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1170,6 +1197,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1188,6 +1216,7 @@ golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -1223,6 +1252,7 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@ -1237,6 +1267,7 @@ golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
@ -1247,6 +1278,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@ -0,0 +1,73 @@
package gossip
import (
"context"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
type governorConfigHandler struct {
govConfigC chan *gossipv1.SignedChainGovernorConfig
repository *storage.Repository
guardian *health.GuardianCheck
metrics metrics.Metrics
logger *zap.Logger
}
func NewGovernorConfigHandler(
govConfigC chan *gossipv1.SignedChainGovernorConfig,
repository *storage.Repository,
guardian *health.GuardianCheck,
metrics metrics.Metrics,
logger *zap.Logger,
) *governorConfigHandler {
return &governorConfigHandler{
govConfigC: govConfigC,
repository: repository,
guardian: guardian,
metrics: metrics,
logger: logger,
}
}
func (h *governorConfigHandler) Start(ctx context.Context) {
// Log governor config
go func() {
for {
select {
case <-ctx.Done():
return
case govConfig := <-h.govConfigC:
h.guardian.Ping(ctx)
nodeName, err := h.getGovernorConfigNodeName(govConfig)
if err != nil {
h.logger.Error("Error getting gov config node name", zap.Error(err))
continue
}
h.metrics.IncGovernorConfigFromGossipNetwork(nodeName)
err = h.repository.UpsertGovernorConfig(govConfig)
if err != nil {
h.logger.Error("Error inserting gov config", zap.Error(err))
} else {
h.metrics.IncGovernorConfigInserted(nodeName)
}
}
}
}()
}
// getGovernorConfigNodeName get node name from governor config.
func (h *governorConfigHandler) getGovernorConfigNodeName(govConfig *gossipv1.SignedChainGovernorConfig) (string, error) {
var gCfg gossipv1.ChainGovernorConfig
err := proto.Unmarshal(govConfig.Config, &gCfg)
if err != nil {
return "", err
}
return gCfg.NodeName, nil
}

View File

@ -0,0 +1,72 @@
package gossip
import (
"context"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
type governorStatusHandler struct {
govStatusC chan *gossipv1.SignedChainGovernorStatus
repository *storage.Repository
guardian *health.GuardianCheck
metrics metrics.Metrics
logger *zap.Logger
}
func NewGovernorStatusHandler(
govStatusC chan *gossipv1.SignedChainGovernorStatus,
repository *storage.Repository,
guardian *health.GuardianCheck,
metrics metrics.Metrics,
logger *zap.Logger,
) *governorStatusHandler {
return &governorStatusHandler{
govStatusC: govStatusC,
repository: repository,
guardian: guardian,
metrics: metrics,
logger: logger,
}
}
func (h *governorStatusHandler) Start(ctx context.Context) {
// Log govStatus
go func() {
for {
select {
case <-ctx.Done():
return
case govStatus := <-h.govStatusC:
h.guardian.Ping(ctx)
nodeName, err := h.getGovernorStatusNodeName(govStatus)
if err != nil {
h.logger.Error("Error getting gov status node name", zap.Error(err))
continue
}
h.metrics.IncGovernorStatusFromGossipNetwork(nodeName)
err = h.repository.UpsertGovernorStatus(govStatus)
if err != nil {
h.logger.Error("Error inserting gov status", zap.Error(err))
} else {
h.metrics.IncGovernorStatusInserted(nodeName)
}
}
}
}()
}
// getGovernorStatusNodeName get node name from governor status.
func (h *governorStatusHandler) getGovernorStatusNodeName(govStatus *gossipv1.SignedChainGovernorStatus) (string, error) {
var gStatus gossipv1.ChainGovernorStatus
err := proto.Unmarshal(govStatus.Status, &gStatus)
if err != nil {
return "", err
}
return gStatus.NodeName, nil
}

View File

@ -0,0 +1,57 @@
package gossip
import (
"context"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"go.uber.org/zap"
)
type heartbeatsHandler struct {
heartbeatsC chan *gossipv1.Heartbeat
repository *storage.Repository
guardian *health.GuardianCheck
metrics metrics.Metrics
logger *zap.Logger
}
func NewHeartbeatsHandler(
heartbeatsC chan *gossipv1.Heartbeat,
repository *storage.Repository,
guardian *health.GuardianCheck,
metrics metrics.Metrics,
logger *zap.Logger,
) *heartbeatsHandler {
return &heartbeatsHandler{
heartbeatsC: heartbeatsC,
repository: repository,
guardian: guardian,
metrics: metrics,
logger: logger,
}
}
func (h *heartbeatsHandler) Start(ctx context.Context) {
// Log heartbeats
go func() {
for {
select {
case <-ctx.Done():
return
case hb := <-h.heartbeatsC:
h.guardian.Ping(ctx)
h.metrics.IncHeartbeatFromGossipNetwork(hb.NodeName)
err := h.repository.UpsertHeartbeat(hb)
if err != nil {
h.logger.Error("Error inserting heartbeat", zap.Error(err))
} else {
h.metrics.IncHeartbeatInserted(hb.NodeName)
}
}
}
}()
}

View File

@ -0,0 +1,49 @@
package gossip
import (
"context"
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/processor"
)
type observationHandler struct {
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
pushFunc processor.ObservationPushFunc
guardian *health.GuardianCheck
metrics metrics.Metrics
}
func NewObservationHandler(
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation],
pushFunc processor.ObservationPushFunc,
guardian *health.GuardianCheck,
metrics metrics.Metrics,
) *observationHandler {
return &observationHandler{
obsvC: obsvC,
pushFunc: pushFunc,
guardian: guardian,
metrics: metrics,
}
}
func (h *observationHandler) Start(ctx context.Context) {
// Log observations
go func() {
for {
select {
case <-ctx.Done():
return
case m := <-h.obsvC:
o := m.Msg
h.guardian.Ping(ctx)
h.metrics.IncObservationTotal()
h.pushFunc(ctx, o)
}
}
}()
}

30
fly/gossip/types.go Normal file
View File

@ -0,0 +1,30 @@
package gossip
import (
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
)
type GossipChannels struct {
// Outbound gossip message queue
SendChannel chan []byte
// Inbound observations
ObsvChannel chan *common.MsgWithTimeStamp[gossipv1.SignedObservation]
// Inbound observation requests - we don't add a environment because we are going to delete this channel
ObsvReqChannel chan *gossipv1.ObservationRequest
// Inbound signed VAAs
SignedInChannel chan *gossipv1.SignedVAAWithQuorum
// Heartbeat updates
HeartbeatChannel chan *gossipv1.Heartbeat
// Governor cfg
GovConfigChannel chan *gossipv1.SignedChainGovernorConfig
// Governor status
GovStatusChannel chan *gossipv1.SignedChainGovernorStatus
}

85
fly/gossip/vaa_handler.go Normal file
View File

@ -0,0 +1,85 @@
package gossip
import (
"context"
"strings"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/processor"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
type vaaHandler struct {
p2pNetworkConfig *config.P2pNetworkConfig
metrics metrics.Metrics
signedInC chan *gossipv1.SignedVAAWithQuorum
vaaHandlerFunc processor.VAAPushFunc
guardian *health.GuardianCheck
logger *zap.Logger
}
func NewVaaHandler(
p2pNetworkConfig *config.P2pNetworkConfig,
metrics metrics.Metrics,
signedInC chan *gossipv1.SignedVAAWithQuorum,
vaaHandlerFunc processor.VAAPushFunc,
guardian *health.GuardianCheck,
logger *zap.Logger,
) *vaaHandler {
return &vaaHandler{
p2pNetworkConfig: p2pNetworkConfig,
metrics: metrics,
signedInC: signedInC,
vaaHandlerFunc: vaaHandlerFunc,
guardian: guardian,
logger: logger,
}
}
func (h *vaaHandler) Start(rootCtx context.Context) {
go func() {
for {
select {
case <-rootCtx.Done():
return
case sVaa := <-h.signedInC:
h.guardian.Ping(rootCtx)
h.metrics.IncVaaTotal()
vaa, err := sdk.Unmarshal(sVaa.Vaa)
if err != nil {
h.logger.Error("Error unmarshalling vaa", zap.Error(err))
continue
}
h.metrics.IncVaaFromGossipNetwork(vaa.EmitterChain)
// apply filter observations by env.
if filterVaasByEnv(vaa, h.p2pNetworkConfig.Enviroment) {
continue
}
// Push an incoming VAA to be processed
if err := h.vaaHandlerFunc(rootCtx, vaa, sVaa.Vaa); err != nil {
h.logger.Error("Error inserting vaa", zap.Error(err))
}
}
}
}()
}
// filterVaasByEnv filter vaa by enviroment.
func filterVaasByEnv(v *sdk.VAA, enviroment string) bool {
if enviroment == domain.P2pTestNet {
vaaFromSolana := v.EmitterChain == sdk.ChainIDSolana
addressToFilter := strings.ToLower(v.EmitterAddress.String()) == "f346195ac02f37d60d4db8ffa6ef74cb1be3550047543a4a9ee9acf4d78697b0"
isPyth := v.EmitterChain == sdk.ChainIDPythNet
if (vaaFromSolana && addressToFilter) || isPyth {
return true
}
}
return false
}

View File

@ -64,3 +64,7 @@ func (d *DummyMetrics) IncGovernorStatusInserted(guardianName string) {}
// IncMaxSequenceCacheError increases the number of errors when updating max sequence cache.
func (d *DummyMetrics) IncMaxSequenceCacheError(chain sdk.ChainID) {}
func (m *DummyMetrics) IncFoundTxHash(t string) {}
func (m *DummyMetrics) IncNotFoundTxHash(t string) {}

View File

@ -34,4 +34,8 @@ type Metrics interface {
// max sequence cache metrics
IncMaxSequenceCacheError(chain sdk.ChainID)
// tx hash metrics
IncFoundTxHash(t string)
IncNotFoundTxHash(t string)
}

View File

@ -16,6 +16,7 @@ type PrometheusMetrics struct {
governorConfigReceivedCount *prometheus.CounterVec
governorStatusReceivedCount *prometheus.CounterVec
maxSequenceCacheCount *prometheus.CounterVec
txHashSearchCount *prometheus.CounterVec
}
// NewPrometheusMetrics returns a new instance of PrometheusMetrics.
@ -98,6 +99,15 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
"service": serviceName,
},
}, []string{"chain"})
txHashSearchCount := promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "tx_hash_search_count_by_store",
Help: "Total number of errors when updating max sequence cache",
ConstLabels: map[string]string{
"environment": environment,
"service": serviceName,
},
}, []string{"store", "action"})
return &PrometheusMetrics{
vaaReceivedCount: vaaReceivedCount,
vaaTotal: vaaTotal,
@ -107,6 +117,7 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
governorConfigReceivedCount: governorConfigReceivedCount,
governorStatusReceivedCount: governorStatusReceivedCount,
maxSequenceCacheCount: maxSequenceCacheCount,
txHashSearchCount: txHashSearchCount,
}
}
@ -199,3 +210,11 @@ func (m *PrometheusMetrics) IncGovernorStatusInserted(guardianName string) {
func (m *PrometheusMetrics) IncMaxSequenceCacheError(chain sdk.ChainID) {
m.maxSequenceCacheCount.WithLabelValues(chain.String()).Inc()
}
func (m *PrometheusMetrics) IncFoundTxHash(t string) {
m.txHashSearchCount.WithLabelValues(t, "found").Inc()
}
func (m *PrometheusMetrics) IncNotFoundTxHash(t string) {
m.txHashSearchCount.WithLabelValues(t, "not_found").Inc()
}

View File

@ -108,3 +108,8 @@ func (c *Consumer) GetQueueAttributes(ctx context.Context) (*aws_sqs.GetQueueAtt
}
return c.api.GetQueueAttributes(ctx, params)
}
// GetQueueUrl returns queue url.
func (c *Consumer) GetQueueUrl() string {
return c.url
}

View File

@ -4,271 +4,58 @@ import (
"context"
"flag"
"log"
"strings"
"time"
"fmt"
"os"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/go-redis/redis/v8"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
healthcheck "github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/fly/builder"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
"github.com/wormhole-foundation/wormhole-explorer/fly/deduplicator"
"github.com/wormhole-foundation/wormhole-explorer/fly/gossip"
"github.com/wormhole-foundation/wormhole-explorer/fly/guardiansets"
flyAlert "github.com/wormhole-foundation/wormhole-explorer/fly/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
"github.com/wormhole-foundation/wormhole-explorer/fly/migration"
"github.com/wormhole-foundation/wormhole-explorer/fly/notifier"
"github.com/wormhole-foundation/wormhole-explorer/fly/processor"
"github.com/wormhole-foundation/wormhole-explorer/fly/producer"
"github.com/wormhole-foundation/wormhole-explorer/fly/queue"
"github.com/wormhole-foundation/wormhole-explorer/fly/server"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"google.golang.org/protobuf/proto"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/dgraph-io/ristretto"
"github.com/eko/gocache/v3/cache"
"github.com/eko/gocache/v3/store"
crypto2 "github.com/ethereum/go-ethereum/crypto"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"github.com/joho/godotenv"
)
var (
rootCtx context.Context
rootCtxCancel context.CancelFunc
)
var (
nodeKeyPath string
logLevel string
)
func getenv(key string) (string, error) {
v := os.Getenv(key)
if v == "" {
return "", fmt.Errorf("[%s] env is required", key)
}
return v, nil
}
// TODO refactor to another file/package
func newAwsConfig(ctx context.Context) (aws.Config, error) {
region, err := getenv("AWS_REGION")
if err != nil {
return *aws.NewConfig(), err
}
awsSecretId, _ := getenv("AWS_ACCESS_KEY_ID")
awsSecretKey, _ := getenv("AWS_SECRET_ACCESS_KEY")
if awsSecretId != "" && awsSecretKey != "" {
credentials := credentials.NewStaticCredentialsProvider(awsSecretId, awsSecretKey, "")
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
awsEndpoint, _ := getenv("AWS_ENDPOINT")
if awsEndpoint != "" {
return aws.Endpoint{
PartitionID: "aws",
URL: awsEndpoint,
SigningRegion: region,
}, nil
}
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
})
awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
awsconfig.WithRegion(region),
awsconfig.WithEndpointResolver(customResolver),
awsconfig.WithCredentialsProvider(credentials),
)
return awsCfg, err
}
return awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(region))
}
// TODO refactor to another file/package
func newSQSProducer(ctx context.Context) (*sqs.Producer, error) {
sqsURL, err := getenv("SQS_URL")
if err != nil {
return nil, err
}
awsConfig, err := newAwsConfig(ctx)
if err != nil {
return nil, err
}
return sqs.NewProducer(awsConfig, sqsURL)
}
// TODO refactor to another file/package
func newSQSConsumer(ctx context.Context) (*sqs.Consumer, error) {
sqsURL, err := getenv("SQS_URL")
if err != nil {
return nil, err
}
awsConfig, err := newAwsConfig(ctx)
if err != nil {
return nil, err
}
return sqs.NewConsumer(awsConfig, sqsURL,
sqs.WithMaxMessages(10),
sqs.WithVisibilityTimeout(120))
}
// TODO refactor to another file/package
func newCache() (cache.CacheInterface[bool], error) {
c, err := ristretto.NewCache(&ristretto.Config{
NumCounters: 10000, // Num keys to track frequency of (1000).
MaxCost: 10 * (1 << 20), // Maximum cost of cache (10 MB).
BufferItems: 64, // Number of keys per Get buffer.
})
if err != nil {
return nil, err
}
store := store.NewRistretto(c)
return cache.New[bool](store), nil
}
// Creates two callbacks depending on whether the execution is local (memory queue) or not (SQS queue)
// callback to obtain queue messages from a queue
// callback to publish vaa non pyth messages to a sink
func newVAAConsumePublish(ctx context.Context, isLocal bool, logger *zap.Logger) (*sqs.Consumer, processor.VAAQueueConsumeFunc, processor.VAAPushFunc) {
if isLocal {
vaaQueue := queue.NewVAAInMemory()
return nil, vaaQueue.Consume, vaaQueue.Publish
}
sqsProducer, err := newSQSProducer(ctx)
if err != nil {
logger.Fatal("could not create sqs producer", zap.Error(err))
}
sqsConsumer, err := newSQSConsumer(ctx)
if err != nil {
logger.Fatal("could not create sqs consumer", zap.Error(err))
}
vaaQueue := queue.NewVAASQS(sqsProducer, sqsConsumer, logger)
return sqsConsumer, vaaQueue.Consume, vaaQueue.Publish
}
func newVAANotifierFunc(isLocal bool, logger *zap.Logger) processor.VAANotifyFunc {
if isLocal {
return func(context.Context, *vaa.VAA, []byte) error {
return nil
}
}
redisUri, err := getenv("REDIS_URI")
if err != nil {
logger.Fatal("could not create vaa notifier ", zap.Error(err))
}
redisPrefix, err := getenv("REDIS_PREFIX")
if err != nil {
logger.Fatal("could not create vaa notifier ", zap.Error(err))
}
logger.Info("using redis notifier", zap.String("prefix", redisPrefix))
client := redis.NewClient(&redis.Options{Addr: redisUri})
return notifier.NewLastSequenceNotifier(client, redisPrefix).Notify
}
func newAlertClient() (alert.AlertClient, error) {
alertConfig, err := config.GetAlertConfig()
if err != nil {
return nil, err
}
if !alertConfig.Enabled {
return alert.NewDummyClient(), nil
}
return alert.NewAlertService(alertConfig, flyAlert.LoadAlerts)
}
func newMetrics(enviroment string) metrics.Metrics {
metricsEnabled := config.GetMetricsEnabled()
if !metricsEnabled {
return metrics.NewDummyMetrics()
}
return metrics.NewPrometheusMetrics(enviroment)
}
// Creates a callback to publish VAA messages to a redis pubsub
func newVAARedisProducerFunc(ctx context.Context, isLocal bool, logger *zap.Logger) (producer.PushFunc, error) {
if isLocal {
return func(context.Context, *producer.Notification) error {
return nil
}, nil
}
redisUri, err := getenv("REDIS_URI")
if err != nil {
logger.Fatal("could not create vaa notifier ", zap.Error(err))
}
redisPrefix, err := getenv("REDIS_PREFIX")
if err != nil {
logger.Fatal("could not create vaa notifier ", zap.Error(err))
}
redisChannel, err := getenv("REDIS_VAA_CHANNEL")
if err != nil {
logger.Fatal("could not create vaa notifier ", zap.Error(err))
}
channel := fmt.Sprintf("%s:%s", redisPrefix, redisChannel)
logger.Info("using redis producer", zap.String("channel", channel))
client := redis.NewClient(&redis.Options{Addr: redisUri})
return producer.NewRedisProducer(client, channel).Push, nil
}
func main() {
//TODO: use a configuration structure to obtain the configuration
_ = godotenv.Load()
// load configuration
cfg, err := config.New(rootCtx)
// Node's main lifecycle context.
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
defer rootCtxCancel()
isLocal := flag.Bool("local", false, "a bool")
flag.Parse()
// Load configuration
cfg, err := config.New(rootCtx, isLocal)
if err != nil {
log.Fatal("Error creating config", err)
}
// Node's main lifecycle context.
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()
// get p2p values to connect p2p network
p2pNetworkConfig, err := config.GetP2pNetwork()
// Get p2p values to connect p2p network
p2pNetworkConfig, err := cfg.GetP2pNetwork()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
nodeKeyPath = "/tmp/node.key"
logLevel = "warn"
nodeKeyPath := "/tmp/node.key"
common.SetRestrictiveUmask()
logger := logger.New("wormhole-fly", logger.WithLevel(logLevel))
isLocal := flag.Bool("local", false, "a bool")
flag.Parse()
logger := logger.New("wormhole-fly", logger.WithLevel(cfg.LogLevel))
// Verify flags
if nodeKeyPath == "" {
@ -278,27 +65,17 @@ func main() {
logger.Fatal("Please specify --bootstrap")
}
// get Alert client
alertClient, err := newAlertClient()
// New alert client
alertClient, err := builder.NewAlertClient(cfg)
if err != nil {
logger.Fatal("could not create alert client", zap.Error(err))
}
// new metrics client
metrics := newMetrics(config.GetEnvironment())
// New metrics client
metrics := builder.NewMetrics(cfg)
// Setup DB
uri := os.Getenv("MONGODB_URI")
if uri == "" {
logger.Fatal("You must set your 'MONGODB_URI' environmental variable. See\n\t https://www.mongodb.com/docs/drivers/go/current/usage-examples/#environment-variable")
}
databaseName := os.Getenv("MONGODB_DATABASE")
if databaseName == "" {
logger.Fatal("You must set your 'MONGODB_DATABASE' environmental variable. See\n\t https://www.mongodb.com/docs/drivers/go/current/usage-examples/#environment-variable")
}
db, err := dbutil.Connect(rootCtx, logger, uri, databaseName, false)
// New database session
db, err := builder.NewDatabase(rootCtx, cfg, logger)
if err != nil {
logger.Fatal("could not connect to DB", zap.Error(err))
}
@ -310,41 +87,28 @@ func main() {
}
// Creates a callback to publish VAA messages to a redis pubsub
vaaRedisProducerFunc, err := newVAARedisProducerFunc(rootCtx, *isLocal, logger)
vaaRedisProducerFunc, err := builder.NewVAARedisProducerFunc(cfg, logger)
if err != nil {
logger.Fatal("could not create vaa redis producer ", zap.Error(err))
logger.Fatal("could not create vaa redis producer", zap.Error(err))
}
// Creates a composite callback to publish VAA messages to a redis pubsub
producerFunc := producer.NewComposite(vaaRedisProducerFunc)
// Creates a callback to publish VAA messages to a redis pubsub
txHashStore, err := builder.NewTxHashStore(rootCtx, cfg, metrics, db.Database, logger)
if err != nil {
logger.Fatal("could not create tx hash store", zap.Error(err))
}
repository := storage.NewRepository(alertClient, metrics, db.Database, producerFunc, txHashStore, logger)
repository := storage.NewRepository(alertClient, metrics, db.Database, producerFunc, logger)
deduplicator, err := builder.NewDeduplicator(logger)
if err != nil {
logger.Fatal("could not create deduplicator", zap.Error(err))
}
// Outbound gossip message queue
sendC := make(chan []byte)
channels := builder.NewGossipChannels(cfg)
// Inbound observations
obsvC := make(chan *common.MsgWithTimeStamp[gossipv1.SignedObservation], cfg.ObservationsChannelSize)
// Inbound observation requests - we don't add a environment because we are going to delete this channel
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, cfg.VaasChannelSize)
// Heartbeat updates
heartbeatC := make(chan *gossipv1.Heartbeat, cfg.HeartbeatsChannelSize)
// Guardian set state managed by processor
gst := common.NewGuardianSetState(heartbeatC)
// Governor cfg
govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, cfg.GovernorConfigChannelSize)
// Governor status
govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, cfg.GovernorStatusChannelSize)
gst := common.NewGuardianSetState(channels.HeartbeatChannel)
// Bootstrap guardian set, otherwise heartbeats would be skipped
// TODO: fetch this and probably figure out how to update it live
@ -354,41 +118,25 @@ func main() {
// Ignore observation requests
// Note: without this, the whole program hangs on observation requests
discardMessages(rootCtx, obsvReqC)
maxHealthTimeSeconds := config.GetMaxHealthTimeSeconds()
guardianCheck := health.NewGuardianCheck(maxHealthTimeSeconds)
discardMessages(rootCtx, channels.ObsvReqChannel)
guardianCheck := health.NewGuardianCheck(cfg.MaxHealthTimeSeconds)
observationConsumer := processor.NewObservationGossipConsumer(repository, gst,
p2pNetworkConfig.Enviroment, cfg.ObservationsChannelSize, cfg.ObservationsWorkersSize, metrics, logger)
observationConsumer.Start(rootCtx)
healthObservations, observationQueueConsume, observationPublish := builder.NewObservationConsumePublish(rootCtx, cfg, logger)
observationGossipConsumer := processor.NewObservationGossipConsumer(observationPublish, gst,
p2pNetworkConfig.Enviroment, cfg.ObservationsChannelSize, cfg.ObservationsWorkersSize, metrics, txHashStore, logger)
observationQueueConsumer := processor.NewObservationQueueConsumer(observationQueueConsume, repository, metrics, logger)
observationGossipConsumer.Start(rootCtx)
observationQueueConsumer.Start(rootCtx)
// Log observations
go func() {
for {
select {
case <-rootCtx.Done():
return
case m := <-obsvC:
o := m.Msg
guardianCheck.Ping(rootCtx)
metrics.IncObservationTotal()
observationConsumer.Push(rootCtx, o)
}
}
}()
observationHandler := gossip.NewObservationHandler(channels.ObsvChannel, observationGossipConsumer.Push, guardianCheck, metrics)
observationHandler.Start(rootCtx)
// Log signed VAAs
cache, err := newCache()
if err != nil {
logger.Fatal("could not create cache", zap.Error(err))
}
isLocalFlag := isLocal != nil && *isLocal
// Creates a deduplicator to discard VAA messages that were processed previously
deduplicator := deduplicator.New(cache, logger)
// Creates two callbacks
sqsConsumer, vaaQueueConsume, nonPythVaaPublish := newVAAConsumePublish(rootCtx, isLocalFlag, logger)
healthVaas, vaaQueueConsume, nonPythVaaPublish := builder.NewVAAConsumePublish(rootCtx, cfg, logger)
// Create a vaa notifier
notifierFunc := newVAANotifierFunc(isLocalFlag, logger)
notifierFunc := builder.NewVAANotifierFunc(cfg, logger)
// Creates a instance to consume VAA messages from Gossip network and handle the messages
// When recive a message, the message filter by deduplicator
// if VAA is from pyhnet should be saved directly to repository
@ -398,110 +146,31 @@ func main() {
vaaQueueConsumer := processor.NewVAAQueueConsumer(vaaQueueConsume, repository, notifierFunc, metrics, logger)
// Creates a wrapper that splits the incoming VAAs into 2 channels (pyth to non pyth) in order
// to be able to process them in a differentiated way
vaaGossipConsumerSplitter := processor.NewVAAGossipSplitterConsumer(vaaGossipConsumer.Push, logger, processor.WithSize(1000))
vaaGossipConsumerSplitter := processor.NewVAAGossipSplitterConsumer(vaaGossipConsumer.Push, cfg.VaasWorkersSize, logger, processor.WithSize(cfg.VaasChannelSize))
vaaQueueConsumer.Start(rootCtx)
vaaGossipConsumerSplitter.Start(rootCtx)
// start fly http server.
pprofEnabled := config.GetPprofEnabled()
server := server.NewServer(cfg.ApiPort, guardianCheck, logger, repository, sqsConsumer, *isLocal, pprofEnabled, alertClient)
healthChecks := []healthcheck.Check{healthObservations, healthVaas, builder.CheckGuardian(guardianCheck)}
pprofEnabled := cfg.PprofEnabled
server := server.NewServer(cfg.ApiPort, guardianCheck, logger, repository, pprofEnabled, alertClient, healthChecks...)
server.Start()
go func() {
for {
select {
case <-rootCtx.Done():
return
case sVaa := <-signedInC:
guardianCheck.Ping(rootCtx)
metrics.IncVaaTotal()
v, err := vaa.Unmarshal(sVaa.Vaa)
if err != nil {
logger.Error("Error unmarshalling vaa", zap.Error(err))
continue
}
// VAA handler
vaaHandler := gossip.NewVaaHandler(p2pNetworkConfig, metrics, channels.SignedInChannel, vaaGossipConsumerSplitter.Push, guardianCheck, logger)
vaaHandler.Start(rootCtx)
metrics.IncVaaFromGossipNetwork(v.EmitterChain)
// apply filter observations by env.
if filterVaasByEnv(v, p2pNetworkConfig.Enviroment) {
continue
}
// Heartbeats handler
hearbeatsHandler := gossip.NewHeartbeatsHandler(channels.HeartbeatChannel, repository, guardianCheck, metrics, logger)
hearbeatsHandler.Start(rootCtx)
// Push an incoming VAA to be processed
if err := vaaGossipConsumerSplitter.Push(rootCtx, v, sVaa.Vaa); err != nil {
logger.Error("Error inserting vaa", zap.Error(err))
}
}
}
}()
// Governor config handler
governorConfigHandler := gossip.NewGovernorConfigHandler(channels.GovConfigChannel, repository, guardianCheck, metrics, logger)
governorConfigHandler.Start(rootCtx)
// Log heartbeats
go func(guardianCheck *health.GuardianCheck) {
for {
select {
case <-rootCtx.Done():
return
case hb := <-heartbeatC:
guardianCheck.Ping(rootCtx)
metrics.IncHeartbeatFromGossipNetwork(hb.NodeName)
err := repository.UpsertHeartbeat(hb)
if err != nil {
logger.Error("Error inserting heartbeat", zap.Error(err))
} else {
metrics.IncHeartbeatInserted(hb.NodeName)
}
}
}
}(guardianCheck)
// Log govConfigs
go func() {
for {
select {
case <-rootCtx.Done():
return
case govConfig := <-govConfigC:
guardianCheck.Ping(rootCtx)
nodeName, err := getGovernorConfigNodeName(govConfig)
if err != nil {
logger.Error("Error getting gov config node name", zap.Error(err))
continue
}
metrics.IncGovernorConfigFromGossipNetwork(nodeName)
err = repository.UpsertGovernorConfig(govConfig)
if err != nil {
logger.Error("Error inserting gov config", zap.Error(err))
} else {
metrics.IncGovernorConfigInserted(nodeName)
}
}
}
}()
// Log govStatus
go func() {
for {
select {
case <-rootCtx.Done():
return
case govStatus := <-govStatusC:
guardianCheck.Ping(rootCtx)
nodeName, err := getGovernorStatusNodeName(govStatus)
if err != nil {
logger.Error("Error getting gov status node name", zap.Error(err))
continue
}
metrics.IncGovernorStatusFromGossipNetwork(nodeName)
err = repository.UpsertGovernorStatus(govStatus)
if err != nil {
logger.Error("Error inserting gov status", zap.Error(err))
} else {
metrics.IncGovernorStatusInserted(nodeName)
}
}
}
}()
// Governor status handler
governorStatusHandler := gossip.NewGovernorStatusHandler(channels.GovStatusChannel, repository, guardianCheck, metrics, logger)
governorStatusHandler.Start(rootCtx)
// Load p2p private key
var priv crypto.PrivKey
@ -523,13 +192,14 @@ func main() {
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
components := p2p.DefaultComponents()
components.Port = cfg.P2pPort
components.WarnChannelOverflow = true
if err := supervisor.Run(ctx, "p2p",
p2p.Run(
obsvC,
obsvReqC,
channels.ObsvChannel,
channels.ObsvReqChannel,
nil,
sendC,
signedInC,
channels.SendChannel,
channels.SignedInChannel,
priv,
gk,
gst,
@ -540,8 +210,8 @@ func main() {
rootCtxCancel,
nil,
nil,
govConfigC,
govStatusC,
channels.GovConfigChannel,
channels.GovStatusChannel,
components,
nil, // ibc feature string
false, // gateway relayer enabled
@ -568,33 +238,13 @@ func main() {
// TODO: wait for things to shut down gracefully
vaaGossipConsumerSplitter.Close()
observationGossipConsumer.Close()
server.Stop()
observationConsumer.Close()
logger.Info("Closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
}
// getGovernorConfigNodeName get node name from governor config.
func getGovernorConfigNodeName(govConfig *gossipv1.SignedChainGovernorConfig) (string, error) {
var gCfg gossipv1.ChainGovernorConfig
err := proto.Unmarshal(govConfig.Config, &gCfg)
if err != nil {
return "", err
}
return gCfg.NodeName, nil
}
// getGovernorStatusNodeName get node name from governor status.
func getGovernorStatusNodeName(govStatus *gossipv1.SignedChainGovernorStatus) (string, error) {
var gStatus gossipv1.ChainGovernorStatus
err := proto.Unmarshal(govStatus.Status, &gStatus)
if err != nil {
return "", err
}
return gStatus.NodeName, nil
}
func discardMessages[T any](ctx context.Context, obsvReqC chan T) {
go func() {
for {
@ -606,16 +256,3 @@ func discardMessages[T any](ctx context.Context, obsvReqC chan T) {
}
}()
}
// filterVaasByEnv filter vaa by enviroment.
func filterVaasByEnv(v *vaa.VAA, enviroment string) bool {
if enviroment == domain.P2pTestNet {
vaaFromSolana := v.EmitterChain == vaa.ChainIDSolana
addressToFilter := strings.ToLower(v.EmitterAddress.String()) == "f346195ac02f37d60d4db8ffa6ef74cb1be3550047543a4a9ee9acf4d78697b0"
isPyth := v.EmitterChain == vaa.ChainIDPythNet
if (vaaFromSolana && addressToFilter) || isPyth {
return true
}
}
return false
}

View File

@ -1,7 +1,6 @@
package notifier
import (
"os"
"testing"
"github.com/test-go/testify/assert"
@ -17,19 +16,18 @@ func TestNewLastSequenceNotifier(t *testing.T) {
func TestNewLastSequenceNotifierBackwardsCompat(t *testing.T) {
prefix := config.GetPrefix()
l := NewLastSequenceNotifier(nil, prefix)
l := NewLastSequenceNotifier(nil, "")
assert.Equal(t, "wormscan:vaa-max-sequence", l.prefix)
}
func TestNewLastSequenceNotifierWithPrefix(t *testing.T) {
os.Setenv("P2P_NETWORK", "mainnet")
os.Setenv("ENVIRONMENT", "staging")
prefix := config.GetPrefix()
cfg := config.Configuration{
Environment: "staging",
P2pNetwork: "mainnet",
}
prefix := cfg.GetPrefix()
l := NewLastSequenceNotifier(nil, prefix)

View File

@ -12,40 +12,43 @@ import (
crypto2 "github.com/ethereum/go-ethereum/crypto"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"github.com/wormhole-foundation/wormhole-explorer/fly/txhash"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
type observationGossipConsumer struct {
signedObsCh chan *gossipv1.SignedObservation
repository *storage.Repository
gst *common.GuardianSetState
environment string
workerSize int
metrics metrics.Metrics
wgBlock sync.WaitGroup
logger *zap.Logger
signedObsCh chan *gossipv1.SignedObservation
observationProcess ObservationPushFunc
gst *common.GuardianSetState
environment string
workerSize int
metrics metrics.Metrics
wgBlock sync.WaitGroup
txHashStore txhash.TxHashStore
logger *zap.Logger
}
// NewObservationGossipConsumer creates a new processor instances.
func NewObservationGossipConsumer(
repository *storage.Repository,
observationProcess ObservationPushFunc,
gst *common.GuardianSetState,
environment string,
channelSize int,
workerSize int,
metrics metrics.Metrics,
txHashStore txhash.TxHashStore,
logger *zap.Logger,
) *observationGossipConsumer {
return &observationGossipConsumer{
repository: repository,
gst: gst,
environment: environment,
workerSize: workerSize,
metrics: metrics,
logger: logger,
signedObsCh: make(chan *gossipv1.SignedObservation, channelSize),
observationProcess: observationProcess,
gst: gst,
environment: environment,
workerSize: workerSize,
metrics: metrics,
txHashStore: txHashStore,
logger: logger,
signedObsCh: make(chan *gossipv1.SignedObservation, channelSize),
}
}
@ -92,7 +95,7 @@ func (c *observationGossipConsumer) process(ctx context.Context, o *gossipv1.Sig
// get chainID from observationID.
chainID, err := getObservationChainID(c.logger, o)
if err != nil {
c.logger.Error("Error getting chainID", zap.Error(err))
c.logger.Error("Error getting chainID", zap.String("id", o.MessageId), zap.Error(err))
return
}
c.metrics.IncObservationFromGossipNetwork(chainID)
@ -104,9 +107,16 @@ func (c *observationGossipConsumer) process(ctx context.Context, o *gossipv1.Sig
c.metrics.IncObservationUnfiltered(chainID)
err = c.repository.UpsertObservation(o)
go func(consumer *observationGossipConsumer, ctx context.Context, obs *gossipv1.SignedObservation) {
err = consumer.txHashStore.SetObservation(ctx, obs)
if err != nil {
consumer.logger.Error("Error setting txHash", zap.Error(err))
}
}(c, ctx, o)
err = c.observationProcess(ctx, o)
if err != nil {
c.logger.Error("Error inserting observation", zap.Error(err))
c.logger.Error("Error processing observation", zap.String("id", o.MessageId), zap.Error(err))
}
}
@ -153,12 +163,12 @@ func filterObservationByEnv(o *gossipv1.SignedObservation, enviroment string) bo
if enviroment == domain.P2pTestNet {
// filter pyth message in testnet gossip network (for solana and pyth chain).
if strings.Contains((o.GetMessageId()), "1/f346195ac02f37d60d4db8ffa6ef74cb1be3550047543a4a9ee9acf4d78697b0") ||
strings.HasPrefix("26/", o.GetMessageId()) {
strings.HasPrefix(o.GetMessageId(), "26/") {
return true
}
}
// filter pyth message in mainnet gossip network (for pyth chain).
if enviroment == domain.P2pMainNet && strings.HasPrefix("26/", o.GetMessageId()) {
if enviroment == domain.P2pMainNet && strings.HasPrefix(o.GetMessageId(), "26/") {
return true
}
return false

View File

@ -0,0 +1,57 @@
package processor
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"go.uber.org/zap"
)
// ObservationQueueConsumer represents a observation queue consumer.
type ObservationQueueConsumer struct {
consume ObservationQueueConsumeFunc
repository *storage.Repository
metrics metrics.Metrics
logger *zap.Logger
}
// ObservationQueueConsumer creates a new observation queue consumer instances.
func NewObservationQueueConsumer(
consume ObservationQueueConsumeFunc,
repository *storage.Repository,
metrics metrics.Metrics,
logger *zap.Logger) *ObservationQueueConsumer {
return &ObservationQueueConsumer{
consume: consume,
repository: repository,
metrics: metrics,
logger: logger,
}
}
// Start consumes messages from observation queue and store those messages in a repository.
func (c *ObservationQueueConsumer) Start(ctx context.Context) {
go func() {
for msg := range c.consume(ctx) {
obs := msg.Data()
log := c.logger.With(zap.String("id", obs.MessageId))
log.Info("Observation message received")
if msg.IsExpired() {
log.Warn("Message with observation expired")
msg.Failed()
continue
}
err := c.repository.UpsertObservation(ctx, obs)
if err != nil {
log.Error("Error inserting observation in repository", zap.Error(err))
msg.Failed()
continue
}
msg.Done(ctx)
c.logger.Info("Observation saved in repository")
}
}()
}

24
fly/processor/types.go Normal file
View File

@ -0,0 +1,24 @@
package processor
import (
"context"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/fly/queue"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// VAAPushFunc is a function to push VAA message.
type VAAPushFunc func(context.Context, *vaa.VAA, []byte) error
// VAANotifyFunc is a function to notify saved VAA message.
type VAANotifyFunc func(context.Context, *vaa.VAA, []byte) error
// VAAQueueConsumeFunc is a function to obtain messages from a queue
type VAAQueueConsumeFunc func(context.Context) <-chan queue.Message[[]byte]
// ObservationPushFunc is a function to push observation message.
type ObservationPushFunc func(ctx context.Context, o *gossipv1.SignedObservation) error
// VAAQueueConsumeFunc is a function to obtain messages from a queue
type ObservationQueueConsumeFunc func(context.Context) <-chan queue.Message[*gossipv1.SignedObservation]

View File

@ -2,6 +2,7 @@ package processor
import (
"context"
"fmt"
"github.com/wormhole-foundation/wormhole-explorer/fly/deduplicator"
"github.com/wormhole-foundation/wormhole-explorer/fly/guardiansets"
@ -48,7 +49,8 @@ func (p *vaaGossipConsumer) Push(ctx context.Context, v *vaa.VAA, serializedVaa
return err
}
err := p.deduplicator.Apply(ctx, v.MessageID(), func() error {
key := fmt.Sprintf("vaa:%s", v.MessageID())
err := p.deduplicator.Apply(ctx, key, func() error {
p.metrics.IncVaaUnfiltered(v.EmitterChain)
if vaa.ChainIDPythNet == v.EmitterChain {
return p.pythProcess(ctx, v, serializedVaa)

View File

@ -2,6 +2,7 @@ package processor
import (
"context"
"sync"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
@ -12,11 +13,13 @@ type VAAGossipConsumerSplitterOption func(*VAAGossipConsumerSplitter)
// VAAGossipConsumerSplitter represents a vaa message splitter.
type VAAGossipConsumerSplitter struct {
push VAAPushFunc
pythCh chan *sppliterMessage
nonPythCh chan *sppliterMessage
logger *zap.Logger
size int
push VAAPushFunc
pythCh chan *sppliterMessage
nonPythCh chan *sppliterMessage
logger *zap.Logger
workerSize int
wgBlock sync.WaitGroup
size int
}
type sppliterMessage struct {
@ -27,12 +30,14 @@ type sppliterMessage struct {
// NewVAAGossipSplitterConsumer creates a splitter instance.
func NewVAAGossipSplitterConsumer(
publish VAAPushFunc,
workerSize int,
logger *zap.Logger,
opts ...VAAGossipConsumerSplitterOption) *VAAGossipConsumerSplitter {
v := &VAAGossipConsumerSplitter{
push: publish,
logger: logger,
size: 50,
push: publish,
logger: logger,
workerSize: workerSize,
size: 50,
}
for _, opt := range opts {
opt(v)
@ -74,7 +79,10 @@ func (p *VAAGossipConsumerSplitter) Push(ctx context.Context, v *vaa.VAA, serial
// Start runs two go routine to process messages for both channels.
func (p *VAAGossipConsumerSplitter) Start(ctx context.Context) {
go p.executePyth(ctx)
for i := 0; i < p.workerSize; i++ {
p.wgBlock.Add(1)
go p.executePyth(ctx)
}
go p.executeNonPyth(ctx)
}
@ -82,9 +90,11 @@ func (p *VAAGossipConsumerSplitter) Start(ctx context.Context) {
func (p *VAAGossipConsumerSplitter) Close() {
close(p.nonPythCh)
close(p.pythCh)
p.wgBlock.Wait()
}
func (p *VAAGossipConsumerSplitter) executePyth(ctx context.Context) {
defer p.wgBlock.Done()
for {
select {
case <-ctx.Done():

View File

@ -19,7 +19,7 @@ func TestVAAGossipConsumerSplitter_PushPyth(t *testing.T) {
return nil
}
logger := zaptest.NewLogger(t)
splitter := NewVAAGossipSplitterConsumer(pushFunc, logger, WithSize(1))
splitter := NewVAAGossipSplitterConsumer(pushFunc, 1, logger, WithSize(1))
splitter.Start(ctx)
splitter.Push(ctx, &vaa.VAA{EmitterChain: vaa.ChainIDPythNet, Sequence: 1}, nil)
@ -41,7 +41,7 @@ func TestVAAGossipConsumerSplitter_PushNonPyth(t *testing.T) {
return nil
}
logger := zaptest.NewLogger(t)
splitter := NewVAAGossipSplitterConsumer(pushFunc, logger, WithSize(1))
splitter := NewVAAGossipSplitterConsumer(pushFunc, 1, logger, WithSize(1))
splitter.Start(ctx)
splitter.Push(ctx, &vaa.VAA{EmitterChain: vaa.ChainIDEthereum, Sequence: 1}, nil)

View File

@ -4,16 +4,12 @@ import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/queue"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
// VAAQueueConsumeFunc is a function to obtain messages from a queue
type VAAQueueConsumeFunc func(context.Context) <-chan queue.Message
// VAAQueueConsumer represents a VAA queue consumer.
type VAAQueueConsumer struct {
consume VAAQueueConsumeFunc
@ -78,7 +74,7 @@ func (c *VAAQueueConsumer) Start(ctx context.Context) {
}
msg.Done(ctx)
c.logger.Info("Vaa save in repository", zap.String("id", v.MessageID()))
c.logger.Info("Vaa saved in repository", zap.String("id", v.MessageID()))
}
}()
}

View File

@ -1,13 +0,0 @@
package processor
import (
"context"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
// VAAPushFunc is a function to push VAA message.
type VAAPushFunc func(context.Context, *vaa.VAA, []byte) error
// VAANotifyFunc is a function to notify saved VAA message.
type VAANotifyFunc func(context.Context, *vaa.VAA, []byte) error

23
fly/queue/converter.go Normal file
View File

@ -0,0 +1,23 @@
package queue
import gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
func toObservation(o *gossipv1.SignedObservation) Observation {
return Observation{
Addr: o.Addr,
Hash: o.Hash,
Signature: o.Signature,
TxHash: o.TxHash,
MessageID: o.MessageId,
}
}
func fromObservation(o *Observation) *gossipv1.SignedObservation {
return &gossipv1.SignedObservation{
Addr: o.Addr,
Hash: o.Hash,
Signature: o.Signature,
TxHash: o.TxHash,
MessageId: o.MessageID,
}
}

55
fly/queue/message.go Normal file
View File

@ -0,0 +1,55 @@
package queue
import (
"context"
"sync"
"time"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
"go.uber.org/zap"
)
type sqsConsumerMessage[T any] struct {
data T
consumer *sqs.Consumer
id *string
logger *zap.Logger
expiredAt time.Time
wg *sync.WaitGroup
ctx context.Context
}
func (m *sqsConsumerMessage[T]) Data() T {
return m.data
}
func (m *sqsConsumerMessage[T]) Done(ctx context.Context) {
if err := m.consumer.DeleteMessage(ctx, m.id); err != nil {
m.logger.Error("Error deleting message from SQS", zap.Error(err))
}
m.wg.Done()
}
func (m *sqsConsumerMessage[T]) Failed() {
m.wg.Done()
}
func (m *sqsConsumerMessage[T]) IsExpired() bool {
return m.expiredAt.Before(time.Now())
}
type memoryConsumerMessageQueue[T any] struct {
data T
}
func (m *memoryConsumerMessageQueue[T]) Data() T {
return m.data
}
func (m *memoryConsumerMessageQueue[T]) Done(_ context.Context) {}
func (m *memoryConsumerMessageQueue[T]) Failed() {}
func (m *memoryConsumerMessageQueue[T]) IsExpired() bool {
return false
}

View File

@ -0,0 +1,46 @@
package queue
import (
"context"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
)
// VAAInMemoryOption represents a VAA queue in memory option function.
type ObservationInMemoryOption func(*ObservationInMemory)
// VAAInMemory represents VAA queue in memory.
type ObservationInMemory struct {
ch chan Message[*gossipv1.SignedObservation]
size int
}
// NewVAAInMemory creates a VAA queue in memory instances.
func NewObservationInMemory(opts ...ObservationInMemoryOption) *ObservationInMemory {
m := &ObservationInMemory{size: 100}
for _, opt := range opts {
opt(m)
}
m.ch = make(chan Message[*gossipv1.SignedObservation], m.size)
return m
}
// WithSize allows to specify an channel size when setting a value.
func ObservationWithSize(v int) ObservationInMemoryOption {
return func(i *ObservationInMemory) {
i.size = v
}
}
// Publish sends the message to a channel.
func (i *ObservationInMemory) Publish(_ context.Context, o *gossipv1.SignedObservation) error {
i.ch <- &memoryConsumerMessageQueue[*gossipv1.SignedObservation]{
data: o,
}
return nil
}
// Consume returns the channel with the received messages.
func (i *ObservationInMemory) Consume(_ context.Context) <-chan Message[*gossipv1.SignedObservation] {
return i.ch
}

View File

@ -0,0 +1,91 @@
package queue
import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"sync"
"time"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
"go.uber.org/zap"
)
// ObservationSqs represents a observation queue in SQS.
type ObservationSqs struct {
producer *sqs.Producer
consumer *sqs.Consumer
ch chan Message[*gossipv1.SignedObservation]
chSize int
wg sync.WaitGroup
logger *zap.Logger
}
// NewObservationSqs creates a observation queue in SQS instances.
func NewObservationSqs(producer *sqs.Producer, consumer *sqs.Consumer, logger *zap.Logger, opts ...VAASqsOption) *ObservationSqs {
s := &ObservationSqs{
producer: producer,
consumer: consumer,
chSize: 10,
logger: logger.With(zap.String("queueUrl", consumer.GetQueueUrl()))}
s.ch = make(chan Message[*gossipv1.SignedObservation], s.chSize)
return s
}
// Publish sends the message to a SQS queue.
func (q *ObservationSqs) Publish(ctx context.Context, o *gossipv1.SignedObservation) error {
dto := toObservation(o)
body, err := json.Marshal(dto)
if err != nil {
return err
}
id := fmt.Sprintf("%s/%s/%s", o.MessageId, hex.EncodeToString(o.Addr), hex.EncodeToString(o.Hash))
deduplicationId := base64.StdEncoding.EncodeToString([]byte(id))[:127]
return q.producer.SendMessage(ctx, deduplicationId, deduplicationId, string(body))
}
// Consume returns the channel with the received messages from SQS queue.
func (q *ObservationSqs) Consume(ctx context.Context) <-chan Message[*gossipv1.SignedObservation] {
go func() {
for {
messages, err := q.consumer.GetMessages(ctx)
if err != nil {
q.logger.Error("Error getting messages from SQS", zap.Error(err))
continue
}
q.logger.Info("Received messages from SQS", zap.Int("count", len(messages)))
expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
for _, msg := range messages {
var obs Observation
err := json.Unmarshal([]byte(*msg.Body), &obs)
if err != nil {
q.logger.Error("Error decoding message from SQS", zap.Error(err))
continue
}
q.logger.Info("Observation message received", zap.String("id", obs.MessageID))
//TODO check if callback is better than channel
q.wg.Add(1)
q.ch <- &sqsConsumerMessage[*gossipv1.SignedObservation]{
id: msg.ReceiptHandle,
data: fromObservation(&obs),
wg: &q.wg,
logger: q.logger,
consumer: q.consumer,
expiredAt: expiredAt,
ctx: ctx,
}
}
q.wg.Wait()
}
}()
return q.ch
}
// Close closes all consumer resources.
func (q *ObservationSqs) Close() {
close(q.ch)
}

View File

@ -1,11 +1,22 @@
package queue
import "context"
import (
"context"
)
// Message represents a message from a queue.
type Message interface {
Data() []byte
type Message[T any] interface {
Data() T
Done(context.Context)
Failed()
IsExpired() bool
}
// Observation represents a signed observation.
type Observation struct {
Addr []byte `json:"addr"`
Hash []byte `json:"hash"`
Signature []byte `json:"signature"`
TxHash []byte `json:"txHash"`
MessageID string `json:"messageId"`
}

View File

@ -11,7 +11,7 @@ type VAAInMemoryOption func(*VAAInMemory)
// VAAInMemory represents VAA queue in memory.
type VAAInMemory struct {
ch chan Message
ch chan Message[[]byte]
size int
}
@ -21,7 +21,7 @@ func NewVAAInMemory(opts ...VAAInMemoryOption) *VAAInMemory {
for _, opt := range opts {
opt(m)
}
m.ch = make(chan Message, m.size)
m.ch = make(chan Message[[]byte], m.size)
return m
}
@ -41,7 +41,7 @@ func (i *VAAInMemory) Publish(_ context.Context, v *vaa.VAA, data []byte) error
}
// Consume returns the channel with the received messages.
func (i *VAAInMemory) Consume(_ context.Context) <-chan Message {
func (i *VAAInMemory) Consume(_ context.Context) <-chan Message[[]byte] {
return i.ch
}

View File

@ -13,49 +13,49 @@ import (
"go.uber.org/zap"
)
// SQSOption represents a VAA queue in SQS option function.
type SQSOption func(*SQS)
// VAASqsOption represents a VAA queue in SQS option function.
type VAASqsOption func(*VAASqs)
// SQS represents a VAA queue in SQS.
type SQS struct {
// VAASqs represents a VAA queue in VAASqs.
type VAASqs struct {
producer *sqs.Producer
consumer *sqs.Consumer
ch chan Message
ch chan Message[[]byte]
chSize int
wg sync.WaitGroup
logger *zap.Logger
}
// NewVAASQS creates a VAA queue in SQS instances.
func NewVAASQS(producer *sqs.Producer, consumer *sqs.Consumer, logger *zap.Logger, opts ...SQSOption) *SQS {
s := &SQS{
// NewVaaSqs creates a VAA queue in SQS instances.
func NewVaaSqs(producer *sqs.Producer, consumer *sqs.Consumer, logger *zap.Logger, opts ...VAASqsOption) *VAASqs {
s := &VAASqs{
producer: producer,
consumer: consumer,
chSize: 10,
logger: logger}
logger: logger.With(zap.String("queueUrl", consumer.GetQueueUrl()))}
for _, opt := range opts {
opt(s)
}
s.ch = make(chan Message, s.chSize)
s.ch = make(chan Message[[]byte], s.chSize)
return s
}
// WithChannelSize allows to specify an channel size when setting a value.
func WithChannelSize(size int) SQSOption {
return func(d *SQS) {
func WithChannelSize(size int) VAASqsOption {
return func(d *VAASqs) {
d.chSize = size
}
}
// Publish sends the message to a SQS queue.
func (q *SQS) Publish(ctx context.Context, v *vaa.VAA, data []byte) error {
func (q *VAASqs) Publish(ctx context.Context, v *vaa.VAA, data []byte) error {
body := base64.StdEncoding.EncodeToString(data)
groupID := fmt.Sprintf("%d/%s", v.EmitterChain, v.EmitterAddress)
return q.producer.SendMessage(ctx, groupID, v.MessageID(), body)
}
// Consume returns the channel with the received messages from SQS queue.
func (q *SQS) Consume(ctx context.Context) <-chan Message {
func (q *VAASqs) Consume(ctx context.Context) <-chan Message[[]byte] {
go func() {
for {
messages, err := q.consumer.GetMessages(ctx)
@ -73,7 +73,7 @@ func (q *SQS) Consume(ctx context.Context) <-chan Message {
//TODO check if callback is better than channel
q.wg.Add(1)
q.ch <- &sqsConsumerMessage{
q.ch <- &sqsConsumerMessage[[]byte]{
id: msg.ReceiptHandle,
data: body,
wg: &q.wg,
@ -90,35 +90,6 @@ func (q *SQS) Consume(ctx context.Context) <-chan Message {
}
// Close closes all consumer resources.
func (q *SQS) Close() {
func (q *VAASqs) Close() {
close(q.ch)
}
type sqsConsumerMessage struct {
data []byte
consumer *sqs.Consumer
id *string
logger *zap.Logger
expiredAt time.Time
wg *sync.WaitGroup
ctx context.Context
}
func (m *sqsConsumerMessage) Data() []byte {
return m.data
}
func (m *sqsConsumerMessage) Done(ctx context.Context) {
if err := m.consumer.DeleteMessage(ctx, m.id); err != nil {
m.logger.Error("Error deleting message from SQS", zap.Error(err))
}
m.wg.Done()
}
func (m *sqsConsumerMessage) Failed() {
m.wg.Done()
}
func (m *sqsConsumerMessage) IsExpired() bool {
return m.expiredAt.Before(time.Now())
}

View File

@ -1,125 +0,0 @@
package server
import (
"context"
"errors"
"github.com/gofiber/fiber/v2"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
flyAlert "github.com/wormhole-foundation/wormhole-explorer/fly/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"go.uber.org/zap"
)
// Controller definition.
type Controller struct {
guardianCheck *health.GuardianCheck
repository *storage.Repository
consumer *sqs.Consumer
isLocal bool
logger *zap.Logger
alertClient alert.AlertClient
}
// NewController creates a Controller instance.
func NewController(gCheck *health.GuardianCheck, repo *storage.Repository, consumer *sqs.Consumer, isLocal bool, alertClient alert.AlertClient, logger *zap.Logger) *Controller {
return &Controller{guardianCheck: gCheck, repository: repo, consumer: consumer, isLocal: isLocal, alertClient: alertClient, logger: logger}
}
// HealthCheck handler for the endpoint /health.
func (c *Controller) HealthCheck(ctx *fiber.Ctx) error {
// check guardian gossip network is ready.
guardianErr := c.checkGuardianStatus(ctx.Context())
if guardianErr != nil {
c.logger.Error("Health check failed", zap.Error(guardianErr))
// send alert when exists an error saving ptth vaa.
alertContext := alert.AlertContext{
Error: guardianErr,
}
c.alertClient.CreateAndSend(ctx.Context(), flyAlert.ErrorGuardianNoActivity, alertContext)
return ctx.Status(fiber.StatusInternalServerError).JSON(struct {
Status string `json:"status"`
Error string `json:"error"`
}{Status: "NO", Error: guardianErr.Error()})
}
return ctx.JSON(struct {
Status string `json:"status"`
}{Status: "OK"})
}
// ReadyCheck handler for the endpoint /ready
func (c *Controller) ReadyCheck(ctx *fiber.Ctx) error {
// check mongo db is ready.
mongoErr := c.checkMongoStatus(ctx.Context())
if mongoErr != nil {
c.logger.Error("Ready check failed", zap.Error(mongoErr))
return ctx.Status(fiber.StatusInternalServerError).JSON(struct {
Ready string `json:"ready"`
Error string `json:"error"`
}{Ready: "NO", Error: mongoErr.Error()})
}
// check aws SQS is ready.
queueErr := c.checkQueueStatus(ctx.Context())
if queueErr != nil {
c.logger.Error("Ready check failed", zap.Error(queueErr))
return ctx.Status(fiber.StatusInternalServerError).JSON(struct {
Ready string `json:"ready"`
Error string `json:"error"`
}{Ready: "NO", Error: queueErr.Error()})
}
// return success response.
return ctx.Status(fiber.StatusOK).JSON(struct {
Ready string `json:"ready"`
}{Ready: "OK"})
}
func (c *Controller) checkMongoStatus(ctx context.Context) error {
mongoStatus, err := c.repository.GetMongoStatus(ctx)
if err != nil {
return err
}
// check mongo server status
mongoStatusCheck := (mongoStatus.Ok == 1 && mongoStatus.Pid > 0 && mongoStatus.Uptime > 0)
if !mongoStatusCheck {
return errors.New("mongo invalid status")
}
// check mongo connections
if mongoStatus.Connections.Available <= 0 {
return errors.New("mongo hasn't available connections")
}
return nil
}
func (c *Controller) checkQueueStatus(ctx context.Context) error {
// vaa queue handle in memory [local enviroment]
if c.isLocal {
return nil
}
// get queue attributes
queueAttributes, err := c.consumer.GetQueueAttributes(ctx)
if err != nil {
return err
}
if queueAttributes == nil {
return errors.New("can't get attributes for sqs")
}
// check queue created
createdTimestamp := queueAttributes.Attributes["CreatedTimestamp"]
if createdTimestamp == "" {
return errors.New("sqs queue hasn't been created")
}
return nil
}
func (c *Controller) checkGuardianStatus(ctx context.Context) error {
isAlive := c.guardianCheck.IsAlive()
if !isAlive {
return errors.New("guardian healthcheck not arrive in time")
}
return nil
}

View File

@ -7,8 +7,8 @@ import (
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/pprof"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
healthcheck "github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"go.uber.org/zap"
)
@ -19,9 +19,9 @@ type Server struct {
logger *zap.Logger
}
func NewServer(port uint, guardianCheck *health.GuardianCheck, logger *zap.Logger, repository *storage.Repository, consumer *sqs.Consumer, isLocal, pprofEnabled bool, alertClient alert.AlertClient) *Server {
ctrl := NewController(guardianCheck, repository, consumer, isLocal, alertClient, logger)
func NewServer(port uint, guardianCheck *health.GuardianCheck, logger *zap.Logger, repository *storage.Repository, pprofEnabled bool, alertClient alert.AlertClient, checks ...healthcheck.Check) *Server {
app := fiber.New(fiber.Config{DisableStartupMessage: true})
ctrl := healthcheck.NewController(checks, logger)
// Configure middleware
prometheus := fiberprometheus.New("wormscan-fly")

View File

@ -64,15 +64,6 @@ func (v *ObservationUpdate) ToMap() map[string]string {
}
}
type VaaIdTxHashUpdate struct {
ChainID vaa.ChainID `bson:"emitterChain"`
Emitter string `bson:"emitterAddr"`
Sequence string `bson:"sequence"`
TxHash string `bson:"txHash"`
OriginTxHash *string `bson:"_originTxHash,omitempty"` //this is temporary field for fix enconding txHash
UpdatedAt *time.Time `bson:"updatedAt"`
}
func indexedAt(t time.Time) IndexingTimestamps {
return IndexingTimestamps{
IndexedAt: t,

View File

@ -17,6 +17,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/track"
"github.com/wormhole-foundation/wormhole-explorer/fly/producer"
"github.com/wormhole-foundation/wormhole-explorer/fly/txhash"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
@ -31,6 +32,7 @@ type Repository struct {
metrics metrics.Metrics
db *mongo.Database
afterUpdate producer.PushFunc
txHashStore txhash.TxHashStore
log *zap.Logger
collections struct {
vaas *mongo.Collection
@ -40,13 +42,16 @@ type Repository struct {
governorStatus *mongo.Collection
vaasPythnet *mongo.Collection
vaaCounts *mongo.Collection
vaaIdTxHash *mongo.Collection
}
}
// TODO wrap repository with a service that filters using redis
func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics, db *mongo.Database, vaaTopicFunc producer.PushFunc, log *zap.Logger) *Repository {
return &Repository{alertService, metrics, db, vaaTopicFunc, log, struct {
func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics,
db *mongo.Database,
vaaTopicFunc producer.PushFunc,
txHashStore txhash.TxHashStore,
log *zap.Logger) *Repository {
return &Repository{alertService, metrics, db, vaaTopicFunc, txHashStore, log, struct {
vaas *mongo.Collection
heartbeats *mongo.Collection
observations *mongo.Collection
@ -54,7 +59,6 @@ func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics, db *
governorStatus *mongo.Collection
vaasPythnet *mongo.Collection
vaaCounts *mongo.Collection
vaaIdTxHash *mongo.Collection
}{
vaas: db.Collection("vaas"),
heartbeats: db.Collection("heartbeats"),
@ -62,8 +66,7 @@ func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics, db *
governorConfig: db.Collection("governorConfig"),
governorStatus: db.Collection("governorStatus"),
vaasPythnet: db.Collection("vaasPythnet"),
vaaCounts: db.Collection("vaaCounts"),
vaaIdTxHash: db.Collection("vaaIdTxHash")}}
vaaCounts: db.Collection("vaaCounts")}}
}
func (s *Repository) UpsertVaa(ctx context.Context, v *vaa.VAA, serializedVaa []byte) error {
@ -101,11 +104,13 @@ func (s *Repository) UpsertVaa(ctx context.Context, v *vaa.VAA, serializedVaa []
s.alertClient.CreateAndSend(ctx, flyAlert.ErrorSavePyth, alertContext)
}
} else {
var vaaIdTxHash VaaIdTxHashUpdate
if err := s.collections.vaaIdTxHash.FindOne(ctx, bson.M{"_id": id}).Decode(&vaaIdTxHash); err != nil {
txHash, err := s.txHashStore.Get(ctx, id)
if err != nil {
s.log.Warn("Finding vaaIdTxHash", zap.String("id", id), zap.Error(err))
}
vaaDoc.TxHash = vaaIdTxHash.TxHash
if txHash != nil {
vaaDoc.TxHash = txHash.TxHash
}
result, err = s.collections.vaas.UpdateByID(ctx, id, update, opts)
if err != nil {
// send alert when exists an error saving vaa.
@ -142,8 +147,7 @@ func (s *Repository) UpsertVaa(ctx context.Context, v *vaa.VAA, serializedVaa []
return err
}
func (s *Repository) UpsertObservation(o *gossipv1.SignedObservation) error {
ctx := context.TODO()
func (s *Repository) UpsertObservation(ctx context.Context, o *gossipv1.SignedObservation) error {
vaaID := strings.Split(o.MessageId, "/")
chainIDStr, emitter, sequenceStr := vaaID[0], vaaID[1], vaaID[2]
id := fmt.Sprintf("%s/%s/%s", o.MessageId, hex.EncodeToString(o.Addr), hex.EncodeToString(o.Hash))
@ -207,17 +211,16 @@ func (s *Repository) UpsertObservation(o *gossipv1.SignedObservation) error {
s.metrics.IncObservationWithoutTxHash(vaa.ChainID(chainID))
}
vaaTxHash := VaaIdTxHashUpdate{
ChainID: vaa.ChainID(chainID),
Emitter: emitter,
Sequence: strconv.FormatUint(sequence, 10),
TxHash: txHash,
UpdatedAt: &now,
vaaTxHash := txhash.TxHash{
ChainID: vaa.ChainID(chainID),
Emitter: emitter,
Sequence: strconv.FormatUint(sequence, 10),
TxHash: txHash,
}
err = s.UpsertTxHash(ctx, vaaTxHash)
err = s.txHashStore.Set(ctx, o.MessageId, vaaTxHash)
if err != nil {
s.log.Error("Error inserting vaaIdTxHash", zap.Error(err))
s.log.Error("Error setting txHash", zap.Error(err))
return err
}
@ -236,32 +239,9 @@ func (s *Repository) ReplaceVaaTxHash(ctx context.Context, vaaID, oldTxHash, new
if err != nil {
return nil
}
_, err = s.collections.vaaIdTxHash.UpdateByID(ctx, vaaID, update)
if err != nil {
return nil
}
return nil
}
func (s *Repository) UpsertTxHash(ctx context.Context, vaaTxHash VaaIdTxHashUpdate) error {
id := fmt.Sprintf("%d/%s/%s", vaaTxHash.ChainID, vaaTxHash.Emitter, vaaTxHash.Sequence)
updateVaaTxHash := bson.M{
"$set": vaaTxHash,
"$setOnInsert": indexedAt(time.Now()),
"$inc": bson.D{{Key: "revision", Value: 1}},
}
_, err := s.collections.vaaIdTxHash.UpdateByID(ctx, id, updateVaaTxHash, options.Update().SetUpsert(true))
if err != nil {
s.log.Error("Error inserting vaaIdTxHash", zap.Error(err))
return err
}
return err
}
func (s *Repository) UpsertHeartbeat(hb *gossipv1.Heartbeat) error {
id := hb.GuardianAddr
now := time.Now()

56
fly/txhash/cache.go Normal file
View File

@ -0,0 +1,56 @@
package txhash
import (
"context"
"time"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/eko/gocache/v3/cache"
"github.com/eko/gocache/v3/store"
"go.uber.org/zap"
)
type cacheTxHash struct {
cache cache.CacheInterface[TxHash]
expiration time.Duration
logger *zap.Logger
}
func NewCacheTxHash(cache cache.CacheInterface[TxHash],
expiration time.Duration,
logger *zap.Logger) *cacheTxHash {
return &cacheTxHash{
cache: cache,
expiration: expiration,
logger: logger,
}
}
func (t *cacheTxHash) Set(ctx context.Context, vaaID string, txHash TxHash) error {
if err := t.cache.Set(ctx, vaaID, txHash, store.WithCost(16), store.WithExpiration(t.expiration)); err != nil {
t.logger.Error("Error setting tx hash in cache", zap.Error(err))
return err
}
return nil
}
func (r *cacheTxHash) SetObservation(ctx context.Context, o *gossipv1.SignedObservation) error {
txHash, err := CreateTxHash(r.logger, o)
if err != nil {
r.logger.Error("Error creating txHash", zap.Error(err))
return err
}
return r.Set(ctx, o.MessageId, *txHash)
}
func (r *cacheTxHash) Get(ctx context.Context, vaaID string) (*TxHash, error) {
txHash, err := r.cache.Get(ctx, vaaID)
if err == nil {
return &txHash, nil
}
return nil, ErrTxHashNotFound
}
func (r *cacheTxHash) GetName() string {
return "memory"
}

94
fly/txhash/composite.go Normal file
View File

@ -0,0 +1,94 @@
package txhash
import (
"context"
"strconv"
"strings"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/hashicorp/go-multierror"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
type composite struct {
hashStores []TxHashStore
metrics metrics.Metrics
logger *zap.Logger
}
func NewComposite(
hashStores []TxHashStore,
metrics metrics.Metrics,
logger *zap.Logger) *composite {
return &composite{
hashStores: hashStores,
metrics: metrics,
logger: logger,
}
}
func (t *composite) Set(ctx context.Context, vaaID string, txHash TxHash) error {
var result multierror.Error
for _, store := range t.hashStores {
if err := store.Set(ctx, vaaID, txHash); err != nil {
t.logger.Error("Error setting tx hash",
zap.String("vaaId", vaaID),
zap.String("store", store.GetName()),
zap.Error(err))
result.Errors = append(result.Errors, err)
}
}
return result.ErrorOrNil()
}
func (t *composite) SetObservation(ctx context.Context, o *gossipv1.SignedObservation) error {
vaaID := strings.Split(o.MessageId, "/")
chainIDStr, emitter, sequenceStr := vaaID[0], vaaID[1], vaaID[2]
chainID, err := strconv.ParseUint(chainIDStr, 10, 16)
if err != nil {
t.logger.Error("Error parsing chainId", zap.Error(err))
return err
}
txHash, err := domain.EncodeTrxHashByChainID(vaa.ChainID(chainID), o.GetTxHash())
if err != nil {
t.logger.Warn("Error encoding tx hash",
zap.Uint64("chainId", chainID),
zap.ByteString("txHash", o.GetTxHash()),
zap.Error(err))
}
vaaTxHash := TxHash{
ChainID: vaa.ChainID(chainID),
Emitter: emitter,
Sequence: sequenceStr,
TxHash: txHash,
}
return t.Set(ctx, o.MessageId, vaaTxHash)
}
func (t *composite) Get(ctx context.Context, vaaID string) (*TxHash, error) {
log := t.logger.With(zap.String("vaaId", vaaID))
for _, store := range t.hashStores {
txHash, err := store.Get(ctx, vaaID)
if err == nil {
t.metrics.IncFoundTxHash(store.GetName())
return txHash, nil
}
if err != ErrTxHashNotFound {
log.Error("Error getting tx hash", zap.String("store", store.GetName()), zap.Error(err))
} else {
log.Info("Cannot get txHash", zap.String("store", store.GetName()))
}
t.metrics.IncNotFoundTxHash(store.GetName())
}
return nil, ErrTxHashNotFound
}
func (t *composite) GetName() string {
return "multi"
}

48
fly/txhash/dedup.go Normal file
View File

@ -0,0 +1,48 @@
package txhash
import (
"context"
"fmt"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/fly/deduplicator"
"go.uber.org/zap"
)
type dedupTxHashStore struct {
txHashStore TxHashStore
deduplicator *deduplicator.Deduplicator
logger *zap.Logger
}
func NewDedupTxHashStore(txHashStore TxHashStore, deduplicator *deduplicator.Deduplicator, logger *zap.Logger) *dedupTxHashStore {
return &dedupTxHashStore{
txHashStore: txHashStore,
deduplicator: deduplicator,
logger: logger,
}
}
func (d *dedupTxHashStore) Set(ctx context.Context, vaaID string, txHash TxHash) error {
key := fmt.Sprintf("observation:%s", vaaID)
return d.deduplicator.Apply(ctx, key, func() error {
return d.txHashStore.Set(ctx, vaaID, txHash)
})
}
func (d *dedupTxHashStore) SetObservation(ctx context.Context, o *gossipv1.SignedObservation) error {
txHash, err := CreateTxHash(d.logger, o)
if err != nil {
d.logger.Error("Error creating txHash", zap.Error(err))
return err
}
return d.Set(ctx, o.MessageId, *txHash)
}
func (d *dedupTxHashStore) Get(ctx context.Context, vaaID string) (*TxHash, error) {
return d.txHashStore.Get(ctx, vaaID)
}
func (r *dedupTxHashStore) GetName() string {
return "dedup"
}

84
fly/txhash/mongo.go Normal file
View File

@ -0,0 +1,84 @@
package txhash
import (
"context"
"fmt"
"time"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
type mongoTxHash struct {
vaaIdTxHashCollection *mongo.Collection
logger *zap.Logger
}
type vaaIdTxHashUpdate struct {
ChainID sdk.ChainID `bson:"emitterChain"`
Emitter string `bson:"emitterAddr"`
Sequence string `bson:"sequence"`
TxHash string `bson:"txHash"`
UpdatedAt *time.Time `bson:"updatedAt"`
}
func NewMongoTxHash(database *mongo.Database, logger *zap.Logger) *mongoTxHash {
return &mongoTxHash{
vaaIdTxHashCollection: database.Collection(repository.VaaIdTxHash),
logger: logger,
}
}
func (m *mongoTxHash) Set(ctx context.Context, vaaID string, txHash TxHash) error {
id := fmt.Sprintf("%d/%s/%s", txHash.ChainID, txHash.Emitter, txHash.Sequence)
now := time.Now()
udpate := vaaIdTxHashUpdate{
ChainID: txHash.ChainID,
Emitter: txHash.Emitter,
Sequence: txHash.Sequence,
TxHash: txHash.TxHash,
UpdatedAt: &now,
}
updateVaaTxHash := bson.M{
"$set": udpate,
"$setOnInsert": repository.IndexedAt(now),
"$inc": bson.D{{Key: "revision", Value: 1}},
}
_, err := m.vaaIdTxHashCollection.UpdateByID(ctx, id, updateVaaTxHash, options.Update().SetUpsert(true))
if err != nil {
m.logger.Error("Error inserting vaaIdTxHash in mongodb", zap.String("id", id), zap.Error(err))
return err
}
return nil
}
func (r *mongoTxHash) SetObservation(ctx context.Context, o *gossipv1.SignedObservation) error {
txHash, err := CreateTxHash(r.logger, o)
if err != nil {
r.logger.Error("Error creating txHash", zap.Error(err))
return err
}
return r.Set(ctx, o.MessageId, *txHash)
}
func (m *mongoTxHash) Get(ctx context.Context, vaaID string) (*TxHash, error) {
var mongoTxHash TxHash
if err := m.vaaIdTxHashCollection.FindOne(ctx, bson.M{"_id": vaaID}).Decode(&mongoTxHash); err != nil {
if err == mongo.ErrNoDocuments {
return nil, ErrTxHashNotFound
}
m.logger.Error("Finding vaaIdTxHash", zap.String("id", vaaID), zap.Error(err))
return nil, err
}
return &mongoTxHash, nil
}
func (r *mongoTxHash) GetName() string {
return "mongo"
}

82
fly/txhash/redis.go Normal file
View File

@ -0,0 +1,82 @@
package txhash
import (
"context"
"encoding/json"
"fmt"
"time"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/go-redis/redis/v8"
"go.uber.org/zap"
)
const txHashByVaaIDKey = "tx-hash-by-vaa-id"
type redisTxHash struct {
client *redis.Client
prefix string
expiration time.Duration
logger *zap.Logger
}
func NewRedisTxHash(client *redis.Client,
prefix string,
expiration time.Duration,
logger *zap.Logger) *redisTxHash {
return &redisTxHash{
client: client,
prefix: prefix,
expiration: expiration,
logger: logger,
}
}
func (t *redisTxHash) Set(ctx context.Context, vaaID string, txHash TxHash) error {
body, err := json.Marshal(txHash)
if err != nil {
return err
}
key := t.createKey(vaaID)
if res := t.client.Set(ctx, key, string(body), t.expiration); res.Err() != nil {
t.logger.Warn("Error setting tx hash in redis", zap.Error(res.Err()), zap.String("vaaId", vaaID))
return res.Err()
}
return nil
}
func (r *redisTxHash) SetObservation(ctx context.Context, o *gossipv1.SignedObservation) error {
txHash, err := CreateTxHash(r.logger, o)
if err != nil {
r.logger.Error("Error creating txHash", zap.Error(err))
return err
}
return r.Set(ctx, o.MessageId, *txHash)
}
func (r *redisTxHash) Get(ctx context.Context, vaaID string) (*TxHash, error) {
key := r.createKey(vaaID)
res := r.client.Get(ctx, key)
if res.Err() == nil {
var txHash TxHash
err := json.Unmarshal([]byte(res.Val()), &txHash)
if err != nil {
return nil, err
}
return &txHash, nil
}
if res.Err() == redis.Nil {
return nil, ErrTxHashNotFound
}
return nil, res.Err()
}
func (r *redisTxHash) createKey(vaaID string) string {
return fmt.Sprintf("%s:%s:%s", r.prefix, txHashByVaaIDKey, vaaID)
}
func (r *redisTxHash) GetName() string {
return "redis"
}

55
fly/txhash/types.go Normal file
View File

@ -0,0 +1,55 @@
package txhash
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
var ErrTxHashNotFound = errors.New("tx hash not found")
type TxHash struct {
ChainID sdk.ChainID
Emitter string
Sequence string
TxHash string
}
type TxHashStore interface {
Get(ctx context.Context, vaaID string) (*TxHash, error)
Set(ctx context.Context, vaaID string, txHash TxHash) error
SetObservation(ctx context.Context, o *gossipv1.SignedObservation) error
GetName() string
}
func CreateTxHash(logger *zap.Logger, o *gossipv1.SignedObservation) (*TxHash, error) {
vaaID := strings.Split(o.MessageId, "/")
chainIDStr, emitter, sequenceStr := vaaID[0], vaaID[1], vaaID[2]
chainID, err := strconv.ParseUint(chainIDStr, 10, 16)
if err != nil {
return nil, fmt.Errorf("error parsing chainId: %w", err)
}
txHash, err := domain.EncodeTrxHashByChainID(sdk.ChainID(chainID), o.GetTxHash())
if err != nil {
logger.Warn("Error encoding tx hash",
zap.Uint64("chainId", chainID),
zap.ByteString("txHash", o.GetTxHash()),
zap.Error(err))
}
return &TxHash{
ChainID: sdk.ChainID(chainID),
Emitter: emitter,
Sequence: sequenceStr,
TxHash: txHash,
}, nil
}