From 989e270d8664ed421c538a24d39852fda982fad9 Mon Sep 17 00:00:00 2001 From: Kirill Fedoseev Date: Sat, 21 May 2022 15:26:28 +0400 Subject: [PATCH] Contract ABI handling refactor --- Dockerfile | 6 ++-- main.go => cmd/monitor/main.go | 11 +++++- config.schema.json | 6 ++-- config.yml | 8 ++++- config/config.go | 32 ++++++++++++++--- contract/bridge_contract.go | 36 +++++++++++++++++++ contract/contract.go | 66 ++++++---------------------------- contract/utils.go | 62 ++++++++++++++++++++++++++++++++ entity/log.go | 17 +++++++++ ethclient/ethclient.go | 65 +++++++++++++++++++-------------- monitor/contract_monitor.go | 40 +++++++++------------ monitor/handlers.go | 4 +-- monitor/monitor.go | 12 ++++--- 13 files changed, 240 insertions(+), 125 deletions(-) rename main.go => cmd/monitor/main.go (80%) create mode 100644 contract/bridge_contract.go create mode 100644 contract/utils.go diff --git a/Dockerfile b/Dockerfile index d9076c0..7ef21fc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ WORKDIR /app COPY . . -RUN go build +RUN mkdir out && go build -o ./out ./cmd/... FROM ubuntu:20.04 @@ -13,8 +13,8 @@ WORKDIR /app RUN apt-get update && apt-get install -y ca-certificates && update-ca-certificates COPY db/migrations ./db/migrations/ -COPY --from=build /app/tokenbridge-monitor ./ +COPY --from=build /app/out/monitor ./ EXPOSE 3333 -ENTRYPOINT ./tokenbridge-monitor +ENTRYPOINT ./monitor diff --git a/main.go b/cmd/monitor/main.go similarity index 80% rename from main.go rename to cmd/monitor/main.go index 3cd0712..4a8e76e 100644 --- a/main.go +++ b/cmd/monitor/main.go @@ -7,6 +7,7 @@ import ( "os/signal" "tokenbridge-monitor/config" "tokenbridge-monitor/db" + "tokenbridge-monitor/ethclient" "tokenbridge-monitor/logging" "tokenbridge-monitor/monitor" "tokenbridge-monitor/presenter" @@ -67,7 +68,15 @@ func main() { } for _, bridgeCfg := range cfg.Bridges { bridgeLogger := logger.WithField("bridge_id", bridgeCfg.ID) - m, err2 := monitor.NewMonitor(ctx, bridgeLogger, dbConn, repo, bridgeCfg) + homeClient, err2 := ethclient.NewClient(bridgeCfg.Home.Chain.RPC.Host, bridgeCfg.Home.Chain.RPC.Timeout, bridgeCfg.Home.Chain.ChainID) + if err2 != nil { + bridgeLogger.WithError(err2).Fatal("can't dial home rpc client") + } + foreignClient, err2 := ethclient.NewClient(bridgeCfg.Foreign.Chain.RPC.Host, bridgeCfg.Foreign.Chain.RPC.Timeout, bridgeCfg.Foreign.Chain.ChainID) + if err2 != nil { + bridgeLogger.WithError(err2).Fatal("can't dial foreign rpc client") + } + m, err2 := monitor.NewMonitor(ctx, bridgeLogger, dbConn, repo, bridgeCfg, homeClient, foreignClient) if err2 != nil { bridgeLogger.WithError(err2).Fatal("can't initialize bridge monitor") } diff --git a/config.schema.json b/config.schema.json index b85cb39..1e5dc5d 100644 --- a/config.schema.json +++ b/config.schema.json @@ -58,8 +58,10 @@ "additionalProperties": { "type": "object", "properties": { - "is_erc_to_native": { - "type": "boolean" + "bridge_mode": { + "type": "string", + "enum": ["AMB", "ERC_TO_NATIVE"], + "default": "AMB" }, "home": { "$ref": "#/$defs/side_config" diff --git a/config.yml b/config.yml index 1841475..dbc7c82 100644 --- a/config.yml +++ b/config.yml @@ -59,7 +59,7 @@ chains: block_index_interval: 60s bridges: xdai: - is_erc_to_native: true + bridge_mode: ERC_TO_NATIVE home: chain: xdai address: 0x7301CFA0e1756B71869E93d4e4Dca5c7d0eb0AA6 @@ -89,6 +89,7 @@ bridges: stuck_erc_to_native_message_confirmation: last_validator_activity: xdai-amb: + bridge_mode: AMB home: chain: xdai address: 0x75Df5AF045d91108662D8080fD1FEFAd6aA0bb59 @@ -120,6 +121,7 @@ bridges: different_information_signatures: last_validator_activity: test-amb: + bridge_mode: AMB home: chain: sokol address: 0xFe446bEF1DbF7AFE24E81e05BC8B271C1BA9a560 @@ -149,6 +151,7 @@ bridges: home_start_block: 21822099 different_information_signatures: bsc-xdai-amb: + bridge_mode: AMB home: chain: xdai address: 0x162E898bD0aacB578C8D5F8d6ca588c13d2A383F @@ -176,6 +179,7 @@ bridges: failed_information_request: different_information_signatures: rinkeby-xdai-amb: + bridge_mode: AMB home: chain: xdai address: 0xc38D4991c951fE8BCE1a12bEef2046eF36b0FA4A @@ -201,6 +205,7 @@ bridges: failed_information_request: different_information_signatures: poa-xdai-amb: + bridge_mode: AMB home: chain: xdai address: 0xc2d77d118326c33BBe36EbeAbf4F7ED6BC2dda5c @@ -226,6 +231,7 @@ bridges: failed_information_request: different_information_signatures: eth-bsc-amb: + bridge_mode: AMB home: chain: bsc address: 0x6943A218d58135793F1FE619414eD476C37ad65a diff --git a/config/config.go b/config/config.go index d547841..96c49a9 100644 --- a/config/config.go +++ b/config/config.go @@ -56,12 +56,19 @@ type BridgeAlertConfig struct { ForeignStartBlock uint `yaml:"foreign_start_block"` } +type BridgeMode string + +const ( + BridgeModeArbitraryMessage BridgeMode = "AMB" + BridgeModeErcToNative BridgeMode = "ERC_TO_NATIVE" +) + type BridgeConfig struct { - ID string `yaml:"-"` - IsErcToNative bool `yaml:"is_erc_to_native"` - Home *BridgeSideConfig `yaml:"home"` - Foreign *BridgeSideConfig `yaml:"foreign"` - Alerts map[string]*BridgeAlertConfig `yaml:"alerts"` + ID string `yaml:"-"` + BridgeMode BridgeMode `yaml:"bridge_mode"` + Home *BridgeSideConfig `yaml:"home"` + Foreign *BridgeSideConfig `yaml:"foreign"` + Alerts map[string]*BridgeAlertConfig `yaml:"alerts"` } type DBConfig struct { @@ -111,6 +118,21 @@ func (cfg *Config) init() error { if bridge.Foreign.MaxBlockRangeSize <= 0 { bridge.Foreign.MaxBlockRangeSize = 1000 } + if len(bridge.Home.ErcToNativeTokens) > 0 { + return fmt.Errorf("non-empty home token address list") + } + switch bridge.BridgeMode { + case BridgeModeErcToNative: + if len(bridge.Foreign.ErcToNativeTokens) == 0 { + return fmt.Errorf("empty foreign token address list in ERC_TO_NATIVE mode") + } + case BridgeModeArbitraryMessage: + default: + bridge.BridgeMode = BridgeModeArbitraryMessage + if len(bridge.Foreign.ErcToNativeTokens) > 0 { + return fmt.Errorf("non-empty foreign token address list in AMB mode") + } + } for _, side := range [2]*BridgeSideConfig{bridge.Home, bridge.Foreign} { chainName := side.ChainName var ok bool diff --git a/contract/bridge_contract.go b/contract/bridge_contract.go new file mode 100644 index 0000000..fe18396 --- /dev/null +++ b/contract/bridge_contract.go @@ -0,0 +1,36 @@ +package contract + +import ( + "context" + "fmt" + "tokenbridge-monitor/config" + "tokenbridge-monitor/contract/abi" + "tokenbridge-monitor/ethclient" + + "github.com/ethereum/go-ethereum/common" +) + +type BridgeContract struct { + *Contract +} + +func NewBridgeContract(client ethclient.Client, addr common.Address, mode config.BridgeMode) *BridgeContract { + var contract *Contract + switch mode { + case config.BridgeModeArbitraryMessage: + contract = NewContract(client, addr, abi.AMB) + case config.BridgeModeErcToNative: + contract = NewContract(client, addr, abi.ERC_TO_NATIVE) + default: + contract = NewContract(client, addr, abi.AMB) + } + return &BridgeContract{contract} +} + +func (c *BridgeContract) ValidatorContractAddress(ctx context.Context) (common.Address, error) { + res, err := c.Call(ctx, "validatorContract") + if err != nil { + return common.Address{}, fmt.Errorf("cannot obtain validator contract address: %w", err) + } + return common.BytesToAddress(res), nil +} diff --git a/contract/contract.go b/contract/contract.go index c9b7580..bd0b538 100644 --- a/contract/contract.go +++ b/contract/contract.go @@ -12,12 +12,12 @@ import ( ) type Contract struct { - Address common.Address - client *ethclient.Client + address common.Address + client ethclient.Client abi abi.ABI } -func NewContract(client *ethclient.Client, addr common.Address, abi abi.ABI) *Contract { +func NewContract(client ethclient.Client, addr common.Address, abi abi.ABI) *Contract { return &Contract{addr, client, abi} } @@ -29,67 +29,21 @@ func (c *Contract) AllEvents() map[string]bool { return events } -func (c *Contract) ValidatorContractAddress(ctx context.Context) (common.Address, error) { - data, err := c.abi.Pack("validatorContract") +func (c *Contract) Call(ctx context.Context, method string, args ...interface{}) ([]byte, error) { + data, err := c.abi.Pack(method, args...) if err != nil { - return common.Address{}, fmt.Errorf("cannot encode abi calldata: %w", err) + return nil, fmt.Errorf("cannot encode abi calldata: %w", err) } res, err := c.client.CallContract(ctx, ethereum.CallMsg{ - To: &c.Address, + To: &c.address, Data: data, }) if err != nil { - return common.Address{}, fmt.Errorf("cannot call validatorContract(): %w", err) + return nil, fmt.Errorf("cannot call %s(...): %w", method, err) } - return common.BytesToAddress(res), nil + return res, nil } func (c *Contract) ParseLog(log *entity.Log) (string, map[string]interface{}, error) { - if log.Topic0 == nil { - return "", nil, fmt.Errorf("cannot process event without topics") - } - topics := make([]common.Hash, 0, 3) - if log.Topic1 != nil { - topics = append(topics, *log.Topic1) - if log.Topic2 != nil { - topics = append(topics, *log.Topic2) - if log.Topic3 != nil { - topics = append(topics, *log.Topic3) - } - } - } - var event *abi.Event - var indexed abi.Arguments - for _, e := range c.abi.Events { - if e.ID == *log.Topic0 { - indexed = Indexed(e.Inputs) - if len(indexed) == len(topics) { - event = &e - break - } - } - } - if event == nil { - return "", nil, nil - } - m := make(map[string]interface{}) - if len(indexed) < len(event.Inputs) { - if err := event.Inputs.UnpackIntoMap(m, log.Data); err != nil { - return "", nil, fmt.Errorf("can't unpack data: %w", err) - } - } - if err := abi.ParseTopicsIntoMap(m, indexed, topics); err != nil { - return "", nil, fmt.Errorf("can't unpack topics: %w", err) - } - return event.String(), m, nil -} - -func Indexed(args abi.Arguments) abi.Arguments { - var indexed abi.Arguments - for _, arg := range args { - if arg.Indexed { - indexed = append(indexed, arg) - } - } - return indexed + return ParseLog(c.abi, log) } diff --git a/contract/utils.go b/contract/utils.go new file mode 100644 index 0000000..455bf2c --- /dev/null +++ b/contract/utils.go @@ -0,0 +1,62 @@ +package contract + +import ( + "fmt" + "tokenbridge-monitor/entity" + + "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" +) + +func Indexed(args abi.Arguments) abi.Arguments { + var indexed abi.Arguments + for _, arg := range args { + if arg.Indexed { + indexed = append(indexed, arg) + } + } + return indexed +} + +func FindMatchingEventABI(contractABI abi.ABI, topics []common.Hash) *abi.Event { + for _, e := range contractABI.Events { + if e.ID == topics[0] { + indexed := Indexed(e.Inputs) + if len(indexed) == len(topics)-1 { + return &e + } + } + } + return nil +} + +func DecodeEventLog(event *abi.Event, topics []common.Hash, data []byte) (map[string]interface{}, error) { + indexed := Indexed(event.Inputs) + m := make(map[string]interface{}) + if len(indexed) < len(event.Inputs) { + if err := event.Inputs.UnpackIntoMap(m, data); err != nil { + return nil, fmt.Errorf("can't unpack data: %w", err) + } + } + if err := abi.ParseTopicsIntoMap(m, indexed, topics[1:]); err != nil { + return nil, fmt.Errorf("can't unpack topics: %w", err) + } + return m, nil +} + +func ParseLog(contractABI abi.ABI, log *entity.Log) (string, map[string]interface{}, error) { + topics := log.Topics() + if len(topics) == 0 { + return "", nil, fmt.Errorf("cannot process event without topics") + } + event := FindMatchingEventABI(contractABI, topics) + if event == nil { + return "", nil, nil + } + + res, err := DecodeEventLog(event, topics, log.Data) + if err != nil { + return "", nil, fmt.Errorf("can't decode event log: %w", err) + } + return event.String(), res, nil +} diff --git a/entity/log.go b/entity/log.go index 4d4ffc5..a892f5a 100644 --- a/entity/log.go +++ b/entity/log.go @@ -55,3 +55,20 @@ func NewLog(chainID string, log types.Log) *Log { } return e } + +func (l *Log) Topics() []common.Hash { + topics := make([]common.Hash, 0, 4) + if l.Topic0 != nil { + topics = append(topics, *l.Topic0) + if l.Topic1 != nil { + topics = append(topics, *l.Topic1) + if l.Topic2 != nil { + topics = append(topics, *l.Topic2) + if l.Topic3 != nil { + topics = append(topics, *l.Topic3) + } + } + } + } + return topics +} diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 0344291..7d653c0 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -14,8 +14,19 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -type Client struct { - ChainID string +type Client interface { + BlockNumber(ctx context.Context) (uint, error) + HeaderByNumber(ctx context.Context, n uint) (*types.Header, error) + FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) + FilterLogsSafe(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) + TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, error) + TransactionReceiptByHash(ctx context.Context, hash common.Hash) (*types.Receipt, error) + CallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) + TransactionSender(tx *types.Transaction) (common.Address, error) +} + +type rpcClient struct { + chainID string url string timeout time.Duration rawClient *rpc.Client @@ -23,7 +34,7 @@ type Client struct { signer types.Signer } -func NewClient(url string, timeout time.Duration, chainID string) (*Client, error) { +func NewClient(url string, timeout time.Duration, chainID string) (Client, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -31,8 +42,8 @@ func NewClient(url string, timeout time.Duration, chainID string) (*Client, erro if err != nil { return nil, fmt.Errorf("can't dial JSON rpc url: %w", err) } - client := &Client{ - ChainID: chainID, + client := &rpcClient{ + chainID: chainID, url: url, timeout: timeout, rawClient: rawClient, @@ -51,46 +62,46 @@ func NewClient(url string, timeout time.Duration, chainID string) (*Client, erro return client, nil } -func (c *Client) BlockNumber(ctx context.Context) (uint, error) { - defer ObserveDuration(c.ChainID, c.url, "eth_blockNumber")() +func (c *rpcClient) BlockNumber(ctx context.Context) (uint, error) { + defer ObserveDuration(c.chainID, c.url, "eth_blockNumber")() ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() n, err := c.client.BlockNumber(ctx) - ObserveError(c.ChainID, c.url, "eth_blockNumber", err) + ObserveError(c.chainID, c.url, "eth_blockNumber", err) return uint(n), err } -func (c *Client) HeaderByNumber(ctx context.Context, n uint) (*types.Header, error) { - defer ObserveDuration(c.ChainID, c.url, "eth_getBlockByNumber")() +func (c *rpcClient) HeaderByNumber(ctx context.Context, n uint) (*types.Header, error) { + defer ObserveDuration(c.chainID, c.url, "eth_getBlockByNumber")() ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() header, err := c.client.HeaderByNumber(ctx, big.NewInt(int64(n))) - ObserveError(c.ChainID, c.url, "eth_getBlockByNumber", err) + ObserveError(c.chainID, c.url, "eth_getBlockByNumber", err) return header, err } -func (c *Client) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { - defer ObserveDuration(c.ChainID, c.url, "eth_getLogs")() +func (c *rpcClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { + defer ObserveDuration(c.chainID, c.url, "eth_getLogs")() ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() logs, err := c.client.FilterLogs(ctx, q) - ObserveError(c.ChainID, c.url, "eth_getLogs", err) + ObserveError(c.chainID, c.url, "eth_getLogs", err) return logs, err } // FilterLogsSafe is the same as FilterLogs, but makes an additional eth_blockNumber // request to ensure that the node behind RPC is synced to the needed point. -func (c *Client) FilterLogsSafe(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { - defer ObserveDuration(c.ChainID, c.url, "eth_getLogsSafe")() +func (c *rpcClient) FilterLogsSafe(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { + defer ObserveDuration(c.chainID, c.url, "eth_getLogsSafe")() ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() var err error defer func() { - ObserveError(c.ChainID, c.url, "eth_getLogsSafe", err) + ObserveError(c.chainID, c.url, "eth_getLogsSafe", err) }() var arg interface{} @@ -127,37 +138,37 @@ func (c *Client) FilterLogsSafe(ctx context.Context, q ethereum.FilterQuery) ([] return logs, nil } -func (c *Client) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, error) { - defer ObserveDuration(c.ChainID, c.url, "eth_getTransactionByHash")() +func (c *rpcClient) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, error) { + defer ObserveDuration(c.chainID, c.url, "eth_getTransactionByHash")() ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() tx, _, err := c.client.TransactionByHash(ctx, txHash) - ObserveError(c.ChainID, c.url, "eth_getTransactionByHash", err) + ObserveError(c.chainID, c.url, "eth_getTransactionByHash", err) return tx, err } -func (c *Client) TransactionReceiptByHash(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { - defer ObserveDuration(c.ChainID, c.url, "eth_getTransactionReceipt")() +func (c *rpcClient) TransactionReceiptByHash(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + defer ObserveDuration(c.chainID, c.url, "eth_getTransactionReceipt")() ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() receipt, err := c.client.TransactionReceipt(ctx, txHash) - ObserveError(c.ChainID, c.url, "eth_getTransactionReceipt", err) + ObserveError(c.chainID, c.url, "eth_getTransactionReceipt", err) return receipt, err } -func (c *Client) CallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) { - defer ObserveDuration(c.ChainID, c.url, "eth_call")() +func (c *rpcClient) CallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) { + defer ObserveDuration(c.chainID, c.url, "eth_call")() ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() res, err := c.client.CallContract(ctx, msg, nil) - ObserveError(c.ChainID, c.url, "eth_call", err) + ObserveError(c.chainID, c.url, "eth_call", err) return res, err } -func (c *Client) TransactionSender(tx *types.Transaction) (common.Address, error) { +func (c *rpcClient) TransactionSender(tx *types.Transaction) (common.Address, error) { return c.signer.Sender(tx) } diff --git a/monitor/contract_monitor.go b/monitor/contract_monitor.go index 6ef59ff..7e12227 100644 --- a/monitor/contract_monitor.go +++ b/monitor/contract_monitor.go @@ -12,7 +12,6 @@ import ( "time" "tokenbridge-monitor/config" "tokenbridge-monitor/contract" - "tokenbridge-monitor/contract/abi" "tokenbridge-monitor/entity" "tokenbridge-monitor/ethclient" "tokenbridge-monitor/logging" @@ -37,11 +36,11 @@ type ContractMonitor struct { cfg *config.BridgeSideConfig logger logging.Logger repo *repository.Repo - client *ethclient.Client + client ethclient.Client logsCursor *entity.LogsCursor blocksRangeChan chan *BlocksRange logsChan chan *LogsBatch - contract *contract.Contract + contract *contract.BridgeContract eventHandlers map[string]EventHandler headBlock uint isSynced bool @@ -51,38 +50,31 @@ type ContractMonitor struct { processedBlockMetric prometheus.Gauge } -func NewContractMonitor(ctx context.Context, logger logging.Logger, repo *repository.Repo, bridgeCfg *config.BridgeConfig, cfg *config.BridgeSideConfig) (*ContractMonitor, error) { - client, err := ethclient.NewClient(cfg.Chain.RPC.Host, cfg.Chain.RPC.Timeout, cfg.Chain.ChainID) - if err != nil { - return nil, fmt.Errorf("failed to start eth client: %w", err) - } - contractAbi := abi.AMB - if bridgeCfg.IsErcToNative { - contractAbi = abi.ERC_TO_NATIVE - } - bridgeContract := contract.NewContract(client, cfg.Address, contractAbi) +func NewContractMonitor(ctx context.Context, logger logging.Logger, repo *repository.Repo, bridgeCfg *config.BridgeConfig, cfg *config.BridgeSideConfig, client ethclient.Client) (*ContractMonitor, error) { + bridgeContract := contract.NewBridgeContract(client, cfg.Address, bridgeCfg.BridgeMode) if cfg.ValidatorContractAddress == (common.Address{}) { - cfg.ValidatorContractAddress, err = bridgeContract.ValidatorContractAddress(ctx) + addr, err := bridgeContract.ValidatorContractAddress(ctx) if err != nil { return nil, fmt.Errorf("cannot get validator contract address: %w", err) } logger.WithFields(logrus.Fields{ - "chain_id": client.ChainID, + "chain_id": cfg.Chain.ChainID, "bridge_address": cfg.Address, "validator_contract_address": cfg.ValidatorContractAddress, "start_block": cfg.StartBlock, }).Info("obtained validator contract address") + cfg.ValidatorContractAddress = addr } - logsCursor, err := repo.LogsCursors.GetByChainIDAndAddress(ctx, client.ChainID, cfg.Address) + logsCursor, err := repo.LogsCursors.GetByChainIDAndAddress(ctx, cfg.Chain.ChainID, cfg.Address) if err != nil { if errors.Is(err, sql.ErrNoRows) { logger.WithFields(logrus.Fields{ - "chain_id": client.ChainID, + "chain_id": cfg.Chain.ChainID, "address": cfg.Address, "start_block": cfg.StartBlock, }).Warn("contract cursor is not present, staring indexing from scratch") logsCursor = &entity.LogsCursor{ - ChainID: client.ChainID, + ChainID: cfg.Chain.ChainID, Address: cfg.Address, LastFetchedBlock: cfg.StartBlock - 1, LastProcessedBlock: cfg.StartBlock - 1, @@ -93,7 +85,7 @@ func NewContractMonitor(ctx context.Context, logger logging.Logger, repo *reposi } commonLabels := prometheus.Labels{ "bridge_id": bridgeCfg.ID, - "chain_id": client.ChainID, + "chain_id": cfg.Chain.ChainID, "address": cfg.Address.String(), } return &ContractMonitor{ @@ -135,6 +127,8 @@ func (m *ContractMonitor) VerifyEventHandlersABI() error { func (m *ContractMonitor) Start(ctx context.Context) { lastProcessedBlock := m.logsCursor.LastProcessedBlock lastFetchedBlock := m.logsCursor.LastFetchedBlock + m.processedBlockMetric.Set(float64(lastProcessedBlock)) + m.fetchedBlockMetric.Set(float64(lastFetchedBlock)) go m.StartBlockFetcher(ctx, lastFetchedBlock+1) go m.StartLogsProcessor(ctx) m.LoadUnprocessedLogs(ctx, lastProcessedBlock+1, lastFetchedBlock) @@ -149,7 +143,7 @@ func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, fromBlock, to addresses := m.cfg.ContractAddresses(fromBlock, toBlock) for { - logs, err := m.repo.Logs.FindByBlockRange(ctx, m.client.ChainID, addresses, fromBlock, toBlock) + logs, err := m.repo.Logs.FindByBlockRange(ctx, m.cfg.Chain.ChainID, addresses, fromBlock, toBlock) if err != nil { m.logger.WithError(err).Error("can't find unprocessed logs in block range") } else { @@ -272,7 +266,7 @@ func (m *ContractMonitor) buildFilterQueries(blocksRange *BlocksRange) []ethereu q.Topics = [][]common.Hash{{*blocksRange.Topic}} } qs = append(qs, q) - if m.bridgeCfg.IsErcToNative { + if m.bridgeCfg.BridgeMode == config.BridgeModeErcToNative { for _, token := range m.cfg.ErcToNativeTokens { if token.StartBlock > 0 && blocksRange.To < token.StartBlock { continue @@ -437,7 +431,7 @@ func (m *ContractMonitor) StartLogsProcessor(ctx context.Context) { } func (m *ContractMonitor) tryToGetBlockTimestamp(ctx context.Context, blockNumber uint) error { - ts, err := m.repo.BlockTimestamps.GetByBlockNumber(ctx, m.client.ChainID, blockNumber) + ts, err := m.repo.BlockTimestamps.GetByBlockNumber(ctx, m.cfg.Chain.ChainID, blockNumber) if err != nil { return fmt.Errorf("can't get block timestamp from db: %w", err) } @@ -451,7 +445,7 @@ func (m *ContractMonitor) tryToGetBlockTimestamp(ctx context.Context, blockNumbe return fmt.Errorf("can't request block header: %w", err) } return m.repo.BlockTimestamps.Ensure(ctx, &entity.BlockTimestamp{ - ChainID: m.client.ChainID, + ChainID: m.cfg.Chain.ChainID, BlockNumber: blockNumber, Timestamp: time.Unix(int64(header.Time), 0), }) diff --git a/monitor/handlers.go b/monitor/handlers.go index c7fb663..b597a02 100644 --- a/monitor/handlers.go +++ b/monitor/handlers.go @@ -19,11 +19,11 @@ type EventHandler func(ctx context.Context, log *entity.Log, data map[string]int type BridgeEventHandler struct { repo *repository.Repo bridgeID string - homeClient *ethclient.Client + homeClient ethclient.Client cfg *config.BridgeConfig } -func NewBridgeEventHandler(repo *repository.Repo, cfg *config.BridgeConfig, homeClient *ethclient.Client) *BridgeEventHandler { +func NewBridgeEventHandler(repo *repository.Repo, cfg *config.BridgeConfig, homeClient ethclient.Client) *BridgeEventHandler { return &BridgeEventHandler{ repo: repo, bridgeID: cfg.ID, diff --git a/monitor/monitor.go b/monitor/monitor.go index 5c9abb9..71de6ab 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -6,6 +6,7 @@ import ( "tokenbridge-monitor/config" "tokenbridge-monitor/contract/abi" "tokenbridge-monitor/db" + "tokenbridge-monitor/ethclient" "tokenbridge-monitor/logging" "tokenbridge-monitor/monitor/alerts" "tokenbridge-monitor/repository" @@ -20,13 +21,13 @@ type Monitor struct { alertManager *alerts.AlertManager } -func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo *repository.Repo, cfg *config.BridgeConfig) (*Monitor, error) { +func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo *repository.Repo, cfg *config.BridgeConfig, homeClient, foreignClient ethclient.Client) (*Monitor, error) { logger.Info("initializing bridge monitor") - homeMonitor, err := NewContractMonitor(ctx, logger.WithField("contract", "home"), repo, cfg, cfg.Home) + homeMonitor, err := NewContractMonitor(ctx, logger.WithField("contract", "home"), repo, cfg, cfg.Home, homeClient) if err != nil { return nil, fmt.Errorf("failed to initialize home side monitor: %w", err) } - foreignMonitor, err := NewContractMonitor(ctx, logger.WithField("contract", "foreign"), repo, cfg, cfg.Foreign) + foreignMonitor, err := NewContractMonitor(ctx, logger.WithField("contract", "foreign"), repo, cfg, cfg.Foreign, foreignClient) if err != nil { return nil, fmt.Errorf("failed to initialize foreign side monitor: %w", err) } @@ -42,9 +43,10 @@ func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo foreignMonitor: foreignMonitor, alertManager: alertManager, } - if cfg.IsErcToNative { + switch cfg.BridgeMode { + case config.BridgeModeErcToNative: monitor.RegisterErcToNativeEventHandlers() - } else { + case config.BridgeModeArbitraryMessage: monitor.RegisterAMBEventHandlers() } err = monitor.homeMonitor.VerifyEventHandlersABI()