From 4caf943f49759c5429a1228e390330e622a43738 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 20 Mar 2018 21:43:58 +0100 Subject: [PATCH] Parallel returns a TaskResultSet --- common/async.go | 95 +++++++++++++++++++++++++++++++++++++++++--- common/async_test.go | 24 +++++------ 2 files changed, 102 insertions(+), 17 deletions(-) diff --git a/common/async.go b/common/async.go index e7bc71b1..64e32076 100644 --- a/common/async.go +++ b/common/async.go @@ -4,6 +4,9 @@ import ( "sync/atomic" ) +//---------------------------------------- +// Task + // val: the value returned after task execution. // err: the error returned during task completion. // abort: tells Parallel to return, whether or not all tasks have completed. @@ -17,12 +20,97 @@ type TaskResult struct { type TaskResultCh <-chan TaskResult +type taskResultOK struct { + TaskResult + OK bool +} + +type TaskResultSet struct { + chz []TaskResultCh + results []taskResultOK +} + +func newTaskResultSet(chz []TaskResultCh) *TaskResultSet { + return &TaskResultSet{ + chz: chz, + results: nil, + } +} + +func (trs *TaskResultSet) Channels() []TaskResultCh { + return trs.chz +} + +func (trs *TaskResultSet) LastResult(index int) (TaskResult, bool) { + if len(trs.results) <= index { + return TaskResult{}, false + } + resultOK := trs.results[index] + return resultOK.TaskResult, resultOK.OK +} + +// NOTE: Not concurrency safe. +func (trs *TaskResultSet) Reap() { + if trs.results == nil { + trs.results = make([]taskResultOK, len(trs.chz)) + } + for i := 0; i < len(trs.results); i++ { + var trch = trs.chz[i] + select { + case result := <-trch: + // Overwrite result. + trs.results[i] = taskResultOK{ + TaskResult: result, + OK: true, + } + default: + // Do nothing. + } + } +} + +// Returns the firstmost (by task index) error as +// discovered by all previous Reap() calls. +func (trs *TaskResultSet) FirstValue() interface{} { + for _, result := range trs.results { + if result.Value != nil { + return result.Value + } + } + return nil +} + +// Returns the firstmost (by task index) error as +// discovered by all previous Reap() calls. +func (trs *TaskResultSet) FirstError() error { + for _, result := range trs.results { + if result.Error != nil { + return result.Error + } + } + return nil +} + +// Returns the firstmost (by task index) panic as +// discovered by all previous Reap() calls. +func (trs *TaskResultSet) FirstPanic() interface{} { + for _, result := range trs.results { + if result.Panic != nil { + return result.Panic + } + } + return nil +} + +//---------------------------------------- +// Parallel + // Run tasks in parallel, with ability to abort early. // Returns ok=false iff any of the tasks returned abort=true. // NOTE: Do not implement quit features here. Instead, provide convenient // concurrent quit-like primitives, passed implicitly via Task closures. (e.g. // it's not Parallel's concern how you quit/abort your tasks). -func Parallel(tasks ...Task) (chz []TaskResultCh, ok bool) { +func Parallel(tasks ...Task) (trs *TaskResultSet, ok bool) { var taskResultChz = make([]TaskResultCh, len(tasks)) // To return. var taskDoneCh = make(chan bool, len(tasks)) // A "wait group" channel, early abort if any true received. var numPanics = new(int32) // Keep track of panics to set ok=false later. @@ -67,8 +155,5 @@ func Parallel(tasks ...Task) (chz []TaskResultCh, ok bool) { // We must do this check here (after DONE_LOOP). ok = ok && (atomic.LoadInt32(numPanics) == 0) - // Caller can use this however they want. - // TODO: implement convenience functions to - // make sense of this structure safely. - return taskResultChz, ok + return newTaskResultSet(taskResultChz), ok } diff --git a/common/async_test.go b/common/async_test.go index f2a83d56..3b47c3fa 100644 --- a/common/async_test.go +++ b/common/async_test.go @@ -22,7 +22,7 @@ func TestParallel(t *testing.T) { } // Run in parallel. - var taskResultChz, ok = Parallel(tasks...) + var trs, ok = Parallel(tasks...) assert.True(t, ok) // Verify. @@ -30,7 +30,7 @@ func TestParallel(t *testing.T) { var failedTasks int for i := 0; i < len(tasks); i++ { select { - case taskResult := <-taskResultChz[i]: + case taskResult := <-trs.chz[i]: if taskResult.Error != nil { assert.Fail(t, "Task should not have errored but got %v", taskResult.Error) failedTasks += 1 @@ -79,20 +79,20 @@ func TestParallelAbort(t *testing.T) { } // Run in parallel. - var taskResultChz, ok = Parallel(tasks...) + var taskResultSet, ok = Parallel(tasks...) assert.False(t, ok, "ok should be false since we aborted task #2.") // Verify task #3. - // Initially taskResultCh[3] sends nothing since flow4 didn't send. - waitTimeout(t, taskResultChz[3], "Task #3") + // Initially taskResultSet.chz[3] sends nothing since flow4 didn't send. + waitTimeout(t, taskResultSet.chz[3], "Task #3") // Now let the last task (#3) complete after abort. flow4 <- <-flow3 // Verify task #0, #1, #2. - waitFor(t, taskResultChz[0], "Task #0", 0, nil, nil) - waitFor(t, taskResultChz[1], "Task #1", 1, errors.New("some error"), nil) - waitFor(t, taskResultChz[2], "Task #2", 2, nil, nil) + waitFor(t, taskResultSet.chz[0], "Task #0", 0, nil, nil) + waitFor(t, taskResultSet.chz[1], "Task #1", 1, errors.New("some error"), nil) + waitFor(t, taskResultSet.chz[2], "Task #2", 2, nil, nil) } func TestParallelRecover(t *testing.T) { @@ -111,13 +111,13 @@ func TestParallelRecover(t *testing.T) { } // Run in parallel. - var taskResultChz, ok = Parallel(tasks...) + var taskResultSet, ok = Parallel(tasks...) assert.False(t, ok, "ok should be false since we panic'd in task #2.") // Verify task #0, #1, #2. - waitFor(t, taskResultChz[0], "Task #0", 0, nil, nil) - waitFor(t, taskResultChz[1], "Task #1", 1, errors.New("some error"), nil) - waitFor(t, taskResultChz[2], "Task #2", nil, nil, 2) + waitFor(t, taskResultSet.chz[0], "Task #0", 0, nil, nil) + waitFor(t, taskResultSet.chz[1], "Task #1", 1, errors.New("some error"), nil) + waitFor(t, taskResultSet.chz[2], "Task #2", nil, nil, 2) } // Wait for result