Маршрутизация задачи сельдерея в определенную очередь


У меня есть два отдельных процесса celeryd, запущенных на моем сервере, управляемых supervisor. Они настроены на прослушивание отдельных очередей как таковые:

[program:celeryd1]
command=/path/to/celeryd --pool=solo --queues=queue1
...

[program:celeryd2]
command=/path/to/celeryd --pool=solo --queues=queue2
...

И мой celeryconfig выглядит примерно так:

from celery.schedules import crontab

BROKER_URL = "amqp://guest:guest@localhost:5672//"

CELERY_DISABLE_RATE_LIMITS = True
CELERYD_CONCURRENCY = 1
CELERY_IGNORE_RESULT = True

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = {
    'default': {
        "exchange": "default",
        "binding_key": "default",
    },
    'queue1': {
        'exchange': 'queue1',
        'routing_key': 'queue1',
    },
    'queue2': {
        'exchange': 'queue2',
        'routing_key': 'queue2',
    },
}

CELERY_IMPORTS = ('tasks', )

CELERYBEAT_SCHEDULE = {
    'first-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_1'},
        'options': {'queue': 'queue1'},
    },
    'second-queue': {
        'task': 'tasks.sync',
        'schedule': crontab(hour=02, minute=00),
        'kwargs': {'client': 'client_2'},
        'options': {'queue': 'queue1'},
    },
}

Все задачи tasks.sync должны быть направлены в определенную очередь (и, следовательно, celeryd progress). Но когда я пытаюсь запустить задачу вручную с помощью sync.apply_async(kwargs={'client': 'value'}, queue='queue1'), оба работника сельдерея берут задачу. Как я могу сделать маршрут задачи к правильной очереди и выполнять его только тем работником, который привязан к очередь?

1 5

1 ответ:

Вы запускаете только один экземпляр celerybeat, верно?

Может быть, у вас есть старые привязки очереди, которые конфликтуют с этим? Попробуйте запустить rabbitmqctl list_queues и rabbitmqctl list_bindings, возможно, сбросить данные в брокере, чтобы начать с нуля.

Пример, который вы привели здесь, должен сработать, и он работает на меня, когда я только что попробовал его.

Совет: поскольку вы используете то же значение exchange и binding_key, что и имя очереди, вам не нужно явно перечислять их в CELERY_QUEUES. Когда CELERY_CREATE_MISSING_QUEUES является при включении (по умолчанию) очереди будут создаваться автоматически точно так же, как и у вас если вы просто делаете celeryd -Q queue1 или отправляете задачу в очередь, которая не определена.