Что определяет потребительское смещение Кафки?


Я относительно новичок в Кафке. Я немного поэкспериментировал с ним, но несколько вещей мне непонятны в отношении потребительского смещения. Из того, что я понял, когда потребитель запускается, смещение, с которого он начнет чтение, определяется настройкой конфигурации auto.offset.reset (поправьте меня, если я ошибаюсь).

теперь скажем, например, в теме есть 10 сообщений (смещения от 0 до 9), и потребитель случайно потребил 5 из них до того, как он упал (или до того, как я убил потребитель.) Затем скажите, что я перезапускаю этот потребительский процесс. Мои вопросы:

если auto.offset.reset установлено значение smallest, Он всегда будет начинать потреблять со смещения 0 ?

если auto.offset.reset установлено значение largest, он собирается начать потреблять от смещения 5 ?

всегда ли поведение в отношении такого сценария детерминировано ? Пожалуйста, не стесняйтесь комментировать, если что-то в моем вопросе неясно. Спасибо заранее.

3 101

3 ответа:

это немного сложнее, чем вы описали. Элемент auto.offset.reset config запускается только в том случае, если ваша группа потребителей не имеет допустимого смещения, зафиксированного где-то (2 поддерживаемых хранилища смещений теперь являются Kafka и Zookeeper). И это также зависит от того, какой потребитель вы используете.

если вы используете высокоуровневый потребитель java, то представьте себе следующие сценарии:

  1. у вас есть потребитель в группу потребителей group1 что потребил 5 сообщений и умер. Следующий запуске этого потребитель даже не использовать auto.offset.reset config и будет продолжаться с того места, где он умер, потому что он просто получит сохраненное смещение из хранилища смещения (Кафка или ZK, как я уже упоминал).

  2. у вас есть сообщения в теме (Как вы описали), и вы начинаете потребитель в новую группу потребителей group2. Там нет смещения хранится в любом месте и на этот раз auto.offset.reset config решит, следует ли начинать с начала темы (smallest) или с конца темы (largest)

еще одна вещь, которая влияет на то, что значение смещения будет соответствовать smallest и largest configs-это политика хранения журнала. Представьте, что у вас есть тема с сохранением, настроенным на 1 час. Вы создаете 5 сообщений, а затем через час вы отправляете еще 5 сообщений. Элемент largest смещение все равно останется таким же, как в предыдущем примере, но smallest один не сможет быть 0 потому что Кафка уже удалит эти сообщения и таким образом наименьшее доступное смещение будет 5.

все вышесказанное не имеет отношения к SimpleConsumer и каждый раз, когда вы запустите его, он будет решать, с чего начать с помощью auto.offset.reset config.

просто обновление: от Kafka 0.9 и далее, Kafka использует новую версию Java потребителя и авто.сдвиг.имена параметров сброса изменились; из руководства:

Что делать, когда нет начального смещения в Кафке или если текущий смещение больше не существует на сервере (например, потому что данные был удален):

раннее: автоматический сброс смещения до самого раннего смещения

последний: автоматический сброс смещения до последнего смещения

нет: исключение броска для потребителя, если не найдено никакого предыдущего смещения для группы потребителей

что-нибудь еще: бросьте исключение для потребителя.

Я потратил некоторое время, чтобы найти это после проверки принято отвечать, поэтому я подумал, что это может быть полезно для сообщества, чтобы разместить его.

далее еще есть смещения.удержание.протокол. Если время с момента последнего коммита > offsets.retention.minutes, чем auto.offset.reset также пинки в