From b638aaafd768243eb14fe77738bb53c7568092c3 Mon Sep 17 00:00:00 2001 From: Fernando Torres Date: Wed, 24 Apr 2024 11:00:30 -0300 Subject: [PATCH] wip --- fly/builder/event.go | 26 ++++++++++++++++++++++++ fly/config/config.go | 1 + fly/event/noop.go | 13 ++++++++++++ fly/event/sns.go | 42 +++++++++++++++++++++++++++++++++++++++ fly/event/types.go | 26 ++++++++++++++++++++++++ fly/main.go | 4 +++- fly/storage/repository.go | 33 +++++++++++++++++++++--------- 7 files changed, 135 insertions(+), 10 deletions(-) create mode 100644 fly/builder/event.go create mode 100644 fly/event/noop.go create mode 100644 fly/event/sns.go create mode 100644 fly/event/types.go diff --git a/fly/builder/event.go b/fly/builder/event.go new file mode 100644 index 00000000..4144e47f --- /dev/null +++ b/fly/builder/event.go @@ -0,0 +1,26 @@ +package builder + +import ( + "context" + + "github.com/wormhole-foundation/wormhole-explorer/fly/config" + "github.com/wormhole-foundation/wormhole-explorer/fly/event" + "go.uber.org/zap" +) + +func NewEventDispatcher(ctx context.Context, config *config.Configuration, logger *zap.Logger) event.EventDispatcher { + if config.IsLocal { + return event.NewNoopEventDispatcher() + } + + awsConfig, err := NewAwsConfig(ctx, config) + if err != nil { + logger.Fatal("could not create aws config", zap.Error(err)) + } + + ed, err := event.NewSnsEventDispatcher(awsConfig, config.Aws.EventsSnsUrl) + if err != nil { + logger.Fatal("could not create sns event dispatcher", zap.Error(err)) + } + return ed +} diff --git a/fly/config/config.go b/fly/config/config.go index 14eca724..5b71ea2f 100644 --- a/fly/config/config.go +++ b/fly/config/config.go @@ -77,6 +77,7 @@ type AwsConfiguration struct { AwsEndpoint string `env:"AWS_ENDPOINT"` SqsUrl string `env:"SQS_URL,required"` ObservationsSqsUrl string `env:"OBSERVATIONS_SQS_URL,required"` + EventsSnsUrl string `env:"EVENTS_SNS_URL,required"` } type Cache struct { diff --git a/fly/event/noop.go b/fly/event/noop.go new file mode 100644 index 00000000..a73b644b --- /dev/null +++ b/fly/event/noop.go @@ -0,0 +1,13 @@ +package event + +import "context" + +type NoopEventDispatcher struct{} + +func NewNoopEventDispatcher() *NoopEventDispatcher { + return &NoopEventDispatcher{} +} + +func (n *NoopEventDispatcher) NewDuplicateVaa(context.Context, DuplicateVaa) error { + return nil +} diff --git a/fly/event/sns.go b/fly/event/sns.go new file mode 100644 index 00000000..934dc81d --- /dev/null +++ b/fly/event/sns.go @@ -0,0 +1,42 @@ +package event + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + aws_sns "github.com/aws/aws-sdk-go-v2/service/sns" +) + +type SnsEventDispatcher struct { + api *aws_sns.Client + url string +} + +func NewSnsEventDispatcher(awsConfig aws.Config, url string) (*SnsEventDispatcher, error) { + return &SnsEventDispatcher{ + api: aws_sns.NewFromConfig(awsConfig), + url: url, + }, nil +} + +func (s *SnsEventDispatcher) NewDuplicateVaa(ctx context.Context, e DuplicateVaa) error { + body, err := json.Marshal(event{ + Type: "duplicated-vaa", + Source: "fly", + Data: e, + }) + if err != nil { + return err + } + groupID := fmt.Sprintf("%s-%s", e.VaaID, e.Digest) + _, err = s.api.Publish(ctx, + &aws_sns.PublishInput{ + MessageGroupId: aws.String(groupID), + MessageDeduplicationId: aws.String(groupID), + Message: aws.String(string(body)), + TopicArn: aws.String(s.url), + }) + return err +} diff --git a/fly/event/types.go b/fly/event/types.go new file mode 100644 index 00000000..4e4c7260 --- /dev/null +++ b/fly/event/types.go @@ -0,0 +1,26 @@ +package event + +import ( + "context" + "time" +) + +type DuplicateVaa struct { + VaaID string `json:"vaaId"` + Version uint8 `json:"version"` + GuardianSetIndex uint32 `json:"guardianSetIndex"` + Vaa []byte `json:"vaas"` + Digest string `json:"digest"` + ConsistencyLevel uint8 `json:"consistencyLevel"` + Timestamp *time.Time `json:"timestamp"` +} + +type event struct { + Type string `json:"type"` + Source string `json:"source"` + Data any `json:"data"` +} + +type EventDispatcher interface { + NewDuplicateVaa(ctx context.Context, e DuplicateVaa) error +} diff --git a/fly/main.go b/fly/main.go index 53fbdcc7..37bcc5ee 100644 --- a/fly/main.go +++ b/fly/main.go @@ -99,7 +99,9 @@ func main() { if err != nil { logger.Fatal("could not create tx hash store", zap.Error(err)) } - repository := storage.NewRepository(alertClient, metrics, db.Database, producerFunc, txHashStore, logger) + eventDispatcher := builder.NewEventDispatcher(rootCtx, cfg, logger) + + repository := storage.NewRepository(alertClient, metrics, db.Database, producerFunc, txHashStore, eventDispatcher, logger) vaaNonPythDedup, err := builder.NewDeduplicator("vaas-dedup", cfg.VaasDedup, logger) if err != nil { diff --git a/fly/storage/repository.go b/fly/storage/repository.go index 5d39edc0..edb9438f 100644 --- a/fly/storage/repository.go +++ b/fly/storage/repository.go @@ -15,6 +15,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/common/events" "github.com/wormhole-foundation/wormhole-explorer/common/repository" "github.com/wormhole-foundation/wormhole-explorer/common/utils" + "github.com/wormhole-foundation/wormhole-explorer/fly/event" flyAlert "github.com/wormhole-foundation/wormhole-explorer/fly/internal/alert" "github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/fly/internal/track" @@ -30,13 +31,14 @@ import ( // TODO separate and maybe share between fly and web type Repository struct { - alertClient alert.AlertClient - metrics metrics.Metrics - db *mongo.Database - afterUpdate producer.PushFunc - txHashStore txhash.TxHashStore - log *zap.Logger - collections struct { + alertClient alert.AlertClient + metrics metrics.Metrics + db *mongo.Database + afterUpdate producer.PushFunc + txHashStore txhash.TxHashStore + eventDispatcher event.EventDispatcher + log *zap.Logger + collections struct { vaas *mongo.Collection heartbeats *mongo.Collection observations *mongo.Collection @@ -53,8 +55,9 @@ func NewRepository(alertService alert.AlertClient, metrics metrics.Metrics, db *mongo.Database, vaaTopicFunc producer.PushFunc, txHashStore txhash.TxHashStore, + eventDispatcher event.EventDispatcher, log *zap.Logger) *Repository { - return &Repository{alertService, metrics, db, vaaTopicFunc, txHashStore, log, struct { + return &Repository{alertService, metrics, db, vaaTopicFunc, txHashStore, eventDispatcher, log, struct { vaas *mongo.Collection heartbeats *mongo.Collection observations *mongo.Collection @@ -514,7 +517,19 @@ func (s *Repository) UpsertDuplicateVaa(ctx context.Context, v *vaa.VAA, seriali // send signedvaa event to topic. if s.isNewRecord(result) { - return s.notifyNewVaa(ctx, v, serializedVaa, duplicateVaaDoc.TxHash) + err := s.notifyNewVaa(ctx, v, serializedVaa, duplicateVaaDoc.TxHash) + if err != nil { + return err + } + return s.eventDispatcher.NewDuplicateVaa(ctx, event.DuplicateVaa{ + VaaID: v.MessageID(), + Version: v.Version, + GuardianSetIndex: v.GuardianSetIndex, + Vaa: serializedVaa, + Digest: utils.NormalizeHex(v.HexDigest()), + ConsistencyLevel: v.ConsistencyLevel, + Timestamp: &v.Timestamp, + }) } return nil