Refactor throttle timer
This commit is contained in:
parent
3d9113c16e
commit
dcb4395604
|
@ -1,7 +1,7 @@
|
||||||
package common
|
package common
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -12,54 +12,88 @@ If a long continuous burst of .Set() calls happens, ThrottleTimer fires
|
||||||
at most once every "dur".
|
at most once every "dur".
|
||||||
*/
|
*/
|
||||||
type ThrottleTimer struct {
|
type ThrottleTimer struct {
|
||||||
Name string
|
Name string
|
||||||
Ch chan struct{}
|
Ch chan struct{}
|
||||||
quit chan struct{}
|
input chan command
|
||||||
dur time.Duration
|
dur time.Duration
|
||||||
|
|
||||||
mtx sync.Mutex
|
|
||||||
timer *time.Timer
|
timer *time.Timer
|
||||||
isSet bool
|
isSet bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type command int32
|
||||||
|
|
||||||
|
const (
|
||||||
|
Set command = iota
|
||||||
|
Unset
|
||||||
|
Quit
|
||||||
|
)
|
||||||
|
|
||||||
func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer {
|
func NewThrottleTimer(name string, dur time.Duration) *ThrottleTimer {
|
||||||
var ch = make(chan struct{})
|
var t = &ThrottleTimer{
|
||||||
var quit = make(chan struct{})
|
Name: name,
|
||||||
var t = &ThrottleTimer{Name: name, Ch: ch, dur: dur, quit: quit}
|
Ch: make(chan struct{}, 1),
|
||||||
t.mtx.Lock()
|
dur: dur,
|
||||||
t.timer = time.AfterFunc(dur, t.fireRoutine)
|
input: make(chan command),
|
||||||
t.mtx.Unlock()
|
timer: time.NewTimer(dur),
|
||||||
|
}
|
||||||
t.timer.Stop()
|
t.timer.Stop()
|
||||||
|
go t.run()
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *ThrottleTimer) fireRoutine() {
|
func (t *ThrottleTimer) run() {
|
||||||
t.mtx.Lock()
|
for {
|
||||||
defer t.mtx.Unlock()
|
select {
|
||||||
select {
|
case cmd := <-t.input:
|
||||||
case t.Ch <- struct{}{}:
|
// stop goroutine if the input says so
|
||||||
t.isSet = false
|
if t.processInput(cmd) {
|
||||||
case <-t.quit:
|
// TODO: do we want to close the channels???
|
||||||
// do nothing
|
// close(t.Ch)
|
||||||
default:
|
// close(t.input)
|
||||||
t.timer.Reset(t.dur)
|
return
|
||||||
|
}
|
||||||
|
case <-t.timer.C:
|
||||||
|
t.isSet = false
|
||||||
|
t.Ch <- struct{}{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// all modifications of the internal state of ThrottleTimer
|
||||||
|
// happen in this method. It is only called from the run goroutine
|
||||||
|
// so we avoid any race conditions
|
||||||
|
func (t *ThrottleTimer) processInput(cmd command) (shutdown bool) {
|
||||||
|
fmt.Printf("processInput: %d\n", cmd)
|
||||||
|
switch cmd {
|
||||||
|
case Set:
|
||||||
|
if !t.isSet {
|
||||||
|
t.isSet = true
|
||||||
|
t.timer.Reset(t.dur)
|
||||||
|
}
|
||||||
|
case Quit:
|
||||||
|
shutdown = true
|
||||||
|
fallthrough
|
||||||
|
case Unset:
|
||||||
|
if t.isSet {
|
||||||
|
t.isSet = false
|
||||||
|
if !t.timer.Stop() {
|
||||||
|
<-t.timer.C
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
panic("unknown command!")
|
||||||
|
}
|
||||||
|
// return true
|
||||||
|
return shutdown
|
||||||
|
}
|
||||||
|
|
||||||
func (t *ThrottleTimer) Set() {
|
func (t *ThrottleTimer) Set() {
|
||||||
t.mtx.Lock()
|
t.input <- Set
|
||||||
defer t.mtx.Unlock()
|
|
||||||
if !t.isSet {
|
|
||||||
t.isSet = true
|
|
||||||
t.timer.Reset(t.dur)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *ThrottleTimer) Unset() {
|
func (t *ThrottleTimer) Unset() {
|
||||||
t.mtx.Lock()
|
t.input <- Unset
|
||||||
defer t.mtx.Unlock()
|
|
||||||
t.isSet = false
|
|
||||||
t.timer.Stop()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// For ease of .Stop()'ing services before .Start()'ing them,
|
// For ease of .Stop()'ing services before .Start()'ing them,
|
||||||
|
@ -68,8 +102,6 @@ func (t *ThrottleTimer) Stop() bool {
|
||||||
if t == nil {
|
if t == nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
close(t.quit)
|
t.input <- Quit
|
||||||
t.mtx.Lock()
|
return true
|
||||||
defer t.mtx.Unlock()
|
|
||||||
return t.timer.Stop()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,6 +41,7 @@ func TestThrottle(test *testing.T) {
|
||||||
|
|
||||||
ms := 50
|
ms := 50
|
||||||
delay := time.Duration(ms) * time.Millisecond
|
delay := time.Duration(ms) * time.Millisecond
|
||||||
|
shortwait := time.Duration(ms/2) * time.Millisecond
|
||||||
longwait := time.Duration(2) * delay
|
longwait := time.Duration(2) * delay
|
||||||
t := NewThrottleTimer("foo", delay)
|
t := NewThrottleTimer("foo", delay)
|
||||||
|
|
||||||
|
@ -65,6 +66,21 @@ func TestThrottle(test *testing.T) {
|
||||||
time.Sleep(longwait)
|
time.Sleep(longwait)
|
||||||
assert.Equal(2, c.Count())
|
assert.Equal(2, c.Count())
|
||||||
|
|
||||||
|
// keep cancelling before it is ready
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
t.Set()
|
||||||
|
time.Sleep(shortwait)
|
||||||
|
t.Unset()
|
||||||
|
}
|
||||||
|
time.Sleep(longwait)
|
||||||
|
assert.Equal(2, c.Count())
|
||||||
|
|
||||||
|
// a few unsets do nothing...
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
t.Unset()
|
||||||
|
}
|
||||||
|
assert.Equal(2, c.Count())
|
||||||
|
|
||||||
// send 12, over 2 delay sections, adds 3
|
// send 12, over 2 delay sections, adds 3
|
||||||
short := time.Duration(ms/5) * time.Millisecond
|
short := time.Duration(ms/5) * time.Millisecond
|
||||||
for i := 0; i < 13; i++ {
|
for i := 0; i < 13; i++ {
|
||||||
|
@ -74,5 +90,6 @@ func TestThrottle(test *testing.T) {
|
||||||
time.Sleep(longwait)
|
time.Sleep(longwait)
|
||||||
assert.Equal(5, c.Count())
|
assert.Equal(5, c.Count())
|
||||||
|
|
||||||
close(t.Ch)
|
stopped := t.Stop()
|
||||||
|
assert.True(stopped)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue