Потребитель Kafka восстанавливается после неудачной обработки сообщений


Я работаю с простым потребителем Кафки в одном из моих проектов, и моя желаемая логика заключается в том, что когда потребитель не смог обработать какое-то сообщение, он зафиксирует последнее правильно обработанное сообщение, а затем при следующем опросе он продолжит работу с неудачным сообщением.

Я попытался зафиксировать каждое сообщение вручную со следующим кодом:

public void fetchMessages() {
  ConsumerRecords<String, MyObject> messages = kafkaConsumer.poll(10000);
  for (ConsumerRecord message : messages) {
      logger.info("Reading kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], offset ["+message.offset()+"]");
      try {
          MyObject myObject = (MyObject) message.value();
          logger.info("Handling message," + myObject);
          handleMessage(myObject);
          commitMessage(message);
      } catch (Exception e) {
          logger.error("Error handling message");              throw e;
      }
  }
}


private void commitMessage(ConsumerRecord message) {
        long              nextOffset        = message.offset() + 1;

        TopicPartition    topicPartition    = new TopicPartition(kafkaTopic,message.partition());
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(nextOffset);

        Map<TopicPartition,OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
        offsetAndMetadataMap.put(topicPartition,offsetAndMetadata);

        logger.info("Commiting processed kafka message, topic ["+kafkaTopic+"], partition ["+message.partition()+"], next offset ["+nextOffset+"]");
        kafkaConsumer.commitSync(offsetAndMetadataMap);
}

Но, например, когда я получаю 3 сообщения, каждое из которых из разных разделов, я успешно обрабатываю первое, а затем не могу обработать второе сообщение, я просто выйдите из ConsumerRecords для цикла, и я ожидаю получить те же 2 сообщения, которые я еще не зафиксировал в следующей итерации poll. Вместо этого потребитель просто продолжает получать новые сообщения и никогда не возвращается к неудачным сообщениям.

Также пытался применить seek к неудачному сообщению, а затем выйти из цикла, но он работает на 1 разделе и не работает на многих.

kafkaConsumer.seek(new TopicPartition(kafkaTopic,message.partition()),message.offset());    

Некоторые детали:

  • тема имеет 12 разделов
  • один потребитель для всех разделов
  • потребитель выполняет цикл опроса один в минуту
  • включить.авто.фиксация: false
Что не так с моим кодом или с моей логикой?
1 2

1 ответ:

Я нашел, как работает seek, и при неудачном сообщении я должен искать все смещения для всех разделов текущего потребителя.

private void seekAllPartitions() {
    logger.info("Processing of some kafka message was failed, seeking all partitions to last committed");
    List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(kafkaTopic);
    for (PartitionInfo partitionInfo : partitionInfos) {
        TopicPartition topicPartition = new TopicPartition(kafkaTopic, partitionInfo.partition());
        OffsetAndMetadata committedForPartition = kafkaConsumer.committed(topicPartition);
        if (committedForPartition != null) {
            kafkaConsumer.seek(topicPartition,committedForPartition.offset());
        }
    }
}

Проверка Null для committedForPartition необходима, когда последнее смещение некоторой группы потребителей на некотором разделе еще не установлено (неизвестно)