diff --git a/parser/README.md b/parser/README.md index 16d589ca..ca2ad264 100644 --- a/parser/README.md +++ b/parser/README.md @@ -1,11 +1,19 @@ # Parser +## Config sns topic + +aws --profile localstack --endpoint-url=http://localhost:4566 sns create-topic --name vaas-pipeline.fifo --attributes FifoTopic=true,ContentBasedDeduplication=false + ## Config SQS FIFO with dead letter queue localstack aws --profile localstack --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=wormhole-vaa-parser-dlq-queue.fifo --attributes "FifoQueue=true" aws --profile localstack --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=wormhole-vaa-parser-queue.fifo --attributes FifoQueue=true,MessageRetentionPeriod=3600,ReceiveMessageWaitTimeSeconds=5,VisibilityTimeout=20,RedrivePolicy="\"{\\\"deadLetterTargetArn\\\":\\\"arn:aws:sqs:us-east-1:000000000000:wormhole-vaa-parser-dlq-queue.fifo\\\",\\\"maxReceiveCount\\\":\\\"2\\\"}\"" +## Subscribe SQS FIFO to vaas-pipeline.fifo topic + +aws --profile localstack --endpoint-url=http://localhost:4566 sns subscribe --topic-arn arn:aws:sns:us-east-1:000000000000:vaas-pipeline.fifo --protocol sqs --notification-endpoint http://localhost:4566/000000000000/wormhole-vaa-parser-queue.fifo + ## Check message in the dead letter queue localstack aws --profile localstack --endpoint-url=http://localhost:4566 sqs receive-message --queue-url=http://localhost:4566/000000000000/wormhole-vaa-parser-dlq-queue.fifo \ No newline at end of file diff --git a/parser/cmd/main.go b/parser/cmd/main.go index 29f660aa..9a7c2cc8 100644 --- a/parser/cmd/main.go +++ b/parser/cmd/main.go @@ -12,13 +12,12 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" ipfslog "github.com/ipfs/go-log/v2" "github.com/wormhole-foundation/wormhole-explorer/parser/config" + "github.com/wormhole-foundation/wormhole-explorer/parser/consumer" "github.com/wormhole-foundation/wormhole-explorer/parser/http/infrastructure" "github.com/wormhole-foundation/wormhole-explorer/parser/internal/db" "github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs" "github.com/wormhole-foundation/wormhole-explorer/parser/parser" - "github.com/wormhole-foundation/wormhole-explorer/parser/pipeline" "github.com/wormhole-foundation/wormhole-explorer/parser/queue" - "github.com/wormhole-foundation/wormhole-explorer/parser/watcher" "go.uber.org/zap" ) @@ -65,20 +64,12 @@ func main() { logger.Fatal("failed to create parse vaa api client") } - // get publish function. - sqsConsumer, vaaPushFunc, vaaConsumeFunc := newVAAPublishAndConsume(rootCtx, config, logger) + // get consumer function. + sqsConsumer, vaaConsumeFunc := newVAAConsume(rootCtx, config, logger) repository := parser.NewRepository(db.Database, logger) - // // create a new publisher. - publisher := pipeline.NewPublisher(logger, repository, vaaPushFunc) - watcher := watcher.NewWatcher(rootCtx, db.Database, config.MongoDatabase, publisher.Publish, logger) - err = watcher.Start(rootCtx) - if err != nil { - logger.Fatal("failed to watch MongoDB", zap.Error(err)) - } - - // create a consumer - consumer := pipeline.NewConsumer(vaaConsumeFunc, repository, parserVAAAPIClient, logger) + // create and start a consumer + consumer := consumer.New(vaaConsumeFunc, repository, parserVAAAPIClient, logger) consumer.Start(rootCtx) server := infrastructure.NewServer(logger, config.Port, config.PprofEnabled, config.IsQueueConsumer(), sqsConsumer, db.Database) @@ -129,35 +120,16 @@ func newAwsConfig(appCtx context.Context, cfg *config.Configuration) (aws.Config return awsCfg, err } -// Creates two callbacks depending on whether the execution is local (memory queue) or not (SQS queue) -func newVAAPublishAndConsume(appCtx context.Context, config *config.Configuration, logger *zap.Logger) (*sqs.Consumer, queue.VAAPushFunc, queue.VAAConsumeFunc) { - // check is consumer type. - if !config.IsQueueConsumer() { - vaaQueue := queue.NewVAAInMemory() - return nil, vaaQueue.Publish, vaaQueue.Consume - } - +// Creates a callbacks depending on whether the execution is local (memory queue) or not (SQS queue) +func newVAAConsume(appCtx context.Context, config *config.Configuration, logger *zap.Logger) (*sqs.Consumer, queue.VAAConsumeFunc) { sqsConsumer, err := newSQSConsumer(appCtx, config) if err != nil { logger.Fatal("failed to create sqs consumer", zap.Error(err)) } - sqsProducer, err := newSQSProducer(appCtx, config) - if err != nil { - logger.Fatal("failed to create sqs producer", zap.Error(err)) - } - - vaaQueue := queue.NewVAASQS(sqsProducer, sqsConsumer, logger) - return sqsConsumer, vaaQueue.Publish, vaaQueue.Consume -} - -func newSQSProducer(appCtx context.Context, config *config.Configuration) (*sqs.Producer, error) { - awsConfig, err := newAwsConfig(appCtx, config) - if err != nil { - return nil, err - } - - return sqs.NewProducer(awsConfig, config.SQSUrl) + filterConsumeFunc := newFilterFunc(config) + vaaQueue := queue.NewVAASQS(sqsConsumer, filterConsumeFunc, logger) + return sqsConsumer, vaaQueue.Consume } func newSQSConsumer(appCtx context.Context, config *config.Configuration) (*sqs.Consumer, error) { @@ -170,3 +142,10 @@ func newSQSConsumer(appCtx context.Context, config *config.Configuration) (*sqs. sqs.WithMaxMessages(10), sqs.WithVisibilityTimeout(120)) } + +func newFilterFunc(cfg *config.Configuration) queue.FilterConsumeFunc { + if cfg.P2pNetwork == config.P2pMainNet { + return queue.PythFilter + } + return queue.NonFilter +} diff --git a/parser/config/config.go b/parser/config/config.go index bc7e4763..c269f8bd 100644 --- a/parser/config/config.go +++ b/parser/config/config.go @@ -7,6 +7,13 @@ import ( "github.com/sethvargo/go-envconfig" ) +// p2p network constants. +const ( + P2pMainNet = "mainnet" + P2pTestNet = "testnet" + P2pDevNet = "devnet" +) + // Configuration represents the application configuration with the default values. type Configuration struct { Env string `env:"ENV,default=development"` @@ -23,6 +30,7 @@ type Configuration struct { VaaPayloadParserURL string `env:"VAA_PAYLOAD_PARSER_URL, required"` VaaPayloadParserTimeout int64 `env:"VAA_PAYLOAD_PARSER_TIMEOUT, required"` PprofEnabled bool `env:"PPROF_ENABLED,default=false"` + P2pNetwork string `env:"P2P_NETWORK,required"` } // New creates a configuration with the values from .env file and environment variables. diff --git a/parser/pipeline/consumer.go b/parser/consumer/consumer.go similarity index 71% rename from parser/pipeline/consumer.go rename to parser/consumer/consumer.go index 80d28e86..223453b9 100644 --- a/parser/pipeline/consumer.go +++ b/parser/consumer/consumer.go @@ -1,16 +1,17 @@ -package pipeline +package consumer import ( "context" "errors" - "strconv" "time" "github.com/wormhole-foundation/wormhole-explorer/parser/parser" "github.com/wormhole-foundation/wormhole-explorer/parser/queue" + "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) +// Consumer consumer struct definition. type Consumer struct { consume queue.VAAConsumeFunc repository *parser.Repository @@ -18,8 +19,8 @@ type Consumer struct { logger *zap.Logger } -// NewConsumer creates a new vaa consumer. -func NewConsumer(consume queue.VAAConsumeFunc, repository *parser.Repository, parser parser.ParserVAAAPIClient, logger *zap.Logger) *Consumer { +// New creates a new vaa consumer. +func New(consume queue.VAAConsumeFunc, repository *parser.Repository, parser parser.ParserVAAAPIClient, logger *zap.Logger) *Consumer { return &Consumer{consume: consume, repository: repository, parser: parser, logger: logger} } @@ -31,24 +32,31 @@ func (c *Consumer) Start(ctx context.Context) { // check id message is expired. if msg.IsExpired() { - c.logger.Warn("Message with vaa expired", zap.String("id", event.ID())) + c.logger.Warn("Message with vaa expired", zap.String("id", event.ID)) + msg.Failed() + continue + } + + // unmarshal vaa. + vaa, err := vaa.Unmarshal(event.Vaa) + if err != nil { + c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err)) msg.Failed() continue } // call vaa-payload-parser api to parse a VAA. - sequence := strconv.FormatUint(event.Sequence, 10) - vaaParseResponse, err := c.parser.Parse(event.ChainID, event.EmitterAddress, sequence, event.Vaa) + vaaParseResponse, err := c.parser.Parse(event.ChainID, event.EmitterAddress, event.Sequence, vaa.Payload) if err != nil { if errors.Is(err, parser.ErrInternalError) { c.logger.Info("error parsing VAA, will retry later", zap.Uint16("chainID", event.ChainID), - zap.String("address", event.EmitterAddress), zap.Uint64("sequence", event.Sequence), zap.Error(err)) + zap.String("address", event.EmitterAddress), zap.String("sequence", event.Sequence), zap.Error(err)) msg.Failed() continue } c.logger.Info("VAA cannot be parsed", zap.Uint16("chainID", event.ChainID), - zap.String("address", event.EmitterAddress), zap.Uint64("sequence", event.Sequence), zap.Error(err)) + zap.String("address", event.EmitterAddress), zap.String("sequence", event.Sequence), zap.Error(err)) msg.Done() continue } @@ -56,10 +64,10 @@ func (c *Consumer) Start(ctx context.Context) { // create ParsedVaaUpdate to upsert. now := time.Now() vaaParsed := parser.ParsedVaaUpdate{ - ID: event.ID(), + ID: event.ID, EmitterChain: event.ChainID, EmitterAddr: event.EmitterAddress, - Sequence: strconv.FormatUint(event.Sequence, 10), + Sequence: event.Sequence, AppID: vaaParseResponse.AppID, Result: vaaParseResponse.Result, UpdatedAt: &now, @@ -68,13 +76,13 @@ func (c *Consumer) Start(ctx context.Context) { err = c.repository.UpsertParsedVaa(ctx, vaaParsed) if err != nil { c.logger.Error("Error inserting vaa in repository", - zap.String("id", event.ID()), + zap.String("id", event.ID), zap.Error(err)) msg.Failed() continue } msg.Done() - c.logger.Info("Vaa save in repository", zap.String("id", event.ID())) + c.logger.Info("Vaa save in repository", zap.String("id", event.ID)) } }() } diff --git a/parser/internal/sqs/sqs_producer.go b/parser/internal/sqs/sqs_producer.go deleted file mode 100644 index d40bd699..00000000 --- a/parser/internal/sqs/sqs_producer.go +++ /dev/null @@ -1,33 +0,0 @@ -package sqs - -import ( - "context" - - "github.com/aws/aws-sdk-go-v2/aws" - aws_sqs "github.com/aws/aws-sdk-go-v2/service/sqs" -) - -// Producer represents SQS producer. -type Producer struct { - api *aws_sqs.Client - url string -} - -func NewProducer(awsConfig aws.Config, url string) (*Producer, error) { - return &Producer{ - api: aws_sqs.NewFromConfig(awsConfig), - url: url, - }, nil -} - -// SendMessage sends messages to SQS. -func (p *Producer) SendMessage(ctx context.Context, groupID, deduplicationID, body string) error { - _, err := p.api.SendMessage(ctx, - &aws_sqs.SendMessageInput{ - MessageGroupId: aws.String(groupID), - MessageDeduplicationId: aws.String(deduplicationID), - MessageBody: aws.String(body), - QueueUrl: aws.String(p.url), - }) - return err -} diff --git a/parser/pipeline/publisher.go b/parser/pipeline/publisher.go deleted file mode 100644 index 0253d770..00000000 --- a/parser/pipeline/publisher.go +++ /dev/null @@ -1,49 +0,0 @@ -package pipeline - -import ( - "context" - - "github.com/wormhole-foundation/wormhole-explorer/parser/parser" - "github.com/wormhole-foundation/wormhole-explorer/parser/queue" - "github.com/wormhole-foundation/wormhole-explorer/parser/watcher" - "github.com/wormhole-foundation/wormhole/sdk/vaa" - "go.uber.org/zap" -) - -// Publisher definition. -type Publisher struct { - logger *zap.Logger - repository *parser.Repository - pushFunc queue.VAAPushFunc -} - -// NewPublisher creates a new publisher for vaa with parse configuration. -func NewPublisher(logger *zap.Logger, repository *parser.Repository, pushFunc queue.VAAPushFunc) *Publisher { - return &Publisher{logger: logger, repository: repository, pushFunc: pushFunc} -} - -// Publish sends a VaaEvent for the vaa that has parse configuration defined. -func (p *Publisher) Publish(ctx context.Context, e *watcher.Event) { - // deserializes the binary representation of a VAA - vaa, err := vaa.Unmarshal(e.Vaas) - if err != nil { - p.logger.Error("error Unmarshal vaa", zap.Error(err)) - return - } - - // V2 Get chainID/address that have parser function defined and add to sqs only that vaa. - - // create a VaaEvent. - event := queue.VaaEvent{ - ChainID: uint16(vaa.EmitterChain), - EmitterAddress: vaa.EmitterAddress.String(), - Sequence: vaa.Sequence, - Vaa: vaa.Payload, - } - - // push messages to queue. - err = p.pushFunc(ctx, &event) - if err != nil { - p.logger.Error("can not push event to queue", zap.Error(err), zap.String("event", event.ID())) - } -} diff --git a/parser/queue/filter.go b/parser/queue/filter.go new file mode 100644 index 00000000..dc0a1f8b --- /dev/null +++ b/parser/queue/filter.go @@ -0,0 +1,13 @@ +package queue + +import "github.com/wormhole-foundation/wormhole/sdk/vaa" + +// PythFilter filter vaa event from pyth chain. +func PythFilter(vaaEvent *VaaEvent) bool { + return vaaEvent.ChainID == uint16(vaa.ChainIDPythNet) +} + +// NonFilter non filter vaa evant. +func NonFilter(vaaEvent *VaaEvent) bool { + return false +} diff --git a/parser/queue/queue.go b/parser/queue/queue.go index 4a8dda16..9e414076 100644 --- a/parser/queue/queue.go +++ b/parser/queue/queue.go @@ -2,15 +2,28 @@ package queue import ( "context" - "fmt" + "time" ) +type sqsEvent struct { + MessageID string `json:"MessageId"` + Message string `json:"Message"` +} + // VaaEvent represents a vaa data to be handle by the pipeline. type VaaEvent struct { - ChainID uint16 `json:"chainId"` - EmitterAddress string `json:"emitter"` - Sequence uint64 `json:"sequence"` - Vaa []byte `json:"vaa"` + ID string `json:"id"` + ChainID uint16 `json:"emitterChain"` + EmitterAddress string `json:"emitterAddr"` + Sequence string `json:"sequence"` + GuardianSetIndex uint32 `json:"guardianSetIndex"` + Vaa []byte `json:"vaas"` + IndexedAt time.Time `json:"indexedAt"` + Timestamp *time.Time `json:"timestamp"` + UpdatedAt *time.Time `json:"updatedAt"` + TxHash string `json:"txHash"` + Version uint16 `json:"version"` + Revision uint16 `json:"revision"` } // ConsumerMessage defition. @@ -21,13 +34,5 @@ type ConsumerMessage interface { IsExpired() bool } -// ID get vaa ID (chainID/emiiterAddress/sequence) -func (v *VaaEvent) ID() string { - return fmt.Sprintf("%d/%s/%d", v.ChainID, v.EmitterAddress, v.Sequence) -} - -// VAAPushFunc is a function to push VAAEvent. -type VAAPushFunc func(context.Context, *VaaEvent) error - // VAAConsumeFunc is a function to consume VAAEvent. type VAAConsumeFunc func(context.Context) <-chan ConsumerMessage diff --git a/parser/queue/vaa_memory.go b/parser/queue/vaa_memory.go deleted file mode 100644 index 720cec56..00000000 --- a/parser/queue/vaa_memory.go +++ /dev/null @@ -1,60 +0,0 @@ -package queue - -import ( - "context" -) - -// VAAInMemoryOption represents a VAA queue in memory option function. -type VAAInMemoryOption func(*VAAInMemory) - -// VAAInMemory represents VAA queue in memory. -type VAAInMemory struct { - ch chan ConsumerMessage - size int -} - -// NewVAAInMemory creates a VAA queue in memory instances. -func NewVAAInMemory(opts ...VAAInMemoryOption) *VAAInMemory { - m := &VAAInMemory{size: 100} - for _, opt := range opts { - opt(m) - } - m.ch = make(chan ConsumerMessage, m.size) - return m -} - -// WithSize allows to specify an channel size when setting a value. -func WithSize(v int) VAAInMemoryOption { - return func(i *VAAInMemory) { - i.size = v - } -} - -// Publish sends the message to a channel. -func (i *VAAInMemory) Publish(_ context.Context, message *VaaEvent) error { - i.ch <- &memoryConsumerMessage{ - data: message, - } - return nil -} - -// Consume returns the channel with the received messages. -func (i *VAAInMemory) Consume(_ context.Context) <-chan ConsumerMessage { - return i.ch -} - -type memoryConsumerMessage struct { - data *VaaEvent -} - -func (m *memoryConsumerMessage) Data() *VaaEvent { - return m.data -} - -func (m *memoryConsumerMessage) Done() {} - -func (m *memoryConsumerMessage) Failed() {} - -func (m *memoryConsumerMessage) IsExpired() bool { - return false -} diff --git a/parser/queue/vaa_sqs.go b/parser/queue/vaa_sqs.go index 82d0139f..515592aa 100644 --- a/parser/queue/vaa_sqs.go +++ b/parser/queue/vaa_sqs.go @@ -3,7 +3,6 @@ package queue import ( "context" "encoding/json" - "fmt" "sync" "time" @@ -16,21 +15,24 @@ type SQSOption func(*SQS) // SQS represents a VAA queue in SQS. type SQS struct { - producer *sqs.Producer - consumer *sqs.Consumer - ch chan ConsumerMessage - chSize int - wg sync.WaitGroup - logger *zap.Logger + consumer *sqs.Consumer + ch chan ConsumerMessage + chSize int + wg sync.WaitGroup + filterConsume FilterConsumeFunc + logger *zap.Logger } +// FilterConsumeFunc filter vaaa func definition. +type FilterConsumeFunc func(vaaEvent *VaaEvent) bool + // NewVAASQS creates a VAA queue in SQS instances. -func NewVAASQS(producer *sqs.Producer, consumer *sqs.Consumer, logger *zap.Logger, opts ...SQSOption) *SQS { +func NewVAASQS(consumer *sqs.Consumer, filterConsume FilterConsumeFunc, logger *zap.Logger, opts ...SQSOption) *SQS { s := &SQS{ - producer: producer, - consumer: consumer, - chSize: 10, - logger: logger} + consumer: consumer, + chSize: 10, + filterConsume: filterConsume, + logger: logger} for _, opt := range opts { opt(s) } @@ -45,17 +47,6 @@ func WithChannelSize(size int) SQSOption { } } -// Publish sends the message to a SQS queue. -func (q *SQS) Publish(ctx context.Context, message *VaaEvent) error { - body, err := json.Marshal(message) - if err != nil { - return err - } - groupID := fmt.Sprintf("%d/%s", message.ChainID, message.EmitterAddress) - deduplicationID := fmt.Sprintf("%d/%s/%d", message.ChainID, message.EmitterAddress, message.Sequence) - return q.producer.SendMessage(ctx, groupID, deduplicationID, string(body)) -} - // Consume returns the channel with the received messages from SQS queue. func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { go func() { @@ -67,16 +58,34 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage { } expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout()) for _, msg := range messages { - var body VaaEvent - err := json.Unmarshal([]byte(*msg.Body), &body) + // unmarshal body to sqsEvent + var sqsEvent sqsEvent + err := json.Unmarshal([]byte(*msg.Body), &sqsEvent) if err != nil { q.logger.Error("Error decoding message from SQS", zap.Error(err)) continue } + + // unmarshal message to vaaEvent + var vaaEvent VaaEvent + err = json.Unmarshal([]byte(sqsEvent.Message), &vaaEvent) + if err != nil { + q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err)) + continue + } + + // filter vaaEvent by p2p net. + if q.filterConsume(&vaaEvent) { + if err := q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil { + q.logger.Error("Error deleting message from SQS", zap.Error(err)) + } + continue + } + q.wg.Add(1) q.ch <- &sqsConsumerMessage{ id: msg.ReceiptHandle, - data: &body, + data: &vaaEvent, wg: &q.wg, logger: q.logger, consumer: q.consumer, diff --git a/parser/watcher/watcher.go b/parser/watcher/watcher.go deleted file mode 100644 index c9e45dba..00000000 --- a/parser/watcher/watcher.go +++ /dev/null @@ -1,86 +0,0 @@ -package watcher - -import ( - "context" - "fmt" - - "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/mongo" - "go.uber.org/zap" -) - -// Watcher represents a listener of database changes. -type Watcher struct { - db *mongo.Database - dbName string - handler WatcherFunc - logger *zap.Logger -} - -// WatcherFunc is a function to send database changes. -type WatcherFunc func(context.Context, *Event) - -type watchEvent struct { - DocumentKey documentKey `bson:"documentKey"` - OperationType string `bson:"operationType"` - DbFullDocument Event `bson:"fullDocument"` -} -type documentKey struct { - ID string `bson:"_id"` -} - -// Event represents a database change. -type Event struct { - ID string `bson:"_id"` - Vaas []byte -} - -const queryTemplate = ` - [ - { - "$match" : { - "operationType" : "insert", - "ns": { "$in": [{"db": "%s", "coll": "vaas"}] } - } - } - ] -` - -// NewWatcher creates a new database event watcher. -func NewWatcher(ctx context.Context, db *mongo.Database, dbName string, handler WatcherFunc, logger *zap.Logger) *Watcher { - return &Watcher{ - db: db, - dbName: dbName, - handler: handler, - logger: logger, - } -} - -// Start executes database event consumption. -func (w *Watcher) Start(ctx context.Context) error { - query := fmt.Sprintf(queryTemplate, w.dbName) - var steps []bson.D - err := bson.UnmarshalExtJSON([]byte(query), true, &steps) - if err != nil { - return err - } - - stream, err := w.db.Watch(ctx, steps) - if err != nil { - return err - } - go func() { - for stream.Next(ctx) { - var e watchEvent - if err := stream.Decode(&e); err != nil { - w.logger.Error("Error unmarshalling event", zap.Error(err)) - continue - } - w.handler(ctx, &Event{ - ID: e.DbFullDocument.ID, - Vaas: e.DbFullDocument.Vaas, - }) - } - }() - return nil -}