Встроенный способ преобразования асинхронного итерационного списка в синхронный итерационный список


Python3. 6 теперь асинхронные итераторы . Существует ли встроенный способ преобразования асинхронной итерации в синхронную итерацию.

В настоящее время у меня есть эта вспомогательная функция, но она кажется очень непифонической. Есть ли лучший способ сделать это?

async def aiter_to_list(aiter):
    l = []
    async for i in aiter:
        l.append(i)
    return l
3 2

3 ответа:

Вы можете использовать aiostream.поток.список :

from aiostream import stream

async def agen():
    yield 1
    yield 2
    yield 3

async def main():
    lst = await stream.list(agen())
    print(lst)  # prints [1, 2, 3]

Дополнительные операторы и примеры в документации .

Ваш помощник "от асинхронного к синхронному" сам по себе асинхронен; это совсем не большое изменение. В общем: нет, вы не можете сделать что-то асинхронное синхронным. Асинхронное значение будет предоставлено "когда-нибудь позже"; вы не можете сделать это в "сейчас", потому что значение не существует "сейчас", и вам придется ждать его асинхронно.

Эти функции позволяют конвертировать из / в iterable async iterable, не просто списки.

Основной импорт

import asyncio
import threading
import time

DONE = object()
TIMEOUT = 0.001

Функция to_sync_iterable преобразует любую асинхронную итерацию в синхронизирующую итерацию:

def to_sync_iterable(async_iterable, maxsize = 0):

    def sync_iterable():

        queue = asyncio.Queue(maxsize=maxsize)
        loop = asyncio.get_event_loop()

        t = threading.Thread(target=_run_coroutine, args=(loop, async_iterable, queue))
        t.daemon = True
        t.start()

        while True:
            if not queue.empty():
                x = queue.get_nowait()

                if x is DONE:
                    break
                else:
                    yield x
            else:
                time.sleep(utils.TIMEOUT)

        t.join()

    return sync_iterable()

def _run_coroutine(loop, async_iterable, queue):

    loop.run_until_complete(_consume_async_iterable(async_iterable, queue))

async def _consume_async_iterable(async_iterable, queue):

    async for x in async_iterable:
        await queue.put(x)

    await queue.put(DONE)

Вы можете использовать его следующим образом:

async def slow_async_generator():
    yield 0

    await asyncio.sleep(1)
    yield 1

    await asyncio.sleep(1)
    yield 2

    await asyncio.sleep(1)
    yield 3


for x in to_sync_iterable(slow_async_generator()):
    print(x)

Функция to_async_iterable преобразует любую итерацию синхронизации в асинхронную итерацию:

def to_async_iterable(iterable, maxsize = 0):

    async def async_iterable():
        queue = asyncio.Queue(maxsize=maxsize)
        loop = asyncio.get_event_loop()
        task = loop.run_in_executor(None, lambda: _consume_iterable(loop, iterable, queue))

        while True:
            x = await queue.get()

            if x is DONE:
                break
            else:
                yield x

        await task


    return async_iterable()

def _consume_iterable(loop, iterable, queue):

    for x in iterable:
        while True:
            if not queue.full():
                loop.call_soon_threadsafe(queue.put_nowait, x)
                break
            else:
                time.sleep(TIMEOUT)

    while True:
        if not queue.full():
            loop.call_soon_threadsafe(queue.put_nowait, DONE)
            break
        else:
            time.sleep(TIMEOUT)

Это особенно полезно для программ asyncio, потому что он не будет блокировать цикл событий, даже если итерационные блоки синхронизации. Вы можете использовать его следующим образом:

def slow_sync_generator():
    yield 0

    time.sleep(1)
    yield 1

    time.sleep(1)
    yield 2

    time.sleep(1)
    yield 3

async def async_task():
    async for x in to_async_iterable(slow_sync_generator()):
        print(x)

asyncio.get_event_loop().run_until_complete(async_task())