Add backfiller for parser with more parameters (#1197)
This commit is contained in:
parent
faa8e38b2e
commit
f98d8f28ba
|
@ -10,8 +10,8 @@ import (
|
|||
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/common/repository"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/config"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/http/vaa"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/metrics"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/processor"
|
||||
|
@ -56,8 +56,22 @@ func Run(config *config.BackfillerConfiguration) {
|
|||
logger.Fatal("Failed to create parse vaa api client")
|
||||
}
|
||||
|
||||
query := repository.VaaQuery{
|
||||
StartTime: &startTime,
|
||||
EndTime: &endTime,
|
||||
EmitterChainID: config.EmitterChainID,
|
||||
EmitterAddress: config.EmitterAddress,
|
||||
Sequence: config.Sequence,
|
||||
}
|
||||
|
||||
pagination := repository.Pagination{
|
||||
Page: 0,
|
||||
PageSize: config.PageSize,
|
||||
SortAsc: config.SortAsc,
|
||||
}
|
||||
|
||||
parserRepository := parser.NewRepository(db.Database, logger)
|
||||
vaaRepository := vaa.NewRepository(db.Database, logger)
|
||||
vaaRepository := repository.NewVaaRepository(db.Database, logger)
|
||||
|
||||
// create a token provider
|
||||
tokenProvider := domain.NewTokenProvider(config.P2pNetwork)
|
||||
|
@ -67,21 +81,17 @@ func Run(config *config.BackfillerConfiguration) {
|
|||
|
||||
logger.Info("Started wormhole-explorer-parser as backfiller")
|
||||
|
||||
//start backfilling
|
||||
page := int64(0)
|
||||
for {
|
||||
logger.Info("Processing page", zap.Int64("page", page),
|
||||
zap.String("start_time", startTime.Format(time.RFC3339)),
|
||||
zap.String("end_time", endTime.Format(time.RFC3339)))
|
||||
logger.Info("Processing page", zap.Any("pagination", pagination), zap.Any("query", query))
|
||||
|
||||
vaas, err := vaaRepository.FindPageByTimeRange(rootCtx, startTime, endTime, page, config.PageSize, config.SortAsc)
|
||||
vaas, err := vaaRepository.FindPage(rootCtx, query, pagination)
|
||||
if err != nil {
|
||||
logger.Error("Failed to get vaas", zap.Error(err))
|
||||
break
|
||||
}
|
||||
|
||||
if len(vaas) == 0 {
|
||||
logger.Info("Empty page", zap.Int64("page", page))
|
||||
logger.Info("Empty page", zap.Int64("page", pagination.Page))
|
||||
break
|
||||
}
|
||||
for _, v := range vaas {
|
||||
|
@ -92,7 +102,7 @@ func Run(config *config.BackfillerConfiguration) {
|
|||
logger.Error("Failed to process vaa", zap.String("id", v.ID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
page++
|
||||
pagination.Page++
|
||||
}
|
||||
|
||||
logger.Info("closing MongoDB connection...")
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"github.com/wormhole-foundation/wormhole-explorer/parser/cmd/backfiller"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/cmd/service"
|
||||
"github.com/wormhole-foundation/wormhole-explorer/parser/config"
|
||||
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
@ -41,8 +42,9 @@ func addServiceCommand(root *cobra.Command) {
|
|||
}
|
||||
|
||||
func addBackfiller(root *cobra.Command) {
|
||||
var mongoUri, mongoDb, vaaPayloadParserURL, logLevel, startTime, endTime, sort string
|
||||
var mongoUri, mongoDb, p2pNetwork, vaaPayloadParserURL, logLevel, startTime, endTime, sort, emitterAddress, sequence string
|
||||
var vaaPayloadParserTimeout, pageSize int64
|
||||
var emitterChainID uint16
|
||||
|
||||
sortAsc := false
|
||||
if strings.ToLower(sort) == "asc" {
|
||||
|
@ -56,6 +58,7 @@ func addBackfiller(root *cobra.Command) {
|
|||
LogLevel: logLevel,
|
||||
MongoURI: mongoUri,
|
||||
MongoDatabase: mongoDb,
|
||||
P2pNetwork: p2pNetwork,
|
||||
VaaPayloadParserURL: vaaPayloadParserURL,
|
||||
VaaPayloadParserTimeout: vaaPayloadParserTimeout,
|
||||
StartTime: startTime,
|
||||
|
@ -63,18 +66,33 @@ func addBackfiller(root *cobra.Command) {
|
|||
PageSize: pageSize,
|
||||
SortAsc: sortAsc,
|
||||
}
|
||||
|
||||
if emitterChainID != 0 {
|
||||
eci := sdk.ChainID(emitterChainID)
|
||||
cfg.EmitterChainID = &eci
|
||||
}
|
||||
if emitterAddress != "" {
|
||||
cfg.EmitterAddress = &emitterAddress
|
||||
}
|
||||
if sequence != "" {
|
||||
cfg.Sequence = &sequence
|
||||
}
|
||||
backfiller.Run(cfg)
|
||||
},
|
||||
}
|
||||
backfillerCommand.Flags().StringVar(&logLevel, "log-level", "INFO", "log level")
|
||||
backfillerCommand.Flags().StringVar(&mongoUri, "mongo-uri", "", "Mongo connection")
|
||||
backfillerCommand.Flags().StringVar(&mongoDb, "mongo-database", "", "Mongo database")
|
||||
backfillerCommand.Flags().StringVar(&p2pNetwork, "p2p-network", "", "P2P network")
|
||||
backfillerCommand.Flags().StringVar(&vaaPayloadParserURL, "vaa-payload-parser-url", "", "VAA payload parser service URL")
|
||||
backfillerCommand.Flags().Int64Var(&vaaPayloadParserTimeout, "vaa-payload-parser-timeout", 10, "maximum waiting time in call to VAA payload service in seconds")
|
||||
backfillerCommand.Flags().StringVar(&startTime, "start-time", "1970-01-01T00:00:00Z", "minimum VAA timestamp to process")
|
||||
backfillerCommand.Flags().StringVar(&endTime, "end-time", "", "maximum VAA timestamp to process (default now)")
|
||||
backfillerCommand.Flags().Int64Var(&pageSize, "page-size", 100, "number of documents retrieved at a time")
|
||||
backfillerCommand.Flags().StringVar(&sort, "sort", "desc", "process VAA in asc/desc order of timestamp")
|
||||
backfillerCommand.Flags().Uint16Var(&emitterChainID, "emitter-chain", 0, "emitter chain id")
|
||||
backfillerCommand.Flags().StringVar(&emitterAddress, "emitter-address", "", "emitter address")
|
||||
backfillerCommand.Flags().StringVar(&sequence, "sequence", "", "sequence")
|
||||
|
||||
backfillerCommand.MarkFlagRequired("mongo-uri")
|
||||
backfillerCommand.MarkFlagRequired("mongo-database")
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
|
||||
"github.com/joho/godotenv"
|
||||
"github.com/sethvargo/go-envconfig"
|
||||
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
|
||||
)
|
||||
|
||||
// p2p network constants.
|
||||
|
@ -38,17 +39,23 @@ type ServiceConfiguration struct {
|
|||
}
|
||||
|
||||
// BackfillerConfiguration represents the application configuration when running as backfiller with default values.
|
||||
|
||||
type BackfillerConfiguration struct {
|
||||
LogLevel string `env:"LOG_LEVEL,default=INFO"`
|
||||
MongoURI string `env:"MONGODB_URI,required"`
|
||||
MongoDatabase string `env:"MONGODB_DATABASE,required"`
|
||||
VaaPayloadParserURL string `env:"VAA_PAYLOAD_PARSER_URL, required"`
|
||||
VaaPayloadParserTimeout int64 `env:"VAA_PAYLOAD_PARSER_TIMEOUT, required"`
|
||||
StartTime string `env:"START_TIME"`
|
||||
EndTime string `env:"END_TIME"`
|
||||
PageSize int64 `env:"PAGE_SIZE,default=100"`
|
||||
SortAsc bool `env:"SORT_ASC,default=false"`
|
||||
P2pNetwork string `env:"P2P_NETWORK,required"`
|
||||
LogLevel string
|
||||
MongoURI string
|
||||
MongoDatabase string
|
||||
P2pNetwork string
|
||||
PageSize int64
|
||||
P2PNetwork string
|
||||
NotionalUrl string
|
||||
VaaPayloadParserURL string
|
||||
VaaPayloadParserTimeout int64
|
||||
StartTime string
|
||||
EndTime string
|
||||
EmitterChainID *sdk.ChainID
|
||||
EmitterAddress *string
|
||||
Sequence *string
|
||||
SortAsc bool
|
||||
}
|
||||
|
||||
// New creates a configuration with the values from .env file and environment variables.
|
||||
|
|
Loading…
Reference in New Issue