From 91b41ddd59788ef800804b036f47eda73442b780 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 31 Jan 2018 12:13:59 +0400 Subject: [PATCH] add waitCh as an alternative to waitGroup new methods: - [CList] WaitChan() - [CElement] NextWaitChan() - [CElement] PrevWaitChan() Refs https://github.com/tendermint/tendermint/pull/1173 --- CHANGELOG.md | 7 +++++ clist/clist.go | 75 +++++++++++++++++++++++++++++++++++---------- clist/clist_test.go | 73 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 138 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42b8cdd6..2c946612 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ BREAKING: - [cli] WriteDemoConfig -> WriteConfigValues +## 0.6.1 (TBD) + +IMPROVEMENTS: + - [clist] add WaitChan() to CList, NextWaitChan() and PrevWaitChan() + to CElement. These can be used instead of blocking *Wait() methods + if you need to be able to send quit signal and not block forever + ## 0.6.0 (December 29, 2017) BREAKING: diff --git a/clist/clist.go b/clist/clist.go index a52920f8..28d771a2 100644 --- a/clist/clist.go +++ b/clist/clist.go @@ -36,12 +36,14 @@ waiting on NextWait() (since it's just a read operation). */ type CElement struct { - mtx sync.RWMutex - prev *CElement - prevWg *sync.WaitGroup - next *CElement - nextWg *sync.WaitGroup - removed bool + mtx sync.RWMutex + prev *CElement + prevWg *sync.WaitGroup + prevWaitCh chan struct{} + next *CElement + nextWg *sync.WaitGroup + nextWaitCh chan struct{} + removed bool Value interface{} // immutable } @@ -84,6 +86,24 @@ func (e *CElement) PrevWait() *CElement { } } +// PrevWaitChan can be used to wait until Prev becomes not nil. Once it does, +// channel will be closed. +func (e *CElement) PrevWaitChan() <-chan struct{} { + e.mtx.RLock() + defer e.mtx.RUnlock() + + return e.prevWaitCh +} + +// NextWaitChan can be used to wait until Next becomes not nil. Once it does, +// channel will be closed. +func (e *CElement) NextWaitChan() <-chan struct{} { + e.mtx.RLock() + defer e.mtx.RUnlock() + + return e.nextWaitCh +} + // Nonblocking, may return nil if at the end. func (e *CElement) Next() *CElement { e.mtx.RLock() @@ -142,9 +162,11 @@ func (e *CElement) SetNext(newNext *CElement) { // events, new Add calls must happen after all previous Wait calls have // returned. e.nextWg = waitGroup1() // WaitGroups are difficult to re-use. + e.nextWaitCh = make(chan struct{}) } if oldNext == nil && newNext != nil { e.nextWg.Done() + close(e.nextWaitCh) } } @@ -158,9 +180,11 @@ func (e *CElement) SetPrev(newPrev *CElement) { e.prev = newPrev if oldPrev != nil && newPrev == nil { e.prevWg = waitGroup1() // WaitGroups are difficult to re-use. + e.prevWaitCh = make(chan struct{}) } if oldPrev == nil && newPrev != nil { e.prevWg.Done() + close(e.prevWaitCh) } } @@ -173,9 +197,11 @@ func (e *CElement) SetRemoved() { // This wakes up anyone waiting in either direction. if e.prev == nil { e.prevWg.Done() + close(e.prevWaitCh) } if e.next == nil { e.nextWg.Done() + close(e.nextWaitCh) } } @@ -185,11 +211,12 @@ func (e *CElement) SetRemoved() { // The zero value for CList is an empty list ready to use. // Operations are goroutine-safe. type CList struct { - mtx sync.RWMutex - wg *sync.WaitGroup - head *CElement // first element - tail *CElement // last element - len int // list length + mtx sync.RWMutex + wg *sync.WaitGroup + waitCh chan struct{} + head *CElement // first element + tail *CElement // last element + len int // list length } func (l *CList) Init() *CList { @@ -197,6 +224,7 @@ func (l *CList) Init() *CList { defer l.mtx.Unlock() l.wg = waitGroup1() + l.waitCh = make(chan struct{}) l.head = nil l.tail = nil l.len = 0 @@ -258,23 +286,35 @@ func (l *CList) BackWait() *CElement { } } +// WaitChan can be used to wait until Front or Back becomes not nil. Once it +// does, channel will be closed. +func (l *CList) WaitChan() <-chan struct{} { + l.mtx.Lock() + defer l.mtx.Unlock() + + return l.waitCh +} + func (l *CList) PushBack(v interface{}) *CElement { l.mtx.Lock() defer l.mtx.Unlock() // Construct a new element e := &CElement{ - prev: nil, - prevWg: waitGroup1(), - next: nil, - nextWg: waitGroup1(), - removed: false, - Value: v, + prev: nil, + prevWg: waitGroup1(), + prevWaitCh: make(chan struct{}), + next: nil, + nextWg: waitGroup1(), + nextWaitCh: make(chan struct{}), + removed: false, + Value: v, } // Release waiters on FrontWait/BackWait maybe if l.len == 0 { l.wg.Done() + close(l.waitCh) } l.len += 1 @@ -313,6 +353,7 @@ func (l *CList) Remove(e *CElement) interface{} { // If we're removing the only item, make CList FrontWait/BackWait wait. if l.len == 1 { l.wg = waitGroup1() // WaitGroups are difficult to re-use. + l.waitCh = make(chan struct{}) } // Update l.len diff --git a/clist/clist_test.go b/clist/clist_test.go index 9d5272de..31f82165 100644 --- a/clist/clist_test.go +++ b/clist/clist_test.go @@ -218,3 +218,76 @@ func TestScanRightDeleteRandom(t *testing.T) { t.Fatal("Failed to remove all elements from CList") } } + +func TestWaitChan(t *testing.T) { + l := New() + ch := l.WaitChan() + + // 1) add one element to an empty list + go l.PushBack(1) + <-ch + + // 2) and remove it + el := l.Front() + v := l.Remove(el) + if v != 1 { + t.Fatal("where is 1 coming from?") + } + + // 3) test iterating forward and waiting for Next (NextWaitChan and Next) + el = l.PushBack(0) + + done := make(chan struct{}) + pushed := 0 + go func() { + for i := 1; i < 100; i++ { + l.PushBack(i) + pushed++ + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + } + close(done) + }() + + next := el + seen := 0 +FOR_LOOP: + for { + select { + case <-next.NextWaitChan(): + next = next.Next() + seen++ + if next == nil { + continue + } + case <-done: + break FOR_LOOP + case <-time.After(10 * time.Second): + t.Fatal("max execution time") + } + } + + if pushed != seen { + t.Fatalf("number of pushed items (%d) not equal to number of seen items (%d)", pushed, seen) + } + + // 4) test iterating backwards (PrevWaitChan and Prev) + prev := next + seen = 0 +FOR_LOOP2: + for { + select { + case <-prev.PrevWaitChan(): + prev = prev.Prev() + seen++ + if prev == nil { + t.Fatal("expected PrevWaitChan to block forever on nil when reached first elem") + } + case <-time.After(5 * time.Second): + break FOR_LOOP2 + } + } + + if pushed != seen { + t.Fatalf("number of pushed items (%d) not equal to number of seen items (%d)", pushed, seen) + } +}