Делает Параллель.Объекту ограничить количество активных потоков?


учитывая этот код:

var arrayStrings = new string[1000];
Parallel.ForEach<string>(arrayStrings, someString =>
{
    DoSomething(someString);
});

будут ли все 1000 потоков появляться почти одновременно?

5 94

5 ответов:

нет, он не будет запускать 1000 потоков - да, он будет ограничивать количество используемых потоков. Параллельные расширения используют соответствующее количество ядер, исходя из того, сколько у вас физически есть и сколько уже занято. Он выделяет работу для каждого ядра, а затем использует метод под названием работа краже чтобы каждый поток эффективно обрабатывал свою собственную очередь и только тогда, когда это действительно необходимо, нужно делать какой-либо дорогостоящий кросс-потоковый доступ.

посмотреть pfx Team Blog на loads информации о том, как он распределяет работу и все виды других тем.

обратите внимание, что в некоторых случаях вы также можете указать степень параллелизма.

на одноядерной машине... Параллельный.Разделы ForEach (куски) коллекции, над которой он работает, между несколькими потоками, но это число вычисляется на основе алгоритма, который учитывает и, по-видимому, постоянно контролирует работу, выполняемую потоками, которые он выделяет для ForEach. Так что если часть тела ForEach вызывает длительные функции IO-bound / blocking, которые оставят поток в ожидании, алгоритм создаст больше потоков и передел коллекции между ними. Если потоки завершаются быстро и не блокируются на потоках ввода-вывода, например, например, просто вычисляя некоторые числа,алгоритм увеличит (или действительно уменьшит) количество потоков до точки, где алгоритм считает оптимальным для пропускной способности (среднее время завершения каждой итерации).

в основном пул потоков за всеми различными параллельными библиотечными функциями, разработает оптимальное количество потоков для использовать. Число физических процессорных ядер составляет только часть уравнения. Существует не простая связь один к одному между количеством ядер и количеством порожденных потоков.

Я не нахожу документацию по отмене и обработке синхронизации потоков очень полезной. Надеюсь, MS может предоставить лучшие примеры в MSDN.

Не забывайте, что код тела должен быть написан для запуска на нескольких потоках, наряду со всеми обычными потоками безопасности соображения, структура не абстрагирует этот фактор... еще.

он вырабатывает оптимальное количество потоков в зависимости от количества процессоров/ядер. Они не будут появляться все сразу.

посмотреть Делает Параллельно.Для использования одной задачи на итерацию? для идеи "ментальной модели" использовать. Однако автор заявляет, что "в конце концов, важно помнить, что детали реализации могут измениться в любое время."

большой вопрос. В вашем примере уровень распараллеливания довольно низок даже на четырехъядерном процессоре, но с некоторым ожиданием уровень распараллеливания может стать довольно высоким.

// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

теперь посмотрите, что происходит, когда операция ожидания добавляется для имитации HTTP-запроса.

// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Я еще не внес никаких изменений, и уровень параллелизма/распараллеливания резко подскочил. Параллелизм может иметь свой предел увеличен с помощью ParallelOptions.MaxDegreeOfParallelism.

// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(1000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);

        System.Threading.Thread.Sleep(100000);

        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });

    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}

Я рекомендую установка ParallelOptions.MaxDegreeOfParallelism. Это не обязательно увеличит количество используемых потоков, но это гарантирует, что вы начнете только разумное количество потоков, что, похоже, вас беспокоит.

чтобы ответить на ваш вопрос, нет, вы не получите все потоки сразу. Использовать Параллельно.Вызовите, если вы хотите вызвать параллельно отлично, например, тестирование условий гонки.
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions {MaxDegreeOfParallelism = int.MaxValue};
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });

    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}