[tm-monitor] now EventMeter can be restarted multiple times (Refs #6)

with one caveat: go-common and go-rpc need to be updated as well
This commit is contained in:
Anton Kaliaev 2017-03-21 20:39:48 +04:00
parent c053c15231
commit 3044f66ba9
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
2 changed files with 33 additions and 13 deletions

View File

@ -136,20 +136,28 @@ func (em *EventMeter) Start() error {
} }
return nil return nil
}) })
em.quit = make(chan struct{})
go em.receiveRoutine() go em.receiveRoutine()
return nil
return em.resubscribe()
} }
// Stop stops the EventMeter. // Stop stops the EventMeter.
func (em *EventMeter) Stop() { func (em *EventMeter) Stop() {
close(em.quit) close(em.quit)
em.RegisterDisconnectCallback(nil) // so we don't try and reconnect if em.wsc.IsRunning() {
em.wsc.Stop() // close(wsc.Quit) em.wsc.Stop()
}
} }
func (em *EventMeter) StopAndReconnect() { // StopAndCallDisconnectCallback stops the EventMeter and calls
em.wsc.Stop() // disconnectCallback if present.
func (em *EventMeter) StopAndCallDisconnectCallback() {
if em.wsc.IsRunning() {
em.wsc.Stop()
}
em.mtx.Lock() em.mtx.Lock()
defer em.mtx.Unlock() defer em.mtx.Unlock()
@ -223,38 +231,50 @@ func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
//------------------------------------------------------ //------------------------------------------------------
// resubscribe calls em.wsc.Subscribe for every event ID ranged over in
// em.events. It stops at the first failing Subscribe call and returns that
// error unchanged; it returns nil when all calls succeed or em.events is empty.
// NOTE(review): em.events is read here without holding em.mtx — confirm that
// callers serialize access.
func (em *EventMeter) resubscribe() error {
	for eventID := range em.events {
		if err := em.wsc.Subscribe(eventID); err != nil {
			return err
		}
	}
	return nil
}
func (em *EventMeter) receiveRoutine() { func (em *EventMeter) receiveRoutine() {
pingTime := time.Second * 1 pingTime := time.Second * 1
pingTicker := time.NewTicker(pingTime) pingTicker := time.NewTicker(pingTime)
pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn
var err error var err error
for { for {
select { select {
case <-pingTicker.C: case <-pingTicker.C:
if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil { if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil {
em.logger.Log("err", errors.Wrap(err, "Failed to write ping message on websocket")) em.logger.Log("err", errors.Wrap(err, "failed to write ping message on websocket"))
em.StopAndReconnect() em.StopAndCallDisconnectCallback()
return return
} else if pingAttempts >= maxPingsPerPong { } else if pingAttempts >= maxPingsPerPong {
em.logger.Log("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime)) em.logger.Log("err", errors.Errorf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
em.StopAndReconnect() em.StopAndCallDisconnectCallback()
return return
} }
case r := <-em.wsc.ResultsCh: case r := <-em.wsc.ResultsCh:
if r == nil { if r == nil {
em.StopAndReconnect() em.logger.Log("err", errors.New("Expected some event, received nil"))
em.StopAndCallDisconnectCallback()
return return
} }
eventID, data, err := em.unmarshalEvent(r) eventID, data, err := em.unmarshalEvent(r)
if err != nil { if err != nil {
em.logger.Log("err", err) em.logger.Log("err", errors.Wrap(err, "failed to unmarshal event"))
continue continue
} }
if eventID != "" { if eventID != "" {
em.updateMetric(eventID, data) em.updateMetric(eventID, data)
} }
case <-em.wsc.Quit: case <-em.wsc.Quit:
em.StopAndReconnect() em.logger.Log("err", errors.New("WSClient closed unexpectedly"))
em.StopAndCallDisconnectCallback()
return return
case <-em.quit: case <-em.quit:
return return

View File

@ -162,7 +162,7 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
n.disconnectCh <- true n.disconnectCh <- true
} }
if err := n.RestartBackOff(); err != nil { if err := n.RestartEventMeterBackoff(); err != nil {
n.logger.Log("err", errors.Wrap(err, "restart failed")) n.logger.Log("err", errors.Wrap(err, "restart failed"))
} else { } else {
n.Online = true n.Online = true
@ -175,7 +175,7 @@ func disconnectCallback(n *Node) em.DisconnectCallbackFunc {
} }
} }
func (n *Node) RestartBackOff() error { func (n *Node) RestartEventMeterBackoff() error {
attempt := 0 attempt := 0
for { for {