Separate command for event reprocessing

Replace the per-side refetch_events config option (and its ReloadJobConfig type and the BlocksRange.Topic filter) with a standalone reprocess_block_range command that re-runs event processing for one side of a bridge over an explicit block range.

Kirill Fedoseev 2022-05-25 23:47:30 +04:00
parent bf6042d916
commit 04eab2e856
9 changed files with 227 additions and 109 deletions

View File

@@ -0,0 +1,106 @@
package main
import (
"context"
"flag"
"os"
"os/signal"
"github.com/sirupsen/logrus"
"github.com/poanetwork/tokenbridge-monitor/config"
"github.com/poanetwork/tokenbridge-monitor/db"
"github.com/poanetwork/tokenbridge-monitor/ethclient"
"github.com/poanetwork/tokenbridge-monitor/logging"
"github.com/poanetwork/tokenbridge-monitor/monitor"
"github.com/poanetwork/tokenbridge-monitor/repository"
)
var (
bridgeID = flag.String("bridgeId", "", "bridgeId to reprocess events for")
home = flag.Bool("home", false, "reprocess home messages")
foreign = flag.Bool("foreign", false, "reprocess foreign messages")
fromBlock = flag.Uint("fromBlock", 0, "starting block")
toBlock = flag.Uint("toBlock", 0, "ending block")
)
func main() {
flag.Parse()
logger := logging.New()
cfg, err := config.ReadConfig()
if err != nil {
logger.WithError(err).Fatal("can't read config")
}
logger.SetLevel(cfg.LogLevel)
if *bridgeID == "" {
logger.Fatal("bridgeId is not specified")
}
if *home == *foreign {
logger.Fatal("exactly one of --home or --foreign should be specified")
}
bridgeCfg, ok := cfg.Bridges[*bridgeID]
if !ok || bridgeCfg == nil {
logger.WithField("bridge_id", *bridgeID).Fatal("bridge config for the given bridgeId was not found")
}
sideCfg := bridgeCfg.Foreign
if *home {
sideCfg = bridgeCfg.Home
}
if *fromBlock < sideCfg.StartBlock {
fromBlock = &sideCfg.StartBlock
}
if *toBlock == 0 {
logger.Fatal("toBlock is not specified")
}
if *toBlock < *fromBlock {
logger.WithFields(logrus.Fields{
"from_block": *fromBlock,
"to_block": *toBlock,
}).Fatal("toBlock must not be less than fromBlock")
}
dbConn, err := db.NewDB(cfg.DBConfig)
if err != nil {
logger.WithError(err).Fatal("can't connect to database")
}
defer dbConn.Close()
if err = dbConn.Migrate(); err != nil {
logger.WithError(err).Fatal("can't run database migrations")
}
ctx, cancel := context.WithCancel(context.Background())
repo := repository.NewRepo(dbConn)
bridgeLogger := logger.WithField("bridge_id", bridgeCfg.ID)
homeClient, err2 := ethclient.NewClient(bridgeCfg.Home.Chain.RPC.Host, bridgeCfg.Home.Chain.RPC.Timeout, bridgeCfg.Home.Chain.ChainID)
if err2 != nil {
bridgeLogger.WithError(err2).Fatal("can't dial home rpc client")
}
foreignClient, err2 := ethclient.NewClient(bridgeCfg.Foreign.Chain.RPC.Host, bridgeCfg.Foreign.Chain.RPC.Timeout, bridgeCfg.Foreign.Chain.ChainID)
if err2 != nil {
bridgeLogger.WithError(err2).Fatal("can't dial foreign rpc client")
}
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
for range c {
cancel()
logger.Warn("caught CTRL-C, gracefully terminating")
return
}
}()
m, err2 := monitor.NewMonitor(ctx, bridgeLogger, dbConn, repo, bridgeCfg, homeClient, foreignClient)
if err2 != nil {
bridgeLogger.WithError(err2).Fatal("can't initialize bridge monitor")
}
err = m.ProcessBlockRange(ctx, *home, *fromBlock, *toBlock)
if err != nil {
logger.WithError(err).Fatal("can't manually process block range")
}
}
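
For reference, the new binary is driven entirely by the flags declared above plus the shared config.yml. A hypothetical invocation (the bridge id and block numbers here are illustrative, not taken from this commit) would look like:

./reprocess_block_range --bridgeId=xdai --home --fromBlock=20000000 --toBlock=20010000

Exactly one of --home or --foreign selects the bridge side, and --fromBlock is clamped to that side's configured start_block.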

View File

@@ -230,25 +230,6 @@
"max_block_range_size": {
"type": "integer"
},
"refetch_events": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"properties": {
"event": {
"type": "string"
},
"start_block": {
"type": "integer"
},
"end_block": {
"type": "integer"
}
},
"additionalProperties": false
}
},
"whitelisted_senders": {
"type": "array",
"minItems": 1,

View File

@@ -26,12 +26,6 @@ type ChainConfig struct {
SafeLogsRequest bool `yaml:"safe_logs_request"`
}
type ReloadJobConfig struct {
Event string `yaml:"event"`
StartBlock uint `yaml:"start_block"`
EndBlock uint `yaml:"end_block"`
}
type TokenConfig struct {
Address common.Address `yaml:"address"`
StartBlock uint `yaml:"start_block"`
@@ -40,16 +34,15 @@ type TokenConfig struct {
}
type BridgeSideConfig struct {
ChainName string `yaml:"chain"`
Chain *ChainConfig `yaml:"-"`
Address common.Address `yaml:"address"`
ValidatorContractAddress common.Address `yaml:"validator_contract_address"`
StartBlock uint `yaml:"start_block"`
BlockConfirmations uint `yaml:"required_block_confirmations"`
MaxBlockRangeSize uint `yaml:"max_block_range_size"`
RefetchEvents []*ReloadJobConfig `yaml:"refetch_events"`
WhitelistedSenders []common.Address `yaml:"whitelisted_senders"`
ErcToNativeTokens []TokenConfig `yaml:"erc_to_native_tokens"`
ChainName string `yaml:"chain"`
Chain *ChainConfig `yaml:"-"`
Address common.Address `yaml:"address"`
ValidatorContractAddress common.Address `yaml:"validator_contract_address"`
StartBlock uint `yaml:"start_block"`
BlockConfirmations uint `yaml:"required_block_confirmations"`
MaxBlockRangeSize uint `yaml:"max_block_range_size"`
WhitelistedSenders []common.Address `yaml:"whitelisted_senders"`
ErcToNativeTokens []TokenConfig `yaml:"erc_to_native_tokens"`
}
type BridgeAlertConfig struct {

View File

@@ -25,6 +25,13 @@ services:
- .env
volumes:
- ./config.yml:/app/config.yml
reprocess_block_range:
build: .
entrypoint: [ "./reprocess_block_range" ]
env_file:
- .env
volumes:
- ./config.yml:/app/config.yml
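
With this service defined, a one-off reprocessing run can be launched through compose; a sketch, with illustrative flag values:

docker-compose run --rm reprocess_block_range --bridgeId=xdai --foreign --fromBlock=14000000 --toBlock=14001000

Since the entrypoint is already ./reprocess_block_range, the arguments after the service name are passed straight to the command.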
grafana:
image: grafana/grafana:8.1.5
volumes:

View File

@@ -102,6 +102,14 @@ services:
- .env
volumes:
- ./config.yml:/app/config.yml
reprocess_block_range:
container_name: reprocess_block_range
image: ghcr.io/poanetwork/tokenbridge-monitor:v0.0.28
entrypoint: [ "./reprocess_block_range" ]
env_file:
- .env
volumes:
- ./config.yml:/app/config.yml
volumes:
db:
grafana-storage:

View File

@@ -12,7 +12,6 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
@@ -137,6 +136,57 @@ func (m *ContractMonitor) Start(ctx context.Context) {
go m.StartLogsFetcher(ctx)
}
func (m *ContractMonitor) ProcessBlockRange(ctx context.Context, fromBlock, toBlock uint) error {
if toBlock > m.logsCursor.LastProcessedBlock {
return errors.New("can't manually process logs further than the current lastProcessedBlock")
}
wg := sync.WaitGroup{}
wg.Add(1)
m.blocksRangeChan <- nil
go func() {
defer wg.Done()
batches := SplitBlockRange(fromBlock, toBlock, m.cfg.MaxBlockRangeSize)
for _, batch := range batches {
m.logger.WithFields(logrus.Fields{
"from_block": batch.From,
"to_block": batch.To,
}).Info("scheduling new block range logs search")
select {
case m.blocksRangeChan <- batch:
case <-ctx.Done():
return
}
}
m.blocksRangeChan <- nil
}()
go m.StartLogsFetcher(ctx)
go m.StartLogsProcessor(ctx)
finishedFetching := false
for {
if !finishedFetching && len(m.blocksRangeChan) == 0 {
// last nil from m.blocksRangeChan was consumed, meaning that all previous values were already handled
// and log batches were sent to processing queue
m.logger.Info("all block ranges were processed, submitting stub logs batch")
finishedFetching = true
m.logsChan <- nil
}
if finishedFetching && len(m.logsChan) == 0 {
// last nil from m.logsChan was consumed, meaning that all previous values were already processed
// there is nothing to process, so exit
m.logger.Info("all logs batches were processed, exiting")
return nil
}
if utils.ContextSleep(ctx, time.Second) == nil {
return ctx.Err()
}
}
}
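
The completion check above is built on nil sentinels flowing through the two buffered channels: the producer terminates each queue with a trailing nil, the consumers skip nil values (see the nil guards added to StartLogsFetcher and StartLogsProcessor below), and because each consumer handles values sequentially, an empty channel after its sentinel was consumed means everything queued before the sentinel has been fully handled. A minimal self-contained sketch of that pattern, using a toy work channel rather than the monitor's actual types:

package main

import (
	"fmt"
	"time"
)

func main() {
	work := make(chan *int, 10)

	// Consumer: process values sequentially, skipping the nil sentinel.
	go func() {
		for v := range work {
			if v == nil {
				continue // sentinel only marks the tail of the queue
			}
			fmt.Println("processed", *v)
		}
	}()

	// Producer: enqueue real values, then a trailing nil sentinel.
	for i := 1; i <= 3; i++ {
		v := i
		work <- &v
	}
	work <- nil

	// Completion check, as in ProcessBlockRange: the buffer can only become
	// empty after the consumer has taken the sentinel, and since the consumer
	// is sequential, every value enqueued before the sentinel is done by then.
	for len(work) > 0 {
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("all queued values were handled")
}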
func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, fromBlock, toBlock uint) {
m.logger.WithFields(logrus.Fields{
"from_block": fromBlock,
@@ -167,10 +217,6 @@ func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, fromBlock, toBlock uint) {
func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) {
m.logger.Info("starting new blocks tracker")
if len(m.cfg.RefetchEvents) > 0 {
m.RefetchEvents(start - 1)
}
for {
head, err := m.client.BlockNumber(ctx)
if err != nil {
@@ -196,33 +242,6 @@ func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) {
}
}
func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) {
m.logger.Info("refetching old events")
for _, job := range m.cfg.RefetchEvents {
fromBlock := job.StartBlock
if fromBlock < m.cfg.StartBlock {
fromBlock = m.cfg.StartBlock
}
toBlock := job.EndBlock
if toBlock == 0 || toBlock > lastFetchedBlock {
toBlock = lastFetchedBlock
}
batches := SplitBlockRange(fromBlock, toBlock, m.cfg.MaxBlockRangeSize)
for _, batch := range batches {
m.logger.WithFields(logrus.Fields{
"from_block": batch.From,
"to_block": batch.To,
}).Info("scheduling new block range logs search")
if job.Event != "" {
topic := crypto.Keccak256Hash([]byte(job.Event))
batch.Topic = &topic
}
m.blocksRangeChan <- batch
}
}
}
func (m *ContractMonitor) StartLogsFetcher(ctx context.Context) {
m.logger.Info("starting logs fetcher")
for {
@@ -230,6 +249,9 @@ func (m *ContractMonitor) StartLogsFetcher(ctx context.Context) {
case <-ctx.Done():
return
case blocksRange := <-m.blocksRangeChan:
if blocksRange == nil {
continue
}
for {
err := m.tryToFetchLogs(ctx, blocksRange)
if err != nil {
@@ -255,9 +277,6 @@ func (m *ContractMonitor) buildFilterQueries(blocksRange *BlocksRange) []ethereum.FilterQuery {
ToBlock: big.NewInt(int64(blocksRange.To)),
Addresses: []common.Address{m.cfg.Address, m.cfg.ValidatorContractAddress},
}
if blocksRange.Topic != nil {
q.Topics = [][]common.Hash{{*blocksRange.Topic}}
}
queries = append(queries, q)
if m.bridgeCfg.BridgeMode == config.BridgeModeErcToNative {
for _, token := range m.cfg.ErcToNativeTokens {
@@ -270,9 +289,6 @@
Addresses: []common.Address{token.Address},
Topics: [][]common.Hash{{}, {}, {m.cfg.Address.Hash()}},
}
if blocksRange.Topic != nil {
q.Topics[0] = []common.Hash{*blocksRange.Topic}
}
if token.StartBlock > blocksRange.From {
q.FromBlock = big.NewInt(int64(token.StartBlock))
}
@@ -359,6 +375,9 @@ func (m *ContractMonitor) StartLogsProcessor(ctx context.Context) {
case <-ctx.Done():
return
case logs := <-m.logsChan:
if logs == nil {
continue
}
wg := new(sync.WaitGroup)
wg.Add(2)
go func() {

View File

@@ -108,6 +108,13 @@ func (m *Monitor) Start(ctx context.Context) {
go m.alertManager.Start(ctx, m.IsSynced)
}
func (m *Monitor) ProcessBlockRange(ctx context.Context, home bool, fromBlock, toBlock uint) error {
if home {
return m.homeMonitor.ProcessBlockRange(ctx, fromBlock, toBlock)
}
return m.foreignMonitor.ProcessBlockRange(ctx, fromBlock, toBlock)
}
func (m *Monitor) IsSynced() bool {
return m.homeMonitor.IsSynced() && m.foreignMonitor.IsSynced()
}

View File

@@ -4,17 +4,14 @@ import (
"errors"
"math"
"github.com/ethereum/go-ethereum/common"
"github.com/poanetwork/tokenbridge-monitor/entity"
)
var ErrWrongArgumentType = errors.New("argument has unexpected type")
type BlocksRange struct {
From uint
To uint
Topic *common.Hash
From uint
To uint
}
type LogsBatch struct {

View File

@@ -21,73 +21,73 @@ func TestSplitBlockRange(t *testing.T) {
Name: "Split range in two",
Input: [3]uint{100, 199, 50},
ExpectedOutput: []*monitor.BlocksRange{
{100, 149, nil},
{150, 199, nil},
{100, 149},
{150, 199},
},
},
{
Name: "Split range in two 2",
Input: [3]uint{100, 200, 90},
ExpectedOutput: []*monitor.BlocksRange{
{100, 189, nil},
{190, 200, nil},
{100, 189},
{190, 200},
},
},
{
Name: "Split range in three",
Input: [3]uint{100, 200, 50},
ExpectedOutput: []*monitor.BlocksRange{
{100, 149, nil},
{150, 199, nil},
{200, 200, nil},
{100, 149},
{150, 199},
{200, 200},
},
},
{
Name: "Keep range as is",
Input: [3]uint{100, 200, 101},
ExpectedOutput: []*monitor.BlocksRange{
{100, 200, nil},
{100, 200},
},
},
{
Name: "Keep range as is 2",
Input: [3]uint{100, 200, 999},
ExpectedOutput: []*monitor.BlocksRange{
{100, 200, nil},
{100, 200},
},
},
{
Name: "Keep range of one block",
Input: [3]uint{100, 100, 10},
ExpectedOutput: []*monitor.BlocksRange{
{100, 100, nil},
{100, 100},
},
},
{
Name: "Split range in many subranges",
Input: [3]uint{100000, 201000, 5000},
ExpectedOutput: []*monitor.BlocksRange{
{100000, 104999, nil},
{105000, 109999, nil},
{110000, 114999, nil},
{115000, 119999, nil},
{120000, 124999, nil},
{125000, 129999, nil},
{130000, 134999, nil},
{135000, 139999, nil},
{140000, 144999, nil},
{145000, 149999, nil},
{150000, 154999, nil},
{155000, 159999, nil},
{160000, 164999, nil},
{165000, 169999, nil},
{170000, 174999, nil},
{175000, 179999, nil},
{180000, 184999, nil},
{185000, 189999, nil},
{190000, 194999, nil},
{195000, 199999, nil},
{200000, 201000, nil},
{100000, 104999},
{105000, 109999},
{110000, 114999},
{115000, 119999},
{120000, 124999},
{125000, 129999},
{130000, 134999},
{135000, 139999},
{140000, 144999},
{145000, 149999},
{150000, 154999},
{155000, 159999},
{160000, 164999},
{165000, 169999},
{170000, 174999},
{175000, 179999},
{180000, 184999},
{185000, 189999},
{190000, 194999},
{195000, 199999},
{200000, 201000},
},
},
{