Как я могу отправлять большие сообщения с Kafka (более 15 МБ)?


Я отправляю строковые сообщения в Kafka V. 0.8 с помощью API производителя Java. Если размер сообщения составляет около 15 мб, я получаю MessageSizeTooLargeException. Я попытался установить message.max.bytesдо 40 МБ, но я все равно получаю исключение. Небольшие сообщения работали без проблем.

(исключение появляется в производителе, у меня нет потребителя в этом приложении.)

что я могу сделать, чтобы избавиться от этого исключения?

мой пример производителя конфигурации

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Error-Log:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
5 66

5 ответов:

вам нужно настроить три (или четыре) свойства:

  • потребителя:fetch.message.max.bytes - это определит наибольший размер сообщения, которое может быть извлечено потребителем.
  • Брокер бок: replica.fetch.max.bytes - Это позволит репликам в брокерах отправлять сообщения в пределах кластера и убедиться, что сообщения реплицируются правильно. Если это слишком мало, то сообщение никогда не будет реплицировано, и поэтому потребитель никогда не увидит сообщение потому что сообщение никогда не будет зафиксировано (полностью реплицировано).
  • Брокер бок: message.max.bytes - Это самый большой размер сообщения, которое может быть получено брокером от производителя.
  • сторона брокера (по теме):max.message.bytes - Это самый большой размер сообщения, который брокер позволит добавить к теме. Этот размер проверяется перед сжатием. (По умолчанию брокер message.max.bytes.)

я узнал трудный путь о номере 2-вы не получите ни одного исключения, сообщения или предупреждения от Кафки, поэтому обязательно учитывайте это при отправке больших сообщений.

незначительные изменения, необходимые для Кафка 0.10 и новый потребитель по сравнению с laughing_man's answer:

  • Брокер: никаких изменений, вам все равно нужно увеличить свойства message.max.bytes и replica.fetch.max.bytes. message.max.bytes должно быть равно или меньше (*), чем replica.fetch.max.bytes.
  • Продюсер: Увеличение max.request.size для отправки большего сообщения.
  • Потребитель: Увеличение max.partition.fetch.bytes получать больше сообщения.

(*) читаю комментарии, чтобы узнать больше о message.max.bytesreplica.fetch.max.bytes

необходимо переопределить следующие свойства:

конфигурации брокера($KAFKA_HOME / config / server.свойства)

  • реплика.привести.максимум.байты
  • сообщение.максимум.байты

Consumer Configs($KAFKA_HOME/config / consumer.свойства)
этот шаг не работает для меня. Я добавляю его в потребительское приложение, и он работает штраф в размере

  • fetch.сообщение.максимум.байты

перезапустить сервер.

посмотрите на эту документацию для получения дополнительной информации: http://kafka.apache.org/08/configuration.html

идея состоит в том, чтобы иметь одинаковый размер сообщения, отправляемого от производителя Kafka брокеру Kafka, а затем получаемого потребителем Kafka, т. е.

производитель Кафки -- > Брокер Кафки -- > потребитель Кафки

предположим, что если требуется отправить 15 МБ сообщения, то производитель, Брокер и потребитель, все три, должны быть синхронизированы.

Кафка Продюсер отправляет 15 МБ -->Кафка Брокер позволяет / хранит 15 МБ -->Кафка Потребителя получает 15 МБ

поэтому настройка должна быть А.) На Брокера: сообщение.максимум.байт=15728640 копия.привести.максимум.байт=15728640

Б) На Потребителя: привести.сообщение.максимум.байт=15728640

одна ключевая вещь, чтобы помнить, что должен быть синхронизация с потребителем fetch.message.max.bytes собственность. размер выборки должен быть по крайней мере таким же большим, как максимальный размер сообщения, иначе может возникнуть ситуация, когда производители могут отправлять сообщения больше, чем потребитель может потреблять/извлекать. Возможно, стоит взглянуть на это.
Какую версию Кафки вы используете? Также предоставьте некоторые дополнительные сведения трассировки, которые вы получаете. есть ли что-то подобное ... payload size of xxxx larger than 1000000 поднимаемся в журнал?