Parallel returns a TaskResultSet
This commit is contained in:
parent
db48010e81
commit
4caf943f49
|
@ -4,6 +4,9 @@ import (
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//----------------------------------------
|
||||||
|
// Task
|
||||||
|
|
||||||
// val: the value returned after task execution.
|
// val: the value returned after task execution.
|
||||||
// err: the error returned during task completion.
|
// err: the error returned during task completion.
|
||||||
// abort: tells Parallel to return, whether or not all tasks have completed.
|
// abort: tells Parallel to return, whether or not all tasks have completed.
|
||||||
|
@ -17,12 +20,97 @@ type TaskResult struct {
|
||||||
|
|
||||||
type TaskResultCh <-chan TaskResult
|
type TaskResultCh <-chan TaskResult
|
||||||
|
|
||||||
|
type taskResultOK struct {
|
||||||
|
TaskResult
|
||||||
|
OK bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type TaskResultSet struct {
|
||||||
|
chz []TaskResultCh
|
||||||
|
results []taskResultOK
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTaskResultSet(chz []TaskResultCh) *TaskResultSet {
|
||||||
|
return &TaskResultSet{
|
||||||
|
chz: chz,
|
||||||
|
results: nil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (trs *TaskResultSet) Channels() []TaskResultCh {
|
||||||
|
return trs.chz
|
||||||
|
}
|
||||||
|
|
||||||
|
func (trs *TaskResultSet) LastResult(index int) (TaskResult, bool) {
|
||||||
|
if len(trs.results) <= index {
|
||||||
|
return TaskResult{}, false
|
||||||
|
}
|
||||||
|
resultOK := trs.results[index]
|
||||||
|
return resultOK.TaskResult, resultOK.OK
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: Not concurrency safe.
|
||||||
|
func (trs *TaskResultSet) Reap() {
|
||||||
|
if trs.results == nil {
|
||||||
|
trs.results = make([]taskResultOK, len(trs.chz))
|
||||||
|
}
|
||||||
|
for i := 0; i < len(trs.results); i++ {
|
||||||
|
var trch = trs.chz[i]
|
||||||
|
select {
|
||||||
|
case result := <-trch:
|
||||||
|
// Overwrite result.
|
||||||
|
trs.results[i] = taskResultOK{
|
||||||
|
TaskResult: result,
|
||||||
|
OK: true,
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// Do nothing.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the firstmost (by task index) error as
|
||||||
|
// discovered by all previous Reap() calls.
|
||||||
|
func (trs *TaskResultSet) FirstValue() interface{} {
|
||||||
|
for _, result := range trs.results {
|
||||||
|
if result.Value != nil {
|
||||||
|
return result.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the firstmost (by task index) error as
|
||||||
|
// discovered by all previous Reap() calls.
|
||||||
|
func (trs *TaskResultSet) FirstError() error {
|
||||||
|
for _, result := range trs.results {
|
||||||
|
if result.Error != nil {
|
||||||
|
return result.Error
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the firstmost (by task index) panic as
|
||||||
|
// discovered by all previous Reap() calls.
|
||||||
|
func (trs *TaskResultSet) FirstPanic() interface{} {
|
||||||
|
for _, result := range trs.results {
|
||||||
|
if result.Panic != nil {
|
||||||
|
return result.Panic
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//----------------------------------------
|
||||||
|
// Parallel
|
||||||
|
|
||||||
// Run tasks in parallel, with ability to abort early.
|
// Run tasks in parallel, with ability to abort early.
|
||||||
// Returns ok=false iff any of the tasks returned abort=true.
|
// Returns ok=false iff any of the tasks returned abort=true.
|
||||||
// NOTE: Do not implement quit features here. Instead, provide convenient
|
// NOTE: Do not implement quit features here. Instead, provide convenient
|
||||||
// concurrent quit-like primitives, passed implicitly via Task closures. (e.g.
|
// concurrent quit-like primitives, passed implicitly via Task closures. (e.g.
|
||||||
// it's not Parallel's concern how you quit/abort your tasks).
|
// it's not Parallel's concern how you quit/abort your tasks).
|
||||||
func Parallel(tasks ...Task) (chz []TaskResultCh, ok bool) {
|
func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) {
|
||||||
var taskResultChz = make([]TaskResultCh, len(tasks)) // To return.
|
var taskResultChz = make([]TaskResultCh, len(tasks)) // To return.
|
||||||
var taskDoneCh = make(chan bool, len(tasks)) // A "wait group" channel, early abort if any true received.
|
var taskDoneCh = make(chan bool, len(tasks)) // A "wait group" channel, early abort if any true received.
|
||||||
var numPanics = new(int32) // Keep track of panics to set ok=false later.
|
var numPanics = new(int32) // Keep track of panics to set ok=false later.
|
||||||
|
@ -67,8 +155,5 @@ func Parallel(tasks ...Task) (chz []TaskResultCh, ok bool) {
|
||||||
// We must do this check here (after DONE_LOOP).
|
// We must do this check here (after DONE_LOOP).
|
||||||
ok = ok && (atomic.LoadInt32(numPanics) == 0)
|
ok = ok && (atomic.LoadInt32(numPanics) == 0)
|
||||||
|
|
||||||
// Caller can use this however they want.
|
return newTaskResultSet(taskResultChz), ok
|
||||||
// TODO: implement convenience functions to
|
|
||||||
// make sense of this structure safely.
|
|
||||||
return taskResultChz, ok
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ func TestParallel(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run in parallel.
|
// Run in parallel.
|
||||||
var taskResultChz, ok = Parallel(tasks...)
|
var trs, ok = Parallel(tasks...)
|
||||||
assert.True(t, ok)
|
assert.True(t, ok)
|
||||||
|
|
||||||
// Verify.
|
// Verify.
|
||||||
|
@ -30,7 +30,7 @@ func TestParallel(t *testing.T) {
|
||||||
var failedTasks int
|
var failedTasks int
|
||||||
for i := 0; i < len(tasks); i++ {
|
for i := 0; i < len(tasks); i++ {
|
||||||
select {
|
select {
|
||||||
case taskResult := <-taskResultChz[i]:
|
case taskResult := <-trs.chz[i]:
|
||||||
if taskResult.Error != nil {
|
if taskResult.Error != nil {
|
||||||
assert.Fail(t, "Task should not have errored but got %v", taskResult.Error)
|
assert.Fail(t, "Task should not have errored but got %v", taskResult.Error)
|
||||||
failedTasks += 1
|
failedTasks += 1
|
||||||
|
@ -79,20 +79,20 @@ func TestParallelAbort(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run in parallel.
|
// Run in parallel.
|
||||||
var taskResultChz, ok = Parallel(tasks...)
|
var taskResultSet, ok = Parallel(tasks...)
|
||||||
assert.False(t, ok, "ok should be false since we aborted task #2.")
|
assert.False(t, ok, "ok should be false since we aborted task #2.")
|
||||||
|
|
||||||
// Verify task #3.
|
// Verify task #3.
|
||||||
// Initially taskResultCh[3] sends nothing since flow4 didn't send.
|
// Initially taskResultSet.chz[3] sends nothing since flow4 didn't send.
|
||||||
waitTimeout(t, taskResultChz[3], "Task #3")
|
waitTimeout(t, taskResultSet.chz[3], "Task #3")
|
||||||
|
|
||||||
// Now let the last task (#3) complete after abort.
|
// Now let the last task (#3) complete after abort.
|
||||||
flow4 <- <-flow3
|
flow4 <- <-flow3
|
||||||
|
|
||||||
// Verify task #0, #1, #2.
|
// Verify task #0, #1, #2.
|
||||||
waitFor(t, taskResultChz[0], "Task #0", 0, nil, nil)
|
waitFor(t, taskResultSet.chz[0], "Task #0", 0, nil, nil)
|
||||||
waitFor(t, taskResultChz[1], "Task #1", 1, errors.New("some error"), nil)
|
waitFor(t, taskResultSet.chz[1], "Task #1", 1, errors.New("some error"), nil)
|
||||||
waitFor(t, taskResultChz[2], "Task #2", 2, nil, nil)
|
waitFor(t, taskResultSet.chz[2], "Task #2", 2, nil, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParallelRecover(t *testing.T) {
|
func TestParallelRecover(t *testing.T) {
|
||||||
|
@ -111,13 +111,13 @@ func TestParallelRecover(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run in parallel.
|
// Run in parallel.
|
||||||
var taskResultChz, ok = Parallel(tasks...)
|
var taskResultSet, ok = Parallel(tasks...)
|
||||||
assert.False(t, ok, "ok should be false since we panic'd in task #2.")
|
assert.False(t, ok, "ok should be false since we panic'd in task #2.")
|
||||||
|
|
||||||
// Verify task #0, #1, #2.
|
// Verify task #0, #1, #2.
|
||||||
waitFor(t, taskResultChz[0], "Task #0", 0, nil, nil)
|
waitFor(t, taskResultSet.chz[0], "Task #0", 0, nil, nil)
|
||||||
waitFor(t, taskResultChz[1], "Task #1", 1, errors.New("some error"), nil)
|
waitFor(t, taskResultSet.chz[1], "Task #1", 1, errors.New("some error"), nil)
|
||||||
waitFor(t, taskResultChz[2], "Task #2", nil, nil, 2)
|
waitFor(t, taskResultSet.chz[2], "Task #2", nil, nil, 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for result
|
// Wait for result
|
||||||
|
|
Loading…
Reference in New Issue