Optimize tx-tracker backfiller (#220)
### Summary The backfiller for tx-tracker data was not running as fast as it should. It was optimized in order to reduce processing times.
This commit is contained in:
parent
d009a616c8
commit
24673d4f31
|
@ -98,9 +98,9 @@ spec:
|
|||
- name: TERRA_REQUESTS_PER_MINUTE
|
||||
value: "{{ .TERRA_REQUESTS_PER_MINUTE }}"
|
||||
- name: NUM_WORKERS
|
||||
value: "20"
|
||||
value: "100"
|
||||
- name: BULK_SIZE
|
||||
value: "500"
|
||||
value: "1000"
|
||||
- name: STRATEGY_NAME
|
||||
value: "{{ .STRATEGY_NAME }}"
|
||||
- name: STRATEGY_TIMESTAMP_AFTER
|
||||
|
|
|
@ -2,7 +2,6 @@ package chains
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
|
@ -48,7 +47,7 @@ func fetchEthTx(
|
|||
return nil, fmt.Errorf("failed to get tx by hash: %w", err)
|
||||
}
|
||||
if txReply.BlockHash == "" || txReply.From == "" {
|
||||
return nil, errors.New("received empty response from the RPC service")
|
||||
return nil, ErrTransactionNotFound
|
||||
}
|
||||
|
||||
// query block data
|
||||
|
|
|
@ -154,8 +154,8 @@ func parseStrategyCallbacks(
|
|||
countFn: func(ctx context.Context) (uint64, error) {
|
||||
return r.CountDocumentsByTimeRange(ctx, timestampAfter, timestampBefore)
|
||||
},
|
||||
iteratorFn: func(ctx context.Context, maxId string, limit uint) ([]consumer.GlobalTransaction, error) {
|
||||
return r.GetDocumentsByTimeRange(ctx, maxId, limit, timestampAfter, timestampBefore)
|
||||
iteratorFn: func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error) {
|
||||
return r.GetDocumentsByTimeRange(ctx, lastId, lastTimestamp, limit, timestampAfter, timestampBefore)
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -174,7 +174,7 @@ func parseStrategyCallbacks(
|
|||
|
||||
type strategyCallbacks struct {
|
||||
countFn func(ctx context.Context) (uint64, error)
|
||||
iteratorFn func(ctx context.Context, maxId string, limit uint) ([]consumer.GlobalTransaction, error)
|
||||
iteratorFn func(ctx context.Context, lastId string, lastTimestamp *time.Time, limit uint) ([]consumer.GlobalTransaction, error)
|
||||
}
|
||||
|
||||
// producerParams contains the parameters for the producer goroutine.
|
||||
|
@ -196,11 +196,12 @@ func produce(ctx context.Context, params *producerParams) {
|
|||
defer close(params.queueTx)
|
||||
|
||||
// Producer main loop
|
||||
var maxId = ""
|
||||
var lastId = ""
|
||||
var lastTimestamp *time.Time
|
||||
for {
|
||||
|
||||
// Get a batch of VAA IDs from the database
|
||||
globalTxs, err := params.strategyCallbacks.iteratorFn(ctx, maxId, params.bulkSize)
|
||||
globalTxs, err := params.strategyCallbacks.iteratorFn(ctx, lastId, lastTimestamp, params.bulkSize)
|
||||
if err != nil {
|
||||
params.logger.Error("Closing: failed to read from cursor", zap.Error(err))
|
||||
return
|
||||
|
@ -217,7 +218,10 @@ func produce(ctx context.Context, params *producerParams) {
|
|||
for _, globalTx := range globalTxs {
|
||||
select {
|
||||
case params.queueTx <- globalTx:
|
||||
maxId = globalTx.Id
|
||||
if len(globalTx.Vaas) != 0 {
|
||||
lastId = globalTx.Id
|
||||
lastTimestamp = globalTx.Vaas[0].Timestamp
|
||||
}
|
||||
case <-ctx.Done():
|
||||
params.logger.Info("Closing: context was cancelled")
|
||||
return
|
||||
|
|
|
@ -194,7 +194,8 @@ type GlobalTransaction struct {
|
|||
// GetDocumentsByTimeRange iterates through documents within a specified time range.
|
||||
func (r *Repository) GetDocumentsByTimeRange(
|
||||
ctx context.Context,
|
||||
maxId string,
|
||||
lastId string,
|
||||
lastTimestamp *time.Time,
|
||||
limit uint,
|
||||
timeAfter time.Time,
|
||||
timeBefore time.Time,
|
||||
|
@ -205,15 +206,28 @@ func (r *Repository) GetDocumentsByTimeRange(
|
|||
{
|
||||
// Specify sorting criteria
|
||||
pipeline = append(pipeline, bson.D{
|
||||
{"$sort", bson.D{bson.E{"_id", 1}}},
|
||||
{"$sort", bson.D{
|
||||
bson.E{"timestamp", -1},
|
||||
bson.E{"_id", 1},
|
||||
}},
|
||||
})
|
||||
|
||||
// filter out already processed documents
|
||||
//
|
||||
// We use the _id field as a pagination cursor
|
||||
// We use the timestamp field as a pagination cursor
|
||||
if lastTimestamp != nil {
|
||||
pipeline = append(pipeline, bson.D{
|
||||
{"$match", bson.D{{"_id", bson.M{"$gt": maxId}}}},
|
||||
{"$match", bson.D{
|
||||
{"$or", bson.A{
|
||||
bson.D{{"timestamp", bson.M{"$lt": *lastTimestamp}}},
|
||||
bson.D{{"$and", bson.A{
|
||||
bson.D{{"timestamp", bson.M{"$eq": *lastTimestamp}}},
|
||||
bson.D{{"_id", bson.M{"$gt": lastId}}},
|
||||
}}},
|
||||
}},
|
||||
}},
|
||||
})
|
||||
}
|
||||
|
||||
// filter by time range
|
||||
pipeline = append(pipeline, bson.D{
|
||||
|
@ -264,7 +278,8 @@ func (r *Repository) GetDocumentsByTimeRange(
|
|||
// GetIncompleteDocuments gets a batch of VAA IDs from the database.
|
||||
func (r *Repository) GetIncompleteDocuments(
|
||||
ctx context.Context,
|
||||
maxId string,
|
||||
lastId string,
|
||||
lastTimestamp *time.Time,
|
||||
limit uint,
|
||||
) ([]GlobalTransaction, error) {
|
||||
|
||||
|
@ -280,7 +295,7 @@ func (r *Repository) GetIncompleteDocuments(
|
|||
//
|
||||
// We use the _id field as a pagination cursor
|
||||
pipeline = append(pipeline, bson.D{
|
||||
{"$match", bson.D{{"_id", bson.M{"$gt": maxId}}}},
|
||||
{"$match", bson.D{{"_id", bson.M{"$gt": lastId}}}},
|
||||
})
|
||||
|
||||
// Look up transactions that either:
|
||||
|
|
Loading…
Reference in New Issue