258 lines
6.2 KiB
Go
258 lines
6.2 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
ctypes "github.com/certusone/chain_exporter/types"
|
|
"github.com/go-pg/pg"
|
|
"github.com/go-pg/pg/orm"
|
|
"github.com/pkg/errors"
|
|
"github.com/tendermint/tendermint/rpc/client"
|
|
"github.com/tendermint/tendermint/types"
|
|
"gopkg.in/resty.v1"
|
|
"os"
|
|
"os/signal"
|
|
"time"
|
|
)
|
|
|
|
type (
|
|
Monitor struct {
|
|
client *client.HTTP
|
|
db *pg.DB
|
|
}
|
|
)
|
|
|
|
func main() {
|
|
if os.Getenv("GAIA_URL") == "" {
|
|
panic(errors.New("GAIA_URL needs to be set"))
|
|
}
|
|
if os.Getenv("DB_HOST") == "" {
|
|
panic(errors.New("DB_HOST needs to be set"))
|
|
}
|
|
if os.Getenv("DB_USER") == "" {
|
|
panic(errors.New("DB_USER needs to be set"))
|
|
}
|
|
if os.Getenv("DB_PW") == "" {
|
|
panic(errors.New("DB_PW needs to be set"))
|
|
}
|
|
if os.Getenv("LCD_URL") == "" {
|
|
panic(errors.New("LCD_URL needs to be set"))
|
|
}
|
|
|
|
tClient := client.NewHTTP(os.Getenv("GAIA_URL"), "/websocket")
|
|
|
|
db := pg.Connect(&pg.Options{
|
|
Addr: os.Getenv("DB_HOST"),
|
|
User: os.Getenv("DB_USER"),
|
|
Password: os.Getenv("DB_PW"),
|
|
})
|
|
defer db.Close()
|
|
|
|
// Setup the database and ignore errors if the schema already exists
|
|
err := CreateSchema(db)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
// Configure resty
|
|
resty.SetTimeout(5 * time.Second)
|
|
|
|
// Setup the monitor
|
|
monitor := &Monitor{tClient, db}
|
|
|
|
// Start the syncing task
|
|
go func() {
|
|
for {
|
|
fmt.Println("start - sync blockchain")
|
|
err := monitor.Sync()
|
|
if err != nil {
|
|
fmt.Printf("error - sync blockchain: %v\n", err)
|
|
}
|
|
fmt.Println("finish - sync blockchain")
|
|
time.Sleep(time.Second)
|
|
}
|
|
}()
|
|
|
|
// Allow graceful closing of the governance loop
|
|
signalCh := make(chan os.Signal, 1)
|
|
signal.Notify(signalCh, os.Interrupt)
|
|
|
|
for {
|
|
select {
|
|
case <-time.Tick(10 * time.Second):
|
|
fmt.Println("start - sync governance proposals")
|
|
err := monitor.GetGovernance()
|
|
if err != nil {
|
|
fmt.Printf("error - sync governance proposals: %v\n", err)
|
|
continue
|
|
}
|
|
fmt.Println("finish - sync governance proposals")
|
|
case <-signalCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// CreateSchema sets up the database using the ORM
|
|
func CreateSchema(db *pg.DB) error {
|
|
for _, model := range []interface{}{(*ctypes.BlockInfo)(nil), (*ctypes.EvidenceInfo)(nil), (*ctypes.MissInfo)(nil), (*ctypes.Proposal)(nil)} {
|
|
err := db.CreateTable(model, &orm.CreateTableOptions{IfNotExists: true})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Sync syncs the blockchain and missed blocks from a node
|
|
func (m *Monitor) Sync() error {
|
|
|
|
// Check current height in db
|
|
var blocks []ctypes.BlockInfo
|
|
err := m.db.Model(&blocks).Order("height DESC").Limit(1).Select()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
bestHeight := int64(1)
|
|
if len(blocks) > 0 {
|
|
bestHeight = blocks[0].Height
|
|
}
|
|
|
|
// Query the node for its height
|
|
status, err := m.client.Status()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
maxHeight := status.SyncInfo.LatestBlockHeight
|
|
|
|
// Ingest all blocks up to the best height
|
|
for i := bestHeight + 1; i <= maxHeight; i++ {
|
|
err = m.IngestPrevBlock(i)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fmt.Printf("synced block %d/%d \n", i, maxHeight)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IngestPrevBlock queries the block at the given height-1 from the node and ingests its metadata (blockinfo,evidence)
|
|
// into the database. It also queries the next block to access the commits and stores the missed signatures.
|
|
func (m *Monitor) IngestPrevBlock(height int64) error {
|
|
prevHeight := height - 1
|
|
|
|
// Get validator set for the block
|
|
validators, err := m.client.Validators(&prevHeight)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Query the previous block
|
|
block, err := m.client.Block(&prevHeight)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Query the next block to access the commits
|
|
nextBlock, err := m.client.Block(&height)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Parse blockinfo
|
|
blockInfo := new(ctypes.BlockInfo)
|
|
blockInfo.ID = block.BlockMeta.BlockID.String()
|
|
blockInfo.Height = height
|
|
blockInfo.Time = block.BlockMeta.Header.Time
|
|
blockInfo.Proposer = block.Block.ProposerAddress.String()
|
|
|
|
// Identify missed validators
|
|
missedValidators := make([]*ctypes.MissInfo, 0)
|
|
|
|
for i, validator := range validators.Validators {
|
|
if nextBlock.Block.LastCommit.Precommits[i] == nil {
|
|
missed := &ctypes.MissInfo{
|
|
Height: block.BlockMeta.Header.Height,
|
|
Address: validator.Address.String(),
|
|
Alerted: false,
|
|
Time: block.BlockMeta.Header.Time,
|
|
Proposer: block.BlockMeta.Header.ProposerAddress.String(),
|
|
}
|
|
missedValidators = append(missedValidators, missed)
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Collect evidence
|
|
evidenceInfo := make([]*ctypes.EvidenceInfo, 0)
|
|
for _, evidence := range nextBlock.Block.Evidence.Evidence {
|
|
evInfo := &ctypes.EvidenceInfo{}
|
|
evInfo.Address = types.Address(evidence.Address()).String()
|
|
evInfo.Height = evidence.Height()
|
|
evidenceInfo = append(evidenceInfo, evInfo)
|
|
}
|
|
|
|
// Insert in DB
|
|
err = m.db.RunInTransaction(func(tx *pg.Tx) error {
|
|
// Insert blockinfo
|
|
err = tx.Insert(blockInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Insert evidence
|
|
if len(evidenceInfo) > 0 {
|
|
err = tx.Insert(&evidenceInfo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Insert missed signatures
|
|
if len(missedValidators) > 0 {
|
|
err = tx.Insert(&missedValidators)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetGovernance queries the governance proposals from the lcd and stores them in the db
|
|
func (m *Monitor) GetGovernance() error {
|
|
// Query lcd
|
|
resp, err := resty.R().Get(os.Getenv("LCD_URL") + "/gov/proposals")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Parse proposals
|
|
var proposals []*ctypes.Proposal
|
|
err = json.Unmarshal(resp.Body(), &proposals)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Copy proposal data into the database model
|
|
for _, proposal := range proposals {
|
|
proposal.ID = proposal.Details.ProposalID
|
|
proposal.Height = proposal.Details.SubmitBlock
|
|
proposal.Alerted = false
|
|
proposal.Description = proposal.Details.Description
|
|
proposal.ProposalStatus = proposal.Details.ProposalStatus
|
|
proposal.ProposalType = proposal.Details.ProposalType
|
|
proposal.Title = proposal.Details.Title
|
|
proposal.VotingStartBlock = proposal.Details.VotingStartBlock
|
|
}
|
|
|
|
// Store proposals in the db
|
|
_, err = m.db.Model(&proposals).OnConflict("(id) DO UPDATE").Set("proposal_status = EXCLUDED.proposal_status").Insert()
|
|
return err
|
|
}
|