Весенний облачный поток с кроличьей связкой-имена очередей источник / приемник не совпадают
Недавно я начал играть с Spring Cloud Stream и RabbitMQ binder.
Если я все правильно понял, когда две службы хотят передать сообщение, одна должна настроить Источник для отправки сообщений, а другая должна настроить приемник для приема сообщений - оба должны использовать один и тот же канал.
У меня есть канал С именем testchannel
. Однако я заметил, что source создал RabbitMQ привязка:
- обмен
testchannel
, - ключ маршрутизации
testchannel
, - очередь
testchannel.default
(долговечная),
While sink создал привязку RabbitMQ:
- обмен
testchannel
, - ключ маршрутизации
#
, - очередь
testchannel.anonymous.RANDOM_ID
(excusive).
Я пропустил префикс, для краткости.
Теперь, когда я запустил оба приложения. Сначала он отправляет сообщение в testchannel
exchange, которое затем направляется в обе очереди (я предполагаю, что ключ маршрутизации - testchannel
). Второе приложение использует сообщение из случайной очереди, но сообщение из очереди по умолчанию никогда не используется.
Моя другая проблема - 2-е приложение использует только приемник, но он также создает привязку для выходного канала, который является output
по умолчанию, потому что я ничего не указал.
Я строю оба приложения с одним и тем же скриптом Gradle:
buildscript {
ext {
springBootVersion = '1.3.2.RELEASE'
}
repositories {
mavenCentral()
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
apply plugin: 'java'
apply plugin: 'spring-boot'
repositories {
mavenCentral()
maven { url 'https://repo.spring.io/snapshot' }
maven { url 'https://repo.spring.io/milestone' }
}
dependencies {
compile(
'org.springframework.cloud:spring-cloud-starter-stream-rabbit',
)
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Brixton.BUILD-SNAPSHOT"
}
}
Первые свойства приложения:
server.port=8010
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=start
spring.cloud.stream.bindings.output=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
Исходный код приложения Fisrt:
@EnableBinding(Processor.class)
...
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public byte[] handleIncomingMessage(byte[] payload) {}
Второе приложение свойства:
server.port=8011
spring.cloud.stream.binder.rabbit.default.prefix=z.
spring.cloud.stream.bindings.input=testchannel
spring.rabbitmq.addresses=host1:5672,host2:5672
spring.rabbitmq.username=user
spring.rabbitmq.password=psw
Второй исходный код Приложения:
@EnableBinding(Sink.class)
...
@ServiceActivator(inputChannel = Sink.INPUT)
public void handleIncomingMessage(byte[] payload) {}
Таковы мои вопросы.
- Не Должны ли Источник и приемник использовать один и тот же канал и, как результат, одну и ту же очередь брокера? Что такое правильная конфигурация для достижения этого? (Моя цель - иметь несколько экземпляров службыприемника , но только один должен использовать сообщение.)
- должен ли фреймворк создавать привязку вывода, когда я использую только приемник? Если да, то как отключить оно.
1 ответ:
По умолчанию; каждый потребитель получает свою собственную очередь; это сценарий публикации / подписки.
Существует понятие потребителя
group
, поэтому вы можете иметь несколько экземпляров, конкурирующих за сообщения из одной очереди.При привязке производителя привязывается очередь по умолчанию.
Если вы хотите подписаться на группу
default
, Вы должны установить группу:spring.cloud.stream.bindings.input.group=default
Если вы не предоставляете группу,вы получаете эксклюзивную очередь автоматического удаления.
EDIT
С тех пор очередь по умолчанию долговечна, вы также должны установить
spring.cloud.stream.bindings.input.durableSubscription=true
Чтобы избежать предупреждения, когда потребитель связывается, и чтобы убедиться, что очередь долговечна, если потребитель связывается первым, а очередь еще не существует.