Как получить данные из старой точки смещения в Кафке?
Я использую zookeeper, чтобы получить данные от Кафки. И здесь я всегда получаю данные из последней точки смещения. Есть ли способ указать время смещения для получения старых данных?
Есть один вариант autooffset.сброс. Он принимает самые маленькие или самые большие. Может кто-нибудь объяснить, что такое самый маленький и самый большой? Может autooffset.сброс помогает в получении данных из старой точки смещения вместо последней точки смещения?
7 ответов:
Потребители всегда принадлежат к группе, и для каждого раздела смотритель зоопарка отслеживает прогресс этой группы потребителей в разделе.
Чтобы извлечь с самого начала, вы можете удалить все данные, связанные с прогрессом, как упоминал Хусейн
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
Вы также можете указать смещение раздела, которое вы хотите, как указано в core / src/main/scala/kafka/tools / UpdateOffsetsInZK.scala
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
Однако смещение не индексируется по времени, но вы знаете для каждого разбиение-это последовательность.
Если ваше сообщение содержит метку времени (и помните, что эта метка времени не имеет ничего общего с моментом, когда Кафка получил ваше сообщение), вы можете попробовать сделать индексатор, который пытается извлечь одну запись шагами, увеличивая смещение на N, и сохранить кортеж (тема X, часть 2, смещение 100, метка времени) где-нибудь.
Если вы хотите получить записи из указанного момента времени, вы можете применить двоичный поиск к вашему грубому индексу, пока не найдете запись, которую вы хотите, и принести оттуда.
Из документации Кафки они говорят: "Кафка.прикладной программный интерфейс.OffsetRequest.Earlieresttime() находит начало данных в журналах и начинает потоковую передачу оттуда, Кафка.прикладной программный интерфейс.OffsetRequest.LatestTime() будет передавать только новые сообщения. Не думайте, что смещение 0 является начальным смещением, так как сообщения устаревают из журнала с течением времени. "
Используйте SimpleConsumerExample здесь: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0 + SimpleConsumer+Пример
Аналогично Вопрос: Kafka High Level Consumer Fetch All Messages From Topic Using Java API (эквивалент --from-beginning)
Это может помочь
См. ДОК о конфигурации Кафки: http://kafka.apache.org/08/configuration.html для вашего запроса на наименьшее и наибольшее значения параметра offset.
Кстати, когда я изучал Кафку, мне было интересно, как воспроизвести все сообщения для потребителя. Я имею в виду, если потребительская группа опросила все сообщения и хочет их повторно получить.Это можно сделать, удалив данные из zookeeper. Используйте Кафку.utils.Класс ZkUtils для удаления узла в zookeeper. Ниже его использование:
ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
А Пока
Часто задаваемые вопросы Кафки дают ответ на эту проблему.
Как я могу точно получить смещения сообщений для определенной метки времени с помощью OffsetRequest?
Кафка позволяет запрашивать смещения сообщений по времени и делает это при сегментной детализации. Параметр timestamp является меткой времени unix, и запрос смещения по метке времени возвращает последнее возможное смещение сообщения, которое добавляется не позднее указанной метки времени. Есть 2 специальных значения метки времени-самая последняя и самая ранняя. Для любого другого значения метки времени unix Kafka получит начальное смещение сегмента журнала, созданного не позднее указанной метки времени. Из-за этого, а также из-за того, что запрос смещения подается только при детализации сегмента, запрос извлечения смещения возвращает менее точные результаты для больших размеров сегмента.
Для получения более точных результатов можно настроить размер сегмента журнала в зависимости от времени (log.roll.ms) вместо размера (бревно.сегмент.байты). Однако следует соблюдать осторожность, так как это может привести к увеличению числа обработчиков файлов из-за частого перемещения сегментов журнала.
План На Будущее
Кафка добавит метку времени в формат сообщения. См.
Https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
Kafka Protocol Doc-отличный источник для воспроизведения запросов / ответов / смещений / сообщений: https://cwiki.apache.org/confluence/display/KAFKA/A + руководство+к + протокол + Кафка вы используете простой потребительский пример, где следующий код демонстрирует состояние:
FetchRequest req = new FetchRequestBuilder() .clientId(clientName) .addFetch(a_topic, a_partition, readOffset, 100000) .build(); FetchResponse fetchResponse = simpleConsumer.fetch(req);
Установите значение readOffset для начала начального смещения. но вам нужно проверить максимальное смещение, а также выше будет предоставлено ограниченное количество смещений в соответствии с FetchSize в последнем параме метода addFetch.
Используя KafkaConsumer, вы можете использовать Seek, SeekToBeginning и SeekToEnd для перемещения в потоке.
Кроме того, если раздел не указан, он будет стремиться к первому смещению для всех назначенных в данный момент разделов.
Вы пробовали это?
Bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Он распечатает все сообщения для данной темы, "Тест" в этом примере.
Подробнее по этой ссылке https://kafka.apache.org/quickstart