Fix [api] get transactions by address and add mongo query monitor (#707)
Fix api endpoint get transactions by address, add mongo query monitor
This commit is contained in:
parent
c0af7eac79
commit
217c6f28be
|
@ -25,7 +25,7 @@ func RunVaaVolumeFromMongo(mongoUri, mongoDb, outputFile, pricesFile string) {
|
|||
logger.Info("starting wormhole-explorer-analytics ...")
|
||||
|
||||
//setup DB connection
|
||||
db, err := dbutil.Connect(rootCtx, logger, mongoUri, mongoDb)
|
||||
db, err := dbutil.Connect(rootCtx, logger, mongoUri, mongoDb, false)
|
||||
if err != nil {
|
||||
logger.Fatal("Failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ func Run() {
|
|||
|
||||
// setup DB connection
|
||||
logger.Info("connecting to MongoDB...")
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongodbURI, config.MongodbDatabase)
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongodbURI, config.MongodbDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -983,6 +983,11 @@ func (r *Repository) ListTransactionsByAddress(
|
|||
}},
|
||||
})
|
||||
|
||||
// Sorting criteria
|
||||
pipeline = append(pipeline, bson.D{
|
||||
{"$sort", bson.D{bson.E{"timestamp", pagination.GetSortInt()}}},
|
||||
})
|
||||
|
||||
// Unset unused fields
|
||||
pipeline = append(pipeline, bson.D{
|
||||
{"$unset", []interface{}{"transferPrices", "vaas", "vaaTxIdHash", "parsedVaa"}},
|
||||
|
|
|
@ -106,7 +106,7 @@ func main() {
|
|||
|
||||
// Setup DB
|
||||
rootLogger.Info("connecting to MongoDB")
|
||||
db, err := dbutil.Connect(appCtx, rootLogger, cfg.DB.URL, cfg.DB.Name)
|
||||
db, err := dbutil.Connect(appCtx, rootLogger, cfg.DB.URL, cfg.DB.Name, false)
|
||||
if err != nil {
|
||||
rootLogger.Fatal("failed to connect to MongoDB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.mongodb.org/mongo-driver/event"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"go.mongodb.org/mongo-driver/mongo/readpref"
|
||||
|
@ -24,6 +25,7 @@ func Connect(
|
|||
logger *zap.Logger,
|
||||
uri string,
|
||||
databaseName string,
|
||||
enableQueryLog bool,
|
||||
) (*Session, error) {
|
||||
|
||||
// Create a timed sub-context for the connection attempt
|
||||
|
@ -31,8 +33,18 @@ func Connect(
|
|||
subContext, cancelFunc := context.WithTimeout(ctx, connectTimeout)
|
||||
defer cancelFunc()
|
||||
|
||||
// build mongo options
|
||||
options := options.Client().ApplyURI(uri)
|
||||
if enableQueryLog {
|
||||
cmdMonitor := &event.CommandMonitor{
|
||||
Started: func(_ context.Context, evt *event.CommandStartedEvent) {
|
||||
logger.Info(evt.Command.String())
|
||||
}}
|
||||
options.SetMonitor(cmdMonitor)
|
||||
}
|
||||
|
||||
// Connect to MongoDB
|
||||
client, err := mongo.Connect(subContext, options.Client().ApplyURI(uri))
|
||||
client, err := mongo.Connect(subContext, options)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to MongoDB: %w", err)
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ func Run(config *config.BackfillerConfiguration) {
|
|||
logger.Info("Starting wormhole-explorer-contract-watcher as backfiller ...")
|
||||
|
||||
//setup DB connection
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -86,7 +86,7 @@ func Run() {
|
|||
logger.Info("Starting wormhole-explorer-contract-watcher ...")
|
||||
|
||||
//setup DB connection
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ func RunTxHashEncoding(cfg TxHashEncondingConfig) {
|
|||
ctx := context.Background()
|
||||
logger := logger.New("wormhole-fly", logger.WithLevel(cfg.LogLevel))
|
||||
|
||||
db, err := dbutil.Connect(ctx, logger, cfg.MongoURI, cfg.MongoDatabase)
|
||||
db, err := dbutil.Connect(ctx, logger, cfg.MongoURI, cfg.MongoDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("could not connect to DB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ func NewWorkpool(ctx context.Context, cfg WorkerConfiguration, workerFunc Generi
|
|||
WorkerFunc: workerFunc,
|
||||
}
|
||||
|
||||
db, err := dbutil.Connect(ctx, wp.Log, cfg.MongoURI, cfg.MongoDatabase)
|
||||
db, err := dbutil.Connect(ctx, wp.Log, cfg.MongoURI, cfg.MongoDatabase, false)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
|
@ -270,7 +270,7 @@ func main() {
|
|||
logger.Fatal("You must set your 'MONGODB_DATABASE' environmental variable. See\n\t https://www.mongodb.com/docs/drivers/go/current/usage-examples/#environment-variable")
|
||||
}
|
||||
|
||||
db, err := dbutil.Connect(rootCtx, logger, uri, databaseName)
|
||||
db, err := dbutil.Connect(rootCtx, logger, uri, databaseName, false)
|
||||
if err != nil {
|
||||
logger.Fatal("could not connect to DB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ func Run(config *config.BackfillerConfiguration) {
|
|||
}
|
||||
|
||||
//setup DB connection
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("Failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -55,7 +55,7 @@ func Run() {
|
|||
logger.Info("Starting wormhole-explorer-parser ...")
|
||||
|
||||
// setup DB connection
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ func main() {
|
|||
logger.Info("Starting wormhole-explorer-pipeline ...")
|
||||
|
||||
//setup DB connection
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -69,7 +69,7 @@ func main() {
|
|||
|
||||
publisher := grpc.NewPublisher(svs, avs, logger)
|
||||
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase)
|
||||
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
|
||||
if err != nil {
|
||||
logger.Fatal("failed to connect MongoDB", zap.Error(err))
|
||||
}
|
||||
|
|
|
@ -33,4 +33,24 @@ In the `cmd/backfiller` directory, there is a backfiller program that can be use
|
|||
|
||||
The backfiller processes VAAs between two input dates.
|
||||
|
||||
VAAs will be iterated in a descending timestamp order, that is, newer VAAs are processed first. The rationale behind this is that old VAAs are not as relevant as recent ones.
|
||||
VAAs will be iterated in a descending timestamp order, that is, newer VAAs are processed first. The rationale behind this is that old VAAs are not as relevant as recent ones.
|
||||
|
||||
## Localstack configuration
|
||||
|
||||
### Config sns topic
|
||||
|
||||
aws --profile localstack --endpoint-url=http://localhost:4566 sns create-topic --name vaas-pipeline.fifo --attributes FifoTopic=true,ContentBasedDeduplication=false
|
||||
|
||||
### Config SQS FIFO with dead letter queue localstack
|
||||
|
||||
aws --profile localstack --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=wormhole-vaa-tx-tracker-dlq-queue.fifo --attributes "FifoQueue=true"
|
||||
|
||||
aws --profile localstack --endpoint-url=http://localhost:4566 sqs create-queue --queue-name=wormhole-vaa-tx-tracker-queue.fifo --attributes FifoQueue=true,MessageRetentionPeriod=3600,ReceiveMessageWaitTimeSeconds=5,VisibilityTimeout=20,RedrivePolicy="\"{\\\"deadLetterTargetArn\\\":\\\"arn:aws:sqs:us-east-1:000000000000:wormhole-vaa-tx-tracker-dlq-queue.fifo\\\",\\\"maxReceiveCount\\\":\\\"2\\\"}\""
|
||||
|
||||
### Subscribe SQS FIFO to vaas-pipeline.fifo topic
|
||||
|
||||
aws --profile localstack --endpoint-url=http://localhost:4566 sns subscribe --topic-arn arn:aws:sns:us-east-1:000000000000:vaas-pipeline.fifo --protocol sqs --notification-endpoint http://localhost:4566/000000000000/wormhole-vaa-tx-tracker-queue.fifo
|
||||
|
||||
### Check message in the dead letter queue localstack
|
||||
|
||||
aws --profile localstack --endpoint-url=http://localhost:4566 sqs receive-message --queue-url=http://localhost:4566/000000000000/wormhole-vaa-tx-tracker-dlq-queue.fifo
|
|
@ -64,7 +64,7 @@ func main() {
|
|||
}()
|
||||
|
||||
// Initialize the database client
|
||||
db, err := dbutil.Connect(rootCtx, mainLogger, cfg.MongodbUri, cfg.MongodbDatabase)
|
||||
db, err := dbutil.Connect(rootCtx, mainLogger, cfg.MongodbUri, cfg.MongodbDatabase, false)
|
||||
if err != nil {
|
||||
log.Fatal("Failed to initialize MongoDB client: ", err)
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ func main() {
|
|||
chains.Initialize(&cfg.RpcProviderSettings)
|
||||
|
||||
// initialize the database client
|
||||
db, err := dbutil.Connect(rootCtx, logger, cfg.MongodbUri, cfg.MongodbDatabase)
|
||||
db, err := dbutil.Connect(rootCtx, logger, cfg.MongodbUri, cfg.MongodbDatabase, false)
|
||||
if err != nil {
|
||||
log.Fatal("Failed to initialize MongoDB client: ", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue