This commit is contained in:
Fernando Torres 2024-04-24 11:00:30 -03:00
parent 793b37fc32
commit b638aaafd7
No known key found for this signature in database
GPG Key ID: 962617117C40D42A
7 changed files with 135 additions and 10 deletions

26
fly/builder/event.go Normal file
View File

@ -0,0 +1,26 @@
package builder
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
"github.com/wormhole-foundation/wormhole-explorer/fly/event"
"go.uber.org/zap"
)
func NewEventDispatcher(ctx context.Context, config *config.Configuration, logger *zap.Logger) event.EventDispatcher {
if config.IsLocal {
return event.NewNoopEventDispatcher()
}
awsConfig, err := NewAwsConfig(ctx, config)
if err != nil {
logger.Fatal("could not create aws config", zap.Error(err))
}
ed, err := event.NewSnsEventDispatcher(awsConfig, config.Aws.EventsSnsUrl)
if err != nil {
logger.Fatal("could not create sns event dispatcher", zap.Error(err))
}
return ed
}

View File

@ -77,6 +77,7 @@ type AwsConfiguration struct {
AwsEndpoint string `env:"AWS_ENDPOINT"`
SqsUrl string `env:"SQS_URL,required"`
ObservationsSqsUrl string `env:"OBSERVATIONS_SQS_URL,required"`
EventsSnsUrl string `env:"EVENTS_SNS_URL,required"`
}
type Cache struct {

13
fly/event/noop.go Normal file
View File

@ -0,0 +1,13 @@
package event
import "context"
type NoopEventDispatcher struct{}
func NewNoopEventDispatcher() *NoopEventDispatcher {
return &NoopEventDispatcher{}
}
func (n *NoopEventDispatcher) NewDuplicateVaa(context.Context, DuplicateVaa) error {
return nil
}

42
fly/event/sns.go Normal file
View File

@ -0,0 +1,42 @@
package event
import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
aws_sns "github.com/aws/aws-sdk-go-v2/service/sns"
)
type SnsEventDispatcher struct {
api *aws_sns.Client
url string
}
func NewSnsEventDispatcher(awsConfig aws.Config, url string) (*SnsEventDispatcher, error) {
return &SnsEventDispatcher{
api: aws_sns.NewFromConfig(awsConfig),
url: url,
}, nil
}
func (s *SnsEventDispatcher) NewDuplicateVaa(ctx context.Context, e DuplicateVaa) error {
body, err := json.Marshal(event{
Type: "duplicated-vaa",
Source: "fly",
Data: e,
})
if err != nil {
return err
}
groupID := fmt.Sprintf("%s-%s", e.VaaID, e.Digest)
_, err = s.api.Publish(ctx,
&aws_sns.PublishInput{
MessageGroupId: aws.String(groupID),
MessageDeduplicationId: aws.String(groupID),
Message: aws.String(string(body)),
TopicArn: aws.String(s.url),
})
return err
}

26
fly/event/types.go Normal file
View File

@ -0,0 +1,26 @@
package event
import (
"context"
"time"
)
type DuplicateVaa struct {
VaaID string `json:"vaaId"`
Version uint8 `json:"version"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Vaa []byte `json:"vaas"`
Digest string `json:"digest"`
ConsistencyLevel uint8 `json:"consistencyLevel"`
Timestamp *time.Time `json:"timestamp"`
}
type event struct {
Type string `json:"type"`
Source string `json:"source"`
Data any `json:"data"`
}
type EventDispatcher interface {
NewDuplicateVaa(ctx context.Context, e DuplicateVaa) error
}

View File

@ -99,7 +99,9 @@ func main() {
if err != nil {
logger.Fatal("could not create tx hash store", zap.Error(err))
}
repository := storage.NewRepository(alertClient, metrics, db.Database, producerFunc, txHashStore, logger)
eventDispatcher := builder.NewEventDispatcher(rootCtx, cfg, logger)
repository := storage.NewRepository(alertClient, metrics, db.Database, producerFunc, txHashStore, eventDispatcher, logger)
vaaNonPythDedup, err := builder.NewDeduplicator("vaas-dedup", cfg.VaasDedup, logger)
if err != nil {

View File

@ -15,6 +15,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/common/events"
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
"github.com/wormhole-foundation/wormhole-explorer/common/utils"
"github.com/wormhole-foundation/wormhole-explorer/fly/event"
flyAlert "github.com/wormhole-foundation/wormhole-explorer/fly/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/track"
@ -30,13 +31,14 @@ import (
// TODO separate and maybe share between fly and web
type Repository struct {
alertClient alert.AlertClient
metrics metrics.Metrics
db *mongo.Database
afterUpdate producer.PushFunc
txHashStore txhash.TxHashStore
log *zap.Logger
collections struct {
alertClient alert.AlertClient
metrics metrics.Metrics
db *mongo.Database
afterUpdate producer.PushFunc
txHashStore txhash.TxHashStore
eventDispatcher event.EventDispatcher
log *zap.Logger
collections struct {
vaas *mongo.Collection
heartbeats *mongo.Collection
observations *mongo.Collection
@ -53,8 +55,9 @@ func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics,
db *mongo.Database,
vaaTopicFunc producer.PushFunc,
txHashStore txhash.TxHashStore,
eventDispatcher event.EventDispatcher,
log *zap.Logger) *Repository {
return &Repository{alertService, metrics, db, vaaTopicFunc, txHashStore, log, struct {
return &Repository{alertService, metrics, db, vaaTopicFunc, txHashStore, eventDispatcher, log, struct {
vaas *mongo.Collection
heartbeats *mongo.Collection
observations *mongo.Collection
@ -514,7 +517,19 @@ func (s *Repository) UpsertDuplicateVaa(ctx context.Context, v *vaa.VAA, seriali
// send signedvaa event to topic.
if s.isNewRecord(result) {
return s.notifyNewVaa(ctx, v, serializedVaa, duplicateVaaDoc.TxHash)
err := s.notifyNewVaa(ctx, v, serializedVaa, duplicateVaaDoc.TxHash)
if err != nil {
return err
}
return s.eventDispatcher.NewDuplicateVaa(ctx, event.DuplicateVaa{
VaaID: v.MessageID(),
Version: v.Version,
GuardianSetIndex: v.GuardianSetIndex,
Vaa: serializedVaa,
Digest: utils.NormalizeHex(v.HexDigest()),
ConsistencyLevel: v.ConsistencyLevel,
Timestamp: &v.Timestamp,
})
}
return nil