Fixes TestParallelAbort nondeterministic failure #201 (#202)

This commit is contained in:
Jae Kwon 2018-04-23 00:07:03 -07:00 committed by GitHub
parent e328006bfe
commit 8fa4211bbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 9 deletions

View File

@ -32,7 +32,7 @@ type TaskResultSet struct {
func newTaskResultSet(chz []TaskResultCh) *TaskResultSet { func newTaskResultSet(chz []TaskResultCh) *TaskResultSet {
return &TaskResultSet{ return &TaskResultSet{
chz: chz, chz: chz,
results: nil, results: make([]taskResultOK, len(chz)),
} }
} }
@ -49,18 +49,20 @@ func (trs *TaskResultSet) LatestResult(index int) (TaskResult, bool) {
} }
// NOTE: Not concurrency safe. // NOTE: Not concurrency safe.
// Writes results to trs.results without waiting for all tasks to complete.
func (trs *TaskResultSet) Reap() *TaskResultSet { func (trs *TaskResultSet) Reap() *TaskResultSet {
if trs.results == nil {
trs.results = make([]taskResultOK, len(trs.chz))
}
for i := 0; i < len(trs.results); i++ { for i := 0; i < len(trs.results); i++ {
var trch = trs.chz[i] var trch = trs.chz[i]
select { select {
case result := <-trch: case result, ok := <-trch:
// Overwrite result. if ok {
trs.results[i] = taskResultOK{ // Write result.
TaskResult: result, trs.results[i] = taskResultOK{
OK: true, TaskResult: result,
OK: true,
}
} else {
// We already wrote it.
} }
default: default:
// Do nothing. // Do nothing.
@ -69,6 +71,27 @@ func (trs *TaskResultSet) Reap() *TaskResultSet {
return trs return trs
} }
// NOTE: Not concurrency safe.
// Like Reap() but waits until all tasks have returned or panic'd.
func (trs *TaskResultSet) Wait() *TaskResultSet {
for i := 0; i < len(trs.results); i++ {
var trch = trs.chz[i]
select {
case result, ok := <-trch:
if ok {
// Write result.
trs.results[i] = taskResultOK{
TaskResult: result,
OK: true,
}
} else {
// We already wrote it.
}
}
}
return trs
}
// Returns the firstmost (by task index) error as // Returns the firstmost (by task index) error as
// discovered by all previous Reap() calls. // discovered by all previous Reap() calls.
func (trs *TaskResultSet) FirstValue() interface{} { func (trs *TaskResultSet) FirstValue() interface{} {
@ -116,7 +139,11 @@ func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
defer func() { defer func() {
if pnk := recover(); pnk != nil { if pnk := recover(); pnk != nil {
atomic.AddInt32(numPanics, 1) atomic.AddInt32(numPanics, 1)
// Send panic to taskResultCh.
taskResultCh <- TaskResult{nil, ErrorWrap(pnk, "Panic in task")} taskResultCh <- TaskResult{nil, ErrorWrap(pnk, "Panic in task")}
// Closing taskResultCh lets trs.Wait() work.
close(taskResultCh)
// Decrement waitgroup.
taskDoneCh <- false taskDoneCh <- false
} }
}() }()
@ -125,6 +152,8 @@ func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
// Send val/err to taskResultCh. // Send val/err to taskResultCh.
// NOTE: Below this line, nothing must panic/ // NOTE: Below this line, nothing must panic/
taskResultCh <- TaskResult{val, err} taskResultCh <- TaskResult{val, err}
// Closing taskResultCh lets trs.Wait() work.
close(taskResultCh)
// Decrement waitgroup. // Decrement waitgroup.
taskDoneCh <- abort taskDoneCh <- abort
}(i, task, taskResultCh) }(i, task, taskResultCh)

View File

@ -91,10 +91,14 @@ func TestParallelAbort(t *testing.T) {
// Now let the last task (#3) complete after abort. // Now let the last task (#3) complete after abort.
flow4 <- <-flow3 flow4 <- <-flow3
// Wait until all tasks have returned or panic'd.
taskResultSet.Wait()
// Verify task #0, #1, #2. // Verify task #0, #1, #2.
checkResult(t, taskResultSet, 0, 0, nil, nil) checkResult(t, taskResultSet, 0, 0, nil, nil)
checkResult(t, taskResultSet, 1, 1, errors.New("some error"), nil) checkResult(t, taskResultSet, 1, 1, errors.New("some error"), nil)
checkResult(t, taskResultSet, 2, 2, nil, nil) checkResult(t, taskResultSet, 2, 2, nil, nil)
checkResult(t, taskResultSet, 3, 3, nil, nil)
} }
func TestParallelRecover(t *testing.T) { func TestParallelRecover(t *testing.T) {