Contract ABI handling refactor

This commit is contained in:
Kirill Fedoseev 2022-05-21 15:26:28 +04:00
parent 5871194bd1
commit 989e270d86
13 changed files with 240 additions and 125 deletions

View File

@ -4,7 +4,7 @@ WORKDIR /app
COPY . .
RUN go build
RUN mkdir out && go build -o ./out ./cmd/...
FROM ubuntu:20.04
@ -13,8 +13,8 @@ WORKDIR /app
RUN apt-get update && apt-get install -y ca-certificates && update-ca-certificates
COPY db/migrations ./db/migrations/
COPY --from=build /app/tokenbridge-monitor ./
COPY --from=build /app/out/monitor ./
EXPOSE 3333
ENTRYPOINT ./tokenbridge-monitor
ENTRYPOINT ./monitor

View File

@ -7,6 +7,7 @@ import (
"os/signal"
"tokenbridge-monitor/config"
"tokenbridge-monitor/db"
"tokenbridge-monitor/ethclient"
"tokenbridge-monitor/logging"
"tokenbridge-monitor/monitor"
"tokenbridge-monitor/presenter"
@ -67,7 +68,15 @@ func main() {
}
for _, bridgeCfg := range cfg.Bridges {
bridgeLogger := logger.WithField("bridge_id", bridgeCfg.ID)
m, err2 := monitor.NewMonitor(ctx, bridgeLogger, dbConn, repo, bridgeCfg)
homeClient, err2 := ethclient.NewClient(bridgeCfg.Home.Chain.RPC.Host, bridgeCfg.Home.Chain.RPC.Timeout, bridgeCfg.Home.Chain.ChainID)
if err2 != nil {
bridgeLogger.WithError(err2).Fatal("can't dial home rpc client")
}
foreignClient, err2 := ethclient.NewClient(bridgeCfg.Foreign.Chain.RPC.Host, bridgeCfg.Foreign.Chain.RPC.Timeout, bridgeCfg.Foreign.Chain.ChainID)
if err2 != nil {
bridgeLogger.WithError(err2).Fatal("can't dial foreign rpc client")
}
m, err2 := monitor.NewMonitor(ctx, bridgeLogger, dbConn, repo, bridgeCfg, homeClient, foreignClient)
if err2 != nil {
bridgeLogger.WithError(err2).Fatal("can't initialize bridge monitor")
}

View File

@ -58,8 +58,10 @@
"additionalProperties": {
"type": "object",
"properties": {
"is_erc_to_native": {
"type": "boolean"
"bridge_mode": {
"type": "string",
"enum": ["AMB", "ERC_TO_NATIVE"],
"default": "AMB"
},
"home": {
"$ref": "#/$defs/side_config"

View File

@ -59,7 +59,7 @@ chains:
block_index_interval: 60s
bridges:
xdai:
is_erc_to_native: true
bridge_mode: ERC_TO_NATIVE
home:
chain: xdai
address: 0x7301CFA0e1756B71869E93d4e4Dca5c7d0eb0AA6
@ -89,6 +89,7 @@ bridges:
stuck_erc_to_native_message_confirmation:
last_validator_activity:
xdai-amb:
bridge_mode: AMB
home:
chain: xdai
address: 0x75Df5AF045d91108662D8080fD1FEFAd6aA0bb59
@ -120,6 +121,7 @@ bridges:
different_information_signatures:
last_validator_activity:
test-amb:
bridge_mode: AMB
home:
chain: sokol
address: 0xFe446bEF1DbF7AFE24E81e05BC8B271C1BA9a560
@ -149,6 +151,7 @@ bridges:
home_start_block: 21822099
different_information_signatures:
bsc-xdai-amb:
bridge_mode: AMB
home:
chain: xdai
address: 0x162E898bD0aacB578C8D5F8d6ca588c13d2A383F
@ -176,6 +179,7 @@ bridges:
failed_information_request:
different_information_signatures:
rinkeby-xdai-amb:
bridge_mode: AMB
home:
chain: xdai
address: 0xc38D4991c951fE8BCE1a12bEef2046eF36b0FA4A
@ -201,6 +205,7 @@ bridges:
failed_information_request:
different_information_signatures:
poa-xdai-amb:
bridge_mode: AMB
home:
chain: xdai
address: 0xc2d77d118326c33BBe36EbeAbf4F7ED6BC2dda5c
@ -226,6 +231,7 @@ bridges:
failed_information_request:
different_information_signatures:
eth-bsc-amb:
bridge_mode: AMB
home:
chain: bsc
address: 0x6943A218d58135793F1FE619414eD476C37ad65a

View File

@ -56,12 +56,19 @@ type BridgeAlertConfig struct {
ForeignStartBlock uint `yaml:"foreign_start_block"`
}
type BridgeMode string
const (
BridgeModeArbitraryMessage BridgeMode = "AMB"
BridgeModeErcToNative BridgeMode = "ERC_TO_NATIVE"
)
type BridgeConfig struct {
ID string `yaml:"-"`
IsErcToNative bool `yaml:"is_erc_to_native"`
Home *BridgeSideConfig `yaml:"home"`
Foreign *BridgeSideConfig `yaml:"foreign"`
Alerts map[string]*BridgeAlertConfig `yaml:"alerts"`
ID string `yaml:"-"`
BridgeMode BridgeMode `yaml:"bridge_mode"`
Home *BridgeSideConfig `yaml:"home"`
Foreign *BridgeSideConfig `yaml:"foreign"`
Alerts map[string]*BridgeAlertConfig `yaml:"alerts"`
}
type DBConfig struct {
@ -111,6 +118,21 @@ func (cfg *Config) init() error {
if bridge.Foreign.MaxBlockRangeSize <= 0 {
bridge.Foreign.MaxBlockRangeSize = 1000
}
if len(bridge.Home.ErcToNativeTokens) > 0 {
return fmt.Errorf("non-empty home token address list")
}
switch bridge.BridgeMode {
case BridgeModeErcToNative:
if len(bridge.Foreign.ErcToNativeTokens) == 0 {
return fmt.Errorf("empty foreign token address list in ERC_TO_NATIVE mode")
}
case BridgeModeArbitraryMessage:
default:
bridge.BridgeMode = BridgeModeArbitraryMessage
if len(bridge.Foreign.ErcToNativeTokens) > 0 {
return fmt.Errorf("non-empty foreign token address list in AMB mode")
}
}
for _, side := range [2]*BridgeSideConfig{bridge.Home, bridge.Foreign} {
chainName := side.ChainName
var ok bool

View File

@ -0,0 +1,36 @@
package contract
import (
"context"
"fmt"
"tokenbridge-monitor/config"
"tokenbridge-monitor/contract/abi"
"tokenbridge-monitor/ethclient"
"github.com/ethereum/go-ethereum/common"
)
type BridgeContract struct {
*Contract
}
func NewBridgeContract(client ethclient.Client, addr common.Address, mode config.BridgeMode) *BridgeContract {
var contract *Contract
switch mode {
case config.BridgeModeArbitraryMessage:
contract = NewContract(client, addr, abi.AMB)
case config.BridgeModeErcToNative:
contract = NewContract(client, addr, abi.ERC_TO_NATIVE)
default:
contract = NewContract(client, addr, abi.AMB)
}
return &BridgeContract{contract}
}
func (c *BridgeContract) ValidatorContractAddress(ctx context.Context) (common.Address, error) {
res, err := c.Call(ctx, "validatorContract")
if err != nil {
return common.Address{}, fmt.Errorf("cannot obtain validator contract address: %w", err)
}
return common.BytesToAddress(res), nil
}

View File

@ -12,12 +12,12 @@ import (
)
type Contract struct {
Address common.Address
client *ethclient.Client
address common.Address
client ethclient.Client
abi abi.ABI
}
func NewContract(client *ethclient.Client, addr common.Address, abi abi.ABI) *Contract {
func NewContract(client ethclient.Client, addr common.Address, abi abi.ABI) *Contract {
return &Contract{addr, client, abi}
}
@ -29,67 +29,21 @@ func (c *Contract) AllEvents() map[string]bool {
return events
}
func (c *Contract) ValidatorContractAddress(ctx context.Context) (common.Address, error) {
data, err := c.abi.Pack("validatorContract")
func (c *Contract) Call(ctx context.Context, method string, args ...interface{}) ([]byte, error) {
data, err := c.abi.Pack(method, args...)
if err != nil {
return common.Address{}, fmt.Errorf("cannot encode abi calldata: %w", err)
return nil, fmt.Errorf("cannot encode abi calldata: %w", err)
}
res, err := c.client.CallContract(ctx, ethereum.CallMsg{
To: &c.Address,
To: &c.address,
Data: data,
})
if err != nil {
return common.Address{}, fmt.Errorf("cannot call validatorContract(): %w", err)
return nil, fmt.Errorf("cannot call %s(...): %w", method, err)
}
return common.BytesToAddress(res), nil
return res, nil
}
func (c *Contract) ParseLog(log *entity.Log) (string, map[string]interface{}, error) {
if log.Topic0 == nil {
return "", nil, fmt.Errorf("cannot process event without topics")
}
topics := make([]common.Hash, 0, 3)
if log.Topic1 != nil {
topics = append(topics, *log.Topic1)
if log.Topic2 != nil {
topics = append(topics, *log.Topic2)
if log.Topic3 != nil {
topics = append(topics, *log.Topic3)
}
}
}
var event *abi.Event
var indexed abi.Arguments
for _, e := range c.abi.Events {
if e.ID == *log.Topic0 {
indexed = Indexed(e.Inputs)
if len(indexed) == len(topics) {
event = &e
break
}
}
}
if event == nil {
return "", nil, nil
}
m := make(map[string]interface{})
if len(indexed) < len(event.Inputs) {
if err := event.Inputs.UnpackIntoMap(m, log.Data); err != nil {
return "", nil, fmt.Errorf("can't unpack data: %w", err)
}
}
if err := abi.ParseTopicsIntoMap(m, indexed, topics); err != nil {
return "", nil, fmt.Errorf("can't unpack topics: %w", err)
}
return event.String(), m, nil
}
func Indexed(args abi.Arguments) abi.Arguments {
var indexed abi.Arguments
for _, arg := range args {
if arg.Indexed {
indexed = append(indexed, arg)
}
}
return indexed
return ParseLog(c.abi, log)
}

62
contract/utils.go Normal file
View File

@ -0,0 +1,62 @@
package contract
import (
"fmt"
"tokenbridge-monitor/entity"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
)
func Indexed(args abi.Arguments) abi.Arguments {
var indexed abi.Arguments
for _, arg := range args {
if arg.Indexed {
indexed = append(indexed, arg)
}
}
return indexed
}
func FindMatchingEventABI(contractABI abi.ABI, topics []common.Hash) *abi.Event {
for _, e := range contractABI.Events {
if e.ID == topics[0] {
indexed := Indexed(e.Inputs)
if len(indexed) == len(topics)-1 {
return &e
}
}
}
return nil
}
func DecodeEventLog(event *abi.Event, topics []common.Hash, data []byte) (map[string]interface{}, error) {
indexed := Indexed(event.Inputs)
m := make(map[string]interface{})
if len(indexed) < len(event.Inputs) {
if err := event.Inputs.UnpackIntoMap(m, data); err != nil {
return nil, fmt.Errorf("can't unpack data: %w", err)
}
}
if err := abi.ParseTopicsIntoMap(m, indexed, topics[1:]); err != nil {
return nil, fmt.Errorf("can't unpack topics: %w", err)
}
return m, nil
}
func ParseLog(contractABI abi.ABI, log *entity.Log) (string, map[string]interface{}, error) {
topics := log.Topics()
if len(topics) == 0 {
return "", nil, fmt.Errorf("cannot process event without topics")
}
event := FindMatchingEventABI(contractABI, topics)
if event == nil {
return "", nil, nil
}
res, err := DecodeEventLog(event, topics, log.Data)
if err != nil {
return "", nil, fmt.Errorf("can't decode event log: %w", err)
}
return event.String(), res, nil
}

View File

@ -55,3 +55,20 @@ func NewLog(chainID string, log types.Log) *Log {
}
return e
}
func (l *Log) Topics() []common.Hash {
topics := make([]common.Hash, 0, 4)
if l.Topic0 != nil {
topics = append(topics, *l.Topic0)
if l.Topic1 != nil {
topics = append(topics, *l.Topic1)
if l.Topic2 != nil {
topics = append(topics, *l.Topic2)
if l.Topic3 != nil {
topics = append(topics, *l.Topic3)
}
}
}
}
return topics
}

View File

@ -14,8 +14,19 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)
type Client struct {
ChainID string
type Client interface {
BlockNumber(ctx context.Context) (uint, error)
HeaderByNumber(ctx context.Context, n uint) (*types.Header, error)
FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
FilterLogsSafe(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error)
TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, error)
TransactionReceiptByHash(ctx context.Context, hash common.Hash) (*types.Receipt, error)
CallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error)
TransactionSender(tx *types.Transaction) (common.Address, error)
}
type rpcClient struct {
chainID string
url string
timeout time.Duration
rawClient *rpc.Client
@ -23,7 +34,7 @@ type Client struct {
signer types.Signer
}
func NewClient(url string, timeout time.Duration, chainID string) (*Client, error) {
func NewClient(url string, timeout time.Duration, chainID string) (Client, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
@ -31,8 +42,8 @@ func NewClient(url string, timeout time.Duration, chainID string) (*Client, erro
if err != nil {
return nil, fmt.Errorf("can't dial JSON rpc url: %w", err)
}
client := &Client{
ChainID: chainID,
client := &rpcClient{
chainID: chainID,
url: url,
timeout: timeout,
rawClient: rawClient,
@ -51,46 +62,46 @@ func NewClient(url string, timeout time.Duration, chainID string) (*Client, erro
return client, nil
}
func (c *Client) BlockNumber(ctx context.Context) (uint, error) {
defer ObserveDuration(c.ChainID, c.url, "eth_blockNumber")()
func (c *rpcClient) BlockNumber(ctx context.Context) (uint, error) {
defer ObserveDuration(c.chainID, c.url, "eth_blockNumber")()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
n, err := c.client.BlockNumber(ctx)
ObserveError(c.ChainID, c.url, "eth_blockNumber", err)
ObserveError(c.chainID, c.url, "eth_blockNumber", err)
return uint(n), err
}
func (c *Client) HeaderByNumber(ctx context.Context, n uint) (*types.Header, error) {
defer ObserveDuration(c.ChainID, c.url, "eth_getBlockByNumber")()
func (c *rpcClient) HeaderByNumber(ctx context.Context, n uint) (*types.Header, error) {
defer ObserveDuration(c.chainID, c.url, "eth_getBlockByNumber")()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
header, err := c.client.HeaderByNumber(ctx, big.NewInt(int64(n)))
ObserveError(c.ChainID, c.url, "eth_getBlockByNumber", err)
ObserveError(c.chainID, c.url, "eth_getBlockByNumber", err)
return header, err
}
func (c *Client) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
defer ObserveDuration(c.ChainID, c.url, "eth_getLogs")()
func (c *rpcClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
defer ObserveDuration(c.chainID, c.url, "eth_getLogs")()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
logs, err := c.client.FilterLogs(ctx, q)
ObserveError(c.ChainID, c.url, "eth_getLogs", err)
ObserveError(c.chainID, c.url, "eth_getLogs", err)
return logs, err
}
// FilterLogsSafe is the same as FilterLogs, but makes an additional eth_blockNumber
// request to ensure that the node behind RPC is synced to the needed point.
func (c *Client) FilterLogsSafe(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
defer ObserveDuration(c.ChainID, c.url, "eth_getLogsSafe")()
func (c *rpcClient) FilterLogsSafe(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
defer ObserveDuration(c.chainID, c.url, "eth_getLogsSafe")()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
var err error
defer func() {
ObserveError(c.ChainID, c.url, "eth_getLogsSafe", err)
ObserveError(c.chainID, c.url, "eth_getLogsSafe", err)
}()
var arg interface{}
@ -127,37 +138,37 @@ func (c *Client) FilterLogsSafe(ctx context.Context, q ethereum.FilterQuery) ([]
return logs, nil
}
func (c *Client) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, error) {
defer ObserveDuration(c.ChainID, c.url, "eth_getTransactionByHash")()
func (c *rpcClient) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, error) {
defer ObserveDuration(c.chainID, c.url, "eth_getTransactionByHash")()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
tx, _, err := c.client.TransactionByHash(ctx, txHash)
ObserveError(c.ChainID, c.url, "eth_getTransactionByHash", err)
ObserveError(c.chainID, c.url, "eth_getTransactionByHash", err)
return tx, err
}
func (c *Client) TransactionReceiptByHash(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
defer ObserveDuration(c.ChainID, c.url, "eth_getTransactionReceipt")()
func (c *rpcClient) TransactionReceiptByHash(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
defer ObserveDuration(c.chainID, c.url, "eth_getTransactionReceipt")()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
receipt, err := c.client.TransactionReceipt(ctx, txHash)
ObserveError(c.ChainID, c.url, "eth_getTransactionReceipt", err)
ObserveError(c.chainID, c.url, "eth_getTransactionReceipt", err)
return receipt, err
}
func (c *Client) CallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) {
defer ObserveDuration(c.ChainID, c.url, "eth_call")()
func (c *rpcClient) CallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) {
defer ObserveDuration(c.chainID, c.url, "eth_call")()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
res, err := c.client.CallContract(ctx, msg, nil)
ObserveError(c.ChainID, c.url, "eth_call", err)
ObserveError(c.chainID, c.url, "eth_call", err)
return res, err
}
func (c *Client) TransactionSender(tx *types.Transaction) (common.Address, error) {
func (c *rpcClient) TransactionSender(tx *types.Transaction) (common.Address, error) {
return c.signer.Sender(tx)
}

View File

@ -12,7 +12,6 @@ import (
"time"
"tokenbridge-monitor/config"
"tokenbridge-monitor/contract"
"tokenbridge-monitor/contract/abi"
"tokenbridge-monitor/entity"
"tokenbridge-monitor/ethclient"
"tokenbridge-monitor/logging"
@ -37,11 +36,11 @@ type ContractMonitor struct {
cfg *config.BridgeSideConfig
logger logging.Logger
repo *repository.Repo
client *ethclient.Client
client ethclient.Client
logsCursor *entity.LogsCursor
blocksRangeChan chan *BlocksRange
logsChan chan *LogsBatch
contract *contract.Contract
contract *contract.BridgeContract
eventHandlers map[string]EventHandler
headBlock uint
isSynced bool
@ -51,38 +50,31 @@ type ContractMonitor struct {
processedBlockMetric prometheus.Gauge
}
func NewContractMonitor(ctx context.Context, logger logging.Logger, repo *repository.Repo, bridgeCfg *config.BridgeConfig, cfg *config.BridgeSideConfig) (*ContractMonitor, error) {
client, err := ethclient.NewClient(cfg.Chain.RPC.Host, cfg.Chain.RPC.Timeout, cfg.Chain.ChainID)
if err != nil {
return nil, fmt.Errorf("failed to start eth client: %w", err)
}
contractAbi := abi.AMB
if bridgeCfg.IsErcToNative {
contractAbi = abi.ERC_TO_NATIVE
}
bridgeContract := contract.NewContract(client, cfg.Address, contractAbi)
func NewContractMonitor(ctx context.Context, logger logging.Logger, repo *repository.Repo, bridgeCfg *config.BridgeConfig, cfg *config.BridgeSideConfig, client ethclient.Client) (*ContractMonitor, error) {
bridgeContract := contract.NewBridgeContract(client, cfg.Address, bridgeCfg.BridgeMode)
if cfg.ValidatorContractAddress == (common.Address{}) {
cfg.ValidatorContractAddress, err = bridgeContract.ValidatorContractAddress(ctx)
addr, err := bridgeContract.ValidatorContractAddress(ctx)
if err != nil {
return nil, fmt.Errorf("cannot get validator contract address: %w", err)
}
logger.WithFields(logrus.Fields{
"chain_id": client.ChainID,
"chain_id": cfg.Chain.ChainID,
"bridge_address": cfg.Address,
"validator_contract_address": cfg.ValidatorContractAddress,
"start_block": cfg.StartBlock,
}).Info("obtained validator contract address")
cfg.ValidatorContractAddress = addr
}
logsCursor, err := repo.LogsCursors.GetByChainIDAndAddress(ctx, client.ChainID, cfg.Address)
logsCursor, err := repo.LogsCursors.GetByChainIDAndAddress(ctx, cfg.Chain.ChainID, cfg.Address)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
logger.WithFields(logrus.Fields{
"chain_id": client.ChainID,
"chain_id": cfg.Chain.ChainID,
"address": cfg.Address,
"start_block": cfg.StartBlock,
}).Warn("contract cursor is not present, staring indexing from scratch")
logsCursor = &entity.LogsCursor{
ChainID: client.ChainID,
ChainID: cfg.Chain.ChainID,
Address: cfg.Address,
LastFetchedBlock: cfg.StartBlock - 1,
LastProcessedBlock: cfg.StartBlock - 1,
@ -93,7 +85,7 @@ func NewContractMonitor(ctx context.Context, logger logging.Logger, repo *reposi
}
commonLabels := prometheus.Labels{
"bridge_id": bridgeCfg.ID,
"chain_id": client.ChainID,
"chain_id": cfg.Chain.ChainID,
"address": cfg.Address.String(),
}
return &ContractMonitor{
@ -135,6 +127,8 @@ func (m *ContractMonitor) VerifyEventHandlersABI() error {
func (m *ContractMonitor) Start(ctx context.Context) {
lastProcessedBlock := m.logsCursor.LastProcessedBlock
lastFetchedBlock := m.logsCursor.LastFetchedBlock
m.processedBlockMetric.Set(float64(lastProcessedBlock))
m.fetchedBlockMetric.Set(float64(lastFetchedBlock))
go m.StartBlockFetcher(ctx, lastFetchedBlock+1)
go m.StartLogsProcessor(ctx)
m.LoadUnprocessedLogs(ctx, lastProcessedBlock+1, lastFetchedBlock)
@ -149,7 +143,7 @@ func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, fromBlock, to
addresses := m.cfg.ContractAddresses(fromBlock, toBlock)
for {
logs, err := m.repo.Logs.FindByBlockRange(ctx, m.client.ChainID, addresses, fromBlock, toBlock)
logs, err := m.repo.Logs.FindByBlockRange(ctx, m.cfg.Chain.ChainID, addresses, fromBlock, toBlock)
if err != nil {
m.logger.WithError(err).Error("can't find unprocessed logs in block range")
} else {
@ -272,7 +266,7 @@ func (m *ContractMonitor) buildFilterQueries(blocksRange *BlocksRange) []ethereu
q.Topics = [][]common.Hash{{*blocksRange.Topic}}
}
qs = append(qs, q)
if m.bridgeCfg.IsErcToNative {
if m.bridgeCfg.BridgeMode == config.BridgeModeErcToNative {
for _, token := range m.cfg.ErcToNativeTokens {
if token.StartBlock > 0 && blocksRange.To < token.StartBlock {
continue
@ -437,7 +431,7 @@ func (m *ContractMonitor) StartLogsProcessor(ctx context.Context) {
}
func (m *ContractMonitor) tryToGetBlockTimestamp(ctx context.Context, blockNumber uint) error {
ts, err := m.repo.BlockTimestamps.GetByBlockNumber(ctx, m.client.ChainID, blockNumber)
ts, err := m.repo.BlockTimestamps.GetByBlockNumber(ctx, m.cfg.Chain.ChainID, blockNumber)
if err != nil {
return fmt.Errorf("can't get block timestamp from db: %w", err)
}
@ -451,7 +445,7 @@ func (m *ContractMonitor) tryToGetBlockTimestamp(ctx context.Context, blockNumbe
return fmt.Errorf("can't request block header: %w", err)
}
return m.repo.BlockTimestamps.Ensure(ctx, &entity.BlockTimestamp{
ChainID: m.client.ChainID,
ChainID: m.cfg.Chain.ChainID,
BlockNumber: blockNumber,
Timestamp: time.Unix(int64(header.Time), 0),
})

View File

@ -19,11 +19,11 @@ type EventHandler func(ctx context.Context, log *entity.Log, data map[string]int
type BridgeEventHandler struct {
repo *repository.Repo
bridgeID string
homeClient *ethclient.Client
homeClient ethclient.Client
cfg *config.BridgeConfig
}
func NewBridgeEventHandler(repo *repository.Repo, cfg *config.BridgeConfig, homeClient *ethclient.Client) *BridgeEventHandler {
func NewBridgeEventHandler(repo *repository.Repo, cfg *config.BridgeConfig, homeClient ethclient.Client) *BridgeEventHandler {
return &BridgeEventHandler{
repo: repo,
bridgeID: cfg.ID,

View File

@ -6,6 +6,7 @@ import (
"tokenbridge-monitor/config"
"tokenbridge-monitor/contract/abi"
"tokenbridge-monitor/db"
"tokenbridge-monitor/ethclient"
"tokenbridge-monitor/logging"
"tokenbridge-monitor/monitor/alerts"
"tokenbridge-monitor/repository"
@ -20,13 +21,13 @@ type Monitor struct {
alertManager *alerts.AlertManager
}
func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo *repository.Repo, cfg *config.BridgeConfig) (*Monitor, error) {
func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo *repository.Repo, cfg *config.BridgeConfig, homeClient, foreignClient ethclient.Client) (*Monitor, error) {
logger.Info("initializing bridge monitor")
homeMonitor, err := NewContractMonitor(ctx, logger.WithField("contract", "home"), repo, cfg, cfg.Home)
homeMonitor, err := NewContractMonitor(ctx, logger.WithField("contract", "home"), repo, cfg, cfg.Home, homeClient)
if err != nil {
return nil, fmt.Errorf("failed to initialize home side monitor: %w", err)
}
foreignMonitor, err := NewContractMonitor(ctx, logger.WithField("contract", "foreign"), repo, cfg, cfg.Foreign)
foreignMonitor, err := NewContractMonitor(ctx, logger.WithField("contract", "foreign"), repo, cfg, cfg.Foreign, foreignClient)
if err != nil {
return nil, fmt.Errorf("failed to initialize foreign side monitor: %w", err)
}
@ -42,9 +43,10 @@ func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo
foreignMonitor: foreignMonitor,
alertManager: alertManager,
}
if cfg.IsErcToNative {
switch cfg.BridgeMode {
case config.BridgeModeErcToNative:
monitor.RegisterErcToNativeEventHandlers()
} else {
case config.BridgeModeArbitraryMessage:
monitor.RegisterAMBEventHandlers()
}
err = monitor.homeMonitor.VerifyEventHandlersABI()