From 860df4c93c07bdb0baf40c44012b15accfbc6d82 Mon Sep 17 00:00:00 2001 From: Kirill Fedoseev Date: Sun, 22 May 2022 14:15:59 +0400 Subject: [PATCH] Utils for splitting blocks and logs ranges --- .dockerignore | 2 + .github/workflows/ci.yml | 22 +++- .gitignore | 2 + go.mod | 3 + monitor/contract_monitor.go | 71 ++++--------- monitor/types.go | 34 +++++++ monitor/types_test.go | 193 ++++++++++++++++++++++++++++++++++++ 7 files changed, 274 insertions(+), 53 deletions(-) create mode 100644 monitor/types_test.go diff --git a/.dockerignore b/.dockerignore index 9310d11..6c112d6 100644 --- a/.dockerignore +++ b/.dockerignore @@ -4,3 +4,5 @@ Dockerfile docker-compose.* .dockerignore config.yml +cover.out +coverage.html diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1b40c05..4769713 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,11 +11,29 @@ env: DOCKER_IMAGE_NAME: ${{ github.repository }} jobs: - build-and-publish: + test: runs-on: ubuntu-latest steps: - name: Checkout repository - uses: actions/checkout@v2 + uses: actions/checkout@v3 + - uses: actions/setup-go@v3 + with: + go-version: '^1.17' + - name: Run tests + run: go test -race -cover -coverprofile cover.out ./... 
+ - name: Generate coverage report + run: go tool cover -html=cover.out -o coverage.html + - name: Upload coverage + uses: actions/upload-artifact@v3 + with: + name: coverage + path: coverage.html + build-and-publish: + runs-on: ubuntu-latest + needs: test + steps: + - name: Checkout repository + uses: actions/checkout@v3 - name: Login to GitHub Container Registry uses: docker/login-action@v1 with: diff --git a/.gitignore b/.gitignore index dbe6b76..18dc530 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ .idea/ prometheus/slack_api_url.txt prometheus/admin_password.txt +cover.out +coverage.html diff --git a/go.mod b/go.mod index 2653dd5..41a9543 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/lib/pq v1.10.4 github.com/prometheus/client_golang v1.11.1 github.com/sirupsen/logrus v1.8.1 + github.com/stretchr/testify v1.7.0 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b ) @@ -20,6 +21,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set v1.8.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/go-ole/go-ole v1.2.1 // indirect @@ -39,6 +41,7 @@ require ( github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.30.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect diff --git a/monitor/contract_monitor.go b/monitor/contract_monitor.go index 7e12227..392a3fe 100644 --- a/monitor/contract_monitor.go +++ b/monitor/contract_monitor.go @@ -5,7 +5,6 @@ import ( "database/sql" "errors" "fmt" - "math"
"math/big" "sort" "sync" @@ -172,20 +171,13 @@ func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) { head -= m.cfg.BlockConfirmations m.recordHeadBlockNumber(head) - for start <= head { - end := start + m.cfg.MaxBlockRangeSize - 1 - if end > head { - end = head - } + batches := SplitBlockRange(start, head, m.cfg.MaxBlockRangeSize) + for _, batch := range batches { m.logger.WithFields(logrus.Fields{ - "from_block": start, - "to_block": end, + "from_block": batch.From, + "to_block": batch.To, }).Info("scheduling new block range logs search") - m.blocksRangeChan <- &BlocksRange{ - From: start, - To: end, - } - start = end + 1 + m.blocksRangeChan <- batch } } @@ -207,25 +199,17 @@ func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) { toBlock = lastFetchedBlock } - for fromBlock <= toBlock { - end := fromBlock + m.cfg.MaxBlockRangeSize - 1 - if end > toBlock { - end = toBlock - } + batches := SplitBlockRange(fromBlock, toBlock, m.cfg.MaxBlockRangeSize) + for _, batch := range batches { m.logger.WithFields(logrus.Fields{ - "from_block": fromBlock, - "to_block": end, + "from_block": batch.From, + "to_block": batch.To, }).Info("scheduling new block range logs search") - br := &BlocksRange{ - From: fromBlock, - To: end, - } if job.Event != "" { topic := crypto.Keccak256Hash([]byte(job.Event)) - br.Topic = &topic + batch.Topic = &topic } - m.blocksRangeChan <- br - fromBlock = end + 1 + m.blocksRangeChan <- batch } } } @@ -340,34 +324,19 @@ func (m *ContractMonitor) tryToFetchLogs(ctx context.Context, blocksRange *Block } func (m *ContractMonitor) submitLogs(logs []*entity.Log, endBlock uint) { - jobs, lastBlock := 0, uint(0) - for _, log := range logs { - if log.BlockNumber > lastBlock { - lastBlock = log.BlockNumber - jobs++ - } - } + logBatches := SplitLogsInBatches(logs) m.logger.WithFields(logrus.Fields{ "count": len(logs), - "jobs": jobs, + "jobs": len(logBatches), }).Info("create jobs for logs processor") - // fake log to simplify 
loop, it will be skipped - logs = append(logs, &entity.Log{BlockNumber: math.MaxUint32}) - batchStartIndex := 0 - for i, log := range logs { - if log.BlockNumber > logs[batchStartIndex].BlockNumber { - m.logger.WithFields(logrus.Fields{ - "count": i - batchStartIndex, - "block_number": logs[batchStartIndex].BlockNumber, - }).Debug("submitting logs batch to logs processor") - m.logsChan <- &LogsBatch{ - BlockNumber: logs[batchStartIndex].BlockNumber, - Logs: logs[batchStartIndex:i], - } - batchStartIndex = i - } + for _, batch := range logBatches { + m.logger.WithFields(logrus.Fields{ + "count": len(batch.Logs), + "block_number": batch.BlockNumber, + }).Debug("submitting logs batch to logs processor") + m.logsChan <- batch } - if lastBlock < endBlock { + if len(logBatches) == 0 || logBatches[len(logBatches)-1].BlockNumber < endBlock { m.logsChan <- &LogsBatch{ BlockNumber: endBlock, Logs: nil, diff --git a/monitor/types.go b/monitor/types.go index 3193526..9fb9b3b 100644 --- a/monitor/types.go +++ b/monitor/types.go @@ -1,6 +1,7 @@ package monitor import ( + "math" "tokenbridge-monitor/entity" "github.com/ethereum/go-ethereum/common" @@ -16,3 +17,36 @@ type LogsBatch struct { BlockNumber uint Logs []*entity.Log } + +func SplitBlockRange(fromBlock uint, toBlock uint, maxSize uint) []*BlocksRange { + batches := make([]*BlocksRange, 0, 10) + for fromBlock <= toBlock { + batchToBlock := fromBlock + maxSize - 1 + if batchToBlock > toBlock { + batchToBlock = toBlock + } + batches = append(batches, &BlocksRange{ + From: fromBlock, + To: batchToBlock, + }) + fromBlock += maxSize + } + return batches +} + +func SplitLogsInBatches(logs []*entity.Log) []*LogsBatch { + batches := make([]*LogsBatch, 0, 10) + // fake log to simplify loop, it will be skipped + logs = append(logs, &entity.Log{BlockNumber: math.MaxUint32}) + batchStartIndex := 0 + for i, log := range logs { + if log.BlockNumber > logs[batchStartIndex].BlockNumber { + batches = append(batches, &LogsBatch{ + BlockNumber:
logs[batchStartIndex].BlockNumber, + Logs: logs[batchStartIndex:i], + }) + batchStartIndex = i + } + } + return batches +} diff --git a/monitor/types_test.go b/monitor/types_test.go new file mode 100644 index 0000000..c5f04ac --- /dev/null +++ b/monitor/types_test.go @@ -0,0 +1,193 @@ +package monitor_test + +import ( + "testing" + "tokenbridge-monitor/entity" + "tokenbridge-monitor/monitor" + + "github.com/stretchr/testify/require" +) + +func TestSplitBlockRange(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + Name string + Input [3]uint + ExpectedOutput []*monitor.BlocksRange + }{ + { + Name: "Split range in two", + Input: [3]uint{100, 199, 50}, + ExpectedOutput: []*monitor.BlocksRange{ + {100, 149, nil}, + {150, 199, nil}, + }, + }, + { + Name: "Split range in two 2", + Input: [3]uint{100, 200, 90}, + ExpectedOutput: []*monitor.BlocksRange{ + {100, 189, nil}, + {190, 200, nil}, + }, + }, + { + Name: "Split range in three", + Input: [3]uint{100, 200, 50}, + ExpectedOutput: []*monitor.BlocksRange{ + {100, 149, nil}, + {150, 199, nil}, + {200, 200, nil}, + }, + }, + { + Name: "Keep range as is", + Input: [3]uint{100, 200, 101}, + ExpectedOutput: []*monitor.BlocksRange{ + {100, 200, nil}, + }, + }, + { + Name: "Keep range as is 2", + Input: [3]uint{100, 200, 999}, + ExpectedOutput: []*monitor.BlocksRange{ + {100, 200, nil}, + }, + }, + { + Name: "Keep range of one block", + Input: [3]uint{100, 100, 10}, + ExpectedOutput: []*monitor.BlocksRange{ + {100, 100, nil}, + }, + }, + { + Name: "Split range in many subranges", + Input: [3]uint{100000, 201000, 5000}, + ExpectedOutput: []*monitor.BlocksRange{ + {100000, 104999, nil}, + {105000, 109999, nil}, + {110000, 114999, nil}, + {115000, 119999, nil}, + {120000, 124999, nil}, + {125000, 129999, nil}, + {130000, 134999, nil}, + {135000, 139999, nil}, + {140000, 144999, nil}, + {145000, 149999, nil}, + {150000, 154999, nil}, + {155000, 159999, nil}, + {160000, 164999, nil}, + {165000, 169999, nil}, + 
{170000, 174999, nil}, + {175000, 179999, nil}, + {180000, 184999, nil}, + {185000, 189999, nil}, + {190000, 194999, nil}, + {195000, 199999, nil}, + {200000, 201000, nil}, + }, + }, + { + Name: "Invalid range", + Input: [3]uint{200, 100, 50}, + ExpectedOutput: []*monitor.BlocksRange{}, + }, + { + Name: "Invalid range 2", + Input: [3]uint{200, 100, 500}, + ExpectedOutput: []*monitor.BlocksRange{}, + }, + } { + t.Logf("Running sub-test %q", test.Name) + res := monitor.SplitBlockRange(test.Input[0], test.Input[1], test.Input[2]) + require.Equal(t, test.ExpectedOutput, res, "Failed %s", test.Name) + } +} + +func TestSplitLogsInBatches(t *testing.T) { + t.Parallel() + + for _, test := range []struct { + Name string + Input []*entity.Log + ExpectedOutput []*monitor.LogsBatch + }{ + { + Name: "Split range in two", + Input: []*entity.Log{ + {ID: 1, BlockNumber: 100}, + {ID: 2, BlockNumber: 100}, + {ID: 3, BlockNumber: 150}, + {ID: 4, BlockNumber: 150}, + }, + ExpectedOutput: []*monitor.LogsBatch{ + {100, []*entity.Log{ + {ID: 1, BlockNumber: 100}, + {ID: 2, BlockNumber: 100}, + }}, + {150, []*entity.Log{ + {ID: 3, BlockNumber: 150}, + {ID: 4, BlockNumber: 150}, + }}, + }, + }, + { + Name: "Split range in three", + Input: []*entity.Log{ + {ID: 1, BlockNumber: 100}, + {ID: 2, BlockNumber: 120}, + {ID: 3, BlockNumber: 120}, + {ID: 4, BlockNumber: 150}, + }, + ExpectedOutput: []*monitor.LogsBatch{ + {100, []*entity.Log{ + {ID: 1, BlockNumber: 100}, + }}, + {120, []*entity.Log{ + {ID: 2, BlockNumber: 120}, + {ID: 3, BlockNumber: 120}, + }}, + {150, []*entity.Log{ + {ID: 4, BlockNumber: 150}, + }}, + }, + }, + { + Name: "Leave empty range", + Input: []*entity.Log{}, + ExpectedOutput: []*monitor.LogsBatch{}, + }, + { + Name: "Keep single range of one log as is", + Input: []*entity.Log{ + {ID: 1, BlockNumber: 100}, + }, + ExpectedOutput: []*monitor.LogsBatch{ + {100, []*entity.Log{ + {ID: 1, BlockNumber: 100}, + }}, + }, + }, + { + Name: "Keep single range as is", + Input: 
[]*entity.Log{ + {ID: 1, BlockNumber: 100}, + {ID: 2, BlockNumber: 100}, + {ID: 3, BlockNumber: 100}, + }, + ExpectedOutput: []*monitor.LogsBatch{ + {100, []*entity.Log{ + {ID: 1, BlockNumber: 100}, + {ID: 2, BlockNumber: 100}, + {ID: 3, BlockNumber: 100}, + }}, + }, + }, + } { + t.Logf("Running sub-test %q", test.Name) + res := monitor.SplitLogsInBatches(test.Input) + require.Equal(t, test.ExpectedOutput, res, "Failed %s", test.Name) + } +}