wormhole-explorer/parser/cmd/backfiller/run.go

103 lines
3.3 KiB
Go

package backfiller
import (
"context"
"fmt"
"time"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
vaaPayloadParser "github.com/wormhole-foundation/wormhole-explorer/common/client/parser"
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"github.com/wormhole-foundation/wormhole-explorer/parser/config"
"github.com/wormhole-foundation/wormhole-explorer/parser/http/vaa"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
"github.com/wormhole-foundation/wormhole-explorer/parser/processor"
"go.uber.org/zap"
)
func Run(config *config.BackfillerConfiguration) {
rootCtx := context.Background()
logger := logger.New("wormhole-explorer-parser", logger.WithLevel(config.LogLevel))
logger.Info("Starting wormhole-explorer-parser as backfiller ...")
startTime, err := time.Parse(time.RFC3339, config.StartTime)
if err != nil {
logger.Fatal("failed to parse start time", zap.Error(err))
}
endTime := time.Now()
if config.EndTime != "" {
endTime, err = time.Parse(time.RFC3339, config.EndTime)
if err != nil {
logger.Fatal("Failed to parse end time", zap.Error(err))
}
}
if startTime.After(endTime) {
logger.Fatal("Start time should be before end time",
zap.String("start_time", startTime.Format(time.RFC3339)),
zap.String("end_time", endTime.Format(time.RFC3339)))
}
//setup DB connection
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
if err != nil {
logger.Fatal("Failed to connect MongoDB", zap.Error(err))
}
parserVAAAPIClient, err := vaaPayloadParser.NewParserVAAAPIClient(config.VaaPayloadParserTimeout, config.VaaPayloadParserURL, logger)
if err != nil {
logger.Fatal("Failed to create parse vaa api client")
}
parserRepository := parser.NewRepository(db.Database, logger)
vaaRepository := vaa.NewRepository(db.Database, logger)
// create a token provider
tokenProvider := domain.NewTokenProvider(config.P2pNetwork)
//create a processor
eventProcessor := processor.New(parserVAAAPIClient, parserRepository, alert.NewDummyClient(), metrics.NewDummyMetrics(), tokenProvider, logger)
logger.Info("Started wormhole-explorer-parser as backfiller")
//start backfilling
page := int64(0)
for {
logger.Info("Processing page", zap.Int64("page", page),
zap.String("start_time", startTime.Format(time.RFC3339)),
zap.String("end_time", endTime.Format(time.RFC3339)))
vaas, err := vaaRepository.FindPageByTimeRange(rootCtx, startTime, endTime, page, config.PageSize, config.SortAsc)
if err != nil {
logger.Error("Failed to get vaas", zap.Error(err))
break
}
if len(vaas) == 0 {
logger.Info("Empty page", zap.Int64("page", page))
break
}
for _, v := range vaas {
logger.Debug("Processing vaa", zap.String("id", v.ID))
p := &processor.Params{Vaa: v.Vaa, TrackID: fmt.Sprintf("backfiller-%s", v.ID)}
_, err := eventProcessor.Process(rootCtx, p)
if err != nil {
logger.Error("Failed to process vaa", zap.String("id", v.ID), zap.Error(err))
}
}
page++
}
logger.Info("closing MongoDB connection...")
db.DisconnectWithTimeout(10 * time.Second)
logger.Info("Finish wormhole-explorer-parser as backfiller")
}