tendermint/tools/tm-monitor/monitor/node.go

261 lines
6.0 KiB
Go
Raw Normal View History

package monitor
2017-02-24 06:54:36 -08:00
import (
"encoding/json"
"math"
"time"
"github.com/pkg/errors"
crypto "github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/events"
"github.com/tendermint/tendermint/libs/log"
2017-02-24 06:54:36 -08:00
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpc_client "github.com/tendermint/tendermint/rpc/lib/client"
em "github.com/tendermint/tendermint/tools/tm-monitor/eventmeter"
tmtypes "github.com/tendermint/tendermint/types"
2017-02-24 06:54:36 -08:00
)
// maxRestarts is the limit checked by RestartEventMeterBackoff: once its
// attempt counter exceeds this value it gives up and returns an error.
const maxRestarts = 25
type Node struct {
rpcAddr string
IsValidator bool `json:"is_validator"` // validator or non-validator?
pubKey crypto.PubKey `json:"pub_key"`
2017-02-24 06:54:36 -08:00
Name string `json:"name"`
Online bool `json:"online"`
2018-04-03 04:02:09 -07:00
Height int64 `json:"height"`
BlockLatency float64 `json:"block_latency" amino:"unsafe"` // ms, interval between block commits
2017-02-24 06:54:36 -08:00
// em holds the ws connection. Each eventMeter callback is called in a separate go-routine.
em eventMeter
2017-03-06 06:35:52 -08:00
// rpcClient is an client for making RPC calls to TM
rpcClient rpc_client.HTTPClient
2017-02-24 06:54:36 -08:00
blockCh chan<- tmtypes.Header
blockLatencyCh chan<- float64
disconnectCh chan<- bool
2017-03-06 06:35:52 -08:00
checkIsValidatorInterval time.Duration
quit chan struct{}
logger log.Logger
2017-02-24 06:54:36 -08:00
}
2017-03-06 06:35:52 -08:00
func NewNode(rpcAddr string, options ...func(*Node)) *Node {
2017-02-24 06:54:36 -08:00
em := em.NewEventMeter(rpcAddr, UnmarshalEvent)
2017-04-20 16:18:38 -07:00
rpcClient := rpc_client.NewURIClient(rpcAddr) // HTTP client by default
rpcClient.SetCodec(cdc)
2017-03-06 06:35:52 -08:00
return NewNodeWithEventMeterAndRpcClient(rpcAddr, em, rpcClient, options...)
2017-02-24 06:54:36 -08:00
}
func NewNodeWithEventMeterAndRpcClient(rpcAddr string, em eventMeter, rpcClient rpc_client.HTTPClient, options ...func(*Node)) *Node {
2017-03-06 06:35:52 -08:00
n := &Node{
rpcAddr: rpcAddr,
em: em,
2017-03-06 06:35:52 -08:00
rpcClient: rpcClient,
Name: rpcAddr,
quit: make(chan struct{}),
2017-03-06 06:35:52 -08:00
checkIsValidatorInterval: 5 * time.Second,
logger: log.NewNopLogger(),
2017-03-06 06:35:52 -08:00
}
for _, option := range options {
option(n)
}
return n
}
// SetCheckIsValidatorInterval lets you change interval for checking whenever
// node is still a validator or not.
func SetCheckIsValidatorInterval(d time.Duration) func(n *Node) {
return func(n *Node) {
n.checkIsValidatorInterval = d
2017-02-24 06:54:36 -08:00
}
}
// SendBlocksTo sets the channel to which the new-block callback sends the
// header of each new block. The callback uses a blocking send and skips the
// send while the channel is nil.
// NOTE(review): the callback reads this field without synchronization, so
// this presumably has to be called before Start — confirm.
func (n *Node) SendBlocksTo(ch chan<- tmtypes.Header) {
n.blockCh = ch
}
// SendBlockLatenciesTo sets the channel to which the latency callback sends
// each latency value exactly as it receives it (the value is not converted to
// milliseconds before sending). The send is skipped while the channel is nil.
func (n *Node) SendBlockLatenciesTo(ch chan<- float64) {
n.blockLatencyCh = ch
}
// NotifyAboutDisconnects sets the channel to which the disconnect callback
// sends true each time it runs. The send is skipped while the channel is nil.
func (n *Node) NotifyAboutDisconnects(ch chan<- bool) {
n.disconnectCh = ch
}
// SetLogger replaces the node's logger with l and passes the same logger on
// to the node's event meter.
func (n *Node) SetLogger(l log.Logger) {
n.logger = l
n.em.SetLogger(l)
}
2017-02-24 06:54:36 -08:00
func (n *Node) Start() error {
if err := n.em.Start(); err != nil {
2017-02-24 06:54:36 -08:00
return err
}
n.em.RegisterLatencyCallback(latencyCallback(n))
2018-03-27 20:44:06 -07:00
err := n.em.Subscribe(tmtypes.EventQueryNewBlockHeader.String(), newBlockCallback(n))
2017-04-20 16:18:38 -07:00
if err != nil {
return err
}
2017-02-24 06:54:36 -08:00
n.em.RegisterDisconnectCallback(disconnectCallback(n))
n.Online = true
2017-03-06 06:35:52 -08:00
n.checkIsValidator()
go n.checkIsValidatorLoop()
2017-02-24 06:54:36 -08:00
return nil
}
func (n *Node) Stop() {
n.Online = false
n.em.Stop()
close(n.quit)
2017-02-24 06:54:36 -08:00
}
// implements eventmeter.EventCallbackFunc
func newBlockCallback(n *Node) em.EventCallbackFunc {
2017-07-29 10:50:33 -07:00
return func(metric *em.EventMetric, data interface{}) {
block := data.(tmtypes.TMEventData).(tmtypes.EventDataNewBlockHeader).Header
2017-02-24 06:54:36 -08:00
2018-04-03 04:02:09 -07:00
n.Height = block.Height
2017-08-03 13:01:28 -07:00
n.logger.Info("new block", "height", block.Height, "numTxs", block.NumTxs)
2017-02-24 06:54:36 -08:00
if n.blockCh != nil {
n.blockCh <- block
2017-02-24 06:54:36 -08:00
}
}
}
// implements eventmeter.EventLatencyFunc
func latencyCallback(n *Node) em.LatencyCallbackFunc {
return func(latency float64) {
n.BlockLatency = latency / 1000000.0 // ns to ms
2017-08-03 13:01:28 -07:00
n.logger.Info("new block latency", "latency", n.BlockLatency)
2017-02-24 06:54:36 -08:00
if n.blockLatencyCh != nil {
n.blockLatencyCh <- latency
}
}
}
// implements eventmeter.DisconnectCallbackFunc
func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
return func() {
n.Online = false
n.logger.Info("status", "down")
2017-02-24 06:54:36 -08:00
if n.disconnectCh != nil {
n.disconnectCh <- true
}
}
}
func (n *Node) RestartEventMeterBackoff() error {
2017-02-24 06:54:36 -08:00
attempt := 0
for {
d := time.Duration(math.Exp2(float64(attempt)))
time.Sleep(d * time.Second)
2017-03-21 09:37:52 -07:00
if err := n.em.Start(); err != nil {
2017-08-03 13:01:28 -07:00
n.logger.Info("restart failed", "err", err)
2017-02-24 06:54:36 -08:00
} else {
// TODO: authenticate pubkey
return nil
}
attempt++
if attempt > maxRestarts {
return errors.New("Reached max restarts")
2017-02-24 06:54:36 -08:00
}
}
}
2018-04-03 04:02:09 -07:00
// NumValidators queries the node for its validators and returns the block
// height of the result together with the number of validators. On failure it
// returns zero values and the error from the query.
func (n *Node) NumValidators() (height int64, num int, err error) {
	h, vals, queryErr := n.validators()
	if queryErr != nil {
		return 0, 0, queryErr
	}

	return h, len(vals), nil
}
2018-04-03 04:02:09 -07:00
func (n *Node) validators() (height int64, validators []*tmtypes.Validator, err error) {
vals := new(ctypes.ResultValidators)
if _, err = n.rpcClient.Call("validators", nil, vals); err != nil {
return 0, make([]*tmtypes.Validator, 0), err
}
2018-04-03 04:02:09 -07:00
return vals.BlockHeight, vals.Validators, nil
}
2017-03-06 06:35:52 -08:00
func (n *Node) checkIsValidatorLoop() {
for {
select {
case <-n.quit:
return
2017-03-06 06:35:52 -08:00
case <-time.After(n.checkIsValidatorInterval):
n.checkIsValidator()
}
}
}
func (n *Node) checkIsValidator() {
_, validators, err := n.validators()
if err == nil {
for _, v := range validators {
2018-04-03 02:56:00 -07:00
key, err1 := n.getPubKey()
if err1 == nil && v.PubKey.Equals(key) {
2017-03-06 06:35:52 -08:00
n.IsValidator = true
}
}
2017-03-06 06:35:52 -08:00
} else {
2017-08-03 13:01:28 -07:00
n.logger.Info("check is validator failed", "err", err)
}
}
// getPubKey returns the node's public key. The first successful call obtains
// it from the "status" RPC method and stores it on the node; later calls
// return the stored key without another RPC call. A failed RPC call returns
// a nil key and the error, and stores nothing.
func (n *Node) getPubKey() (crypto.PubKey, error) {
	if n.pubKey == nil {
		var status ctypes.ResultStatus
		if _, err := n.rpcClient.Call("status", nil, &status); err != nil {
			return nil, err
		}
		n.pubKey = status.ValidatorInfo.PubKey
	}

	return n.pubKey, nil
}
2017-02-24 06:54:36 -08:00
type eventMeter interface {
Start() error
Stop()
2017-02-24 06:54:36 -08:00
RegisterLatencyCallback(em.LatencyCallbackFunc)
RegisterDisconnectCallback(em.DisconnectCallbackFunc)
Subscribe(string, em.EventCallbackFunc) error
Unsubscribe(string) error
SetLogger(l log.Logger)
2017-02-24 06:54:36 -08:00
}
// UnmarshalEvent unmarshals a json event
2017-02-24 06:54:36 -08:00
func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) {
event := new(ctypes.ResultEvent)
if err := cdc.UnmarshalJSON(b, event); err != nil {
2017-02-24 06:54:36 -08:00
return "", nil, err
}
2018-03-27 20:44:06 -07:00
return event.Query, event.Data, nil
2017-02-24 06:54:36 -08:00
}