From 7ca7a910d53378f7e45a8237e0d384b740b8df56 Mon Sep 17 00:00:00 2001
From: Kirill Fedoseev
Date: Fri, 24 Sep 2021 21:38:25 +0300
Subject: [PATCH] Slack alerts

---
 .gitignore                           |   3 +-
 config.yml                           | 155 ++++++++++++++++-
 config/config.go                     |  22 ++-
 db/queries.sql                       |  28 ++++
 docker-compose.dev.yml               |  29 +++-
 ethclient/metrics.go                 |   7 +-
 logging/logger.go                    |   2 +-
 main.go                              |  13 +-
 monitor/alerts/alert_manager.go      |  85 ++++++++++
 monitor/alerts/db_alerts_provider.go | 238 +++++++++++++++++++++++++++
 monitor/alerts/job.go                |  69 ++++++++
 monitor/decoders.go                  |  38 +++--
 monitor/monitor.go                   | 120 ++++++++------
 prometheus/alertmanager.yml          |  91 ++++++++++
 prometheus/prometheus.yml            |  16 ++
 prometheus/rules.yml                 |  28 ++++
 prometheus/templates/explorer.tmpl   |  19 +++
 prometheus/templates/slack.tmpl      |  44 +++++
 prometheus/templates/utils.tmpl      |   9 +
 19 files changed, 937 insertions(+), 79 deletions(-)
 create mode 100644 db/queries.sql
 create mode 100644 monitor/alerts/alert_manager.go
 create mode 100644 monitor/alerts/db_alerts_provider.go
 create mode 100644 monitor/alerts/job.go
 create mode 100644 prometheus/alertmanager.yml
 create mode 100644 prometheus/prometheus.yml
 create mode 100644 prometheus/rules.yml
 create mode 100644 prometheus/templates/explorer.tmpl
 create mode 100644 prometheus/templates/slack.tmpl
 create mode 100644 prometheus/templates/utils.tmpl

diff --git a/.gitignore b/.gitignore
index 13d4ac5..5f1dd1f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 .env
-.idea/
\ No newline at end of file
+.idea/
+prometheus/slack_api_url.txt
\ No newline at end of file
diff --git a/config.yml b/config.yml
index 06bacbb..13a0976 100644
--- a/config.yml
+++ b/config.yml
@@ -1,12 +1,20 @@
 chains:
   mainnet:
     rpc:
-      host: https://mainnet.infura.io/v3/5d7bd94c50ed43fab1cb8e74f58678b0
+      host: https://mainnet.infura.io/v3/${INFURA_PROJECT_KEY}
       timeout: 30s
       rps: 10
     chain_id: 1
     block_time: 15s
     block_index_interval: 60s
+  bsc:
+    rpc:
+      host: https://bsc-dataseed1.defibit.io/
+      timeout: 20s
+      rps: 10
+    chain_id: 56
+    block_time: 3s
+    block_index_interval: 60s
   kovan:
     rpc:
       host: https://kovan.poa.network
       timeout: 20s
       rps: 10
     chain_id: 42
     block_time: 5s
     block_index_interval: 60s
   xdai:
     rpc:
-      host: https://dai.poa.network
+      host: https://dai.poa.network/oe-only
       timeout: 20s
       rps: 10
     chain_id: 100
     block_time: 5s
     block_index_interval: 30s
+  poa:
+    rpc:
+      host: https://core.poa.network
+      timeout: 20s
+      rps: 10
+    chain_id: 99
+    block_time: 5s
+    block_index_interval: 30s
   sokol:
     rpc:
       host: https://sokol.poa.network
       timeout: 20s
       rps: 10
     chain_id: 77
     block_time: 5s
     block_index_interval: 30s
+  rinkeby:
+    rpc:
+      host: https://rinkeby.infura.io/v3/${INFURA_PROJECT_KEY}
+      timeout: 20s
+      rps: 10
+    chain_id: 4
+    block_time: 15s
+    block_index_interval: 60s
 bridges:
   xdai-amb:
     home:
       chain: xdai
       address: 0x75Df5AF045d91108662D8080fD1FEFAd6aA0bb59
       start_block: 7408640
       required_block_confirmations: 12
       max_block_range_size: 2000
     foreign:
       chain: mainnet
       address: 0x4C36d2919e407f0Cc2Ee3c993ccF8ac26d9CE64e
       start_block: 9130277
       required_block_confirmations: 12
       max_block_range_size: 1000
+    alerts:
+      unknown_message_confirmation:
+        home_start_block: 7408640
+        foreign_start_block: 9130277
+      unknown_message_execution:
+        home_start_block: 7408640
+        foreign_start_block: 9130277
+      stuck_message_confirmation:
+        home_start_block: 7408640
+        foreign_start_block: 11907302
+      failed_message_execution:
+        home_start_block: 17518126
+        foreign_start_block: 12927769
   test-amb:
     home:
       chain: sokol
       start_block: 9849617
       required_block_confirmations: 12
       max_block_range_size: 10000
     foreign:
       chain: kovan
       start_block: 12372929
       required_block_confirmations: 12
       max_block_range_size: 10000
+    alerts:
+      stuck_message_confirmation:
+        home_start_block: 9849619
+        foreign_start_block: 25437689
+      unknown_message_confirmation:
+        home_start_block: 10666885
+        foreign_start_block: 12372929
+      unknown_message_execution:
+        home_start_block: 9849619
+        foreign_start_block: 13603170
+  bsc-xdai-amb:
+    home:
+      chain: xdai
+      address: 0x162E898bD0aacB578C8D5F8d6ca588c13d2A383F
+      start_block: 14496725
+      required_block_confirmations: 12
+      max_block_range_size: 10000
+    foreign:
+      chain: bsc
+      address: 0x05185872898b6f94AA600177EF41B9334B1FA48B
+      start_block: 4792263
+      required_block_confirmations: 12
+      max_block_range_size: 1000
+    alerts:
+      stuck_message_confirmation:
+        home_start_block: 14496725
+        foreign_start_block: 4792263
+      failed_message_execution:
+        home_start_block: 14496725
+        foreign_start_block: 4792263
+      unknown_message_confirmation:
+        home_start_block: 14496725
+        foreign_start_block: 4792263
+      unknown_message_execution:
+        home_start_block: 14496725
+        foreign_start_block: 4792263
+  rinkeby-xdai-amb:
+    home:
+      chain: xdai
+      address: 0xc38D4991c951fE8BCE1a12bEef2046eF36b0FA4A
+      start_block: 10030211
+      required_block_confirmations: 12
+      max_block_range_size: 20000
+    foreign:
+      chain: rinkeby
+      address: 0xD4075FB57fCf038bFc702c915Ef9592534bED5c1
+      start_block: 6529875
+      required_block_confirmations: 12
+      max_block_range_size: 20000
+    alerts:
+      stuck_message_confirmation:
+        home_start_block: 10030211
+        foreign_start_block: 8360609
+      unknown_message_confirmation:
+        home_start_block: 10030211
+        foreign_start_block: 6529875
+      unknown_message_execution:
+        home_start_block: 10030211
+        foreign_start_block: 6529875
+  poa-xdai-amb:
+    home:
+      chain: xdai
+      address: 0xc2d77d118326c33BBe36EbeAbf4F7ED6BC2dda5c
+      start_block: 17976497
+      required_block_confirmations: 12
+      max_block_range_size: 10000
+    foreign:
+      chain: poa
+      address: 0xB2218bdEbe8e90f80D04286772B0968ead666942
+      start_block: 23152190
+      required_block_confirmations: 12
+      max_block_range_size: 10000
+    alerts:
+      stuck_message_confirmation:
+        home_start_block: 17976497
+        foreign_start_block: 23152190
+      failed_message_execution:
+        home_start_block: 17976497
+        foreign_start_block: 23152190
+      unknown_message_confirmation:
+        home_start_block: 17976497
+        foreign_start_block: 23152190
+      unknown_message_execution:
+        home_start_block: 17976497
+        foreign_start_block: 23152190
+  eth-bsc-amb:
+    home:
+      chain: bsc
+      address: 0x6943A218d58135793F1FE619414eD476C37ad65a
+      start_block: 2756521
+      required_block_confirmations: 12
+      max_block_range_size: 1000
+    foreign:
+      chain: mainnet
+      address: 0x07955be2967B655Cf52751fCE7ccC8c61EA594e2
+      start_block: 11375619
+      required_block_confirmations: 12
+      max_block_range_size: 10000
+    alerts:
+      stuck_message_confirmation:
+        home_start_block: 2756521
+        foreign_start_block: 11375619
+      failed_message_execution:
+        home_start_block: 2756521
+        foreign_start_block: 11375619
+      unknown_message_confirmation:
+        home_start_block: 2756521
+        foreign_start_block: 11375619
+      unknown_message_execution:
+        home_start_block: 2756521
+        foreign_start_block: 11375619
 postgres:
   user: postgres
   password: pass
-  host: localhost
+  host: postgres
   port: 5432
   database: db
+log_level: info
diff --git a/config/config.go b/config/config.go
index b550195..cf1d8d7 100644
--- a/config/config.go
+++ b/config/config.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"github.com/ethereum/go-ethereum/common"
+	"github.com/sirupsen/logrus"
 	"gopkg.in/yaml.v3"
 )
 
@@ -32,10 +33,16 @@ type BridgeSideConfig struct {
 	MaxBlockRangeSize uint `yaml:"max_block_range_size"`
 }
 
+type BridgeAlertConfig struct {
+	HomeStartBlock    uint `yaml:"home_start_block"`
+	ForeignStartBlock uint `yaml:"foreign_start_block"`
+}
+
 type BridgeConfig struct {
-	ID      string            `yaml:"-"`
-	Home    *BridgeSideConfig `yaml:"home"`
-	Foreign *BridgeSideConfig `yaml:"foreign"`
+	ID      string                        `yaml:"-"`
+	Home    *BridgeSideConfig             `yaml:"home"`
+	Foreign *BridgeSideConfig             `yaml:"foreign"`
+	Alerts  map[string]*BridgeAlertConfig `yaml:"alerts"`
 }
 
 type DBConfig struct {
@@ -47,9 +54,12 @@ type DBConfig struct {
 }
 
 type Config struct {
-	Chains   map[string]*ChainConfig  `yaml:"chains"`
-	Bridges  map[string]*BridgeConfig `yaml:"bridges"`
-	DBConfig *DBConfig                `yaml:"postgres"`
+	Chains          map[string]*ChainConfig  `yaml:"chains"`
+	Bridges         map[string]*BridgeConfig `yaml:"bridges"`
+	DBConfig        *DBConfig                `yaml:"postgres"`
+	LogLevel        logrus.Level             `yaml:"log_level"`
+	DisabledBridges []string                 `yaml:"disabled_bridges"`
+	EnabledBridges  []string                 `yaml:"enabled_bridges"`
 }
 
 func readYamlConfig(cfg *Config) error {
diff --git a/db/queries.sql b/db/queries.sql
new file mode 100644
index 0000000..5f4eba4
--- /dev/null
+++ b/db/queries.sql
@@ -0,0 +1,28 @@
+-- message stats
+
+-- total
+SELECT count(*)
+FROM messages
+WHERE bridge_id = '?'
+  AND direction = 'home_to_foreign';
+SELECT count(*)
+FROM messages
+WHERE bridge_id = '?'
+  AND direction = 'foreign_to_home';
+
+-- foreign to home pending
+SELECT m.*
+FROM messages m
+         LEFT JOIN executed_messages em ON m.id = em.message_id AND m.bridge_id = em.bridge_id
+WHERE m.bridge_id = '?'
+  AND m.direction = 'foreign_to_home'
+  AND em.log_id IS NULL;
+
+-- home to foreign oracle-driven pending
+SELECT m.*
+FROM messages m
+         LEFT JOIN executed_messages em ON m.id = em.message_id AND m.bridge_id = em.bridge_id
+WHERE m.bridge_id = '?'
+  AND m.direction = 'home_to_foreign'
+  AND m.data_type = 0
+  AND em.log_id IS NULL;
\ No newline at end of file
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 7c2078c..d14780d 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -8,7 +8,7 @@ services:
       POSTGRES_DB: db
       POSTGRES_PASSWORD: pass
     ports:
-      - 5432:5432
+      - "5432:5432"
     shm_size: 256mb
   monitor:
     build: .
@@ -17,8 +17,29 @@ services:
     volumes:
       - ./config.yml:/app/config.yml
   grafana:
-    image: grafana/grafana:latest
+    image: grafana/grafana:8.1.5
+    volumes:
+      - grafana-storage:/var/lib/grafana
     ports:
-      - 3000:3000
+      - "3000:3000"
+  prometheus:
+    image: prom/prometheus:v2.30.0
+    volumes:
+      - ./prometheus:/etc/prometheus
+      - prom-storage:/prometheus
+    command: ["--config.file=/etc/prometheus/prometheus.yml", "--web.enable-lifecycle"]
+    ports:
+      - "9090:9090"
+  alertmanager:
+    image: prom/alertmanager:v0.23.0
+    command: ["--config.file=/etc/prometheus/alertmanager.yml", "--storage.path=/alertmanager", "--web.external-url=http://localhost:9093"]
+    volumes:
+      - ./prometheus:/etc/prometheus
+      - alertmanager-storage:/alertmanager
+    ports:
+      - "9093:9093"
 volumes:
-  db:
\ No newline at end of file
+  db:
+  grafana-storage:
+  prom-storage:
+  alertmanager-storage:
\ No newline at end of file
diff --git a/ethclient/metrics.go b/ethclient/metrics.go
index f5cbd5e..63a4f5f 100644
--- a/ethclient/metrics.go
+++ b/ethclient/metrics.go
@@ -3,7 +3,7 @@ package ethclient
 import (
 	"context"
 	"errors"
-	"strconv"
+	"fmt"
 	"time"
 
 	"github.com/ethereum/go-ethereum/rpc"
@@ -31,10 +31,7 @@ func ObserveError(url, query string, err error) {
 	if errors.Is(err, context.DeadlineExceeded) {
 		RequestResults.WithLabelValues(url, query, "timeout").Inc()
 	} else if err, ok := err.(rpc.Error); ok {
-		RequestResults.MustCurryWith(prometheus.Labels{
-			"code":    strconv.Itoa(err.ErrorCode()),
-			"message": err.Error(),
-		}).WithLabelValues(url, query, "error").Inc()
+		RequestResults.WithLabelValues(url, query, fmt.Sprintf("error-%d-%s", err.ErrorCode(), err.Error())).Inc()
 	} else {
 		RequestResults.WithLabelValues(url, query, "error").Inc()
 	}
diff --git a/logging/logger.go b/logging/logger.go
index c474f09..796b3ad 100644
--- a/logging/logger.go
+++ b/logging/logger.go
@@ -8,7 +8,7 @@
 
 type Logger logrus.FieldLogger
 
-func New() Logger {
+func New() *logrus.Logger {
 	logger := logrus.New()
 	logger.SetLevel(logrus.InfoLevel)
 	logger.SetFormatter(&logrus.TextFormatter{
diff --git a/main.go b/main.go
index 62cc33b..ece3734 100644
--- a/main.go
+++ b/main.go
@@ -21,6 +21,7 @@ func main() {
 	if err != nil {
 		logger.WithError(err).Fatal("can't read config")
 	}
+	logger.SetLevel(cfg.LogLevel)
 
 	dbConn, err := db.NewDB(cfg.DBConfig)
 	if err != nil {
@@ -44,8 +45,18 @@ func main() {
 	monitors := make([]*monitor.Monitor, 0, len(cfg.Bridges))
 	ctx, cancel := context.WithCancel(context.Background())
 
+	for _, bridge := range cfg.DisabledBridges {
+		delete(cfg.Bridges, bridge)
+	}
+	if cfg.EnabledBridges != nil {
+		newBridgeCfg := make(map[string]*config.BridgeConfig, len(cfg.EnabledBridges))
+		for _, bridge := range cfg.EnabledBridges {
+			newBridgeCfg[bridge] = cfg.Bridges[bridge]
+		}
+		cfg.Bridges = newBridgeCfg
+	}
 	for _, bridgeCfg := range cfg.Bridges {
-		m, err2 := monitor.NewMonitor(ctx, logger.WithField("bridge_id", bridgeCfg.ID), repo, bridgeCfg)
+		m, err2 := monitor.NewMonitor(ctx, logger.WithField("bridge_id", bridgeCfg.ID), dbConn, repo, bridgeCfg)
 		if err2 != nil {
 			logger.WithError(err2).Fatal("can't initialize bridge monitor")
 		}
diff --git a/monitor/alerts/alert_manager.go b/monitor/alerts/alert_manager.go
new file mode 100644
index 0000000..8cb13a0
--- /dev/null
+++ b/monitor/alerts/alert_manager.go
@@ -0,0 +1,85 @@
+package alerts
+
+import (
+	"amb-monitor/config"
+	"amb-monitor/db"
+	"amb-monitor/logging"
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type AlertManager struct {
+	logger logging.Logger
+	jobs   map[string]*Job
+}
+
+func NewAlertManager(logger logging.Logger, db *db.DB, cfg *config.BridgeConfig) (*AlertManager, error) {
+	provider := NewDBAlertsProvider(db)
+	jobs := make(map[string]*Job, len(cfg.Alerts))
+
+	for name, alertCfg := range cfg.Alerts {
+		switch name {
+		case "unknown_message_confirmation":
+			jobs[name] = &Job{
+				Interval: time.Minute,
+				Timeout:  time.Second * 10,
+				Func:     provider.FindUnknownConfirmations,
+				Labels:   []string{"chain_id", "block_number", "tx_hash", "signer", "msg_hash"},
+			}
+		case "unknown_message_execution":
+			jobs[name] = &Job{
+				Interval: time.Minute,
+				Timeout:  time.Second * 10,
+				Func:     provider.FindUnknownExecutions,
+				Labels:   []string{"chain_id", "block_number", "tx_hash", "message_id"},
+			}
+		case "stuck_message_confirmation":
+			jobs[name] = &Job{
+				Interval: time.Minute * 2,
+				Timeout:  time.Second * 20,
+				Func:     provider.FindStuckMessages,
+				Labels:   []string{"chain_id", "block_number", "tx_hash", "msg_hash", "count"},
+			}
+		case "failed_message_execution":
+			jobs[name] = &Job{
+				Interval: time.Minute * 5,
+				Timeout:  time.Second * 20,
+				Func:     provider.FindFailedExecutions,
+				Labels:   []string{"chain_id", "block_number", "tx_hash", "sender", "executor"},
+			}
+		default:
+			return nil, fmt.Errorf("unknown alert type %q", name)
+		}
+		jobs[name].Metric = promauto.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: "alert",
+			Subsystem: "monitor",
+			Name:      name,
+			ConstLabels: map[string]string{
+				"bridge": cfg.ID,
+			},
+		}, jobs[name].Labels)
+		jobs[name].Params = &AlertJobParams{
+			Bridge:                  cfg.ID,
+			HomeChainID:             cfg.Home.Chain.ChainID,
+			HomeStartBlockNumber:    alertCfg.HomeStartBlock,
+			ForeignChainID:          cfg.Foreign.Chain.ChainID,
+			ForeignStartBlockNumber: alertCfg.ForeignStartBlock,
+		}
+	}
+
+	return &AlertManager{
+		logger: logger,
+		jobs:   jobs,
+	}, nil
+}
+
+func (m *AlertManager) Start(ctx context.Context) {
+	for name, job := range m.jobs {
+		job.logger = m.logger.WithField("alert_job", name)
+		go job.Start(ctx)
+	}
+}
diff --git a/monitor/alerts/db_alerts_provider.go b/monitor/alerts/db_alerts_provider.go
new file mode 100644
index 0000000..6051f8c
--- /dev/null
+++ b/monitor/alerts/db_alerts_provider.go
@@ -0,0 +1,238 @@
+package alerts
+
+import (
+	"amb-monitor/db"
+	"context"
+	"fmt"
+	"strconv"
+	"time"
+
+	sq "github.com/Masterminds/squirrel"
+	"github.com/ethereum/go-ethereum/common"
+)
+
+type DBAlertsProvider struct {
+	db *db.DB
+}
+
+func NewDBAlertsProvider(db *db.DB) *DBAlertsProvider {
+	return &DBAlertsProvider{
+		db: db,
+	}
+}
+
+type UnknownConfirmation struct {
+	ChainID         string         `db:"chain_id"`
+	BlockNumber     uint64         `db:"block_number"`
+	TransactionHash common.Hash    `db:"transaction_hash"`
+	Signer          common.Address `db:"signer"`
+	MsgHash         common.Hash    `db:"msg_hash"`
+}
+
+func (c *UnknownConfirmation) AlertValues() AlertValues {
+	return AlertValues{
+		Labels: map[string]string{
+			"chain_id":     c.ChainID,
+			"block_number": strconv.FormatUint(c.BlockNumber, 10),
+			"tx_hash":      c.TransactionHash.String(),
+			"signer":       c.Signer.String(),
+			"msg_hash":     c.MsgHash.String(),
+		},
+		Value: 1,
+	}
+}
+
+func (p *DBAlertsProvider) FindUnknownConfirmations(ctx context.Context, params *AlertJobParams) ([]AlertValues, error) {
+	q, args, err := sq.Select("l.chain_id", "l.block_number", "l.transaction_hash", "sm.signer", "sm.msg_hash").
+		From("signed_messages sm").
+		Join("logs l ON l.id = sm.log_id").
+		LeftJoin("messages m ON sm.bridge_id = m.bridge_id AND m.msg_hash = sm.msg_hash").
+		Where(sq.Eq{"m.id": nil, "sm.bridge_id": params.Bridge, "l.chain_id": params.HomeChainID}).
+		Where(sq.GtOrEq{"l.block_number": params.HomeStartBlockNumber}).
+		PlaceholderFormat(sq.Dollar).
+		ToSql()
+	if err != nil {
+		return nil, fmt.Errorf("can't build query: %w", err)
+	}
+	res := make([]UnknownConfirmation, 0, 5)
+	err = p.db.SelectContext(ctx, &res, q, args...)
+	if err != nil {
+		return nil, fmt.Errorf("can't select alerts: %w", err)
+	}
+	alerts := make([]AlertValues, len(res))
+	for i := range res {
+		alerts[i] = res[i].AlertValues()
+	}
+	return alerts, nil
+}
+
+type UnknownExecution struct {
+	ChainID         string      `db:"chain_id"`
+	BlockNumber     uint64      `db:"block_number"`
+	TransactionHash common.Hash `db:"transaction_hash"`
+	MessageID       common.Hash `db:"message_id"`
+}
+
+func (c *UnknownExecution) AlertValues() AlertValues {
+	return AlertValues{
+		Labels: map[string]string{
+			"chain_id":     c.ChainID,
+			"block_number": strconv.FormatUint(c.BlockNumber, 10),
+			"tx_hash":      c.TransactionHash.String(),
+			"message_id":   c.MessageID.String(),
+		},
+		Value: 1,
+	}
+}
+
+func (p *DBAlertsProvider) FindUnknownExecutions(ctx context.Context, params *AlertJobParams) ([]AlertValues, error) {
+	q, args, err := sq.Select("l.chain_id", "l.block_number", "l.transaction_hash", "em.message_id").
+		From("executed_messages em").
+		Join("logs l ON l.id = em.log_id").
+		LeftJoin("messages m ON em.bridge_id = m.bridge_id AND em.message_id = m.message_id").
+		Where(sq.Eq{"m.id": nil, "em.bridge_id": params.Bridge}).
+		Where(sq.Or{
+			sq.And{
+				sq.Eq{"l.chain_id": params.HomeChainID},
+				sq.GtOrEq{"l.block_number": params.HomeStartBlockNumber},
+			},
+			sq.And{
+				sq.Eq{"l.chain_id": params.ForeignChainID},
+				sq.GtOrEq{"l.block_number": params.ForeignStartBlockNumber},
+			},
+		}).
+		PlaceholderFormat(sq.Dollar).
+		ToSql()
+	if err != nil {
+		return nil, fmt.Errorf("can't build query: %w", err)
+	}
+	res := make([]UnknownExecution, 0, 5)
+	err = p.db.SelectContext(ctx, &res, q, args...)
+	if err != nil {
+		return nil, fmt.Errorf("can't select alerts: %w", err)
+	}
+	alerts := make([]AlertValues, len(res))
+	for i := range res {
+		alerts[i] = res[i].AlertValues()
+	}
+	return alerts, nil
+}
+
+type StuckMessage struct {
+	ChainID         string        `db:"chain_id"`
+	BlockNumber     uint64        `db:"block_number"`
+	TransactionHash common.Hash   `db:"transaction_hash"`
+	MsgHash         common.Hash   `db:"msg_hash"`
+	Count           uint64        `db:"count"`
+	WaitTime        time.Duration `db:"wait_time"`
+}
+
+func (c *StuckMessage) AlertValues() AlertValues {
+	return AlertValues{
+		Labels: map[string]string{
+			"chain_id":     c.ChainID,
+			"block_number": strconv.FormatUint(c.BlockNumber, 10),
+			"tx_hash":      c.TransactionHash.String(),
+			"msg_hash":     c.MsgHash.String(),
+			"count":        strconv.FormatUint(c.Count, 10),
+		},
+		Value: float64(c.WaitTime),
+	}
+}
+
+func (p *DBAlertsProvider) FindStuckMessages(ctx context.Context, params *AlertJobParams) ([]AlertValues, error) {
+	query := `
+		SELECT l.chain_id,
+		       l.block_number,
+		       l.transaction_hash,
+		       sm.msg_hash,
+		       count(s.log_id) as count,
+		       EXTRACT(EPOCH FROM now() - ts.timestamp)::int as wait_time
+		FROM sent_messages sm
+		         JOIN logs l on l.id = sm.log_id
+		         JOIN block_timestamps ts on ts.chain_id = l.chain_id AND ts.block_number = l.block_number
+		         JOIN messages m on sm.bridge_id = m.bridge_id AND m.msg_hash = sm.msg_hash AND data_type = 0
+		         LEFT JOIN signed_messages s on s.bridge_id = m.bridge_id AND m.msg_hash = s.msg_hash
+		         LEFT JOIN collected_messages cm on m.bridge_id = cm.bridge_id AND cm.msg_hash = m.msg_hash
+		WHERE m.direction::text='home_to_foreign' AND cm.log_id IS NULL AND sm.bridge_id = $1 AND l.block_number >= $2 GROUP BY sm.log_id, l.id, ts.timestamp
+		UNION
+		SELECT l.chain_id,
+		       l.block_number,
+		       l.transaction_hash,
+		       sm.msg_hash,
+		       count(s.log_id) as count,
+		       EXTRACT(EPOCH FROM now() - ts.timestamp)::int as wait_time
+		FROM sent_messages sm
+		         JOIN logs l on l.id = sm.log_id
+		         JOIN block_timestamps ts on ts.chain_id = l.chain_id AND ts.block_number = l.block_number
+		         JOIN messages m on sm.bridge_id = m.bridge_id AND m.msg_hash = sm.msg_hash AND data_type = 0
+		         LEFT JOIN signed_messages s on s.bridge_id = m.bridge_id AND m.msg_hash = s.msg_hash
+		         LEFT JOIN executed_messages em on m.bridge_id = em.bridge_id AND em.message_id = m.message_id
+		WHERE m.direction::text='foreign_to_home' AND em.log_id IS NULL AND sm.bridge_id = $1 AND l.block_number >= $3 GROUP BY sm.log_id, l.id, ts.timestamp`
+	res := make([]StuckMessage, 0, 5)
+	err := p.db.SelectContext(ctx, &res, query, params.Bridge, params.HomeStartBlockNumber, params.ForeignStartBlockNumber)
+	if err != nil {
+		return nil, fmt.Errorf("can't select alerts: %w", err)
+	}
+	alerts := make([]AlertValues, len(res))
+	for i := range res {
+		alerts[i] = res[i].AlertValues()
+	}
+	return alerts, nil
+}
+
+type FailedExecution struct {
+	ChainID         string         `db:"chain_id"`
+	BlockNumber     uint64         `db:"block_number"`
+	TransactionHash common.Hash    `db:"transaction_hash"`
+	Sender          common.Address `db:"sender"`
+	Executor        common.Address `db:"executor"`
+}
+
+func (c *FailedExecution) AlertValues() AlertValues {
+	return AlertValues{
+		Labels: map[string]string{
+			"chain_id":     c.ChainID,
+			"block_number": strconv.FormatUint(c.BlockNumber, 10),
+			"tx_hash":      c.TransactionHash.String(),
+			"sender":       c.Sender.String(),
+			"executor":     c.Executor.String(),
+		},
+		Value: 1,
+	}
+}
+
+func (p *DBAlertsProvider) FindFailedExecutions(ctx context.Context, params *AlertJobParams) ([]AlertValues, error) {
+	q, args, err := sq.Select("l.chain_id", "l.block_number", "l.transaction_hash", "m.sender", "m.executor").
+		From("sent_messages sm").
+		Join("messages m on sm.bridge_id = m.bridge_id AND m.msg_hash = sm.msg_hash").
+		Join("executed_messages em on m.bridge_id = em.bridge_id AND em.message_id = m.message_id").
+		Join("logs l ON l.id = em.log_id").
+		Join("block_timestamps bt on bt.chain_id = l.chain_id AND bt.block_number = l.block_number").
+		Where(sq.Eq{"em.status": false, "m.data_type": 0, "em.bridge_id": params.Bridge}).
+		Where(sq.Or{
+			sq.And{
+				sq.Eq{"l.chain_id": params.HomeChainID},
+				sq.GtOrEq{"l.block_number": params.HomeStartBlockNumber},
+			},
+			sq.And{
+				sq.Eq{"l.chain_id": params.ForeignChainID},
+				sq.GtOrEq{"l.block_number": params.ForeignStartBlockNumber},
+			},
+		}).
+		PlaceholderFormat(sq.Dollar).
+		ToSql()
+	if err != nil {
+		return nil, fmt.Errorf("can't build query: %w", err)
+	}
+	res := make([]FailedExecution, 0, 5)
+	err = p.db.SelectContext(ctx, &res, q, args...)
+	if err != nil {
+		return nil, fmt.Errorf("can't select alerts: %w", err)
+	}
+	alerts := make([]AlertValues, len(res))
+	for i := range res {
+		alerts[i] = res[i].AlertValues()
+	}
+	return alerts, nil
+}
diff --git a/monitor/alerts/job.go b/monitor/alerts/job.go
new file mode 100644
index 0000000..fc66a85
--- /dev/null
+++ b/monitor/alerts/job.go
@@ -0,0 +1,69 @@
+package alerts
+
+import (
+	"amb-monitor/logging"
+	"context"
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/sirupsen/logrus"
+)
+
+type AlertValues struct {
+	Labels map[string]string
+	Value  float64
+}
+
+type AlertJobParams struct {
+	Bridge                  string
+	HomeChainID             string
+	HomeStartBlockNumber    uint
+	ForeignChainID          string
+	ForeignStartBlockNumber uint
+}
+
+type Job struct {
+	logger   logging.Logger
+	Metric   *prometheus.GaugeVec
+	Interval time.Duration
+	Timeout  time.Duration
+	Labels   []string
+	Func     func(ctx context.Context, params *AlertJobParams) ([]AlertValues, error)
+	Params   *AlertJobParams
+}
+
+func (j *Job) Start(ctx context.Context) {
+	ticker := time.NewTicker(j.Interval)
+	for {
+		timeoutCtx, cancel := context.WithTimeout(ctx, j.Timeout)
+		start := time.Now()
+		values, err := j.Func(timeoutCtx, j.Params)
+		cancel()
+		if err != nil {
+			j.logger.WithError(err).Error("failed to process alert job")
+		} else {
+			// TODO scrape only active up-to-date bridges
+			j.Metric.Reset()
+
+			if len(values) > 0 {
+				j.logger.WithFields(logrus.Fields{
+					"count":    len(values),
+					"duration": time.Since(start),
+				}).Warn("found some possible alerts")
+				for _, v := range values {
+					j.Metric.With(v.Labels).Set(v.Value)
+				}
+			} else {
+				j.logger.WithField("duration", time.Since(start)).Info("no alerts have been found")
+			}
+		}
+
+		select {
+		case <-ticker.C:
+			continue
+		case <-ctx.Done():
+			ticker.Stop()
+			return
+		}
+	}
+}
diff --git a/monitor/decoders.go b/monitor/decoders.go
index 9bf24c6..d39b3f7 100644
--- a/monitor/decoders.go
+++ b/monitor/decoders.go
@@ -2,6 +2,7 @@ package monitor
 
 import (
 	"amb-monitor/entity"
+	"bytes"
 	"encoding/binary"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -9,17 +10,34 @@ import (
 )
 
 func unmarshalMessage(bridgeID string, direction entity.Direction, encodedData []byte) *entity.Message {
-	return &entity.Message{
-		BridgeID:  bridgeID,
-		Direction: direction,
-		MsgHash:   crypto.Keccak256Hash(encodedData),
-		MessageID: common.BytesToHash(encodedData[:32]),
-		Sender:    common.BytesToAddress(encodedData[32:52]),
-		Executor:  common.BytesToAddress(encodedData[52:72]),
-		GasLimit:  uint(binary.BigEndian.Uint32(encodedData[72:76])),
-		DataType:  uint(encodedData[78]),
-		Data:      encodedData[79+encodedData[76]+encodedData[77]:],
+	messageID := common.BytesToHash(encodedData[:32])
+	if bytes.Equal(messageID[0:4], []byte{0, 4, 0, 0}) {
+		return &entity.Message{
+			BridgeID:  bridgeID,
+			Direction: direction,
+			MsgHash:   crypto.Keccak256Hash(encodedData),
+			MessageID: messageID,
+			Sender:    common.BytesToAddress(encodedData[64:84]),
+			Executor:  common.BytesToAddress(encodedData[84:104]),
+			GasLimit:  uint(binary.BigEndian.Uint32(encodedData[104:108])),
+			DataType:  uint(encodedData[108]),
+			Data:      encodedData[108:],
+		}
 	}
+	if bytes.Equal(messageID[0:4], []byte{0, 5, 0, 0}) {
+		return &entity.Message{
+			BridgeID:  bridgeID,
+			Direction: direction,
+			MsgHash:   crypto.Keccak256Hash(encodedData),
+			MessageID: messageID,
+			Sender:    common.BytesToAddress(encodedData[32:52]),
+			Executor:  common.BytesToAddress(encodedData[52:72]),
+			GasLimit:  uint(binary.BigEndian.Uint32(encodedData[72:76])),
+			DataType:  uint(encodedData[78]),
+			Data:      encodedData[79+encodedData[76]+encodedData[77]:],
+		}
+	}
+	panic("unsupported message version prefix")
 }
 
 func unmarshalLegacyMessage(bridgeID string, direction entity.Direction, encodedData []byte) *entity.Message {
diff --git a/monitor/monitor.go b/monitor/monitor.go
index b246a2f..d4ca693 100644
--- a/monitor/monitor.go
+++ b/monitor/monitor.go
@@ -4,9 +4,11 @@ import (
 	"amb-monitor/config"
 	"amb-monitor/contract"
 	"amb-monitor/contract/constants"
+	"amb-monitor/db"
 	"amb-monitor/entity"
 	"amb-monitor/ethclient"
 	"amb-monitor/logging"
+	"amb-monitor/monitor/alerts"
 	"amb-monitor/repository"
 	"context"
 	"database/sql"
@@ -42,6 +44,7 @@ type Monitor struct {
 	repo           *repository.Repo
 	homeMonitor    *ContractMonitor
 	foreignMonitor *ContractMonitor
+	alertManager   *alerts.AlertManager
 }
 
 func newContractMonitor(ctx context.Context, logger logging.Logger, repo *repository.Repo, cfg *config.BridgeSideConfig) (*ContractMonitor, error) {
@@ -80,7 +83,7 @@ func newContractMonitor(ctx context.Context, logger logging.Logger, repo *reposi
 	}, nil
 }
 
-func NewMonitor(ctx context.Context, logger logging.Logger, repo *repository.Repo, cfg *config.BridgeConfig) (*Monitor, error) {
+func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo *repository.Repo, cfg *config.BridgeConfig) (*Monitor, error) {
 	homeMonitor, err := newContractMonitor(ctx, logger.WithField("contract", "home"), repo, cfg.Home)
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize home side monitor: %w", err)
@@ -89,6 +92,10 @@ func NewMonitor(ctx context.Context, logger logging.Logger, repo *repository.Rep
 	if err != nil {
 		return nil, fmt.Errorf("failed to initialize foreign side monitor: %w", err)
 	}
+	alertManager, err := alerts.NewAlertManager(logger, dbConn, cfg)
+	if err != nil {
+		return nil, fmt.Errorf("failed to initialize alert manager: %w", err)
+	}
 	handlers := NewBridgeEventHandler(repo, cfg.ID)
 	homeMonitor.eventHandlers["UserRequestForSignature"] = handlers.HandleUserRequestForSignature
 	homeMonitor.eventHandlers["UserRequestForSignature0"] = handlers.HandleLegacyUserRequestForSignature
@@ -107,6 +114,7 @@ func NewMonitor(ctx context.Context, logger logging.Logger, repo *repository.Rep
 		repo:           repo,
 		homeMonitor:    homeMonitor,
 		foreignMonitor: foreignMonitor,
+		alertManager:   alertManager,
 	}, nil
 }
 
@@ -114,6 +122,7 @@ func (m *Monitor) Start(ctx context.Context) {
 	m.logger.Info("starting bridge monitor")
 	go m.homeMonitor.Start(ctx)
 	go m.foreignMonitor.Start(ctx)
+	go m.alertManager.Start(ctx)
 }
 
 func (m *ContractMonitor) Start(ctx context.Context) {
@@ -143,7 +152,7 @@ func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, fromBlock, to
 		break
 	}
 
-	m.submitLogs(logs)
+	m.submitLogs(logs, toBlock)
 }
 
 func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) {
@@ -201,13 +210,14 @@ func (m *ContractMonitor) StartLogsFetcher(ctx context.Context) {
 		case blocksRange := <-m.blocksRangeChan:
 			for {
 				err := m.tryToFetchLogs(ctx, blocksRange)
-				if err == nil {
-					break
+				if err != nil {
+					m.logger.WithError(err).WithFields(logrus.Fields{
+						"from_block": blocksRange.From,
+						"to_block":   blocksRange.To,
+					}).Error("failed logs fetching, retrying")
+					continue
 				}
-				m.logger.WithError(err).WithFields(logrus.Fields{
-					"from_block": blocksRange.From,
-					"to_block":   blocksRange.To,
-				}).Error("failed logs fetching, retrying")
+				break
 			}
 		}
 	}
@@ -227,42 +237,40 @@ func (m *ContractMonitor) tryToFetchLogs(ctx context.Context, blocksRange *Block
 		"from_block": blocksRange.From,
 		"to_block":   blocksRange.To,
 	}).Info("fetched logs in range")
-	if len(logs) == 0 {
-		return nil
-	}
-	sort.Slice(logs, func(i, j int) bool {
-		a, b := &logs[i], &logs[j]
-		return a.BlockNumber < b.BlockNumber || (a.BlockNumber == b.BlockNumber && a.Index < b.Index)
-	})
 	entities := make([]*entity.Log, len(logs))
-	for i, log := range logs {
-		entities[i] = m.logToEntity(log)
-	}
-	err = m.repo.Logs.Ensure(ctx, entities...)
-	if err != nil {
-		return err
-	}
+	if len(logs) > 0 {
+		sort.Slice(logs, func(i, j int) bool {
+			a, b := &logs[i], &logs[j]
+			return a.BlockNumber < b.BlockNumber || (a.BlockNumber == b.BlockNumber && a.Index < b.Index)
+		})
+		for i, log := range logs {
+			entities[i] = m.logToEntity(log)
+		}
+		err = m.repo.Logs.Ensure(ctx, entities...)
+		if err != nil {
+			return err
+		}
 
-	indexes := make([]uint, len(entities))
-	for i, x := range entities {
-		indexes[i] = x.ID
+		indexes := make([]uint, len(entities))
+		for i, x := range entities {
+			indexes[i] = x.ID
+		}
+		m.logger.WithFields(logrus.Fields{
+			"count":      len(logs),
+			"from_block": blocksRange.From,
+			"to_block":   blocksRange.To,
+		}).Info("saved logs")
 	}
-	m.logger.WithFields(logrus.Fields{
-		"count":      len(logs),
-		"from_block": blocksRange.From,
-		"to_block":   blocksRange.To,
-	}).Info("saved logs")
-
-	m.logsCursor.LastFetchedBlock = entities[len(logs)-1].BlockNumber
+	m.logsCursor.LastFetchedBlock = blocksRange.To
 	if err = m.repo.LogsCursors.Ensure(ctx, m.logsCursor); err != nil {
 		return err
 	}
 
-	m.submitLogs(entities)
+	m.submitLogs(entities, blocksRange.To)
 	return nil
 }
 
-func (m *ContractMonitor) submitLogs(logs []*entity.Log) {
+func (m *ContractMonitor) submitLogs(logs []*entity.Log, endBlock uint) {
 	jobs, lastBlock := 0, uint(0)
 	for _, log := range logs {
 		if log.BlockNumber > lastBlock {
@@ -290,6 +298,12 @@ func (m *ContractMonitor) submitLogs(logs []*entity.Log) {
 			batchStartIndex = i
 		}
 	}
+	if lastBlock < endBlock {
+		m.logsChan <- &LogsBatch{
+			BlockNumber: endBlock,
+			Logs:        nil,
+		}
+	}
 }
 
 func (m *ContractMonitor) logToEntity(log types.Log) *entity.Log {
@@ -326,29 +340,31 @@ func (m *ContractMonitor) StartLogsProcessor(ctx context.Context) {
 		wg := new(sync.WaitGroup)
 		wg.Add(2)
 		go func() {
+			defer wg.Done()
 			for {
 				err := m.tryToGetBlockTimestamp(ctx, logs.BlockNumber)
-				if err == nil {
-					wg.Done()
-					return
+				if err != nil {
+					m.logger.WithError(err).WithFields(logrus.Fields{
+						"block_number": logs.BlockNumber,
+					}).Error("failed to get block timestamp, retrying")
+					continue
 				}
-				m.logger.WithError(err).WithFields(logrus.Fields{
-					"block_number": logs.BlockNumber,
-				}).Error("failed to get block timestamp, retrying")
+				return
 			}
 		}()
 		go func() {
+			defer wg.Done()
 			for {
 				err := m.tryToProcessLogsBatch(ctx, logs)
-				if err == nil {
-					wg.Done()
-					return
+				if err != nil {
+					m.logger.WithError(err).WithFields(logrus.Fields{
+						"block_number": logs.BlockNumber,
+						"count":        len(logs.Logs),
+					}).Error("failed to process logs batch, retrying")
+					continue
 				}
-				m.logger.WithError(err).WithFields(logrus.Fields{
-					"block_number": logs.BlockNumber,
-					"count":        len(logs.Logs),
-				}).Error("failed to process logs batch, retrying")
+				return
 			}
 		}()
 		wg.Wait()
@@ -410,6 +426,14 @@ func (m *ContractMonitor) tryToProcessLogsBatch(ctx context.Context, logs *LogsB
 		}
 	}
 
-	m.logsCursor.LastProcessedBlock = logs.BlockNumber
+	return m.recordProcessedBlockNumber(ctx, logs.BlockNumber)
+}
+
+func (m *ContractMonitor) recordProcessedBlockNumber(ctx context.Context, blockNumber uint) error {
+	if blockNumber < m.logsCursor.LastProcessedBlock {
+		return nil
+	}
+
+	m.logsCursor.LastProcessedBlock = blockNumber
 	return m.repo.LogsCursors.Ensure(ctx, m.logsCursor)
 }
diff --git a/prometheus/alertmanager.yml b/prometheus/alertmanager.yml
new file mode 100644
index 0000000..c02210a
--- /dev/null
+++ b/prometheus/alertmanager.yml
@@ -0,0 +1,91 @@
+global:
+  slack_api_url_file: /etc/prometheus/slack_api_url.txt
+receivers:
+  - name: slack-default
+    slack_configs:
+      - send_resolved: true
+        channel: '#amb-alerts'
+        title: 'Unrecognized alert occurred'
+        text: '{{ . }}'
+  - name: slack-stuck-message
+    slack_configs:
+      - send_resolved: true
+        channel: '#amb-alerts'
+        title: '{{ template "slack.stuck_message.title" . }}'
+        text: '{{ template "slack.stuck_message.text" . }}'
+        actions:
+          - type: button
+            text: 'Silence :no_bell:'
+            url: '{{ template "__alert_silence_link" . }}'
+  - name: slack-unknown-confirmation
+    slack_configs:
+      - send_resolved: true
+        channel: '#amb-alerts'
+        title: '{{ template "slack.unknown_confirmation.title" . }}'
+        text: '{{ template "slack.unknown_confirmation.text" . }}'
+        actions:
+          - type: button
+            text: 'Silence :no_bell:'
+            url: '{{ template "__alert_silence_link" . }}'
+  - name: slack-unknown-execution
+    slack_configs:
+      - send_resolved: true
+        channel: '#amb-alerts'
+        title: '{{ template "slack.unknown_execution.title" . }}'
+        text: '{{ template "slack.unknown_execution.text" . }}'
+        actions:
+          - type: button
+            text: 'Silence :no_bell:'
+            url: '{{ template "__alert_silence_link" . }}'
+  - name: slack-failed-execution
+    slack_configs:
+      - send_resolved: false
+        channel: '#amb-alerts'
+        title: '{{ template "slack.failed_execution.title" . }}'
+        text: '{{ template "slack.failed_execution.text" . }}'
+        actions:
+          - type: button
+            text: 'Silence :no_bell:'
+            url: '{{ template "__alert_silence_link" . }}'
+  - name: slack-dm
+    slack_configs:
+      - send_resolved: true
+        channel: '#amb-alerts'
+        title: 'Monitor application is down'
+        actions:
+          - type: button
+            text: 'Silence :no_bell:'
+            url: '{{ template "__alert_silence_link" . }}'
+route:
+  receiver: slack-default
+  group_by: [ "..." ]
+  routes:
+    - receiver: slack-stuck-message
+      group_by: [ "bridge", "chain_id", "block_number", "tx_hash" ]
+      matchers:
+        - alertname = StuckMessage
+    - receiver: slack-unknown-confirmation
+      group_by: [ "..." ]
+      matchers:
+        - alertname = UnknownMessageConfirmation
+    - receiver: slack-unknown-execution
+      group_by: [ "..." ]
+      matchers:
+        - alertname = UnknownMessageExecution
+    - receiver: slack-failed-execution
+      group_by: [ "..." ]
+      repeat_interval: 10000d
+      matchers:
+        - alertname = FailedMessageExecution
+    - receiver: slack-dm
+      group_by: [ "..." ]
+      matchers:
+        - alertname = InstanceIsDown
+inhibit_rules:
+  - target_matchers:
+      - alertname =~ .*
+    source_matchers:
+      - alertname = InstanceIsDown
+    equal: [ "job", "instance" ]
+templates:
+  - templates/*.tmpl
diff --git a/prometheus/prometheus.yml b/prometheus/prometheus.yml
new file mode 100644
index 0000000..d882b81
--- /dev/null
+++ b/prometheus/prometheus.yml
@@ -0,0 +1,16 @@
+global:
+  scrape_interval: 15s
+  evaluation_interval: 15s
+
+alerting:
+  alertmanagers:
+    - static_configs:
+        - targets: ['alertmanager:9093']
+
+rule_files:
+  - "/etc/prometheus/rules.yml"
+
+scrape_configs:
+  - job_name: "monitor"
+    static_configs:
+      - targets: ["monitor:2112"]
\ No newline at end of file
diff --git a/prometheus/rules.yml b/prometheus/rules.yml
new file mode 100644
index 0000000..c41650d
--- /dev/null
+++ b/prometheus/rules.yml
@@ -0,0 +1,28 @@
+groups:
+  - name: StuckMessage
+    rules:
+      - alert: StuckMessage
+        expr: alert_monitor_stuck_message_confirmation > 3600
+        for: 3m
+        annotations:
+          wait_time: '{{ humanizeDuration $value }}'
+  - name: UnknownMessageConfirmation
+    rules:
+      - alert: UnknownMessageConfirmation
+        expr: alert_monitor_unknown_message_confirmation > 0
+        for: 3m
+  - name: UnknownMessageExecution
+    rules:
+      - alert: UnknownMessageExecution
+        expr: alert_monitor_unknown_message_execution > 0
+        for: 3m
+  - name: FailedMessageExecution
+    rules:
+      - alert: FailedMessageExecution
+        expr: alert_monitor_failed_message_execution > 0
+        for: 3m
+  - name: InstanceIsDown
+    rules:
+      - alert: InstanceIsDown
+        expr: up < 1
+        for: 1m
diff --git a/prometheus/templates/explorer.tmpl b/prometheus/templates/explorer.tmpl
new file mode 100644
index 0000000..631343d
--- /dev/null
+++ b/prometheus/templates/explorer.tmpl
@@ -0,0 +1,19 @@
+{{ define "explorer.tx.link" -}}
+{{- if eq .chain_id "1" -}}
+https://etherscan.io/tx/{{ .tx_hash }}
+{{- else if eq .chain_id "4" -}}
+https://rinkeby.etherscan.io/tx/{{ .tx_hash }}
+{{- else if eq .chain_id "42" -}}
+https://kovan.etherscan.io/tx/{{ .tx_hash }}
+{{- else if eq .chain_id "56" -}}
+https://bscscan.com/tx/{{ .tx_hash }}
+{{- else if eq .chain_id "77" -}}
+https://blockscout.com/poa/sokol/tx/{{ .tx_hash }}
+{{- else if eq .chain_id "99" -}}
+https://blockscout.com/poa/core/tx/{{ .tx_hash }}
+{{- else if eq .chain_id "100" -}}
+https://blockscout.com/xdai/mainnet/tx/{{ .tx_hash }}
+{{- else -}}
+{{ .tx_hash }}
+{{- end -}}
+{{- end }}
diff --git a/prometheus/templates/slack.tmpl b/prometheus/templates/slack.tmpl
new file mode 100644
index 0000000..7e452b7
--- /dev/null
+++ b/prometheus/templates/slack.tmpl
@@ -0,0 +1,44 @@
+{{ define "slack.default.username" }}AMB monitor{{ end }}
+
+{{ define "slack.stuck_message.title" -}}
+Stuck AMB message confirmation
+{{- end }}
+{{ define "slack.stuck_message.text" -}}
+*Bridge:* {{ .CommonLabels.bridge }}
+*Chain ID*: {{ .CommonLabels.chain_id }}
+*Stuck for:* {{ .CommonAnnotations.wait_time }}
+*Collected confirmations:* {{ .CommonLabels.count }}
+*Tx:* {{ template "explorer.tx.link" .CommonLabels }}
+{{- end }}
+
+{{ define "slack.unknown_confirmation.title" -}}
+Validator signed for unknown AMB message
+{{- end }}
+{{ define "slack.unknown_confirmation.text" -}}
+*Bridge:* {{ .CommonLabels.bridge }}
+*Chain ID*: {{ .CommonLabels.chain_id }}
+*Validator:* {{ .CommonLabels.signer }}
+*Message hash:* {{ .CommonLabels.msg_hash }}
+*Tx:* {{ template "explorer.tx.link" .CommonLabels }}
+{{- end }}
+
+{{ define "slack.unknown_execution.title" -}}
+Bridge executed unknown AMB message
+{{- end }}
+{{ define "slack.unknown_execution.text" -}}
+*Bridge:* {{ .CommonLabels.bridge }}
+*Chain ID*: {{ .CommonLabels.chain_id }}
+*Message id:* {{ .CommonLabels.message_id }}
+*Tx:* {{ template "explorer.tx.link" .CommonLabels }}
+{{- end }}
+
+{{ define "slack.failed_execution.title" -}}
+Failed AMB message execution
+{{- end }}
+{{ define "slack.failed_execution.text" -}}
+*Bridge:* {{ .CommonLabels.bridge }}
+*Chain ID*: {{ .CommonLabels.chain_id }}
+*Sender:* {{ .CommonLabels.sender }}
+*Executor:* {{ .CommonLabels.executor }}
+*Tx:* {{ template "explorer.tx.link" .CommonLabels }}
+{{- end }}
diff --git a/prometheus/templates/utils.tmpl b/prometheus/templates/utils.tmpl
new file mode 100644
index 0000000..4fef3a1
--- /dev/null
+++ b/prometheus/templates/utils.tmpl
@@ -0,0 +1,9 @@
+{{ define "__alert_silence_link" -}}
+  {{ .ExternalURL }}/#/silences/new?filter=%7B
+  {{- range .CommonLabels.SortedPairs -}}
+    {{- if ne .Name "alertname" -}}
+      {{- .Name }}%3D"{{- .Value -}}"%2C%20
+    {{- end -}}
+  {{- end -}}
+  alertname%3D"{{- .CommonLabels.alertname -}}"%7D
+{{- end }}
\ No newline at end of file
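
Usage note (illustrative, not part of the patch): main.go above applies disabled_bridges first, and then, if enabled_bridges is set, keeps only that whitelist. A minimal config.yml fragment exercising the new keys added to the Config struct (bridge names here are examples taken from this config; both keys are optional):

    log_level: debug
    # drop a single bridge from an otherwise full config
    disabled_bridges:
      - test-amb
    # or, alternatively, run an explicit whitelist
    enabled_bridges:
      - xdai-amb
      - bsc-xdai-amb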
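
Metric naming note: the gauges registered in NewAlertManager are exported as alert_monitor_<alert_type> (Prometheus joins Namespace, Subsystem, and Name with underscores), which is exactly what the expressions in prometheus/rules.yml match. A standalone sketch of the convention (relies on the real client_golang API; the label values are made up):

    package main

    import (
    	"fmt"

    	"github.com/prometheus/client_golang/prometheus"
    	"github.com/prometheus/client_golang/prometheus/promauto"
    	"github.com/prometheus/client_golang/prometheus/testutil"
    )

    func main() {
    	// Same GaugeOpts shape as alert_manager.go: the exported series name
    	// becomes alert_monitor_stuck_message_confirmation.
    	g := promauto.With(prometheus.NewRegistry()).NewGaugeVec(prometheus.GaugeOpts{
    		Namespace:   "alert",
    		Subsystem:   "monitor",
    		Name:        "stuck_message_confirmation",
    		ConstLabels: prometheus.Labels{"bridge": "xdai-amb"},
    	}, []string{"chain_id", "msg_hash"})

    	// A message stuck for two hours exports 7200 (seconds), tripping the
    	// `alert_monitor_stuck_message_confirmation > 3600` rule.
    	g.WithLabelValues("100", "0xabc").Set(7200)
    	fmt.Println(testutil.CollectAndCount(g, "alert_monitor_stuck_message_confirmation")) // 1
    }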