Remove duplicated code: MongoDB connection handling (#590)

### Description

This pull request removes duplicated code related to MongoDB connection/disconnection attempts. This code was copied across all 8 microservices.

The functionality is now unified under the `common/dbutil` package.
This commit is contained in:
agodnic 2023-08-07 16:05:08 -03:00 committed by GitHub
parent 0f1797e44a
commit 3c7bab3f8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 181 additions and 263 deletions

View File

@ -7,7 +7,7 @@ import (
"time"
"github.com/wormhole-foundation/wormhole-explorer/analytics/prices"
"github.com/wormhole-foundation/wormhole-explorer/common/db"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
"go.uber.org/zap"
@ -25,7 +25,7 @@ func RunVaaVolumeFromMongo(mongoUri, mongoDb, outputFile, pricesFile string) {
logger.Info("starting wormhole-explorer-analytics ...")
//setup DB connection
db, err := db.New(rootCtx, logger, mongoUri, mongoDb)
db, err := dbutil.Connect(rootCtx, logger, mongoUri, mongoDb)
if err != nil {
logger.Fatal("Failed to connect MongoDB", zap.Error(err))
}
@ -98,8 +98,9 @@ func RunVaaVolumeFromMongo(mongoUri, mongoDb, outputFile, pricesFile string) {
}
page++
}
logger.Info("Closing database connections ...")
db.Close()
logger.Info("closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
for k := range converter.MissingTokensCounter {
fmissingTokens.WriteString(fmt.Sprintf("%s,%s,%d\n", k.String(), converter.MissingTokens[k], converter.MissingTokensCounter[k]))

View File

@ -7,6 +7,7 @@ import (
"os"
"os/signal"
"syscall"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
@ -23,7 +24,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/analytics/queue"
wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/notional"
sqs_client "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs"
"github.com/wormhole-foundation/wormhole-explorer/common/db"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
health "github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"go.mongodb.org/mongo-driver/mongo"
@ -57,7 +58,7 @@ func Run() {
// setup DB connection
logger.Info("connecting to MongoDB...")
db, err := db.New(rootCtx, logger, config.MongodbURI, config.MongodbDatabase)
db, err := dbutil.Connect(rootCtx, logger, config.MongodbURI, config.MongodbDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
@ -119,12 +120,16 @@ func Run() {
logger.Info("cancelling root context...")
rootCtxCancel()
logger.Info("closing metrics client...")
metric.Close()
logger.Info("closing HTTP server...")
server.Stop()
logger.Info("closing MongoDB connection...")
db.Close()
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("terminated successfully")
}

View File

@ -1,19 +0,0 @@
// Package db handle mongodb connections.
package db
import (
"context"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Connect create a new mongo db client for the options defined in the input param url.
func Connect(ctx context.Context, url string) (*mongo.Client, error) {
cli, err := mongo.NewClient(options.Client().ApplyURI(url))
if err != nil {
return cli, err
}
err = cli.Connect(ctx)
return cli, err
}

View File

@ -32,7 +32,6 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/config"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/tvl"
"github.com/wormhole-foundation/wormhole-explorer/api/middleware"
"github.com/wormhole-foundation/wormhole-explorer/api/response"
@ -40,6 +39,7 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan"
rpcApi "github.com/wormhole-foundation/wormhole-explorer/api/rpc"
wormscanCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
xlogger "github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/common/utils"
"go.uber.org/zap"
@ -103,11 +103,10 @@ func main() {
// Setup DB
rootLogger.Info("connecting to MongoDB")
cli, err := db.Connect(appCtx, cfg.DB.URL)
db, err := dbutil.Connect(appCtx, rootLogger, cfg.DB.URL, cfg.DB.Name)
if err != nil {
rootLogger.Fatal("failed to connect to MongoDB", zap.Error(err))
}
db := cli.Database(cfg.DB.Name)
// Get cache get function
rootLogger.Info("initializing cache")
@ -126,12 +125,12 @@ func main() {
// Set up repositories
rootLogger.Info("initializing repositories")
addressRepo := address.NewRepository(db, rootLogger)
vaaRepo := vaa.NewRepository(db, rootLogger)
obsRepo := observations.NewRepository(db, rootLogger)
governorRepo := governor.NewRepository(db, rootLogger)
infrastructureRepo := infrastructure.NewRepository(db, rootLogger)
heartbeatsRepo := heartbeats.NewRepository(db, rootLogger)
addressRepo := address.NewRepository(db.Database, rootLogger)
vaaRepo := vaa.NewRepository(db.Database, rootLogger)
obsRepo := observations.NewRepository(db.Database, rootLogger)
governorRepo := governor.NewRepository(db.Database, rootLogger)
infrastructureRepo := infrastructure.NewRepository(db.Database, rootLogger)
heartbeatsRepo := heartbeats.NewRepository(db.Database, rootLogger)
transactionsRepo := transactions.NewRepository(
tvl,
influxCli,
@ -139,7 +138,7 @@ func main() {
cfg.Influx.Bucket24Hours,
cfg.Influx.Bucket30Days,
cfg.Influx.BucketInfinite,
db,
db.Database,
rootLogger,
)
@ -223,10 +222,16 @@ func main() {
}
rootLogger.Info("cleanup tasks...")
rootLogger.Info("shutting down server...")
app.Shutdown()
rootLogger.Info("closing cache...")
cache.Close()
rootLogger.Info("closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
rootLogger.Info("terminated API service successfully")
}

View File

@ -1,31 +0,0 @@
package db
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// Database definition.
type Database struct {
Database *mongo.Database
client *mongo.Client
}
// New connects to DB and returns a client that will disconnect when the passed in context is cancelled.
func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) {
cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
return &Database{client: cli, Database: cli.Database(databaseName)}, err
}
// Close closes the database connections.
func (d *Database) Close() error {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
return d.client.Disconnect(ctx)
}

76
common/dbutil/session.go Normal file
View File

@ -0,0 +1,76 @@
package dbutil
import (
"context"
"fmt"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.uber.org/zap"
)
// Session is a plain-old-data struct that represents a handle to a MongoDB database.
type Session struct {
Client *mongo.Client
Database *mongo.Database
logger *zap.Logger
}
// Connect to a MongoDB database.
func Connect(
ctx context.Context,
logger *zap.Logger,
uri string,
databaseName string,
) (*Session, error) {
// Create a timed sub-context for the connection attempt
const connectTimeout = 10 * time.Second
subContext, cancelFunc := context.WithTimeout(ctx, connectTimeout)
defer cancelFunc()
// Connect to MongoDB
client, err := mongo.Connect(subContext, options.Client().ApplyURI(uri))
if err != nil {
return nil, fmt.Errorf("failed to connect to MongoDB: %w", err)
}
// Ping the database to make sure we're actually connected
//
// This can detect a misconfuiguration error when a service is being initialized,
// rather than waiting for the first query to fail in the service's processing loop.
err = client.Ping(subContext, readpref.Primary())
if err != nil {
return nil, fmt.Errorf("failed to ping MongoDB database: %w", err)
}
// Populate the result struct and return
db := &Session{
Client: client,
Database: client.Database(databaseName),
}
return db, nil
}
// Disconnect from a MongoDB database.
func (s *Session) DisconnectWithTimeout(timeout time.Duration) error {
// Create a timed sub-context for the disconnection attempt
subContext, cancelFunc := context.WithTimeout(context.Background(), timeout)
defer cancelFunc()
// Attempt to disconnect
err := s.Client.Disconnect(subContext)
if err != nil {
s.logger.Warn(
"failed to disconnect from MongoDB",
zap.Duration("timeout", timeout),
zap.Error(err),
)
return fmt.Errorf("failed to disconnect from MongoDB: %w", err)
}
return nil
}

View File

@ -2,13 +2,14 @@ package backfiller
import (
"context"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/builder"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/config"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/watcher"
@ -24,7 +25,7 @@ func Run(config *config.BackfillerConfiguration) {
logger.Info("Starting wormhole-explorer-contract-watcher as backfiller ...")
//setup DB connection
db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
@ -58,8 +59,8 @@ func Run(config *config.BackfillerConfiguration) {
watcher.Backfill(rootCtx, config.FromBlock, config.ToBlock, config.PageSize, config.PersistBlock)
logger.Info("Closing database connections ...")
db.Close()
logger.Info("closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("Finish wormhole-explorer-contract-watcher as backfiller")

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
@ -17,7 +18,6 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/http/infrastructure"
cwAlert "github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/ankr"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/processor"
"github.com/wormhole-foundation/wormhole-explorer/contract-watcher/storage"
@ -78,7 +78,7 @@ func Run() {
logger.Info("Starting wormhole-explorer-contract-watcher ...")
//setup DB connection
db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
@ -126,10 +126,13 @@ func Run() {
logger.Info("Closing processor ...")
processor.Close()
logger.Info("Closing database connections ...")
db.Close()
logger.Info("closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("Closing Http server ...")
server.Stop()
logger.Info("Finished wormhole-explorer-contract-watcher")
}

View File

@ -1,31 +0,0 @@
package db
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// Database definition.
type Database struct {
Database *mongo.Database
client *mongo.Client
}
// New connects to DB and returns a client that will disconnect when the passed in context is cancelled.
func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) {
cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
return &Database{client: cli, Database: cli.Database(databaseName)}, err
}
// Close closes the database connections.
func (d *Database) Close() error {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
return d.client.Disconnect(ctx)
}

View File

@ -3,8 +3,10 @@ package main
import (
"context"
"encoding/hex"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
@ -25,12 +27,13 @@ func RunTxHashEncoding(cfg TxHashEncondingConfig) {
ctx := context.Background()
logger := logger.New("wormhole-fly", logger.WithLevel(cfg.LogLevel))
db, err := storage.GetDB(ctx, logger, cfg.MongoURI, cfg.MongoDatabase)
db, err := dbutil.Connect(ctx, logger, cfg.MongoURI, cfg.MongoDatabase)
if err != nil {
logger.Fatal("could not connect to DB", zap.Error(err))
}
defer db.DisconnectWithTimeout(10 * time.Second)
repository := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), db, logger)
repository := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), db.Database, logger)
workerTxHashEncoding(ctx, logger, repository, vaa.ChainID(cfg.ChainID), cfg.PageSize)
}

View File

@ -4,12 +4,13 @@ import (
"context"
"fmt"
"sync"
"time"
"github.com/schollz/progressbar/v3"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
"go.mongodb.org/mongo-driver/mongo"
"go.uber.org/zap"
)
@ -19,7 +20,7 @@ type Workpool struct {
Workers int
Queue chan string
WG sync.WaitGroup
DB *mongo.Database
DB *dbutil.Session
Log *zap.Logger
Bar *progressbar.ProgressBar
WorkerFunc GenericWorker
@ -42,7 +43,7 @@ func NewWorkpool(ctx context.Context, cfg WorkerConfiguration, workerFunc Generi
WorkerFunc: workerFunc,
}
db, err := storage.GetDB(ctx, wp.Log, cfg.MongoURI, cfg.MongoDatabase)
db, err := dbutil.Connect(ctx, wp.Log, cfg.MongoURI, cfg.MongoDatabase)
if err != nil {
panic(err)
}
@ -59,9 +60,11 @@ func NewWorkpool(ctx context.Context, cfg WorkerConfiguration, workerFunc Generi
}
func (w *Workpool) Process(ctx context.Context) error {
repo := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), w.DB, w.Log)
repo := storage.NewRepository(alert.NewDummyClient(), metrics.NewDummyMetrics(), w.DB.Database, w.Log)
var err error
defer w.DB.DisconnectWithTimeout(10 * time.Second)
for {
select {
case line := <-w.Queue:

View File

@ -6,6 +6,7 @@ import (
"log"
"strconv"
"strings"
"time"
"fmt"
"os"
@ -15,6 +16,7 @@ import (
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/go-redis/redis/v8"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
@ -270,18 +272,18 @@ func main() {
logger.Fatal("You must set your 'MONGODB_DATABASE' environmental variable. See\n\t https://www.mongodb.com/docs/drivers/go/current/usage-examples/#environment-variable")
}
db, err := storage.GetDB(rootCtx, logger, uri, databaseName)
db, err := dbutil.Connect(rootCtx, logger, uri, databaseName)
if err != nil {
logger.Fatal("could not connect to DB", zap.Error(err))
}
// Run the database migration.
err = migration.Run(db)
err = migration.Run(db.Database)
if err != nil {
logger.Fatal("error running migration", zap.Error(err))
}
repository := storage.NewRepository(alertClient, metrics, db, logger)
repository := storage.NewRepository(alertClient, metrics, db.Database, logger)
// Outbound gossip message queue
sendC := make(chan []byte)
@ -503,9 +505,13 @@ func main() {
supervisor.WithPropagatePanic)
<-rootCtx.Done()
// TODO: wait for things to shut down gracefully
vaaGossipConsumerSplitter.Close()
server.Stop()
logger.Info("Closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
}
// getGovernorConfigNodeName get node name from governor config.

View File

@ -1,25 +0,0 @@
package storage
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// GetDB connects to DB and returns a client that will disconnect when the passed in context is cancelled
func GetDB(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*mongo.Database, error) {
cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
go func() {
<-appCtx.Done()
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
err := cli.Disconnect(ctx)
log.Error("error disconnecting from db", zap.Error(err))
}()
return cli.Database(databaseName), err
}

View File

@ -5,10 +5,10 @@ import (
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/parser/config"
"github.com/wormhole-foundation/wormhole-explorer/parser/http/vaa"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
"github.com/wormhole-foundation/wormhole-explorer/parser/processor"
@ -43,7 +43,7 @@ func Run(config *config.BackfillerConfiguration) {
}
//setup DB connection
db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("Failed to connect MongoDB", zap.Error(err))
}
@ -87,8 +87,9 @@ func Run(config *config.BackfillerConfiguration) {
}
page++
}
logger.Info("Closing database connections ...")
db.Close()
logger.Info("closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("Finish wormhole-explorer-parser as backfiller")
}

View File

@ -6,18 +6,19 @@ import (
"os"
"os/signal"
"syscall"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/parser/config"
"github.com/wormhole-foundation/wormhole-explorer/parser/consumer"
"github.com/wormhole-foundation/wormhole-explorer/parser/http/infrastructure"
"github.com/wormhole-foundation/wormhole-explorer/parser/http/vaa"
parserAlert "github.com/wormhole-foundation/wormhole-explorer/parser/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
@ -52,7 +53,7 @@ func Run() {
logger.Info("Starting wormhole-explorer-parser ...")
// setup DB connection
db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
@ -104,10 +105,12 @@ func Run() {
logger.Info("root context cancelled, exiting...")
rootCtxCancel()
logger.Info("Closing database connections ...")
db.Close()
logger.Info("closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("Closing Http server ...")
server.Stop()
logger.Info("Finished wormhole-explorer-parser")
}

View File

@ -1,31 +0,0 @@
package db
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// Database definition.
type Database struct {
Database *mongo.Database
client *mongo.Client
}
// New connects to DB and returns a client that will disconnect when the passed in context is cancelled.
func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) {
cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
return &Database{client: cli, Database: cli.Database(databaseName)}, err
}
// Close closes the database connections.
func (d *Database) Close() error {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
return d.client.Disconnect(ctx)
}

View File

@ -6,17 +6,18 @@ import (
"os"
"os/signal"
"syscall"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/config"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/healthcheck"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/http/infrastructure"
pipelineAlert "github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/alert"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/internal/sns"
"github.com/wormhole-foundation/wormhole-explorer/pipeline/pipeline"
@ -52,7 +53,7 @@ func main() {
logger.Info("Starting wormhole-explorer-pipeline ...")
//setup DB connection
db, err := db.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
@ -115,10 +116,12 @@ func main() {
logger.Info("Closing tx hash handler ...")
close(quit)
logger.Info("Closing database connections ...")
db.Close()
logger.Info("closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("Closing Http server ...")
server.Stop()
logger.Info("Finished wormhole-explorer-pipeline")
}

View File

@ -1,31 +0,0 @@
package db
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
// Database definition.
type Database struct {
Database *mongo.Database
client *mongo.Client
}
// New connects to DB and returns a client that will disconnect when the passed in context is cancelled.
func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) {
cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
return &Database{client: cli, Database: cli.Database(databaseName)}, err
}
// Close closes the database connections.
func (d *Database) Close() error {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
return d.client.Disconnect(ctx)
}

View File

@ -6,8 +6,10 @@ import (
"os"
"os/signal"
"syscall"
"time"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/spy/config"
"github.com/wormhole-foundation/wormhole-explorer/spy/grpc"
@ -67,7 +69,7 @@ func main() {
publisher := grpc.NewPublisher(svs, avs, logger)
db, err := storage.New(rootCtx, logger, config.MongoURI, config.MongoDatabase)
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
if err != nil {
logger.Fatal("failed to connect MongoDB", zap.Error(err))
}
@ -98,8 +100,10 @@ func main() {
logger.Info("Closing GRPC server ...")
grpcServer.Stop()
logger.Info("Closing database connections ...")
db.Close()
logger.Info("Closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("Closing Http server ...")
server.Stop()
logger.Info("Finished wormhole-explorer-spy")

View File

@ -1,30 +0,0 @@
package storage
import (
"context"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
type Database struct {
Database *mongo.Database
client *mongo.Client
}
// New connects to DB and returns a client that will disconnect when the passed in context is cancelled
func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) {
cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri))
if err != nil {
return nil, err
}
return &Database{client: cli, Database: cli.Database(databaseName)}, err
}
// Close closes the database connections.
func (d *Database) Close() error {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
return d.client.Disconnect(ctx)
}

View File

@ -11,12 +11,11 @@ import (
"syscall"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.uber.org/zap"
)
@ -65,12 +64,11 @@ func main() {
}()
// Initialize the database client
cli, err := mongo.Connect(rootCtx, options.Client().ApplyURI(cfg.MongodbUri))
db, err := dbutil.Connect(rootCtx, mainLogger, cfg.MongodbUri, cfg.MongodbDatabase)
if err != nil {
log.Fatal("Failed to initialize MongoDB client: ", err)
}
defer cli.Disconnect(rootCtx)
repository := consumer.NewRepository(rootLogger, cli.Database(cfg.MongodbDatabase))
repository := consumer.NewRepository(rootLogger, db.Database)
strategyCallbacks, err := parseStrategyCallbacks(mainLogger, cfg, repository)
if err != nil {
@ -116,7 +114,12 @@ func main() {
}
// Wait for all workers to finish before closing
mainLogger.Info("Waiting for all workers to finish...")
wg.Wait()
mainLogger.Info("Closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
mainLogger.Info("Closing main goroutine")
}

View File

@ -12,9 +12,9 @@ import (
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/wormhole-foundation/wormhole-explorer/common/client/sqs"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/health"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
@ -47,19 +47,13 @@ func main() {
chains.Initialize(&cfg.RpcProviderSettings)
// initialize the database client
cli, err := mongo.Connect(rootCtx, options.Client().ApplyURI(cfg.MongodbUri))
db, err := dbutil.Connect(rootCtx, logger, cfg.MongodbUri, cfg.MongodbDatabase)
if err != nil {
log.Fatal("Failed to initialize MongoDB client: ", err)
}
defer func() {
subCtx, cancelSubCtx := context.WithTimeout(context.Background(), 10*time.Second)
_ = cli.Disconnect(subCtx)
cancelSubCtx()
}()
db := cli.Database(cfg.MongodbDatabase)
// start serving /health and /ready endpoints
healthChecks, err := makeHealthChecks(rootCtx, cfg, db)
healthChecks, err := makeHealthChecks(rootCtx, cfg, db.Database)
if err != nil {
logger.Fatal("Failed to create health checks", zap.Error(err))
}
@ -68,7 +62,7 @@ func main() {
// create and start a consumer.
vaaConsumeFunc := newVAAConsumeFunc(rootCtx, cfg, metrics, logger)
repository := consumer.NewRepository(logger, db)
repository := consumer.NewRepository(logger, db.Database)
consumer := consumer.New(vaaConsumeFunc, &cfg.RpcProviderSettings, rootCtx, logger, repository, metrics)
consumer.Start(rootCtx)
@ -87,8 +81,13 @@ func main() {
// graceful shutdown
logger.Info("Cancelling root context...")
rootCtxCancel()
logger.Info("Closing Http server...")
server.Stop()
logger.Info("Closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("Terminated wormhole-explorer-tx-tracker")
}