Сельдерей параллельная распределенная задача с многопроцессорной обработкой


У меня есть задача с интенсивным ЦП сельдерея. Я хотел бы использовать всю вычислительную мощность (ядра) во многих экземплярах EC2, чтобы выполнить эту работу быстрее (параллельная распределенная задача сельдерея с многопроцессорной обработкой -Я думаю).

условия резьбонарезной, многопроцессорная обработка,распределенных вычислений,распределенная параллельная обработка все термины, которые я пытаюсь понять лучше.

пример задачи:

  @app.task
  for item in list_of_millions_of_ids:
      id = item # do some long complicated equation here very CPU heavy!!!!!!! 
      database.objects(newid=id).save()

используя код выше (С примером, если можно) как можно было бы распределить эту задачу с помощью сельдерея, разрешив разделить эту задачу, используя всю вычислительную мощность процессора на всех доступных машинах в облаке?

4 56

4 ответа:

ваши задачи:

  1. распределите вашу работу к много машин (распределенных вычисление / распределенная параллельная обработка)
  2. распределите работу на данной машине по всем процессорам (многопроцессорность/многопоточность)

сельдерей может сделать оба из них для вас довольно легко. Первое, что нужно понять, это то, что каждый работник сельдерея по умолчанию чтобы выполнить столько задач, сколько ядер процессора доступно на сервере система:

параллелизм-это число рабочих процессов prefork, используемых для обработки ваши задачи одновременно, когда все они заняты новой работой задачи должны будут дождаться завершения одной из задач, прежде чем она сможет быть обработанным.

номер параллелизма по умолчанию-это количество процессоров на этой машине (включая ядра), вы можете указать пользовательский номер с помощью опции-c. Там нет рекомендуемого значения, как оптимальное число зависит от количество факторов, но если ваши задачи в основном связаны с вводом / выводом, вы можете попробуйте увеличить его, эксперименты показали, что добавление более чем удвоенное количество процессоров редко эффективно и, вероятно, ухудшится вместо этого производительность.

это означает, что каждой отдельной задаче не нужно беспокоиться об использовании многопроцессорной/потоковой обработки для использования нескольких процессоров/ядер. Вместо этого сельдерей будет выполнять достаточно задач одновременно, чтобы использовать каждый доступный ЦП.

С этим из пути, следующим шагом является создание задачи, которая обрабатывает обработку некоторого подмножества вашего list_of_millions_of_ids. У вас есть несколько вариантов здесь-один, чтобы каждая задача обрабатывала один идентификатор, поэтому вы запускаете N задач, где N == len(list_of_millions_of_ids). Это гарантирует, что работа будет равномерно распределена между всеми вашими задачами, так как никогда не будет случая, когда один работник заканчивает рано и просто ждет; если ему нужна работа, он может вытащить идентификатор из очереди. Вы можете сделать это (как упомянутый Джон Доу) с использованием сельдерея group.

tasks.py:

@app.task
def process_id(item):
    id = item #long complicated equation here
    database.objects(newid=id).save()

и для выполнения задачи:

from celery import group
from tasks import process_id

jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()

другой вариант-разбить список на более мелкие части и распределить части своим работникам. Этот подход рискует потратить впустую некоторые циклы, потому что вы можете в конечном итоге с некоторыми работниками ждать вокруг, в то время как другие все еще делают работу. Тем не менее,сельдерей примечания к документации что это беспокойство часто необоснованно:

некоторые могут беспокоиться, что фрагментация ваших задач приводит к деградации параллелизм, но это редко верно для занятого кластера и в практика, так как вы избегаете накладных расходов на обмен сообщениями это может значительно повысить производительность.

таким образом, вы можете обнаружить, что фрагментация списка и распределение кусков для каждой задачи выполняется лучше, из-за снижения накладных расходов на обмен сообщениями. Вероятно, вы также можете облегчить нагрузку на база данных немного таким образом, вычисляя каждый идентификатор, сохраняя его в списке, а затем добавляя весь список в БД, как только вы закончите, а не делать это по одному идентификатору за раз. Чункинг подход будет выглядеть примерно так

tasks.py:

@app.task
def process_ids(items):
    for item in items:
        id = item #long complicated equation here
        database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.

и чтобы начать задачи:

from tasks import process_ids

jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()

вы можете немного поэкспериментировать с тем, что размер фрагментов дает лучший результат. Вы хотите найти сладкое место, где вы сокращаете накладные расходы на обмен сообщениями в то же время сохраняя размер достаточно маленьким, чтобы вы не заканчивали с рабочими, заканчивающими свой кусок намного быстрее, чем другой рабочий, а затем просто ждали, ничего не делая.

в мире дистрибуции есть только одна вещь, которую вы должны помнить прежде всего:

преждевременная оптимизация является корнем всех зол. By D. Knuth

Я знаю, что это звучит очевидно, но перед распространением двойной проверки вы используете лучший алгоритм (если он существует...). Сказав это, оптимизация распределения является балансирующим актом между 3 вещами:

  1. запись/чтение данных из стойких средний,
  2. перемещение данных от среднего до среднего Б,
  3. обработка данных,

компьютеры сделаны так, что чем ближе вы подойдете к своему процессору (3), тем быстрее и эффективнее будут (1) и (2). Порядок в классическом кластере будет следующим: сетевой жесткий диск, локальный жесткий диск, оперативная память, внутренняя территория процессорного блока... В настоящее время процессоры становятся достаточно сложными, чтобы их можно было рассматривать как ансамбль независимых аппаратных процессоров обычно называемые ядрами, эти ядра обрабатывают данные (3) через потоки (2). Представьте, что ваше ядро настолько быстро, что при отправке данных с одним потоком вы используете 50% мощности компьютера, если ядро имеет 2 потока, вы будете использовать 100%. Два потока на ядро называется hyper threading,и ваша ОС будет видеть 2 процессора на ядро hyper threaded.

управление потоками в процессоре обычно называется многопоточностью. Управление процессорами от операционной системы, обычно называется многопроцессорной обработки. Управляющий параллельные задачи в кластере обычно называют параллельным программированием. Управление зависимыми задачами в кластере обычно называется распределенным программированием.

так где узкое место ?

  • In (1): попробуйте сохранить и поток с верхнего уровня (тот, который ближе к вашему процессору, например, если сетевой жесткий диск медленно сначала сохранить на локальном жестком диске)
  • In (2): Это самый распространенный, старайтесь избегать общения пакеты, не необходимые для распространения или сжатия пакетов "на лету" (например, если HD медленный, сохраните только "пакетное вычисленное" сообщение и сохраните промежуточные результаты в ОЗУ).
  • In (3): Вы сделали! Вы используете всю вычислительную мощность в вашем распоряжении.

как насчет сельдерея ?

Сельдерей-это платформа обмена сообщениями для распределенного программирования, которая будет использовать модуль брокера для связи (2) и бэкэнд-модуль для persistence (1) это означает, что вы сможете, изменив конфигурацию, избежать большинства узких мест (если это возможно) в вашей сети и только в вашей сети. Сначала профилируйте свой код для достижения наилучшей производительности на одном компьютере. Затем используйте сельдерей в кластере с конфигурацией по умолчанию и установите CELERY_RESULT_PERSISTENT=True:

from celery import Celery

app = Celery('tasks', 
             broker='amqp://guest@localhost//',
             backend='redis://localhost')

@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
    #code that does stuff
    return result

во время выполнения откройте ваши любимые инструменты мониторинга, я использую по умолчанию для rabbitMQ и цветок для сельдерея и сверху для процессоров, ваши результаты будут быть сохранены на сервере. Примером узкого места сети является очередь задач, растущая настолько, что они задерживают выполнение, Вы можете перейти к изменению модулей или конфигурации сельдерея, если ваше узкое место не находится где-то еще.

почему бы не использовать group сельдерей задача для этого?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

в принципе, вы должны разделить ids на куски (или диапазоны) и дать им кучу задач group.

для более сложных задач, таких как агрегирование результатов конкретных задач сельдерея, я успешно использовал chord задач для подобных цель:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

увеличить settings.CELERYD_CONCURRENCY к числу, которое является разумным, и вы можете себе позволить, то эти работники сельдерей будет продолжать выполнять свои задачи в группе или аккорд до тех пор, пока не будет сделано.

Примечание: из-за ошибки в kombu в прошлом были проблемы с повторным использованием рабочих для большого количества задач, я не знаю, исправлено ли это сейчас. Может быть, это и так, но если нет, уменьшите CELERYD_MAX_TASKS_PER_CHILD.

пример на основе упрощенного и измененного кода я запускаю:

@app.task
def do_matches():
    match_data = ...
    result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())

summarize получает результаты всех single_batch_processor задач. Каждая задача выполняется на любом работнике сельдерея,kombu координаты.

теперь я понял: single_batch_processor и summarize также должны быть задачи сельдерея, а не регулярные функции - в противном случае, конечно, он не будет распараллелен (я даже не уверен, что конструктор аккордов примет его, если это не задача сельдерея).

добавление большего количества работников сельдерея, безусловно, ускорит выполнение задачи. Однако у вас может быть еще одно узкое место: база данных. Убедитесь, что он может обрабатывать одновременные вставки/обновления.

Что касается вашего вопроса: вы добавляете работников сельдерея, назначая другой процесс на своих экземплярах EC2 как celeryd. В зависимости от того, сколько работников вам нужно, вы можете добавить еще больше экземпляров.