diff --git a/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml b/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml index 46127e52..93bc80ab 100644 --- a/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml +++ b/deploy/tx-tracker-backfiller/tx-tracker-backfiller-job.yaml @@ -98,9 +98,9 @@ spec: - name: TERRA_REQUESTS_PER_MINUTE value: "{{ .TERRA_REQUESTS_PER_MINUTE }}" - name: NUM_WORKERS - value: "20" + value: "100" - name: BULK_SIZE - value: "500" + value: "1000" - name: STRATEGY_NAME value: "{{ .STRATEGY_NAME }}" - name: STRATEGY_TIMESTAMP_AFTER diff --git a/tx-tracker/chains/eth.go b/tx-tracker/chains/eth.go index 54481402..3865a97a 100644 --- a/tx-tracker/chains/eth.go +++ b/tx-tracker/chains/eth.go @@ -2,7 +2,6 @@ package chains import ( "context" - "errors" "fmt" "strings" @@ -48,7 +47,7 @@ func fetchEthTx( return nil, fmt.Errorf("failed to get tx by hash: %w", err) } if txReply.BlockHash == "" || txReply.From == "" { - return nil, errors.New("received empty response from the RPC service") + return nil, ErrTransactionNotFound } // query block data diff --git a/tx-tracker/cmd/backfiller/main.go b/tx-tracker/cmd/backfiller/main.go index 6ae296b9..0c461acb 100644 --- a/tx-tracker/cmd/backfiller/main.go +++ b/tx-tracker/cmd/backfiller/main.go @@ -154,8 +154,8 @@ func parseStrategyCallbacks( countFn: func(ctx context.Context) (uint64, error) { return r.CountDocumentsByTimeRange(ctx, timestampAfter, timestampBefore) }, - iteratorFn: func(ctx context.Context, maxId string, limit uint) ([]consumer.GlobalTransaction, error) { - return r.GetDocumentsByTimeRange(ctx, maxId, limit, timestampAfter, timestampBefore) + iteratorFn: func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error) { + return r.GetDocumentsByTimeRange(ctx, lastId, lastTimestamp, limit, timestampAfter, timestampBefore) }, } @@ -174,7 +174,7 @@ func parseStrategyCallbacks( type strategyCallbacks struct { countFn func(ctx context.Context) (uint64, error) - iteratorFn func(ctx context.Context, maxId string, limit uint) ([]consumer.GlobalTransaction, error) + iteratorFn func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error) } // producerParams contains the parameters for the producer goroutine. @@ -196,11 +196,12 @@ func produce(ctx context.Context, params *producerParams) { defer close(params.queueTx) // Producer main loop - var maxId = "" + var lastId = "" + var lastTimestamp *time.Time for { // Get a batch of VAA IDs from the database - globalTxs, err := params.strategyCallbacks.iteratorFn(ctx, maxId, params.bulkSize) + globalTxs, err := params.strategyCallbacks.iteratorFn(ctx, lastId, lastTimestamp, params.bulkSize) if err != nil { params.logger.Error("Closing: failed to read from cursor", zap.Error(err)) return @@ -217,7 +218,10 @@ func produce(ctx context.Context, params *producerParams) { for _, globalTx := range globalTxs { select { case params.queueTx <- globalTx: - maxId = globalTx.Id + if len(globalTx.Vaas) != 0 { + lastId = globalTx.Id + lastTimestamp = globalTx.Vaas[0].Timestamp + } case <-ctx.Done(): params.logger.Info("Closing: context was cancelled") return diff --git a/tx-tracker/consumer/repository.go b/tx-tracker/consumer/repository.go index d6b13c49..f0cda91f 100644 --- a/tx-tracker/consumer/repository.go +++ b/tx-tracker/consumer/repository.go @@ -194,7 +194,8 @@ type GlobalTransaction struct { // GetDocumentsByTimeRange iterates through documents within a specified time range. func (r *Repository) GetDocumentsByTimeRange( ctx context.Context, - maxId string, + lastId string, + lastTimestamp *time.Time, limit uint, timeAfter time.Time, timeBefore time.Time, @@ -205,15 +206,28 @@ func (r *Repository) GetDocumentsByTimeRange( { // Specify sorting criteria pipeline = append(pipeline, bson.D{ - {"$sort", bson.D{bson.E{"_id", 1}}}, + {"$sort", bson.D{ + bson.E{"timestamp", -1}, + bson.E{"_id", 1}, + }}, }) // filter out already processed documents // - // We use the _id field as a pagination cursor - pipeline = append(pipeline, bson.D{ - {"$match", bson.D{{"_id", bson.M{"$gt": maxId}}}}, - }) + // We use the timestap field as a pagination cursor + if lastTimestamp != nil { + pipeline = append(pipeline, bson.D{ + {"$match", bson.D{ + {"$or", bson.A{ + bson.D{{"timestamp", bson.M{"$lt": *lastTimestamp}}}, + bson.D{{"$and", bson.A{ + bson.D{{"timestamp", bson.M{"$eq": *lastTimestamp}}}, + bson.D{{"_id", bson.M{"$gt": lastId}}}, + }}}, + }}, + }}, + }) + } // filter by time range pipeline = append(pipeline, bson.D{ @@ -264,7 +278,8 @@ func (r *Repository) GetDocumentsByTimeRange( // GetIncompleteDocuments gets a batch of VAA IDs from the database. func (r *Repository) GetIncompleteDocuments( ctx context.Context, - maxId string, + lastId string, + lastTimestamp *time.Time, limit uint, ) ([]GlobalTransaction, error) { @@ -280,7 +295,7 @@ func (r *Repository) GetIncompleteDocuments( // // We use the _id field as a pagination cursor pipeline = append(pipeline, bson.D{ - {"$match", bson.D{{"_id", bson.M{"$gt": maxId}}}}, + {"$match", bson.D{{"_id", bson.M{"$gt": lastId}}}}, }) // Look up transactions that either: