Как правильно создавать и запускать параллельные задачи, используя модуль ввода-вывода в Python?
я пытаюсь правильно понять и реализовать два одновременно работающих Task
объекты с использованием Python 3 относительно новые asyncio
модуль.
в двух словах, asyncio, кажется, предназначен для обработки асинхронных процессов и параллельных Task
исполнение в течение цикла обработки событий. Это способствует использованию await
(применяется в асинхронных функциях) в качестве обратного вызова-бесплатный способ ждать и использовать результат, не блокируя цикл событий. (Фьючерс и обратные вызовы по-прежнему являются жизнеспособной альтернативой.)
он также обеспечивает asyncio.Task()
класс, специализированный подкласс Future
предназначена для упаковки сопрограммы. Предпочтительно вызывается с помощью asyncio.ensure_future()
метод. Предполагаемое использование задач asyncio заключается в том, чтобы позволить независимо выполняемым задачам выполняться "одновременно" с другими задачами в том же цикле событий. Я так понимаю, что Tasks
подключены к циклу событий, который затем автоматически продолжает управлять сопрограммой между await
заявления.
мне нравится идея быть в состоянии использовать параллельные задачи без необходимости использовать Executor
классы, но я не нашел много разработки по реализации.
вот как я сейчас это делаю:
import asyncio
print('running async test')
async def say_boo():
i = 0
while True:
await asyncio.sleep(0)
print('...boo {0}'.format(i))
i += 1
async def say_baa():
i = 0
while True:
await asyncio.sleep(0)
print('...baa {0}'.format(i))
i += 1
# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())
loop = asyncio.get_event_loop()
loop.run_forever()
в случае попытки одновременно запустить две задачи цикла, я заметил, что если задача не имеет внутреннего await
выражение, оно застрянет в while
петля, эффективно блокируя другие задачи от запуска (очень похоже на обычный while
петли). Однако, как только задачи должны (а) ждать, они, кажется, работают одновременно без проблем.
таким образом,await
операторы, кажется, обеспечивают цикл событий с опорой для переключения между задачами, давая эффект параллелизма.
пример вывода с внутренними await
:
running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2
пример вывода без внутренние await
:
...boo 0
...boo 1
...boo 2
...boo 3
...boo 4
вопросы
проходит ли эта реализация для "правильного" примера параллельных задач цикла в asyncio
?
правильно ли, что это работает только для A Task
чтобы обеспечить точку блокировки (await
выражение) для того, чтобы цикл событий жонглировать несколькими задачами?
2 ответа:
да, любая сопрограмма, работающая внутри цикла событий, блокирует выполнение других сопрограмм и задач, если только она
- вызывает другую сопрограмму с помощью
yield from
илиawait
(при использовании Python 3.5+).- возвращает.
это так
asyncio
является однопоточным; единственный способ запуска цикла событий - это отсутствие активного выполнения других сопрограмм. Используяyield from
/await
временно приостанавливает работу сопрограммы, давая цикл событий шанс работать.ваш пример кода в порядке, но во многих случаях вы, вероятно, не захотите, чтобы долго работающий код, который не выполняет асинхронный ввод-вывод, выполнялся внутри цикла событий. В этих случаях часто имеет смысл использовать
BaseEventLoop.run_in_executor
для выполнения кода в фоновом потоке или процессе.ProcessPoolExecutor
было бы лучшим выбором, если ваша задача связана с процессором,ThreadPoolExecutor
будет использоваться, если вам нужно сделать некоторые I / O, который не являетсяasyncio
-дружелюбный.ваши два цикла, например, полностью привязаны к процессору и не имеют общего состояния, поэтому лучшая производительность будет получена от использования
ProcessPoolExecutor
чтобы запустить каждый цикл параллельно через процессоры:import asyncio from concurrent.futures import ProcessPoolExecutor print('running async test') def say_boo(): i = 0 while True: print('...boo {0}'.format(i)) i += 1 def say_baa(): i = 0 while True: print('...baa {0}'.format(i)) i += 1 if __name__ == "__main__": executor = ProcessPoolExecutor(2) loop = asyncio.get_event_loop() boo = asyncio.ensure_future(loop.run_in_executor(executor, say_boo)) baa = asyncio.ensure_future(loop.run_in_executor(executor, say_baa)) loop.run_forever()
вам не обязательно нужен
yield from x
для управления циклом событий.в вашем примере, я думаю правильный как бы сделать
yield None
или простоyield
, а неyield from asyncio.sleep(0.001)
:import asyncio @asyncio.coroutine def say_boo(): i = 0 while True: yield None print("...boo {0}".format(i)) i += 1 @asyncio.coroutine def say_baa(): i = 0 while True: yield print("...baa {0}".format(i)) i += 1 boo_task = asyncio.async(say_boo()) baa_task = asyncio.async(say_baa()) loop = asyncio.get_event_loop() loop.run_forever()
сопрограммы - это просто старые генераторы Python. Внутри
asyncio
цикл событий сохраняет запись этих генераторов и вызововgen.send()
на каждой из них по одному в бесконечном цикле. Всякий раз, когда выyield
, в звоните вgen.send()
завершается и цикл может двигаться дальше. (Я упрощаю его; оглянитесь вокруг https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 для фактического кода)что сказал, Я бы все равно пошел
run_in_executor
маршрут, если вам нужно сделать интенсивные вычисления процессора без обмена данными.