diff --git a/entity/log.go b/entity/log.go index 3e4071e..4d4ffc5 100644 --- a/entity/log.go +++ b/entity/log.go @@ -5,6 +5,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" ) type Log struct { @@ -30,3 +31,27 @@ type LogsRepo interface { FindByBlockNumber(ctx context.Context, chainID string, block uint) ([]*Log, error) FindByTxHash(ctx context.Context, txHash common.Hash) ([]*Log, error) } + +func NewLog(chainID string, log types.Log) *Log { + e := &Log{ + ChainID: chainID, + Address: log.Address, + Data: log.Data, + BlockNumber: uint(log.BlockNumber), + LogIndex: log.Index, + TransactionHash: log.TxHash, + } + if len(log.Topics) > 0 { + e.Topic0 = &log.Topics[0] + if len(log.Topics) > 1 { + e.Topic1 = &log.Topics[1] + if len(log.Topics) > 2 { + e.Topic2 = &log.Topics[2] + if len(log.Topics) > 3 { + e.Topic3 = &log.Topics[3] + } + } + } + } + return e +} diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 513be61..0344291 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -51,14 +51,14 @@ func NewClient(url string, timeout time.Duration, chainID string) (*Client, erro return client, nil } -func (c *Client) BlockNumber(ctx context.Context) (uint64, error) { +func (c *Client) BlockNumber(ctx context.Context) (uint, error) { defer ObserveDuration(c.ChainID, c.url, "eth_blockNumber")() ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() n, err := c.client.BlockNumber(ctx) ObserveError(c.ChainID, c.url, "eth_blockNumber", err) - return n, err + return uint(n), err } func (c *Client) HeaderByNumber(ctx context.Context, n uint) (*types.Header, error) { diff --git a/monitor/contract_monitor.go b/monitor/contract_monitor.go new file mode 100644 index 0000000..6ef59ff --- /dev/null +++ b/monitor/contract_monitor.go @@ -0,0 +1,541 @@ +package monitor + +import ( + "context" + "database/sql" + "errors" + "fmt" + "math" + "math/big" + "sort" + "sync" + "time" + "tokenbridge-monitor/config" + "tokenbridge-monitor/contract" + "tokenbridge-monitor/contract/abi" + "tokenbridge-monitor/entity" + "tokenbridge-monitor/ethclient" + "tokenbridge-monitor/logging" + "tokenbridge-monitor/repository" + "tokenbridge-monitor/utils" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" +) + +const defaultSyncedThreshold = 10 +const defaultBlockRangesChanCap = 10 +const defaultLogsChanCap = 200 +const defaultEventHandlersMapCap = 20 + +type ContractMonitor struct { + bridgeCfg *config.BridgeConfig + cfg *config.BridgeSideConfig + logger logging.Logger + repo *repository.Repo + client *ethclient.Client + logsCursor *entity.LogsCursor + blocksRangeChan chan *BlocksRange + logsChan chan *LogsBatch + contract *contract.Contract + eventHandlers map[string]EventHandler + headBlock uint + isSynced bool + syncedMetric prometheus.Gauge + headBlockMetric prometheus.Gauge + fetchedBlockMetric prometheus.Gauge + processedBlockMetric prometheus.Gauge +} + +func NewContractMonitor(ctx context.Context, logger logging.Logger, repo *repository.Repo, bridgeCfg *config.BridgeConfig, cfg *config.BridgeSideConfig) (*ContractMonitor, error) { + client, err := ethclient.NewClient(cfg.Chain.RPC.Host, cfg.Chain.RPC.Timeout, cfg.Chain.ChainID) + if err != nil { + return nil, fmt.Errorf("failed to start eth client: %w", err) + } + contractAbi := abi.AMB + if bridgeCfg.IsErcToNative { + contractAbi = abi.ERC_TO_NATIVE + } + bridgeContract := contract.NewContract(client, cfg.Address, contractAbi) + if cfg.ValidatorContractAddress == (common.Address{}) { + cfg.ValidatorContractAddress, err = bridgeContract.ValidatorContractAddress(ctx) + if err != nil { + return nil, fmt.Errorf("cannot get validator contract address: %w", err) + } + logger.WithFields(logrus.Fields{ + "chain_id": client.ChainID, + "bridge_address": cfg.Address, + "validator_contract_address": cfg.ValidatorContractAddress, + "start_block": cfg.StartBlock, + }).Info("obtained validator contract address") + } + logsCursor, err := repo.LogsCursors.GetByChainIDAndAddress(ctx, client.ChainID, cfg.Address) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + logger.WithFields(logrus.Fields{ + "chain_id": client.ChainID, + "address": cfg.Address, + "start_block": cfg.StartBlock, + }).Warn("contract cursor is not present, staring indexing from scratch") + logsCursor = &entity.LogsCursor{ + ChainID: client.ChainID, + Address: cfg.Address, + LastFetchedBlock: cfg.StartBlock - 1, + LastProcessedBlock: cfg.StartBlock - 1, + } + } else { + return nil, fmt.Errorf("failed to read logs cursor: %w", err) + } + } + commonLabels := prometheus.Labels{ + "bridge_id": bridgeCfg.ID, + "chain_id": client.ChainID, + "address": cfg.Address.String(), + } + return &ContractMonitor{ + logger: logger, + bridgeCfg: bridgeCfg, + cfg: cfg, + repo: repo, + client: client, + logsCursor: logsCursor, + blocksRangeChan: make(chan *BlocksRange, defaultBlockRangesChanCap), + logsChan: make(chan *LogsBatch, defaultLogsChanCap), + contract: bridgeContract, + eventHandlers: make(map[string]EventHandler, defaultEventHandlersMapCap), + syncedMetric: SyncedContract.With(commonLabels), + headBlockMetric: LatestHeadBlock.With(commonLabels), + fetchedBlockMetric: LatestFetchedBlock.With(commonLabels), + processedBlockMetric: LatestProcessedBlock.With(commonLabels), + }, nil +} + +func (m *ContractMonitor) IsSynced() bool { + return m.isSynced +} + +func (m *ContractMonitor) RegisterEventHandler(event string, handler EventHandler) { + m.eventHandlers[event] = handler +} + +func (m *ContractMonitor) VerifyEventHandlersABI() error { + events := m.contract.AllEvents() + for e := range m.eventHandlers { + if !events[e] { + return fmt.Errorf("contract does not have %s event in its ABI", e) + } + } + return nil +} + +func (m *ContractMonitor) Start(ctx context.Context) { + lastProcessedBlock := m.logsCursor.LastProcessedBlock + lastFetchedBlock := m.logsCursor.LastFetchedBlock + go m.StartBlockFetcher(ctx, lastFetchedBlock+1) + go m.StartLogsProcessor(ctx) + m.LoadUnprocessedLogs(ctx, lastProcessedBlock+1, lastFetchedBlock) + go m.StartLogsFetcher(ctx) +} + +func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, fromBlock, toBlock uint) { + m.logger.WithFields(logrus.Fields{ + "from_block": fromBlock, + "to_block": toBlock, + }).Info("loading fetched but not yet processed blocks") + + addresses := m.cfg.ContractAddresses(fromBlock, toBlock) + for { + logs, err := m.repo.Logs.FindByBlockRange(ctx, m.client.ChainID, addresses, fromBlock, toBlock) + if err != nil { + m.logger.WithError(err).Error("can't find unprocessed logs in block range") + } else { + m.submitLogs(logs, toBlock) + break + } + + if utils.ContextSleep(ctx, 10*time.Second) == nil { + return + } + } +} + +func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) { + m.logger.Info("starting new blocks tracker") + + if len(m.cfg.RefetchEvents) > 0 { + m.RefetchEvents(start - 1) + } + + for { + head, err := m.client.BlockNumber(ctx) + if err != nil { + m.logger.WithError(err).Error("can't fetch latest block number") + } else { + head -= m.cfg.BlockConfirmations + m.recordHeadBlockNumber(head) + + for start <= head { + end := start + m.cfg.MaxBlockRangeSize - 1 + if end > head { + end = head + } + m.logger.WithFields(logrus.Fields{ + "from_block": start, + "to_block": end, + }).Info("scheduling new block range logs search") + m.blocksRangeChan <- &BlocksRange{ + From: start, + To: end, + } + start = end + 1 + } + } + + if utils.ContextSleep(ctx, m.cfg.Chain.BlockIndexInterval) == nil { + return + } + } +} + +func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) { + m.logger.Info("refetching old events") + for _, job := range m.cfg.RefetchEvents { + fromBlock := job.StartBlock + if fromBlock < m.cfg.StartBlock { + fromBlock = m.cfg.StartBlock + } + toBlock := job.EndBlock + if toBlock == 0 || toBlock > lastFetchedBlock { + toBlock = lastFetchedBlock + } + + for fromBlock <= toBlock { + end := fromBlock + m.cfg.MaxBlockRangeSize - 1 + if end > toBlock { + end = toBlock + } + m.logger.WithFields(logrus.Fields{ + "from_block": fromBlock, + "to_block": end, + }).Info("scheduling new block range logs search") + br := &BlocksRange{ + From: fromBlock, + To: end, + } + if job.Event != "" { + topic := crypto.Keccak256Hash([]byte(job.Event)) + br.Topic = &topic + } + m.blocksRangeChan <- br + fromBlock = end + 1 + } + } +} + +func (m *ContractMonitor) StartLogsFetcher(ctx context.Context) { + m.logger.Info("starting logs fetcher") + for { + select { + case <-ctx.Done(): + return + case blocksRange := <-m.blocksRangeChan: + for { + err := m.tryToFetchLogs(ctx, blocksRange) + if err != nil { + m.logger.WithError(err).WithFields(logrus.Fields{ + "from_block": blocksRange.From, + "to_block": blocksRange.To, + }).Error("failed logs fetching, retrying") + if utils.ContextSleep(ctx, 10*time.Second) == nil { + return + } + continue + } + break + } + } + } +} + +func (m *ContractMonitor) buildFilterQueries(blocksRange *BlocksRange) []ethereum.FilterQuery { + var qs []ethereum.FilterQuery + q := ethereum.FilterQuery{ + FromBlock: big.NewInt(int64(blocksRange.From)), + ToBlock: big.NewInt(int64(blocksRange.To)), + Addresses: []common.Address{m.cfg.Address, m.cfg.ValidatorContractAddress}, + } + if blocksRange.Topic != nil { + q.Topics = [][]common.Hash{{*blocksRange.Topic}} + } + qs = append(qs, q) + if m.bridgeCfg.IsErcToNative { + for _, token := range m.cfg.ErcToNativeTokens { + if token.StartBlock > 0 && blocksRange.To < token.StartBlock { + continue + } + if token.EndBlock > 0 && blocksRange.From > token.EndBlock { + continue + } + qc := q + if blocksRange.Topic != nil { + qc.Topics = [][]common.Hash{{*blocksRange.Topic}, {}, {m.cfg.Address.Hash()}} + } else { + qc.Topics = [][]common.Hash{{}, {}, {m.cfg.Address.Hash()}} + } + qc.Addresses = []common.Address{token.Address} + if token.StartBlock > 0 && token.StartBlock > blocksRange.From { + qc.FromBlock = big.NewInt(int64(token.StartBlock)) + } + if token.EndBlock > 0 && token.EndBlock < blocksRange.To { + qc.ToBlock = big.NewInt(int64(token.EndBlock)) + } + qs = append(qs, qc) + } + } + return qs +} + +func (m *ContractMonitor) tryToFetchLogs(ctx context.Context, blocksRange *BlocksRange) error { + qs := m.buildFilterQueries(blocksRange) + var logs []*entity.Log + var logsBatch []types.Log + var err error + for _, q := range qs { + if m.cfg.Chain.SafeLogsRequest { + logsBatch, err = m.client.FilterLogsSafe(ctx, q) + } else { + logsBatch, err = m.client.FilterLogs(ctx, q) + } + if err != nil { + return err + } + for _, log := range logsBatch { + logs = append(logs, entity.NewLog(m.cfg.Chain.ChainID, log)) + } + } + sort.Slice(logs, func(i, j int) bool { + a, b := logs[i], logs[j] + return a.BlockNumber < b.BlockNumber || (a.BlockNumber == b.BlockNumber && a.LogIndex < b.LogIndex) + }) + m.logger.WithFields(logrus.Fields{ + "count": len(logs), + "from_block": blocksRange.From, + "to_block": blocksRange.To, + }).Info("fetched logs in range") + if len(logs) > 0 { + err = m.repo.Logs.Ensure(ctx, logs...) + if err != nil { + return err + } + m.logger.WithFields(logrus.Fields{ + "count": len(logs), + "from_block": blocksRange.From, + "to_block": blocksRange.To, + }).Info("saved logs") + } + if err = m.recordFetchedBlockNumber(ctx, blocksRange.To); err != nil { + return err + } + + m.submitLogs(logs, blocksRange.To) + return nil +} + +func (m *ContractMonitor) submitLogs(logs []*entity.Log, endBlock uint) { + jobs, lastBlock := 0, uint(0) + for _, log := range logs { + if log.BlockNumber > lastBlock { + lastBlock = log.BlockNumber + jobs++ + } + } + m.logger.WithFields(logrus.Fields{ + "count": len(logs), + "jobs": jobs, + }).Info("create jobs for logs processor") + // fake log to simplify loop, it will be skipped + logs = append(logs, &entity.Log{BlockNumber: math.MaxUint32}) + batchStartIndex := 0 + for i, log := range logs { + if log.BlockNumber > logs[batchStartIndex].BlockNumber { + m.logger.WithFields(logrus.Fields{ + "count": i - batchStartIndex, + "block_number": logs[batchStartIndex].BlockNumber, + }).Debug("submitting logs batch to logs processor") + m.logsChan <- &LogsBatch{ + BlockNumber: logs[batchStartIndex].BlockNumber, + Logs: logs[batchStartIndex:i], + } + batchStartIndex = i + } + } + if lastBlock < endBlock { + m.logsChan <- &LogsBatch{ + BlockNumber: endBlock, + Logs: nil, + } + } +} + +func (m *ContractMonitor) StartLogsProcessor(ctx context.Context) { + m.logger.Info("starting logs processor") + for { + select { + case <-ctx.Done(): + return + case logs := <-m.logsChan: + wg := new(sync.WaitGroup) + wg.Add(2) + go func() { + defer wg.Done() + for { + err := m.tryToGetBlockTimestamp(ctx, logs.BlockNumber) + if err != nil { + m.logger.WithError(err).WithFields(logrus.Fields{ + "block_number": logs.BlockNumber, + }).Error("failed to get block timestamp, retrying") + continue + } + return + } + }() + + go func() { + defer wg.Done() + for { + err := m.tryToProcessLogsBatch(ctx, logs) + if err != nil { + m.logger.WithError(err).WithFields(logrus.Fields{ + "block_number": logs.BlockNumber, + "count": len(logs.Logs), + }).Error("failed to process logs batch, retrying") + continue + } + return + } + }() + wg.Wait() + + for { + err := m.recordProcessedBlockNumber(ctx, logs.BlockNumber) + if err != nil { + m.logger.WithError(err).WithField("block_number", logs.BlockNumber). + Error("failed to update latest processed block number, retrying") + if utils.ContextSleep(ctx, 10*time.Second) == nil { + return + } + continue + } + break + } + } + } +} + +func (m *ContractMonitor) tryToGetBlockTimestamp(ctx context.Context, blockNumber uint) error { + ts, err := m.repo.BlockTimestamps.GetByBlockNumber(ctx, m.client.ChainID, blockNumber) + if err != nil { + return fmt.Errorf("can't get block timestamp from db: %w", err) + } + if ts != nil { + m.logger.WithField("block_number", blockNumber).Debug("timestamp already exists, skipping") + return nil + } + m.logger.WithField("block_number", blockNumber).Debug("fetching block timestamp") + header, err := m.client.HeaderByNumber(ctx, blockNumber) + if err != nil { + return fmt.Errorf("can't request block header: %w", err) + } + return m.repo.BlockTimestamps.Ensure(ctx, &entity.BlockTimestamp{ + ChainID: m.client.ChainID, + BlockNumber: blockNumber, + Timestamp: time.Unix(int64(header.Time), 0), + }) +} + +func (m *ContractMonitor) tryToProcessLogsBatch(ctx context.Context, batch *LogsBatch) error { + m.logger.WithFields(logrus.Fields{ + "count": len(batch.Logs), + "block_number": batch.BlockNumber, + }).Debug("processing logs batch") + for _, log := range batch.Logs { + event, data, err := m.contract.ParseLog(log) + if err != nil { + return fmt.Errorf("can't parse log: %w", err) + } + handle, ok := m.eventHandlers[event] + if !ok { + if event == "" { + event = log.Topic0.String() + } + m.logger.WithFields(logrus.Fields{ + "event": event, + "log_id": log.ID, + "block_number": log.BlockNumber, + "tx_hash": log.TransactionHash, + "log_index": log.LogIndex, + }).Warn("received unknown event") + continue + } + m.logger.WithFields(logrus.Fields{ + "event": event, + "log_id": log.ID, + }).Trace("handling event") + if err = handle(ctx, log, data); err != nil { + return err + } + } + return nil +} + +func (m *ContractMonitor) recordHeadBlockNumber(blockNumber uint) { + if blockNumber < m.headBlock { + return + } + + m.headBlock = blockNumber + m.headBlockMetric.Set(float64(blockNumber)) + m.recordIsSynced() +} + +func (m *ContractMonitor) recordIsSynced() { + m.isSynced = m.logsCursor.LastProcessedBlock+defaultSyncedThreshold > m.headBlock + if m.isSynced { + m.syncedMetric.Set(1) + } else { + m.syncedMetric.Set(0) + } +} + +func (m *ContractMonitor) recordFetchedBlockNumber(ctx context.Context, blockNumber uint) error { + if blockNumber < m.logsCursor.LastFetchedBlock { + return nil + } + + m.logsCursor.LastFetchedBlock = blockNumber + m.fetchedBlockMetric.Set(float64(blockNumber)) + err := m.repo.LogsCursors.Ensure(ctx, m.logsCursor) + if err != nil { + return err + } + return nil +} + +func (m *ContractMonitor) recordProcessedBlockNumber(ctx context.Context, blockNumber uint) error { + if blockNumber < m.logsCursor.LastProcessedBlock { + return nil + } + + m.logsCursor.LastProcessedBlock = blockNumber + m.processedBlockMetric.Set(float64(blockNumber)) + m.recordIsSynced() + err := m.repo.LogsCursors.Ensure(ctx, m.logsCursor) + if err != nil { + return err + } + return nil +} diff --git a/monitor/handlers.go b/monitor/handlers.go index 3bc8961..c7fb663 100644 --- a/monitor/handlers.go +++ b/monitor/handlers.go @@ -17,20 +17,18 @@ import ( type EventHandler func(ctx context.Context, log *entity.Log, data map[string]interface{}) error type BridgeEventHandler struct { - repo *repository.Repo - bridgeID string - homeClient *ethclient.Client - foreignClient *ethclient.Client - cfg *config.BridgeConfig + repo *repository.Repo + bridgeID string + homeClient *ethclient.Client + cfg *config.BridgeConfig } -func NewBridgeEventHandler(repo *repository.Repo, bridgeID string, homeClient, foreignClient *ethclient.Client, cfg *config.BridgeConfig) *BridgeEventHandler { +func NewBridgeEventHandler(repo *repository.Repo, cfg *config.BridgeConfig, homeClient *ethclient.Client) *BridgeEventHandler { return &BridgeEventHandler{ - repo: repo, - bridgeID: bridgeID, - homeClient: homeClient, - foreignClient: foreignClient, - cfg: cfg, + repo: repo, + bridgeID: cfg.ID, + homeClient: homeClient, + cfg: cfg, } } diff --git a/monitor/monitor.go b/monitor/monitor.go index 0e170eb..5c9abb9 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -2,51 +2,15 @@ package monitor import ( "context" - "database/sql" - "errors" "fmt" - "math/big" - "sort" - "sync" - "time" "tokenbridge-monitor/config" - "tokenbridge-monitor/contract" "tokenbridge-monitor/contract/abi" "tokenbridge-monitor/db" - "tokenbridge-monitor/entity" - "tokenbridge-monitor/ethclient" "tokenbridge-monitor/logging" "tokenbridge-monitor/monitor/alerts" "tokenbridge-monitor/repository" - "tokenbridge-monitor/utils" - - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/math" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" ) -type ContractMonitor struct { - bridgeCfg *config.BridgeConfig - cfg *config.BridgeSideConfig - logger logging.Logger - repo *repository.Repo - client *ethclient.Client - logsCursor *entity.LogsCursor - blocksRangeChan chan *BlocksRange - logsChan chan *LogsBatch - contract *contract.Contract - eventHandlers map[string]EventHandler - headBlock uint - syncedMetric prometheus.Gauge - headBlockMetric prometheus.Gauge - fetchedBlockMetric prometheus.Gauge - processedBlockMetric prometheus.Gauge -} - type Monitor struct { cfg *config.BridgeConfig logger logging.Logger @@ -56,78 +20,13 @@ type Monitor struct { alertManager *alerts.AlertManager } -const defaultSyncedThreshold = 10 - -func newContractMonitor(ctx context.Context, logger logging.Logger, repo *repository.Repo, bridgeCfg *config.BridgeConfig, cfg *config.BridgeSideConfig) (*ContractMonitor, error) { - client, err := ethclient.NewClient(cfg.Chain.RPC.Host, cfg.Chain.RPC.Timeout, cfg.Chain.ChainID) - if err != nil { - return nil, fmt.Errorf("failed to start eth client: %w", err) - } - contractAbi := abi.AMB - if bridgeCfg.IsErcToNative { - contractAbi = abi.ERC_TO_NATIVE - } - bridgeContract := contract.NewContract(client, cfg.Address, contractAbi) - if cfg.ValidatorContractAddress == (common.Address{}) { - cfg.ValidatorContractAddress, err = bridgeContract.ValidatorContractAddress(ctx) - if err != nil { - return nil, fmt.Errorf("cannot get validator contract address: %w", err) - } - logger.WithFields(logrus.Fields{ - "chain_id": client.ChainID, - "bridge_address": cfg.Address, - "validator_contract_address": cfg.ValidatorContractAddress, - "start_block": cfg.StartBlock, - }).Info("obtained validator contract address") - } - logsCursor, err := repo.LogsCursors.GetByChainIDAndAddress(ctx, client.ChainID, cfg.Address) - if err != nil { - if errors.Is(err, sql.ErrNoRows) { - logger.WithFields(logrus.Fields{ - "chain_id": client.ChainID, - "address": cfg.Address, - "start_block": cfg.StartBlock, - }).Warn("contract cursor is not present, staring indexing from scratch") - logsCursor = &entity.LogsCursor{ - ChainID: client.ChainID, - Address: cfg.Address, - LastFetchedBlock: cfg.StartBlock - 1, - LastProcessedBlock: cfg.StartBlock - 1, - } - } else { - return nil, fmt.Errorf("failed to read home logs cursor: %w", err) - } - } - commonLabels := prometheus.Labels{ - "bridge_id": bridgeCfg.ID, - "chain_id": client.ChainID, - "address": cfg.Address.String(), - } - return &ContractMonitor{ - logger: logger, - bridgeCfg: bridgeCfg, - cfg: cfg, - repo: repo, - client: client, - logsCursor: logsCursor, - blocksRangeChan: make(chan *BlocksRange, 10), - logsChan: make(chan *LogsBatch, 200), - contract: bridgeContract, - eventHandlers: make(map[string]EventHandler, 12), - syncedMetric: SyncedContract.With(commonLabels), - headBlockMetric: LatestHeadBlock.With(commonLabels), - fetchedBlockMetric: LatestFetchedBlock.With(commonLabels), - processedBlockMetric: LatestProcessedBlock.With(commonLabels), - }, nil -} - func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo *repository.Repo, cfg *config.BridgeConfig) (*Monitor, error) { logger.Info("initializing bridge monitor") - homeMonitor, err := newContractMonitor(ctx, logger.WithField("contract", "home"), repo, cfg, cfg.Home) + homeMonitor, err := NewContractMonitor(ctx, logger.WithField("contract", "home"), repo, cfg, cfg.Home) if err != nil { return nil, fmt.Errorf("failed to initialize home side monitor: %w", err) } - foreignMonitor, err := newContractMonitor(ctx, logger.WithField("contract", "foreign"), repo, cfg, cfg.Foreign) + foreignMonitor, err := NewContractMonitor(ctx, logger.WithField("contract", "foreign"), repo, cfg, cfg.Foreign) if err != nil { return nil, fmt.Errorf("failed to initialize foreign side monitor: %w", err) } @@ -135,55 +34,68 @@ func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo if err != nil { return nil, fmt.Errorf("failed to initialize alert manager: %w", err) } - handlers := NewBridgeEventHandler(repo, cfg.ID, homeMonitor.client, foreignMonitor.client, cfg) - if cfg.IsErcToNative { - homeMonitor.eventHandlers[abi.ErcToNativeUserRequestForSignature] = handlers.HandleErcToNativeUserRequestForSignature - homeMonitor.eventHandlers[abi.ErcToNativeSignedForAffirmation] = handlers.HandleErcToNativeSignedForAffirmation - homeMonitor.eventHandlers[abi.ErcToNativeAffirmationCompleted] = handlers.HandleErcToNativeAffirmationCompleted - foreignMonitor.eventHandlers[abi.ErcToNativeUserRequestForAffirmation] = handlers.HandleErcToNativeUserRequestForAffirmation - foreignMonitor.eventHandlers[abi.ErcToNativeTransfer] = handlers.HandleErcToNativeTransfer - foreignMonitor.eventHandlers[abi.ErcToNativeRelayedMessage] = handlers.HandleErcToNativeRelayedMessage - } else { - homeMonitor.eventHandlers[abi.UserRequestForSignature] = handlers.HandleUserRequestForSignature - homeMonitor.eventHandlers[abi.LegacyUserRequestForSignature] = handlers.HandleLegacyUserRequestForSignature - homeMonitor.eventHandlers[abi.SignedForAffirmation] = handlers.HandleSignedForUserRequest - homeMonitor.eventHandlers[abi.AffirmationCompleted] = handlers.HandleAffirmationCompleted - homeMonitor.eventHandlers[abi.LegacyAffirmationCompleted] = handlers.HandleAffirmationCompleted - homeMonitor.eventHandlers[abi.UserRequestForInformation] = handlers.HandleUserRequestForInformation - homeMonitor.eventHandlers[abi.SignedForInformation] = handlers.HandleSignedForInformation - homeMonitor.eventHandlers[abi.InformationRetrieved] = handlers.HandleInformationRetrieved - foreignMonitor.eventHandlers[abi.UserRequestForAffirmation] = handlers.HandleUserRequestForAffirmation - foreignMonitor.eventHandlers[abi.LegacyUserRequestForAffirmation] = handlers.HandleLegacyUserRequestForAffirmation - foreignMonitor.eventHandlers[abi.RelayedMessage] = handlers.HandleRelayedMessage - foreignMonitor.eventHandlers[abi.LegacyRelayedMessage] = handlers.HandleRelayedMessage - } - homeMonitor.eventHandlers[abi.SignedForUserRequest] = handlers.HandleSignedForUserRequest - homeMonitor.eventHandlers[abi.CollectedSignatures] = handlers.HandleCollectedSignatures - homeMonitor.eventHandlers[abi.ValidatorAdded] = handlers.HandleValidatorAdded - homeMonitor.eventHandlers[abi.ValidatorRemoved] = handlers.HandleValidatorRemoved - foreignMonitor.eventHandlers[abi.ValidatorAdded] = handlers.HandleValidatorAdded - foreignMonitor.eventHandlers[abi.ValidatorRemoved] = handlers.HandleValidatorRemoved - - homeEvents := homeMonitor.contract.AllEvents() - foreignEvents := homeMonitor.contract.AllEvents() - for e := range homeMonitor.eventHandlers { - if !homeEvents[e] { - return nil, fmt.Errorf("home side contract does not have %s event in its ABI", e) - } - } - for e := range foreignMonitor.eventHandlers { - if !foreignEvents[e] { - return nil, fmt.Errorf("foreign side contract does not have %s event in its ABI", e) - } - } - return &Monitor{ + monitor := &Monitor{ cfg: cfg, logger: logger, repo: repo, homeMonitor: homeMonitor, foreignMonitor: foreignMonitor, alertManager: alertManager, - }, nil + } + if cfg.IsErcToNative { + monitor.RegisterErcToNativeEventHandlers() + } else { + monitor.RegisterAMBEventHandlers() + } + err = monitor.homeMonitor.VerifyEventHandlersABI() + if err != nil { + return nil, fmt.Errorf("home side contract does not have %s event in its ABI", err) + } + err = monitor.foreignMonitor.VerifyEventHandlersABI() + if err != nil { + return nil, fmt.Errorf("foreign side contract does not have %s event in its ABI", err) + } + return monitor, nil +} + +func (m *Monitor) RegisterErcToNativeEventHandlers() { + handlers := NewBridgeEventHandler(m.repo, m.cfg, m.homeMonitor.client) + m.homeMonitor.RegisterEventHandler(abi.ErcToNativeUserRequestForSignature, handlers.HandleErcToNativeUserRequestForSignature) + m.homeMonitor.RegisterEventHandler(abi.SignedForUserRequest, handlers.HandleSignedForUserRequest) + m.homeMonitor.RegisterEventHandler(abi.CollectedSignatures, handlers.HandleCollectedSignatures) + m.homeMonitor.RegisterEventHandler(abi.ErcToNativeSignedForAffirmation, handlers.HandleErcToNativeSignedForAffirmation) + m.homeMonitor.RegisterEventHandler(abi.ErcToNativeAffirmationCompleted, handlers.HandleErcToNativeAffirmationCompleted) + m.homeMonitor.RegisterEventHandler(abi.ValidatorAdded, handlers.HandleValidatorAdded) + m.homeMonitor.RegisterEventHandler(abi.ValidatorRemoved, handlers.HandleValidatorRemoved) + + m.foreignMonitor.RegisterEventHandler(abi.ErcToNativeUserRequestForAffirmation, handlers.HandleErcToNativeUserRequestForAffirmation) + m.foreignMonitor.RegisterEventHandler(abi.ErcToNativeTransfer, handlers.HandleErcToNativeTransfer) + m.foreignMonitor.RegisterEventHandler(abi.ErcToNativeRelayedMessage, handlers.HandleErcToNativeRelayedMessage) + m.foreignMonitor.RegisterEventHandler(abi.ValidatorAdded, handlers.HandleValidatorAdded) + m.foreignMonitor.RegisterEventHandler(abi.ValidatorRemoved, handlers.HandleValidatorRemoved) +} + +func (m *Monitor) RegisterAMBEventHandlers() { + handlers := NewBridgeEventHandler(m.repo, m.cfg, m.homeMonitor.client) + m.homeMonitor.RegisterEventHandler(abi.UserRequestForSignature, handlers.HandleUserRequestForSignature) + m.homeMonitor.RegisterEventHandler(abi.LegacyUserRequestForSignature, handlers.HandleLegacyUserRequestForSignature) + m.homeMonitor.RegisterEventHandler(abi.SignedForUserRequest, handlers.HandleSignedForUserRequest) + m.homeMonitor.RegisterEventHandler(abi.CollectedSignatures, handlers.HandleCollectedSignatures) + m.homeMonitor.RegisterEventHandler(abi.SignedForAffirmation, handlers.HandleSignedForUserRequest) + m.homeMonitor.RegisterEventHandler(abi.AffirmationCompleted, handlers.HandleAffirmationCompleted) + m.homeMonitor.RegisterEventHandler(abi.LegacyAffirmationCompleted, handlers.HandleAffirmationCompleted) + m.homeMonitor.RegisterEventHandler(abi.UserRequestForInformation, handlers.HandleUserRequestForInformation) + m.homeMonitor.RegisterEventHandler(abi.SignedForInformation, handlers.HandleSignedForInformation) + m.homeMonitor.RegisterEventHandler(abi.InformationRetrieved, handlers.HandleInformationRetrieved) + m.homeMonitor.RegisterEventHandler(abi.ValidatorAdded, handlers.HandleValidatorAdded) + m.homeMonitor.RegisterEventHandler(abi.ValidatorRemoved, handlers.HandleValidatorRemoved) + + m.foreignMonitor.RegisterEventHandler(abi.UserRequestForAffirmation, handlers.HandleUserRequestForAffirmation) + m.foreignMonitor.RegisterEventHandler(abi.LegacyUserRequestForAffirmation, handlers.HandleLegacyUserRequestForAffirmation) + m.foreignMonitor.RegisterEventHandler(abi.RelayedMessage, handlers.HandleRelayedMessage) + m.foreignMonitor.RegisterEventHandler(abi.LegacyRelayedMessage, handlers.HandleRelayedMessage) + m.foreignMonitor.RegisterEventHandler(abi.ValidatorAdded, handlers.HandleValidatorAdded) + m.foreignMonitor.RegisterEventHandler(abi.ValidatorRemoved, handlers.HandleValidatorRemoved) } func (m *Monitor) Start(ctx context.Context) { @@ -196,440 +108,3 @@ func (m *Monitor) Start(ctx context.Context) { func (m *Monitor) IsSynced() bool { return m.homeMonitor.IsSynced() && m.foreignMonitor.IsSynced() } - -func (m *ContractMonitor) IsSynced() bool { - if m.headBlock > 0 && m.logsCursor.LastProcessedBlock+defaultSyncedThreshold > m.headBlock { - m.syncedMetric.Set(1) - return true - } - m.syncedMetric.Set(0) - return false -} - -func (m *ContractMonitor) Start(ctx context.Context) { - lastProcessedBlock := m.logsCursor.LastProcessedBlock - lastFetchedBlock := m.logsCursor.LastFetchedBlock - go m.StartBlockFetcher(ctx, lastFetchedBlock+1) - go m.StartLogsProcessor(ctx) - m.LoadUnprocessedLogs(ctx, lastProcessedBlock+1, lastFetchedBlock) - go m.StartLogsFetcher(ctx) -} - -func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, fromBlock, toBlock uint) { - m.logger.WithFields(logrus.Fields{ - "from_block": fromBlock, - "to_block": toBlock, - }).Info("loading fetched but not yet processed blocks") - - var logs []*entity.Log - for { - var err error - addresses := m.cfg.ContractAddresses(fromBlock, toBlock) - logs, err = m.repo.Logs.FindByBlockRange(ctx, m.client.ChainID, addresses, fromBlock, toBlock) - if err != nil { - m.logger.WithError(err).Error("can't find unprocessed logs in block range") - if utils.ContextSleep(ctx, 10*time.Second) == nil { - return - } - continue - } - break - } - - m.submitLogs(logs, toBlock) -} - -func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) { - m.logger.Info("starting new blocks tracker") - - if len(m.cfg.RefetchEvents) > 0 { - m.RefetchEvents(start - 1) - } - - for { - head, err := m.client.BlockNumber(ctx) - if err != nil { - m.logger.WithError(err).Error("can't fetch latest block number") - } else { - m.headBlock = uint(head) - m.cfg.BlockConfirmations - - if start > m.headBlock { - m.logger.WithFields(logrus.Fields{ - "head_block": m.headBlock, - "required_block_confirmations": m.cfg.BlockConfirmations, - "current_block": start, - }).Warn("latest block is behind processed block in the database, skipping") - } - m.headBlockMetric.Set(float64(m.headBlock)) - - for start <= m.headBlock { - end := start + m.cfg.MaxBlockRangeSize - 1 - if end > m.headBlock { - end = m.headBlock - } - m.logger.WithFields(logrus.Fields{ - "from_block": start, - "to_block": end, - }).Info("scheduling new block range logs search") - m.blocksRangeChan <- &BlocksRange{ - From: start, - To: end, - } - start = end + 1 - } - } - - if utils.ContextSleep(ctx, m.cfg.Chain.BlockIndexInterval) == nil { - return - } - } -} - -func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) { - m.logger.Info("refetching old events") - for _, job := range m.cfg.RefetchEvents { - fromBlock := job.StartBlock - if fromBlock < m.cfg.StartBlock { - fromBlock = m.cfg.StartBlock - } - toBlock := job.EndBlock - if toBlock == 0 || toBlock > lastFetchedBlock { - toBlock = lastFetchedBlock - } - - for fromBlock <= toBlock { - end := fromBlock + m.cfg.MaxBlockRangeSize - 1 - if end > toBlock { - end = toBlock - } - m.logger.WithFields(logrus.Fields{ - "from_block": fromBlock, - "to_block": end, - }).Info("scheduling new block range logs search") - br := &BlocksRange{ - From: fromBlock, - To: end, - } - if job.Event != "" { - topic := crypto.Keccak256Hash([]byte(job.Event)) - br.Topic = &topic - } - m.blocksRangeChan <- br - fromBlock = end + 1 - } - } -} - -func (m *ContractMonitor) StartLogsFetcher(ctx context.Context) { - m.logger.Info("starting logs fetcher") - for { - select { - case <-ctx.Done(): - return - case blocksRange := <-m.blocksRangeChan: - for { - err := m.tryToFetchLogs(ctx, blocksRange) - if err != nil { - m.logger.WithError(err).WithFields(logrus.Fields{ - "from_block": blocksRange.From, - "to_block": blocksRange.To, - }).Error("failed logs fetching, retrying") - if utils.ContextSleep(ctx, 10*time.Second) == nil { - return - } - continue - } - break - } - } - } -} - -func (m *ContractMonitor) buildFilterQueries(blocksRange *BlocksRange) []ethereum.FilterQuery { - var qs []ethereum.FilterQuery - q := ethereum.FilterQuery{ - FromBlock: big.NewInt(int64(blocksRange.From)), - ToBlock: big.NewInt(int64(blocksRange.To)), - Addresses: []common.Address{m.cfg.Address, m.cfg.ValidatorContractAddress}, - } - if blocksRange.Topic != nil { - q.Topics = [][]common.Hash{{*blocksRange.Topic}} - } - qs = append(qs, q) - if m.bridgeCfg.IsErcToNative { - for _, token := range m.cfg.ErcToNativeTokens { - if token.StartBlock > 0 && blocksRange.To < token.StartBlock { - continue - } - if token.EndBlock > 0 && blocksRange.From > token.EndBlock { - continue - } - qc := q - if blocksRange.Topic != nil { - qc.Topics = [][]common.Hash{{*blocksRange.Topic}, {}, {m.cfg.Address.Hash()}} - } else { - qc.Topics = [][]common.Hash{{}, {}, {m.cfg.Address.Hash()}} - } - qc.Addresses = []common.Address{token.Address} - if token.StartBlock > 0 && token.StartBlock > blocksRange.From { - qc.FromBlock = big.NewInt(int64(token.StartBlock)) - } - if token.EndBlock > 0 && token.EndBlock < blocksRange.To { - qc.ToBlock = big.NewInt(int64(token.EndBlock)) - } - qs = append(qs, qc) - } - } - return qs -} - -func (m *ContractMonitor) tryToFetchLogs(ctx context.Context, blocksRange *BlocksRange) error { - qs := m.buildFilterQueries(blocksRange) - var logs, logsBatch []types.Log - var err error - for _, q := range qs { - if m.cfg.Chain.SafeLogsRequest { - logsBatch, err = m.client.FilterLogsSafe(ctx, q) - } else { - logsBatch, err = m.client.FilterLogs(ctx, q) - } - if err != nil { - return err - } - logs = append(logs, logsBatch...) - } - m.logger.WithFields(logrus.Fields{ - "count": len(logs), - "from_block": blocksRange.From, - "to_block": blocksRange.To, - }).Info("fetched logs in range") - entities := make([]*entity.Log, len(logs)) - if len(logs) > 0 { - sort.Slice(logs, func(i, j int) bool { - a, b := &logs[i], &logs[j] - return a.BlockNumber < b.BlockNumber || (a.BlockNumber == b.BlockNumber && a.Index < b.Index) - }) - for i, log := range logs { - entities[i] = m.logToEntity(log) - } - err = m.repo.Logs.Ensure(ctx, entities...) - if err != nil { - return err - } - - indexes := make([]uint, len(entities)) - for i, x := range entities { - indexes[i] = x.ID - } - m.logger.WithFields(logrus.Fields{ - "count": len(logs), - "from_block": blocksRange.From, - "to_block": blocksRange.To, - }).Info("saved logs") - } - if err = m.recordFetchedBlockNumber(ctx, blocksRange.To); err != nil { - return err - } - - m.submitLogs(entities, blocksRange.To) - return nil -} - -func (m *ContractMonitor) submitLogs(logs []*entity.Log, endBlock uint) { - jobs, lastBlock := 0, uint(0) - for _, log := range logs { - if log.BlockNumber > lastBlock { - lastBlock = log.BlockNumber - jobs++ - } - } - m.logger.WithFields(logrus.Fields{ - "count": len(logs), - "jobs": jobs, - }).Info("create jobs for logs processor") - // fake log to simplify loop, it will be skipped - logs = append(logs, &entity.Log{BlockNumber: math.MaxUint32}) - batchStartIndex := 0 - for i, log := range logs { - if log.BlockNumber > logs[batchStartIndex].BlockNumber { - m.logger.WithFields(logrus.Fields{ - "count": i - batchStartIndex, - "block_number": logs[batchStartIndex].BlockNumber, - }).Debug("submitting logs batch to logs processor") - m.logsChan <- &LogsBatch{ - BlockNumber: logs[batchStartIndex].BlockNumber, - Logs: logs[batchStartIndex:i], - } - batchStartIndex = i - } - } - if lastBlock < endBlock { - m.logsChan <- &LogsBatch{ - BlockNumber: endBlock, - Logs: nil, - } - } -} - -func (m *ContractMonitor) logToEntity(log types.Log) *entity.Log { - e := &entity.Log{ - ChainID: m.cfg.Chain.ChainID, - Address: log.Address, - Data: log.Data, - BlockNumber: uint(log.BlockNumber), - LogIndex: log.Index, - TransactionHash: log.TxHash, - } - if len(log.Topics) > 0 { - e.Topic0 = &log.Topics[0] - if len(log.Topics) > 1 { - e.Topic1 = &log.Topics[1] - if len(log.Topics) > 2 { - e.Topic2 = &log.Topics[2] - if len(log.Topics) > 3 { - e.Topic3 = &log.Topics[3] - } - } - } - } - return e -} - -func (m *ContractMonitor) StartLogsProcessor(ctx context.Context) { - m.logger.Info("starting logs processor") - for { - select { - case <-ctx.Done(): - return - case logs := <-m.logsChan: - wg := new(sync.WaitGroup) - wg.Add(2) - go func() { - defer wg.Done() - for { - err := m.tryToGetBlockTimestamp(ctx, logs.BlockNumber) - if err != nil { - m.logger.WithError(err).WithFields(logrus.Fields{ - "block_number": logs.BlockNumber, - }).Error("failed to get block timestamp, retrying") - continue - } - return - } - }() - - go func() { - defer wg.Done() - for { - err := m.tryToProcessLogsBatch(ctx, logs) - if err != nil { - m.logger.WithError(err).WithFields(logrus.Fields{ - "block_number": logs.BlockNumber, - "count": len(logs.Logs), - }).Error("failed to process logs batch, retrying") - continue - } - return - } - }() - wg.Wait() - - for { - err := m.recordProcessedBlockNumber(ctx, logs.BlockNumber) - if err != nil { - m.logger.WithError(err).WithField("block_number", logs.BlockNumber). - Error("failed to update latest processed block number, retrying") - if utils.ContextSleep(ctx, 10*time.Second) == nil { - return - } - continue - } - break - } - } - } -} - -func (m *ContractMonitor) tryToGetBlockTimestamp(ctx context.Context, blockNumber uint) error { - ts, err := m.repo.BlockTimestamps.GetByBlockNumber(ctx, m.client.ChainID, blockNumber) - if err != nil { - return err - } - if ts != nil { - m.logger.WithField("block_number", blockNumber).Debug("timestamp already exists, skipping") - return nil - } - m.logger.WithField("block_number", blockNumber).Debug("fetching block timestamp") - header, err := m.client.HeaderByNumber(ctx, blockNumber) - if err != nil { - return err - } - return m.repo.BlockTimestamps.Ensure(ctx, &entity.BlockTimestamp{ - ChainID: m.client.ChainID, - BlockNumber: blockNumber, - Timestamp: time.Unix(int64(header.Time), 0), - }) -} - -func (m *ContractMonitor) tryToProcessLogsBatch(ctx context.Context, logs *LogsBatch) error { - m.logger.WithFields(logrus.Fields{ - "count": len(logs.Logs), - "block_number": logs.BlockNumber, - }).Debug("processing logs batch") - for _, log := range logs.Logs { - event, data, err := m.contract.ParseLog(log) - if err != nil { - return fmt.Errorf("can't parse log: %w", err) - } - handle, ok := m.eventHandlers[event] - if !ok { - if event == "" { - event = log.Topic0.String() - } - m.logger.WithFields(logrus.Fields{ - "event": event, - "log_id": log.ID, - "block_number": log.BlockNumber, - "tx_hash": log.TransactionHash, - "log_index": log.LogIndex, - }).Warn("received unknown event") - continue - } - m.logger.WithFields(logrus.Fields{ - "event": event, - "log_id": log.ID, - }).Trace("handling event") - if err = handle(ctx, log, data); err != nil { - return err - } - } - return nil -} - -func (m *ContractMonitor) recordFetchedBlockNumber(ctx context.Context, blockNumber uint) error { - if blockNumber < m.logsCursor.LastFetchedBlock { - return nil - } - - m.logsCursor.LastFetchedBlock = blockNumber - err := m.repo.LogsCursors.Ensure(ctx, m.logsCursor) - if err != nil { - return err - } - m.fetchedBlockMetric.Set(float64(blockNumber)) - return nil -} - -func (m *ContractMonitor) recordProcessedBlockNumber(ctx context.Context, blockNumber uint) error { - if blockNumber < m.logsCursor.LastProcessedBlock { - return nil - } - - m.logsCursor.LastProcessedBlock = blockNumber - err := m.repo.LogsCursors.Ensure(ctx, m.logsCursor) - if err != nil { - return err - } - m.processedBlockMetric.Set(float64(blockNumber)) - return nil -} diff --git a/monitor/wrappers.go b/monitor/types.go similarity index 100% rename from monitor/wrappers.go rename to monitor/types.go