Как я могу отправлять большие сообщения с Kafka (более 15 МБ)?
Я отправляю строковые сообщения в Kafka V. 0.8 с помощью API производителя Java.
Если размер сообщения составляет около 15 мб, я получаю MessageSizeTooLargeException.
Я попытался установить message.max.bytesдо 40 МБ, но я все равно получаю исключение. Небольшие сообщения работали без проблем.
(исключение появляется в производителе, у меня нет потребителя в этом приложении.)
что я могу сделать, чтобы избавиться от этого исключения?
мой пример производителя конфигурации
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Error-Log:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
5 ответов:
вам нужно настроить три (или четыре) свойства:
- потребителя:
fetch.message.max.bytes- это определит наибольший размер сообщения, которое может быть извлечено потребителем.- Брокер бок:
replica.fetch.max.bytes- Это позволит репликам в брокерах отправлять сообщения в пределах кластера и убедиться, что сообщения реплицируются правильно. Если это слишком мало, то сообщение никогда не будет реплицировано, и поэтому потребитель никогда не увидит сообщение потому что сообщение никогда не будет зафиксировано (полностью реплицировано).- Брокер бок:
message.max.bytes- Это самый большой размер сообщения, которое может быть получено брокером от производителя.- сторона брокера (по теме):
max.message.bytes- Это самый большой размер сообщения, который брокер позволит добавить к теме. Этот размер проверяется перед сжатием. (По умолчанию брокерmessage.max.bytes.)я узнал трудный путь о номере 2-вы не получите ни одного исключения, сообщения или предупреждения от Кафки, поэтому обязательно учитывайте это при отправке больших сообщений.
незначительные изменения, необходимые для Кафка 0.10 и новый потребитель по сравнению с laughing_man's answer:
- Брокер: никаких изменений, вам все равно нужно увеличить свойства
message.max.bytesиreplica.fetch.max.bytes.message.max.bytesдолжно быть равно или меньше (*), чемreplica.fetch.max.bytes.- Продюсер: Увеличение
max.request.sizeдля отправки большего сообщения.- Потребитель: Увеличение
max.partition.fetch.bytesполучать больше сообщения.(*) читаю комментарии, чтобы узнать больше о
message.max.bytesreplica.fetch.max.bytes
необходимо переопределить следующие свойства:
конфигурации брокера($KAFKA_HOME / config / server.свойства)
- реплика.привести.максимум.байты
- сообщение.максимум.байты
Consumer Configs($KAFKA_HOME/config / consumer.свойства)
этот шаг не работает для меня. Я добавляю его в потребительское приложение, и он работает штраф в размере
- fetch.сообщение.максимум.байты
перезапустить сервер.
посмотрите на эту документацию для получения дополнительной информации: http://kafka.apache.org/08/configuration.html
идея состоит в том, чтобы иметь одинаковый размер сообщения, отправляемого от производителя Kafka брокеру Kafka, а затем получаемого потребителем Kafka, т. е.
производитель Кафки -- > Брокер Кафки -- > потребитель Кафки
предположим, что если требуется отправить 15 МБ сообщения, то производитель, Брокер и потребитель, все три, должны быть синхронизированы.
Кафка Продюсер отправляет 15 МБ -->Кафка Брокер позволяет / хранит 15 МБ -->Кафка Потребителя получает 15 МБ
поэтому настройка должна быть А.) На Брокера: сообщение.максимум.байт=15728640 копия.привести.максимум.байт=15728640
Б) На Потребителя: привести.сообщение.максимум.байт=15728640
одна ключевая вещь, чтобы помнить, что должен быть синхронизация с потребителем
fetch.message.max.bytesсобственность. размер выборки должен быть по крайней мере таким же большим, как максимальный размер сообщения, иначе может возникнуть ситуация, когда производители могут отправлять сообщения больше, чем потребитель может потреблять/извлекать. Возможно, стоит взглянуть на это.
Какую версию Кафки вы используете? Также предоставьте некоторые дополнительные сведения трассировки, которые вы получаете. есть ли что-то подобное ...payload size of xxxx larger than 1000000поднимаемся в журнал?