Parallel reaps automatically before returning

This commit is contained in:
Jae Kwon 2018-03-20 23:08:51 +01:00
parent 4caf943f49
commit 4e5c655944
2 changed files with 35 additions and 31 deletions

View File

@ -41,7 +41,7 @@ func (trs *TaskResultSet) Channels() []TaskResultCh {
return trs.chz return trs.chz
} }
func (trs *TaskResultSet) LastResult(index int) (TaskResult, bool) { func (trs *TaskResultSet) LatestResult(index int) (TaskResult, bool) {
if len(trs.results) <= index { if len(trs.results) <= index {
return TaskResult{}, false return TaskResult{}, false
} }
@ -50,7 +50,7 @@ func (trs *TaskResultSet) LastResult(index int) (TaskResult, bool) {
} }
// NOTE: Not concurrency safe. // NOTE: Not concurrency safe.
func (trs *TaskResultSet) Reap() { func (trs *TaskResultSet) Reap() *TaskResultSet {
if trs.results == nil { if trs.results == nil {
trs.results = make([]taskResultOK, len(trs.chz)) trs.results = make([]taskResultOK, len(trs.chz))
} }
@ -67,6 +67,7 @@ func (trs *TaskResultSet) Reap() {
// Do nothing. // Do nothing.
} }
} }
return trs
} }
// Returns the firstmost (by task index) error as // Returns the firstmost (by task index) error as
@ -155,5 +156,5 @@ func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
// We must do this check here (after DONE_LOOP). // We must do this check here (after DONE_LOOP).
ok = ok && (atomic.LoadInt32(numPanics) == 0) ok = ok && (atomic.LoadInt32(numPanics) == 0)
return newTaskResultSet(taskResultChz), ok return newTaskResultSet(taskResultChz).Reap(), ok
} }

View File

@ -2,6 +2,7 @@ package common
import ( import (
"errors" "errors"
"fmt"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@ -29,22 +30,27 @@ func TestParallel(t *testing.T) {
assert.Equal(t, int(*counter), len(tasks), "Each task should have incremented the counter already") assert.Equal(t, int(*counter), len(tasks), "Each task should have incremented the counter already")
var failedTasks int var failedTasks int
for i := 0; i < len(tasks); i++ { for i := 0; i < len(tasks); i++ {
select { taskResult, ok := trs.LatestResult(i)
case taskResult := <-trs.chz[i]: if !ok {
if taskResult.Error != nil { assert.Fail(t, "Task #%v did not complete.", i)
assert.Fail(t, "Task should not have errored but got %v", taskResult.Error)
failedTasks += 1
} else if !assert.Equal(t, -1*i, taskResult.Value.(int)) {
failedTasks += 1
} else {
// Good!
}
default:
failedTasks += 1 failedTasks += 1
} else if taskResult.Error != nil {
assert.Fail(t, "Task should not have errored but got %v", taskResult.Error)
failedTasks += 1
} else if taskResult.Panic != nil {
assert.Fail(t, "Task should not have panic'd but got %v", taskResult.Panic)
failedTasks += 1
} else if !assert.Equal(t, -1*i, taskResult.Value.(int)) {
assert.Fail(t, "Task should have returned %v but got %v", -1*i, taskResult.Value.(int))
failedTasks += 1
} else {
// Good!
} }
} }
assert.Equal(t, failedTasks, 0, "No task should have failed") assert.Equal(t, failedTasks, 0, "No task should have failed")
assert.Nil(t, trs.FirstError(), "There should be no errors")
assert.Nil(t, trs.FirstPanic(), "There should be no panics")
assert.Equal(t, 0, trs.FirstValue(), "First value should be 0")
} }
func TestParallelAbort(t *testing.T) { func TestParallelAbort(t *testing.T) {
@ -90,9 +96,9 @@ func TestParallelAbort(t *testing.T) {
flow4 <- <-flow3 flow4 <- <-flow3
// Verify task #0, #1, #2. // Verify task #0, #1, #2.
waitFor(t, taskResultSet.chz[0], "Task #0", 0, nil, nil) checkResult(t, taskResultSet, 0, 0, nil, nil)
waitFor(t, taskResultSet.chz[1], "Task #1", 1, errors.New("some error"), nil) checkResult(t, taskResultSet, 1, 1, errors.New("some error"), nil)
waitFor(t, taskResultSet.chz[2], "Task #2", 2, nil, nil) checkResult(t, taskResultSet, 2, 2, nil, nil)
} }
func TestParallelRecover(t *testing.T) { func TestParallelRecover(t *testing.T) {
@ -115,22 +121,19 @@ func TestParallelRecover(t *testing.T) {
assert.False(t, ok, "ok should be false since we panic'd in task #2.") assert.False(t, ok, "ok should be false since we panic'd in task #2.")
// Verify task #0, #1, #2. // Verify task #0, #1, #2.
waitFor(t, taskResultSet.chz[0], "Task #0", 0, nil, nil) checkResult(t, taskResultSet, 0, 0, nil, nil)
waitFor(t, taskResultSet.chz[1], "Task #1", 1, errors.New("some error"), nil) checkResult(t, taskResultSet, 1, 1, errors.New("some error"), nil)
waitFor(t, taskResultSet.chz[2], "Task #2", nil, nil, 2) checkResult(t, taskResultSet, 2, nil, nil, 2)
} }
// Wait for result // Wait for result
func waitFor(t *testing.T, taskResultCh TaskResultCh, taskName string, val interface{}, err error, pnk interface{}) { func checkResult(t *testing.T, taskResultSet *TaskResultSet, index int, val interface{}, err error, pnk interface{}) {
select { taskResult, ok := taskResultSet.LatestResult(index)
case taskResult, ok := <-taskResultCh: taskName := fmt.Sprintf("Task #%v", index)
assert.True(t, ok, "TaskResultCh unexpectedly closed for %v", taskName) assert.True(t, ok, "TaskResultCh unexpectedly closed for %v", taskName)
assert.Equal(t, val, taskResult.Value, taskName) assert.Equal(t, val, taskResult.Value, taskName)
assert.Equal(t, err, taskResult.Error, taskName) assert.Equal(t, err, taskResult.Error, taskName)
assert.Equal(t, pnk, taskResult.Panic, taskName) assert.Equal(t, pnk, taskResult.Panic, taskName)
default:
assert.Fail(t, "Failed to receive result for %v", taskName)
}
} }
// Wait for timeout (no result) // Wait for timeout (no result)