From 9f24cdc4262d1431b7cbcfa41e7dd7668eafacb7 Mon Sep 17 00:00:00 2001 From: Kirill Fedoseev Date: Mon, 14 Mar 2022 13:43:14 +0400 Subject: [PATCH] Listen for validator set update events --- Dockerfile | 2 +- config/config.go | 21 +- contract/constants/amb.json | 340 ++++++++++++++++++ contract/contract.go | 17 + db/db.go | 6 +- .../000003_bridge_validators.down.sql | 2 + db/migrations/000003_bridge_validators.up.sql | 11 + docker-compose.dev.yml | 2 + entity/bridge_validator.go | 24 ++ entity/log.go | 1 - entity/signed_message.go | 1 + ethclient/ethclient.go | 10 + main.go | 2 +- monitor/handlers.go | 23 ++ monitor/monitor.go | 97 +++-- monitor/wrappers.go | 7 +- presenter/presenter.go | 85 ++++- presenter/types.go | 17 + repository/postgres/bridge_validators.go | 84 +++++ repository/postgres/logs.go | 20 -- repository/postgres/signed_messages.go | 23 ++ repository/repos.go | 2 + 22 files changed, 723 insertions(+), 74 deletions(-) create mode 100644 db/migrations/000003_bridge_validators.down.sql create mode 100644 db/migrations/000003_bridge_validators.up.sql create mode 100644 entity/bridge_validator.go create mode 100644 repository/postgres/bridge_validators.go diff --git a/Dockerfile b/Dockerfile index 1d9d9dd..c11f160 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ COPY . . RUN go build -FROM ubuntu:20.10 +FROM ubuntu:20.04 WORKDIR /app diff --git a/config/config.go b/config/config.go index 0968947..264ce83 100644 --- a/config/config.go +++ b/config/config.go @@ -25,14 +25,21 @@ type ChainConfig struct { SafeLogsRequest bool `yaml:"safe_logs_request"` } +type ReloadJobConfig struct { + Event string `yaml:"event"` + StartBlock uint `yaml:"start_block"` + EndBlock uint `yaml:"end_block"` +} + type BridgeSideConfig struct { - ChainName string `yaml:"chain"` - Chain *ChainConfig `yaml:"-"` - Address common.Address `yaml:"address"` - StartBlock uint `yaml:"start_block"` - BlockConfirmations uint `yaml:"required_block_confirmations"` - MaxBlockRangeSize uint `yaml:"max_block_range_size"` - ReloadEvents []string `yaml:"reload_events"` + ChainName string `yaml:"chain"` + Chain *ChainConfig `yaml:"-"` + Address common.Address `yaml:"address"` + ValidatorContractAddress common.Address `yaml:"validator_contract_address"` + StartBlock uint `yaml:"start_block"` + BlockConfirmations uint `yaml:"required_block_confirmations"` + MaxBlockRangeSize uint `yaml:"max_block_range_size"` + RefetchEvents []*ReloadJobConfig `yaml:"refetch_events"` } type BridgeAlertConfig struct { diff --git a/contract/constants/amb.json b/contract/constants/amb.json index c26710a..d15f0b7 100644 --- a/contract/constants/amb.json +++ b/contract/constants/amb.json @@ -1166,5 +1166,345 @@ ], "name": "RelayedMessage", "type": "event" + }, + { + "constant": true, + "inputs": [], + "name": "validatorCount", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "getBridgeValidatorsInterfacesVersion", + "outputs": [ + { + "name": "major", + "type": "uint64" + }, + { + "name": "minor", + "type": "uint64" + }, + { + "name": "patch", + "type": "uint64" + } + ], + "payable": false, + "stateMutability": "pure", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "isInitialized", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "validatorList", + "outputs": [ + { + "name": "", + "type": "address[]" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_requiredSignatures", + "type": "uint256" + } + ], + "name": "setRequiredSignatures", + "outputs": [], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "requiredSignatures", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "name": "_address", + "type": "address" + } + ], + "name": "getNextValidator", + "outputs": [ + { + "name": "", + "type": "address" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "owner", + "outputs": [ + { + "name": "", + "type": "address" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "name": "_validator", + "type": "address" + } + ], + "name": "isValidatorDuty", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "deployedAtBlock", + "outputs": [ + { + "name": "", + "type": "uint256" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": true, + "inputs": [], + "name": "F_ADDR", + "outputs": [ + { + "name": "", + "type": "address" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "newOwner", + "type": "address" + } + ], + "name": "transferOwnership", + "outputs": [], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "name": "_validator", + "type": "address" + } + ], + "name": "isValidator", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "name": "validator", + "type": "address" + } + ], + "name": "ValidatorAdded", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": true, + "name": "validator", + "type": "address" + } + ], + "name": "ValidatorRemoved", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "name": "requiredSignatures", + "type": "uint256" + } + ], + "name": "RequiredSignaturesChanged", + "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "name": "previousOwner", + "type": "address" + }, + { + "indexed": false, + "name": "newOwner", + "type": "address" + } + ], + "name": "OwnershipTransferred", + "type": "event" + }, + { + "constant": false, + "inputs": [ + { + "name": "_requiredSignatures", + "type": "uint256" + }, + { + "name": "_initialValidators", + "type": "address[]" + }, + { + "name": "_initialRewards", + "type": "address[]" + }, + { + "name": "_owner", + "type": "address" + } + ], + "name": "initialize", + "outputs": [ + { + "name": "", + "type": "bool" + } + ], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_validator", + "type": "address" + }, + { + "name": "_reward", + "type": "address" + } + ], + "name": "addRewardableValidator", + "outputs": [], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": false, + "inputs": [ + { + "name": "_validator", + "type": "address" + } + ], + "name": "removeValidator", + "outputs": [], + "payable": false, + "stateMutability": "nonpayable", + "type": "function" + }, + { + "constant": true, + "inputs": [ + { + "name": "_validator", + "type": "address" + } + ], + "name": "getValidatorRewardAddress", + "outputs": [ + { + "name": "", + "type": "address" + } + ], + "payable": false, + "stateMutability": "view", + "type": "function" } ] \ No newline at end of file diff --git a/contract/contract.go b/contract/contract.go index 9cf6b18..159c5e4 100644 --- a/contract/contract.go +++ b/contract/contract.go @@ -4,8 +4,10 @@ import ( "amb-monitor/entity" "amb-monitor/ethclient" "bytes" + "context" "fmt" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" ) @@ -25,6 +27,21 @@ func (c *Contract) HasEvent(event string) bool { return ok } +func (c *Contract) ValidatorContractAddress(ctx context.Context) (common.Address, error) { + data, err := c.abi.Pack("validatorContract") + if err != nil { + return common.Address{}, fmt.Errorf("cannot encode abi calldata: %w", err) + } + res, err := c.client.CallContract(ctx, ethereum.CallMsg{ + To: &c.Address, + Data: data, + }) + if err != nil { + return common.Address{}, fmt.Errorf("cannot call validatorContract(): %w", err) + } + return common.BytesToAddress(res), nil +} + func (c *Contract) ParseLog(log *entity.Log) (string, map[string]interface{}, error) { if log.Topic0 == nil { return "", nil, fmt.Errorf("cannot process event without topics") diff --git a/db/db.go b/db/db.go index bb5a0d5..c8238ba 100644 --- a/db/db.go +++ b/db/db.go @@ -17,8 +17,8 @@ import ( ) type DB struct { - cfg *config.DBConfig - db *sqlx.DB + cfg *config.DBConfig + db *sqlx.DB } func (db *DB) Migrate() error { @@ -39,7 +39,7 @@ func (db *DB) dbURL(prefix string) string { func NewDB(cfg *config.DBConfig) (*DB, error) { db := &DB{ - cfg: cfg, + cfg: cfg, } conn, err := sqlx.ConnectContext(context.Background(), "pgx", db.dbURL("postgres")) if err != nil { diff --git a/db/migrations/000003_bridge_validators.down.sql b/db/migrations/000003_bridge_validators.down.sql new file mode 100644 index 0000000..d6a7f3f --- /dev/null +++ b/db/migrations/000003_bridge_validators.down.sql @@ -0,0 +1,2 @@ +DROP TABLE bridge_validators; +DROP INDEX signed_messages_bridge_id_signer_idx; \ No newline at end of file diff --git a/db/migrations/000003_bridge_validators.up.sql b/db/migrations/000003_bridge_validators.up.sql new file mode 100644 index 0000000..54f3024 --- /dev/null +++ b/db/migrations/000003_bridge_validators.up.sql @@ -0,0 +1,11 @@ +CREATE TABLE bridge_validators +( + log_id SERIAL REFERENCES logs PRIMARY KEY, + bridge_id TEXT_ID, + chain_id CHAIN_ID, + address ADDRESS, + removed_log_id INT REFERENCES logs NULL, + updated_at TS_NOW, + created_at TS_NOW +); +CREATE INDEX signed_messages_bridge_id_signer_idx ON signed_messages (bridge_id, signer); \ No newline at end of file diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index d14780d..a3c0799 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -14,6 +14,8 @@ services: build: . env_file: - .env + ports: + - "3333:3333" volumes: - ./config.yml:/app/config.yml grafana: diff --git a/entity/bridge_validator.go b/entity/bridge_validator.go new file mode 100644 index 0000000..94a2ee3 --- /dev/null +++ b/entity/bridge_validator.go @@ -0,0 +1,24 @@ +package entity + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +type BridgeValidator struct { + LogID uint `db:"log_id"` + BridgeID string `db:"bridge_id"` + ChainID string `db:"chain_id"` + Address common.Address `db:"address"` + RemovedLogID *uint `db:"removed_log_id"` + CreatedAt *time.Time `db:"created_at"` + UpdatedAt *time.Time `db:"updated_at"` +} + +type BridgeValidatorsRepo interface { + Ensure(ctx context.Context, val *BridgeValidator) error + FindActiveValidator(ctx context.Context, bridgeID, chainID string, address common.Address) (*BridgeValidator, error) + FindActiveValidators(ctx context.Context, bridgeID string) ([]*BridgeValidator, error) +} diff --git a/entity/log.go b/entity/log.go index c948ba2..f1cfcfd 100644 --- a/entity/log.go +++ b/entity/log.go @@ -28,6 +28,5 @@ type LogsRepo interface { GetByID(ctx context.Context, id uint) (*Log, error) FindByBlockRange(ctx context.Context, chainID string, addr common.Address, fromBlock, toBlock uint) ([]*Log, error) FindByBlockNumber(ctx context.Context, chainID string, block uint) ([]*Log, error) - FindByTopicAndBlockRange(ctx context.Context, chainID string, addr common.Address, fromBlock, toBlock uint, topic common.Hash) ([]*Log, error) FindByTxHash(ctx context.Context, txHash common.Hash) ([]*Log, error) } diff --git a/entity/signed_message.go b/entity/signed_message.go index 1a81a64..ddfa62b 100644 --- a/entity/signed_message.go +++ b/entity/signed_message.go @@ -20,4 +20,5 @@ type SignedMessagesRepo interface { Ensure(ctx context.Context, msg *SignedMessage) error FindByLogID(ctx context.Context, logID uint) (*SignedMessage, error) FindByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) ([]*SignedMessage, error) + FindLatest(ctx context.Context, bridgeID, chainID string, signer common.Address) (*SignedMessage, error) } diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index dbf1e10..8fd723a 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -135,6 +135,16 @@ func (c *Client) TransactionByHash(ctx context.Context, txHash common.Hash) (*ty return tx, err } +func (c *Client) CallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) { + defer ObserveDuration(c.ChainID, c.url, "eth_call")() + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + res, err := c.client.CallContract(ctx, msg, nil) + ObserveError(c.ChainID, c.url, "eth_call", err) + return res, err +} + func toFilterArg(q ethereum.FilterQuery) (interface{}, error) { arg := map[string]interface{}{ "address": q.Addresses, diff --git a/main.go b/main.go index 036ab99..16aa5b8 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,7 @@ func main() { repo := repository.NewRepo(dbConn) if cfg.Presenter != nil { - pr := presenter.NewPresenter(logger.WithField("service", "presenter"), repo) + pr := presenter.NewPresenter(logger.WithField("service", "presenter"), repo, cfg.Bridges) go func() { err := pr.Serve(cfg.Presenter.Host) if err != nil { diff --git a/monitor/handlers.go b/monitor/handlers.go index 1ccc95a..433f121 100644 --- a/monitor/handlers.go +++ b/monitor/handlers.go @@ -209,3 +209,26 @@ func (p *BridgeEventHandler) HandleInformationRetrieved(ctx context.Context, log Data: unmarshalConfirmInformationResult(tx.Data()), }) } + +func (p *BridgeEventHandler) HandleValidatorAdded(ctx context.Context, log *entity.Log, data map[string]interface{}) error { + validator := data["validator"].(common.Address) + return p.repo.BridgeValidators.Ensure(ctx, &entity.BridgeValidator{ + LogID: log.ID, + BridgeID: p.bridgeID, + ChainID: log.ChainID, + Address: validator, + }) +} + +func (p *BridgeEventHandler) HandleValidatorRemoved(ctx context.Context, log *entity.Log, data map[string]interface{}) error { + validator := data["validator"].(common.Address) + val, err := p.repo.BridgeValidators.FindActiveValidator(ctx, p.bridgeID, log.ChainID, validator) + if err != nil { + return err + } + if val == nil { + return nil + } + val.RemovedLogID = &log.ID + return p.repo.BridgeValidators.Ensure(ctx, val) +} diff --git a/monitor/monitor.go b/monitor/monitor.go index c2255b0..bcace28 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -62,6 +62,19 @@ func newContractMonitor(ctx context.Context, logger logging.Logger, repo *reposi if err != nil { return nil, fmt.Errorf("failed to start eth client: %w", err) } + bridgeContract := contract.NewContract(client, cfg.Address, constants.AMB) + if cfg.ValidatorContractAddress == (common.Address{}) { + cfg.ValidatorContractAddress, err = bridgeContract.ValidatorContractAddress(ctx) + if err != nil { + return nil, fmt.Errorf("cannot get validator contract address: %w", err) + } + logger.WithFields(logrus.Fields{ + "chain_id": client.ChainID, + "bridge_address": cfg.Address, + "validator_contract_address": cfg.ValidatorContractAddress, + "start_block": cfg.StartBlock, + }).Info("obtained validator contract address") + } logsCursor, err := repo.LogsCursors.GetByChainIDAndAddress(ctx, client.ChainID, cfg.Address) if err != nil { if errors.Is(err, sql.ErrNoRows) { @@ -93,8 +106,8 @@ func newContractMonitor(ctx context.Context, logger logging.Logger, repo *reposi logsCursor: logsCursor, blocksRangeChan: make(chan *BlocksRange, 10), logsChan: make(chan *LogsBatch, 200), - contract: contract.NewContract(client, cfg.Address, constants.AMB), - eventHandlers: make(map[string]EventHandler, 10), + contract: bridgeContract, + eventHandlers: make(map[string]EventHandler, 12), syncedMetric: SyncedContract.With(commonLabels), headBlockMetric: LatestHeadBlock.With(commonLabels), fetchedBlockMetric: LatestFetchedBlock.With(commonLabels), @@ -127,10 +140,14 @@ func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo homeMonitor.eventHandlers["UserRequestForInformation"] = handlers.HandleUserRequestForInformation homeMonitor.eventHandlers["SignedForInformation"] = handlers.HandleSignedForInformation homeMonitor.eventHandlers["InformationRetrieved"] = handlers.HandleInformationRetrieved + homeMonitor.eventHandlers["ValidatorAdded"] = handlers.HandleValidatorAdded + homeMonitor.eventHandlers["ValidatorRemoved"] = handlers.HandleValidatorRemoved foreignMonitor.eventHandlers["UserRequestForAffirmation"] = handlers.HandleUserRequestForAffirmation foreignMonitor.eventHandlers["UserRequestForAffirmation0"] = handlers.HandleLegacyUserRequestForAffirmation foreignMonitor.eventHandlers["RelayedMessage"] = handlers.HandleRelayedMessage foreignMonitor.eventHandlers["RelayedMessage0"] = handlers.HandleRelayedMessage + foreignMonitor.eventHandlers["ValidatorAdded"] = handlers.HandleValidatorAdded + foreignMonitor.eventHandlers["ValidatorRemoved"] = handlers.HandleValidatorRemoved return &Monitor{ cfg: cfg, logger: logger, @@ -166,41 +183,16 @@ func (m *ContractMonitor) Start(ctx context.Context) { lastFetchedBlock := m.logsCursor.LastFetchedBlock go m.StartBlockFetcher(ctx, lastFetchedBlock+1) go m.StartLogsProcessor(ctx) - m.LoadUnprocessedLogs(ctx, m.cfg.ReloadEvents, lastProcessedBlock+1, lastFetchedBlock) + m.LoadUnprocessedLogs(ctx, lastProcessedBlock+1, lastFetchedBlock) go m.StartLogsFetcher(ctx) } -func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, reloadEvents []string, fromBlock, toBlock uint) { +func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, fromBlock, toBlock uint) { m.logger.WithFields(logrus.Fields{ - "from_block": fromBlock, - "to_block": toBlock, - "manual_reload_count": len(reloadEvents), + "from_block": fromBlock, + "to_block": toBlock, }).Info("loading fetched but not yet processed blocks") - for _, event := range reloadEvents { - topic := crypto.Keccak256Hash([]byte(event)) - - m.logger.WithFields(logrus.Fields{ - "from_block": m.cfg.StartBlock, - "to_block": fromBlock, - "event": event, - "topic": topic, - }).Info("manually reloading events") - for { - logs, err := m.repo.Logs.FindByTopicAndBlockRange(ctx, m.client.ChainID, m.cfg.Address, m.cfg.StartBlock, fromBlock, topic) - if err != nil { - m.logger.WithError(err).Error("can't find manually reloaded logs in block range") - if utils.ContextSleep(ctx, 10*time.Second) == nil { - return - } - continue - } - - m.submitLogs(logs, toBlock) - break - } - } - var logs []*entity.Log for { var err error @@ -237,6 +229,11 @@ func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) { } m.headBlockMetric.Set(float64(m.headBlock)) + if len(m.cfg.RefetchEvents) > 0 { + m.RefetchEvents(start - 1) + m.cfg.RefetchEvents = nil + } + for start <= m.headBlock { end := start + m.cfg.MaxBlockRangeSize - 1 if end > m.headBlock { @@ -260,6 +257,39 @@ func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) { } } +func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) { + m.logger.Info("refetching old events") + for _, job := range m.cfg.RefetchEvents { + topic := crypto.Keccak256Hash([]byte(job.Event)) + + fromBlock := job.StartBlock + if fromBlock < m.cfg.StartBlock { + fromBlock = m.cfg.StartBlock + } + toBlock := job.EndBlock + if toBlock == 0 || toBlock > lastFetchedBlock { + toBlock = lastFetchedBlock + } + + for fromBlock <= toBlock { + end := fromBlock + m.cfg.MaxBlockRangeSize - 1 + if end > toBlock { + end = toBlock + } + m.logger.WithFields(logrus.Fields{ + "from_block": fromBlock, + "to_block": toBlock, + }).Info("scheduling new block range logs search") + m.blocksRangeChan <- &BlocksRange{ + From: fromBlock, + To: end, + Topic: &topic, + } + fromBlock = end + 1 + } + } +} + func (m *ContractMonitor) StartLogsFetcher(ctx context.Context) { m.logger.Info("starting logs fetcher") for { @@ -289,7 +319,10 @@ func (m *ContractMonitor) tryToFetchLogs(ctx context.Context, blocksRange *Block q := ethereum.FilterQuery{ FromBlock: big.NewInt(int64(blocksRange.From)), ToBlock: big.NewInt(int64(blocksRange.To)), - Addresses: []common.Address{m.cfg.Address}, + Addresses: []common.Address{m.cfg.Address, m.cfg.ValidatorContractAddress}, + } + if blocksRange.Topic != nil { + q.Topics = [][]common.Hash{{*blocksRange.Topic}} } var logs []types.Log var err error diff --git a/monitor/wrappers.go b/monitor/wrappers.go index 7803f70..6cbe6aa 100644 --- a/monitor/wrappers.go +++ b/monitor/wrappers.go @@ -2,11 +2,14 @@ package monitor import ( "amb-monitor/entity" + + "github.com/ethereum/go-ethereum/common" ) type BlocksRange struct { - From uint - To uint + From uint + To uint + Topic *common.Hash } type LogsBatch struct { diff --git a/presenter/presenter.go b/presenter/presenter.go index 8316f06..74653c8 100644 --- a/presenter/presenter.go +++ b/presenter/presenter.go @@ -1,11 +1,13 @@ package presenter import ( + "amb-monitor/config" "amb-monitor/entity" "amb-monitor/logging" "amb-monitor/repository" "context" "encoding/json" + "fmt" "net/http" "strconv" @@ -15,16 +17,18 @@ import ( ) type Presenter struct { - logger logging.Logger - repo *repository.Repo - root chi.Router + logger logging.Logger + repo *repository.Repo + bridges map[string]*config.BridgeConfig + root chi.Router } -func NewPresenter(logger logging.Logger, repo *repository.Repo) *Presenter { +func NewPresenter(logger logging.Logger, repo *repository.Repo, bridges map[string]*config.BridgeConfig) *Presenter { return &Presenter{ - logger: logger, - repo: repo, - root: chi.NewMux(), + logger: logger, + repo: repo, + bridges: bridges, + root: chi.NewMux(), } } @@ -35,6 +39,7 @@ func (p *Presenter) Serve(addr string) error { p.root.Use(NewRequestLogger(p.logger)) p.root.Get("/tx/{txHash:0x[0-9a-fA-F]{64}}", p.wrapJSONHandler(p.SearchTx)) p.root.Get("/block/{chainID:[0-9]+}/{blockNumber:[0-9]+}", p.wrapJSONHandler(p.SearchBlock)) + p.root.Get("/bridge/{bridgeID:[0-9a-zA-Z_\\-]+}/validators", p.wrapJSONHandler(p.SearchValidators)) return http.ListenAndServe(addr, p.root) } @@ -84,6 +89,72 @@ func (p *Presenter) SearchBlock(ctx context.Context) (interface{}, error) { return p.searchInLogs(ctx, logs), nil } +func (p *Presenter) SearchValidators(ctx context.Context) (interface{}, error) { + bridgeID := chi.URLParamFromCtx(ctx, "bridgeID") + + if p.bridges[bridgeID] == nil { + return nil, fmt.Errorf("bridge %q not found", bridgeID) + } + + cfg := p.bridges[bridgeID] + res := ValidatorsResult{ + BridgeID: bridgeID, + Home: &ValidatorSideResult{ + ChainID: cfg.Home.Chain.ChainID, + }, + Foreign: &ValidatorSideResult{ + ChainID: cfg.Foreign.Chain.ChainID, + }, + } + + homeCursor, err := p.repo.LogsCursors.GetByChainIDAndAddress(ctx, res.Home.ChainID, cfg.Home.Address) + if err != nil { + p.logger.WithError(err).Error("failed to get home bridge cursor") + return nil, err + } + foreignCursor, err := p.repo.LogsCursors.GetByChainIDAndAddress(ctx, res.Foreign.ChainID, cfg.Foreign.Address) + if err != nil { + p.logger.WithError(err).Error("failed to get foreign bridge cursor") + return nil, err + } + + res.Home.BlockNumber = homeCursor.LastProcessedBlock + res.Foreign.BlockNumber = foreignCursor.LastProcessedBlock + + validators, err := p.repo.BridgeValidators.FindActiveValidators(ctx, bridgeID) + if err != nil { + p.logger.WithError(err).Error("failed to find validators for bridge id") + return nil, err + } + + seenValidators := make(map[common.Address]bool, len(validators)) + for _, val := range validators { + if seenValidators[val.Address] { + continue + } + seenValidators[val.Address] = true + + confirmation, err := p.repo.SignedMessages.FindLatest(ctx, bridgeID, res.Home.ChainID, val.Address) + if err != nil { + p.logger.WithError(err).Error("failed to find latest validator confirmation") + return nil, err + } + valInfo := &ValidatorInfo{ + Signer: val.Address, + } + if confirmation != nil { + valInfo.LastConfirmation, err = p.getTxInfo(ctx, confirmation.LogID) + if err != nil { + p.logger.WithError(err).Error("failed to get tx info") + return nil, err + } + } + res.Validators = append(res.Validators, valInfo) + } + + return res, nil +} + func (p *Presenter) searchInLogs(ctx context.Context, logs []*entity.Log) []*SearchResult { results := make([]*SearchResult, 0, len(logs)) for _, log := range logs { diff --git a/presenter/types.go b/presenter/types.go index f0ecd85..c03bbdb 100644 --- a/presenter/types.go +++ b/presenter/types.go @@ -48,3 +48,20 @@ type SearchResult struct { Message interface{} RelatedEvents []*EventInfo } + +type ValidatorInfo struct { + Signer common.Address + LastConfirmation *TxInfo +} + +type ValidatorSideResult struct { + ChainID string + BlockNumber uint +} + +type ValidatorsResult struct { + BridgeID string + Home *ValidatorSideResult + Foreign *ValidatorSideResult + Validators []*ValidatorInfo +} diff --git a/repository/postgres/bridge_validators.go b/repository/postgres/bridge_validators.go new file mode 100644 index 0000000..932b8a9 --- /dev/null +++ b/repository/postgres/bridge_validators.go @@ -0,0 +1,84 @@ +package postgres + +import ( + "amb-monitor/db" + "amb-monitor/entity" + "context" + "database/sql" + "errors" + "fmt" + + sq "github.com/Masterminds/squirrel" + "github.com/ethereum/go-ethereum/common" +) + +type bridgeValidatorsRepo basePostgresRepo + +func NewBridgeValidatorsRepo(table string, db *db.DB) entity.BridgeValidatorsRepo { + return (*bridgeValidatorsRepo)(newBasePostgresRepo(table, db)) +} + +func (r *bridgeValidatorsRepo) Ensure(ctx context.Context, val *entity.BridgeValidator) error { + q, args, err := sq.Insert(r.table). + Columns("log_id", "bridge_id", "chain_id", "address", "removed_log_id"). + Values(val.LogID, val.BridgeID, val.ChainID, val.Address, val.RemovedLogID). + Suffix("ON CONFLICT (log_id) DO UPDATE SET updated_at = NOW(), removed_log_id = EXCLUDED.removed_log_id"). + PlaceholderFormat(sq.Dollar). + ToSql() + if err != nil { + return fmt.Errorf("can't build query: %w", err) + } + _, err = r.db.ExecContext(ctx, q, args...) + if err != nil { + return fmt.Errorf("can't ensure bridge validator: %w", err) + } + return nil +} + +func (r *bridgeValidatorsRepo) FindActiveValidator(ctx context.Context, bridgeID, chainID string, address common.Address) (*entity.BridgeValidator, error) { + q, args, err := sq.Select("*"). + From(r.table). + Where(sq.Eq{ + "bridge_id": bridgeID, + "chain_id": chainID, + "address": address, + "removed_log_id": nil, + }). + PlaceholderFormat(sq.Dollar). + ToSql() + if err != nil { + return nil, fmt.Errorf("can't build query: %w", err) + } + val := new(entity.BridgeValidator) + err = r.db.GetContext(ctx, val, q, args...) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("can't get bridge validator: %w", err) + } + return val, nil +} + +func (r *bridgeValidatorsRepo) FindActiveValidators(ctx context.Context, bridgeID string) ([]*entity.BridgeValidator, error) { + q, args, err := sq.Select("*"). + From(r.table). + Where(sq.Eq{ + "bridge_id": bridgeID, + "removed_log_id": nil, + }). + PlaceholderFormat(sq.Dollar). + ToSql() + if err != nil { + return nil, fmt.Errorf("can't build query: %w", err) + } + vals := make([]*entity.BridgeValidator, 0, 10) + err = r.db.SelectContext(ctx, &vals, q, args...) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("can't get bridge validator: %w", err) + } + return vals, nil +} diff --git a/repository/postgres/logs.go b/repository/postgres/logs.go index 6f4c5dd..e186c11 100644 --- a/repository/postgres/logs.go +++ b/repository/postgres/logs.go @@ -99,26 +99,6 @@ func (r *logsRepo) FindByBlockNumber(ctx context.Context, chainID string, block return logs, nil } -func (r *logsRepo) FindByTopicAndBlockRange(ctx context.Context, chainID string, addr common.Address, fromBlock uint, toBlock uint, topic common.Hash) ([]*entity.Log, error) { - q, args, err := sq.Select("*"). - From(r.table). - Where(sq.Eq{"chain_id": chainID, "address": addr, "topic0": topic}). - Where(sq.LtOrEq{"block_number": toBlock}). - Where(sq.GtOrEq{"block_number": fromBlock}). - OrderBy("block_number", "log_index"). - PlaceholderFormat(sq.Dollar). - ToSql() - if err != nil { - return nil, fmt.Errorf("can't build query: %w", err) - } - logs := make([]*entity.Log, 0, 10) - err = r.db.SelectContext(ctx, &logs, q, args...) - if err != nil { - return nil, fmt.Errorf("can't get logs by block number: %w", err) - } - return logs, nil -} - func (r *logsRepo) FindByTxHash(ctx context.Context, txHash common.Hash) ([]*entity.Log, error) { q, args, err := sq.Select("*"). From(r.table). diff --git a/repository/postgres/signed_messages.go b/repository/postgres/signed_messages.go index a7df72f..686ab1c 100644 --- a/repository/postgres/signed_messages.go +++ b/repository/postgres/signed_messages.go @@ -74,3 +74,26 @@ func (r *signedMessagesRepo) FindByMsgHash(ctx context.Context, bridgeID string, } return msgs, nil } + +func (r *signedMessagesRepo) FindLatest(ctx context.Context, bridgeID, chainID string, signer common.Address) (*entity.SignedMessage, error) { + q, args, err := sq.Select(r.table + ".*"). + From(r.table). + Join("logs l ON l.id = log_id"). + Where(sq.Eq{"bridge_id": bridgeID, "signer": signer, "l.chain_id": chainID}). + OrderBy("l.block_number DESC"). + Limit(1). + PlaceholderFormat(sq.Dollar). + ToSql() + if err != nil { + return nil, fmt.Errorf("can't build query: %w", err) + } + msg := new(entity.SignedMessage) + err = r.db.GetContext(ctx, msg, q, args...) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("can't get latest signed message: %w", err) + } + return msg, nil +} diff --git a/repository/repos.go b/repository/repos.go index 2a89c8f..410e6e9 100644 --- a/repository/repos.go +++ b/repository/repos.go @@ -19,6 +19,7 @@ type Repo struct { SentInformationRequests entity.SentInformationRequestsRepo SignedInformationRequests entity.SignedInformationRequestsRepo ExecutedInformationRequests entity.ExecutedInformationRequestsRepo + BridgeValidators entity.BridgeValidatorsRepo } func NewRepo(db *db.DB) *Repo { @@ -35,5 +36,6 @@ func NewRepo(db *db.DB) *Repo { SentInformationRequests: postgres.NewSentInformationRequestsRepo("sent_information_requests", db), SignedInformationRequests: postgres.NewSignedInformationRequestsRepo("signed_information_requests", db), ExecutedInformationRequests: postgres.NewExecutedInformationRequestsRepo("executed_information_requests", db), + BridgeValidators: postgres.NewBridgeValidatorsRepo("bridge_validators", db), } }