Refactor code, add address filter for alerting, allow graceful stopping, filter governance alerts, update proposals on status change

This commit is contained in:
Hendrik Hofstadt 2018-09-26 13:44:27 +02:00
parent 2273d265d2
commit 647fddd188
5 changed files with 182 additions and 79 deletions

View File

@ -38,4 +38,5 @@ Environment:
"DB_USER" = "postgres" # DB username "DB_USER" = "postgres" # DB username
"DB_PW"= "mypwd" # DB password "DB_PW"= "mypwd" # DB password
"RAVEN_DSN" = "http://xxxxxxx" # DSN_URL from Sentry (hosted or self-hosted) "RAVEN_DSN" = "http://xxxxxxx" # DSN_URL from Sentry (hosted or self-hosted)
"ADDRESS" = "ABCDDED" # Address of the validator to alert on
``` ```

View File

@ -1,11 +1,13 @@
package main package main
import ( import (
"fmt"
"github.com/certusone/chain_exporter/types" "github.com/certusone/chain_exporter/types"
"github.com/getsentry/raven-go" "github.com/getsentry/raven-go"
"github.com/go-pg/pg" "github.com/go-pg/pg"
"github.com/pkg/errors" "github.com/pkg/errors"
"os" "os"
"os/signal"
"strconv" "strconv"
"time" "time"
) )
@ -13,6 +15,7 @@ import (
type ( type (
Monitor struct { Monitor struct {
db *pg.DB db *pg.DB
address string
} }
) )
@ -29,9 +32,14 @@ func main() {
if os.Getenv("RAVEN_DSN") == "" { if os.Getenv("RAVEN_DSN") == "" {
panic(errors.New("RAVEN_DSN needs to be set")) panic(errors.New("RAVEN_DSN needs to be set"))
} }
if os.Getenv("ADDRESS") == "" {
panic(errors.New("ADDRESS needs to be set"))
}
// Set Raven URL for alerts
raven.SetDSN(os.Getenv("RAVEN_DSN")) raven.SetDSN(os.Getenv("RAVEN_DSN"))
// Connect to the postgres datastore
db := pg.Connect(&pg.Options{ db := pg.Connect(&pg.Options{
Addr: os.Getenv("DB_HOST"), Addr: os.Getenv("DB_HOST"),
User: os.Getenv("DB_USER"), User: os.Getenv("DB_USER"),
@ -39,57 +47,96 @@ func main() {
}) })
defer db.Close() defer db.Close()
monitor := &Monitor{db} // Start the monitor
monitor := &Monitor{db, os.Getenv("ADDRESS")}
go func() {
for {
select {
// Check for alert conditions every second
case <-time.Tick(time.Second):
fmt.Println("start - alerting on misses")
err := monitor.AlertMisses()
if err != nil {
fmt.Printf("error - alerting on misses: %v\n", err)
}
fmt.Println("finish - alerting on misses")
}
}
}()
go func() {
for { for {
select { select {
case <-time.Tick(time.Second): case <-time.Tick(time.Second):
err := monitor.sync() fmt.Println("start - alerting on governance")
err := monitor.AlertGovernance()
if err != nil { if err != nil {
panic(err) fmt.Printf("error - alerting on governance: %v\n", err)
} }
fmt.Println("finish - alerting on governance")
} }
} }
}()
// Allow graceful closing of the process
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
<-signalCh
} }
func (m *Monitor) sync() error { // AlertMisses queries misses from the database and sends the relevant alert to sentry
// Alert on block misses func (m *Monitor) AlertMisses() error {
// Query block misses from the DB
var misses []*types.MissInfo var misses []*types.MissInfo
err := m.db.Model(&types.MissInfo{}).Where("alerted = FALSE").Select(&misses) err := m.db.Model(&types.MissInfo{}).Where("alerted = FALSE and address = ?", m.address).Select(&misses)
if err != nil { if err != nil {
return err return err
} }
// Iterate misses and send alerts
for _, miss := range misses { for _, miss := range misses {
raven.CaptureError(errors.New("Missed block"), map[string]string{"height": strconv.FormatInt(miss.Height, 10), "time": miss.Time.String(), "address": miss.Address}) raven.CaptureMessage("Missed block", map[string]string{"height": strconv.FormatInt(miss.Height, 10), "time": miss.Time.String(), "address": miss.Address})
// Mark miss as alerted in the db
miss.Alerted = true miss.Alerted = true
_, err = m.db.Model(miss).Where("id = ?", miss.ID).Update() _, err = m.db.Model(miss).Where("id = ?", miss.ID).Update()
if err != nil { if err != nil {
return err return err
} }
}
// Alert on proposals fmt.Printf("alerted on miss #height: %d\n", miss.Height)
var proposals []*types.Proposal }
err = m.db.Model(&types.Proposal{}).Where("alerted = FALSE").Select(&proposals)
if err != nil { return nil
return err }
}
for _, proposal := range proposals { // AlertGovernance queries active governance proposals from the database and sends the relevant alert to sentry
if proposal.ProposalStatus == "Passed" || proposal.ProposalStatus == "Rejected" { func (m *Monitor) AlertGovernance() error {
proposal.Alerted = true // Query proposals from the DB
_, err = m.db.Model(proposal).Where("id = ?", proposal.ID).Update() var proposals []*types.Proposal
if err != nil { err := m.db.Model(&types.Proposal{}).
return err Where("alerted = FALSE and proposal_status = ?", "Active").
} Select(&proposals)
continue if err != nil {
} return err
}
raven.CaptureMessage("New governance proposal: "+proposal.Title+"\nDescription: "+proposal.Description+"\nStartHeight: "+proposal.VotingStartBlock, map[string]string{"height": strconv.FormatInt(proposal.Height, 10), "type": proposal.Type})
// Send alerts for every proposal
proposal.Alerted = true for _, proposal := range proposals {
_, err = m.db.Model(proposal).Where("id = ?", proposal.ID).Update() raven.CaptureMessage(fmt.Sprintf("New governance proposal: %s\nDescription: %s\nStartHeight: %s", proposal.Title, proposal.Description, proposal.VotingStartBlock),
if err != nil { map[string]string{
return err "height": strconv.FormatInt(proposal.Height, 10),
} "type": proposal.Type,
})
// Mark proposal as alerted in the db
proposal.Alerted = true
_, err = m.db.Model(proposal).Where("id = ?", proposal.ID).Update()
if err != nil {
return err
}
fmt.Printf("alerted on proposal #%s\n", proposal.ID)
} }
return nil return nil

73
main.go
View File

@ -11,6 +11,7 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"gopkg.in/resty.v1" "gopkg.in/resty.v1"
"os" "os"
"os/signal"
"time" "time"
) )
@ -47,33 +48,50 @@ func main() {
}) })
defer db.Close() defer db.Close()
err := createSchema(db) // Setup the database and ignore errors if the schema already exists
if err != nil { CreateSchema(db)
//panic(err)
} // Configure resty
resty.SetTimeout(5 * time.Second)
// Setup the monitor
monitor := &Monitor{tClient, db} monitor := &Monitor{tClient, db}
// Start the syncing task
go func() { go func() {
for { for {
err = monitor.sync() fmt.Println("start - sync blockchain")
err := monitor.Sync()
if err != nil { if err != nil {
fmt.Printf("error syncing: %v\n", err) fmt.Printf("error - sync blockchain: %v\n", err)
} }
fmt.Println("finish - sync blockchain")
time.Sleep(time.Second) time.Sleep(time.Second)
} }
}() }()
// Allow graceful closing of the governance loop
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
for { for {
select { select {
case <-time.Tick(10 * time.Second): case <-time.Tick(10 * time.Second):
err := monitor.getGovernance() fmt.Println("start - sync governance proposals")
err := monitor.GetGovernance()
if err != nil { if err != nil {
fmt.Printf("error parsing governance: %v\n", err) fmt.Printf("error - sync governance proposals: %v\n", err)
continue
} }
fmt.Println("finish - sync governance proposals")
case <-signalCh:
return
} }
} }
} }
func createSchema(db *pg.DB) error { // CreateSchema sets up the database using the ORM
func CreateSchema(db *pg.DB) error {
for _, model := range []interface{}{(*ctypes.BlockInfo)(nil), (*ctypes.EvidenceInfo)(nil), (*ctypes.MissInfo)(nil), (*ctypes.Proposal)(nil)} { for _, model := range []interface{}{(*ctypes.BlockInfo)(nil), (*ctypes.EvidenceInfo)(nil), (*ctypes.MissInfo)(nil), (*ctypes.Proposal)(nil)} {
err := db.CreateTable(model, &orm.CreateTableOptions{}) err := db.CreateTable(model, &orm.CreateTableOptions{})
if err != nil { if err != nil {
@ -83,7 +101,10 @@ func createSchema(db *pg.DB) error {
return nil return nil
} }
func (m *Monitor) sync() error { // Sync syncs the blockchain and missed blocks from a node
func (m *Monitor) Sync() error {
// Check current height in db
var blocks []ctypes.BlockInfo var blocks []ctypes.BlockInfo
err := m.db.Model(&blocks).Order("height DESC").Limit(1).Select() err := m.db.Model(&blocks).Order("height DESC").Limit(1).Select()
if err != nil { if err != nil {
@ -94,14 +115,16 @@ func (m *Monitor) sync() error {
bestHeight = blocks[0].Height bestHeight = blocks[0].Height
} }
// Query the node for its height
status, err := m.client.Status() status, err := m.client.Status()
if err != nil { if err != nil {
return err return err
} }
maxHeight := status.SyncInfo.LatestBlockHeight maxHeight := status.SyncInfo.LatestBlockHeight
// Ingest all blocks up to the best height
for i := bestHeight + 1; i <= maxHeight; i++ { for i := bestHeight + 1; i <= maxHeight; i++ {
err = m.ingestBlock(i) err = m.IngestPrevBlock(i)
if err != nil { if err != nil {
return err return err
} }
@ -110,35 +133,39 @@ func (m *Monitor) sync() error {
return nil return nil
} }
func (m *Monitor) ingestBlock(height int64) error { // IngestPrevBlock queries the block at the given height-1 from the node and ingests its metadata (blockinfo,evidence)
// into the database. It also queries the next block to access the commits and stores the missed signatures.
func (m *Monitor) IngestPrevBlock(height int64) error {
prevHeight := height - 1 prevHeight := height - 1
// Get Data // Get validator set for the block
validators, err := m.client.Validators(&prevHeight) validators, err := m.client.Validators(&prevHeight)
if err != nil { if err != nil {
return err return err
} }
// Query the previous block
block, err := m.client.Block(&prevHeight) block, err := m.client.Block(&prevHeight)
if err != nil { if err != nil {
return err return err
} }
// Query the next block to access the commits
nextBlock, err := m.client.Block(&height) nextBlock, err := m.client.Block(&height)
if err != nil { if err != nil {
return err return err
} }
// Parse blockinfo
blockInfo := new(ctypes.BlockInfo) blockInfo := new(ctypes.BlockInfo)
blockInfo.ID = nextBlock.BlockMeta.Header.LastBlockID.String() blockInfo.ID = block.BlockMeta.BlockID.String()
blockInfo.Height = height blockInfo.Height = height
blockInfo.Time = nextBlock.BlockMeta.Header.Time blockInfo.Time = block.BlockMeta.Header.Time
blockInfo.Proposer = block.Block.ProposerAddress.String() blockInfo.Proposer = block.Block.ProposerAddress.String()
// Identify missed validators // Identify missed validators
missedValidators := make([]*ctypes.MissInfo, 0) missedValidators := make([]*ctypes.MissInfo, 0)
// Parse
for i, validator := range validators.Validators { for i, validator := range validators.Validators {
if nextBlock.Block.LastCommit.Precommits[i] == nil { if nextBlock.Block.LastCommit.Precommits[i] == nil {
missed := &ctypes.MissInfo{ missed := &ctypes.MissInfo{
@ -164,16 +191,21 @@ func (m *Monitor) ingestBlock(height int64) error {
// Insert in DB // Insert in DB
err = m.db.RunInTransaction(func(tx *pg.Tx) error { err = m.db.RunInTransaction(func(tx *pg.Tx) error {
// Insert blockinfo
err = tx.Insert(blockInfo) err = tx.Insert(blockInfo)
if err != nil { if err != nil {
return err return err
} }
// Insert evidence
if len(evidenceInfo) > 0 { if len(evidenceInfo) > 0 {
err = tx.Insert(&evidenceInfo) err = tx.Insert(&evidenceInfo)
if err != nil { if err != nil {
return err return err
} }
} }
// Insert missed signatures
if len(missedValidators) > 0 { if len(missedValidators) > 0 {
err = tx.Insert(&missedValidators) err = tx.Insert(&missedValidators)
if err != nil { if err != nil {
@ -189,18 +221,22 @@ func (m *Monitor) ingestBlock(height int64) error {
return nil return nil
} }
func (m *Monitor) getGovernance() error { // GetGovernance queries the governance proposals from the lcd and stores them in the db
func (m *Monitor) GetGovernance() error {
// Query lcd
resp, err := resty.R().Get(os.Getenv("LCD_URL") + "/gov/proposals") resp, err := resty.R().Get(os.Getenv("LCD_URL") + "/gov/proposals")
if err != nil { if err != nil {
return err return err
} }
// Parse proposals
var proposals []*ctypes.Proposal var proposals []*ctypes.Proposal
err = json.Unmarshal(resp.Body(), &proposals) err = json.Unmarshal(resp.Body(), &proposals)
if err != nil { if err != nil {
return err return err
} }
// Copy proposal data into the database model
for _, proposal := range proposals { for _, proposal := range proposals {
proposal.ID = proposal.Details.ProposalID proposal.ID = proposal.Details.ProposalID
proposal.Height = proposal.Details.SubmitBlock proposal.Height = proposal.Details.SubmitBlock
@ -212,6 +248,7 @@ func (m *Monitor) getGovernance() error {
proposal.VotingStartBlock = proposal.Details.VotingStartBlock proposal.VotingStartBlock = proposal.Details.VotingStartBlock
} }
_, err = m.db.Model(&proposals).OnConflict("DO NOTHING").Insert() // Store proposals in the db
_, err = m.db.Model(&proposals).OnConflict("(id) DO UPDATE").Set("proposal_status = EXCLUDED.proposal_status").Insert()
return err return err
} }

View File

@ -2,14 +2,14 @@ package main
import ( import (
"fmt" "fmt"
"github.com/certusone/chain_exporter/types"
"github.com/go-pg/pg" "github.com/go-pg/pg"
"github.com/go-pg/pg/orm" "github.com/go-pg/pg/orm"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/tendermint/tendermint/libs/flowrate"
"github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/rpc/client"
"net/url" "net/url"
"os" "os"
"os/signal"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -20,24 +20,6 @@ type (
db *pg.DB db *pg.DB
clients map[string]*client.HTTP clients map[string]*client.HTTP
} }
PeerInfo struct {
ID int64
Timestamp time.Time
Node string
PeerID string `json:"id"`
ListenAddr string `json:"listen_addr"`
Network string `json:"network"`
Version string `json:"version"`
Channels string `json:"channels"`
Moniker string `json:"moniker"`
IsOutbound bool `json:"is_outbound"`
SendData flowrate.Status
RecvData flowrate.Status
ChannelData []conn.ChannelStatus
}
) )
func main() { func main() {
@ -60,6 +42,7 @@ func main() {
panic(errors.New("PERIOD needs to be a number")) panic(errors.New("PERIOD needs to be a number"))
} }
// Setup the RPC clients
clients := make(map[string]*client.HTTP) clients := make(map[string]*client.HTTP)
for _, item := range strings.Split(os.Getenv("GAIA_URLS"), ",") { for _, item := range strings.Split(os.Getenv("GAIA_URLS"), ",") {
tClient := client.NewHTTP(item, "/websocket") tClient := client.NewHTTP(item, "/websocket")
@ -71,6 +54,7 @@ func main() {
clients[hostname.Host] = tClient clients[hostname.Host] = tClient
} }
// Connect to the postgres datastore
db := pg.Connect(&pg.Options{ db := pg.Connect(&pg.Options{
Addr: os.Getenv("DB_HOST"), Addr: os.Getenv("DB_HOST"),
User: os.Getenv("DB_USER"), User: os.Getenv("DB_USER"),
@ -78,25 +62,32 @@ func main() {
}) })
defer db.Close() defer db.Close()
err := createSchema(db) // Setup the database and ignore errors if the schema already exists
if err != nil { CreateSchema(db)
//panic(err)
} // Setup monitor
monitor := &Monitor{db, clients} monitor := &Monitor{db, clients}
// Parse query period
period, _ := strconv.Atoi(os.Getenv("PERIOD")) period, _ := strconv.Atoi(os.Getenv("PERIOD"))
// Allow graceful closing of the process
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
// Start the periodic syncing
for { for {
select { select {
case <-time.Tick(time.Duration(period) * time.Second): case <-time.Tick(time.Duration(period) * time.Second):
err := monitor.sync() monitor.Sync()
if err != nil { case <-signalCh:
fmt.Printf("error parsing governance: %v\n", err) return
}
} }
} }
} }
func createSchema(db *pg.DB) error { // CreateSchema sets up the database using the ORM
for _, model := range []interface{}{(*PeerInfo)(nil)} { func CreateSchema(db *pg.DB) error {
for _, model := range []interface{}{(*types.PeerInfo)(nil)} {
err := db.CreateTable(model, &orm.CreateTableOptions{}) err := db.CreateTable(model, &orm.CreateTableOptions{})
if err != nil { if err != nil {
return err return err
@ -105,28 +96,34 @@ func createSchema(db *pg.DB) error {
return nil return nil
} }
func (m *Monitor) sync() error { // Sync queries and stores the netdata for each node listed
func (m *Monitor) Sync() {
for name := range m.clients { for name := range m.clients {
go func(n string, client *client.HTTP) { go func(n string, client *client.HTTP) {
err := m.captureNetData(client, n) err := m.CaptureNetData(client, n)
if err != nil { if err != nil {
fmt.Printf("error parsing netData: %v\n", err) fmt.Printf("error parsing netData for %s: %v\n", name, err)
return
} }
fmt.Printf("parsed netData for %s\n", name)
}(name, m.clients[name]) }(name, m.clients[name])
} }
return nil
} }
func (m *Monitor) captureNetData(client *client.HTTP, name string) error { // CaptureNetData queries a node's net_info and stores the information for each peer in the db
func (m *Monitor) CaptureNetData(client *client.HTTP, name string) error {
// Get Data // Get Data
netInfo, err := client.NetInfo() netInfo, err := client.NetInfo()
if err != nil { if err != nil {
return err return err
} }
// Use one timestamp to allow grouping
timestamp := time.Now() timestamp := time.Now()
for _, peer := range netInfo.Peers { for _, peer := range netInfo.Peers {
data := &PeerInfo{} // Aggregate data
data := &types.PeerInfo{}
data.Timestamp = timestamp data.Timestamp = timestamp
data.Node = name data.Node = name
@ -142,6 +139,7 @@ func (m *Monitor) captureNetData(client *client.HTTP, name string) error {
data.RecvData = peer.ConnectionStatus.RecvMonitor data.RecvData = peer.ConnectionStatus.RecvMonitor
data.ChannelData = peer.ConnectionStatus.Channels data.ChannelData = peer.ConnectionStatus.Channels
// Store data in postgres
_, err = m.db.Model(data).Insert() _, err = m.db.Model(data).Insert()
if err != nil { if err != nil {
fmt.Printf("error inserting netData: %v\n", err) fmt.Printf("error inserting netData: %v\n", err)

View File

@ -1,6 +1,8 @@
package types package types
import ( import (
"github.com/tendermint/tendermint/libs/flowrate"
"github.com/tendermint/tendermint/p2p/conn"
"time" "time"
) )
@ -46,4 +48,22 @@ type (
VotingStartBlock string `json:"voting_start_block"` VotingStartBlock string `json:"voting_start_block"`
} `json:"value"` } `json:"value"`
} }
PeerInfo struct {
ID int64
Timestamp time.Time
Node string
PeerID string `json:"id"`
ListenAddr string `json:"listen_addr"`
Network string `json:"network"`
Version string `json:"version"`
Channels string `json:"channels"`
Moniker string `json:"moniker"`
IsOutbound bool `json:"is_outbound";sql:",default:false,notnull"`
SendData flowrate.Status
RecvData flowrate.Status
ChannelData []conn.ChannelStatus
}
) )