Как правильно создавать и запускать параллельные задачи, используя модуль ввода-вывода в Python?


я пытаюсь правильно понять и реализовать два одновременно работающих Task объекты с использованием Python 3 относительно новые asyncio модуль.

в двух словах, asyncio, кажется, предназначен для обработки асинхронных процессов и параллельных Task исполнение в течение цикла обработки событий. Это способствует использованию await (применяется в асинхронных функциях) в качестве обратного вызова-бесплатный способ ждать и использовать результат, не блокируя цикл событий. (Фьючерс и обратные вызовы по-прежнему являются жизнеспособной альтернативой.)

он также обеспечивает asyncio.Task() класс, специализированный подкласс Future предназначена для упаковки сопрограммы. Предпочтительно вызывается с помощью asyncio.ensure_future() метод. Предполагаемое использование задач asyncio заключается в том, чтобы позволить независимо выполняемым задачам выполняться "одновременно" с другими задачами в том же цикле событий. Я так понимаю, что Tasks подключены к циклу событий, который затем автоматически продолжает управлять сопрограммой между await заявления.

мне нравится идея быть в состоянии использовать параллельные задачи без необходимости использовать Executor классы, но я не нашел много разработки по реализации.

вот как я сейчас это делаю:

import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

в случае попытки одновременно запустить две задачи цикла, я заметил, что если задача не имеет внутреннего await выражение, оно застрянет в while петля, эффективно блокируя другие задачи от запуска (очень похоже на обычный while петли). Однако, как только задачи должны (а) ждать, они, кажется, работают одновременно без проблем.

таким образом,await операторы, кажется, обеспечивают цикл событий с опорой для переключения между задачами, давая эффект параллелизма.

пример вывода с внутренними await:

running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

пример вывода без внутренние await:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4

вопросы

проходит ли эта реализация для "правильного" примера параллельных задач цикла в asyncio?

правильно ли, что это работает только для A Task чтобы обеспечить точку блокировки (await выражение) для того, чтобы цикл событий жонглировать несколькими задачами?

2 54

2 ответа:

да, любая сопрограмма, работающая внутри цикла событий, блокирует выполнение других сопрограмм и задач, если только она

  1. вызывает другую сопрограмму с помощью yield from или await (при использовании Python 3.5+).
  2. возвращает.

это так asyncio является однопоточным; единственный способ запуска цикла событий - это отсутствие активного выполнения других сопрограмм. Используя yield from/await временно приостанавливает работу сопрограммы, давая цикл событий шанс работать.

ваш пример кода в порядке, но во многих случаях вы, вероятно, не захотите, чтобы долго работающий код, который не выполняет асинхронный ввод-вывод, выполнялся внутри цикла событий. В этих случаях часто имеет смысл использовать BaseEventLoop.run_in_executor для выполнения кода в фоновом потоке или процессе. ProcessPoolExecutor было бы лучшим выбором, если ваша задача связана с процессором,ThreadPoolExecutor будет использоваться, если вам нужно сделать некоторые I / O, который не является asyncio-дружелюбный.

ваши два цикла, например, полностью привязаны к процессору и не имеют общего состояния, поэтому лучшая производительность будет получена от использования ProcessPoolExecutor чтобы запустить каждый цикл параллельно через процессоры:

import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.get_event_loop()
    boo = asyncio.ensure_future(loop.run_in_executor(executor, say_boo))
    baa = asyncio.ensure_future(loop.run_in_executor(executor, say_baa))

    loop.run_forever()

вам не обязательно нужен yield from x для управления циклом событий.

в вашем примере, я думаю правильный как бы сделать yield None или просто yield, а не yield from asyncio.sleep(0.001):

import asyncio

@asyncio.coroutine
def say_boo():
  i = 0
  while True:
    yield None
    print("...boo {0}".format(i))
    i += 1

@asyncio.coroutine
def say_baa():
  i = 0
  while True:
    yield
    print("...baa {0}".format(i))
    i += 1

boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

сопрограммы - это просто старые генераторы Python. Внутри asyncio цикл событий сохраняет запись этих генераторов и вызовов gen.send() на каждой из них по одному в бесконечном цикле. Всякий раз, когда вы yield, в звоните в gen.send() завершается и цикл может двигаться дальше. (Я упрощаю его; оглянитесь вокруг https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 для фактического кода)

что сказал, Я бы все равно пошел run_in_executor маршрут, если вам нужно сделать интенсивные вычисления процессора без обмена данными.