Fix logical time (#122)

Should fix a nondeterministic bug so...
This commit is contained in:
Jae Kwon 2018-01-14 21:20:53 -08:00 committed by Anton Kaliaev
parent cfbb9338bd
commit ff230682d1
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
3 changed files with 69 additions and 55 deletions

View File

@ -62,12 +62,16 @@ get_vendor_deps:
@echo "--> Running glide install" @echo "--> Running glide install"
@glide install @glide install
######################################## ########################################
### Testing ### Testing
test: test:
go test -tags gcc `glide novendor` go test -tags gcc `glide novendor`
test100:
@for i in {1..100}; do make test; done
######################################## ########################################
### Formatting, linting, and vetting ### Formatting, linting, and vetting

View File

@ -80,13 +80,11 @@ func (t *logicalTicker) fireRoutine(interval time.Duration) {
} }
// Init `lasttime` end // Init `lasttime` end
timeleft := interval
for { for {
select { select {
case newtime := <-source: case newtime := <-source:
elapsed := newtime.Sub(lasttime) elapsed := newtime.Sub(lasttime)
timeleft -= elapsed if interval <= elapsed {
if timeleft <= 0 {
// Block for determinism until the ticker is stopped. // Block for determinism until the ticker is stopped.
select { select {
case t.ch <- newtime: case t.ch <- newtime:
@ -97,7 +95,7 @@ func (t *logicalTicker) fireRoutine(interval time.Duration) {
// Don't try to "catch up" by sending more. // Don't try to "catch up" by sending more.
// "Ticker adjusts the intervals or drops ticks to make up for // "Ticker adjusts the intervals or drops ticks to make up for
// slow receivers" - https://golang.org/pkg/time/#Ticker // slow receivers" - https://golang.org/pkg/time/#Ticker
timeleft = interval lasttime = newtime
} }
case <-t.quit: case <-t.quit:
return // done return // done

View File

@ -1,6 +1,7 @@
package common package common
import ( import (
"sync"
"testing" "testing"
"time" "time"
@ -13,29 +14,42 @@ func TestDefaultTicker(t *testing.T) {
ticker.Stop() ticker.Stop()
} }
func TestRepeat(t *testing.T) { func TestRepeatTimer(t *testing.T) {
ch := make(chan time.Time, 100) ch := make(chan time.Time, 100)
lt := time.Time{} // zero time is year 1 mtx := new(sync.Mutex)
// tick fires `cnt` times for each second. // tick() fires from start to end
tick := func(cnt int) { // (exclusive) in milliseconds with incr.
for i := 0; i < cnt; i++ { // It locks on mtx, so subsequent calls
lt = lt.Add(time.Second) // run in series.
tick := func(startMs, endMs, incrMs time.Duration) {
mtx.Lock()
go func() {
for tMs := startMs; tMs < endMs; tMs += incrMs {
lt := time.Time{}
lt = lt.Add(tMs * time.Millisecond)
ch <- lt ch <- lt
} }
mtx.Unlock()
}()
} }
// tock consumes Ticker.Chan() events `cnt` times. // tock consumes Ticker.Chan() events and checks them against the ms in "timesMs".
tock := func(t *testing.T, rt *RepeatTimer, cnt int) { tock := func(t *testing.T, rt *RepeatTimer, timesMs []int64) {
for i := 0; i < cnt; i++ {
timeout := time.After(time.Second * 10) // Check against timesMs.
select { for _, timeMs := range timesMs {
case <-rt.Chan(): tyme := <-rt.Chan()
case <-timeout: sinceMs := tyme.Sub(time.Time{}) / time.Millisecond
panic("expected RepeatTimer to fire") assert.Equal(t, timeMs, int64(sinceMs))
}
} }
// TODO detect number of running
// goroutines to ensure that
// no other times will fire.
// See https://github.com/tendermint/tmlibs/issues/120.
time.Sleep(time.Millisecond * 100)
done := true done := true
select { select {
case <-rt.Chan(): case <-rt.Chan():
@ -46,46 +60,44 @@ func TestRepeat(t *testing.T) {
} }
tm := NewLogicalTickerMaker(ch) tm := NewLogicalTickerMaker(ch)
dur := time.Duration(10 * time.Millisecond) // less than a second rt := NewRepeatTimerWithTickerMaker("bar", time.Second, tm)
rt := NewRepeatTimerWithTickerMaker("bar", dur, tm)
// Start at 0. /* NOTE: Useful for debugging deadlocks...
tock(t, rt, 0) go func() {
tick(1) // init time time.Sleep(time.Second * 3)
trace := make([]byte, 102400)
count := runtime.Stack(trace, true)
fmt.Printf("Stack of %d bytes: %s\n", count, trace)
}()
*/
tock(t, rt, 0) tick(0, 1000, 10)
tick(1) // wait 1 periods tock(t, rt, []int64{})
tock(t, rt, 1) tick(1000, 2000, 10)
tick(2) // wait 2 periods tock(t, rt, []int64{1000})
tock(t, rt, 2) tick(2005, 5000, 10)
tick(3) // wait 3 periods tock(t, rt, []int64{2005, 3005, 4005})
tock(t, rt, 3) tick(5001, 5999, 1)
tick(4) // wait 4 periods // Read 5005 instead of 5001 because
tock(t, rt, 4) // it's 1 second greater than 4005.
tock(t, rt, []int64{5005})
tick(6000, 7005, 1)
tock(t, rt, []int64{6005})
tick(7033, 8032, 1)
tock(t, rt, []int64{7033})
// Multiple resets leads to no firing. // After a reset, nothing happens
for i := 0; i < 20; i++ { // until two ticks are received.
time.Sleep(time.Millisecond)
rt.Reset() rt.Reset()
} tock(t, rt, []int64{})
tick(8040, 8041, 1)
// After this, it works as new. tock(t, rt, []int64{})
tock(t, rt, 0) tick(9555, 9556, 1)
tick(1) // init time tock(t, rt, []int64{9555})
tock(t, rt, 0)
tick(1) // wait 1 periods
tock(t, rt, 1)
tick(2) // wait 2 periods
tock(t, rt, 2)
tick(3) // wait 3 periods
tock(t, rt, 3)
tick(4) // wait 4 periods
tock(t, rt, 4)
// After a stop, nothing more is sent. // After a stop, nothing more is sent.
rt.Stop() rt.Stop()
tock(t, rt, 0) tock(t, rt, []int64{})
// Another stop panics. // Another stop panics.
assert.Panics(t, func() { rt.Stop() }) assert.Panics(t, func() { rt.Stop() })