Источник событий / CQRS read model-projections


У меня микрослужб-приложение работает на платформе AWS лямбда. Два из микросервисов, наиболее важных, используют event-sourcing/CQR.

Предыстория: (это также для меня, чтобы организовать свои мысли)

Я использую эту библиотеку и храню события в DynamoDB и проекции в AWS S3.

Часть записи работает как заклинание: каждый вызов команды загружает текущее состояние агрегата из DynamoDB (путем запуска событий через обработчик и/или загрузка кэшированного агрегата), он принимает решение принять или отклонить команду на основе некоторой бизнес-логики, а затем записывает в DynamoDB с KeyConditionExpression: 'aggregateId = :a AND version >= :v', Где версия-это количество событий, обработанных для этого агрегата. Если возникает конфликт, запись завершается неудачей. Мне кажется, что это хорошая система!

Каждое событие затем передается в SNS (имя темы - это имя службы), чтобы другие службы могли реагировать на событие, если они хотят.

Часть, с которой я действительно борюсь, - это чтение. Проекции сохраняются в S3 и помечается последним commitId, обработанным для каждого источника событий. Когда приходит запрос чтения, он загружает все проецируемое состояние из S3 (для всех агрегатов) , запрашивает источники событий для всех новых событий, вычисляет последнее состояние (опять же, для всех агрегатов - и записывает обновленный объект в S3, если он более новый) и возвращает соответствующие части состояния на основе параметров запроса.

Моя проблема: (или одна из них)

Я думаю, что делаю проекции неправильно.

Большинство моих проекций группируют идентификаторы только по важным атрибутам, поэтому файлы остаются относительно небольшими. Но мне также нужен способ получить индивидуальную совокупность. Использование проекций для этого кажется сумасшедшим, потому что мне нужно загружать все состояние каждый раз (т. е. каждый прогнозируемый агрегат), применять к нему новые события, а затем извлекать нужную запись (возможно, она даже не изменилась).

Это то, что я делаю сейчас, он отлично работает (

Другая проблема-это запросы. Мне нужно построить проекционное отображение значения для сопоставления aggregateIds для каждого атрибута, который мне нужно запросить!! Должен же быть способ получше!

Независимо от того, как я думаю об этой проблеме, проекции всегда нужно все текущее состояние + любые новые события, прежде чем она сможет вернуть хотя бы одну запись, которая не изменилась.
2 4

2 ответа:

Я думаю, что делаю проекции неправильно.

Я тоже так думаю; похоже, что ваши запросы связаны с вашими проекциями

Когда приходит запрос на чтение, он загружает все проецируемое состояние из S3 (для всех агрегатов), запрашивает источники событий для всех новых событий, вычисляет последнее состояние

Да, это звучит как беспорядок. Или, более конкретно, это звучит так, как будто запрос запускает работу, которую должен выполнить пользователь. проекция.

Если вы можете отделить запросы от проекций, то все становится проще. Основная идея заключается в том, что ваши запросы не описывают текущее состояние , они описывают состояние на момент последнего запуска проекции .

Та же идея, другое написание: вы отвечаете на запросы из документов, которые вы кэшируете в S3. При обнаружении новых событий ваши проекции запускаются, загружаются новые данные по мере необходимости, вычисляется новый документ и заменяются записи. в кэше.

Я думаю о треугольнике

  • команды приносят информацию извне в книгу записей
  • Проекции приносят информацию из книги записей в кэш
  • запросы приносят информацию из кэша во внешний мир

Где каждая ветвь треугольника работает асинхронно с другими.

Я предлагаю вам работать в обратном направлении от запросов - какие документы вам нужны для поддержки каждого запроса? что такое латентность целей, которые вы должны бить? Затем вы начинаете балансировать компромиссы - для этого нового запроса я создаю результат из существующих документов или мне нужен новый документ, построенный с более тонким зерном?

Если я правильно понимаю, я должен запускать обновления проекции по мере поступления событий, а не в совокупности, когда выполняется запрос. Это избавляет меня от необходимости запрашивать в хранилище событий новые события по каждому запросу

Да, И... события - это только один из способов запуск; вы также можете запускать процессы проецирования по часам (проверяйте каждые 15 минут, чтобы увидеть, нужно ли нам обновлять) или по прихоти человека-оператора (Хм, похоже, что Ваш баланс счета устарел, позвольте мне попытаться обновить это для вас). Есть несколько способов сделать это, и вы можете смешивать и сочетать стратегии.

Мне все равно нужно будет загрузить все состояние, как при обновлении проекции, так и при загрузке одного агрегата.

Не обязательно. Есть нет правила, которое говорит, что вы не можете использовать ранее кэшированное представление в качестве отправной точки, а затем извлекать из книги записей только те изменения, которые вам нужны.

Например, предположим, что вы создаете представление, объединяющее агрегаты A{id:7} и B{id:9}. Вы хватаете кэшированную копию и смотрите в ее мета-данные (куда вы поместили ее на предыдущей записи) и находите внутри нее что-то вроде metadata:{A:{id:7, version:21}, B:{id:9, version:19}}. Теперь вам нужно только загрузить события после тех, которые вы использовали в прошлый раз, обновить локальную копию в память, обновите локальную копию метаданных и переместите лот в кэш.

Я не знаком с вашей технической инфраструктурой, но способ реализации проекций выглядит следующим образом:

Каждое событие домена имеетглобальный порядковый номер, который охватывает все агрегатные корни. Проекция-это модель чтения, имеющая произвольное имя и последнюю обработанную позицию, представленную этим глобальным порядковым номером. Я могу добавить новую проекцию в любое время вместе с ее обработчиками событий, и она будет начинаться с позиции 0. Я могу очистить проекцию в любое время и вернуться назад позиция до 0. Я также могу использовать комбинацию добавления новой проекции, которая заменит существующую, построить ее, даже если это займет несколько дней, а затем удалить старую.

Существует служба, которая отслеживает проекции и использует хранилище событий почти как очередь. Служба проекции проверяет наличие событий с глобальными идентификаторами после текущей позиции и передает их обработчикам, а затем обновляет позицию. Именно здесь ваша проекция может даже фильтровать события типы для повышения производительности.

Это основная идея. Ваши проекции-это то, что вы спрашиваете. Как только проекция догонит "голову" хранилища событий, события из хранилища событий будут стекать в проекцию.

Как это отразится на вашем техническом пространстве, я не совсем уверен. У меня есть небольшой эксперимент под названием челнок.Вспомните переход на C#, если вы хотите взглянуть, чтобы получить некоторые идеи.