Как вы обрабатываете дубликаты записей Amazon Kinesis?


Согласно документации Amazon Kinesis Streams , запись может быть доставлена несколько раз.

Единственный способ убедиться, что каждая запись обрабатывается только один раз, - это временно хранить их в базе данных, которая поддерживает проверку целостности (например, DynamoDB, Elasticache или MySQL/PostgreSQL) или просто проверять идентификатор записи для каждого фрагмента Kinesis.

Знаете ли вы лучший / более эффективный способ обработки дубликатов?

2 9

2 ответа:

Именно с этой проблемой мы столкнулись при создании телеметрической системы для мобильного приложения. В нашем случае мы также не были уверены, что производители отправляют каждое сообщение ровно один раз, поэтому для каждой полученной записи мы вычисляли ее MD5 на лету и проверяли, представлена ли она в какой-то форме постоянного хранилища, но действительно, какое хранилище использовать-самый сложный бит.

Во-первых, мы попробовали тривиальную реляционную базу данных, но она быстро стала главным узким местом всей системы, как это это не только тяжелый для чтения, но и тяжелый для записи случай, поскольку объем данных, идущих через Кинезис, был довольно значительным.

В итоге мы получили таблицу DynamoDB, хранящую MD5 для каждого уникального сообщения. Проблема, с которой мы столкнулись, заключалась в том, что удалить сообщения было не так просто - несмотря на то, что наша таблица содержала ключи разделов и сортировки, DynamoDB не позволяет удалять все записи с заданным ключом раздела, нам приходилось запрашивать все значения ключей сортировки (что тратило время и ресурсы). К сожалению, время от времени нам приходилось просто опускать весь стол. Другим неоптимальным решением является регулярная ротация таблиц DynamoDB, хранящих идентификаторы сообщений.

Однако недавно DynamoDB ввел очень удобную функцию - Time To Live , что означает, что теперь мы можем контролировать размер таблицы, включив автоматическое истечение срока действия на основе каждой записи. В этом смысле DynamoDB кажется очень похожим на ElastiCache, однако ElastiCache (по крайней мере Memcached cluster) является гораздо менее долговечны-там нет избыточности, и все данные, находящиеся на завершенных узлах, теряются в случае масштабирования в работе или сбоя.

То, что вы упомянули, является общей проблемой всех систем очередей с подходом" по крайней мере один раз". Кроме того, не только системы очередей, производители и потребители могут обрабатывать одно и то же сообщение несколько раз (из-за ошибок ReadTimeout и т. д.). И Кинезис, и Кафка используют эту парадигму. К сожалению, на этот вопрос нет простого ответа.

Вы также можете попробовать использовать очередь сообщений "ровно один раз" с более строгим подходом к транзакциям. Например AWS SQS делает это: https://aws.amazon.com/about-aws/whats-new/2016/11/amazon-sqs-introduces-fifo-queues-with-exactly-once-processing-and-lower-prices-for-standard-queues/ . Имейте в виду, что пропускная способность SQS намного меньше, чем Kinesis.

Чтобы решить вашу проблему, вы должны знать о вашем домене приложения и попытаться решить ее внутренне, как вы предложили (проверка базы данных). Особенно, когда вы общаетесь с внешней службой (скажем, сервером электронной почты, например), вы должны иметь возможность восстановить состояние операции для предотвращения двойной обработки (поскольку двойная отправка в Примере сервера электронной почты может привести к созданию нескольких копий одной и той же записи в почтовом ящике получателя).

См. также следующие понятия;

  1. по крайней мере один раз доставка: http://www.cloudcomputingpatterns.org/at_least_once_delivery/
  2. точно-один раз доставка: http://www.cloudcomputingpatterns.org/exactly_once_delivery/
  3. Идемпотентный Процессор: http://www.cloudcomputingpatterns.org/idempotent_processor/