Весенний облачный поток с кроличьей связкой-имена очередей источник / приемник не совпадают


Недавно я начал играть с Spring Cloud Stream и RabbitMQ binder.

Если я все правильно понял, когда две службы хотят передать сообщение, одна должна настроить Источник для отправки сообщений, а другая должна настроить приемник для приема сообщений - оба должны использовать один и тот же канал.

У меня есть канал С именем testchannel. Однако я заметил, что source создал RabbitMQ привязка:

  • обмен testchannel,
  • ключ маршрутизации testchannel,
  • очередь testchannel.default (долговечная),

While sink создал привязку RabbitMQ:

  • обмен testchannel,
  • ключ маршрутизации #,
  • очередь testchannel.anonymous.RANDOM_ID (excusive).

Я пропустил префикс, для краткости.

Теперь, когда я запустил оба приложения. Сначала он отправляет сообщение в testchannel exchange, которое затем направляется в обе очереди (я предполагаю, что ключ маршрутизации - testchannel). Второе приложение использует сообщение из случайной очереди, но сообщение из очереди по умолчанию никогда не используется.

Моя другая проблема - 2-е приложение использует только приемник, но он также создает привязку для выходного канала, который является output по умолчанию, потому что я ничего не указал.

Я строю оба приложения с одним и тем же скриптом Gradle:

buildscript {
    ext {
        springBootVersion = '1.3.2.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'spring-boot'

repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/snapshot' }
    maven { url 'https://repo.spring.io/milestone' }
}

dependencies {
    compile(
            'org.springframework.cloud:spring-cloud-starter-stream-rabbit',
    )
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:Brixton.BUILD-SNAPSHOT"
    }
}

Первые свойства приложения:

server.port=8010
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=start
spring.cloud.stream.bindings.output=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw

Исходный код приложения Fisrt:

@EnableBinding(Processor.class)
...
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public byte[] handleIncomingMessage(byte[] payload) {}

Второе приложение свойства:

server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw

Второй исходный код Приложения:

@EnableBinding(Sink.class)
...
@ServiceActivator(inputChannel = Sink.INPUT)
public void handleIncomingMessage(byte[] payload) {}
Таковы мои вопросы.
  • Не Должны ли Источник и приемник использовать один и тот же канал и, как результат, одну и ту же очередь брокера? Что такое правильная конфигурация для достижения этого? (Моя цель - иметь несколько экземпляров службыприемника , но только один должен использовать сообщение.)
  • должен ли фреймворк создавать привязку вывода, когда я использую только приемник? Если да, то как отключить оно.
1 3

1 ответ:

По умолчанию; каждый потребитель получает свою собственную очередь; это сценарий публикации / подписки.

Существует понятие потребителя group, поэтому вы можете иметь несколько экземпляров, конкурирующих за сообщения из одной очереди.

При привязке производителя привязывается очередь по умолчанию.

Если вы хотите подписаться на группу default, Вы должны установить группу:

spring.cloud.stream.bindings.input.group=default

Если вы не предоставляете группу,вы получаете эксклюзивную очередь автоматического удаления.

EDIT

С тех пор очередь по умолчанию долговечна, вы также должны установить

spring.cloud.stream.bindings.input.durableSubscription=true

Чтобы избежать предупреждения, когда потребитель связывается, и чтобы убедиться, что очередь долговечна, если потребитель связывается первым, а очередь еще не существует.