package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/wormhole-foundation/wormhole-explorer/common/logger"
	"github.com/wormhole-foundation/wormhole-explorer/txtracker/chains"
	"github.com/wormhole-foundation/wormhole-explorer/txtracker/config"
	"github.com/wormhole-foundation/wormhole-explorer/txtracker/consumer"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.uber.org/zap"
)
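// layoutRfc3339 is the timestamp layout accepted by the time-range strategy
// settings: RFC 3339 in UTC with millisecond precision, in the shape of the
// reference string below.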
const layoutRfc3339 = "2006-01-02T15:04:05.000Z"
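// makeLogger returns a named child of the given logger. The component name is
// right-padded to 10 characters so that log lines emitted by different
// goroutines stay vertically aligned.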
func makeLogger(logger *zap.Logger, name string) *zap.Logger {

	rightPadding := fmt.Sprintf("%-10s", name)

	l := logger.Named(rightPadding)

	return l
}
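// main wires the backfiller together: it loads the configuration, connects to
// MongoDB, selects the backfilling strategy, and then runs a single producer
// goroutine that feeds batches of documents to a pool of consumer goroutines
// through a buffered channel.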
func main() {

	// Create the top-level context
	rootCtx, rootCtxCancel := context.WithCancel(context.Background())

	// Load config
	cfg, err := config.LoadFromEnv[config.BackfillerSettings]()
	if err != nil {
		log.Fatal("Failed to load config: ", err)
	}

	// Initialize rate limiters
	chains.Initialize(&cfg.RpcProviderSettings)

	// Initialize logger
	rootLogger := logger.New("backfiller", logger.WithLevel(cfg.LogLevel))
	mainLogger := makeLogger(rootLogger, "main")
	mainLogger.Info("Starting")

	// Spawn a goroutine that calls `rootCtxCancel` when a termination signal is received.
	go func() {
		l := makeLogger(rootLogger, "watcher")
		sigterm := make(chan os.Signal, 1)
		signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
		select {
		case <-rootCtx.Done():
			l.Info("Closing due to cancelled context")
		case <-sigterm:
			l.Info("Cancelling root context")
			rootCtxCancel()
		}
	}()

	// Initialize the database client
	cli, err := mongo.Connect(rootCtx, options.Client().ApplyURI(cfg.MongodbUri))
	if err != nil {
		log.Fatal("Failed to initialize MongoDB client: ", err)
	}
	defer cli.Disconnect(rootCtx)
	repository := consumer.NewRepository(rootLogger, cli.Database(cfg.MongodbDatabase))

	strategyCallbacks, err := parseStrategyCallbacks(mainLogger, cfg, repository)
	if err != nil {
		log.Fatal("Failed to parse strategy callbacks: ", err)
	}

	// Count the number of documents to process
	totalDocuments, err := strategyCallbacks.countFn(rootCtx)
	if err != nil {
		log.Fatal("Closing - failed to count number of global transactions: ", err)
	}
	mainLogger.Info("Starting", zap.Uint64("documentsToProcess", totalDocuments))

	// Spawn the producer goroutine.
	//
	// The producer sends tasks to the workers via a buffered channel whose
	// capacity is one full batch (cfg.BulkSize elements).
	queue := make(chan consumer.GlobalTransaction, cfg.BulkSize)
	p := producerParams{
		logger:            makeLogger(rootLogger, "producer"),
		repository:        repository,
		queueTx:           queue,
		bulkSize:          cfg.BulkSize,
		strategyCallbacks: strategyCallbacks,
	}
	go produce(rootCtx, &p)

	// Spawn a goroutine for each worker
	var wg sync.WaitGroup
	var processedDocuments atomic.Uint64
	wg.Add(int(cfg.NumWorkers))
	for i := uint(0); i < cfg.NumWorkers; i++ {
		name := fmt.Sprintf("worker-%d", i)
		p := consumerParams{
			logger:              makeLogger(rootLogger, name),
			rpcProviderSettings: &cfg.RpcProviderSettings,
			repository:          repository,
			queueRx:             queue,
			wg:                  &wg,
			totalDocuments:      totalDocuments,
			processedDocuments:  &processedDocuments,
		}
		go consume(rootCtx, &p)
	}

	// Wait for all workers to finish before closing
	wg.Wait()
	mainLogger.Info("Closing main goroutine")
}
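// parseStrategyCallbacks maps the configured strategy name to the pair of
// database callbacks that implement it: reprocessing incomplete documents, or
// scanning all documents whose timestamp falls between TimestampAfter and
// TimestampBefore (both parsed with layoutRfc3339).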
func parseStrategyCallbacks(
	logger *zap.Logger,
	cfg *config.BackfillerSettings,
	r *consumer.Repository,
) (*strategyCallbacks, error) {

	switch cfg.Strategy.Name {

	case config.BackfillerStrategyReprocessFailed:
		cb := strategyCallbacks{
			countFn:    r.CountIncompleteDocuments,
			iteratorFn: r.GetIncompleteDocuments,
		}

		logger.Info("backfilling incomplete documents")

		return &cb, nil

	case config.BackfillerStrategyTimeRange:

		timestampAfter, err := time.Parse(layoutRfc3339, cfg.Strategy.TimestampAfter)
		if err != nil {
			return nil, fmt.Errorf("failed to parse timestampAfter: %w", err)
		}
		timestampBefore, err := time.Parse(layoutRfc3339, cfg.Strategy.TimestampBefore)
		if err != nil {
			return nil, fmt.Errorf("failed to parse timestampBefore: %w", err)
		}

		cb := strategyCallbacks{
			countFn: func(ctx context.Context) (uint64, error) {
				return r.CountDocumentsByTimeRange(ctx, timestampAfter, timestampBefore)
			},
			iteratorFn: func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error) {
				return r.GetDocumentsByTimeRange(ctx, lastId, lastTimestamp, limit, timestampAfter, timestampBefore)
			},
		}

		logger.Info("backfilling by time range",
			zap.Time("after", timestampAfter),
			zap.Time("before", timestampBefore),
		)

		return &cb, nil

	default:
		return nil, fmt.Errorf("unknown strategy: %s", cfg.Strategy.Name)
	}
}
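// strategyCallbacks bundles the two database operations that vary by
// strategy: countFn returns the total number of documents to process, and
// iteratorFn pages through them, using (lastId, lastTimestamp) as the
// pagination cursor.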
type strategyCallbacks struct {
	countFn    func(ctx context.Context) (uint64, error)
	iteratorFn func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error)
}
// producerParams contains the parameters for the producer goroutine.
type producerParams struct {
	logger            *zap.Logger
	repository        *consumer.Repository
	queueTx           chan<- consumer.GlobalTransaction
	bulkSize          uint
	strategyCallbacks *strategyCallbacks
}
// produce reads global transactions from the database and sends them through
// a channel for the workers to consume.
//
// The function returns when:
// - the context is cancelled
// - a fatal error is encountered
// - there are no more items to process
func produce(ctx context.Context, params *producerParams) {
	defer close(params.queueTx)

	// Producer main loop
	var lastId = ""
	var lastTimestamp *time.Time
	for {

		// Get a batch of documents from the database
		globalTxs, err := params.strategyCallbacks.iteratorFn(ctx, lastId, lastTimestamp, params.bulkSize)
		if err != nil {
			params.logger.Error("Closing: failed to read from cursor", zap.Error(err))
			return
		}

		// If there are no more documents to process, close the goroutine
		if len(globalTxs) == 0 {
			params.logger.Info("Closing: no documents left in database")
			return
		}

		// Enqueue the documents and advance the pagination cursor. The cursor
		// (lastId, lastTimestamp) only moves past documents that contain at
		// least one VAA, since the timestamp is taken from the first VAA.
		params.logger.Debug("queueing batch for consumers", zap.Int("numElements", len(globalTxs)))
		for _, globalTx := range globalTxs {
			select {
			case params.queueTx <- globalTx:
				if len(globalTx.Vaas) != 0 {
					lastId = globalTx.Id
					lastTimestamp = globalTx.Vaas[0].Timestamp
				}
			case <-ctx.Done():
				params.logger.Info("Closing: context was cancelled")
				return
			}
		}
	}
}
// consumerParams contains the parameters for the consumer goroutine.
type consumerParams struct {
	logger              *zap.Logger
	rpcProviderSettings *config.RpcProviderSettings
	repository          *consumer.Repository
	queueRx             <-chan consumer.GlobalTransaction
	wg                  *sync.WaitGroup
	totalDocuments      uint64
	processedDocuments  *atomic.Uint64
}
// consume reads global transactions from a channel, processes them, and
// updates the database accordingly.
//
// The function returns when:
// - the context is cancelled
// - the channel is closed (i.e.: no more items to process)
func consume(ctx context.Context, params *consumerParams) {

	// Main loop: fetch global txs and process them
	for {
		select {

		// Try to pop a globalTransaction from the queue
		case globalTx, ok := <-params.queueRx:

			// If the channel was closed, exit immediately
			if !ok {
				params.logger.Info("Closing: channel was closed")
				params.wg.Done()
				return
			}

			// Sanity check: each globalTransaction is expected to match exactly one VAA
			if len(globalTx.Vaas) != 1 {
				params.logger.Warn("globalTransaction doesn't match exactly one VAA, skipping",
					zap.String("vaaId", globalTx.Id),
					zap.Int("matches", len(globalTx.Vaas)),
				)
				params.processedDocuments.Add(1)
				continue
			}
			if globalTx.Vaas[0].TxHash == nil {
				params.logger.Warn("VAA doesn't have a TxHash, skipping",
					zap.String("vaaId", globalTx.Id),
				)
				params.processedDocuments.Add(1)
				continue
			}

			params.logger.Debug("Processing source tx",
				zap.String("vaaId", globalTx.Id),
				zap.String("txid", *globalTx.Vaas[0].TxHash),
			)

			// Process the transaction
			//
			// This involves:
			// 1. Querying an API/RPC service for the source tx details
			// 2. Persisting the source tx details in the database.
			v := globalTx.Vaas[0]
			p := consumer.ProcessSourceTxParams{
				VaaId:     v.ID,
				ChainId:   v.EmitterChain,
				Emitter:   v.EmitterAddr,
				Sequence:  v.Sequence,
				TxHash:    *v.TxHash,
				Overwrite: true, // Overwrite old contents
			}
			err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p)
			if err != nil {
				params.logger.Error("Failed to track source tx",
					zap.String("vaaId", globalTx.Id),
					zap.Error(err),
				)
				params.processedDocuments.Add(1)
				continue
			}

			params.processedDocuments.Add(1)
			params.logger.Debug("Updated source tx",
				zap.String("vaaId", globalTx.Id),
				zap.String("txid", *globalTx.Vaas[0].TxHash),
				zap.String("progress", fmt.Sprintf("%d/%d", params.processedDocuments.Load(), params.totalDocuments)),
			)

		// If the context was cancelled, exit immediately
		case <-ctx.Done():
			params.logger.Info("Closing due to cancelled context")
			params.wg.Done()
			return
		}
	}
}