diff --git a/common/repeat_timer.go b/common/repeat_timer.go index d7d9154d..0f650113 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -1,7 +1,7 @@ package common import ( - "sync" + "fmt" "time" ) @@ -11,54 +11,40 @@ It's good for keeping connections alive. A RepeatTimer must be Stop()'d or it will keep a goroutine alive. */ type RepeatTimer struct { - Ch chan time.Time + Name string + Ch <-chan time.Time + output chan<- time.Time + input chan repeatCommand - mtx sync.Mutex - name string - ticker *time.Ticker - quit chan struct{} - wg *sync.WaitGroup - dur time.Duration + dur time.Duration + timer *time.Timer } +type repeatCommand int32 + +const ( + Reset repeatCommand = iota + RQuit +) + func NewRepeatTimer(name string, dur time.Duration) *RepeatTimer { + c := make(chan time.Time) var t = &RepeatTimer{ - Ch: make(chan time.Time), - ticker: time.NewTicker(dur), - quit: make(chan struct{}), - wg: new(sync.WaitGroup), - name: name, - dur: dur, - } - t.wg.Add(1) - go t.fireRoutine(t.ticker) - return t -} + Name: name, + Ch: c, + output: c, + input: make(chan repeatCommand), -func (t *RepeatTimer) fireRoutine(ticker *time.Ticker) { - for { - select { - case t_ := <-ticker.C: - t.Ch <- t_ - case <-t.quit: - // needed so we know when we can reset t.quit - t.wg.Done() - return - } + timer: time.NewTimer(dur), + dur: dur, } + go t.run() + return t } // Wait the duration again before firing. func (t *RepeatTimer) Reset() { - t.Stop() - - t.mtx.Lock() // Lock - defer t.mtx.Unlock() - - t.ticker = time.NewTicker(t.dur) - t.quit = make(chan struct{}) - t.wg.Add(1) - go t.fireRoutine(t.ticker) + t.input <- Reset } // For ease of .Stop()'ing services before .Start()'ing them, @@ -67,20 +53,55 @@ func (t *RepeatTimer) Stop() bool { if t == nil { return false } - t.mtx.Lock() // Lock - defer t.mtx.Unlock() - - exists := t.ticker != nil - if exists { - t.ticker.Stop() // does not close the channel - select { - case <-t.Ch: - // read off channel if there's anything there - default: - } - close(t.quit) - t.wg.Wait() // must wait for quit to close else we race Reset - t.ticker = nil - } - return exists + t.input <- RQuit + return true +} + +func (t *RepeatTimer) run() { + for { + fmt.Println("for") + select { + case cmd := <-t.input: + // stop goroutine if the input says so + // don't close channels, as closed channels mess up select reads + if t.processInput(cmd) { + t.timer.Stop() + return + } + case <-t.timer.C: + fmt.Println("tick") + // send if not blocked, then start the next tick + // for blocking send, just + // t.output <- time.Now() + t.trySend() + t.timer.Reset(t.dur) + } + } +} + +// trySend performs non-blocking send on t.Ch +func (t *RepeatTimer) trySend() { + // TODO: this was blocking in previous version (t.Ch <- t_) + // should I use that behavior unstead of unblocking as per throttle? + select { + case t.output <- time.Now(): + default: + } +} + +// all modifications of the internal state of ThrottleTimer +// happen in this method. It is only called from the run goroutine +// so we avoid any race conditions +func (t *RepeatTimer) processInput(cmd repeatCommand) (shutdown bool) { + fmt.Printf("process: %d\n", cmd) + switch cmd { + case Reset: + t.timer.Reset(t.dur) + case RQuit: + t.timer.Stop() + shutdown = true + default: + panic("unknown command!") + } + return shutdown } diff --git a/common/repeat_timer_test.go b/common/repeat_timer_test.go index 87f34b95..d66cd315 100644 --- a/common/repeat_timer_test.go +++ b/common/repeat_timer_test.go @@ -10,7 +10,7 @@ import ( ) type rCounter struct { - input chan time.Time + input <-chan time.Time mtx sync.Mutex count int } @@ -74,5 +74,5 @@ func TestRepeat(test *testing.T) { assert.Equal(6, c.Count()) // close channel to stop counter - close(t.Ch) + t.Stop() } diff --git a/common/throttle_timer.go b/common/throttle_timer.go index ab2ad2e6..c148d990 100644 --- a/common/throttle_timer.go +++ b/common/throttle_timer.go @@ -13,7 +13,7 @@ at most once every "dur". type ThrottleTimer struct { Name string Ch <-chan struct{} - input chan command + input chan throttleCommand output chan<- struct{} dur time.Duration @@ -21,12 +21,12 @@ type ThrottleTimer struct { isSet bool } -type command int32 +type throttleCommand int32 const ( - Set command = iota + Set throttleCommand = iota Unset - Quit + TQuit ) // NewThrottleTimer creates a new ThrottleTimer. @@ -36,7 +36,7 @@ func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer { Name: name, Ch: c, dur: dur, - input: make(chan command), + input: make(chan throttleCommand), output: c, timer: time.NewTimer(dur), } @@ -74,14 +74,14 @@ func (t *ThrottleTimer) trySend() { // all modifications of the internal state of ThrottleTimer // happen in this method. It is only called from the run goroutine // so we avoid any race conditions -func (t *ThrottleTimer) processInput(cmd command) (shutdown bool) { +func (t *ThrottleTimer) processInput(cmd throttleCommand) (shutdown bool) { switch cmd { case Set: if !t.isSet { t.isSet = true t.timer.Reset(t.dur) } - case Quit: + case TQuit: shutdown = true fallthrough case Unset: @@ -122,6 +122,6 @@ func (t *ThrottleTimer) Stop() bool { if t == nil { return false } - t.input <- Quit + t.input <- TQuit return true }