From 330f38a77afe3168d9a425be9b53c1a6c82982a4 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 3 Aug 2017 16:01:28 -0400 Subject: [PATCH] [tm-monitor] update WSClient --- tm-monitor/eventmeter/eventmeter.go | 226 +++++++++------------ tm-monitor/glide.lock | 16 +- tm-monitor/glide.yaml | 2 +- tm-monitor/main.go | 2 +- tm-monitor/mock/{mock.go => eventmeter.go} | 0 tm-monitor/monitor/node.go | 19 +- tm-monitor/rpc.go | 3 +- 7 files changed, 120 insertions(+), 148 deletions(-) rename tm-monitor/mock/{mock.go => eventmeter.go} (100%) diff --git a/tm-monitor/eventmeter/eventmeter.go b/tm-monitor/eventmeter/eventmeter.go index 12742383..9fe56fa6 100644 --- a/tm-monitor/eventmeter/eventmeter.go +++ b/tm-monitor/eventmeter/eventmeter.go @@ -1,30 +1,28 @@ +// eventmeter - generic system to subscribe to events and record their frequency. package eventmeter import ( + "context" "encoding/json" "fmt" "sync" "time" - "github.com/gorilla/websocket" - "github.com/pkg/errors" metrics "github.com/rcrowley/go-metrics" client "github.com/tendermint/tendermint/rpc/lib/client" "github.com/tendermint/tmlibs/events" "github.com/tendermint/tmlibs/log" ) -//------------------------------------------------------ -// Generic system to subscribe to events and record their frequency -//------------------------------------------------------ +const ( + // Get ping/pong latency and call LatencyCallbackFunc with this period. + latencyPeriod = 1 * time.Second -//------------------------------------------------------ -// Meter for a particular event + // Check if the WS client is connected every + connectionCheckPeriod = 100 * time.Millisecond +) -// Closure to enable side effects from receiving an event -type EventCallbackFunc func(em *EventMetric, data interface{}) - -// Metrics for a given event +// EventMetric exposes metrics for an event. type EventMetric struct { ID string `json:"id"` Started time.Time `json:"start_time"` @@ -42,8 +40,8 @@ type EventMetric struct { Rate15 float64 `json:"rate_15" wire:"unsafe"` RateMean float64 `json:"rate_mean" wire:"unsafe"` - // so the event can have effects in the event-meter's consumer. - // runs in a go routine + // so the event can have effects in the eventmeter's consumer. runs in a go + // routine. callback EventCallbackFunc } @@ -63,35 +61,32 @@ func (metric *EventMetric) fillMetric() *EventMetric { return metric } -//------------------------------------------------------ -// Websocket client and event meter for many events +// EventCallbackFunc is a closure to enable side effects from receiving an +// event. +type EventCallbackFunc func(em *EventMetric, data interface{}) -const maxPingsPerPong = 30 // if we haven't received a pong in this many attempted pings we kill the conn - -// Get the eventID and data out of the raw json received over the go-rpc websocket +// EventUnmarshalFunc is a closure to get the eventType and data out of the raw +// JSON received over the RPC WebSocket. type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error) -// Closure to enable side effects from receiving a pong +// LatencyCallbackFunc is a closure to enable side effects from receiving a latency. type LatencyCallbackFunc func(meanLatencyNanoSeconds float64) -// Closure to notify consumer that the connection died +// DisconnectCallbackFunc is a closure to notify a consumer that the connection +// has died. type DisconnectCallbackFunc func() -// Each node gets an event meter to track events for that node +// EventMeter tracks events, reports latency and disconnects. type EventMeter struct { wsc *client.WSClient mtx sync.Mutex events map[string]*EventMetric - // to record ws latency - timer metrics.Timer - lastPing time.Time - receivedPong bool + unmarshalEvent EventUnmarshalFunc latencyCallback LatencyCallbackFunc disconnectCallback DisconnectCallbackFunc - - unmarshalEvent EventUnmarshalFunc + subscribed bool quit chan struct{} @@ -99,54 +94,44 @@ type EventMeter struct { } func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter { - em := &EventMeter{ - wsc: client.NewWSClient(addr, "/websocket"), + return &EventMeter{ + wsc: client.NewWSClient(addr, "/websocket", client.PingPong(1*time.Second, 2*time.Second)), events: make(map[string]*EventMetric), - timer: metrics.NewTimer(), - receivedPong: true, unmarshalEvent: unmarshalEvent, logger: log.NewNopLogger(), } - return em } -// SetLogger lets you set your own logger +// SetLogger lets you set your own logger. func (em *EventMeter) SetLogger(l log.Logger) { em.logger = l + em.wsc.SetLogger(l.With("module", "rpcclient")) } +// String returns a string representation of event meter. func (em *EventMeter) String() string { return em.wsc.Address } +// Start boots up event meter. func (em *EventMeter) Start() error { - if _, err := em.wsc.Reset(); err != nil { - return err - } - if _, err := em.wsc.Start(); err != nil { return err } - em.wsc.Conn.SetPongHandler(func(m string) error { - // NOTE: https://github.com/gorilla/websocket/issues/97 - em.mtx.Lock() - defer em.mtx.Unlock() - em.receivedPong = true - em.timer.UpdateSince(em.lastPing) - if em.latencyCallback != nil { - go em.latencyCallback(em.timer.Mean()) - } - return nil - }) - em.quit = make(chan struct{}) go em.receiveRoutine() + go em.disconnectRoutine() - return em.resubscribe() + err := em.subscribe() + if err != nil { + return err + } + em.subscribed = true + return nil } -// Stop stops the EventMeter. +// Stop stops event meter. func (em *EventMeter) Stop() { close(em.quit) @@ -158,9 +143,7 @@ func (em *EventMeter) Stop() { // StopAndCallDisconnectCallback stops the EventMeter and calls // disconnectCallback if present. func (em *EventMeter) StopAndCallDisconnectCallback() { - if em.wsc.IsRunning() { - em.wsc.Stop() - } + em.Stop() em.mtx.Lock() defer em.mtx.Unlock() @@ -169,74 +152,70 @@ func (em *EventMeter) StopAndCallDisconnectCallback() { } } -func (em *EventMeter) Subscribe(eventID string, cb EventCallbackFunc) error { +// Subscribe for the given event type. Callback function will be called upon +// receiving an event. +func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error { em.mtx.Lock() defer em.mtx.Unlock() - if _, ok := em.events[eventID]; ok { + if _, ok := em.events[eventType]; ok { return fmt.Errorf("subscribtion already exists") } - if err := em.wsc.Subscribe(eventID); err != nil { + if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil { return err } metric := &EventMetric{ - ID: eventID, - Started: time.Now(), - MinDuration: 1 << 62, - meter: metrics.NewMeter(), - callback: cb, + meter: metrics.NewMeter(), + callback: cb, } - em.events[eventID] = metric + em.events[eventType] = metric return nil } -func (em *EventMeter) Unsubscribe(eventID string) error { +// Unsubscribe from the given event type. +func (em *EventMeter) Unsubscribe(eventType string) error { em.mtx.Lock() defer em.mtx.Unlock() - if err := em.wsc.Unsubscribe(eventID); err != nil { + if err := em.wsc.Unsubscribe(context.TODO(), eventType); err != nil { return err } // XXX: should we persist or save this info first? - delete(em.events, eventID) + delete(em.events, eventType) return nil } -// Fill in the latest data for an event and return a copy -func (em *EventMeter) GetMetric(eventID string) (*EventMetric, error) { +// GetMetric fills in the latest data for an event and return a copy. +func (em *EventMeter) GetMetric(eventType string) (*EventMetric, error) { em.mtx.Lock() defer em.mtx.Unlock() - metric, ok := em.events[eventID] + metric, ok := em.events[eventType] if !ok { - return nil, fmt.Errorf("Unknown event %s", eventID) + return nil, fmt.Errorf("unknown event: %s", eventType) } return metric.fillMetric().Copy(), nil } -// Return the average latency over the websocket -func (em *EventMeter) Latency() float64 { - em.mtx.Lock() - defer em.mtx.Unlock() - return em.timer.Mean() -} - +// RegisterLatencyCallback allows you to set latency callback. func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) { em.mtx.Lock() defer em.mtx.Unlock() em.latencyCallback = f } +// RegisterDisconnectCallback allows you to set disconnect callback. func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) { em.mtx.Lock() defer em.mtx.Unlock() em.disconnectCallback = f } -//------------------------------------------------------ +/////////////////////////////////////////////////////////////////////////////// +// Private -func (em *EventMeter) resubscribe() error { - for eventID, _ := range em.events { - if err := em.wsc.Subscribe(eventID); err != nil { +func (em *EventMeter) subscribe() error { + for eventType, _ := range em.events { + if err := em.wsc.Subscribe(context.TODO(), eventType); err != nil { return err } } @@ -244,39 +223,32 @@ func (em *EventMeter) resubscribe() error { } func (em *EventMeter) receiveRoutine() { - pingTime := time.Second * 1 - pingTicker := time.NewTicker(pingTime) - pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn - - var err error + latencyTicker := time.NewTicker(latencyPeriod) for { select { - case <-pingTicker.C: - if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil { - em.logger.Error("err", errors.Wrap(err, "failed to write ping message on websocket")) - em.StopAndCallDisconnectCallback() - return - } else if pingAttempts >= maxPingsPerPong { - em.logger.Error("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) - em.StopAndCallDisconnectCallback() - return - } - case r := <-em.wsc.ResultsCh: - if r == nil { - em.logger.Error("err", errors.New("Expected some event, received nil")) - em.StopAndCallDisconnectCallback() - return - } - eventID, data, err := em.unmarshalEvent(r) - if err != nil { - em.logger.Error("err", errors.Wrap(err, "failed to unmarshal event")) + case rawEvent := <-em.wsc.ResultsCh: + if rawEvent == nil { + em.logger.Error("expected some event, got nil") continue } - if eventID != "" { - em.updateMetric(eventID, data) + eventType, data, err := em.unmarshalEvent(rawEvent) + if err != nil { + em.logger.Error("failed to unmarshal event", "err", err) + continue + } + if eventType != "" { // FIXME how can it be an empty string? + em.updateMetric(eventType, data) + } + case err := <-em.wsc.ErrorsCh: + if err != nil { + em.logger.Error("expected some event, got error", "err", err) + } + case <-latencyTicker.C: + if em.wsc.IsActive() { + em.latencyCallback(em.wsc.PingPongLatencyTimer.Mean()) } case <-em.wsc.Quit: - em.logger.Error("err", errors.New("WSClient closed unexpectedly")) + em.logger.Error("WebSocket client closed unexpectedly") em.StopAndCallDisconnectCallback() return case <-em.quit: @@ -285,29 +257,33 @@ func (em *EventMeter) receiveRoutine() { } } -func (em *EventMeter) pingForLatency(pingAttempts int) (int, error) { - em.mtx.Lock() - defer em.mtx.Unlock() - - // ping to record latency - if !em.receivedPong { - return pingAttempts + 1, nil +func (em *EventMeter) disconnectRoutine() { + ticker := time.NewTicker(connectionCheckPeriod) + for { + select { + case <-ticker.C: + if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once + em.mtx.Lock() + if em.disconnectCallback != nil { + go em.disconnectCallback() + } + em.mtx.Unlock() + em.subscribed = false + } else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe + em.subscribe() + em.subscribed = true + } + case <-em.quit: + return + } } - - em.lastPing = time.Now() - em.receivedPong = false - err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{}) - if err != nil { - return pingAttempts, err - } - return 0, nil } -func (em *EventMeter) updateMetric(eventID string, data events.EventData) { +func (em *EventMeter) updateMetric(eventType string, data events.EventData) { em.mtx.Lock() defer em.mtx.Unlock() - metric, ok := em.events[eventID] + metric, ok := em.events[eventType] if !ok { // we already unsubscribed, or got an unexpected event return diff --git a/tm-monitor/glide.lock b/tm-monitor/glide.lock index 33b827d2..fb49f9b7 100644 --- a/tm-monitor/glide.lock +++ b/tm-monitor/glide.lock @@ -1,5 +1,5 @@ -hash: 30b649bc544a4ebd2b2a6188ce314cb72e6c28be8f3e57ec22e7cb83fd974814 -updated: 2017-07-29T18:47:33.199177142Z +hash: 3e3601085c1862570e37cfa6c7024df03e3921a43dc78be08372b1e9fbb1620d +updated: 2017-08-04T14:44:28.484044469Z imports: - name: github.com/btcsuite/btcd version: 583684b21bfbde9b5fc4403916fd7c807feb0289 @@ -13,6 +13,12 @@ imports: - log/term - name: github.com/go-logfmt/logfmt version: 390ab7935ee28ec6b286364bba9b4dd6410cb3d5 +- name: github.com/go-playground/locales + version: 1e5f1161c6416a5ff48840eb8724a394e48cc534 + subpackages: + - currency +- name: github.com/go-playground/universal-translator + version: 71201497bace774495daed26a3874fd339e0b538 - name: github.com/go-stack/stack version: 100eb0c0a9c5b306ca2fb4f165df21d80ada4b82 - name: github.com/golang/protobuf @@ -45,7 +51,7 @@ imports: subpackages: - data - name: github.com/tendermint/tendermint - version: e9b7221292afe25ce956ea85ab83bb5708eb2992 + version: 0013053fae3fb7611c392ebcff15352bb7ec717b subpackages: - config - p2p @@ -56,7 +62,7 @@ imports: - rpc/lib/types - types - name: github.com/tendermint/tmlibs - version: 2f6f3e6aa70bb19b70a6e73210273fa127041070 + version: 75372988e737a9f672c0e7f6308042620bd3e151 subpackages: - common - events @@ -97,6 +103,8 @@ imports: - stats - tap - transport +- name: gopkg.in/go-playground/validator.v9 + version: 0f6f568263a1ab5105b57f66f446d2625e4f545c testImports: - name: github.com/davecgh/go-spew version: 04cdfd42973bb9c8589fd6a731800cf222fde1a9 diff --git a/tm-monitor/glide.yaml b/tm-monitor/glide.yaml index e68ff697..8256fc32 100644 --- a/tm-monitor/glide.yaml +++ b/tm-monitor/glide.yaml @@ -5,7 +5,7 @@ import: - package: github.com/rcrowley/go-metrics - package: github.com/tendermint/go-crypto - package: github.com/tendermint/tendermint - version: develop + version: 0013053fae3fb7611c392ebcff15352bb7ec717b subpackages: - rpc/core/types - rpc/lib/client diff --git a/tm-monitor/main.go b/tm-monitor/main.go index 0c841c75..54901c6c 100644 --- a/tm-monitor/main.go +++ b/tm-monitor/main.go @@ -47,7 +47,7 @@ Examples: } if noton { - logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "tm-monitor") + logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)) } m := startMonitor(flag.Arg(0)) diff --git a/tm-monitor/mock/mock.go b/tm-monitor/mock/eventmeter.go similarity index 100% rename from tm-monitor/mock/mock.go rename to tm-monitor/mock/eventmeter.go diff --git a/tm-monitor/monitor/node.go b/tm-monitor/monitor/node.go index 2c2352a7..53f76f33 100644 --- a/tm-monitor/monitor/node.go +++ b/tm-monitor/monitor/node.go @@ -129,7 +129,7 @@ func newBlockCallback(n *Node) em.EventCallbackFunc { block := data.(tmtypes.TMEventData).Unwrap().(tmtypes.EventDataNewBlockHeader).Header n.Height = uint64(block.Height) - n.logger.Info("event", "new block", "height", block.Height, "numTxs", block.NumTxs) + n.logger.Info("new block", "height", block.Height, "numTxs", block.NumTxs) if n.blockCh != nil { n.blockCh <- *block @@ -141,7 +141,7 @@ func newBlockCallback(n *Node) em.EventCallbackFunc { func latencyCallback(n *Node) em.LatencyCallbackFunc { return func(latency float64) { n.BlockLatency = latency / 1000000.0 // ns to ms - n.logger.Info("event", "new block latency", "latency", n.BlockLatency) + n.logger.Info("new block latency", "latency", n.BlockLatency) if n.blockLatencyCh != nil { n.blockLatencyCh <- latency @@ -158,17 +158,6 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc { if n.disconnectCh != nil { n.disconnectCh <- true } - - if err := n.RestartEventMeterBackoff(); err != nil { - n.logger.Info("err", errors.Wrap(err, "restart failed")) - } else { - n.Online = true - n.logger.Info("status", "online") - - if n.disconnectCh != nil { - n.disconnectCh <- false - } - } } } @@ -180,7 +169,7 @@ func (n *Node) RestartEventMeterBackoff() error { time.Sleep(d * time.Second) if err := n.em.Start(); err != nil { - n.logger.Info("err", errors.Wrap(err, "restart failed")) + n.logger.Info("restart failed", "err", err) } else { // TODO: authenticate pubkey return nil @@ -231,7 +220,7 @@ func (n *Node) checkIsValidator() { } } } else { - n.logger.Info("err", errors.Wrap(err, "check is validator failed")) + n.logger.Info("check is validator failed", "err", err) } } diff --git a/tm-monitor/rpc.go b/tm-monitor/rpc.go index bbf508a9..52e98eeb 100644 --- a/tm-monitor/rpc.go +++ b/tm-monitor/rpc.go @@ -12,9 +12,8 @@ import ( func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) { routes := routes(m) - // serve http and ws mux := http.NewServeMux() - wm := rpc.NewWebsocketManager(routes, nil) // TODO: evsw + wm := rpc.NewWebsocketManager(routes, nil) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpc.RegisterRPCFuncs(mux, routes, logger) if _, err := rpc.StartHTTPServer(listenAddr, mux, logger); err != nil {