2023-05-24 12:52:22 -07:00
|
|
|
package backfiller
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2023-11-27 07:31:35 -08:00
|
|
|
"fmt"
|
2023-05-24 12:52:22 -07:00
|
|
|
"time"
|
|
|
|
|
2023-07-05 11:39:56 -07:00
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
|
2023-08-10 07:02:14 -07:00
|
|
|
vaaPayloadParser "github.com/wormhole-foundation/wormhole-explorer/common/client/parser"
|
2023-08-07 12:05:08 -07:00
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
|
2023-11-28 05:16:40 -08:00
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
|
2023-05-24 12:52:22 -07:00
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
|
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/config"
|
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/http/vaa"
|
2023-07-03 11:46:47 -07:00
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
|
2023-05-24 12:52:22 -07:00
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
|
|
|
|
"github.com/wormhole-foundation/wormhole-explorer/parser/processor"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
func Run(config *config.BackfillerConfiguration) {
|
|
|
|
|
|
|
|
rootCtx := context.Background()
|
|
|
|
|
|
|
|
logger := logger.New("wormhole-explorer-parser", logger.WithLevel(config.LogLevel))
|
|
|
|
|
2023-11-27 07:31:35 -08:00
|
|
|
logger.Info("Starting wormhole-explorer-parser as backfiller ...")
|
2023-05-24 12:52:22 -07:00
|
|
|
|
|
|
|
startTime, err := time.Parse(time.RFC3339, config.StartTime)
|
|
|
|
if err != nil {
|
|
|
|
logger.Fatal("failed to parse start time", zap.Error(err))
|
|
|
|
}
|
|
|
|
|
|
|
|
endTime := time.Now()
|
|
|
|
if config.EndTime != "" {
|
|
|
|
endTime, err = time.Parse(time.RFC3339, config.EndTime)
|
|
|
|
if err != nil {
|
|
|
|
logger.Fatal("Failed to parse end time", zap.Error(err))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if startTime.After(endTime) {
|
|
|
|
logger.Fatal("Start time should be before end time",
|
2023-07-12 08:51:17 -07:00
|
|
|
zap.String("start_time", startTime.Format(time.RFC3339)),
|
|
|
|
zap.String("end_time", endTime.Format(time.RFC3339)))
|
2023-05-24 12:52:22 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
//setup DB connection
|
2023-09-25 12:50:16 -07:00
|
|
|
db, err := dbutil.Connect(rootCtx, logger, config.MongoURI, config.MongoDatabase, false)
|
2023-05-24 12:52:22 -07:00
|
|
|
if err != nil {
|
|
|
|
logger.Fatal("Failed to connect MongoDB", zap.Error(err))
|
|
|
|
}
|
|
|
|
|
2023-08-10 07:02:14 -07:00
|
|
|
parserVAAAPIClient, err := vaaPayloadParser.NewParserVAAAPIClient(config.VaaPayloadParserTimeout, config.VaaPayloadParserURL, logger)
|
2023-05-24 12:52:22 -07:00
|
|
|
if err != nil {
|
|
|
|
logger.Fatal("Failed to create parse vaa api client")
|
|
|
|
}
|
|
|
|
|
|
|
|
parserRepository := parser.NewRepository(db.Database, logger)
|
|
|
|
vaaRepository := vaa.NewRepository(db.Database, logger)
|
|
|
|
|
2023-11-28 05:16:40 -08:00
|
|
|
// create a token provider
|
|
|
|
tokenProvider := domain.NewTokenProvider(config.P2pNetwork)
|
|
|
|
|
2023-05-24 12:52:22 -07:00
|
|
|
//create a processor
|
2023-11-28 05:16:40 -08:00
|
|
|
eventProcessor := processor.New(parserVAAAPIClient, parserRepository, alert.NewDummyClient(), metrics.NewDummyMetrics(), tokenProvider, logger)
|
2023-05-24 12:52:22 -07:00
|
|
|
|
|
|
|
logger.Info("Started wormhole-explorer-parser as backfiller")
|
|
|
|
|
|
|
|
//start backfilling
|
|
|
|
page := int64(0)
|
|
|
|
for {
|
|
|
|
logger.Info("Processing page", zap.Int64("page", page),
|
|
|
|
zap.String("start_time", startTime.Format(time.RFC3339)),
|
|
|
|
zap.String("end_time", endTime.Format(time.RFC3339)))
|
|
|
|
|
2023-07-12 08:51:17 -07:00
|
|
|
vaas, err := vaaRepository.FindPageByTimeRange(rootCtx, startTime, endTime, page, config.PageSize, config.SortAsc)
|
2023-05-24 12:52:22 -07:00
|
|
|
if err != nil {
|
|
|
|
logger.Error("Failed to get vaas", zap.Error(err))
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(vaas) == 0 {
|
|
|
|
logger.Info("Empty page", zap.Int64("page", page))
|
|
|
|
break
|
|
|
|
}
|
|
|
|
for _, v := range vaas {
|
|
|
|
logger.Debug("Processing vaa", zap.String("id", v.ID))
|
2023-11-27 07:31:35 -08:00
|
|
|
p := &processor.Params{Vaa: v.Vaa, TrackID: fmt.Sprintf("backfiller-%s", v.ID)}
|
|
|
|
_, err := eventProcessor.Process(rootCtx, p)
|
2023-05-24 12:52:22 -07:00
|
|
|
if err != nil {
|
|
|
|
logger.Error("Failed to process vaa", zap.String("id", v.ID), zap.Error(err))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
page++
|
|
|
|
}
|
2023-08-07 12:05:08 -07:00
|
|
|
|
|
|
|
logger.Info("closing MongoDB connection...")
|
|
|
|
db.DisconnectWithTimeout(10 * time.Second)
|
2023-05-24 12:52:22 -07:00
|
|
|
|
|
|
|
logger.Info("Finish wormhole-explorer-parser as backfiller")
|
|
|
|
}
|