//
// Copyright (c) 2022-present, Trail of Bits, Inc.
// All rights reserved.
//
// This source code is licensed in accordance with the terms specified in
// the LICENSE file found in the root directory of this source tree.
//
using System;
using System.Collections.Generic;
using System.Linq;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Text;

namespace RpcInvestigator
{
    using static TraceLogger;

    //
    // Splits a list of work items into batches of at most WorkSize items, runs
    // every batch on its own task via Task.Run, and gathers what the batches
    // produced.
    //
    // NOTE(review): the generic parameter lists were not present in the text
    // this revision was prepared from. TResult/TInput on the class and T on
    // TaskWorkerResult are reconstructed from how the members use them --
    // confirm their placement against existing callers before merging.
    //
    public static class TaskWorker<TResult, TInput>
    {
        //
        // What one batch hands back: a StringBuilder for messages and the
        // batch's result value.
        //
        public class TaskWorkerResult<T>
        {
            public StringBuilder Messages;
            public T TaskResult;
        }

        //
        // Runs an asynchronous work routine over Input, one invocation per
        // batch, and returns the results of the batches that ran to completion
        // (in batch order).
        //
        // Input       - the work items; must not be null. An empty list yields
        //               an empty result.
        // WorkSize    - maximum number of items per batch; must be > 0.
        // WorkRoutine - invoked once per batch; must not be null.
        //
        // A batch whose routine throws (or is canceled) is logged through
        // Trace and left out of the returned list; the exception is NOT
        // propagated to the caller. Invalid arguments surface as
        // ArgumentNullException / ArgumentOutOfRangeException when the
        // returned task is awaited.
        //
        public static async Task<List<TaskWorkerResult<TResult>>> Run(
            List<TInput> Input,
            int WorkSize,
            Func<List<TInput>, Task<TaskWorkerResult<TResult>>> WorkRoutine
            )
        {
            ValidateArguments(Input, WorkSize, WorkRoutine);

            var workers = Partition(Input, WorkSize)
                .Select(batch => Task.Run(() => WorkRoutine(batch)))
                .ToList();

            return await Collect(workers, "Run");
        }

        //
        // Runs an asynchronous, bool-returning work routine over Input, one
        // invocation per batch.
        //
        // Arguments are as for the other Run overload. The values returned by
        // the individual routines are not inspected. Returns true when every
        // batch ran to completion and false when at least one batch threw or
        // was canceled (each such failure is logged through Trace, not
        // propagated).
        //
        public static async Task<bool> Run(
            List<TInput> Input,
            int WorkSize,
            Func<List<TInput>, Task<bool>> WorkRoutine
            )
        {
            ValidateArguments(Input, WorkSize, WorkRoutine);

            var workers = Partition(Input, WorkSize)
                .Select(batch => Task.Run(() => WorkRoutine(batch)))
                .ToList();

            var finished = await Collect(workers, "Run");

            //
            // Collect only returns entries for workers that ran to completion,
            // so a shortfall means at least one worker failed.
            //
            return finished.Count == workers.Count;
        }

        //
        // Same as the list-returning Run, but for a synchronous work routine:
        // each batch is handed to WorkRoutine on its own Task.Run task and the
        // results of the batches that ran to completion are returned (in batch
        // order). Failures are logged through Trace, not propagated.
        //
        public static async Task<List<TaskWorkerResult<TResult>>> RunSync(
            List<TInput> Input,
            int WorkSize,
            Func<List<TInput>, TaskWorkerResult<TResult>> WorkRoutine
            )
        {
            ValidateArguments(Input, WorkSize, WorkRoutine);

            var workers = Partition(Input, WorkSize)
                .Select(batch => Task.Run(() => WorkRoutine(batch)))
                .ToList();

            return await Collect(workers, "RunSync");
        }

        //
        // Rejects arguments that would otherwise fail obscurely. In
        // particular, a WorkSize of zero or less would make the batching loop
        // spin forever.
        //
        private static void ValidateArguments(
            List<TInput> Input,
            int WorkSize,
            Delegate WorkRoutine
            )
        {
            if (Input == null)
            {
                throw new ArgumentNullException(nameof(Input));
            }
            if (WorkRoutine == null)
            {
                throw new ArgumentNullException(nameof(WorkRoutine));
            }
            if (WorkSize <= 0)
            {
                throw new ArgumentOutOfRangeException(
                    nameof(WorkSize), WorkSize, "WorkSize must be greater than zero.");
            }
        }

        //
        // Copies Input into consecutive batches of at most WorkSize items. The
        // last batch holds whatever remains.
        //
        private static List<List<TInput>> Partition(List<TInput> Input, int WorkSize)
        {
            var batches = new List<List<TInput>>();
            for (int offset = 0; offset < Input.Count; offset += WorkSize)
            {
                //
                // GetRange copies just this slice, rather than re-walking the
                // list from the start for every batch.
                //
                int length = Math.Min(WorkSize, Input.Count - offset);
                batches.Add(Input.GetRange(offset, length));
            }
            return batches;
        }

        //
        // Waits for every worker to finish, logs each one that failed, and
        // returns the results of those that ran to completion (in worker
        // order). Caller is the public method name used in the log message.
        //
        private static async Task<List<TItem>> Collect<TItem>(
            List<Task<TItem>> Workers,
            string Caller
            )
        {
            try
            {
                await Task.WhenAll(Workers);
            }
            catch (Exception)
            {
                //
                // Intentionally not rethrown. Awaiting WhenAll surfaces only
                // the first failure; every failed worker is examined and
                // logged individually below. Reading the combined result of a
                // faulted WhenAll task would simply throw again.
                //
            }

            var completed = new List<TItem>(Workers.Count);
            foreach (var worker in Workers)
            {
                if (worker.Status == TaskStatus.RanToCompletion)
                {
                    completed.Add(worker.Result);
                }
                else if (worker.IsFaulted)
                {
                    foreach (var ex in worker.Exception.Flatten().InnerExceptions)
                    {
                        Trace(TraceLoggerType.TaskWorker,
                            TraceEventType.Error,
                            "Exception in " + Caller + ": " + ex.Message);
                    }
                }
                else
                {
                    // Canceled workers carry no exception object to report.
                    Trace(TraceLoggerType.TaskWorker,
                        TraceEventType.Error,
                        "Exception in " + Caller + ": worker task was canceled");
                }
            }
            return completed;
        }
    }
}