Реализация и тестирование таймаута подключения WebSocket server


Я внедряю сервер WebSockets в Tornado 3.2. Клиент, подключающийся к серверу, не будет браузером.

Для случаев, когда между сервером и клиентом существует обратная связь, я хотел бы добавить max. время, в течение которого сервер будет ждать ответа клиента перед закрытием соединения.

Это примерно то, что я пытался сделать:

import datetime
import tornado

class WSHandler(WebSocketHandler):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = None

    def _close_on_timeout(self):
        if self.ws_connection:
            self.close()

    def open(self):
        initialize()

    def on_message(self, message):
        # Remove previous timeout, if one exists.
        if self.timeout:
            tornado.ioloop.IOLoop.instance().remove_timeout(self.timeout)
            self.timeout = None

        if is_last_message:
            self.write_message(message)
            self.close()
        else:
            # Add a new timeout.
            self.timeout = tornado.ioloop.IOLoop.instance().add_timeout(
                datetime.timedelta(milliseconds=1000), self._close_on_timeout)
            self.write_message(message)

Я что, недотепа и есть ли более простой способ сделать это? Я даже не могу, кажется, запланировать простой распечатайте инструкцию через add_timeout выше.

Мне также нужна помощь в тестировании этого. Вот что у меня есть до сих пор:

from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time

class WSTests(AsyncHTTPTestCase):

    @gen_test
    def test_long_response(self):
        ws = yield websocket_connect('ws://address', io_loop=self.io_loop)

        # First round trip.
        ws.write_message('First message.')
        result = yield ws.read_message()
        self.assertEqual(result, 'First response.')

        # Wait longer than the timeout.
        # The test is in its own IOLoop, so a blocking sleep should be okay?
        time.sleep(1.1)

        # Expect either write or read to fail because of a closed socket.
        ws.write_message('Second message.')
        result = yield ws.read_message()

        self.assertNotEqual(result, 'Second response.')

У клиента нет проблем с записью и чтением из сокета. Это, вероятно, потому, что add_timeout не срабатывает.

Должен ли тест каким-то образом уступить, чтобы разрешить обратный вызов таймаута на сервере? Я бы подумал, что нет, поскольку врачи говорят, что тесты выполняются в их собственном IOLoop.

Edit

Это и есть работа версия, согласно предложениям Бена.

import datetime
import tornado

class WSHandler(WebSocketHandler):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timeout = None

    def _close_on_timeout(self):
        if self.ws_connection:
            self.close()

    def open(self):
        initialize()

    def on_message(self, message):
        # Remove previous timeout, if one exists.
        if self.timeout:
            tornado.ioloop.IOLoop.current().remove_timeout(self.timeout)
            self.timeout = None

        if is_last_message:
            self.write_message(message)
            self.close()
        else:
            # Add a new timeout.
            self.timeout = tornado.ioloop.IOLoop.current().add_timeout(
                datetime.timedelta(milliseconds=1000), self._close_on_timeout)
            self.write_message(message)

Испытание:

from tornado.websocket import websocket_connect
from tornado.testing import AsyncHTTPTestCase, gen_test
import time

class WSTests(AsyncHTTPTestCase):

    @gen_test
    def test_long_response(self):
        ws = yield websocket_connect('ws://address', io_loop=self.io_loop)

        # First round trip.
        ws.write_message('First message.')
        result = yield ws.read_message()
        self.assertEqual(result, 'First response.')

        # Wait a little more than the timeout.
        yield gen.Task(self.io_loop.add_timeout, datetime.timedelta(seconds=1.1))

        # Expect either write or read to fail because of a closed socket.
        ws.write_message('Second message.')
        result = yield ws.read_message()
        self.assertEqual(result, None)
2 2

2 ответа:

Код обработки таймаута в вашем первом примере кажется мне правильным.

Для тестирования каждый тестовый случай получает свой собственный IOLoop, но существует только один IOLoop как для теста, так и для всего остального, что он выполняет, поэтому вы должны использовать add_timeout вместо time.sleep () здесь также, чтобы избежать блокировки сервера.

Эй Бен, я знаю, что этот вопрос был давно решен, но я хотел поделиться с любым пользователем, читающим это решение, которое я сделал для этого. Он в основном основан на Вашем, но он решает проблему с помощью внешнего сервиса, который может быть легко интегрирован в любой websocket, используя композицию вместо наследования:

class TimeoutWebSocketService():
    _default_timeout_delta_ms = 10 * 60 * 1000  # 10 min

    def __init__(self, websocket, ioloop=None, timeout=None):
        # Timeout
        self.ioloop = ioloop or tornado.ioloop.IOLoop.current()
        self.websocket = websocket
        self._timeout = None
        self._timeout_delta_ms = timeout or TimeoutWebSocketService._default_timeout_delta_ms

    def _close_on_timeout(self):
        self._timeout = None
        if self.websocket.ws_connection:
            self.websocket.close()

    def refresh_timeout(self, timeout=None):
        timeout = timeout or self._timeout_delta_ms
        if timeout > 0:
            # Clean last timeout, if one exists
            self.clean_timeout()

            # Add a new timeout (must be None from clean).
            self._timeout = self.ioloop.add_timeout(
                datetime.timedelta(milliseconds=timeout), self._close_on_timeout)

    def clean_timeout(self):
        if self._timeout is not None:
            # Remove previous timeout, if one exists.
            self.ioloop.remove_timeout(self._timeout)
            self._timeout = None

Для того, чтобы использовать сервис, это так же просто, как создать новый экземпляр TimeoutWebService (необязательно с таймаутом в ms, а также ioloop, где он должен быть выполняется) и вызовите метод "refresh_timeout", чтобы либо установить тайм-аут в первый раз, либо сбросить уже существующий тайм-аут, либо" clean_timeout", чтобы остановить службу тайм-аута.

class BaseWebSocketHandler(WebSocketHandler):
    def prepare(self):
        self.timeout_service = TimeoutWebSocketService(timeout=(1000*60))

        ## Optionally starts the service here 
        self.timeout_service.refresh_timeout()

        ## rest of prepare method 

    def on_message(self):
        self.timeout_service.refresh_timeout()

    def on_close(self):
        self.timeout_service.clean_timeout()
Благодаря этому подходу вы можете контролировать, когда именно и при каких условиях вы хотите перезапустить тайм-аут, который может отличаться от приложения к приложению. Например, вы можете обновить тайм-аут только в том случае, если пользователь выполняет условия X или если сообщение является ожидаемым.

Я надеюсь ppl наслаждайтесь этим решением !