Как получить данные из старой точки смещения в Кафке?


Я использую zookeeper, чтобы получить данные от Кафки. И здесь я всегда получаю данные из последней точки смещения. Есть ли способ указать время смещения для получения старых данных?

Есть один вариант autooffset.сброс. Он принимает самые маленькие или самые большие. Может кто-нибудь объяснить, что такое самый маленький и самый большой? Может autooffset.сброс помогает в получении данных из старой точки смещения вместо последней точки смещения?

7 33

7 ответов:

Потребители всегда принадлежат к группе, и для каждого раздела смотритель зоопарка отслеживает прогресс этой группы потребителей в разделе.

Чтобы извлечь с самого начала, вы можете удалить все данные, связанные с прогрессом, как упоминал Хусейн

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");

Вы также можете указать смещение раздела, которое вы хотите, как указано в core / src/main/scala/kafka/tools / UpdateOffsetsInZK.scala

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)

Однако смещение не индексируется по времени, но вы знаете для каждого разбиение-это последовательность.

Если ваше сообщение содержит метку времени (и помните, что эта метка времени не имеет ничего общего с моментом, когда Кафка получил ваше сообщение), вы можете попробовать сделать индексатор, который пытается извлечь одну запись шагами, увеличивая смещение на N, и сохранить кортеж (тема X, часть 2, смещение 100, метка времени) где-нибудь.

Если вы хотите получить записи из указанного момента времени, вы можете применить двоичный поиск к вашему грубому индексу, пока не найдете запись, которую вы хотите, и принести оттуда.

Из документации Кафки они говорят: "Кафка.прикладной программный интерфейс.OffsetRequest.Earlieresttime() находит начало данных в журналах и начинает потоковую передачу оттуда, Кафка.прикладной программный интерфейс.OffsetRequest.LatestTime() будет передавать только новые сообщения. Не думайте, что смещение 0 является начальным смещением, так как сообщения устаревают из журнала с течением времени. "

Используйте SimpleConsumerExample здесь: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0 + SimpleConsumer+Пример

Аналогично Вопрос: Kafka High Level Consumer Fetch All Messages From Topic Using Java API (эквивалент --from-beginning)

Это может помочь

См. ДОК о конфигурации Кафки: http://kafka.apache.org/08/configuration.html для вашего запроса на наименьшее и наибольшее значения параметра offset.

Кстати, когда я изучал Кафку, мне было интересно, как воспроизвести все сообщения для потребителя. Я имею в виду, если потребительская группа опросила все сообщения и хочет их повторно получить.

Это можно сделать, удалив данные из zookeeper. Используйте Кафку.utils.Класс ZkUtils для удаления узла в zookeeper. Ниже его использование:

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");

А Пока

Часто задаваемые вопросы Кафки дают ответ на эту проблему.

Как я могу точно получить смещения сообщений для определенной метки времени с помощью OffsetRequest?

Кафка позволяет запрашивать смещения сообщений по времени и делает это при сегментной детализации. Параметр timestamp является меткой времени unix, и запрос смещения по метке времени возвращает последнее возможное смещение сообщения, которое добавляется не позднее указанной метки времени. Есть 2 специальных значения метки времени-самая последняя и самая ранняя. Для любого другого значения метки времени unix Kafka получит начальное смещение сегмента журнала, созданного не позднее указанной метки времени. Из-за этого, а также из-за того, что запрос смещения подается только при детализации сегмента, запрос извлечения смещения возвращает менее точные результаты для больших размеров сегмента.

Для получения более точных результатов можно настроить размер сегмента журнала в зависимости от времени (log.roll.ms) вместо размера (бревно.сегмент.байты). Однако следует соблюдать осторожность, так как это может привести к увеличению числа обработчиков файлов из-за частого перемещения сегментов журнала.


План На Будущее

Кафка добавит метку времени в формат сообщения. См.

Https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

Kafka Protocol Doc-отличный источник для воспроизведения запросов / ответов / смещений / сообщений: https://cwiki.apache.org/confluence/display/KAFKA/A + руководство+к + протокол + Кафка вы используете простой потребительский пример, где следующий код демонстрирует состояние:

FetchRequest req = new FetchRequestBuilder()

        .clientId(clientName)

        .addFetch(a_topic, a_partition, readOffset, 100000) 

        .build();

FetchResponse fetchResponse = simpleConsumer.fetch(req);

Установите значение readOffset для начала начального смещения. но вам нужно проверить максимальное смещение, а также выше будет предоставлено ограниченное количество смещений в соответствии с FetchSize в последнем параме метода addFetch.

Используя KafkaConsumer, вы можете использовать Seek, SeekToBeginning и SeekToEnd для перемещения в потоке.

Https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(java.util.Collection)

Кроме того, если раздел не указан, он будет стремиться к первому смещению для всех назначенных в данный момент разделов.

Вы пробовали это?

Bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Он распечатает все сообщения для данной темы, "Тест" в этом примере.

Подробнее по этой ссылке https://kafka.apache.org/quickstart