From 647fddd1886f205a27dade69f7c33277a58903a7 Mon Sep 17 00:00:00 2001 From: Hendrik Hofstadt Date: Wed, 26 Sep 2018 13:44:27 +0200 Subject: [PATCH] Refactor code, add address filter for alerting, allow graceful stopping, filter governance alerts, update proposals on status change --- README.md | 1 + alerter/main.go | 97 ++++++++++++++++++++++++++++++++------------ main.go | 73 +++++++++++++++++++++++++-------- net_exporter/main.go | 70 ++++++++++++++++---------------- types/types.go | 20 +++++++++ 5 files changed, 182 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index 134a4d7..fc6a2c4 100644 --- a/README.md +++ b/README.md @@ -38,4 +38,5 @@ Environment: "DB_USER" = "postgres" # DB username "DB_PW"= "mypwd" # DB password "RAVEN_DSN" = "http://xxxxxxx" # DSN_URL from Sentry (hosted or self-hosted) +"ADDRESS" = "ABCDDED" # Address of the validator to alert on ``` diff --git a/alerter/main.go b/alerter/main.go index f00bde3..35143fe 100644 --- a/alerter/main.go +++ b/alerter/main.go @@ -1,18 +1,21 @@ package main import ( + "fmt" "github.com/certusone/chain_exporter/types" "github.com/getsentry/raven-go" "github.com/go-pg/pg" "github.com/pkg/errors" "os" + "os/signal" "strconv" "time" ) type ( Monitor struct { - db *pg.DB + db *pg.DB + address string } ) @@ -29,9 +32,14 @@ func main() { if os.Getenv("RAVEN_DSN") == "" { panic(errors.New("RAVEN_DSN needs to be set")) } + if os.Getenv("ADDRESS") == "" { + panic(errors.New("ADDRESS needs to be set")) + } + // Set Raven URL for alerts raven.SetDSN(os.Getenv("RAVEN_DSN")) + // Connect to the postgres datastore db := pg.Connect(&pg.Options{ Addr: os.Getenv("DB_HOST"), User: os.Getenv("DB_USER"), @@ -39,57 +47,96 @@ func main() { }) defer db.Close() - monitor := &Monitor{db} - for { - select { - case <-time.Tick(time.Second): - err := monitor.sync() - if err != nil { - panic(err) + // Start the monitor + monitor := &Monitor{db, os.Getenv("ADDRESS")} + + go func() { + for { + select { + // Check for alert conditions every second + case <-time.Tick(time.Second): + fmt.Println("start - alerting on misses") + err := monitor.AlertMisses() + if err != nil { + fmt.Printf("error - alerting on misses: %v\n", err) + } + fmt.Println("finish - alerting on misses") } } - } + }() + go func() { + for { + select { + case <-time.Tick(time.Second): + fmt.Println("start - alerting on governance") + err := monitor.AlertGovernance() + if err != nil { + fmt.Printf("error - alerting on governance: %v\n", err) + } + fmt.Println("finish - alerting on governance") + } + } + }() + + // Allow graceful closing of the process + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt) + <-signalCh } -func (m *Monitor) sync() error { - // Alert on block misses +// AlertMisses queries misses from the database and sends the relevant alert to sentry +func (m *Monitor) AlertMisses() error { + // Query block misses from the DB var misses []*types.MissInfo - err := m.db.Model(&types.MissInfo{}).Where("alerted = FALSE").Select(&misses) + err := m.db.Model(&types.MissInfo{}).Where("alerted = FALSE and address = ?", m.address).Select(&misses) if err != nil { return err } + + // Iterate misses and send alerts for _, miss := range misses { - raven.CaptureError(errors.New("Missed block"), map[string]string{"height": strconv.FormatInt(miss.Height, 10), "time": miss.Time.String(), "address": miss.Address}) + raven.CaptureMessage("Missed block", map[string]string{"height": strconv.FormatInt(miss.Height, 10), "time": miss.Time.String(), "address": miss.Address}) + + // Mark miss as alerted in the db miss.Alerted = true _, err = m.db.Model(miss).Where("id = ?", miss.ID).Update() if err != nil { return err } + + fmt.Printf("alerted on miss #height: %d\n", miss.Height) } - // Alert on proposals + return nil +} + +// AlertGovernance queries active governance proposals from the database and sends the relevant alert to sentry +func (m *Monitor) AlertGovernance() error { + // Query proposals from the DB var proposals []*types.Proposal - err = m.db.Model(&types.Proposal{}).Where("alerted = FALSE").Select(&proposals) + err := m.db.Model(&types.Proposal{}). + Where("alerted = FALSE and proposal_status = ?", "Active"). + Select(&proposals) if err != nil { return err } + + // Send alerts for every proposal for _, proposal := range proposals { - if proposal.ProposalStatus == "Passed" || proposal.ProposalStatus == "Rejected" { - proposal.Alerted = true - _, err = m.db.Model(proposal).Where("id = ?", proposal.ID).Update() - if err != nil { - return err - } - continue - } - - raven.CaptureMessage("New governance proposal: "+proposal.Title+"\nDescription: "+proposal.Description+"\nStartHeight: "+proposal.VotingStartBlock, map[string]string{"height": strconv.FormatInt(proposal.Height, 10), "type": proposal.Type}) + raven.CaptureMessage(fmt.Sprintf("New governance proposal: %s\nDescription: %s\nStartHeight: %s", proposal.Title, proposal.Description, proposal.VotingStartBlock), + map[string]string{ + "height": strconv.FormatInt(proposal.Height, 10), + "type": proposal.Type, + }) + // Mark proposal as alerted in the db proposal.Alerted = true _, err = m.db.Model(proposal).Where("id = ?", proposal.ID).Update() if err != nil { return err } + + fmt.Printf("alerted on proposal #%s\n", proposal.ID) } return nil diff --git a/main.go b/main.go index 84f46a1..e58e5ca 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "github.com/tendermint/tendermint/types" "gopkg.in/resty.v1" "os" + "os/signal" "time" ) @@ -47,33 +48,50 @@ func main() { }) defer db.Close() - err := createSchema(db) - if err != nil { - //panic(err) - } + // Setup the database and ignore errors if the schema already exists + CreateSchema(db) + + // Configure resty + resty.SetTimeout(5 * time.Second) + + // Setup the monitor monitor := &Monitor{tClient, db} + + // Start the syncing task go func() { for { - err = monitor.sync() + fmt.Println("start - sync blockchain") + err := monitor.Sync() if err != nil { - fmt.Printf("error syncing: %v\n", err) + fmt.Printf("error - sync blockchain: %v\n", err) } + fmt.Println("finish - sync blockchain") time.Sleep(time.Second) } }() + // Allow graceful closing of the governance loop + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt) + for { select { case <-time.Tick(10 * time.Second): - err := monitor.getGovernance() + fmt.Println("start - sync governance proposals") + err := monitor.GetGovernance() if err != nil { - fmt.Printf("error parsing governance: %v\n", err) + fmt.Printf("error - sync governance proposals: %v\n", err) + continue } + fmt.Println("finish - sync governance proposals") + case <-signalCh: + return } } } -func createSchema(db *pg.DB) error { +// CreateSchema sets up the database using the ORM +func CreateSchema(db *pg.DB) error { for _, model := range []interface{}{(*ctypes.BlockInfo)(nil), (*ctypes.EvidenceInfo)(nil), (*ctypes.MissInfo)(nil), (*ctypes.Proposal)(nil)} { err := db.CreateTable(model, &orm.CreateTableOptions{}) if err != nil { @@ -83,7 +101,10 @@ func createSchema(db *pg.DB) error { return nil } -func (m *Monitor) sync() error { +// Sync syncs the blockchain and missed blocks from a node +func (m *Monitor) Sync() error { + + // Check current height in db var blocks []ctypes.BlockInfo err := m.db.Model(&blocks).Order("height DESC").Limit(1).Select() if err != nil { @@ -94,14 +115,16 @@ func (m *Monitor) sync() error { bestHeight = blocks[0].Height } + // Query the node for its height status, err := m.client.Status() if err != nil { return err } maxHeight := status.SyncInfo.LatestBlockHeight + // Ingest all blocks up to the best height for i := bestHeight + 1; i <= maxHeight; i++ { - err = m.ingestBlock(i) + err = m.IngestPrevBlock(i) if err != nil { return err } @@ -110,35 +133,39 @@ func (m *Monitor) sync() error { return nil } -func (m *Monitor) ingestBlock(height int64) error { +// IngestPrevBlock queries the block at the given height-1 from the node and ingests its metadata (blockinfo,evidence) +// into the database. It also queries the next block to access the commits and stores the missed signatures. +func (m *Monitor) IngestPrevBlock(height int64) error { prevHeight := height - 1 - // Get Data + // Get validator set for the block validators, err := m.client.Validators(&prevHeight) if err != nil { return err } + // Query the previous block block, err := m.client.Block(&prevHeight) if err != nil { return err } + // Query the next block to access the commits nextBlock, err := m.client.Block(&height) if err != nil { return err } + // Parse blockinfo blockInfo := new(ctypes.BlockInfo) - blockInfo.ID = nextBlock.BlockMeta.Header.LastBlockID.String() + blockInfo.ID = block.BlockMeta.BlockID.String() blockInfo.Height = height - blockInfo.Time = nextBlock.BlockMeta.Header.Time + blockInfo.Time = block.BlockMeta.Header.Time blockInfo.Proposer = block.Block.ProposerAddress.String() // Identify missed validators missedValidators := make([]*ctypes.MissInfo, 0) - // Parse for i, validator := range validators.Validators { if nextBlock.Block.LastCommit.Precommits[i] == nil { missed := &ctypes.MissInfo{ @@ -164,16 +191,21 @@ func (m *Monitor) ingestBlock(height int64) error { // Insert in DB err = m.db.RunInTransaction(func(tx *pg.Tx) error { + // Insert blockinfo err = tx.Insert(blockInfo) if err != nil { return err } + + // Insert evidence if len(evidenceInfo) > 0 { err = tx.Insert(&evidenceInfo) if err != nil { return err } } + + // Insert missed signatures if len(missedValidators) > 0 { err = tx.Insert(&missedValidators) if err != nil { @@ -189,18 +221,22 @@ func (m *Monitor) ingestBlock(height int64) error { return nil } -func (m *Monitor) getGovernance() error { +// GetGovernance queries the governance proposals from the lcd and stores them in the db +func (m *Monitor) GetGovernance() error { + // Query lcd resp, err := resty.R().Get(os.Getenv("LCD_URL") + "/gov/proposals") if err != nil { return err } + // Parse proposals var proposals []*ctypes.Proposal err = json.Unmarshal(resp.Body(), &proposals) if err != nil { return err } + // Copy proposal data into the database model for _, proposal := range proposals { proposal.ID = proposal.Details.ProposalID proposal.Height = proposal.Details.SubmitBlock @@ -212,6 +248,7 @@ func (m *Monitor) getGovernance() error { proposal.VotingStartBlock = proposal.Details.VotingStartBlock } - _, err = m.db.Model(&proposals).OnConflict("DO NOTHING").Insert() + // Store proposals in the db + _, err = m.db.Model(&proposals).OnConflict("(id) DO UPDATE").Set("proposal_status = EXCLUDED.proposal_status").Insert() return err } diff --git a/net_exporter/main.go b/net_exporter/main.go index 51b4e37..7a91daf 100644 --- a/net_exporter/main.go +++ b/net_exporter/main.go @@ -2,14 +2,14 @@ package main import ( "fmt" + "github.com/certusone/chain_exporter/types" "github.com/go-pg/pg" "github.com/go-pg/pg/orm" "github.com/pkg/errors" - "github.com/tendermint/tendermint/libs/flowrate" - "github.com/tendermint/tendermint/p2p/conn" "github.com/tendermint/tendermint/rpc/client" "net/url" "os" + "os/signal" "strconv" "strings" "time" @@ -20,24 +20,6 @@ type ( db *pg.DB clients map[string]*client.HTTP } - - PeerInfo struct { - ID int64 - Timestamp time.Time - Node string - - PeerID string `json:"id"` - ListenAddr string `json:"listen_addr"` - Network string `json:"network"` - Version string `json:"version"` - Channels string `json:"channels"` - Moniker string `json:"moniker"` - IsOutbound bool `json:"is_outbound"` - - SendData flowrate.Status - RecvData flowrate.Status - ChannelData []conn.ChannelStatus - } ) func main() { @@ -60,6 +42,7 @@ func main() { panic(errors.New("PERIOD needs to be a number")) } + // Setup the RPC clients clients := make(map[string]*client.HTTP) for _, item := range strings.Split(os.Getenv("GAIA_URLS"), ",") { tClient := client.NewHTTP(item, "/websocket") @@ -71,6 +54,7 @@ func main() { clients[hostname.Host] = tClient } + // Connect to the postgres datastore db := pg.Connect(&pg.Options{ Addr: os.Getenv("DB_HOST"), User: os.Getenv("DB_USER"), @@ -78,25 +62,32 @@ func main() { }) defer db.Close() - err := createSchema(db) - if err != nil { - //panic(err) - } + // Setup the database and ignore errors if the schema already exists + CreateSchema(db) + + // Setup monitor monitor := &Monitor{db, clients} + // Parse query period period, _ := strconv.Atoi(os.Getenv("PERIOD")) + + // Allow graceful closing of the process + signalCh := make(chan os.Signal, 1) + signal.Notify(signalCh, os.Interrupt) + + // Start the periodic syncing for { select { case <-time.Tick(time.Duration(period) * time.Second): - err := monitor.sync() - if err != nil { - fmt.Printf("error parsing governance: %v\n", err) - } + monitor.Sync() + case <-signalCh: + return } } } -func createSchema(db *pg.DB) error { - for _, model := range []interface{}{(*PeerInfo)(nil)} { +// CreateSchema sets up the database using the ORM +func CreateSchema(db *pg.DB) error { + for _, model := range []interface{}{(*types.PeerInfo)(nil)} { err := db.CreateTable(model, &orm.CreateTableOptions{}) if err != nil { return err @@ -105,28 +96,34 @@ func createSchema(db *pg.DB) error { return nil } -func (m *Monitor) sync() error { +// Sync queries and stores the netdata for each node listed +func (m *Monitor) Sync() { for name := range m.clients { go func(n string, client *client.HTTP) { - err := m.captureNetData(client, n) + err := m.CaptureNetData(client, n) if err != nil { - fmt.Printf("error parsing netData: %v\n", err) + fmt.Printf("error parsing netData for %s: %v\n", name, err) + return } + fmt.Printf("parsed netData for %s\n", name) + }(name, m.clients[name]) } - return nil } -func (m *Monitor) captureNetData(client *client.HTTP, name string) error { +// CaptureNetData queries a node's net_info and stores the information for each peer in the db +func (m *Monitor) CaptureNetData(client *client.HTTP, name string) error { // Get Data netInfo, err := client.NetInfo() if err != nil { return err } + // Use one timestamp to allow grouping timestamp := time.Now() for _, peer := range netInfo.Peers { - data := &PeerInfo{} + // Aggregate data + data := &types.PeerInfo{} data.Timestamp = timestamp data.Node = name @@ -142,6 +139,7 @@ func (m *Monitor) captureNetData(client *client.HTTP, name string) error { data.RecvData = peer.ConnectionStatus.RecvMonitor data.ChannelData = peer.ConnectionStatus.Channels + // Store data in postgres _, err = m.db.Model(data).Insert() if err != nil { fmt.Printf("error inserting netData: %v\n", err) diff --git a/types/types.go b/types/types.go index 80146de..fff2a50 100644 --- a/types/types.go +++ b/types/types.go @@ -1,6 +1,8 @@ package types import ( + "github.com/tendermint/tendermint/libs/flowrate" + "github.com/tendermint/tendermint/p2p/conn" "time" ) @@ -46,4 +48,22 @@ type ( VotingStartBlock string `json:"voting_start_block"` } `json:"value"` } + + PeerInfo struct { + ID int64 + Timestamp time.Time + Node string + + PeerID string `json:"id"` + ListenAddr string `json:"listen_addr"` + Network string `json:"network"` + Version string `json:"version"` + Channels string `json:"channels"` + Moniker string `json:"moniker"` + IsOutbound bool `json:"is_outbound";sql:",default:false,notnull"` + + SendData flowrate.Status + RecvData flowrate.Status + ChannelData []conn.ChannelStatus + } )