Что определяет потребительское смещение Кафки?
Я относительно новичок в Кафке. Я немного поэкспериментировал с ним, но несколько вещей мне непонятны в отношении потребительского смещения. Из того, что я понял, когда потребитель запускается, смещение, с которого он начнет чтение, определяется настройкой конфигурации auto.offset.reset
(поправьте меня, если я ошибаюсь).
теперь скажем, например, в теме есть 10 сообщений (смещения от 0 до 9), и потребитель случайно потребил 5 из них до того, как он упал (или до того, как я убил потребитель.) Затем скажите, что я перезапускаю этот потребительский процесс. Мои вопросы:
если auto.offset.reset
установлено значение smallest
, Он всегда будет начинать потреблять со смещения 0 ?
если auto.offset.reset
установлено значение largest
, он собирается начать потреблять от смещения 5 ?
всегда ли поведение в отношении такого сценария детерминировано ? Пожалуйста, не стесняйтесь комментировать, если что-то в моем вопросе неясно. Спасибо заранее.
3 ответа:
это немного сложнее, чем вы описали. Элемент
auto.offset.reset
config запускается только в том случае, если ваша группа потребителей не имеет допустимого смещения, зафиксированного где-то (2 поддерживаемых хранилища смещений теперь являются Kafka и Zookeeper). И это также зависит от того, какой потребитель вы используете.если вы используете высокоуровневый потребитель java, то представьте себе следующие сценарии:
у вас есть потребитель в группу потребителей
group1
что потребил 5 сообщений и умер. Следующий запуске этого потребитель даже не использоватьauto.offset.reset
config и будет продолжаться с того места, где он умер, потому что он просто получит сохраненное смещение из хранилища смещения (Кафка или ZK, как я уже упоминал).у вас есть сообщения в теме (Как вы описали), и вы начинаете потребитель в новую группу потребителей
group2
. Там нет смещения хранится в любом месте и на этот разauto.offset.reset
config решит, следует ли начинать с начала темы (smallest
) или с конца темы (largest
)еще одна вещь, которая влияет на то, что значение смещения будет соответствовать
smallest
иlargest
configs-это политика хранения журнала. Представьте, что у вас есть тема с сохранением, настроенным на 1 час. Вы создаете 5 сообщений, а затем через час вы отправляете еще 5 сообщений. Элементlargest
смещение все равно останется таким же, как в предыдущем примере, ноsmallest
один не сможет быть0
потому что Кафка уже удалит эти сообщения и таким образом наименьшее доступное смещение будет5
.все вышесказанное не имеет отношения к
SimpleConsumer
и каждый раз, когда вы запустите его, он будет решать, с чего начать с помощьюauto.offset.reset
config.
просто обновление: от Kafka 0.9 и далее, Kafka использует новую версию Java потребителя и авто.сдвиг.имена параметров сброса изменились; из руководства:
Что делать, когда нет начального смещения в Кафке или если текущий смещение больше не существует на сервере (например, потому что данные был удален):
раннее: автоматический сброс смещения до самого раннего смещения
последний: автоматический сброс смещения до последнего смещения
нет: исключение броска для потребителя, если не найдено никакого предыдущего смещения для группы потребителей
что-нибудь еще: бросьте исключение для потребителя.
Я потратил некоторое время, чтобы найти это после проверки принято отвечать, поэтому я подумал, что это может быть полезно для сообщества, чтобы разместить его.