[tm-monitor] update WSClient
This commit is contained in:
parent
af04238bb9
commit
330f38a77a
|
@ -1,30 +1,28 @@
|
|||
// eventmeter - generic system to subscribe to events and record their frequency.
|
||||
package eventmeter
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/pkg/errors"
|
||||
metrics "github.com/rcrowley/go-metrics"
|
||||
client "github.com/tendermint/tendermint/rpc/lib/client"
|
||||
"github.com/tendermint/tmlibs/events"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
)
|
||||
|
||||
//------------------------------------------------------
|
||||
// Generic system to subscribe to events and record their frequency
|
||||
//------------------------------------------------------
|
||||
const (
|
||||
// Get ping/pong latency and call LatencyCallbackFunc with this period.
|
||||
latencyPeriod = 1 * time.Second
|
||||
|
||||
//------------------------------------------------------
|
||||
// Meter for a particular event
|
||||
// Check whether the WS client is connected with this period.
|
||||
connectionCheckPeriod = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
// Closure to enable side effects from receiving an event
|
||||
type EventCallbackFunc func(em *EventMetric, data interface{})
|
||||
|
||||
// Metrics for a given event
|
||||
// EventMetric exposes metrics for an event.
|
||||
type EventMetric struct {
|
||||
ID string `json:"id"`
|
||||
Started time.Time `json:"start_time"`
|
||||
|
@ -42,8 +40,8 @@ type EventMetric struct {
|
|||
Rate15 float64 `json:"rate_15" wire:"unsafe"`
|
||||
RateMean float64 `json:"rate_mean" wire:"unsafe"`
|
||||
|
||||
// so the event can have effects in the event-meter's consumer.
|
||||
// runs in a go routine
|
||||
// so the event can have effects in the eventmeter's consumer. runs in a go
|
||||
// routine.
|
||||
callback EventCallbackFunc
|
||||
}
|
||||
|
||||
|
@ -63,35 +61,32 @@ func (metric *EventMetric) fillMetric() *EventMetric {
|
|||
return metric
|
||||
}
|
||||
|
||||
//------------------------------------------------------
|
||||
// Websocket client and event meter for many events
|
||||
// EventCallbackFunc is a closure to enable side effects from receiving an
|
||||
// event.
|
||||
type EventCallbackFunc func(em *EventMetric, data interface{})
|
||||
|
||||
const maxPingsPerPong = 30 // if we haven't received a pong in this many attempted pings we kill the conn
|
||||
|
||||
// Get the eventID and data out of the raw json received over the go-rpc websocket
|
||||
// EventUnmarshalFunc is a closure to get the eventType and data out of the raw
|
||||
// JSON received over the RPC WebSocket.
|
||||
type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)
|
||||
|
||||
// Closure to enable side effects from receiving a pong
|
||||
// LatencyCallbackFunc is a closure to enable side effects from receiving a latency.
|
||||
type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)
|
||||
|
||||
// Closure to notify consumer that the connection died
|
||||
// DisconnectCallbackFunc is a closure to notify a consumer that the connection
|
||||
// has died.
|
||||
type DisconnectCallbackFunc func()
|
||||
|
||||
// Each node gets an event meter to track events for that node
|
||||
// EventMeter tracks events, reports latency and disconnects.
|
||||
type EventMeter struct {
|
||||
wsc *client.WSClient
|
||||
|
||||
mtx sync.Mutex
|
||||
events map[string]*EventMetric
|
||||
|
||||
// to record ws latency
|
||||
timer metrics.Timer
|
||||
lastPing time.Time
|
||||
receivedPong bool
|
||||
unmarshalEvent EventUnmarshalFunc
|
||||
latencyCallback LatencyCallbackFunc
|
||||
disconnectCallback DisconnectCallbackFunc
|
||||
|
||||
unmarshalEvent EventUnmarshalFunc
|
||||
subscribed bool
|
||||
|
||||
quit chan struct{}
|
||||
|
||||
|
@ -99,54 +94,44 @@ type EventMeter struct {
|
|||
}
|
||||
|
||||
func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
|
||||
em := &EventMeter{
|
||||
wsc: client.NewWSClient(addr, "/websocket"),
|
||||
return &EventMeter{
|
||||
wsc: client.NewWSClient(addr, "/websocket", client.PingPong(1*time.Second, 2*time.Second)),
|
||||
events: make(map[string]*EventMetric),
|
||||
timer: metrics.NewTimer(),
|
||||
receivedPong: true,
|
||||
unmarshalEvent: unmarshalEvent,
|
||||
logger: log.NewNopLogger(),
|
||||
}
|
||||
return em
|
||||
}
|
||||
|
||||
// SetLogger lets you set your own logger
|
||||
// SetLogger lets you set your own logger.
|
||||
func (em *EventMeter) SetLogger(l log.Logger) {
|
||||
em.logger = l
|
||||
em.wsc.SetLogger(l.With("module", "rpcclient"))
|
||||
}
|
||||
|
||||
// String returns a string representation of event meter.
|
||||
func (em *EventMeter) String() string {
|
||||
return em.wsc.Address
|
||||
}
|
||||
|
||||
// Start boots up event meter.
|
||||
func (em *EventMeter) Start() error {
|
||||
if _, err := em.wsc.Reset(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := em.wsc.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
em.wsc.Conn.SetPongHandler(func(m string) error {
|
||||
// NOTE: https://github.com/gorilla/websocket/issues/97
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
em.receivedPong = true
|
||||
em.timer.UpdateSince(em.lastPing)
|
||||
if em.latencyCallback != nil {
|
||||
go em.latencyCallback(em.timer.Mean())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
em.quit = make(chan struct{})
|
||||
go em.receiveRoutine()
|
||||
go em.disconnectRoutine()
|
||||
|
||||
return em.resubscribe()
|
||||
err := em.subscribe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
em.subscribed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop stops the EventMeter.
|
||||
// Stop stops event meter.
|
||||
func (em *EventMeter) Stop() {
|
||||
close(em.quit)
|
||||
|
||||
|
@ -158,9 +143,7 @@ func (em *EventMeter) Stop() {
|
|||
// StopAndCallDisconnectCallback stops the EventMeter and calls
|
||||
// disconnectCallback if present.
|
||||
func (em *EventMeter) StopAndCallDisconnectCallback() {
|
||||
if em.wsc.IsRunning() {
|
||||
em.wsc.Stop()
|
||||
}
|
||||
em.Stop()
|
||||
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
|
@ -169,74 +152,70 @@ func (em *EventMeter) StopAndCallDisconnectCallback() {
|
|||
}
|
||||
}
|
||||
|
||||
func (em *EventMeter) Subscribe(eventID string, cb EventCallbackFunc) error {
|
||||
// Subscribe for the given event type. Callback function will be called upon
|
||||
// receiving an event.
|
||||
func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
|
||||
if _, ok := em.events[eventID]; ok {
|
||||
if _, ok := em.events[eventType]; ok {
|
||||
return fmt.Errorf("subscribtion already exists")
|
||||
}
|
||||
if err := em.wsc.Subscribe(eventID); err != nil {
|
||||
if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
metric := &EventMetric{
|
||||
ID: eventID,
|
||||
Started: time.Now(),
|
||||
MinDuration: 1 << 62,
|
||||
meter: metrics.NewMeter(),
|
||||
callback: cb,
|
||||
}
|
||||
em.events[eventID] = metric
|
||||
em.events[eventType] = metric
|
||||
return nil
|
||||
}
|
||||
|
||||
func (em *EventMeter) Unsubscribe(eventID string) error {
|
||||
// Unsubscribe from the given event type.
|
||||
func (em *EventMeter) Unsubscribe(eventType string) error {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
if err := em.wsc.Unsubscribe(eventID); err != nil {
|
||||
if err := em.wsc.Unsubscribe(context.TODO(), eventType); err != nil {
|
||||
return err
|
||||
}
|
||||
// XXX: should we persist or save this info first?
|
||||
delete(em.events, eventID)
|
||||
delete(em.events, eventType)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fill in the latest data for an event and return a copy
|
||||
func (em *EventMeter) GetMetric(eventID string) (*EventMetric, error) {
|
||||
// GetMetric fills in the latest data for an event and returns a copy.
|
||||
func (em *EventMeter) GetMetric(eventType string) (*EventMetric, error) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
metric, ok := em.events[eventID]
|
||||
metric, ok := em.events[eventType]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Unknown event %s", eventID)
|
||||
return nil, fmt.Errorf("unknown event: %s", eventType)
|
||||
}
|
||||
return metric.fillMetric().Copy(), nil
|
||||
}
|
||||
|
||||
// Return the average latency over the websocket
|
||||
func (em *EventMeter) Latency() float64 {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
return em.timer.Mean()
|
||||
}
|
||||
|
||||
// RegisterLatencyCallback allows you to set latency callback.
|
||||
func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
em.latencyCallback = f
|
||||
}
|
||||
|
||||
// RegisterDisconnectCallback allows you to set disconnect callback.
|
||||
func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
em.disconnectCallback = f
|
||||
}
|
||||
|
||||
//------------------------------------------------------
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// Private
|
||||
|
||||
func (em *EventMeter) resubscribe() error {
|
||||
for eventID, _ := range em.events {
|
||||
if err := em.wsc.Subscribe(eventID); err != nil {
|
||||
func (em *EventMeter) subscribe() error {
|
||||
for eventType, _ := range em.events {
|
||||
if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -244,39 +223,32 @@ func (em *EventMeter) resubscribe() error {
|
|||
}
|
||||
|
||||
func (em *EventMeter) receiveRoutine() {
|
||||
pingTime := time.Second * 1
|
||||
pingTicker := time.NewTicker(pingTime)
|
||||
pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn
|
||||
|
||||
var err error
|
||||
latencyTicker := time.NewTicker(latencyPeriod)
|
||||
for {
|
||||
select {
|
||||
case <-pingTicker.C:
|
||||
if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil {
|
||||
em.logger.Error("err", errors.Wrap(err, "failed to write ping message on websocket"))
|
||||
em.StopAndCallDisconnectCallback()
|
||||
return
|
||||
} else if pingAttempts >= maxPingsPerPong {
|
||||
em.logger.Error("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
|
||||
em.StopAndCallDisconnectCallback()
|
||||
return
|
||||
}
|
||||
case r := <-em.wsc.ResultsCh:
|
||||
if r == nil {
|
||||
em.logger.Error("err", errors.New("Expected some event, received nil"))
|
||||
em.StopAndCallDisconnectCallback()
|
||||
return
|
||||
}
|
||||
eventID, data, err := em.unmarshalEvent(r)
|
||||
if err != nil {
|
||||
em.logger.Error("err", errors.Wrap(err, "failed to unmarshal event"))
|
||||
case rawEvent := <-em.wsc.ResultsCh:
|
||||
if rawEvent == nil {
|
||||
em.logger.Error("expected some event, got nil")
|
||||
continue
|
||||
}
|
||||
if eventID != "" {
|
||||
em.updateMetric(eventID, data)
|
||||
eventType, data, err := em.unmarshalEvent(rawEvent)
|
||||
if err != nil {
|
||||
em.logger.Error("failed to unmarshal event", "err", err)
|
||||
continue
|
||||
}
|
||||
if eventType != "" { // FIXME how can it be an empty string?
|
||||
em.updateMetric(eventType, data)
|
||||
}
|
||||
case err := <-em.wsc.ErrorsCh:
|
||||
if err != nil {
|
||||
em.logger.Error("expected some event, got error", "err", err)
|
||||
}
|
||||
case <-latencyTicker.C:
|
||||
if em.wsc.IsActive() {
|
||||
em.latencyCallback(em.wsc.PingPongLatencyTimer.Mean())
|
||||
}
|
||||
case <-em.wsc.Quit:
|
||||
em.logger.Error("err", errors.New("WSClient closed unexpectedly"))
|
||||
em.logger.Error("WebSocket client closed unexpectedly")
|
||||
em.StopAndCallDisconnectCallback()
|
||||
return
|
||||
case <-em.quit:
|
||||
|
@ -285,29 +257,33 @@ func (em *EventMeter) receiveRoutine() {
|
|||
}
|
||||
}
|
||||
|
||||
func (em *EventMeter) pingForLatency(pingAttempts int) (int, error) {
|
||||
func (em *EventMeter) disconnectRoutine() {
|
||||
ticker := time.NewTicker(connectionCheckPeriod)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once
|
||||
em.mtx.Lock()
|
||||
if em.disconnectCallback != nil {
|
||||
go em.disconnectCallback()
|
||||
}
|
||||
em.mtx.Unlock()
|
||||
em.subscribed = false
|
||||
} else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe
|
||||
em.subscribe()
|
||||
em.subscribed = true
|
||||
}
|
||||
case <-em.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (em *EventMeter) updateMetric(eventType string, data events.EventData) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
|
||||
// ping to record latency
|
||||
if !em.receivedPong {
|
||||
return pingAttempts + 1, nil
|
||||
}
|
||||
|
||||
em.lastPing = time.Now()
|
||||
em.receivedPong = false
|
||||
err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{})
|
||||
if err != nil {
|
||||
return pingAttempts, err
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (em *EventMeter) updateMetric(eventID string, data events.EventData) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
|
||||
metric, ok := em.events[eventID]
|
||||
metric, ok := em.events[eventType]
|
||||
if !ok {
|
||||
// we already unsubscribed, or got an unexpected event
|
||||
return
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
hash: 30b649bc544a4ebd2b2a6188ce314cb72e6c28be8f3e57ec22e7cb83fd974814
|
||||
updated: 2017-07-29T18:47:33.199177142Z
|
||||
hash: 3e3601085c1862570e37cfa6c7024df03e3921a43dc78be08372b1e9fbb1620d
|
||||
updated: 2017-08-04T14:44:28.484044469Z
|
||||
imports:
|
||||
- name: github.com/btcsuite/btcd
|
||||
version: 583684b21bfbde9b5fc4403916fd7c807feb0289
|
||||
|
@ -13,6 +13,12 @@ imports:
|
|||
- log/term
|
||||
- name: github.com/go-logfmt/logfmt
|
||||
version: 390ab7935ee28ec6b286364bba9b4dd6410cb3d5
|
||||
- name: github.com/go-playground/locales
|
||||
version: 1e5f1161c6416a5ff48840eb8724a394e48cc534
|
||||
subpackages:
|
||||
- currency
|
||||
- name: github.com/go-playground/universal-translator
|
||||
version: 71201497bace774495daed26a3874fd339e0b538
|
||||
- name: github.com/go-stack/stack
|
||||
version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82
|
||||
- name: github.com/golang/protobuf
|
||||
|
@ -45,7 +51,7 @@ imports:
|
|||
subpackages:
|
||||
- data
|
||||
- name: github.com/tendermint/tendermint
|
||||
version: e9b7221292afe25ce956ea85ab83bb5708eb2992
|
||||
version: 0013053fae3fb7611c392ebcff15352bb7ec717b
|
||||
subpackages:
|
||||
- config
|
||||
- p2p
|
||||
|
@ -56,7 +62,7 @@ imports:
|
|||
- rpc/lib/types
|
||||
- types
|
||||
- name: github.com/tendermint/tmlibs
|
||||
version: 2f6f3e6aa70bb19b70a6e73210273fa127041070
|
||||
version: 75372988e737a9f672c0e7f6308042620bd3e151
|
||||
subpackages:
|
||||
- common
|
||||
- events
|
||||
|
@ -97,6 +103,8 @@ imports:
|
|||
- stats
|
||||
- tap
|
||||
- transport
|
||||
- name: gopkg.in/go-playground/validator.v9
|
||||
version: 0f6f568263a1ab5105b57f66f446d2625e4f545c
|
||||
testImports:
|
||||
- name: github.com/davecgh/go-spew
|
||||
version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9
|
||||
|
|
|
@ -5,7 +5,7 @@ import:
|
|||
- package: github.com/rcrowley/go-metrics
|
||||
- package: github.com/tendermint/go-crypto
|
||||
- package: github.com/tendermint/tendermint
|
||||
version: develop
|
||||
version: 0013053fae3fb7611c392ebcff15352bb7ec717b
|
||||
subpackages:
|
||||
- rpc/core/types
|
||||
- rpc/lib/client
|
||||
|
|
|
@ -47,7 +47,7 @@ Examples:
|
|||
}
|
||||
|
||||
if noton {
|
||||
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "tm-monitor")
|
||||
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
|
||||
}
|
||||
|
||||
m := startMonitor(flag.Arg(0))
|
||||
|
|
|
@ -129,7 +129,7 @@ func newBlockCallback(n *Node) em.EventCallbackFunc {
|
|||
block := data.(tmtypes.TMEventData).Unwrap().(tmtypes.EventDataNewBlockHeader).Header
|
||||
|
||||
n.Height = uint64(block.Height)
|
||||
n.logger.Info("event", "new block", "height", block.Height, "numTxs", block.NumTxs)
|
||||
n.logger.Info("new block", "height", block.Height, "numTxs", block.NumTxs)
|
||||
|
||||
if n.blockCh != nil {
|
||||
n.blockCh <- *block
|
||||
|
@ -141,7 +141,7 @@ func newBlockCallback(n *Node) em.EventCallbackFunc {
|
|||
func latencyCallback(n *Node) em.LatencyCallbackFunc {
|
||||
return func(latency float64) {
|
||||
n.BlockLatency = latency / 1000000.0 // ns to ms
|
||||
n.logger.Info("event", "new block latency", "latency", n.BlockLatency)
|
||||
n.logger.Info("new block latency", "latency", n.BlockLatency)
|
||||
|
||||
if n.blockLatencyCh != nil {
|
||||
n.blockLatencyCh <- latency
|
||||
|
@ -158,17 +158,6 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
|
|||
if n.disconnectCh != nil {
|
||||
n.disconnectCh <- true
|
||||
}
|
||||
|
||||
if err := n.RestartEventMeterBackoff(); err != nil {
|
||||
n.logger.Info("err", errors.Wrap(err, "restart failed"))
|
||||
} else {
|
||||
n.Online = true
|
||||
n.logger.Info("status", "online")
|
||||
|
||||
if n.disconnectCh != nil {
|
||||
n.disconnectCh <- false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -180,7 +169,7 @@ func (n *Node) RestartEventMeterBackoff() error {
|
|||
time.Sleep(d * time.Second)
|
||||
|
||||
if err := n.em.Start(); err != nil {
|
||||
n.logger.Info("err", errors.Wrap(err, "restart failed"))
|
||||
n.logger.Info("restart failed", "err", err)
|
||||
} else {
|
||||
// TODO: authenticate pubkey
|
||||
return nil
|
||||
|
@ -231,7 +220,7 @@ func (n *Node) checkIsValidator() {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
n.logger.Info("err", errors.Wrap(err, "check is validator failed"))
|
||||
n.logger.Info("check is validator failed", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,9 +12,8 @@ import (
|
|||
func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) {
|
||||
routes := routes(m)
|
||||
|
||||
// serve http and ws
|
||||
mux := http.NewServeMux()
|
||||
wm := rpc.NewWebsocketManager(routes, nil) // TODO: evsw
|
||||
wm := rpc.NewWebsocketManager(routes, nil)
|
||||
mux.HandleFunc("/websocket", wm.WebsocketHandler)
|
||||
rpc.RegisterRPCFuncs(mux, routes, logger)
|
||||
if _, err := rpc.StartHTTPServer(listenAddr, mux, logger); err != nil {
|
||||
|
|
Loading…
Reference in New Issue