Модульное тестирование с Django-сельдерей?
Я пытаюсь придумать методику тестирования для наших Джанго-сельдерей. Я прочитал заметки в документация, но это не дало мне хорошего представления о том, что на самом деле делать. Я не беспокоюсь о тестировании задач в реальных демонах, просто функциональность мой код. В основном мне интересно:
- как мы можем обойти
task.delay()
во время теста (я пробовал параметрCELERY_ALWAYS_EAGER = True
но это не разница)? - как мы используем рекомендуемые параметры тестирования (если это лучший способ) без фактического изменения наших settings.py?
- мы все еще можем использовать
manage.py test
или мы должны использовать пользовательский бегун?
в целом любые подсказки или советы для тестирования с сельдереем было бы очень полезно.
4 ответа:
попробовать настройка:
BROKER_BACKEND = 'memory'
(спасибо asksolкомментарий.)
Я хотел бы использовать override_settings декоратор на тесты, которые должны сельдерей результаты для завершения.
from django.test import TestCase from django.test.utils import override_settings from myapp.tasks import mytask class AddTestCase(TestCase): @override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, CELERY_ALWAYS_EAGER=True, BROKER_BACKEND='memory') def test_mytask(self): result = mytask.delay() self.assertTrue(result.successful())
Если вы хотите применить это ко всем тестам, вы можете использовать тестовый бегун сельдерея, как описано в http://docs.celeryproject.org/en/2.5/django/unit-testing.html который в основном устанавливает эти же настройки, за исключением (
BROKER_BACKEND = 'memory'
).настройки:
TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
посмотрите на источник для CeleryTestSuiteRunner, и это довольно ясно, что событие.
вот отрывок из моего базового класса тестирования, который заглушает
apply_async
метод и записи к вызовам к нему (который включаетTask.delay
.) Это немного грубо, но мне удалось удовлетворить мои потребности за последние несколько месяцев, когда я его использовал.from django.test import TestCase from celery.task.base import Task # For recent versions, Task has been moved to celery.task.app: # from celery.app.task import Task # See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html class CeleryTestCaseBase(TestCase): def setUp(self): super(CeleryTestCaseBase, self).setUp() self.applied_tasks = [] self.task_apply_async_orig = Task.apply_async @classmethod def new_apply_async(task_class, args=None, kwargs=None, **options): self.handle_apply_async(task_class, args, kwargs, **options) # monkey patch the regular apply_sync with our method Task.apply_async = new_apply_async def tearDown(self): super(CeleryTestCaseBase, self).tearDown() # Reset the monkey patch to the original method Task.apply_async = self.task_apply_async_orig def handle_apply_async(self, task_class, args=None, kwargs=None, **options): self.applied_tasks.append((task_class, tuple(args), kwargs)) def assert_task_sent(self, task_class, *args, **kwargs): was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2] for task in self.applied_tasks) self.assertTrue(was_sent, 'Task not called w/class %s and args %s' % (task_class, args)) def assert_task_not_sent(self, task_class): was_sent = any(task_class == task[0] for task in self.applied_tasks) self.assertFalse(was_sent, 'Task was not expected to be called, but was. Applied tasks: %s' % self.applied_tasks)
вот пример того, как вы будете использовать его в своих тестовых случаях:
mymodule.py
from my_tasks import SomeTask def run_some_task(should_run): if should_run: SomeTask.delay(1, some_kwarg=2)
test_mymodule.py
class RunSomeTaskTest(CeleryTestCaseBase): def test_should_run(self): run_some_task(should_run=True) self.assert_task_sent(SomeTask, 1, some_kwarg=2) def test_should_not_run(self): run_some_task(should_run=False) self.assert_task_not_sent(SomeTask)
поскольку я все еще вижу, что это появляется в результатах поиска, настройки переопределяются с помощью
TEST_RUNNER = 'djcelery.contrib.test_runner.CeleryTestSuiteRunner'
работал на меня, как на Сельдерей Docs