Feature/refactor parser to use sns sqs (#148)
* Add README to configurate SNS and subscribe with parser FIFO SQS * Remove producer and watcher, handler new queue vaa event message
This commit is contained in:
parent
e40154bf01
commit
cbb7956f90
|
@ -1,11 +1,19 @@
|
|||
# Parser
|
||||
|
||||
## Config sns topic
|
||||
|
||||
aws --profile localstack --endpoint-url=http://localhost:4566 sns create-topic --name vaas-pipeline.fifo --attributes FifoTopic=true,ContentBasedDeduplication=false
|
||||
|
||||
## Config SQS FIFO with dead letter queue localstack
|
||||
|
||||
aws --profile localstack --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=wormhole-vaa-parser-dlq-queue.fifo --attributes "FifoQueue=true"
|
||||
|
||||
aws --profile localstack --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=wormhole-vaa-parser-queue.fifo --attributes FifoQueue=true,MessageRetentionPeriod=3600,ReceiveMessageWaitTimeSeconds=5,VisibilityTimeout=20,RedrivePolicy="\"{\\\"deadLetterTargetArn\\\":\\\"arn:aws:sqs:us-east-1:000000000000:wormhole-vaa-parser-dlq-queue.fifo\\\",\\\"maxReceiveCount\\\":\\\"2\\\"}\""
|
||||
|
||||
## Subscribe SQS FIFO to vaas-pipeline.fifo topic
|
||||
|
||||
aws --profile localstack --endpoint-url=http://localhost:4566 sns subscribe --topic-arn arn:aws:sns:us-east-1:000000000000:vaas-pipeline.fifo --protocol sqs --notification-endpoint http://localhost:4566/000000000000/wormhole-vaa-parser-queue.fifo
|
||||
|
||||
## Check message in the dead letter queue localstack
|
||||
|
||||
aws --profile localstack --endpoint-url=http://localhost:4566 sqs receive-message --queue-url=http://localhost:4566/000000000000/wormhole-vaa-parser-dlq-queue.fifo
|
|
@ -12,13 +12,12 @@ import (
|
|||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
ipfslog "github.com/ipfs/go-log/v2"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/config"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/consumer"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/http/infrastructure"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/db"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/pipeline"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/watcher"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -65,20 +64,12 @@ func main() {
|
|||
logger.Fatal("failed to create parse vaa api client")
|
||||
}
|
||||
|
||||
// get publish function.
|
||||
sqsConsumer, vaaPushFunc, vaaConsumeFunc := newVAAPublishAndConsume(rootCtx, config, logger)
|
||||
// get consumer function.
|
||||
sqsConsumer, vaaConsumeFunc := newVAAConsume(rootCtx, config, logger)
|
||||
repository := parser.NewRepository(db.Database, logger)
|
||||
|
||||
// // create a new publisher.
|
||||
publisher := pipeline.NewPublisher(logger, repository, vaaPushFunc)
|
||||
watcher := watcher.NewWatcher(rootCtx, db.Database, config.MongoDatabase, publisher.Publish, logger)
|
||||
err = watcher.Start(rootCtx)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to watch MongoDB", zap.Error(err))
|
||||
}
|
||||
|
||||
// create a consumer
|
||||
consumer := pipeline.NewConsumer(vaaConsumeFunc, repository, parserVAAAPIClient, logger)
|
||||
// create and start a consumer
|
||||
consumer := consumer.New(vaaConsumeFunc, repository, parserVAAAPIClient, logger)
|
||||
consumer.Start(rootCtx)
|
||||
|
||||
server := infrastructure.NewServer(logger, config.Port, config.PprofEnabled, config.IsQueueConsumer(), sqsConsumer, db.Database)
|
||||
|
@ -129,35 +120,16 @@ func newAwsConfig(appCtx context.Context, cfg *config.Configuration) (aws.Config
|
|||
return awsCfg, err
|
||||
}
|
||||
|
||||
// Creates two callbacks depending on whether the execution is local (memory queue) or not (SQS queue)
|
||||
func newVAAPublishAndConsume(appCtx context.Context, config *config.Configuration, logger *zap.Logger) (*sqs.Consumer, queue.VAAPushFunc, queue.VAAConsumeFunc) {
|
||||
// check is consumer type.
|
||||
if !config.IsQueueConsumer() {
|
||||
vaaQueue := queue.NewVAAInMemory()
|
||||
return nil, vaaQueue.Publish, vaaQueue.Consume
|
||||
}
|
||||
|
||||
// Creates a callbacks depending on whether the execution is local (memory queue) or not (SQS queue)
|
||||
func newVAAConsume(appCtx context.Context, config *config.Configuration, logger *zap.Logger) (*sqs.Consumer, queue.VAAConsumeFunc) {
|
||||
sqsConsumer, err := newSQSConsumer(appCtx, config)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create sqs consumer", zap.Error(err))
|
||||
}
|
||||
|
||||
sqsProducer, err := newSQSProducer(appCtx, config)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create sqs producer", zap.Error(err))
|
||||
}
|
||||
|
||||
vaaQueue := queue.NewVAASQS(sqsProducer, sqsConsumer, logger)
|
||||
return sqsConsumer, vaaQueue.Publish, vaaQueue.Consume
|
||||
}
|
||||
|
||||
func newSQSProducer(appCtx context.Context, config *config.Configuration) (*sqs.Producer, error) {
|
||||
awsConfig, err := newAwsConfig(appCtx, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sqs.NewProducer(awsConfig, config.SQSUrl)
|
||||
filterConsumeFunc := newFilterFunc(config)
|
||||
vaaQueue := queue.NewVAASQS(sqsConsumer, filterConsumeFunc, logger)
|
||||
return sqsConsumer, vaaQueue.Consume
|
||||
}
|
||||
|
||||
func newSQSConsumer(appCtx context.Context, config *config.Configuration) (*sqs.Consumer, error) {
|
||||
|
@ -170,3 +142,10 @@ func newSQSConsumer(appCtx context.Context, config *config.Configuration) (*sqs.
|
|||
sqs.WithMaxMessages(10),
|
||||
sqs.WithVisibilityTimeout(120))
|
||||
}
|
||||
|
||||
func newFilterFunc(cfg *config.Configuration) queue.FilterConsumeFunc {
|
||||
if cfg.P2pNetwork == config.P2pMainNet {
|
||||
return queue.PythFilter
|
||||
}
|
||||
return queue.NonFilter
|
||||
}
|
||||
|
|
|
@ -7,6 +7,13 @@ import (
|
|||
"github.com/sethvargo/go-envconfig"
|
||||
)
|
||||
|
||||
// p2p network constants.
|
||||
const (
|
||||
P2pMainNet = "mainnet"
|
||||
P2pTestNet = "testnet"
|
||||
P2pDevNet = "devnet"
|
||||
)
|
||||
|
||||
// Configuration represents the application configuration with the default values.
|
||||
type Configuration struct {
|
||||
Env string `env:"ENV,default=development"`
|
||||
|
@ -23,6 +30,7 @@ type Configuration struct {
|
|||
VaaPayloadParserURL string `env:"VAA_PAYLOAD_PARSER_URL, required"`
|
||||
VaaPayloadParserTimeout int64 `env:"VAA_PAYLOAD_PARSER_TIMEOUT, required"`
|
||||
PprofEnabled bool `env:"PPROF_ENABLED,default=false"`
|
||||
P2pNetwork string `env:"P2P_NETWORK,required"`
|
||||
}
|
||||
|
||||
// New creates a configuration with the values from .env file and environment variables.
|
||||
|
|
|
@ -1,16 +1,17 @@
|
|||
package pipeline
|
||||
package consumer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
|
||||
"github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Consumer consumer struct definition.
|
||||
type Consumer struct {
|
||||
consume queue.VAAConsumeFunc
|
||||
repository *parser.Repository
|
||||
|
@ -18,8 +19,8 @@ type Consumer struct {
|
|||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// NewConsumer creates a new vaa consumer.
|
||||
func NewConsumer(consume queue.VAAConsumeFunc, repository *parser.Repository, parser parser.ParserVAAAPIClient, logger *zap.Logger) *Consumer {
|
||||
// New creates a new vaa consumer.
|
||||
func New(consume queue.VAAConsumeFunc, repository *parser.Repository, parser parser.ParserVAAAPIClient, logger *zap.Logger) *Consumer {
|
||||
return &Consumer{consume: consume, repository: repository, parser: parser, logger: logger}
|
||||
}
|
||||
|
||||
|
@ -31,24 +32,31 @@ func (c *Consumer) Start(ctx context.Context) {
|
|||
|
||||
// check id message is expired.
|
||||
if msg.IsExpired() {
|
||||
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID()))
|
||||
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID))
|
||||
msg.Failed()
|
||||
continue
|
||||
}
|
||||
|
||||
// unmarshal vaa.
|
||||
vaa, err := vaa.Unmarshal(event.Vaa)
|
||||
if err != nil {
|
||||
c.logger.Error("Invalid vaa", zap.String("id", event.ID), zap.Error(err))
|
||||
msg.Failed()
|
||||
continue
|
||||
}
|
||||
|
||||
// call vaa-payload-parser api to parse a VAA.
|
||||
sequence := strconv.FormatUint(event.Sequence, 10)
|
||||
vaaParseResponse, err := c.parser.Parse(event.ChainID, event.EmitterAddress, sequence, event.Vaa)
|
||||
vaaParseResponse, err := c.parser.Parse(event.ChainID, event.EmitterAddress, event.Sequence, vaa.Payload)
|
||||
if err != nil {
|
||||
if errors.Is(err, parser.ErrInternalError) {
|
||||
c.logger.Info("error parsing VAA, will retry later", zap.Uint16("chainID", event.ChainID),
|
||||
zap.String("address", event.EmitterAddress), zap.Uint64("sequence", event.Sequence), zap.Error(err))
|
||||
zap.String("address", event.EmitterAddress), zap.String("sequence", event.Sequence), zap.Error(err))
|
||||
msg.Failed()
|
||||
continue
|
||||
}
|
||||
|
||||
c.logger.Info("VAA cannot be parsed", zap.Uint16("chainID", event.ChainID),
|
||||
zap.String("address", event.EmitterAddress), zap.Uint64("sequence", event.Sequence), zap.Error(err))
|
||||
zap.String("address", event.EmitterAddress), zap.String("sequence", event.Sequence), zap.Error(err))
|
||||
msg.Done()
|
||||
continue
|
||||
}
|
||||
|
@ -56,10 +64,10 @@ func (c *Consumer) Start(ctx context.Context) {
|
|||
// create ParsedVaaUpdate to upsert.
|
||||
now := time.Now()
|
||||
vaaParsed := parser.ParsedVaaUpdate{
|
||||
ID: event.ID(),
|
||||
ID: event.ID,
|
||||
EmitterChain: event.ChainID,
|
||||
EmitterAddr: event.EmitterAddress,
|
||||
Sequence: strconv.FormatUint(event.Sequence, 10),
|
||||
Sequence: event.Sequence,
|
||||
AppID: vaaParseResponse.AppID,
|
||||
Result: vaaParseResponse.Result,
|
||||
UpdatedAt: &now,
|
||||
|
@ -68,13 +76,13 @@ func (c *Consumer) Start(ctx context.Context) {
|
|||
err = c.repository.UpsertParsedVaa(ctx, vaaParsed)
|
||||
if err != nil {
|
||||
c.logger.Error("Error inserting vaa in repository",
|
||||
zap.String("id", event.ID()),
|
||||
zap.String("id", event.ID),
|
||||
zap.Error(err))
|
||||
msg.Failed()
|
||||
continue
|
||||
}
|
||||
msg.Done()
|
||||
c.logger.Info("Vaa save in repository", zap.String("id", event.ID()))
|
||||
c.logger.Info("Vaa save in repository", zap.String("id", event.ID))
|
||||
}
|
||||
}()
|
||||
}
|
|
@ -1,33 +0,0 @@
|
|||
package sqs
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
aws_sqs "github.com/aws/aws-sdk-go-v2/service/sqs"
|
||||
)
|
||||
|
||||
// Producer represents SQS producer.
|
||||
type Producer struct {
|
||||
api *aws_sqs.Client
|
||||
url string
|
||||
}
|
||||
|
||||
func NewProducer(awsConfig aws.Config, url string) (*Producer, error) {
|
||||
return &Producer{
|
||||
api: aws_sqs.NewFromConfig(awsConfig),
|
||||
url: url,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SendMessage sends messages to SQS.
|
||||
func (p *Producer) SendMessage(ctx context.Context, groupID, deduplicationID, body string) error {
|
||||
_, err := p.api.SendMessage(ctx,
|
||||
&aws_sqs.SendMessageInput{
|
||||
MessageGroupId: aws.String(groupID),
|
||||
MessageDeduplicationId: aws.String(deduplicationID),
|
||||
MessageBody: aws.String(body),
|
||||
QueueUrl: aws.String(p.url),
|
||||
})
|
||||
return err
|
||||
}
|
|
@ -1,49 +0,0 @@
|
|||
package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/watcher"
|
||||
"github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Publisher definition.
|
||||
type Publisher struct {
|
||||
logger *zap.Logger
|
||||
repository *parser.Repository
|
||||
pushFunc queue.VAAPushFunc
|
||||
}
|
||||
|
||||
// NewPublisher creates a new publisher for vaa with parse configuration.
|
||||
func NewPublisher(logger *zap.Logger, repository *parser.Repository, pushFunc queue.VAAPushFunc) *Publisher {
|
||||
return &Publisher{logger: logger, repository: repository, pushFunc: pushFunc}
|
||||
}
|
||||
|
||||
// Publish sends a VaaEvent for the vaa that has parse configuration defined.
|
||||
func (p *Publisher) Publish(ctx context.Context, e *watcher.Event) {
|
||||
// deserializes the binary representation of a VAA
|
||||
vaa, err := vaa.Unmarshal(e.Vaas)
|
||||
if err != nil {
|
||||
p.logger.Error("error Unmarshal vaa", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
// V2 Get chainID/address that have parser function defined and add to sqs only that vaa.
|
||||
|
||||
// create a VaaEvent.
|
||||
event := queue.VaaEvent{
|
||||
ChainID: uint16(vaa.EmitterChain),
|
||||
EmitterAddress: vaa.EmitterAddress.String(),
|
||||
Sequence: vaa.Sequence,
|
||||
Vaa: vaa.Payload,
|
||||
}
|
||||
|
||||
// push messages to queue.
|
||||
err = p.pushFunc(ctx, &event)
|
||||
if err != nil {
|
||||
p.logger.Error("can not push event to queue", zap.Error(err), zap.String("event", event.ID()))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package queue
|
||||
|
||||
import "github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||
|
||||
// PythFilter filter vaa event from pyth chain.
|
||||
func PythFilter(vaaEvent *VaaEvent) bool {
|
||||
return vaaEvent.ChainID == uint16(vaa.ChainIDPythNet)
|
||||
}
|
||||
|
||||
// NonFilter non filter vaa evant.
|
||||
func NonFilter(vaaEvent *VaaEvent) bool {
|
||||
return false
|
||||
}
|
|
@ -2,15 +2,28 @@ package queue
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
type sqsEvent struct {
|
||||
MessageID string `json:"MessageId"`
|
||||
Message string `json:"Message"`
|
||||
}
|
||||
|
||||
// VaaEvent represents a vaa data to be handle by the pipeline.
|
||||
type VaaEvent struct {
|
||||
ChainID uint16 `json:"chainId"`
|
||||
EmitterAddress string `json:"emitter"`
|
||||
Sequence uint64 `json:"sequence"`
|
||||
Vaa []byte `json:"vaa"`
|
||||
ID string `json:"id"`
|
||||
ChainID uint16 `json:"emitterChain"`
|
||||
EmitterAddress string `json:"emitterAddr"`
|
||||
Sequence string `json:"sequence"`
|
||||
GuardianSetIndex uint32 `json:"guardianSetIndex"`
|
||||
Vaa []byte `json:"vaas"`
|
||||
IndexedAt time.Time `json:"indexedAt"`
|
||||
Timestamp *time.Time `json:"timestamp"`
|
||||
UpdatedAt *time.Time `json:"updatedAt"`
|
||||
TxHash string `json:"txHash"`
|
||||
Version uint16 `json:"version"`
|
||||
Revision uint16 `json:"revision"`
|
||||
}
|
||||
|
||||
// ConsumerMessage defition.
|
||||
|
@ -21,13 +34,5 @@ type ConsumerMessage interface {
|
|||
IsExpired() bool
|
||||
}
|
||||
|
||||
// ID get vaa ID (chainID/emiiterAddress/sequence)
|
||||
func (v *VaaEvent) ID() string {
|
||||
return fmt.Sprintf("%d/%s/%d", v.ChainID, v.EmitterAddress, v.Sequence)
|
||||
}
|
||||
|
||||
// VAAPushFunc is a function to push VAAEvent.
|
||||
type VAAPushFunc func(context.Context, *VaaEvent) error
|
||||
|
||||
// VAAConsumeFunc is a function to consume VAAEvent.
|
||||
type VAAConsumeFunc func(context.Context) <-chan ConsumerMessage
|
||||
|
|
|
@ -1,60 +0,0 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// VAAInMemoryOption represents a VAA queue in memory option function.
|
||||
type VAAInMemoryOption func(*VAAInMemory)
|
||||
|
||||
// VAAInMemory represents VAA queue in memory.
|
||||
type VAAInMemory struct {
|
||||
ch chan ConsumerMessage
|
||||
size int
|
||||
}
|
||||
|
||||
// NewVAAInMemory creates a VAA queue in memory instances.
|
||||
func NewVAAInMemory(opts ...VAAInMemoryOption) *VAAInMemory {
|
||||
m := &VAAInMemory{size: 100}
|
||||
for _, opt := range opts {
|
||||
opt(m)
|
||||
}
|
||||
m.ch = make(chan ConsumerMessage, m.size)
|
||||
return m
|
||||
}
|
||||
|
||||
// WithSize allows to specify an channel size when setting a value.
|
||||
func WithSize(v int) VAAInMemoryOption {
|
||||
return func(i *VAAInMemory) {
|
||||
i.size = v
|
||||
}
|
||||
}
|
||||
|
||||
// Publish sends the message to a channel.
|
||||
func (i *VAAInMemory) Publish(_ context.Context, message *VaaEvent) error {
|
||||
i.ch <- &memoryConsumerMessage{
|
||||
data: message,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Consume returns the channel with the received messages.
|
||||
func (i *VAAInMemory) Consume(_ context.Context) <-chan ConsumerMessage {
|
||||
return i.ch
|
||||
}
|
||||
|
||||
type memoryConsumerMessage struct {
|
||||
data *VaaEvent
|
||||
}
|
||||
|
||||
func (m *memoryConsumerMessage) Data() *VaaEvent {
|
||||
return m.data
|
||||
}
|
||||
|
||||
func (m *memoryConsumerMessage) Done() {}
|
||||
|
||||
func (m *memoryConsumerMessage) Failed() {}
|
||||
|
||||
func (m *memoryConsumerMessage) IsExpired() bool {
|
||||
return false
|
||||
}
|
|
@ -3,7 +3,6 @@ package queue
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -16,21 +15,24 @@ type SQSOption func(*SQS)
|
|||
|
||||
// SQS represents a VAA queue in SQS.
|
||||
type SQS struct {
|
||||
producer *sqs.Producer
|
||||
consumer *sqs.Consumer
|
||||
ch chan ConsumerMessage
|
||||
chSize int
|
||||
wg sync.WaitGroup
|
||||
logger *zap.Logger
|
||||
consumer *sqs.Consumer
|
||||
ch chan ConsumerMessage
|
||||
chSize int
|
||||
wg sync.WaitGroup
|
||||
filterConsume FilterConsumeFunc
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// FilterConsumeFunc filter vaaa func definition.
|
||||
type FilterConsumeFunc func(vaaEvent *VaaEvent) bool
|
||||
|
||||
// NewVAASQS creates a VAA queue in SQS instances.
|
||||
func NewVAASQS(producer *sqs.Producer, consumer *sqs.Consumer, logger *zap.Logger, opts ...SQSOption) *SQS {
|
||||
func NewVAASQS(consumer *sqs.Consumer, filterConsume FilterConsumeFunc, logger *zap.Logger, opts ...SQSOption) *SQS {
|
||||
s := &SQS{
|
||||
producer: producer,
|
||||
consumer: consumer,
|
||||
chSize: 10,
|
||||
logger: logger}
|
||||
consumer: consumer,
|
||||
chSize: 10,
|
||||
filterConsume: filterConsume,
|
||||
logger: logger}
|
||||
for _, opt := range opts {
|
||||
opt(s)
|
||||
}
|
||||
|
@ -45,17 +47,6 @@ func WithChannelSize(size int) SQSOption {
|
|||
}
|
||||
}
|
||||
|
||||
// Publish sends the message to a SQS queue.
|
||||
func (q *SQS) Publish(ctx context.Context, message *VaaEvent) error {
|
||||
body, err := json.Marshal(message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
groupID := fmt.Sprintf("%d/%s", message.ChainID, message.EmitterAddress)
|
||||
deduplicationID := fmt.Sprintf("%d/%s/%d", message.ChainID, message.EmitterAddress, message.Sequence)
|
||||
return q.producer.SendMessage(ctx, groupID, deduplicationID, string(body))
|
||||
}
|
||||
|
||||
// Consume returns the channel with the received messages from SQS queue.
|
||||
func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
|
||||
go func() {
|
||||
|
@ -67,16 +58,34 @@ func (q *SQS) Consume(ctx context.Context) <-chan ConsumerMessage {
|
|||
}
|
||||
expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
|
||||
for _, msg := range messages {
|
||||
var body VaaEvent
|
||||
err := json.Unmarshal([]byte(*msg.Body), &body)
|
||||
// unmarshal body to sqsEvent
|
||||
var sqsEvent sqsEvent
|
||||
err := json.Unmarshal([]byte(*msg.Body), &sqsEvent)
|
||||
if err != nil {
|
||||
q.logger.Error("Error decoding message from SQS", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
// unmarshal message to vaaEvent
|
||||
var vaaEvent VaaEvent
|
||||
err = json.Unmarshal([]byte(sqsEvent.Message), &vaaEvent)
|
||||
if err != nil {
|
||||
q.logger.Error("Error decoding vaaEvent message from SQSEvent", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
// filter vaaEvent by p2p net.
|
||||
if q.filterConsume(&vaaEvent) {
|
||||
if err := q.consumer.DeleteMessage(ctx, msg.ReceiptHandle); err != nil {
|
||||
q.logger.Error("Error deleting message from SQS", zap.Error(err))
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
q.wg.Add(1)
|
||||
q.ch <- &sqsConsumerMessage{
|
||||
id: msg.ReceiptHandle,
|
||||
data: &body,
|
||||
data: &vaaEvent,
|
||||
wg: &q.wg,
|
||||
logger: q.logger,
|
||||
consumer: q.consumer,
|
||||
|
|
|
@ -1,86 +0,0 @@
|
|||
package watcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Watcher represents a listener of database changes.
|
||||
type Watcher struct {
|
||||
db *mongo.Database
|
||||
dbName string
|
||||
handler WatcherFunc
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// WatcherFunc is a function to send database changes.
|
||||
type WatcherFunc func(context.Context, *Event)
|
||||
|
||||
type watchEvent struct {
|
||||
DocumentKey documentKey `bson:"documentKey"`
|
||||
OperationType string `bson:"operationType"`
|
||||
DbFullDocument Event `bson:"fullDocument"`
|
||||
}
|
||||
type documentKey struct {
|
||||
ID string `bson:"_id"`
|
||||
}
|
||||
|
||||
// Event represents a database change.
|
||||
type Event struct {
|
||||
ID string `bson:"_id"`
|
||||
Vaas []byte
|
||||
}
|
||||
|
||||
const queryTemplate = `
|
||||
[
|
||||
{
|
||||
"$match" : {
|
||||
"operationType" : "insert",
|
||||
"ns": { "$in": [{"db": "%s", "coll": "vaas"}] }
|
||||
}
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
// NewWatcher creates a new database event watcher.
|
||||
func NewWatcher(ctx context.Context, db *mongo.Database, dbName string, handler WatcherFunc, logger *zap.Logger) *Watcher {
|
||||
return &Watcher{
|
||||
db: db,
|
||||
dbName: dbName,
|
||||
handler: handler,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Start executes database event consumption.
|
||||
func (w *Watcher) Start(ctx context.Context) error {
|
||||
query := fmt.Sprintf(queryTemplate, w.dbName)
|
||||
var steps []bson.D
|
||||
err := bson.UnmarshalExtJSON([]byte(query), true, &steps)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stream, err := w.db.Watch(ctx, steps)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
for stream.Next(ctx) {
|
||||
var e watchEvent
|
||||
if err := stream.Decode(&e); err != nil {
|
||||
w.logger.Error("Error unmarshalling event", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
w.handler(ctx, &Event{
|
||||
ID: e.DbFullDocument.ID,
|
||||
Vaas: e.DbFullDocument.Vaas,
|
||||
})
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue