From 42f58ceb4bb3b93d17e8669c1fb6d693b15435ca Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 8 Aug 2017 15:44:16 -0400 Subject: [PATCH] [tm-monitor] call latency callback in a separate goroutine --- tm-bench/transacter.go | 4 +-- tm-monitor/eventmeter/eventmeter.go | 40 ++++++++++++++--------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/tm-bench/transacter.go b/tm-bench/transacter.go index ad30ff45..894d133f 100644 --- a/tm-bench/transacter.go +++ b/tm-bench/transacter.go @@ -152,7 +152,7 @@ func (t *transacter) sendLoop(connIndex int) { Params: &rawParamsJson, }) if err != nil { - fmt.Printf("%v. Try increasing the connections count and reducing the rate.\n", errors.Wrap(err, "txs send failed")) + fmt.Printf("%v. Try reducing the connections count and increasing the rate.\n", errors.Wrap(err, "txs send failed")) os.Exit(1) } @@ -163,7 +163,7 @@ func (t *transacter) sendLoop(connIndex int) { time.Sleep(time.Second - timeToSend) logger.Info(fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend) case <-pingsTicker.C: - // Right now go-rpc server closes the connection in the absence of pings + // go-rpc server closes the connection in the absence of pings c.SetWriteDeadline(time.Now().Add(sendTimeout)) if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil { logger.Error("failed to write ping message", "err", err) diff --git a/tm-monitor/eventmeter/eventmeter.go b/tm-monitor/eventmeter/eventmeter.go index 9fe56fa6..a96b425e 100644 --- a/tm-monitor/eventmeter/eventmeter.go +++ b/tm-monitor/eventmeter/eventmeter.go @@ -140,18 +140,6 @@ func (em *EventMeter) Stop() { } } -// StopAndCallDisconnectCallback stops the EventMeter and calls -// disconnectCallback if present. -func (em *EventMeter) StopAndCallDisconnectCallback() { - em.Stop() - - em.mtx.Lock() - defer em.mtx.Unlock() - if em.disconnectCallback != nil { - go em.disconnectCallback() - } -} - // Subscribe for the given event type. Callback function will be called upon // receiving an event. func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error { @@ -245,11 +233,9 @@ func (em *EventMeter) receiveRoutine() { } case <-latencyTicker.C: if em.wsc.IsActive() { - em.latencyCallback(em.wsc.PingPongLatencyTimer.Mean()) + em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean()) } case <-em.wsc.Quit: - em.logger.Error("WebSocket client closed unexpectedly") - em.StopAndCallDisconnectCallback() return case <-em.quit: return @@ -263,16 +249,14 @@ func (em *EventMeter) disconnectRoutine() { select { case <-ticker.C: if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once - em.mtx.Lock() - if em.disconnectCallback != nil { - go em.disconnectCallback() - } - em.mtx.Unlock() + em.callDisconnectCallback() em.subscribed = false } else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe em.subscribe() em.subscribed = true } + case <-em.wsc.Quit: + return case <-em.quit: return } @@ -304,3 +288,19 @@ func (em *EventMeter) updateMetric(eventType string, data events.EventData) { go metric.callback(metric.Copy(), data) } } + +func (em *EventMeter) callDisconnectCallback() { + em.mtx.Lock() + if em.disconnectCallback != nil { + go em.disconnectCallback() + } + em.mtx.Unlock() +} + +func (em *EventMeter) callLatencyCallback(meanLatencyNanoSeconds float64) { + em.mtx.Lock() + if em.latencyCallback != nil { + go em.latencyCallback(meanLatencyNanoSeconds) + } + em.mtx.Unlock() +}