Встроенный способ преобразования асинхронного итерационного списка в синхронный итерационный список
Python3. 6 теперь асинхронные итераторы . Существует ли встроенный способ преобразования асинхронной итерации в синхронную итерацию.
В настоящее время у меня есть эта вспомогательная функция, но она кажется очень непифонической. Есть ли лучший способ сделать это?
async def aiter_to_list(aiter):
l = []
async for i in aiter:
l.append(i)
return l
3 ответа:
Вы можете использовать aiostream.поток.список :
from aiostream import stream async def agen(): yield 1 yield 2 yield 3 async def main(): lst = await stream.list(agen()) print(lst) # prints [1, 2, 3]
Ваш помощник "от асинхронного к синхронному" сам по себе асинхронен; это совсем не большое изменение. В общем: нет, вы не можете сделать что-то асинхронное синхронным. Асинхронное значение будет предоставлено "когда-нибудь позже"; вы не можете сделать это в "сейчас", потому что значение не существует "сейчас", и вам придется ждать его асинхронно.
Эти функции позволяют конвертировать из / в
iterable
async iterable
, не просто списки.Основной импорт
import asyncio import threading import time DONE = object() TIMEOUT = 0.001
Функция
to_sync_iterable
преобразует любую асинхронную итерацию в синхронизирующую итерацию:def to_sync_iterable(async_iterable, maxsize = 0): def sync_iterable(): queue = asyncio.Queue(maxsize=maxsize) loop = asyncio.get_event_loop() t = threading.Thread(target=_run_coroutine, args=(loop, async_iterable, queue)) t.daemon = True t.start() while True: if not queue.empty(): x = queue.get_nowait() if x is DONE: break else: yield x else: time.sleep(utils.TIMEOUT) t.join() return sync_iterable() def _run_coroutine(loop, async_iterable, queue): loop.run_until_complete(_consume_async_iterable(async_iterable, queue)) async def _consume_async_iterable(async_iterable, queue): async for x in async_iterable: await queue.put(x) await queue.put(DONE)
Вы можете использовать его следующим образом:
async def slow_async_generator(): yield 0 await asyncio.sleep(1) yield 1 await asyncio.sleep(1) yield 2 await asyncio.sleep(1) yield 3 for x in to_sync_iterable(slow_async_generator()): print(x)
Функция to_async_iterable преобразует любую итерацию синхронизации в асинхронную итерацию:
def to_async_iterable(iterable, maxsize = 0): async def async_iterable(): queue = asyncio.Queue(maxsize=maxsize) loop = asyncio.get_event_loop() task = loop.run_in_executor(None, lambda: _consume_iterable(loop, iterable, queue)) while True: x = await queue.get() if x is DONE: break else: yield x await task return async_iterable() def _consume_iterable(loop, iterable, queue): for x in iterable: while True: if not queue.full(): loop.call_soon_threadsafe(queue.put_nowait, x) break else: time.sleep(TIMEOUT) while True: if not queue.full(): loop.call_soon_threadsafe(queue.put_nowait, DONE) break else: time.sleep(TIMEOUT)
Это особенно полезно для программ asyncio, потому что он не будет блокировать цикл событий, даже если итерационные блоки синхронизации. Вы можете использовать его следующим образом:
def slow_sync_generator(): yield 0 time.sleep(1) yield 1 time.sleep(1) yield 2 time.sleep(1) yield 3 async def async_task(): async for x in to_async_iterable(slow_sync_generator()): print(x) asyncio.get_event_loop().run_until_complete(async_task())