Utils for splitting blocks and logs ranges

parent 1da539d42b
commit 860df4c93c
@@ -4,3 +4,5 @@ Dockerfile
 docker-compose.*
 .dockerignore
 config.yml
+cover.out
+coverage.html

@@ -11,11 +11,29 @@ env:
   DOCKER_IMAGE_NAME: ${{ github.repository }}

 jobs:
-  build-and-publish:
+  test:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v3
+      - uses: actions/setup-go@v3
+        with:
+          go-version: '^1.17'
+      - name: Run tests
+        run: go test -race -cover -coverprofile cover.out ./...
+      - name: Generage coverage report
+        run: go tool cover -html=cover.out -o coverage.html
+      - name: Upload coverage
+        uses: actions/upload-artifact@v3
+        with:
+          name: coverage
+          path: coverage.html
+  build-and-publish:
+    runs-on: ubuntu-latest
+    needs: test
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v3
       - name: Login to GitHub Container Registry
         uses: docker/login-action@v1
         with:

@@ -2,3 +2,5 @@
 .idea/
 prometheus/slack_api_url.txt
 prometheus/admin_password.txt
+cover.out
+coverage.html

go.mod
@@ -12,6 +12,7 @@ require (
 	github.com/lib/pq v1.10.4
 	github.com/prometheus/client_golang v1.11.1
 	github.com/sirupsen/logrus v1.8.1
+	github.com/stretchr/testify v1.7.0
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
 )

@@ -20,6 +21,7 @@ require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/btcsuite/btcd/btcec/v2 v2.1.2 // indirect
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
+	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/deckarep/golang-set v1.8.0 // indirect
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
 	github.com/go-ole/go-ole v1.2.1 // indirect
@@ -39,6 +41,7 @@ require (
 	github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
 	github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
 	github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_model v0.2.0 // indirect
 	github.com/prometheus/common v0.30.0 // indirect
 	github.com/prometheus/procfs v0.7.3 // indirect

@@ -5,7 +5,6 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
-	"math"
 	"math/big"
 	"sort"
 	"sync"
@@ -172,20 +171,13 @@ func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) {
 	head -= m.cfg.BlockConfirmations
 	m.recordHeadBlockNumber(head)

-	for start <= head {
-		end := start + m.cfg.MaxBlockRangeSize - 1
-		if end > head {
-			end = head
-		}
+	batches := SplitBlockRange(start, head, m.cfg.MaxBlockRangeSize)
+	for _, batch := range batches {
 		m.logger.WithFields(logrus.Fields{
-			"from_block": start,
-			"to_block":   end,
+			"from_block": batch.From,
+			"to_block":   batch.To,
 		}).Info("scheduling new block range logs search")
-		m.blocksRangeChan <- &BlocksRange{
-			From: start,
-			To:   end,
-		}
-		start = end + 1
+		m.blocksRangeChan <- batch
 	}
 }

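For reference, both fetch loops now simply range over the slices returned by SplitBlockRange, which yields inclusive, non-overlapping sub-ranges of at most MaxBlockRangeSize blocks. The snippet below is not part of this commit; it is a hypothetical testable example that assumes the module path and exported fields used by the tests added at the end of this diff, and reuses the input of the "Split range in three" test case.

package monitor_test

import (
	"fmt"

	"tokenbridge-monitor/monitor"
)

// Hypothetical example, not in the commit: prints the sub-ranges that
// StartBlockFetcher would now schedule for blocks 100..200 with a
// maximum range size of 50.
func ExampleSplitBlockRange() {
	for _, batch := range monitor.SplitBlockRange(100, 200, 50) {
		fmt.Printf("%d..%d\n", batch.From, batch.To)
	}
	// Output:
	// 100..149
	// 150..199
	// 200..200
}

Run under go test, the Output comment is checked automatically.
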
@@ -207,25 +199,17 @@ func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) {
 			toBlock = lastFetchedBlock
 		}

-		for fromBlock <= toBlock {
-			end := fromBlock + m.cfg.MaxBlockRangeSize - 1
-			if end > toBlock {
-				end = toBlock
-			}
+		batches := SplitBlockRange(fromBlock, toBlock, m.cfg.MaxBlockRangeSize)
+		for _, batch := range batches {
 			m.logger.WithFields(logrus.Fields{
-				"from_block": fromBlock,
-				"to_block":   end,
+				"from_block": batch.From,
+				"to_block":   batch.To,
 			}).Info("scheduling new block range logs search")
-			br := &BlocksRange{
-				From: fromBlock,
-				To:   end,
-			}
 			if job.Event != "" {
 				topic := crypto.Keccak256Hash([]byte(job.Event))
-				br.Topic = &topic
+				batch.Topic = &topic
 			}
-			m.blocksRangeChan <- br
-			fromBlock = end + 1
+			m.blocksRangeChan <- batch
 		}
 	}
 }

@@ -340,34 +324,19 @@ func (m *ContractMonitor) tryToFetchLogs(ctx context.Context, blocksRange *Block
 }

 func (m *ContractMonitor) submitLogs(logs []*entity.Log, endBlock uint) {
-	jobs, lastBlock := 0, uint(0)
-	for _, log := range logs {
-		if log.BlockNumber > lastBlock {
-			lastBlock = log.BlockNumber
-			jobs++
-		}
-	}
+	logBatches := SplitLogsInBatches(logs)
 	m.logger.WithFields(logrus.Fields{
 		"count": len(logs),
-		"jobs":  jobs,
+		"jobs":  len(logBatches),
 	}).Info("create jobs for logs processor")
-	// fake log to simplify loop, it will be skipped
-	logs = append(logs, &entity.Log{BlockNumber: math.MaxUint32})
-	batchStartIndex := 0
-	for i, log := range logs {
-		if log.BlockNumber > logs[batchStartIndex].BlockNumber {
-			m.logger.WithFields(logrus.Fields{
-				"count":        i - batchStartIndex,
-				"block_number": logs[batchStartIndex].BlockNumber,
-			}).Debug("submitting logs batch to logs processor")
-			m.logsChan <- &LogsBatch{
-				BlockNumber: logs[batchStartIndex].BlockNumber,
-				Logs:        logs[batchStartIndex:i],
-			}
-			batchStartIndex = i
-		}
+	for _, batch := range logBatches {
+		m.logger.WithFields(logrus.Fields{
+			"count":        len(batch.Logs),
+			"block_number": batch.BlockNumber,
+		}).Debug("submitting logs batch to logs processor")
+		m.logsChan <- batch
 	}
-	if lastBlock < endBlock {
+	if logBatches[len(logBatches)-1].BlockNumber < endBlock {
 		m.logsChan <- &LogsBatch{
 			BlockNumber: endBlock,
 			Logs:        nil,

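The final comparison against endBlock works because SplitLogsInBatches returns batches in ascending block order, so the last batch carries the highest block number that actually had logs. Below is a hypothetical example, not part of this commit, using the same import paths and fields as the tests further down and assuming the input logs are already sorted by block number (as they are when fetched per block range).

package monitor_test

import (
	"fmt"

	"tokenbridge-monitor/entity"
	"tokenbridge-monitor/monitor"
)

// Hypothetical example, not in the commit: groups three logs into two
// per-block batches and shows that the last batch holds the highest block.
func ExampleSplitLogsInBatches() {
	logs := []*entity.Log{
		{ID: 1, BlockNumber: 100},
		{ID: 2, BlockNumber: 100},
		{ID: 3, BlockNumber: 150},
	}
	batches := monitor.SplitLogsInBatches(logs)
	for _, b := range batches {
		fmt.Printf("block %d: %d logs\n", b.BlockNumber, len(b.Logs))
	}
	fmt.Println("last block:", batches[len(batches)-1].BlockNumber)
	// Output:
	// block 100: 2 logs
	// block 150: 1 logs
	// last block: 150
}
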
@@ -1,6 +1,7 @@
 package monitor

 import (
+	"math"
 	"tokenbridge-monitor/entity"

 	"github.com/ethereum/go-ethereum/common"
@@ -16,3 +17,36 @@ type LogsBatch struct {
 	BlockNumber uint
 	Logs        []*entity.Log
 }
+
+func SplitBlockRange(fromBlock uint, toBlock uint, maxSize uint) []*BlocksRange {
+	batches := make([]*BlocksRange, 0, 10)
+	for fromBlock <= toBlock {
+		batchToBlock := fromBlock + maxSize - 1
+		if batchToBlock > toBlock {
+			batchToBlock = toBlock
+		}
+		batches = append(batches, &BlocksRange{
+			From: fromBlock,
+			To:   batchToBlock,
+		})
+		fromBlock += maxSize
+	}
+	return batches
+}
+
+func SplitLogsInBatches(logs []*entity.Log) []*LogsBatch {
+	batches := make([]*LogsBatch, 0, 10)
+	// fake log to simplify loop, it will be skipped
+	logs = append(logs, &entity.Log{BlockNumber: math.MaxUint32})
+	batchStartIndex := 0
+	for i, log := range logs {
+		if log.BlockNumber > logs[batchStartIndex].BlockNumber {
+			batches = append(batches, &LogsBatch{
+				BlockNumber: logs[batchStartIndex].BlockNumber,
+				Logs:        logs[batchStartIndex:i],
+			})
+			batchStartIndex = i
+		}
+	}
+	return batches
+}
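Both helpers degrade gracefully rather than failing: an inverted block range and an empty log slice each produce an empty (non-nil) slice, as covered by the "Invalid range" and "Leave empty range" test cases below. A hypothetical sketch, not part of the commit, under the same assumptions as the examples above:

package monitor_test

import (
	"fmt"

	"tokenbridge-monitor/monitor"
)

// Hypothetical example, not in the commit: edge cases return empty slices.
func ExampleSplitBlockRange_invalidInput() {
	fmt.Println(len(monitor.SplitBlockRange(200, 100, 50))) // toBlock < fromBlock
	fmt.Println(len(monitor.SplitLogsInBatches(nil)))       // no logs at all
	// Output:
	// 0
	// 0
}
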
@@ -0,0 +1,193 @@
+package monitor_test
+
+import (
+	"testing"
+	"tokenbridge-monitor/entity"
+	"tokenbridge-monitor/monitor"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSplitBlockRange(t *testing.T) {
+	t.Parallel()
+
+	for _, test := range []struct {
+		Name           string
+		Input          [3]uint
+		ExpectedOutput []*monitor.BlocksRange
+	}{
+		{
+			Name:  "Split range in two",
+			Input: [3]uint{100, 199, 50},
+			ExpectedOutput: []*monitor.BlocksRange{
+				{100, 149, nil},
+				{150, 199, nil},
+			},
+		},
+		{
+			Name:  "Split range in two 2",
+			Input: [3]uint{100, 200, 90},
+			ExpectedOutput: []*monitor.BlocksRange{
+				{100, 189, nil},
+				{190, 200, nil},
+			},
+		},
+		{
+			Name:  "Split range in three",
+			Input: [3]uint{100, 200, 50},
+			ExpectedOutput: []*monitor.BlocksRange{
+				{100, 149, nil},
+				{150, 199, nil},
+				{200, 200, nil},
+			},
+		},
+		{
+			Name:  "Keep range as is",
+			Input: [3]uint{100, 200, 101},
+			ExpectedOutput: []*monitor.BlocksRange{
+				{100, 200, nil},
+			},
+		},
+		{
+			Name:  "Keep range as is 2",
+			Input: [3]uint{100, 200, 999},
+			ExpectedOutput: []*monitor.BlocksRange{
+				{100, 200, nil},
+			},
+		},
+		{
+			Name:  "Keep range of one block",
+			Input: [3]uint{100, 100, 10},
+			ExpectedOutput: []*monitor.BlocksRange{
+				{100, 100, nil},
+			},
+		},
+		{
+			Name:  "Split range in many subranges",
+			Input: [3]uint{100000, 201000, 5000},
+			ExpectedOutput: []*monitor.BlocksRange{
+				{100000, 104999, nil},
+				{105000, 109999, nil},
+				{110000, 114999, nil},
+				{115000, 119999, nil},
+				{120000, 124999, nil},
+				{125000, 129999, nil},
+				{130000, 134999, nil},
+				{135000, 139999, nil},
+				{140000, 144999, nil},
+				{145000, 149999, nil},
+				{150000, 154999, nil},
+				{155000, 159999, nil},
+				{160000, 164999, nil},
+				{165000, 169999, nil},
+				{170000, 174999, nil},
+				{175000, 179999, nil},
+				{180000, 184999, nil},
+				{185000, 189999, nil},
+				{190000, 194999, nil},
+				{195000, 199999, nil},
+				{200000, 201000, nil},
+			},
+		},
+		{
+			Name:           "Invalid range",
+			Input:          [3]uint{200, 100, 50},
+			ExpectedOutput: []*monitor.BlocksRange{},
+		},
+		{
+			Name:           "Invalid range 2",
+			Input:          [3]uint{200, 100, 500},
+			ExpectedOutput: []*monitor.BlocksRange{},
+		},
+	} {
+		t.Logf("Running sub-test %q", test.Name)
+		res := monitor.SplitBlockRange(test.Input[0], test.Input[1], test.Input[2])
+		require.Equal(t, test.ExpectedOutput, res, "Failed %s", test.Name)
+	}
+}
+
+func TestSplitLogsInBatches(t *testing.T) {
+	t.Parallel()
+
+	for _, test := range []struct {
+		Name           string
+		Input          []*entity.Log
+		ExpectedOutput []*monitor.LogsBatch
+	}{
+		{
+			Name: "Split range in two",
+			Input: []*entity.Log{
+				{ID: 1, BlockNumber: 100},
+				{ID: 2, BlockNumber: 100},
+				{ID: 3, BlockNumber: 150},
+				{ID: 4, BlockNumber: 150},
+			},
+			ExpectedOutput: []*monitor.LogsBatch{
+				{100, []*entity.Log{
+					{ID: 1, BlockNumber: 100},
+					{ID: 2, BlockNumber: 100},
+				}},
+				{150, []*entity.Log{
+					{ID: 3, BlockNumber: 150},
+					{ID: 4, BlockNumber: 150},
+				}},
+			},
+		},
+		{
+			Name: "Split range in three",
+			Input: []*entity.Log{
+				{ID: 1, BlockNumber: 100},
+				{ID: 2, BlockNumber: 120},
+				{ID: 3, BlockNumber: 120},
+				{ID: 4, BlockNumber: 150},
+			},
+			ExpectedOutput: []*monitor.LogsBatch{
+				{100, []*entity.Log{
+					{ID: 1, BlockNumber: 100},
+				}},
+				{120, []*entity.Log{
+					{ID: 2, BlockNumber: 120},
+					{ID: 3, BlockNumber: 120},
+				}},
+				{150, []*entity.Log{
+					{ID: 4, BlockNumber: 150},
+				}},
+			},
+		},
+		{
+			Name:           "Leave empty range",
+			Input:          []*entity.Log{},
+			ExpectedOutput: []*monitor.LogsBatch{},
+		},
+		{
+			Name: "Keep single range of one log as is",
+			Input: []*entity.Log{
+				{ID: 1, BlockNumber: 100},
+			},
+			ExpectedOutput: []*monitor.LogsBatch{
+				{100, []*entity.Log{
+					{ID: 1, BlockNumber: 100},
+				}},
+			},
+		},
+		{
+			Name: "Keep single range as is",
+			Input: []*entity.Log{
+				{ID: 1, BlockNumber: 100},
+				{ID: 2, BlockNumber: 100},
+				{ID: 3, BlockNumber: 100},
+			},
+			ExpectedOutput: []*monitor.LogsBatch{
+				{100, []*entity.Log{
+					{ID: 1, BlockNumber: 100},
+					{ID: 2, BlockNumber: 100},
+					{ID: 3, BlockNumber: 100},
+				}},
+			},
+		},
+	} {
+		t.Logf("Running sub-test %q", test.Name)
+		res := monitor.SplitLogsInBatches(test.Input)
+		require.Equal(t, test.ExpectedOutput, res, "Failed %s", test.Name)
+	}
+}