Slack alerts

This commit is contained in:
Kirill Fedoseev 2021-09-24 21:38:25 +03:00
parent 9d25d06491
commit 7ca7a910d5
19 changed files with 937 additions and 79 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
.env
.idea/
.idea/
prometheus/slack_api_url.txt

View File

@ -1,12 +1,20 @@
chains:
mainnet:
rpc:
host: https://mainnet.infura.io/v3/5d7bd94c50ed43fab1cb8e74f58678b0
host: https://mainnet.infura.io/v3/${INFURA_PROJECT_KEY}
timeout: 30s
rps: 10
chain_id: 1
block_time: 15s
block_index_interval: 60s
bsc:
rpc:
host: https://bsc-dataseed1.defibit.io/
timeout: 20s
rps: 10
chain_id: 56
block_time: 3s
block_index_interval: 60s
kovan:
rpc:
host: https://kovan.poa.network
@ -17,12 +25,20 @@ chains:
block_index_interval: 60s
xdai:
rpc:
host: https://dai.poa.network
host: https://dai.poa.network/oe-only
timeout: 20s
rps: 10
chain_id: 100
block_time: 5s
block_index_interval: 30s
poa:
rpc:
host: https://core.poa.network
timeout: 20s
rps: 10
chain_id: 99
block_time: 5s
block_index_interval: 30s
sokol:
rpc:
host: https://sokol.poa.network
@ -31,6 +47,14 @@ chains:
chain_id: 77
block_time: 5s
block_index_interval: 30s
rinkeby:
rpc:
host: https://rinkeby.infura.io/v3/${INFURA_PROJECT_KEY}
timeout: 20s
rps: 10
chain_id: 4
block_time: 15s
block_index_interval: 60s
bridges:
xdai-amb:
home:
@ -45,6 +69,19 @@ bridges:
start_block: 9130277
required_block_confirmations: 12
max_block_range_size: 1000
alerts:
unknown_message_confirmation:
home_start_block: 7408640
foreign_start_block: 9130277
unknown_message_execution:
home_start_block: 7408640
foreign_start_block: 9130277
stuck_message_confirmation:
home_start_block: 7408640
foreign_start_block: 11907302
failed_message_execution:
home_start_block: 17518126
foreign_start_block: 12927769
test-amb:
home:
chain: sokol
@ -58,9 +95,121 @@ bridges:
start_block: 12372929
required_block_confirmations: 12
max_block_range_size: 10000
alerts:
stuck_message_confirmation:
home_start_block: 9849619
foreign_start_block: 25437689
unknown_message_confirmation:
home_start_block: 10666885
foreign_start_block: 12372929
unknown_message_execution:
home_start_block: 9849619
foreign_start_block: 13603170
bsc-xdai-amb:
home:
chain: xdai
address: 0x162E898bD0aacB578C8D5F8d6ca588c13d2A383F
start_block: 14496725
required_block_confirmations: 12
max_block_range_size: 10000
foreign:
chain: bsc
address: 0x05185872898b6f94AA600177EF41B9334B1FA48B
start_block: 4792263
required_block_confirmations: 12
max_block_range_size: 1000
alerts:
stuck_message_confirmation:
home_start_block: 14496725
foreign_start_block: 4792263
failed_message_execution:
home_start_block: 14496725
foreign_start_block: 4792263
unknown_message_confirmation:
home_start_block: 14496725
foreign_start_block: 4792263
unknown_message_execution:
home_start_block: 14496725
foreign_start_block: 4792263
rinkeby-xdai-amb:
home:
chain: xdai
address: 0xc38D4991c951fE8BCE1a12bEef2046eF36b0FA4A
start_block: 10030211
required_block_confirmations: 12
max_block_range_size: 20000
foreign:
chain: rinkeby
address: 0xD4075FB57fCf038bFc702c915Ef9592534bED5c1
start_block: 6529875
required_block_confirmations: 12
max_block_range_size: 20000
alerts:
stuck_message_confirmation:
home_start_block: 10030211
foreign_start_block: 8360609
unknown_message_confirmation:
home_start_block: 10030211
foreign_start_block: 6529875
unknown_message_execution:
home_start_block: 10030211
foreign_start_block: 6529875
poa-xdai-amb:
home:
chain: xdai
address: 0xc2d77d118326c33BBe36EbeAbf4F7ED6BC2dda5c
start_block: 17976497
required_block_confirmations: 12
max_block_range_size: 10000
foreign:
chain: poa
address: 0xB2218bdEbe8e90f80D04286772B0968ead666942
start_block: 23152190
required_block_confirmations: 12
max_block_range_size: 10000
alerts:
stuck_message_confirmation:
home_start_block: 17976497
foreign_start_block: 23152190
failed_message_execution:
home_start_block: 17976497
foreign_start_block: 23152190
unknown_message_confirmation:
home_start_block: 17976497
foreign_start_block: 23152190
unknown_message_execution:
home_start_block: 17976497
foreign_start_block: 23152190
eth-bsc-amb:
home:
chain: bsc
address: 0x6943A218d58135793F1FE619414eD476C37ad65a
start_block: 2756521
required_block_confirmations: 12
max_block_range_size: 1000
foreign:
chain: mainnet
address: 0x07955be2967B655Cf52751fCE7ccC8c61EA594e2
start_block: 11375619
required_block_confirmations: 12
max_block_range_size: 10000
alerts:
stuck_message_confirmation:
home_start_block: 2756521
foreign_start_block: 11375619
failed_message_execution:
home_start_block: 2756521
foreign_start_block: 11375619
unknown_message_confirmation:
home_start_block: 2756521
foreign_start_block: 11375619
unknown_message_execution:
home_start_block: 2756521
foreign_start_block: 11375619
postgres:
user: postgres
password: pass
host: localhost
host: postgres
port: 5432
database: db
log_level: info
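The Infura endpoints above now take their project key from the ${INFURA_PROJECT_KEY} placeholder instead of an inlined key. How the placeholder is substituted is not visible in this diff; below is a minimal sketch of one way to expand it before decoding, assuming the substitution happens on the raw file contents (readConfigFile is an illustrative name, not the readYamlConfig helper from this commit):

package config

import (
	"os"

	"gopkg.in/yaml.v3"
)

// readConfigFile is a sketch: it substitutes ${VAR}-style placeholders such as
// ${INFURA_PROJECT_KEY} from the environment, then decodes the YAML into cfg.
func readConfigFile(path string, cfg *Config) error {
	raw, err := os.ReadFile(path)
	if err != nil {
		return err
	}
	expanded := os.ExpandEnv(string(raw)) // replaces ${VAR} and $VAR with environment values
	return yaml.Unmarshal([]byte(expanded), cfg)
}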

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
)
@ -32,10 +33,16 @@ type BridgeSideConfig struct {
MaxBlockRangeSize uint `yaml:"max_block_range_size"`
}
type BridgeAlertConfig struct {
HomeStartBlock uint `yaml:"home_start_block"`
ForeignStartBlock uint `yaml:"foreign_start_block"`
}
type BridgeConfig struct {
ID string `yaml:"-"`
Home *BridgeSideConfig `yaml:"home"`
Foreign *BridgeSideConfig `yaml:"foreign"`
ID string `yaml:"-"`
Home *BridgeSideConfig `yaml:"home"`
Foreign *BridgeSideConfig `yaml:"foreign"`
Alerts map[string]*BridgeAlertConfig `yaml:"alerts"`
}
type DBConfig struct {
@ -47,9 +54,12 @@ type DBConfig struct {
}
type Config struct {
Chains map[string]*ChainConfig `yaml:"chains"`
Bridges map[string]*BridgeConfig `yaml:"bridges"`
DBConfig *DBConfig `yaml:"postgres"`
Chains map[string]*ChainConfig `yaml:"chains"`
Bridges map[string]*BridgeConfig `yaml:"bridges"`
DBConfig *DBConfig `yaml:"postgres"`
LogLevel logrus.Level `yaml:"log_level"`
DisabledBridges []string `yaml:"disabled_bridges"`
EnabledBridges []string `yaml:"enabled_bridges"`
}
func readYamlConfig(cfg *Config) error {

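A minimal, self-contained sketch of how one bridge's new alerts block decodes into the map added above (the structs are repeated here only to keep the example runnable on its own):

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Trimmed-down copies of BridgeAlertConfig/BridgeConfig, just enough for the demo.
type BridgeAlertConfig struct {
	HomeStartBlock    uint `yaml:"home_start_block"`
	ForeignStartBlock uint `yaml:"foreign_start_block"`
}

type BridgeConfig struct {
	Alerts map[string]*BridgeAlertConfig `yaml:"alerts"`
}

func main() {
	raw := []byte(`
alerts:
  stuck_message_confirmation:
    home_start_block: 7408640
    foreign_start_block: 11907302
`)
	var cfg BridgeConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	a := cfg.Alerts["stuck_message_confirmation"]
	fmt.Println(a.HomeStartBlock, a.ForeignStartBlock) // 7408640 11907302
}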
28
db/queries.sql Normal file
View File

@ -0,0 +1,28 @@
-- message stats
-- total
SELECT count(*)
FROM messages
WHERE bridge_id = '?'
AND direction = 'home_to_foreign';
SELECT count(*)
FROM messages
WHERE bridge_id = '?'
AND direction = 'foreign_to_home';
-- foreign to home pending
SELECT m.*
FROM messages m
LEFT JOIN executed_messages em ON m.id = em.message_id AND m.bridge_id = em.bridge_id
WHERE m.bridge_id = '?'
AND m.direction = 'foreign_to_home'
AND em.log_id IS NULL;
-- home to foreign oracle-driven pending
SELECT m.*
FROM messages m
LEFT JOIN executed_messages em ON m.id = em.message_id AND m.bridge_id = em.bridge_id
WHERE m.bridge_id = '?'
AND m.direction = 'home_to_foreign'
AND m.data_type = 0
AND em.log_id IS NULL;

View File

@ -8,7 +8,7 @@ services:
POSTGRES_DB: db
POSTGRES_PASSWORD: pass
ports:
- 5432:5432
- "5432:5432"
shm_size: 256mb
monitor:
build: .
@ -17,8 +17,29 @@ services:
volumes:
- ./config.yml:/app/config.yml
grafana:
image: grafana/grafana:latest
image: grafana/grafana:8.1.5
volumes:
- grafana-storage:/var/lib/grafana
ports:
- 3000:3000
- "3000:3000"
prometheus:
image: prom/prometheus:v2.30.0
volumes:
- ./prometheus:/etc/prometheus
- prom-storage:/prometheus
command: ["--config.file=/etc/prometheus/prometheus.yml", "--web.enable-lifecycle"]
ports:
- "9090:9090"
alertmanager:
image: prom/alertmanager:v0.23.0
command: ["--config.file=/etc/prometheus/alertmanager.yml", "--storage.path=/alertmanager", "--web.external-url=http://localhost:9093"]
volumes:
- ./prometheus:/etc/prometheus
- alertmanager-storage:/alertmanager
ports:
- "9093:9093"
volumes:
db:
db:
grafana-storage:
prom-storage:
alertmanager-storage:
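Prometheus is started with --web.enable-lifecycle, so its configuration and rule files can be re-read without restarting the container by POSTing to the /-/reload endpoint. A small sketch of that call in Go (a plain curl -X POST http://localhost:9090/-/reload does the same):

package main

import (
	"log"
	"net/http"
)

func main() {
	// Ask the running Prometheus instance to reload prometheus.yml and rules.yml.
	resp, err := http.Post("http://localhost:9090/-/reload", "", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println("reload:", resp.Status)
}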

View File

@ -3,7 +3,7 @@ package ethclient
import (
"context"
"errors"
"strconv"
"fmt"
"time"
"github.com/ethereum/go-ethereum/rpc"
@ -31,10 +31,7 @@ func ObserveError(url, query string, err error) {
if errors.Is(err, context.DeadlineExceeded) {
RequestResults.WithLabelValues(url, query, "timeout").Inc()
} else if err, ok := err.(rpc.Error); ok {
RequestResults.MustCurryWith(prometheus.Labels{
"code": strconv.Itoa(err.ErrorCode()),
"message": err.Error(),
}).WithLabelValues(url, query, "error").Inc()
RequestResults.WithLabelValues(url, query, fmt.Sprintf("error-%d-%s", err.ErrorCode(), err.Error())).Inc()
} else {
RequestResults.WithLabelValues(url, query, "error").Inc()
}

View File

@ -8,7 +8,7 @@ import (
type Logger logrus.FieldLogger
func New() Logger {
func New() *logrus.Logger {
logger := logrus.New()
logger.SetLevel(logrus.InfoLevel)
logger.SetFormatter(&logrus.TextFormatter{

13
main.go
View File

@ -21,6 +21,7 @@ func main() {
if err != nil {
logger.WithError(err).Fatal("can't read config")
}
logger.SetLevel(cfg.LogLevel)
dbConn, err := db.NewDB(cfg.DBConfig)
if err != nil {
@ -44,8 +45,18 @@ func main() {
monitors := make([]*monitor.Monitor, 0, len(cfg.Bridges))
ctx, cancel := context.WithCancel(context.Background())
for _, bridge := range cfg.DisabledBridges {
delete(cfg.Bridges, bridge)
}
if cfg.EnabledBridges != nil {
newBridgeCfg := make(map[string]*config.BridgeConfig, len(cfg.EnabledBridges))
for _, bridge := range cfg.EnabledBridges {
newBridgeCfg[bridge] = cfg.Bridges[bridge]
}
cfg.Bridges = newBridgeCfg
}
for _, bridgeCfg := range cfg.Bridges {
m, err2 := monitor.NewMonitor(ctx, logger.WithField("bridge_id", bridgeCfg.ID), repo, bridgeCfg)
m, err2 := monitor.NewMonitor(ctx, logger.WithField("bridge_id", bridgeCfg.ID), dbConn, repo, bridgeCfg)
if err2 != nil {
logger.WithError(err2).Fatal("can't initialize bridge monitor")
}
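The filtering above is driven by the two new top-level keys declared in the Config struct, disabled_bridges and enabled_bridges. Neither is set in the committed config.yml, so the fragment below is only illustrative: disabled_bridges removes the listed bridges, while a present enabled_bridges key restricts monitoring to exactly the listed ones.

# drop specific bridges from monitoring
disabled_bridges:
  - test-amb

# or monitor only an explicit allow-list
enabled_bridges:
  - xdai-amb
  - bsc-xdai-amb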

View File

@ -0,0 +1,85 @@
package alerts
import (
"amb-monitor/config"
"amb-monitor/db"
"amb-monitor/logging"
"context"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type AlertManager struct {
logger logging.Logger
jobs map[string]*Job
}
func NewAlertManager(logger logging.Logger, db *db.DB, cfg *config.BridgeConfig) (*AlertManager, error) {
provider := NewDBAlertsProvider(db)
jobs := make(map[string]*Job, len(cfg.Alerts))
for name, alertCfg := range cfg.Alerts {
switch name {
case "unknown_message_confirmation":
jobs[name] = &Job{
Interval: time.Minute,
Timeout: time.Second * 10,
Func: provider.FindUnknownConfirmations,
Labels: []string{"chain_id", "block_number", "tx_hash", "signer", "msg_hash"},
}
case "unknown_message_execution":
jobs[name] = &Job{
Interval: time.Minute,
Timeout: time.Second * 10,
Func: provider.FindUnknownExecutions,
Labels: []string{"chain_id", "block_number", "tx_hash", "message_id"},
}
case "stuck_message_confirmation":
jobs[name] = &Job{
Interval: time.Minute * 2,
Timeout: time.Second * 20,
Func: provider.FindStuckMessages,
Labels: []string{"chain_id", "block_number", "tx_hash", "msg_hash", "count"},
}
case "failed_message_execution":
jobs[name] = &Job{
Interval: time.Minute * 5,
Timeout: time.Second * 20,
Func: provider.FindFailedExecutions,
Labels: []string{"chain_id", "block_number", "tx_hash", "sender", "executor"},
}
default:
return nil, fmt.Errorf("unknown alert type %q", name)
}
jobs[name].Metric = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "alert",
Subsystem: "monitor",
Name: name,
ConstLabels: map[string]string{
"bridge": cfg.ID,
},
}, jobs[name].Labels)
jobs[name].Params = &AlertJobParams{
Bridge: cfg.ID,
HomeChainID: cfg.Home.Chain.ChainID,
HomeStartBlockNumber: alertCfg.HomeStartBlock,
ForeignChainID: cfg.Foreign.Chain.ChainID,
ForeignStartBlockNumber: alertCfg.ForeignStartBlock,
}
}
return &AlertManager{
logger: logger,
jobs: jobs,
}, nil
}
func (m *AlertManager) Start(ctx context.Context) {
for name, job := range m.jobs {
job.logger = m.logger.WithField("alert_job", name)
go job.Start(ctx)
}
}
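With Namespace "alert" and Subsystem "monitor", each job's gauge is registered as alert_monitor_<alert name>, which are exactly the series names that prometheus/rules.yml later alerts on. The configured labels plus the constant bridge label identify one finding per series; for the stuck-message job the value is the wait time in seconds (as selected by the provider query), for the other jobs it is simply 1. An illustrative scrape sample (all label values hypothetical):

alert_monitor_stuck_message_confirmation{bridge="xdai-amb",chain_id="100",block_number="17990001",tx_hash="0x…",msg_hash="0x…",count="2"} 7200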

View File

@ -0,0 +1,238 @@
package alerts
import (
"amb-monitor/db"
"context"
"fmt"
"strconv"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/ethereum/go-ethereum/common"
)
type DBAlertsProvider struct {
db *db.DB
}
func NewDBAlertsProvider(db *db.DB) *DBAlertsProvider {
return &DBAlertsProvider{
db: db,
}
}
type UnknownConfirmation struct {
ChainID string `db:"chain_id"`
BlockNumber uint64 `db:"block_number"`
TransactionHash common.Hash `db:"transaction_hash"`
Signer common.Address `db:"signer"`
MsgHash common.Hash `db:"msg_hash"`
}
func (c *UnknownConfirmation) AlertValues() AlertValues {
return AlertValues{
Labels: map[string]string{
"chain_id": c.ChainID,
"block_number": strconv.FormatUint(c.BlockNumber, 10),
"tx_hash": c.TransactionHash.String(),
"signer": c.Signer.String(),
"msg_hash": c.MsgHash.String(),
},
Value: 1,
}
}
func (p *DBAlertsProvider) FindUnknownConfirmations(ctx context.Context, params *AlertJobParams) ([]AlertValues, error) {
q, args, err := sq.Select("l.chain_id", "l.block_number", "l.transaction_hash", "sm.signer", "sm.msg_hash").
From("signed_messages sm").
Join("logs l ON l.id = sm.log_id").
LeftJoin("messages m ON sm.bridge_id = m.bridge_id AND m.msg_hash = sm.msg_hash").
Where(sq.Eq{"m.id": nil, "sm.bridge_id": params.Bridge, "l.chain_id": params.HomeChainID}).
Where(sq.GtOrEq{"l.block_number": params.HomeStartBlockNumber}).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return nil, fmt.Errorf("can't build query: %w", err)
}
res := make([]UnknownConfirmation, 0, 5)
err = p.db.SelectContext(ctx, &res, q, args...)
if err != nil {
return nil, fmt.Errorf("can't select alerts: %w", err)
}
alerts := make([]AlertValues, len(res))
for i := range res {
alerts[i] = res[i].AlertValues()
}
return alerts, nil
}
type UnknownExecution struct {
ChainID string `db:"chain_id"`
BlockNumber uint64 `db:"block_number"`
TransactionHash common.Hash `db:"transaction_hash"`
MessageID common.Hash `db:"message_id"`
}
func (c *UnknownExecution) AlertValues() AlertValues {
return AlertValues{
Labels: map[string]string{
"chain_id": c.ChainID,
"block_number": strconv.FormatUint(c.BlockNumber, 10),
"tx_hash": c.TransactionHash.String(),
"message_id": c.MessageID.String(),
},
Value: 1,
}
}
func (p *DBAlertsProvider) FindUnknownExecutions(ctx context.Context, params *AlertJobParams) ([]AlertValues, error) {
q, args, err := sq.Select("l.chain_id", "l.block_number", "l.transaction_hash", "em.message_id").
From("executed_messages em").
Join("logs l ON l.id = em.log_id").
LeftJoin("messages m ON em.bridge_id = m.bridge_id AND em.message_id = m.message_id").
Where(sq.Eq{"m.id": nil, "em.bridge_id": params.Bridge}).
Where(sq.Or{
sq.And{
sq.Eq{"l.chain_id": params.HomeChainID},
sq.GtOrEq{"l.block_number": params.HomeStartBlockNumber},
},
sq.And{
sq.Eq{"l.chain_id": params.ForeignChainID},
sq.GtOrEq{"l.block_number": params.ForeignStartBlockNumber},
},
}).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return nil, fmt.Errorf("can't build query: %w", err)
}
res := make([]UnknownExecution, 0, 5)
err = p.db.SelectContext(ctx, &res, q, args...)
if err != nil {
return nil, fmt.Errorf("can't select alerts: %w", err)
}
alerts := make([]AlertValues, len(res))
for i := range res {
alerts[i] = res[i].AlertValues()
}
return alerts, nil
}
type StuckMessage struct {
ChainID string `db:"chain_id"`
BlockNumber uint64 `db:"block_number"`
TransactionHash common.Hash `db:"transaction_hash"`
MsgHash common.Hash `db:"msg_hash"`
Count uint64 `db:"count"`
WaitTime time.Duration `db:"wait_time"`
}
func (c *StuckMessage) AlertValues() AlertValues {
return AlertValues{
Labels: map[string]string{
"chain_id": c.ChainID,
"block_number": strconv.FormatUint(c.BlockNumber, 10),
"tx_hash": c.TransactionHash.String(),
"msg_hash": c.MsgHash.String(),
"count": strconv.FormatUint(c.Count, 10),
},
Value: float64(c.WaitTime),
}
}
func (p *DBAlertsProvider) FindStuckMessages(ctx context.Context, params *AlertJobParams) ([]AlertValues, error) {
query := `
SELECT l.chain_id,
l.block_number,
l.transaction_hash,
sm.msg_hash,
count(s.log_id) as count,
EXTRACT(EPOCH FROM now() - ts.timestamp)::int as wait_time
FROM sent_messages sm
JOIN logs l on l.id = sm.log_id
JOIN block_timestamps ts on ts.chain_id = l.chain_id AND ts.block_number = l.block_number
JOIN messages m on sm.bridge_id = m.bridge_id AND m.msg_hash = sm.msg_hash AND data_type = 0
LEFT JOIN signed_messages s on s.bridge_id = m.bridge_id AND m.msg_hash = s.msg_hash
LEFT JOIN collected_messages cm on m.bridge_id = cm.bridge_id AND cm.msg_hash = m.msg_hash
WHERE m.direction::text='home_to_foreign' AND cm.log_id IS NULL AND sm.bridge_id = $1 AND l.block_number >= $2 GROUP BY sm.log_id, l.id, ts.timestamp
UNION
SELECT l.chain_id,
l.block_number,
l.transaction_hash,
sm.msg_hash,
count(s.log_id) as count,
EXTRACT(EPOCH FROM now() - ts.timestamp)::int as wait_time
FROM sent_messages sm
JOIN logs l on l.id = sm.log_id
JOIN block_timestamps ts on ts.chain_id = l.chain_id AND ts.block_number = l.block_number
JOIN messages m on sm.bridge_id = m.bridge_id AND m.msg_hash = sm.msg_hash AND data_type = 0
LEFT JOIN signed_messages s on s.bridge_id = m.bridge_id AND m.msg_hash = s.msg_hash
LEFT JOIN executed_messages em on m.bridge_id = em.bridge_id AND em.message_id = m.message_id
WHERE m.direction::text='foreign_to_home' AND em.log_id IS NULL AND sm.bridge_id = $1 AND l.block_number >= $3 GROUP BY sm.log_id, l.id, ts.timestamp`
res := make([]StuckMessage, 0, 5)
err := p.db.SelectContext(ctx, &res, query, params.Bridge, params.HomeStartBlockNumber, params.ForeignStartBlockNumber)
if err != nil {
return nil, fmt.Errorf("can't select alerts: %w", err)
}
alerts := make([]AlertValues, len(res))
for i := range res {
alerts[i] = res[i].AlertValues()
}
return alerts, nil
}
type FailedExecution struct {
ChainID string `db:"chain_id"`
BlockNumber uint64 `db:"block_number"`
TransactionHash common.Hash `db:"transaction_hash"`
Sender common.Address `db:"sender"`
Executor common.Address `db:"executor"`
}
func (c *FailedExecution) AlertValues() AlertValues {
return AlertValues{
Labels: map[string]string{
"chain_id": c.ChainID,
"block_number": strconv.FormatUint(c.BlockNumber, 10),
"tx_hash": c.TransactionHash.String(),
"sender": c.Sender.String(),
"executor": c.Executor.String(),
},
Value: 1,
}
}
func (p *DBAlertsProvider) FindFailedExecutions(ctx context.Context, params *AlertJobParams) ([]AlertValues, error) {
q, args, err := sq.Select("l.chain_id", "l.block_number", "l.transaction_hash", "m.sender", "m.executor").
From("sent_messages sm").
Join("messages m on sm.bridge_id = m.bridge_id AND m.msg_hash = sm.msg_hash").
Join("executed_messages em on m.bridge_id = em.bridge_id AND em.message_id = m.message_id").
Join("logs l ON l.id = em.log_id").
Join("block_timestamps bt on bt.chain_id = l.chain_id AND bt.block_number = l.block_number").
Where(sq.Eq{"em.status": false, "m.data_type": 0, "em.bridge_id": params.Bridge}).
Where(sq.Or{
sq.And{
sq.Eq{"l.chain_id": params.HomeChainID},
sq.GtOrEq{"l.block_number": params.HomeStartBlockNumber},
},
sq.And{
sq.Eq{"l.chain_id": params.ForeignChainID},
sq.GtOrEq{"l.block_number": params.ForeignStartBlockNumber},
},
}).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return nil, fmt.Errorf("can't build query: %w", err)
}
res := make([]FailedExecution, 0, 5)
err = p.db.SelectContext(ctx, &res, q, args...)
if err != nil {
return nil, fmt.Errorf("can't select alerts: %w", err)
}
alerts := make([]AlertValues, len(res))
for i := range res {
alerts[i] = res[i].AlertValues()
}
return alerts, nil
}
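For reference, the squirrel builder in FindUnknownConfirmations renders to roughly the following SQL (sq.Eq with a nil value becomes IS NULL; the exact placeholder numbering depends on map iteration order):

SELECT l.chain_id, l.block_number, l.transaction_hash, sm.signer, sm.msg_hash
FROM signed_messages sm
JOIN logs l ON l.id = sm.log_id
LEFT JOIN messages m ON sm.bridge_id = m.bridge_id AND m.msg_hash = sm.msg_hash
WHERE m.id IS NULL AND sm.bridge_id = $1 AND l.chain_id = $2 AND l.block_number >= $3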

69
monitor/alerts/job.go Normal file
View File

@ -0,0 +1,69 @@
package alerts
import (
"amb-monitor/logging"
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
type AlertValues struct {
Labels map[string]string
Value float64
}
type AlertJobParams struct {
Bridge string
HomeChainID string
HomeStartBlockNumber uint
ForeignChainID string
ForeignStartBlockNumber uint
}
type Job struct {
logger logging.Logger
Metric *prometheus.GaugeVec
Interval time.Duration
Timeout time.Duration
Labels []string
Func func(ctx context.Context, params *AlertJobParams) ([]AlertValues, error)
Params *AlertJobParams
}
func (j *Job) Start(ctx context.Context) {
ticker := time.NewTicker(j.Interval)
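// Note: the loop body below runs once right away; the ticker only paces the iterations after the first one.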
for {
timeoutCtx, cancel := context.WithTimeout(ctx, j.Timeout)
start := time.Now()
values, err := j.Func(timeoutCtx, j.Params)
cancel()
if err != nil {
j.logger.WithError(err).Error("failed to process alert job")
} else {
// TODO scrape only active up-to-date bridges
j.Metric.Reset()
if len(values) > 0 {
j.logger.WithFields(logrus.Fields{
"count": len(values),
"duration": time.Since(start),
}).Warn("found some possible alerts")
for _, v := range values {
j.Metric.With(v.Labels).Set(v.Value)
}
} else {
j.logger.WithField("duration", time.Since(start)).Info("no alerts has been found")
}
}
select {
case <-ticker.C:
continue
case <-ctx.Done():
ticker.Stop()
return
}
}
}

View File

@ -2,6 +2,7 @@ package monitor
import (
"amb-monitor/entity"
"bytes"
"encoding/binary"
"github.com/ethereum/go-ethereum/common"
@ -9,17 +10,34 @@ import (
)
func unmarshalMessage(bridgeID string, direction entity.Direction, encodedData []byte) *entity.Message {
return &entity.Message{
BridgeID: bridgeID,
Direction: direction,
MsgHash: crypto.Keccak256Hash(encodedData),
MessageID: common.BytesToHash(encodedData[:32]),
Sender: common.BytesToAddress(encodedData[32:52]),
Executor: common.BytesToAddress(encodedData[52:72]),
GasLimit: uint(binary.BigEndian.Uint32(encodedData[72:76])),
DataType: uint(encodedData[78]),
Data: encodedData[79+encodedData[76]+encodedData[77]:],
messageID := common.BytesToHash(encodedData[:32])
if bytes.Equal(messageID[0:4], []byte{0, 4, 0, 0}) {
return &entity.Message{
BridgeID: bridgeID,
Direction: direction,
MsgHash: crypto.Keccak256Hash(encodedData),
MessageID: messageID,
Sender: common.BytesToAddress(encodedData[64:84]),
Executor: common.BytesToAddress(encodedData[84:104]),
GasLimit: uint(binary.BigEndian.Uint32(encodedData[104:108])),
DataType: uint(encodedData[108]),
Data: encodedData[108:],
}
}
if bytes.Equal(messageID[0:4], []byte{0, 5, 0, 0}) {
return &entity.Message{
BridgeID: bridgeID,
Direction: direction,
MsgHash: crypto.Keccak256Hash(encodedData),
MessageID: messageID,
Sender: common.BytesToAddress(encodedData[32:52]),
Executor: common.BytesToAddress(encodedData[52:72]),
GasLimit: uint(binary.BigEndian.Uint32(encodedData[72:76])),
DataType: uint(encodedData[78]),
Data: encodedData[79+encodedData[76]+encodedData[77]:],
}
}
panic("unsupported message version prefix")
}
func unmarshalLegacyMessage(bridgeID string, direction entity.Direction, encodedData []byte) *entity.Message {

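For readability, these are the two fixed layouts decoded above, with offsets taken verbatim from the slicing in each branch:

// message id prefix 0x00040000: sender = data[64:84], executor = data[84:104],
// gas limit = data[104:108] (big-endian uint32), data type = data[108], payload = data[108:]
// message id prefix 0x00050000: sender = data[32:52], executor = data[52:72],
// gas limit = data[72:76], data type = data[78], payload starts at 79 + data[76] + data[77]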
View File

@ -4,9 +4,11 @@ import (
"amb-monitor/config"
"amb-monitor/contract"
"amb-monitor/contract/constants"
"amb-monitor/db"
"amb-monitor/entity"
"amb-monitor/ethclient"
"amb-monitor/logging"
"amb-monitor/monitor/alerts"
"amb-monitor/repository"
"context"
"database/sql"
@ -42,6 +44,7 @@ type Monitor struct {
repo *repository.Repo
homeMonitor *ContractMonitor
foreignMonitor *ContractMonitor
alertManager *alerts.AlertManager
}
func newContractMonitor(ctx context.Context, logger logging.Logger, repo *repository.Repo, cfg *config.BridgeSideConfig) (*ContractMonitor, error) {
@ -80,7 +83,7 @@ func newContractMonitor(ctx context.Context, logger logging.Logger, repo *reposi
}, nil
}
func NewMonitor(ctx context.Context, logger logging.Logger, repo *repository.Repo, cfg *config.BridgeConfig) (*Monitor, error) {
func NewMonitor(ctx context.Context, logger logging.Logger, dbConn *db.DB, repo *repository.Repo, cfg *config.BridgeConfig) (*Monitor, error) {
homeMonitor, err := newContractMonitor(ctx, logger.WithField("contract", "home"), repo, cfg.Home)
if err != nil {
return nil, fmt.Errorf("failed to initialize home side monitor: %w", err)
@ -89,6 +92,10 @@ func NewMonitor(ctx context.Context, logger logging.Logger, repo *repository.Rep
if err != nil {
return nil, fmt.Errorf("failed to initialize foreign side monitor: %w", err)
}
alertManager, err := alerts.NewAlertManager(logger, dbConn, cfg)
if err != nil {
return nil, fmt.Errorf("failed to initialize alert manager: %w", err)
}
handlers := NewBridgeEventHandler(repo, cfg.ID)
homeMonitor.eventHandlers["UserRequestForSignature"] = handlers.HandleUserRequestForSignature
homeMonitor.eventHandlers["UserRequestForSignature0"] = handlers.HandleLegacyUserRequestForSignature
@ -107,6 +114,7 @@ func NewMonitor(ctx context.Context, logger logging.Logger, repo *repository.Rep
repo: repo,
homeMonitor: homeMonitor,
foreignMonitor: foreignMonitor,
alertManager: alertManager,
}, nil
}
@ -114,6 +122,7 @@ func (m *Monitor) Start(ctx context.Context) {
m.logger.Info("starting bridge monitor")
go m.homeMonitor.Start(ctx)
go m.foreignMonitor.Start(ctx)
go m.alertManager.Start(ctx)
}
func (m *ContractMonitor) Start(ctx context.Context) {
@ -143,7 +152,7 @@ func (m *ContractMonitor) LoadUnprocessedLogs(ctx context.Context, fromBlock, to
break
}
m.submitLogs(logs)
m.submitLogs(logs, toBlock)
}
func (m *ContractMonitor) StartBlockFetcher(ctx context.Context, start uint) {
@ -201,13 +210,14 @@ func (m *ContractMonitor) StartLogsFetcher(ctx context.Context) {
case blocksRange := <-m.blocksRangeChan:
for {
err := m.tryToFetchLogs(ctx, blocksRange)
if err == nil {
break
if err != nil {
m.logger.WithError(err).WithFields(logrus.Fields{
"from_block": blocksRange.From,
"to_block": blocksRange.To,
}).Error("failed logs fetching, retrying")
continue
}
m.logger.WithError(err).WithFields(logrus.Fields{
"from_block": blocksRange.From,
"to_block": blocksRange.To,
}).Error("failed logs fetching, retrying")
break
}
}
}
@ -227,42 +237,40 @@ func (m *ContractMonitor) tryToFetchLogs(ctx context.Context, blocksRange *Block
"from_block": blocksRange.From,
"to_block": blocksRange.To,
}).Info("fetched logs in range")
if len(logs) == 0 {
return nil
}
sort.Slice(logs, func(i, j int) bool {
a, b := &logs[i], &logs[j]
return a.BlockNumber < b.BlockNumber || (a.BlockNumber == b.BlockNumber && a.Index < b.Index)
})
entities := make([]*entity.Log, len(logs))
for i, log := range logs {
entities[i] = m.logToEntity(log)
}
err = m.repo.Logs.Ensure(ctx, entities...)
if err != nil {
return err
}
if len(logs) > 0 {
sort.Slice(logs, func(i, j int) bool {
a, b := &logs[i], &logs[j]
return a.BlockNumber < b.BlockNumber || (a.BlockNumber == b.BlockNumber && a.Index < b.Index)
})
for i, log := range logs {
entities[i] = m.logToEntity(log)
}
err = m.repo.Logs.Ensure(ctx, entities...)
if err != nil {
return err
}
indexes := make([]uint, len(entities))
for i, x := range entities {
indexes[i] = x.ID
indexes := make([]uint, len(entities))
for i, x := range entities {
indexes[i] = x.ID
}
m.logger.WithFields(logrus.Fields{
"count": len(logs),
"from_block": blocksRange.From,
"to_block": blocksRange.To,
}).Info("saved logs")
}
m.logger.WithFields(logrus.Fields{
"count": len(logs),
"from_block": blocksRange.From,
"to_block": blocksRange.To,
}).Info("saved logs")
m.logsCursor.LastFetchedBlock = entities[len(logs)-1].BlockNumber
m.logsCursor.LastFetchedBlock = blocksRange.To
if err = m.repo.LogsCursors.Ensure(ctx, m.logsCursor); err != nil {
return err
}
m.submitLogs(entities)
m.submitLogs(entities, blocksRange.To)
return nil
}
func (m *ContractMonitor) submitLogs(logs []*entity.Log) {
func (m *ContractMonitor) submitLogs(logs []*entity.Log, endBlock uint) {
jobs, lastBlock := 0, uint(0)
for _, log := range logs {
if log.BlockNumber > lastBlock {
@ -290,6 +298,12 @@ func (m *ContractMonitor) submitLogs(logs []*entity.Log) {
batchStartIndex = i
}
}
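// If the last submitted log is below endBlock (or there were no logs at all), push an empty
// batch so that the processor still advances its cursor to endBlock.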
if lastBlock < endBlock {
m.logsChan <- &LogsBatch{
BlockNumber: endBlock,
Logs: nil,
}
}
}
func (m *ContractMonitor) logToEntity(log types.Log) *entity.Log {
@ -326,29 +340,31 @@ func (m *ContractMonitor) StartLogsProcessor(ctx context.Context) {
wg := new(sync.WaitGroup)
wg.Add(2)
go func() {
defer wg.Done()
for {
err := m.tryToGetBlockTimestamp(ctx, logs.BlockNumber)
if err == nil {
wg.Done()
return
if err != nil {
m.logger.WithError(err).WithFields(logrus.Fields{
"block_number": logs.BlockNumber,
}).Error("failed to get block timestamp, retrying")
continue
}
m.logger.WithError(err).WithFields(logrus.Fields{
"block_number": logs.BlockNumber,
}).Error("failed to get block timestamp, retrying")
return
}
}()
go func() {
defer wg.Done()
for {
err := m.tryToProcessLogsBatch(ctx, logs)
if err == nil {
wg.Done()
return
if err != nil {
m.logger.WithError(err).WithFields(logrus.Fields{
"block_number": logs.BlockNumber,
"count": len(logs.Logs),
}).Error("failed to process logs batch, retrying")
continue
}
m.logger.WithError(err).WithFields(logrus.Fields{
"block_number": logs.BlockNumber,
"count": len(logs.Logs),
}).Error("failed to process logs batch, retrying")
return
}
}()
wg.Wait()
@ -410,6 +426,14 @@ func (m *ContractMonitor) tryToProcessLogsBatch(ctx context.Context, logs *LogsB
}
}
m.logsCursor.LastProcessedBlock = logs.BlockNumber
return m.recordProcessedBlockNumber(ctx, logs.BlockNumber)
}
func (m *ContractMonitor) recordProcessedBlockNumber(ctx context.Context, blockNumber uint) error {
if blockNumber < m.logsCursor.LastProcessedBlock {
return nil
}
m.logsCursor.LastProcessedBlock = blockNumber
return m.repo.LogsCursors.Ensure(ctx, m.logsCursor)
}

View File

@ -0,0 +1,91 @@
global:
slack_api_url_file: /etc/prometheus/slack_api_url.txt
receivers:
- name: slack-default
slack_configs:
- send_resolved: true
channel: '#amb-alerts'
title: 'Unrecognized alert occurred'
text: '{{ . }}'
- name: slack-stuck-message
slack_configs:
- send_resolved: true
channel: '#amb-alerts'
title: '{{ template "slack.stuck_message.title" . }}'
text: '{{ template "slack.stuck_message.text" . }}'
actions:
- type: button
text: 'Silence :no_bell:'
url: '{{ template "__alert_silence_link" . }}'
- name: slack-unknown-confirmation
slack_configs:
- send_resolved: true
channel: '#amb-alerts'
title: '{{ template "slack.unknown_confirmation.title" . }}'
text: '{{ template "slack.unknown_confirmation.text" . }}'
actions:
- type: button
text: 'Silence :no_bell:'
url: '{{ template "__alert_silence_link" . }}'
- name: slack-unknown-execution
slack_configs:
- send_resolved: true
channel: '#amb-alerts'
title: '{{ template "slack.unknown_execution.title" . }}'
text: '{{ template "slack.unknown_execution.text" . }}'
actions:
- type: button
text: 'Silence :no_bell:'
url: '{{ template "__alert_silence_link" . }}'
- name: slack-failed-execution
slack_configs:
- send_resolved: false
channel: '#amb-alerts'
title: '{{ template "slack.failed_execution.title" . }}'
text: '{{ template "slack.failed_execution.text" . }}'
actions:
- type: button
text: 'Silence :no_bell:'
url: '{{ template "__alert_silence_link" . }}'
- name: slack-dm
slack_configs:
- send_resolved: true
channel: '#amb-alerts'
title: 'Monitor application is down'
actions:
- type: button
text: 'Silence :no_bell:'
url: '{{ template "__alert_silence_link" . }}'
route:
receiver: slack-default
group_by: [ "..." ]
routes:
- receiver: slack-stuck-message
group_by: [ "bridge", "chain_id", "block_number", "tx_hash" ]
matchers:
- alertname = StuckMessage
- receiver: slack-unknown-confirmation
group_by: [ "..." ]
matchers:
- alertname = UnknownMessageConfirmation
- receiver: slack-unknown-execution
group_by: [ "..." ]
matchers:
- alertname = UnknownMessageExecution
- receiver: slack-failed-execution
group_by: [ "..." ]
repeat_interval: 10000d
matchers:
- alertname = FailedMessageExecution
- receiver: slack-dm
group_by: [ "..." ]
matchers:
- alertname = InstanceIsDown
inhibit_rules:
- target_matchers:
- alertname =~ .*
- source_matchers:
- alertname = InstanceIsDown
equal: [ "job", "instance" ]
templates:
- templates/*.tmpl
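slack_api_url_file points at the gitignored prometheus/slack_api_url.txt, which docker-compose mounts into the container as /etc/prometheus/slack_api_url.txt. The file is expected to hold a single line with the Slack incoming-webhook URL, e.g. (placeholder value):

https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX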

16
prometheus/prometheus.yml Normal file
View File

@ -0,0 +1,16 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
rule_files:
- "/etc/prometheus/rules.yml"
scrape_configs:
- job_name: "monitor"
static_configs:
- targets: ["monitor:2112"]

28
prometheus/rules.yml Normal file
View File

@ -0,0 +1,28 @@
groups:
- name: StuckMessage
rules:
- alert: StuckMessage
expr: alert_monitor_stuck_message_confirmation > 3600
for: 3m
annotations:
wait_time: '{{ humanizeDuration $value }}'
- name: UnknownMessageConfirmation
rules:
- alert: UnknownMessageConfirmation
expr: alert_monitor_unknown_message_confirmation > 0
for: 3m
- name: UnknownMessageExecution
rules:
- alert: UnknownMessageExecution
expr: alert_monitor_unknown_message_execution > 0
for: 3m
- name: FailedMessageExecution
rules:
- alert: FailedMessageExecution
expr: alert_monitor_failed_message_execution > 0
for: 3m
- name: InstanceIsDown
rules:
- alert: InstanceIsDown
expr: up < 1
for: 1m
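The stuck-message gauge carries the wait time in seconds (the EXTRACT(EPOCH ...) column from the provider query), so the StuckMessage expression fires once an oracle-driven message has waited more than an hour for confirmations. Because every series also carries the constant bridge label, a rule can be scoped or tuned per bridge if needed, e.g. a hypothetical stricter rule:

alert_monitor_stuck_message_confirmation{bridge="eth-bsc-amb"} > 7200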

View File

@ -0,0 +1,19 @@
{{ define "explorer.tx.link" -}}
{{- if eq .chain_id "1" -}}
https://etherscan.io/tx/{{ .tx_hash }}
{{- else if eq .chain_id "4" -}}
https://rinkeby.etherscan.io/tx/{{ .tx_hash }}
{{- else if eq .chain_id "42" -}}
https://kovan.etherscan.io/tx/{{ .tx_hash }}
{{- else if eq .chain_id "56" -}}
https://bscscan.com/tx/{{ .tx_hash }}
{{- else if eq .chain_id "77" -}}
https://blockscout.com/poa/sokol/tx/{{ .tx_hash }}
{{- else if eq .chain_id "99" -}}
https://blockscout.com/poa/core/tx/{{ .tx_hash }}
{{- else if eq .chain_id "100" -}}
https://blockscout.com/xdai/mainnet/tx/{{ .tx_hash }}
{{- else -}}
{{ .tx_hash }}
{{- end -}}
{{- end }}

View File

@ -0,0 +1,44 @@
{{ define "slack.default.username" }}AMB monitor{{ end }}
{{ define "slack.stuck_message.title" -}}
Stuck AMB message confirmation
{{- end }}
{{ define "slack.stuck_message.text" -}}
*Bridge:* {{ .CommonLabels.bridge }}
*Chain ID*: {{ .CommonLabels.chain_id }}
*Stuck for:* {{ .CommonAnnotations.wait_time }}
*Collected confirmations:* {{ .CommonLabels.count }}
*Tx:* {{ template "explorer.tx.link" .CommonLabels }}
{{- end }}
{{ define "slack.unknown_confirmation.title" -}}
Validator signed for unknown AMB message
{{- end }}
{{ define "slack.unknown_confirmation.text" -}}
*Bridge:* {{ .CommonLabels.bridge }}
*Chain ID*: {{ .CommonLabels.chain_id }}
*Validator:* {{ .CommonLabels.signer }}
*Message hash:* {{ .CommonLabels.msg_hash }}
*Tx:* {{ template "explorer.tx.link" .CommonLabels }}
{{- end }}
{{ define "slack.unknown_execution.title" -}}
Bridge executed unknown AMB message
{{- end }}
{{ define "slack.unknown_execution.text" -}}
*Bridge:* {{ .CommonLabels.bridge }}
*Chain ID*: {{ .CommonLabels.chain_id }}
*Message id:* {{ .CommonLabels.message_id }}
*Tx:* {{ template "explorer.tx.link" .CommonLabels }}
{{- end }}
{{ define "slack.failed_execution.title" -}}
Failed AMB message execution
{{- end }}
{{ define "slack.failed_execution.text" -}}
*Bridge:* {{ .CommonLabels.bridge }}
*Chain ID*: {{ .CommonLabels.chain_id }}
*Sender:* {{ .CommonLabels.sender }}
*Executor:* {{ .CommonLabels.executor }}
*Tx:* {{ template "explorer.tx.link" .CommonLabels }}
{{- end }}

View File

@ -0,0 +1,9 @@
{{ define "__alert_silence_link" -}}
{{ .ExternalURL }}/#/silences/new?filter=%7B
{{- range .CommonLabels.SortedPairs -}}
{{- if ne .Name "alertname" -}}
{{- .Name }}%3D"{{- .Value -}}"%2C%20
{{- end -}}
{{- end -}}
alertname%3D"{{- .CommonLabels.alertname -}}"%7D
{{- end }}
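For an alert grouped by, say, bridge="xdai-amb" and chain_id="100", this expands to a pre-filled silence form such as (illustrative, given --web.external-url=http://localhost:9093):

http://localhost:9093/#/silences/new?filter=%7Bbridge%3D"xdai-amb"%2C%20chain_id%3D"100"%2C%20alertname%3D"StuckMessage"%7D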