Feature/add parser pipeline (#46)

* Add pipeline to parse vaa

* Add parser repository and start building pipeline publisher

* Add queue sqs and in-memory to pipeline

* Add ses publisher and queue sqs/memory

* Fix watcher query

* Fix type to marshall/unsmarshall chainID and emitterAddress

* Fix js execution is run many times

* Add comments

* Add comments and improvements

* Change VAA Parser implementation

* Fix and change integration-test vaa-payload-parser

* fix appID mongo field name

* Fix guardianSet version 3, add xlabs to position 8

Co-authored-by: Agustin Pazos <agpazos85@gmail.com>
This commit is contained in:
ftocal 2023-01-18 12:42:14 -03:00 committed by GitHub
parent b5e5ce73c5
commit 303b731d60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1582 additions and 4 deletions

View File

@ -8,7 +8,7 @@ import (
)
var ByIndex = []common.GuardianSet{gs0, gs1, gs2, gs3}
var ExpirationTimeByIndex = []time.Time{gs0ValidUntil, gs1ValidUntil, gs2ValidUntil}
var ExpirationTimeByIndex = []time.Time{gs0ValidUntil, gs1ValidUntil, gs2ValidUntil, gs3ValidUntil}
func IsValid(gsIx uint32, t time.Time) bool {
if gsIx < 0 || int(gsIx) > len(ByIndex) {
@ -97,6 +97,7 @@ var gs3 = common.GuardianSet{
eth_common.HexToAddress("0x8C82B2fd82FaeD2711d59AF0F2499D16e726f6b2"), // Inotel
eth_common.HexToAddress("0x11b39756C042441BE6D8650b69b54EbE715E2343"), // HashQuark
eth_common.HexToAddress("0x54Ce5B4D348fb74B958e8966e2ec3dBd4958a7cd"), // ChainLayer
eth_common.HexToAddress("0x15e7cAF07C4e3DC8e7C469f92C8Cd88FB8005a20"), // xLabs
eth_common.HexToAddress("0x74a3bf913953D695260D88BC1aA25A4eeE363ef0"), // Forbole
eth_common.HexToAddress("0x000aC0076727b35FBea2dAc28fEE5cCB0fEA768e"), // Staking Fund
eth_common.HexToAddress("0xAF45Ced136b9D9e24903464AE889F5C8a723FC14"), // MoonletWallet
@ -108,6 +109,5 @@ var gs3 = common.GuardianSet{
eth_common.HexToAddress("0x178e21ad2E77AE06711549CFBB1f9c7a9d8096e8"), // syncnode
eth_common.HexToAddress("0x5E1487F35515d02A92753504a8D75471b9f49EdB"), // Triton
eth_common.HexToAddress("0x6FbEBc898F403E4773E95feB15E80C9A99c8348d"), // Staking Facilities
eth_common.HexToAddress("0x15e7cAF07C4e3DC8e7C469f92C8Cd88FB8005a20"), // xLabs
},
}

View File

@ -10,7 +10,7 @@ import (
)
var ByIndex = []common.GuardianSet{gs0, gs1, gs2, gs3}
var ExpirationTimeByIndex = []time.Time{gs0ValidUntil, gs1ValidUntil, gs2ValidUntil}
var ExpirationTimeByIndex = []time.Time{gs0ValidUntil, gs1ValidUntil, gs2ValidUntil, gs3ValidUntil}
func IsValid(gsIx uint32, t time.Time) bool {
if gsIx < 0 || int(gsIx) > len(ByIndex) {
@ -99,6 +99,7 @@ var gs3 = common.GuardianSet{
eth_common.HexToAddress("0x8C82B2fd82FaeD2711d59AF0F2499D16e726f6b2"), // Inotel
eth_common.HexToAddress("0x11b39756C042441BE6D8650b69b54EbE715E2343"), // HashQuark
eth_common.HexToAddress("0x54Ce5B4D348fb74B958e8966e2ec3dBd4958a7cd"), // ChainLayer
eth_common.HexToAddress("0x15e7cAF07C4e3DC8e7C469f92C8Cd88FB8005a20"), // xLabs
eth_common.HexToAddress("0x74a3bf913953D695260D88BC1aA25A4eeE363ef0"), // Forbole
eth_common.HexToAddress("0x000aC0076727b35FBea2dAc28fEE5cCB0fEA768e"), // Staking Fund
eth_common.HexToAddress("0xAF45Ced136b9D9e24903464AE889F5C8a723FC14"), // MoonletWallet
@ -110,6 +111,6 @@ var gs3 = common.GuardianSet{
eth_common.HexToAddress("0x178e21ad2E77AE06711549CFBB1f9c7a9d8096e8"), // syncnode
eth_common.HexToAddress("0x5E1487F35515d02A92753504a8D75471b9f49EdB"), // Triton
eth_common.HexToAddress("0x6FbEBc898F403E4773E95feB15E80C9A99c8348d"), // Staking Facilities
eth_common.HexToAddress("0x15e7cAF07C4e3DC8e7C469f92C8Cd88FB8005a20"), // xLabs
},
}

4
parser/.env.example Normal file
View File

@ -0,0 +1,4 @@
MONGODB_URI=
MONGODB_DATABASE=
VAA_PAYLOAD_PARSER_URL=
VAA_PAYLOAD_PARSER_TIMEOUT=

1
parser/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.env

20
parser/Dockerfile Normal file
View File

@ -0,0 +1,20 @@
# syntax=docker.io/docker/dockerfile:1.3@sha256:42399d4635eddd7a9b8a24be879d2f9a930d0ed040a61324cfdf59ef1357b3b2
FROM --platform=linux/amd64 docker.io/golang:1.19.2@sha256:0467d7d12d170ed8d998a2dae4a09aa13d0aa56e6d23c4ec2b1e4faacf86a813 AS build
WORKDIR /app
COPY . .
# Build the Go app
RUN CGO_ENABLED=0 GOOS=linux go build -o "./parser" cmd/main.go
############################
# STEP 2 build a small image
############################
FROM alpine
#Copy certificates
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy our static executable.
COPY --from=build "/app/parser" "/parser"
# Run the binary.
ENTRYPOINT ["/parser"]

11
parser/README.md Normal file
View File

@ -0,0 +1,11 @@
# Parser
## Config SQS FIFO with dead letter queue localstack
aws --profile localstack --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=wormhole-vaa-parser-dlq-queue.fifo --attributes "FifoQueue=true"
aws --profile localstack --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=wormhole-vaa-parser-queue.fifo --attributes FifoQueue=true,MessageRetentionPeriod=3600,ReceiveMessageWaitTimeSeconds=5,VisibilityTimeout=20,RedrivePolicy="\"{\\\"deadLetterTargetArn\\\":\\\"arn:aws:sqs:us-east-1:000000000000:wormhole-vaa-parser-dlq-queue.fifo\\\",\\\"maxReceiveCount\\\":\\\"2\\\"}\""
## Check message in the dead letter queue localstack
aws --profile localstack --endpoint-url=http://localhost:4566 sqs receive-message --queue-url=http://localhost:4566/000000000000/wormhole-vaa-parser-dlq-queue.fifo

160
parser/cmd/main.go Normal file
View File

@ -0,0 +1,160 @@
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/wormhole-foundation/wormhole-explorer/parser/config"
"github.com/wormhole-foundation/wormhole-explorer/parser/http/infraestructure"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
"github.com/wormhole-foundation/wormhole-explorer/parser/pipeline"
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
"github.com/wormhole-foundation/wormhole-explorer/parser/watcher"
"go.uber.org/zap"
)
type exitCode int
func handleExit() {
if r := recover(); r != nil {
if e, ok := r.(exitCode); ok {
os.Exit(int(e))
}
panic(r) // not an Exit, bubble up
}
}
func main() {
defer handleExit()
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
config, err := config.New(rootCtx)
if err != nil {
log.Fatal("Error creating config", err)
}
level, err := ipfslog.LevelFromString(config.LogLevel)
if err != nil {
log.Fatal("Invalid log level", err)
}
logger := ipfslog.Logger("wormhole-explorer-parser").Desugar()
ipfslog.SetAllLoggers(level)
logger.Info("Starting wormhole-explorer-parser ...")
//setup DB connection
db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
parserVAAAPIClient, err := parser.NewParserVAAAPIClient(config.VaaPayloadParserTimeout,
config.VaaPayloadParserURL, logger)
if err != nil {
logger.Fatal("failed to create parse vaa api client")
}
// get publish function.
sqsConsumer, vaaPushFunc, vaaConsumeFunc := newVAAPublishAndConsume(config, logger)
repository := parser.NewRepository(db.Database, logger)
// // create a new publisher.
publisher := pipeline.NewPublisher(logger, repository, vaaPushFunc)
watcher := watcher.NewWatcher(db.Database, config.MongoDatabase, publisher.Publish, logger)
err = watcher.Start(rootCtx)
if err != nil {
logger.Fatal("failed to watch MongoDB", zap.Error(err))
}
// create a consumer
consumer := pipeline.NewConsumer(vaaConsumeFunc, repository, parserVAAAPIClient, logger)
consumer.Start(rootCtx)
server := infraestructure.NewServer(logger, config.Port, config.IsQueueConsumer(), sqsConsumer, db.Database)
server.Start()
logger.Info("Started wormhole-explorer-parser")
// Waiting for signal
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-rootCtx.Done():
logger.Warn("Terminating with root context cancelled.")
case signal := <-sigterm:
logger.Info("Terminating with signal.", zap.String("signal", signal.String()))
}
logger.Info("root context cancelled, exiting...")
rootCtxCancel()
logger.Info("Closing database connections ...")
db.Close()
logger.Info("Closing Http server ...")
server.Stop()
logger.Info("Finished wormhole-explorer-parser")
}
func newAwsSession(cfg *config.Configuration) (*session.Session, error) {
region := cfg.AwsRegion
config := aws.NewConfig().WithRegion(region)
if cfg.AwsAccessKeyID != "" && cfg.AwsSecretAccessKey != "" {
config.WithCredentials(credentials.NewStaticCredentials(cfg.AwsAccessKeyID, cfg.AwsSecretAccessKey, ""))
}
if cfg.AwsEndpoint != "" {
config.WithEndpoint(cfg.AwsEndpoint)
}
return session.NewSession(config)
}
// Creates two callbacks depending on whether the execution is local (memory queue) or not (SQS queue)
func newVAAPublishAndConsume(config *config.Configuration, logger *zap.Logger) (*sqs.Consumer, queue.VAAPushFunc, queue.VAAConsumeFunc) {
// check is consumer type.
if !config.IsQueueConsumer() {
vaaQueue := queue.NewVAAInMemory()
return nil, vaaQueue.Publish, vaaQueue.Consume
}
sqsConsumer, err := newSQSConsumer(config)
if err != nil {
logger.Fatal("failed to create sqs consumer", zap.Error(err))
}
sqsProducer, err := newSQSProducer(config)
if err != nil {
logger.Fatal("failed to create sqs producer", zap.Error(err))
}
vaaQueue := queue.NewVAASQS(sqsProducer, sqsConsumer, logger)
return sqsConsumer, vaaQueue.Publish, vaaQueue.Consume
}
func newSQSProducer(config *config.Configuration) (*sqs.Producer, error) {
session, err := newAwsSession(config)
if err != nil {
return nil, err
}
return sqs.NewProducer(session, config.SQSUrl)
}
func newSQSConsumer(config *config.Configuration) (*sqs.Consumer, error) {
session, err := newAwsSession(config)
if err != nil {
return nil, err
}
return sqs.NewConsumer(session, config.SQSUrl,
sqs.WithMaxMessages(10),
sqs.WithVisibilityTimeout(120))
}

42
parser/config/config.go Normal file
View File

@ -0,0 +1,42 @@
package config
import (
"context"
"github.com/joho/godotenv"
"github.com/sethvargo/go-envconfig"
)
// Configuration represents the application configuration with the default values.
type Configuration struct {
Env string `env:"ENV,default=development"`
LogLevel string `env:"LOG_LEVEL,default=INFO"`
Port string `env:"PORT,default=8000"`
ConsumerMode string `env:"CONSUMER_MODE,default=QUEUE"`
MongoURI string `env:"MONGODB_URI,required"`
MongoDatabase string `env:"MONGODB_DATABASE,required"`
AwsEndpoint string `env:"AWS_ENDPOINT"`
AwsAccessKeyID string `env:"AWS_ACCESS_KEY_ID"`
AwsSecretAccessKey string `env:"AWS_SECRET_ACCESS_KEY"`
AwsRegion string `env:"AWS_REGION"`
SQSUrl string `env:"SQS_URL"`
VaaPayloadParserURL string `env:"VAA_PAYLOAD_PARSER_URL, required"`
VaaPayloadParserTimeout int64 `env:"VAA_PAYLOAD_PARSER_TIMEOUT, required"`
}
// New creates a configuration with the values from .env file and environment variables.
func New(ctx context.Context) (*Configuration, error) {
_ = godotenv.Load(".env", "../.env")
var configuration Configuration
if err := envconfig.Process(ctx, &configuration); err != nil {
return nil, err
}
return &configuration, nil
}
// IsQueueConsumer check if consumer mode is QUEUE.
func (c *Configuration) IsQueueConsumer() bool {
return c.ConsumerMode == "QUEUE"
}

53
parser/go.mod Normal file
View File

@ -0,0 +1,53 @@
module github.com/wormhole-foundation/wormhole-explorer/parser
go 1.19
require (
github.com/aws/aws-sdk-go v1.44.161
github.com/gofiber/fiber/v2 v2.40.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/joho/godotenv v1.4.0 // Configuration environment
github.com/pkg/errors v0.9.1
github.com/sethvargo/go-envconfig v0.6.0 // Configuration environment
github.com/stretchr/testify v1.8.1 // indirect; Testing
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20221118153622-cddfe74b6787
go.mongodb.org/mongo-driver v1.11.0
go.uber.org/zap v1.23.0
)
require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/ethereum/go-ethereum v1.10.21 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.41.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.1 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
)
// Needed for cosmos-sdk based chains. See
// https://github.com/cosmos/cosmos-sdk/issues/10925 for more details.
replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1

177
parser/go.sum Normal file
View File

@ -0,0 +1,177 @@
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/aws/aws-sdk-go v1.44.161 h1:uZdZJ30mlbaU2wsrd/wzibrX01cbgKE2t486TtRjeHs=
github.com/aws/aws-sdk-go v1.44.161/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc=
github.com/ethereum/go-ethereum v1.10.21 h1:5lqsEx92ZaZzRyOqBEXux4/UR06m296RGzN3ol3teJY=
github.com/ethereum/go-ethereum v1.10.21/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg=
github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pIl4=
github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg=
github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/sethvargo/go-envconfig v0.6.0 h1:GxxdoeiNpWgGiVEphNFNObgMYRN/ZvI2dN7rBwadyss=
github.com/sethvargo/go-envconfig v0.6.0/go.mod h1:00S1FAhRUuTNJazWBWcJGvEHOM+NO6DhoRMAOX7FY5o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.41.0 h1:zeR0Z1my1wDHTRiamBCXVglQdbUwgb9uWG3k1HQz6jY=
github.com/valyala/fasthttp v1.41.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20221118153622-cddfe74b6787 h1:DTlEqjjlMddN3py3sGtuqOOtxvhXiFIH9N9nhawC7t4=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20221118153622-cddfe74b6787/go.mod h1:Vg7Cbb370S+JihB+of1rWm9Aaxzf0GPPvKszPeSb7AE=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE=
go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,33 @@
package infraestructure
import "github.com/gofiber/fiber/v2"
// Controller definition.
type Controller struct {
srv *Service
}
// NewController creates a Controller instance.
func NewController(serv *Service) *Controller {
return &Controller{srv: serv}
}
// HealthCheck handler for the endpoint /health.
func (c *Controller) HealthCheck(ctx *fiber.Ctx) error {
return ctx.JSON(struct {
Status string `json:"status"`
}{Status: "OK"})
}
// ReadyCheck handler for the endpoint /ready.
func (c *Controller) ReadyCheck(ctx *fiber.Ctx) error {
ready, _ := c.srv.CheckIsReady(ctx.Context())
if ready {
return ctx.Status(fiber.StatusOK).JSON(struct {
Ready string `json:"ready"`
}{Ready: "OK"})
}
return ctx.Status(fiber.StatusInternalServerError).JSON(struct {
Ready string `json:"ready"`
}{Ready: "NO"})
}

View File

@ -0,0 +1,19 @@
package infraestructure
// MongoStatus represent a mongo server status.
type MongoStatus struct {
Ok int32 `bson:"ok"`
Host string `bson:"host"`
Version string `bson:"version"`
Process string `bson:"process"`
Pid int32 `bson:"pid"`
Uptime int32 `bson:"uptime"`
Connections *MongoConnections `bson:"connections"`
}
// MongoConnections represents a mongo server connection.
type MongoConnections struct {
Current int32 `bson:"current"`
Available int32 `bson:"available"`
TotalCreated int32 `bson:"totalCreated"`
}

View File

@ -0,0 +1,46 @@
package infraestructure
import (
"context"
"fmt"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
// Repository definition.
type Repository struct {
db *mongo.Database
logger *zap.Logger
}
// NewRepository create a new Repository instance.
func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository {
return &Repository{db: db,
logger: logger.With(zap.String("module", "InfraestructureRepository")),
}
}
// GetMongoStatus get mongo server status.
func (r *Repository) GetMongoStatus(ctx context.Context) (*MongoStatus, error) {
command := bson.D{{Key: "serverStatus", Value: 1}}
result := r.db.RunCommand(ctx, command)
if result.Err() != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed execute command mongo serverStatus",
zap.Error(result.Err()), zap.String("requestID", requestID))
return nil, errors.WithStack(result.Err())
}
var mongoStatus MongoStatus
err := result.Decode(&mongoStatus)
if err != nil {
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed decoding cursor to *MongoStatus", zap.Error(err),
zap.String("requestID", requestID))
return nil, errors.WithStack(err)
}
return &mongoStatus, nil
}

View File

@ -0,0 +1,45 @@
package infraestructure
import (
"github.com/gofiber/fiber/v2"
fiberLog "github.com/gofiber/fiber/v2/middleware/logger"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
type Server struct {
app *fiber.App
port string
logger *zap.Logger
}
func NewServer(logger *zap.Logger, port string, isQueueConsumer bool, consumer *sqs.Consumer, db *mongo.Database) *Server {
repository := NewRepository(db, logger)
service := NewService(repository, consumer, isQueueConsumer, logger)
ctrl := NewController(service)
app := fiber.New()
app.Use(fiberLog.New(fiberLog.Config{
Format: "level=info timestamp=${time} method=${method} path=${path} status${status} request_id=${locals:requestid}\n",
}))
api := app.Group("/api")
api.Get("/health", ctrl.HealthCheck)
api.Get("/ready", ctrl.ReadyCheck)
return &Server{
app: app,
port: port,
logger: logger,
}
}
// Start listen serves HTTP requests from addr.
func (s *Server) Start() {
go func() {
s.app.Listen(":" + s.port)
}()
}
// Stop gracefull server.
func (s *Server) Stop() {
_ = s.app.Shutdown()
}

View File

@ -0,0 +1,83 @@
package infraestructure
import (
"context"
"errors"
"fmt"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
"go.uber.org/zap"
)
// Service definition.
type Service struct {
repo *Repository
consumer *sqs.Consumer
isQueueConsumer bool
logger *zap.Logger
}
// NewService create a new Service instance.
func NewService(dao *Repository, consumer *sqs.Consumer, isQueueConsumer bool, logger *zap.Logger) *Service {
return &Service{repo: dao, consumer: consumer, isQueueConsumer: isQueueConsumer, logger: logger.With(zap.String("module", "Infraestructureervice"))}
}
// CheckIsReady check if the service is ready.
func (s *Service) CheckIsReady(ctx context.Context) (bool, error) {
// check if mongodb is ready
isMongoReady, err := s.CheckMongoServerStatus(ctx)
if err != nil {
return false, err
}
// check if aws sqs is ready
isAwsSQSReady, err := s.CheckAwsSQS(ctx)
if err != nil {
return false, err
}
if !(isMongoReady && isAwsSQSReady) {
return false, errors.New("error services not ready")
}
return true, nil
}
// CheckMongoServerStatus check mongodb status.
func (s *Service) CheckMongoServerStatus(ctx context.Context) (bool, error) {
mongoStatus, err := s.repo.GetMongoStatus(ctx)
if err != nil {
return false, err
}
// check mongo server status
mongoStatusCheck := (mongoStatus.Ok == 1 && mongoStatus.Pid > 0 && mongoStatus.Uptime > 0)
if !mongoStatusCheck {
return false, fmt.Errorf("mongo server not ready (Ok = %v, Pid = %v, Uptime = %v)", mongoStatus.Ok, mongoStatus.Pid, mongoStatus.Uptime)
}
// check mongo connections
if mongoStatus.Connections.Available <= 0 {
return false, fmt.Errorf("mongo server without available connections (availableConection = %v)", mongoStatus.Connections.Available)
}
return true, nil
}
// CheckAwsSQS check aws sqs status.
func (s *Service) CheckAwsSQS(ctx context.Context) (bool, error) {
// vaa queue handle in memory [local enviroment]
if !s.isQueueConsumer {
return true, nil
}
// get queue attributes
queueAttributes, err := s.consumer.GetQueueAttributes()
if err != nil || queueAttributes == nil {
return false, err
}
// check queue created
createdTimestamp := queueAttributes.Attributes["CreatedTimestamp"]
if createdTimestamp == nil {
return false, errors.New("error createdTimestamp attributes does not exist")
}
return *createdTimestamp != "", nil
}

31
parser/internal/db/db.go Normal file
View File

@ -0,0 +1,31 @@
package db
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// Database definition.
type Database struct {
Database *mongo.Database
client *mongo.Client
}
// New connects to DB and returns a client that will disconnect when the passed in context is cancelled.
func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) {
cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
return &Database{client: cli, Database: cli.Database(databaseName)}, err
}
// Close closes the database connections.
func (d *Database) Close() error {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
return d.client.Disconnect(ctx)
}

View File

@ -0,0 +1,110 @@
package sqs
import (
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
aws_sqs "github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
)
// ConsumerOption represents a consumer option function.
type ConsumerOption func(*Consumer)
// Consumer represents SQS consumer.
type Consumer struct {
api sqsiface.SQSAPI
url string
maxMessages *int64
visibilityTimeout *int64
waitTimeSeconds *int64
}
// New instances of a Consumer to consume SQS messages.
func NewConsumer(sess *session.Session, url string, opts ...ConsumerOption) (*Consumer, error) {
consumer := &Consumer{
api: aws_sqs.New(sess),
url: url,
maxMessages: aws.Int64(10),
visibilityTimeout: aws.Int64(60),
waitTimeSeconds: aws.Int64(20),
}
for _, opt := range opts {
opt(consumer)
}
return consumer, nil
}
// WithMaxMessages allows to specify an maximum number of messages to return when setting a value.
func WithMaxMessages(v int64) ConsumerOption {
return func(c *Consumer) {
c.maxMessages = aws.Int64(v)
}
}
// WithVisibilityTimeout allows to specify a visibility timeout when setting a value.
func WithVisibilityTimeout(v int64) ConsumerOption {
return func(c *Consumer) {
c.visibilityTimeout = aws.Int64(v)
}
}
// WithWaitTimeSeconds allows to specify a wait time when setting a value.
func WithWaitTimeSeconds(v int64) ConsumerOption {
return func(c *Consumer) {
c.waitTimeSeconds = aws.Int64(v)
}
}
// GetMessages retrieves messages from SQS.
func (c *Consumer) GetMessages() ([]*aws_sqs.Message, error) {
params := &aws_sqs.ReceiveMessageInput{
QueueUrl: aws.String(c.url),
MaxNumberOfMessages: c.maxMessages,
AttributeNames: []*string{
aws.String("All"),
},
MessageAttributeNames: []*string{
aws.String("All"),
},
WaitTimeSeconds: c.waitTimeSeconds,
VisibilityTimeout: c.visibilityTimeout,
}
res, err := c.api.ReceiveMessage(params)
if err != nil {
return nil, err
}
return res.Messages, nil
}
// DeleteMessage deletes messages from SQS.
func (c *Consumer) DeleteMessage(msg *aws_sqs.Message) error {
params := &aws_sqs.DeleteMessageInput{
QueueUrl: aws.String(c.url),
ReceiptHandle: msg.ReceiptHandle,
}
_, err := c.api.DeleteMessage(params)
return err
}
// GetVisibilityTimeout returns visibility timeout.
func (c *Consumer) GetVisibilityTimeout() time.Duration {
return time.Duration(*c.visibilityTimeout * int64(time.Second))
}
// GetQueueAttributes get queue attributes.
func (c *Consumer) GetQueueAttributes() (*aws_sqs.GetQueueAttributesOutput, error) {
params := &aws_sqs.GetQueueAttributesInput{
QueueUrl: aws.String(c.url),
AttributeNames: []*string{
aws.String("CreatedTimestamp"),
},
}
return c.api.GetQueueAttributes(params)
}

View File

@ -0,0 +1,34 @@
package sqs
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
aws_sqs "github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
)
// Producer represents SQS producer.
type Producer struct {
api sqsiface.SQSAPI
url string
}
// NewProducer create a new instance of Producer.
func NewProducer(sess *session.Session, url string) (*Producer, error) {
return &Producer{
api: aws_sqs.New(sess),
url: url,
}, nil
}
// SendMessage sends messages to SQS.
func (p *Producer) SendMessage(groupID, deduplicationID, body string) error {
_, err := p.api.SendMessage(
&aws_sqs.SendMessageInput{
MessageGroupId: aws.String(groupID),
MessageDeduplicationId: aws.String(deduplicationID),
MessageBody: aws.String(body),
QueueUrl: aws.String(p.url),
})
return err
}

16
parser/parser/model.go Normal file
View File

@ -0,0 +1,16 @@
package parser
import (
"time"
)
// ParsedVaaUpdate representa a parsedVaa document.
type ParsedVaaUpdate struct {
ID string `bson:"_id"`
EmitterChain uint16 `bson:"emitterChain"`
EmitterAddr string `bson:"emitterAddr"`
Sequence string `bson:"sequence"`
AppID string `bson:"appId"`
Result interface{} `bson:"result"`
UpdatedAt *time.Time `bson:"updatedAt"`
}

104
parser/parser/parser.go Normal file
View File

@ -0,0 +1,104 @@
package parser
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
"go.uber.org/zap"
)
const DefaultTimeout = 10
var (
ErrCallEndpoint = errors.New("ERROR CALL ENPOINT")
ErrNotFound = errors.New("NOT FOUND")
ErrInternalError = errors.New("INTERNAL ERROR")
ErrUnproceesableEntity = errors.New("UNPROCESSABLE")
ErrBadRequest = errors.New("BAD REQUEST")
)
// ParseVaaResponse represent a parse vaa response.
type ParseVaaResponse struct {
ChainID uint16 `json:"chainId"`
EmitterAddress string `json:"address"`
Sequence string `json:"sequence"`
AppID string `json:"appId"`
Result interface{} `json:"result"`
}
// ParserVAAAPIClient parse vaa api client.
type ParserVAAAPIClient struct {
Client http.Client
BaseURL string
Logger *zap.Logger
}
// NewParserVAAAPIClient create new instances of ParserVAAAPIClient.
func NewParserVAAAPIClient(timeout int64, baseURL string, logger *zap.Logger) (ParserVAAAPIClient, error) {
if timeout == 0 {
timeout = DefaultTimeout
}
if baseURL == "" {
return ParserVAAAPIClient{}, errors.New("baseURL can not be empty")
}
return ParserVAAAPIClient{
Client: http.Client{
Timeout: time.Duration(timeout) * time.Second,
},
BaseURL: baseURL,
Logger: logger,
}, nil
}
type ParseData struct {
PayloadID int `bson:"payloadid"`
Fields interface{}
}
// ParseVaa invoke the endpoint to parse a VAA from the VAAParserAPI.
func (c ParserVAAAPIClient) Parse(chainID uint16, address, sequence string, vaa []byte) (*ParseVaaResponse, error) {
endpointUrl := fmt.Sprintf("%s/vaa/parser/%v/%s/%v", c.BaseURL, chainID,
address, sequence)
// create request body.
payload := struct {
Payload []byte `json:"payload"`
}{
Payload: vaa,
}
body, err := json.Marshal(payload)
if err != nil {
c.Logger.Error("error marshalling payload", zap.Error(err), zap.Uint16("chainID", chainID),
zap.String("address", address), zap.String("sequence", sequence))
return nil, err
}
response, err := c.Client.Post(endpointUrl, "application/json", bytes.NewBuffer(body))
if err != nil {
c.Logger.Error("error call parse vaa endpoint", zap.Error(err), zap.Uint16("chainID", chainID),
zap.String("address", address), zap.String("sequence", sequence))
return nil, ErrCallEndpoint
}
defer response.Body.Close()
switch response.StatusCode {
case http.StatusCreated:
var parsedVAA ParseVaaResponse
json.NewDecoder(response.Body).Decode(&parsedVAA)
return &parsedVAA, nil
case http.StatusNotFound:
return nil, ErrNotFound
case http.StatusBadRequest:
return nil, ErrBadRequest
case http.StatusUnprocessableEntity:
return nil, ErrUnproceesableEntity
default:
return nil, ErrInternalError
}
}

View File

@ -0,0 +1,137 @@
package parser
import (
"errors"
"io"
"net/http"
"strings"
"testing"
"time"
"go.uber.org/zap"
)
var timeout int64 = 10
type RoundTripFunc func(req *http.Request) *http.Response
// RoundTrip interface implementation.
func (r RoundTripFunc) RoundTrip(request *http.Request) (*http.Response, error) {
return r(request), nil
}
// NewParserVAAAPITestClient create a
func NewParserVAAAPITestClient(rountTripFunc RoundTripFunc) ParserVAAAPIClient {
parserVaaClient, _ := NewParserVAAAPIClient(timeout, "", zap.NewExample())
parserVaaClient.Client = http.Client{
Timeout: time.Duration(10) * time.Second,
Transport: RoundTripFunc(rountTripFunc),
}
return parserVaaClient
}
// TestSuccessVAAParser test success vaa parser.
func TestSuccessVAAParser(t *testing.T) {
parserVaaClient := NewParserVAAAPITestClient(func(request *http.Request) *http.Response {
return &http.Response{
StatusCode: http.StatusCreated,
Body: io.NopCloser(strings.NewReader(`{"appID": "PORTAL_TOKEN_BRIDGE","chainId": 4, "address": "000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585", "sequence": "226769", "result": {"fee": 0,"type": "Transfer","payloadId": 1,"amount": 10000000,"tokenAddress": "0x000000000000000000000000dac17f958d2ee523a2206206994597c13d831ec7","tokenChain": 2,"toAddress": "0x0000000000000000000000000ff664edd699bd85610c2782d9dbbbad704b6fc5","chain": 5}}`)),
}
})
var chainID uint16 = 4
address := "000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585"
sequence := "226769"
parserVaaResponse, err := parserVaaClient.Parse(chainID, address, sequence, []byte{})
if err != nil {
t.Error("expected err zero value, got %w", err)
}
if parserVaaResponse == nil {
t.Error("expected parserVaaResponse value, got nil")
} else {
if chainID != parserVaaResponse.ChainID {
t.Errorf("expected chainID %v, got %v", chainID, parserVaaResponse.ChainID)
}
if parserVaaResponse.EmitterAddress != address {
t.Errorf("expected address %s, got %s", address, parserVaaResponse.EmitterAddress)
}
if parserVaaResponse.Sequence != sequence {
t.Errorf("expected sequence %s, got %s", sequence, parserVaaResponse.Sequence)
}
}
}
// TestNotFoundVaaParser test vaa parser not found.
func TestNotFoundVaaParser(t *testing.T) {
parserVaaClient := NewParserVAAAPITestClient(func(request *http.Request) *http.Response {
return &http.Response{
StatusCode: http.StatusNotFound,
}
})
parserVaaResponse, err := parserVaaClient.Parse(4, "000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585", "226769", []byte{})
if err == nil {
t.Error("expected error, got nil")
}
if !errors.Is(err, ErrNotFound) {
t.Errorf("expected ErrNotFound, got %s", err.Error())
}
if parserVaaResponse != nil {
t.Error("expected parserVaaResponse zero value, got %w", parserVaaResponse)
}
}
// TestBadRequestVaaParser test vaa parser bad request.
func TestBadRequestVaaParser(t *testing.T) {
parserVaaClient := NewParserVAAAPITestClient(func(request *http.Request) *http.Response {
return &http.Response{
StatusCode: http.StatusBadRequest,
}
})
parserVaaResponse, err := parserVaaClient.Parse(4, "000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585", "226769", []byte{})
if err == nil {
t.Error("expected error, got nil")
}
if !errors.Is(err, ErrBadRequest) {
t.Errorf("expected ErrBadRequest, got %s", err.Error())
}
if parserVaaResponse != nil {
t.Error("expected parserVaaResponse zero value, got %w", parserVaaResponse)
}
}
// TestUnprocessableVaaParser test vaa parser unprocessable request.
func TestUnprocessableVaaParser(t *testing.T) {
parserVaaClient := NewParserVAAAPITestClient(func(request *http.Request) *http.Response {
return &http.Response{
StatusCode: http.StatusUnprocessableEntity,
}
})
parserVaaResponse, err := parserVaaClient.Parse(4, "000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585", "26769", []byte{})
if err == nil {
t.Error("expected error, got nil")
}
if !errors.Is(err, ErrUnproceesableEntity) {
t.Errorf("expected ErrUnproceesableEntity, got %s", err.Error())
}
if parserVaaResponse != nil {
t.Error("expected parserVaaResponse zero value, got %w", parserVaaResponse)
}
}
// TestInternalErrorVaaParser test vaa parser internal error request.
func TestInternalErrorVaaParser(t *testing.T) {
parserVaaClient := NewParserVAAAPITestClient(func(request *http.Request) *http.Response {
return &http.Response{
StatusCode: http.StatusInternalServerError,
}
})
parserVaaResponse, err := parserVaaClient.Parse(4, "000000000000000000000003ee18b2214aff97000d974cf647e7c347e8fa585", "226769", []byte{})
if err == nil {
t.Error("expected error, got nil")
}
if !errors.Is(err, ErrInternalError) {
t.Errorf("expected ErrInternalError, got %s", err.Error())
}
if parserVaaResponse != nil {
t.Error("expected parserVaaResponse zero value, got %w", parserVaaResponse)
}
}

View File

@ -0,0 +1,57 @@
package parser
import (
"context"
"time"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// repository errors
var ErrDocNotFound = errors.New("NOT FOUND")
// Repository definitions.
type Repository struct {
db *mongo.Database
log *zap.Logger
collections struct {
parsedVaa *mongo.Collection
}
}
// NewRepository create a new respository instance.
func NewRepository(db *mongo.Database, log *zap.Logger) *Repository {
return &Repository{db, log, struct {
parsedVaa *mongo.Collection
}{
parsedVaa: db.Collection("parsedVaa"),
}}
}
// UpsertParsedVaa saves vaa information and parsed result.
func (s *Repository) UpsertParsedVaa(ctx context.Context, parsedVAA ParsedVaaUpdate) error {
update := bson.M{
"$set": parsedVAA,
"$setOnInsert": indexedAt(*parsedVAA.UpdatedAt),
"$inc": bson.D{{Key: "revision", Value: 1}},
}
opts := options.Update().SetUpsert(true)
var err error
_, err = s.collections.parsedVaa.UpdateByID(ctx, parsedVAA.ID, update, opts)
return err
}
func indexedAt(t time.Time) IndexingTimestamps {
return IndexingTimestamps{
IndexedAt: t,
}
}
type IndexingTimestamps struct {
IndexedAt time.Time `bson:"indexedAt"`
}

View File

@ -0,0 +1,82 @@
package pipeline
import (
"context"
"errors"
"strconv"
"time"
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
"go.uber.org/zap"
)
type Consumer struct {
consume queue.VAAConsumeFunc
repository *parser.Repository
parser parser.ParserVAAAPIClient
logger *zap.Logger
}
// NewConsumer creates a new vaa consumer.
func NewConsumer(consume queue.VAAConsumeFunc, repository *parser.Repository, parser parser.ParserVAAAPIClient, logger *zap.Logger) *Consumer {
return &Consumer{consume: consume, repository: repository, parser: parser, logger: logger}
}
// Start consumes messages from VAA queue, parse and store those messages in a repository.
func (c *Consumer) Start(ctx context.Context) {
go func() {
for {
select {
case <-ctx.Done():
return
case msg := <-c.consume(ctx):
event := msg.Data
// check id message is expired.
if msg.IsExpired() {
c.logger.Warn("Message with vaa expired", zap.String("id", event.ID()))
continue
}
// call vaa-payload-parser api to parse a VAA.
sequence := strconv.FormatUint(event.Sequence, 10)
vaaParseResponse, err := c.parser.Parse(event.ChainID, event.EmitterAddress, sequence, event.Vaa)
if err != nil {
if errors.Is(err, parser.ErrInternalError) {
c.logger.Info("error parsing VAA, will retry later", zap.Uint16("chainID", event.ChainID),
zap.String("address", event.EmitterAddress), zap.Uint64("sequence", event.Sequence), zap.Error(err))
continue
}
c.logger.Info("VAA cannot be parsed", zap.Uint16("chainID", event.ChainID),
zap.String("address", event.EmitterAddress), zap.Uint64("sequence", event.Sequence), zap.Error(err))
msg.Ack()
continue
}
// create ParsedVaaUpdate to upsert.
now := time.Now()
vaaParsed := parser.ParsedVaaUpdate{
ID: event.ID(),
EmitterChain: event.ChainID,
EmitterAddr: event.EmitterAddress,
Sequence: strconv.FormatUint(event.Sequence, 10),
AppID: vaaParseResponse.AppID,
Result: vaaParseResponse.Result,
UpdatedAt: &now,
}
err = c.repository.UpsertParsedVaa(ctx, vaaParsed)
if err != nil {
c.logger.Error("Error inserting vaa in repository",
zap.String("id", event.ID()),
zap.Error(err))
continue
}
msg.Ack()
c.logger.Info("Vaa save in repository", zap.String("id", event.ID()))
}
}
}()
}

View File

@ -0,0 +1,49 @@
package pipeline
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
"github.com/wormhole-foundation/wormhole-explorer/parser/watcher"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)
// Publisher definition.
type Publisher struct {
logger *zap.Logger
repository *parser.Repository
pushFunc queue.VAAPushFunc
}
// NewPublisher creates a new publisher for vaa with parse configuration.
func NewPublisher(logger *zap.Logger, repository *parser.Repository, pushFunc queue.VAAPushFunc) *Publisher {
return &Publisher{logger: logger, repository: repository, pushFunc: pushFunc}
}
// Publish sends a VaaEvent for the vaa that has parse configuration defined.
func (p *Publisher) Publish(e *watcher.Event) {
// deserializes the binary representation of a VAA
vaa, err := vaa.Unmarshal(e.Vaas)
if err != nil {
p.logger.Error("error Unmarshal vaa", zap.Error(err))
return
}
// V2 Get chainID/address that have parser function defined and add to sqs only that vaa.
// create a VaaEvent.
event := queue.VaaEvent{
ChainID: uint16(vaa.EmitterChain),
EmitterAddress: vaa.EmitterAddress.String(),
Sequence: vaa.Sequence,
Vaa: vaa.Payload,
}
// push messages to queue.
err = p.pushFunc(context.TODO(), &event)
if err != nil {
p.logger.Error("can not push event to queue", zap.Error(err), zap.String("event", event.ID()))
}
}

32
parser/queue/queue.go Normal file
View File

@ -0,0 +1,32 @@
package queue
import (
"context"
"fmt"
)
// VaaEvent represents a vaa data to be handle by the pipeline.
type VaaEvent struct {
ChainID uint16 `json:"chainId"`
EmitterAddress string `json:"emitter"`
Sequence uint64 `json:"sequence"`
Vaa []byte `json:"vaa"`
}
// ConsumerMessage defition.
type ConsumerMessage struct {
Data *VaaEvent
Ack func()
IsExpired func() bool
}
// ID get vaa ID (chainID/emiiterAddress/sequence)
func (v *VaaEvent) ID() string {
return fmt.Sprintf("%d/%s/%d", v.ChainID, v.EmitterAddress, v.Sequence)
}
// VAAPushFunc is a function to push VAAEvent.
type VAAPushFunc func(context.Context, *VaaEvent) error
// VAAConsumeFunc is a function to consume VAAEvent.
type VAAConsumeFunc func(context.Context) <-chan *ConsumerMessage

View File

@ -0,0 +1,46 @@
package queue
import (
"context"
)
// VAAInMemoryOption represents a VAA queue in memory option function.
type VAAInMemoryOption func(*VAAInMemory)
// VAAInMemory represents VAA queue in memory.
type VAAInMemory struct {
ch chan *ConsumerMessage
size int
}
// NewVAAInMemory creates a VAA queue in memory instances.
func NewVAAInMemory(opts ...VAAInMemoryOption) *VAAInMemory {
m := &VAAInMemory{size: 100}
for _, opt := range opts {
opt(m)
}
m.ch = make(chan *ConsumerMessage, m.size)
return m
}
// WithSize allows to specify an channel size when setting a value.
func WithSize(v int) VAAInMemoryOption {
return func(i *VAAInMemory) {
i.size = v
}
}
// Publish sends the message to a channel.
func (i *VAAInMemory) Publish(_ context.Context, message *VaaEvent) error {
i.ch <- &ConsumerMessage{
Data: message,
Ack: func() {},
IsExpired: func() bool { return false },
}
return nil
}
// Consume returns the channel with the received messages.
func (i *VAAInMemory) Consume(_ context.Context) <-chan *ConsumerMessage {
return i.ch
}

99
parser/queue/vaa_sqs.go Normal file
View File

@ -0,0 +1,99 @@
package queue
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
"go.uber.org/zap"
)
// SQSOption represents a VAA queue in SQS option function.
type SQSOption func(*SQS)
// SQS represents a VAA queue in SQS.
type SQS struct {
producer *sqs.Producer
consumer *sqs.Consumer
ch chan *ConsumerMessage
chSize int
logger *zap.Logger
}
// NewVAASQS creates a VAA queue in SQS instances.
func NewVAASQS(producer *sqs.Producer, consumer *sqs.Consumer, logger *zap.Logger, opts ...SQSOption) *SQS {
s := &SQS{
producer: producer,
consumer: consumer,
chSize: 10,
logger: logger}
for _, opt := range opts {
opt(s)
}
s.ch = make(chan *ConsumerMessage, s.chSize)
return s
}
// WithChannelSize allows to specify an channel size when setting a value.
func WithChannelSize(size int) SQSOption {
return func(d *SQS) {
d.chSize = size
}
}
// Publish sends the message to a SQS queue.
func (q *SQS) Publish(_ context.Context, message *VaaEvent) error {
body, err := json.Marshal(message)
if err != nil {
return err
}
groupID := fmt.Sprintf("%d/%s", message.ChainID, message.EmitterAddress)
deduplicationID := fmt.Sprintf("%d/%s/%d", message.ChainID, message.EmitterAddress, message.Sequence)
return q.producer.SendMessage(groupID, deduplicationID, string(body))
}
// Consume returns the channel with the received messages from SQS queue.
func (q *SQS) Consume(ctx context.Context) <-chan *ConsumerMessage {
go func() {
for {
select {
case <-ctx.Done():
return
default:
messages, err := q.consumer.GetMessages()
if err != nil {
q.logger.Error("Error getting messages from SQS", zap.Error(err))
continue
}
expiredAt := time.Now().Add(q.consumer.GetVisibilityTimeout())
for _, msg := range messages {
var body VaaEvent
err := json.Unmarshal([]byte(*msg.Body), &body)
if err != nil {
q.logger.Error("Error decoding message from SQS", zap.Error(err))
continue
}
q.ch <- &ConsumerMessage{
Data: &body,
Ack: func() {
if err := q.consumer.DeleteMessage(msg); err != nil {
q.logger.Error("Error deleting message from SQS", zap.Error(err))
}
},
IsExpired: func() bool {
return expiredAt.Before(time.Now())
},
}
}
}
}
}()
return q.ch
}
// Close closes all consumer resources.
func (q *SQS) Close() {
close(q.ch)
}

86
parser/watcher/watcher.go Normal file
View File

@ -0,0 +1,86 @@
package watcher
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
// Watcher represents a listener of database changes.
type Watcher struct {
db *mongo.Database
dbName string
handler WatcherFunc
logger *zap.Logger
}
// WatcherFunc is a function to send database changes.
type WatcherFunc func(*Event)
type watchEvent struct {
DocumentKey documentKey `bson:"documentKey"`
OperationType string `bson:"operationType"`
DbFullDocument Event `bson:"fullDocument"`
}
type documentKey struct {
ID string `bson:"_id"`
}
// Event represents a database change.
type Event struct {
ID string `bson:"_id"`
Vaas []byte
}
const queryTemplate = `
[
{
"$match" : {
"operationType" : "insert",
"ns": { "$in": [{"db": "%s", "coll": "vaas"}] }
}
}
]
`
// NewWatcher creates a new database event watcher.
func NewWatcher(db *mongo.Database, dbName string, handler WatcherFunc, logger *zap.Logger) *Watcher {
return &Watcher{
db: db,
dbName: dbName,
handler: handler,
logger: logger,
}
}
// Start executes database event consumption.
func (w *Watcher) Start(ctx context.Context) error {
query := fmt.Sprintf(queryTemplate, w.dbName, w.dbName)
var steps []bson.D
err := bson.UnmarshalExtJSON([]byte(query), true, &steps)
if err != nil {
return err
}
stream, err := w.db.Watch(ctx, steps)
if err != nil {
return err
}
go func() {
for stream.Next(ctx) {
var e watchEvent
if err := stream.Decode(&e); err != nil {
w.logger.Error("Error unmarshalling event", zap.Error(err))
continue
}
w.handler(&Event{
ID: e.DbFullDocument.ID,
Vaas: e.DbFullDocument.Vaas,
})
}
}()
return nil
}