Как многопоточной операции внутри цикла в Python


Допустим, у меня очень большой список и я выполняю такую операцию:

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

Моя проблема двоякая:

  • есть много пунктов
  • API-интерфейс.my_operation принимает навсегда, чтобы вернуться

Я хотел бы использовать многопоточность, чтобы раскрутить кучу api. my_operations сразу, чтобы я мог обрабатывать, возможно, 5 или 10 или даже 100 элементов одновременно.

Если my_operation () возвращает исключение (потому что, возможно, я уже обработал этот элемент) - это нормально. Он не сломается. что-нибудь. Цикл можно продолжить до следующего элемента.

Примечание : это для Python 2.7.3

3 40

3 ответа:

Во-первых, в Python, если ваш код привязан к процессору, многопоточность не поможет, потому что только один поток может удерживать глобальную блокировку интерпретатора и, следовательно, одновременно запускать код Python. Поэтому вам нужно использовать процессы, а не потоки.

Это неверно, если ваша операция "занимает вечность, чтобы вернуться", потому что она связана с IO-то есть ожидает в сети или дисковых копиях или тому подобном. Я вернусь к этому позже.


Далее, способ обработки 5 или 10 или 100 элементов сразу состоит в том, чтобы создайте пул из 5, 10 или 100 работников и поместите элементы в очередь, которую обслуживают работники. К счастью, stdlib multiprocessing и еще concurrent.futures библиотеки оборачивают большую часть деталей для вас.

Первый более мощный и гибкий для традиционного программирования; второй проще, если вам нужно составить ожидание будущего; для тривиальных случаев действительно не имеет значения, что вы выбираете. (В этом случае наиболее очевидная реализация с каждым занимает 3 строки с futures, 4 строки с multiprocessing.)

Если вы используете 2.6-2.7 или 3.0-3.1, futures не встроен, но вы можете установить его из PyPI (pip install futures).


Наконец, обычно намного проще распараллелить вещи, если вы можете превратить всю итерацию цикла в вызов функции (что-то, что вы могли бы, например, передать в map), поэтому давайте сначала сделаем это:

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

Собирая все это вместе:

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)

Если у вас есть много относительно небольших рабочих мест, накладные расходы многопроцессорная обработка может затопить прибыль. Чтобы решить эту проблему, нужно сгруппировать работу в более крупные задания. Например (используя grouper из itertools рецепты , которые вы можете скопировать и вставить в свой код, или получить из проекта more-itertools на PyPI):

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)

Наконец, что делать, если ваш код привязан к IO? Тогда потоки так же хороши, как и процессы, и с меньшими накладными расходами (и меньшими ограничениями, но эти ограничения обычно не влияют на вас в подобных случаях). Иногда это "меньше накладных расходов" достаточно, чтобы означать, что вам не нужно паковать потоки, но вы делаете это с процессами, что является хорошей победой.

Итак, как вы используете потоки вместо процессов? Просто измените ProcessPoolExecutor на ThreadPoolExecutor.

Если вы не уверены, привязан ли ваш код к процессору или к IO, просто попробуйте использовать оба способа.


Могу ли я сделать это для нескольких функций в моем скрипте python? Например, если бы у меня был другой цикл for в другом месте кода, который я хотел бы распараллелить. Это возможно ли выполнить две многопоточные функции в одном скрипте?

Да. На самом деле, есть два разных способа сделать это.

Во-первых, вы можете совместно использовать один и тот же исполнитель (поток или процесс) и использовать его из нескольких мест без проблем. Весь смысл задач и будущего в том, что они самодостаточны; вам все равно, где они выполняются, просто вы ставите их в очередь и в конечном итоге получаете ответ.

Кроме того, вы можете иметь двух исполнителей в одной программе без проблем. Это имеет стоимость производительности-если вы используете оба исполнителя одновременно, вы в конечном итоге попытаетесь запустить (например) 16 занятых потоков на 8 ядрах, что означает, что произойдет некоторое переключение контекста. Но иногда это стоит сделать, потому что, скажем, два исполнителя редко заняты одновременно, и это делает ваш код намного проще. Или, может быть, один исполнитель выполняет очень большие задачи, которые могут занять некоторое время, а другой выполняет очень маленькие задачи, которые необходимо выполнить. завершите как можно быстрее, потому что скорость отклика важнее, чем пропускная способность для части вашей программы.

Если вы не знаете, что подходит для вашей программы, обычно это первое.

Edit 2018-02-06 : редакция, основанная на этом Комментарии

Edit : забыл упомянуть, что это работает на Python 2.7.x

Есть многопроцессорность.пул, и следующий пример иллюстрирует, как использовать один из них:

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

Теперь, если вы действительно определяете, что ваш процесс связан с процессором, как упоминалось в @abarnert, измените ThreadPool на реализацию пула процессов (прокомментировано в разделе импорт ThreadPool). Более подробную информацию вы можете найти здесь: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

Вы можете разделить обработку на определенное количество потоков, используя такой подход:

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            


def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                



split_processing(items)