package main
import (
// layoutRcf3339 is the time layout passed to time.Parse when reading the
// TimestampAfter / TimestampBefore settings of the time-range strategy.
//
// It is stricter than time.RFC3339: the ".000" element demands exactly three
// fractional-second digits, and the trailing "Z" is matched as a literal
// character rather than as a zone offset.
//
// NOTE(review): "Rcf" looks like a misspelling of "Rfc"; renaming would have
// to touch every use of the constant.
const layoutRcf3339 = "2006-01-02T15:04:05.000Z"
func makeLogger(logger *zap.Logger, name string) *zap.Logger {
rightPadding := fmt.Sprintf("%-10s", name)
l := logger.Named(rightPadding)
return l
func main() {
// Create the top-level context
rootCtx, rootCtxCancel := context.WithCancel(context.Background())
// Load config
cfg, err := config.LoadFromEnv[config.BackfillerSettings]()
if err != nil {
log.Fatal("Failed to load config: ", err)
// Initialize rate limiters
// Initialize logger
rootLogger := logger.New("backfiller", logger.WithLevel(cfg.LogLevel))
mainLogger := makeLogger(rootLogger, "main")
// Spawn a goroutine that will call `cancelFunc` if a signal is received.
go func() {
l := makeLogger(rootLogger, "watcher")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
select {
case <-rootCtx.Done():
l.Info("Closing due to cancelled context")
case _ = <-sigterm:
l.Info("Cancelling root context")
// Initialize the database client
cli, err := mongo.Connect(rootCtx, options.Client().ApplyURI(cfg.MongodbUri))
if err != nil {
log.Fatal("Failed to initialize MongoDB client: ", err)
defer cli.Disconnect(rootCtx)
repository := consumer.NewRepository(rootLogger, cli.Database(cfg.MongodbDatabase))
strategyCallbacks, err := parseStrategyCallbacks(mainLogger, cfg, repository)
if err != nil {
log.Fatal("Failed to parse strategy callbacks: ", err)
// Count the number of documents to process
totalDocuments, err := strategyCallbacks.countFn(rootCtx)
if err != nil {
log.Fatal("Closing - failed to count number of global transactions: ", err)
mainLogger.Info("Starting", zap.Uint64("documentsToProcess", totalDocuments))
// Spawn the producer goroutine.
// The producer sends tasks to the workers via a buffered channel.
queue := make(chan consumer.GlobalTransaction, cfg.BulkSize)
p := producerParams{
logger: makeLogger(rootLogger, "producer"),
repository: repository,
queueTx: queue,
bulkSize: cfg.BulkSize,
strategyCallbacks: strategyCallbacks,
go produce(rootCtx, &p)
// Spawn a goroutine for each worker
var wg sync.WaitGroup
var processedDocuments atomic.Uint64
for i := uint(0); i < cfg.NumWorkers; i++ {
name := fmt.Sprintf("worker-%d", i)
p := consumerParams{
logger: makeLogger(rootLogger, name),
rpcProviderSettings: &cfg.RpcProviderSettings,
repository: repository,
queueRx: queue,
wg: &wg,
totalDocuments: totalDocuments,
processedDocuments: &processedDocuments,
go consume(rootCtx, &p)
// Wait for all workers to finish before closing
mainLogger.Info("Closing main goroutine")
func parseStrategyCallbacks(
logger *zap.Logger,
cfg *config.BackfillerSettings,
r *consumer.Repository,
) (*strategyCallbacks, error) {
switch cfg.Strategy.Name {
case config.BackfillerStrategyReprocessFailed:
cb := strategyCallbacks{
countFn: r.CountIncompleteDocuments,
iteratorFn: r.GetIncompleteDocuments,
logger.Info("backfilling incomplete documents")
return &cb, nil
case config.BackfillerStrategyTimeRange:
timestampAfter, err := time.Parse(layoutRcf3339, cfg.Strategy.TimestampAfter)
if err != nil {
return nil, fmt.Errorf("failed to parse timestampAfter: %w", err)
timestampBefore, err := time.Parse(layoutRcf3339, cfg.Strategy.TimestampBefore)
if err != nil {
return nil, fmt.Errorf("failed to parse timestampBefore: %w", err)
cb := strategyCallbacks{
countFn: func(ctx context.Context) (uint64, error) {
return r.CountDocumentsByTimeRange(ctx, timestampAfter, timestampBefore)
iteratorFn: func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error) {
return r.GetDocumentsByTimeRange(ctx, lastId, lastTimestamp, limit, timestampAfter, timestampBefore)
logger.Info("backfilling by time range",
zap.Time("after", timestampAfter),
zap.Time("before", timestampBefore),
return &cb, nil
return nil, fmt.Errorf("unknown strategy: %s", cfg.Strategy.Name)
type strategyCallbacks struct {
countFn func(ctx context.Context) (uint64, error)
iteratorFn func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error)
// producerParams contains the parameters for the producer goroutine.
type producerParams struct {
logger *zap.Logger
repository *consumer.Repository
queueTx chan<- consumer.GlobalTransaction
bulkSize uint
strategyCallbacks *strategyCallbacks
// produce reads VAA IDs from the database, and sends them through a channel for the workers to consume.
// The function will return when:
// - the context is cancelled
// - a fatal error is encountered
// - there are no more items to process
func produce(ctx context.Context, params *producerParams) {
defer close(params.queueTx)
// Producer main loop
var lastId = ""
var lastTimestamp *time.Time
for {
// Get a batch of VAA IDs from the database
globalTxs, err := params.strategyCallbacks.iteratorFn(ctx, lastId, lastTimestamp, params.bulkSize)
if err != nil {
params.logger.Error("Closing: failed to read from cursor", zap.Error(err))
// If there are no more documents to process, close the goroutine
if len(globalTxs) == 0 {
params.logger.Info("Closing: no documents left in database")
// Enqueue the VAA IDs, and update the pagination cursor
params.logger.Debug("queueing batch for consumers", zap.Int("numElements", len(globalTxs)))
for _, globalTx := range globalTxs {
select {
case params.queueTx <- globalTx:
if len(globalTx.Vaas) != 0 {
lastId = globalTx.Id
lastTimestamp = globalTx.Vaas[0].Timestamp
case <-ctx.Done():
params.logger.Info("Closing: context was cancelled")
// consumerParams contains the parameters for the consumer goroutine.
type consumerParams struct {
logger *zap.Logger
rpcProviderSettings *config.RpcProviderSettings
repository *consumer.Repository
queueRx <-chan consumer.GlobalTransaction
wg *sync.WaitGroup
totalDocuments uint64
processedDocuments *atomic.Uint64
// consume reads VAA IDs from a channel, processes them, and updates the database accordingly.
// The function will return when:
// - the context is cancelled
// - a fatal error is encountered
// - the channel is closed (i.e.: no more items to process)
func consume(ctx context.Context, params *consumerParams) {
// Main loop: fetch global txs and process them
for {
select {
// Try to pop a globalTransaction from the queue
case globalTx, ok := <-params.queueRx:
// If the channel was closed, exit immediately
if !ok {
params.logger.Info("Closing, channel was closed")
// Sanity check
if len(globalTx.Vaas) != 1 {
params.logger.Warn("globalTransaction doesn't match exactly one VAA, skipping",
zap.String("vaaId", globalTx.Id),
zap.Int("matches", len(globalTx.Vaas)),
if globalTx.Vaas[0].TxHash == nil {
params.logger.Warn("VAA doesn't have a TxHash, skipping",
zap.String("vaaId", globalTx.Id),
params.logger.Debug("Processing source tx",
zap.String("vaaId", globalTx.Id),
zap.String("txid", *globalTx.Vaas[0].TxHash),
// Process the transaction
// This involves:
// 1. Querying an API/RPC service for the source tx details
// 2. Persisting source tx details in the database.
v := globalTx.Vaas[0]
p := consumer.ProcessSourceTxParams{
VaaId: v.ID,
ChainId: v.EmitterChain,
Emitter: v.EmitterAddr,
Sequence: v.Sequence,
TxHash: *v.TxHash,
Overwrite: true, // Overwrite old contents
err := consumer.ProcessSourceTx(ctx, params.logger, params.rpcProviderSettings, params.repository, &p)
if err != nil {
params.logger.Error("Failed to track source tx",
zap.String("vaaId", globalTx.Id),
params.logger.Debug("Updated source tx",
zap.String("vaaId", globalTx.Id),
zap.String("txid", *globalTx.Vaas[0].TxHash),
zap.String("progress", fmt.Sprintf("%d/%d", params.processedDocuments.Load(), params.totalDocuments)),
// If the context was cancelled, exit immediately
case <-ctx.Done():
params.logger.Info("Closing due to cancelled context")