Add backfiller for pipeline (#1221)
This commit is contained in:
parent
277373bf57
commit
a6cbcc7c55
|
@ -20,8 +20,18 @@ type VaaRepository struct {
|
|||
|
||||
// VaaDoc is a document for VAA.
|
||||
type VaaDoc struct {
|
||||
ID string `bson:"_id" json:"id"`
|
||||
Vaa []byte `bson:"vaas" json:"vaa"`
|
||||
ID string `bson:"_id" json:"id"`
|
||||
Vaa []byte `bson:"vaas" json:"vaa"`
|
||||
ChainID uint16 `bson:"emitterChain"`
|
||||
EmitterAddress string `bson:"emitterAddr"`
|
||||
Sequence string `bson:"sequence"`
|
||||
GuardianSetIndex uint32 `bson:"guardianSetIndex"`
|
||||
IndexedAt time.Time `bson:"indexedAt"`
|
||||
Timestamp *time.Time `bson:"timestamp"`
|
||||
UpdatedAt *time.Time `bson:"updatedAt"`
|
||||
TxHash string `bson:"txHash"`
|
||||
Version uint16 `bson:"version"`
|
||||
Revision uint16 `bson:"revision"`
|
||||
}
|
||||
|
||||
// VaaQuery is a query for VAA.
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
---
|
||||
apiVersion: batch/v1
|
||||
kind: Job
|
||||
metadata:
|
||||
name: {{ .NAME }}-backfiller
|
||||
namespace: {{ .NAMESPACE }}
|
||||
spec:
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: {{ .NAME }}-backfiller
|
||||
spec:
|
||||
restartPolicy: Never
|
||||
terminationGracePeriodSeconds: 40
|
||||
serviceAccountName: pipeline
|
||||
containers:
|
||||
- name: {{ .NAME }}-backfiller
|
||||
image: {{ .IMAGE_NAME }}
|
||||
imagePullPolicy: Always
|
||||
env:
|
||||
- name: MONGODB_URI
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: mongodb
|
||||
key: mongo-uri
|
||||
- name: MONGODB_DATABASE
|
||||
valueFrom:
|
||||
configMapKeyRef:
|
||||
name: config
|
||||
key: mongo-database
|
||||
- name: SNS_URL
|
||||
value: {{ .SNS_URL }}
|
||||
- name: AWS_REGION
|
||||
value: {{ .SNS_AWS_REGION }}
|
||||
command: ["/pipeline"]
|
||||
args:
|
||||
- backfiller
|
||||
- --mongo-uri
|
||||
- "$(MONGODB_URI)"
|
||||
- --mongo-database
|
||||
- "$(MONGODB_DATABASE)"
|
||||
- --aws-region
|
||||
- "{{ .SNS_AWS_REGION }}"
|
||||
- --sns-url
|
||||
- "{{ .SNS_URL }}"
|
||||
- "--start-time"
|
||||
- "2024-03-01T00:00:00Z"
|
||||
- "--page-size"
|
||||
- "1000"
|
||||
- "--requests-per-second"
|
||||
- "10000"
|
||||
- "--num-workers"
|
||||
- "10"
|
|
@ -0,0 +1,56 @@
|
|||
package builder
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
awsconfig "github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/metrics"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/sns"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/topic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func NewAwsConfig(appCtx context.Context, region string, accessKeyID, secretAccessKey, endpoint string) (aws.Config, error) {
|
||||
|
||||
if accessKeyID != "" && secretAccessKey != "" {
|
||||
credentials := credentials.NewStaticCredentialsProvider(accessKeyID, secretAccessKey, "")
|
||||
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
|
||||
if endpoint != "" {
|
||||
return aws.Endpoint{
|
||||
PartitionID: "aws",
|
||||
URL: endpoint,
|
||||
SigningRegion: region,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
|
||||
})
|
||||
|
||||
awsCfg, err := awsconfig.LoadDefaultConfig(appCtx,
|
||||
awsconfig.WithRegion(region),
|
||||
awsconfig.WithEndpointResolver(customResolver),
|
||||
awsconfig.WithCredentialsProvider(credentials),
|
||||
)
|
||||
return awsCfg, err
|
||||
}
|
||||
|
||||
return awsconfig.LoadDefaultConfig(appCtx, awsconfig.WithRegion(region))
|
||||
}
|
||||
|
||||
func NewTopicProducer(ctx context.Context, region, snsUrl string, accessKeyID, secretAccessKey, endpoint string,
|
||||
alertClient alert.AlertClient, metrics metrics.Metrics, logger *zap.Logger) (topic.PushFunc, error) {
|
||||
awsConfig, err := NewAwsConfig(ctx, region, accessKeyID, secretAccessKey, endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
snsProducer, err := sns.NewProducer(awsConfig, snsUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return topic.NewVAASNS(snsProducer, alertClient, metrics, logger).Publish, nil
|
||||
}
|
|
@ -0,0 +1,191 @@
|
|||
package backfiller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/builder"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/config"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/metrics"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/topic"
|
||||
"go.uber.org/ratelimit"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func Run(cfg *config.Backfiller) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
logger := logger.New("wormhole-explorer-pipeline", logger.WithLevel(cfg.LogLevel))
|
||||
|
||||
logger.Info("Starting wormhole-explorer-pipeline as backfiller ...")
|
||||
|
||||
startTime, err := time.Parse(time.RFC3339, cfg.StartTime)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to parse start time", zap.Error(err))
|
||||
}
|
||||
|
||||
endTime := time.Now()
|
||||
if cfg.EndTime != "" {
|
||||
endTime, err = time.Parse(time.RFC3339, cfg.EndTime)
|
||||
if err != nil {
|
||||
logger.Fatal("Failed to parse end time", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
if startTime.After(endTime) {
|
||||
logger.Fatal("Start time should be before end time",
|
||||
zap.String("start_time", startTime.Format(time.RFC3339)),
|
||||
zap.String("end_time", endTime.Format(time.RFC3339)))
|
||||
}
|
||||
|
||||
//setup DB connection
|
||||
db, err := dbutil.Connect(ctx, logger, cfg.MongoURI, cfg.MongoDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
||||
// get alert client.
|
||||
alertClient := alert.NewDummyClient()
|
||||
|
||||
// get metrics.
|
||||
metrics := metrics.NewDummyMetrics()
|
||||
|
||||
// get publish function.
|
||||
pushFunc, err := builder.NewTopicProducer(ctx, cfg.AwsRegion, cfg.SNSUrl, cfg.AwsAccessKeyID,
|
||||
cfg.AwsSecretAccessKey, cfg.AwsEndpoint, alertClient, metrics, logger)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create publish function", zap.Error(err))
|
||||
}
|
||||
|
||||
// create a vaa repository.
|
||||
vaaRepository := repository.NewVaaRepository(db.Database, logger)
|
||||
|
||||
query := repository.VaaQuery{
|
||||
StartTime: &startTime,
|
||||
EndTime: &endTime,
|
||||
}
|
||||
|
||||
limiter := ratelimit.New(int(cfg.RequestsPerSecond), ratelimit.Per(time.Second))
|
||||
|
||||
pagination := repository.Pagination{
|
||||
Page: 0,
|
||||
PageSize: cfg.PageSize,
|
||||
SortAsc: true,
|
||||
}
|
||||
|
||||
queue := make(chan *repository.VaaDoc, 5*cfg.PageSize)
|
||||
|
||||
var quantityProduced, quantityConsumed atomic.Uint64
|
||||
|
||||
go getVaas(ctx, logger, pagination, query, vaaRepository, queue, &quantityProduced)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(cfg.NumWorkers)
|
||||
|
||||
for i := 0; i < cfg.NumWorkers; i++ {
|
||||
name := fmt.Sprintf("worker-%d", i)
|
||||
log := logger.With(zap.String("worker", name))
|
||||
go publishVaa(ctx, pushFunc, queue, log, &wg, limiter, &quantityConsumed)
|
||||
}
|
||||
|
||||
logger.Info("Waiting for all workers to finish...")
|
||||
wg.Wait()
|
||||
|
||||
logger.Info("closing MongoDB connection...")
|
||||
db.DisconnectWithTimeout(10 * time.Second)
|
||||
|
||||
logger.Info("Finish wormhole-explorer-pipeline as backfiller",
|
||||
zap.Uint64("produced", quantityProduced.Load()),
|
||||
zap.Uint64("consumed", quantityConsumed.Load()))
|
||||
}
|
||||
|
||||
func getVaas(ctx context.Context, logger *zap.Logger, pagination repository.Pagination, query repository.VaaQuery,
|
||||
vaaRepository *repository.VaaRepository, queue chan *repository.VaaDoc, quantityProduced *atomic.Uint64) {
|
||||
defer close(queue)
|
||||
for {
|
||||
logger.Info("Processing page", zap.Any("pagination", pagination), zap.Any("query", query))
|
||||
|
||||
vaas, err := vaaRepository.FindPage(ctx, query, pagination)
|
||||
if err != nil {
|
||||
logger.Error("Failed to get vaas", zap.Error(err))
|
||||
break
|
||||
}
|
||||
|
||||
if len(vaas) == 0 {
|
||||
logger.Info("Empty page", zap.Int64("page", pagination.Page))
|
||||
break
|
||||
}
|
||||
|
||||
for _, vaa := range vaas {
|
||||
queue <- vaa
|
||||
quantityProduced.Add(1)
|
||||
}
|
||||
|
||||
pagination.Page++
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-time.After(10 * time.Second):
|
||||
if len(queue) == 0 {
|
||||
logger.Info("Closing, queue is empty")
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
logger.Info("Closing due to cancelled context")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func publishVaa(ctx context.Context, push topic.PushFunc, queue chan *repository.VaaDoc, logger *zap.Logger, wg *sync.WaitGroup,
|
||||
limiter ratelimit.Limiter, quantityConsumed *atomic.Uint64) {
|
||||
// Main loop: fetch global txs and process them
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
|
||||
// Try to pop a globalTransaction from the queue
|
||||
case vaa, ok := <-queue:
|
||||
// If the channel was closed, exit immediately
|
||||
if !ok {
|
||||
logger.Info("Closing, channel was closed")
|
||||
return
|
||||
}
|
||||
|
||||
limiter.Take()
|
||||
|
||||
if err := push(ctx, &topic.Event{
|
||||
ID: vaa.ID,
|
||||
ChainID: vaa.ChainID,
|
||||
EmitterAddress: vaa.EmitterAddress,
|
||||
Sequence: vaa.Sequence,
|
||||
GuardianSetIndex: vaa.GuardianSetIndex,
|
||||
Vaa: vaa.Vaa,
|
||||
IndexedAt: vaa.IndexedAt,
|
||||
Timestamp: vaa.Timestamp,
|
||||
UpdatedAt: vaa.UpdatedAt,
|
||||
TxHash: vaa.TxHash,
|
||||
Version: vaa.Version,
|
||||
Revision: vaa.Revision,
|
||||
}); err != nil {
|
||||
logger.Error("Failed to push vaa", zap.Error(err))
|
||||
} else {
|
||||
quantityConsumed.Add(1)
|
||||
logger.Debug("VAA pushed", zap.String("vaa_id", vaa.ID))
|
||||
}
|
||||
|
||||
// If the context was cancelled, exit immediately
|
||||
case <-ctx.Done():
|
||||
logger.Info("Closing due to cancelled context")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,198 +1,90 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
awsconfig "github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/cmd/backfiller"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/cmd/service"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/config"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/healthcheck"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/http/infrastructure"
|
||||
pipelineAlert "github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/alert"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/metrics"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/sns"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/topic"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/watcher"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type exitCode int
|
||||
|
||||
func handleExit() {
|
||||
if r := recover(); r != nil {
|
||||
if e, ok := r.(exitCode); ok {
|
||||
os.Exit(int(e))
|
||||
}
|
||||
panic(r) // not an Exit, bubble up
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
||||
defer handleExit()
|
||||
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
|
||||
|
||||
config, err := config.New(rootCtx)
|
||||
if err != nil {
|
||||
log.Fatal("Error creating config", err)
|
||||
}
|
||||
|
||||
logger := logger.New("wormhole-explorer-pipeline", logger.WithLevel(config.LogLevel))
|
||||
|
||||
logger.Info("Starting wormhole-explorer-pipeline ...")
|
||||
|
||||
//setup DB connection
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
||||
// get alert client.
|
||||
alertClient, err := newAlertClient(config)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create alert client", zap.Error(err))
|
||||
}
|
||||
|
||||
// get metrics.
|
||||
metrics := newMetrics(config)
|
||||
|
||||
// get publish function.
|
||||
pushFunc, err := newTopicProducer(rootCtx, config, alertClient, metrics, logger)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create publish function", zap.Error(err))
|
||||
}
|
||||
|
||||
// get health check functions.
|
||||
healthChecks, err := newHealthChecks(rootCtx, config, db.Database)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create health checks", zap.Error(err))
|
||||
}
|
||||
|
||||
// create a new pipeline repository.
|
||||
repository := pipeline.NewRepository(db.Database, logger)
|
||||
|
||||
// create and start a new tx hash handler.
|
||||
quit := make(chan bool)
|
||||
txHashHandler := pipeline.NewTxHashHandler(repository, pushFunc, alertClient, metrics, logger, quit)
|
||||
go txHashHandler.Run(rootCtx)
|
||||
|
||||
// create a new publisher.
|
||||
publisher := pipeline.NewPublisher(pushFunc, metrics, repository, config.P2pNetwork, txHashHandler, logger)
|
||||
watcher := watcher.NewWatcher(rootCtx, db.Database, config.MongoDatabase, publisher.Publish, alertClient, metrics, logger)
|
||||
err = watcher.Start(rootCtx)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to watch MongoDB", zap.Error(err))
|
||||
}
|
||||
|
||||
server := infrastructure.NewServer(logger, config.Port, config.PprofEnabled, healthChecks...)
|
||||
server.Start()
|
||||
|
||||
logger.Info("Started wormhole-explorer-pipeline")
|
||||
|
||||
// Waiting for signal
|
||||
sigterm := make(chan os.Signal, 1)
|
||||
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
||||
select {
|
||||
case <-rootCtx.Done():
|
||||
logger.Warn("Terminating with root context cancelled.")
|
||||
case signal := <-sigterm:
|
||||
logger.Info("Terminating with signal.", zap.String("signal", signal.String()))
|
||||
}
|
||||
|
||||
logger.Info("root context cancelled, exiting...")
|
||||
rootCtxCancel()
|
||||
|
||||
logger.Info("Closing tx hash handler ...")
|
||||
close(quit)
|
||||
|
||||
logger.Info("closing MongoDB connection...")
|
||||
db.DisconnectWithTimeout(10 * time.Second)
|
||||
|
||||
logger.Info("Closing Http server ...")
|
||||
server.Stop()
|
||||
|
||||
logger.Info("Finished wormhole-explorer-pipeline")
|
||||
|
||||
execute()
|
||||
}
|
||||
|
||||
func newAwsConfig(appCtx context.Context, cfg *config.Configuration) (aws.Config, error) {
|
||||
region := cfg.AwsRegion
|
||||
|
||||
if cfg.AwsAccessKeyID != "" && cfg.AwsSecretAccessKey != "" {
|
||||
credentials := credentials.NewStaticCredentialsProvider(cfg.AwsAccessKeyID, cfg.AwsSecretAccessKey, "")
|
||||
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
|
||||
if cfg.AwsEndpoint != "" {
|
||||
return aws.Endpoint{
|
||||
PartitionID: "aws",
|
||||
URL: cfg.AwsEndpoint,
|
||||
SigningRegion: region,
|
||||
}, nil
|
||||
func execute() error {
|
||||
root := &cobra.Command{
|
||||
Use: "pipeline",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
if len(args) == 0 {
|
||||
service.Run()
|
||||
}
|
||||
|
||||
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
|
||||
})
|
||||
|
||||
awsCfg, err := awsconfig.LoadDefaultConfig(appCtx,
|
||||
awsconfig.WithRegion(region),
|
||||
awsconfig.WithEndpointResolver(customResolver),
|
||||
awsconfig.WithCredentialsProvider(credentials),
|
||||
)
|
||||
return awsCfg, err
|
||||
},
|
||||
}
|
||||
|
||||
return awsconfig.LoadDefaultConfig(appCtx, awsconfig.WithRegion(region))
|
||||
addServiceCommand(root)
|
||||
addBackfiller(root)
|
||||
|
||||
return root.Execute()
|
||||
}
|
||||
|
||||
func newTopicProducer(appCtx context.Context, config *config.Configuration, alertClient alert.AlertClient, metrics metrics.Metrics, logger *zap.Logger) (topic.PushFunc, error) {
|
||||
awsConfig, err := newAwsConfig(appCtx, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func addServiceCommand(root *cobra.Command) {
|
||||
serviceCommand := &cobra.Command{
|
||||
Use: "service",
|
||||
Short: "Run parser as service",
|
||||
Run: func(_ *cobra.Command, _ []string) {
|
||||
service.Run()
|
||||
},
|
||||
}
|
||||
|
||||
snsProducer, err := sns.NewProducer(awsConfig, config.SNSUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return topic.NewVAASNS(snsProducer, alertClient, metrics, logger).Publish, nil
|
||||
root.AddCommand(serviceCommand)
|
||||
}
|
||||
|
||||
func newHealthChecks(ctx context.Context, config *config.Configuration, db *mongo.Database) ([]healthcheck.Check, error) {
|
||||
awsConfig, err := newAwsConfig(ctx, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []healthcheck.Check{healthcheck.Mongo(db), healthcheck.SNS(awsConfig, config.SNSUrl)}, nil
|
||||
}
|
||||
func addBackfiller(root *cobra.Command) {
|
||||
var mongoUri, mongoDb, snsUrl, logLevel, awsRegion, startTime, endTime string
|
||||
var awsEndpoint, awsAccessKeyID, awsSecretAccessKey string
|
||||
var pageSize, requestsPerSecond int64
|
||||
var numWorkers int
|
||||
|
||||
func newMetrics(cfg *config.Configuration) metrics.Metrics {
|
||||
metricsEnabled := cfg.MetricsEnabled
|
||||
if !metricsEnabled {
|
||||
return metrics.NewDummyMetrics()
|
||||
backfillerCommand := &cobra.Command{
|
||||
Use: "backfiller",
|
||||
Short: "Run backfiller to send vaas to sns",
|
||||
Run: func(_ *cobra.Command, _ []string) {
|
||||
cfg := &config.Backfiller{
|
||||
LogLevel: logLevel,
|
||||
MongoURI: mongoUri,
|
||||
MongoDatabase: mongoDb,
|
||||
AwsEndpoint: awsEndpoint,
|
||||
AwsAccessKeyID: awsAccessKeyID,
|
||||
AwsSecretAccessKey: awsSecretAccessKey,
|
||||
AwsRegion: awsRegion,
|
||||
SNSUrl: snsUrl,
|
||||
RequestsPerSecond: requestsPerSecond,
|
||||
StartTime: startTime,
|
||||
EndTime: endTime,
|
||||
PageSize: pageSize,
|
||||
NumWorkers: numWorkers,
|
||||
}
|
||||
backfiller.Run(cfg)
|
||||
},
|
||||
}
|
||||
return metrics.NewPrometheusMetrics(cfg.Environment)
|
||||
}
|
||||
backfillerCommand.Flags().StringVar(&logLevel, "log-level", "INFO", "log level")
|
||||
backfillerCommand.Flags().StringVar(&mongoUri, "mongo-uri", "", "Mongo connection")
|
||||
backfillerCommand.Flags().StringVar(&mongoDb, "mongo-database", "", "Mongo database")
|
||||
backfillerCommand.Flags().StringVar(&snsUrl, "sns-url", "", "SNS Url topic to push vaas")
|
||||
backfillerCommand.Flags().StringVar(&awsRegion, "aws-region", "", "Aws region")
|
||||
backfillerCommand.Flags().StringVar(&awsEndpoint, "aws-endpoint", "", "Aws endpoint")
|
||||
backfillerCommand.Flags().StringVar(&awsAccessKeyID, "aws-access-key-id", "", "Aws access key id")
|
||||
backfillerCommand.Flags().StringVar(&awsSecretAccessKey, "aws-secret-access-key", "", "Aws secret access key")
|
||||
backfillerCommand.Flags().StringVar(&startTime, "start-time", "1970-01-01T00:00:00Z", "minimum VAA timestamp to process")
|
||||
backfillerCommand.Flags().StringVar(&endTime, "end-time", "", "maximum VAA timestamp to process (default now)")
|
||||
backfillerCommand.Flags().Int64Var(&pageSize, "page-size", 100, "number of documents retrieved at a time")
|
||||
backfillerCommand.Flags().Int64Var(&requestsPerSecond, "requests-per-second", 100, "maximum number of requests per second to publish to sns topic")
|
||||
backfillerCommand.Flags().IntVar(&numWorkers, "num-workers", 5, "number of workers to publish vaas")
|
||||
|
||||
func newAlertClient(cfg *config.Configuration) (alert.AlertClient, error) {
|
||||
if !cfg.AlertEnabled {
|
||||
return alert.NewDummyClient(), nil
|
||||
}
|
||||
backfillerCommand.MarkFlagRequired("mongo-uri")
|
||||
backfillerCommand.MarkFlagRequired("mongo-database")
|
||||
backfillerCommand.MarkFlagRequired("aws-region")
|
||||
backfillerCommand.MarkFlagRequired("sns-url")
|
||||
backfillerCommand.MarkFlagRequired("start-time")
|
||||
|
||||
alertConfig := alert.AlertConfig{
|
||||
Environment: cfg.Environment,
|
||||
ApiKey: cfg.AlertApiKey,
|
||||
Enabled: cfg.AlertEnabled,
|
||||
}
|
||||
return alert.NewAlertService(alertConfig, pipelineAlert.LoadAlerts)
|
||||
root.AddCommand(backfillerCommand)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
awsconfig "github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/config"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/healthcheck"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/http/infrastructure"
|
||||
pipelineAlert "github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/alert"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/metrics"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/sns"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/topic"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/pipeline/watcher"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type exitCode int
|
||||
|
||||
func handleExit() {
|
||||
if r := recover(); r != nil {
|
||||
if e, ok := r.(exitCode); ok {
|
||||
os.Exit(int(e))
|
||||
}
|
||||
panic(r) // not an Exit, bubble up
|
||||
}
|
||||
}
|
||||
|
||||
func Run() {
|
||||
|
||||
defer handleExit()
|
||||
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
|
||||
|
||||
config, err := config.New(rootCtx)
|
||||
if err != nil {
|
||||
log.Fatal("Error creating config", err)
|
||||
}
|
||||
|
||||
logger := logger.New("wormhole-explorer-pipeline", logger.WithLevel(config.LogLevel))
|
||||
|
||||
logger.Info("Starting wormhole-explorer-pipeline ...")
|
||||
|
||||
//setup DB connection
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
||||
// get alert client.
|
||||
alertClient, err := newAlertClient(config)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create alert client", zap.Error(err))
|
||||
}
|
||||
|
||||
// get metrics.
|
||||
metrics := newMetrics(config)
|
||||
|
||||
// get publish function.
|
||||
pushFunc, err := newTopicProducer(rootCtx, config, alertClient, metrics, logger)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create publish function", zap.Error(err))
|
||||
}
|
||||
|
||||
// get health check functions.
|
||||
healthChecks, err := newHealthChecks(rootCtx, config, db.Database)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to create health checks", zap.Error(err))
|
||||
}
|
||||
|
||||
// create a new pipeline repository.
|
||||
repository := pipeline.NewRepository(db.Database, logger)
|
||||
|
||||
// create and start a new tx hash handler.
|
||||
quit := make(chan bool)
|
||||
txHashHandler := pipeline.NewTxHashHandler(repository, pushFunc, alertClient, metrics, logger, quit)
|
||||
go txHashHandler.Run(rootCtx)
|
||||
|
||||
// create a new publisher.
|
||||
publisher := pipeline.NewPublisher(pushFunc, metrics, repository, config.P2pNetwork, txHashHandler, logger)
|
||||
watcher := watcher.NewWatcher(rootCtx, db.Database, config.MongoDatabase, publisher.Publish, alertClient, metrics, logger)
|
||||
err = watcher.Start(rootCtx)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to watch MongoDB", zap.Error(err))
|
||||
}
|
||||
|
||||
server := infrastructure.NewServer(logger, config.Port, config.PprofEnabled, healthChecks...)
|
||||
server.Start()
|
||||
|
||||
logger.Info("Started wormhole-explorer-pipeline")
|
||||
|
||||
// Waiting for signal
|
||||
sigterm := make(chan os.Signal, 1)
|
||||
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
||||
select {
|
||||
case <-rootCtx.Done():
|
||||
logger.Warn("Terminating with root context cancelled.")
|
||||
case signal := <-sigterm:
|
||||
logger.Info("Terminating with signal.", zap.String("signal", signal.String()))
|
||||
}
|
||||
|
||||
logger.Info("root context cancelled, exiting...")
|
||||
rootCtxCancel()
|
||||
|
||||
logger.Info("Closing tx hash handler ...")
|
||||
close(quit)
|
||||
|
||||
logger.Info("closing MongoDB connection...")
|
||||
db.DisconnectWithTimeout(10 * time.Second)
|
||||
|
||||
logger.Info("Closing Http server ...")
|
||||
server.Stop()
|
||||
|
||||
logger.Info("Finished wormhole-explorer-pipeline")
|
||||
|
||||
}
|
||||
|
||||
func newAwsConfig(appCtx context.Context, cfg *config.Configuration) (aws.Config, error) {
|
||||
region := cfg.AwsRegion
|
||||
|
||||
if cfg.AwsAccessKeyID != "" && cfg.AwsSecretAccessKey != "" {
|
||||
credentials := credentials.NewStaticCredentialsProvider(cfg.AwsAccessKeyID, cfg.AwsSecretAccessKey, "")
|
||||
customResolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
|
||||
if cfg.AwsEndpoint != "" {
|
||||
return aws.Endpoint{
|
||||
PartitionID: "aws",
|
||||
URL: cfg.AwsEndpoint,
|
||||
SigningRegion: region,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return aws.Endpoint{}, &aws.EndpointNotFoundError{}
|
||||
})
|
||||
|
||||
awsCfg, err := awsconfig.LoadDefaultConfig(appCtx,
|
||||
awsconfig.WithRegion(region),
|
||||
awsconfig.WithEndpointResolver(customResolver),
|
||||
awsconfig.WithCredentialsProvider(credentials),
|
||||
)
|
||||
return awsCfg, err
|
||||
}
|
||||
|
||||
return awsconfig.LoadDefaultConfig(appCtx, awsconfig.WithRegion(region))
|
||||
}
|
||||
|
||||
func newTopicProducer(appCtx context.Context, config *config.Configuration, alertClient alert.AlertClient, metrics metrics.Metrics, logger *zap.Logger) (topic.PushFunc, error) {
|
||||
awsConfig, err := newAwsConfig(appCtx, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
snsProducer, err := sns.NewProducer(awsConfig, config.SNSUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return topic.NewVAASNS(snsProducer, alertClient, metrics, logger).Publish, nil
|
||||
}
|
||||
|
||||
func newHealthChecks(ctx context.Context, config *config.Configuration, db *mongo.Database) ([]healthcheck.Check, error) {
|
||||
awsConfig, err := newAwsConfig(ctx, config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []healthcheck.Check{healthcheck.Mongo(db), healthcheck.SNS(awsConfig, config.SNSUrl)}, nil
|
||||
}
|
||||
|
||||
func newMetrics(cfg *config.Configuration) metrics.Metrics {
|
||||
metricsEnabled := cfg.MetricsEnabled
|
||||
if !metricsEnabled {
|
||||
return metrics.NewDummyMetrics()
|
||||
}
|
||||
return metrics.NewPrometheusMetrics(cfg.Environment)
|
||||
}
|
||||
|
||||
func newAlertClient(cfg *config.Configuration) (alert.AlertClient, error) {
|
||||
if !cfg.AlertEnabled {
|
||||
return alert.NewDummyClient(), nil
|
||||
}
|
||||
|
||||
alertConfig := alert.AlertConfig{
|
||||
Environment: cfg.Environment,
|
||||
ApiKey: cfg.AlertApiKey,
|
||||
Enabled: cfg.AlertEnabled,
|
||||
}
|
||||
return alert.NewAlertService(alertConfig, pipelineAlert.LoadAlerts)
|
||||
}
|
|
@ -26,6 +26,22 @@ type Configuration struct {
|
|||
MetricsEnabled bool `env:"METRICS_ENABLED,default=false"`
|
||||
}
|
||||
|
||||
type Backfiller struct {
|
||||
LogLevel string
|
||||
MongoURI string
|
||||
MongoDatabase string
|
||||
AwsEndpoint string
|
||||
AwsAccessKeyID string
|
||||
AwsSecretAccessKey string
|
||||
AwsRegion string
|
||||
SNSUrl string
|
||||
RequestsPerSecond int64
|
||||
StartTime string
|
||||
EndTime string
|
||||
PageSize int64
|
||||
NumWorkers int
|
||||
}
|
||||
|
||||
// New creates a configuration with the values from .env file and environment variables.
|
||||
func New(ctx context.Context) (*Configuration, error) {
|
||||
_ = godotenv.Load(".env", "../.env")
|
||||
|
|
|
@ -19,9 +19,11 @@ require (
|
|||
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2
|
||||
github.com/golang/mock v1.6.0
|
||||
github.com/prometheus/client_golang v1.16.0
|
||||
github.com/spf13/cobra v1.8.0
|
||||
github.com/test-go/testify v1.1.4
|
||||
github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-00010101000000-000000000000
|
||||
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229
|
||||
go.uber.org/ratelimit v0.3.1
|
||||
)
|
||||
|
||||
require (
|
||||
|
@ -48,6 +50,7 @@ require (
|
|||
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
|
||||
github.com/hashicorp/go-retryablehttp v0.5.1 // indirect
|
||||
github.com/holiman/uint256 v1.2.1 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/klauspost/compress v1.16.3 // indirect
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
|
@ -66,6 +69,7 @@ require (
|
|||
github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94 // indirect
|
||||
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
|
||||
github.com/sirupsen/logrus v1.6.0 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/stretchr/testify v1.8.1 // indirect
|
||||
github.com/tidwall/pretty v1.2.1 // indirect
|
||||
github.com/tinylib/msgp v1.1.8 // indirect
|
||||
|
|
|
@ -89,6 +89,7 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
|
|||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
@ -193,6 +194,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
|
|||
github.com/holiman/uint256 v1.2.1 h1:XRtyuda/zw2l+Bq/38n5XUoEF72aSOu/77Thd9pPp2o=
|
||||
github.com/holiman/uint256 v1.2.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw=
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
|
||||
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
|
||||
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
|
||||
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
|
||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||
|
@ -287,6 +290,7 @@ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ
|
|||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
|
||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94 h1:rmMl4fXJhKMNWl+K+r/fq4FbbKI+Ia2m9hYBLm2h4G4=
|
||||
github.com/savsgio/dictpool v0.0.0-20221023140959-7bf2e61cea94/go.mod h1:90zrgN3D/WJsDd1iXHT96alCoN2KJo6/4x1DZC3wZs8=
|
||||
github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d/go.mod h1:Gy+0tqhJvgGlqnTF8CVGP0AaGRjwBtXs/a5PA0Y3+A4=
|
||||
|
@ -298,6 +302,10 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
|
|||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
|
||||
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
|
||||
github.com/spf13/cobra v1.8.0 h1:7aJaZx1B85qltLMc546zn58BxxfZdR/W22ej9CFoEf0=
|
||||
github.com/spf13/cobra v1.8.0/go.mod h1:WXLWApfZ71AjXPya3WOlMsY9yMs7YeiHhFVlvLyhcho=
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
|
@ -357,6 +365,8 @@ go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
|
|||
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
|
||||
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
|
||||
go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0=
|
||||
go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk=
|
||||
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
|
||||
go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
|
|
|
@ -287,7 +287,7 @@ func consume(ctx context.Context, params *consumerParams) {
|
|||
|
||||
// If the channel was closed, exit immediately
|
||||
if !ok {
|
||||
params.logger.Info("Closing, channel was closed")
|
||||
params.logger.Debug("Closing, channel was closed")
|
||||
params.wg.Done()
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue