Listen for validator set update events

This commit is contained in:
Kirill Fedoseev 2022-03-14 13:43:14 +04:00
parent e55ebcfdcb
commit 9f24cdc426
22 changed files with 723 additions and 74 deletions

View File

@ -6,7 +6,7 @@ COPY . .
RUN go build
FROM ubuntu:20.10
FROM ubuntu:20.04
WORKDIR /app

View File

@ -25,14 +25,21 @@ type ChainConfig struct {
SafeLogsRequest bool `yaml:"safe_logs_request"`
}
type ReloadJobConfig struct {
Event string `yaml:"event"`
StartBlock uint `yaml:"start_block"`
EndBlock uint `yaml:"end_block"`
}
type BridgeSideConfig struct {
ChainName string `yaml:"chain"`
Chain *ChainConfig `yaml:"-"`
Address common.Address `yaml:"address"`
StartBlock uint `yaml:"start_block"`
BlockConfirmations uint `yaml:"required_block_confirmations"`
MaxBlockRangeSize uint `yaml:"max_block_range_size"`
ReloadEvents []string `yaml:"reload_events"`
ChainName string `yaml:"chain"`
Chain *ChainConfig `yaml:"-"`
Address common.Address `yaml:"address"`
ValidatorContractAddress common.Address `yaml:"validator_contract_address"`
StartBlock uint `yaml:"start_block"`
BlockConfirmations uint `yaml:"required_block_confirmations"`
MaxBlockRangeSize uint `yaml:"max_block_range_size"`
RefetchEvents []*ReloadJobConfig `yaml:"refetch_events"`
}
type BridgeAlertConfig struct {

View File

@ -1166,5 +1166,345 @@
],
"name": "RelayedMessage",
"type": "event"
},
{
"constant": true,
"inputs": [],
"name": "validatorCount",
"outputs": [
{
"name": "",
"type": "uint256"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "getBridgeValidatorsInterfacesVersion",
"outputs": [
{
"name": "major",
"type": "uint64"
},
{
"name": "minor",
"type": "uint64"
},
{
"name": "patch",
"type": "uint64"
}
],
"payable": false,
"stateMutability": "pure",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "isInitialized",
"outputs": [
{
"name": "",
"type": "bool"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "validatorList",
"outputs": [
{
"name": "",
"type": "address[]"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": false,
"inputs": [
{
"name": "_requiredSignatures",
"type": "uint256"
}
],
"name": "setRequiredSignatures",
"outputs": [],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "requiredSignatures",
"outputs": [
{
"name": "",
"type": "uint256"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [
{
"name": "_address",
"type": "address"
}
],
"name": "getNextValidator",
"outputs": [
{
"name": "",
"type": "address"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "owner",
"outputs": [
{
"name": "",
"type": "address"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [
{
"name": "_validator",
"type": "address"
}
],
"name": "isValidatorDuty",
"outputs": [
{
"name": "",
"type": "bool"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "deployedAtBlock",
"outputs": [
{
"name": "",
"type": "uint256"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "F_ADDR",
"outputs": [
{
"name": "",
"type": "address"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": false,
"inputs": [
{
"name": "newOwner",
"type": "address"
}
],
"name": "transferOwnership",
"outputs": [],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": true,
"inputs": [
{
"name": "_validator",
"type": "address"
}
],
"name": "isValidator",
"outputs": [
{
"name": "",
"type": "bool"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"name": "validator",
"type": "address"
}
],
"name": "ValidatorAdded",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"name": "validator",
"type": "address"
}
],
"name": "ValidatorRemoved",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"name": "requiredSignatures",
"type": "uint256"
}
],
"name": "RequiredSignaturesChanged",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"name": "previousOwner",
"type": "address"
},
{
"indexed": false,
"name": "newOwner",
"type": "address"
}
],
"name": "OwnershipTransferred",
"type": "event"
},
{
"constant": false,
"inputs": [
{
"name": "_requiredSignatures",
"type": "uint256"
},
{
"name": "_initialValidators",
"type": "address[]"
},
{
"name": "_initialRewards",
"type": "address[]"
},
{
"name": "_owner",
"type": "address"
}
],
"name": "initialize",
"outputs": [
{
"name": "",
"type": "bool"
}
],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": false,
"inputs": [
{
"name": "_validator",
"type": "address"
},
{
"name": "_reward",
"type": "address"
}
],
"name": "addRewardableValidator",
"outputs": [],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": false,
"inputs": [
{
"name": "_validator",
"type": "address"
}
],
"name": "removeValidator",
"outputs": [],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": true,
"inputs": [
{
"name": "_validator",
"type": "address"
}
],
"name": "getValidatorRewardAddress",
"outputs": [
{
"name": "",
"type": "address"
}
],
"payable": false,
"stateMutability": "view",
"type": "function"
}
]

View File

@ -4,8 +4,10 @@ import (
"amb-monitor/entity"
"amb-monitor/ethclient"
"bytes"
"context"
"fmt"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
)
@ -25,6 +27,21 @@ func (c *Contract) HasEvent(event string) bool {
return ok
}
func (c *Contract) ValidatorContractAddress(ctx context.Context) (common.Address, error) {
data, err := c.abi.Pack("validatorContract")
if err != nil {
return common.Address{}, fmt.Errorf("cannot encode abi calldata: %w", err)
}
res, err := c.client.CallContract(ctx, ethereum.CallMsg{
To: &c.Address,
Data: data,
})
if err != nil {
return common.Address{}, fmt.Errorf("cannot call validatorContract(): %w", err)
}
return common.BytesToAddress(res), nil
}
func (c *Contract) ParseLog(log *entity.Log) (string, map[string]interface{}, error) {
if log.Topic0 == nil {
return "", nil, fmt.Errorf("cannot process event without topics")

View File

@ -17,8 +17,8 @@ import (
)
type DB struct {
cfg *config.DBConfig
db *sqlx.DB
cfg *config.DBConfig
db *sqlx.DB
}
func (db *DB) Migrate() error {
@ -39,7 +39,7 @@ func (db *DB) dbURL(prefix string) string {
func NewDB(cfg *config.DBConfig) (*DB, error) {
db := &DB{
cfg: cfg,
cfg: cfg,
}
conn, err := sqlx.ConnectContext(context.Background(), "pgx", db.dbURL("postgres"))
if err != nil {

View File

@ -0,0 +1,2 @@
DROP TABLE bridge_validators;
DROP INDEX signed_messages_bridge_id_signer_idx;

View File

@ -0,0 +1,11 @@
CREATE TABLE bridge_validators
(
log_id SERIAL REFERENCES logs PRIMARY KEY,
bridge_id TEXT_ID,
chain_id CHAIN_ID,
address ADDRESS,
removed_log_id INT REFERENCES logs NULL,
updated_at TS_NOW,
created_at TS_NOW
);
CREATE INDEX signed_messages_bridge_id_signer_idx ON signed_messages (bridge_id, signer);

View File

@ -14,6 +14,8 @@ services:
build: .
env_file:
- .env
ports:
- "3333:3333"
volumes:
- ./config.yml:/app/config.yml
grafana:

View File

@ -0,0 +1,24 @@
package entity
import (
"context"
"time"
"github.com/ethereum/go-ethereum/common"
)
type BridgeValidator struct {
LogID uint `db:"log_id"`
BridgeID string `db:"bridge_id"`
ChainID string `db:"chain_id"`
Address common.Address `db:"address"`
RemovedLogID *uint `db:"removed_log_id"`
CreatedAt *time.Time `db:"created_at"`
UpdatedAt *time.Time `db:"updated_at"`
}
type BridgeValidatorsRepo interface {
Ensure(ctx context.Context, val *BridgeValidator) error
FindActiveValidator(ctx context.Context, bridgeID, chainID string, address common.Address) (*BridgeValidator, error)
FindActiveValidators(ctx context.Context, bridgeID string) ([]*BridgeValidator, error)
}

View File

@ -28,6 +28,5 @@ type LogsRepo interface {
GetByID(ctx context.Context, id uint) (*Log, error)
FindByBlockRange(ctx context.Context, chainID string, addr common.Address, fromBlock, toBlock uint) ([]*Log, error)
FindByBlockNumber(ctx context.Context, chainID string, block uint) ([]*Log, error)
FindByTopicAndBlockRange(ctx context.Context, chainID string, addr common.Address, fromBlock, toBlock uint, topic common.Hash) ([]*Log, error)
FindByTxHash(ctx context.Context, txHash common.Hash) ([]*Log, error)
}

View File

@ -20,4 +20,5 @@ type SignedMessagesRepo interface {
Ensure(ctx context.Context, msg *SignedMessage) error
FindByLogID(ctx context.Context, logID uint) (*SignedMessage, error)
FindByMsgHash(ctx context.Context, bridgeID string, msgHash common.Hash) ([]*SignedMessage, error)
FindLatest(ctx context.Context, bridgeID, chainID string, signer common.Address) (*SignedMessage, error)
}

View File

@ -135,6 +135,16 @@ func (c *Client) TransactionByHash(ctx context.Context, txHash common.Hash) (*ty
return tx, err
}
func (c *Client) CallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) {
defer ObserveDuration(c.ChainID, c.url, "eth_call")()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
res, err := c.client.CallContract(ctx, msg, nil)
ObserveError(c.ChainID, c.url, "eth_call", err)
return res, err
}
func toFilterArg(q ethereum.FilterQuery) (interface{}, error) {
arg := map[string]interface{}{
"address": q.Addresses,

View File

@ -44,7 +44,7 @@ func main() {
repo := repository.NewRepo(dbConn)
if cfg.Presenter != nil {
pr := presenter.NewPresenter(logger.WithField("service", "presenter"), repo)
pr := presenter.NewPresenter(logger.WithField("service", "presenter"), repo, cfg.Bridges)
go func() {
err := pr.Serve(cfg.Presenter.Host)
if err != nil {

View File

@ -209,3 +209,26 @@ func (p *BridgeEventHandler) HandleInformationRetrieved(ctx context.Context, log
Data: unmarshalConfirmInformationResult(tx.Data()),
})
}
func (p *BridgeEventHandler) HandleValidatorAdded(ctx context.Context, log *entity.Log, data map[string]interface{}) error {
validator := data["validator"].(common.Address)
return p.repo.BridgeValidators.Ensure(ctx, &entity.BridgeValidator{
LogID: log.ID,
BridgeID: p.bridgeID,
ChainID: log.ChainID,
Address: validator,
})
}
func (p *BridgeEventHandler) HandleValidatorRemoved(ctx context.Context, log *entity.Log, data map[string]interface{}) error {
validator := data["validator"].(common.Address)
val, err := p.repo.BridgeValidators.FindActiveValidator(ctx, p.bridgeID, log.ChainID, validator)
if err != nil {
return err
}
if val == nil {
return nil
}
val.RemovedLogID = &log.ID
return p.repo.BridgeValidators.Ensure(ctx, val)
}

View File

@ -62,6 +62,19 @@ func newContractMonitor(ctx context.Context, logger logging.Logger, repo *reposi
if err != nil {
return nil, fmt.Errorf("failed to start eth client: %w", err)
}
bridgeContract := contract.NewContract(client, cfg.Address, constants.AMB)
if cfg.ValidatorContractAddress == (common.Address{}) {
cfg.ValidatorContractAddress, err = bridgeContract.ValidatorContractAddress(ctx)
if err != nil {
return nil, fmt.Errorf("cannot get validator contract address: %w", err)
}
logger.WithFields(logrus.Fields{
"chain_id": client.ChainID,
"bridge_address": cfg.Address,
"validator_contract_address": cfg.ValidatorContractAddress,
"start_block": cfg.StartBlock,
}).Info("obtained validator contract address")
}
logsCursor, err := repo.LogsCursors.GetByChainIDAndAddress(ctx, client.ChainID, cfg.Address)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
@ -93,8 +106,8 @@ func newContractMonitor(ctx context.Context, logger logging.Logger, repo *reposi
logsCursor: logsCursor,
blocksRangeChan: make(chan *BlocksRange, 10),
logsChan: make(chan *LogsBatch, 200),
contract: contract.NewContract(client, cfg.Address, constants.AMB),
eventHandlers: make(map[string]EventHandler, 10),
contract: bridgeContract,
eventHandlers: make(map[string]EventHandler, 12),
syncedMetric: SyncedContract.With(commonLabels),
headBlockMetric: LatestHeadBlock.With(commonLabels),
fetchedBlockMetric: LatestFetchedBlock.With(commonLabels),
@ -127,10 +140,14 @@ func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo
homeMonitor.eventHandlers["UserRequestForInformation"] = handlers.HandleUserRequestForInformation
homeMonitor.eventHandlers["SignedForInformation"] = handlers.HandleSignedForInformation
homeMonitor.eventHandlers["InformationRetrieved"] = handlers.HandleInformationRetrieved
homeMonitor.eventHandlers["ValidatorAdded"] = handlers.HandleValidatorAdded
homeMonitor.eventHandlers["ValidatorRemoved"] = handlers.HandleValidatorRemoved
foreignMonitor.eventHandlers["UserRequestForAffirmation"] = handlers.HandleUserRequestForAffirmation
foreignMonitor.eventHandlers["UserRequestForAffirmation0"] = handlers.HandleLegacyUserRequestForAffirmation
foreignMonitor.eventHandlers["RelayedMessage"] = handlers.HandleRelayedMessage
foreignMonitor.eventHandlers["RelayedMessage0"] = handlers.HandleRelayedMessage
foreignMonitor.eventHandlers["ValidatorAdded"] = handlers.HandleValidatorAdded
foreignMonitor.eventHandlers["ValidatorRemoved"] = handlers.HandleValidatorRemoved
return &Monitor{
cfg: cfg,
logger: logger,
@ -166,41 +183,16 @@ func (m *ContractMonitor) Start(ctx context.Context) {
lastFetchedBlock := m.logsCursor.LastFetchedBlock
go m.StartBlockFetcher(ctx, lastFetchedBlock+1)
go m.StartLogsProcessor(ctx)
m.LoadUnprocessedLogs(ctx, m.cfg.ReloadEvents, lastProcessedBlock+1, lastFetchedBlock)
m.LoadUnprocessedLogs(ctx, lastProcessedBlock+1, lastFetchedBlock)
go m.StartLogsFetcher(ctx)
}
func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, reloadEvents []string, fromBlock, toBlock uint) {
func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, fromBlock, toBlock uint) {
m.logger.WithFields(logrus.Fields{
"from_block": fromBlock,
"to_block": toBlock,
"manual_reload_count": len(reloadEvents),
"from_block": fromBlock,
"to_block": toBlock,
}).Info("loading fetched but not yet processed blocks")
for _, event := range reloadEvents {
topic := crypto.Keccak256Hash([]byte(event))
m.logger.WithFields(logrus.Fields{
"from_block": m.cfg.StartBlock,
"to_block": fromBlock,
"event": event,
"topic": topic,
}).Info("manually reloading events")
for {
logs, err := m.repo.Logs.FindByTopicAndBlockRange(ctx, m.client.ChainID, m.cfg.Address, m.cfg.StartBlock, fromBlock, topic)
if err != nil {
m.logger.WithError(err).Error("can't find manually reloaded logs in block range")
if utils.ContextSleep(ctx, 10*time.Second) == nil {
return
}
continue
}
m.submitLogs(logs, toBlock)
break
}
}
var logs []*entity.Log
for {
var err error
@ -237,6 +229,11 @@ func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) {
}
m.headBlockMetric.Set(float64(m.headBlock))
if len(m.cfg.RefetchEvents) > 0 {
m.RefetchEvents(start - 1)
m.cfg.RefetchEvents = nil
}
for start <= m.headBlock {
end := start + m.cfg.MaxBlockRangeSize - 1
if end > m.headBlock {
@ -260,6 +257,39 @@ func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) {
}
}
func (m *ContractMonitor) RefetchEvents(lastFetchedBlock uint) {
m.logger.Info("refetching old events")
for _, job := range m.cfg.RefetchEvents {
topic := crypto.Keccak256Hash([]byte(job.Event))
fromBlock := job.StartBlock
if fromBlock < m.cfg.StartBlock {
fromBlock = m.cfg.StartBlock
}
toBlock := job.EndBlock
if toBlock == 0 || toBlock > lastFetchedBlock {
toBlock = lastFetchedBlock
}
for fromBlock <= toBlock {
end := fromBlock + m.cfg.MaxBlockRangeSize - 1
if end > toBlock {
end = toBlock
}
m.logger.WithFields(logrus.Fields{
"from_block": fromBlock,
"to_block": toBlock,
}).Info("scheduling new block range logs search")
m.blocksRangeChan <- &BlocksRange{
From: fromBlock,
To: end,
Topic: &topic,
}
fromBlock = end + 1
}
}
}
func (m *ContractMonitor) StartLogsFetcher(ctx context.Context) {
m.logger.Info("starting logs fetcher")
for {
@ -289,7 +319,10 @@ func (m *ContractMonitor) tryToFetchLogs(ctx context.Context, blocksRange *Block
q := ethereum.FilterQuery{
FromBlock: big.NewInt(int64(blocksRange.From)),
ToBlock: big.NewInt(int64(blocksRange.To)),
Addresses: []common.Address{m.cfg.Address},
Addresses: []common.Address{m.cfg.Address, m.cfg.ValidatorContractAddress},
}
if blocksRange.Topic != nil {
q.Topics = [][]common.Hash{{*blocksRange.Topic}}
}
var logs []types.Log
var err error

View File

@ -2,11 +2,14 @@ package monitor
import (
"amb-monitor/entity"
"github.com/ethereum/go-ethereum/common"
)
type BlocksRange struct {
From uint
To uint
From uint
To uint
Topic *common.Hash
}
type LogsBatch struct {

View File

@ -1,11 +1,13 @@
package presenter
import (
"amb-monitor/config"
"amb-monitor/entity"
"amb-monitor/logging"
"amb-monitor/repository"
"context"
"encoding/json"
"fmt"
"net/http"
"strconv"
@ -15,16 +17,18 @@ import (
)
type Presenter struct {
logger logging.Logger
repo *repository.Repo
root chi.Router
logger logging.Logger
repo *repository.Repo
bridges map[string]*config.BridgeConfig
root chi.Router
}
func NewPresenter(logger logging.Logger, repo *repository.Repo) *Presenter {
func NewPresenter(logger logging.Logger, repo *repository.Repo, bridges map[string]*config.BridgeConfig) *Presenter {
return &Presenter{
logger: logger,
repo: repo,
root: chi.NewMux(),
logger: logger,
repo: repo,
bridges: bridges,
root: chi.NewMux(),
}
}
@ -35,6 +39,7 @@ func (p *Presenter) Serve(addr string) error {
p.root.Use(NewRequestLogger(p.logger))
p.root.Get("/tx/{txHash:0x[0-9a-fA-F]{64}}", p.wrapJSONHandler(p.SearchTx))
p.root.Get("/block/{chainID:[0-9]+}/{blockNumber:[0-9]+}", p.wrapJSONHandler(p.SearchBlock))
p.root.Get("/bridge/{bridgeID:[0-9a-zA-Z_\\-]+}/validators", p.wrapJSONHandler(p.SearchValidators))
return http.ListenAndServe(addr, p.root)
}
@ -84,6 +89,72 @@ func (p *Presenter) SearchBlock(ctx context.Context) (interface{}, error) {
return p.searchInLogs(ctx, logs), nil
}
func (p *Presenter) SearchValidators(ctx context.Context) (interface{}, error) {
bridgeID := chi.URLParamFromCtx(ctx, "bridgeID")
if p.bridges[bridgeID] == nil {
return nil, fmt.Errorf("bridge %q not found", bridgeID)
}
cfg := p.bridges[bridgeID]
res := ValidatorsResult{
BridgeID: bridgeID,
Home: &ValidatorSideResult{
ChainID: cfg.Home.Chain.ChainID,
},
Foreign: &ValidatorSideResult{
ChainID: cfg.Foreign.Chain.ChainID,
},
}
homeCursor, err := p.repo.LogsCursors.GetByChainIDAndAddress(ctx, res.Home.ChainID, cfg.Home.Address)
if err != nil {
p.logger.WithError(err).Error("failed to get home bridge cursor")
return nil, err
}
foreignCursor, err := p.repo.LogsCursors.GetByChainIDAndAddress(ctx, res.Foreign.ChainID, cfg.Foreign.Address)
if err != nil {
p.logger.WithError(err).Error("failed to get foreign bridge cursor")
return nil, err
}
res.Home.BlockNumber = homeCursor.LastProcessedBlock
res.Foreign.BlockNumber = foreignCursor.LastProcessedBlock
validators, err := p.repo.BridgeValidators.FindActiveValidators(ctx, bridgeID)
if err != nil {
p.logger.WithError(err).Error("failed to find validators for bridge id")
return nil, err
}
seenValidators := make(map[common.Address]bool, len(validators))
for _, val := range validators {
if seenValidators[val.Address] {
continue
}
seenValidators[val.Address] = true
confirmation, err := p.repo.SignedMessages.FindLatest(ctx, bridgeID, res.Home.ChainID, val.Address)
if err != nil {
p.logger.WithError(err).Error("failed to find latest validator confirmation")
return nil, err
}
valInfo := &ValidatorInfo{
Signer: val.Address,
}
if confirmation != nil {
valInfo.LastConfirmation, err = p.getTxInfo(ctx, confirmation.LogID)
if err != nil {
p.logger.WithError(err).Error("failed to get tx info")
return nil, err
}
}
res.Validators = append(res.Validators, valInfo)
}
return res, nil
}
func (p *Presenter) searchInLogs(ctx context.Context, logs []*entity.Log) []*SearchResult {
results := make([]*SearchResult, 0, len(logs))
for _, log := range logs {

View File

@ -48,3 +48,20 @@ type SearchResult struct {
Message interface{}
RelatedEvents []*EventInfo
}
type ValidatorInfo struct {
Signer common.Address
LastConfirmation *TxInfo
}
type ValidatorSideResult struct {
ChainID string
BlockNumber uint
}
type ValidatorsResult struct {
BridgeID string
Home *ValidatorSideResult
Foreign *ValidatorSideResult
Validators []*ValidatorInfo
}

View File

@ -0,0 +1,84 @@
package postgres
import (
"amb-monitor/db"
"amb-monitor/entity"
"context"
"database/sql"
"errors"
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/ethereum/go-ethereum/common"
)
type bridgeValidatorsRepo basePostgresRepo
func NewBridgeValidatorsRepo(table string, db *db.DB) entity.BridgeValidatorsRepo {
return (*bridgeValidatorsRepo)(newBasePostgresRepo(table, db))
}
func (r *bridgeValidatorsRepo) Ensure(ctx context.Context, val *entity.BridgeValidator) error {
q, args, err := sq.Insert(r.table).
Columns("log_id", "bridge_id", "chain_id", "address", "removed_log_id").
Values(val.LogID, val.BridgeID, val.ChainID, val.Address, val.RemovedLogID).
Suffix("ON CONFLICT (log_id) DO UPDATE SET updated_at = NOW(), removed_log_id = EXCLUDED.removed_log_id").
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return fmt.Errorf("can't build query: %w", err)
}
_, err = r.db.ExecContext(ctx, q, args...)
if err != nil {
return fmt.Errorf("can't ensure bridge validator: %w", err)
}
return nil
}
func (r *bridgeValidatorsRepo) FindActiveValidator(ctx context.Context, bridgeID, chainID string, address common.Address) (*entity.BridgeValidator, error) {
q, args, err := sq.Select("*").
From(r.table).
Where(sq.Eq{
"bridge_id": bridgeID,
"chain_id": chainID,
"address": address,
"removed_log_id": nil,
}).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return nil, fmt.Errorf("can't build query: %w", err)
}
val := new(entity.BridgeValidator)
err = r.db.GetContext(ctx, val, q, args...)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, fmt.Errorf("can't get bridge validator: %w", err)
}
return val, nil
}
func (r *bridgeValidatorsRepo) FindActiveValidators(ctx context.Context, bridgeID string) ([]*entity.BridgeValidator, error) {
q, args, err := sq.Select("*").
From(r.table).
Where(sq.Eq{
"bridge_id": bridgeID,
"removed_log_id": nil,
}).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return nil, fmt.Errorf("can't build query: %w", err)
}
vals := make([]*entity.BridgeValidator, 0, 10)
err = r.db.SelectContext(ctx, &vals, q, args...)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, fmt.Errorf("can't get bridge validator: %w", err)
}
return vals, nil
}

View File

@ -99,26 +99,6 @@ func (r *logsRepo) FindByBlockNumber(ctx context.Context, chainID string, block
return logs, nil
}
func (r *logsRepo) FindByTopicAndBlockRange(ctx context.Context, chainID string, addr common.Address, fromBlock uint, toBlock uint, topic common.Hash) ([]*entity.Log, error) {
q, args, err := sq.Select("*").
From(r.table).
Where(sq.Eq{"chain_id": chainID, "address": addr, "topic0": topic}).
Where(sq.LtOrEq{"block_number": toBlock}).
Where(sq.GtOrEq{"block_number": fromBlock}).
OrderBy("block_number", "log_index").
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return nil, fmt.Errorf("can't build query: %w", err)
}
logs := make([]*entity.Log, 0, 10)
err = r.db.SelectContext(ctx, &logs, q, args...)
if err != nil {
return nil, fmt.Errorf("can't get logs by block number: %w", err)
}
return logs, nil
}
func (r *logsRepo) FindByTxHash(ctx context.Context, txHash common.Hash) ([]*entity.Log, error) {
q, args, err := sq.Select("*").
From(r.table).

View File

@ -74,3 +74,26 @@ func (r *signedMessagesRepo) FindByMsgHash(ctx context.Context, bridgeID string,
}
return msgs, nil
}
func (r *signedMessagesRepo) FindLatest(ctx context.Context, bridgeID, chainID string, signer common.Address) (*entity.SignedMessage, error) {
q, args, err := sq.Select(r.table + ".*").
From(r.table).
Join("logs l ON l.id = log_id").
Where(sq.Eq{"bridge_id": bridgeID, "signer": signer, "l.chain_id": chainID}).
OrderBy("l.block_number DESC").
Limit(1).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return nil, fmt.Errorf("can't build query: %w", err)
}
msg := new(entity.SignedMessage)
err = r.db.GetContext(ctx, msg, q, args...)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, fmt.Errorf("can't get latest signed message: %w", err)
}
return msg, nil
}

View File

@ -19,6 +19,7 @@ type Repo struct {
SentInformationRequests entity.SentInformationRequestsRepo
SignedInformationRequests entity.SignedInformationRequestsRepo
ExecutedInformationRequests entity.ExecutedInformationRequestsRepo
BridgeValidators entity.BridgeValidatorsRepo
}
func NewRepo(db *db.DB) *Repo {
@ -35,5 +36,6 @@ func NewRepo(db *db.DB) *Repo {
SentInformationRequests: postgres.NewSentInformationRequestsRepo("sent_information_requests", db),
SignedInformationRequests: postgres.NewSignedInformationRequestsRepo("signed_information_requests", db),
ExecutedInformationRequests: postgres.NewExecutedInformationRequestsRepo("executed_information_requests", db),
BridgeValidators: postgres.NewBridgeValidatorsRepo("bridge_validators", db),
}
}