Реализация и тестирование таймаута подключения WebSocket server
Я внедряю сервер WebSockets в Tornado 3.2. Клиент, подключающийся к серверу, не будет браузером.
Для случаев, когда между сервером и клиентом существует обратная связь, я хотел бы добавить max. время, в течение которого сервер будет ждать ответа клиента перед закрытием соединения.
Это примерно то, что я пытался сделать:
import datetime
import tornado
class WSHandler(WebSocketHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.timeout = None
def _close_on_timeout(self):
if self.ws_connection:
self.close()
def open(self):
initialize()
def on_message(self, message):
# Remove previous timeout, if one exists.
if self.timeout:
tornado.ioloop.IOLoop.instance().remove_timeout(self.timeout)
self.timeout = None
if is_last_message:
self.write_message(message)
self.close()
else:
# Add a new timeout.
self.timeout = tornado.ioloop.IOLoop.instance().add_timeout(
datetime.timedelta(milliseconds=1000), self._close_on_timeout)
self.write_message(message)
Я что, недотепа и есть ли более простой способ сделать это? Я даже не могу, кажется, запланировать простой распечатайте инструкцию через add_timeout выше.
Мне также нужна помощь в тестировании этого. Вот что у меня есть до сих пор:
from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time
class WSTests(AsyncHTTPTestCase):
@gen_test
def test_long_response(self):
ws = yield websocket_connect('ws://address', io_loop=self.io_loop)
# First round trip.
ws.write_message('First message.')
result = yield ws.read_message()
self.assertEqual(result, 'First response.')
# Wait longer than the timeout.
# The test is in its own IOLoop, so a blocking sleep should be okay?
time.sleep(1.1)
# Expect either write or read to fail because of a closed socket.
ws.write_message('Second message.')
result = yield ws.read_message()
self.assertNotEqual(result, 'Second response.')
У клиента нет проблем с записью и чтением из сокета. Это, вероятно, потому, что add_timeout не срабатывает.
Должен ли тест каким-то образом уступить, чтобы разрешить обратный вызов таймаута на сервере? Я бы подумал, что нет, поскольку врачи говорят, что тесты выполняются в их собственном IOLoop.
Edit
Это и есть работа версия, согласно предложениям Бена.
import datetime
import tornado
class WSHandler(WebSocketHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.timeout = None
def _close_on_timeout(self):
if self.ws_connection:
self.close()
def open(self):
initialize()
def on_message(self, message):
# Remove previous timeout, if one exists.
if self.timeout:
tornado.ioloop.IOLoop.current().remove_timeout(self.timeout)
self.timeout = None
if is_last_message:
self.write_message(message)
self.close()
else:
# Add a new timeout.
self.timeout = tornado.ioloop.IOLoop.current().add_timeout(
datetime.timedelta(milliseconds=1000), self._close_on_timeout)
self.write_message(message)
Испытание:
from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time
class WSTests(AsyncHTTPTestCase):
@gen_test
def test_long_response(self):
ws = yield websocket_connect('ws://address', io_loop=self.io_loop)
# First round trip.
ws.write_message('First message.')
result = yield ws.read_message()
self.assertEqual(result, 'First response.')
# Wait a little more than the timeout.
yield gen.Task(self.io_loop.add_timeout, datetime.timedelta(seconds=1.1))
# Expect either write or read to fail because of a closed socket.
ws.write_message('Second message.')
result = yield ws.read_message()
self.assertEqual(result, None)
2 ответа:
Код обработки таймаута в вашем первом примере кажется мне правильным.
Для тестирования каждый тестовый случай получает свой собственный IOLoop, но существует только один IOLoop как для теста, так и для всего остального, что он выполняет, поэтому вы должны использовать add_timeout вместо time.sleep () здесь также, чтобы избежать блокировки сервера.
Эй Бен, я знаю, что этот вопрос был давно решен, но я хотел поделиться с любым пользователем, читающим это решение, которое я сделал для этого. Он в основном основан на Вашем, но он решает проблему с помощью внешнего сервиса, который может быть легко интегрирован в любой websocket, используя композицию вместо наследования:
class TimeoutWebSocketService(): _default_timeout_delta_ms = 10 * 60 * 1000 # 10 min def __init__(self, websocket, ioloop=None, timeout=None): # Timeout self.ioloop = ioloop or tornado.ioloop.IOLoop.current() self.websocket = websocket self._timeout = None self._timeout_delta_ms = timeout or TimeoutWebSocketService._default_timeout_delta_ms def _close_on_timeout(self): self._timeout = None if self.websocket.ws_connection: self.websocket.close() def refresh_timeout(self, timeout=None): timeout = timeout or self._timeout_delta_ms if timeout > 0: # Clean last timeout, if one exists self.clean_timeout() # Add a new timeout (must be None from clean). self._timeout = self.ioloop.add_timeout( datetime.timedelta(milliseconds=timeout), self._close_on_timeout) def clean_timeout(self): if self._timeout is not None: # Remove previous timeout, if one exists. self.ioloop.remove_timeout(self._timeout) self._timeout = None
Для того, чтобы использовать сервис, это так же просто, как создать новый экземпляр TimeoutWebService (необязательно с таймаутом в ms, а также ioloop, где он должен быть выполняется) и вызовите метод "refresh_timeout", чтобы либо установить тайм-аут в первый раз, либо сбросить уже существующий тайм-аут, либо" clean_timeout", чтобы остановить службу тайм-аута.
Благодаря этому подходу вы можете контролировать, когда именно и при каких условиях вы хотите перезапустить тайм-аут, который может отличаться от приложения к приложению. Например, вы можете обновить тайм-аут только в том случае, если пользователь выполняет условия X или если сообщение является ожидаемым.class BaseWebSocketHandler(WebSocketHandler): def prepare(self): self.timeout_service = TimeoutWebSocketService(timeout=(1000*60)) ## Optionally starts the service here self.timeout_service.refresh_timeout() ## rest of prepare method def on_message(self): self.timeout_service.refresh_timeout() def on_close(self): self.timeout_service.clean_timeout()
Я надеюсь ppl наслаждайтесь этим решением !