using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace Hcs.ClientApi { public class HcsParallel { /// /// Асинхронно обрабатывает все элементы @values типа @T методом @processor в параллельном режиме, /// используя максимум @maxThreads потоков /// public static async Task ForEachAsync(IEnumerable values, Func processor, int maxThreads) { await Task.Run(() => ForEach(values, processor, maxThreads)); } /// /// Обрабатывает все элементы @values типа @T методом @processor в параллельном режиме, /// используя максимум @maxThreads потоков /// public static void ForEach(IEnumerable values, Func processor, int maxThreads) { var taskList = new List(); var enumerator = values.GetEnumerator(); int numTasksFinished = 0; while (true) { // Наполняем массив ожидания следующими задачами while (taskList.Count < maxThreads) { if (!enumerator.MoveNext()) break; // Запускаем новую задачу в отсоединенном потоке Task newTask = Task.Run(() => processor(enumerator.Current)); taskList.Add(newTask); } // Если массив ожидания пуст, работа окончена if (taskList.Count == 0) return; // Ждем завершение любой задачи из массива ожидания int finishedIndex = Task.WaitAny(taskList.ToArray()); var finishedTask = taskList[finishedIndex]; numTasksFinished += 1; // Удаляем задачу из массива ожидания чтобы более ее не ждать taskList.Remove(finishedTask); // Если задача завершилась успешно уходим на добавление новой задачи if (!finishedTask.IsFaulted && !finishedTask.IsCanceled) continue; // Задача завершилась аномально, ждем завершения других запущенных задач if (taskList.Count > 0) Task.WaitAll(taskList.ToArray()); // Составляем список всех возникших ошибок включая первую taskList.Insert(0, finishedTask); var errors = new List(); foreach (var task in taskList) { if (task.IsFaulted) errors.Add(task.Exception); if (task.IsCanceled) errors.Add(new Exception("Task was cancelled")); } // Аномально завершаем обработку string message = $"Ошибка параллельной обработки №{numTasksFinished} из {values.Count()}" + $" объектов типа {typeof(T).FullName}"; throw new AggregateException(message, errors.ToArray()); } } } }