Многопоточный asyncio в Python


В настоящее время я делаю свои первые шаги с asyncio в Python 3.5, и есть одна проблема, которая меня беспокоит. Очевидно, я не до конца понял соратников...

Вот упрощенная версия того, что я делаю.

В моем классе есть метод open (), который создает новый поток. В этом потоке я создаю новый цикл событий и соединение сокета с некоторым хостом. Затем я позволил петле бежать вечно.

def open(self):
    # create thread
    self.thread = threading.Thread(target=self._thread)
    self.thread.start()
    # wait for connection
    while self.protocol is None:
        time.sleep(0.1)

def _thread(self):
    # create loop, connection and run forever
    self.loop = asyncio.new_event_loop()
    coro = self.loop.create_connection(lambda: MyProtocol(self.loop),
                                       'somehost.com', 1234)
    self.loop.run_until_complete(coro)
    self.loop.run_forever()

Остановить соединение теперь довольно просто, я просто останавливаю цикл из основного потока:

loop.call_soon_threadsafe(loop.stop)

К сожалению, мне нужно сделать некоторую очистку, особенно мне нужно очистить очередь перед отключением от сервера. Поэтому я попробовал что-то вроде этого метода stop() в MyProtocol:

class MyProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self._loop = loop
        self._queue = []

    async def stop(self):
        # wait for all queues to empty
        while self._queue:
            await asyncio.sleep(0.1)
        # disconnect
        self.close()
        self._loop.stop()

Очередь опустошается из метода data_received () протокола, поэтому я просто хочу подождать, пока это произойдет, используя цикл while с asyncio.sleep() вызов. После этого я закрываю соединение и останавливаю цикл.

Но как я могу вызвать этот метод из главная нить и ждать ее? Я попробовал следующее, Но ни один из них, кажется, не работает (протокол в настоящее время используется экземпляр MyProtocol):

loop.call_soon_threadsafe(protocol.stop)
loop.call_soon_threadsafe(functools.partial(asyncio.ensure_future, protocol.stop(), loop=loop))
asyncio.ensure_future(protocol.stop(), loop=loop)
Может ли кто-нибудь помочь мне здесь? Спасибо!
1 3

1 ответ:

В основном вы хотите запланировать сопрограмму на петле другого потока. Вы могли бы использовать run_coroutine_threadsafe:

future = asyncio.run_coroutine_threadsafe(protocol.stop, loop=loop)
future.result()  # wait for results

Или старый стиль async как в https://stackoverflow.com/a/32084907/681044

import asyncio
from threading import Thread

loop = asyncio.new_event_loop()

def f(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

t = Thread(target=f, args=(loop,))
t.start()    

@asyncio.coroutine
def g():
    yield from asyncio.sleep(1)
    print('Hello, world!')

loop.call_soon_threadsafe(asyncio.async, g())