Многопоточный asyncio в Python
В настоящее время я делаю свои первые шаги с asyncio в Python 3.5, и есть одна проблема, которая меня беспокоит. Очевидно, я не до конца понял соратников...
Вот упрощенная версия того, что я делаю.В моем классе есть метод open (), который создает новый поток. В этом потоке я создаю новый цикл событий и соединение сокета с некоторым хостом. Затем я позволил петле бежать вечно.
def open(self):
# create thread
self.thread = threading.Thread(target=self._thread)
self.thread.start()
# wait for connection
while self.protocol is None:
time.sleep(0.1)
def _thread(self):
# create loop, connection and run forever
self.loop = asyncio.new_event_loop()
coro = self.loop.create_connection(lambda: MyProtocol(self.loop),
'somehost.com', 1234)
self.loop.run_until_complete(coro)
self.loop.run_forever()
Остановить соединение теперь довольно просто, я просто останавливаю цикл из основного потока:
loop.call_soon_threadsafe(loop.stop)
К сожалению, мне нужно сделать некоторую очистку, особенно мне нужно очистить очередь перед отключением от сервера. Поэтому я попробовал что-то вроде этого метода stop() в MyProtocol:
class MyProtocol(asyncio.Protocol):
def __init__(self, loop):
self._loop = loop
self._queue = []
async def stop(self):
# wait for all queues to empty
while self._queue:
await asyncio.sleep(0.1)
# disconnect
self.close()
self._loop.stop()
Очередь опустошается из метода data_received () протокола, поэтому я просто хочу подождать, пока это произойдет, используя цикл while с asyncio.sleep() вызов. После этого я закрываю соединение и останавливаю цикл.
Но как я могу вызвать этот метод из главная нить и ждать ее? Я попробовал следующее, Но ни один из них, кажется, не работает (протокол в настоящее время используется экземпляр MyProtocol):
loop.call_soon_threadsafe(protocol.stop)
loop.call_soon_threadsafe(functools.partial(asyncio.ensure_future, protocol.stop(), loop=loop))
asyncio.ensure_future(protocol.stop(), loop=loop)
Может ли кто-нибудь помочь мне здесь? Спасибо!1 ответ:
В основном вы хотите запланировать сопрограмму на петле другого потока. Вы могли бы использовать
run_coroutine_threadsafe
:future = asyncio.run_coroutine_threadsafe(protocol.stop, loop=loop) future.result() # wait for results
Или старый стиль
async
как в https://stackoverflow.com/a/32084907/681044import asyncio from threading import Thread loop = asyncio.new_event_loop() def f(loop): asyncio.set_event_loop(loop) loop.run_forever() t = Thread(target=f, args=(loop,)) t.start() @asyncio.coroutine def g(): yield from asyncio.sleep(1) print('Hello, world!') loop.call_soon_threadsafe(asyncio.async, g())