Use Redis channel in spy and fly (#753)

This commit is contained in:
ftocal 2023-10-18 11:18:32 -03:00 committed by GitHub
parent 14161d1569
commit 91bf42971b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 860 additions and 161 deletions

63
common/domain/events.go Normal file
View File

@ -0,0 +1,63 @@
package domain
import (
"encoding/json"
"time"
)
const (
SignedVaaType = "signed-vaa"
PublishedLogMessageType = "published-log-message"
)
type NotificationEvent struct {
TrackID string `json:"trackId"`
Source string `json:"source"`
Type string `json:"type"`
Payload json.RawMessage `json:"payload"`
}
func NewNotificationEvent[T EventPayload](trackID, source, _type string, payload T) (*NotificationEvent, error) {
p, err := json.Marshal(payload)
if err != nil {
return nil, err
}
return &NotificationEvent{
TrackID: trackID,
Source: source,
Type: _type,
Payload: json.RawMessage(p),
}, nil
}
type EventPayload interface {
SignedVaa | PublishedLogMessage
}
func GetEventPayload[T EventPayload](e *NotificationEvent) (T, error) {
var payload T
err := json.Unmarshal(e.Payload, &payload)
return payload, err
}
type SignedVaa struct {
ID string `json:"id"`
EmitterChain uint16 `json:"emitterChain"`
EmitterAddr string `json:"emitterAddr"`
Sequence uint64 `json:"sequence"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Timestamp time.Time `json:"timestamp"`
Vaa []byte `json:"vaa"`
TxHash string `json:"txHash"`
Version int `json:"version"`
}
type PublishedLogMessage struct {
ID string `json:"id"`
EmitterChain uint16 `json:"emitterChain"`
EmitterAddr string `json:"emitterAddr"`
Sequence uint64 `json:"sequence"`
Timestamp time.Time `json:"timestamp"`
Vaa []byte `json:"vaa"`
TxHash string `json:"txHash"`
}

View File

@ -0,0 +1,57 @@
package domain
import (
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
)
// TestGetEventPayload contains a test harness for the `GetEventPayload` function.
func Test_GetEventPayload(t *testing.T) {
body := `{
"trackId": "63e16082da939a263512a307",
"source": "fly",
"type": "signed-vaa",
"payload": {
"id": "2/000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d7/162727",
"emitterChain": 2,
"emitterAddr": "000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d7",
"sequence": 162727,
"guardianSetIndex": 0,
"timestamp": "2023-08-04T11:43:48.000Z",
"vaa": "010000000001005defe63f46c192b506758684fada6b97f5a8ee287a82efefa35c59dcf369a83b1abfe5431ad51a31051bf42851b5f699421e525745db03e8bc43a6b36dde6fc00064cd0ea4446900000002000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d70000000000027ba7010300000000000000000000000000000000000000000000000000000000004c4b40000000000000000000000000b4fbf271143f4fbf7b91a5ded31805e42b2208d600026d9ae6b2d333c1d65301a59da3eed388ca5dc60cb12496584b75cbe6b15fdbed002000000000000000000000000072b916142650cb48bbbed0acaeb5b287d1c55d917b2262617369635f726563697069656e74223a7b22726563697069656e74223a22633256704d58426f4e445631626a646a4e6a426c6448566d6432317964575272617a4a3061336877647a4e6f595859794e6d4e6d5a6a5933227d7d",
"txHash" : "406065c15b62426c51f987f5923fb376f6b60cb1c15724cc5460a08d18ccc337",
"version" : 1
}
}`
event := NotificationEvent{}
err := json.Unmarshal([]byte(body), &event)
assert.NoError(t, err)
assert.Equal(t, "63e16082da939a263512a307", event.TrackID)
assert.Equal(t, "fly", event.Source)
assert.Equal(t, SignedVaaType, event.Type)
signedVaa, err := GetEventPayload[SignedVaa](&event)
assert.NoError(t, err)
assert.Equal(t, "2/000000000000000000000000f890982f9310df57d00f659cf4fd87e65aded8d7/162727", signedVaa.ID)
}
func Test_GetEventPayload_Error(t *testing.T) {
body := `{
"trackId": "63e16082da939a263512a307",
"source": "fly",
"type": "signed-vaa"
}`
event := NotificationEvent{}
err := json.Unmarshal([]byte(body), &event)
assert.NoError(t, err)
assert.Equal(t, "63e16082da939a263512a307", event.TrackID)
assert.Equal(t, "fly", event.Source)
assert.Equal(t, SignedVaaType, event.Type)
_, err = GetEventPayload[SignedVaa](&event)
assert.Error(t, err)
}

15
common/health/redis.go Normal file
View File

@ -0,0 +1,15 @@
package health
import (
"context"
"github.com/go-redis/redis/v8"
)
// Redis does a ping.
func Redis(client *redis.Client) Check {
return func(ctx context.Context) error {
result := client.Ping(ctx)
return result.Err()
}
}

View File

@ -21,4 +21,5 @@ OBSERVATIONS_CHANNEL_SIZE=15000
VAAS_CHANNEL_SIZE=5000
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
REDIS_VAA_CHANNEL=gossip-signed-vaas

View File

@ -21,4 +21,5 @@ OBSERVATIONS_CHANNEL_SIZE=5000
VAAS_CHANNEL_SIZE=5000
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
REDIS_VAA_CHANNEL=gossip-signed-vaas

View File

@ -21,4 +21,5 @@ OBSERVATIONS_CHANNEL_SIZE=5000
VAAS_CHANNEL_SIZE=5000
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
REDIS_VAA_CHANNEL=gossip-signed-vaas

View File

@ -21,4 +21,5 @@ OBSERVATIONS_CHANNEL_SIZE=5000
VAAS_CHANNEL_SIZE=5000
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
REDIS_VAA_CHANNEL=gossip-signed-vaas

View File

@ -83,6 +83,8 @@ spec:
configMapKeyRef:
name: config
key: redis-prefix
- name: REDIS_VAA_CHANNEL
value: "{{ .REDIS_VAA_CHANNEL }}"
- name: MAX_HEALTH_TIME_SECONDS
value: "{{ .MAX_HEALTH_TIME_SECONDS }}"
- name: ALERT_API_KEY

View File

@ -10,4 +10,5 @@ RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=250m
GRPC_ADDRESS=0.0.0.0:7777
HOSTNAME=spy.wormscan.io
PPROF_ENABLED=false
PPROF_ENABLED=false
REDIS_VAA_CHANNEL=gossip-signed-vaas

View File

@ -10,4 +10,5 @@ RESOURCES_REQUESTS_MEMORY=16Mi
RESOURCES_REQUESTS_CPU=10m
GRPC_ADDRESS=0.0.0.0:7777
HOSTNAME=spy.prod.testnet.wormscan.io
PPROF_ENABLED=false
PPROF_ENABLED=false
REDIS_VAA_CHANNEL=gossip-signed-vaas

View File

@ -10,4 +10,5 @@ RESOURCES_REQUESTS_MEMORY=15Mi
RESOURCES_REQUESTS_CPU=10m
GRPC_ADDRESS=0.0.0.0:7777
HOSTNAME=spy.staging.wormscan.io
PPROF_ENABLED=true
PPROF_ENABLED=true
REDIS_VAA_CHANNEL=gossip-signed-vaas

View File

@ -10,4 +10,5 @@ RESOURCES_REQUESTS_MEMORY=16Mi
RESOURCES_REQUESTS_CPU=10m
GRPC_ADDRESS=0.0.0.0:7777
HOSTNAME=spy.testnet.wormscan.io
PPROF_ENABLED=false
PPROF_ENABLED=false
REDIS_VAA_CHANNEL=gossip-signed-vaas

View File

@ -55,16 +55,18 @@ spec:
path: /api/health
port: 8000
env:
- name: MONGODB_URI
valueFrom:
secretKeyRef:
name: mongodb
key: mongo-uri
- name: MONGODB_DATABASE
- name: REDIS_URI
valueFrom:
configMapKeyRef:
name: config
key: mongo-database
key: redis-uri
- name: REDIS_PREFIX
valueFrom:
configMapKeyRef:
name: config
key: redis-prefix
- name: REDIS_VAA_CHANNEL
value: "{{ .REDIS_VAA_CHANNEL }}"
- name: GRPC_ADDRESS
value: {{ .GRPC_ADDRESS }}
- name: PORT

View File

@ -29,18 +29,25 @@ func execute() error {
}
func addVaaBackfillerCommand(root *cobra.Command) {
var mongoUri, mongoDb, filename string
var mongoUri, mongoDb, filename, awsRegion, awsAccessKeyId, awsSecretKey, AwsEndpoint, AwsSnsURL string
var workerCount int
var notifyEnabled bool
vaaBackfillerCommand := &cobra.Command{
Use: "vaa",
Short: "Run vaa backfiller",
Run: func(_ *cobra.Command, _ []string) {
workerConfiguration := WorkerConfiguration{
MongoURI: mongoUri,
MongoDatabase: mongoDb,
Filename: filename,
WorkerCount: workerCount,
MongoURI: mongoUri,
MongoDatabase: mongoDb,
Filename: filename,
WorkerCount: workerCount,
NotifyEnabled: notifyEnabled,
AwsRegion: awsRegion,
AwsAccessKeyId: awsAccessKeyId,
AwsSecretKey: awsSecretKey,
AwsEndpoint: AwsEndpoint,
AwsSnsURL: AwsSnsURL,
}
RunBackfiller(workerConfiguration, workerVaa)
},
@ -49,28 +56,42 @@ func addVaaBackfillerCommand(root *cobra.Command) {
vaaBackfillerCommand.Flags().StringVar(&mongoDb, "mongo-database", "", "Mongo database")
vaaBackfillerCommand.Flags().StringVar(&filename, "filename", "", "vaa backfiller filename")
vaaBackfillerCommand.Flags().IntVar(&workerCount, "worker-count", 100, "backfiller worker count")
vaaBackfillerCommand.Flags().BoolVar(&notifyEnabled, "notify-enabled", true, "backfiller notify pipeline")
vaaBackfillerCommand.Flags().StringVar(&awsRegion, "aws-region", "", "AWS region")
vaaBackfillerCommand.Flags().StringVar(&awsAccessKeyId, "aws-access-key-id", "", "AWS access key id")
vaaBackfillerCommand.Flags().StringVar(&awsSecretKey, "aws-secret-access-key", "", "AWS secret access key")
vaaBackfillerCommand.Flags().StringVar(&AwsEndpoint, "aws-endpoint", "", "AWS endpoint")
vaaBackfillerCommand.Flags().StringVar(&AwsSnsURL, "aws-sns-url", "", "AWS SNS URL")
vaaBackfillerCommand.MarkFlagRequired("mongo-uri")
vaaBackfillerCommand.MarkFlagRequired("mongo-database")
vaaBackfillerCommand.MarkFlagRequired("filename")
vaaBackfillerCommand.MarkFlagRequired("aws-region")
vaaBackfillerCommand.MarkFlagRequired("aws-sns-url")
root.AddCommand(vaaBackfillerCommand)
}
func addTxHashCommand(root *cobra.Command) {
var mongoUri, mongoDb, filename string
var mongoUri, mongoDb, filename, awsRegion, awsAccessKeyId, awsSecretKey, AwsEndpoint, AwsSnsURL string
var workerCount int
var notifyEnabled bool
txHashBackfillerCommand := &cobra.Command{
Use: "txhash",
Short: "Run txhash backfiller",
Run: func(_ *cobra.Command, _ []string) {
workerConfiguration := WorkerConfiguration{
MongoURI: mongoUri,
MongoDatabase: mongoDb,
Filename: filename,
WorkerCount: workerCount,
MongoURI: mongoUri,
MongoDatabase: mongoDb,
Filename: filename,
WorkerCount: workerCount,
AwsRegion: awsRegion,
AwsAccessKeyId: awsAccessKeyId,
AwsSecretKey: awsSecretKey,
AwsEndpoint: AwsEndpoint,
AwsSnsURL: AwsSnsURL,
}
RunBackfiller(workerConfiguration, workerTxHash)
},
@ -79,10 +100,18 @@ func addTxHashCommand(root *cobra.Command) {
txHashBackfillerCommand.Flags().StringVar(&mongoDb, "mongo-database", "", "Mongo database")
txHashBackfillerCommand.Flags().StringVar(&filename, "filename", "", "vaa backfiller filename")
txHashBackfillerCommand.Flags().IntVar(&workerCount, "worker-count", 100, "backfiller worker count")
txHashBackfillerCommand.Flags().BoolVar(&notifyEnabled, "notify-enabled", false, "backfiller notify pipeline")
txHashBackfillerCommand.Flags().StringVar(&awsRegion, "aws-region", "", "AWS region")
txHashBackfillerCommand.Flags().StringVar(&awsAccessKeyId, "aws-access-key-id", "", "AWS access key id")
txHashBackfillerCommand.Flags().StringVar(&awsSecretKey, "aws-secret-access-key", "", "AWS secret access key")
txHashBackfillerCommand.Flags().StringVar(&AwsEndpoint, "aws-endpoint", "", "AWS endpoint")
txHashBackfillerCommand.Flags().StringVar(&AwsSnsURL, "aws-sns-url", "", "AWS SNS URL")
txHashBackfillerCommand.MarkFlagRequired("mongo-uri")
txHashBackfillerCommand.MarkFlagRequired("mongo-database")
txHashBackfillerCommand.MarkFlagRequired("filename")
txHashBackfillerCommand.MarkFlagRequired("aws-region")
txHashBackfillerCommand.MarkFlagRequired("aws-sns-url")
root.AddCommand(txHashBackfillerCommand)
}

View File

@ -0,0 +1,84 @@
package main
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/client/sns"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/producer"
"go.uber.org/zap"
)
// newAwsConfig creates a new AWS config from the given configuration.
func newAwsConfig(ctx context.Context, cfg WorkerConfiguration) (aws.Config, error) {
region := cfg.AwsRegion
if region == "" {
return aws.Config{}, fmt.Errorf("AWS_REGION is required")
}
awsSecretId := cfg.AwsAccessKeyId
awsSecretKey := cfg.AwsSecretKey
if awsSecretId != "" && awsSecretKey != "" {
credentials := credentials.NewStaticCredentialsProvider(awsSecretId, awsSecretKey, "")
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
if cfg.AwsEndpoint != "" {
return aws.Endpoint{
PartitionID: "aws",
URL: cfg.AwsEndpoint,
SigningRegion: region,
}, nil
}
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
})
awsCfg, err := awsconfig.LoadDefaultConfig(ctx,
awsconfig.WithRegion(region),
awsconfig.WithEndpointResolver(customResolver),
awsconfig.WithCredentialsProvider(credentials),
)
return awsCfg, err
}
return awsconfig.LoadDefaultConfig(ctx, awsconfig.WithRegion(region))
}
// newSNSProducer creates a new SNS producer from the given configuration.
func newSNSProducer(ctx context.Context, cfg WorkerConfiguration, alertClient alert.AlertClient, metricsClient metrics.Metrics, logger *zap.Logger) (*producer.SNSProducer, error) {
if cfg.AwsSnsURL == "" {
return nil, fmt.Errorf("AWS_SNS_URL is required")
}
awsConfig, err := newAwsConfig(ctx, cfg)
if err != nil {
return nil, err
}
snsProducer, err := sns.NewProducer(awsConfig, cfg.AwsSnsURL)
if err != nil {
return nil, err
}
return producer.NewSNSProducer(snsProducer, alertClient, metricsClient, logger), nil
}
// newVAATopicProducerFunc creates a new VAA topic producer function from the given configuration.
func newVAATopicProducerFunc(ctx context.Context, cfg WorkerConfiguration, alertClient alert.AlertClient, metricsClient metrics.Metrics, logger *zap.Logger) (producer.PushFunc, error) {
if !cfg.NotifyEnabled {
return func(context.Context, *producer.NotificationEvent) error {
return nil
}, nil
}
snsProducer, err := newSNSProducer(ctx, cfg, alertClient, metricsClient, logger)
if err != nil {
logger.Fatal("could not create vaa topic producer ", zap.Error(err))
return nil, err
}
return snsProducer.Push, nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/producer"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
@ -33,7 +34,12 @@ func RunTxHashEncoding(cfg TxHashEncondingConfig) {
}
defer db.DisconnectWithTimeout(10 * time.Second)
repository := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), db.Database, logger)
repository := storage.NewRepository(
alert.NewDummyClient(),
metrics.NewDummyMetrics(),
db.Database,
producer.NewVAAInMemory(logger).Push,
logger)
workerTxHashEncoding(ctx, logger, repository, vaa.ChainID(cfg.ChainID), cfg.PageSize)
}

View File

@ -7,9 +7,7 @@ import (
"time"
"github.com/schollz/progressbar/v3"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"go.uber.org/zap"
)
@ -24,13 +22,20 @@ type Workpool struct {
Log *zap.Logger
Bar *progressbar.ProgressBar
WorkerFunc GenericWorker
Repository *storage.Repository
}
type WorkerConfiguration struct {
MongoURI string `env:"MONGODB_URI,required"`
MongoDatabase string `env:"MONGODB_DATABASE,required"`
Filename string `env:"FILENAME,required"`
WorkerCount int `env:"WORKER_COUNT"`
MongoURI string `env:"MONGODB_URI,required"`
MongoDatabase string `env:"MONGODB_DATABASE,required"`
Filename string `env:"FILENAME,required"`
WorkerCount int `env:"WORKER_COUNT"`
NotifyEnabled bool `env:"NOTIFY_ENABLED"`
AwsRegion string `env:"AWS_REGION"`
AwsAccessKeyId string `env:"AWS_ACCESS_KEY_ID"`
AwsSecretKey string `env:"AWS_SECRET_ACCESS_KEY"`
AwsEndpoint string `env:"AWS_ENDPOINT"`
AwsSnsURL string `env:"AWS_SNS_URL"`
}
func NewWorkpool(ctx context.Context, cfg WorkerConfiguration, workerFunc GenericWorker) *Workpool {
@ -60,7 +65,6 @@ func NewWorkpool(ctx context.Context, cfg WorkerConfiguration, workerFunc Generi
}
func (w *Workpool) Process(ctx context.Context) error {
repo := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), w.DB.Database, w.Log)
var err error
defer w.DB.DisconnectWithTimeout(10 * time.Second)
@ -72,7 +76,7 @@ func (w *Workpool) Process(ctx context.Context) error {
w.WG.Done()
return nil
}
err = w.WorkerFunc(ctx, repo, line)
err = w.WorkerFunc(ctx, w.Repository, line)
if err != nil {
fmt.Println(err)
break

0
fly/data.json Normal file
View File

View File

@ -34,6 +34,7 @@ require (
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.1.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.1.1 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
@ -133,7 +134,7 @@ require (
github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v1.12.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect

View File

@ -105,6 +105,8 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34 h1:gGLG7yKaXG02/jBlg210R7VgQIo
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.34/go.mod h1:Etz2dj6UHYuw+Xw830KfzCfWGMzqvUTCjUj5b76GVDc=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.2 h1:4AH9fFjUlVktQMznF+YN33aWNXaR4VgDXyP28qokJC0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.2/go.mod h1:45MfaXZ0cNbeuT0KQ1XJylq8A6+OpVV2E5kvY/Kq+u8=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2 h1:MU/v2qtfGjKexJ09BMqE8pXo9xYMhT13FXjKgFc0cFw=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2/go.mod h1:VN2n9SOMS1lNbh5YD7o+ho0/rgfifSrK//YYNiVVF5E=
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.2 h1:CSNIo1jiw7KrkdgZjCOnotu6yuB3IybhKLuSQrTLNfo=
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.2/go.mod h1:1ttxGjUHZliCQMpPss1sU5+Ph/5NvdMFRzr96bv8gm0=
github.com/aws/aws-sdk-go-v2/service/sso v1.1.1 h1:37QubsarExl5ZuCBlnRP+7l1tNwZPBSTqpTBrPH98RU=

View File

@ -38,6 +38,9 @@ func (d *DummyMetrics) IncObservationInserted(chain sdk.ChainID) {}
// IncObservationWithoutTxHash increases the number of observation without tx hash.
func (d *DummyMetrics) IncObservationWithoutTxHash(chain sdk.ChainID) {}
// IncVaaSendNotification increases the number of vaa send notifcations to pipeline.
func (d *DummyMetrics) IncVaaSendNotification(chain sdk.ChainID) {}
// IncObservationTotal increases the number of observation received from Gossip network.
func (d *DummyMetrics) IncObservationTotal() {}

View File

@ -10,6 +10,7 @@ type Metrics interface {
IncVaaUnfiltered(chain sdk.ChainID)
IncVaaConsumedFromQueue(chain sdk.ChainID)
IncVaaInserted(chain sdk.ChainID)
IncVaaSendNotification(chain sdk.ChainID)
IncVaaTotal()
// observation metrics

View File

@ -130,6 +130,11 @@ func (m *PrometheusMetrics) IncVaaInserted(chain sdk.ChainID) {
m.vaaReceivedCount.WithLabelValues(chain.String(), "inserted").Inc()
}
// IncVaaSendNotification increases the number of vaa send notifcations to pipeline.
func (m *PrometheusMetrics) IncVaaSendNotification(chain sdk.ChainID) {
m.vaaReceivedCount.WithLabelValues(chain.String(), "send-notification").Inc()
}
// IncVaaTotal increases the number of vaa received from Gossip network.
func (m *PrometheusMetrics) IncVaaTotal() {
m.vaaTotal.Inc()

View File

@ -0,0 +1,13 @@
package track
import (
"fmt"
"github.com/google/uuid"
)
// GetTrackID returns a unique track id for the pipeline.
func GetTrackID(vaaID string) string {
uuid := uuid.New()
return fmt.Sprintf("gossip-signed-vaa-%s-%s", vaaID, uuid.String())
}

View File

@ -29,6 +29,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/fly/migration"
"github.com/wormhole-foundation/wormhole-explorer/fly/notifier"
"github.com/wormhole-foundation/wormhole-explorer/fly/processor"
"github.com/wormhole-foundation/wormhole-explorer/fly/producer"
"github.com/wormhole-foundation/wormhole-explorer/fly/queue"
"github.com/wormhole-foundation/wormhole-explorer/fly/server"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
@ -212,6 +213,35 @@ func newMetrics(enviroment string) metrics.Metrics {
return metrics.NewPrometheusMetrics(enviroment)
}
// Creates a callback to publish VAA messages to a redis pubsub
func newVAARedisProducerFunc(ctx context.Context, isLocal bool, logger *zap.Logger) (producer.PushFunc, error) {
if isLocal {
return func(context.Context, *producer.NotificationEvent) error {
return nil
}, nil
}
redisUri, err := getenv("REDIS_URI")
if err != nil {
logger.Fatal("could not create vaa notifier ", zap.Error(err))
}
redisPrefix, err := getenv("REDIS_PREFIX")
if err != nil {
logger.Fatal("could not create vaa notifier ", zap.Error(err))
}
redisChannel, err := getenv("REDIS_VAA_CHANNEL")
if err != nil {
logger.Fatal("could not create vaa notifier ", zap.Error(err))
}
channel := fmt.Sprintf("%s:%s", redisPrefix, redisChannel)
logger.Info("using redis producer", zap.String("channel", channel))
client := redis.NewClient(&redis.Options{Addr: redisUri})
return producer.NewRedisProducer(client, channel).Push, nil
}
func main() {
//TODO: use a configuration structure to obtain the configuration
_ = godotenv.Load()
@ -281,7 +311,18 @@ func main() {
logger.Fatal("error running migration", zap.Error(err))
}
repository := storage.NewRepository(alertClient, metrics, db.Database, logger)
// Creates a callback to publish VAA messages to a redis pubsub
vaaRedisProducerFunc, err := newVAARedisProducerFunc(rootCtx, *isLocal, logger)
if err != nil {
logger.Fatal("could not create vaa redis producer ", zap.Error(err))
}
// Creates a composite callback to publish VAA messages to a redis pubsub
producerFunc := producer.NewComposite(vaaRedisProducerFunc)
// Creates a callback to publish VAA messages to a redis pubsub
repository := storage.NewRepository(alertClient, metrics, db.Database, producerFunc, logger)
// Outbound gossip message queue
sendC := make(chan []byte)

40
fly/producer/producer.go Normal file
View File

@ -0,0 +1,40 @@
package producer
import (
"context"
"time"
)
// PushFunc is a function to push VAAEvent.
type PushFunc func(context.Context, *NotificationEvent) error
type NotificationEvent struct {
TrackID string `json:"trackId"`
Source string `json:"source"`
Type string `json:"type"`
Payload SignedVaa `json:"payload"`
}
type SignedVaa struct {
ID string `json:"id"`
EmitterChain uint16 `json:"emitterChain"`
EmitterAddr string `json:"emitterAddr"`
Sequence uint64 `json:"sequence"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Timestamp time.Time `json:"timestamp"`
Vaa []byte `json:"vaa"`
TxHash string `json:"txHash"`
Version int `json:"version"`
}
// NewComposite returns a PushFunc that calls all the given producers.
func NewComposite(producers ...PushFunc) PushFunc {
return func(ctx context.Context, event *NotificationEvent) error {
for _, producer := range producers {
if err := producer(ctx, event); err != nil {
return err
}
}
return nil
}
}

View File

@ -0,0 +1,23 @@
package producer
import (
"context"
"go.uber.org/zap"
)
// VAAInMemory represents VAA queue in memory.
type VAAInMemory struct {
logger *zap.Logger
}
// NewVAAInMemory creates a VAA queue in memory instances.
func NewVAAInMemory(logger *zap.Logger) *VAAInMemory {
m := &VAAInMemory{logger: logger}
return m
}
// Push pushes a VAAEvent to memory.
func (m *VAAInMemory) Push(ctx context.Context, event *NotificationEvent) error {
return nil
}

32
fly/producer/vaa_redis.go Normal file
View File

@ -0,0 +1,32 @@
package producer
import (
"context"
"encoding/json"
"github.com/go-redis/redis/v8"
)
// RedisProducer represents a redis producer.
type RedisProducer struct {
client *redis.Client
channel string
}
// NewRedisProducer returns a PushFunc that pushes NotificationEvent to redis.
func NewRedisProducer(c *redis.Client, channel string) *RedisProducer {
return &RedisProducer{
client: c,
channel: channel,
}
}
// Push pushes a NotificationEvent to redis.
func (p *RedisProducer) Push(ctx context.Context, event *NotificationEvent) error {
body, err := json.Marshal(event)
if err != nil {
return err
}
return p.client.Publish(ctx, p.channel, string(body)).Err()
}

47
fly/producer/vaa_sns.go Normal file
View File

@ -0,0 +1,47 @@
package producer
import (
"context"
"encoding/json"
"fmt"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/client/sns"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
// SNSProducer is a producer for SNS.
type SNSProducer struct {
producer *sns.Producer
alertClient alert.AlertClient
metrics metrics.Metrics
logger *zap.Logger
}
// NewSNSProducer creates a new SNSProducer.
func NewSNSProducer(producer *sns.Producer, alertClient alert.AlertClient, metrics metrics.Metrics, logger *zap.Logger) *SNSProducer {
return &SNSProducer{
producer: producer,
alertClient: alertClient,
metrics: metrics,
logger: logger,
}
}
// Push pushes a VAAEvent to SNS.
func (p *SNSProducer) Push(ctx context.Context, event *NotificationEvent) error {
body, err := json.Marshal(event)
if err != nil {
return err
}
deduplicationID := fmt.Sprintf("gossip-event-%s", event.Payload.ID)
p.logger.Debug("Publishing signedVaa event", zap.String("groupID", event.Payload.ID))
err = p.producer.SendMessage(ctx, event.Payload.ID, deduplicationID, string(body))
if err == nil {
p.metrics.IncVaaSendNotification(vaa.ChainID(event.Payload.EmitterChain))
}
return err
}

View File

@ -14,6 +14,8 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
flyAlert "github.com/wormhole-foundation/wormhole-explorer/fly/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/track"
"github.com/wormhole-foundation/wormhole-explorer/fly/producer"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
@ -27,6 +29,7 @@ type Repository struct {
alertClient alert.AlertClient
metrics metrics.Metrics
db *mongo.Database
afterUpdate producer.PushFunc
log *zap.Logger
collections struct {
vaas *mongo.Collection
@ -41,8 +44,8 @@ type Repository struct {
}
// TODO wrap repository with a service that filters using redis
func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics, db *mongo.Database, log *zap.Logger) *Repository {
return &Repository{alertService, metrics, db, log, struct {
func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics, db *mongo.Database, vaaTopicFunc producer.PushFunc, log *zap.Logger) *Repository {
return &Repository{alertService, metrics, db, vaaTopicFunc, log, struct {
vaas *mongo.Collection
heartbeats *mongo.Collection
observations *mongo.Collection
@ -115,6 +118,26 @@ func (s *Repository) UpsertVaa(ctx context.Context, v *vaa.VAA, serializedVaa []
if err == nil && s.isNewRecord(result) {
s.metrics.IncVaaInserted(v.EmitterChain)
s.updateVAACount(v.EmitterChain)
// send signedvaa event to topic.
event := &producer.NotificationEvent{
TrackID: track.GetTrackID(v.MessageID()),
Source: "fly",
Type: domain.SignedVaaType,
Payload: producer.SignedVaa{
ID: v.MessageID(),
EmitterChain: uint16(v.EmitterChain),
EmitterAddr: v.EmitterAddress.String(),
Sequence: v.Sequence,
GuardianSetIndex: v.GuardianSetIndex,
Timestamp: v.Timestamp,
Vaa: serializedVaa,
TxHash: vaaDoc.TxHash,
Version: int(v.Version),
},
}
err = s.afterUpdate(ctx, event)
}
return err
}

View File

@ -6,15 +6,15 @@ import (
"os"
"os/signal"
"syscall"
"time"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/go-redis/redis/v8"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/spy/config"
"github.com/wormhole-foundation/wormhole-explorer/spy/grpc"
"github.com/wormhole-foundation/wormhole-explorer/spy/http/infraestructure"
"github.com/wormhole-foundation/wormhole-explorer/spy/storage"
"github.com/wormhole-foundation/wormhole-explorer/spy/source"
"go.uber.org/zap"
)
@ -29,6 +29,17 @@ func handleExit() {
}
}
func newHealthChecks(
ctx context.Context,
client *redis.Client,
) ([]health.Check, error) {
healthChecks := []health.Check{
health.Redis(client),
}
return healthChecks, nil
}
func main() {
defer handleExit()
@ -69,18 +80,25 @@ func main() {
publisher := grpc.NewPublisher(svs, avs, logger)
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
client := redis.NewClient(&redis.Options{Addr: config.RedisURI})
watcher, err := source.NewRedisSubscriber(rootCtx, client, config.RedisPrefix, config.RedisChannel, publisher.Publish, logger)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
logger.Fatal("failed to create redis subscriber", zap.Error(err))
}
watcher := storage.NewWatcher(db.Database, config.MongoDatabase, publisher.Publish, logger)
err = watcher.Start(rootCtx)
if err != nil {
logger.Fatal("failed to watch MongoDB", zap.Error(err))
}
// get health check functions.
logger.Info("creating health check functions...")
healthChecks, err := newHealthChecks(rootCtx, client)
if err != nil {
logger.Fatal("failed to create health checks", zap.Error(err))
}
server := infraestructure.NewServer(logger, config.Port, db.Database, config.PprofEnabled)
server := infraestructure.NewServer(logger, config.Port, config.PprofEnabled, healthChecks...)
server.Start()
logger.Info("Started wormhole-explorer-spy")
@ -101,8 +119,13 @@ func main() {
logger.Info("Closing GRPC server ...")
grpcServer.Stop()
logger.Info("Closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("Closing Redis connection...")
if err := watcher.Close(rootCtx); err != nil {
logger.Error("Error closing watcher", zap.Error(err))
}
if err := client.Close(); err != nil {
logger.Error("Error closing redis client", zap.Error(err))
}
logger.Info("Closing Http server ...")
server.Stop()

View File

@ -9,13 +9,14 @@ import (
// Configuration represents the application configuration with the default values.
type Configuration struct {
Env string `env:"ENV,default=development"`
LogLevel string `env:"LOG_LEVEL,default=INFO"`
Port string `env:"PORT,default=8000"`
GrpcAddress string `env:"GRPC_ADDRESS,default=0.0.0.0:6789"`
MongoURI string `env:"MONGODB_URI,required"`
MongoDatabase string `env:"MONGODB_DATABASE,required"`
PprofEnabled bool `env:"PPROF_ENABLED,default=false"`
Env string `env:"ENV,default=development"`
LogLevel string `env:"LOG_LEVEL,default=INFO"`
Port string `env:"PORT,default=8000"`
GrpcAddress string `env:"GRPC_ADDRESS,default=0.0.0.0:6789"`
RedisURI string `env:"REDIS_URI,required"`
RedisPrefix string `env:"REDIS_PREFIX,required"`
RedisChannel string `env:"REDIS_VAA_CHANNEL,required"`
PprofEnabled bool `env:"PPROF_ENABLED,default=false"`
}
// New creates a configuration with the values from .env file and environment variables.

View File

@ -7,7 +7,7 @@ require (
github.com/gofiber/fiber/v2 v2.47.0
github.com/google/uuid v1.3.0
github.com/joho/godotenv v1.4.0 // Configuration environment
github.com/pkg/errors v0.9.1
github.com/pkg/errors v0.9.1 // indirect
github.com/sethvargo/go-envconfig v0.6.0 // Configuration environment
github.com/stretchr/testify v1.8.1 // Testing
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8
@ -16,17 +16,31 @@ require (
google.golang.org/grpc v1.50.1
)
require github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-00010101000000-000000000000
require (
github.com/go-redis/redis/v8 v8.11.5
github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-00010101000000-000000000000
)
require (
github.com/algorand/go-algorand-sdk v1.23.0 // indirect
github.com/algorand/go-codec/codec v1.1.8 // indirect
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/aws/aws-sdk-go-v2 v1.17.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.28 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22 // indirect
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.2 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cosmos/btcutil v1.0.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/ethereum/go-ethereum v1.10.21 // indirect
github.com/gogo/protobuf v1.3.3 // indirect
github.com/golang/protobuf v1.5.2 // indirect
@ -35,6 +49,8 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.5.0 // indirect
github.com/holiman/uint256 v1.2.1 // indirect
github.com/influxdata/influxdb-client-go/v2 v2.12.2 // indirect
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect
github.com/ipfs/go-cid v0.2.0 // indirect
github.com/klauspost/compress v1.16.3 // indirect
github.com/klauspost/cpuid/v2 v2.1.0 // indirect

View File

@ -38,9 +38,26 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/algorand/go-algorand-sdk v1.23.0 h1:wlEV6OgDVc/sLeF2y41bwNG/Lr8EoMnN87Ur8N2Gyyo=
github.com/algorand/go-algorand-sdk v1.23.0/go.mod h1:7i2peZBcE48kfoxNZnLA+mklKh812jBKvQ+t4bn0KBQ=
github.com/algorand/go-codec v1.1.8/go.mod h1:XhzVs6VVyWMLu6cApb9/192gBjGRVGm5cX5j203Heg4=
github.com/algorand/go-codec/codec v1.1.8 h1:lsFuhcOH2LiEhpBH3BVUUkdevVmwCRyvb7FCAAPeY6U=
github.com/algorand/go-codec/codec v1.1.8/go.mod h1:tQ3zAJ6ijTps6V+wp8KsGDnPC2uhHVC7ANyrtkIY0bA=
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/aws/aws-sdk-go-v2 v1.17.4 h1:wyC6p9Yfq6V2y98wfDsj6OnNQa4w2BLGCLIxzNhwOGY=
github.com/aws/aws-sdk-go-v2 v1.17.4/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.28 h1:r+XwaCLpIvCKjBIYy/HVZujQS9tsz5ohHG3ZIe0wKoE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.28/go.mod h1:3lwChorpIM/BhImY/hy+Z6jekmN92cXGPI1QJasVPYY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22 h1:7AwGYXDdqRQYsluvKFmWoqpcOQJ4bH634SkYf3FNj/A=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22/go.mod h1:EqK7gVrIGAHyZItrD1D8B0ilgwMD1GiWAmbU4u/JHNk=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2 h1:MU/v2qtfGjKexJ09BMqE8pXo9xYMhT13FXjKgFc0cFw=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2/go.mod h1:VN2n9SOMS1lNbh5YD7o+ho0/rgfifSrK//YYNiVVF5E=
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.2 h1:CSNIo1jiw7KrkdgZjCOnotu6yuB3IybhKLuSQrTLNfo=
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.2/go.mod h1:1ttxGjUHZliCQMpPss1sU5+Ph/5NvdMFRzr96bv8gm0=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@ -65,12 +82,20 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cosmos/btcutil v1.0.5 h1:t+ZFcX77LpKtDBhjucvnOH8C2l2ioGsBNEQ3jef8xFk=
github.com/cosmos/btcutil v1.0.5/go.mod h1:IyB7iuqZMJlthe2tkIFL33xPyzbFYP0XVdS8P5lUPis=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc=
github.com/deepmap/oapi-codegen v1.8.2 h1:SegyeYGcdi0jLLrpbCMoJxnUUn8GBXHsvr4rbzjuhfU=
github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@ -78,7 +103,10 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ethereum/go-ethereum v1.10.21 h1:5lqsEx92ZaZzRyOqBEXux4/UR06m296RGzN3ol3teJY=
github.com/ethereum/go-ethereum v1.10.21/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/getkin/kin-openapi v0.61.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
@ -90,6 +118,10 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gofiber/fiber/v2 v2.47.0 h1:EN5lHVCc+Pyqh5OEsk8fzRiifgwpbrP0rulQ4iNf3fs=
github.com/gofiber/fiber/v2 v2.47.0/go.mod h1:mbFMVN1lQuzziTkkakgtKKdjfsXSw9BKR5lmcNksUoU=
@ -126,6 +158,7 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@ -139,6 +172,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@ -156,6 +190,7 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
@ -167,8 +202,14 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/holiman/uint256 v1.2.1 h1:XRtyuda/zw2l+Bq/38n5XUoEF72aSOu/77Thd9pPp2o=
github.com/holiman/uint256 v1.2.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/influxdata/influxdb-client-go/v2 v2.12.2 h1:uYABKdrEKlYm+++qfKdbgaHKBPmoWR5wpbmj6MBB/2g=
github.com/influxdata/influxdb-client-go/v2 v2.12.2/go.mod h1:YteV91FiQxRdccyJ2cHvj2f/5sq4y4Njqu1fQzsQCOU=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 h1:vilfsDSy7TDxedi9gyBkMvAirat/oRcL0lFdJBf6tdM=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/ipfs/go-cid v0.2.0 h1:01JTiihFq9en9Vz0lc0VDWvZe/uBonGpzo4THP0vcQ0=
github.com/ipfs/go-cid v0.2.0/go.mod h1:P+HXFDF4CVhaVayiEb4wkAy7zBHxBwsJyt0Y5U6MLro=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg=
github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
@ -198,14 +239,25 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
github.com/libp2p/go-libp2p v0.22.0 h1:2Tce0kHOp5zASFKJbNzRElvh0iZwdtG5uZheNW8chIw=
github.com/libp2p/go-libp2p v0.22.0/go.mod h1:UDolmweypBSjQb2f7xutPnwZ/fxioLbMBxSjRksxxU4=
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
@ -243,6 +295,9 @@ github.com/multiformats/go-varint v0.0.6 h1:gk85QWKxh3TazbLxED/NlDVv8+q+ReFJk7Y2
github.com/multiformats/go-varint v0.0.6/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.20.0 h1:8W0cWlwFkflGPLltQvLRB7ZVD5HuP6ng320w2IS245Q=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw=
@ -313,6 +368,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tinylib/msgp v1.1.6/go.mod h1:75BAfg2hauQhs3qedfdDZmWAPcFMAvJE5b9rGOMufyw=
@ -322,6 +378,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.47.0 h1:y7moDoxYzMooFpT5aHgNgVOQDrS3qlkfiP9mDtGGK9c=
github.com/valyala/fasthttp v1.47.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 h1:rrOyHd+H9a6Op1iUyZNCaI5v9D1syq8jDAYyX/2Q4L8=
@ -339,6 +397,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.11.2 h1:+1v2rDQUWNcGW7/7E0Jvdz51V38XXxJfhzbV17aNHCw=
go.mongodb.org/mongo-driver v1.11.2/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
@ -364,6 +423,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
@ -400,6 +461,7 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -431,9 +493,11 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
@ -467,6 +531,7 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -475,11 +540,14 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -494,6 +562,7 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -502,6 +571,7 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -512,6 +582,7 @@ golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
@ -529,6 +600,8 @@ golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
@ -573,6 +646,7 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -670,10 +744,12 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,7 +1,7 @@
package grpc
import (
"github.com/wormhole-foundation/wormhole-explorer/spy/storage"
"github.com/wormhole-foundation/wormhole-explorer/spy/source"
"go.uber.org/zap"
)
@ -18,7 +18,7 @@ func NewPublisher(svs *SignedVaaSubscribers, avs *AllVaaSubscribers, logger *zap
}
// Publish sends a signed VAA that was stored in the storage.
func (p *Publisher) Publish(e *storage.Event) {
func (p *Publisher) Publish(e *source.Event) {
if err := p.svs.HandleVAA(e.Vaas); err != nil {
p.logger.Error("Failed to publish signed VAA", zap.Error(err))

View File

@ -1,19 +1,23 @@
package infraestructure
import (
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"go.uber.org/zap"
)
// Controller definition.
// Controller definition.
type Controller struct {
srv *Service
checks []health.Check
logger *zap.Logger
}
// NewController creates a Controller instance.
func NewController(serv *Service, logger *zap.Logger) *Controller {
return &Controller{srv: serv, logger: logger}
func NewController(checks []health.Check, logger *zap.Logger) *Controller {
return &Controller{checks: checks, logger: logger}
}
// HealthCheck handler for the endpoint /health.
@ -23,18 +27,21 @@ func (c *Controller) HealthCheck(ctx *fiber.Ctx) error {
}{Status: "OK"})
}
// ReadyCheck handler for the endpoint /ready
// ReadyCheck handler for the endpoint /ready.
func (c *Controller) ReadyCheck(ctx *fiber.Ctx) error {
ready, err := c.srv.CheckMongoServerStatus(ctx.Context())
if ready {
return ctx.Status(fiber.StatusOK).JSON(struct {
Ready string `json:"ready"`
}{Ready: "OK"})
rctx := ctx.Context()
requestID := fmt.Sprintf("%v", rctx.Value("requestid"))
for _, check := range c.checks {
if err := check(rctx); err != nil {
c.logger.Error("Ready check failed", zap.Error(err), zap.String("requestID", requestID))
return ctx.Status(fiber.StatusInternalServerError).JSON(struct {
Ready string `json:"ready"`
Error string `json:"error"`
}{Ready: "NO", Error: err.Error()})
}
}
c.logger.Error("Ready check failed", zap.Error(err))
return ctx.Status(fiber.StatusInternalServerError).JSON(struct {
return ctx.Status(fiber.StatusOK).JSON(struct {
Ready string `json:"ready"`
Error string `json:"error"`
}{Ready: "NO", Error: err.Error()})
}{Ready: "OK"})
}

View File

@ -1,46 +0,0 @@
package infraestructure
import (
"context"
"fmt"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
// Repository definition.
type Repository struct {
db *mongo.Database
logger *zap.Logger
}
// NewRepository create a new Repository.
func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository {
return &Repository{db: db,
logger: logger.With(zap.String("module", "InfraestructureRepository")),
}
}
// GetMongoStatus get mongo server status
func (r *Repository) GetMongoStatus(ctx context.Context) (*MongoStatus, error) {
command := bson.D{{Key: "serverStatus", Value: 1}}
result := r.db.RunCommand(ctx, command)
if result.Err() != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed execute command mongo serverStatus",
zap.Error(result.Err()), zap.String("requestID", requestID))
return nil, errors.WithStack(result.Err())
}
var mongoStatus MongoStatus
err := result.Decode(&mongoStatus)
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed decoding cursor to *MongoStatus", zap.Error(err),
zap.String("requestID", requestID))
return nil, errors.WithStack(err)
}
return &mongoStatus, nil
}

View File

@ -3,7 +3,7 @@ package infraestructure
import (
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/pprof"
"go.mongodb.org/mongo-driver/mongo"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"go.uber.org/zap"
)
@ -13,10 +13,8 @@ type Server struct {
logger *zap.Logger
}
func NewServer(logger *zap.Logger, port string, db *mongo.Database, pprofEnabled bool) *Server {
repository := NewRepository(db, logger)
service := NewService(repository, logger)
ctrl := NewController(service, logger)
func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...health.Check) *Server {
ctrl := NewController(checks, logger)
app := fiber.New(fiber.Config{DisableStartupMessage: true})
if pprofEnabled {
app.Use(pprof.New())

View File

@ -1,38 +0,0 @@
package infraestructure
import (
"context"
"fmt"
"go.uber.org/zap"
)
type Service struct {
repo *Repository
logger *zap.Logger
}
// NewService create a new governor.Service.
func NewService(dao *Repository, logger *zap.Logger) *Service {
return &Service{repo: dao, logger: logger.With(zap.String("module", "Infraestructureervice"))}
}
// CheckMongoServerStatus
func (s *Service) CheckMongoServerStatus(ctx context.Context) (bool, error) {
mongoStatus, err := s.repo.GetMongoStatus(ctx)
if err != nil {
return false, err
}
// check mongo server status
mongoStatusCheck := (mongoStatus.Ok == 1 && mongoStatus.Pid > 0 && mongoStatus.Uptime > 0)
if !mongoStatusCheck {
return false, fmt.Errorf("mongo server not ready (Ok = %v, Pid = %v, Uptime = %v)", mongoStatus.Ok, mongoStatus.Pid, mongoStatus.Uptime)
}
// check mongo connections
if mongoStatus.Connections.Available <= 0 {
return false, fmt.Errorf("mongo server without available connections (availableConection = %v)", mongoStatus.Connections.Available)
}
return true, nil
}

View File

@ -0,0 +1,86 @@
package source
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
// Watcher represents a listener of database changes.
type MongoWatcher struct {
db *mongo.Database
dbName string
handler WatcherFunc
logger *zap.Logger
}
// WatcherFunc is a function to send database changes.
type WatcherFunc func(*Event)
type watchEvent struct {
DocumentKey documentKey `bson:"documentKey"`
OperationType string `bson:"operationType"`
DbFullDocument Event `bson:"fullDocument"`
}
type documentKey struct {
ID string `bson:"_id"`
}
// Event represents a database change.
type Event struct {
ID string `bson:"_id"`
Vaas []byte
}
const queryTemplate = `
[
{
"$match" : {
"operationType" : "insert",
"ns": { "$in": [{"db": "%s", "coll": "vaasPythnet"}, {"db": "%s", "coll": "vaas"}] }
}
}
]
`
// NewWatcher creates a new database event watcher.
func NewMongoWatcher(db *mongo.Database, dbName string, handler WatcherFunc, logger *zap.Logger) *MongoWatcher {
return &MongoWatcher{
db: db,
dbName: dbName,
handler: handler,
logger: logger,
}
}
// Start executes database event consumption.
func (w *MongoWatcher) Start(ctx context.Context) error {
query := fmt.Sprintf(queryTemplate, w.dbName, w.dbName)
var steps []bson.D
err := bson.UnmarshalExtJSON([]byte(query), true, &steps)
if err != nil {
return err
}
stream, err := w.db.Watch(ctx, steps)
if err != nil {
return err
}
go func() {
for stream.Next(ctx) {
var e watchEvent
if err := stream.Decode(&e); err != nil {
w.logger.Error("Error unmarshalling event", zap.Error(err))
continue
}
w.handler(&Event{
ID: e.DbFullDocument.ID,
Vaas: e.DbFullDocument.Vaas,
})
}
}()
return nil
}

View File

@ -0,0 +1,75 @@
package source
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"go.uber.org/zap"
)
// RedisSubscriber is a redis subscriber.
type RedisSubscriber struct {
client *redis.Client
pubSub *redis.PubSub
logger *zap.Logger
handler WatcherFunc
}
// NewRedisSubscriber create a new redis subscriber.
func NewRedisSubscriber(ctx context.Context, redisClient *redis.Client, prefix, channel string, handler WatcherFunc, log *zap.Logger) (*RedisSubscriber, error) {
if redisClient == nil {
return nil, errors.New("redis client is nil")
}
channel = fmt.Sprintf("%s:%s", prefix, channel)
pubsub := redisClient.Subscribe(ctx, channel)
return &RedisSubscriber{
client: redisClient,
pubSub: pubsub,
handler: handler,
logger: log}, nil
}
// Start executes database event consumption.
func (w *RedisSubscriber) Start(ctx context.Context) error {
w.subscribe(ctx)
return nil
}
// Close closes the redis event consumption.
func (w *RedisSubscriber) Close(ctx context.Context) error {
return w.pubSub.Close()
}
func (r *RedisSubscriber) subscribe(ctx context.Context) {
ch := r.pubSub.Channel()
go func() {
for msg := range ch {
var notification domain.NotificationEvent
err := json.Unmarshal([]byte(msg.Payload), &notification)
if err != nil {
r.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err))
continue
}
switch notification.Type {
case domain.SignedVaaType:
signedVaa, err := domain.GetEventPayload[domain.SignedVaa](&notification)
if err != nil {
r.logger.Error("Error decoding signedVAA from notification event", zap.String("trackId", notification.TrackID), zap.Error(err))
continue
}
r.handler(&Event{
ID: signedVaa.ID,
Vaas: signedVaa.Vaa,
})
default:
continue
}
}
}()
}