pipeline: new component (#146)

* pipeline: new component consuming mongo watch and publishing to sns topic

* fix: use aws config with role
This commit is contained in:
ftocal 2023-02-16 11:09:57 -03:00 committed by GitHub
parent cbb7956f90
commit 27d864d91c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 958 additions and 0 deletions

5
pipeline/.env.example Normal file
View File

@ -0,0 +1,5 @@
PORT=
MONGODB_URI=
MONGODB_DATABASE=
AWS_REGION=
SNS_URL=

1
pipeline/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.env

20
pipeline/Dockerfile Normal file
View File

@ -0,0 +1,20 @@
# syntax=docker.io/docker/dockerfile:1.3@sha256:42399d4635eddd7a9b8a24be879d2f9a930d0ed040a61324cfdf59ef1357b3b2
FROM --platform=linux/amd64 docker.io/golang:1.19.2@sha256:0467d7d12d170ed8d998a2dae4a09aa13d0aa56e6d23c4ec2b1e4faacf86a813 AS build
WORKDIR /app
COPY . .
# Build the Go app
RUN make build
############################
# STEP 2 build a small image
############################
FROM alpine
#Copy certificates
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy our static executable.
COPY --from=build "/app/pipeline-app" "/pipeline"
# Run the binary.
ENTRYPOINT ["/pipeline"]

21
pipeline/Makefile Normal file
View File

@ -0,0 +1,21 @@
SHELL := /bin/bash
## help: print this help message
.PHONY: help
help:
@echo 'Usage:'
@sed -n 's/^##//p' ${MAKEFILE_LIST} | column -t -s ':' | sed -e 's/^/ /'
build:
CGO_ENABLED=0 GOOS=linux go build -o "./pipeline-app" cmd/main.go
doc:
swag init -pd
test:
go test -v -cover ./...
.PHONY: build doc test

5
pipeline/README.md Normal file
View File

@ -0,0 +1,5 @@
# Pipeline
## Config SNS FIFO in localstack
aws --profile localstack --endpoint-url=http://localhost:4566 sns --name vaas-pipeline.fifo --attributes FifoTopic=true,ContentBasedDeduplication=false

156
pipeline/cmd/main.go Normal file
View File

@ -0,0 +1,156 @@
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/config"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/healthcheck"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/http/infrastructure"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/sns"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/topic"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/watcher"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
type exitCode int
func handleExit() {
if r := recover(); r != nil {
if e, ok := r.(exitCode); ok {
os.Exit(int(e))
}
panic(r) // not an Exit, bubble up
}
}
func main() {
defer handleExit()
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
config, err := config.New(rootCtx)
if err != nil {
log.Fatal("Error creating config", err)
}
level, err := ipfslog.LevelFromString(config.LogLevel)
if err != nil {
log.Fatal("Invalid log level", err)
}
logger := ipfslog.Logger("wormhole-explorer-pipeline").Desugar()
ipfslog.SetAllLoggers(level)
logger.Info("Starting wormhole-explorer-pipeline ...")
//setup DB connection
db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
// get publish function.
pushFunc, err := newTopicProducer(rootCtx, config, logger)
if err != nil {
logger.Fatal("failed to create publish function", zap.Error(err))
}
// get health check functions.
healthChecks, err := newHealthChecks(rootCtx, config, db.Database)
if err != nil {
logger.Fatal("failed to create health checks", zap.Error(err))
}
// // create a new publisher.
publisher := pipeline.NewPublisher(pushFunc, logger)
watcher := watcher.NewWatcher(rootCtx, db.Database, config.MongoDatabase, publisher.Publish, logger)
err = watcher.Start(rootCtx)
if err != nil {
logger.Fatal("failed to watch MongoDB", zap.Error(err))
}
server := infrastructure.NewServer(logger, config.Port, config.PprofEnabled, healthChecks...)
server.Start()
logger.Info("Started wormhole-explorer-pipeline")
// Waiting for signal
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-rootCtx.Done():
logger.Warn("Terminating with root context cancelled.")
case signal := <-sigterm:
logger.Info("Terminating with signal.", zap.String("signal", signal.String()))
}
logger.Info("root context cancelled, exiting...")
rootCtxCancel()
logger.Info("Closing database connections ...")
db.Close()
logger.Info("Closing Http server ...")
server.Stop()
logger.Info("Finished wormhole-explorer-pipeline")
}
func newAwsConfig(appCtx context.Context, cfg *config.Configuration) (aws.Config, error) {
region := cfg.AwsRegion
if cfg.AwsAccessKeyID != "" && cfg.AwsSecretAccessKey != "" {
credentials := credentials.NewStaticCredentialsProvider(cfg.AwsAccessKeyID, cfg.AwsSecretAccessKey, "")
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
if cfg.AwsEndpoint != "" {
return aws.Endpoint{
PartitionID: "aws",
URL: cfg.AwsEndpoint,
SigningRegion: region,
}, nil
}
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
})
awsCfg, err := awsconfig.LoadDefaultConfig(appCtx,
awsconfig.WithRegion(region),
awsconfig.WithEndpointResolver(customResolver),
awsconfig.WithCredentialsProvider(credentials),
)
return awsCfg, err
}
return awsconfig.LoadDefaultConfig(appCtx, awsconfig.WithRegion(region))
}
func newTopicProducer(appCtx context.Context, config *config.Configuration, logger *zap.Logger) (topic.PushFunc, error) {
awsConfig, err := newAwsConfig(appCtx, config)
if err != nil {
return nil, err
}
snsProducer, err := sns.NewProducer(awsConfig, config.SNSUrl)
if err != nil {
return nil, err
}
return topic.NewVAASNS(snsProducer, logger).Publish, nil
}
func newHealthChecks(ctx context.Context, config *config.Configuration, db *mongo.Database) ([]healthcheck.Check, error) {
awsConfig, err := newAwsConfig(ctx, config)
if err != nil {
return nil, err
}
return []healthcheck.Check{healthcheck.Mongo(db), healthcheck.SNS(awsConfig, config.SNSUrl)}, nil
}

35
pipeline/config/config.go Normal file
View File

@ -0,0 +1,35 @@
package config
import (
"context"
"github.com/joho/godotenv"
"github.com/sethvargo/go-envconfig"
)
// Configuration represents the application configuration with the default values.
type Configuration struct {
Env string `env:"ENV,default=development"`
LogLevel string `env:"LOG_LEVEL,default=INFO"`
Port string `env:"PORT,default=8000"`
MongoURI string `env:"MONGODB_URI,required"`
MongoDatabase string `env:"MONGODB_DATABASE,required"`
AwsEndpoint string `env:"AWS_ENDPOINT"`
AwsAccessKeyID string `env:"AWS_ACCESS_KEY_ID"`
AwsSecretAccessKey string `env:"AWS_SECRET_ACCESS_KEY"`
AwsRegion string `env:"AWS_REGION"`
SNSUrl string `env:"SNS_URL"`
PprofEnabled bool `env:"PPROF_ENABLED,default=false"`
}
// New creates a configuration with the values from .env file and environment variables.
func New(ctx context.Context) (*Configuration, error) {
_ = godotenv.Load(".env", "../.env")
var configuration Configuration
if err := envconfig.Process(ctx, &configuration); err != nil {
return nil, err
}
return &configuration, nil
}

63
pipeline/go.mod Normal file
View File

@ -0,0 +1,63 @@
module github.com/wormhole-foundation/wormhole-explorer/pipeline
go 1.19
require (
github.com/gofiber/fiber/v2 v2.40.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/joho/godotenv v1.4.0 // Configuration environment
github.com/pkg/errors v0.9.1
github.com/sethvargo/go-envconfig v0.6.0 // Configuration environment
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230123141139-45b3d18d80b2
go.mongodb.org/mongo-driver v1.11.0
go.uber.org/zap v1.23.0
)
require (
github.com/aws/aws-sdk-go-v2 v1.17.4
github.com/aws/aws-sdk-go-v2/config v1.1.1
github.com/aws/aws-sdk-go-v2/credentials v1.1.1
github.com/aws/aws-sdk-go-v2/service/sns v1.20.1
github.com/wormhole-foundation/wormhole-explorer/parser v0.0.0-20230206141215-b033c04a1356
)
require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.28 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.1.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.1.1 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/ethereum/go-ethereum v1.10.21 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.6.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.41.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.1 // indirect
github.com/xdg-go/stringprep v1.0.3 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
)
// Needed for cosmos-sdk based chains. See
// https://github.com/cosmos/cosmos-sdk/issues/10925 for more details.
replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1

184
pipeline/go.sum Normal file
View File

@ -0,0 +1,184 @@
github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/aws/aws-sdk-go-v2 v1.2.0/go.mod h1:zEQs02YRBw1DjK0PoJv3ygDYOFTre1ejlJWl8FwAuQo=
github.com/aws/aws-sdk-go-v2 v1.17.3/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2 v1.17.4 h1:wyC6p9Yfq6V2y98wfDsj6OnNQa4w2BLGCLIxzNhwOGY=
github.com/aws/aws-sdk-go-v2 v1.17.4/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
github.com/aws/aws-sdk-go-v2/config v1.1.1 h1:ZAoq32boMzcaTW9bcUacBswAmHTbvlvDJICgHFZuECo=
github.com/aws/aws-sdk-go-v2/config v1.1.1/go.mod h1:0XsVy9lBI/BCXm+2Tuvt39YmdHwS5unDQmxZOYe8F5Y=
github.com/aws/aws-sdk-go-v2/credentials v1.1.1 h1:NbvWIM1Mx6sNPTxowHgS2ewXCRp+NGTzUYb/96FZJbY=
github.com/aws/aws-sdk-go-v2/credentials v1.1.1/go.mod h1:mM2iIjwl7LULWtS6JCACyInboHirisUUdkBPoTHMOUo=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.2 h1:EtEU7WRaWliitZh2nmuxEXrN0Cb8EgPUFGIoTMeqbzI=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.2/go.mod h1:3hGg3PpiEjHnrkrlasTfxFqUsZ2GCk/fMUn4CbKgSkM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.27/go.mod h1:a1/UpzeyBBerajpnP5nGZa9mGzsBn5cOKxm6NWQsvoI=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.28 h1:r+XwaCLpIvCKjBIYy/HVZujQS9tsz5ohHG3ZIe0wKoE=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.28/go.mod h1:3lwChorpIM/BhImY/hy+Z6jekmN92cXGPI1QJasVPYY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.21/go.mod h1:+Gxn8jYn5k9ebfHEqlhrMirFjSW0v0C9fI+KN5vk2kE=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22 h1:7AwGYXDdqRQYsluvKFmWoqpcOQJ4bH634SkYf3FNj/A=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22/go.mod h1:EqK7gVrIGAHyZItrD1D8B0ilgwMD1GiWAmbU4u/JHNk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28 h1:KeTxcGdNnQudb46oOl4d90f2I33DF/c6q3RnZAmvQdQ=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.28/go.mod h1:yRZVr/iT0AqyHeep00SZ4YfBAKojXz08w3XMBscdi0c=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.2 h1:4AH9fFjUlVktQMznF+YN33aWNXaR4VgDXyP28qokJC0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.2/go.mod h1:45MfaXZ0cNbeuT0KQ1XJylq8A6+OpVV2E5kvY/Kq+u8=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.1 h1:VbbZ4Irb+fScR/J1SdIawecsnRCOfOR20ogRPMloTDg=
github.com/aws/aws-sdk-go-v2/service/sns v1.20.1/go.mod h1:VN2n9SOMS1lNbh5YD7o+ho0/rgfifSrK//YYNiVVF5E=
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.1 h1:JvO+TT1JhH8InfwOfgWAfIFo3H1cz5qW8WuIP8Y5d6s=
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.1/go.mod h1:jQhN5f4p3PALMNlUtfb/0wGIFlV7vGtJlPDVfxfNfPY=
github.com/aws/aws-sdk-go-v2/service/sso v1.1.1 h1:37QubsarExl5ZuCBlnRP+7l1tNwZPBSTqpTBrPH98RU=
github.com/aws/aws-sdk-go-v2/service/sso v1.1.1/go.mod h1:SuZJxklHxLAXgLTc1iFXbEWkXs7QRTQpCLGaKIprQW0=
github.com/aws/aws-sdk-go-v2/service/sts v1.1.1 h1:TJoIfnIFubCX0ACVeJ0w46HEH5MwjwYN4iFhuYIhfIY=
github.com/aws/aws-sdk-go-v2/service/sts v1.1.1/go.mod h1:Wi0EBZwiz/K44YliU0EKxqTCJGUfYTWXrrBwkq736bM=
github.com/aws/smithy-go v1.1.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB9LgIw=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k=
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc=
github.com/ethereum/go-ethereum v1.10.21 h1:5lqsEx92ZaZzRyOqBEXux4/UR06m296RGzN3ol3teJY=
github.com/ethereum/go-ethereum v1.10.21/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg=
github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pIl4=
github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joho/godotenv v1.4.0 h1:3l4+N6zfMWnkbPEXKng2o2/MR5mSwTrBih4ZEkkz1lg=
github.com/joho/godotenv v1.4.0/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/sethvargo/go-envconfig v0.6.0 h1:GxxdoeiNpWgGiVEphNFNObgMYRN/ZvI2dN7rBwadyss=
github.com/sethvargo/go-envconfig v0.6.0/go.mod h1:00S1FAhRUuTNJazWBWcJGvEHOM+NO6DhoRMAOX7FY5o=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.41.0 h1:zeR0Z1my1wDHTRiamBCXVglQdbUwgb9uWG3k1HQz6jY=
github.com/valyala/fasthttp v1.41.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/wormhole-foundation/wormhole-explorer/parser v0.0.0-20230206141215-b033c04a1356 h1:7M1tHP/K5JC7xnoN2IxharAVSA7Ir77NOpwPbjgq2L4=
github.com/wormhole-foundation/wormhole-explorer/parser v0.0.0-20230206141215-b033c04a1356/go.mod h1:fwx3DcH75h2jFhGoGWgCZ4lkN9Fvaz+/dySYm32K6bo=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230123141139-45b3d18d80b2 h1:we8iat9DdKt8V6aopxFe+2PkKol9cDPYwH8xZue0R60=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230123141139-45b3d18d80b2/go.mod h1:9KomdyKDfd0O0A64dfapTbTp6I9LGkudkK8Q7s72RyI=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E=
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.mongodb.org/mongo-driver v1.11.0 h1:FZKhBSTydeuffHj9CBjXlR8vQLee1cQyTWYPA6/tqiE=
go.mongodb.org/mongo-driver v1.11.0/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,54 @@
package healthcheck
import (
"context"
"fmt"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)
type mongoStatus struct {
Ok int32 `bson:"ok"`
Host string `bson:"host"`
Version string `bson:"version"`
Process string `bson:"process"`
Pid int32 `bson:"pid"`
Uptime int32 `bson:"uptime"`
Connections *mongoConnections `bson:"connections"`
}
// mongoConnections represents a mongo server connection.
type mongoConnections struct {
Current int32 `bson:"current"`
Available int32 `bson:"available"`
TotalCreated int32 `bson:"totalCreated"`
}
func Mongo(db *mongo.Database) Check {
return func(ctx context.Context) error {
command := bson.D{{Key: "serverStatus", Value: 1}}
result := db.RunCommand(ctx, command)
if result.Err() != nil {
return errors.WithStack(result.Err())
}
var mongoStatus mongoStatus
err := result.Decode(&mongoStatus)
if err != nil {
return errors.WithStack(err)
}
// check mongo server status
mongoStatusCheck := (mongoStatus.Ok == 1 && mongoStatus.Pid > 0 && mongoStatus.Uptime > 0)
if !mongoStatusCheck {
return fmt.Errorf("mongo server not ready (Ok = %v, Pid = %v, Uptime = %v)", mongoStatus.Ok, mongoStatus.Pid, mongoStatus.Uptime)
}
// check mongo connections
if mongoStatus.Connections.Available <= 0 {
return fmt.Errorf("mongo server without available connections (availableConection = %v)", mongoStatus.Connections.Available)
}
return nil
}
}

View File

@ -0,0 +1,22 @@
package healthcheck
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sns"
)
func SNS(config aws.Config, url string) Check {
api := sns.NewFromConfig(config)
return func(ctx context.Context) error {
params := &sns.GetTopicAttributesInput{
TopicArn: aws.String(url),
}
_, err := api.GetTopicAttributes(ctx, params)
if err != nil {
return err
}
return nil
}
}

View File

@ -0,0 +1,5 @@
package healthcheck
import "context"
type Check func(context.Context) error

View File

@ -0,0 +1,46 @@
package infrastructure
import (
"fmt"
"github.com/gofiber/fiber/v2"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/healthcheck"
"go.uber.org/zap"
)
// Controller definition.
type Controller struct {
checks []healthcheck.Check
logger *zap.Logger
}
// NewController creates a Controller instance.
func NewController(checks []healthcheck.Check, logger *zap.Logger) *Controller {
return &Controller{checks: checks, logger: logger}
}
// HealthCheck handler for the endpoint /health.
func (c *Controller) HealthCheck(ctx *fiber.Ctx) error {
return ctx.JSON(struct {
Status string `json:"status"`
}{Status: "OK"})
}
// ReadyCheck handler for the endpoint /ready.
func (c *Controller) ReadyCheck(ctx *fiber.Ctx) error {
rctx := ctx.Context()
requestID := fmt.Sprintf("%v", rctx.Value("requestid"))
for _, check := range c.checks {
if err := check(rctx); err != nil {
c.logger.Error("Ready check failed", zap.Error(err), zap.String("requestID", requestID))
return ctx.Status(fiber.StatusInternalServerError).JSON(struct {
Ready string `json:"ready"`
Error string `json:"error"`
}{Ready: "NO", Error: err.Error()})
}
}
return ctx.Status(fiber.StatusOK).JSON(struct {
Ready string `json:"ready"`
}{Ready: "OK"})
}

View File

@ -0,0 +1,48 @@
package infrastructure
import (
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/pprof"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/healthcheck"
"go.uber.org/zap"
)
type Server struct {
app *fiber.App
port string
logger *zap.Logger
}
func NewServer(logger *zap.Logger, port string, pprofEnabled bool, checks ...healthcheck.Check) *Server {
app := fiber.New(fiber.Config{DisableStartupMessage: true})
// config use of middlware.
if pprofEnabled {
app.Use(pprof.New())
}
ctrl := NewController(checks, logger)
api := app.Group("/api")
api.Get("/health", ctrl.HealthCheck)
api.Get("/ready", ctrl.ReadyCheck)
return &Server{
app: app,
port: port,
logger: logger,
}
}
// Start listen serves HTTP requests from addr.
func (s *Server) Start() {
addr := ":" + s.port
s.logger.Info("Listening on " + addr)
go func() {
s.app.Listen(addr)
}()
}
// Stop gracefull server.
func (s *Server) Stop() {
_ = s.app.Shutdown()
}

View File

@ -0,0 +1,31 @@
package db
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// Database definition.
type Database struct {
Database *mongo.Database
client *mongo.Client
}
// New connects to DB and returns a client that will disconnect when the passed in context is cancelled.
func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) {
cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
return &Database{client: cli, Database: cli.Database(databaseName)}, err
}
// Close closes the database connections.
func (d *Database) Close() error {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
return d.client.Disconnect(ctx)
}

View File

@ -0,0 +1,33 @@
package sns
import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
aws_sns "github.com/aws/aws-sdk-go-v2/service/sns"
)
// Producer represents SNS producer.
type Producer struct {
api *aws_sns.Client
url string
}
func NewProducer(awsConfig aws.Config, url string) (*Producer, error) {
return &Producer{
api: aws_sns.NewFromConfig(awsConfig),
url: url,
}, nil
}
// SendMessage sends messages to SQS.
func (p *Producer) SendMessage(ctx context.Context, groupID, deduplicationID, body string) error {
_, err := p.api.Publish(ctx,
&aws_sns.PublishInput{
MessageGroupId: aws.String(groupID),
MessageDeduplicationId: aws.String(deduplicationID),
Message: aws.String(body),
TopicArn: aws.String(p.url),
})
return err
}

View File

@ -0,0 +1,46 @@
package pipeline
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/topic"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/watcher"
"go.uber.org/zap"
)
// Publisher definition.
type Publisher struct {
logger *zap.Logger
pushFunc topic.PushFunc
}
// NewPublisher creates a new publisher for vaa with parse configuration.
func NewPublisher(pushFunc topic.PushFunc, logger *zap.Logger) *Publisher {
return &Publisher{logger: logger, pushFunc: pushFunc}
}
// Publish sends a Event for the vaa that has parse configuration defined.
func (p *Publisher) Publish(ctx context.Context, e *watcher.Event) {
// create a Event.
event := topic.Event{
ID: e.ID,
ChainID: e.ChainID,
EmitterAddress: e.EmitterAddress,
Sequence: e.Sequence,
GuardianSetIndex: e.GuardianSetIndex,
Vaa: e.Vaa,
IndexedAt: e.IndexedAt,
Timestamp: e.Timestamp,
UpdatedAt: e.UpdatedAt,
TxHash: e.TxHash,
Version: e.Version,
Revision: e.Revision,
}
// push messages to topic.
err := p.pushFunc(ctx, &event)
if err != nil {
p.logger.Error("can not push event to topic", zap.Error(err), zap.String("event", event.ID))
}
}

25
pipeline/topic/topic.go Normal file
View File

@ -0,0 +1,25 @@
package topic
import (
"context"
"time"
)
// Event represents a vaa data to be handle by the pipeline.
type Event struct {
ID string `json:"id"`
ChainID uint16 `json:"emitterChain"`
EmitterAddress string `json:"emitterAddr"`
Sequence string `json:"sequence"`
GuardianSetIndex uint32 `json:"guardianSetIndex"`
Vaa []byte `json:"vaas"`
IndexedAt time.Time `json:"indexedAt"`
Timestamp *time.Time `json:"timestamp"`
UpdatedAt *time.Time `json:"updatedAt"`
TxHash string `json:"txHash"`
Version uint16 `json:"version"`
Revision uint16 `json:"revision"`
}
// PushFunc is a function to push VAAEvent.
type PushFunc func(context.Context, *Event) error

View File

@ -0,0 +1,27 @@
package topic
import (
"context"
"go.uber.org/zap"
)
// VAAInMemoryOption represents a VAA queue in memory option function.
type VAAInMemoryOption func(*VAAInMemory)
// VAAInMemory represents VAA queue in memory.
type VAAInMemory struct {
logger *zap.Logger
}
// NewVAAInMemory creates a VAA queue in memory instances.
func NewVAAInMemory(logger *zap.Logger) *VAAInMemory {
m := &VAAInMemory{logger: logger}
return m
}
// Publish sends the message to a channel.
func (i *VAAInMemory) Publish(_ context.Context, message *Event) error {
return nil
}

37
pipeline/topic/vaa_sns.go Normal file
View File

@ -0,0 +1,37 @@
package topic
import (
"context"
"encoding/json"
"fmt"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/sns"
"go.uber.org/zap"
)
// SQS represents a VAA queue in SNS.
type SNS struct {
producer *sns.Producer
logger *zap.Logger
}
// NewVAASNS creates a VAA topic in SNS instances.
func NewVAASNS(producer *sns.Producer, logger *zap.Logger) *SNS {
s := &SNS{
producer: producer,
logger: logger,
}
return s
}
// Publish sends the message to a SNS topic.
func (s *SNS) Publish(ctx context.Context, message *Event) error {
body, err := json.Marshal(message)
if err != nil {
return err
}
groupID := fmt.Sprintf("%d/%s", message.ChainID, message.EmitterAddress)
s.logger.Debug("Publishing message", zap.String("groupID", groupID))
return s.producer.SendMessage(ctx, groupID, message.ID, string(body))
}

View File

@ -0,0 +1,94 @@
package watcher
import (
"context"
"fmt"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
// Watcher represents a listener of database changes.
type Watcher struct {
db *mongo.Database
dbName string
handler WatcherFunc
logger *zap.Logger
}
// WatcherFunc is a function to send database changes.
type WatcherFunc func(context.Context, *Event)
type watchEvent struct {
DocumentKey documentKey `bson:"documentKey"`
OperationType string `bson:"operationType"`
DbFullDocument Event `bson:"fullDocument"`
}
type documentKey struct {
ID string `bson:"_id"`
}
// Event represents a database change.
type Event struct {
ID string `bson:"_id"`
ChainID uint16 `bson:"emitterChain"`
EmitterAddress string `bson:"emitterAddr"`
Sequence string `bson:"sequence"`
GuardianSetIndex uint32 `bson:"guardianSetIndex"`
Vaa []byte `bson:"vaas"`
IndexedAt time.Time `bson:"indexedAt"`
Timestamp *time.Time `bson:"timestamp"`
UpdatedAt *time.Time `bson:"updatedAt"`
TxHash string `bson:"txHash"`
Version uint16 `bson:"version"`
Revision uint16 `bson:"revision"`
}
const queryTemplate = `
[
{
"$match" : {
"operationType" : "insert",
"ns": { "$in": [{"db": "%s", "coll": "vaasPythnet"}, {"db": "%s", "coll": "vaas"}] }
}
}
]
`
// NewWatcher creates a new database event watcher.
func NewWatcher(ctx context.Context, db *mongo.Database, dbName string, handler WatcherFunc, logger *zap.Logger) *Watcher {
return &Watcher{
db: db,
dbName: dbName,
handler: handler,
logger: logger,
}
}
// Start executes database event consumption.
func (w *Watcher) Start(ctx context.Context) error {
query := fmt.Sprintf(queryTemplate, w.dbName, w.dbName)
var steps []bson.D
err := bson.UnmarshalExtJSON([]byte(query), true, &steps)
if err != nil {
return err
}
stream, err := w.db.Watch(ctx, steps)
if err != nil {
return err
}
go func() {
for stream.Next(ctx) {
var e watchEvent
if err := stream.Decode(&e); err != nil {
w.logger.Error("Error unmarshalling event", zap.Error(err))
continue
}
w.handler(ctx, &e.DbFullDocument)
}
}()
return nil
}