using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Hcs.ClientApi
{
public class HcsParallel
{
///
/// Асинхронно обрабатывает все элементы @values типа @T методом @processor в параллельном режиме,
/// используя максимум @maxThreads потоков
///
public static async Task ForEachAsync(IEnumerable values, Func processor, int maxThreads)
{
await Task.Run(() => ForEach(values, processor, maxThreads));
}
///
/// Обрабатывает все элементы @values типа @T методом @processor в параллельном режиме,
/// используя максимум @maxThreads потоков
///
public static void ForEach(IEnumerable values, Func processor, int maxThreads)
{
var taskList = new List();
var enumerator = values.GetEnumerator();
int numTasksFinished = 0;
while (true)
{
// Наполняем массив ожидания следующими задачами
while (taskList.Count < maxThreads)
{
if (!enumerator.MoveNext()) break;
// Запускаем новую задачу в отсоединенном потоке
Task newTask = Task.Run(() => processor(enumerator.Current));
taskList.Add(newTask);
}
// Если массив ожидания пуст, работа окончена
if (taskList.Count == 0) return;
// Ждем завершение любой задачи из массива ожидания
int finishedIndex = Task.WaitAny(taskList.ToArray());
var finishedTask = taskList[finishedIndex];
numTasksFinished += 1;
// Удаляем задачу из массива ожидания чтобы более ее не ждать
taskList.Remove(finishedTask);
// Если задача завершилась успешно уходим на добавление новой задачи
if (!finishedTask.IsFaulted &&
!finishedTask.IsCanceled) continue;
// Задача завершилась аномально, ждем завершения других запущенных задач
if (taskList.Count > 0) Task.WaitAll(taskList.ToArray());
// Составляем список всех возникших ошибок включая первую
taskList.Insert(0, finishedTask);
var errors = new List();
foreach (var task in taskList)
{
if (task.IsFaulted) errors.Add(task.Exception);
if (task.IsCanceled) errors.Add(new Exception("Task was cancelled"));
}
// Аномально завершаем обработку
string message =
$"Ошибка параллельной обработки №{numTasksFinished} из {values.Count()}" +
$" объектов типа {typeof(T).FullName}";
throw new AggregateException(message, errors.ToArray());
}
}
}
}