From 5a2afc575485e2d651b9840f5d1ea080cdc72fa7 Mon Sep 17 00:00:00 2001 From: zelig Date: Sat, 5 Jul 2014 19:56:01 +0100 Subject: [PATCH] fix reactor engine main loop blocked to wait if drained --- ethreact/reactor.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/ethreact/reactor.go b/ethreact/reactor.go index 3802d95b3..f42f71202 100644 --- a/ethreact/reactor.go +++ b/ethreact/reactor.go @@ -28,7 +28,7 @@ func (e *EventHandler) Post(event Event) { select { case ch <- event: default: - logger.Warnln("subscribing channel %d to event %s blocked. skipping", i, event.Name) + logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name) } } } @@ -69,7 +69,7 @@ type ReactorEngine struct { quit chan bool shutdownChannel chan bool running bool - drained bool + drained chan bool } func New() *ReactorEngine { @@ -77,6 +77,7 @@ func New() *ReactorEngine { eventHandlers: make(map[string]*EventHandler), eventChannel: make(chan Event), quit: make(chan bool, 1), + drained: make(chan bool, 1), shutdownChannel: make(chan bool, 1), } } @@ -94,8 +95,9 @@ func (reactor *ReactorEngine) Start() { case event := <-reactor.eventChannel: // needs to be called syncronously to keep order of events reactor.dispatch(event) + case reactor.drained <- true: default: - reactor.drained = true + reactor.drained <- true // blocking till message is coming in } } reactor.lock.Lock() @@ -113,14 +115,16 @@ func (reactor *ReactorEngine) Stop() { reactor.lock.RLock() if reactor.running { reactor.quit <- true + select { + case <-reactor.drained: + } } reactor.lock.RUnlock() <-reactor.shutdownChannel } func (reactor *ReactorEngine) Flush() { - for !reactor.drained { - } + <-reactor.drained } // Subscribe a channel to the specified event @@ -136,7 +140,7 @@ func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) { } // Add the events channel to reactor event handler eventHandler.Add(eventChannel) - logger.Debugln("added new subscription to %s", event) + logger.Debugf("added new subscription to %s", event) } func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { @@ -149,7 +153,7 @@ func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) if len == 0 { reactor.eventHandlers[event] = nil } - logger.Debugln("removed subscription to %s", event) + logger.Debugf("removed subscription to %s", event) } } @@ -158,8 +162,10 @@ func (reactor *ReactorEngine) Post(event string, resource interface{}) { defer reactor.lock.Unlock() if reactor.running { - reactor.drained = false reactor.eventChannel <- Event{Resource: resource, Name: event} + select { + case <-reactor.drained: + } } }